xref: /4.0.0/forestdb/src/filemgr.cc (revision adab8ed5)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36
37#include "memleak.h"
38
39#ifdef __DEBUG
40#ifndef __DEBUG_FILEMGR
41    #undef DBG
42    #undef DBGCMD
43    #undef DBGSW
44    #define DBG(...)
45    #define DBGCMD(...)
46    #define DBGSW(n, ...)
47#endif
48#endif
49
50// NBUCKET must be power of 2
51#define NBUCKET (1024)
52#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))
53
54// global static variables
55#ifdef SPIN_INITIALIZER
56static spin_t initial_lock = SPIN_INITIALIZER;
57#else
58static volatile unsigned int initial_lock_status = 0;
59static spin_t initial_lock;
60#endif
61
62
63static volatile uint8_t filemgr_initialized = 0;
64static struct filemgr_config global_config;
65static struct hash hash;
66static spin_t filemgr_openlock;
67
68struct temp_buf_item{
69    void *addr;
70    struct list_elem le;
71};
72static struct list temp_buf;
73static spin_t temp_buf_lock;
74
75static void _filemgr_free_func(struct hash_elem *h);
76
77static void spin_init_wrap(void *lock) {
78    spin_init((spin_t*)lock);
79}
80
81static void spin_destroy_wrap(void *lock) {
82    spin_destroy((spin_t*)lock);
83}
84
85static void spin_lock_wrap(void *lock) {
86    spin_lock((spin_t*)lock);
87}
88
89static void spin_unlock_wrap(void *lock) {
90    spin_unlock((spin_t*)lock);
91}
92
93static void mutex_init_wrap(void *lock) {
94    mutex_init((mutex_t*)lock);
95}
96
97static void mutex_destroy_wrap(void *lock) {
98    mutex_destroy((mutex_t*)lock);
99}
100
101static void mutex_lock_wrap(void *lock) {
102    mutex_lock((mutex_t*)lock);
103}
104
105static void mutex_unlock_wrap(void *lock) {
106    mutex_unlock((mutex_t*)lock);
107}
108
109static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
110{
111    struct kvs_node *aa, *bb;
112    aa = _get_entry(a, struct kvs_node, avl_id);
113    bb = _get_entry(b, struct kvs_node, avl_id);
114
115    if (aa->id < bb->id) {
116        return -1;
117    } else if (aa->id > bb->id) {
118        return 1;
119    } else {
120        return 0;
121    }
122}
123
124static int _block_is_overlapped(void *pbid1, void *pis_writer1,
125                                void *pbid2, void *pis_writer2,
126                                void *aux)
127{
128    (void)aux;
129    bid_t bid1, is_writer1, bid2, is_writer2;
130    bid1 = *(bid_t*)pbid1;
131    is_writer1 = *(bid_t*)pis_writer1;
132    bid2 = *(bid_t*)pbid2;
133    is_writer2 = *(bid_t*)pis_writer2;
134
135    if (bid1 != bid2) {
136        // not overlapped
137        return 0;
138    } else {
139        // overlapped
140        if (!is_writer1 && !is_writer2) {
141            // both are readers
142            return 0;
143        } else {
144            return 1;
145        }
146    }
147}
148
149fdb_status fdb_log(err_log_callback *log_callback,
150                   fdb_status status,
151                   const char *format, ...)
152{
153    if (log_callback && log_callback->callback) {
154        char msg[1024];
155        va_list args;
156        va_start(args, format);
157        vsprintf(msg, format, args);
158        va_end(args);
159        log_callback->callback(status, msg, log_callback->ctx_data);
160    }
161    return status;
162}
163
164static void _log_errno_str(struct filemgr_ops *ops,
165                           err_log_callback *log_callback,
166                           fdb_status io_error,
167                           const char *what,
168                           const char *filename)
169{
170    if (io_error < 0) {
171        char errno_msg[512];
172        ops->get_errno_str(errno_msg, 512);
173        fdb_log(log_callback, io_error,
174                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
175    }
176}
177
178static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
179{
180    struct filemgr *file = _get_entry(e, struct filemgr, e);
181    int len = strlen(file->filename);
182    return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
183}
184
185static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
186{
187    struct filemgr *aa, *bb;
188    aa = _get_entry(a, struct filemgr, e);
189    bb = _get_entry(b, struct filemgr, e);
190    return strcmp(aa->filename, bb->filename);
191}
192
// One-time global initialization of the file manager: copies 'config'
// into global_config, initializes the block cache (if configured), the
// global file hash table, the temp buffer pool, and the open lock.
// Safe to call from multiple threads / multiple times: a check, lock,
// re-check sequence around 'filemgr_initialized' ensures the body runs
// exactly once.
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        // (no static spin-lock initializer exists there, so the very
        // first spin_init of 'initial_lock' must itself be serialized).
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        spin_lock(&initial_lock);
        if (!filemgr_initialized) {
            global_config = *config;

            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
234
// Pop a reusable block-sized scratch buffer from the global pool, or
// allocate a new sector-aligned one if the pool is empty.  The returned
// pointer has global_config.blocksize usable bytes; the temp_buf_item
// bookkeeping struct lives immediately after those bytes in the same
// allocation (see _filemgr_release_temp_buf, which relies on this
// layout).  Buffers are returned to the pool, never freed, until
// _filemgr_shutdown_temp_buf.
// NOTE(review): the address produced by malloc_align is used without a
// NULL check — confirm malloc_align's OOM policy.
static void * _filemgr_get_temp_buf()
{
    struct list_elem *e;
    struct temp_buf_item *item;

    spin_lock(&temp_buf_lock);
    e = list_pop_front(&temp_buf);
    if (e) {
        // reuse a previously released buffer
        item = _get_entry(e, struct temp_buf_item, le);
    } else {
        void *addr;

        // blocksize bytes for the caller + trailing bookkeeping struct
        malloc_align(addr, FDB_SECTOR_SIZE,
                     global_config.blocksize + sizeof(struct temp_buf_item));

        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
        item->addr = addr;
    }
    spin_unlock(&temp_buf_lock);

    return item->addr;
}
257
258static void _filemgr_release_temp_buf(void *buf)
259{
260    struct temp_buf_item *item;
261
262    spin_lock(&temp_buf_lock);
263    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
264    list_push_front(&temp_buf, &item->le);
265    spin_unlock(&temp_buf_lock);
266}
267
268static void _filemgr_shutdown_temp_buf()
269{
270    struct list_elem *e;
271    struct temp_buf_item *item;
272    size_t count=0;
273
274    spin_lock(&temp_buf_lock);
275    e = list_begin(&temp_buf);
276    while(e){
277        item = _get_entry(e, struct temp_buf_item, le);
278        e = list_remove(&temp_buf, e);
279        free_align(item->addr);
280        count++;
281    }
282    spin_unlock(&temp_buf_lock);
283}
284
// Scan the file backwards from the current end-of-file, one block at a
// time, looking for the most recent valid DB header block.  On success,
// file->header.* is populated from disk and FDB_RESULT_SUCCESS is
// returned; file->pos / file->last_commit are rewound past any trailing
// garbage (e.g. partial writes left by a crash).  If no valid header is
// found, the in-memory header fields are zeroed and the most recent
// failure status is returned.
static fdb_status _filemgr_read_header(struct filemgr *file,
                                       err_log_callback *log_callback)
{
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_magic_t magic;
    filemgr_header_len_t len;
    uint8_t *buf;
    uint32_t crc, crc_file;
    fdb_status status = FDB_RESULT_SUCCESS;

    // get temp buffer
    buf = (uint8_t *) _filemgr_get_temp_buf();

    if (atomic_get_uint64_t(&file->pos) > 0) {
        // Crash Recovery Test 1: unaligned last block write
        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
        if (remain) {
            // discard the trailing non-block-aligned bytes (partial write)
            atomic_sub_uint64_t(&file->pos, remain);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
                "from a database file '%s'\n";
            DBG(msg, remain, file->filename);
            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
                    msg, remain, file->filename);
        }

        size_t block_counter = 0;
        do {
            // read the last not-yet-examined block of the file
            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
                atomic_get_uint64_t(&file->pos) - file->blocksize);
            if (rv != file->blocksize) {
                status = FDB_RESULT_READ_FAIL;
                const char *msg = "Unable to read a database file '%s' with "
                    "blocksize %" _F64 "\n";
                DBG(msg, file->filename, file->blocksize);
                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
                break;
            }
            ++block_counter;
            // last BLK_MARKER_SIZE bytes of a block identify its type
            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
                   BLK_MARKER_SIZE);

            if (marker[0] == BLK_MARKER_DBHEADER) {
                // possible need for byte conversions here
                memcpy(&magic,
                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
                       sizeof(magic));
                magic = _endian_decode(magic);

                if (magic == FILEMGR_MAGIC) {
                    // header length precedes the magic at the block tail
                    memcpy(&len,
                           buf + file->blocksize - BLK_MARKER_SIZE -
                           sizeof(magic) - sizeof(len),
                           sizeof(len));
                    len = _endian_decode(len);

                    // CRC covers the header payload minus the CRC field
                    crc = chksum(buf, len - sizeof(crc));
                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
                    crc_file = _endian_decode(crc_file);
                    if (crc == crc_file) {
                        // valid header found: cache payload + revnum/seqnum
                        file->header.data = (void *)malloc(len);

                        memcpy(file->header.data, buf, len);
                        memcpy(&file->header.revnum, buf + len,
                               sizeof(filemgr_header_revnum_t));
                        memcpy((void *) &file->header.seqnum,
                                buf + len + sizeof(filemgr_header_revnum_t),
                                sizeof(fdb_seqnum_t));
                        file->header.revnum =
                            _endian_decode(file->header.revnum);
                        file->header.seqnum =
                            _endian_decode(file->header.seqnum);
                        file->header.size = len;
                        atomic_store_uint64_t(&file->header.bid,
                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
                                              BLK_NOT_FOUND);
                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
                                              BLK_NOT_FOUND);
                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));

                        // release temp buffer
                        _filemgr_release_temp_buf(buf);

                        return FDB_RESULT_SUCCESS;
                    } else {
                        status = FDB_RESULT_CHECKSUM_ERROR;
                        const char *msg = "Crash Detected: CRC on disk %u != %u "
                            "in a database file '%s'\n";
                        DBG(msg, crc_file, crc, file->filename);
                        fdb_log(log_callback, status, msg, crc_file, crc,
                                file->filename);
                    }
                } else {
                    status = FDB_RESULT_FILE_CORRUPTION;
                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
                        " in a database file '%s'\n";
                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
                            file->filename);
                }
            } else {
                status = FDB_RESULT_NO_DB_HEADERS;
                // only report a non-header *last* block; older non-header
                // blocks are expected while scanning backwards
                if (block_counter == 1) {
                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
                                      "in a database file '%s'\n";
                    DBG(msg, marker[0], file->filename);
                    fdb_log(log_callback, status, msg, marker[0], file->filename);
                }
            }

            // this block was not a valid header: rewind one block and retry
            atomic_sub_uint64_t(&file->pos, file->blocksize);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
        } while (atomic_get_uint64_t(&file->pos));
    }

    // release temp buffer
    _filemgr_release_temp_buf(buf);

    // no valid header anywhere: reset the in-memory header state
    file->header.size = 0;
    file->header.revnum = 0;
    file->header.seqnum = 0;
    file->header.data = NULL;
    atomic_store_uint64_t(&file->header.bid, 0);
    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
    return status;
}
414
415size_t filemgr_get_ref_count(struct filemgr *file)
416{
417    size_t ret = 0;
418    spin_lock(&file->lock);
419    ret = file->ref_count;
420    spin_unlock(&file->lock);
421    return ret;
422}
423
424uint64_t filemgr_get_bcache_used_space(void)
425{
426    uint64_t bcache_free_space = 0;
427    if (global_config.ncacheblock) { // If buffer cache is indeed configured
428        bcache_free_space = bcache_get_num_free_blocks();
429        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
430                          * global_config.blocksize;
431    }
432    return bcache_free_space;
433}
434
// Arguments for _filemgr_prefetch_thread.  Heap-allocated by
// filemgr_prefetch and freed by the thread itself on exit.
struct filemgr_prefetch_args {
    struct filemgr *file;           // file whose blocks are prefetched
    uint64_t duration;              // max prefetch run time, in seconds
    err_log_callback *log_callback; // for reporting read failures
    void *aux;                      // not referenced in this file — presumably reserved
};
441
// Background prefetch thread body: walks the file backwards from
// last_commit in FILEMGR_PREFETCH_UNIT chunks, reading each block via
// filemgr_read so it lands in the block cache.  Terminates on abort
// signal, timeout ('duration' seconds), block-cache exhaustion, a read
// failure, or when the file start is reached.  Resets prefetch_status
// to IDLE and frees its args before exiting.
static void *_filemgr_prefetch_thread(void *voidargs)
{
    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
    uint8_t *buf = alca(uint8_t, args->file->blocksize);
    uint64_t cur_pos = 0, i;
    uint64_t bcache_free_space;
    bid_t bid;
    bool terminate = false;
    struct timeval begin, cur, gap;

    // starting point: one prefetch unit before the last commit offset
    spin_lock(&args->file->lock);
    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
    spin_unlock(&args->file->lock);
    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
        terminate = true;
    } else {
        cur_pos -= FILEMGR_PREFETCH_UNIT;
    }
    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
    gettimeofday(&begin, NULL);
    while (!terminate) {
        // forward pass within the current unit, blockwise
        for (i = cur_pos;
             i < cur_pos + FILEMGR_PREFETCH_UNIT;
             i += args->file->blocksize) {

            gettimeofday(&cur, NULL);
            gap = _utime_gap(begin, cur);
            bcache_free_space = bcache_get_num_free_blocks();
            bcache_free_space *= args->file->blocksize;

            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
                gap.tv_sec >= (int64_t)args->duration ||
                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
                // terminate thread when
                // 1. got abort signal
                // 2. time out
                // 3. not enough free space in block cache
                terminate = true;
                break;
            } else {
                bid = i / args->file->blocksize;
                if (filemgr_read(args->file, bid, buf, NULL, true)
                        != FDB_RESULT_SUCCESS) {
                    // 4. read failure
                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
                            "Prefetch thread failed to read a block with block id %" _F64
                            " from a database file '%s'", bid, args->file->filename);
                    terminate = true;
                    break;
                }
            }
        }

        // step back to the previous unit
        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
            cur_pos -= FILEMGR_PREFETCH_UNIT;
        } else {
            // remaining space is less than FILEMGR_PREFETCH_UNIT
            terminate = true;
        }
    }

    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
    free(args);
    return NULL;
}
507
// prefetch the given DB file
// Spawns _filemgr_prefetch_thread for 'file' if there is at least one
// committed byte and the block cache currently has FILEMGR_PREFETCH_UNIT
// bytes of free space; otherwise does nothing.  The spawned thread owns
// and frees the args struct.
// NOTE(review): the calloc result is used without a NULL check —
// confirm the project's OOM policy.
void filemgr_prefetch(struct filemgr *file,
                      struct filemgr_config *config,
                      err_log_callback *log_callback)
{
    uint64_t bcache_free_space;

    bcache_free_space = bcache_get_num_free_blocks();
    bcache_free_space *= file->blocksize;

    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
    spin_lock(&file->lock);
    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
        // invoke prefetch thread
        struct filemgr_prefetch_args *args;
        args = (struct filemgr_prefetch_args *)
               calloc(1, sizeof(struct filemgr_prefetch_args));
        args->file = file;
        args->duration = config->prefetch_duration;
        args->log_callback = log_callback;

        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
    }
    spin_unlock(&file->lock);
}
535
536fdb_status filemgr_does_file_exist(char *filename) {
537    struct filemgr_ops *ops = get_filemgr_ops();
538    int fd = ops->open(filename, O_RDONLY, 0444);
539    if (fd < 0) {
540        return (fdb_status) fd;
541    }
542    ops->close(fd);
543    return FDB_RESULT_SUCCESS;
544}
545
// Open (or re-open, or attach to) the filemgr instance for 'filename'.
// Behavior, under the global filemgr_openlock:
//   * If the file is already in the global hash and open, just bump its
//     ref_count, refresh the FILEMGR_SYNC flag, and return it.
//   * If it is in the hash but FILE_CLOSED, re-open the descriptor in
//     place (handling the case where the user deleted the file on disk).
//   * Otherwise create a fresh filemgr: open the fd, size the file,
//     recover the latest header (_filemgr_read_header), initialize
//     locks/WAL/global transaction, insert into the hash, and start
//     prefetching if configured.
// On failure every allocation made so far is rolled back and result.rv
// carries the error; on success result.file is the usable handle.
filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
                                 struct filemgr_config *config,
                                 err_log_callback *log_callback)
{
    struct filemgr *file = NULL;
    struct filemgr query;
    struct hash_elem *e = NULL;
    bool create = config->options & FILEMGR_CREATE;
    int file_flag = 0x0;
    int fd = -1;
    fdb_status status;
    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};

    filemgr_init(config);

    // check whether file is already opened or not
    query.filename = filename;
    spin_lock(&filemgr_openlock);
    e = hash_find(&hash, &query.e);

    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        file->ref_count++;

        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
            file_flag = O_RDWR;
            if (create) {
                file_flag |= O_CREAT;
            }
            *file->config = *config;
            file->config->blocksize = global_config.blocksize;
            file->config->ncacheblock = global_config.ncacheblock;
            file_flag |= config->flag;
            file->fd = file->ops->open(file->filename, file_flag, 0666);
            if (file->fd < 0) {
                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
                    // A database file was manually deleted by the user.
                    // Clean up global hash table, WAL index, and buffer cache.
                    // Then, retry it with a create option below IFF it is not
                    // a read-only open attempt
                    struct hash_elem *ret;
                    spin_unlock(&file->lock);
                    ret = hash_remove(&hash, &file->e);
                    fdb_assert(ret, 0, 0);
                    _filemgr_free_func(&file->e);
                    if (!create) {
                        _log_errno_str(ops, log_callback,
                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
                        spin_unlock(&filemgr_openlock);
                        result.rv = FDB_RESULT_NO_SUCH_FILE;
                        return result;
                    }
                    // fall through to the fresh-open path below
                } else {
                    _log_errno_str(file->ops, log_callback,
                                  (fdb_status)file->fd, "OPEN", filename);
                    file->ref_count--;
                    spin_unlock(&file->lock);
                    spin_unlock(&filemgr_openlock);
                    result.rv = file->fd;
                    return result;
                }
            } else { // Reopening the closed file is succeed.
                atomic_store_uint8_t(&file->status, FILE_NORMAL);
                if (config->options & FILEMGR_SYNC) {
                    file->fflags |= FILEMGR_SYNC;
                } else {
                    file->fflags &= ~FILEMGR_SYNC;
                }
                spin_unlock(&file->lock);
                spin_unlock(&filemgr_openlock);
                result.file = file;
                result.rv = FDB_RESULT_SUCCESS;
                return result;
            }
        } else { // file is already opened.

            if (config->options & FILEMGR_SYNC) {
                file->fflags |= FILEMGR_SYNC;
            } else {
                file->fflags &= ~FILEMGR_SYNC;
            }

            spin_unlock(&file->lock);
            spin_unlock(&filemgr_openlock);
            result.file = file;
            result.rv = FDB_RESULT_SUCCESS;
            return result;
        }
    }

    // fresh open: not in the hash (or just purged after manual deletion)
    file_flag = O_RDWR;
    if (create) {
        file_flag |= O_CREAT;
    }
    file_flag |= config->flag;
    fd = ops->open(filename, file_flag, 0666);
    if (fd < 0) {
        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
        spin_unlock(&filemgr_openlock);
        result.rv = fd;
        return result;
    }
    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
    file->filename_len = strlen(filename);
    file->filename = (char*)malloc(file->filename_len + 1);
    strcpy(file->filename, filename);

    file->ref_count = 1;

    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
    file->wal->flag = 0;

    file->ops = ops;
    file->blocksize = global_config.blocksize;
    atomic_init_uint8_t(&file->status, FILE_NORMAL);
    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
    *file->config = *config;
    file->config->blocksize = global_config.blocksize;
    file->config->ncacheblock = global_config.ncacheblock;
    file->new_file = NULL;
    file->old_filename = NULL;
    file->fd = fd;

    // determine the file size; pos/last_commit start at EOF
    cs_off_t offset = file->ops->goto_eof(file->fd);
    if (offset == FDB_RESULT_SEEK_FAIL) {
        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = FDB_RESULT_SEEK_FAIL;
        return result;
    }
    atomic_init_uint64_t(&file->last_commit, offset);
    atomic_init_uint64_t(&file->pos, offset);

    file->bcache = NULL;
    file->in_place_compaction = false;
    file->kv_header = NULL;
    file->prefetch_status = FILEMGR_PREFETCH_IDLE;

    atomic_init_uint64_t(&file->header.bid, 0);
    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
    _init_op_stats(&file->header.op_stat);
    // recover the most recent valid DB header (may rewind pos/last_commit)
    status = _filemgr_read_header(file, log_callback);
    if (status != FDB_RESULT_SUCCESS) {
        _log_errno_str(file->ops, log_callback, status, "READ", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = status;
        return result;
    }

    spin_init(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    struct plock_ops pops;
    struct plock_config pconfig;

    pops.init_user = mutex_init_wrap;
    pops.lock_user = mutex_lock_wrap;
    pops.unlock_user = mutex_unlock_wrap;
    pops.destroy_user = mutex_destroy_wrap;
    pops.init_internal = spin_init_wrap;
    pops.lock_internal = spin_lock_wrap;
    pops.unlock_internal = spin_unlock_wrap;
    pops.destroy_internal = spin_destroy_wrap;
    pops.is_overlapped = _block_is_overlapped;

    memset(&pconfig, 0x0, sizeof(pconfig));
    pconfig.ops = &pops;
    pconfig.sizeof_lock_internal = sizeof(spin_t);
    pconfig.sizeof_lock_user = sizeof(mutex_t);
    pconfig.sizeof_range = sizeof(bid_t);
    pconfig.aux = NULL;
    plock_init(&file->plock, &pconfig);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_init(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_init(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_init(&file->writer_lock.mutex);
    file->writer_lock.locked = false;

    // initialize WAL
    if (!wal_is_initialized(file)) {
        wal_init(file, FDB_WAL_NBUCKET);
    }

    // init global transaction for the file
    file->global_txn.wrapper = (struct wal_txn_wrapper*)
                               malloc(sizeof(struct wal_txn_wrapper));
    file->global_txn.wrapper->txn = &file->global_txn;
    file->global_txn.handle = NULL;
    if (atomic_get_uint64_t(&file->pos)) {
        // previous header lives in the last block of the file
        file->global_txn.prev_hdr_bid =
            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
    } else {
        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
    }
    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
    list_init(file->global_txn.items);
    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
    wal_add_transaction(file, &file->global_txn);

    // publish the new instance and optionally warm the block cache
    hash_insert(&hash, &file->e);
    if (config->prefetch_duration > 0) {
        filemgr_prefetch(file, config, log_callback);
    }
    spin_unlock(&filemgr_openlock);

    if (config->options & FILEMGR_SYNC) {
        file->fflags |= FILEMGR_SYNC;
    } else {
        file->fflags &= ~FILEMGR_SYNC;
    }

    result.file = file;
    result.rv = FDB_RESULT_SUCCESS;
    return result;
}
784
// Replace the cached in-memory DB header with 'buf' (len bytes),
// growing the header buffer if needed, and bump the header revision
// number.  Returns the new revision number.  Thread-safe via the
// file's spin lock.
// NOTE(review): malloc/realloc results are used unchecked — on OOM the
// old header block would leak and memcpy would dereference NULL; the
// uint64_t return leaves no channel to report failure.  Confirm the
// project's allocation-failure policy.
uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
{
    uint64_t ret;

    spin_lock(&file->lock);

    if (file->header.data == NULL) {
        file->header.data = (void *)malloc(len);
    }else if (file->header.size < len){
        file->header.data = (void *)realloc(file->header.data, len);
    }
    memcpy(file->header.data, buf, len);
    file->header.size = len;
    ++(file->header.revnum);
    ret = file->header.revnum;

    spin_unlock(&file->lock);

    return ret;
}
805
806filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
807{
808    filemgr_header_revnum_t ret;
809    spin_lock(&file->lock);
810    ret = file->header.revnum;
811    spin_unlock(&file->lock);
812    return ret;
813}
814
815// 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
816// 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
// Return the default KV store's sequence number from the cached header.
// Not internally locked — caller must hold the filemgr mutex (see the
// note above).
fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
{
    return file->header.seqnum;
}
821
// Set the default KV store's sequence number in the cached header.
// Not internally locked — caller must hold the filemgr mutex (see the
// note above).
void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
{
    file->header.seqnum = seqnum;
}
826
// Copy the cached DB header into 'buf' and store its size in *len.
// If 'buf' is NULL and a header exists, a buffer is malloc'd here and
// ownership passes to the caller (who must free it).  When no header is
// cached, *len is set to 0 and 'buf' is returned unchanged.
// NOTE(review): the malloc result is used unchecked — confirm OOM policy.
void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len)
{
    spin_lock(&file->lock);

    if (file->header.size > 0) {
        if (buf == NULL) {
            buf = (void*)malloc(file->header.size);
        }
        memcpy(buf, file->header.data, file->header.size);
    }
    *len = file->header.size;

    spin_unlock(&file->lock);

    return buf;
}
843
// Read the DB header stored in block 'bid' from disk into 'buf',
// setting *len to the header payload length and (optionally) *seqnum to
// the default KV store's sequence number recorded alongside it.
// Validates the block's trailing marker and magic before trusting its
// contents.  A bid of 0 or BLK_NOT_FOUND means "no header": *len is set
// to 0 and FDB_RESULT_SUCCESS is returned.
fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    fdb_status status = FDB_RESULT_SUCCESS;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return FDB_RESULT_SUCCESS;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);

    if (status != FDB_RESULT_SUCCESS) {
        fdb_log(log_callback, status,
                "Failed to read a database header with block id %" _F64 " in "
                "a database file '%s'", bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return status;
    }
    // block type marker occupies the last BLK_MARKER_SIZE bytes
    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
            BLK_MARKER_SIZE);

    if (marker[0] != BLK_MARKER_DBHEADER) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block marker of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // magic precedes the marker at the block tail
    memcpy(&magic,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
            sizeof(magic));
    magic = _endian_decode(magic);
    if (magic != FILEMGR_MAGIC) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block magic value of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match FILEMGR_MAGIC!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // header length precedes the magic
    memcpy(&hdr_len,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
            sizeof(hdr_len), sizeof(hdr_len));
    hdr_len = _endian_decode(hdr_len);

    memcpy(buf, _buf, hdr_len);
    *len = hdr_len;

    if (seqnum) {
        // copy default KVS's seqnum
        fdb_seqnum_t _seqnum;
        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(_seqnum));
        *seqnum = _endian_decode(_seqnum);
    }

    _filemgr_release_temp_buf(_buf);

    return status;
}
912
// Walk backwards from header block 'bid' to the immediately preceding DB
// header by following the "prev header BID" field stored in each header
// block's footer.  On success, copies the previous header's raw data into
// 'buf' (when non-NULL), stores its length into *len and its default-KVS
// seqnum into *seqnum, and returns the previous header's BID.  On failure
// (no previous header, read error, or a broken chain) *len is set to 0 and
// the last BID inspected is returned.
// NOTE(review): unlike filemgr_fetch_header(), 'seqnum' is dereferenced
// unconditionally on the success path — callers must pass a valid pointer.
uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                   err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    bid_t _prev_bid, prev_bid;
    int found = 0;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return bid;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    // Reverse scan the file for a previous DB header
    do {
        // Get prev_bid from the current header.
        // Since the current header is already cached during the previous
        // operation, no disk I/O will be triggered.
        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
                != FDB_RESULT_SUCCESS) {
            break;
        }

        // Validate the current block's marker and magic before trusting
        // the prev-BID field embedded in its footer.
        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);

        if (marker[0] != BLK_MARKER_DBHEADER ||
            magic != FILEMGR_MAGIC) {
            // not a header block
            // this happens when this function is invoked between
            // fdb_set() call and fdb_commit() call, so the last block
            // in the file is not a header block
            bid_t latest_hdr = filemgr_get_header_bid(file);
            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
                // get the latest header BID
                bid = latest_hdr;
            } else {
                break;
            }
        } else {
            // Footer layout at the end of a header block:
            // [prev bid (8)] [hdr len (2)] [magic (8)] [marker (1)]
            memcpy(&_prev_bid,
                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
                       sizeof(hdr_len) - sizeof(_prev_bid),
                   sizeof(_prev_bid));
            prev_bid = _endian_decode(_prev_bid);
            if (bid <= prev_bid) {
                // no more prev header, or broken linked list
                break;
            }
            bid = prev_bid;
        }

        // Read the prev header
        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
        if (fs != FDB_RESULT_SUCCESS) {
            fdb_log(log_callback, fs,
                    "Failed to read a previous database header with block id %" _F64 " in "
                    "a database file '%s'", bid, file->filename);
            break;
        }

        // Re-validate marker and magic on the block we just landed on.
        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        if (marker[0] != BLK_MARKER_DBHEADER) {
            if (bid) {
                // broken linked list
                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                        "A block marker of the previous database header block id %"
                        _F64 " in "
                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                        bid, file->filename);
            }
            break;
        }

        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);
        if (magic != FILEMGR_MAGIC) {
            // broken linked list
            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                    "A block magic value of the previous database header block id %" _F64 " in "
                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
                    bid, file->filename);
            break;
        }

        memcpy(&hdr_len,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
               sizeof(hdr_len), sizeof(hdr_len));
        hdr_len = _endian_decode(hdr_len);

        // Copy out header data, its revision number, and the default
        // KVS seqnum stored immediately after the header data.
        if (buf) {
            memcpy(buf, _buf, hdr_len);
        }
        memcpy(&_revnum, _buf + hdr_len,
               sizeof(filemgr_header_revnum_t));
        memcpy(&_seqnum,
               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(fdb_seqnum_t));
        *seqnum = _endian_decode(_seqnum);
        *len = hdr_len;
        found = 1;
        break;
    } while (false); // no repetition

    if (!found) { // no other header found till end of file
        *len = 0;
    }

    _filemgr_release_temp_buf(_buf);

    return bid;
}
1038
// Drop one reference to 'file'.  When the count reaches zero: discard this
// file's dirty cache blocks, close the WAL and the file descriptor, then —
// depending on the file's status — delete the physical file
// (FILE_REMOVED_PENDING) or free the in-memory structure entirely
// (cleanup_cache_onclose).  After an in-place compaction the file may also
// be renamed back to its original name here, provided no other handle
// still refers to the old file.
fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
                         const char *orig_file_name,
                         err_log_callback *log_callback)
{
    int rv = FDB_RESULT_SUCCESS;

    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
                                  // filemgr_open() because file->lock won't
                                  // prevent the race condition.

    // remove filemgr structure if no thread refers to the file
    spin_lock(&file->lock);
    if (--(file->ref_count) == 0) {
        // Last reference: release file->lock while touching the buffer
        // cache and WAL, then re-acquire it for the actual close.
        spin_unlock(&file->lock);
        if (global_config.ncacheblock > 0) {
            // discard all dirty blocks belonged to this file
            bcache_remove_dirty_blocks(file);
        }

        if (wal_is_initialized(file)) {
            wal_close(file);
        }

        spin_lock(&file->lock);
        rv = file->ops->close(file->fd);
        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
            // remove file
            remove(file->filename);
            // we can release lock because no one will open this file
            spin_unlock(&file->lock);
            struct hash_elem *ret = hash_remove(&hash, &file->e);
            fdb_assert(ret, 0, 0);
            spin_unlock(&filemgr_openlock);
            _filemgr_free_func(&file->e);
            return (fdb_status) rv;
        } else {
            if (cleanup_cache_onclose) {
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
                if (file->in_place_compaction && orig_file_name) {
                    struct hash_elem *elem = NULL;
                    struct filemgr query;
                    uint32_t old_file_refcount = 0;

                    // is the original (pre-compaction) name still registered?
                    query.filename = (char *)orig_file_name;
                    elem = hash_find(&hash, &query.e);

                    if (file->old_filename) {
                        struct hash_elem *elem_old = NULL;
                        struct filemgr query_old;
                        struct filemgr *old_file = NULL;

                        // get old file's ref count if exists
                        query_old.filename = file->old_filename;
                        elem_old = hash_find(&hash, &query_old.e);
                        if (elem_old) {
                            old_file = _get_entry(elem_old, struct filemgr, e);
                            old_file_refcount = old_file->ref_count;
                        }
                    }
                    // If old file is opened by other handle, renaming should be
                    // postponed. It will be renamed later by the handle referring
                    // to the old file.
                    if (!elem && old_file_refcount == 0 &&
                        rename(file->filename, orig_file_name) < 0) {
                        // Note that the renaming failure is not a critical
                        // issue because the last compacted file will be automatically
                        // identified and opened in the next fdb_open call.
                        _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
                                       "CLOSE", file->filename);
                    }
                }
                spin_unlock(&file->lock);
                // Clean up global hash table, WAL index, and buffer cache.
                struct hash_elem *ret = hash_remove(&hash, &file->e);
                fdb_assert(ret, file, 0);
                spin_unlock(&filemgr_openlock);
                _filemgr_free_func(&file->e);
                return (fdb_status) rv;
            } else {
                // keep the structure registered so the file can be reopened
                atomic_store_uint8_t(&file->status, FILE_CLOSED);
            }
        }
    }

    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

    spin_unlock(&file->lock);
    spin_unlock(&filemgr_openlock);
    return (fdb_status) rv;
}
1130
// Final teardown of a filemgr instance.  Stops the prefetch thread, purges
// all of this file's cached blocks, releases the KV header, the global
// transaction, the WAL (including every shard), and frees all locks and
// heap allocations owned by the structure.  The hash element is expected to
// have been removed from the global table by the caller (or to be freed as
// part of hash_free_active).
static void _filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    spin_lock(&file->lock);
    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread is running
        void *ret;
        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
        spin_unlock(&file->lock);
        // wait for the prefetch thread to observe the abort flag and exit
        thread_join(file->prefetch_tid, &ret);
    } else {
        spin_unlock(&file->lock);
    }

    // remove all cached blocks
    if (global_config.ncacheblock > 0) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
    }

    if (file->kv_header) {
        // multi KV instance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file);
        size_t i = 0;
        size_t num_all_shards = wal_get_num_all_shards(file);
        // Free all WAL shards (including compactor's shard)
        for (; i < num_all_shards; ++i) {
            hash_free(&file->wal->key_shards[i].hash_bykey);
            spin_destroy(&file->wal->key_shards[i].lock);
            hash_free(&file->wal->seq_shards[i].hash_byseq);
            spin_destroy(&file->wal->seq_shards[i].lock);
        }
        spin_destroy(&file->wal->lock);
        atomic_destroy_uint32_t(&file->wal->size);
        atomic_destroy_uint32_t(&file->wal->num_flushable);
        atomic_destroy_uint64_t(&file->wal->datasize);
        free(file->wal->key_shards);
        free(file->wal->seq_shards);
    }
    free(file->wal);

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);
    // free old filename if any
    free(file->old_filename);

    // destroy locks
    spin_destroy(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    // free file structure
    free(file->config);
    free(file);
}
1214
1215// permanently remove file from cache (not just close)
1216// LCOV_EXCL_START
1217void filemgr_remove_file(struct filemgr *file)
1218{
1219    struct hash_elem *ret;
1220
1221    fdb_assert(file, file, NULL);
1222    fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1223
1224    // remove from global hash table
1225    spin_lock(&filemgr_openlock);
1226    ret = hash_remove(&hash, &file->e);
1227    fdb_assert(ret, ret, NULL);
1228    spin_unlock(&filemgr_openlock);
1229
1230    _filemgr_free_func(&file->e);
1231}
1232// LCOV_EXCL_STOP
1233
1234static
1235void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1236    struct filemgr *file = _get_entry(h, struct filemgr, e);
1237    void *ret;
1238    spin_lock(&file->lock);
1239    if (file->ref_count != 0) {
1240        ret = (void *)file;
1241    } else {
1242        ret = NULL;
1243    }
1244    spin_unlock(&file->lock);
1245    return ret;
1246}
1247
// Tear down the filemgr subsystem if (and only if) no file is still
// referenced.  Returns FDB_RESULT_FILE_IS_BUSY when any open handle
// remains; otherwise frees all registered files, shuts down the block
// cache, and resets the init state so filemgr_init() can run again.
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    if (filemgr_initialized) {
        // double-checked init flag: re-test under the lock in case another
        // thread completed the shutdown between the check and the lock
        spin_lock(&initial_lock);
        if (!filemgr_initialized) {
            // filemgr is already shut down
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            // NOTE(review): in the non-SPIN_INITIALIZER build the lock is
            // NOT released on this early-return path — verify this is
            // intentional (initial_lock is re-created by filemgr_init()).
            return ret;
        }

        // refuse to shut down while any file is still referenced
        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        if (!open_file) {
            hash_free_active(&hash, _filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
#ifndef SPIN_INITIALIZER
            initial_lock_status = 0;
#else
            initial_lock = SPIN_INITIALIZER;
#endif
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            spin_destroy(&initial_lock);
#endif
        } else {
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1286
1287bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1288{
1289    spin_lock(&file->lock);
1290    bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1291    atomic_add_uint64_t(&file->pos, file->blocksize);
1292
1293    if (global_config.ncacheblock <= 0) {
1294        // if block cache is turned off, write the allocated block before use
1295        uint8_t _buf = 0x0;
1296        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1297                                       atomic_get_uint64_t(&file->pos) - 1);
1298        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1299    }
1300    spin_unlock(&file->lock);
1301
1302    return bid;
1303}
1304
1305void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1306                            bid_t *end, err_log_callback *log_callback)
1307{
1308    spin_lock(&file->lock);
1309    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1310    *end = *begin + nblock - 1;
1311    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1312
1313    if (global_config.ncacheblock <= 0) {
1314        // if block cache is turned off, write the allocated block before use
1315        uint8_t _buf = 0x0;
1316        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1317                                       atomic_get_uint64_t(&file->pos) - 1);
1318        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1319    }
1320    spin_unlock(&file->lock);
1321}
1322
1323// atomically allocate NBLOCK blocks only when current file position is same to nextbid
1324bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1325                                  bid_t *begin, bid_t *end,
1326                                  err_log_callback *log_callback)
1327{
1328    bid_t bid;
1329    spin_lock(&file->lock);
1330    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1331    if (bid == nextbid) {
1332        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1333        *end = *begin + nblock - 1;
1334        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1335
1336        if (global_config.ncacheblock <= 0) {
1337            // if block cache is turned off, write the allocated block before use
1338            uint8_t _buf = 0x0;
1339            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1340                                           atomic_get_uint64_t(&file->pos));
1341            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1342        }
1343    }else{
1344        *begin = BLK_NOT_FOUND;
1345        *end = BLK_NOT_FOUND;
1346    }
1347    spin_unlock(&file->lock);
1348    return bid;
1349}
1350
#ifdef __CRC32
// Verify the CRC32 stored in a B+tree-node block ('buf' must be one full
// block).  Note: overwrites the on-buffer CRC field with 0xff while
// recomputing the checksum, mirroring what the writer does before stamping
// the CRC (see filemgr_write_offset).
INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
{
    uint8_t last_byte = *((uint8_t *)buf + file->blocksize - 1);

    if (last_byte == BLK_MARKER_BNODE) {
        uint32_t stored_crc;
        memcpy(&stored_crc, (uint8_t *)buf + BTREE_CRC_OFFSET,
               sizeof(stored_crc));
        stored_crc = _endian_decode(stored_crc);

        // blank out the CRC field exactly as the writer did, then recompute
        memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
        uint32_t computed_crc = chksum(buf, file->blocksize);
        if (computed_crc != stored_crc) {
            return FDB_RESULT_CHECKSUM_ERROR;
        }
    }
    return FDB_RESULT_SUCCESS;
}
#endif
1367
1368void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1369{
1370    if (global_config.ncacheblock > 0) {
1371        bcache_invalidate_block(file, bid);
1372    }
1373}
1374
// Read one block ('bid') into 'buf'.  With the block cache enabled, the
// cache is consulted first; on a miss the block is (optionally) read from
// disk, CRC-checked, and inserted back into the cache as clean.  A data
// lock is taken only when the block is still writable (uncommitted),
// because committed blocks are immutable and cannot be raced by writers.
// When 'read_on_cache_miss' is false, a miss returns FDB_RESULT_READ_FAIL
// without touching the disk.
fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
                        err_log_callback *log_callback,
                        bool read_on_cache_miss)
{
    size_t lock_no;
    ssize_t r;
    uint64_t pos = bid * file->blocksize;
    fdb_status status = FDB_RESULT_SUCCESS;
    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
    // reading beyond the current logical end of file is a caller bug
    fdb_assert(pos < curr_pos, pos, curr_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry = NULL;
        bid_t is_writer = 0;
#endif
        bool locked = false;
        // Note: we don't need to grab lock for committed blocks
        // because they are immutable so that no writer will interfere and
        // overwrite dirty data
        if (filemgr_is_writable(file, bid)) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_lock(&file->data_mutex[lock_no]);
#else
            spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
            locked = true;
        }

        r = bcache_read(file, bid, buf);
        if (r == 0) {
            // cache miss
            if (!read_on_cache_miss) {
                // caller only wanted cached data; undo the lock and bail
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return FDB_RESULT_READ_FAIL;
            }

            // if normal file, just read a block
            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
            if (r != file->blocksize) {
                // short read or I/O error: 'r' carries the error code
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "READ", file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return (fdb_status)r;
            }
#ifdef __CRC32
            // verify data integrity of B+tree node blocks
            status = _filemgr_crc32_check(file, buf);
            if (status != FDB_RESULT_SUCCESS) {
                _log_errno_str(file->ops, log_callback, status, "READ",
                        file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return status;
            }
#endif
            // populate the cache with the block we just read (clean copy)
            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        }
        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }
    } else {
        // block cache disabled: always read straight from disk
        if (!read_on_cache_miss) {
            return FDB_RESULT_READ_FAIL;
        }

        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
        if (r != file->blocksize) {
            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
                           file->filename);
            return (fdb_status)r;
        }

