xref: /6.6.0/forestdb/src/forestdb.cc (revision 4c3b2f9b)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "breakpad.h"
33#include "btree.h"
34#include "btree_kv.h"
35#include "btree_var_kv_ops.h"
36#include "docio.h"
37#include "btreeblock.h"
38#include "common.h"
39#include "wal.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "bgflusher.h"
44#include "compactor.h"
45#include "memleak.h"
46#include "time_utils.h"
47#include "timing.h"
48#include "system_resource_stats.h"
49#include "version.h"
50#include "staleblock.h"
51
// When __DEBUG is defined but __DEBUG_FDB is not, the DBG, DBGCMD and DBGSW
// macros are redefined below to expand to nothing for the rest of this file.
52#ifdef __DEBUG
53#ifndef __DEBUG_FDB
54    #undef DBG
55    #undef DBGCMD
56    #undef DBGSW
57    #define DBG(...)
58    #define DBGCMD(...)
59    #define DBGSW(n, ...)
60#endif
61#endif
62
63
// Non-zero once fdb_init() has completed the one-time global initialization.
64static atomic_uint8_t fdb_initialized(0);
// Incremented on entry to fdb_open() and decremented before it returns;
// every update in this file is made while holding 'initial_lock'.
65static volatile uint32_t fdb_open_inprog = 0;
// 'initial_lock' guards the one-time initialization in fdb_init() and the
// updates of 'fdb_open_inprog'.
66#ifdef SPIN_INITIALIZER
67static spin_t initial_lock = SPIN_INITIALIZER;
68#else
// Without a static initializer the lock is set up lazily by
// init_initial_lock_status(); 'initial_lock_status' tracks that set-up:
// 0 = not initialized, 1 = initialization in progress, 2 = lock is ready.
69static volatile unsigned int initial_lock_status = 0;
70static spin_t initial_lock;
71#endif
72
73INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74{
75    (void) aux;
76    uint64_t a,b;
77    a = *(uint64_t*)key1;
78    b = *(uint64_t*)key2;
79    a = _endian_decode(a);
80    b = _endian_decode(b);
81    return _CMP_U64(a, b);
82}
83
84size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85{
86    fdb_status fs;
87    keylen_t keylen;
88    struct docio_handle *dhandle = (struct docio_handle*)handle;
89
90    offset = _endian_decode(offset);
91    fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92    if (fs == FDB_RESULT_SUCCESS) {
93        return keylen;
94    } else {
95        const char *msg = "docio_read_doc_key error: read failure on "
96            "offset %" _F64 " in a database file '%s' "
97            ": FDB status %d, lastbid 0x%" _X64 ", "
98            "curblock 0x%" _X64 ", curpos 0x%x\n";
99        fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                dhandle->file->filename, fs, dhandle->lastbid,
101                dhandle->curblock, dhandle->curpos);
102        dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103        return 0;
104    }
105}
106
107void _fdb_invalidate_dbheader(fdb_kvs_handle *handle ){
108    bid_t hdr_bid;
109    hdr_bid = handle->last_hdr_bid;
110    if (hdr_bid != BLK_NOT_FOUND){
111        // invalidate the last dbheader
112        filemgr_invalidate_dbheader(handle->file, hdr_bid, &handle->log_callback);
113    }
114}
115size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
116{
117    int size_id, size_seq, size_chunk;
118    fdb_seqnum_t _seqnum;
119    struct docio_object doc;
120    struct docio_handle *dhandle = (struct docio_handle *)handle;
121
122    size_id = sizeof(fdb_kvs_id_t);
123    size_seq = sizeof(fdb_seqnum_t);
124    size_chunk = dhandle->file->config->chunksize;
125    memset(&doc, 0, sizeof(struct docio_object));
126
127    offset = _endian_decode(offset);
128    if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
129                                &doc, true) <= 0) {
130        return 0;
131    }
132    buf2buf(size_chunk, doc.key, size_id, buf);
133    _seqnum = _endian_encode(doc.seqnum);
134    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
135
136    free(doc.key);
137    free(doc.meta);
138
139    return size_id + size_seq;
140}
141
142int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
143{
144    int is_key1_inf, is_key2_inf;
145    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
146    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
147    size_t keylen1, keylen2;
148    btree_cmp_args *args = (btree_cmp_args *)aux;
149    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
150
151    is_key1_inf = _is_inf_key(key1);
152    is_key2_inf = _is_inf_key(key2);
153    if (is_key1_inf && is_key2_inf) { // both are infinite
154        return 0;
155    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
156        return -1;
157    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
158        return 1;
159    }
160
161    _get_var_key(key1, (void*)keystr1, &keylen1);
162    _get_var_key(key2, (void*)keystr2, &keylen2);
163
164    if (keylen1 == 0 && keylen2 == 0) {
165        return 0;
166    } else if (keylen1 ==0 && keylen2 > 0) {
167        return -1;
168    } else if (keylen1 > 0 && keylen2 == 0) {
169        return 1;
170    }
171
172    return cmp(keystr1, keylen1, keystr2, keylen2);
173}
174
175void fdb_fetch_header(uint64_t version,
176                      void *header_buf,
177                      bid_t *trie_root_bid,
178                      bid_t *seq_root_bid,
179                      bid_t *stale_root_bid,
180                      uint64_t *ndocs,
181                      uint64_t *ndeletes,
182                      uint64_t *nlivenodes,
183                      uint64_t *datasize,
184                      uint64_t *last_wal_flush_hdr_bid,
185                      uint64_t *kv_info_offset,
186                      uint64_t *header_flags,
187                      char **new_filename,
188                      char **old_filename)
189{
190    size_t offset = 0;
191    uint16_t new_filename_len;
192    uint16_t old_filename_len;
193
194    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
195               sizeof(bid_t), offset);
196    *trie_root_bid = _endian_decode(*trie_root_bid);
197
198    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
199               sizeof(bid_t), offset);
200    *seq_root_bid = _endian_decode(*seq_root_bid);
201
202    if (ver_staletree_support(version)) {
203        seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
204                   sizeof(bid_t), offset);
205        *stale_root_bid = _endian_decode(*stale_root_bid);
206    } else {
207        *stale_root_bid = BLK_NOT_FOUND;
208    }
209
210    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
211               sizeof(uint64_t), offset);
212    *ndocs = _endian_decode(*ndocs);
213    if (ver_is_atleast_magic_001(version)) {
214        seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
215                   sizeof(uint64_t), offset);
216        *ndeletes = _endian_decode(*ndeletes);
217    } else {
218        *ndeletes = 0;
219    }
220
221    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
222               sizeof(uint64_t), offset);
223    *nlivenodes = _endian_decode(*nlivenodes);
224
225    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
226               sizeof(uint64_t), offset);
227    *datasize = _endian_decode(*datasize);
228
229    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
230               sizeof(uint64_t), offset);
231    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
232
233    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
234               sizeof(uint64_t), offset);
235    *kv_info_offset = _endian_decode(*kv_info_offset);
236
237    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
238               sizeof(uint64_t), offset);
239    *header_flags = _endian_decode(*header_flags);
240
241    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
242               sizeof(new_filename_len), offset);
243    new_filename_len = _endian_decode(new_filename_len);
244    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
245               sizeof(old_filename_len), offset);
246    old_filename_len = _endian_decode(old_filename_len);
247    if (new_filename_len) {
248        *new_filename = (char*)((uint8_t *)header_buf + offset);
249    } else {
250        *new_filename = NULL;
251    }
252    offset += new_filename_len;
253    if (old_filename && old_filename_len) {
254        *old_filename = (char *) malloc(old_filename_len);
255        seq_memcpy(*old_filename,
256                   (uint8_t *)header_buf + offset,
257                   old_filename_len, offset);
258    }
259}
260
261// read the revnum of the given header of BID
262INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
263{
264    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
265    uint64_t version;
266    size_t header_len;
267    fdb_seqnum_t seqnum;
268    filemgr_header_revnum_t revnum = 0;
269    fdb_status fs;
270
271    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
272                              &seqnum, &revnum, NULL, &version, NULL,
273                              &handle->log_callback);
274    if (fs != FDB_RESULT_SUCCESS) {
275        return 0;
276    }
277    return revnum;
278}
279
280INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
281{
282    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
283    uint64_t version, bmp_revnum = 0;
284    size_t header_len;
285    fdb_seqnum_t seqnum;
286    filemgr_header_revnum_t revnum;
287    fdb_status fs;
288
289    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
290                              &seqnum, &revnum, NULL, &version, &bmp_revnum,
291                              &handle->log_callback);
292    if (fs != FDB_RESULT_SUCCESS) {
293        return 0;
294    }
295    return bmp_revnum;
296}
297
// No-op error log callback: accepts the callback arguments and discards
// them. Installed where error logging has to be silenced temporarily.
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // intentionally unused
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
305
306INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
307                             fdb_restore_mode_t mode,
308                             bid_t hdr_bid,
309                             fdb_kvs_id_t kv_id_req)
310{
311    struct filemgr *file = handle->file;
312    uint32_t blocksize = handle->file->blocksize;
313    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
314    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
315    uint64_t offset = 0; //assume everything from first block needs restoration
316    uint64_t filesize = filemgr_get_pos(handle->file);
317    uint64_t doc_scan_limit;
318    uint64_t start_bmp_revnum, stop_bmp_revnum;
319    uint64_t cur_bmp_revnum = (uint64_t)-1;
320    bid_t next_doc_block = BLK_NOT_FOUND;
321    struct _fdb_key_cmp_info cmp_info;
322    err_log_callback *log_callback;
323
324    if (mode == FDB_RESTORE_NORMAL && !handle->shandle &&
325        !wal_try_restore(handle->file)) { // Atomically try to restore WAL
326        // Some other thread or previous open had successfully initialized WAL
327        // We can simply return here
328        return;
329    }
330
331    if (!hdr_off) { // Nothing to do if we don't have a header block offset
332        return;
333    }
334
335    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
336        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
337    }
338
339    // If a valid last header was retrieved and it matches the current header
340    if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid) {
341        return; // No WAL section in the file
342    }
343
344    if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
345        // for normal WAL restore, set status to dirty
346        // (only when the previous status is clean or dirty)
347        wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
348    }
349
350    // Temporarily disable the error logging callback as there are false positive
351    // checksum errors in docio_read_doc.
352    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
353    err_log_callback dummy_cb;
354    log_callback = handle->dhandle->log_callback;
355    dummy_cb.callback = fdb_dummy_log_callback;
356    dummy_cb.ctx_data = NULL;
357    handle->dhandle->log_callback = &dummy_cb;
358
359    if (!handle->shandle) {
360        filemgr_mutex_lock(file);
361    }
362    cmp_info.kvs_config = handle->kvs_config;
363    cmp_info.kvs = handle->kvs;
364
365    start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
366    stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
367    cur_bmp_revnum = start_bmp_revnum;
368
369    // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
370    // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
371    // otherwise: live block (bmp_revnum: 0)
372    //  1 2   3    4    5 6  7  8   9  10
373    // +-------------------------------------------+
374    // |  BBBBAAAAABBBBB  AAABBB    AAA            |
375    // +-------------------------------------------+
376    //              ^                     ^
377    //              hdr_bid               last_wal_flush
378    //
379    // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
380    // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
381    // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
382    // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]
383
384    do {
385        if (cur_bmp_revnum > stop_bmp_revnum) {
386            break;
387        } else if (cur_bmp_revnum == stop_bmp_revnum) {
388
389            bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
390            if (handle->file->sb) {
391                sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
392            }
393            if (!handle->shandle && handle->file->sb &&
394                sb_last_hdr_bid != BLK_NOT_FOUND) {
395                hdr_off = (sb_last_hdr_bid+1) * blocksize;
396            }
397
398            doc_scan_limit = hdr_off;
399            if (offset >= hdr_off) {
400                break;
401            }
402        } else {
403            doc_scan_limit = filesize;
404        }
405
406        if (!docio_check_buffer(handle->dhandle, offset / blocksize,
407                                cur_bmp_revnum)) {
408            // not a document block .. move to next block
409        } else {
410            do {
411                struct docio_object doc;
412                int64_t _offset;
413                uint64_t doc_offset;
414                memset(&doc, 0, sizeof(doc));
415                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
416                if (_offset <= 0) { // reached unreadable doc, skip block
417                    // TODO: Need to have this function return fdb_status, so that
418                    // WAL restore operation should fail if offset < 0
419                    break;
420                } else if ((uint64_t)_offset < offset) {
421                    // If more than one writer is appending docs concurrently,
422                    // they have their own doc block linked list and doc blocks
423                    // may not be consecutive. For example,
424                    //
425                    // Writer 1): 100 -> 102 -> 2 -> 4     | commit
426                    // Writer 2):    101 - > 103 -> 3 -> 5 |
427                    //
428                    // In this case, if we read doc BID 102, then 'offset' will jump
429                    // to doc BID 2, without reading BID 103.
430                    //
431                    // To address this issue, in case that 'offset' decreases,
432                    // remember the next doc block, and follow the doc linked list
433                    // first. After the linked list ends, 'offset' cursor will be
434                    // reset to 'next_doc_block'.
435                    next_doc_block = (offset / blocksize) + 1;
436                }
437                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
438                    // check if the doc is transactional or not, and
439                    // also check if the doc contains system info
440                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
441                        !(doc.length.flag & DOCIO_SYSTEM)) {
442                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
443                            // commit mark .. read doc offset
444                            doc_offset = doc.doc_offset;
445                            // read the previously skipped doc
446                            if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
447                                // doc read error
448                                free(doc.key);
449                                free(doc.meta);
450                                free(doc.body);
451                                offset = _offset;
452                                continue;
453                            }
454                        } else {
455                            doc_offset = offset;
456                        }
457
458                        // If say a snapshot is taken on a db handle after
459                        // rollback, then skip WAL items after rollback point
460                        if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
461                            doc.seqnum > handle->seqnum) {
462                            free(doc.key);
463                            free(doc.meta);
464                            free(doc.body);
465                            offset = _offset;
466                            continue;
467                        }
468
469                        // restore document
470                        fdb_doc wal_doc;
471                        wal_doc.keylen = doc.length.keylen;
472                        wal_doc.bodylen = doc.length.bodylen;
473                        wal_doc.key = doc.key;
474                        wal_doc.seqnum = doc.seqnum;
475                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
476
477                        if (!handle->shandle) {
478                            wal_doc.metalen = doc.length.metalen;
479                            wal_doc.meta = doc.meta;
480                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
481
482                            if (handle->kvs) {
483                                // check seqnum before insert
484                                fdb_kvs_id_t kv_id;
485                                fdb_seqnum_t kv_seqnum;
486                                buf2kvid(handle->config.chunksize,
487                                         wal_doc.key, &kv_id);
488
489                                kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
490                                if (doc.seqnum <= kv_seqnum &&
491                                        ((mode == FDB_RESTORE_KV_INS &&
492                                            kv_id == kv_id_req) ||
493                                         (mode == FDB_RESTORE_NORMAL)) ) {
494                                    // if mode is NORMAL, restore all items
495                                    // if mode is KV_INS, restore items matching ID
496                                    wal_insert(&file->global_txn, file, &cmp_info,
497                                               &wal_doc, doc_offset,
498                                               WAL_INS_WRITER);
499                                }
500                            } else {
501                                wal_insert(&file->global_txn, file, &cmp_info,
502                                           &wal_doc, doc_offset,
503                                           WAL_INS_WRITER);
504                            }
505                            if (doc.key) free(doc.key);
506                        } else {
507                            // snapshot
508                            if (handle->kvs) {
509                                fdb_kvs_id_t kv_id;
510                                buf2kvid(handle->config.chunksize,
511                                         wal_doc.key, &kv_id);
512                                if (kv_id == handle->kvs->id) {
513                                    // snapshot: insert ID matched documents only
514                                    wal_snap_insert(handle->shandle,
515                                                    &wal_doc, doc_offset);
516                                } else {
517                                    free(doc.key);
518                                }
519                            } else {
520                                wal_snap_insert(handle->shandle, &wal_doc,
521                                                doc_offset);
522                            }
523                        }
524                        free(doc.meta);
525                        free(doc.body);
526                        offset = _offset;
527                    } else {
528                        // skip transactional document or system document
529                        free(doc.key);
530                        free(doc.meta);
531                        free(doc.body);
532                        offset = _offset;
533                        // do not break.. read next doc
534                    }
535                } else {
536                    free(doc.key);
537                    free(doc.meta);
538                    free(doc.body);
539                    offset = _offset;
540                    break;
541                }
542            } while (offset + sizeof(struct docio_length) < doc_scan_limit);
543        }
544
545        if (next_doc_block != BLK_NOT_FOUND) {
546            offset = next_doc_block * blocksize;
547            next_doc_block = BLK_NOT_FOUND;
548        } else {
549            offset = ((offset / blocksize) + 1) * blocksize;
550        }
551        if (ver_superblock_support(handle->file->version) &&
552            offset >= filesize) {
553            // circular scan
554            struct superblock *sb = handle->file->sb;
555            if (sb && sb->config) {
556                offset = blocksize * sb->config->num_sb;
557                cur_bmp_revnum++;
558            }
559        }
560    } while(true);
561
562    // wal commit
563    if (!handle->shandle) {
564        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
565        filemgr_mutex_unlock(file);
566    }
567    handle->dhandle->log_callback = log_callback;
568}
569
// Attempts to recover from an interrupted compaction by opening the
// (possibly partially) compacted file 'new_filename' in read-only mode.
//
//   handle:       handle currently opened on the old file.
//   new_filename: name of the compaction target file to inspect.
//
// If the new file records handle's file as its old_filename, 'handle' is
// switched over to the new file: its block, docio, trie and seq-tree handles
// are replaced by the ones opened on the new file, the old file is registered
// for removal and closed, and FDB_RESULT_FAIL_BY_COMPACTION is returned.
// Otherwise the new file is registered for removal, the temporary handle is
// closed and FDB_RESULT_SUCCESS is returned.
// If the new file cannot be opened, the failure is logged and the value
// returned by fdb_log() is returned; if btreeblk_end() fails, its status is
// returned.
570INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
571                                          const char *new_filename)
572{
573    fdb_kvs_handle new_db;
574    fdb_config config = handle->config;
575    struct filemgr *new_file;
576
577    // As partially compacted file may contain various errors,
578    // we temporarily disable log callback for compaction recovery.
579    memset(&new_db, 0, sizeof(new_db));
580    new_db.log_callback.callback = NULL;
581    new_db.log_callback.ctx_data = NULL;
    // open the new file read-only, sharing the file handle and KVS config
582    config.flags |= FDB_OPEN_FLAG_RDONLY;
583    new_db.fhandle = handle->fhandle;
584    new_db.kvs_config = handle->kvs_config;
585    fdb_status status = _fdb_open(&new_db, new_filename,
586                                  FDB_AFILENAME, &config);
587    if (status != FDB_RESULT_SUCCESS) {
588        return fdb_log(&handle->log_callback, status,
589                       "Error in opening a partially compacted file '%s' for recovery.",
590                       new_filename);
591    }
592
593    new_file = new_db.file;
594
595    if (new_file->old_filename &&
596        !strncmp(new_file->old_filename, handle->file->filename,
597                 FDB_MAX_FILENAME_LEN)) {
598        struct filemgr *old_file = handle->file;
599        // If new file has a recorded old_filename then it means that
600        // compaction has completed successfully. Mark self for deletion
601        filemgr_mutex_lock(new_file);
602
603        status = btreeblk_end(handle->bhandle);
604        if (status != FDB_RESULT_SUCCESS) {
605            filemgr_mutex_unlock(new_file);
606            _fdb_close(&new_db);
607            return status;
608        }
        // release the handle's old per-file structures and adopt the ones
        // that were opened on the new file
609        btreeblk_free(handle->bhandle);
610        free(handle->bhandle);
611        handle->bhandle = new_db.bhandle;
612
613        docio_free(handle->dhandle);
614        free(handle->dhandle);
615        handle->dhandle = new_db.dhandle;
616
617        hbtrie_free(handle->trie);
618        free(handle->trie);
619        handle->trie = new_db.trie;
620
621        wal_shutdown(handle->file, &handle->log_callback);
622        handle->file = new_file;
623
624        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
625            if (handle->kvs) {
626                // multi KV instance mode
627                hbtrie_free(handle->seqtrie);
628                free(handle->seqtrie);
629                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
630                    handle->seqtrie = new_db.seqtrie;
631                }
632            } else {
633                free(handle->seqtree->kv_ops);
634                free(handle->seqtree);
635                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
636                    handle->seqtree = new_db.seqtree;
637                }
638            }
639        }
640        handle->staletree = new_db.staletree;
641
642        filemgr_mutex_unlock(new_file);
643        if (new_db.kvs) {
644            fdb_kvs_info_free(&new_db);
645        }
646        fdb_log(&handle->log_callback, FDB_RESULT_FAIL_BY_COMPACTION,
647                "Successfully used partially compacted file '%s' for recovery replacing old file %s.",
648                new_filename, new_file->old_filename);
649        // remove self: WARNING must not close this handle if snapshots
650        // are yet to open this file
651        filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
652        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
        // new_db is not closed on this path because its structures now
        // belong to 'handle'; only its filename is released here
