1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <fcntl.h>
22 #include <time.h>
23 #if !defined(WIN32) && !defined(_WIN32)
24 #include <sys/time.h>
25 #endif
26 
27 #include "libforestdb/forestdb.h"
28 #include "fdb_internal.h"
29 #include "filemgr.h"
30 #include "hbtrie.h"
31 #include "list.h"
32 #include "btree.h"
33 #include "btree_kv.h"
34 #include "btree_var_kv_ops.h"
35 #include "docio.h"
36 #include "btreeblock.h"
37 #include "common.h"
38 #include "wal.h"
39 #include "snapshot.h"
40 #include "filemgr_ops.h"
41 #include "configuration.h"
42 #include "internal_types.h"
43 #include "compactor.h"
44 #include "memleak.h"
45 #include "time_utils.h"
46 #include "system_resource_stats.h"
47 
// When the build defines __DEBUG but not __DEBUG_FDB, redefine the DBG,
// DBGCMD and DBGSW macros as empty for the remainder of this file so that
// their uses here expand to nothing.
#ifdef __DEBUG
#ifndef __DEBUG_FDB
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif
58 
#ifdef _TRACE_HANDLES
// Tree and lock used for handle tracing. Both are initialized in fdb_init()
// when _TRACE_HANDLES is defined.
struct avl_tree open_handles;
static spin_t open_handle_lock;

// AVL comparator that orders traced handles by the address of the enclosing
// _fdb_kvs_handle (recovered from the embedded `avl_trace` node).
//
// Returns 0 when both nodes belong to the same handle, 1 when a's handle has
// the higher address, and -1 otherwise. `aux` is unused.
//
// The equality case matters: a comparator that never reports 0 makes every
// element look distinct from itself, so a comparator-driven lookup could
// never match an existing entry.
static int _fdb_handle_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    (void) aux;
    struct _fdb_kvs_handle *aa, *bb;
    aa = _get_entry(a, struct _fdb_kvs_handle, avl_trace);
    bb = _get_entry(b, struct _fdb_kvs_handle, avl_trace);
    if (aa == bb) {
        return 0;
    }
    return (aa > bb) ? 1 : -1;
}
#endif
70 
// Set to 1 by fdb_init() once the file manager and the compaction daemon
// have been initialized; checked so that this happens only once.
static volatile uint8_t fdb_initialized = 0;
// Incremented at the end of a successful fdb_init() and decremented by
// fdb_open() / fdb_open_custom_cmp() after _fdb_open() returns. Both updates
// are made while holding initial_lock.
static volatile uint8_t fdb_open_inprog = 0;
#ifdef SPIN_INITIALIZER
// Statically initialized lock guarding the two counters above.
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Without a static initializer, initial_lock is set up at run time in
// fdb_init(). initial_lock_status tracks that: 0 = not initialized,
// 1 = a thread is initializing it, 2 = initialized and usable.
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif

// Forward declaration; passed to wal_snapshot() as a callback by
// fdb_snapshot_open(). The definition is not in this part of the file.
static fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
                                         uint64_t offset);
