1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <fcntl.h>
22 #include <time.h>
23 #if !defined(WIN32) && !defined(_WIN32)
24 #include <sys/time.h>
25 #endif
26 
27 #include "libforestdb/forestdb.h"
28 #include "fdb_internal.h"
29 #include "filemgr.h"
30 #include "hbtrie.h"
31 #include "list.h"
32 #include "breakpad.h"
33 #include "btree.h"
34 #include "btree_kv.h"
35 #include "btree_var_kv_ops.h"
36 #include "docio.h"
37 #include "btreeblock.h"
38 #include "common.h"
39 #include "wal.h"
40 #include "filemgr_ops.h"
41 #include "configuration.h"
42 #include "internal_types.h"
43 #include "bgflusher.h"
44 #include "compactor.h"
45 #include "memleak.h"
46 #include "time_utils.h"
47 #include "timing.h"
48 #include "system_resource_stats.h"
49 #include "version.h"
50 #include "staleblock.h"
51 
// When built with __DEBUG but without __DEBUG_FDB, redefine the debug macros
// (DBG, DBGCMD, DBGSW) so that they expand to nothing in this file.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
62 
63 
// Set to 1 by fdb_init() once the one-time global initialization has finished.
static atomic_uint8_t fdb_initialized(0);
// NOTE(review): presumably tracks fdb_open() calls that are in progress; it is
// not referenced in this part of the file -- confirm against its users.
static volatile uint32_t fdb_open_inprog = 0;
// 'initial_lock' is the spin lock taken by fdb_init() around the one-time
// global initialization.
#ifdef SPIN_INITIALIZER
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Without a static initializer the lock is set up at run time by
// init_initial_lock_status(); 'initial_lock_status' records the progress
// (0: untouched, 1: being initialized, 2: ready to use).
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif
72 
_cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)73 INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
74 {
75     (void) aux;
76     uint64_t a,b;
77     a = *(uint64_t*)key1;
78     b = *(uint64_t*)key2;
79     a = _endian_decode(a);
80     b = _endian_decode(b);
81     return _CMP_U64(a, b);
82 }
83 
_fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)84 size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
85 {
86     fdb_status fs;
87     keylen_t keylen;
88     struct docio_handle *dhandle = (struct docio_handle*)handle;
89 
90     offset = _endian_decode(offset);
91     fs = docio_read_doc_key(dhandle, offset, &keylen, buf);
92     if (fs == FDB_RESULT_SUCCESS) {
93         return keylen;
94     } else {
95         const char *msg = "docio_read_doc_key error: read failure on "
96             "offset %" _F64 " in a database file '%s' "
97             ": FDB status %d, lastbid 0x%" _X64 ", "
98             "curblock 0x%" _X64 ", curpos 0x%x\n";
99         fdb_log(NULL, FDB_RESULT_READ_FAIL, msg, offset,
100                 dhandle->file->filename, fs, dhandle->lastbid,
101                 dhandle->curblock, dhandle->curpos);
102         dbg_print_buf(dhandle->readbuffer, dhandle->file->blocksize, true, 16);
103         return 0;
104     }
105 }
106 
_fdb_invalidate_dbheader(fdb_kvs_handle *handle )107 void _fdb_invalidate_dbheader(fdb_kvs_handle *handle ){
108     bid_t hdr_bid;
109     hdr_bid = handle->last_hdr_bid;
110     if (hdr_bid != BLK_NOT_FOUND){
111         // invalidate the last dbheader
112         filemgr_invalidate_dbheader(handle->file, hdr_bid, &handle->log_callback);
113     }
114 }
_fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)115 size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
116 {
117     int size_id, size_seq, size_chunk;
118     fdb_seqnum_t _seqnum;
119     struct docio_object doc;
120     struct docio_handle *dhandle = (struct docio_handle *)handle;
121 
122     size_id = sizeof(fdb_kvs_id_t);
123     size_seq = sizeof(fdb_seqnum_t);
124     size_chunk = dhandle->file->config->chunksize;
125     memset(&doc, 0, sizeof(struct docio_object));
126 
127     offset = _endian_decode(offset);
128     if (docio_read_doc_key_meta((struct docio_handle *)handle, offset,
129                                 &doc, true) <= 0) {
130         return 0;
131     }
132     buf2buf(size_chunk, doc.key, size_id, buf);
133     _seqnum = _endian_encode(doc.seqnum);
134     memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
135 
136     free(doc.key);
137     free(doc.meta);
138 
139     return size_id + size_seq;
140 }
141 
_fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)142 int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
143 {
144     int is_key1_inf, is_key2_inf;
145     uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
146     uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
147     size_t keylen1, keylen2;
148     btree_cmp_args *args = (btree_cmp_args *)aux;
149     fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
150 
151     is_key1_inf = _is_inf_key(key1);
152     is_key2_inf = _is_inf_key(key2);
153     if (is_key1_inf && is_key2_inf) { // both are infinite
154         return 0;
155     } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
156         return -1;
157     } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
158         return 1;
159     }
160 
161     _get_var_key(key1, (void*)keystr1, &keylen1);
162     _get_var_key(key2, (void*)keystr2, &keylen2);
163 
164     if (keylen1 == 0 && keylen2 == 0) {
165         return 0;
166     } else if (keylen1 ==0 && keylen2 > 0) {
167         return -1;
168     } else if (keylen1 > 0 && keylen2 == 0) {
169         return 1;
170     }
171 
172     return cmp(keystr1, keylen1, keystr2, keylen2);
173 }
174 
fdb_fetch_header(uint64_t version, void *header_buf, bid_t *trie_root_bid, bid_t *seq_root_bid, bid_t *stale_root_bid, uint64_t *ndocs, uint64_t *ndeletes, uint64_t *nlivenodes, uint64_t *datasize, uint64_t *last_wal_flush_hdr_bid, uint64_t *kv_info_offset, uint64_t *header_flags, char **new_filename, char **old_filename)175 void fdb_fetch_header(uint64_t version,
176                       void *header_buf,
177                       bid_t *trie_root_bid,
178                       bid_t *seq_root_bid,
179                       bid_t *stale_root_bid,
180                       uint64_t *ndocs,
181                       uint64_t *ndeletes,
182                       uint64_t *nlivenodes,
183                       uint64_t *datasize,
184                       uint64_t *last_wal_flush_hdr_bid,
185                       uint64_t *kv_info_offset,
186                       uint64_t *header_flags,
187                       char **new_filename,
188                       char **old_filename)
189 {
190     size_t offset = 0;
191     uint16_t new_filename_len;
192     uint16_t old_filename_len;
193 
194     seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
195                sizeof(bid_t), offset);
196     *trie_root_bid = _endian_decode(*trie_root_bid);
197 
198     seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
199                sizeof(bid_t), offset);
200     *seq_root_bid = _endian_decode(*seq_root_bid);
201 
202     if (ver_staletree_support(version)) {
203         seq_memcpy(stale_root_bid, (uint8_t *)header_buf + offset,
204                    sizeof(bid_t), offset);
205         *stale_root_bid = _endian_decode(*stale_root_bid);
206     } else {
207         *stale_root_bid = BLK_NOT_FOUND;
208     }
209 
210     seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
211                sizeof(uint64_t), offset);
212     *ndocs = _endian_decode(*ndocs);
213     if (ver_is_atleast_magic_001(version)) {
214         seq_memcpy(ndeletes, (uint8_t *)header_buf + offset,
215                    sizeof(uint64_t), offset);
216         *ndeletes = _endian_decode(*ndeletes);
217     } else {
218         *ndeletes = 0;
219     }
220 
221     seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
222                sizeof(uint64_t), offset);
223     *nlivenodes = _endian_decode(*nlivenodes);
224 
225     seq_memcpy(datasize, (uint8_t *)header_buf + offset,
226                sizeof(uint64_t), offset);
227     *datasize = _endian_decode(*datasize);
228 
229     seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
230                sizeof(uint64_t), offset);
231     *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
232 
233     seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
234                sizeof(uint64_t), offset);
235     *kv_info_offset = _endian_decode(*kv_info_offset);
236 
237     seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
238                sizeof(uint64_t), offset);
239     *header_flags = _endian_decode(*header_flags);
240 
241     seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
242                sizeof(new_filename_len), offset);
243     new_filename_len = _endian_decode(new_filename_len);
244     seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
245                sizeof(old_filename_len), offset);
246     old_filename_len = _endian_decode(old_filename_len);
247     if (new_filename_len) {
248         *new_filename = (char*)((uint8_t *)header_buf + offset);
249     } else {
250         *new_filename = NULL;
251     }
252     offset += new_filename_len;
253     if (old_filename && old_filename_len) {
254         *old_filename = (char *) malloc(old_filename_len);
255         seq_memcpy(*old_filename,
256                    (uint8_t *)header_buf + offset,
257                    old_filename_len, offset);
258     }
259 }
260 
261 // read the revnum of the given header of BID
_fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)262 INLINE filemgr_header_revnum_t _fdb_get_header_revnum(fdb_kvs_handle *handle, bid_t bid)
263 {
264     uint8_t *buf = alca(uint8_t, handle->file->blocksize);
265     uint64_t version;
266     size_t header_len;
267     fdb_seqnum_t seqnum;
268     filemgr_header_revnum_t revnum = 0;
269     fdb_status fs;
270 
271     fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
272                               &seqnum, &revnum, NULL, &version, NULL,
273                               &handle->log_callback);
274     if (fs != FDB_RESULT_SUCCESS) {
275         return 0;
276     }
277     return revnum;
278 }
279 
_fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)280 INLINE filemgr_header_revnum_t _fdb_get_bmp_revnum(fdb_kvs_handle *handle, bid_t bid)
281 {
282     uint8_t *buf = alca(uint8_t, handle->file->blocksize);
283     uint64_t version, bmp_revnum = 0;
284     size_t header_len;
285     fdb_seqnum_t seqnum;
286     filemgr_header_revnum_t revnum;
287     fdb_status fs;
288 
289     fs = filemgr_fetch_header(handle->file, bid, buf, &header_len,
290                               &seqnum, &revnum, NULL, &version, &bmp_revnum,
291                               &handle->log_callback);
292     if (fs != FDB_RESULT_SUCCESS) {
293         return 0;
294     }
295     return bmp_revnum;
296 }
297 
// Log callback that ignores everything it is given. It is installed where
// error logging has to be silenced temporarily.
void fdb_dummy_log_callback(int err_code, const char *err_msg, void *ctx_data)
{
    // discard all arguments explicitly so that no unused-parameter warning fires
    (void)ctx_data;
    (void)err_msg;
    (void)err_code;
}
305 
_fdb_restore_wal(fdb_kvs_handle *handle, fdb_restore_mode_t mode, bid_t hdr_bid, fdb_kvs_id_t kv_id_req)306 INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
307                              fdb_restore_mode_t mode,
308                              bid_t hdr_bid,
309                              fdb_kvs_id_t kv_id_req)
310 {
311     struct filemgr *file = handle->file;
312     uint32_t blocksize = handle->file->blocksize;
313     uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
314     uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
315     uint64_t offset = 0; //assume everything from first block needs restoration
316     uint64_t filesize = filemgr_get_pos(handle->file);
317     uint64_t doc_scan_limit;
318     uint64_t start_bmp_revnum, stop_bmp_revnum;
319     uint64_t cur_bmp_revnum = (uint64_t)-1;
320     bid_t next_doc_block = BLK_NOT_FOUND;
321     struct _fdb_key_cmp_info cmp_info;
322     err_log_callback *log_callback;
323 
324     if (mode == FDB_RESTORE_NORMAL && !handle->shandle &&
325         !wal_try_restore(handle->file)) { // Atomically try to restore WAL
326         // Some other thread or previous open had successfully initialized WAL
327         // We can simply return here
328         return;
329     }
330 
331     if (!hdr_off) { // Nothing to do if we don't have a header block offset
332         return;
333     }
334 
335     if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
336         offset = (last_wal_flush_hdr_bid + 1) * blocksize;
337     }
338 
339     // If a valid last header was retrieved and it matches the current header
340     if (hdr_off == offset || hdr_bid == last_wal_flush_hdr_bid) {
341         return; // No WAL section in the file
342     }
343 
344     if (mode == FDB_RESTORE_NORMAL && !handle->shandle) {
345         // for normal WAL restore, set status to dirty
346         // (only when the previous status is clean or dirty)
347         wal_set_dirty_status(handle->file, FDB_WAL_DIRTY, true);
348     }
349 
350     // Temporarily disable the error logging callback as there are false positive
351     // checksum errors in docio_read_doc.
352     // TODO: Need to adapt docio_read_doc to separate false checksum errors.
353     err_log_callback dummy_cb;
354     log_callback = handle->dhandle->log_callback;
355     dummy_cb.callback = fdb_dummy_log_callback;
356     dummy_cb.ctx_data = NULL;
357     handle->dhandle->log_callback = &dummy_cb;
358 
359     if (!handle->shandle) {
360         filemgr_mutex_lock(file);
361     }
362     cmp_info.kvs_config = handle->kvs_config;
363     cmp_info.kvs = handle->kvs;
364 
365     start_bmp_revnum = _fdb_get_bmp_revnum(handle, last_wal_flush_hdr_bid);
366     stop_bmp_revnum= _fdb_get_bmp_revnum(handle, hdr_bid);
367     cur_bmp_revnum = start_bmp_revnum;
368 
369     // A: reused blocks during the 1st block reclaim (bmp_revnum: 1)
370     // B: reused blocks during the 2nd block reclaim (bmp_revnum: 2)
371     // otherwise: live block (bmp_revnum: 0)
372     //  1 2   3    4    5 6  7  8   9  10
373     // +-------------------------------------------+
374     // |  BBBBAAAAABBBBB  AAABBB    AAA            |
375     // +-------------------------------------------+
376     //              ^                     ^
377     //              hdr_bid               last_wal_flush
378     //
379     // scan order: 1 -> 5 -> 8 -> 10 -> 3 -> 6 -> 9 -> 2 -> 4 -> 7
380     // iteration #1: scan docs with bmp_revnum==0 in [last_wal_flush ~ filesize]
381     // iteration #2: scan docs with bmp_revnum==1 in [0 ~ filesize]
382     // iteration #3: scan docs with bmp_revnum==2 in [0 ~ hdr_bid]
383 
384     do {
385         if (cur_bmp_revnum > stop_bmp_revnum) {
386             break;
387         } else if (cur_bmp_revnum == stop_bmp_revnum) {
388 
389             bid_t sb_last_hdr_bid = BLK_NOT_FOUND;
390             if (handle->file->sb) {
391                 sb_last_hdr_bid = atomic_get_uint64_t(&handle->file->sb->last_hdr_bid);
392             }
393             if (!handle->shandle && handle->file->sb &&
394                 sb_last_hdr_bid != BLK_NOT_FOUND) {
395                 hdr_off = (sb_last_hdr_bid+1) * blocksize;
396             }
397 
398             doc_scan_limit = hdr_off;
399             if (offset >= hdr_off) {
400                 break;
401             }
402         } else {
403             doc_scan_limit = filesize;
404         }
405 
406         if (!docio_check_buffer(handle->dhandle, offset / blocksize,
407                                 cur_bmp_revnum)) {
408             // not a document block .. move to next block
409         } else {
410             do {
411                 struct docio_object doc;
412                 int64_t _offset;
413                 uint64_t doc_offset;
414                 memset(&doc, 0, sizeof(doc));
415                 _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
416                 if (_offset <= 0) { // reached unreadable doc, skip block
417                     // TODO: Need to have this function return fdb_status, so that
418                     // WAL restore operation should fail if offset < 0
419                     break;
420                 } else if ((uint64_t)_offset < offset) {
421                     // If more than one writer is appending docs concurrently,
422                     // they have their own doc block linked list and doc blocks
423                     // may not be consecutive. For example,
424                     //
425                     // Writer 1): 100 -> 102 -> 2 -> 4     | commit
426                     // Writer 2):    101 - > 103 -> 3 -> 5 |
427                     //
428                     // In this case, if we read doc BID 102, then 'offset' will jump
429                     // to doc BID 2, without reading BID 103.
430                     //
431                     // To address this issue, in case that 'offset' decreases,
432                     // remember the next doc block, and follow the doc linked list
433                     // first. After the linked list ends, 'offset' cursor will be
434                     // reset to 'next_doc_block'.
435                     next_doc_block = (offset / blocksize) + 1;
436                 }
437                 if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
438                     // check if the doc is transactional or not, and
439                     // also check if the doc contains system info
440                     if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
441                         !(doc.length.flag & DOCIO_SYSTEM)) {
442                         if (doc.length.flag & DOCIO_TXN_COMMITTED) {
443                             // commit mark .. read doc offset
444                             doc_offset = doc.doc_offset;
445                             // read the previously skipped doc
446                             if (docio_read_doc(handle->dhandle, doc_offset, &doc, true) <= 0) {
447                                 // doc read error
448                                 free(doc.key);
449                                 free(doc.meta);
450                                 free(doc.body);
451                                 offset = _offset;
452                                 continue;
453                             }
454                         } else {
455                             doc_offset = offset;
456                         }
457 
458                         // If say a snapshot is taken on a db handle after
459                         // rollback, then skip WAL items after rollback point
460                         if ((mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
461                             doc.seqnum > handle->seqnum) {
462                             free(doc.key);
463                             free(doc.meta);
464                             free(doc.body);
465                             offset = _offset;
466                             continue;
467                         }
468 
469                         // restore document
470                         fdb_doc wal_doc;
471                         wal_doc.keylen = doc.length.keylen;
472                         wal_doc.bodylen = doc.length.bodylen;
473                         wal_doc.key = doc.key;
474                         wal_doc.seqnum = doc.seqnum;
475                         wal_doc.deleted = doc.length.flag & DOCIO_DELETED;
476 
477                         if (!handle->shandle) {
478                             wal_doc.metalen = doc.length.metalen;
479                             wal_doc.meta = doc.meta;
480                             wal_doc.size_ondisk = _fdb_get_docsize(doc.length);
481 
482                             if (handle->kvs) {
483                                 // check seqnum before insert
484                                 fdb_kvs_id_t kv_id;
485                                 fdb_seqnum_t kv_seqnum;
486                                 buf2kvid(handle->config.chunksize,
487                                          wal_doc.key, &kv_id);
488 
489                                 kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
490                                 if (doc.seqnum <= kv_seqnum &&
491                                         ((mode == FDB_RESTORE_KV_INS &&
492                                             kv_id == kv_id_req) ||
493                                          (mode == FDB_RESTORE_NORMAL)) ) {
494                                     // if mode is NORMAL, restore all items
495                                     // if mode is KV_INS, restore items matching ID
496                                     wal_insert(&file->global_txn, file, &cmp_info,
497                                                &wal_doc, doc_offset,
498                                                WAL_INS_WRITER);
499                                 }
500                             } else {
501                                 wal_insert(&file->global_txn, file, &cmp_info,
502                                            &wal_doc, doc_offset,
503                                            WAL_INS_WRITER);
504                             }
505                             if (doc.key) free(doc.key);
506                         } else {
507                             // snapshot
508                             if (handle->kvs) {
509                                 fdb_kvs_id_t kv_id;
510                                 buf2kvid(handle->config.chunksize,
511                                          wal_doc.key, &kv_id);
512                                 if (kv_id == handle->kvs->id) {
513                                     // snapshot: insert ID matched documents only
514                                     wal_snap_insert(handle->shandle,
515                                                     &wal_doc, doc_offset);
516                                 } else {
517                                     free(doc.key);
518                                 }
519                             } else {
520                                 wal_snap_insert(handle->shandle, &wal_doc,
521                                                 doc_offset);
522                             }
523                         }
524                         free(doc.meta);
525                         free(doc.body);
526                         offset = _offset;
527                     } else {
528                         // skip transactional document or system document
529                         free(doc.key);
530                         free(doc.meta);
531                         free(doc.body);
532                         offset = _offset;
533                         // do not break.. read next doc
534                     }
535                 } else {
536                     free(doc.key);
537                     free(doc.meta);
538                     free(doc.body);
539                     offset = _offset;
540                     break;
541                 }
542             } while (offset + sizeof(struct docio_length) < doc_scan_limit);
543         }
544 
545         if (next_doc_block != BLK_NOT_FOUND) {
546             offset = next_doc_block * blocksize;
547             next_doc_block = BLK_NOT_FOUND;
548         } else {
549             offset = ((offset / blocksize) + 1) * blocksize;
550         }
551         if (ver_superblock_support(handle->file->version) &&
552             offset >= filesize) {
553             // circular scan
554             struct superblock *sb = handle->file->sb;
555             if (sb && sb->config) {
556                 offset = blocksize * sb->config->num_sb;
557                 cur_bmp_revnum++;
558             }
559         }
560     } while(true);
561 
562     // wal commit
563     if (!handle->shandle) {
564         wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
565         filemgr_mutex_unlock(file);
566     }
567     handle->dhandle->log_callback = log_callback;
568 }
569 
_fdb_recover_compaction(fdb_kvs_handle *handle, const char *new_filename)570 INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
571                                           const char *new_filename)
572 {
573     fdb_kvs_handle new_db;
574     fdb_config config = handle->config;
575     struct filemgr *new_file;
576 
577     // As partially compacted file may contain various errors,
578     // we temporarily disable log callback for compaction recovery.
579     memset(&new_db, 0, sizeof(new_db));
580     new_db.log_callback.callback = NULL;
581     new_db.log_callback.ctx_data = NULL;
582     config.flags |= FDB_OPEN_FLAG_RDONLY;
583     new_db.fhandle = handle->fhandle;
584     new_db.kvs_config = handle->kvs_config;
585     fdb_status status = _fdb_open(&new_db, new_filename,
586                                   FDB_AFILENAME, &config);
587     if (status != FDB_RESULT_SUCCESS) {
588         return fdb_log(&handle->log_callback, status,
589                        "Error in opening a partially compacted file '%s' for recovery.",
590                        new_filename);
591     }
592 
593     new_file = new_db.file;
594 
595     if (new_file->old_filename &&
596         !strncmp(new_file->old_filename, handle->file->filename,
597                  FDB_MAX_FILENAME_LEN)) {
598         struct filemgr *old_file = handle->file;
599         // If new file has a recorded old_filename then it means that
600         // compaction has completed successfully. Mark self for deletion
601         filemgr_mutex_lock(new_file);
602 
603         status = btreeblk_end(handle->bhandle);
604         if (status != FDB_RESULT_SUCCESS) {
605             filemgr_mutex_unlock(new_file);
606             _fdb_close(&new_db);
607             return status;
608         }
609         btreeblk_free(handle->bhandle);
610         free(handle->bhandle);
611         handle->bhandle = new_db.bhandle;
612 
613         docio_free(handle->dhandle);
614         free(handle->dhandle);
615         handle->dhandle = new_db.dhandle;
616 
617         hbtrie_free(handle->trie);
618         free(handle->trie);
619         handle->trie = new_db.trie;
620 
621         wal_shutdown(handle->file, &handle->log_callback);
622         handle->file = new_file;
623 
624         if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
625             if (handle->kvs) {
626                 // multi KV instance mode
627                 hbtrie_free(handle->seqtrie);
628                 free(handle->seqtrie);
629                 if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
630                     handle->seqtrie = new_db.seqtrie;
631                 }
632             } else {
633                 free(handle->seqtree->kv_ops);
634                 free(handle->seqtree);
635                 if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
636                     handle->seqtree = new_db.seqtree;
637                 }
638             }
639         }
640         handle->staletree = new_db.staletree;
641 
642         filemgr_mutex_unlock(new_file);
643         if (new_db.kvs) {
644             fdb_kvs_info_free(&new_db);
645         }
646         fdb_log(&handle->log_callback, FDB_RESULT_FAIL_BY_COMPACTION,
647                 "Successfully used partially compacted file '%s' for recovery replacing old file %s.",
648                 new_filename, new_file->old_filename);
649         // remove self: WARNING must not close this handle if snapshots
650         // are yet to open this file
651         filemgr_remove_pending(old_file, new_db.file, &new_db.log_callback);
652         filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
653         free(new_db.filename);
654         return FDB_RESULT_FAIL_BY_COMPACTION;
655     }
656 
657     // As the new file is partially compacted, it should be removed upon close.
658     // Just in-case the new file gets opened before removal, point it to the old
659     // file to ensure availability of data.
660     fdb_log(&handle->log_callback, FDB_RESULT_SUCCESS,
661             "Partially compacted file '%s' could not be used for recovery. Using old file %s.",
662                 new_filename, handle->file->filename);
663     filemgr_remove_pending(new_db.file, handle->file, &handle->log_callback);
664     _fdb_close(&new_db);
665 
666     return FDB_RESULT_SUCCESS;
667 }
668 
669 #ifndef SPIN_INITIALIZER
init_initial_lock_status()670 INLINE void init_initial_lock_status() {
671     // Note that only Windows passes through this routine
672     if (!fdb_initialized) {
673         if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
674             // atomically initialize spin lock only once
675             spin_init(&initial_lock);
676             initial_lock_status = 2;
677         } else {
678             // the others .. wait until initializing 'initial_lock' is done
679             // TODO: Need to devise a better way of synchronization on Windows
680             while (initial_lock_status != 2) {
681                 Sleep(1);
682             }
683         }
684     }
685 }
686 #endif
687 
688 LIBFDB_API
fdb_init(fdb_config *config)689 fdb_status fdb_init(fdb_config *config)
690 {
691     fdb_config _config;
692     compactor_config c_config;
693     bgflusher_config bgf_config;
694     struct filemgr_config f_config;
695 
696     if (config) {
697         if (validate_fdb_config(config)) {
698             _config = *config;
699         } else {
700             return FDB_RESULT_INVALID_CONFIG;
701         }
702     } else {
703         _config = get_default_config();
704     }
705 
706     // global initialization
707     // initialized only once at first time
708     if (!fdb_initialized) {
709 
710 #ifndef SPIN_INITIALIZER
711         init_initial_lock_status();
712 #endif
713 
714     }
715     spin_lock(&initial_lock);
716     if (!fdb_initialized) {
717 #if !defined(_ANDROID_) && !defined(__ANDROID__)
718         // Some Android devices (e.g., Nexus 6) return incorrect RAM size.
719         // We temporarily disable validity checking of block cache size
720         // on Android platform at this time.
721         double ram_size = (double) get_memory_size();
722         if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
723             spin_unlock(&initial_lock);
724             return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
725         }
726 #endif
727         // initialize file manager and block cache
728         f_config.blocksize = _config.blocksize;
729         f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
730         f_config.seqtree_opt = _config.seqtree_opt;
731         filemgr_init(&f_config);
732         filemgr_set_lazy_file_deletion(true,
733                                        compactor_register_file_removing,
734                                        compactor_is_file_removed);
735         if (ver_superblock_support(ver_get_latest_magic())) {
736             struct sb_ops sb_ops = {sb_init, sb_get_default_config,
737                                     sb_read_latest, sb_alloc_block,
738                                     sb_bmp_is_writable, sb_get_bmp_revnum,
739                                     sb_get_min_live_revnum, sb_free};
740             filemgr_set_sb_operation(sb_ops);
741             sb_bmp_mask_init();
742         }
743 
744         // initialize compaction daemon
745         c_config.sleep_duration = _config.compactor_sleep_duration;
746         c_config.num_threads = _config.num_compactor_threads;
747         compactor_init(&c_config);
748         // initialize background flusher daemon
749         // Temporarily disable background flushers until blockcache contention
750         // issue is resolved.
751         bgf_config.num_threads = 0; //_config.num_bgflusher_threads;
752         bgflusher_init(&bgf_config);
753 
754         // Initialize breakpad
755         _dbg_handle_crashes(config->breakpad_minidump_dir);
756 
757         fdb_initialized = 1;
758     }
759     spin_unlock(&initial_lock);
760 
761     return FDB_RESULT_SUCCESS;
762 }
763 
764 LIBFDB_API
fdb_get_default_config(void)765 fdb_config fdb_get_default_config(void) {
766     return get_default_config();
767 }
768 
769 LIBFDB_API
fdb_get_default_kvs_config(void)770 fdb_kvs_config fdb_get_default_kvs_config(void) {
771     return get_default_kvs_config();
772 }
773 
774 LIBFDB_API
fdb_open(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig)775 fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
776                     const char *filename,
777                     fdb_config *fconfig)
778 {
779 #ifdef _MEMPOOL
780     mempool_init();
781 #endif
782 
783     fdb_config config;
784     fdb_file_handle *fhandle;
785     fdb_kvs_handle *handle;
786     LATENCY_STAT_START();
787 
788     if (fconfig) {
789         if (validate_fdb_config(fconfig)) {
790             config = *fconfig;
791         } else {
792             return FDB_RESULT_INVALID_CONFIG;
793         }
794     } else {
795         config = get_default_config();
796     }
797 
798     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
799     if (!fhandle) { // LCOV_EXCL_START
800         return FDB_RESULT_ALLOC_FAIL;
801     } // LCOV_EXCL_STOP
802 
803     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
804     if (!handle) { // LCOV_EXCL_START
805         free(fhandle);
806         return FDB_RESULT_ALLOC_FAIL;
807     } // LCOV_EXCL_STOP
808 
809 #ifndef SPIN_INITIALIZER
810     init_initial_lock_status();
811 #endif
812 
813     spin_lock(&initial_lock);
814     fdb_open_inprog++;
815     spin_unlock(&initial_lock);
816 
817     atomic_init_uint8_t(&handle->handle_busy, 0);
818     handle->shandle = NULL;
819     handle->kvs_config = get_default_kvs_config();
820 
821     fdb_status fs = fdb_init(fconfig);
822     if (fs != FDB_RESULT_SUCCESS) {
823         free(handle);
824         free(fhandle);
825         spin_lock(&initial_lock);
826         fdb_open_inprog--;
827         spin_unlock(&initial_lock);
828         return fs;
829     }
830     fdb_file_handle_init(fhandle, handle);
831 
832     fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
833     if (fs == FDB_RESULT_SUCCESS) {
834         *ptr_fhandle = fhandle;
835         filemgr_fhandle_add(handle->file, fhandle);
836         LATENCY_STAT_END(handle->file, FDB_LATENCY_OPEN);
837     } else {
838         *ptr_fhandle = NULL;
839         free(handle);
840         fdb_file_handle_free(fhandle);
841     }
842     spin_lock(&initial_lock);
843     fdb_open_inprog--;
844     spin_unlock(&initial_lock);
845     return fs;
846 }
847 
848 LIBFDB_API
fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig, size_t num_functions, char **kvs_names, fdb_custom_cmp_variable *functions)849 fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
850                                const char *filename,
851                                fdb_config *fconfig,
852                                size_t num_functions,
853                                char **kvs_names,
854                                fdb_custom_cmp_variable *functions)
855 {
856 #ifdef _MEMPOOL
857     mempool_init();
858 #endif
859 
860     fdb_config config;
861     fdb_file_handle *fhandle;
862     fdb_kvs_handle *handle;
863 
864     if (fconfig) {
865         if (validate_fdb_config(fconfig)) {
866             config = *fconfig;
867         } else {
868             return FDB_RESULT_INVALID_CONFIG;
869         }
870     } else {
871         config = get_default_config();
872     }
873 
874     if (config.multi_kv_instances == false) {
875         // single KV instance mode does not support customized cmp function
876         return FDB_RESULT_INVALID_CONFIG;
877     }
878 
879     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
880     if (!fhandle) { // LCOV_EXCL_START
881         return FDB_RESULT_ALLOC_FAIL;
882     } // LCOV_EXCL_STOP
883 
884     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
885     if (!handle) { // LCOV_EXCL_START
886         free(fhandle);
887         return FDB_RESULT_ALLOC_FAIL;
888     } // LCOV_EXCL_STOP
889 
890 #ifndef SPIN_INITIALIZER
891     init_initial_lock_status();
892 #endif
893 
894     spin_lock(&initial_lock);
895     fdb_open_inprog++;
896     spin_unlock(&initial_lock);
897 
898     atomic_init_uint8_t(&handle->handle_busy, 0);
899     handle->shandle = NULL;
900     handle->kvs_config = get_default_kvs_config();
901 
902     fdb_status fs = fdb_init(fconfig);
903     if (fs != FDB_RESULT_SUCCESS) {
904         free(handle);
905         free(fhandle);
906         spin_lock(&initial_lock);
907         fdb_open_inprog--;
908         spin_unlock(&initial_lock);
909         return fs;
910     }
911     fdb_file_handle_init(fhandle, handle);
912 
913     // insert kvs_names and functions into fhandle's list
914     fdb_file_handle_parse_cmp_func(fhandle, num_functions,
915                                    kvs_names, functions);
916 
917     fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
918     if (fs == FDB_RESULT_SUCCESS) {
919         *ptr_fhandle = fhandle;
920         filemgr_fhandle_add(handle->file, fhandle);
921     } else {
922         *ptr_fhandle = NULL;
923         free(handle);
924         fdb_file_handle_free(fhandle);
925     }
926     spin_lock(&initial_lock);
927     fdb_open_inprog--;
928     spin_unlock(&initial_lock);
929     return fs;
930 }
931 
fdb_open_for_compactor(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig, struct list *cmp_func_list)932 fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
933                                   const char *filename,
934                                   fdb_config *fconfig,
935                                   struct list *cmp_func_list)
936 {
937 #ifdef _MEMPOOL
938     mempool_init();
939 #endif
940 
941     fdb_file_handle *fhandle;
942     fdb_kvs_handle *handle;
943 
944     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
945     if (!fhandle) { // LCOV_EXCL_START
946         return FDB_RESULT_ALLOC_FAIL;
947     } // LCOV_EXCL_STOP
948 
949     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
950     if (!handle) { // LCOV_EXCL_START
951         free(fhandle);
952         return FDB_RESULT_ALLOC_FAIL;
953     } // LCOV_EXCL_STOP
954 
955     atomic_init_uint8_t(&handle->handle_busy, 0);
956     handle->shandle = NULL;
957 
958     fdb_file_handle_init(fhandle, handle);
959     if (cmp_func_list && list_begin(cmp_func_list)) {
960         fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
961     }
962     fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
963     if (fs == FDB_RESULT_SUCCESS) {
964         *ptr_fhandle = fhandle;
965         filemgr_fhandle_add(handle->file, fhandle);
966     } else {
967         *ptr_fhandle = NULL;
968         free(handle);
969         fdb_file_handle_free(fhandle);
970     }
971     return fs;
972 }
973 
974 LIBFDB_API
fdb_snapshot_open(fdb_kvs_handle *handle_in, fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)975 fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
976                              fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
977 {
978 #ifdef _MEMPOOL
979     mempool_init();
980 #endif
981 
982     if (!handle_in || !ptr_handle) {
983         return FDB_RESULT_INVALID_HANDLE;
984     }
985 
986     fdb_config config = handle_in->config;
987     fdb_kvs_config kvs_config = handle_in->kvs_config;
988     fdb_kvs_id_t kv_id = 0;
989     fdb_kvs_handle *handle;
990     fdb_txn *txn = NULL;
991     fdb_status fs = FDB_RESULT_SUCCESS;
992     filemgr *file;
993     file_status_t fstatus = FILE_NORMAL;
994     struct snap_handle dummy_shandle;
995     struct _fdb_key_cmp_info cmp_info;
996     LATENCY_STAT_START();
997 
998 fdb_snapshot_open_start:
999     if (!handle_in->shandle) {
1000         fdb_check_file_reopen(handle_in, &fstatus);
1001         fdb_sync_db_header(handle_in);
1002         file = handle_in->file;
1003 
1004         if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
1005             handle_in->seqnum = fdb_kvs_get_seqnum(file,
1006                                                    handle_in->kvs->id);
1007         } else {
1008             handle_in->seqnum = filemgr_get_seqnum(file);
1009         }
1010     } else {
1011         file = handle_in->file;
1012     }
1013 
1014     // if the max sequence number seen by this handle is lower than the
1015     // requested snapshot marker, it means the snapshot is not yet visible
1016     // even via the current fdb_kvs_handle
1017     if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
1018         return FDB_RESULT_NO_DB_INSTANCE;
1019     }
1020 
1021     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1022     if (!handle) { // LCOV_EXCL_START
1023         return FDB_RESULT_ALLOC_FAIL;
1024     } // LCOV_EXCL_STOP
1025 
1026     atomic_init_uint8_t(&handle->handle_busy, 0);
1027     handle->log_callback = handle_in->log_callback;
1028     handle->max_seqnum = seqnum;
1029     handle->fhandle = handle_in->fhandle;
1030 
1031     config.flags |= FDB_OPEN_FLAG_RDONLY;
1032     // do not perform compaction for snapshot
1033     config.compaction_mode = FDB_COMPACTION_MANUAL;
1034 
1035     // If cloning an existing snapshot handle, then rewind indexes
1036     // to its last DB header and point its avl tree to existing snapshot's tree
1037     bool clone_snapshot = false;
1038     if (handle_in->shandle) {
1039         atomic_store_uint64_t(&handle->last_hdr_bid,  // do fast rewind
1040                               atomic_get_uint64_t(&handle_in->last_hdr_bid));
1041         fs = wal_snapshot_clone(handle_in->shandle, &handle->shandle, seqnum);
1042         if (fs == FDB_RESULT_SUCCESS) {
1043             clone_snapshot = true;
1044             handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
1045         } else {
1046             fdb_log(&handle_in->log_callback, fs,
1047                     "Warning: Snapshot clone at sequence number %" _F64
1048                     "does not match its snapshot handle %" _F64
1049                     "in file '%s'.", seqnum, handle_in->seqnum,
1050                     handle_in->file->filename);
1051             free(handle);
1052             return fs;
1053         }
1054     }
1055 
1056     cmp_info.kvs_config = handle_in->kvs_config;
1057     cmp_info.kvs = handle_in->kvs;
1058 
1059     if (!handle->shandle) {
1060         txn = handle_in->fhandle->root->txn;
1061         if (!txn) {
1062             txn = &handle_in->file->global_txn;
1063         }
1064         if (handle_in->kvs) {
1065             kv_id = handle_in->kvs->id;
1066         }
1067         if (seqnum == FDB_SNAPSHOT_INMEM) {
1068             memset(&dummy_shandle, 0, sizeof(struct snap_handle));
1069             // tmp value to denote snapshot & not rollback to _fdb_open
1070             handle->shandle = &dummy_shandle; // dummy
1071         } else {
1072             fs = wal_dur_snapshot_open(seqnum, &cmp_info, file, txn,
1073                                        &handle->shandle);
1074         }
1075         if (fs != FDB_RESULT_SUCCESS) {
1076             free(handle);
1077             return fs;
1078         }
1079     }
1080 
1081     if (handle_in->kvs) {
1082         // sub-handle in multi KV instance mode
1083         if (clone_snapshot) {
1084             fs = _fdb_kvs_clone_snapshot(handle_in, handle);
1085         } else {
1086             fs = _fdb_kvs_open(handle_in->kvs->root,
1087                               &config, &kvs_config, file,
1088                               file->filename,
1089                               _fdb_kvs_get_name(handle_in, file),
1090                               handle);
1091         }
1092     } else {
1093         if (clone_snapshot) {
1094             fs = _fdb_clone_snapshot(handle_in, handle);
1095         } else {
1096             fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1097         }
1098     }
1099 
1100     if (fs == FDB_RESULT_SUCCESS) {
1101         if (seqnum == FDB_SNAPSHOT_INMEM &&
1102             !handle_in->shandle) {
1103             handle->max_seqnum = handle_in->seqnum;
1104 
1105             // synchronize dirty root nodes if exist
1106             bid_t dirty_idtree_root = BLK_NOT_FOUND;
1107             bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1108             struct filemgr_dirty_update_node *dirty_update;
1109 
1110             dirty_update = filemgr_dirty_update_get_latest(handle->file);
1111             btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1112 
1113             if (dirty_update) {
1114                 filemgr_dirty_update_get_root(handle->file, dirty_update,
1115                                        &dirty_idtree_root, &dirty_seqtree_root);
1116                 _fdb_import_dirty_root(handle, dirty_idtree_root,
1117                                        dirty_seqtree_root);
1118                 btreeblk_discard_blocks(handle->bhandle);
1119             }
1120             // Having synced the dirty root, make an in-memory WAL snapshot
1121             // TODO: Re-enable WAL sharing once ready...
1122 #ifdef _MVCC_WAL_ENABLE
1123             fs = wal_snapshot_open(handle->file, txn, kv_id, seqnum,
1124                                    &cmp_info, &handle->shandle);
1125 #else
1126             fs = wal_dur_snapshot_open(handle->seqnum, &cmp_info, file, txn,
1127                                        &handle->shandle);
1128             if (fs == FDB_RESULT_SUCCESS) {
1129                 fs = wal_copyto_snapshot(file, handle->shandle,
1130                                         (bool)handle_in->kvs);
1131             }
1132             (void)kv_id;
1133 #endif // _MVCC_WAL_ENABLE
1134         } else if (clone_snapshot) {
1135             // Snapshot is created on the other snapshot handle
1136 
1137             handle->max_seqnum = handle_in->seqnum;
1138 
1139             if (seqnum == FDB_SNAPSHOT_INMEM) {
1140                 // in-memory snapshot
1141                 // Clone dirty root nodes from the source snapshot by incrementing
1142                 // their ref counters
1143                 handle->trie->root_bid = handle_in->trie->root_bid;
1144                 if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1145                     if (handle->kvs) {
1146                         handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
1147                     } else {
1148                         handle->seqtree->root_bid = handle_in->seqtree->root_bid;
1149                     }
1150                 }
1151                 btreeblk_discard_blocks(handle->bhandle);
1152 
1153                 // increase ref count for dirty update
1154                 struct filemgr_dirty_update_node *dirty_update;
1155                 dirty_update = btreeblk_get_dirty_update(handle_in->bhandle);
1156                 filemgr_dirty_update_inc_ref_count(dirty_update);
1157                 btreeblk_set_dirty_update(handle->bhandle, dirty_update);
1158             }
1159         }
1160         *ptr_handle = handle;
1161     } else {
1162         *ptr_handle = NULL;
1163         if (clone_snapshot || seqnum != FDB_SNAPSHOT_INMEM) {
1164             wal_snapshot_close(handle->shandle, handle->file);
1165         }
1166         free(handle);
1167         // If compactor thread had finished compaction just before this routine
1168         // calls _fdb_open, then it is possible that the snapshot's DB header
1169         // is only present in the new_file. So we must retry the snapshot
1170         // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
1171         if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
1172             if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
1173                 goto fdb_snapshot_open_start;
1174             }
1175         }
1176     }
1177 
1178     if (handle_in->shandle) {
1179         LATENCY_STAT_END(file, FDB_LATENCY_SNAP_CLONE);
1180     } else if (seqnum == FDB_SNAPSHOT_INMEM) {
1181         LATENCY_STAT_END(file, FDB_LATENCY_SNAP_INMEM);
1182     } else {
1183         LATENCY_STAT_END(file, FDB_LATENCY_SNAP_DUR);
1184     }
1185     return fs;
1186 }
1187 
1188 static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
1189 
1190 LIBFDB_API
fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)1191 fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1192 {
1193 #ifdef _MEMPOOL
1194     mempool_init();
1195 #endif
1196 
1197     fdb_config config;
1198     fdb_kvs_handle *handle_in, *handle;
1199     fdb_status fs;
1200     fdb_seqnum_t old_seqnum;
1201 
1202     if (!handle_ptr) {
1203         return FDB_RESULT_INVALID_HANDLE;
1204     }
1205 
1206     handle_in = *handle_ptr;
1207 
1208     if (!handle_in) {
1209         return FDB_RESULT_INVALID_HANDLE;
1210     }
1211 
1212     config = handle_in->config;
1213 
1214     if (handle_in->kvs) {
1215         return fdb_kvs_rollback(handle_ptr, seqnum);
1216     }
1217 
1218     if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1219         return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
1220                        "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1221                        handle_in->file->filename);
1222     }
1223 
1224     if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
1225         return FDB_RESULT_HANDLE_BUSY;
1226     }
1227 
1228     filemgr_mutex_lock(handle_in->file);
1229     filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1230     // All transactions should be closed before rollback
1231     if (wal_txn_exists(handle_in->file)) {
1232         filemgr_set_rollback(handle_in->file, 0);
1233         filemgr_mutex_unlock(handle_in->file);
1234         atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1235         return FDB_RESULT_FAIL_BY_TRANSACTION;
1236     }
1237 
1238     // If compaction is running, wait until it is aborted.
1239     // TODO: Find a better way of waiting for the compaction abortion.
1240     unsigned int sleep_time = 10000; // 10 ms.
1241     file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1242     while (fstatus == FILE_COMPACT_OLD) {
1243         filemgr_mutex_unlock(handle_in->file);
1244         decaying_usleep(&sleep_time, 1000000);
1245         filemgr_mutex_lock(handle_in->file);
1246         fstatus = filemgr_get_file_status(handle_in->file);
1247     }
1248     if (fstatus == FILE_REMOVED_PENDING) {
1249         filemgr_mutex_unlock(handle_in->file);
1250         fdb_check_file_reopen(handle_in, NULL);
1251     } else {
1252         filemgr_mutex_unlock(handle_in->file);
1253     }
1254 
1255     fdb_sync_db_header(handle_in);
1256 
1257     // if the max sequence number seen by this handle is lower than the
1258     // requested snapshot marker, it means the snapshot is not yet visible
1259     // even via the current fdb_kvs_handle
1260     if (seqnum > handle_in->seqnum) {
1261         filemgr_set_rollback(handle_in->file, 0); // allow mutations
1262         atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1263         return FDB_RESULT_NO_DB_INSTANCE;
1264     }
1265 
1266     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1267     if (!handle) { // LCOV_EXCL_START
1268         atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1269         return FDB_RESULT_ALLOC_FAIL;
1270     } // LCOV_EXCL_STOP
1271 
1272     atomic_init_uint8_t(&handle->handle_busy, 0);
1273     handle->log_callback = handle_in->log_callback;
1274     handle->fhandle = handle_in->fhandle;
1275     if (seqnum == 0) {
1276         fs = _fdb_reset(handle, handle_in);
1277     } else {
1278         handle->max_seqnum = seqnum;
1279         fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1280                        &config);
1281     }
1282 
1283     filemgr_set_rollback(handle_in->file, 0); // allow mutations
1284     if (fs == FDB_RESULT_SUCCESS) {
1285         // rollback the file's sequence number
1286         filemgr_mutex_lock(handle_in->file);
1287         old_seqnum = filemgr_get_seqnum(handle_in->file);
1288         filemgr_set_seqnum(handle_in->file, seqnum);
1289         filemgr_mutex_unlock(handle_in->file);
1290 
1291         fs = _fdb_commit(handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
1292                 !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
1293         if (fs == FDB_RESULT_SUCCESS) {
1294             if (handle_in->txn) {
1295                 handle->txn = handle_in->txn;
1296                 handle_in->txn = NULL;
1297             }
1298             // Close, unlink and free the caller's rollback handle.
1299             _fdb_kvs_close(handle_in);
1300             free(handle_in);
1301             // Link the newly opened handle into the file handle's list
1302             _fdb_kvs_createNLinkKVHandle(handle->fhandle, handle);
1303             handle->max_seqnum = 0;
1304             handle->seqnum = seqnum;
1305             // Set the newly opened rolled-back handle as caller's handle
1306             *handle_ptr = handle;
1307         } else {
1308             // cancel the rolling-back of the sequence number
1309             filemgr_mutex_lock(handle_in->file);
1310             filemgr_set_seqnum(handle_in->file, old_seqnum);
1311             filemgr_mutex_unlock(handle_in->file);
1312             free(handle);
1313             atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1314         }
1315     } else {
1316         free(handle);
1317         atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0);
1318     }
1319 
1320     return fs;
1321 }
1322 
1323 LIBFDB_API
fdb_rollback_all(fdb_file_handle *fhandle, fdb_snapshot_marker_t marker)1324 fdb_status fdb_rollback_all(fdb_file_handle *fhandle,
1325                             fdb_snapshot_marker_t marker)
1326 {
1327 #ifdef _MEMPOOL
1328     mempool_init();
1329 #endif
1330 
1331     fdb_config config;
1332     fdb_kvs_handle *super_handle;
1333     fdb_kvs_handle rhandle;
1334     fdb_kvs_handle *handle = &rhandle;
1335     struct filemgr *file;
1336     fdb_kvs_config kvs_config;
1337     fdb_status fs;
1338     err_log_callback log_callback;
1339     struct kvs_info *kvs;
1340     struct snap_handle shandle; // dummy snap handle
1341 
1342     if (!fhandle) {
1343         return FDB_RESULT_INVALID_HANDLE;
1344     }
1345 
1346     super_handle = fhandle->root;
1347     kvs = super_handle->kvs;
1348 
1349     // fdb_rollback_all cannot be allowed when there are kv store instances
1350     // still open, because we do not have means of invalidating open kv handles
1351     // which may not be present in the rollback point
1352     if (kvs && _fdb_kvs_is_busy(fhandle)) {
1353         return FDB_RESULT_KV_STORE_BUSY;
1354     }
1355     file = super_handle->file;
1356     config = super_handle->config;
1357     kvs_config = super_handle->kvs_config;
1358     log_callback = super_handle->log_callback;
1359 
1360     if (super_handle->config.flags & FDB_OPEN_FLAG_RDONLY) {
1361         return fdb_log(&super_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1362                        "Warning: Rollback is not allowed on the read-only DB file '%s'.",
1363                        super_handle->file->filename);
1364     }
1365 
1366     filemgr_mutex_lock(super_handle->file);
1367     filemgr_set_rollback(super_handle->file, 1); // disallow writes operations
1368     // All transactions should be closed before rollback
1369     if (wal_txn_exists(super_handle->file)) {
1370         filemgr_set_rollback(super_handle->file, 0);
1371         filemgr_mutex_unlock(super_handle->file);
1372         return FDB_RESULT_FAIL_BY_TRANSACTION;
1373     }
1374 
1375     // If compaction is running, wait until it is aborted.
1376     // TODO: Find a better way of waiting for the compaction abortion.
1377     unsigned int sleep_time = 10000; // 10 ms.
1378     file_status_t fstatus = filemgr_get_file_status(super_handle->file);
1379     while (fstatus == FILE_COMPACT_OLD) {
1380         filemgr_mutex_unlock(super_handle->file);
1381         decaying_usleep(&sleep_time, 1000000);
1382         filemgr_mutex_lock(super_handle->file);
1383         fstatus = filemgr_get_file_status(super_handle->file);
1384     }
1385     if (fstatus == FILE_REMOVED_PENDING) {
1386         filemgr_mutex_unlock(super_handle->file);
1387         fdb_check_file_reopen(super_handle, NULL);
1388     } else {
1389         filemgr_mutex_unlock(super_handle->file);
1390     }
1391 
1392     fdb_sync_db_header(super_handle);
1393     // Shutdown WAL discarding entries from all KV Stores..
1394     fs = wal_shutdown(super_handle->file, &super_handle->log_callback);
1395     if (fs != FDB_RESULT_SUCCESS) {
1396         return fs;
1397     }
1398 
1399     memset(handle, 0, sizeof(fdb_kvs_handle));
1400     memset(&shandle, 0, sizeof(struct snap_handle));
1401     handle->log_callback = log_callback;
1402     handle->fhandle = fhandle;
1403     // Fast rewind on open...
1404     atomic_store_uint64_t(&handle->last_hdr_bid, (bid_t)marker);
1405     handle->max_seqnum = FDB_SNAPSHOT_INMEM; // Prevent WAL restore on open
1406     handle->shandle = &shandle; // a dummy handle to prevent WAL restore
1407     if (kvs) {
1408         fdb_kvs_header_free(file); // KV header will be recreated below.
1409         handle->kvs = kvs; // re-use super_handle's kvs info
1410         handle->kvs_config = kvs_config;
1411     }
1412     handle->config = config;
1413 
1414     fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1415 
1416     if (handle->config.multi_kv_instances) {
1417         filemgr_mutex_lock(handle->file);
1418         fdb_kvs_header_create(handle->file);
1419         fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
1420                             handle->kv_info_offset,
1421                             handle->file->version, false);
1422         filemgr_mutex_unlock(handle->file);
1423     }
1424 
1425     filemgr_set_rollback(file, 0); // allow mutations
1426     handle->shandle = NULL; // just a dummy handle never allocated
1427 
1428     if (fs == FDB_RESULT_SUCCESS) {
1429         fdb_seqnum_t old_seqnum;
1430         // Restore WAL for all KV instances...
1431         _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, (bid_t)marker, 0);
1432 
1433         // rollback the file's sequence number
1434         filemgr_mutex_lock(file);
1435         old_seqnum = filemgr_get_seqnum(file);
1436         filemgr_set_seqnum(file, handle->seqnum);
1437         filemgr_mutex_unlock(file);
1438 
1439         fs = _fdb_commit(handle, FDB_COMMIT_NORMAL,
1440                          !(handle->config.durability_opt & FDB_DRB_ASYNC));
1441         if (fs == FDB_RESULT_SUCCESS) {
1442             _fdb_close(super_handle);
1443             *super_handle = *handle;
1444         } else {
1445             filemgr_mutex_lock(file);
1446             filemgr_set_seqnum(file, old_seqnum);
1447             filemgr_mutex_unlock(file);
1448         }
1449     } else { // Rollback failed, restore KV header
1450         fdb_kvs_header_create(file);
1451         fdb_kvs_header_read(file->kv_header, super_handle->dhandle,
1452                             super_handle->kv_info_offset,
1453                             ver_get_latest_magic(),
1454                             false);
1455     }
1456 
1457     return fs;
1458 }
1459 
_fdb_init_file_config(const fdb_config *config, struct filemgr_config *fconfig)1460 static void _fdb_init_file_config(const fdb_config *config,
1461                                   struct filemgr_config *fconfig) {
1462     fconfig->blocksize = config->blocksize;
1463     fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1464     fconfig->chunksize = config->chunksize;
1465 
1466     fconfig->options = 0x0;
1467     fconfig->seqtree_opt = config->seqtree_opt;
1468 
1469     if (config->flags & FDB_OPEN_FLAG_CREATE) {
1470         fconfig->options |= FILEMGR_CREATE;
1471     }
1472     if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1473         fconfig->options |= FILEMGR_READONLY;
1474     }
1475     if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1476         fconfig->options |= FILEMGR_SYNC;
1477     }
1478 
1479     fconfig->flag = 0x0;
1480     if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1481         config->buffercache_size) {
1482         fconfig->flag |= _ARCH_O_DIRECT;
1483     }
1484 
1485     fconfig->prefetch_duration = config->prefetch_duration;
1486     fconfig->num_wal_shards = config->num_wal_partitions;
1487     fconfig->num_bcache_shards = config->num_bcache_partitions;
1488     fconfig->encryption_key = config->encryption_key;
1489     atomic_store_uint64_t(&fconfig->block_reusing_threshold,
1490                           config->block_reusing_threshold,
1491                           std::memory_order_relaxed);
1492     atomic_store_uint64_t(&fconfig->num_keeping_headers,
1493                           config->num_keeping_headers,
1494                           std::memory_order_relaxed);
1495 }
1496 
_fdb_clone_snapshot(fdb_kvs_handle *handle_in, fdb_kvs_handle *handle_out)1497 fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1498                                fdb_kvs_handle *handle_out)
1499 {
1500     fdb_status status;
1501 
1502     handle_out->config = handle_in->config;
1503     handle_out->kvs_config = handle_in->kvs_config;
1504     handle_out->fileops = handle_in->fileops;
1505     handle_out->file = handle_in->file;
1506     // Note that the file ref count will be decremented when the cloned snapshot
1507     // is closed through filemgr_close().
1508     filemgr_incr_ref_count(handle_out->file);
1509 
1510     bool filename_allocated = false;
1511     if (handle_out->filename) {
1512         handle_out->filename = (char *)realloc(handle_out->filename,
1513                                                strlen(handle_in->filename)+1);
1514     } else {
1515         handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1516         filename_allocated = true;
1517     }
1518     strcpy(handle_out->filename, handle_in->filename);
1519 
1520     // initialize the docio handle.
1521     handle_out->dhandle = (struct docio_handle *)
1522         calloc(1, sizeof(struct docio_handle));
1523     handle_out->dhandle->log_callback = &handle_out->log_callback;
1524     status = docio_init(handle_out->dhandle, handle_out->file,
1525                         handle_out->config.compress_document_body);
1526     if (status != FDB_RESULT_SUCCESS) {
1527         free(handle_out->dhandle);
1528         if (filename_allocated) {
1529             free(handle_out->filename);
1530         }
1531         return status;
1532     }
1533 
1534     // initialize the btree block handle.
1535     handle_out->btreeblkops = btreeblk_get_ops();
1536     handle_out->bhandle = (struct btreeblk_handle *)
1537         calloc(1, sizeof(struct btreeblk_handle));
1538     handle_out->bhandle->log_callback = &handle_out->log_callback;
1539     btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1540 
1541     handle_out->dirty_updates = handle_in->dirty_updates;
1542     atomic_store_uint64_t(&handle_out->cur_header_revnum, handle_in->cur_header_revnum);
1543     handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1544     handle_out->kv_info_offset = handle_in->kv_info_offset;
1545     handle_out->op_stats = handle_in->op_stats;
1546 
1547     // initialize the trie handle
1548     handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1549     hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1550                 handle_out->file->blocksize,
1551                 handle_in->trie->root_bid, // Source snapshot's trie root bid
1552                 (void *)handle_out->bhandle, handle_out->btreeblkops,
1553                 (void *)handle_out->dhandle, _fdb_readkey_wrap);
1554     // set aux for cmp wrapping function
1555     hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1556     hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1557 
1558     if (handle_out->kvs) {
1559         hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1560     }
1561 
1562     handle_out->seqnum = handle_in->seqnum;
1563     if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1564         if (handle_out->config.multi_kv_instances) {
1565             // multi KV instance mode .. HB+trie
1566             handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1567             hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1568                         handle_out->file->blocksize,
1569                         handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1570                         (void *)handle_out->bhandle, handle_out->btreeblkops,
1571                         (void *)handle_out->dhandle, _fdb_readseq_wrap);
1572 
1573         } else {
1574             // single KV instance mode .. normal B+tree
1575             struct btree_kv_ops *seq_kv_ops =
1576                 (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1577             seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1578             seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1579 
1580             handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1581             // Init the seq tree using the root bid of the source snapshot.
1582             btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1583                                 handle_out->btreeblkops, seq_kv_ops,
1584                                 handle_out->config.blocksize,
1585                                 handle_in->seqtree->root_bid);
1586         }
1587     } else{
1588         handle_out->seqtree = NULL;
1589     }
1590 
1591     status = btreeblk_end(handle_out->bhandle);
1592     if (status != FDB_RESULT_SUCCESS) {
1593         const char *msg = "Snapshot clone operation fails due to the errors in "
1594             "btreeblk_end() in a database file '%s'\n";
1595         fdb_log(&handle_in->log_callback, status, msg, handle_in->file->filename);
1596     }
1597 
1598     return status;
1599 }
1600 
_fdb_open(fdb_kvs_handle *handle, const char *filename, fdb_filename_mode_t filename_mode, const fdb_config *config)1601 fdb_status _fdb_open(fdb_kvs_handle *handle,
1602                      const char *filename,
1603                      fdb_filename_mode_t filename_mode,
1604                      const fdb_config *config)
1605 {
1606     struct filemgr_config fconfig;
1607     struct kvs_stat stat, empty_stat;
1608     bid_t trie_root_bid = BLK_NOT_FOUND;
1609     bid_t seq_root_bid = BLK_NOT_FOUND;
1610     bid_t stale_root_bid = BLK_NOT_FOUND;
1611     fdb_seqnum_t seqnum = 0;
1612     filemgr_header_revnum_t header_revnum = 0;
1613     filemgr_header_revnum_t latest_header_revnum = 0;
1614     fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1615     uint64_t ndocs = 0;
1616     uint64_t ndeletes = 0;
1617     uint64_t datasize = 0;
1618     uint64_t deltasize = 0;
1619     uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1620     uint64_t kv_info_offset = BLK_NOT_FOUND;
1621     uint64_t version;
1622     uint64_t header_flags = 0;
1623     uint8_t header_buf[FDB_BLOCKSIZE];
1624     char *compacted_filename = NULL;
1625     char *prev_filename = NULL;
1626     size_t header_len = 0;
1627     bool multi_kv_instances = config->multi_kv_instances;
1628 
1629     uint64_t nlivenodes = 0;
1630     bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1631     char actual_filename[FDB_MAX_FILENAME_LEN];
1632     char virtual_filename[FDB_MAX_FILENAME_LEN];
1633     char *target_filename = NULL;
1634     fdb_status status;
1635 
1636     if (filename == NULL) {
1637         return FDB_RESULT_INVALID_ARGS;
1638     }
1639     if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1640         // filename (including path) length is supported up to
1641         // (FDB_MAX_FILENAME_LEN - 8) bytes.
1642         return FDB_RESULT_TOO_LONG_FILENAME;
1643     }
1644 
1645     if (filename_mode == FDB_VFILENAME &&
1646         !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1647         return FDB_RESULT_INVALID_COMPACTION_MODE;
1648     }
1649 
1650     _fdb_init_file_config(config, &fconfig);
1651 
1652     if (filename_mode == FDB_VFILENAME) {
1653         compactor_get_actual_filename(filename, actual_filename,
1654                                       config->compaction_mode, &handle->log_callback);
1655     } else {
1656         strcpy(actual_filename, filename);
1657     }
1658 
1659     if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1660          (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1661           filename_mode == FDB_VFILENAME) ) {
1662         // 1) manual compaction mode, OR
1663         // 2) auto compaction mode + 'filename' is virtual filename
1664         // -> copy 'filename'
1665         target_filename = (char *)filename;
1666     } else {
1667         // otherwise (auto compaction mode + 'filename' is actual filename)
1668         // -> copy 'virtual_filename'
1669         compactor_get_virtual_filename(filename, virtual_filename);
1670         target_filename = virtual_filename;
1671     }
1672 
1673     // If the user is requesting legacy CRC pass that down to filemgr
1674     if(config->flags & FDB_OPEN_WITH_LEGACY_CRC) {
1675         fconfig.options |= FILEMGR_CREATE_CRC32;
1676     }
1677 
1678     handle->fileops = get_filemgr_ops();
1679     filemgr_open_result result = filemgr_open((char *)actual_filename,
1680                                               handle->fileops,
1681                                               &fconfig, &handle->log_callback);
1682     if (result.rv != FDB_RESULT_SUCCESS) {
1683         return (fdb_status) result.rv;
1684     }
1685     handle->file = result.file;
1686 
1687     if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1688         strcmp(filename, actual_filename)) {
1689         // It is in-place compacted file if
1690         // 1) compaction mode is manual, and
1691         // 2) actual filename is different to the filename given by user.
1692         // In this case, set the in-place compaction flag.
1693         filemgr_set_in_place_compaction(handle->file, true);
1694     }
1695     if (filemgr_is_in_place_compaction_set(handle->file)) {
1696         // This file was in-place compacted.
1697         // set 'handle->filename' to the original filename to trigger file renaming
1698         compactor_get_virtual_filename(filename, virtual_filename);
1699         target_filename = virtual_filename;
1700     }
1701 
1702     if (handle->filename) {
1703         handle->filename = (char *)realloc(handle->filename,
1704                                            strlen(target_filename)+1);
1705     } else {
1706         handle->filename = (char*)malloc(strlen(target_filename)+1);
1707     }
1708     strcpy(handle->filename, target_filename);
1709 
1710     // If cloning from a snapshot handle, fdb_snapshot_open would have already
1711     // set handle->last_hdr_bid to the block id of required header, so rewind..
1712     bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
1713     if (handle->shandle && last_hdr_bid) {
1714         status = filemgr_fetch_header(handle->file, last_hdr_bid,
1715                                       header_buf, &header_len, &seqnum,
1716                                       &latest_header_revnum, &deltasize, &version,
1717                                       NULL, &handle->log_callback);
1718         if (status != FDB_RESULT_SUCCESS) {
1719             free(handle->filename);
1720             handle->filename = NULL;
1721             filemgr_close(handle->file, false, handle->filename,
1722                               &handle->log_callback);
1723             return status;
1724         }
1725     } else { // Normal open
1726         filemgr_get_header(handle->file, header_buf, &header_len,
1727                            &last_hdr_bid, &seqnum, &latest_header_revnum);
1728         atomic_store_uint64_t(&handle->last_hdr_bid, last_hdr_bid);
1729         version = handle->file->version;
1730     }
1731 
1732     // initialize the docio handle so kv headers may be read
1733     handle->dhandle = (struct docio_handle *)
1734                       calloc(1, sizeof(struct docio_handle));
1735     handle->dhandle->log_callback = &handle->log_callback;
1736     status = docio_init(handle->dhandle, handle->file,
1737                         config->compress_document_body);
1738     if (status != FDB_RESULT_SUCCESS) {
1739         free(handle->dhandle);
1740         free(handle->filename);
1741         handle->filename = NULL;
1742         filemgr_close(handle->file, false, handle->filename,
1743                           &handle->log_callback);
1744         return status;
1745     }
1746 
1747     // fetch previous superblock bitmap info if exists
1748     // (this should be done after 'handle->dhandle' is initialized)
1749     if (handle->file->sb) {
1750         status = sb_bmp_fetch_doc(handle);
1751         if (status != FDB_RESULT_SUCCESS) {
1752             docio_free(handle->dhandle);
1753             free(handle->dhandle);
1754             free(handle->filename);
1755             handle->filename = NULL;
1756             filemgr_close(handle->file, false, handle->filename,
1757                               &handle->log_callback);
1758             return status;
1759         }
1760     }
1761 
1762 
1763     if (header_len > 0) {
1764         fdb_fetch_header(version, header_buf, &trie_root_bid, &seq_root_bid,
1765                          &stale_root_bid, &ndocs, &ndeletes, &nlivenodes,
1766                          &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1767                          &header_flags, &compacted_filename, &prev_filename);
1768         // use existing setting for seqtree_opt
1769         if (header_flags & FDB_FLAG_SEQTREE_USE) {
1770             seqtree_opt = FDB_SEQTREE_USE;
1771         } else {
1772             seqtree_opt = FDB_SEQTREE_NOT_USE;
1773         }
1774         // Retrieve seqnum for multi-kv mode
1775         if (handle->kvs && handle->kvs->id > 0) {
1776             if (kv_info_offset != BLK_NOT_FOUND) {
1777                 if (!filemgr_get_kv_header(handle->file)) {
1778                     struct kvs_header *kv_header;
1779                     _fdb_kvs_header_create(&kv_header);
1780                     // KV header already exists but not loaded .. read & import
1781                     fdb_kvs_header_read(kv_header, handle->dhandle,
1782                                         kv_info_offset, version, false);
1783                     if (!filemgr_set_kv_header(handle->file, kv_header,
1784                                                fdb_kvs_header_free)) {
1785                         _fdb_kvs_header_free(kv_header);
1786                     }
1787                 }
1788                 seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1789                                              handle->kvs->id);
1790             } else { // no kv_info offset, ok to set seqnum to zero
1791                 seqnum = 0;
1792             }
1793         }
1794         // other flags
1795         if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1796             handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1797         }
1798         if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1799             handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1800         }
1801         if (header_flags & FDB_FLAG_SUCCESSFULLY_COMPACTED) {
1802             filemgr_set_successfully_compacted(handle->file);
1803         }
1804         // use existing setting for multi KV instance mode
1805         if (kv_info_offset == BLK_NOT_FOUND) {
1806             multi_kv_instances = false;
1807         } else {
1808             multi_kv_instances = true;
1809         }
1810     }
1811 
1812     handle->config = *config;
1813     handle->config.seqtree_opt = seqtree_opt;
1814     handle->config.multi_kv_instances = multi_kv_instances;
1815 
1816     if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1817         // Either an in-memory snapshot or cloning from an existing snapshot..
1818         hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1819                      // *_open() should have already restored it
1820     } else { // Persisted snapshot or file rollback..
1821 
1822         // get the BID of the latest block
1823         // (it is OK if the block is not a DB header)
1824         bool dirty_data_exists = false;
1825         struct superblock *sb = handle->file->sb;
1826 
1827         if (sb_bmp_exists(sb)) {
1828             dirty_data_exists = false;
1829             bid_t sb_last_hdr_bid = atomic_get_uint64_t(&sb->last_hdr_bid);
1830             if (sb_last_hdr_bid != BLK_NOT_FOUND) {
1831                 // add 1 since we subtract 1 from 'hdr_bid' below soon
1832                 hdr_bid = sb_last_hdr_bid + 1;
1833                 if (atomic_get_uint64_t(&sb->cur_alloc_bid) != hdr_bid) {
1834                     // seq number has been increased since the last commit
1835                     seqnum = fdb_kvs_get_committed_seqnum(handle);
1836                 }
1837             } else {
1838                 hdr_bid = BLK_NOT_FOUND;
1839             }
1840         } else {
1841             hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1842             dirty_data_exists = (hdr_bid >
1843                         atomic_get_uint64_t(&handle->last_hdr_bid));
1844         }
1845 
1846         if (hdr_bid == BLK_NOT_FOUND ||
1847             (sb && hdr_bid <= sb->config->num_sb)) {
1848             hdr_bid = 0;
1849         } else if (hdr_bid > 0) {
1850             --hdr_bid;
1851         }
1852 
1853         if (handle->max_seqnum) {
1854             struct kvs_stat stat_ori;
1855             // backup original stats
1856             if (handle->kvs) {
1857                 _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1858             } else {
1859                 _kvs_stat_get(handle->file, 0, &stat_ori);
1860             }
1861 
1862             if (dirty_data_exists){
1863                 // uncommitted data exists beyond the last DB header
1864                 // get the last committed seq number
1865                 fdb_seqnum_t seq_commit;
1866                 seq_commit = fdb_kvs_get_committed_seqnum(handle);
1867                 if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1868                     // In case, snapshot_open is attempted with latest uncommitted
1869                     // sequence number
1870                     header_len = 0;
1871                 } else if (seq_commit == handle->max_seqnum) {
1872                     // snapshot/rollback on the latest commit header
1873                     seqnum = seq_commit; // skip file reverse scan
1874                 }
1875                 hdr_bid = filemgr_get_header_bid(handle->file);
1876             }
1877             // Reverse scan the file to locate the DB header with seqnum marker
1878             header_revnum = latest_header_revnum;
1879             while (header_len && seqnum != handle->max_seqnum) {
1880                 hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1881                                           header_buf, &header_len, &seqnum,
1882                                           &header_revnum, NULL, &version, NULL,
1883                                           &handle->log_callback);
1884                 if (header_len == 0) {
1885                     continue; // header doesn't exist
1886                 }
1887                 fdb_fetch_header(version, header_buf, &trie_root_bid,
1888                                  &seq_root_bid, &stale_root_bid,
1889                                  &ndocs, &ndeletes, &nlivenodes,
1890                                  &datasize, &last_wal_flush_hdr_bid,
1891                                  &kv_info_offset, &header_flags,
1892                                  &compacted_filename, NULL);
1893                 atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
1894 
1895                 if (!handle->kvs || handle->kvs->id == 0) {
1896                     // single KVS mode OR default KVS
1897                     if (!handle->shandle) {
1898                         // rollback
1899                         struct kvs_stat stat_dst;
1900                         _kvs_stat_get(handle->file, 0, &stat_dst);
1901                         stat_dst.ndocs = ndocs;
1902                         stat_dst.ndeletes = ndeletes;
1903                         stat_dst.datasize = datasize;
1904                         stat_dst.nlivenodes = nlivenodes;
1905                         stat_dst.deltasize = deltasize;
1906                         _kvs_stat_set(handle->file, 0, stat_dst);
1907                     }
1908                     continue;
1909                 }
1910 
1911                 int64_t doc_offset;
1912                 struct kvs_header *kv_header;
1913                 struct docio_object doc;
1914 
1915                 _fdb_kvs_header_create(&kv_header);
1916                 memset(&doc, 0, sizeof(struct docio_object));
1917                 doc_offset = docio_read_doc(handle->dhandle,
1918                                             kv_info_offset, &doc, true);
1919 
1920                 if (doc_offset <= 0) {
1921                     header_len = 0; // fail
1922                     _fdb_kvs_header_free(kv_header);
1923                 } else {
1924                     _fdb_kvs_header_import(kv_header, doc.body,
1925                                            doc.length.bodylen, version, false);
1926                     // get local sequence number for the KV instance
1927                     seqnum = _fdb_kvs_get_seqnum(kv_header,
1928                                                  handle->kvs->id);
1929                     if (!handle->shandle) {
1930                         // rollback: replace kv_header stats
1931                         // read from the current header's kv_header
1932                         struct kvs_stat stat_src, stat_dst;
1933                         _kvs_stat_get_kv_header(kv_header,
1934                                                 handle->kvs->id,
1935                                                 &stat_src);
1936                         _kvs_stat_get(handle->file,
1937                                       handle->kvs->id,
1938                                       &stat_dst);
1939                         // update ndocs, datasize, nlivenodes
1940                         // into the current file's kv_header
1941                         // Note: stats related to WAL should not be updated
1942                         //       at this time. They will be adjusted through
1943                         //       discard & restore routines below.
1944                         stat_dst.ndocs = stat_src.ndocs;
1945                         stat_dst.datasize = stat_src.datasize;
1946                         stat_dst.nlivenodes = stat_src.nlivenodes;
1947                         _kvs_stat_set(handle->file,
1948                                       handle->kvs->id,
1949                                       stat_dst);
1950                     }
1951                     _fdb_kvs_header_free(kv_header);
1952                     free_docio_object(&doc, 1, 1, 1);
1953                 }
1954             }
1955 
1956             if (header_len && // header exists
1957                 config->block_reusing_threshold > 0 && // block reuse is enabled
1958                 config->block_reusing_threshold < 100 &&
1959                 header_revnum < sb_get_min_live_revnum(handle->file)) {
1960                 // cannot perform rollback/snapshot beyond the last live header
1961                 header_len = 0;
1962             }
1963 
1964             if (!header_len) { // Marker MUST match that of DB commit!
1965                 // rollback original stats
1966                 if (handle->kvs) {
1967                     _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1968                 } else {
1969                     _kvs_stat_get(handle->file, 0, &stat_ori);
1970                 }
1971 
1972                 docio_free(handle->dhandle);
1973                 free(handle->dhandle);
1974                 free(handle->filename);
1975                 free(prev_filename);
1976                 handle->filename = NULL;
1977                 filemgr_close(handle->file, false, handle->filename,
1978                               &handle->log_callback);
1979                 return FDB_RESULT_NO_DB_INSTANCE;
1980             }
1981 
1982             if (!handle->shandle) { // Rollback mode, destroy file WAL..
1983                 if (handle->config.multi_kv_instances) {
1984                     // multi KV instance mode
1985                     // clear only WAL items belonging to the instance
1986                     wal_close_kv_ins(handle->file,
1987                                      (handle->kvs)?(handle->kvs->id):(0),
1988                                      &handle->log_callback);
1989                 } else {
1990                     wal_shutdown(handle->file, &handle->log_callback);
1991                 }
1992             }
1993         } else { // snapshot to sequence number 0 requested..
1994             if (handle->shandle) { // fdb_snapshot_open API call
1995                 if (seqnum) {
1996                     // Database currently has a non-zero seq number,
1997                     // but the snapshot was requested with a seq number zero.
1998                     docio_free(handle->dhandle);
1999                     free(handle->dhandle);
2000                     free(handle->filename);
2001                     free(prev_filename);
2002                     handle->filename = NULL;
2003                     filemgr_close(handle->file, false, handle->filename,
2004                                   &handle->log_callback);
2005                     return FDB_RESULT_NO_DB_INSTANCE;
2006                 }
2007             } // end of zero max_seqnum but non-rollback check
2008         } // end of zero max_seqnum check
2009     } // end of durable snapshot locating
2010 
2011     handle->btreeblkops = btreeblk_get_ops();
2012     handle->bhandle = (struct btreeblk_handle *)
2013                       calloc(1, sizeof(struct btreeblk_handle));
2014     handle->bhandle->log_callback = &handle->log_callback;
2015 
2016     handle->dirty_updates = 0;
2017 
2018     if (handle->config.compaction_buf_maxsize == 0) {
2019         handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
2020     }
2021 
2022     btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
2023 
2024     handle->cur_header_revnum = latest_header_revnum;
2025     if (header_revnum) {
2026         if (filemgr_is_rollback_on(handle->file)) {
2027             // rollback mode
2028             // set rollback header revnum
2029             handle->rollback_revnum = header_revnum;
2030         } else {
2031             // snapshot mode (only for snapshot)
2032             handle->cur_header_revnum = header_revnum;
2033         }
2034     }
2035     handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
2036 
2037     memset(&empty_stat, 0x0, sizeof(empty_stat));
2038     _kvs_stat_get(handle->file, 0, &stat);
2039     if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
2040         // sync (default) KVS stat with DB header
2041         stat.nlivenodes = nlivenodes;
2042         stat.ndocs = ndocs;
2043         stat.datasize = datasize;
2044         _kvs_stat_set(handle->file, 0, stat);
2045     }
2046 
2047     handle->kv_info_offset = kv_info_offset;
2048     if (handle->config.multi_kv_instances && !handle->shandle) {
2049         // multi KV instance mode
2050         filemgr_mutex_lock(handle->file);
2051         if (kv_info_offset == BLK_NOT_FOUND) {
2052             // there is no KV header .. create & initialize
2053             fdb_kvs_header_create(handle->file);
2054             // TODO: If another handle is opened before the first header is appended,
2055             // an unnecessary KV info doc is appended. We need to address it.
2056             kv_info_offset = fdb_kvs_header_append(handle);
2057         } else if (handle->file->kv_header == NULL) {
2058             // KV header already exists but not loaded .. read & import
2059             fdb_kvs_header_create(handle->file);
2060             fdb_kvs_header_read(handle->file->kv_header, handle->dhandle,
2061                                 kv_info_offset, version, false);
2062         }
2063         filemgr_mutex_unlock(handle->file);
2064 
2065         // validation check for key order of all KV stores
2066         if (handle == handle->fhandle->root) {
2067             fdb_status fs = fdb_kvs_cmp_check(handle);
2068             if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
2069                 docio_free(handle->dhandle);
2070                 free(handle->dhandle);
2071                 btreeblk_free(handle->bhandle);
2072                 free(handle->bhandle);
2073                 free(handle->filename);
2074                 handle->filename = NULL;
2075                 filemgr_close(handle->file, false, handle->filename,
2076                               &handle->log_callback);
2077                 return fs;
2078             }
2079         }
2080     }
2081     handle->kv_info_offset = kv_info_offset;
2082 
2083     if (handle->kv_info_offset != BLK_NOT_FOUND &&
2084         handle->kvs == NULL) {
2085         // multi KV instance mode .. turn on config flag
2086         handle->config.multi_kv_instances = true;
2087         // only super handle can be opened using fdb_open(...)
2088         fdb_kvs_info_create(NULL, handle, handle->file, NULL);
2089     }
2090 
2091     if (handle->shandle) { // Populate snapshot stats..
2092         if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
2093             memset(&handle->shandle->stat, 0x0,
2094                     sizeof(handle->shandle->stat));
2095             handle->shandle->stat.ndocs = ndocs;
2096             handle->shandle->stat.datasize = datasize;
2097             handle->shandle->stat.nlivenodes = nlivenodes;
2098         } else { // Multi KV instance mode, populate specific kv stats
2099             memset(&handle->shandle->stat, 0x0,
2100                     sizeof(handle->shandle->stat));
2101             _kvs_stat_get(handle->file, handle->kvs->id,
2102                     &handle->shandle->stat);
2103             // Since wal is restored below, we have to reset
2104             // wal stats to zero.
2105             handle->shandle->stat.wal_ndeletes = 0;
2106             handle->shandle->stat.wal_ndocs = 0;
2107         }
2108     }
2109 
2110     // initialize pointer to the global operational stats of this KV store
2111     handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
2112     if (!handle->op_stats) {
2113         const char *msg = "Database open fails due to the error in retrieving "
2114             "the global operational stats of KV store in a database file '%s'\n";
2115         fdb_log(&handle->log_callback, FDB_RESULT_OPEN_FAIL, msg,
2116                 handle->file->filename);
2117         return FDB_RESULT_OPEN_FAIL;
2118     }
2119 
2120     handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2121     hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
2122                 handle->file->blocksize, trie_root_bid,
2123                 (void *)handle->bhandle, handle->btreeblkops,
2124                 (void *)handle->dhandle, _fdb_readkey_wrap);
2125     // set aux for cmp wrapping function
2126     hbtrie_set_leaf_height_limit(handle->trie, 0xff);
2127     hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
2128 
2129     if (handle->kvs) {
2130         hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
2131     }
2132 
2133     handle->seqnum = seqnum;
2134     if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2135         if (handle->config.multi_kv_instances) {
2136             // multi KV instance mode .. HB+trie
2137             handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
2138             hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
2139                         handle->file->blocksize, seq_root_bid,
2140                         (void *)handle->bhandle, handle->btreeblkops,
2141                         (void *)handle->dhandle, _fdb_readseq_wrap);
2142 
2143         } else {
2144             // single KV instance mode .. normal B+tree
2145             struct btree_kv_ops *seq_kv_ops =
2146                 (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
2147             seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
2148             seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2149 
2150             handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
2151             if (seq_root_bid == BLK_NOT_FOUND) {
2152                 btree_init(handle->seqtree, (void *)handle->bhandle,
2153                            handle->btreeblkops, seq_kv_ops,
2154                            handle->config.blocksize, sizeof(fdb_seqnum_t),
2155                            OFFSET_SIZE, 0x0, NULL);
2156             }else{
2157                 if (btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
2158                                     handle->btreeblkops, seq_kv_ops,
2159                                         handle->config.blocksize, seq_root_bid) != BTREE_RESULT_SUCCESS){
2160                     _fdb_invalidate_dbheader(handle);
2161                     free(handle->dhandle);
2162                     free(handle->filename);
2163                     handle->filename = NULL;
2164                     filemgr_close(handle->file, false, handle->filename,
2165                                   &handle->log_callback);
2166                     return FDB_RECOVERABLE_ERR;
2167                 }
2168             }
2169         }
2170     }else{
2171         handle->seqtree = NULL;
2172     }
2173 
2174     // Stale-block tree (supported since MAGIC_002)
2175     // this tree is independent to multi/single KVS mode option
2176     if (ver_staletree_support(handle->file->version)) {
2177         // normal B+tree
2178         struct btree_kv_ops *stale_kv_ops =
2179             (struct btree_kv_ops *)calloc(1, sizeof(struct btree_kv_ops));
2180         stale_kv_ops = btree_kv_get_kb64_vb64(stale_kv_ops);
2181         stale_kv_ops->cmp = _cmp_uint64_t_endian_safe;
2182 
2183         handle->staletree = (struct btree*)calloc(1, sizeof(struct btree));
2184         if (stale_root_bid == BLK_NOT_FOUND) {
2185             btree_init(handle->staletree, (void *)handle->bhandle,
2186                        handle->btreeblkops, stale_kv_ops,
2187                        handle->config.blocksize, sizeof(filemgr_header_revnum_t),
2188                        OFFSET_SIZE, 0x0, NULL);
2189          }else{
2190             if (btree_init_from_bid(handle->staletree, (void *)handle->bhandle,
2191                                 handle->btreeblkops, stale_kv_ops,
2192                                     handle->config.blocksize, stale_root_bid) != BTREE_RESULT_SUCCESS){
2193                 _fdb_invalidate_dbheader(handle);
2194                 free(handle->dhandle);
2195                 free(handle->filename);
2196                 handle->filename = NULL;
2197                 filemgr_close(handle->file, false, handle->filename,
2198                               &handle->log_callback);
2199                 return FDB_RECOVERABLE_ERR;
2200             }
2201             // prefetch stale info into memory
2202             fdb_load_inmem_stale_info(handle);
2203          }
2204     } else {
2205         handle->staletree = NULL;
2206     }
2207 
2208     if (handle->config.multi_kv_instances && handle->max_seqnum) {
2209         // restore only docs belonging to the KV instance
2210         // handle->kvs should not be NULL
2211         _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
2212                          hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
2213     } else {
2214         // normal restore
2215         _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
2216     }
2217 
2218     if (compacted_filename &&
2219         filemgr_get_file_status(handle->file) == FILE_NORMAL &&
2220         !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
2221         status = _fdb_recover_compaction(handle, compacted_filename);
2222         if (status == FDB_RESULT_FAIL_BY_COMPACTION) {
2223             // recovery would have unlinked the previous file
2224             free(prev_filename);
2225             prev_filename = NULL;
2226         }
2227         // Either
2228         // 1. recovered the newly compacted file and deleted the old file or
2229         // 2. recovery failed and are going to stick to the old file or
2230         // In both cases, the old_filename and new_filename are not needed.
2231         if (handle->file){
2232             handle->file->old_filename =  NULL;
2233             handle->file->new_filename =  NULL;
2234         }
2235     }
2236 
2237     if (prev_filename) {
2238         if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
2239             // record the old filename into the file handle of current file
2240             // and REMOVE old file on the first open
2241             // WARNING: snapshots must have been opened before this call
2242             if (filemgr_update_file_linkage(handle->file, prev_filename, NULL)) {
2243                 // Open the old file with read-only mode.
2244                 // (Temporarily disable log callback at this time since
2245                 //  the old file might be already removed.)
2246                 err_log_callback dummy_cb;
2247                 dummy_cb.callback = fdb_dummy_log_callback;
2248                 dummy_cb.ctx_data = NULL;
2249                 fconfig.options = FILEMGR_READONLY;
2250                 filemgr_open_result result = filemgr_open(prev_filename,
2251                                                           handle->fileops,
2252                                                           &fconfig,
2253                                                           &dummy_cb);
2254                 if (result.file) {
2255                     filemgr_remove_pending(result.file, handle->file,
2256                                            &handle->log_callback);
2257                     filemgr_close(result.file, 0, handle->filename,
2258                                   &handle->log_callback);
2259                 }
2260             }
2261         }
2262         // we allocated a memory region for file->old_filename and
2263         // prev_filename would be copied to there,
2264         // so it is OK to free it here whatever the result is.
2265         free(prev_filename);
2266     }
2267 
2268     status = btreeblk_end(handle->bhandle);
2269     if (status != FDB_RESULT_SUCCESS) {
2270         // When fdb_kvs_open() is being issued in parallel with fdb_open()
2271         // it is possible that this call (fdb_open()) hits a write failure
2272         // because the btreeblock to be written was already made immutable
2273         // by the commit from the fdb_kvs_open(). Simpy ignore this error case.
2274         if (status == FDB_RESULT_WRITE_FAIL) {
2275             if (filemgr_get_header_revnum(handle->file)
2276                                              == latest_header_revnum) {
2277                 return status;
2278             } else {
2279                 status = FDB_RESULT_SUCCESS;
2280             }
2281         } else {
2282             return status;
2283         }
2284     }
2285 
2286     // do not register read-only handles
2287     if (!(config->flags & FDB_OPEN_FLAG_RDONLY)) {
2288         if (config->compaction_mode == FDB_COMPACTION_AUTO) {
2289             status = compactor_register_file(handle->file,
2290                                              (fdb_config *)config,
2291                                              &handle->log_callback);
2292         }
2293         if (status == FDB_RESULT_SUCCESS) {
2294             status = bgflusher_register_file(handle->file,
2295                                              (fdb_config *)config,
2296                                              &handle->log_callback);
2297         }
2298     }
2299 
2300     return status;
2301 }
2302 
2303 LIBFDB_API
fdb_set_log_callback(fdb_kvs_handle *handle, fdb_log_callback log_callback, void *ctx_data)2304 fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
2305                                 fdb_log_callback log_callback,
2306                                 void *ctx_data)
2307 {
2308     if (!handle) {
2309         return FDB_RESULT_INVALID_HANDLE;
2310     }
2311 
2312     handle->log_callback.callback = log_callback;
2313     handle->log_callback.ctx_data = ctx_data;
2314     return FDB_RESULT_SUCCESS;
2315 }
2316 
/**
 * Store the given fatal-error callback.
 *
 * The pointer is written to fatal_error_callback (a variable defined outside
 * this function) exactly as given; no validation is performed, so passing
 * NULL stores NULL.
 *
 * @param err_callback  Callback to store.
 */
