xref: /6.0.3/forestdb/src/kv_instance.cc (revision 5403f419)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "version.h"
34#include "staleblock.h"
35
36#include "memleak.h"
37#include "time_utils.h"
38
// Name of the default KV store. In this file a custom-cmp list entry whose
// name equals this string is treated the same as an entry with a NULL name.
static const char *default_kvs_name = DEFAULT_KVS_NAME;
40
// list element for opened KV store handles
// (in-memory data: managed by the file handle)
// One element is allocated per sub handle in fdb_kvs_info_create() and is
// linked into the 'handles' list of the root handle's file handle.
struct kvs_opened_node {
    fdb_kvs_handle *handle; // the opened KV store handle this element tracks
    struct list_elem le;    // linkage into fdb_file_handle 'handles' list
};
47
// list element for custom cmp functions in fhandle
// Each element pairs a KV store name with the compare function to use for it.
struct cmp_func_node {
    char *kvs_name;               // heap copy of the KV store name; NULL means the default KVS
    fdb_custom_cmp_variable func; // custom compare function for that KV store
    struct list_elem le;          // linkage into a cmp function list
};
54
55static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
56{
57    struct kvs_node *aa, *bb;
58    aa = _get_entry(a, struct kvs_node, avl_name);
59    bb = _get_entry(b, struct kvs_node, avl_name);
60    return strcmp(aa->kvs_name, bb->kvs_name);
61}
62
63static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
64{
65    struct kvs_node *aa, *bb;
66    aa = _get_entry(a, struct kvs_node, avl_id);
67    bb = _get_entry(b, struct kvs_node, avl_id);
68
69    if (aa->id < bb->id) {
70        return -1;
71    } else if (aa->id > bb->id) {
72        return 1;
73    } else {
74        return 0;
75    }
76}
77
78static bool _fdb_kvs_any_handle_opened(fdb_file_handle *fhandle,
79                                       fdb_kvs_id_t kv_id)
80{
81    struct filemgr *file = fhandle->root->file;
82    struct avl_node *a;
83    struct list_elem *e;
84    struct filemgr_fhandle_idx_node *fhandle_node;
85    struct kvs_opened_node *opened_node;
86    fdb_file_handle *file_handle;
87
88    spin_lock(&file->fhandle_idx_lock);
89    a = avl_first(&file->fhandle_idx);
90    while (a) {
91        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
92        a = avl_next(a);
93        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
94        spin_lock(&file_handle->lock);
95        e = list_begin(file_handle->handles);
96        while (e) {
97            opened_node = _get_entry(e, struct kvs_opened_node, le);
98            if ((opened_node->handle->kvs && opened_node->handle->kvs->id == kv_id) ||
99                (kv_id == 0 && opened_node->handle->kvs == NULL)) // single KVS mode
100            {
101                // there is an opened handle
102                spin_unlock(&file_handle->lock);
103                spin_unlock(&file->fhandle_idx_lock);
104                return true;
105            }
106            e = list_next(e);
107        }
108        spin_unlock(&file_handle->lock);
109    }
110    spin_unlock(&file->fhandle_idx_lock);
111
112    return false;
113}
114
115void fdb_file_handle_init(fdb_file_handle *fhandle,
116                           fdb_kvs_handle *root)
117{
118    fhandle->root = root;
119    fhandle->flags = 0x0;
120    root->fhandle = fhandle;
121    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
122    fhandle->cmp_func_list = NULL;
123    spin_init(&fhandle->lock);
124}
125
126void fdb_file_handle_close_all(fdb_file_handle *fhandle)
127{
128    struct list_elem *e;
129    struct kvs_opened_node *node;
130
131    spin_lock(&fhandle->lock);
132    e = list_begin(fhandle->handles);
133    while (e) {
134        node = _get_entry(e, struct kvs_opened_node, le);
135        e = list_next(e);
136        _fdb_close(node->handle);
137        free(node->handle);
138        free(node);
139    }
140    spin_unlock(&fhandle->lock);
141}
142
143void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
144                                    size_t n_func,
145                                    char **kvs_names,
146                                    fdb_custom_cmp_variable *functions)
147{
148    uint64_t i;
149    struct cmp_func_node *node;
150
151    if (n_func == 0 || !kvs_names || !functions) {
152        return;
153    }
154
155    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
156    list_init(fhandle->cmp_func_list);
157
158    for (i=0;i<n_func;++i){
159        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
160        if (kvs_names[i]) {
161            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
162            strcpy(node->kvs_name, kvs_names[i]);
163        } else {
164            // NULL .. default KVS
165            node->kvs_name = NULL;
166        }
167        node->func = functions[i];
168        list_push_back(fhandle->cmp_func_list, &node->le);
169    }
170}
171
172// clone all items in cmp_func_list to fhandle->cmp_func_list
173void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
174                                         struct list *cmp_func_list)
175{
176    struct list_elem *e;
177    struct cmp_func_node *src, *dst;
178
179    if (fhandle->cmp_func_list || /* already exist */
180        !cmp_func_list) {
181        return;
182    }
183
184    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
185    list_init(fhandle->cmp_func_list);
186
187    e = list_begin(cmp_func_list);
188    while (e) {
189        src = _get_entry(e, struct cmp_func_node, le);
190        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
191        if (src->kvs_name) {
192            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
193            strcpy(dst->kvs_name, src->kvs_name);
194        } else {
195            dst->kvs_name = NULL; // default KVS
196        }
197        dst->func = src->func;
198        list_push_back(fhandle->cmp_func_list, &dst->le);
199        e = list_next(&src->le);
200    }
201}
202
203void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
204                                  char *kvs_name,
205                                  fdb_custom_cmp_variable cmp_func)
206{
207    struct cmp_func_node *node;
208
209    // create list if not exist
210    if (!fhandle->cmp_func_list) {
211        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
212        list_init(fhandle->cmp_func_list);
213    }
214
215    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
216    if (kvs_name) {
217        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
218        strcpy(node->kvs_name, kvs_name);
219    } else {
220        // default KVS
221        node->kvs_name = NULL;
222    }
223    node->func = cmp_func;
224    list_push_back(fhandle->cmp_func_list, &node->le);
225}
226
227void fdb_cmp_func_list_from_filemgr(struct filemgr *file, struct list *cmp_func_list)
228{
229    if (!file || !file->kv_header || !cmp_func_list) {
230        return;
231    }
232
233    struct cmp_func_node *node;
234
235    spin_lock(&file->kv_header->lock);
236    // Default KV store cmp function
237    if (file->kv_header->default_kvs_cmp) {
238        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
239        node->func = file->kv_header->default_kvs_cmp;
240        node->kvs_name = NULL;
241        list_push_back(cmp_func_list, &node->le);
242    }
243
244    // Rest of KV stores
245    struct kvs_node *kvs_node;
246    struct avl_node *a = avl_first(file->kv_header->idx_name);
247    while (a) {
248        kvs_node = _get_entry(a, struct kvs_node, avl_name);
249        a = avl_next(a);
250        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
251        node->func = kvs_node->custom_cmp;
252        node->kvs_name = (char*)calloc(1, strlen(kvs_node->kvs_name)+1);
253        strcpy(node->kvs_name, kvs_node->kvs_name);
254        list_push_back(cmp_func_list, &node->le);
255    }
256    spin_unlock(&file->kv_header->lock);
257}
258
259void fdb_free_cmp_func_list(struct list *cmp_func_list)
260{
261    if (!cmp_func_list) {
262        return;
263    }
264
265    struct cmp_func_node *cmp_node;
266    struct list_elem *e = list_begin(cmp_func_list);
267    while (e) {
268        cmp_node = _get_entry(e, struct cmp_func_node, le);
269        e = list_remove(cmp_func_list, &cmp_node->le);
270        free(cmp_node->kvs_name);
271        free(cmp_node);
272    }
273}
274
275static void _free_cmp_func_list(fdb_file_handle *fhandle)
276{
277    struct list_elem *e;
278    struct cmp_func_node *cmp_node;
279
280    if (!fhandle->cmp_func_list) {
281        return;
282    }
283
284    e = list_begin(fhandle->cmp_func_list);
285    while (e) {
286        cmp_node = _get_entry(e, struct cmp_func_node, le);
287        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
288
289        free(cmp_node->kvs_name);
290        free(cmp_node);
291    }
292    free(fhandle->cmp_func_list);
293    fhandle->cmp_func_list = NULL;
294}
295
296void fdb_file_handle_free(fdb_file_handle *fhandle)
297{
298    free(fhandle->handles);
299    _free_cmp_func_list(fhandle);
300    spin_destroy(&fhandle->lock);
301    free(fhandle);
302}
303
304fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
305{
306    int ori_flag;
307    fdb_file_handle *fhandle = handle->fhandle;
308    fdb_custom_cmp_variable ori_custom_cmp;
309    struct filemgr *file = handle->file;
310    struct cmp_func_node *cmp_node;
311    struct kvs_node *kvs_node, query;
312    struct list_elem *e;
313    struct avl_node *a;
314
315    spin_lock(&file->kv_header->lock);
316    ori_flag = file->kv_header->custom_cmp_enabled;
317    ori_custom_cmp = file->kv_header->default_kvs_cmp;
318
319    if (fhandle->cmp_func_list) {
320        handle->kvs_config.custom_cmp = NULL;
321
322        e = list_begin(fhandle->cmp_func_list);
323        while (e) {
324            cmp_node = _get_entry(e, struct cmp_func_node, le);
325            if (cmp_node->kvs_name == NULL ||
326                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
327                handle->kvs_config.custom_cmp = cmp_node->func;
328                file->kv_header->default_kvs_cmp = cmp_node->func;
329                file->kv_header->custom_cmp_enabled = 1;
330            } else {
331                // search by name
332                query.kvs_name = cmp_node->kvs_name;
333                a = avl_search(file->kv_header->idx_name,
334                               &query.avl_name,
335                               _kvs_cmp_name);
336                if (a) { // found
337                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
338                    if (!kvs_node->custom_cmp) {
339                        kvs_node->custom_cmp = cmp_node->func;
340                    }
341                    file->kv_header->custom_cmp_enabled = 1;
342                }
343            }
344            e = list_next(&cmp_node->le);
345        }
346    }
347
348    // first check the default KVS
349    // 1. root handle has not been opened yet: don't care
350    // 2. root handle was opened before: must match the flag
351    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
352        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
353            handle->kvs_config.custom_cmp == NULL) {
354            // custom cmp function was assigned before,
355            // but no custom cmp function is assigned
356            file->kv_header->custom_cmp_enabled = ori_flag;
357            file->kv_header->default_kvs_cmp = ori_custom_cmp;
358            spin_unlock(&file->kv_header->lock);
359            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
360            if (!kvs_name) {
361                kvs_name = DEFAULT_KVS_NAME;
362            }
363            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
364                           "Error! Tried to open a KV store '%s', which was created with "
365                           "custom compare function enabled, without passing the same "
366                           "custom compare function.", kvs_name);
367        }
368        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
369              handle->kvs_config.custom_cmp) {
370            // custom cmp function was not assigned before,
371            // but custom cmp function is assigned from user
372            file->kv_header->custom_cmp_enabled = ori_flag;
373            file->kv_header->default_kvs_cmp = ori_custom_cmp;
374            spin_unlock(&file->kv_header->lock);
375            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
376            if (!kvs_name) {
377                kvs_name = DEFAULT_KVS_NAME;
378            }
379            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
380                           "Error! Tried to open a KV store '%s', which was created without "
381                           "custom compare function, by passing custom compare function.",
382                    kvs_name);
383        }
384    }
385
386    // next check other KVSs
387    a = avl_first(file->kv_header->idx_name);
388    while (a) {
389        kvs_node = _get_entry(a, struct kvs_node, avl_name);
390        a = avl_next(a);
391
392        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
393            kvs_node->custom_cmp == NULL) {
394            // custom cmp function was assigned before,
395            // but no custom cmp function is assigned
396            file->kv_header->custom_cmp_enabled = ori_flag;
397            file->kv_header->default_kvs_cmp = ori_custom_cmp;
398            spin_unlock(&file->kv_header->lock);
399            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
400            if (!kvs_name) {
401                kvs_name = DEFAULT_KVS_NAME;
402            }
403            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
404                           "Error! Tried to open a KV store '%s', which was created with "
405                           "custom compare function enabled, without passing the same "
406                           "custom compare function.", kvs_name);
407        }
408        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
409              kvs_node->custom_cmp) {
410            // custom cmp function was not assigned before,
411            // but custom cmp function is assigned from user
412            file->kv_header->custom_cmp_enabled = ori_flag;
413            file->kv_header->default_kvs_cmp = ori_custom_cmp;
414            spin_unlock(&file->kv_header->lock);
415            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
416            if (!kvs_name) {
417                kvs_name = DEFAULT_KVS_NAME;
418            }
419            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
420                           "Error! Tried to open a KV store '%s', which was created without "
421                           "custom compare function, by passing custom compare function.",
422                           kvs_name);
423        }
424    }
425
426    spin_unlock(&file->kv_header->lock);
427    return FDB_RESULT_SUCCESS;
428}
429
430fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
431                                              char *kvs_name)
432{
433    fdb_file_handle *fhandle;
434    struct list_elem *e;
435    struct cmp_func_node *cmp_node;
436
437    fhandle = handle->fhandle;
438    if (!fhandle->cmp_func_list) {
439        return NULL;
440    }
441
442    e = list_begin(fhandle->cmp_func_list);
443    while (e) {
444        cmp_node = _get_entry(e, struct cmp_func_node, le);
445        if (kvs_name == NULL ||
446            !strcmp(kvs_name, default_kvs_name)) {
447            if (cmp_node->kvs_name == NULL ||
448                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
449                return cmp_node->func;
450            }
451        } else if (cmp_node->kvs_name &&
452                   !strcmp(cmp_node->kvs_name, kvs_name)) {
453            return cmp_node->func;
454        }
455        e = list_next(&cmp_node->le);
456    }
457    return NULL;
458}
459
460hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
461{
462    fdb_kvs_id_t kv_id;
463    struct hbtrie *trie = (struct hbtrie *)aux;
464    struct btreeblk_handle *bhandle;
465    struct filemgr *file;
466    struct avl_node *a;
467    struct kvs_node query, *node;
468
469    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
470    file = bhandle->file;
471
472    if (!file->kv_header->custom_cmp_enabled) {
473        return NULL;
474    }
475
476    buf2kvid(trie->chunksize, chunk, &kv_id);
477
478    // search by id
479    if (kv_id > 0) {
480        query.id = kv_id;
481        spin_lock(&file->kv_header->lock);
482        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
483        spin_unlock(&file->kv_header->lock);
484
485        if (a) {
486            node = _get_entry(a, struct kvs_node, avl_id);
487            return (hbtrie_cmp_func *)node->custom_cmp;
488        }
489    } else {
490        // root handle
491        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
492    }
493    return NULL;
494}
495
496void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
497    handle->kvs->type = KVS_ROOT;
498    handle->kvs->root = handle->fhandle->root;
499    // super handle's ID is always 0
500    handle->kvs->id = 0;
501    // force custom cmp function
502    spin_lock(&file->kv_header->lock);
503    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
504    spin_unlock(&file->kv_header->lock);
505}
506
507void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
508                         fdb_kvs_handle *handle,
509                         struct filemgr *file,
510                         const char *kvs_name)
511{
512    struct kvs_node query, *kvs_node;
513    struct kvs_opened_node *opened_node;
514    struct avl_node *a;
515
516    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
517
518    if (root_handle == NULL) {
519        // 'handle' is a super handle
520        _fdb_kvs_init_root(handle, file);
521    } else {
522        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
523        handle->kvs->type = KVS_SUB;
524        handle->kvs->root = root_handle;
525
526        if (kvs_name) {
527            spin_lock(&file->kv_header->lock);
528            query.kvs_name = (char*)kvs_name;
529            a = avl_search(file->kv_header->idx_name, &query.avl_name,
530                           _kvs_cmp_name);
531            if (a == NULL) {
532                // KV instance name is not found
533                free(handle->kvs);
534                handle->kvs = NULL;
535                spin_unlock(&file->kv_header->lock);
536                return;
537            }
538            kvs_node = _get_entry(a, struct kvs_node, avl_name);
539            handle->kvs->id = kvs_node->id;
540            // force custom cmp function
541            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
542            spin_unlock(&file->kv_header->lock);
543        } else {
544            // snapshot of the root handle
545            handle->kvs->id = 0;
546        }
547
548        opened_node = (struct kvs_opened_node *)
549               calloc(1, sizeof(struct kvs_opened_node));
550        opened_node->handle = handle;
551
552        handle->node = opened_node;
553        spin_lock(&root_handle->fhandle->lock);
554        list_push_back(root_handle->fhandle->handles, &opened_node->le);
555        spin_unlock(&root_handle->fhandle->lock);
556    }
557}
558
559void fdb_kvs_info_free(fdb_kvs_handle *handle)
560{
561    if (handle->kvs == NULL) {
562        return;
563    }
564
565    free(handle->kvs);
566    handle->kvs = NULL;
567}
568
569void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
570{
571    struct kvs_header *kv_header;
572
573    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
574    *kv_header_ptr = kv_header;
575
576    // KV ID '0' is reserved for default KV instance (super handle)
577    kv_header->id_counter = 1;
578    kv_header->default_kvs_cmp = NULL;
579    kv_header->custom_cmp_enabled = 0;
580    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
581    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
582    kv_header->num_kv_stores = 0;
583    avl_init(kv_header->idx_name, NULL);
584    avl_init(kv_header->idx_id, NULL);
585    spin_init(&kv_header->lock);
586}
587
588void fdb_kvs_header_create(struct filemgr *file)
589{
590    if (file->kv_header) {
591        return; // already exist
592    }
593
594    _fdb_kvs_header_create(&file->kv_header);
595    file->free_kv_header = fdb_kvs_header_free;
596}
597
598void fdb_kvs_header_reset_all_stats(struct filemgr *file)
599{
600    struct avl_node *a;
601    struct kvs_node *node;
602    struct kvs_header *kv_header = file->kv_header;
603
604    spin_lock(&kv_header->lock);
605    a = avl_first(kv_header->idx_id);
606    while (a) {
607        node = _get_entry(a, struct kvs_node, avl_id);
608        a = avl_next(&node->avl_id);
609        memset(&node->stat, 0x0, sizeof(node->stat));
610    }
611    spin_unlock(&kv_header->lock);
612}
613
// Carries the KV header state of 'handle->file' over to 'new_file'.
//
// handle                  : handle on the source file
// new_file                : destination file
// new_dhandle             : docio handle used for writing into 'new_file'
// new_file_kv_info_offset : if non-NULL, receives the offset returned by
//                           fdb_kvs_header_append() (only when 'create_new')
// create_new              : if true, a fresh header is built for 'new_file'
//                           first; if false, 'new_file' is used with the
//                           header it already has
//
// In both cases the in-memory-only fields (default/custom compare function
// pointers, custom_cmp_enabled, seqnum, op_stat) are then copied node by
// node from the old header into the new one. Every ID in the old header
// is asserted to exist in the new header.
void fdb_kvs_header_copy(fdb_kvs_handle *handle,
                         struct filemgr *new_file,
                         struct docio_handle *new_dhandle,
                         uint64_t *new_file_kv_info_offset,
                         bool create_new)
{
    struct avl_node *a, *aa;
    struct kvs_node *node_old, *node_new;

    if (create_new) {
        struct kvs_header *kv_header;
        // copy KV header data in 'handle' to new file
        _fdb_kvs_header_create(&kv_header);
        // read from 'handle->dhandle', and import into 'new_file'
        fdb_kvs_header_read(kv_header, handle->dhandle,
                            handle->kv_info_offset, handle->file->version, false);

        // write KV header in 'new_file' using 'new_dhandle'
        // (a temporary stack handle is used; only the three fields set
        //  below are initialized before it is passed on)
        uint64_t new_kv_info_offset;
        fdb_kvs_handle new_handle;
        new_handle.file = new_file;
        new_handle.dhandle = new_dhandle;
        new_handle.kv_info_offset = BLK_NOT_FOUND;
        new_kv_info_offset = fdb_kvs_header_append(&new_handle);
        if (new_file_kv_info_offset) {
            *new_file_kv_info_offset = new_kv_info_offset;
        }

        // attach the new header to 'new_file'; if the call reports failure
        // the header built above is released here instead
        if (!filemgr_set_kv_header(new_file, kv_header, fdb_kvs_header_free)) {
            // LCOV_EXCL_START
            _fdb_kvs_header_free(kv_header);
        } // LCOV_EXCL_STOP
        fdb_kvs_header_reset_all_stats(new_file);
    }

    // lock order: old file's header first, then the new file's header
    spin_lock(&handle->file->kv_header->lock);
    spin_lock(&new_file->kv_header->lock);
    // copy all in-memory custom cmp function pointers & seqnums
    new_file->kv_header->default_kvs_cmp =
        handle->file->kv_header->default_kvs_cmp;
    new_file->kv_header->custom_cmp_enabled =
        handle->file->kv_header->custom_cmp_enabled;
    a = avl_first(handle->file->kv_header->idx_id);
    while (a) {
        node_old = _get_entry(a, struct kvs_node, avl_id);
        // find the node with the same ID in the new header
        aa = avl_search(new_file->kv_header->idx_id,
                        &node_old->avl_id, _kvs_cmp_id);
        assert(aa); // MUST exist
        node_new = _get_entry(aa, struct kvs_node, avl_id);
        node_new->custom_cmp = node_old->custom_cmp;
        node_new->seqnum = node_old->seqnum;
        node_new->op_stat = node_old->op_stat;
        a = avl_next(a);
    }
    // release in reverse order of acquisition
    spin_unlock(&new_file->kv_header->lock);
    spin_unlock(&handle->file->kv_header->lock);
}
671
// export KV header info to raw data
//
// kv_header : header to serialize; if NULL, '*data' is set to NULL and
//             '*len' to 0
// data      : receives a malloc()ed buffer holding the serialized header
//             (the code here never frees it, so the caller is left with it)
// len       : receives the size of that buffer in bytes
//
// The header lock is held for the whole serialization. Two passes over
// the name index are made: the first computes the buffer size, the second
// writes the fields. Every integer field is passed through _endian_encode()
// before it is copied into the buffer.
static void _fdb_kvs_header_export(struct kvs_header *kv_header,
                                   void **data, size_t *len)
{
    /* << raw data structure >>
     * [# KV instances]:        8 bytes
     * [current KV ID counter]: 8 bytes
     * ---
     * [name length]:           2 bytes
     * [instance name]:         x bytes
     * [instance ID]:           8 bytes
     * [sequence number]:       8 bytes
     * [# live index nodes]:    8 bytes
     * [# docs]:                8 bytes
     * [data size]:             8 bytes
     * [flags]:                 8 bytes
     * [delta size]:            8 bytes
     * [# deleted docs]:        8 bytes
     * ...
     *    Please note that if the above format is changed, please also change...
     *    _fdb_kvs_get_snap_info()
     *    _fdb_kvs_header_import()
     *    _kvs_stat_get_sum_doc()
     *    _kvs_stat_get_sum_attr
     */

    int size = 0;    // total number of bytes to allocate
    int offset = 0;  // write position inside '*data'
    uint16_t name_len, _name_len;
    uint64_t c = 0;  // number of KV instances found in the name index
    uint64_t _n_kv, _kv_id, _flags;
    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
    int64_t _deltasize;
    fdb_kvs_id_t _id_counter;
    fdb_seqnum_t _seqnum;
    struct kvs_node *node;
    struct avl_node *a;

    if (kv_header == NULL) {
        *data = NULL;
        *len = 0;
        return ;
    }

    spin_lock(&kv_header->lock);

    // pre-scan to estimate the size of data
    size += sizeof(uint64_t);
    size += sizeof(fdb_kvs_id_t);
    a = avl_first(kv_header->idx_name);
    while(a) {
        node = _get_entry(a, struct kvs_node, avl_name);
        c++;
        size += sizeof(uint16_t); // length
        size += strlen(node->kvs_name)+1; // name
        size += sizeof(node->id); // ID
        size += sizeof(node->seqnum); // seq number
        size += sizeof(node->stat.nlivenodes); // # live index nodes
        size += sizeof(node->stat.ndocs); // # docs
        size += sizeof(node->stat.datasize); // data size
        size += sizeof(node->flags); // flags
        size += sizeof(node->stat.deltasize); // delta size since commit
        size += sizeof(node->stat.ndeletes); // # deleted docs
        a = avl_next(a);
    }

    // NOTE(review): the allocation result is not checked before use
    *data = (void *)malloc(size);

    // # KV instances
    _n_kv = _endian_encode(c);
    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
    offset += sizeof(_n_kv);

    // ID counter
    _id_counter = _endian_encode(kv_header->id_counter);
    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
    offset += sizeof(_id_counter);

    // second pass: write one record per KV instance, in name-index order
    a = avl_first(kv_header->idx_name);
    while(a) {
        node = _get_entry(a, struct kvs_node, avl_name);

        // name length
        // (includes the terminating NUL; stored in a 16-bit field)
        name_len = strlen(node->kvs_name)+1;
        _name_len = _endian_encode(name_len);
        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
        offset += sizeof(_name_len);

        // name
        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
        offset += name_len;

        // KV ID
        _kv_id = _endian_encode(node->id);
        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
        offset += sizeof(_kv_id);

        // seq number
        _seqnum = _endian_encode(node->seqnum);
        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
        offset += sizeof(_seqnum);

        // # live index nodes
        _nlivenodes = _endian_encode(node->stat.nlivenodes);
        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
        offset += sizeof(_nlivenodes);

        // # docs
        _ndocs = _endian_encode(node->stat.ndocs);
        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
        offset += sizeof(_ndocs);

        // datasize
        _datasize = _endian_encode(node->stat.datasize);
        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
        offset += sizeof(_datasize);

        // flags
        _flags = _endian_encode(node->flags);
        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
        offset += sizeof(_flags);

        // # delta index nodes + docsize created after last commit
        _deltasize = _endian_encode(node->stat.deltasize);
        memcpy((uint8_t*)*data + offset, &_deltasize, sizeof(_deltasize));
        offset += sizeof(_deltasize);

        // # deleted documents
        _ndeletes = _endian_encode(node->stat.ndeletes);
        memcpy((uint8_t*)*data + offset, &_ndeletes, sizeof(_ndeletes));
        offset += sizeof(_ndeletes);

        a = avl_next(a);
    }

    // report the size computed by the pre-scan
    *len = size;

    spin_unlock(&kv_header->lock);
}
811
812void _fdb_kvs_header_import(struct kvs_header *kv_header,
813                            void *data, size_t len, uint64_t version,
814                            bool only_seq_nums)
815{
816    uint64_t i, offset = 0;
817    uint16_t name_len, _name_len;
818    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
819    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
820    int64_t _deltasize;
821    bool is_deltasize;
822    fdb_kvs_id_t id_counter, _id_counter;
823    fdb_seqnum_t seqnum, _seqnum;
824    struct kvs_node *node;
825
826    // # KV instances
827    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
828    offset += sizeof(_n_kv);
829    n_kv = _endian_decode(_n_kv);
830
831    // ID counter
832    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
833    offset += sizeof(_id_counter);
834    id_counter = _endian_decode(_id_counter);
835
836    spin_lock(&kv_header->lock);
837    kv_header->id_counter = id_counter;
838
839    // Version control
840    if (!ver_is_atleast_magic_001(version)) {
841        is_deltasize = false;
842        _deltasize = 0;
843        _ndeletes = 0;
844    } else {
845        is_deltasize = true;
846    }
847
848    for (i=0;i<n_kv;++i){
849        // name length
850        uint64_t name_offset;
851        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
852        offset += sizeof(_name_len);
853        name_offset = offset;
854        name_len = _endian_decode(_name_len);
855
856        // name
857        offset += name_len;
858
859        // KV ID
860        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
861        offset += sizeof(_kv_id);
862        kv_id = _endian_decode(_kv_id);
863
864        // Search if a given KV header node exists or not.
865        struct kvs_node query;
866        query.id = kv_id;
867        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
868                                        _kvs_cmp_id);
869        if (a) {
870            node = _get_entry(a, struct kvs_node, avl_id);
871        } else {
872            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
873            node->kvs_name = (char *)malloc(name_len);
874            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
875            node->id = kv_id;
876            _init_op_stats(&node->op_stat);
877        }
878
879        // seq number
880        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
881        offset += sizeof(_seqnum);
882        seqnum = _endian_decode(_seqnum);
883        node->seqnum = seqnum;
884
885        // # live index nodes
886        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
887        offset += sizeof(_nlivenodes);
888
889        // # docs
890        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
891        offset += sizeof(_ndocs);
892
893        // datasize
894        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
895        offset += sizeof(_datasize);
896
897        // flags
898        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
899        offset += sizeof(_flags);
900        flags = _endian_decode(_flags);
901
902        if (is_deltasize) {
903            // delta document + index size since previous commit
904            memcpy(&_deltasize, (uint8_t*)data + offset,
905                   sizeof(_deltasize));
906            offset += sizeof(_deltasize);
907            memcpy(&_ndeletes, (uint8_t*)data + offset,
908                   sizeof(_ndeletes));
909            offset += sizeof(_ndeletes);
910        }
911
912        if (!only_seq_nums) {
913            node->stat.nlivenodes = _endian_decode(_nlivenodes);
914            node->stat.ndocs = _endian_decode(_ndocs);
915            node->stat.datasize = _endian_decode(_datasize);
916            node->stat.deltasize = _endian_decode(_deltasize);
917            node->stat.ndeletes = _endian_decode(_ndeletes);
918            node->flags = flags;
919            node->custom_cmp = NULL;
920        }
921
922        if (!a) { // Insert a new KV header node if not exist.
923            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
924            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
925            ++kv_header->num_kv_stores;
926        }
927    }
928    spin_unlock(&kv_header->lock);
929}
930
931fdb_status _fdb_kvs_get_snap_info(void *data, uint64_t version,
932                                  fdb_snapshot_info_t *snap_info)
933{
934    int i, offset = 0, sizeof_skipped_segments;
935    uint16_t name_len, _name_len;
936    int64_t n_kv, _n_kv;
937    bool is_deltasize;
938    fdb_seqnum_t _seqnum;
939    // Version control
940    if (!ver_is_atleast_magic_001(version)) {
941        is_deltasize = false;
942    } else {
943        is_deltasize = true;
944    }
945
946    // # KV instances
947    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
948    offset += sizeof(_n_kv);
949    // since n_kv doesn't count the default KVS, increase it by 1.
950    n_kv = _endian_decode(_n_kv) + 1;
951    assert(n_kv); // Must have at least one kv instance
952    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
953                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
954    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
955        return FDB_RESULT_ALLOC_FAIL;
956    } // LCOV_EXCL_STOP
957
958    snap_info->num_kvs_markers = n_kv;
959
960    // Skip over ID counter
961    offset += sizeof(fdb_kvs_id_t);
962
963    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
964                            + sizeof(uint64_t) // skip over nlivenodes
965                            + sizeof(uint64_t) // skip over ndocs
966                            + sizeof(uint64_t) // skip over datasize
967                            + sizeof(uint64_t); // skip over flags
968    if (is_deltasize) {
969        sizeof_skipped_segments += sizeof(uint64_t); // skip over deltasize
970        sizeof_skipped_segments += sizeof(uint64_t); // skip over ndeletes
971    }
972
973    for (i = 0; i < n_kv-1; ++i){
974        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
975        // Read the kv store name length
976        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
977        offset += sizeof(_name_len);
978        name_len = _endian_decode(_name_len);
979
980        // Retrieve the KV Store name
981        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
982        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
983        offset += name_len;
984
985        // Skip over KV ID
986        offset += sizeof(uint64_t);
987
988        // Retrieve the KV Store Commit Sequence number
989        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
990        info->seqnum = _endian_decode(_seqnum);
991
992        // Skip over seqnum, nlivenodes, ndocs, datasize, flags etc onto next..
993        offset += sizeof_skipped_segments;
994    }
995
996    return FDB_RESULT_SUCCESS;
997}
998
999uint64_t _kvs_stat_get_sum_attr(void *data, uint64_t version,
1000                                kvs_stat_attr_t attr)
1001{
1002    uint64_t ret = 0;
1003    int i, offset = 0;
1004    uint16_t name_len, _name_len;
1005    int64_t n_kv, _n_kv;
1006    bool is_deltasize;
1007    uint64_t nlivenodes, ndocs, datasize, flags;
1008    int64_t deltasize;
1009
1010    // Version control
1011    if (!ver_is_atleast_magic_001(version)) {
1012        is_deltasize = false;
1013    } else {
1014        is_deltasize = true;
1015    }
1016
1017    // # KV instances
1018    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
1019    offset += sizeof(_n_kv);
1020    // since n_kv doesn't count the default KVS, increase it by 1.
1021    n_kv = _endian_decode(_n_kv) + 1;
1022    assert(n_kv); // Must have at least one kv instance
1023
1024    // Skip over ID counter
1025    offset += sizeof(fdb_kvs_id_t);
1026
1027    for (i = 0; i < n_kv-1; ++i){
1028        // Read the kv store name length and skip over the length
1029        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
1030        offset += sizeof(_name_len);
1031        name_len = _endian_decode(_name_len);
1032
1033        // Skip over the KV Store name
1034        offset += name_len;
1035
1036        // Skip over KV ID
1037        offset += sizeof(uint64_t);
1038
1039        // Skip over KV store seqnum
1040        offset += sizeof(uint64_t);
1041
1042        // pick just the attribute requested, skipping over rest..
1043        if (attr == KVS_STAT_NLIVENODES) {
1044            memcpy(&nlivenodes, (uint8_t *)data + offset, sizeof(nlivenodes));
1045            ret += _endian_decode(nlivenodes);
1046            // skip over nlivenodes just read
1047            offset += sizeof(nlivenodes);
1048            // skip over ndocs, datasize, flags (and deltasize, ndeletes)
1049            offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof(datasize)
1050                   + sizeof(flags) + (is_deltasize ? sizeof(deltasize)*2 : 0);
1051        } else if (attr == KVS_STAT_DATASIZE) {
1052            offset += sizeof(nlivenodes) + sizeof(ndocs);
1053            memcpy(&datasize, (uint8_t *)data + offset, sizeof(datasize));
1054            ret += _endian_decode(datasize);
1055            // skip over datasize, flags (and deltasize, ndeletes)
1056            offset += sizeof(datasize) + sizeof(flags)
1057                   + (is_deltasize ? sizeof(deltasize)*2 : 0);
1058        } else if (attr == KVS_STAT_DELTASIZE) {
1059            if (is_deltasize) {
1060                offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof (datasize)
1061                        + sizeof(flags);
1062                memcpy(&deltasize, (uint8_t *)data + offset, sizeof(deltasize));
1063                ret += _endian_decode(deltasize);
1064                // skip over datasize, flags (and deltasize)
1065                offset += sizeof(deltasize)*2; // and ndeletes
1066            }
1067        } else { // Attribute fetched not implemented yet..
1068            fdb_assert(false, 0, attr); // Implement fetch for this attribute
1069        }
1070    }
1071
1072    return ret;
1073}
1074
1075uint64_t fdb_kvs_header_append(fdb_kvs_handle *handle)
1076{
1077    char *doc_key = alca(char, 32);
1078    void *data;
1079    size_t len;
1080    uint64_t kv_info_offset, prev_offset;
1081    struct docio_object doc;
1082    struct docio_length doc_len;
1083    struct filemgr *file = handle->file;
1084    struct docio_handle *dhandle = handle->dhandle;
1085
1086    _fdb_kvs_header_export(file->kv_header, &data, &len);
1087
1088    prev_offset = handle->kv_info_offset;
1089
1090    memset(&doc, 0, sizeof(struct docio_object));
1091    sprintf(doc_key, "KV_header");
1092    doc.key = (void *)doc_key;
1093    doc.meta = NULL;
1094    doc.body = data;
1095    doc.length.keylen = strlen(doc_key) + 1;
1096    doc.length.metalen = 0;
1097    doc.length.bodylen = len;
1098    doc.seqnum = 0;
1099    kv_info_offset = docio_append_doc_system(dhandle, &doc);
1100    free(data);
1101
1102    if (prev_offset != BLK_NOT_FOUND) {
1103        if (docio_read_doc_length(handle->dhandle, &doc_len, prev_offset)
1104            == FDB_RESULT_SUCCESS) {
1105            // mark stale
1106            filemgr_mark_stale(handle->file, prev_offset, _fdb_get_docsize(doc_len));
1107        }
1108    }
1109
1110    return kv_info_offset;
1111}
1112
1113void fdb_kvs_header_read(struct kvs_header *kv_header,
1114                         struct docio_handle *dhandle,
1115                         uint64_t kv_info_offset,
1116                         uint64_t version,
1117                         bool only_seq_nums)
1118{
1119    int64_t offset;
1120    struct docio_object doc;
1121
1122    memset(&doc, 0, sizeof(struct docio_object));
1123    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
1124
1125    if (offset <= 0) {
1126        fdb_log(dhandle->log_callback, (fdb_status) offset,
1127                "Failed to read a KV header with the offset %" _F64 " from a "
1128                "database file '%s'", kv_info_offset, dhandle->file->filename);
1129        return;
1130    }
1131
1132    _fdb_kvs_header_import(kv_header, doc.body, doc.length.bodylen,
1133                           version, only_seq_nums);
1134    free_docio_object(&doc, 1, 1, 1);
1135}
1136
1137fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
1138{
1139    uint8_t *buf;
1140    uint64_t dummy64;
1141    uint64_t version;
1142    uint64_t kv_info_offset;
1143    size_t len;
1144    bid_t hdr_bid;
1145    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
1146    fdb_kvs_id_t id = 0;
1147    char *compacted_filename = NULL;
1148    struct filemgr *file = handle->file;
1149
1150    buf = alca(uint8_t, file->config->blocksize);
1151
1152    if (handle->kvs && handle->kvs->id > 0) {
1153        id = handle->kvs->id;
1154    }
1155
1156    hdr_bid = filemgr_get_header_bid(file);
1157    if (hdr_bid == BLK_NOT_FOUND) {
1158        // header doesn't exist
1159        return 0;
1160    }
1161
1162    // read header
1163    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, NULL,
1164                         &version, NULL, &handle->log_callback);
1165    if (id > 0) { // non-default KVS
1166        // read last KVS header
1167        fdb_fetch_header(version, buf, &dummy64, &dummy64,
1168                         &dummy64, &dummy64, &dummy64, &dummy64,
1169                         &dummy64, &dummy64,
1170                         &kv_info_offset, &dummy64,
1171                         &compacted_filename, NULL);
1172
1173        int64_t doc_offset;
1174        struct kvs_header *kv_header;
1175        struct docio_object doc;
1176
1177        _fdb_kvs_header_create(&kv_header);
1178        memset(&doc, 0, sizeof(struct docio_object));
1179        doc_offset = docio_read_doc(handle->dhandle,
1180                                    kv_info_offset, &doc, true);
1181
1182        if (doc_offset <= 0) {
1183            // fail
1184            _fdb_kvs_header_free(kv_header);
1185            return 0;
1186
1187        } else {
1188            _fdb_kvs_header_import(kv_header, doc.body,
1189                                   doc.length.bodylen, version, false);
1190            // get local sequence number for the KV instance
1191            seqnum = _fdb_kvs_get_seqnum(kv_header,
1192                                         handle->kvs->id);
1193            _fdb_kvs_header_free(kv_header);
1194            free_docio_object(&doc, 1, 1, 1);
1195        }
1196    }
1197    return seqnum;
1198}
1199
1200LIBFDB_API
1201fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
1202{
1203    if (!handle || !seqnum) {
1204        return FDB_RESULT_INVALID_ARGS;
1205    }
1206
1207    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
1208        return FDB_RESULT_HANDLE_BUSY;
1209    }
1210
1211    if (handle->shandle) {
1212        // handle for snapshot
1213        // return MAX_SEQNUM instead of the file's sequence number
1214        *seqnum = handle->max_seqnum;
1215    } else {
1216        fdb_check_file_reopen(handle, NULL);
1217        fdb_sync_db_header(handle);
1218
1219        struct filemgr *file;
1220        file = handle->file;
1221
1222        if (handle->kvs == NULL ||
1223            handle->kvs->id == 0) {
1224            filemgr_mutex_lock(file);
1225            *seqnum = filemgr_get_seqnum(file);
1226            filemgr_mutex_unlock(file);
1227        } else {
1228            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1229        }
1230    }
1231    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
1232    return FDB_RESULT_SUCCESS;
1233}
1234
1235void fdb_kvs_set_seqnum(struct filemgr *file,
1236                           fdb_kvs_id_t id,
1237                           fdb_seqnum_t seqnum)
1238{
1239    struct kvs_header *kv_header = file->kv_header;
1240    struct kvs_node query, *node;
1241    struct avl_node *a;
1242
1243    if (id == 0) {
1244        // default KV instance
1245        filemgr_set_seqnum(file, seqnum);
1246        return;
1247    }
1248
1249    spin_lock(&kv_header->lock);
1250    query.id = id;
1251    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1252    node = _get_entry(a, struct kvs_node, avl_id);
1253    node->seqnum = seqnum;
1254    spin_unlock(&kv_header->lock);
1255}
1256
1257void _fdb_kvs_header_free(struct kvs_header *kv_header)
1258{
1259    struct kvs_node *node;
1260    struct avl_node *a;
1261
1262    a = avl_first(kv_header->idx_name);
1263    while (a) {
1264        node = _get_entry(a, struct kvs_node, avl_name);
1265        a = avl_next(a);
1266        avl_remove(kv_header->idx_name, &node->avl_name);
1267
1268        free(node->kvs_name);
1269        free(node);
1270    }
1271    free(kv_header->idx_name);
1272    free(kv_header->idx_id);
1273    free(kv_header);
1274}
1275
1276void fdb_kvs_header_free(struct filemgr *file)
1277{
1278    if (file->kv_header == NULL) {
1279        return;
1280    }
1281
1282    _fdb_kvs_header_free(file->kv_header);
1283    file->kv_header = NULL;
1284}
1285
1286static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1287                                  const char *kvs_name,
1288                                  fdb_kvs_config *kvs_config)
1289{
1290    int kv_ins_name_len;
1291    fdb_status fs = FDB_RESULT_SUCCESS;
1292    struct avl_node *a;
1293    struct filemgr *file;
1294    struct kvs_node *node, query;
1295    struct kvs_header *kv_header;
1296
1297    if (root_handle->config.multi_kv_instances == false) {
1298        // cannot open KV instance under single DB instance mode
1299        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1300                       "Cannot open or create KV store instance '%s' because multi-KV "
1301                       "store instance mode is disabled.",
1302                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1303    }
1304    if (root_handle->kvs->type != KVS_ROOT) {
1305        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1306                       "Cannot open or create KV store instance '%s' because the handle "
1307                       "doesn't support multi-KV sotre instance mode.",
1308                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1309    }
1310
1311fdb_kvs_create_start:
1312    fdb_check_file_reopen(root_handle, NULL);
1313    filemgr_mutex_lock(root_handle->file);
1314    fdb_sync_db_header(root_handle);
1315
1316    if (filemgr_is_rollback_on(root_handle->file)) {
1317        filemgr_mutex_unlock(root_handle->file);
1318        return FDB_RESULT_FAIL_BY_ROLLBACK;
1319    }
1320
1321    file = root_handle->file;
1322
1323    file_status_t fstatus = filemgr_get_file_status(file);
1324    if (fstatus == FILE_REMOVED_PENDING) {
1325        // we must not write into this file
1326        // file status was changed by other thread .. start over
1327        filemgr_mutex_unlock(file);
1328        goto fdb_kvs_create_start;
1329    }
1330
1331    kv_header = file->kv_header;
1332    spin_lock(&kv_header->lock);
1333
1334    // find existing KV instance
1335    // search by name
1336    query.kvs_name = (char*)kvs_name;
1337    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1338    if (a) { // KV name already exists
1339        spin_unlock(&kv_header->lock);
1340        filemgr_mutex_unlock(file);
1341        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1342                       "Failed to create KV Store '%s' as it already exists.",
1343                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1344    }
1345
1346    // create a kvs_node and insert
1347    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1348    node->id = kv_header->id_counter++;
1349    node->seqnum = 0;
1350    node->flags = 0x0;
1351    _init_op_stats(&node->op_stat);
1352    // search fhandle's custom cmp func list first
1353    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1354                                             (char *)kvs_name);
1355    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1356        // follow kvs_config's custom cmp next
1357        node->custom_cmp = kvs_config->custom_cmp;
1358        // if custom cmp function is given by user but
1359        // there is no corresponding function in fhandle's list
1360        // add it into the list
1361        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1362                                     (char*)kvs_name,
1363                                     kvs_config->custom_cmp);
1364    }
1365    if (node->custom_cmp) { // custom cmp function is used
1366        node->flags |= KVS_FLAG_CUSTOM_CMP;
1367        kv_header->custom_cmp_enabled = 1;
1368    }
1369    kv_ins_name_len = strlen(kvs_name)+1;
1370    node->kvs_name = (char *)malloc(kv_ins_name_len);
1371    strcpy(node->kvs_name, kvs_name);
1372
1373    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1374    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1375    ++kv_header->num_kv_stores;
1376    spin_unlock(&kv_header->lock);
1377
1378    // if compaction is in-progress,
1379    // create a same kvs_node for the new file
1380    if (file->new_file &&
1381        filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1382        struct kvs_node *node_new;
1383        struct kvs_header *kv_header_new;
1384
1385        kv_header_new = file->new_file->kv_header;
1386        node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1387        *node_new = *node;
1388        node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1389        strcpy(node_new->kvs_name, kvs_name);
1390
1391        // insert into new file's kv_header
1392        spin_lock(&kv_header_new->lock);
1393        if (node->custom_cmp) {
1394            kv_header_new->custom_cmp_enabled = 1;
1395        }
1396        avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1397        avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1398        spin_unlock(&kv_header_new->lock);
1399    }
1400
1401    // since this function calls filemgr_commit() and appends a new DB header,
1402    // we should finalize & flush the previous dirty update before commit.
1403    bid_t dirty_idtree_root = BLK_NOT_FOUND;
1404    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1405    struct filemgr_dirty_update_node *prev_node = NULL;
1406    struct filemgr_dirty_update_node *new_node = NULL;
1407
1408    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
1409                            &dirty_idtree_root, &dirty_seqtree_root, false);
1410
1411    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
1412                               &dirty_idtree_root, &dirty_seqtree_root, true);
1413
1414    // append system doc
1415    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
1416
1417    // if no compaction is being performed, append header and commit
1418    if (root_handle->file == file) {
1419        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
1420        fs = filemgr_commit(root_handle->file,
1421                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
1422                 &root_handle->log_callback);
1423    }
1424
1425    filemgr_mutex_unlock(file);
1426
1427    return fs;
1428}
1429
1430// this function just returns pointer
1431char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1432{
1433    struct kvs_node *node, query;
1434    struct avl_node *a;
1435
1436    if (handle->kvs == NULL) {
1437        // single KV instance mode
1438        return NULL;
1439    }
1440
1441    query.id = handle->kvs->id;
1442    if (query.id == 0) { // default KV instance
1443        return NULL;
1444    }
1445    spin_lock(&file->kv_header->lock);
1446    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1447    if (a) {
1448        node = _get_entry(a, struct kvs_node, avl_id);
1449        spin_unlock(&file->kv_header->lock);
1450        return node->kvs_name;
1451    }
1452    spin_unlock(&file->kv_header->lock);
1453    return NULL;
1454}
1455
1456// this function just returns pointer to kvs_name & offset to user key
1457const char* _fdb_kvs_extract_name_off(fdb_kvs_handle *handle, void *keybuf,
1458                                      size_t *key_offset)
1459{
1460    struct kvs_node *node, query;
1461    struct avl_node *a;
1462    fdb_kvs_id_t kv_id;
1463    struct filemgr *file = handle->file;
1464
1465    if (!handle->kvs) { // single KV instance mode
1466        *key_offset = 0;
1467        return DEFAULT_KVS_NAME;
1468    }
1469
1470    *key_offset = handle->config.chunksize;
1471    buf2kvid(*key_offset, keybuf, &kv_id);
1472    query.id = kv_id;
1473    if (query.id == 0) { // default KV instance in multi kvs mode
1474        return default_kvs_name;
1475    }
1476    spin_lock(&file->kv_header->lock);
1477    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1478    if (a) {
1479        node = _get_entry(a, struct kvs_node, avl_id);
1480        const char *kvs_name = node->kvs_name;
1481        spin_unlock(&file->kv_header->lock);
1482        return kvs_name;
1483    }
1484    spin_unlock(&file->kv_header->lock);
1485    return NULL;
1486}
1487
1488fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1489                                   fdb_kvs_handle *handle_out)
1490{
1491    fdb_status fs;
1492    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1493
1494    if (!handle_out->kvs) {
1495        // create kvs_info
1496        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1497        handle_out->kvs->type = handle_in->kvs->type;
1498        handle_out->kvs->id = handle_in->kvs->id;
1499        handle_out->kvs->root = root_handle;
1500        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1501
1502        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1503            calloc(1, sizeof(struct kvs_opened_node));
1504        opened_node->handle = handle_out;
1505        handle_out->node = opened_node;
1506
1507        spin_lock(&root_handle->fhandle->lock);
1508        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1509        spin_unlock(&root_handle->fhandle->lock);
1510    }
1511
1512    fs = _fdb_clone_snapshot(handle_in, handle_out);
1513    if (fs != FDB_RESULT_SUCCESS) {
1514        if (handle_out->node) {
1515            spin_lock(&root_handle->fhandle->lock);
1516            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1517            spin_unlock(&root_handle->fhandle->lock);
1518            free(handle_out->node);
1519        }
1520        free(handle_out->kvs);
1521    }
1522    return fs;
1523}
1524
1525// 1) allocate memory & create 'handle->kvs'
1526//    by calling fdb_kvs_info_create().
1527//      -> this will allocate a corresponding node and
1528//         insert it into fhandle->handles list.
1529// 2) if matching KVS name doesn't exist, create it.
1530// 3) call _fdb_open().
1531fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1532                         fdb_config *config,
1533                         fdb_kvs_config *kvs_config,
1534                         struct filemgr *file,
1535                         const char *filename,
1536                         const char *kvs_name,
1537                         fdb_kvs_handle *handle)
1538{
1539    fdb_status fs;
1540
1541    if (handle->kvs == NULL) {
1542        // create kvs_info
1543        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1544    }
1545
1546    if (handle->kvs == NULL) {
1547        // KV instance name is not found
1548        if (!kvs_config->create_if_missing) {
1549            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1550                           "Failed to open KV store '%s' because it doesn't exist.",
1551                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1552        }
1553        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1554            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1555                           "Failed to create KV store '%s' because the KV store's handle "
1556                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1557        }
1558
1559        // create
1560        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1561        if (fs != FDB_RESULT_SUCCESS) { // create fail
1562            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1563        }
1564        // create kvs_info again
1565        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1566        if (handle->kvs == NULL) { // fail again
1567            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1568                           "Failed to create KV store '%s' because the KV store's handle "
1569                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1570        }
1571    }
1572    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1573    if (fs != FDB_RESULT_SUCCESS) {
1574        if (handle->node) {
1575            spin_lock(&root_handle->fhandle->lock);
1576            list_remove(root_handle->fhandle->handles, &handle->node->le);
1577            spin_unlock(&root_handle->fhandle->lock);
1578            free(handle->node);
1579        } // 'handle->node == NULL' happens only during rollback
1580        free(handle->kvs);
1581    }
1582    return fs;
1583}
1584
1585// 1) identify whether the requested KVS is default or non-default.
1586// 2) if the requested KVS is default,
1587//   2-1) if no KVS handle is opened yet from this fhandle,
1588//        -> return the root handle.
1589//   2-2) if the root handle is already opened,
1590//        -> allocate memory for handle, and call _fdb_open().
1591//        -> 'handle->kvs' will be created in _fdb_open(),
1592//           since it is treated as a default handle.
1593//        -> allocate a corresponding node and insert it into
1594//           fhandle->handles list.
1595// 3) if the requested KVS is non-default,
1596//    -> allocate memory for handle, and call _fdb_kvs_open().
1597LIBFDB_API
1598fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1599                        fdb_kvs_handle **ptr_handle,
1600                        const char *kvs_name,
1601                        fdb_kvs_config *kvs_config)
1602{
1603    fdb_kvs_handle *handle;
1604    fdb_config config;
1605    fdb_status fs;
1606    fdb_kvs_handle *root_handle;
1607    fdb_kvs_config config_local;
1608    struct filemgr *file = NULL;
1609    struct filemgr *latest_file = NULL;
1610
1611    if (!fhandle) {
1612        return FDB_RESULT_INVALID_ARGS;
1613    }
1614    root_handle = fhandle->root;
1615    config = root_handle->config;
1616
1617    if (kvs_config) {
1618        if (validate_fdb_kvs_config(kvs_config)) {
1619            config_local = *kvs_config;
1620        } else {
1621            return FDB_RESULT_INVALID_CONFIG;
1622        }
1623    } else {
1624        config_local = get_default_kvs_config();
1625    }
1626
1627    fdb_check_file_reopen(root_handle, NULL);
1628    fdb_sync_db_header(root_handle);
1629
1630    file = root_handle->file;
1631    latest_file = root_handle->file;
1632
1633    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1634        // return the default KV store handle
1635        spin_lock(&fhandle->lock);
1636        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1637            // the root handle is not opened yet
1638            // just return the root handle
1639            fdb_custom_cmp_variable default_kvs_cmp;
1640
1641            root_handle->kvs_config = config_local;
1642
1643            if (root_handle->file->kv_header) {
1644                // search fhandle's custom cmp func list first
1645                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1646
1647                spin_lock(&root_handle->file->kv_header->lock);
1648                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1649
1650                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1651                    root_handle->kvs_config.custom_cmp) {
1652                    // follow kvs_config's custom cmp next
1653                    root_handle->file->kv_header->default_kvs_cmp =
1654                        root_handle->kvs_config.custom_cmp;
1655                    fdb_file_handle_add_cmp_func(fhandle, NULL,
1656                                                 root_handle->kvs_config.custom_cmp);
1657                }
1658
1659                if (root_handle->file->kv_header->default_kvs_cmp) {
1660                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1661                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1662                }
1663                spin_unlock(&root_handle->file->kv_header->lock);
1664            }
1665
1666            *ptr_handle = root_handle;
1667            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1668            fhandle->flags |= FHANDLE_ROOT_OPENED;
1669            fs = FDB_RESULT_SUCCESS;
1670            spin_unlock(&fhandle->lock);
1671
1672        } else {
1673            // the root handle is already opened
1674            // open new default KV store handle
1675            spin_unlock(&fhandle->lock);
1676            handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1677            handle->kvs_config = config_local;
1678            atomic_init_uint8_t(&handle->handle_busy, 0);
1679
1680            if (root_handle->file->kv_header) {
1681                spin_lock(&root_handle->file->kv_header->lock);
1682                handle->kvs_config.custom_cmp =
1683                    root_handle->file->kv_header->default_kvs_cmp;
1684                spin_unlock(&root_handle->file->kv_header->lock);
1685            }
1686
1687            handle->fhandle = fhandle;
1688            fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1689            if (fs != FDB_RESULT_SUCCESS) {
1690                free(handle);
1691                *ptr_handle = NULL;
1692            } else {
1693                // insert into fhandle's list
1694                struct kvs_opened_node *node;
1695                node = (struct kvs_opened_node *)
1696                       calloc(1, sizeof(struct kvs_opened_node));
1697                node->handle = handle;
1698                spin_lock(&fhandle->lock);
1699                list_push_front(fhandle->handles, &node->le);
1700                spin_unlock(&fhandle->lock);
1701
1702                handle->node = node;
1703                *ptr_handle = handle;
1704            }
1705        }
1706        return fs;
1707    }
1708
1709    if (config.multi_kv_instances == false) {
1710        // cannot open KV instance under single DB instance mode
1711        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1712                       "Cannot open KV store instance '%s' because multi-KV "
1713                       "store instance mode is disabled.",
1714                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1715    }
1716    if (root_handle->kvs->type != KVS_ROOT) {
1717        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1718                       "Cannot open KV store instance '%s' because the handle "
1719                       "doesn't support multi-KV sotre instance mode.",
1720                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1721    }
1722    if (root_handle->shandle) {
1723        // cannot open KV instance from a snapshot
1724        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1725                       "Not allowed to open KV store instance '%s' from the "
1726                       "snapshot handle.",
1727                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1728    }
1729
1730    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1731    if (!handle) { // LCOV_EXCL_START
1732        return FDB_RESULT_ALLOC_FAIL;
1733    } // LCOV_EXCL_STOP
1734
1735    atomic_init_uint8_t(&handle->handle_busy, 0);
1736    handle->fhandle = fhandle;
1737    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1738                       latest_file, file->filename, kvs_name, handle);
1739    if (fs == FDB_RESULT_SUCCESS) {
1740        *ptr_handle = handle;
1741    } else {
1742        *ptr_handle = NULL;
1743        free(handle);
1744    }
1745    return fs;
1746}
1747
1748LIBFDB_API
1749fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1750                                fdb_kvs_handle **ptr_handle,
1751                                fdb_kvs_config *config)
1752{
1753    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1754}
1755
1756// 1) remove corresponding node from fhandle->handles list.
1757// 2) call _fdb_close().
1758static fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1759{
1760    fdb_kvs_handle *root_handle = handle->kvs->root;
1761    fdb_status fs;
1762
1763    if (handle->node) {
1764        spin_lock(&root_handle->fhandle->lock);
1765        list_remove(root_handle->fhandle->handles, &handle->node->le);
1766        spin_unlock(&root_handle->fhandle->lock);
1767        free(handle->node);
1768    } // 'handle->node == NULL' happens only during rollback
1769
1770    fs = _fdb_close(handle);
1771    return fs;
1772}
1773
1774// close all sub-KV store handles belonging to the root handle
1775fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1776{
1777    fdb_status fs;
1778    struct list_elem *e;
1779    struct kvs_opened_node *node;
1780
1781    spin_lock(&root_handle->fhandle->lock);
1782    e = list_begin(root_handle->fhandle->handles);
1783    while (e) {
1784        node = _get_entry(e, struct kvs_opened_node, le);
1785        e = list_remove(root_handle->fhandle->handles, &node->le);
1786        fs = _fdb_close(node->handle);
1787        if (fs != FDB_RESULT_SUCCESS) {
1788            spin_unlock(&root_handle->fhandle->lock);
1789            return fs;
1790        }
1791        fdb_kvs_info_free(node->handle);
1792        free(node->handle);
1793        free(node);
1794    }
1795    spin_unlock(&root_handle->fhandle->lock);
1796
1797    return FDB_RESULT_SUCCESS;
1798}
1799
1800// 1) identify whether the requested handle is for default KVS or not.
1801// 2) if the requested handle is for the default KVS,
1802//   2-1) if the requested handle is the root handle,
1803//        -> just clear the OPENED flag.
1804//   2-2) if the requested handle is not the root handle,
1805//        -> call _fdb_close(),
1806//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1807//        -> remove the corresponding node from fhandle->handles list,
1808//        -> free the memory for the handle.
1809// 3) if the requested handle is for non-default KVS,
1810//    -> call _fdb_kvs_close(),
1811//       -> this will remove the node from fhandle->handles list.
1812//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1813//    -> free the memory for the handle.
1814LIBFDB_API
1815fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1816{
1817    fdb_status fs;
1818
1819    if (!handle) {
1820        return FDB_RESULT_INVALID_ARGS;
1821    }
1822    if (handle->num_iterators) {
1823        // There are still active iterators created from this handle
1824        return FDB_RESULT_KV_STORE_BUSY;
1825    }
1826
1827    if (handle->shandle && handle->kvs == NULL) {
1828        // snapshot of the default KV store + single KV store mode
1829        // directly close handle
1830        // (snapshot of the other KV stores will be closed
1831        //  using _fdb_kvs_close(...) below)
1832        fs = _fdb_close(handle);
1833        if (fs == FDB_RESULT_SUCCESS) {
1834            free(handle);
1835        }
1836        return fs;
1837    }
1838
1839    if (handle->kvs == NULL ||
1840        handle->kvs->type == KVS_ROOT) {
1841        // the default KV store handle
1842
1843        if (handle->fhandle->root == handle) {
1844            // do nothing for root handle
1845            // the root handle will be closed with fdb_close() API call.
1846            spin_lock(&handle->fhandle->lock);
1847            handle->fhandle->flags &= ~FHANDLE_ROOT_OPENED; // remove flag
1848            spin_unlock(&handle->fhandle->lock);
1849            return FDB_RESULT_SUCCESS;
1850
1851        } else {
1852            // the default KV store but not the root handle .. normally close
1853            spin_lock(&handle->fhandle->lock);
1854            fs = _fdb_close(handle);
1855            if (fs == FDB_RESULT_SUCCESS) {
1856                // remove from 'handles' list in the root node
1857                if (handle->kvs) {
1858                    fdb_kvs_info_free(handle);
1859                }
1860                list_remove(handle->fhandle->handles, &handle->node->le);
1861                spin_unlock(&handle->fhandle->lock);
1862                free(handle->node);
1863                free(handle);
1864            } else {
1865                spin_unlock(&handle->fhandle->lock);
1866            }
1867            return fs;
1868        }
1869    }
1870
1871    if (handle->kvs && handle->kvs->root == NULL) {
1872        return FDB_RESULT_INVALID_ARGS;
1873    }
1874    fs = _fdb_kvs_close(handle);
1875    if (fs == FDB_RESULT_SUCCESS) {
1876        fdb_kvs_info_free(handle);
1877        free(handle);
1878    }
1879    return fs;
1880}
1881
1882static
1883fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
1884                           const char *kvs_name,
1885                           bool rollback_recreate)
1886{
1887    size_t size_chunk, size_id;
1888    uint8_t *_kv_id;
1889    fdb_status fs = FDB_RESULT_SUCCESS;
1890    fdb_kvs_id_t kv_id = 0;
1891    fdb_kvs_handle *root_handle;
1892    struct avl_node *a = NULL;
1893    struct filemgr *file;
1894    struct kvs_node *node, query;
1895    struct kvs_header *kv_header;
1896
1897    if (!fhandle) {
1898        return FDB_RESULT_INVALID_ARGS;
1899    }
1900    root_handle = fhandle->root;
1901
1902    if (root_handle->config.multi_kv_instances == false) {
1903        // cannot remove the KV instance under single DB instance mode
1904        return FDB_RESULT_INVALID_CONFIG;
1905    }
1906    if (root_handle->kvs->type != KVS_ROOT) {
1907        return FDB_RESULT_INVALID_HANDLE;
1908    }
1909
1910fdb_kvs_remove_start:
1911    if (!rollback_recreate) {
1912        fdb_check_file_reopen(root_handle, NULL);
1913        filemgr_mutex_lock(root_handle->file);
1914        fdb_sync_db_header(root_handle);
1915
1916        if (filemgr_is_rollback_on(root_handle->file)) {
1917            filemgr_mutex_unlock(root_handle->file);
1918            return FDB_RESULT_FAIL_BY_ROLLBACK;
1919        }
1920    } else {
1921        filemgr_mutex_lock(root_handle->file);
1922    }
1923
1924    file = root_handle->file;
1925
1926    file_status_t fstatus = filemgr_get_file_status(file);
1927    if (fstatus == FILE_REMOVED_PENDING) {
1928        // we must not write into this file
1929        // file status was changed by other thread .. start over
1930        filemgr_mutex_unlock(file);
1931        goto fdb_kvs_remove_start;
1932    } else if (fstatus == FILE_COMPACT_OLD) {
1933        // Cannot remove existing KV store during compaction.
1934        // To remove a KV store, the corresponding first chunk in HB+trie
1935        // should be unlinked. This can be possible in the old file during
1936        // compaction, but impossible in the new file, since existing documents
1937        // (including docs belonging to the KV store to be removed) are being moved.
1938        filemgr_mutex_unlock(file);
1939        return FDB_RESULT_FAIL_BY_COMPACTION;
1940    }
1941
1942    // find the kvs_node and remove
1943
1944    // search by name to get ID
1945    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1946        if (!rollback_recreate) {
1947            // default KV store .. KV ID = 0
1948            kv_id = 0;
1949            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1950                // there is an opened handle
1951                filemgr_mutex_unlock(file);
1952                return FDB_RESULT_KV_STORE_BUSY;
1953            }
1954        }
1955        // reset KVS stats (excepting for WAL stats)
1956        file->header.stat.ndocs = 0;
1957        file->header.stat.nlivenodes = 0;
1958        file->header.stat.datasize = 0;
1959        file->header.stat.deltasize = 0;
1960
1961        // reset seqnum
1962        filemgr_set_seqnum(file, 0);
1963    } else {
1964        kv_header = file->kv_header;
1965        spin_lock(&kv_header->lock);
1966        query.kvs_name = (char*)kvs_name;
1967        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1968        if (a == NULL) { // KV name doesn't exist
1969            spin_unlock(&kv_header->lock);
1970            filemgr_mutex_unlock(file);
1971            return FDB_RESULT_KV_STORE_NOT_FOUND;
1972        }
1973        node = _get_entry(a, struct kvs_node, avl_name);
1974        kv_id = node->id;
1975
1976        if (!rollback_recreate) {
1977            spin_unlock(&kv_header->lock);
1978            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1979                // there is an opened handle
1980                filemgr_mutex_unlock(file);
1981                return FDB_RESULT_KV_STORE_BUSY;
1982            }
1983            spin_lock(&kv_header->lock);
1984
1985            avl_remove(kv_header->idx_name, &node->avl_name);
1986            avl_remove(kv_header->idx_id, &node->avl_id);
1987            --kv_header->num_kv_stores;
1988            spin_unlock(&kv_header->lock);
1989
1990            kv_id = node->id;
1991
1992            // free node
1993            free(node->kvs_name);
1994            free(node);
1995        } else {
1996            // reset all stats except for WAL
1997            node->stat.ndocs = 0;
1998            node->stat.nlivenodes = 0;
1999            node->stat.datasize = 0;
2000            node->stat.deltasize = 0;
2001            node->seqnum = 0;
2002            spin_unlock(&kv_header->lock);
2003        }
2004    }
2005
2006    // discard all WAL entries
2007    wal_close_kv_ins(file, kv_id, &root_handle->log_callback);
2008
2009    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2010    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2011    struct filemgr_dirty_update_node *prev_node = NULL, *new_node = NULL;
2012
2013    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
2014                            &dirty_idtree_root, &dirty_seqtree_root, false);
2015
2016    size_id = sizeof(fdb_kvs_id_t);
2017    size_chunk = root_handle->trie->chunksize;
2018
2019    // remove from super handle's HB+trie
2020    _kv_id = alca(uint8_t, size_chunk);
2021    kvid2buf(size_chunk, kv_id, _kv_id);
2022    hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
2023    btreeblk_end(root_handle->bhandle);
2024
2025    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2026        _kv_id = alca(uint8_t, size_id);
2027        kvid2buf(size_id, kv_id, _kv_id);
2028        hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
2029        btreeblk_end(root_handle->bhandle);
2030    }
2031
2032    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
2033                               &dirty_idtree_root, &dirty_seqtree_root, true);
2034
2035    // append system doc
2036    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
2037
2038    // if no compaction is being performed, append header and commit
2039    if (root_handle->file == file) {
2040        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
2041        fs = filemgr_commit(root_handle->file,
2042                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
2043                &root_handle->log_callback);
2044    }
2045
2046    filemgr_mutex_unlock(file);
2047
2048    return fs;
2049}
2050
2051bool _fdb_kvs_is_busy(fdb_file_handle *fhandle)
2052{
2053    bool ret = false;
2054    struct filemgr *file = fhandle->root->file;
2055    struct avl_node *a;
2056    struct filemgr_fhandle_idx_node *fhandle_node;
2057    fdb_file_handle *file_handle;
2058
2059    spin_lock(&file->fhandle_idx_lock);
2060    a = avl_first(&file->fhandle_idx);
2061    while (a) {
2062        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2063        a = avl_next(a);
2064        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
2065        spin_lock(&file_handle->lock);
2066        if (list_begin(file_handle->handles) != NULL) {
2067            ret = true;
2068            spin_unlock(&file_handle->lock);
2069            break;
2070        }
2071        spin_unlock(&file_handle->lock);
2072    }
2073    spin_unlock(&file->fhandle_idx_lock);
2074
2075    return ret;
2076}
2077
2078fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
2079{
2080    fdb_config config;
2081    fdb_kvs_config kvs_config;
2082    fdb_kvs_handle *handle_in, *handle, *super_handle;
2083    fdb_status fs;
2084    fdb_seqnum_t old_seqnum;
2085    fdb_file_handle *fhandle;
2086    char *kvs_name;
2087
2088    if (!handle_ptr) {
2089        return FDB_RESULT_INVALID_ARGS;
2090    }
2091
2092    handle_in = *handle_ptr;
2093    if (!handle_in->kvs) {
2094        return FDB_RESULT_INVALID_ARGS;
2095    }
2096    super_handle = handle_in->kvs->root;
2097    fhandle = handle_in->fhandle;
2098    config = handle_in->config;
2099    kvs_config = handle_in->kvs_config;
2100
2101    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
2102        return fdb_log(&handle_in->log_callback,
2103                       FDB_RESULT_RONLY_VIOLATION,
2104                       "Warning: Rollback is not allowed on "
2105                       "the read-only DB file '%s'.",
2106                       handle_in->file->filename);
2107    }
2108
2109    filemgr_mutex_lock(handle_in->file);
2110    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
2111    // All transactions should be closed before rollback
2112    if (wal_txn_exists(handle_in->file)) {
2113        filemgr_set_rollback(handle_in->file, 0);
2114        filemgr_mutex_unlock(handle_in->file);
2115        return FDB_RESULT_FAIL_BY_TRANSACTION;
2116    }
2117
2118    // If compaction is running, wait until it is aborted.
2119    // TODO: Find a better way of waiting for the compaction abortion.
2120    unsigned int sleep_time = 10000; // 10 ms.
2121    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
2122    while (fstatus == FILE_COMPACT_OLD) {
2123        filemgr_mutex_unlock(handle_in->file);
2124        decaying_usleep(&sleep_time, 1000000);
2125        filemgr_mutex_lock(handle_in->file);
2126        fstatus = filemgr_get_file_status(handle_in->file);
2127    }
2128    if (fstatus == FILE_REMOVED_PENDING) {
2129        filemgr_mutex_unlock(handle_in->file);
2130        fdb_check_file_reopen(handle_in, NULL);
2131    } else {
2132        filemgr_mutex_unlock(handle_in->file);
2133    }
2134
2135    fdb_sync_db_header(handle_in);
2136
2137    // if the max sequence number seen by this handle is lower than the
2138    // requested snapshot marker, it means the snapshot is not yet visible
2139    // even via the current fdb_kvs_handle
2140    if (seqnum > handle_in->seqnum) {
2141        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2142        return FDB_RESULT_NO_DB_INSTANCE;
2143    }
2144
2145    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
2146    if (seqnum == 0) { // Handle special case of rollback to zero..
2147        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
2148        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2149        return fs;
2150    }
2151
2152    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
2153    if (!handle) { // LCOV_EXCL_START
2154        filemgr_set_rollback(handle_in->file, 0); // allow mutations
2155        return FDB_RESULT_ALLOC_FAIL;
2156    } // LCOV_EXCL_STOP
2157
2158    handle->max_seqnum = seqnum;
2159    handle->log_callback = handle_in->log_callback;
2160    handle->fhandle = fhandle;
2161    atomic_init_uint8_t(&handle->handle_busy, 0);
2162
2163    if (handle_in->kvs->type == KVS_SUB) {
2164        fs = _fdb_kvs_open(handle_in->kvs->root,
2165                           &config,
2166                           &kvs_config,
2167                           handle_in->file,
2168                           handle_in->file->filename,
2169                           kvs_name,
2170                           handle);
2171    } else {
2172        fs = _fdb_open(handle, handle_in->file->filename,
2173                       FDB_AFILENAME, &config);
2174    }
2175    filemgr_set_rollback(handle_in->file, 0); // allow mutations
2176
2177    if (fs == FDB_RESULT_SUCCESS) {
2178        // get KV instance's sub B+trees' root node BIDs
2179        // from both ID-tree and Seq-tree, AND
2180        // replace current handle's sub B+trees' root node BIDs
2181        // by old BIDs
2182        size_t size_chunk, size_id;
2183        bid_t id_root, seq_root, dummy;
2184        uint8_t *_kv_id;
2185        hbtrie_result hr;
2186
2187        size_chunk = handle->trie->chunksize;
2188        size_id = sizeof(fdb_kvs_id_t);
2189
2190        filemgr_mutex_lock(handle_in->file);
2191
2192        // read root BID of the KV instance from the old handle
2193        // and overwrite into the current handle
2194        _kv_id = alca(uint8_t, size_chunk);
2195        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
2196        hr = hbtrie_find_partial(handle->trie, _kv_id,
2197                                 size_chunk, &id_root);
2198        btreeblk_end(handle->bhandle);
2199        if (hr == HBTRIE_RESULT_SUCCESS) {
2200            hbtrie_insert_partial(super_handle->trie,
2201                                  _kv_id, size_chunk,
2202                                  &id_root, &dummy);
2203        } else { // No Trie info in rollback header.
2204                 // Erase kv store from super handle's main index.
2205            hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
2206        }
2207        btreeblk_end(super_handle->bhandle);
2208
2209        if (config.seqtree_opt == FDB_SEQTREE_USE) {
2210            // same as above for seq-trie
2211            _kv_id = alca(uint8_t, size_id);
2212            kvid2buf(size_id, handle->kvs->id, _kv_id);
2213            hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
2214                                     size_id, &seq_root);
2215            btreeblk_end(handle->bhandle);
2216            if (hr == HBTRIE_RESULT_SUCCESS) {
2217                hbtrie_insert_partial(super_handle->seqtrie,
2218                                      _kv_id, size_id,
2219                                      &seq_root, &dummy);
2220            } else { // No seqtrie info in rollback header.
2221                     // Erase kv store from super handle's seqtrie index.
2222                hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
2223            }
2224            btreeblk_end(super_handle->bhandle);
2225        }
2226
2227        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
2228                                        handle_in->kvs->id);
2229        fdb_kvs_set_seqnum(handle_in->file,
2230                           handle_in->kvs->id, seqnum);
2231        handle_in->seqnum = seqnum;
2232        filemgr_mutex_unlock(handle_in->file);
2233
2234        fs = _fdb_commit(super_handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
2235                         !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
2236        if (fs == FDB_RESULT_SUCCESS) {
2237            _fdb_kvs_close(handle);
2238            *handle_ptr = handle_in;
2239            fdb_kvs_info_free(handle);
2240            free(handle);
2241        } else {
2242            // cancel the rolling-back of the sequence number
2243            fdb_log(&handle_in->log_callback, fs,
2244                    "Rollback failed due to a commit failure with a sequence "
2245                    "number %" _F64, seqnum);
2246            filemgr_mutex_lock(handle_in->file);
2247            fdb_kvs_set_seqnum(handle_in->file,
2248                               handle_in->kvs->id, old_seqnum);
2249            filemgr_mutex_unlock(handle_in->file);
2250            _fdb_kvs_close(handle);
2251            fdb_kvs_info_free(handle);
2252            free(handle);
2253        }
2254    } else {
2255        free(handle);
2256    }
2257
2258    return fs;
2259}
2260
2261LIBFDB_API
2262fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2263                          const char *kvs_name)
2264{
2265    return _fdb_kvs_remove(fhandle, kvs_name, false);
2266}
2267
2268LIBFDB_API
2269fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2270{
2271    uint64_t ndocs;
2272    uint64_t ndeletes;
2273    uint64_t wal_docs;
2274    uint64_t wal_deletes;
2275    uint64_t wal_n_inserts;
2276    uint64_t datasize;
2277    uint64_t nlivenodes;
2278    fdb_kvs_id_t kv_id;
2279    struct avl_node *a;
2280    struct filemgr *file;
2281    struct kvs_node *node, query;
2282    struct kvs_header *kv_header;
2283    struct kvs_stat stat;
2284
2285    if (!handle || !info) {
2286        return FDB_RESULT_INVALID_ARGS;
2287    }
2288
2289    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2290        return FDB_RESULT_HANDLE_BUSY;
2291    }
2292
2293    if (!handle->shandle) { // snapshot handle should be immutable
2294        fdb_check_file_reopen(handle, NULL);
2295        fdb_sync_db_header(handle);
2296    }
2297
2298    file = handle->file;
2299
2300    if (handle->kvs == NULL) {
2301        info->name = default_kvs_name;
2302        kv_id = 0;
2303
2304    } else {
2305        kv_header = file->kv_header;
2306        kv_id = handle->kvs->id;
2307        spin_lock(&kv_header->lock);
2308
2309        query.id = handle->kvs->id;
2310        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2311        if (a) { // sub handle
2312            node = _get_entry(a, struct kvs_node, avl_id);
2313            info->name = (const char*)node->kvs_name;
2314        } else { // root handle
2315            info->name = default_kvs_name;
2316        }
2317        spin_unlock(&kv_header->lock);
2318    }
2319
2320    if (handle->shandle) {
2321        // snapshot .. get its local stats
2322        snap_get_stat(handle->shandle, &stat);
2323    } else {
2324        _kvs_stat_get(file, kv_id, &stat);
2325    }
2326    ndocs = stat.ndocs;
2327    ndeletes = stat.ndeletes;
2328    wal_docs = stat.wal_ndocs;
2329    wal_deletes = stat.wal_ndeletes;
2330    wal_n_inserts = wal_docs - wal_deletes;
2331
2332    if (ndocs + wal_n_inserts < wal_deletes) {
2333        info->doc_count = 0;
2334    } else {
2335        if (ndocs) { // not accurate since some ndocs may be in wal_n_inserts
2336            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2337        } else { // this is accurate
2338            info->doc_count = wal_n_inserts;
2339        }
2340    }
2341
2342    if (ndeletes) { // not accurate since some ndeletes may be wal_n_deletes
2343        info->deleted_count = ndeletes + wal_deletes;
2344    } else { // this is accurate
2345        info->deleted_count = wal_deletes;
2346    }
2347
2348    datasize = stat.datasize;
2349    nlivenodes = stat.nlivenodes;
2350
2351    info->space_used = datasize;
2352    info->space_used += nlivenodes * handle->config.blocksize;
2353    info->file = handle->fhandle;
2354
2355    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2356
2357    // This is another LIBFDB_API call, so handle is marked as free
2358    // in the line above before making this call
2359    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2360
2361    return FDB_RESULT_SUCCESS;
2362}
2363
2364LIBFDB_API
2365fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2366{
2367    fdb_kvs_id_t kv_id;
2368    struct filemgr *file;
2369    struct kvs_ops_stat stat;
2370    struct kvs_ops_stat root_stat;
2371    fdb_kvs_handle *root_handle = handle->fhandle->root;
2372
2373    if (!handle || !info) {
2374        return FDB_RESULT_INVALID_ARGS;
2375    }
2376
2377    // for snapshot handle do not reopen new file as user is interested in
2378    // reader stats from the old file
2379    if (!handle->shandle) {
2380        // always get stats from the latest file
2381        fdb_check_file_reopen(handle, NULL);
2382        fdb_sync_db_header(handle);
2383    }
2384
2385    file = handle->file;
2386
2387    if (handle->kvs == NULL) {
2388        kv_id = 0;
2389    } else {
2390        kv_id = handle->kvs->id;
2391    }
2392
2393    _kvs_ops_stat_get(file, kv_id, &stat);
2394
2395    if (root_handle != handle) {
2396        _kvs_ops_stat_get(file, 0, &root_stat);
2397    } else {
2398        root_stat = stat;
2399    }
2400
2401    info->num_sets = atomic_get_uint64_t(&stat.num_sets, std::memory_order_relaxed);
2402    info->num_dels = atomic_get_uint64_t(&stat.num_dels, std::memory_order_relaxed);
2403    info->num_gets = atomic_get_uint64_t(&stat.num_gets, std::memory_order_relaxed);
2404    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2405                                                  std::memory_order_relaxed);
2406    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2407                                                  std::memory_order_relaxed);
2408    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves,
2409                                                   std::memory_order_relaxed);
2410
2411    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits,
2412                                            std::memory_order_relaxed);
2413    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts,
2414                                             std::memory_order_relaxed);
2415    return FDB_RESULT_SUCCESS;
2416}
2417
2418fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2419                                 fdb_kvs_name_list *kvs_name_list)
2420{
2421    size_t num, size, offset;
2422    char *ptr;
2423    char **segment;
2424    fdb_kvs_handle *root_handle;
2425    struct kvs_header *kv_header;
2426    struct kvs_node *node;
2427    struct avl_node *a;
2428
2429    if (!fhandle || !kvs_name_list) {
2430        return FDB_RESULT_INVALID_ARGS;
2431    }
2432
2433    root_handle = fhandle->root;
2434    kv_header = root_handle->file->kv_header;
2435
2436    spin_lock(&kv_header->lock);
2437    // sum all lengths of KVS names first
2438    // (to calculate the size of memory segment to be allocated)
2439    num = 1;
2440    size = strlen(default_kvs_name) + 1;
2441    a = avl_first(kv_header->idx_id);
2442    while (a) {
2443        node = _get_entry(a, struct kvs_node, avl_id);
2444        a = avl_next(&node->avl_id);
2445
2446        num++;
2447        size += strlen(node->kvs_name) + 1;
2448    }
2449    size += num * sizeof(char*);
2450
2451    // allocate memory segment
2452    segment = (char**)calloc(1, size);
2453    kvs_name_list->num_kvs_names = num;
2454    kvs_name_list->kvs_names = segment;
2455
2456    ptr = (char*)segment + num * sizeof(char*);
2457    offset = num = 0;
2458
2459    // copy default KVS name
2460    strcpy(ptr + offset, default_kvs_name);
2461    segment[num] = ptr + offset;
2462    num++;
2463    offset += strlen(default_kvs_name) + 1;
2464
2465    // copy the others
2466    a = avl_first(kv_header->idx_name);
2467    while (a) {
2468        node = _get_entry(a, struct kvs_node, avl_name);
2469        a = avl_next(&node->avl_name);
2470
2471        strcpy(ptr + offset, node->kvs_name);
2472        segment[num] = ptr + offset;
2473
2474        num++;
2475        offset += strlen(node->kvs_name) + 1;
2476    }
2477
2478    spin_unlock(&kv_header->lock);
2479
2480    return FDB_RESULT_SUCCESS;
2481}
2482
2483LIBFDB_API
2484fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2485{
2486    if (!kvs_name_list) {
2487        return FDB_RESULT_INVALID_ARGS;
2488    }
2489
2490    free(kvs_name_list->kvs_names);
2491    kvs_name_list->kvs_names = NULL;
2492    kvs_name_list->num_kvs_names = 0;
2493
2494    return FDB_RESULT_SUCCESS;
2495}
2496
2497stale_header_info fdb_get_smallest_active_header(fdb_kvs_handle *handle)
2498{
2499    uint8_t *hdr_buf = alca(uint8_t, handle->config.blocksize);
2500    size_t i, hdr_len;
2501    uint64_t n_headers;
2502    bid_t hdr_bid, last_wal_bid;
2503    filemgr_header_revnum_t hdr_revnum;
2504    filemgr_header_revnum_t cur_revnum;
2505    filemgr_magic_t magic;
2506    fdb_seqnum_t seqnum;
2507    fdb_file_handle *fhandle = NULL;
2508    stale_header_info ret;
2509    struct avl_node *a;
2510    struct filemgr_fhandle_idx_node *fhandle_node;
2511    struct list_elem *e;
2512    struct kvs_opened_node *item;
2513
2514    ret.revnum = cur_revnum = handle->fhandle->root->cur_header_revnum;
2515    ret.bid = handle->fhandle->root->last_hdr_bid;
2516
2517    spin_lock(&handle->file->fhandle_idx_lock);
2518
2519    // check all opened file handles
2520    a = avl_first(&handle->file->fhandle_idx);
2521    while (a) {
2522        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2523        a = avl_next(a);
2524
2525        fhandle = (fdb_file_handle*)fhandle_node->fhandle;
2526        spin_lock(&fhandle->lock);
2527        // check all opened KVS handles belonging to the file handle
2528        e = list_begin(fhandle->handles);
2529        while (e) {
2530
2531            item = _get_entry(e, struct kvs_opened_node, le);
2532            e = list_next(e);
2533
2534            if (item->handle->cur_header_revnum < ret.revnum) {
2535                ret.revnum = item->handle->cur_header_revnum;
2536                ret.bid = item->handle->last_hdr_bid;
2537            }
2538        }
2539        spin_unlock(&fhandle->lock);
2540    }
2541
2542    spin_unlock(&handle->file->fhandle_idx_lock);
2543
2544    uint64_t num_keeping_headers =
2545        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
2546                            std::memory_order_relaxed);
2547    if (num_keeping_headers) {
2548        // backward scan previous header info to keep more headers
2549
2550        if (ret.bid == handle->last_hdr_bid) {
2551            // header in 'handle->last_hdr_bid' is not written into file yet!
2552            // we should start from the previous header
2553            hdr_bid = atomic_get_uint64_t(&handle->file->header.bid);
2554            hdr_revnum = handle->file->header.revnum;
2555        } else {
2556            hdr_bid = ret.bid;
2557            hdr_revnum = ret.revnum;
2558        }
2559
2560        n_headers= num_keeping_headers;
2561        if (cur_revnum - hdr_revnum < n_headers) {
2562            n_headers = n_headers - (cur_revnum - hdr_revnum);
2563        } else {
2564            n_headers = 0;
2565        }
2566
2567        for (i=0; i<n_headers; ++i) {
2568            hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
2569                         hdr_buf, &hdr_len, &seqnum, &hdr_revnum, NULL,
2570                         &magic, NULL, &handle->log_callback);
2571            if (hdr_len) {
2572                ret.revnum = hdr_revnum;
2573                ret.bid = hdr_bid;
2574            } else {
2575                break;
2576            }
2577        }
2578    }
2579
2580    // although we keep more headers from the oldest active header, we have to
2581    // preserve the last WAL flushing header from the target header for data
2582    // consistency.
2583    uint64_t dummy64;
2584    char *new_filename;
2585
2586    filemgr_fetch_header(handle->file, ret.bid, hdr_buf, &hdr_len, &seqnum,
2587                         &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2588    fdb_fetch_header(magic, hdr_buf, &dummy64, &dummy64, &dummy64, &dummy64,
2589                     &dummy64, &dummy64, &dummy64, &last_wal_bid, &dummy64,
2590                     &dummy64, &new_filename, NULL);
2591
2592    if (last_wal_bid != BLK_NOT_FOUND) {
2593        filemgr_fetch_header(handle->file, last_wal_bid, hdr_buf, &hdr_len, &seqnum,
2594                             &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2595        ret.bid = last_wal_bid;
2596        ret.revnum = hdr_revnum;
2597    } else {
2598        // WAL has not been flushed yet .. we cannot trigger block reusing
2599        ret.bid = BLK_NOT_FOUND;
2600        ret.revnum = 0;
2601    }
2602
2603    return ret;
2604}
2605
2606