xref: /5.5.2/forestdb/src/kv_instance.cc (revision e4615599)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "version.h"
34#include "staleblock.h"
35
36#include "memleak.h"
37#include "timing.h"
38#include "time_utils.h"
39
// Name that designates the default KV store; custom compare function entries
// whose name equals this string are treated like entries with a NULL name.
static const char *default_kvs_name = DEFAULT_KVS_NAME;
41
// list element for opened KV store handles
// (in-memory data: managed by the file handle)
struct kvs_opened_node {
    fdb_kvs_handle *handle; // the opened KV store handle tracked by this element
    struct list_elem le;    // link into the file handle's 'handles' list
};
48
// list element for custom cmp functions in fhandle
struct cmp_func_node {
    char *kvs_name;               // heap-allocated copy of the KV store name; NULL means the default KV store
    fdb_custom_cmp_variable func; // custom compare function registered for that KV store
    struct list_elem le;          // link into a cmp_func_list
};
55
56static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
57{
58    struct kvs_node *aa, *bb;
59    aa = _get_entry(a, struct kvs_node, avl_name);
60    bb = _get_entry(b, struct kvs_node, avl_name);
61    return strcmp(aa->kvs_name, bb->kvs_name);
62}
63
64static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
65{
66    struct kvs_node *aa, *bb;
67    aa = _get_entry(a, struct kvs_node, avl_id);
68    bb = _get_entry(b, struct kvs_node, avl_id);
69
70    if (aa->id < bb->id) {
71        return -1;
72    } else if (aa->id > bb->id) {
73        return 1;
74    } else {
75        return 0;
76    }
77}
78
79struct kvs_opened_node *_fdb_kvs_createNLinkKVHandle(fdb_file_handle *fhandle,
80                                                     fdb_kvs_handle *handle)
81{
82    struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
83        calloc(1, sizeof(struct kvs_opened_node));
84    opened_node->handle = handle;
85
86    handle->node = opened_node;
87    spin_lock(&fhandle->lock);
88    list_push_back(fhandle->handles, &opened_node->le);
89    spin_unlock(&fhandle->lock);
90    return opened_node;
91}
92
93static bool _fdb_kvs_any_handle_opened(fdb_file_handle *fhandle,
94                                       fdb_kvs_id_t kv_id)
95{
96    struct filemgr *file = fhandle->root->file;
97    struct avl_node *a;
98    struct list_elem *e;
99    struct filemgr_fhandle_idx_node *fhandle_node;
100    struct kvs_opened_node *opened_node;
101    fdb_file_handle *file_handle;
102
103    spin_lock(&file->fhandle_idx_lock);
104    a = avl_first(&file->fhandle_idx);
105    while (a) {
106        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
107        a = avl_next(a);
108        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
109        spin_lock(&file_handle->lock);
110        e = list_begin(file_handle->handles);
111        while (e) {
112            opened_node = _get_entry(e, struct kvs_opened_node, le);
113            if ((opened_node->handle->kvs && opened_node->handle->kvs->id == kv_id) ||
114                (kv_id == 0 && opened_node->handle->kvs == NULL)) // single KVS mode
115            {
116                // there is an opened handle
117                spin_unlock(&file_handle->lock);
118                spin_unlock(&file->fhandle_idx_lock);
119                return true;
120            }
121            e = list_next(e);
122        }
123        spin_unlock(&file_handle->lock);
124    }
125    spin_unlock(&file->fhandle_idx_lock);
126
127    return false;
128}
129
130void fdb_file_handle_init(fdb_file_handle *fhandle,
131                           fdb_kvs_handle *root)
132{
133    fhandle->root = root;
134    fhandle->flags = 0x0;
135    root->fhandle = fhandle;
136    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
137    fhandle->cmp_func_list = NULL;
138    spin_init(&fhandle->lock);
139}
140
141void fdb_file_handle_close_all(fdb_file_handle *fhandle)
142{
143    struct list_elem *e;
144    struct kvs_opened_node *node;
145
146    spin_lock(&fhandle->lock);
147    e = list_begin(fhandle->handles);
148    while (e) {
149        node = _get_entry(e, struct kvs_opened_node, le);
150        e = list_next(e);
151        _fdb_close(node->handle);
152        free(node->handle);
153        free(node);
154    }
155    spin_unlock(&fhandle->lock);
156}
157
158void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
159                                    size_t n_func,
160                                    char **kvs_names,
161                                    fdb_custom_cmp_variable *functions)
162{
163    uint64_t i;
164    struct cmp_func_node *node;
165
166    if (n_func == 0 || !kvs_names || !functions) {
167        return;
168    }
169
170    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
171    list_init(fhandle->cmp_func_list);
172
173    for (i=0;i<n_func;++i){
174        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
175        if (kvs_names[i]) {
176            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
177            strcpy(node->kvs_name, kvs_names[i]);
178        } else {
179            // NULL .. default KVS
180            node->kvs_name = NULL;
181        }
182        node->func = functions[i];
183        list_push_back(fhandle->cmp_func_list, &node->le);
184    }
185}
186
187// clone all items in cmp_func_list to fhandle->cmp_func_list
188void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
189                                         struct list *cmp_func_list)
190{
191    struct list_elem *e;
192    struct cmp_func_node *src, *dst;
193
194    if (fhandle->cmp_func_list || /* already exist */
195        !cmp_func_list) {
196        return;
197    }
198
199    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
200    list_init(fhandle->cmp_func_list);
201
202    e = list_begin(cmp_func_list);
203    while (e) {
204        src = _get_entry(e, struct cmp_func_node, le);
205        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
206        if (src->kvs_name) {
207            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
208            strcpy(dst->kvs_name, src->kvs_name);
209        } else {
210            dst->kvs_name = NULL; // default KVS
211        }
212        dst->func = src->func;
213        list_push_back(fhandle->cmp_func_list, &dst->le);
214        e = list_next(&src->le);
215    }
216}
217
218void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
219                                  char *kvs_name,
220                                  fdb_custom_cmp_variable cmp_func)
221{
222    struct cmp_func_node *node;
223
224    // create list if not exist
225    if (!fhandle->cmp_func_list) {
226        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
227        list_init(fhandle->cmp_func_list);
228    }
229
230    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
231    if (kvs_name) {
232        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
233        strcpy(node->kvs_name, kvs_name);
234    } else {
235        // default KVS
236        node->kvs_name = NULL;
237    }
238    node->func = cmp_func;
239    list_push_back(fhandle->cmp_func_list, &node->le);
240}
241
242void fdb_cmp_func_list_from_filemgr(struct filemgr *file, struct list *cmp_func_list)
243{
244    if (!file || !file->kv_header || !cmp_func_list) {
245        return;
246    }
247
248    struct cmp_func_node *node;
249
250    spin_lock(&file->kv_header->lock);
251    // Default KV store cmp function
252    if (file->kv_header->default_kvs_cmp) {
253        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
254        node->func = file->kv_header->default_kvs_cmp;
255        node->kvs_name = NULL;
256        list_push_back(cmp_func_list, &node->le);
257    }
258
259    // Rest of KV stores
260    struct kvs_node *kvs_node;
261    struct avl_node *a = avl_first(file->kv_header->idx_name);
262    while (a) {
263        kvs_node = _get_entry(a, struct kvs_node, avl_name);
264        a = avl_next(a);
265        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
266        node->func = kvs_node->custom_cmp;
267        node->kvs_name = (char*)calloc(1, strlen(kvs_node->kvs_name)+1);
268        strcpy(node->kvs_name, kvs_node->kvs_name);
269        list_push_back(cmp_func_list, &node->le);
270    }
271    spin_unlock(&file->kv_header->lock);
272}
273
274void fdb_free_cmp_func_list(struct list *cmp_func_list)
275{
276    if (!cmp_func_list) {
277        return;
278    }
279
280    struct cmp_func_node *cmp_node;
281    struct list_elem *e = list_begin(cmp_func_list);
282    while (e) {
283        cmp_node = _get_entry(e, struct cmp_func_node, le);
284        e = list_remove(cmp_func_list, &cmp_node->le);
285        free(cmp_node->kvs_name);
286        free(cmp_node);
287    }
288}
289
290static void _free_cmp_func_list(fdb_file_handle *fhandle)
291{
292    struct list_elem *e;
293    struct cmp_func_node *cmp_node;
294
295    if (!fhandle->cmp_func_list) {
296        return;
297    }
298
299    e = list_begin(fhandle->cmp_func_list);
300    while (e) {
301        cmp_node = _get_entry(e, struct cmp_func_node, le);
302        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
303
304        free(cmp_node->kvs_name);
305        free(cmp_node);
306    }
307    free(fhandle->cmp_func_list);
308    fhandle->cmp_func_list = NULL;
309}
310
311void fdb_file_handle_free(fdb_file_handle *fhandle)
312{
313    free(fhandle->handles);
314    _free_cmp_func_list(fhandle);
315    spin_destroy(&fhandle->lock);
316    free(fhandle);
317}
318
319fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
320{
321    int ori_flag;
322    fdb_file_handle *fhandle = handle->fhandle;
323    fdb_custom_cmp_variable ori_custom_cmp;
324    struct filemgr *file = handle->file;
325    struct cmp_func_node *cmp_node;
326    struct kvs_node *kvs_node, query;
327    struct list_elem *e;
328    struct avl_node *a;
329
330    spin_lock(&file->kv_header->lock);
331    ori_flag = file->kv_header->custom_cmp_enabled;
332    ori_custom_cmp = file->kv_header->default_kvs_cmp;
333
334    if (fhandle->cmp_func_list) {
335        handle->kvs_config.custom_cmp = NULL;
336
337        e = list_begin(fhandle->cmp_func_list);
338        while (e) {
339            cmp_node = _get_entry(e, struct cmp_func_node, le);
340            if (cmp_node->kvs_name == NULL ||
341                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
342                handle->kvs_config.custom_cmp = cmp_node->func;
343                file->kv_header->default_kvs_cmp = cmp_node->func;
344                file->kv_header->custom_cmp_enabled = 1;
345            } else {
346                // search by name
347                query.kvs_name = cmp_node->kvs_name;
348                a = avl_search(file->kv_header->idx_name,
349                               &query.avl_name,
350                               _kvs_cmp_name);
351                if (a) { // found
352                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
353                    if (!kvs_node->custom_cmp) {
354                        kvs_node->custom_cmp = cmp_node->func;
355                    }
356                    file->kv_header->custom_cmp_enabled = 1;
357                }
358            }
359            e = list_next(&cmp_node->le);
360        }
361    }
362
363    // first check the default KVS
364    // 1. root handle has not been opened yet: don't care
365    // 2. root handle was opened before: must match the flag
366    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
367        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
368            handle->kvs_config.custom_cmp == NULL) {
369            // custom cmp function was assigned before,
370            // but no custom cmp function is assigned
371            file->kv_header->custom_cmp_enabled = ori_flag;
372            file->kv_header->default_kvs_cmp = ori_custom_cmp;
373            spin_unlock(&file->kv_header->lock);
374            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
375            if (!kvs_name) {
376                kvs_name = DEFAULT_KVS_NAME;
377            }
378            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
379                           "Error! Tried to open a KV store '%s', which was created with "
380                           "custom compare function enabled, without passing the same "
381                           "custom compare function.", kvs_name);
382        }
383        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
384              handle->kvs_config.custom_cmp) {
385            // custom cmp function was not assigned before,
386            // but custom cmp function is assigned from user
387            file->kv_header->custom_cmp_enabled = ori_flag;
388            file->kv_header->default_kvs_cmp = ori_custom_cmp;
389            spin_unlock(&file->kv_header->lock);
390            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
391            if (!kvs_name) {
392                kvs_name = DEFAULT_KVS_NAME;
393            }
394            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
395                           "Error! Tried to open a KV store '%s', which was created without "
396                           "custom compare function, by passing custom compare function.",
397                    kvs_name);
398        }
399    }
400
401    // next check other KVSs
402    a = avl_first(file->kv_header->idx_name);
403    while (a) {
404        kvs_node = _get_entry(a, struct kvs_node, avl_name);
405        a = avl_next(a);
406
407        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
408            kvs_node->custom_cmp == NULL) {
409            // custom cmp function was assigned before,
410            // but no custom cmp function is assigned
411            file->kv_header->custom_cmp_enabled = ori_flag;
412            file->kv_header->default_kvs_cmp = ori_custom_cmp;
413            spin_unlock(&file->kv_header->lock);
414            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
415            if (!kvs_name) {
416                kvs_name = DEFAULT_KVS_NAME;
417            }
418            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
419                           "Error! Tried to open a KV store '%s', which was created with "
420                           "custom compare function enabled, without passing the same "
421                           "custom compare function.", kvs_name);
422        }
423        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
424              kvs_node->custom_cmp) {
425            // custom cmp function was not assigned before,
426            // but custom cmp function is assigned from user
427            file->kv_header->custom_cmp_enabled = ori_flag;
428            file->kv_header->default_kvs_cmp = ori_custom_cmp;
429            spin_unlock(&file->kv_header->lock);
430            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
431            if (!kvs_name) {
432                kvs_name = DEFAULT_KVS_NAME;
433            }
434            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
435                           "Error! Tried to open a KV store '%s', which was created without "
436                           "custom compare function, by passing custom compare function.",
437                           kvs_name);
438        }
439    }
440
441    spin_unlock(&file->kv_header->lock);
442    return FDB_RESULT_SUCCESS;
443}
444
445fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
446                                              char *kvs_name)
447{
448    fdb_file_handle *fhandle;
449    struct list_elem *e;
450    struct cmp_func_node *cmp_node;
451
452    fhandle = handle->fhandle;
453    if (!fhandle->cmp_func_list) {
454        return NULL;
455    }
456
457    e = list_begin(fhandle->cmp_func_list);
458    while (e) {
459        cmp_node = _get_entry(e, struct cmp_func_node, le);
460        if (kvs_name == NULL ||
461            !strcmp(kvs_name, default_kvs_name)) {
462            if (cmp_node->kvs_name == NULL ||
463                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
464                return cmp_node->func;
465            }
466        } else if (cmp_node->kvs_name &&
467                   !strcmp(cmp_node->kvs_name, kvs_name)) {
468            return cmp_node->func;
469        }
470        e = list_next(&cmp_node->le);
471    }
472    return NULL;
473}
474
475hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
476{
477    fdb_kvs_id_t kv_id;
478    struct hbtrie *trie = (struct hbtrie *)aux;
479    struct btreeblk_handle *bhandle;
480    struct filemgr *file;
481    struct avl_node *a;
482    struct kvs_node query, *node;
483
484    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
485    file = bhandle->file;
486
487    if (!file->kv_header->custom_cmp_enabled) {
488        return NULL;
489    }
490
491    buf2kvid(trie->chunksize, chunk, &kv_id);
492
493    // search by id
494    if (kv_id > 0) {
495        query.id = kv_id;
496        spin_lock(&file->kv_header->lock);
497        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
498        spin_unlock(&file->kv_header->lock);
499
500        if (a) {
501            node = _get_entry(a, struct kvs_node, avl_id);
502            return (hbtrie_cmp_func *)node->custom_cmp;
503        }
504    } else {
505        // root handle
506        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
507    }
508    return NULL;
509}
510
511void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
512    handle->kvs->type = KVS_ROOT;
513    handle->kvs->root = handle->fhandle->root;
514    // super handle's ID is always 0
515    handle->kvs->id = 0;
516    // force custom cmp function
517    spin_lock(&file->kv_header->lock);
518    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
519    spin_unlock(&file->kv_header->lock);
520}
521
522void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
523                         fdb_kvs_handle *handle,
524                         struct filemgr *file,
525                         const char *kvs_name)
526{
527    struct kvs_node query, *kvs_node;
528    struct kvs_opened_node *opened_node;
529    struct avl_node *a;
530
531    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
532
533    if (root_handle == NULL) {
534        // 'handle' is a super handle
535        _fdb_kvs_init_root(handle, file);
536    } else {
537        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
538        handle->kvs->type = KVS_SUB;
539        handle->kvs->root = root_handle;
540
541        if (kvs_name) {
542            spin_lock(&file->kv_header->lock);
543            query.kvs_name = (char*)kvs_name;
544            a = avl_search(file->kv_header->idx_name, &query.avl_name,
545                           _kvs_cmp_name);
546            if (a == NULL) {
547                // KV instance name is not found
548                free(handle->kvs);
549                handle->kvs = NULL;
550                spin_unlock(&file->kv_header->lock);
551                return;
552            }
553            kvs_node = _get_entry(a, struct kvs_node, avl_name);
554            handle->kvs->id = kvs_node->id;
555            // force custom cmp function
556            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
557            spin_unlock(&file->kv_header->lock);
558        } else {
559            // snapshot of the root handle
560            handle->kvs->id = 0;
561        }
562
563        opened_node = (struct kvs_opened_node *)
564               calloc(1, sizeof(struct kvs_opened_node));
565        opened_node->handle = handle;
566
567        handle->node = opened_node;
568        spin_lock(&root_handle->fhandle->lock);
569        list_push_back(root_handle->fhandle->handles, &opened_node->le);
570        spin_unlock(&root_handle->fhandle->lock);
571    }
572}
573
574void fdb_kvs_info_free(fdb_kvs_handle *handle)
575{
576    if (handle->kvs == NULL) {
577        return;
578    }
579
580    free(handle->kvs);
581    handle->kvs = NULL;
582}
583
584void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
585{
586    struct kvs_header *kv_header;
587
588    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
589    *kv_header_ptr = kv_header;
590
591    // KV ID '0' is reserved for default KV instance (super handle)
592    kv_header->id_counter = 1;
593    kv_header->default_kvs_cmp = NULL;
594    kv_header->custom_cmp_enabled = 0;
595    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
596    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
597    kv_header->num_kv_stores = 0;
598    avl_init(kv_header->idx_name, NULL);
599    avl_init(kv_header->idx_id, NULL);
600    spin_init(&kv_header->lock);
601}
602
603void fdb_kvs_header_create(struct filemgr *file)
604{
605    if (file->kv_header) {
606        return; // already exist
607    }
608
609    _fdb_kvs_header_create(&file->kv_header);
610    file->free_kv_header = fdb_kvs_header_free;
611}
612
613void fdb_kvs_header_reset_all_stats(struct filemgr *file)
614{
615    struct avl_node *a;
616    struct kvs_node *node;
617    struct kvs_header *kv_header = file->kv_header;
618
619    spin_lock(&kv_header->lock);
620    a = avl_first(kv_header->idx_id);
621    while (a) {
622        node = _get_entry(a, struct kvs_node, avl_id);
623        a = avl_next(&node->avl_id);
624        memset(&node->stat, 0x0, sizeof(node->stat));
625    }
626    spin_unlock(&kv_header->lock);
627}
628
629void fdb_kvs_header_copy(fdb_kvs_handle *handle,
630                         struct filemgr *new_file,
631                         struct docio_handle *new_dhandle,
632                         uint64_t *new_file_kv_info_offset,
633                         bool create_new)
634{
635    struct avl_node *a, *aa;
636    struct kvs_node *node_old, *node_new;
637
638    if (create_new) {
639        struct kvs_header *kv_header;
640        // copy KV header data in 'handle' to new file
641        _fdb_kvs_header_create(&kv_header);
642        // read from 'handle->dhandle', and import into 'new_file'
643        fdb_kvs_header_read(kv_header, handle->dhandle,
644                            handle->kv_info_offset, handle->file->version, false);
645
646        // write KV header in 'new_file' using 'new_dhandle'
647        uint64_t new_kv_info_offset;
648        fdb_kvs_handle new_handle;
649        new_handle.file = new_file;
650        new_handle.dhandle = new_dhandle;
651        new_handle.kv_info_offset = BLK_NOT_FOUND;
652        new_kv_info_offset = fdb_kvs_header_append(&new_handle);
653        if (new_file_kv_info_offset) {
654            *new_file_kv_info_offset = new_kv_info_offset;
655        }
656
657        if (!filemgr_set_kv_header(new_file, kv_header, fdb_kvs_header_free)) {
658            // LCOV_EXCL_START
659            _fdb_kvs_header_free(kv_header);
660        } // LCOV_EXCL_STOP
661        fdb_kvs_header_reset_all_stats(new_file);
662    }
663
664    spin_lock(&handle->file->kv_header->lock);
665    spin_lock(&new_file->kv_header->lock);
666    // copy all in-memory custom cmp function pointers & seqnums
667    new_file->kv_header->default_kvs_cmp =
668        handle->file->kv_header->default_kvs_cmp;
669    new_file->kv_header->custom_cmp_enabled =
670        handle->file->kv_header->custom_cmp_enabled;
671    a = avl_first(handle->file->kv_header->idx_id);
672    while (a) {
673        node_old = _get_entry(a, struct kvs_node, avl_id);
674        aa = avl_search(new_file->kv_header->idx_id,
675                        &node_old->avl_id, _kvs_cmp_id);
676        assert(aa); // MUST exist
677        node_new = _get_entry(aa, struct kvs_node, avl_id);
678        node_new->custom_cmp = node_old->custom_cmp;
679        node_new->seqnum = node_old->seqnum;
680        node_new->op_stat = node_old->op_stat;
681        a = avl_next(a);
682    }
683    spin_unlock(&new_file->kv_header->lock);
684    spin_unlock(&handle->file->kv_header->lock);
685}
686
687// export KV header info to raw data
688static void _fdb_kvs_header_export(struct kvs_header *kv_header,
689                                   void **data, size_t *len, uint64_t version)
690{
691    /* << raw data structure >>
692     * [# KV instances]:        8 bytes
693     * [current KV ID counter]: 8 bytes
694     * ---
695     * [name length]:           2 bytes
696     * [instance name]:         x bytes
697     * [instance ID]:           8 bytes
698     * [sequence number]:       8 bytes
699     * [# live index nodes]:    8 bytes
700     * [# docs]:                8 bytes
701     * [data size]:             8 bytes
702     * [flags]:                 8 bytes
703     * [delta size]:            8 bytes (since MAGIC_001)
704     * [# deleted docs]:        8 bytes (since MAGIC_001)
705     * ...
706     *    Please note that if the above format is changed, please also change...
707     *    _fdb_kvs_get_snap_info()
708     *    _fdb_kvs_header_import()
709     *    _kvs_stat_get_sum_doc()
710     *    _kvs_stat_get_sum_attr
711     */
712
713    int size = 0;
714    int offset = 0;
715    uint16_t name_len, _name_len;
716    uint64_t c = 0;
717    uint64_t _n_kv, _kv_id, _flags;
718    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
719    int64_t _deltasize;
720    fdb_kvs_id_t _id_counter;
721    fdb_seqnum_t _seqnum;
722    struct kvs_node *node;
723    struct avl_node *a;
724
725    if (kv_header == NULL) {
726        *data = NULL;
727        *len = 0;
728        return ;
729    }
730
731    spin_lock(&kv_header->lock);
732
733    // pre-scan to estimate the size of data
734    size += sizeof(uint64_t);
735    size += sizeof(fdb_kvs_id_t);
736    a = avl_first(kv_header->idx_name);
737    while(a) {
738        node = _get_entry(a, struct kvs_node, avl_name);
739        c++;
740        size += sizeof(uint16_t); // length
741        size += strlen(node->kvs_name)+1; // name
742        size += sizeof(node->id); // ID
743        size += sizeof(node->seqnum); // seq number
744        size += sizeof(node->stat.nlivenodes); // # live index nodes
745        size += sizeof(node->stat.ndocs); // # docs
746        size += sizeof(node->stat.datasize); // data size
747        size += sizeof(node->flags); // flags
748        if (ver_is_atleast_magic_001(version)) {
749            size += sizeof(node->stat.deltasize); // delta size since commit
750            size += sizeof(node->stat.ndeletes); // # deleted docs
751        }
752        a = avl_next(a);
753    }
754
755    *data = (void *)malloc(size);
756
757    // # KV instances
758    _n_kv = _endian_encode(c);
759    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
760    offset += sizeof(_n_kv);
761
762    // ID counter
763    _id_counter = _endian_encode(kv_header->id_counter);
764    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
765    offset += sizeof(_id_counter);
766
767    a = avl_first(kv_header->idx_name);
768    while(a) {
769        node = _get_entry(a, struct kvs_node, avl_name);
770
771        // name length
772        name_len = strlen(node->kvs_name)+1;
773        _name_len = _endian_encode(name_len);
774        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
775        offset += sizeof(_name_len);
776
777        // name
778        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
779        offset += name_len;
780
781        // KV ID
782        _kv_id = _endian_encode(node->id);
783        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
784        offset += sizeof(_kv_id);
785
786        // seq number
787        _seqnum = _endian_encode(node->seqnum);
788        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
789        offset += sizeof(_seqnum);
790
791        // # live index nodes
792        _nlivenodes = _endian_encode(node->stat.nlivenodes);
793        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
794        offset += sizeof(_nlivenodes);
795
796        // # docs
797        _ndocs = _endian_encode(node->stat.ndocs);
798        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
799        offset += sizeof(_ndocs);
800
801        // datasize
802        _datasize = _endian_encode(node->stat.datasize);
803        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
804        offset += sizeof(_datasize);
805
806        // flags
807        _flags = _endian_encode(node->flags);
808        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
809        offset += sizeof(_flags);
810
811        if (ver_is_atleast_magic_001(version)) {
812            // # delta index nodes + docsize created after last commit
813            _deltasize = _endian_encode(node->stat.deltasize);
814            memcpy((uint8_t*)*data + offset, &_deltasize, sizeof(_deltasize));
815            offset += sizeof(_deltasize);
816
817            // # deleted documents
818            _ndeletes = _endian_encode(node->stat.ndeletes);
819            memcpy((uint8_t*)*data + offset, &_ndeletes, sizeof(_ndeletes));
820            offset += sizeof(_ndeletes);
821        }
822
823        a = avl_next(a);
824    }
825
826    *len = size;
827
828    spin_unlock(&kv_header->lock);
829}
830
831void _fdb_kvs_header_import(struct kvs_header *kv_header,
832                            void *data, size_t len, uint64_t version,
833                            bool only_seq_nums)
834{
835    uint64_t i, offset = 0;
836    uint16_t name_len, _name_len;
837    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
838    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
839    int64_t _deltasize;
840    bool is_deltasize;
841    fdb_kvs_id_t id_counter, _id_counter;
842    fdb_seqnum_t seqnum, _seqnum;
843    struct kvs_node *node;
844
845    // # KV instances
846    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
847    offset += sizeof(_n_kv);
848    n_kv = _endian_decode(_n_kv);
849
850    // ID counter
851    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
852    offset += sizeof(_id_counter);
853    id_counter = _endian_decode(_id_counter);
854
855    spin_lock(&kv_header->lock);
856    kv_header->id_counter = id_counter;
857
858    // Version control
859    if (!ver_is_atleast_magic_001(version)) {
860        is_deltasize = false;
861        _deltasize = 0;
862        _ndeletes = 0;
863    } else {
864        is_deltasize = true;
865    }
866
867    for (i=0;i<n_kv;++i){
868        // name length
869        uint64_t name_offset;
870        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
871        offset += sizeof(_name_len);
872        name_offset = offset;
873        name_len = _endian_decode(_name_len);
874
875        // name
876        offset += name_len;
877
878        // KV ID
879        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
880        offset += sizeof(_kv_id);
881        kv_id = _endian_decode(_kv_id);
882
883        // Search if a given KV header node exists or not.
884        struct kvs_node query;
885        query.id = kv_id;
886        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
887                                        _kvs_cmp_id);
888        if (a) {
889            node = _get_entry(a, struct kvs_node, avl_id);
890        } else {
891            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
892            node->kvs_name = (char *)malloc(name_len);
893            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
894            node->id = kv_id;
895            _init_op_stats(&node->op_stat);
896        }
897
898        // seq number
899        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
900        offset += sizeof(_seqnum);
901        seqnum = _endian_decode(_seqnum);
902        node->seqnum = seqnum;
903
904        // # live index nodes
905        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
906        offset += sizeof(_nlivenodes);
907
908        // # docs
909        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
910        offset += sizeof(_ndocs);
911
912        // datasize
913        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
914        offset += sizeof(_datasize);
915
916        // flags
917        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
918        offset += sizeof(_flags);
919        flags = _endian_decode(_flags);
920
921        if (is_deltasize) {
922            // delta document + index size since previous commit
923            memcpy(&_deltasize, (uint8_t*)data + offset,
924                   sizeof(_deltasize));
925            offset += sizeof(_deltasize);
926            memcpy(&_ndeletes, (uint8_t*)data + offset,
927                   sizeof(_ndeletes));
928            offset += sizeof(_ndeletes);
929        }
930
931        if (!only_seq_nums) {
932            node->stat.nlivenodes = _endian_decode(_nlivenodes);
933            node->stat.ndocs = _endian_decode(_ndocs);
934            node->stat.datasize = _endian_decode(_datasize);
935            node->stat.deltasize = _endian_decode(_deltasize);
936            node->stat.ndeletes = _endian_decode(_ndeletes);
937            node->flags = flags;
938            node->custom_cmp = NULL;
939        }
940
941        if (!a) { // Insert a new KV header node if not exist.
942            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
943            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
944            ++kv_header->num_kv_stores;
945        }
946    }
947    spin_unlock(&kv_header->lock);
948}
949
950fdb_status _fdb_kvs_get_snap_info(void *data, uint64_t version,
951                                  fdb_snapshot_info_t *snap_info)
952{
953    int i, offset = 0, sizeof_skipped_segments;
954    uint16_t name_len, _name_len;
955    int64_t n_kv, _n_kv;
956    bool is_deltasize;
957    fdb_seqnum_t _seqnum;
958    // Version control
959    if (!ver_is_atleast_magic_001(version)) {
960        is_deltasize = false;
961    } else {
962        is_deltasize = true;
963    }
964
965    // # KV instances
966    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
967    offset += sizeof(_n_kv);
968    // since n_kv doesn't count the default KVS, increase it by 1.
969    n_kv = _endian_decode(_n_kv) + 1;
970    assert(n_kv); // Must have at least one kv instance
971    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
972                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
973    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
974        return FDB_RESULT_ALLOC_FAIL;
975    } // LCOV_EXCL_STOP
976
977    snap_info->num_kvs_markers = n_kv;
978
979    // Skip over ID counter
980    offset += sizeof(fdb_kvs_id_t);
981
982    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
983                            + sizeof(uint64_t) // skip over nlivenodes
984                            + sizeof(uint64_t) // skip over ndocs
985                            + sizeof(uint64_t) // skip over datasize
986                            + sizeof(uint64_t); // skip over flags
987    if (is_deltasize) {
988        sizeof_skipped_segments += sizeof(uint64_t); // skip over deltasize
989        sizeof_skipped_segments += sizeof(uint64_t); // skip over ndeletes
990    }
991
992    for (i = 0; i < n_kv-1; ++i){
993        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
994        // Read the kv store name length
995        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
996        offset += sizeof(_name_len);
997        name_len = _endian_decode(_name_len);
998
999        // Retrieve the KV Store name
1000        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
1001        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
1002        offset += name_len;
1003
1004        // Skip over KV ID
1005        offset += sizeof(uint64_t);
1006
1007        // Retrieve the KV Store Commit Sequence number
1008        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
1009        info->seqnum = _endian_decode(_seqnum);
1010
1011        // Skip over seqnum, nlivenodes, ndocs, datasize, flags etc onto next..
1012        offset += sizeof_skipped_segments;
1013    }
1014
1015    return FDB_RESULT_SUCCESS;
1016}
1017
1018uint64_t _kvs_stat_get_sum_attr(void *data, uint64_t version,
1019                                kvs_stat_attr_t attr)
1020{
1021    uint64_t ret = 0;
1022    int i, offset = 0;
1023    uint16_t name_len, _name_len;
1024    int64_t n_kv, _n_kv;
1025    bool is_deltasize;
1026    uint64_t nlivenodes, ndocs, datasize, flags;
1027    int64_t deltasize;
1028
1029    // Version control
1030    if (!ver_is_atleast_magic_001(version)) {
1031        is_deltasize = false;
1032    } else {
1033        is_deltasize = true;
1034    }
1035
1036    // # KV instances
1037    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
1038    offset += sizeof(_n_kv);
1039    // since n_kv doesn't count the default KVS, increase it by 1.
1040    n_kv = _endian_decode(_n_kv) + 1;
1041    assert(n_kv); // Must have at least one kv instance
1042
1043    // Skip over ID counter
1044    offset += sizeof(fdb_kvs_id_t);
1045
1046    for (i = 0; i < n_kv-1; ++i){
1047        // Read the kv store name length and skip over the length
1048        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
1049        offset += sizeof(_name_len);
1050        name_len = _endian_decode(_name_len);
1051
1052        // Skip over the KV Store name
1053        offset += name_len;
1054
1055        // Skip over KV ID
1056        offset += sizeof(uint64_t);
1057
1058        // Skip over KV store seqnum
1059        offset += sizeof(uint64_t);
1060
1061        // pick just the attribute requested, skipping over rest..
1062        if (attr == KVS_STAT_NLIVENODES) {
1063            memcpy(&nlivenodes, (uint8_t *)data + offset, sizeof(nlivenodes));
1064            ret += _endian_decode(nlivenodes);
1065            // skip over nlivenodes just read
1066            offset += sizeof(nlivenodes);
1067            // skip over ndocs, datasize, flags (and deltasize, ndeletes)
1068            offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof(datasize)
1069                   + sizeof(flags) + (is_deltasize ? sizeof(deltasize)*2 : 0);
1070        } else if (attr == KVS_STAT_DATASIZE) {
1071            offset += sizeof(nlivenodes) + sizeof(ndocs);
1072            memcpy(&datasize, (uint8_t *)data + offset, sizeof(datasize));
1073            ret += _endian_decode(datasize);
1074            // skip over datasize, flags (and deltasize, ndeletes)
1075            offset += sizeof(datasize) + sizeof(flags)
1076                   + (is_deltasize ? sizeof(deltasize)*2 : 0);
1077        } else if (attr == KVS_STAT_DELTASIZE) {
1078            if (is_deltasize) {
1079                offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof (datasize)
1080                        + sizeof(flags);
1081                memcpy(&deltasize, (uint8_t *)data + offset, sizeof(deltasize));
1082                ret += _endian_decode(deltasize);
1083                // skip over datasize, flags (and deltasize)
1084                offset += sizeof(deltasize)*2; // and ndeletes
1085            }
1086        } else { // Attribute fetched not implemented yet..
1087            fdb_assert(false, 0, attr); // Implement fetch for this attribute
1088        }
1089    }
1090
1091    return ret;
1092}
1093
1094uint64_t fdb_kvs_header_append(fdb_kvs_handle *handle)
1095{
1096    char *doc_key = alca(char, 32);
1097    void *data;
1098    size_t len;
1099    uint64_t kv_info_offset, prev_offset;
1100    struct docio_object doc;
1101    struct docio_length doc_len;
1102    struct filemgr *file = handle->file;
1103    struct docio_handle *dhandle = handle->dhandle;
1104
1105    _fdb_kvs_header_export(file->kv_header, &data, &len, file->version);
1106
1107    prev_offset = handle->kv_info_offset;
1108
1109    memset(&doc, 0, sizeof(struct docio_object));
1110    sprintf(doc_key, "KV_header");
1111    doc.key = (void *)doc_key;
1112    doc.meta = NULL;
1113    doc.body = data;
1114    doc.length.keylen = strlen(doc_key) + 1;
1115    doc.length.metalen = 0;
1116    doc.length.bodylen = len;
1117    doc.seqnum = 0;
1118    kv_info_offset = docio_append_doc_system(dhandle, &doc);
1119    free(data);
1120
1121    if (prev_offset != BLK_NOT_FOUND) {
1122        if (docio_read_doc_length(handle->dhandle, &doc_len, prev_offset)
1123            == FDB_RESULT_SUCCESS) {
1124            // mark stale
1125            filemgr_mark_stale(handle->file, prev_offset, _fdb_get_docsize(doc_len));
1126        }
1127    }
1128
1129    return kv_info_offset;
1130}
1131
1132void fdb_kvs_header_read(struct kvs_header *kv_header,
1133                         struct docio_handle *dhandle,
1134                         uint64_t kv_info_offset,
1135                         uint64_t version,
1136                         bool only_seq_nums)
1137{
1138    int64_t offset;
1139    struct docio_object doc;
1140
1141    memset(&doc, 0, sizeof(struct docio_object));
1142    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
1143
1144    if (offset <= 0) {
1145        fdb_log(dhandle->log_callback, (fdb_status) offset,
1146                "Failed to read a KV header with the offset %" _F64 " from a "
1147                "database file '%s'", kv_info_offset, dhandle->file->filename);
1148        return;
1149    }
1150
1151    _fdb_kvs_header_import(kv_header, doc.body, doc.length.bodylen,
1152                           version, only_seq_nums);
1153    free_docio_object(&doc, 1, 1, 1);
1154}
1155
1156fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
1157{
1158    uint8_t *buf;
1159    uint64_t dummy64;
1160    uint64_t version;
1161    uint64_t kv_info_offset;
1162    size_t len;
1163    bid_t hdr_bid;
1164    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
1165    fdb_kvs_id_t id = 0;
1166    char *compacted_filename = NULL;
1167    struct filemgr *file = handle->file;
1168
1169    buf = alca(uint8_t, file->config->blocksize);
1170
1171    if (handle->kvs && handle->kvs->id > 0) {
1172        id = handle->kvs->id;
1173    }
1174
1175    hdr_bid = filemgr_get_header_bid(file);
1176    if (hdr_bid == BLK_NOT_FOUND) {
1177        // header doesn't exist
1178        return 0;
1179    }
1180
1181    // read header
1182    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, NULL,
1183                         &version, NULL, &handle->log_callback);
1184    if (id > 0) { // non-default KVS
1185        // read last KVS header
1186        fdb_fetch_header(version, buf, &dummy64, &dummy64,
1187                         &dummy64, &dummy64, &dummy64, &dummy64,
1188                         &dummy64, &dummy64,
1189                         &kv_info_offset, &dummy64,
1190                         &compacted_filename, NULL);
1191
1192        int64_t doc_offset;
1193        struct kvs_header *kv_header;
1194        struct docio_object doc;
1195
1196        _fdb_kvs_header_create(&kv_header);
1197        memset(&doc, 0, sizeof(struct docio_object));
1198        doc_offset = docio_read_doc(handle->dhandle,
1199                                    kv_info_offset, &doc, true);
1200
1201        if (doc_offset <= 0) {
1202            // fail
1203            _fdb_kvs_header_free(kv_header);
1204            return 0;
1205
1206        } else {
1207            _fdb_kvs_header_import(kv_header, doc.body,
1208                                   doc.length.bodylen, version, false);
1209            // get local sequence number for the KV instance
1210            seqnum = _fdb_kvs_get_seqnum(kv_header,
1211                                         handle->kvs->id);
1212            _fdb_kvs_header_free(kv_header);
1213            free_docio_object(&doc, 1, 1, 1);
1214        }
1215    }
1216    return seqnum;
1217}
1218
1219LIBFDB_API
1220fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
1221{
1222    if (!handle) {
1223        return FDB_RESULT_INVALID_HANDLE;
1224    }
1225
1226    if (!seqnum) {
1227        return FDB_RESULT_INVALID_ARGS;
1228    }
1229
1230    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
1231        return FDB_RESULT_HANDLE_BUSY;
1232    }
1233
1234    if (handle->shandle) {
1235        // handle for snapshot
1236        // return MAX_SEQNUM instead of the file's sequence number
1237        *seqnum = handle->max_seqnum;
1238    } else {
1239        fdb_check_file_reopen(handle, NULL);
1240        fdb_sync_db_header(handle);
1241
1242        struct filemgr *file;
1243        file = handle->file;
1244
1245        if (handle->kvs == NULL ||
1246            handle->kvs->id == 0) {
1247            filemgr_mutex_lock(file);
1248            *seqnum = filemgr_get_seqnum(file);
1249            filemgr_mutex_unlock(file);
1250        } else {
1251            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1252        }
1253    }
1254    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
1255    return FDB_RESULT_SUCCESS;
1256}
1257
1258void fdb_kvs_set_seqnum(struct filemgr *file,
1259                           fdb_kvs_id_t id,
1260                           fdb_seqnum_t seqnum)
1261{
1262    struct kvs_header *kv_header = file->kv_header;
1263    struct kvs_node query, *node;
1264    struct avl_node *a;
1265
1266    if (id == 0) {
1267        // default KV instance
1268        filemgr_set_seqnum(file, seqnum);
1269        return;
1270    }
1271
1272    spin_lock(&kv_header->lock);
1273    query.id = id;
1274    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1275    node = _get_entry(a, struct kvs_node, avl_id);
1276    node->seqnum = seqnum;
1277    spin_unlock(&kv_header->lock);
1278}
1279
1280void _fdb_kvs_header_free(struct kvs_header *kv_header)
1281{
1282    struct kvs_node *node;
1283    struct avl_node *a;
1284
1285    a = avl_first(kv_header->idx_name);
1286    while (a) {
1287        node = _get_entry(a, struct kvs_node, avl_name);
1288        a = avl_next(a);
1289        avl_remove(kv_header->idx_name, &node->avl_name);
1290
1291        free(node->kvs_name);
1292        free(node);
1293    }
1294    free(kv_header->idx_name);
1295    free(kv_header->idx_id);
1296    free(kv_header);
1297}
1298
1299void fdb_kvs_header_free(struct filemgr *file)
1300{
1301    if (file->kv_header == NULL) {
1302        return;
1303    }
1304
1305    _fdb_kvs_header_free(file->kv_header);
1306    file->kv_header = NULL;
1307}
1308
1309static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1310                                  const char *kvs_name,
1311                                  fdb_kvs_config *kvs_config)
1312{
1313    int kv_ins_name_len;
1314    fdb_status fs = FDB_RESULT_SUCCESS;
1315    struct avl_node *a;
1316    struct filemgr *file;
1317    struct kvs_node *node, query;
1318    struct kvs_header *kv_header;
1319
1320    if (root_handle->config.multi_kv_instances == false) {
1321        // cannot open KV instance under single DB instance mode
1322        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1323                       "Cannot open or create KV store instance '%s' because multi-KV "
1324                       "store instance mode is disabled.",
1325                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1326    }
1327    if (root_handle->kvs->type != KVS_ROOT) {
1328        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1329                       "Cannot open or create KV store instance '%s' because the handle "
1330                       "doesn't support multi-KV sotre instance mode.",
1331                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1332    }
1333
1334fdb_kvs_create_start:
1335    fdb_check_file_reopen(root_handle, NULL);
1336    filemgr_mutex_lock(root_handle->file);
1337    fdb_sync_db_header(root_handle);
1338
1339    if (filemgr_is_rollback_on(root_handle->file)) {
1340        filemgr_mutex_unlock(root_handle->file);
1341        return FDB_RESULT_FAIL_BY_ROLLBACK;
1342    }
1343
1344    file = root_handle->file;
1345
1346    file_status_t fstatus = filemgr_get_file_status(file);
1347    if (fstatus == FILE_REMOVED_PENDING) {
1348        // we must not write into this file
1349        // file status was changed by other thread .. start over
1350        filemgr_mutex_unlock(file);
1351        goto fdb_kvs_create_start;
1352    }
1353
1354    kv_header = file->kv_header;
1355    spin_lock(&kv_header->lock);
1356
1357    // find existing KV instance
1358    // search by name
1359    query.kvs_name = (char*)kvs_name;
1360    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1361    if (a) { // KV name already exists
1362        spin_unlock(&kv_header->lock);
1363        filemgr_mutex_unlock(file);
1364        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1365                       "Failed to create KV Store '%s' as it already exists.",
1366                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1367    }
1368
1369    // create a kvs_node and insert
1370    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1371    node->id = kv_header->id_counter++;
1372    node->seqnum = 0;
1373    node->flags = 0x0;
1374    _init_op_stats(&node->op_stat);
1375    // search fhandle's custom cmp func list first
1376    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1377                                             (char *)kvs_name);
1378    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1379        // follow kvs_config's custom cmp next
1380        node->custom_cmp = kvs_config->custom_cmp;
1381        // if custom cmp function is given by user but
1382        // there is no corresponding function in fhandle's list
1383        // add it into the list
1384        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1385                                     (char*)kvs_name,
1386                                     kvs_config->custom_cmp);
1387    }
1388    if (node->custom_cmp) { // custom cmp function is used
1389        node->flags |= KVS_FLAG_CUSTOM_CMP;
1390        kv_header->custom_cmp_enabled = 1;
1391    }
1392    kv_ins_name_len = strlen(kvs_name)+1;
1393    node->kvs_name = (char *)malloc(kv_ins_name_len);
1394    strcpy(node->kvs_name, kvs_name);
1395
1396    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1397    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1398    ++kv_header->num_kv_stores;
1399    spin_unlock(&kv_header->lock);
1400
1401    // if compaction is in-progress,
1402    // create a same kvs_node for the new file
1403    if (filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1404
1405        struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1406
1407        if (new_file) {
1408            struct kvs_node *node_new;
1409            struct kvs_header *kv_header_new;
1410
1411            kv_header_new = new_file->kv_header;
1412            node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1413            *node_new = *node;
1414            node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1415            strcpy(node_new->kvs_name, kvs_name);
1416
1417            // insert into new file's kv_header
1418            spin_lock(&kv_header_new->lock);
1419            if (node->custom_cmp) {
1420                kv_header_new->custom_cmp_enabled = 1;
1421            }
1422            avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1423            avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1424            spin_unlock(&kv_header_new->lock);
1425        } else {
1426            // new_file should have been found if compaction is in progress
1427            fdb_assert(new_file, new_file, NULL);
1428        }
1429    }
1430
1431    // since this function calls filemgr_commit() and appends a new DB header,
1432    // we should finalize & flush the previous dirty update before commit.
1433    bid_t dirty_idtree_root = BLK_NOT_FOUND;
1434    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1435    struct filemgr_dirty_update_node *prev_node = NULL;
1436    struct filemgr_dirty_update_node *new_node = NULL;
1437
1438    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
1439                            &dirty_idtree_root, &dirty_seqtree_root, false);
1440
1441    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
1442                               &dirty_idtree_root, &dirty_seqtree_root, true);
1443
1444    // append system doc
1445    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
1446
1447    // if no compaction is being performed, append header and commit
1448    if (root_handle->file == file) {
1449        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
1450        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
1451        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
1452        fs = filemgr_commit_bid(root_handle->file,
1453                                root_handle->last_hdr_bid,
1454                                cur_bmp_revnum,
1455                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
1456                                &root_handle->log_callback);
1457        btreeblk_reset_subblock_info(root_handle->bhandle);
1458    }
1459
1460    filemgr_mutex_unlock(file);
1461
1462    return fs;
1463}
1464
1465// this function just returns pointer
1466char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1467{
1468    struct kvs_node *node, query;
1469    struct avl_node *a;
1470
1471    if (handle->kvs == NULL) {
1472        // single KV instance mode
1473        return NULL;
1474    }
1475
1476    query.id = handle->kvs->id;
1477    if (query.id == 0) { // default KV instance
1478        return NULL;
1479    }
1480    spin_lock(&file->kv_header->lock);
1481    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1482    if (a) {
1483        node = _get_entry(a, struct kvs_node, avl_id);
1484        spin_unlock(&file->kv_header->lock);
1485        return node->kvs_name;
1486    }
1487    spin_unlock(&file->kv_header->lock);
1488    return NULL;
1489}
1490
1491// this function just returns pointer to kvs_name & offset to user key
1492const char* _fdb_kvs_extract_name_off(fdb_kvs_handle *handle, void *keybuf,
1493                                      size_t *key_offset)
1494{
1495    struct kvs_node *node, query;
1496    struct avl_node *a;
1497    fdb_kvs_id_t kv_id;
1498    struct filemgr *file = handle->file;
1499
1500    if (!handle->kvs) { // single KV instance mode
1501        *key_offset = 0;
1502        return DEFAULT_KVS_NAME;
1503    }
1504
1505    *key_offset = handle->config.chunksize;
1506    buf2kvid(*key_offset, keybuf, &kv_id);
1507    query.id = kv_id;
1508    if (query.id == 0) { // default KV instance in multi kvs mode
1509        return default_kvs_name;
1510    }
1511    spin_lock(&file->kv_header->lock);
1512    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1513    if (a) {
1514        node = _get_entry(a, struct kvs_node, avl_id);
1515        const char *kvs_name = node->kvs_name;
1516        spin_unlock(&file->kv_header->lock);
1517        return kvs_name;
1518    }
1519    spin_unlock(&file->kv_header->lock);
1520    return NULL;
1521}
1522
1523fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1524                                   fdb_kvs_handle *handle_out)
1525{
1526    fdb_status fs;
1527    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1528
1529    if (!handle_out->kvs) {
1530        // create kvs_info
1531        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1532        handle_out->kvs->type = handle_in->kvs->type;
1533        handle_out->kvs->id = handle_in->kvs->id;
1534        handle_out->kvs->root = root_handle;
1535        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1536
1537        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1538            calloc(1, sizeof(struct kvs_opened_node));
1539        opened_node->handle = handle_out;
1540        handle_out->node = opened_node;
1541
1542        spin_lock(&root_handle->fhandle->lock);
1543        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1544        spin_unlock(&root_handle->fhandle->lock);
1545    }
1546
1547    fs = _fdb_clone_snapshot(handle_in, handle_out);
1548    if (fs != FDB_RESULT_SUCCESS) {
1549        if (handle_out->node) {
1550            spin_lock(&root_handle->fhandle->lock);
1551            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1552            spin_unlock(&root_handle->fhandle->lock);
1553            free(handle_out->node);
1554        }
1555        free(handle_out->kvs);
1556    }
1557    return fs;
1558}
1559
1560// 1) allocate memory & create 'handle->kvs'
1561//    by calling fdb_kvs_info_create().
1562//      -> this will allocate a corresponding node and
1563//         insert it into fhandle->handles list.
1564// 2) if matching KVS name doesn't exist, create it.
1565// 3) call _fdb_open().
1566fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1567                         fdb_config *config,
1568                         fdb_kvs_config *kvs_config,
1569                         struct filemgr *file,
1570                         const char *filename,
1571                         const char *kvs_name,
1572                         fdb_kvs_handle *handle)
1573{
1574    fdb_status fs;
1575
1576    if (handle->kvs == NULL) {
1577        // create kvs_info
1578        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1579    }
1580
1581    if (handle->kvs == NULL) {
1582        // KV instance name is not found
1583        if (!kvs_config->create_if_missing) {
1584            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1585                           "Failed to open KV store '%s' because it doesn't exist.",
1586                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1587        }
1588        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1589            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1590                           "Failed to create KV store '%s' because the KV store's handle "
1591                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1592        }
1593
1594        // create
1595        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1596        if (fs != FDB_RESULT_SUCCESS) { // create fail
1597            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1598        }
1599        // create kvs_info again
1600        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1601        if (handle->kvs == NULL) { // fail again
1602            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1603                           "Failed to create KV store '%s' because the KV store's handle "
1604                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1605        }
1606    }
1607    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1608    if (fs != FDB_RESULT_SUCCESS) {
1609        if (handle->node) {
1610            spin_lock(&root_handle->fhandle->lock);
1611            list_remove(root_handle->fhandle->handles, &handle->node->le);
1612            spin_unlock(&root_handle->fhandle->lock);
1613            free(handle->node);
1614        } // 'handle->node == NULL' happens only during rollback
1615        free(handle->kvs);
1616    }
1617    return fs;
1618}
1619
1620// 1) identify whether the requested KVS is default or non-default.
1621// 2) if the requested KVS is default,
1622//   2-1) As the root handle is already opened,
1623//        -> allocate memory for handle, and call _fdb_open().
1624//        -> 'handle->kvs' will be created in _fdb_open(),
1625//           since it is treated as a default handle.
1626//        -> allocate a corresponding node and insert it into
1627//           fhandle->handles list.
1628// 3) if the requested KVS is non-default,
1629//    -> allocate memory for handle, and call _fdb_kvs_open().
1630LIBFDB_API
1631fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1632                        fdb_kvs_handle **ptr_handle,
1633                        const char *kvs_name,
1634                        fdb_kvs_config *kvs_config)
1635{
1636    fdb_kvs_handle *handle;
1637    fdb_config config;
1638    fdb_status fs;
1639    fdb_kvs_handle *root_handle;
1640    fdb_kvs_config config_local;
1641    struct filemgr *file = NULL;
1642    struct filemgr *latest_file = NULL;
1643    LATENCY_STAT_START();
1644
1645    if (!fhandle || !fhandle->root) {
1646        return FDB_RESULT_INVALID_HANDLE;
1647    }
1648
1649    root_handle = fhandle->root;
1650    config = root_handle->config;
1651
1652    if (kvs_config) {
1653        if (validate_fdb_kvs_config(kvs_config)) {
1654            config_local = *kvs_config;
1655        } else {
1656            return FDB_RESULT_INVALID_CONFIG;
1657        }
1658    } else {
1659        config_local = get_default_kvs_config();
1660    }
1661
1662    fdb_check_file_reopen(root_handle, NULL);
1663    fdb_sync_db_header(root_handle);
1664
1665    file = root_handle->file;
1666    latest_file = root_handle->file;
1667
1668    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1669        // return the default KV store handle
1670        spin_lock(&fhandle->lock);
1671        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1672            // the root handle is not opened yet
1673            // sync up the root handle
1674            fdb_custom_cmp_variable default_kvs_cmp;
1675
1676            root_handle->kvs_config = config_local;
1677
1678            if (root_handle->file->kv_header) {
1679                // search fhandle's custom cmp func list first
1680                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1681
1682                spin_lock(&root_handle->file->kv_header->lock);
1683                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1684
1685                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1686                    root_handle->kvs_config.custom_cmp) {
1687                    // follow kvs_config's custom cmp next
1688                    root_handle->file->kv_header->default_kvs_cmp =
1689                        root_handle->kvs_config.custom_cmp;
1690                    fdb_file_handle_add_cmp_func(fhandle, NULL,
1691                                                 root_handle->kvs_config.custom_cmp);
1692                }
1693
1694                if (root_handle->file->kv_header->default_kvs_cmp) {
1695                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1696                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1697                }
1698                spin_unlock(&root_handle->file->kv_header->lock);
1699            }
1700
1701            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1702            fhandle->flags |= FHANDLE_ROOT_OPENED;
1703        }
1704        // the root handle is already synced
1705        // open new default KV store handle
1706        spin_unlock(&fhandle->lock);
1707        handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1708        handle->kvs_config = config_local;
1709        atomic_init_uint8_t(&handle->handle_busy, 0);
1710
1711        handle->fhandle = fhandle;
1712        fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1713        if (fs != FDB_RESULT_SUCCESS) {
1714            free(handle);
1715            *ptr_handle = NULL;
1716        } else {
1717            // insert into fhandle's list
1718            _fdb_kvs_createNLinkKVHandle(fhandle, handle);
1719            *ptr_handle = handle;
1720        }
1721        LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1722        return fs;
1723    }
1724
1725    if (config.multi_kv_instances == false) {
1726        // cannot open KV instance under single DB instance mode
1727        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1728                       "Cannot open KV store instance '%s' because multi-KV "
1729                       "store instance mode is disabled.",
1730                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1731    }
1732    if (root_handle->kvs->type != KVS_ROOT) {
1733        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1734                       "Cannot open KV store instance '%s' because the handle "
1735                       "doesn't support multi-KV sotre instance mode.",
1736                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1737    }
1738    if (root_handle->shandle) {
1739        // cannot open KV instance from a snapshot
1740        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1741                       "Not allowed to open KV store instance '%s' from the "
1742                       "snapshot handle.",
1743                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1744    }
1745
1746    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1747    if (!handle) { // LCOV_EXCL_START
1748        return FDB_RESULT_ALLOC_FAIL;
1749    } // LCOV_EXCL_STOP
1750
1751    atomic_init_uint8_t(&handle->handle_busy, 0);
1752    handle->fhandle = fhandle;
1753    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1754                       latest_file, file->filename, kvs_name, handle);
1755    if (fs == FDB_RESULT_SUCCESS) {
1756        *ptr_handle = handle;
1757    } else {
1758        *ptr_handle = NULL;
1759        free(handle);
1760    }
1761    LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1762    return fs;
1763}
1764
1765LIBFDB_API
1766fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1767                                fdb_kvs_handle **ptr_handle,
1768                                fdb_kvs_config *config)
1769{
1770    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1771}
1772
1773// 1) remove corresponding node from fhandle->handles list.
1774// 2) call _fdb_close().
1775fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1776{
1777    fdb_kvs_handle *root_handle = handle->fhandle->root;
1778    fdb_status fs;
1779
1780    if (handle->node) {
1781        spin_lock(&root_handle->fhandle->lock);
1782        list_remove(root_handle->fhandle->handles, &handle->node->le);
1783        spin_unlock(&root_handle->fhandle->lock);
1784        free(handle->node);
1785    } // 'handle->node == NULL' happens only during rollback
1786
1787    fs = _fdb_close(handle);
1788    return fs;
1789}
1790
1791// close all sub-KV store handles belonging to the root handle
1792fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1793{
1794    fdb_status fs;
1795    struct list_elem *e;
1796    struct kvs_opened_node *node;
1797
1798    spin_lock(&root_handle->fhandle->lock);
1799    e = list_begin(root_handle->fhandle->handles);
1800    while (e) {
1801        node = _get_entry(e, struct kvs_opened_node, le);
1802        e = list_remove(root_handle->fhandle->handles, &node->le);
1803        fs = _fdb_close(node->handle);
1804        if (fs != FDB_RESULT_SUCCESS) {
1805            spin_unlock(&root_handle->fhandle->lock);
1806            return fs;
1807        }
1808        fdb_kvs_info_free(node->handle);
1809        free(node->handle);
1810        free(node);
1811    }
1812    spin_unlock(&root_handle->fhandle->lock);
1813
1814    return FDB_RESULT_SUCCESS;
1815}
1816
1817// 1) identify whether the requested handle is for default KVS or not.
1818// 2) if the requested handle is for the default KVS,
1819//   2-1) if the requested handle must be the root handle,
1820//        -> call _fdb_close(),
1821//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1822//        -> remove the corresponding node from fhandle->handles list,
1823//        -> free the memory for the handle.
1824// 3) if the requested handle is for non-default KVS,
1825//    -> call _fdb_kvs_close(),
1826//       -> this will remove the node from fhandle->handles list.
1827//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1828//    -> free the memory for the handle.
1829LIBFDB_API
1830fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1831{
1832    fdb_status fs;
1833
1834    if (!handle) {
1835        return FDB_RESULT_INVALID_HANDLE;
1836    }
1837    if (handle->num_iterators) {
1838        // There are still active iterators created from this handle
1839        return FDB_RESULT_KV_STORE_BUSY;
1840    }
1841
1842    if (handle->shandle && handle->kvs == NULL) {
1843        // snapshot of the default KV store + single KV store mode
1844        // directly close handle
1845        // (snapshot of the other KV stores will be closed
1846        //  using _fdb_kvs_close(...) below)
1847        fs = _fdb_close(handle);
1848        if (fs == FDB_RESULT_SUCCESS) {
1849            free(handle);
1850        }
1851        return fs;
1852    }
1853
1854    if (handle->kvs == NULL ||
1855        handle->kvs->type == KVS_ROOT) {
1856        // the default KV store handle
1857
1858        fdb_assert(handle->fhandle->root != handle, handle, NULL);
1859        // the default KV store but not the root handle .. normally close
1860        spin_lock(&handle->fhandle->lock);
1861        fs = _fdb_close(handle);
1862        if (fs == FDB_RESULT_SUCCESS) {
1863            // remove from 'handles' list in the root node
1864            if (handle->kvs) {
1865                fdb_kvs_info_free(handle);
1866            }
1867            list_remove(handle->fhandle->handles, &handle->node->le);
1868            spin_unlock(&handle->fhandle->lock);
1869            free(handle->node);
1870            free(handle);
1871        } else {
1872            spin_unlock(&handle->fhandle->lock);
1873        }
1874        return fs;
1875    }
1876
1877    if (handle->kvs && handle->kvs->root == NULL) {
1878        return FDB_RESULT_INVALID_ARGS;
1879    }
1880    fs = _fdb_kvs_close(handle);
1881    if (fs == FDB_RESULT_SUCCESS) {
1882        fdb_kvs_info_free(handle);
1883        free(handle);
1884    }
1885    return fs;
1886}
1887
1888static
1889fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
1890                           const char *kvs_name,
1891                           bool rollback_recreate)
1892{
1893    size_t size_chunk, size_id;
1894    uint8_t *_kv_id;
1895    fdb_status fs = FDB_RESULT_SUCCESS;
1896    fdb_kvs_id_t kv_id = 0;
1897    fdb_kvs_handle *root_handle;
1898    struct avl_node *a = NULL;
1899    struct filemgr *file;
1900    struct kvs_node *node, query;
1901    struct kvs_header *kv_header;
1902
1903    if (!fhandle || !fhandle->root) {
1904        return FDB_RESULT_INVALID_HANDLE;
1905    }
1906
1907    root_handle = fhandle->root;
1908
1909    if (root_handle->config.multi_kv_instances == false) {
1910        // cannot remove the KV instance under single DB instance mode
1911        return FDB_RESULT_INVALID_CONFIG;
1912    }
1913    if (root_handle->kvs->type != KVS_ROOT) {
1914        return FDB_RESULT_INVALID_HANDLE;
1915    }
1916
1917fdb_kvs_remove_start:
1918    fdb_check_file_reopen(root_handle, NULL);
1919    filemgr_mutex_lock(root_handle->file);
1920    fdb_sync_db_header(root_handle);
1921
1922    if (!rollback_recreate) {
1923        if (filemgr_is_rollback_on(root_handle->file)) {
1924            filemgr_mutex_unlock(root_handle->file);
1925            return FDB_RESULT_FAIL_BY_ROLLBACK;
1926        }
1927    }
1928
1929    file = root_handle->file;
1930
1931    file_status_t fstatus = filemgr_get_file_status(file);
1932    if (fstatus == FILE_REMOVED_PENDING) {
1933        // we must not write into this file
1934        // file status was changed by other thread .. start over
1935        filemgr_mutex_unlock(file);
1936        goto fdb_kvs_remove_start;
1937    } else if (fstatus == FILE_COMPACT_OLD) {
1938        // Cannot remove existing KV store during compaction.
1939        // To remove a KV store, the corresponding first chunk in HB+trie
1940        // should be unlinked. This can be possible in the old file during
1941        // compaction, but impossible in the new file, since existing documents
1942        // (including docs belonging to the KV store to be removed) are being moved.
1943        filemgr_mutex_unlock(file);
1944        return FDB_RESULT_FAIL_BY_COMPACTION;
1945    }
1946
1947    // find the kvs_node and remove
1948
1949    // search by name to get ID
1950    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1951        if (!rollback_recreate) {
1952            // default KV store .. KV ID = 0
1953            kv_id = 0;
1954            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1955                // there is an opened handle
1956                filemgr_mutex_unlock(file);
1957                return FDB_RESULT_KV_STORE_BUSY;
1958            }
1959        }
1960        // reset KVS stats (excepting for WAL stats)
1961        file->header.stat.ndocs = 0;
1962        file->header.stat.nlivenodes = 0;
1963        file->header.stat.datasize = 0;
1964        file->header.stat.deltasize = 0;
1965
1966        // reset seqnum
1967        filemgr_set_seqnum(file, 0);
1968    } else {
1969        kv_header = file->kv_header;
1970        spin_lock(&kv_header->lock);
1971        query.kvs_name = (char*)kvs_name;
1972        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1973        if (a == NULL) { // KV name doesn't exist
1974            spin_unlock(&kv_header->lock);
1975            filemgr_mutex_unlock(file);
1976            return FDB_RESULT_KV_STORE_NOT_FOUND;
1977        }
1978        node = _get_entry(a, struct kvs_node, avl_name);
1979        kv_id = node->id;
1980
1981        if (!rollback_recreate) {
1982            spin_unlock(&kv_header->lock);
1983            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1984                // there is an opened handle
1985                filemgr_mutex_unlock(file);
1986                return FDB_RESULT_KV_STORE_BUSY;
1987            }
1988            spin_lock(&kv_header->lock);
1989
1990            avl_remove(kv_header->idx_name, &node->avl_name);
1991            avl_remove(kv_header->idx_id, &node->avl_id);
1992            --kv_header->num_kv_stores;
1993            spin_unlock(&kv_header->lock);
1994
1995            kv_id = node->id;
1996
1997            // free node
1998            free(node->kvs_name);
1999            free(node);
2000        } else {
2001            // reset all stats except for WAL
2002            node->stat.ndocs = 0;
2003            node->stat.nlivenodes = 0;
2004            node->stat.datasize = 0;
2005            node->stat.deltasize = 0;
2006            node->seqnum = 0;
2007            spin_unlock(&kv_header->lock);
2008        }
2009    }
2010
2011    // discard all WAL entries
2012    wal_close_kv_ins(file, kv_id, &root_handle->log_callback);
2013
2014    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2015    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2016    struct filemgr_dirty_update_node *prev_node = NULL, *new_node = NULL;
2017
2018    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
2019                            &dirty_idtree_root, &dirty_seqtree_root, false);
2020
2021    size_id = sizeof(fdb_kvs_id_t);
2022    size_chunk = root_handle->trie->chunksize;
2023
2024    // remove from super handle's HB+trie
2025    _kv_id = alca(uint8_t, size_chunk);
2026    kvid2buf(size_chunk, kv_id, _kv_id);
2027    hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
2028    btreeblk_end(root_handle->bhandle);
2029
2030    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2031        _kv_id = alca(uint8_t, size_id);
2032        kvid2buf(size_id, kv_id, _kv_id);
2033        hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
2034        btreeblk_end(root_handle->bhandle);
2035    }
2036
2037    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
2038                               &dirty_idtree_root, &dirty_seqtree_root, true);
2039
2040    // append system doc
2041    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
2042
2043    // if no compaction is being performed, append header and commit
2044    if (root_handle->file == file) {
2045        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
2046        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
2047        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
2048        fs = filemgr_commit_bid(root_handle->file,
2049                                root_handle->last_hdr_bid,
2050                                cur_bmp_revnum,
2051                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
2052                                &root_handle->log_callback);
2053        btreeblk_reset_subblock_info(root_handle->bhandle);
2054    }
2055
2056    filemgr_mutex_unlock(file);
2057
2058    return fs;
2059}
2060
2061bool _fdb_kvs_is_busy(fdb_file_handle *fhandle)
2062{
2063    bool ret = false;
2064    struct filemgr *file = fhandle->root->file;
2065    struct avl_node *a;
2066    struct filemgr_fhandle_idx_node *fhandle_node;
2067    fdb_file_handle *file_handle;
2068
2069    spin_lock(&file->fhandle_idx_lock);
2070    a = avl_first(&file->fhandle_idx);
2071    while (a) {
2072        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2073        a = avl_next(a);
2074        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
2075        spin_lock(&file_handle->lock);
2076        if (list_begin(file_handle->handles) != NULL) {
2077            ret = true;
2078            spin_unlock(&file_handle->lock);
2079            break;
2080        }
2081        spin_unlock(&file_handle->lock);
2082    }
2083    spin_unlock(&file->fhandle_idx_lock);
2084
2085    return ret;
2086}
2087
2088fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
2089{
2090    fdb_config config;
2091    fdb_kvs_config kvs_config;
2092    fdb_kvs_handle *handle_in, *handle, *super_handle;
2093    fdb_status fs;
2094    fdb_seqnum_t old_seqnum;
2095    fdb_file_handle *fhandle;
2096    char *kvs_name;
2097
2098    if (!handle_ptr) {
2099        return FDB_RESULT_INVALID_HANDLE;
2100    }
2101
2102    handle_in = *handle_ptr;
2103
2104    if (!handle_in) {
2105        return FDB_RESULT_INVALID_HANDLE;
2106    }
2107
2108    if (!handle_in->kvs) {
2109        return FDB_RESULT_INVALID_ARGS;
2110    }
2111    super_handle = handle_in->kvs->root;
2112    fhandle = handle_in->fhandle;
2113    config = handle_in->config;
2114    kvs_config = handle_in->kvs_config;
2115
2116    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
2117        return fdb_log(&handle_in->log_callback,
2118                       FDB_RESULT_RONLY_VIOLATION,
2119                       "Warning: Rollback is not allowed on "
2120                       "the read-only DB file '%s'.",
2121                       handle_in->file->filename);
2122    }
2123
2124    filemgr_mutex_lock(handle_in->file);
2125    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
2126    // All transactions should be closed before rollback
2127    if (wal_txn_exists(handle_in->file)) {
2128        filemgr_set_rollback(handle_in->file, 0);
2129        filemgr_mutex_unlock(handle_in->file);
2130        return FDB_RESULT_FAIL_BY_TRANSACTION;
2131    }
2132
2133    // If compaction is running, wait until it is aborted.
2134    // TODO: Find a better way of waiting for the compaction abortion.
2135    unsigned int sleep_time = 10000; // 10 ms.
2136    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
2137    while (fstatus == FILE_COMPACT_OLD) {
2138        filemgr_mutex_unlock(handle_in->file);
2139        decaying_usleep(&sleep_time, 1000000);
2140        filemgr_mutex_lock(handle_in->file);
2141        fstatus = filemgr_get_file_status(handle_in->file);
2142    }
2143    if (fstatus == FILE_REMOVED_PENDING) {
2144        filemgr_mutex_unlock(handle_in->file);
2145        fdb_check_file_reopen(handle_in, NULL);
2146    } else {
2147        filemgr_mutex_unlock(handle_in->file);
2148    }
2149
2150    fdb_sync_db_header(handle_in);
2151
2152    // if the max sequence number seen by this handle is lower than the
2153    // requested snapshot marker, it means the snapshot is not yet visible
2154    // even via the current fdb_kvs_handle
2155    if (seqnum > handle_in->seqnum) {
2156        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2157        return FDB_RESULT_NO_DB_INSTANCE;
2158    }
2159
2160    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
2161    if (seqnum == 0) { // Handle special case of rollback to zero..
2162        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
2163        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2164        return fs;
2165    }
2166
2167    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
2168    if (!handle) { // LCOV_EXCL_START
2169        filemgr_set_rollback(handle_in->file, 0); // allow mutations
2170        return FDB_RESULT_ALLOC_FAIL;
2171    } // LCOV_EXCL_STOP
2172
2173    handle->max_seqnum = seqnum;
2174    handle->log_callback = handle_in->log_callback;
2175    handle->fhandle = fhandle;
2176    atomic_init_uint8_t(&handle->handle_busy, 0);
2177
2178    if (handle_in->kvs->type == KVS_SUB) {
2179        fs = _fdb_kvs_open(handle_in->kvs->root,
2180                           &config,
2181                           &kvs_config,
2182                           handle_in->file,
2183                           handle_in->file->filename,
2184                           kvs_name,
2185                           handle);
2186    } else {
2187        fs = _fdb_open(handle, handle_in->file->filename,
2188                       FDB_AFILENAME, &config);
2189    }
2190    filemgr_set_rollback(handle_in->file, 0); // allow mutations
2191
2192    if (fs == FDB_RESULT_SUCCESS) {
2193        // get KV instance's sub B+trees' root node BIDs
2194        // from both ID-tree and Seq-tree, AND
2195        // replace current handle's sub B+trees' root node BIDs
2196        // by old BIDs
2197        size_t size_chunk, size_id;
2198        bid_t id_root, seq_root, dummy;
2199        uint8_t *_kv_id;
2200        hbtrie_result hr;
2201
2202        size_chunk = handle->trie->chunksize;
2203        size_id = sizeof(fdb_kvs_id_t);
2204
2205        filemgr_mutex_lock(handle_in->file);
2206
2207        // read root BID of the KV instance from the old handle
2208        // and overwrite into the current handle
2209        _kv_id = alca(uint8_t, size_chunk);
2210        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
2211        hr = hbtrie_find_partial(handle->trie, _kv_id,
2212                                 size_chunk, &id_root);
2213        btreeblk_end(handle->bhandle);
2214        if (hr == HBTRIE_RESULT_SUCCESS) {
2215            hbtrie_insert_partial(super_handle->trie,
2216                                  _kv_id, size_chunk,
2217                                  &id_root, &dummy);
2218        } else { // No Trie info in rollback header.
2219                 // Erase kv store from super handle's main index.
2220            hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
2221        }
2222        btreeblk_end(super_handle->bhandle);
2223
2224        if (config.seqtree_opt == FDB_SEQTREE_USE) {
2225            // same as above for seq-trie
2226            _kv_id = alca(uint8_t, size_id);
2227            kvid2buf(size_id, handle->kvs->id, _kv_id);
2228            hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
2229                                     size_id, &seq_root);
2230            btreeblk_end(handle->bhandle);
2231            if (hr == HBTRIE_RESULT_SUCCESS) {
2232                hbtrie_insert_partial(super_handle->seqtrie,
2233                                      _kv_id, size_id,
2234                                      &seq_root, &dummy);
2235            } else { // No seqtrie info in rollback header.
2236                     // Erase kv store from super handle's seqtrie index.
2237                hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
2238            }
2239            btreeblk_end(super_handle->bhandle);
2240        }
2241
2242        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
2243                                        handle_in->kvs->id);
2244        fdb_kvs_set_seqnum(handle_in->file,
2245                           handle_in->kvs->id, seqnum);
2246        handle_in->seqnum = seqnum;
2247        filemgr_mutex_unlock(handle_in->file);
2248
2249        super_handle->rollback_revnum = handle->rollback_revnum;
2250        fs = _fdb_commit(super_handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
2251                         !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
2252        if (fs == FDB_RESULT_SUCCESS) {
2253            _fdb_kvs_close(handle);
2254            *handle_ptr = handle_in;
2255            fdb_kvs_info_free(handle);
2256            free(handle);
2257        } else {
2258            // cancel the rolling-back of the sequence number
2259            fdb_log(&handle_in->log_callback, fs,
2260                    "Rollback failed due to a commit failure with a sequence "
2261                    "number %" _F64, seqnum);
2262            filemgr_mutex_lock(handle_in->file);
2263            fdb_kvs_set_seqnum(handle_in->file,
2264                               handle_in->kvs->id, old_seqnum);
2265            filemgr_mutex_unlock(handle_in->file);
2266            _fdb_kvs_close(handle);
2267            fdb_kvs_info_free(handle);
2268            free(handle);
2269        }
2270    } else {
2271        free(handle);
2272    }
2273
2274    return fs;
2275}
2276
2277LIBFDB_API
2278fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2279                          const char *kvs_name)
2280{
2281    return _fdb_kvs_remove(fhandle, kvs_name, false);
2282}
2283
2284LIBFDB_API
2285fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2286{
2287    uint64_t ndocs;
2288    uint64_t ndeletes;
2289    uint64_t wal_docs;
2290    uint64_t wal_deletes;
2291    uint64_t wal_n_inserts;
2292    uint64_t datasize;
2293    uint64_t nlivenodes;
2294    fdb_kvs_id_t kv_id;
2295    struct avl_node *a;
2296    struct filemgr *file;
2297    struct kvs_node *node, query;
2298    struct kvs_header *kv_header;
2299    struct kvs_stat stat;
2300
2301    if (!handle) {
2302        return FDB_RESULT_INVALID_HANDLE;
2303    }
2304
2305    if (!info) {
2306        return FDB_RESULT_INVALID_ARGS;
2307    }
2308
2309    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2310        return FDB_RESULT_HANDLE_BUSY;
2311    }
2312
2313    if (!handle->shandle) { // snapshot handle should be immutable
2314        fdb_check_file_reopen(handle, NULL);
2315        fdb_sync_db_header(handle);
2316    }
2317
2318    file = handle->file;
2319
2320    if (handle->kvs == NULL) {
2321        info->name = default_kvs_name;
2322        kv_id = 0;
2323
2324    } else {
2325        kv_header = file->kv_header;
2326        kv_id = handle->kvs->id;
2327        spin_lock(&kv_header->lock);
2328
2329        query.id = handle->kvs->id;
2330        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2331        if (a) { // sub handle
2332            node = _get_entry(a, struct kvs_node, avl_id);
2333            info->name = (const char*)node->kvs_name;
2334        } else { // root handle
2335            info->name = default_kvs_name;
2336        }
2337        spin_unlock(&kv_header->lock);
2338    }
2339
2340    if (handle->shandle) {
2341        // snapshot .. get its local stats
2342        snap_get_stat(handle->shandle, &stat);
2343    } else {
2344        _kvs_stat_get(file, kv_id, &stat);
2345    }
2346    ndocs = stat.ndocs;
2347    ndeletes = stat.ndeletes;
2348    wal_docs = stat.wal_ndocs;
2349    wal_deletes = stat.wal_ndeletes;
2350    wal_n_inserts = wal_docs - wal_deletes;
2351
2352    if (ndocs + wal_n_inserts < wal_deletes) {
2353        info->doc_count = 0;
2354    } else {
2355        if (ndocs) { // not accurate since some ndocs may be in wal_n_inserts
2356            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2357        } else { // this is accurate
2358            info->doc_count = wal_n_inserts;
2359        }
2360    }
2361
2362    if (ndeletes) { // not accurate since some ndeletes may be wal_n_deletes
2363        info->deleted_count = ndeletes + wal_deletes;
2364    } else { // this is accurate
2365        info->deleted_count = wal_deletes;
2366    }
2367
2368    datasize = stat.datasize;
2369    nlivenodes = stat.nlivenodes;
2370
2371    info->space_used = datasize;
2372    info->space_used += nlivenodes * handle->config.blocksize;
2373    info->file = handle->fhandle;
2374
2375    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2376
2377    // This is another LIBFDB_API call, so handle is marked as free
2378    // in the line above before making this call
2379    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2380
2381    return FDB_RESULT_SUCCESS;
2382}
2383
2384LIBFDB_API
2385fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2386{
2387    fdb_kvs_id_t kv_id;
2388    struct filemgr *file;
2389    struct kvs_ops_stat stat;
2390    struct kvs_ops_stat root_stat;
2391
2392    if (!handle) {
2393        return FDB_RESULT_INVALID_HANDLE;
2394    }
2395
2396    if (!info) {
2397        return FDB_RESULT_INVALID_ARGS;
2398    }
2399
2400    fdb_kvs_handle *root_handle = handle->fhandle->root;
2401
2402    // for snapshot handle do not reopen new file as user is interested in
2403    // reader stats from the old file
2404    if (!handle->shandle) {
2405        // always get stats from the latest file
2406        fdb_check_file_reopen(handle, NULL);
2407        fdb_sync_db_header(handle);
2408    }
2409
2410    file = handle->file;
2411
2412    if (handle->kvs == NULL) {
2413        kv_id = 0;
2414    } else {
2415        kv_id = handle->kvs->id;
2416    }
2417
2418    _kvs_ops_stat_get(file, kv_id, &stat);
2419
2420    if (root_handle != handle) {
2421        _kvs_ops_stat_get(file, 0, &root_stat);
2422    } else {
2423        root_stat = stat;
2424    }
2425
2426    info->num_sets = atomic_get_uint64_t(&stat.num_sets, std::memory_order_relaxed);
2427    info->num_dels = atomic_get_uint64_t(&stat.num_dels, std::memory_order_relaxed);
2428    info->num_gets = atomic_get_uint64_t(&stat.num_gets, std::memory_order_relaxed);
2429    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2430                                                  std::memory_order_relaxed);
2431    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2432                                                  std::memory_order_relaxed);
2433    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves,
2434                                                   std::memory_order_relaxed);
2435
2436    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits,
2437                                            std::memory_order_relaxed);
2438    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts,
2439                                             std::memory_order_relaxed);
2440    return FDB_RESULT_SUCCESS;
2441}
2442
2443LIBFDB_API
2444fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2445                                 fdb_kvs_name_list *kvs_name_list)
2446{
2447    size_t num, size, offset;
2448    char *ptr;
2449    char **segment;
2450    fdb_kvs_handle *root_handle;
2451    struct kvs_header *kv_header;
2452    struct kvs_node *node;
2453    struct avl_node *a;
2454
2455    if (!fhandle) {
2456        return FDB_RESULT_INVALID_HANDLE;
2457    }
2458
2459    if (!kvs_name_list) {
2460        return FDB_RESULT_INVALID_ARGS;
2461    }
2462
2463    root_handle = fhandle->root;
2464    kv_header = root_handle->file->kv_header;
2465
2466    spin_lock(&kv_header->lock);
2467    // sum all lengths of KVS names first
2468    // (to calculate the size of memory segment to be allocated)
2469    num = 1;
2470    size = strlen(default_kvs_name) + 1;
2471    a = avl_first(kv_header->idx_id);
2472    while (a) {
2473        node = _get_entry(a, struct kvs_node, avl_id);
2474        a = avl_next(&node->avl_id);
2475
2476        num++;
2477        size += strlen(node->kvs_name) + 1;
2478    }
2479    size += num * sizeof(char*);
2480
2481    // allocate memory segment
2482    segment = (char**)calloc(1, size);
2483    kvs_name_list->num_kvs_names = num;
2484    kvs_name_list->kvs_names = segment;
2485
2486    ptr = (char*)segment + num * sizeof(char*);
2487    offset = num = 0;
2488
2489    // copy default KVS name
2490    strcpy(ptr + offset, default_kvs_name);
2491    segment[num] = ptr + offset;
2492    num++;
2493    offset += strlen(default_kvs_name) + 1;
2494
2495    // copy the others
2496    a = avl_first(kv_header->idx_name);
2497    while (a) {
2498        node = _get_entry(a, struct kvs_node, avl_name);
2499        a = avl_next(&node->avl_name);
2500
2501        strcpy(ptr + offset, node->kvs_name);
2502        segment[num] = ptr + offset;
2503
2504        num++;
2505        offset += strlen(node->kvs_name) + 1;
2506    }
2507
2508    spin_unlock(&kv_header->lock);
2509
2510    return FDB_RESULT_SUCCESS;
2511}
2512
2513LIBFDB_API
2514fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2515{
2516    if (!kvs_name_list) {
2517        return FDB_RESULT_INVALID_ARGS;
2518    }
2519
2520    free(kvs_name_list->kvs_names);
2521    kvs_name_list->kvs_names = NULL;
2522    kvs_name_list->num_kvs_names = 0;
2523
2524    return FDB_RESULT_SUCCESS;
2525}
2526
2527stale_header_info fdb_get_smallest_active_header(fdb_kvs_handle *handle)
2528{
2529    uint8_t *hdr_buf = alca(uint8_t, handle->config.blocksize);
2530    size_t i, hdr_len;
2531    uint64_t n_headers;
2532    bid_t hdr_bid, last_wal_bid;
2533    filemgr_header_revnum_t hdr_revnum;
2534    filemgr_header_revnum_t cur_revnum;
2535    filemgr_magic_t magic;
2536    fdb_seqnum_t seqnum;
2537    fdb_file_handle *fhandle = NULL;
2538    stale_header_info ret;
2539    struct avl_node *a;
2540    struct filemgr_fhandle_idx_node *fhandle_node;
2541    struct list_elem *e;
2542    struct kvs_opened_node *item;
2543
2544    ret.revnum = cur_revnum = handle->fhandle->root->cur_header_revnum;
2545    ret.bid = handle->fhandle->root->last_hdr_bid;
2546
2547    spin_lock(&handle->file->fhandle_idx_lock);
2548
2549    // check all opened file handles
2550    a = avl_first(&handle->file->fhandle_idx);
2551    while (a) {
2552        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2553        a = avl_next(a);
2554
2555        fhandle = (fdb_file_handle*)fhandle_node->fhandle;
2556        spin_lock(&fhandle->lock);
2557        // check all opened KVS handles belonging to the file handle
2558        e = list_begin(fhandle->handles);
2559        while (e) {
2560
2561            item = _get_entry(e, struct kvs_opened_node, le);
2562            e = list_next(e);
2563
2564            if (!item->handle->shandle) {
2565                // Only consider active snapshot handles since non-snapshot
2566                // handles will get synced upon their next forestdb api call.
2567                // This prevents "lazy" non-snapshot handles from holding up
2568                // stale block reclaim.
2569                continue;
2570            }
2571
2572            if (item->handle->cur_header_revnum < ret.revnum) {
2573                ret.revnum = item->handle->cur_header_revnum;
2574                ret.bid = item->handle->last_hdr_bid;
2575            }
2576        }
2577        spin_unlock(&fhandle->lock);
2578    }
2579
2580    spin_unlock(&handle->file->fhandle_idx_lock);
2581
2582    uint64_t num_keeping_headers =
2583        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
2584                            std::memory_order_relaxed);
2585    if (num_keeping_headers) {
2586        // backward scan previous header info to keep more headers
2587
2588        if (ret.bid == handle->last_hdr_bid) {
2589            // header in 'handle->last_hdr_bid' is not written into file yet!
2590            // we should start from the previous header
2591            hdr_bid = atomic_get_uint64_t(&handle->file->header.bid);
2592            hdr_revnum = handle->file->header.revnum - 1;
2593        } else {
2594            hdr_bid = ret.bid;
2595            hdr_revnum = ret.revnum;
2596        }
2597
2598        n_headers= num_keeping_headers;
2599        if (cur_revnum - hdr_revnum < n_headers) {
2600            n_headers = n_headers - (cur_revnum - hdr_revnum);
2601        } else {
2602            n_headers = 0;
2603        }
2604
2605        for (i=0; i<n_headers; ++i) {
2606            hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
2607                         hdr_buf, &hdr_len, &seqnum, &hdr_revnum, NULL,
2608                         &magic, NULL, &handle->log_callback);
2609            if (hdr_len) {
2610                ret.revnum = hdr_revnum;
2611                ret.bid = hdr_bid;
2612            } else {
2613                break;
2614            }
2615        }
2616    }
2617
2618    // although we keep more headers from the oldest active header, we have to
2619    // preserve the last WAL flushing header from the target header for data
2620    // consistency.
2621    uint64_t dummy64;
2622    char *new_filename;
2623
2624    filemgr_fetch_header(handle->file, ret.bid, hdr_buf, &hdr_len, &seqnum,
2625                         &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2626    fdb_fetch_header(magic, hdr_buf, &dummy64, &dummy64, &dummy64, &dummy64,
2627                     &dummy64, &dummy64, &dummy64, &last_wal_bid, &dummy64,
2628                     &dummy64, &new_filename, NULL);
2629
2630    if (last_wal_bid != BLK_NOT_FOUND) {
2631        filemgr_fetch_header(handle->file, last_wal_bid, hdr_buf, &hdr_len, &seqnum,
2632                             &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2633        ret.bid = last_wal_bid;
2634        ret.revnum = hdr_revnum;
2635    } else {
2636        // WAL has not been flushed yet .. we cannot trigger block reusing
2637        ret.bid = BLK_NOT_FOUND;
2638        ret.revnum = 0;
2639    }
2640
2641    return ret;
2642}
2643
2644