LIBFDB_API
void fdb_set_fatal_error_callback(fdb_fatal_error_callback err_callback)
{
    fatal_error_callback = err_callback;
}
2322 
2323 LIBFDB_API
fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen, const void *meta, size_t metalen, const void *body, size_t bodylen)2324 fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
2325                           const void *meta, size_t metalen,
2326                           const void *body, size_t bodylen)
2327 {
2328     if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
2329         metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2330         return FDB_RESULT_INVALID_ARGS;
2331     }
2332 
2333     *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
2334     if (*doc == NULL) { // LCOV_EXCL_START
2335         return FDB_RESULT_ALLOC_FAIL;
2336     } // LCOV_EXCL_STOP
2337 
2338     (*doc)->seqnum = SEQNUM_NOT_USED;
2339 
2340     if (key && keylen > 0) {
2341         (*doc)->key = (void *)malloc(keylen);
2342         if ((*doc)->key == NULL) { // LCOV_EXCL_START
2343             return FDB_RESULT_ALLOC_FAIL;
2344         } // LCOV_EXCL_STOP
2345         memcpy((*doc)->key, key, keylen);
2346         (*doc)->keylen = keylen;
2347     } else {
2348         (*doc)->key = NULL;
2349         (*doc)->keylen = 0;
2350     }
2351 
2352     if (meta && metalen > 0) {
2353         (*doc)->meta = (void *)malloc(metalen);
2354         if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2355             return FDB_RESULT_ALLOC_FAIL;
2356         } // LCOV_EXCL_STOP
2357         memcpy((*doc)->meta, meta, metalen);
2358         (*doc)->metalen = metalen;
2359     } else {
2360         (*doc)->meta = NULL;
2361         (*doc)->metalen = 0;
2362     }
2363 
2364     if (body && bodylen > 0) {
2365         (*doc)->body = (void *)malloc(bodylen);
2366         if ((*doc)->body == NULL) { // LCOV_EXCL_START
2367             return FDB_RESULT_ALLOC_FAIL;
2368         } // LCOV_EXCL_STOP
2369         memcpy((*doc)->body, body, bodylen);
2370         (*doc)->bodylen = bodylen;
2371     } else {
2372         (*doc)->body = NULL;
2373         (*doc)->bodylen = 0;
2374     }
2375 
2376     return FDB_RESULT_SUCCESS;
2377 }
2378 
2379 LIBFDB_API
fdb_doc_update(fdb_doc **doc, const void *meta, size_t metalen, const void *body, size_t bodylen)2380 fdb_status fdb_doc_update(fdb_doc **doc,
2381                           const void *meta, size_t metalen,
2382                           const void *body, size_t bodylen)
2383 {
2384     if (doc == NULL ||
2385         metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
2386         return FDB_RESULT_INVALID_ARGS;
2387     }
2388     if (*doc == NULL) {
2389         return FDB_RESULT_INVALID_ARGS;
2390     }
2391 
2392     if (meta && metalen > 0) {
2393         // free previous metadata
2394         free((*doc)->meta);
2395         // allocate new metadata
2396         (*doc)->meta = (void *)malloc(metalen);
2397         if ((*doc)->meta == NULL) { // LCOV_EXCL_START
2398             return FDB_RESULT_ALLOC_FAIL;
2399         } // LCOV_EXCL_STOP
2400         memcpy((*doc)->meta, meta, metalen);
2401         (*doc)->metalen = metalen;
2402     }
2403 
2404     if (body && bodylen > 0) {
2405         // free previous body
2406         free((*doc)->body);
2407         // allocate new body
2408         (*doc)->body = (void *)malloc(bodylen);
2409         if ((*doc)->body == NULL) { // LCOV_EXCL_START
2410             return FDB_RESULT_ALLOC_FAIL;
2411         } // LCOV_EXCL_STOP
2412         memcpy((*doc)->body, body, bodylen);
2413         (*doc)->bodylen = bodylen;
2414     }
2415 
2416     (*doc)->seqnum = SEQNUM_NOT_USED;
2417     return FDB_RESULT_SUCCESS;
2418 }
2419 
2420 LIBFDB_API
fdb_doc_set_seqnum(fdb_doc *doc, const fdb_seqnum_t seqnum)2421 void fdb_doc_set_seqnum(fdb_doc *doc,
2422                         const fdb_seqnum_t seqnum)
2423 {
2424     if (doc) {
2425         doc->seqnum = seqnum;
2426         if (seqnum != SEQNUM_NOT_USED) {
2427             doc->flags |= FDB_CUSTOM_SEQNUM; // fdb_set will now use above seqnum
2428         } else { // reset custom seqnum flag, fdb_set will now generate new seqnum
2429             doc->flags &= ~FDB_CUSTOM_SEQNUM;
2430         }
2431     }
2432 }
2433 
2434 // doc MUST BE allocated by malloc
2435 LIBFDB_API
fdb_doc_free(fdb_doc *doc)2436 fdb_status fdb_doc_free(fdb_doc *doc)
2437 {
2438     if (doc) {
2439         free(doc->key);
2440         free(doc->meta);
2441         free(doc->body);
2442         free(doc);
2443     }
2444     return FDB_RESULT_SUCCESS;
2445 }
2446 
_fdb_wal_get_old_offset(void *voidhandle, struct wal_item *item, uint64_t *ret_old_offset)2447 INLINE fdb_status _fdb_wal_get_old_offset(void *voidhandle,
2448                                         struct wal_item *item,
2449                                         uint64_t *ret_old_offset)
2450 {
2451     fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2452     uint64_t old_offset = 0;
2453 
2454     if (item->action == WAL_ACT_REMOVE) {
2455         // For immediate remove, old_offset value is critical
2456         // so that we should get an exact value.
2457         if (hbtrie_find(handle->trie,
2458                     item->header->key,
2459                     item->header->keylen,
2460                         (void*)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2461             _fdb_invalidate_dbheader(handle);
2462             return FDB_RECOVERABLE_ERR;
2463         }
2464     } else {
2465         if (hbtrie_find_offset(handle->trie,
2466                            item->header->key,
2467                            item->header->keylen,
2468                                (void*)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2469             _fdb_invalidate_dbheader(handle);
2470             return FDB_RECOVERABLE_ERR;
2471         }
2472     }
2473     btreeblk_end(handle->bhandle);
2474     *ret_old_offset = _endian_decode(old_offset);
2475 
2476     return FDB_RESULT_SUCCESS;
2477 }
2478 
// A stale sequence number entry that can be purged from the sequence tree
// during the WAL flush.
// Entries are ordered by (kv_id, seqnum); see _fdb_seq_entry_cmp.
struct wal_stale_seq_entry {
    // ID of the KV store the stale sequence number belongs to
    fdb_kvs_id_t kv_id;
    // sequence number to be removed from the sequence index
    fdb_seqnum_t seqnum;
    // AVL linkage used to keep the entry in the stale sequence number list
    struct avl_node avl_entry;
};
2486 
// Delta changes in KV store stats during the WAL flush
// One entry per KV store ID; entries are ordered by kv_id only
// (see _kvs_delta_stat_cmp). Each counter is a signed delta that is later
// applied through _kvs_stat_update_attr().
struct wal_kvs_delta_stat {
    // ID of the KV store these deltas belong to (AVL key)
    fdb_kvs_id_t kv_id;
    // change in the number of live index nodes (applied to KVS_STAT_NLIVENODES)
    int64_t nlivenodes;
    // change in the document count (applied to KVS_STAT_NDOCS)
    int64_t ndocs;
    // change in the deleted-document count (applied to KVS_STAT_NDELETES)
    int64_t ndeletes;
    // change in total document data size (applied to KVS_STAT_DATASIZE)
    int64_t datasize;
    // change in the delta size counter (applied to KVS_STAT_DELTASIZE)
    int64_t deltasize;
    // AVL linkage used to keep the entry in the per-flush delta stat tree
    struct avl_node avl_entry;
};
2497 
_fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)2498 INLINE int _fdb_seq_entry_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2499 {
2500     (void) aux;
2501     struct wal_stale_seq_entry *entry1 = _get_entry(a, struct wal_stale_seq_entry,
2502                                                     avl_entry);
2503     struct wal_stale_seq_entry *entry2 = _get_entry(b, struct wal_stale_seq_entry,
2504                                                     avl_entry);
2505     if (entry1->kv_id < entry2->kv_id) {
2506         return -1;
2507     } else if (entry1->kv_id > entry2->kv_id) {
2508         return 1;
2509     } else {
2510         return _CMP_U64(entry1->seqnum, entry2->seqnum);
2511     }
2512 }
2513 
2514 
2515 // Compare function to sort KVS delta stat entries in the AVL tree during WAL flush
_kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)2516 INLINE int _kvs_delta_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
2517 {
2518     (void) aux;
2519     struct wal_kvs_delta_stat *stat1 = _get_entry(a, struct wal_kvs_delta_stat,
2520                                                   avl_entry);
2521     struct wal_kvs_delta_stat *stat2 = _get_entry(b, struct wal_kvs_delta_stat,
2522                                                   avl_entry);
2523     if (stat1->kv_id < stat2->kv_id) {
2524         return -1;
2525     } else if (stat1->kv_id > stat2->kv_id) {
2526         return 1;
2527     } else {
2528         return 0;
2529     }
2530 }
2531 
_fdb_wal_flush_seq_purge(void *dbhandle, struct avl_tree *stale_seqnum_list, struct avl_tree *kvs_delta_stats)2532 INLINE fdb_status _fdb_wal_flush_seq_purge(void *dbhandle,
2533                                      struct avl_tree *stale_seqnum_list,
2534                                      struct avl_tree *kvs_delta_stats)
2535 {
2536     fdb_seqnum_t _seqnum;
2537     int64_t nlivenodes;
2538     int64_t ndeltanodes;
2539     int64_t delta;
2540     uint8_t kvid_seqnum[sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t)];
2541     struct wal_stale_seq_entry *seq_entry;
2542     struct wal_kvs_delta_stat *delta_stat;
2543     struct wal_kvs_delta_stat kvs_delta_query;
2544 
2545     fdb_kvs_handle *handle = (fdb_kvs_handle *)dbhandle;
2546     struct avl_node *node = avl_first(stale_seqnum_list);
2547     while (node) {
2548         seq_entry = _get_entry(node, struct wal_stale_seq_entry, avl_entry);
2549         node = avl_next(node);
2550         nlivenodes = handle->bhandle->nlivenodes;
2551         ndeltanodes = handle->bhandle->ndeltanodes;
2552         _seqnum = _endian_encode(seq_entry->seqnum);
2553         if (handle->kvs) {
2554             // multi KV instance mode .. HB+trie
2555             kvid2buf(sizeof(fdb_kvs_id_t), seq_entry->kv_id, kvid_seqnum);
2556             memcpy(kvid_seqnum + sizeof(fdb_kvs_id_t), &_seqnum, sizeof(fdb_seqnum_t));
2557             if (hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
2558                               sizeof(fdb_kvs_id_t) + sizeof(fdb_seqnum_t))
2559                 == HBTRIE_CORRUPTED_RECOVERING_ERR){
2560                 _fdb_invalidate_dbheader(handle);
2561                 return FDB_RECOVERABLE_ERR;
2562             }
2563         } else {
2564             btree_remove(handle->seqtree, (void*)&_seqnum);
2565         }
2566         btreeblk_end(handle->bhandle);
2567 
2568         kvs_delta_query.kv_id = seq_entry->kv_id;
2569         avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2570                                                &kvs_delta_query.avl_entry,
2571                                                _kvs_delta_stat_cmp);
2572         if (delta_stat_node) {
2573             delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2574                                     avl_entry);
2575             delta = handle->bhandle->nlivenodes - nlivenodes;
2576             delta_stat->nlivenodes += delta;
2577             delta = handle->bhandle->ndeltanodes - ndeltanodes;
2578             delta *= handle->config.blocksize;
2579             delta_stat->deltasize += delta;
2580         }
2581         avl_remove(stale_seqnum_list, &seq_entry->avl_entry);
2582         free(seq_entry);
2583     }
2584     return FDB_RESULT_SUCCESS;
2585 }
2586 
_fdb_wal_flush_kvs_delta_stats(struct filemgr *file, struct avl_tree *kvs_delta_stats)2587 INLINE void _fdb_wal_flush_kvs_delta_stats(struct filemgr *file,
2588                                            struct avl_tree *kvs_delta_stats)
2589 {
2590     struct avl_node *node;
2591     struct wal_kvs_delta_stat *delta_stat;
2592     node = avl_first(kvs_delta_stats);
2593     while (node) {
2594         delta_stat = _get_entry(node, struct wal_kvs_delta_stat, avl_entry);
2595         node = avl_next(node);
2596         _kvs_stat_update_attr(file, delta_stat->kv_id,
2597                               KVS_STAT_DATASIZE, delta_stat->datasize);
2598         _kvs_stat_update_attr(file, delta_stat->kv_id,
2599                               KVS_STAT_NDOCS, delta_stat->ndocs);
2600         _kvs_stat_update_attr(file, delta_stat->kv_id,
2601                               KVS_STAT_NDELETES, delta_stat->ndeletes);
2602         _kvs_stat_update_attr(file, delta_stat->kv_id,
2603                               KVS_STAT_NLIVENODES, delta_stat->nlivenodes);
2604         _kvs_stat_update_attr(file, delta_stat->kv_id,
2605                               KVS_STAT_DELTASIZE, delta_stat->deltasize);
2606         avl_remove(kvs_delta_stats, &delta_stat->avl_entry);
2607         free(delta_stat);
2608     }
2609 }
2610 
_fdb_wal_flush_func(void *voidhandle, struct wal_item *item, struct avl_tree *stale_seqnum_list, struct avl_tree *kvs_delta_stats)2611 INLINE fdb_status _fdb_wal_flush_func(void *voidhandle,
2612                                       struct wal_item *item,
2613                                       struct avl_tree *stale_seqnum_list,
2614                                       struct avl_tree *kvs_delta_stats)
2615 {
2616     hbtrie_result hr;
2617     fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
2618     fdb_seqnum_t _seqnum;
2619     fdb_kvs_id_t kv_id = 0;
2620     fdb_status fs = FDB_RESULT_SUCCESS;
2621     uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
2622     int size_id, size_seq;
2623     uint8_t *kvid_seqnum;
2624     uint64_t old_offset;
2625     int64_t _offset;
2626     int64_t delta;
2627     struct docio_object _doc;
2628     struct filemgr *file = handle->dhandle->file;
2629 
2630     memset(var_key, 0, handle->config.chunksize);
2631     if (handle->kvs) {
2632         buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
2633     } else {
2634         kv_id = 0;
2635     }
2636 
2637     struct wal_kvs_delta_stat *kvs_delta_stat;
2638     struct wal_kvs_delta_stat kvs_delta_query;
2639     kvs_delta_query.kv_id = kv_id;
2640     avl_node *delta_stat_node = avl_search(kvs_delta_stats,
2641                                            &kvs_delta_query.avl_entry,
2642                                            _kvs_delta_stat_cmp);
2643     if (delta_stat_node) {
2644         kvs_delta_stat = _get_entry(delta_stat_node, struct wal_kvs_delta_stat,
2645                                     avl_entry);
2646     } else {
2647         kvs_delta_stat = (struct wal_kvs_delta_stat *)
2648             calloc(1, sizeof(struct wal_kvs_delta_stat));
2649         kvs_delta_stat->kv_id = kv_id;
2650         avl_insert(kvs_delta_stats, &kvs_delta_stat->avl_entry,
2651                    _kvs_delta_stat_cmp);
2652     }
2653 
2654     int64_t nlivenodes = handle->bhandle->nlivenodes;
2655     int64_t ndeltanodes = handle->bhandle->ndeltanodes;
2656 
2657     if (item->action == WAL_ACT_INSERT ||
2658         item->action == WAL_ACT_LOGICAL_REMOVE) {
2659         _offset = _endian_encode(item->offset);
2660 
2661         if (hbtrie_insert(handle->trie,
2662                       item->header->key,
2663                       item->header->keylen,
2664                       (void *)&_offset,
2665                           (void *)&old_offset) == HBTRIE_CORRUPTED_RECOVERING_ERR){
2666             _fdb_invalidate_dbheader(handle);
2667             return FDB_RECOVERABLE_ERR;
2668         }
2669 
2670         fs = btreeblk_end(handle->bhandle);
2671         if (fs != FDB_RESULT_SUCCESS) {
2672             return fs;
2673         }
2674         old_offset = _endian_decode(old_offset);
2675 
2676         if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2677             _seqnum = _endian_encode(item->seqnum);
2678             if (handle->kvs) {
2679                 // multi KV instance mode .. HB+trie
2680                 uint64_t old_offset_local;
2681 
2682                 size_id = sizeof(fdb_kvs_id_t);
2683                 size_seq = sizeof(fdb_seqnum_t);
2684                 kvid_seqnum = alca(uint8_t, size_id + size_seq);
2685                 kvid2buf(size_id, kv_id, kvid_seqnum);
2686                 memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
2687                 hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
2688                               (void *)&_offset, (void *)&old_offset_local);
2689             } else {
2690                 btree_insert(handle->seqtree, (void *)&_seqnum,
2691                              (void *)&_offset);
2692             }
2693             fs = btreeblk_end(handle->bhandle);
2694             if (fs != FDB_RESULT_SUCCESS) {
2695                 return fs;
2696             }
2697         }
2698 
2699         delta = handle->bhandle->nlivenodes - nlivenodes;
2700         kvs_delta_stat->nlivenodes += delta;
2701         delta = handle->bhandle->ndeltanodes - ndeltanodes;
2702         delta *= handle->config.blocksize;
2703         kvs_delta_stat->deltasize += delta;
2704 
2705         if (old_offset == BLK_NOT_FOUND) {
2706             if (item->action == WAL_ACT_INSERT) {
2707                 ++kvs_delta_stat->ndocs;
2708             } else { // inserted a logical deleted doc into main index
2709                 ++kvs_delta_stat->ndeletes;
2710             }
2711             kvs_delta_stat->datasize += item->doc_size;
2712             kvs_delta_stat->deltasize += item->doc_size;
2713         } else { // update or logical delete
2714             // This block is already cached when we call HBTRIE_INSERT.
2715             // No additional block access.
2716             char dummy_key[FDB_MAX_KEYLEN];
2717             _doc.meta = _doc.body = NULL;
2718             _doc.key = &dummy_key;
2719             _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2720                                               &_doc, true);
2721             if (_offset < 0) {
2722                 return (fdb_status) _offset;
2723             } else if (_offset == 0) {
2724                 // Note that this is not an error as old_offset is pointing to
2725                 // the zero-filled region in a document block.
2726                 return FDB_RESULT_KEY_NOT_FOUND;
2727             }
2728             free(_doc.meta);
2729             filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2730 
2731             if (!(_doc.length.flag & DOCIO_DELETED)) {//prev doc was not deleted
2732                 if (item->action == WAL_ACT_LOGICAL_REMOVE) { // now deleted
2733                     --kvs_delta_stat->ndocs;
2734                     ++kvs_delta_stat->ndeletes;
2735                 } // else no change (prev doc was insert, now just an update)
2736             } else { // prev doc in main index was a logically deleted doc
2737                 if (item->action == WAL_ACT_INSERT) { // now undeleted
2738                     ++kvs_delta_stat->ndocs;
2739                     --kvs_delta_stat->ndeletes;
2740                 } // else no change (prev doc was deleted, now re-deleted)
2741             }
2742 
2743             delta = (int)item->doc_size - (int)_fdb_get_docsize(_doc.length);
2744             kvs_delta_stat->datasize += delta;
2745             bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2746             if (last_hdr_bid * handle->config.blocksize < old_offset) {
2747                 kvs_delta_stat->deltasize += delta;
2748             } else {
2749                 kvs_delta_stat->deltasize += (int)item->doc_size;
2750             }
2751 
2752             // Avoid duplicates (remove previous sequence number)
2753             if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2754                 struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2755                     calloc(1, sizeof(struct wal_stale_seq_entry));
2756                 entry->kv_id = kv_id;
2757                 entry->seqnum = _doc.seqnum;
2758                 avl_insert(stale_seqnum_list, &entry->avl_entry,
2759                            _fdb_seq_entry_cmp);
2760             }
2761         }
2762     } else {
2763         // Immediate remove
2764         old_offset = item->old_offset;
2765         hr = hbtrie_remove(handle->trie, item->header->key,
2766                            item->header->keylen);
2767         if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2768             _fdb_invalidate_dbheader(handle);
2769             return FDB_RECOVERABLE_ERR;
2770         }
2771         fs = btreeblk_end(handle->bhandle);
2772         if (fs != FDB_RESULT_SUCCESS) {
2773             return fs;
2774         }
2775 
2776         if (hr == HBTRIE_RESULT_SUCCESS) {
2777             // This block is already cached when we call _fdb_wal_get_old_offset
2778             // No additional block access should be done.
2779             char dummy_key[FDB_MAX_KEYLEN];
2780             _doc.meta = _doc.body = NULL;
2781             _doc.key = &dummy_key;
2782             _offset = docio_read_doc_key_meta(handle->dhandle, old_offset,
2783                                               &_doc, true);
2784             if (_offset < 0) {
2785                 return (fdb_status) _offset;
2786             } else if (_offset == 0) {
2787                 return FDB_RESULT_KEY_NOT_FOUND;
2788             }
2789             free(_doc.meta);
2790             filemgr_mark_stale(file, old_offset, _fdb_get_docsize(_doc.length));
2791 
2792             // Reduce the total number of docs by one
2793             --kvs_delta_stat->ndocs;
2794             if (_doc.length.flag & DOCIO_DELETED) {//prev deleted doc is dropped
2795                 --kvs_delta_stat->ndeletes;
2796             }
2797 
2798             // Reduce the total datasize by size of previously present doc
2799             delta = -(int)_fdb_get_docsize(_doc.length);
2800             kvs_delta_stat->datasize += delta;
2801             // if multiple wal flushes happen before commit, then it's possible
2802             // that this doc deleted was inserted & flushed after last commit
2803             // In this case we need to update the deltasize too which tracks
2804             // the amount of new data inserted between commits.
2805             bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
2806             if (last_hdr_bid * handle->config.blocksize < old_offset) {
2807                 kvs_delta_stat->deltasize += delta;
2808             }
2809 
2810             // remove sequence number for the removed doc
2811             if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2812                 struct wal_stale_seq_entry *entry = (struct wal_stale_seq_entry *)
2813                     calloc(1, sizeof(struct wal_stale_seq_entry));
2814                 entry->kv_id = kv_id;
2815                 entry->seqnum = _doc.seqnum;
2816                 avl_insert(stale_seqnum_list, &entry->avl_entry, _fdb_seq_entry_cmp);
2817             }
2818 
2819             // Update index size to new size after the remove operation
2820             delta = handle->bhandle->nlivenodes - nlivenodes;
2821             kvs_delta_stat->nlivenodes += delta;
2822 
2823             // ndeltanodes measures number of new index nodes created due to
2824             // this hbtrie_remove() operation
2825             delta = (int)handle->bhandle->ndeltanodes - ndeltanodes;
2826             delta *= handle->config.blocksize;
2827             kvs_delta_stat->deltasize += delta;
2828         }
2829     }
2830     return FDB_RESULT_SUCCESS;
2831 }
2832 
fdb_sync_db_header(fdb_kvs_handle *handle)2833 void fdb_sync_db_header(fdb_kvs_handle *handle)
2834 {
2835     uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
2836     if (handle->cur_header_revnum != cur_revnum) {
2837         void *header_buf = NULL;
2838         size_t header_len;
2839         bid_t hdr_bid;
2840         filemgr_header_revnum_t revnum;
2841 
2842         header_buf = filemgr_get_header(handle->file, NULL, &header_len,
2843                                         &hdr_bid, NULL, &revnum);
2844         if (header_len > 0) {
2845             uint64_t header_flags, dummy64, version;
2846             bid_t idtree_root;
2847             bid_t new_seq_root;
2848             bid_t new_stale_root;
2849             char *compacted_filename;
2850             char *prev_filename = NULL;
2851 
2852             version = handle->file->version;
2853             atomic_store_uint64_t(&handle->last_hdr_bid, hdr_bid);
2854             handle->cur_header_revnum = revnum;
2855 
2856             fdb_fetch_header(version, header_buf, &idtree_root,
2857                              &new_seq_root, &new_stale_root, &dummy64,
2858                              &dummy64, &dummy64,
2859                              &dummy64, &handle->last_wal_flush_hdr_bid,
2860                              &handle->kv_info_offset, &header_flags,
2861                              &compacted_filename, &prev_filename);
2862 
2863             if (handle->dirty_updates) {
2864                 // discard all cached writable b+tree nodes
2865                 // to avoid data inconsistency with other writers
2866                 btreeblk_discard_blocks(handle->bhandle);
2867             }
2868 
2869             handle->trie->root_bid = idtree_root;
2870 
2871             if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2872                 if (new_seq_root != handle->seqtree->root_bid) {
2873                     if (handle->config.multi_kv_instances) {
2874                         handle->seqtrie->root_bid = new_seq_root;
2875                     } else {
2876                         btree_init_from_bid(handle->seqtree,
2877                                             handle->seqtree->blk_handle,
2878                                             handle->seqtree->blk_ops,
2879                                             handle->seqtree->kv_ops,
2880                                             handle->seqtree->blksize,
2881                                             new_seq_root);
2882                     }
2883                 }
2884             }
2885 
2886             if (ver_staletree_support(version)) {