xref: /4.0.0/forestdb/src/forestdb.cc (revision 606c32b5)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "btree.h"
33#include "btree_kv.h"
34#include "btree_var_kv_ops.h"
35#include "docio.h"
36#include "btreeblock.h"
37#include "common.h"
38#include "wal.h"
39#include "snapshot.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "compactor.h"
44#include "memleak.h"
45#include "time_utils.h"
46#include "system_resource_stats.h"
47
48#ifdef __DEBUG
49#ifndef __DEBUG_FDB
50    #undef DBG
51    #undef DBGCMD
52    #undef DBGSW
53    #define DBG(...)
54    #define DBGCMD(...)
55    #define DBGSW(n, ...)
56#endif
57#endif
58
#ifdef _TRACE_HANDLES
// AVL tree of traced KV store handles; initialized in fdb_init().
struct avl_tree open_handles;
// Spin lock initialized together with 'open_handles' in fdb_init();
// presumably serializes access to that tree -- confirm against the users.
static spin_t open_handle_lock;

/**
 * AVL comparison callback for 'open_handles': orders two nodes by the
 * address of the _fdb_kvs_handle that embeds them (member 'avl_trace').
 *
 * @param a   First AVL node.
 * @param b   Second AVL node.
 * @param aux Unused.
 * @return 0 if both nodes belong to the same handle, 1 if the handle of 'a'
 *         has the higher address, -1 otherwise.
 */
