xref: /6.0.3/forestdb/src/staleblock.cc (revision c338e06e)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <string.h>
20#include <stdint.h>
21#include <stdlib.h>
22
23#include "staleblock.h"
24#include "btreeblock.h"
25#include "list.h"
26#include "docio.h"
27#include "fdb_internal.h"
28#include "version.h"
29#include "time_utils.h"
30
31#ifdef _DOC_COMP
32#include "snappy-c.h"
33#endif
34
35#include "memleak.h"
36
// Whether the bodies of stale info docs kept in the in-memory stale info
// tree are snappy-compressed. Compression itself only happens when the
// library is built with _DOC_COMP; the flag is also consulted when deciding
// whether to allocate a decompression buffer while reclaiming blocks.
static bool compress_inmem_stale_info = true;
38
39static int _inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)
40{
41    struct stale_info_commit *aa, *bb;
42    aa = _get_entry(a, struct stale_info_commit, avl);
43    bb = _get_entry(b, struct stale_info_commit, avl);
44
45    if (aa->revnum < bb->revnum) {
46        return -1;
47    } else if (aa->revnum > bb->revnum) {
48        return 1;
49    } else {
50        return 0;
51    }
52}
53
54static void fdb_add_inmem_stale_info(fdb_kvs_handle *handle,
55                                     filemgr_header_revnum_t revnum,
56                                     struct docio_object *doc,
57                                     uint64_t doc_offset,
58                                     bool system_doc_only)
59{
60    int ret;
61    size_t buflen = 0;
62    struct filemgr *file = handle->file;
63    struct avl_node *a;
64    struct stale_info_commit *item, query;
65    struct stale_info_entry *entry;
66
67    // search using revnum first
68    query.revnum = revnum;
69    a = avl_search(&file->stale_info_tree, &query.avl, _inmem_stale_cmp);
70    if (a) {
71        // already exist
72        item = _get_entry(a, struct stale_info_commit, avl);
73    } else {
74        // not exist .. create a new one and insert into tree
75        item = (struct stale_info_commit *)calloc(1, sizeof(struct stale_info_commit));
76        item->revnum = revnum;
77        list_init(&item->doc_list);
78        avl_insert(&file->stale_info_tree, &item->avl, _inmem_stale_cmp);
79        file->stale_info_tree_loaded.store(true, std::memory_order_relaxed);
80    }
81
82    entry = (struct stale_info_entry *)calloc(1, sizeof(struct stale_info_entry));
83
84    if (!system_doc_only) {
85#ifdef _DOC_COMP
86        if (compress_inmem_stale_info) {
87            buflen = snappy_max_compressed_length(doc->length.bodylen);;
88            entry->ctx = (void *)calloc(1, buflen);
89            ret = snappy_compress((char*)doc->body, doc->length.bodylen,
90                (char*)entry->ctx, &buflen);
91            if (ret != 0) {
92                fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
93                    "(fdb_add_inmem_stale_info) "
94                    "Compression error from a database file '%s'"
95                    ": return value %d, header revnum %" _F64 ", "
96                    "doc offset %" _F64 "\n",
97                    file->filename, ret, revnum, doc_offset);
98                if (!a) {
99                    // 'item' is allocated in this function call.
100                    avl_remove(&file->stale_info_tree, &item->avl);
101                    free(item);
102                }
103                free(entry);
104                return;
105            }
106        } else {
107            buflen = doc->length.bodylen;
108            entry->ctx = (void *)calloc(1, buflen);
109            memcpy(entry->ctx, doc->body, doc->length.bodylen);
110        }
111#else
112        buflen = doc->length.bodylen;
113        entry->ctx = (void *)calloc(1, buflen);
114        memcpy(entry->ctx, doc->body, doc->length.bodylen);
115#endif
116
117        entry->ctxlen = doc->length.bodylen;
118
119    } else {
120        // when 'system_doc_only' flag is set, just set to NULL.
121        // we need the doc's length and offset info only.
122        entry->ctx = NULL;
123        entry->ctxlen = 0;
124    }
125
126    entry->comp_ctxlen = buflen;
127    entry->doclen = _fdb_get_docsize(doc->length);
128    entry->offset = doc_offset;
129    list_push_back(&item->doc_list, &entry->le);
130}
131
132void fdb_load_inmem_stale_info(fdb_kvs_handle *handle)
133{
134    uint8_t keybuf[64];
135    int64_t ret;
136    bid_t offset, _offset, prev_offset;
137    filemgr_header_revnum_t revnum, _revnum;
138    btree_iterator bit;
139    btree_result br;
140    struct filemgr *file = handle->file;
141    struct docio_object doc;
142    bool expected = false;
143
144    if (!std::atomic_compare_exchange_strong(&file->stale_info_tree_loaded,
145                                             &expected, true)) {
146        // stale info is already loaded (fast screening without mutex)
147        return;
148    }
149
150    // first open of the DB file
151    // should grab mutex to avoid race with other writer
152    filemgr_mutex_lock(file);
153
154    btree_iterator_init(handle->staletree, &bit, NULL);
155    do {
156        br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
157        btreeblk_end(handle->bhandle);
158        if (br != BTREE_RESULT_SUCCESS) {
159            break;
160        }
161
162        revnum = _endian_decode(_revnum);
163        offset = _endian_decode(_offset);
164
165        while (offset != BLK_NOT_FOUND) {
166            memset(&doc, 0x0, sizeof(doc));
167            // pre-allocated buffer for key
168            doc.key = (void*)keybuf;
169
170            ret = docio_read_doc(handle->dhandle, offset, &doc, true);
171            if (ret <= 0) {
172                // read fail .. escape
173                fdb_log(NULL, (fdb_status)ret,
174                    "Error in reading a stale region info document "
175                    "from a database file '%s'"
176                    ": revnum %" _F64 ", offset %" _F64 "\n",
177                    file->filename, revnum, offset);
178                offset = BLK_NOT_FOUND;
179                continue;
180            }
181
182            fdb_add_inmem_stale_info(handle, revnum, &doc, offset, false);
183
184            // fetch previous doc offset
185            memcpy(&_offset, doc.body, sizeof(_offset));
186            prev_offset = _endian_decode(_offset);
187
188            // We don't need to free 'meta' as it will be NULL.
189            free(doc.body);
190
191            offset = prev_offset;
192        }
193    } while (true);
194    btree_iterator_free(&bit);
195
196    filemgr_mutex_unlock(file);
197}
198
199void fdb_gather_stale_blocks(fdb_kvs_handle *handle,
200                             filemgr_header_revnum_t revnum,
201                             bid_t prev_hdr,
202                             uint64_t kv_info_offset,
203                             fdb_seqnum_t seqnum,
204                             struct list_elem *e_last,
205                             bool from_mergetree)
206{
207    int64_t delta;
208    int r;
209    uint32_t count = 0;
210    uint32_t offset = 0, count_location;
211    uint32_t bufsize = 8192;
212    uint32_t _count, _len;
213    uint64_t _pos, _kv_info_offset;
214    uint8_t *buf = NULL;
215    bid_t doc_offset, _doc_offset;
216    bid_t _prev_hdr;
217    bool gather_staleblocks = true;
218    bool first_loop = true;
219    filemgr_header_revnum_t _revnum;
220    fdb_seqnum_t _seqnum;
221    struct kvs_stat stat;
222
223    /*
224     * << stale block system doc structure >>
225     * [previous doc offset]: 8 bytes (0xffff.. if not exist)
226     * [previous header BID]: 8 bytes (0xffff.. if not exist)
227     * [KVS info doc offset]: 8 bytes (0xffff.. if not exist)
228     * [Default KVS seqnum]:  8 bytes
229     * [# items]:             4 bytes
230     * ---
231     * [position]:            8 bytes
232     * [length]:              4 bytes
233     * ...
234     */
235
236    if (filemgr_get_stale_list(handle->file)) {
237        struct avl_node *a;
238        struct list_elem *e;
239        struct stale_data *item;
240
241        r = _kvs_stat_get(handle->file, 0, &stat);
242        handle->bhandle->nlivenodes = stat.nlivenodes;
243        handle->bhandle->ndeltanodes = stat.nlivenodes;
244        (void)r;
245
246        buf = (uint8_t *)calloc(1, bufsize);
247        if (!buf) {
248            fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
249                    "(fdb_gather_stale_blocks) "
250                    "calloc of 'buf' failed: "
251                    "database file '%s', "
252                    "bufsize %d\n",
253                    handle->file->filename,
254                    (int)bufsize);
255            return;
256        }
257        _revnum = _endian_encode(revnum);
258
259        // initial previous doc offset
260        memset(buf, 0xff, sizeof(bid_t));
261        count_location = sizeof(bid_t);
262
263        // previous header BID
264        if (prev_hdr == 0 || prev_hdr == BLK_NOT_FOUND) {
265            // does not exist
266            memset(&_prev_hdr, 0xff, sizeof(_prev_hdr));
267        } else {
268            _prev_hdr = _endian_encode(prev_hdr);
269        }
270        memcpy(buf + sizeof(bid_t), &_prev_hdr, sizeof(bid_t));
271        count_location += sizeof(bid_t);
272
273        // KVS info doc offset
274        _kv_info_offset = _endian_encode(kv_info_offset);
275        memcpy(buf + count_location, &_kv_info_offset, sizeof(uint64_t));
276        count_location += sizeof(uint64_t);
277
278        // default KVS seqnum
279        _seqnum = _endian_encode(seqnum);
280        memcpy(buf + count_location, &_seqnum, sizeof(fdb_seqnum_t));
281        count_location += sizeof(fdb_seqnum_t);
282        count_location += sizeof(count);
283
284        while(gather_staleblocks) {
285            // reserve space for
286            // prev offset (8), prev header (8), kv_info_offset (8),
287            // seqnum (8), count (4)
288            offset = count_location;
289
290            if (first_loop && from_mergetree) {
291                // gather from mergetree
292                a = avl_first(&handle->file->mergetree);
293                while (a) {
294                    item = _get_entry(a, struct stale_data, avl);
295
296                    if (handle->staletree) {
297                        count++;
298
299                        _pos = _endian_encode(item->pos);
300                        _len = _endian_encode(item->len);
301
302                        memcpy(buf + offset, &_pos, sizeof(_pos));
303                        offset += sizeof(_pos);
304                        memcpy(buf + offset, &_len, sizeof(_len));
305                        offset += sizeof(_len);
306
307                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
308                            bufsize *= 2;
309                            buf = (uint8_t*)realloc(buf, bufsize);
310                            if (!buf) {
311                                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
312                                        "(fdb_gather_stale_blocks) "
313                                        "realloc of 'buf' failed: "
314                                        "database file '%s', "
315                                        "bufsize %d\n",
316                                        handle->file->filename,
317                                        (int)bufsize);
318                                return;
319                            }
320                        }
321                    }
322
323
324                    // If 'from_mergetree' flag is set, it means that this
325                    // function is called at the end of fdb_get_reusable_block(),
326                    // and those items are remaining (non-reusable) regions after
327                    // picking up reusable blocks from 'mergetree'.
328
329                    // In the previous implementation, those items are converted
330                    // and stored as a system document. The document is re-read in
331                    // the next block reclaim, and then we reconstruct 'mergetree'
332                    // from the document; this is unnecessary duplicated overhead.
333
334                    // As an optimization, we can simply keep those items in
335                    // 'mergetree' and use them in the next block reclaim, without
336                    // reading the corresponding system document; this also reduces
337                    // the commit latency much. Instead, to minimize memory
338                    // consumption, we don't need to maintain in-memory copy of the
339                    // system doc corresponding to the remaining items in the
340                    // 'mergetree', that will be created below.
341
342                    // do not remove the item
343                    a = avl_next(&item->avl);
344                }
345            } else {
346                // gater from stale_list
347                if (e_last) {
348                    e = list_next(e_last);
349                } else {
350                    e = list_begin(handle->file->stale_list);
351                }
352                while (e) {
353                    item = _get_entry(e, struct stale_data, le);
354
355                    if (handle->staletree) {
356                        count++;
357
358                        _pos = _endian_encode(item->pos);
359                        _len = _endian_encode(item->len);
360
361                        memcpy(buf + offset, &_pos, sizeof(_pos));
362                        offset += sizeof(_pos);
363                        memcpy(buf + offset, &_len, sizeof(_len));
364                        offset += sizeof(_len);
365
366                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
367                            bufsize *= 2;
368                            buf = (uint8_t*)realloc(buf, bufsize);
369                            if (!buf) {
370                                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
371                                        "(fdb_gather_stale_blocks) "
372                                        "realloc of 'buf' failed: "
373                                        "database file '%s', "
374                                        "bufsize %d\n",
375                                        handle->file->filename,
376                                        (int)bufsize);
377                                return;
378                            }
379                        }
380                    }
381
382                    e = list_remove(handle->file->stale_list, e);
383                    free(item);
384                }
385            }
386
387            gather_staleblocks = false;
388            if (count) {
389                char *doc_key = alca(char, 32);
390                struct docio_object doc;
391
392                // store count
393                _count = _endian_encode(count);
394                memcpy(buf + count_location - sizeof(_count), &_count, sizeof(_count));
395
396                // append a system doc
397                memset(&doc, 0x0, sizeof(doc));
398                // add one to 'revnum' to get the next revision number
399                // (note that filemgr_mutex() is grabbed so that no other thread
400                //  will change the 'revnum').
401                sprintf(doc_key, "stale_blocks_%" _F64, revnum);
402                doc.key = (void*)doc_key;
403                doc.body = buf;
404                doc.length.keylen = strlen(doc_key) + 1;
405                doc.length.metalen = 0;
406                doc.length.bodylen = offset;
407                doc.seqnum = 0;
408                doc_offset = docio_append_doc_system(handle->dhandle, &doc);
409
410                // insert into stale-block tree
411                _doc_offset = _endian_encode(doc_offset);
412                btree_insert(handle->staletree, (void *)&_revnum, (void *)&_doc_offset);
413                btreeblk_end(handle->bhandle);
414                btreeblk_reset_subblock_info(handle->bhandle);
415
416                if (from_mergetree && first_loop) {
417                    // if from_mergetree flag is set and this is the first loop,
418                    // stale regions in this document are already in mergetree
419                    // so skip adding them into in-memory stale info tree.
420
421                    // however, the system doc itself should be marked as stale
422                    // when the doc is reclaimed, thus we instead add a dummy entry
423                    // that containing doc offset, length info only.
424                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, true);
425                } else {
426                    // add the doc into in-memory stale info tree
427                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, false);
428                }
429
430                if (list_begin(filemgr_get_stale_list(handle->file))) {
431                    // updating stale tree brings another stale blocks.
432                    // recursively update until there is no more stale block.
433
434                    // note that infinite loop will not occur because
435                    // 1) all updated index blocks for stale tree are still writable
436                    // 2) incoming keys for stale tree (revnum) are monotonic
437                    //    increasing order; most recently allocated node will be
438                    //    updated again.
439
440                    count = 0;
441                    // save previous doc offset
442                    memcpy(buf, &_doc_offset, sizeof(_doc_offset));
443
444                    // gather once again
445                    gather_staleblocks = true;
446                }
447            }
448
449            first_loop = false;
450        } // gather stale blocks
451
452        delta = handle->bhandle->nlivenodes - stat.nlivenodes;
453        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
454        delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
455        delta *= handle->config.blocksize;
456        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
457
458        free(buf);
459    } else {
460        btreeblk_reset_subblock_info(handle->bhandle);
461    }
462}
463
464static int _reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)
465{
466    struct stale_data *aa, *bb;
467    aa = _get_entry(a, struct stale_data, avl);
468    bb = _get_entry(b, struct stale_data, avl);
469    return _CMP_U64(aa->pos, bb->pos);
470}
471
472static void _insert_n_merge(struct avl_tree *tree,
473                            uint64_t item_pos,
474                            uint32_t item_len)
475{
476    struct stale_data query, *item;
477    struct avl_node *avl;
478
479    // retrieve the tree first
480    query.pos = item_pos;
481    avl = avl_search(tree, &query.avl, _reusable_offset_cmp);
482    if (avl) {
483        // same offset already exists
484        item = _get_entry(avl, struct stale_data, avl);
485        // choose longer length
486        if (item->len < item_len) {
487            item->len = item_len;
488        }
489    } else {
490        // does not exist .. create a new item
491        item = (struct stale_data*)
492               calloc(1, sizeof(struct stale_data));
493        item->pos = item_pos;
494        item->len = item_len;
495        avl_insert(tree, &item->avl, _reusable_offset_cmp);
496    }
497
498    // check prev/next item to see if they can be merged
499    struct avl_node *p_avl, *n_avl;
500    struct stale_data*p_item, *n_item;
501    p_avl = avl_prev(&item->avl);
502    if (p_avl) {
503        p_item = _get_entry(p_avl, struct stale_data, avl);
504        if (p_item->pos + p_item->len >= item->pos) {
505
506            if (p_item->pos + p_item->len >= item->pos + item->len) {
507                // 'item' is included in p_item .. simply remove it
508                // (do nothing)
509            } else {
510                // overlapping (or consecutive) .. merge two items
511                p_item->len += item->len +
512                               (item->pos - p_item->pos - p_item->len);
513            }
514            // remove current item
515            avl_remove(tree, &item->avl);
516            free(item);
517            item = p_item;
518        }
519    }
520
521    n_avl = avl_next(&item->avl);
522    if (n_avl) {
523        n_item = _get_entry(n_avl, struct stale_data, avl);
524        if (item->pos + item->len >= n_item->pos) {
525
526            if (item->pos + item->len >= n_item->pos + n_item->len) {
527                // 'n_item' is included in 'item' .. simply remove it
528                // (do nothing)
529            } else {
530                // overlapping (or consecutive) .. merge two items
531                item->len += n_item->len +
532                             (n_item->pos - item->pos - item->len);
533            }
534            // remove next item
535            avl_remove(tree, &n_item->avl);
536            free(n_item);
537        }
538    }
539}
540
541// Parse & fetch stale regions from the buffer 'ctx', which is the body of
542// a stale info system document (from either in-memory stale-block-tree or
543// on-disk stale-block-tree). After fetching, insert those regions
544// into 'mergetree'.
545static void _fetch_stale_info_doc(void *ctx,
546                                  struct avl_tree *mergetree,
547                                  uint64_t &prev_offset_out,
548                                  uint64_t &prev_hdr_out)
549{
550    uint32_t i, count, _count, item_len;
551    uint64_t pos;
552    uint64_t item_pos;
553
554    pos = 0;
555
556    // get previous doc offset
557    memcpy(&prev_offset_out, ctx, sizeof(prev_offset_out));
558    prev_offset_out = _endian_decode(prev_offset_out);
559    pos += sizeof(prev_offset_out);
560
561    // get previous header BID
562    memcpy(&prev_hdr_out, (uint8_t*)ctx + pos, sizeof(prev_hdr_out));
563    prev_hdr_out = _endian_decode(prev_hdr_out);
564    (void)prev_hdr_out;
565    pos += sizeof(prev_hdr_out);
566
567    // Skip kv_info_offset and default KVS's seqnum
568    pos += sizeof(uint64_t) + sizeof(fdb_seqnum_t);
569
570    // get count;
571    memcpy(&_count, (uint8_t*)ctx + pos, sizeof(_count));
572    count = _endian_decode(_count);
573    pos += sizeof(_count);
574
575    // get a stale region and insert/merge into tree
576    for (i=0;i<count;++i) {
577        memcpy(&item_pos, (uint8_t*)ctx + pos, sizeof(item_pos));
578        item_pos = _endian_decode(item_pos);
579        pos += sizeof(item_pos);
580
581        memcpy(&item_len, (uint8_t*)ctx + pos, sizeof(item_len));
582        item_len = _endian_decode(item_len);
583        pos += sizeof(item_len);
584
585        _insert_n_merge(mergetree, item_pos, item_len);
586    }
587}
588
589reusable_block_list fdb_get_reusable_block(fdb_kvs_handle *handle,
590                                           stale_header_info stale_header)
591{
592    int64_t delta;
593    int r;
594    uint8_t keybuf[64];
595    uint32_t i;
596    uint32_t item_len;
597    uint32_t n_revnums, max_revnum_array = 256;
598    uint64_t item_pos;
599    btree_iterator bit;
600    btree_result br;
601    filemgr_header_revnum_t revnum_upto, prev_revnum = 0;
602    filemgr_header_revnum_t revnum = 0, _revnum;
603    filemgr_header_revnum_t *revnum_array;
604    bid_t offset, _offset, prev_offset;
605    bid_t prev_hdr = BLK_NOT_FOUND;
606    bool stale_tree_scan = true;
607    struct docio_object doc;
608    struct avl_tree *mergetree = &handle->file->mergetree;
609    struct avl_node *avl;
610    struct stale_data *item;
611    struct list_elem *e, *e_last;
612    struct kvs_stat stat;
613
614    revnum_upto = stale_header.revnum;
615
616    r = _kvs_stat_get(handle->file, 0, &stat);
617    handle->bhandle->nlivenodes = stat.nlivenodes;
618    handle->bhandle->ndeltanodes = stat.nlivenodes;
619    (void)r;
620
621    revnum_array = (filemgr_header_revnum_t *)
622                   calloc(max_revnum_array, sizeof(filemgr_header_revnum_t));
623    n_revnums = 0;
624
625    // remember the last stale list item to be preserved
626    e_last = list_end(handle->file->stale_list);
627
628    avl = avl_first(&handle->file->stale_info_tree);
629    if (avl) {
630        // if in-memory stale info exists
631        void *uncomp_buf = NULL;
632        int r;
633        size_t uncomp_buflen = 128*1024; // 128 KB by default;
634        struct stale_info_commit *commit;
635        struct stale_info_entry *entry;
636
637        stale_tree_scan = false;
638
639        if (compress_inmem_stale_info) {
640            uncomp_buf = (void*)calloc(1, uncomp_buflen);
641            if (!uncomp_buf) {
642                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
643                    "(fdb_get_reusable_block) "
644                    "calloc of 'uncomp_buf' failed: "
645                    "database file '%s', "
646                    "uncomp_buflen %d\n",
647                    handle->file->filename,
648                    (int)uncomp_buflen);
649                free(revnum_array);
650
651                reusable_block_list ret;
652                ret.n_blocks = 0;
653                ret.blocks = NULL;
654
655                return ret;
656            }
657        }
658
659        while (avl) {
660            commit = _get_entry(avl, struct stale_info_commit, avl);
661            avl = avl_next(avl);
662
663            prev_revnum = revnum;
664            revnum = commit->revnum;
665            if (revnum > revnum_upto) {
666                revnum = prev_revnum;
667                break;
668            }
669
670            filemgr_header_revnum_t *new_revnum_array = revnum_array;
671            revnum_array[n_revnums++] = revnum;
672            if (n_revnums >= max_revnum_array) {
673                max_revnum_array *= 2;
674                new_revnum_array = (filemgr_header_revnum_t *)
675                                   realloc(revnum_array, max_revnum_array *
676                                           sizeof(filemgr_header_revnum_t));
677            }
678            if (!new_revnum_array) {
679                // realloc() of revnum_array failed.
680                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
681                    "(fdb_get_reusable_block) "
682                    "realloc of 'revnum_array' failed: "
683                    "database file '%s', "
684                    "max_revnum_array %d\n",
685                    handle->file->filename,
686                    (int)max_revnum_array);
687                free(uncomp_buf);
688                free(revnum_array);
689
690                reusable_block_list ret;
691                ret.n_blocks = 0;
692                ret.blocks = NULL;
693
694                return ret;
695            }
696            revnum_array = new_revnum_array;
697
698            avl_remove(&handle->file->stale_info_tree, &commit->avl);
699
700            e = list_begin(&commit->doc_list);
701            while (e) {
702                entry = _get_entry(e, struct stale_info_entry, le);
703                e = list_next(&entry->le);
704                list_remove(&commit->doc_list, &entry->le);
705
706                if (entry->ctx) {
707#ifdef _DOC_COMP
708                    if (compress_inmem_stale_info) {
709                        // uncompression
710                        void* new_uncomp_buf = uncomp_buf;
711                        if (uncomp_buflen < entry->ctxlen) {
712                            uncomp_buflen = entry->ctxlen;
713                            new_uncomp_buf = (void*)realloc(uncomp_buf, uncomp_buflen);
714                        }
715
716                        if (!new_uncomp_buf) {
717                            // realloc() of uncomp_buf failed.
718                            fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
719                                "(fdb_get_reusable_block) "
720                                "realloc of 'uncomp_buf' failed: "
721                                "database file '%s', "
722                                "uncomp_buflen %d, "
723                                "entry->ctxlen %d\n",
724                                handle->file->filename,
725                                (int)uncomp_buflen,
726                                (int)entry->ctxlen);
727                            free(uncomp_buf);
728                            free(revnum_array);
729
730                            reusable_block_list ret;
731                            ret.n_blocks = 0;
732                            ret.blocks = NULL;
733
734                            return ret;
735                        }
736                        uncomp_buf = new_uncomp_buf;
737
738                        size_t len = uncomp_buflen;
739                        r = snappy_uncompress((char*)entry->ctx, entry->comp_ctxlen,
740                                              (char*)uncomp_buf, &len);
741                        if (r != 0) {
742                            fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
743                                "(fdb_get_reusable_block) "
744                                "Uncompression error from a database file '%s'"
745                                ": return value %d, header revnum %" _F64 ", "
746                                "doc offset %" _F64 "\n",
747                                handle->file->filename, r, revnum, entry->offset);
748                            free(uncomp_buf);
749                            free(revnum_array);
750
751                            reusable_block_list ret;
752                            ret.n_blocks = 0;
753                            ret.blocks = NULL;
754
755                            return ret;
756                        }
757
758                        // fetch the context
759                        _fetch_stale_info_doc(uncomp_buf, mergetree,
760                                              prev_offset, prev_hdr);
761                    } else {
762                        _fetch_stale_info_doc(entry->ctx, mergetree,
763                                              prev_offset, prev_hdr);
764                    }
765#else
766                    _fetch_stale_info_doc(entry->ctx, mergetree, prev_offset, prev_hdr);
767#endif
768                }
769
770                // also insert/merge the system doc region
771                struct stale_regions sr;
772
773                sr = filemgr_actual_stale_regions(handle->file, entry->offset,
774                                                  entry->doclen);
775
776                if (sr.n_regions > 1) {
777                    for (i=0; i<sr.n_regions; ++i){
778                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
779                    }
780                    free(sr.regions);
781                } else {
782                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
783                }
784
785                free(entry->ctx);
786                free(entry);
787            }
788
789            free(commit);
790        }
791        free(uncomp_buf);
792    }
793
794    if (stale_tree_scan) {
795        // scan stale-block tree and get all stale regions
796        // corresponding to commit headers whose seq number is
797        // equal to or smaller than 'revnum_upto'
798        btree_iterator_init(handle->staletree, &bit, NULL);
799        do {
800            br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
801            btreeblk_end(handle->bhandle);
802            if (br != BTREE_RESULT_SUCCESS) {
803                break;
804            }
805
806            prev_revnum = revnum;
807            revnum = _endian_decode(_revnum);
808            if (revnum > revnum_upto) {
809                revnum = prev_revnum;
810                break;
811            }
812
813            revnum_array[n_revnums++] = revnum;
814            if (n_revnums >= max_revnum_array) {
815                max_revnum_array *= 2;
816                revnum_array = (filemgr_header_revnum_t *)
817                               realloc(revnum_array, max_revnum_array *
818                                   sizeof(filemgr_header_revnum_t));
819            }
820            offset = _endian_decode(_offset);
821
822            while (offset != BLK_NOT_FOUND) {
823                memset(&doc, 0x0, sizeof(doc));
824                // pre-allocated buffer for key
825                doc.key = (void*)keybuf;
826
827                if (docio_read_doc(handle->dhandle, offset, &doc, true) <= 0) {
828                    // read fail .. escape
829                    offset = BLK_NOT_FOUND;
830                    continue;
831                }
832
833                _fetch_stale_info_doc(doc.body, mergetree, prev_offset, prev_hdr);
834
835                // also insert/merge the system doc region
836                size_t length = _fdb_get_docsize(doc.length);
837                struct stale_regions sr;
838
839                sr = filemgr_actual_stale_regions(handle->file, offset, length);
840
841                if (sr.n_regions > 1) {
842                    for (i=0; i<sr.n_regions; ++i){
843                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
844                    }
845                    free(sr.regions);
846                } else {
847                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
848                }
849
850                // We don't need to free 'meta' as it will be NULL.
851                free(doc.body);
852
853                offset = prev_offset;
854            }
855        } while (true);
856        btree_iterator_free(&bit);
857    }
858
859    // remove merged commit headers
860    for (i=0; i<n_revnums; ++i) {
861        _revnum = _endian_encode(revnum_array[i]);
862        btree_remove(handle->staletree, (void*)&_revnum);
863        btreeblk_end(handle->bhandle);
864    }
865
866    delta = handle->bhandle->nlivenodes - stat.nlivenodes;
867    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
868    delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
869    delta *= handle->config.blocksize;
870    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);
871
872    // gather stale blocks generated by removing b+tree entries
873    if (e_last) {
874        e = list_next(e_last);
875    } else {
876        e = list_begin(handle->file->stale_list);
877    }
878    while (e) {
879        item = _get_entry(e, struct stale_data, le);
880        e = list_remove(handle->file->stale_list, e);
881
882        _insert_n_merge(mergetree, item->pos, item->len);
883        free(item);
884    }
885
886    // now merge stale regions as large as possible
887    size_t n_blocks =0 ;
888    size_t blocksize = handle->file->blocksize;
889    uint32_t max_blocks = 256;
890    uint32_t front_margin;
891    reusable_block_list ret;
892    struct reusable_block *blocks_arr;
893
894    blocks_arr = (struct reusable_block*)
895        calloc(max_blocks, sizeof(struct reusable_block));
896
897    avl = avl_first(mergetree);
898    while (avl) {
899        item = _get_entry(avl, struct stale_data, avl);
900        avl = avl_next(avl);
901
902        // A stale region can be represented as follows:
903        //
904        //  block x-1 |   block x  |  block x+1 |  block x+2
905        //  -----+----+------------+------------+--------+----
906        //   ... |  A |      B     |      C     |    D   | ...
907        //  -----+----+------------+------------+--------+----
908        //
909        // Only segment 'B' and 'C' can be reusable, and the other segments
910        // (i.e., 'A' and 'D') should be re-inserted into stale-block tree.
911
912        if (item->len < blocksize) {
913            // whole region is smaller than a block .. skip this item
914            //       |    block    |  ...
915            //  -----+---+-----+---+-------
916            //   ... |   |/////|   |  ...
917            //  -----+---+-----+---+-------
918            continue;
919        }
920
921        //            <------------ item_len ------------>
922        //  block x-1 |   block x  |  block x+1 |  block x+2
923        //  -----+----+------------+------------+--------+----
924        //   ... |  A |      B     |      C     |    D   | ...
925        //  -----+----+------------+------------+--------+----
926        //            ^
927        //            item_pos
928        if (item->pos % blocksize) {
929            front_margin = blocksize - item->pos % blocksize;
930        } else {
931            front_margin = 0;
932        }
933        item_pos = item->pos + front_margin;
934        item_len = item->len - front_margin;
935
936        if (item_len < blocksize) {
937            // Remaining length is smaller than a block. This means that there
938            // is no reusable block in this region (even though the region size is
939            // bigger than a block size) .. skip this item.
940            //
941            //       |   block x   |  block x+1  | ...
942            //  -----+------+------+-------+-----+----
943            //   ... |      |//////|///////|     | ...
944            //  -----+------+------+-------+-----+----
945            continue;
946        }
947
948        // calculate # blocks and add to 'blocks'
949        blocks_arr[n_blocks].bid = item_pos / blocksize;
950        blocks_arr[n_blocks].count = item_len / blocksize;
951        n_blocks += 1;
952        if (n_blocks >= max_blocks) {
953            max_blocks *= 2;
954            blocks_arr = (struct reusable_block*)
955                realloc(blocks_arr, max_blocks * sizeof(struct reusable_block));
956        }
957
958        if (front_margin) {
959            // adjust the existing item to indicate 'A' in above example
960            item->len = front_margin;
961        } else {
962            // exactly aligned .. remove this item
963            avl_remove(mergetree, &item->avl);
964            free(item);
965        }
966
967        uint32_t remaining_len;
968        remaining_len = item_len % blocksize;
969        if (remaining_len) {
970            // add a new item for the remaining region ('D' in above example)
971            struct stale_data *new_item;
972            new_item = (struct stale_data *)
973                       calloc(1, sizeof(struct stale_data));
974            new_item->pos = (blocks_arr[n_blocks-1].bid + blocks_arr[n_blocks-1].count)
975                            * blocksize;
976            new_item->len = remaining_len;
977            avl_insert(mergetree, &new_item->avl, _reusable_offset_cmp);
978            avl = avl_next(&new_item->avl);
979        }
980    }
981
982    // re-write stale tree using the last revnum as a key
983    // in this case, only stale regions newly generated by this function are gathered,
984    // and prev_hdr is set to BLK_NOT_FOUND, as corresponding seq numbers are
985    // already removed.
986
987    // however, do not remove the remaining items in the merge-tree and continue to
988    // merge them in the next block reclaim.
989    fdb_gather_stale_blocks(handle, revnum, BLK_NOT_FOUND, BLK_NOT_FOUND,
990                            0, e_last, true);
991
992    free(revnum_array);
993
994    ret.n_blocks = n_blocks;
995    ret.blocks = blocks_arr;
996    return ret;
997}
998
999void fdb_rollback_stale_blocks(fdb_kvs_handle *handle,
1000                               filemgr_header_revnum_t cur_revnum)
1001{
1002    btree_result br;
1003    filemgr_header_revnum_t i, _revnum;
1004    struct avl_node *avl;
1005    struct list_elem *elem;
1006    struct stale_info_commit *commit, query;
1007    struct stale_info_entry *entry;
1008
1009    if (handle->rollback_revnum == 0) {
1010        return;
1011    }
1012
1013    // remove from on-disk stale-tree
1014    for (i = handle->rollback_revnum; i < cur_revnum; ++i) {
1015        _revnum = _endian_encode(i);
1016        br = btree_remove(handle->staletree, (void*)&_revnum);
1017        // don't care the result
1018        (void)br;
1019        btreeblk_end(handle->bhandle);
1020    }
1021
1022    // also remove from in-memory stale-tree
1023    query.revnum = handle->rollback_revnum;
1024    avl = avl_search(&handle->file->stale_info_tree,
1025                   &query.avl, _inmem_stale_cmp);
1026    if (!avl) {
1027        avl = avl_search_greater(&handle->file->stale_info_tree,
1028                               &query.avl, _inmem_stale_cmp);
1029    }
1030    while (avl) {
1031        commit = _get_entry(avl, struct stale_info_commit, avl);
1032        avl = avl_next(avl);
1033
1034        avl_remove(&handle->file->stale_info_tree, &commit->avl);
1035
1036        elem = list_begin(&commit->doc_list);
1037        while (elem) {
1038            entry = _get_entry(elem, struct stale_info_entry, le);
1039            elem = list_remove(&commit->doc_list, &entry->le);
1040
1041            free(entry->ctx);
1042            free(entry);
1043        }
1044
1045        free(commit);
1046    }
1047}
1048
1049