xref: /5.5.2/forestdb/src/kv_instance.cc (revision 56236603)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20
21#include "libforestdb/forestdb.h"
22#include "common.h"
23#include "internal_types.h"
24#include "fdb_internal.h"
25#include "configuration.h"
26#include "avltree.h"
27#include "list.h"
28#include "docio.h"
29#include "filemgr.h"
30#include "wal.h"
31#include "hbtrie.h"
32#include "btreeblock.h"
33#include "version.h"
34#include "staleblock.h"
35
36#include "memleak.h"
37#include "timing.h"
38#include "time_utils.h"
39
// Name that denotes the default KV store; names passed by callers are compared
// against it (in addition to NULL) when resolving custom compare functions.
static const char *default_kvs_name = DEFAULT_KVS_NAME;
41
// List element for an opened KV store handle.
// (in-memory data: managed by the file handle)
struct kvs_opened_node {
    // the opened KV store handle tracked by this element
    fdb_kvs_handle *handle;
    // linkage into the owning file handle's 'handles' list
    struct list_elem le;
};
48
// List element for a custom compare function registered in a file handle
// (fhandle->cmp_func_list).
struct cmp_func_node {
    // heap-allocated copy of the KV store name; NULL denotes the default KVS
    char *kvs_name;
    // compare function associated with 'kvs_name'
    fdb_custom_cmp_variable func;
    // linkage into a cmp_func_list
    struct list_elem le;
};
55
56static int _kvs_cmp_name(struct avl_node *a, struct avl_node *b, void *aux)
57{
58    struct kvs_node *aa, *bb;
59    aa = _get_entry(a, struct kvs_node, avl_name);
60    bb = _get_entry(b, struct kvs_node, avl_name);
61    return strcmp(aa->kvs_name, bb->kvs_name);
62}
63
64static int _kvs_cmp_id(struct avl_node *a, struct avl_node *b, void *aux)
65{
66    struct kvs_node *aa, *bb;
67    aa = _get_entry(a, struct kvs_node, avl_id);
68    bb = _get_entry(b, struct kvs_node, avl_id);
69
70    if (aa->id < bb->id) {
71        return -1;
72    } else if (aa->id > bb->id) {
73        return 1;
74    } else {
75        return 0;
76    }
77}
78
79struct kvs_opened_node *_fdb_kvs_createNLinkKVHandle(fdb_file_handle *fhandle,
80                                                     fdb_kvs_handle *handle)
81{
82    struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
83        calloc(1, sizeof(struct kvs_opened_node));
84    opened_node->handle = handle;
85
86    handle->node = opened_node;
87    spin_lock(&fhandle->lock);
88    list_push_back(fhandle->handles, &opened_node->le);
89    spin_unlock(&fhandle->lock);
90    return opened_node;
91}
92
93static bool _fdb_kvs_any_handle_opened(fdb_file_handle *fhandle,
94                                       fdb_kvs_id_t kv_id)
95{
96    struct filemgr *file = fhandle->root->file;
97    struct avl_node *a;
98    struct list_elem *e;
99    struct filemgr_fhandle_idx_node *fhandle_node;
100    struct kvs_opened_node *opened_node;
101    fdb_file_handle *file_handle;
102
103    spin_lock(&file->fhandle_idx_lock);
104    a = avl_first(&file->fhandle_idx);
105    while (a) {
106        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
107        a = avl_next(a);
108        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
109        spin_lock(&file_handle->lock);
110        e = list_begin(file_handle->handles);
111        while (e) {
112            opened_node = _get_entry(e, struct kvs_opened_node, le);
113            if ((opened_node->handle->kvs && opened_node->handle->kvs->id == kv_id) ||
114                (kv_id == 0 && opened_node->handle->kvs == NULL)) // single KVS mode
115            {
116                // there is an opened handle
117                spin_unlock(&file_handle->lock);
118                spin_unlock(&file->fhandle_idx_lock);
119                return true;
120            }
121            e = list_next(e);
122        }
123        spin_unlock(&file_handle->lock);
124    }
125    spin_unlock(&file->fhandle_idx_lock);
126
127    return false;
128}
129
130void fdb_file_handle_init(fdb_file_handle *fhandle,
131                           fdb_kvs_handle *root)
132{
133    fhandle->root = root;
134    fhandle->flags = 0x0;
135    root->fhandle = fhandle;
136    fhandle->handles = (struct list*)calloc(1, sizeof(struct list));
137    fhandle->cmp_func_list = NULL;
138    spin_init(&fhandle->lock);
139}
140
141void fdb_file_handle_close_all(fdb_file_handle *fhandle)
142{
143    struct list_elem *e;
144    struct kvs_opened_node *node;
145
146    spin_lock(&fhandle->lock);
147    e = list_begin(fhandle->handles);
148    while (e) {
149        node = _get_entry(e, struct kvs_opened_node, le);
150        e = list_next(e);
151        _fdb_close(node->handle);
152        free(node->handle);
153        free(node);
154    }
155    spin_unlock(&fhandle->lock);
156}
157
158void fdb_file_handle_parse_cmp_func(fdb_file_handle *fhandle,
159                                    size_t n_func,
160                                    char **kvs_names,
161                                    fdb_custom_cmp_variable *functions)
162{
163    uint64_t i;
164    struct cmp_func_node *node;
165
166    if (n_func == 0 || !kvs_names || !functions) {
167        return;
168    }
169
170    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
171    list_init(fhandle->cmp_func_list);
172
173    for (i=0;i<n_func;++i){
174        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
175        if (kvs_names[i]) {
176            node->kvs_name = (char*)calloc(1, strlen(kvs_names[i])+1);
177            strcpy(node->kvs_name, kvs_names[i]);
178        } else {
179            // NULL .. default KVS
180            node->kvs_name = NULL;
181        }
182        node->func = functions[i];
183        list_push_back(fhandle->cmp_func_list, &node->le);
184    }
185}
186
187// clone all items in cmp_func_list to fhandle->cmp_func_list
188void fdb_file_handle_clone_cmp_func_list(fdb_file_handle *fhandle,
189                                         struct list *cmp_func_list)
190{
191    struct list_elem *e;
192    struct cmp_func_node *src, *dst;
193
194    if (fhandle->cmp_func_list || /* already exist */
195        !cmp_func_list) {
196        return;
197    }
198
199    fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
200    list_init(fhandle->cmp_func_list);
201
202    e = list_begin(cmp_func_list);
203    while (e) {
204        src = _get_entry(e, struct cmp_func_node, le);
205        dst = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
206        if (src->kvs_name) {
207            dst->kvs_name = (char*)calloc(1, strlen(src->kvs_name)+1);
208            strcpy(dst->kvs_name, src->kvs_name);
209        } else {
210            dst->kvs_name = NULL; // default KVS
211        }
212        dst->func = src->func;
213        list_push_back(fhandle->cmp_func_list, &dst->le);
214        e = list_next(&src->le);
215    }
216}
217
218void fdb_file_handle_add_cmp_func(fdb_file_handle *fhandle,
219                                  char *kvs_name,
220                                  fdb_custom_cmp_variable cmp_func)
221{
222    struct cmp_func_node *node;
223
224    // create list if not exist
225    if (!fhandle->cmp_func_list) {
226        fhandle->cmp_func_list = (struct list*)calloc(1, sizeof(struct list));
227        list_init(fhandle->cmp_func_list);
228    }
229
230    node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
231    if (kvs_name) {
232        node->kvs_name = (char*)calloc(1, strlen(kvs_name)+1);
233        strcpy(node->kvs_name, kvs_name);
234    } else {
235        // default KVS
236        node->kvs_name = NULL;
237    }
238    node->func = cmp_func;
239    list_push_back(fhandle->cmp_func_list, &node->le);
240}
241
242void fdb_cmp_func_list_from_filemgr(struct filemgr *file, struct list *cmp_func_list)
243{
244    if (!file || !file->kv_header || !cmp_func_list) {
245        return;
246    }
247
248    struct cmp_func_node *node;
249
250    spin_lock(&file->kv_header->lock);
251    // Default KV store cmp function
252    if (file->kv_header->default_kvs_cmp) {
253        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
254        node->func = file->kv_header->default_kvs_cmp;
255        node->kvs_name = NULL;
256        list_push_back(cmp_func_list, &node->le);
257    }
258
259    // Rest of KV stores
260    struct kvs_node *kvs_node;
261    struct avl_node *a = avl_first(file->kv_header->idx_name);
262    while (a) {
263        kvs_node = _get_entry(a, struct kvs_node, avl_name);
264        a = avl_next(a);
265        node = (struct cmp_func_node*)calloc(1, sizeof(struct cmp_func_node));
266        node->func = kvs_node->custom_cmp;
267        node->kvs_name = (char*)calloc(1, strlen(kvs_node->kvs_name)+1);
268        strcpy(node->kvs_name, kvs_node->kvs_name);
269        list_push_back(cmp_func_list, &node->le);
270    }
271    spin_unlock(&file->kv_header->lock);
272}
273
274void fdb_free_cmp_func_list(struct list *cmp_func_list)
275{
276    if (!cmp_func_list) {
277        return;
278    }
279
280    struct cmp_func_node *cmp_node;
281    struct list_elem *e = list_begin(cmp_func_list);
282    while (e) {
283        cmp_node = _get_entry(e, struct cmp_func_node, le);
284        e = list_remove(cmp_func_list, &cmp_node->le);
285        free(cmp_node->kvs_name);
286        free(cmp_node);
287    }
288}
289
290static void _free_cmp_func_list(fdb_file_handle *fhandle)
291{
292    struct list_elem *e;
293    struct cmp_func_node *cmp_node;
294
295    if (!fhandle->cmp_func_list) {
296        return;
297    }
298
299    e = list_begin(fhandle->cmp_func_list);
300    while (e) {
301        cmp_node = _get_entry(e, struct cmp_func_node, le);
302        e = list_remove(fhandle->cmp_func_list, &cmp_node->le);
303
304        free(cmp_node->kvs_name);
305        free(cmp_node);
306    }
307    free(fhandle->cmp_func_list);
308    fhandle->cmp_func_list = NULL;
309}
310
311void fdb_file_handle_free(fdb_file_handle *fhandle)
312{
313    free(fhandle->handles);
314    _free_cmp_func_list(fhandle);
315    spin_destroy(&fhandle->lock);
316    free(fhandle);
317}
318
319fdb_status fdb_kvs_cmp_check(fdb_kvs_handle *handle)
320{
321    int ori_flag;
322    fdb_file_handle *fhandle = handle->fhandle;
323    fdb_custom_cmp_variable ori_custom_cmp;
324    struct filemgr *file = handle->file;
325    struct cmp_func_node *cmp_node;
326    struct kvs_node *kvs_node, query;
327    struct list_elem *e;
328    struct avl_node *a;
329
330    spin_lock(&file->kv_header->lock);
331    ori_flag = file->kv_header->custom_cmp_enabled;
332    ori_custom_cmp = file->kv_header->default_kvs_cmp;
333
334    if (fhandle->cmp_func_list) {
335        handle->kvs_config.custom_cmp = NULL;
336
337        e = list_begin(fhandle->cmp_func_list);
338        while (e) {
339            cmp_node = _get_entry(e, struct cmp_func_node, le);
340            if (cmp_node->kvs_name == NULL ||
341                    !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
342                handle->kvs_config.custom_cmp = cmp_node->func;
343                file->kv_header->default_kvs_cmp = cmp_node->func;
344                file->kv_header->custom_cmp_enabled = 1;
345            } else {
346                // search by name
347                query.kvs_name = cmp_node->kvs_name;
348                a = avl_search(file->kv_header->idx_name,
349                               &query.avl_name,
350                               _kvs_cmp_name);
351                if (a) { // found
352                    kvs_node = _get_entry(a, struct kvs_node, avl_name);
353                    if (!kvs_node->custom_cmp) {
354                        kvs_node->custom_cmp = cmp_node->func;
355                    }
356                    file->kv_header->custom_cmp_enabled = 1;
357                }
358            }
359            e = list_next(&cmp_node->le);
360        }
361    }
362
363    // first check the default KVS
364    // 1. root handle has not been opened yet: don't care
365    // 2. root handle was opened before: must match the flag
366    if (fhandle->flags & FHANDLE_ROOT_INITIALIZED) {
367        if (fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP &&
368            handle->kvs_config.custom_cmp == NULL) {
369            // custom cmp function was assigned before,
370            // but no custom cmp function is assigned
371            file->kv_header->custom_cmp_enabled = ori_flag;
372            file->kv_header->default_kvs_cmp = ori_custom_cmp;
373            spin_unlock(&file->kv_header->lock);
374            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
375            if (!kvs_name) {
376                kvs_name = DEFAULT_KVS_NAME;
377            }
378            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
379                           "Error! Tried to open a KV store '%s', which was created with "
380                           "custom compare function enabled, without passing the same "
381                           "custom compare function.", kvs_name);
382        }
383        if (!(fhandle->flags & FHANDLE_ROOT_CUSTOM_CMP) &&
384              handle->kvs_config.custom_cmp) {
385            // custom cmp function was not assigned before,
386            // but custom cmp function is assigned from user
387            file->kv_header->custom_cmp_enabled = ori_flag;
388            file->kv_header->default_kvs_cmp = ori_custom_cmp;
389            spin_unlock(&file->kv_header->lock);
390            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
391            if (!kvs_name) {
392                kvs_name = DEFAULT_KVS_NAME;
393            }
394            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
395                           "Error! Tried to open a KV store '%s', which was created without "
396                           "custom compare function, by passing custom compare function.",
397                    kvs_name);
398        }
399    }
400
401    // next check other KVSs
402    a = avl_first(file->kv_header->idx_name);
403    while (a) {
404        kvs_node = _get_entry(a, struct kvs_node, avl_name);
405        a = avl_next(a);
406
407        if (kvs_node->flags & KVS_FLAG_CUSTOM_CMP &&
408            kvs_node->custom_cmp == NULL) {
409            // custom cmp function was assigned before,
410            // but no custom cmp function is assigned
411            file->kv_header->custom_cmp_enabled = ori_flag;
412            file->kv_header->default_kvs_cmp = ori_custom_cmp;
413            spin_unlock(&file->kv_header->lock);
414            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
415            if (!kvs_name) {
416                kvs_name = DEFAULT_KVS_NAME;
417            }
418            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
419                           "Error! Tried to open a KV store '%s', which was created with "
420                           "custom compare function enabled, without passing the same "
421                           "custom compare function.", kvs_name);
422        }
423        if (!(kvs_node->flags & KVS_FLAG_CUSTOM_CMP) &&
424              kvs_node->custom_cmp) {
425            // custom cmp function was not assigned before,
426            // but custom cmp function is assigned from user
427            file->kv_header->custom_cmp_enabled = ori_flag;
428            file->kv_header->default_kvs_cmp = ori_custom_cmp;
429            spin_unlock(&file->kv_header->lock);
430            const char *kvs_name = _fdb_kvs_get_name(handle, handle->file);
431            if (!kvs_name) {
432                kvs_name = DEFAULT_KVS_NAME;
433            }
434            return fdb_log(&handle->log_callback, FDB_RESULT_INVALID_CMP_FUNCTION,
435                           "Error! Tried to open a KV store '%s', which was created without "
436                           "custom compare function, by passing custom compare function.",
437                           kvs_name);
438        }
439    }
440
441    spin_unlock(&file->kv_header->lock);
442    return FDB_RESULT_SUCCESS;
443}
444
445fdb_custom_cmp_variable fdb_kvs_find_cmp_name(fdb_kvs_handle *handle,
446                                              char *kvs_name)
447{
448    fdb_file_handle *fhandle;
449    struct list_elem *e;
450    struct cmp_func_node *cmp_node;
451
452    fhandle = handle->fhandle;
453    if (!fhandle->cmp_func_list) {
454        return NULL;
455    }
456
457    e = list_begin(fhandle->cmp_func_list);
458    while (e) {
459        cmp_node = _get_entry(e, struct cmp_func_node, le);
460        if (kvs_name == NULL ||
461            !strcmp(kvs_name, default_kvs_name)) {
462            if (cmp_node->kvs_name == NULL ||
463                !strcmp(cmp_node->kvs_name, default_kvs_name)) { // default KVS
464                return cmp_node->func;
465            }
466        } else if (cmp_node->kvs_name &&
467                   !strcmp(cmp_node->kvs_name, kvs_name)) {
468            return cmp_node->func;
469        }
470        e = list_next(&cmp_node->le);
471    }
472    return NULL;
473}
474
475hbtrie_cmp_func *fdb_kvs_find_cmp_chunk(void *chunk, void *aux)
476{
477    fdb_kvs_id_t kv_id;
478    struct hbtrie *trie = (struct hbtrie *)aux;
479    struct btreeblk_handle *bhandle;
480    struct filemgr *file;
481    struct avl_node *a;
482    struct kvs_node query, *node;
483
484    bhandle = (struct btreeblk_handle*)trie->btreeblk_handle;
485    file = bhandle->file;
486
487    if (!file->kv_header->custom_cmp_enabled) {
488        return NULL;
489    }
490
491    buf2kvid(trie->chunksize, chunk, &kv_id);
492
493    // search by id
494    if (kv_id > 0) {
495        query.id = kv_id;
496        spin_lock(&file->kv_header->lock);
497        a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
498        spin_unlock(&file->kv_header->lock);
499
500        if (a) {
501            node = _get_entry(a, struct kvs_node, avl_id);
502            return (hbtrie_cmp_func *)node->custom_cmp;
503        }
504    } else {
505        // root handle
506        return (hbtrie_cmp_func *)file->kv_header->default_kvs_cmp;
507    }
508    return NULL;
509}
510
511void _fdb_kvs_init_root(fdb_kvs_handle *handle, struct filemgr *file) {
512    handle->kvs->type = KVS_ROOT;
513    handle->kvs->root = handle->fhandle->root;
514    // super handle's ID is always 0
515    handle->kvs->id = 0;
516    // force custom cmp function
517    spin_lock(&file->kv_header->lock);
518    handle->kvs_config.custom_cmp = file->kv_header->default_kvs_cmp;
519    spin_unlock(&file->kv_header->lock);
520}
521
522void fdb_kvs_info_create(fdb_kvs_handle *root_handle,
523                         fdb_kvs_handle *handle,
524                         struct filemgr *file,
525                         const char *kvs_name)
526{
527    struct kvs_node query, *kvs_node;
528    struct kvs_opened_node *opened_node;
529    struct avl_node *a;
530
531    handle->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
532
533    if (root_handle == NULL) {
534        // 'handle' is a super handle
535        _fdb_kvs_init_root(handle, file);
536    } else {
537        // 'handle' is a sub handle (i.e., KV instance in a DB instance)
538        handle->kvs->type = KVS_SUB;
539        handle->kvs->root = root_handle;
540
541        if (kvs_name) {
542            spin_lock(&file->kv_header->lock);
543            query.kvs_name = (char*)kvs_name;
544            a = avl_search(file->kv_header->idx_name, &query.avl_name,
545                           _kvs_cmp_name);
546            if (a == NULL) {
547                // KV instance name is not found
548                free(handle->kvs);
549                handle->kvs = NULL;
550                spin_unlock(&file->kv_header->lock);
551                return;
552            }
553            kvs_node = _get_entry(a, struct kvs_node, avl_name);
554            handle->kvs->id = kvs_node->id;
555            // force custom cmp function
556            handle->kvs_config.custom_cmp = kvs_node->custom_cmp;
557            spin_unlock(&file->kv_header->lock);
558        } else {
559            // snapshot of the root handle
560            handle->kvs->id = 0;
561        }
562
563        opened_node = (struct kvs_opened_node *)
564               calloc(1, sizeof(struct kvs_opened_node));
565        opened_node->handle = handle;
566
567        handle->node = opened_node;
568        spin_lock(&root_handle->fhandle->lock);
569        list_push_back(root_handle->fhandle->handles, &opened_node->le);
570        spin_unlock(&root_handle->fhandle->lock);
571    }
572}
573
574void fdb_kvs_info_free(fdb_kvs_handle *handle)
575{
576    if (handle->kvs == NULL) {
577        return;
578    }
579
580    free(handle->kvs);
581    handle->kvs = NULL;
582}
583
584void _fdb_kvs_header_create(struct kvs_header **kv_header_ptr)
585{
586    struct kvs_header *kv_header;
587
588    kv_header = (struct kvs_header *)calloc(1, sizeof(struct kvs_header));
589    *kv_header_ptr = kv_header;
590
591    // KV ID '0' is reserved for default KV instance (super handle)
592    kv_header->id_counter = 1;
593    kv_header->default_kvs_cmp = NULL;
594    kv_header->custom_cmp_enabled = 0;
595    kv_header->idx_name = (struct avl_tree*)malloc(sizeof(struct avl_tree));
596    kv_header->idx_id = (struct avl_tree*)malloc(sizeof(struct avl_tree));
597    kv_header->num_kv_stores = 0;
598    avl_init(kv_header->idx_name, NULL);
599    avl_init(kv_header->idx_id, NULL);
600    spin_init(&kv_header->lock);
601}
602
603void fdb_kvs_header_create(struct filemgr *file)
604{
605    if (file->kv_header) {
606        return; // already exist
607    }
608
609    _fdb_kvs_header_create(&file->kv_header);
610    file->free_kv_header = fdb_kvs_header_free;
611}
612
613void fdb_kvs_header_reset_all_stats(struct filemgr *file)
614{
615    struct avl_node *a;
616    struct kvs_node *node;
617    struct kvs_header *kv_header = file->kv_header;
618
619    spin_lock(&kv_header->lock);
620    a = avl_first(kv_header->idx_id);
621    while (a) {
622        node = _get_entry(a, struct kvs_node, avl_id);
623        a = avl_next(&node->avl_id);
624        memset(&node->stat, 0x0, sizeof(node->stat));
625    }
626    spin_unlock(&kv_header->lock);
627}
628
629void fdb_kvs_header_copy(fdb_kvs_handle *handle,
630                         struct filemgr *new_file,
631                         struct docio_handle *new_dhandle,
632                         uint64_t *new_file_kv_info_offset,
633                         bool create_new)
634{
635    struct avl_node *a, *aa;
636    struct kvs_node *node_old, *node_new;
637
638    if (create_new) {
639        struct kvs_header *kv_header;
640        // copy KV header data in 'handle' to new file
641        _fdb_kvs_header_create(&kv_header);
642        // read from 'handle->dhandle', and import into 'new_file'
643        fdb_kvs_header_read(kv_header, handle->dhandle,
644                            handle->kv_info_offset, handle->file->version, false);
645
646        // write KV header in 'new_file' using 'new_dhandle'
647        uint64_t new_kv_info_offset;
648        fdb_kvs_handle new_handle;
649        new_handle.file = new_file;
650        new_handle.dhandle = new_dhandle;
651        new_handle.kv_info_offset = BLK_NOT_FOUND;
652        new_kv_info_offset = fdb_kvs_header_append(&new_handle);
653        if (new_file_kv_info_offset) {
654            *new_file_kv_info_offset = new_kv_info_offset;
655        }
656
657        if (!filemgr_set_kv_header(new_file, kv_header, fdb_kvs_header_free)) {
658            // LCOV_EXCL_START
659            _fdb_kvs_header_free(kv_header);
660        } // LCOV_EXCL_STOP
661        fdb_kvs_header_reset_all_stats(new_file);
662    }
663
664    spin_lock(&handle->file->kv_header->lock);
665    spin_lock(&new_file->kv_header->lock);
666    // copy all in-memory custom cmp function pointers & seqnums
667    new_file->kv_header->default_kvs_cmp =
668        handle->file->kv_header->default_kvs_cmp;
669    new_file->kv_header->custom_cmp_enabled =
670        handle->file->kv_header->custom_cmp_enabled;
671    a = avl_first(handle->file->kv_header->idx_id);
672    while (a) {
673        node_old = _get_entry(a, struct kvs_node, avl_id);
674        aa = avl_search(new_file->kv_header->idx_id,
675                        &node_old->avl_id, _kvs_cmp_id);
676        assert(aa); // MUST exist
677        node_new = _get_entry(aa, struct kvs_node, avl_id);
678        node_new->custom_cmp = node_old->custom_cmp;
679        node_new->seqnum = node_old->seqnum;
680        node_new->op_stat = node_old->op_stat;
681        a = avl_next(a);
682    }
683    spin_unlock(&new_file->kv_header->lock);
684    spin_unlock(&handle->file->kv_header->lock);
685}
686
687// export KV header info to raw data
688static void _fdb_kvs_header_export(struct kvs_header *kv_header,
689                                   void **data, size_t *len, uint64_t version)
690{
691    /* << raw data structure >>
692     * [# KV instances]:        8 bytes
693     * [current KV ID counter]: 8 bytes
694     * ---
695     * [name length]:           2 bytes
696     * [instance name]:         x bytes
697     * [instance ID]:           8 bytes
698     * [sequence number]:       8 bytes
699     * [# live index nodes]:    8 bytes
700     * [# docs]:                8 bytes
701     * [data size]:             8 bytes
702     * [flags]:                 8 bytes
703     * [delta size]:            8 bytes (since MAGIC_001)
704     * [# deleted docs]:        8 bytes (since MAGIC_001)
705     * ...
706     *    Please note that if the above format is changed, please also change...
707     *    _fdb_kvs_get_snap_info()
708     *    _fdb_kvs_header_import()
709     *    _kvs_stat_get_sum_doc()
710     *    _kvs_stat_get_sum_attr
711     */
712
713    int size = 0;
714    int offset = 0;
715    uint16_t name_len, _name_len;
716    uint64_t c = 0;
717    uint64_t _n_kv, _kv_id, _flags;
718    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
719    int64_t _deltasize;
720    fdb_kvs_id_t _id_counter;
721    fdb_seqnum_t _seqnum;
722    struct kvs_node *node;
723    struct avl_node *a;
724
725    if (kv_header == NULL) {
726        *data = NULL;
727        *len = 0;
728        return ;
729    }
730
731    spin_lock(&kv_header->lock);
732
733    // pre-scan to estimate the size of data
734    size += sizeof(uint64_t);
735    size += sizeof(fdb_kvs_id_t);
736    a = avl_first(kv_header->idx_name);
737    while(a) {
738        node = _get_entry(a, struct kvs_node, avl_name);
739        c++;
740        size += sizeof(uint16_t); // length
741        size += strlen(node->kvs_name)+1; // name
742        size += sizeof(node->id); // ID
743        size += sizeof(node->seqnum); // seq number
744        size += sizeof(node->stat.nlivenodes); // # live index nodes
745        size += sizeof(node->stat.ndocs); // # docs
746        size += sizeof(node->stat.datasize); // data size
747        size += sizeof(node->flags); // flags
748        if (ver_is_atleast_magic_001(version)) {
749            size += sizeof(node->stat.deltasize); // delta size since commit
750            size += sizeof(node->stat.ndeletes); // # deleted docs
751        }
752        a = avl_next(a);
753    }
754
755    *data = (void *)malloc(size);
756
757    // # KV instances
758    _n_kv = _endian_encode(c);
759    memcpy((uint8_t*)*data + offset, &_n_kv, sizeof(_n_kv));
760    offset += sizeof(_n_kv);
761
762    // ID counter
763    _id_counter = _endian_encode(kv_header->id_counter);
764    memcpy((uint8_t*)*data + offset, &_id_counter, sizeof(_id_counter));
765    offset += sizeof(_id_counter);
766
767    a = avl_first(kv_header->idx_name);
768    while(a) {
769        node = _get_entry(a, struct kvs_node, avl_name);
770
771        // name length
772        name_len = strlen(node->kvs_name)+1;
773        _name_len = _endian_encode(name_len);
774        memcpy((uint8_t*)*data + offset, &_name_len, sizeof(_name_len));
775        offset += sizeof(_name_len);
776
777        // name
778        memcpy((uint8_t*)*data + offset, node->kvs_name, name_len);
779        offset += name_len;
780
781        // KV ID
782        _kv_id = _endian_encode(node->id);
783        memcpy((uint8_t*)*data + offset, &_kv_id, sizeof(_kv_id));
784        offset += sizeof(_kv_id);
785
786        // seq number
787        _seqnum = _endian_encode(node->seqnum);
788        memcpy((uint8_t*)*data + offset, &_seqnum, sizeof(_seqnum));
789        offset += sizeof(_seqnum);
790
791        // # live index nodes
792        _nlivenodes = _endian_encode(node->stat.nlivenodes);
793        memcpy((uint8_t*)*data + offset, &_nlivenodes, sizeof(_nlivenodes));
794        offset += sizeof(_nlivenodes);
795
796        // # docs
797        _ndocs = _endian_encode(node->stat.ndocs);
798        memcpy((uint8_t*)*data + offset, &_ndocs, sizeof(_ndocs));
799        offset += sizeof(_ndocs);
800
801        // datasize
802        _datasize = _endian_encode(node->stat.datasize);
803        memcpy((uint8_t*)*data + offset, &_datasize, sizeof(_datasize));
804        offset += sizeof(_datasize);
805
806        // flags
807        _flags = _endian_encode(node->flags);
808        memcpy((uint8_t*)*data + offset, &_flags, sizeof(_flags));
809        offset += sizeof(_flags);
810
811        if (ver_is_atleast_magic_001(version)) {
812            // # delta index nodes + docsize created after last commit
813            _deltasize = _endian_encode(node->stat.deltasize);
814            memcpy((uint8_t*)*data + offset, &_deltasize, sizeof(_deltasize));
815            offset += sizeof(_deltasize);
816
817            // # deleted documents
818            _ndeletes = _endian_encode(node->stat.ndeletes);
819            memcpy((uint8_t*)*data + offset, &_ndeletes, sizeof(_ndeletes));
820            offset += sizeof(_ndeletes);
821        }
822
823        a = avl_next(a);
824    }
825
826    *len = size;
827
828    spin_unlock(&kv_header->lock);
829}
830
831void _fdb_kvs_header_import(struct kvs_header *kv_header,
832                            void *data, size_t len, uint64_t version,
833                            bool only_seq_nums)
834{
835    uint64_t i, offset = 0;
836    uint16_t name_len, _name_len;
837    uint64_t n_kv, _n_kv, kv_id, _kv_id, flags, _flags;
838    uint64_t _nlivenodes, _ndocs, _datasize, _ndeletes;
839    int64_t _deltasize;
840    bool is_deltasize;
841    fdb_kvs_id_t id_counter, _id_counter;
842    fdb_seqnum_t seqnum, _seqnum;
843    struct kvs_node *node;
844
845    // # KV instances
846    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
847    offset += sizeof(_n_kv);
848    n_kv = _endian_decode(_n_kv);
849
850    // ID counter
851    memcpy(&_id_counter, (uint8_t*)data + offset, sizeof(_id_counter));
852    offset += sizeof(_id_counter);
853    id_counter = _endian_decode(_id_counter);
854
855    spin_lock(&kv_header->lock);
856    kv_header->id_counter = id_counter;
857
858    // Version control
859    if (!ver_is_atleast_magic_001(version)) {
860        is_deltasize = false;
861        _deltasize = 0;
862        _ndeletes = 0;
863    } else {
864        is_deltasize = true;
865    }
866
867    for (i=0;i<n_kv;++i){
868        // name length
869        uint64_t name_offset;
870        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
871        offset += sizeof(_name_len);
872        name_offset = offset;
873        name_len = _endian_decode(_name_len);
874
875        // name
876        offset += name_len;
877
878        // KV ID
879        memcpy(&_kv_id, (uint8_t*)data + offset, sizeof(_kv_id));
880        offset += sizeof(_kv_id);
881        kv_id = _endian_decode(_kv_id);
882
883        // Search if a given KV header node exists or not.
884        struct kvs_node query;
885        query.id = kv_id;
886        struct avl_node *a = avl_search(kv_header->idx_id, &query.avl_id,
887                                        _kvs_cmp_id);
888        if (a) {
889            node = _get_entry(a, struct kvs_node, avl_id);
890        } else {
891            node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
892            node->kvs_name = (char *)malloc(name_len);
893            memcpy(node->kvs_name, (uint8_t*)data + name_offset, name_len);
894            node->id = kv_id;
895            _init_op_stats(&node->op_stat);
896        }
897
898        // seq number
899        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
900        offset += sizeof(_seqnum);
901        seqnum = _endian_decode(_seqnum);
902        node->seqnum = seqnum;
903
904        // # live index nodes
905        memcpy(&_nlivenodes, (uint8_t*)data + offset, sizeof(_nlivenodes));
906        offset += sizeof(_nlivenodes);
907
908        // # docs
909        memcpy(&_ndocs, (uint8_t*)data + offset, sizeof(_ndocs));
910        offset += sizeof(_ndocs);
911
912        // datasize
913        memcpy(&_datasize, (uint8_t*)data + offset, sizeof(_datasize));
914        offset += sizeof(_datasize);
915
916        // flags
917        memcpy(&_flags, (uint8_t*)data + offset, sizeof(_flags));
918        offset += sizeof(_flags);
919        flags = _endian_decode(_flags);
920
921        if (is_deltasize) {
922            // delta document + index size since previous commit
923            memcpy(&_deltasize, (uint8_t*)data + offset,
924                   sizeof(_deltasize));
925            offset += sizeof(_deltasize);
926            memcpy(&_ndeletes, (uint8_t*)data + offset,
927                   sizeof(_ndeletes));
928            offset += sizeof(_ndeletes);
929        }
930
931        if (!only_seq_nums) {
932            node->stat.nlivenodes = _endian_decode(_nlivenodes);
933            node->stat.ndocs = _endian_decode(_ndocs);
934            node->stat.datasize = _endian_decode(_datasize);
935            node->stat.deltasize = _endian_decode(_deltasize);
936            node->stat.ndeletes = _endian_decode(_ndeletes);
937            node->flags = flags;
938            node->custom_cmp = NULL;
939        }
940
941        if (!a) { // Insert a new KV header node if not exist.
942            avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
943            avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
944            ++kv_header->num_kv_stores;
945        }
946    }
947    spin_unlock(&kv_header->lock);
948}
949
950fdb_status _fdb_kvs_get_snap_info(void *data, uint64_t version,
951                                  fdb_snapshot_info_t *snap_info)
952{
953    int i, offset = 0, sizeof_skipped_segments;
954    uint16_t name_len, _name_len;
955    int64_t n_kv, _n_kv;
956    bool is_deltasize;
957    fdb_seqnum_t _seqnum;
958    // Version control
959    if (!ver_is_atleast_magic_001(version)) {
960        is_deltasize = false;
961    } else {
962        is_deltasize = true;
963    }
964
965    // # KV instances
966    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
967    offset += sizeof(_n_kv);
968    // since n_kv doesn't count the default KVS, increase it by 1.
969    n_kv = _endian_decode(_n_kv) + 1;
970    assert(n_kv); // Must have at least one kv instance
971    snap_info->kvs_markers = (fdb_kvs_commit_marker_t *)malloc(
972                                   (n_kv) * sizeof(fdb_kvs_commit_marker_t));
973    if (!snap_info->kvs_markers) { // LCOV_EXCL_START
974        return FDB_RESULT_ALLOC_FAIL;
975    } // LCOV_EXCL_STOP
976
977    snap_info->num_kvs_markers = n_kv;
978
979    // Skip over ID counter
980    offset += sizeof(fdb_kvs_id_t);
981
982    sizeof_skipped_segments = sizeof(uint64_t) // seqnum will be the last read
983                            + sizeof(uint64_t) // skip over nlivenodes
984                            + sizeof(uint64_t) // skip over ndocs
985                            + sizeof(uint64_t) // skip over datasize
986                            + sizeof(uint64_t); // skip over flags
987    if (is_deltasize) {
988        sizeof_skipped_segments += sizeof(uint64_t); // skip over deltasize
989        sizeof_skipped_segments += sizeof(uint64_t); // skip over ndeletes
990    }
991
992    for (i = 0; i < n_kv-1; ++i){
993        fdb_kvs_commit_marker_t *info = &snap_info->kvs_markers[i];
994        // Read the kv store name length
995        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
996        offset += sizeof(_name_len);
997        name_len = _endian_decode(_name_len);
998
999        // Retrieve the KV Store name
1000        info->kv_store_name = (char *)malloc(name_len); // TODO: cleanup if err
1001        memcpy(info->kv_store_name, (uint8_t*)data + offset, name_len);
1002        offset += name_len;
1003
1004        // Skip over KV ID
1005        offset += sizeof(uint64_t);
1006
1007        // Retrieve the KV Store Commit Sequence number
1008        memcpy(&_seqnum, (uint8_t*)data + offset, sizeof(_seqnum));
1009        info->seqnum = _endian_decode(_seqnum);
1010
1011        // Skip over seqnum, nlivenodes, ndocs, datasize, flags etc onto next..
1012        offset += sizeof_skipped_segments;
1013    }
1014
1015    return FDB_RESULT_SUCCESS;
1016}
1017
1018uint64_t _kvs_stat_get_sum_attr(void *data, uint64_t version,
1019                                kvs_stat_attr_t attr)
1020{
1021    uint64_t ret = 0;
1022    int i, offset = 0;
1023    uint16_t name_len, _name_len;
1024    int64_t n_kv, _n_kv;
1025    bool is_deltasize;
1026    uint64_t nlivenodes, ndocs, datasize, flags;
1027    int64_t deltasize;
1028
1029    // Version control
1030    if (!ver_is_atleast_magic_001(version)) {
1031        is_deltasize = false;
1032    } else {
1033        is_deltasize = true;
1034    }
1035
1036    // # KV instances
1037    memcpy(&_n_kv, (uint8_t*)data + offset, sizeof(_n_kv));
1038    offset += sizeof(_n_kv);
1039    // since n_kv doesn't count the default KVS, increase it by 1.
1040    n_kv = _endian_decode(_n_kv) + 1;
1041    assert(n_kv); // Must have at least one kv instance
1042
1043    // Skip over ID counter
1044    offset += sizeof(fdb_kvs_id_t);
1045
1046    for (i = 0; i < n_kv-1; ++i){
1047        // Read the kv store name length and skip over the length
1048        memcpy(&_name_len, (uint8_t*)data + offset, sizeof(_name_len));
1049        offset += sizeof(_name_len);
1050        name_len = _endian_decode(_name_len);
1051
1052        // Skip over the KV Store name
1053        offset += name_len;
1054
1055        // Skip over KV ID
1056        offset += sizeof(uint64_t);
1057
1058        // Skip over KV store seqnum
1059        offset += sizeof(uint64_t);
1060
1061        // pick just the attribute requested, skipping over rest..
1062        if (attr == KVS_STAT_NLIVENODES) {
1063            memcpy(&nlivenodes, (uint8_t *)data + offset, sizeof(nlivenodes));
1064            ret += _endian_decode(nlivenodes);
1065            // skip over nlivenodes just read
1066            offset += sizeof(nlivenodes);
1067            // skip over ndocs, datasize, flags (and deltasize, ndeletes)
1068            offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof(datasize)
1069                   + sizeof(flags) + (is_deltasize ? sizeof(deltasize)*2 : 0);
1070        } else if (attr == KVS_STAT_DATASIZE) {
1071            offset += sizeof(nlivenodes) + sizeof(ndocs);
1072            memcpy(&datasize, (uint8_t *)data + offset, sizeof(datasize));
1073            ret += _endian_decode(datasize);
1074            // skip over datasize, flags (and deltasize, ndeletes)
1075            offset += sizeof(datasize) + sizeof(flags)
1076                   + (is_deltasize ? sizeof(deltasize)*2 : 0);
1077        } else if (attr == KVS_STAT_DELTASIZE) {
1078            if (is_deltasize) {
1079                offset += sizeof(nlivenodes) + sizeof(ndocs) + sizeof (datasize)
1080                        + sizeof(flags);
1081                memcpy(&deltasize, (uint8_t *)data + offset, sizeof(deltasize));
1082                ret += _endian_decode(deltasize);
1083                // skip over datasize, flags (and deltasize)
1084                offset += sizeof(deltasize)*2; // and ndeletes
1085            }
1086        } else { // Attribute fetched not implemented yet..
1087            fdb_assert(false, 0, attr); // Implement fetch for this attribute
1088        }
1089    }
1090
1091    return ret;
1092}
1093
1094uint64_t fdb_kvs_header_append(fdb_kvs_handle *handle)
1095{
1096    char *doc_key = alca(char, 32);
1097    void *data;
1098    size_t len;
1099    uint64_t kv_info_offset, prev_offset;
1100    struct docio_object doc;
1101    struct docio_length doc_len;
1102    struct filemgr *file = handle->file;
1103    struct docio_handle *dhandle = handle->dhandle;
1104
1105    _fdb_kvs_header_export(file->kv_header, &data, &len, file->version);
1106
1107    prev_offset = handle->kv_info_offset;
1108
1109    memset(&doc, 0, sizeof(struct docio_object));
1110    sprintf(doc_key, "KV_header");
1111    doc.key = (void *)doc_key;
1112    doc.meta = NULL;
1113    doc.body = data;
1114    doc.length.keylen = strlen(doc_key) + 1;
1115    doc.length.metalen = 0;
1116    doc.length.bodylen = len;
1117    doc.seqnum = 0;
1118    kv_info_offset = docio_append_doc_system(dhandle, &doc);
1119    free(data);
1120
1121    if (prev_offset != BLK_NOT_FOUND) {
1122        if (docio_read_doc_length(handle->dhandle, &doc_len, prev_offset)
1123            == FDB_RESULT_SUCCESS) {
1124            // mark stale
1125            filemgr_mark_stale(handle->file, prev_offset, _fdb_get_docsize(doc_len));
1126        }
1127    }
1128
1129    return kv_info_offset;
1130}
1131
1132void fdb_kvs_header_read(struct kvs_header *kv_header,
1133                         struct docio_handle *dhandle,
1134                         uint64_t kv_info_offset,
1135                         uint64_t version,
1136                         bool only_seq_nums)
1137{
1138    int64_t offset;
1139    struct docio_object doc;
1140
1141    memset(&doc, 0, sizeof(struct docio_object));
1142    offset = docio_read_doc(dhandle, kv_info_offset, &doc, true);
1143
1144    if (offset <= 0) {
1145        fdb_log(dhandle->log_callback, (fdb_status) offset,
1146                "Failed to read a KV header with the offset %" _F64 " from a "
1147                "database file '%s'", kv_info_offset, dhandle->file->filename);
1148        return;
1149    }
1150
1151    _fdb_kvs_header_import(kv_header, doc.body, doc.length.bodylen,
1152                           version, only_seq_nums);
1153    free_docio_object(&doc, 1, 1, 1);
1154}
1155
1156fdb_seqnum_t fdb_kvs_get_committed_seqnum(fdb_kvs_handle *handle)
1157{
1158    uint8_t *buf;
1159    uint64_t dummy64;
1160    uint64_t version;
1161    uint64_t kv_info_offset;
1162    size_t len;
1163    bid_t hdr_bid;
1164    fdb_seqnum_t seqnum = SEQNUM_NOT_USED;
1165    fdb_kvs_id_t id = 0;
1166    char *compacted_filename = NULL;
1167    struct filemgr *file = handle->file;
1168
1169    buf = alca(uint8_t, file->config->blocksize);
1170
1171    if (handle->kvs && handle->kvs->id > 0) {
1172        id = handle->kvs->id;
1173    }
1174
1175    hdr_bid = filemgr_get_header_bid(file);
1176    if (hdr_bid == BLK_NOT_FOUND) {
1177        // header doesn't exist
1178        return 0;
1179    }
1180
1181    // read header
1182    filemgr_fetch_header(file, hdr_bid, buf, &len, &seqnum, NULL, NULL,
1183                         &version, NULL, &handle->log_callback);
1184    if (id > 0) { // non-default KVS
1185        // read last KVS header
1186        fdb_fetch_header(version, buf, &dummy64, &dummy64,
1187                         &dummy64, &dummy64, &dummy64, &dummy64,
1188                         &dummy64, &dummy64,
1189                         &kv_info_offset, &dummy64,
1190                         &compacted_filename, NULL);
1191
1192        int64_t doc_offset;
1193        struct kvs_header *kv_header;
1194        struct docio_object doc;
1195
1196        _fdb_kvs_header_create(&kv_header);
1197        memset(&doc, 0, sizeof(struct docio_object));
1198        doc_offset = docio_read_doc(handle->dhandle,
1199                                    kv_info_offset, &doc, true);
1200
1201        if (doc_offset <= 0) {
1202            // fail
1203            _fdb_kvs_header_free(kv_header);
1204            return 0;
1205
1206        } else {
1207            _fdb_kvs_header_import(kv_header, doc.body,
1208                                   doc.length.bodylen, version, false);
1209            // get local sequence number for the KV instance
1210            seqnum = _fdb_kvs_get_seqnum(kv_header,
1211                                         handle->kvs->id);
1212            _fdb_kvs_header_free(kv_header);
1213            free_docio_object(&doc, 1, 1, 1);
1214        }
1215    }
1216    return seqnum;
1217}
1218
1219LIBFDB_API
1220fdb_status fdb_get_kvs_seqnum(fdb_kvs_handle *handle, fdb_seqnum_t *seqnum)
1221{
1222    if (!handle) {
1223        return FDB_RESULT_INVALID_HANDLE;
1224    }
1225
1226    if (!seqnum) {
1227        return FDB_RESULT_INVALID_ARGS;
1228    }
1229
1230    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
1231        return FDB_RESULT_HANDLE_BUSY;
1232    }
1233
1234    if (handle->shandle) {
1235        // handle for snapshot
1236        // return MAX_SEQNUM instead of the file's sequence number
1237        *seqnum = handle->max_seqnum;
1238    } else {
1239        fdb_check_file_reopen(handle, NULL);
1240        fdb_sync_db_header(handle);
1241
1242        struct filemgr *file;
1243        file = handle->file;
1244
1245        if (handle->kvs == NULL ||
1246            handle->kvs->id == 0) {
1247            filemgr_mutex_lock(file);
1248            *seqnum = filemgr_get_seqnum(file);
1249            filemgr_mutex_unlock(file);
1250        } else {
1251            *seqnum = fdb_kvs_get_seqnum(file, handle->kvs->id);
1252        }
1253    }
1254    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
1255    return FDB_RESULT_SUCCESS;
1256}
1257
1258void fdb_kvs_set_seqnum(struct filemgr *file,
1259                           fdb_kvs_id_t id,
1260                           fdb_seqnum_t seqnum)
1261{
1262    struct kvs_header *kv_header = file->kv_header;
1263    struct kvs_node query, *node;
1264    struct avl_node *a;
1265
1266    if (id == 0) {
1267        // default KV instance
1268        filemgr_set_seqnum(file, seqnum);
1269        return;
1270    }
1271
1272    spin_lock(&kv_header->lock);
1273    query.id = id;
1274    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1275    node = _get_entry(a, struct kvs_node, avl_id);
1276    node->seqnum = seqnum;
1277    spin_unlock(&kv_header->lock);
1278}
1279
1280void _fdb_kvs_header_free(struct kvs_header *kv_header)
1281{
1282    struct kvs_node *node;
1283    struct avl_node *a;
1284
1285    a = avl_first(kv_header->idx_name);
1286    while (a) {
1287        node = _get_entry(a, struct kvs_node, avl_name);
1288        a = avl_next(a);
1289        avl_remove(kv_header->idx_name, &node->avl_name);
1290
1291        free(node->kvs_name);
1292        free(node);
1293    }
1294    free(kv_header->idx_name);
1295    free(kv_header->idx_id);
1296    free(kv_header);
1297}
1298
1299void fdb_kvs_header_free(struct filemgr *file)
1300{
1301    if (file->kv_header == NULL) {
1302        return;
1303    }
1304
1305    _fdb_kvs_header_free(file->kv_header);
1306    file->kv_header = NULL;
1307}
1308
1309static fdb_status _fdb_kvs_create(fdb_kvs_handle *root_handle,
1310                                  const char *kvs_name,
1311                                  fdb_kvs_config *kvs_config)
1312{
1313    int kv_ins_name_len;
1314    fdb_status fs = FDB_RESULT_SUCCESS;
1315    struct avl_node *a;
1316    struct filemgr *file;
1317    struct kvs_node *node, query;
1318    struct kvs_header *kv_header;
1319
1320    if (root_handle->config.multi_kv_instances == false) {
1321        // cannot open KV instance under single DB instance mode
1322        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1323                       "Cannot open or create KV store instance '%s' because multi-KV "
1324                       "store instance mode is disabled.",
1325                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1326    }
1327    if (root_handle->kvs->type != KVS_ROOT) {
1328        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1329                       "Cannot open or create KV store instance '%s' because the handle "
1330                       "doesn't support multi-KV sotre instance mode.",
1331                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1332    }
1333
1334fdb_kvs_create_start:
1335    fdb_check_file_reopen(root_handle, NULL);
1336    filemgr_mutex_lock(root_handle->file);
1337    fdb_sync_db_header(root_handle);
1338
1339    if (filemgr_is_rollback_on(root_handle->file)) {
1340        filemgr_mutex_unlock(root_handle->file);
1341        return FDB_RESULT_FAIL_BY_ROLLBACK;
1342    }
1343
1344    file = root_handle->file;
1345
1346    file_status_t fstatus = filemgr_get_file_status(file);
1347    if (fstatus == FILE_REMOVED_PENDING) {
1348        // we must not write into this file
1349        // file status was changed by other thread .. start over
1350        filemgr_mutex_unlock(file);
1351        goto fdb_kvs_create_start;
1352    }
1353
1354    kv_header = file->kv_header;
1355    spin_lock(&kv_header->lock);
1356
1357    // find existing KV instance
1358    // search by name
1359    query.kvs_name = (char*)kvs_name;
1360    a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1361    if (a) { // KV name already exists
1362        spin_unlock(&kv_header->lock);
1363        filemgr_mutex_unlock(file);
1364        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1365                       "Failed to create KV Store '%s' as it already exists.",
1366                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1367    }
1368
1369    // create a kvs_node and insert
1370    node = (struct kvs_node *)calloc(1, sizeof(struct kvs_node));
1371    node->id = kv_header->id_counter++;
1372    node->seqnum = 0;
1373    node->flags = 0x0;
1374    _init_op_stats(&node->op_stat);
1375    // search fhandle's custom cmp func list first
1376    node->custom_cmp = fdb_kvs_find_cmp_name(root_handle,
1377                                             (char *)kvs_name);
1378    if (node->custom_cmp == NULL && kvs_config->custom_cmp) {
1379        // follow kvs_config's custom cmp next
1380        node->custom_cmp = kvs_config->custom_cmp;
1381        // if custom cmp function is given by user but
1382        // there is no corresponding function in fhandle's list
1383        // add it into the list
1384        fdb_file_handle_add_cmp_func(root_handle->fhandle,
1385                                     (char*)kvs_name,
1386                                     kvs_config->custom_cmp);
1387    }
1388    if (node->custom_cmp) { // custom cmp function is used
1389        node->flags |= KVS_FLAG_CUSTOM_CMP;
1390        kv_header->custom_cmp_enabled = 1;
1391    }
1392    kv_ins_name_len = strlen(kvs_name)+1;
1393    node->kvs_name = (char *)malloc(kv_ins_name_len);
1394    strcpy(node->kvs_name, kvs_name);
1395
1396    avl_insert(kv_header->idx_name, &node->avl_name, _kvs_cmp_name);
1397    avl_insert(kv_header->idx_id, &node->avl_id, _kvs_cmp_id);
1398    ++kv_header->num_kv_stores;
1399    spin_unlock(&kv_header->lock);
1400
1401    // if compaction is in-progress,
1402    // create a same kvs_node for the new file
1403    if (filemgr_get_file_status(file) == FILE_COMPACT_OLD) {
1404
1405        struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1406
1407        if (new_file) {
1408            struct kvs_node *node_new;
1409            struct kvs_header *kv_header_new;
1410
1411            kv_header_new = new_file->kv_header;
1412            node_new = (struct kvs_node*)calloc(1, sizeof(struct kvs_node));
1413            *node_new = *node;
1414            node_new->kvs_name = (char*)malloc(kv_ins_name_len);
1415            strcpy(node_new->kvs_name, kvs_name);
1416
1417            // insert into new file's kv_header
1418            spin_lock(&kv_header_new->lock);
1419            if (node->custom_cmp) {
1420                kv_header_new->custom_cmp_enabled = 1;
1421            }
1422            avl_insert(kv_header_new->idx_name, &node_new->avl_name, _kvs_cmp_name);
1423            avl_insert(kv_header_new->idx_id, &node_new->avl_id, _kvs_cmp_id);
1424            spin_unlock(&kv_header_new->lock);
1425        } else {
1426            // new_file should have been found if compaction is in progress
1427            fdb_assert(new_file, new_file, NULL);
1428        }
1429    }
1430
1431    // since this function calls filemgr_commit() and appends a new DB header,
1432    // we should finalize & flush the previous dirty update before commit.
1433    bid_t dirty_idtree_root = BLK_NOT_FOUND;
1434    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
1435    struct filemgr_dirty_update_node *prev_node = NULL;
1436    struct filemgr_dirty_update_node *new_node = NULL;
1437
1438    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
1439                            &dirty_idtree_root, &dirty_seqtree_root, false);
1440
1441    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
1442                               &dirty_idtree_root, &dirty_seqtree_root, true);
1443
1444    // append system doc
1445    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
1446
1447    // if no compaction is being performed, append header and commit
1448    if (root_handle->file == file) {
1449        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
1450        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
1451        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
1452        fs = filemgr_commit_bid(root_handle->file,
1453                                root_handle->last_hdr_bid,
1454                                cur_bmp_revnum,
1455                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
1456                                &root_handle->log_callback);
1457        btreeblk_reset_subblock_info(root_handle->bhandle);
1458    }
1459
1460    filemgr_mutex_unlock(file);
1461
1462    return fs;
1463}
1464
1465// this function just returns pointer
1466char* _fdb_kvs_get_name(fdb_kvs_handle *handle, struct filemgr *file)
1467{
1468    struct kvs_node *node, query;
1469    struct avl_node *a;
1470
1471    if (handle->kvs == NULL) {
1472        // single KV instance mode
1473        return NULL;
1474    }
1475
1476    query.id = handle->kvs->id;
1477    if (query.id == 0) { // default KV instance
1478        return NULL;
1479    }
1480    spin_lock(&file->kv_header->lock);
1481    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1482    if (a) {
1483        node = _get_entry(a, struct kvs_node, avl_id);
1484        spin_unlock(&file->kv_header->lock);
1485        return node->kvs_name;
1486    }
1487    spin_unlock(&file->kv_header->lock);
1488    return NULL;
1489}
1490
1491// this function just returns pointer to kvs_name & offset to user key
1492const char* _fdb_kvs_extract_name_off(fdb_kvs_handle *handle, void *keybuf,
1493                                      size_t *key_offset)
1494{
1495    struct kvs_node *node, query;
1496    struct avl_node *a;
1497    fdb_kvs_id_t kv_id;
1498    struct filemgr *file = handle->file;
1499
1500    if (!handle->kvs) { // single KV instance mode
1501        *key_offset = 0;
1502        return DEFAULT_KVS_NAME;
1503    }
1504
1505    *key_offset = handle->config.chunksize;
1506    buf2kvid(*key_offset, keybuf, &kv_id);
1507    query.id = kv_id;
1508    if (query.id == 0) { // default KV instance in multi kvs mode
1509        return default_kvs_name;
1510    }
1511    spin_lock(&file->kv_header->lock);
1512    a = avl_search(file->kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
1513    if (a) {
1514        node = _get_entry(a, struct kvs_node, avl_id);
1515        const char *kvs_name = node->kvs_name;
1516        spin_unlock(&file->kv_header->lock);
1517        return kvs_name;
1518    }
1519    spin_unlock(&file->kv_header->lock);
1520    return NULL;
1521}
1522
1523fdb_status _fdb_kvs_clone_snapshot(fdb_kvs_handle *handle_in,
1524                                   fdb_kvs_handle *handle_out)
1525{
1526    fdb_status fs;
1527    fdb_kvs_handle *root_handle = handle_in->kvs->root;
1528
1529    if (!handle_out->kvs) {
1530        // create kvs_info
1531        handle_out->kvs = (struct kvs_info*)calloc(1, sizeof(struct kvs_info));
1532        handle_out->kvs->type = handle_in->kvs->type;
1533        handle_out->kvs->id = handle_in->kvs->id;
1534        handle_out->kvs->root = root_handle;
1535        handle_out->kvs_config.custom_cmp = handle_in->kvs_config.custom_cmp;
1536
1537        struct kvs_opened_node *opened_node = (struct kvs_opened_node *)
1538            calloc(1, sizeof(struct kvs_opened_node));
1539        opened_node->handle = handle_out;
1540        handle_out->node = opened_node;
1541
1542        spin_lock(&root_handle->fhandle->lock);
1543        list_push_back(root_handle->fhandle->handles, &opened_node->le);
1544        spin_unlock(&root_handle->fhandle->lock);
1545    }
1546
1547    fs = _fdb_clone_snapshot(handle_in, handle_out);
1548    if (fs != FDB_RESULT_SUCCESS) {
1549        if (handle_out->node) {
1550            spin_lock(&root_handle->fhandle->lock);
1551            list_remove(root_handle->fhandle->handles, &handle_out->node->le);
1552            spin_unlock(&root_handle->fhandle->lock);
1553            free(handle_out->node);
1554        }
1555        free(handle_out->kvs);
1556    }
1557    return fs;
1558}
1559
1560// 1) allocate memory & create 'handle->kvs'
1561//    by calling fdb_kvs_info_create().
1562//      -> this will allocate a corresponding node and
1563//         insert it into fhandle->handles list.
1564// 2) if matching KVS name doesn't exist, create it.
1565// 3) call _fdb_open().
1566fdb_status _fdb_kvs_open(fdb_kvs_handle *root_handle,
1567                         fdb_config *config,
1568                         fdb_kvs_config *kvs_config,
1569                         struct filemgr *file,
1570                         const char *filename,
1571                         const char *kvs_name,
1572                         fdb_kvs_handle *handle)
1573{
1574    fdb_status fs;
1575
1576    if (handle->kvs == NULL) {
1577        // create kvs_info
1578        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1579    }
1580
1581    if (handle->kvs == NULL) {
1582        // KV instance name is not found
1583        if (!kvs_config->create_if_missing) {
1584            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1585                           "Failed to open KV store '%s' because it doesn't exist.",
1586                           kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1587        }
1588        if (root_handle->config.flags == FDB_OPEN_FLAG_RDONLY) {
1589            return fdb_log(&root_handle->log_callback, FDB_RESULT_RONLY_VIOLATION,
1590                           "Failed to create KV store '%s' because the KV store's handle "
1591                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1592        }
1593
1594        // create
1595        fs = _fdb_kvs_create(root_handle, kvs_name, kvs_config);
1596        if (fs != FDB_RESULT_SUCCESS) { // create fail
1597            return FDB_RESULT_INVALID_KV_INSTANCE_NAME;
1598        }
1599        // create kvs_info again
1600        fdb_kvs_info_create(root_handle, handle, file, kvs_name);
1601        if (handle->kvs == NULL) { // fail again
1602            return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_KV_INSTANCE_NAME,
1603                           "Failed to create KV store '%s' because the KV store's handle "
1604                           "is read-only.", kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1605        }
1606    }
1607    fs = _fdb_open(handle, filename, FDB_AFILENAME, config);
1608    if (fs != FDB_RESULT_SUCCESS) {
1609        if (handle->node) {
1610            spin_lock(&root_handle->fhandle->lock);
1611            list_remove(root_handle->fhandle->handles, &handle->node->le);
1612            spin_unlock(&root_handle->fhandle->lock);
1613            free(handle->node);
1614        } // 'handle->node == NULL' happens only during rollback
1615        free(handle->kvs);
1616    }
1617    return fs;
1618}
1619
1620// 1) identify whether the requested KVS is default or non-default.
1621// 2) if the requested KVS is default,
1622//   2-1) As the root handle is already opened,
1623//        -> allocate memory for handle, and call _fdb_open().
1624//        -> 'handle->kvs' will be created in _fdb_open(),
1625//           since it is treated as a default handle.
1626//        -> allocate a corresponding node and insert it into
1627//           fhandle->handles list.
1628// 3) if the requested KVS is non-default,
1629//    -> allocate memory for handle, and call _fdb_kvs_open().
1630LIBFDB_API
1631fdb_status fdb_kvs_open(fdb_file_handle *fhandle,
1632                        fdb_kvs_handle **ptr_handle,
1633                        const char *kvs_name,
1634                        fdb_kvs_config *kvs_config)
1635{
1636    fdb_kvs_handle *handle;
1637    fdb_config config;
1638    fdb_status fs;
1639    fdb_kvs_handle *root_handle;
1640    fdb_kvs_config config_local;
1641    struct filemgr *file = NULL;
1642    struct filemgr *latest_file = NULL;
1643    LATENCY_STAT_START();
1644
1645    if (!fhandle || !fhandle->root) {
1646        return FDB_RESULT_INVALID_HANDLE;
1647    }
1648
1649    root_handle = fhandle->root;
1650    config = root_handle->config;
1651
1652    if (kvs_config) {
1653        if (validate_fdb_kvs_config(kvs_config)) {
1654            config_local = *kvs_config;
1655        } else {
1656            return FDB_RESULT_INVALID_CONFIG;
1657        }
1658    } else {
1659        config_local = get_default_kvs_config();
1660    }
1661
1662    fdb_check_file_reopen(root_handle, NULL);
1663    fdb_sync_db_header(root_handle);
1664
1665    file = root_handle->file;
1666    latest_file = root_handle->file;
1667
1668    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1669        // return the default KV store handle
1670        spin_lock(&fhandle->lock);
1671        if (!(fhandle->flags & FHANDLE_ROOT_OPENED)) {
1672            // the root handle is not opened yet
1673            // sync up the root handle
1674            fdb_custom_cmp_variable default_kvs_cmp;
1675
1676            root_handle->kvs_config = config_local;
1677
1678            if (root_handle->file->kv_header) {
1679                // search fhandle's custom cmp func list first
1680                default_kvs_cmp = fdb_kvs_find_cmp_name(root_handle, (char *)kvs_name);
1681
1682                spin_lock(&root_handle->file->kv_header->lock);
1683                root_handle->file->kv_header->default_kvs_cmp = default_kvs_cmp;
1684
1685                if (root_handle->file->kv_header->default_kvs_cmp == NULL &&
1686                    root_handle->kvs_config.custom_cmp) {
1687                    // follow kvs_config's custom cmp next
1688                    root_handle->file->kv_header->default_kvs_cmp =
1689                        root_handle->kvs_config.custom_cmp;
1690                    fdb_file_handle_add_cmp_func(fhandle, NULL,
1691                                                 root_handle->kvs_config.custom_cmp);
1692                }
1693
1694                if (root_handle->file->kv_header->default_kvs_cmp) {
1695                    root_handle->file->kv_header->custom_cmp_enabled = 1;
1696                    fhandle->flags |= FHANDLE_ROOT_CUSTOM_CMP;
1697                }
1698                spin_unlock(&root_handle->file->kv_header->lock);
1699            }
1700
1701            fhandle->flags |= FHANDLE_ROOT_INITIALIZED;
1702            fhandle->flags |= FHANDLE_ROOT_OPENED;
1703        }
1704        // the root handle is already synced
1705        // open new default KV store handle
1706        spin_unlock(&fhandle->lock);
1707        handle = (fdb_kvs_handle*)calloc(1, sizeof(fdb_kvs_handle));
1708        handle->kvs_config = config_local;
1709        atomic_init_uint8_t(&handle->handle_busy, 0);
1710
1711        handle->fhandle = fhandle;
1712        fs = _fdb_open(handle, file->filename, FDB_AFILENAME, &config);
1713        if (fs != FDB_RESULT_SUCCESS) {
1714            free(handle);
1715            *ptr_handle = NULL;
1716        } else {
1717            // insert into fhandle's list
1718            _fdb_kvs_createNLinkKVHandle(fhandle, handle);
1719            *ptr_handle = handle;
1720        }
1721        LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1722        return fs;
1723    }
1724
1725    if (config.multi_kv_instances == false) {
1726        // cannot open KV instance under single DB instance mode
1727        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_CONFIG,
1728                       "Cannot open KV store instance '%s' because multi-KV "
1729                       "store instance mode is disabled.",
1730                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1731    }
1732    if (root_handle->kvs->type != KVS_ROOT) {
1733        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_HANDLE,
1734                       "Cannot open KV store instance '%s' because the handle "
1735                       "doesn't support multi-KV sotre instance mode.",
1736                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1737    }
1738    if (root_handle->shandle) {
1739        // cannot open KV instance from a snapshot
1740        return fdb_log(&root_handle->log_callback, FDB_RESULT_INVALID_ARGS,
1741                       "Not allowed to open KV store instance '%s' from the "
1742                       "snapshot handle.",
1743                       kvs_name ? kvs_name : DEFAULT_KVS_NAME);
1744    }
1745
1746    handle = (fdb_kvs_handle *)calloc(1, sizeof(fdb_kvs_handle));
1747    if (!handle) { // LCOV_EXCL_START
1748        return FDB_RESULT_ALLOC_FAIL;
1749    } // LCOV_EXCL_STOP
1750
1751    atomic_init_uint8_t(&handle->handle_busy, 0);
1752    handle->fhandle = fhandle;
1753    fs = _fdb_kvs_open(root_handle, &config, &config_local,
1754                       latest_file, file->filename, kvs_name, handle);
1755    if (fs == FDB_RESULT_SUCCESS) {
1756        *ptr_handle = handle;
1757    } else {
1758        *ptr_handle = NULL;
1759        free(handle);
1760    }
1761    LATENCY_STAT_END(file, FDB_LATENCY_KVS_OPEN);
1762    return fs;
1763}
1764
1765LIBFDB_API
1766fdb_status fdb_kvs_open_default(fdb_file_handle *fhandle,
1767                                fdb_kvs_handle **ptr_handle,
1768                                fdb_kvs_config *config)
1769{
1770    return fdb_kvs_open(fhandle, ptr_handle, NULL, config);
1771}
1772
1773// 1) remove corresponding node from fhandle->handles list.
1774// 2) call _fdb_close().
1775fdb_status _fdb_kvs_close(fdb_kvs_handle *handle)
1776{
1777    fdb_kvs_handle *root_handle = handle->fhandle->root;
1778    fdb_status fs;
1779
1780    if (handle->node) {
1781        spin_lock(&root_handle->fhandle->lock);
1782        list_remove(root_handle->fhandle->handles, &handle->node->le);
1783        spin_unlock(&root_handle->fhandle->lock);
1784        free(handle->node);
1785    } // 'handle->node == NULL' happens only during rollback
1786
1787    fs = _fdb_close(handle);
1788    return fs;
1789}
1790
1791// close all sub-KV store handles belonging to the root handle
1792fdb_status fdb_kvs_close_all(fdb_kvs_handle *root_handle)
1793{
1794    fdb_status fs;
1795    struct list_elem *e;
1796    struct kvs_opened_node *node;
1797
1798    spin_lock(&root_handle->fhandle->lock);
1799    e = list_begin(root_handle->fhandle->handles);
1800    while (e) {
1801        node = _get_entry(e, struct kvs_opened_node, le);
1802        e = list_remove(root_handle->fhandle->handles, &node->le);
1803        fs = _fdb_close(node->handle);
1804        if (fs != FDB_RESULT_SUCCESS) {
1805            spin_unlock(&root_handle->fhandle->lock);
1806            return fs;
1807        }
1808        fdb_kvs_info_free(node->handle);
1809        free(node->handle);
1810        free(node);
1811    }
1812    spin_unlock(&root_handle->fhandle->lock);
1813
1814    return FDB_RESULT_SUCCESS;
1815}
1816
1817// 1) identify whether the requested handle is for default KVS or not.
1818// 2) if the requested handle is for the default KVS,
1819//   2-1) if the requested handle must be the root handle,
1820//        -> call _fdb_close(),
1821//        -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1822//        -> remove the corresponding node from fhandle->handles list,
1823//        -> free the memory for the handle.
1824// 3) if the requested handle is for non-default KVS,
1825//    -> call _fdb_kvs_close(),
1826//       -> this will remove the node from fhandle->handles list.
1827//    -> free 'handle->kvs' by calling fdb_kvs_info_free(),
1828//    -> free the memory for the handle.
1829LIBFDB_API
1830fdb_status fdb_kvs_close(fdb_kvs_handle *handle)
1831{
1832    fdb_status fs;
1833
1834    if (!handle) {
1835        return FDB_RESULT_INVALID_HANDLE;
1836    }
1837    if (handle->num_iterators) {
1838        // There are still active iterators created from this handle
1839        return FDB_RESULT_KV_STORE_BUSY;
1840    }
1841
1842    if (handle->shandle && handle->kvs == NULL) {
1843        // snapshot of the default KV store + single KV store mode
1844        // directly close handle
1845        // (snapshot of the other KV stores will be closed
1846        //  using _fdb_kvs_close(...) below)
1847        fs = _fdb_close(handle);
1848        if (fs == FDB_RESULT_SUCCESS) {
1849            free(handle);
1850        }
1851        return fs;
1852    }
1853
1854    if (handle->kvs == NULL ||
1855        handle->kvs->type == KVS_ROOT) {
1856        // the default KV store handle
1857
1858        fdb_assert(handle->fhandle->root != handle, handle, NULL);
1859        // the default KV store but not the root handle .. normally close
1860        spin_lock(&handle->fhandle->lock);
1861        fs = _fdb_close(handle);
1862        if (fs == FDB_RESULT_SUCCESS) {
1863            // remove from 'handles' list in the root node
1864            if (handle->kvs) {
1865                fdb_kvs_info_free(handle);
1866            }
1867            list_remove(handle->fhandle->handles, &handle->node->le);
1868            spin_unlock(&handle->fhandle->lock);
1869            free(handle->node);
1870            free(handle);
1871        } else {
1872            spin_unlock(&handle->fhandle->lock);
1873        }
1874        return fs;
1875    }
1876
1877    if (handle->kvs && handle->kvs->root == NULL) {
1878        return FDB_RESULT_INVALID_ARGS;
1879    }
1880    fs = _fdb_kvs_close(handle);
1881    if (fs == FDB_RESULT_SUCCESS) {
1882        fdb_kvs_info_free(handle);
1883        free(handle);
1884    }
1885    return fs;
1886}
1887
// Removes the KV store named 'kvs_name' from the file behind 'fhandle', or,
// when 'rollback_recreate' is true, resets it in place.
//
// - fhandle:           file handle; must be non-NULL and have a root handle.
// - kvs_name:          KV store name; NULL or the default KV store name
//                      selects the default KV store (KV ID 0).
// - rollback_recreate: when true, the rollback-in-progress check and the
//                      opened-handle check are skipped, and a named KV
//                      store keeps its kvs_node (stats and seqnum are
//                      zeroed) instead of being dropped from the KV header.
//
// In both modes the KV store's WAL entries are discarded, its entry is
// removed from the root handle's HB+trie (and seq-trie when enabled), the
// KV header system doc is appended, and -- if the root handle still points
// at the same file -- a new DB header is written and committed.
//
// Returns FDB_RESULT_INVALID_HANDLE, FDB_RESULT_INVALID_CONFIG,
// FDB_RESULT_FAIL_BY_ROLLBACK, FDB_RESULT_FAIL_BY_COMPACTION,
// FDB_RESULT_KV_STORE_BUSY, FDB_RESULT_KV_STORE_NOT_FOUND or
// FDB_RECOVERABLE_ERR for the corresponding failure; otherwise the status
// of filemgr_commit_bid() (FDB_RESULT_SUCCESS when no commit is issued).
1888static
1889fdb_status _fdb_kvs_remove(fdb_file_handle *fhandle,
1890                           const char *kvs_name,
1891                           bool rollback_recreate)
1892{
1893    size_t size_chunk, size_id;
1894    uint8_t *_kv_id;
1895    fdb_status fs = FDB_RESULT_SUCCESS;
1896    fdb_kvs_id_t kv_id = 0;
1897    fdb_kvs_handle *root_handle;
1898    struct avl_node *a = NULL;
1899    struct filemgr *file;
1900    struct kvs_node *node, query;
1901    struct kvs_header *kv_header;
1902    hbtrie_result hr;
1903
1904    if (!fhandle || !fhandle->root) {
1905        return FDB_RESULT_INVALID_HANDLE;
1906    }
1907
1908    root_handle = fhandle->root;
1909
1910    if (root_handle->config.multi_kv_instances == false) {
1911        // cannot remove the KV instance under single DB instance mode
1912        return FDB_RESULT_INVALID_CONFIG;
1913    }
1914    if (root_handle->kvs->type != KVS_ROOT) {
1915        return FDB_RESULT_INVALID_HANDLE;
1916    }
1917
        // restart point: jumped back to (with the file mutex released) when
        // the file is found in FILE_REMOVED_PENDING state below
1918fdb_kvs_remove_start:
1919    fdb_check_file_reopen(root_handle, NULL);
        // the file mutex is held from here until one of the exits below
1920    filemgr_mutex_lock(root_handle->file);
1921    fdb_sync_db_header(root_handle);
1922
1923    if (!rollback_recreate) {
1924        if (filemgr_is_rollback_on(root_handle->file)) {
1925            filemgr_mutex_unlock(root_handle->file);
1926            return FDB_RESULT_FAIL_BY_ROLLBACK;
1927        }
1928    }
1929
1930    file = root_handle->file;
1931
1932    file_status_t fstatus = filemgr_get_file_status(file);
1933    if (fstatus == FILE_REMOVED_PENDING) {
1934        // we must not write into this file
1935        // file status was changed by other thread .. start over
1936        filemgr_mutex_unlock(file);
1937        goto fdb_kvs_remove_start;
1938    } else if (fstatus == FILE_COMPACT_OLD) {
1939        // Cannot remove existing KV store during compaction.
1940        // To remove a KV store, the corresponding first chunk in HB+trie
1941        // should be unlinked. This can be possible in the old file during
1942        // compaction, but impossible in the new file, since existing documents
1943        // (including docs belonging to the KV store to be removed) are being moved.
1944        filemgr_mutex_unlock(file);
1945        return FDB_RESULT_FAIL_BY_COMPACTION;
1946    }
1947
1948    // find the kvs_node and remove
1949
1950    // search by name to get ID
1951    if (kvs_name == NULL || !strcmp(kvs_name, default_kvs_name)) {
1952        if (!rollback_recreate) {
1953            // default KV store .. KV ID = 0
1954            kv_id = 0;
1955            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1956                // there is an opened handle
1957                filemgr_mutex_unlock(file);
1958                return FDB_RESULT_KV_STORE_BUSY;
1959            }
1960        }
                // the default KV store has no kvs_node; its stats and seqnum
                // are reset directly on the file
1961        // reset KVS stats (excepting for WAL stats)
1962        file->header.stat.ndocs = 0;
1963        file->header.stat.nlivenodes = 0;
1964        file->header.stat.datasize = 0;
1965        file->header.stat.deltasize = 0;
1966
1967        // reset seqnum
1968        filemgr_set_seqnum(file, 0);
1969    } else {
1970        kv_header = file->kv_header;
1971        spin_lock(&kv_header->lock);
1972        query.kvs_name = (char*)kvs_name;
1973        a = avl_search(kv_header->idx_name, &query.avl_name, _kvs_cmp_name);
1974        if (a == NULL) { // KV name doesn't exist
1975            spin_unlock(&kv_header->lock);
1976            filemgr_mutex_unlock(file);
1977            return FDB_RESULT_KV_STORE_NOT_FOUND;
1978        }
1979        node = _get_entry(a, struct kvs_node, avl_name);
1980        kv_id = node->id;
1981
1982        if (!rollback_recreate) {
                    // the KV header lock is released around the opened-handle
                    // check and re-taken afterwards; the file mutex stays held
1983            spin_unlock(&kv_header->lock);
1984            if (_fdb_kvs_any_handle_opened(fhandle, kv_id)) {
1985                // there is an opened handle
1986                filemgr_mutex_unlock(file);
1987                return FDB_RESULT_KV_STORE_BUSY;
1988            }
1989            spin_lock(&kv_header->lock);
1990
                    // drop the node from both the name index and the ID index
1991            avl_remove(kv_header->idx_name, &node->avl_name);
1992            avl_remove(kv_header->idx_id, &node->avl_id);
1993            --kv_header->num_kv_stores;
1994            spin_unlock(&kv_header->lock);
1995
                    // (re-reads the same ID already stored in kv_id above)
1996            kv_id = node->id;
1997
1998            // free node
1999            free(node->kvs_name);
2000            free(node);
2001        } else {
2002            // reset all stats except for WAL
2003            node->stat.ndocs = 0;
2004            node->stat.nlivenodes = 0;
2005            node->stat.datasize = 0;
2006            node->stat.deltasize = 0;
2007            node->seqnum = 0;
2008            spin_unlock(&kv_header->lock);
2009        }
2010    }
2011
2012    // discard all WAL entries
2013    wal_close_kv_ins(file, kv_id, &root_handle->log_callback);
2014
2015    bid_t dirty_idtree_root = BLK_NOT_FOUND;
2016    bid_t dirty_seqtree_root = BLK_NOT_FOUND;
2017    struct filemgr_dirty_update_node *prev_node = NULL, *new_node = NULL;
2018
2019    _fdb_dirty_update_ready(root_handle, &prev_node, &new_node,
2020                            &dirty_idtree_root, &dirty_seqtree_root, false);
2021
        // the main trie is keyed by a chunk-sized KV ID prefix, the seq-trie
        // by a prefix of sizeof(fdb_kvs_id_t) bytes
2022    size_id = sizeof(fdb_kvs_id_t);
2023    size_chunk = root_handle->trie->chunksize;
2024
2025    // remove from super handle's HB+trie
2026    _kv_id = alca(uint8_t, size_chunk);
2027    kvid2buf(size_chunk, kv_id, _kv_id);
2028    hr = hbtrie_remove_partial(root_handle->trie, _kv_id, size_chunk);
2029    btreeblk_end(root_handle->bhandle);
2030    if (hr  == HBTRIE_CORRUPTED_RECOVERING_ERR){
                // NOTE(review): this exit does not call
                // _fdb_dirty_update_finalize() -- confirm that is intended
2031        filemgr_mutex_unlock(file);
2032        _fdb_invalidate_dbheader(root_handle);
2033        return FDB_RECOVERABLE_ERR;
2034    }
2035
2036    if (root_handle->config.seqtree_opt == FDB_SEQTREE_USE) {
2037        _kv_id = alca(uint8_t, size_id);
2038        kvid2buf(size_id, kv_id, _kv_id);
2039        hr = hbtrie_remove_partial(root_handle->seqtrie, _kv_id, size_id);
2040        btreeblk_end(root_handle->bhandle);
2041        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2042            filemgr_mutex_unlock(file);
2043            _fdb_invalidate_dbheader(root_handle);
2044            return FDB_RECOVERABLE_ERR;
2045        }
2046    }
2047
2048    _fdb_dirty_update_finalize(root_handle, prev_node, new_node,
2049                               &dirty_idtree_root, &dirty_seqtree_root, true);
2050
2051    // append system doc
2052    root_handle->kv_info_offset = fdb_kvs_header_append(root_handle);
2053
2054    // if no compaction is being performed, append header and commit
2055    if (root_handle->file == file) {
2056        uint64_t cur_bmp_revnum = sb_get_bmp_revnum(file);
2057        root_handle->last_hdr_bid = filemgr_alloc(file, &root_handle->log_callback);
2058        root_handle->cur_header_revnum = fdb_set_file_header(root_handle, true);
                // the fourth argument is true unless FDB_DRB_ASYNC durability
                // is configured
2059        fs = filemgr_commit_bid(root_handle->file,
2060                                root_handle->last_hdr_bid,
2061                                cur_bmp_revnum,
2062                                !(root_handle->config.durability_opt & FDB_DRB_ASYNC),
2063                                &root_handle->log_callback);
2064        btreeblk_reset_subblock_info(root_handle->bhandle);
2065    }
2066
2067    filemgr_mutex_unlock(file);
2068
2069    return fs;
2070}
2071
2072bool _fdb_kvs_is_busy(fdb_file_handle *fhandle)
2073{
2074    bool ret = false;
2075    struct filemgr *file = fhandle->root->file;
2076    struct avl_node *a;
2077    struct filemgr_fhandle_idx_node *fhandle_node;
2078    fdb_file_handle *file_handle;
2079
2080    spin_lock(&file->fhandle_idx_lock);
2081    a = avl_first(&file->fhandle_idx);
2082    while (a) {
2083        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2084        a = avl_next(a);
2085        file_handle = (fdb_file_handle *) fhandle_node->fhandle;
2086        spin_lock(&file_handle->lock);
2087        if (list_begin(file_handle->handles) != NULL) {
2088            ret = true;
2089            spin_unlock(&file_handle->lock);
2090            break;
2091        }
2092        spin_unlock(&file_handle->lock);
2093    }
2094    spin_unlock(&file->fhandle_idx_lock);
2095
2096    return ret;
2097}
2098
2099fdb_status fdb_kvs_rollback(fdb_kvs_handle **handle_ptr, fdb_seqnum_t seqnum)
2100{
2101    fdb_config config;
2102    fdb_kvs_config kvs_config;
2103    fdb_kvs_handle *handle_in, *handle, *super_handle;
2104    fdb_status fs;
2105    fdb_seqnum_t old_seqnum;
2106    fdb_file_handle *fhandle;
2107    char *kvs_name;
2108
2109    if (!handle_ptr) {
2110        return FDB_RESULT_INVALID_HANDLE;
2111    }
2112
2113    handle_in = *handle_ptr;
2114
2115    if (!handle_in) {
2116        return FDB_RESULT_INVALID_HANDLE;
2117    }
2118
2119    if (!handle_in->kvs) {
2120        return FDB_RESULT_INVALID_ARGS;
2121    }
2122    super_handle = handle_in->kvs->root;
2123    fhandle = handle_in->fhandle;
2124    config = handle_in->config;
2125    kvs_config = handle_in->kvs_config;
2126
2127    if (handle_in->config.flags & FDB_OPEN_FLAG_RDONLY) {
2128        return fdb_log(&handle_in->log_callback,
2129                       FDB_RESULT_RONLY_VIOLATION,
2130                       "Warning: Rollback is not allowed on "
2131                       "the read-only DB file '%s'.",
2132                       handle_in->file->filename);
2133    }
2134
2135    filemgr_mutex_lock(handle_in->file);
2136    filemgr_set_rollback(handle_in->file, 1); // disallow writes operations
2137    // All transactions should be closed before rollback
2138    if (wal_txn_exists(handle_in->file)) {
2139        filemgr_set_rollback(handle_in->file, 0);
2140        filemgr_mutex_unlock(handle_in->file);
2141        return FDB_RESULT_FAIL_BY_TRANSACTION;
2142    }
2143
2144    // If compaction is running, wait until it is aborted.
2145    // TODO: Find a better way of waiting for the compaction abortion.
2146    unsigned int sleep_time = 10000; // 10 ms.
2147    file_status_t fstatus = filemgr_get_file_status(handle_in->file);
2148    while (fstatus == FILE_COMPACT_OLD) {
2149        filemgr_mutex_unlock(handle_in->file);
2150        decaying_usleep(&sleep_time, 1000000);
2151        filemgr_mutex_lock(handle_in->file);
2152        fstatus = filemgr_get_file_status(handle_in->file);
2153    }
2154    if (fstatus == FILE_REMOVED_PENDING) {
2155        filemgr_mutex_unlock(handle_in->file);
2156        fdb_check_file_reopen(handle_in, NULL);
2157    } else {
2158        filemgr_mutex_unlock(handle_in->file);
2159    }
2160
2161    fdb_sync_db_header(handle_in);
2162
2163    // if the max sequence number seen by this handle is lower than the
2164    // requested snapshot marker, it means the snapshot is not yet visible
2165    // even via the current fdb_kvs_handle
2166    if (seqnum > handle_in->seqnum) {
2167        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2168        return FDB_RESULT_NO_DB_INSTANCE;
2169    }
2170
2171    kvs_name = _fdb_kvs_get_name(handle_in, handle_in->file);
2172    if (seqnum == 0) { // Handle special case of rollback to zero..
2173        fs = _fdb_kvs_remove(fhandle, kvs_name, true /*recreate!*/);
2174        filemgr_set_rollback(super_handle->file, 0); // allow mutations
2175        return fs;
2176    }
2177
2178    handle = (fdb_kvs_handle *) calloc(1, sizeof(fdb_kvs_handle));
2179    if (!handle) { // LCOV_EXCL_START
2180        filemgr_set_rollback(handle_in->file, 0); // allow mutations
2181        return FDB_RESULT_ALLOC_FAIL;
2182    } // LCOV_EXCL_STOP
2183
2184    handle->max_seqnum = seqnum;
2185    handle->log_callback = handle_in->log_callback;
2186    handle->fhandle = fhandle;
2187    atomic_init_uint8_t(&handle->handle_busy, 0);
2188
2189    if (handle_in->kvs->type == KVS_SUB) {
2190        fs = _fdb_kvs_open(handle_in->kvs->root,
2191                           &config,
2192                           &kvs_config,
2193                           handle_in->file,
2194                           handle_in->file->filename,
2195                           kvs_name,
2196                           handle);
2197    } else {
2198        fs = _fdb_open(handle, handle_in->file->filename,
2199                       FDB_AFILENAME, &config);
2200    }
2201    filemgr_set_rollback(handle_in->file, 0); // allow mutations
2202
2203    if (fs == FDB_RESULT_SUCCESS) {
2204        // get KV instance's sub B+trees' root node BIDs
2205        // from both ID-tree and Seq-tree, AND
2206        // replace current handle's sub B+trees' root node BIDs
2207        // by old BIDs
2208        size_t size_chunk, size_id;
2209        bid_t id_root, seq_root, dummy;
2210        uint8_t *_kv_id;
2211        hbtrie_result hr;
2212
2213        size_chunk = handle->trie->chunksize;
2214        size_id = sizeof(fdb_kvs_id_t);
2215
2216        filemgr_mutex_lock(handle_in->file);
2217
2218        // read root BID of the KV instance from the old handle
2219        // and overwrite into the current handle
2220        _kv_id = alca(uint8_t, size_chunk);
2221        kvid2buf(size_chunk, handle->kvs->id, _kv_id);
2222        hr = hbtrie_find_partial(handle->trie, _kv_id,
2223                                 size_chunk, &id_root);
2224        btreeblk_end(handle->bhandle);
2225        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2226            _fdb_invalidate_dbheader(handle_in);
2227            _fdb_kvs_close(handle);
2228            fdb_kvs_info_free(handle);
2229            free(handle);
2230            return FDB_RECOVERABLE_ERR;
2231        }
2232        if (hr == HBTRIE_RESULT_SUCCESS) {
2233            hr = hbtrie_insert_partial(super_handle->trie,
2234                                  _kv_id, size_chunk,
2235                                  &id_root, &dummy);
2236            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2237                _fdb_invalidate_dbheader(handle_in);
2238                _fdb_kvs_close(handle);
2239                fdb_kvs_info_free(handle);
2240                free(handle);
2241                return FDB_RECOVERABLE_ERR;
2242            }
2243        } else { // No Trie info in rollback header.
2244                 // Erase kv store from super handle's main index.
2245            hr = hbtrie_remove_partial(super_handle->trie, _kv_id, size_chunk);
2246            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2247                _fdb_invalidate_dbheader(handle_in);
2248                fdb_kvs_info_free(handle);
2249                free(handle);
2250                return FDB_RECOVERABLE_ERR;
2251            }
2252        }
2253        btreeblk_end(super_handle->bhandle);
2254
2255        if (config.seqtree_opt == FDB_SEQTREE_USE) {
2256            // same as above for seq-trie
2257            _kv_id = alca(uint8_t, size_id);
2258            kvid2buf(size_id, handle->kvs->id, _kv_id);
2259            hr = hbtrie_find_partial(handle->seqtrie, _kv_id,
2260                                     size_id, &seq_root);
2261            btreeblk_end(handle->bhandle);
2262            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2263                _fdb_invalidate_dbheader(handle_in);
2264                _fdb_kvs_close(handle);
2265                fdb_kvs_info_free(handle);
2266                free(handle);
2267                return FDB_RECOVERABLE_ERR;
2268            }
2269            if (hr == HBTRIE_RESULT_SUCCESS) {
2270                hr = hbtrie_insert_partial(super_handle->seqtrie,
2271                                      _kv_id, size_id,
2272                                      &seq_root, &dummy);
2273                if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2274                    _fdb_invalidate_dbheader(handle_in);
2275                    _fdb_kvs_close(handle);
2276                    fdb_kvs_info_free(handle);
2277                    free(handle);
2278                    return FDB_RECOVERABLE_ERR;
2279                }
2280            } else { // No seqtrie info in rollback header.
2281                     // Erase kv store from super handle's seqtrie index.
2282                hr = hbtrie_remove_partial(super_handle->seqtrie, _kv_id, size_id);
2283                if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
2284                    _fdb_invalidate_dbheader(handle_in);
2285                    _fdb_kvs_close(handle);
2286                    fdb_kvs_info_free(handle);
2287                    free(handle);
2288                    return FDB_RECOVERABLE_ERR;
2289                }
2290            }
2291            btreeblk_end(super_handle->bhandle);
2292        }
2293
2294        old_seqnum = fdb_kvs_get_seqnum(handle_in->file,
2295                                        handle_in->kvs->id);
2296        fdb_kvs_set_seqnum(handle_in->file,
2297                           handle_in->kvs->id, seqnum);
2298        handle_in->seqnum = seqnum;
2299        filemgr_mutex_unlock(handle_in->file);
2300
2301        super_handle->rollback_revnum = handle->rollback_revnum;
2302        fs = _fdb_commit(super_handle, FDB_COMMIT_MANUAL_WAL_FLUSH,
2303                         !(handle_in->config.durability_opt & FDB_DRB_ASYNC));
2304        if (fs == FDB_RESULT_SUCCESS) {
2305            _fdb_kvs_close(handle);
2306            *handle_ptr = handle_in;
2307            fdb_kvs_info_free(handle);
2308            free(handle);
2309        } else {
2310            // cancel the rolling-back of the sequence number
2311            fdb_log(&handle_in->log_callback, fs,
2312                    "Rollback failed due to a commit failure with a sequence "
2313                    "number %" _F64, seqnum);
2314            filemgr_mutex_lock(handle_in->file);
2315            fdb_kvs_set_seqnum(handle_in->file,
2316                               handle_in->kvs->id, old_seqnum);
2317            filemgr_mutex_unlock(handle_in->file);
2318            _fdb_kvs_close(handle);
2319            fdb_kvs_info_free(handle);
2320            free(handle);
2321        }
2322    } else {
2323        free(handle);
2324    }
2325
2326    return fs;
2327}
2328
2329LIBFDB_API
2330fdb_status fdb_kvs_remove(fdb_file_handle *fhandle,
2331                          const char *kvs_name)
2332{
2333    return _fdb_kvs_remove(fhandle, kvs_name, false);
2334}
2335
2336LIBFDB_API
2337fdb_status fdb_get_kvs_info(fdb_kvs_handle *handle, fdb_kvs_info *info)
2338{
2339    uint64_t ndocs;
2340    uint64_t ndeletes;
2341    uint64_t wal_docs;
2342    uint64_t wal_deletes;
2343    uint64_t wal_n_inserts;
2344    uint64_t datasize;
2345    uint64_t nlivenodes;
2346    fdb_kvs_id_t kv_id;
2347    struct avl_node *a;
2348    struct filemgr *file;
2349    struct kvs_node *node, query;
2350    struct kvs_header *kv_header;
2351    struct kvs_stat stat;
2352
2353    if (!handle) {
2354        return FDB_RESULT_INVALID_HANDLE;
2355    }
2356
2357    if (!info) {
2358        return FDB_RESULT_INVALID_ARGS;
2359    }
2360
2361    if (!atomic_cas_uint8_t(&handle->handle_busy, 0, 1)) {
2362        return FDB_RESULT_HANDLE_BUSY;
2363    }
2364
2365    if (!handle->shandle) { // snapshot handle should be immutable
2366        fdb_check_file_reopen(handle, NULL);
2367        fdb_sync_db_header(handle);
2368    }
2369
2370    file = handle->file;
2371
2372    if (handle->kvs == NULL) {
2373        info->name = default_kvs_name;
2374        kv_id = 0;
2375
2376    } else {
2377        kv_header = file->kv_header;
2378        kv_id = handle->kvs->id;
2379        spin_lock(&kv_header->lock);
2380
2381        query.id = handle->kvs->id;
2382        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_cmp_id);
2383        if (a) { // sub handle
2384            node = _get_entry(a, struct kvs_node, avl_id);
2385            info->name = (const char*)node->kvs_name;
2386        } else { // root handle
2387            info->name = default_kvs_name;
2388        }
2389        spin_unlock(&kv_header->lock);
2390    }
2391
2392    if (handle->shandle) {
2393        // snapshot .. get its local stats
2394        snap_get_stat(handle->shandle, &stat);
2395    } else {
2396        _kvs_stat_get(file, kv_id, &stat);
2397    }
2398    ndocs = stat.ndocs;
2399    ndeletes = stat.ndeletes;
2400    wal_docs = stat.wal_ndocs;
2401    wal_deletes = stat.wal_ndeletes;
2402    wal_n_inserts = wal_docs - wal_deletes;
2403
2404    if (ndocs + wal_n_inserts < wal_deletes) {
2405        info->doc_count = 0;
2406    } else {
2407        if (ndocs) { // not accurate since some ndocs may be in wal_n_inserts
2408            info->doc_count = ndocs + wal_n_inserts - wal_deletes;
2409        } else { // this is accurate
2410            info->doc_count = wal_n_inserts;
2411        }
2412    }
2413
2414    if (ndeletes) { // not accurate since some ndeletes may be wal_n_deletes
2415        info->deleted_count = ndeletes + wal_deletes;
2416    } else { // this is accurate
2417        info->deleted_count = wal_deletes;
2418    }
2419
2420    datasize = stat.datasize;
2421    nlivenodes = stat.nlivenodes;
2422
2423    info->space_used = datasize;
2424    info->space_used += nlivenodes * handle->config.blocksize;
2425    info->file = handle->fhandle;
2426
2427    atomic_cas_uint8_t(&handle->handle_busy, 1, 0);
2428
2429    // This is another LIBFDB_API call, so handle is marked as free
2430    // in the line above before making this call
2431    fdb_get_kvs_seqnum(handle, &info->last_seqnum);
2432
2433    return FDB_RESULT_SUCCESS;
2434}
2435
2436LIBFDB_API
2437fdb_status fdb_get_kvs_ops_info(fdb_kvs_handle *handle, fdb_kvs_ops_info *info)
2438{
2439    fdb_kvs_id_t kv_id;
2440    struct filemgr *file;
2441    struct kvs_ops_stat stat;
2442    struct kvs_ops_stat root_stat;
2443
2444    if (!handle) {
2445        return FDB_RESULT_INVALID_HANDLE;
2446    }
2447
2448    if (!info) {
2449        return FDB_RESULT_INVALID_ARGS;
2450    }
2451
2452    fdb_kvs_handle *root_handle = handle->fhandle->root;
2453
2454    // for snapshot handle do not reopen new file as user is interested in
2455    // reader stats from the old file
2456    if (!handle->shandle) {
2457        // always get stats from the latest file
2458        fdb_check_file_reopen(handle, NULL);
2459        fdb_sync_db_header(handle);
2460    }
2461
2462    file = handle->file;
2463
2464    if (handle->kvs == NULL) {
2465        kv_id = 0;
2466    } else {
2467        kv_id = handle->kvs->id;
2468    }
2469
2470    _kvs_ops_stat_get(file, kv_id, &stat);
2471
2472    if (root_handle != handle) {
2473        _kvs_ops_stat_get(file, 0, &root_stat);
2474    } else {
2475        root_stat = stat;
2476    }
2477
2478    info->num_sets = atomic_get_uint64_t(&stat.num_sets, std::memory_order_relaxed);
2479    info->num_dels = atomic_get_uint64_t(&stat.num_dels, std::memory_order_relaxed);
2480    info->num_gets = atomic_get_uint64_t(&stat.num_gets, std::memory_order_relaxed);
2481    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2482                                                  std::memory_order_relaxed);
2483    info->num_iterator_gets = atomic_get_uint64_t(&stat.num_iterator_gets,
2484                                                  std::memory_order_relaxed);
2485    info->num_iterator_moves = atomic_get_uint64_t(&stat.num_iterator_moves,
2486                                                   std::memory_order_relaxed);
2487
2488    info->num_commits = atomic_get_uint64_t(&root_stat.num_commits,
2489                                            std::memory_order_relaxed);
2490    info->num_compacts = atomic_get_uint64_t(&root_stat.num_compacts,
2491                                             std::memory_order_relaxed);
2492    return FDB_RESULT_SUCCESS;
2493}
2494
2495LIBFDB_API
2496fdb_status fdb_get_kvs_name_list(fdb_file_handle *fhandle,
2497                                 fdb_kvs_name_list *kvs_name_list)
2498{
2499    size_t num, size, offset;
2500    char *ptr;
2501    char **segment;
2502    fdb_kvs_handle *root_handle;
2503    struct kvs_header *kv_header;
2504    struct kvs_node *node;
2505    struct avl_node *a;
2506
2507    if (!fhandle) {
2508        return FDB_RESULT_INVALID_HANDLE;
2509    }
2510
2511    if (!kvs_name_list) {
2512        return FDB_RESULT_INVALID_ARGS;
2513    }
2514
2515    root_handle = fhandle->root;
2516    kv_header = root_handle->file->kv_header;
2517
2518    spin_lock(&kv_header->lock);
2519    // sum all lengths of KVS names first
2520    // (to calculate the size of memory segment to be allocated)
2521    num = 1;
2522    size = strlen(default_kvs_name) + 1;
2523    a = avl_first(kv_header->idx_id);
2524    while (a) {
2525        node = _get_entry(a, struct kvs_node, avl_id);
2526        a = avl_next(&node->avl_id);
2527
2528        num++;
2529        size += strlen(node->kvs_name) + 1;
2530    }
2531    size += num * sizeof(char*);
2532
2533    // allocate memory segment
2534    segment = (char**)calloc(1, size);
2535    kvs_name_list->num_kvs_names = num;
2536    kvs_name_list->kvs_names = segment;
2537
2538    ptr = (char*)segment + num * sizeof(char*);
2539    offset = num = 0;
2540
2541    // copy default KVS name
2542    strcpy(ptr + offset, default_kvs_name);
2543    segment[num] = ptr + offset;
2544    num++;
2545    offset += strlen(default_kvs_name) + 1;
2546
2547    // copy the others
2548    a = avl_first(kv_header->idx_name);
2549    while (a) {
2550        node = _get_entry(a, struct kvs_node, avl_name);
2551        a = avl_next(&node->avl_name);
2552
2553        strcpy(ptr + offset, node->kvs_name);
2554        segment[num] = ptr + offset;
2555
2556        num++;
2557        offset += strlen(node->kvs_name) + 1;
2558    }
2559
2560    spin_unlock(&kv_header->lock);
2561
2562    return FDB_RESULT_SUCCESS;
2563}
2564
2565LIBFDB_API
2566fdb_status fdb_free_kvs_name_list(fdb_kvs_name_list *kvs_name_list)
2567{
2568    if (!kvs_name_list) {
2569        return FDB_RESULT_INVALID_ARGS;
2570    }
2571
2572    free(kvs_name_list->kvs_names);
2573    kvs_name_list->kvs_names = NULL;
2574    kvs_name_list->num_kvs_names = 0;
2575
2576    return FDB_RESULT_SUCCESS;
2577}
2578
2579stale_header_info fdb_get_smallest_active_header(fdb_kvs_handle *handle)
2580{
2581    uint8_t *hdr_buf = alca(uint8_t, handle->config.blocksize);
2582    size_t i, hdr_len;
2583    uint64_t n_headers;
2584    bid_t hdr_bid, last_wal_bid;
2585    filemgr_header_revnum_t hdr_revnum;
2586    filemgr_header_revnum_t cur_revnum;
2587    filemgr_magic_t magic;
2588    fdb_seqnum_t seqnum;
2589    fdb_file_handle *fhandle = NULL;
2590    stale_header_info ret;
2591    struct avl_node *a;
2592    struct filemgr_fhandle_idx_node *fhandle_node;
2593    struct list_elem *e;
2594    struct kvs_opened_node *item;
2595
2596    ret.revnum = cur_revnum = handle->fhandle->root->cur_header_revnum;
2597    ret.bid = handle->fhandle->root->last_hdr_bid;
2598
2599    spin_lock(&handle->file->fhandle_idx_lock);
2600
2601    // check all opened file handles
2602    a = avl_first(&handle->file->fhandle_idx);
2603    while (a) {
2604        fhandle_node = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
2605        a = avl_next(a);
2606
2607        fhandle = (fdb_file_handle*)fhandle_node->fhandle;
2608        spin_lock(&fhandle->lock);
2609        // check all opened KVS handles belonging to the file handle
2610        e = list_begin(fhandle->handles);
2611        while (e) {
2612
2613            item = _get_entry(e, struct kvs_opened_node, le);
2614            e = list_next(e);
2615
2616            if (!item->handle->shandle) {
2617                // Only consider active snapshot handles since non-snapshot
2618                // handles will get synced upon their next forestdb api call.
2619                // This prevents "lazy" non-snapshot handles from holding up
2620                // stale block reclaim.
2621                continue;
2622            }
2623
2624            if (item->handle->cur_header_revnum < ret.revnum) {
2625                ret.revnum = item->handle->cur_header_revnum;
2626                ret.bid = item->handle->last_hdr_bid;
2627            }
2628        }
2629        spin_unlock(&fhandle->lock);
2630    }
2631
2632    spin_unlock(&handle->file->fhandle_idx_lock);
2633
2634    uint64_t num_keeping_headers =
2635        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
2636                            std::memory_order_relaxed);
2637    if (num_keeping_headers) {
2638        // backward scan previous header info to keep more headers
2639
2640        if (ret.bid == handle->last_hdr_bid) {
2641            // header in 'handle->last_hdr_bid' is not written into file yet!
2642            // we should start from the previous header
2643            hdr_bid = atomic_get_uint64_t(&handle->file->header.bid);
2644            hdr_revnum = handle->file->header.revnum - 1;
2645        } else {
2646            hdr_bid = ret.bid;
2647            hdr_revnum = ret.revnum;
2648        }
2649
2650        n_headers= num_keeping_headers;
2651        if (cur_revnum - hdr_revnum < n_headers) {
2652            n_headers = n_headers - (cur_revnum - hdr_revnum);
2653        } else {
2654            n_headers = 0;
2655        }
2656
2657        for (i=0; i<n_headers; ++i) {
2658            hdr_bid = filemgr_fetch_prev_header(handle->file, hdr_bid,
2659                         hdr_buf, &hdr_len, &seqnum, &hdr_revnum, NULL,
2660                         &magic, NULL, &handle->log_callback);
2661            if (hdr_len) {
2662                ret.revnum = hdr_revnum;
2663                ret.bid = hdr_bid;
2664            } else {
2665                break;
2666            }
2667        }
2668    }
2669
2670    // although we keep more headers from the oldest active header, we have to
2671    // preserve the last WAL flushing header from the target header for data
2672    // consistency.
2673    uint64_t dummy64;
2674    char *new_filename;
2675
2676    filemgr_fetch_header(handle->file, ret.bid, hdr_buf, &hdr_len, &seqnum,
2677                         &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2678    fdb_fetch_header(magic, hdr_buf, &dummy64, &dummy64, &dummy64, &dummy64,
2679                     &dummy64, &dummy64, &dummy64, &last_wal_bid, &dummy64,
2680                     &dummy64, &new_filename, NULL);
2681
2682    if (last_wal_bid != BLK_NOT_FOUND) {
2683        filemgr_fetch_header(handle->file, last_wal_bid, hdr_buf, &hdr_len, &seqnum,
2684                             &hdr_revnum, NULL, &magic, NULL, &handle->log_callback);
2685        ret.bid = last_wal_bid;
2686        ret.revnum = hdr_revnum;
2687    } else {
2688        // WAL has not been flushed yet .. we cannot trigger block reusing
2689        ret.bid = BLK_NOT_FOUND;
2690        ret.revnum = 0;
2691    }
2692
2693    return ret;
2694}
2695
2696