1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 
23 #include "staleblock.h"
24 #include "btreeblock.h"
25 #include "list.h"
26 #include "docio.h"
27 #include "fdb_internal.h"
28 #include "version.h"
29 #include "time_utils.h"
30 
31 #ifdef _DOC_COMP
32 #include "snappy-c.h"
33 #endif
34 
35 #include "memleak.h"
36 
// Controls whether the bodies of stale-block system documents kept in the
// in-memory stale info tree are stored snappy-compressed.
// fdb_add_inmem_stale_info() compresses only when the build also defines
// _DOC_COMP; fdb_get_reusable_block() uncompresses under the same conditions.
// NOTE(review): without _DOC_COMP, the flag is still consulted by
// fdb_get_reusable_block() to decide whether to allocate its scratch buffer.
static bool compress_inmem_stale_info = true;
38 
_inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)39 static int _inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)
40 {
41     struct stale_info_commit *aa, *bb;
42     aa = _get_entry(a, struct stale_info_commit, avl);
43     bb = _get_entry(b, struct stale_info_commit, avl);
44 
45     if (aa->revnum < bb->revnum) {
46         return -1;
47     } else if (aa->revnum > bb->revnum) {
48         return 1;
49     } else {
50         return 0;
51     }
52 }
53 
fdb_add_inmem_stale_info(fdb_kvs_handle *handle, filemgr_header_revnum_t revnum, struct docio_object *doc, uint64_t doc_offset, bool system_doc_only)54 static void fdb_add_inmem_stale_info(fdb_kvs_handle *handle,
55                                      filemgr_header_revnum_t revnum,
56                                      struct docio_object *doc,
57                                      uint64_t doc_offset,
58                                      bool system_doc_only)
59 {
60     int ret;
61     size_t buflen = 0;
62     struct filemgr *file = handle->file;
63     struct avl_node *a;
64     struct stale_info_commit *item, query;
65     struct stale_info_entry *entry;
66 
67     // search using revnum first
68     query.revnum = revnum;
69     a = avl_search(&file->stale_info_tree, &query.avl, _inmem_stale_cmp);
70     if (a) {
71         // already exist
72         item = _get_entry(a, struct stale_info_commit, avl);
73     } else {
74         // not exist .. create a new one and insert into tree
75         item = (struct stale_info_commit *)calloc(1, sizeof(struct stale_info_commit));
76         item->revnum = revnum;
77         list_init(&item->doc_list);
78         avl_insert(&file->stale_info_tree, &item->avl, _inmem_stale_cmp);
79         file->stale_info_tree_loaded.store(true, std::memory_order_relaxed);
80     }
81 
82     entry = (struct stale_info_entry *)calloc(1, sizeof(struct stale_info_entry));
83 
84     if (!system_doc_only) {
85 #ifdef _DOC_COMP
86         if (compress_inmem_stale_info) {
87             buflen = snappy_max_compressed_length(doc->length.bodylen);;
88             entry->ctx = (void *)calloc(1, buflen);
89             ret = snappy_compress((char*)doc->body, doc->length.bodylen,
90                 (char*)entry->ctx, &buflen);
91             if (ret != 0) {
92                 fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
93                     "(fdb_add_inmem_stale_info) "
94                     "Compression error from a database file '%s'"
95                     ": return value %d, header revnum %" _F64 ", "
96                     "doc offset %" _F64 "\n",
97                     file->filename, ret, revnum, doc_offset);
98                 if (!a) {
99                     // 'item' is allocated in this function call.
100                     avl_remove(&file->stale_info_tree, &item->avl);
101                     free(item);
102                 }
103                 free(entry);
104                 return;
105             }
106         } else {
107             buflen = doc->length.bodylen;
108             entry->ctx = (void *)calloc(1, buflen);
109             memcpy(entry->ctx, doc->body, doc->length.bodylen);
110         }
111 #else
112         buflen = doc->length.bodylen;
113         entry->ctx = (void *)calloc(1, buflen);
114         memcpy(entry->ctx, doc->body, doc->length.bodylen);
115 #endif
116 
117         entry->ctxlen = doc->length.bodylen;
118 
119     } else {
120         // when 'system_doc_only' flag is set, just set to NULL.
121         // we need the doc's length and offset info only.
122         entry->ctx = NULL;
123         entry->ctxlen = 0;
124     }
125 
126     entry->comp_ctxlen = buflen;
127     entry->doclen = _fdb_get_docsize(doc->length);
128     entry->offset = doc_offset;
129     list_push_back(&item->doc_list, &entry->le);
130 }
131 
// Rebuild the in-memory stale info tree (file->stale_info_tree) from the
// on-disk stale-block tree (handle->staletree).
//
// Only the first call per file does any work. The file->stale_info_tree_loaded
// flag is flipped from false to true with a compare-exchange, and every later
// call returns immediately.
//
// For each (revnum -> doc offset) pair in the stale-block tree, the chain of
// stale-block system documents is followed through the "previous doc offset"
// stored in the first 8 bytes of each body, until BLK_NOT_FOUND. Each document
// is added with fdb_add_inmem_stale_info(..., false). A read failure is logged
// and abandons the rest of that revnum's chain. Iteration then continues with
// the next tree entry.
//
// NOTE(review): the loaded flag is set to true by the compare-exchange
// *before* the file mutex is taken and before loading finishes. A concurrent
// caller can therefore return while the tree is still being populated.
// Confirm that callers tolerate this.
void fdb_load_inmem_stale_info(fdb_kvs_handle *handle)
{
    uint8_t keybuf[64];
    int64_t ret;
    bid_t offset, _offset, prev_offset;
    filemgr_header_revnum_t revnum, _revnum;
    btree_iterator bit;
    btree_result br;
    struct filemgr *file = handle->file;
    struct docio_object doc;
    bool expected = false;

    if (!std::atomic_compare_exchange_strong(&file->stale_info_tree_loaded,
                                             &expected, true)) {
        // stale info is already loaded (fast screening without mutex)
        return;
    }

    // first open of the DB file
    // should grab mutex to avoid race with other writer
    filemgr_mutex_lock(file);

    btree_iterator_init(handle->staletree, &bit, NULL);
    do {
        // key: encoded header revnum, value: encoded offset of the most
        // recent stale-block system doc for that revnum
        br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
        btreeblk_end(handle->bhandle);
        if (br != BTREE_RESULT_SUCCESS) {
            break;
        }

        revnum = _endian_decode(_revnum);
        offset = _endian_decode(_offset);

        // walk the per-revnum document chain backwards
        while (offset != BLK_NOT_FOUND) {
            memset(&doc, 0x0, sizeof(doc));
            // pre-allocated buffer for key
            doc.key = (void*)keybuf;

            ret = docio_read_doc(handle->dhandle, offset, &doc, true);
            if (ret <= 0) {
                // read fail .. escape
                // NOTE(review): when ret == 0 the logged status is (fdb_status)0.
                fdb_log(NULL, (fdb_status)ret,
                    "Error in reading a stale region info document "
                    "from a database file '%s'"
                    ": revnum %" _F64 ", offset %" _F64 "\n",
                    file->filename, revnum, offset);
                offset = BLK_NOT_FOUND;
                continue; // ends the inner loop; move to the next revnum
            }

            fdb_add_inmem_stale_info(handle, revnum, &doc, offset, false);

            // fetch previous doc offset (first 8 bytes of the body)
            memcpy(&_offset, doc.body, sizeof(_offset));
            prev_offset = _endian_decode(_offset);

            // We don't need to free 'meta' as it will be NULL.
            free(doc.body);

            offset = prev_offset;
        }
    } while (true);
    btree_iterator_free(&bit);

    filemgr_mutex_unlock(file);
}
198 
fdb_gather_stale_blocks(fdb_kvs_handle *handle, filemgr_header_revnum_t revnum, bid_t prev_hdr, uint64_t kv_info_offset, fdb_seqnum_t seqnum, struct list_elem *e_last, bool from_mergetree)199 void fdb_gather_stale_blocks(fdb_kvs_handle *handle,
200                              filemgr_header_revnum_t revnum,
201                              bid_t prev_hdr,
202                              uint64_t kv_info_offset,
203                              fdb_seqnum_t seqnum,
204                              struct list_elem *e_last,
205                              bool from_mergetree)
206 {
207     int64_t delta;
208     int r;
209     uint32_t count = 0;
210     uint32_t offset = 0, count_location;
211     uint32_t bufsize = 8192;
212     uint32_t _count, _len;
213     uint64_t _pos, _kv_info_offset;
214     uint8_t *buf = NULL;
215     bid_t doc_offset, _doc_offset;
216     bid_t _prev_hdr;
217     bool gather_staleblocks = true;
218     bool first_loop = true;
219     filemgr_header_revnum_t _revnum;
220     fdb_seqnum_t _seqnum;
221     struct kvs_stat stat;
222 
223     /*
224      * << stale block system doc structure >>
225      * [previous doc offset]: 8 bytes (0xffff.. if not exist)
226      * [previous header BID]: 8 bytes (0xffff.. if not exist)
227      * [KVS info doc offset]: 8 bytes (0xffff.. if not exist)
228      * [Default KVS seqnum]:  8 bytes
229      * [# items]:             4 bytes
230      * ---
231      * [position]:            8 bytes
232      * [length]:              4 bytes
233      * ...
234      */
235 
236     if (filemgr_get_stale_list(handle->file)) {
237         struct avl_node *a;
238         struct list_elem *e;
239         struct stale_data *item;
240 
241         r = _kvs_stat_get(handle->file, 0, &stat);
242         handle->bhandle->nlivenodes = stat.nlivenodes;
243         handle->bhandle->ndeltanodes = stat.nlivenodes;
244         (void)r;
245 
246         buf = (uint8_t *)calloc(1, bufsize);
247         _revnum = _endian_encode(revnum);
248 
249         // initial previous doc offset
250         memset(buf, 0xff, sizeof(bid_t));
251         count_location = sizeof(bid_t);
252 
253         // previous header BID
254         if (prev_hdr == 0 || prev_hdr == BLK_NOT_FOUND) {
255             // does not exist
256             memset(&_prev_hdr, 0xff, sizeof(_prev_hdr));
257         } else {
258             _prev_hdr = _endian_encode(prev_hdr);
259         }
260         memcpy(buf + sizeof(bid_t), &_prev_hdr, sizeof(bid_t));
261         count_location += sizeof(bid_t);
262 
263         // KVS info doc offset
264         _kv_info_offset = _endian_encode(kv_info_offset);
265         memcpy(buf + count_location, &_kv_info_offset, sizeof(uint64_t));
266         count_location += sizeof(uint64_t);
267 
268         // default KVS seqnum
269         _seqnum = _endian_encode(seqnum);
270         memcpy(buf + count_location, &_seqnum, sizeof(fdb_seqnum_t));
271         count_location += sizeof(fdb_seqnum_t);
272         count_location += sizeof(count);
273 
274         while(gather_staleblocks) {
275             // reserve space for
276             // prev offset (8), prev header (8), kv_info_offset (8),
277             // seqnum (8), count (4)
278             offset = count_location;
279 
280             if (first_loop && from_mergetree) {
281                 // gather from mergetree
282                 a = avl_first(&handle->file->mergetree);
283                 while (a) {
284                     item = _get_entry(a, struct stale_data, avl);
285 
286                     if (handle->staletree) {
287                         count++;
288 
289                         _pos = _endian_encode(item->pos);
290                         _len = _endian_encode(item->len);
291 
292                         memcpy(buf + offset, &_pos, sizeof(_pos));
293                         offset += sizeof(_pos);
294                         memcpy(buf + offset, &_len, sizeof(_len));
295                         offset += sizeof(_len);
296 
297                         if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
298                             bufsize *= 2;
299                             buf = (uint8_t*)realloc(buf, bufsize);
300                         }
301                     }
302 
303 
304                     // If 'from_mergetree' flag is set, it means that this
305                     // function is called at the end of fdb_get_reusable_block(),
306                     // and those items are remaining (non-reusable) regions after
307                     // picking up reusable blocks from 'mergetree'.
308 
309                     // In the previous implementation, those items are converted
310                     // and stored as a system document. The document is re-read in
311                     // the next block reclaim, and then we reconstruct 'mergetree'
312                     // from the document; this is unnecessary duplicated overhead.
313 
314                     // As an optimization, we can simply keep those items in
315                     // 'mergetree' and use them in the next block reclaim, without
316                     // reading the corresponding system document; this also reduces
317                     // the commit latency much. Instead, to minimize memory
318                     // consumption, we don't need to maintain in-memory copy of the
319                     // system doc corresponding to the remaining items in the
320                     // 'mergetree', that will be created below.
321 
322                     // do not remove the item
323                     a = avl_next(&item->avl);
324                 }
325             } else {
326                 // gater from stale_list
327                 if (e_last) {
328                     e = list_next(e_last);
329                 } else {
330                     e = list_begin(handle->file->stale_list);
331                 }
332                 while (e) {
333                     item = _get_entry(e, struct stale_data, le);
334 
335                     if (handle->staletree) {
336                         count++;
337 
338                         _pos = _endian_encode(item->pos);
339                         _len = _endian_encode(item->len);
340 
341                         memcpy(buf + offset, &_pos, sizeof(_pos));
342                         offset += sizeof(_pos);
343                         memcpy(buf + offset, &_len, sizeof(_len));
344                         offset += sizeof(_len);
345 
346                         if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
347                             bufsize *= 2;
348                             buf = (uint8_t*)realloc(buf, bufsize);
349                         }
350                     }
351 
352                     e = list_remove(handle->file->stale_list, e);
353                     free(item);
354                 }
355             }
356 
357             gather_staleblocks = false;
358             if (count) {
359                 char *doc_key = alca(char, 32);
360                 struct docio_object doc;
361 
362                 // store count
363                 _count = _endian_encode(count);
364                 memcpy(buf + count_location - sizeof(_count), &_count, sizeof(_count));
365 
366                 // append a system doc
367                 memset(&doc, 0x0, sizeof(doc));
368                 // add one to 'revnum' to get the next revision number
369                 // (note that filemgr_mutex() is grabbed so that no other thread
370                 //  will change the 'revnum').
371                 sprintf(doc_key, "stale_blocks_%" _F64, revnum);
372                 doc.key = (void*)doc_key;
373                 doc.body = buf;
374                 doc.length.keylen = strlen(doc_key) + 1;
375                 doc.length.metalen = 0;
376                 doc.length.bodylen = offset;
377                 doc.seqnum = 0;
378                 doc_offset = docio_append_doc_system(handle->dhandle, &doc);
379 
380                 // insert into stale-block tree
381                 _doc_offset = _endian_encode(doc_offset);
382                 btree_insert(handle->staletree, (void *)&_revnum, (void *)&_doc_offset);
383                 btreeblk_end(handle->bhandle);
384                 btreeblk_reset_subblock_info(handle->bhandle);
385 
386                 if (from_mergetree && first_loop) {
387                     // if from_mergetree flag is set and this is the first loop,
388                     // stale regions in this document are already in mergetree
389                     // so skip adding them into in-memory stale info tree.
390 
391                     // however, the system doc itself should be marked as stale
392                     // when the doc is reclaimed, thus we instead add a dummy entry
393                     // that containing doc offset, length info only.
394                     fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, true);
395                 } else {
396                     // add the doc into in-memory stale info tree
397                     fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, false);
398                 }
399 
400                 if (list_begin(filemgr_get_stale_list(handle->file))) {
401                     // updating stale tree brings another stale blocks.
402                     // recursively update until there is no more stale block.
403 
404                     // note that infinite loop will not occur because
405                     // 1) all updated index blocks for stale tree are still writable
406                     // 2) incoming keys for stale tree (revnum) are monotonic
407                     //    increasing order; most recently allocated node will be
408                     //    updated again.
409 
410                     count = 0;
411                     // save previous doc offset
412                     memcpy(buf, &_doc_offset, sizeof(_doc_offset));
413 
414                     // gather once again
415                     gather_staleblocks = true;
416                 }
417             }
418 
419             first_loop = false;
420         } // gather stale blocks
421 
422         delta = handle->bhandle->nlivenodes - stat.nlivenodes;
423         _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
424         delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
425         delta *= handle->config.blocksize;
426         _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
427 
428         free(buf);
429     } else {
430         btreeblk_reset_subblock_info(handle->bhandle);
431     }
432 }
433 
_reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)434 static int _reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)
435 {
436     struct stale_data *aa, *bb;
437     aa = _get_entry(a, struct stale_data, avl);
438     bb = _get_entry(b, struct stale_data, avl);
439     return _CMP_U64(aa->pos, bb->pos);
440 }
441 
_insert_n_merge(struct avl_tree *tree, uint64_t item_pos, uint32_t item_len)442 static void _insert_n_merge(struct avl_tree *tree,
443                             uint64_t item_pos,
444                             uint32_t item_len)
445 {
446     struct stale_data query, *item;
447     struct avl_node *avl;
448 
449     // retrieve the tree first
450     query.pos = item_pos;
451     avl = avl_search(tree, &query.avl, _reusable_offset_cmp);
452     if (avl) {
453         // same offset already exists
454         item = _get_entry(avl, struct stale_data, avl);
455         // choose longer length
456         if (item->len < item_len) {
457             item->len = item_len;
458         }
459     } else {
460         // does not exist .. create a new item
461         item = (struct stale_data*)
462                calloc(1, sizeof(struct stale_data));
463         item->pos = item_pos;
464         item->len = item_len;
465         avl_insert(tree, &item->avl, _reusable_offset_cmp);
466     }
467 
468     // check prev/next item to see if they can be merged
469     struct avl_node *p_avl, *n_avl;
470     struct stale_data*p_item, *n_item;
471     p_avl = avl_prev(&item->avl);
472     if (p_avl) {
473         p_item = _get_entry(p_avl, struct stale_data, avl);
474         if (p_item->pos + p_item->len >= item->pos) {
475 
476             if (p_item->pos + p_item->len >= item->pos + item->len) {
477                 // 'item' is included in p_item .. simply remove it
478                 // (do nothing)
479             } else {
480                 // overlapping (or consecutive) .. merge two items
481                 p_item->len += item->len +
482                                (item->pos - p_item->pos - p_item->len);
483             }
484             // remove current item
485             avl_remove(tree, &item->avl);
486             free(item);
487             item = p_item;
488         }
489     }
490 
491     n_avl = avl_next(&item->avl);
492     if (n_avl) {
493         n_item = _get_entry(n_avl, struct stale_data, avl);
494         if (item->pos + item->len >= n_item->pos) {
495 
496             if (item->pos + item->len >= n_item->pos + n_item->len) {
497                 // 'n_item' is included in 'item' .. simply remove it
498                 // (do nothing)
499             } else {
500                 // overlapping (or consecutive) .. merge two items
501                 item->len += n_item->len +
502                              (n_item->pos - item->pos - item->len);
503             }
504             // remove next item
505             avl_remove(tree, &n_item->avl);
506             free(n_item);
507         }
508     }
509 }
510 
511 // Parse & fetch stale regions from the buffer 'ctx', which is the body of
512 // a stale info system document (from either in-memory stale-block-tree or
513 // on-disk stale-block-tree). After fetching, insert those regions
514 // into 'mergetree'.
_fetch_stale_info_doc(void *ctx, struct avl_tree *mergetree, uint64_t &prev_offset_out, uint64_t &prev_hdr_out)515 static void _fetch_stale_info_doc(void *ctx,
516                                   struct avl_tree *mergetree,
517                                   uint64_t &prev_offset_out,
518                                   uint64_t &prev_hdr_out)
519 {
520     uint32_t i, count, _count, item_len;
521     uint64_t pos;
522     uint64_t item_pos;
523 
524     pos = 0;
525 
526     // get previous doc offset
527     memcpy(&prev_offset_out, ctx, sizeof(prev_offset_out));
528     prev_offset_out = _endian_decode(prev_offset_out);
529     pos += sizeof(prev_offset_out);
530 
531     // get previous header BID
532     memcpy(&prev_hdr_out, (uint8_t*)ctx + pos, sizeof(prev_hdr_out));
533     prev_hdr_out = _endian_decode(prev_hdr_out);
534     (void)prev_hdr_out;
535     pos += sizeof(prev_hdr_out);
536 
537     // Skip kv_info_offset and default KVS's seqnum
538     pos += sizeof(uint64_t) + sizeof(fdb_seqnum_t);
539 
540     // get count;
541     memcpy(&_count, (uint8_t*)ctx + pos, sizeof(_count));
542     count = _endian_decode(_count);
543     pos += sizeof(_count);
544 
545     // get a stale region and insert/merge into tree
546     for (i=0;i<count;++i) {
547         memcpy(&item_pos, (uint8_t*)ctx + pos, sizeof(item_pos));
548         item_pos = _endian_decode(item_pos);
549         pos += sizeof(item_pos);
550 
551         memcpy(&item_len, (uint8_t*)ctx + pos, sizeof(item_len));
552         item_len = _endian_decode(item_len);
553         pos += sizeof(item_len);
554 
555         _insert_n_merge(mergetree, item_pos, item_len);
556     }
557 }
558 
fdb_get_reusable_block(fdb_kvs_handle *handle, stale_header_info stale_header)559 reusable_block_list fdb_get_reusable_block(fdb_kvs_handle *handle,
560                                            stale_header_info stale_header)
561 {
562     int64_t delta;
563     int r;
564     uint8_t keybuf[64];
565     uint32_t i;
566     uint32_t item_len;
567     uint32_t n_revnums, max_revnum_array = 256;
568     uint64_t item_pos;
569     btree_iterator bit;
570     btree_result br;
571     filemgr_header_revnum_t revnum_upto, prev_revnum = 0;
572     filemgr_header_revnum_t revnum = 0, _revnum;
573     filemgr_header_revnum_t *revnum_array;
574     bid_t offset, _offset, prev_offset;
575     bid_t prev_hdr = BLK_NOT_FOUND;
576     bool stale_tree_scan = true;
577     struct docio_object doc;
578     struct avl_tree *mergetree = &handle->file->mergetree;
579     struct avl_node *avl;
580     struct stale_data *item;
581     struct list_elem *e, *e_last;
582     struct kvs_stat stat;
583 
584     revnum_upto = stale_header.revnum;
585 
586     r = _kvs_stat_get(handle->file, 0, &stat);
587     handle->bhandle->nlivenodes = stat.nlivenodes;
588     handle->bhandle->ndeltanodes = stat.nlivenodes;
589     (void)r;
590 
591     revnum_array = (filemgr_header_revnum_t *)
592                    calloc(max_revnum_array, sizeof(filemgr_header_revnum_t));
593     n_revnums = 0;
594 
595     // remember the last stale list item to be preserved
596     e_last = list_end(handle->file->stale_list);
597 
598     avl = avl_first(&handle->file->stale_info_tree);
599     if (avl) {
600         // if in-memory stale info exists
601         void *uncomp_buf = NULL;
602         int r;
603         size_t uncomp_buflen = 128*1024; // 128 KB by default;
604         struct stale_info_commit *commit;
605         struct stale_info_entry *entry;
606 
607         stale_tree_scan = false;
608 
609         if (compress_inmem_stale_info) {
610             uncomp_buf = (void*)calloc(1, uncomp_buflen);
611             if (!uncomp_buf) {
612                 fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
613                     "(fdb_get_reusable_block) "
614                     "calloc of 'uncomp_buf' failed: "
615                     "database file '%s', "
616                     "uncomp_buflen %d\n",
617                     handle->file->filename,
618                     (int)uncomp_buflen);
619                 free(revnum_array);
620 
621                 reusable_block_list ret;
622                 ret.n_blocks = 0;
623                 ret.blocks = NULL;
624 
625                 return ret;
626             }
627         }
628 
629         while (avl) {
630             commit = _get_entry(avl, struct stale_info_commit, avl);
631             avl = avl_next(avl);
632 
633             prev_revnum = revnum;
634             revnum = commit->revnum;
635             if (revnum > revnum_upto) {
636                 revnum = prev_revnum;
637                 break;
638             }
639 
640             filemgr_header_revnum_t *new_revnum_array = revnum_array;
641             revnum_array[n_revnums++] = revnum;
642             if (n_revnums >= max_revnum_array) {
643                 max_revnum_array *= 2;
644                 new_revnum_array = (filemgr_header_revnum_t *)
645                                    realloc(revnum_array, max_revnum_array *
646                                            sizeof(filemgr_header_revnum_t));
647             }
648             if (!new_revnum_array) {
649                 // realloc() of revnum_array failed.
650                 fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
651                     "(fdb_get_reusable_block) "
652                     "realloc of 'revnum_array' failed: "
653                     "database file '%s', "
654                     "max_revnum_array %d\n",
655                     handle->file->filename,
656                     (int)max_revnum_array);
657                 free(uncomp_buf);
658                 free(revnum_array);
659 
660                 reusable_block_list ret;
661                 ret.n_blocks = 0;
662                 ret.blocks = NULL;
663 
664                 return ret;
665             }
666             revnum_array = new_revnum_array;
667 
668             avl_remove(&handle->file->stale_info_tree, &commit->avl);
669 
670             e = list_begin(&commit->doc_list);
671             while (e) {
672                 entry = _get_entry(e, struct stale_info_entry, le);
673                 e = list_next(&entry->le);
674                 list_remove(&commit->doc_list, &entry->le);
675 
676                 if (entry->ctx) {
677 #ifdef _DOC_COMP
678                     if (compress_inmem_stale_info) {
679                         // uncompression
680                         void* new_uncomp_buf = uncomp_buf;
681                         if (uncomp_buflen < entry->ctxlen) {
682                             uncomp_buflen = entry->ctxlen;
683                             new_uncomp_buf = (void*)realloc(uncomp_buf, uncomp_buflen);
684                         }
685 
686                         if (!new_uncomp_buf) {
687                             // realloc() of uncomp_buf failed.
688                             fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
689                                 "(fdb_get_reusable_block) "
690                                 "realloc of 'uncomp_buf' failed: "
691                                 "database file '%s', "
692                                 "uncomp_buflen %d, "
693                                 "entry->ctxlen %d\n",
694                                 handle->file->filename,
695                                 (int)uncomp_buflen,
696                                 (int)entry->ctxlen);
697                             free(uncomp_buf);
698                             free(revnum_array);
699 
700                             reusable_block_list ret;
701                             ret.n_blocks = 0;
702                             ret.blocks = NULL;
703 
704                             return ret;
705                         }
706                         uncomp_buf = new_uncomp_buf;
707 
708                         size_t len = uncomp_buflen;
709                         r = snappy_uncompress((char*)entry->ctx, entry->comp_ctxlen,
710                                               (char*)uncomp_buf, &len);
711                         if (r != 0) {
712                             fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
713                                 "(fdb_get_reusable_block) "
714                                 "Uncompression error from a database file '%s'"
715                                 ": return value %d, header revnum %" _F64 ", "
716                                 "doc offset %" _F64 "\n",
717                                 handle->file->filename, r, revnum, entry->offset);
718                             free(uncomp_buf);
719                             free(revnum_array);
720 
721                             reusable_block_list ret;
722                             ret.n_blocks = 0;
723                             ret.blocks = NULL;
724 
725                             return ret;
726                         }
727 
728                         // fetch the context
729                         _fetch_stale_info_doc(uncomp_buf, mergetree,
730                                               prev_offset, prev_hdr);
731                     } else {
732                         _fetch_stale_info_doc(entry->ctx, mergetree,
733                                               prev_offset, prev_hdr);
734                     }
735 #else
736                     _fetch_stale_info_doc(entry->ctx, mergetree, prev_offset, prev_hdr);
737 #endif
738                 }
739 
740                 // also insert/merge the system doc region
741                 struct stale_regions sr;
742 
743                 sr = filemgr_actual_stale_regions(handle->file, entry->offset,
744                                                   entry->doclen);
745 
746                 if (sr.n_regions > 1) {
747                     for (i=0; i<sr.n_regions; ++i){
748                         _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
749                     }
750                     free(sr.regions);
751                 } else {
752                     _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
753                 }
754 
755                 free(entry->ctx);
756                 free(entry);
757             }
758 
759             free(commit);
760         }
761         free(uncomp_buf);
762     }
763 
764     if (stale_tree_scan) {
765         // scan stale-block tree and get all stale regions
766         // corresponding to commit headers whose seq number is
767         // equal to or smaller than 'revnum_upto'
768         btree_iterator_init(handle->staletree, &bit, NULL);
769         do {
770             br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
771             btreeblk_end(handle->bhandle);
772             if (br != BTREE_RESULT_SUCCESS) {
773                 break;
774             }
775 
776             prev_revnum = revnum;
777             revnum = _endian_decode(_revnum);
778             if (revnum > revnum_upto) {
779                 revnum = prev_revnum;
780                 break;
781             }
782 
783             revnum_array[n_revnums++] = revnum;
784             if (n_revnums >= max_revnum_array) {
785                 max_revnum_array *= 2;
786                 revnum_array = (filemgr_header_revnum_t *)
787                                realloc(revnum_array, max_revnum_array *
788                                    sizeof(filemgr_header_revnum_t));
789             }
790             offset = _endian_decode(_offset);
791 
792             while (offset != BLK_NOT_FOUND) {
793                 memset(&doc, 0x0, sizeof(doc));
794                 // pre-allocated buffer for key
795                 doc.key = (void*)keybuf;
796 
797                 if (docio_read_doc(handle->dhandle, offset, &doc, true) <= 0) {
798                     // read fail .. escape
799                     offset = BLK_NOT_FOUND;
800                     continue;
801                 }
802 
803                 _fetch_stale_info_doc(doc.body, mergetree, prev_offset, prev_hdr);
804 
805                 // also insert/merge the system doc region
806                 size_t length = _fdb_get_docsize(doc.length);
807                 struct stale_regions sr;
808 
809                 sr = filemgr_actual_stale_regions(handle->file, offset, length);
810 
811                 if (sr.n_regions > 1) {
812                     for (i=0; i<sr.n_regions; ++i){
813                         _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
814                     }
815                     free(sr.regions);
816                 } else {
817                     _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
818                 }
819 
820                 // We don't need to free 'meta' as it will be NULL.
821                 free(doc.body);
822 
823                 offset = prev_offset;
824             }
825         } while (true);
826         btree_iterator_free(&bit);
827     }
828 
829     // remove merged commit headers
830     for (i=0; i<n_revnums; ++i) {
831         _revnum = _endian_encode(revnum_array[i]);
832         btree_remove(handle->staletree, (void*)&_revnum);
833         btreeblk_end(handle->bhandle);
834     }
835 
836     delta = handle->bhandle->nlivenodes - stat.nlivenodes;
837     _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
838     delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
839     delta *= handle->config.blocksize;
840     _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
841 
842     // gather stale blocks generated by removing b+tree entries
843     if (e_last) {
844         e = list_next(e_last);
845     } else {
846         e = list_begin(handle->file->stale_list);
847     }
848     while (e) {
849         item = _get_entry(e, struct stale_data, le);
850         e = list_remove(handle->file->stale_list, e);
851 
852         _insert_n_merge(mergetree, item->pos, item->len);
853         free(item);
854     }
855 
856     // now merge stale regions as large as possible
857     size_t n_blocks =0 ;
858     size_t blocksize = handle->file->blocksize;
859     uint32_t max_blocks = 256;
860     uint32_t front_margin;
861     reusable_block_list ret;
862     struct reusable_block *blocks_arr;
863 
864     blocks_arr = (struct reusable_block*)
865         calloc(max_blocks, sizeof(struct reusable_block));
866 
867     avl = avl_first(mergetree);
868     while (avl) {
869         item = _get_entry(avl, struct stale_data, avl);
870         avl = avl_next(avl);
871 
872         // A stale region can be represented as follows:
873         //
874         //  block x-1 |   block x  |  block x+1 |  block x+2
875         //  -----+----+------------+------------+--------+----
876         //   ... |  A |      B     |      C     |    D   | ...
877         //  -----+----+------------+------------+--------+----
878         //
879         // Only segment 'B' and 'C' can be reusable, and the other segments
880         // (i.e., 'A' and 'D') should be re-inserted into stale-block tree.
881 
882         if (item->len < blocksize) {
883             // whole region is smaller than a block .. skip this item
884             //       |    block    |  ...
885             //  -----+---+-----+---+-------
886             //   ... |   |/////|   |  ...
887             //  -----+---+-----+---+-------
888             continue;
889         }
890 
891         //            <------------ item_len ------------>
892         //  block x-1 |   block x  |  block x+1 |  block x+2
893         //  -----+----+------------+------------+--------+----
894         //   ... |  A |      B     |      C     |    D   | ...
895         //  -----+----+------------+------------+--------+----
896         //            ^
897         //            item_pos
898         if (item->pos % blocksize) {
899             front_margin = blocksize - item->pos % blocksize;
900         } else {
901             front_margin = 0;
902         }
903         item_pos = item->pos + front_margin;
904         item_len = item->len - front_margin;
905 
906         if (item_len < blocksize) {
907             // Remaining length is smaller than a block. This means that there
908             // is no reusable block in this region (even though the region size is
909             // bigger than a block size) .. skip this item.
910             //
911             //       |   block x   |  block x+1  | ...
912             //  -----+------+------+-------+-----+----
913             //   ... |      |//////|///////|     | ...
914             //  -----+------+------+-------+-----+----
915             continue;
916         }
917 
918         // calculate # blocks and add to 'blocks'
919         blocks_arr[n_blocks].bid = item_pos / blocksize;
920         blocks_arr[n_blocks].count = item_len / blocksize;
921         n_blocks += 1;
922         if (n_blocks >= max_blocks) {
923             max_blocks *= 2;
924             blocks_arr = (struct reusable_block*)
925                 realloc(blocks_arr, max_blocks * sizeof(struct reusable_block));
926         }
927 
928         if (front_margin) {
929             // adjust the existing item to indicate 'A' in above example
930             item->len = front_margin;
931         } else {
932             // exactly aligned .. remove this item
933             avl_remove(mergetree, &item->avl);
934             free(item);
935         }
936 
937         uint32_t remaining_len;
938         remaining_len = item_len % blocksize;
939         if (remaining_len) {
940             // add a new item for the remaining region ('D' in above example)
941             struct stale_data *new_item;
942             new_item = (struct stale_data *)
943                        calloc(1, sizeof(struct stale_data));
944             new_item->pos = (blocks_arr[n_blocks-1].bid + blocks_arr[n_blocks-1].count)
945                             * blocksize;
946             new_item->len = remaining_len;
947             avl_insert(mergetree, &new_item->avl, _reusable_offset_cmp);
948             avl = avl_next(&new_item->avl);
949         }
950     }
951 
952     // re-write stale tree using the last revnum as a key
953     // in this case, only stale regions newly generated by this function are gathered,
954     // and prev_hdr is set to BLK_NOT_FOUND, as corresponding seq numbers are
955     // already removed.
956 
957     // however, do not remove the remaining items in the merge-tree and continue to
958     // merge them in the next block reclaim.
959     fdb_gather_stale_blocks(handle, revnum, BLK_NOT_FOUND, BLK_NOT_FOUND,
960                             0, e_last, true);
961 
962     free(revnum_array);
963 
964     ret.n_blocks = n_blocks;
965     ret.blocks = blocks_arr;
966     return ret;
967 }
968 
/**
 * Discard stale-block bookkeeping recorded from the handle's rollback point on.
 *
 * Does nothing when handle->rollback_revnum is 0. Otherwise it makes two passes:
 *  1. deletes the on-disk stale-tree (handle->staletree) keys for every
 *     revision in [handle->rollback_revnum, cur_revnum), closing each b-tree
 *     operation with btreeblk_end();
 *  2. unlinks and frees every in-memory stale_info_commit in
 *     handle->file->stale_info_tree, together with each stale_info_entry on
 *     its doc_list and that entry's ctx buffer. This pass starts at
 *     rollback_revnum, or at the node avl_search_greater() returns if that
 *     exact revision is absent, and walks to the end of the tree.
 *
 * NOTE(review): unlike pass 1, pass 2 has no upper bound at cur_revnum;
 * confirm this asymmetry is intended.
 *
 * @param handle      handle supplying the on-disk stale tree, the b-tree block
 *                    handle, and the file holding the in-memory stale tree.
 * @param cur_revnum  exclusive upper bound for the on-disk removal pass.
 */
