xref: /6.0.3/forestdb/src/forestdb.cc (revision 5b78091c)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <sys/time.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "fdb_internal.h"
29#include "filemgr.h"
30#include "hbtrie.h"
31#include "list.h"
32#include "breakpad.h"
33#include "btree.h"
34#include "btree_kv.h"
35#include "btree_var_kv_ops.h"
36#include "docio.h"
37#include "btreeblock.h"
38#include "common.h"
39#include "wal.h"
40#include "filemgr_ops.h"
41#include "configuration.h"
42#include "internal_types.h"
43#include "bgflusher.h"
44#include "compactor.h"
45#include "memleak.h"
46#include "time_utils.h"
47#include "timing.h"
48#include "system_resource_stats.h"
49#include "version.h"
50#include "staleblock.h"
51
// Debug-print control for this translation unit: when the global __DEBUG
// switch is on but the per-module __DEBUG_FDB switch is not, redefine
// DBG / DBGCMD / DBGSW as empty macros so they expand to nothing here.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
62
63
// Set to 1 by fdb_init() once the global (process-wide) initialization
// has been performed; tested again under 'initial_lock' before initializing.
static atomic_uint8_t fdb_initialized(0);
// Number of fdb_open() calls currently in progress. fdb_open() increments
// and decrements it while holding 'initial_lock'.
static volatile uint32_t fdb_open_inprog = 0;
// Spin lock guarding the global initialization state above.
#ifdef SPIN_INITIALIZER
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// No static initializer available: the lock is initialized at runtime by
// init_initial_lock_status(), which moves this status 0 -> 1 -> 2
// (2 meaning 'initial_lock' is ready to use).
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif
72
73INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74{
75    (void) aux;
76    uint64_t a,b;
77    a = *(uint64_t*)key1;
78    b = *(uint64_t*)key2;
79    a = _endian_decode(a);
80    b = _endian_decode(b);
81    return _CMP_U64(a, b);
82}
83
84size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85{
86    fdb_status fs;
87    keylen_t keylen;
88    struct docio_handle *dhandle = (struct docio_handle*)handle;
89
90    offset = _endian_decode(offset);
91    fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92    if (fs == FDB_RESULT_SUCCESS) {
93        return keylen;
94    } else {
95        const char *msg = "docio_read_doc_key error: read failure on "
96            "offset %" _F64 " in a database file '%s' "
97            ": FDB status %d, lastbid 0x%" _X64 ", "
98            "curblock 0x%" _X64 ", curpos 0x%x\n";
99        fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                dhandle->file->filename, fs, dhandle->lastbid,
101                dhandle->curblock, dhandle->curpos);
102        dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103        return 0;
104    }
105}
106
107size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
108{
109    int size_id, size_seq, size_chunk;
110    fdb_seqnum_t _seqnum;
111    struct docio_object doc;
112    struct docio_handle *dhandle = (struct docio_handle *)handle;
113
114    size_id = sizeof(fdb_kvs_id_t);
115    size_seq = sizeof(fdb_seqnum_t);
116    size_chunk = dhandle->file->config->chunksize;
117    memset(&doc, 0, sizeof(struct docio_object));
118
119    offset = _endian_decode(offset);
120    if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
121                                &doc, true) <= 0) {
122        return 0;
123    }
124    buf2buf(size_chunk, doc.key, size_id, buf);
125    _seqnum = _endian_encode(doc.seqnum);
126    memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
127
128    free(doc.key);
129    free(doc.meta);
130
131    return size_id + size_seq;
132}
133
134int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
135{
136    int is_key1_inf, is_key2_inf;
137    uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
138    uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
139    size_t keylen1, keylen2;
140    btree_cmp_args *args = (btree_cmp_args *)aux;
141    fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
142
143    is_key1_inf = _is_inf_key(key1);
144    is_key2_inf = _is_inf_key(key2);
145    if (is_key1_inf && is_key2_inf) { // both are infinite
146        return 0;
147    } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
148        return -1;
149    } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
150        return 1;
151    }
152
153    _get_var_key(key1, (void*)keystr1, &keylen1);
154    _get_var_key(key2, (void*)keystr2, &keylen2);
155
156    if (keylen1 == 0 && keylen2 == 0) {
157        return 0;
158    } else if (keylen1 ==0 && keylen2 > 0) {
159        return -1;
160    } else if (keylen1 > 0 && keylen2 == 0) {
161        return 1;
162    }
163
164    return cmp(keystr1, keylen1, keystr2, keylen2);
165}
166
167void fdb_fetch_header(uint64_t version,
168                      void *header_buf,
169                      bid_t *trie_root_bid,
170                      bid_t *seq_root_bid,
171                      bid_t *stale_root_bid,
172                      uint64_t *ndocs,
173                      uint64_t *ndeletes,
174                      uint64_t *nlivenodes,
175                      uint64_t *datasize,
176                      uint64_t *last_wal_flush_hdr_bid,
177                      uint64_t *kv_info_offset,
178                      uint64_t *header_flags,
179                      char **new_filename,
180                      char **old_filename)
181{
182    size_t offset = 0;
183    uint16_t new_filename_len;
184    uint16_t old_filename_len;
185
186    seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
187               sizeof(bid_t), offset);
188    *trie_root_bid = _endian_decode(*trie_root_bid);
189
190    seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
191               sizeof(bid_t), offset);
192    *seq_root_bid = _endian_decode(*seq_root_bid);
193
194    if (ver_staletree_support(version)) {
195        seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
196                   sizeof(bid_t), offset);
197        *stale_root_bid = _endian_decode(*stale_root_bid);
198    } else {
199        *stale_root_bid = BLK_NOT_FOUND;
200    }
201
202    seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
203               sizeof(uint64_t), offset);
204    *ndocs = _endian_decode(*ndocs);
205    if (ver_is_atleast_magic_001(version)) {
206        seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
207                   sizeof(uint64_t), offset);
208        *ndeletes = _endian_decode(*ndeletes);
209    } else {
210        *ndeletes = 0;
211    }
212
213    seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
214               sizeof(uint64_t), offset);
215    *nlivenodes = _endian_decode(*nlivenodes);
216
217    seq_memcpy(datasize, (uint8_t *)header_buf + offset,
218               sizeof(uint64_t), offset);
219    *datasize = _endian_decode(*datasize);
220
221    seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
222               sizeof(uint64_t), offset);
223    *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
224
225    seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
226               sizeof(uint64_t), offset);
227    *kv_info_offset = _endian_decode(*kv_info_offset);
228
229    seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
230               sizeof(uint64_t), offset);
231    *header_flags = _endian_decode(*header_flags);
232
233    seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
234               sizeof(new_filename_len), offset);
235    new_filename_len = _endian_decode(new_filename_len);
236    seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
237               sizeof(old_filename_len), offset);
238    old_filename_len = _endian_decode(old_filename_len);
239    if (new_filename_len) {
240        *new_filename = (char*)((uint8_t *)header_buf + offset);
241    } else {
242        *new_filename = NULL;
243    }
244    offset += new_filename_len;
245    if (old_filename && old_filename_len) {
246        *old_filename = (char *) malloc(old_filename_len);
247        seq_memcpy(*old_filename,
248                   (uint8_t *)header_buf + offset,
249                   old_filename_len, offset);
250    }
251}
252
253// read the revnum of the given header of BID
254INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
255{
256    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
257    uint64_t version;
258    size_t header_len;
259    fdb_seqnum_t seqnum;
260    filemgr_header_revnum_t revnum = 0;
261    fdb_status fs;
262
263    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
264                              &seqnum, &revnum, NULL, &version, NULL,
265                              &handle->log_callback);
266    if (fs != FDB_RESULT_SUCCESS) {
267        return 0;
268    }
269    return revnum;
270}
271
272INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
273{
274    uint8_t *buf = alca(uint8_t, handle->file->blocksize);
275    uint64_t version, bmp_revnum = 0;
276    size_t header_len;
277    fdb_seqnum_t seqnum;
278    filemgr_header_revnum_t revnum;
279    fdb_status fs;
280
281    fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
282                              &seqnum, &revnum, NULL, &version, &bmp_revnum,
283                              &handle->log_callback);
284    if (fs != FDB_RESULT_SUCCESS) {
285        return 0;
286    }
287    return bmp_revnum;
288}
289
// Log callback that deliberately discards everything it is given.
// Installed wherever error logging has to be silenced temporarily.
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // all three arguments are intentionally unused
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
297
298INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
299                             fdb_restore_mode_t mode,
300                             bid_t hdr_bid,
301                             fdb_kvs_id_t kv_id_req)
302{
303    struct filemgr *file = handle->file;
304    uint32_t blocksize = handle->file->blocksize;
305    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
306    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
307    uint64_t offset = 0; //assume everything from first block needs restoration
308    uint64_t filesize = filemgr_get_pos(handle->file);
309    uint64_t doc_scan_limit;
310    uint64_t start_bmp_revnum, stop_bmp_revnum;
311    uint64_t cur_bmp_revnum = (uint64_t)-1;
312    bid_t next_doc_block = BLK_NOT_FOUND;
313    struct _fdb_key_cmp_info cmp_info;
314    err_log_callback *log_callback;
315
316    if (mode == FDB_RESTORE_NORMAL && !handle->shandle &&
317        !wal_try_restore(handle->file)) { // Atomically try to restore WAL
318        // Some other thread or previous open had successfully initialized WAL
319        // We can simply return here
320        return;
321    }
322
323    if (!hdr_off) { // Nothing to do if we don't have a header block offset
324        return;
325    }
326
327    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
328        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
329    }
330
331    // If a valid last header was retrieved and it matches the current header
332    if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid) {
333        return; // No WAL section in the file
334    }
335
336    if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
337        // for normal WAL restore, set status to dirty
338        // (only when the previous status is clean or dirty)
339        wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
340    }
341
342    // Temporarily disable the error logging callback as there are false positive
343    // checksum errors in docio_read_doc.
344    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
345    err_log_callback dummy_cb;
346    log_callback = handle->dhandle->log_callback;
347    dummy_cb.callback = fdb_dummy_log_callback;
348    dummy_cb.ctx_data = NULL;
349    handle->dhandle->log_callback = &dummy_cb;
350
351    if (!handle->shandle) {
352        filemgr_mutex_lock(file);
353    }
354    cmp_info.kvs_config = handle->kvs_config;
355    cmp_info.kvs = handle->kvs;
356
357    start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
358    stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
359    cur_bmp_revnum = start_bmp_revnum;
360
361    // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
362    // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
363    // otherwise: live block (bmp_revnum: 0)
364    //  1 2   3    4    5 6  7  8   9  10
365    // +-------------------------------------------+
366    // |  BBBBAAAAABBBBB  AAABBB    AAA            |
367    // +-------------------------------------------+
368    //              ^                     ^
369    //              hdr_bid               last_wal_flush
370    //
371    // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
372    // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
373    // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
374    // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]
375
376    do {
377        if (cur_bmp_revnum > stop_bmp_revnum) {
378            break;
379        } else if (cur_bmp_revnum == stop_bmp_revnum) {
380
381            bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
382            if (handle->file->sb) {
383                sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
384            }
385            if (!handle->shandle && handle->file->sb &&
386                sb_last_hdr_bid != BLK_NOT_FOUND) {
387                hdr_off = (sb_last_hdr_bid+1) * blocksize;
388            }
389
390            doc_scan_limit = hdr_off;
391            if (offset >= hdr_off) {
392                break;
393            }
394        } else {
395            doc_scan_limit = filesize;
396        }
397
398        if (!docio_check_buffer(handle->dhandle, offset / blocksize,
399                                cur_bmp_revnum)) {
400            // not a document block .. move to next block
401        } else {
402            do {
403                struct docio_object doc;
404                int64_t _offset;
405                uint64_t doc_offset;
406                memset(&doc, 0, sizeof(doc));
407                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
408                if (_offset <= 0) { // reached unreadable doc, skip block
409                    // TODO: Need to have this function return fdb_status, so that
410                    // WAL restore operation should fail if offset < 0
411                    break;
412                } else if ((uint64_t)_offset < offset) {
413                    // If more than one writer is appending docs concurrently,
414                    // they have their own doc block linked list and doc blocks
415                    // may not be consecutive. For example,
416                    //
417                    // Writer 1): 100 -> 102 -> 2 -> 4     | commit
418                    // Writer 2):    101 - > 103 -> 3 -> 5 |
419                    //
420                    // In this case, if we read doc BID 102, then 'offset' will jump
421                    // to doc BID 2, without reading BID 103.
422                    //
423                    // To address this issue, in case that 'offset' decreases,
424                    // remember the next doc block, and follow the doc linked list
425                    // first. After the linked list ends, 'offset' cursor will be
426                    // reset to 'next_doc_block'.
427                    next_doc_block = (offset / blocksize) + 1;
428                }
429                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
430                    // check if the doc is transactional or not, and
431                    // also check if the doc contains system info
432                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
433                        !(doc.length.flag & DOCIO_SYSTEM)) {
434                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
435                            // commit mark .. read doc offset
436                            doc_offset = doc.doc_offset;
437                            // read the previously skipped doc
438                            if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
439                                // doc read error
440                                free(doc.key);
441                                free(doc.meta);
442                                free(doc.body);
443                                offset = _offset;
444                                continue;
445                            }
446                        } else {
447                            doc_offset = offset;
448                        }
449
450                        // If say a snapshot is taken on a db handle after
451                        // rollback, then skip WAL items after rollback point
452                        if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
453                            doc.seqnum > handle->seqnum) {
454                            free(doc.key);
455                            free(doc.meta);
456                            free(doc.body);
457                            offset = _offset;
458                            continue;
459                        }
460
461                        // restore document
462                        fdb_doc wal_doc;
463                        wal_doc.keylen = doc.length.keylen;
464                        wal_doc.bodylen = doc.length.bodylen;
465                        wal_doc.key = doc.key;
466                        wal_doc.seqnum = doc.seqnum;
467                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
468
469                        if (!handle->shandle) {
470                            wal_doc.metalen = doc.length.metalen;
471                            wal_doc.meta = doc.meta;
472                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
473
474                            if (handle->kvs) {
475                                // check seqnum before insert
476                                fdb_kvs_id_t kv_id;
477                                fdb_seqnum_t kv_seqnum;
478                                buf2kvid(handle->config.chunksize,
479                                         wal_doc.key, &kv_id);
480
481                                kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
482                                if (doc.seqnum <= kv_seqnum &&
483                                        ((mode == FDB_RESTORE_KV_INS &&
484                                            kv_id == kv_id_req) ||
485                                         (mode == FDB_RESTORE_NORMAL)) ) {
486                                    // if mode is NORMAL, restore all items
487                                    // if mode is KV_INS, restore items matching ID
488                                    wal_insert(&file->global_txn, file, &cmp_info,
489                                               &wal_doc, doc_offset,
490                                               WAL_INS_WRITER);
491                                }
492                            } else {
493                                wal_insert(&file->global_txn, file, &cmp_info,
494                                           &wal_doc, doc_offset,
495                                           WAL_INS_WRITER);
496                            }
497                            if (doc.key) free(doc.key);
498                        } else {
499                            // snapshot
500                            if (handle->kvs) {
501                                fdb_kvs_id_t kv_id;
502                                buf2kvid(handle->config.chunksize,
503                                         wal_doc.key, &kv_id);
504                                if (kv_id == handle->kvs->id) {
505                                    // snapshot: insert ID matched documents only
506                                    wal_snap_insert(handle->shandle,
507                                                    &wal_doc, doc_offset);
508                                } else {
509                                    free(doc.key);
510                                }
511                            } else {
512                                wal_snap_insert(handle->shandle, &wal_doc,
513                                                doc_offset);
514                            }
515                        }
516                        free(doc.meta);
517                        free(doc.body);
518                        offset = _offset;
519                    } else {
520                        // skip transactional document or system document
521                        free(doc.key);
522                        free(doc.meta);
523                        free(doc.body);
524                        offset = _offset;
525                        // do not break.. read next doc
526                    }
527                } else {
528                    free(doc.key);
529                    free(doc.meta);
530                    free(doc.body);
531                    offset = _offset;
532                    break;
533                }
534            } while (offset + sizeof(struct docio_length) < doc_scan_limit);
535        }
536
537        if (next_doc_block != BLK_NOT_FOUND) {
538            offset = next_doc_block * blocksize;
539            next_doc_block = BLK_NOT_FOUND;
540        } else {
541            offset = ((offset / blocksize) + 1) * blocksize;
542        }
543        if (ver_superblock_support(handle->file->version) &&
544            offset >= filesize) {
545            // circular scan
546            struct superblock *sb = handle->file->sb;
547            if (sb && sb->config) {
548                offset = blocksize * sb->config->num_sb;
549                cur_bmp_revnum++;
550            }
551        }
552    } while(true);
553
554    // wal commit
555    if (!handle->shandle) {
556        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
557        filemgr_mutex_unlock(file);
558    }
559    handle->dhandle->log_callback = log_callback;
560}
561
/*
 * Inspects the compaction target file 'new_filename' that the header of
 * handle's file refers to, and recovers from an interrupted compaction.
 *
 * The new file is opened read-only through a temporary handle (new_db) with
 * logging disabled. Then one of two things happens:
 *
 *  - The new file records handle's current file as its old file: compaction
 *    is treated as complete. The handle's block handle, docio handle, trie,
 *    sequence tree and stale tree are replaced by those of new_db, the handle
 *    is switched to the new file, the old file is marked for removal and
 *    closed, and FDB_RESULT_FAIL_BY_COMPACTION is returned.
 *
 *  - Otherwise the new file is considered partially compacted: it is marked
 *    for removal (pointing back to the old file), the temporary handle is
 *    closed and FDB_RESULT_SUCCESS is returned.
 *
 * Returns the _fdb_open() error (after logging it) if the new file cannot be
 * opened, or the btreeblk_end() error if flushing the old block handle fails.
 */
INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
                                          const char *new_filename)
{
    fdb_kvs_handle new_db;
    fdb_config config = handle->config;
    struct filemgr *new_file;

    // As partially compacted file may contain various errors,
    // we temporarily disable log callback for compaction recovery.
    memset(&new_db, 0, sizeof(new_db));
    new_db.log_callback.callback = NULL;
    new_db.log_callback.ctx_data = NULL;
    config.flags |= FDB_OPEN_FLAG_RDONLY;
    new_db.fhandle = handle->fhandle;
    new_db.kvs_config = handle->kvs_config;
    fdb_status status = _fdb_open(&new_db, new_filename,
                                  FDB_AFILENAME, &config);
    if (status != FDB_RESULT_SUCCESS) {
        return fdb_log(&handle->log_callback, status,
                       "Error in opening a partially compacted file '%s' for recovery.",
                       new_filename);
    }

    new_file = new_db.file;

    if (new_file->old_filename &&
        !strncmp(new_file->old_filename, handle->file->filename,
                 FDB_MAX_FILENAME_LEN)) {
        struct filemgr *old_file = handle->file;
        // If new file has a recorded old_filename then it means that
        // compaction has completed successfully. Mark self for deletion
        filemgr_mutex_lock(new_file);

        status = btreeblk_end(handle->bhandle);
        if (status != FDB_RESULT_SUCCESS) {
            filemgr_mutex_unlock(new_file);
            _fdb_close(&new_db);
            return status;
        }
        // Release each per-file structure of the old file and take over the
        // corresponding one that _fdb_open() created in new_db.
        btreeblk_free(handle->bhandle);
        free(handle->bhandle);
        handle->bhandle = new_db.bhandle;

        docio_free(handle->dhandle);
        free(handle->dhandle);
        handle->dhandle = new_db.dhandle;

        hbtrie_free(handle->trie);
        free(handle->trie);
        handle->trie = new_db.trie;

        wal_shutdown(handle->file, &handle->log_callback);
        handle->file = new_file;

        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
            // NOTE(review): if new_db was opened without a sequence tree, the
            // handle's seqtrie/seqtree pointer is freed below but not
            // reassigned or cleared, leaving it dangling -- confirm whether
            // that combination can occur.
            if (handle->kvs) {
                // multi KV instance mode
                hbtrie_free(handle->seqtrie);
                free(handle->seqtrie);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtrie = new_db.seqtrie;
                }
            } else {
                free(handle->seqtree->kv_ops);
                free(handle->seqtree);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtree = new_db.seqtree;
                }
            }
        }
        handle->staletree = new_db.staletree;

        filemgr_mutex_unlock(new_file);
        if (new_db.kvs) {
            fdb_kvs_info_free(&new_db);
        }
        // remove self: WARNING must not close this handle if snapshots
        // are yet to open this file
        filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
        // new_db itself is not closed on this path: its structures now belong
        // to 'handle'. Only its filename copy is released here.
        free(new_db.filename);
        return FDB_RESULT_FAIL_BY_COMPACTION;
    }

    // As the new file is partially compacted, it should be removed upon close.
    // Just in-case the new file gets opened before removal, point it to the old
    // file to ensure availability of data.
    filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
    _fdb_close(&new_db);

    return FDB_RESULT_SUCCESS;
}
654
655#ifndef SPIN_INITIALIZER
656INLINE void init_initial_lock_status() {
657    // Note that only Windows passes through this routine
658    if (!fdb_initialized) {
659        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
660            // atomically initialize spin lock only once
661            spin_init(&initial_lock);
662            initial_lock_status = 2;
663        } else {
664            // the others .. wait until initializing 'initial_lock' is done
665            // TODO: Need to devise a better way of synchronization on Windows
666            while (initial_lock_status != 2) {
667                Sleep(1);
668            }
669        }
670    }
671}
672#endif
673
674LIBFDB_API
675fdb_status fdb_init(fdb_config *config)
676{
677    fdb_config _config;
678    compactor_config c_config;
679    bgflusher_config bgf_config;
680    struct filemgr_config f_config;
681
682    if (config) {
683        if (validate_fdb_config(config)) {
684            _config = *config;
685        } else {
686            return FDB_RESULT_INVALID_CONFIG;
687        }
688    } else {
689        _config = get_default_config();
690    }
691
692    // global initialization
693    // initialized only once at first time
694    if (!fdb_initialized) {
695
696#ifndef SPIN_INITIALIZER
697        init_initial_lock_status();
698#endif
699
700    }
701    spin_lock(&initial_lock);
702    if (!fdb_initialized) {
703#if !defined(_ANDROID_) && !defined(__ANDROID__)
704        // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
705        // We temporarily disable validity checking of block cache size
706        // on Android platform at this time.
707        double ram_size = (double) get_memory_size();
708        if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
709            spin_unlock(&initial_lock);
710            return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
711        }
712#endif
713        // initialize file manager and block cache
714        f_config.blocksize = _config.blocksize;
715        f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
716        f_config.seqtree_opt = _config.seqtree_opt;
717        filemgr_init(&f_config);
718        filemgr_set_lazy_file_deletion(true,
719                                       compactor_register_file_removing,
720                                       compactor_is_file_removed);
721        if (ver_superblock_support(ver_get_latest_magic())) {
722            struct sb_ops sb_ops = {sb_init, sb_get_default_config,
723                                    sb_read_latest, sb_alloc_block,
724                                    sb_bmp_is_writable, sb_get_bmp_revnum,
725                                    sb_get_min_live_revnum, sb_free};
726            filemgr_set_sb_operation(sb_ops);
727            sb_bmp_mask_init();
728        }
729
730        // initialize compaction daemon
731        c_config.sleep_duration = _config.compactor_sleep_duration;
732        c_config.num_threads = _config.num_compactor_threads;
733        compactor_init(&c_config);
734        // initialize background flusher daemon
735        // Temporarily disable background flushers until blockcache contention
736        // issue is resolved.
737        bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
738        bgflusher_init(&bgf_config);
739
740        // Initialize breakpad
741        _dbg_handle_crashes(config->breakpad_minidump_dir);
742
743        fdb_initialized = 1;
744    }
745    spin_unlock(&initial_lock);
746
747    return FDB_RESULT_SUCCESS;
748}
749
750LIBFDB_API
751fdb_config fdb_get_default_config(void) {
752    return get_default_config();
753}
754
755LIBFDB_API
756fdb_kvs_config fdb_get_default_kvs_config(void) {
757    return get_default_kvs_config();
758}
759
760LIBFDB_API
761fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
762                    const char *filename,
763                    fdb_config *fconfig)
764{
765#ifdef _MEMPOOL
766    mempool_init();
767#endif
768
769    fdb_config config;
770    fdb_file_handle *fhandle;
771    fdb_kvs_handle *handle;
772    LATENCY_STAT_START();
773
774    if (fconfig) {
775        if (validate_fdb_config(fconfig)) {
776            config = *fconfig;
777        } else {
778            return FDB_RESULT_INVALID_CONFIG;
779        }
780    } else {
781        config = get_default_config();
782    }
783
784    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
785    if (!fhandle) { // LCOV_EXCL_START
786        return FDB_RESULT_ALLOC_FAIL;
787    } // LCOV_EXCL_STOP
788
789    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
790    if (!handle) { // LCOV_EXCL_START
791        free(fhandle);
792        return FDB_RESULT_ALLOC_FAIL;
793    } // LCOV_EXCL_STOP
794
795#ifndef SPIN_INITIALIZER
796    init_initial_lock_status();
797#endif
798
799    spin_lock(&initial_lock);
800    fdb_open_inprog++;
801    spin_unlock(&initial_lock);
802
803    atomic_init_uint8_t(&handle->handle_busy, 0);
804    handle->shandle = NULL;
805    handle->kvs_config = get_default_kvs_config();
806
807    fdb_status fs = fdb_init(fconfig);
808    if (fs != FDB_RESULT_SUCCESS) {
809        free(handle);
810        free(fhandle);
811        spin_lock(&initial_lock);
812        fdb_open_inprog--;
813        spin_unlock(&initial_lock);
814        return fs;
815    }
816    fdb_file_handle_init(fhandle, handle);
817
818    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
819    if (fs == FDB_RESULT_SUCCESS) {
820        *ptr_fhandle = fhandle;
821        filemgr_fhandle_add(handle->file, fhandle);
822        LATENCY_STAT_END(handle->file, FDB_LATENCY_OPEN);
823    } else {
824        *ptr_fhandle = NULL;
825        free(handle);
826        fdb_file_handle_free(fhandle);
827    }
828    spin_lock(&initial_lock);
829    fdb_open_inprog--;
830    spin_unlock(&initial_lock);
831    return fs;
832}
833
834LIBFDB_API
835fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
836                               const char *filename,
837                               fdb_config *fconfig,
838                               size_t num_functions,
839                               char **kvs_names,
840                               fdb_custom_cmp_variable *functions)
841{
842#ifdef _MEMPOOL
843    mempool_init();
844#endif
845
846    fdb_config config;
847    fdb_file_handle *fhandle;
848    fdb_kvs_handle *handle;
849
850    if (fconfig) {
851        if (validate_fdb_config(fconfig)) {
852            config = *fconfig;
853        } else {
854            return FDB_RESULT_INVALID_CONFIG;
855        }
856    } else {
857        config = get_default_config();
858    }
859
860    if (config.multi_kv_instances == false) {
861        // single KV instance mode does not support customized cmp function
862        return FDB_RESULT_INVALID_CONFIG;
863    }
864
865    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
866    if (!fhandle) { // LCOV_EXCL_START
867        return FDB_RESULT_ALLOC_FAIL;
868    } // LCOV_EXCL_STOP
869
870    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
871    if (!handle) { // LCOV_EXCL_START
872        free(fhandle);
873        return FDB_RESULT_ALLOC_FAIL;
874    } // LCOV_EXCL_STOP
875
876#ifndef SPIN_INITIALIZER
877    init_initial_lock_status();
878#endif
879
880    spin_lock(&initial_lock);
881    fdb_open_inprog++;
882    spin_unlock(&initial_lock);
883
884    atomic_init_uint8_t(&handle->handle_busy, 0);
885    handle->shandle = NULL;
886    handle->kvs_config = get_default_kvs_config();
887
888    fdb_status fs = fdb_init(fconfig);
889    if (fs != FDB_RESULT_SUCCESS) {
890        free(handle);
891        free(fhandle);
892        spin_lock(&initial_lock);
893        fdb_open_inprog--;
894        spin_unlock(&initial_lock);
895        return fs;
896    }
897    fdb_file_handle_init(fhandle, handle);
898
899    // insert kvs_names and functions into fhandle's list
900    fdb_file_handle_parse_cmp_func(fhandle, num_functions,
901                                   kvs_names, functions);
902
903    fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
904    if (fs == FDB_RESULT_SUCCESS) {
905        *ptr_fhandle = fhandle;
906        filemgr_fhandle_add(handle->file, fhandle);
907    } else {
908        *ptr_fhandle = NULL;
909        free(handle);
910        fdb_file_handle_free(fhandle);
911    }
912    spin_lock(&initial_lock);
913    fdb_open_inprog--;
914    spin_unlock(&initial_lock);
915    return fs;
916}
917
918fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
919                                  const char *filename,
920                                  fdb_config *fconfig,
921                                  struct list *cmp_func_list)
922{
923#ifdef _MEMPOOL
924    mempool_init();
925#endif
926
927    fdb_file_handle *fhandle;
928    fdb_kvs_handle *handle;
929
930    fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
931    if (!fhandle) { // LCOV_EXCL_START
932        return FDB_RESULT_ALLOC_FAIL;
933    } // LCOV_EXCL_STOP
934
935    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
936    if (!handle) { // LCOV_EXCL_START
937        free(fhandle);
938        return FDB_RESULT_ALLOC_FAIL;
939    } // LCOV_EXCL_STOP
940
941    atomic_init_uint8_t(&handle->handle_busy, 0);
942    handle->shandle = NULL;
943
944    fdb_file_handle_init(fhandle, handle);
945    if (cmp_func_list && list_begin(cmp_func_list)) {
946        fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
947    }
948    fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
949    if (fs == FDB_RESULT_SUCCESS) {
950        *ptr_fhandle = fhandle;
951        filemgr_fhandle_add(handle->file, fhandle);
952    } else {
953        *ptr_fhandle = NULL;
954        free(handle);
955        fdb_file_handle_free(fhandle);
956    }
957    return fs;
958}
959
960LIBFDB_API
961fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
962                             fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
963{
964#ifdef _MEMPOOL
965    mempool_init();
966#endif
967
968    if (!handle_in || !ptr_handle) {
969        return FDB_RESULT_INVALID_HANDLE;
970    }
971
972    fdb_config config = handle_in->config;
973    fdb_kvs_config kvs_config = handle_in->kvs_config;
974    fdb_kvs_id_t kv_id = 0;
975    fdb_kvs_handle *handle;
976    fdb_txn *txn = NULL;
977    fdb_status fs = FDB_RESULT_SUCCESS;
978    filemgr *file;
979    file_status_t fstatus = FILE_NORMAL;
980    struct snap_handle dummy_shandle;
981    struct _fdb_key_cmp_info cmp_info;
982    LATENCY_STAT_START();
983
984fdb_snapshot_open_start:
985    if (!handle_in->shandle) {
986        fdb_check_file_reopen(handle_in, &fstatus);
987        fdb_sync_db_header(handle_in);
988        file = handle_in->file;
989
990        if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
991            handle_in->seqnum = fdb_kvs_get_seqnum(file,
992                                                   handle_in->kvs->id);
993        } else {
994            handle_in->seqnum = filemgr_get_seqnum(file);
995        }
996    } else {
997        file = handle_in->file;
998    }
999
1000    // if the max sequence number seen by this handle is lower than the
1001    // requested snapshot marker, it means the snapshot is not yet visible
1002    // even via the current fdb_kvs_handle
1003    if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
1004        return FDB_RESULT_NO_DB_INSTANCE;
1005    }
1006
1007    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1008    if (!handle) { // LCOV_EXCL_START
1009        return FDB_RESULT_ALLOC_FAIL;
1010    } // LCOV_EXCL_STOP
1011
1012    atomic_init_uint8_t(&handle->handle_busy, 0);
1013    handle->log_callback = handle_in->log_callback;
1014    handle->max_seqnum = seqnum;
1015    handle->fhandle = handle_in->fhandle;
1016
1017    config.flags |= FDB_OPEN_FLAG_RDONLY;
1018    // do not perform compaction for snapshot
1019    config.compaction_mode = FDB_COMPACTION_MANUAL;
1020
1021    // If cloning an existing snapshot handle, then rewind indexes
1022    // to its last DB header and point its avl tree to existing snapshot's tree
1023    bool clone_snapshot = false;
1024    if (handle_in->shandle) {
1025        atomic_store_uint64_t(&handle->last_hdr_bid,  // do fast rewind
1026                              atomic_get_uint64_t(&handle_in->last_hdr_bid));
1027        fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1028        if (fs == FDB_RESULT_SUCCESS) {
1029            clone_snapshot = true;
1030            handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1031        } else {
1032            fdb_log(&handle_in->log_callback, fs,
1033                    "Warning: Snapshot clone at sequence number %" _F64
1034                    "does not match its snapshot handle %" _F64
1035                    "in file '%s'.", seqnum, handle_in->seqnum,
1036                    handle_in->file->filename);
1037            free(handle);
1038            return fs;
1039        }
1040    }
1041
1042    cmp_info.kvs_config = handle_in->kvs_config;
1043    cmp_info.kvs = handle_in->kvs;
1044
1045    if (!handle->shandle) {
1046        txn = handle_in->fhandle->root->txn;
1047        if (!txn) {
1048            txn = &handle_in->file->global_txn;
1049        }
1050        if (handle_in->kvs) {
1051            kv_id = handle_in->kvs->id;
1052        }
1053        if (seqnum == FDB_SNAPSHOT_INMEM) {
1054            memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1055            // tmp value to denote snapshot & not rollback to _fdb_open
1056            handle->shandle = &dummy_shandle; // dummy
1057        } else {
1058            fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1059                                       &handle->shandle);
1060        }
1061        if (fs != FDB_RESULT_SUCCESS) {
1062            free(handle);
1063            return fs;
1064        }
1065    }
1066
1067    if (handle_in->kvs) {
1068        // sub-handle in multi KV instance mode
1069        if (clone_snapshot) {
1070            fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1071        } else {
1072            fs = _fdb_kvs_open(handle_in->kvs->root,
1073                              &config, &kvs_config, file,
1074                              file->filename,
1075                              _fdb_kvs_get_name(handle_in, file),
1076                              handle);
1077        }
1078    } else {
1079        if (clone_snapshot) {
1080            fs = _fdb_clone_snapshot(handle_in, handle);
1081        } else {
1082            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1083        }
1084    }
1085
1086    if (fs == FDB_RESULT_SUCCESS) {
1087        if (seqnum == FDB_SNAPSHOT_INMEM &&
1088            !handle_in->shandle) {
1089            handle->max_seqnum = handle_in->seqnum;
1090
1091            // synchronize dirty root nodes if exist
1092            bid_t dirty_idtree_root = BLK_NOT_FOUND;
1093            bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1094            struct filemgr_dirty_update_node *dirty_update;
1095
1096            dirty_update = filemgr_dirty_update_get_latest(handle->file);
1097            btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1098
1099            if (dirty_update) {
1100                filemgr_dirty_update_get_root(handle->file, dirty_update,
1101                                       &dirty_idtree_root, &dirty_seqtree_root);
1102                _fdb_import_dirty_root(handle, dirty_idtree_root,
1103                                       dirty_seqtree_root);
1104                btreeblk_discard_blocks(handle->bhandle);
1105            }
1106            // Having synced the dirty root, make an in-memory WAL snapshot
1107            // TODO: Re-enable WAL sharing once ready...
1108#ifdef _MVCC_WAL_ENABLE
1109            fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1110                                   &cmp_info, &handle->shandle);
1111#else
1112            fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1113                                       &handle->shandle);
1114            if (fs == FDB_RESULT_SUCCESS) {
1115                fs = wal_copyto_snapshot(file, handle->shandle,
1116                                        (bool)handle_in->kvs);
1117            }
1118            (void)kv_id;
1119#endif // _MVCC_WAL_ENABLE
1120        } else if (clone_snapshot) {
1121            // Snapshot is created on the other snapshot handle
1122
1123            handle->max_seqnum = handle_in->seqnum;
1124
1125            if (seqnum == FDB_SNAPSHOT_INMEM) {
1126                // in-memory snapshot
1127                // Clone dirty root nodes from the source snapshot by incrementing
1128                // their ref counters
1129                handle->trie->root_bid = handle_in->trie->root_bid;
1130                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1131                    if (handle->kvs) {
1132                        handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1133                    } else {
1134                        handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1135                    }
1136                }
1137                btreeblk_discard_blocks(handle->bhandle);
1138
1139                // increase ref count for dirty update
1140                struct filemgr_dirty_update_node *dirty_update;
1141                dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1142                filemgr_dirty_update_inc_ref_count(dirty_update);
1143                btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1144            }
1145        }
1146        *ptr_handle = handle;
1147    } else {
1148        *ptr_handle = NULL;
1149        if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1150            wal_snapshot_close(handle->shandle, handle->file);
1151        }
1152        free(handle);
1153        // If compactor thread had finished compaction just before this routine
1154        // calls _fdb_open, then it is possible that the snapshot's DB header
1155        // is only present in the new_file. So we must retry the snapshot
1156        // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1157        if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1158            if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1159                goto fdb_snapshot_open_start;
1160            }
1161        }
1162    }
1163
1164    if (handle_in->shandle) {
1165        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_CLONE);
1166    } else if (seqnum == FDB_SNAPSHOT_INMEM) {
1167        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_INMEM);
1168    } else {
1169        LATENCY_STAT_END(file, FDB_LATENCY_SNAP_DUR);
1170    }
1171    return fs;
1172}
1173
1174static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1175
1176LIBFDB_API
1177fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1178{
1179#ifdef _MEMPOOL
1180    mempool_init();
1181#endif
1182
1183    fdb_config config;
1184    fdb_kvs_handle *handle_in, *handle;
1185    fdb_status fs;
1186    fdb_seqnum_t old_seqnum;
1187
1188    if (!handle_ptr) {
1189        return FDB_RESULT_INVALID_HANDLE;
1190    }
1191
1192    handle_in = *handle_ptr;
1193
1194    if (!handle_in) {
1195        return FDB_RESULT_INVALID_HANDLE;
1196    }
1197
1198    config = handle_in->config;
1199
1200    if (handle_in->kvs) {
1201        return fdb_kvs_rollback(handle_ptr, seqnum);
1202    }
1203
1204    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1205        return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1206                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1207                       handle_in->file->filename);
1208    }
1209
1210    if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1211        return FDB_RESULT_HANDLE_BUSY;
1212    }
1213
1214    filemgr_mutex_lock(handle_in->file);
1215    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1216    // All transactions should be closed before rollback
1217    if (wal_txn_exists(handle_in->file)) {
1218        filemgr_set_rollback(handle_in->file, 0);
1219        filemgr_mutex_unlock(handle_in->file);
1220        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1221        return FDB_RESULT_FAIL_BY_TRANSACTION;
1222    }
1223
1224    // If compaction is running, wait until it is aborted.
1225    // TODO: Find a better way of waiting for the compaction abortion.
1226    unsigned int sleep_time = 10000; // 10 ms.
1227    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1228    while (fstatus == FILE_COMPACT_OLD) {
1229        filemgr_mutex_unlock(handle_in->file);
1230        decaying_usleep(&sleep_time, 1000000);
1231        filemgr_mutex_lock(handle_in->file);
1232        fstatus = filemgr_get_file_status(handle_in->file);
1233    }
1234    if (fstatus == FILE_REMOVED_PENDING) {
1235        filemgr_mutex_unlock(handle_in->file);
1236        fdb_check_file_reopen(handle_in, NULL);
1237    } else {
1238        filemgr_mutex_unlock(handle_in->file);
1239    }
1240
1241    fdb_sync_db_header(handle_in);
1242
1243    // if the max sequence number seen by this handle is lower than the
1244    // requested snapshot marker, it means the snapshot is not yet visible
1245    // even via the current fdb_kvs_handle
1246    if (seqnum > handle_in->seqnum) {
1247        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1248        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1249        return FDB_RESULT_NO_DB_INSTANCE;
1250    }
1251
1252    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1253    if (!handle) { // LCOV_EXCL_START
1254        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1255        return FDB_RESULT_ALLOC_FAIL;
1256    } // LCOV_EXCL_STOP
1257
1258    atomic_init_uint8_t(&handle->handle_busy, 0);
1259    handle->log_callback = handle_in->log_callback;
1260    handle->fhandle = handle_in->fhandle;
1261    if (seqnum == 0) {
1262        fs = _fdb_reset(handle, handle_in);
1263    } else {
1264        handle->max_seqnum = seqnum;
1265        fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1266                       &config);
1267    }
1268
1269    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1270    if (fs == FDB_RESULT_SUCCESS) {
1271        // rollback the file's sequence number
1272        filemgr_mutex_lock(handle_in->file);
1273        old_seqnum = filemgr_get_seqnum(handle_in->file);
1274        filemgr_set_seqnum(handle_in->file, seqnum);
1275        filemgr_mutex_unlock(handle_in->file);
1276
1277        fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1278                !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1279        if (fs == FDB_RESULT_SUCCESS) {
1280            if (handle_in->txn) {
1281                handle->txn = handle_in->txn;
1282                handle_in->txn = NULL;
1283            }
1284            handle_in->fhandle->root = handle;
1285            _fdb_close_root(handle_in);
1286            handle->max_seqnum = 0;
1287            handle->seqnum = seqnum;
1288            *handle_ptr = handle;
1289        } else {
1290            // cancel the rolling-back of the sequence number
1291            filemgr_mutex_lock(handle_in->file);
1292            filemgr_set_seqnum(handle_in->file, old_seqnum);
1293            filemgr_mutex_unlock(handle_in->file);
1294            free(handle);
1295            atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1296        }
1297    } else {
1298        free(handle);
1299        atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1300    }
1301
1302    return fs;
1303}
1304
1305LIBFDB_API
1306fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1307                            fdb_snapshot_marker_t marker)
1308{
1309#ifdef _MEMPOOL
1310    mempool_init();
1311#endif
1312
1313    fdb_config config;
1314    fdb_kvs_handle *super_handle;
1315    fdb_kvs_handle rhandle;
1316    fdb_kvs_handle *handle = &rhandle;
1317    struct filemgr *file;
1318    fdb_kvs_config kvs_config;
1319    fdb_status fs;
1320    err_log_callback log_callback;
1321    struct kvs_info *kvs;
1322    struct snap_handle shandle; // dummy snap handle
1323
1324    if (!fhandle) {
1325        return FDB_RESULT_INVALID_HANDLE;
1326    }
1327
1328    super_handle = fhandle->root;
1329    kvs = super_handle->kvs;
1330
1331    // fdb_rollback_all cannot be allowed when there are kv store instances
1332    // still open, because we do not have means of invalidating open kv handles
1333    // which may not be present in the rollback point
1334    if (kvs && _fdb_kvs_is_busy(fhandle)) {
1335        return FDB_RESULT_KV_STORE_BUSY;
1336    }
1337    file = super_handle->file;
1338    config = super_handle->config;
1339    kvs_config = super_handle->kvs_config;
1340    log_callback = super_handle->log_callback;
1341
1342    if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1343        return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1344                       "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1345                       super_handle->file->filename);
1346    }
1347
1348    filemgr_mutex_lock(super_handle->file);
1349    filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1350    // All transactions should be closed before rollback
1351    if (wal_txn_exists(super_handle->file)) {
1352        filemgr_set_rollback(super_handle->file, 0);
1353        filemgr_mutex_unlock(super_handle->file);
1354        return FDB_RESULT_FAIL_BY_TRANSACTION;
1355    }
1356
1357    // If compaction is running, wait until it is aborted.
1358    // TODO: Find a better way of waiting for the compaction abortion.
1359    unsigned int sleep_time = 10000; // 10 ms.
1360    file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1361    while (fstatus == FILE_COMPACT_OLD) {
1362        filemgr_mutex_unlock(super_handle->file);
1363        decaying_usleep(&sleep_time, 1000000);
1364        filemgr_mutex_lock(super_handle->file);
1365        fstatus = filemgr_get_file_status(super_handle->file);
1366    }
1367    if (fstatus == FILE_REMOVED_PENDING) {
1368        filemgr_mutex_unlock(super_handle->file);
1369        fdb_check_file_reopen(super_handle, NULL);
1370    } else {
1371        filemgr_mutex_unlock(super_handle->file);
1372    }
1373
1374    fdb_sync_db_header(super_handle);
1375    // Shutdown WAL discarding entries from all KV Stores..
1376    fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1377    if (fs != FDB_RESULT_SUCCESS) {
1378        return fs;
1379    }
1380
1381    memset(handle, 0, sizeof(fdb_kvs_handle));
1382    memset(&shandle, 0, sizeof(struct snap_handle));
1383    handle->log_callback = log_callback;
1384    handle->fhandle = fhandle;
1385    // Fast rewind on open...
1386    atomic_store_uint64_t(&handle->last_hdr_bid, (bid_t)marker);
1387    handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1388    handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1389    if (kvs) {
1390        fdb_kvs_header_free(file); // KV header will be recreated below.
1391        handle->kvs = kvs; // re-use super_handle's kvs info
1392        handle->kvs_config = kvs_config;
1393    }
1394    handle->config = config;
1395
1396    fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1397
1398    if (handle->config.multi_kv_instances) {
1399        filemgr_mutex_lock(handle->file);
1400        fdb_kvs_header_create(handle->file);
1401        fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1402                            handle->kv_info_offset,
1403                            handle->file->version, false);
1404        filemgr_mutex_unlock(handle->file);
1405    }
1406
1407    filemgr_set_rollback(file, 0); // allow mutations
1408    handle->shandle = NULL; // just a dummy handle never allocated
1409
1410    if (fs == FDB_RESULT_SUCCESS) {
1411        fdb_seqnum_t old_seqnum;
1412        // Restore WAL for all KV instances...
1413        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1414
1415        // rollback the file's sequence number
1416        filemgr_mutex_lock(file);
1417        old_seqnum = filemgr_get_seqnum(file);
1418        filemgr_set_seqnum(file, handle->seqnum);
1419        filemgr_mutex_unlock(file);
1420
1421        fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1422                         !(handle->config.durability_opt & FDB_DRB_ASYNC));
1423        if (fs == FDB_RESULT_SUCCESS) {
1424            _fdb_close(super_handle);
1425            *super_handle = *handle;
1426        } else {
1427            filemgr_mutex_lock(file);
1428            filemgr_set_seqnum(file, old_seqnum);
1429            filemgr_mutex_unlock(file);
1430        }
1431    } else { // Rollback failed, restore KV header
1432        fdb_kvs_header_create(file);
1433        fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1434                            super_handle->kv_info_offset,
1435                            ver_get_latest_magic(),
1436                            false);
1437    }
1438
1439    return fs;
1440}
1441
1442static void _fdb_init_file_config(const fdb_config *config,
1443                                  struct filemgr_config *fconfig) {
1444    fconfig->blocksize = config->blocksize;
1445    fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1446    fconfig->chunksize = config->chunksize;
1447
1448    fconfig->options = 0x0;
1449    fconfig->seqtree_opt = config->seqtree_opt;
1450
1451    if (config->flags & FDB_OPEN_FLAG_CREATE) {
1452        fconfig->options |= FILEMGR_CREATE;
1453    }
1454    if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1455        fconfig->options |= FILEMGR_READONLY;
1456    }
1457    if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1458        fconfig->options |= FILEMGR_SYNC;
1459    }
1460
1461    fconfig->flag = 0x0;
1462    if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1463        config->buffercache_size) {
1464        fconfig->flag |= _ARCH_O_DIRECT;
1465    }
1466
1467    fconfig->prefetch_duration = config->prefetch_duration;
1468    fconfig->num_wal_shards = config->num_wal_partitions;
1469    fconfig->num_bcache_shards = config->num_bcache_partitions;
1470    fconfig->encryption_key = config->encryption_key;
1471    atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1472                          config->block_reusing_threshold,
1473                          std::memory_order_relaxed);
1474    atomic_store_uint64_t(&fconfig->num_keeping_headers,
1475                          config->num_keeping_headers,
1476                          std::memory_order_relaxed);
1477}
1478
1479fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1480                               fdb_kvs_handle *handle_out)
1481{
1482    fdb_status status;
1483
1484    handle_out->config = handle_in->config;
1485    handle_out->kvs_config = handle_in->kvs_config;
1486    handle_out->fileops = handle_in->fileops;
1487    handle_out->file = handle_in->file;
1488    // Note that the file ref count will be decremented when the cloned snapshot
1489    // is closed through filemgr_close().
1490    filemgr_incr_ref_count(handle_out->file);
1491
1492    if (handle_out->filename) {
1493        handle_out->filename = (char *)realloc(handle_out->filename,
1494                                               strlen(handle_in->filename)+1);
1495    } else {
1496        handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1497    }
1498    strcpy(handle_out->filename, handle_in->filename);
1499
1500    // initialize the docio handle.
1501    handle_out->dhandle = (struct docio_handle *)
1502        calloc(1, sizeof(struct docio_handle));
1503    handle_out->dhandle->log_callback = &handle_out->log_callback;
1504    docio_init(handle_out->dhandle, handle_out->file,
1505               handle_out->config.compress_document_body);
1506
1507    // initialize the btree block handle.
1508    handle_out->btreeblkops = btreeblk_get_ops();
1509    handle_out->bhandle = (struct btreeblk_handle *)
1510        calloc(1, sizeof(struct btreeblk_handle));
1511    handle_out->bhandle->log_callback = &handle_out->log_callback;
1512    btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1513
1514    handle_out->dirty_updates = handle_in->dirty_updates;
1515    atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1516    handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1517    handle_out->kv_info_offset = handle_in->kv_info_offset;
1518    handle_out->op_stats = handle_in->op_stats;
1519
1520    // initialize the trie handle
1521    handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1522    hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1523                handle_out->file->blocksize,
1524                handle_in->trie->root_bid, // Source snapshot's trie root bid
1525                (void *)handle_out->bhandle, handle_out->btreeblkops,
1526                (void *)handle_out->dhandle, _fdb_readkey_wrap);
1527    // set aux for cmp wrapping function
1528    hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1529    hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1530
1531    if (handle_out->kvs) {
1532        hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1533    }
1534
1535    handle_out->seqnum = handle_in->seqnum;
1536    if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1537        if (handle_out->config.multi_kv_instances) {
1538            // multi KV instance mode .. HB+trie
1539            handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1540            hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1541                        handle_out->file->blocksize,
1542                        handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1543                        (void *)handle_out->bhandle, handle_out->btreeblkops,
1544                        (void *)handle_out->dhandle, _fdb_readseq_wrap);
1545
1546        } else {
1547            // single KV instance mode .. normal B+tree
1548            struct btree_kv_ops *seq_kv_ops =
1549                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1550            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1551            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1552
1553            handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1554            // Init the seq tree using the root bid of the source snapshot.
1555            btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1556                                handle_out->btreeblkops, seq_kv_ops,
1557                                handle_out->config.blocksize,
1558                                handle_in->seqtree->root_bid);
1559        }
1560    } else{
1561        handle_out->seqtree = NULL;
1562    }
1563
1564    status = btreeblk_end(handle_out->bhandle);
1565    if (status != FDB_RESULT_SUCCESS) {
1566        const char *msg = "Snapshot clone operation fails due to the errors in "
1567            "btreeblk_end() in a database file '%s'\n";
1568        fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1569    }
1570
1571    return status;
1572}
1573
1574fdb_status _fdb_open(fdb_kvs_handle *handle,
1575                     const char *filename,
1576                     fdb_filename_mode_t filename_mode,
1577                     const fdb_config *config)
1578{
1579    struct filemgr_config fconfig;
1580    struct kvs_stat stat, empty_stat;
1581    bid_t trie_root_bid = BLK_NOT_FOUND;
1582    bid_t seq_root_bid = BLK_NOT_FOUND;
1583    bid_t stale_root_bid = BLK_NOT_FOUND;
1584    fdb_seqnum_t seqnum = 0;
1585    filemgr_header_revnum_t header_revnum = 0;
1586    filemgr_header_revnum_t latest_header_revnum = 0;
1587    fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1588    uint64_t ndocs = 0;
1589    uint64_t ndeletes = 0;
1590    uint64_t datasize = 0;
1591    uint64_t deltasize = 0;
1592    uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1593    uint64_t kv_info_offset = BLK_NOT_FOUND;
1594    uint64_t version;
1595    uint64_t header_flags = 0;
1596    uint8_t header_buf[FDB_BLOCKSIZE];
1597    char *compacted_filename = NULL;
1598    char *prev_filename = NULL;
1599    size_t header_len = 0;
1600    bool multi_kv_instances = config->multi_kv_instances;
1601
1602    uint64_t nlivenodes = 0;
1603    bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1604    char actual_filename[FDB_MAX_FILENAME_LEN];
1605    char virtual_filename[FDB_MAX_FILENAME_LEN];
1606    char *target_filename = NULL;
1607    fdb_status status;
1608
1609    if (filename == NULL) {
1610        return FDB_RESULT_INVALID_ARGS;
1611    }
1612    if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1613        // filename (including path) length is supported up to
1614        // (FDB_MAX_FILENAME_LEN - 8) bytes.
1615        return FDB_RESULT_TOO_LONG_FILENAME;
1616    }
1617
1618    if (filename_mode == FDB_VFILENAME &&
1619        !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1620        return FDB_RESULT_INVALID_COMPACTION_MODE;
1621    }
1622
1623    _fdb_init_file_config(config, &fconfig);
1624
1625    if (filename_mode == FDB_VFILENAME) {
1626        compactor_get_actual_filename(filename, actual_filename,
1627                                      config->compaction_mode, &handle->log_callback);
1628    } else {
1629        strcpy(actual_filename, filename);
1630    }
1631
1632    if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1633         (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1634          filename_mode == FDB_VFILENAME) ) {
1635        // 1) manual compaction mode, OR
1636        // 2) auto compaction mode + 'filename' is virtual filename
1637        // -> copy 'filename'
1638        target_filename = (char *)filename;
1639    } else {
1640        // otherwise (auto compaction mode + 'filename' is actual filename)
1641        // -> copy 'virtual_filename'
1642        compactor_get_virtual_filename(filename, virtual_filename);
1643        target_filename = virtual_filename;
1644    }
1645
1646    // If the user is requesting legacy CRC pass that down to filemgr
1647    if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1648        fconfig.options |= FILEMGR_CREATE_CRC32;
1649    }
1650
1651    handle->fileops = get_filemgr_ops();
1652    filemgr_open_result result = filemgr_open((char *)actual_filename,
1653                                              handle->fileops,
1654                                              &fconfig, &handle->log_callback);
1655    if (result.rv != FDB_RESULT_SUCCESS) {
1656        return (fdb_status) result.rv;
1657    }
1658    handle->file = result.file;
1659
1660    if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1661        strcmp(filename, actual_filename)) {
1662        // It is in-place compacted file if
1663        // 1) compaction mode is manual, and
1664        // 2) actual filename is different to the filename given by user.
1665        // In this case, set the in-place compaction flag.
1666        filemgr_set_in_place_compaction(handle->file, true);
1667    }
1668    if (filemgr_is_in_place_compaction_set(handle->file)) {
1669        // This file was in-place compacted.
1670        // set 'handle->filename' to the original filename to trigger file renaming
1671        compactor_get_virtual_filename(filename, virtual_filename);
1672        target_filename = virtual_filename;
1673    }
1674
1675    if (handle->filename) {
1676        handle->filename = (char *)realloc(handle->filename,
1677                                           strlen(target_filename)+1);
1678    } else {
1679        handle->filename = (char*)malloc(strlen(target_filename)+1);
1680    }
1681    strcpy(handle->filename, target_filename);
1682
1683    // If cloning from a snapshot handle, fdb_snapshot_open would have already
1684    // set handle->last_hdr_bid to the block id of required header, so rewind..
1685    bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
1686    if (handle->shandle && last_hdr_bid) {
1687        status = filemgr_fetch_header(handle->file, last_hdr_bid,
1688                                      header_buf, &header_len, &seqnum,
1689                                      &latest_header_revnum, &deltasize, &version,
1690                                      NULL, &handle->log_callback);
1691        if (status != FDB_RESULT_SUCCESS) {
1692            free(handle->filename);
1693            handle->filename = NULL;
1694            filemgr_close(handle->file, false, handle->filename,
1695                              &handle->log_callback);
1696            return status;
1697        }
1698    } else { // Normal open
1699        filemgr_get_header(handle->file, header_buf, &header_len,
1700                           &last_hdr_bid, &seqnum, &latest_header_revnum);
1701        atomic_store_uint64_t(&handle->last_hdr_bid, last_hdr_bid);
1702        version = handle->file->version;
1703    }
1704
1705    // initialize the docio handle so kv headers may be read
1706    handle->dhandle = (struct docio_handle *)
1707                      calloc(1, sizeof(struct docio_handle));
1708    handle->dhandle->log_callback = &handle->log_callback;
1709    docio_init(handle->dhandle, handle->file, config->compress_document_body);
1710
1711    // fetch previous superblock bitmap info if exists
1712    // (this should be done after 'handle->dhandle' is initialized)
1713    if (handle->file->sb) {
1714        status = sb_bmp_fetch_doc(handle);
1715        if (status != FDB_RESULT_SUCCESS) {
1716            docio_free(handle->dhandle);
1717            free(handle->dhandle);
1718            free(handle->filename);
1719            handle->filename = NULL;
1720            filemgr_close(handle->file, false, handle->filename,
1721                              &handle->log_callback);
1722            return status;
1723        }
1724    }
1725
1726
1727    if (header_len > 0) {
1728        fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1729                         &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1730                         &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1731                         &header_flags, &compacted_filename, &prev_filename);
1732        // use existing setting for seqtree_opt
1733        if (header_flags & FDB_FLAG_SEQTREE_USE) {
1734            seqtree_opt = FDB_SEQTREE_USE;
1735        } else {
1736            seqtree_opt = FDB_SEQTREE_NOT_USE;
1737        }
1738        // Retrieve seqnum for multi-kv mode
1739        if (handle->kvs && handle->kvs->id > 0) {
1740            if (kv_info_offset != BLK_NOT_FOUND) {
1741                if (!filemgr_get_kv_header(handle->file)) {
1742                    struct kvs_header *kv_header;
1743                    _fdb_kvs_header_create(&kv_header);
1744                    // KV header already exists but not loaded .. read & import
1745                    fdb_kvs_header_read(kv_header, handle->dhandle,
1746                                        kv_info_offset, version, false);
1747                    if (!filemgr_set_kv_header(handle->file, kv_header,
1748                                               fdb_kvs_header_free)) {
1749                        _fdb_kvs_header_free(kv_header);
1750                    }
1751                }
1752                seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1753                                             handle->kvs->id);
1754            } else { // no kv_info offset, ok to set seqnum to zero
1755                seqnum = 0;
1756            }
1757        }
1758        // other flags
1759        if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1760            handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1761        }
1762        if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1763            handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1764        }
1765        // use existing setting for multi KV instance mode
1766        if (kv_info_offset == BLK_NOT_FOUND) {
1767            multi_kv_instances = false;
1768        } else {
1769            multi_kv_instances = true;
1770        }
1771    }
1772
1773    handle->config = *config;
1774    handle->config.seqtree_opt = seqtree_opt;
1775    handle->config.multi_kv_instances = multi_kv_instances;
1776
1777    if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1778        // Either an in-memory snapshot or cloning from an existing snapshot..
1779        hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1780                     // *_open() should have already restored it
1781    } else { // Persisted snapshot or file rollback..
1782
1783        // get the BID of the latest block
1784        // (it is OK if the block is not a DB header)
1785        bool dirty_data_exists = false;
1786        struct superblock *sb = handle->file->sb;
1787
1788        if (sb_bmp_exists(sb)) {
1789            dirty_data_exists = false;
1790            bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1791            if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1792                // add 1 since we subtract 1 from 'hdr_bid' below soon
1793                hdr_bid = sb_last_hdr_bid + 1;
1794                if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1795                    // seq number has been increased since the last commit
1796                    seqnum = fdb_kvs_get_committed_seqnum(handle);
1797                }
1798            } else {
1799                hdr_bid = BLK_NOT_FOUND;
1800            }
1801        } else {
1802            hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1803            dirty_data_exists = (hdr_bid >
1804                        atomic_get_uint64_t(&handle->last_hdr_bid));
1805        }
1806
1807        if (hdr_bid == BLK_NOT_FOUND ||
1808            (sb && hdr_bid <= sb->config->num_sb)) {
1809            hdr_bid = 0;
1810        } else if (hdr_bid > 0) {
1811            --hdr_bid;
1812        }
1813
1814        if (handle->max_seqnum) {
1815            struct kvs_stat stat_ori;
1816            // backup original stats
1817            if (handle->kvs) {
1818                _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1819            } else {
1820                _kvs_stat_get(handle->file, 0, &stat_ori);
1821            }
1822
1823            if (dirty_data_exists){
1824                // uncommitted data exists beyond the last DB header
1825                // get the last committed seq number
1826                fdb_seqnum_t seq_commit;
1827                seq_commit = fdb_kvs_get_committed_seqnum(handle);
1828                if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1829                    // In case, snapshot_open is attempted with latest uncommitted
1830                    // sequence number
1831                    header_len = 0;
1832                } else if (seq_commit == handle->max_seqnum) {
1833                    // snapshot/rollback on the latest commit header
1834                    seqnum = seq_commit; // skip file reverse scan
1835                }
1836                hdr_bid = filemgr_get_header_bid(handle->file);
1837            }
1838            // Reverse scan the file to locate the DB header with seqnum marker
1839            header_revnum = latest_header_revnum;
1840            while (header_len && seqnum != handle->max_seqnum) {
1841                hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1842                                          header_buf, &header_len, &seqnum,
1843                                          &header_revnum, NULL, &version, NULL,
1844                                          &handle->log_callback);
1845                if (header_len == 0) {
1846                    continue; // header doesn't exist
1847                }
1848                fdb_fetch_header(version, header_buf, &trie_root_bid,
1849                                 &seq_root_bid, &stale_root_bid,
1850                                 &ndocs, &ndeletes, &nlivenodes,
1851                                 &datasize, &last_wal_flush_hdr_bid,
1852                                 &kv_info_offset, &header_flags,
1853                                 &compacted_filename, NULL);
1854                atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
1855
1856                if (!handle->kvs || handle->kvs->id == 0) {
1857                    // single KVS mode OR default KVS
1858                    if (!handle->shandle) {
1859                        // rollback
1860                        struct kvs_stat stat_dst;
1861                        _kvs_stat_get(handle->file, 0, &stat_dst);
1862                        stat_dst.ndocs = ndocs;
1863                        stat_dst.ndeletes = ndeletes;
1864                        stat_dst.datasize = datasize;
1865                        stat_dst.nlivenodes = nlivenodes;
1866                        stat_dst.deltasize = deltasize;
1867                        _kvs_stat_set(handle->file, 0, stat_dst);
1868                    }
1869                    continue;
1870                }
1871
1872                int64_t doc_offset;
1873                struct kvs_header *kv_header;
1874                struct docio_object doc;
1875
1876                _fdb_kvs_header_create(&kv_header);
1877                memset(&doc, 0, sizeof(struct docio_object));
1878                doc_offset = docio_read_doc(handle->dhandle,
1879                                            kv_info_offset, &doc, true);
1880
1881                if (doc_offset <= 0) {
1882                    header_len = 0; // fail
1883                    _fdb_kvs_header_free(kv_header);
1884                } else {
1885                    _fdb_kvs_header_import(kv_header, doc.body,
1886                                           doc.length.bodylen, version, false);
1887                    // get local sequence number for the KV instance
1888                    seqnum = _fdb_kvs_get_seqnum(kv_header,
1889                                                 handle->kvs->id);
1890                    if (!handle->shandle) {
1891                        // rollback: replace kv_header stats
1892                        // read from the current header's kv_header
1893                        struct kvs_stat stat_src, stat_dst;
1894                        _kvs_stat_get_kv_header(kv_header,
1895                                                handle->kvs->id,
1896                                                &stat_src);
1897                        _kvs_stat_get(handle->file,
1898                                      handle->kvs->id,
1899                                      &stat_dst);
1900                        // update ndocs, datasize, nlivenodes
1901                        // into the current file's kv_header
1902                        // Note: stats related to WAL should not be updated
1903                        //       at this time. They will be adjusted through
1904                        //       discard & restore routines below.
1905                        stat_dst.ndocs = stat_src.ndocs;
1906                        stat_dst.datasize = stat_src.datasize;
1907                        stat_dst.nlivenodes = stat_src.nlivenodes;
1908                        _kvs_stat_set(handle->file,
1909                                      handle->kvs->id,
1910                                      stat_dst);
1911                    }
1912                    _fdb_kvs_header_free(kv_header);
1913                    free_docio_object(&doc, 1, 1, 1);
1914                }
1915            }
1916
1917            if (header_len && // header exists
1918                config->block_reusing_threshold > 0 && // block reuse is enabled
1919                config->block_reusing_threshold < 100 &&
1920                header_revnum < sb_get_min_live_revnum(handle->file)) {
1921                // cannot perform rollback/snapshot beyond the last live header
1922                header_len = 0;
1923            }
1924
1925            if (!header_len) { // Marker MUST match that of DB commit!
1926                // rollback original stats
1927                if (handle->kvs) {
1928                    _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1929                } else {
1930                    _kvs_stat_get(handle->file, 0, &stat_ori);
1931                }
1932
1933                docio_free(handle->dhandle);
1934                free(handle->dhandle);
1935                free(handle->filename);
1936                free(prev_filename);
1937                handle->filename = NULL;
1938                filemgr_close(handle->file, false, handle->filename,
1939                              &handle->log_callback);
1940                return FDB_RESULT_NO_DB_INSTANCE;
1941            }
1942
1943            if (!handle->shandle) { // Rollback mode, destroy file WAL..
1944                if (handle->config.multi_kv_instances) {
1945                    // multi KV instance mode
1946                    // clear only WAL items belonging to the instance
1947                    wal_close_kv_ins(handle->file,
1948                                     (handle->kvs)?(handle->kvs->id):(0),
1949                                     &handle->log_callback);
1950                } else {
1951                    wal_shutdown(handle->file, &handle->log_callback);
1952                }
1953            }
1954        } else { // snapshot to sequence number 0 requested..
1955            if (handle->shandle) { // fdb_snapshot_open API call
1956                if (seqnum) {
1957                    // Database currently has a non-zero seq number,
1958                    // but the snapshot was requested with a seq number zero.
1959                    docio_free(handle->dhandle);
1960                    free(handle->dhandle);
1961                    free(handle->filename);
1962                    free(prev_filename);
1963                    handle->filename = NULL;
1964                    filemgr_close(handle->file, false, handle->filename,
1965                                  &handle->log_callback);
1966                    return FDB_RESULT_NO_DB_INSTANCE;
1967                }
1968            } // end of zero max_seqnum but non-rollback check
1969        } // end of zero max_seqnum check
1970    } // end of durable snapshot locating
1971
1972    handle->btreeblkops = btreeblk_get_ops();
1973    handle->bhandle = (struct btreeblk_handle *)
1974                      calloc(1, sizeof(struct btreeblk_handle));
1975    handle->bhandle->log_callback = &handle->log_callback;
1976
1977    handle->dirty_updates = 0;
1978
1979    if (handle->config.compaction_buf_maxsize == 0) {
1980        handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1981    }
1982
1983    btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1984
1985    handle->cur_header_revnum = latest_header_revnum;
1986    if (header_revnum) {
1987        if (filemgr_is_rollback_on(handle->file)) {
1988            // rollback mode
1989            // set rollback header revnum
1990            handle->rollback_revnum = header_revnum;
1991        } else {
1992            // snapshot mode (only for snapshot)
1993            handle->cur_header_revnum = header_revnum;
1994        }
1995    }
1996    handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1997
1998    memset(&empty_stat, 0x0, sizeof(empty_stat));
1999    _kvs_stat_get(handle->file, 0, &stat);
2000    if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
2001        // sync (default) KVS stat with DB header
2002        stat.nlivenodes = nlivenodes;
2003        stat.ndocs = ndocs;
2004        stat.datasize = datasize;
2005        _kvs_stat_set(handle->file, 0, stat);
2006    }
2007
2008    handle->kv_info_offset = kv_info_offset;
2009    if (handle->config.multi_kv_instances && !handle->shandle) {
2010        // multi KV instance mode
2011        filemgr_mutex_lock(handle->file);
2012        if (kv_info_offset == BLK_NOT_FOUND) {
2013            // there is no KV header .. create & initialize
2014            fdb_kvs_header_create(handle->file);
2015            // TODO: If another handle is opened before the first header is appended,
2016            // an unnecessary KV info doc is appended. We need to address it.
2017            kv_info_offset = fdb_kvs_header_append(handle);
2018        } else if (handle->file->kv_header == NULL) {
2019            // KV header already exists but not loaded .. read & import
2020            fdb_kvs_header_create(handle->file);
2021            fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
2022                                kv_info_offset, version, false);
2023        }
2024        filemgr_mutex_unlock(handle->file);
2025
2026        // validation check for key order of all KV stores
2027        if (handle == handle->fhandle->root) {
2028            fdb_status fs = fdb_kvs_cmp_check(handle);
2029            if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
2030                docio_free(handle->dhandle);
2031                free(handle->dhandle);
2032                btreeblk_free(handle->bhandle);
2033                free(handle->bhandle);
2034                free(handle->filename);
2035                handle->filename = NULL;
2036                filemgr_close(handle->file, false, handle->filename,
2037                              &handle->log_callback);
2038                return fs;
2039            }
2040        }
2041    }
2042    handle->kv_info_offset = kv_info_offset;
2043
2044    if (handle->kv_info_offset != BLK_NOT_FOUND &&
2045        handle->kvs == NULL) {
2046        // multi KV instance mode .. turn on config flag
2047        handle->config.multi_kv_instances = true;
2048        // only super handle can be opened using fdb_open(...)
2049        fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2050    }
2051
2052    if (handle->shandle) { // Populate snapshot stats..
2053        if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2054            memset(&handle->shandle->stat, 0x0,
2055                    sizeof(handle->shandle->stat));
2056            handle->shandle->stat.ndocs = ndocs;
2057            handle->shandle->stat.datasize = datasize;
2058            handle->shandle->stat.nlivenodes = nlivenodes;
2059        } else { // Multi KV instance mode, populate specific kv stats
2060            memset(&handle->shandle->stat, 0x0,
2061                    sizeof(handle->shandle->stat));
2062            _kvs_stat_get(handle->file, handle->kvs->id,
2063                    &handle->shandle->stat);
2064            // Since wal is restored below, we have to reset
2065            // wal stats to zero.
2066            handle->shandle->stat.wal_ndeletes = 0;
2067            handle->shandle->stat.wal_ndocs = 0;
2068        }
2069    }
2070
2071    // initialize pointer to the global operational stats of this KV store
2072    handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2073    if (!handle->op_stats) {
2074        const char *msg = "Database open fails due to the error in retrieving "
2075            "the global operational stats of KV store in a database file '%s'\n";
2076        fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2077                handle->file->filename);
2078        return FDB_RESULT_OPEN_FAIL;
2079    }
2080
2081    handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2082    hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2083                handle->file->blocksize, trie_root_bid,
2084                (void *)handle->bhandle, handle->btreeblkops,
2085                (void *)handle->dhandle, _fdb_readkey_wrap);
2086    // set aux for cmp wrapping function
2087    hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2088    hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2089
2090    if (handle->kvs) {
2091        hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2092    }
2093
2094    handle->seqnum = seqnum;
2095    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2096        if (handle->config.multi_kv_instances) {
2097            // multi KV instance mode .. HB+trie
2098            handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2099            hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2100                        handle->file->blocksize, seq_root_bid,
2101                        (void *)handle->bhandle, handle->btreeblkops,
2102                        (void *)handle->dhandle, _fdb_readseq_wrap);
2103
2104        } else {
2105            // single KV instance mode .. normal B+tree
2106            struct btree_kv_ops *seq_kv_ops =
2107                (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2108            seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2109            seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2110
2111            handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2112            if (seq_root_bid == BLK_NOT_FOUND) {
2113                btree_init(handle->seqtree, (void *)handle->bhandle,
2114                           handle->btreeblkops, seq_kv_ops,
2115                           handle->config.blocksize, sizeof(fdb_seqnum_t),
2116                           OFFSET_SIZE, 0x0, NULL);
2117            }else{
2118                btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2119                                    handle->btreeblkops, seq_kv_ops,
2120                                    handle->config.blocksize, seq_root_bid);
2121            }
2122        }
2123    }else{
2124        handle->seqtree = NULL;
2125    }
2126
2127    // Stale-block tree (supported since MAGIC_002)
2128    // this tree is independent to multi/single KVS mode option
2129    if (ver_staletree_support(handle->file->version)) {
2130        // normal B+tree
2131        struct btree_kv_ops *stale_kv_ops =
2132            (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2133        stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2134        stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2135
2136        handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2137        if (stale_root_bid == BLK_NOT_FOUND) {
2138            btree_init(handle->staletree, (void *)handle->bhandle,
2139                       handle->btreeblkops, stale_kv_ops,
2140                       handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2141                       OFFSET_SIZE, 0x0, NULL);
2142         }else{
2143            btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2144                                handle->btreeblkops, stale_kv_ops,
2145                                handle->config.blocksize, stale_root_bid);
2146            // prefetch stale info into memory
2147            fdb_load_inmem_stale_info(handle);
2148         }
2149    } else {
2150        handle->staletree = NULL;
2151    }
2152
2153    if (handle->config.multi_kv_instances && handle->max_seqnum) {
2154        // restore only docs belonging to the KV instance
2155        // handle->kvs should not be NULL
2156        _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2157                         hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2158    } else {
2159        // normal restore
2160        _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2161    }
2162
2163    if (compacted_filename &&
2164        filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2165        !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2166        _fdb_recover_compaction(handle, compacted_filename);
2167    }
2168
2169    if (prev_filename) {
2170        if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2171            // record the old filename into the file handle of current file
2172            // and REMOVE old file on the first open
2173            // WARNING: snapshots must have been opened before this call
2174            if (filemgr_update_file_status(handle->file,
2175                                           filemgr_get_file_status(handle->file),
2176                                           prev_filename)) {
2177                // Open the old file with read-only mode.
2178                // (Temporarily disable log callback at this time since
2179                //  the old file might be already removed.)
2180                err_log_callback dummy_cb;
2181                dummy_cb.callback = fdb_dummy_log_callback;
2182                dummy_cb.ctx_data = NULL;
2183                fconfig.options = FILEMGR_READONLY;
2184                filemgr_open_result result = filemgr_open(prev_filename,
2185                                                          handle->fileops,
2186                                                          &fconfig,
2187                                                          &dummy_cb);
2188                if (result.file) {
2189                    filemgr_remove_pending(result.file, handle->file,
2190                                           &handle->log_callback);
2191                    filemgr_close(result.file, 0, handle->filename,
2192                                  &handle->log_callback);
2193                }
2194            } else {
2195                free(prev_filename);
2196            }
2197        } else {
2198            free(prev_filename);
2199        }
2200    }
2201
2202    status = btreeblk_end(handle->bhandle);
2203    if (status != FDB_RESULT_SUCCESS) {
2204        // When fdb_kvs_open() is being issued in parallel with fdb_open()
2205        // it is possible that this call (fdb_open()) hits a write failure
2206        // because the btreeblock to be written was already made immutable
2207        // by the commit from the fdb_kvs_open(). Simpy ignore this error case.
2208        if (status == FDB_RESULT_WRITE_FAIL) {
2209            if (filemgr_get_header_revnum(handle->file)
2210                                             == latest_header_revnum) {
2211                return status;
2212            } else {
2213                status = FDB_RESULT_SUCCESS;
2214            }
2215        } else {
2216            return status;
2217        }
2218    }
2219
2220    // do not register read-only handles
2221    if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2222        if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2223            status = compactor_register_file(handle->file,
2224                                             (fdb_config *)config,
2225                                             &handle->log_callback);
2226        }
2227        if (status == FDB_RESULT_SUCCESS) {
2228            status = bgflusher_register_file(handle->file,
2229                                             (fdb_config *)config,
2230                                             &handle->log_callback);
2231        }
2232    }
2233
2234    return status;
2235}
2236
2237LIBFDB_API
2238fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2239                                fdb_log_callback log_callback,
2240                                void *ctx_data)
2241{
2242    if (!handle) {
2243        return FDB_RESULT_INVALID_HANDLE;
2244    }
2245
2246    handle->log_callback.callback = log_callback;
2247    handle->log_callback.ctx_data = ctx_data;
2248    return FDB_RESULT_SUCCESS;
2249}
2250
2251LIBFDB_API
2252void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
2253{
2254    fatal_error_callback = err_callback;
2255}
2256
2257LIBFDB_API
2258fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2259                          const void *meta, size_t metalen,
2260                          const void *body, size_t bodylen)
2261{
2262    if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2263        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2264        return FDB_RESULT_INVALID_ARGS;
2265    }
2266
2267    *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2268    if (*doc == NULL) { // LCOV_EXCL_START
2269        return FDB_RESULT_ALLOC_FAIL;
2270    } // LCOV_EXCL_STOP
2271
2272    (*doc)->seqnum = SEQNUM_NOT_USED;
2273
2274    if (key && keylen > 0) {
2275        (*doc)->key = (void *)malloc(keylen);
2276        if ((*doc)->key == NULL) { // LCOV_EXCL_START
2277            return FDB_RESULT_ALLOC_FAIL;
2278        } // LCOV_EXCL_STOP
2279        memcpy((*doc)->key, key, keylen);
2280        (*doc)->keylen = keylen;
2281    } else {
2282        (*doc)->key = NULL;
2283        (*doc)->keylen = 0;
2284    }
2285
2286    if (meta && metalen > 0) {
2287        (*doc)->meta = (void *)malloc(metalen);
2288        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2289            return FDB_RESULT_ALLOC_FAIL;
2290        } // LCOV_EXCL_STOP
2291        memcpy((*doc)->meta, meta, metalen);
2292        (*doc)->metalen = metalen;
2293    } else {
2294        (*doc)->meta = NULL;
2295        (*doc)->metalen = 0;
2296    }
2297
2298    if (body && bodylen > 0) {
2299        (*doc)->body = (void *)malloc(bodylen);
2300        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2301            return FDB_RESULT_ALLOC_FAIL;
2302        } // LCOV_EXCL_STOP
2303        memcpy((*doc)->body, body, bodylen);
2304        (*doc)->bodylen = bodylen;
2305    } else {
2306        (*doc)->body = NULL;
2307        (*doc)->bodylen = 0;
2308    }
2309
2310    return FDB_RESULT_SUCCESS;
2311}
2312
2313LIBFDB_API
2314fdb_status fdb_doc_update(fdb_doc **doc,
2315                          const void *meta, size_t metalen,
2316                          const void *body, size_t bodylen)
2317{
2318    if (doc == NULL ||
2319        metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2320        return FDB_RESULT_INVALID_ARGS;
2321    }
2322    if (*doc == NULL) {
2323        return FDB_RESULT_INVALID_ARGS;
2324    }
2325
2326    if (meta && metalen > 0) {
2327        // free previous metadata
2328        free((*doc)->meta);
2329        // allocate new metadata
2330        (*doc)->meta = (void *)malloc(metalen);
2331        if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2332            return FDB_RESULT_ALLOC_FAIL;
2333        } // LCOV_EXCL_STOP
2334        memcpy((*doc)->meta, meta, metalen);
2335        (*doc)->metalen = metalen;
2336    }
2337
2338    if (body && bodylen > 0) {
2339        // free previous body
2340        free((*doc)->body);
2341        // allocate new body
2342        (*doc)->body = (void *)malloc(bodylen);
2343        if ((*doc)->body == NULL) { // LCOV_EXCL_START
2344            return FDB_RESULT_ALLOC_FAIL;
2345        } // LCOV_EXCL_STOP
2346        memcpy((*doc)->body, body, bodylen);
2347        (*doc)->bodylen = bodylen;
2348    }
2349
2350    (*doc)->seqnum = SEQNUM_NOT_USED;
2351    return FDB_RESULT_SUCCESS;
2352}
2353
2354LIBFDB_API
2355void fdb_doc_set_seqnum(fdb_doc *doc,
2356                        const fdb_seqnum_t seqnum)
2357{
2358    if (doc) {
2359        doc->seqnum = seqnum;
2360        if (seqnum != SEQNUM_NOT_USED) {
2361            doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2362        } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2363            doc->flags &= ~FDB_CUSTOM_SEQNUM;
2364        }
2365    }
2366}
2367
2368// doc MUST BE allocated by malloc
2369LIBFDB_API
2370fdb_status fdb_doc_free(fdb_doc *doc)
2371{
2372    if (doc) {
2373        free(doc->key);
2374        free(doc->meta);
2375        free(doc->body);
2376        free(doc);
2377    }
2378    return FDB_RESULT_SUCCESS;
2379}
2380
2381INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
2382                                        struct wal_item *item)
2383{
2384    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2385    uint64_t old_offset = 0;
2386
2387    if (item->action == WAL_ACT_REMOVE) {
2388        // For immediate remove, old_offset value is critical
2389        // so that we should get an exact value.
2390        hbtrie_find(handle->trie,
2391                    item->header->key,
2392                    item->header->keylen,
2393                    (void*)&old_offset);
2394    } else {
2395        hbtrie_find_offset(handle->trie,
2396                           item->header->key,
2397                           item->header->keylen,
2398                           (void*)&old_offset);
2399    }
2400    btreeblk_end(handle->bhandle);
2401    old_offset = _endian_decode(old_offset);
2402
2403    return old_offset;
2404}
2405
2406// A stale sequence number entry that can be purged from the sequence tree
2407// during the WAL flush.
2408struct wal_stale_seq_entry {
2409    fdb_kvs_id_t kv_id;
2410    fdb_seqnum_t seqnum;
2411    struct avl_node avl_entry;
2412};
2413
2414// Delta changes in KV store stats during the WAL flush
2415struct wal_kvs_delta_stat {
2416    fdb_kvs_id_t kv_id;
2417    int64_t nlivenodes;
2418    int64_t ndocs;
2419    int64_t ndeletes;
2420    int64_t datasize;
2421    int64_t deltasize;
2422    struct avl_node avl_entry;
2423};
2424
2425INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2426{
2427    (void) aux;
2428    struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2429                                                    avl_entry);
2430    struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2431                                                    avl_entry);
2432    if (entry1->kv_id < entry2->kv_id) {
2433        return -1;
2434    } else if (entry1->kv_id > entry2->kv_id) {
2435        return 1;
2436    } else {
2437        return _CMP_U64(entry1->seqnum, entry2->seqnum);
2438    }
2439}
2440
2441
2442// Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
2443INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2444{
2445    (void) aux;
2446    struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2447                                                  avl_entry);
2448    struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2449                                                  avl_entry);
2450    if (stat1->kv_id < stat2->kv_id) {
2451        return -1;
2452    } else if (stat1->kv_id > stat2->kv_id) {
2453        return 1;
2454    } else {
2455        return 0;
2456    }
2457}
2458
2459INLINE void _fdb_wal_flush_seq_purge(void *dbhandle,
2460                                     struct avl_tree *stale_seqnum_list,
2461                                     struct avl_tree *kvs_delta_stats)
2462{
2463    fdb_seqnum_t _seqnum;
2464    int64_t nlivenodes;
2465    int64_t ndeltanodes;
2466    int64_t delta;
2467    uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2468    struct wal_stale_seq_entry *seq_entry;
2469    struct wal_kvs_delta_stat *delta_stat;
2470    struct wal_kvs_delta_stat kvs_delta_query;
2471
2472    fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2473    struct avl_node *node = avl_first(stale_seqnum_list);
2474    while (node) {
2475        seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2476        node = avl_next(node);
2477        nlivenodes = handle->bhandle->nlivenodes;
2478        ndeltanodes = handle->bhandle->ndeltanodes;
2479        _seqnum = _endian_encode(seq_entry->seqnum);
2480        if (handle->kvs) {
2481            // multi KV instance mode .. HB+trie
2482            kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2483            memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2484            hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2485                          sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t));
2486        } else {
2487            btree_remove(handle->seqtree, (void*)&_seqnum);
2488        }
2489        btreeblk_end(handle->bhandle);
2490
2491        kvs_delta_query.kv_id = seq_entry->kv_id;
2492        avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2493                                               &kvs_delta_query.avl_entry,
2494                                               _kvs_delta_stat_cmp);
2495        if (delta_stat_node) {
2496            delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2497                                    avl_entry);
2498            delta = handle->bhandle->nlivenodes - nlivenodes;
2499            delta_stat->nlivenodes += delta;
2500            delta = handle->bhandle->ndeltanodes - ndeltanodes;
2501            delta *= handle->config.blocksize;
2502            delta_stat->deltasize += delta;
2503        }
2504        avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2505        free(seq_entry);
2506    }
2507}
2508
2509INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2510                                           struct avl_tree *kvs_delta_stats)
2511{
2512    struct avl_node *node;
2513    struct wal_kvs_delta_stat *delta_stat;
2514    node = avl_first(kvs_delta_stats);
2515    while (node) {
2516        delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2517        node = avl_next(node);
2518        _kvs_stat_update_attr(file, delta_stat->kv_id,
2519                              KVS_STAT_DATASIZE, delta_stat->datasize);
2520        _kvs_stat_update_attr(file, delta_stat->kv_id,
2521                              KVS_STAT_NDOCS, delta_stat->ndocs);
2522        _kvs_stat_update_attr(file, delta_stat->kv_id,
2523                              KVS_STAT_NDELETES, delta_stat->ndeletes);
2524        _kvs_stat_update_attr(file, delta_stat->kv_id,
2525                              KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2526        _kvs_stat_update_attr(file, delta_stat->kv_id,
2527                              KVS_STAT_DELTASIZE, delta_stat->deltasize);
2528        avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2529        free(delta_stat);
2530    }
2531}
2532
2533INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2534                                      struct wal_item *item,
2535                                      struct avl_tree *stale_seqnum_list,
2536                                      struct avl_tree *kvs_delta_stats)
2537{
2538    hbtrie_result hr;
2539    fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2540    fdb_seqnum_t _seqnum;
2541    fdb_kvs_id_t kv_id = 0;
2542    fdb_status fs = FDB_RESULT_SUCCESS;
2543    uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2544    int size_id, size_seq;
2545    uint8_t *kvid_seqnum;
2546    uint64_t old_offset;
2547    int64_t _offset;
2548    int64_t delta;
2549    struct docio_object _doc;
2550    struct filemgr *file = handle->dhandle->file;
2551
2552    memset(var_key, 0, handle->config.chunksize);
2553    if (handle->kvs) {
2554        buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2555    } else {
2556        kv_id = 0;
2557    }
2558
2559    struct wal_kvs_delta_stat *kvs_delta_stat;
2560    struct wal_kvs_delta_stat kvs_delta_query;
2561    kvs_delta_query.kv_id = kv_id;
2562    avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2563                                           &kvs_delta_query.avl_entry,
2564                                           _kvs_delta_stat_cmp);
2565    if (delta_stat_node) {
2566        kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2567                                    avl_entry);
2568    } else {
2569        kvs_delta_stat = (struct wal_kvs_delta_stat *)
2570            calloc(1, sizeof(struct wal_kvs_delta_stat));
2571        kvs_delta_stat->kv_id = kv_id;
2572        avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2573                   _kvs_delta_stat_cmp);
2574    }
2575
2576    int64_t nlivenodes = handle->bhandle->nlivenodes;
2577    int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2578
2579    if (item->action == WAL_ACT_INSERT ||
2580        item->action == WAL_ACT_LOGICAL_REMOVE) {
2581        _offset = _endian_encode(item->offset);
2582
2583        hbtrie_insert(handle->trie,
2584                      item->header->key,
2585                      item->header->keylen,
2586                      (void *)&_offset,
2587                      (void *)&old_offset);
2588
2589        fs = btreeblk_end(handle->bhandle);
2590        if (fs != FDB_RESULT_SUCCESS) {
2591            return fs;
2592        }
2593        old_offset = _endian_decode(old_offset);
2594
2595        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2596            _seqnum = _endian_encode(item->seqnum);
2597            if (handle->kvs) {
2598                // multi KV instance mode .. HB+trie
2599                uint64_t old_offset_local;
2600
2601                size_id = sizeof(fdb_kvs_id_t);
2602                size_seq = sizeof(fdb_seqnum_t);
2603                kvid_seqnum = alca(uint8_t, size_id + size_seq);
2604                kvid2buf(size_id, kv_id, kvid_seqnum);
2605                memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2606                hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2607                              (void *)&_offset, (void *)&old_offset_local);
2608            } else {
2609                btree_insert(handle->seqtree, (void *)&_seqnum,
2610                             (void *)&_offset);
2611            }
2612            fs = btreeblk_end(handle->bhandle);
2613            if (fs != FDB_RESULT_SUCCESS) {
2614                return fs;
2615            }
2616        }
2617
2618        delta = handle->bhandle->nlivenodes - nlivenodes;
2619        kvs_delta_stat->nlivenodes += delta;
2620        delta = handle->bhandle->ndeltanodes - ndeltanodes;
2621        delta *= handle->config.blocksize;
2622        kvs_delta_stat->deltasize += delta;
2623
2624        if (old_offset == BLK_NOT_FOUND) {
2625            if (item->action == WAL_ACT_INSERT) {
2626                ++kvs_delta_stat->ndocs;
2627            } else { // inserted a logical deleted doc into main index
2628                ++kvs_delta_stat->ndeletes;
2629            }
2630            kvs_delta_stat->datasize += item->doc_size;
2631            kvs_delta_stat->deltasize += item->doc_size;
2632        } else { // update or logical delete
2633            // This block is already cached when we call HBTRIE_INSERT.
2634            // No additional block access.
2635            char dummy_key[FDB_MAX_KEYLEN];
2636            _doc.meta = _doc.body = NULL;
2637            _doc.key = &dummy_key;
2638            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2639                                              &_doc, true);
2640            if (_offset < 0) {
2641                return (fdb_status) _offset;
2642            } else if (_offset == 0) {
2643                // Note that this is not an error as old_offset is pointing to
2644                // the zero-filled region in a document block.
2645                return FDB_RESULT_KEY_NOT_FOUND;
2646            }
2647            free(_doc.meta);
2648            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2649
2650            if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2651                if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2652                    --kvs_delta_stat->ndocs;
2653                    ++kvs_delta_stat->ndeletes;
2654                } // else no change (prev doc was insert, now just an update)
2655            } else { // prev doc in main index was a logically deleted doc
2656                if (item->action == WAL_ACT_INSERT) { // now undeleted
2657                    ++kvs_delta_stat->ndocs;
2658                    --kvs_delta_stat->ndeletes;
2659                } // else no change (prev doc was deleted, now re-deleted)
2660            }
2661
2662            delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2663            kvs_delta_stat->datasize += delta;
2664            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2665            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2666                kvs_delta_stat->deltasize += delta;
2667            } else {
2668                kvs_delta_stat->deltasize += (int)item->doc_size;
2669            }
2670
2671            // Avoid duplicates (remove previous sequence number)
2672            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2673                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2674                    calloc(1, sizeof(struct wal_stale_seq_entry));
2675                entry->kv_id = kv_id;
2676                entry->seqnum = _doc.seqnum;
2677                avl_insert(stale_seqnum_list, &entry->avl_entry,
2678                           _fdb_seq_entry_cmp);
2679            }
2680        }
2681    } else {
2682        // Immediate remove
2683        old_offset = item->old_offset;
2684        hr = hbtrie_remove(handle->trie, item->header->key,
2685                           item->header->keylen);
2686        fs = btreeblk_end(handle->bhandle);
2687        if (fs != FDB_RESULT_SUCCESS) {
2688            return fs;
2689        }
2690
2691        if (hr == HBTRIE_RESULT_SUCCESS) {
2692            // This block is already cached when we call _fdb_wal_get_old_offset
2693            // No additional block access should be done.
2694            char dummy_key[FDB_MAX_KEYLEN];
2695            _doc.meta = _doc.body = NULL;
2696            _doc.key = &dummy_key;
2697            _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2698                                              &_doc, true);
2699            if (_offset < 0) {
2700                return (fdb_status) _offset;
2701            } else if (_offset == 0) {
2702                return FDB_RESULT_KEY_NOT_FOUND;
2703            }
2704            free(_doc.meta);
2705            filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2706
2707            // Reduce the total number of docs by one
2708            --kvs_delta_stat->ndocs;
2709            if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2710                --kvs_delta_stat->ndeletes;
2711            }
2712
2713            // Reduce the total datasize by size of previously present doc
2714            delta = -(int)_fdb_get_docsize(_doc.length);
2715            kvs_delta_stat->datasize += delta;
2716            // if multiple wal flushes happen before commit, then it's possible
2717            // that this doc deleted was inserted & flushed after last commit
2718            // In this case we need to update the deltasize too which tracks
2719            // the amount of new data inserted between commits.
2720            bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2721            if (last_hdr_bid * handle->config.blocksize < old_offset) {
2722                kvs_delta_stat->deltasize += delta;
2723            }
2724
2725            // remove sequence number for the removed doc
2726            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2727                struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2728                    calloc(1, sizeof(struct wal_stale_seq_entry));
2729                entry->kv_id = kv_id;
2730                entry->seqnum = _doc.seqnum;
2731                avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2732            }
2733
2734            // Update index size to new size after the remove operation
2735            delta = handle->bhandle->nlivenodes - nlivenodes;
2736            kvs_delta_stat->nlivenodes += delta;
2737
2738            // ndeltanodes measures number of new index nodes created due to
2739            // this hbtrie_remove() operation
2740            delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2741            delta *= handle->config.blocksize;
2742            kvs_delta_stat->deltasize += delta;
2743        }
2744    }
2745    return FDB_RESULT_SUCCESS;
2746}
2747
2748void fdb_sync_db_header(fdb_kvs_handle *handle)
2749{
2750    uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2751    if (handle->cur_header_revnum != cur_revnum) {
2752        void *header_buf = NULL;
2753        size_t header_len;
2754        bid_t hdr_bid;
2755        filemgr_header_revnum_t revnum;
2756
2757        header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2758                                        &hdr_bid, NULL, &revnum);
2759        if (header_len > 0) {
2760            uint64_t header_flags, dummy64, version;
2761            bid_t idtree_root;
2762            bid_t new_seq_root;
2763            bid_t new_stale_root;
2764            char *compacted_filename;
2765            char *prev_filename = NULL;
2766
2767            version = handle->file->version;
2768            atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
2769            handle->cur_header_revnum = revnum;
2770
2771            fdb_fetch_header(version, header_buf, &idtree_root,
2772                             &new_seq_root, &new_stale_root, &dummy64,
2773                             &dummy64, &dummy64,
2774                             &dummy64, &handle->last_wal_flush_hdr_bid,
2775                             &handle->kv_info_offset, &header_flags,
2776                             &compacted_filename, &prev_filename);
2777
2778            if (handle->dirty_updates) {
2779                // discard all cached writable b+tree nodes
2780                // to avoid data inconsistency with other writers
2781                btreeblk_discard_blocks(handle->bhandle);
2782            }
2783
2784            handle->trie->root_bid = idtree_root;
2785
2786            if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2787                if (new_seq_root != handle->seqtree->root_bid) {
2788                    if (handle->config.multi_kv_instances) {
2789                        handle->seqtrie->root_bid = new_seq_root;
2790                    } else {
2791                        btree_init_from_bid(handle->seqtree,
2792                                            handle->seqtree->blk_handle,
2793                                            handle->seqtree->blk_ops,
2794                                            handle->seqtree->kv_ops,
2795                                            handle->seqtree->blksize,
2796                                            new_seq_root);
2797                    }
2798                }
2799            }
2800
2801            if (ver_staletree_support(version)) {
2802                btree_init_from_bid(handle->staletree,
2803                                    handle->staletree->blk_handle,
2804                                    handle->staletree->blk_ops,
2805                                    handle->staletree->kv_ops,
2806                                    handle->staletree->blksize,
2807                                    new_stale_root);
2808            } else {
2809                handle->staletree = NULL;
2810            }
2811
2812            if (prev_filename) {
2813                free(prev_filename);
2814            }
2815
2816            handle->dirty_updates = 0;
2817            if (handle->kvs) {
2818                // multiple KV instance mode AND sub handle
2819                handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2820                                                    handle->kvs->id);
2821            } else {
2822                // super handle OR single KV instance mode
2823                handle->seqnum = filemgr_get_seqnum(handle->file);
2824            }
2825        } else {
2826            atomic_store_uint64_t(&handle->last_hdr_bid,
2827                                  filemgr_get_header_bid(handle->file));
2828        }
2829
2830        if (header_buf) {
2831            free(header_buf);
2832        }
2833    }
2834}
2835
2836fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2837{
2838    bool fhandle_ret;
2839    fdb_status fs = FDB_RESULT_SUCCESS;
2840    file_status_t fstatus = filemgr_get_file_status(handle->file);
2841    // check whether the compaction is done
2842    if (fstatus == FILE_REMOVED_PENDING) {
2843        uint64_t ndocs, ndeletes, datasize, nlivenodes, last_wal_flush_hdr_bid;
2844        uint64_t kv_info_offset, header_flags;
2845        size_t header_len;
2846        char *new_filename;
2847        uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2848        bid_t trie_root_bid, seq_root_bid, stale_root_bid;
2849        fdb_config config = handle->config;
2850
2851        // close the current file and newly open the new file
2852        if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2853            // compaction daemon mode .. just close and then open
2854            char filename[FDB_MAX_FILENAME_LEN];
2855            strcpy(filename, handle->filename);
2856
2857            // We don't need to maintain fhandle list for the old file
2858            // as there will be no more mutation on the file.
2859            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2860            fs = _fdb_close(handle);
2861            if (fs != FDB_RESULT_SUCCESS) {
2862                if (fhandle_ret) {
2863                    filemgr_fhandle_add(handle->file, handle->fhandle);
2864                }
2865                return fs;
2866            }
2867
2868            fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2869            if (fs != FDB_RESULT_SUCCESS) {
2870                return fs;
2871            }
2872            filemgr_fhandle_add(handle->file, handle->fhandle);
2873
2874        } else {
2875            filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2876            fdb_fetch_header(handle->file->version, buf,
2877                             &trie_root_bid, &seq_root_bid, &stale_root_bid,
2878                             &ndocs, &ndeletes, &nlivenodes, &datasize,
2879                             &last_wal_flush_hdr_bid,
2880                             &kv_info_offset, &header_flags,
2881                             &new_filename, NULL);
2882
2883            fhandle_ret = filemgr_fhandle_remove(handle->file, handle->fhandle);
2884            fs = _fdb_close(handle);
2885            if (fs != FDB_RESULT_SUCCESS) {
2886                if (fhandle_ret) {
2887                    filemgr_fhandle_add(handle->file, handle->fhandle);
2888                }
2889                return fs;
2890            }
2891
2892            fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2893            if (fs != FDB_RESULT_SUCCESS) {
2894                return fs;
2895            }
2896            filemgr_fhandle_add(handle->file, handle->fhandle);
2897        }
2898    }
2899    if (status) {
2900        *status = fstatus;
2901    }
2902    return fs;
2903}
2904
2905static void _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2906{
2907    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2908    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2909
2910    if (handle->shandle) {
2911        // skip snapshot
2912        return;
2913    }
2914
2915    struct filemgr_dirty_update_node *dirty_update;
2916    dirty_update = filemgr_dirty_update_get_latest(handle->file);
2917    btreeblk_set_dirty_update(handle->bhandle, dirty_update);
2918
2919    if (dirty_update) {
2920        filemgr_dirty_update_get_root(handle->file, dirty_update,
2921                                      &dirty_idtree_root, &dirty_seqtree_root);
2922        _fdb_import_dirty_root(handle, dirty_idtree_root, dirty_seqtree_root);
2923        btreeblk_discard_blocks(handle->bhandle);
2924    }
2925
2926    return;
2927}
2928
2929static void _fdb_release_dirty_root(fdb_kvs_handle *handle)
2930{
2931    if (!handle->shandle) {
2932        struct filemgr_dirty_update_node *dirty_update;
2933        dirty_update = btreeblk_get_dirty_update(handle->bhandle);
2934        if (dirty_update) {
2935            filemgr_dirty_update_close_node(handle->file, dirty_update);
2936            btreeblk_clear_dirty_update(handle->bhandle);
2937        }
2938    }
2939}
2940
2941LIBFDB_API
2942fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2943{
2944    uint64_t offset;
2945    int64_t _offset;
2946    struct docio_object _doc;
2947    struct filemgr *wal_file = NULL;
2948    struct docio_handle *dhandle;
2949    struct _fdb_key_cmp_info cmp_info;
2950    fdb_status wr;
2951    hbtrie_result hr = HBTRIE_RESULT_FAIL;
2952    fdb_txn *txn;
2953    fdb_doc doc_kv;
2954    LATENCY_STAT_START();
2955
2956    if (!handle) {
2957        return FDB_RESULT_INVALID_HANDLE;
2958    }
2959
2960    if (!doc || !doc->key || doc->keylen == 0 ||
2961        doc->keylen > FDB_MAX_KEYLEN ||
2962        (handle->kvs_config.custom_cmp &&
2963            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2964        return FDB_RESULT_INVALID_ARGS;
2965    }
2966
2967    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2968        return FDB_RESULT_HANDLE_BUSY;
2969    }
2970
2971    doc_kv = *doc;
2972
2973    if (handle->kvs) {
2974        // multi KV instance mode
2975        int size_chunk = handle->config.chunksize;
2976        doc_kv.keylen = doc->keylen + size_chunk;
2977        doc_kv.key = alca(uint8_t, doc_kv.keylen);
2978        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2979        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2980    }
2981
2982    if (!handle->shandle) {
2983        fdb_check_file_reopen(handle, NULL);
2984        txn = handle->fhandle->root->txn;
2985        if (!txn) {
2986            txn = &handle->file->global_txn;
2987        }
2988    } else {
2989        txn = handle->shandle->snap_txn;
2990    }
2991
2992    cmp_info.kvs_config = handle->kvs_config;
2993    cmp_info.kvs = handle->kvs;
2994    wal_file = handle->file;
2995    dhandle = handle->dhandle;
2996
2997    if (handle->kvs) {
2998        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
2999                      &offset);
3000    } else {
3001        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc,
3002                      &offset);
3003    }
3004
3005    if (!handle->shandle) {
3006        fdb_sync_db_header(handle);
3007    }
3008
3009    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3010
3011    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3012        _fdb_sync_dirty_root(handle);
3013
3014        if (handle->kvs) {
3015            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3016                             (void *)&offset);
3017        } else {
3018            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3019                             (void *)&offset);
3020        }
3021        btreeblk_end(handle->bhandle);
3022        offset = _endian_decode(offset);
3023
3024        _fdb_release_dirty_root(handle);
3025    }
3026
3027    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3028         hr == HBTRIE_RESULT_SUCCESS) {
3029        bool alloced_meta = doc->meta ? false : true;
3030        bool alloced_body = doc->body ? false : true;
3031        if (handle->kvs) {
3032            _doc.key = doc_kv.key;
3033            _doc.length.keylen = doc_kv.keylen;
3034            doc->deleted = doc_kv.deleted; // update deleted field if wal_find
3035        } else {
3036            _doc.key = doc->key;
3037            _doc.length.keylen = doc->keylen;
3038        }
3039        _doc.meta = doc->meta;
3040        _doc.body = doc->body;
3041
3042        if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
3043            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3044            return FDB_RESULT_KEY_NOT_FOUND;
3045        }
3046
3047        _offset = docio_read_doc(dhandle, offset, &_doc, true);
3048        if (_offset <= 0) {
3049            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3050            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
3051        }
3052
3053        if (_doc.length.keylen != doc_kv.keylen ||
3054            _doc.length.flag & DOCIO_DELETED) {
3055            free_docio_object(&_doc, 0, alloced_meta, alloced_body);
3056            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3057            return FDB_RESULT_KEY_NOT_FOUND;
3058        }
3059
3060        doc->seqnum = _doc.seqnum;
3061        doc->metalen = _doc.length.metalen;
3062        doc->bodylen = _doc.length.bodylen;
3063        doc->meta = _doc.meta;
3064        doc->body = _doc.body;
3065        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3066        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3067        doc->offset = offset;
3068
3069        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3070        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3071        return FDB_RESULT_SUCCESS;
3072    }
3073
3074    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3075    return FDB_RESULT_KEY_NOT_FOUND;
3076}
3077
3078// search document metadata using key
3079LIBFDB_API
3080fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
3081{
3082    uint64_t offset;
3083    struct docio_object _doc;
3084    struct docio_handle *dhandle;
3085    struct filemgr *wal_file = NULL;
3086    fdb_status wr;
3087    hbtrie_result hr = HBTRIE_RESULT_FAIL;
3088    fdb_txn *txn;
3089    struct _fdb_key_cmp_info cmp_info;
3090    fdb_doc doc_kv;
3091    LATENCY_STAT_START();
3092
3093    if (!handle) {
3094        return FDB_RESULT_INVALID_HANDLE;
3095    }
3096
3097    if (!doc || !doc->key ||
3098        doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
3099        (handle->kvs_config.custom_cmp &&
3100            doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
3101        return FDB_RESULT_INVALID_ARGS;
3102    }
3103
3104    doc_kv = *doc;
3105
3106    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
3107        return FDB_RESULT_HANDLE_BUSY;
3108    }
3109
3110    if (handle->kvs) {
3111        // multi KV instance mode
3112        int size_chunk = handle->config.chunksize;
3113        doc_kv.keylen = doc->keylen + size_chunk;
3114        doc_kv.key = alca(uint8_t, doc_kv.keylen);
3115        kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
3116        memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
3117    }
3118
3119    if (!handle->shandle) {
3120        fdb_check_file_reopen(handle, NULL);
3121        txn = handle->fhandle->root->txn;
3122        if (!txn) {
3123            txn = &handle->file->global_txn;
3124        }
3125    } else {
3126        txn = handle->shandle->snap_txn;
3127    }
3128
3129    cmp_info.kvs_config = handle->kvs_config;
3130    cmp_info.kvs = handle->kvs;
3131    wal_file = handle->file;
3132    dhandle = handle->dhandle;
3133
3134    if (handle->kvs) {
3135        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, &doc_kv,
3136                      &offset);
3137    } else {
3138        wr = wal_find(txn, wal_file, &cmp_info, handle->shandle, doc, &offset);
3139    }
3140
3141    if (!handle->shandle) {
3142        fdb_sync_db_header(handle);
3143    }
3144    atomic_incr_uint64_t(&handle->op_stats->num_gets, std::memory_order_relaxed);
3145
3146    if (wr == FDB_RESULT_KEY_NOT_FOUND) {
3147        _fdb_sync_dirty_root(handle);
3148
3149        if (handle->kvs) {
3150            hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
3151                             (void *)&offset);
3152        } else {
3153            hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
3154                             (void *)&offset);
3155        }
3156        btreeblk_end(handle->bhandle);
3157        offset = _endian_decode(offset);
3158
3159        _fdb_release_dirty_root(handle);
3160    }
3161
3162    if ((wr == FDB_RESULT_SUCCESS && offset != BLK_NOT_FOUND) ||
3163         hr == HBTRIE_RESULT_SUCCESS) {
3164        if (handle->kvs) {
3165            _doc.key = doc_kv.key;
3166            _doc.length.keylen = doc_kv.keylen;
3167        } else {
3168            _doc.key = doc->key;
3169            _doc.length.keylen = doc->keylen;
3170        }
3171        bool alloced_meta = doc->meta ? false : true;
3172        _doc.meta = doc->meta;
3173        _doc.body = doc->body;
3174
3175        int64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
3176                                                       true);
3177        if (body_offset <= 0){
3178            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3179            return body_offset < 0 ? (fdb_status)body_offset : FDB_RESULT_KEY_NOT_FOUND;
3180        }
3181
3182        if (_doc.length.keylen != doc_kv.keylen) {
3183            free_docio_object(&_doc, 0, alloced_meta, 0);
3184            atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3185            return FDB_RESULT_KEY_NOT_FOUND;
3186        }
3187
3188        doc->seqnum = _doc.seqnum;
3189        doc->metalen = _doc.length.metalen;
3190        doc->bodylen = _doc.length.bodylen;
3191        doc->meta = _doc.meta;
3192        doc->body = _doc.body;
3193        doc->deleted = _doc.length.flag & DOCIO_DELETED;
3194        doc->size_ondisk = _fdb_get_docsize(_doc.length);
3195        doc->offset = offset;
3196
3197        LATENCY_STAT_END(handle->file, FDB_LATENCY_GETS);
3198        atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3199        return FDB_RESULT_SUCCESS;
3200    }
3201
3202    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
3203    return FDB_RESULT_KEY_NOT_FOUND;
3204}
3205
3206// search document using sequence number
3207LIBFDB_API
3208fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
3209