82 
_cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)83 INLINE int _cmp_uint64_t_endian_safe(void *key1, void *key2, void *aux)
84 {
85     (void) aux;
86     uint64_t a,b;
87     a = *(uint64_t*)key1;
88     b = *(uint64_t*)key2;
89     a = _endian_decode(a);
90     b = _endian_decode(b);
91     return _CMP_U64(a, b);
92 }
93 
_fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)94 size_t _fdb_readkey_wrap(void *handle, uint64_t offset, void *buf)
95 {
96     keylen_t keylen;
97     offset = _endian_decode(offset);
98     docio_read_doc_key((struct docio_handle *)handle, offset, &keylen, buf);
99     return keylen;
100 }
101 
_fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)102 size_t _fdb_readseq_wrap(void *handle, uint64_t offset, void *buf)
103 {
104     int size_id, size_seq, size_chunk;
105     fdb_seqnum_t _seqnum;
106     struct docio_object doc;
107     struct docio_handle *dhandle = (struct docio_handle *)handle;
108 
109     size_id = sizeof(fdb_kvs_id_t);
110     size_seq = sizeof(fdb_seqnum_t);
111     size_chunk = dhandle->file->config->chunksize;
112     memset(&doc, 0, sizeof(struct docio_object));
113 
114     offset = _endian_decode(offset);
115     docio_read_doc_key_meta((struct docio_handle *)handle, offset, &doc,
116                             true);
117     buf2buf(size_chunk, doc.key, size_id, buf);
118     _seqnum = _endian_encode(doc.seqnum);
119     memcpy((uint8_t*)buf + size_id, &_seqnum, size_seq);
120 
121     free(doc.key);
122     free(doc.meta);
123 
124     return size_id + size_seq;
125 }
126 
_fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)127 int _fdb_custom_cmp_wrap(void *key1, void *key2, void *aux)
128 {
129     int is_key1_inf, is_key2_inf;
130     uint8_t *keystr1 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
131     uint8_t *keystr2 = alca(uint8_t, FDB_MAX_KEYLEN_INTERNAL);
132     size_t keylen1, keylen2;
133     btree_cmp_args *args = (btree_cmp_args *)aux;
134     fdb_custom_cmp_variable cmp = (fdb_custom_cmp_variable)args->aux;
135 
136     is_key1_inf = _is_inf_key(key1);
137     is_key2_inf = _is_inf_key(key2);
138     if (is_key1_inf && is_key2_inf) { // both are infinite
139         return 0;
140     } else if (!is_key1_inf && is_key2_inf) { // key2 is infinite
141         return -1;
142     } else if (is_key1_inf && !is_key2_inf) { // key1 is infinite
143         return 1;
144     }
145 
146     _get_var_key(key1, (void*)keystr1, &keylen1);
147     _get_var_key(key2, (void*)keystr2, &keylen2);
148 
149     if (keylen1 == 0 && keylen2 == 0) {
150         return 0;
151     } else if (keylen1 ==0 && keylen2 > 0) {
152         return -1;
153     } else if (keylen1 > 0 && keylen2 == 0) {
154         return 1;
155     }
156 
157     return cmp(keystr1, keylen1, keystr2, keylen2);
158 }
159 
fdb_fetch_header(void *header_buf, bid_t *trie_root_bid, bid_t *seq_root_bid, uint64_t *ndocs, uint64_t *nlivenodes, uint64_t *datasize, uint64_t *last_wal_flush_hdr_bid, uint64_t *kv_info_offset, uint64_t *header_flags, char **new_filename, char **old_filename)160 void fdb_fetch_header(void *header_buf,
161                       bid_t *trie_root_bid,
162                       bid_t *seq_root_bid,
163                       uint64_t *ndocs,
164                       uint64_t *nlivenodes,
165                       uint64_t *datasize,
166                       uint64_t *last_wal_flush_hdr_bid,
167                       uint64_t *kv_info_offset,
168                       uint64_t *header_flags,
169                       char **new_filename,
170                       char **old_filename)
171 {
172     size_t offset = 0;
173     uint16_t new_filename_len;
174     uint16_t old_filename_len;
175 
176     seq_memcpy(trie_root_bid, (uint8_t *)header_buf + offset,
177                sizeof(bid_t), offset);
178     *trie_root_bid = _endian_decode(*trie_root_bid);
179 
180     seq_memcpy(seq_root_bid, (uint8_t *)header_buf + offset,
181                sizeof(bid_t), offset);
182     *seq_root_bid = _endian_decode(*seq_root_bid);
183 
184     seq_memcpy(ndocs, (uint8_t *)header_buf + offset,
185                sizeof(uint64_t), offset);
186     *ndocs = _endian_decode(*ndocs);
187 
188     seq_memcpy(nlivenodes, (uint8_t *)header_buf + offset,
189                sizeof(uint64_t), offset);
190     *nlivenodes = _endian_decode(*nlivenodes);
191 
192     seq_memcpy(datasize, (uint8_t *)header_buf + offset,
193                sizeof(uint64_t), offset);
194     *datasize = _endian_decode(*datasize);
195 
196     seq_memcpy(last_wal_flush_hdr_bid, (uint8_t *)header_buf + offset,
197                sizeof(uint64_t), offset);
198     *last_wal_flush_hdr_bid = _endian_decode(*last_wal_flush_hdr_bid);
199 
200     seq_memcpy(kv_info_offset, (uint8_t *)header_buf + offset,
201                sizeof(uint64_t), offset);
202     *kv_info_offset = _endian_decode(*kv_info_offset);
203 
204     seq_memcpy(header_flags, (uint8_t *)header_buf + offset,
205                sizeof(uint64_t), offset);
206     *header_flags = _endian_decode(*header_flags);
207 
208     seq_memcpy(&new_filename_len, (uint8_t *)header_buf + offset,
209                sizeof(new_filename_len), offset);
210     new_filename_len = _endian_decode(new_filename_len);
211     seq_memcpy(&old_filename_len, (uint8_t *)header_buf + offset,
212                sizeof(old_filename_len), offset);
213     old_filename_len = _endian_decode(old_filename_len);
214     if (new_filename_len) {
215         *new_filename = (char*)((uint8_t *)header_buf + offset);
216     } else {
217         *new_filename = NULL;
218     }
219     offset += new_filename_len;
220     if (old_filename && old_filename_len) {
221         *old_filename = (char *) malloc(old_filename_len);
222         seq_memcpy(*old_filename,
223                    (uint8_t *)header_buf + offset,
224                    old_filename_len, offset);
225     }
226 }
227 
// Selects which documents _fdb_restore_wal() re-inserts into the WAL when
// the handle has KV store info (handle->kvs).
typedef enum {
    // restore all items
    FDB_RESTORE_NORMAL,
    // restore only items whose KV store ID matches the requested ID; this
    // mode also bypasses the "WAL already has entries" early return
    FDB_RESTORE_KV_INS,
} fdb_restore_mode_t;
232 
// Re-populates the WAL (or, for a snapshot handle, the snapshot's own index)
// by scanning the document blocks written between the last WAL flush and the
// header block `hdr_bid`.
//
// handle:    handle whose file/docio state is scanned. If handle->shandle is
//            set, documents go to snap_insert(); otherwise to wal_insert()
//            on the file's global transaction, followed by wal_commit().
// mode:      FDB_RESTORE_NORMAL or FDB_RESTORE_KV_INS (see the per-document
//            filter below).
// hdr_bid:   block ID of the header up to which documents are restored.
// kv_id_req: KV store ID to match when mode is FDB_RESTORE_KV_INS.
//
// For non-snapshot handles the file mutex is held for the whole scan.
INLINE void _fdb_restore_wal(fdb_kvs_handle *handle,
                             fdb_restore_mode_t mode,
                             bid_t hdr_bid,
                             fdb_kvs_id_t kv_id_req)
{
    struct filemgr *file = handle->file;
    uint32_t blocksize = handle->file->blocksize;
    uint64_t last_wal_flush_hdr_bid = handle->last_wal_flush_hdr_bid;
    // NOTE(review): this uses the FDB_BLOCKSIZE constant while every other
    // offset computation below uses the file's own `blocksize` - confirm the
    // two are always equal.
    uint64_t hdr_off = hdr_bid * FDB_BLOCKSIZE;
    uint64_t offset = 0; //assume everything from first block needs restoration
    err_log_callback *log_callback;

    if (!hdr_off) { // Nothing to do if we don't have a header block offset
        return;
    }

    // Start scanning at the block right after the last WAL-flush header.
    if (last_wal_flush_hdr_bid != BLK_NOT_FOUND) {
        offset = (last_wal_flush_hdr_bid + 1) * blocksize;
    }

    // If a valid last header was retrieved and it matches the current header
    // OR if WAL already had entries populated, then no crash recovery needed
    if (hdr_off <= offset ||
        (!handle->shandle && wal_get_size(file) &&
            mode != FDB_RESTORE_KV_INS)) {
        return;
    }

    // Temporarily disable the error logging callback as there are false positive
    // checksum errors in docio_read_doc.
    // TODO: Need to adapt docio_read_doc to separate false checksum errors.
    log_callback = handle->dhandle->log_callback;
    handle->dhandle->log_callback = NULL;

    if (!handle->shandle) {
        filemgr_mutex_lock(file);
    }
    // Outer loop: one iteration per block; the increment expression rounds
    // `offset` up to the start of the following block.
    for (; offset < hdr_off;
        offset = ((offset / blocksize) + 1) * blocksize) { // next block's off
        // Skip blocks that docio_check_buffer() rejects.
        if (!docio_check_buffer(handle->dhandle, offset / blocksize)) {
            continue;
        } else {
            // Inner loop: read consecutive documents starting at `offset`.
            // Note that `continue` inside this do/while jumps to the loop
            // condition at the bottom, not to the outer for loop.
            do {
                struct docio_object doc;
                uint64_t _offset;
                uint64_t doc_offset;
                memset(&doc, 0, sizeof(doc));
                // _offset is the position following the document just read.
                _offset = docio_read_doc(handle->dhandle, offset, &doc, true);
                if (_offset == offset) { // reached unreadable doc, skip block
                    break;
                }
                if (doc.key || (doc.length.flag & DOCIO_TXN_COMMITTED)) {
                    // check if the doc is transactional or not, and
                    // also check if the doc contains system info
                    if (!(doc.length.flag & DOCIO_TXN_DIRTY) &&
                        !(doc.length.flag & DOCIO_SYSTEM)) {
                        if (doc.length.flag & DOCIO_TXN_COMMITTED) {
                            // commit mark .. read doc offset
                            doc_offset = doc.doc_offset;
                            // read the previously skipped doc
                            docio_read_doc(handle->dhandle, doc_offset, &doc, true);
                            if (doc.key == NULL) { // doc read error
                                free(doc.meta);
                                free(doc.body);
                                offset = _offset;
                                continue;
                            }
                        } else {
                            doc_offset = offset;
                        }

                        // If say a snapshot is taken on a db handle after
                        // rollback, then skip WAL items after rollback point
                        if (handle->config.seqtree_opt == FDB_SEQTREE_USE &&
                            (mode == FDB_RESTORE_KV_INS || !handle->kvs) &&
                            doc.seqnum > handle->seqnum) {
                            free(doc.key);
                            free(doc.meta);
                            free(doc.body);
                            offset = _offset;
                            continue;
                        }

                        // restore document
                        // NOTE(review): wal_doc is only partially filled in
                        // (e.g. no body; meta fields only on the non-snapshot
                        // path) - confirm the insert functions read nothing
                        // else.
                        fdb_doc wal_doc;
                        wal_doc.keylen = doc.length.keylen;
                        wal_doc.bodylen = doc.length.bodylen;
                        wal_doc.key = doc.key;
                        wal_doc.seqnum = doc.seqnum;
                        wal_doc.deleted = doc.length.flag & DOCIO_DELETED;

                        if (!handle->shandle) {
                            // Regular handle: insert into the file-wide WAL.
                            wal_doc.metalen = doc.length.metalen;
                            wal_doc.meta = doc.meta;
                            wal_doc.size_ondisk = _fdb_get_docsize(doc.length);

                            if (handle->kvs) {
                                // check seqnum before insert
                                fdb_kvs_id_t kv_id;
                                fdb_seqnum_t kv_seqnum;
                                // The KV store ID is encoded at the front of
                                // the key.
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);

                                if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
                                    kv_seqnum = fdb_kvs_get_seqnum(handle->file, kv_id);
                                } else {
                                    kv_seqnum = SEQNUM_NOT_USED;
                                }
                                if (doc.seqnum <= kv_seqnum &&
                                        ((mode == FDB_RESTORE_KV_INS &&
                                            kv_id == kv_id_req) ||
                                         (mode == FDB_RESTORE_NORMAL)) ) {
                                    // if mode is NORMAL, restore all items
                                    // if mode is KV_INS, restore items matching ID
                                    wal_insert(&file->global_txn, file,
                                               &wal_doc, doc_offset, 0);
                                }
                            } else {
                                wal_insert(&file->global_txn, file,
                                           &wal_doc, doc_offset, 0);
                            }
                            // The key buffer is released here on the WAL path.
                            if (doc.key) free(doc.key);
                        } else {
                            // snapshot
                            // doc.key is not freed after snap_insert() -
                            // presumably the snapshot keeps the pointer;
                            // TODO confirm.
                            if (handle->kvs) {
                                fdb_kvs_id_t kv_id;
                                buf2kvid(handle->config.chunksize,
                                         wal_doc.key, &kv_id);
                                if (kv_id == handle->kvs->id) {
                                    // snapshot: insert ID matched documents only
                                    snap_insert(handle->shandle,
                                                &wal_doc, doc_offset);
                                } else {
                                    free(doc.key);
                                }
                            } else {
                                snap_insert(handle->shandle, &wal_doc, doc_offset);
                            }
                        }
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                    } else {
                        // skip transactional document or system document
                        free(doc.key);
                        free(doc.meta);
                        free(doc.body);
                        offset = _offset;
                        // do not break.. read next doc
                    }
                } else {
                    // Neither a keyed document nor a commit mark: stop
                    // reading this run and move on to the next block.
                    free(doc.key);
                    free(doc.meta);
                    free(doc.body);
                    offset = _offset;
                    break;
                }
            } while (offset + sizeof(struct docio_length) < hdr_off);
        }
    }
    // wal commit
    if (!handle->shandle) {
        wal_commit(&file->global_txn, file, NULL, &handle->log_callback);
        filemgr_mutex_unlock(file);
    }
    // Re-enable the docio error logging that was disabled above.
    handle->dhandle->log_callback = log_callback;
}
400 
// Handles a compaction target file (`new_filename`) left behind next to the
// file that `handle` currently has open.
//
// The new file is opened read-only through a temporary handle. Then:
//  - If the new file records handle's current file as its old_filename, the
//    handle is switched over to the new file's block/doc/index structures,
//    the old file is marked for removal and closed, and
//    FDB_RESULT_FAIL_BY_COMPACTION is returned.
//  - Otherwise the new file is marked for removal (pointing at the old file)
//    and closed, and FDB_RESULT_SUCCESS is returned.
// If the new file cannot be opened, the error is logged and returned; a
// failure of btreeblk_end() is returned after closing the temporary handle.
INLINE fdb_status _fdb_recover_compaction(fdb_kvs_handle *handle,
                                          const char *new_filename)
{
    fdb_kvs_handle new_db;
    fdb_config config = handle->config;
    struct filemgr *new_file;

    // Set up a stack-allocated temporary handle that shares the caller's log
    // callback, file handle and KVS config, and opens the file read-only.
    memset(&new_db, 0, sizeof(new_db));
    new_db.log_callback.callback = handle->log_callback.callback;
    new_db.log_callback.ctx_data = handle->log_callback.ctx_data;
    config.flags |= FDB_OPEN_FLAG_RDONLY;
    new_db.fhandle = handle->fhandle;
    new_db.kvs_config = handle->kvs_config;
    fdb_status status = _fdb_open(&new_db, new_filename,
                                  FDB_AFILENAME, &config);
    if (status != FDB_RESULT_SUCCESS) {
        return fdb_log(&handle->log_callback, status,
                       "Error in opening a partially compacted file '%s' for recovery.",
                       new_filename);
    }

    new_file = new_db.file;

    if (new_file->old_filename &&
        !strncmp(new_file->old_filename, handle->file->filename,
                 FDB_MAX_FILENAME_LEN)) {
        struct filemgr *old_file = handle->file;
        // If new file has a recorded old_filename then it means that
        // compaction has completed successfully. Mark self for deletion
        filemgr_mutex_lock(new_file);

        status = btreeblk_end(handle->bhandle);
        if (status != FDB_RESULT_SUCCESS) {
            filemgr_mutex_unlock(new_file);
            _fdb_close(&new_db);
            return status;
        }
        // Release the handle's own structures and take over the ones that
        // _fdb_open() created for new_db.
        btreeblk_free(handle->bhandle);
        free(handle->bhandle);
        handle->bhandle = new_db.bhandle;

        docio_free(handle->dhandle);
        free(handle->dhandle);
        handle->dhandle = new_db.dhandle;

        hbtrie_free(handle->trie);
        free(handle->trie);
        handle->trie = new_db.trie;

        wal_shutdown(handle->file);
        handle->file = new_file;

        // NOTE(review): the old seqtrie/seqtree is freed unconditionally but
        // only replaced when new_db also uses a sequence tree; otherwise the
        // handle keeps a stale pointer. new_db.config is presumably derived
        // from the same config, so the options should match - confirm.
        if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
            if (handle->kvs) {
                // multi KV instance mode
                hbtrie_free(handle->seqtrie);
                free(handle->seqtrie);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtrie = new_db.seqtrie;
                }
            } else {
                free(handle->seqtree->kv_ops);
                free(handle->seqtree);
                if (new_db.config.seqtree_opt == FDB_SEQTREE_USE) {
                    handle->seqtree = new_db.seqtree;
                }
            }
        }

        filemgr_mutex_unlock(new_file);
        // new_db's remaining private state is released piecewise here;
        // _fdb_close() is not called on this path because its structures
        // now belong to `handle`.
        if (new_db.kvs) {
            fdb_kvs_info_free(&new_db);
        }
        // remove self: WARNING must not close this handle if snapshots
        // are yet to open this file
        filemgr_remove_pending(old_file, new_db.file);
        filemgr_close(old_file, 0, handle->filename, &handle->log_callback);
        free(new_db.filename);
        return FDB_RESULT_FAIL_BY_COMPACTION;
    }

    // As the new file is partially compacted, it should be removed upon close.
    // Just in-case the new file gets opened before removal, point it to the old
    // file to ensure availability of data.
    filemgr_remove_pending(new_db.file, handle->file);
    _fdb_close(&new_db);

    return FDB_RESULT_SUCCESS;
}
490 
491 LIBFDB_API
fdb_init(fdb_config *config)492 fdb_status fdb_init(fdb_config *config)
493 {
494     fdb_config _config;
495     compactor_config c_config;
496     struct filemgr_config f_config;
497 
498     if (config) {
499         if (validate_fdb_config(config)) {
500             _config = *config;
501         } else {
502             return FDB_RESULT_INVALID_CONFIG;
503         }
504     } else {
505         _config = get_default_config();
506     }
507 
508     // global initialization
509     // initialized only once at first time
510     if (!fdb_initialized) {
511 #ifdef _TRACE_HANDLES
512         spin_init(&open_handle_lock);
513         avl_init(&open_handles, NULL);
514 #endif
515 
516 #ifndef SPIN_INITIALIZER
517         // Note that only Windows passes through this routine
518         if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
519             // atomically initialize spin lock only once
520             spin_init(&initial_lock);
521             initial_lock_status = 2;
522         } else {
523             // the others .. wait until initializing 'initial_lock' is done
524             while (initial_lock_status != 2) {
525                 Sleep(1);
526             }
527         }
528 #endif
529 
530     }
531     spin_lock(&initial_lock);
532     if (!fdb_initialized) {
533         double ram_size = (double) get_memory_size();
534         if (ram_size * BCACHE_MEMORY_THRESHOLD < (double) _config.buffercache_size) {
535             spin_unlock(&initial_lock);
536             return FDB_RESULT_TOO_BIG_BUFFER_CACHE;
537         }
538         // initialize file manager and block cache
539         f_config.blocksize = _config.blocksize;
540         f_config.ncacheblock = _config.buffercache_size / _config.blocksize;
541         filemgr_init(&f_config);
542         filemgr_set_lazy_file_deletion(true,
543                                        compactor_register_file_removing,
544                                        compactor_is_file_removed);
545 
546         // initialize compaction daemon
547         c_config.sleep_duration = _config.compactor_sleep_duration;
548         c_config.num_threads = _config.num_compactor_threads;
549         compactor_init(&c_config);
550 
551         fdb_initialized = 1;
552     }
553     fdb_open_inprog++;
554     spin_unlock(&initial_lock);
555 
556     return FDB_RESULT_SUCCESS;
557 }
558 
559 LIBFDB_API
fdb_get_default_config(void)560 fdb_config fdb_get_default_config(void) {
561     return get_default_config();
562 }
563 
564 LIBFDB_API
fdb_get_default_kvs_config(void)565 fdb_kvs_config fdb_get_default_kvs_config(void) {
566     return get_default_kvs_config();
567 }
568 
569 LIBFDB_API
fdb_open(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig)570 fdb_status fdb_open(fdb_file_handle **ptr_fhandle,
571                     const char *filename,
572                     fdb_config *fconfig)
573 {
574 #ifdef _MEMPOOL
575     mempool_init();
576 #endif
577 
578     fdb_config config;
579     fdb_file_handle *fhandle;
580     fdb_kvs_handle *handle;
581 
582     if (fconfig) {
583         if (validate_fdb_config(fconfig)) {
584             config = *fconfig;
585         } else {
586             return FDB_RESULT_INVALID_CONFIG;
587         }
588     } else {
589         config = get_default_config();
590     }
591 
592     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
593     if (!fhandle) { // LCOV_EXCL_START
594         return FDB_RESULT_ALLOC_FAIL;
595     } // LCOV_EXCL_STOP
596 
597     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
598     if (!handle) { // LCOV_EXCL_START
599         free(fhandle);
600         return FDB_RESULT_ALLOC_FAIL;
601     } // LCOV_EXCL_STOP
602 
603     atomic_init_uint8_t(&handle->handle_busy, 0);
604     handle->shandle = NULL;
605     handle->kvs_config = get_default_kvs_config();
606 
607     fdb_status fs = fdb_init(fconfig);
608     if (fs != FDB_RESULT_SUCCESS) {
609         free(handle);
610         free(fhandle);
611         return fs;
612     }
613     fdb_file_handle_init(fhandle, handle);
614 
615     fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
616     if (fs == FDB_RESULT_SUCCESS) {
617         *ptr_fhandle = fhandle;
618     } else {
619         *ptr_fhandle = NULL;
620         free(handle);
621         fdb_file_handle_free(fhandle);
622     }
623     spin_lock(&initial_lock);
624     fdb_open_inprog--;
625     spin_unlock(&initial_lock);
626     return fs;
627 }
628 
629 LIBFDB_API
fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig, size_t num_functions, char **kvs_names, fdb_custom_cmp_variable *functions)630 fdb_status fdb_open_custom_cmp(fdb_file_handle **ptr_fhandle,
631                                const char *filename,
632                                fdb_config *fconfig,
633                                size_t num_functions,
634                                char **kvs_names,
635                                fdb_custom_cmp_variable *functions)
636 {
637 #ifdef _MEMPOOL
638     mempool_init();
639 #endif
640 
641     fdb_config config;
642     fdb_file_handle *fhandle;
643     fdb_kvs_handle *handle;
644 
645     if (fconfig) {
646         if (validate_fdb_config(fconfig)) {
647             config = *fconfig;
648         } else {
649             return FDB_RESULT_INVALID_CONFIG;
650         }
651     } else {
652         config = get_default_config();
653     }
654 
655     if (config.multi_kv_instances == false) {
656         // single KV instance mode does not support customized cmp function
657         return FDB_RESULT_INVALID_CONFIG;
658     }
659 
660     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
661     if (!fhandle) { // LCOV_EXCL_START
662         return FDB_RESULT_ALLOC_FAIL;
663     } // LCOV_EXCL_STOP
664 
665     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
666     if (!handle) { // LCOV_EXCL_START
667         free(fhandle);
668         return FDB_RESULT_ALLOC_FAIL;
669     } // LCOV_EXCL_STOP
670 
671     atomic_init_uint8_t(&handle->handle_busy, 0);
672     handle->shandle = NULL;
673     handle->kvs_config = get_default_kvs_config();
674 
675     fdb_status fs = fdb_init(fconfig);
676     if (fs != FDB_RESULT_SUCCESS) {
677         free(handle);
678         free(fhandle);
679         return fs;
680     }
681     fdb_file_handle_init(fhandle, handle);
682 
683     // insert kvs_names and functions into fhandle's list
684     fdb_file_handle_parse_cmp_func(fhandle, num_functions,
685                                    kvs_names, functions);
686 
687     fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
688     if (fs == FDB_RESULT_SUCCESS) {
689         *ptr_fhandle = fhandle;
690     } else {
691         *ptr_fhandle = NULL;
692         free(handle);
693         fdb_file_handle_free(fhandle);
694     }
695     spin_lock(&initial_lock);
696     fdb_open_inprog--;
697     spin_unlock(&initial_lock);
698     return fs;
699 }
700 
fdb_open_for_compactor(fdb_file_handle **ptr_fhandle, const char *filename, fdb_config *fconfig, struct list *cmp_func_list)701 fdb_status fdb_open_for_compactor(fdb_file_handle **ptr_fhandle,
702                                   const char *filename,
703                                   fdb_config *fconfig,
704                                   struct list *cmp_func_list)
705 {
706 #ifdef _MEMPOOL
707     mempool_init();
708 #endif
709 
710     fdb_file_handle *fhandle;
711     fdb_kvs_handle *handle;
712 
713     fhandle = (fdb_file_handle*)calloc(1, sizeof(fdb_file_handle));
714     if (!fhandle) { // LCOV_EXCL_START
715         return FDB_RESULT_ALLOC_FAIL;
716     } // LCOV_EXCL_STOP
717 
718     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
719     if (!handle) { // LCOV_EXCL_START
720         free(fhandle);
721         return FDB_RESULT_ALLOC_FAIL;
722     } // LCOV_EXCL_STOP
723 
724     atomic_init_uint8_t(&handle->handle_busy, 0);
725     handle->shandle = NULL;
726 
727     fdb_file_handle_init(fhandle, handle);
728     if (cmp_func_list) {
729         fdb_file_handle_clone_cmp_func_list(fhandle, cmp_func_list);
730     }
731     fdb_status fs = _fdb_open(handle, filename, FDB_VFILENAME, fconfig);
732     if (fs == FDB_RESULT_SUCCESS) {
733         *ptr_fhandle = fhandle;
734     } else {
735         *ptr_fhandle = NULL;
736         free(handle);
737         fdb_file_handle_free(fhandle);
738     }
739     return fs;
740 }
741 
742 LIBFDB_API
fdb_snapshot_open(fdb_kvs_handle *handle_in, fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)743 fdb_status fdb_snapshot_open(fdb_kvs_handle *handle_in,
744                              fdb_kvs_handle **ptr_handle, fdb_seqnum_t seqnum)
745 {
746 #ifdef _MEMPOOL
747     mempool_init();
748 #endif
749 
750     fdb_config config = handle_in->config;
751     fdb_kvs_config kvs_config = handle_in->kvs_config;
752     fdb_kvs_handle *handle;
753     fdb_status fs;
754     filemgr *file;
755     file_status_t fstatus = FILE_NORMAL;
756 
757     if (!handle_in || !ptr_handle) {
758         return FDB_RESULT_INVALID_ARGS;
759     }
760 
761     // Sequence trees are a must for snapshot creation
762     if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
763         return FDB_RESULT_INVALID_CONFIG;
764     }
765 
766 fdb_snapshot_open_start:
767     if (!handle_in->shandle) {
768         fdb_check_file_reopen(handle_in, &fstatus);
769         fdb_sync_db_header(handle_in);
770         file = handle_in->file;
771 
772         if (handle_in->kvs && handle_in->kvs->type == KVS_SUB) {
773             handle_in->seqnum = fdb_kvs_get_seqnum(file,
774                                                    handle_in->kvs->id);
775         } else {
776             handle_in->seqnum = filemgr_get_seqnum(file);
777         }
778     } else {
779         file = handle_in->file;
780     }
781 
782     // if the max sequence number seen by this handle is lower than the
783     // requested snapshot marker, it means the snapshot is not yet visible
784     // even via the current fdb_kvs_handle
785     if (seqnum != FDB_SNAPSHOT_INMEM && seqnum > handle_in->seqnum) {
786         return FDB_RESULT_NO_DB_INSTANCE;
787     }
788 
789     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
790     if (!handle) { // LCOV_EXCL_START
791         return FDB_RESULT_ALLOC_FAIL;
792     } // LCOV_EXCL_STOP
793 
794     atomic_init_uint8_t(&handle->handle_busy, 0);
795     handle->log_callback = handle_in->log_callback;
796     handle->max_seqnum = seqnum;
797     handle->fhandle = handle_in->fhandle;
798 
799     config.flags |= FDB_OPEN_FLAG_RDONLY;
800     // do not perform compaction for snapshot
801     config.compaction_mode = FDB_COMPACTION_MANUAL;
802 
803     // If cloning an existing snapshot handle, then rewind indexes
804     // to its last DB header and point its avl tree to existing snapshot's tree
805     bool clone_snapshot = false;
806     if (handle_in->shandle) {
807         handle->last_hdr_bid = handle_in->last_hdr_bid; // do fast rewind
808         if (snap_clone(handle_in->shandle, handle_in->max_seqnum,
809                    &handle->shandle, seqnum) == FDB_RESULT_SUCCESS) {
810             handle->max_seqnum = FDB_SNAPSHOT_INMEM; // temp value to skip WAL
811             clone_snapshot = true;
812         }
813     }
814 
815     if (!handle->shandle) {
816         handle->shandle = (struct snap_handle *) calloc(1, sizeof(snap_handle));
817         if (!handle->shandle) { // LCOV_EXCL_START
818             free(handle);
819             return FDB_RESULT_ALLOC_FAIL;
820         } // LCOV_EXCL_STOP
821         snap_init(handle->shandle, handle_in);
822     }
823 
824     if (handle_in->kvs) {
825         // sub-handle in multi KV instance mode
826         if (clone_snapshot) {
827             fs = _fdb_kvs_clone_snapshot(handle_in, handle);
828         } else {
829             fs = _fdb_kvs_open(handle_in->kvs->root,
830                               &config, &kvs_config, file,
831                               file->filename,
832                               _fdb_kvs_get_name(handle_in, file),
833                               handle);
834         }
835     } else {
836         if (clone_snapshot) {
837             fs = _fdb_clone_snapshot(handle_in, handle);
838         } else {
839             fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
840         }
841     }
842 
843     if (fs == FDB_RESULT_SUCCESS) {
844         if (seqnum == FDB_SNAPSHOT_INMEM &&
845             !handle_in->shandle) {
846             fdb_seqnum_t upto_seq = seqnum;
847             // In-memory snapshot
848             wal_snapshot(handle->file, (void *)handle->shandle,
849                          handle_in->txn, &upto_seq, _fdb_wal_snapshot_func);
850             // set seqnum based on handle type (multikv or default)
851             if (handle_in->kvs && handle_in->kvs->id > 0) {
852                 handle->max_seqnum =
853                     _fdb_kvs_get_seqnum(handle->file->kv_header,
854                                         handle_in->kvs->id);
855             } else {
856                 handle->max_seqnum = filemgr_get_seqnum(handle->file);
857             }
858 
859             // synchronize dirty root nodes if exist
860             if (filemgr_dirty_root_exist(handle->file)) {
861                 bid_t dirty_idtree_root, dirty_seqtree_root;
862                 filemgr_mutex_lock(handle->file);
863                 filemgr_get_dirty_root(handle->file,
864                                        &dirty_idtree_root, &dirty_seqtree_root);
865                 if (dirty_idtree_root != BLK_NOT_FOUND) {
866                     handle->trie->root_bid = dirty_idtree_root;
867                 }
868                 if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
869                     if (dirty_seqtree_root != BLK_NOT_FOUND) {
870                         if (handle->kvs) {
871                             handle->seqtrie->root_bid = dirty_seqtree_root;
872                         } else {
873                             btree_init_from_bid(handle->seqtree,
874                                                 handle->seqtree->blk_handle,
875                                                 handle->seqtree->blk_ops,
876                                                 handle->seqtree->kv_ops,
877                                                 handle->seqtree->blksize,
878                                                 dirty_seqtree_root);
879                         }
880                     }
881                 }
882                 btreeblk_discard_blocks(handle->bhandle);
883                 btreeblk_create_dirty_snapshot(handle->bhandle);
884                 filemgr_mutex_unlock(handle->file);
885             }
886         } else if (clone_snapshot) {
887             // Snapshot is created on the other snapshot handle
888 
889             handle->max_seqnum = handle_in->seqnum;
890 
891             if (seqnum == FDB_SNAPSHOT_INMEM) {
892                 // in-memory snapshot
893                 // Clone dirty root nodes from the source snapshot by incrementing
894                 // their ref counters
895                 handle->trie->root_bid = handle_in->trie->root_bid;
896                 if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
897                     if (handle->kvs) {
898                         handle->seqtrie->root_bid = handle_in->seqtrie->root_bid;
899                     } else {
900                         handle->seqtree->root_bid = handle_in->seqtree->root_bid;
901                     }
902                 }
903                 btreeblk_discard_blocks(handle->bhandle);
904                 btreeblk_clone_dirty_snapshot(handle->bhandle,
905                                               handle_in->bhandle);
906             }
907         }
908         *ptr_handle = handle;
909     } else {
910         *ptr_handle = NULL;
911         snap_close(handle->shandle);
912         free(handle);
913         // If compactor thread had finished compaction just before this routine
914         // calls _fdb_open, then it is possible that the snapshot's DB header
915         // is only present in the new_file. So we must retry the snapshot
916         // open attempt IFF _fdb_open indicates FDB_RESULT_NO_DB_INSTANCE..
917         if (fs == FDB_RESULT_NO_DB_INSTANCE && fstatus == FILE_COMPACT_OLD) {
918             if (filemgr_get_file_status(file) == FILE_REMOVED_PENDING) {
919                 goto fdb_snapshot_open_start;
920             }
921         }
922     }
923     return fs;
924 }
925 
926 static fdb_status _fdb_reset(fdb_kvs_handle *handle, fdb_kvs_handle *handle_in);
927 
928 LIBFDB_API
fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)929 fdb_status fdb_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
930 {
931 #ifdef _MEMPOOL
932     mempool_init();
933 #endif
934 
935     fdb_config config;
936     fdb_kvs_handle *handle_in, *handle;
937     fdb_status fs;
938     fdb_seqnum_t old_seqnum;
939 
940     if (!handle_ptr) {
941         return FDB_RESULT_INVALID_ARGS;
942     }
943 
944     handle_in = *handle_ptr;
945     config = handle_in->config;
946 
947     if (handle_in->kvs) {
948         return fdb_kvs_rollback(handle_ptr, seqnum);
949     }
950 
951     // Sequence trees are a must for rollback
952     if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
953         return FDB_RESULT_INVALID_CONFIG;
954     }
955 
956     if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
957         return fdb_log(&handle_in->log_callback, FDB_RESULT_RONLY_VIOLATION,
958                        "Warning: Rollback is not allowed on the read-only DB file '%s'.",
959                        handle_in->file->filename);
960     }
961 
962     if (!atomic_cas_uint8_t(&handle_in->handle_busy, 0, 1)) {
963         return FDB_RESULT_HANDLE_BUSY;
964     }
965 
966     filemgr_mutex_lock(handle_in->file);
967     filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
968     // All transactions should be closed before rollback
969     if (wal_txn_exists(handle_in->file)) {
970         filemgr_set_rollback(handle_in->file, 0);
971         filemgr_mutex_unlock(handle_in->file);
972         fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
973         return FDB_RESULT_FAIL_BY_TRANSACTION;
974     }
975 
976     // If compaction is running, wait until it is aborted.
977     // TODO: Find a better way of waiting for the compaction abortion.
978     unsigned int sleep_time = 10000; // 10 ms.
979     file_status_t fstatus = filemgr_get_file_status(handle_in->file);
980     while (fstatus == FILE_COMPACT_OLD) {
981         filemgr_mutex_unlock(handle_in->file);
982         decaying_usleep(&sleep_time, 1000000);
983         filemgr_mutex_lock(handle_in->file);
984         fstatus = filemgr_get_file_status(handle_in->file);
985     }
986     if (fstatus == FILE_REMOVED_PENDING) {
987         filemgr_mutex_unlock(handle_in->file);
988         fdb_check_file_reopen(handle_in, NULL);
989     } else {
990         filemgr_mutex_unlock(handle_in->file);
991     }
992 
993     fdb_sync_db_header(handle_in);
994 
995     // if the max sequence number seen by this handle is lower than the
996     // requested snapshot marker, it means the snapshot is not yet visible
997     // even via the current fdb_kvs_handle
998     if (seqnum > handle_in->seqnum) {
999         filemgr_set_rollback(handle_in->file, 0); // allow mutations
1000         fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1001         return FDB_RESULT_NO_DB_INSTANCE;
1002     }
1003 
1004     handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1005     if (!handle) { // LCOV_EXCL_START
1006         fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1007         return FDB_RESULT_ALLOC_FAIL;
1008     } // LCOV_EXCL_STOP
1009 
1010     atomic_init_uint8_t(&handle->handle_busy, 0);
1011     handle->log_callback = handle_in->log_callback;
1012     handle->fhandle = handle_in->fhandle;
1013     if (seqnum == 0) {
1014         fs = _fdb_reset(handle, handle_in);
1015     } else {
1016         handle->max_seqnum = seqnum;
1017         fs = _fdb_open(handle, handle_in->file->filename, FDB_AFILENAME,
1018                        &config);
1019     }
1020 
1021     filemgr_set_rollback(handle_in->file, 0); // allow mutations
1022     if (fs == FDB_RESULT_SUCCESS) {
1023         // rollback the file's sequence number
1024         filemgr_mutex_lock(handle_in->file);
1025         old_seqnum = filemgr_get_seqnum(handle_in->file);
1026         filemgr_set_seqnum(handle_in->file, seqnum);
1027         filemgr_mutex_unlock(handle_in->file);
1028 
1029         fs = _fdb_commit(handle, FDB_COMMIT_NORMAL);
1030         if (fs == FDB_RESULT_SUCCESS) {
1031             if (handle_in->txn) {
1032                 handle->txn = handle_in->txn;
1033                 handle_in->txn = NULL;
1034             }
1035             handle_in->fhandle->root = handle;
1036             _fdb_close_root(handle_in);
1037             handle->max_seqnum = 0;
1038             handle->seqnum = seqnum;
1039             *handle_ptr = handle;
1040         } else {
1041             // cancel the rolling-back of the sequence number
1042             filemgr_mutex_lock(handle_in->file);
1043             filemgr_set_seqnum(handle_in->file, old_seqnum);
1044             filemgr_mutex_unlock(handle_in->file);
1045             free(handle);
1046             fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1047         }
1048     } else {
1049         free(handle);
1050         fdb_assert(atomic_cas_uint8_t(&handle_in->handle_busy, 1, 0), 1, 0);
1051     }
1052 
1053     return fs;
1054 }
1055 
_fdb_init_file_config(const fdb_config *config, struct filemgr_config *fconfig)1056 static void _fdb_init_file_config(const fdb_config *config,
1057                                   struct filemgr_config *fconfig) {
1058     fconfig->blocksize = config->blocksize;
1059     fconfig->ncacheblock = config->buffercache_size / config->blocksize;
1060     fconfig->chunksize = config->chunksize;
1061 
1062     fconfig->options = 0x0;
1063     if (config->flags & FDB_OPEN_FLAG_CREATE) {
1064         fconfig->options |= FILEMGR_CREATE;
1065     }
1066     if (config->flags & FDB_OPEN_FLAG_RDONLY) {
1067         fconfig->options |= FILEMGR_READONLY;
1068     }
1069     if (!(config->durability_opt & FDB_DRB_ASYNC)) {
1070         fconfig->options |= FILEMGR_SYNC;
1071     }
1072 
1073     fconfig->flag = 0x0;
1074     if ((config->durability_opt & FDB_DRB_ODIRECT) &&
1075         config->buffercache_size) {
1076         fconfig->flag |= _ARCH_O_DIRECT;
1077     }
1078 
1079     fconfig->prefetch_duration = config->prefetch_duration;
1080     fconfig->num_wal_shards = config->num_wal_partitions;
1081     fconfig->num_bcache_shards = config->num_bcache_partitions;
1082 }
1083 
_fdb_clone_snapshot(fdb_kvs_handle *handle_in, fdb_kvs_handle *handle_out)1084 fdb_status _fdb_clone_snapshot(fdb_kvs_handle *handle_in,
1085                                fdb_kvs_handle *handle_out)
1086 {
1087     fdb_status status;
1088 
1089     handle_out->config = handle_in->config;
1090     handle_out->kvs_config = handle_in->kvs_config;
1091     handle_out->fileops = handle_in->fileops;
1092     handle_out->file = handle_in->file;
1093     // Note that the file ref count will be decremented when the cloned snapshot
1094     // is closed through filemgr_close().
1095     filemgr_incr_ref_count(handle_out->file);
1096 
1097     if (handle_out->filename) {
1098         handle_out->filename = (char *)realloc(handle_out->filename,
1099                                                strlen(handle_in->filename)+1);
1100     } else {
1101         handle_out->filename = (char*)malloc(strlen(handle_in->filename)+1);
1102     }
1103     strcpy(handle_out->filename, handle_in->filename);
1104 
1105     // initialize the docio handle.
1106     handle_out->dhandle = (struct docio_handle *)
1107         calloc(1, sizeof(struct docio_handle));
1108     handle_out->dhandle->log_callback = &handle_out->log_callback;
1109     docio_init(handle_out->dhandle, handle_out->file,
1110                handle_out->config.compress_document_body);
1111 
1112     // initialize the btree block handle.
1113     handle_out->btreeblkops = btreeblk_get_ops();
1114     handle_out->bhandle = (struct btreeblk_handle *)
1115         calloc(1, sizeof(struct btreeblk_handle));
1116     handle_out->bhandle->log_callback = &handle_out->log_callback;
1117     btreeblk_init(handle_out->bhandle, handle_out->file, handle_out->file->blocksize);
1118 
1119     handle_out->dirty_updates = handle_in->dirty_updates;
1120     handle_out->cur_header_revnum = handle_in->cur_header_revnum;
1121     handle_out->last_wal_flush_hdr_bid = handle_in->last_wal_flush_hdr_bid;
1122     handle_out->kv_info_offset = handle_in->kv_info_offset;
1123     handle_out->shandle->stat = handle_in->shandle->stat;
1124     handle_out->op_stats = handle_in->op_stats;
1125 
1126     // initialize the trie handle
1127     handle_out->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1128     hbtrie_init(handle_out->trie, handle_out->config.chunksize, OFFSET_SIZE,
1129                 handle_out->file->blocksize,
1130                 handle_in->trie->root_bid, // Source snapshot's trie root bid
1131                 (void *)handle_out->bhandle, handle_out->btreeblkops,
1132                 (void *)handle_out->dhandle, _fdb_readkey_wrap);
1133     // set aux for cmp wrapping function
1134     hbtrie_set_leaf_height_limit(handle_out->trie, 0xff);
1135     hbtrie_set_leaf_cmp(handle_out->trie, _fdb_custom_cmp_wrap);
1136 
1137     if (handle_out->kvs) {
1138         hbtrie_set_map_function(handle_out->trie, fdb_kvs_find_cmp_chunk);
1139     }
1140 
1141     if (handle_out->config.seqtree_opt == FDB_SEQTREE_USE) {
1142         handle_out->seqnum = handle_in->seqnum;
1143 
1144         if (handle_out->config.multi_kv_instances) {
1145             // multi KV instance mode .. HB+trie
1146             handle_out->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1147             hbtrie_init(handle_out->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1148                         handle_out->file->blocksize,
1149                         handle_in->seqtrie->root_bid, // Source snapshot's seqtrie root bid
1150                         (void *)handle_out->bhandle, handle_out->btreeblkops,
1151                         (void *)handle_out->dhandle, _fdb_readseq_wrap);
1152 
1153         } else {
1154             // single KV instance mode .. normal B+tree
1155             struct btree_kv_ops *seq_kv_ops =
1156                 (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1157             seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1158             seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1159 
1160             handle_out->seqtree = (struct btree*)malloc(sizeof(struct btree));
1161             // Init the seq tree using the root bid of the source snapshot.
1162             btree_init_from_bid(handle_out->seqtree, (void *)handle_out->bhandle,
1163                                 handle_out->btreeblkops, seq_kv_ops,
1164                                 handle_out->config.blocksize,
1165                                 handle_in->seqtree->root_bid);
1166         }
1167     } else{
1168         handle_out->seqtree = NULL;
1169     }
1170 
1171     status = btreeblk_end(handle_out->bhandle);
1172     fdb_assert(status == FDB_RESULT_SUCCESS, status, handle_out);
1173 
1174 #ifdef _TRACE_HANDLES
1175     spin_lock(&open_handle_lock);
1176     avl_insert(&open_handles, &handle_out->avl_trace, _fdb_handle_cmp);
1177     spin_unlock(&open_handle_lock);
1178 #endif
1179     return status;
1180 }
1181 
_fdb_open(fdb_kvs_handle *handle, const char *filename, fdb_filename_mode_t filename_mode, const fdb_config *config)1182 fdb_status _fdb_open(fdb_kvs_handle *handle,
1183                      const char *filename,
1184                      fdb_filename_mode_t filename_mode,
1185                      const fdb_config *config)
1186 {
1187     struct filemgr_config fconfig;
1188     struct kvs_stat stat, empty_stat;
1189     bid_t trie_root_bid = BLK_NOT_FOUND;
1190     bid_t seq_root_bid = BLK_NOT_FOUND;
1191     fdb_seqnum_t seqnum = 0;
1192     filemgr_header_revnum_t header_revnum = 0;
1193     fdb_seqtree_opt_t seqtree_opt = config->seqtree_opt;
1194     uint64_t ndocs = 0;
1195     uint64_t datasize = 0;
1196     uint64_t last_wal_flush_hdr_bid = BLK_NOT_FOUND;
1197     uint64_t kv_info_offset = BLK_NOT_FOUND;
1198     uint64_t header_flags = 0;
1199     uint8_t header_buf[FDB_BLOCKSIZE];
1200     char *compacted_filename = NULL;
1201     char *prev_filename = NULL;
1202     size_t header_len = 0;
1203     bool multi_kv_instances = config->multi_kv_instances;
1204 
1205     uint64_t nlivenodes = 0;
1206     bid_t hdr_bid = 0; // initialize to zero for in-memory snapshot
1207     char actual_filename[FDB_MAX_FILENAME_LEN];
1208     char virtual_filename[FDB_MAX_FILENAME_LEN];
1209     char *target_filename = NULL;
1210     fdb_status status;
1211 
1212     if (filename == NULL) {
1213         return FDB_RESULT_INVALID_ARGS;
1214     }
1215     if (strlen(filename) > (FDB_MAX_FILENAME_LEN - 8)) {
1216         // filename (including path) length is supported up to
1217         // (FDB_MAX_FILENAME_LEN - 8) bytes.
1218         return FDB_RESULT_TOO_LONG_FILENAME;
1219     }
1220 
1221     if (filename_mode == FDB_VFILENAME &&
1222         !compactor_is_valid_mode(filename, (fdb_config *)config)) {
1223         return FDB_RESULT_INVALID_COMPACTION_MODE;
1224     }
1225 
1226     _fdb_init_file_config(config, &fconfig);
1227 
1228     if (filename_mode == FDB_VFILENAME) {
1229         compactor_get_actual_filename(filename, actual_filename,
1230                                       config->compaction_mode, &handle->log_callback);
1231     } else {
1232         strcpy(actual_filename, filename);
1233     }
1234 
1235     if ( config->compaction_mode == FDB_COMPACTION_MANUAL ||
1236          (config->compaction_mode == FDB_COMPACTION_AUTO   &&
1237           filename_mode == FDB_VFILENAME) ) {
1238         // 1) manual compaction mode, OR
1239         // 2) auto compaction mode + 'filename' is virtual filename
1240         // -> copy 'filename'
1241         target_filename = (char *)filename;
1242     } else {
1243         // otherwise (auto compaction mode + 'filename' is actual filename)
1244         // -> copy 'virtual_filename'
1245         compactor_get_virtual_filename(filename, virtual_filename);
1246         target_filename = virtual_filename;
1247     }
1248 
1249     handle->fileops = get_filemgr_ops();
1250     filemgr_open_result result = filemgr_open((char *)actual_filename,
1251                                               handle->fileops,
1252                                               &fconfig, &handle->log_callback);
1253     if (result.rv != FDB_RESULT_SUCCESS) {
1254         return (fdb_status) result.rv;
1255     }
1256 
1257     handle->file = result.file;
1258     if (config->compaction_mode == FDB_COMPACTION_MANUAL &&
1259         strcmp(filename, actual_filename)) {
1260         // It is in-place compacted file if
1261         // 1) compaction mode is manual, and
1262         // 2) actual filename is different to the filename given by user.
1263         // In this case, set the in-place compaction flag.
1264         filemgr_set_in_place_compaction(handle->file, true);
1265     }
1266     if (filemgr_is_in_place_compaction_set(handle->file)) {
1267         // This file was in-place compacted.
1268         // set 'handle->filename' to the original filename to trigger file renaming
1269         compactor_get_virtual_filename(filename, virtual_filename);
1270         target_filename = virtual_filename;
1271     }
1272 
1273     if (handle->filename) {
1274         handle->filename = (char *)realloc(handle->filename,
1275                                            strlen(target_filename)+1);
1276     } else {
1277         handle->filename = (char*)malloc(strlen(target_filename)+1);
1278     }
1279     strcpy(handle->filename, target_filename);
1280 
1281     // If cloning from a snapshot handle, fdb_snapshot_open would have already
1282     // set handle->last_hdr_bid to the block id of required header, so rewind..
1283     if (handle->shandle && handle->last_hdr_bid) {
1284         status = filemgr_fetch_header(handle->file, handle->last_hdr_bid,
1285                                       header_buf, &header_len, &seqnum,
1286                                       &header_revnum, &handle->log_callback);
1287         if (status != FDB_RESULT_SUCCESS) {
1288             free(handle->filename);
1289             handle->filename = NULL;
1290             filemgr_close(handle->file, false, handle->filename,
1291                               &handle->log_callback);
1292             return status;
1293         }
1294     } else { // Normal open
1295         filemgr_get_header(handle->file, header_buf, &header_len,
1296                            &handle->last_hdr_bid, &seqnum, &header_revnum);
1297     }
1298 
1299     // initialize the docio handle so kv headers may be read
1300     handle->dhandle = (struct docio_handle *)
1301                       calloc(1, sizeof(struct docio_handle));
1302     handle->dhandle->log_callback = &handle->log_callback;
1303     docio_init(handle->dhandle, handle->file, config->compress_document_body);
1304 
1305     if (header_len > 0) {
1306         fdb_fetch_header(header_buf, &trie_root_bid,
1307                          &seq_root_bid, &ndocs, &nlivenodes,
1308                          &datasize, &last_wal_flush_hdr_bid, &kv_info_offset,
1309                          &header_flags, &compacted_filename, &prev_filename);
1310         // use existing setting for seqtree_opt
1311         if (header_flags & FDB_FLAG_SEQTREE_USE) {
1312             seqtree_opt = FDB_SEQTREE_USE;
1313         } else {
1314             seqtree_opt = FDB_SEQTREE_NOT_USE;
1315         }
1316         // Retrieve seqnum for multi-kv mode
1317         if (handle->kvs && handle->kvs->id > 0) {
1318             if (kv_info_offset != BLK_NOT_FOUND) {
1319                 if (!handle->file->kv_header) {
1320                     fdb_kvs_header_create(handle->file);
1321                     // KV header already exists but not loaded .. read & import
1322                     fdb_kvs_header_read(handle->file, handle->dhandle,
1323                                         kv_info_offset, false);
1324                 }
1325                 seqnum = _fdb_kvs_get_seqnum(handle->file->kv_header,
1326                                              handle->kvs->id);
1327             } else { // no kv_info offset, ok to set seqnum to zero
1328                 seqnum = 0;
1329             }
1330         }
1331         // other flags
1332         if (header_flags & FDB_FLAG_ROOT_INITIALIZED) {
1333             handle->fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1334         }
1335         if (header_flags & FDB_FLAG_ROOT_CUSTOM_CMP) {
1336             handle->fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1337         }
1338         // use existing setting for multi KV instance mode
1339         if (kv_info_offset == BLK_NOT_FOUND) {
1340             multi_kv_instances = false;
1341         } else {
1342             multi_kv_instances = true;
1343         }
1344     }
1345 
1346     handle->config = *config;
1347     handle->config.seqtree_opt = seqtree_opt;
1348     handle->config.multi_kv_instances = multi_kv_instances;
1349 
1350     if (handle->shandle && handle->max_seqnum == FDB_SNAPSHOT_INMEM) {
1351         // Either an in-memory snapshot or cloning from an existing snapshot..
1352         hdr_bid = 0; // This prevents _fdb_restore_wal() as incoming handle's
1353                      // *_open() should have already restored it
1354     } else { // Persisted snapshot or file rollback..
1355         hdr_bid = filemgr_get_pos(handle->file) / FDB_BLOCKSIZE;
1356         if (hdr_bid > 0) {
1357             --hdr_bid;
1358         }
1359         if (handle->max_seqnum) {
1360             struct kvs_stat stat_ori;
1361             // backup original stats
1362             if (handle->kvs) {
1363                 _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1364             } else {
1365                 _kvs_stat_get(handle->file, 0, &stat_ori);
1366             }
1367 
1368             if (hdr_bid > handle->last_hdr_bid){
1369                 // uncommitted data exists beyond the last DB header
1370                 // get the last committed seq number
1371                 fdb_seqnum_t seq_commit;
1372                 seq_commit = fdb_kvs_get_committed_seqnum(handle);
1373                 if (seq_commit == 0 || seq_commit < handle->max_seqnum) {
1374                     // In case, snapshot_open is attempted with latest uncommitted
1375                     // sequence number
1376                     header_len = 0;
1377                 }
1378             }
1379             // Reverse scan the file to locate the DB header with seqnum marker
1380             while (header_len && seqnum != handle->max_seqnum) {
1381                 hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
1382                                           header_buf, &header_len, &seqnum,
1383                                           &handle->log_callback);
1384                 if (header_len == 0) {
1385                     continue; // header doesn't exist
1386                 }
1387                 fdb_fetch_header(header_buf, &trie_root_bid,
1388                                  &seq_root_bid, &ndocs, &nlivenodes,
1389                                  &datasize, &last_wal_flush_hdr_bid,
1390                                  &kv_info_offset, &header_flags,
1391                                  &compacted_filename, NULL);
1392                 handle->last_hdr_bid = hdr_bid;
1393 
1394                 if (!handle->kvs || handle->kvs->id == 0) {
1395                     // single KVS mode OR default KVS
1396                     if (!handle->shandle) {
1397                         // rollback
1398                         struct kvs_stat stat_dst;
1399                         _kvs_stat_get(handle->file, 0, &stat_dst);
1400                         stat_dst.ndocs = ndocs;
1401                         stat_dst.datasize = datasize;
1402                         stat_dst.nlivenodes = nlivenodes;
1403                         _kvs_stat_set(handle->file, 0, stat_dst);
1404                     }
1405                     continue;
1406                 }
1407 
1408                 uint64_t doc_offset;
1409                 struct kvs_header *kv_header;
1410                 struct docio_object doc;
1411 
1412                 _fdb_kvs_header_create(&kv_header);
1413                 memset(&doc, 0, sizeof(struct docio_object));
1414                 doc_offset = docio_read_doc(handle->dhandle,
1415                                             kv_info_offset, &doc, true);
1416 
1417                 if (doc_offset == kv_info_offset) {
1418                     header_len = 0; // fail
1419                     _fdb_kvs_header_free(kv_header);
1420                 } else {
1421                     _fdb_kvs_header_import(kv_header, doc.body,
1422                                            doc.length.bodylen, false);
1423                     // get local sequence number for the KV instance
1424                     seqnum = _fdb_kvs_get_seqnum(kv_header,
1425                                                  handle->kvs->id);
1426                     if (!handle->shandle) {
1427                         // rollback: replace kv_header stats
1428                         // read from the current header's kv_header
1429                         struct kvs_stat stat_src, stat_dst;
1430                         _kvs_stat_get_kv_header(kv_header,
1431                                                 handle->kvs->id,
1432                                                 &stat_src);
1433                         _kvs_stat_get(handle->file,
1434                                       handle->kvs->id,
1435                                       &stat_dst);
1436                         // update ndocs, datasize, nlivenodes
1437                         // into the current file's kv_header
1438                         // Note: stats related to WAL should not be updated
1439                         //       at this time. They will be adjusted through
1440                         //       discard & restore routines below.
1441                         stat_dst.ndocs = stat_src.ndocs;
1442                         stat_dst.datasize = stat_src.datasize;
1443                         stat_dst.nlivenodes = stat_src.nlivenodes;
1444                         _kvs_stat_set(handle->file,
1445                                       handle->kvs->id,
1446                                       stat_dst);
1447                     }
1448                     _fdb_kvs_header_free(kv_header);
1449                     free_docio_object(&doc, 1, 1, 1);
1450                 }
1451             }
1452             if (!header_len) { // Marker MUST match that of DB commit!
1453                 // rollback original stats
1454                 if (handle->kvs) {
1455                     _kvs_stat_get(handle->file, handle->kvs->id, &stat_ori);
1456                 } else {
1457                     _kvs_stat_get(handle->file, 0, &stat_ori);
1458                 }
1459 
1460                 docio_free(handle->dhandle);
1461                 free(handle->dhandle);
1462                 free(handle->filename);
1463                 free(prev_filename);
1464                 handle->filename = NULL;
1465                 filemgr_close(handle->file, false, handle->filename,
1466                               &handle->log_callback);
1467                 return FDB_RESULT_NO_DB_INSTANCE;
1468             }
1469 
1470             if (!handle->shandle) { // Rollback mode, destroy file WAL..
1471                 if (handle->config.multi_kv_instances) {
1472                     // multi KV instance mode
1473                     // clear only WAL items belonging to the instance
1474                     wal_close_kv_ins(handle->file,
1475                                      (handle->kvs)?(handle->kvs->id):(0));
1476                 } else {
1477                     wal_shutdown(handle->file);
1478                 }
1479             }
1480         } else { // snapshot to sequence number 0 requested..
1481             if (handle->shandle) { // fdb_snapshot_open API call
1482                 if (seqnum) {
1483                     // Database currently has a non-zero seq number,
1484                     // but the snapshot was requested with a seq number zero.
1485                     docio_free(handle->dhandle);
1486                     free(handle->dhandle);
1487                     free(handle->filename);
1488                     free(prev_filename);
1489                     handle->filename = NULL;
1490                     filemgr_close(handle->file, false, handle->filename,
1491                                   &handle->log_callback);
1492                     return FDB_RESULT_NO_DB_INSTANCE;
1493                 }
1494             } // end of zero max_seqnum but non-rollback check
1495         } // end of zero max_seqnum check
1496     } // end of durable snapshot locating
1497 
1498     handle->btreeblkops = btreeblk_get_ops();
1499     handle->bhandle = (struct btreeblk_handle *)
1500                       calloc(1, sizeof(struct btreeblk_handle));
1501     handle->bhandle->log_callback = &handle->log_callback;
1502 
1503     handle->dirty_updates = 0;
1504 
1505     if (handle->config.compaction_buf_maxsize == 0) {
1506         handle->config.compaction_buf_maxsize = FDB_COMP_BUF_MINSIZE;
1507     }
1508 
1509     btreeblk_init(handle->bhandle, handle->file, handle->file->blocksize);
1510 
1511     handle->cur_header_revnum = header_revnum;
1512     handle->last_wal_flush_hdr_bid = last_wal_flush_hdr_bid;
1513 
1514     memset(&empty_stat, 0x0, sizeof(empty_stat));
1515     _kvs_stat_get(handle->file, 0, &stat);
1516     if (!memcmp(&stat, &empty_stat, sizeof(stat))) { // first open
1517         // sync (default) KVS stat with DB header
1518         stat.nlivenodes = nlivenodes;
1519         stat.ndocs = ndocs;
1520         stat.datasize = datasize;
1521         _kvs_stat_set(handle->file, 0, stat);
1522     }
1523 
1524     if (handle->config.multi_kv_instances && !handle->shandle) {
1525         // multi KV instance mode
1526         filemgr_mutex_lock(handle->file);
1527         if (kv_info_offset == BLK_NOT_FOUND) {
1528             // there is no KV header .. create & initialize
1529             fdb_kvs_header_create(handle->file);
1530             kv_info_offset = fdb_kvs_header_append(handle->file, handle->dhandle);
1531         } else if (handle->file->kv_header == NULL) {
1532             // KV header already exists but not loaded .. read & import
1533             fdb_kvs_header_create(handle->file);
1534             fdb_kvs_header_read(handle->file, handle->dhandle, kv_info_offset, false);
1535         }
1536         filemgr_mutex_unlock(handle->file);
1537 
1538         // validation check for key order of all KV stores
1539         if (handle == handle->fhandle->root) {
1540             fdb_status fs = fdb_kvs_cmp_check(handle);
1541             if (fs != FDB_RESULT_SUCCESS) { // cmp function mismatch
1542                 docio_free(handle->dhandle);
1543                 free(handle->dhandle);
1544                 btreeblk_free(handle->bhandle);
1545                 free(handle->bhandle);
1546                 free(handle->filename);
1547                 handle->filename = NULL;
1548                 filemgr_close(handle->file, false, handle->filename,
1549                               &handle->log_callback);
1550                 return fs;
1551             }
1552         }
1553     }
1554     handle->kv_info_offset = kv_info_offset;
1555 
1556     if (handle->kv_info_offset != BLK_NOT_FOUND &&
1557         handle->kvs == NULL) {
1558         // multi KV instance mode .. turn on config flag
1559         handle->config.multi_kv_instances = true;
1560         // only super handle can be opened using fdb_open(...)
1561         fdb_kvs_info_create(NULL, handle, handle->file, NULL);
1562     }
1563 
1564     if (handle->shandle) { // Populate snapshot stats..
1565         if (kv_info_offset == BLK_NOT_FOUND) { // Single KV mode
1566             memset(&handle->shandle->stat, 0x0,
1567                     sizeof(handle->shandle->stat));
1568             handle->shandle->stat.ndocs = ndocs;
1569             handle->shandle->stat.datasize = datasize;
1570             handle->shandle->stat.nlivenodes = nlivenodes;
1571         } else { // Multi KV instance mode, populate specific kv stats
1572             memset(&handle->shandle->stat, 0x0,
1573                     sizeof(handle->shandle->stat));
1574             _kvs_stat_get(handle->file, handle->kvs->id,
1575                     &handle->shandle->stat);
1576             // Since wal is restored below, we have to reset
1577             // wal stats to zero.
1578             handle->shandle->stat.wal_ndeletes = 0;
1579             handle->shandle->stat.wal_ndocs = 0;
1580         }
1581     }
1582 
1583     // initialize pointer to the global operational stats of this KV store
1584     handle->op_stats = filemgr_get_ops_stats(handle->file, handle->kvs);
1585     fdb_assert(handle->op_stats, 0, 0);
1586 
1587     handle->trie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1588     hbtrie_init(handle->trie, config->chunksize, OFFSET_SIZE,
1589                 handle->file->blocksize, trie_root_bid,
1590                 (void *)handle->bhandle, handle->btreeblkops,
1591                 (void *)handle->dhandle, _fdb_readkey_wrap);
1592     // set aux for cmp wrapping function
1593     hbtrie_set_leaf_height_limit(handle->trie, 0xff);
1594     hbtrie_set_leaf_cmp(handle->trie, _fdb_custom_cmp_wrap);
1595 
1596     if (handle->kvs) {
1597         hbtrie_set_map_function(handle->trie, fdb_kvs_find_cmp_chunk);
1598     }
1599 
1600     if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1601         handle->seqnum = seqnum;
1602 
1603         if (handle->config.multi_kv_instances) {
1604             // multi KV instance mode .. HB+trie
1605             handle->seqtrie = (struct hbtrie *)malloc(sizeof(struct hbtrie));
1606             hbtrie_init(handle->seqtrie, sizeof(fdb_kvs_id_t), OFFSET_SIZE,
1607                         handle->file->blocksize, seq_root_bid,
1608                         (void *)handle->bhandle, handle->btreeblkops,
1609                         (void *)handle->dhandle, _fdb_readseq_wrap);
1610 
1611         } else {
1612             // single KV instance mode .. normal B+tree
1613             struct btree_kv_ops *seq_kv_ops =
1614                 (struct btree_kv_ops *)malloc(sizeof(struct btree_kv_ops));
1615             seq_kv_ops = btree_kv_get_kb64_vb64(seq_kv_ops);
1616             seq_kv_ops->cmp = _cmp_uint64_t_endian_safe;
1617 
1618             handle->seqtree = (struct btree*)malloc(sizeof(struct btree));
1619             if (seq_root_bid == BLK_NOT_FOUND) {
1620                 btree_init(handle->seqtree, (void *)handle->bhandle,
1621                            handle->btreeblkops, seq_kv_ops,
1622                            handle->config.blocksize, sizeof(fdb_seqnum_t),
1623                            OFFSET_SIZE, 0x0, NULL);
1624              }else{
1625                  btree_init_from_bid(handle->seqtree, (void *)handle->bhandle,
1626                                      handle->btreeblkops, seq_kv_ops,
1627                                      handle->config.blocksize, seq_root_bid);
1628              }
1629         }
1630     }else{
1631         handle->seqtree = NULL;
1632     }
1633 
1634     if (handle->config.multi_kv_instances && handle->max_seqnum) {
1635         // restore only docs belonging to the KV instance
1636         // handle->kvs should not be NULL
1637         _fdb_restore_wal(handle, FDB_RESTORE_KV_INS,
1638                          hdr_bid, (handle->kvs)?(handle->kvs->id):(0));
1639     } else {
1640         // normal restore
1641         _fdb_restore_wal(handle, FDB_RESTORE_NORMAL, hdr_bid, 0);
1642     }
1643 
1644     if (compacted_filename &&
1645         filemgr_get_file_status(handle->file) == FILE_NORMAL &&
1646         !(config->flags & FDB_OPEN_FLAG_RDONLY)) { // do not recover read-only
1647         _fdb_recover_compaction(handle, compacted_filename);
1648     }
1649 
1650     if (prev_filename) {
1651         if (!handle->shandle && strcmp(prev_filename, handle->file->filename)) {
1652             // record the old filename into the file handle of current file
1653             // and REMOVE old file on the first open
1654             // WARNING: snapshots must have been opened before this call
1655             if (filemgr_update_file_status(handle->file,
1656                                            filemgr_get_file_status(handle->file),
1657                                            prev_filename)) {
1658                 // Open the old file with read-only mode.
1659                 // (Temporarily disable log callback at this time since
1660                 //  the old file might be already removed.)
1661                 fconfig.options = FILEMGR_READONLY;
1662                 filemgr_open_result result = filemgr_open(prev_filename,
1663                                                           handle->fileops,
1664                                                           &fconfig,
1665                                                           NULL);
1666                 if (result.file) {
1667                     filemgr_remove_pending(result.file, handle->file);
1668                     filemgr_close(result.file, 0, handle->filename,
1669                                   &handle->log_callback);
1670                 }
1671             }
1672         } else {
1673             free(prev_filename);
1674         }
1675     }
1676 
1677     status = btreeblk_end(handle->bhandle);
1678     fdb_assert(status == FDB_RESULT_SUCCESS, status, handle);
1679 
1680     // do not register read-only handles
1681     if (!(config->flags & FDB_OPEN_FLAG_RDONLY) &&
1682         config->compaction_mode == FDB_COMPACTION_AUTO) {
1683         status = compactor_register_file(handle->file, (fdb_config *)config,
1684                                          handle->fhandle->cmp_func_list,
1685                                          &handle->log_callback);
1686     }
1687 
1688 #ifdef _TRACE_HANDLES
1689     spin_lock(&open_handle_lock);
1690     avl_insert(&open_handles, &handle->avl_trace, _fdb_handle_cmp);
1691     spin_unlock(&open_handle_lock);
1692 #endif
1693     return status;
1694 }
1695 
1696 LIBFDB_API
fdb_set_log_callback(fdb_kvs_handle *handle, fdb_log_callback log_callback, void *ctx_data)1697 fdb_status fdb_set_log_callback(fdb_kvs_handle *handle,
1698                                 fdb_log_callback log_callback,
1699                                 void *ctx_data)
1700 {
1701     handle->log_callback.callback = log_callback;
1702     handle->log_callback.ctx_data = ctx_data;
1703     return FDB_RESULT_SUCCESS;
1704 }
1705 
1706 LIBFDB_API
fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen, const void *meta, size_t metalen, const void *body, size_t bodylen)1707 fdb_status fdb_doc_create(fdb_doc **doc, const void *key, size_t keylen,
1708                           const void *meta, size_t metalen,
1709                           const void *body, size_t bodylen)
1710 {
1711     if (doc == NULL || keylen > FDB_MAX_KEYLEN ||
1712         metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1713         return FDB_RESULT_INVALID_ARGS;
1714     }
1715 
1716     *doc = (fdb_doc*)calloc(1, sizeof(fdb_doc));
1717     if (*doc == NULL) { // LCOV_EXCL_START
1718         return FDB_RESULT_ALLOC_FAIL;
1719     } // LCOV_EXCL_STOP
1720 
1721     (*doc)->seqnum = SEQNUM_NOT_USED;
1722 
1723     if (key && keylen > 0) {
1724         (*doc)->key = (void *)malloc(keylen);
1725         if ((*doc)->key == NULL) { // LCOV_EXCL_START
1726             return FDB_RESULT_ALLOC_FAIL;
1727         } // LCOV_EXCL_STOP
1728         memcpy((*doc)->key, key, keylen);
1729         (*doc)->keylen = keylen;
1730     } else {
1731         (*doc)->key = NULL;
1732         (*doc)->keylen = 0;
1733     }
1734 
1735     if (meta && metalen > 0) {
1736         (*doc)->meta = (void *)malloc(metalen);
1737         if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1738             return FDB_RESULT_ALLOC_FAIL;
1739         } // LCOV_EXCL_STOP
1740         memcpy((*doc)->meta, meta, metalen);
1741         (*doc)->metalen = metalen;
1742     } else {
1743         (*doc)->meta = NULL;
1744         (*doc)->metalen = 0;
1745     }
1746 
1747     if (body && bodylen > 0) {
1748         (*doc)->body = (void *)malloc(bodylen);
1749         if ((*doc)->body == NULL) { // LCOV_EXCL_START
1750             return FDB_RESULT_ALLOC_FAIL;
1751         } // LCOV_EXCL_STOP
1752         memcpy((*doc)->body, body, bodylen);
1753         (*doc)->bodylen = bodylen;
1754     } else {
1755         (*doc)->body = NULL;
1756         (*doc)->bodylen = 0;
1757     }
1758 
1759     (*doc)->size_ondisk = 0;
1760     (*doc)->deleted = false;
1761 
1762     return FDB_RESULT_SUCCESS;
1763 }
1764 
1765 LIBFDB_API
fdb_doc_update(fdb_doc **doc, const void *meta, size_t metalen, const void *body, size_t bodylen)1766 fdb_status fdb_doc_update(fdb_doc **doc,
1767                           const void *meta, size_t metalen,
1768                           const void *body, size_t bodylen)
1769 {
1770     if (doc == NULL ||
1771         metalen > FDB_MAX_METALEN || bodylen > FDB_MAX_BODYLEN) {
1772         return FDB_RESULT_INVALID_ARGS;
1773     }
1774     if (*doc == NULL) {
1775         return FDB_RESULT_INVALID_ARGS;
1776     }
1777 
1778     if (meta && metalen > 0) {
1779         // free previous metadata
1780         free((*doc)->meta);
1781         // allocate new metadata
1782         (*doc)->meta = (void *)malloc(metalen);
1783         if ((*doc)->meta == NULL) { // LCOV_EXCL_START
1784             return FDB_RESULT_ALLOC_FAIL;
1785         } // LCOV_EXCL_STOP
1786         memcpy((*doc)->meta, meta, metalen);
1787         (*doc)->metalen = metalen;
1788     }
1789 
1790     if (body && bodylen > 0) {
1791         // free previous body
1792         free((*doc)->body);
1793         // allocate new body
1794         (*doc)->body = (void *)malloc(bodylen);
1795         if ((*doc)->body == NULL) { // LCOV_EXCL_START
1796             return FDB_RESULT_ALLOC_FAIL;
1797         } // LCOV_EXCL_STOP
1798         memcpy((*doc)->body, body, bodylen);
1799         (*doc)->bodylen = bodylen;
1800     }
1801 
1802     return FDB_RESULT_SUCCESS;
1803 }
1804 
1805 // doc MUST BE allocated by malloc
1806 LIBFDB_API
fdb_doc_free(fdb_doc *doc)1807 fdb_status fdb_doc_free(fdb_doc *doc)
1808 {
1809     if (doc) {
1810         free(doc->key);
1811         free(doc->meta);
1812         free(doc->body);
1813         free(doc);
1814     }
1815     return FDB_RESULT_SUCCESS;
1816 }
1817 
_fdb_wal_get_old_offset(void *voidhandle, struct wal_item *item)1818 INLINE uint64_t _fdb_wal_get_old_offset(void *voidhandle,
1819                                         struct wal_item *item)
1820 {
1821     fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1822     uint64_t old_offset = 0;
1823 
1824     hbtrie_find_offset(handle->trie,
1825                        item->header->key,
1826                        item->header->keylen,
1827                        (void*)&old_offset);
1828     btreeblk_end(handle->bhandle);
1829     old_offset = _endian_decode(old_offset);
1830 
1831     return old_offset;
1832 }
1833 
_fdb_wal_snapshot_func(void *handle, fdb_doc *doc, uint64_t offset)1834 INLINE fdb_status _fdb_wal_snapshot_func(void *handle, fdb_doc *doc,
1835                                          uint64_t offset) {
1836 
1837     return snap_insert((struct snap_handle *)handle, doc, offset);
1838 }
1839 
_fdb_wal_flush_func(void *voidhandle, struct wal_item *item)1840 INLINE fdb_status _fdb_wal_flush_func(void *voidhandle, struct wal_item *item)
1841 {
1842     hbtrie_result hr;
1843     fdb_kvs_handle *handle = (fdb_kvs_handle *)voidhandle;
1844     fdb_seqnum_t _seqnum;
1845     fdb_kvs_id_t kv_id;
1846     fdb_status fs = FDB_RESULT_SUCCESS;
1847     uint8_t *var_key = alca(uint8_t, handle->config.chunksize);
1848     int size_id, size_seq;
1849     uint8_t *kvid_seqnum;
1850     uint64_t old_offset, _offset;
1851     int delta, r;
1852     struct filemgr *file = handle->dhandle->file;
1853     struct kvs_stat stat;
1854 
1855     memset(var_key, 0, handle->config.chunksize);
1856     if (handle->kvs) {
1857         buf2kvid(handle->config.chunksize, item->header->key, &kv_id);
1858     } else {
1859         kv_id = 0;
1860     }
1861 
1862     if (item->action == WAL_ACT_INSERT ||
1863         item->action == WAL_ACT_LOGICAL_REMOVE) {
1864         _offset = _endian_encode(item->offset);
1865 
1866         r = _kvs_stat_get(file, kv_id, &stat);
1867         if (r != 0) {
1868             // KV store corresponding to kv_id is already removed
1869             // skip this item
1870             return FDB_RESULT_SUCCESS;
1871         }
1872         handle->bhandle->nlivenodes = stat.nlivenodes;
1873 
1874         hr = hbtrie_insert(handle->trie,
1875                            item->header->key,
1876                            item->header->keylen,
1877                            (void *)&_offset,
1878                            (void *)&old_offset);
1879 
1880         fs = btreeblk_end(handle->bhandle);
1881         if (fs != FDB_RESULT_SUCCESS) {
1882             return fs;
1883         }
1884         old_offset = _endian_decode(old_offset);
1885 
1886         if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1887             _seqnum = _endian_encode(item->seqnum);
1888             if (handle->kvs) {
1889                 // multi KV instance mode .. HB+trie
1890                 uint64_t old_offset_local;
1891 
1892                 size_id = sizeof(fdb_kvs_id_t);
1893                 size_seq = sizeof(fdb_seqnum_t);
1894                 kvid_seqnum = alca(uint8_t, size_id + size_seq);
1895                 kvid2buf(size_id, kv_id, kvid_seqnum);
1896                 memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1897                 hbtrie_insert(handle->seqtrie, kvid_seqnum, size_id + size_seq,
1898                               (void *)&_offset, (void *)&old_offset_local);
1899             } else {
1900                 btree_insert(handle->seqtree, (void *)&_seqnum,
1901                              (void *)&_offset);
1902             }
1903             fs = btreeblk_end(handle->bhandle);
1904             if (fs != FDB_RESULT_SUCCESS) {
1905                 return fs;
1906             }
1907         }
1908 
1909         delta = (int)handle->bhandle->nlivenodes - (int)stat.nlivenodes;
1910         _kvs_stat_update_attr(file, kv_id, KVS_STAT_NLIVENODES, delta);
1911 
1912         if (hr == HBTRIE_RESULT_SUCCESS) {
1913             if (item->action == WAL_ACT_INSERT) {
1914                 _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1915             }
1916             _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE,
1917                                   item->doc_size);
1918         } else { // update or logical delete
1919             struct docio_length len;
1920             // This block is already cached when we call HBTRIE_INSERT.
1921             // No additional block access.
1922             len = docio_read_doc_length(handle->dhandle, old_offset);
1923 
1924             if (!(len.flag & DOCIO_DELETED)) {
1925                 if (item->action == WAL_ACT_LOGICAL_REMOVE) {
1926                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1927                 }
1928             } else {
1929                 if (item->action == WAL_ACT_INSERT) {
1930                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
1931                 }
1932             }
1933 
1934             delta = (int)item->doc_size - (int)_fdb_get_docsize(len);
1935             _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1936         }
1937     } else {
1938         // Immediate remove
1939         // LCOV_EXCL_START
1940         hr = hbtrie_remove(handle->trie, item->header->key,
1941                            item->header->keylen);
1942         fs = btreeblk_end(handle->bhandle);
1943         if (fs != FDB_RESULT_SUCCESS) {
1944             return fs;
1945         }
1946 
1947         if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1948             _seqnum = _endian_encode(item->seqnum);
1949             if (handle->kvs) {
1950                 // multi KV instance mode .. HB+trie
1951                 size_id = sizeof(fdb_kvs_id_t);
1952                 size_seq = sizeof(fdb_seqnum_t);
1953                 kvid_seqnum = alca(uint8_t, size_id + size_seq);
1954                 kvid2buf(size_id, kv_id, kvid_seqnum);
1955                 memcpy(kvid_seqnum + size_id, &_seqnum, size_seq);
1956 
1957                 hbtrie_remove(handle->seqtrie, (void*)kvid_seqnum,
1958                               size_id + size_seq);
1959             } else {
1960                 btree_remove(handle->seqtree, (void*)&_seqnum);
1961             }
1962             fs = btreeblk_end(handle->bhandle);
1963             if (fs != FDB_RESULT_SUCCESS) {
1964                 return fs;
1965             }
1966         }
1967 
1968         if (hr == HBTRIE_RESULT_SUCCESS) {
1969             _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
1970             delta = -(int)item->doc_size;
1971             _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, delta);
1972         }
1973         // LCOV_EXCL_STOP
1974     }
1975     return FDB_RESULT_SUCCESS;
1976 }
1977 
fdb_sync_db_header(fdb_kvs_handle *handle)1978 void fdb_sync_db_header(fdb_kvs_handle *handle)
1979 {
1980     uint64_t cur_revnum = filemgr_get_header_revnum(handle->file);
1981     if (handle->cur_header_revnum != cur_revnum) {
1982         void *header_buf = NULL;
1983         size_t header_len;
1984 
1985         handle->last_hdr_bid = filemgr_get_header_bid(handle->file);
1986         header_buf = filemgr_get_header(handle->file, NULL, &header_len,
1987                                         NULL, NULL, NULL);
1988         if (header_len > 0) {
1989             uint64_t header_flags, dummy64;
1990             bid_t idtree_root;
1991             bid_t new_seq_root;
1992             char *compacted_filename;
1993             char *prev_filename = NULL;
1994 
1995             fdb_fetch_header(header_buf, &idtree_root,
1996                              &new_seq_root,
1997                              &dummy64, &dummy64,
1998                              &dummy64, &handle->last_wal_flush_hdr_bid,
1999                              &handle->kv_info_offset, &header_flags,
2000                              &compacted_filename, &prev_filename);
2001 
2002             if (handle->dirty_updates) {
2003                 // discard all cached writable b+tree nodes
2004                 // to avoid data inconsistency with other writers
2005                 btreeblk_discard_blocks(handle->bhandle);
2006             }
2007 
2008             handle->trie->root_bid = idtree_root;
2009 
2010             if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2011                 if (new_seq_root != handle->seqtree->root_bid) {
2012                     if (handle->config.multi_kv_instances) {
2013                         handle->seqtrie->root_bid = new_seq_root;
2014                     } else {
2015                         btree_init_from_bid(handle->seqtree,
2016                                             handle->seqtree->blk_handle,
2017                                             handle->seqtree->blk_ops,
2018                                             handle->seqtree->kv_ops,
2019                                             handle->seqtree->blksize,
2020                                             new_seq_root);
2021                     }
2022                 }
2023             }
2024 
2025             if (prev_filename) {
2026                 free(prev_filename);
2027             }
2028 
2029             handle->cur_header_revnum = cur_revnum;
2030             handle->dirty_updates = 0;
2031             if (handle->kvs) {
2032                 // multiple KV instance mode AND sub handle
2033                 handle->seqnum = fdb_kvs_get_seqnum(handle->file,
2034                                                     handle->kvs->id);
2035             } else {
2036                 // super handle OR single KV instance mode
2037                 handle->seqnum = filemgr_get_seqnum(handle->file);
2038             }
2039         }
2040         if (header_buf) {
2041             free(header_buf);
2042         }
2043     }
2044 }
2045 
fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)2046 fdb_status fdb_check_file_reopen(fdb_kvs_handle *handle, file_status_t *status)
2047 {
2048     fdb_status fs = FDB_RESULT_SUCCESS;
2049     file_status_t fstatus = filemgr_get_file_status(handle->file);
2050     // check whether the compaction is done
2051     if (fstatus == FILE_REMOVED_PENDING) {
2052         uint64_t ndocs, datasize, nlivenodes, last_wal_flush_hdr_bid;
2053         uint64_t kv_info_offset, header_flags;
2054         size_t header_len;
2055         char *new_filename;
2056         uint8_t *buf = alca(uint8_t, handle->config.blocksize);
2057         bid_t trie_root_bid, seq_root_bid;
2058         fdb_config config = handle->config;
2059 
2060         // close the current file and newly open the new file
2061         if (handle->config.compaction_mode == FDB_COMPACTION_AUTO) {
2062             // compaction daemon mode .. just close and then open
2063             char filename[FDB_MAX_FILENAME_LEN];
2064             strcpy(filename, handle->filename);
2065             fs = _fdb_close(handle);
2066             fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2067             fs = _fdb_open(handle, filename, FDB_VFILENAME, &config);
2068             fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2069         } else {
2070             filemgr_get_header(handle->file, buf, &header_len, NULL, NULL, NULL);
2071             fdb_fetch_header(buf,
2072                              &trie_root_bid, &seq_root_bid,
2073                              &ndocs, &nlivenodes, &datasize, &last_wal_flush_hdr_bid,
2074                              &kv_info_offset, &header_flags,
2075                              &new_filename, NULL);
2076             fs = _fdb_close(handle);
2077             fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2078             fs = _fdb_open(handle, new_filename, FDB_AFILENAME, &config);
2079             fdb_assert(fs == FDB_RESULT_SUCCESS, fs, handle);
2080         }
2081     }
2082     if (status) {
2083         *status = fstatus;
2084     }
2085     return fs;
2086 }
2087 
_fdb_sync_dirty_root(fdb_kvs_handle *handle)2088 static bool _fdb_sync_dirty_root(fdb_kvs_handle *handle)
2089 {
2090     bool locked = false;
2091     bid_t dirty_idtree_root, dirty_seqtree_root;
2092 
2093     if (handle->shandle) {
2094         // skip snapshot
2095         return locked;
2096     }
2097 
2098     if ( ( handle->dirty_updates ||
2099            filemgr_dirty_root_exist(handle->file) )  &&
2100          filemgr_get_header_bid(handle->file) == handle->last_hdr_bid ) {
2101         // 1) { a) dirty WAL flush by this handle exists OR
2102         //      b) dirty WAL flush by other handle exists } AND
2103         // 2) no commit was performed yet.
2104         // grab lock for writer
2105         filemgr_mutex_lock(handle->file);
2106         locked = true;
2107 
2108         // get dirty root nodes
2109         filemgr_get_dirty_root(handle->file,
2110                                &dirty_idtree_root, &dirty_seqtree_root);
2111         if (dirty_idtree_root != BLK_NOT_FOUND) {
2112             handle->trie->root_bid = dirty_idtree_root;
2113         }
2114         if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2115             if (dirty_seqtree_root != BLK_NOT_FOUND) {
2116                 if (handle->kvs) {
2117                     handle->seqtrie->root_bid = dirty_seqtree_root;
2118                 } else {
2119                     btree_init_from_bid(handle->seqtree,
2120                                         handle->seqtree->blk_handle,
2121                                         handle->seqtree->blk_ops,
2122                                         handle->seqtree->kv_ops,
2123                                         handle->seqtree->blksize,
2124                                         dirty_seqtree_root);
2125                 }
2126             }
2127         }
2128         btreeblk_discard_blocks(handle->bhandle);
2129     }
2130     return locked;
2131 }
2132 
2133 LIBFDB_API
fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)2134 fdb_status fdb_get(fdb_kvs_handle *handle, fdb_doc *doc)
2135 {
2136     uint64_t offset, _offset;
2137     struct docio_object _doc;
2138     struct filemgr *wal_file = NULL;
2139     struct docio_handle *dhandle;
2140     fdb_status wr;
2141     hbtrie_result hr = HBTRIE_RESULT_FAIL;
2142     fdb_txn *txn;
2143     fdb_doc doc_kv = *doc;
2144 
2145     if (!handle || !doc || !doc->key || doc->keylen == 0 ||
2146         doc->keylen > FDB_MAX_KEYLEN ||
2147         (handle->kvs_config.custom_cmp &&
2148             doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2149         return FDB_RESULT_INVALID_ARGS;
2150     }
2151 
2152     if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2153         return FDB_RESULT_HANDLE_BUSY;
2154     }
2155 
2156     if (handle->kvs) {
2157         // multi KV instance mode
2158         int size_chunk = handle->config.chunksize;
2159         doc_kv.keylen = doc->keylen + size_chunk;
2160         doc_kv.key = alca(uint8_t, doc_kv.keylen);
2161         kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2162         memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2163     }
2164 
2165     if (!handle->shandle) {
2166         fdb_check_file_reopen(handle, NULL);
2167         fdb_sync_db_header(handle);
2168 
2169         wal_file = handle->file;
2170         dhandle = handle->dhandle;
2171 
2172         txn = handle->fhandle->root->txn;
2173         if (!txn) {
2174             txn = &wal_file->global_txn;
2175         }
2176         if (handle->kvs) {
2177             wr = wal_find(txn, wal_file, &doc_kv, &offset);
2178         } else {
2179             wr = wal_find(txn, wal_file, doc, &offset);
2180         }
2181     } else {
2182         if (handle->kvs) {
2183             wr = snap_find(handle->shandle, &doc_kv, &offset);
2184         } else {
2185             wr = snap_find(handle->shandle, doc, &offset);
2186         }
2187         dhandle = handle->dhandle;
2188     }
2189 
2190     atomic_incr_uint64_t(&handle->op_stats->num_gets);
2191 
2192     if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2193         bool locked = _fdb_sync_dirty_root(handle);
2194 
2195         if (handle->kvs) {
2196             hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2197                              (void *)&offset);
2198         } else {
2199             hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2200                              (void *)&offset);
2201         }
2202         btreeblk_end(handle->bhandle);
2203         offset = _endian_decode(offset);
2204 
2205         if (locked) {
2206             // grab lock for writer if there are dirty updates
2207             filemgr_mutex_unlock(handle->file);
2208         }
2209     }
2210 
2211     if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2212         bool alloced_meta = doc->meta ? false : true;
2213         bool alloced_body = doc->body ? false : true;
2214         if (handle->kvs) {
2215             _doc.key = doc_kv.key;
2216             _doc.length.keylen = doc_kv.keylen;
2217         } else {
2218             _doc.key = doc->key;
2219             _doc.length.keylen = doc->keylen;
2220         }
2221         _doc.meta = doc->meta;
2222         _doc.body = doc->body;
2223 
2224         if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2225             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2226             return FDB_RESULT_KEY_NOT_FOUND;
2227         }
2228 
2229         _offset = docio_read_doc(dhandle, offset, &_doc, true);
2230         if (_offset == offset) {
2231             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2232             return FDB_RESULT_KEY_NOT_FOUND;
2233         }
2234 
2235         if (_doc.length.keylen != doc_kv.keylen ||
2236             _doc.length.flag & DOCIO_DELETED) {
2237             free_docio_object(&_doc, 0, alloced_meta, alloced_body);
2238             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2239             return FDB_RESULT_KEY_NOT_FOUND;
2240         }
2241 
2242         doc->seqnum = _doc.seqnum;
2243         doc->metalen = _doc.length.metalen;
2244         doc->bodylen = _doc.length.bodylen;
2245         doc->meta = _doc.meta;
2246         doc->body = _doc.body;
2247         doc->deleted = _doc.length.flag & DOCIO_DELETED;
2248         doc->size_ondisk = _fdb_get_docsize(_doc.length);
2249         doc->offset = offset;
2250 
2251         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2252         return FDB_RESULT_SUCCESS;
2253     }
2254 
2255     fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2256     return FDB_RESULT_KEY_NOT_FOUND;
2257 }
2258 
2259 // search document metadata using key
2260 LIBFDB_API
fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)2261 fdb_status fdb_get_metaonly(fdb_kvs_handle *handle, fdb_doc *doc)
2262 {
2263     uint64_t offset;
2264     struct docio_object _doc;
2265     struct docio_handle *dhandle;
2266     struct filemgr *wal_file = NULL;
2267     fdb_status wr;
2268     hbtrie_result hr = HBTRIE_RESULT_FAIL;
2269     fdb_txn *txn;
2270     fdb_doc doc_kv = *doc;
2271 
2272     if (!handle || !doc || !doc->key ||
2273         doc->keylen == 0 || doc->keylen > FDB_MAX_KEYLEN ||
2274         (handle->kvs_config.custom_cmp &&
2275             doc->keylen > handle->config.blocksize - HBTRIE_HEADROOM)) {
2276         return FDB_RESULT_INVALID_ARGS;
2277     }
2278 
2279     if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2280         return FDB_RESULT_HANDLE_BUSY;
2281     }
2282 
2283     if (handle->kvs) {
2284         // multi KV instance mode
2285         int size_chunk = handle->config.chunksize;
2286         doc_kv.keylen = doc->keylen + size_chunk;
2287         doc_kv.key = alca(uint8_t, doc_kv.keylen);
2288         kvid2buf(size_chunk, handle->kvs->id, doc_kv.key);
2289         memcpy((uint8_t*)doc_kv.key + size_chunk, doc->key, doc->keylen);
2290     }
2291 
2292     if (!handle->shandle) {
2293         fdb_check_file_reopen(handle, NULL);
2294         fdb_sync_db_header(handle);
2295 
2296         wal_file = handle->file;
2297         dhandle = handle->dhandle;
2298 
2299         txn = handle->fhandle->root->txn;
2300         if (!txn) {
2301             txn = &wal_file->global_txn;
2302         }
2303         if (handle->kvs) {
2304             wr = wal_find(txn, wal_file, &doc_kv, &offset);
2305         } else {
2306             wr = wal_find(txn, wal_file, doc, &offset);
2307         }
2308     } else {
2309         if (handle->kvs) {
2310             wr = snap_find(handle->shandle, &doc_kv, &offset);
2311         } else {
2312             wr = snap_find(handle->shandle, doc, &offset);
2313         }
2314         dhandle = handle->dhandle;
2315     }
2316 
2317     atomic_incr_uint64_t(&handle->op_stats->num_gets);
2318 
2319     if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2320         bool locked = _fdb_sync_dirty_root(handle);
2321 
2322         if (handle->kvs) {
2323             hr = hbtrie_find(handle->trie, doc_kv.key, doc_kv.keylen,
2324                              (void *)&offset);
2325         } else {
2326             hr = hbtrie_find(handle->trie, doc->key, doc->keylen,
2327                              (void *)&offset);
2328         }
2329         btreeblk_end(handle->bhandle);
2330         offset = _endian_decode(offset);
2331 
2332         if (locked) {
2333             filemgr_mutex_unlock(handle->file);
2334         }
2335     }
2336 
2337     if (wr == FDB_RESULT_SUCCESS || hr != HBTRIE_RESULT_FAIL) {
2338         if (handle->kvs) {
2339             _doc.key = doc_kv.key;
2340             _doc.length.keylen = doc_kv.keylen;
2341         } else {
2342             _doc.key = doc->key;
2343             _doc.length.keylen = doc->keylen;
2344         }
2345         bool alloced_meta = doc->meta ? false : true;
2346         _doc.meta = doc->meta;
2347         _doc.body = doc->body;
2348 
2349         uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2350                                                        true);
2351         if (body_offset == offset){
2352             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2353             return FDB_RESULT_KEY_NOT_FOUND;
2354         }
2355 
2356         if (_doc.length.keylen != doc_kv.keylen) {
2357             free_docio_object(&_doc, 0, alloced_meta, 0);
2358             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2359             return FDB_RESULT_KEY_NOT_FOUND;
2360         }
2361 
2362         doc->seqnum = _doc.seqnum;
2363         doc->metalen = _doc.length.metalen;
2364         doc->bodylen = _doc.length.bodylen;
2365         doc->meta = _doc.meta;
2366         doc->body = _doc.body;
2367         doc->deleted = _doc.length.flag & DOCIO_DELETED;
2368         doc->size_ondisk = _fdb_get_docsize(_doc.length);
2369         doc->offset = offset;
2370 
2371         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2372         return FDB_RESULT_SUCCESS;
2373     }
2374 
2375     fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2376     return FDB_RESULT_KEY_NOT_FOUND;
2377 }
2378 
2379 // search document using sequence number
2380 LIBFDB_API
fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)2381 fdb_status fdb_get_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2382 {
2383     uint64_t offset, _offset;
2384     struct docio_object _doc;
2385     struct docio_handle *dhandle;
2386     struct filemgr *wal_file = NULL;
2387     fdb_status wr;
2388     btree_result br = BTREE_RESULT_FAIL;
2389     fdb_seqnum_t _seqnum;
2390     fdb_txn *txn;
2391 
2392     if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2393         return FDB_RESULT_INVALID_ARGS;
2394     }
2395 
2396     // Sequence trees are a must for byseq operations
2397     if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2398         return FDB_RESULT_INVALID_CONFIG;
2399     }
2400 
2401     if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2402         return FDB_RESULT_HANDLE_BUSY;
2403     }
2404 
2405     if (!handle->shandle) {
2406         fdb_check_file_reopen(handle, NULL);
2407         fdb_sync_db_header(handle);
2408 
2409         wal_file = handle->file;
2410         dhandle = handle->dhandle;
2411 
2412         txn = handle->fhandle->root->txn;
2413         if (!txn) {
2414             txn = &wal_file->global_txn;
2415         }
2416         // prevent searching by key in WAL if 'doc' is not empty
2417         size_t key_len = doc->keylen;
2418         doc->keylen = 0;
2419         if (handle->kvs) {
2420             wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2421         } else {
2422             wr = wal_find(txn, wal_file, doc, &offset);
2423         }
2424         doc->keylen = key_len;
2425     } else {
2426         wr = snap_find(handle->shandle, doc, &offset);
2427         dhandle = handle->dhandle;
2428     }
2429 
2430     atomic_incr_uint64_t(&handle->op_stats->num_gets);
2431 
2432     if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2433         bool locked = _fdb_sync_dirty_root(handle);
2434 
2435         _seqnum = _endian_encode(doc->seqnum);
2436         if (handle->kvs) {
2437             int size_id, size_seq;
2438             uint8_t *kv_seqnum;
2439             hbtrie_result hr;
2440             fdb_kvs_id_t _kv_id;
2441 
2442             _kv_id = _endian_encode(handle->kvs->id);
2443             size_id = sizeof(fdb_kvs_id_t);
2444             size_seq = sizeof(fdb_seqnum_t);
2445             kv_seqnum = alca(uint8_t, size_id + size_seq);
2446             memcpy(kv_seqnum, &_kv_id, size_id);
2447             memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2448             hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2449                              size_id + size_seq, (void *)&offset);
2450             br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2451         } else {
2452             br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2453         }
2454         btreeblk_end(handle->bhandle);
2455         offset = _endian_decode(offset);
2456 
2457         if (locked) {
2458             filemgr_mutex_unlock(handle->file);
2459         }
2460     }
2461 
2462     if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2463         bool alloc_key, alloc_meta, alloc_body;
2464         if (!handle->kvs) { // single KVS mode
2465             _doc.key = doc->key;
2466             _doc.length.keylen = doc->keylen;
2467             alloc_key = doc->key ? false : true;
2468         } else {
2469             _doc.key = NULL;
2470             alloc_key = true;
2471         }
2472         alloc_meta = doc->meta ? false : true;
2473         _doc.meta = doc->meta;
2474         alloc_body = doc->body ? false : true;
2475         _doc.body = doc->body;
2476 
2477         if (wr == FDB_RESULT_SUCCESS && doc->deleted) {
2478             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2479             return FDB_RESULT_KEY_NOT_FOUND;
2480         }
2481 
2482         _offset = docio_read_doc(dhandle, offset, &_doc, true);
2483         if (_offset == offset) {
2484             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2485             return FDB_RESULT_KEY_NOT_FOUND;
2486         }
2487 
2488         if (_doc.length.flag & DOCIO_DELETED) {
2489             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2490             free_docio_object(&_doc, alloc_key, alloc_meta, alloc_body);
2491             return FDB_RESULT_KEY_NOT_FOUND;
2492         }
2493 
2494         doc->seqnum = _doc.seqnum;
2495         if (handle->kvs) {
2496             int size_chunk = handle->config.chunksize;
2497             doc->keylen = _doc.length.keylen - size_chunk;
2498             if (doc->key) { // doc->key is given by user
2499                 memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2500                 free_docio_object(&_doc, 1, 0, 0);
2501             } else {
2502                 doc->key = _doc.key;
2503                 memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2504             }
2505         } else {
2506             doc->keylen = _doc.length.keylen;
2507             doc->key = _doc.key;
2508         }
2509         doc->metalen = _doc.length.metalen;
2510         doc->bodylen = _doc.length.bodylen;
2511         doc->meta = _doc.meta;
2512         doc->body = _doc.body;
2513         doc->deleted = _doc.length.flag & DOCIO_DELETED;
2514         doc->size_ondisk = _fdb_get_docsize(_doc.length);
2515         doc->offset = offset;
2516 
2517         fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2518 
2519         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2520         return FDB_RESULT_SUCCESS;
2521     }
2522 
2523     fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2524     return FDB_RESULT_KEY_NOT_FOUND;
2525 }
2526 
2527 // search document metadata using sequence number
2528 LIBFDB_API
fdb_get_metaonly_byseq(fdb_kvs_handle *handle, fdb_doc *doc)2529 fdb_status fdb_get_metaonly_byseq(fdb_kvs_handle *handle, fdb_doc *doc)
2530 {
2531     uint64_t offset;
2532     struct docio_object _doc;
2533     struct docio_handle *dhandle;
2534     struct filemgr *wal_file = NULL;
2535     fdb_status wr;
2536     btree_result br = BTREE_RESULT_FAIL;
2537     fdb_seqnum_t _seqnum;
2538     fdb_txn *txn = handle->fhandle->root->txn;
2539 
2540     if (!handle || !doc || doc->seqnum == SEQNUM_NOT_USED) {
2541         return FDB_RESULT_INVALID_ARGS;
2542     }
2543 
2544     // Sequence trees are a must for byseq operations
2545     if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
2546         return FDB_RESULT_INVALID_CONFIG;
2547     }
2548 
2549     if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2550         return FDB_RESULT_HANDLE_BUSY;
2551     }
2552 
2553     if (!handle->shandle) {
2554         fdb_check_file_reopen(handle, NULL);
2555         fdb_sync_db_header(handle);
2556 
2557         wal_file = handle->file;
2558         dhandle = handle->dhandle;
2559 
2560         if (!txn) {
2561             txn = &wal_file->global_txn;
2562         }
2563         // prevent searching by key in WAL if 'doc' is not empty
2564         size_t key_len = doc->keylen;
2565         doc->keylen = 0;
2566         if (handle->kvs) {
2567             wr = wal_find_kv_id(txn, wal_file, handle->kvs->id, doc, &offset);
2568         } else {
2569             wr = wal_find(txn, wal_file, doc, &offset);
2570         }
2571         doc->keylen = key_len;
2572     } else {
2573         wr = snap_find(handle->shandle, doc, &offset);
2574         dhandle = handle->dhandle;
2575     }
2576 
2577     atomic_incr_uint64_t(&handle->op_stats->num_gets);
2578 
2579     if (wr == FDB_RESULT_KEY_NOT_FOUND) {
2580         bool locked = _fdb_sync_dirty_root(handle);
2581 
2582         _seqnum = _endian_encode(doc->seqnum);
2583         if (handle->kvs) {
2584             int size_id, size_seq;
2585             uint8_t *kv_seqnum;
2586             hbtrie_result hr;
2587             fdb_kvs_id_t _kv_id;
2588 
2589             _kv_id = _endian_encode(handle->kvs->id);
2590             size_id = sizeof(fdb_kvs_id_t);
2591             size_seq = sizeof(fdb_seqnum_t);
2592             kv_seqnum = alca(uint8_t, size_id + size_seq);
2593             memcpy(kv_seqnum, &_kv_id, size_id);
2594             memcpy(kv_seqnum + size_id, &_seqnum, size_seq);
2595             hr = hbtrie_find(handle->seqtrie, (void *)kv_seqnum,
2596                              size_id + size_seq, (void *)&offset);
2597             br = (hr == HBTRIE_RESULT_SUCCESS)?(BTREE_RESULT_SUCCESS):(br);
2598         } else {
2599             br = btree_find(handle->seqtree, (void *)&_seqnum, (void *)&offset);
2600         }
2601         btreeblk_end(handle->bhandle);
2602         offset = _endian_decode(offset);
2603 
2604         if (locked) {
2605             filemgr_mutex_unlock(handle->file);
2606         }
2607     }
2608 
2609     if (wr == FDB_RESULT_SUCCESS || br != BTREE_RESULT_FAIL) {
2610         if (!handle->kvs) { // single KVS mode
2611             _doc.key = doc->key;
2612             _doc.length.keylen = doc->keylen;
2613         } else {
2614             _doc.key = NULL;
2615         }
2616         _doc.meta = doc->meta;
2617         _doc.body = doc->body;
2618 
2619         uint64_t body_offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
2620                                                        true);
2621         if (body_offset == offset) {
2622             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2623             return FDB_RESULT_KEY_NOT_FOUND;
2624         }
2625 
2626         if (handle->kvs) {
2627             int size_chunk = handle->config.chunksize;
2628             doc->keylen = _doc.length.keylen - size_chunk;
2629             if (doc->key) { // doc->key is given by user
2630                 memcpy(doc->key, (uint8_t*)_doc.key + size_chunk, doc->keylen);
2631                 free_docio_object(&_doc, 1, 0, 0);
2632             } else {
2633                 doc->key = _doc.key;
2634                 memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->keylen);
2635             }
2636         } else {
2637             doc->keylen = _doc.length.keylen;
2638             doc->key = _doc.key;
2639         }
2640         doc->metalen = _doc.length.metalen;
2641         doc->bodylen = _doc.length.bodylen;
2642         doc->meta = _doc.meta;
2643         doc->body = _doc.body;
2644         doc->deleted = _doc.length.flag & DOCIO_DELETED;
2645         doc->size_ondisk = _fdb_get_docsize(_doc.length);
2646         doc->offset = offset;
2647 
2648         fdb_assert(doc->seqnum == _doc.seqnum, doc->seqnum, _doc.seqnum);
2649 
2650         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2651         return FDB_RESULT_SUCCESS;
2652     }
2653 
2654     fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2655     return FDB_RESULT_KEY_NOT_FOUND;
2656 }
2657 
equal_docs(fdb_doc *doc, struct docio_object *_doc)2658 static uint8_t equal_docs(fdb_doc *doc, struct docio_object *_doc) {
2659     uint8_t rv = 1;
2660     // Compare a seq num if seq tree is enabled.
2661     if (doc->seqnum != SEQNUM_NOT_USED) {
2662         if (doc->seqnum != _doc->seqnum) {
2663             free(_doc->key);
2664             free(_doc->meta);
2665             free(_doc->body);
2666             _doc->key = _doc->meta = _doc->body = NULL;
2667             rv = 0;
2668         }
2669     } else { // Compare key and metadata
2670         if ((doc->key && memcmp(doc->key, _doc->key, doc->keylen)) ||
2671             (doc->meta && memcmp(doc->meta, _doc->meta, doc->metalen))) {
2672             free(_doc->key);
2673             free(_doc->meta);
2674             free(_doc->body);
2675             _doc->key = _doc->meta = _doc->body = NULL;
2676             rv = 0;
2677         }
2678     }
2679     return rv;
2680 }
2681 
_remove_kv_id(fdb_kvs_handle *handle, struct docio_object *doc)2682 INLINE void _remove_kv_id(fdb_kvs_handle *handle, struct docio_object *doc)
2683 {
2684     size_t size_chunk = handle->config.chunksize;
2685     doc->length.keylen -= size_chunk;
2686     memmove(doc->key, (uint8_t*)doc->key + size_chunk, doc->length.keylen);
2687 }
2688 
2689 // Retrieve a doc's metadata and body with a given doc offset in the database file.
2690 LIBFDB_API
fdb_get_byoffset(fdb_kvs_handle *handle, fdb_doc *doc)2691 fdb_status fdb_get_byoffset(fdb_kvs_handle *handle, fdb_doc *doc)
2692 {
2693     uint64_t offset = doc->offset;
2694     struct docio_object _doc;
2695 
2696     if (!offset) {
2697         return FDB_RESULT_INVALID_ARGS;
2698     }
2699 
2700     if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2701         return FDB_RESULT_HANDLE_BUSY;
2702     }
2703 
2704     atomic_incr_uint64_t(&handle->op_stats->num_gets);
2705     memset(&_doc, 0, sizeof(struct docio_object));
2706 
2707     uint64_t _offset = docio_read_doc(handle->dhandle, offset, &_doc, true);
2708     if (_offset == offset) {
2709         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2710         return FDB_RESULT_KEY_NOT_FOUND;
2711     } else {
2712         if (handle->kvs) {
2713             fdb_kvs_id_t kv_id;
2714             buf2kvid(handle->config.chunksize, _doc.key, &kv_id);
2715             if (kv_id != handle->kvs->id) {
2716                 fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2717                 free_docio_object(&_doc, 1, 1, 1);
2718                 return FDB_RESULT_KEY_NOT_FOUND;
2719             }
2720             _remove_kv_id(handle, &_doc);
2721         }
2722         if (!equal_docs(doc, &_doc)) {
2723             free_docio_object(&_doc, 1, 1, 1);
2724             fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2725             return FDB_RESULT_KEY_NOT_FOUND;
2726         }
2727     }
2728 
2729     doc->seqnum = _doc.seqnum;
2730     doc->keylen = _doc.length.keylen;
2731     doc->metalen = _doc.length.metalen;
2732     doc->bodylen = _doc.length.bodylen;
2733     if (doc->key) {
2734         free(_doc.key);
2735     } else {
2736         doc->key = _doc.key;
2737     }
2738     if (doc->meta) {
2739         free(_doc.meta);
2740     } else {
2741         doc->meta = _doc.meta;
2742     }
2743     if (doc->body) {
2744         if (_doc.length.bodylen > 0) {
2745             memcpy(doc->body, _doc.body, _doc.length.bodylen);
2746         }
2747         free(_doc.body);
2748     } else {
2749         doc->body = _doc.body;
2750     }
2751     doc->deleted = _doc.length.flag & DOCIO_DELETED;
2752     doc->size_ondisk = _fdb_get_docsize(_doc.length);
2753     if (handle->kvs) {
2754         // Since _doc.length was adjusted in _remove_kv_id(),
2755         // we need to compensate it.
2756         doc->size_ondisk += handle->config.chunksize;
2757     }
2758 
2759     if (_doc.length.flag & DOCIO_DELETED) {
2760         fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2761         return FDB_RESULT_KEY_NOT_FOUND;
2762     }
2763 
2764     fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2765     return FDB_RESULT_SUCCESS;
2766 }
2767 
_fdb_get_wal_threshold(fdb_kvs_handle *handle)2768 INLINE uint64_t _fdb_get_wal_threshold(fdb_kvs_handle *handle)
2769 {
2770     return handle->config.wal_threshold;
2771 }
2772 
2773 LIBFDB_API
fdb_set(fdb_kvs_handle *handle, fdb_doc *doc)2774 fdb_status fdb_set(fdb_kvs_handle *handle, fdb_doc *doc)