static int _fdb_handle_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct _fdb_kvs_handle *aa, *bb;
    (void) aux;

    aa = _get_entry(a, struct _fdb_kvs_handle, avl_trace);
    bb = _get_entry(b, struct _fdb_kvs_handle, avl_trace);
    if (aa == bb) {
        // A node must compare equal to itself, otherwise tree lookups of an
        // already inserted handle can never succeed.
        return 0;
    }
    return (aa > bb) ? 1 : -1;
}
#endif
70
71static volatile uint8_t fdb_initialized = 0;
72static volatile uint8_t fdb_open_inprog = 0;
73#ifdef SPIN_INITIALIZER
74static spin_t initial_lock = SPIN_INITIALIZER;
75#else
76static volatile unsigned int initial_lock_status = 0;
77static spin_t initial_lock;
78#endif
79
80static fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
81                                         uint64_t offset);
82
83INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
84{
85    (void) aux;
86    uint64_t a,b;
87    a = *(uint64_t*)key1;
88    b = *(uint64_t*)key2;
89    a = _endian_decode(a);
90    b = _endian_decode(b);
91    return _CMP_U64(a, b);
92}
93
94size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
95{
96    keylen_t keylen;
97    offset = _endian_decode(offset);
98    docio_read_doc_key((struct docio_handle *)handle, offset, &keylen, buf);
99    return keylen;
100}
101
102size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
103{
104    int size_id, size_seq, size_chunk;
105    fdb_seqnum_t _seqnum;
106    struct docio_object doc;
107    struct docio_handle *dhandle = (struct docio_handle *)handle;
108
109    size_id = sizeof(fdb_kvs_id_t);
110    size_seq = sizeof(fdb_seqnum_t);
111    size_chunk = dhandle->file->config->chunksize;
112    memset(&doc, 0, sizeof(struct docio_object));
113
114    offset = _endian_decode(offset);
115    docio_read_doc_key_meta((struct docio_handle *)handle, offset, &doc,
116                            true);
117    buf2buf(size_chunk, doc.key, size_id, buf);
118    _seqnum = _endian_encode(doc.seqnum);
119    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
120
121    free(doc.key);
122    free(doc.meta);
123
124    return size_id + size_seq;
125}
126
127int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
128{
129    int is_key1_inf, is_key2_inf;
130    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
131    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
132    size_t keylen1, keylen2;
133    btree_cmp_args *args = (btree_cmp_args *)aux;
134    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
135
136    is_key1_inf = _is_inf_key(key1);
137    is_key2_inf = _is_inf_key(key2);
138    if (is_key1_inf && is_key2_inf) { // both are infinite
139        return 0;
140    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
141        return -1;
142    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
143        return 1;
144    }
145
146    _get_var_key(key1, (void*)keystr1, &keylen1);
147    _get_var_key(key2, (void*)keystr2, &keylen2);
148
149    if (keylen1 == 0 && keylen2 == 0) {
150        return 0;
151    } else if (keylen1 ==0 && keylen2 > 0) {
152        return -1;
153    } else if (keylen1 > 0 && keylen2 == 0) {
154        return 1;
155    }
156
157    return cmp(keystr1, keylen1, keystr2, keylen2);
158}
159
160void fdb_fetch_header(void *header_buf,
161                      bid_t *trie_root_bid,
162                      bid_t *seq_root_bid,
163                      uint64_t *ndocs,
164                      uint64_t *nlivenodes,
165                      uint64_t *datasize,
166                      uint64_t *last_wal_flush_hdr_bid,
167                      uint64_t *kv_info_offset,
168                      uint64_t *header_flags,
169                      char **new_filename,
170                      char **old_filename)
171{
172    size_t offset = 0;
173    uint16_t new_filename_len;
174    uint16_t old_filename_len;
175
176    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
177               sizeof(bid_t), offset);
178    *trie_root_bid = _endian_decode(*trie_root_bid);
179
180    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
181               sizeof(bid_t), offset);
182    *seq_root_bid = _endian_decode(*seq_root_bid);
183
184    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
185               sizeof(uint64_t), offset);
186    *ndocs = _endian_decode(*ndocs);
187
188    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
189               sizeof(uint64_t), offset);
190    *nlivenodes = _endian_decode(*nlivenodes);
191
192    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
193               sizeof(uint64_t), offset);
194    *datasize = _endian_decode(*datasize);
195
196    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
197               sizeof(uint64_t), offset);
198    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
199
200    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
201               sizeof(uint64_t), offset);
202    *kv_info_offset = _endian_decode(*kv_info_offset);
203
204    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
205               sizeof(uint64_t), offset);
206    *header_flags = _endian_decode(*header_flags);
207
208    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
209               sizeof(new_filename_len), offset);
210    new_filename_len = _endian_decode(new_filename_len);
211    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
212               sizeof(old_filename_len), offset);
213    old_filename_len = _endian_decode(old_filename_len);
214    if (new_filename_len) {
215        *new_filename = (char*)((uint8_t *)header_buf + offset);
216    } else {
217        *new_filename = NULL;
218    }
219    offset += new_filename_len;
220    if (old_filename && old_filename_len) {
221        *old_filename = (char *) malloc(old_filename_len);
222        seq_memcpy(*old_filename,
223                   (uint8_t *)header_buf + offset,
224                   old_filename_len, offset);
225    }
226}
227
// Selects which documents _fdb_restore_wal() puts back into the WAL.
typedef enum {
    // restore all items
    FDB_RESTORE_NORMAL = 0,
    // restore only the items whose KV store ID matches the requested one
    FDB_RESTORE_KV_INS = 1
} fdb_restore_mode_t;
232
233INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
234                             fdb_restore_mode_t mode,
235                             bid_t hdr_bid,
236                             fdb_kvs_id_t kv_id_req)
237{
238    struct filemgr *file = handle->file;
239    uint32_t blocksize = handle->file->blocksize;
240    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
241    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
242    uint64_t offset = 0; //assume everything from first block needs restoration
243    err_log_callback *log_callback;
244
245    if (!hdr_off) { // Nothing to do if we don't have a header block offset
246        return;
247    }
248
249    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
250        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
251    }
252
253    // If a valid last header was retrieved and it matches the current header
254    // OR if WAL already had entries populated, then no crash recovery needed
255    if (hdr_off <= offset ||
256        (!handle->shandle && wal_get_size(file) &&
257            mode != FDB_RESTORE_KV_INS)) {
258        return;
259    }
260
261    // Temporarily disable the error logging callback as there are false positive
262    // checksum errors in docio_read_doc.
263    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
264    log_callback = handle->dhandle->log_callback;
265    handle->dhandle->log_callback = NULL;
266
267    if (!handle->shandle) {
268        filemgr_mutex_lock(file);
269    }
270    for (; offset < hdr_off;
271        offset = ((offset / blocksize) + 1) * blocksize) { // next block's off
272        if (!docio_check_buffer(handle->dhandle, offset / blocksize)) {
273            continue;
274        } else {
275            do {
276                struct docio_object doc;
277                uint64_t _offset;
278                uint64_t doc_offset;
279                memset(&doc, 0, sizeof(doc));
280                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
281                if (_offset == offset) { // reached unreadable doc, skip block
282                    break;
283                }
284                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
285                    // check if the doc is transactional or not, and
286                    // also check if the doc contains system info
287                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
288                        !(doc.length.flag & DOCIO_SYSTEM)) {
289                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
290                            // commit mark .. read doc offset
291                            doc_offset = doc.doc_offset;
292                            // read the previously skipped doc
293                            docio_read_doc(handle->dhandle, doc_offset, &doc, true);
294                            if (doc.key == NULL) { // doc read error
295                                free(doc.meta);
296                                free(doc.body);
297                                offset = _offset;
298                                continue;
299                            }
300                        } else {
301                            doc_offset = offset;
302                        }
303
304                        // If say a snapshot is taken on a db handle after
305                        // rollback, then skip WAL items after rollback point
306                        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
307                            (mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
308                            doc.seqnum > handle->seqnum) {
309                            free(doc.key);
310                            free(doc.meta);
311                            free(doc.body);
312                            offset = _offset;
313                            continue;
314                        }
315
316                        // restore document
317                        fdb_doc wal_doc;
318                        wal_doc.keylen = doc.length.keylen;
319                        wal_doc.bodylen = doc.length.bodylen;
320                        wal_doc.key = doc.key;
321                        wal_doc.seqnum = doc.seqnum;
322                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
323
324                        if (!handle->shandle) {
325                            wal_doc.metalen = doc.length.metalen;
326                            wal_doc.meta = doc.meta;
327                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
328
329                            if (handle->kvs) {
330                                // check seqnum before insert
331                                fdb_kvs_id_t kv_id;
332                                fdb_seqnum_t kv_seqnum;
333                                buf2kvid(handle->config.chunksize,
334                                         wal_doc.key, &kv_id);
335
336                                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
337                                    kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
338                                } else {
339                                    kv_seqnum = SEQNUM_NOT_USED;
340                                }
341                                if (doc.seqnum <= kv_seqnum &&
342                                        ((mode == FDB_RESTORE_KV_INS &&
343                                            kv_id == kv_id_req) ||
344                                         (mode == FDB_RESTORE_NORMAL)) ) {
345                                    // if mode is NORMAL, restore all items
346                                    // if mode is KV_INS, restore items matching ID
347                                    wal_insert(&file->global_txn, file,
348                                               &wal_doc, doc_offset, 0);
349                                }
350                            } else {
351                                wal_insert(&file->global_txn, file,
352                                           &wal_doc, doc_offset, 0);
353                            }
354                            if (doc.key) free(doc.key);
355                        } else {
356                            // snapshot
357                            if (handle->kvs) {
358                                fdb_kvs_id_t kv_id;
359                                buf2kvid(handle->config.chunksize,
360                                         wal_doc.key, &kv_id);
361                                if (kv_id == handle->kvs->id) {
362                                    // snapshot: insert ID matched documents only
363                                    snap_insert(handle->shandle,
364                                                &wal_doc, doc_offset);
365                                } else {
366                                    free(doc.key);
367                                }
368                            } else {
369                                snap_insert(handle->shandle, &wal_doc, doc_offset);
370                            }
371                        }
372                        free(doc.meta);
373                        free(doc.body);
374                        offset = _offset;
375                    } else {
376                        // skip transactional document or system document
377                        free(doc.key);
378                        free(doc.meta);
379                        free(doc.body);
380                        offset = _offset;
381                        // do not break.. read next doc
382                    }
383                } else {
384                    free(doc.key);
385                    free(doc.meta);
386                    free(doc.body);
387                    offset = _offset;
388                    break;
389                }
390            } while (offset + sizeof(struct docio_length) < hdr_off);
391        }
392    }
393    // wal commit
394    if (!handle->shandle) {
395        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
396        filemgr_mutex_unlock(file);
397    }
398    handle->dhandle->log_callback = log_callback;
399}
400
401INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
402                                          const char *new_filename)
403{
404    fdb_kvs_handle new_db;
405    fdb_config config = handle->config;
406    struct filemgr *new_file;
407
408    memset(&new_db, 0, sizeof(new_db));
409    new_db.log_callback.callback = handle->log_callback.callback;
410    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
411    config.flags |= FDB_OPEN_FLAG_RDONLY;
412    new_db.fhandle = handle->fhandle;
413    new_db.kvs_config = handle->kvs_config;
414    fdb_status status = _fdb_open(&new_db, new_filename,
415                                  FDB_AFILENAME, &config);
416    if (status != FDB_RESULT_SUCCESS) {
417        return fdb_log(&handle->log_callback, status,
418                       "Error in opening a partially compacted file '%s' for recovery.",
419                       new_filename);
420    }
421
422    new_file = new_db.file;
423
424    if (new_file->old_filename &&
425        !strncmp(new_file->old_filename, handle->file->filename,
426                 FDB_MAX_FILENAME_LEN)) {
427        struct filemgr *old_file = handle->file;
428        // If new file has a recorded old_filename then it means that
429        // compaction has completed successfully. Mark self for deletion
430        filemgr_mutex_lock(new_file);
431
432        status = btreeblk_end(handle->bhandle);
433        if (status != FDB_RESULT_SUCCESS) {
434            filemgr_mutex_unlock(new_file);
435            _fdb_close(&new_db);
436            return status;
437        }
438        btreeblk_free(handle->bhandle);
439        free(handle->bhandle);
440        handle->bhandle = new_db.bhandle;
441
442        docio_free(handle->dhandle);
443        free(handle->dhandle);
444        handle->dhandle = new_db.dhandle;
445
446        hbtrie_free(handle->trie);
447        free(handle->trie);
448        handle->trie = new_db.trie;
449
450        wal_shutdown(handle->file);
451        handle->file = new_file;
452
453        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
454            if (handle->kvs) {
455                // multi KV instance mode
456                hbtrie_free(handle->seqtrie);
457                free(handle->seqtrie);
458                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
459                    handle->seqtrie = new_db.seqtrie;
460                }
461            } else {
462                free(handle->seqtree->kv_ops);
463                free(handle->seqtree);
464                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
465                    handle->seqtree = new_db.seqtree;
466                }
467            }
468        }
469
470        filemgr_mutex_unlock(new_file);
471        if (new_db.kvs) {
472            fdb_kvs_info_free(&new_db);
473        }
474        // remove self: WARNING must not close this handle if snapshots
475        // are yet to open this file
476        filemgr_remove_pending(old_file, new_db.file);
477        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
478        free(new_db.filename);
479        return FDB_RESULT_FAIL_BY_COMPACTION;
480    }
481
482    // As the new file is partially compacted, it should be removed upon close.
483    // Just in-case the new file gets opened before removal, point it to the old
484    // file to ensure availability of data.
485    filemgr_remove_pending(new_db.file, handle->file);
486    _fdb_close(&new_db);
487
488    return FDB_RESULT_SUCCESS;
489}
490
491LIBFDB_API
492fdb_status fdb_init(fdb_config *config)
493{
494    fdb_config _config;
495    compactor_config c_config;
496    struct filemgr_config f_config;
497
498    if (config) {
499        if (validate_fdb_config(config)) {
500            _config = *config;
501        } else {
502            return FDB_RESULT_INVALID_CONFIG;
503        }
504    } else {
505        _config = get_default_config();
506    }
507
508    // global initialization
509    // initialized only once at first time
510    if (!fdb_initialized) {
511#ifdef _TRACE_HANDLES
512        spin_init(&open_handle_lock);
513        avl_init(&open_handles, NULL);
514#endif
515
516#ifndef SPIN_INITIALIZER
517        // Note that only Windows passes through this routine
518        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
519            // atomically initialize spin lock only once
520            spin_init(&initial_lock);
521            initial_lock_status = 2;
522        } else {
523            // the others .. wait until initializing 'initial_lock' is done
524            while (initial_lock_status != 2) {
525                Sleep(1);
526            }
527        }
528#endif
529
530    }
531    spin_lock(&initial_lock);
532    if (!fdb_initialized) {
533        double ram_size = (double) get_memory_size();
534        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
535            spin_unlock(&initial_lock);
536            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
537        }
538        // initialize file manager and block cache
539        f_config.blocksize = _config.blocksize;
540        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
541        filemgr_init(&f_config);
542        filemgr_set_lazy_file_deletion(true,
543                                       compactor_register_file_removing,
544                                       compactor_is_file_removed);
545
546        // initialize compaction daemon
547        c_config.sleep_duration = _config.compactor_sleep_duration;
548        c_config.num_threads = _config.num_compactor_threads;
549        compactor_init(&c_config);
550
551        fdb_initialized = 1;
552    }
553    fdb_open_inprog++;
554    spin_unlock(&initial_lock);
555
556    return FDB_RESULT_SUCCESS;
557}
558
559LIBFDB_API
560fdb_config fdb_get_default_config(void) {
561    return get_default_config();
562}
563
564LIBFDB_API
565fdb_kvs_config fdb_get_default_kvs_config(void) {
566    return get_default_kvs_config();
567}
568
569LIBFDB_API
570fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
571                    const char *filename,
572                    fdb_config *fconfig)
573{
574#ifdef _MEMPOOL
575    mempool_init();
576#endif
577
578    fdb_config config;
579    fdb_file_handle *fhandle;
580    fdb_kvs_handle *handle;
581
582    if (fconfig) {
583        if (validate_fdb_config(fconfig)) {
584            config = *fconfig;
585        } else {
586            return FDB_RESULT_INVALID_CONFIG;
587        }
588    } else {
589        config = get_default_config();
590    }
591
592    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
593    if (!fhandle) { // LCOV_EXCL_START
594        return FDB_RESULT_ALLOC_FAIL;
595    } // LCOV_EXCL_STOP
596
597    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
598    if (!handle) { // LCOV_EXCL_START
599        free(fhandle);
600        return FDB_RESULT_ALLOC_FAIL;
601    } // LCOV_EXCL_STOP
602
603    atomic_init_uint8_t(&handle->handle_busy, 0);
604    handle->shandle = NULL;
605    handle->kvs_config = get_default_kvs_config();
606
607    fdb_status fs = fdb_init(fconfig);
608    if (fs != FDB_RESULT_SUCCESS) {
609        free(handle);
610        free(fhandle);
611        return fs;
612    }
613    fdb_file_handle_init(fhandle, handle);
614
615    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
616    if (fs == FDB_RESULT_SUCCESS) {
617        *ptr_fhandle = fhandle;
618    } else {
619        *ptr_fhandle = NULL;
620        free(handle);
621        fdb_file_handle_free(fhandle);
622    }
623    spin_lock(&initial_lock);
624    fdb_open_inprog--;
625    spin_unlock(&initial_lock);
626    return fs;
627}
628
629LIBFDB_API
630fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
631                               const char *filename,
632                               fdb_config *fconfig,
633                               size_t num_functions,
634                               char **kvs_names,
635                               fdb_custom_cmp_variable *functions)
636{
637#ifdef _MEMPOOL
638    mempool_init();
639#endif
640
641    fdb_config config;
642    fdb_file_handle *fhandle;
643    fdb_kvs_handle *handle;
644
645    if (fconfig) {
646        if (validate_fdb_config(fconfig)) {
647            config = *fconfig;
648        } else {
649            return FDB_RESULT_INVALID_CONFIG;
650        }
651    } else {
652        config = get_default_config();
653    }
654
655    if (config.multi_kv_instances == false) {
656        // single KV instance mode does not support customized cmp function
657        return FDB_RESULT_INVALID_CONFIG;
658    }
659
660    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
661    if (!fhandle) { // LCOV_EXCL_START
662        return FDB_RESULT_ALLOC_FAIL;
663    } // LCOV_EXCL_STOP
664
665    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
666    if (!handle) { // LCOV_EXCL_START
667        free(fhandle);
668        return FDB_RESULT_ALLOC_FAIL;
669    } // LCOV_EXCL_STOP
670
671    atomic_init_uint8_t(&handle->handle_busy, 0);
672    handle->shandle = NULL;
673    handle->kvs_config = get_default_kvs_config();
674
675    fdb_status fs = fdb_init(fconfig);
676    if (fs != FDB_RESULT_SUCCESS) {
677        free(handle);
678        free(fhandle);
679        return fs;
680    }
681    fdb_file_handle_init(fhandle, handle);
682
683    // insert kvs_names and functions into fhandle's list
684    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
685                                   kvs_names, functions);
686
687    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
688    if (fs == FDB_RESULT_SUCCESS) {
689        *ptr_fhandle = fhandle;
690    } else {
691        *ptr_fhandle = NULL;
692        free(handle);
693        fdb_file_handle_free(fhandle);
694    }
695    spin_lock(&initial_lock);
696    fdb_open_inprog--;
697    spin_unlock(&initial_lock);
698    return fs;
699}
700
701fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
702                                  const char *filename,
703                                  fdb_config *fconfig,
704                                  struct list *cmp_func_list)
705{
706#ifdef _MEMPOOL
707    mempool_init();
708#endif
709
710    fdb_file_handle *fhandle;
711    fdb_kvs_handle *handle;
712
713    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
714    if (!fhandle) { // LCOV_EXCL_START
715        return FDB_RESULT_ALLOC_FAIL;
716    } // LCOV_EXCL_STOP
717
718    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
719    if (!handle) { // LCOV_EXCL_START
720        free(fhandle);
721        return FDB_RESULT_ALLOC_FAIL;
722    } // LCOV_EXCL_STOP
723
724    atomic_init_uint8_t(&handle->handle_busy, 0);
725    handle->shandle = NULL;
726
727    fdb_file_handle_init(fhandle, handle);
728    if (cmp_func_list) {
729        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
730    }
731    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
732    if (fs == FDB_RESULT_SUCCESS) {
733        *ptr_fhandle = fhandle;
734    } else {
735        *ptr_fhandle = NULL;
736        free(handle);
737        fdb_file_handle_free(fhandle);
738    }
739    return fs;
740}
741
742LIBFDB_API
743fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
744                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
745{
746#ifdef _MEMPOOL
747    mempool_init();
748#endif
749
750    fdb_config config = handle_in->config;
751    fdb_kvs_config kvs_config = handle_in->kvs_config;
752    fdb_kvs_handle *handle;
753    fdb_status fs;
754    filemgr *file;
755    file_status_t fstatus = FILE_NORMAL;
756
757    if (!handle_in || !ptr_handle) {
758        return FDB_RESULT_INVALID_ARGS;
759    }
760
761    // Sequence trees are a must for snapshot creation
762    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
763        return FDB_RESULT_INVALID_CONFIG;
764    }
765
766fdb_snapshot_open_start:
767    if (!handle_in->shandle) {
768        fdb_check_file_reopen(handle_in, &fstatus);
769        fdb_sync_db_header(handle_in);
770        file = handle_in->file;
771
772        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
773            handle_in->seqnum = fdb_kvs_get_seqnum(file,
774                                                   handle_in->kvs->id);
775        } else {
776            handle_in->seqnum = filemgr_get_seqnum(file);
777        }
778    } else {
779        file = handle_in->file;
780    }
781
782    // if the max sequence number seen by this handle is lower than the
783    // requested snapshot marker, it means the snapshot is not yet visible
784    // even via the current fdb_kvs_handle
785    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
786        return FDB_RESULT_NO_DB_INSTANCE;
787    }
788
789    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
790    if (!handle) { // LCOV_EXCL_START
791        return FDB_RESULT_ALLOC_FAIL;
792    } // LCOV_EXCL_STOP
793
794    atomic_init_uint8_t(&handle->handle_busy, 0);
795    handle->log_callback = handle_in->log_callback;
796    handle->max_seqnum = seqnum;
797    handle->fhandle = handle_in->fhandle;
798
799    config.flags |= FDB_OPEN_FLAG_RDONLY;
800    // do not perform compaction for snapshot
801    config.compaction_mode = FDB_COMPACTION_MANUAL;
802
803    // If cloning an existing snapshot handle, then rewind indexes
804    // to its last DB header and point its avl tree to existing snapshot's tree
805    bool clone_snapshot = false;
806    if (handle_in->shandle) {
807        handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
808        if (snap_clone(handle_in->shandle, handle_in->max_seqnum,
809                   &handle->shandle, seqnum) == FDB_RESULT_SUCCESS) {
810            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
811            clone_snapshot = true;
812        }
813    }
814
815    if (!handle->shandle) {
816        handle->shandle = (struct snap_handle *) calloc(1, sizeof(snap_handle));
817        if (!handle->shandle) { // LCOV_EXCL_START
818            free(handle);
819            return FDB_RESULT_ALLOC_FAIL;
820        } // LCOV_EXCL_STOP
821        snap_init(handle->shandle, handle_in);
822    }
823
824    if (handle_in->kvs) {
825        // sub-handle in multi KV instance mode
826        if (clone_snapshot) {
827            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
828        } else {
829            fs = _fdb_kvs_open(handle_in->kvs->root,
830                              &config, &kvs_config, file,
831                              file->filename,
832                              _fdb_kvs_get_name(handle_in, file),
833                              handle);
834        }
835    } else {
836        if (clone_snapshot) {
837            fs = _fdb_clone_snapshot(handle_in, handle);
838        } else {
839            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
840        }
841    }
842
843    if (fs == FDB_RESULT_SUCCESS) {
844        if (seqnum == FDB_SNAPSHOT_INMEM &&
845            !handle_in->shandle) {
846            fdb_seqnum_t upto_seq = seqnum;
847            // In-memory snapshot
848            wal_snapshot(handle->file, (void *)handle->shandle,
849                         handle_in->txn, &upto_seq, _fdb_wal_snapshot_func);
850            // set seqnum based on handle type (multikv or default)
851            if (handle_in->kvs && handle_in->kvs->id > 0) {
852                handle->max_seqnum =
853                    _fdb_kvs_get_seqnum(handle->file->kv_header,
854                                        handle_in->kvs->id);
855            } else {
856                handle->max_seqnum = filemgr_get_seqnum(handle->file);
857            }
858
859            // synchronize dirty root nodes if exist
860            if (filemgr_dirty_root_exist(handle->file)) {
861                bid_t dirty_idtree_root, dirty_seqtree_root;
862                filemgr_mutex_lock(handle->file);
863                filemgr_get_dirty_root(handle->file,
864                                       &dirty_idtree_root, &dirty_seqtree_root);
865                if (dirty_idtree_root != BLK_NOT_FOUND) {
866                    handle->trie->root_bid = dirty_idtree_root;
867                }
868                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
869                    if (dirty_seqtree_root != BLK_NOT_FOUND) {
870                        if (handle->kvs) {
871                            handle->seqtrie->root_bid = dirty_seqtree_root;
872                        } else {
873                            btree_init_from_bid(handle->seqtree,
874                                                handle->seqtree->blk_handle,
875                                                handle->seqtree->blk_ops,
876                                                handle->seqtree->kv_ops,
877                                                handle->seqtree->blksize,
878                                                dirty_seqtree_root);
879                        }
880                    }
881                }
882                btreeblk_discard_blocks(handle->bhandle);
883                btreeblk_create_dirty_snapshot(handle->bhandle);
884                filemgr_mutex_unlock(handle->file);
885            }
886        } else if (clone_snapshot) {
887            // Snapshot is created on the other snapshot handle
888
889            handle->max_seqnum = handle_in->seqnum;
890
891            if (seqnum == FDB_SNAPSHOT_INMEM) {
892                // in-memory snapshot
893                // Clone dirty root nodes from the source snapshot by incrementing
894                // their ref counters
895                handle->trie->root_bid = handle_in->trie->root_bid;
896                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
897                    if (handle->kvs) {
898                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
899                    } else {
900                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
901                    }
902                }
903                btreeblk_discard_blocks(handle->bhandle);
904                btreeblk_clone_dirty_snapshot(handle->bhandle,
905                                              handle_in->bhandle);
906            }
907        }
908        *ptr_handle = handle;
909    } else {
910        *ptr_handle = NULL;
911        snap_close(handle->shandle);
912        free(handle);
913        // If compactor thread had finished compaction just before this routine
914        // calls _fdb_open, then it is possible that the snapshot's DB header
915        // is only present in the new_file. So we must retry the snapshot
916        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
917        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
918            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
919                goto fdb_snapshot_open_start;
920            }
921        }
922    }
923    return fs;
924}
925
926static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
927
928LIBFDB_API
929fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
930{
931#ifdef _MEMPOOL
932    mempool_init();
933#endif
934
935    fdb_config config;
936    fdb_kvs_handle *handle_in, *handle;
937    fdb_status fs;
938    fdb_seqnum_t old_seqnum;
939
940    if (!handle_ptr) {
941        return FDB_RESULT_INVALID_ARGS;
942    }
943
944    handle_in = *handle_ptr;
945    config = handle_in->config;
946
947    if (handle_in->kvs) {
948        return fdb_kvs_rollback(handle_ptr, seqnum);
949    }
950
951    // Sequence trees are a must for rollback
952    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
953        return FDB_RESULT_INVALID_CONFIG;
954    }
955
956    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
957        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
958                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
959                       handle_in->file->filename);
960    }
961
962    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
963        return FDB_RESULT_HANDLE_BUSY;
964    }
965
966    filemgr_mutex_lock(handle_in->file);
967    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
968    // All transactions should be closed before rollback
969    if (wal_txn_exists(handle_in->file)) {
970        filemgr_set_rollback(handle_in->file, 0);
971        filemgr_mutex_unlock(handle_in->file);
972        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
973        return FDB_RESULT_FAIL_BY_TRANSACTION;
974    }
975
976    // If compaction is running, wait until it is aborted.
977    // TODO: Find a better way of waiting for the compaction abortion.
978    unsigned int sleep_time = 10000; // 10 ms.
979    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
980    while (fstatus == FILE_COMPACT_OLD) {
981        filemgr_mutex_unlock(handle_in->file);
982        decaying_usleep(&sleep_time, 1000000);
983        filemgr_mutex_lock(handle_in->file);
984        fstatus = filemgr_get_file_status(handle_in->file);
985    }
986    if (fstatus == FILE_REMOVED_PENDING) {
987        filemgr_mutex_unlock(handle_in->file);
988        fdb_check_file_reopen(handle_in, NULL);
989    } else {
990        filemgr_mutex_unlock(handle_in->file);
991    }
992
993    fdb_sync_db_header(handle_in);
994
995    // if the max sequence number seen by this handle is lower than the
996    // requested snapshot marker, it means the snapshot is not yet visible
997    // even via the current fdb_kvs_handle
998    if (seqnum > handle_in->seqnum) {
999        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1000        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1001        return FDB_RESULT_NO_DB_INSTANCE;
1002    }
1003
1004    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1005    if (!handle) { // LCOV_EXCL_START
1006        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1007        return FDB_RESULT_ALLOC_FAIL;
1008    } // LCOV_EXCL_STOP
1009
1010    atomic_init_uint8_t(&handle->handle_busy, 0);
1011    handle->log_callback = handle_in->log_callback;
1012    handle->fhandle = handle_in->fhandle;
1013    if (seqnum == 0) {
1014        fs = _fdb_reset(handle, handle_in);
1015    } else {
1016        handle->max_seqnum = seqnum;
1017        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1018                       &config);
1019    }
1020
1021    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1022    if (fs == FDB_RESULT_SUCCESS) {
1023        // rollback the file's sequence number
1024        filemgr_mutex_lock(handle_in->file);
1025        old_seqnum = filemgr_get_seqnum(handle_in->file);
1026        filemgr_set_seqnum(handle_in->file, seqnum);
1027        filemgr_mutex_unlock(handle_in->file);
1028
1029        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL);
1030        if (fs == FDB_RESULT_SUCCESS) {
1031            if (handle_in->txn) {
1032                handle->txn = handle_in->txn;
1033                handle_in->txn = NULL;
1034            }
1035            handle_in->fhandle->root = handle;
1036            _fdb_close_root(handle_in);
1037            handle->max_seqnum = 0;
1038            handle->seqnum = seqnum;
1039            *handle_ptr = handle;
1040        } else {
1041            // cancel the rolling-back of the sequence number
1042            filemgr_mutex_lock(handle_in->file);
1043            filemgr_set_seqnum(handle_in->file, old_seqnum);
1044            filemgr_mutex_unlock(handle_in->file);
1045            free(handle);
1046            fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1047        }
1048    } else {
1049        free(handle);
1050        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1051    }
1052
1053    return fs;
1054}
1055
1056static void _fdb_init_file_config(const fdb_config *config,
1057                                  struct filemgr_config *fconfig) {
1058    fconfig->blocksize = config->blocksize;
1059    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1060    fconfig->chunksize = config->chunksize;
1061
1062    fconfig->options = 0x0;
1063    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1064        fconfig->options |= FILEMGR_CREATE;
1065    }
1066    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1067        fconfig->options |= FILEMGR_READONLY;
1068    }
1069    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1070        fconfig->options |= FILEMGR_SYNC;
1071    }
1072
1073    fconfig->flag = 0x0;
1074    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1075        config->buffercache_size) {
1076        fconfig->flag |= _ARCH_O_DIRECT;
1077    }
1078
1079    fconfig->prefetch_duration = config->prefetch_duration;
1080    fconfig->num_wal_shards = config->num_wal_partitions;
1081    fconfig->num_bcache_shards = config->num_bcache_partitions;
1082}
1083
1084fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1085                               fdb_kvs_handle *handle_out)
1086{
1087    fdb_status status;
1088
1089    handle_out->config = handle_in->config;
1090    handle_out->kvs_config = handle_in->kvs_config;
1091    handle_out->fileops = handle_in->fileops;
1092    handle_out->file = handle_in->file;
1093    // Note that the file ref count will be decremented when the cloned snapshot
1094    // is closed through filemgr_close().
1095    filemgr_incr_ref_count(handle_out->file);
1096
1097    if (handle_out->filename) {
1098        handle_out->filename = (char *)realloc(handle_out->filename,
1099                                               strlen(handle_in->filename)+1);
1100    } else {
1101        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1102    }
1103    strcpy(handle_out->filename, handle_in->filename);
1104
1105    // initialize the docio handle.
1106    handle_out->dhandle = (struct docio_handle *)
1107        calloc(1, sizeof(struct docio_handle));
1108    handle_out->dhandle->log_callback = &handle_out->log_callback;
1109    docio_init(handle_out->dhandle, handle_out->file,
1110               handle_out->config.compress_document_body);
1111
1112    // initialize the btree block handle.
1113    handle_out->btreeblkops = btreeblk_get_ops();
1114    handle_out->bhandle = (struct btreeblk_handle *)
1115        calloc(1, sizeof(struct btreeblk_handle));
1116    handle_out->bhandle->log_callback = &handle_out->log_callback;
1117    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1118
1119    handle_out->dirty_updates = handle_in->dirty_updates;
1120    handle_out->cur_header_revnum = handle_in->cur_header_revnum;
1121    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1122    handle_out->kv_info_offset = handle_in->kv_info_offset;
1123    handle_out->shandle->stat = handle_in->shandle->stat;
1124    handle_out->op_stats = handle_in->op_stats;
1125
1126    // initialize the trie handle
1127    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1128    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1129                handle_out->file->blocksize,
1130                handle_in->trie->root_bid, // Source snapshot's trie root bid
1131                (void *)handle_out->bhandle, handle_out->btreeblkops,
1132                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1133    // set aux for cmp wrapping function
1134    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1135    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1136
1137    if (handle_out->kvs) {
1138        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1139    }
1140
1141    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1142        handle_out->seqnum = handle_in->seqnum;
1143
1144        if (handle_out->config.multi_kv_instances) {
1145            // multi KV instance mode .. HB+trie
1146            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1147            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1148                        handle_out->file->blocksize,
1149                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1150                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1151                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1152
1153        } else {
1154            // single KV instance mode .. normal B+tree
1155            struct btree_kv_ops *seq_kv_ops =
1156                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1157            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1158            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1159
1160            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1161            // Init the seq tree using the root bid of the source snapshot.
1162            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1163                                handle_out->btreeblkops, seq_kv_ops,
1164                                handle_out->config.blocksize,
1165                                handle_in->seqtree->root_bid);
1166        }
1167    } else{
1168        handle_out->seqtree = NULL;
1169    }
1170
1171    status = btreeblk_end(handle_out->bhandle);
1172    fdb_assert(status == FDB_RESULT_SUCCESS, status, handle_out);
1173
1174#ifdef _TRACE_HANDLES
1175    spin_lock(&open_handle_lock);
1176    avl_insert(&open_handles, &handle_out->avl_trace, _fdb_handle_cmp);
1177    spin_unlock(&open_handle_lock);
1178#endif
1179    return status;
1180}
1181
1182fdb_status _fdb_open(fdb_kvs_handle *handle,
1183                     const char *filename,
1184                     fdb_filename_mode_t filename_mode,
1185                     const fdb_config *config)
1186{
1187    struct filemgr_config fconfig;
1188    struct kvs_stat stat, empty_stat;
1189    bid_t trie_root_bid = BLK_NOT_FOUND;
1190    bid_t seq_root_bid = BLK_NOT_FOUND;
1191    fdb_seqnum_t seqnum = 0;
1192    filemgr_header_revnum_t header_revnum = 0;
1193    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1194    uint64_t ndocs = 0;
1195    uint64_t datasize = 0;
1196    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1197    uint64_t kv_info_offset = BLK_NOT_FOUND;
1198    uint64_t header_flags = 0;
1199    uint8_t header_buf[FDB_BLOCKSIZE];
1200    char *compacted_filename = NULL;
1201    char *prev_filename = NULL;
1202    size_t header_len = 0;
1203    bool multi_kv_instances = config->multi_kv_instances;
1204
1205    uint64_t nlivenodes = 0;
1206    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1207    char actual_filename[FDB_MAX_FILENAME_LEN];
1208    char virtual_filename[FDB_MAX_FILENAME_LEN];
1209    char *target_filename = NULL;
1210    fdb_status status;
1211
1212    if (filename == NULL) {
1213        return FDB_RESULT_INVALID_ARGS;
1214    }
1215    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1216        // filename (including path) length is supported up to
1217        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1218        return FDB_RESULT_TOO_LONG_FILENAME;
1219    }
1220
1221    if (filename_mode == FDB_VFILENAME &&
1222        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1223        return FDB_RESULT_INVALID_COMPACTION_MODE;
1224    }
1225
1226    _fdb_init_file_config(config, &fconfig);
1227
1228    if (filename_mode == FDB_VFILENAME) {
1229        compactor_get_actual_filename(filename, actual_filename,
1230                                      config->compaction_mode, &handle->log_callback);
1231    } else {
1232        strcpy(actual_filename, filename);
1233    }
1234
1235    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1236         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1237          filename_mode == FDB_VFILENAME) ) {
1238        // 1) manual compaction mode, OR
1239        // 2) auto compaction mode + 'filename' is virtual filename
1240        // -> copy 'filename'
1241        target_filename = (char *)filename;
1242    } else {
1243        // otherwise (auto compaction mode + 'filename' is actual filename)
1244        // -> copy 'virtual_filename'
1245        compactor_get_virtual_filename(filename, virtual_filename);
1246        target_filename = virtual_filename;
1247    }
1248
1249    handle->fileops = get_filemgr_ops();
1250    filemgr_open_result result = filemgr_open((char *)actual_filename,
1251                                              handle->fileops,
1252                                              &fconfig, &handle->log_callback);
1253    if (result.rv != FDB_RESULT_SUCCESS) {
1254        return (fdb_status) result.rv;
1255    }
1256
1257    handle->file = result.file;
1258    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1259        strcmp(filename, actual_filename)) {
1260        // It is in-place compacted file if
1261        // 1) compaction mode is manual, and
1262        // 2) actual filename is different to the filename given by user.
1263        // In this case, set the in-place compaction flag.
1264        filemgr_set_in_place_compaction(handle->file, true);
1265    }
1266    if (filemgr_is_in_place_compaction_set(handle->file)) {
1267        // This file was in-place compacted.
1268        // set 'handle->filename' to the original filename to trigger file renaming
1269        compactor_get_virtual_filename(filename, virtual_filename);
1270        target_filename = virtual_filename;
1271    }
1272
1273    if (handle->filename) {
1274        handle->filename = (char *)realloc(handle->filename,
1275                                           strlen(target_filename)+1);
1276    } else {
1277        handle->filename = (char*)malloc(strlen(target_filename)+1);
1278    }
1279    strcpy(handle->filename, target_filename);
1280
1281    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1282    // set handle->last_hdr_bid to the block id of required header, so rewind..
1283    if (handle->shandle && handle->last_hdr_bid) {
1284        status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1285                                      header_buf, &header_len, &seqnum,
1286                                      &header_revnum, &handle->log_callback);
1287        if (status != FDB_RESULT_SUCCESS) {
1288            free(handle->filename);
1289            handle->filename = NULL;
1290            filemgr_close(handle->file, false, handle->filename,
1291                              &handle->log_callback);
1292            return status;
1293        }
1294    } else { // Normal open
1295        filemgr_get_header(handle->file, header_buf, &header_len,
1296                           &handle->last_hdr_bid, &seqnum, &header_revnum);
1297    }
1298
1299    // initialize the docio handle so kv headers may be read
1300    handle->dhandle = (struct docio_handle *)
1301                      calloc(1, sizeof(struct docio_handle));
1302    handle->dhandle->log_callback = &handle->log_callback;
1303    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1304
1305    if (header_len > 0) {
1306        fdb_fetch_header(header_buf, &trie_root_bid,
1307                         &seq_root_bid, &ndocs, &nlivenodes,
1308                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1309                         &header_flags, &compacted_filename, &prev_filename);
1310        // use existing setting for seqtree_opt
1311        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1312            seqtree_opt = FDB_SEQTREE_USE;
1313        } else {
1314            seqtree_opt = FDB_SEQTREE_NOT_USE;
1315        }
1316        // Retrieve seqnum for multi-kv mode
1317        if (handle->kvs && handle->kvs->id > 0) {
1318            if (kv_info_offset != BLK_NOT_FOUND) {
1319                if (!handle->file->kv_header) {
1320                    fdb_kvs_header_create(handle->file);
1321                    // KV header already exists but not loaded .. read & import
1322                    fdb_kvs_header_read(handle->file, handle->dhandle,
1323                                        kv_info_offset, false);
1324                }
1325                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1326                                             handle->kvs->id);
1327            } else { // no kv_info offset, ok to set seqnum to zero
1328                seqnum = 0;
1329            }
1330        }
1331        // other flags
1332        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1333            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1334        }
1335        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1336            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1337        }
1338        // use existing setting for multi KV instance mode
1339        if (kv_info_offset == BLK_NOT_FOUND) {
1340            multi_kv_instances = false;
1341        } else {
1342            multi_kv_instances = true;
1343        }
1344    }
1345
1346    handle->config = *config;
1347    handle->config.seqtree_opt = seqtree_opt;
1348    handle->config.multi_kv_instances = multi_kv_instances;
1349
1350    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1351        // Either an in-memory snapshot or cloning from an existing snapshot..
1352        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1353                     // *_open() should have already restored it
1354    } else { // Persisted snapshot or file rollback..
1355        hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1356        if (hdr_bid > 0) {
1357            --hdr_bid;
1358        }
1359        if (handle->max_seqnum) {
1360            struct kvs_stat stat_ori;
1361            // backup original stats
1362            if (handle->kvs) {
1363                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1364            } else {
1365                _kvs_stat_get(handle->file, 0, &stat_ori);
1366            }
1367
1368            if (hdr_bid > handle->last_hdr_bid){
1369                // uncommitted data exists beyond the last DB header
1370                // get the last committed seq number
1371                fdb_seqnum_t seq_commit;
1372                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1373                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1374                    // In case, snapshot_open is attempted with latest uncommitted
1375                    // sequence number
1376                    header_len = 0;
1377                }
1378            }
1379            // Reverse scan the file to locate the DB header with seqnum marker
1380            while (header_len && seqnum != handle->max_seqnum) {
1381                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1382                                          header_buf, &header_len, &seqnum,
1383                                          &handle->log_callback);
1384                if (header_len == 0) {
1385                    continue; // header doesn't exist
1386                }
1387                fdb_fetch_header(header_buf, &trie_root_bid,
1388                                 &seq_root_bid, &ndocs, &nlivenodes,
1389                                 &datasize, &last_wal_flush_hdr_bid,
1390                                 &kv_info_offset, &header_flags,
1391                                 &compacted_filename, NULL);
1392                handle->last_hdr_bid = hdr_bid;
1393
1394                if (!handle->kvs || handle->kvs->id == 0) {
1395                    // single KVS mode OR default KVS
1396                    if (!handle->shandle) {
1397                        // rollback
1398                        struct kvs_stat stat_dst;
1399                        _kvs_stat_get(handle->file, 0, &stat_dst);
1400                        stat_dst.ndocs = ndocs;
1401                        stat_dst.datasize = datasize;
1402                        stat_dst.nlivenodes = nlivenodes;
1403                        _kvs_stat_set(handle->file, 0, stat_dst);
1404                    }
1405                    continue;
1406                }
1407
1408                uint64_t doc_offset;
1409                struct kvs_header *kv_header;
1410                struct docio_object doc;
1411
1412                _fdb_kvs_header_create(&kv_header);
1413                memset(&doc, 0, sizeof(struct docio_object));
1414                doc_offset = docio_read_doc(handle->dhandle,
1415                                            kv_info_offset, &doc, true);
1416
1417                if (doc_offset == kv_info_offset) {
1418                    header_len = 0; // fail
1419                    _fdb_kvs_header_free(kv_header);
1420                } else {
1421                    _fdb_kvs_header_import(kv_header, doc.body,
1422                                           doc.length.bodylen, false);
1423                    // get local sequence number for the KV instance
1424                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1425                                                 handle->kvs->id);
1426                    if (!handle->shandle) {
1427                        // rollback: replace kv_header stats
1428                        // read from the current header's kv_header
1429                        struct kvs_stat stat_src, stat_dst;
1430                        _kvs_stat_get_kv_header(kv_header,
1431                                                handle->kvs->id,
1432                                                &stat_src);
1433                        _kvs_stat_get(handle->file,
1434                                      handle->kvs->id,
1435                                      &stat_dst);
1436                        // update ndocs, datasize, nlivenodes
1437                        // into the current file's kv_header
1438                        // Note: stats related to WAL should not be updated
1439                        //       at this time. They will be adjusted through
1440                        //       discard & restore routines below.
1441                        stat_dst.ndocs = stat_src.ndocs;
1442                        stat_dst.datasize = stat_src.datasize;
1443                        stat_dst.nlivenodes = stat_src.nlivenodes;
1444                        _kvs_stat_set(handle->file,
1445                                      handle->kvs->id,
1446                                      stat_dst);
1447                    }
1448                    _fdb_kvs_header_free(kv_header);
1449                    free_docio_object(&doc, 1, 1, 1);
1450                }
1451            }
1452            if (!header_len) { // Marker MUST match that of DB commit!
1453                // rollback original stats
1454                if (handle->kvs) {
1455                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1456                } else {
1457                    _kvs_stat_get(handle->file, 0, &stat_ori);
1458                }
1459
1460                docio_free(handle->dhandle);
1461                free(handle->dhandle);
1462                free(handle->filename);
1463                free(prev_filename);
1464                handle->filename = NULL;
1465                filemgr_close(handle->file, false, handle->filename,
1466                              &handle->log_callback);
1467                return FDB_RESULT_NO_DB_INSTANCE;
1468            }
1469
1470            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1471                if (handle->config.multi_kv_instances) {
1472                    // multi KV instance mode
1473                    // clear only WAL items belonging to the instance
1474                    wal_close_kv_ins(handle->file,
1475                                     (handle->kvs)?(handle->kvs->id):(0));
1476                } else {
1477                    wal_shutdown(handle->file);
1478                }
1479            }
1480        } else { // snapshot to sequence number 0 requested..
1481            if (handle->shandle) { // fdb_snapshot_open API call
1482                if (seqnum) {
1483                    // Database currently has a non-zero seq number,
1484                    // but the snapshot was requested with a seq number zero.
1485                    docio_free(handle->dhandle);
1486                    free(handle->dhandle);
1487                    free(handle->filename);
1488                    free(prev_filename);
1489                    handle->filename = NULL;
1490                    filemgr_close(handle->file, false, handle->filename,
1491                                  &handle->log_callback);
1492                    return FDB_RESULT_NO_DB_INSTANCE;
1493                }
1494            } // end of zero max_seqnum but non-rollback check
1495        } // end of zero max_seqnum check
1496    } // end of durable snapshot locating
1497
1498    handle->btreeblkops = btreeblk_get_ops();
1499    handle->bhandle = (struct btreeblk_handle *)
1500                      calloc(1, sizeof(struct btreeblk_handle));
1501    handle->bhandle->log_callback = &handle->log_callback;
1502
1503    handle->dirty_updates = 0;
1504
1505    if (handle->config.compaction_buf_maxsize == 0) {
1506        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1507    }
1508
1509    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1510
1511    handle->cur_header_revnum = header_revnum;
1512    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1513
1514    memset(&empty_stat, 0x0, sizeof(empty_stat));
1515    _kvs_stat_get(handle->file, 0, &stat);
1516    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1517        // sync (default) KVS stat with DB header
1518        stat.nlivenodes = nlivenodes;
1519        stat.ndocs = ndocs;
1520        stat.datasize = datasize;
1521        _kvs_stat_set(handle->file, 0, stat);
1522    }
1523
1524    if (handle->config.multi_kv_instances && !handle->shandle) {
1525        // multi KV instance mode
1526        filemgr_mutex_lock(handle->file);
1527        if (kv_info_offset == BLK_NOT_FOUND) {
1528            // there is no KV header .. create & initialize
1529            fdb_kvs_header_create(handle->file);
1530            kv_info_offset = fdb_kvs_header_append(handle->file, handle->dhandle);
1531        } else if (handle->file->kv_header == NULL) {
1532            // KV header already exists but not loaded .. read & import
1533            fdb_kvs_header_create(handle->file);
1534            fdb_kvs_header_read(handle->file, handle->dhandle, kv_info_offset, false);
1535        }
1536        filemgr_mutex_unlock(handle->file);
1537
1538        // validation check for key order of all KV stores
1539        if (handle == handle->fhandle->root) {
1540            fdb_status fs = fdb_kvs_cmp_check(handle);
1541            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1542                docio_free(handle->dhandle);
1543                free(handle->dhandle);
1544                btreeblk_free(handle->bhandle);
1545                free(handle->bhandle);
1546                free(handle->filename);
1547                handle->filename = NULL;
1548                filemgr_close(handle->file, false, handle->filename,
1549                              &handle->log_callback);
1550                return fs;
1551            }
1552        }
1553    }
1554    handle->kv_info_offset = kv_info_offset;
1555
1556    if (handle->kv_info_offset != BLK_NOT_FOUND &&
1557        handle->kvs == NULL) {
1558        // multi KV instance mode .. turn on config flag
1559        handle->config.multi_kv_instances = true;
1560        // only super handle can be opened using fdb_open(...)
1561        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
1562    }
1563
1564    if (handle->shandle) { // Populate snapshot stats..
1565        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
1566            memset(&handle->shandle->stat, 0x0,
1567                    sizeof(handle->shandle->stat));
1568            handle->shandle->stat.ndocs = ndocs;
1569            handle->shandle->stat.datasize = datasize;
1570            handle->shandle->stat.nlivenodes = nlivenodes;
1571        } else { // Multi KV instance mode, populate specific kv stats
1572            memset(&handle->shandle->stat, 0x0,
1573                    sizeof(handle->shandle->stat));
1574            _kvs_stat_get(handle->file, handle->kvs->id,
1575                    &handle->shandle->stat);
1576            // Since wal is restored below, we have to reset
1577            // wal stats to zero.
1578            handle->shandle->stat.wal_ndeletes = 0;
1579            handle->shandle->stat.wal_ndocs = 0;
1580        }
1581    }
1582
1583    // initialize pointer to the global operational stats of this KV store
1584    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
1585    fdb_assert(handle->op_stats, 0, 0);
1586
1587    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1588    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
1589                handle->file->blocksize, trie_root_bid,
1590                (void *)handle->bhandle, handle->btreeblkops,
1591                (void *)handle->dhandle, _fdb_readkey_wrap);
1592    // set aux for cmp wrapping function
1593    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
1594    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
1595
1596    if (handle->kvs) {
1597        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
1598    }
1599
1600    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1601        handle->seqnum = seqnum;
1602
1603        if (handle->config.multi_kv_instances) {
1604            // multi KV instance mode .. HB+trie
1605            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1606            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1607                        handle->file->blocksize, seq_root_bid,
1608                        (void *)handle->bhandle, handle->btreeblkops,
1609                        (void *)handle->dhandle, _fdb_readseq_wrap);
1610
1611        } else {
1612            // single KV instance mode .. normal B+tree
1613            struct btree_kv_ops *seq_kv_ops =
1614                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1615            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1616            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1617
1618            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
1619            if (seq_root_bid == BLK_NOT_FOUND) {
1620                btree_init(handle->seqtree, (void *)handle->bhandle,
1621                           handle->btreeblkops, seq_kv_ops,
1622                           handle->config.blocksize, sizeof(fdb_seqnum_t),
1623                           OFFSET_SIZE, 0x0, NULL);
1624             }else{
1625                 btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
1626                                     handle->btreeblkops, seq_kv_ops,
1627                                     handle->config.blocksize, seq_root_bid);
1628             }
1629        }
1630    }else{
1631        handle->seqtree = NULL;
1632    }
1633
1634    if (handle->config.multi_kv_instances && handle->max_seqnum) {
1635        // restore only docs belonging to the KV instance
1636        // handle->kvs should not be NULL
1637        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
1638                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
1639    } else {
1640        // normal restore
1641        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
1642    }
1643
1644    if (compacted_filename &&
1645        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
1646        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
1647        _fdb_recover_compaction(handle, compacted_filename);
1648    }
1649
1650    if (prev_filename) {
1651        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
1652            // record the old filename into the file handle of current file
1653            // and REMOVE old file on the first open
1654            // WARNING: snapshots must have been opened before this call
1655            if (filemgr_update_file_status(handle->file,
1656                                           filemgr_get_file_status(handle->file),
1657                                           prev_filename)) {
1658                // Open the old file with read-only mode.
1659                // (Temporarily disable log callback at this time since
1660                //  the old file might be already removed.)
1661                fconfig.options = FILEMGR_READONLY;
1662                filemgr_open_result result = filemgr_open(prev_filename,
1663                                                          handle->fileops,
1664                                                          &fconfig,
1665                                                          NULL);
1666                if (result.file) {
1667                    filemgr_remove_pending(result.file, handle->file);
1668                    filemgr_close(result.file, 0, handle->filename,
1669                                  &handle->log_callback);
1670                }
1671            }
1672        } else {
1673            free(prev_filename);
1674        }
1675    }
1676
1677    status = btreeblk_end(handle->bhandle);
1678    fdb_assert(status == FDB_RESULT_SUCCESS, status, handle);
1679
1680    // do not register read-only handles
1681    if (!(config->flags & FDB_OPEN_FLAG_RDONLY) &&
1682        config->compaction_mode == FDB_COMPACTION_AUTO) {
1683        status = compactor_register_file(handle->file, (fdb_config *)config,
1684                                         handle->fhandle->cmp_func_list,
1685                                         &handle->log_callback);
1686    }
1687
1688#ifdef _TRACE_HANDLES
1689    spin_lock(&open_handle_lock);
1690    avl_insert(&open_handles, &handle->avl_trace, _fdb_handle_cmp);
1691    spin_unlock(&open_handle_lock);
1692#endif
1693    return status;
1694}
1695
1696LIBFDB_API
1697fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
1698                                fdb_log_callback log_callback,
1699                                void *ctx_data)
1700{
1701    handle->log_callback.callback = log_callback;
1702    handle->log_callback.ctx_data = ctx_data;
1703    return FDB_RESULT_SUCCESS;
1704}
1705
1706LIBFDB_API
1707fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
1708                          const void *meta, size_t metalen,
1709                          const void *body, size_t bodylen)
1710{
1711    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
1712        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1713        return FDB_RESULT_INVALID_ARGS;
1714    }
1715
1716    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
1717    if (*doc == NULL) { // LCOV_EXCL_START
1718        return FDB_RESULT_ALLOC_FAIL;
1719    } // LCOV_EXCL_STOP
1720
1721    (*doc)->seqnum = SEQNUM_NOT_USED;
1722
1723    if (key && keylen > 0) {
1724        (*doc)->key = (void *)malloc(keylen);
1725        if ((*doc)->key == NULL) { // LCOV_EXCL_START
1726            return FDB_RESULT_ALLOC_FAIL;
1727        } // LCOV_EXCL_STOP
1728        memcpy((*doc)->key, key, keylen);
1729        (*doc)->keylen = keylen;
1730    } else {
1731        (*doc)->key = NULL;
1732        (*doc)->keylen = 0;
1733    }
1734
1735    if (meta && metalen > 0) {
1736        (*doc)->meta = (void *)malloc(metalen);
1737        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1738            return FDB_RESULT_ALLOC_FAIL;
1739        } // LCOV_EXCL_STOP
1740        memcpy((*doc)->meta, meta, metalen);
1741        (*doc)->metalen = metalen;
1742    } else {
1743        (*doc)->meta = NULL;
1744        (*doc)->metalen = 0;
1745    }
1746
1747    if (body && bodylen > 0) {
1748        (*doc)->body = (void *)malloc(bodylen);
1749        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1750            return FDB_RESULT_ALLOC_FAIL;
1751        } // LCOV_EXCL_STOP
1752        memcpy((*doc)->body, body, bodylen);
1753        (*doc)->bodylen = bodylen;
1754    } else {
1755        (*doc)->body = NULL;
1756        (*doc)->bodylen = 0;
1757    }
1758
1759    (*doc)->size_ondisk = 0;
1760    (*doc)->deleted = false;
1761
1762    return FDB_RESULT_SUCCESS;
1763}
1764
1765LIBFDB_API
1766fdb_status fdb_doc_update(fdb_doc **doc,
1767                          const void *meta, size_t metalen,
1768                          const void *body, size_t bodylen)
1769{
1770    if (doc == NULL ||
1771        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1772        return FDB_RESULT_INVALID_ARGS;
1773    }
1774    if (*doc == NULL) {
1775        return FDB_RESULT_INVALID_ARGS;
1776    }
1777
1778    if (meta && metalen > 0) {
1779        // free previous metadata
1780        free((*doc)->meta);
1781        // allocate new metadata
1782        (*doc)->meta = (void *)malloc(metalen);
1783        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1784            return FDB_RESULT_ALLOC_FAIL;
1785        } // LCOV_EXCL_STOP
1786        memcpy((*doc)->meta, meta, metalen);
1787        (*doc)->metalen = metalen;
1788    }
1789
1790    if (body && bodylen > 0) {
1791        // free previous body
1792        free((*doc)->body);
1793        // allocate new body
1794        (*doc)->body = (void *)malloc(bodylen);
1795        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1796            return FDB_RESULT_ALLOC_FAIL;
1797        } // LCOV_EXCL_STOP
1798        memcpy((*doc)->body, body, bodylen);
1799        (*doc)->bodylen = bodylen;
1800    }
1801
1802    return FDB_RESULT_SUCCESS;
1803}
1804
1805// doc MUST BE allocated by malloc
1806LIBFDB_API
1807fdb_status fdb_doc_free(fdb_doc *doc)
1808{
1809    if (doc) {
1810        free(doc->key);
1811        free(doc->meta);
1812        free(doc->body);
1813        free(doc);
1814    }
1815    return FDB_RESULT_SUCCESS;
1816}
1817
1818INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
1819                                        struct wal_item *item)
1820{
1821    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1822    uint64_t old_offset = 0;
1823
1824    hbtrie_find_offset(handle->trie,
1825                       item->header->key,
1826                       item->header->keylen,
1827                       (void*)&old_offset);
1828    btreeblk_end(handle->bhandle);
1829    old_offset = _endian_decode(old_offset);
1830
1831    return old_offset;
1832}
1833
1834INLINE fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
1835                                         uint64_t offset) {
1836
1837    return snap_insert((struct snap_handle *)handle, doc, offset);
1838}
1839
1840INLINE fdb_status _fdb_wal_flush_func(void *voidhandle, struct wal_item *item)
1841{
1842    hbtrie_result hr;
1843    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1844    fdb_seqnum_t _seqnum;
1845    fdb_kvs_id_t kv_id;
1846    fdb_status fs = FDB_RESULT_SUCCESS;
1847    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
1848    int size_id, size_seq;
1849    uint8_t *kvid_seqnum;
1850    uint64_t old_offset, _offset;
1851    int delta, r;
1852    struct filemgr *file = handle->dhandle->file;
1853    struct kvs_stat stat;
1854
1855    memset(var_key, 0, handle->config.chunksize);
1856    if (handle->kvs) {
1857        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
1858    } else {
1859        kv_id = 0;
1860    }
1861
1862    if (item->action == WAL_ACT_INSERT ||
1863        item->action == WAL_ACT_LOGICAL_REMOVE) {
1864        _offset = _endian_encode(item->offset);
1865
1866        r = _kvs_stat_get(file, kv_id, &stat);
1867        if (r != 0) {
1868            // KV store corresponding to kv_id is already removed
1869            // skip this item
1870            return FDB_RESULT_SUCCESS;
1871        }
1872        handle->bhandle->nlivenodes = stat.nlivenodes;
1873
1874        hr = hbtrie_insert(handle->trie,
1875                           item->header->key,
1876                           item->header->keylen,
1877                           (void *)&_offset,
1878                           (void *)&old_offset);
1879
1880        fs = btreeblk_end(handle->bhandle);
1881        if (fs != FDB_RESULT_SUCCESS) {
1882            return fs;
1883        }
1884        old_offset = _endian_decode(old_offset);
1885
1886        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1887            _seqnum = _endian_encode(item->seqnum);
1888            if (handle->kvs) {
1889                // multi KV instance mode .. HB+trie
1890                uint64_t old_offset_local;
1891
1892                size_id = sizeof(fdb_kvs_id_t);
1893                size_seq = sizeof(fdb_seqnum_t);
1894                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1895                kvid2buf(size_id, kv_id, kvid_seqnum);
1896                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1897                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
1898                              (void *)&_offset, (void *)&old_offset_local);
1899            } else {
1900                btree_insert(handle->seqtree, (void *)&_seqnum,
1901                             (void *)&_offset);
1902            }
1903            fs = btreeblk_end(handle->bhandle);
1904            if (fs != FDB_RESULT_SUCCESS) {
1905                return fs;
1906            }
1907        }
1908
1909        delta = (int)handle->bhandle->nlivenodes - (int)stat.nlivenodes;
1910        _kvs_stat_update_attr(file, kv_id, KVS_STAT_NLIVENODES, delta);
1911
1912        if (hr == HBTRIE_RESULT_SUCCESS) {
1913            if (item->action == WAL_ACT_INSERT) {
1914                _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1915            }
1916            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE,
1917                                  item->doc_size);
1918        } else { // update or logical delete
1919            struct docio_length len;
1920            // This block is already cached when we call HBTRIE_INSERT.
1921            // No additional block access.
1922            len = docio_read_doc_length(handle->dhandle, old_offset);
1923
1924            if (!(len.flag & DOCIO_DELETED)) {
1925                if (item->action == WAL_ACT_LOGICAL_REMOVE) {
1926                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1927                }
1928            } else {
1929                if (item->action == WAL_ACT_INSERT) {
1930                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1931                }
1932            }
1933
1934            delta = (int)item->doc_size - (int)_fdb_get_docsize(len);
1935            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1936        }
1937    } else {
1938        // Immediate remove
1939        // LCOV_EXCL_START
1940        hr = hbtrie_remove(handle->trie, item->header->key,
1941                           item->header->keylen);
1942        fs = btreeblk_end(handle->bhandle);
1943        if (fs != FDB_RESULT_SUCCESS) {
1944            return fs;
1945        }
1946
1947        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1948            _seqnum = _endian_encode(item->seqnum);
1949            if (handle->kvs) {
1950                // multi KV instance mode .. HB+trie
1951                size_id = sizeof(fdb_kvs_id_t);
1952                size_seq = sizeof(fdb_seqnum_t);
1953                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1954                kvid2buf(size_id, kv_id, kvid_seqnum);
1955                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1956
1957                hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
1958                              size_id + size_seq);
1959            } else {
1960                btree_remove(handle->seqtree, (void*)&_seqnum);
1961            }
1962            fs = btreeblk_end(handle->bhandle);
1963            if (fs != FDB_RESULT_SUCCESS) {
1964                return fs;
1965            }
1966        }
1967
1968        if (hr == HBTRIE_RESULT_SUCCESS) {
1969            _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1970            delta = -(int)item->doc_size;
1971            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1972        }
1973        // LCOV_EXCL_STOP
1974    }
1975    return FDB_RESULT_SUCCESS;
1976}
1977
1978void fdb_sync_db_header(fdb_kvs_handle *handle)
1979{
1980    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
1981    if (handle->cur_header_revnum != cur_revnum) {
1982        void *header_buf = NULL;
1983        size_t header_len;
1984
1985        handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
1986        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
1987                                        NULL, NULL, NULL);
1988        if (header_len > 0) {
1989            uint64_t header_flags, dummy64;
1990            bid_t idtree_root;
1991            bid_t new_seq_root;
1992            char *compacted_filename;
1993            char *prev_filename = NULL;
1994
1995            fdb_fetch_header(header_buf, &idtree_root,
1996                             &new_seq_root,
1997                             &dummy64, &dummy64,
1998                             &dummy64, &handle->last_wal_flush_hdr_bid,
1999                             &handle->kv_info_offset, &header_flags,
2000                             &compacted_filename, &prev_filename);
2001
2002            if (handle->dirty_updates) {
2003                // discard all cached writable b+tree nodes
2004                // to avoid data inconsistency with other writers
2005                btreeblk_discard_blocks(handle->bhandle);
2006            }
2007
2008            handle->trie->root_bid = idtree_root;
2009
2010            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2011                if (new_seq_root != handle->seqtree->root_bid) {
2012                    if (handle->config.multi_kv_instances) {
2013                        handle->seqtrie->root_bid = new_seq_root;
2014                    } else {
2015                        btree_init_from_bid(handle->seqtree,
2016                                            handle->seqtree->blk_handle,
2017                                            handle->seqtree->blk_ops,
2018                                            handle->seqtree->kv_ops,
2019                                            handle->seqtree->blksize,
2020                                            new_seq_root);
2021                    }
2022                }
2023            }
2024
2025            if (prev_filename) {
2026                free(prev_filename);
2027            }
2028
2029            handle->cur_header_revnum = cur_revnum;
2030            handle->dirty_updates = 0;
2031            if (handle->kvs) {
2032                // multiple KV instance mode AND sub handle
2033                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2034                                                    handle->kvs->id);
2035            } else {
2036                // super handle OR single KV instance mode
2037                handle->seqnum = filemgr_get_seqnum(handle->file);
2038            }
2039        }
2040        if (header_buf) {
2041            free(header_buf);
2042        }
2043    }
2044}
2045
2046fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2047{
2048    fdb_status fs = FDB_RESULT_SUCCESS;
2049    file_status_t fstatus = filemgr_get_file_status(handle->file);
2050    // check whether the compaction is done
2051    if (fstatus == FILE_REMOVED_PENDING) {
2052        uint64_t ndocs, datasize, nlivenodes, last_wal_flush_hdr_bid;
2053        uint64_t kv_info_offset, header_flags;
2054        size_t header_len;
2055        char *new_filename;
2056        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2057        bid_t trie_root_bid, seq_root_bid;
2058        fdb_config config = handle->config;
2059
2060        // close the current file and newly open the new file
2061        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2062            // compaction daemon mode .. just close and then open
2063            char filename[FDB_MAX_FILENAME_LEN];
2064            strcpy(filename, handle->filename);
2065            fs = _fdb_close(handle);
2066            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2067            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2068            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2069        } else {
2070            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2071            fdb_fetch_header(buf,
2072                             &trie_root_bid, &seq_root_bid,
2073                             &ndocs, &nlivenodes, &datasize, &last_wal_flush_hdr_bid,
2074                             &kv_info_offset, &header_flags,
2075                             &new_filename, NULL);
2076            fs = _fdb_close(handle);
2077            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2078            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2079            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2080        }
2081    }
2082    if (status) {
2083        *status = fstatus;
2084    }
2085    return fs;
2086}
2087
2088static bool _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2089{
2090    bool locked = false;
2091    bid_t dirty_idtree_root, dirty_seqtree_root;
2092
2093    if (handle->shandle) {
2094        // skip snapshot
2095        return locked;
2096    }
2097
2098    if ( ( handle->dirty_updates ||
2099           filemgr_dirty_root_exist(handle->file) )  &&
2100         filemgr_get_header_bid(handle->file) == handle->last_hdr_bid ) {
2101        // 1) { a) dirty WAL flush by this handle exists OR
2102        //      b) dirty WAL flush by other handle exists } AND
2103        // 2) no commit was performed yet.
2104        // grab lock for writer
2105        filemgr_mutex_lock(handle->file);
2106        locked = true;
2107
2108        // get dirty root nodes
2109        filemgr_get_dirty_root(handle->file,
2110                               &dirty_idtree_root, &dirty_seqtree_root);
2111        if (dirty_idtree_root != BLK_NOT_FOUND) {
2112            handle->trie->root_bid = dirty_idtree_root;
2113        }
2114        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2115            if (dirty_seqtree_root != BLK_NOT_FOUND) {
2116                if (handle->kvs) {
2117                    handle->seqtrie->root_bid = dirty_seqtree_root;
2118                } else {
2119                    btree_init_from_bid(handle->seqtree,
2120                                        handle->seqtree->blk_handle,
2121                                        handle->seqtree->blk_ops,
2122                                        handle->seqtree->kv_ops,
2123                                        handle->seqtree->blksize,
2124                                        dirty_seqtree_root);
2125                }
2126            }
2127        }
2128        btreeblk_discard_blocks(handle->bhandle);
2129    }
2130    return locked;
2131}
2132
2133LIBFDB_API
2134fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2135{
2136    uint64_t offset, _offset;
2137    struct docio_object _doc;
2138    struct filemgr *wal_file = NULL;
2139    struct docio_handle *dhandle;
2140    fdb_status wr;
2141    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2142    fdb_txn *txn;
2143    fdb_doc doc_kv = *doc;
2144
2145    if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2146        doc->keylen > FDB_MAX_KEYLEN ||
2147        (handle->kvs_config.custom_cmp &&
2148            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2149        return FDB_RESULT_INVALID_ARGS;
2150    }
2151
2152    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2153        return FDB_RESULT_HANDLE_BUSY;
2154    }
2155
2156    if (handle->kvs) {
2157        // multi KV instance mode
2158        int size_chunk = handle->config.chunksize;
2159        doc_kv.keylen = doc->keylen + size_chunk;
2160        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2161        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2162        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2163    }
2164
2165    if (!handle->shandle) {
2166        fdb_check_file_reopen(handle, NULL);
2167        fdb_sync_db_header(handle);
2168
2169        wal_file = handle->file;
2170        dhandle = handle->dhandle;
2171
2172        txn = handle->fhandle->root->txn;
2173        if (!txn) {
2174            txn = &wal_file->global_txn;
2175        }
2176        if (handle->kvs) {
2177            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2178        } else {
2179            wr = wal_find(txn, wal_file, doc, &offset);
2180        }
2181    } else {
2182        if (handle->kvs) {
2183            wr = snap_find(handle->shandle, &doc_kv, &offset);
2184        } else {
2185            wr = snap_find(handle->shandle, doc, &offset);
2186        }
2187        dhandle = handle->dhandle;
2188    }
2189
2190    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2191
2192    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2193        bool locked = _fdb_sync_dirty_root(handle);
2194
2195        if (handle->kvs) {
2196            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2197                             (void *)&offset);
2198        } else {
2199            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2200                             (void *)&offset);
2201        }
2202        btreeblk_end(handle->bhandle);
2203        offset = _endian_decode(offset);
2204
2205        if (locked) {
2206            // grab lock for writer if there are dirty updates
2207            filemgr_mutex_unlock(handle->file);
2208        }
2209    }
2210
2211    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2212        bool alloced_meta = doc->meta ? false : true;
2213        bool alloced_body = doc->body ? false : true;
2214        if (handle->kvs) {
2215            _doc.key = doc_kv.key;
2216            _doc.length.keylen = doc_kv.keylen;
2217        } else {
2218            _doc.key = doc->key;
2219            _doc.length.keylen = doc->keylen;
2220        }
2221        _doc.meta = doc->meta;
2222        _doc.body = doc->body;
2223
2224        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2225            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2226            return FDB_RESULT_KEY_NOT_FOUND;
2227        }
2228
2229        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2230        if (_offset == offset) {
2231            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2232            return FDB_RESULT_KEY_NOT_FOUND;
2233        }
2234
2235        if (_doc.length.keylen != doc_kv.keylen ||
2236            _doc.length.flag & DOCIO_DELETED) {
2237            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2238            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2239            return FDB_RESULT_KEY_NOT_FOUND;
2240        }
2241
2242        doc->seqnum = _doc.seqnum;
2243        doc->metalen = _doc.length.metalen;
2244        doc->bodylen = _doc.length.bodylen;
2245        doc->meta = _doc.meta;
2246        doc->body = _doc.body;
2247        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2248        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2249        doc->offset = offset;
2250
2251        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2252        return FDB_RESULT_SUCCESS;
2253    }
2254
2255    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2256    return FDB_RESULT_KEY_NOT_FOUND;
2257}
2258
2259// search document metadata using key
2260LIBFDB_API
2261fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
2262{
2263    uint64_t offset;
2264    struct docio_object _doc;
2265    struct docio_handle *dhandle;
2266    struct filemgr *wal_file = NULL;
2267    fdb_status wr;
2268    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2269    fdb_txn *txn;
2270    fdb_doc doc_kv = *doc;
2271
2272    if (!handle || !doc || !doc->key ||
2273        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
2274        (handle->kvs_config.custom_cmp &&
2275            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2276        return FDB_RESULT_INVALID_ARGS;
2277    }
2278
2279    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2280        return FDB_RESULT_HANDLE_BUSY;
2281    }
2282
2283    if (handle->kvs) {
2284        // multi KV instance mode
2285        int size_chunk = handle->config.chunksize;
2286        doc_kv.keylen = doc->keylen + size_chunk;
2287        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2288        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2289        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2290    }
2291
2292    if (!handle->shandle) {
2293        fdb_check_file_reopen(handle, NULL);
2294        fdb_sync_db_header(handle);
2295
2296        wal_file = handle->file;
2297        dhandle = handle->dhandle;
2298
2299        txn = handle->fhandle->root->txn;
2300        if (!txn) {
2301            txn = &wal_file->global_txn;
2302        }
2303        if (handle->kvs) {
2304            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2305        } else {
2306            wr = wal_find(txn, wal_file, doc, &offset);
2307        }
2308    } else {
2309        if (handle->kvs) {
2310            wr = snap_find(handle->shandle, &doc_kv, &offset);
2311        } else {
2312            wr = snap_find(handle->shandle, doc, &offset);
2313        }
2314        dhandle = handle->dhandle;
2315    }
2316
2317    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2318
2319    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2320        bool locked = _fdb_sync_dirty_root(handle);
2321
2322        if (handle->kvs) {
2323            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2324                             (void *)&offset);
2325        } else {
2326            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2327                             (void *)&offset);
2328        }
2329        btreeblk_end(handle->bhandle);
2330        offset = _endian_decode(offset);
2331
2332        if (locked) {
2333            filemgr_mutex_unlock(handle->file);
2334        }
2335    }
2336
2337    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2338        if (handle->kvs) {
2339            _doc.key = doc_kv.key;
2340            _doc.length.keylen = doc_kv.keylen;
2341        } else {
2342            _doc.key = doc->key;
2343            _doc.length.keylen = doc->keylen;
2344        }
2345        bool alloced_meta = doc->meta ? false : true;
2346        _doc.meta = doc->meta;
2347        _doc.body = doc->body;
2348
2349        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2350                                                       true);
2351        if (body_offset == offset){
2352            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2353            return FDB_RESULT_KEY_NOT_FOUND;
2354        }
2355
2356        if (_doc.length.keylen != doc_kv.keylen) {
2357            free_docio_object(&_doc, 0, alloced_meta, 0);
2358            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2359            return FDB_RESULT_KEY_NOT_FOUND;
2360        }
2361
2362        doc->seqnum = _doc.seqnum;
2363        doc->metalen = _doc.length.metalen;
2364        doc->bodylen = _doc.length.bodylen;
2365        doc->meta = _doc.meta;
2366        doc->body = _doc.body;
2367        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2368        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2369        doc->offset = offset;
2370
2371        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2372        return FDB_RESULT_SUCCESS;
2373    }
2374
2375    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2376    return FDB_RESULT_KEY_NOT_FOUND;
2377}
2378
2379// search document using sequence number
2380LIBFDB_API
2381fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2382{
2383    uint64_t offset, _offset;
2384    struct docio_object _doc;
2385    struct docio_handle *dhandle;
2386    struct filemgr *wal_file = NULL;
2387    fdb_status wr;
2388    btree_result br = BTREE_RESULT_FAIL;
2389    fdb_seqnum_t _seqnum;
2390    fdb_txn *txn;
2391
2392    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2393        return FDB_RESULT_INVALID_ARGS;
2394    }
2395
2396    // Sequence trees are a must for byseq operations
2397    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2398        return FDB_RESULT_INVALID_CONFIG;
2399    }
2400
2401    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2402        return FDB_RESULT_HANDLE_BUSY;
2403    }
2404
2405    if (!handle->shandle) {
2406        fdb_check_file_reopen(handle, NULL);
2407        fdb_sync_db_header(handle);
2408
2409        wal_file = handle->file;
2410        dhandle = handle->dhandle;
2411
2412        txn = handle->fhandle->root->txn;
2413        if (!txn) {
2414            txn = &wal_file->global_txn;
2415        }
2416        // prevent searching by key in WAL if 'doc' is not empty
2417        size_t key_len = doc->keylen;
2418        doc->keylen = 0;
2419        if (handle->kvs) {
2420            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2421        } else {
2422            wr = wal_find(txn, wal_file, doc, &offset);
2423        }
2424        doc->keylen = key_len;
2425    } else {
2426        wr = snap_find(handle->shandle, doc, &offset);
2427        dhandle = handle->dhandle;
2428    }
2429
2430    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2431
2432    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2433        bool locked = _fdb_sync_dirty_root(handle);
2434
2435        _seqnum = _endian_encode(doc->seqnum);
2436        if (handle->kvs) {
2437            int size_id, size_seq;
2438            uint8_t *kv_seqnum;
2439            hbtrie_result hr;
2440            fdb_kvs_id_t _kv_id;
2441
2442            _kv_id = _endian_encode(handle->kvs->id);
2443            size_id = sizeof(fdb_kvs_id_t);
2444            size_seq = sizeof(fdb_seqnum_t);
2445            kv_seqnum = alca(uint8_t, size_id + size_seq);
2446            memcpy(kv_seqnum, &_kv_id, size_id);
2447            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2448            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2449                             size_id + size_seq, (void *)&offset);
2450            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2451        } else {
2452            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2453        }
2454        btreeblk_end(handle->bhandle);
2455        offset = _endian_decode(offset);
2456
2457        if (locked) {
2458            filemgr_mutex_unlock(handle->file);
2459        }
2460    }
2461
2462    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2463        bool alloc_key, alloc_meta, alloc_body;
2464        if (!handle->kvs) { // single KVS mode
2465            _doc.key = doc->key;
2466            _doc.length.keylen = doc->keylen;
2467            alloc_key = doc->key ? false : true;
2468        } else {
2469            _doc.key = NULL;
2470            alloc_key = true;
2471        }
2472        alloc_meta = doc->meta ? false : true;
2473        _doc.meta = doc->meta;
2474        alloc_body = doc->body ? false : true;
2475        _doc.body = doc->body;
2476
2477        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2478            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2479            return FDB_RESULT_KEY_NOT_FOUND;
2480        }
2481
2482        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2483        if (_offset == offset) {
2484            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2485            return FDB_RESULT_KEY_NOT_FOUND;
2486        }
2487
2488        if (_doc.length.flag & DOCIO_DELETED) {
2489            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2490            free_docio_object(&_doc, alloc_key, alloc_meta, alloc_body);
2491            return FDB_RESULT_KEY_NOT_FOUND;
2492        }
2493
2494        doc->seqnum = _doc.seqnum;
2495        if (handle->kvs) {
2496            int size_chunk = handle->config.chunksize;
2497            doc->keylen = _doc.length.keylen - size_chunk;
2498            if (doc->key) { // doc->key is given by user
2499                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2500                free_docio_object(&_doc, 1, 0, 0);
2501            } else {
2502                doc->key = _doc.key;
2503                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2504            }
2505        } else {
2506            doc->keylen = _doc.length.keylen;
2507            doc->key = _doc.key;
2508        }
2509        doc->metalen = _doc.length.metalen;
2510        doc->bodylen = _doc.length.bodylen;
2511        doc->meta = _doc.meta;
2512        doc->body = _doc.body;
2513        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2514        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2515        doc->offset = offset;
2516
2517        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2518
2519        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2520        return FDB_RESULT_SUCCESS;
2521    }
2522
2523    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2524    return FDB_RESULT_KEY_NOT_FOUND;
2525}
2526
2527// search document metadata using sequence number
2528LIBFDB_API
2529fdb_status fdb_get_metaonly_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2530{
2531    uint64_t offset;
2532    struct docio_object _doc;
2533    struct docio_handle *dhandle;
2534    struct filemgr *wal_file = NULL;
2535    fdb_status wr;
2536    btree_result br = BTREE_RESULT_FAIL;
2537    fdb_seqnum_t _seqnum;
2538    fdb_txn *txn = handle->fhandle->root->txn;
2539
2540    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2541        return FDB_RESULT_INVALID_ARGS;
2542    }
2543
2544    // Sequence trees are a must for byseq operations
2545    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2546        return FDB_RESULT_INVALID_CONFIG;
2547    }
2548
2549    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2550        return FDB_RESULT_HANDLE_BUSY;
2551    }
2552
2553    if (!handle->shandle) {
2554        fdb_check_file_reopen(handle, NULL);
2555        fdb_sync_db_header(handle);
2556
2557        wal_file = handle->file;
2558        dhandle = handle->dhandle;
2559
2560        if (!txn) {
2561            txn = &wal_file->global_txn;
2562        }
2563        // prevent searching by key in WAL if 'doc' is not empty
2564        size_t key_len = doc->keylen;
2565        doc->keylen = 0;
2566        if (handle->kvs) {
2567            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2568        } else {
2569            wr = wal_find(txn, wal_file, doc, &offset);
2570        }
2571        doc->keylen = key_len;
2572    } else {
2573        wr = snap_find(handle->shandle, doc, &offset);
2574        dhandle = handle->dhandle;
2575    }
2576
2577    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2578
2579    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2580        bool locked = _fdb_sync_dirty_root(handle);
2581
2582        _seqnum = _endian_encode(doc->seqnum);
2583        if (handle->kvs) {
2584            int size_id, size_seq;
2585            uint8_t *kv_seqnum;
2586            hbtrie_result hr;
2587            fdb_kvs_id_t _kv_id;
2588
2589            _kv_id = _endian_encode(handle->kvs->id);
2590            size_id = sizeof(fdb_kvs_id_t);
2591            size_seq = sizeof(fdb_seqnum_t);
2592            kv_seqnum = alca(uint8_t, size_id + size_seq);
2593            memcpy(kv_seqnum, &_kv_id, size_id);
2594            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2595            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2596                             size_id + size_seq, (void *)&offset);
2597            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2598        } else {
2599            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2600        }
2601        btreeblk_end(handle->bhandle);
2602        offset = _endian_decode(offset);
2603
2604        if (locked) {
2605            filemgr_mutex_unlock(handle->file);
2606        }
2607    }
2608
2609    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2610        if (!handle->kvs) { // single KVS mode
2611            _doc.key = doc->key;
2612            _doc.length.keylen = doc->keylen;
2613        } else {
2614            _doc.key = NULL;
2615        }
2616        _doc.meta = doc->meta;
2617        _doc.body = doc->body;
2618
2619        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2620                                                       true);
2621        if (body_offset == offset) {
2622            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2623            return FDB_RESULT_KEY_NOT_FOUND;
2624        }
2625
2626        if (handle->kvs) {
2627            int size_chunk = handle->config.chunksize;
2628            doc->keylen = _doc.length.keylen - size_chunk;
2629            if (doc->key) { // doc->key is given by user
2630                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2631                free_docio_object(&_doc, 1, 0, 0);
2632            } else {
2633                doc->key = _doc.key;
2634                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2635            }
2636        } else {
2637            doc->keylen = _doc.length.keylen;
2638            doc->key = _doc.key;
2639        }
2640        doc->metalen = _doc.length.metalen;
2641        doc->bodylen = _doc.length.bodylen;
2642        doc->meta = _doc.meta;
2643        doc->body = _doc.body;
2644        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2645        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2646        doc->offset = offset;
2647
2648        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2649
2650        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2651        return FDB_RESULT_SUCCESS;
2652    }
2653
2654    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2655    return FDB_RESULT_KEY_NOT_FOUND;
2656}
2657
2658static uint8_t equal_docs(fdb_doc *doc, struct docio_object *_doc) {
2659    uint8_t rv = 1;
2660    // Compare a seq num if seq tree is enabled.
2661    if (doc->seqnum != SEQNUM_NOT_USED) {
2662        if (doc->seqnum != _doc->seqnum) {
2663            free(_doc->key);
2664            free(_doc->meta);
2665            free(_doc->body);
2666            _doc->key = _doc->meta = _doc->body = NULL;
2667            rv = 0;
2668        }
2669    } else { // Compare key and metadata
2670        if ((doc->key && memcmp(doc->key, _doc->key, doc->keylen)) ||
2671            (doc->meta && memcmp(doc->meta, _doc->meta, doc->metalen))) {
2672            free(_doc->key);
2673            free(_doc->meta);
2674            free(_doc->body);
2675            _doc->key = _doc->meta = _doc->body = NULL;
2676            rv = 0;
2677        }
2678    }
2679    return rv;
2680}
2681
2682INLINE void _remove_kv_id(fdb_kvs_handle *handle, struct docio_object *doc)
2683{
2684    size_t size_chunk = handle->config.chunksize;
2685    doc->length.keylen -= size_chunk;
2686    memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->length.keylen);
2687}
2688
2689// Retrieve a doc's metadata and body with a given doc offset in the database file.
2690LIBFDB_API
2691fdb_status fdb_get_byoffset(fdb_kvs_handle *handle, fdb_doc *doc)
2692{
2693    uint64_t offset = doc->offset;
2694    struct docio_object _doc;
2695
2696    if (!offset) {
2697        return FDB_RESULT_INVALID_ARGS;
2698    }
2699
2700    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2701        return FDB_RESULT_HANDLE_BUSY;
2702    }
2703
2704    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2705    memset(&_doc, 0, sizeof(struct docio_object));
2706
2707    uint64_t _offset = docio_read_doc(handle->dhandle, offset, &_doc, true);
2708    if (_offset == offset) {
2709        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2710        return FDB_RESULT_KEY_NOT_FOUND;
2711    } else {
2712        if (handle->kvs) {
2713            fdb_kvs_id_t kv_id;
2714            buf2kvid(handle->config.chunksize, _doc.key, &kv_id);
2715            if (kv_id != handle->kvs->id) {
2716                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2717                free_docio_object(&_doc, 1, 1, 1);
2718                return FDB_RESULT_KEY_NOT_FOUND;
2719            }
2720            _remove_kv_id(handle, &_doc);
2721        }
2722        if (!equal_docs(doc, &_doc)) {
2723            free_docio_object(&_doc, 1, 1, 1);
2724            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2725            return FDB_RESULT_KEY_NOT_FOUND;
2726        }
2727    }
2728
2729    doc->seqnum = _doc.seqnum;
2730    doc->keylen = _doc.length.keylen;
2731    doc->metalen = _doc.length.metalen;
2732    doc->bodylen = _doc.length.bodylen;
2733    if (doc->key) {
2734        free(_doc.key);
2735    } else {
2736        doc->key = _doc.key;
2737    }
2738    if (doc->meta) {
2739        free(_doc.meta);
2740    } else {
2741        doc->meta = _doc.meta;
2742    }
2743    if (doc->body) {
2744        if (_doc.length.bodylen > 0) {
2745            memcpy(doc->body, _doc.body, _doc.length.bodylen);
2746        }
2747        free(_doc.body);
2748    } else {
2749        doc->body = _doc.body;
2750    }
2751    doc->deleted = _doc.length.flag & DOCIO_DELETED;
2752    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2753    if (handle->kvs) {
2754        // Since _doc.length was adjusted in _remove_kv_id(),
2755        // we need to compensate it.
2756        doc->size_ondisk += handle->config.chunksize;
2757    }
2758
2759    if (_doc.length.flag & DOCIO_DELETED) {
2760        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2761        return FDB_RESULT_KEY_NOT_FOUND;
2762    }
2763
2764    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2765    return FDB_RESULT_SUCCESS;
2766}
2767
2768INLINE uint64_t _fdb_get_wal_threshold(fdb_kvs_handle *handle)
2769{
2770    return handle->config.wal_threshold;
2771}
2772
2773LIBFDB_API
2774fdb_status fdb_set(fdb_kvs_handle *handle, fdb_doc *doc)
2775{
2776    uint64_t offset;
2777    struct docio_object _doc;
2778    struct filemgr *file;
2779    struct docio_handle *dhandle;
2780    struct timeval tv;
2781    bool txn_enabled = false;
2782    bool sub_handle = false;
2783    bool wal_flushed = false;
2784    file_status_t fstatus;
2785    fdb_txn *txn = handle->fhandle->root->txn;
2786    fdb_status wr = FDB_RESULT_SUCCESS;
2787
2788    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
2789        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
2790                       "Warning: SET is not allowed on the read-only DB file '%s'.",
2791                       handle->file->filename);
2792    }
2793
2794    if ( doc->key == NULL || doc->keylen == 0 ||
2795        doc->keylen > FDB_MAX_KEYLEN ||
2796        (doc->metalen > 0 && doc->meta == NULL) ||
2797        (doc->bodylen > 0 && doc->body == NULL) ||
2798        (handle->kvs_config.custom_cmp &&
2799            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2800        return FDB_RESULT_INVALID_ARGS;
2801    }
2802
2803    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2804        return FDB_RESULT_HANDLE_BUSY;
2805    }
2806
2807    _doc.length.keylen = doc->keylen;
2808    _doc.length.metalen = doc->metalen;
2809    _doc.length.bodylen = doc->deleted ? 0 : doc->bodylen;
2810    _doc.key = doc->key;
2811    _doc.meta = doc->meta;
2812    _doc.body = doc->deleted ? NULL : doc->body;
2813
2814    if (handle->kvs) {
2815        // multi KV instance mode
2816        // allocate more (temporary) space for key, to store ID number
2817        int size_chunk = handle->config.chunksize;
2818        _doc.length.keylen = doc->keylen + size_chunk;
2819        _doc.key = alca(uint8_t, _doc.length.keylen);
2820        // copy ID
2821        kvid2buf(size_chunk, handle->kvs->id, _doc.key);
2822        // copy key
2823        memcpy((uint8_t*)_doc.key + size_chunk, doc->key, doc->keylen);
2824
2825        if (handle->kvs->type == KVS_SUB) {
2826            sub_handle = true;
2827        } else {
2828            sub_handle = false;
2829        }
2830    }
2831
2832fdb_set_start:
2833    fdb_check_file_reopen(handle, NULL);
2834
2835    size_t throttling_delay = filemgr_get_throttling_delay(handle->file);
2836    if (throttling_delay) {
2837        usleep(throttling_delay);
2838    }
2839
2840    filemgr_mutex_lock(handle->file);
2841    fdb_sync_db_header(handle);
2842
2843    if (filemgr_is_rollback_on(handle->file)) {
2844        filemgr_mutex_unlock(handle->file);
2845        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2846        return FDB_RESULT_FAIL_BY_ROLLBACK;
2847    }
2848
2849    file = handle->file;
2850    dhandle = handle->dhandle;
2851
2852    fstatus = filemgr_get_file_status(file);
2853    if (fstatus == FILE_REMOVED_PENDING) {
2854        // we must not write into this file
2855        // file status was changed by other thread .. start over
2856        filemgr_mutex_unlock(file);
2857        goto fdb_set_start;
2858    }
2859
2860    if (sub_handle) {
2861        // multiple KV instance mode AND sub handle
2862        handle->seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id) + 1;
2863        fdb_kvs_set_seqnum(file, handle->kvs->id, handle->seqnum);
2864    } else {
2865        // super handle OR single KV instance mode
2866        handle->seqnum = filemgr_get_seqnum(file) + 1;
2867        filemgr_set_seqnum(file, handle->seqnum);
2868    }
2869    _doc.seqnum = doc->seqnum = handle->seqnum;
2870
2871    if (doc->deleted) {
2872        // set timestamp
2873        gettimeofday(&tv, NULL);
2874        _doc.timestamp = (timestamp_t)tv.tv_sec;
2875    } else {
2876        _doc.timestamp = 0;
2877    }
2878
2879    if (txn) {
2880        txn_enabled = true;
2881    }
2882
2883    offset = docio_append_doc(dhandle, &_doc, doc->deleted, txn_enabled);
2884    if (offset == BLK_NOT_FOUND) {
2885        filemgr_mutex_unlock(file);
2886        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2887        return FDB_RESULT_WRITE_FAIL;
2888    }
2889
2890    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2891    doc->offset = offset;
2892    if (!txn) {
2893        txn = &file->global_txn;
2894    }
2895    if (handle->kvs) {
2896        // multi KV instance mode
2897        fdb_doc kv_ins_doc = *doc;
2898        kv_ins_doc.key = _doc.key;
2899        kv_ins_doc.keylen = _doc.length.keylen;
2900        wal_insert(txn, file, &kv_ins_doc, offset, 0);
2901    } else {
2902        wal_insert(txn, file, doc, offset, 0);
2903    }
2904
2905    if (wal_get_dirty_status(file)== FDB_WAL_CLEAN) {
2906        wal_set_dirty_status(file, FDB_WAL_DIRTY);
2907    }
2908
2909    if (handle->config.wal_flush_before_commit ||
2910         handle->config.auto_commit) {
2911        bid_t dirty_idtree_root, dirty_seqtree_root;
2912
2913        if (!txn_enabled) {
2914            handle->dirty_updates = 1;
2915        }
2916
2917        // MUST ensure that 'file' is always 'handle->file',
2918        // because this routine will not be executed during compaction.
2919        filemgr_get_dirty_root(file, &dirty_idtree_root, &dirty_seqtree_root);
2920
2921        // other concurrent writer flushed WAL before commit,
2922        // sync root node of each tree
2923        if (dirty_idtree_root != BLK_NOT_FOUND) {
2924            handle->trie->root_bid = dirty_idtree_root;
2925        }
2926        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
2927            dirty_seqtree_root != BLK_NOT_FOUND) {
2928            if (handle->kvs) {
2929                handle->seqtrie->root_bid = dirty_seqtree_root;
2930            } else {
2931                btree_init_from_bid(handle->seqtree,
2932                                    handle->seqtree->blk_handle,
2933                                    handle->seqtree->blk_ops,
2934                                    handle->seqtree->kv_ops,
2935                                    handle->seqtree->blksize,
2936                                    dirty_seqtree_root);
2937            }
2938        }
2939
2940        if (wal_get_num_flushable(file) > _fdb_get_wal_threshold(handle)) {
2941            struct avl_tree flush_items;
2942
2943            // discard all cached writable blocks
2944            // to avoid data inconsistency with other writers
2945            btreeblk_discard_blocks(handle->bhandle);
2946
2947            // commit only for non-transactional WAL entries
2948            wr = wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
2949            if (wr != FDB_RESULT_SUCCESS) {
2950                filemgr_mutex_unlock(file);
2951                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2952                           1, 0);
2953                return wr;
2954            }
2955            wr = wal_flush(file, (void *)handle,
2956                      _fdb_wal_flush_func, _fdb_wal_get_old_offset,
2957                      &flush_items);
2958            if (wr != FDB_RESULT_SUCCESS) {
2959                filemgr_mutex_unlock(file);
2960                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2961                           1, 0);
2962                return wr;
2963            }
2964            wal_set_dirty_status(file, FDB_WAL_PENDING);
2965            // it is ok to release flushed items becuase
2966            // these items are not actually committed yet.
2967            // they become visible after fdb_commit is invoked.
2968            wal_release_flushed_items(file, &flush_items);
2969
2970            // sync new root node
2971            dirty_idtree_root = handle->trie->root_bid;
2972            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2973                if (handle->kvs) {
2974                    dirty_seqtree_root = handle->seqtrie->root_bid;
2975                } else {
2976                    dirty_seqtree_root = handle->seqtree->root_bid;
2977                }
2978            }
2979            filemgr_set_dirty_root(file,
2980                                   dirty_idtree_root,
2981                                   dirty_seqtree_root);
2982
2983            wal_flushed = true;
2984            btreeblk_reset_subblock_info(handle->bhandle);
2985        }
2986    }
2987
2988    filemgr_mutex_unlock(file);
2989
2990    if (!doc->deleted) {
2991        atomic_incr_uint64_t(&handle->op_stats->num_sets);
2992    }
2993
2994    if (wal_flushed && handle->config.auto_commit) {
2995        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2996        return fdb_commit(handle->fhandle, FDB_COMMIT_NORMAL);
2997    }
2998    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2999    return FDB_RESULT_SUCCESS;
3000}
3001
3002LIBFDB_API
3003fdb_status fdb_del(fdb_kvs_handle *handle, fdb_doc *doc)
3004{
3005    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
3006        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
3007                       "Warning: DEL is not allowed on the read-only DB file '%s'.",
3008                       handle->file->filename);
3009    }
3010
3011    if (doc->key == NULL || doc->keylen == 0 ||
3012        doc->keylen > FDB_MAX_KEYLEN ||
3013        (handle->kvs_config.custom_cmp &&
3014            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3015        return FDB_RESULT_INVALID_ARGS;
3016    }
3017
3018    doc->deleted = true;
3019    fdb_doc _doc;
3020    _doc = *doc;
3021    _doc.bodylen = 0;
3022    _doc.body = NULL;
3023
3024    atomic_incr_uint64_t(&handle->op_stats->num_dels);
3025
3026    return fdb_set(handle, &_doc);
3027}
3028
3029static uint64_t _fdb_export_header_flags(fdb_kvs_handle *handle)
3030{
3031    uint64_t rv = 0;
3032    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
3033        // seq tree is used
3034        rv |= FDB_FLAG_SEQTREE_USE;
3035    }
3036    if (handle->fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
3037        // the default KVS is once opened
3038        rv |= FDB_FLAG_ROOT_INITIALIZED;
3039    }
3040    if (handle->fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) {
3041        // the default KVS is based on custom key order
3042        rv |= FDB_FLAG_ROOT_CUSTOM_CMP;
3043    }
3044    return rv;
3045}
3046
3047uint64_t fdb_set_file_header(fdb_kvs_handle *handle)
3048{
3049    /*
3050    <ForestDB header>
3051    [offset]: (description)
3052    [     0]: BID of root node of root B+Tree of HB+Trie: 8 bytes
3053    [     8]: BID of root node of seq B+Tree: 8 bytes (0xFF.. if not used)
3054    [    16]: # of live documents: 8 bytes
3055    [    24]: # of live B+Tree nodes: 8 bytes
3056    [    32]: Data size (byte): 8 bytes
3057    [    40]: BID of the DB header created when last WAL flush: 8 bytes
3058    [    48]: Offset of the document containing KV instances' info: 8 bytes
3059    [    56]: Header flags: 8 bytes
3060    [    64]: Size of newly compacted target file name : 2 bytes
3061    [    66]: Size of old file name before compaction :  2 bytes
3062    [    68]: File name of newly compacted file : x bytes
3063    [  68+x]: File name of old file before compcation : y bytes
3064    [68+x+y]: CRC32: 4 bytes
3065    total size (header's length): 72+x+y bytes
3066
3067    Note: the list of functions that need to be modified
3068          if the header structure is changed:
3069
3070        _fdb_redirect_header() in forestdb.cc
3071        filemgr_destory_file() in filemgr.cc
3072    */
3073    uint8_t *buf = alca(uint8_t, handle->config.blocksize);
3074    uint16_t new_filename_len = 0;
3075    uint16_t old_filename_len = 0;
3076    uint16_t _edn_safe_16;
3077    uint32_t crc;
3078    uint64_t _edn_safe_64;
3079    size_t offset = 0;
3080    struct filemgr *cur_file;
3081    struct kvs_stat stat;
3082
3083    cur_file = handle->file;
3084
3085    // hb+trie or idtree root bid
3086    _edn_safe_64 = _endian_encode(handle->trie->root_bid);
3087    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(handle->trie->root_bid), offset);
3088
3089    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
3090        // b+tree root bid
3091        _edn_safe_64 = _endian_encode(handle->seqtree->root_bid);
3092        seq_memcpy(buf + offset, &_edn_safe_64,
3093            sizeof(handle->seqtree->root_bid), offset);
3094    } else {
3095        memset(buf + offset, 0xff, sizeof(uint64_t));
3096        offset += sizeof(uint64_t);
3097    }
3098
3099    // get stat
3100    _kvs_stat_get(cur_file, 0, &stat);
3101
3102    // # docs
3103    _edn_safe_64 = _endian_encode(stat.ndocs);
3104    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(_edn_safe_64), offset);
3105    // # live nodes
3106    _edn_safe_64 = _endian_encode(stat.nlivenodes);
3107    seq_memcpy(buf + offset, &_edn_safe_64,
3108               sizeof(_edn_safe_64), offset);
3109    // data size
3110    _edn_safe_64 =