xref: /4.0.0/forestdb/src/forestdb.cc (revision ccb53154)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "btree.h"
33#include "btree_kv.h"
34#include "btree_var_kv_ops.h"
35#include "docio.h"
36#include "btreeblock.h"
37#include "common.h"
38#include "wal.h"
39#include "snapshot.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "compactor.h"
44#include "memleak.h"
45#include "time_utils.h"
46#include "system_resource_stats.h"
47
48#ifdef __DEBUG
49#ifndef __DEBUG_FDB
50    #undef DBG
51    #undef DBGCMD
52    #undef DBGSW
53    #define DBG(...)
54    #define DBGCMD(...)
55    #define DBGSW(n, ...)
56#endif
57#endif
58
#ifdef _TRACE_HANDLES
// Tree of traced KV store handles; only compiled when _TRACE_HANDLES is set.
struct avl_tree open_handles;
// Spin lock declared together with the handle tree.
static spin_t open_handle_lock;

/*
 * AVL comparator for 'open_handles'.
 *
 * Entries are ordered by the address of the _fdb_kvs_handle that embeds the
 * 'avl_trace' node.
 *
 * @param a   first tree node
 * @param b   second tree node
 * @param aux unused
 * @return 1 if a's handle address is greater, -1 if it is smaller, and 0 when
 *         both nodes belong to the same handle. (Previously equal handles were
 *         reported as -1, so a node never compared equal to itself.)
 */
