xref: /4.6.0/forestdb/src/kv_instance.cc (revision 1d169510)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "version.h"
34#include "staleblock.h"
35
36#include "memleak.h"
37#include "timing.h"
38#include "time_utils.h"
39
40static const char *default_kvs_name = DEFAULT_KVS_NAME;
41
42// list element for opened KV store handles
43// (in-memory data: managed by the file handle)
44struct kvs_opened_node {
45    fdb_kvs_handle *handle;
46    struct list_elem le;
47};
48
49// list element for custom cmp functions in fhandle
50struct cmp_func_node {
51    char *kvs_name;
52    fdb_custom_cmp_variable func;
53    struct list_elem le;
54};
55
56static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
57{
58    struct kvs_node *aa, *bb;
59    aa = _get_entry(a, struct kvs_node, avl_name);
60    bb = _get_entry(b, struct kvs_node, avl_name);
61    return strcmp(aa->kvs_name, bb->kvs_name);
62}
63
64static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
65{
66    struct kvs_node *aa, *bb;
67    aa = _get_entry(a, struct kvs_node, avl_id);
68    bb = _get_entry(b, struct kvs_node, avl_id);
69
70    if (aa->id < bb->id) {
71        return -1;
72    } else if (aa->id > bb->id) {
73        return 1;
74    } else {
75        return 0;
76    }
77}
78
79struct kvs_opened_node *_fdb_kvs_createNLinkKVHandle(fdb_file_handle *fhandle,
80                                                     fdb_kvs_handle *handle)
81{
82    struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
83        calloc(1, sizeof(struct kvs_opened_node));
84    opened_node->handle = handle;
85
86    handle->node = opened_node;
87    spin_lock(&fhandle->lock);
88    list_push_back(fhandle->handles, &opened_node->le);
89    spin_unlock(&fhandle->lock);
90    return opened_node;
91}
92
93static bool _fdb_kvs_any_handle_opened(fdb_file_handle *fhandle,
94                                       fdb_kvs_id_t kv_id)
95{
96    struct filemgr *file = fhandle->root->file;
97    struct avl_node *a;
98    struct list_elem *e;
99    struct filemgr_fhandle_idx_node *fhandle_node;
100    struct kvs_opened_node *opened_node;
101    fdb_file_handle *file_handle;
102
103    spin_lock(&file->fhandle_idx_lock);
104    a = avl_first(&file->fhandle_idx);
105    while (a) {
106        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
107        a = avl_next(a);
108        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
109        spin_lock(&file_handle->lock);
110        e = list_begin(file_handle->handles);
111        while (e) {
112            opened_node = _get_entry(e, struct kvs_opened_node, le);
113            if ((opened_node->handle->kvs && opened_node->handle->kvs->id == kv_id) ||
114                (kv_id == 0 && opened_node->handle->kvs == NULL)) // single KVS mode
115            {
116                // there is an opened handle
117                spin_unlock(&file_handle->lock);
118                spin_unlock(&file->fhandle_idx_lock);
119                return true;
120            }
121            e = list_next(e);
122        }
123        spin_unlock(&file_handle->lock);
124    }
125    spin_unlock(&file->fhandle_idx_lock);
126
127    return false;
128}
129
130void fdb_file_handle_init(fdb_file_handle *fhandle,
131                           fdb_kvs_handle *root)
132{
133    fhandle->root = root;
134    fhandle->flags = 0x0;
135    root->fhandle = fhandle;
136    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
137    fhandle->cmp_func_list = NULL;
138    spin_init(&fhandle->lock);
139}
140
141void fdb_file_handle_close_all(fdb_file_handle *fhandle)
142{
143    struct list_elem *e;
144    struct kvs_opened_node *node;
145
146    spin_lock(&fhandle->lock);
147    e = list_begin(fhandle->handles);
148    while (e) {
149        node = _get_entry(e, struct kvs_opened_node, le);
150        e = list_next(e);
151        _fdb_close(node->handle);
152        free(node->handle);
153        free(node);
154    }
155    spin_unlock(&fhandle->lock);
156}
157
158void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
159                                    size_t n_func,
160                                    char **kvs_names,
161                                    fdb_custom_cmp_variable *functions)
162{
163    uint64_t i;
164    struct cmp_func_node *node;
165
166    if (n_func == 0 || !kvs_names || !functions) {
167        return;
168    }
169
170    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
171    list_init(fhandle->cmp_func_list);
172
173    for (i=0;i<n_func;++i){
174        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
175        if (kvs_names[i]) {
176            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
177            strcpy(node->kvs_name, kvs_names[i]);
178        } else {
179            // NULL .. default KVS
180            node->kvs_name = NULL;
181        }
182        node->func = functions[i];
183        list_push_back(fhandle->cmp_func_list, &node->le);
184    }
185}
186
187// clone all items in cmp_func_list to fhandle->cmp_func_list
188void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
189                                         struct list *cmp_func_list)
190{
191    struct list_elem *e;
192    struct cmp_func_node *src, *dst;
193
194    if (fhandle->cmp_func_list || /* already exist */
195        !cmp_func_list) {
196        return;
197    }
198
199    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
200    list_init(fhandle->cmp_func_list);
201
202    e = list_begin(cmp_func_list);
203    while (e) {
204        src = _get_entry(e, struct cmp_func_node, le);
205        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
206        if (src->kvs_name) {
207            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
208            strcpy(dst->kvs_name, src->kvs_name);
209        } else {
210            dst->kvs_name = NULL; // default KVS
211        }
212        dst->func = src->func;
213        list_push_back(fhandle->cmp_func_list, &dst->le);
214        e = list_next(&src->le);
215    }
216}
217
218void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
219                                  char *kvs_name,
220                                  fdb_custom_cmp_variable cmp_func)
221{
222    struct cmp_func_node *node;
223
224    // create list if not exist
225    if (!fhandle->cmp_func_list) {
226        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
227        list_init(fhandle->cmp_func_list);
228    }
229
230    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
231    if (kvs_name) {
232        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
233        strcpy(node->kvs_name, kvs_name);
234    } else {
235        // default KVS
236        node->kvs_name = NULL;
237    }
238    node->func = cmp_func;
239    list_push_back(fhandle->cmp_func_list, &node->le);
240}
241
242void fdb_cmp_func_list_from_filemgr(struct filemgr *file, struct list *cmp_func_list)
243{
244    if (!file || !file->kv_header || !cmp_func_list) {
245        return;
246    }
247
248    struct cmp_func_node *node;
249
250    spin_lock(&file->kv_header->lock);
251    // Default KV store cmp function
252    if (file->kv_header->default_kvs_cmp) {
253        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
254        node->func = file->kv_header->default_kvs_cmp;
255        node->kvs_name = NULL;
256        list_push_back(cmp_func_list, &node->le);
257    }
258
259    // Rest of KV stores
260    struct kvs_node *kvs_node;
261    struct avl_node *a = avl_first(file->kv_header->idx_name);
262    while (a) {
263        kvs_node = _get_entry(a, struct kvs_node, avl_name);
264        a = avl_next(a);
265        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
266        node->func = kvs_node->custom_cmp;
267        node->kvs_name = (char*)calloc(1, strlen(kvs_node->kvs_name)+1);
268        strcpy(node->kvs_name, kvs_node->kvs_name);
269        list_push_back(cmp_func_list, &node->le);
270    }
271    spin_unlock(&file->kv_header->lock);
272}
273
274void fdb_free_cmp_func_list(struct list *cmp_func_list)
275{
276    if (!cmp_func_list) {
277        return;
278    }
279
280    struct cmp_func_node *cmp_node;
281    struct list_elem *e = list_begin(cmp_func_list);
282    while (e) {
283        cmp_node = _get_entry(e, struct cmp_func_node, le);
284        e = list_remove(cmp_func_list, &cmp_node->le);
285        free(cmp_node->kvs_name);
286        free(cmp_node);
287    }
288}
289
290static void _free_cmp_func_list(fdb_file_handle *fhandle)
291{
292    struct list_elem *e;
293    struct cmp_func_node *cmp_node;
294
295    if (!fhandle->cmp_func_list) {
296        return;
297    }
298
299    e = list_begin(fhandle->cmp_func_list);
300    while (e) {
301        cmp_node = _get_entry(e, struct cmp_func_node, le);
302        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
303
304        free(cmp_node->kvs_name);
305        free(cmp_node);
306    }
307    free(fhandle->cmp_func_list);
308    fhandle->cmp_func_list = NULL;
309}
310
311void fdb_file_handle_free(fdb_file_handle *fhandle)
312{
313    free(fhandle->handles);
314    _free_cmp_func_list(fhandle);
315    spin_destroy(&fhandle->lock);
316    free(fhandle);
317}
318
319fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
320{
321    int ori_flag;
322    fdb_file_handle *fhandle = handle->fhandle;
323    fdb_custom_cmp_variable ori_custom_cmp;
324    struct filemgr *file = handle->file;
325    struct cmp_func_node *cmp_node;
326    struct kvs_node *kvs_node, query;
327    struct list_elem *e;
328    struct avl_node *a;
329
330    spin_lock(&file->kv_header->lock);
331    ori_flag = file->kv_header->custom_cmp_enabled;
332    ori_custom_cmp = file->kv_header->default_kvs_cmp;
333
334    if (fhandle->cmp_func_list) {
335        handle->kvs_config.custom_cmp = NULL;
336
337        e = list_begin(fhandle->cmp_func_list);
338        while (e) {
339            cmp_node = _get_entry(e, struct cmp_func_node, le);
340            if (cmp_node->kvs_name == NULL ||
341                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
342                handle->kvs_config.custom_cmp = cmp_node->func;
343                file->kv_header->default_kvs_cmp = cmp_node->func;
344                file->kv_header->custom_cmp_enabled = 1;
345            } else {
346                // search by name
347                query.kvs_name = cmp_node->kvs_name;
348                a = avl_search(file->kv_header->idx_name,
349                               &query.avl_name,
350                               _kvs_cmp_name);
351                if (a) { // found
352                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
353                    if (!kvs_node->custom_cmp) {
354                        kvs_node->custom_cmp = cmp_node->func;
355                    }
356                    file->kv_header->custom_cmp_enabled = 1;
357                }
358            }
359            e = list_next(&cmp_node->le);
360        }
361    }
362
363    // first check the default KVS
364    // 1. root handle has not been opened yet: don't care
365    // 2. root handle was opened before: must match the flag
366    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
367        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
368            handle->kvs_config.custom_cmp == NULL) {
369            // custom cmp function was assigned before,
370            // but no custom cmp function is assigned
371            file->kv_header->custom_cmp_enabled = ori_flag;
372            file->kv_header->default_kvs_cmp = ori_custom_cmp;
373            spin_unlock(&file->kv_header->lock);
374            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
375            if (!kvs_name) {
376                kvs_name = DEFAULT_KVS_NAME;
377            }
378            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
379                           "Error! Tried to open a KV store '%s', which was created with "
380                           "custom compare function enabled, without passing the same "
381                           "custom compare function.", kvs_name);
382        }
383        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
384              handle->kvs_config.custom_cmp) {
385            // custom cmp function was not assigned before,
386            // but custom cmp function is assigned from user
387            file->kv_header->custom_cmp_enabled = ori_flag;
388            file->kv_header->default_kvs_cmp = ori_custom_cmp;
389            spin_unlock(&file->kv_header->lock);
390            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
391            if (!kvs_name) {
392                kvs_name = DEFAULT_KVS_NAME;
393            }
394            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
395                           "Error! Tried to open a KV store '%s', which was created without "
396                           "custom compare function, by passing custom compare function.",
397                    kvs_name);
398        }
399    }
400
401    // next check other KVSs
402    a = avl_first(file->kv_header->idx_name);
403    while (a) {
404        kvs_node = _get_entry(a, struct kvs_node, avl_name);
405        a = avl_next(a);
406
407        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
408            kvs_node->custom_cmp == NULL) {
409            // custom cmp function was assigned before,
410            // but no custom cmp function is assigned
411            file->kv_header->custom_cmp_enabled = ori_flag;
412            file->kv_header->default_kvs_cmp = ori_custom_cmp;
413            spin_unlock(&file->kv_header->lock);
414            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
415            if (!kvs_name) {
416                kvs_name = DEFAULT_KVS_NAME;
417            }
418            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
419                           "Error! Tried to open a KV store '%s', which was created with "
420                           "custom compare function enabled, without passing the same "
421                           "custom compare function.", kvs_name);
422        }
423        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
424              kvs_node->custom_cmp) {
425            // custom cmp function was not assigned before,
426            // but custom cmp function is assigned from user
427            file->kv_header->custom_cmp_enabled = ori_flag;
428            file->kv_header->default_kvs_cmp = ori_custom_cmp;
429            spin_unlock(&file->kv_header->lock);
430            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
431            if (!kvs_name) {
432                kvs_name = DEFAULT_KVS_NAME;
433            }
434            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
435                           "Error! Tried to open a KV store '%s', which was created without "
436                           "custom compare function, by passing custom compare function.",
437                           kvs_name);
438        }
439    }
440
441    spin_unlock(&file->kv_header->lock);
442    return FDB_RESULT_SUCCESS;
443}
444
445fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
446                                              char *kvs_name)
447{
448    fdb_file_handle *fhandle;
449    struct list_elem *e;
450    struct cmp_func_node *cmp_node;
451
452    fhandle = handle->fhandle;
453    if (!fhandle->cmp_func_list) {
454        return NULL;
455    }
456
457    e = list_begin(fhandle->cmp_func_list);
458    while (e) {
459        cmp_node = _get_entry(e, struct cmp_func_node, le);
460        if (kvs_name == NULL ||
461            !strcmp(kvs_name, default_kvs_name)) {
462            if (cmp_node->kvs_name == NULL ||
463                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
464                return cmp_node->func;
465            }
466        } else if (cmp_node->kvs_name &&
467                   !strcmp(cmp_node->kvs_name, kvs_name)) {
468            return cmp_node->func;
469        }
470        e = list_next(&cmp_node->le);
471    }
472    return NULL;
473}
474
475hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
476{
477    fdb_kvs_id_t kv_id;
478    struct hbtrie *trie = (struct hbtrie *)aux;
479    struct btreeblk_handle *bhandle;
480    struct filemgr *file;
481    struct avl_node *a;
482    struct kvs_node query, *node;
483
484    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
485    file = bhandle->file;
486
487    if (!file->kv_header->custom_cmp_enabled) {
488        return NULL;
489    }
490
491    buf2kvid(trie->chunksize, chunk, &kv_id);
492
493    // search by id
494    if (kv_id > 0) {
495        query.id = kv_id;
496        spin_lock(&file->kv_header->lock);
497        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
498        spin_unlock(&file->kv_header->lock);
499
500        if (a) {
501            node = _get_entry(a, struct kvs_node, avl_id);
502            return (hbtrie_cmp_func *)node->custom_cmp;
503        }
504    } else {
505        // root handle
506        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
507    }
508    return NULL;
509}
510
511void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
512    handle->kvs->type = KVS_ROOT;
513    handle->kvs->root = handle->fhandle->root;
514    // super handle's ID is always 0
515    handle->kvs->id = 0;
516    // force custom cmp function
517    spin_lock(&file->kv_header->lock);
518    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
519    spin_unlock(&file->kv_header->lock);
520}
521
522void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
523                         fdb_kvs_handle *handle,
524                         struct filemgr *file,
525                         const char *kvs_name)
526{
527    struct kvs_node query, *kvs_node;
528    struct kvs_opened_node *opened_node;
529    struct avl_node *a;
530
531    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
532
533    if (root_handle == NULL) {
534        // 'handle' is a super handle
535        _fdb_kvs_init_root(handle, file);
536    } else {
537        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
538        handle->kvs->type = KVS_SUB;
539        handle->kvs->root = root_handle;
540
541        if (kvs_name) {
542            spin_lock(&file->kv_header->lock);
543            query.kvs_name = (char*)kvs_name;
544            a = avl_search(file->kv_header->idx_name, &query.avl_name,
545                           _kvs_cmp_name);
546            if (a == NULL) {
547                // KV instance name is not found
548                free(handle->kvs);
549                handle->kvs = NULL;
550                spin_unlock(&file->kv_header->lock);
551                return;
552            }
553            kvs_node = _get_entry(a, struct kvs_node, avl_name);
554            handle->kvs->id = kvs_node->id;
555            // force custom cmp function
556            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
557            spin_unlock(&file->kv_header->lock);
558        } else {
559            // snapshot of the root handle
560            handle->kvs->id = 0;
561        }
562
563        opened_node = (struct kvs_opened_node *)
564               calloc(1, sizeof(struct kvs_opened_node));
565        opened_node->handle = handle;
566
567        handle->node = opened_node;
568        spin_lock(&root_handle->fhandle->lock);
569        list_push_back(root_handle->fhandle->handles, &opened_node->le);
570        spin_unlock(&root_handle->fhandle->lock);
571    }
572}
573
574void fdb_kvs_info_free(fdb_kvs_handle *handle)
575{
576    if (handle->kvs == NULL) {
577        return;
578    }
579
580    free(handle->kvs);
581    handle->kvs = NULL;
582}
583
584void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
585{
586    struct kvs_header *kv_header;
587
588    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
589    *kv_header_ptr = kv_header;
590
591    // KV ID '0' is reserved for default KV instance (super handle)
592    kv_header->id_counter = 1;
593    kv_header->default_kvs_cmp = NULL;
594    kv_header->custom_cmp_enabled = 0;
595    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
596    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
597    kv_header->num_kv_stores = 0;
598    avl_init(kv_header->idx_name, NULL);
599    avl_init(kv_header->idx_id, NULL);
600    spin_init(&kv_header->lock);
601}
602
603void fdb_kvs_header_create(struct filemgr *file)
604{
605    if (file->kv_header) {
606        return; // already exist
607    }
608
609    _fdb_kvs_header_create(&file->kv_header);
610    file->free_kv_header = fdb_kvs_header_free;
611}
612
613void fdb_kvs_header_reset_all_stats(struct filemgr *file)
614{
615    struct avl_node *a;
616    struct kvs_node *node;
617    struct kvs_header *kv_header = file->kv_header;
618
619    spin_lock(&kv_header->lock);
620    a = avl_first(kv_header->idx_id);
621    while (a) {
622        node = _get_entry(a, struct kvs_node, avl_id);
623        a = avl_next(&node->avl_id);
624        memset(&node->stat, 0x0, sizeof(node->stat));
625    }
626    spin_unlock(&kv_header->lock);
627}
628
629void fdb_kvs_header_copy(fdb_kvs_handle *handle,
630                         struct filemgr *new_file,
631                         struct docio_handle *new_dhandle,
632                         uint64_t *new_file_kv_info_offset,
633                         bool create_new)
634{
635    struct avl_node *a, *aa;
636    struct kvs_node *node_old, *node_new;
637
638    if (create_new) {
639        struct kvs_header *kv_header;
640        // copy KV header data in 'handle' to new file
641        _fdb_kvs_header_create(&kv_header);
642        // read from 'handle->dhandle', and import into 'new_file'
643        fdb_kvs_header_read(kv_header, handle->dhandle,
644                            handle->kv_info_offset, handle->file->version, false);
645
646        // write KV header in 'new_file' using 'new_dhandle'
647        uint64_t new_kv_info_offset;
648        fdb_kvs_handle new_handle;
649        new_handle.file = new_file;
650        new_handle.dhandle = new_dhandle;
651        new_handle.kv_info_offset = BLK_NOT_FOUND;
652        new_kv_info_offset = fdb_kvs_header_append(&new_handle);
653        if (new_file_kv_info_offset) {
654            *new_file_kv_info_offset = new_kv_info_offset;
655        }
656
657        if (!filemgr_set_kv_header(new_file, kv_header, fdb_kvs_header_free)) {
658            // LCOV_EXCL_START
659            _fdb_kvs_header_free(kv_header);
660        } // LCOV_EXCL_STOP
661        fdb_kvs_header_reset_all_stats(new_file);
662    }
663
664    spin_lock(&handle->file->kv_header->lock);
665    spin_lock(&new_file->kv_header->lock);
666    // copy all in-memory custom cmp function pointers & seqnums
667    new_file->kv_header->default_kvs_cmp =
668        handle->file->kv_header->default_kvs_cmp;
669    new_file->kv_header->custom_cmp_enabled =
670        handle->file->kv_header->custom_cmp_enabled;
671    a = avl_first(handle->file->kv_header->idx_id);
672    while (a) {
673        node_old = _get_entry(a, struct kvs_node, avl_id);
674        aa = avl_search(new_file->kv_header->idx_id,
675                        &node_old->avl_id, _kvs_cmp_id);
676        assert(aa); // MUST exist
677        node_new = _get_entry(aa, struct kvs_node, avl_id);
678        node_new->custom_cmp = node_old->custom_cmp;
679        node_new->seqnum = node_old->seqnum;
680        node_new->op_stat = node_old->op_stat;
681        a = avl_next(a);
682    }
683    spin_unlock(&new_file->kv_header->lock);
684    spin_unlock(&handle->file->kv_header->lock);
685}
686
687// export KV header info to raw data
688static void _fdb_kvs_header_export(struct kvs_header *kv_header,
689                                   void **data, size_t *len, uint64_t version)
690{
691    /* << raw data structure >>
692     * [# KV instances]:        8 bytes
693     * [current KV ID counter]: 8 bytes
694     * ---
695     * [name length]:           2 bytes
696     * [instance name]:         x bytes
697     * [instance ID]:           8 bytes
698     * [sequence number]:       8 bytes
699     * [# live index nodes]:    8 bytes
700     * [# docs]:                8 bytes
701     * [data size]:             8 bytes
702     * [flags]:                 8 bytes
703     * [delta size]:            8 bytes (since MAGIC_001)
704     * [# deleted docs]:        8 bytes (since MAGIC_001)
705     * ...
706     *    Please note that if the above format is changed, please also change...
707     *    _fdb_kvs_get_snap_info()
708     *    _fdb_kvs_header_import()
709     *    _kvs_stat_get_sum_doc()
710     *    _kvs_stat_get_sum_attr
711     */
712
713    int size = 0;
714    int offset = 0;
715    uint16_t name_len, _name_len;
716    uint64_t c = 0;
717    uint64_t _n_kv, _kv_id, _flags;
718    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
719    int64_t _deltasize;
720    fdb_kvs_id_t _id_counter;
721    fdb_seqnum_t _seqnum;
722    struct kvs_node *node;
723    struct avl_node *a;
724
725    if (kv_header == NULL) {
726        *data = NULL;
727        *len = 0;
728        return ;
729    }
730
731    spin_lock(&kv_header->lock);
732
733    // pre-scan to estimate the size of data
734    size += sizeof(uint64_t);
735    size += sizeof(fdb_kvs_id_t);
736    a = avl_first(kv_header->idx_name);
737    while(a) {
738        node = _get_entry(a, struct kvs_node, avl_name);
739        c++;
740        size += sizeof(uint16_t); // length
741        size += strlen(node->kvs_name)+1; // name
742        size += sizeof(node->id); // ID
743        size += sizeof(node->seqnum); // seq number
744        size += sizeof(node->stat.nlivenodes); // # live index nodes
745        size += sizeof(node->stat.ndocs); // # docs
746        size += sizeof(node->stat.datasize); // data size
747        size += sizeof(node->flags); // flags
748        if (ver_is_atleast_magic_001(version)) {
749            size += sizeof(node->stat.deltasize); // delta size since commit
750            size += sizeof(node->stat.ndeletes); // # deleted docs
751        }
752        a = avl_next(a);
753    }
754
755    *data = (void *)malloc(size);
756
757    // # KV instances
758    _n_kv = _endian_encode(c);
759    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
760    offset += sizeof(_n_kv);
761
762    // ID counter
763    _id_counter = _endian_encode(kv_header->id_counter);
764    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
765    offset += sizeof(_id_counter);
766
767    a = avl_first(kv_header->idx_name);
768    while(a) {
769        node = _get_entry(a, struct kvs_node, avl_name);
770
771        // name length
772        name_len = strlen(node->kvs_name)+1;
773        _name_len = _endian_encode(name_len);
774        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
775        offset += sizeof(_name_len);
776
777        // name
778        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
779        offset += name_len;
780
781        // KV ID
782        _kv_id = _endian_encode(node->id);
783        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
784        offset += sizeof(_kv_id);
785
786        // seq number
787        _seqnum = _endian_encode(node->seqnum);
788        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
789        offset += sizeof(_seqnum);
790
791        // # live index nodes
792        _nlivenodes = _endian_encode(node->stat.nlivenodes);
793        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
794        offset += sizeof(_nlivenodes);
795
796        // # docs
797        _ndocs = _endian_encode(node->stat.ndocs);
798        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
799        offset += sizeof(_ndocs);
800
801        // datasize
802        _datasize = _endian_encode(node->stat.datasize);
803        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
804        offset += sizeof(_datasize);
805
806        // flags
807        _flags = _endian_encode(node->flags);
808        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
809        offset += sizeof(_flags);
810
811        if (ver_is_atleast_magic_001(version)) {
812            // # delta index nodes + docsize created after last commit
813            _deltasize = _endian_encode(node->stat.deltasize);
814            memcpy((uint8_t*)*data + offset, &_deltasize, sizeof(_deltasize));
815            offset += sizeof(_deltasize);
816
817            // # deleted documents
818            _ndeletes = _endian_encode(node->stat.ndeletes);
819            memcpy((uint8_t*)*data + offset, &_ndeletes, sizeof(_ndeletes));
820            offset += sizeof(_ndeletes);
821        }
822
823        a = avl_next(a);
824    }
825
826    *len = size;
827
828    spin_unlock(&kv_header->lock);
829}
830
831void _fdb_kvs_header_import(struct kvs_header *kv_header,
832                            void *data, size_t len, uint64_t version,
833                            bool only_seq_nums)
834{
835    uint64_t i, offset = 0;
836    uint16_t name_len, _name_len;
837    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
838    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
839    int64_t _deltasize;
840    bool is_deltasize;
841    fdb_kvs_id_t id_counter, _id_counter;
842    fdb_seqnum_t seqnum, _seqnum;
843    struct kvs_node *node;
844
845    // # KV instances
846    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
847    offset += sizeof(_n_kv);
848    n_kv = _endian_decode(_n_kv);
849
850    // ID counter
851    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
852    offset += sizeof(_id_counter);
853    id_counter = _endian_decode(_id_counter);
854
855    spin_lock(&kv_header->lock);
856    kv_header->id_counter = id_counter;
857
858    // Version control
859    if (!ver_is_atleast_magic_001(version)) {
860        is_deltasize = false;
861        _deltasize = 0;
862        _ndeletes = 0;
863    } else {
864        is_deltasize = true;
865    }
866
867    for (i=0;i<n_kv;++i){
868        // name length
869        uint64_t name_offset;
870        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
871        offset += sizeof(_name_len);
872        name_offset = offset;
873        name_len = _endian_decode(_name_len);
874
875        // name
876        offset += name_len;
877
878        // KV ID
879        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
880        offset += sizeof(_kv_id);
881        kv_id = _endian_decode(_kv_id);
882
883        // Search if a given KV header node exists or not.
884        struct kvs_node query;
885        query.id = kv_id;
886        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
887                                        _kvs_cmp_id);
888        if (a) {
889            node = _get_entry(a, struct kvs_node, avl_id);
890        } else {
891            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
892            node->kvs_name = (char *)malloc(name_len);
893            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
894            node->id = kv_id;
895            _init_op_stats(&node->op_stat);
896        }
897
898        // seq number
899        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
900        offset += sizeof(_seqnum);
901        seqnum = _endian_decode(_seqnum);
902        node->seqnum = seqnum;
903
904        // # live index nodes
905        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
906        offset += sizeof(_nlivenodes);
907
908        // # docs
909        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
910        offset += sizeof(_ndocs);
911
912        // datasize
913        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
914        offset += sizeof(_datasize);
915
916        // flags
917        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
918        offset += sizeof(_flags);
919        flags = _endian_decode(_flags);
920
921        if (is_deltasize) {
922            // delta document + index size since previous commit
923            memcpy(&_deltasize, (uint8_t*)data + offset,
924                   sizeof(_deltasize));
925            offset += sizeof(_deltasize);
926            memcpy(&_ndeletes, (uint8_t*)data + offset,
927                   sizeof(_ndeletes));
928            offset += sizeof(_ndeletes);
929        }
930
931        if (!only_seq_nums) {
932            node->stat.nlivenodes = _endian_decode(_nlivenodes);
933            node->stat.ndocs = _endian_decode(_ndocs);
934            node->stat.datasize = _endian_decode(_datasize);
935            node->stat.deltasize = _endian_decode(_deltasize);
936            node->stat.ndeletes = _endian_decode(_ndeletes);
937            node->flags = flags;
938            node->custom_cmp = NULL;
939        }
940
941        if (!a) { // Insert a new KV header node if not exist.
942            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
943            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
944            ++kv_header->num_kv_stores;
945        }
946    }
947    spin_unlock(&kv_header->lock);
948}
949
950fdb_status _fdb_kvs_get_snap_info(void *data, uint64_t version,
951                                  fdb_snapshot_info_t *snap_info)
952{
953    int i, offset = 0, sizeof_skipped_segments;
954    uint16_t name_len, _name_len;
955    int64_t n_kv, _n_kv;
956    bool is_deltasize;
957    fdb_seqnum_t _seqnum;
958    // Version control
959    if (!ver_is_atleast_magic_001(version)) {
960        is_deltasize = false;
961    } else {
962        is_deltasize = true;
963    }
964
965    // # KV instances
966    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
967    offset += sizeof(_n_kv);
968    // since n_kv doesn't count the default KVS, increase it by 1.
969    n_kv = _endian_decode(_n_kv) + 1;
970    assert(n_kv); // Must have at least one kv instance
971    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
972                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
973    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
974        return FDB_RESULT_ALLOC_FAIL;
975    } // LCOV_EXCL_STOP
976
977    snap_info->num_kvs_markers = n_kv;
978
979    // Skip over ID counter
980    offset += sizeof(fdb_kvs_id_t);
981
982    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
983                            + sizeof(uint64_t) // skip over nlivenodes
984                            + sizeof(uint64_t) // skip over ndocs
985                            + sizeof(uint64_t) // skip over datasize
986                            + sizeof(uint64_t); // skip over flags
987    if (is_deltasize) {
988        sizeof_skipped_segments += sizeof(uint64_t); // skip over deltasize
989        sizeof_skipped_segments += sizeof(uint64_t); // skip over ndeletes
990    }
991
992    for (i = 0; i < n_kv-1; ++i){
993        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
994        // Read the kv store name length
995        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
996        offset += sizeof(_name_len);
997        name_len = _endian_decode(_name_len);
998
999        // Retrieve the KV Store name
1000        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
1001        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
1002        offset += name_len;
1003
1004        // Skip over KV ID
1005        offset += sizeof(uint64_t);
1006
1007        // Retrieve the KV Store Commit Sequence number
1008        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
1009        info->seqnum = _endian_decode(_seqnum);
1010
1011        // Skip over seqnum, nlivenodes, ndocs, datasize, flags etc onto next..
1012        offset += sizeof_skipped_segments;
1013    }
1014
1015    return FDB_RESULT_SUCCESS;
1016}
1017
1018uint64_t _kvs_stat_get_sum_attr(void *data, uint64_t version,
1019                                kvs_stat_attr_t attr)
1020{
1021    uint64_t ret = 0;
1022    int i, offset = 0;
1023    uint16_t name_len, _name_len;
1024    int64_t n_kv, _n_kv;
1025    bool is_deltasize;
1026    uint64_t nlivenodes, ndocs, datasize, flags;
1027    int64_t deltasize;
1028
1029    // Version control
1030    if (!ver_is_atleast_magic_001(version)) {
1031        is_deltasize = false;
1032    } else {
1033        is_deltasize = true;
1034    }
1035
1036    // # KV instances
1037    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
1038    offset += sizeof(_n_kv);
1039    // since n_kv doesn't count the default KVS, increase it by 1.
1040    n_kv = _endian_decode(_n_kv) + 1;
1041    assert(n_kv); // Must have at least one kv instance
1042
1043    // Skip over ID counter
1044    offset += sizeof(fdb_kvs_id_t);
1045
1046    for (i = 0; i < n_kv-1; ++i){
1047        // Read the kv store name length and skip over the length
1048        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
1049        offset += sizeof(_name_len);
1050        name_len = _endian_decode(_name_len);
1051
1052        // Skip over the KV Store name
1053        offset += name_len;
1054
1055        // Skip over KV ID
1056        offset += sizeof(uint64_t);
1057
1058        // Skip over KV store seqnum
1059        offset += sizeof(uint64_t);
1060
1061        // pick just the attribute requested, skipping over rest..
1062        if (attr == KVS_STAT_NLIVENODES) {
1063            memcpy(&nlivenodes, (uint8_t *)data + offset, sizeof(nlivenodes));
1064            ret += _endian_decode(nlivenodes);
1065            // skip over nlivenodes just read
1066            offset += sizeof(nlivenodes);
1067            // skip over ndocs, datasize, flags (and deltasize, ndeletes)
1068            offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof(datasize)
1069                   + sizeof(flags) + (is_deltasize ? sizeof(deltasize)*2 : 0);
1070        } else if (attr == KVS_STAT_DATASIZE) {
1071            offset += sizeof(nlivenodes) + sizeof(ndocs);
1072            memcpy(&datasize, (uint8_t *)data + offset, sizeof(datasize));
1073            ret += _endian_decode(datasize);
1074            // skip over datasize, flags (and deltasize, ndeletes)
1075            offset += sizeof(datasize) + sizeof(flags)
1076                   + (is_deltasize ? sizeof(deltasize)*2 : 0);
1077        } else if (attr == KVS_STAT_DELTASIZE) {
1078            if (is_deltasize) {
1079                offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof (datasize)
1080                        + sizeof(flags);
1081                memcpy(&deltasize, (uint8_t *)data + offset, sizeof(deltasize));
1082                ret += _endian_decode(deltasize);
1083                // skip over datasize, flags (and deltasize)
1084                offset += sizeof(deltasize)*2; // and ndeletes
1085            }
1086        } else { // Attribute fetched not implemented yet..
1087            fdb_assert(false, 0, attr); // Implement fetch for this attribute
1088        }
1089    }
1090
1091    return ret;
1092}
1093
1094uint64_t fdb_kvs_header_append(fdb_kvs_handle *handle)
1095{
1096    char *doc_key = alca(char, 32);
1097    void *data;
1098    size_t len;
1099    uint64_t kv_info_offset, prev_offset;
1100    struct docio_object doc;
1101    struct docio_length doc_len;
1102    struct filemgr *file = handle->file;
1103    struct docio_handle *dhandle = handle->dhandle;
1104
1105    _fdb_kvs_header_export(file->kv_header, &data, &len, file->version);
1106
1107    prev_offset = handle->kv_info_offset;
1108
1109    memset(&doc, 0, sizeof(struct docio_object));
1110    sprintf(doc_key, "KV_header");
1111    doc.key = (void *)doc_key;
1112    doc.meta = NULL;
1113    doc.body = data;
1114    doc.length.keylen = strlen(doc_key) + 1;
1115    doc.length.metalen = 0;
1116    doc.length.bodylen = len;
1117    doc.seqnum = 0;
1118    kv_info_offset = docio_append_doc_system(dhandle, &doc);
1119    free(data);
1120
1121    if (prev_offset != BLK_NOT_FOUND) {
1122        if (docio_read_doc_length(handle->dhandle, &doc_len, prev_offset)
1123            == FDB_RESULT_SUCCESS) {
1124            // mark stale
1125            filemgr_mark_stale(handle->file, prev_offset, _fdb_get_docsize(doc_len));
1126        }
1127    }
1128
1129    return kv_info_offset;
1130}
1131
1132void fdb_kvs_header_read(struct kvs_header *kv_header,
1133                         struct docio_handle *dhandle,
1134                         uint64_t kv_info_offset,
1135                         uint64_t version,
1136                         bool only_seq_nums)
1137{
1138    int64_t offset;
1139    struct docio_object doc;
1140
1141    memset(&doc, 0, sizeof(struct docio_object));
1142    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
1143
1144    if (offset <= 0) {
1145        fdb_log(dhandle->log_callback, (fdb_status) offset,
1146                "Failed to read a KV header with the offset %" _F64 " from a "
1147                "database file '%s'", kv_info_offset, dhandle->file->filename);
1148        return;
1149    }
1150
1151    _fdb_kvs_header_import(kv_header, doc.body, doc.length.bodylen,
1152                           version, only_seq_nums);
1153    free_docio_object(&doc, 1, 1, 1);
1154}
1155
1156fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
1157{
1158    uint8_t *buf;
1159    uint64_t dummy64;
1160    uint64_t version;
1161    uint64_t kv_info_offset;
1162    size_t len;
1163    bid_t hdr_bid;
1164    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
1165    fdb_kvs_id_t id = 0;
1166    char *compacted_filename = NULL;
1167    struct filemgr *file = handle->file;
1168
1169    buf = alca(uint8_t, file->config->blocksize);
1170
1171    if (handle->kvs && handle->kvs->id > 0) {
1172        id = handle->kvs->id;
1173    }
1174
1175    hdr_bid = filemgr_get_header_bid(file);
1176    if (hdr_bid == BLK_NOT_FOUND) {
1177        // header doesn't exist
1178        return 0;
1179    }
1180
1181    // read header
1182    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, NULL,
1183                         &version, NULL, &handle->log_callback);
1184    if (id > 0) { // non-default KVS
1185        // read last KVS header
1186        fdb_fetch_header(version, buf, &dummy64, &dummy64,
1187                         &dummy64, &dummy64, &dummy64, &dummy64,
1188                         &dummy64, &dummy64,
1189                         &kv_info_offset, &dummy64,
1190                         &compacted_filename, NULL);
1191
1192        int64_t doc_offset;
1193        struct kvs_header *kv_header;
1194        struct docio_object doc;
1195
1196        _fdb_kvs_header_create(&kv_header);
1197        memset(&doc, 0, sizeof(struct docio_object));
1198        doc_offset = docio_read_doc(handle->dhandle,
1199                                    kv_info_offset, &doc, true);
1200
1201        if (doc_offset <= 0) {
1202            // fail
1203            _fdb_kvs_header_free(kv_header);
1204            return 0;
1205
1206        } else {
1207            _fdb_kvs_header_import(kv_header, doc.body,
1208                                   doc.length.bodylen, version, false);
1209            // get local sequence number for the KV instance
1210            seqnum = _fdb_kvs_get_seqnum(kv_header,
1211                                         handle->kvs->id);
1212            _fdb_kvs_header_free(kv_header);
1213            free_docio_object(&doc, 1, 1, 1);
1214        }
1215    }
1216    return seqnum;
1217}
1218
1219LIBFDB_API
1220fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
1221{
1222    if (!handle) {
1223        return FDB_RESULT_INVALID_HANDLE;
1224    }
1225
1226    if (!seqnum) {
1227        return FDB_RESULT_INVALID_ARGS;
1228    }
1229
1230    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
1231        return FDB_RESULT_HANDLE_BUSY;
1232    }
1233
1234    if (handle->shandle) {
1235        // handle for snapshot
1236        // return MAX_SEQNUM instead of the file's sequence number
1237        *seqnum = handle->max_seqnum;
1238    } else {
1239        fdb_check_file_reopen(handle, NULL);
1240        fdb_sync_db_header(handle);
1241
1242        struct filemgr *file;
1243        file = handle->file;
1244
1245        if (handle->kvs == NULL ||
1246            handle->kvs->id == 0) {
1247            filemgr_mutex_lock(file);
1248            *seqnum = filemgr_get_seqnum(file);
1249            filemgr_mutex_unlock(file);
1250        } else {
1251            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1252        }
1253    }
1254    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
1255    return FDB_RESULT_SUCCESS;
1256}
1257
1258void fdb_kvs_set_seqnum(struct filemgr *file,
1259                           fdb_kvs_id_t id,
1260                           fdb_seqnum_t seqnum)
1261{
1262    struct kvs_header *kv_header = file->kv_header;
1263    struct kvs_node query, *node;
1264    struct avl_node *a;
1265
1266    if (id == 0) {
1267        // default KV instance
1268        filemgr_set_seqnum(file, seqnum);
1269        return;
1270    }
1271
1272    spin_lock(&kv_header->lock);
1273    query.id = id;
1274    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1275    node = _get_entry(a, struct kvs_node, avl_id);
1276    node->seqnum = seqnum;
1277    spin_unlock(&kv_header->lock);
1278}
1279
1280void _fdb_kvs_header_free(struct kvs_header *kv_header)
1281{
1282    struct kvs_node *node;
1283    struct avl_node *a;
1284
1285    a = avl_first(kv_header->idx_name);
1286    while (a) {
1287        node = _get_entry(a, struct kvs_node, avl_name);
1288        a = avl_next(a);
1289        avl_remove(kv_header->idx_name, &node->avl_name);
1290
1291        free(node->kvs_name);
1292        free(node);
1293    }
1294    free(kv_header->idx_name);
1295    free(kv_header->idx_id);
1296    free(kv_header);
1297}
1298
1299void fdb_kvs_header_free(struct filemgr *file)
1300{
1301    if (file->kv_header == NULL) {
1302        return;
1303    }
1304
1305    _fdb_kvs_header_free(file->kv_header);
1306    file->kv_header = NULL;
1307}
1308
1309static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1310                                  const char *kvs_name,
1311                                  fdb_kvs_config *kvs_config)
1312{
1313    int kv_ins_name_len;
1314    fdb_status fs = FDB_RESULT_SUCCESS;
1315    struct avl_node *a;
1316    struct filemgr *file;
1317    struct kvs_node *node, query;
1318    struct kvs_header *kv_header;
1319
1320    if (root_handle->config.multi_kv_instances == false) {
1321        // cannot open KV instance under single DB instance mode
1322        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1323                       "Cannot open or create KV store instance '%s' because multi-KV "
1324                       "store instance mode is disabled.",
1325                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1326    }
1327    if (root_handle->kvs->type != KVS_ROOT) {
1328        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1329                       "Cannot open or create KV store instance '%s' because the handle "
1330                       "doesn't support multi-KV sotre instance mode.",
1331                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1332    }
1333
1334fdb_kvs_create_start:
1335    fdb_check_file_reopen(root_handle, NULL);
1336    filemgr_mutex_lock(root_handle->file);
1337    fdb_sync_db_header(root_handle);
1338
1339    if (filemgr_is_rollback_on(root_handle->file)) {
1340        filemgr_mutex_unlock(root_handle->file);
1341        return FDB_RESULT_FAIL_BY_ROLLBACK;
1342    }
1343
1344    file = root_handle->file;
1345
1346    file_status_t fstatus = filemgr_get_file_status(file);
1347    if (fstatus == FILE_REMOVED_PENDING) {
1348        // we must not write into this file
1349        // file status was changed by other thread .. start over
1350        filemgr_mutex_unlock(file);
1351        goto fdb_kvs_create_start;
1352    }
1353
1354    kv_header = file->kv_header;
1355    spin_lock(&kv_header->lock);
1356
1357    // find existing KV instance
1358    // search by name
1359    query.kvs_name = (char*)kvs_name;
1360    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1361    if (a) { // KV name already exists
1362        spin_unlock(&kv_header->lock);
1363        filemgr_mutex_unlock(file);
1364        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1365                       "Failed to create KV Store '%s' as it already exists.",
1366                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1367    }
1368
1369    // create a kvs_node and insert
1370    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1371    node->id = kv_header->id_counter++;
1372    node->seqnum = 0;
1373    node->flags = 0x0;
1374    _init_op_stats(&node->op_stat);
1375    // search fhandle's custom cmp func list first
1376    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1377                                             (char *)kvs_name);
1378    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1379        // follow kvs_config's custom cmp next
1380        node->custom_cmp = kvs_config->custom_cmp;
1381        // if custom cmp function is given by user but
1382        // there is no corresponding function in fhandle's list
1383        // add it into the list
1384        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1385                                     (char*)kvs_name,
1386                                     kvs_config->custom_cmp);
1387    }
1388    if (node->custom_cmp) { // custom cmp function is used
1389        node->flags |= KVS_FLAG_CUSTOM_CMP;
1390        kv_header->custom_cmp_enabled = 1;
1391    }
1392    kv_ins_name_len = strlen(kvs_name)+1;
1393    node->kvs_name = (char *)malloc(kv_ins_name_len);
1394    strcpy(node->kvs_name, kvs_name);
1395
1396    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1397    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1398    ++kv_header->num_kv_stores;
1399    spin_unlock(&kv_header->lock);
1400
1401    // if compaction is in-progress,
1402    // create a same kvs_node for the new file
1403    if (file->new_file &&
1404        filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1405        struct kvs_node *node_new;
1406        struct kvs_header *kv_header_new;
1407
1408        kv_header_new = file->new_file->kv_header;
1409        node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1410        *node_new = *node;
1411        node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1412        strcpy(node_new->kvs_name, kvs_name);
1413
1414        // insert into new file's kv_header
1415        spin_lock(&kv_header_new->lock);
1416        if (node->custom_cmp) {
1417            kv_header_new->custom_cmp_enabled = 1;
1418        }
1419        avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1420        avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1421        spin_unlock(&kv_header_new->lock);
1422    }
1423
1424    // since this function calls filemgr_commit() and appends a new DB header,
1425    // we should finalize & flush the previous dirty update before commit.
1426    bid_t dirty_idtree_root = BLK_NOT_FOUND;
1427    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1428    struct filemgr_dirty_update_node *prev_node = NULL;
1429    struct filemgr_dirty_update_node *new_node = NULL;
1430
1431    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
1432                            &dirty_idtree_root, &dirty_seqtree_root, false);
1433
1434    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
1435                               &dirty_idtree_root, &dirty_seqtree_root, true);
1436
1437    // append system doc
1438    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
1439
1440    // if no compaction is being performed, append header and commit
1441    if (root_handle->file == file) {
1442        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
1443        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
1444        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
1445        fs = filemgr_commit_bid(root_handle->file,
1446                                root_handle->last_hdr_bid,
1447                                cur_bmp_revnum,
1448                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
1449                                &root_handle->log_callback);
1450        btreeblk_reset_subblock_info(root_handle->bhandle);
1451    }
1452
1453    filemgr_mutex_unlock(file);
1454
1455    return fs;
1456}
1457
1458// this function just returns pointer
1459char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1460{
1461    struct kvs_node *node, query;
1462    struct avl_node *a;
1463
1464    if (handle->kvs == NULL) {
1465        // single KV instance mode
1466        return NULL;
1467    }
1468
1469    query.id = handle->kvs->id;
1470    if (query.id == 0) { // default KV instance
1471        return NULL;
1472    }
1473    spin_lock(&file->kv_header->lock);
1474    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1475    if (a) {
1476        node = _get_entry(a, struct kvs_node, avl_id);
1477        spin_unlock(&file->kv_header->lock);
1478        return node->kvs_name;
1479    }
1480    spin_unlock(&file->kv_header->lock);
1481    return NULL;
1482}
1483
1484// this function just returns pointer to kvs_name & offset to user key
1485const char* _fdb_kvs_extract_name_off(fdb_kvs_handle *handle, void *keybuf,
1486                                      size_t *key_offset)
1487{
1488    struct kvs_node *node, query;
1489    struct avl_node *a;
1490    fdb_kvs_id_t kv_id;
1491    struct filemgr *file = handle->file;
1492
1493    if (!handle->kvs) { // single KV instance mode
1494        *key_offset = 0;
1495        return DEFAULT_KVS_NAME;
1496    }
1497
1498    *key_offset = handle->config.chunksize;
1499    buf2kvid(*key_offset, keybuf, &kv_id);
1500    query.id = kv_id;
1501    if (query.id == 0) { // default KV instance in multi kvs mode
1502        return default_kvs_name;
1503    }
1504    spin_lock(&file->kv_header->lock);
1505    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1506    if (a) {
1507        node = _get_entry(a, struct kvs_node, avl_id);
1508        const char *kvs_name = node->kvs_name;
1509        spin_unlock(&file->kv_header->lock);
1510        return kvs_name;
1511    }
1512    spin_unlock(&file->kv_header->lock);
1513    return NULL;
1514}
1515
1516fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1517                                   fdb_kvs_handle *handle_out)
1518{
1519    fdb_status fs;
1520    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1521
1522    if (!handle_out->kvs) {
1523        // create kvs_info
1524        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1525        handle_out->kvs->type = handle_in->kvs->type;
1526        handle_out->kvs->id = handle_in->kvs->id;
1527        handle_out->kvs->root = root_handle;
1528        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1529
1530        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1531            calloc(1, sizeof(struct kvs_opened_node));
1532        opened_node->handle = handle_out;
1533        handle_out->node = opened_node;
1534
1535        spin_lock(&root_handle->fhandle->lock);
1536        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1537        spin_unlock(&root_handle->fhandle->lock);
1538    }
1539
1540    fs = _fdb_clone_snapshot(handle_in, handle_out);
1541    if (fs != FDB_RESULT_SUCCESS) {
1542        if (handle_out->node) {
1543            spin_lock(&root_handle->fhandle->lock);
1544            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1545            spin_unlock(&root_handle->fhandle->lock);
1546            free(handle_out->node);
1547        }
1548        free(handle_out->kvs);
1549    }
1550    return fs;
1551}
1552
1553// 1) allocate memory & create 'handle->kvs'
1554//    by calling fdb_kvs_info_create().
1555//      -> this will allocate a corresponding node and
1556//         insert it into fhandle->handles list.
1557// 2) if matching KVS name doesn't exist, create it.
1558// 3) call _fdb_open().
1559fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1560                         fdb_config *config,
1561                         fdb_kvs_config *kvs_config,
1562                         struct filemgr *file,
1563                         const char *filename,
1564                         const char *kvs_name,
1565                         fdb_kvs_handle *handle)
1566{
1567    fdb_status fs;
1568
1569    if (handle->kvs == NULL) {
1570        // create kvs_info
1571        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1572    }
1573
1574    if (handle->kvs == NULL) {
1575        // KV instance name is not found
1576        if (!kvs_config->create_if_missing) {
1577            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1578                           "Failed to open KV store '%s' because it doesn't exist.",
1579                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1580        }
1581        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1582            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1583                           "Failed to create KV store '%s' because the KV store's handle "
1584                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1585        }
1586
1587        // create
1588        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1589        if (fs != FDB_RESULT_SUCCESS) { // create fail
1590            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1591        }
1592        // create kvs_info again
1593        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1594        if (handle->kvs == NULL) { // fail again
1595            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1596                           "Failed to create KV store '%s' because the KV store's handle "
1597                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1598        }
1599    }
1600    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1601    if (fs != FDB_RESULT_SUCCESS) {
1602        if (handle->node) {
1603            spin_lock(&root_handle->fhandle->lock);
1604            list_remove(root_handle->fhandle->handles, &handle->node->le);
1605            spin_unlock(&root_handle->fhandle->lock);
1606            free(handle->node);
1607        } // 'handle->node == NULL' happens only during rollback
1608        free(handle->kvs);
1609    }
1610    return fs;
1611}
1612
1613// 1) identify whether the requested KVS is default or non-default.
1614// 2) if the requested KVS is default,
1615//   2-1) As the root handle is already opened,
1616//        -> allocate memory for handle, and call _fdb_open().
1617//        -> 'handle->kvs' will be created in _fdb_open(),
1618//           since it is treated as a default handle.
1619//        -> allocate a corresponding node and insert it into
1620//           fhandle->handles list.
1621// 3) if the requested KVS is non-default,
1622//    -> allocate memory for handle, and call _fdb_kvs_open().
1623LIBFDB_API
1624fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1625                        fdb_kvs_handle **ptr_handle,
1626                        const char *kvs_name,
1627                        fdb_kvs_config *kvs_config)
1628{
1629    fdb_kvs_handle *handle;
1630    fdb_config config;
1631    fdb_status fs;
1632    fdb_kvs_handle *root_handle;
1633    fdb_kvs_config config_local;
1634    struct filemgr *file = NULL;
1635    struct filemgr *latest_file = NULL;
1636    LATENCY_STAT_START();
1637
1638    if (!fhandle || !fhandle->root) {
1639        return FDB_RESULT_INVALID_HANDLE;
1640    }
1641
1642    root_handle = fhandle->root;
1643    config = root_handle->config;
1644
1645    if (kvs_config) {
1646        if (validate_fdb_kvs_config(kvs_config)) {
1647            config_local = *kvs_config;
1648        } else {
1649            return FDB_RESULT_INVALID_CONFIG;
1650        }
1651    } else {
1652        config_local = get_default_kvs_config();
1653    }
1654
1655    fdb_check_file_reopen(root_handle, NULL);
1656    fdb_sync_db_header(root_handle);
1657
1658    file = root_handle->file;
1659    latest_file = root_handle->file;
1660
1661    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1662        // return the default KV store handle
1663        spin_lock(&fhandle->lock);
1664        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1665            // the root handle is not opened yet
1666            // sync up the root handle
1667            fdb_custom_cmp_variable default_kvs_cmp;
1668
1669            root_handle->kvs_config = config_local;
1670
1671            if (root_handle->file->kv_header) {
1672                // search fhandle's custom cmp func list first
1673                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1674
1675                spin_lock(&root_handle->file->kv_header->lock);
1676                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1677
1678                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1679                    root_handle->kvs_config.custom_cmp) {
1680                    // follow kvs_config's custom cmp next
1681                    root_handle->file->kv_header->default_kvs_cmp =
1682                        root_handle->kvs_config.custom_cmp;
1683                    fdb_file_handle_add_cmp_func(fhandle, NULL,
1684                                                 root_handle->kvs_config.custom_cmp);
1685                }
1686
1687                if (root_handle->file->kv_header->default_kvs_cmp) {
1688                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1689                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1690                }
1691                spin_unlock(&root_handle->file->kv_header->lock);
1692            }
1693
1694            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1695            fhandle->flags |= FHANDLE_ROOT_OPENED;
1696        }
1697        // the root handle is already synced
1698        // open new default KV store handle
1699        spin_unlock(&fhandle->lock);
1700        handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1701        handle->kvs_config = config_local;
1702        atomic_init_uint8_t(&handle->handle_busy, 0);
1703
1704        handle->fhandle = fhandle;
1705        fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1706        if (fs != FDB_RESULT_SUCCESS) {
1707            free(handle);
1708            *ptr_handle = NULL;
1709        } else {
1710            // insert into fhandle's list
1711            _fdb_kvs_createNLinkKVHandle(fhandle, handle);
1712            *ptr_handle = handle;
1713        }
1714        LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1715        return fs;
1716    }
1717
1718    if (config.multi_kv_instances == false) {
1719        // cannot open KV instance under single DB instance mode
1720        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1721                       "Cannot open KV store instance '%s' because multi-KV "
1722                       "store instance mode is disabled.",
1723                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1724    }
1725    if (root_handle->kvs->type != KVS_ROOT) {
1726        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1727                       "Cannot open KV store instance '%s' because the handle "
1728                       "doesn't support multi-KV sotre instance mode.",
1729                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1730    }
1731    if (root_handle->shandle) {
1732        // cannot open KV instance from a snapshot
1733        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1734                       "Not allowed to open KV store instance '%s' from the "
1735                       "snapshot handle.",
1736                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1737    }
1738
1739    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1740    if (!handle) { // LCOV_EXCL_START
1741        return FDB_RESULT_ALLOC_FAIL;
1742    } // LCOV_EXCL_STOP
1743
1744    atomic_init_uint8_t(&handle->handle_busy, 0);
1745    handle->fhandle = fhandle;
1746    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1747                       latest_file, file->filename, kvs_name, handle);
1748    if (fs == FDB_RESULT_SUCCESS) {
1749        *ptr_handle = handle;
1750    } else {
1751        *ptr_handle = NULL;
1752        free(handle);
1753    }
1754    LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1755    return fs;
1756}
1757
1758LIBFDB_API
1759fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1760                                fdb_kvs_handle **ptr_handle,
1761                                fdb_kvs_config *config)
1762{
1763    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1764}
1765
1766// 1) remove corresponding node from fhandle->handles list.
1767// 2) call _fdb_close().
1768fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1769{
1770    fdb_kvs_handle *root_handle = handle->fhandle->root;
1771    fdb_status fs;
1772
1773    if (handle->node) {
1774        spin_lock(&root_handle->fhandle->lock);
1775        list_remove(root_handle->fhandle->handles, &handle->node->le);
1776        spin_unlock(&root_handle->fhandle->lock);
1777        free(handle->node);
1778    } // 'handle->node == NULL' happens only during rollback
1779
1780    fs = _fdb_close(handle);
1781    return fs;
1782}
1783
1784// close all sub-KV store handles belonging to the root handle
1785fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1786{
1787    fdb_status fs;
1788    struct list_elem *e;
1789    struct kvs_opened_node *node;
1790
1791    spin_lock(&root_handle->fhandle->lock);
1792    e = list_begin(root_handle->fhandle->handles);
1793    while (e) {
1794        node = _get_entry(e, struct kvs_opened_node, le);
1795        e = list_remove(root_handle->fhandle->handles, &node->le);
1796        fs = _fdb_close(node->handle);
1797        if (fs != FDB_RESULT_SUCCESS) {
1798            spin_unlock(&root_handle->fhandle->lock);
1799            return fs;
1800        }
1801        fdb_kvs_info_free(node->handle);
1802        free(node->handle);
1803        free(node);
1804    }
1805    spin_unlock(&root_handle->fhandle->lock);
1806
1807    return FDB_RESULT_SUCCESS;
1808}
1809
1810// 1) identify whether the requested handle is for default KVS or not.
1811// 2) if the requested handle is for the default KVS,
1812//   2-1) if the requested handle must be the root handle,
1813//        -> call _fdb_close(),
1814//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1815//        -> remove the corresponding node from fhandle->handles list,
1816//        -> free the memory for the handle.
1817// 3) if the requested handle is for non-default KVS,
1818//    -> call _fdb_kvs_close(),
1819//       -> this will remove the node from fhandle->handles list.
1820//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1821//    -> free the memory for the handle.
1822LIBFDB_API
1823fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1824{
1825    fdb_status fs;
1826
1827    if (!handle) {
1828        return FDB_RESULT_INVALID_HANDLE;
1829    }
1830    if (handle->num_iterators) {
1831        // There are still active iterators created from this handle
1832        return FDB_RESULT_KV_STORE_BUSY;
1833    }
1834
1835    if (handle->shandle && handle->kvs == NULL) {
1836        // snapshot of the default KV store + single KV store mode
1837        // directly close handle
1838        // (snapshot of the other KV stores will be closed
1839        //  using _fdb_kvs_close(...) below)
1840        fs = _fdb_close(handle);
1841        if (fs == FDB_RESULT_SUCCESS) {
1842            free(handle);
1843        }
1844        return fs;
1845    }
1846
1847    if (handle->kvs == NULL ||
1848        handle->kvs->type == KVS_ROOT) {
1849        // the default KV store handle
1850
1851        fdb_assert(handle->fhandle->root != handle, handle, NULL);
1852        // the default KV store but not the root handle .. normally close
1853        spin_lock(&handle->fhandle->lock);
1854        fs = _fdb_close(handle);
1855        if (fs == FDB_RESULT_SUCCESS) {
1856            // remove from 'handles' list in the root node
1857            if (handle->kvs) {
1858                fdb_kvs_info_free(handle);
1859            }
1860            list_remove(handle->fhandle->handles, &handle->node->le);
1861            spin_unlock(&handle->fhandle->lock);
1862            free(handle->node);
1863            free(handle);
1864        } else {
1865            spin_unlock(&handle->fhandle->lock);
1866        }
1867        return fs;
1868    }
1869
1870    if (handle->kvs && handle->kvs->root == NULL) {
1871        return FDB_RESULT_INVALID_ARGS;
1872    }
1873    fs = _fdb_kvs_close(handle);
1874    if (fs == FDB_RESULT_SUCCESS) {
1875        fdb_kvs_info_free(handle);
1876        free(handle);
1877    }
1878    return fs;
1879}
1880
1881static
1882fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
1883                           const char *kvs_name,
1884                           bool rollback_recreate)
1885{
1886    size_t size_chunk, size_id;
1887    uint8_t *_kv_id;
1888    fdb_status fs = FDB_RESULT_SUCCESS;
1889    fdb_kvs_id_t kv_id = 0;
1890    fdb_kvs_handle *root_handle;
1891    struct avl_node *a = NULL;
1892    struct filemgr *file;
1893    struct kvs_node *node, query;
1894    struct kvs_header *kv_header;
1895
1896    if (!fhandle || !fhandle->root) {
1897        return FDB_RESULT_INVALID_HANDLE;
1898    }
1899
1900    root_handle = fhandle->root;
1901
1902    if (root_handle->config.multi_kv_instances == false) {
1903        // cannot remove the KV instance under single DB instance mode
1904        return FDB_RESULT_INVALID_CONFIG;
1905    }
1906    if (root_handle->kvs->type != KVS_ROOT) {
1907        return FDB_RESULT_INVALID_HANDLE;
1908    }
1909
1910fdb_kvs_remove_start:
1911    fdb_check_file_reopen(root_handle, NULL);
1912    filemgr_mutex_lock(root_handle->file);
1913    fdb_sync_db_header(root_handle);
1914
1915    if (!rollback_recreate) {
1916        if (filemgr_is_rollback_on(root_handle->file)) {
1917            filemgr_mutex_unlock(root_handle->file);
1918            return FDB_RESULT_FAIL_BY_ROLLBACK;
1919        }
1920    }
1921
1922    file = root_handle->file;
1923
1924    file_status_t fstatus = filemgr_get_file_status(file);
1925    if (fstatus == FILE_REMOVED_PENDING) {
1926        // we must not write into this file
1927        // file status was changed by other thread .. start over
1928        filemgr_mutex_unlock(file);
1929        goto fdb_kvs_remove_start;
1930    } else if (fstatus == FILE_COMPACT_OLD) {
1931        // Cannot remove existing KV store during compaction.
1932        // To remove a KV store, the corresponding first chunk in HB+trie
1933        // should be unlinked. This can be possible in the old file during
1934        // compaction, but impossible in the new file, since existing documents
1935        // (including docs belonging to the KV store to be removed) are being moved.
1936        filemgr_mutex_unlock(file);
1937        return FDB_RESULT_FAIL_BY_COMPACTION;
1938    }
1939
1940    // find the kvs_node and remove
1941
1942    // search by name to get ID
1943    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1944        if (!rollback_recreate) {
1945            // default KV store .. KV ID = 0
1946            kv_id = 0;
1947            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1948                // there is an opened handle
1949                filemgr_mutex_unlock(file);
1950                return FDB_RESULT_KV_STORE_BUSY;
1951            }
1952        }
1953        // reset KVS stats (excepting for WAL stats)
1954        file->header.stat.ndocs = 0;
1955        file->header.stat.nlivenodes = 0;
1956        file->header.stat.datasize = 0;
1957        file->header.stat.deltasize = 0;
1958
1959        // reset seqnum
1960        filemgr_set_seqnum(file, 0);
1961    } else {
1962        kv_header = file->kv_header;
1963        spin_lock(&kv_header->lock);
1964        query.kvs_name = (char*)kvs_name;
1965        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1966        if (a == NULL) { // KV name doesn't exist
1967            spin_unlock(&kv_header->lock);
1968            filemgr_mutex_unlock(file);
1969            return FDB_RESULT_KV_STORE_NOT_FOUND;
1970        }
1971        node = _get_entry(a, struct kvs_node, avl_name);
1972        kv_id = node->id;
1973
1974        if (!rollback_recreate) {
1975            spin_unlock(&kv_header->lock);
1976            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1977                // there is an opened handle
1978                filemgr_mutex_unlock(file);
1979                return FDB_RESULT_KV_STORE_BUSY;
1980            }
1981            spin_lock(&kv_header->lock);
1982
1983            avl_remove(kv_header->idx_name, &node->avl_name);
1984            avl_remove(kv_header->idx_id, &node->avl_id);
1985            --kv_header->num_kv_stores;
1986            spin_unlock(&kv_header->lock);
1987
1988            kv_id = node->id;
1989
1990            // free node
1991            free(node->kvs_name);
1992            free(node);
1993        } else {
1994            // reset all stats except for WAL
1995            node->stat.ndocs = 0;
1996            node->stat.nlivenodes = 0;
1997            node->stat.datasize = 0;
1998            node->stat.deltasize = 0;
1999            node->seqnum = 0;
2000            spin_unlock(&kv_header->lock);
2001        }
2002    }
2003
2004    // discard all WAL entries
2005    wal_close_kv_ins(file, kv_id, &root_handle->log_callback);
2006
2007    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2008    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2009    struct filemgr_dirty_update_node *prev_node = NULL, *new_node = NULL;
2010
2011    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
2012                            &dirty_idtree_root, &dirty_seqtree_root, false);
2013
2014    size_id = sizeof(fdb_kvs_id_t);
2015    size_chunk = root_handle->trie->chunksize;
2016
2017    // remove from super handle's HB+trie
2018    _kv_id = alca(uint8_t, size_chunk);
2019    kvid2buf(size_chunk, kv_id, _kv_id);
2020    hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
2021    btreeblk_end(root_handle->bhandle);
2022
2023    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2024        _kv_id = alca(uint8_t, size_id);
2025        kvid2buf(size_id, kv_id, _kv_id);
2026        hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
2027        btreeblk_end(root_handle->bhandle);
2028    }
2029
2030    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
2031                               &dirty_idtree_root, &dirty_seqtree_root, true);
2032
2033    // append system doc
2034    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
2035
2036    // if no compaction is being performed, append header and commit
2037    if (root_handle->file == file) {
2038        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
2039        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
2040        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
2041        fs = filemgr_commit_bid(root_handle->file,
2042                                root_handle->last_hdr_bid,
2043                                cur_bmp_revnum,
2044                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
2045                                &root_handle->log_callback);
2046        btreeblk_reset_subblock_info(root_handle->bhandle);
2047    }
2048
2049    filemgr_mutex_unlock(file);
2050
2051    return fs;
2052}
2053
2054bool _fdb_kvs_is_busy(fdb_file_handle *fhandle)
2055{
2056    bool ret = false;
2057    struct filemgr *file = fhandle->root->file;
2058    struct avl_node *a;
2059    struct filemgr_fhandle_idx_node *fhandle_node;
2060    fdb_file_handle *file_handle;
2061
2062    spin_lock(&file->fhandle_idx_lock);
2063    a = avl_first(&file->fhandle_idx);
2064    while (a) {
2065        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2066        a = avl_next(a);
2067        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
2068        spin_lock(&file_handle->lock);
2069        if (list_begin(file_handle->handles) != NULL) {
2070            ret = true;
2071            spin_unlock(&file_handle->lock);
2072            break;
2073        }
2074        spin_unlock(&file_handle->lock);
2075    }
2076    spin_unlock(&file->fhandle_idx_lock);
2077
2078    return ret;
2079}
2080
2081fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
2082{
2083    fdb_config config;
2084    fdb_kvs_config kvs_config;
2085    fdb_kvs_handle *handle_in, *handle, *super_handle;
2086    fdb_status fs;
2087    fdb_seqnum_t old_seqnum;
2088    fdb_file_handle *fhandle;
2089    char *kvs_name;
2090
2091    if (!handle_ptr) {
2092        return FDB_RESULT_INVALID_HANDLE;
2093    }
2094
2095    handle_in = *handle_ptr;
2096
2097    if (!handle_in) {
2098        return FDB_RESULT_INVALID_HANDLE;
2099    }
2100
2101    if (!handle_in->kvs) {
2102        return FDB_RESULT_INVALID_ARGS;
2103    }
2104    super_handle = handle_in->kvs->root;
2105    fhandle = handle_in->fhandle;
2106    config = handle_in->config;
2107    kvs_config = handle_in->kvs_config;
2108
2109    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
2110        return fdb_log(&handle_in->log_callback,
2111                       FDB_RESULT_RONLY_VIOLATION,
2112                       "Warning: Rollback is not allowed on "
2113                       "the read-only DB file '%s'.",
2114                       handle_in->file->filename);
2115    }
2116
2117    filemgr_mutex_lock(handle_in->file);
2118    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
2119    // All transactions should be closed before rollback
2120    if (wal_txn_exists(handle_in->file)) {
2121        filemgr_set_rollback(handle_in->file, 0);
2122        filemgr_mutex_unlock(handle_in->file);
2123        return FDB_RESULT_FAIL_BY_TRANSACTION;
2124    }
2125
2126    // If compaction is running, wait until it is aborted.
2127    // TODO: Find a better way of waiting for the compaction abortion.
2128    unsigned int sleep_time = 10000; // 10 ms.
2129    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
2130    while (fstatus == FILE_COMPACT_OLD) {
2131        filemgr_mutex_unlock(handle_in->file);
2132        decaying_usleep(&sleep_time, 1000000);
2133        filemgr_mutex_lock(handle_in->file);
2134        fstatus = filemgr_get_file_status(handle_in->file);
2135    }
2136    if (fstatus == FILE_REMOVED_PENDING) {
2137        filemgr_mutex_unlock(handle_in->file);
2138        fdb_check_file_reopen(handle_in, NULL);
2139    } else {
2140        filemgr_mutex_unlock(handle_in->file);
2141    }
2142
2143    fdb_sync_db_header(handle_in);
2144
2145    // if the max sequence number seen by this handle is lower than the
2146    // requested snapshot marker, it means the snapshot is not yet visible
2147    // even via the current fdb_kvs_handle
2148    if (seqnum > handle_in->seqnum) {
2149        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2150        return FDB_RESULT_NO_DB_INSTANCE;
2151    }
2152
2153    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
2154    if (seqnum == 0) { // Handle special case of rollback to zero..
2155        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
2156        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2157        return fs;
2158    }
2159
2160    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
2161    if (!handle) { // LCOV_EXCL_START
2162        filemgr_set_rollback(handle_in->file, 0); // allow mutations
2163        return FDB_RESULT_ALLOC_FAIL;
2164    } // LCOV_EXCL_STOP
2165
2166    handle->max_seqnum = seqnum;
2167    handle->log_callback = handle_in->log_callback;
2168    handle->fhandle = fhandle;
2169    atomic_init_uint8_t(&handle->handle_busy, 0);
2170
2171    if (handle_in->kvs->type == KVS_SUB) {
2172        fs = _fdb_kvs_open(handle_in->kvs->root,
2173                           &config,
2174                           &kvs_config,
2175                           handle_in->file,
2176                           handle_in->file->filename,
2177                           kvs_name,
2178                           handle);
2179    } else {
2180        fs = _fdb_open(handle, handle_in->file->filename,
2181                       FDB_AFILENAME, &config);
2182    }
2183    filemgr_set_rollback(handle_in->file, 0); // allow mutations
2184
2185    if (fs == FDB_RESULT_SUCCESS) {
2186        // get KV instance's sub B+trees' root node BIDs
2187        // from both ID-tree and Seq-tree, AND
2188        // replace current handle's sub B+trees' root node BIDs
2189        // by old BIDs
2190        size_t size_chunk, size_id;
2191        bid_t id_root, seq_root, dummy;
2192        uint8_t *_kv_id;
2193        hbtrie_result hr;
2194
2195        size_chunk = handle->trie->chunksize;
2196        size_id = sizeof(fdb_kvs_id_t);
2197
2198        filemgr_mutex_lock(handle_in->file);
2199
2200        // read root BID of the KV instance from the old handle
2201        // and overwrite into the current handle
2202        _kv_id = alca(uint8_t, size_chunk);
2203        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
2204        hr = hbtrie_find_partial(handle->trie, _kv_id,
2205                                 size_chunk, &id_root);
2206        btreeblk_end(handle->bhandle);
2207        if (hr == HBTRIE_RESULT_SUCCESS) {
2208            hbtrie_insert_partial(super_handle->trie,
2209                                  _kv_id, size_chunk,
2210                                  &id_root, &dummy);
2211        } else { // No Trie info in rollback header.
2212                 // Erase kv store from super handle's main index.
2213            hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
2214        }
2215        btreeblk_end(super_handle->bhandle);
2216
2217        if (config.seqtree_opt == FDB_SEQTREE_USE) {
2218            // same as above for seq-trie
2219            _kv_id = alca(uint8_t, size_id);
2220            kvid2buf(size_id, handle->kvs->id, _kv_id);
2221            hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
2222                                     size_id, &seq_root);
2223            btreeblk_end(handle->bhandle);
2224            if (hr == HBTRIE_RESULT_SUCCESS) {
2225                hbtrie_insert_partial(super_handle->seqtrie,
2226                                      _kv_id, size_id,
2227                                      &seq_root, &dummy);
2228            } else { // No seqtrie info in rollback header.
2229                     // Erase kv store from super handle's seqtrie index.
2230                hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
2231            }
2232            btreeblk_end(super_handle->bhandle);
2233        }
2234
2235        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
2236                                        handle_in->kvs->id);
2237        fdb_kvs_set_seqnum(handle_in->file,
2238                           handle_in->kvs->id, seqnum);
2239        handle_in->seqnum = seqnum;
2240        filemgr_mutex_unlock(handle_in->file);
2241
2242        super_handle->rollback_revnum = handle->rollback_revnum;
2243        fs = _fdb_commit(super_handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
2244                         !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
2245        if (fs == FDB_RESULT_SUCCESS) {
2246            _fdb_kvs_close(handle);
2247            *handle_ptr = handle_in;
2248            fdb_kvs_info_free(handle);
2249            free(handle);
2250        } else {
2251            // cancel the rolling-back of the sequence number
2252            fdb_log(&handle_in->log_callback, fs,
2253                    "Rollback failed due to a commit failure with a sequence "
2254                    "number %" _F64, seqnum);
2255            filemgr_mutex_lock(handle_in->file);
2256            fdb_kvs_set_seqnum(handle_in->file,
2257                               handle_in->kvs->id, old_seqnum);
2258            filemgr_mutex_unlock(handle_in->file);
2259            _fdb_kvs_close(handle);
2260            fdb_kvs_info_free(handle);
2261            free(handle);
2262        }
2263    } else {
2264        free(handle);
2265    }
2266
2267    return fs;
2268}
2269
2270LIBFDB_API
2271fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2272                          const char *kvs_name)
2273{
2274    return _fdb_kvs_remove(fhandle, kvs_name, false);
2275}
2276
2277LIBFDB_API
2278fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2279{
2280    uint64_t ndocs;
2281    uint64_t ndeletes;
2282    uint64_t wal_docs;
2283    uint64_t wal_deletes;
2284    uint64_t wal_n_inserts;
2285    uint64_t datasize;
2286    uint64_t nlivenodes;
2287    fdb_kvs_id_t kv_id;
2288    struct avl_node *a;
2289    struct filemgr *file;
2290    struct kvs_node *node, query;
2291    struct kvs_header *kv_header;
2292    struct kvs_stat stat;
2293
2294    if (!handle) {
2295        return FDB_RESULT_INVALID_HANDLE;
2296    }
2297
2298    if (!info) {
2299        return FDB_RESULT_INVALID_ARGS;
2300    }
2301
2302    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2303        return FDB_RESULT_HANDLE_BUSY;
2304    }
2305
2306    if (!handle->shandle) { // snapshot handle should be immutable
2307        fdb_check_file_reopen(handle, NULL);
2308        fdb_sync_db_header(handle);
2309    }
2310
2311    file = handle->file;
2312
2313    if (handle->kvs == NULL) {
2314        info->name = default_kvs_name;
2315        kv_id = 0;
2316
2317    } else {
2318        kv_header = file->kv_header;
2319        kv_id = handle->kvs->id;
2320        spin_lock(&kv_header->lock);
2321
2322        query.id = handle->kvs->id;
2323        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2324        if (a) { // sub handle
2325            node = _get_entry(a, struct kvs_node, avl_id);
2326            info->name = (const char*)node->kvs_name;
2327        } else { // root handle
2328            info->name = default_kvs_name;
2329        }
2330        spin_unlock(&kv_header->lock);
2331    }
2332
2333    if (handle->shandle) {
2334        // snapshot .. get its local stats
2335        snap_get_stat(handle->shandle, &stat);
2336    } else {
2337        _kvs_stat_get(file, kv_id, &stat);
2338    }
2339    ndocs = stat.ndocs;
2340    ndeletes = stat.ndeletes;
2341    wal_docs = stat.wal_ndocs;
2342    wal_deletes = stat.wal_ndeletes;
2343    wal_n_inserts = wal_docs - wal_deletes;
2344
2345    if (ndocs + wal_n_inserts < wal_deletes) {
2346        info->doc_count = 0;
2347    } else {
2348        if (ndocs) { // not accurate since some ndocs may be in wal_n_inserts
2349            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2350        } else { // this is accurate
2351            info->doc_count = wal_n_inserts;
2352        }
2353    }
2354
2355    if (ndeletes) { // not accurate since some ndeletes may be wal_n_deletes
2356        info->deleted_count = ndeletes + wal_deletes;
2357    } else { // this is accurate
2358        info->deleted_count = wal_deletes;
2359    }
2360
2361    datasize = stat.datasize;
2362    nlivenodes = stat.nlivenodes;
2363
2364    info->space_used = datasize;
2365    info->space_used += nlivenodes * handle->config.blocksize;
2366    info->file = handle->fhandle;
2367
2368    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2369
2370    // This is another LIBFDB_API call, so handle is marked as free
2371    // in the line above before making this call
2372    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2373
2374    return FDB_RESULT_SUCCESS;
2375}
2376
2377LIBFDB_API
2378fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2379{
2380    fdb_kvs_id_t kv_id;
2381    struct filemgr *file;
2382    struct kvs_ops_stat stat;
2383    struct kvs_ops_stat root_stat;
2384
2385    if (!handle) {
2386        return FDB_RESULT_INVALID_HANDLE;
2387    }
2388
2389    if (!info) {
2390        return FDB_RESULT_INVALID_ARGS;
2391    }
2392
2393    fdb_kvs_handle *root_handle = handle->fhandle->root;
2394
2395    // for snapshot handle do not reopen new file as user is interested in
2396    // reader stats from the old file
2397    if (!handle->shandle) {
2398        // always get stats from the latest file
2399        fdb_check_file_reopen(handle, NULL);
2400        fdb_sync_db_header(handle);
2401    }
2402
2403    file = handle->file;
2404
2405    if (handle->kvs == NULL) {
2406        kv_id = 0;
2407    } else {
2408        kv_id = handle->kvs->id;
2409    }
2410
2411    _kvs_ops_stat_get(file, kv_id, &stat);
2412
2413    if (root_handle != handle) {
2414        _kvs_ops_stat_get(file, 0, &root_stat);
2415    } else {
2416        root_stat = stat;
2417    }
2418
2419    info->num_sets = atomic_get_uint64_t(&stat.num_sets, std::memory_order_relaxed);
2420    info->num_dels = atomic_get_uint64_t(&stat.num_dels, std::memory_order_relaxed);
2421    info->num_gets = atomic_get_uint64_t(&stat.num_gets, std::memory_order_relaxed);
2422    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2423                                                  std::memory_order_relaxed);
2424    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2425                                                  std::memory_order_relaxed);
2426    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves,
2427                                                   std::memory_order_relaxed);
2428
2429    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits,
2430                                            std::memory_order_relaxed);
2431    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts,
2432                                             std::memory_order_relaxed);
2433    return FDB_RESULT_SUCCESS;
2434}
2435
2436LIBFDB_API
2437fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2438                                 fdb_kvs_name_list *kvs_name_list)
2439{
2440    size_t num, size, offset;
2441    char *ptr;
2442    char **segment;
2443    fdb_kvs_handle *root_handle;
2444    struct kvs_header *kv_header;
2445    struct kvs_node *node;
2446    struct avl_node *a;
2447
2448    if (!fhandle) {
2449        return FDB_RESULT_INVALID_HANDLE;
2450    }
2451
2452    if (!kvs_name_list) {
2453        return FDB_RESULT_INVALID_ARGS;
2454    }
2455
2456    root_handle = fhandle->root;
2457    kv_header = root_handle->file->kv_header;
2458
2459    spin_lock(&kv_header->lock);
2460    // sum all lengths of KVS names first
2461    // (to calculate the size of memory segment to be allocated)
2462    num = 1;
2463    size = strlen(default_kvs_name) + 1;
2464    a = avl_first(kv_header->idx_id);
2465    while (a) {
2466        node = _get_entry(a, struct kvs_node, avl_id);
2467        a = avl_next(&node->avl_id);
2468
2469        num++;
2470        size += strlen(node->kvs_name) + 1;
2471    }
2472    size += num * sizeof(char*);
2473
2474    // allocate memory segment
2475    segment = (char**)calloc(1, size);
2476    kvs_name_list->num_kvs_names = num;
2477    kvs_name_list->kvs_names = segment;
2478
2479    ptr = (char*)segment + num * sizeof(char*);
2480    offset = num = 0;
2481
2482    // copy default KVS name
2483    strcpy(ptr + offset, default_kvs_name);
2484    segment[num] = ptr + offset;
2485    num++;
2486    offset += strlen(default_kvs_name) + 1;
2487
2488    // copy the others
2489    a = avl_first(kv_header->idx_name);
2490    while (a) {
2491        node = _get_entry(a, struct kvs_node, avl_name);
2492        a = avl_next(&node->avl_name);
2493
2494        strcpy(ptr + offset, node->kvs_name);
2495        segment[num] = ptr + offset;
2496
2497        num++;
2498        offset += strlen(node->kvs_name) + 1;
2499    }
2500
2501    spin_unlock(&kv_header->lock);
2502
2503    return FDB_RESULT_SUCCESS;
2504}
2505
2506LIBFDB_API
2507fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2508{
2509    if (!kvs_name_list) {
2510        return FDB_RESULT_INVALID_ARGS;
2511    }
2512
2513    free(kvs_name_list->kvs_names);
2514    kvs_name_list->kvs_names = NULL;
2515    kvs_name_list->num_kvs_names = 0;
2516
2517    return FDB_RESULT_SUCCESS;
2518}
2519
2520stale_header_info fdb_get_smallest_active_header(fdb_kvs_handle *handle)
2521{
2522    uint8_t *hdr_buf = alca(uint8_t, handle->config.blocksize);
2523    size_t i, hdr_len;
2524    uint64_t n_headers;
2525    bid_t hdr_bid, last_wal_bid;
2526    filemgr_header_revnum_t hdr_revnum;
2527    filemgr_header_revnum_t cur_revnum;
2528    filemgr_magic_t magic;
2529    fdb_seqnum_t seqnum;
2530    fdb_file_handle *fhandle = NULL;
2531    stale_header_info ret;
2532    struct avl_node *a;
2533    struct filemgr_fhandle_idx_node *fhandle_node;
2534    struct list_elem *e;
2535    struct kvs_opened_node *item;
2536
2537    ret.revnum = cur_revnum = handle->fhandle->root->cur_header_revnum;
2538    ret.bid = handle->fhandle->root->last_hdr_bid;
2539
2540    spin_lock(&handle->file->fhandle_idx_lock);
2541
2542    // check all opened file handles
2543    a = avl_first(&handle->file->fhandle_idx);
2544    while (a) {
2545        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2546        a = avl_next(a);
2547
2548        fhandle = (fdb_file_handle*)fhandle_node->fhandle;
2549        spin_lock(&fhandle->lock);
2550        // check all opened KVS handles belonging to the file handle
2551        e = list_begin(fhandle->handles);
2552        while (e) {
2553
2554            item = _get_entry(e, struct kvs_opened_node, le);
2555            e = list_next(e);
2556
2557            if (!item->handle->shandle) {
2558                // Only consider active snapshot handles since non-snapshot
2559                // handles will get synced upon their next forestdb api call.
2560                // This prevents "lazy" non-snapshot handles from holding up
2561                // stale block reclaim.
2562                continue;
2563            }
2564
2565            if (item->handle->cur_header_revnum < ret.revnum) {
2566                ret.revnum = item->handle->cur_header_revnum;
2567                ret.bid = item->handle->last_hdr_bid;
2568            }
2569        }
2570        spin_unlock(&fhandle->lock);
2571    }
2572
2573    spin_unlock(&handle->file->fhandle_idx_lock);
2574
2575    uint64_t num_keeping_headers =
2576        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
2577                            std::memory_order_relaxed);
2578    if (num_keeping_headers) {
2579        // backward scan previous header info to keep more headers
2580
2581        if (ret.bid == handle->last_hdr_bid) {
2582            // header in 'handle->last_hdr_bid' is not written into file yet!
2583            // we should start from the previous header
2584            hdr_bid = atomic_get_uint64_t(&handle->file->header.bid);
2585            hdr_revnum = handle->file->header.revnum - 1;
2586        } else {
2587            hdr_bid = ret.bid;
2588            hdr_revnum = ret.revnum;
2589        }
2590
2591        n_headers= num_keeping_headers;
2592        if (cur_revnum - hdr_revnum < n_headers) {
2593            n_headers = n_headers - (cur_revnum - hdr_revnum);
2594        } else {
2595            n_headers = 0;
2596        }
2597
2598        for (i=0; i<n_headers; ++i) {
2599            hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
2600                         hdr_buf, &hdr_len, &seqnum, &hdr_revnum, NULL,
2601                         &magic, NULL, &handle->log_callback);
2602            if (hdr_len) {
2603                ret.revnum = hdr_revnum;
2604                ret.bid = hdr_bid;
2605            } else {
2606                break;
2607            }
2608        }
2609    }
2610
2611    // although we keep more headers from the oldest active header, we have to
2612    // preserve the last WAL flushing header from the target header for data
2613    // consistency.
2614    uint64_t dummy64;
2615    char *new_filename;
2616
2617    filemgr_fetch_header(handle->file, ret.bid, hdr_buf, &hdr_len, &seqnum,
2618                         &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2619    fdb_fetch_header(magic, hdr_buf, &dummy64, &dummy64, &dummy64, &dummy64,
2620                     &dummy64, &dummy64, &dummy64, &last_wal_bid, &dummy64,
2621                     &dummy64, &new_filename, NULL);
2622
2623    if (last_wal_bid != BLK_NOT_FOUND) {
2624        filemgr_fetch_header(handle->file, last_wal_bid, hdr_buf, &hdr_len, &seqnum,
2625                             &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2626        ret.bid = last_wal_bid;
2627        ret.revnum = hdr_revnum;
2628    } else {
2629        // WAL has not been flushed yet .. we cannot trigger block reusing
2630        ret.bid = BLK_NOT_FOUND;
2631        ret.revnum = 0;
2632    }
2633
2634    return ret;
2635}
2636
2637