void fdb_rollback_stale_blocks(fdb_kvs_handle *handle,
                               filemgr_header_revnum_t cur_revnum)
{
    filemgr_header_revnum_t rev;
    struct stale_info_commit key;
    struct avl_node *cursor;

    // A zero rollback revision means there is nothing to undo.
    if (handle->rollback_revnum == 0) {
        return;
    }

    // Pass 1: drop the on-disk stale-tree entries of the discarded revisions.
    for (rev = handle->rollback_revnum; rev < cur_revnum; ++rev) {
        filemgr_header_revnum_t encoded_rev = _endian_encode(rev);
        // Outcome is deliberately ignored. Presumably some revisions in the
        // range have no entry to remove.
        (void)btree_remove(handle->staletree, (void*)&encoded_rev);
        btreeblk_end(handle->bhandle);
    }

    // Pass 2: find the first in-memory commit at (or after) the rollback point...
    key.revnum = handle->rollback_revnum;
    cursor = avl_search(&handle->file->stale_info_tree, &key.avl,
                        _inmem_stale_cmp);
    if (cursor == NULL) {
        cursor = avl_search_greater(&handle->file->stale_info_tree, &key.avl,
                                    _inmem_stale_cmp);
    }

    // ...then release it and every commit that follows it in the tree.
    while (cursor != NULL) {
        struct stale_info_commit *victim =
            _get_entry(cursor, struct stale_info_commit, avl);
        struct list_elem *node;

        // Step the iterator forward before unlinking the current node.
        cursor = avl_next(cursor);
        avl_remove(&handle->file->stale_info_tree, &victim->avl);

        // Drain the commit's document list, freeing each entry and its context.
        for (node = list_begin(&victim->doc_list); node != NULL; ) {
            struct stale_info_entry *doc =
                _get_entry(node, struct stale_info_entry, le);
            node = list_remove(&victim->doc_list, &doc->le);
            free(doc->ctx);
            free(doc);
        }

        free(victim);
    }
}
1018 
1019