xref: /4.0.0/forestdb/src/forestdb.cc (revision 17f72b63)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "btree.h"
33#include "btree_kv.h"
34#include "btree_var_kv_ops.h"
35#include "docio.h"
36#include "btreeblock.h"
37#include "common.h"
38#include "wal.h"
39#include "snapshot.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "compactor.h"
44#include "memleak.h"
45#include "time_utils.h"
46#include "system_resource_stats.h"
47
48#ifdef __DEBUG
49#ifndef __DEBUG_FDB
50    #undef DBG
51    #undef DBGCMD
52    #undef DBGSW
53    #define DBG(...)
54    #define DBGCMD(...)
55    #define DBGSW(n, ...)
56#endif
57#endif
58
#ifdef _TRACE_HANDLES
// AVL tree and lock that exist only in handle-tracing builds; both are set up
// in fdb_init() (spin_init / avl_init).
struct avl_tree open_handles;
static spin_t open_handle_lock;

/**
 * Comparator for the open_handles tree: orders KV store handles by address.
 *
 * @param a   AVL node embedded in a _fdb_kvs_handle (member avl_trace).
 * @param b   AVL node embedded in another _fdb_kvs_handle.
 * @param aux Unused.
 * @return 0 when both nodes belong to the same handle, 1 when a's handle has
 *         the greater address, -1 otherwise.
 */
static int _fdb_handle_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct _fdb_kvs_handle *aa, *bb;
    (void) aux;
    aa = _get_entry(a, struct _fdb_kvs_handle, avl_trace);
    bb = _get_entry(b, struct _fdb_kvs_handle, avl_trace);
    if (aa == bb) {
        // A comparator has to report equality; without this a node never
        // compares equal to itself.
        return 0;
    }
    return (aa > bb) ? 1 : -1;
}
#endif
70
// Set to 1 by fdb_init() once the file manager, block cache and compaction
// daemon have been initialized; guards the one-time global setup.
static volatile uint8_t fdb_initialized = 0;
// Incremented by fdb_init() and decremented by fdb_open() /
// fdb_open_custom_cmp() after _fdb_open() returns. All updates happen while
// holding initial_lock.
static volatile uint8_t fdb_open_inprog = 0;
#ifdef SPIN_INITIALIZER
// Lock protecting the global initialization state above.
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Without a static initializer, fdb_init() initializes initial_lock at run
// time. This flag tracks that step: 0 = not started, 1 = being initialized,
// 2 = ready.
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif
79
80static fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
81                                         uint64_t offset);
82
83INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
84{
85    (void) aux;
86    uint64_t a,b;
87    a = *(uint64_t*)key1;
88    b = *(uint64_t*)key2;
89    a = _endian_decode(a);
90    b = _endian_decode(b);
91    return _CMP_U64(a, b);
92}
93
94size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
95{
96    keylen_t keylen;
97    offset = _endian_decode(offset);
98    docio_read_doc_key((struct docio_handle *)handle, offset, &keylen, buf);
99    return keylen;
100}
101
102size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
103{
104    int size_id, size_seq, size_chunk;
105    fdb_seqnum_t _seqnum;
106    struct docio_object doc;
107    struct docio_handle *dhandle = (struct docio_handle *)handle;
108
109    size_id = sizeof(fdb_kvs_id_t);
110    size_seq = sizeof(fdb_seqnum_t);
111    size_chunk = dhandle->file->config->chunksize;
112    memset(&doc, 0, sizeof(struct docio_object));
113
114    offset = _endian_decode(offset);
115    docio_read_doc_key_meta((struct docio_handle *)handle, offset, &doc,
116                            true);
117    buf2buf(size_chunk, doc.key, size_id, buf);
118    _seqnum = _endian_encode(doc.seqnum);
119    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
120
121    free(doc.key);
122    free(doc.meta);
123
124    return size_id + size_seq;
125}
126
127int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
128{
129    int is_key1_inf, is_key2_inf;
130    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
131    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
132    size_t keylen1, keylen2;
133    btree_cmp_args *args = (btree_cmp_args *)aux;
134    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
135
136    is_key1_inf = _is_inf_key(key1);
137    is_key2_inf = _is_inf_key(key2);
138    if (is_key1_inf && is_key2_inf) { // both are infinite
139        return 0;
140    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
141        return -1;
142    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
143        return 1;
144    }
145
146    _get_var_key(key1, (void*)keystr1, &keylen1);
147    _get_var_key(key2, (void*)keystr2, &keylen2);
148
149    if (keylen1 == 0 && keylen2 == 0) {
150        return 0;
151    } else if (keylen1 ==0 && keylen2 > 0) {
152        return -1;
153    } else if (keylen1 > 0 && keylen2 == 0) {
154        return 1;
155    }
156
157    return cmp(keystr1, keylen1, keystr2, keylen2);
158}
159
160void fdb_fetch_header(void *header_buf,
161                      bid_t *trie_root_bid,
162                      bid_t *seq_root_bid,
163                      uint64_t *ndocs,
164                      uint64_t *nlivenodes,
165                      uint64_t *datasize,
166                      uint64_t *last_wal_flush_hdr_bid,
167                      uint64_t *kv_info_offset,
168                      uint64_t *header_flags,
169                      char **new_filename,
170                      char **old_filename)
171{
172    size_t offset = 0;
173    uint16_t new_filename_len;
174    uint16_t old_filename_len;
175
176    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
177               sizeof(bid_t), offset);
178    *trie_root_bid = _endian_decode(*trie_root_bid);
179
180    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
181               sizeof(bid_t), offset);
182    *seq_root_bid = _endian_decode(*seq_root_bid);
183
184    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
185               sizeof(uint64_t), offset);
186    *ndocs = _endian_decode(*ndocs);
187
188    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
189               sizeof(uint64_t), offset);
190    *nlivenodes = _endian_decode(*nlivenodes);
191
192    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
193               sizeof(uint64_t), offset);
194    *datasize = _endian_decode(*datasize);
195
196    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
197               sizeof(uint64_t), offset);
198    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
199
200    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
201               sizeof(uint64_t), offset);
202    *kv_info_offset = _endian_decode(*kv_info_offset);
203
204    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
205               sizeof(uint64_t), offset);
206    *header_flags = _endian_decode(*header_flags);
207
208    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
209               sizeof(new_filename_len), offset);
210    new_filename_len = _endian_decode(new_filename_len);
211    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
212               sizeof(old_filename_len), offset);
213    old_filename_len = _endian_decode(old_filename_len);
214    if (new_filename_len) {
215        *new_filename = (char*)((uint8_t *)header_buf + offset);
216    } else {
217        *new_filename = NULL;
218    }
219    offset += new_filename_len;
220    if (old_filename && old_filename_len) {
221        *old_filename = (char *) malloc(old_filename_len);
222        seq_memcpy(*old_filename,
223                   (uint8_t *)header_buf + offset,
224                   old_filename_len, offset);
225    }
226}
227
// Selects which documents _fdb_restore_wal() re-inserts into the WAL when the
// handle is in multi KV instance mode.
typedef enum {
    // restore all items
    FDB_RESTORE_NORMAL,
    // restore only items whose KV store ID matches the requested ID
    FDB_RESTORE_KV_INS,
} fdb_restore_mode_t;
232
/**
 * Re-reads the documents stored between the last WAL flush point and the
 * given header block, and inserts them back into the WAL (regular handle) or
 * into the snapshot (handle->shandle set).
 *
 * The scan runs under the file's mutex and with the docio error log callback
 * temporarily cleared. For a regular handle the restored items are committed
 * with wal_commit() before the mutex is released.
 *
 * @param handle    Handle whose file is scanned.
 * @param mode      FDB_RESTORE_NORMAL restores all items; FDB_RESTORE_KV_INS
 *                  restores only items belonging to kv_id_req (multi KV
 *                  instance mode).
 * @param hdr_bid   Block ID of the header up to which documents are scanned.
 * @param kv_id_req KV store ID used as the filter in FDB_RESTORE_KV_INS mode.
 */
INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
                             fdb_restore_mode_t mode,
                             bid_t hdr_bid,
                             fdb_kvs_id_t kv_id_req)
{
    struct filemgr *file = handle->file;
    uint32_t blocksize = handle->file->blocksize;
    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
    // NOTE(review): this end offset is derived from the FDB_BLOCKSIZE constant
    // while the start offset below uses the file's own blocksize -- confirm
    // the two always agree for the callers of this function.
    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
    uint64_t offset = 0; //assume everything from first block needs restoration
    err_log_callback *log_callback;

    if (!hdr_off) { // Nothing to do if we don't have a header block offset
        return;
    }

    // Start right after the header block of the last WAL flush, if any
    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
    }

    // If a valid last header was retrieved and it matches the current header
    // OR if WAL already had entries populated, then no crash recovery needed
    if (hdr_off <= offset ||
        (!handle->shandle && wal_get_size(file) &&
            mode != FDB_RESTORE_KV_INS)) {
        return;
    }

    // Temporarily disable the error logging callback as there are false positive
    // checksum errors in docio_read_doc.
    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
    log_callback = handle->dhandle->log_callback;
    handle->dhandle->log_callback = NULL;

    filemgr_mutex_lock(file);
    // Outer loop: one iteration per block; the step expression rounds the
    // current offset up to the start of the following block.
    for (; offset < hdr_off;
        offset = ((offset / blocksize) + 1) * blocksize) { // next block's off
        if (!docio_check_buffer(handle->dhandle, offset / blocksize)) {
            // block rejected by docio_check_buffer(): skip it
            continue;
        } else {
            // Inner loop: read consecutive documents starting at 'offset'.
            // Note that 'continue' inside this do/while jumps to the loop
            // condition at the bottom, not to the outer for loop.
            do {
                struct docio_object doc;
                uint64_t _offset;
                uint64_t doc_offset;
                memset(&doc, 0, sizeof(doc));
                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
                if (_offset == offset) { // reached unreadable doc, skip block
                    break;
                }
                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
                    // check if the doc is transactional or not, and
                    // also check if the doc contains system info
                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
                        !(doc.length.flag & DOCIO_SYSTEM)) {
                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
                            // commit mark .. read doc offset
                            doc_offset = doc.doc_offset;
                            // read the previously skipped doc
                            docio_read_doc(handle->dhandle, doc_offset, &doc, true);
                            if (doc.key == NULL) { // doc read error
                                free(doc.meta);
                                free(doc.body);
                                offset = _offset;
                                continue;
                            }
                        } else {
                            doc_offset = offset;
                        }

                        // If say a snapshot is taken on a db handle after
                        // rollback, then skip WAL items after rollback point
                        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
                            (mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
                            doc.seqnum > handle->seqnum) {
                            free(doc.key);
                            free(doc.meta);
                            free(doc.body);
                            offset = _offset;
                            continue;
                        }

                        // restore document
                        // (wal_doc borrows doc.key / doc.meta; only the fields
                        // assigned below are initialized)
                        fdb_doc wal_doc;
                        wal_doc.keylen = doc.length.keylen;
                        wal_doc.bodylen = doc.length.bodylen;
                        wal_doc.key = doc.key;
                        wal_doc.seqnum = doc.seqnum;
                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;

                        if (!handle->shandle) {
                            // regular handle: insert into the global WAL
                            wal_doc.metalen = doc.length.metalen;
                            wal_doc.meta = doc.meta;
                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);

                            if (handle->kvs) {
                                // check seqnum before insert
                                fdb_kvs_id_t kv_id;
                                fdb_seqnum_t kv_seqnum;
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);

                                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
                                    kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
                                } else {
                                    // presumably a sentinel that makes the
                                    // seqnum check below always pass -- verify
                                    kv_seqnum = SEQNUM_NOT_USED;
                                }
                                if (doc.seqnum <= kv_seqnum &&
                                        ((mode == FDB_RESTORE_KV_INS &&
                                            kv_id == kv_id_req) ||
                                         (mode == FDB_RESTORE_NORMAL)) ) {
                                    // if mode is NORMAL, restore all items
                                    // if mode is KV_INS, restore items matching ID
                                    wal_insert(&file->global_txn, file,
                                               &wal_doc, doc_offset, 0);
                                }
                            } else {
                                wal_insert(&file->global_txn, file,
                                           &wal_doc, doc_offset, 0);
                            }
                            // the key buffer is released here on the WAL path
                            if (doc.key) free(doc.key);
                        } else {
                            // snapshot
                            // NOTE(review): on this path doc.key is freed only
                            // when the document is NOT handed to snap_insert();
                            // presumably snap_insert() keeps the key pointer --
                            // confirm against snapshot.cc.
                            if (handle->kvs) {
                                fdb_kvs_id_t kv_id;
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);
                                if (kv_id == handle->kvs->id) {
                                    // snapshot: insert ID matched documents only
                                    snap_insert(handle->shandle,
                                                &wal_doc, doc_offset);
                                } else {
                                    free(doc.key);
                                }
                            } else {
                                snap_insert(handle->shandle, &wal_doc, doc_offset);
                            }
                        }
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                    } else {
                        // skip transactional document or system document
                        free(doc.key);
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                        // do not break.. read next doc
                    }
                } else {
                    // neither a keyed document nor a commit marker: stop
                    // reading this block and let the outer loop move on
                    free(doc.key);
                    free(doc.meta);
                    free(doc.body);
                    offset = _offset;
                    break;
                }
            } while (offset + sizeof(struct docio_length) < hdr_off);
        }
    }
    // wal commit
    if (!handle->shandle) {
        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
    }
    filemgr_mutex_unlock(file);
    // re-enable the docio error log callback saved above
    handle->dhandle->log_callback = log_callback;
}
398
/**
 * Inspects a file named as the compaction target of the handle's current file
 * and either adopts it or schedules it for removal.
 *
 * The file new_filename is opened read-only through a temporary handle.
 *  - If it records the handle's current filename as its old_filename, the
 *    handle's block/doc/trie/sequence-index structures and file pointer are
 *    replaced with those of the temporary handle, the old file is marked
 *    remove-pending and closed, and FDB_RESULT_FAIL_BY_COMPACTION is returned.
 *  - Otherwise the new file is marked remove-pending (pointing back to the
 *    handle's file), the temporary handle is closed, and FDB_RESULT_SUCCESS is
 *    returned.
 *
 * @param handle       Handle whose file recorded new_filename as its
 *                     compaction target.
 * @param new_filename Name of the possibly partially compacted file.
 * @return See above; the status of _fdb_open() (after logging) if the new
 *         file cannot be opened, or the status of btreeblk_end() if that
 *         fails.
 */
INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
                                          const char *new_filename)
{
    fdb_kvs_handle new_db;
    fdb_config config = handle->config;
    struct filemgr *new_file;

    // Temporary stack handle sharing the caller's log callback, file handle
    // and KVS config; opened read-only.
    memset(&new_db, 0, sizeof(new_db));
    new_db.log_callback.callback = handle->log_callback.callback;
    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
    config.flags |= FDB_OPEN_FLAG_RDONLY;
    new_db.fhandle = handle->fhandle;
    new_db.kvs_config = handle->kvs_config;
    fdb_status status = _fdb_open(&new_db, new_filename,
                                  FDB_AFILENAME, &config);
    if (status != FDB_RESULT_SUCCESS) {
        return fdb_log(&handle->log_callback, status,
                       "Error in opening a partially compacted file '%s' for recovery.",
                       new_filename);
    }

    new_file = new_db.file;

    if (new_file->old_filename &&
        !strncmp(new_file->old_filename, handle->file->filename,
                 FDB_MAX_FILENAME_LEN)) {
        struct filemgr *old_file = handle->file;
        // If new file has a recorded old_filename then it means that
        // compaction has completed successfully. Mark self for deletion
        filemgr_mutex_lock(new_file);

        status = btreeblk_end(handle->bhandle);
        if (status != FDB_RESULT_SUCCESS) {
            filemgr_mutex_unlock(new_file);
            _fdb_close(&new_db);
            return status;
        }
        // From here on the handle takes over the structures owned by new_db:
        // each old structure is released and replaced by new_db's counterpart.
        btreeblk_free(handle->bhandle);
        free(handle->bhandle);
        handle->bhandle = new_db.bhandle;

        docio_free(handle->dhandle);
        free(handle->dhandle);
        handle->dhandle = new_db.dhandle;

        hbtrie_free(handle->trie);
        free(handle->trie);
        handle->trie = new_db.trie;

        wal_shutdown(handle->file);
        handle->file = new_file;

        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
            if (handle->kvs) {
                // multi KV instance mode
                hbtrie_free(handle->seqtrie);
                free(handle->seqtrie);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtrie = new_db.seqtrie;
                }
            } else {
                // single KV instance mode: sequence index is a plain B+tree
                free(handle->seqtree->kv_ops);
                free(handle->seqtree);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtree = new_db.seqtree;
                }
            }
        }

        filemgr_mutex_unlock(new_file);
        if (new_db.kvs) {
            fdb_kvs_info_free(&new_db);
        }
        // remove self: WARNING must not close this handle if snapshots
        // are yet to open this file
        filemgr_remove_pending(old_file, new_db.file);
        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
        // new_db itself is not closed on this path because its structures now
        // belong to 'handle'; only its filename buffer is released.
        free(new_db.filename);
        return FDB_RESULT_FAIL_BY_COMPACTION;
    }

    // As the new file is partially compacted, it should be removed upon close.
    // Just in-case the new file gets opened before removal, point it to the old
    // file to ensure availability of data.
    filemgr_remove_pending(new_db.file, handle->file);
    _fdb_close(&new_db);

    return FDB_RESULT_SUCCESS;
}
488
489LIBFDB_API
490fdb_status fdb_init(fdb_config *config)
491{
492    fdb_config _config;
493    compactor_config c_config;
494    struct filemgr_config f_config;
495
496    if (config) {
497        if (validate_fdb_config(config)) {
498            _config = *config;
499        } else {
500            return FDB_RESULT_INVALID_CONFIG;
501        }
502    } else {
503        _config = get_default_config();
504    }
505
506    // global initialization
507    // initialized only once at first time
508    if (!fdb_initialized) {
509#ifdef _TRACE_HANDLES
510        spin_init(&open_handle_lock);
511        avl_init(&open_handles, NULL);
512#endif
513
514#ifndef SPIN_INITIALIZER
515        // Note that only Windows passes through this routine
516        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
517            // atomically initialize spin lock only once
518            spin_init(&initial_lock);
519            initial_lock_status = 2;
520        } else {
521            // the others .. wait until initializing 'initial_lock' is done
522            while (initial_lock_status != 2) {
523                Sleep(1);
524            }
525        }
526#endif
527
528    }
529    spin_lock(&initial_lock);
530    if (!fdb_initialized) {
531        double ram_size = (double) get_memory_size();
532        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
533            spin_unlock(&initial_lock);
534            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
535        }
536        // initialize file manager and block cache
537        f_config.blocksize = _config.blocksize;
538        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
539        filemgr_init(&f_config);
540        filemgr_set_lazy_file_deletion(true,
541                                       compactor_register_file_removing,
542                                       compactor_is_file_removed);
543
544        // initialize compaction daemon
545        c_config.sleep_duration = _config.compactor_sleep_duration;
546        c_config.num_threads = _config.num_compactor_threads;
547        compactor_init(&c_config);
548
549        fdb_initialized = 1;
550    }
551    fdb_open_inprog++;
552    spin_unlock(&initial_lock);
553
554    return FDB_RESULT_SUCCESS;
555}
556
557LIBFDB_API
558fdb_config fdb_get_default_config(void) {
559    return get_default_config();
560}
561
562LIBFDB_API
563fdb_kvs_config fdb_get_default_kvs_config(void) {
564    return get_default_kvs_config();
565}
566
567LIBFDB_API
568fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
569                    const char *filename,
570                    fdb_config *fconfig)
571{
572#ifdef _MEMPOOL
573    mempool_init();
574#endif
575
576    fdb_config config;
577    fdb_file_handle *fhandle;
578    fdb_kvs_handle *handle;
579
580    if (fconfig) {
581        if (validate_fdb_config(fconfig)) {
582            config = *fconfig;
583        } else {
584            return FDB_RESULT_INVALID_CONFIG;
585        }
586    } else {
587        config = get_default_config();
588    }
589
590    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
591    if (!fhandle) { // LCOV_EXCL_START
592        return FDB_RESULT_ALLOC_FAIL;
593    } // LCOV_EXCL_STOP
594
595    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
596    if (!handle) { // LCOV_EXCL_START
597        free(fhandle);
598        return FDB_RESULT_ALLOC_FAIL;
599    } // LCOV_EXCL_STOP
600
601    atomic_init_uint8_t(&handle->handle_busy, 0);
602    handle->shandle = NULL;
603    handle->kvs_config = get_default_kvs_config();
604
605    fdb_status fs = fdb_init(fconfig);
606    if (fs != FDB_RESULT_SUCCESS) {
607        free(handle);
608        free(fhandle);
609        return fs;
610    }
611    fdb_file_handle_init(fhandle, handle);
612
613    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
614    if (fs == FDB_RESULT_SUCCESS) {
615        *ptr_fhandle = fhandle;
616    } else {
617        *ptr_fhandle = NULL;
618        free(handle);
619        fdb_file_handle_free(fhandle);
620    }
621    spin_lock(&initial_lock);
622    fdb_open_inprog--;
623    spin_unlock(&initial_lock);
624    return fs;
625}
626
627LIBFDB_API
628fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
629                               const char *filename,
630                               fdb_config *fconfig,
631                               size_t num_functions,
632                               char **kvs_names,
633                               fdb_custom_cmp_variable *functions)
634{
635#ifdef _MEMPOOL
636    mempool_init();
637#endif
638
639    fdb_config config;
640    fdb_file_handle *fhandle;
641    fdb_kvs_handle *handle;
642
643    if (fconfig) {
644        if (validate_fdb_config(fconfig)) {
645            config = *fconfig;
646        } else {
647            return FDB_RESULT_INVALID_CONFIG;
648        }
649    } else {
650        config = get_default_config();
651    }
652
653    if (config.multi_kv_instances == false) {
654        // single KV instance mode does not support customized cmp function
655        return FDB_RESULT_INVALID_CONFIG;
656    }
657
658    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
659    if (!fhandle) { // LCOV_EXCL_START
660        return FDB_RESULT_ALLOC_FAIL;
661    } // LCOV_EXCL_STOP
662
663    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
664    if (!handle) { // LCOV_EXCL_START
665        free(fhandle);
666        return FDB_RESULT_ALLOC_FAIL;
667    } // LCOV_EXCL_STOP
668
669    atomic_init_uint8_t(&handle->handle_busy, 0);
670    handle->shandle = NULL;
671    handle->kvs_config = get_default_kvs_config();
672
673    fdb_status fs = fdb_init(fconfig);
674    if (fs != FDB_RESULT_SUCCESS) {
675        free(handle);
676        free(fhandle);
677        return fs;
678    }
679    fdb_file_handle_init(fhandle, handle);
680
681    // insert kvs_names and functions into fhandle's list
682    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
683                                   kvs_names, functions);
684
685    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
686    if (fs == FDB_RESULT_SUCCESS) {
687        *ptr_fhandle = fhandle;
688    } else {
689        *ptr_fhandle = NULL;
690        free(handle);
691        fdb_file_handle_free(fhandle);
692    }
693    spin_lock(&initial_lock);
694    fdb_open_inprog--;
695    spin_unlock(&initial_lock);
696    return fs;
697}
698
699fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
700                                  const char *filename,
701                                  fdb_config *fconfig,
702                                  struct list *cmp_func_list)
703{
704#ifdef _MEMPOOL
705    mempool_init();
706#endif
707
708    fdb_file_handle *fhandle;
709    fdb_kvs_handle *handle;
710
711    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
712    if (!fhandle) { // LCOV_EXCL_START
713        return FDB_RESULT_ALLOC_FAIL;
714    } // LCOV_EXCL_STOP
715
716    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
717    if (!handle) { // LCOV_EXCL_START
718        free(fhandle);
719        return FDB_RESULT_ALLOC_FAIL;
720    } // LCOV_EXCL_STOP
721
722    atomic_init_uint8_t(&handle->handle_busy, 0);
723    handle->shandle = NULL;
724
725    fdb_file_handle_init(fhandle, handle);
726    if (cmp_func_list) {
727        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
728    }
729    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
730    if (fs == FDB_RESULT_SUCCESS) {
731        *ptr_fhandle = fhandle;
732    } else {
733        *ptr_fhandle = NULL;
734        free(handle);
735        fdb_file_handle_free(fhandle);
736    }
737    return fs;
738}
739
740LIBFDB_API
741fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
742                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
743{
744#ifdef _MEMPOOL
745    mempool_init();
746#endif
747
748    fdb_config config = handle_in->config;
749    fdb_kvs_config kvs_config = handle_in->kvs_config;
750    fdb_kvs_handle *handle;
751    fdb_status fs;
752    filemgr *file;
753    file_status_t fstatus = FILE_NORMAL;
754
755    if (!handle_in || !ptr_handle) {
756        return FDB_RESULT_INVALID_ARGS;
757    }
758
759    // Sequence trees are a must for snapshot creation
760    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
761        return FDB_RESULT_INVALID_CONFIG;
762    }
763
764fdb_snapshot_open_start:
765    if (!handle_in->shandle) {
766        fdb_check_file_reopen(handle_in, &fstatus);
767        fdb_sync_db_header(handle_in);
768        file = handle_in->file;
769
770        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
771            handle_in->seqnum = fdb_kvs_get_seqnum(file,
772                                                   handle_in->kvs->id);
773        } else {
774            handle_in->seqnum = filemgr_get_seqnum(file);
775        }
776    } else {
777        file = handle_in->file;
778    }
779
780    // if the max sequence number seen by this handle is lower than the
781    // requested snapshot marker, it means the snapshot is not yet visible
782    // even via the current fdb_kvs_handle
783    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
784        return FDB_RESULT_NO_DB_INSTANCE;
785    }
786
787    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
788    if (!handle) { // LCOV_EXCL_START
789        return FDB_RESULT_ALLOC_FAIL;
790    } // LCOV_EXCL_STOP
791
792    atomic_init_uint8_t(&handle->handle_busy, 0);
793    handle->log_callback = handle_in->log_callback;
794    handle->max_seqnum = seqnum;
795    handle->fhandle = handle_in->fhandle;
796
797    config.flags |= FDB_OPEN_FLAG_RDONLY;
798    // do not perform compaction for snapshot
799    config.compaction_mode = FDB_COMPACTION_MANUAL;
800
801    // If cloning an existing snapshot handle, then rewind indexes
802    // to its last DB header and point its avl tree to existing snapshot's tree
803    bool clone_snapshot = false;
804    if (handle_in->shandle) {
805        handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
806        if (snap_clone(handle_in->shandle, handle_in->max_seqnum,
807                   &handle->shandle, seqnum) == FDB_RESULT_SUCCESS) {
808            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
809            clone_snapshot = true;
810        }
811    }
812
813    if (!handle->shandle) {
814        handle->shandle = (struct snap_handle *) calloc(1, sizeof(snap_handle));
815        if (!handle->shandle) { // LCOV_EXCL_START
816            free(handle);
817            return FDB_RESULT_ALLOC_FAIL;
818        } // LCOV_EXCL_STOP
819        snap_init(handle->shandle, handle_in);
820    }
821
822    if (handle_in->kvs) {
823        // sub-handle in multi KV instance mode
824        if (clone_snapshot) {
825            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
826        } else {
827            fs = _fdb_kvs_open(handle_in->kvs->root,
828                              &config, &kvs_config, file,
829                              file->filename,
830                              _fdb_kvs_get_name(handle_in, file),
831                              handle);
832        }
833    } else {
834        if (clone_snapshot) {
835            fs = _fdb_clone_snapshot(handle_in, handle);
836        } else {
837            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
838        }
839    }
840
841    if (fs == FDB_RESULT_SUCCESS) {
842        if (seqnum == FDB_SNAPSHOT_INMEM &&
843            !handle_in->shandle) {
844            fdb_seqnum_t upto_seq = seqnum;
845            // In-memory snapshot
846            wal_snapshot(handle->file, (void *)handle->shandle,
847                         handle_in->txn, &upto_seq, _fdb_wal_snapshot_func);
848            // set seqnum based on handle type (multikv or default)
849            if (handle_in->kvs && handle_in->kvs->id > 0) {
850                handle->max_seqnum =
851                    _fdb_kvs_get_seqnum(handle->file->kv_header,
852                                        handle_in->kvs->id);
853            } else {
854                handle->max_seqnum = filemgr_get_seqnum(handle->file);
855            }
856
857            // synchronize dirty root nodes if exist
858            if (filemgr_dirty_root_exist(handle->file)) {
859                bid_t dirty_idtree_root, dirty_seqtree_root;
860                filemgr_mutex_lock(handle->file);
861                filemgr_get_dirty_root(handle->file,
862                                       &dirty_idtree_root, &dirty_seqtree_root);
863                if (dirty_idtree_root != BLK_NOT_FOUND) {
864                    handle->trie->root_bid = dirty_idtree_root;
865                }
866                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
867                    if (dirty_seqtree_root != BLK_NOT_FOUND) {
868                        if (handle->kvs) {
869                            handle->seqtrie->root_bid = dirty_seqtree_root;
870                        } else {
871                            btree_init_from_bid(handle->seqtree,
872                                                handle->seqtree->blk_handle,
873                                                handle->seqtree->blk_ops,
874                                                handle->seqtree->kv_ops,
875                                                handle->seqtree->blksize,
876                                                dirty_seqtree_root);
877                        }
878                    }
879                }
880                btreeblk_discard_blocks(handle->bhandle);
881                btreeblk_create_dirty_snapshot(handle->bhandle);
882                filemgr_mutex_unlock(handle->file);
883            }
884        } else if (clone_snapshot) {
885            // Snapshot is created on the other snapshot handle
886
887            handle->max_seqnum = handle_in->seqnum;
888
889            if (seqnum == FDB_SNAPSHOT_INMEM) {
890                // in-memory snapshot
891                // Clone dirty root nodes from the source snapshot by incrementing
892                // their ref counters
893                handle->trie->root_bid = handle_in->trie->root_bid;
894                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
895                    if (handle->kvs) {
896                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
897                    } else {
898                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
899                    }
900                }
901                btreeblk_discard_blocks(handle->bhandle);
902                btreeblk_clone_dirty_snapshot(handle->bhandle,
903                                              handle_in->bhandle);
904            }
905        }
906        *ptr_handle = handle;
907    } else {
908        *ptr_handle = NULL;
909        snap_close(handle->shandle);
910        free(handle);
911        // If compactor thread had finished compaction just before this routine
912        // calls _fdb_open, then it is possible that the snapshot's DB header
913        // is only present in the new_file. So we must retry the snapshot
914        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
915        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
916            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
917                goto fdb_snapshot_open_start;
918            }
919        }
920    }
921    return fs;
922}
923
924static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
925
926LIBFDB_API
927fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
928{
929#ifdef _MEMPOOL
930    mempool_init();
931#endif
932
933    fdb_config config;
934    fdb_kvs_handle *handle_in, *handle;
935    fdb_status fs;
936    fdb_seqnum_t old_seqnum;
937
938    if (!handle_ptr) {
939        return FDB_RESULT_INVALID_ARGS;
940    }
941
942    handle_in = *handle_ptr;
943    config = handle_in->config;
944
945    if (handle_in->kvs) {
946        return fdb_kvs_rollback(handle_ptr, seqnum);
947    }
948
949    // Sequence trees are a must for rollback
950    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
951        return FDB_RESULT_INVALID_CONFIG;
952    }
953
954    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
955        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
956                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
957                       handle_in->file->filename);
958    }
959
960    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
961        return FDB_RESULT_HANDLE_BUSY;
962    }
963
964    filemgr_mutex_lock(handle_in->file);
965    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
966    // All transactions should be closed before rollback
967    if (wal_txn_exists(handle_in->file)) {
968        filemgr_set_rollback(handle_in->file, 0);
969        filemgr_mutex_unlock(handle_in->file);
970        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
971        return FDB_RESULT_FAIL_BY_TRANSACTION;
972    }
973
974    // If compaction is running, wait until it is aborted.
975    // TODO: Find a better way of waiting for the compaction abortion.
976    unsigned int sleep_time = 10000; // 10 ms.
977    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
978    while (fstatus == FILE_COMPACT_OLD) {
979        filemgr_mutex_unlock(handle_in->file);
980        decaying_usleep(&sleep_time, 1000000);
981        filemgr_mutex_lock(handle_in->file);
982        fstatus = filemgr_get_file_status(handle_in->file);
983    }
984    if (fstatus == FILE_REMOVED_PENDING) {
985        filemgr_mutex_unlock(handle_in->file);
986        fdb_check_file_reopen(handle_in, NULL);
987    } else {
988        filemgr_mutex_unlock(handle_in->file);
989    }
990
991    fdb_sync_db_header(handle_in);
992
993    // if the max sequence number seen by this handle is lower than the
994    // requested snapshot marker, it means the snapshot is not yet visible
995    // even via the current fdb_kvs_handle
996    if (seqnum > handle_in->seqnum) {
997        filemgr_set_rollback(handle_in->file, 0); // allow mutations
998        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
999        return FDB_RESULT_NO_DB_INSTANCE;
1000    }
1001
1002    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1003    if (!handle) { // LCOV_EXCL_START
1004        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1005        return FDB_RESULT_ALLOC_FAIL;
1006    } // LCOV_EXCL_STOP
1007
1008    atomic_init_uint8_t(&handle->handle_busy, 0);
1009    handle->log_callback = handle_in->log_callback;
1010    handle->fhandle = handle_in->fhandle;
1011    if (seqnum == 0) {
1012        fs = _fdb_reset(handle, handle_in);
1013    } else {
1014        handle->max_seqnum = seqnum;
1015        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1016                       &config);
1017    }
1018
1019    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1020    if (fs == FDB_RESULT_SUCCESS) {
1021        // rollback the file's sequence number
1022        filemgr_mutex_lock(handle_in->file);
1023        old_seqnum = filemgr_get_seqnum(handle_in->file);
1024        filemgr_set_seqnum(handle_in->file, seqnum);
1025        filemgr_mutex_unlock(handle_in->file);
1026
1027        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL);
1028        if (fs == FDB_RESULT_SUCCESS) {
1029            if (handle_in->txn) {
1030                handle->txn = handle_in->txn;
1031                handle_in->txn = NULL;
1032            }
1033            handle_in->fhandle->root = handle;
1034            _fdb_close_root(handle_in);
1035            handle->max_seqnum = 0;
1036            handle->seqnum = seqnum;
1037            *handle_ptr = handle;
1038        } else {
1039            // cancel the rolling-back of the sequence number
1040            filemgr_mutex_lock(handle_in->file);
1041            filemgr_set_seqnum(handle_in->file, old_seqnum);
1042            filemgr_mutex_unlock(handle_in->file);
1043            free(handle);
1044            fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1045        }
1046    } else {
1047        free(handle);
1048        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1049    }
1050
1051    return fs;
1052}
1053
1054static void _fdb_init_file_config(const fdb_config *config,
1055                                  struct filemgr_config *fconfig) {
1056    fconfig->blocksize = config->blocksize;
1057    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1058    fconfig->chunksize = config->chunksize;
1059
1060    fconfig->options = 0x0;
1061    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1062        fconfig->options |= FILEMGR_CREATE;
1063    }
1064    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1065        fconfig->options |= FILEMGR_READONLY;
1066    }
1067    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1068        fconfig->options |= FILEMGR_SYNC;
1069    }
1070
1071    fconfig->flag = 0x0;
1072    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1073        config->buffercache_size) {
1074        fconfig->flag |= _ARCH_O_DIRECT;
1075    }
1076
1077    fconfig->prefetch_duration = config->prefetch_duration;
1078    fconfig->num_wal_shards = config->num_wal_partitions;
1079    fconfig->num_bcache_shards = config->num_bcache_partitions;
1080}
1081
1082fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1083                               fdb_kvs_handle *handle_out)
1084{
1085    fdb_status status;
1086
1087    handle_out->config = handle_in->config;
1088    handle_out->kvs_config = handle_in->kvs_config;
1089    handle_out->fileops = handle_in->fileops;
1090    handle_out->file = handle_in->file;
1091    // Note that the file ref count will be decremented when the cloned snapshot
1092    // is closed through filemgr_close().
1093    filemgr_incr_ref_count(handle_out->file);
1094
1095    if (handle_out->filename) {
1096        handle_out->filename = (char *)realloc(handle_out->filename,
1097                                               strlen(handle_in->filename)+1);
1098    } else {
1099        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1100    }
1101    strcpy(handle_out->filename, handle_in->filename);
1102
1103    // initialize the docio handle.
1104    handle_out->dhandle = (struct docio_handle *)
1105        calloc(1, sizeof(struct docio_handle));
1106    handle_out->dhandle->log_callback = &handle_out->log_callback;
1107    docio_init(handle_out->dhandle, handle_out->file,
1108               handle_out->config.compress_document_body);
1109
1110    // initialize the btree block handle.
1111    handle_out->btreeblkops = btreeblk_get_ops();
1112    handle_out->bhandle = (struct btreeblk_handle *)
1113        calloc(1, sizeof(struct btreeblk_handle));
1114    handle_out->bhandle->log_callback = &handle_out->log_callback;
1115    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1116
1117    handle_out->dirty_updates = handle_in->dirty_updates;
1118    handle_out->cur_header_revnum = handle_in->cur_header_revnum;
1119    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1120    handle_out->kv_info_offset = handle_in->kv_info_offset;
1121    handle_out->shandle->stat = handle_in->shandle->stat;
1122    handle_out->op_stats = handle_in->op_stats;
1123
1124    // initialize the trie handle
1125    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1126    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1127                handle_out->file->blocksize,
1128                handle_in->trie->root_bid, // Source snapshot's trie root bid
1129                (void *)handle_out->bhandle, handle_out->btreeblkops,
1130                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1131    // set aux for cmp wrapping function
1132    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1133    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1134
1135    if (handle_out->kvs) {
1136        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1137    }
1138
1139    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1140        handle_out->seqnum = handle_in->seqnum;
1141
1142        if (handle_out->config.multi_kv_instances) {
1143            // multi KV instance mode .. HB+trie
1144            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1145            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1146                        handle_out->file->blocksize,
1147                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1148                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1149                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1150
1151        } else {
1152            // single KV instance mode .. normal B+tree
1153            struct btree_kv_ops *seq_kv_ops =
1154                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1155            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1156            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1157
1158            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1159            // Init the seq tree using the root bid of the source snapshot.
1160            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1161                                handle_out->btreeblkops, seq_kv_ops,
1162                                handle_out->config.blocksize,
1163                                handle_in->seqtree->root_bid);
1164        }
1165    } else{
1166        handle_out->seqtree = NULL;
1167    }
1168
1169    status = btreeblk_end(handle_out->bhandle);
1170    fdb_assert(status == FDB_RESULT_SUCCESS, status, handle_out);
1171
1172#ifdef _TRACE_HANDLES
1173    spin_lock(&open_handle_lock);
1174    avl_insert(&open_handles, &handle_out->avl_trace, _fdb_handle_cmp);
1175    spin_unlock(&open_handle_lock);
1176#endif
1177    return status;
1178}
1179
1180fdb_status _fdb_open(fdb_kvs_handle *handle,
1181                     const char *filename,
1182                     fdb_filename_mode_t filename_mode,
1183                     const fdb_config *config)
1184{
1185    struct filemgr_config fconfig;
1186    struct kvs_stat stat, empty_stat;
1187    bid_t trie_root_bid = BLK_NOT_FOUND;
1188    bid_t seq_root_bid = BLK_NOT_FOUND;
1189    fdb_seqnum_t seqnum = 0;
1190    filemgr_header_revnum_t header_revnum = 0;
1191    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1192    uint64_t ndocs = 0;
1193    uint64_t datasize = 0;
1194    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1195    uint64_t kv_info_offset = BLK_NOT_FOUND;
1196    uint64_t header_flags = 0;
1197    uint8_t header_buf[FDB_BLOCKSIZE];
1198    char *compacted_filename = NULL;
1199    char *prev_filename = NULL;
1200    size_t header_len = 0;
1201    bool multi_kv_instances = config->multi_kv_instances;
1202
1203    uint64_t nlivenodes = 0;
1204    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1205    char actual_filename[FDB_MAX_FILENAME_LEN];
1206    char virtual_filename[FDB_MAX_FILENAME_LEN];
1207    char *target_filename = NULL;
1208    fdb_status status;
1209
1210    if (filename == NULL) {
1211        return FDB_RESULT_INVALID_ARGS;
1212    }
1213    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1214        // filename (including path) length is supported up to
1215        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1216        return FDB_RESULT_TOO_LONG_FILENAME;
1217    }
1218
1219    if (filename_mode == FDB_VFILENAME &&
1220        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1221        return FDB_RESULT_INVALID_COMPACTION_MODE;
1222    }
1223
1224    _fdb_init_file_config(config, &fconfig);
1225
1226    if (filename_mode == FDB_VFILENAME) {
1227        compactor_get_actual_filename(filename, actual_filename,
1228                                      config->compaction_mode, &handle->log_callback);
1229    } else {
1230        strcpy(actual_filename, filename);
1231    }
1232
1233    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1234         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1235          filename_mode == FDB_VFILENAME) ) {
1236        // 1) manual compaction mode, OR
1237        // 2) auto compaction mode + 'filename' is virtual filename
1238        // -> copy 'filename'
1239        target_filename = (char *)filename;
1240    } else {
1241        // otherwise (auto compaction mode + 'filename' is actual filename)
1242        // -> copy 'virtual_filename'
1243        compactor_get_virtual_filename(filename, virtual_filename);
1244        target_filename = virtual_filename;
1245    }
1246
1247    handle->fileops = get_filemgr_ops();
1248    filemgr_open_result result = filemgr_open((char *)actual_filename,
1249                                              handle->fileops,
1250                                              &fconfig, &handle->log_callback);
1251    if (result.rv != FDB_RESULT_SUCCESS) {
1252        return (fdb_status) result.rv;
1253    }
1254
1255    handle->file = result.file;
1256    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1257        strcmp(filename, actual_filename)) {
1258        // It is in-place compacted file if
1259        // 1) compaction mode is manual, and
1260        // 2) actual filename is different to the filename given by user.
1261        // In this case, set the in-place compaction flag.
1262        filemgr_set_in_place_compaction(handle->file, true);
1263    }
1264    if (filemgr_is_in_place_compaction_set(handle->file)) {
1265        // This file was in-place compacted.
1266        // set 'handle->filename' to the original filename to trigger file renaming
1267        compactor_get_virtual_filename(filename, virtual_filename);
1268        target_filename = virtual_filename;
1269    }
1270
1271    if (handle->filename) {
1272        handle->filename = (char *)realloc(handle->filename,
1273                                           strlen(target_filename)+1);
1274    } else {
1275        handle->filename = (char*)malloc(strlen(target_filename)+1);
1276    }
1277    strcpy(handle->filename, target_filename);
1278
1279    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1280    // set handle->last_hdr_bid to the block id of required header, so rewind..
1281    if (handle->shandle && handle->last_hdr_bid) {
1282        status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1283                                      header_buf, &header_len, &seqnum,
1284                                      &header_revnum, &handle->log_callback);
1285        if (status != FDB_RESULT_SUCCESS) {
1286            free(handle->filename);
1287            handle->filename = NULL;
1288            filemgr_close(handle->file, false, handle->filename,
1289                              &handle->log_callback);
1290            return status;
1291        }
1292    } else { // Normal open
1293        filemgr_get_header(handle->file, header_buf, &header_len,
1294                           &handle->last_hdr_bid, &seqnum, &header_revnum);
1295    }
1296
1297    // initialize the docio handle so kv headers may be read
1298    handle->dhandle = (struct docio_handle *)
1299                      calloc(1, sizeof(struct docio_handle));
1300    handle->dhandle->log_callback = &handle->log_callback;
1301    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1302
1303    if (header_len > 0) {
1304        fdb_fetch_header(header_buf, &trie_root_bid,
1305                         &seq_root_bid, &ndocs, &nlivenodes,
1306                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1307                         &header_flags, &compacted_filename, &prev_filename);
1308        // use existing setting for seqtree_opt
1309        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1310            seqtree_opt = FDB_SEQTREE_USE;
1311        } else {
1312            seqtree_opt = FDB_SEQTREE_NOT_USE;
1313        }
1314        // Retrieve seqnum for multi-kv mode
1315        if (handle->kvs && handle->kvs->id > 0) {
1316            if (kv_info_offset != BLK_NOT_FOUND) {
1317                if (!handle->file->kv_header) {
1318                    fdb_kvs_header_create(handle->file);
1319                    // KV header already exists but not loaded .. read & import
1320                    fdb_kvs_header_read(handle->file, handle->dhandle,
1321                                        kv_info_offset, false);
1322                }
1323                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1324                                             handle->kvs->id);
1325            } else { // no kv_info offset, ok to set seqnum to zero
1326                seqnum = 0;
1327            }
1328        }
1329        // other flags
1330        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1331            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1332        }
1333        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1334            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1335        }
1336        // use existing setting for multi KV instance mode
1337        if (kv_info_offset == BLK_NOT_FOUND) {
1338            multi_kv_instances = false;
1339        } else {
1340            multi_kv_instances = true;
1341        }
1342    }
1343
1344    handle->config = *config;
1345    handle->config.seqtree_opt = seqtree_opt;
1346    handle->config.multi_kv_instances = multi_kv_instances;
1347
1348    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1349        // Either an in-memory snapshot or cloning from an existing snapshot..
1350        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1351                     // *_open() should have already restored it
1352    } else { // Persisted snapshot or file rollback..
1353        hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1354        if (hdr_bid > 0) {
1355            --hdr_bid;
1356        }
1357        if (handle->max_seqnum) {
1358            struct kvs_stat stat_ori;
1359            // backup original stats
1360            if (handle->kvs) {
1361                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1362            } else {
1363                _kvs_stat_get(handle->file, 0, &stat_ori);
1364            }
1365
1366            if (hdr_bid > handle->last_hdr_bid){
1367                // uncommitted data exists beyond the last DB header
1368                // get the last committed seq number
1369                fdb_seqnum_t seq_commit;
1370                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1371                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1372                    // In case, snapshot_open is attempted with latest uncommitted
1373                    // sequence number
1374                    header_len = 0;
1375                }
1376            }
1377            // Reverse scan the file to locate the DB header with seqnum marker
1378            while (header_len && seqnum != handle->max_seqnum) {
1379                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1380                                          header_buf, &header_len, &seqnum,
1381                                          &handle->log_callback);
1382                if (header_len == 0) {
1383                    continue; // header doesn't exist
1384                }
1385                fdb_fetch_header(header_buf, &trie_root_bid,
1386                                 &seq_root_bid, &ndocs, &nlivenodes,
1387                                 &datasize, &last_wal_flush_hdr_bid,
1388                                 &kv_info_offset, &header_flags,
1389                                 &compacted_filename, NULL);
1390                handle->last_hdr_bid = hdr_bid;
1391
1392                if (!handle->kvs || handle->kvs->id == 0) {
1393                    // single KVS mode OR default KVS
1394                    if (!handle->shandle) {
1395                        // rollback
1396                        struct kvs_stat stat_dst;
1397                        _kvs_stat_get(handle->file, 0, &stat_dst);
1398                        stat_dst.ndocs = ndocs;
1399                        stat_dst.datasize = datasize;
1400                        stat_dst.nlivenodes = nlivenodes;
1401                        _kvs_stat_set(handle->file, 0, stat_dst);
1402                    }
1403                    continue;
1404                }
1405
1406                uint64_t doc_offset;
1407                struct kvs_header *kv_header;
1408                struct docio_object doc;
1409
1410                _fdb_kvs_header_create(&kv_header);
1411                memset(&doc, 0, sizeof(struct docio_object));
1412                doc_offset = docio_read_doc(handle->dhandle,
1413                                            kv_info_offset, &doc, true);
1414
1415                if (doc_offset == kv_info_offset) {
1416                    header_len = 0; // fail
1417                    _fdb_kvs_header_free(kv_header);
1418                } else {
1419                    _fdb_kvs_header_import(kv_header, doc.body,
1420                                           doc.length.bodylen, false);
1421                    // get local sequence number for the KV instance
1422                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1423                                                 handle->kvs->id);
1424                    if (!handle->shandle) {
1425                        // rollback: replace kv_header stats
1426                        // read from the current header's kv_header
1427                        struct kvs_stat stat_src, stat_dst;
1428                        _kvs_stat_get_kv_header(kv_header,
1429                                                handle->kvs->id,
1430                                                &stat_src);
1431                        _kvs_stat_get(handle->file,
1432                                      handle->kvs->id,
1433                                      &stat_dst);
1434                        // update ndocs, datasize, nlivenodes
1435                        // into the current file's kv_header
1436                        // Note: stats related to WAL should not be updated
1437                        //       at this time. They will be adjusted through
1438                        //       discard & restore routines below.
1439                        stat_dst.ndocs = stat_src.ndocs;
1440                        stat_dst.datasize = stat_src.datasize;
1441                        stat_dst.nlivenodes = stat_src.nlivenodes;
1442                        _kvs_stat_set(handle->file,
1443                                      handle->kvs->id,
1444                                      stat_dst);
1445                    }
1446                    _fdb_kvs_header_free(kv_header);
1447                    free_docio_object(&doc, 1, 1, 1);
1448                }
1449            }
1450            if (!header_len) { // Marker MUST match that of DB commit!
1451                // rollback original stats
1452                if (handle->kvs) {
1453                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1454                } else {
1455                    _kvs_stat_get(handle->file, 0, &stat_ori);
1456                }
1457
1458                docio_free(handle->dhandle);
1459                free(handle->dhandle);
1460                free(handle->filename);
1461                free(prev_filename);
1462                handle->filename = NULL;
1463                filemgr_close(handle->file, false, handle->filename,
1464                              &handle->log_callback);
1465                return FDB_RESULT_NO_DB_INSTANCE;
1466            }
1467
1468            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1469                if (handle->config.multi_kv_instances) {
1470                    // multi KV instance mode
1471                    // clear only WAL items belonging to the instance
1472                    wal_close_kv_ins(handle->file,
1473                                     (handle->kvs)?(handle->kvs->id):(0));
1474                } else {
1475                    wal_shutdown(handle->file);
1476                }
1477            }
1478        } else { // snapshot to sequence number 0 requested..
1479            if (handle->shandle) { // fdb_snapshot_open API call
1480                if (seqnum) {
1481                    // Database currently has a non-zero seq number,
1482                    // but the snapshot was requested with a seq number zero.
1483                    docio_free(handle->dhandle);
1484                    free(handle->dhandle);
1485                    free(handle->filename);
1486                    free(prev_filename);
1487                    handle->filename = NULL;
1488                    filemgr_close(handle->file, false, handle->filename,
1489                                  &handle->log_callback);
1490                    return FDB_RESULT_NO_DB_INSTANCE;
1491                }
1492            } // end of zero max_seqnum but non-rollback check
1493        } // end of zero max_seqnum check
1494    } // end of durable snapshot locating
1495
1496    handle->btreeblkops = btreeblk_get_ops();
1497    handle->bhandle = (struct btreeblk_handle *)
1498                      calloc(1, sizeof(struct btreeblk_handle));
1499    handle->bhandle->log_callback = &handle->log_callback;
1500
1501    handle->dirty_updates = 0;
1502
1503    if (handle->config.compaction_buf_maxsize == 0) {
1504        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1505    }
1506
1507    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1508
1509    handle->cur_header_revnum = header_revnum;
1510    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1511
1512    memset(&empty_stat, 0x0, sizeof(empty_stat));
1513    _kvs_stat_get(handle->file, 0, &stat);
1514    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1515        // sync (default) KVS stat with DB header
1516        stat.nlivenodes = nlivenodes;
1517        stat.ndocs = ndocs;
1518        stat.datasize = datasize;
1519        _kvs_stat_set(handle->file, 0, stat);
1520    }
1521
1522    if (handle->config.multi_kv_instances) {
1523        // multi KV instance mode
1524        filemgr_mutex_lock(handle->file);
1525        if (kv_info_offset == BLK_NOT_FOUND) {
1526            // there is no KV header .. create & initialize
1527            fdb_kvs_header_create(handle->file);
1528            kv_info_offset = fdb_kvs_header_append(handle->file, handle->dhandle);
1529        } else if (handle->file->kv_header == NULL) {
1530            // KV header already exists but not loaded .. read & import
1531            fdb_kvs_header_create(handle->file);
1532            fdb_kvs_header_read(handle->file, handle->dhandle, kv_info_offset, false);
1533        }
1534        filemgr_mutex_unlock(handle->file);
1535
1536        // validation check for key order of all KV stores
1537        if (handle == handle->fhandle->root) {
1538            fdb_status fs = fdb_kvs_cmp_check(handle);
1539            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1540                docio_free(handle->dhandle);
1541                free(handle->dhandle);
1542                btreeblk_free(handle->bhandle);
1543                free(handle->bhandle);
1544                free(handle->filename);
1545                handle->filename = NULL;
1546                filemgr_close(handle->file, false, handle->filename,
1547                              &handle->log_callback);
1548                return fs;
1549            }
1550        }
1551    }
1552    handle->kv_info_offset = kv_info_offset;
1553
1554    if (handle->kv_info_offset != BLK_NOT_FOUND &&
1555        handle->kvs == NULL) {
1556        // multi KV instance mode .. turn on config flag
1557        handle->config.multi_kv_instances = true;
1558        // only super handle can be opened using fdb_open(...)
1559        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
1560    }
1561
1562    if (handle->shandle) { // Populate snapshot stats..
1563        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
1564            memset(&handle->shandle->stat, 0x0,
1565                    sizeof(handle->shandle->stat));
1566            handle->shandle->stat.ndocs = ndocs;
1567            handle->shandle->stat.datasize = datasize;
1568            handle->shandle->stat.nlivenodes = nlivenodes;
1569        } else { // Multi KV instance mode, populate specific kv stats
1570            memset(&handle->shandle->stat, 0x0,
1571                    sizeof(handle->shandle->stat));
1572            _kvs_stat_get(handle->file, handle->kvs->id,
1573                    &handle->shandle->stat);
1574            // Since wal is restored below, we have to reset
1575            // wal stats to zero.
1576            handle->shandle->stat.wal_ndeletes = 0;
1577            handle->shandle->stat.wal_ndocs = 0;
1578        }
1579    }
1580
1581    // initialize pointer to the global operational stats of this KV store
1582    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
1583    fdb_assert(handle->op_stats, 0, 0);
1584
1585    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1586    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
1587                handle->file->blocksize, trie_root_bid,
1588                (void *)handle->bhandle, handle->btreeblkops,
1589                (void *)handle->dhandle, _fdb_readkey_wrap);
1590    // set aux for cmp wrapping function
1591    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
1592    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
1593
1594    if (handle->kvs) {
1595        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
1596    }
1597
1598    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1599        handle->seqnum = seqnum;
1600
1601        if (handle->config.multi_kv_instances) {
1602            // multi KV instance mode .. HB+trie
1603            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1604            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1605                        handle->file->blocksize, seq_root_bid,
1606                        (void *)handle->bhandle, handle->btreeblkops,
1607                        (void *)handle->dhandle, _fdb_readseq_wrap);
1608
1609        } else {
1610            // single KV instance mode .. normal B+tree
1611            struct btree_kv_ops *seq_kv_ops =
1612                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1613            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1614            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1615
1616            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
1617            if (seq_root_bid == BLK_NOT_FOUND) {
1618                btree_init(handle->seqtree, (void *)handle->bhandle,
1619                           handle->btreeblkops, seq_kv_ops,
1620                           handle->config.blocksize, sizeof(fdb_seqnum_t),
1621                           OFFSET_SIZE, 0x0, NULL);
1622             }else{
1623                 btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
1624                                     handle->btreeblkops, seq_kv_ops,
1625                                     handle->config.blocksize, seq_root_bid);
1626             }
1627        }
1628    }else{
1629        handle->seqtree = NULL;
1630    }
1631
1632    if (handle->config.multi_kv_instances && handle->max_seqnum) {
1633        // restore only docs belonging to the KV instance
1634        // handle->kvs should not be NULL
1635        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
1636                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
1637    } else {
1638        // normal restore
1639        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
1640    }
1641
1642    if (compacted_filename &&
1643        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
1644        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
1645        _fdb_recover_compaction(handle, compacted_filename);
1646    }
1647
1648    if (prev_filename) {
1649        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
1650            // record the old filename into the file handle of current file
1651            // and REMOVE old file on the first open
1652            // WARNING: snapshots must have been opened before this call
1653            if (filemgr_update_file_status(handle->file,
1654                                           filemgr_get_file_status(handle->file),
1655                                           prev_filename)) {
1656                // Open the old file with read-only mode.
1657                // (Temporarily disable log callback at this time since
1658                //  the old file might be already removed.)
1659                fconfig.options = FILEMGR_READONLY;
1660                filemgr_open_result result = filemgr_open(prev_filename,
1661                                                          handle->fileops,
1662                                                          &fconfig,
1663                                                          NULL);
1664                if (result.file) {
1665                    filemgr_remove_pending(result.file, handle->file);
1666                    filemgr_close(result.file, 0, handle->filename,
1667                                  &handle->log_callback);
1668                }
1669            }
1670        } else {
1671            free(prev_filename);
1672        }
1673    }
1674
1675    status = btreeblk_end(handle->bhandle);
1676    fdb_assert(status == FDB_RESULT_SUCCESS, status, handle);
1677
1678    // do not register read-only handles
1679    if (!(config->flags & FDB_OPEN_FLAG_RDONLY) &&
1680        config->compaction_mode == FDB_COMPACTION_AUTO) {
1681        status = compactor_register_file(handle->file, (fdb_config *)config,
1682                                         handle->fhandle->cmp_func_list,
1683                                         &handle->log_callback);
1684    }
1685
1686#ifdef _TRACE_HANDLES
1687    spin_lock(&open_handle_lock);
1688    avl_insert(&open_handles, &handle->avl_trace, _fdb_handle_cmp);
1689    spin_unlock(&open_handle_lock);
1690#endif
1691    return status;
1692}
1693
1694LIBFDB_API
1695fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
1696                                fdb_log_callback log_callback,
1697                                void *ctx_data)
1698{
1699    handle->log_callback.callback = log_callback;
1700    handle->log_callback.ctx_data = ctx_data;
1701    return FDB_RESULT_SUCCESS;
1702}
1703
1704LIBFDB_API
1705fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
1706                          const void *meta, size_t metalen,
1707                          const void *body, size_t bodylen)
1708{
1709    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
1710        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1711        return FDB_RESULT_INVALID_ARGS;
1712    }
1713
1714    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
1715    if (*doc == NULL) { // LCOV_EXCL_START
1716        return FDB_RESULT_ALLOC_FAIL;
1717    } // LCOV_EXCL_STOP
1718
1719    (*doc)->seqnum = SEQNUM_NOT_USED;
1720
1721    if (key && keylen > 0) {
1722        (*doc)->key = (void *)malloc(keylen);
1723        if ((*doc)->key == NULL) { // LCOV_EXCL_START
1724            return FDB_RESULT_ALLOC_FAIL;
1725        } // LCOV_EXCL_STOP
1726        memcpy((*doc)->key, key, keylen);
1727        (*doc)->keylen = keylen;
1728    } else {
1729        (*doc)->key = NULL;
1730        (*doc)->keylen = 0;
1731    }
1732
1733    if (meta && metalen > 0) {
1734        (*doc)->meta = (void *)malloc(metalen);
1735        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1736            return FDB_RESULT_ALLOC_FAIL;
1737        } // LCOV_EXCL_STOP
1738        memcpy((*doc)->meta, meta, metalen);
1739        (*doc)->metalen = metalen;
1740    } else {
1741        (*doc)->meta = NULL;
1742        (*doc)->metalen = 0;
1743    }
1744
1745    if (body && bodylen > 0) {
1746        (*doc)->body = (void *)malloc(bodylen);
1747        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1748            return FDB_RESULT_ALLOC_FAIL;
1749        } // LCOV_EXCL_STOP
1750        memcpy((*doc)->body, body, bodylen);
1751        (*doc)->bodylen = bodylen;
1752    } else {
1753        (*doc)->body = NULL;
1754        (*doc)->bodylen = 0;
1755    }
1756
1757    (*doc)->size_ondisk = 0;
1758    (*doc)->deleted = false;
1759
1760    return FDB_RESULT_SUCCESS;
1761}
1762
1763LIBFDB_API
1764fdb_status fdb_doc_update(fdb_doc **doc,
1765                          const void *meta, size_t metalen,
1766                          const void *body, size_t bodylen)
1767{
1768    if (doc == NULL ||
1769        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1770        return FDB_RESULT_INVALID_ARGS;
1771    }
1772    if (*doc == NULL) {
1773        return FDB_RESULT_INVALID_ARGS;
1774    }
1775
1776    if (meta && metalen > 0) {
1777        // free previous metadata
1778        free((*doc)->meta);
1779        // allocate new metadata
1780        (*doc)->meta = (void *)malloc(metalen);
1781        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1782            return FDB_RESULT_ALLOC_FAIL;
1783        } // LCOV_EXCL_STOP
1784        memcpy((*doc)->meta, meta, metalen);
1785        (*doc)->metalen = metalen;
1786    }
1787
1788    if (body && bodylen > 0) {
1789        // free previous body
1790        free((*doc)->body);
1791        // allocate new body
1792        (*doc)->body = (void *)malloc(bodylen);
1793        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1794            return FDB_RESULT_ALLOC_FAIL;
1795        } // LCOV_EXCL_STOP
1796        memcpy((*doc)->body, body, bodylen);
1797        (*doc)->bodylen = bodylen;
1798    }
1799
1800    return FDB_RESULT_SUCCESS;
1801}
1802
1803// doc MUST BE allocated by malloc
1804LIBFDB_API
1805fdb_status fdb_doc_free(fdb_doc *doc)
1806{
1807    if (doc) {
1808        free(doc->key);
1809        free(doc->meta);
1810        free(doc->body);
1811        free(doc);
1812    }
1813    return FDB_RESULT_SUCCESS;
1814}
1815
1816INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
1817                                        struct wal_item *item)
1818{
1819    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1820    uint64_t old_offset = 0;
1821
1822    hbtrie_find_offset(handle->trie,
1823                       item->header->key,
1824                       item->header->keylen,
1825                       (void*)&old_offset);
1826    btreeblk_end(handle->bhandle);
1827    old_offset = _endian_decode(old_offset);
1828
1829    return old_offset;
1830}
1831
1832INLINE fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
1833                                         uint64_t offset) {
1834
1835    return snap_insert((struct snap_handle *)handle, doc, offset);
1836}
1837
1838INLINE fdb_status _fdb_wal_flush_func(void *voidhandle, struct wal_item *item)
1839{
1840    hbtrie_result hr;
1841    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1842    fdb_seqnum_t _seqnum;
1843    fdb_kvs_id_t kv_id;
1844    fdb_status fs = FDB_RESULT_SUCCESS;
1845    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
1846    int size_id, size_seq;
1847    uint8_t *kvid_seqnum;
1848    uint64_t old_offset, _offset;
1849    int delta, r;
1850    struct filemgr *file = handle->dhandle->file;
1851    struct kvs_stat stat;
1852
1853    memset(var_key, 0, handle->config.chunksize);
1854    if (handle->kvs) {
1855        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
1856    } else {
1857        kv_id = 0;
1858    }
1859
1860    if (item->action == WAL_ACT_INSERT ||
1861        item->action == WAL_ACT_LOGICAL_REMOVE) {
1862        _offset = _endian_encode(item->offset);
1863
1864        r = _kvs_stat_get(file, kv_id, &stat);
1865        if (r != 0) {
1866            // KV store corresponding to kv_id is already removed
1867            // skip this item
1868            return FDB_RESULT_SUCCESS;
1869        }
1870        handle->bhandle->nlivenodes = stat.nlivenodes;
1871
1872        hr = hbtrie_insert(handle->trie,
1873                           item->header->key,
1874                           item->header->keylen,
1875                           (void *)&_offset,
1876                           (void *)&old_offset);
1877
1878        fs = btreeblk_end(handle->bhandle);
1879        if (fs != FDB_RESULT_SUCCESS) {
1880            return fs;
1881        }
1882        old_offset = _endian_decode(old_offset);
1883
1884        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1885            _seqnum = _endian_encode(item->seqnum);
1886            if (handle->kvs) {
1887                // multi KV instance mode .. HB+trie
1888                uint64_t old_offset_local;
1889
1890                size_id = sizeof(fdb_kvs_id_t);
1891                size_seq = sizeof(fdb_seqnum_t);
1892                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1893                kvid2buf(size_id, kv_id, kvid_seqnum);
1894                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1895                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
1896                              (void *)&_offset, (void *)&old_offset_local);
1897            } else {
1898                btree_insert(handle->seqtree, (void *)&_seqnum,
1899                             (void *)&_offset);
1900            }
1901            fs = btreeblk_end(handle->bhandle);
1902            if (fs != FDB_RESULT_SUCCESS) {
1903                return fs;
1904            }
1905        }
1906
1907        delta = (int)handle->bhandle->nlivenodes - (int)stat.nlivenodes;
1908        _kvs_stat_update_attr(file, kv_id, KVS_STAT_NLIVENODES, delta);
1909
1910        if (hr == HBTRIE_RESULT_SUCCESS) {
1911            if (item->action == WAL_ACT_INSERT) {
1912                _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1913            }
1914            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE,
1915                                  item->doc_size);
1916        } else { // update or logical delete
1917            struct docio_length len;
1918            // This block is already cached when we call HBTRIE_INSERT.
1919            // No additional block access.
1920            len = docio_read_doc_length(handle->dhandle, old_offset);
1921
1922            if (!(len.flag & DOCIO_DELETED)) {
1923                if (item->action == WAL_ACT_LOGICAL_REMOVE) {
1924                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1925                }
1926            } else {
1927                if (item->action == WAL_ACT_INSERT) {
1928                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1929                }
1930            }
1931
1932            delta = (int)item->doc_size - (int)_fdb_get_docsize(len);
1933            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1934        }
1935    } else {
1936        // Immediate remove
1937        // LCOV_EXCL_START
1938        hr = hbtrie_remove(handle->trie, item->header->key,
1939                           item->header->keylen);
1940        fs = btreeblk_end(handle->bhandle);
1941        if (fs != FDB_RESULT_SUCCESS) {
1942            return fs;
1943        }
1944
1945        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1946            _seqnum = _endian_encode(item->seqnum);
1947            if (handle->kvs) {
1948                // multi KV instance mode .. HB+trie
1949                size_id = sizeof(fdb_kvs_id_t);
1950                size_seq = sizeof(fdb_seqnum_t);
1951                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1952                kvid2buf(size_id, kv_id, kvid_seqnum);
1953                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1954
1955                hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
1956                              size_id + size_seq);
1957            } else {
1958                btree_remove(handle->seqtree, (void*)&_seqnum);
1959            }
1960            fs = btreeblk_end(handle->bhandle);
1961            if (fs != FDB_RESULT_SUCCESS) {
1962                return fs;
1963            }
1964        }
1965
1966        if (hr == HBTRIE_RESULT_SUCCESS) {
1967            _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1968            delta = -(int)item->doc_size;
1969            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1970        }
1971        // LCOV_EXCL_STOP
1972    }
1973    return FDB_RESULT_SUCCESS;
1974}
1975
1976void fdb_sync_db_header(fdb_kvs_handle *handle)
1977{
1978    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
1979    if (handle->cur_header_revnum != cur_revnum) {
1980        void *header_buf = NULL;
1981        size_t header_len;
1982
1983        handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
1984        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
1985                                        NULL, NULL, NULL);
1986        if (header_len > 0) {
1987            uint64_t header_flags, dummy64;
1988            bid_t idtree_root;
1989            bid_t new_seq_root;
1990            char *compacted_filename;
1991            char *prev_filename = NULL;
1992
1993            fdb_fetch_header(header_buf, &idtree_root,
1994                             &new_seq_root,
1995                             &dummy64, &dummy64,
1996                             &dummy64, &handle->last_wal_flush_hdr_bid,
1997                             &handle->kv_info_offset, &header_flags,
1998                             &compacted_filename, &prev_filename);
1999
2000            if (handle->dirty_updates) {
2001                // discard all cached writable b+tree nodes
2002                // to avoid data inconsistency with other writers
2003                btreeblk_discard_blocks(handle->bhandle);
2004            }
2005
2006            handle->trie->root_bid = idtree_root;
2007
2008            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2009                if (new_seq_root != handle->seqtree->root_bid) {
2010                    if (handle->config.multi_kv_instances) {
2011                        handle->seqtrie->root_bid = new_seq_root;
2012                    } else {
2013                        btree_init_from_bid(handle->seqtree,
2014                                            handle->seqtree->blk_handle,
2015                                            handle->seqtree->blk_ops,
2016                                            handle->seqtree->kv_ops,
2017                                            handle->seqtree->blksize,
2018                                            new_seq_root);
2019                    }
2020                }
2021            }
2022
2023            if (prev_filename) {
2024                free(prev_filename);
2025            }
2026
2027            handle->cur_header_revnum = cur_revnum;
2028            handle->dirty_updates = 0;
2029            if (handle->kvs) {
2030                // multiple KV instance mode AND sub handle
2031                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2032                                                    handle->kvs->id);
2033            } else {
2034                // super handle OR single KV instance mode
2035                handle->seqnum = filemgr_get_seqnum(handle->file);
2036            }
2037        }
2038        if (header_buf) {
2039            free(header_buf);
2040        }
2041    }
2042}
2043
2044fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2045{
2046    fdb_status fs = FDB_RESULT_SUCCESS;
2047    file_status_t fstatus = filemgr_get_file_status(handle->file);
2048    // check whether the compaction is done
2049    if (fstatus == FILE_REMOVED_PENDING) {
2050        uint64_t ndocs, datasize, nlivenodes, last_wal_flush_hdr_bid;
2051        uint64_t kv_info_offset, header_flags;
2052        size_t header_len;
2053        char *new_filename;
2054        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2055        bid_t trie_root_bid, seq_root_bid;
2056        fdb_config config = handle->config;
2057
2058        // close the current file and newly open the new file
2059        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2060            // compaction daemon mode .. just close and then open
2061            char filename[FDB_MAX_FILENAME_LEN];
2062            strcpy(filename, handle->filename);
2063            fs = _fdb_close(handle);
2064            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2065            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2066            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2067        } else {
2068            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2069            fdb_fetch_header(buf,
2070                             &trie_root_bid, &seq_root_bid,
2071                             &ndocs, &nlivenodes, &datasize, &last_wal_flush_hdr_bid,
2072                             &kv_info_offset, &header_flags,
2073                             &new_filename, NULL);
2074            fs = _fdb_close(handle);
2075            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2076            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2077            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2078        }
2079    }
2080    if (status) {
2081        *status = fstatus;
2082    }
2083    return fs;
2084}
2085
2086static bool _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2087{
2088    bool locked = false;
2089    bid_t dirty_idtree_root, dirty_seqtree_root;
2090
2091    if (handle->shandle) {
2092        // skip snapshot
2093        return locked;
2094    }
2095
2096    if ( ( handle->dirty_updates ||
2097           filemgr_dirty_root_exist(handle->file) )  &&
2098         filemgr_get_header_bid(handle->file) == handle->last_hdr_bid ) {
2099        // 1) { a) dirty WAL flush by this handle exists OR
2100        //      b) dirty WAL flush by other handle exists } AND
2101        // 2) no commit was performed yet.
2102        // grab lock for writer
2103        filemgr_mutex_lock(handle->file);
2104        locked = true;
2105
2106        // get dirty root nodes
2107        filemgr_get_dirty_root(handle->file,
2108                               &dirty_idtree_root, &dirty_seqtree_root);
2109        if (dirty_idtree_root != BLK_NOT_FOUND) {
2110            handle->trie->root_bid = dirty_idtree_root;
2111        }
2112        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2113            if (dirty_seqtree_root != BLK_NOT_FOUND) {
2114                if (handle->kvs) {
2115                    handle->seqtrie->root_bid = dirty_seqtree_root;
2116                } else {
2117                    btree_init_from_bid(handle->seqtree,
2118                                        handle->seqtree->blk_handle,
2119                                        handle->seqtree->blk_ops,
2120                                        handle->seqtree->kv_ops,
2121                                        handle->seqtree->blksize,
2122                                        dirty_seqtree_root);
2123                }
2124            }
2125        }
2126        btreeblk_discard_blocks(handle->bhandle);
2127    }
2128    return locked;
2129}
2130
2131LIBFDB_API
2132fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2133{
2134    uint64_t offset, _offset;
2135    struct docio_object _doc;
2136    struct filemgr *wal_file = NULL;
2137    struct docio_handle *dhandle;
2138    fdb_status wr;
2139    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2140    fdb_txn *txn;
2141    fdb_doc doc_kv = *doc;
2142
2143    if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2144        doc->keylen > FDB_MAX_KEYLEN ||
2145        (handle->kvs_config.custom_cmp &&
2146            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2147        return FDB_RESULT_INVALID_ARGS;
2148    }
2149
2150    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2151        return FDB_RESULT_HANDLE_BUSY;
2152    }
2153
2154    if (handle->kvs) {
2155        // multi KV instance mode
2156        int size_chunk = handle->config.chunksize;
2157        doc_kv.keylen = doc->keylen + size_chunk;
2158        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2159        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2160        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2161    }
2162
2163    if (!handle->shandle) {
2164        fdb_check_file_reopen(handle, NULL);
2165        fdb_sync_db_header(handle);
2166
2167        wal_file = handle->file;
2168        dhandle = handle->dhandle;
2169
2170        txn = handle->fhandle->root->txn;
2171        if (!txn) {
2172            txn = &wal_file->global_txn;
2173        }
2174        if (handle->kvs) {
2175            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2176        } else {
2177            wr = wal_find(txn, wal_file, doc, &offset);
2178        }
2179    } else {
2180        if (handle->kvs) {
2181            wr = snap_find(handle->shandle, &doc_kv, &offset);
2182        } else {
2183            wr = snap_find(handle->shandle, doc, &offset);
2184        }
2185        dhandle = handle->dhandle;
2186    }
2187
2188    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2189
2190    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2191        bool locked = _fdb_sync_dirty_root(handle);
2192
2193        if (handle->kvs) {
2194            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2195                             (void *)&offset);
2196        } else {
2197            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2198                             (void *)&offset);
2199        }
2200        btreeblk_end(handle->bhandle);
2201        offset = _endian_decode(offset);
2202
2203        if (locked) {
2204            // grab lock for writer if there are dirty updates
2205            filemgr_mutex_unlock(handle->file);
2206        }
2207    }
2208
2209    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2210        bool alloced_meta = doc->meta ? false : true;
2211        bool alloced_body = doc->body ? false : true;
2212        if (handle->kvs) {
2213            _doc.key = doc_kv.key;
2214            _doc.length.keylen = doc_kv.keylen;
2215        } else {
2216            _doc.key = doc->key;
2217            _doc.length.keylen = doc->keylen;
2218        }
2219        _doc.meta = doc->meta;
2220        _doc.body = doc->body;
2221
2222        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2223            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2224            return FDB_RESULT_KEY_NOT_FOUND;
2225        }
2226
2227        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2228        if (_offset == offset) {
2229            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2230            return FDB_RESULT_KEY_NOT_FOUND;
2231        }
2232
2233        if (_doc.length.keylen != doc_kv.keylen ||
2234            _doc.length.flag & DOCIO_DELETED) {
2235            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2236            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2237            return FDB_RESULT_KEY_NOT_FOUND;
2238        }
2239
2240        doc->seqnum = _doc.seqnum;
2241        doc->metalen = _doc.length.metalen;
2242        doc->bodylen = _doc.length.bodylen;
2243        doc->meta = _doc.meta;
2244        doc->body = _doc.body;
2245        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2246        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2247        doc->offset = offset;
2248
2249        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2250        return FDB_RESULT_SUCCESS;
2251    }
2252
2253    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2254    return FDB_RESULT_KEY_NOT_FOUND;
2255}
2256
2257// search document metadata using key
2258LIBFDB_API
2259fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
2260{
2261    uint64_t offset;
2262    struct docio_object _doc;
2263    struct docio_handle *dhandle;
2264    struct filemgr *wal_file = NULL;
2265    fdb_status wr;
2266    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2267    fdb_txn *txn;
2268    fdb_doc doc_kv = *doc;
2269
2270    if (!handle || !doc || !doc->key ||
2271        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
2272        (handle->kvs_config.custom_cmp &&
2273            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2274        return FDB_RESULT_INVALID_ARGS;
2275    }
2276
2277    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2278        return FDB_RESULT_HANDLE_BUSY;
2279    }
2280
2281    if (handle->kvs) {
2282        // multi KV instance mode
2283        int size_chunk = handle->config.chunksize;
2284        doc_kv.keylen = doc->keylen + size_chunk;
2285        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2286        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2287        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2288    }
2289
2290    if (!handle->shandle) {
2291        fdb_check_file_reopen(handle, NULL);
2292        fdb_sync_db_header(handle);
2293
2294        wal_file = handle->file;
2295        dhandle = handle->dhandle;
2296
2297        txn = handle->fhandle->root->txn;
2298        if (!txn) {
2299            txn = &wal_file->global_txn;
2300        }
2301        if (handle->kvs) {
2302            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2303        } else {
2304            wr = wal_find(txn, wal_file, doc, &offset);
2305        }
2306    } else {
2307        if (handle->kvs) {
2308            wr = snap_find(handle->shandle, &doc_kv, &offset);
2309        } else {
2310            wr = snap_find(handle->shandle, doc, &offset);
2311        }
2312        dhandle = handle->dhandle;
2313    }
2314
2315    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2316
2317    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2318        bool locked = _fdb_sync_dirty_root(handle);
2319
2320        if (handle->kvs) {
2321            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2322                             (void *)&offset);
2323        } else {
2324            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2325                             (void *)&offset);
2326        }
2327        btreeblk_end(handle->bhandle);
2328        offset = _endian_decode(offset);
2329
2330        if (locked) {
2331            filemgr_mutex_unlock(handle->file);
2332        }
2333    }
2334
2335    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2336        if (handle->kvs) {
2337            _doc.key = doc_kv.key;
2338            _doc.length.keylen = doc_kv.keylen;
2339        } else {
2340            _doc.key = doc->key;
2341            _doc.length.keylen = doc->keylen;
2342        }
2343        bool alloced_meta = doc->meta ? false : true;
2344        _doc.meta = doc->meta;
2345        _doc.body = doc->body;
2346
2347        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2348                                                       true);
2349        if (body_offset == offset){
2350            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2351            return FDB_RESULT_KEY_NOT_FOUND;
2352        }
2353
2354        if (_doc.length.keylen != doc_kv.keylen) {
2355            free_docio_object(&_doc, 0, alloced_meta, 0);
2356            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2357            return FDB_RESULT_KEY_NOT_FOUND;
2358        }
2359
2360        doc->seqnum = _doc.seqnum;
2361        doc->metalen = _doc.length.metalen;
2362        doc->bodylen = _doc.length.bodylen;
2363        doc->meta = _doc.meta;
2364        doc->body = _doc.body;
2365        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2366        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2367        doc->offset = offset;
2368
2369        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2370        return FDB_RESULT_SUCCESS;
2371    }
2372
2373    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2374    return FDB_RESULT_KEY_NOT_FOUND;
2375}
2376
2377// search document using sequence number
2378LIBFDB_API
2379fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2380{
2381    uint64_t offset, _offset;
2382    struct docio_object _doc;
2383    struct docio_handle *dhandle;
2384    struct filemgr *wal_file = NULL;
2385    fdb_status wr;
2386    btree_result br = BTREE_RESULT_FAIL;
2387    fdb_seqnum_t _seqnum;
2388    fdb_txn *txn;
2389
2390    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2391        return FDB_RESULT_INVALID_ARGS;
2392    }
2393
2394    // Sequence trees are a must for byseq operations
2395    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2396        return FDB_RESULT_INVALID_CONFIG;
2397    }
2398
2399    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2400        return FDB_RESULT_HANDLE_BUSY;
2401    }
2402
2403    if (!handle->shandle) {
2404        fdb_check_file_reopen(handle, NULL);
2405        fdb_sync_db_header(handle);
2406
2407        wal_file = handle->file;
2408        dhandle = handle->dhandle;
2409
2410        txn = handle->fhandle->root->txn;
2411        if (!txn) {
2412            txn = &wal_file->global_txn;
2413        }
2414        // prevent searching by key in WAL if 'doc' is not empty
2415        size_t key_len = doc->keylen;
2416        doc->keylen = 0;
2417        if (handle->kvs) {
2418            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2419        } else {
2420            wr = wal_find(txn, wal_file, doc, &offset);
2421        }
2422        doc->keylen = key_len;
2423    } else {
2424        wr = snap_find(handle->shandle, doc, &offset);
2425        dhandle = handle->dhandle;
2426    }
2427
2428    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2429
2430    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2431        bool locked = _fdb_sync_dirty_root(handle);
2432
2433        _seqnum = _endian_encode(doc->seqnum);
2434        if (handle->kvs) {
2435            int size_id, size_seq;
2436            uint8_t *kv_seqnum;
2437            hbtrie_result hr;
2438            fdb_kvs_id_t _kv_id;
2439
2440            _kv_id = _endian_encode(handle->kvs->id);
2441            size_id = sizeof(fdb_kvs_id_t);
2442            size_seq = sizeof(fdb_seqnum_t);
2443            kv_seqnum = alca(uint8_t, size_id + size_seq);
2444            memcpy(kv_seqnum, &_kv_id, size_id);
2445            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2446            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2447                             size_id + size_seq, (void *)&offset);
2448            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2449        } else {
2450            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2451        }
2452        btreeblk_end(handle->bhandle);
2453        offset = _endian_decode(offset);
2454
2455        if (locked) {
2456            filemgr_mutex_unlock(handle->file);
2457        }
2458    }
2459
2460    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2461        bool alloc_key, alloc_meta, alloc_body;
2462        if (!handle->kvs) { // single KVS mode
2463            _doc.key = doc->key;
2464            _doc.length.keylen = doc->keylen;
2465            alloc_key = doc->key ? false : true;
2466        } else {
2467            _doc.key = NULL;
2468            alloc_key = true;
2469        }
2470        alloc_meta = doc->meta ? false : true;
2471        _doc.meta = doc->meta;
2472        alloc_body = doc->body ? false : true;
2473        _doc.body = doc->body;
2474
2475        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2476            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2477            return FDB_RESULT_KEY_NOT_FOUND;
2478        }
2479
2480        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2481        if (_offset == offset) {
2482            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2483            return FDB_RESULT_KEY_NOT_FOUND;
2484        }
2485
2486        if (_doc.length.flag & DOCIO_DELETED) {
2487            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2488            free_docio_object(&_doc, alloc_key, alloc_meta, alloc_body);
2489            return FDB_RESULT_KEY_NOT_FOUND;
2490        }
2491
2492        doc->seqnum = _doc.seqnum;
2493        if (handle->kvs) {
2494            int size_chunk = handle->config.chunksize;
2495            doc->keylen = _doc.length.keylen - size_chunk;
2496            if (doc->key) { // doc->key is given by user
2497                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2498                free_docio_object(&_doc, 1, 0, 0);
2499            } else {
2500                doc->key = _doc.key;
2501                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2502            }
2503        } else {
2504            doc->keylen = _doc.length.keylen;
2505            doc->key = _doc.key;
2506        }
2507        doc->metalen = _doc.length.metalen;
2508        doc->bodylen = _doc.length.bodylen;
2509        doc->meta = _doc.meta;
2510        doc->body = _doc.body;
2511        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2512        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2513        doc->offset = offset;
2514
2515        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2516
2517        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2518        return FDB_RESULT_SUCCESS;
2519    }
2520
2521    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2522    return FDB_RESULT_KEY_NOT_FOUND;
2523}
2524
2525// search document metadata using sequence number
2526LIBFDB_API
2527fdb_status fdb_get_metaonly_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2528{
2529    uint64_t offset;
2530    struct docio_object _doc;
2531    struct docio_handle *dhandle;
2532    struct filemgr *wal_file = NULL;
2533    fdb_status wr;
2534    btree_result br = BTREE_RESULT_FAIL;
2535    fdb_seqnum_t _seqnum;
2536    fdb_txn *txn = handle->fhandle->root->txn;
2537
2538    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2539        return FDB_RESULT_INVALID_ARGS;
2540    }
2541
2542    // Sequence trees are a must for byseq operations
2543    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2544        return FDB_RESULT_INVALID_CONFIG;
2545    }
2546
2547    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2548        return FDB_RESULT_HANDLE_BUSY;
2549    }
2550
2551    if (!handle->shandle) {
2552        fdb_check_file_reopen(handle, NULL);
2553        fdb_sync_db_header(handle);
2554
2555        wal_file = handle->file;
2556        dhandle = handle->dhandle;
2557
2558        if (!txn) {
2559            txn = &wal_file->global_txn;
2560        }
2561        // prevent searching by key in WAL if 'doc' is not empty
2562        size_t key_len = doc->keylen;
2563        doc->keylen = 0;
2564        if (handle->kvs) {
2565            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2566        } else {
2567            wr = wal_find(txn, wal_file, doc, &offset);
2568        }
2569        doc->keylen = key_len;
2570    } else {
2571        wr = snap_find(handle->shandle, doc, &offset);
2572        dhandle = handle->dhandle;
2573    }
2574
2575    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2576
2577    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2578        bool locked = _fdb_sync_dirty_root(handle);
2579
2580        _seqnum = _endian_encode(doc->seqnum);
2581        if (handle->kvs) {
2582            int size_id, size_seq;
2583            uint8_t *kv_seqnum;
2584            hbtrie_result hr;
2585            fdb_kvs_id_t _kv_id;
2586
2587            _kv_id = _endian_encode(handle->kvs->id);
2588            size_id = sizeof(fdb_kvs_id_t);
2589            size_seq = sizeof(fdb_seqnum_t);
2590            kv_seqnum = alca(uint8_t, size_id + size_seq);
2591            memcpy(kv_seqnum, &_kv_id, size_id);
2592            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2593            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2594                             size_id + size_seq, (void *)&offset);
2595            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2596        } else {
2597            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2598        }
2599        btreeblk_end(handle->bhandle);
2600        offset = _endian_decode(offset);
2601
2602        if (locked) {
2603            filemgr_mutex_unlock(handle->file);
2604        }
2605    }
2606
2607    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2608        if (!handle->kvs) { // single KVS mode
2609            _doc.key = doc->key;
2610            _doc.length.keylen = doc->keylen;
2611        } else {
2612            _doc.key = NULL;
2613        }
2614        _doc.meta = doc->meta;
2615        _doc.body = doc->body;
2616
2617        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2618                                                       true);
2619        if (body_offset == offset) {
2620            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2621            return FDB_RESULT_KEY_NOT_FOUND;
2622        }
2623
2624        if (handle->kvs) {
2625            int size_chunk = handle->config.chunksize;
2626            doc->keylen = _doc.length.keylen - size_chunk;
2627            if (doc->key) { // doc->key is given by user
2628                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2629                free_docio_object(&_doc, 1, 0, 0);
2630            } else {
2631                doc->key = _doc.key;
2632                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2633            }
2634        } else {
2635            doc->keylen = _doc.length.keylen;
2636            doc->key = _doc.key;
2637        }
2638        doc->metalen = _doc.length.metalen;
2639        doc->bodylen = _doc.length.bodylen;
2640        doc->meta = _doc.meta;
2641        doc->body = _doc.body;
2642        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2643        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2644        doc->offset = offset;
2645
2646        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2647
2648        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2649        return FDB_RESULT_SUCCESS;
2650    }
2651
2652    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2653    return FDB_RESULT_KEY_NOT_FOUND;
2654}
2655
2656static uint8_t equal_docs(fdb_doc *doc, struct docio_object *_doc) {
2657    uint8_t rv = 1;
2658    // Compare a seq num if seq tree is enabled.
2659    if (doc->seqnum != SEQNUM_NOT_USED) {
2660        if (doc->seqnum != _doc->seqnum) {
2661            free(_doc->key);
2662            free(_doc->meta);
2663            free(_doc->body);
2664            _doc->key = _doc->meta = _doc->body = NULL;
2665            rv = 0;
2666        }
2667    } else { // Compare key and metadata
2668        if ((doc->key && memcmp(doc->key, _doc->key, doc->keylen)) ||
2669            (doc->meta && memcmp(doc->meta, _doc->meta, doc->metalen))) {
2670            free(_doc->key);
2671            free(_doc->meta);
2672            free(_doc->body);
2673            _doc->key = _doc->meta = _doc->body = NULL;
2674            rv = 0;
2675        }
2676    }
2677    return rv;
2678}
2679
2680INLINE void _remove_kv_id(fdb_kvs_handle *handle, struct docio_object *doc)
2681{
2682    size_t size_chunk = handle->config.chunksize;
2683    doc->length.keylen -= size_chunk;
2684    memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->length.keylen);
2685}
2686
2687// Retrieve a doc's metadata and body with a given doc offset in the database file.
2688LIBFDB_API
2689fdb_status fdb_get_byoffset(fdb_kvs_handle *handle, fdb_doc *doc)
2690{
2691    uint64_t offset = doc->offset;
2692    struct docio_object _doc;
2693
2694    if (!offset) {
2695        return FDB_RESULT_INVALID_ARGS;
2696    }
2697
2698    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2699        return FDB_RESULT_HANDLE_BUSY;
2700    }
2701
2702    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2703    memset(&_doc, 0, sizeof(struct docio_object));
2704
2705    uint64_t _offset = docio_read_doc(handle->dhandle, offset, &_doc, true);
2706    if (_offset == offset) {
2707        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2708        return FDB_RESULT_KEY_NOT_FOUND;
2709    } else {
2710        if (handle->kvs) {
2711            fdb_kvs_id_t kv_id;
2712            buf2kvid(handle->config.chunksize, _doc.key, &kv_id);
2713            if (kv_id != handle->kvs->id) {
2714                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2715                free_docio_object(&_doc, 1, 1, 1);
2716                return FDB_RESULT_KEY_NOT_FOUND;
2717            }
2718            _remove_kv_id(handle, &_doc);
2719        }
2720        if (!equal_docs(doc, &_doc)) {
2721            free_docio_object(&_doc, 1, 1, 1);
2722            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2723            return FDB_RESULT_KEY_NOT_FOUND;
2724        }
2725    }
2726
2727    doc->seqnum = _doc.seqnum;
2728    doc->keylen = _doc.length.keylen;
2729    doc->metalen = _doc.length.metalen;
2730    doc->bodylen = _doc.length.bodylen;
2731    if (doc->key) {
2732        free(_doc.key);
2733    } else {
2734        doc->key = _doc.key;
2735    }
2736    if (doc->meta) {
2737        free(_doc.meta);
2738    } else {
2739        doc->meta = _doc.meta;
2740    }
2741    if (doc->body) {
2742        if (_doc.length.bodylen > 0) {
2743            memcpy(doc->body, _doc.body, _doc.length.bodylen);
2744        }
2745        free(_doc.body);
2746    } else {
2747        doc->body = _doc.body;
2748    }
2749    doc->deleted = _doc.length.flag & DOCIO_DELETED;
2750    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2751    if (handle->kvs) {
2752        // Since _doc.length was adjusted in _remove_kv_id(),
2753        // we need to compensate it.
2754        doc->size_ondisk += handle->config.chunksize;
2755    }
2756
2757    if (_doc.length.flag & DOCIO_DELETED) {
2758        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2759        return FDB_RESULT_KEY_NOT_FOUND;
2760    }
2761
2762    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2763    return FDB_RESULT_SUCCESS;
2764}
2765
2766INLINE uint64_t _fdb_get_wal_threshold(fdb_kvs_handle *handle)
2767{
2768    return handle->config.wal_threshold;
2769}
2770
2771LIBFDB_API
2772fdb_status fdb_set(fdb_kvs_handle *handle, fdb_doc *doc)
2773{
2774    uint64_t offset;
2775    struct docio_object _doc;
2776    struct filemgr *file;
2777    struct docio_handle *dhandle;
2778    struct timeval tv;
2779    bool txn_enabled = false;
2780    bool sub_handle = false;
2781    bool wal_flushed = false;
2782    file_status_t fstatus;
2783    fdb_txn *txn = handle->fhandle->root->txn;
2784    fdb_status wr = FDB_RESULT_SUCCESS;
2785
2786    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
2787        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
2788                       "Warning: SET is not allowed on the read-only DB file '%s'.",
2789                       handle->file->filename);
2790    }
2791
2792    if ( doc->key == NULL || doc->keylen == 0 ||
2793        doc->keylen > FDB_MAX_KEYLEN ||
2794        (doc->metalen > 0 && doc->meta == NULL) ||
2795        (doc->bodylen > 0 && doc->body == NULL) ||
2796        (handle->kvs_config.custom_cmp &&
2797            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2798        return FDB_RESULT_INVALID_ARGS;
2799    }
2800
2801    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2802        return FDB_RESULT_HANDLE_BUSY;
2803    }
2804
2805    _doc.length.keylen = doc->keylen;
2806    _doc.length.metalen = doc->metalen;
2807    _doc.length.bodylen = doc->deleted ? 0 : doc->bodylen;
2808    _doc.key = doc->key;
2809    _doc.meta = doc->meta;
2810    _doc.body = doc->deleted ? NULL : doc->body;
2811
2812    if (handle->kvs) {
2813        // multi KV instance mode
2814        // allocate more (temporary) space for key, to store ID number
2815        int size_chunk = handle->config.chunksize;
2816        _doc.length.keylen = doc->keylen + size_chunk;
2817        _doc.key = alca(uint8_t, _doc.length.keylen);
2818        // copy ID
2819        kvid2buf(size_chunk, handle->kvs->id, _doc.key);
2820        // copy key
2821        memcpy((uint8_t*)_doc.key + size_chunk, doc->key, doc->keylen);
2822
2823        if (handle->kvs->type == KVS_SUB) {
2824            sub_handle = true;
2825        } else {
2826            sub_handle = false;
2827        }
2828    }
2829
2830fdb_set_start:
2831    fdb_check_file_reopen(handle, NULL);
2832    filemgr_mutex_lock(handle->file);
2833    fdb_sync_db_header(handle);
2834
2835    if (filemgr_is_rollback_on(handle->file)) {
2836        filemgr_mutex_unlock(handle->file);
2837        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2838        return FDB_RESULT_FAIL_BY_ROLLBACK;
2839    }
2840
2841    file = handle->file;
2842    dhandle = handle->dhandle;
2843
2844    fstatus = filemgr_get_file_status(file);
2845    if (fstatus == FILE_REMOVED_PENDING) {
2846        // we must not write into this file
2847        // file status was changed by other thread .. start over
2848        filemgr_mutex_unlock(file);
2849        goto fdb_set_start;
2850    }
2851
2852    if (sub_handle) {
2853        // multiple KV instance mode AND sub handle
2854        handle->seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id) + 1;
2855        fdb_kvs_set_seqnum(file, handle->kvs->id, handle->seqnum);
2856    } else {
2857        // super handle OR single KV instance mode
2858        handle->seqnum = filemgr_get_seqnum(file) + 1;
2859        filemgr_set_seqnum(file, handle->seqnum);
2860    }
2861    _doc.seqnum = doc->seqnum = handle->seqnum;
2862
2863    if (doc->deleted) {
2864        // set timestamp
2865        gettimeofday(&tv, NULL);
2866        _doc.timestamp = (timestamp_t)tv.tv_sec;
2867    } else {
2868        _doc.timestamp = 0;
2869    }
2870
2871    if (txn) {
2872        txn_enabled = true;
2873    }
2874
2875    offset = docio_append_doc(dhandle, &_doc, doc->deleted, txn_enabled);
2876    if (offset == BLK_NOT_FOUND) {
2877        filemgr_mutex_unlock(file);
2878        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2879        return FDB_RESULT_WRITE_FAIL;
2880    }
2881
2882    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2883    doc->offset = offset;
2884    if (!txn) {
2885        txn = &file->global_txn;
2886    }
2887    if (handle->kvs) {
2888        // multi KV instance mode
2889        fdb_doc kv_ins_doc = *doc;
2890        kv_ins_doc.key = _doc.key;
2891        kv_ins_doc.keylen = _doc.length.keylen;
2892        wal_insert(txn, file, &kv_ins_doc, offset, 0);
2893    } else {
2894        wal_insert(txn, file, doc, offset, 0);
2895    }
2896
2897    if (wal_get_dirty_status(file)== FDB_WAL_CLEAN) {
2898        wal_set_dirty_status(file, FDB_WAL_DIRTY);
2899    }
2900
2901    if (handle->config.wal_flush_before_commit ||
2902         handle->config.auto_commit) {
2903        bid_t dirty_idtree_root, dirty_seqtree_root;
2904
2905        if (!txn_enabled) {
2906            handle->dirty_updates = 1;
2907        }
2908
2909        // MUST ensure that 'file' is always 'handle->file',
2910        // because this routine will not be executed during compaction.
2911        filemgr_get_dirty_root(file, &dirty_idtree_root, &dirty_seqtree_root);
2912
2913        // other concurrent writer flushed WAL before commit,
2914        // sync root node of each tree
2915        if (dirty_idtree_root != BLK_NOT_FOUND) {
2916            handle->trie->root_bid = dirty_idtree_root;
2917        }
2918        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
2919            dirty_seqtree_root != BLK_NOT_FOUND) {
2920            if (handle->kvs) {
2921                handle->seqtrie->root_bid = dirty_seqtree_root;
2922            } else {
2923                btree_init_from_bid(handle->seqtree,
2924                                    handle->seqtree->blk_handle,
2925                                    handle->seqtree->blk_ops,
2926                                    handle->seqtree->kv_ops,
2927                                    handle->seqtree->blksize,
2928                                    dirty_seqtree_root);
2929            }
2930        }
2931
2932        if (wal_get_num_flushable(file) > _fdb_get_wal_threshold(handle)) {
2933            struct avl_tree flush_items;
2934
2935            // discard all cached writable blocks
2936            // to avoid data inconsistency with other writers
2937            btreeblk_discard_blocks(handle->bhandle);
2938
2939            // commit only for non-transactional WAL entries
2940            wr = wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
2941            if (wr != FDB_RESULT_SUCCESS) {
2942                filemgr_mutex_unlock(file);
2943                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2944                           1, 0);
2945                return wr;
2946            }
2947            wr = wal_flush(file, (void *)handle,
2948                      _fdb_wal_flush_func, _fdb_wal_get_old_offset,
2949                      &flush_items);
2950            if (wr != FDB_RESULT_SUCCESS) {
2951                filemgr_mutex_unlock(file);
2952                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2953                           1, 0);
2954                return wr;
2955            }
2956            wal_set_dirty_status(file, FDB_WAL_PENDING);
2957            // it is ok to release flushed items becuase
2958            // these items are not actually committed yet.
2959            // they become visible after fdb_commit is invoked.
2960            wal_release_flushed_items(file, &flush_items);
2961
2962            // sync new root node
2963            dirty_idtree_root = handle->trie->root_bid;
2964            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2965                if (handle->kvs) {
2966                    dirty_seqtree_root = handle->seqtrie->root_bid;
2967                } else {
2968                    dirty_seqtree_root = handle->seqtree->root_bid;
2969                }
2970            }
2971            filemgr_set_dirty_root(file,
2972                                   dirty_idtree_root,
2973                                   dirty_seqtree_root);
2974
2975            wal_flushed = true;
2976            btreeblk_reset_subblock_info(handle->bhandle);
2977        }
2978    }
2979
2980    filemgr_mutex_unlock(file);
2981
2982    if (!doc->deleted) {
2983        atomic_incr_uint64_t(&handle->op_stats->num_sets);
2984    }
2985
2986    if (wal_flushed && handle->config.auto_commit) {
2987        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2988        return fdb_commit(handle->fhandle, FDB_COMMIT_NORMAL);
2989    }
2990    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2991    return FDB_RESULT_SUCCESS;
2992}
2993
2994LIBFDB_API
2995fdb_status fdb_del(fdb_kvs_handle *handle, fdb_doc *doc)
2996{
2997    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
2998        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
2999                       "Warning: DEL is not allowed on the read-only DB file '%s'.",
3000                       handle->file->filename);
3001    }
3002
3003    if (doc->key == NULL || doc->keylen == 0 ||
3004        doc->keylen > FDB_MAX_KEYLEN ||
3005        (handle->kvs_config.custom_cmp &&
3006            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3007        return FDB_RESULT_INVALID_ARGS;
3008    }
3009
3010    doc->deleted = true;
3011    fdb_doc _doc;
3012    _doc = *doc;
3013    _doc.bodylen = 0;
3014    _doc.body = NULL;
3015
3016    atomic_incr_uint64_t(&handle->op_stats->num_dels);
3017
3018    return fdb_set(handle, &_doc);
3019}
3020
3021static uint64_t _fdb_export_header_flags(fdb_kvs_handle *handle)
3022{
3023    uint64_t rv = 0;
3024    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
3025        // seq tree is used
3026        rv |= FDB_FLAG_SEQTREE_USE;
3027    }
3028    if (handle->fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
3029        // the default KVS is once opened
3030        rv |= FDB_FLAG_ROOT_INITIALIZED;
3031    }
3032    if (handle->fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) {
3033        // the default KVS is based on custom key order
3034        rv |= FDB_FLAG_ROOT_CUSTOM_CMP;
3035    }
3036    return rv;
3037}
3038
3039uint64_t fdb_set_file_header(fdb_kvs_handle *handle)
3040{
3041    /*
3042    <ForestDB header>
3043    [offset]: (description)
3044    [     0]: BID of root node of root B+Tree of HB+Trie: 8 bytes
3045    [     8]: BID of root node of seq B+Tree: 8 bytes (0xFF.. if not used)
3046    [    16]: # of live documents: 8 bytes
3047    [    24]: # of live B+Tree nodes: 8 bytes
3048    [    32]: Data size (byte): 8 bytes
3049    [    40]: BID of the DB header created when last WAL flush: 8 bytes
3050    [    48]: Offset of the document containing KV instances' info: 8 bytes
3051    [    56]: Header flags: 8 bytes
3052    [    64]: Size of newly compacted target file name : 2 bytes
3053    [    66]: Size of old file name before compaction :  2 bytes
3054    [    68]: File name of newly compacted file : x bytes
3055    [  68+x]: File name of old file before compcation : y bytes
3056    [68+x+y]: CRC32: 4 bytes
3057    total size (header's length): 72+x+y bytes
3058
3059    Note: the list of functions that need to be modified
3060          if the header structure is changed:
3061
3062        _fdb_redirect_header() in forestdb.cc
3063        filemgr_destory_file() in filemgr.cc
3064    */
3065    uint8_t *buf = alca(uint8_t, handle->config.blocksize);
3066    uint16_t new_filename_len = 0;
3067    uint16_t old_filename_len = 0;
3068    uint16_t _edn_safe_16;
3069    uint32_t crc;
3070    uint64_t _edn_safe_64;
3071    size_t offset = 0;
3072    struct filemgr *cur_file;
3073    struct kvs_stat stat;
3074
3075    cur_file = handle->file;
3076
3077    // hb+trie or idtree root bid
3078    _edn_safe_64 = _endian_encode(handle->trie->root_bid);
3079    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(handle->trie->root_bid), offset);
3080
3081    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
3082        // b+tree root bid
3083        _edn_safe_64 = _endian_encode(handle->seqtree->root_bid);
3084        seq_memcpy(buf + offset, &_edn_safe_64,
3085            sizeof(handle->seqtree->root_bid), offset);
3086    } else {
3087        memset(buf + offset, 0xff, sizeof(uint64_t));
3088        offset += sizeof(uint64_t);
3089    }
3090
3091    // get stat
3092    _kvs_stat_get(cur_file, 0, &stat);
3093
3094    // # docs
3095    _edn_safe_64 = _endian_encode(stat.ndocs);
3096    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(_edn_safe_64), offset);
3097    // # live nodes
3098    _edn_safe_64 = _endian_encode(stat.nlivenodes);
3099    seq_memcpy(buf + offset, &_edn_safe_64,
3100               sizeof(_edn_safe_64), offset);
3101    // data size
3102    _edn_safe_64 = _endian_encode(stat.datasize);
3103    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(_edn_safe_64), offset);
3104    // last header bid
3105    _edn_safe_64 = _endian_encode(handle->last_wal_flush_hdr_bid);
3106    seq_memcpy(