xref: /6.0.3/forestdb/src/staleblock.cc (revision 74dd9214)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <string.h>
20#include <stdint.h>
21#include <stdlib.h>
22
23#include "staleblock.h"
24#include "btreeblock.h"
25#include "list.h"
26#include "docio.h"
27#include "fdb_internal.h"
28#include "version.h"
29#include "time_utils.h"
30
31#ifdef _DOC_COMP
32#include "snappy-c.h"
33#endif
34
35#include "memleak.h"
36
// When true (and the build defines _DOC_COMP), bodies of stale-info system
// docs kept in the in-memory stale info tree are stored snappy-compressed and
// uncompressed again when they are consumed.
static bool compress_inmem_stale_info = true;
38
39static int _inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)
40{
41    struct stale_info_commit *aa, *bb;
42    aa = _get_entry(a, struct stale_info_commit, avl);
43    bb = _get_entry(b, struct stale_info_commit, avl);
44
45    if (aa->revnum < bb->revnum) {
46        return -1;
47    } else if (aa->revnum > bb->revnum) {
48        return 1;
49    } else {
50        return 0;
51    }
52}
53
54static void fdb_add_inmem_stale_info(fdb_kvs_handle *handle,
55                                     filemgr_header_revnum_t revnum,
56                                     struct docio_object *doc,
57                                     uint64_t doc_offset,
58                                     bool system_doc_only)
59{
60    int ret;
61    size_t buflen = 0;
62    struct filemgr *file = handle->file;
63    struct avl_node *a;
64    struct stale_info_commit *item, query;
65    struct stale_info_entry *entry;
66
67    // search using revnum first
68    query.revnum = revnum;
69    a = avl_search(&file->stale_info_tree, &query.avl, _inmem_stale_cmp);
70    if (a) {
71        // already exist
72        item = _get_entry(a, struct stale_info_commit, avl);
73    } else {
74        // not exist .. create a new one and insert into tree
75        item = (struct stale_info_commit *)calloc(1, sizeof(struct stale_info_commit));
76        item->revnum = revnum;
77        list_init(&item->doc_list);
78        avl_insert(&file->stale_info_tree, &item->avl, _inmem_stale_cmp);
79        file->stale_info_tree_loaded.store(true, std::memory_order_relaxed);
80    }
81
82    entry = (struct stale_info_entry *)calloc(1, sizeof(struct stale_info_entry));
83
84    if (!system_doc_only) {
85#ifdef _DOC_COMP
86        if (compress_inmem_stale_info) {
87            buflen = snappy_max_compressed_length(doc->length.bodylen);;
88            entry->ctx = (void *)calloc(1, buflen);
89            ret = snappy_compress((char*)doc->body, doc->length.bodylen,
90                (char*)entry->ctx, &buflen);
91            if (ret != 0) {
92                fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
93                    "(fdb_add_inmem_stale_info) "
94                    "Compression error from a database file '%s'"
95                    ": return value %d, header revnum %" _F64 ", "
96                    "doc offset %" _F64 "\n",
97                    file->filename, ret, revnum, doc_offset);
98                if (!a) {
99                    // 'item' is allocated in this function call.
100                    avl_remove(&file->stale_info_tree, &item->avl);
101                    free(item);
102                }
103                free(entry);
104                return;
105            }
106        } else {
107            buflen = doc->length.bodylen;
108            entry->ctx = (void *)calloc(1, buflen);
109            memcpy(entry->ctx, doc->body, doc->length.bodylen);
110        }
111#else
112        buflen = doc->length.bodylen;
113        entry->ctx = (void *)calloc(1, buflen);
114        memcpy(entry->ctx, doc->body, doc->length.bodylen);
115#endif
116
117        entry->ctxlen = doc->length.bodylen;
118
119    } else {
120        // when 'system_doc_only' flag is set, just set to NULL.
121        // we need the doc's length and offset info only.
122        entry->ctx = NULL;
123        entry->ctxlen = 0;
124    }
125
126    entry->comp_ctxlen = buflen;
127    entry->doclen = _fdb_get_docsize(doc->length);
128    entry->offset = doc_offset;
129    list_push_back(&item->doc_list, &entry->le);
130}
131
132void fdb_load_inmem_stale_info(fdb_kvs_handle *handle)
133{
134    uint8_t keybuf[64];
135    int64_t ret;
136    bid_t offset, _offset, prev_offset;
137    filemgr_header_revnum_t revnum, _revnum;
138    btree_iterator bit;
139    btree_result br;
140    struct filemgr *file = handle->file;
141    struct docio_object doc;
142    bool expected = false;
143
144    if (!std::atomic_compare_exchange_strong(&file->stale_info_tree_loaded,
145                                             &expected, true)) {
146        // stale info is already loaded (fast screening without mutex)
147        return;
148    }
149
150    // first open of the DB file
151    // should grab mutex to avoid race with other writer
152    filemgr_mutex_lock(file);
153
154    btree_iterator_init(handle->staletree, &bit, NULL);
155    do {
156        br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
157        btreeblk_end(handle->bhandle);
158        if (br != BTREE_RESULT_SUCCESS) {
159            break;
160        }
161
162        revnum = _endian_decode(_revnum);
163        offset = _endian_decode(_offset);
164
165        while (offset != BLK_NOT_FOUND) {
166            memset(&doc, 0x0, sizeof(doc));
167            // pre-allocated buffer for key
168            doc.key = (void*)keybuf;
169
170            ret = docio_read_doc(handle->dhandle, offset, &doc, true);
171            if (ret <= 0) {
172                // read fail .. escape
173                fdb_log(NULL, (fdb_status)ret,
174                    "Error in reading a stale region info document "
175                    "from a database file '%s'"
176                    ": revnum %" _F64 ", offset %" _F64 "\n",
177                    file->filename, revnum, offset);
178                offset = BLK_NOT_FOUND;
179                continue;
180            }
181
182            fdb_add_inmem_stale_info(handle, revnum, &doc, offset, false);
183
184            // fetch previous doc offset
185            memcpy(&_offset, doc.body, sizeof(_offset));
186            prev_offset = _endian_decode(_offset);
187
188            // We don't need to free 'meta' as it will be NULL.
189            free(doc.body);
190
191            offset = prev_offset;
192        }
193    } while (true);
194    btree_iterator_free(&bit);
195
196    filemgr_mutex_unlock(file);
197}
198
199void fdb_gather_stale_blocks(fdb_kvs_handle *handle,
200                             filemgr_header_revnum_t revnum,
201                             bid_t prev_hdr,
202                             uint64_t kv_info_offset,
203                             fdb_seqnum_t seqnum,
204                             struct list_elem *e_last,
205                             bool from_mergetree)
206{
207    int64_t delta;
208    int r;
209    uint32_t count = 0;
210    uint32_t offset = 0, count_location;
211    uint32_t bufsize = 8192;
212    uint32_t _count, _len;
213    uint64_t _pos, _kv_info_offset;
214    uint8_t *buf = NULL;
215    bid_t doc_offset, _doc_offset;
216    bid_t _prev_hdr;
217    bool gather_staleblocks = true;
218    bool first_loop = true;
219    filemgr_header_revnum_t _revnum;
220    fdb_seqnum_t _seqnum;
221    struct kvs_stat stat;
222
223    /*
224     * << stale block system doc structure >>
225     * [previous doc offset]: 8 bytes (0xffff.. if not exist)
226     * [previous header BID]: 8 bytes (0xffff.. if not exist)
227     * [KVS info doc offset]: 8 bytes (0xffff.. if not exist)
228     * [Default KVS seqnum]:  8 bytes
229     * [# items]:             4 bytes
230     * ---
231     * [position]:            8 bytes
232     * [length]:              4 bytes
233     * ...
234     */
235
236    if (filemgr_get_stale_list(handle->file)) {
237        struct avl_node *a;
238        struct list_elem *e;
239        struct stale_data *item;
240
241        r = _kvs_stat_get(handle->file, 0, &stat);
242        handle->bhandle->nlivenodes = stat.nlivenodes;
243        handle->bhandle->ndeltanodes = stat.nlivenodes;
244        (void)r;
245
246        buf = (uint8_t *)calloc(1, bufsize);
247        _revnum = _endian_encode(revnum);
248
249        // initial previous doc offset
250        memset(buf, 0xff, sizeof(bid_t));
251        count_location = sizeof(bid_t);
252
253        // previous header BID
254        if (prev_hdr == 0 || prev_hdr == BLK_NOT_FOUND) {
255            // does not exist
256            memset(&_prev_hdr, 0xff, sizeof(_prev_hdr));
257        } else {
258            _prev_hdr = _endian_encode(prev_hdr);
259        }
260        memcpy(buf + sizeof(bid_t), &_prev_hdr, sizeof(bid_t));
261        count_location += sizeof(bid_t);
262
263        // KVS info doc offset
264        _kv_info_offset = _endian_encode(kv_info_offset);
265        memcpy(buf + count_location, &_kv_info_offset, sizeof(uint64_t));
266        count_location += sizeof(uint64_t);
267
268        // default KVS seqnum
269        _seqnum = _endian_encode(seqnum);
270        memcpy(buf + count_location, &_seqnum, sizeof(fdb_seqnum_t));
271        count_location += sizeof(fdb_seqnum_t);
272        count_location += sizeof(count);
273
274        while(gather_staleblocks) {
275            // reserve space for
276            // prev offset (8), prev header (8), kv_info_offset (8),
277            // seqnum (8), count (4)
278            offset = count_location;
279
280            if (first_loop && from_mergetree) {
281                // gather from mergetree
282                a = avl_first(&handle->file->mergetree);
283                while (a) {
284                    item = _get_entry(a, struct stale_data, avl);
285
286                    if (handle->staletree) {
287                        count++;
288
289                        _pos = _endian_encode(item->pos);
290                        _len = _endian_encode(item->len);
291
292                        memcpy(buf + offset, &_pos, sizeof(_pos));
293                        offset += sizeof(_pos);
294                        memcpy(buf + offset, &_len, sizeof(_len));
295                        offset += sizeof(_len);
296
297                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
298                            bufsize *= 2;
299                            buf = (uint8_t*)realloc(buf, bufsize);
300                        }
301                    }
302
303
304                    // If 'from_mergetree' flag is set, it means that this
305                    // function is called at the end of fdb_get_reusable_block(),
306                    // and those items are remaining (non-reusable) regions after
307                    // picking up reusable blocks from 'mergetree'.
308
309                    // In the previous implementation, those items are converted
310                    // and stored as a system document. The document is re-read in
311                    // the next block reclaim, and then we reconstruct 'mergetree'
312                    // from the document; this is unnecessary duplicated overhead.
313
314                    // As an optimization, we can simply keep those items in
315                    // 'mergetree' and use them in the next block reclaim, without
316                    // reading the corresponding system document; this also reduces
317                    // the commit latency much. Instead, to minimize memory
318                    // consumption, we don't need to maintain in-memory copy of the
319                    // system doc corresponding to the remaining items in the
320                    // 'mergetree', that will be created below.
321
322                    // do not remove the item
323                    a = avl_next(&item->avl);
324                }
325            } else {
326                // gater from stale_list
327                if (e_last) {
328                    e = list_next(e_last);
329                } else {
330                    e = list_begin(handle->file->stale_list);
331                }
332                while (e) {
333                    item = _get_entry(e, struct stale_data, le);
334
335                    if (handle->staletree) {
336                        count++;
337
338                        _pos = _endian_encode(item->pos);
339                        _len = _endian_encode(item->len);
340
341                        memcpy(buf + offset, &_pos, sizeof(_pos));
342                        offset += sizeof(_pos);
343                        memcpy(buf + offset, &_len, sizeof(_len));
344                        offset += sizeof(_len);
345
346                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
347                            bufsize *= 2;
348                            buf = (uint8_t*)realloc(buf, bufsize);
349                        }
350                    }
351
352                    e = list_remove(handle->file->stale_list, e);
353                    free(item);
354                }
355            }
356
357            gather_staleblocks = false;
358            if (count) {
359                char *doc_key = alca(char, 32);
360                struct docio_object doc;
361
362                // store count
363                _count = _endian_encode(count);
364                memcpy(buf + count_location - sizeof(_count), &_count, sizeof(_count));
365
366                // append a system doc
367                memset(&doc, 0x0, sizeof(doc));
368                // add one to 'revnum' to get the next revision number
369                // (note that filemgr_mutex() is grabbed so that no other thread
370                //  will change the 'revnum').
371                sprintf(doc_key, "stale_blocks_%" _F64, revnum);
372                doc.key = (void*)doc_key;
373                doc.body = buf;
374                doc.length.keylen = strlen(doc_key) + 1;
375                doc.length.metalen = 0;
376                doc.length.bodylen = offset;
377                doc.seqnum = 0;
378                doc_offset = docio_append_doc_system(handle->dhandle, &doc);
379
380                // insert into stale-block tree
381                _doc_offset = _endian_encode(doc_offset);
382                btree_insert(handle->staletree, (void *)&_revnum, (void *)&_doc_offset);
383                btreeblk_end(handle->bhandle);
384                btreeblk_reset_subblock_info(handle->bhandle);
385
386                if (from_mergetree && first_loop) {
387                    // if from_mergetree flag is set and this is the first loop,
388                    // stale regions in this document are already in mergetree
389                    // so skip adding them into in-memory stale info tree.
390
391                    // however, the system doc itself should be marked as stale
392                    // when the doc is reclaimed, thus we instead add a dummy entry
393                    // that containing doc offset, length info only.
394                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, true);
395                } else {
396                    // add the doc into in-memory stale info tree
397                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, false);
398                }
399
400                if (list_begin(filemgr_get_stale_list(handle->file))) {
401                    // updating stale tree brings another stale blocks.
402                    // recursively update until there is no more stale block.
403
404                    // note that infinite loop will not occur because
405                    // 1) all updated index blocks for stale tree are still writable
406                    // 2) incoming keys for stale tree (revnum) are monotonic
407                    //    increasing order; most recently allocated node will be
408                    //    updated again.
409
410                    count = 0;
411                    // save previous doc offset
412                    memcpy(buf, &_doc_offset, sizeof(_doc_offset));
413
414                    // gather once again
415                    gather_staleblocks = true;
416                }
417            }
418
419            first_loop = false;
420        } // gather stale blocks
421
422        delta = handle->bhandle->nlivenodes - stat.nlivenodes;
423        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
424        delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
425        delta *= handle->config.blocksize;
426        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
427
428        free(buf);
429    } else {
430        btreeblk_reset_subblock_info(handle->bhandle);
431    }
432}
433
434static int _reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)
435{
436    struct stale_data *aa, *bb;
437    aa = _get_entry(a, struct stale_data, avl);
438    bb = _get_entry(b, struct stale_data, avl);
439    return _CMP_U64(aa->pos, bb->pos);
440}
441
442static void _insert_n_merge(struct avl_tree *tree,
443                            uint64_t item_pos,
444                            uint32_t item_len)
445{
446    struct stale_data query, *item;
447    struct avl_node *avl;
448
449    // retrieve the tree first
450    query.pos = item_pos;
451    avl = avl_search(tree, &query.avl, _reusable_offset_cmp);
452    if (avl) {
453        // same offset already exists
454        item = _get_entry(avl, struct stale_data, avl);
455        // choose longer length
456        if (item->len < item_len) {
457            item->len = item_len;
458        }
459    } else {
460        // does not exist .. create a new item
461        item = (struct stale_data*)
462               calloc(1, sizeof(struct stale_data));
463        item->pos = item_pos;
464        item->len = item_len;
465        avl_insert(tree, &item->avl, _reusable_offset_cmp);
466    }
467
468    // check prev/next item to see if they can be merged
469    struct avl_node *p_avl, *n_avl;
470    struct stale_data*p_item, *n_item;
471    p_avl = avl_prev(&item->avl);
472    if (p_avl) {
473        p_item = _get_entry(p_avl, struct stale_data, avl);
474        if (p_item->pos + p_item->len >= item->pos) {
475
476            if (p_item->pos + p_item->len >= item->pos + item->len) {
477                // 'item' is included in p_item .. simply remove it
478                // (do nothing)
479            } else {
480                // overlapping (or consecutive) .. merge two items
481                p_item->len += item->len +
482                               (item->pos - p_item->pos - p_item->len);
483            }
484            // remove current item
485            avl_remove(tree, &item->avl);
486            free(item);
487            item = p_item;
488        }
489    }
490
491    n_avl = avl_next(&item->avl);
492    if (n_avl) {
493        n_item = _get_entry(n_avl, struct stale_data, avl);
494        if (item->pos + item->len >= n_item->pos) {
495
496            if (item->pos + item->len >= n_item->pos + n_item->len) {
497                // 'n_item' is included in 'item' .. simply remove it
498                // (do nothing)
499            } else {
500                // overlapping (or consecutive) .. merge two items
501                item->len += n_item->len +
502                             (n_item->pos - item->pos - item->len);
503            }
504            // remove next item
505            avl_remove(tree, &n_item->avl);
506            free(n_item);
507        }
508    }
509}
510
511// Parse & fetch stale regions from the buffer 'ctx', which is the body of
512// a stale info system document (from either in-memory stale-block-tree or
513// on-disk stale-block-tree). After fetching, insert those regions
514// into 'mergetree'.
515static void _fetch_stale_info_doc(void *ctx,
516                                  struct avl_tree *mergetree,
517                                  uint64_t &prev_offset_out,
518                                  uint64_t &prev_hdr_out)
519{
520    uint32_t i, count, _count, item_len;
521    uint64_t pos;
522    uint64_t item_pos;
523
524    pos = 0;
525
526    // get previous doc offset
527    memcpy(&prev_offset_out, ctx, sizeof(prev_offset_out));
528    prev_offset_out = _endian_decode(prev_offset_out);
529    pos += sizeof(prev_offset_out);
530
531    // get previous header BID
532    memcpy(&prev_hdr_out, (uint8_t*)ctx + pos, sizeof(prev_hdr_out));
533    prev_hdr_out = _endian_decode(prev_hdr_out);
534    (void)prev_hdr_out;
535    pos += sizeof(prev_hdr_out);
536
537    // Skip kv_info_offset and default KVS's seqnum
538    pos += sizeof(uint64_t) + sizeof(fdb_seqnum_t);
539
540    // get count;
541    memcpy(&_count, (uint8_t*)ctx + pos, sizeof(_count));
542    count = _endian_decode(_count);
543    pos += sizeof(_count);
544
545    // get a stale region and insert/merge into tree
546    for (i=0;i<count;++i) {
547        memcpy(&item_pos, (uint8_t*)ctx + pos, sizeof(item_pos));
548        item_pos = _endian_decode(item_pos);
549        pos += sizeof(item_pos);
550
551        memcpy(&item_len, (uint8_t*)ctx + pos, sizeof(item_len));
552        item_len = _endian_decode(item_len);
553        pos += sizeof(item_len);
554
555        _insert_n_merge(mergetree, item_pos, item_len);
556    }
557}
558
559reusable_block_list fdb_get_reusable_block(fdb_kvs_handle *handle,
560                                           stale_header_info stale_header)
561{
562    int64_t delta;
563    int r;
564    uint8_t keybuf[64];
565    uint32_t i;
566    uint32_t item_len;
567    uint32_t n_revnums, max_revnum_array = 256;
568    uint64_t item_pos;
569    btree_iterator bit;
570    btree_result br;
571    filemgr_header_revnum_t revnum_upto, prev_revnum = 0;
572    filemgr_header_revnum_t revnum = 0, _revnum;
573    filemgr_header_revnum_t *revnum_array;
574    bid_t offset, _offset, prev_offset;
575    bid_t prev_hdr = BLK_NOT_FOUND;
576    bool stale_tree_scan = true;
577    struct docio_object doc;
578    struct avl_tree *mergetree = &handle->file->mergetree;
579    struct avl_node *avl;
580    struct stale_data *item;
581    struct list_elem *e, *e_last;
582    struct kvs_stat stat;
583
584    revnum_upto = stale_header.revnum;
585
586    r = _kvs_stat_get(handle->file, 0, &stat);
587    handle->bhandle->nlivenodes = stat.nlivenodes;
588    handle->bhandle->ndeltanodes = stat.nlivenodes;
589    (void)r;
590
591    revnum_array = (filemgr_header_revnum_t *)
592                   calloc(max_revnum_array, sizeof(filemgr_header_revnum_t));
593    n_revnums = 0;
594
595    // remember the last stale list item to be preserved
596    e_last = list_end(handle->file->stale_list);
597
598    avl = avl_first(&handle->file->stale_info_tree);
599    if (avl) {
600        // if in-memory stale info exists
601        void *uncomp_buf = NULL;
602        int r;
603        size_t uncomp_buflen = 128*1024; // 128 KB by default;
604        struct stale_info_commit *commit;
605        struct stale_info_entry *entry;
606
607        stale_tree_scan = false;
608
609        if (compress_inmem_stale_info) {
610            uncomp_buf = (void*)calloc(1, uncomp_buflen);
611            if (!uncomp_buf) {
612                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
613                    "(fdb_get_reusable_block) "
614                    "calloc of 'uncomp_buf' failed: "
615                    "database file '%s', "
616                    "uncomp_buflen %d\n",
617                    handle->file->filename,
618                    (int)uncomp_buflen);
619                free(revnum_array);
620
621                reusable_block_list ret;
622                ret.n_blocks = 0;
623                ret.blocks = NULL;
624
625                return ret;
626            }
627        }
628
629        while (avl) {
630            commit = _get_entry(avl, struct stale_info_commit, avl);
631            avl = avl_next(avl);
632
633            prev_revnum = revnum;
634            revnum = commit->revnum;
635            if (revnum > revnum_upto) {
636                revnum = prev_revnum;
637                break;
638            }
639
640            filemgr_header_revnum_t *new_revnum_array = revnum_array;
641            revnum_array[n_revnums++] = revnum;
642            if (n_revnums >= max_revnum_array) {
643                max_revnum_array *= 2;
644                new_revnum_array = (filemgr_header_revnum_t *)
645                                   realloc(revnum_array, max_revnum_array *
646                                           sizeof(filemgr_header_revnum_t));
647            }
648            if (!new_revnum_array) {
649                // realloc() of revnum_array failed.
650                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
651                    "(fdb_get_reusable_block) "
652                    "realloc of 'revnum_array' failed: "
653                    "database file '%s', "
654                    "max_revnum_array %d\n",
655                    handle->file->filename,
656                    (int)max_revnum_array);
657                free(uncomp_buf);
658                free(revnum_array);
659
660                reusable_block_list ret;
661                ret.n_blocks = 0;
662                ret.blocks = NULL;
663
664                return ret;
665            }
666            revnum_array = new_revnum_array;
667
668            avl_remove(&handle->file->stale_info_tree, &commit->avl);
669
670            e = list_begin(&commit->doc_list);
671            while (e) {
672                entry = _get_entry(e, struct stale_info_entry, le);
673                e = list_next(&entry->le);
674                list_remove(&commit->doc_list, &entry->le);
675
676                if (entry->ctx) {
677#ifdef _DOC_COMP
678                    if (compress_inmem_stale_info) {
679                        // uncompression
680                        void* new_uncomp_buf = uncomp_buf;
681                        if (uncomp_buflen < entry->ctxlen) {
682                            uncomp_buflen = entry->ctxlen;
683                            new_uncomp_buf = (void*)realloc(uncomp_buf, uncomp_buflen);
684                        }
685
686                        if (!new_uncomp_buf) {
687                            // realloc() of uncomp_buf failed.
688                            fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
689                                "(fdb_get_reusable_block) "
690                                "realloc of 'uncomp_buf' failed: "
691                                "database file '%s', "
692                                "uncomp_buflen %d, "
693                                "entry->ctxlen %d\n",
694                                handle->file->filename,
695                                (int)uncomp_buflen,
696                                (int)entry->ctxlen);
697                            free(uncomp_buf);
698                            free(revnum_array);
699
700                            reusable_block_list ret;
701                            ret.n_blocks = 0;
702                            ret.blocks = NULL;
703
704                            return ret;
705                        }
706                        uncomp_buf = new_uncomp_buf;
707
708                        size_t len = uncomp_buflen;
709                        r = snappy_uncompress((char*)entry->ctx, entry->comp_ctxlen,
710                                              (char*)uncomp_buf, &len);
711                        if (r != 0) {
712                            fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
713                                "(fdb_get_reusable_block) "
714                                "Uncompression error from a database file '%s'"
715                                ": return value %d, header revnum %" _F64 ", "
716                                "doc offset %" _F64 "\n",
717                                handle->file->filename, r, revnum, entry->offset);
718                            free(uncomp_buf);
719                            free(revnum_array);
720
721                            reusable_block_list ret;
722                            ret.n_blocks = 0;
723                            ret.blocks = NULL;
724
725                            return ret;
726                        }
727
728                        // fetch the context
729                        _fetch_stale_info_doc(uncomp_buf, mergetree,
730                                              prev_offset, prev_hdr);
731                    } else {
732                        _fetch_stale_info_doc(entry->ctx, mergetree,
733                                              prev_offset, prev_hdr);
734                    }
735#else
736                    _fetch_stale_info_doc(entry->ctx, mergetree, prev_offset, prev_hdr);
737#endif
738                }
739
740                // also insert/merge the system doc region
741                struct stale_regions sr;
742
743                sr = filemgr_actual_stale_regions(handle->file, entry->offset,
744                                                  entry->doclen);
745
746                if (sr.n_regions > 1) {
747                    for (i=0; i<sr.n_regions; ++i){
748                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
749                    }
750                    free(sr.regions);
751                } else {
752                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
753                }
754
755                free(entry->ctx);
756                free(entry);
757            }
758
759            free(commit);
760        }
761        free(uncomp_buf);
762    }
763
764    if (stale_tree_scan) {
765        // scan stale-block tree and get all stale regions
766        // corresponding to commit headers whose seq number is
767        // equal to or smaller than 'revnum_upto'
768        btree_iterator_init(handle->staletree, &bit, NULL);
769        do {
770            br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
771            btreeblk_end(handle->bhandle);
772            if (br != BTREE_RESULT_SUCCESS) {
773                break;
774            }
775
776            prev_revnum = revnum;
777            revnum = _endian_decode(_revnum);
778            if (revnum > revnum_upto) {
779                revnum = prev_revnum;
780                break;
781            }
782
783            revnum_array[n_revnums++] = revnum;
784            if (n_revnums >= max_revnum_array) {
785                max_revnum_array *= 2;
786                revnum_array = (filemgr_header_revnum_t *)
787                               realloc(revnum_array, max_revnum_array *
788                                   sizeof(filemgr_header_revnum_t));
789            }
790            offset = _endian_decode(_offset);
791
792            while (offset != BLK_NOT_FOUND) {
793                memset(&doc, 0x0, sizeof(doc));
794                // pre-allocated buffer for key
795                doc.key = (void*)keybuf;
796
797                if (docio_read_doc(handle->dhandle, offset, &doc, true) <= 0) {
798                    // read fail .. escape
799                    offset = BLK_NOT_FOUND;
800                    continue;
801                }
802
803                _fetch_stale_info_doc(doc.body, mergetree, prev_offset, prev_hdr);
804
805                // also insert/merge the system doc region
806                size_t length = _fdb_get_docsize(doc.length);
807                struct stale_regions sr;
808
809                sr = filemgr_actual_stale_regions(handle->file, offset, length);
810
811                if (sr.n_regions > 1) {
812                    for (i=0; i<sr.n_regions; ++i){
813                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
814                    }
815                    free(sr.regions);
816                } else {
817                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
818                }
819
820                // We don't need to free 'meta' as it will be NULL.
821                free(doc.body);
822
823                offset = prev_offset;
824            }
825        } while (true);
826        btree_iterator_free(&bit);
827    }
828
829    // remove merged commit headers
830    for (i=0; i<n_revnums; ++i) {
831        _revnum = _endian_encode(revnum_array[i]);
832        btree_remove(handle->staletree, (void*)&_revnum);
833        btreeblk_end(handle->bhandle);
834    }
835
836    delta = handle->bhandle->nlivenodes - stat.nlivenodes;
837    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
838    delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
839    delta *= handle->config.blocksize;
840    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
841
842    // gather stale blocks generated by removing b+tree entries
843    if (e_last) {
844        e = list_next(e_last);
845    } else {
846        e = list_begin(handle->file->stale_list);
847    }
848    while (e) {
849        item = _get_entry(e, struct stale_data, le);
850        e = list_remove(handle->file->stale_list, e);
851
852        _insert_n_merge(mergetree, item->pos, item->len);
853        free(item);
854    }
855
856    // now merge stale regions as large as possible
857    size_t n_blocks =0 ;
858    size_t blocksize = handle->file->blocksize;
859    uint32_t max_blocks = 256;
860    uint32_t front_margin;
861    reusable_block_list ret;
862    struct reusable_block *blocks_arr;
863
864    blocks_arr = (struct reusable_block*)
865        calloc(max_blocks, sizeof(struct reusable_block));
866
867    avl = avl_first(mergetree);
868    while (avl) {
869        item = _get_entry(avl, struct stale_data, avl);
870        avl = avl_next(avl);
871
872        // A stale region can be represented as follows:
873        //
874        //  block x-1 |   block x  |  block x+1 |  block x+2
875        //  -----+----+------------+------------+--------+----
876        //   ... |  A |      B     |      C     |    D   | ...
877        //  -----+----+------------+------------+--------+----
878        //
879        // Only segment 'B' and 'C' can be reusable, and the other segments
880        // (i.e., 'A' and 'D') should be re-inserted into stale-block tree.
881
882        if (item->len < blocksize) {
883            // whole region is smaller than a block .. skip this item
884            //       |    block    |  ...
885            //  -----+---+-----+---+-------
886            //   ... |   |/////|   |  ...
887            //  -----+---+-----+---+-------
888            continue;
889        }
890
891        //            <------------ item_len ------------>
892        //  block x-1 |   block x  |  block x+1 |  block x+2
893        //  -----+----+------------+------------+--------+----
894        //   ... |  A |      B     |      C     |    D   | ...
895        //  -----+----+------------+------------+--------+----
896        //            ^
897        //            item_pos
898        if (item->pos % blocksize) {
899            front_margin = blocksize - item->pos % blocksize;
900        } else {
901            front_margin = 0;
902        }
903        item_pos = item->pos + front_margin;
904        item_len = item->len - front_margin;
905
906        if (item_len < blocksize) {
907            // Remaining length is smaller than a block. This means that there
908            // is no reusable block in this region (even though the region size is
909            // bigger than a block size) .. skip this item.
910            //
911            //       |   block x   |  block x+1  | ...
912            //  -----+------+------+-------+-----+----
913            //   ... |      |//////|///////|     | ...
914            //  -----+------+------+-------+-----+----
915            continue;
916        }
917
918        // calculate # blocks and add to 'blocks'
919        blocks_arr[n_blocks].bid = item_pos / blocksize;
920        blocks_arr[n_blocks].count = item_len / blocksize;
921        n_blocks += 1;
922        if (n_blocks >= max_blocks) {
923            max_blocks *= 2;
924            blocks_arr = (struct reusable_block*)
925                realloc(blocks_arr, max_blocks * sizeof(struct reusable_block));
926        }
927
928        if (front_margin) {
929            // adjust the existing item to indicate 'A' in above example
930            item->len = front_margin;
931        } else {
932            // exactly aligned .. remove this item
933            avl_remove(mergetree, &item->avl);
934            free(item);
935        }
936
937        uint32_t remaining_len;
938        remaining_len = item_len % blocksize;
939        if (remaining_len) {
940            // add a new item for the remaining region ('D' in above example)
941            struct stale_data *new_item;
942            new_item = (struct stale_data *)
943                       calloc(1, sizeof(struct stale_data));
944            new_item->pos = (blocks_arr[n_blocks-1].bid + blocks_arr[n_blocks-1].count)
945                            * blocksize;
946            new_item->len = remaining_len;
947            avl_insert(mergetree, &new_item->avl, _reusable_offset_cmp);
948            avl = avl_next(&new_item->avl);
949        }
950    }
951
952    // re-write stale tree using the last revnum as a key
953    // in this case, only stale regions newly generated by this function are gathered,
954    // and prev_hdr is set to BLK_NOT_FOUND, as corresponding seq numbers are
955    // already removed.
956
957    // however, do not remove the remaining items in the merge-tree and continue to
958    // merge them in the next block reclaim.
959    fdb_gather_stale_blocks(handle, revnum, BLK_NOT_FOUND, BLK_NOT_FOUND,
960                            0, e_last, true);
961
962    free(revnum_array);
963
964    ret.n_blocks = n_blocks;
965    ret.blocks = blocks_arr;
966    return ret;
967}
968
969void fdb_rollback_stale_blocks(fdb_kvs_handle *handle,
970                               filemgr_header_revnum_t cur_revnum)
971{
972    btree_result br;
973    filemgr_header_revnum_t i, _revnum;
974    struct avl_node *avl;
975    struct list_elem *elem;
976    struct stale_info_commit *commit, query;
977    struct stale_info_entry *entry;
978
979    if (handle->rollback_revnum == 0) {
980        return;
981    }
982
983    // remove from on-disk stale-tree
984    for (i = handle->rollback_revnum; i < cur_revnum; ++i) {
985        _revnum = _endian_encode(i);
986        br = btree_remove(handle->staletree, (void*)&_revnum);
987        // don't care the result
988        (void)br;
989        btreeblk_end(handle->bhandle);
990    }
991
992    // also remove from in-memory stale-tree
993    query.revnum = handle->rollback_revnum;
994    avl = avl_search(&handle->file->stale_info_tree,
995                   &query.avl, _inmem_stale_cmp);
996    if (!avl) {
997        avl = avl_search_greater(&handle->file->stale_info_tree,
998                               &query.avl, _inmem_stale_cmp);
999    }
1000    while (avl) {
1001        commit = _get_entry(avl, struct stale_info_commit, avl);
1002        avl = avl_next(avl);
1003
1004        avl_remove(&handle->file->stale_info_tree, &commit->avl);
1005
1006        elem = list_begin(&commit->doc_list);
1007        while (elem) {
1008            entry = _get_entry(elem, struct stale_info_entry, le);
1009            elem = list_remove(&commit->doc_list, &entry->le);
1010
1011            free(entry->ctx);
1012            free(entry);
1013        }
1014
1015        free(commit);
1016    }
1017}
1018
1019