#ifdef __CRC32
        status = _filemgr_crc32_check(file, buf);
        if (status != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, status, "READ",
                           file->filename);
            return status;
        }
#endif
    }
    return status;
}
1506
// Write 'len' bytes from 'buf' at byte 'offset' inside block 'bid'.
// With the block cache enabled, the write goes into the cache as a dirty
// block: a full-block write replaces the cached block outright, while a
// partial write that misses the cache first reads the block's previous
// contents from disk (unless the block is brand new).  With the cache
// disabled, data is written straight to disk, after stamping a CRC32 into
// full B+tree-node blocks.  Writes below the last commit point are a
// caller bug (asserted).
fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
                                uint64_t offset, uint64_t len, void *buf,
                                err_log_callback *log_callback)
{
    // the write must stay inside a single block
    fdb_assert(offset + len <= file->blocksize, offset + len, file);

    size_t lock_no;
    ssize_t r = 0;
    uint64_t pos = bid * file->blocksize + offset;
    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
    // committed regions are immutable; writing below the commit point is a bug
    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

        bool locked = false;
        // writers always take the data lock for the target block
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry;
        bid_t is_writer = 1;
        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
        mutex_lock(&file->data_mutex[lock_no]);
#else
        spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        locked = true;

        if (len == file->blocksize) {
            // write entire block .. we don't need to read previous block
            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        } else {
            // partially write buffer cache first
            r = bcache_write_partial(file, bid, buf, offset, len);
            if (r == 0) {
                // cache miss
                // write partially .. we have to read previous contents of the block
                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
                void *_buf = _filemgr_get_temp_buf();

                if (bid >= cur_file_last_bid) {
                    // this is the first time to write this block
                    // we don't need to read previous block from file.
                } else {
                    // merge the new bytes onto the block's on-disk contents
                    r = file->ops->pread(file->fd, _buf, file->blocksize,
                                         bid * file->blocksize);
                    if (r != file->blocksize) {
                        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                            mutex_unlock(&file->data_mutex[lock_no]);
#else
                            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                        }
                        _filemgr_release_temp_buf(_buf);
                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
                                       "READ", file->filename);
                        return FDB_RESULT_READ_FAIL;
                    }
                }
                memcpy((uint8_t *)_buf + offset, buf, len);
                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
                if (r != global_config.blocksize) {
                    if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                        plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                        mutex_unlock(&file->data_mutex[lock_no]);
#else
                        spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                    }
                    _filemgr_release_temp_buf(_buf);
                    _log_errno_str(file->ops, log_callback,
                            (fdb_status) r, "WRITE", file->filename);
                    return FDB_RESULT_WRITE_FAIL;
                }

                _filemgr_release_temp_buf(_buf);
            } // cache miss
        } // full block or partial block

        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }

    } else { // block cache disabled

#ifdef __CRC32
        // stamp a CRC32 into full B+tree node blocks before writing
        if (len == file->blocksize) {
            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
            if (marker == BLK_MARKER_BNODE) {
                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
                uint32_t crc32 = chksum(buf, file->blocksize);
                crc32 = _endian_encode(crc32);
                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
            }
        }
#endif

        r = file->ops->pwrite(file->fd, buf, len, pos);
        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
        if ((uint64_t)r != len) {
            return FDB_RESULT_WRITE_FAIL;
        }
    } // block cache check
    return FDB_RESULT_SUCCESS;
}
1638
1639fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1640                   err_log_callback *log_callback)
1641{
1642    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1643                                log_callback);
1644}
1645
1646fdb_status filemgr_commit(struct filemgr *file,
1647                          err_log_callback *log_callback)
1648{
1649    uint16_t header_len = file->header.size;
1650    uint16_t _header_len;
1651    bid_t _prev_bid;
1652    fdb_seqnum_t _seqnum;
1653    filemgr_header_revnum_t _revnum;
1654    int result = FDB_RESULT_SUCCESS;
1655    filemgr_magic_t magic = FILEMGR_MAGIC;
1656    filemgr_magic_t _magic;
1657
1658    if (global_config.ncacheblock > 0) {
1659        result = bcache_flush(file);
1660        if (result != FDB_RESULT_SUCCESS) {
1661            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1662                           "FLUSH", file->filename);
1663            return (fdb_status)result;
1664        }
1665    }
1666
1667    spin_lock(&file->lock);
1668
1669    if (file->header.size > 0 && file->header.data) {
1670        void *buf = _filemgr_get_temp_buf();
1671        uint8_t marker[BLK_MARKER_SIZE];
1672
1673        // [header data]:        'header_len' bytes   <---+
1674        // [header revnum]:      8 bytes                  |
1675        // [default KVS seqnum]: 8 bytes                  |
1676        // ...                                            |
1677        // (empty)                                    blocksize
1678        // ...                                            |
1679        // [prev header bid]:    8 bytes                  |
1680        // [header length]:      2 bytes                  |
1681        // [magic number]:       8 bytes                  |
1682        // [block marker]:       1 byte               <---+
1683
1684        // header data
1685        memcpy(buf, file->header.data, header_len);
1686        // header rev number
1687        _revnum = _endian_encode(file->header.revnum);
1688        memcpy((uint8_t *)buf + header_len, &_revnum,
1689               sizeof(filemgr_header_revnum_t));
1690        // file's sequence number (default KVS seqnum)
1691        _seqnum = _endian_encode(file->header.seqnum);
1692        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
1693               &_seqnum, sizeof(fdb_seqnum_t));
1694
1695        // prev header bid
1696        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
1697        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1698               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
1699               &_prev_bid, sizeof(_prev_bid));
1700        // header length
1701        _header_len = _endian_encode(header_len);
1702        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1703               - sizeof(header_len) - BLK_MARKER_SIZE),
1704               &_header_len, sizeof(header_len));
1705        // magic number
1706        _magic = _endian_encode(magic);
1707        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1708               - BLK_MARKER_SIZE), &_magic, sizeof(magic));
1709
1710        // marker
1711        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
1712        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1713               marker, BLK_MARKER_SIZE);
1714
1715        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
1716                                       atomic_get_uint64_t(&file->pos));
1717        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1718                       "WRITE", file->filename);
1719        if (rv != file->blocksize) {
1720            _filemgr_release_temp_buf(buf);
1721            spin_unlock(&file->lock);
1722            return FDB_RESULT_WRITE_FAIL;
1723        }
1724        atomic_store_uint64_t(&file->header.bid,
1725                              atomic_get_uint64_t(&file->pos) / file->blocksize);
1726        atomic_add_uint64_t(&file->pos, file->blocksize);
1727
1728        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
1729        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
1730
1731        _filemgr_release_temp_buf(buf);
1732    }
1733    // race condition?
1734    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
1735
1736    spin_unlock(&file->lock);
1737
1738    if (file->fflags & FILEMGR_SYNC) {
1739        result = file->ops->fsync(file->fd);
1740        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
1741    }
1742    return (fdb_status) result;
1743}
1744
1745fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1746{
1747    fdb_status result = FDB_RESULT_SUCCESS;
1748    if (global_config.ncacheblock > 0) {
1749        result = bcache_flush(file);
1750        if (result != FDB_RESULT_SUCCESS) {
1751            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1752                           "FLUSH", file->filename);
1753            return result;
1754        }
1755    }
1756
1757    if (file->fflags & FILEMGR_SYNC) {
1758        int rv = file->ops->fsync(file->fd);
1759        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1760        return (fdb_status) rv;
1761    }
1762    return result;
1763}
1764
1765int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1766                                char *old_filename)
1767{
1768    int ret = 1;
1769    spin_lock(&file->lock);
1770    atomic_store_uint8_t(&file->status, status);
1771    if (old_filename) {
1772        if (!file->old_filename) {
1773            file->old_filename = old_filename;
1774        } else {
1775            ret = 0;
1776            fdb_assert(file->ref_count, file->ref_count, 0);
1777            free(old_filename);
1778        }
1779    }
1780    spin_unlock(&file->lock);
1781    return ret;
1782}
1783
1784void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1785                                  file_status_t status)
1786{
1787    spin_lock(&old_file->lock);
1788    old_file->new_file = new_file;
1789    atomic_store_uint8_t(&old_file->status, status);
1790    spin_unlock(&old_file->lock);
1791}
1792
1793// Check if there is a file that still points to the old_file that is being
1794// compacted away. If so open the file and return its pointer.
1795static
1796void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1797    struct filemgr *cur_file = (struct filemgr *)ctx;
1798    struct filemgr *file = _get_entry(h, struct filemgr, e);
1799    spin_lock(&file->lock);
1800    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1801        file->new_file == cur_file) {
1802        // Incrementing reference counter below is the same as filemgr_open()
1803        // We need to do this to ensure that the pointer returned does not
1804        // get freed outside the filemgr_open lock
1805        file->ref_count++;
1806        spin_unlock(&file->lock);
1807        return (void *)file;
1808    }
1809    spin_unlock(&file->lock);
1810    return (void *)NULL;
1811}
1812
1813struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1814    struct filemgr *very_old_file;
1815    spin_lock(&filemgr_openlock);
1816    very_old_file = (struct filemgr *)hash_scan(&hash,
1817                                         _filemgr_check_stale_link, cur_file);
1818    spin_unlock(&filemgr_openlock);
1819    return very_old_file;
1820}
1821
1822char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1823                                     struct filemgr *new_file,
1824                                     filemgr_redirect_hdr_func
1825                                     redirect_header_func) {
1826    size_t old_header_len, new_header_len;
1827    uint16_t new_filename_len;
1828    char *past_filename;
1829    spin_lock(&very_old_file->lock);
1830    fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1831    fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1832    old_header_len = very_old_file->header.size;
1833    new_filename_len = strlen(new_file->filename);
1834    // Find out the new DB header length with new_file's filename
1835    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1836        + new_filename_len;
1837    // As we are going to change the new_filename field in the DB header of the
1838    // very_old_file, maybe reallocate DB header buf to accomodate bigger value
1839    if (new_header_len > old_header_len) {
1840        very_old_file->header.data = realloc(very_old_file->header.data,
1841                new_header_len);
1842    }
1843    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1844    past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1845            new_file->filename, new_filename_len + 1);//Update in-memory header
1846    very_old_file->header.size = new_header_len;
1847    ++(very_old_file->header.revnum);
1848
1849    spin_unlock(&very_old_file->lock);
1850    return past_filename;
1851}
1852
1853void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1854{
1855    fdb_assert(new_file, new_file, old_file);
1856
1857    spin_lock(&old_file->lock);
1858    if (old_file->ref_count > 0) {
1859        // delay removing
1860        old_file->new_file = new_file;
1861        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1862        spin_unlock(&old_file->lock);
1863    } else {
1864        // immediatly remove
1865        // LCOV_EXCL_START
1866        spin_unlock(&old_file->lock);
1867        remove(old_file->filename);
1868        filemgr_remove_file(old_file);
1869        // LCOV_EXCL_STOP
1870    }
1871}
1872
1873// migrate default kv store stats over to new_file
1874struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1875                                              struct filemgr *new_file,
1876                                              struct kvs_info *kvs)
1877{
1878    kvs_ops_stat *ret = NULL;
1879    fdb_assert(new_file, new_file, old_file);
1880
1881    spin_lock(&old_file->lock);
1882    new_file->header.op_stat = old_file->header.op_stat;
1883    ret = &new_file->header.op_stat;
1884    spin_unlock(&old_file->lock);
1885    return ret;
1886}
1887
1888// Note: filemgr_openlock should be held before calling this function.
// Destroy 'filename': close and unregister it if it is cached in the
// global file hash, delete it from disk, and recursively destroy older
// files reachable through the old-filename chain left behind by
// compaction. 'destroy_file_set' carries the set of filenames already
// visited by outer recursive calls (pass NULL at the top level); it
// breaks cycles in the chain.
// Returns FDB_RESULT_SUCCESS, FDB_RESULT_FILE_IS_BUSY if any file in the
// chain is still referenced, or the first I/O error encountered.
fdb_status filemgr_destroy_file(char *filename,
                                struct filemgr_config *config,
                                struct hash *destroy_file_set)
{
    struct filemgr *file = NULL;
    struct hash to_destroy_files;
    // Top-level call uses the local set; recursive calls reuse the caller's.
    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
                                                  &to_destroy_files);
    struct filemgr query;
    struct hash_elem *e = NULL;
    fdb_status status = FDB_RESULT_SUCCESS;
    char *old_filename = NULL;

    if (!destroy_file_set) { // top level or non-recursive call
        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
    }

    query.filename = filename;
    // check whether file is already being destroyed in parent recursive call
    e = hash_find(destroy_set, &query.e);
    if (e) { // Duplicate filename found, nothing to be done in this call
        if (!destroy_file_set) { // top level or non-recursive call
            hash_free(destroy_set);
        }
        return status;
    } else {
        // Remember file. Stack value ok IFF single direction recursion
        hash_insert(destroy_set, &query.e);
    }

    // check global list of known files to see if it is already opened or not
    e = hash_find(&hash, &query.e);
    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        if (file->ref_count) {
            // Some handle still holds this file open; refuse to destroy.
            spin_unlock(&file->lock);
            status = FDB_RESULT_FILE_IS_BUSY;
            if (!destroy_file_set) { // top level or non-recursive call
                hash_free(destroy_set);
            }
            return status;
        }
        spin_unlock(&file->lock);
        if (file->old_filename) {
            // Destroy the compaction ancestor first.
            status = filemgr_destroy_file(file->old_filename, config,
                                          destroy_set);
            if (status != FDB_RESULT_SUCCESS) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return status;
            }
        }

        // Cleanup file from in-memory as well as on-disk
        e = hash_remove(&hash, &file->e);
        fdb_assert(e, e, 0);
        _filemgr_free_func(&file->e);
        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
            if (remove(filename)) {
                status = FDB_RESULT_FILE_REMOVE_FAIL;
            }
        }
    } else { // file not in memory, read on-disk to destroy older versions..
        // Use a throwaway stack-allocated filemgr just to read the header.
        file = (struct filemgr *)alca(struct filemgr, 1);
        file->filename = filename;
        file->ops = get_filemgr_ops();
        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
        file->blocksize = global_config.blocksize;
        if (file->fd < 0) {
            // NOTE(review): fd is compared against an fdb error code here;
            // a nonexistent file is treated as already destroyed and falls
            // through to the cleanup below.
            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_OPEN_FAIL;
            }
        } else { // file successfully opened, seek to end to get DB header
            cs_off_t offset = file->ops->goto_eof(file->fd);
            if (offset == FDB_RESULT_SEEK_FAIL) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_SEEK_FAIL;
            } else { // Need to read DB header which contains old filename
                atomic_store_uint64_t(&file->pos, offset);
                status = _filemgr_read_header(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    return status;
                }
                if (file->header.data) {
                    // Header layout: filename lengths at fixed offsets
                    // 64/66, and the old filename follows the new one at
                    // offset 68. NOTE(review): these offsets mirror the
                    // header serialization format defined elsewhere —
                    // verify against that code before changing.
                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 64);
                    uint16_t new_filename_len =
                                      _endian_decode(*new_filename_len_ptr);
                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 66);
                    uint16_t old_filename_len =
                                      _endian_decode(*old_filename_len_ptr);
                    old_filename = (char *)file->header.data + 68
                                   + new_filename_len;
                    if (old_filename_len) {
                        status = filemgr_destroy_file(old_filename, config,
                                                      destroy_set);
                    }
                    free(file->header.data);
                }
                file->ops->close(file->fd);
                if (status == FDB_RESULT_SUCCESS) {
                    if (filemgr_does_file_exist(filename)
                                               == FDB_RESULT_SUCCESS) {
                        if (remove(filename)) {
                            status = FDB_RESULT_FILE_REMOVE_FAIL;
                        }
                    }
                }
            }
        }
    }

    if (!destroy_file_set) { // top level or non-recursive call
        hash_free(destroy_set);
    }

    return status;
}
2021
2022bool filemgr_is_rollback_on(struct filemgr *file)
2023{
2024    bool rv;
2025    spin_lock(&file->lock);
2026    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2027    spin_unlock(&file->lock);
2028    return rv;
2029}
2030
2031void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2032{
2033    spin_lock(&file->lock);
2034    if (new_val) {
2035        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2036    } else {
2037        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2038    }
2039    spin_unlock(&file->lock);
2040}
2041
// Record, under the file spinlock, whether this file is part of an
// in-place compaction.
void filemgr_set_in_place_compaction(struct filemgr *file,
                                     bool in_place_compaction) {
    spin_lock(&file->lock);
    file->in_place_compaction = in_place_compaction;
    spin_unlock(&file->lock);
}
2048
2049bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2050{
2051    bool ret = false;
2052    spin_lock(&file->lock);
2053    ret = file->in_place_compaction;
2054    spin_unlock(&file->lock);
2055    return ret;
2056}
2057
// Initialize the global filemgr state if needed (filemgr_init is
// idempotent) and take the global open lock that serializes
// open/close/destroy operations. Pair with filemgr_mutex_openunlock().
void filemgr_mutex_openlock(struct filemgr_config *config)
{
    filemgr_init(config);

    spin_lock(&filemgr_openlock);
}
2064
// Release the global open lock taken by filemgr_mutex_openlock().
void filemgr_mutex_openunlock(void)
{
    spin_unlock(&filemgr_openlock);
}
2069
// Acquire the per-file writer mutex and record that it is held, so
// filemgr_mutex_unlock() knows whether there is anything to release.
void filemgr_mutex_lock(struct filemgr *file)
{
    mutex_lock(&file->writer_lock.mutex);
    file->writer_lock.locked = true;
}
2075
2076bool filemgr_mutex_trylock(struct filemgr *file) {
2077    if (mutex_trylock(&file->writer_lock.mutex)) {
2078        file->writer_lock.locked = true;
2079        return true;
2080    }
2081    return false;
2082}
2083
2084void filemgr_mutex_unlock(struct filemgr *file)
2085{
2086    if (file->writer_lock.locked) {
2087        file->writer_lock.locked = false;
2088        mutex_unlock(&file->writer_lock.mutex);
2089    }
2090}
2091
// Publish the dirty (uncommitted) root block ids for the id-tree and the
// sequence tree. Stored atomically; no file spinlock is taken.
void filemgr_set_dirty_root(struct filemgr *file,
                            bid_t dirty_idtree_root,
                            bid_t dirty_seqtree_root)
{
    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
}
2099
2100bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2101{
2102    uint8_t marker[BLK_MARKER_SIZE];
2103    filemgr_magic_t magic;
2104    marker[0] = *(((uint8_t *)head_buffer)
2105                 + blocksize - BLK_MARKER_SIZE);
2106    if (marker[0] != BLK_MARKER_DBHEADER) {
2107        return false;
2108    }
2109
2110    memcpy(&magic, (uint8_t *) head_buffer
2111            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2112    magic = _endian_decode(magic);
2113
2114    return (magic == FILEMGR_MAGIC);
2115}
2116
2117void _kvs_stat_set(struct filemgr *file,
2118                   fdb_kvs_id_t kv_id,
2119                   struct kvs_stat stat)
2120{
2121    if (kv_id == 0) {
2122        spin_lock(&file->lock);
2123        file->header.stat = stat;
2124        spin_unlock(&file->lock);
2125    } else {
2126        struct avl_node *a;
2127        struct kvs_node query, *node;
2128        struct kvs_header *kv_header = file->kv_header;
2129
2130        spin_lock(&kv_header->lock);
2131        query.id = kv_id;
2132        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2133        if (a) {
2134            node = _get_entry(a, struct kvs_node, avl_id);
2135            node->stat = stat;
2136        }
2137        spin_unlock(&kv_header->lock);
2138    }
2139}
2140
2141void _kvs_stat_update_attr(struct filemgr *file,
2142                           fdb_kvs_id_t kv_id,
2143                           kvs_stat_attr_t attr,
2144                           int delta)
2145{
2146    spin_t *lock = NULL;
2147    struct kvs_stat *stat;
2148
2149    if (kv_id == 0) {
2150        stat = &file->header.stat;
2151        lock = &file->lock;
2152        spin_lock(lock);
2153    } else {
2154        struct avl_node *a;
2155        struct kvs_node query, *node;
2156        struct kvs_header *kv_header = file->kv_header;
2157
2158        lock = &kv_header->lock;
2159        spin_lock(lock);
2160        query.id = kv_id;
2161        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2162        if (!a) {
2163            // KV instance corresponding to the kv_id is already removed
2164            spin_unlock(lock);
2165            return;
2166        }
2167        node = _get_entry(a, struct kvs_node, avl_id);
2168        stat = &node->stat;
2169    }
2170
2171    if (attr == KVS_STAT_DATASIZE) {
2172        stat->datasize += delta;
2173    } else if (attr == KVS_STAT_NDOCS) {
2174        stat->ndocs += delta;
2175    } else if (attr == KVS_STAT_NLIVENODES) {
2176        stat->nlivenodes += delta;
2177    } else if (attr == KVS_STAT_WAL_NDELETES) {
2178        stat->wal_ndeletes += delta;
2179    } else if (attr == KVS_STAT_WAL_NDOCS) {
2180        stat->wal_ndocs += delta;
2181    }
2182    spin_unlock(lock);
2183}
2184
2185int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2186                            fdb_kvs_id_t kv_id,
2187                            struct kvs_stat *stat)
2188{
2189    int ret = 0;
2190    struct avl_node *a;
2191    struct kvs_node query, *node;
2192
2193    query.id = kv_id;
2194    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2195    if (a) {
2196        node = _get_entry(a, struct kvs_node, avl_id);
2197        *stat = node->stat;
2198    } else {
2199        ret = -1;
2200    }
2201    return ret;
2202}
2203
2204int _kvs_stat_get(struct filemgr *file,
2205                  fdb_kvs_id_t kv_id,
2206                  struct kvs_stat *stat)
2207{
2208    int ret = 0;
2209
2210    if (kv_id == 0) {
2211        spin_lock(&file->lock);
2212        *stat = file->header.stat;
2213        spin_unlock(&file->lock);
2214    } else {
2215        struct kvs_header *kv_header = file->kv_header;
2216
2217        spin_lock(&kv_header->lock);
2218        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2219        spin_unlock(&kv_header->lock);
2220    }
2221
2222    return ret;
2223}
2224
2225uint64_t _kvs_stat_get_sum(struct filemgr *file,
2226                           kvs_stat_attr_t attr)
2227{
2228    struct avl_node *a;
2229    struct kvs_node *node;
2230    struct kvs_header *kv_header = file->kv_header;
2231
2232    uint64_t ret = 0;
2233    spin_lock(&file->lock);
2234    if (attr == KVS_STAT_DATASIZE) {
2235        ret += file->header.stat.datasize;
2236    } else if (attr == KVS_STAT_NDOCS) {
2237        ret += file->header.stat.ndocs;
2238    } else if (attr == KVS_STAT_NLIVENODES) {
2239        ret += file->header.stat.nlivenodes;
2240    } else if (attr == KVS_STAT_WAL_NDELETES) {
2241        ret += file->header.stat.wal_ndeletes;
2242    } else if (attr == KVS_STAT_WAL_NDOCS) {
2243        ret += file->header.stat.wal_ndocs;
2244    }
2245    spin_unlock(&file->lock);
2246
2247    if (kv_header) {
2248        spin_lock(&kv_header->lock);
2249        a = avl_first(kv_header->idx_id);
2250        while (a) {
2251            node = _get_entry(a, struct kvs_node, avl_id);
2252            a = avl_next(&node->avl_id);
2253
2254            if (attr == KVS_STAT_DATASIZE) {
2255                ret += node->stat.datasize;
2256            } else if (attr == KVS_STAT_NDOCS) {
2257                ret += node->stat.ndocs;
2258            } else if (attr == KVS_STAT_NLIVENODES) {
2259                ret += node->stat.nlivenodes;
2260            } else if (attr == KVS_STAT_WAL_NDELETES) {
2261                ret += node->stat.wal_ndeletes;
2262            } else if (attr == KVS_STAT_WAL_NDOCS) {
2263                ret += node->stat.wal_ndocs;
2264            }
2265        }
2266        spin_unlock(&kv_header->lock);
2267    }
2268
2269    return ret;
2270}
2271
2272int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2273                                fdb_kvs_id_t kv_id,
2274                                struct kvs_ops_stat *stat)
2275{
2276    int ret = 0;
2277    struct avl_node *a;
2278    struct kvs_node query, *node;
2279
2280    query.id = kv_id;
2281    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2282    if (a) {
2283        node = _get_entry(a, struct kvs_node, avl_id);
2284        *stat = node->op_stat;
2285    } else {
2286        ret = -1;
2287    }
2288    return ret;
2289}
2290
2291int _kvs_ops_stat_get(struct filemgr *file,
2292                      fdb_kvs_id_t kv_id,
2293                      struct kvs_ops_stat *stat)
2294{
2295    int ret = 0;
2296
2297    if (kv_id == 0) {
2298        spin_lock(&file->lock);
2299        *stat = file->header.op_stat;
2300        spin_unlock(&file->lock);
2301    } else {
2302        struct kvs_header *kv_header = file->kv_header;
2303
2304        spin_lock(&kv_header->lock);
2305        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2306        spin_unlock(&kv_header->lock);
2307    }
2308
2309    return ret;
2310}
2311
// Zero-initialize every per-KV-store operational counter.
void _init_op_stats(struct kvs_ops_stat *stat) {
    atomic_init_uint64_t(&stat->num_sets, 0);
    atomic_init_uint64_t(&stat->num_dels, 0);
    atomic_init_uint64_t(&stat->num_commits, 0);
    atomic_init_uint64_t(&stat->num_compacts, 0);
    atomic_init_uint64_t(&stat->num_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
}
2321
2322struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2323                                           struct kvs_info *kvs)
2324{
2325    struct kvs_ops_stat *stat = NULL;
2326    if (!kvs || (kvs && kvs->id == 0)) {
2327        return &file->header.op_stat;
2328    } else {
2329        struct kvs_header *kv_header = file->kv_header;
2330        struct avl_node *a;
2331        struct kvs_node query, *node;
2332        spin_lock(&kv_header->lock);
2333        query.id = kvs->id;
2334        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2335        if (a) {
2336            node = _get_entry(a, struct kvs_node, avl_id);
2337            stat = &node->op_stat;
2338        }
2339        spin_unlock(&kv_header->lock);
2340    }
2341    return stat;
2342}
2343
2344void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2345{
2346    size_t size_id = sizeof(fdb_kvs_id_t);
2347    fdb_kvs_id_t temp;
2348
2349    if (chunksize == size_id) {
2350        temp = *((fdb_kvs_id_t*)buf);
2351    } else if (chunksize < size_id) {
2352        temp = 0;
2353        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2354    } else { // chunksize > sizeof(fdb_kvs_id_t)
2355        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2356    }
2357    *id = _endian_decode(temp);
2358}
2359
2360void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2361{
2362    size_t size_id = sizeof(fdb_kvs_id_t);
2363    id = _endian_encode(id);
2364
2365    if (chunksize == size_id) {
2366        memcpy(buf, &id, size_id);
2367    } else if (chunksize < size_id) {
2368        memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2369    } else { // chunksize > sizeof(fdb_kvs_id_t)
2370        memset(buf, 0x0, chunksize - size_id);
2371        memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2372    }
2373}
2374
// Copy a chunk between buffers of possibly different widths.
// Narrowing keeps the low-order (trailing) bytes of the source;
// widening zero-fills the leading bytes of the destination.
void buf2buf(size_t chunksize_src, void *buf_src,
             size_t chunksize_dst, void *buf_dst)
{
    if (chunksize_dst < chunksize_src) {
        memcpy(buf_dst,
               (uint8_t *)buf_src + (chunksize_src - chunksize_dst),
               chunksize_dst);
    } else if (chunksize_dst > chunksize_src) {
        memset(buf_dst, 0x0, chunksize_dst - chunksize_src);
        memcpy((uint8_t *)buf_dst + (chunksize_dst - chunksize_src),
               buf_src, chunksize_src);
    } else {
        memcpy(buf_dst, buf_src, chunksize_src);
    }
}
2389