xref: /4.6.0/forestdb/src/kv_instance.cc (revision b9e5d3c2)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "version.h"
34#include "staleblock.h"
35
36#include "memleak.h"
37#include "timing.h"
38#include "time_utils.h"
39
40static const char *default_kvs_name = DEFAULT_KVS_NAME;
41
42// list element for opened KV store handles
43// (in-memory data: managed by the file handle)
44struct kvs_opened_node {
45    fdb_kvs_handle *handle;
46    struct list_elem le;
47};
48
49// list element for custom cmp functions in fhandle
50struct cmp_func_node {
51    char *kvs_name;
52    fdb_custom_cmp_variable func;
53    struct list_elem le;
54};
55
56static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
57{
58    struct kvs_node *aa, *bb;
59    aa = _get_entry(a, struct kvs_node, avl_name);
60    bb = _get_entry(b, struct kvs_node, avl_name);
61    return strcmp(aa->kvs_name, bb->kvs_name);
62}
63
64static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
65{
66    struct kvs_node *aa, *bb;
67    aa = _get_entry(a, struct kvs_node, avl_id);
68    bb = _get_entry(b, struct kvs_node, avl_id);
69
70    if (aa->id < bb->id) {
71        return -1;
72    } else if (aa->id > bb->id) {
73        return 1;
74    } else {
75        return 0;
76    }
77}
78
79static bool _fdb_kvs_any_handle_opened(fdb_file_handle *fhandle,
80                                       fdb_kvs_id_t kv_id)
81{
82    struct filemgr *file = fhandle->root->file;
83    struct avl_node *a;
84    struct list_elem *e;
85    struct filemgr_fhandle_idx_node *fhandle_node;
86    struct kvs_opened_node *opened_node;
87    fdb_file_handle *file_handle;
88
89    spin_lock(&file->fhandle_idx_lock);
90    a = avl_first(&file->fhandle_idx);
91    while (a) {
92        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
93        a = avl_next(a);
94        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
95        spin_lock(&file_handle->lock);
96        e = list_begin(file_handle->handles);
97        while (e) {
98            opened_node = _get_entry(e, struct kvs_opened_node, le);
99            if ((opened_node->handle->kvs && opened_node->handle->kvs->id == kv_id) ||
100                (kv_id == 0 && opened_node->handle->kvs == NULL)) // single KVS mode
101            {
102                // there is an opened handle
103                spin_unlock(&file_handle->lock);
104                spin_unlock(&file->fhandle_idx_lock);
105                return true;
106            }
107            e = list_next(e);
108        }
109        spin_unlock(&file_handle->lock);
110    }
111    spin_unlock(&file->fhandle_idx_lock);
112
113    return false;
114}
115
116void fdb_file_handle_init(fdb_file_handle *fhandle,
117                           fdb_kvs_handle *root)
118{
119    fhandle->root = root;
120    fhandle->flags = 0x0;
121    root->fhandle = fhandle;
122    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
123    fhandle->cmp_func_list = NULL;
124    spin_init(&fhandle->lock);
125}
126
127void fdb_file_handle_close_all(fdb_file_handle *fhandle)
128{
129    struct list_elem *e;
130    struct kvs_opened_node *node;
131
132    spin_lock(&fhandle->lock);
133    e = list_begin(fhandle->handles);
134    while (e) {
135        node = _get_entry(e, struct kvs_opened_node, le);
136        e = list_next(e);
137        _fdb_close(node->handle);
138        free(node->handle);
139        free(node);
140    }
141    spin_unlock(&fhandle->lock);
142}
143
144void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
145                                    size_t n_func,
146                                    char **kvs_names,
147                                    fdb_custom_cmp_variable *functions)
148{
149    uint64_t i;
150    struct cmp_func_node *node;
151
152    if (n_func == 0 || !kvs_names || !functions) {
153        return;
154    }
155
156    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
157    list_init(fhandle->cmp_func_list);
158
159    for (i=0;i<n_func;++i){
160        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
161        if (kvs_names[i]) {
162            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
163            strcpy(node->kvs_name, kvs_names[i]);
164        } else {
165            // NULL .. default KVS
166            node->kvs_name = NULL;
167        }
168        node->func = functions[i];
169        list_push_back(fhandle->cmp_func_list, &node->le);
170    }
171}
172
173// clone all items in cmp_func_list to fhandle->cmp_func_list
174void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
175                                         struct list *cmp_func_list)
176{
177    struct list_elem *e;
178    struct cmp_func_node *src, *dst;
179
180    if (fhandle->cmp_func_list || /* already exist */
181        !cmp_func_list) {
182        return;
183    }
184
185    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
186    list_init(fhandle->cmp_func_list);
187
188    e = list_begin(cmp_func_list);
189    while (e) {
190        src = _get_entry(e, struct cmp_func_node, le);
191        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
192        if (src->kvs_name) {
193            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
194            strcpy(dst->kvs_name, src->kvs_name);
195        } else {
196            dst->kvs_name = NULL; // default KVS
197        }
198        dst->func = src->func;
199        list_push_back(fhandle->cmp_func_list, &dst->le);
200        e = list_next(&src->le);
201    }
202}
203
204void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
205                                  char *kvs_name,
206                                  fdb_custom_cmp_variable cmp_func)
207{
208    struct cmp_func_node *node;
209
210    // create list if not exist
211    if (!fhandle->cmp_func_list) {
212        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
213        list_init(fhandle->cmp_func_list);
214    }
215
216    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
217    if (kvs_name) {
218        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
219        strcpy(node->kvs_name, kvs_name);
220    } else {
221        // default KVS
222        node->kvs_name = NULL;
223    }
224    node->func = cmp_func;
225    list_push_back(fhandle->cmp_func_list, &node->le);
226}
227
228void fdb_cmp_func_list_from_filemgr(struct filemgr *file, struct list *cmp_func_list)
229{
230    if (!file || !file->kv_header || !cmp_func_list) {
231        return;
232    }
233
234    struct cmp_func_node *node;
235
236    spin_lock(&file->kv_header->lock);
237    // Default KV store cmp function
238    if (file->kv_header->default_kvs_cmp) {
239        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
240        node->func = file->kv_header->default_kvs_cmp;
241        node->kvs_name = NULL;
242        list_push_back(cmp_func_list, &node->le);
243    }
244
245    // Rest of KV stores
246    struct kvs_node *kvs_node;
247    struct avl_node *a = avl_first(file->kv_header->idx_name);
248    while (a) {
249        kvs_node = _get_entry(a, struct kvs_node, avl_name);
250        a = avl_next(a);
251        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
252        node->func = kvs_node->custom_cmp;
253        node->kvs_name = (char*)calloc(1, strlen(kvs_node->kvs_name)+1);
254        strcpy(node->kvs_name, kvs_node->kvs_name);
255        list_push_back(cmp_func_list, &node->le);
256    }
257    spin_unlock(&file->kv_header->lock);
258}
259
260void fdb_free_cmp_func_list(struct list *cmp_func_list)
261{
262    if (!cmp_func_list) {
263        return;
264    }
265
266    struct cmp_func_node *cmp_node;
267    struct list_elem *e = list_begin(cmp_func_list);
268    while (e) {
269        cmp_node = _get_entry(e, struct cmp_func_node, le);
270        e = list_remove(cmp_func_list, &cmp_node->le);
271        free(cmp_node->kvs_name);
272        free(cmp_node);
273    }
274}
275
276static void _free_cmp_func_list(fdb_file_handle *fhandle)
277{
278    struct list_elem *e;
279    struct cmp_func_node *cmp_node;
280
281    if (!fhandle->cmp_func_list) {
282        return;
283    }
284
285    e = list_begin(fhandle->cmp_func_list);
286    while (e) {
287        cmp_node = _get_entry(e, struct cmp_func_node, le);
288        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
289
290        free(cmp_node->kvs_name);
291        free(cmp_node);
292    }
293    free(fhandle->cmp_func_list);
294    fhandle->cmp_func_list = NULL;
295}
296
297void fdb_file_handle_free(fdb_file_handle *fhandle)
298{
299    free(fhandle->handles);
300    _free_cmp_func_list(fhandle);
301    spin_destroy(&fhandle->lock);
302    free(fhandle);
303}
304
305fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
306{
307    int ori_flag;
308    fdb_file_handle *fhandle = handle->fhandle;
309    fdb_custom_cmp_variable ori_custom_cmp;
310    struct filemgr *file = handle->file;
311    struct cmp_func_node *cmp_node;
312    struct kvs_node *kvs_node, query;
313    struct list_elem *e;
314    struct avl_node *a;
315
316    spin_lock(&file->kv_header->lock);
317    ori_flag = file->kv_header->custom_cmp_enabled;
318    ori_custom_cmp = file->kv_header->default_kvs_cmp;
319
320    if (fhandle->cmp_func_list) {
321        handle->kvs_config.custom_cmp = NULL;
322
323        e = list_begin(fhandle->cmp_func_list);
324        while (e) {
325            cmp_node = _get_entry(e, struct cmp_func_node, le);
326            if (cmp_node->kvs_name == NULL ||
327                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
328                handle->kvs_config.custom_cmp = cmp_node->func;
329                file->kv_header->default_kvs_cmp = cmp_node->func;
330                file->kv_header->custom_cmp_enabled = 1;
331            } else {
332                // search by name
333                query.kvs_name = cmp_node->kvs_name;
334                a = avl_search(file->kv_header->idx_name,
335                               &query.avl_name,
336                               _kvs_cmp_name);
337                if (a) { // found
338                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
339                    if (!kvs_node->custom_cmp) {
340                        kvs_node->custom_cmp = cmp_node->func;
341                    }
342                    file->kv_header->custom_cmp_enabled = 1;
343                }
344            }
345            e = list_next(&cmp_node->le);
346        }
347    }
348
349    // first check the default KVS
350    // 1. root handle has not been opened yet: don't care
351    // 2. root handle was opened before: must match the flag
352    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
353        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
354            handle->kvs_config.custom_cmp == NULL) {
355            // custom cmp function was assigned before,
356            // but no custom cmp function is assigned
357            file->kv_header->custom_cmp_enabled = ori_flag;
358            file->kv_header->default_kvs_cmp = ori_custom_cmp;
359            spin_unlock(&file->kv_header->lock);
360            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
361            if (!kvs_name) {
362                kvs_name = DEFAULT_KVS_NAME;
363            }
364            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
365                           "Error! Tried to open a KV store '%s', which was created with "
366                           "custom compare function enabled, without passing the same "
367                           "custom compare function.", kvs_name);
368        }
369        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
370              handle->kvs_config.custom_cmp) {
371            // custom cmp function was not assigned before,
372            // but custom cmp function is assigned from user
373            file->kv_header->custom_cmp_enabled = ori_flag;
374            file->kv_header->default_kvs_cmp = ori_custom_cmp;
375            spin_unlock(&file->kv_header->lock);
376            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
377            if (!kvs_name) {
378                kvs_name = DEFAULT_KVS_NAME;
379            }
380            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
381                           "Error! Tried to open a KV store '%s', which was created without "
382                           "custom compare function, by passing custom compare function.",
383                    kvs_name);
384        }
385    }
386
387    // next check other KVSs
388    a = avl_first(file->kv_header->idx_name);
389    while (a) {
390        kvs_node = _get_entry(a, struct kvs_node, avl_name);
391        a = avl_next(a);
392
393        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
394            kvs_node->custom_cmp == NULL) {
395            // custom cmp function was assigned before,
396            // but no custom cmp function is assigned
397            file->kv_header->custom_cmp_enabled = ori_flag;
398            file->kv_header->default_kvs_cmp = ori_custom_cmp;
399            spin_unlock(&file->kv_header->lock);
400            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
401            if (!kvs_name) {
402                kvs_name = DEFAULT_KVS_NAME;
403            }
404            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
405                           "Error! Tried to open a KV store '%s', which was created with "
406                           "custom compare function enabled, without passing the same "
407                           "custom compare function.", kvs_name);
408        }
409        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
410              kvs_node->custom_cmp) {
411            // custom cmp function was not assigned before,
412            // but custom cmp function is assigned from user
413            file->kv_header->custom_cmp_enabled = ori_flag;
414            file->kv_header->default_kvs_cmp = ori_custom_cmp;
415            spin_unlock(&file->kv_header->lock);
416            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
417            if (!kvs_name) {
418                kvs_name = DEFAULT_KVS_NAME;
419            }
420            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
421                           "Error! Tried to open a KV store '%s', which was created without "
422                           "custom compare function, by passing custom compare function.",
423                           kvs_name);
424        }
425    }
426
427    spin_unlock(&file->kv_header->lock);
428    return FDB_RESULT_SUCCESS;
429}
430
431fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
432                                              char *kvs_name)
433{
434    fdb_file_handle *fhandle;
435    struct list_elem *e;
436    struct cmp_func_node *cmp_node;
437
438    fhandle = handle->fhandle;
439    if (!fhandle->cmp_func_list) {
440        return NULL;
441    }
442
443    e = list_begin(fhandle->cmp_func_list);
444    while (e) {
445        cmp_node = _get_entry(e, struct cmp_func_node, le);
446        if (kvs_name == NULL ||
447            !strcmp(kvs_name, default_kvs_name)) {
448            if (cmp_node->kvs_name == NULL ||
449                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
450                return cmp_node->func;
451            }
452        } else if (cmp_node->kvs_name &&
453                   !strcmp(cmp_node->kvs_name, kvs_name)) {
454            return cmp_node->func;
455        }
456        e = list_next(&cmp_node->le);
457    }
458    return NULL;
459}
460
461hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
462{
463    fdb_kvs_id_t kv_id;
464    struct hbtrie *trie = (struct hbtrie *)aux;
465    struct btreeblk_handle *bhandle;
466    struct filemgr *file;
467    struct avl_node *a;
468    struct kvs_node query, *node;
469
470    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
471    file = bhandle->file;
472
473    if (!file->kv_header->custom_cmp_enabled) {
474        return NULL;
475    }
476
477    buf2kvid(trie->chunksize, chunk, &kv_id);
478
479    // search by id
480    if (kv_id > 0) {
481        query.id = kv_id;
482        spin_lock(&file->kv_header->lock);
483        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
484        spin_unlock(&file->kv_header->lock);
485
486        if (a) {
487            node = _get_entry(a, struct kvs_node, avl_id);
488            return (hbtrie_cmp_func *)node->custom_cmp;
489        }
490    } else {
491        // root handle
492        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
493    }
494    return NULL;
495}
496
497void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
498    handle->kvs->type = KVS_ROOT;
499    handle->kvs->root = handle->fhandle->root;
500    // super handle's ID is always 0
501    handle->kvs->id = 0;
502    // force custom cmp function
503    spin_lock(&file->kv_header->lock);
504    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
505    spin_unlock(&file->kv_header->lock);
506}
507
508void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
509                         fdb_kvs_handle *handle,
510                         struct filemgr *file,
511                         const char *kvs_name)
512{
513    struct kvs_node query, *kvs_node;
514    struct kvs_opened_node *opened_node;
515    struct avl_node *a;
516
517    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
518
519    if (root_handle == NULL) {
520        // 'handle' is a super handle
521        _fdb_kvs_init_root(handle, file);
522    } else {
523        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
524        handle->kvs->type = KVS_SUB;
525        handle->kvs->root = root_handle;
526
527        if (kvs_name) {
528            spin_lock(&file->kv_header->lock);
529            query.kvs_name = (char*)kvs_name;
530            a = avl_search(file->kv_header->idx_name, &query.avl_name,
531                           _kvs_cmp_name);
532            if (a == NULL) {
533                // KV instance name is not found
534                free(handle->kvs);
535                handle->kvs = NULL;
536                spin_unlock(&file->kv_header->lock);
537                return;
538            }
539            kvs_node = _get_entry(a, struct kvs_node, avl_name);
540            handle->kvs->id = kvs_node->id;
541            // force custom cmp function
542            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
543            spin_unlock(&file->kv_header->lock);
544        } else {
545            // snapshot of the root handle
546            handle->kvs->id = 0;
547        }
548
549        opened_node = (struct kvs_opened_node *)
550               calloc(1, sizeof(struct kvs_opened_node));
551        opened_node->handle = handle;
552
553        handle->node = opened_node;
554        spin_lock(&root_handle->fhandle->lock);
555        list_push_back(root_handle->fhandle->handles, &opened_node->le);
556        spin_unlock(&root_handle->fhandle->lock);
557    }
558}
559
560void fdb_kvs_info_free(fdb_kvs_handle *handle)
561{
562    if (handle->kvs == NULL) {
563        return;
564    }
565
566    free(handle->kvs);
567    handle->kvs = NULL;
568}
569
570void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
571{
572    struct kvs_header *kv_header;
573
574    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
575    *kv_header_ptr = kv_header;
576
577    // KV ID '0' is reserved for default KV instance (super handle)
578    kv_header->id_counter = 1;
579    kv_header->default_kvs_cmp = NULL;
580    kv_header->custom_cmp_enabled = 0;
581    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
582    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
583    kv_header->num_kv_stores = 0;
584    avl_init(kv_header->idx_name, NULL);
585    avl_init(kv_header->idx_id, NULL);
586    spin_init(&kv_header->lock);
587}
588
589void fdb_kvs_header_create(struct filemgr *file)
590{
591    if (file->kv_header) {
592        return; // already exist
593    }
594
595    _fdb_kvs_header_create(&file->kv_header);
596    file->free_kv_header = fdb_kvs_header_free;
597}
598
599void fdb_kvs_header_reset_all_stats(struct filemgr *file)
600{
601    struct avl_node *a;
602    struct kvs_node *node;
603    struct kvs_header *kv_header = file->kv_header;
604
605    spin_lock(&kv_header->lock);
606    a = avl_first(kv_header->idx_id);
607    while (a) {
608        node = _get_entry(a, struct kvs_node, avl_id);
609        a = avl_next(&node->avl_id);
610        memset(&node->stat, 0x0, sizeof(node->stat));
611    }
612    spin_unlock(&kv_header->lock);
613}
614
615void fdb_kvs_header_copy(fdb_kvs_handle *handle,
616                         struct filemgr *new_file,
617                         struct docio_handle *new_dhandle,
618                         uint64_t *new_file_kv_info_offset,
619                         bool create_new)
620{
621    struct avl_node *a, *aa;
622    struct kvs_node *node_old, *node_new;
623
624    if (create_new) {
625        struct kvs_header *kv_header;
626        // copy KV header data in 'handle' to new file
627        _fdb_kvs_header_create(&kv_header);
628        // read from 'handle->dhandle', and import into 'new_file'
629        fdb_kvs_header_read(kv_header, handle->dhandle,
630                            handle->kv_info_offset, handle->file->version, false);
631
632        // write KV header in 'new_file' using 'new_dhandle'
633        uint64_t new_kv_info_offset;
634        fdb_kvs_handle new_handle;
635        new_handle.file = new_file;
636        new_handle.dhandle = new_dhandle;
637        new_handle.kv_info_offset = BLK_NOT_FOUND;
638        new_kv_info_offset = fdb_kvs_header_append(&new_handle);
639        if (new_file_kv_info_offset) {
640            *new_file_kv_info_offset = new_kv_info_offset;
641        }
642
643        if (!filemgr_set_kv_header(new_file, kv_header, fdb_kvs_header_free)) {
644            // LCOV_EXCL_START
645            _fdb_kvs_header_free(kv_header);
646        } // LCOV_EXCL_STOP
647        fdb_kvs_header_reset_all_stats(new_file);
648    }
649
650    spin_lock(&handle->file->kv_header->lock);
651    spin_lock(&new_file->kv_header->lock);
652    // copy all in-memory custom cmp function pointers & seqnums
653    new_file->kv_header->default_kvs_cmp =
654        handle->file->kv_header->default_kvs_cmp;
655    new_file->kv_header->custom_cmp_enabled =
656        handle->file->kv_header->custom_cmp_enabled;
657    a = avl_first(handle->file->kv_header->idx_id);
658    while (a) {
659        node_old = _get_entry(a, struct kvs_node, avl_id);
660        aa = avl_search(new_file->kv_header->idx_id,
661                        &node_old->avl_id, _kvs_cmp_id);
662        assert(aa); // MUST exist
663        node_new = _get_entry(aa, struct kvs_node, avl_id);
664        node_new->custom_cmp = node_old->custom_cmp;
665        node_new->seqnum = node_old->seqnum;
666        node_new->op_stat = node_old->op_stat;
667        a = avl_next(a);
668    }
669    spin_unlock(&new_file->kv_header->lock);
670    spin_unlock(&handle->file->kv_header->lock);
671}
672
673// export KV header info to raw data
674static void _fdb_kvs_header_export(struct kvs_header *kv_header,
675                                   void **data, size_t *len, uint64_t version)
676{
677    /* << raw data structure >>
678     * [# KV instances]:        8 bytes
679     * [current KV ID counter]: 8 bytes
680     * ---
681     * [name length]:           2 bytes
682     * [instance name]:         x bytes
683     * [instance ID]:           8 bytes
684     * [sequence number]:       8 bytes
685     * [# live index nodes]:    8 bytes
686     * [# docs]:                8 bytes
687     * [data size]:             8 bytes
688     * [flags]:                 8 bytes
689     * [delta size]:            8 bytes (since MAGIC_001)
690     * [# deleted docs]:        8 bytes (since MAGIC_001)
691     * ...
692     *    Please note that if the above format is changed, please also change...
693     *    _fdb_kvs_get_snap_info()
694     *    _fdb_kvs_header_import()
695     *    _kvs_stat_get_sum_doc()
696     *    _kvs_stat_get_sum_attr
697     */
698
699    int size = 0;
700    int offset = 0;
701    uint16_t name_len, _name_len;
702    uint64_t c = 0;
703    uint64_t _n_kv, _kv_id, _flags;
704    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
705    int64_t _deltasize;
706    fdb_kvs_id_t _id_counter;
707    fdb_seqnum_t _seqnum;
708    struct kvs_node *node;
709    struct avl_node *a;
710
711    if (kv_header == NULL) {
712        *data = NULL;
713        *len = 0;
714        return ;
715    }
716
717    spin_lock(&kv_header->lock);
718
719    // pre-scan to estimate the size of data
720    size += sizeof(uint64_t);
721    size += sizeof(fdb_kvs_id_t);
722    a = avl_first(kv_header->idx_name);
723    while(a) {
724        node = _get_entry(a, struct kvs_node, avl_name);
725        c++;
726        size += sizeof(uint16_t); // length
727        size += strlen(node->kvs_name)+1; // name
728        size += sizeof(node->id); // ID
729        size += sizeof(node->seqnum); // seq number
730        size += sizeof(node->stat.nlivenodes); // # live index nodes
731        size += sizeof(node->stat.ndocs); // # docs
732        size += sizeof(node->stat.datasize); // data size
733        size += sizeof(node->flags); // flags
734        if (ver_is_atleast_magic_001(version)) {
735            size += sizeof(node->stat.deltasize); // delta size since commit
736            size += sizeof(node->stat.ndeletes); // # deleted docs
737        }
738        a = avl_next(a);
739    }
740
741    *data = (void *)malloc(size);
742
743    // # KV instances
744    _n_kv = _endian_encode(c);
745    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
746    offset += sizeof(_n_kv);
747
748    // ID counter
749    _id_counter = _endian_encode(kv_header->id_counter);
750    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
751    offset += sizeof(_id_counter);
752
753    a = avl_first(kv_header->idx_name);
754    while(a) {
755        node = _get_entry(a, struct kvs_node, avl_name);
756
757        // name length
758        name_len = strlen(node->kvs_name)+1;
759        _name_len = _endian_encode(name_len);
760        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
761        offset += sizeof(_name_len);
762
763        // name
764        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
765        offset += name_len;
766
767        // KV ID
768        _kv_id = _endian_encode(node->id);
769        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
770        offset += sizeof(_kv_id);
771
772        // seq number
773        _seqnum = _endian_encode(node->seqnum);
774        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
775        offset += sizeof(_seqnum);
776
777        // # live index nodes
778        _nlivenodes = _endian_encode(node->stat.nlivenodes);
779        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
780        offset += sizeof(_nlivenodes);
781
782        // # docs
783        _ndocs = _endian_encode(node->stat.ndocs);
784        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
785        offset += sizeof(_ndocs);
786
787        // datasize
788        _datasize = _endian_encode(node->stat.datasize);
789        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
790        offset += sizeof(_datasize);
791
792        // flags
793        _flags = _endian_encode(node->flags);
794        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
795        offset += sizeof(_flags);
796
797        if (ver_is_atleast_magic_001(version)) {
798            // # delta index nodes + docsize created after last commit
799            _deltasize = _endian_encode(node->stat.deltasize);
800            memcpy((uint8_t*)*data + offset, &_deltasize, sizeof(_deltasize));
801            offset += sizeof(_deltasize);
802
803            // # deleted documents
804            _ndeletes = _endian_encode(node->stat.ndeletes);
805            memcpy((uint8_t*)*data + offset, &_ndeletes, sizeof(_ndeletes));
806            offset += sizeof(_ndeletes);
807        }
808
809        a = avl_next(a);
810    }
811
812    *len = size;
813
814    spin_unlock(&kv_header->lock);
815}
816
817void _fdb_kvs_header_import(struct kvs_header *kv_header,
818                            void *data, size_t len, uint64_t version,
819                            bool only_seq_nums)
820{
821    uint64_t i, offset = 0;
822    uint16_t name_len, _name_len;
823    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
824    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
825    int64_t _deltasize;
826    bool is_deltasize;
827    fdb_kvs_id_t id_counter, _id_counter;
828    fdb_seqnum_t seqnum, _seqnum;
829    struct kvs_node *node;
830
831    // # KV instances
832    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
833    offset += sizeof(_n_kv);
834    n_kv = _endian_decode(_n_kv);
835
836    // ID counter
837    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
838    offset += sizeof(_id_counter);
839    id_counter = _endian_decode(_id_counter);
840
841    spin_lock(&kv_header->lock);
842    kv_header->id_counter = id_counter;
843
844    // Version control
845    if (!ver_is_atleast_magic_001(version)) {
846        is_deltasize = false;
847        _deltasize = 0;
848        _ndeletes = 0;
849    } else {
850        is_deltasize = true;
851    }
852
853    for (i=0;i<n_kv;++i){
854        // name length
855        uint64_t name_offset;
856        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
857        offset += sizeof(_name_len);
858        name_offset = offset;
859        name_len = _endian_decode(_name_len);
860
861        // name
862        offset += name_len;
863
864        // KV ID
865        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
866        offset += sizeof(_kv_id);
867        kv_id = _endian_decode(_kv_id);
868
869        // Search if a given KV header node exists or not.
870        struct kvs_node query;
871        query.id = kv_id;
872        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
873                                        _kvs_cmp_id);
874        if (a) {
875            node = _get_entry(a, struct kvs_node, avl_id);
876        } else {
877            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
878            node->kvs_name = (char *)malloc(name_len);
879            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
880            node->id = kv_id;
881            _init_op_stats(&node->op_stat);
882        }
883
884        // seq number
885        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
886        offset += sizeof(_seqnum);
887        seqnum = _endian_decode(_seqnum);
888        node->seqnum = seqnum;
889
890        // # live index nodes
891        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
892        offset += sizeof(_nlivenodes);
893
894        // # docs
895        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
896        offset += sizeof(_ndocs);
897
898        // datasize
899        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
900        offset += sizeof(_datasize);
901
902        // flags
903        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
904        offset += sizeof(_flags);
905        flags = _endian_decode(_flags);
906
907        if (is_deltasize) {
908            // delta document + index size since previous commit
909            memcpy(&_deltasize, (uint8_t*)data + offset,
910                   sizeof(_deltasize));
911            offset += sizeof(_deltasize);
912            memcpy(&_ndeletes, (uint8_t*)data + offset,
913                   sizeof(_ndeletes));
914            offset += sizeof(_ndeletes);
915        }
916
917        if (!only_seq_nums) {
918            node->stat.nlivenodes = _endian_decode(_nlivenodes);
919            node->stat.ndocs = _endian_decode(_ndocs);
920            node->stat.datasize = _endian_decode(_datasize);
921            node->stat.deltasize = _endian_decode(_deltasize);
922            node->stat.ndeletes = _endian_decode(_ndeletes);
923            node->flags = flags;
924            node->custom_cmp = NULL;
925        }
926
927        if (!a) { // Insert a new KV header node if not exist.
928            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
929            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
930            ++kv_header->num_kv_stores;
931        }
932    }
933    spin_unlock(&kv_header->lock);
934}
935
936fdb_status _fdb_kvs_get_snap_info(void *data, uint64_t version,
937                                  fdb_snapshot_info_t *snap_info)
938{
939    int i, offset = 0, sizeof_skipped_segments;
940    uint16_t name_len, _name_len;
941    int64_t n_kv, _n_kv;
942    bool is_deltasize;
943    fdb_seqnum_t _seqnum;
944    // Version control
945    if (!ver_is_atleast_magic_001(version)) {
946        is_deltasize = false;
947    } else {
948        is_deltasize = true;
949    }
950
951    // # KV instances
952    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
953    offset += sizeof(_n_kv);
954    // since n_kv doesn't count the default KVS, increase it by 1.
955    n_kv = _endian_decode(_n_kv) + 1;
956    assert(n_kv); // Must have at least one kv instance
957    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
958                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
959    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
960        return FDB_RESULT_ALLOC_FAIL;
961    } // LCOV_EXCL_STOP
962
963    snap_info->num_kvs_markers = n_kv;
964
965    // Skip over ID counter
966    offset += sizeof(fdb_kvs_id_t);
967
968    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
969                            + sizeof(uint64_t) // skip over nlivenodes
970                            + sizeof(uint64_t) // skip over ndocs
971                            + sizeof(uint64_t) // skip over datasize
972                            + sizeof(uint64_t); // skip over flags
973    if (is_deltasize) {
974        sizeof_skipped_segments += sizeof(uint64_t); // skip over deltasize
975        sizeof_skipped_segments += sizeof(uint64_t); // skip over ndeletes
976    }
977
978    for (i = 0; i < n_kv-1; ++i){
979        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
980        // Read the kv store name length
981        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
982        offset += sizeof(_name_len);
983        name_len = _endian_decode(_name_len);
984
985        // Retrieve the KV Store name
986        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
987        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
988        offset += name_len;
989
990        // Skip over KV ID
991        offset += sizeof(uint64_t);
992
993        // Retrieve the KV Store Commit Sequence number
994        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
995        info->seqnum = _endian_decode(_seqnum);
996
997        // Skip over seqnum, nlivenodes, ndocs, datasize, flags etc onto next..
998        offset += sizeof_skipped_segments;
999    }
1000
1001    return FDB_RESULT_SUCCESS;
1002}
1003
1004uint64_t _kvs_stat_get_sum_attr(void *data, uint64_t version,
1005                                kvs_stat_attr_t attr)
1006{
1007    uint64_t ret = 0;
1008    int i, offset = 0;
1009    uint16_t name_len, _name_len;
1010    int64_t n_kv, _n_kv;
1011    bool is_deltasize;
1012    uint64_t nlivenodes, ndocs, datasize, flags;
1013    int64_t deltasize;
1014
1015    // Version control
1016    if (!ver_is_atleast_magic_001(version)) {
1017        is_deltasize = false;
1018    } else {
1019        is_deltasize = true;
1020    }
1021
1022    // # KV instances
1023    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
1024    offset += sizeof(_n_kv);
1025    // since n_kv doesn't count the default KVS, increase it by 1.
1026    n_kv = _endian_decode(_n_kv) + 1;
1027    assert(n_kv); // Must have at least one kv instance
1028
1029    // Skip over ID counter
1030    offset += sizeof(fdb_kvs_id_t);
1031
1032    for (i = 0; i < n_kv-1; ++i){
1033        // Read the kv store name length and skip over the length
1034        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
1035        offset += sizeof(_name_len);
1036        name_len = _endian_decode(_name_len);
1037
1038        // Skip over the KV Store name
1039        offset += name_len;
1040
1041        // Skip over KV ID
1042        offset += sizeof(uint64_t);
1043
1044        // Skip over KV store seqnum
1045        offset += sizeof(uint64_t);
1046
1047        // pick just the attribute requested, skipping over rest..
1048        if (attr == KVS_STAT_NLIVENODES) {
1049            memcpy(&nlivenodes, (uint8_t *)data + offset, sizeof(nlivenodes));
1050            ret += _endian_decode(nlivenodes);
1051            // skip over nlivenodes just read
1052            offset += sizeof(nlivenodes);
1053            // skip over ndocs, datasize, flags (and deltasize, ndeletes)
1054            offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof(datasize)
1055                   + sizeof(flags) + (is_deltasize ? sizeof(deltasize)*2 : 0);
1056        } else if (attr == KVS_STAT_DATASIZE) {
1057            offset += sizeof(nlivenodes) + sizeof(ndocs);
1058            memcpy(&datasize, (uint8_t *)data + offset, sizeof(datasize));
1059            ret += _endian_decode(datasize);
1060            // skip over datasize, flags (and deltasize, ndeletes)
1061            offset += sizeof(datasize) + sizeof(flags)
1062                   + (is_deltasize ? sizeof(deltasize)*2 : 0);
1063        } else if (attr == KVS_STAT_DELTASIZE) {
1064            if (is_deltasize) {
1065                offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof (datasize)
1066                        + sizeof(flags);
1067                memcpy(&deltasize, (uint8_t *)data + offset, sizeof(deltasize));
1068                ret += _endian_decode(deltasize);
1069                // skip over datasize, flags (and deltasize)
1070                offset += sizeof(deltasize)*2; // and ndeletes
1071            }
1072        } else { // Attribute fetched not implemented yet..
1073            fdb_assert(false, 0, attr); // Implement fetch for this attribute
1074        }
1075    }
1076
1077    return ret;
1078}
1079
1080uint64_t fdb_kvs_header_append(fdb_kvs_handle *handle)
1081{
1082    char *doc_key = alca(char, 32);
1083    void *data;
1084    size_t len;
1085    uint64_t kv_info_offset, prev_offset;
1086    struct docio_object doc;
1087    struct docio_length doc_len;
1088    struct filemgr *file = handle->file;
1089    struct docio_handle *dhandle = handle->dhandle;
1090
1091    _fdb_kvs_header_export(file->kv_header, &data, &len, file->version);
1092
1093    prev_offset = handle->kv_info_offset;
1094
1095    memset(&doc, 0, sizeof(struct docio_object));
1096    sprintf(doc_key, "KV_header");
1097    doc.key = (void *)doc_key;
1098    doc.meta = NULL;
1099    doc.body = data;
1100    doc.length.keylen = strlen(doc_key) + 1;
1101    doc.length.metalen = 0;
1102    doc.length.bodylen = len;
1103    doc.seqnum = 0;
1104    kv_info_offset = docio_append_doc_system(dhandle, &doc);
1105    free(data);
1106
1107    if (prev_offset != BLK_NOT_FOUND) {
1108        if (docio_read_doc_length(handle->dhandle, &doc_len, prev_offset)
1109            == FDB_RESULT_SUCCESS) {
1110            // mark stale
1111            filemgr_mark_stale(handle->file, prev_offset, _fdb_get_docsize(doc_len));
1112        }
1113    }
1114
1115    return kv_info_offset;
1116}
1117
1118void fdb_kvs_header_read(struct kvs_header *kv_header,
1119                         struct docio_handle *dhandle,
1120                         uint64_t kv_info_offset,
1121                         uint64_t version,
1122                         bool only_seq_nums)
1123{
1124    int64_t offset;
1125    struct docio_object doc;
1126
1127    memset(&doc, 0, sizeof(struct docio_object));
1128    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
1129
1130    if (offset <= 0) {
1131        fdb_log(dhandle->log_callback, (fdb_status) offset,
1132                "Failed to read a KV header with the offset %" _F64 " from a "
1133                "database file '%s'", kv_info_offset, dhandle->file->filename);
1134        return;
1135    }
1136
1137    _fdb_kvs_header_import(kv_header, doc.body, doc.length.bodylen,
1138                           version, only_seq_nums);
1139    free_docio_object(&doc, 1, 1, 1);
1140}
1141
1142fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
1143{
1144    uint8_t *buf;
1145    uint64_t dummy64;
1146    uint64_t version;
1147    uint64_t kv_info_offset;
1148    size_t len;
1149    bid_t hdr_bid;
1150    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
1151    fdb_kvs_id_t id = 0;
1152    char *compacted_filename = NULL;
1153    struct filemgr *file = handle->file;
1154
1155    buf = alca(uint8_t, file->config->blocksize);
1156
1157    if (handle->kvs && handle->kvs->id > 0) {
1158        id = handle->kvs->id;
1159    }
1160
1161    hdr_bid = filemgr_get_header_bid(file);
1162    if (hdr_bid == BLK_NOT_FOUND) {
1163        // header doesn't exist
1164        return 0;
1165    }
1166
1167    // read header
1168    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, NULL,
1169                         &version, NULL, &handle->log_callback);
1170    if (id > 0) { // non-default KVS
1171        // read last KVS header
1172        fdb_fetch_header(version, buf, &dummy64, &dummy64,
1173                         &dummy64, &dummy64, &dummy64, &dummy64,
1174                         &dummy64, &dummy64,
1175                         &kv_info_offset, &dummy64,
1176                         &compacted_filename, NULL);
1177
1178        int64_t doc_offset;
1179        struct kvs_header *kv_header;
1180        struct docio_object doc;
1181
1182        _fdb_kvs_header_create(&kv_header);
1183        memset(&doc, 0, sizeof(struct docio_object));
1184        doc_offset = docio_read_doc(handle->dhandle,
1185                                    kv_info_offset, &doc, true);
1186
1187        if (doc_offset <= 0) {
1188            // fail
1189            _fdb_kvs_header_free(kv_header);
1190            return 0;
1191
1192        } else {
1193            _fdb_kvs_header_import(kv_header, doc.body,
1194                                   doc.length.bodylen, version, false);
1195            // get local sequence number for the KV instance
1196            seqnum = _fdb_kvs_get_seqnum(kv_header,
1197                                         handle->kvs->id);
1198            _fdb_kvs_header_free(kv_header);
1199            free_docio_object(&doc, 1, 1, 1);
1200        }
1201    }
1202    return seqnum;
1203}
1204
1205LIBFDB_API
1206fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
1207{
1208    if (!handle) {
1209        return FDB_RESULT_INVALID_HANDLE;
1210    }
1211
1212    if (!seqnum) {
1213        return FDB_RESULT_INVALID_ARGS;
1214    }
1215
1216    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
1217        return FDB_RESULT_HANDLE_BUSY;
1218    }
1219
1220    if (handle->shandle) {
1221        // handle for snapshot
1222        // return MAX_SEQNUM instead of the file's sequence number
1223        *seqnum = handle->max_seqnum;
1224    } else {
1225        fdb_check_file_reopen(handle, NULL);
1226        fdb_sync_db_header(handle);
1227
1228        struct filemgr *file;
1229        file = handle->file;
1230
1231        if (handle->kvs == NULL ||
1232            handle->kvs->id == 0) {
1233            filemgr_mutex_lock(file);
1234            *seqnum = filemgr_get_seqnum(file);
1235            filemgr_mutex_unlock(file);
1236        } else {
1237            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1238        }
1239    }
1240    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
1241    return FDB_RESULT_SUCCESS;
1242}
1243
1244void fdb_kvs_set_seqnum(struct filemgr *file,
1245                           fdb_kvs_id_t id,
1246                           fdb_seqnum_t seqnum)
1247{
1248    struct kvs_header *kv_header = file->kv_header;
1249    struct kvs_node query, *node;
1250    struct avl_node *a;
1251
1252    if (id == 0) {
1253        // default KV instance
1254        filemgr_set_seqnum(file, seqnum);
1255        return;
1256    }
1257
1258    spin_lock(&kv_header->lock);
1259    query.id = id;
1260    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1261    node = _get_entry(a, struct kvs_node, avl_id);
1262    node->seqnum = seqnum;
1263    spin_unlock(&kv_header->lock);
1264}
1265
1266void _fdb_kvs_header_free(struct kvs_header *kv_header)
1267{
1268    struct kvs_node *node;
1269    struct avl_node *a;
1270
1271    a = avl_first(kv_header->idx_name);
1272    while (a) {
1273        node = _get_entry(a, struct kvs_node, avl_name);
1274        a = avl_next(a);
1275        avl_remove(kv_header->idx_name, &node->avl_name);
1276
1277        free(node->kvs_name);
1278        free(node);
1279    }
1280    free(kv_header->idx_name);
1281    free(kv_header->idx_id);
1282    free(kv_header);
1283}
1284
1285void fdb_kvs_header_free(struct filemgr *file)
1286{
1287    if (file->kv_header == NULL) {
1288        return;
1289    }
1290
1291    _fdb_kvs_header_free(file->kv_header);
1292    file->kv_header = NULL;
1293}
1294
1295static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1296                                  const char *kvs_name,
1297                                  fdb_kvs_config *kvs_config)
1298{
1299    int kv_ins_name_len;
1300    fdb_status fs = FDB_RESULT_SUCCESS;
1301    struct avl_node *a;
1302    struct filemgr *file;
1303    struct kvs_node *node, query;
1304    struct kvs_header *kv_header;
1305
1306    if (root_handle->config.multi_kv_instances == false) {
1307        // cannot open KV instance under single DB instance mode
1308        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1309                       "Cannot open or create KV store instance '%s' because multi-KV "
1310                       "store instance mode is disabled.",
1311                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1312    }
1313    if (root_handle->kvs->type != KVS_ROOT) {
1314        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1315                       "Cannot open or create KV store instance '%s' because the handle "
1316                       "doesn't support multi-KV sotre instance mode.",
1317                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1318    }
1319
1320fdb_kvs_create_start:
1321    fdb_check_file_reopen(root_handle, NULL);
1322    filemgr_mutex_lock(root_handle->file);
1323    fdb_sync_db_header(root_handle);
1324
1325    if (filemgr_is_rollback_on(root_handle->file)) {
1326        filemgr_mutex_unlock(root_handle->file);
1327        return FDB_RESULT_FAIL_BY_ROLLBACK;
1328    }
1329
1330    file = root_handle->file;
1331
1332    file_status_t fstatus = filemgr_get_file_status(file);
1333    if (fstatus == FILE_REMOVED_PENDING) {
1334        // we must not write into this file
1335        // file status was changed by other thread .. start over
1336        filemgr_mutex_unlock(file);
1337        goto fdb_kvs_create_start;
1338    }
1339
1340    kv_header = file->kv_header;
1341    spin_lock(&kv_header->lock);
1342
1343    // find existing KV instance
1344    // search by name
1345    query.kvs_name = (char*)kvs_name;
1346    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1347    if (a) { // KV name already exists
1348        spin_unlock(&kv_header->lock);
1349        filemgr_mutex_unlock(file);
1350        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1351                       "Failed to create KV Store '%s' as it already exists.",
1352                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1353    }
1354
1355    // create a kvs_node and insert
1356    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1357    node->id = kv_header->id_counter++;
1358    node->seqnum = 0;
1359    node->flags = 0x0;
1360    _init_op_stats(&node->op_stat);
1361    // search fhandle's custom cmp func list first
1362    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1363                                             (char *)kvs_name);
1364    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1365        // follow kvs_config's custom cmp next
1366        node->custom_cmp = kvs_config->custom_cmp;
1367        // if custom cmp function is given by user but
1368        // there is no corresponding function in fhandle's list
1369        // add it into the list
1370        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1371                                     (char*)kvs_name,
1372                                     kvs_config->custom_cmp);
1373    }
1374    if (node->custom_cmp) { // custom cmp function is used
1375        node->flags |= KVS_FLAG_CUSTOM_CMP;
1376        kv_header->custom_cmp_enabled = 1;
1377    }
1378    kv_ins_name_len = strlen(kvs_name)+1;
1379    node->kvs_name = (char *)malloc(kv_ins_name_len);
1380    strcpy(node->kvs_name, kvs_name);
1381
1382    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1383    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1384    ++kv_header->num_kv_stores;
1385    spin_unlock(&kv_header->lock);
1386
1387    // if compaction is in-progress,
1388    // create a same kvs_node for the new file
1389    if (file->new_file &&
1390        filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1391        struct kvs_node *node_new;
1392        struct kvs_header *kv_header_new;
1393
1394        kv_header_new = file->new_file->kv_header;
1395        node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1396        *node_new = *node;
1397        node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1398        strcpy(node_new->kvs_name, kvs_name);
1399
1400        // insert into new file's kv_header
1401        spin_lock(&kv_header_new->lock);
1402        if (node->custom_cmp) {
1403            kv_header_new->custom_cmp_enabled = 1;
1404        }
1405        avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1406        avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1407        spin_unlock(&kv_header_new->lock);
1408    }
1409
1410    // since this function calls filemgr_commit() and appends a new DB header,
1411    // we should finalize & flush the previous dirty update before commit.
1412    bid_t dirty_idtree_root = BLK_NOT_FOUND;
1413    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1414    struct filemgr_dirty_update_node *prev_node = NULL;
1415    struct filemgr_dirty_update_node *new_node = NULL;
1416
1417    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
1418                            &dirty_idtree_root, &dirty_seqtree_root, false);
1419
1420    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
1421                               &dirty_idtree_root, &dirty_seqtree_root, true);
1422
1423    // append system doc
1424    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
1425
1426    // if no compaction is being performed, append header and commit
1427    if (root_handle->file == file) {
1428        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
1429        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
1430        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
1431        fs = filemgr_commit_bid(root_handle->file,
1432                                root_handle->last_hdr_bid,
1433                                cur_bmp_revnum,
1434                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
1435                                &root_handle->log_callback);
1436        btreeblk_reset_subblock_info(root_handle->bhandle);
1437    }
1438
1439    filemgr_mutex_unlock(file);
1440
1441    return fs;
1442}
1443
1444// this function just returns pointer
1445char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1446{
1447    struct kvs_node *node, query;
1448    struct avl_node *a;
1449
1450    if (handle->kvs == NULL) {
1451        // single KV instance mode
1452        return NULL;
1453    }
1454
1455    query.id = handle->kvs->id;
1456    if (query.id == 0) { // default KV instance
1457        return NULL;
1458    }
1459    spin_lock(&file->kv_header->lock);
1460    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1461    if (a) {
1462        node = _get_entry(a, struct kvs_node, avl_id);
1463        spin_unlock(&file->kv_header->lock);
1464        return node->kvs_name;
1465    }
1466    spin_unlock(&file->kv_header->lock);
1467    return NULL;
1468}
1469
1470// this function just returns pointer to kvs_name & offset to user key
1471const char* _fdb_kvs_extract_name_off(fdb_kvs_handle *handle, void *keybuf,
1472                                      size_t *key_offset)
1473{
1474    struct kvs_node *node, query;
1475    struct avl_node *a;
1476    fdb_kvs_id_t kv_id;
1477    struct filemgr *file = handle->file;
1478
1479    if (!handle->kvs) { // single KV instance mode
1480        *key_offset = 0;
1481        return DEFAULT_KVS_NAME;
1482    }
1483
1484    *key_offset = handle->config.chunksize;
1485    buf2kvid(*key_offset, keybuf, &kv_id);
1486    query.id = kv_id;
1487    if (query.id == 0) { // default KV instance in multi kvs mode
1488        return default_kvs_name;
1489    }
1490    spin_lock(&file->kv_header->lock);
1491    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1492    if (a) {
1493        node = _get_entry(a, struct kvs_node, avl_id);
1494        const char *kvs_name = node->kvs_name;
1495        spin_unlock(&file->kv_header->lock);
1496        return kvs_name;
1497    }
1498    spin_unlock(&file->kv_header->lock);
1499    return NULL;
1500}
1501
1502fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1503                                   fdb_kvs_handle *handle_out)
1504{
1505    fdb_status fs;
1506    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1507
1508    if (!handle_out->kvs) {
1509        // create kvs_info
1510        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1511        handle_out->kvs->type = handle_in->kvs->type;
1512        handle_out->kvs->id = handle_in->kvs->id;
1513        handle_out->kvs->root = root_handle;
1514        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1515
1516        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1517            calloc(1, sizeof(struct kvs_opened_node));
1518        opened_node->handle = handle_out;
1519        handle_out->node = opened_node;
1520
1521        spin_lock(&root_handle->fhandle->lock);
1522        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1523        spin_unlock(&root_handle->fhandle->lock);
1524    }
1525
1526    fs = _fdb_clone_snapshot(handle_in, handle_out);
1527    if (fs != FDB_RESULT_SUCCESS) {
1528        if (handle_out->node) {
1529            spin_lock(&root_handle->fhandle->lock);
1530            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1531            spin_unlock(&root_handle->fhandle->lock);
1532            free(handle_out->node);
1533        }
1534        free(handle_out->kvs);
1535    }
1536    return fs;
1537}
1538
1539// 1) allocate memory & create 'handle->kvs'
1540//    by calling fdb_kvs_info_create().
1541//      -> this will allocate a corresponding node and
1542//         insert it into fhandle->handles list.
1543// 2) if matching KVS name doesn't exist, create it.
1544// 3) call _fdb_open().
1545fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1546                         fdb_config *config,
1547                         fdb_kvs_config *kvs_config,
1548                         struct filemgr *file,
1549                         const char *filename,
1550                         const char *kvs_name,
1551                         fdb_kvs_handle *handle)
1552{
1553    fdb_status fs;
1554
1555    if (handle->kvs == NULL) {
1556        // create kvs_info
1557        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1558    }
1559
1560    if (handle->kvs == NULL) {
1561        // KV instance name is not found
1562        if (!kvs_config->create_if_missing) {
1563            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1564                           "Failed to open KV store '%s' because it doesn't exist.",
1565                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1566        }
1567        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1568            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1569                           "Failed to create KV store '%s' because the KV store's handle "
1570                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1571        }
1572
1573        // create
1574        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1575        if (fs != FDB_RESULT_SUCCESS) { // create fail
1576            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1577        }
1578        // create kvs_info again
1579        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1580        if (handle->kvs == NULL) { // fail again
1581            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1582                           "Failed to create KV store '%s' because the KV store's handle "
1583                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1584        }
1585    }
1586    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1587    if (fs != FDB_RESULT_SUCCESS) {
1588        if (handle->node) {
1589            spin_lock(&root_handle->fhandle->lock);
1590            list_remove(root_handle->fhandle->handles, &handle->node->le);
1591            spin_unlock(&root_handle->fhandle->lock);
1592            free(handle->node);
1593        } // 'handle->node == NULL' happens only during rollback
1594        free(handle->kvs);
1595    }
1596    return fs;
1597}
1598
1599// 1) identify whether the requested KVS is default or non-default.
1600// 2) if the requested KVS is default,
1601//   2-1) if no KVS handle is opened yet from this fhandle,
1602//        -> return the root handle.
1603//   2-2) if the root handle is already opened,
1604//        -> allocate memory for handle, and call _fdb_open().
1605//        -> 'handle->kvs' will be created in _fdb_open(),
1606//           since it is treated as a default handle.
1607//        -> allocate a corresponding node and insert it into
1608//           fhandle->handles list.
1609// 3) if the requested KVS is non-default,
1610//    -> allocate memory for handle, and call _fdb_kvs_open().
1611LIBFDB_API
1612fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1613                        fdb_kvs_handle **ptr_handle,
1614                        const char *kvs_name,
1615                        fdb_kvs_config *kvs_config)
1616{
1617    fdb_kvs_handle *handle;
1618    fdb_config config;
1619    fdb_status fs;
1620    fdb_kvs_handle *root_handle;
1621    fdb_kvs_config config_local;
1622    struct filemgr *file = NULL;
1623    struct filemgr *latest_file = NULL;
1624    LATENCY_STAT_START();
1625
1626    if (!fhandle || !fhandle->root) {
1627        return FDB_RESULT_INVALID_HANDLE;
1628    }
1629
1630    root_handle = fhandle->root;
1631    config = root_handle->config;
1632
1633    if (kvs_config) {
1634        if (validate_fdb_kvs_config(kvs_config)) {
1635            config_local = *kvs_config;
1636        } else {
1637            return FDB_RESULT_INVALID_CONFIG;
1638        }
1639    } else {
1640        config_local = get_default_kvs_config();
1641    }
1642
1643    fdb_check_file_reopen(root_handle, NULL);
1644    fdb_sync_db_header(root_handle);
1645
1646    file = root_handle->file;
1647    latest_file = root_handle->file;
1648
1649    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1650        // return the default KV store handle
1651        spin_lock(&fhandle->lock);
1652        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1653            // the root handle is not opened yet
1654            // just return the root handle
1655            fdb_custom_cmp_variable default_kvs_cmp;
1656
1657            root_handle->kvs_config = config_local;
1658
1659            if (root_handle->file->kv_header) {
1660                // search fhandle's custom cmp func list first
1661                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1662
1663                spin_lock(&root_handle->file->kv_header->lock);
1664                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1665
1666                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1667                    root_handle->kvs_config.custom_cmp) {
1668                    // follow kvs_config's custom cmp next
1669                    root_handle->file->kv_header->default_kvs_cmp =
1670                        root_handle->kvs_config.custom_cmp;
1671                    fdb_file_handle_add_cmp_func(fhandle, NULL,
1672                                                 root_handle->kvs_config.custom_cmp);
1673                }
1674
1675                if (root_handle->file->kv_header->default_kvs_cmp) {
1676                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1677                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1678                }
1679                spin_unlock(&root_handle->file->kv_header->lock);
1680            }
1681
1682            *ptr_handle = root_handle;
1683            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1684            fhandle->flags |= FHANDLE_ROOT_OPENED;
1685            fs = FDB_RESULT_SUCCESS;
1686            spin_unlock(&fhandle->lock);
1687
1688        } else {
1689            // the root handle is already opened
1690            // open new default KV store handle
1691            spin_unlock(&fhandle->lock);
1692            handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1693            handle->kvs_config = config_local;
1694            atomic_init_uint8_t(&handle->handle_busy, 0);
1695
1696            if (root_handle->file->kv_header) {
1697                spin_lock(&root_handle->file->kv_header->lock);
1698                handle->kvs_config.custom_cmp =
1699                    root_handle->file->kv_header->default_kvs_cmp;
1700                spin_unlock(&root_handle->file->kv_header->lock);
1701            }
1702
1703            handle->fhandle = fhandle;
1704            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1705            if (fs != FDB_RESULT_SUCCESS) {
1706                free(handle);
1707                *ptr_handle = NULL;
1708            } else {
1709                // insert into fhandle's list
1710                struct kvs_opened_node *node;
1711                node = (struct kvs_opened_node *)
1712                       calloc(1, sizeof(struct kvs_opened_node));
1713                node->handle = handle;
1714                spin_lock(&fhandle->lock);
1715                list_push_front(fhandle->handles, &node->le);
1716                spin_unlock(&fhandle->lock);
1717
1718                handle->node = node;
1719                *ptr_handle = handle;
1720            }
1721        }
1722        LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1723        return fs;
1724    }
1725
1726    if (config.multi_kv_instances == false) {
1727        // cannot open KV instance under single DB instance mode
1728        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1729                       "Cannot open KV store instance '%s' because multi-KV "
1730                       "store instance mode is disabled.",
1731                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1732    }
1733    if (root_handle->kvs->type != KVS_ROOT) {
1734        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1735                       "Cannot open KV store instance '%s' because the handle "
1736                       "doesn't support multi-KV sotre instance mode.",
1737                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1738    }
1739    if (root_handle->shandle) {
1740        // cannot open KV instance from a snapshot
1741        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1742                       "Not allowed to open KV store instance '%s' from the "
1743                       "snapshot handle.",
1744                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1745    }
1746
1747    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1748    if (!handle) { // LCOV_EXCL_START
1749        return FDB_RESULT_ALLOC_FAIL;
1750    } // LCOV_EXCL_STOP
1751
1752    atomic_init_uint8_t(&handle->handle_busy, 0);
1753    handle->fhandle = fhandle;
1754    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1755                       latest_file, file->filename, kvs_name, handle);
1756    if (fs == FDB_RESULT_SUCCESS) {
1757        *ptr_handle = handle;
1758    } else {
1759        *ptr_handle = NULL;
1760        free(handle);
1761    }
1762    LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1763    return fs;
1764}
1765
1766LIBFDB_API
1767fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1768                                fdb_kvs_handle **ptr_handle,
1769                                fdb_kvs_config *config)
1770{
1771    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1772}
1773
1774// 1) remove corresponding node from fhandle->handles list.
1775// 2) call _fdb_close().
1776static fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1777{
1778    fdb_kvs_handle *root_handle = handle->kvs->root;
1779    fdb_status fs;
1780
1781    if (handle->node) {
1782        spin_lock(&root_handle->fhandle->lock);
1783        list_remove(root_handle->fhandle->handles, &handle->node->le);
1784        spin_unlock(&root_handle->fhandle->lock);
1785        free(handle->node);
1786    } // 'handle->node == NULL' happens only during rollback
1787
1788    fs = _fdb_close(handle);
1789    return fs;
1790}
1791
1792// close all sub-KV store handles belonging to the root handle
1793fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1794{
1795    fdb_status fs;
1796    struct list_elem *e;
1797    struct kvs_opened_node *node;
1798
1799    spin_lock(&root_handle->fhandle->lock);
1800    e = list_begin(root_handle->fhandle->handles);
1801    while (e) {
1802        node = _get_entry(e, struct kvs_opened_node, le);
1803        e = list_remove(root_handle->fhandle->handles, &node->le);
1804        fs = _fdb_close(node->handle);
1805        if (fs != FDB_RESULT_SUCCESS) {
1806            spin_unlock(&root_handle->fhandle->lock);
1807            return fs;
1808        }
1809        fdb_kvs_info_free(node->handle);
1810        free(node->handle);
1811        free(node);
1812    }
1813    spin_unlock(&root_handle->fhandle->lock);
1814
1815    return FDB_RESULT_SUCCESS;
1816}
1817
1818// 1) identify whether the requested handle is for default KVS or not.
1819// 2) if the requested handle is for the default KVS,
1820//   2-1) if the requested handle is the root handle,
1821//        -> just clear the OPENED flag.
1822//   2-2) if the requested handle is not the root handle,
1823//        -> call _fdb_close(),
1824//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1825//        -> remove the corresponding node from fhandle->handles list,
1826//        -> free the memory for the handle.
1827// 3) if the requested handle is for non-default KVS,
1828//    -> call _fdb_kvs_close(),
1829//       -> this will remove the node from fhandle->handles list.
1830//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1831//    -> free the memory for the handle.
1832LIBFDB_API
1833fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1834{
1835    fdb_status fs;
1836
1837    if (!handle) {
1838        return FDB_RESULT_INVALID_HANDLE;
1839    }
1840    if (handle->num_iterators) {
1841        // There are still active iterators created from this handle
1842        return FDB_RESULT_KV_STORE_BUSY;
1843    }
1844
1845    if (handle->shandle && handle->kvs == NULL) {
1846        // snapshot of the default KV store + single KV store mode
1847        // directly close handle
1848        // (snapshot of the other KV stores will be closed
1849        //  using _fdb_kvs_close(...) below)
1850        fs = _fdb_close(handle);
1851        if (fs == FDB_RESULT_SUCCESS) {
1852            free(handle);
1853        }
1854        return fs;
1855    }
1856
1857    if (handle->kvs == NULL ||
1858        handle->kvs->type == KVS_ROOT) {
1859        // the default KV store handle
1860
1861        if (handle->fhandle->root == handle) {
1862            // do nothing for root handle
1863            // the root handle will be closed with fdb_close() API call.
1864            spin_lock(&handle->fhandle->lock);
1865            handle->fhandle->flags &= ~FHANDLE_ROOT_OPENED; // remove flag
1866            spin_unlock(&handle->fhandle->lock);
1867            return FDB_RESULT_SUCCESS;
1868
1869        } else {
1870            // the default KV store but not the root handle .. normally close
1871            spin_lock(&handle->fhandle->lock);
1872            fs = _fdb_close(handle);
1873            if (fs == FDB_RESULT_SUCCESS) {
1874                // remove from 'handles' list in the root node
1875                if (handle->kvs) {
1876                    fdb_kvs_info_free(handle);
1877                }
1878                list_remove(handle->fhandle->handles, &handle->node->le);
1879                spin_unlock(&handle->fhandle->lock);
1880                free(handle->node);
1881                free(handle);
1882            } else {
1883                spin_unlock(&handle->fhandle->lock);
1884            }
1885            return fs;
1886        }
1887    }
1888
1889    if (handle->kvs && handle->kvs->root == NULL) {
1890        return FDB_RESULT_INVALID_ARGS;
1891    }
1892    fs = _fdb_kvs_close(handle);
1893    if (fs == FDB_RESULT_SUCCESS) {
1894        fdb_kvs_info_free(handle);
1895        free(handle);
1896    }
1897    return fs;
1898}
1899
// Removes the KV store 'kvs_name' from the file behind 'fhandle', or resets
// it in place when 'rollback_recreate' is true.
//
//  fhandle           : file handle; it and its root handle must be non-NULL.
//  kvs_name          : KV store name; NULL or the default KV store name
//                      selects the default KV store (KV ID 0).
//  rollback_recreate : false -> regular removal. Fails with KV_STORE_BUSY if
//                               a handle on the KV store is still opened; a
//                               named KV store's node is dropped from the KV
//                               header indexes and freed.
//                      true  -> (passed by fdb_kvs_rollback() for seqnum 0)
//                               the file-reopen/header-sync and opened-handle
//                               checks are skipped; a named KV store's node
//                               is kept, with its stats and seqnum zeroed.
//
// In both modes the KV store's WAL entries are discarded, its KV ID prefix is
// removed from the root handle's HB+trie (and seq-trie when enabled), the KV
// header is re-appended, and -- if the root handle still refers to the same
// file -- a new DB header is committed.
//
// Returns the status of filemgr_commit_bid() (FDB_RESULT_SUCCESS if no commit
// was attempted), or one of FDB_RESULT_INVALID_HANDLE, INVALID_CONFIG,
// FAIL_BY_ROLLBACK, FAIL_BY_COMPACTION, KV_STORE_BUSY, KV_STORE_NOT_FOUND for
// the early exits below.
static
fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
                           const char *kvs_name,
                           bool rollback_recreate)
{
    size_t size_chunk, size_id;
    uint8_t *_kv_id;
    fdb_status fs = FDB_RESULT_SUCCESS;
    fdb_kvs_id_t kv_id = 0;
    fdb_kvs_handle *root_handle;
    struct avl_node *a = NULL;
    struct filemgr *file;
    struct kvs_node *node, query;
    struct kvs_header *kv_header;

    if (!fhandle || !fhandle->root) {
        return FDB_RESULT_INVALID_HANDLE;
    }

    root_handle = fhandle->root;

    if (root_handle->config.multi_kv_instances == false) {
        // cannot remove the KV instance under single DB instance mode
        return FDB_RESULT_INVALID_CONFIG;
    }
    if (root_handle->kvs->type != KVS_ROOT) {
        return FDB_RESULT_INVALID_HANDLE;
    }

    // Retry point: we jump back here (with the file mutex released) when the
    // file turns out to be in FILE_REMOVED_PENDING state.
fdb_kvs_remove_start:
    if (!rollback_recreate) {
        fdb_check_file_reopen(root_handle, NULL);
        filemgr_mutex_lock(root_handle->file);
        fdb_sync_db_header(root_handle);

        // a rollback is in progress on this file .. refuse the removal
        if (filemgr_is_rollback_on(root_handle->file)) {
            filemgr_mutex_unlock(root_handle->file);
            return FDB_RESULT_FAIL_BY_ROLLBACK;
        }
    } else {
        // called from rollback: only take the file mutex
        filemgr_mutex_lock(root_handle->file);
    }

    // from here on 'file' is the file whose mutex we hold
    file = root_handle->file;

    file_status_t fstatus = filemgr_get_file_status(file);
    if (fstatus == FILE_REMOVED_PENDING) {
        // we must not write into this file
        // file status was changed by other thread .. start over
        filemgr_mutex_unlock(file);
        goto fdb_kvs_remove_start;
    } else if (fstatus == FILE_COMPACT_OLD) {
        // Cannot remove existing KV store during compaction.
        // To remove a KV store, the corresponding first chunk in HB+trie
        // should be unlinked. This can be possible in the old file during
        // compaction, but impossible in the new file, since existing documents
        // (including docs belonging to the KV store to be removed) are being moved.
        filemgr_mutex_unlock(file);
        return FDB_RESULT_FAIL_BY_COMPACTION;
    }

    // find the kvs_node and remove

    // search by name to get ID
    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
        // default KV store: it has no kvs_node, its stats and seqnum live in
        // the file header
        if (!rollback_recreate) {
            // default KV store .. KV ID = 0
            kv_id = 0;
            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
                // there is an opened handle
                filemgr_mutex_unlock(file);
                return FDB_RESULT_KV_STORE_BUSY;
            }
        }
        // reset KVS stats (excepting for WAL stats)
        file->header.stat.ndocs = 0;
        file->header.stat.nlivenodes = 0;
        file->header.stat.datasize = 0;
        file->header.stat.deltasize = 0;

        // reset seqnum
        filemgr_set_seqnum(file, 0);
    } else {
        // named KV store: look its node up in the KV header's name index
        kv_header = file->kv_header;
        spin_lock(&kv_header->lock);
        query.kvs_name = (char*)kvs_name;
        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
        if (a == NULL) { // KV name doesn't exist
            spin_unlock(&kv_header->lock);
            filemgr_mutex_unlock(file);
            return FDB_RESULT_KV_STORE_NOT_FOUND;
        }
        node = _get_entry(a, struct kvs_node, avl_name);
        kv_id = node->id;

        if (!rollback_recreate) {
            // the KV header lock is released around the opened-handle check
            // and re-acquired afterwards; the file mutex stays held throughout
            spin_unlock(&kv_header->lock);
            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
                // there is an opened handle
                filemgr_mutex_unlock(file);
                return FDB_RESULT_KV_STORE_BUSY;
            }
            spin_lock(&kv_header->lock);

            // unlink the node from both indexes (by name and by ID)
            avl_remove(kv_header->idx_name, &node->avl_name);
            avl_remove(kv_header->idx_id, &node->avl_id);
            --kv_header->num_kv_stores;
            spin_unlock(&kv_header->lock);

            // (same value as already stored in kv_id above)
            kv_id = node->id;

            // free node
            free(node->kvs_name);
            free(node);
        } else {
            // reset all stats except for WAL
            node->stat.ndocs = 0;
            node->stat.nlivenodes = 0;
            node->stat.datasize = 0;
            node->stat.deltasize = 0;
            node->seqnum = 0;
            spin_unlock(&kv_header->lock);
        }
    }

    // discard all WAL entries
    wal_close_kv_ins(file, kv_id, &root_handle->log_callback);

    bid_t dirty_idtree_root = BLK_NOT_FOUND;
    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
    struct filemgr_dirty_update_node *prev_node = NULL, *new_node = NULL;

    // the trie modifications below are bracketed by
    // _fdb_dirty_update_ready() / _fdb_dirty_update_finalize()
    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
                            &dirty_idtree_root, &dirty_seqtree_root, false);

    size_id = sizeof(fdb_kvs_id_t);
    size_chunk = root_handle->trie->chunksize;

    // remove from super handle's HB+trie
    // (the key is the KV ID encoded into one chunk-sized buffer)
    _kv_id = alca(uint8_t, size_chunk);
    kvid2buf(size_chunk, kv_id, _kv_id);
    hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
    btreeblk_end(root_handle->bhandle);

    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
        // same for the seq-trie, where the KV ID is encoded in
        // sizeof(fdb_kvs_id_t) bytes
        _kv_id = alca(uint8_t, size_id);
        kvid2buf(size_id, kv_id, _kv_id);
        hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
        btreeblk_end(root_handle->bhandle);
    }

    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
                               &dirty_idtree_root, &dirty_seqtree_root, true);

    // append system doc
    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);

    // if no compaction is being performed, append header and commit
    if (root_handle->file == file) {
        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
        // the 4th argument is true unless FDB_DRB_ASYNC durability is set
        fs = filemgr_commit_bid(root_handle->file,
                                root_handle->last_hdr_bid,
                                cur_bmp_revnum,
                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
                                &root_handle->log_callback);
        btreeblk_reset_subblock_info(root_handle->bhandle);
    }

    filemgr_mutex_unlock(file);

    return fs;
}
2074
2075bool _fdb_kvs_is_busy(fdb_file_handle *fhandle)
2076{
2077    bool ret = false;
2078    struct filemgr *file = fhandle->root->file;
2079    struct avl_node *a;
2080    struct filemgr_fhandle_idx_node *fhandle_node;
2081    fdb_file_handle *file_handle;
2082
2083    spin_lock(&file->fhandle_idx_lock);
2084    a = avl_first(&file->fhandle_idx);
2085    while (a) {
2086        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2087        a = avl_next(a);
2088        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
2089        spin_lock(&file_handle->lock);
2090        if (list_begin(file_handle->handles) != NULL) {
2091            ret = true;
2092            spin_unlock(&file_handle->lock);
2093            break;
2094        }
2095        spin_unlock(&file_handle->lock);
2096    }
2097    spin_unlock(&file->fhandle_idx_lock);
2098
2099    return ret;
2100}
2101
// Rolls the KV store behind '*handle_ptr' back to sequence number 'seqnum'.
//
// Outline of what the code below does:
//  1) validate the arguments (non-NULL pointer and handle, handle has KVS
//     info, file not opened read-only);
//  2) turn the file's rollback flag on, fail if a WAL transaction exists,
//     and wait while the file is in FILE_COMPACT_OLD state;
//  3) 'seqnum' greater than the handle's current seqnum
//     -> FDB_RESULT_NO_DB_INSTANCE;
//  4) 'seqnum' == 0 -> reset the KV store through _fdb_kvs_remove() in
//     'rollback_recreate' mode;
//  5) otherwise open a temporary handle limited to 'seqnum', copy that
//     handle's root BIDs for this KV ID into the super handle's HB+trie
//     (and seq-trie if enabled), set the KV store's seqnum and commit through
//     the super handle. If the commit fails, the previous seqnum is restored
//     in the file.
//
// The rollback flag is cleared again on every exit path after it was set.
// '*handle_ptr' is only ever (re)assigned the handle that was passed in.
//
// Returns FDB_RESULT_SUCCESS, or the failing status of one of the steps above.
fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
{
    fdb_config config;
    fdb_kvs_config kvs_config;
    fdb_kvs_handle *handle_in, *handle, *super_handle;
    fdb_status fs;
    fdb_seqnum_t old_seqnum;
    fdb_file_handle *fhandle;
    char *kvs_name;

    if (!handle_ptr) {
        return FDB_RESULT_INVALID_HANDLE;
    }

    handle_in = *handle_ptr;

    if (!handle_in) {
        return FDB_RESULT_INVALID_HANDLE;
    }

    if (!handle_in->kvs) {
        return FDB_RESULT_INVALID_ARGS;
    }
    // local copies of the incoming handle's settings; they are used to open
    // the temporary handle further below
    super_handle = handle_in->kvs->root;
    fhandle = handle_in->fhandle;
    config = handle_in->config;
    kvs_config = handle_in->kvs_config;

    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
        return fdb_log(&handle_in->log_callback,
                       FDB_RESULT_RONLY_VIOLATION,
                       "Warning: Rollback is not allowed on "
                       "the read-only DB file '%s'.",
                       handle_in->file->filename);
    }

    filemgr_mutex_lock(handle_in->file);
    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
    // All transactions should be closed before rollback
    if (wal_txn_exists(handle_in->file)) {
        filemgr_set_rollback(handle_in->file, 0);
        filemgr_mutex_unlock(handle_in->file);
        return FDB_RESULT_FAIL_BY_TRANSACTION;
    }

    // If compaction is running, wait until it is aborted.
    // TODO: Find a better way of waiting for the compaction abortion.
    // (the file mutex is released while sleeping and re-taken before the
    //  status is read again)
    unsigned int sleep_time = 10000; // 10 ms.
    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
    while (fstatus == FILE_COMPACT_OLD) {
        filemgr_mutex_unlock(handle_in->file);
        decaying_usleep(&sleep_time, 1000000);
        filemgr_mutex_lock(handle_in->file);
        fstatus = filemgr_get_file_status(handle_in->file);
    }
    if (fstatus == FILE_REMOVED_PENDING) {
        // release the mutex first, then let the handle check for a reopen
        filemgr_mutex_unlock(handle_in->file);
        fdb_check_file_reopen(handle_in, NULL);
    } else {
        filemgr_mutex_unlock(handle_in->file);
    }

    fdb_sync_db_header(handle_in);

    // if the max sequence number seen by this handle is lower than the
    // requested snapshot marker, it means the snapshot is not yet visible
    // even via the current fdb_kvs_handle
    // NOTE(review): this path (and the seqnum == 0 path below) clears the
    // flag through super_handle->file, whereas it was set through
    // handle_in->file -- presumably the same file; confirm.
    if (seqnum > handle_in->seqnum) {
        filemgr_set_rollback(super_handle->file, 0); // allow mutations
        return FDB_RESULT_NO_DB_INSTANCE;
    }

    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
    if (seqnum == 0) { // Handle special case of rollback to zero..
        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
        filemgr_set_rollback(super_handle->file, 0); // allow mutations
        return fs;
    }

    // temporary handle that will be opened at the requested sequence number
    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
    if (!handle) { // LCOV_EXCL_START
        filemgr_set_rollback(handle_in->file, 0); // allow mutations
        return FDB_RESULT_ALLOC_FAIL;
    } // LCOV_EXCL_STOP

    handle->max_seqnum = seqnum;
    handle->log_callback = handle_in->log_callback;
    handle->fhandle = fhandle;
    atomic_init_uint8_t(&handle->handle_busy, 0);

    // sub KV stores are opened through the root handle; any other handle
    // type is opened directly on the file
    if (handle_in->kvs->type == KVS_SUB) {
        fs = _fdb_kvs_open(handle_in->kvs->root,
                           &config,
                           &kvs_config,
                           handle_in->file,
                           handle_in->file->filename,
                           kvs_name,
                           handle);
    } else {
        fs = _fdb_open(handle, handle_in->file->filename,
                       FDB_AFILENAME, &config);
    }
    filemgr_set_rollback(handle_in->file, 0); // allow mutations

    if (fs == FDB_RESULT_SUCCESS) {
        // get KV instance's sub B+trees' root node BIDs
        // from both ID-tree and Seq-tree, AND
        // replace current handle's sub B+trees' root node BIDs
        // by old BIDs
        size_t size_chunk, size_id;
        bid_t id_root, seq_root, dummy;
        uint8_t *_kv_id;
        hbtrie_result hr;

        size_chunk = handle->trie->chunksize;
        size_id = sizeof(fdb_kvs_id_t);

        filemgr_mutex_lock(handle_in->file);

        // read root BID of the KV instance from the old handle
        // and overwrite into the current handle
        _kv_id = alca(uint8_t, size_chunk);
        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
        hr = hbtrie_find_partial(handle->trie, _kv_id,
                                 size_chunk, &id_root);
        btreeblk_end(handle->bhandle);
        if (hr == HBTRIE_RESULT_SUCCESS) {
            hbtrie_insert_partial(super_handle->trie,
                                  _kv_id, size_chunk,
                                  &id_root, &dummy);
        } else { // No Trie info in rollback header.
                 // Erase kv store from super handle's main index.
            hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
        }
        btreeblk_end(super_handle->bhandle);

        if (config.seqtree_opt == FDB_SEQTREE_USE) {
            // same as above for seq-trie
            _kv_id = alca(uint8_t, size_id);
            kvid2buf(size_id, handle->kvs->id, _kv_id);
            hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
                                     size_id, &seq_root);
            btreeblk_end(handle->bhandle);
            if (hr == HBTRIE_RESULT_SUCCESS) {
                hbtrie_insert_partial(super_handle->seqtrie,
                                      _kv_id, size_id,
                                      &seq_root, &dummy);
            } else { // No seqtrie info in rollback header.
                     // Erase kv store from super handle's seqtrie index.
                hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
            }
            btreeblk_end(super_handle->bhandle);
        }

        // remember the current seqnum so that it can be restored if the
        // commit below fails
        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
                                        handle_in->kvs->id);
        fdb_kvs_set_seqnum(handle_in->file,
                           handle_in->kvs->id, seqnum);
        handle_in->seqnum = seqnum;
        filemgr_mutex_unlock(handle_in->file);

        super_handle->rollback_revnum = handle->rollback_revnum;
        // the last argument is true unless FDB_DRB_ASYNC durability is set
        fs = _fdb_commit(super_handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
                         !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
        if (fs == FDB_RESULT_SUCCESS) {
            // done with the temporary handle
            _fdb_kvs_close(handle);
            *handle_ptr = handle_in;
            fdb_kvs_info_free(handle);
            free(handle);
        } else {
            // cancel the rolling-back of the sequence number
            fdb_log(&handle_in->log_callback, fs,
                    "Rollback failed due to a commit failure with a sequence "
                    "number %" _F64, seqnum);
            filemgr_mutex_lock(handle_in->file);
            fdb_kvs_set_seqnum(handle_in->file,
                               handle_in->kvs->id, old_seqnum);
            filemgr_mutex_unlock(handle_in->file);
            _fdb_kvs_close(handle);
            fdb_kvs_info_free(handle);
            free(handle);
        }
    } else {
        // opening the temporary handle failed .. only its memory is released
        free(handle);
    }

    return fs;
}
2290
2291LIBFDB_API
2292fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2293                          const char *kvs_name)
2294{
2295    return _fdb_kvs_remove(fhandle, kvs_name, false);
2296}
2297
2298LIBFDB_API
2299fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2300{
2301    uint64_t ndocs;
2302    uint64_t ndeletes;
2303    uint64_t wal_docs;
2304    uint64_t wal_deletes;
2305    uint64_t wal_n_inserts;
2306    uint64_t datasize;
2307    uint64_t nlivenodes;
2308    fdb_kvs_id_t kv_id;
2309    struct avl_node *a;
2310    struct filemgr *file;
2311    struct kvs_node *node, query;
2312    struct kvs_header *kv_header;
2313    struct kvs_stat stat;
2314
2315    if (!handle) {
2316        return FDB_RESULT_INVALID_HANDLE;
2317    }
2318
2319    if (!info) {
2320        return FDB_RESULT_INVALID_ARGS;
2321    }
2322
2323    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2324        return FDB_RESULT_HANDLE_BUSY;
2325    }
2326
2327    if (!handle->shandle) { // snapshot handle should be immutable
2328        fdb_check_file_reopen(handle, NULL);
2329        fdb_sync_db_header(handle);
2330    }
2331
2332    file = handle->file;
2333
2334    if (handle->kvs == NULL) {
2335        info->name = default_kvs_name;
2336        kv_id = 0;
2337
2338    } else {
2339        kv_header = file->kv_header;
2340        kv_id = handle->kvs->id;
2341        spin_lock(&kv_header->lock);
2342
2343        query.id = handle->kvs->id;
2344        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2345        if (a) { // sub handle
2346            node = _get_entry(a, struct kvs_node, avl_id);
2347            info->name = (const char*)node->kvs_name;
2348        } else { // root handle
2349            info->name = default_kvs_name;
2350        }
2351        spin_unlock(&kv_header->lock);
2352    }
2353
2354    if (handle->shandle) {
2355        // snapshot .. get its local stats
2356        snap_get_stat(handle->shandle, &stat);
2357    } else {
2358        _kvs_stat_get(file, kv_id, &stat);
2359    }
2360    ndocs = stat.ndocs;
2361    ndeletes = stat.ndeletes;
2362    wal_docs = stat.wal_ndocs;
2363    wal_deletes = stat.wal_ndeletes;
2364    wal_n_inserts = wal_docs - wal_deletes;
2365
2366    if (ndocs + wal_n_inserts < wal_deletes) {
2367        info->doc_count = 0;
2368    } else {
2369        if (ndocs) { // not accurate since some ndocs may be in wal_n_inserts
2370            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2371        } else { // this is accurate
2372            info->doc_count = wal_n_inserts;
2373        }
2374    }
2375
2376    if (ndeletes) { // not accurate since some ndeletes may be wal_n_deletes
2377        info->deleted_count = ndeletes + wal_deletes;
2378    } else { // this is accurate
2379        info->deleted_count = wal_deletes;
2380    }
2381
2382    datasize = stat.datasize;
2383    nlivenodes = stat.nlivenodes;
2384
2385    info->space_used = datasize;
2386    info->space_used += nlivenodes * handle->config.blocksize;
2387    info->file = handle->fhandle;
2388
2389    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2390
2391    // This is another LIBFDB_API call, so handle is marked as free
2392    // in the line above before making this call
2393    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2394
2395    return FDB_RESULT_SUCCESS;
2396}
2397
2398LIBFDB_API
2399fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2400{
2401    fdb_kvs_id_t kv_id;
2402    struct filemgr *file;
2403    struct kvs_ops_stat stat;
2404    struct kvs_ops_stat root_stat;
2405
2406    if (!handle) {
2407        return FDB_RESULT_INVALID_HANDLE;
2408    }
2409
2410    if (!info) {
2411        return FDB_RESULT_INVALID_ARGS;
2412    }
2413
2414    fdb_kvs_handle *root_handle = handle->fhandle->root;
2415
2416    // for snapshot handle do not reopen new file as user is interested in
2417    // reader stats from the old file
2418    if (!handle->shandle) {
2419        // always get stats from the latest file
2420        fdb_check_file_reopen(handle, NULL);
2421        fdb_sync_db_header(handle);
2422    }
2423
2424    file = handle->file;
2425
2426    if (handle->kvs == NULL) {
2427        kv_id = 0;
2428    } else {
2429        kv_id = handle->kvs->id;
2430    }
2431
2432    _kvs_ops_stat_get(file, kv_id, &stat);
2433
2434    if (root_handle != handle) {
2435        _kvs_ops_stat_get(file, 0, &root_stat);
2436    } else {
2437        root_stat = stat;
2438    }
2439
2440    info->num_sets = atomic_get_uint64_t(&stat.num_sets, std::memory_order_relaxed);
2441    info->num_dels = atomic_get_uint64_t(&stat.num_dels, std::memory_order_relaxed);
2442    info->num_gets = atomic_get_uint64_t(&stat.num_gets, std::memory_order_relaxed);
2443    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2444                                                  std::memory_order_relaxed);
2445    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2446                                                  std::memory_order_relaxed);
2447    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves,
2448                                                   std::memory_order_relaxed);
2449
2450    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits,
2451                                            std::memory_order_relaxed);
2452    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts,
2453                                             std::memory_order_relaxed);
2454    return FDB_RESULT_SUCCESS;
2455}
2456
2457LIBFDB_API
2458fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2459                                 fdb_kvs_name_list *kvs_name_list)
2460{
2461    size_t num, size, offset;
2462    char *ptr;
2463    char **segment;
2464    fdb_kvs_handle *root_handle;
2465    struct kvs_header *kv_header;
2466    struct kvs_node *node;
2467    struct avl_node *a;
2468
2469    if (!fhandle) {
2470        return FDB_RESULT_INVALID_HANDLE;
2471    }
2472
2473    if (!kvs_name_list) {
2474        return FDB_RESULT_INVALID_ARGS;
2475    }
2476
2477    root_handle = fhandle->root;
2478    kv_header = root_handle->file->kv_header;
2479
2480    spin_lock(&kv_header->lock);
2481    // sum all lengths of KVS names first
2482    // (to calculate the size of memory segment to be allocated)
2483    num = 1;
2484    size = strlen(default_kvs_name) + 1;
2485    a = avl_first(kv_header->idx_id);
2486    while (a) {
2487        node = _get_entry(a, struct kvs_node, avl_id);
2488        a = avl_next(&node->avl_id);
2489
2490        num++;
2491        size += strlen(node->kvs_name) + 1;
2492    }
2493    size += num * sizeof(char*);
2494
2495    // allocate memory segment
2496    segment = (char**)calloc(1, size);
2497    kvs_name_list->num_kvs_names = num;
2498    kvs_name_list->kvs_names = segment;
2499
2500    ptr = (char*)segment + num * sizeof(char*);
2501    offset = num = 0;
2502
2503    // copy default KVS name
2504    strcpy(ptr + offset, default_kvs_name);
2505    segment[num] = ptr + offset;
2506    num++;
2507    offset += strlen(default_kvs_name) + 1;
2508
2509    // copy the others
2510    a = avl_first(kv_header->idx_name);
2511    while (a) {
2512        node = _get_entry(a, struct kvs_node, avl_name);
2513        a = avl_next(&node->avl_name);
2514
2515        strcpy(ptr + offset, node->kvs_name);
2516        segment[num] = ptr + offset;
2517
2518        num++;
2519        offset += strlen(node->kvs_name) + 1;
2520    }
2521
2522    spin_unlock(&kv_header->lock);
2523
2524    return FDB_RESULT_SUCCESS;
2525}
2526
2527LIBFDB_API
2528fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2529{
2530    if (!kvs_name_list) {
2531        return FDB_RESULT_INVALID_ARGS;
2532    }
2533
2534    free(kvs_name_list->kvs_names);
2535    kvs_name_list->kvs_names = NULL;
2536    kvs_name_list->num_kvs_names = 0;
2537
2538    return FDB_RESULT_SUCCESS;
2539}
2540
2541stale_header_info fdb_get_smallest_active_header(fdb_kvs_handle *handle)
2542{
2543    uint8_t *hdr_buf = alca(uint8_t, handle->config.blocksize);
2544    size_t i, hdr_len;
2545    uint64_t n_headers;
2546    bid_t hdr_bid, last_wal_bid;
2547    filemgr_header_revnum_t hdr_revnum;
2548    filemgr_header_revnum_t cur_revnum;
2549    filemgr_magic_t magic;
2550    fdb_seqnum_t seqnum;
2551    fdb_file_handle *fhandle = NULL;
2552    stale_header_info ret;
2553    struct avl_node *a;
2554    struct filemgr_fhandle_idx_node *fhandle_node;
2555    struct list_elem *e;
2556    struct kvs_opened_node *item;
2557
2558    ret.revnum = cur_revnum = handle->fhandle->root->cur_header_revnum;
2559    ret.bid = handle->fhandle->root->last_hdr_bid;
2560
2561    spin_lock(&handle->file->fhandle_idx_lock);
2562
2563    // check all opened file handles
2564    a = avl_first(&handle->file->fhandle_idx);
2565    while (a) {
2566        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2567        a = avl_next(a);
2568
2569        fhandle = (fdb_file_handle*)fhandle_node->fhandle;
2570        spin_lock(&fhandle->lock);
2571        // check all opened KVS handles belonging to the file handle
2572        e = list_begin(fhandle->handles);
2573        while (e) {
2574
2575            item = _get_entry(e, struct kvs_opened_node, le);
2576            e = list_next(e);
2577
2578            if (item->handle->cur_header_revnum < ret.revnum) {
2579                ret.revnum = item->handle->cur_header_revnum;
2580                ret.bid = item->handle->last_hdr_bid;
2581            }
2582        }
2583        spin_unlock(&fhandle->lock);
2584    }
2585
2586    spin_unlock(&handle->file->fhandle_idx_lock);
2587
2588    uint64_t num_keeping_headers =
2589        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
2590                            std::memory_order_relaxed);
2591    if (num_keeping_headers) {
2592        // backward scan previous header info to keep more headers
2593
2594        if (ret.bid == handle->last_hdr_bid) {
2595            // header in 'handle->last_hdr_bid' is not written into file yet!
2596            // we should start from the previous header
2597            hdr_bid = atomic_get_uint64_t(&handle->file->header.bid);
2598            hdr_revnum = handle->file->header.revnum;
2599        } else {
2600            hdr_bid = ret.bid;
2601            hdr_revnum = ret.revnum;
2602        }
2603
2604        n_headers= num_keeping_headers;
2605        if (cur_revnum - hdr_revnum < n_headers) {
2606            n_headers = n_headers - (cur_revnum - hdr_revnum);
2607        } else {
2608            n_headers = 0;
2609        }
2610
2611        for (i=0; i<n_headers; ++i) {
2612            hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
2613                         hdr_buf, &hdr_len, &seqnum, &hdr_revnum, NULL,
2614                         &magic, NULL, &handle->log_callback);
2615            if (hdr_len) {
2616                ret.revnum = hdr_revnum;
2617                ret.bid = hdr_bid;
2618            } else {
2619                break;
2620            }
2621        }
2622    }
2623
2624    // although we keep more headers from the oldest active header, we have to
2625    // preserve the last WAL flushing header from the target header for data
2626    // consistency.
2627    uint64_t dummy64;
2628    char *new_filename;
2629
2630    filemgr_fetch_header(handle->file, ret.bid, hdr_buf, &hdr_len, &seqnum,
2631                         &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2632    fdb_fetch_header(magic, hdr_buf, &dummy64, &dummy64, &dummy64, &dummy64,
2633                     &dummy64, &dummy64, &dummy64, &last_wal_bid, &dummy64,
2634                     &dummy64, &new_filename, NULL);
2635
2636    if (last_wal_bid != BLK_NOT_FOUND) {
2637        filemgr_fetch_header(handle->file, last_wal_bid, hdr_buf, &hdr_len, &seqnum,
2638                             &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2639        ret.bid = last_wal_bid;
2640        ret.revnum = hdr_revnum;
2641    } else {
2642        // WAL has not been flushed yet .. we cannot trigger block reusing
2643        ret.bid = BLK_NOT_FOUND;
2644        ret.revnum = 0;
2645    }
2646
2647    return ret;
2648}
2649
2650