xref: /6.0.3/forestdb/src/forestdb.cc (revision 5202bab4)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "breakpad.h"
33#include "btree.h"
34#include "btree_kv.h"
35#include "btree_var_kv_ops.h"
36#include "docio.h"
37#include "btreeblock.h"
38#include "common.h"
39#include "wal.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "bgflusher.h"
44#include "compactor.h"
45#include "memleak.h"
46#include "time_utils.h"
47#include "timing.h"
48#include "system_resource_stats.h"
49#include "version.h"
50#include "staleblock.h"
51
// In a __DEBUG build that does not also define __DEBUG_FDB, redefine the
// debug helper macros so that they expand to nothing from this point on.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
62
63
// Set to 1 by fdb_init() once the one-time global initialization is done.
static volatile uint8_t fdb_initialized = 0;
// Number of open calls currently in progress; incremented and decremented
// by fdb_open() while holding 'initial_lock'.
static volatile uint32_t fdb_open_inprog = 0;
#ifdef SPIN_INITIALIZER
// Lock protecting the globals above; statically initialized when possible.
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Without a static initializer, 'initial_lock' is set up at runtime by
// init_initial_lock_status(), which tracks its progress in
// 'initial_lock_status' (0: not started, 1: in progress, 2: done).
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif
72
73INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74{
75    (void) aux;
76    uint64_t a,b;
77    a = *(uint64_t*)key1;
78    b = *(uint64_t*)key2;
79    a = _endian_decode(a);
80    b = _endian_decode(b);
81    return _CMP_U64(a, b);
82}
83
84size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85{
86    fdb_status fs;
87    keylen_t keylen;
88    struct docio_handle *dhandle = (struct docio_handle*)handle;
89
90    offset = _endian_decode(offset);
91    fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92    if (fs == FDB_RESULT_SUCCESS) {
93        return keylen;
94    } else {
95        const char *msg = "docio_read_doc_key error: read failure on "
96            "offset %" _F64 " in a database file '%s' "
97            ": FDB status %d, lastbid 0x%" _X64 ", "
98            "curblock 0x%" _X64 ", curpos 0x%x\n";
99        fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                dhandle->file->filename, fs, dhandle->lastbid,
101                dhandle->curblock, dhandle->curpos);
102        dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103        return 0;
104    }
105}
106
107size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
108{
109    int size_id, size_seq, size_chunk;
110    fdb_seqnum_t _seqnum;
111    struct docio_object doc;
112    struct docio_handle *dhandle = (struct docio_handle *)handle;
113
114    size_id = sizeof(fdb_kvs_id_t);
115    size_seq = sizeof(fdb_seqnum_t);
116    size_chunk = dhandle->file->config->chunksize;
117    memset(&doc, 0, sizeof(struct docio_object));
118
119    offset = _endian_decode(offset);
120    if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
121                                &doc, true) <= 0) {
122        return 0;
123    }
124    buf2buf(size_chunk, doc.key, size_id, buf);
125    _seqnum = _endian_encode(doc.seqnum);
126    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
127
128    free(doc.key);
129    free(doc.meta);
130
131    return size_id + size_seq;
132}
133
134int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
135{
136    int is_key1_inf, is_key2_inf;
137    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
138    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
139    size_t keylen1, keylen2;
140    btree_cmp_args *args = (btree_cmp_args *)aux;
141    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
142
143    is_key1_inf = _is_inf_key(key1);
144    is_key2_inf = _is_inf_key(key2);
145    if (is_key1_inf && is_key2_inf) { // both are infinite
146        return 0;
147    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
148        return -1;
149    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
150        return 1;
151    }
152
153    _get_var_key(key1, (void*)keystr1, &keylen1);
154    _get_var_key(key2, (void*)keystr2, &keylen2);
155
156    if (keylen1 == 0 && keylen2 == 0) {
157        return 0;
158    } else if (keylen1 ==0 && keylen2 > 0) {
159        return -1;
160    } else if (keylen1 > 0 && keylen2 == 0) {
161        return 1;
162    }
163
164    return cmp(keystr1, keylen1, keystr2, keylen2);
165}
166
167void fdb_fetch_header(uint64_t version,
168                      void *header_buf,
169                      bid_t *trie_root_bid,
170                      bid_t *seq_root_bid,
171                      bid_t *stale_root_bid,
172                      uint64_t *ndocs,
173                      uint64_t *ndeletes,
174                      uint64_t *nlivenodes,
175                      uint64_t *datasize,
176                      uint64_t *last_wal_flush_hdr_bid,
177                      uint64_t *kv_info_offset,
178                      uint64_t *header_flags,
179                      char **new_filename,
180                      char **old_filename)
181{
182    size_t offset = 0;
183    uint16_t new_filename_len;
184    uint16_t old_filename_len;
185
186    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
187               sizeof(bid_t), offset);
188    *trie_root_bid = _endian_decode(*trie_root_bid);
189
190    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
191               sizeof(bid_t), offset);
192    *seq_root_bid = _endian_decode(*seq_root_bid);
193
194    if (ver_staletree_support(version)) {
195        seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
196                   sizeof(bid_t), offset);
197        *stale_root_bid = _endian_decode(*stale_root_bid);
198    } else {
199        *stale_root_bid = BLK_NOT_FOUND;
200    }
201
202    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
203               sizeof(uint64_t), offset);
204    *ndocs = _endian_decode(*ndocs);
205    if (ver_is_atleast_magic_001(version)) {
206        seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
207                   sizeof(uint64_t), offset);
208        *ndeletes = _endian_decode(*ndeletes);
209    } else {
210        *ndeletes = 0;
211    }
212
213    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
214               sizeof(uint64_t), offset);
215    *nlivenodes = _endian_decode(*nlivenodes);
216
217    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
218               sizeof(uint64_t), offset);
219    *datasize = _endian_decode(*datasize);
220
221    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
222               sizeof(uint64_t), offset);
223    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
224
225    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
226               sizeof(uint64_t), offset);
227    *kv_info_offset = _endian_decode(*kv_info_offset);
228
229    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
230               sizeof(uint64_t), offset);
231    *header_flags = _endian_decode(*header_flags);
232
233    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
234               sizeof(new_filename_len), offset);
235    new_filename_len = _endian_decode(new_filename_len);
236    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
237               sizeof(old_filename_len), offset);
238    old_filename_len = _endian_decode(old_filename_len);
239    if (new_filename_len) {
240        *new_filename = (char*)((uint8_t *)header_buf + offset);
241    } else {
242        *new_filename = NULL;
243    }
244    offset += new_filename_len;
245    if (old_filename && old_filename_len) {
246        *old_filename = (char *) malloc(old_filename_len);
247        seq_memcpy(*old_filename,
248                   (uint8_t *)header_buf + offset,
249                   old_filename_len, offset);
250    }
251}
252
253// read the revnum of the given header of BID
254INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
255{
256    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
257    uint64_t version;
258    size_t header_len;
259    fdb_seqnum_t seqnum;
260    filemgr_header_revnum_t revnum = 0;
261    fdb_status fs;
262
263    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
264                              &seqnum, &revnum, NULL, &version, NULL,
265                              &handle->log_callback);
266    if (fs != FDB_RESULT_SUCCESS) {
267        return 0;
268    }
269    return revnum;
270}
271
272INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
273{
274    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
275    uint64_t version, bmp_revnum = 0;
276    size_t header_len;
277    fdb_seqnum_t seqnum;
278    filemgr_header_revnum_t revnum;
279    fdb_status fs;
280
281    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
282                              &seqnum, &revnum, NULL, &version, &bmp_revnum,
283                              &handle->log_callback);
284    if (fs != FDB_RESULT_SUCCESS) {
285        return 0;
286    }
287    return bmp_revnum;
288}
289
// Error-log callback that deliberately discards everything it is given.
// It is installed where log output has to be suppressed temporarily.
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // nothing to do; the casts only silence unused-parameter warnings
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
297
298INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
299                             fdb_restore_mode_t mode,
300                             bid_t hdr_bid,
301                             fdb_kvs_id_t kv_id_req)
302{
303    struct filemgr *file = handle->file;
304    uint32_t blocksize = handle->file->blocksize;
305    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
306    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
307    uint64_t offset = 0; //assume everything from first block needs restoration
308    uint64_t filesize = filemgr_get_pos(handle->file);
309    uint64_t doc_scan_limit;
310    uint64_t start_bmp_revnum, stop_bmp_revnum;
311    uint64_t cur_bmp_revnum = (uint64_t)-1;
312    bid_t next_doc_block = BLK_NOT_FOUND;
313    struct _fdb_key_cmp_info cmp_info;
314    err_log_callback *log_callback;
315
316    if (!hdr_off) { // Nothing to do if we don't have a header block offset
317        return;
318    }
319
320    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
321        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
322    }
323
324    // If a valid last header was retrieved and it matches the current header
325    // OR if WAL already had entries populated, then no crash recovery needed
326    if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid ||
327        (!handle->shandle && wal_get_size(file) &&
328            mode != FDB_RESTORE_KV_INS)) {
329        return;
330    }
331
332    if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
333        // for normal WAL restore, set status to dirty
334        // (only when the previous status is clean or dirty)
335        wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
336    }
337
338    // Temporarily disable the error logging callback as there are false positive
339    // checksum errors in docio_read_doc.
340    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
341    err_log_callback dummy_cb;
342    log_callback = handle->dhandle->log_callback;
343    dummy_cb.callback = fdb_dummy_log_callback;
344    dummy_cb.ctx_data = NULL;
345    handle->dhandle->log_callback = &dummy_cb;
346
347    if (!handle->shandle) {
348        filemgr_mutex_lock(file);
349    }
350    cmp_info.kvs_config = handle->kvs_config;
351    cmp_info.kvs = handle->kvs;
352
353    start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
354    stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
355    cur_bmp_revnum = start_bmp_revnum;
356
357    // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
358    // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
359    // otherwise: live block (bmp_revnum: 0)
360    //  1 2   3    4    5 6  7  8   9  10
361    // +-------------------------------------------+
362    // |  BBBBAAAAABBBBB  AAABBB    AAA            |
363    // +-------------------------------------------+
364    //              ^                     ^
365    //              hdr_bid               last_wal_flush
366    //
367    // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
368    // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
369    // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
370    // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]
371
372    do {
373        if (cur_bmp_revnum > stop_bmp_revnum) {
374            break;
375        } else if (cur_bmp_revnum == stop_bmp_revnum) {
376
377            bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
378            if (handle->file->sb) {
379                sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
380            }
381            if (!handle->shandle && handle->file->sb &&
382                sb_last_hdr_bid != BLK_NOT_FOUND) {
383                hdr_off = (sb_last_hdr_bid+1) * blocksize;
384            }
385
386            doc_scan_limit = hdr_off;
387            if (offset >= hdr_off) {
388                break;
389            }
390        } else {
391            doc_scan_limit = filesize;
392        }
393
394        if (!docio_check_buffer(handle->dhandle, offset / blocksize,
395                                cur_bmp_revnum)) {
396            // not a document block .. move to next block
397        } else {
398            do {
399                struct docio_object doc;
400                int64_t _offset;
401                uint64_t doc_offset;
402                memset(&doc, 0, sizeof(doc));
403                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
404                if (_offset <= 0) { // reached unreadable doc, skip block
405                    // TODO: Need to have this function return fdb_status, so that
406                    // WAL restore operation should fail if offset < 0
407                    break;
408                } else if ((uint64_t)_offset < offset) {
409                    // If more than one writer is appending docs concurrently,
410                    // they have their own doc block linked list and doc blocks
411                    // may not be consecutive. For example,
412                    //
413                    // Writer 1): 100 -> 102 -> 2 -> 4     | commit
414                    // Writer 2):    101 - > 103 -> 3 -> 5 |
415                    //
416                    // In this case, if we read doc BID 102, then 'offset' will jump
417                    // to doc BID 2, without reading BID 103.
418                    //
419                    // To address this issue, in case that 'offset' decreases,
420                    // remember the next doc block, and follow the doc linked list
421                    // first. After the linked list ends, 'offset' cursor will be
422                    // reset to 'next_doc_block'.
423                    next_doc_block = (offset / blocksize) + 1;
424                }
425                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
426                    // check if the doc is transactional or not, and
427                    // also check if the doc contains system info
428                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
429                        !(doc.length.flag & DOCIO_SYSTEM)) {
430                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
431                            // commit mark .. read doc offset
432                            doc_offset = doc.doc_offset;
433                            // read the previously skipped doc
434                            if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
435                                // doc read error
436                                free(doc.key);
437                                free(doc.meta);
438                                free(doc.body);
439                                offset = _offset;
440                                continue;
441                            }
442                        } else {
443                            doc_offset = offset;
444                        }
445
446                        // If say a snapshot is taken on a db handle after
447                        // rollback, then skip WAL items after rollback point
448                        if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
449                            doc.seqnum > handle->seqnum) {
450                            free(doc.key);
451                            free(doc.meta);
452                            free(doc.body);
453                            offset = _offset;
454                            continue;
455                        }
456
457                        // restore document
458                        fdb_doc wal_doc;
459                        wal_doc.keylen = doc.length.keylen;
460                        wal_doc.bodylen = doc.length.bodylen;
461                        wal_doc.key = doc.key;
462                        wal_doc.seqnum = doc.seqnum;
463                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
464
465                        if (!handle->shandle) {
466                            wal_doc.metalen = doc.length.metalen;
467                            wal_doc.meta = doc.meta;
468                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
469
470                            if (handle->kvs) {
471                                // check seqnum before insert
472                                fdb_kvs_id_t kv_id;
473                                fdb_seqnum_t kv_seqnum;
474                                buf2kvid(handle->config.chunksize,
475                                         wal_doc.key, &kv_id);
476
477                                kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
478                                if (doc.seqnum <= kv_seqnum &&
479                                        ((mode == FDB_RESTORE_KV_INS &&
480                                            kv_id == kv_id_req) ||
481                                         (mode == FDB_RESTORE_NORMAL)) ) {
482                                    // if mode is NORMAL, restore all items
483                                    // if mode is KV_INS, restore items matching ID
484                                    wal_insert(&file->global_txn, file, &cmp_info,
485                                               &wal_doc, doc_offset,
486                                               WAL_INS_WRITER);
487                                }
488                            } else {
489                                wal_insert(&file->global_txn, file, &cmp_info,
490                                           &wal_doc, doc_offset,
491                                           WAL_INS_WRITER);
492                            }
493                            if (doc.key) free(doc.key);
494                        } else {
495                            // snapshot
496                            if (handle->kvs) {
497                                fdb_kvs_id_t kv_id;
498                                buf2kvid(handle->config.chunksize,
499                                         wal_doc.key, &kv_id);
500                                if (kv_id == handle->kvs->id) {
501                                    // snapshot: insert ID matched documents only
502                                    wal_snap_insert(handle->shandle,
503                                                    &wal_doc, doc_offset);
504                                } else {
505                                    free(doc.key);
506                                }
507                            } else {
508                                wal_snap_insert(handle->shandle, &wal_doc,
509                                                doc_offset);
510                            }
511                        }
512                        free(doc.meta);
513                        free(doc.body);
514                        offset = _offset;
515                    } else {
516                        // skip transactional document or system document
517                        free(doc.key);
518                        free(doc.meta);
519                        free(doc.body);
520                        offset = _offset;
521                        // do not break.. read next doc
522                    }
523                } else {
524                    free(doc.key);
525                    free(doc.meta);
526                    free(doc.body);
527                    offset = _offset;
528                    break;
529                }
530            } while (offset + sizeof(struct docio_length) < doc_scan_limit);
531        }
532
533        if (next_doc_block != BLK_NOT_FOUND) {
534            offset = next_doc_block * blocksize;
535            next_doc_block = BLK_NOT_FOUND;
536        } else {
537            offset = ((offset / blocksize) + 1) * blocksize;
538        }
539        if (ver_superblock_support(handle->file->version) &&
540            offset >= filesize) {
541            // circular scan
542            offset = blocksize * handle->file->sb->config->num_sb;
543            cur_bmp_revnum++;
544        }
545    } while(true);
546
547    // wal commit
548    if (!handle->shandle) {
549        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
550        filemgr_mutex_unlock(file);
551    }
552    handle->dhandle->log_callback = log_callback;
553}
554
555INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
556                                          const char *new_filename)
557{
558    fdb_kvs_handle new_db;
559    fdb_config config = handle->config;
560    struct filemgr *new_file;
561
562    memset(&new_db, 0, sizeof(new_db));
563    new_db.log_callback.callback = handle->log_callback.callback;
564    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
565    config.flags |= FDB_OPEN_FLAG_RDONLY;
566    new_db.fhandle = handle->fhandle;
567    new_db.kvs_config = handle->kvs_config;
568    fdb_status status = _fdb_open(&new_db, new_filename,
569                                  FDB_AFILENAME, &config);
570    if (status != FDB_RESULT_SUCCESS) {
571        return fdb_log(&handle->log_callback, status,
572                       "Error in opening a partially compacted file '%s' for recovery.",
573                       new_filename);
574    }
575
576    new_file = new_db.file;
577
578    if (new_file->old_filename &&
579        !strncmp(new_file->old_filename, handle->file->filename,
580                 FDB_MAX_FILENAME_LEN)) {
581        struct filemgr *old_file = handle->file;
582        // If new file has a recorded old_filename then it means that
583        // compaction has completed successfully. Mark self for deletion
584        filemgr_mutex_lock(new_file);
585
586        status = btreeblk_end(handle->bhandle);
587        if (status != FDB_RESULT_SUCCESS) {
588            filemgr_mutex_unlock(new_file);
589            _fdb_close(&new_db);
590            return status;
591        }
592        btreeblk_free(handle->bhandle);
593        free(handle->bhandle);
594        handle->bhandle = new_db.bhandle;
595
596        docio_free(handle->dhandle);
597        free(handle->dhandle);
598        handle->dhandle = new_db.dhandle;
599
600        hbtrie_free(handle->trie);
601        free(handle->trie);
602        handle->trie = new_db.trie;
603
604        wal_shutdown(handle->file, &handle->log_callback);
605        handle->file = new_file;
606
607        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
608            if (handle->kvs) {
609                // multi KV instance mode
610                hbtrie_free(handle->seqtrie);
611                free(handle->seqtrie);
612                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
613                    handle->seqtrie = new_db.seqtrie;
614                }
615            } else {
616                free(handle->seqtree->kv_ops);
617                free(handle->seqtree);
618                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
619                    handle->seqtree = new_db.seqtree;
620                }
621            }
622        }
623        handle->staletree = new_db.staletree;
624
625        filemgr_mutex_unlock(new_file);
626        if (new_db.kvs) {
627            fdb_kvs_info_free(&new_db);
628        }
629        // remove self: WARNING must not close this handle if snapshots
630        // are yet to open this file
631        filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
632        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
633        free(new_db.filename);
634        return FDB_RESULT_FAIL_BY_COMPACTION;
635    }
636
637    // As the new file is partially compacted, it should be removed upon close.
638    // Just in-case the new file gets opened before removal, point it to the old
639    // file to ensure availability of data.
640    filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
641    _fdb_close(&new_db);
642
643    return FDB_RESULT_SUCCESS;
644}
645
646#ifndef SPIN_INITIALIZER
647INLINE void init_initial_lock_status() {
648    // Note that only Windows passes through this routine
649    if (!fdb_initialized) {
650        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
651            // atomically initialize spin lock only once
652            spin_init(&initial_lock);
653            initial_lock_status = 2;
654        } else {
655            // the others .. wait until initializing 'initial_lock' is done
656            // TODO: Need to devise a better way of synchronization on Windows
657            while (initial_lock_status != 2) {
658                Sleep(1);
659            }
660        }
661    }
662}
663#endif
664
665LIBFDB_API
666fdb_status fdb_init(fdb_config *config)
667{
668    fdb_config _config;
669    compactor_config c_config;
670    bgflusher_config bgf_config;
671    struct filemgr_config f_config;
672
673    if (config) {
674        if (validate_fdb_config(config)) {
675            _config = *config;
676        } else {
677            return FDB_RESULT_INVALID_CONFIG;
678        }
679    } else {
680        _config = get_default_config();
681    }
682
683    // global initialization
684    // initialized only once at first time
685    if (!fdb_initialized) {
686
687#ifndef SPIN_INITIALIZER
688        init_initial_lock_status();
689#endif
690
691    }
692    spin_lock(&initial_lock);
693    if (!fdb_initialized) {
694#if !defined(_ANDROID_) && !defined(__ANDROID__)
695        // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
696        // We temporarily disable validity checking of block cache size
697        // on Android platform at this time.
698        double ram_size = (double) get_memory_size();
699        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
700            spin_unlock(&initial_lock);
701            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
702        }
703#endif
704        // initialize file manager and block cache
705        f_config.blocksize = _config.blocksize;
706        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
707        filemgr_init(&f_config);
708        filemgr_set_lazy_file_deletion(true,
709                                       compactor_register_file_removing,
710                                       compactor_is_file_removed);
711        if (ver_superblock_support(ver_get_latest_magic())) {
712            struct sb_ops sb_ops = {sb_init, sb_get_default_config,
713                                    sb_read_latest, sb_alloc_block,
714                                    sb_bmp_is_writable, sb_get_bmp_revnum,
715                                    sb_get_min_live_revnum, sb_free};
716            filemgr_set_sb_operation(sb_ops);
717            sb_bmp_mask_init();
718        }
719
720        // initialize compaction daemon
721        c_config.sleep_duration = _config.compactor_sleep_duration;
722        c_config.num_threads = _config.num_compactor_threads;
723        compactor_init(&c_config);
724        // initialize background flusher daemon
725        // Temporarily disable background flushers until blockcache contention
726        // issue is resolved.
727        bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
728        bgflusher_init(&bgf_config);
729
730        // Initialize breakpad
731        _dbg_handle_crashes(config->breakpad_minidump_dir);
732
733        fdb_initialized = 1;
734    }
735    spin_unlock(&initial_lock);
736
737    return FDB_RESULT_SUCCESS;
738}
739
740LIBFDB_API
741fdb_config fdb_get_default_config(void) {
742    return get_default_config();
743}
744
745LIBFDB_API
746fdb_kvs_config fdb_get_default_kvs_config(void) {
747    return get_default_kvs_config();
748}
749
750LIBFDB_API
751fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
752                    const char *filename,
753                    fdb_config *fconfig)
754{
755#ifdef _MEMPOOL
756    mempool_init();
757#endif
758
759    fdb_config config;
760    fdb_file_handle *fhandle;
761    fdb_kvs_handle *handle;
762
763    if (fconfig) {
764        if (validate_fdb_config(fconfig)) {
765            config = *fconfig;
766        } else {
767            return FDB_RESULT_INVALID_CONFIG;
768        }
769    } else {
770        config = get_default_config();
771    }
772
773    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
774    if (!fhandle) { // LCOV_EXCL_START
775        return FDB_RESULT_ALLOC_FAIL;
776    } // LCOV_EXCL_STOP
777
778    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
779    if (!handle) { // LCOV_EXCL_START
780        free(fhandle);
781        return FDB_RESULT_ALLOC_FAIL;
782    } // LCOV_EXCL_STOP
783
784#ifndef SPIN_INITIALIZER
785    init_initial_lock_status();
786#endif
787
788    spin_lock(&initial_lock);
789    fdb_open_inprog++;
790    spin_unlock(&initial_lock);
791
792    atomic_init_uint8_t(&handle->handle_busy, 0);
793    handle->shandle = NULL;
794    handle->kvs_config = get_default_kvs_config();
795
796    fdb_status fs = fdb_init(fconfig);
797    if (fs != FDB_RESULT_SUCCESS) {
798        free(handle);
799        free(fhandle);
800        spin_lock(&initial_lock);
801        fdb_open_inprog--;
802        spin_unlock(&initial_lock);
803        return fs;
804    }
805    fdb_file_handle_init(fhandle, handle);
806
807    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
808    if (fs == FDB_RESULT_SUCCESS) {
809        *ptr_fhandle = fhandle;
810        filemgr_fhandle_add(handle->file, fhandle);
811    } else {
812        *ptr_fhandle = NULL;
813        free(handle);
814        fdb_file_handle_free(fhandle);
815    }
816    spin_lock(&initial_lock);
817    fdb_open_inprog--;
818    spin_unlock(&initial_lock);
819    return fs;
820}
821
822LIBFDB_API
823fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
824                               const char *filename,
825                               fdb_config *fconfig,
826                               size_t num_functions,
827                               char **kvs_names,
828                               fdb_custom_cmp_variable *functions)
829{
830#ifdef _MEMPOOL
831    mempool_init();
832#endif
833
834    fdb_config config;
835    fdb_file_handle *fhandle;
836    fdb_kvs_handle *handle;
837
838    if (fconfig) {
839        if (validate_fdb_config(fconfig)) {
840            config = *fconfig;
841        } else {
842            return FDB_RESULT_INVALID_CONFIG;
843        }
844    } else {
845        config = get_default_config();
846    }
847
848    if (config.multi_kv_instances == false) {
849        // single KV instance mode does not support customized cmp function
850        return FDB_RESULT_INVALID_CONFIG;
851    }
852
853    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
854    if (!fhandle) { // LCOV_EXCL_START
855        return FDB_RESULT_ALLOC_FAIL;
856    } // LCOV_EXCL_STOP
857
858    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
859    if (!handle) { // LCOV_EXCL_START
860        free(fhandle);
861        return FDB_RESULT_ALLOC_FAIL;
862    } // LCOV_EXCL_STOP
863
864#ifndef SPIN_INITIALIZER
865    init_initial_lock_status();
866#endif
867
868    spin_lock(&initial_lock);
869    fdb_open_inprog++;
870    spin_unlock(&initial_lock);
871
872    atomic_init_uint8_t(&handle->handle_busy, 0);
873    handle->shandle = NULL;
874    handle->kvs_config = get_default_kvs_config();
875
876    fdb_status fs = fdb_init(fconfig);
877    if (fs != FDB_RESULT_SUCCESS) {
878        free(handle);
879        free(fhandle);
880        spin_lock(&initial_lock);
881        fdb_open_inprog--;
882        spin_unlock(&initial_lock);
883        return fs;
884    }
885    fdb_file_handle_init(fhandle, handle);
886
887    // insert kvs_names and functions into fhandle's list
888    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
889                                   kvs_names, functions);
890
891    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
892    if (fs == FDB_RESULT_SUCCESS) {
893        *ptr_fhandle = fhandle;
894        filemgr_fhandle_add(handle->file, fhandle);
895    } else {
896        *ptr_fhandle = NULL;
897        free(handle);
898        fdb_file_handle_free(fhandle);
899    }
900    spin_lock(&initial_lock);
901    fdb_open_inprog--;
902    spin_unlock(&initial_lock);
903    return fs;
904}
905
906fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
907                                  const char *filename,
908                                  fdb_config *fconfig,
909                                  struct list *cmp_func_list)
910{
911#ifdef _MEMPOOL
912    mempool_init();
913#endif
914
915    fdb_file_handle *fhandle;
916    fdb_kvs_handle *handle;
917
918    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
919    if (!fhandle) { // LCOV_EXCL_START
920        return FDB_RESULT_ALLOC_FAIL;
921    } // LCOV_EXCL_STOP
922
923    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
924    if (!handle) { // LCOV_EXCL_START
925        free(fhandle);
926        return FDB_RESULT_ALLOC_FAIL;
927    } // LCOV_EXCL_STOP
928
929    atomic_init_uint8_t(&handle->handle_busy, 0);
930    handle->shandle = NULL;
931
932    fdb_file_handle_init(fhandle, handle);
933    if (cmp_func_list && list_begin(cmp_func_list)) {
934        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
935    }
936    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
937    if (fs == FDB_RESULT_SUCCESS) {
938        *ptr_fhandle = fhandle;
939        filemgr_fhandle_add(handle->file, fhandle);
940    } else {
941        *ptr_fhandle = NULL;
942        free(handle);
943        fdb_file_handle_free(fhandle);
944    }
945    return fs;
946}
947
948LIBFDB_API
949fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
950                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
951{
952#ifdef _MEMPOOL
953    mempool_init();
954#endif
955
956    fdb_config config = handle_in->config;
957    fdb_kvs_config kvs_config = handle_in->kvs_config;
958    fdb_kvs_id_t kv_id = 0;
959    fdb_kvs_handle *handle;
960    fdb_txn *txn = NULL;
961    fdb_status fs = FDB_RESULT_SUCCESS;
962    filemgr *file;
963    file_status_t fstatus = FILE_NORMAL;
964    struct snap_handle dummy_shandle;
965    struct _fdb_key_cmp_info cmp_info;
966    LATENCY_STAT_START();
967
968    if (!handle_in || !ptr_handle) {
969        return FDB_RESULT_INVALID_ARGS;
970    }
971
972fdb_snapshot_open_start:
973    if (!handle_in->shandle) {
974        fdb_check_file_reopen(handle_in, &fstatus);
975        fdb_sync_db_header(handle_in);
976        file = handle_in->file;
977
978        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
979            handle_in->seqnum = fdb_kvs_get_seqnum(file,
980                                                   handle_in->kvs->id);
981        } else {
982            handle_in->seqnum = filemgr_get_seqnum(file);
983        }
984    } else {
985        file = handle_in->file;
986    }
987
988    // if the max sequence number seen by this handle is lower than the
989    // requested snapshot marker, it means the snapshot is not yet visible
990    // even via the current fdb_kvs_handle
991    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
992        return FDB_RESULT_NO_DB_INSTANCE;
993    }
994
995    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
996    if (!handle) { // LCOV_EXCL_START
997        return FDB_RESULT_ALLOC_FAIL;
998    } // LCOV_EXCL_STOP
999
1000    atomic_init_uint8_t(&handle->handle_busy, 0);
1001    handle->log_callback = handle_in->log_callback;
1002    handle->max_seqnum = seqnum;
1003    handle->fhandle = handle_in->fhandle;
1004
1005    config.flags |= FDB_OPEN_FLAG_RDONLY;
1006    // do not perform compaction for snapshot
1007    config.compaction_mode = FDB_COMPACTION_MANUAL;
1008
1009    // If cloning an existing snapshot handle, then rewind indexes
1010    // to its last DB header and point its avl tree to existing snapshot's tree
1011    bool clone_snapshot = false;
1012    if (handle_in->shandle) {
1013        handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
1014        fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1015        if (fs == FDB_RESULT_SUCCESS) {
1016            clone_snapshot = true;
1017            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1018        } else {
1019            fdb_log(&handle_in->log_callback, fs,
1020                    "Warning: Snapshot clone at sequence number %" _F64
1021                    "does not match its snapshot handle %" _F64
1022                    "in file '%s'.", seqnum, handle_in->seqnum,
1023                    handle_in->file->filename);
1024            free(handle);
1025            return fs;
1026        }
1027    }
1028
1029    cmp_info.kvs_config = handle_in->kvs_config;
1030    cmp_info.kvs = handle_in->kvs;
1031
1032    if (!handle->shandle) {
1033        txn = handle_in->fhandle->root->txn;
1034        if (!txn) {
1035            txn = &handle_in->file->global_txn;
1036        }
1037        if (handle_in->kvs) {
1038            kv_id = handle_in->kvs->id;
1039        }
1040        if (seqnum == FDB_SNAPSHOT_INMEM) {
1041            memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1042            // tmp value to denote snapshot & not rollback to _fdb_open
1043            handle->shandle = &dummy_shandle; // dummy
1044        } else {
1045            fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1046                                       &handle->shandle);
1047        }
1048        if (fs != FDB_RESULT_SUCCESS) {
1049            free(handle);
1050            return fs;
1051        }
1052    }
1053
1054    if (handle_in->kvs) {
1055        // sub-handle in multi KV instance mode
1056        if (clone_snapshot) {
1057            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1058        } else {
1059            fs = _fdb_kvs_open(handle_in->kvs->root,
1060                              &config, &kvs_config, file,
1061                              file->filename,
1062                              _fdb_kvs_get_name(handle_in, file),
1063                              handle);
1064        }
1065    } else {
1066        if (clone_snapshot) {
1067            fs = _fdb_clone_snapshot(handle_in, handle);
1068        } else {
1069            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1070        }
1071    }
1072
1073    if (fs == FDB_RESULT_SUCCESS) {
1074        if (seqnum == FDB_SNAPSHOT_INMEM &&
1075            !handle_in->shandle) {
1076            handle->max_seqnum = handle_in->seqnum;
1077
1078            // synchronize dirty root nodes if exist
1079            bid_t dirty_idtree_root = BLK_NOT_FOUND;
1080            bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1081            struct filemgr_dirty_update_node *dirty_update;
1082
1083            dirty_update = filemgr_dirty_update_get_latest(handle->file);
1084            btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1085
1086            if (dirty_update) {
1087                filemgr_dirty_update_get_root(handle->file, dirty_update,
1088                                       &dirty_idtree_root, &dirty_seqtree_root);
1089                _fdb_import_dirty_root(handle, dirty_idtree_root,
1090                                       dirty_seqtree_root);
1091                btreeblk_discard_blocks(handle->bhandle);
1092            }
1093            // Having synced the dirty root, make an in-memory WAL snapshot
1094            // TODO: Re-enable WAL sharing once ready...
1095#ifdef _MVCC_WAL_ENABLE
1096            fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1097                                   &cmp_info, &handle->shandle);
1098#else
1099            fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1100                                       &handle->shandle);
1101            if (fs == FDB_RESULT_SUCCESS) {
1102                fs = wal_copyto_snapshot(file, handle->shandle,
1103                                        (bool)handle_in->kvs);
1104            }
1105            (void)kv_id;
1106#endif // _MVCC_WAL_ENABLE
1107        } else if (clone_snapshot) {
1108            // Snapshot is created on the other snapshot handle
1109
1110            handle->max_seqnum = handle_in->seqnum;
1111
1112            if (seqnum == FDB_SNAPSHOT_INMEM) {
1113                // in-memory snapshot
1114                // Clone dirty root nodes from the source snapshot by incrementing
1115                // their ref counters
1116                handle->trie->root_bid = handle_in->trie->root_bid;
1117                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1118                    if (handle->kvs) {
1119                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1120                    } else {
1121                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1122                    }
1123                }
1124                btreeblk_discard_blocks(handle->bhandle);
1125
1126                // increase ref count for dirty update
1127                struct filemgr_dirty_update_node *dirty_update;
1128                dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1129                filemgr_dirty_update_inc_ref_count(dirty_update);
1130                btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1131            }
1132        }
1133        *ptr_handle = handle;
1134    } else {
1135        *ptr_handle = NULL;
1136        if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1137            wal_snapshot_close(handle->shandle, handle->file);
1138        }
1139        free(handle);
1140        // If compactor thread had finished compaction just before this routine
1141        // calls _fdb_open, then it is possible that the snapshot's DB header
1142        // is only present in the new_file. So we must retry the snapshot
1143        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1144        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1145            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1146                goto fdb_snapshot_open_start;
1147            }
1148        }
1149    }
1150    if (seqnum == FDB_SNAPSHOT_INMEM) {
1151        LATENCY_STAT_END(file, FDB_LATENCY_SNAPSHOTS);
1152    } else {
1153        LATENCY_STAT_END(file, FDB_LATENCY_SNAPSHOT_DUR);
1154    }
1155    return fs;
1156}
1157
1158static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1159
1160LIBFDB_API
1161fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1162{
1163#ifdef _MEMPOOL
1164    mempool_init();
1165#endif
1166
1167    fdb_config config;
1168    fdb_kvs_handle *handle_in, *handle;
1169    fdb_status fs;
1170    fdb_seqnum_t old_seqnum;
1171
1172    if (!handle_ptr) {
1173        return FDB_RESULT_INVALID_ARGS;
1174    }
1175
1176    handle_in = *handle_ptr;
1177    config = handle_in->config;
1178
1179    if (handle_in->kvs) {
1180        return fdb_kvs_rollback(handle_ptr, seqnum);
1181    }
1182
1183    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1184        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1185                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1186                       handle_in->file->filename);
1187    }
1188
1189    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1190        return FDB_RESULT_HANDLE_BUSY;
1191    }
1192
1193    filemgr_mutex_lock(handle_in->file);
1194    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1195    // All transactions should be closed before rollback
1196    if (wal_txn_exists(handle_in->file)) {
1197        filemgr_set_rollback(handle_in->file, 0);
1198        filemgr_mutex_unlock(handle_in->file);
1199        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1200        return FDB_RESULT_FAIL_BY_TRANSACTION;
1201    }
1202
1203    // If compaction is running, wait until it is aborted.
1204    // TODO: Find a better way of waiting for the compaction abortion.
1205    unsigned int sleep_time = 10000; // 10 ms.
1206    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1207    while (fstatus == FILE_COMPACT_OLD) {
1208        filemgr_mutex_unlock(handle_in->file);
1209        decaying_usleep(&sleep_time, 1000000);
1210        filemgr_mutex_lock(handle_in->file);
1211        fstatus = filemgr_get_file_status(handle_in->file);
1212    }
1213    if (fstatus == FILE_REMOVED_PENDING) {
1214        filemgr_mutex_unlock(handle_in->file);
1215        fdb_check_file_reopen(handle_in, NULL);
1216    } else {
1217        filemgr_mutex_unlock(handle_in->file);
1218    }
1219
1220    fdb_sync_db_header(handle_in);
1221
1222    // if the max sequence number seen by this handle is lower than the
1223    // requested snapshot marker, it means the snapshot is not yet visible
1224    // even via the current fdb_kvs_handle
1225    if (seqnum > handle_in->seqnum) {
1226        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1227        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1228        return FDB_RESULT_NO_DB_INSTANCE;
1229    }
1230
1231    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1232    if (!handle) { // LCOV_EXCL_START
1233        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1234        return FDB_RESULT_ALLOC_FAIL;
1235    } // LCOV_EXCL_STOP
1236
1237    atomic_init_uint8_t(&handle->handle_busy, 0);
1238    handle->log_callback = handle_in->log_callback;
1239    handle->fhandle = handle_in->fhandle;
1240    if (seqnum == 0) {
1241        fs = _fdb_reset(handle, handle_in);
1242    } else {
1243        handle->max_seqnum = seqnum;
1244        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1245                       &config);
1246    }
1247
1248    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1249    if (fs == FDB_RESULT_SUCCESS) {
1250        // rollback the file's sequence number
1251        filemgr_mutex_lock(handle_in->file);
1252        old_seqnum = filemgr_get_seqnum(handle_in->file);
1253        filemgr_set_seqnum(handle_in->file, seqnum);
1254        filemgr_mutex_unlock(handle_in->file);
1255
1256        fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1257                !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1258        if (fs == FDB_RESULT_SUCCESS) {
1259            if (handle_in->txn) {
1260                handle->txn = handle_in->txn;
1261                handle_in->txn = NULL;
1262            }
1263            handle_in->fhandle->root = handle;
1264            _fdb_close_root(handle_in);
1265            handle->max_seqnum = 0;
1266            handle->seqnum = seqnum;
1267            *handle_ptr = handle;
1268        } else {
1269            // cancel the rolling-back of the sequence number
1270            filemgr_mutex_lock(handle_in->file);
1271            filemgr_set_seqnum(handle_in->file, old_seqnum);
1272            filemgr_mutex_unlock(handle_in->file);
1273            free(handle);
1274            atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1275        }
1276    } else {
1277        free(handle);
1278        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1279    }
1280
1281    return fs;
1282}
1283
1284LIBFDB_API
1285fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1286                            fdb_snapshot_marker_t marker)
1287{
1288#ifdef _MEMPOOL
1289    mempool_init();
1290#endif
1291
1292    fdb_config config;
1293    fdb_kvs_handle *super_handle;
1294    fdb_kvs_handle rhandle;
1295    fdb_kvs_handle *handle = &rhandle;
1296    struct filemgr *file;
1297    fdb_kvs_config kvs_config;
1298    fdb_status fs;
1299    err_log_callback log_callback;
1300    struct kvs_info *kvs;
1301    struct snap_handle shandle; // dummy snap handle
1302
1303    if (!fhandle) {
1304        return FDB_RESULT_INVALID_ARGS;
1305    }
1306
1307    super_handle = fhandle->root;
1308    kvs = super_handle->kvs;
1309
1310    // fdb_rollback_all cannot be allowed when there are kv store instances
1311    // still open, because we do not have means of invalidating open kv handles
1312    // which may not be present in the rollback point
1313    if (kvs && _fdb_kvs_is_busy(fhandle)) {
1314        return FDB_RESULT_KV_STORE_BUSY;
1315    }
1316    file = super_handle->file;
1317    config = super_handle->config;
1318    kvs_config = super_handle->kvs_config;
1319    log_callback = super_handle->log_callback;
1320
1321    if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1322        return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1323                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1324                       super_handle->file->filename);
1325    }
1326
1327    filemgr_mutex_lock(super_handle->file);
1328    filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1329    // All transactions should be closed before rollback
1330    if (wal_txn_exists(super_handle->file)) {
1331        filemgr_set_rollback(super_handle->file, 0);
1332        filemgr_mutex_unlock(super_handle->file);
1333        return FDB_RESULT_FAIL_BY_TRANSACTION;
1334    }
1335
1336    // If compaction is running, wait until it is aborted.
1337    // TODO: Find a better way of waiting for the compaction abortion.
1338    unsigned int sleep_time = 10000; // 10 ms.
1339    file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1340    while (fstatus == FILE_COMPACT_OLD) {
1341        filemgr_mutex_unlock(super_handle->file);
1342        decaying_usleep(&sleep_time, 1000000);
1343        filemgr_mutex_lock(super_handle->file);
1344        fstatus = filemgr_get_file_status(super_handle->file);
1345    }
1346    if (fstatus == FILE_REMOVED_PENDING) {
1347        filemgr_mutex_unlock(super_handle->file);
1348        fdb_check_file_reopen(super_handle, NULL);
1349    } else {
1350        filemgr_mutex_unlock(super_handle->file);
1351    }
1352
1353    fdb_sync_db_header(super_handle);
1354    // Shutdown WAL discarding entries from all KV Stores..
1355    fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1356    if (fs != FDB_RESULT_SUCCESS) {
1357        return fs;
1358    }
1359
1360    memset(handle, 0, sizeof(fdb_kvs_handle));
1361    memset(&shandle, 0, sizeof(struct snap_handle));
1362    handle->log_callback = log_callback;
1363    handle->fhandle = fhandle;
1364    handle->last_hdr_bid = (bid_t)marker; // Fast rewind on open
1365    handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1366    handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1367    if (kvs) {
1368        fdb_kvs_header_free(file); // KV header will be recreated below.
1369        handle->kvs = kvs; // re-use super_handle's kvs info
1370        handle->kvs_config = kvs_config;
1371    }
1372    handle->config = config;
1373
1374    fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1375
1376    if (handle->config.multi_kv_instances) {
1377        filemgr_mutex_lock(handle->file);
1378        fdb_kvs_header_create(handle->file);
1379        fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1380                            handle->kv_info_offset,
1381                            handle->file->version, false);
1382        filemgr_mutex_unlock(handle->file);
1383    }
1384
1385    filemgr_set_rollback(file, 0); // allow mutations
1386    handle->shandle = NULL; // just a dummy handle never allocated
1387
1388    if (fs == FDB_RESULT_SUCCESS) {
1389        fdb_seqnum_t old_seqnum;
1390        // Restore WAL for all KV instances...
1391        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1392
1393        // rollback the file's sequence number
1394        filemgr_mutex_lock(file);
1395        old_seqnum = filemgr_get_seqnum(file);
1396        filemgr_set_seqnum(file, handle->seqnum);
1397        filemgr_mutex_unlock(file);
1398
1399        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1400                         !(handle->config.durability_opt & FDB_DRB_ASYNC));
1401        if (fs == FDB_RESULT_SUCCESS) {
1402            _fdb_close(super_handle);
1403            *super_handle = *handle;
1404        } else {
1405            filemgr_mutex_lock(file);
1406            filemgr_set_seqnum(file, old_seqnum);
1407            filemgr_mutex_unlock(file);
1408        }
1409    } else { // Rollback failed, restore KV header
1410        fdb_kvs_header_create(file);
1411        fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1412                            super_handle->kv_info_offset,
1413                            ver_get_latest_magic(),
1414                            false);
1415    }
1416
1417    return fs;
1418}
1419
1420static void _fdb_init_file_config(const fdb_config *config,
1421                                  struct filemgr_config *fconfig) {
1422    fconfig->blocksize = config->blocksize;
1423    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1424    fconfig->chunksize = config->chunksize;
1425
1426    fconfig->options = 0x0;
1427    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1428        fconfig->options |= FILEMGR_CREATE;
1429    }
1430    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1431        fconfig->options |= FILEMGR_READONLY;
1432    }
1433    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1434        fconfig->options |= FILEMGR_SYNC;
1435    }
1436
1437    fconfig->flag = 0x0;
1438    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1439        config->buffercache_size) {
1440        fconfig->flag |= _ARCH_O_DIRECT;
1441    }
1442
1443    fconfig->prefetch_duration = config->prefetch_duration;
1444    fconfig->num_wal_shards = config->num_wal_partitions;
1445    fconfig->num_bcache_shards = config->num_bcache_partitions;
1446    fconfig->encryption_key = config->encryption_key;
1447    atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1448                          config->block_reusing_threshold,
1449                          std::memory_order_relaxed);
1450    atomic_store_uint64_t(&fconfig->num_keeping_headers,
1451                          config->num_keeping_headers,
1452                          std::memory_order_relaxed);
1453}
1454
1455fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1456                               fdb_kvs_handle *handle_out)
1457{
1458    fdb_status status;
1459
1460    handle_out->config = handle_in->config;
1461    handle_out->kvs_config = handle_in->kvs_config;
1462    handle_out->fileops = handle_in->fileops;
1463    handle_out->file = handle_in->file;
1464    // Note that the file ref count will be decremented when the cloned snapshot
1465    // is closed through filemgr_close().
1466    filemgr_incr_ref_count(handle_out->file);
1467
1468    if (handle_out->filename) {
1469        handle_out->filename = (char *)realloc(handle_out->filename,
1470                                               strlen(handle_in->filename)+1);
1471    } else {
1472        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1473    }
1474    strcpy(handle_out->filename, handle_in->filename);
1475
1476    // initialize the docio handle.
1477    handle_out->dhandle = (struct docio_handle *)
1478        calloc(1, sizeof(struct docio_handle));
1479    handle_out->dhandle->log_callback = &handle_out->log_callback;
1480    docio_init(handle_out->dhandle, handle_out->file,
1481               handle_out->config.compress_document_body);
1482
1483    // initialize the btree block handle.
1484    handle_out->btreeblkops = btreeblk_get_ops();
1485    handle_out->bhandle = (struct btreeblk_handle *)
1486        calloc(1, sizeof(struct btreeblk_handle));
1487    handle_out->bhandle->log_callback = &handle_out->log_callback;
1488    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1489
1490    handle_out->dirty_updates = handle_in->dirty_updates;
1491    atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1492    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1493    handle_out->kv_info_offset = handle_in->kv_info_offset;
1494    handle_out->shandle->stat = handle_in->shandle->stat;
1495    handle_out->op_stats = handle_in->op_stats;
1496
1497    // initialize the trie handle
1498    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1499    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1500                handle_out->file->blocksize,
1501                handle_in->trie->root_bid, // Source snapshot's trie root bid
1502                (void *)handle_out->bhandle, handle_out->btreeblkops,
1503                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1504    // set aux for cmp wrapping function
1505    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1506    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1507
1508    if (handle_out->kvs) {
1509        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1510    }
1511
1512    handle_out->seqnum = handle_in->seqnum;
1513    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1514        if (handle_out->config.multi_kv_instances) {
1515            // multi KV instance mode .. HB+trie
1516            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1517            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1518                        handle_out->file->blocksize,
1519                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1520                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1521                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1522
1523        } else {
1524            // single KV instance mode .. normal B+tree
1525            struct btree_kv_ops *seq_kv_ops =
1526                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1527            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1528            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1529
1530            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1531            // Init the seq tree using the root bid of the source snapshot.
1532            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1533                                handle_out->btreeblkops, seq_kv_ops,
1534                                handle_out->config.blocksize,
1535                                handle_in->seqtree->root_bid);
1536        }
1537    } else{
1538        handle_out->seqtree = NULL;
1539    }
1540
1541    status = btreeblk_end(handle_out->bhandle);
1542    if (status != FDB_RESULT_SUCCESS) {
1543        const char *msg = "Snapshot clone operation fails due to the errors in "
1544            "btreeblk_end() in a database file '%s'\n";
1545        fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1546    }
1547
1548    return status;
1549}
1550
1551fdb_status _fdb_open(fdb_kvs_handle *handle,
1552                     const char *filename,
1553                     fdb_filename_mode_t filename_mode,
1554                     const fdb_config *config)
1555{
1556    struct filemgr_config fconfig;
1557    struct kvs_stat stat, empty_stat;
1558    bid_t trie_root_bid = BLK_NOT_FOUND;
1559    bid_t seq_root_bid = BLK_NOT_FOUND;
1560    bid_t stale_root_bid = BLK_NOT_FOUND;
1561    fdb_seqnum_t seqnum = 0;
1562    filemgr_header_revnum_t header_revnum = 0;
1563    filemgr_header_revnum_t latest_header_revnum = 0;
1564    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1565    uint64_t ndocs = 0;
1566    uint64_t ndeletes = 0;
1567    uint64_t datasize = 0;
1568    uint64_t deltasize = 0;
1569    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1570    uint64_t kv_info_offset = BLK_NOT_FOUND;
1571    uint64_t version;
1572    uint64_t header_flags = 0;
1573    uint8_t header_buf[FDB_BLOCKSIZE];
1574    char *compacted_filename = NULL;
1575    char *prev_filename = NULL;
1576    size_t header_len = 0;
1577    bool multi_kv_instances = config->multi_kv_instances;
1578
1579    uint64_t nlivenodes = 0;
1580    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1581    char actual_filename[FDB_MAX_FILENAME_LEN];
1582    char virtual_filename[FDB_MAX_FILENAME_LEN];
1583    char *target_filename = NULL;
1584    fdb_status status;
1585
1586    if (filename == NULL) {
1587        return FDB_RESULT_INVALID_ARGS;
1588    }
1589    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1590        // filename (including path) length is supported up to
1591        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1592        return FDB_RESULT_TOO_LONG_FILENAME;
1593    }
1594
1595    if (filename_mode == FDB_VFILENAME &&
1596        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1597        return FDB_RESULT_INVALID_COMPACTION_MODE;
1598    }
1599
1600    _fdb_init_file_config(config, &fconfig);
1601
1602    if (filename_mode == FDB_VFILENAME) {
1603        compactor_get_actual_filename(filename, actual_filename,
1604                                      config->compaction_mode, &handle->log_callback);
1605    } else {
1606        strcpy(actual_filename, filename);
1607    }
1608
1609    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1610         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1611          filename_mode == FDB_VFILENAME) ) {
1612        // 1) manual compaction mode, OR
1613        // 2) auto compaction mode + 'filename' is virtual filename
1614        // -> copy 'filename'
1615        target_filename = (char *)filename;
1616    } else {
1617        // otherwise (auto compaction mode + 'filename' is actual filename)
1618        // -> copy 'virtual_filename'
1619        compactor_get_virtual_filename(filename, virtual_filename);
1620        target_filename = virtual_filename;
1621    }
1622
1623    // If the user is requesting legacy CRC pass that down to filemgr
1624    if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1625        fconfig.options |= FILEMGR_CREATE_CRC32;
1626    }
1627
1628    handle->fileops = get_filemgr_ops();
1629    filemgr_open_result result = filemgr_open((char *)actual_filename,
1630                                              handle->fileops,
1631                                              &fconfig, &handle->log_callback);
1632    if (result.rv != FDB_RESULT_SUCCESS) {
1633        return (fdb_status) result.rv;
1634    }
1635    handle->file = result.file;
1636
1637    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1638        strcmp(filename, actual_filename)) {
1639        // It is in-place compacted file if
1640        // 1) compaction mode is manual, and
1641        // 2) actual filename is different to the filename given by user.
1642        // In this case, set the in-place compaction flag.
1643        filemgr_set_in_place_compaction(handle->file, true);
1644    }
1645    if (filemgr_is_in_place_compaction_set(handle->file)) {
1646        // This file was in-place compacted.
1647        // set 'handle->filename' to the original filename to trigger file renaming
1648        compactor_get_virtual_filename(filename, virtual_filename);
1649        target_filename = virtual_filename;
1650    }
1651
1652    if (handle->filename) {
1653        handle->filename = (char *)realloc(handle->filename,
1654                                           strlen(target_filename)+1);
1655    } else {
1656        handle->filename = (char*)malloc(strlen(target_filename)+1);
1657    }
1658    strcpy(handle->filename, target_filename);
1659
1660    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1661    // set handle->last_hdr_bid to the block id of required header, so rewind..
1662    if (handle->shandle && handle->last_hdr_bid) {
1663        status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1664                                      header_buf, &header_len, &seqnum,
1665                                      &latest_header_revnum, &deltasize, &version,
1666                                      NULL, &handle->log_callback);
1667        if (status != FDB_RESULT_SUCCESS) {
1668            free(handle->filename);
1669            handle->filename = NULL;
1670            filemgr_close(handle->file, false, handle->filename,
1671                              &handle->log_callback);
1672            return status;
1673        }
1674    } else { // Normal open
1675        filemgr_get_header(handle->file, header_buf, &header_len,
1676                           &handle->last_hdr_bid, &seqnum, &latest_header_revnum);
1677        version = handle->file->version;
1678    }
1679
1680    // initialize the docio handle so kv headers may be read
1681    handle->dhandle = (struct docio_handle *)
1682                      calloc(1, sizeof(struct docio_handle));
1683    handle->dhandle->log_callback = &handle->log_callback;
1684    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1685
1686    // fetch previous superblock bitmap info if exists
1687    // (this should be done after 'handle->dhandle' is initialized)
1688    if (handle->file->sb) {
1689        status = sb_bmp_fetch_doc(handle);
1690        if (status != FDB_RESULT_SUCCESS) {
1691            docio_free(handle->dhandle);
1692            free(handle->dhandle);
1693            free(handle->filename);
1694            handle->filename = NULL;
1695            filemgr_close(handle->file, false, handle->filename,
1696                              &handle->log_callback);
1697            return status;
1698        }
1699    }
1700
1701
1702    if (header_len > 0) {
1703        fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1704                         &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1705                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1706                         &header_flags, &compacted_filename, &prev_filename);
1707        // use existing setting for seqtree_opt
1708        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1709            seqtree_opt = FDB_SEQTREE_USE;
1710        } else {
1711            seqtree_opt = FDB_SEQTREE_NOT_USE;
1712        }
1713        // Retrieve seqnum for multi-kv mode
1714        if (handle->kvs && handle->kvs->id > 0) {
1715            if (kv_info_offset != BLK_NOT_FOUND) {
1716                if (!filemgr_get_kv_header(handle->file)) {
1717                    struct kvs_header *kv_header;
1718                    _fdb_kvs_header_create(&kv_header);
1719                    // KV header already exists but not loaded .. read & import
1720                    fdb_kvs_header_read(kv_header, handle->dhandle,
1721                                        kv_info_offset, version, false);
1722                    if (!filemgr_set_kv_header(handle->file, kv_header,
1723                                               fdb_kvs_header_free)) {
1724                        _fdb_kvs_header_free(kv_header);
1725                    }
1726                }
1727                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1728                                             handle->kvs->id);
1729            } else { // no kv_info offset, ok to set seqnum to zero
1730                seqnum = 0;
1731            }
1732        }
1733        // other flags
1734        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1735            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1736        }
1737        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1738            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1739        }
1740        // use existing setting for multi KV instance mode
1741        if (kv_info_offset == BLK_NOT_FOUND) {
1742            multi_kv_instances = false;
1743        } else {
1744            multi_kv_instances = true;
1745        }
1746    }
1747
1748    handle->config = *config;
1749    handle->config.seqtree_opt = seqtree_opt;
1750    handle->config.multi_kv_instances = multi_kv_instances;
1751
1752    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1753        // Either an in-memory snapshot or cloning from an existing snapshot..
1754        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1755                     // *_open() should have already restored it
1756    } else { // Persisted snapshot or file rollback..
1757
1758        // get the BID of the latest block
1759        // (it is OK if the block is not a DB header)
1760        bool dirty_data_exists = false;
1761        struct superblock *sb = handle->file->sb;
1762
1763        if (sb_bmp_exists(sb)) {
1764            dirty_data_exists = false;
1765            bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1766            if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1767                // add 1 since we subtract 1 from 'hdr_bid' below soon
1768                hdr_bid = sb_last_hdr_bid + 1;
1769                if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1770                    // seq number has been increased since the last commit
1771                    seqnum = fdb_kvs_get_committed_seqnum(handle);
1772                }
1773            } else {
1774                hdr_bid = BLK_NOT_FOUND;
1775            }
1776        } else {
1777            hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1778            dirty_data_exists = (hdr_bid > handle->last_hdr_bid);
1779        }
1780
1781        if (hdr_bid == BLK_NOT_FOUND ||
1782            (sb && hdr_bid <= sb->config->num_sb)) {
1783            hdr_bid = 0;
1784        } else if (hdr_bid > 0) {
1785            --hdr_bid;
1786        }
1787
1788        if (handle->max_seqnum) {
1789            struct kvs_stat stat_ori;
1790            // backup original stats
1791            if (handle->kvs) {
1792                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1793            } else {
1794                _kvs_stat_get(handle->file, 0, &stat_ori);
1795            }
1796
1797            if (dirty_data_exists){
1798                // uncommitted data exists beyond the last DB header
1799                // get the last committed seq number
1800                fdb_seqnum_t seq_commit;
1801                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1802                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1803                    // In case, snapshot_open is attempted with latest uncommitted
1804                    // sequence number
1805                    header_len = 0;
1806                } else if (seq_commit == handle->max_seqnum) {
1807                    // snapshot/rollback on the latest commit header
1808                    seqnum = seq_commit; // skip file reverse scan
1809                }
1810                hdr_bid = filemgr_get_header_bid(handle->file);
1811            }
1812            // Reverse scan the file to locate the DB header with seqnum marker
1813            while (header_len && seqnum != handle->max_seqnum) {
1814                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1815                                          header_buf, &header_len, &seqnum,
1816                                          &header_revnum, NULL, &version, NULL,
1817                                          &handle->log_callback);
1818                if (header_len == 0) {
1819                    continue; // header doesn't exist
1820                }
1821                fdb_fetch_header(version, header_buf, &trie_root_bid,
1822                                 &seq_root_bid, &stale_root_bid,
1823                                 &ndocs, &ndeletes, &nlivenodes,
1824                                 &datasize, &last_wal_flush_hdr_bid,
1825                                 &kv_info_offset, &header_flags,
1826                                 &compacted_filename, NULL);
1827                handle->last_hdr_bid = hdr_bid;
1828
1829                if (!handle->kvs || handle->kvs->id == 0) {
1830                    // single KVS mode OR default KVS
1831                    if (!handle->shandle) {
1832                        // rollback
1833                        struct kvs_stat stat_dst;
1834                        _kvs_stat_get(handle->file, 0, &stat_dst);
1835                        stat_dst.ndocs = ndocs;
1836                        stat_dst.ndeletes = ndeletes;
1837                        stat_dst.datasize = datasize;
1838                        stat_dst.nlivenodes = nlivenodes;
1839                        stat_dst.deltasize = deltasize;
1840                        _kvs_stat_set(handle->file, 0, stat_dst);
1841                    }
1842                    continue;
1843                }
1844
1845                int64_t doc_offset;
1846                struct kvs_header *kv_header;
1847                struct docio_object doc;
1848
1849                _fdb_kvs_header_create(&kv_header);
1850                memset(&doc, 0, sizeof(struct docio_object));
1851                doc_offset = docio_read_doc(handle->dhandle,
1852                                            kv_info_offset, &doc, true);
1853
1854                if (doc_offset <= 0) {
1855                    header_len = 0; // fail
1856                    _fdb_kvs_header_free(kv_header);
1857                } else {
1858                    _fdb_kvs_header_import(kv_header, doc.body,
1859                                           doc.length.bodylen, version, false);
1860                    // get local sequence number for the KV instance
1861                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1862                                                 handle->kvs->id);
1863                    if (!handle->shandle) {
1864                        // rollback: replace kv_header stats
1865                        // read from the current header's kv_header
1866                        struct kvs_stat stat_src, stat_dst;
1867                        _kvs_stat_get_kv_header(kv_header,
1868                                                handle->kvs->id,
1869                                                &stat_src);
1870                        _kvs_stat_get(handle->file,
1871                                      handle->kvs->id,
1872                                      &stat_dst);
1873                        // update ndocs, datasize, nlivenodes
1874                        // into the current file's kv_header
1875                        // Note: stats related to WAL should not be updated
1876                        //       at this time. They will be adjusted through
1877                        //       discard & restore routines below.
1878                        stat_dst.ndocs = stat_src.ndocs;
1879                        stat_dst.datasize = stat_src.datasize;
1880                        stat_dst.nlivenodes = stat_src.nlivenodes;
1881                        _kvs_stat_set(handle->file,
1882                                      handle->kvs->id,
1883                                      stat_dst);
1884                    }
1885                    _fdb_kvs_header_free(kv_header);
1886                    free_docio_object(&doc, 1, 1, 1);
1887                }
1888            }
1889            if (!header_len) { // Marker MUST match that of DB commit!
1890                // rollback original stats
1891                if (handle->kvs) {
1892                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1893                } else {
1894                    _kvs_stat_get(handle->file, 0, &stat_ori);
1895                }
1896
1897                docio_free(handle->dhandle);
1898                free(handle->dhandle);
1899                free(handle->filename);
1900                free(prev_filename);
1901                handle->filename = NULL;
1902                filemgr_close(handle->file, false, handle->filename,
1903                              &handle->log_callback);
1904                return FDB_RESULT_NO_DB_INSTANCE;
1905            }
1906
1907            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1908                if (handle->config.multi_kv_instances) {
1909                    // multi KV instance mode
1910                    // clear only WAL items belonging to the instance
1911                    wal_close_kv_ins(handle->file,
1912                                     (handle->kvs)?(handle->kvs->id):(0),
1913                                     &handle->log_callback);
1914                } else {
1915                    wal_shutdown(handle->file, &handle->log_callback);
1916                }
1917            }
1918        } else { // snapshot to sequence number 0 requested..
1919            if (handle->shandle) { // fdb_snapshot_open API call
1920                if (seqnum) {
1921                    // Database currently has a non-zero seq number,
1922                    // but the snapshot was requested with a seq number zero.
1923                    docio_free(handle->dhandle);
1924                    free(handle->dhandle);
1925                    free(handle->filename);
1926                    free(prev_filename);
1927                    handle->filename = NULL;
1928                    filemgr_close(handle->file, false, handle->filename,
1929                                  &handle->log_callback);
1930                    return FDB_RESULT_NO_DB_INSTANCE;
1931                }
1932            } // end of zero max_seqnum but non-rollback check
1933        } // end of zero max_seqnum check
1934    } // end of durable snapshot locating
1935
1936    handle->btreeblkops = btreeblk_get_ops();
1937    handle->bhandle = (struct btreeblk_handle *)
1938                      calloc(1, sizeof(struct btreeblk_handle));
1939    handle->bhandle->log_callback = &handle->log_callback;
1940
1941    handle->dirty_updates = 0;
1942
1943    if (handle->config.compaction_buf_maxsize == 0) {
1944        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1945    }
1946
1947    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1948
1949    if (header_revnum && !filemgr_is_rollback_on(handle->file)) {
1950        // only for snapshot (excluding rollback)
1951        handle->cur_header_revnum = header_revnum;
1952    } else {
1953        handle->cur_header_revnum = latest_header_revnum;
1954    }
1955    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1956
1957    memset(&empty_stat, 0x0, sizeof(empty_stat));
1958    _kvs_stat_get(handle->file, 0, &stat);
1959    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1960        // sync (default) KVS stat with DB header
1961        stat.nlivenodes = nlivenodes;
1962        stat.ndocs = ndocs;
1963        stat.datasize = datasize;
1964        _kvs_stat_set(handle->file, 0, stat);
1965    }
1966
1967    handle->kv_info_offset = kv_info_offset;
1968    if (handle->config.multi_kv_instances && !handle->shandle) {
1969        // multi KV instance mode
1970        filemgr_mutex_lock(handle->file);
1971        if (kv_info_offset == BLK_NOT_FOUND) {
1972            // there is no KV header .. create & initialize
1973            fdb_kvs_header_create(handle->file);
1974            // TODO: If another handle is opened before the first header is appended,
1975            // an unnecessary KV info doc is appended. We need to address it.
1976            kv_info_offset = fdb_kvs_header_append(handle);
1977        } else if (handle->file->kv_header == NULL) {
1978            // KV header already exists but not loaded .. read & import
1979            fdb_kvs_header_create(handle->file);
1980            fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1981                                kv_info_offset, version, false);
1982        }
1983        filemgr_mutex_unlock(handle->file);
1984
1985        // validation check for key order of all KV stores
1986        if (handle == handle->fhandle->root) {
1987            fdb_status fs = fdb_kvs_cmp_check(handle);
1988            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1989                docio_free(handle->dhandle);
1990                free(handle->dhandle);
1991                btreeblk_free(handle->bhandle);
1992                free(handle->bhandle);
1993                free(handle->filename);
1994                handle->filename = NULL;
1995                filemgr_close(handle->file, false, handle->filename,
1996                              &handle->log_callback);
1997                return fs;
1998            }
1999        }
2000    }
2001    handle->kv_info_offset = kv_info_offset;
2002
2003    if (handle->kv_info_offset != BLK_NOT_FOUND &&
2004        handle->kvs == NULL) {
2005        // multi KV instance mode .. turn on config flag
2006        handle->config.multi_kv_instances = true;
2007        // only super handle can be opened using fdb_open(...)
2008        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2009    }
2010
2011    if (handle->shandle) { // Populate snapshot stats..
2012        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2013            memset(&handle->shandle->stat, 0x0,
2014                    sizeof(handle->shandle->stat));
2015            handle->shandle->stat.ndocs = ndocs;
2016            handle->shandle->stat.datasize = datasize;
2017            handle->shandle->stat.nlivenodes = nlivenodes;
2018        } else { // Multi KV instance mode, populate specific kv stats
2019            memset(&handle->shandle->stat, 0x0,
2020                    sizeof(handle->shandle->stat));
2021            _kvs_stat_get(handle->file, handle->kvs->id,
2022                    &handle->shandle->stat);
2023            // Since wal is restored below, we have to reset
2024            // wal stats to zero.
2025            handle->shandle->stat.wal_ndeletes = 0;
2026            handle->shandle->stat.wal_ndocs = 0;
2027        }
2028    }
2029
2030    // initialize pointer to the global operational stats of this KV store
2031    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2032    if (!handle->op_stats) {
2033        const char *msg = "Database open fails due to the error in retrieving "
2034            "the global operational stats of KV store in a database file '%s'\n";
2035        fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2036                handle->file->filename);
2037        return FDB_RESULT_OPEN_FAIL;
2038    }
2039
2040    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2041    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2042                handle->file->blocksize, trie_root_bid,
2043                (void *)handle->bhandle, handle->btreeblkops,
2044                (void *)handle->dhandle, _fdb_readkey_wrap);
2045    // set aux for cmp wrapping function
2046    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2047    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2048
2049    if (handle->kvs) {
2050        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2051    }
2052
2053    handle->seqnum = seqnum;
2054    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2055        if (handle->config.multi_kv_instances) {
2056            // multi KV instance mode .. HB+trie
2057            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2058            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2059                        handle->file->blocksize, seq_root_bid,
2060                        (void *)handle->bhandle, handle->btreeblkops,
2061                        (void *)handle->dhandle, _fdb_readseq_wrap);
2062
2063        } else {
2064            // single KV instance mode .. normal B+tree
2065            struct btree_kv_ops *seq_kv_ops =
2066                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2067            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2068            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2069
2070            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2071            if (seq_root_bid == BLK_NOT_FOUND) {
2072                btree_init(handle->seqtree, (void *)handle->bhandle,
2073                           handle->btreeblkops, seq_kv_ops,
2074                           handle->config.blocksize, sizeof(fdb_seqnum_t),
2075                           OFFSET_SIZE, 0x0, NULL);
2076             }else{
2077                 btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2078                                     handle->btreeblkops, seq_kv_ops,
2079                                     handle->config.blocksize, seq_root_bid);
2080             }
2081        }
2082    }else{
2083        handle->seqtree = NULL;
2084    }
2085
2086    // Stale-block tree (supported since MAGIC_002)
2087    // this tree is independent to multi/single KVS mode option
2088    if (ver_staletree_support(handle->file->version)) {
2089        // normal B+tree
2090        struct btree_kv_ops *stale_kv_ops =
2091            (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2092        stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2093        stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2094
2095        handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2096        if (stale_root_bid == BLK_NOT_FOUND) {
2097            btree_init(handle->staletree, (void *)handle->bhandle,
2098                       handle->btreeblkops, stale_kv_ops,
2099                       handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2100                       OFFSET_SIZE, 0x0, NULL);
2101         }else{
2102            btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2103                                handle->btreeblkops, stale_kv_ops,
2104                                handle->config.blocksize, stale_root_bid);
2105         }
2106    } else {
2107        handle->staletree = NULL;
2108    }
2109
2110    if (handle->config.multi_kv_instances && handle->max_seqnum) {
2111        // restore only docs belonging to the KV instance
2112        // handle->kvs should not be NULL
2113        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2114                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2115    } else {
2116        // normal restore
2117        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2118    }
2119
2120    if (compacted_filename &&
2121        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2122        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2123        _fdb_recover_compaction(handle, compacted_filename);
2124    }
2125
2126    if (prev_filename) {
2127        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2128            // record the old filename into the file handle of current file
2129            // and REMOVE old file on the first open
2130            // WARNING: snapshots must have been opened before this call
2131            if (filemgr_update_file_status(handle->file,
2132                                           filemgr_get_file_status(handle->file),
2133                                           prev_filename)) {
2134                // Open the old file with read-only mode.
2135                // (Temporarily disable log callback at this time since
2136                //  the old file might be already removed.)
2137                fconfig.options = FILEMGR_READONLY;
2138                filemgr_open_result result = filemgr_open(prev_filename,
2139                                                          handle->fileops,
2140                                                          &fconfig,
2141                                                          NULL);
2142                if (result.file) {
2143                    filemgr_remove_pending(result.file, handle->file,
2144                                           &handle->log_callback);
2145                    filemgr_close(result.file, 0, handle->filename,
2146                                  &handle->log_callback);
2147                }
2148            } else {
2149                free(prev_filename);
2150            }
2151        } else {
2152            free(prev_filename);
2153        }
2154    }
2155
2156    status = btreeblk_end(handle->bhandle);
2157    if (status != FDB_RESULT_SUCCESS) {
2158        return status;
2159    }
2160
2161    // do not register read-only handles
2162    if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2163        if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2164            status = compactor_register_file(handle->file,
2165                                             (fdb_config *)config,
2166                                             &handle->log_callback);
2167        }
2168        if (status == FDB_RESULT_SUCCESS) {
2169            status = bgflusher_register_file(handle->file,
2170                                             (fdb_config *)config,
2171                                             &handle->log_callback);
2172        }
2173    }
2174
2175    return status;
2176}
2177
2178LIBFDB_API
2179fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2180                                fdb_log_callback log_callback,
2181                                void *ctx_data)
2182{
2183    handle->log_callback.callback = log_callback;
2184    handle->log_callback.ctx_data = ctx_data;
2185    return FDB_RESULT_SUCCESS;
2186}
2187
2188LIBFDB_API
2189void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
2190{
2191    fatal_error_callback = err_callback;
2192}
2193
2194LIBFDB_API
2195fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2196                          const void *meta, size_t metalen,
2197                          const void *body, size_t bodylen)
2198{
2199    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2200        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2201        return FDB_RESULT_INVALID_ARGS;
2202    }
2203
2204    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2205    if (*doc == NULL) { // LCOV_EXCL_START
2206        return FDB_RESULT_ALLOC_FAIL;
2207    } // LCOV_EXCL_STOP
2208
2209    (*doc)->seqnum = SEQNUM_NOT_USED;
2210
2211    if (key && keylen > 0) {
2212        (*doc)->key = (void *)malloc(keylen);
2213        if ((*doc)->key == NULL) { // LCOV_EXCL_START
2214            return FDB_RESULT_ALLOC_FAIL;
2215        } // LCOV_EXCL_STOP
2216        memcpy((*doc)->key, key, keylen);
2217        (*doc)->keylen = keylen;
2218    } else {
2219        (*doc)->key = NULL;
2220        (*doc)->keylen = 0;
2221    }
2222
2223    if (meta && metalen > 0) {
2224        (*doc)->meta = (void *)malloc(metalen);
2225        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2226            return FDB_RESULT_ALLOC_FAIL;
2227        } // LCOV_EXCL_STOP
2228        memcpy((*doc)->meta, meta, metalen);
2229        (*doc)->metalen = metalen;
2230    } else {
2231        (*doc)->meta = NULL;
2232        (*doc)->metalen = 0;
2233    }
2234
2235    if (body && bodylen > 0) {
2236        (*doc)->body = (void *)malloc(bodylen);
2237        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2238            return FDB_RESULT_ALLOC_FAIL;
2239        } // LCOV_EXCL_STOP
2240        memcpy((*doc)->body, body, bodylen);
2241        (*doc)->bodylen = bodylen;
2242    } else {
2243        (*doc)->body = NULL;
2244        (*doc)->bodylen = 0;
2245    }
2246
2247    return FDB_RESULT_SUCCESS;
2248}
2249
2250LIBFDB_API
2251fdb_status fdb_doc_update(fdb_doc **doc,
2252                          const void *meta, size_t metalen,
2253                          const void *body, size_t bodylen)
2254{
2255    if (doc == NULL ||
2256        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2257        return FDB_RESULT_INVALID_ARGS;
2258    }
2259    if (*doc == NULL) {
2260        return FDB_RESULT_INVALID_ARGS;
2261    }
2262
2263    if (meta && metalen > 0) {
2264        // free previous metadata
2265        free((*doc)->meta);
2266        // allocate new metadata
2267        (*doc)->meta = (void *)malloc(metalen);
2268        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2269            return FDB_RESULT_ALLOC_FAIL;
2270        } // LCOV_EXCL_STOP
2271        memcpy((*doc)->meta, meta, metalen);
2272        (*doc)->metalen = metalen;
2273    }
2274
2275    if (body && bodylen > 0) {
2276        // free previous body
2277        free((*doc)->body);
2278        // allocate new body
2279        (*doc)->body = (void *)malloc(bodylen);
2280        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2281            return FDB_RESULT_ALLOC_FAIL;
2282        } // LCOV_EXCL_STOP
2283        memcpy((*doc)->body, body, bodylen);
2284        (*doc)->bodylen = bodylen;
2285    }
2286
2287    (*doc)->seqnum = SEQNUM_NOT_USED;
2288    return FDB_RESULT_SUCCESS;
2289}
2290
2291LIBFDB_API
2292void fdb_doc_set_seqnum(fdb_doc *doc,
2293                        const fdb_seqnum_t seqnum)
2294{
2295    doc->seqnum = seqnum;
2296    if (seqnum != SEQNUM_NOT_USED) {
2297        doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2298    } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2299        doc->flags &= ~FDB_CUSTOM_SEQNUM;
2300    }
2301}
2302
2303// doc MUST BE allocated by malloc
2304LIBFDB_API
2305fdb_status fdb_doc_free(fdb_doc *doc)
2306{
2307    if (doc) {
2308        free(doc->key);
2309        free(doc->meta);
2310        free(doc->body);
2311        free(doc);
2312    }
2313    return FDB_RESULT_SUCCESS;
2314}
2315
2316INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
2317                                        struct wal_item *item)
2318{
2319    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2320    uint64_t old_offset = 0;
2321
2322    if (item->action == WAL_ACT_REMOVE) {
2323        // For immediate remove, old_offset value is critical
2324        // so that we should get an exact value.
2325        hbtrie_find(handle->trie,
2326                    item->header->key,
2327                    item->header->keylen,
2328                    (void*)&old_offset);
2329    } else {
2330        hbtrie_find_offset(handle->trie,
2331                           item->header->key,
2332                           item->header->keylen,
2333                           (void*)&old_offset);
2334    }
2335    btreeblk_end(handle->bhandle);
2336    old_offset = _endian_decode(old_offset);
2337
2338    return old_offset;
2339}
2340
2341// A stale sequence number entry that can be purged from the sequence tree
2342// during the WAL flush.
2343struct wal_stale_seq_entry {
2344    fdb_kvs_id_t kv_id;
2345    fdb_seqnum_t seqnum;
2346    struct avl_node avl_entry;
2347};
2348
2349// Delta changes in KV store stats during the WAL flush
2350struct wal_kvs_delta_stat {
2351    fdb_kvs_id_t kv_id;
2352    int64_t nlivenodes;
2353    int64_t ndocs;
2354    int64_t ndeletes;
2355    int64_t datasize;
2356    int64_t deltasize;
2357    struct avl_node avl_entry;
2358};
2359
2360INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2361{
2362    (void) aux;
2363    struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2364                                                    avl_entry);
2365    struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2366                                                    avl_entry);
2367    if (entry1->kv_id < entry2->kv_id) {
2368        return -1;
2369    } else if (entry1->kv_id > entry2->kv_id) {
2370        return 1;
2371    } else {
2372        return _CMP_U64(entry1->seqnum, entry2->seqnum);
2373    }
2374}
2375
2376
2377// Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
2378INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2379{
2380    (void) aux;
2381    struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2382                                                  avl_entry);
2383    struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2384                                                  avl_entry);
2385    if (stat1->kv_id < stat2->kv_id) {
2386        return -1;
2387    } else if (stat1->kv_id > stat2->kv_id) {
2388        return 1;
2389    } else {
2390        return 0;
2391    }
2392}
2393
2394INLINE void _fdb_wal_flush_seq_purge(void *dbhandle,
2395                                     struct avl_tree *stale_seqnum_list,
2396                                     struct avl_tree *kvs_delta_stats)
2397{
2398    fdb_seqnum_t _seqnum;
2399    int64_t nlivenodes;
2400    int64_t ndeltanodes;
2401    int64_t delta;
2402    uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2403    struct wal_stale_seq_entry *seq_entry;
2404    struct wal_kvs_delta_stat *delta_stat;
2405    struct wal_kvs_delta_stat kvs_delta_query;
2406
2407    fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2408    struct avl_node *node = avl_first(stale_seqnum_list);
2409    while (node) {
2410        seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2411        node = avl_next(node);
2412        nlivenodes = handle->bhandle->nlivenodes;
2413        ndeltanodes = handle->bhandle->ndeltanodes;
2414        _seqnum = _endian_encode(seq_entry->seqnum);
2415        if (handle->kvs) {
2416            // multi KV instance mode .. HB+trie
2417            kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2418            memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2419            hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2420                          sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t));
2421        } else {
2422            btree_remove(handle->seqtree, (void*)&_seqnum);
2423        }
2424        btreeblk_end(handle->bhandle);
2425
2426        kvs_delta_query.kv_id = seq_entry->kv_id;
2427        avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2428                                               &kvs_delta_query.avl_entry,
2429                                               _kvs_delta_stat_cmp);
2430        if (delta_stat_node) {
2431            delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2432                                    avl_entry);
2433            delta = handle->bhandle->nlivenodes - nlivenodes;
2434            delta_stat->nlivenodes += delta;
2435            delta = handle->bhandle->ndeltanodes - ndeltanodes;
2436            delta *= handle->config.blocksize;
2437            delta_stat->deltasize += delta;
2438        }
2439        avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2440        free(seq_entry);
2441    }
2442}
2443
2444INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2445                                           struct avl_tree *kvs_delta_stats)
2446{
2447    struct avl_node *node;
2448    struct wal_kvs_delta_stat *delta_stat;
2449    node = avl_first(kvs_delta_stats);
2450    while (node) {
2451        delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2452        node = avl_next(node);
2453        _kvs_stat_update_attr(file, delta_stat->kv_id,
2454                              KVS_STAT_DATASIZE, delta_stat->datasize);
2455        _kvs_stat_update_attr(file, delta_stat->kv_id,
2456                              KVS_STAT_NDOCS, delta_stat->ndocs);
2457        _kvs_stat_update_attr(file, delta_stat->kv_id,
2458                              KVS_STAT_NDELETES, delta_stat->ndeletes);
2459        _kvs_stat_update_attr(file, delta_stat->kv_id,
2460                              KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2461        _kvs_stat_update_attr(file, delta_stat->kv_id,
2462                              KVS_STAT_DELTASIZE, delta_stat->deltasize);
2463        avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2464        free(delta_stat);
2465    }
2466}
2467
2468INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2469                                      struct wal_item *item,
2470                                      struct avl_tree *stale_seqnum_list,
2471                                      struct avl_tree *kvs_delta_stats)
2472{
2473    hbtrie_result hr;
2474    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2475    fdb_seqnum_t _seqnum;
2476    fdb_kvs_id_t kv_id = 0;
2477    fdb_status fs = FDB_RESULT_SUCCESS;
2478    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2479    int size_id, size_seq;
2480    uint8_t *kvid_seqnum;
2481    uint64_t old_offset;
2482    int64_t _offset;
2483    int64_t delta;
2484    struct docio_object _doc;
2485    struct filemgr *file = handle->dhandle->file;
2486
2487    memset(var_key, 0, handle->config.chunksize);
2488    if (handle->kvs) {
2489        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2490    } else {
2491        kv_id = 0;
2492    }
2493
2494    struct wal_kvs_delta_stat *kvs_delta_stat;
2495    struct wal_kvs_delta_stat kvs_delta_query;
2496    kvs_delta_query.kv_id = kv_id;
2497    avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2498                                           &kvs_delta_query.avl_entry,
2499                                           _kvs_delta_stat_cmp);
2500    if (delta_stat_node) {
2501        kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2502                                    avl_entry);
2503    } else {
2504        kvs_delta_stat = (struct wal_kvs_delta_stat *)
2505            calloc(1, sizeof(struct wal_kvs_delta_stat));
2506        kvs_delta_stat->kv_id = kv_id;
2507        avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2508                   _kvs_delta_stat_cmp);
2509    }
2510
2511    int64_t nlivenodes = handle->bhandle->nlivenodes;
2512    int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2513
2514    if (item->action == WAL_ACT_INSERT ||
2515        item->action == WAL_ACT_LOGICAL_REMOVE) {
2516        _offset = _endian_encode(item->offset);
2517
2518        hr = hbtrie_insert(handle->trie,
2519                           item->header->key,
2520                           item->header->keylen,
2521                           (void *)&_offset,
2522                           (void *)&old_offset);
2523
2524        fs = btreeblk_end(handle->bhandle);
2525        if (fs != FDB_RESULT_SUCCESS) {
2526            return fs;
2527        }
2528        old_offset = _endian_decode(old_offset);
2529
2530        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2531            _seqnum = _endian_encode(item->seqnum);
2532            if (handle->kvs) {
2533                // multi KV instance mode .. HB+trie
2534                uint64_t old_offset_local;
2535
2536                size_id = sizeof(fdb_kvs_id_t);
2537                size_seq = sizeof(fdb_seqnum_t);
2538                kvid_seqnum = alca(uint8_t, size_id + size_seq);
2539                kvid2buf(size_id, kv_id, kvid_seqnum);
2540                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2541                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2542                              (void *)&_offset, (void *)&old_offset_local);
2543            } else {
2544                btree_insert(handle->seqtree, (void *)&_seqnum,
2545                             (void *)&_offset);
2546            }
2547            fs = btreeblk_end(handle->bhandle);
2548            if (fs != FDB_RESULT_SUCCESS) {
2549                return fs;
2550            }
2551        }
2552
2553        delta = handle->bhandle->nlivenodes - nlivenodes;
2554        kvs_delta_stat->nlivenodes += delta;
2555        delta = handle->bhandle->ndeltanodes - ndeltanodes;
2556        delta *= handle->config.blocksize;
2557        kvs_delta_stat->deltasize += delta;
2558
2559        if (old_offset == BLK_NOT_FOUND) {
2560            if (item->action == WAL_ACT_INSERT) {
2561                ++kvs_delta_stat->ndocs;
2562            } else { // inserted a logical deleted doc into main index
2563                ++kvs_delta_stat->ndeletes;
2564            }
2565            kvs_delta_stat->datasize += item->doc_size;
2566            kvs_delta_stat->deltasize += item->doc_size;
2567        } else { // update or logical delete
2568            // This block is already cached when we call HBTRIE_INSERT.
2569            // No additional block access.
2570            char dummy_key[FDB_MAX_KEYLEN];
2571            _doc.meta = _doc.body = NULL;
2572            _doc.key = &dummy_key;
2573            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2574                                              &_doc, true);
2575            if (_offset < 0) {
2576                return (fdb_status) _offset;
2577            } else if (_offset == 0) {
2578                // Note that this is not an error as old_offset is pointing to
2579                // the zero-filled region in a document block.
2580                return FDB_RESULT_KEY_NOT_FOUND;
2581            }
2582            free(_doc.meta);
2583            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2584
2585            if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2586                if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2587                    --kvs_delta_stat->ndocs;
2588                    ++kvs_delta_stat->ndeletes;
2589                } // else no change (prev doc was insert, now just an update)
2590            } else { // prev doc in main index was a logically deleted doc
2591                if (item->action == WAL_ACT_INSERT) { // now undeleted
2592                    ++kvs_delta_stat->ndocs;
2593                    --kvs_delta_stat->ndeletes;
2594                } // else no change (prev doc was deleted, now re-deleted)
2595            }
2596
2597            delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2598            kvs_delta_stat->datasize += delta;
2599            if (handle->last_hdr_bid * handle->config.blocksize < old_offset) {
2600                kvs_delta_stat->deltasize += delta;
2601            } else {
2602                kvs_delta_stat->deltasize += (int)item->doc_size;
2603            }
2604
2605            // Avoid duplicates (remove previous sequence number)
2606            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2607                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2608                    calloc(1, sizeof(struct wal_stale_seq_entry));
2609                entry->kv_id = kv_id;
2610                entry->seqnum = _doc.seqnum;
2611                avl_insert(stale_seqnum_list, &entry->avl_entry,
2612                           _fdb_seq_entry_cmp);
2613            }
2614        }
2615    } else {
2616        // Immediate remove
2617        old_offset = item->old_offset;
2618        hr = hbtrie_remove(handle->trie, item->header->key,
2619                           item->header->keylen);
2620        fs = btreeblk_end(handle->bhandle);
2621        if (fs != FDB_RESULT_SUCCESS) {
2622            return fs;
2623        }
2624
2625        if (hr == HBTRIE_RESULT_SUCCESS) {
2626            // This block is already cached when we call _fdb_wal_get_old_offset
2627            // No additional block access should be done.
2628            char dummy_key[FDB_MAX_KEYLEN];
2629            _doc.meta = _doc.body = NULL;
2630            _doc.key = &dummy_key;
2631            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2632                                              &_doc, true);
2633            if (_offset < 0) {
2634                return (fdb_status) _offset;
2635            } else if (_offset == 0) {
2636                return FDB_RESULT_KEY_NOT_FOUND;
2637            }
2638            free(_doc.meta);
2639            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2640
2641            // Reduce the total number of docs by one
2642            --kvs_delta_stat->ndocs;
2643            if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2644                --kvs_delta_stat->ndeletes;
2645            }
2646
2647            // Reduce the total datasize by size of previously present doc
2648            delta = -(int)_fdb_get_docsize(_doc.length);
2649            kvs_delta_stat->datasize += delta;
2650            // if multiple wal flushes happen before commit, then it's possible
2651            // that this doc deleted was inserted & flushed after last commit
2652            // In this case we need to update the deltasize too which tracks
2653            // the amount of new data inserted between commits.
2654            if (handle->last_hdr_bid * handle->config.blocksize < old_offset) {
2655                kvs_delta_stat->deltasize += delta;
2656            }
2657
2658            // remove sequence number for the removed doc
2659            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2660                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2661                    calloc(1, sizeof(struct wal_stale_seq_entry));
2662                entry->kv_id = kv_id;
2663                entry->seqnum = _doc.seqnum;
2664                avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2665            }
2666
2667            // Update index size to new size after the remove operation
2668            delta = handle->bhandle->nlivenodes - nlivenodes;
2669            kvs_delta_stat->nlivenodes += delta;
2670
2671            // ndeltanodes measures number of new index nodes created due to
2672            // this hbtrie_remove() operation
2673            delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2674            delta *= handle->config.blocksize;
2675            kvs_delta_stat->deltasize += delta;
2676        }
2677    }
2678    return FDB_RESULT_SUCCESS;
2679}
2680
2681void fdb_sync_db_header(fdb_kvs_handle *handle)
2682{
2683    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2684    if (handle->cur_header_revnum != cur_revnum) {
2685        void *header_buf = NULL;
2686        size_t header_len;
2687        bid_t hdr_bid;
2688        filemgr_header_revnum_t revnum;
2689
2690        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2691                                        &hdr_bid, NULL, &revnum);
2692        if (header_len > 0) {
2693            uint64_t header_flags, dummy64, version;
2694            bid_t idtree_root;
2695            bid_t new_seq_root;
2696            bid_t new_stale_root;
2697            char *compacted_filename;
2698            char *prev_filename = NULL;
2699
2700            version = handle->file->version;
2701            handle->last_hdr_bid = hdr_bid;
2702            handle->cur_header_revnum = revnum;
2703
2704            fdb_fetch_header(version, header_buf, &idtree_root,
2705                             &new_seq_root, &new_stale_root, &dummy64,
2706                             &dummy64, &dummy64,
2707                             &dummy64, &handle->last_wal_flush_hdr_bid,
2708                             &handle->kv_info_offset, &header_flags,
2709                             &compacted_filename, &prev_filename);
2710
2711            if (handle->dirty_updates) {
2712                // discard all cached writable b+tree nodes
2713                // to avoid data inconsistency with other writers
2714                btreeblk_discard_blocks(handle->bhandle);
2715            }
2716
2717            handle->trie->root_bid = idtree_root;
2718
2719            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2720                if (new_seq_root != handle->seqtree->root_bid) {
2721                    if (handle->config.multi_kv_instances) {
2722                        handle->seqtrie->root_bid = new_seq_root;
2723                    } else {
2724                        btree_init_from_bid(handle->seqtree,
2725                                            handle->seqtree->blk_handle,
2726                                            handle->seqtree->blk_ops,
2727                                            handle->seqtree->kv_ops,
2728                                            handle->seqtree->blksize,
2729                                            new_seq_root);
2730                    }
2731                }
2732            }
2733
2734            if (ver_staletree_support(version)) {
2735                btree_init_from_bid(handle->staletree,
2736                                    handle->staletree->blk_handle,
2737                                    handle->staletree->blk_ops,
2738                                    handle->staletree->kv_ops,
2739                                    handle->staletree->blksize,
2740                                    new_stale_root);
2741            } else {
2742                handle->staletree = NULL;
2743            }
2744
2745            if (prev_filename) {
2746                free(prev_filename);
2747            }
2748
2749            handle->dirty_updates = 0;
2750            if (handle->kvs) {
2751                // multiple KV instance mode AND sub handle
2752                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2753                                                    handle->kvs->id);
2754            } else {
2755                // super handle OR single KV instance mode
2756                handle->seqnum = filemgr_get_seqnum(handle->file);
2757            }
2758        } else {
2759            handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
2760        }
2761
2762        if (header_buf) {
2763            free(header_buf);
2764        }
2765    }
2766}
2767
2768fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2769{
2770    bool fhandle_ret;
2771    fdb_status fs = FDB_RESULT_SUCCESS;
2772    file_status_t fstatus = filemgr_get_file_status(handle->file);
2773    // check whether the compaction is done
2774    if (fstatus == FILE_REMOVED_PENDING) {
2775        uint64_t ndocs, ndeletes, datasize, nlivenodes, last_wal_flush_hdr_bid;
2776        uint64_t kv_info_offset, header_flags;
2777        size_t header_len;
2778        char *new_filename;
2779        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2780        bid_t trie_root_bid, seq_root_bid, stale_root_bid;
2781        fdb_config config = handle->config;
2782
2783        // close the current file and newly open the new file
2784        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2785            // compaction daemon mode .. just close and then open
2786            char filename[FDB_MAX_FILENAME_LEN];
2787            strcpy(filename, handle->filename);
2788
2789            // We don't need to maintain fhandle list for the old file
2790            // as there will be no more mutation on the file.
2791            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2792            fs = _fdb_close(handle);
2793            if (fs != FDB_RESULT_SUCCESS) {
2794                if (fhandle_ret) {
2795                    filemgr_fhandle_add(handle->file, handle->fhandle);
2796                }
2797                return fs;
2798            }
2799
2800            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2801            if (fs != FDB_RESULT_SUCCESS) {
2802                return fs;
2803            }
2804            filemgr_fhandle_add(handle->file, handle->fhandle);
2805
2806        } else {
2807            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2808            fdb_fetch_header(handle->file->version, buf,
2809                             &trie_root_bid, &seq_root_bid, &stale_root_bid,
2810                             &ndocs, &ndeletes, &nlivenodes, &datasize,
2811                             &last_wal_flush_hdr_bid,
2812                             &kv_info_offset, &header_flags,
2813                             &new_filename, NULL);
2814
2815            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2816            fs = _fdb_close(handle);
2817            if (fs != FDB_RESULT_SUCCESS) {
2818                if (fhandle_ret) {
2819                    filemgr_fhandle_add(handle->file, handle->fhandle);
2820                }
2821                return fs;
2822            }
2823
2824            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2825            if (fs != FDB_RESULT_SUCCESS) {
2826                return fs;
2827            }
2828            filemgr_fhandle_add(handle->file, handle->fhandle);
2829        }
2830    }
2831    if (status) {
2832        *status = fstatus;
2833    }
2834    return fs;
2835}
2836
2837static void _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2838{
2839    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2840    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2841
2842    if (handle->shandle) {
2843        // skip snapshot
2844        return;
2845    }
2846
2847    struct filemgr_dirty_update_node *dirty_update;
2848    dirty_update = filemgr_dirty_update_get_latest(handle->file);
2849    btreeblk_set_dirty_update(handle->bhandle, dirty_update);
2850
2851    if (dirty_update) {
2852        filemgr_dirty_update_get_root(handle->file, dirty_update,
2853                                      &dirty_idtree_root, &dirty_seqtree_root);
2854        _fdb_import_dirty_root(handle, dirty_idtree_root, dirty_seqtree_root);
2855        btreeblk_discard_blocks(handle->bhandle);
2856    }
2857
2858    return;
2859}
2860
2861static void _fdb_release_dirty_root(fdb_kvs_handle *handle)
2862{
2863    if (!handle->shandle) {
2864        struct filemgr_dirty_update_node *dirty_update;
2865        dirty_update = btreeblk_get_dirty_update(handle->bhandle);
2866        if (dirty_update) {
2867            filemgr_dirty_update_close_node(handle->file, dirty_update);
2868            btreeblk_clear_dirty_update(handle->bhandle);
2869        }
2870    }
2871}
2872
2873LIBFDB_API
2874fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2875{
2876    uint64_t offset;
2877    int64_t _offset;
2878    struct docio_object _doc;
2879    struct filemgr *wal_file = NULL;
2880    struct docio_handle *dhandle;
2881    struct _fdb_key_cmp_info cmp_info;
2882    fdb_status wr;
2883    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2884    fdb_txn *txn;
2885    fdb_doc doc_kv;
2886    LATENCY_STAT_START();
2887
2888    if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2889        doc->keylen > FDB_MAX_KEYLEN ||
2890        (handle->kvs_config.custom_cmp &&
2891            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2892        return FDB_RESULT_INVALID_ARGS;
2893    }
2894
2895    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2896        return FDB_RESULT_HANDLE_BUSY;
2897    }
2898
2899    doc_kv = *doc;
2900
2901    if (handle->kvs) {
2902        // multi KV instance mode
2903        int size_chunk = handle->config.chunksize;
2904        doc_kv.keylen = doc->keylen + size_chunk;
2905        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2906        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2907        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2908    }
2909
2910    if (!handle->shandle) {
2911        fdb_check_file_reopen(handle, NULL);
2912        txn = handle->fhandle->root->txn;
2913        if (!txn) {
2914            txn = &handle->file->global_txn;
2915        }
2916    } else {
2917        txn = handle->shandle->snap_txn;
2918    }
2919
2920    cmp_info.kvs_config = handle->kvs_config;
2921    cmp_info.kvs = handle->kvs;
2922    wal_file = handle->file;
2923    dhandle = handle->dhandle;
2924
2925    if (handle->kvs) {
2926        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
2927                      &offset);
2928    } else {
2929        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc,
2930                      &offset);
2931    }
2932
2933    if (!handle->shandle) {
2934        fdb_sync_db_header(handle);
2935    }
2936
2937    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
2938
2939    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2940        _fdb_sync_dirty_root(handle);
2941
2942        if (handle->kvs) {
2943            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2944                             (void *)&offset);
2945        } else {
2946            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2947                             (void *)&offset);
2948        }
2949        btreeblk_end(handle->bhandle);
2950        offset = _endian_decode(offset);
2951
2952        _fdb_release_dirty_root(handle);
2953    }
2954
2955    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
2956         hr == HBTRIE_RESULT_SUCCESS) {
2957        bool alloced_meta = doc->meta ? false : true;
2958        bool alloced_body = doc->body ? false : true;
2959        if (handle->kvs) {
2960            _doc.key = doc_kv.key;
2961            _doc.length.keylen = doc_kv.keylen;
2962            doc->deleted = doc_kv.deleted; // update deleted field if wal_find
2963        } else {
2964            _doc.key = doc->key;
2965            _doc.length.keylen = doc->keylen;
2966        }
2967        _doc.meta = doc->meta;
2968        _doc.body = doc->body;
2969
2970        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2971            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2972            return FDB_RESULT_KEY_NOT_FOUND;
2973        }
2974
2975        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2976        if (_offset <= 0) {
2977            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2978            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
2979        }
2980
2981        if (_doc.length.keylen != doc_kv.keylen ||
2982            _doc.length.flag & DOCIO_DELETED) {
2983            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2984            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2985            return FDB_RESULT_KEY_NOT_FOUND;
2986        }
2987
2988        doc->seqnum = _doc.seqnum;
2989        doc->metalen = _doc.length.metalen;
2990        doc->bodylen = _doc.length.bodylen;
2991        doc->meta = _doc.meta;
2992        doc->body = _doc.body;
2993        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2994        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2995        doc->offset = offset;
2996
2997        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
2998        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2999        return FDB_RESULT_SUCCESS;
3000    }
3001
3002    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3003    return FDB_RESULT_KEY_NOT_FOUND;
3004}
3005
3006// search document metadata using key
3007LIBFDB_API
3008fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
3009{
3010    uint64_t offset;
3011    struct docio_object _doc;
3012    struct docio_handle *dhandle;
3013    struct filemgr *wal_file = NULL;
3014    fdb_status wr;
3015    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3016    fdb_txn *txn;
3017    struct _fdb_key_cmp_info cmp_info;
3018    fdb_doc doc_kv;
3019    LATENCY_STAT_START();
3020
3021    if (!handle || !doc || !doc->key ||
3022        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
3023        (handle->kvs_config.custom_cmp &&
3024            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3025        return FDB_RESULT_INVALID_ARGS;
3026    }
3027
3028    doc_kv = *doc;
3029
3030    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3031        return FDB_RESULT_HANDLE_BUSY;
3032    }
3033
3034    if (handle->kvs) {
3035        // multi KV instance mode
3036        int size_chunk = handle->config.chunksize;
3037        doc_kv.keylen = doc->keylen + size_chunk;
3038        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3039        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3040        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3041    }
3042
3043    if (!handle->shandle) {
3044        fdb_check_file_reopen(handle, NULL);
3045        txn = handle->fhandle->root->txn;
3046        if (!txn) {
3047            txn = &handle->file->global_txn;
3048        }
3049    } else {
3050        txn = handle->shandle->snap_txn;
3051    }
3052
3053    cmp_info.kvs_config = handle->kvs_config;
3054    cmp_info.kvs = handle->kvs;
3055    wal_file = handle->file;
3056    dhandle = handle->dhandle;
3057
3058    if (handle->kvs) {
3059        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3060                      &offset);
3061    } else {
3062        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc, &offset);
3063    }
3064
3065    if (!handle->shandle) {
3066        fdb_sync_db_header(handle);
3067    }
3068    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3069
3070    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3071        _fdb_sync_dirty_root(handle);
3072
3073        if (handle->kvs) {
3074            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3075                             (void *)&offset);
3076        } else {
3077            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3078                             (void *)&offset);
3079        }
3080        btreeblk_end(handle->bhandle);
3081        offset = _endian_decode(offset);
3082
3083        _fdb_release_dirty_root(handle);
3084    }
3085
3086    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3087         hr == HBTRIE_RESULT_SUCCESS) {
3088        if (handle->kvs) {
3089            _doc.key = doc_kv.key;
3090            _doc.length.keylen = doc_kv.keylen;
3091        } else {
3092            _doc.key = doc->key;
3093            _doc.length.keylen = doc->keylen;
3094        }
3095        bool alloced_meta = doc->meta ? false : true;
3096        _doc.meta = doc->meta;
3097        _doc.body = doc->body;
3098
3099        int64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
3100                                                       true);
3101        if (body_offset <= 0){
3102            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3103            return body_offset < 0 ? (fdb_status)body_offset : FDB_RESULT_KEY_NOT_FOUND;
3104        }
3105
3106        if (_doc.length.keylen != doc_kv.keylen) {
3107            free_docio_object(&_doc, 0, alloced_meta, 0);
3108            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3109            return FDB_RESULT_KEY_NOT_FOUND;
3110        }
3111
3112        doc->seqnum = _doc.seqnum;
3113        doc->metalen = _doc.length.metalen;
3114        doc->bodylen = _doc.length.bodylen;
3115        doc->meta = _doc.meta;
3116        doc->body = _doc.body;
3117        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3118        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3119        doc->offset = offset;
3120
3121        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3122        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3123        return FDB_RESULT_SUCCESS;
3124    }
3125
3126    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3127    return FDB_RESULT_KEY_NOT_FOUND;
3128}
3129
3130// search document using sequence number
3131LIBFDB_API
3132fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
3133{
3134    uint64_t offset;
3135    int64_t _offset;
3136    struct docio_object _doc;
3137    struct docio_handle *dhandle;
3138    struct filemgr *wal_file = NULL;
3139    fdb_status wr;
3140    btree_result br = BTREE_RESULT_FAIL;
3141    fdb_seqnum_t _seqnum;
3142    fdb_txn *txn;
3143    struct _fdb_key_cmp_info cmp_info;
3144    LATENCY_STAT_START();
3145
3146    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
3147        return FDB_RESULT_INVALID_ARGS;
3148    }
3149
3150    // Sequence trees are a must for byseq operations
3151    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
3152        return FDB_RESULT_INVALID_CONFIG;
3153    }
3154
3155    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3156        return FDB_RESULT_HANDLE_BUSY;
3157    }
3158
3159    if (!handle->shandle) {
3160        fdb_check_file_reopen(handle, NULL);
3161
3162        txn = handle->fhandle->root->txn;
3163        if (!txn) {
3164            txn = &handle->file->global_txn;
3165        }
3166    } else {
3167        txn = handle->shandle->snap_txn;
3168    }
3169
3170    cmp_info.kvs_config = handle->kvs_config;
3171    cmp_info.kvs = handle->kvs;
3172    wal_file = handle->file;
3173    dhandle = handle->dhandle;
3174
3175    // prevent searching by key in WAL if 'doc' is not empty
3176    size_t key_len = doc->keylen;
3177    doc->keylen = 0;
3178    if (handle->kvs) {
3179        wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, &cmp_info,
3180                            handle->shandle, doc, &offset);
3181    } else {
3182        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc, &offset);
3183    }
3184
3185    doc->keylen =