653        free(new_db.filename);
654        return FDB_RESULT_FAIL_BY_COMPACTION;
655    }
656
657    // As the new file is partially compacted, it should be removed upon close.
658    // Just in-case the new file gets opened before removal, point it to the old
659    // file to ensure availability of data.
660    fdb_log(&handle->log_callback, FDB_RESULT_SUCCESS,
661            "Partially compacted file '%s' could not be used for recovery. Using old file %s.",
662                new_filename, handle->file->filename);
663    filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
664    _fdb_close(&new_db);
665
666    return FDB_RESULT_SUCCESS;
667}
668
669#ifndef SPIN_INITIALIZER
670INLINE void init_initial_lock_status() {
671    // Note that only Windows passes through this routine
672    if (!fdb_initialized) {
673        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
674            // atomically initialize spin lock only once
675            spin_init(&initial_lock);
676            initial_lock_status = 2;
677        } else {
678            // the others .. wait until initializing 'initial_lock' is done
679            // TODO: Need to devise a better way of synchronization on Windows
680            while (initial_lock_status != 2) {
681                Sleep(1);
682            }
683        }
684    }
685}
686#endif
687
688LIBFDB_API
689fdb_status fdb_init(fdb_config *config)
690{
691    fdb_config _config;
692    compactor_config c_config;
693    bgflusher_config bgf_config;
694    struct filemgr_config f_config;
695
696    if (config) {
697        if (validate_fdb_config(config)) {
698            _config = *config;
699        } else {
700            return FDB_RESULT_INVALID_CONFIG;
701        }
702    } else {
703        _config = get_default_config();
704    }
705
706    // global initialization
707    // initialized only once at first time
708    if (!fdb_initialized) {
709
710#ifndef SPIN_INITIALIZER
711        init_initial_lock_status();
712#endif
713
714    }
715    spin_lock(&initial_lock);
716    if (!fdb_initialized) {
717#if !defined(_ANDROID_) && !defined(__ANDROID__)
718        // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
719        // We temporarily disable validity checking of block cache size
720        // on Android platform at this time.
721        double ram_size = (double) get_memory_size();
722
723#if defined(__linux__)
724        /* handle the control group case if needed */
725        uint64_t mem_limit_size=0;
726        FILE *mem_limit_file;
727        char cgroup_file[] = "/sys/fs/cgroup/memory/memory.limit_in_bytes";
728
729        mem_limit_file = fopen(cgroup_file, "r");
730        if (mem_limit_file) {
731            fscanf(mem_limit_file, "%" _F64, &mem_limit_size);
732            if (ram_size > mem_limit_size) {
733                ram_size = mem_limit_size;
734            }
735        }
736#endif /* __linux__ */
737
738        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
739            spin_unlock(&initial_lock);
740            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
741        }
742#endif
743        // initialize file manager and block cache
744        f_config.blocksize = _config.blocksize;
745        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
746        f_config.seqtree_opt = _config.seqtree_opt;
747        filemgr_init(&f_config);
748        filemgr_set_lazy_file_deletion(true,
749                                       compactor_register_file_removing,
750                                       compactor_is_file_removed);
751        if (ver_superblock_support(ver_get_latest_magic())) {
752            struct sb_ops sb_ops = {sb_init, sb_get_default_config,
753                                    sb_read_latest, sb_alloc_block,
754                                    sb_bmp_is_writable, sb_get_bmp_revnum,
755                                    sb_get_min_live_revnum, sb_free};
756            filemgr_set_sb_operation(sb_ops);
757            sb_bmp_mask_init();
758        }
759
760        // initialize compaction daemon
761        c_config.sleep_duration = _config.compactor_sleep_duration;
762        c_config.num_threads = _config.num_compactor_threads;
763        compactor_init(&c_config);
764        // initialize background flusher daemon
765        // Temporarily disable background flushers until blockcache contention
766        // issue is resolved.
767        bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
768        bgflusher_init(&bgf_config);
769
770        // Initialize breakpad
771        _dbg_handle_crashes(config->breakpad_minidump_dir);
772
773        fdb_initialized = 1;
774    }
775    spin_unlock(&initial_lock);
776
777    return FDB_RESULT_SUCCESS;
778}
779
780LIBFDB_API
781fdb_config fdb_get_default_config(void) {
782    return get_default_config();
783}
784
785LIBFDB_API
786fdb_kvs_config fdb_get_default_kvs_config(void) {
787    return get_default_kvs_config();
788}
789
790LIBFDB_API
791fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
792                    const char *filename,
793                    fdb_config *fconfig)
794{
795#ifdef _MEMPOOL
796    mempool_init();
797#endif
798
799    fdb_config config;
800    fdb_file_handle *fhandle;
801    fdb_kvs_handle *handle;
802    LATENCY_STAT_START();
803
804    if (fconfig) {
805        if (validate_fdb_config(fconfig)) {
806            config = *fconfig;
807        } else {
808            return FDB_RESULT_INVALID_CONFIG;
809        }
810    } else {
811        config = get_default_config();
812    }
813
814    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
815    if (!fhandle) { // LCOV_EXCL_START
816        return FDB_RESULT_ALLOC_FAIL;
817    } // LCOV_EXCL_STOP
818
819    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
820    if (!handle) { // LCOV_EXCL_START
821        free(fhandle);
822        return FDB_RESULT_ALLOC_FAIL;
823    } // LCOV_EXCL_STOP
824
825#ifndef SPIN_INITIALIZER
826    init_initial_lock_status();
827#endif
828
829    spin_lock(&initial_lock);
830    fdb_open_inprog++;
831    spin_unlock(&initial_lock);
832
833    atomic_init_uint8_t(&handle->handle_busy, 0);
834    handle->shandle = NULL;
835    handle->kvs_config = get_default_kvs_config();
836
837    fdb_status fs = fdb_init(fconfig);
838    if (fs != FDB_RESULT_SUCCESS) {
839        free(handle);
840        free(fhandle);
841        spin_lock(&initial_lock);
842        fdb_open_inprog--;
843        spin_unlock(&initial_lock);
844        return fs;
845    }
846    fdb_file_handle_init(fhandle, handle);
847
848    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
849    if (fs == FDB_RESULT_SUCCESS) {
850        *ptr_fhandle = fhandle;
851        filemgr_fhandle_add(handle->file, fhandle);
852        LATENCY_STAT_END(handle->file, FDB_LATENCY_OPEN);
853    } else {
854        *ptr_fhandle = NULL;
855        free(handle);
856        fdb_file_handle_free(fhandle);
857    }
858    spin_lock(&initial_lock);
859    fdb_open_inprog--;
860    spin_unlock(&initial_lock);
861    return fs;
862}
863
864LIBFDB_API
865fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
866                               const char *filename,
867                               fdb_config *fconfig,
868                               size_t num_functions,
869                               char **kvs_names,
870                               fdb_custom_cmp_variable *functions)
871{
872#ifdef _MEMPOOL
873    mempool_init();
874#endif
875
876    fdb_config config;
877    fdb_file_handle *fhandle;
878    fdb_kvs_handle *handle;
879
880    if (fconfig) {
881        if (validate_fdb_config(fconfig)) {
882            config = *fconfig;
883        } else {
884            return FDB_RESULT_INVALID_CONFIG;
885        }
886    } else {
887        config = get_default_config();
888    }
889
890    if (config.multi_kv_instances == false) {
891        // single KV instance mode does not support customized cmp function
892        return FDB_RESULT_INVALID_CONFIG;
893    }
894
895    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
896    if (!fhandle) { // LCOV_EXCL_START
897        return FDB_RESULT_ALLOC_FAIL;
898    } // LCOV_EXCL_STOP
899
900    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
901    if (!handle) { // LCOV_EXCL_START
902        free(fhandle);
903        return FDB_RESULT_ALLOC_FAIL;
904    } // LCOV_EXCL_STOP
905
906#ifndef SPIN_INITIALIZER
907    init_initial_lock_status();
908#endif
909
910    spin_lock(&initial_lock);
911    fdb_open_inprog++;
912    spin_unlock(&initial_lock);
913
914    atomic_init_uint8_t(&handle->handle_busy, 0);
915    handle->shandle = NULL;
916    handle->kvs_config = get_default_kvs_config();
917
918    fdb_status fs = fdb_init(fconfig);
919    if (fs != FDB_RESULT_SUCCESS) {
920        free(handle);
921        free(fhandle);
922        spin_lock(&initial_lock);
923        fdb_open_inprog--;
924        spin_unlock(&initial_lock);
925        return fs;
926    }
927    fdb_file_handle_init(fhandle, handle);
928
929    // insert kvs_names and functions into fhandle's list
930    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
931                                   kvs_names, functions);
932
933    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
934    if (fs == FDB_RESULT_SUCCESS) {
935        *ptr_fhandle = fhandle;
936        filemgr_fhandle_add(handle->file, fhandle);
937    } else {
938        *ptr_fhandle = NULL;
939        free(handle);
940        fdb_file_handle_free(fhandle);
941    }
942    spin_lock(&initial_lock);
943    fdb_open_inprog--;
944    spin_unlock(&initial_lock);
945    return fs;
946}
947
948fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
949                                  const char *filename,
950                                  fdb_config *fconfig,
951                                  struct list *cmp_func_list)
952{
953#ifdef _MEMPOOL
954    mempool_init();
955#endif
956
957    fdb_file_handle *fhandle;
958    fdb_kvs_handle *handle;
959
960    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
961    if (!fhandle) { // LCOV_EXCL_START
962        return FDB_RESULT_ALLOC_FAIL;
963    } // LCOV_EXCL_STOP
964
965    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
966    if (!handle) { // LCOV_EXCL_START
967        free(fhandle);
968        return FDB_RESULT_ALLOC_FAIL;
969    } // LCOV_EXCL_STOP
970
971    atomic_init_uint8_t(&handle->handle_busy, 0);
972    handle->shandle = NULL;
973
974    fdb_file_handle_init(fhandle, handle);
975    if (cmp_func_list && list_begin(cmp_func_list)) {
976        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
977    }
978    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
979    if (fs == FDB_RESULT_SUCCESS) {
980        *ptr_fhandle = fhandle;
981        filemgr_fhandle_add(handle->file, fhandle);
982    } else {
983        *ptr_fhandle = NULL;
984        free(handle);
985        fdb_file_handle_free(fhandle);
986    }
987    return fs;
988}
989
990LIBFDB_API
991fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
992                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
993{
994#ifdef _MEMPOOL
995    mempool_init();
996#endif
997
998    if (!handle_in || !ptr_handle) {
999        return FDB_RESULT_INVALID_HANDLE;
1000    }
1001
1002    fdb_config config = handle_in->config;
1003    fdb_kvs_config kvs_config = handle_in->kvs_config;
1004    fdb_kvs_id_t kv_id = 0;
1005    fdb_kvs_handle *handle;
1006    fdb_txn *txn = NULL;
1007    fdb_status fs = FDB_RESULT_SUCCESS;
1008    filemgr *file;
1009    file_status_t fstatus = FILE_NORMAL;
1010    struct snap_handle dummy_shandle;
1011    struct _fdb_key_cmp_info cmp_info;
1012    LATENCY_STAT_START();
1013
1014fdb_snapshot_open_start:
1015    if (!handle_in->shandle) {
1016        fdb_check_file_reopen(handle_in, &fstatus);
1017        fdb_sync_db_header(handle_in);
1018        file = handle_in->file;
1019
1020        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
1021            handle_in->seqnum = fdb_kvs_get_seqnum(file,
1022                                                   handle_in->kvs->id);
1023        } else {
1024            handle_in->seqnum = filemgr_get_seqnum(file);
1025        }
1026    } else {
1027        file = handle_in->file;
1028    }
1029
1030    // if the max sequence number seen by this handle is lower than the
1031    // requested snapshot marker, it means the snapshot is not yet visible
1032    // even via the current fdb_kvs_handle
1033    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
1034        return FDB_RESULT_NO_DB_INSTANCE;
1035    }
1036
1037    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1038    if (!handle) { // LCOV_EXCL_START
1039        return FDB_RESULT_ALLOC_FAIL;
1040    } // LCOV_EXCL_STOP
1041
1042    atomic_init_uint8_t(&handle->handle_busy, 0);
1043    handle->log_callback = handle_in->log_callback;
1044    handle->max_seqnum = seqnum;
1045    handle->fhandle = handle_in->fhandle;
1046
1047    config.flags |= FDB_OPEN_FLAG_RDONLY;
1048    // do not perform compaction for snapshot
1049    config.compaction_mode = FDB_COMPACTION_MANUAL;
1050
1051    // If cloning an existing snapshot handle, then rewind indexes
1052    // to its last DB header and point its avl tree to existing snapshot's tree
1053    bool clone_snapshot = false;
1054    if (handle_in->shandle) {
1055        atomic_store_uint64_t(&handle->last_hdr_bid,  // do fast rewind
1056                              atomic_get_uint64_t(&handle_in->last_hdr_bid));
1057        fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1058        if (fs == FDB_RESULT_SUCCESS) {
1059            clone_snapshot = true;
1060            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1061        } else {
1062            fdb_log(&handle_in->log_callback, fs,
1063                    "Warning: Snapshot clone at sequence number %" _F64
1064                    "does not match its snapshot handle %" _F64
1065                    "in file '%s'.", seqnum, handle_in->seqnum,
1066                    handle_in->file->filename);
1067            free(handle);
1068            return fs;
1069        }
1070    }
1071
1072    cmp_info.kvs_config = handle_in->kvs_config;
1073    cmp_info.kvs = handle_in->kvs;
1074
1075    if (!handle->shandle) {
1076        txn = handle_in->fhandle->root->txn;
1077        if (!txn) {
1078            txn = &handle_in->file->global_txn;
1079        }
1080        if (handle_in->kvs) {
1081            kv_id = handle_in->kvs->id;
1082        }
1083        if (seqnum == FDB_SNAPSHOT_INMEM) {
1084            memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1085            // tmp value to denote snapshot & not rollback to _fdb_open
1086            handle->shandle = &dummy_shandle; // dummy
1087        } else {
1088            fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1089                                       &handle->shandle);
1090        }
1091        if (fs != FDB_RESULT_SUCCESS) {
1092            free(handle);
1093            return fs;
1094        }
1095    }
1096
1097    if (handle_in->kvs) {
1098        // sub-handle in multi KV instance mode
1099        if (clone_snapshot) {
1100            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1101        } else {
1102            fs = _fdb_kvs_open(handle_in->kvs->root,
1103                              &config, &kvs_config, file,
1104                              file->filename,
1105                              _fdb_kvs_get_name(handle_in, file),
1106                              handle);
1107        }
1108    } else {
1109        if (clone_snapshot) {
1110            fs = _fdb_clone_snapshot(handle_in, handle);
1111        } else {
1112            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1113        }
1114    }
1115
1116    if (fs == FDB_RESULT_SUCCESS) {
1117        if (seqnum == FDB_SNAPSHOT_INMEM &&
1118            !handle_in->shandle) {
1119            handle->max_seqnum = handle_in->seqnum;
1120
1121            // synchronize dirty root nodes if exist
1122            bid_t dirty_idtree_root = BLK_NOT_FOUND;
1123            bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1124            struct filemgr_dirty_update_node *dirty_update;
1125
1126            dirty_update = filemgr_dirty_update_get_latest(handle->file);
1127            btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1128
1129            if (dirty_update) {
1130                filemgr_dirty_update_get_root(handle->file, dirty_update,
1131                                       &dirty_idtree_root, &dirty_seqtree_root);
1132                _fdb_import_dirty_root(handle, dirty_idtree_root,
1133                                       dirty_seqtree_root);
1134                btreeblk_discard_blocks(handle->bhandle);
1135            }
1136            // Having synced the dirty root, make an in-memory WAL snapshot
1137            // TODO: Re-enable WAL sharing once ready...
1138#ifdef _MVCC_WAL_ENABLE
1139            fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1140                                   &cmp_info, &handle->shandle);
1141#else
1142            fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1143                                       &handle->shandle);
1144            if (fs == FDB_RESULT_SUCCESS) {
1145                fs = wal_copyto_snapshot(file, handle->shandle,
1146                                        (bool)handle_in->kvs);
1147            }
1148            (void)kv_id;
1149#endif // _MVCC_WAL_ENABLE
1150        } else if (clone_snapshot) {
1151            // Snapshot is created on the other snapshot handle
1152
1153            handle->max_seqnum = handle_in->seqnum;
1154
1155            if (seqnum == FDB_SNAPSHOT_INMEM) {
1156                // in-memory snapshot
1157                // Clone dirty root nodes from the source snapshot by incrementing
1158                // their ref counters
1159                handle->trie->root_bid = handle_in->trie->root_bid;
1160                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1161                    if (handle->kvs) {
1162                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1163                    } else {
1164                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1165                    }
1166                }
1167                btreeblk_discard_blocks(handle->bhandle);
1168
1169                // increase ref count for dirty update
1170                struct filemgr_dirty_update_node *dirty_update;
1171                dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1172                filemgr_dirty_update_inc_ref_count(dirty_update);
1173                btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1174            }
1175        }
1176        *ptr_handle = handle;
1177    } else {
1178        *ptr_handle = NULL;
1179        if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1180            wal_snapshot_close(handle->shandle, handle->file);
1181        }
1182        free(handle);
1183        // If compactor thread had finished compaction just before this routine
1184        // calls _fdb_open, then it is possible that the snapshot's DB header
1185        // is only present in the new_file. So we must retry the snapshot
1186        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1187        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1188            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1189                goto fdb_snapshot_open_start;
1190            }
1191        }
1192    }
1193
1194    if (handle_in->shandle) {
1195        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_CLONE);
1196    } else if (seqnum == FDB_SNAPSHOT_INMEM) {
1197        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_INMEM);
1198    } else {
1199        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_DUR);
1200    }
1201    return fs;
1202}
1203
1204static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1205
1206LIBFDB_API
1207fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1208{
1209#ifdef _MEMPOOL
1210    mempool_init();
1211#endif
1212
1213    fdb_config config;
1214    fdb_kvs_handle *handle_in, *handle;
1215    fdb_status fs;
1216    fdb_seqnum_t old_seqnum;
1217
1218    if (!handle_ptr) {
1219        return FDB_RESULT_INVALID_HANDLE;
1220    }
1221
1222    handle_in = *handle_ptr;
1223
1224    if (!handle_in) {
1225        return FDB_RESULT_INVALID_HANDLE;
1226    }
1227
1228    config = handle_in->config;
1229
1230    if (handle_in->kvs) {
1231        return fdb_kvs_rollback(handle_ptr, seqnum);
1232    }
1233
1234    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1235        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1236                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1237                       handle_in->file->filename);
1238    }
1239
1240    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1241        return FDB_RESULT_HANDLE_BUSY;
1242    }
1243
1244    filemgr_mutex_lock(handle_in->file);
1245    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1246    // All transactions should be closed before rollback
1247    if (wal_txn_exists(handle_in->file)) {
1248        filemgr_set_rollback(handle_in->file, 0);
1249        filemgr_mutex_unlock(handle_in->file);
1250        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1251        return FDB_RESULT_FAIL_BY_TRANSACTION;
1252    }
1253
1254    // If compaction is running, wait until it is aborted.
1255    // TODO: Find a better way of waiting for the compaction abortion.
1256    unsigned int sleep_time = 10000; // 10 ms.
1257    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1258    while (fstatus == FILE_COMPACT_OLD) {
1259        filemgr_mutex_unlock(handle_in->file);
1260        decaying_usleep(&sleep_time, 1000000);
1261        filemgr_mutex_lock(handle_in->file);
1262        fstatus = filemgr_get_file_status(handle_in->file);
1263    }
1264    if (fstatus == FILE_REMOVED_PENDING) {
1265        filemgr_mutex_unlock(handle_in->file);
1266        fdb_check_file_reopen(handle_in, NULL);
1267    } else {
1268        filemgr_mutex_unlock(handle_in->file);
1269    }
1270
1271    fdb_sync_db_header(handle_in);
1272
1273    // if the max sequence number seen by this handle is lower than the
1274    // requested snapshot marker, it means the snapshot is not yet visible
1275    // even via the current fdb_kvs_handle
1276    if (seqnum > handle_in->seqnum) {
1277        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1278        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1279        return FDB_RESULT_NO_DB_INSTANCE;
1280    }
1281
1282    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1283    if (!handle) { // LCOV_EXCL_START
1284        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1285        return FDB_RESULT_ALLOC_FAIL;
1286    } // LCOV_EXCL_STOP
1287
1288    atomic_init_uint8_t(&handle->handle_busy, 0);
1289    handle->log_callback = handle_in->log_callback;
1290    handle->fhandle = handle_in->fhandle;
1291    if (seqnum == 0) {
1292        fs = _fdb_reset(handle, handle_in);
1293    } else {
1294        handle->max_seqnum = seqnum;
1295        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1296                       &config);
1297    }
1298
1299    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1300    if (fs == FDB_RESULT_SUCCESS) {
1301        // rollback the file's sequence number
1302        filemgr_mutex_lock(handle_in->file);
1303        old_seqnum = filemgr_get_seqnum(handle_in->file);
1304        filemgr_set_seqnum(handle_in->file, seqnum);
1305        filemgr_mutex_unlock(handle_in->file);
1306
1307        fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1308                !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1309        if (fs == FDB_RESULT_SUCCESS) {
1310            if (handle_in->txn) {
1311                handle->txn = handle_in->txn;
1312                handle_in->txn = NULL;
1313            }
1314            // Close, unlink and free the caller's rollback handle.
1315            _fdb_kvs_close(handle_in);
1316            free(handle_in);
1317            // Link the newly opened handle into the file handle's list
1318            _fdb_kvs_createNLinkKVHandle(handle->fhandle, handle);
1319            handle->max_seqnum = 0;
1320            handle->seqnum = seqnum;
1321            // Set the newly opened rolled-back handle as caller's handle
1322            *handle_ptr = handle;
1323        } else {
1324            // cancel the rolling-back of the sequence number
1325            filemgr_mutex_lock(handle_in->file);
1326            filemgr_set_seqnum(handle_in->file, old_seqnum);
1327            filemgr_mutex_unlock(handle_in->file);
1328            free(handle);
1329            atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1330        }
1331    } else {
1332        free(handle);
1333        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1334    }
1335
1336    return fs;
1337}
1338
1339LIBFDB_API
1340fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1341                            fdb_snapshot_marker_t marker)
1342{
1343#ifdef _MEMPOOL
1344    mempool_init();
1345#endif
1346
1347    fdb_config config;
1348    fdb_kvs_handle *super_handle;
1349    fdb_kvs_handle rhandle;
1350    fdb_kvs_handle *handle = &rhandle;
1351    struct filemgr *file;
1352    fdb_kvs_config kvs_config;
1353    fdb_status fs;
1354    err_log_callback log_callback;
1355    struct kvs_info *kvs;
1356    struct snap_handle shandle; // dummy snap handle
1357
1358    if (!fhandle) {
1359        return FDB_RESULT_INVALID_HANDLE;
1360    }
1361
1362    super_handle = fhandle->root;
1363    kvs = super_handle->kvs;
1364
1365    // fdb_rollback_all cannot be allowed when there are kv store instances
1366    // still open, because we do not have means of invalidating open kv handles
1367    // which may not be present in the rollback point
1368    if (kvs && _fdb_kvs_is_busy(fhandle)) {
1369        return FDB_RESULT_KV_STORE_BUSY;
1370    }
1371    file = super_handle->file;
1372    config = super_handle->config;
1373    kvs_config = super_handle->kvs_config;
1374    log_callback = super_handle->log_callback;
1375
1376    if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1377        return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1378                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1379                       super_handle->file->filename);
1380    }
1381
1382    filemgr_mutex_lock(super_handle->file);
1383    filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1384    // All transactions should be closed before rollback
1385    if (wal_txn_exists(super_handle->file)) {
1386        filemgr_set_rollback(super_handle->file, 0);
1387        filemgr_mutex_unlock(super_handle->file);
1388        return FDB_RESULT_FAIL_BY_TRANSACTION;
1389    }
1390
1391    // If compaction is running, wait until it is aborted.
1392    // TODO: Find a better way of waiting for the compaction abortion.
1393    unsigned int sleep_time = 10000; // 10 ms.
1394    file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1395    while (fstatus == FILE_COMPACT_OLD) {
1396        filemgr_mutex_unlock(super_handle->file);
1397        decaying_usleep(&sleep_time, 1000000);
1398        filemgr_mutex_lock(super_handle->file);
1399        fstatus = filemgr_get_file_status(super_handle->file);
1400    }
1401    if (fstatus == FILE_REMOVED_PENDING) {
1402        filemgr_mutex_unlock(super_handle->file);
1403        fdb_check_file_reopen(super_handle, NULL);
1404    } else {
1405        filemgr_mutex_unlock(super_handle->file);
1406    }
1407
1408    fdb_sync_db_header(super_handle);
1409    // Shutdown WAL discarding entries from all KV Stores..
1410    fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1411    if (fs != FDB_RESULT_SUCCESS) {
1412        return fs;
1413    }
1414
1415    memset(handle, 0, sizeof(fdb_kvs_handle));
1416    memset(&shandle, 0, sizeof(struct snap_handle));
1417    handle->log_callback = log_callback;
1418    handle->fhandle = fhandle;
1419    // Fast rewind on open...
1420    atomic_store_uint64_t(&handle->last_hdr_bid, (bid_t)marker);
1421    handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1422    handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1423    if (kvs) {
1424        fdb_kvs_header_free(file); // KV header will be recreated below.
1425        handle->kvs = kvs; // re-use super_handle's kvs info
1426        handle->kvs_config = kvs_config;
1427    }
1428    handle->config = config;
1429
1430    fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1431
1432    if (handle->config.multi_kv_instances) {
1433        filemgr_mutex_lock(handle->file);
1434        fdb_kvs_header_create(handle->file);
1435        fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1436                            handle->kv_info_offset,
1437                            handle->file->version, false);
1438        filemgr_mutex_unlock(handle->file);
1439    }
1440
1441    filemgr_set_rollback(file, 0); // allow mutations
1442    handle->shandle = NULL; // just a dummy handle never allocated
1443
1444    if (fs == FDB_RESULT_SUCCESS) {
1445        fdb_seqnum_t old_seqnum;
1446        // Restore WAL for all KV instances...
1447        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1448
1449        // rollback the file's sequence number
1450        filemgr_mutex_lock(file);
1451        old_seqnum = filemgr_get_seqnum(file);
1452        filemgr_set_seqnum(file, handle->seqnum);
1453        filemgr_mutex_unlock(file);
1454
1455        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1456                         !(handle->config.durability_opt & FDB_DRB_ASYNC));
1457        if (fs == FDB_RESULT_SUCCESS) {
1458            _fdb_close(super_handle);
1459            *super_handle = *handle;
1460        } else {
1461            filemgr_mutex_lock(file);
1462            filemgr_set_seqnum(file, old_seqnum);
1463            filemgr_mutex_unlock(file);
1464        }
1465    } else { // Rollback failed, restore KV header
1466        fdb_kvs_header_create(file);
1467        fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1468                            super_handle->kv_info_offset,
1469                            ver_get_latest_magic(),
1470                            false);
1471    }
1472
1473    return fs;
1474}
1475
1476static void _fdb_init_file_config(const fdb_config *config,
1477                                  struct filemgr_config *fconfig) {
1478    fconfig->blocksize = config->blocksize;
1479    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1480    fconfig->chunksize = config->chunksize;
1481
1482    fconfig->options = 0x0;
1483    fconfig->seqtree_opt = config->seqtree_opt;
1484
1485    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1486        fconfig->options |= FILEMGR_CREATE;
1487    }
1488    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1489        fconfig->options |= FILEMGR_READONLY;
1490    }
1491    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1492        fconfig->options |= FILEMGR_SYNC;
1493    }
1494
1495    fconfig->flag = 0x0;
1496    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1497        config->buffercache_size) {
1498        fconfig->flag |= _ARCH_O_DIRECT;
1499    }
1500
1501    fconfig->prefetch_duration = config->prefetch_duration;
1502    fconfig->num_wal_shards = config->num_wal_partitions;
1503    fconfig->num_bcache_shards = config->num_bcache_partitions;
1504    fconfig->encryption_key = config->encryption_key;
1505    atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1506                          config->block_reusing_threshold,
1507                          std::memory_order_relaxed);
1508    atomic_store_uint64_t(&fconfig->num_keeping_headers,
1509                          config->num_keeping_headers,
1510                          std::memory_order_relaxed);
1511}
1512
1513fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1514                               fdb_kvs_handle *handle_out)
1515{
1516    fdb_status status;
1517
1518    handle_out->config = handle_in->config;
1519    handle_out->kvs_config = handle_in->kvs_config;
1520    handle_out->fileops = handle_in->fileops;
1521    handle_out->file = handle_in->file;
1522    // Note that the file ref count will be decremented when the cloned snapshot
1523    // is closed through filemgr_close().
1524    filemgr_incr_ref_count(handle_out->file);
1525
1526    bool filename_allocated = false;
1527    if (handle_out->filename) {
1528        handle_out->filename = (char *)realloc(handle_out->filename,
1529                                               strlen(handle_in->filename)+1);
1530    } else {
1531        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1532        filename_allocated = true;
1533    }
1534    strcpy(handle_out->filename, handle_in->filename);
1535
1536    // initialize the docio handle.
1537    handle_out->dhandle = (struct docio_handle *)
1538        calloc(1, sizeof(struct docio_handle));
1539    handle_out->dhandle->log_callback = &handle_out->log_callback;
1540    status = docio_init(handle_out->dhandle, handle_out->file,
1541                        handle_out->config.compress_document_body);
1542    if (status != FDB_RESULT_SUCCESS) {
1543        free(handle_out->dhandle);
1544        if (filename_allocated) {
1545            free(handle_out->filename);
1546        }
1547        return status;
1548    }
1549
1550    // initialize the btree block handle.
1551    handle_out->btreeblkops = btreeblk_get_ops();
1552    handle_out->bhandle = (struct btreeblk_handle *)
1553        calloc(1, sizeof(struct btreeblk_handle));
1554    handle_out->bhandle->log_callback = &handle_out->log_callback;
1555    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1556
1557    handle_out->dirty_updates = handle_in->dirty_updates;
1558    atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1559    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1560    handle_out->kv_info_offset = handle_in->kv_info_offset;
1561    handle_out->op_stats = handle_in->op_stats;
1562
1563    // initialize the trie handle
1564    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1565    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1566                handle_out->file->blocksize,
1567                handle_in->trie->root_bid, // Source snapshot's trie root bid
1568                (void *)handle_out->bhandle, handle_out->btreeblkops,
1569                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1570    // set aux for cmp wrapping function
1571    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1572    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1573
1574    if (handle_out->kvs) {
1575        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1576    }
1577
1578    handle_out->seqnum = handle_in->seqnum;
1579    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1580        if (handle_out->config.multi_kv_instances) {
1581            // multi KV instance mode .. HB+trie
1582            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1583            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1584                        handle_out->file->blocksize,
1585                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1586                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1587                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1588
1589        } else {
1590            // single KV instance mode .. normal B+tree
1591            struct btree_kv_ops *seq_kv_ops =
1592                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1593            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1594            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1595
1596            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1597            // Init the seq tree using the root bid of the source snapshot.
1598            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1599                                handle_out->btreeblkops, seq_kv_ops,
1600                                handle_out->config.blocksize,
1601                                handle_in->seqtree->root_bid);
1602        }
1603    } else{
1604        handle_out->seqtree = NULL;
1605    }
1606
1607    status = btreeblk_end(handle_out->bhandle);
1608    if (status != FDB_RESULT_SUCCESS) {
1609        const char *msg = "Snapshot clone operation fails due to the errors in "
1610            "btreeblk_end() in a database file '%s'\n";
1611        fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1612    }
1613
1614    return status;
1615}
1616
1617fdb_status _fdb_open(fdb_kvs_handle *handle,
1618                     const char *filename,
1619                     fdb_filename_mode_t filename_mode,
1620                     const fdb_config *config)
1621{
1622    struct filemgr_config fconfig;
1623    struct kvs_stat stat, empty_stat;
1624    bid_t trie_root_bid = BLK_NOT_FOUND;
1625    bid_t seq_root_bid = BLK_NOT_FOUND;
1626    bid_t stale_root_bid = BLK_NOT_FOUND;
1627    fdb_seqnum_t seqnum = 0;
1628    filemgr_header_revnum_t header_revnum = 0;
1629    filemgr_header_revnum_t latest_header_revnum = 0;
1630    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1631    uint64_t ndocs = 0;
1632    uint64_t ndeletes = 0;
1633    uint64_t datasize = 0;
1634    uint64_t deltasize = 0;
1635    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1636    uint64_t kv_info_offset = BLK_NOT_FOUND;
1637    uint64_t version;
1638    uint64_t header_flags = 0;
1639    uint8_t header_buf[FDB_BLOCKSIZE];
1640    char *compacted_filename = NULL;
1641    char *prev_filename = NULL;
1642    size_t header_len = 0;
1643    bool multi_kv_instances = config->multi_kv_instances;
1644
1645    uint64_t nlivenodes = 0;
1646    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1647    char actual_filename[FDB_MAX_FILENAME_LEN];
1648    char virtual_filename[FDB_MAX_FILENAME_LEN];
1649    char *target_filename = NULL;
1650    fdb_status status;
1651
1652    if (filename == NULL) {
1653        return FDB_RESULT_INVALID_ARGS;
1654    }
1655    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1656        // filename (including path) length is supported up to
1657        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1658        return FDB_RESULT_TOO_LONG_FILENAME;
1659    }
1660
1661    if (filename_mode == FDB_VFILENAME &&
1662        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1663        return FDB_RESULT_INVALID_COMPACTION_MODE;
1664    }
1665
1666    _fdb_init_file_config(config, &fconfig);
1667
1668    if (filename_mode == FDB_VFILENAME) {
1669        compactor_get_actual_filename(filename, actual_filename,
1670                                      config->compaction_mode, &handle->log_callback);
1671    } else {
1672        strcpy(actual_filename, filename);
1673    }
1674
1675    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1676         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1677          filename_mode == FDB_VFILENAME) ) {
1678        // 1) manual compaction mode, OR
1679        // 2) auto compaction mode + 'filename' is virtual filename
1680        // -> copy 'filename'
1681        target_filename = (char *)filename;
1682    } else {
1683        // otherwise (auto compaction mode + 'filename' is actual filename)
1684        // -> copy 'virtual_filename'
1685        compactor_get_virtual_filename(filename, virtual_filename);
1686        target_filename = virtual_filename;
1687    }
1688
1689    // If the user is requesting legacy CRC pass that down to filemgr
1690    if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1691        fconfig.options |= FILEMGR_CREATE_CRC32;
1692    }
1693
1694    handle->fileops = get_filemgr_ops();
1695    filemgr_open_result result = filemgr_open((char *)actual_filename,
1696                                              handle->fileops,
1697                                              &fconfig, &handle->log_callback);
1698    if (result.rv != FDB_RESULT_SUCCESS) {
1699        return (fdb_status) result.rv;
1700    }
1701    handle->file = result.file;
1702
1703    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1704        strcmp(filename, actual_filename)) {
1705        // It is in-place compacted file if
1706        // 1) compaction mode is manual, and
1707        // 2) actual filename is different to the filename given by user.
1708        // In this case, set the in-place compaction flag.
1709        filemgr_set_in_place_compaction(handle->file, true);
1710    }
1711    if (filemgr_is_in_place_compaction_set(handle->file)) {
1712        // This file was in-place compacted.
1713        // set 'handle->filename' to the original filename to trigger file renaming
1714        compactor_get_virtual_filename(filename, virtual_filename);
1715        target_filename = virtual_filename;
1716    }
1717
1718    if (handle->filename) {
1719        handle->filename = (char *)realloc(handle->filename,
1720                                           strlen(target_filename)+1);
1721    } else {
1722        handle->filename = (char*)malloc(strlen(target_filename)+1);
1723    }
1724    strcpy(handle->filename, target_filename);
1725
1726    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1727    // set handle->last_hdr_bid to the block id of required header, so rewind..
1728    bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
1729    if (handle->shandle && last_hdr_bid) {
1730        status = filemgr_fetch_header(handle->file, last_hdr_bid,
1731                                      header_buf, &header_len, &seqnum,
1732                                      &latest_header_revnum, &deltasize, &version,
1733                                      NULL, &handle->log_callback);
1734        if (status != FDB_RESULT_SUCCESS) {
1735            free(handle->filename);
1736            handle->filename = NULL;
1737            filemgr_close(handle->file, false, handle->filename,
1738                              &handle->log_callback);
1739            return status;
1740        }
1741    } else { // Normal open
1742        filemgr_get_header(handle->file, header_buf, &header_len,
1743                           &last_hdr_bid, &seqnum, &latest_header_revnum);
1744        atomic_store_uint64_t(&handle->last_hdr_bid, last_hdr_bid);
1745        version = handle->file->version;
1746    }
1747
1748    // initialize the docio handle so kv headers may be read
1749    handle->dhandle = (struct docio_handle *)
1750                      calloc(1, sizeof(struct docio_handle));
1751    handle->dhandle->log_callback = &handle->log_callback;
1752    status = docio_init(handle->dhandle, handle->file,
1753                        config->compress_document_body);
1754    if (status != FDB_RESULT_SUCCESS) {
1755        free(handle->dhandle);
1756        free(handle->filename);
1757        handle->filename = NULL;
1758        filemgr_close(handle->file, false, handle->filename,
1759                          &handle->log_callback);
1760        return status;
1761    }
1762
1763    // fetch previous superblock bitmap info if exists
1764    // (this should be done after 'handle->dhandle' is initialized)
1765    if (handle->file->sb) {
1766        status = sb_bmp_fetch_doc(handle);
1767        if (status != FDB_RESULT_SUCCESS) {
1768            docio_free(handle->dhandle);
1769            free(handle->dhandle);
1770            free(handle->filename);
1771            handle->filename = NULL;
1772            filemgr_close(handle->file, false, handle->filename,
1773                              &handle->log_callback);
1774            return status;
1775        }
1776    }
1777
1778
1779    if (header_len > 0) {
1780        fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1781                         &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1782                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1783                         &header_flags, &compacted_filename, &prev_filename);
1784        // use existing setting for seqtree_opt
1785        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1786            seqtree_opt = FDB_SEQTREE_USE;
1787        } else {
1788            seqtree_opt = FDB_SEQTREE_NOT_USE;
1789        }
1790        // Retrieve seqnum for multi-kv mode
1791        if (handle->kvs && handle->kvs->id > 0) {
1792            if (kv_info_offset != BLK_NOT_FOUND) {
1793                if (!filemgr_get_kv_header(handle->file)) {
1794                    struct kvs_header *kv_header;
1795                    _fdb_kvs_header_create(&kv_header);
1796                    // KV header already exists but not loaded .. read & import
1797                    fdb_kvs_header_read(kv_header, handle->dhandle,
1798                                        kv_info_offset, version, false);
1799                    if (!filemgr_set_kv_header(handle->file, kv_header,
1800                                               fdb_kvs_header_free)) {
1801                        _fdb_kvs_header_free(kv_header);
1802                    }
1803                }
1804                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1805                                             handle->kvs->id);
1806            } else { // no kv_info offset, ok to set seqnum to zero
1807                seqnum = 0;
1808            }
1809        }
1810        // other flags
1811        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1812            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1813        }
1814        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1815            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1816        }
1817        if (header_flags & FDB_FLAG_SUCCESSFULLY_COMPACTED) {
1818            filemgr_set_successfully_compacted(handle->file);
1819        }
1820        // use existing setting for multi KV instance mode
1821        if (kv_info_offset == BLK_NOT_FOUND) {
1822            multi_kv_instances = false;
1823        } else {
1824            multi_kv_instances = true;
1825        }
1826    }
1827
1828    handle->config = *config;
1829    handle->config.seqtree_opt = seqtree_opt;
1830    handle->config.multi_kv_instances = multi_kv_instances;
1831
1832    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1833        // Either an in-memory snapshot or cloning from an existing snapshot..
1834        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1835                     // *_open() should have already restored it
1836    } else { // Persisted snapshot or file rollback..
1837
1838        // get the BID of the latest block
1839        // (it is OK if the block is not a DB header)
1840        bool dirty_data_exists = false;
1841        struct superblock *sb = handle->file->sb;
1842
1843        if (sb_bmp_exists(sb)) {
1844            dirty_data_exists = false;
1845            bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1846            if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1847                // add 1 since we subtract 1 from 'hdr_bid' below soon
1848                hdr_bid = sb_last_hdr_bid + 1;
1849                if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1850                    // seq number has been increased since the last commit
1851                    seqnum = fdb_kvs_get_committed_seqnum(handle);
1852                }
1853            } else {
1854                hdr_bid = BLK_NOT_FOUND;
1855            }
1856        } else {
1857            hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1858            dirty_data_exists = (hdr_bid >
1859                        atomic_get_uint64_t(&handle->last_hdr_bid));
1860        }
1861
1862        if (hdr_bid == BLK_NOT_FOUND ||
1863            (sb && hdr_bid <= sb->config->num_sb)) {
1864            hdr_bid = 0;
1865        } else if (hdr_bid > 0) {
1866            --hdr_bid;
1867        }
1868
1869        if (handle->max_seqnum) {
1870            struct kvs_stat stat_ori;
1871            // backup original stats
1872            if (handle->kvs) {
1873                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1874            } else {
1875                _kvs_stat_get(handle->file, 0, &stat_ori);
1876            }
1877
1878            if (dirty_data_exists){
1879                // uncommitted data exists beyond the last DB header
1880                // get the last committed seq number
1881                fdb_seqnum_t seq_commit;
1882                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1883                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1884                    // In case, snapshot_open is attempted with latest uncommitted
1885                    // sequence number
1886                    header_len = 0;
1887                } else if (seq_commit == handle->max_seqnum) {
1888                    // snapshot/rollback on the latest commit header
1889                    seqnum = seq_commit; // skip file reverse scan
1890                }
1891                hdr_bid = filemgr_get_header_bid(handle->file);
1892            }
1893            // Reverse scan the file to locate the DB header with seqnum marker
1894            header_revnum = latest_header_revnum;
1895            while (header_len && seqnum != handle->max_seqnum) {
1896                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1897                                          header_buf, &header_len, &seqnum,
1898                                          &header_revnum, NULL, &version, NULL,
1899                                          &handle->log_callback);
1900                if (header_len == 0) {
1901                    continue; // header doesn't exist
1902                }
1903                fdb_fetch_header(version, header_buf, &trie_root_bid,
1904                                 &seq_root_bid, &stale_root_bid,
1905                                 &ndocs, &ndeletes, &nlivenodes,
1906                                 &datasize, &last_wal_flush_hdr_bid,
1907                                 &kv_info_offset, &header_flags,
1908                                 &compacted_filename, NULL);
1909                atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
1910
1911                if (!handle->kvs || handle->kvs->id == 0) {
1912                    // single KVS mode OR default KVS
1913                    if (!handle->shandle) {
1914                        // rollback
1915                        struct kvs_stat stat_dst;
1916                        _kvs_stat_get(handle->file, 0, &stat_dst);
1917                        stat_dst.ndocs = ndocs;
1918                        stat_dst.ndeletes = ndeletes;
1919                        stat_dst.datasize = datasize;
1920                        stat_dst.nlivenodes = nlivenodes;
1921                        stat_dst.deltasize = deltasize;
1922                        _kvs_stat_set(handle->file, 0, stat_dst);
1923                    }
1924                    continue;
1925                }
1926
1927                int64_t doc_offset;
1928                struct kvs_header *kv_header;
1929                struct docio_object doc;
1930
1931                _fdb_kvs_header_create(&kv_header);
1932                memset(&doc, 0, sizeof(struct docio_object));
1933                doc_offset = docio_read_doc(handle->dhandle,
1934                                            kv_info_offset, &doc, true);
1935
1936                if (doc_offset <= 0) {
1937                    header_len = 0; // fail
1938                    _fdb_kvs_header_free(kv_header);
1939                } else {
1940                    _fdb_kvs_header_import(kv_header, doc.body,
1941                                           doc.length.bodylen, version, false);
1942                    // get local sequence number for the KV instance
1943                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1944                                                 handle->kvs->id);
1945                    if (!handle->shandle) {
1946                        // rollback: replace kv_header stats
1947                        // read from the current header's kv_header
1948                        struct kvs_stat stat_src, stat_dst;
1949                        _kvs_stat_get_kv_header(kv_header,
1950                                                handle->kvs->id,
1951                                                &stat_src);
1952                        _kvs_stat_get(handle->file,
1953                                      handle->kvs->id,
1954                                      &stat_dst);
1955                        // update ndocs, datasize, nlivenodes
1956                        // into the current file's kv_header
1957                        // Note: stats related to WAL should not be updated
1958                        //       at this time. They will be adjusted through
1959                        //       discard & restore routines below.
1960                        stat_dst.ndocs = stat_src.ndocs;
1961                        stat_dst.datasize = stat_src.datasize;
1962                        stat_dst.nlivenodes = stat_src.nlivenodes;
1963                        _kvs_stat_set(handle->file,
1964                                      handle->kvs->id,
1965                                      stat_dst);
1966                    }
1967                    _fdb_kvs_header_free(kv_header);
1968                    free_docio_object(&doc, 1, 1, 1);
1969                }
1970            }
1971
1972            if (header_len && // header exists
1973                config->block_reusing_threshold > 0 && // block reuse is enabled
1974                config->block_reusing_threshold < 100 &&
1975                header_revnum < sb_get_min_live_revnum(handle->file)) {
1976                // cannot perform rollback/snapshot beyond the last live header
1977                header_len = 0;
1978            }
1979
1980            if (!header_len) { // Marker MUST match that of DB commit!
1981                // rollback original stats
1982                if (handle->kvs) {
1983                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1984                } else {
1985                    _kvs_stat_get(handle->file, 0, &stat_ori);
1986                }
1987
1988                docio_free(handle->dhandle);
1989                free(handle->dhandle);
1990                free(handle->filename);
1991                free(prev_filename);
1992                handle->filename = NULL;
1993                filemgr_close(handle->file, false, handle->filename,
1994                              &handle->log_callback);
1995                return FDB_RESULT_NO_DB_INSTANCE;
1996            }
1997
1998            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1999                if (handle->config.multi_kv_instances) {
2000                    // multi KV instance mode
2001                    // clear only WAL items belonging to the instance
2002                    wal_close_kv_ins(handle->file,
2003                                     (handle->kvs)?(handle->kvs->id):(0),
2004                                     &handle->log_callback);
2005                } else {
2006                    wal_shutdown(handle->file, &handle->log_callback);
2007                }
2008            }
2009        } else { // snapshot to sequence number 0 requested..
2010            if (handle->shandle) { // fdb_snapshot_open API call
2011                if (seqnum) {
2012                    // Database currently has a non-zero seq number,
2013                    // but the snapshot was requested with a seq number zero.
2014                    docio_free(handle->dhandle);
2015                    free(handle->dhandle);
2016                    free(handle->filename);
2017                    free(prev_filename);
2018                    handle->filename = NULL;
2019                    filemgr_close(handle->file, false, handle->filename,
2020                                  &handle->log_callback);
2021                    return FDB_RESULT_NO_DB_INSTANCE;
2022                }
2023            } // end of zero max_seqnum but non-rollback check
2024        } // end of zero max_seqnum check
2025    } // end of durable snapshot locating
2026
2027    handle->btreeblkops = btreeblk_get_ops();
2028    handle->bhandle = (struct btreeblk_handle *)
2029                      calloc(1, sizeof(struct btreeblk_handle));
2030    handle->bhandle->log_callback = &handle->log_callback;
2031
2032    handle->dirty_updates = 0;
2033
2034    if (handle->config.compaction_buf_maxsize == 0) {
2035        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
2036    }
2037
2038    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
2039
2040    handle->cur_header_revnum = latest_header_revnum;
2041    if (header_revnum) {
2042        if (filemgr_is_rollback_on(handle->file)) {
2043            // rollback mode
2044            // set rollback header revnum
2045            handle->rollback_revnum = header_revnum;
2046        } else {
2047            // snapshot mode (only for snapshot)
2048            handle->cur_header_revnum = header_revnum;
2049        }
2050    }
2051    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
2052
2053    memset(&empty_stat, 0x0, sizeof(empty_stat));
2054    _kvs_stat_get(handle->file, 0, &stat);
2055    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
2056        // sync (default) KVS stat with DB header
2057        stat.nlivenodes = nlivenodes;
2058        stat.ndocs = ndocs;
2059        stat.datasize = datasize;
2060        _kvs_stat_set(handle->file, 0, stat);
2061    }
2062
2063    handle->kv_info_offset = kv_info_offset;
2064    if (handle->config.multi_kv_instances && !handle->shandle) {
2065        // multi KV instance mode
2066        filemgr_mutex_lock(handle->file);
2067        if (kv_info_offset == BLK_NOT_FOUND) {
2068            // there is no KV header .. create & initialize
2069            fdb_kvs_header_create(handle->file);
2070            // TODO: If another handle is opened before the first header is appended,
2071            // an unnecessary KV info doc is appended. We need to address it.
2072            kv_info_offset = fdb_kvs_header_append(handle);
2073        } else if (handle->file->kv_header == NULL) {
2074            // KV header already exists but not loaded .. read & import
2075            fdb_kvs_header_create(handle->file);
2076            fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
2077                                kv_info_offset, version, false);
2078        }
2079        filemgr_mutex_unlock(handle->file);
2080
2081        // validation check for key order of all KV stores
2082        if (handle == handle->fhandle->root) {
2083            fdb_status fs = fdb_kvs_cmp_check(handle);
2084            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
2085                docio_free(handle->dhandle);
2086                free(handle->dhandle);
2087                btreeblk_free(handle->bhandle);
2088                free(handle->bhandle);
2089                free(handle->filename);
2090                handle->filename = NULL;
2091                filemgr_close(handle->file, false, handle->filename,
2092                              &handle->log_callback);
2093                return fs;
2094            }
2095        }
2096    }
2097    handle->kv_info_offset = kv_info_offset;
2098
2099    if (handle->kv_info_offset != BLK_NOT_FOUND &&
2100        handle->kvs == NULL) {
2101        // multi KV instance mode .. turn on config flag
2102        handle->config.multi_kv_instances = true;
2103        // only super handle can be opened using fdb_open(...)
2104        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2105    }
2106
2107    if (handle->shandle) { // Populate snapshot stats..
2108        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2109            memset(&handle->shandle->stat, 0x0,
2110                    sizeof(handle->shandle->stat));
2111            handle->shandle->stat.ndocs = ndocs;
2112            handle->shandle->stat.datasize = datasize;
2113            handle->shandle->stat.nlivenodes = nlivenodes;
2114        } else { // Multi KV instance mode, populate specific kv stats
2115            memset(&handle->shandle->stat, 0x0,
2116                    sizeof(handle->shandle->stat));
2117            _kvs_stat_get(handle->file, handle->kvs->id,
2118                    &handle->shandle->stat);
2119            // Since wal is restored below, we have to reset
2120            // wal stats to zero.
2121            handle->shandle->stat.wal_ndeletes = 0;
2122            handle->shandle->stat.wal_ndocs = 0;
2123        }
2124    }
2125
2126    // initialize pointer to the global operational stats of this KV store
2127    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2128    if (!handle->op_stats) {
2129        const char *msg = "Database open fails due to the error in retrieving "
2130            "the global operational stats of KV store in a database file '%s'\n";
2131        fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2132                handle->file->filename);
2133        return FDB_RESULT_OPEN_FAIL;
2134    }
2135
2136    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2137    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2138                handle->file->blocksize, trie_root_bid,
2139                (void *)handle->bhandle, handle->btreeblkops,
2140                (void *)handle->dhandle, _fdb_readkey_wrap);
2141    // set aux for cmp wrapping function
2142    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2143    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2144
2145    if (handle->kvs) {
2146        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2147    }
2148
2149    handle->seqnum = seqnum;
2150    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2151        if (handle->config.multi_kv_instances) {
2152            // multi KV instance mode .. HB+trie
2153            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2154            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2155                        handle->file->blocksize, seq_root_bid,
2156                        (void *)handle->bhandle, handle->btreeblkops,
2157                        (void *)handle->dhandle, _fdb_readseq_wrap);
2158
2159        } else {
2160            // single KV instance mode .. normal B+tree
2161            struct btree_kv_ops *seq_kv_ops =
2162                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2163            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2164            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2165
2166            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2167            if (seq_root_bid == BLK_NOT_FOUND) {
2168                btree_init(handle->seqtree, (void *)handle->bhandle,
2169                           handle->btreeblkops, seq_kv_ops,
2170                           handle->config.blocksize, sizeof(fdb_seqnum_t),
2171                           OFFSET_SIZE, 0x0, NULL);
2172            }else{
2173                if (btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2174                                    handle->btreeblkops, seq_kv_ops,
2175                                        handle->config.blocksize, seq_root_bid) != BTREE_RESULT_SUCCESS){
2176                    _fdb_invalidate_dbheader(handle);
2177                    free(handle->dhandle);
2178                    free(handle->filename);
2179                    handle->filename = NULL;
2180                    filemgr_close(handle->file, false, handle->filename,
2181                                  &handle->log_callback);
2182                    return FDB_RECOVERABLE_ERR;
2183                }
2184            }
2185        }
2186    }else{
2187        handle->seqtree = NULL;
2188    }
2189
2190    // Stale-block tree (supported since MAGIC_002)
2191    // this tree is independent to multi/single KVS mode option
2192    if (ver_staletree_support(handle->file->version)) {
2193        // normal B+tree
2194        struct btree_kv_ops *stale_kv_ops =
2195            (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2196        stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2197        stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2198
2199        handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2200        if (stale_root_bid == BLK_NOT_FOUND) {
2201            btree_init(handle->staletree, (void *)handle->bhandle,
2202                       handle->btreeblkops, stale_kv_ops,
2203                       handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2204                       OFFSET_SIZE, 0x0, NULL);
2205         }else{
2206            if (btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2207                                handle->btreeblkops, stale_kv_ops,
2208                                    handle->config.blocksize, stale_root_bid) != BTREE_RESULT_SUCCESS){
2209                _fdb_invalidate_dbheader(handle);
2210                free(handle->dhandle);
2211                free(handle->filename);
2212                handle->filename = NULL;
2213                filemgr_close(handle->file, false, handle->filename,
2214                              &handle->log_callback);
2215                return FDB_RECOVERABLE_ERR;
2216            }
2217            // prefetch stale info into memory
2218            fdb_load_inmem_stale_info(handle);
2219         }
2220    } else {
2221        handle->staletree = NULL;
2222    }
2223
2224    if (handle->config.multi_kv_instances && handle->max_seqnum) {
2225        // restore only docs belonging to the KV instance
2226        // handle->kvs should not be NULL
2227        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2228                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2229    } else {
2230        // normal restore
2231        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2232    }
2233
2234    if (compacted_filename &&
2235        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2236        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2237        status = _fdb_recover_compaction(handle, compacted_filename);
2238        if (status == FDB_RESULT_FAIL_BY_COMPACTION) {
2239            // recovery would have unlinked the previous file
2240            free(prev_filename);
2241            prev_filename = NULL;
2242        }
2243        // Either
2244        // 1. recovered the newly compacted file and deleted the old file or
2245        // 2. recovery failed and are going to stick to the old file or
2246        // In both cases, the old_filename and new_filename are not needed.
2247        if (handle->file){
2248            handle->file->old_filename =  NULL;
2249            handle->file->new_filename =  NULL;
2250        }
2251    }
2252
2253    if (prev_filename) {
2254        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2255            // record the old filename into the file handle of current file
2256            // and REMOVE old file on the first open
2257            // WARNING: snapshots must have been opened before this call
2258            if (filemgr_update_file_linkage(handle->file, prev_filename, NULL)) {
2259                // Open the old file with read-only mode.
2260                // (Temporarily disable log callback at this time since
2261                //  the old file might be already removed.)
2262                err_log_callback dummy_cb;
2263                dummy_cb.callback = fdb_dummy_log_callback;
2264                dummy_cb.ctx_data = NULL;
2265                fconfig.options = FILEMGR_READONLY;
2266                filemgr_open_result result = filemgr_open(prev_filename,
2267                                                          handle->fileops,
2268                                                          &fconfig,
2269                                                          &dummy_cb);
2270                if (result.file) {
2271                    filemgr_remove_pending(result.file, handle->file,
2272                                           &handle->log_callback);
2273                    filemgr_close(result.file, 0, handle->filename,
2274                                  &handle->log_callback);
2275                }
2276            }
2277        }
2278        // we allocated a memory region for file->old_filename and
2279        // prev_filename would be copied to there,
2280        // so it is OK to free it here whatever the result is.
2281        free(prev_filename);
2282    }
2283
2284    status = btreeblk_end(handle->bhandle);
2285    if (status != FDB_RESULT_SUCCESS) {
2286        // When fdb_kvs_open() is being issued in parallel with fdb_open()
2287        // it is possible that this call (fdb_open()) hits a write failure
2288        // because the btreeblock to be written was already made immutable
2289        // by the commit from the fdb_kvs_open(). Simply ignore this error case.
2290        if (status == FDB_RESULT_WRITE_FAIL) {
2291            if (filemgr_get_header_revnum(handle->file)
2292                                             == latest_header_revnum) {
2293                return status;
2294            } else {
2295                status = FDB_RESULT_SUCCESS;
2296            }
2297        } else {
2298            return status;
2299        }
2300    }
2301
2302    // do not register read-only handles
2303    if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2304        if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2305            status = compactor_register_file(handle->file,
2306                                             (fdb_config *)config,
2307                                             &handle->log_callback);
2308        }
2309        if (status == FDB_RESULT_SUCCESS) {
2310            status = bgflusher_register_file(handle->file,
2311                                             (fdb_config *)config,
2312                                             &handle->log_callback);
2313        }
2314    }
2315
2316    return status;
2317}
2318
2319LIBFDB_API
2320fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2321                                fdb_log_callback log_callback,
2322                                void *ctx_data)
2323{
2324    if (!handle) {
2325        return FDB_RESULT_INVALID_HANDLE;
2326    }
2327
2328    handle->log_callback.callback = log_callback;
2329    handle->log_callback.ctx_data = ctx_data;
2330    return FDB_RESULT_SUCCESS;
2331}
2332
2333LIBFDB_API
2334void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
2335{
2336    fatal_error_callback = err_callback;
2337}
2338
2339LIBFDB_API
2340fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2341                          const void *meta, size_t metalen,
2342                          const void *body, size_t bodylen)
2343{
2344    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2345        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2346        return FDB_RESULT_INVALID_ARGS;
2347    }
2348
2349    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2350    if (*doc == NULL) { // LCOV_EXCL_START
2351        return FDB_RESULT_ALLOC_FAIL;
2352    } // LCOV_EXCL_STOP
2353
2354    (*doc)->seqnum = SEQNUM_NOT_USED;
2355
2356    if (key && keylen > 0) {
2357        (*doc)->key = (void *)malloc(keylen);
2358        if ((*doc)->key == NULL) { // LCOV_EXCL_START
2359            return FDB_RESULT_ALLOC_FAIL;
2360        } // LCOV_EXCL_STOP
2361        memcpy((*doc)->key, key, keylen);
2362        (*doc)->keylen = keylen;
2363    } else {
2364        (*doc)->key = NULL;
2365        (*doc)->keylen = 0;
2366    }
2367
2368    if (meta && metalen > 0) {
2369        (*doc)->meta = (void *)malloc(metalen);
2370        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2371            return FDB_RESULT_ALLOC_FAIL;
2372        } // LCOV_EXCL_STOP
2373        memcpy((*doc)->meta, meta, metalen);
2374        (*doc)->metalen = metalen;
2375    } else {
2376        (*doc)->meta = NULL;
2377        (*doc)->metalen = 0;
2378    }
2379
2380    if (body && bodylen > 0) {
2381        (*doc)->body = (void *)malloc(bodylen);
2382        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2383            return FDB_RESULT_ALLOC_FAIL;
2384        } // LCOV_EXCL_STOP
2385        memcpy((*doc)->body, body, bodylen);
2386        (*doc)->bodylen = bodylen;
2387    } else {
2388        (*doc)->body = NULL;
2389        (*doc)->bodylen = 0;
2390    }
2391
2392    return FDB_RESULT_SUCCESS;
2393}
2394
2395LIBFDB_API
2396fdb_status fdb_doc_update(fdb_doc **doc,
2397                          const void *meta, size_t metalen,
2398                          const void *body, size_t bodylen)
2399{
2400    if (doc == NULL ||
2401        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2402        return FDB_RESULT_INVALID_ARGS;
2403    }
2404    if (*doc == NULL) {
2405        return FDB_RESULT_INVALID_ARGS;
2406    }
2407
2408    if (meta && metalen > 0) {
2409        // free previous metadata
2410        free((*doc)->meta);
2411        // allocate new metadata
2412        (*doc)->meta = (void *)malloc(metalen);
2413        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2414            return FDB_RESULT_ALLOC_FAIL;
2415        } // LCOV_EXCL_STOP
2416        memcpy((*doc)->meta, meta, metalen);
2417        (*doc)->metalen = metalen;
2418    }
2419
2420    if (body && bodylen > 0) {
2421        // free previous body
2422        free((*doc)->body);
2423        // allocate new body
2424        (*doc)->body = (void *)malloc(bodylen);
2425        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2426            return FDB_RESULT_ALLOC_FAIL;
2427        } // LCOV_EXCL_STOP
2428        memcpy((*doc)->body, body, bodylen);
2429        (*doc)->bodylen = bodylen;
2430    }
2431
2432    (*doc)->seqnum = SEQNUM_NOT_USED;
2433    return FDB_RESULT_SUCCESS;
2434}
2435
2436LIBFDB_API
2437void fdb_doc_set_seqnum(fdb_doc *doc,
2438                        const fdb_seqnum_t seqnum)
2439{
2440    if (doc) {
2441        doc->seqnum = seqnum;
2442        if (seqnum != SEQNUM_NOT_USED) {
2443            doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2444        } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2445            doc->flags &= ~FDB_CUSTOM_SEQNUM;
2446        }
2447    }
2448}
2449
2450// doc MUST BE allocated by malloc
2451LIBFDB_API
2452fdb_status fdb_doc_free(fdb_doc *doc)
2453{
2454    if (doc) {
2455        free(doc->key);
2456        free(doc->meta);
2457        free(doc->body);
2458        free(doc);
2459    }
2460    return FDB_RESULT_SUCCESS;
2461}
2462
2463INLINE fdb_status _fdb_wal_get_old_offset(void *voidhandle,
2464                                        struct wal_item *item,
2465                                        uint64_t *ret_old_offset)
2466{
2467    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2468    uint64_t old_offset = 0;
2469
2470    if (item->action == WAL_ACT_REMOVE) {
2471        // For immediate remove, old_offset value is critical
2472        // so that we should get an exact value.
2473        if (hbtrie_find(handle->trie,
2474                    item->header->key,
2475                    item->header->keylen,
2476                        (void*)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2477            _fdb_invalidate_dbheader(handle);
2478            return FDB_RECOVERABLE_ERR;
2479        }
2480    } else {
2481        if (hbtrie_find_offset(handle->trie,
2482                           item->header->key,
2483                           item->header->keylen,
2484                               (void*)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2485            _fdb_invalidate_dbheader(handle);
2486            return FDB_RECOVERABLE_ERR;
2487        }
2488    }
2489    btreeblk_end(handle->bhandle);
2490    *ret_old_offset = _endian_decode(old_offset);
2491
2492    return FDB_RESULT_SUCCESS;
2493}
2494
2495// A stale sequence number entry that can be purged from the sequence tree
2496// during the WAL flush.
2497struct wal_stale_seq_entry {
2498    fdb_kvs_id_t kv_id;
2499    fdb_seqnum_t seqnum;
2500    struct avl_node avl_entry;
2501};
2502
2503// Delta changes in KV store stats during the WAL flush
2504struct wal_kvs_delta_stat {
2505    fdb_kvs_id_t kv_id;
2506    int64_t nlivenodes;
2507    int64_t ndocs;
2508    int64_t ndeletes;
2509    int64_t datasize;
2510    int64_t deltasize;
2511    struct avl_node avl_entry;
2512};
2513
2514INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2515{
2516    (void) aux;
2517    struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2518                                                    avl_entry);
2519    struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2520                                                    avl_entry);
2521    if (entry1->kv_id < entry2->kv_id) {
2522        return -1;
2523    } else if (entry1->kv_id > entry2->kv_id) {
2524        return 1;
2525    } else {
2526        return _CMP_U64(entry1->seqnum, entry2->seqnum);
2527    }
2528}
2529
2530
2531// Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
2532INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2533{
2534    (void) aux;
2535    struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2536                                                  avl_entry);
2537    struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2538                                                  avl_entry);
2539    if (stat1->kv_id < stat2->kv_id) {
2540        return -1;
2541    } else if (stat1->kv_id > stat2->kv_id) {
2542        return 1;
2543    } else {
2544        return 0;
2545    }
2546}
2547
2548INLINE fdb_status _fdb_wal_flush_seq_purge(void *dbhandle,
2549                                     struct avl_tree *stale_seqnum_list,
2550                                     struct avl_tree *kvs_delta_stats)
2551{
2552    fdb_seqnum_t _seqnum;
2553    int64_t nlivenodes;
2554    int64_t ndeltanodes;
2555    int64_t delta;
2556    uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2557    struct wal_stale_seq_entry *seq_entry;
2558    struct wal_kvs_delta_stat *delta_stat;
2559    struct wal_kvs_delta_stat kvs_delta_query;
2560
2561    fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2562    struct avl_node *node = avl_first(stale_seqnum_list);
2563    while (node) {
2564        seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2565        node = avl_next(node);
2566        nlivenodes = handle->bhandle->nlivenodes;
2567        ndeltanodes = handle->bhandle->ndeltanodes;
2568        _seqnum = _endian_encode(seq_entry->seqnum);
2569        if (handle->kvs) {
2570            // multi KV instance mode .. HB+trie
2571            kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2572            memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2573            if (hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2574                              sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t))
2575                == HBTRIE_CORRUPTED_RECOVERING_ERR){
2576                _fdb_invalidate_dbheader(handle);
2577                return FDB_RECOVERABLE_ERR;
2578            }
2579        } else {
2580            btree_remove(handle->seqtree, (void*)&_seqnum);
2581        }
2582        btreeblk_end(handle->bhandle);
2583
2584        kvs_delta_query.kv_id = seq_entry->kv_id;
2585        avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2586                                               &kvs_delta_query.avl_entry,
2587                                               _kvs_delta_stat_cmp);
2588        if (delta_stat_node) {
2589            delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2590                                    avl_entry);
2591            delta = handle->bhandle->nlivenodes - nlivenodes;
2592            delta_stat->nlivenodes += delta;
2593            delta = handle->bhandle->ndeltanodes - ndeltanodes;
2594            delta *= handle->config.blocksize;
2595            delta_stat->deltasize += delta;
2596        }
2597        avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2598        free(seq_entry);
2599    }
2600    return FDB_RESULT_SUCCESS;
2601}
2602
2603INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2604                                           struct avl_tree *kvs_delta_stats)
2605{
2606    struct avl_node *node;
2607    struct wal_kvs_delta_stat *delta_stat;
2608    node = avl_first(kvs_delta_stats);
2609    while (node) {
2610        delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2611        node = avl_next(node);
2612        _kvs_stat_update_attr(file, delta_stat->kv_id,
2613                              KVS_STAT_DATASIZE, delta_stat->datasize);
2614        _kvs_stat_update_attr(file, delta_stat->kv_id,
2615                              KVS_STAT_NDOCS, delta_stat->ndocs);
2616        _kvs_stat_update_attr(file, delta_stat->kv_id,
2617                              KVS_STAT_NDELETES, delta_stat->ndeletes);
2618        _kvs_stat_update_attr(file, delta_stat->kv_id,
2619                              KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2620        _kvs_stat_update_attr(file, delta_stat->kv_id,
2621                              KVS_STAT_DELTASIZE, delta_stat->deltasize);
2622        avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2623        free(delta_stat);
2624    }
2625}
2626
2627INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2628                                      struct wal_item *item,
2629                                      struct avl_tree *stale_seqnum_list,
2630                                      struct avl_tree *kvs_delta_stats)
2631{
2632    hbtrie_result hr;
2633    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2634    fdb_seqnum_t _seqnum;
2635    fdb_kvs_id_t kv_id = 0;
2636    fdb_status fs = FDB_RESULT_SUCCESS;
2637    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2638    int size_id, size_seq;
2639    uint8_t *kvid_seqnum;
2640    uint64_t old_offset;
2641    int64_t _offset;
2642    int64_t delta;
2643    struct docio_object _doc;
2644    struct filemgr *file = handle->dhandle->file;
2645
2646    memset(var_key, 0, handle->config.chunksize);
2647    if (handle->kvs) {
2648        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2649    } else {
2650        kv_id = 0;
2651    }
2652
2653    struct wal_kvs_delta_stat *kvs_delta_stat;
2654    struct wal_kvs_delta_stat kvs_delta_query;
2655    kvs_delta_query.kv_id = kv_id;
2656    avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2657                                           &kvs_delta_query.avl_entry,
2658                                           _kvs_delta_stat_cmp);
2659    if (delta_stat_node) {
2660        kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2661                                    avl_entry);
2662    } else {
2663        kvs_delta_stat = (struct wal_kvs_delta_stat *)
2664            calloc(1, sizeof(struct wal_kvs_delta_stat));
2665        kvs_delta_stat->kv_id = kv_id;
2666        avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2667                   _kvs_delta_stat_cmp);
2668    }
2669
2670    int64_t nlivenodes = handle->bhandle->nlivenodes;
2671    int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2672
2673    if (item->action == WAL_ACT_INSERT ||
2674        item->action == WAL_ACT_LOGICAL_REMOVE) {
2675        _offset = _endian_encode(item->offset);
2676
2677        if (hbtrie_insert(handle->trie,
2678                      item->header->key,
2679                      item->header->keylen,
2680                      (void *)&_offset,
2681                          (void *)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2682            _fdb_invalidate_dbheader(handle);
2683            return FDB_RECOVERABLE_ERR;
2684        }
2685
2686        fs = btreeblk_end(handle->bhandle);
2687        if (fs != FDB_RESULT_SUCCESS) {
2688            return fs;
2689        }
2690        old_offset = _endian_decode(old_offset);
2691
2692        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2693            _seqnum = _endian_encode(item->seqnum);
2694            if (handle->kvs) {
2695                // multi KV instance mode .. HB+trie
2696                uint64_t old_offset_local;
2697
2698                size_id = sizeof(fdb_kvs_id_t);
2699                size_seq = sizeof(fdb_seqnum_t);
2700                kvid_seqnum = alca(uint8_t, size_id + size_seq);
2701                kvid2buf(size_id, kv_id, kvid_seqnum);
2702                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2703                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2704                              (void *)&_offset, (void *)&old_offset_local);
2705            } else {
2706                btree_insert(handle->seqtree, (void *)&_seqnum,
2707                             (void *)&_offset);
2708            }
2709            fs = btreeblk_end(handle->bhandle);
2710            if (fs != FDB_RESULT_SUCCESS) {
2711                return fs;
2712            }
2713        }
2714
2715        delta = handle->bhandle->nlivenodes - nlivenodes;
2716        kvs_delta_stat->nlivenodes += delta;
2717        delta = handle->bhandle->ndeltanodes - ndeltanodes;
2718        delta *= handle->config.blocksize;
2719        kvs_delta_stat->deltasize += delta;
2720
2721        if (old_offset == BLK_NOT_FOUND) {
2722            if (item->action == WAL_ACT_INSERT) {
2723                ++kvs_delta_stat->ndocs;
2724            } else { // inserted a logical deleted doc into main index
2725                ++kvs_delta_stat->ndeletes;
2726            }
2727            kvs_delta_stat->datasize += item->doc_size;
2728            kvs_delta_stat->deltasize += item->doc_size;
2729        } else { // update or logical delete
2730            // This block is already cached when we call HBTRIE_INSERT.
2731            // No additional block access.
2732            char dummy_key[FDB_MAX_KEYLEN];
2733            _doc.meta = _doc.body = NULL;
2734            _doc.key = &dummy_key;
2735            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2736                                              &_doc, true);
2737            if (_offset < 0) {
2738                return (fdb_status) _offset;
2739            } else if (_offset == 0) {
2740                // Note that this is not an error as old_offset is pointing to
2741                // the zero-filled region in a document block.
2742                return FDB_RESULT_KEY_NOT_FOUND;
2743            }
2744            free(_doc.meta);
2745            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2746
2747            if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2748                if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2749                    --kvs_delta_stat->ndocs;
2750                    ++kvs_delta_stat->ndeletes;
2751                } // else no change (prev doc was insert, now just an update)
2752            } else { // prev doc in main index was a logically deleted doc
2753                if (item->action == WAL_ACT_INSERT) { // now undeleted
2754                    ++kvs_delta_stat->ndocs;
2755                    --kvs_delta_stat->ndeletes;
2756                } // else no change (prev doc was deleted, now re-deleted)
2757            }
2758
2759            delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2760            kvs_delta_stat->datasize += delta;
2761            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2762            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2763                kvs_delta_stat->deltasize += delta;
2764            } else {
2765                kvs_delta_stat->deltasize += (int)item->doc_size;
2766            }
2767
2768            // Avoid duplicates (remove previous sequence number)
2769            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2770                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2771                    calloc(1, sizeof(struct wal_stale_seq_entry));
2772                entry->kv_id = kv_id;
2773                entry->seqnum = _doc.seqnum;
2774                avl_insert(stale_seqnum_list, &entry->avl_entry,
2775                           _fdb_seq_entry_cmp);
2776            }
2777        }
2778    } else {
2779        // Immediate remove
2780        old_offset = item->old_offset;
2781        hr = hbtrie_remove(handle->trie, item->header->key,
2782                           item->header->keylen);
2783        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2784            _fdb_invalidate_dbheader(handle);
2785            return FDB_RECOVERABLE_ERR;
2786        }
2787        fs = btreeblk_end(handle->bhandle);
2788        if (fs != FDB_RESULT_SUCCESS) {
2789            return fs;
2790        }
2791
2792        if (hr == HBTRIE_RESULT_SUCCESS) {
2793            // This block is already cached when we call _fdb_wal_get_old_offset
2794            // No additional block access should be done.
2795            char dummy_key[FDB_MAX_KEYLEN];
2796            _doc.meta = _doc.body = NULL;
2797            _doc.key = &dummy_key;
2798            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2799                                              &_doc, true);
2800            if (_offset < 0) {
2801                return (fdb_status) _offset;
2802            } else if (_offset == 0) {
2803                return FDB_RESULT_KEY_NOT_FOUND;
2804            }
2805            free(_doc.meta);
2806            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2807
2808            // Reduce the total number of docs by one
2809            --kvs_delta_stat->ndocs;
2810            if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2811                --kvs_delta_stat->ndeletes;
2812            }
2813
2814            // Reduce the total datasize by size of previously present doc
2815            delta = -(int)_fdb_get_docsize(_doc.length);
2816            kvs_delta_stat->datasize += delta;
2817            // if multiple wal flushes happen before commit, then it's possible
2818            // that this doc deleted was inserted & flushed after last commit
2819            // In this case we need to update the deltasize too which tracks
2820            // the amount of new data inserted between commits.
2821            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2822            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2823                kvs_delta_stat->deltasize += delta;
2824            }
2825
2826            // remove sequence number for the removed doc
2827            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2828                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2829                    calloc(1, sizeof(struct wal_stale_seq_entry));
2830                entry->kv_id = kv_id;
2831                entry->seqnum = _doc.seqnum;
2832                avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2833            }
2834
2835            // Update index size to new size after the remove operation
2836            delta = handle->bhandle->nlivenodes - nlivenodes;
2837            kvs_delta_stat->nlivenodes += delta;
2838
2839            // ndeltanodes measures number of new index nodes created due to
2840            // this hbtrie_remove() operation
2841            delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2842            delta *= handle->config.blocksize;
2843            kvs_delta_stat->deltasize += delta;
2844        }
2845    }
2846    return FDB_RESULT_SUCCESS;
2847}
2848
2849void fdb_sync_db_header(fdb_kvs_handle *handle)
2850{
2851    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2852    if (handle->cur_header_revnum != cur_revnum) {
2853        void *header_buf = NULL;
2854        size_t header_len;
2855        bid_t hdr_bid;
2856        filemgr_header_revnum_t revnum;
2857
2858        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2859                                        &hdr_bid, NULL, &revnum);
2860        if (header_len > 0) {
2861            uint64_t header_flags, dummy64, version;
2862            bid_t idtree_root;
2863            bid_t new_seq_root;
2864            bid_t new_stale_root;
2865            char *compacted_filename;
2866            char *prev_filename = NULL;
2867
2868            version = handle->file->version;
2869            atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
2870            handle->cur_header_revnum = revnum;
2871
2872            fdb_fetch_header(version, header_buf, &idtree_root,
2873                             &new_seq_root, &new_stale_root, &dummy64,
2874                             &dummy64, &dummy64,
2875                             &dummy64, &handle->last_wal_flush_hdr_bid,
2876                             &handle->kv_info_offset, &header_flags,
2877                             &compacted_filename, &prev_filename);
2878
2879            if (handle->dirty_updates) {
2880                // discard all cached writable b+tree nodes
2881                // to avoid data inconsistency with other writers
2882                btreeblk_discard_blocks(handle->bhandle);
2883            }
2884
2885            handle->trie->root_bid = idtree_root;
2886
2887            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2888                if (new_seq_root != handle->seqtree->root_bid) {
2889                    if (handle->config.multi_kv_instances) {
2890                        handle->seqtrie->root_bid = new_seq_root;
2891                    } else {
2892                        btree_init_from_bid(handle->seqtree,
2893                                            handle->seqtree->blk_handle,
2894                                            handle->seqtree->blk_ops,
2895                                            handle->seqtree->kv_ops,
2896                                            handle->seqtree->blksize,
2897                                            new_seq_root);
2898                    }
2899                }
2900            }
2901
2902            if (ver_staletree_support(version)) {
2903                btree_init_from_bid(handle->staletree,
2904                                    handle->staletree->blk_handle,
2905                                    handle->staletree->blk_ops,
2906                                    handle->staletree->kv_ops,
2907                                    handle->staletree->blksize,
2908                                    new_stale_root);
2909            } else {
2910                handle->staletree = NULL;
2911            }
2912
2913            if (prev_filename) {
2914                free(prev_filename);
2915            }
2916
2917            handle->dirty_updates = 0;
2918            if (handle->kvs) {
2919                // multiple KV instance mode AND sub handle
2920                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2921                                                    handle->kvs->id);
2922            } else {
2923                // super handle OR single KV instance mode
2924                handle->seqnum = filemgr_get_seqnum(handle->file);
2925            }
2926        } else {
2927            atomic_store_uint64_t(&handle->last_hdr_bid,
2928                                  filemgr_get_header_bid(handle->file));
2929        }
2930
2931        if (header_buf) {
2932            free(header_buf);
2933        }
2934    } else {
2935        if (handle == handle->fhandle->root) {
2936            // MB-20091: Commits use root handle that points to default kv store
2937            // The same default KV Store can have a different user-level handle.
2938            // To ensure that the root handle which will do the commit always
2939            // remains updated with the latest sequence number generated by the
2940            // user KVS Handle, we must always update the root handle's seqnum
2941            // even if there are no new commit headers to sync up in the file.
2942            handle->seqnum = filemgr_get_seqnum(handle->file);
2943        }
2944    }
2945}
2946
2947fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2948{
2949    bool fhandle_ret;
2950    fdb_status fs = FDB_RESULT_SUCCESS;
2951    file_status_t fstatus = filemgr_get_file_status(handle->file);
2952    // check whether the compaction is done
2953    if (fstatus == FILE_REMOVED_PENDING) {
2954        uint64_t ndocs, ndeletes, datasize, nlivenodes, last_wal_flush_hdr_bid;
2955        uint64_t kv_info_offset, header_flags;
2956        size_t header_len;
2957        char *new_filename;
2958        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2959        bid_t trie_root_bid, seq_root_bid, stale_root_bid;
2960        fdb_config config = handle->config;
2961
2962        // close the current file and newly open the new file
2963        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2964            // compaction daemon mode .. just close and then open
2965            char filename[FDB_MAX_FILENAME_LEN];
2966            strcpy(filename, handle->filename);
2967
2968            // We don't need to maintain fhandle list for the old file
2969            // as there will be no more mutation on the file.
2970            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2971            fs = _fdb_close(handle);
2972            if (fs != FDB_RESULT_SUCCESS) {
2973                if (fhandle_ret) {
2974                    filemgr_fhandle_add(handle->file, handle->fhandle);
2975                }
2976                return fs;
2977            }
2978
2979            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2980            if (fs != FDB_RESULT_SUCCESS) {
2981                return fs;
2982            }
2983            filemgr_fhandle_add(handle->file, handle->fhandle);
2984
2985        } else {
2986            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2987            fdb_fetch_header(handle->file->version, buf,
2988                             &trie_root_bid, &seq_root_bid, &stale_root_bid,
2989                             &ndocs, &ndeletes, &nlivenodes, &datasize,
2990                             &last_wal_flush_hdr_bid,
2991                             &kv_info_offset, &header_flags,
2992                             &new_filename, NULL);
2993
2994            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2995            fs = _fdb_close(handle);
2996            if (fs != FDB_RESULT_SUCCESS) {
2997                if (fhandle_ret) {
2998                    filemgr_fhandle_add(handle->file, handle->fhandle);
2999                }
3000                return fs;
3001            }
3002
3003            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
3004            if (fs != FDB_RESULT_SUCCESS) {
3005                return fs;
3006            }
3007            filemgr_fhandle_add(handle->file, handle->fhandle);
3008        }
3009    }
3010    if (status) {
3011        *status = fstatus;
3012    }
3013    return fs;
3014}
3015
3016static void _fdb_sync_dirty_root(fdb_kvs_handle *handle)
3017{
3018    bid_t dirty_idtree_root = BLK_NOT_FOUND;
3019    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
3020
3021    if (handle->shandle) {
3022        // skip snapshot
3023        return;
3024    }
3025
3026    struct filemgr_dirty_update_node *dirty_update;
3027    dirty_update = filemgr_dirty_update_get_latest(handle->file);
3028    btreeblk_set_dirty_update(handle->bhandle, dirty_update);
3029
3030    if (dirty_update) {
3031        filemgr_dirty_update_get_root(handle->file, dirty_update,
3032                                      &dirty_idtree_root, &dirty_seqtree_root);
3033        _fdb_import_dirty_root(handle, dirty_idtree_root, dirty_seqtree_root);
3034        btreeblk_discard_blocks(handle->bhandle);
3035    }
3036
3037    return;
3038}
3039
3040static void _fdb_release_dirty_root(fdb_kvs_handle *handle)
3041{
3042    if (!handle->shandle) {
3043        struct filemgr_dirty_update_node *dirty_update;
3044        dirty_update = btreeblk_get_dirty_update(handle->bhandle);
3045        if (dirty_update) {
3046            filemgr_dirty_update_close_node(handle->file, dirty_update);
3047            btreeblk_clear_dirty_update(handle->bhandle);
3048        }
3049    }
3050}
3051
3052LIBFDB_API
3053fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
3054{
3055    uint64_t offset;
3056    int64_t _offset;
3057    struct docio_object _doc;
3058    struct filemgr *wal_file = NULL;
3059    struct docio_handle *dhandle;
3060    struct _fdb_key_cmp_info cmp_info;
3061    fdb_status wr;
3062    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3063    fdb_txn *txn;
3064    fdb_doc doc_kv;
3065    LATENCY_STAT_START();
3066
3067    if (!handle) {
3068        return FDB_RESULT_INVALID_HANDLE;
3069    }
3070
3071    if (!doc || !doc->key || doc->keylen == 0 ||
3072        doc->keylen > FDB_MAX_KEYLEN ||
3073        (handle->kvs_config.custom_cmp &&
3074            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3075        return FDB_RESULT_INVALID_ARGS;
3076    }
3077
3078    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3079        return FDB_RESULT_HANDLE_BUSY;
3080    }
3081
3082    doc_kv = *doc;
3083
3084    if (handle->kvs) {
3085        // multi KV instance mode
3086        int size_chunk = handle->config.chunksize;
3087        doc_kv.keylen = doc->keylen + size_chunk;
3088        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3089        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3090        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3091    }
3092
3093    if (!handle->shandle) {
3094        fdb_check_file_reopen(handle, NULL);
3095        txn = handle->fhandle->root->txn;
3096        if (!txn) {
3097            txn = &handle->file->global_txn;
3098        }
3099    } else {
3100        txn = handle->shandle->snap_txn;
3101    }
3102
3103    cmp_info.kvs_config = handle->kvs_config;
3104    cmp_info.kvs = handle->kvs;
3105    wal_file = handle->file;
3106    dhandle = handle->dhandle;
3107
3108    if (handle->kvs) {
3109        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3110                      &offset);
3111    } else {
3112        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc,
3113                      &offset);
3114    }
3115
3116    if (!handle->shandle) {
3117        fdb_sync_db_header(handle);
3118    }
3119
3120    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3121
3122    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3123        _fdb_sync_dirty_root(handle);
3124
3125        if (handle->kvs) {
3126            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3127                             (void *)&offset);
3128            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
3129                atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3130                _fdb_invalidate_dbheader(handle);
3131                return FDB_RECOVERABLE_ERR;
3132            }
3133        } else {
3134            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3135                             (void *)&offset);
3136            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
3137                atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3138                _fdb_invalidate_dbheader(handle);
3139                return FDB_RECOVERABLE_ERR;
3140            }
3141        }
3142        btreeblk_end(handle->bhandle);
3143        offset = _endian_decode(offset);
3144
3145        _fdb_release_dirty_root(handle);
3146    }
3147
3148    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3149         hr == HBTRIE_RESULT_SUCCESS) {
3150        bool alloced_meta = doc->meta ? false : true;
3151        bool alloced_body = doc->body ? false : true;
3152        if (handle->kvs) {
3153            _doc.key = doc_kv.key;
3154            _doc.length.keylen = doc_kv.keylen;
3155            doc->deleted = doc_kv.deleted; // update deleted field if wal_find
3156        } else {
3157            _doc.key = doc->key;
3158            _doc.length.keylen = doc->keylen;
3159        }
3160        _doc.meta = doc->meta;
3161        _doc.body = doc->body;
3162
3163        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
3164            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3165            return FDB_RESULT_KEY_NOT_FOUND;
3166        }
3167
3168        _offset = docio_read_doc(dhandle, offset, &_doc, true);
3169        if (_offset <= 0) {
3170            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3171            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
3172        }
3173
3174        if (_doc.length.keylen != doc_kv.keylen ||
3175            _doc.length.flag & DOCIO_DELETED) {
3176            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
3177            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3178            return FDB_RESULT_KEY_NOT_FOUND;
3179        }
3180
3181        doc->seqnum = _doc.seqnum;
3182        doc->metalen = _doc.length.metalen;
3183        doc->bodylen = _doc.length.bodylen;
3184        doc->meta = _doc.meta;
3185        doc->body = _doc.body;
3186        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3187        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3188        doc->offset = offset;
3189
3190        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3191        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3192        return FDB_RESULT_SUCCESS;
3193    }
3194
3195    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3196    return FDB_RESULT_KEY_NOT_FOUND;
3197}
3198
3199// search document metadata using key
3200LIBFDB_API
3201fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
3202{
3203    uint64_t offset;
3204    struct docio_object _doc;
3205    struct docio_handle *dhandle;
3206    struct filemgr *wal_file = NULL;
3207    fdb_status wr;
3208    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3209    fdb_txn *txn;
3210    struct _fdb_key_cmp_info cmp_info;
3211    fdb_doc doc_kv;
3212    LATENCY_STAT_START();
3213
3214    if (!handle) {
3215        return FDB_RESULT_INVALID_HANDLE;
3216    }
3217
3218    if (!doc || !doc->key ||
3219        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
3220        (handle->kvs_config.custom_cmp &&
3221            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3222        return FDB_RESULT_INVALID_ARGS;
3223    }
3224
3225    doc_kv = *doc;
3226
3227    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3228        return FDB_RESULT_HANDLE_BUSY;
3229    }
3230
3231    if (handle->kvs) {
3232        // multi KV instance mode
3233        int size_chunk = handle->config.chunksize;
3234        doc_kv.keylen = doc->keylen + size_chunk;
3235        doc_kv.key =