xref: /6.6.0/forestdb/src/forestdb.cc (revision 04f1e58b)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "breakpad.h"
33#include "btree.h"
34#include "btree_kv.h"
35#include "btree_var_kv_ops.h"
36#include "docio.h"
37#include "btreeblock.h"
38#include "common.h"
39#include "wal.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "bgflusher.h"
44#include "compactor.h"
45#include "memleak.h"
46#include "time_utils.h"
47#include "timing.h"
48#include "system_resource_stats.h"
49#include "version.h"
50#include "staleblock.h"
51
// When the build defines __DEBUG but not __DEBUG_FDB, the debug helper macros
// (DBG, DBGCMD, DBGSW) are redefined below to expand to nothing, which
// silences them for this translation unit.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
62
63
// Set to 1 by fdb_init() once the global (process-wide) initialization is done.
static atomic_uint8_t fdb_initialized(0);
// Counter that fdb_open() increments on entry and decrements before returning;
// both updates are made while holding 'initial_lock'.
static volatile uint32_t fdb_open_inprog = 0;
#ifdef SPIN_INITIALIZER
// Spin lock guarding the global initialization state (statically initialized).
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// State of the lazily initialized 'initial_lock':
// 0 = not initialized, 1 = initialization in progress, 2 = ready for use.
static volatile unsigned int initial_lock_status = 0;
// Spin lock guarding the global initialization state; set up at run time by
// init_initial_lock_status() because no static initializer is available.
static spin_t initial_lock;
#endif
72
73INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74{
75    (void) aux;
76    uint64_t a,b;
77    a = *(uint64_t*)key1;
78    b = *(uint64_t*)key2;
79    a = _endian_decode(a);
80    b = _endian_decode(b);
81    return _CMP_U64(a, b);
82}
83
84size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85{
86    fdb_status fs;
87    keylen_t keylen;
88    struct docio_handle *dhandle = (struct docio_handle*)handle;
89
90    offset = _endian_decode(offset);
91    fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92    if (fs == FDB_RESULT_SUCCESS) {
93        return keylen;
94    } else {
95        const char *msg = "docio_read_doc_key error: read failure on "
96            "offset %" _F64 " in a database file '%s' "
97            ": FDB status %d, lastbid 0x%" _X64 ", "
98            "curblock 0x%" _X64 ", curpos 0x%x\n";
99        fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                dhandle->file->filename, fs, dhandle->lastbid,
101                dhandle->curblock, dhandle->curpos);
102        dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103        return 0;
104    }
105}
106
107size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
108{
109    int size_id, size_seq, size_chunk;
110    fdb_seqnum_t _seqnum;
111    struct docio_object doc;
112    struct docio_handle *dhandle = (struct docio_handle *)handle;
113
114    size_id = sizeof(fdb_kvs_id_t);
115    size_seq = sizeof(fdb_seqnum_t);
116    size_chunk = dhandle->file->config->chunksize;
117    memset(&doc, 0, sizeof(struct docio_object));
118
119    offset = _endian_decode(offset);
120    if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
121                                &doc, true) <= 0) {
122        return 0;
123    }
124    buf2buf(size_chunk, doc.key, size_id, buf);
125    _seqnum = _endian_encode(doc.seqnum);
126    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
127
128    free(doc.key);
129    free(doc.meta);
130
131    return size_id + size_seq;
132}
133
134int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
135{
136    int is_key1_inf, is_key2_inf;
137    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
138    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
139    size_t keylen1, keylen2;
140    btree_cmp_args *args = (btree_cmp_args *)aux;
141    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
142
143    is_key1_inf = _is_inf_key(key1);
144    is_key2_inf = _is_inf_key(key2);
145    if (is_key1_inf && is_key2_inf) { // both are infinite
146        return 0;
147    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
148        return -1;
149    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
150        return 1;
151    }
152
153    _get_var_key(key1, (void*)keystr1, &keylen1);
154    _get_var_key(key2, (void*)keystr2, &keylen2);
155
156    if (keylen1 == 0 && keylen2 == 0) {
157        return 0;
158    } else if (keylen1 ==0 && keylen2 > 0) {
159        return -1;
160    } else if (keylen1 > 0 && keylen2 == 0) {
161        return 1;
162    }
163
164    return cmp(keystr1, keylen1, keystr2, keylen2);
165}
166
167void fdb_fetch_header(uint64_t version,
168                      void *header_buf,
169                      bid_t *trie_root_bid,
170                      bid_t *seq_root_bid,
171                      bid_t *stale_root_bid,
172                      uint64_t *ndocs,
173                      uint64_t *ndeletes,
174                      uint64_t *nlivenodes,
175                      uint64_t *datasize,
176                      uint64_t *last_wal_flush_hdr_bid,
177                      uint64_t *kv_info_offset,
178                      uint64_t *header_flags,
179                      char **new_filename,
180                      char **old_filename)
181{
182    size_t offset = 0;
183    uint16_t new_filename_len;
184    uint16_t old_filename_len;
185
186    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
187               sizeof(bid_t), offset);
188    *trie_root_bid = _endian_decode(*trie_root_bid);
189
190    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
191               sizeof(bid_t), offset);
192    *seq_root_bid = _endian_decode(*seq_root_bid);
193
194    if (ver_staletree_support(version)) {
195        seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
196                   sizeof(bid_t), offset);
197        *stale_root_bid = _endian_decode(*stale_root_bid);
198    } else {
199        *stale_root_bid = BLK_NOT_FOUND;
200    }
201
202    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
203               sizeof(uint64_t), offset);
204    *ndocs = _endian_decode(*ndocs);
205    if (ver_is_atleast_magic_001(version)) {
206        seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
207                   sizeof(uint64_t), offset);
208        *ndeletes = _endian_decode(*ndeletes);
209    } else {
210        *ndeletes = 0;
211    }
212
213    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
214               sizeof(uint64_t), offset);
215    *nlivenodes = _endian_decode(*nlivenodes);
216
217    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
218               sizeof(uint64_t), offset);
219    *datasize = _endian_decode(*datasize);
220
221    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
222               sizeof(uint64_t), offset);
223    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
224
225    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
226               sizeof(uint64_t), offset);
227    *kv_info_offset = _endian_decode(*kv_info_offset);
228
229    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
230               sizeof(uint64_t), offset);
231    *header_flags = _endian_decode(*header_flags);
232
233    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
234               sizeof(new_filename_len), offset);
235    new_filename_len = _endian_decode(new_filename_len);
236    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
237               sizeof(old_filename_len), offset);
238    old_filename_len = _endian_decode(old_filename_len);
239    if (new_filename_len) {
240        *new_filename = (char*)((uint8_t *)header_buf + offset);
241    } else {
242        *new_filename = NULL;
243    }
244    offset += new_filename_len;
245    if (old_filename && old_filename_len) {
246        *old_filename = (char *) malloc(old_filename_len);
247        seq_memcpy(*old_filename,
248                   (uint8_t *)header_buf + offset,
249                   old_filename_len, offset);
250    }
251}
252
253// read the revnum of the given header of BID
254INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
255{
256    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
257    uint64_t version;
258    size_t header_len;
259    fdb_seqnum_t seqnum;
260    filemgr_header_revnum_t revnum = 0;
261    fdb_status fs;
262
263    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
264                              &seqnum, &revnum, NULL, &version, NULL,
265                              &handle->log_callback);
266    if (fs != FDB_RESULT_SUCCESS) {
267        return 0;
268    }
269    return revnum;
270}
271
272INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
273{
274    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
275    uint64_t version, bmp_revnum = 0;
276    size_t header_len;
277    fdb_seqnum_t seqnum;
278    filemgr_header_revnum_t revnum;
279    fdb_status fs;
280
281    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
282                              &seqnum, &revnum, NULL, &version, &bmp_revnum,
283                              &handle->log_callback);
284    if (fs != FDB_RESULT_SUCCESS) {
285        return 0;
286    }
287    return bmp_revnum;
288}
289
// Log callback that discards everything it is given. It is installed where
// error logging has to be muted temporarily.
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // intentionally empty: all arguments are ignored
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
297
298INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
299                             fdb_restore_mode_t mode,
300                             bid_t hdr_bid,
301                             fdb_kvs_id_t kv_id_req)
302{
303    struct filemgr *file = handle->file;
304    uint32_t blocksize = handle->file->blocksize;
305    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
306    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
307    uint64_t offset = 0; //assume everything from first block needs restoration
308    uint64_t filesize = filemgr_get_pos(handle->file);
309    uint64_t doc_scan_limit;
310    uint64_t start_bmp_revnum, stop_bmp_revnum;
311    uint64_t cur_bmp_revnum = (uint64_t)-1;
312    bid_t next_doc_block = BLK_NOT_FOUND;
313    struct _fdb_key_cmp_info cmp_info;
314    err_log_callback *log_callback;
315
316    if (mode == FDB_RESTORE_NORMAL && !handle->shandle &&
317        !wal_try_restore(handle->file)) { // Atomically try to restore WAL
318        // Some other thread or previous open had successfully initialized WAL
319        // We can simply return here
320        return;
321    }
322
323    if (!hdr_off) { // Nothing to do if we don't have a header block offset
324        return;
325    }
326
327    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
328        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
329    }
330
331    // If a valid last header was retrieved and it matches the current header
332    if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid) {
333        return; // No WAL section in the file
334    }
335
336    if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
337        // for normal WAL restore, set status to dirty
338        // (only when the previous status is clean or dirty)
339        wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
340    }
341
342    // Temporarily disable the error logging callback as there are false positive
343    // checksum errors in docio_read_doc.
344    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
345    err_log_callback dummy_cb;
346    log_callback = handle->dhandle->log_callback;
347    dummy_cb.callback = fdb_dummy_log_callback;
348    dummy_cb.ctx_data = NULL;
349    handle->dhandle->log_callback = &dummy_cb;
350
351    if (!handle->shandle) {
352        filemgr_mutex_lock(file);
353    }
354    cmp_info.kvs_config = handle->kvs_config;
355    cmp_info.kvs = handle->kvs;
356
357    start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
358    stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
359    cur_bmp_revnum = start_bmp_revnum;
360
361    // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
362    // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
363    // otherwise: live block (bmp_revnum: 0)
364    //  1 2   3    4    5 6  7  8   9  10
365    // +-------------------------------------------+
366    // |  BBBBAAAAABBBBB  AAABBB    AAA            |
367    // +-------------------------------------------+
368    //              ^                     ^
369    //              hdr_bid               last_wal_flush
370    //
371    // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
372    // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
373    // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
374    // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]
375
376    do {
377        if (cur_bmp_revnum > stop_bmp_revnum) {
378            break;
379        } else if (cur_bmp_revnum == stop_bmp_revnum) {
380
381            bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
382            if (handle->file->sb) {
383                sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
384            }
385            if (!handle->shandle && handle->file->sb &&
386                sb_last_hdr_bid != BLK_NOT_FOUND) {
387                hdr_off = (sb_last_hdr_bid+1) * blocksize;
388            }
389
390            doc_scan_limit = hdr_off;
391            if (offset >= hdr_off) {
392                break;
393            }
394        } else {
395            doc_scan_limit = filesize;
396        }
397
398        if (!docio_check_buffer(handle->dhandle, offset / blocksize,
399                                cur_bmp_revnum)) {
400            // not a document block .. move to next block
401        } else {
402            do {
403                struct docio_object doc;
404                int64_t _offset;
405                uint64_t doc_offset;
406                memset(&doc, 0, sizeof(doc));
407                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
408                if (_offset <= 0) { // reached unreadable doc, skip block
409                    // TODO: Need to have this function return fdb_status, so that
410                    // WAL restore operation should fail if offset < 0
411                    break;
412                } else if ((uint64_t)_offset < offset) {
413                    // If more than one writer is appending docs concurrently,
414                    // they have their own doc block linked list and doc blocks
415                    // may not be consecutive. For example,
416                    //
417                    // Writer 1): 100 -> 102 -> 2 -> 4     | commit
418                    // Writer 2):    101 - > 103 -> 3 -> 5 |
419                    //
420                    // In this case, if we read doc BID 102, then 'offset' will jump
421                    // to doc BID 2, without reading BID 103.
422                    //
423                    // To address this issue, in case that 'offset' decreases,
424                    // remember the next doc block, and follow the doc linked list
425                    // first. After the linked list ends, 'offset' cursor will be
426                    // reset to 'next_doc_block'.
427                    next_doc_block = (offset / blocksize) + 1;
428                }
429                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
430                    // check if the doc is transactional or not, and
431                    // also check if the doc contains system info
432                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
433                        !(doc.length.flag & DOCIO_SYSTEM)) {
434                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
435                            // commit mark .. read doc offset
436                            doc_offset = doc.doc_offset;
437                            // read the previously skipped doc
438                            if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
439                                // doc read error
440                                free(doc.key);
441                                free(doc.meta);
442                                free(doc.body);
443                                offset = _offset;
444                                continue;
445                            }
446                        } else {
447                            doc_offset = offset;
448                        }
449
450                        // If say a snapshot is taken on a db handle after
451                        // rollback, then skip WAL items after rollback point
452                        if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
453                            doc.seqnum > handle->seqnum) {
454                            free(doc.key);
455                            free(doc.meta);
456                            free(doc.body);
457                            offset = _offset;
458                            continue;
459                        }
460
461                        // restore document
462                        fdb_doc wal_doc;
463                        wal_doc.keylen = doc.length.keylen;
464                        wal_doc.bodylen = doc.length.bodylen;
465                        wal_doc.key = doc.key;
466                        wal_doc.seqnum = doc.seqnum;
467                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
468
469                        if (!handle->shandle) {
470                            wal_doc.metalen = doc.length.metalen;
471                            wal_doc.meta = doc.meta;
472                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
473
474                            if (handle->kvs) {
475                                // check seqnum before insert
476                                fdb_kvs_id_t kv_id;
477                                fdb_seqnum_t kv_seqnum;
478                                buf2kvid(handle->config.chunksize,
479                                         wal_doc.key, &kv_id);
480
481                                kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
482                                if (doc.seqnum <= kv_seqnum &&
483                                        ((mode == FDB_RESTORE_KV_INS &&
484                                            kv_id == kv_id_req) ||
485                                         (mode == FDB_RESTORE_NORMAL)) ) {
486                                    // if mode is NORMAL, restore all items
487                                    // if mode is KV_INS, restore items matching ID
488                                    wal_insert(&file->global_txn, file, &cmp_info,
489                                               &wal_doc, doc_offset,
490                                               WAL_INS_WRITER);
491                                }
492                            } else {
493                                wal_insert(&file->global_txn, file, &cmp_info,
494                                           &wal_doc, doc_offset,
495                                           WAL_INS_WRITER);
496                            }
497                            if (doc.key) free(doc.key);
498                        } else {
499                            // snapshot
500                            if (handle->kvs) {
501                                fdb_kvs_id_t kv_id;
502                                buf2kvid(handle->config.chunksize,
503                                         wal_doc.key, &kv_id);
504                                if (kv_id == handle->kvs->id) {
505                                    // snapshot: insert ID matched documents only
506                                    wal_snap_insert(handle->shandle,
507                                                    &wal_doc, doc_offset);
508                                } else {
509                                    free(doc.key);
510                                }
511                            } else {
512                                wal_snap_insert(handle->shandle, &wal_doc,
513                                                doc_offset);
514                            }
515                        }
516                        free(doc.meta);
517                        free(doc.body);
518                        offset = _offset;
519                    } else {
520                        // skip transactional document or system document
521                        free(doc.key);
522                        free(doc.meta);
523                        free(doc.body);
524                        offset = _offset;
525                        // do not break.. read next doc
526                    }
527                } else {
528                    free(doc.key);
529                    free(doc.meta);
530                    free(doc.body);
531                    offset = _offset;
532                    break;
533                }
534            } while (offset + sizeof(struct docio_length) < doc_scan_limit);
535        }
536
537        if (next_doc_block != BLK_NOT_FOUND) {
538            offset = next_doc_block * blocksize;
539            next_doc_block = BLK_NOT_FOUND;
540        } else {
541            offset = ((offset / blocksize) + 1) * blocksize;
542        }
543        if (ver_superblock_support(handle->file->version) &&
544            offset >= filesize) {
545            // circular scan
546            struct superblock *sb = handle->file->sb;
547            if (sb && sb->config) {
548                offset = blocksize * sb->config->num_sb;
549                cur_bmp_revnum++;
550            }
551        }
552    } while(true);
553
554    // wal commit
555    if (!handle->shandle) {
556        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
557        filemgr_mutex_unlock(file);
558    }
559    handle->dhandle->log_callback = log_callback;
560}
561
562INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
563                                          const char *new_filename)
564{
565    fdb_kvs_handle new_db;
566    fdb_config config = handle->config;
567    struct filemgr *new_file;
568
569    // As partially compacted file may contain various errors,
570    // we temporarily disable log callback for compaction recovery.
571    memset(&new_db, 0, sizeof(new_db));
572    new_db.log_callback.callback = NULL;
573    new_db.log_callback.ctx_data = NULL;
574    config.flags |= FDB_OPEN_FLAG_RDONLY;
575    new_db.fhandle = handle->fhandle;
576    new_db.kvs_config = handle->kvs_config;
577    fdb_status status = _fdb_open(&new_db, new_filename,
578                                  FDB_AFILENAME, &config);
579    if (status != FDB_RESULT_SUCCESS) {
580        return fdb_log(&handle->log_callback, status,
581                       "Error in opening a partially compacted file '%s' for recovery.",
582                       new_filename);
583    }
584
585    new_file = new_db.file;
586
587    if (new_file->old_filename &&
588        !strncmp(new_file->old_filename, handle->file->filename,
589                 FDB_MAX_FILENAME_LEN)) {
590        struct filemgr *old_file = handle->file;
591        // If new file has a recorded old_filename then it means that
592        // compaction has completed successfully. Mark self for deletion
593        filemgr_mutex_lock(new_file);
594
595        status = btreeblk_end(handle->bhandle);
596        if (status != FDB_RESULT_SUCCESS) {
597            filemgr_mutex_unlock(new_file);
598            _fdb_close(&new_db);
599            return status;
600        }
601        btreeblk_free(handle->bhandle);
602        free(handle->bhandle);
603        handle->bhandle = new_db.bhandle;
604
605        docio_free(handle->dhandle);
606        free(handle->dhandle);
607        handle->dhandle = new_db.dhandle;
608
609        hbtrie_free(handle->trie);
610        free(handle->trie);
611        handle->trie = new_db.trie;
612
613        wal_shutdown(handle->file, &handle->log_callback);
614        handle->file = new_file;
615
616        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
617            if (handle->kvs) {
618                // multi KV instance mode
619                hbtrie_free(handle->seqtrie);
620                free(handle->seqtrie);
621                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
622                    handle->seqtrie = new_db.seqtrie;
623                }
624            } else {
625                free(handle->seqtree->kv_ops);
626                free(handle->seqtree);
627                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
628                    handle->seqtree = new_db.seqtree;
629                }
630            }
631        }
632        handle->staletree = new_db.staletree;
633
634        filemgr_mutex_unlock(new_file);
635        if (new_db.kvs) {
636            fdb_kvs_info_free(&new_db);
637        }
638        fdb_log(&handle->log_callback, FDB_RESULT_FAIL_BY_COMPACTION,
639                "Successfully used partially compacted file '%s' for recovery replacing old file %s.",
640                new_filename, new_file->old_filename);
641        // remove self: WARNING must not close this handle if snapshots
642        // are yet to open this file
643        filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
644        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
645        free(new_db.filename);
646        return FDB_RESULT_FAIL_BY_COMPACTION;
647    }
648
649    // As the new file is partially compacted, it should be removed upon close.
650    // Just in-case the new file gets opened before removal, point it to the old
651    // file to ensure availability of data.
652    fdb_log(&handle->log_callback, FDB_RESULT_SUCCESS,
653            "Partially compacted file '%s' could not be used for recovery. Using old file %s.",
654                new_filename, handle->file->filename);
655    filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
656    _fdb_close(&new_db);
657
658    return FDB_RESULT_SUCCESS;
659}
660
661#ifndef SPIN_INITIALIZER
662INLINE void init_initial_lock_status() {
663    // Note that only Windows passes through this routine
664    if (!fdb_initialized) {
665        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
666            // atomically initialize spin lock only once
667            spin_init(&initial_lock);
668            initial_lock_status = 2;
669        } else {
670            // the others .. wait until initializing 'initial_lock' is done
671            // TODO: Need to devise a better way of synchronization on Windows
672            while (initial_lock_status != 2) {
673                Sleep(1);
674            }
675        }
676    }
677}
678#endif
679
680LIBFDB_API
681fdb_status fdb_init(fdb_config *config)
682{
683    fdb_config _config;
684    compactor_config c_config;
685    bgflusher_config bgf_config;
686    struct filemgr_config f_config;
687
688    if (config) {
689        if (validate_fdb_config(config)) {
690            _config = *config;
691        } else {
692            return FDB_RESULT_INVALID_CONFIG;
693        }
694    } else {
695        _config = get_default_config();
696    }
697
698    // global initialization
699    // initialized only once at first time
700    if (!fdb_initialized) {
701
702#ifndef SPIN_INITIALIZER
703        init_initial_lock_status();
704#endif
705
706    }
707    spin_lock(&initial_lock);
708    if (!fdb_initialized) {
709#if !defined(_ANDROID_) && !defined(__ANDROID__)
710        // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
711        // We temporarily disable validity checking of block cache size
712        // on Android platform at this time.
713        double ram_size = (double) get_memory_size();
714        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
715            spin_unlock(&initial_lock);
716            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
717        }
718#endif
719        // initialize file manager and block cache
720        f_config.blocksize = _config.blocksize;
721        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
722        f_config.seqtree_opt = _config.seqtree_opt;
723        filemgr_init(&f_config);
724        filemgr_set_lazy_file_deletion(true,
725                                       compactor_register_file_removing,
726                                       compactor_is_file_removed);
727        if (ver_superblock_support(ver_get_latest_magic())) {
728            struct sb_ops sb_ops = {sb_init, sb_get_default_config,
729                                    sb_read_latest, sb_alloc_block,
730                                    sb_bmp_is_writable, sb_get_bmp_revnum,
731                                    sb_get_min_live_revnum, sb_free};
732            filemgr_set_sb_operation(sb_ops);
733            sb_bmp_mask_init();
734        }
735
736        // initialize compaction daemon
737        c_config.sleep_duration = _config.compactor_sleep_duration;
738        c_config.num_threads = _config.num_compactor_threads;
739        compactor_init(&c_config);
740        // initialize background flusher daemon
741        // Temporarily disable background flushers until blockcache contention
742        // issue is resolved.
743        bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
744        bgflusher_init(&bgf_config);
745
746        // Initialize breakpad
747        _dbg_handle_crashes(config->breakpad_minidump_dir);
748
749        fdb_initialized = 1;
750    }
751    spin_unlock(&initial_lock);
752
753    return FDB_RESULT_SUCCESS;
754}
755
756LIBFDB_API
757fdb_config fdb_get_default_config(void) {
758    return get_default_config();
759}
760
761LIBFDB_API
762fdb_kvs_config fdb_get_default_kvs_config(void) {
763    return get_default_kvs_config();
764}
765
766LIBFDB_API
767fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
768                    const char *filename,
769                    fdb_config *fconfig)
770{
771#ifdef _MEMPOOL
772    mempool_init();
773#endif
774
775    fdb_config config;
776    fdb_file_handle *fhandle;
777    fdb_kvs_handle *handle;
778    LATENCY_STAT_START();
779
780    if (fconfig) {
781        if (validate_fdb_config(fconfig)) {
782            config = *fconfig;
783        } else {
784            return FDB_RESULT_INVALID_CONFIG;
785        }
786    } else {
787        config = get_default_config();
788    }
789
790    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
791    if (!fhandle) { // LCOV_EXCL_START
792        return FDB_RESULT_ALLOC_FAIL;
793    } // LCOV_EXCL_STOP
794
795    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
796    if (!handle) { // LCOV_EXCL_START
797        free(fhandle);
798        return FDB_RESULT_ALLOC_FAIL;
799    } // LCOV_EXCL_STOP
800
801#ifndef SPIN_INITIALIZER
802    init_initial_lock_status();
803#endif
804
805    spin_lock(&initial_lock);
806    fdb_open_inprog++;
807    spin_unlock(&initial_lock);
808
809    atomic_init_uint8_t(&handle->handle_busy, 0);
810    handle->shandle = NULL;
811    handle->kvs_config = get_default_kvs_config();
812
813    fdb_status fs = fdb_init(fconfig);
814    if (fs != FDB_RESULT_SUCCESS) {
815        free(handle);
816        free(fhandle);
817        spin_lock(&initial_lock);
818        fdb_open_inprog--;
819        spin_unlock(&initial_lock);
820        return fs;
821    }
822    fdb_file_handle_init(fhandle, handle);
823
824    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
825    if (fs == FDB_RESULT_SUCCESS) {
826        *ptr_fhandle = fhandle;
827        filemgr_fhandle_add(handle->file, fhandle);
828        LATENCY_STAT_END(handle->file, FDB_LATENCY_OPEN);
829    } else {
830        *ptr_fhandle = NULL;
831        free(handle);
832        fdb_file_handle_free(fhandle);
833    }
834    spin_lock(&initial_lock);
835    fdb_open_inprog--;
836    spin_unlock(&initial_lock);
837    return fs;
838}
839
840LIBFDB_API
841fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
842                               const char *filename,
843                               fdb_config *fconfig,
844                               size_t num_functions,
845                               char **kvs_names,
846                               fdb_custom_cmp_variable *functions)
847{
848#ifdef _MEMPOOL
849    mempool_init();
850#endif
851
852    fdb_config config;
853    fdb_file_handle *fhandle;
854    fdb_kvs_handle *handle;
855
856    if (fconfig) {
857        if (validate_fdb_config(fconfig)) {
858            config = *fconfig;
859        } else {
860            return FDB_RESULT_INVALID_CONFIG;
861        }
862    } else {
863        config = get_default_config();
864    }
865
866    if (config.multi_kv_instances == false) {
867        // single KV instance mode does not support customized cmp function
868        return FDB_RESULT_INVALID_CONFIG;
869    }
870
871    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
872    if (!fhandle) { // LCOV_EXCL_START
873        return FDB_RESULT_ALLOC_FAIL;
874    } // LCOV_EXCL_STOP
875
876    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
877    if (!handle) { // LCOV_EXCL_START
878        free(fhandle);
879        return FDB_RESULT_ALLOC_FAIL;
880    } // LCOV_EXCL_STOP
881
882#ifndef SPIN_INITIALIZER
883    init_initial_lock_status();
884#endif
885
886    spin_lock(&initial_lock);
887    fdb_open_inprog++;
888    spin_unlock(&initial_lock);
889
890    atomic_init_uint8_t(&handle->handle_busy, 0);
891    handle->shandle = NULL;
892    handle->kvs_config = get_default_kvs_config();
893
894    fdb_status fs = fdb_init(fconfig);
895    if (fs != FDB_RESULT_SUCCESS) {
896        free(handle);
897        free(fhandle);
898        spin_lock(&initial_lock);
899        fdb_open_inprog--;
900        spin_unlock(&initial_lock);
901        return fs;
902    }
903    fdb_file_handle_init(fhandle, handle);
904
905    // insert kvs_names and functions into fhandle's list
906    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
907                                   kvs_names, functions);
908
909    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
910    if (fs == FDB_RESULT_SUCCESS) {
911        *ptr_fhandle = fhandle;
912        filemgr_fhandle_add(handle->file, fhandle);
913    } else {
914        *ptr_fhandle = NULL;
915        free(handle);
916        fdb_file_handle_free(fhandle);
917    }
918    spin_lock(&initial_lock);
919    fdb_open_inprog--;
920    spin_unlock(&initial_lock);
921    return fs;
922}
923
924fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
925                                  const char *filename,
926                                  fdb_config *fconfig,
927                                  struct list *cmp_func_list)
928{
929#ifdef _MEMPOOL
930    mempool_init();
931#endif
932
933    fdb_file_handle *fhandle;
934    fdb_kvs_handle *handle;
935
936    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
937    if (!fhandle) { // LCOV_EXCL_START
938        return FDB_RESULT_ALLOC_FAIL;
939    } // LCOV_EXCL_STOP
940
941    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
942    if (!handle) { // LCOV_EXCL_START
943        free(fhandle);
944        return FDB_RESULT_ALLOC_FAIL;
945    } // LCOV_EXCL_STOP
946
947    atomic_init_uint8_t(&handle->handle_busy, 0);
948    handle->shandle = NULL;
949
950    fdb_file_handle_init(fhandle, handle);
951    if (cmp_func_list && list_begin(cmp_func_list)) {
952        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
953    }
954    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
955    if (fs == FDB_RESULT_SUCCESS) {
956        *ptr_fhandle = fhandle;
957        filemgr_fhandle_add(handle->file, fhandle);
958    } else {
959        *ptr_fhandle = NULL;
960        free(handle);
961        fdb_file_handle_free(fhandle);
962    }
963    return fs;
964}
965
966LIBFDB_API
967fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
968                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
969{
970#ifdef _MEMPOOL
971    mempool_init();
972#endif
973
974    if (!handle_in || !ptr_handle) {
975        return FDB_RESULT_INVALID_HANDLE;
976    }
977
978    fdb_config config = handle_in->config;
979    fdb_kvs_config kvs_config = handle_in->kvs_config;
980    fdb_kvs_id_t kv_id = 0;
981    fdb_kvs_handle *handle;
982    fdb_txn *txn = NULL;
983    fdb_status fs = FDB_RESULT_SUCCESS;
984    filemgr *file;
985    file_status_t fstatus = FILE_NORMAL;
986    struct snap_handle dummy_shandle;
987    struct _fdb_key_cmp_info cmp_info;
988    LATENCY_STAT_START();
989
990fdb_snapshot_open_start:
991    if (!handle_in->shandle) {
992        fdb_check_file_reopen(handle_in, &fstatus);
993        fdb_sync_db_header(handle_in);
994        file = handle_in->file;
995
996        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
997            handle_in->seqnum = fdb_kvs_get_seqnum(file,
998                                                   handle_in->kvs->id);
999        } else {
1000            handle_in->seqnum = filemgr_get_seqnum(file);
1001        }
1002    } else {
1003        file = handle_in->file;
1004    }
1005
1006    // if the max sequence number seen by this handle is lower than the
1007    // requested snapshot marker, it means the snapshot is not yet visible
1008    // even via the current fdb_kvs_handle
1009    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
1010        return FDB_RESULT_NO_DB_INSTANCE;
1011    }
1012
1013    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1014    if (!handle) { // LCOV_EXCL_START
1015        return FDB_RESULT_ALLOC_FAIL;
1016    } // LCOV_EXCL_STOP
1017
1018    atomic_init_uint8_t(&handle->handle_busy, 0);
1019    handle->log_callback = handle_in->log_callback;
1020    handle->max_seqnum = seqnum;
1021    handle->fhandle = handle_in->fhandle;
1022
1023    config.flags |= FDB_OPEN_FLAG_RDONLY;
1024    // do not perform compaction for snapshot
1025    config.compaction_mode = FDB_COMPACTION_MANUAL;
1026
1027    // If cloning an existing snapshot handle, then rewind indexes
1028    // to its last DB header and point its avl tree to existing snapshot's tree
1029    bool clone_snapshot = false;
1030    if (handle_in->shandle) {
1031        atomic_store_uint64_t(&handle->last_hdr_bid,  // do fast rewind
1032                              atomic_get_uint64_t(&handle_in->last_hdr_bid));
1033        fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1034        if (fs == FDB_RESULT_SUCCESS) {
1035            clone_snapshot = true;
1036            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1037        } else {
1038            fdb_log(&handle_in->log_callback, fs,
1039                    "Warning: Snapshot clone at sequence number %" _F64
1040                    "does not match its snapshot handle %" _F64
1041                    "in file '%s'.", seqnum, handle_in->seqnum,
1042                    handle_in->file->filename);
1043            free(handle);
1044            return fs;
1045        }
1046    }
1047
1048    cmp_info.kvs_config = handle_in->kvs_config;
1049    cmp_info.kvs = handle_in->kvs;
1050
1051    if (!handle->shandle) {
1052        txn = handle_in->fhandle->root->txn;
1053        if (!txn) {
1054            txn = &handle_in->file->global_txn;
1055        }
1056        if (handle_in->kvs) {
1057            kv_id = handle_in->kvs->id;
1058        }
1059        if (seqnum == FDB_SNAPSHOT_INMEM) {
1060            memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1061            // tmp value to denote snapshot & not rollback to _fdb_open
1062            handle->shandle = &dummy_shandle; // dummy
1063        } else {
1064            fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1065                                       &handle->shandle);
1066        }
1067        if (fs != FDB_RESULT_SUCCESS) {
1068            free(handle);
1069            return fs;
1070        }
1071    }
1072
1073    if (handle_in->kvs) {
1074        // sub-handle in multi KV instance mode
1075        if (clone_snapshot) {
1076            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1077        } else {
1078            fs = _fdb_kvs_open(handle_in->kvs->root,
1079                              &config, &kvs_config, file,
1080                              file->filename,
1081                              _fdb_kvs_get_name(handle_in, file),
1082                              handle);
1083        }
1084    } else {
1085        if (clone_snapshot) {
1086            fs = _fdb_clone_snapshot(handle_in, handle);
1087        } else {
1088            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1089        }
1090    }
1091
1092    if (fs == FDB_RESULT_SUCCESS) {
1093        if (seqnum == FDB_SNAPSHOT_INMEM &&
1094            !handle_in->shandle) {
1095            handle->max_seqnum = handle_in->seqnum;
1096
1097            // synchronize dirty root nodes if exist
1098            bid_t dirty_idtree_root = BLK_NOT_FOUND;
1099            bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1100            struct filemgr_dirty_update_node *dirty_update;
1101
1102            dirty_update = filemgr_dirty_update_get_latest(handle->file);
1103            btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1104
1105            if (dirty_update) {
1106                filemgr_dirty_update_get_root(handle->file, dirty_update,
1107                                       &dirty_idtree_root, &dirty_seqtree_root);
1108                _fdb_import_dirty_root(handle, dirty_idtree_root,
1109                                       dirty_seqtree_root);
1110                btreeblk_discard_blocks(handle->bhandle);
1111            }
1112            // Having synced the dirty root, make an in-memory WAL snapshot
1113            // TODO: Re-enable WAL sharing once ready...
1114#ifdef _MVCC_WAL_ENABLE
1115            fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1116                                   &cmp_info, &handle->shandle);
1117#else
1118            fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1119                                       &handle->shandle);
1120            if (fs == FDB_RESULT_SUCCESS) {
1121                fs = wal_copyto_snapshot(file, handle->shandle,
1122                                        (bool)handle_in->kvs);
1123            }
1124            (void)kv_id;
1125#endif // _MVCC_WAL_ENABLE
1126        } else if (clone_snapshot) {
1127            // Snapshot is created on the other snapshot handle
1128
1129            handle->max_seqnum = handle_in->seqnum;
1130
1131            if (seqnum == FDB_SNAPSHOT_INMEM) {
1132                // in-memory snapshot
1133                // Clone dirty root nodes from the source snapshot by incrementing
1134                // their ref counters
1135                handle->trie->root_bid = handle_in->trie->root_bid;
1136                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1137                    if (handle->kvs) {
1138                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1139                    } else {
1140                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1141                    }
1142                }
1143                btreeblk_discard_blocks(handle->bhandle);
1144
1145                // increase ref count for dirty update
1146                struct filemgr_dirty_update_node *dirty_update;
1147                dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1148                filemgr_dirty_update_inc_ref_count(dirty_update);
1149                btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1150            }
1151        }
1152        *ptr_handle = handle;
1153    } else {
1154        *ptr_handle = NULL;
1155        if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1156            wal_snapshot_close(handle->shandle, handle->file);
1157        }
1158        free(handle);
1159        // If compactor thread had finished compaction just before this routine
1160        // calls _fdb_open, then it is possible that the snapshot's DB header
1161        // is only present in the new_file. So we must retry the snapshot
1162        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1163        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1164            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1165                goto fdb_snapshot_open_start;
1166            }
1167        }
1168    }
1169
1170    if (handle_in->shandle) {
1171        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_CLONE);
1172    } else if (seqnum == FDB_SNAPSHOT_INMEM) {
1173        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_INMEM);
1174    } else {
1175        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_DUR);
1176    }
1177    return fs;
1178}
1179
1180static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1181
1182LIBFDB_API
1183fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1184{
1185#ifdef _MEMPOOL
1186    mempool_init();
1187#endif
1188
1189    fdb_config config;
1190    fdb_kvs_handle *handle_in, *handle;
1191    fdb_status fs;
1192    fdb_seqnum_t old_seqnum;
1193
1194    if (!handle_ptr) {
1195        return FDB_RESULT_INVALID_HANDLE;
1196    }
1197
1198    handle_in = *handle_ptr;
1199
1200    if (!handle_in) {
1201        return FDB_RESULT_INVALID_HANDLE;
1202    }
1203
1204    config = handle_in->config;
1205
1206    if (handle_in->kvs) {
1207        return fdb_kvs_rollback(handle_ptr, seqnum);
1208    }
1209
1210    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1211        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1212                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1213                       handle_in->file->filename);
1214    }
1215
1216    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1217        return FDB_RESULT_HANDLE_BUSY;
1218    }
1219
1220    filemgr_mutex_lock(handle_in->file);
1221    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1222    // All transactions should be closed before rollback
1223    if (wal_txn_exists(handle_in->file)) {
1224        filemgr_set_rollback(handle_in->file, 0);
1225        filemgr_mutex_unlock(handle_in->file);
1226        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1227        return FDB_RESULT_FAIL_BY_TRANSACTION;
1228    }
1229
1230    // If compaction is running, wait until it is aborted.
1231    // TODO: Find a better way of waiting for the compaction abortion.
1232    unsigned int sleep_time = 10000; // 10 ms.
1233    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1234    while (fstatus == FILE_COMPACT_OLD) {
1235        filemgr_mutex_unlock(handle_in->file);
1236        decaying_usleep(&sleep_time, 1000000);
1237        filemgr_mutex_lock(handle_in->file);
1238        fstatus = filemgr_get_file_status(handle_in->file);
1239    }
1240    if (fstatus == FILE_REMOVED_PENDING) {
1241        filemgr_mutex_unlock(handle_in->file);
1242        fdb_check_file_reopen(handle_in, NULL);
1243    } else {
1244        filemgr_mutex_unlock(handle_in->file);
1245    }
1246
1247    fdb_sync_db_header(handle_in);
1248
1249    // if the max sequence number seen by this handle is lower than the
1250    // requested snapshot marker, it means the snapshot is not yet visible
1251    // even via the current fdb_kvs_handle
1252    if (seqnum > handle_in->seqnum) {
1253        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1254        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1255        return FDB_RESULT_NO_DB_INSTANCE;
1256    }
1257
1258    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1259    if (!handle) { // LCOV_EXCL_START
1260        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1261        return FDB_RESULT_ALLOC_FAIL;
1262    } // LCOV_EXCL_STOP
1263
1264    atomic_init_uint8_t(&handle->handle_busy, 0);
1265    handle->log_callback = handle_in->log_callback;
1266    handle->fhandle = handle_in->fhandle;
1267    if (seqnum == 0) {
1268        fs = _fdb_reset(handle, handle_in);
1269    } else {
1270        handle->max_seqnum = seqnum;
1271        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1272                       &config);
1273    }
1274
1275    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1276    if (fs == FDB_RESULT_SUCCESS) {
1277        // rollback the file's sequence number
1278        filemgr_mutex_lock(handle_in->file);
1279        old_seqnum = filemgr_get_seqnum(handle_in->file);
1280        filemgr_set_seqnum(handle_in->file, seqnum);
1281        filemgr_mutex_unlock(handle_in->file);
1282
1283        fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1284                !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1285        if (fs == FDB_RESULT_SUCCESS) {
1286            if (handle_in->txn) {
1287                handle->txn = handle_in->txn;
1288                handle_in->txn = NULL;
1289            }
1290            // Close, unlink and free the caller's rollback handle.
1291            _fdb_kvs_close(handle_in);
1292            free(handle_in);
1293            // Link the newly opened handle into the file handle's list
1294            _fdb_kvs_createNLinkKVHandle(handle->fhandle, handle);
1295            handle->max_seqnum = 0;
1296            handle->seqnum = seqnum;
1297            // Set the newly opened rolled-back handle as caller's handle
1298            *handle_ptr = handle;
1299        } else {
1300            // cancel the rolling-back of the sequence number
1301            filemgr_mutex_lock(handle_in->file);
1302            filemgr_set_seqnum(handle_in->file, old_seqnum);
1303            filemgr_mutex_unlock(handle_in->file);
1304            free(handle);
1305            atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1306        }
1307    } else {
1308        free(handle);
1309        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1310    }
1311
1312    return fs;
1313}
1314
1315LIBFDB_API
1316fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1317                            fdb_snapshot_marker_t marker)
1318{
1319#ifdef _MEMPOOL
1320    mempool_init();
1321#endif
1322
1323    fdb_config config;
1324    fdb_kvs_handle *super_handle;
1325    fdb_kvs_handle rhandle;
1326    fdb_kvs_handle *handle = &rhandle;
1327    struct filemgr *file;
1328    fdb_kvs_config kvs_config;
1329    fdb_status fs;
1330    err_log_callback log_callback;
1331    struct kvs_info *kvs;
1332    struct snap_handle shandle; // dummy snap handle
1333
1334    if (!fhandle) {
1335        return FDB_RESULT_INVALID_HANDLE;
1336    }
1337
1338    super_handle = fhandle->root;
1339    kvs = super_handle->kvs;
1340
1341    // fdb_rollback_all cannot be allowed when there are kv store instances
1342    // still open, because we do not have means of invalidating open kv handles
1343    // which may not be present in the rollback point
1344    if (kvs && _fdb_kvs_is_busy(fhandle)) {
1345        return FDB_RESULT_KV_STORE_BUSY;
1346    }
1347    file = super_handle->file;
1348    config = super_handle->config;
1349    kvs_config = super_handle->kvs_config;
1350    log_callback = super_handle->log_callback;
1351
1352    if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1353        return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1354                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1355                       super_handle->file->filename);
1356    }
1357
1358    filemgr_mutex_lock(super_handle->file);
1359    filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1360    // All transactions should be closed before rollback
1361    if (wal_txn_exists(super_handle->file)) {
1362        filemgr_set_rollback(super_handle->file, 0);
1363        filemgr_mutex_unlock(super_handle->file);
1364        return FDB_RESULT_FAIL_BY_TRANSACTION;
1365    }
1366
1367    // If compaction is running, wait until it is aborted.
1368    // TODO: Find a better way of waiting for the compaction abortion.
1369    unsigned int sleep_time = 10000; // 10 ms.
1370    file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1371    while (fstatus == FILE_COMPACT_OLD) {
1372        filemgr_mutex_unlock(super_handle->file);
1373        decaying_usleep(&sleep_time, 1000000);
1374        filemgr_mutex_lock(super_handle->file);
1375        fstatus = filemgr_get_file_status(super_handle->file);
1376    }
1377    if (fstatus == FILE_REMOVED_PENDING) {
1378        filemgr_mutex_unlock(super_handle->file);
1379        fdb_check_file_reopen(super_handle, NULL);
1380    } else {
1381        filemgr_mutex_unlock(super_handle->file);
1382    }
1383
1384    fdb_sync_db_header(super_handle);
1385    // Shutdown WAL discarding entries from all KV Stores..
1386    fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1387    if (fs != FDB_RESULT_SUCCESS) {
1388        return fs;
1389    }
1390
1391    memset(handle, 0, sizeof(fdb_kvs_handle));
1392    memset(&shandle, 0, sizeof(struct snap_handle));
1393    handle->log_callback = log_callback;
1394    handle->fhandle = fhandle;
1395    // Fast rewind on open...
1396    atomic_store_uint64_t(&handle->last_hdr_bid, (bid_t)marker);
1397    handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1398    handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1399    if (kvs) {
1400        fdb_kvs_header_free(file); // KV header will be recreated below.
1401        handle->kvs = kvs; // re-use super_handle's kvs info
1402        handle->kvs_config = kvs_config;
1403    }
1404    handle->config = config;
1405
1406    fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1407
1408    if (handle->config.multi_kv_instances) {
1409        filemgr_mutex_lock(handle->file);
1410        fdb_kvs_header_create(handle->file);
1411        fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1412                            handle->kv_info_offset,
1413                            handle->file->version, false);
1414        filemgr_mutex_unlock(handle->file);
1415    }
1416
1417    filemgr_set_rollback(file, 0); // allow mutations
1418    handle->shandle = NULL; // just a dummy handle never allocated
1419
1420    if (fs == FDB_RESULT_SUCCESS) {
1421        fdb_seqnum_t old_seqnum;
1422        // Restore WAL for all KV instances...
1423        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1424
1425        // rollback the file's sequence number
1426        filemgr_mutex_lock(file);
1427        old_seqnum = filemgr_get_seqnum(file);
1428        filemgr_set_seqnum(file, handle->seqnum);
1429        filemgr_mutex_unlock(file);
1430
1431        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1432                         !(handle->config.durability_opt & FDB_DRB_ASYNC));
1433        if (fs == FDB_RESULT_SUCCESS) {
1434            _fdb_close(super_handle);
1435            *super_handle = *handle;
1436        } else {
1437            filemgr_mutex_lock(file);
1438            filemgr_set_seqnum(file, old_seqnum);
1439            filemgr_mutex_unlock(file);
1440        }
1441    } else { // Rollback failed, restore KV header
1442        fdb_kvs_header_create(file);
1443        fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1444                            super_handle->kv_info_offset,
1445                            ver_get_latest_magic(),
1446                            false);
1447    }
1448
1449    return fs;
1450}
1451
1452static void _fdb_init_file_config(const fdb_config *config,
1453                                  struct filemgr_config *fconfig) {
1454    fconfig->blocksize = config->blocksize;
1455    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1456    fconfig->chunksize = config->chunksize;
1457
1458    fconfig->options = 0x0;
1459    fconfig->seqtree_opt = config->seqtree_opt;
1460
1461    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1462        fconfig->options |= FILEMGR_CREATE;
1463    }
1464    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1465        fconfig->options |= FILEMGR_READONLY;
1466    }
1467    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1468        fconfig->options |= FILEMGR_SYNC;
1469    }
1470
1471    fconfig->flag = 0x0;
1472    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1473        config->buffercache_size) {
1474        fconfig->flag |= _ARCH_O_DIRECT;
1475    }
1476
1477    fconfig->prefetch_duration = config->prefetch_duration;
1478    fconfig->num_wal_shards = config->num_wal_partitions;
1479    fconfig->num_bcache_shards = config->num_bcache_partitions;
1480    fconfig->encryption_key = config->encryption_key;
1481    atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1482                          config->block_reusing_threshold,
1483                          std::memory_order_relaxed);
1484    atomic_store_uint64_t(&fconfig->num_keeping_headers,
1485                          config->num_keeping_headers,
1486                          std::memory_order_relaxed);
1487}
1488
1489fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1490                               fdb_kvs_handle *handle_out)
1491{
1492    fdb_status status;
1493
1494    handle_out->config = handle_in->config;
1495    handle_out->kvs_config = handle_in->kvs_config;
1496    handle_out->fileops = handle_in->fileops;
1497    handle_out->file = handle_in->file;
1498    // Note that the file ref count will be decremented when the cloned snapshot
1499    // is closed through filemgr_close().
1500    filemgr_incr_ref_count(handle_out->file);
1501
1502    bool filename_allocated = false;
1503    if (handle_out->filename) {
1504        handle_out->filename = (char *)realloc(handle_out->filename,
1505                                               strlen(handle_in->filename)+1);
1506    } else {
1507        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1508        filename_allocated = true;
1509    }
1510    strcpy(handle_out->filename, handle_in->filename);
1511
1512    // initialize the docio handle.
1513    handle_out->dhandle = (struct docio_handle *)
1514        calloc(1, sizeof(struct docio_handle));
1515    handle_out->dhandle->log_callback = &handle_out->log_callback;
1516    status = docio_init(handle_out->dhandle, handle_out->file,
1517                        handle_out->config.compress_document_body);
1518    if (status != FDB_RESULT_SUCCESS) {
1519        free(handle_out->dhandle);
1520        if (filename_allocated) {
1521            free(handle_out->filename);
1522        }
1523        return status;
1524    }
1525
1526    // initialize the btree block handle.
1527    handle_out->btreeblkops = btreeblk_get_ops();
1528    handle_out->bhandle = (struct btreeblk_handle *)
1529        calloc(1, sizeof(struct btreeblk_handle));
1530    handle_out->bhandle->log_callback = &handle_out->log_callback;
1531    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1532
1533    handle_out->dirty_updates = handle_in->dirty_updates;
1534    atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1535    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1536    handle_out->kv_info_offset = handle_in->kv_info_offset;
1537    handle_out->op_stats = handle_in->op_stats;
1538
1539    // initialize the trie handle
1540    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1541    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1542                handle_out->file->blocksize,
1543                handle_in->trie->root_bid, // Source snapshot's trie root bid
1544                (void *)handle_out->bhandle, handle_out->btreeblkops,
1545                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1546    // set aux for cmp wrapping function
1547    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1548    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1549
1550    if (handle_out->kvs) {
1551        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1552    }
1553
1554    handle_out->seqnum = handle_in->seqnum;
1555    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1556        if (handle_out->config.multi_kv_instances) {
1557            // multi KV instance mode .. HB+trie
1558            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1559            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1560                        handle_out->file->blocksize,
1561                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1562                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1563                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1564
1565        } else {
1566            // single KV instance mode .. normal B+tree
1567            struct btree_kv_ops *seq_kv_ops =
1568                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1569            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1570            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1571
1572            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1573            // Init the seq tree using the root bid of the source snapshot.
1574            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1575                                handle_out->btreeblkops, seq_kv_ops,
1576                                handle_out->config.blocksize,
1577                                handle_in->seqtree->root_bid);
1578        }
1579    } else{
1580        handle_out->seqtree = NULL;
1581    }
1582
1583    status = btreeblk_end(handle_out->bhandle);
1584    if (status != FDB_RESULT_SUCCESS) {
1585        const char *msg = "Snapshot clone operation fails due to the errors in "
1586            "btreeblk_end() in a database file '%s'\n";
1587        fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1588    }
1589
1590    return status;
1591}
1592
1593fdb_status _fdb_open(fdb_kvs_handle *handle,
1594                     const char *filename,
1595                     fdb_filename_mode_t filename_mode,
1596                     const fdb_config *config)
1597{
1598    struct filemgr_config fconfig;
1599    struct kvs_stat stat, empty_stat;
1600    bid_t trie_root_bid = BLK_NOT_FOUND;
1601    bid_t seq_root_bid = BLK_NOT_FOUND;
1602    bid_t stale_root_bid = BLK_NOT_FOUND;
1603    fdb_seqnum_t seqnum = 0;
1604    filemgr_header_revnum_t header_revnum = 0;
1605    filemgr_header_revnum_t latest_header_revnum = 0;
1606    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1607    uint64_t ndocs = 0;
1608    uint64_t ndeletes = 0;
1609    uint64_t datasize = 0;
1610    uint64_t deltasize = 0;
1611    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1612    uint64_t kv_info_offset = BLK_NOT_FOUND;
1613    uint64_t version;
1614    uint64_t header_flags = 0;
1615    uint8_t header_buf[FDB_BLOCKSIZE];
1616    char *compacted_filename = NULL;
1617    char *prev_filename = NULL;
1618    size_t header_len = 0;
1619    bool multi_kv_instances = config->multi_kv_instances;
1620
1621    uint64_t nlivenodes = 0;
1622    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1623    char actual_filename[FDB_MAX_FILENAME_LEN];
1624    char virtual_filename[FDB_MAX_FILENAME_LEN];
1625    char *target_filename = NULL;
1626    fdb_status status;
1627
1628    if (filename == NULL) {
1629        return FDB_RESULT_INVALID_ARGS;
1630    }
1631    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1632        // filename (including path) length is supported up to
1633        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1634        return FDB_RESULT_TOO_LONG_FILENAME;
1635    }
1636
1637    if (filename_mode == FDB_VFILENAME &&
1638        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1639        return FDB_RESULT_INVALID_COMPACTION_MODE;
1640    }
1641
1642    _fdb_init_file_config(config, &fconfig);
1643
1644    if (filename_mode == FDB_VFILENAME) {
1645        compactor_get_actual_filename(filename, actual_filename,
1646                                      config->compaction_mode, &handle->log_callback);
1647    } else {
1648        strcpy(actual_filename, filename);
1649    }
1650
1651    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1652         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1653          filename_mode == FDB_VFILENAME) ) {
1654        // 1) manual compaction mode, OR
1655        // 2) auto compaction mode + 'filename' is virtual filename
1656        // -> copy 'filename'
1657        target_filename = (char *)filename;
1658    } else {
1659        // otherwise (auto compaction mode + 'filename' is actual filename)
1660        // -> copy 'virtual_filename'
1661        compactor_get_virtual_filename(filename, virtual_filename);
1662        target_filename = virtual_filename;
1663    }
1664
1665    // If the user is requesting legacy CRC pass that down to filemgr
1666    if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1667        fconfig.options |= FILEMGR_CREATE_CRC32;
1668    }
1669
1670    handle->fileops = get_filemgr_ops();
1671    filemgr_open_result result = filemgr_open((char *)actual_filename,
1672                                              handle->fileops,
1673                                              &fconfig, &handle->log_callback);
1674    if (result.rv != FDB_RESULT_SUCCESS) {
1675        return (fdb_status) result.rv;
1676    }
1677    handle->file = result.file;
1678
1679    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1680        strcmp(filename, actual_filename)) {
1681        // It is in-place compacted file if
1682        // 1) compaction mode is manual, and
1683        // 2) actual filename is different to the filename given by user.
1684        // In this case, set the in-place compaction flag.
1685        filemgr_set_in_place_compaction(handle->file, true);
1686    }
1687    if (filemgr_is_in_place_compaction_set(handle->file)) {
1688        // This file was in-place compacted.
1689        // set 'handle->filename' to the original filename to trigger file renaming
1690        compactor_get_virtual_filename(filename, virtual_filename);
1691        target_filename = virtual_filename;
1692    }
1693
1694    if (handle->filename) {
1695        handle->filename = (char *)realloc(handle->filename,
1696                                           strlen(target_filename)+1);
1697    } else {
1698        handle->filename = (char*)malloc(strlen(target_filename)+1);
1699    }
1700    strcpy(handle->filename, target_filename);
1701
1702    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1703    // set handle->last_hdr_bid to the block id of required header, so rewind..
1704    bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
1705    if (handle->shandle && last_hdr_bid) {
1706        status = filemgr_fetch_header(handle->file, last_hdr_bid,
1707                                      header_buf, &header_len, &seqnum,
1708                                      &latest_header_revnum, &deltasize, &version,
1709                                      NULL, &handle->log_callback);
1710        if (status != FDB_RESULT_SUCCESS) {
1711            free(handle->filename);
1712            handle->filename = NULL;
1713            filemgr_close(handle->file, false, handle->filename,
1714                              &handle->log_callback);
1715            return status;
1716        }
1717    } else { // Normal open
1718        filemgr_get_header(handle->file, header_buf, &header_len,
1719                           &last_hdr_bid, &seqnum, &latest_header_revnum);
1720        atomic_store_uint64_t(&handle->last_hdr_bid, last_hdr_bid);
1721        version = handle->file->version;
1722    }
1723
1724    // initialize the docio handle so kv headers may be read
1725    handle->dhandle = (struct docio_handle *)
1726                      calloc(1, sizeof(struct docio_handle));
1727    handle->dhandle->log_callback = &handle->log_callback;
1728    status = docio_init(handle->dhandle, handle->file,
1729                        config->compress_document_body);
1730    if (status != FDB_RESULT_SUCCESS) {
1731        free(handle->dhandle);
1732        free(handle->filename);
1733        handle->filename = NULL;
1734        filemgr_close(handle->file, false, handle->filename,
1735                          &handle->log_callback);
1736        return status;
1737    }
1738
1739    // fetch previous superblock bitmap info if exists
1740    // (this should be done after 'handle->dhandle' is initialized)
1741    if (handle->file->sb) {
1742        status = sb_bmp_fetch_doc(handle);
1743        if (status != FDB_RESULT_SUCCESS) {
1744            docio_free(handle->dhandle);
1745            free(handle->dhandle);
1746            free(handle->filename);
1747            handle->filename = NULL;
1748            filemgr_close(handle->file, false, handle->filename,
1749                              &handle->log_callback);
1750            return status;
1751        }
1752    }
1753
1754
1755    if (header_len > 0) {
1756        fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1757                         &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1758                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1759                         &header_flags, &compacted_filename, &prev_filename);
1760        // use existing setting for seqtree_opt
1761        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1762            seqtree_opt = FDB_SEQTREE_USE;
1763        } else {
1764            seqtree_opt = FDB_SEQTREE_NOT_USE;
1765        }
1766        // Retrieve seqnum for multi-kv mode
1767        if (handle->kvs && handle->kvs->id > 0) {
1768            if (kv_info_offset != BLK_NOT_FOUND) {
1769                if (!filemgr_get_kv_header(handle->file)) {
1770                    struct kvs_header *kv_header;
1771                    _fdb_kvs_header_create(&kv_header);
1772                    // KV header already exists but not loaded .. read & import
1773                    fdb_kvs_header_read(kv_header, handle->dhandle,
1774                                        kv_info_offset, version, false);
1775                    if (!filemgr_set_kv_header(handle->file, kv_header,
1776                                               fdb_kvs_header_free)) {
1777                        _fdb_kvs_header_free(kv_header);
1778                    }
1779                }
1780                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1781                                             handle->kvs->id);
1782            } else { // no kv_info offset, ok to set seqnum to zero
1783                seqnum = 0;
1784            }
1785        }
1786        // other flags
1787        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1788            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1789        }
1790        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1791            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1792        }
1793        if (header_flags & FDB_FLAG_SUCCESSFULLY_COMPACTED) {
1794            filemgr_set_successfully_compacted(handle->file);
1795        }
1796        // use existing setting for multi KV instance mode
1797        if (kv_info_offset == BLK_NOT_FOUND) {
1798            multi_kv_instances = false;
1799        } else {
1800            multi_kv_instances = true;
1801        }
1802    }
1803
1804    handle->config = *config;
1805    handle->config.seqtree_opt = seqtree_opt;
1806    handle->config.multi_kv_instances = multi_kv_instances;
1807
1808    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1809        // Either an in-memory snapshot or cloning from an existing snapshot..
1810        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1811                     // *_open() should have already restored it
1812    } else { // Persisted snapshot or file rollback..
1813
1814        // get the BID of the latest block
1815        // (it is OK if the block is not a DB header)
1816        bool dirty_data_exists = false;
1817        struct superblock *sb = handle->file->sb;
1818
1819        if (sb_bmp_exists(sb)) {
1820            dirty_data_exists = false;
1821            bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1822            if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1823                // add 1 since we subtract 1 from 'hdr_bid' below soon
1824                hdr_bid = sb_last_hdr_bid + 1;
1825                if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1826                    // seq number has been increased since the last commit
1827                    seqnum = fdb_kvs_get_committed_seqnum(handle);
1828                }
1829            } else {
1830                hdr_bid = BLK_NOT_FOUND;
1831            }
1832        } else {
1833            hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1834            dirty_data_exists = (hdr_bid >
1835                        atomic_get_uint64_t(&handle->last_hdr_bid));
1836        }
1837
1838        if (hdr_bid == BLK_NOT_FOUND ||
1839            (sb && hdr_bid <= sb->config->num_sb)) {
1840            hdr_bid = 0;
1841        } else if (hdr_bid > 0) {
1842            --hdr_bid;
1843        }
1844
1845        if (handle->max_seqnum) {
1846            struct kvs_stat stat_ori;
1847            // backup original stats
1848            if (handle->kvs) {
1849                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1850            } else {
1851                _kvs_stat_get(handle->file, 0, &stat_ori);
1852            }
1853
1854            if (dirty_data_exists){
1855                // uncommitted data exists beyond the last DB header
1856                // get the last committed seq number
1857                fdb_seqnum_t seq_commit;
1858                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1859                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1860                    // In case, snapshot_open is attempted with latest uncommitted
1861                    // sequence number
1862                    header_len = 0;
1863                } else if (seq_commit == handle->max_seqnum) {
1864                    // snapshot/rollback on the latest commit header
1865                    seqnum = seq_commit; // skip file reverse scan
1866                }
1867                hdr_bid = filemgr_get_header_bid(handle->file);
1868            }
1869            // Reverse scan the file to locate the DB header with seqnum marker
1870            header_revnum = latest_header_revnum;
1871            while (header_len && seqnum != handle->max_seqnum) {
1872                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1873                                          header_buf, &header_len, &seqnum,
1874                                          &header_revnum, NULL, &version, NULL,
1875                                          &handle->log_callback);
1876                if (header_len == 0) {
1877                    continue; // header doesn't exist
1878                }
1879                fdb_fetch_header(version, header_buf, &trie_root_bid,
1880                                 &seq_root_bid, &stale_root_bid,
1881                                 &ndocs, &ndeletes, &nlivenodes,
1882                                 &datasize, &last_wal_flush_hdr_bid,
1883                                 &kv_info_offset, &header_flags,
1884                                 &compacted_filename, NULL);
1885                atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
1886
1887                if (!handle->kvs || handle->kvs->id == 0) {
1888                    // single KVS mode OR default KVS
1889                    if (!handle->shandle) {
1890                        // rollback
1891                        struct kvs_stat stat_dst;
1892                        _kvs_stat_get(handle->file, 0, &stat_dst);
1893                        stat_dst.ndocs = ndocs;
1894                        stat_dst.ndeletes = ndeletes;
1895                        stat_dst.datasize = datasize;
1896                        stat_dst.nlivenodes = nlivenodes;
1897                        stat_dst.deltasize = deltasize;
1898                        _kvs_stat_set(handle->file, 0, stat_dst);
1899                    }
1900                    continue;
1901                }
1902
1903                int64_t doc_offset;
1904                struct kvs_header *kv_header;
1905                struct docio_object doc;
1906
1907                _fdb_kvs_header_create(&kv_header);
1908                memset(&doc, 0, sizeof(struct docio_object));
1909                doc_offset = docio_read_doc(handle->dhandle,
1910                                            kv_info_offset, &doc, true);
1911
1912                if (doc_offset <= 0) {
1913                    header_len = 0; // fail
1914                    _fdb_kvs_header_free(kv_header);
1915                } else {
1916                    _fdb_kvs_header_import(kv_header, doc.body,
1917                                           doc.length.bodylen, version, false);
1918                    // get local sequence number for the KV instance
1919                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1920                                                 handle->kvs->id);
1921                    if (!handle->shandle) {
1922                        // rollback: replace kv_header stats
1923                        // read from the current header's kv_header
1924                        struct kvs_stat stat_src, stat_dst;
1925                        _kvs_stat_get_kv_header(kv_header,
1926                                                handle->kvs->id,
1927                                                &stat_src);
1928                        _kvs_stat_get(handle->file,
1929                                      handle->kvs->id,
1930                                      &stat_dst);
1931                        // update ndocs, datasize, nlivenodes
1932                        // into the current file's kv_header
1933                        // Note: stats related to WAL should not be updated
1934                        //       at this time. They will be adjusted through
1935                        //       discard & restore routines below.
1936                        stat_dst.ndocs = stat_src.ndocs;
1937                        stat_dst.datasize = stat_src.datasize;
1938                        stat_dst.nlivenodes = stat_src.nlivenodes;
1939                        _kvs_stat_set(handle->file,
1940                                      handle->kvs->id,
1941                                      stat_dst);
1942                    }
1943                    _fdb_kvs_header_free(kv_header);
1944                    free_docio_object(&doc, 1, 1, 1);
1945                }
1946            }
1947
1948            if (header_len && // header exists
1949                config->block_reusing_threshold > 0 && // block reuse is enabled
1950                config->block_reusing_threshold < 100 &&
1951                header_revnum < sb_get_min_live_revnum(handle->file)) {
1952                // cannot perform rollback/snapshot beyond the last live header
1953                header_len = 0;
1954            }
1955
1956            if (!header_len) { // Marker MUST match that of DB commit!
1957                // rollback original stats
1958                if (handle->kvs) {
1959                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1960                } else {
1961                    _kvs_stat_get(handle->file, 0, &stat_ori);
1962                }
1963
1964                docio_free(handle->dhandle);
1965                free(handle->dhandle);
1966                free(handle->filename);
1967                free(prev_filename);
1968                handle->filename = NULL;
1969                filemgr_close(handle->file, false, handle->filename,
1970                              &handle->log_callback);
1971                return FDB_RESULT_NO_DB_INSTANCE;
1972            }
1973
1974            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1975                if (handle->config.multi_kv_instances) {
1976                    // multi KV instance mode
1977                    // clear only WAL items belonging to the instance
1978                    wal_close_kv_ins(handle->file,
1979                                     (handle->kvs)?(handle->kvs->id):(0),
1980                                     &handle->log_callback);
1981                } else {
1982                    wal_shutdown(handle->file, &handle->log_callback);
1983                }
1984            }
1985        } else { // snapshot to sequence number 0 requested..
1986            if (handle->shandle) { // fdb_snapshot_open API call
1987                if (seqnum) {
1988                    // Database currently has a non-zero seq number,
1989                    // but the snapshot was requested with a seq number zero.
1990                    docio_free(handle->dhandle);
1991                    free(handle->dhandle);
1992                    free(handle->filename);
1993                    free(prev_filename);
1994                    handle->filename = NULL;
1995                    filemgr_close(handle->file, false, handle->filename,
1996                                  &handle->log_callback);
1997                    return FDB_RESULT_NO_DB_INSTANCE;
1998                }
1999            } // end of zero max_seqnum but non-rollback check
2000        } // end of zero max_seqnum check
2001    } // end of durable snapshot locating
2002
2003    handle->btreeblkops = btreeblk_get_ops();
2004    handle->bhandle = (struct btreeblk_handle *)
2005                      calloc(1, sizeof(struct btreeblk_handle));
2006    handle->bhandle->log_callback = &handle->log_callback;
2007
2008    handle->dirty_updates = 0;
2009
2010    if (handle->config.compaction_buf_maxsize == 0) {
2011        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
2012    }
2013
2014    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
2015
2016    handle->cur_header_revnum = latest_header_revnum;
2017    if (header_revnum) {
2018        if (filemgr_is_rollback_on(handle->file)) {
2019            // rollback mode
2020            // set rollback header revnum
2021            handle->rollback_revnum = header_revnum;
2022        } else {
2023            // snapshot mode (only for snapshot)
2024            handle->cur_header_revnum = header_revnum;
2025        }
2026    }
2027    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
2028
2029    memset(&empty_stat, 0x0, sizeof(empty_stat));
2030    _kvs_stat_get(handle->file, 0, &stat);
2031    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
2032        // sync (default) KVS stat with DB header
2033        stat.nlivenodes = nlivenodes;
2034        stat.ndocs = ndocs;
2035        stat.datasize = datasize;
2036        _kvs_stat_set(handle->file, 0, stat);
2037    }
2038
2039    handle->kv_info_offset = kv_info_offset;
2040    if (handle->config.multi_kv_instances && !handle->shandle) {
2041        // multi KV instance mode
2042        filemgr_mutex_lock(handle->file);
2043        if (kv_info_offset == BLK_NOT_FOUND) {
2044            // there is no KV header .. create & initialize
2045            fdb_kvs_header_create(handle->file);
2046            // TODO: If another handle is opened before the first header is appended,
2047            // an unnecessary KV info doc is appended. We need to address it.
2048            kv_info_offset = fdb_kvs_header_append(handle);
2049        } else if (handle->file->kv_header == NULL) {
2050            // KV header already exists but not loaded .. read & import
2051            fdb_kvs_header_create(handle->file);
2052            fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
2053                                kv_info_offset, version, false);
2054        }
2055        filemgr_mutex_unlock(handle->file);
2056
2057        // validation check for key order of all KV stores
2058        if (handle == handle->fhandle->root) {
2059            fdb_status fs = fdb_kvs_cmp_check(handle);
2060            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
2061                docio_free(handle->dhandle);
2062                free(handle->dhandle);
2063                btreeblk_free(handle->bhandle);
2064                free(handle->bhandle);
2065                free(handle->filename);
2066                handle->filename = NULL;
2067                filemgr_close(handle->file, false, handle->filename,
2068                              &handle->log_callback);
2069                return fs;
2070            }
2071        }
2072    }
2073    handle->kv_info_offset = kv_info_offset;
2074
2075    if (handle->kv_info_offset != BLK_NOT_FOUND &&
2076        handle->kvs == NULL) {
2077        // multi KV instance mode .. turn on config flag
2078        handle->config.multi_kv_instances = true;
2079        // only super handle can be opened using fdb_open(...)
2080        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2081    }
2082
2083    if (handle->shandle) { // Populate snapshot stats..
2084        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2085            memset(&handle->shandle->stat, 0x0,
2086                    sizeof(handle->shandle->stat));
2087            handle->shandle->stat.ndocs = ndocs;
2088            handle->shandle->stat.datasize = datasize;
2089            handle->shandle->stat.nlivenodes = nlivenodes;
2090        } else { // Multi KV instance mode, populate specific kv stats
2091            memset(&handle->shandle->stat, 0x0,
2092                    sizeof(handle->shandle->stat));
2093            _kvs_stat_get(handle->file, handle->kvs->id,
2094                    &handle->shandle->stat);
2095            // Since wal is restored below, we have to reset
2096            // wal stats to zero.
2097            handle->shandle->stat.wal_ndeletes = 0;
2098            handle->shandle->stat.wal_ndocs = 0;
2099        }
2100    }
2101
2102    // initialize pointer to the global operational stats of this KV store
2103    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2104    if (!handle->op_stats) {
2105        const char *msg = "Database open fails due to the error in retrieving "
2106            "the global operational stats of KV store in a database file '%s'\n";
2107        fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2108                handle->file->filename);
2109        return FDB_RESULT_OPEN_FAIL;
2110    }
2111
2112    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2113    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2114                handle->file->blocksize, trie_root_bid,
2115                (void *)handle->bhandle, handle->btreeblkops,
2116                (void *)handle->dhandle, _fdb_readkey_wrap);
2117    // set aux for cmp wrapping function
2118    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2119    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2120
2121    if (handle->kvs) {
2122        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2123    }
2124
2125    handle->seqnum = seqnum;
2126    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2127        if (handle->config.multi_kv_instances) {
2128            // multi KV instance mode .. HB+trie
2129            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2130            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2131                        handle->file->blocksize, seq_root_bid,
2132                        (void *)handle->bhandle, handle->btreeblkops,
2133                        (void *)handle->dhandle, _fdb_readseq_wrap);
2134
2135        } else {
2136            // single KV instance mode .. normal B+tree
2137            struct btree_kv_ops *seq_kv_ops =
2138                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2139            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2140            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2141
2142            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2143            if (seq_root_bid == BLK_NOT_FOUND) {
2144                btree_init(handle->seqtree, (void *)handle->bhandle,
2145                           handle->btreeblkops, seq_kv_ops,
2146                           handle->config.blocksize, sizeof(fdb_seqnum_t),
2147                           OFFSET_SIZE, 0x0, NULL);
2148            }else{
2149                btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2150                                    handle->btreeblkops, seq_kv_ops,
2151                                    handle->config.blocksize, seq_root_bid);
2152            }
2153        }
2154    }else{
2155        handle->seqtree = NULL;
2156    }
2157
2158    // Stale-block tree (supported since MAGIC_002)
2159    // this tree is independent to multi/single KVS mode option
2160    if (ver_staletree_support(handle->file->version)) {
2161        // normal B+tree
2162        struct btree_kv_ops *stale_kv_ops =
2163            (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2164        stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2165        stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2166
2167        handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2168        if (stale_root_bid == BLK_NOT_FOUND) {
2169            btree_init(handle->staletree, (void *)handle->bhandle,
2170                       handle->btreeblkops, stale_kv_ops,
2171                       handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2172                       OFFSET_SIZE, 0x0, NULL);
2173         }else{
2174            btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2175                                handle->btreeblkops, stale_kv_ops,
2176                                handle->config.blocksize, stale_root_bid);
2177            // prefetch stale info into memory
2178            fdb_load_inmem_stale_info(handle);
2179         }
2180    } else {
2181        handle->staletree = NULL;
2182    }
2183
2184    if (handle->config.multi_kv_instances && handle->max_seqnum) {
2185        // restore only docs belonging to the KV instance
2186        // handle->kvs should not be NULL
2187        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2188                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2189    } else {
2190        // normal restore
2191        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2192    }
2193
2194    if (compacted_filename &&
2195        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2196        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2197        status = _fdb_recover_compaction(handle, compacted_filename);
2198        if (status == FDB_RESULT_FAIL_BY_COMPACTION) {
2199            // recovery would have unlinked the previous file
2200            free(prev_filename);
2201            prev_filename = NULL;
2202        }
2203        // Either
2204        // 1. recovered the newly compacted file and deleted the old file or
2205        // 2. recovery failed and are going to stick to the old file or
2206        // In both cases, the old_filename and new_filename are not needed.
2207        if (handle->file){
2208            handle->file->old_filename =  NULL;
2209            handle->file->new_filename =  NULL;
2210        }
2211    }
2212
2213    if (prev_filename) {
2214        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2215            // record the old filename into the file handle of current file
2216            // and REMOVE old file on the first open
2217            // WARNING: snapshots must have been opened before this call
2218            if (filemgr_update_file_linkage(handle->file, prev_filename, NULL)) {
2219                // Open the old file with read-only mode.
2220                // (Temporarily disable log callback at this time since
2221                //  the old file might be already removed.)
2222                err_log_callback dummy_cb;
2223                dummy_cb.callback = fdb_dummy_log_callback;
2224                dummy_cb.ctx_data = NULL;
2225                fconfig.options = FILEMGR_READONLY;
2226                filemgr_open_result result = filemgr_open(prev_filename,
2227                                                          handle->fileops,
2228                                                          &fconfig,
2229                                                          &dummy_cb);
2230                if (result.file) {
2231                    filemgr_remove_pending(result.file, handle->file,
2232                                           &handle->log_callback);
2233                    filemgr_close(result.file, 0, handle->filename,
2234                                  &handle->log_callback);
2235                }
2236            }
2237        }
2238        // we allocated a memory region for file->old_filename and
2239        // prev_filename would be copied to there,
2240        // so it is OK to free it here whatever the result is.
2241        free(prev_filename);
2242    }
2243
2244    status = btreeblk_end(handle->bhandle);
2245    if (status != FDB_RESULT_SUCCESS) {
2246        // When fdb_kvs_open() is being issued in parallel with fdb_open()
2247        // it is possible that this call (fdb_open()) hits a write failure
2248        // because the btreeblock to be written was already made immutable
2249        // by the commit from the fdb_kvs_open(). Simpy ignore this error case.
2250        if (status == FDB_RESULT_WRITE_FAIL) {
2251            if (filemgr_get_header_revnum(handle->file)
2252                                             == latest_header_revnum) {
2253                return status;
2254            } else {
2255                status = FDB_RESULT_SUCCESS;
2256            }
2257        } else {
2258            return status;
2259        }
2260    }
2261
2262    // do not register read-only handles
2263    if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2264        if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2265            status = compactor_register_file(handle->file,
2266                                             (fdb_config *)config,
2267                                             &handle->log_callback);
2268        }
2269        if (status == FDB_RESULT_SUCCESS) {
2270            status = bgflusher_register_file(handle->file,
2271                                             (fdb_config *)config,
2272                                             &handle->log_callback);
2273        }
2274    }
2275
2276    return status;
2277}
2278
2279LIBFDB_API
2280fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2281                                fdb_log_callback log_callback,
2282                                void *ctx_data)
2283{
2284    if (!handle) {
2285        return FDB_RESULT_INVALID_HANDLE;
2286    }
2287
2288    handle->log_callback.callback = log_callback;
2289    handle->log_callback.ctx_data = ctx_data;
2290    return FDB_RESULT_SUCCESS;
2291}
2292
2293LIBFDB_API
2294void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
2295{
2296    fatal_error_callback = err_callback;
2297}
2298
2299LIBFDB_API
2300fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2301                          const void *meta, size_t metalen,
2302                          const void *body, size_t bodylen)
2303{
2304    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2305        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2306        return FDB_RESULT_INVALID_ARGS;
2307    }
2308
2309    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2310    if (*doc == NULL) { // LCOV_EXCL_START
2311        return FDB_RESULT_ALLOC_FAIL;
2312    } // LCOV_EXCL_STOP
2313
2314    (*doc)->seqnum = SEQNUM_NOT_USED;
2315
2316    if (key && keylen > 0) {
2317        (*doc)->key = (void *)malloc(keylen);
2318        if ((*doc)->key == NULL) { // LCOV_EXCL_START
2319            return FDB_RESULT_ALLOC_FAIL;
2320        } // LCOV_EXCL_STOP
2321        memcpy((*doc)->key, key, keylen);
2322        (*doc)->keylen = keylen;
2323    } else {
2324        (*doc)->key = NULL;
2325        (*doc)->keylen = 0;
2326    }
2327
2328    if (meta && metalen > 0) {
2329        (*doc)->meta = (void *)malloc(metalen);
2330        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2331            return FDB_RESULT_ALLOC_FAIL;
2332        } // LCOV_EXCL_STOP
2333        memcpy((*doc)->meta, meta, metalen);
2334        (*doc)->metalen = metalen;
2335    } else {
2336        (*doc)->meta = NULL;
2337        (*doc)->metalen = 0;
2338    }
2339
2340    if (body && bodylen > 0) {
2341        (*doc)->body = (void *)malloc(bodylen);
2342        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2343            return FDB_RESULT_ALLOC_FAIL;
2344        } // LCOV_EXCL_STOP
2345        memcpy((*doc)->body, body, bodylen);
2346        (*doc)->bodylen = bodylen;
2347    } else {
2348        (*doc)->body = NULL;
2349        (*doc)->bodylen = 0;
2350    }
2351
2352    return FDB_RESULT_SUCCESS;
2353}
2354
2355LIBFDB_API
2356fdb_status fdb_doc_update(fdb_doc **doc,
2357                          const void *meta, size_t metalen,
2358                          const void *body, size_t bodylen)
2359{
2360    if (doc == NULL ||
2361        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2362        return FDB_RESULT_INVALID_ARGS;
2363    }
2364    if (*doc == NULL) {
2365        return FDB_RESULT_INVALID_ARGS;
2366    }
2367
2368    if (meta && metalen > 0) {
2369        // free previous metadata
2370        free((*doc)->meta);
2371        // allocate new metadata
2372        (*doc)->meta = (void *)malloc(metalen);
2373        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2374            return FDB_RESULT_ALLOC_FAIL;
2375        } // LCOV_EXCL_STOP
2376        memcpy((*doc)->meta, meta, metalen);
2377        (*doc)->metalen = metalen;
2378    }
2379
2380    if (body && bodylen > 0) {
2381        // free previous body
2382        free((*doc)->body);
2383        // allocate new body
2384        (*doc)->body = (void *)malloc(bodylen);
2385        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2386            return FDB_RESULT_ALLOC_FAIL;
2387        } // LCOV_EXCL_STOP
2388        memcpy((*doc)->body, body, bodylen);
2389        (*doc)->bodylen = bodylen;
2390    }
2391
2392    (*doc)->seqnum = SEQNUM_NOT_USED;
2393    return FDB_RESULT_SUCCESS;
2394}
2395
2396LIBFDB_API
2397void fdb_doc_set_seqnum(fdb_doc *doc,
2398                        const fdb_seqnum_t seqnum)
2399{
2400    if (doc) {
2401        doc->seqnum = seqnum;
2402        if (seqnum != SEQNUM_NOT_USED) {
2403            doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2404        } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2405            doc->flags &= ~FDB_CUSTOM_SEQNUM;
2406        }
2407    }
2408}
2409
2410// doc MUST BE allocated by malloc
2411LIBFDB_API
2412fdb_status fdb_doc_free(fdb_doc *doc)
2413{
2414    if (doc) {
2415        free(doc->key);
2416        free(doc->meta);
2417        free(doc->body);
2418        free(doc);
2419    }
2420    return FDB_RESULT_SUCCESS;
2421}
2422
2423INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
2424                                        struct wal_item *item)
2425{
2426    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2427    uint64_t old_offset = 0;
2428
2429    if (item->action == WAL_ACT_REMOVE) {
2430        // For immediate remove, old_offset value is critical
2431        // so that we should get an exact value.
2432        hbtrie_find(handle->trie,
2433                    item->header->key,
2434                    item->header->keylen,
2435                    (void*)&old_offset);
2436    } else {
2437        hbtrie_find_offset(handle->trie,
2438                           item->header->key,
2439                           item->header->keylen,
2440                           (void*)&old_offset);
2441    }
2442    btreeblk_end(handle->bhandle);
2443    old_offset = _endian_decode(old_offset);
2444
2445    return old_offset;
2446}
2447
2448// A stale sequence number entry that can be purged from the sequence tree
2449// during the WAL flush.
2450struct wal_stale_seq_entry {
2451    fdb_kvs_id_t kv_id;
2452    fdb_seqnum_t seqnum;
2453    struct avl_node avl_entry;
2454};
2455
2456// Delta changes in KV store stats during the WAL flush
2457struct wal_kvs_delta_stat {
2458    fdb_kvs_id_t kv_id;
2459    int64_t nlivenodes;
2460    int64_t ndocs;
2461    int64_t ndeletes;
2462    int64_t datasize;
2463    int64_t deltasize;
2464    struct avl_node avl_entry;
2465};
2466
2467INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2468{
2469    (void) aux;
2470    struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2471                                                    avl_entry);
2472    struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2473                                                    avl_entry);
2474    if (entry1->kv_id < entry2->kv_id) {
2475        return -1;
2476    } else if (entry1->kv_id > entry2->kv_id) {
2477        return 1;
2478    } else {
2479        return _CMP_U64(entry1->seqnum, entry2->seqnum);
2480    }
2481}
2482
2483
2484// Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
2485INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2486{
2487    (void) aux;
2488    struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2489                                                  avl_entry);
2490    struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2491                                                  avl_entry);
2492    if (stat1->kv_id < stat2->kv_id) {
2493        return -1;
2494    } else if (stat1->kv_id > stat2->kv_id) {
2495        return 1;
2496    } else {
2497        return 0;
2498    }
2499}
2500
2501INLINE void _fdb_wal_flush_seq_purge(void *dbhandle,
2502                                     struct avl_tree *stale_seqnum_list,
2503                                     struct avl_tree *kvs_delta_stats)
2504{
2505    fdb_seqnum_t _seqnum;
2506    int64_t nlivenodes;
2507    int64_t ndeltanodes;
2508    int64_t delta;
2509    uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2510    struct wal_stale_seq_entry *seq_entry;
2511    struct wal_kvs_delta_stat *delta_stat;
2512    struct wal_kvs_delta_stat kvs_delta_query;
2513
2514    fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2515    struct avl_node *node = avl_first(stale_seqnum_list);
2516    while (node) {
2517        seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2518        node = avl_next(node);
2519        nlivenodes = handle->bhandle->nlivenodes;
2520        ndeltanodes = handle->bhandle->ndeltanodes;
2521        _seqnum = _endian_encode(seq_entry->seqnum);
2522        if (handle->kvs) {
2523            // multi KV instance mode .. HB+trie
2524            kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2525            memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2526            hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2527                          sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t));
2528        } else {
2529            btree_remove(handle->seqtree, (void*)&_seqnum);
2530        }
2531        btreeblk_end(handle->bhandle);
2532
2533        kvs_delta_query.kv_id = seq_entry->kv_id;
2534        avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2535                                               &kvs_delta_query.avl_entry,
2536                                               _kvs_delta_stat_cmp);
2537        if (delta_stat_node) {
2538            delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2539                                    avl_entry);
2540            delta = handle->bhandle->nlivenodes - nlivenodes;
2541            delta_stat->nlivenodes += delta;
2542            delta = handle->bhandle->ndeltanodes - ndeltanodes;
2543            delta *= handle->config.blocksize;
2544            delta_stat->deltasize += delta;
2545        }
2546        avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2547        free(seq_entry);
2548    }
2549}
2550
2551INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2552                                           struct avl_tree *kvs_delta_stats)
2553{
2554    struct avl_node *node;
2555    struct wal_kvs_delta_stat *delta_stat;
2556    node = avl_first(kvs_delta_stats);
2557    while (node) {
2558        delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2559        node = avl_next(node);
2560        _kvs_stat_update_attr(file, delta_stat->kv_id,
2561                              KVS_STAT_DATASIZE, delta_stat->datasize);
2562        _kvs_stat_update_attr(file, delta_stat->kv_id,
2563                              KVS_STAT_NDOCS, delta_stat->ndocs);
2564        _kvs_stat_update_attr(file, delta_stat->kv_id,
2565                              KVS_STAT_NDELETES, delta_stat->ndeletes);
2566        _kvs_stat_update_attr(file, delta_stat->kv_id,
2567                              KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2568        _kvs_stat_update_attr(file, delta_stat->kv_id,
2569                              KVS_STAT_DELTASIZE, delta_stat->deltasize);
2570        avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2571        free(delta_stat);
2572    }
2573}
2574
2575INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2576                                      struct wal_item *item,
2577                                      struct avl_tree *stale_seqnum_list,
2578                                      struct avl_tree *kvs_delta_stats)
2579{
2580    hbtrie_result hr;
2581    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2582    fdb_seqnum_t _seqnum;
2583    fdb_kvs_id_t kv_id = 0;
2584    fdb_status fs = FDB_RESULT_SUCCESS;
2585    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2586    int size_id, size_seq;
2587    uint8_t *kvid_seqnum;
2588    uint64_t old_offset;
2589    int64_t _offset;
2590    int64_t delta;
2591    struct docio_object _doc;
2592    struct filemgr *file = handle->dhandle->file;
2593
2594    memset(var_key, 0, handle->config.chunksize);
2595    if (handle->kvs) {
2596        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2597    } else {
2598        kv_id = 0;
2599    }
2600
2601    struct wal_kvs_delta_stat *kvs_delta_stat;
2602    struct wal_kvs_delta_stat kvs_delta_query;
2603    kvs_delta_query.kv_id = kv_id;
2604    avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2605                                           &kvs_delta_query.avl_entry,
2606                                           _kvs_delta_stat_cmp);
2607    if (delta_stat_node) {
2608        kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2609                                    avl_entry);
2610    } else {
2611        kvs_delta_stat = (struct wal_kvs_delta_stat *)
2612            calloc(1, sizeof(struct wal_kvs_delta_stat));
2613        kvs_delta_stat->kv_id = kv_id;
2614        avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2615                   _kvs_delta_stat_cmp);
2616    }
2617
2618    int64_t nlivenodes = handle->bhandle->nlivenodes;
2619    int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2620
2621    if (item->action == WAL_ACT_INSERT ||
2622        item->action == WAL_ACT_LOGICAL_REMOVE) {
2623        _offset = _endian_encode(item->offset);
2624
2625        hbtrie_insert(handle->trie,
2626                      item->header->key,
2627                      item->header->keylen,
2628                      (void *)&_offset,
2629                      (void *)&old_offset);
2630
2631        fs = btreeblk_end(handle->bhandle);
2632        if (fs != FDB_RESULT_SUCCESS) {
2633            return fs;
2634        }
2635        old_offset = _endian_decode(old_offset);
2636
2637        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2638            _seqnum = _endian_encode(item->seqnum);
2639            if (handle->kvs) {
2640                // multi KV instance mode .. HB+trie
2641                uint64_t old_offset_local;
2642
2643                size_id = sizeof(fdb_kvs_id_t);
2644                size_seq = sizeof(fdb_seqnum_t);
2645                kvid_seqnum = alca(uint8_t, size_id + size_seq);
2646                kvid2buf(size_id, kv_id, kvid_seqnum);
2647                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2648                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2649                              (void *)&_offset, (void *)&old_offset_local);
2650            } else {
2651                btree_insert(handle->seqtree, (void *)&_seqnum,
2652                             (void *)&_offset);
2653            }
2654            fs = btreeblk_end(handle->bhandle);
2655            if (fs != FDB_RESULT_SUCCESS) {
2656                return fs;
2657            }
2658        }
2659
2660        delta = handle->bhandle->nlivenodes - nlivenodes;
2661        kvs_delta_stat->nlivenodes += delta;
2662        delta = handle->bhandle->ndeltanodes - ndeltanodes;
2663        delta *= handle->config.blocksize;
2664        kvs_delta_stat->deltasize += delta;
2665
2666        if (old_offset == BLK_NOT_FOUND) {
2667            if (item->action == WAL_ACT_INSERT) {
2668                ++kvs_delta_stat->ndocs;
2669            } else { // inserted a logical deleted doc into main index
2670                ++kvs_delta_stat->ndeletes;
2671            }
2672            kvs_delta_stat->datasize += item->doc_size;
2673            kvs_delta_stat->deltasize += item->doc_size;
2674        } else { // update or logical delete
2675            // This block is already cached when we call HBTRIE_INSERT.
2676            // No additional block access.
2677            char dummy_key[FDB_MAX_KEYLEN];
2678            _doc.meta = _doc.body = NULL;
2679            _doc.key = &dummy_key;
2680            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2681                                              &_doc, true);
2682            if (_offset < 0) {
2683                return (fdb_status) _offset;
2684            } else if (_offset == 0) {
2685                // Note that this is not an error as old_offset is pointing to
2686                // the zero-filled region in a document block.
2687                return FDB_RESULT_KEY_NOT_FOUND;
2688            }
2689            free(_doc.meta);
2690            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2691
2692            if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2693                if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2694                    --kvs_delta_stat->ndocs;
2695                    ++kvs_delta_stat->ndeletes;
2696                } // else no change (prev doc was insert, now just an update)
2697            } else { // prev doc in main index was a logically deleted doc
2698                if (item->action == WAL_ACT_INSERT) { // now undeleted
2699                    ++kvs_delta_stat->ndocs;
2700                    --kvs_delta_stat->ndeletes;
2701                } // else no change (prev doc was deleted, now re-deleted)
2702            }
2703
2704            delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2705            kvs_delta_stat->datasize += delta;
2706            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2707            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2708                kvs_delta_stat->deltasize += delta;
2709            } else {
2710                kvs_delta_stat->deltasize += (int)item->doc_size;
2711            }
2712
2713            // Avoid duplicates (remove previous sequence number)
2714            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2715                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2716                    calloc(1, sizeof(struct wal_stale_seq_entry));
2717                entry->kv_id = kv_id;
2718                entry->seqnum = _doc.seqnum;
2719                avl_insert(stale_seqnum_list, &entry->avl_entry,
2720                           _fdb_seq_entry_cmp);
2721            }
2722        }
2723    } else {
2724        // Immediate remove
2725        old_offset = item->old_offset;
2726        hr = hbtrie_remove(handle->trie, item->header->key,
2727                           item->header->keylen);
2728        fs = btreeblk_end(handle->bhandle);
2729        if (fs != FDB_RESULT_SUCCESS) {
2730            return fs;
2731        }
2732
2733        if (hr == HBTRIE_RESULT_SUCCESS) {
2734            // This block is already cached when we call _fdb_wal_get_old_offset
2735            // No additional block access should be done.
2736            char dummy_key[FDB_MAX_KEYLEN];
2737            _doc.meta = _doc.body = NULL;
2738            _doc.key = &dummy_key;
2739            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2740                                              &_doc, true);
2741            if (_offset < 0) {
2742                return (fdb_status) _offset;
2743            } else if (_offset == 0) {
2744                return FDB_RESULT_KEY_NOT_FOUND;
2745            }
2746            free(_doc.meta);
2747            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2748
2749            // Reduce the total number of docs by one
2750            --kvs_delta_stat->ndocs;
2751            if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2752                --kvs_delta_stat->ndeletes;
2753            }
2754
2755            // Reduce the total datasize by size of previously present doc
2756            delta = -(int)_fdb_get_docsize(_doc.length);
2757            kvs_delta_stat->datasize += delta;
2758            // if multiple wal flushes happen before commit, then it's possible
2759            // that this doc deleted was inserted & flushed after last commit
2760            // In this case we need to update the deltasize too which tracks
2761            // the amount of new data inserted between commits.
2762            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2763            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2764                kvs_delta_stat->deltasize += delta;
2765            }
2766
2767            // remove sequence number for the removed doc
2768            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2769                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2770                    calloc(1, sizeof(struct wal_stale_seq_entry));
2771                entry->kv_id = kv_id;
2772                entry->seqnum = _doc.seqnum;
2773                avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2774            }
2775
2776            // Update index size to new size after the remove operation
2777            delta = handle->bhandle->nlivenodes - nlivenodes;
2778            kvs_delta_stat->nlivenodes += delta;
2779
2780            // ndeltanodes measures number of new index nodes created due to
2781            // this hbtrie_remove() operation
2782            delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2783            delta *= handle->config.blocksize;
2784            kvs_delta_stat->deltasize += delta;
2785        }
2786    }
2787    return FDB_RESULT_SUCCESS;
2788}
2789
2790void fdb_sync_db_header(fdb_kvs_handle *handle)
2791{
2792    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2793    if (handle->cur_header_revnum != cur_revnum) {
2794        void *header_buf = NULL;
2795        size_t header_len;
2796        bid_t hdr_bid;
2797        filemgr_header_revnum_t revnum;
2798
2799        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2800                                        &hdr_bid, NULL, &revnum);
2801        if (header_len > 0) {
2802            uint64_t header_flags, dummy64, version;
2803            bid_t idtree_root;
2804            bid_t new_seq_root;
2805            bid_t new_stale_root;
2806            char *compacted_filename;
2807            char *prev_filename = NULL;
2808
2809            version = handle->file->version;
2810            atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
2811            handle->cur_header_revnum = revnum;
2812
2813            fdb_fetch_header(version, header_buf, &idtree_root,
2814                             &new_seq_root, &new_stale_root, &dummy64,
2815                             &dummy64, &dummy64,
2816                             &dummy64, &handle->last_wal_flush_hdr_bid,
2817                             &handle->kv_info_offset, &header_flags,
2818                             &compacted_filename, &prev_filename);
2819
2820            if (handle->dirty_updates) {
2821                // discard all cached writable b+tree nodes
2822                // to avoid data inconsistency with other writers
2823                btreeblk_discard_blocks(handle->bhandle);
2824            }
2825
2826            handle->trie->root_bid = idtree_root;
2827
2828            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2829                if (new_seq_root != handle->seqtree->root_bid) {
2830                    if (handle->config.multi_kv_instances) {
2831                        handle->seqtrie->root_bid = new_seq_root;
2832                    } else {
2833                        btree_init_from_bid(handle->seqtree,
2834                                            handle->seqtree->blk_handle,
2835                                            handle->seqtree->blk_ops,
2836                                            handle->seqtree->kv_ops,
2837                                            handle->seqtree->blksize,
2838                                            new_seq_root);
2839                    }
2840                }
2841            }
2842
2843            if (ver_staletree_support(version)) {
2844                btree_init_from_bid(handle->staletree,
2845                                    handle->staletree->blk_handle,
2846                                    handle->staletree->blk_ops,
2847                                    handle->staletree->kv_ops,
2848                                    handle->staletree->blksize,
2849                                    new_stale_root);
2850            } else {
2851                handle->staletree = NULL;
2852            }
2853
2854            if (prev_filename) {
2855                free(prev_filename);
2856            }
2857
2858            handle->dirty_updates = 0;
2859            if (handle->kvs) {
2860                // multiple KV instance mode AND sub handle
2861                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2862                                                    handle->kvs->id);
2863            } else {
2864                // super handle OR single KV instance mode
2865                handle->seqnum = filemgr_get_seqnum(handle->file);
2866            }
2867        } else {
2868            atomic_store_uint64_t(&handle->last_hdr_bid,
2869                                  filemgr_get_header_bid(handle->file));
2870        }
2871
2872        if (header_buf) {
2873            free(header_buf);
2874        }
2875    } else {
2876        if (handle == handle->fhandle->root) {
2877            // MB-20091: Commits use root handle that points to default kv store
2878            // The same default KV Store can have a different user-level handle.
2879            // To ensure that the root handle which will do the commit always
2880            // remains updated with the latest sequence number generated by the
2881            // user KVS Handle, we must always update the root handle's seqnum
2882            // even if there are no new commit headers to sync up in the file.
2883            handle->seqnum = filemgr_get_seqnum(handle->file);
2884        }
2885    }
2886}
2887
2888fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2889{
2890    bool fhandle_ret;
2891    fdb_status fs = FDB_RESULT_SUCCESS;
2892    file_status_t fstatus = filemgr_get_file_status(handle->file);
2893    // check whether the compaction is done
2894    if (fstatus == FILE_REMOVED_PENDING) {
2895        uint64_t ndocs, ndeletes, datasize, nlivenodes, last_wal_flush_hdr_bid;
2896        uint64_t kv_info_offset, header_flags;
2897        size_t header_len;
2898        char *new_filename;
2899        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2900        bid_t trie_root_bid, seq_root_bid, stale_root_bid;
2901        fdb_config config = handle->config;
2902
2903        // close the current file and newly open the new file
2904        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2905            // compaction daemon mode .. just close and then open
2906            char filename[FDB_MAX_FILENAME_LEN];
2907            strcpy(filename, handle->filename);
2908
2909            // We don't need to maintain fhandle list for the old file
2910            // as there will be no more mutation on the file.
2911            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2912            fs = _fdb_close(handle);
2913            if (fs != FDB_RESULT_SUCCESS) {
2914                if (fhandle_ret) {
2915                    filemgr_fhandle_add(handle->file, handle->fhandle);
2916                }
2917                return fs;
2918            }
2919
2920            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2921            if (fs != FDB_RESULT_SUCCESS) {
2922                return fs;
2923            }
2924            filemgr_fhandle_add(handle->file, handle->fhandle);
2925
2926        } else {
2927            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2928            fdb_fetch_header(handle->file->version, buf,
2929                             &trie_root_bid, &seq_root_bid, &stale_root_bid,
2930                             &ndocs, &ndeletes, &nlivenodes, &datasize,
2931                             &last_wal_flush_hdr_bid,
2932                             &kv_info_offset, &header_flags,
2933                             &new_filename, NULL);
2934
2935            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2936            fs = _fdb_close(handle);
2937            if (fs != FDB_RESULT_SUCCESS) {
2938                if (fhandle_ret) {
2939                    filemgr_fhandle_add(handle->file, handle->fhandle);
2940                }
2941                return fs;
2942            }
2943
2944            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2945            if (fs != FDB_RESULT_SUCCESS) {
2946                return fs;
2947            }
2948            filemgr_fhandle_add(handle->file, handle->fhandle);
2949        }
2950    }
2951    if (status) {
2952        *status = fstatus;
2953    }
2954    return fs;
2955}
2956
2957static void _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2958{
2959    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2960    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2961
2962    if (handle->shandle) {
2963        // skip snapshot
2964        return;
2965    }
2966
2967    struct filemgr_dirty_update_node *dirty_update;
2968    dirty_update = filemgr_dirty_update_get_latest(handle->file);
2969    btreeblk_set_dirty_update(handle->bhandle, dirty_update);
2970
2971    if (dirty_update) {
2972        filemgr_dirty_update_get_root(handle->file, dirty_update,
2973                                      &dirty_idtree_root, &dirty_seqtree_root);
2974        _fdb_import_dirty_root(handle, dirty_idtree_root, dirty_seqtree_root);
2975        btreeblk_discard_blocks(handle->bhandle);
2976    }
2977
2978    return;
2979}
2980
2981static void _fdb_release_dirty_root(fdb_kvs_handle *handle)
2982{
2983    if (!handle->shandle) {
2984        struct filemgr_dirty_update_node *dirty_update;
2985        dirty_update = btreeblk_get_dirty_update(handle->bhandle);
2986        if (dirty_update) {
2987            filemgr_dirty_update_close_node(handle->file, dirty_update);
2988            btreeblk_clear_dirty_update(handle->bhandle);
2989        }
2990    }
2991}
2992
2993LIBFDB_API
2994fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2995{
2996    uint64_t offset;
2997    int64_t _offset;
2998    struct docio_object _doc;
2999    struct filemgr *wal_file = NULL;
3000    struct docio_handle *dhandle;
3001    struct _fdb_key_cmp_info cmp_info;
3002    fdb_status wr;
3003    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3004    fdb_txn *txn;
3005    fdb_doc doc_kv;
3006    LATENCY_STAT_START();
3007
3008    if (!handle) {
3009        return FDB_RESULT_INVALID_HANDLE;
3010    }
3011
3012    if (!doc || !doc->key || doc->keylen == 0 ||
3013        doc->keylen > FDB_MAX_KEYLEN ||
3014        (handle->kvs_config.custom_cmp &&
3015            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3016        return FDB_RESULT_INVALID_ARGS;
3017    }
3018
3019    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3020        return FDB_RESULT_HANDLE_BUSY;
3021    }
3022
3023    doc_kv = *doc;
3024
3025    if (handle->kvs) {
3026        // multi KV instance mode
3027        int size_chunk = handle->config.chunksize;
3028        doc_kv.keylen = doc->keylen + size_chunk;
3029        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3030        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3031        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3032    }
3033
3034    if (!handle->shandle) {
3035        fdb_check_file_reopen(handle, NULL);
3036        txn = handle->fhandle->root->txn;
3037        if (!txn) {
3038            txn = &handle->file->global_txn;
3039        }
3040    } else {
3041        txn = handle->shandle->snap_txn;
3042    }
3043
3044    cmp_info.kvs_config = handle->kvs_config;
3045    cmp_info.kvs = handle->kvs;
3046    wal_file = handle->file;
3047    dhandle = handle->dhandle;
3048
3049    if (handle->kvs) {
3050        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3051                      &offset);
3052    } else {
3053        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc,
3054                      &offset);
3055    }
3056
3057    if (!handle->shandle) {
3058        fdb_sync_db_header(handle);
3059    }
3060
3061    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3062
3063    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3064        _fdb_sync_dirty_root(handle);
3065
3066        if (handle->kvs) {
3067            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3068                             (void *)&offset);
3069        } else {
3070            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3071                             (void *)&offset);
3072        }
3073        btreeblk_end(handle->bhandle);
3074        offset = _endian_decode(offset);
3075
3076        _fdb_release_dirty_root(handle);
3077    }
3078
3079    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3080         hr == HBTRIE_RESULT_SUCCESS) {
3081        bool alloced_meta = doc->meta ? false : true;
3082        bool alloced_body = doc->body ? false : true;
3083        if (handle->kvs) {
3084            _doc.key = doc_kv.key;
3085            _doc.length.keylen = doc_kv.keylen;
3086            doc->deleted = doc_kv.deleted; // update deleted field if wal_find
3087        } else {
3088            _doc.key = doc->key;
3089            _doc.length.keylen = doc->keylen;
3090        }
3091        _doc.meta = doc->meta;
3092        _doc.body = doc->body;
3093
3094        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
3095            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3096            return FDB_RESULT_KEY_NOT_FOUND;
3097        }
3098
3099        _offset = docio_read_doc(dhandle, offset, &_doc, true);
3100        if (_offset <= 0) {
3101            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3102            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
3103        }
3104
3105        if (_doc.length.keylen != doc_kv.keylen ||
3106            _doc.length.flag & DOCIO_DELETED) {
3107            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
3108            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3109            return FDB_RESULT_KEY_NOT_FOUND;
3110        }
3111
3112        doc->seqnum = _doc.seqnum;
3113        doc->metalen = _doc.length.metalen;
3114        doc->bodylen = _doc.length.bodylen;
3115        doc->meta = _doc.meta;
3116        doc->body = _doc.body;
3117        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3118        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3119        doc->offset = offset;
3120
3121        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3122        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3123        return FDB_RESULT_SUCCESS;
3124    }
3125
3126    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3127    return FDB_RESULT_KEY_NOT_FOUND;
3128}
3129
3130// search document metadata using key
3131LIBFDB_API
3132fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
3133{
3134    uint64_t offset;
3135    struct docio_object _doc;
3136    struct docio_handle *dhandle;
3137    struct filemgr *wal_file = NULL;
3138    fdb_status wr;
3139    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3140    fdb_txn *txn;
3141    struct _fdb_key_cmp_info cmp_info;
3142    fdb_doc doc_kv;
3143    LATENCY_STAT_START();
3144
3145    if (!handle) {
3146        return FDB_RESULT_INVALID_HANDLE;
3147    }
3148
3149    if (!doc || !doc->key ||
3150        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
3151        (handle->kvs_config.custom_cmp &&
3152            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3153        return FDB_RESULT_INVALID_ARGS;
3154    }
3155
3156    doc_kv = *doc;
3157
3158    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3159        return FDB_RESULT_HANDLE_BUSY;
3160    }
3161
3162    if (handle->kvs) {
3163        // multi KV instance mode
3164        int size_chunk = handle->config.chunksize;
3165        doc_kv.keylen = doc->keylen + size_chunk;
3166        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3167        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3168        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3169    }
3170
3171    if (!handle->shandle) {
3172        fdb_check_file_reopen(handle, NULL);
3173        txn = handle->fhandle->root->txn;
3174        if (!txn) {
3175            txn = &handle->file->global_txn;
3176        }
3177    } else {
3178        txn = handle->shandle->snap_txn;
3179    }
3180
3181    cmp_info.kvs_config = handle->kvs_config;
3182    cmp_info.kvs = handle->kvs;
3183    wal_file = handle->file;
3184    dhandle = handle->dhandle;
3185
3186    if (handle->kvs) {
3187        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3188                      &offset);
3189    } else {
3190        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc, &offset);
3191    }
3192
3193    if (!handle->shandle) {
3194        fdb_sync_db_header(handle);
3195    }
3196    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3197
3198    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3199        _fdb_sync_dirty_root(handle);
3200
3201        if (handle->kvs) {
3202            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3203                             (void *)&offset);
3204        } else {
3205            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3206                             (void *)&offset);
3207        }
3208        btreeblk_end(handle->bhandle);
3209        offset = _endian_decode(offset);
3210
3211        _fdb_release_dirty_root(handle);
3212    }
3213
3214    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3215         hr == HBTRIE_RESULT_SUCCESS) {
3216        if (handle->kvs) {
3217            _doc.key = doc_kv.key;
3218            _doc.length.keylen = doc_kv.keylen;
3219        } else {
3220            _doc.key = doc->key;
3221            _doc.length.