static int _fdb_handle_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    (void) aux;
    struct _fdb_kvs_handle *aa = _get_entry(a, struct _fdb_kvs_handle, avl_trace);
    struct _fdb_kvs_handle *bb = _get_entry(b, struct _fdb_kvs_handle, avl_trace);

    if (aa > bb) {
        return 1;
    }
    if (aa < bb) {
        return -1;
    }
    return 0;
}
#endif
70
71static volatile uint8_t fdb_initialized = 0;
72static volatile uint8_t fdb_open_inprog = 0;
73#ifdef SPIN_INITIALIZER
74static spin_t initial_lock = SPIN_INITIALIZER;
75#else
76static volatile unsigned int initial_lock_status = 0;
77static spin_t initial_lock;
78#endif
79
80static fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
81                                         uint64_t offset);
82
83INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
84{
85    (void) aux;
86    uint64_t a,b;
87    a = *(uint64_t*)key1;
88    b = *(uint64_t*)key2;
89    a = _endian_decode(a);
90    b = _endian_decode(b);
91    return _CMP_U64(a, b);
92}
93
94size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
95{
96    keylen_t keylen;
97    offset = _endian_decode(offset);
98    docio_read_doc_key((struct docio_handle *)handle, offset, &keylen, buf);
99    return keylen;
100}
101
102size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
103{
104    int size_id, size_seq, size_chunk;
105    fdb_seqnum_t _seqnum;
106    struct docio_object doc;
107    struct docio_handle *dhandle = (struct docio_handle *)handle;
108
109    size_id = sizeof(fdb_kvs_id_t);
110    size_seq = sizeof(fdb_seqnum_t);
111    size_chunk = dhandle->file->config->chunksize;
112    memset(&doc, 0, sizeof(struct docio_object));
113
114    offset = _endian_decode(offset);
115    docio_read_doc_key_meta((struct docio_handle *)handle, offset, &doc,
116                            true);
117    buf2buf(size_chunk, doc.key, size_id, buf);
118    _seqnum = _endian_encode(doc.seqnum);
119    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
120
121    free(doc.key);
122    free(doc.meta);
123
124    return size_id + size_seq;
125}
126
127int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
128{
129    int is_key1_inf, is_key2_inf;
130    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
131    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
132    size_t keylen1, keylen2;
133    btree_cmp_args *args = (btree_cmp_args *)aux;
134    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
135
136    is_key1_inf = _is_inf_key(key1);
137    is_key2_inf = _is_inf_key(key2);
138    if (is_key1_inf && is_key2_inf) { // both are infinite
139        return 0;
140    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
141        return -1;
142    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
143        return 1;
144    }
145
146    _get_var_key(key1, (void*)keystr1, &keylen1);
147    _get_var_key(key2, (void*)keystr2, &keylen2);
148
149    if (keylen1 == 0 && keylen2 == 0) {
150        return 0;
151    } else if (keylen1 ==0 && keylen2 > 0) {
152        return -1;
153    } else if (keylen1 > 0 && keylen2 == 0) {
154        return 1;
155    }
156
157    return cmp(keystr1, keylen1, keystr2, keylen2);
158}
159
160void fdb_fetch_header(void *header_buf,
161                      bid_t *trie_root_bid,
162                      bid_t *seq_root_bid,
163                      uint64_t *ndocs,
164                      uint64_t *nlivenodes,
165                      uint64_t *datasize,
166                      uint64_t *last_wal_flush_hdr_bid,
167                      uint64_t *kv_info_offset,
168                      uint64_t *header_flags,
169                      char **new_filename,
170                      char **old_filename)
171{
172    size_t offset = 0;
173    uint16_t new_filename_len;
174    uint16_t old_filename_len;
175
176    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
177               sizeof(bid_t), offset);
178    *trie_root_bid = _endian_decode(*trie_root_bid);
179
180    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
181               sizeof(bid_t), offset);
182    *seq_root_bid = _endian_decode(*seq_root_bid);
183
184    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
185               sizeof(uint64_t), offset);
186    *ndocs = _endian_decode(*ndocs);
187
188    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
189               sizeof(uint64_t), offset);
190    *nlivenodes = _endian_decode(*nlivenodes);
191
192    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
193               sizeof(uint64_t), offset);
194    *datasize = _endian_decode(*datasize);
195
196    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
197               sizeof(uint64_t), offset);
198    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
199
200    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
201               sizeof(uint64_t), offset);
202    *kv_info_offset = _endian_decode(*kv_info_offset);
203
204    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
205               sizeof(uint64_t), offset);
206    *header_flags = _endian_decode(*header_flags);
207
208    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
209               sizeof(new_filename_len), offset);
210    new_filename_len = _endian_decode(new_filename_len);
211    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
212               sizeof(old_filename_len), offset);
213    old_filename_len = _endian_decode(old_filename_len);
214    if (new_filename_len) {
215        *new_filename = (char*)((uint8_t *)header_buf + offset);
216    } else {
217        *new_filename = NULL;
218    }
219    offset += new_filename_len;
220    if (old_filename && old_filename_len) {
221        *old_filename = (char *) malloc(old_filename_len);
222        seq_memcpy(*old_filename,
223                   (uint8_t *)header_buf + offset,
224                   old_filename_len, offset);
225    }
226}
227
// Selects which documents _fdb_restore_wal() puts back into the WAL.
typedef enum {
    // restore all items
    FDB_RESTORE_NORMAL = 0,
    // restore only items whose KV store ID matches the requested ID
    FDB_RESTORE_KV_INS = 1
} fdb_restore_mode_t;
232
233INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
234                             fdb_restore_mode_t mode,
235                             bid_t hdr_bid,
236                             fdb_kvs_id_t kv_id_req)
237{
238    struct filemgr *file = handle->file;
239    uint32_t blocksize = handle->file->blocksize;
240    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
241    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
242    uint64_t offset = 0; //assume everything from first block needs restoration
243    err_log_callback *log_callback;
244
245    if (!hdr_off) { // Nothing to do if we don't have a header block offset
246        return;
247    }
248
249    filemgr_mutex_lock(file);
250    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
251        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
252    }
253
254    // If a valid last header was retrieved and it matches the current header
255    // OR if WAL already had entries populated, then no crash recovery needed
256    if (hdr_off <= offset ||
257        (!handle->shandle && wal_get_size(file) &&
258            mode != FDB_RESTORE_KV_INS)) {
259        filemgr_mutex_unlock(file);
260        return;
261    }
262
263    // Temporarily disable the error logging callback as there are false positive
264    // checksum errors in docio_read_doc.
265    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
266    log_callback = handle->dhandle->log_callback;
267    handle->dhandle->log_callback = NULL;
268
269    for (; offset < hdr_off;
270        offset = ((offset / blocksize) + 1) * blocksize) { // next block's off
271        if (!docio_check_buffer(handle->dhandle, offset / blocksize)) {
272            continue;
273        } else {
274            do {
275                struct docio_object doc;
276                uint64_t _offset;
277                uint64_t doc_offset;
278                memset(&doc, 0, sizeof(doc));
279                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
280                if (_offset == offset) { // reached unreadable doc, skip block
281                    break;
282                }
283                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
284                    // check if the doc is transactional or not, and
285                    // also check if the doc contains system info
286                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
287                        !(doc.length.flag & DOCIO_SYSTEM)) {
288                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
289                            // commit mark .. read doc offset
290                            doc_offset = doc.doc_offset;
291                            // read the previously skipped doc
292                            docio_read_doc(handle->dhandle, doc_offset, &doc, true);
293                            if (doc.key == NULL) { // doc read error
294                                free(doc.meta);
295                                free(doc.body);
296                                offset = _offset;
297                                continue;
298                            }
299                        } else {
300                            doc_offset = offset;
301                        }
302
303                        // If say a snapshot is taken on a db handle after
304                        // rollback, then skip WAL items after rollback point
305                        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
306                            (mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
307                            doc.seqnum > handle->seqnum) {
308                            free(doc.key);
309                            free(doc.meta);
310                            free(doc.body);
311                            offset = _offset;
312                            continue;
313                        }
314
315                        // restore document
316                        fdb_doc wal_doc;
317                        wal_doc.keylen = doc.length.keylen;
318                        wal_doc.bodylen = doc.length.bodylen;
319                        wal_doc.key = doc.key;
320                        wal_doc.seqnum = doc.seqnum;
321                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
322
323                        if (!handle->shandle) {
324                            wal_doc.metalen = doc.length.metalen;
325                            wal_doc.meta = doc.meta;
326                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
327
328                            if (handle->kvs) {
329                                // check seqnum before insert
330                                fdb_kvs_id_t kv_id;
331                                fdb_seqnum_t kv_seqnum;
332                                buf2kvid(handle->config.chunksize,
333                                         wal_doc.key, &kv_id);
334
335                                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
336                                    kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
337                                } else {
338                                    kv_seqnum = SEQNUM_NOT_USED;
339                                }
340                                if (doc.seqnum <= kv_seqnum &&
341                                        ((mode == FDB_RESTORE_KV_INS &&
342                                            kv_id == kv_id_req) ||
343                                         (mode == FDB_RESTORE_NORMAL)) ) {
344                                    // if mode is NORMAL, restore all items
345                                    // if mode is KV_INS, restore items matching ID
346                                    wal_insert(&file->global_txn, file,
347                                               &wal_doc, doc_offset, 0);
348                                }
349                            } else {
350                                wal_insert(&file->global_txn, file,
351                                           &wal_doc, doc_offset, 0);
352                            }
353                            if (doc.key) free(doc.key);
354                        } else {
355                            // snapshot
356                            if (handle->kvs) {
357                                fdb_kvs_id_t kv_id;
358                                buf2kvid(handle->config.chunksize,
359                                         wal_doc.key, &kv_id);
360                                if (kv_id == handle->kvs->id) {
361                                    // snapshot: insert ID matched documents only
362                                    snap_insert(handle->shandle,
363                                                &wal_doc, doc_offset);
364                                } else {
365                                    free(doc.key);
366                                }
367                            } else {
368                                snap_insert(handle->shandle, &wal_doc, doc_offset);
369                            }
370                        }
371                        free(doc.meta);
372                        free(doc.body);
373                        offset = _offset;
374                    } else {
375                        // skip transactional document or system document
376                        free(doc.key);
377                        free(doc.meta);
378                        free(doc.body);
379                        offset = _offset;
380                        // do not break.. read next doc
381                    }
382                } else {
383                    free(doc.key);
384                    free(doc.meta);
385                    free(doc.body);
386                    offset = _offset;
387                    break;
388                }
389            } while (offset + sizeof(struct docio_length) < hdr_off);
390        }
391    }
392    // wal commit
393    if (!handle->shandle) {
394        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
395    }
396    filemgr_mutex_unlock(file);
397    handle->dhandle->log_callback = log_callback;
398}
399
400INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
401                                          const char *new_filename)
402{
403    fdb_kvs_handle new_db;
404    fdb_config config = handle->config;
405    struct filemgr *new_file;
406
407    memset(&new_db, 0, sizeof(new_db));
408    new_db.log_callback.callback = handle->log_callback.callback;
409    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
410    config.flags |= FDB_OPEN_FLAG_RDONLY;
411    new_db.fhandle = handle->fhandle;
412    new_db.kvs_config = handle->kvs_config;
413    fdb_status status = _fdb_open(&new_db, new_filename,
414                                  FDB_AFILENAME, &config);
415    if (status != FDB_RESULT_SUCCESS) {
416        return fdb_log(&handle->log_callback, status,
417                       "Error in opening a partially compacted file '%s' for recovery.",
418                       new_filename);
419    }
420
421    new_file = new_db.file;
422
423    if (new_file->old_filename &&
424        !strncmp(new_file->old_filename, handle->file->filename,
425                 FDB_MAX_FILENAME_LEN)) {
426        struct filemgr *old_file = handle->file;
427        // If new file has a recorded old_filename then it means that
428        // compaction has completed successfully. Mark self for deletion
429        filemgr_mutex_lock(new_file);
430
431        status = btreeblk_end(handle->bhandle);
432        if (status != FDB_RESULT_SUCCESS) {
433            filemgr_mutex_unlock(new_file);
434            _fdb_close(&new_db);
435            return status;
436        }
437        btreeblk_free(handle->bhandle);
438        free(handle->bhandle);
439        handle->bhandle = new_db.bhandle;
440
441        docio_free(handle->dhandle);
442        free(handle->dhandle);
443        handle->dhandle = new_db.dhandle;
444
445        hbtrie_free(handle->trie);
446        free(handle->trie);
447        handle->trie = new_db.trie;
448
449        wal_shutdown(handle->file);
450        handle->file = new_file;
451
452        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
453            if (handle->kvs) {
454                // multi KV instance mode
455                hbtrie_free(handle->seqtrie);
456                free(handle->seqtrie);
457                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
458                    handle->seqtrie = new_db.seqtrie;
459                }
460            } else {
461                free(handle->seqtree->kv_ops);
462                free(handle->seqtree);
463                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
464                    handle->seqtree = new_db.seqtree;
465                }
466            }
467        }
468
469        filemgr_mutex_unlock(new_file);
470        if (new_db.kvs) {
471            fdb_kvs_info_free(&new_db);
472        }
473        // remove self: WARNING must not close this handle if snapshots
474        // are yet to open this file
475        filemgr_remove_pending(old_file, new_db.file);
476        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
477        free(new_db.filename);
478        return FDB_RESULT_FAIL_BY_COMPACTION;
479    }
480
481    // As the new file is partially compacted, it should be removed upon close.
482    // Just in-case the new file gets opened before removal, point it to the old
483    // file to ensure availability of data.
484    filemgr_remove_pending(new_db.file, handle->file);
485    _fdb_close(&new_db);
486
487    return FDB_RESULT_SUCCESS;
488}
489
490LIBFDB_API
491fdb_status fdb_init(fdb_config *config)
492{
493    fdb_config _config;
494    compactor_config c_config;
495    struct filemgr_config f_config;
496
497    if (config) {
498        if (validate_fdb_config(config)) {
499            _config = *config;
500        } else {
501            return FDB_RESULT_INVALID_CONFIG;
502        }
503    } else {
504        _config = get_default_config();
505    }
506
507    // global initialization
508    // initialized only once at first time
509    if (!fdb_initialized) {
510#ifdef _TRACE_HANDLES
511        spin_init(&open_handle_lock);
512        avl_init(&open_handles, NULL);
513#endif
514
515#ifndef SPIN_INITIALIZER
516        // Note that only Windows passes through this routine
517        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
518            // atomically initialize spin lock only once
519            spin_init(&initial_lock);
520            initial_lock_status = 2;
521        } else {
522            // the others .. wait until initializing 'initial_lock' is done
523            while (initial_lock_status != 2) {
524                Sleep(1);
525            }
526        }
527#endif
528
529    }
530    spin_lock(&initial_lock);
531    if (!fdb_initialized) {
532        double ram_size = (double) get_memory_size();
533        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
534            spin_unlock(&initial_lock);
535            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
536        }
537        // initialize file manager and block cache
538        f_config.blocksize = _config.blocksize;
539        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
540        filemgr_init(&f_config);
541
542        // initialize compaction daemon
543        c_config.sleep_duration = _config.compactor_sleep_duration;
544        c_config.num_threads = _config.num_compactor_threads;
545        compactor_init(&c_config);
546
547        fdb_initialized = 1;
548    }
549    fdb_open_inprog++;
550    spin_unlock(&initial_lock);
551
552    return FDB_RESULT_SUCCESS;
553}
554
555LIBFDB_API
556fdb_config fdb_get_default_config(void) {
557    return get_default_config();
558}
559
560LIBFDB_API
561fdb_kvs_config fdb_get_default_kvs_config(void) {
562    return get_default_kvs_config();
563}
564
565LIBFDB_API
566fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
567                    const char *filename,
568                    fdb_config *fconfig)
569{
570#ifdef _MEMPOOL
571    mempool_init();
572#endif
573
574    fdb_config config;
575    fdb_file_handle *fhandle;
576    fdb_kvs_handle *handle;
577
578    if (fconfig) {
579        if (validate_fdb_config(fconfig)) {
580            config = *fconfig;
581        } else {
582            return FDB_RESULT_INVALID_CONFIG;
583        }
584    } else {
585        config = get_default_config();
586    }
587
588    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
589    if (!fhandle) { // LCOV_EXCL_START
590        return FDB_RESULT_ALLOC_FAIL;
591    } // LCOV_EXCL_STOP
592
593    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
594    if (!handle) { // LCOV_EXCL_START
595        free(fhandle);
596        return FDB_RESULT_ALLOC_FAIL;
597    } // LCOV_EXCL_STOP
598
599    atomic_init_uint8_t(&handle->handle_busy, 0);
600    handle->shandle = NULL;
601    handle->kvs_config = get_default_kvs_config();
602
603    fdb_status fs = fdb_init(fconfig);
604    if (fs != FDB_RESULT_SUCCESS) {
605        free(handle);
606        free(fhandle);
607        return fs;
608    }
609    fdb_file_handle_init(fhandle, handle);
610
611    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
612    if (fs == FDB_RESULT_SUCCESS) {
613        *ptr_fhandle = fhandle;
614    } else {
615        *ptr_fhandle = NULL;
616        free(handle);
617        fdb_file_handle_free(fhandle);
618    }
619    spin_lock(&initial_lock);
620    fdb_open_inprog--;
621    spin_unlock(&initial_lock);
622    return fs;
623}
624
625LIBFDB_API
626fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
627                               const char *filename,
628                               fdb_config *fconfig,
629                               size_t num_functions,
630                               char **kvs_names,
631                               fdb_custom_cmp_variable *functions)
632{
633#ifdef _MEMPOOL
634    mempool_init();
635#endif
636
637    fdb_config config;
638    fdb_file_handle *fhandle;
639    fdb_kvs_handle *handle;
640
641    if (fconfig) {
642        if (validate_fdb_config(fconfig)) {
643            config = *fconfig;
644        } else {
645            return FDB_RESULT_INVALID_CONFIG;
646        }
647    } else {
648        config = get_default_config();
649    }
650
651    if (config.multi_kv_instances == false) {
652        // single KV instance mode does not support customized cmp function
653        return FDB_RESULT_INVALID_CONFIG;
654    }
655
656    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
657    if (!fhandle) { // LCOV_EXCL_START
658        return FDB_RESULT_ALLOC_FAIL;
659    } // LCOV_EXCL_STOP
660
661    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
662    if (!handle) { // LCOV_EXCL_START
663        free(fhandle);
664        return FDB_RESULT_ALLOC_FAIL;
665    } // LCOV_EXCL_STOP
666
667    atomic_init_uint8_t(&handle->handle_busy, 0);
668    handle->shandle = NULL;
669    handle->kvs_config = get_default_kvs_config();
670
671    fdb_status fs = fdb_init(fconfig);
672    if (fs != FDB_RESULT_SUCCESS) {
673        free(handle);
674        free(fhandle);
675        return fs;
676    }
677    fdb_file_handle_init(fhandle, handle);
678
679    // insert kvs_names and functions into fhandle's list
680    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
681                                   kvs_names, functions);
682
683    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
684    if (fs == FDB_RESULT_SUCCESS) {
685        *ptr_fhandle = fhandle;
686    } else {
687        *ptr_fhandle = NULL;
688        free(handle);
689        fdb_file_handle_free(fhandle);
690    }
691    spin_lock(&initial_lock);
692    fdb_open_inprog--;
693    spin_unlock(&initial_lock);
694    return fs;
695}
696
697fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
698                                  const char *filename,
699                                  fdb_config *fconfig,
700                                  struct list *cmp_func_list)
701{
702#ifdef _MEMPOOL
703    mempool_init();
704#endif
705
706    fdb_file_handle *fhandle;
707    fdb_kvs_handle *handle;
708
709    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
710    if (!fhandle) { // LCOV_EXCL_START
711        return FDB_RESULT_ALLOC_FAIL;
712    } // LCOV_EXCL_STOP
713
714    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
715    if (!handle) { // LCOV_EXCL_START
716        free(fhandle);
717        return FDB_RESULT_ALLOC_FAIL;
718    } // LCOV_EXCL_STOP
719
720    atomic_init_uint8_t(&handle->handle_busy, 0);
721    handle->shandle = NULL;
722
723    fdb_file_handle_init(fhandle, handle);
724    if (cmp_func_list) {
725        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
726    }
727    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
728    if (fs == FDB_RESULT_SUCCESS) {
729        *ptr_fhandle = fhandle;
730    } else {
731        *ptr_fhandle = NULL;
732        free(handle);
733        fdb_file_handle_free(fhandle);
734    }
735    return fs;
736}
737
738LIBFDB_API
739fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
740                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
741{
742#ifdef _MEMPOOL
743    mempool_init();
744#endif
745
746    fdb_config config = handle_in->config;
747    fdb_kvs_config kvs_config = handle_in->kvs_config;
748    fdb_kvs_handle *handle;
749    fdb_status fs;
750    filemgr *file;
751    file_status_t fstatus = FILE_NORMAL;
752
753    if (!handle_in || !ptr_handle) {
754        return FDB_RESULT_INVALID_ARGS;
755    }
756
757    // Sequence trees are a must for snapshot creation
758    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
759        return FDB_RESULT_INVALID_CONFIG;
760    }
761
762fdb_snapshot_open_start:
763    if (!handle_in->shandle) {
764        fdb_check_file_reopen(handle_in, &fstatus);
765        fdb_sync_db_header(handle_in);
766        file = handle_in->file;
767
768        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
769            handle_in->seqnum = fdb_kvs_get_seqnum(file,
770                                                   handle_in->kvs->id);
771        } else {
772            handle_in->seqnum = filemgr_get_seqnum(file);
773        }
774    } else {
775        file = handle_in->file;
776    }
777
778    // if the max sequence number seen by this handle is lower than the
779    // requested snapshot marker, it means the snapshot is not yet visible
780    // even via the current fdb_kvs_handle
781    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
782        return FDB_RESULT_NO_DB_INSTANCE;
783    }
784
785    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
786    if (!handle) { // LCOV_EXCL_START
787        return FDB_RESULT_ALLOC_FAIL;
788    } // LCOV_EXCL_STOP
789
790    atomic_init_uint8_t(&handle->handle_busy, 0);
791    handle->log_callback = handle_in->log_callback;
792    handle->max_seqnum = seqnum;
793    handle->fhandle = handle_in->fhandle;
794
795    config.flags |= FDB_OPEN_FLAG_RDONLY;
796    // do not perform compaction for snapshot
797    config.compaction_mode = FDB_COMPACTION_MANUAL;
798
799    // If cloning an existing snapshot handle, then rewind indexes
800    // to its last DB header and point its avl tree to existing snapshot's tree
801    if (handle_in->shandle) {
802        handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
803        if (snap_clone(handle_in->shandle, handle_in->max_seqnum,
804                   &handle->shandle, seqnum) == FDB_RESULT_SUCCESS) {
805            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
806        }
807    }
808
809    if (!handle->shandle) {
810        handle->shandle = (struct snap_handle *) calloc(1, sizeof(snap_handle));
811        if (!handle->shandle) { // LCOV_EXCL_START
812            free(handle);
813            return FDB_RESULT_ALLOC_FAIL;
814        } // LCOV_EXCL_STOP
815        snap_init(handle->shandle, handle_in);
816    }
817
818    if (handle_in->kvs) {
819        // sub-handle in multi KV instance mode
820        fs = _fdb_kvs_open(handle_in->kvs->root,
821                              &config, &kvs_config, file,
822                              file->filename,
823                              _fdb_kvs_get_name(handle_in,
824                                                   file),
825                              handle);
826    } else {
827        fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
828    }
829
830    if (fs == FDB_RESULT_SUCCESS) {
831        if (seqnum == FDB_SNAPSHOT_INMEM &&
832            !handle_in->shandle) {
833            fdb_seqnum_t upto_seq = seqnum;
834            // In-memory snapshot
835            wal_snapshot(handle->file, (void *)handle->shandle,
836                         handle_in->txn, &upto_seq, _fdb_wal_snapshot_func);
837            // set seqnum based on handle type (multikv or default)
838            if (handle_in->kvs && handle_in->kvs->id > 0) {
839                handle->max_seqnum =
840                    _fdb_kvs_get_seqnum(handle->file->kv_header,
841                                        handle_in->kvs->id);
842            } else {
843                handle->max_seqnum = filemgr_get_seqnum(handle->file);
844            }
845
846            // synchronize dirty root nodes if exist
847            if (filemgr_dirty_root_exist(handle->file)) {
848                bid_t dirty_idtree_root, dirty_seqtree_root;
849                filemgr_mutex_lock(handle->file);
850                filemgr_get_dirty_root(handle->file,
851                                       &dirty_idtree_root, &dirty_seqtree_root);
852                if (dirty_idtree_root != BLK_NOT_FOUND) {
853                    handle->trie->root_bid = dirty_idtree_root;
854                }
855                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
856                    if (dirty_seqtree_root != BLK_NOT_FOUND) {
857                        if (handle->kvs) {
858                            handle->seqtrie->root_bid = dirty_seqtree_root;
859                        } else {
860                            btree_init_from_bid(handle->seqtree,
861                                                handle->seqtree->blk_handle,
862                                                handle->seqtree->blk_ops,
863                                                handle->seqtree->kv_ops,
864                                                handle->seqtree->blksize,
865                                                dirty_seqtree_root);
866                        }
867                    }
868                }
869                btreeblk_discard_blocks(handle->bhandle);
870                btreeblk_create_dirty_snapshot(handle->bhandle);
871                filemgr_mutex_unlock(handle->file);
872            }
873        } else if (handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
874            // Snapshot is created on the other snapshot handle
875
876            handle->max_seqnum = handle_in->seqnum;
877
878            if (seqnum == FDB_SNAPSHOT_INMEM) {
879                // in-memory snapshot
880                // Clone dirty root nodes from the source snapshot by incrementing
881                // their ref counters
882                handle->trie->root_bid = handle_in->trie->root_bid;
883                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
884                    if (handle->kvs) {
885                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
886                    } else {
887                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
888                    }
889                }
890                btreeblk_discard_blocks(handle->bhandle);
891                btreeblk_clone_dirty_snapshot(handle->bhandle,
892                                              handle_in->bhandle);
893            }
894        }
895        *ptr_handle = handle;
896    } else {
897        *ptr_handle = NULL;
898        snap_close(handle->shandle);
899        free(handle);
900        // If compactor thread had finished compaction just before this routine
901        // calls _fdb_open, then it is possible that the snapshot's DB header
902        // is only present in the new_file. So we must retry the snapshot
903        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
904        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
905            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
906                goto fdb_snapshot_open_start;
907            }
908        }
909    }
910    return fs;
911}
912
913static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
914
915LIBFDB_API
916fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
917{
918#ifdef _MEMPOOL
919    mempool_init();
920#endif
921
922    fdb_config config;
923    fdb_kvs_handle *handle_in, *handle;
924    fdb_status fs;
925    fdb_seqnum_t old_seqnum;
926
927    if (!handle_ptr) {
928        return FDB_RESULT_INVALID_ARGS;
929    }
930
931    handle_in = *handle_ptr;
932    config = handle_in->config;
933
934    if (handle_in->kvs) {
935        return fdb_kvs_rollback(handle_ptr, seqnum);
936    }
937
938    // Sequence trees are a must for rollback
939    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
940        return FDB_RESULT_INVALID_CONFIG;
941    }
942
943    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
944        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
945                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
946                       handle_in->file->filename);
947    }
948
949    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
950        return FDB_RESULT_HANDLE_BUSY;
951    }
952
953    filemgr_mutex_lock(handle_in->file);
954    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
955    // All transactions should be closed before rollback
956    if (wal_txn_exists(handle_in->file)) {
957        filemgr_set_rollback(handle_in->file, 0);
958        filemgr_mutex_unlock(handle_in->file);
959        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
960        return FDB_RESULT_FAIL_BY_TRANSACTION;
961    }
962
963    // If compaction is running, wait until it is aborted.
964    // TODO: Find a better way of waiting for the compaction abortion.
965    unsigned int sleep_time = 10000; // 10 ms.
966    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
967    while (fstatus == FILE_COMPACT_OLD) {
968        filemgr_mutex_unlock(handle_in->file);
969        decaying_usleep(&sleep_time, 1000000);
970        filemgr_mutex_lock(handle_in->file);
971        fstatus = filemgr_get_file_status(handle_in->file);
972    }
973    if (fstatus == FILE_REMOVED_PENDING) {
974        filemgr_mutex_unlock(handle_in->file);
975        fdb_check_file_reopen(handle_in, NULL);
976    } else {
977        filemgr_mutex_unlock(handle_in->file);
978    }
979
980    fdb_sync_db_header(handle_in);
981
982    // if the max sequence number seen by this handle is lower than the
983    // requested snapshot marker, it means the snapshot is not yet visible
984    // even via the current fdb_kvs_handle
985    if (seqnum > handle_in->seqnum) {
986        filemgr_set_rollback(handle_in->file, 0); // allow mutations
987        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
988        return FDB_RESULT_NO_DB_INSTANCE;
989    }
990
991    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
992    if (!handle) { // LCOV_EXCL_START
993        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
994        return FDB_RESULT_ALLOC_FAIL;
995    } // LCOV_EXCL_STOP
996
997    atomic_init_uint8_t(&handle->handle_busy, 0);
998    handle->log_callback = handle_in->log_callback;
999    handle->fhandle = handle_in->fhandle;
1000    if (seqnum == 0) {
1001        fs = _fdb_reset(handle, handle_in);
1002    } else {
1003        handle->max_seqnum = seqnum;
1004        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1005                       &config);
1006    }
1007
1008    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1009    if (fs == FDB_RESULT_SUCCESS) {
1010        // rollback the file's sequence number
1011        filemgr_mutex_lock(handle_in->file);
1012        old_seqnum = filemgr_get_seqnum(handle_in->file);
1013        filemgr_set_seqnum(handle_in->file, seqnum);
1014        filemgr_mutex_unlock(handle_in->file);
1015
1016        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL);
1017        if (fs == FDB_RESULT_SUCCESS) {
1018            if (handle_in->txn) {
1019                handle->txn = handle_in->txn;
1020                handle_in->txn = NULL;
1021            }
1022            handle_in->fhandle->root = handle;
1023            _fdb_close_root(handle_in);
1024            handle->max_seqnum = 0;
1025            handle->seqnum = seqnum;
1026            *handle_ptr = handle;
1027        } else {
1028            // cancel the rolling-back of the sequence number
1029            filemgr_mutex_lock(handle_in->file);
1030            filemgr_set_seqnum(handle_in->file, old_seqnum);
1031            filemgr_mutex_unlock(handle_in->file);
1032            free(handle);
1033            fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1034        }
1035    } else {
1036        free(handle);
1037        fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1038    }
1039
1040    return fs;
1041}
1042
1043static void _fdb_init_file_config(const fdb_config *config,
1044                                  struct filemgr_config *fconfig) {
1045    fconfig->blocksize = config->blocksize;
1046    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1047    fconfig->chunksize = config->chunksize;
1048
1049    fconfig->options = 0x0;
1050    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1051        fconfig->options |= FILEMGR_CREATE;
1052    }
1053    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1054        fconfig->options |= FILEMGR_READONLY;
1055    }
1056    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1057        fconfig->options |= FILEMGR_SYNC;
1058    }
1059
1060    fconfig->flag = 0x0;
1061    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1062        config->buffercache_size) {
1063        fconfig->flag |= _ARCH_O_DIRECT;
1064    }
1065
1066    fconfig->prefetch_duration = config->prefetch_duration;
1067    fconfig->num_wal_shards = config->num_wal_partitions;
1068    fconfig->num_bcache_shards = config->num_bcache_partitions;
1069}
1070
1071fdb_status _fdb_open(fdb_kvs_handle *handle,
1072                     const char *filename,
1073                     fdb_filename_mode_t filename_mode,
1074                     const fdb_config *config)
1075{
1076    struct filemgr_config fconfig;
1077    struct kvs_stat stat, empty_stat;
1078    bid_t trie_root_bid = BLK_NOT_FOUND;
1079    bid_t seq_root_bid = BLK_NOT_FOUND;
1080    fdb_seqnum_t seqnum = 0;
1081    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1082    uint64_t ndocs = 0;
1083    uint64_t datasize = 0;
1084    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1085    uint64_t kv_info_offset = BLK_NOT_FOUND;
1086    uint64_t header_flags = 0;
1087    uint8_t header_buf[FDB_BLOCKSIZE];
1088    char *compacted_filename = NULL;
1089    char *prev_filename = NULL;
1090    size_t header_len = 0;
1091    bool multi_kv_instances = config->multi_kv_instances;
1092
1093    uint64_t nlivenodes = 0;
1094    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1095    char actual_filename[FDB_MAX_FILENAME_LEN];
1096    char virtual_filename[FDB_MAX_FILENAME_LEN];
1097    char *target_filename = NULL;
1098    fdb_status status;
1099
1100    if (filename == NULL) {
1101        return FDB_RESULT_INVALID_ARGS;
1102    }
1103    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1104        // filename (including path) length is supported up to
1105        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1106        return FDB_RESULT_TOO_LONG_FILENAME;
1107    }
1108
1109    if (filename_mode == FDB_VFILENAME &&
1110        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1111        return FDB_RESULT_INVALID_COMPACTION_MODE;
1112    }
1113
1114    _fdb_init_file_config(config, &fconfig);
1115
1116    if (filename_mode == FDB_VFILENAME) {
1117        compactor_get_actual_filename(filename, actual_filename,
1118                                      config->compaction_mode, &handle->log_callback);
1119    } else {
1120        strcpy(actual_filename, filename);
1121    }
1122
1123    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1124         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1125          filename_mode == FDB_VFILENAME) ) {
1126        // 1) manual compaction mode, OR
1127        // 2) auto compaction mode + 'filename' is virtual filename
1128        // -> copy 'filename'
1129        target_filename = (char *)filename;
1130    } else {
1131        // otherwise (auto compaction mode + 'filename' is actual filename)
1132        // -> copy 'virtual_filename'
1133        compactor_get_virtual_filename(filename, virtual_filename);
1134        target_filename = virtual_filename;
1135    }
1136
1137    handle->fileops = get_filemgr_ops();
1138    filemgr_open_result result = filemgr_open((char *)actual_filename,
1139                                              handle->fileops,
1140                                              &fconfig, &handle->log_callback);
1141    if (result.rv != FDB_RESULT_SUCCESS) {
1142        return (fdb_status) result.rv;
1143    }
1144
1145    handle->file = result.file;
1146    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1147        strcmp(filename, actual_filename)) {
1148        // It is in-place compacted file if
1149        // 1) compaction mode is manual, and
1150        // 2) actual filename is different to the filename given by user.
1151        // In this case, set the in-place compaction flag.
1152        filemgr_set_in_place_compaction(handle->file, true);
1153    }
1154    if (filemgr_is_in_place_compaction_set(handle->file)) {
1155        // This file was in-place compacted.
1156        // set 'handle->filename' to the original filename to trigger file renaming
1157        compactor_get_virtual_filename(filename, virtual_filename);
1158        target_filename = virtual_filename;
1159    }
1160
1161    if (handle->filename) {
1162        handle->filename = (char *)realloc(handle->filename,
1163                                           strlen(target_filename)+1);
1164    } else {
1165        handle->filename = (char*)malloc(strlen(target_filename)+1);
1166    }
1167    strcpy(handle->filename, target_filename);
1168
1169    filemgr_mutex_lock(handle->file);
1170    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1171    // set handle->last_hdr_bid to the block id of required header, so rewind..
1172    if (handle->shandle && handle->last_hdr_bid) {
1173        status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1174                                      header_buf, &header_len, NULL,
1175                                      &handle->log_callback);
1176        if (status != FDB_RESULT_SUCCESS) {
1177            filemgr_mutex_unlock(handle->file);
1178            free(handle->filename);
1179            handle->filename = NULL;
1180            filemgr_close(handle->file, false, handle->filename,
1181                              &handle->log_callback);
1182            return status;
1183        }
1184    } else { // Normal open
1185        filemgr_get_header(handle->file, header_buf, &header_len);
1186        handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
1187    }
1188
1189    // initialize the docio handle so kv headers may be read
1190    handle->dhandle = (struct docio_handle *)
1191                      calloc(1, sizeof(struct docio_handle));
1192    handle->dhandle->log_callback = &handle->log_callback;
1193    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1194
1195    if (header_len > 0) {
1196        fdb_fetch_header(header_buf, &trie_root_bid,
1197                         &seq_root_bid, &ndocs, &nlivenodes,
1198                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1199                         &header_flags, &compacted_filename, &prev_filename);
1200        // use existing setting for seqtree_opt
1201        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1202            seqtree_opt = FDB_SEQTREE_USE;
1203        } else {
1204            seqtree_opt = FDB_SEQTREE_NOT_USE;
1205        }
1206        // set seqnum based on handle type (multikv or default)
1207        if (handle->kvs && handle->kvs->id > 0) {
1208            if (kv_info_offset != BLK_NOT_FOUND) {
1209                if (!handle->file->kv_header) {
1210                    fdb_kvs_header_create(handle->file);
1211                    // KV header already exists but not loaded .. read & import
1212                    fdb_kvs_header_read(handle->file, handle->dhandle,
1213                                        kv_info_offset, false);
1214                }
1215                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1216                                             handle->kvs->id);
1217            } else { // no kv_info offset, ok to set seqnum to zero
1218                seqnum = 0;
1219            }
1220        } else {
1221            seqnum = filemgr_get_seqnum(handle->file);
1222        }
1223        // other flags
1224        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1225            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1226        }
1227        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1228            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1229        }
1230        // use existing setting for multi KV instance mode
1231        if (kv_info_offset == BLK_NOT_FOUND) {
1232            multi_kv_instances = false;
1233        } else {
1234            multi_kv_instances = true;
1235        }
1236    }
1237
1238    handle->config = *config;
1239    handle->config.seqtree_opt = seqtree_opt;
1240    handle->config.multi_kv_instances = multi_kv_instances;
1241
1242    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1243        // Either an in-memory snapshot or cloning from an existing snapshot..
1244        filemgr_mutex_unlock(handle->file);
1245        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1246                     // *_open() should have already restored it
1247    } else { // Persisted snapshot or file rollback..
1248        filemgr_mutex_unlock(handle->file);
1249
1250        hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1251        if (hdr_bid > 0) {
1252            --hdr_bid;
1253        }
1254        if (handle->max_seqnum) {
1255            struct kvs_stat stat_ori;
1256            // backup original stats
1257            if (handle->kvs) {
1258                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1259            } else {
1260                _kvs_stat_get(handle->file, 0, &stat_ori);
1261            }
1262
1263            if (hdr_bid > handle->last_hdr_bid){
1264                // uncommitted data exists beyond the last DB header
1265                // get the last committed seq number
1266                fdb_seqnum_t seq_commit;
1267                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1268                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1269                    // In case, snapshot_open is attempted with latest uncommitted
1270                    // sequence number
1271                    header_len = 0;
1272                }
1273            }
1274            // Reverse scan the file to locate the DB header with seqnum marker
1275            while (header_len && seqnum != handle->max_seqnum) {
1276                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1277                                          header_buf, &header_len, &seqnum,
1278                                          &handle->log_callback);
1279                if (header_len == 0) {
1280                    continue; // header doesn't exist
1281                }
1282                fdb_fetch_header(header_buf, &trie_root_bid,
1283                                 &seq_root_bid, &ndocs, &nlivenodes,
1284                                 &datasize, &last_wal_flush_hdr_bid,
1285                                 &kv_info_offset, &header_flags,
1286                                 &compacted_filename, NULL);
1287                handle->last_hdr_bid = hdr_bid;
1288
1289                if (!handle->kvs || handle->kvs->id == 0) {
1290                    // single KVS mode OR default KVS
1291                    if (!handle->shandle) {
1292                        // rollback
1293                        struct kvs_stat stat_dst;
1294                        _kvs_stat_get(handle->file, 0, &stat_dst);
1295                        stat_dst.ndocs = ndocs;
1296                        stat_dst.datasize = datasize;
1297                        stat_dst.nlivenodes = nlivenodes;
1298                        _kvs_stat_set(handle->file, 0, stat_dst);
1299                    }
1300                    continue;
1301                }
1302
1303                uint64_t doc_offset;
1304                struct kvs_header *kv_header;
1305                struct docio_object doc;
1306
1307                _fdb_kvs_header_create(&kv_header);
1308                memset(&doc, 0, sizeof(struct docio_object));
1309                doc_offset = docio_read_doc(handle->dhandle,
1310                                            kv_info_offset, &doc, true);
1311
1312                if (doc_offset == kv_info_offset) {
1313                    header_len = 0; // fail
1314                    _fdb_kvs_header_free(kv_header);
1315                } else {
1316                    _fdb_kvs_header_import(kv_header, doc.body,
1317                                           doc.length.bodylen, false);
1318                    // get local sequence number for the KV instance
1319                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1320                                                 handle->kvs->id);
1321                    if (!handle->shandle) {
1322                        // rollback: replace kv_header stats
1323                        // read from the current header's kv_header
1324                        struct kvs_stat stat_src, stat_dst;
1325                        _kvs_stat_get_kv_header(kv_header,
1326                                                handle->kvs->id,
1327                                                &stat_src);
1328                        _kvs_stat_get(handle->file,
1329                                      handle->kvs->id,
1330                                      &stat_dst);
1331                        // update ndocs, datasize, nlivenodes
1332                        // into the current file's kv_header
1333                        // Note: stats related to WAL should not be updated
1334                        //       at this time. They will be adjusted through
1335                        //       discard & restore routines below.
1336                        stat_dst.ndocs = stat_src.ndocs;
1337                        stat_dst.datasize = stat_src.datasize;
1338                        stat_dst.nlivenodes = stat_src.nlivenodes;
1339                        _kvs_stat_set(handle->file,
1340                                      handle->kvs->id,
1341                                      stat_dst);
1342                    }
1343                    _fdb_kvs_header_free(kv_header);
1344                    free_docio_object(&doc, 1, 1, 1);
1345                }
1346            }
1347            if (!header_len) { // Marker MUST match that of DB commit!
1348                // rollback original stats
1349                if (handle->kvs) {
1350                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1351                } else {
1352                    _kvs_stat_get(handle->file, 0, &stat_ori);
1353                }
1354
1355                docio_free(handle->dhandle);
1356                free(handle->dhandle);
1357                free(handle->filename);
1358                free(prev_filename);
1359                handle->filename = NULL;
1360                filemgr_close(handle->file, false, handle->filename,
1361                              &handle->log_callback);
1362                return FDB_RESULT_NO_DB_INSTANCE;
1363            }
1364
1365            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1366                if (handle->config.multi_kv_instances) {
1367                    // multi KV instance mode
1368                    // clear only WAL items belonging to the instance
1369                    wal_close_kv_ins(handle->file,
1370                                     (handle->kvs)?(handle->kvs->id):(0));
1371                } else {
1372                    wal_shutdown(handle->file);
1373                }
1374            }
1375        } else { // snapshot to sequence number 0 requested..
1376            if (handle->shandle) { // fdb_snapshot_open API call
1377                if (seqnum) {
1378                    // Database currently has a non-zero seq number,
1379                    // but the snapshot was requested with a seq number zero.
1380                    docio_free(handle->dhandle);
1381                    free(handle->dhandle);
1382                    free(handle->filename);
1383                    free(prev_filename);
1384                    handle->filename = NULL;
1385                    filemgr_close(handle->file, false, handle->filename,
1386                                  &handle->log_callback);
1387                    return FDB_RESULT_NO_DB_INSTANCE;
1388                }
1389            } // end of zero max_seqnum but non-rollback check
1390        } // end of zero max_seqnum check
1391    } // end of durable snapshot locating
1392
1393    handle->btreeblkops = btreeblk_get_ops();
1394    handle->bhandle = (struct btreeblk_handle *)
1395                      calloc(1, sizeof(struct btreeblk_handle));
1396    handle->bhandle->log_callback = &handle->log_callback;
1397
1398    handle->dirty_updates = 0;
1399
1400    if (handle->config.compaction_buf_maxsize == 0) {
1401        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1402    }
1403
1404    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1405
1406    handle->cur_header_revnum = filemgr_get_header_revnum(handle->file);
1407    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1408
1409    memset(&empty_stat, 0x0, sizeof(empty_stat));
1410    _kvs_stat_get(handle->file, 0, &stat);
1411    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1412        // sync (default) KVS stat with DB header
1413        stat.nlivenodes = nlivenodes;
1414        stat.ndocs = ndocs;
1415        stat.datasize = datasize;
1416        _kvs_stat_set(handle->file, 0, stat);
1417    }
1418
1419    if (handle->config.multi_kv_instances) {
1420        // multi KV instance mode
1421        filemgr_mutex_lock(handle->file);
1422        if (kv_info_offset == BLK_NOT_FOUND) {
1423            // there is no KV header .. create & initialize
1424            fdb_kvs_header_create(handle->file);
1425            kv_info_offset = fdb_kvs_header_append(handle->file, handle->dhandle);
1426        } else if (handle->file->kv_header == NULL) {
1427            // KV header already exists but not loaded .. read & import
1428            fdb_kvs_header_create(handle->file);
1429            fdb_kvs_header_read(handle->file, handle->dhandle, kv_info_offset, false);
1430        }
1431
1432        // validation check for key order of all KV stores
1433        if (handle == handle->fhandle->root) {
1434            fdb_status fs = fdb_kvs_cmp_check(handle);
1435            filemgr_mutex_unlock(handle->file);
1436            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1437                docio_free(handle->dhandle);
1438                free(handle->dhandle);
1439                btreeblk_free(handle->bhandle);
1440                free(handle->bhandle);
1441                free(handle->filename);
1442                handle->filename = NULL;
1443                filemgr_close(handle->file, false, handle->filename,
1444                              &handle->log_callback);
1445                return fs;
1446            }
1447        } else {
1448            filemgr_mutex_unlock(handle->file);
1449        }
1450    }
1451    handle->kv_info_offset = kv_info_offset;
1452
1453    if (handle->kv_info_offset != BLK_NOT_FOUND &&
1454        handle->kvs == NULL) {
1455        // multi KV instance mode .. turn on config flag
1456        handle->config.multi_kv_instances = true;
1457        // only super handle can be opened using fdb_open(...)
1458        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
1459    }
1460
1461    if (handle->shandle) { // Populate snapshot stats..
1462        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
1463            memset(&handle->shandle->stat, 0x0,
1464                    sizeof(handle->shandle->stat));
1465            handle->shandle->stat.ndocs = ndocs;
1466            handle->shandle->stat.datasize = datasize;
1467            handle->shandle->stat.nlivenodes = nlivenodes;
1468        } else { // Multi KV instance mode, populate specific kv stats
1469            memset(&handle->shandle->stat, 0x0,
1470                    sizeof(handle->shandle->stat));
1471            _kvs_stat_get(handle->file, handle->kvs->id,
1472                    &handle->shandle->stat);
1473            // Since wal is restored below, we have to reset
1474            // wal stats to zero.
1475            handle->shandle->stat.wal_ndeletes = 0;
1476            handle->shandle->stat.wal_ndocs = 0;
1477        }
1478    }
1479
1480    // initialize pointer to the global operational stats of this KV store
1481    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
1482    fdb_assert(handle->op_stats, 0, 0);
1483
1484    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1485    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
1486                handle->file->blocksize, trie_root_bid,
1487                (void *)handle->bhandle, handle->btreeblkops,
1488                (void *)handle->dhandle, _fdb_readkey_wrap);
1489    // set aux for cmp wrapping function
1490    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
1491    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
1492
1493    if (handle->kvs) {
1494        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
1495    }
1496
1497    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1498        handle->seqnum = seqnum;
1499
1500        if (handle->config.multi_kv_instances) {
1501            // multi KV instance mode .. HB+trie
1502            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1503            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1504                        handle->file->blocksize, seq_root_bid,
1505                        (void *)handle->bhandle, handle->btreeblkops,
1506                        (void *)handle->dhandle, _fdb_readseq_wrap);
1507
1508        } else {
1509            // single KV instance mode .. normal B+tree
1510            struct btree_kv_ops *seq_kv_ops =
1511                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1512            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1513            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1514
1515            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
1516            if (seq_root_bid == BLK_NOT_FOUND) {
1517                btree_init(handle->seqtree, (void *)handle->bhandle,
1518                           handle->btreeblkops, seq_kv_ops,
1519                           handle->config.blocksize, sizeof(fdb_seqnum_t),
1520                           OFFSET_SIZE, 0x0, NULL);
1521             }else{
1522                 btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
1523                                     handle->btreeblkops, seq_kv_ops,
1524                                     handle->config.blocksize, seq_root_bid);
1525             }
1526        }
1527    }else{
1528        handle->seqtree = NULL;
1529    }
1530
1531    if (handle->config.multi_kv_instances && handle->max_seqnum) {
1532        // restore only docs belonging to the KV instance
1533        // handle->kvs should not be NULL
1534        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
1535                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
1536    } else {
1537        // normal restore
1538        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
1539    }
1540
1541    if (compacted_filename &&
1542        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
1543        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
1544        _fdb_recover_compaction(handle, compacted_filename);
1545    }
1546
1547    if (prev_filename) {
1548        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
1549            // record the old filename into the file handle of current file
1550            // and REMOVE old file on the first open
1551            // WARNING: snapshots must have been opened before this call
1552            if (filemgr_update_file_status(handle->file,
1553                                           filemgr_get_file_status(handle->file),
1554                                           prev_filename)) {
1555                // Open the old file with read-only mode.
1556                // (Temporarily disable log callback at this time since
1557                //  the old file might be already removed.)
1558                fconfig.options = FILEMGR_READONLY;
1559                filemgr_open_result result = filemgr_open(prev_filename,
1560                                                          handle->fileops,
1561                                                          &fconfig,
1562                                                          NULL);
1563                if (result.file) {
1564                    filemgr_remove_pending(result.file, handle->file);
1565                    filemgr_close(result.file, 0, handle->filename,
1566                                  &handle->log_callback);
1567                }
1568            }
1569        } else {
1570            free(prev_filename);
1571        }
1572    }
1573
1574    status = btreeblk_end(handle->bhandle);
1575    fdb_assert(status == FDB_RESULT_SUCCESS, status, handle);
1576
1577    // do not register read-only handles
1578    if (!(config->flags & FDB_OPEN_FLAG_RDONLY) &&
1579        config->compaction_mode == FDB_COMPACTION_AUTO) {
1580        status = compactor_register_file(handle->file, (fdb_config *)config,
1581                                         handle->fhandle->cmp_func_list,
1582                                         &handle->log_callback);
1583    }
1584
1585#ifdef _TRACE_HANDLES
1586    spin_lock(&open_handle_lock);
1587    avl_insert(&open_handles, &handle->avl_trace, _fdb_handle_cmp);
1588    spin_unlock(&open_handle_lock);
1589#endif
1590    return status;
1591}
1592
1593LIBFDB_API
1594fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
1595                                fdb_log_callback log_callback,
1596                                void *ctx_data)
1597{
1598    handle->log_callback.callback = log_callback;
1599    handle->log_callback.ctx_data = ctx_data;
1600    return FDB_RESULT_SUCCESS;
1601}
1602
1603LIBFDB_API
1604fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
1605                          const void *meta, size_t metalen,
1606                          const void *body, size_t bodylen)
1607{
1608    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
1609        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1610        return FDB_RESULT_INVALID_ARGS;
1611    }
1612
1613    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
1614    if (*doc == NULL) { // LCOV_EXCL_START
1615        return FDB_RESULT_ALLOC_FAIL;
1616    } // LCOV_EXCL_STOP
1617
1618    (*doc)->seqnum = SEQNUM_NOT_USED;
1619
1620    if (key && keylen > 0) {
1621        (*doc)->key = (void *)malloc(keylen);
1622        if ((*doc)->key == NULL) { // LCOV_EXCL_START
1623            return FDB_RESULT_ALLOC_FAIL;
1624        } // LCOV_EXCL_STOP
1625        memcpy((*doc)->key, key, keylen);
1626        (*doc)->keylen = keylen;
1627    } else {
1628        (*doc)->key = NULL;
1629        (*doc)->keylen = 0;
1630    }
1631
1632    if (meta && metalen > 0) {
1633        (*doc)->meta = (void *)malloc(metalen);
1634        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1635            return FDB_RESULT_ALLOC_FAIL;
1636        } // LCOV_EXCL_STOP
1637        memcpy((*doc)->meta, meta, metalen);
1638        (*doc)->metalen = metalen;
1639    } else {
1640        (*doc)->meta = NULL;
1641        (*doc)->metalen = 0;
1642    }
1643
1644    if (body && bodylen > 0) {
1645        (*doc)->body = (void *)malloc(bodylen);
1646        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1647            return FDB_RESULT_ALLOC_FAIL;
1648        } // LCOV_EXCL_STOP
1649        memcpy((*doc)->body, body, bodylen);
1650        (*doc)->bodylen = bodylen;
1651    } else {
1652        (*doc)->body = NULL;
1653        (*doc)->bodylen = 0;
1654    }
1655
1656    (*doc)->size_ondisk = 0;
1657    (*doc)->deleted = false;
1658
1659    return FDB_RESULT_SUCCESS;
1660}
1661
1662LIBFDB_API
1663fdb_status fdb_doc_update(fdb_doc **doc,
1664                          const void *meta, size_t metalen,
1665                          const void *body, size_t bodylen)
1666{
1667    if (doc == NULL ||
1668        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1669        return FDB_RESULT_INVALID_ARGS;
1670    }
1671    if (*doc == NULL) {
1672        return FDB_RESULT_INVALID_ARGS;
1673    }
1674
1675    if (meta && metalen > 0) {
1676        // free previous metadata
1677        free((*doc)->meta);
1678        // allocate new metadata
1679        (*doc)->meta = (void *)malloc(metalen);
1680        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1681            return FDB_RESULT_ALLOC_FAIL;
1682        } // LCOV_EXCL_STOP
1683        memcpy((*doc)->meta, meta, metalen);
1684        (*doc)->metalen = metalen;
1685    }
1686
1687    if (body && bodylen > 0) {
1688        // free previous body
1689        free((*doc)->body);
1690        // allocate new body
1691        (*doc)->body = (void *)malloc(bodylen);
1692        if ((*doc)->body == NULL) { // LCOV_EXCL_START
1693            return FDB_RESULT_ALLOC_FAIL;
1694        } // LCOV_EXCL_STOP
1695        memcpy((*doc)->body, body, bodylen);
1696        (*doc)->bodylen = bodylen;
1697    }
1698
1699    return FDB_RESULT_SUCCESS;
1700}
1701
1702// doc MUST BE allocated by malloc
1703LIBFDB_API
1704fdb_status fdb_doc_free(fdb_doc *doc)
1705{
1706    if (doc) {
1707        free(doc->key);
1708        free(doc->meta);
1709        free(doc->body);
1710        free(doc);
1711    }
1712    return FDB_RESULT_SUCCESS;
1713}
1714
1715INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
1716                                        struct wal_item *item)
1717{
1718    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1719    uint64_t old_offset = 0;
1720
1721    hbtrie_find_offset(handle->trie,
1722                       item->header->key,
1723                       item->header->keylen,
1724                       (void*)&old_offset);
1725    btreeblk_end(handle->bhandle);
1726    old_offset = _endian_decode(old_offset);
1727
1728    return old_offset;
1729}
1730
1731INLINE fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
1732                                         uint64_t offset) {
1733
1734    return snap_insert((struct snap_handle *)handle, doc, offset);
1735}
1736
1737INLINE fdb_status _fdb_wal_flush_func(void *voidhandle, struct wal_item *item)
1738{
1739    hbtrie_result hr;
1740    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1741    fdb_seqnum_t _seqnum;
1742    fdb_kvs_id_t kv_id;
1743    fdb_status fs = FDB_RESULT_SUCCESS;
1744    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
1745    int size_id, size_seq;
1746    uint8_t *kvid_seqnum;
1747    uint64_t old_offset, _offset;
1748    int delta, r;
1749    struct filemgr *file = handle->dhandle->file;
1750    struct kvs_stat stat;
1751
1752    memset(var_key, 0, handle->config.chunksize);
1753    if (handle->kvs) {
1754        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
1755    } else {
1756        kv_id = 0;
1757    }
1758
1759    if (item->action == WAL_ACT_INSERT ||
1760        item->action == WAL_ACT_LOGICAL_REMOVE) {
1761        _offset = _endian_encode(item->offset);
1762
1763        r = _kvs_stat_get(file, kv_id, &stat);
1764        if (r != 0) {
1765            // KV store corresponding to kv_id is already removed
1766            // skip this item
1767            return FDB_RESULT_SUCCESS;
1768        }
1769        handle->bhandle->nlivenodes = stat.nlivenodes;
1770
1771        hr = hbtrie_insert(handle->trie,
1772                           item->header->key,
1773                           item->header->keylen,
1774                           (void *)&_offset,
1775                           (void *)&old_offset);
1776
1777        fs = btreeblk_end(handle->bhandle);
1778        if (fs != FDB_RESULT_SUCCESS) {
1779            return fs;
1780        }
1781        old_offset = _endian_decode(old_offset);
1782
1783        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1784            _seqnum = _endian_encode(item->seqnum);
1785            if (handle->kvs) {
1786                // multi KV instance mode .. HB+trie
1787                uint64_t old_offset_local;
1788
1789                size_id = sizeof(fdb_kvs_id_t);
1790                size_seq = sizeof(fdb_seqnum_t);
1791                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1792                kvid2buf(size_id, kv_id, kvid_seqnum);
1793                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1794                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
1795                              (void *)&_offset, (void *)&old_offset_local);
1796            } else {
1797                btree_insert(handle->seqtree, (void *)&_seqnum,
1798                             (void *)&_offset);
1799            }
1800            fs = btreeblk_end(handle->bhandle);
1801            if (fs != FDB_RESULT_SUCCESS) {
1802                return fs;
1803            }
1804        }
1805
1806        delta = (int)handle->bhandle->nlivenodes - (int)stat.nlivenodes;
1807        _kvs_stat_update_attr(file, kv_id, KVS_STAT_NLIVENODES, delta);
1808
1809        if (hr == HBTRIE_RESULT_SUCCESS) {
1810            if (item->action == WAL_ACT_INSERT) {
1811                _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1812            }
1813            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE,
1814                                  item->doc_size);
1815        } else { // update or logical delete
1816            struct docio_length len;
1817            // This block is already cached when we call HBTRIE_INSERT.
1818            // No additional block access.
1819            len = docio_read_doc_length(handle->dhandle, old_offset);
1820
1821            if (!(len.flag & DOCIO_DELETED)) {
1822                if (item->action == WAL_ACT_LOGICAL_REMOVE) {
1823                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1824                }
1825            } else {
1826                if (item->action == WAL_ACT_INSERT) {
1827                    _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1828                }
1829            }
1830
1831            delta = (int)item->doc_size - (int)_fdb_get_docsize(len);
1832            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1833        }
1834    } else {
1835        // Immediate remove
1836        // LCOV_EXCL_START
1837        hr = hbtrie_remove(handle->trie, item->header->key,
1838                           item->header->keylen);
1839        fs = btreeblk_end(handle->bhandle);
1840        if (fs != FDB_RESULT_SUCCESS) {
1841            return fs;
1842        }
1843
1844        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1845            _seqnum = _endian_encode(item->seqnum);
1846            if (handle->kvs) {
1847                // multi KV instance mode .. HB+trie
1848                size_id = sizeof(fdb_kvs_id_t);
1849                size_seq = sizeof(fdb_seqnum_t);
1850                kvid_seqnum = alca(uint8_t, size_id + size_seq);
1851                kvid2buf(size_id, kv_id, kvid_seqnum);
1852                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1853
1854                hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
1855                              size_id + size_seq);
1856            } else {
1857                btree_remove(handle->seqtree, (void*)&_seqnum);
1858            }
1859            fs = btreeblk_end(handle->bhandle);
1860            if (fs != FDB_RESULT_SUCCESS) {
1861                return fs;
1862            }
1863        }
1864
1865        if (hr == HBTRIE_RESULT_SUCCESS) {
1866            _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1867            delta = -(int)item->doc_size;
1868            _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1869        }
1870        // LCOV_EXCL_STOP
1871    }
1872    return FDB_RESULT_SUCCESS;
1873}
1874
1875void fdb_sync_db_header(fdb_kvs_handle *handle)
1876{
1877    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
1878    if (handle->cur_header_revnum != cur_revnum) {
1879        void *header_buf = NULL;
1880        size_t header_len;
1881
1882        handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
1883        header_buf = filemgr_get_header(handle->file, NULL, &header_len);
1884        if (header_len > 0) {
1885            uint64_t header_flags, dummy64;
1886            bid_t idtree_root;
1887            bid_t new_seq_root;
1888            char *compacted_filename;
1889            char *prev_filename = NULL;
1890
1891            fdb_fetch_header(header_buf, &idtree_root,
1892                             &new_seq_root,
1893                             &dummy64, &dummy64,
1894                             &dummy64, &handle->last_wal_flush_hdr_bid,
1895                             &handle->kv_info_offset, &header_flags,
1896                             &compacted_filename, &prev_filename);
1897
1898            if (handle->dirty_updates) {
1899                // discard all cached writable b+tree nodes
1900                // to avoid data inconsistency with other writers
1901                btreeblk_discard_blocks(handle->bhandle);
1902            }
1903
1904            handle->trie->root_bid = idtree_root;
1905
1906            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1907                if (new_seq_root != handle->seqtree->root_bid) {
1908                    if (handle->config.multi_kv_instances) {
1909                        handle->seqtrie->root_bid = new_seq_root;
1910                    } else {
1911                        btree_init_from_bid(handle->seqtree,
1912                                            handle->seqtree->blk_handle,
1913                                            handle->seqtree->blk_ops,
1914                                            handle->seqtree->kv_ops,
1915                                            handle->seqtree->blksize,
1916                                            new_seq_root);
1917                    }
1918                }
1919            }
1920
1921            if (prev_filename) {
1922                free(prev_filename);
1923            }
1924
1925            handle->cur_header_revnum = cur_revnum;
1926            handle->dirty_updates = 0;
1927            if (handle->kvs) {
1928                // multiple KV instance mode AND sub handle
1929                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
1930                                                    handle->kvs->id);
1931            } else {
1932                // super handle OR single KV instance mode
1933                handle->seqnum = filemgr_get_seqnum(handle->file);
1934            }
1935        }
1936        if (header_buf) {
1937            free(header_buf);
1938        }
1939    }
1940}
1941
1942fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
1943{
1944    fdb_status fs = FDB_RESULT_SUCCESS;
1945    file_status_t fstatus = filemgr_get_file_status(handle->file);
1946    // check whether the compaction is done
1947    if (fstatus == FILE_REMOVED_PENDING) {
1948        uint64_t ndocs, datasize, nlivenodes, last_wal_flush_hdr_bid;
1949        uint64_t kv_info_offset, header_flags;
1950        size_t header_len;
1951        char *new_filename;
1952        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
1953        bid_t trie_root_bid, seq_root_bid;
1954        fdb_config config = handle->config;
1955
1956        // close the current file and newly open the new file
1957        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
1958            // compaction daemon mode .. just close and then open
1959            char filename[FDB_MAX_FILENAME_LEN];
1960            strcpy(filename, handle->filename);
1961            fs = _fdb_close(handle);
1962            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
1963            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
1964            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
1965        } else {
1966            filemgr_get_header(handle->file, buf, &header_len);
1967            fdb_fetch_header(buf,
1968                             &trie_root_bid, &seq_root_bid,
1969                             &ndocs, &nlivenodes, &datasize, &last_wal_flush_hdr_bid,
1970                             &kv_info_offset, &header_flags,
1971                             &new_filename, NULL);
1972            fs = _fdb_close(handle);
1973            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
1974            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
1975            fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
1976        }
1977    }
1978    if (status) {
1979        *status = fstatus;
1980    }
1981    return fs;
1982}
1983
1984static bool _fdb_sync_dirty_root(fdb_kvs_handle *handle)
1985{
1986    bool locked = false;
1987    bid_t dirty_idtree_root, dirty_seqtree_root;
1988
1989    if (handle->shandle) {
1990        // skip snapshot
1991        return locked;
1992    }
1993
1994    if ( ( handle->dirty_updates ||
1995           filemgr_dirty_root_exist(handle->file) )  &&
1996         filemgr_get_header_bid(handle->file) == handle->last_hdr_bid ) {
1997        // 1) { a) dirty WAL flush by this handle exists OR
1998        //      b) dirty WAL flush by other handle exists } AND
1999        // 2) no commit was performed yet.
2000        // grab lock for writer
2001        filemgr_mutex_lock(handle->file);
2002        locked = true;
2003
2004        // get dirty root nodes
2005        filemgr_get_dirty_root(handle->file,
2006                               &dirty_idtree_root, &dirty_seqtree_root);
2007        if (dirty_idtree_root != BLK_NOT_FOUND) {
2008            handle->trie->root_bid = dirty_idtree_root;
2009        }
2010        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2011            if (dirty_seqtree_root != BLK_NOT_FOUND) {
2012                if (handle->kvs) {
2013                    handle->seqtrie->root_bid = dirty_seqtree_root;
2014                } else {
2015                    btree_init_from_bid(handle->seqtree,
2016                                        handle->seqtree->blk_handle,
2017                                        handle->seqtree->blk_ops,
2018                                        handle->seqtree->kv_ops,
2019                                        handle->seqtree->blksize,
2020                                        dirty_seqtree_root);
2021                }
2022            }
2023        }
2024        btreeblk_discard_blocks(handle->bhandle);
2025    }
2026    return locked;
2027}
2028
2029LIBFDB_API
2030fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2031{
2032    uint64_t offset, _offset;
2033    struct docio_object _doc;
2034    struct filemgr *wal_file = NULL;
2035    struct docio_handle *dhandle;
2036    fdb_status wr;
2037    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2038    fdb_txn *txn;
2039    fdb_doc doc_kv = *doc;
2040
2041    if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2042        doc->keylen > FDB_MAX_KEYLEN ||
2043        (handle->kvs_config.custom_cmp &&
2044            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2045        return FDB_RESULT_INVALID_ARGS;
2046    }
2047
2048    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2049        return FDB_RESULT_HANDLE_BUSY;
2050    }
2051
2052    if (handle->kvs) {
2053        // multi KV instance mode
2054        int size_chunk = handle->config.chunksize;
2055        doc_kv.keylen = doc->keylen + size_chunk;
2056        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2057        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2058        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2059    }
2060
2061    if (!handle->shandle) {
2062        fdb_check_file_reopen(handle, NULL);
2063        fdb_sync_db_header(handle);
2064
2065        wal_file = handle->file;
2066        dhandle = handle->dhandle;
2067
2068        txn = handle->fhandle->root->txn;
2069        if (!txn) {
2070            txn = &wal_file->global_txn;
2071        }
2072        if (handle->kvs) {
2073            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2074        } else {
2075            wr = wal_find(txn, wal_file, doc, &offset);
2076        }
2077    } else {
2078        if (handle->kvs) {
2079            wr = snap_find(handle->shandle, &doc_kv, &offset);
2080        } else {
2081            wr = snap_find(handle->shandle, doc, &offset);
2082        }
2083        dhandle = handle->dhandle;
2084    }
2085
2086    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2087
2088    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2089        bool locked = _fdb_sync_dirty_root(handle);
2090
2091        if (handle->kvs) {
2092            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2093                             (void *)&offset);
2094        } else {
2095            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2096                             (void *)&offset);
2097        }
2098        btreeblk_end(handle->bhandle);
2099        offset = _endian_decode(offset);
2100
2101        if (locked) {
2102            // grab lock for writer if there are dirty updates
2103            filemgr_mutex_unlock(handle->file);
2104        }
2105    }
2106
2107    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2108        bool alloced_meta = doc->meta ? false : true;
2109        bool alloced_body = doc->body ? false : true;
2110        if (handle->kvs) {
2111            _doc.key = doc_kv.key;
2112            _doc.length.keylen = doc_kv.keylen;
2113        } else {
2114            _doc.key = doc->key;
2115            _doc.length.keylen = doc->keylen;
2116        }
2117        _doc.meta = doc->meta;
2118        _doc.body = doc->body;
2119
2120        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2121            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2122            return FDB_RESULT_KEY_NOT_FOUND;
2123        }
2124
2125        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2126        if (_offset == offset) {
2127            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2128            return FDB_RESULT_KEY_NOT_FOUND;
2129        }
2130
2131        if (_doc.length.keylen != doc_kv.keylen ||
2132            _doc.length.flag & DOCIO_DELETED) {
2133            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2134            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2135            return FDB_RESULT_KEY_NOT_FOUND;
2136        }
2137
2138        doc->seqnum = _doc.seqnum;
2139        doc->metalen = _doc.length.metalen;
2140        doc->bodylen = _doc.length.bodylen;
2141        doc->meta = _doc.meta;
2142        doc->body = _doc.body;
2143        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2144        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2145        doc->offset = offset;
2146
2147        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2148        return FDB_RESULT_SUCCESS;
2149    }
2150
2151    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2152    return FDB_RESULT_KEY_NOT_FOUND;
2153}
2154
2155// search document metadata using key
2156LIBFDB_API
2157fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
2158{
2159    uint64_t offset;
2160    struct docio_object _doc;
2161    struct docio_handle *dhandle;
2162    struct filemgr *wal_file = NULL;
2163    fdb_status wr;
2164    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2165    fdb_txn *txn;
2166    fdb_doc doc_kv = *doc;
2167
2168    if (!handle || !doc || !doc->key ||
2169        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
2170        (handle->kvs_config.custom_cmp &&
2171            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2172        return FDB_RESULT_INVALID_ARGS;
2173    }
2174
2175    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2176        return FDB_RESULT_HANDLE_BUSY;
2177    }
2178
2179    if (handle->kvs) {
2180        // multi KV instance mode
2181        int size_chunk = handle->config.chunksize;
2182        doc_kv.keylen = doc->keylen + size_chunk;
2183        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2184        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2185        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2186    }
2187
2188    if (!handle->shandle) {
2189        fdb_check_file_reopen(handle, NULL);
2190        fdb_sync_db_header(handle);
2191
2192        wal_file = handle->file;
2193        dhandle = handle->dhandle;
2194
2195        txn = handle->fhandle->root->txn;
2196        if (!txn) {
2197            txn = &wal_file->global_txn;
2198        }
2199        if (handle->kvs) {
2200            wr = wal_find(txn, wal_file, &doc_kv, &offset);
2201        } else {
2202            wr = wal_find(txn, wal_file, doc, &offset);
2203        }
2204    } else {
2205        if (handle->kvs) {
2206            wr = snap_find(handle->shandle, &doc_kv, &offset);
2207        } else {
2208            wr = snap_find(handle->shandle, doc, &offset);
2209        }
2210        dhandle = handle->dhandle;
2211    }
2212
2213    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2214
2215    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2216        bool locked = _fdb_sync_dirty_root(handle);
2217
2218        if (handle->kvs) {
2219            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2220                             (void *)&offset);
2221        } else {
2222            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2223                             (void *)&offset);
2224        }
2225        btreeblk_end(handle->bhandle);
2226        offset = _endian_decode(offset);
2227
2228        if (locked) {
2229            filemgr_mutex_unlock(handle->file);
2230        }
2231    }
2232
2233    if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2234        if (handle->kvs) {
2235            _doc.key = doc_kv.key;
2236            _doc.length.keylen = doc_kv.keylen;
2237        } else {
2238            _doc.key = doc->key;
2239            _doc.length.keylen = doc->keylen;
2240        }
2241        bool alloced_meta = doc->meta ? false : true;
2242        _doc.meta = doc->meta;
2243        _doc.body = doc->body;
2244
2245        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2246                                                       true);
2247        if (body_offset == offset){
2248            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2249            return FDB_RESULT_KEY_NOT_FOUND;
2250        }
2251
2252        if (_doc.length.keylen != doc_kv.keylen) {
2253            free_docio_object(&_doc, 0, alloced_meta, 0);
2254            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2255            return FDB_RESULT_KEY_NOT_FOUND;
2256        }
2257
2258        doc->seqnum = _doc.seqnum;
2259        doc->metalen = _doc.length.metalen;
2260        doc->bodylen = _doc.length.bodylen;
2261        doc->meta = _doc.meta;
2262        doc->body = _doc.body;
2263        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2264        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2265        doc->offset = offset;
2266
2267        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2268        return FDB_RESULT_SUCCESS;
2269    }
2270
2271    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2272    return FDB_RESULT_KEY_NOT_FOUND;
2273}
2274
2275// search document using sequence number
2276LIBFDB_API
2277fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2278{
2279    uint64_t offset, _offset;
2280    struct docio_object _doc;
2281    struct docio_handle *dhandle;
2282    struct filemgr *wal_file = NULL;
2283    fdb_status wr;
2284    btree_result br = BTREE_RESULT_FAIL;
2285    fdb_seqnum_t _seqnum;
2286    fdb_txn *txn;
2287
2288    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2289        return FDB_RESULT_INVALID_ARGS;
2290    }
2291
2292    // Sequence trees are a must for byseq operations
2293    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2294        return FDB_RESULT_INVALID_CONFIG;
2295    }
2296
2297    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2298        return FDB_RESULT_HANDLE_BUSY;
2299    }
2300
2301    if (!handle->shandle) {
2302        fdb_check_file_reopen(handle, NULL);
2303        fdb_sync_db_header(handle);
2304
2305        wal_file = handle->file;
2306        dhandle = handle->dhandle;
2307
2308        txn = handle->fhandle->root->txn;
2309        if (!txn) {
2310            txn = &wal_file->global_txn;
2311        }
2312        // prevent searching by key in WAL if 'doc' is not empty
2313        size_t key_len = doc->keylen;
2314        doc->keylen = 0;
2315        if (handle->kvs) {
2316            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2317        } else {
2318            wr = wal_find(txn, wal_file, doc, &offset);
2319        }
2320        doc->keylen = key_len;
2321    } else {
2322        wr = snap_find(handle->shandle, doc, &offset);
2323        dhandle = handle->dhandle;
2324    }
2325
2326    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2327
2328    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2329        bool locked = _fdb_sync_dirty_root(handle);
2330
2331        _seqnum = _endian_encode(doc->seqnum);
2332        if (handle->kvs) {
2333            int size_id, size_seq;
2334            uint8_t *kv_seqnum;
2335            hbtrie_result hr;
2336            fdb_kvs_id_t _kv_id;
2337
2338            _kv_id = _endian_encode(handle->kvs->id);
2339            size_id = sizeof(fdb_kvs_id_t);
2340            size_seq = sizeof(fdb_seqnum_t);
2341            kv_seqnum = alca(uint8_t, size_id + size_seq);
2342            memcpy(kv_seqnum, &_kv_id, size_id);
2343            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2344            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2345                             size_id + size_seq, (void *)&offset);
2346            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2347        } else {
2348            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2349        }
2350        btreeblk_end(handle->bhandle);
2351        offset = _endian_decode(offset);
2352
2353        if (locked) {
2354            filemgr_mutex_unlock(handle->file);
2355        }
2356    }
2357
2358    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2359        bool alloc_key, alloc_meta, alloc_body;
2360        if (!handle->kvs) { // single KVS mode
2361            _doc.key = doc->key;
2362            _doc.length.keylen = doc->keylen;
2363            alloc_key = doc->key ? false : true;
2364        } else {
2365            _doc.key = NULL;
2366            alloc_key = true;
2367        }
2368        alloc_meta = doc->meta ? false : true;
2369        _doc.meta = doc->meta;
2370        alloc_body = doc->body ? false : true;
2371        _doc.body = doc->body;
2372
2373        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2374            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2375            return FDB_RESULT_KEY_NOT_FOUND;
2376        }
2377
2378        _offset = docio_read_doc(dhandle, offset, &_doc, true);
2379        if (_offset == offset) {
2380            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2381            return FDB_RESULT_KEY_NOT_FOUND;
2382        }
2383
2384        if (_doc.length.flag & DOCIO_DELETED) {
2385            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2386            free_docio_object(&_doc, alloc_key, alloc_meta, alloc_body);
2387            return FDB_RESULT_KEY_NOT_FOUND;
2388        }
2389
2390        doc->seqnum = _doc.seqnum;
2391        if (handle->kvs) {
2392            int size_chunk = handle->config.chunksize;
2393            doc->keylen = _doc.length.keylen - size_chunk;
2394            if (doc->key) { // doc->key is given by user
2395                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2396                free_docio_object(&_doc, 1, 0, 0);
2397            } else {
2398                doc->key = _doc.key;
2399                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2400            }
2401        } else {
2402            doc->keylen = _doc.length.keylen;
2403            doc->key = _doc.key;
2404        }
2405        doc->metalen = _doc.length.metalen;
2406        doc->bodylen = _doc.length.bodylen;
2407        doc->meta = _doc.meta;
2408        doc->body = _doc.body;
2409        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2410        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2411        doc->offset = offset;
2412
2413        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2414
2415        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2416        return FDB_RESULT_SUCCESS;
2417    }
2418
2419    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2420    return FDB_RESULT_KEY_NOT_FOUND;
2421}
2422
2423// search document metadata using sequence number
2424LIBFDB_API
2425fdb_status fdb_get_metaonly_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2426{
2427    uint64_t offset;
2428    struct docio_object _doc;
2429    struct docio_handle *dhandle;
2430    struct filemgr *wal_file = NULL;
2431    fdb_status wr;
2432    btree_result br = BTREE_RESULT_FAIL;
2433    fdb_seqnum_t _seqnum;
2434    fdb_txn *txn = handle->fhandle->root->txn;
2435
2436    if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2437        return FDB_RESULT_INVALID_ARGS;
2438    }
2439
2440    // Sequence trees are a must for byseq operations
2441    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2442        return FDB_RESULT_INVALID_CONFIG;
2443    }
2444
2445    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2446        return FDB_RESULT_HANDLE_BUSY;
2447    }
2448
2449    if (!handle->shandle) {
2450        fdb_check_file_reopen(handle, NULL);
2451        fdb_sync_db_header(handle);
2452
2453        wal_file = handle->file;
2454        dhandle = handle->dhandle;
2455
2456        if (!txn) {
2457            txn = &wal_file->global_txn;
2458        }
2459        // prevent searching by key in WAL if 'doc' is not empty
2460        size_t key_len = doc->keylen;
2461        doc->keylen = 0;
2462        if (handle->kvs) {
2463            wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2464        } else {
2465            wr = wal_find(txn, wal_file, doc, &offset);
2466        }
2467        doc->keylen = key_len;
2468    } else {
2469        wr = snap_find(handle->shandle, doc, &offset);
2470        dhandle = handle->dhandle;
2471    }
2472
2473    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2474
2475    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2476        bool locked = _fdb_sync_dirty_root(handle);
2477
2478        _seqnum = _endian_encode(doc->seqnum);
2479        if (handle->kvs) {
2480            int size_id, size_seq;
2481            uint8_t *kv_seqnum;
2482            hbtrie_result hr;
2483            fdb_kvs_id_t _kv_id;
2484
2485            _kv_id = _endian_encode(handle->kvs->id);
2486            size_id = sizeof(fdb_kvs_id_t);
2487            size_seq = sizeof(fdb_seqnum_t);
2488            kv_seqnum = alca(uint8_t, size_id + size_seq);
2489            memcpy(kv_seqnum, &_kv_id, size_id);
2490            memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2491            hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2492                             size_id + size_seq, (void *)&offset);
2493            br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2494        } else {
2495            br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2496        }
2497        btreeblk_end(handle->bhandle);
2498        offset = _endian_decode(offset);
2499
2500        if (locked) {
2501            filemgr_mutex_unlock(handle->file);
2502        }
2503    }
2504
2505    if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2506        if (!handle->kvs) { // single KVS mode
2507            _doc.key = doc->key;
2508            _doc.length.keylen = doc->keylen;
2509        } else {
2510            _doc.key = NULL;
2511        }
2512        _doc.meta = doc->meta;
2513        _doc.body = doc->body;
2514
2515        uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2516                                                       true);
2517        if (body_offset == offset) {
2518            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2519            return FDB_RESULT_KEY_NOT_FOUND;
2520        }
2521
2522        if (handle->kvs) {
2523            int size_chunk = handle->config.chunksize;
2524            doc->keylen = _doc.length.keylen - size_chunk;
2525            if (doc->key) { // doc->key is given by user
2526                memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2527                free_docio_object(&_doc, 1, 0, 0);
2528            } else {
2529                doc->key = _doc.key;
2530                memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2531            }
2532        } else {
2533            doc->keylen = _doc.length.keylen;
2534            doc->key = _doc.key;
2535        }
2536        doc->metalen = _doc.length.metalen;
2537        doc->bodylen = _doc.length.bodylen;
2538        doc->meta = _doc.meta;
2539        doc->body = _doc.body;
2540        doc->deleted = _doc.length.flag & DOCIO_DELETED;
2541        doc->size_ondisk = _fdb_get_docsize(_doc.length);
2542        doc->offset = offset;
2543
2544        fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2545
2546        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2547        return FDB_RESULT_SUCCESS;
2548    }
2549
2550    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2551    return FDB_RESULT_KEY_NOT_FOUND;
2552}
2553
2554static uint8_t equal_docs(fdb_doc *doc, struct docio_object *_doc) {
2555    uint8_t rv = 1;
2556    // Compare a seq num if seq tree is enabled.
2557    if (doc->seqnum != SEQNUM_NOT_USED) {
2558        if (doc->seqnum != _doc->seqnum) {
2559            free(_doc->key);
2560            free(_doc->meta);
2561            free(_doc->body);
2562            _doc->key = _doc->meta = _doc->body = NULL;
2563            rv = 0;
2564        }
2565    } else { // Compare key and metadata
2566        if ((doc->key && memcmp(doc->key, _doc->key, doc->keylen)) ||
2567            (doc->meta && memcmp(doc->meta, _doc->meta, doc->metalen))) {
2568            free(_doc->key);
2569            free(_doc->meta);
2570            free(_doc->body);
2571            _doc->key = _doc->meta = _doc->body = NULL;
2572            rv = 0;
2573        }
2574    }
2575    return rv;
2576}
2577
2578INLINE void _remove_kv_id(fdb_kvs_handle *handle, struct docio_object *doc)
2579{
2580    size_t size_chunk = handle->config.chunksize;
2581    doc->length.keylen -= size_chunk;
2582    memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->length.keylen);
2583}
2584
2585// Retrieve a doc's metadata and body with a given doc offset in the database file.
2586LIBFDB_API
2587fdb_status fdb_get_byoffset(fdb_kvs_handle *handle, fdb_doc *doc)
2588{
2589    uint64_t offset = doc->offset;
2590    struct docio_object _doc;
2591
2592    if (!offset) {
2593        return FDB_RESULT_INVALID_ARGS;
2594    }
2595
2596    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2597        return FDB_RESULT_HANDLE_BUSY;
2598    }
2599
2600    atomic_incr_uint64_t(&handle->op_stats->num_gets);
2601    memset(&_doc, 0, sizeof(struct docio_object));
2602
2603    uint64_t _offset = docio_read_doc(handle->dhandle, offset, &_doc, true);
2604    if (_offset == offset) {
2605        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2606        return FDB_RESULT_KEY_NOT_FOUND;
2607    } else {
2608        if (handle->kvs) {
2609            fdb_kvs_id_t kv_id;
2610            buf2kvid(handle->config.chunksize, _doc.key, &kv_id);
2611            if (kv_id != handle->kvs->id) {
2612                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2613                free_docio_object(&_doc, 1, 1, 1);
2614                return FDB_RESULT_KEY_NOT_FOUND;
2615            }
2616            _remove_kv_id(handle, &_doc);
2617        }
2618        if (!equal_docs(doc, &_doc)) {
2619            free_docio_object(&_doc, 1, 1, 1);
2620            fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2621            return FDB_RESULT_KEY_NOT_FOUND;
2622        }
2623    }
2624
2625    doc->seqnum = _doc.seqnum;
2626    doc->keylen = _doc.length.keylen;
2627    doc->metalen = _doc.length.metalen;
2628    doc->bodylen = _doc.length.bodylen;
2629    if (doc->key) {
2630        free(_doc.key);
2631    } else {
2632        doc->key = _doc.key;
2633    }
2634    if (doc->meta) {
2635        free(_doc.meta);
2636    } else {
2637        doc->meta = _doc.meta;
2638    }
2639    if (doc->body) {
2640        if (_doc.length.bodylen > 0) {
2641            memcpy(doc->body, _doc.body, _doc.length.bodylen);
2642        }
2643        free(_doc.body);
2644    } else {
2645        doc->body = _doc.body;
2646    }
2647    doc->deleted = _doc.length.flag & DOCIO_DELETED;
2648    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2649    if (handle->kvs) {
2650        // Since _doc.length was adjusted in _remove_kv_id(),
2651        // we need to compensate it.
2652        doc->size_ondisk += handle->config.chunksize;
2653    }
2654
2655    if (_doc.length.flag & DOCIO_DELETED) {
2656        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2657        return FDB_RESULT_KEY_NOT_FOUND;
2658    }
2659
2660    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2661    return FDB_RESULT_SUCCESS;
2662}
2663
2664INLINE uint64_t _fdb_get_wal_threshold(fdb_kvs_handle *handle)
2665{
2666    return handle->config.wal_threshold;
2667}
2668
2669LIBFDB_API
2670fdb_status fdb_set(fdb_kvs_handle *handle, fdb_doc *doc)
2671{
2672    uint64_t offset;
2673    struct docio_object _doc;
2674    struct filemgr *file;
2675    struct docio_handle *dhandle;
2676    struct timeval tv;
2677    bool txn_enabled = false;
2678    bool sub_handle = false;
2679    bool wal_flushed = false;
2680    file_status_t fstatus;
2681    fdb_txn *txn = handle->fhandle->root->txn;
2682    fdb_status wr = FDB_RESULT_SUCCESS;
2683
2684    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
2685        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
2686                       "Warning: SET is not allowed on the read-only DB file '%s'.",
2687                       handle->file->filename);
2688    }
2689
2690    if ( doc->key == NULL || doc->keylen == 0 ||
2691        doc->keylen > FDB_MAX_KEYLEN ||
2692        (doc->metalen > 0 && doc->meta == NULL) ||
2693        (doc->bodylen > 0 && doc->body == NULL) ||
2694        (handle->kvs_config.custom_cmp &&
2695            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2696        return FDB_RESULT_INVALID_ARGS;
2697    }
2698
2699    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2700        return FDB_RESULT_HANDLE_BUSY;
2701    }
2702
2703    _doc.length.keylen = doc->keylen;
2704    _doc.length.metalen = doc->metalen;
2705    _doc.length.bodylen = doc->deleted ? 0 : doc->bodylen;
2706    _doc.key = doc->key;
2707    _doc.meta = doc->meta;
2708    _doc.body = doc->deleted ? NULL : doc->body;
2709
2710    if (handle->kvs) {
2711        // multi KV instance mode
2712        // allocate more (temporary) space for key, to store ID number
2713        int size_chunk = handle->config.chunksize;
2714        _doc.length.keylen = doc->keylen + size_chunk;
2715        _doc.key = alca(uint8_t, _doc.length.keylen);
2716        // copy ID
2717        kvid2buf(size_chunk, handle->kvs->id, _doc.key);
2718        // copy key
2719        memcpy((uint8_t*)_doc.key + size_chunk, doc->key, doc->keylen);
2720
2721        if (handle->kvs->type == KVS_SUB) {
2722            sub_handle = true;
2723        } else {
2724            sub_handle = false;
2725        }
2726    }
2727
2728fdb_set_start:
2729    fdb_check_file_reopen(handle, NULL);
2730    filemgr_mutex_lock(handle->file);
2731    fdb_sync_db_header(handle);
2732
2733    if (filemgr_is_rollback_on(handle->file)) {
2734        filemgr_mutex_unlock(handle->file);
2735        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2736        return FDB_RESULT_FAIL_BY_ROLLBACK;
2737    }
2738
2739    file = handle->file;
2740    dhandle = handle->dhandle;
2741
2742    fstatus = filemgr_get_file_status(file);
2743    if (fstatus == FILE_REMOVED_PENDING) {
2744        // we must not write into this file
2745        // file status was changed by other thread .. start over
2746        filemgr_mutex_unlock(file);
2747        goto fdb_set_start;
2748    }
2749
2750    if (sub_handle) {
2751        // multiple KV instance mode AND sub handle
2752        handle->seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id) + 1;
2753        fdb_kvs_set_seqnum(file, handle->kvs->id, handle->seqnum);
2754    } else {
2755        // super handle OR single KV instance mode
2756        handle->seqnum = filemgr_get_seqnum(file) + 1;
2757        filemgr_set_seqnum(file, handle->seqnum);
2758    }
2759    _doc.seqnum = doc->seqnum = handle->seqnum;
2760
2761    if (doc->deleted) {
2762        // set timestamp
2763        gettimeofday(&tv, NULL);
2764        _doc.timestamp = (timestamp_t)tv.tv_sec;
2765    } else {
2766        _doc.timestamp = 0;
2767    }
2768
2769    if (txn) {
2770        txn_enabled = true;
2771    }
2772
2773    offset = docio_append_doc(dhandle, &_doc, doc->deleted, txn_enabled);
2774    if (offset == BLK_NOT_FOUND) {
2775        filemgr_mutex_unlock(file);
2776        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2777        return FDB_RESULT_WRITE_FAIL;
2778    }
2779
2780    doc->size_ondisk = _fdb_get_docsize(_doc.length);
2781    doc->offset = offset;
2782    if (!txn) {
2783        txn = &file->global_txn;
2784    }
2785    if (handle->kvs) {
2786        // multi KV instance mode
2787        fdb_doc kv_ins_doc = *doc;
2788        kv_ins_doc.key = _doc.key;
2789        kv_ins_doc.keylen = _doc.length.keylen;
2790        wal_insert(txn, file, &kv_ins_doc, offset, 0);
2791    } else {
2792        wal_insert(txn, file, doc, offset, 0);
2793    }
2794
2795    if (wal_get_dirty_status(file)== FDB_WAL_CLEAN) {
2796        wal_set_dirty_status(file, FDB_WAL_DIRTY);
2797    }
2798
2799    if (handle->config.wal_flush_before_commit ||
2800         handle->config.auto_commit) {
2801        bid_t dirty_idtree_root, dirty_seqtree_root;
2802
2803        if (!txn_enabled) {
2804            handle->dirty_updates = 1;
2805        }
2806
2807        // MUST ensure that 'file' is always 'handle->file',
2808        // because this routine will not be executed during compaction.
2809        filemgr_get_dirty_root(file, &dirty_idtree_root, &dirty_seqtree_root);
2810
2811        // other concurrent writer flushed WAL before commit,
2812        // sync root node of each tree
2813        if (dirty_idtree_root != BLK_NOT_FOUND) {
2814            handle->trie->root_bid = dirty_idtree_root;
2815        }
2816        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
2817            dirty_seqtree_root != BLK_NOT_FOUND) {
2818            if (handle->kvs) {
2819                handle->seqtrie->root_bid = dirty_seqtree_root;
2820            } else {
2821                btree_init_from_bid(handle->seqtree,
2822                                    handle->seqtree->blk_handle,
2823                                    handle->seqtree->blk_ops,
2824                                    handle->seqtree->kv_ops,
2825                                    handle->seqtree->blksize,
2826                                    dirty_seqtree_root);
2827            }
2828        }
2829
2830        if (wal_get_num_flushable(file) > _fdb_get_wal_threshold(handle)) {
2831            struct avl_tree flush_items;
2832
2833            // discard all cached writable blocks
2834            // to avoid data inconsistency with other writers
2835            btreeblk_discard_blocks(handle->bhandle);
2836
2837            // commit only for non-transactional WAL entries
2838            wr = wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
2839            if (wr != FDB_RESULT_SUCCESS) {
2840                filemgr_mutex_unlock(file);
2841                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2842                           1, 0);
2843                return wr;
2844            }
2845            wr = wal_flush(file, (void *)handle,
2846                      _fdb_wal_flush_func, _fdb_wal_get_old_offset,
2847                      &flush_items);
2848            if (wr != FDB_RESULT_SUCCESS) {
2849                filemgr_mutex_unlock(file);
2850                fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0),
2851                           1, 0);
2852                return wr;
2853            }
2854            wal_set_dirty_status(file, FDB_WAL_PENDING);
2855            // it is ok to release flushed items becuase
2856            // these items are not actually committed yet.
2857            // they become visible after fdb_commit is invoked.
2858            wal_release_flushed_items(file, &flush_items);
2859
2860            // sync new root node
2861            dirty_idtree_root = handle->trie->root_bid;
2862            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2863                if (handle->kvs) {
2864                    dirty_seqtree_root = handle->seqtrie->root_bid;
2865                } else {
2866                    dirty_seqtree_root = handle->seqtree->root_bid;
2867                }
2868            }
2869            filemgr_set_dirty_root(file,
2870                                   dirty_idtree_root,
2871                                   dirty_seqtree_root);
2872
2873            wal_flushed = true;
2874            btreeblk_reset_subblock_info(handle->bhandle);
2875        }
2876    }
2877
2878    filemgr_mutex_unlock(file);
2879
2880    if (!doc->deleted) {
2881        atomic_incr_uint64_t(&handle->op_stats->num_sets);
2882    }
2883
2884    if (wal_flushed && handle->config.auto_commit) {
2885        fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2886        return fdb_commit(handle->fhandle, FDB_COMMIT_NORMAL);
2887    }
2888    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2889    return FDB_RESULT_SUCCESS;
2890}
2891
2892LIBFDB_API
2893fdb_status fdb_del(fdb_kvs_handle *handle, fdb_doc *doc)
2894{
2895    if (handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
2896        return fdb_log(&handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
2897                       "Warning: DEL is not allowed on the read-only DB file '%s'.",
2898                       handle->file->filename);
2899    }
2900
2901    if (doc->key == NULL || doc->keylen == 0 ||
2902        doc->keylen > FDB_MAX_KEYLEN ||
2903        (handle->kvs_config.custom_cmp &&
2904            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2905        return FDB_RESULT_INVALID_ARGS;
2906    }
2907
2908    doc->deleted = true;
2909    fdb_doc _doc;
2910    _doc = *doc;
2911    _doc.bodylen = 0;
2912    _doc.body = NULL;
2913
2914    atomic_incr_uint64_t(&handle->op_stats->num_dels);
2915
2916    return fdb_set(handle, &_doc);
2917}
2918
2919static uint64_t _fdb_export_header_flags(fdb_kvs_handle *handle)
2920{
2921    uint64_t rv = 0;
2922    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2923        // seq tree is used
2924        rv |= FDB_FLAG_SEQTREE_USE;
2925    }
2926    if (handle->fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
2927        // the default KVS is once opened
2928        rv |= FDB_FLAG_ROOT_INITIALIZED;
2929    }
2930    if (handle->fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) {
2931        // the default KVS is based on custom key order
2932        rv |= FDB_FLAG_ROOT_CUSTOM_CMP;
2933    }
2934    return rv;
2935}
2936
2937uint64_t fdb_set_file_header(fdb_kvs_handle *handle)
2938{
2939    /*
2940    <ForestDB header>
2941    [offset]: (description)
2942    [     0]: BID of root node of root B+Tree of HB+Trie: 8 bytes
2943    [     8]: BID of root node of seq B+Tree: 8 bytes (0xFF.. if not used)
2944    [    16]: # of live documents: 8 bytes
2945    [    24]: # of live B+Tree nodes: 8 bytes
2946    [    32]: Data size (byte): 8 bytes
2947    [    40]: BID of the DB header created when last WAL flush: 8 bytes
2948    [    48]: Offset of the document containing KV instances' info: 8 bytes
2949    [    56]: Header flags: 8 bytes
2950    [    64]: Size of newly compacted target file name : 2 bytes
2951    [    66]: Size of old file name before compaction :  2 bytes
2952    [    68]: File name of newly compacted file : x bytes
2953    [  68+x]: File name of old file before compcation : y bytes
2954    [68+x+y]: CRC32: 4 bytes
2955    total size (header's length): 72+x+y bytes
2956
2957    Note: the list of functions that need to be modified
2958          if the header structure is changed:
2959
2960        _fdb_redirect_header() in forestdb.cc
2961        filemgr_destory_file() in filemgr.cc
2962    */
2963    uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2964    uint16_t new_filename_len = 0;
2965    uint16_t old_filename_len = 0;
2966    uint16_t _edn_safe_16;
2967    uint32_t crc;
2968    uint64_t _edn_safe_64;
2969    size_t offset = 0;
2970    struct filemgr *cur_file;
2971    struct kvs_stat stat;
2972
2973    cur_file = handle->file;
2974
2975    // hb+trie or idtree root bid
2976    _edn_safe_64 = _endian_encode(handle->trie->root_bid);
2977    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(handle->trie->root_bid), offset);
2978
2979    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2980        // b+tree root bid
2981        _edn_safe_64 = _endian_encode(handle->seqtree->root_bid);
2982        seq_memcpy(buf + offset, &_edn_safe_64,
2983            sizeof(handle->seqtree->root_bid), offset);
2984    } else {
2985        memset(buf + offset, 0xff, sizeof(uint64_t));
2986        offset += sizeof(uint64_t);
2987    }
2988
2989    // get stat
2990    _kvs_stat_get(cur_file, 0, &stat);
2991
2992    // # docs
2993    _edn_safe_64 = _endian_encode(stat.ndocs);
2994    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(_edn_safe_64), offset);
2995    // # live nodes
2996    _edn_safe_64 = _endian_encode(stat.nlivenodes);
2997    seq_memcpy(buf + offset, &_edn_safe_64,
2998               sizeof(_edn_safe_64), offset);
2999    // data size
3000    _edn_safe_64 = _endian_encode(stat.datasize);
3001    seq_memcpy(buf + offset, &_edn_safe_64, sizeof(_edn_safe_64), offset);
3002    // last header bid
3003    _edn_safe_64 = _endian_encode(handle->last_wal_flush_hdr_bid);
3004    seq_memcpy(buf + offset, &_edn_safe_64,
3005               sizeof(handle->last_wal_flush_hdr_bid), offset);
3006    // kv info offset
3007    _edn_safe_64 = _endian_encode(handle->kv_info_offset);
3008    seq_memcpy(buf + offset, &_edn_safe_64,
3009               sizeof(handle->kv_info_offset), offset);
3010    // header flags
3011    _edn_safe_64 = _fdb_export_header_flags(handle);
3012    _edn_safe_64 = _endian_encode(_edn_safe_64);
3013    seq_memcpy(buf + offset, &_edn_safe_64,
3014               sizeof(_edn_safe_64), offset);
3015
3016    // size of newly compacted target file name
3017    if (handle->file->new_file) {
3018        new_filename_len = strlen(handle->file->new_file->filename) + 1;
3019    }
3020    _edn_safe_16 = _endian_encode(new_filename_len);
3021    seq_memcpy(buf + offset, &_edn_safe_16, sizeof(new_filename_len), offset);
3022
3023    // size of old filename before compaction
3024    if (handle->file->old_filename) {
3025        old_filename_len = strlen(handle->file->old_filename) + 1;
3026    }
3027    _edn_safe_16 = _endian_encode(old_filename_len);
3028    seq_memcpy(buf + offset, &_edn_safe_16, sizeof(old_filename_len), offset);
3029
3030    if (new_filename_len) {
3031        seq_memcpy(buf + offset, handle->file->new_file->filename,
3032                   new_filename_len, offset);
3033    }
3034
3035    if (old_filename_len) {
3036        seq_memcpy(buf + offset, handle->file->old_filename,
3037                   old_filename_len, offset);
3038    }
3039
3040    // crc32
3041    crc = chksum(buf, offset);
3042    crc = _endian_encode(crc);
3043    seq_memcpy(buf + offset, &crc, sizeof(crc), offset);
3044
3045    return filemgr_update_header(handle->file, buf, offset);
3046}
3047
3048static
3049char *_fdb_redirect_header(uint8_t *buf, char *new_filename,
3050                                 uint16_t new_filename_len) {
3051    uint16_t old_compact_filename_len; // size of existing old_filename in buf
3052    uint16_t new_compact_filename_len; // size of existing new_filename in buf
3053    uint16_t new_filename_len_enc = _endian_encode(new_filename_len);
3054    uint32_t crc;
3055    size_t crc_offset;
3056    size_t offset = 64;
3057    char *old_filename;
3058    // Read existing DB header's size of newly compacted filename
3059    seq_memcpy(&new_compact_filename_len, buf + offset, sizeof(uint16_t),
3060               offset);
3061    new_compact_filename_len = _endian_decode(new_compact_filename_len);
3062
3063    // Read existing DB header's size of filename before its compaction
3064    seq_memcpy(&old_compact_filename_len, buf + offset, sizeof(uint16_t),
3065               offset);
3066    old_compact_filename_len = _endian_decode(old_compact_filename_len);
3067
3068    // Update DB header's size of newly compacted filename to redirected one
3069    memcpy(buf + 64, &new_filename_len_enc, sizeof(uint16_t));
3070
3071    // Copy over existing DB header's old_filename to its new location
3072    old_filename = (char*)buf + offset + new_filename_len;
3073    if (new_compact_filename_len != new_filename_len) {
3074        memmove(old_filename, buf + offset + new_compact_filename_len,
3075                old_compact_filename_len);
3076    }
3077    // Update the DB header's new_filename to the redirected one
3078    memcpy(buf + 68, new_filename, new_filename_len);
3079    // Compute the DB header's new crc32 value
3080    crc_offset = 68 + new_filename_len + old_compact_filename_len;
3081    crc = chksum(buf, crc_offset);
3082    crc = _endian_encode(crc);
3083    // Update the DB header's new crc32 value
3084    memcpy(buf + crc_offset, &crc, sizeof(crc));
3085    // If the DB header indicated an old_filename, return it
3086    return old_compact_filename_len ? old_filename : NULL;
3087}
3088
3089static fdb_status _fdb_append_commit_mark(void *voidhandle, uint64_t offset)
3090{
3091    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
3092    struct docio_handle *dhandle;
3093
3094    dhandle = handle->dhandle;
3095    if (docio_append_commit_mark(dhandle, offset) == BLK_NOT_FOUND) {
3096        return FDB_RESULT_WRITE_FAIL;
3097    }
3098    return FDB_RESULT_SUCCESS;
3099}
3100
3101LIBFDB_API
3102fdb_status fdb_commit(fdb_file_handle *fhandle, fdb_commit_opt_t opt)
3103{
3104    return _fdb_commit(fhandle->root, opt);
3105}
3106
3107fdb_status _fdb_commit(fdb_kvs_handle *handle, fdb_commit_opt_t opt)
3108{
3109    fdb_txn *txn = handle->fhandle->root->txn;
3110    fdb_txn *earliest_txn;
3111    file_status_t fstatus;
3112    fdb_status fs = FDB_RESULT_SUCCESS;
3113    bool wal_flushed = false;
3114    bid_t dirty_idtree_root, dirty_seqtree_root;
3115    struct avl_tree flush_items;
3116    fdb_status