xref: /4.0.0/forestdb/src/kv_instance.cc (revision 36149418)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "snapshot.h"
34
35#include "memleak.h"
36#include "time_utils.h"
37
// Name of the default KV store. User-supplied KV store names are compared
// against it (via strcmp) to decide whether they refer to the default KVS.
static const char *default_kvs_name = DEFAULT_KVS_NAME;
39
// list element for opened KV store handles
// (in-memory data: managed by the file handle)
struct kvs_opened_node {
    // the opened KV store handle tracked by this element
    fdb_kvs_handle *handle;
    // linkage into the file handle's 'handles' list
    struct list_elem le;
};
46
// list element for custom cmp functions in fhandle
// (binds one KV store name to one custom compare function)
struct cmp_func_node {
    // heap-allocated copy of the KV store name; NULL denotes the default KVS
    char *kvs_name;
    // custom compare function bound to 'kvs_name'
    fdb_custom_cmp_variable func;
    // linkage into the file handle's 'cmp_func_list'
    struct list_elem le;
};
53
54static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
55{
56    struct kvs_node *aa, *bb;
57    aa = _get_entry(a, struct kvs_node, avl_name);
58    bb = _get_entry(b, struct kvs_node, avl_name);
59    return strcmp(aa->kvs_name, bb->kvs_name);
60}
61
62static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
63{
64    struct kvs_node *aa, *bb;
65    aa = _get_entry(a, struct kvs_node, avl_id);
66    bb = _get_entry(b, struct kvs_node, avl_id);
67
68    if (aa->id < bb->id) {
69        return -1;
70    } else if (aa->id > bb->id) {
71        return 1;
72    } else {
73        return 0;
74    }
75}
76
77void fdb_file_handle_init(fdb_file_handle *fhandle,
78                           fdb_kvs_handle *root)
79{
80    fhandle->root = root;
81    fhandle->flags = 0x0;
82    root->fhandle = fhandle;
83    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
84    fhandle->cmp_func_list = NULL;
85    spin_init(&fhandle->lock);
86}
87
88void fdb_file_handle_close_all(fdb_file_handle *fhandle)
89{
90    struct list_elem *e;
91    struct kvs_opened_node *node;
92
93    spin_lock(&fhandle->lock);
94    e = list_begin(fhandle->handles);
95    while (e) {
96        node = _get_entry(e, struct kvs_opened_node, le);
97        e = list_next(e);
98        _fdb_close(node->handle);
99        free(node->handle);
100        free(node);
101    }
102    spin_unlock(&fhandle->lock);
103}
104
105void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
106                                    size_t n_func,
107                                    char **kvs_names,
108                                    fdb_custom_cmp_variable *functions)
109{
110    uint64_t i;
111    struct cmp_func_node *node;
112
113    if (n_func == 0 || !kvs_names || !functions) {
114        return;
115    }
116
117    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
118    list_init(fhandle->cmp_func_list);
119
120    for (i=0;i<n_func;++i){
121        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
122        if (kvs_names[i]) {
123            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
124            strcpy(node->kvs_name, kvs_names[i]);
125        } else {
126            // NULL .. default KVS
127            node->kvs_name = NULL;
128        }
129        node->func = functions[i];
130        list_push_back(fhandle->cmp_func_list, &node->le);
131    }
132}
133
134// clone all items in cmp_func_list to fhandle->cmp_func_list
135void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
136                                         struct list *cmp_func_list)
137{
138    struct list_elem *e;
139    struct cmp_func_node *src, *dst;
140
141    if (fhandle->cmp_func_list || /* already exist */
142        !cmp_func_list) {
143        return;
144    }
145
146    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
147    list_init(fhandle->cmp_func_list);
148
149    e = list_begin(cmp_func_list);
150    while (e) {
151        src = _get_entry(e, struct cmp_func_node, le);
152        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
153        if (src->kvs_name) {
154            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
155            strcpy(dst->kvs_name, src->kvs_name);
156        } else {
157            dst->kvs_name = NULL; // default KVS
158        }
159        dst->func = src->func;
160        list_push_back(fhandle->cmp_func_list, &dst->le);
161        e = list_next(&src->le);
162    }
163}
164
165void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
166                                  char *kvs_name,
167                                  fdb_custom_cmp_variable cmp_func)
168{
169    struct cmp_func_node *node;
170
171    // create list if not exist
172    if (!fhandle->cmp_func_list) {
173        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
174        list_init(fhandle->cmp_func_list);
175    }
176
177    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
178    if (kvs_name) {
179        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
180        strcpy(node->kvs_name, kvs_name);
181    } else {
182        // default KVS
183        node->kvs_name = NULL;
184    }
185    node->func = cmp_func;
186    list_push_back(fhandle->cmp_func_list, &node->le);
187}
188
189static void _free_cmp_func_list(fdb_file_handle *fhandle)
190{
191    struct list_elem *e;
192    struct cmp_func_node *cmp_node;
193
194    if (!fhandle->cmp_func_list) {
195        return;
196    }
197
198    e = list_begin(fhandle->cmp_func_list);
199    while (e) {
200        cmp_node = _get_entry(e, struct cmp_func_node, le);
201        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
202
203        free(cmp_node->kvs_name);
204        free(cmp_node);
205    }
206    free(fhandle->cmp_func_list);
207    fhandle->cmp_func_list = NULL;
208}
209
210void fdb_file_handle_free(fdb_file_handle *fhandle)
211{
212    free(fhandle->handles);
213    _free_cmp_func_list(fhandle);
214    spin_destroy(&fhandle->lock);
215    free(fhandle);
216}
217
218fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
219{
220    int ori_flag;
221    fdb_file_handle *fhandle = handle->fhandle;
222    fdb_custom_cmp_variable ori_custom_cmp;
223    struct filemgr *file = handle->file;
224    struct cmp_func_node *cmp_node;
225    struct kvs_node *kvs_node, query;
226    struct list_elem *e;
227    struct avl_node *a;
228
229    spin_lock(&file->kv_header->lock);
230    ori_flag = file->kv_header->custom_cmp_enabled;
231    ori_custom_cmp = file->kv_header->default_kvs_cmp;
232
233    if (fhandle->cmp_func_list) {
234        handle->kvs_config.custom_cmp = NULL;
235
236        e = list_begin(fhandle->cmp_func_list);
237        while (e) {
238            cmp_node = _get_entry(e, struct cmp_func_node, le);
239            if (cmp_node->kvs_name == NULL ||
240                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
241                handle->kvs_config.custom_cmp = cmp_node->func;
242                file->kv_header->default_kvs_cmp = cmp_node->func;
243                file->kv_header->custom_cmp_enabled = 1;
244            } else {
245                // search by name
246                query.kvs_name = cmp_node->kvs_name;
247                a = avl_search(file->kv_header->idx_name,
248                               &query.avl_name,
249                               _kvs_cmp_name);
250                if (a) { // found
251                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
252                    if (!kvs_node->custom_cmp) {
253                        kvs_node->custom_cmp = cmp_node->func;
254                    }
255                    file->kv_header->custom_cmp_enabled = 1;
256                }
257            }
258            e = list_next(&cmp_node->le);
259        }
260    }
261
262    // first check the default KVS
263    // 1. root handle has not been opened yet: don't care
264    // 2. root handle was opened before: must match the flag
265    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
266        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
267            handle->kvs_config.custom_cmp == NULL) {
268            // custom cmp function was assigned before,
269            // but no custom cmp function is assigned
270            file->kv_header->custom_cmp_enabled = ori_flag;
271            file->kv_header->default_kvs_cmp = ori_custom_cmp;
272            spin_unlock(&file->kv_header->lock);
273            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
274            if (!kvs_name) {
275                kvs_name = DEFAULT_KVS_NAME;
276            }
277            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
278                           "Error! Tried to open a KV store '%s', which was created with "
279                           "custom compare function enabled, without passing the same "
280                           "custom compare function.", kvs_name);
281        }
282        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
283              handle->kvs_config.custom_cmp) {
284            // custom cmp function was not assigned before,
285            // but custom cmp function is assigned from user
286            file->kv_header->custom_cmp_enabled = ori_flag;
287            file->kv_header->default_kvs_cmp = ori_custom_cmp;
288            spin_unlock(&file->kv_header->lock);
289            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
290            if (!kvs_name) {
291                kvs_name = DEFAULT_KVS_NAME;
292            }
293            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
294                           "Error! Tried to open a KV store '%s', which was created without "
295                           "custom compare function, by passing custom compare function.",
296                    kvs_name);
297        }
298    }
299
300    // next check other KVSs
301    a = avl_first(file->kv_header->idx_name);
302    while (a) {
303        kvs_node = _get_entry(a, struct kvs_node, avl_name);
304        a = avl_next(a);
305
306        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
307            kvs_node->custom_cmp == NULL) {
308            // custom cmp function was assigned before,
309            // but no custom cmp function is assigned
310            file->kv_header->custom_cmp_enabled = ori_flag;
311            file->kv_header->default_kvs_cmp = ori_custom_cmp;
312            spin_unlock(&file->kv_header->lock);
313            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
314            if (!kvs_name) {
315                kvs_name = DEFAULT_KVS_NAME;
316            }
317            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
318                           "Error! Tried to open a KV store '%s', which was created with "
319                           "custom compare function enabled, without passing the same "
320                           "custom compare function.", kvs_name);
321        }
322        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
323              kvs_node->custom_cmp) {
324            // custom cmp function was not assigned before,
325            // but custom cmp function is assigned from user
326            file->kv_header->custom_cmp_enabled = ori_flag;
327            file->kv_header->default_kvs_cmp = ori_custom_cmp;
328            spin_unlock(&file->kv_header->lock);
329            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
330            if (!kvs_name) {
331                kvs_name = DEFAULT_KVS_NAME;
332            }
333            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
334                           "Error! Tried to open a KV store '%s', which was created without "
335                           "custom compare function, by passing custom compare function.",
336                           kvs_name);
337        }
338    }
339
340    spin_unlock(&file->kv_header->lock);
341    return FDB_RESULT_SUCCESS;
342}
343
344fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
345                                              char *kvs_name)
346{
347    fdb_file_handle *fhandle;
348    struct list_elem *e;
349    struct cmp_func_node *cmp_node;
350
351    fhandle = handle->fhandle;
352    if (!fhandle->cmp_func_list) {
353        return NULL;
354    }
355
356    e = list_begin(fhandle->cmp_func_list);
357    while (e) {
358        cmp_node = _get_entry(e, struct cmp_func_node, le);
359        if (kvs_name == NULL ||
360            !strcmp(kvs_name, default_kvs_name)) {
361            if (cmp_node->kvs_name == NULL ||
362                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
363                return cmp_node->func;
364            }
365        } else if (cmp_node->kvs_name &&
366                   !strcmp(cmp_node->kvs_name, kvs_name)) {
367            return cmp_node->func;
368        }
369        e = list_next(&cmp_node->le);
370    }
371    return NULL;
372}
373
374hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
375{
376    fdb_kvs_id_t kv_id;
377    struct hbtrie *trie = (struct hbtrie *)aux;
378    struct btreeblk_handle *bhandle;
379    struct filemgr *file;
380    struct avl_node *a;
381    struct kvs_node query, *node;
382
383    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
384    file = bhandle->file;
385
386    if (!file->kv_header->custom_cmp_enabled) {
387        return NULL;
388    }
389
390    buf2kvid(trie->chunksize, chunk, &kv_id);
391
392    // search by id
393    if (kv_id > 0) {
394        query.id = kv_id;
395        spin_lock(&file->kv_header->lock);
396        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
397        spin_unlock(&file->kv_header->lock);
398
399        if (a) {
400            node = _get_entry(a, struct kvs_node, avl_id);
401            return (hbtrie_cmp_func *)node->custom_cmp;
402        }
403    } else {
404        // root handle
405        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
406    }
407    return NULL;
408}
409
410void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
411    handle->kvs->type = KVS_ROOT;
412    handle->kvs->root = handle->fhandle->root;
413    // super handle's ID is always 0
414    handle->kvs->id = 0;
415    // force custom cmp function
416    spin_lock(&file->kv_header->lock);
417    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
418    spin_unlock(&file->kv_header->lock);
419}
420
421void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
422                         fdb_kvs_handle *handle,
423                         struct filemgr *file,
424                         const char *kvs_name)
425{
426    struct kvs_node query, *kvs_node;
427    struct kvs_opened_node *opened_node;
428    struct avl_node *a;
429
430    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
431
432    if (root_handle == NULL) {
433        // 'handle' is a super handle
434        _fdb_kvs_init_root(handle, file);
435    } else {
436        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
437        handle->kvs->type = KVS_SUB;
438        handle->kvs->root = root_handle;
439
440        if (kvs_name) {
441            spin_lock(&file->kv_header->lock);
442            query.kvs_name = (char*)kvs_name;
443            a = avl_search(file->kv_header->idx_name, &query.avl_name,
444                           _kvs_cmp_name);
445            if (a == NULL) {
446                // KV instance name is not found
447                free(handle->kvs);
448                handle->kvs = NULL;
449                spin_unlock(&file->kv_header->lock);
450                return;
451            }
452            kvs_node = _get_entry(a, struct kvs_node, avl_name);
453            handle->kvs->id = kvs_node->id;
454            // force custom cmp function
455            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
456            spin_unlock(&file->kv_header->lock);
457        } else {
458            // snapshot of the root handle
459            handle->kvs->id = 0;
460        }
461
462        opened_node = (struct kvs_opened_node *)
463               calloc(1, sizeof(struct kvs_opened_node));
464        opened_node->handle = handle;
465
466        handle->node = opened_node;
467        spin_lock(&root_handle->fhandle->lock);
468        list_push_back(root_handle->fhandle->handles, &opened_node->le);
469        spin_unlock(&root_handle->fhandle->lock);
470    }
471}
472
473void fdb_kvs_info_free(fdb_kvs_handle *handle)
474{
475    if (handle->kvs == NULL) {
476        return;
477    }
478
479    free(handle->kvs);
480    handle->kvs = NULL;
481}
482
483void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
484{
485    struct kvs_header *kv_header;
486
487    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
488    *kv_header_ptr = kv_header;
489
490    // KV ID '0' is reserved for default KV instance (super handle)
491    kv_header->id_counter = 1;
492    kv_header->default_kvs_cmp = NULL;
493    kv_header->custom_cmp_enabled = 0;
494    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
495    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
496    avl_init(kv_header->idx_name, NULL);
497    avl_init(kv_header->idx_id, NULL);
498    spin_init(&kv_header->lock);
499}
500
501void fdb_kvs_header_create(struct filemgr *file)
502{
503    if (file->kv_header) {
504        return; // already exist
505    }
506
507    _fdb_kvs_header_create(&file->kv_header);
508    file->free_kv_header = fdb_kvs_header_free;
509}
510
511void fdb_kvs_header_reset_all_stats(struct filemgr *file)
512{
513    struct avl_node *a;
514    struct kvs_node *node;
515    struct kvs_header *kv_header = file->kv_header;
516
517    spin_lock(&kv_header->lock);
518    a = avl_first(kv_header->idx_id);
519    while (a) {
520        node = _get_entry(a, struct kvs_node, avl_id);
521        a = avl_next(&node->avl_id);
522        memset(&node->stat, 0x0, sizeof(node->stat));
523    }
524    spin_unlock(&kv_header->lock);
525}
526
527void fdb_kvs_header_copy(fdb_kvs_handle *handle,
528                         struct filemgr *new_file,
529                         struct docio_handle *new_dhandle,
530                         bool create_new)
531{
532    struct avl_node *a, *aa;
533    struct kvs_node *node_old, *node_new;
534
535    if (create_new) {
536        // copy KV header data in 'handle' to new file
537        fdb_kvs_header_create(new_file);
538        // read from 'handle->dhandle', and import into 'new_file'
539        fdb_kvs_header_read(new_file, handle->dhandle,
540                            handle->kv_info_offset, false);
541        // write KV header in 'new_file' using 'new_dhandle'
542        handle->kv_info_offset = fdb_kvs_header_append(new_file,
543                                                          new_dhandle);
544        fdb_kvs_header_reset_all_stats(new_file);
545    }
546
547    spin_lock(&handle->file->kv_header->lock);
548    spin_lock(&new_file->kv_header->lock);
549    // copy all in-memory custom cmp function pointers & seqnums
550    new_file->kv_header->default_kvs_cmp =
551        handle->file->kv_header->default_kvs_cmp;
552    new_file->kv_header->custom_cmp_enabled =
553        handle->file->kv_header->custom_cmp_enabled;
554    a = avl_first(handle->file->kv_header->idx_id);
555    while (a) {
556        node_old = _get_entry(a, struct kvs_node, avl_id);
557        aa = avl_search(new_file->kv_header->idx_id,
558                        &node_old->avl_id, _kvs_cmp_id);
559        assert(aa); // MUST exist
560        node_new = _get_entry(aa, struct kvs_node, avl_id);
561        node_new->custom_cmp = node_old->custom_cmp;
562        node_new->seqnum = node_old->seqnum;
563        node_new->op_stat = node_old->op_stat;
564        a = avl_next(a);
565    }
566    spin_unlock(&new_file->kv_header->lock);
567    spin_unlock(&handle->file->kv_header->lock);
568}
569
570// export KV header info to raw data
571static void _fdb_kvs_header_export(struct kvs_header *kv_header,
572                                   void **data, size_t *len)
573{
574    /* << raw data structure >>
575     * [# KV instances]:        8 bytes
576     * [current KV ID counter]: 8 bytes
577     * ---
578     * [name length]:           2 bytes
579     * [instance name]:         x bytes
580     * [instance ID]:           8 bytes
581     * [sequence number]:       8 bytes
582     * [# live index nodes]:    8 bytes
583     * [# docs]:                8 bytes
584     * [data size]:             8 bytes
585     * [flags]:                 8 bytes
586     * ...
587     */
588
589    int size = 0;
590    int offset = 0;
591    uint16_t name_len, _name_len;
592    uint64_t c = 0;
593    uint64_t _n_kv, _kv_id, _flags;
594    uint64_t _nlivenodes, _ndocs, _datasize;
595    fdb_kvs_id_t _id_counter;
596    fdb_seqnum_t _seqnum;
597    struct kvs_node *node;
598    struct avl_node *a;
599
600    if (kv_header == NULL) {
601        *data = NULL;
602        *len = 0;
603        return ;
604    }
605
606    spin_lock(&kv_header->lock);
607
608    // pre-scan to estimate the size of data
609    size += sizeof(uint64_t);
610    size += sizeof(fdb_kvs_id_t);
611    a = avl_first(kv_header->idx_name);
612    while(a) {
613        node = _get_entry(a, struct kvs_node, avl_name);
614        c++;
615        size += sizeof(uint16_t); // length
616        size += strlen(node->kvs_name)+1; // name
617        size += sizeof(node->id); // ID
618        size += sizeof(node->seqnum); // seq number
619        size += sizeof(node->stat.nlivenodes); // # live index nodes
620        size += sizeof(node->stat.ndocs); // # docs
621        size += sizeof(node->stat.datasize); // data size
622        size += sizeof(node->flags); // flags
623        a = avl_next(a);
624    }
625
626    *data = (void *)malloc(size);
627
628    // # KV instances
629    _n_kv = _endian_encode(c);
630    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
631    offset += sizeof(_n_kv);
632
633    // ID counter
634    _id_counter = _endian_encode(kv_header->id_counter);
635    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
636    offset += sizeof(_id_counter);
637
638    a = avl_first(kv_header->idx_name);
639    while(a) {
640        node = _get_entry(a, struct kvs_node, avl_name);
641
642        // name length
643        name_len = strlen(node->kvs_name)+1;
644        _name_len = _endian_encode(name_len);
645        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
646        offset += sizeof(_name_len);
647
648        // name
649        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
650        offset += name_len;
651
652        // KV ID
653        _kv_id = _endian_encode(node->id);
654        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
655        offset += sizeof(_kv_id);
656
657        // seq number
658        _seqnum = _endian_encode(node->seqnum);
659        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
660        offset += sizeof(_seqnum);
661
662        // # live index nodes
663        _nlivenodes = _endian_encode(node->stat.nlivenodes);
664        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
665        offset += sizeof(_nlivenodes);
666
667        // # docs
668        _ndocs = _endian_encode(node->stat.ndocs);
669        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
670        offset += sizeof(_ndocs);
671
672        // datasize
673        _datasize = _endian_encode(node->stat.datasize);
674        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
675        offset += sizeof(_datasize);
676
677        // flags
678        _flags = _endian_encode(node->flags);
679        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
680        offset += sizeof(_flags);
681
682        a = avl_next(a);
683    }
684
685    *len = size;
686
687    spin_unlock(&kv_header->lock);
688}
689
690void _fdb_kvs_header_import(struct kvs_header *kv_header,
691                            void *data, size_t len, bool only_seq_nums)
692{
693    uint64_t i, offset = 0;
694    uint16_t name_len, _name_len;
695    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
696    uint64_t _nlivenodes, _ndocs, _datasize;
697    fdb_kvs_id_t id_counter, _id_counter;
698    fdb_seqnum_t seqnum, _seqnum;
699    struct kvs_node *node;
700
701    // # KV instances
702    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
703    offset += sizeof(_n_kv);
704    n_kv = _endian_decode(_n_kv);
705
706    // ID counter
707    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
708    offset += sizeof(_id_counter);
709    id_counter = _endian_decode(_id_counter);
710
711    spin_lock(&kv_header->lock);
712    kv_header->id_counter = id_counter;
713
714    for (i=0;i<n_kv;++i){
715        // name length
716        uint64_t name_offset;
717        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
718        offset += sizeof(_name_len);
719        name_offset = offset;
720        name_len = _endian_decode(_name_len);
721
722        // name
723        offset += name_len;
724
725        // KV ID
726        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
727        offset += sizeof(_kv_id);
728        kv_id = _endian_decode(_kv_id);
729
730        // Search if a given KV header node exists or not.
731        struct kvs_node query;
732        query.id = kv_id;
733        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
734                                        _kvs_cmp_id);
735        if (a) {
736            node = _get_entry(a, struct kvs_node, avl_id);
737        } else {
738            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
739            node->kvs_name = (char *)malloc(name_len);
740            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
741            node->id = kv_id;
742            _init_op_stats(&node->op_stat);
743        }
744
745        // seq number
746        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
747        offset += sizeof(_seqnum);
748        seqnum = _endian_decode(_seqnum);
749        node->seqnum = seqnum;
750
751        // # live index nodes
752        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
753        offset += sizeof(_nlivenodes);
754
755        // # docs
756        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
757        offset += sizeof(_ndocs);
758
759        // datasize
760        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
761        offset += sizeof(_datasize);
762
763        // flags
764        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
765        offset += sizeof(_flags);
766        flags = _endian_decode(_flags);
767
768        if (!only_seq_nums) {
769            node->stat.nlivenodes = _endian_decode(_nlivenodes);
770            node->stat.ndocs = _endian_decode(_ndocs);
771            node->stat.datasize = _endian_decode(_datasize);
772            node->flags = flags;
773            node->custom_cmp = NULL;
774        }
775
776        if (!a) { // Insert a new KV header node if not exist.
777            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
778            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
779        }
780    }
781    spin_unlock(&kv_header->lock);
782}
783
784fdb_status _fdb_kvs_get_snap_info(void *data,
785                                  fdb_snapshot_info_t *snap_info)
786{
787    int i, offset = 0, sizeof_skipped_segments;
788    uint16_t name_len, _name_len;
789    int64_t n_kv, _n_kv;
790    fdb_seqnum_t _seqnum;
791
792    // # KV instances
793    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
794    offset += sizeof(_n_kv);
795    // since n_kv doesn't count the default KVS, increase it by 1.
796    n_kv = _endian_decode(_n_kv) + 1;
797    assert(n_kv); // Must have at least one kv instance
798    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
799                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
800    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
801        return FDB_RESULT_ALLOC_FAIL;
802    } // LCOV_EXCL_STOP
803
804    snap_info->num_kvs_markers = n_kv;
805
806    // Skip over ID counter
807    offset += sizeof(fdb_kvs_id_t);
808
809    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
810                            + sizeof(uint64_t) // skip over nlivenodes
811                            + sizeof(uint64_t) // skip over ndocs
812                            + sizeof(uint64_t) // skip over datasize
813                            + sizeof(uint64_t); // skip over flags
814
815    for (i = 0; i < n_kv-1; ++i){
816        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
817        // Read the kv store name length
818        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
819        offset += sizeof(_name_len);
820        name_len = _endian_decode(_name_len);
821
822        // Retrieve the KV Store name
823        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
824        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
825        offset += name_len;
826
827        // Skip over KV ID
828        offset += sizeof(uint64_t);
829
830        // Retrieve the KV Store Commit Sequence number
831        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
832        info->seqnum = _endian_decode(_seqnum);
833
834        // Skip over seqnum, nlivenodes, ndocs, datasize and flags onto next..
835        offset += sizeof_skipped_segments;
836    }
837
838    return FDB_RESULT_SUCCESS;
839}
840
841uint64_t fdb_kvs_header_append(struct filemgr *file,
842                                  struct docio_handle *dhandle)
843{
844    char *doc_key = alca(char, 32);
845    void *data;
846    size_t len;
847    uint64_t kv_info_offset;
848    struct docio_object doc;
849
850    _fdb_kvs_header_export(file->kv_header, &data, &len);
851
852    memset(&doc, 0, sizeof(struct docio_object));
853    sprintf(doc_key, "KV_header");
854    doc.key = (void *)doc_key;
855    doc.meta = NULL;
856    doc.body = data;
857    doc.length.keylen = strlen(doc_key) + 1;
858    doc.length.metalen = 0;
859    doc.length.bodylen = len;
860    doc.seqnum = 0;
861    kv_info_offset = docio_append_doc_system(dhandle, &doc);
862    free(data);
863
864    return kv_info_offset;
865}
866
867void fdb_kvs_header_read(struct filemgr *file,
868                         struct docio_handle *dhandle,
869                         uint64_t kv_info_offset,
870                         bool only_seq_nums)
871{
872    uint64_t offset;
873    struct docio_object doc;
874
875    memset(&doc, 0, sizeof(struct docio_object));
876    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
877
878    if (offset == kv_info_offset) {
879        fdb_log(dhandle->log_callback, FDB_RESULT_READ_FAIL,
880                "Failed to read a KV header with the offset %" _F64 " from a "
881                "database file '%s'", kv_info_offset, file->filename);
882        return;
883    }
884
885    _fdb_kvs_header_import(file->kv_header, doc.body, doc.length.bodylen,
886                           only_seq_nums);
887    free_docio_object(&doc, 1, 1, 1);
888}
889
890fdb_seqnum_t _fdb_kvs_get_seqnum(struct kvs_header *kv_header,
891                                 fdb_kvs_id_t id)
892{
893    fdb_seqnum_t seqnum;
894    struct kvs_node query, *node;
895    struct avl_node *a;
896
897    spin_lock(&kv_header->lock);
898    query.id = id;
899    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
900    if (a) {
901        node = _get_entry(a, struct kvs_node, avl_id);
902        seqnum = node->seqnum;
903    } else {
904        // not existing KV ID.
905        // this is necessary for _fdb_restore_wal()
906        // not to restore documents in deleted KV store.
907        seqnum = 0;
908    }
909    spin_unlock(&kv_header->lock);
910
911    return seqnum;
912}
913
914fdb_seqnum_t fdb_kvs_get_seqnum(struct filemgr *file,
915                                fdb_kvs_id_t id)
916{
917    if (id == 0) {
918        // default KV instance
919        return filemgr_get_seqnum(file);
920    }
921
922    return _fdb_kvs_get_seqnum(file->kv_header, id);
923}
924
925fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
926{
927    uint8_t *buf;
928    uint64_t dummy64;
929    uint64_t kv_info_offset;
930    size_t len;
931    bid_t hdr_bid;
932    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
933    fdb_kvs_id_t id = 0;
934    char *compacted_filename = NULL;
935    struct filemgr *file = handle->file;
936
937    buf = alca(uint8_t, file->config->blocksize);
938
939    if (handle->kvs && handle->kvs->id > 0) {
940        id = handle->kvs->id;
941    }
942
943    hdr_bid = filemgr_get_header_bid(file);
944    if (hdr_bid == BLK_NOT_FOUND) {
945        // header doesn't exist
946        return 0;
947    }
948
949    // read header
950    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, &handle->log_callback);
951    if (id > 0) { // non-default KVS
952        // read last KVS header
953        fdb_fetch_header(buf, &dummy64,
954                         &dummy64, &dummy64, &dummy64,
955                         &dummy64, &dummy64,
956                         &kv_info_offset, &dummy64,
957                         &compacted_filename, NULL);
958
959        uint64_t doc_offset;
960        struct kvs_header *kv_header;
961        struct docio_object doc;
962
963        _fdb_kvs_header_create(&kv_header);
964        memset(&doc, 0, sizeof(struct docio_object));
965        doc_offset = docio_read_doc(handle->dhandle,
966                                    kv_info_offset, &doc, true);
967
968        if (doc_offset == kv_info_offset) {
969            // fail
970            _fdb_kvs_header_free(kv_header);
971            return 0;
972
973        } else {
974            _fdb_kvs_header_import(kv_header, doc.body,
975                                   doc.length.bodylen, false);
976            // get local sequence number for the KV instance
977            seqnum = _fdb_kvs_get_seqnum(kv_header,
978                                         handle->kvs->id);
979            _fdb_kvs_header_free(kv_header);
980            free_docio_object(&doc, 1, 1, 1);
981        }
982    }
983    return seqnum;
984}
985
986LIBFDB_API
987fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
988{
989    if (!handle) {
990        return FDB_RESULT_INVALID_HANDLE;
991    }
992    if (!seqnum) {
993        return FDB_RESULT_INVALID_ARGS;
994    }
995
996    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
997        return FDB_RESULT_HANDLE_BUSY;
998    }
999
1000    if (handle->shandle) {
1001        // handle for snapshot
1002        // return MAX_SEQNUM instead of the file's sequence number
1003        *seqnum = handle->max_seqnum;
1004    } else {
1005        fdb_check_file_reopen(handle, NULL);
1006        fdb_sync_db_header(handle);
1007
1008        struct filemgr *file;
1009        file = handle->file;
1010
1011        if (handle->kvs == NULL ||
1012            handle->kvs->id == 0) {
1013            filemgr_mutex_lock(file);
1014            *seqnum = filemgr_get_seqnum(file);
1015            filemgr_mutex_unlock(file);
1016        } else {
1017            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1018        }
1019    }
1020    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
1021    return FDB_RESULT_SUCCESS;
1022}
1023
1024void fdb_kvs_set_seqnum(struct filemgr *file,
1025                           fdb_kvs_id_t id,
1026                           fdb_seqnum_t seqnum)
1027{
1028    struct kvs_header *kv_header = file->kv_header;
1029    struct kvs_node query, *node;
1030    struct avl_node *a;
1031
1032    if (id == 0) {
1033        // default KV instance
1034        filemgr_set_seqnum(file, seqnum);
1035        return;
1036    }
1037
1038    spin_lock(&kv_header->lock);
1039    query.id = id;
1040    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1041    node = _get_entry(a, struct kvs_node, avl_id);
1042    node->seqnum = seqnum;
1043    spin_unlock(&kv_header->lock);
1044}
1045
1046void _fdb_kvs_header_free(struct kvs_header *kv_header)
1047{
1048    struct kvs_node *node;
1049    struct avl_node *a;
1050
1051    a = avl_first(kv_header->idx_name);
1052    while (a) {
1053        node = _get_entry(a, struct kvs_node, avl_name);
1054        a = avl_next(a);
1055        avl_remove(kv_header->idx_name, &node->avl_name);
1056
1057        free(node->kvs_name);
1058        free(node);
1059    }
1060    free(kv_header->idx_name);
1061    free(kv_header->idx_id);
1062    free(kv_header);
1063}
1064
1065void fdb_kvs_header_free(struct filemgr *file)
1066{
1067    if (file->kv_header == NULL) {
1068        return;
1069    }
1070
1071    _fdb_kvs_header_free(file->kv_header);
1072    file->kv_header = NULL;
1073}
1074
1075static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1076                                  const char *kvs_name,
1077                                  fdb_kvs_config *kvs_config)
1078{
1079    int kv_ins_name_len;
1080    fdb_status fs = FDB_RESULT_SUCCESS;
1081    struct avl_node *a;
1082    struct filemgr *file;
1083    struct docio_handle *dhandle;
1084    struct kvs_node *node, query;
1085    struct kvs_header *kv_header;
1086
1087    if (root_handle->config.multi_kv_instances == false) {
1088        // cannot open KV instance under single DB instance mode
1089        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1090                       "Cannot open or create KV store instance '%s' because multi-KV "
1091                       "store instance mode is disabled.",
1092                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1093    }
1094    if (root_handle->kvs->type != KVS_ROOT) {
1095        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1096                       "Cannot open or create KV store instance '%s' because the handle "
1097                       "doesn't support multi-KV sotre instance mode.",
1098                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1099    }
1100
1101fdb_kvs_create_start:
1102    fdb_check_file_reopen(root_handle, NULL);
1103    filemgr_mutex_lock(root_handle->file);
1104    fdb_sync_db_header(root_handle);
1105
1106    if (filemgr_is_rollback_on(root_handle->file)) {
1107        filemgr_mutex_unlock(root_handle->file);
1108        return FDB_RESULT_FAIL_BY_ROLLBACK;
1109    }
1110
1111    file = root_handle->file;
1112    dhandle = root_handle->dhandle;
1113
1114    file_status_t fstatus = filemgr_get_file_status(file);
1115    if (fstatus == FILE_REMOVED_PENDING) {
1116        // we must not write into this file
1117        // file status was changed by other thread .. start over
1118        filemgr_mutex_unlock(file);
1119        goto fdb_kvs_create_start;
1120    }
1121
1122    kv_header = file->kv_header;
1123    spin_lock(&kv_header->lock);
1124
1125    // find existing KV instance
1126    // search by name
1127    query.kvs_name = (char*)kvs_name;
1128    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1129    if (a) { // KV name already exists
1130        spin_unlock(&kv_header->lock);
1131        filemgr_mutex_unlock(file);
1132        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1133                       "Failed to create KV Store '%s' as it already exists.",
1134                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1135    }
1136
1137    // create a kvs_node and insert
1138    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1139    node->id = kv_header->id_counter++;
1140    node->seqnum = 0;
1141    node->flags = 0x0;
1142    _init_op_stats(&node->op_stat);
1143    // search fhandle's custom cmp func list first
1144    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1145                                             (char *)kvs_name);
1146    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1147        // follow kvs_config's custom cmp next
1148        node->custom_cmp = kvs_config->custom_cmp;
1149        // if custom cmp function is given by user but
1150        // there is no corresponding function in fhandle's list
1151        // add it into the list
1152        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1153                                     (char*)kvs_name,
1154                                     kvs_config->custom_cmp);
1155    }
1156    if (node->custom_cmp) { // custom cmp function is used
1157        node->flags |= KVS_FLAG_CUSTOM_CMP;
1158        kv_header->custom_cmp_enabled = 1;
1159    }
1160    kv_ins_name_len = strlen(kvs_name)+1;
1161    node->kvs_name = (char *)malloc(kv_ins_name_len);
1162    strcpy(node->kvs_name, kvs_name);
1163
1164    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1165    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1166    spin_unlock(&kv_header->lock);
1167
1168    // if compaction is in-progress,
1169    // create a same kvs_node for the new file
1170    if (file->new_file &&
1171        filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1172        struct kvs_node *node_new;
1173        struct kvs_header *kv_header_new;
1174
1175        kv_header_new = file->new_file->kv_header;
1176        node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1177        *node_new = *node;
1178        node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1179        strcpy(node_new->kvs_name, kvs_name);
1180
1181        // insert into new file's kv_header
1182        spin_lock(&kv_header_new->lock);
1183        if (node->custom_cmp) {
1184            kv_header_new->custom_cmp_enabled = 1;
1185        }
1186        avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1187        avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1188        spin_unlock(&kv_header_new->lock);
1189    }
1190
1191    // sync dirty root nodes
1192    bid_t dirty_idtree_root, dirty_seqtree_root;
1193    filemgr_get_dirty_root(root_handle->file, &dirty_idtree_root, &dirty_seqtree_root);
1194    if (dirty_idtree_root != BLK_NOT_FOUND) {
1195        root_handle->trie->root_bid = dirty_idtree_root;
1196    }
1197    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE &&
1198        dirty_seqtree_root != BLK_NOT_FOUND) {
1199        if (root_handle->kvs) {
1200            root_handle->seqtrie->root_bid = dirty_seqtree_root;
1201        } else {
1202            btree_init_from_bid(root_handle->seqtree,
1203                                root_handle->seqtree->blk_handle,
1204                                root_handle->seqtree->blk_ops,
1205                                root_handle->seqtree->kv_ops,
1206                                root_handle->seqtree->blksize,
1207                                dirty_seqtree_root);
1208        }
1209    }
1210
1211    // append system doc
1212    root_handle->kv_info_offset = fdb_kvs_header_append(file, dhandle);
1213
1214    // if no compaction is being performed, append header and commit
1215    if (root_handle->file == file) {
1216        root_handle->cur_header_revnum = fdb_set_file_header(root_handle);
1217        fs = filemgr_commit(root_handle->file, &root_handle->log_callback);
1218    }
1219
1220    filemgr_mutex_unlock(file);
1221
1222    return fs;
1223}
1224
1225// this function just returns pointer
1226char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1227{
1228    struct kvs_node *node, query;
1229    struct avl_node *a;
1230
1231    if (handle->kvs == NULL) {
1232        // single KV instance mode
1233        return NULL;
1234    }
1235
1236    query.id = handle->kvs->id;
1237    if (query.id == 0) { // default KV instance
1238        return NULL;
1239    }
1240    spin_lock(&file->kv_header->lock);
1241    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1242    if (a) {
1243        node = _get_entry(a, struct kvs_node, avl_id);
1244        spin_unlock(&file->kv_header->lock);
1245        return node->kvs_name;
1246    }
1247    spin_unlock(&file->kv_header->lock);
1248    return NULL;
1249}
1250
1251fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1252                                   fdb_kvs_handle *handle_out)
1253{
1254    fdb_status fs;
1255    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1256
1257    if (!handle_out->kvs) {
1258        // create kvs_info
1259        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1260        handle_out->kvs->type = handle_in->kvs->type;
1261        handle_out->kvs->id = handle_in->kvs->id;
1262        handle_out->kvs->root = root_handle;
1263        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1264
1265        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1266            calloc(1, sizeof(struct kvs_opened_node));
1267        opened_node->handle = handle_out;
1268        handle_out->node = opened_node;
1269
1270        spin_lock(&root_handle->fhandle->lock);
1271        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1272        spin_unlock(&root_handle->fhandle->lock);
1273    }
1274
1275    fs = _fdb_clone_snapshot(handle_in, handle_out);
1276    if (fs != FDB_RESULT_SUCCESS) {
1277        if (handle_out->node) {
1278            spin_lock(&root_handle->fhandle->lock);
1279            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1280            spin_unlock(&root_handle->fhandle->lock);
1281            free(handle_out->node);
1282        }
1283        free(handle_out->kvs);
1284    }
1285    return fs;
1286}
1287
1288// 1) allocate memory & create 'handle->kvs'
1289//    by calling fdb_kvs_info_create().
1290//      -> this will allocate a corresponding node and
1291//         insert it into fhandle->handles list.
1292// 2) if matching KVS name doesn't exist, create it.
1293// 3) call _fdb_open().
1294fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1295                         fdb_config *config,
1296                         fdb_kvs_config *kvs_config,
1297                         struct filemgr *file,
1298                         const char *filename,
1299                         const char *kvs_name,
1300                         fdb_kvs_handle *handle)
1301{
1302    fdb_status fs;
1303
1304    if (handle->kvs == NULL) {
1305        // create kvs_info
1306        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1307    }
1308
1309    if (handle->kvs == NULL) {
1310        // KV instance name is not found
1311        if (!kvs_config->create_if_missing) {
1312            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1313                           "Failed to open KV store '%s' because it doesn't exist.",
1314                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1315        }
1316        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1317            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1318                           "Failed to create KV store '%s' because the KV store's handle "
1319                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1320        }
1321
1322        // create
1323        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1324        if (fs != FDB_RESULT_SUCCESS) { // create fail
1325            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1326        }
1327        // create kvs_info again
1328        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1329        if (handle->kvs == NULL) { // fail again
1330            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1331                           "Failed to create KV store '%s' because the KV store's handle "
1332                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1333        }
1334    }
1335    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1336    if (fs != FDB_RESULT_SUCCESS) {
1337        if (handle->node) {
1338            spin_lock(&root_handle->fhandle->lock);
1339            list_remove(root_handle->fhandle->handles, &handle->node->le);
1340            spin_unlock(&root_handle->fhandle->lock);
1341            free(handle->node);
1342        } // 'handle->node == NULL' happens only during rollback
1343        free(handle->kvs);
1344    }
1345    return fs;
1346}
1347
1348// 1) identify whether the requested KVS is default or non-default.
1349// 2) if the requested KVS is default,
1350//   2-1) if no KVS handle is opened yet from this fhandle,
1351//        -> return the root handle.
1352//   2-2) if the root handle is already opened,
1353//        -> allocate memory for handle, and call _fdb_open().
1354//        -> 'handle->kvs' will be created in _fdb_open(),
1355//           since it is treated as a default handle.
1356//        -> allocate a corresponding node and insert it into
1357//           fhandle->handles list.
1358// 3) if the requested KVS is non-default,
1359//    -> allocate memory for handle, and call _fdb_kvs_open().
1360LIBFDB_API
1361fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1362                        fdb_kvs_handle **ptr_handle,
1363                        const char *kvs_name,
1364                        fdb_kvs_config *kvs_config)
1365{
1366    fdb_kvs_handle *handle;
1367    fdb_config config;
1368    fdb_status fs;
1369    fdb_kvs_handle *root_handle;
1370    fdb_kvs_config config_local;
1371    struct filemgr *file = NULL;
1372    struct filemgr *latest_file = NULL;
1373
1374    if (!fhandle) {
1375        return FDB_RESULT_INVALID_HANDLE;
1376    }
1377    root_handle = fhandle->root;
1378    config = root_handle->config;
1379
1380    if (kvs_config) {
1381        if (validate_fdb_kvs_config(kvs_config)) {
1382            config_local = *kvs_config;
1383        } else {
1384            return FDB_RESULT_INVALID_CONFIG;
1385        }
1386    } else {
1387        config_local = get_default_kvs_config();
1388    }
1389
1390    fdb_check_file_reopen(root_handle, NULL);
1391    fdb_sync_db_header(root_handle);
1392
1393    file = root_handle->file;
1394    latest_file = root_handle->file;
1395
1396    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1397        // return the default KV store handle
1398        spin_lock(&fhandle->lock);
1399        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1400            // the root handle is not opened yet
1401            // just return the root handle
1402            fdb_custom_cmp_variable default_kvs_cmp;
1403
1404            root_handle->kvs_config = config_local;
1405
1406            if (root_handle->file->kv_header) {
1407                // search fhandle's custom cmp func list first
1408                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1409
1410                spin_lock(&root_handle->file->kv_header->lock);
1411                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1412
1413                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1414                    root_handle->kvs_config.custom_cmp) {
1415                    // follow kvs_config's custom cmp next
1416                    root_handle->file->kv_header->default_kvs_cmp =
1417                        root_handle->kvs_config.custom_cmp;
1418                }
1419
1420                if (root_handle->file->kv_header->default_kvs_cmp) {
1421                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1422                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1423                }
1424                spin_unlock(&root_handle->file->kv_header->lock);
1425            }
1426
1427            *ptr_handle = root_handle;
1428            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1429            fhandle->flags |= FHANDLE_ROOT_OPENED;
1430            fs = FDB_RESULT_SUCCESS;
1431            spin_unlock(&fhandle->lock);
1432
1433        } else {
1434            // the root handle is already opened
1435            // open new default KV store handle
1436            spin_unlock(&fhandle->lock);
1437            handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1438            handle->kvs_config = config_local;
1439            atomic_init_uint8_t(&handle->handle_busy, 0);
1440
1441            if (root_handle->file->kv_header) {
1442                spin_lock(&root_handle->file->kv_header->lock);
1443                handle->kvs_config.custom_cmp =
1444                    root_handle->file->kv_header->default_kvs_cmp;
1445                spin_unlock(&root_handle->file->kv_header->lock);
1446            }
1447
1448            handle->fhandle = fhandle;
1449            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1450            if (fs != FDB_RESULT_SUCCESS) {
1451                free(handle);
1452                *ptr_handle = NULL;
1453            } else {
1454                // insert into fhandle's list
1455                struct kvs_opened_node *node;
1456                node = (struct kvs_opened_node *)
1457                       calloc(1, sizeof(struct kvs_opened_node));
1458                node->handle = handle;
1459                spin_lock(&fhandle->lock);
1460                list_push_front(fhandle->handles, &node->le);
1461                spin_unlock(&fhandle->lock);
1462
1463                handle->node = node;
1464                *ptr_handle = handle;
1465            }
1466        }
1467        return fs;
1468    }
1469
1470    if (config.multi_kv_instances == false) {
1471        // cannot open KV instance under single DB instance mode
1472        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1473                       "Cannot open KV store instance '%s' because multi-KV "
1474                       "store instance mode is disabled.",
1475                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1476    }
1477    if (root_handle->kvs->type != KVS_ROOT) {
1478        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1479                       "Cannot open KV store instance '%s' because the handle "
1480                       "doesn't support multi-KV sotre instance mode.",
1481                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1482    }
1483    if (root_handle->shandle) {
1484        // cannot open KV instance from a snapshot
1485        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1486                       "Not allowed to open KV store instance '%s' from the "
1487                       "snapshot handle.",
1488                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1489    }
1490
1491    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1492    if (!handle) { // LCOV_EXCL_START
1493        return FDB_RESULT_ALLOC_FAIL;
1494    } // LCOV_EXCL_STOP
1495
1496    atomic_init_uint8_t(&handle->handle_busy, 0);
1497    handle->fhandle = fhandle;
1498    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1499                       latest_file, file->filename, kvs_name, handle);
1500    if (fs == FDB_RESULT_SUCCESS) {
1501        *ptr_handle = handle;
1502    } else {
1503        *ptr_handle = NULL;
1504        free(handle);
1505    }
1506    return fs;
1507}
1508
1509LIBFDB_API
1510fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1511                                fdb_kvs_handle **ptr_handle,
1512                                fdb_kvs_config *config)
1513{
1514    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1515}
1516
1517// 1) remove corresponding node from fhandle->handles list.
1518// 2) call _fdb_close().
1519static fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1520{
1521    fdb_kvs_handle *root_handle = handle->kvs->root;
1522    fdb_status fs;
1523
1524    if (handle->node) {
1525        spin_lock(&root_handle->fhandle->lock);
1526        list_remove(root_handle->fhandle->handles, &handle->node->le);
1527        spin_unlock(&root_handle->fhandle->lock);
1528        free(handle->node);
1529    } // 'handle->node == NULL' happens only during rollback
1530
1531    fs = _fdb_close(handle);
1532    return fs;
1533}
1534
1535// close all sub-KV store handles belonging to the root handle
1536fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1537{
1538    fdb_status fs;
1539    struct list_elem *e;
1540    struct kvs_opened_node *node;
1541
1542    spin_lock(&root_handle->fhandle->lock);
1543    e = list_begin(root_handle->fhandle->handles);
1544    while (e) {
1545        node = _get_entry(e, struct kvs_opened_node, le);
1546        e = list_remove(root_handle->fhandle->handles, &node->le);
1547        fs = _fdb_close(node->handle);
1548        if (fs != FDB_RESULT_SUCCESS) {
1549            spin_unlock(&root_handle->fhandle->lock);
1550            return fs;
1551        }
1552        fdb_kvs_info_free(node->handle);
1553        free(node->handle);
1554        free(node);
1555    }
1556    spin_unlock(&root_handle->fhandle->lock);
1557
1558    return FDB_RESULT_SUCCESS;
1559}
1560
1561// 1) identify whether the requested handle is for default KVS or not.
1562// 2) if the requested handle is for the default KVS,
1563//   2-1) if the requested handle is the root handle,
1564//        -> just clear the OPENED flag.
1565//   2-2) if the requested handle is not the root handle,
1566//        -> call _fdb_close(),
1567//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1568//        -> remove the corresponding node from fhandle->handles list,
1569//        -> free the memory for the handle.
1570// 3) if the requested handle is for non-default KVS,
1571//    -> call _fdb_kvs_close(),
1572//       -> this will remove the node from fhandle->handles list.
1573//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1574//    -> free the memory for the handle.
1575LIBFDB_API
1576fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1577{
1578    fdb_status fs;
1579
1580    if (!handle) {
1581        return FDB_RESULT_INVALID_HANDLE;
1582    }
1583
1584    if (handle->shandle && handle->kvs == NULL) {
1585        // snapshot of the default KV store + single KV store mode
1586        // directly close handle
1587        // (snapshot of the other KV stores will be closed
1588        //  using _fdb_kvs_close(...) below)
1589        fs = _fdb_close(handle);
1590        if (fs == FDB_RESULT_SUCCESS) {
1591            free(handle);
1592        }
1593        return fs;
1594    }
1595
1596    if (handle->kvs == NULL ||
1597        handle->kvs->type == KVS_ROOT) {
1598        // the default KV store handle
1599
1600        if (handle->fhandle->root == handle) {
1601            // do nothing for root handle
1602            // the root handle will be closed with fdb_close() API call.
1603            spin_lock(&handle->fhandle->lock);
1604            handle->fhandle->flags &= ~FHANDLE_ROOT_OPENED; // remove flag
1605            spin_unlock(&handle->fhandle->lock);
1606            return FDB_RESULT_SUCCESS;
1607
1608        } else {
1609            // the default KV store but not the root handle .. normally close
1610            spin_lock(&handle->fhandle->lock);
1611            fs = _fdb_close(handle);
1612            if (fs == FDB_RESULT_SUCCESS) {
1613                // remove from 'handles' list in the root node
1614                if (handle->kvs) {
1615                    fdb_kvs_info_free(handle);
1616                }
1617                list_remove(handle->fhandle->handles, &handle->node->le);
1618                spin_unlock(&handle->fhandle->lock);
1619                free(handle->node);
1620                free(handle);
1621            } else {
1622                spin_unlock(&handle->fhandle->lock);
1623            }
1624            return fs;
1625        }
1626    }
1627
1628    if (handle->kvs && handle->kvs->root == NULL) {
1629        return FDB_RESULT_INVALID_ARGS;
1630    }
1631    fs = _fdb_kvs_close(handle);
1632    if (fs == FDB_RESULT_SUCCESS) {
1633        fdb_kvs_info_free(handle);
1634        free(handle);
1635    }
1636    return fs;
1637}
1638
1639static
1640fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
1641                           const char *kvs_name,
1642                           bool rollback_recreate)
1643{
1644    size_t size_chunk, size_id;
1645    uint8_t *_kv_id;
1646    fdb_status fs = FDB_RESULT_SUCCESS;
1647    fdb_kvs_id_t kv_id = 0;
1648    fdb_kvs_handle *root_handle;
1649    struct avl_node *a = NULL;
1650    struct list_elem *e;
1651    struct filemgr *file;
1652    struct docio_handle *dhandle;
1653    struct kvs_node *node, query;
1654    struct kvs_header *kv_header;
1655    struct kvs_opened_node *opened_node;
1656
1657    if (!fhandle) {
1658        return FDB_RESULT_INVALID_HANDLE;
1659    }
1660    root_handle = fhandle->root;
1661
1662    if (root_handle->config.multi_kv_instances == false) {
1663        // cannot remove the KV instance under single DB instance mode
1664        return FDB_RESULT_INVALID_CONFIG;
1665    }
1666    if (root_handle->kvs->type != KVS_ROOT) {
1667        return FDB_RESULT_INVALID_HANDLE;
1668    }
1669
1670fdb_kvs_remove_start:
1671    if (!rollback_recreate) {
1672        fdb_check_file_reopen(root_handle, NULL);
1673        filemgr_mutex_lock(root_handle->file);
1674        fdb_sync_db_header(root_handle);
1675
1676        if (filemgr_is_rollback_on(root_handle->file)) {
1677            filemgr_mutex_unlock(root_handle->file);
1678            return FDB_RESULT_FAIL_BY_ROLLBACK;
1679        }
1680    } else {
1681        filemgr_mutex_lock(root_handle->file);
1682    }
1683
1684    file = root_handle->file;
1685    dhandle = root_handle->dhandle;
1686
1687    file_status_t fstatus = filemgr_get_file_status(file);
1688    if (fstatus == FILE_REMOVED_PENDING) {
1689        // we must not write into this file
1690        // file status was changed by other thread .. start over
1691        filemgr_mutex_unlock(file);
1692        goto fdb_kvs_remove_start;
1693    } else if (fstatus == FILE_COMPACT_OLD) {
1694        // Cannot remove existing KV store during compaction.
1695        // To remove a KV store, the corresponding first chunk in HB+trie
1696        // should be unlinked. This can be possible in the old file during
1697        // compaction, but impossible in the new file, since existing documents
1698        // (including docs belonging to the KV store to be removed) are being moved.
1699        filemgr_mutex_unlock(file);
1700        return FDB_RESULT_FAIL_BY_COMPACTION;
1701    }
1702
1703    // find the kvs_node and remove
1704
1705    // search by name to get ID
1706    spin_lock(&root_handle->fhandle->lock);
1707
1708    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1709        if (!rollback_recreate) {
1710            // default KV store .. KV ID = 0
1711            kv_id = 0;
1712            e = list_begin(root_handle->fhandle->handles);
1713            while (e) {
1714                opened_node = _get_entry(e, struct kvs_opened_node, le);
1715                if ((opened_node->handle->kvs &&
1716                     opened_node->handle->kvs->id == kv_id) ||
1717                     opened_node->handle->kvs == NULL) // single KVS mode
1718                {
1719                    // there is an opened handle
1720                    spin_unlock(&root_handle->fhandle->lock);
1721                    filemgr_mutex_unlock(file);
1722                    return FDB_RESULT_KV_STORE_BUSY;
1723                }
1724                e = list_next(e);
1725            }
1726        }
1727        // reset KVS stats (excepting for WAL stats)
1728        file->header.stat.ndocs = 0;
1729        file->header.stat.nlivenodes = 0;
1730        file->header.stat.datasize = 0;
1731        // reset seqnum
1732        filemgr_set_seqnum(file, 0);
1733        spin_unlock(&root_handle->fhandle->lock);
1734    } else {
1735        kv_header = file->kv_header;
1736        spin_lock(&kv_header->lock);
1737        query.kvs_name = (char*)kvs_name;
1738        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1739        if (a == NULL) { // KV name doesn't exist
1740            spin_unlock(&kv_header->lock);
1741            spin_unlock(&root_handle->fhandle->lock);
1742            filemgr_mutex_unlock(file);
1743            return FDB_RESULT_KV_STORE_NOT_FOUND;
1744        }
1745        node = _get_entry(a, struct kvs_node, avl_name);
1746        kv_id = node->id;
1747
1748        if (!rollback_recreate) {
1749            e = list_begin(root_handle->fhandle->handles);
1750            while (e) {
1751                opened_node = _get_entry(e, struct kvs_opened_node, le);
1752                if (opened_node->handle->kvs &&
1753                    opened_node->handle->kvs->id == kv_id) {
1754                    // there is an opened handle
1755                    spin_unlock(&kv_header->lock);
1756                    spin_unlock(&root_handle->fhandle->lock);
1757                    filemgr_mutex_unlock(file);
1758                    return FDB_RESULT_KV_STORE_BUSY;
1759                }
1760                e = list_next(e);
1761            }
1762
1763            avl_remove(kv_header->idx_name, &node->avl_name);
1764            avl_remove(kv_header->idx_id, &node->avl_id);
1765            spin_unlock(&kv_header->lock);
1766            spin_unlock(&root_handle->fhandle->lock);
1767
1768            kv_id = node->id;
1769
1770            // free node
1771            free(node->kvs_name);
1772            free(node);
1773        } else {
1774            // reset all stats except for WAL
1775            node->stat.ndocs = 0;
1776            node->stat.nlivenodes = 0;
1777            node->stat.datasize = 0;
1778            node->seqnum = 0;
1779            spin_unlock(&kv_header->lock);
1780            spin_unlock(&root_handle->fhandle->lock);
1781        }
1782    }
1783
1784    // discard all WAL entries
1785    wal_close_kv_ins(file, kv_id);
1786
1787    // sync dirty root nodes
1788    bid_t dirty_idtree_root, dirty_seqtree_root;
1789    filemgr_get_dirty_root(root_handle->file, &dirty_idtree_root, &dirty_seqtree_root);
1790    if (dirty_idtree_root != BLK_NOT_FOUND) {
1791        root_handle->trie->root_bid = dirty_idtree_root;
1792    }
1793    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE &&
1794        dirty_seqtree_root != BLK_NOT_FOUND) {
1795        if (root_handle->kvs) {
1796            root_handle->seqtrie->root_bid = dirty_seqtree_root;
1797        } else {
1798            btree_init_from_bid(root_handle->seqtree,
1799                                root_handle->seqtree->blk_handle,
1800                                root_handle->seqtree->blk_ops,
1801                                root_handle->seqtree->kv_ops,
1802                                root_handle->seqtree->blksize,
1803                                dirty_seqtree_root);
1804        }
1805    }
1806
1807    size_id = sizeof(fdb_kvs_id_t);
1808    size_chunk = root_handle->trie->chunksize;
1809
1810    // remove from super handle's HB+trie
1811    _kv_id = alca(uint8_t, size_chunk);
1812    kvid2buf(size_chunk, kv_id, _kv_id);
1813    hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
1814    btreeblk_end(root_handle->bhandle);
1815
1816    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1817        _kv_id = alca(uint8_t, size_id);
1818        kvid2buf(size_id, kv_id, _kv_id);
1819        hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
1820        btreeblk_end(root_handle->bhandle);
1821    }
1822
1823    // append system doc
1824    root_handle->kv_info_offset = fdb_kvs_header_append(file, dhandle);
1825
1826    // if no compaction is being performed, append header and commit
1827    if (root_handle->file == file) {
1828        root_handle->cur_header_revnum = fdb_set_file_header(root_handle);
1829        fs = filemgr_commit(root_handle->file, &root_handle->log_callback);
1830    }
1831
1832    filemgr_mutex_unlock(file);
1833
1834    return fs;
1835}
1836
1837fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
1838{
1839    fdb_config config;
1840    fdb_kvs_config kvs_config;
1841    fdb_kvs_handle *handle_in, *handle, *super_handle;
1842    fdb_status fs;
1843    fdb_seqnum_t old_seqnum;
1844    fdb_file_handle *fhandle;
1845    char *kvs_name;
1846
1847    if (!handle_ptr) {
1848        return FDB_RESULT_INVALID_ARGS;
1849    }
1850
1851    handle_in = *handle_ptr;
1852    if (!handle_in->kvs) {
1853        return FDB_RESULT_INVALID_ARGS;
1854    }
1855    super_handle = handle_in->kvs->root;
1856    fhandle = handle_in->fhandle;
1857    config = handle_in->config;
1858    kvs_config = handle_in->kvs_config;
1859
1860    // Sequence trees are a must for rollback
1861    if (handle_in->config.seqtree_opt != FDB_SEQTREE_USE) {
1862        return FDB_RESULT_INVALID_CONFIG;
1863    }
1864
1865    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
1866        return fdb_log(&handle_in->log_callback,
1867                       FDB_RESULT_RONLY_VIOLATION,
1868                       "Warning: Rollback is not allowed on "
1869                       "the read-only DB file '%s'.",
1870                       handle_in->file->filename);
1871    }
1872
1873    filemgr_mutex_lock(handle_in->file);
1874    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
1875    // All transactions should be closed before rollback
1876    if (wal_txn_exists(handle_in->file)) {
1877        filemgr_set_rollback(handle_in->file, 0);
1878        filemgr_mutex_unlock(handle_in->file);
1879        return FDB_RESULT_FAIL_BY_TRANSACTION;
1880    }
1881
1882    // If compaction is running, wait until it is aborted.
1883    // TODO: Find a better way of waiting for the compaction abortion.
1884    unsigned int sleep_time = 10000; // 10 ms.
1885    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
1886    while (fstatus == FILE_COMPACT_OLD) {
1887        filemgr_mutex_unlock(handle_in->file);
1888        decaying_usleep(&sleep_time, 1000000);
1889        filemgr_mutex_lock(handle_in->file);
1890        fstatus = filemgr_get_file_status(handle_in->file);
1891    }
1892    if (fstatus == FILE_REMOVED_PENDING) {
1893        filemgr_mutex_unlock(handle_in->file);
1894        fdb_check_file_reopen(handle_in, NULL);
1895    } else {
1896        filemgr_mutex_unlock(handle_in->file);
1897    }
1898
1899    fdb_sync_db_header(handle_in);
1900
1901    // if the max sequence number seen by this handle is lower than the
1902    // requested snapshot marker, it means the snapshot is not yet visible
1903    // even via the current fdb_kvs_handle
1904    if (seqnum > handle_in->seqnum) {
1905        filemgr_set_rollback(super_handle->file, 0); // allow mutations
1906        return FDB_RESULT_NO_DB_INSTANCE;
1907    }
1908
1909    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
1910    if (seqnum == 0) { // Handle special case of rollback to zero..
1911        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
1912        filemgr_set_rollback(super_handle->file, 0); // allow mutations
1913        return fs;
1914    }
1915
1916    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
1917    if (!handle) { // LCOV_EXCL_START
1918        filemgr_set_rollback(handle_in->file, 0); // allow mutations
1919        return FDB_RESULT_ALLOC_FAIL;
1920    } // LCOV_EXCL_STOP
1921
1922    handle->max_seqnum = seqnum;
1923    handle->log_callback = handle_in->log_callback;
1924    handle->fhandle = fhandle;
1925    atomic_init_uint8_t(&handle->handle_busy, 0);
1926
1927    if (handle_in->kvs->type == KVS_SUB) {
1928        fs = _fdb_kvs_open(handle_in->kvs->root,
1929                           &config,
1930                           &kvs_config,
1931                           handle_in->file,
1932                           handle_in->file->filename,
1933                           kvs_name,
1934                           handle);
1935    } else {
1936        fs = _fdb_open(handle, handle_in->file->filename,
1937                       FDB_AFILENAME, &config);
1938    }
1939    filemgr_set_rollback(handle_in->file, 0); // allow mutations
1940
1941    if (fs == FDB_RESULT_SUCCESS) {
1942        // get KV instance's sub B+trees' root node BIDs
1943        // from both ID-tree and Seq-tree, AND
1944        // replace current handle's sub B+trees' root node BIDs
1945        // by old BIDs
1946        size_t size_chunk, size_id;
1947        bid_t id_root, seq_root, dummy;
1948        uint8_t *_kv_id;
1949        hbtrie_result hr;
1950
1951        size_chunk = handle->trie->chunksize;
1952        size_id = sizeof(fdb_kvs_id_t);
1953
1954        filemgr_mutex_lock(handle_in->file);
1955
1956        // read root BID of the KV instance from the old handle
1957        // and overwrite into the current handle
1958        _kv_id = alca(uint8_t, size_chunk);
1959        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
1960        hr = hbtrie_find_partial(handle->trie, _kv_id,
1961                                 size_chunk, &id_root);
1962        btreeblk_end(handle->bhandle);
1963        if (hr == HBTRIE_RESULT_SUCCESS) {
1964            hbtrie_insert_partial(super_handle->trie,
1965                                  _kv_id, size_chunk,
1966                                  &id_root, &dummy);
1967        } else { // No Trie info in rollback header.
1968                 // Erase kv store from super handle's main index.
1969            hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
1970        }
1971        btreeblk_end(super_handle->bhandle);
1972
1973        // same as above for seq-trie
1974        _kv_id = alca(uint8_t, size_id);
1975        kvid2buf(size_id, handle->kvs->id, _kv_id);
1976        hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
1977                                 size_id, &seq_root);
1978        btreeblk_end(handle->bhandle);
1979        if (hr == HBTRIE_RESULT_SUCCESS) {
1980            hbtrie_insert_partial(super_handle->seqtrie,
1981                                  _kv_id, size_id,
1982                                  &seq_root, &dummy);
1983        } else { // No seqtrie info in rollback header.
1984                 // Erase kv store from super handle's seqtrie index.
1985            hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
1986        }
1987        btreeblk_end(super_handle->bhandle);
1988
1989        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
1990                                        handle_in->kvs->id);
1991        fdb_kvs_set_seqnum(handle_in->file,
1992                           handle_in->kvs->id, seqnum);
1993        handle_in->seqnum = seqnum;
1994        filemgr_mutex_unlock(handle_in->file);
1995
1996        fs = _fdb_commit(super_handle, FDB_COMMIT_NORMAL);
1997        if (fs == FDB_RESULT_SUCCESS) {
1998            _fdb_kvs_close(handle);
1999            *handle_ptr = handle_in;
2000            fdb_kvs_info_free(handle);
2001            free(handle);
2002        } else {
2003            // cancel the rolling-back of the sequence number
2004            fdb_log(&handle_in->log_callback, fs,
2005                    "Rollback failed due to a commit failure with a sequence "
2006                    "number %" _F64, seqnum);
2007            filemgr_mutex_lock(handle_in->file);
2008            fdb_kvs_set_seqnum(handle_in->file,
2009                               handle_in->kvs->id, old_seqnum);
2010            filemgr_mutex_unlock(handle_in->file);
2011            _fdb_kvs_close(handle);
2012            fdb_kvs_info_free(handle);
2013            free(handle);
2014        }
2015    } else {
2016        free(handle);
2017    }
2018
2019    return fs;
2020}
2021
2022LIBFDB_API
2023fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2024                          const char *kvs_name)
2025{
2026    return _fdb_kvs_remove(fhandle, kvs_name, false);
2027}
2028
2029LIBFDB_API
2030fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2031{
2032    uint64_t ndocs;
2033    uint64_t wal_docs;
2034    uint64_t wal_deletes;
2035    uint64_t wal_n_inserts;
2036    uint64_t datasize;
2037    uint64_t nlivenodes;
2038    fdb_kvs_id_t kv_id;
2039    struct avl_node *a;
2040    struct filemgr *file;
2041    struct kvs_node *node, query;
2042    struct kvs_header *kv_header;
2043    struct kvs_stat stat;
2044
2045    if (!handle || !info) {
2046        return FDB_RESULT_INVALID_ARGS;
2047    }
2048
2049    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2050        return FDB_RESULT_HANDLE_BUSY;
2051    }
2052
2053    if (!handle->shandle) { // snapshot handle should be immutable
2054        fdb_check_file_reopen(handle, NULL);
2055        fdb_sync_db_header(handle);
2056    }
2057
2058    file = handle->file;
2059
2060    if (handle->kvs == NULL) {
2061        info->name = default_kvs_name;
2062        kv_id = 0;
2063
2064    } else {
2065        kv_header = file->kv_header;
2066        kv_id = handle->kvs->id;
2067        spin_lock(&kv_header->lock);
2068
2069        query.id = handle->kvs->id;
2070        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2071        if (a) { // sub handle
2072            node = _get_entry(a, struct kvs_node, avl_id);
2073            info->name = (const char*)node->kvs_name;
2074        } else { // root handle
2075            info->name = default_kvs_name;
2076        }
2077        spin_unlock(&kv_header->lock);
2078    }
2079
2080    if (handle->shandle) {
2081        // snapshot .. get its local stats
2082        snap_get_stat(handle->shandle, &stat);
2083    } else {
2084        _kvs_stat_get(file, kv_id, &stat);
2085    }
2086    ndocs = stat.ndocs;
2087    wal_docs = stat.wal_ndocs;
2088    wal_deletes = stat.wal_ndeletes;
2089    wal_n_inserts = wal_docs - wal_deletes;
2090
2091    if (ndocs + wal_n_inserts < wal_deletes) {
2092        info->doc_count = 0;
2093    } else {
2094        if (ndocs) {
2095            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2096        } else {
2097            info->doc_count = wal_n_inserts;
2098        }
2099    }
2100
2101    datasize = stat.datasize;
2102    nlivenodes = stat.nlivenodes;
2103
2104    info->space_used = datasize;
2105    info->space_used += nlivenodes * handle->config.blocksize;
2106    info->file = handle->fhandle;
2107
2108    fdb_assert(atomic_cas_uint8_t(&handle->handle_busy, 1, 0), 1, 0);
2109
2110    // This is another LIBFDB_API call, so handle is marked as free
2111    // in the line above before making this call
2112    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2113
2114    return FDB_RESULT_SUCCESS;
2115}
2116
2117LIBFDB_API
2118fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2119{
2120    fdb_kvs_id_t kv_id;
2121    struct filemgr *file;
2122    struct kvs_ops_stat stat;
2123    struct kvs_ops_stat root_stat;
2124    fdb_kvs_handle *root_handle = handle->fhandle->root;
2125
2126    if (!handle || !info) {
2127        return FDB_RESULT_INVALID_ARGS;
2128    }
2129
2130    // for snapshot handle do not reopen new file as user is interested in
2131    // reader stats from the old file
2132    if (!handle->shandle) {
2133        // always get stats from the latest file
2134        fdb_check_file_reopen(handle, NULL);
2135        fdb_sync_db_header(handle);
2136    }
2137
2138    file = handle->file;
2139
2140    if (handle->kvs == NULL) {
2141        kv_id = 0;
2142    } else {
2143        kv_id = handle->kvs->id;
2144    }
2145
2146    _kvs_ops_stat_get(file, kv_id, &stat);
2147
2148    if (root_handle != handle) {
2149        _kvs_ops_stat_get(file, 0, &root_stat);
2150    } else {
2151        root_stat = stat;
2152    }
2153
2154    info->num_sets = atomic_get_uint64_t(&stat.num_sets);
2155    info->num_dels = atomic_get_uint64_t(&stat.num_dels);
2156    info->num_gets = atomic_get_uint64_t(&stat.num_gets);
2157    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets);
2158    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets);
2159    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves);
2160
2161    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits);
2162    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts);
2163    return FDB_RESULT_SUCCESS;
2164}
2165
2166fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2167                                 fdb_kvs_name_list *kvs_name_list)
2168{
2169    size_t num, size, offset;
2170    char *ptr;
2171    char **segment;
2172    fdb_kvs_handle *root_handle;
2173    struct kvs_header *kv_header;
2174    struct kvs_node *node;
2175    struct avl_node *a;
2176
2177    if (!fhandle || !kvs_name_list) {
2178        return FDB_RESULT_INVALID_ARGS;
2179    }
2180
2181    root_handle = fhandle->root;
2182    kv_header = root_handle->file->kv_header;
2183
2184    spin_lock(&kv_header->lock);
2185    // sum all lengths of KVS names first
2186    // (to calculate the size of memory segment to be allocated)
2187    num = 1;
2188    size = strlen(default_kvs_name) + 1;
2189    a = avl_first(kv_header->idx_id);
2190    while (a) {
2191        node = _get_entry(a, struct kvs_node, avl_id);
2192        a = avl_next(&node->avl_id);
2193
2194        num++;
2195        size += strlen(node->kvs_name) + 1;
2196    }
2197    size += num * sizeof(char*);
2198
2199    // allocate memory segment
2200    segment = (char**)calloc(1, size);
2201    kvs_name_list->num_kvs_names = num;
2202    kvs_name_list->kvs_names = segment;
2203
2204    ptr = (char*)segment + num * sizeof(char*);
2205    offset = num = 0;
2206
2207    // copy default KVS name
2208    strcpy(ptr + offset, default_kvs_name);
2209    segment[num] = ptr + offset;
2210    num++;
2211    offset += strlen(default_kvs_name) + 1;
2212
2213    // copy the others
2214    a = avl_first(kv_header->idx_name);
2215    while (a) {
2216        node = _get_entry(a, struct kvs_node, avl_name);
2217        a = avl_next(&node->avl_name);
2218
2219        strcpy(ptr + offset, node->kvs_name);
2220        segment[num] = ptr + offset;
2221
2222        num++;
2223        offset += strlen(node->kvs_name) + 1;
2224    }
2225
2226    spin_unlock(&kv_header->lock);
2227
2228    return FDB_RESULT_SUCCESS;
2229}
2230
2231LIBFDB_API
2232fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2233{
2234    if (!kvs_name_list) {
2235        return FDB_RESULT_INVALID_ARGS;
2236    }
2237    free(kvs_name_list->kvs_names);
2238    kvs_name_list->kvs_names = NULL;
2239    kvs_name_list->num_kvs_names = 0;
2240
2241    return FDB_RESULT_SUCCESS;
2242}
2243