xref: /6.0.3/forestdb/src/forestdb.cc (revision c835d423)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "breakpad.h"
33#include "btree.h"
34#include "btree_kv.h"
35#include "btree_var_kv_ops.h"
36#include "docio.h"
37#include "btreeblock.h"
38#include "common.h"
39#include "wal.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "bgflusher.h"
44#include "compactor.h"
45#include "memleak.h"
46#include "time_utils.h"
47#include "timing.h"
48#include "system_resource_stats.h"
49#include "version.h"
50#include "staleblock.h"
51
// In a debug build (__DEBUG) where __DEBUG_FDB is not defined, redefine the
// DBG / DBGCMD / DBGSW macros so that they expand to nothing in this file.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
62
63
// Set to 1 by fdb_init() once the one-time global initialization is done.
static volatile uint8_t fdb_initialized = 0;
// Count of open calls currently in progress; fdb_open() increments and
// decrements it while holding 'initial_lock'.
static volatile uint32_t fdb_open_inprog = 0;
#ifdef SPIN_INITIALIZER
// Spin lock taken around global initialization and 'fdb_open_inprog'
// updates; statically initialized when the platform provides an initializer.
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// No static initializer: 'initial_lock' is set up at run time by
// init_initial_lock_status(). 'initial_lock_status' tracks that setup:
// 0 = not started, 1 = being initialized, 2 = ready.
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif
72
73INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74{
75    (void) aux;
76    uint64_t a,b;
77    a = *(uint64_t*)key1;
78    b = *(uint64_t*)key2;
79    a = _endian_decode(a);
80    b = _endian_decode(b);
81    return _CMP_U64(a, b);
82}
83
84size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85{
86    fdb_status fs;
87    keylen_t keylen;
88    struct docio_handle *dhandle = (struct docio_handle*)handle;
89
90    offset = _endian_decode(offset);
91    fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92    if (fs == FDB_RESULT_SUCCESS) {
93        return keylen;
94    } else {
95        const char *msg = "docio_read_doc_key error: read failure on "
96            "offset %" _F64 " in a database file '%s' "
97            ": FDB status %d, lastbid 0x%" _X64 ", "
98            "curblock 0x%" _X64 ", curpos 0x%x\n";
99        fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                dhandle->file->filename, fs, dhandle->lastbid,
101                dhandle->curblock, dhandle->curpos);
102        dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103        return 0;
104    }
105}
106
107size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
108{
109    int size_id, size_seq, size_chunk;
110    fdb_seqnum_t _seqnum;
111    struct docio_object doc;
112    struct docio_handle *dhandle = (struct docio_handle *)handle;
113
114    size_id = sizeof(fdb_kvs_id_t);
115    size_seq = sizeof(fdb_seqnum_t);
116    size_chunk = dhandle->file->config->chunksize;
117    memset(&doc, 0, sizeof(struct docio_object));
118
119    offset = _endian_decode(offset);
120    if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
121                                &doc, true) <= 0) {
122        return 0;
123    }
124    buf2buf(size_chunk, doc.key, size_id, buf);
125    _seqnum = _endian_encode(doc.seqnum);
126    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
127
128    free(doc.key);
129    free(doc.meta);
130
131    return size_id + size_seq;
132}
133
134int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
135{
136    int is_key1_inf, is_key2_inf;
137    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
138    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
139    size_t keylen1, keylen2;
140    btree_cmp_args *args = (btree_cmp_args *)aux;
141    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
142
143    is_key1_inf = _is_inf_key(key1);
144    is_key2_inf = _is_inf_key(key2);
145    if (is_key1_inf && is_key2_inf) { // both are infinite
146        return 0;
147    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
148        return -1;
149    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
150        return 1;
151    }
152
153    _get_var_key(key1, (void*)keystr1, &keylen1);
154    _get_var_key(key2, (void*)keystr2, &keylen2);
155
156    if (keylen1 == 0 && keylen2 == 0) {
157        return 0;
158    } else if (keylen1 ==0 && keylen2 > 0) {
159        return -1;
160    } else if (keylen1 > 0 && keylen2 == 0) {
161        return 1;
162    }
163
164    return cmp(keystr1, keylen1, keystr2, keylen2);
165}
166
167void fdb_fetch_header(uint64_t version,
168                      void *header_buf,
169                      bid_t *trie_root_bid,
170                      bid_t *seq_root_bid,
171                      bid_t *stale_root_bid,
172                      uint64_t *ndocs,
173                      uint64_t *ndeletes,
174                      uint64_t *nlivenodes,
175                      uint64_t *datasize,
176                      uint64_t *last_wal_flush_hdr_bid,
177                      uint64_t *kv_info_offset,
178                      uint64_t *header_flags,
179                      char **new_filename,
180                      char **old_filename)
181{
182    size_t offset = 0;
183    uint16_t new_filename_len;
184    uint16_t old_filename_len;
185
186    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
187               sizeof(bid_t), offset);
188    *trie_root_bid = _endian_decode(*trie_root_bid);
189
190    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
191               sizeof(bid_t), offset);
192    *seq_root_bid = _endian_decode(*seq_root_bid);
193
194    if (ver_staletree_support(version)) {
195        seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
196                   sizeof(bid_t), offset);
197        *stale_root_bid = _endian_decode(*stale_root_bid);
198    } else {
199        *stale_root_bid = BLK_NOT_FOUND;
200    }
201
202    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
203               sizeof(uint64_t), offset);
204    *ndocs = _endian_decode(*ndocs);
205    if (ver_is_atleast_magic_001(version)) {
206        seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
207                   sizeof(uint64_t), offset);
208        *ndeletes = _endian_decode(*ndeletes);
209    } else {
210        *ndeletes = 0;
211    }
212
213    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
214               sizeof(uint64_t), offset);
215    *nlivenodes = _endian_decode(*nlivenodes);
216
217    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
218               sizeof(uint64_t), offset);
219    *datasize = _endian_decode(*datasize);
220
221    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
222               sizeof(uint64_t), offset);
223    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
224
225    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
226               sizeof(uint64_t), offset);
227    *kv_info_offset = _endian_decode(*kv_info_offset);
228
229    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
230               sizeof(uint64_t), offset);
231    *header_flags = _endian_decode(*header_flags);
232
233    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
234               sizeof(new_filename_len), offset);
235    new_filename_len = _endian_decode(new_filename_len);
236    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
237               sizeof(old_filename_len), offset);
238    old_filename_len = _endian_decode(old_filename_len);
239    if (new_filename_len) {
240        *new_filename = (char*)((uint8_t *)header_buf + offset);
241    } else {
242        *new_filename = NULL;
243    }
244    offset += new_filename_len;
245    if (old_filename && old_filename_len) {
246        *old_filename = (char *) malloc(old_filename_len);
247        seq_memcpy(*old_filename,
248                   (uint8_t *)header_buf + offset,
249                   old_filename_len, offset);
250    }
251}
252
253// read the revnum of the given header of BID
254INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
255{
256    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
257    uint64_t version;
258    size_t header_len;
259    fdb_seqnum_t seqnum;
260    filemgr_header_revnum_t revnum = 0;
261    fdb_status fs;
262
263    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
264                              &seqnum, &revnum, NULL, &version, NULL,
265                              &handle->log_callback);
266    if (fs != FDB_RESULT_SUCCESS) {
267        return 0;
268    }
269    return revnum;
270}
271
272INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
273{
274    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
275    uint64_t version, bmp_revnum = 0;
276    size_t header_len;
277    fdb_seqnum_t seqnum;
278    filemgr_header_revnum_t revnum;
279    fdb_status fs;
280
281    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
282                              &seqnum, &revnum, NULL, &version, &bmp_revnum,
283                              &handle->log_callback);
284    if (fs != FDB_RESULT_SUCCESS) {
285        return 0;
286    }
287    return bmp_revnum;
288}
289
/*
 * Log callback that discards everything it is given.
 * Installed where error logging has to be silenced temporarily.
 */
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // intentionally a no-op; the casts only silence unused-parameter warnings
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
297
/*
 * Re-populate the WAL (or a snapshot's private WAL) by scanning document
 * blocks on disk that lie between the last WAL-flush header and the header
 * at 'hdr_bid'.
 *
 * handle:    KV store handle. When 'handle->shandle' is set the handle is a
 *            snapshot: documents go to wal_snap_insert() and neither the
 *            file mutex nor wal_commit() is used. Otherwise documents go to
 *            wal_insert() on the file's global transaction, under the file
 *            mutex, followed by wal_commit().
 * mode:      FDB_RESTORE_NORMAL restores all items; FDB_RESTORE_KV_INS
 *            restores only items whose KV store ID equals 'kv_id_req'.
 * hdr_bid:   block ID of the header up to which documents are scanned.
 *            Nothing is done when the derived header offset is 0.
 * kv_id_req: KV store ID used by FDB_RESTORE_KV_INS.
 *
 * While scanning, the docio handle's log callback is replaced with a no-op
 * callback and restored before returning from the scan path.
 *
 * The function returns void; read errors simply end the scan of the current
 * block (see the TODO inside the loop).
 */
INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
                             fdb_restore_mode_t mode,
                             bid_t hdr_bid,
                             fdb_kvs_id_t kv_id_req)
{
    struct filemgr *file = handle->file;
    uint32_t blocksize = handle->file->blocksize;
    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
    // NOTE(review): this uses the FDB_BLOCKSIZE constant while every other
    // offset below uses the file's 'blocksize' -- confirm they always agree.
    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
    uint64_t offset = 0; //assume everything from first block needs restoration
    uint64_t filesize = filemgr_get_pos(handle->file);
    uint64_t doc_scan_limit;
    uint64_t start_bmp_revnum, stop_bmp_revnum;
    uint64_t cur_bmp_revnum = (uint64_t)-1;
    bid_t next_doc_block = BLK_NOT_FOUND;
    struct _fdb_key_cmp_info cmp_info;
    err_log_callback *log_callback;

    if (!hdr_off) { // Nothing to do if we don't have a header block offset
        return;
    }

    // Start scanning right after the header written at the last WAL flush,
    // if there is one.
    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
    }

    // If a valid last header was retrieved and it matches the current header
    // OR if WAL already had entries populated, then no crash recovery needed
    if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid ||
        (!handle->shandle && wal_get_size(file) &&
            mode != FDB_RESTORE_KV_INS)) {
        return;
    }

    if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
        // for normal WAL restore, set status to dirty
        // (only when the previous status is clean or dirty)
        wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
    }

    // Temporarily disable the error logging callback as there are false positive
    // checksum errors in docio_read_doc.
    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
    err_log_callback dummy_cb;
    log_callback = handle->dhandle->log_callback;
    dummy_cb.callback = fdb_dummy_log_callback;
    dummy_cb.ctx_data = NULL;
    handle->dhandle->log_callback = &dummy_cb;

    // Snapshot handles do not take the file mutex.
    if (!handle->shandle) {
        filemgr_mutex_lock(file);
    }
    cmp_info.kvs_config = handle->kvs_config;
    cmp_info.kvs = handle->kvs;

    // The scan walks bitmap revisions from the one of the last WAL-flush
    // header up to the one of the target header.
    start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
    stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
    cur_bmp_revnum = start_bmp_revnum;

    // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
    // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
    // otherwise: live block (bmp_revnum: 0)
    //  1 2   3    4    5 6  7  8   9  10
    // +-------------------------------------------+
    // |  BBBBAAAAABBBBB  AAABBB    AAA            |
    // +-------------------------------------------+
    //              ^                     ^
    //              hdr_bid               last_wal_flush
    //
    // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
    // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
    // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
    // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]

    // Outer loop: one iteration per block; it only ends through the 'break'
    // statements at the top of the body.
    do {
        if (cur_bmp_revnum > stop_bmp_revnum) {
            break;
        } else if (cur_bmp_revnum == stop_bmp_revnum) {
            // Final revision: scan only up to the header offset.

            bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
            if (handle->file->sb) {
                sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
            }
            // For non-snapshot handles, the limit is moved to just past the
            // last header block recorded in the superblock, when available.
            if (!handle->shandle && handle->file->sb &&
                sb_last_hdr_bid != BLK_NOT_FOUND) {
                hdr_off = (sb_last_hdr_bid+1) * blocksize;
            }

            doc_scan_limit = hdr_off;
            if (offset >= hdr_off) {
                break;
            }
        } else {
            // Earlier revision: scan through to the end of the file.
            doc_scan_limit = filesize;
        }

        if (!docio_check_buffer(handle->dhandle, offset / blocksize,
                                cur_bmp_revnum)) {
            // not a document block .. move to next block
        } else {
            // Inner loop: read consecutive documents starting at 'offset'.
            do {
                struct docio_object doc;
                int64_t _offset;
                uint64_t doc_offset;
                memset(&doc, 0, sizeof(doc));
                // '_offset' is used below as the position to continue from
                // after this document.
                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
                if (_offset <= 0) { // reached unreadable doc, skip block
                    // TODO: Need to have this function return fdb_status, so that
                    // WAL restore operation should fail if offset < 0
                    break;
                } else if ((uint64_t)_offset < offset) {
                    // If more than one writer is appending docs concurrently,
                    // they have their own doc block linked list and doc blocks
                    // may not be consecutive. For example,
                    //
                    // Writer 1): 100 -> 102 -> 2 -> 4     | commit
                    // Writer 2):    101 - > 103 -> 3 -> 5 |
                    //
                    // In this case, if we read doc BID 102, then 'offset' will jump
                    // to doc BID 2, without reading BID 103.
                    //
                    // To address this issue, in case that 'offset' decreases,
                    // remember the next doc block, and follow the doc linked list
                    // first. After the linked list ends, 'offset' cursor will be
                    // reset to 'next_doc_block'.
                    next_doc_block = (offset / blocksize) + 1;
                }
                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
                    // check if the doc is transactional or not, and
                    // also check if the doc contains system info
                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
                        !(doc.length.flag & DOCIO_SYSTEM)) {
                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
                            // commit mark .. read doc offset
                            doc_offset = doc.doc_offset;
                            // read the previously skipped doc
                            if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
                                // doc read error
                                free(doc.key);
                                free(doc.meta);
                                free(doc.body);
                                offset = _offset;
                                continue;
                            }
                        } else {
                            doc_offset = offset;
                        }

                        // If say a snapshot is taken on a db handle after
                        // rollback, then skip WAL items after rollback point
                        if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
                            doc.seqnum > handle->seqnum) {
                            free(doc.key);
                            free(doc.meta);
                            free(doc.body);
                            offset = _offset;
                            continue;
                        }

                        // restore document
                        fdb_doc wal_doc;
                        wal_doc.keylen = doc.length.keylen;
                        wal_doc.bodylen = doc.length.bodylen;
                        wal_doc.key = doc.key;
                        wal_doc.seqnum = doc.seqnum;
                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;

                        if (!handle->shandle) {
                            wal_doc.metalen = doc.length.metalen;
                            wal_doc.meta = doc.meta;
                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);

                            if (handle->kvs) {
                                // check seqnum before insert
                                fdb_kvs_id_t kv_id;
                                fdb_seqnum_t kv_seqnum;
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);

                                kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
                                if (doc.seqnum <= kv_seqnum &&
                                        ((mode == FDB_RESTORE_KV_INS &&
                                            kv_id == kv_id_req) ||
                                         (mode == FDB_RESTORE_NORMAL)) ) {
                                    // if mode is NORMAL, restore all items
                                    // if mode is KV_INS, restore items matching ID
                                    wal_insert(&file->global_txn, file, &cmp_info,
                                               &wal_doc, doc_offset,
                                               WAL_INS_WRITER);
                                }
                            } else {
                                wal_insert(&file->global_txn, file, &cmp_info,
                                           &wal_doc, doc_offset,
                                           WAL_INS_WRITER);
                            }
                            // The key is freed here whether or not the doc
                            // was inserted.
                            if (doc.key) free(doc.key);
                        } else {
                            // snapshot
                            if (handle->kvs) {
                                fdb_kvs_id_t kv_id;
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);
                                if (kv_id == handle->kvs->id) {
                                    // snapshot: insert ID matched documents only
                                    wal_snap_insert(handle->shandle,
                                                    &wal_doc, doc_offset);
                                } else {
                                    free(doc.key);
                                }
                            } else {
                                wal_snap_insert(handle->shandle, &wal_doc,
                                                doc_offset);
                            }
                            // NOTE(review): the key is not freed here after
                            // wal_snap_insert() -- presumably the snapshot
                            // keeps the pointer; confirm.
                        }
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                    } else {
                        // skip transactional document or system document
                        free(doc.key);
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                        // do not break.. read next doc
                    }
                } else {
                    // Neither a keyed document nor a commit mark: stop
                    // reading documents in this block.
                    free(doc.key);
                    free(doc.meta);
                    free(doc.body);
                    offset = _offset;
                    break;
                }
            } while (offset + sizeof(struct docio_length) < doc_scan_limit);
        }

        // Advance to the next block: either the one remembered when the
        // offset jumped backwards, or the block following the current one.
        if (next_doc_block != BLK_NOT_FOUND) {
            offset = next_doc_block * blocksize;
            next_doc_block = BLK_NOT_FOUND;
        } else {
            offset = ((offset / blocksize) + 1) * blocksize;
        }
        if (ver_superblock_support(handle->file->version) &&
            offset >= filesize) {
            // circular scan
            // Wrap around to block index 'num_sb' (presumably the first
            // block after the superblocks -- confirm) and move on to the
            // next bitmap revision.
            struct superblock *sb = handle->file->sb;
            if (sb && sb->config) {
                offset = blocksize * sb->config->num_sb;
                cur_bmp_revnum++;
            }
        }
    } while(true);

    // wal commit
    if (!handle->shandle) {
        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
        filemgr_mutex_unlock(file);
    }
    // put the original docio log callback back
    handle->dhandle->log_callback = log_callback;
}
557
/*
 * Handle a compaction target file ('new_filename') found while opening
 * 'handle'.
 *
 * The new file is opened read-only into a temporary handle. Then:
 *   - If the new file records an old file name equal to this handle's file
 *     name, 'handle' takes over the new file's btree-block handle, docio
 *     handle, trie, sequence index and stale tree; the old file is marked
 *     remove-pending and closed; FDB_RESULT_FAIL_BY_COMPACTION is returned.
 *   - Otherwise the new file is marked remove-pending (pointing at the old
 *     file), the temporary handle is closed, and FDB_RESULT_SUCCESS is
 *     returned.
 *
 * Returns the _fdb_open() status (after logging) if the new file cannot be
 * opened, or the btreeblk_end() status if that call fails.
 */
INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
                                          const char *new_filename)
{
    fdb_kvs_handle new_db;
    fdb_config config = handle->config;
    struct filemgr *new_file;

    // Temporary handle for the new file: inherits the log callback, file
    // handle and KV store config of 'handle', but is opened read-only.
    memset(&new_db, 0, sizeof(new_db));
    new_db.log_callback.callback = handle->log_callback.callback;
    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
    config.flags |= FDB_OPEN_FLAG_RDONLY;
    new_db.fhandle = handle->fhandle;
    new_db.kvs_config = handle->kvs_config;
    fdb_status status = _fdb_open(&new_db, new_filename,
                                  FDB_AFILENAME, &config);
    if (status != FDB_RESULT_SUCCESS) {
        return fdb_log(&handle->log_callback, status,
                       "Error in opening a partially compacted file '%s' for recovery.",
                       new_filename);
    }

    new_file = new_db.file;

    if (new_file->old_filename &&
        !strncmp(new_file->old_filename, handle->file->filename,
                 FDB_MAX_FILENAME_LEN)) {
        struct filemgr *old_file = handle->file;
        // If new file has a recorded old_filename then it means that
        // compaction has completed successfully. Mark self for deletion
        filemgr_mutex_lock(new_file);

        status = btreeblk_end(handle->bhandle);
        if (status != FDB_RESULT_SUCCESS) {
            filemgr_mutex_unlock(new_file);
            _fdb_close(&new_db);
            return status;
        }
        // From here on, each of handle's own structures is released and
        // replaced by the corresponding one from the temporary handle.
        btreeblk_free(handle->bhandle);
        free(handle->bhandle);
        handle->bhandle = new_db.bhandle;

        docio_free(handle->dhandle);
        free(handle->dhandle);
        handle->dhandle = new_db.dhandle;

        hbtrie_free(handle->trie);
        free(handle->trie);
        handle->trie = new_db.trie;

        wal_shutdown(handle->file, &handle->log_callback);
        handle->file = new_file;

        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
            // NOTE(review): if new_db was opened without a sequence tree,
            // the freed pointer below is not replaced or cleared -- confirm
            // this combination cannot occur.
            if (handle->kvs) {
                // multi KV instance mode
                hbtrie_free(handle->seqtrie);
                free(handle->seqtrie);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtrie = new_db.seqtrie;
                }
            } else {
                free(handle->seqtree->kv_ops);
                free(handle->seqtree);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtree = new_db.seqtree;
                }
            }
        }
        handle->staletree = new_db.staletree;

        filemgr_mutex_unlock(new_file);
        if (new_db.kvs) {
            fdb_kvs_info_free(&new_db);
        }
        // remove self: WARNING must not close this handle if snapshots
        // are yet to open this file
        filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
        // new_db is not closed here because its structures now belong to
        // 'handle'; only its filename copy is released.
        free(new_db.filename);
        return FDB_RESULT_FAIL_BY_COMPACTION;
    }

    // As the new file is partially compacted, it should be removed upon close.
    // Just in-case the new file gets opened before removal, point it to the old
    // file to ensure availability of data.
    filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
    _fdb_close(&new_db);

    return FDB_RESULT_SUCCESS;
}
648
649#ifndef SPIN_INITIALIZER
650INLINE void init_initial_lock_status() {
651    // Note that only Windows passes through this routine
652    if (!fdb_initialized) {
653        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
654            // atomically initialize spin lock only once
655            spin_init(&initial_lock);
656            initial_lock_status = 2;
657        } else {
658            // the others .. wait until initializing 'initial_lock' is done
659            // TODO: Need to devise a better way of synchronization on Windows
660            while (initial_lock_status != 2) {
661                Sleep(1);
662            }
663        }
664    }
665}
666#endif
667
668LIBFDB_API
669fdb_status fdb_init(fdb_config *config)
670{
671    fdb_config _config;
672    compactor_config c_config;
673    bgflusher_config bgf_config;
674    struct filemgr_config f_config;
675
676    if (config) {
677        if (validate_fdb_config(config)) {
678            _config = *config;
679        } else {
680            return FDB_RESULT_INVALID_CONFIG;
681        }
682    } else {
683        _config = get_default_config();
684    }
685
686    // global initialization
687    // initialized only once at first time
688    if (!fdb_initialized) {
689
690#ifndef SPIN_INITIALIZER
691        init_initial_lock_status();
692#endif
693
694    }
695    spin_lock(&initial_lock);
696    if (!fdb_initialized) {
697#if !defined(_ANDROID_) && !defined(__ANDROID__)
698        // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
699        // We temporarily disable validity checking of block cache size
700        // on Android platform at this time.
701        double ram_size = (double) get_memory_size();
702        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
703            spin_unlock(&initial_lock);
704            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
705        }
706#endif
707        // initialize file manager and block cache
708        f_config.blocksize = _config.blocksize;
709        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
710        f_config.seqtree_opt = _config.seqtree_opt;
711        filemgr_init(&f_config);
712        filemgr_set_lazy_file_deletion(true,
713                                       compactor_register_file_removing,
714                                       compactor_is_file_removed);
715        if (ver_superblock_support(ver_get_latest_magic())) {
716            struct sb_ops sb_ops = {sb_init, sb_get_default_config,
717                                    sb_read_latest, sb_alloc_block,
718                                    sb_bmp_is_writable, sb_get_bmp_revnum,
719                                    sb_get_min_live_revnum, sb_free};
720            filemgr_set_sb_operation(sb_ops);
721            sb_bmp_mask_init();
722        }
723
724        // initialize compaction daemon
725        c_config.sleep_duration = _config.compactor_sleep_duration;
726        c_config.num_threads = _config.num_compactor_threads;
727        compactor_init(&c_config);
728        // initialize background flusher daemon
729        // Temporarily disable background flushers until blockcache contention
730        // issue is resolved.
731        bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
732        bgflusher_init(&bgf_config);
733
734        // Initialize breakpad
735        _dbg_handle_crashes(config->breakpad_minidump_dir);
736
737        fdb_initialized = 1;
738    }
739    spin_unlock(&initial_lock);
740
741    return FDB_RESULT_SUCCESS;
742}
743
744LIBFDB_API
745fdb_config fdb_get_default_config(void) {
746    return get_default_config();
747}
748
749LIBFDB_API
750fdb_kvs_config fdb_get_default_kvs_config(void) {
751    return get_default_kvs_config();
752}
753
754LIBFDB_API
755fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
756                    const char *filename,
757                    fdb_config *fconfig)
758{
759#ifdef _MEMPOOL
760    mempool_init();
761#endif
762
763    fdb_config config;
764    fdb_file_handle *fhandle;
765    fdb_kvs_handle *handle;
766
767    if (fconfig) {
768        if (validate_fdb_config(fconfig)) {
769            config = *fconfig;
770        } else {
771            return FDB_RESULT_INVALID_CONFIG;
772        }
773    } else {
774        config = get_default_config();
775    }
776
777    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
778    if (!fhandle) { // LCOV_EXCL_START
779        return FDB_RESULT_ALLOC_FAIL;
780    } // LCOV_EXCL_STOP
781
782    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
783    if (!handle) { // LCOV_EXCL_START
784        free(fhandle);
785        return FDB_RESULT_ALLOC_FAIL;
786    } // LCOV_EXCL_STOP
787
788#ifndef SPIN_INITIALIZER
789    init_initial_lock_status();
790#endif
791
792    spin_lock(&initial_lock);
793    fdb_open_inprog++;
794    spin_unlock(&initial_lock);
795
796    atomic_init_uint8_t(&handle->handle_busy, 0);
797    handle->shandle = NULL;
798    handle->kvs_config = get_default_kvs_config();
799
800    fdb_status fs = fdb_init(fconfig);
801    if (fs != FDB_RESULT_SUCCESS) {
802        free(handle);
803        free(fhandle);
804        spin_lock(&initial_lock);
805        fdb_open_inprog--;
806        spin_unlock(&initial_lock);
807        return fs;
808    }
809    fdb_file_handle_init(fhandle, handle);
810
811    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
812    if (fs == FDB_RESULT_SUCCESS) {
813        *ptr_fhandle = fhandle;
814        filemgr_fhandle_add(handle->file, fhandle);
815    } else {
816        *ptr_fhandle = NULL;
817        free(handle);
818        fdb_file_handle_free(fhandle);
819    }
820    spin_lock(&initial_lock);
821    fdb_open_inprog--;
822    spin_unlock(&initial_lock);
823    return fs;
824}
825
826LIBFDB_API
827fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
828                               const char *filename,
829                               fdb_config *fconfig,
830                               size_t num_functions,
831                               char **kvs_names,
832                               fdb_custom_cmp_variable *functions)
833{
834#ifdef _MEMPOOL
835    mempool_init();
836#endif
837
838    fdb_config config;
839    fdb_file_handle *fhandle;
840    fdb_kvs_handle *handle;
841
842    if (fconfig) {
843        if (validate_fdb_config(fconfig)) {
844            config = *fconfig;
845        } else {
846            return FDB_RESULT_INVALID_CONFIG;
847        }
848    } else {
849        config = get_default_config();
850    }
851
852    if (config.multi_kv_instances == false) {
853        // single KV instance mode does not support customized cmp function
854        return FDB_RESULT_INVALID_CONFIG;
855    }
856
857    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
858    if (!fhandle) { // LCOV_EXCL_START
859        return FDB_RESULT_ALLOC_FAIL;
860    } // LCOV_EXCL_STOP
861
862    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
863    if (!handle) { // LCOV_EXCL_START
864        free(fhandle);
865        return FDB_RESULT_ALLOC_FAIL;
866    } // LCOV_EXCL_STOP
867
868#ifndef SPIN_INITIALIZER
869    init_initial_lock_status();
870#endif
871
872    spin_lock(&initial_lock);
873    fdb_open_inprog++;
874    spin_unlock(&initial_lock);
875
876    atomic_init_uint8_t(&handle->handle_busy, 0);
877    handle->shandle = NULL;
878    handle->kvs_config = get_default_kvs_config();
879
880    fdb_status fs = fdb_init(fconfig);
881    if (fs != FDB_RESULT_SUCCESS) {
882        free(handle);
883        free(fhandle);
884        spin_lock(&initial_lock);
885        fdb_open_inprog--;
886        spin_unlock(&initial_lock);
887        return fs;
888    }
889    fdb_file_handle_init(fhandle, handle);
890
891    // insert kvs_names and functions into fhandle's list
892    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
893                                   kvs_names, functions);
894
895    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
896    if (fs == FDB_RESULT_SUCCESS) {
897        *ptr_fhandle = fhandle;
898        filemgr_fhandle_add(handle->file, fhandle);
899    } else {
900        *ptr_fhandle = NULL;
901        free(handle);
902        fdb_file_handle_free(fhandle);
903    }
904    spin_lock(&initial_lock);
905    fdb_open_inprog--;
906    spin_unlock(&initial_lock);
907    return fs;
908}
909
910fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
911                                  const char *filename,
912                                  fdb_config *fconfig,
913                                  struct list *cmp_func_list)
914{
915#ifdef _MEMPOOL
916    mempool_init();
917#endif
918
919    fdb_file_handle *fhandle;
920    fdb_kvs_handle *handle;
921
922    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
923    if (!fhandle) { // LCOV_EXCL_START
924        return FDB_RESULT_ALLOC_FAIL;
925    } // LCOV_EXCL_STOP
926
927    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
928    if (!handle) { // LCOV_EXCL_START
929        free(fhandle);
930        return FDB_RESULT_ALLOC_FAIL;
931    } // LCOV_EXCL_STOP
932
933    atomic_init_uint8_t(&handle->handle_busy, 0);
934    handle->shandle = NULL;
935
936    fdb_file_handle_init(fhandle, handle);
937    if (cmp_func_list && list_begin(cmp_func_list)) {
938        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
939    }
940    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
941    if (fs == FDB_RESULT_SUCCESS) {
942        *ptr_fhandle = fhandle;
943        filemgr_fhandle_add(handle->file, fhandle);
944    } else {
945        *ptr_fhandle = NULL;
946        free(handle);
947        fdb_file_handle_free(fhandle);
948    }
949    return fs;
950}
951
952LIBFDB_API
953fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
954                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
955{
956#ifdef _MEMPOOL
957    mempool_init();
958#endif
959
960    fdb_config config = handle_in->config;
961    fdb_kvs_config kvs_config = handle_in->kvs_config;
962    fdb_kvs_id_t kv_id = 0;
963    fdb_kvs_handle *handle;
964    fdb_txn *txn = NULL;
965    fdb_status fs = FDB_RESULT_SUCCESS;
966    filemgr *file;
967    file_status_t fstatus = FILE_NORMAL;
968    struct snap_handle dummy_shandle;
969    struct _fdb_key_cmp_info cmp_info;
970    LATENCY_STAT_START();
971
972    if (!handle_in || !ptr_handle) {
973        return FDB_RESULT_INVALID_ARGS;
974    }
975
976fdb_snapshot_open_start:
977    if (!handle_in->shandle) {
978        fdb_check_file_reopen(handle_in, &fstatus);
979        fdb_sync_db_header(handle_in);
980        file = handle_in->file;
981
982        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
983            handle_in->seqnum = fdb_kvs_get_seqnum(file,
984                                                   handle_in->kvs->id);
985        } else {
986            handle_in->seqnum = filemgr_get_seqnum(file);
987        }
988    } else {
989        file = handle_in->file;
990    }
991
992    // if the max sequence number seen by this handle is lower than the
993    // requested snapshot marker, it means the snapshot is not yet visible
994    // even via the current fdb_kvs_handle
995    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
996        return FDB_RESULT_NO_DB_INSTANCE;
997    }
998
999    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1000    if (!handle) { // LCOV_EXCL_START
1001        return FDB_RESULT_ALLOC_FAIL;
1002    } // LCOV_EXCL_STOP
1003
1004    atomic_init_uint8_t(&handle->handle_busy, 0);
1005    handle->log_callback = handle_in->log_callback;
1006    handle->max_seqnum = seqnum;
1007    handle->fhandle = handle_in->fhandle;
1008
1009    config.flags |= FDB_OPEN_FLAG_RDONLY;
1010    // do not perform compaction for snapshot
1011    config.compaction_mode = FDB_COMPACTION_MANUAL;
1012
1013    // If cloning an existing snapshot handle, then rewind indexes
1014    // to its last DB header and point its avl tree to existing snapshot's tree
1015    bool clone_snapshot = false;
1016    if (handle_in->shandle) {
1017        handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
1018        fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1019        if (fs == FDB_RESULT_SUCCESS) {
1020            clone_snapshot = true;
1021            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1022        } else {
1023            fdb_log(&handle_in->log_callback, fs,
1024                    "Warning: Snapshot clone at sequence number %" _F64
1025                    "does not match its snapshot handle %" _F64
1026                    "in file '%s'.", seqnum, handle_in->seqnum,
1027                    handle_in->file->filename);
1028            free(handle);
1029            return fs;
1030        }
1031    }
1032
1033    cmp_info.kvs_config = handle_in->kvs_config;
1034    cmp_info.kvs = handle_in->kvs;
1035
1036    if (!handle->shandle) {
1037        txn = handle_in->fhandle->root->txn;
1038        if (!txn) {
1039            txn = &handle_in->file->global_txn;
1040        }
1041        if (handle_in->kvs) {
1042            kv_id = handle_in->kvs->id;
1043        }
1044        if (seqnum == FDB_SNAPSHOT_INMEM) {
1045            memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1046            // tmp value to denote snapshot & not rollback to _fdb_open
1047            handle->shandle = &dummy_shandle; // dummy
1048        } else {
1049            fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1050                                       &handle->shandle);
1051        }
1052        if (fs != FDB_RESULT_SUCCESS) {
1053            free(handle);
1054            return fs;
1055        }
1056    }
1057
1058    if (handle_in->kvs) {
1059        // sub-handle in multi KV instance mode
1060        if (clone_snapshot) {
1061            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1062        } else {
1063            fs = _fdb_kvs_open(handle_in->kvs->root,
1064                              &config, &kvs_config, file,
1065                              file->filename,
1066                              _fdb_kvs_get_name(handle_in, file),
1067                              handle);
1068        }
1069    } else {
1070        if (clone_snapshot) {
1071            fs = _fdb_clone_snapshot(handle_in, handle);
1072        } else {
1073            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1074        }
1075    }
1076
1077    if (fs == FDB_RESULT_SUCCESS) {
1078        if (seqnum == FDB_SNAPSHOT_INMEM &&
1079            !handle_in->shandle) {
1080            handle->max_seqnum = handle_in->seqnum;
1081
1082            // synchronize dirty root nodes if exist
1083            bid_t dirty_idtree_root = BLK_NOT_FOUND;
1084            bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1085            struct filemgr_dirty_update_node *dirty_update;
1086
1087            dirty_update = filemgr_dirty_update_get_latest(handle->file);
1088            btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1089
1090            if (dirty_update) {
1091                filemgr_dirty_update_get_root(handle->file, dirty_update,
1092                                       &dirty_idtree_root, &dirty_seqtree_root);
1093                _fdb_import_dirty_root(handle, dirty_idtree_root,
1094                                       dirty_seqtree_root);
1095                btreeblk_discard_blocks(handle->bhandle);
1096            }
1097            // Having synced the dirty root, make an in-memory WAL snapshot
1098            // TODO: Re-enable WAL sharing once ready...
1099#ifdef _MVCC_WAL_ENABLE
1100            fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1101                                   &cmp_info, &handle->shandle);
1102#else
1103            fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1104                                       &handle->shandle);
1105            if (fs == FDB_RESULT_SUCCESS) {
1106                fs = wal_copyto_snapshot(file, handle->shandle,
1107                                        (bool)handle_in->kvs);
1108            }
1109            (void)kv_id;
1110#endif // _MVCC_WAL_ENABLE
1111        } else if (clone_snapshot) {
1112            // Snapshot is created on the other snapshot handle
1113
1114            handle->max_seqnum = handle_in->seqnum;
1115
1116            if (seqnum == FDB_SNAPSHOT_INMEM) {
1117                // in-memory snapshot
1118                // Clone dirty root nodes from the source snapshot by incrementing
1119                // their ref counters
1120                handle->trie->root_bid = handle_in->trie->root_bid;
1121                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1122                    if (handle->kvs) {
1123                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1124                    } else {
1125                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1126                    }
1127                }
1128                btreeblk_discard_blocks(handle->bhandle);
1129
1130                // increase ref count for dirty update
1131                struct filemgr_dirty_update_node *dirty_update;
1132                dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1133                filemgr_dirty_update_inc_ref_count(dirty_update);
1134                btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1135            }
1136        }
1137        *ptr_handle = handle;
1138    } else {
1139        *ptr_handle = NULL;
1140        if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1141            wal_snapshot_close(handle->shandle, handle->file);
1142        }
1143        free(handle);
1144        // If compactor thread had finished compaction just before this routine
1145        // calls _fdb_open, then it is possible that the snapshot's DB header
1146        // is only present in the new_file. So we must retry the snapshot
1147        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1148        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1149            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1150                goto fdb_snapshot_open_start;
1151            }
1152        }
1153    }
1154    if (seqnum == FDB_SNAPSHOT_INMEM) {
1155        LATENCY_STAT_END(file, FDB_LATENCY_SNAPSHOTS);
1156    } else {
1157        LATENCY_STAT_END(file, FDB_LATENCY_SNAPSHOT_DUR);
1158    }
1159    return fs;
1160}
1161
1162static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1163
1164LIBFDB_API
1165fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1166{
1167#ifdef _MEMPOOL
1168    mempool_init();
1169#endif
1170
1171    fdb_config config;
1172    fdb_kvs_handle *handle_in, *handle;
1173    fdb_status fs;
1174    fdb_seqnum_t old_seqnum;
1175
1176    if (!handle_ptr) {
1177        return FDB_RESULT_INVALID_ARGS;
1178    }
1179
1180    handle_in = *handle_ptr;
1181    config = handle_in->config;
1182
1183    if (handle_in->kvs) {
1184        return fdb_kvs_rollback(handle_ptr, seqnum);
1185    }
1186
1187    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1188        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1189                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1190                       handle_in->file->filename);
1191    }
1192
1193    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1194        return FDB_RESULT_HANDLE_BUSY;
1195    }
1196
1197    filemgr_mutex_lock(handle_in->file);
1198    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1199    // All transactions should be closed before rollback
1200    if (wal_txn_exists(handle_in->file)) {
1201        filemgr_set_rollback(handle_in->file, 0);
1202        filemgr_mutex_unlock(handle_in->file);
1203        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1204        return FDB_RESULT_FAIL_BY_TRANSACTION;
1205    }
1206
1207    // If compaction is running, wait until it is aborted.
1208    // TODO: Find a better way of waiting for the compaction abortion.
1209    unsigned int sleep_time = 10000; // 10 ms.
1210    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1211    while (fstatus == FILE_COMPACT_OLD) {
1212        filemgr_mutex_unlock(handle_in->file);
1213        decaying_usleep(&sleep_time, 1000000);
1214        filemgr_mutex_lock(handle_in->file);
1215        fstatus = filemgr_get_file_status(handle_in->file);
1216    }
1217    if (fstatus == FILE_REMOVED_PENDING) {
1218        filemgr_mutex_unlock(handle_in->file);
1219        fdb_check_file_reopen(handle_in, NULL);
1220    } else {
1221        filemgr_mutex_unlock(handle_in->file);
1222    }
1223
1224    fdb_sync_db_header(handle_in);
1225
1226    // if the max sequence number seen by this handle is lower than the
1227    // requested snapshot marker, it means the snapshot is not yet visible
1228    // even via the current fdb_kvs_handle
1229    if (seqnum > handle_in->seqnum) {
1230        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1231        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1232        return FDB_RESULT_NO_DB_INSTANCE;
1233    }
1234
1235    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1236    if (!handle) { // LCOV_EXCL_START
1237        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1238        return FDB_RESULT_ALLOC_FAIL;
1239    } // LCOV_EXCL_STOP
1240
1241    atomic_init_uint8_t(&handle->handle_busy, 0);
1242    handle->log_callback = handle_in->log_callback;
1243    handle->fhandle = handle_in->fhandle;
1244    if (seqnum == 0) {
1245        fs = _fdb_reset(handle, handle_in);
1246    } else {
1247        handle->max_seqnum = seqnum;
1248        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1249                       &config);
1250    }
1251
1252    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1253    if (fs == FDB_RESULT_SUCCESS) {
1254        // rollback the file's sequence number
1255        filemgr_mutex_lock(handle_in->file);
1256        old_seqnum = filemgr_get_seqnum(handle_in->file);
1257        filemgr_set_seqnum(handle_in->file, seqnum);
1258        filemgr_mutex_unlock(handle_in->file);
1259
1260        fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1261                !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1262        if (fs == FDB_RESULT_SUCCESS) {
1263            if (handle_in->txn) {
1264                handle->txn = handle_in->txn;
1265                handle_in->txn = NULL;
1266            }
1267            handle_in->fhandle->root = handle;
1268            _fdb_close_root(handle_in);
1269            handle->max_seqnum = 0;
1270            handle->seqnum = seqnum;
1271            *handle_ptr = handle;
1272        } else {
1273            // cancel the rolling-back of the sequence number
1274            filemgr_mutex_lock(handle_in->file);
1275            filemgr_set_seqnum(handle_in->file, old_seqnum);
1276            filemgr_mutex_unlock(handle_in->file);
1277            free(handle);
1278            atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1279        }
1280    } else {
1281        free(handle);
1282        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1283    }
1284
1285    return fs;
1286}
1287
1288LIBFDB_API
1289fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1290                            fdb_snapshot_marker_t marker)
1291{
1292#ifdef _MEMPOOL
1293    mempool_init();
1294#endif
1295
1296    fdb_config config;
1297    fdb_kvs_handle *super_handle;
1298    fdb_kvs_handle rhandle;
1299    fdb_kvs_handle *handle = &rhandle;
1300    struct filemgr *file;
1301    fdb_kvs_config kvs_config;
1302    fdb_status fs;
1303    err_log_callback log_callback;
1304    struct kvs_info *kvs;
1305    struct snap_handle shandle; // dummy snap handle
1306
1307    if (!fhandle) {
1308        return FDB_RESULT_INVALID_ARGS;
1309    }
1310
1311    super_handle = fhandle->root;
1312    kvs = super_handle->kvs;
1313
1314    // fdb_rollback_all cannot be allowed when there are kv store instances
1315    // still open, because we do not have means of invalidating open kv handles
1316    // which may not be present in the rollback point
1317    if (kvs && _fdb_kvs_is_busy(fhandle)) {
1318        return FDB_RESULT_KV_STORE_BUSY;
1319    }
1320    file = super_handle->file;
1321    config = super_handle->config;
1322    kvs_config = super_handle->kvs_config;
1323    log_callback = super_handle->log_callback;
1324
1325    if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1326        return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1327                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1328                       super_handle->file->filename);
1329    }
1330
1331    filemgr_mutex_lock(super_handle->file);
1332    filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1333    // All transactions should be closed before rollback
1334    if (wal_txn_exists(super_handle->file)) {
1335        filemgr_set_rollback(super_handle->file, 0);
1336        filemgr_mutex_unlock(super_handle->file);
1337        return FDB_RESULT_FAIL_BY_TRANSACTION;
1338    }
1339
1340    // If compaction is running, wait until it is aborted.
1341    // TODO: Find a better way of waiting for the compaction abortion.
1342    unsigned int sleep_time = 10000; // 10 ms.
1343    file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1344    while (fstatus == FILE_COMPACT_OLD) {
1345        filemgr_mutex_unlock(super_handle->file);
1346        decaying_usleep(&sleep_time, 1000000);
1347        filemgr_mutex_lock(super_handle->file);
1348        fstatus = filemgr_get_file_status(super_handle->file);
1349    }
1350    if (fstatus == FILE_REMOVED_PENDING) {
1351        filemgr_mutex_unlock(super_handle->file);
1352        fdb_check_file_reopen(super_handle, NULL);
1353    } else {
1354        filemgr_mutex_unlock(super_handle->file);
1355    }
1356
1357    fdb_sync_db_header(super_handle);
1358    // Shutdown WAL discarding entries from all KV Stores..
1359    fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1360    if (fs != FDB_RESULT_SUCCESS) {
1361        return fs;
1362    }
1363
1364    memset(handle, 0, sizeof(fdb_kvs_handle));
1365    memset(&shandle, 0, sizeof(struct snap_handle));
1366    handle->log_callback = log_callback;
1367    handle->fhandle = fhandle;
1368    handle->last_hdr_bid = (bid_t)marker; // Fast rewind on open
1369    handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1370    handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1371    if (kvs) {
1372        fdb_kvs_header_free(file); // KV header will be recreated below.
1373        handle->kvs = kvs; // re-use super_handle's kvs info
1374        handle->kvs_config = kvs_config;
1375    }
1376    handle->config = config;
1377
1378    fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1379
1380    if (handle->config.multi_kv_instances) {
1381        filemgr_mutex_lock(handle->file);
1382        fdb_kvs_header_create(handle->file);
1383        fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1384                            handle->kv_info_offset,
1385                            handle->file->version, false);
1386        filemgr_mutex_unlock(handle->file);
1387    }
1388
1389    filemgr_set_rollback(file, 0); // allow mutations
1390    handle->shandle = NULL; // just a dummy handle never allocated
1391
1392    if (fs == FDB_RESULT_SUCCESS) {
1393        fdb_seqnum_t old_seqnum;
1394        // Restore WAL for all KV instances...
1395        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1396
1397        // rollback the file's sequence number
1398        filemgr_mutex_lock(file);
1399        old_seqnum = filemgr_get_seqnum(file);
1400        filemgr_set_seqnum(file, handle->seqnum);
1401        filemgr_mutex_unlock(file);
1402
1403        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1404                         !(handle->config.durability_opt & FDB_DRB_ASYNC));
1405        if (fs == FDB_RESULT_SUCCESS) {
1406            _fdb_close(super_handle);
1407            *super_handle = *handle;
1408        } else {
1409            filemgr_mutex_lock(file);
1410            filemgr_set_seqnum(file, old_seqnum);
1411            filemgr_mutex_unlock(file);
1412        }
1413    } else { // Rollback failed, restore KV header
1414        fdb_kvs_header_create(file);
1415        fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1416                            super_handle->kv_info_offset,
1417                            ver_get_latest_magic(),
1418                            false);
1419    }
1420
1421    return fs;
1422}
1423
1424static void _fdb_init_file_config(const fdb_config *config,
1425                                  struct filemgr_config *fconfig) {
1426    fconfig->blocksize = config->blocksize;
1427    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1428    fconfig->chunksize = config->chunksize;
1429
1430    fconfig->options = 0x0;
1431    fconfig->seqtree_opt = config->seqtree_opt;
1432
1433    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1434        fconfig->options |= FILEMGR_CREATE;
1435    }
1436    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1437        fconfig->options |= FILEMGR_READONLY;
1438    }
1439    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1440        fconfig->options |= FILEMGR_SYNC;
1441    }
1442
1443    fconfig->flag = 0x0;
1444    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1445        config->buffercache_size) {
1446        fconfig->flag |= _ARCH_O_DIRECT;
1447    }
1448
1449    fconfig->prefetch_duration = config->prefetch_duration;
1450    fconfig->num_wal_shards = config->num_wal_partitions;
1451    fconfig->num_bcache_shards = config->num_bcache_partitions;
1452    fconfig->encryption_key = config->encryption_key;
1453    atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1454                          config->block_reusing_threshold,
1455                          std::memory_order_relaxed);
1456    atomic_store_uint64_t(&fconfig->num_keeping_headers,
1457                          config->num_keeping_headers,
1458                          std::memory_order_relaxed);
1459}
1460
1461fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1462                               fdb_kvs_handle *handle_out)
1463{
1464    fdb_status status;
1465
1466    handle_out->config = handle_in->config;
1467    handle_out->kvs_config = handle_in->kvs_config;
1468    handle_out->fileops = handle_in->fileops;
1469    handle_out->file = handle_in->file;
1470    // Note that the file ref count will be decremented when the cloned snapshot
1471    // is closed through filemgr_close().
1472    filemgr_incr_ref_count(handle_out->file);
1473
1474    if (handle_out->filename) {
1475        handle_out->filename = (char *)realloc(handle_out->filename,
1476                                               strlen(handle_in->filename)+1);
1477    } else {
1478        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1479    }
1480    strcpy(handle_out->filename, handle_in->filename);
1481
1482    // initialize the docio handle.
1483    handle_out->dhandle = (struct docio_handle *)
1484        calloc(1, sizeof(struct docio_handle));
1485    handle_out->dhandle->log_callback = &handle_out->log_callback;
1486    docio_init(handle_out->dhandle, handle_out->file,
1487               handle_out->config.compress_document_body);
1488
1489    // initialize the btree block handle.
1490    handle_out->btreeblkops = btreeblk_get_ops();
1491    handle_out->bhandle = (struct btreeblk_handle *)
1492        calloc(1, sizeof(struct btreeblk_handle));
1493    handle_out->bhandle->log_callback = &handle_out->log_callback;
1494    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1495
1496    handle_out->dirty_updates = handle_in->dirty_updates;
1497    atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1498    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1499    handle_out->kv_info_offset = handle_in->kv_info_offset;
1500    handle_out->op_stats = handle_in->op_stats;
1501
1502    // initialize the trie handle
1503    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1504    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1505                handle_out->file->blocksize,
1506                handle_in->trie->root_bid, // Source snapshot's trie root bid
1507                (void *)handle_out->bhandle, handle_out->btreeblkops,
1508                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1509    // set aux for cmp wrapping function
1510    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1511    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1512
1513    if (handle_out->kvs) {
1514        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1515    }
1516
1517    handle_out->seqnum = handle_in->seqnum;
1518    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1519        if (handle_out->config.multi_kv_instances) {
1520            // multi KV instance mode .. HB+trie
1521            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1522            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1523                        handle_out->file->blocksize,
1524                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1525                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1526                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1527
1528        } else {
1529            // single KV instance mode .. normal B+tree
1530            struct btree_kv_ops *seq_kv_ops =
1531                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1532            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1533            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1534
1535            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1536            // Init the seq tree using the root bid of the source snapshot.
1537            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1538                                handle_out->btreeblkops, seq_kv_ops,
1539                                handle_out->config.blocksize,
1540                                handle_in->seqtree->root_bid);
1541        }
1542    } else{
1543        handle_out->seqtree = NULL;
1544    }
1545
1546    status = btreeblk_end(handle_out->bhandle);
1547    if (status != FDB_RESULT_SUCCESS) {
1548        const char *msg = "Snapshot clone operation fails due to the errors in "
1549            "btreeblk_end() in a database file '%s'\n";
1550        fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1551    }
1552
1553    return status;
1554}
1555
1556fdb_status _fdb_open(fdb_kvs_handle *handle,
1557                     const char *filename,
1558                     fdb_filename_mode_t filename_mode,
1559                     const fdb_config *config)
1560{
1561    struct filemgr_config fconfig;
1562    struct kvs_stat stat, empty_stat;
1563    bid_t trie_root_bid = BLK_NOT_FOUND;
1564    bid_t seq_root_bid = BLK_NOT_FOUND;
1565    bid_t stale_root_bid = BLK_NOT_FOUND;
1566    fdb_seqnum_t seqnum = 0;
1567    filemgr_header_revnum_t header_revnum = 0;
1568    filemgr_header_revnum_t latest_header_revnum = 0;
1569    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1570    uint64_t ndocs = 0;
1571    uint64_t ndeletes = 0;
1572    uint64_t datasize = 0;
1573    uint64_t deltasize = 0;
1574    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1575    uint64_t kv_info_offset = BLK_NOT_FOUND;
1576    uint64_t version;
1577    uint64_t header_flags = 0;
1578    uint8_t header_buf[FDB_BLOCKSIZE];
1579    char *compacted_filename = NULL;
1580    char *prev_filename = NULL;
1581    size_t header_len = 0;
1582    bool multi_kv_instances = config->multi_kv_instances;
1583
1584    uint64_t nlivenodes = 0;
1585    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1586    char actual_filename[FDB_MAX_FILENAME_LEN];
1587    char virtual_filename[FDB_MAX_FILENAME_LEN];
1588    char *target_filename = NULL;
1589    fdb_status status;
1590
1591    if (filename == NULL) {
1592        return FDB_RESULT_INVALID_ARGS;
1593    }
1594    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1595        // filename (including path) length is supported up to
1596        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1597        return FDB_RESULT_TOO_LONG_FILENAME;
1598    }
1599
1600    if (filename_mode == FDB_VFILENAME &&
1601        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1602        return FDB_RESULT_INVALID_COMPACTION_MODE;
1603    }
1604
1605    _fdb_init_file_config(config, &fconfig);
1606
1607    if (filename_mode == FDB_VFILENAME) {
1608        compactor_get_actual_filename(filename, actual_filename,
1609                                      config->compaction_mode, &handle->log_callback);
1610    } else {
1611        strcpy(actual_filename, filename);
1612    }
1613
1614    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1615         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1616          filename_mode == FDB_VFILENAME) ) {
1617        // 1) manual compaction mode, OR
1618        // 2) auto compaction mode + 'filename' is virtual filename
1619        // -> copy 'filename'
1620        target_filename = (char *)filename;
1621    } else {
1622        // otherwise (auto compaction mode + 'filename' is actual filename)
1623        // -> copy 'virtual_filename'
1624        compactor_get_virtual_filename(filename, virtual_filename);
1625        target_filename = virtual_filename;
1626    }
1627
1628    // If the user is requesting legacy CRC pass that down to filemgr
1629    if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1630        fconfig.options |= FILEMGR_CREATE_CRC32;
1631    }
1632
1633    handle->fileops = get_filemgr_ops();
1634    filemgr_open_result result = filemgr_open((char *)actual_filename,
1635                                              handle->fileops,
1636                                              &fconfig, &handle->log_callback);
1637    if (result.rv != FDB_RESULT_SUCCESS) {
1638        return (fdb_status) result.rv;
1639    }
1640    handle->file = result.file;
1641
1642    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1643        strcmp(filename, actual_filename)) {
1644        // It is in-place compacted file if
1645        // 1) compaction mode is manual, and
1646        // 2) actual filename is different to the filename given by user.
1647        // In this case, set the in-place compaction flag.
1648        filemgr_set_in_place_compaction(handle->file, true);
1649    }
1650    if (filemgr_is_in_place_compaction_set(handle->file)) {
1651        // This file was in-place compacted.
1652        // set 'handle->filename' to the original filename to trigger file renaming
1653        compactor_get_virtual_filename(filename, virtual_filename);
1654        target_filename = virtual_filename;
1655    }
1656
1657    if (handle->filename) {
1658        handle->filename = (char *)realloc(handle->filename,
1659                                           strlen(target_filename)+1);
1660    } else {
1661        handle->filename = (char*)malloc(strlen(target_filename)+1);
1662    }
1663    strcpy(handle->filename, target_filename);
1664
1665    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1666    // set handle->last_hdr_bid to the block id of required header, so rewind..
1667    if (handle->shandle && handle->last_hdr_bid) {
1668        status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1669                                      header_buf, &header_len, &seqnum,
1670                                      &latest_header_revnum, &deltasize, &version,
1671                                      NULL, &handle->log_callback);
1672        if (status != FDB_RESULT_SUCCESS) {
1673            free(handle->filename);
1674            handle->filename = NULL;
1675            filemgr_close(handle->file, false, handle->filename,
1676                              &handle->log_callback);
1677            return status;
1678        }
1679    } else { // Normal open
1680        filemgr_get_header(handle->file, header_buf, &header_len,
1681                           &handle->last_hdr_bid, &seqnum, &latest_header_revnum);
1682        version = handle->file->version;
1683    }
1684
1685    // initialize the docio handle so kv headers may be read
1686    handle->dhandle = (struct docio_handle *)
1687                      calloc(1, sizeof(struct docio_handle));
1688    handle->dhandle->log_callback = &handle->log_callback;
1689    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1690
1691    // fetch previous superblock bitmap info if exists
1692    // (this should be done after 'handle->dhandle' is initialized)
1693    if (handle->file->sb) {
1694        status = sb_bmp_fetch_doc(handle);
1695        if (status != FDB_RESULT_SUCCESS) {
1696            docio_free(handle->dhandle);
1697            free(handle->dhandle);
1698            free(handle->filename);
1699            handle->filename = NULL;
1700            filemgr_close(handle->file, false, handle->filename,
1701                              &handle->log_callback);
1702            return status;
1703        }
1704    }
1705
1706
1707    if (header_len > 0) {
1708        fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1709                         &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1710                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1711                         &header_flags, &compacted_filename, &prev_filename);
1712        // use existing setting for seqtree_opt
1713        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1714            seqtree_opt = FDB_SEQTREE_USE;
1715        } else {
1716            seqtree_opt = FDB_SEQTREE_NOT_USE;
1717        }
1718        // Retrieve seqnum for multi-kv mode
1719        if (handle->kvs && handle->kvs->id > 0) {
1720            if (kv_info_offset != BLK_NOT_FOUND) {
1721                if (!filemgr_get_kv_header(handle->file)) {
1722                    struct kvs_header *kv_header;
1723                    _fdb_kvs_header_create(&kv_header);
1724                    // KV header already exists but not loaded .. read & import
1725                    fdb_kvs_header_read(kv_header, handle->dhandle,
1726                                        kv_info_offset, version, false);
1727                    if (!filemgr_set_kv_header(handle->file, kv_header,
1728                                               fdb_kvs_header_free)) {
1729                        _fdb_kvs_header_free(kv_header);
1730                    }
1731                }
1732                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1733                                             handle->kvs->id);
1734            } else { // no kv_info offset, ok to set seqnum to zero
1735                seqnum = 0;
1736            }
1737        }
1738        // other flags
1739        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1740            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1741        }
1742        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1743            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1744        }
1745        // use existing setting for multi KV instance mode
1746        if (kv_info_offset == BLK_NOT_FOUND) {
1747            multi_kv_instances = false;
1748        } else {
1749            multi_kv_instances = true;
1750        }
1751    }
1752
1753    handle->config = *config;
1754    handle->config.seqtree_opt = seqtree_opt;
1755    handle->config.multi_kv_instances = multi_kv_instances;
1756
1757    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1758        // Either an in-memory snapshot or cloning from an existing snapshot..
1759        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1760                     // *_open() should have already restored it
1761    } else { // Persisted snapshot or file rollback..
1762
1763        // get the BID of the latest block
1764        // (it is OK if the block is not a DB header)
1765        bool dirty_data_exists = false;
1766        struct superblock *sb = handle->file->sb;
1767
1768        if (sb_bmp_exists(sb)) {
1769            dirty_data_exists = false;
1770            bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1771            if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1772                // add 1 since we subtract 1 from 'hdr_bid' below soon
1773                hdr_bid = sb_last_hdr_bid + 1;
1774                if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1775                    // seq number has been increased since the last commit
1776                    seqnum = fdb_kvs_get_committed_seqnum(handle);
1777                }
1778            } else {
1779                hdr_bid = BLK_NOT_FOUND;
1780            }
1781        } else {
1782            hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1783            dirty_data_exists = (hdr_bid > handle->last_hdr_bid);
1784        }
1785
1786        if (hdr_bid == BLK_NOT_FOUND ||
1787            (sb && hdr_bid <= sb->config->num_sb)) {
1788            hdr_bid = 0;
1789        } else if (hdr_bid > 0) {
1790            --hdr_bid;
1791        }
1792
1793        if (handle->max_seqnum) {
1794            struct kvs_stat stat_ori;
1795            // backup original stats
1796            if (handle->kvs) {
1797                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1798            } else {
1799                _kvs_stat_get(handle->file, 0, &stat_ori);
1800            }
1801
1802            if (dirty_data_exists){
1803                // uncommitted data exists beyond the last DB header
1804                // get the last committed seq number
1805                fdb_seqnum_t seq_commit;
1806                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1807                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1808                    // In case, snapshot_open is attempted with latest uncommitted
1809                    // sequence number
1810                    header_len = 0;
1811                } else if (seq_commit == handle->max_seqnum) {
1812                    // snapshot/rollback on the latest commit header
1813                    seqnum = seq_commit; // skip file reverse scan
1814                }
1815                hdr_bid = filemgr_get_header_bid(handle->file);
1816            }
1817            // Reverse scan the file to locate the DB header with seqnum marker
1818            while (header_len && seqnum != handle->max_seqnum) {
1819                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1820                                          header_buf, &header_len, &seqnum,
1821                                          &header_revnum, NULL, &version, NULL,
1822                                          &handle->log_callback);
1823                if (header_len == 0) {
1824                    continue; // header doesn't exist
1825                }
1826                fdb_fetch_header(version, header_buf, &trie_root_bid,
1827                                 &seq_root_bid, &stale_root_bid,
1828                                 &ndocs, &ndeletes, &nlivenodes,
1829                                 &datasize, &last_wal_flush_hdr_bid,
1830                                 &kv_info_offset, &header_flags,
1831                                 &compacted_filename, NULL);
1832                handle->last_hdr_bid = hdr_bid;
1833
1834                if (!handle->kvs || handle->kvs->id == 0) {
1835                    // single KVS mode OR default KVS
1836                    if (!handle->shandle) {
1837                        // rollback
1838                        struct kvs_stat stat_dst;
1839                        _kvs_stat_get(handle->file, 0, &stat_dst);
1840                        stat_dst.ndocs = ndocs;
1841                        stat_dst.ndeletes = ndeletes;
1842                        stat_dst.datasize = datasize;
1843                        stat_dst.nlivenodes = nlivenodes;
1844                        stat_dst.deltasize = deltasize;
1845                        _kvs_stat_set(handle->file, 0, stat_dst);
1846                    }
1847                    continue;
1848                }
1849
1850                int64_t doc_offset;
1851                struct kvs_header *kv_header;
1852                struct docio_object doc;
1853
1854                _fdb_kvs_header_create(&kv_header);
1855                memset(&doc, 0, sizeof(struct docio_object));
1856                doc_offset = docio_read_doc(handle->dhandle,
1857                                            kv_info_offset, &doc, true);
1858
1859                if (doc_offset <= 0) {
1860                    header_len = 0; // fail
1861                    _fdb_kvs_header_free(kv_header);
1862                } else {
1863                    _fdb_kvs_header_import(kv_header, doc.body,
1864                                           doc.length.bodylen, version, false);
1865                    // get local sequence number for the KV instance
1866                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1867                                                 handle->kvs->id);
1868                    if (!handle->shandle) {
1869                        // rollback: replace kv_header stats
1870                        // read from the current header's kv_header
1871                        struct kvs_stat stat_src, stat_dst;
1872                        _kvs_stat_get_kv_header(kv_header,
1873                                                handle->kvs->id,
1874                                                &stat_src);
1875                        _kvs_stat_get(handle->file,
1876                                      handle->kvs->id,
1877                                      &stat_dst);
1878                        // update ndocs, datasize, nlivenodes
1879                        // into the current file's kv_header
1880                        // Note: stats related to WAL should not be updated
1881                        //       at this time. They will be adjusted through
1882                        //       discard & restore routines below.
1883                        stat_dst.ndocs = stat_src.ndocs;
1884                        stat_dst.datasize = stat_src.datasize;
1885                        stat_dst.nlivenodes = stat_src.nlivenodes;
1886                        _kvs_stat_set(handle->file,
1887                                      handle->kvs->id,
1888                                      stat_dst);
1889                    }
1890                    _fdb_kvs_header_free(kv_header);
1891                    free_docio_object(&doc, 1, 1, 1);
1892                }
1893            }
1894            if (!header_len) { // Marker MUST match that of DB commit!
1895                // rollback original stats
1896                if (handle->kvs) {
1897                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1898                } else {
1899                    _kvs_stat_get(handle->file, 0, &stat_ori);
1900                }
1901
1902                docio_free(handle->dhandle);
1903                free(handle->dhandle);
1904                free(handle->filename);
1905                free(prev_filename);
1906                handle->filename = NULL;
1907                filemgr_close(handle->file, false, handle->filename,
1908                              &handle->log_callback);
1909                return FDB_RESULT_NO_DB_INSTANCE;
1910            }
1911
1912            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1913                if (handle->config.multi_kv_instances) {
1914                    // multi KV instance mode
1915                    // clear only WAL items belonging to the instance
1916                    wal_close_kv_ins(handle->file,
1917                                     (handle->kvs)?(handle->kvs->id):(0),
1918                                     &handle->log_callback);
1919                } else {
1920                    wal_shutdown(handle->file, &handle->log_callback);
1921                }
1922            }
1923        } else { // snapshot to sequence number 0 requested..
1924            if (handle->shandle) { // fdb_snapshot_open API call
1925                if (seqnum) {
1926                    // Database currently has a non-zero seq number,
1927                    // but the snapshot was requested with a seq number zero.
1928                    docio_free(handle->dhandle);
1929                    free(handle->dhandle);
1930                    free(handle->filename);
1931                    free(prev_filename);
1932                    handle->filename = NULL;
1933                    filemgr_close(handle->file, false, handle->filename,
1934                                  &handle->log_callback);
1935                    return FDB_RESULT_NO_DB_INSTANCE;
1936                }
1937            } // end of zero max_seqnum but non-rollback check
1938        } // end of zero max_seqnum check
1939    } // end of durable snapshot locating
1940
1941    handle->btreeblkops = btreeblk_get_ops();
1942    handle->bhandle = (struct btreeblk_handle *)
1943                      calloc(1, sizeof(struct btreeblk_handle));
1944    handle->bhandle->log_callback = &handle->log_callback;
1945
1946    handle->dirty_updates = 0;
1947
1948    if (handle->config.compaction_buf_maxsize == 0) {
1949        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1950    }
1951
1952    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1953
1954    if (header_revnum && !filemgr_is_rollback_on(handle->file)) {
1955        // only for snapshot (excluding rollback)
1956        handle->cur_header_revnum = header_revnum;
1957    } else {
1958        handle->cur_header_revnum = latest_header_revnum;
1959    }
1960    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1961
1962    memset(&empty_stat, 0x0, sizeof(empty_stat));
1963    _kvs_stat_get(handle->file, 0, &stat);
1964    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1965        // sync (default) KVS stat with DB header
1966        stat.nlivenodes = nlivenodes;
1967        stat.ndocs = ndocs;
1968        stat.datasize = datasize;
1969        _kvs_stat_set(handle->file, 0, stat);
1970    }
1971
1972    handle->kv_info_offset = kv_info_offset;
1973    if (handle->config.multi_kv_instances && !handle->shandle) {
1974        // multi KV instance mode
1975        filemgr_mutex_lock(handle->file);
1976        if (kv_info_offset == BLK_NOT_FOUND) {
1977            // there is no KV header .. create & initialize
1978            fdb_kvs_header_create(handle->file);
1979            // TODO: If another handle is opened before the first header is appended,
1980            // an unnecessary KV info doc is appended. We need to address it.
1981            kv_info_offset = fdb_kvs_header_append(handle);
1982        } else if (handle->file->kv_header == NULL) {
1983            // KV header already exists but not loaded .. read & import
1984            fdb_kvs_header_create(handle->file);
1985            fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1986                                kv_info_offset, version, false);
1987        }
1988        filemgr_mutex_unlock(handle->file);
1989
1990        // validation check for key order of all KV stores
1991        if (handle == handle->fhandle->root) {
1992            fdb_status fs = fdb_kvs_cmp_check(handle);
1993            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1994                docio_free(handle->dhandle);
1995                free(handle->dhandle);
1996                btreeblk_free(handle->bhandle);
1997                free(handle->bhandle);
1998                free(handle->filename);
1999                handle->filename = NULL;
2000                filemgr_close(handle->file, false, handle->filename,
2001                              &handle->log_callback);
2002                return fs;
2003            }
2004        }
2005    }
2006    handle->kv_info_offset = kv_info_offset;
2007
2008    if (handle->kv_info_offset != BLK_NOT_FOUND &&
2009        handle->kvs == NULL) {
2010        // multi KV instance mode .. turn on config flag
2011        handle->config.multi_kv_instances = true;
2012        // only super handle can be opened using fdb_open(...)
2013        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2014    }
2015
2016    if (handle->shandle) { // Populate snapshot stats..
2017        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2018            memset(&handle->shandle->stat, 0x0,
2019                    sizeof(handle->shandle->stat));
2020            handle->shandle->stat.ndocs = ndocs;
2021            handle->shandle->stat.datasize = datasize;
2022            handle->shandle->stat.nlivenodes = nlivenodes;
2023        } else { // Multi KV instance mode, populate specific kv stats
2024            memset(&handle->shandle->stat, 0x0,
2025                    sizeof(handle->shandle->stat));
2026            _kvs_stat_get(handle->file, handle->kvs->id,
2027                    &handle->shandle->stat);
2028            // Since wal is restored below, we have to reset
2029            // wal stats to zero.
2030            handle->shandle->stat.wal_ndeletes = 0;
2031            handle->shandle->stat.wal_ndocs = 0;
2032        }
2033    }
2034
2035    // initialize pointer to the global operational stats of this KV store
2036    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2037    if (!handle->op_stats) {
2038        const char *msg = "Database open fails due to the error in retrieving "
2039            "the global operational stats of KV store in a database file '%s'\n";
2040        fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2041                handle->file->filename);
2042        return FDB_RESULT_OPEN_FAIL;
2043    }
2044
2045    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2046    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2047                handle->file->blocksize, trie_root_bid,
2048                (void *)handle->bhandle, handle->btreeblkops,
2049                (void *)handle->dhandle, _fdb_readkey_wrap);
2050    // set aux for cmp wrapping function
2051    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2052    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2053
2054    if (handle->kvs) {
2055        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2056    }
2057
2058    handle->seqnum = seqnum;
2059    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2060        if (handle->config.multi_kv_instances) {
2061            // multi KV instance mode .. HB+trie
2062            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2063            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2064                        handle->file->blocksize, seq_root_bid,
2065                        (void *)handle->bhandle, handle->btreeblkops,
2066                        (void *)handle->dhandle, _fdb_readseq_wrap);
2067
2068        } else {
2069            // single KV instance mode .. normal B+tree
2070            struct btree_kv_ops *seq_kv_ops =
2071                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2072            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2073            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2074
2075            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2076            if (seq_root_bid == BLK_NOT_FOUND) {
2077                btree_init(handle->seqtree, (void *)handle->bhandle,
2078                           handle->btreeblkops, seq_kv_ops,
2079                           handle->config.blocksize, sizeof(fdb_seqnum_t),
2080                           OFFSET_SIZE, 0x0, NULL);
2081             }else{
2082                 btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2083                                     handle->btreeblkops, seq_kv_ops,
2084                                     handle->config.blocksize, seq_root_bid);
2085             }
2086        }
2087    }else{
2088        handle->seqtree = NULL;
2089    }
2090
2091    // Stale-block tree (supported since MAGIC_002)
2092    // this tree is independent to multi/single KVS mode option
2093    if (ver_staletree_support(handle->file->version)) {
2094        // normal B+tree
2095        struct btree_kv_ops *stale_kv_ops =
2096            (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2097        stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2098        stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2099
2100        handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2101        if (stale_root_bid == BLK_NOT_FOUND) {
2102            btree_init(handle->staletree, (void *)handle->bhandle,
2103                       handle->btreeblkops, stale_kv_ops,
2104                       handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2105                       OFFSET_SIZE, 0x0, NULL);
2106         }else{
2107            btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2108                                handle->btreeblkops, stale_kv_ops,
2109                                handle->config.blocksize, stale_root_bid);
2110         }
2111    } else {
2112        handle->staletree = NULL;
2113    }
2114
2115    if (handle->config.multi_kv_instances && handle->max_seqnum) {
2116        // restore only docs belonging to the KV instance
2117        // handle->kvs should not be NULL
2118        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2119                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2120    } else {
2121        // normal restore
2122        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2123    }
2124
2125    if (compacted_filename &&
2126        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2127        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2128        _fdb_recover_compaction(handle, compacted_filename);
2129    }
2130
2131    if (prev_filename) {
2132        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2133            // record the old filename into the file handle of current file
2134            // and REMOVE old file on the first open
2135            // WARNING: snapshots must have been opened before this call
2136            if (filemgr_update_file_status(handle->file,
2137                                           filemgr_get_file_status(handle->file),
2138                                           prev_filename)) {
2139                // Open the old file with read-only mode.
2140                // (Temporarily disable log callback at this time since
2141                //  the old file might be already removed.)
2142                fconfig.options = FILEMGR_READONLY;
2143                filemgr_open_result result = filemgr_open(prev_filename,
2144                                                          handle->fileops,
2145                                                          &fconfig,
2146                                                          NULL);
2147                if (result.file) {
2148                    filemgr_remove_pending(result.file, handle->file,
2149                                           &handle->log_callback);
2150                    filemgr_close(result.file, 0, handle->filename,
2151                                  &handle->log_callback);
2152                }
2153            } else {
2154                free(prev_filename);
2155            }
2156        } else {
2157            free(prev_filename);
2158        }
2159    }
2160
2161    status = btreeblk_end(handle->bhandle);
2162    if (status != FDB_RESULT_SUCCESS) {
2163        return status;
2164    }
2165
2166    // do not register read-only handles
2167    if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2168        if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2169            status = compactor_register_file(handle->file,
2170                                             (fdb_config *)config,
2171                                             &handle->log_callback);
2172        }
2173        if (status == FDB_RESULT_SUCCESS) {
2174            status = bgflusher_register_file(handle->file,
2175                                             (fdb_config *)config,
2176                                             &handle->log_callback);
2177        }
2178    }
2179
2180    return status;
2181}
2182
2183LIBFDB_API
2184fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2185                                fdb_log_callback log_callback,
2186                                void *ctx_data)
2187{
2188    handle->log_callback.callback = log_callback;
2189    handle->log_callback.ctx_data = ctx_data;
2190    return FDB_RESULT_SUCCESS;
2191}
2192
2193LIBFDB_API
2194void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
2195{
2196    fatal_error_callback = err_callback;
2197}
2198
2199LIBFDB_API
2200fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2201                          const void *meta, size_t metalen,
2202                          const void *body, size_t bodylen)
2203{
2204    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2205        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2206        return FDB_RESULT_INVALID_ARGS;
2207    }
2208
2209    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2210    if (*doc == NULL) { // LCOV_EXCL_START
2211        return FDB_RESULT_ALLOC_FAIL;
2212    } // LCOV_EXCL_STOP
2213
2214    (*doc)->seqnum = SEQNUM_NOT_USED;
2215
2216    if (key && keylen > 0) {
2217        (*doc)->key = (void *)malloc(keylen);
2218        if ((*doc)->key == NULL) { // LCOV_EXCL_START
2219            return FDB_RESULT_ALLOC_FAIL;
2220        } // LCOV_EXCL_STOP
2221        memcpy((*doc)->key, key, keylen);
2222        (*doc)->keylen = keylen;
2223    } else {
2224        (*doc)->key = NULL;
2225        (*doc)->keylen = 0;
2226    }
2227
2228    if (meta && metalen > 0) {
2229        (*doc)->meta = (void *)malloc(metalen);
2230        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2231            return FDB_RESULT_ALLOC_FAIL;
2232        } // LCOV_EXCL_STOP
2233        memcpy((*doc)->meta, meta, metalen);
2234        (*doc)->metalen = metalen;
2235    } else {
2236        (*doc)->meta = NULL;
2237        (*doc)->metalen = 0;
2238    }
2239
2240    if (body && bodylen > 0) {
2241        (*doc)->body = (void *)malloc(bodylen);
2242        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2243            return FDB_RESULT_ALLOC_FAIL;
2244        } // LCOV_EXCL_STOP
2245        memcpy((*doc)->body, body, bodylen);
2246        (*doc)->bodylen = bodylen;
2247    } else {
2248        (*doc)->body = NULL;
2249        (*doc)->bodylen = 0;
2250    }
2251
2252    return FDB_RESULT_SUCCESS;
2253}
2254
2255LIBFDB_API
2256fdb_status fdb_doc_update(fdb_doc **doc,
2257                          const void *meta, size_t metalen,
2258                          const void *body, size_t bodylen)
2259{
2260    if (doc == NULL ||
2261        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2262        return FDB_RESULT_INVALID_ARGS;
2263    }
2264    if (*doc == NULL) {
2265        return FDB_RESULT_INVALID_ARGS;
2266    }
2267
2268    if (meta && metalen > 0) {
2269        // free previous metadata
2270        free((*doc)->meta);
2271        // allocate new metadata
2272        (*doc)->meta = (void *)malloc(metalen);
2273        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2274            return FDB_RESULT_ALLOC_FAIL;
2275        } // LCOV_EXCL_STOP
2276        memcpy((*doc)->meta, meta, metalen);
2277        (*doc)->metalen = metalen;
2278    }
2279
2280    if (body && bodylen > 0) {
2281        // free previous body
2282        free((*doc)->body);
2283        // allocate new body
2284        (*doc)->body = (void *)malloc(bodylen);
2285        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2286            return FDB_RESULT_ALLOC_FAIL;
2287        } // LCOV_EXCL_STOP
2288        memcpy((*doc)->body, body, bodylen);
2289        (*doc)->bodylen = bodylen;
2290    }
2291
2292    (*doc)->seqnum = SEQNUM_NOT_USED;
2293    return FDB_RESULT_SUCCESS;
2294}
2295
2296LIBFDB_API
2297void fdb_doc_set_seqnum(fdb_doc *doc,
2298                        const fdb_seqnum_t seqnum)
2299{
2300    doc->seqnum = seqnum;
2301    if (seqnum != SEQNUM_NOT_USED) {
2302        doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2303    } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2304        doc->flags &= ~FDB_CUSTOM_SEQNUM;
2305    }
2306}
2307
2308// doc MUST BE allocated by malloc
2309LIBFDB_API
2310fdb_status fdb_doc_free(fdb_doc *doc)
2311{
2312    if (doc) {
2313        free(doc->key);
2314        free(doc->meta);
2315        free(doc->body);
2316        free(doc);
2317    }
2318    return FDB_RESULT_SUCCESS;
2319}
2320
2321INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
2322                                        struct wal_item *item)
2323{
2324    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2325    uint64_t old_offset = 0;
2326
2327    if (item->action == WAL_ACT_REMOVE) {
2328        // For immediate remove, old_offset value is critical
2329        // so that we should get an exact value.
2330        hbtrie_find(handle->trie,
2331                    item->header->key,
2332                    item->header->keylen,
2333                    (void*)&old_offset);
2334    } else {
2335        hbtrie_find_offset(handle->trie,
2336                           item->header->key,
2337                           item->header->keylen,
2338                           (void*)&old_offset);
2339    }
2340    btreeblk_end(handle->bhandle);
2341    old_offset = _endian_decode(old_offset);
2342
2343    return old_offset;
2344}
2345
2346// A stale sequence number entry that can be purged from the sequence tree
2347// during the WAL flush.
2348struct wal_stale_seq_entry {
2349    fdb_kvs_id_t kv_id;
2350    fdb_seqnum_t seqnum;
2351    struct avl_node avl_entry;
2352};
2353
2354// Delta changes in KV store stats during the WAL flush
2355struct wal_kvs_delta_stat {
2356    fdb_kvs_id_t kv_id;
2357    int64_t nlivenodes;
2358    int64_t ndocs;
2359    int64_t ndeletes;
2360    int64_t datasize;
2361    int64_t deltasize;
2362    struct avl_node avl_entry;
2363};
2364
2365INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2366{
2367    (void) aux;
2368    struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2369                                                    avl_entry);
2370    struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2371                                                    avl_entry);
2372    if (entry1->kv_id < entry2->kv_id) {
2373        return -1;
2374    } else if (entry1->kv_id > entry2->kv_id) {
2375        return 1;
2376    } else {
2377        return _CMP_U64(entry1->seqnum, entry2->seqnum);
2378    }
2379}
2380
2381
2382// Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
2383INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2384{
2385    (void) aux;
2386    struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2387                                                  avl_entry);
2388    struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2389                                                  avl_entry);
2390    if (stat1->kv_id < stat2->kv_id) {
2391        return -1;
2392    } else if (stat1->kv_id > stat2->kv_id) {
2393        return 1;
2394    } else {
2395        return 0;
2396    }
2397}
2398
2399INLINE void _fdb_wal_flush_seq_purge(void *dbhandle,
2400                                     struct avl_tree *stale_seqnum_list,
2401                                     struct avl_tree *kvs_delta_stats)
2402{
2403    fdb_seqnum_t _seqnum;
2404    int64_t nlivenodes;
2405    int64_t ndeltanodes;
2406    int64_t delta;
2407    uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2408    struct wal_stale_seq_entry *seq_entry;
2409    struct wal_kvs_delta_stat *delta_stat;
2410    struct wal_kvs_delta_stat kvs_delta_query;
2411
2412    fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2413    struct avl_node *node = avl_first(stale_seqnum_list);
2414    while (node) {
2415        seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2416        node = avl_next(node);
2417        nlivenodes = handle->bhandle->nlivenodes;
2418        ndeltanodes = handle->bhandle->ndeltanodes;
2419        _seqnum = _endian_encode(seq_entry->seqnum);
2420        if (handle->kvs) {
2421            // multi KV instance mode .. HB+trie
2422            kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2423            memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2424            hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2425                          sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t));
2426        } else {
2427            btree_remove(handle->seqtree, (void*)&_seqnum);
2428        }
2429        btreeblk_end(handle->bhandle);
2430
2431        kvs_delta_query.kv_id = seq_entry->kv_id;
2432        avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2433                                               &kvs_delta_query.avl_entry,
2434                                               _kvs_delta_stat_cmp);
2435        if (delta_stat_node) {
2436            delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2437                                    avl_entry);
2438            delta = handle->bhandle->nlivenodes - nlivenodes;
2439            delta_stat->nlivenodes += delta;
2440            delta = handle->bhandle->ndeltanodes - ndeltanodes;
2441            delta *= handle->config.blocksize;
2442            delta_stat->deltasize += delta;
2443        }
2444        avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2445        free(seq_entry);
2446    }
2447}
2448
2449INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2450                                           struct avl_tree *kvs_delta_stats)
2451{
2452    struct avl_node *node;
2453    struct wal_kvs_delta_stat *delta_stat;
2454    node = avl_first(kvs_delta_stats);
2455    while (node) {
2456        delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2457        node = avl_next(node);
2458        _kvs_stat_update_attr(file, delta_stat->kv_id,
2459                              KVS_STAT_DATASIZE, delta_stat->datasize);
2460        _kvs_stat_update_attr(file, delta_stat->kv_id,
2461                              KVS_STAT_NDOCS, delta_stat->ndocs);
2462        _kvs_stat_update_attr(file, delta_stat->kv_id,
2463                              KVS_STAT_NDELETES, delta_stat->ndeletes);
2464        _kvs_stat_update_attr(file, delta_stat->kv_id,
2465                              KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2466        _kvs_stat_update_attr(file, delta_stat->kv_id,
2467                              KVS_STAT_DELTASIZE, delta_stat->deltasize);
2468        avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2469        free(delta_stat);
2470    }
2471}
2472
2473INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2474                                      struct wal_item *item,
2475                                      struct avl_tree *stale_seqnum_list,
2476                                      struct avl_tree *kvs_delta_stats)
2477{
2478    hbtrie_result hr;
2479    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2480    fdb_seqnum_t _seqnum;
2481    fdb_kvs_id_t kv_id = 0;
2482    fdb_status fs = FDB_RESULT_SUCCESS;
2483    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2484    int size_id, size_seq;
2485    uint8_t *kvid_seqnum;
2486    uint64_t old_offset;
2487    int64_t _offset;
2488    int64_t delta;
2489    struct docio_object _doc;
2490    struct filemgr *file = handle->dhandle->file;
2491
2492    memset(var_key, 0, handle->config.chunksize);
2493    if (handle->kvs) {
2494        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2495    } else {
2496        kv_id = 0;
2497    }
2498
2499    struct wal_kvs_delta_stat *kvs_delta_stat;
2500    struct wal_kvs_delta_stat kvs_delta_query;
2501    kvs_delta_query.kv_id = kv_id;
2502    avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2503                                           &kvs_delta_query.avl_entry,
2504                                           _kvs_delta_stat_cmp);
2505    if (delta_stat_node) {
2506        kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2507                                    avl_entry);
2508    } else {
2509        kvs_delta_stat = (struct wal_kvs_delta_stat *)
2510            calloc(1, sizeof(struct wal_kvs_delta_stat));
2511        kvs_delta_stat->kv_id = kv_id;
2512        avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2513                   _kvs_delta_stat_cmp);
2514    }
2515
2516    int64_t nlivenodes = handle->bhandle->nlivenodes;
2517    int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2518
2519    if (item->action == WAL_ACT_INSERT ||
2520        item->action == WAL_ACT_LOGICAL_REMOVE) {
2521        _offset = _endian_encode(item->offset);
2522
2523        hbtrie_insert(handle->trie,
2524                      item->header->key,
2525                      item->header->keylen,
2526                      (void *)&_offset,
2527                      (void *)&old_offset);
2528
2529        fs = btreeblk_end(handle->bhandle);
2530        if (fs != FDB_RESULT_SUCCESS) {
2531            return fs;
2532        }
2533        old_offset = _endian_decode(old_offset);
2534
2535        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2536            _seqnum = _endian_encode(item->seqnum);
2537            if (handle->kvs) {
2538                // multi KV instance mode .. HB+trie
2539                uint64_t old_offset_local;
2540
2541                size_id = sizeof(fdb_kvs_id_t);
2542                size_seq = sizeof(fdb_seqnum_t);
2543                kvid_seqnum = alca(uint8_t, size_id + size_seq);
2544                kvid2buf(size_id, kv_id, kvid_seqnum);
2545                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2546                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2547                              (void *)&_offset, (void *)&old_offset_local);
2548            } else {
2549                btree_insert(handle->seqtree, (void *)&_seqnum,
2550                             (void *)&_offset);
2551            }
2552            fs = btreeblk_end(handle->bhandle);
2553            if (fs != FDB_RESULT_SUCCESS) {
2554                return fs;
2555            }
2556        }
2557
2558        delta = handle->bhandle->nlivenodes - nlivenodes;
2559        kvs_delta_stat->nlivenodes += delta;
2560        delta = handle->bhandle->ndeltanodes - ndeltanodes;
2561        delta *= handle->config.blocksize;
2562        kvs_delta_stat->deltasize += delta;
2563
2564        if (old_offset == BLK_NOT_FOUND) {
2565            if (item->action == WAL_ACT_INSERT) {
2566                ++kvs_delta_stat->ndocs;
2567            } else { // inserted a logical deleted doc into main index
2568                ++kvs_delta_stat->ndeletes;
2569            }
2570            kvs_delta_stat->datasize += item->doc_size;
2571            kvs_delta_stat->deltasize += item->doc_size;
2572        } else { // update or logical delete
2573            // This block is already cached when we call HBTRIE_INSERT.
2574            // No additional block access.
2575            char dummy_key[FDB_MAX_KEYLEN];
2576            _doc.meta = _doc.body = NULL;
2577            _doc.key = &dummy_key;
2578            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2579                                              &_doc, true);
2580            if (_offset < 0) {
2581                return (fdb_status) _offset;
2582            } else if (_offset == 0) {
2583                // Note that this is not an error as old_offset is pointing to
2584                // the zero-filled region in a document block.
2585                return FDB_RESULT_KEY_NOT_FOUND;
2586            }
2587            free(_doc.meta);
2588            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2589
2590            if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2591                if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2592                    --kvs_delta_stat->ndocs;
2593                    ++kvs_delta_stat->ndeletes;
2594                } // else no change (prev doc was insert, now just an update)
2595            } else { // prev doc in main index was a logically deleted doc
2596                if (item->action == WAL_ACT_INSERT) { // now undeleted
2597                    ++kvs_delta_stat->ndocs;
2598                    --kvs_delta_stat->ndeletes;
2599                } // else no change (prev doc was deleted, now re-deleted)
2600            }
2601
2602            delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2603            kvs_delta_stat->datasize += delta;
2604            if (handle->last_hdr_bid * handle->config.blocksize < old_offset) {
2605                kvs_delta_stat->deltasize += delta;
2606            } else {
2607                kvs_delta_stat->deltasize += (int)item->doc_size;
2608            }
2609
2610            // Avoid duplicates (remove previous sequence number)
2611            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2612                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2613                    calloc(1, sizeof(struct wal_stale_seq_entry));
2614                entry->kv_id = kv_id;
2615                entry->seqnum = _doc.seqnum;
2616                avl_insert(stale_seqnum_list, &entry->avl_entry,
2617                           _fdb_seq_entry_cmp);
2618            }
2619        }
2620    } else {
2621        // Immediate remove
2622        old_offset = item->old_offset;
2623        hr = hbtrie_remove(handle->trie, item->header->key,
2624                           item->header->keylen);
2625        fs = btreeblk_end(handle->bhandle);
2626        if (fs != FDB_RESULT_SUCCESS) {
2627            return fs;
2628        }
2629
2630        if (hr == HBTRIE_RESULT_SUCCESS) {
2631            // This block is already cached when we call _fdb_wal_get_old_offset
2632            // No additional block access should be done.
2633            char dummy_key[FDB_MAX_KEYLEN];
2634            _doc.meta = _doc.body = NULL;
2635            _doc.key = &dummy_key;
2636            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2637                                              &_doc, true);
2638            if (_offset < 0) {
2639                return (fdb_status) _offset;
2640            } else if (_offset == 0) {
2641                return FDB_RESULT_KEY_NOT_FOUND;
2642            }
2643            free(_doc.meta);
2644            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2645
2646            // Reduce the total number of docs by one
2647            --kvs_delta_stat->ndocs;
2648            if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2649                --kvs_delta_stat->ndeletes;
2650            }
2651
2652            // Reduce the total datasize by size of previously present doc
2653            delta = -(int)_fdb_get_docsize(_doc.length);
2654            kvs_delta_stat->datasize += delta;
2655            // if multiple wal flushes happen before commit, then it's possible
2656            // that this doc deleted was inserted & flushed after last commit
2657            // In this case we need to update the deltasize too which tracks
2658            // the amount of new data inserted between commits.
2659            if (handle->last_hdr_bid * handle->config.blocksize < old_offset) {
2660                kvs_delta_stat->deltasize += delta;
2661            }
2662
2663            // remove sequence number for the removed doc
2664            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2665                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2666                    calloc(1, sizeof(struct wal_stale_seq_entry));
2667                entry->kv_id = kv_id;
2668                entry->seqnum = _doc.seqnum;
2669                avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2670            }
2671
2672            // Update index size to new size after the remove operation
2673            delta = handle->bhandle->nlivenodes - nlivenodes;
2674            kvs_delta_stat->nlivenodes += delta;
2675
2676            // ndeltanodes measures number of new index nodes created due to
2677            // this hbtrie_remove() operation
2678            delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2679            delta *= handle->config.blocksize;
2680            kvs_delta_stat->deltasize += delta;
2681        }
2682    }
2683    return FDB_RESULT_SUCCESS;
2684}
2685
2686void fdb_sync_db_header(fdb_kvs_handle *handle)
2687{
2688    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2689    if (handle->cur_header_revnum != cur_revnum) {
2690        void *header_buf = NULL;
2691        size_t header_len;
2692        bid_t hdr_bid;
2693        filemgr_header_revnum_t revnum;
2694
2695        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2696                                        &hdr_bid, NULL, &revnum);
2697        if (header_len > 0) {
2698            uint64_t header_flags, dummy64, version;
2699            bid_t idtree_root;
2700            bid_t new_seq_root;
2701            bid_t new_stale_root;
2702            char *compacted_filename;
2703            char *prev_filename = NULL;
2704
2705            version = handle->file->version;
2706            handle->last_hdr_bid = hdr_bid;
2707            handle->cur_header_revnum = revnum;
2708
2709            fdb_fetch_header(version, header_buf, &idtree_root,
2710                             &new_seq_root, &new_stale_root, &dummy64,
2711                             &dummy64, &dummy64,
2712                             &dummy64, &handle->last_wal_flush_hdr_bid,
2713                             &handle->kv_info_offset, &header_flags,
2714                             &compacted_filename, &prev_filename);
2715
2716            if (handle->dirty_updates) {
2717                // discard all cached writable b+tree nodes
2718                // to avoid data inconsistency with other writers
2719                btreeblk_discard_blocks(handle->bhandle);
2720            }
2721
2722            handle->trie->root_bid = idtree_root;
2723
2724            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2725                if (new_seq_root != handle->seqtree->root_bid) {
2726                    if (handle->config.multi_kv_instances) {
2727                        handle->seqtrie->root_bid = new_seq_root;
2728                    } else {
2729                        btree_init_from_bid(handle->seqtree,
2730                                            handle->seqtree->blk_handle,
2731                                            handle->seqtree->blk_ops,
2732                                            handle->seqtree->kv_ops,
2733                                            handle->seqtree->blksize,
2734                                            new_seq_root);
2735                    }
2736                }
2737            }
2738
2739            if (ver_staletree_support(version)) {
2740                btree_init_from_bid(handle->staletree,
2741                                    handle->staletree->blk_handle,
2742                                    handle->staletree->blk_ops,
2743                                    handle->staletree->kv_ops,
2744                                    handle->staletree->blksize,
2745                                    new_stale_root);
2746            } else {
2747                handle->staletree = NULL;
2748            }
2749
2750            if (prev_filename) {
2751                free(prev_filename);
2752            }
2753
2754            handle->dirty_updates = 0;
2755            if (handle->kvs) {
2756                // multiple KV instance mode AND sub handle
2757                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2758                                                    handle->kvs->id);
2759            } else {
2760                // super handle OR single KV instance mode
2761                handle->seqnum = filemgr_get_seqnum(handle->file);
2762            }
2763        } else {
2764            handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
2765        }
2766
2767        if (header_buf) {
2768            free(header_buf);
2769        }
2770    }
2771}
2772
2773fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2774{
2775    bool fhandle_ret;
2776    fdb_status fs = FDB_RESULT_SUCCESS;
2777    file_status_t fstatus = filemgr_get_file_status(handle->file);
2778    // check whether the compaction is done
2779    if (fstatus == FILE_REMOVED_PENDING) {
2780        uint64_t ndocs, ndeletes, datasize, nlivenodes, last_wal_flush_hdr_bid;
2781        uint64_t kv_info_offset, header_flags;
2782        size_t header_len;
2783        char *new_filename;
2784        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2785        bid_t trie_root_bid, seq_root_bid, stale_root_bid;
2786        fdb_config config = handle->config;
2787
2788        // close the current file and newly open the new file
2789        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2790            // compaction daemon mode .. just close and then open
2791            char filename[FDB_MAX_FILENAME_LEN];
2792            strcpy(filename, handle->filename);
2793
2794            // We don't need to maintain fhandle list for the old file
2795            // as there will be no more mutation on the file.
2796            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2797            fs = _fdb_close(handle);
2798            if (fs != FDB_RESULT_SUCCESS) {
2799                if (fhandle_ret) {
2800                    filemgr_fhandle_add(handle->file, handle->fhandle);
2801                }
2802                return fs;
2803            }
2804
2805            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2806            if (fs != FDB_RESULT_SUCCESS) {
2807                return fs;
2808            }
2809            filemgr_fhandle_add(handle->file, handle->fhandle);
2810
2811        } else {
2812            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2813            fdb_fetch_header(handle->file->version, buf,
2814                             &trie_root_bid, &seq_root_bid, &stale_root_bid,
2815                             &ndocs, &ndeletes, &nlivenodes, &datasize,
2816                             &last_wal_flush_hdr_bid,
2817                             &kv_info_offset, &header_flags,
2818                             &new_filename, NULL);
2819
2820            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2821            fs = _fdb_close(handle);
2822            if (fs != FDB_RESULT_SUCCESS) {
2823                if (fhandle_ret) {
2824                    filemgr_fhandle_add(handle->file, handle->fhandle);
2825                }
2826                return fs;
2827            }
2828
2829            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2830            if (fs != FDB_RESULT_SUCCESS) {
2831                return fs;
2832            }
2833            filemgr_fhandle_add(handle->file, handle->fhandle);
2834        }
2835    }
2836    if (status) {
2837        *status = fstatus;
2838    }
2839    return fs;
2840}
2841
2842static void _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2843{
2844    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2845    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2846
2847    if (handle->shandle) {
2848        // skip snapshot
2849        return;
2850    }
2851
2852    struct filemgr_dirty_update_node *dirty_update;
2853    dirty_update = filemgr_dirty_update_get_latest(handle->file);
2854    btreeblk_set_dirty_update(handle->bhandle, dirty_update);
2855
2856    if (dirty_update) {
2857        filemgr_dirty_update_get_root(handle->file, dirty_update,
2858                                      &dirty_idtree_root, &dirty_seqtree_root);
2859        _fdb_import_dirty_root(handle, dirty_idtree_root, dirty_seqtree_root);
2860        btreeblk_discard_blocks(handle->bhandle);
2861    }
2862
2863    return;
2864}
2865
2866static void _fdb_release_dirty_root(fdb_kvs_handle *handle)
2867{
2868    if (!handle->shandle) {
2869        struct filemgr_dirty_update_node *dirty_update;
2870        dirty_update = btreeblk_get_dirty_update(handle->bhandle);
2871        if (dirty_update) {
2872            filemgr_dirty_update_close_node(handle->file, dirty_update);
2873            btreeblk_clear_dirty_update(handle->bhandle);
2874        }
2875    }
2876}
2877
2878LIBFDB_API
2879fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2880{
2881    uint64_t offset;
2882    int64_t _offset;
2883    struct docio_object _doc;
2884    struct filemgr *wal_file = NULL;
2885    struct docio_handle *dhandle;
2886    struct _fdb_key_cmp_info cmp_info;
2887    fdb_status wr;
2888    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2889    fdb_txn *txn;
2890    fdb_doc doc_kv;
2891    LATENCY_STAT_START();
2892
2893    if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2894        doc->keylen > FDB_MAX_KEYLEN ||
2895        (handle->kvs_config.custom_cmp &&
2896            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2897        return FDB_RESULT_INVALID_ARGS;
2898    }
2899
2900    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2901        return FDB_RESULT_HANDLE_BUSY;
2902    }
2903
2904    doc_kv = *doc;
2905
2906    if (handle->kvs) {
2907        // multi KV instance mode
2908        int size_chunk = handle->config.chunksize;
2909        doc_kv.keylen = doc->keylen + size_chunk;
2910        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2911        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2912        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2913    }
2914
2915    if (!handle->shandle) {
2916        fdb_check_file_reopen(handle, NULL);
2917        txn = handle->fhandle->root->txn;
2918        if (!txn) {
2919            txn = &handle->file->global_txn;
2920        }
2921    } else {
2922        txn = handle->shandle->snap_txn;
2923    }
2924
2925    cmp_info.kvs_config = handle->kvs_config;
2926    cmp_info.kvs = handle->kvs;
2927    wal_file = handle->file;
2928    dhandle = handle->dhandle;
2929
2930    if (handle->kvs) {
2931        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
2932                      &offset);
2933    } else {
2934        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc,
2935                      &offset);
2936    }
2937
2938    if (!handle->shandle) {
2939        fdb_sync_db_header(handle);
2940    }
2941
2942    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
2943
2944    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2945        _fdb_sync_dirty_root(handle);
2946
2947        if (handle->kvs) {
2948            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2949                             (void *)&offset);
2950        } else {
2951            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2952                             (void *)&offset);
2953        }
2954        btreeblk_end(handle->bhandle);
2955        offset = _endian_decode(offset);
2956
2957        _fdb_release_dirty_root(handle);
2958    }
2959
2960    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
2961         hr == HBTRIE_RESULT_SUCCESS) {
2962        bool alloced_meta = doc->meta ? false : true;
2963        bool alloced_body = doc->body ? false : true;
2964        if (handle->kvs) {
2965            _doc.key = doc_kv.key;
2966            _doc.length.keylen = doc_kv.keylen;
2967            doc->deleted = doc_kv.deleted; // update deleted field if wal_find
2968        } else {
2969            _doc.key = doc->key;
2970            _doc.length.keylen = doc->keylen;
2971        }
2972        _doc.meta = doc->meta;
2973        _doc.body = doc->body;
2974
2975        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2976            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2977            return FDB_RESULT_KEY_NOT_FOUND;
2978        }
2979
2980        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2981        if (_offset <= 0) {
2982            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2983            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
2984        }
2985
2986        if (_doc.length.keylen != doc_kv.keylen ||
2987            _doc.length.flag & DOCIO_DELETED) {
2988            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2989            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2990            return FDB_RESULT_KEY_NOT_FOUND;
2991        }
2992
2993        doc->seqnum = _doc.seqnum;
2994        doc->metalen = _doc.length.metalen;
2995        doc->bodylen = _doc.length.bodylen;
2996        doc->meta = _doc.meta;
2997        doc->body = _doc.body;
2998        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2999        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3000        doc->offset = offset;
3001
3002        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3003        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3004        return FDB_RESULT_SUCCESS;
3005    }
3006
3007    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3008    return FDB_RESULT_KEY_NOT_FOUND;
3009}
3010
3011// search document metadata using key
3012LIBFDB_API
3013fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
3014{
3015    uint64_t offset;
3016    struct docio_object _doc;
3017    struct docio_handle *dhandle;
3018    struct filemgr *wal_file = NULL;
3019    fdb_status wr;
3020    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3021    fdb_txn *txn;
3022    struct _fdb_key_cmp_info cmp_info;
3023    fdb_doc doc_kv;
3024    LATENCY_STAT_START();
3025
3026    if (!handle || !doc || !doc->key ||
3027        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
3028        (handle->kvs_config.custom_cmp &&
3029            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3030        return FDB_RESULT_INVALID_ARGS;
3031    }
3032
3033    doc_kv = *doc;
3034
3035    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3036        return FDB_RESULT_HANDLE_BUSY;
3037    }
3038
3039    if (handle->kvs) {
3040        // multi KV instance mode
3041        int size_chunk = handle->config.chunksize;
3042        doc_kv.keylen = doc->keylen + size_chunk;
3043        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3044        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3045        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3046    }
3047
3048    if (!handle->shandle) {
3049        fdb_check_file_reopen(handle, NULL);
3050        txn = handle->fhandle->root->txn;
3051        if (!txn) {
3052            txn = &handle->file->global_txn;
3053        }
3054    } else {
3055        txn = handle->shandle->snap_txn;
3056    }
3057
3058    cmp_info.kvs_config = handle->kvs_config;
3059    cmp_info.kvs = handle->kvs;
3060    wal_file = handle->file;
3061    dhandle = handle->dhandle;
3062
3063    if (handle->kvs) {
3064        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3065                      &offset);
3066    } else {
3067        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc, &offset);
3068    }
3069
3070    if (!handle->shandle) {
3071        fdb_sync_db_header(handle);
3072    }
3073    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3074
3075    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3076        _fdb_sync_dirty_root(handle);
3077
3078        if (handle->kvs) {
3079            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3080                             (void *)&offset);
3081        } else {
3082            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3083                             (void *)&offset);
3084        }
3085        btreeblk_end(handle->bhandle);
3086        offset = _endian_decode(offset);
3087
3088        _fdb_release_dirty_root(handle);
3089    }
3090
3091    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3092         hr == HBTRIE_RESULT_SUCCESS) {
3093        if (handle->kvs) {
3094            _doc.key = doc_kv.key;
3095            _doc.length.keylen = doc_kv.keylen;
3096        } else {
3097            _doc.key = doc->key;
3098            _doc.length.keylen = doc->keylen;
3099        }
3100        bool alloced_meta = doc->meta ? false : true;
3101        _doc.meta = doc->meta;
3102        _doc.body = doc->body;
3103
3104        int64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
3105                                                       true);
3106        if (body_offset <= 0){
3107            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3108            return body_offset < 0 ? (fdb_status)body_offset : FDB_RESULT_KEY_NOT_FOUND;
3109        }
3110
3111        if (_doc.length.keylen != doc_kv.keylen) {
3112            free_docio_object(&_doc, 0, alloced_meta, 0);
3113            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3114            return FDB_RESULT_KEY_NOT_FOUND;
3115        }
3116
3117        doc->seqnum = _doc.seqnum;
3118        doc->metalen = _doc.length.metalen;
3119        doc->bodylen = _doc.length.bodylen;
3120        doc->meta = _doc.meta;
3121        doc->body = _doc.body;
3122        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3123        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3124        doc->offset = offset;
3125
3126        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3127        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3128        return FDB_RESULT_SUCCESS;
3129    }
3130
3131    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3132    return FDB_RESULT_KEY_NOT_FOUND;
3133}
3134
3135// search document using sequence number
3136LIBFDB_API
3137fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
3138{
3139    uint64_t offset;
3140    int64_t _offset;
3141    struct docio_object _doc;
3142    struct docio_handle *dhandle;
3143    struct filemgr *wal_file = NULL;
3144    fdb_status wr;
3145    btree_result br = BTREE_RESULT_FAIL;
3146    fdb_seqnum_t _seqnum;
3147    fdb_txn *txn;
3148    struct _fdb_key_cmp_info cmp_info;
3149    LATENCY_STAT_START();
3150
3151    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
3152        return FDB_RESULT_INVALID_ARGS;
3153    }
3154
3155    // Sequence trees are a must for byseq operations
3156    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
3157        return FDB_RESULT_INVALID_CONFIG;
3158    }
3159
3160    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3161        return FDB_RESULT_HANDLE_BUSY;
3162    }
3163
3164    if (!handle->shandle) {
3165        fdb_check_file_reopen(handle, NULL);
3166
3167        txn = handle->fhandle->root->txn;
3168        if (!txn) {
3169            txn = &handle->file->global_txn;
3170        }
3171    } else {
3172        txn = handle->shandle->snap_txn;
3173    }
3174
3175    cmp_info.kvs_config = handle->kvs_config;
3176    cmp_info.kvs = handle->kvs;
3177    wal_file = handle->file;
3178    dhandle = handle->dhandle;
3179
3180    // prevent searching by key in WAL if 'doc' is not empty
3181    size_t key_len = doc->keylen;
3182    doc->keylen = 0;
3183    if (handle->kvs) {
3184        wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, &cmp_info,
3185                            handle->shandle, doc, &offset);
3186    } else {
3187        wr = wal_find(tx