xref: /4.0.0/forestdb/src/filemgr.cc (revision 5f7821ba)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36
37#include "memleak.h"
38
39#ifdef __DEBUG
40#ifndef __DEBUG_FILEMGR
41    #undef DBG
42    #undef DBGCMD
43    #undef DBGSW
44    #define DBG(...)
45    #define DBGCMD(...)
46    #define DBGSW(n, ...)
47#endif
48#endif
49
50// NBUCKET must be a power of 2
51#define NBUCKET (1024)
52#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))
53
54// global static variables
55#ifdef SPIN_INITIALIZER
56static spin_t initial_lock = SPIN_INITIALIZER;
57#else
58static volatile unsigned int initial_lock_status = 0;
59static spin_t initial_lock;
60#endif
61
62
63static volatile uint8_t filemgr_initialized = 0;
64static struct filemgr_config global_config;
65static struct hash hash;
66static spin_t filemgr_openlock;
67
68struct temp_buf_item{
69    void *addr;
70    struct list_elem le;
71};
72static struct list temp_buf;
73static spin_t temp_buf_lock;
74
75static bool lazy_file_deletion_enabled = false;
76static register_file_removal_func register_file_removal = NULL;
77static check_file_removal_func is_file_removed = NULL;
78
79static void spin_init_wrap(void *lock) {
80    spin_init((spin_t*)lock);
81}
82
83static void spin_destroy_wrap(void *lock) {
84    spin_destroy((spin_t*)lock);
85}
86
87static void spin_lock_wrap(void *lock) {
88    spin_lock((spin_t*)lock);
89}
90
91static void spin_unlock_wrap(void *lock) {
92    spin_unlock((spin_t*)lock);
93}
94
95static void mutex_init_wrap(void *lock) {
96    mutex_init((mutex_t*)lock);
97}
98
99static void mutex_destroy_wrap(void *lock) {
100    mutex_destroy((mutex_t*)lock);
101}
102
103static void mutex_lock_wrap(void *lock) {
104    mutex_lock((mutex_t*)lock);
105}
106
107static void mutex_unlock_wrap(void *lock) {
108    mutex_unlock((mutex_t*)lock);
109}
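// Note (added): the spin/mutex wrappers above adapt the concrete lock types
// to the generic void* callbacks expected by the partial-range lock (plock)
// configuration built in filemgr_open() when __FILEMGR_DATA_PARTIAL_LOCK is
// defined; they carry no logic of their own beyond the casts.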
110
111static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
112{
113    struct kvs_node *aa, *bb;
114    aa = _get_entry(a, struct kvs_node, avl_id);
115    bb = _get_entry(b, struct kvs_node, avl_id);
116
117    if (aa->id < bb->id) {
118        return -1;
119    } else if (aa->id > bb->id) {
120        return 1;
121    } else {
122        return 0;
123    }
124}
125
126static int _block_is_overlapped(void *pbid1, void *pis_writer1,
127                                void *pbid2, void *pis_writer2,
128                                void *aux)
129{
130    (void)aux;
131    bid_t bid1, is_writer1, bid2, is_writer2;
132    bid1 = *(bid_t*)pbid1;
133    is_writer1 = *(bid_t*)pis_writer1;
134    bid2 = *(bid_t*)pbid2;
135    is_writer2 = *(bid_t*)pis_writer2;
136
137    if (bid1 != bid2) {
138        // not overlapped
139        return 0;
140    } else {
141        // overlapped
142        if (!is_writer1 && !is_writer2) {
143            // both are readers
144            return 0;
145        } else {
146            return 1;
147        }
148    }
149}
150
151fdb_status fdb_log(err_log_callback *log_callback,
152                   fdb_status status,
153                   const char *format, ...)
154{
155    if (log_callback && log_callback->callback) {
156        char msg[1024];
157        va_list args;
158        va_start(args, format);
159        vsnprintf(msg, sizeof(msg), format, args);
160        va_end(args);
161        log_callback->callback(status, msg, log_callback->ctx_data);
162    }
163    return status;
164}
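/*
 * Example (illustrative sketch, not part of the original source): forwarding
 * file manager errors to a user-supplied callback. The callback signature is
 * assumed to match the (error code, message, ctx) call made above; 'my_log_cb'
 * is a hypothetical name.
 *
 *     static void my_log_cb(int err_code, const char *err_msg, void *ctx) {
 *         (void)ctx;
 *         fprintf(stderr, "[forestdb] error %d: %s\n", err_code, err_msg);
 *     }
 *
 *     err_log_callback cb;
 *     cb.callback = my_log_cb;
 *     cb.ctx_data = NULL;
 *     fdb_log(&cb, FDB_RESULT_READ_FAIL, "read failed on '%s'", filename);
 */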
165
166static void _log_errno_str(struct filemgr_ops *ops,
167                           err_log_callback *log_callback,
168                           fdb_status io_error,
169                           const char *what,
170                           const char *filename)
171{
172    if (io_error < 0) {
173        char errno_msg[512];
174        ops->get_errno_str(errno_msg, 512);
175        fdb_log(log_callback, io_error,
176                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
177    }
178}
179
180static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
181{
182    struct filemgr *file = _get_entry(e, struct filemgr, e);
183    int len = strlen(file->filename);
184    return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
185}
186
187static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
188{
189    struct filemgr *aa, *bb;
190    aa = _get_entry(a, struct filemgr, e);
191    bb = _get_entry(b, struct filemgr, e);
192    return strcmp(aa->filename, bb->filename);
193}
194
195void filemgr_init(struct filemgr_config *config)
196{
197    // global initialization
198    // initialized only once, at the first call
199    if (!filemgr_initialized) {
200#ifndef SPIN_INITIALIZER
201        // Note that only Windows passes through this routine
202        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
203            // atomically initialize spin lock only once
204            spin_init(&initial_lock);
205            initial_lock_status = 2;
206        } else {
207            // other threads wait until the initialization of 'initial_lock' is done
208            while (initial_lock_status != 2) {
209                Sleep(1);
210            }
211        }
212#endif
213
214        spin_lock(&initial_lock);
215        if (!filemgr_initialized) {
216            global_config = *config;
217
218            if (global_config.ncacheblock > 0)
219                bcache_init(global_config.ncacheblock, global_config.blocksize);
220
221            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
222
223            // initialize temp buffer
224            list_init(&temp_buf);
225            spin_init(&temp_buf_lock);
226
227            // initialize global lock
228            spin_init(&filemgr_openlock);
229
230            // set the initialized flag
231            filemgr_initialized = 1;
232        }
233        spin_unlock(&initial_lock);
234    }
235}
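/*
 * Example (illustrative sketch, values are arbitrary): minimal global
 * initialization. Only configuration fields referenced in this file are shown.
 *
 *     struct filemgr_config cfg;
 *     memset(&cfg, 0, sizeof(cfg));
 *     cfg.blocksize = 4096;          // block size in bytes
 *     cfg.ncacheblock = 1024;        // buffer cache blocks (0 disables the cache)
 *     cfg.options = FILEMGR_CREATE;  // create the file on open if missing
 *     cfg.flag = 0;                  // extra flags passed through to ops->open()
 *     cfg.prefetch_duration = 0;     // 0: no prefetch thread on open
 *     filemgr_init(&cfg);
 */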
236
237void filemgr_set_lazy_file_deletion(bool enable,
238                                    register_file_removal_func regis_func,
239                                    check_file_removal_func check_func)
240{
241    lazy_file_deletion_enabled = enable;
242    register_file_removal = regis_func;
243    is_file_removed = check_func;
244}
245
246static void * _filemgr_get_temp_buf()
247{
248    struct list_elem *e;
249    struct temp_buf_item *item;
250
251    spin_lock(&temp_buf_lock);
252    e = list_pop_front(&temp_buf);
253    if (e) {
254        item = _get_entry(e, struct temp_buf_item, le);
255    } else {
256        void *addr;
257
258        malloc_align(addr, FDB_SECTOR_SIZE,
259                     global_config.blocksize + sizeof(struct temp_buf_item));
260
261        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
262        item->addr = addr;
263    }
264    spin_unlock(&temp_buf_lock);
265
266    return item->addr;
267}
268
269static void _filemgr_release_temp_buf(void *buf)
270{
271    struct temp_buf_item *item;
272
273    spin_lock(&temp_buf_lock);
274    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
275    list_push_front(&temp_buf, &item->le);
276    spin_unlock(&temp_buf_lock);
277}
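// Note (added): each pooled temp buffer is one aligned allocation of
// (blocksize + sizeof(struct temp_buf_item)) bytes. The caller gets the first
// 'blocksize' bytes; the list bookkeeping lives in the trailing temp_buf_item,
// which _filemgr_release_temp_buf() recovers by adding 'blocksize' to the
// returned pointer:
//
//     [ blocksize bytes returned to caller ][ struct temp_buf_item ]
//     ^ item->addr                           ^ buf + blocksize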
278
279static void _filemgr_shutdown_temp_buf()
280{
281    struct list_elem *e;
282    struct temp_buf_item *item;
283    size_t count=0;
284
285    spin_lock(&temp_buf_lock);
286    e = list_begin(&temp_buf);
287    while(e){
288        item = _get_entry(e, struct temp_buf_item, le);
289        e = list_remove(&temp_buf, e);
290        free_align(item->addr);
291        count++;
292    }
293    spin_unlock(&temp_buf_lock);
294}
295
296static fdb_status _filemgr_read_header(struct filemgr *file,
297                                       err_log_callback *log_callback)
298{
299    uint8_t marker[BLK_MARKER_SIZE];
300    filemgr_magic_t magic;
301    filemgr_header_len_t len;
302    uint8_t *buf;
303    uint32_t crc, crc_file;
304    fdb_status status = FDB_RESULT_SUCCESS;
305
306    // get temp buffer
307    buf = (uint8_t *) _filemgr_get_temp_buf();
308
309    if (atomic_get_uint64_t(&file->pos) > 0) {
310        // Crash Recovery Test 1: unaligned last block write
311        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
312        if (remain) {
313            atomic_sub_uint64_t(&file->pos, remain);
314            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
315            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
316                "from a database file '%s'\n";
317            DBG(msg, remain, file->filename);
318            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
319                    msg, remain, file->filename);
320        }
321
322        size_t block_counter = 0;
323        do {
324            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
325                atomic_get_uint64_t(&file->pos) - file->blocksize);
326            if (rv != file->blocksize) {
327                status = FDB_RESULT_READ_FAIL;
328                const char *msg = "Unable to read a database file '%s' with "
329                    "blocksize %" _F64 "\n";
330                DBG(msg, file->filename, file->blocksize);
331                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
332                break;
333            }
334            ++block_counter;
335            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
336                   BLK_MARKER_SIZE);
337
338            if (marker[0] == BLK_MARKER_DBHEADER) {
339                // possible need for byte conversions here
340                memcpy(&magic,
341                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
342                       sizeof(magic));
343                magic = _endian_decode(magic);
344
345                if (magic == FILEMGR_MAGIC) {
346                    memcpy(&len,
347                           buf + file->blocksize - BLK_MARKER_SIZE -
348                           sizeof(magic) - sizeof(len),
349                           sizeof(len));
350                    len = _endian_decode(len);
351
352                    crc = chksum(buf, len - sizeof(crc));
353                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
354                    crc_file = _endian_decode(crc_file);
355                    if (crc == crc_file) {
356                        file->header.data = (void *)malloc(len);
357
358                        memcpy(file->header.data, buf, len);
359                        memcpy(&file->header.revnum, buf + len,
360                               sizeof(filemgr_header_revnum_t));
361                        memcpy((void *) &file->header.seqnum,
362                                buf + len + sizeof(filemgr_header_revnum_t),
363                                sizeof(fdb_seqnum_t));
364                        file->header.revnum =
365                            _endian_decode(file->header.revnum);
366                        file->header.seqnum =
367                            _endian_decode(file->header.seqnum);
368                        file->header.size = len;
369                        atomic_store_uint64_t(&file->header.bid,
370                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
371                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
372                                              BLK_NOT_FOUND);
373                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
374                                              BLK_NOT_FOUND);
375                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));
376
377                        // release temp buffer
378                        _filemgr_release_temp_buf(buf);
379
380                        return FDB_RESULT_SUCCESS;
381                    } else {
382                        status = FDB_RESULT_CHECKSUM_ERROR;
383                        const char *msg = "Crash Detected: CRC on disk %u != %u "
384                            "in a database file '%s'\n";
385                        DBG(msg, crc_file, crc, file->filename);
386                        fdb_log(log_callback, status, msg, crc_file, crc,
387                                file->filename);
388                    }
389                } else {
390                    status = FDB_RESULT_FILE_CORRUPTION;
391                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
392                        " in a database file '%s'\n";
393                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
394                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
395                            file->filename);
396                }
397            } else {
398                status = FDB_RESULT_NO_DB_HEADERS;
399                if (block_counter == 1) {
400                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
401                                      "in a database file '%s'\n";
402                    DBG(msg, marker[0], file->filename);
403                    fdb_log(log_callback, status, msg, marker[0], file->filename);
404                }
405            }
406
407            atomic_sub_uint64_t(&file->pos, file->blocksize);
408            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
409        } while (atomic_get_uint64_t(&file->pos));
410    }
411
412    // release temp buffer
413    _filemgr_release_temp_buf(buf);
414
415    file->header.size = 0;
416    file->header.revnum = 0;
417    file->header.seqnum = 0;
418    file->header.data = NULL;
419    atomic_store_uint64_t(&file->header.bid, 0);
420    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
421    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
422    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
423    return status;
424}
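// Note (added): layout of a DB header block as decoded above (all multi-byte
// fields are stored endian-encoded):
//
//     [ header body, 'len' bytes; its last sizeof(crc) bytes hold the CRC
//       computed over the preceding bytes ]
//     [ revnum ][ seqnum of the default KVS ]
//     ... unused ...
//     [ prev header bid ][ len ][ magic ][ block marker ]   <- end of block
//
// The prev-header bid field is consumed by filemgr_fetch_prev_header() below.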
425
426size_t filemgr_get_ref_count(struct filemgr *file)
427{
428    size_t ret = 0;
429    spin_lock(&file->lock);
430    ret = file->ref_count;
431    spin_unlock(&file->lock);
432    return ret;
433}
434
435uint64_t filemgr_get_bcache_used_space(void)
436{
437    uint64_t bcache_free_space = 0;
438    if (global_config.ncacheblock) { // If buffer cache is indeed configured
439        bcache_free_space = bcache_get_num_free_blocks();
440        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
441                          * global_config.blocksize;
442    }
443    return bcache_free_space;
444}
445
446struct filemgr_prefetch_args {
447    struct filemgr *file;
448    uint64_t duration;
449    err_log_callback *log_callback;
450    void *aux;
451};
452
453static void *_filemgr_prefetch_thread(void *voidargs)
454{
455    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
456    uint8_t *buf = alca(uint8_t, args->file->blocksize);
457    uint64_t cur_pos = 0, i;
458    uint64_t bcache_free_space;
459    bid_t bid;
460    bool terminate = false;
461    struct timeval begin, cur, gap;
462
463    spin_lock(&args->file->lock);
464    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
465    spin_unlock(&args->file->lock);
466    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
467        terminate = true;
468    } else {
469        cur_pos -= FILEMGR_PREFETCH_UNIT;
470    }
471    // read backwards from the end of the file, in units of FILEMGR_PREFETCH_UNIT
472    gettimeofday(&begin, NULL);
473    while (!terminate) {
474        for (i = cur_pos;
475             i < cur_pos + FILEMGR_PREFETCH_UNIT;
476             i += args->file->blocksize) {
477
478            gettimeofday(&cur, NULL);
479            gap = _utime_gap(begin, cur);
480            bcache_free_space = bcache_get_num_free_blocks();
481            bcache_free_space *= args->file->blocksize;
482
483            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
484                gap.tv_sec >= (int64_t)args->duration ||
485                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
486                // terminate thread when
487                // 1. got abort signal
488                // 2. time out
489                // 3. not enough free space in block cache
490                terminate = true;
491                break;
492            } else {
493                bid = i / args->file->blocksize;
494                if (filemgr_read(args->file, bid, buf, NULL, true)
495                        != FDB_RESULT_SUCCESS) {
496                    // 4. read failure
497                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
498                            "Prefetch thread failed to read a block with block id %" _F64
499                            " from a database file '%s'", bid, args->file->filename);
500                    terminate = true;
501                    break;
502                }
503            }
504        }
505
506        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
507            cur_pos -= FILEMGR_PREFETCH_UNIT;
508        } else {
509            // remaining space is less than FILEMGR_PREFETCH_UNIT
510            terminate = true;
511        }
512    }
513
514    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
515    free(args);
516    return NULL;
517}
518
519// prefetch the given DB file
520void filemgr_prefetch(struct filemgr *file,
521                      struct filemgr_config *config,
522                      err_log_callback *log_callback)
523{
524    uint64_t bcache_free_space;
525
526    bcache_free_space = bcache_get_num_free_blocks();
527    bcache_free_space *= file->blocksize;
528
529    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
530    spin_lock(&file->lock);
531    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
532        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
533        // invoke prefetch thread
534        struct filemgr_prefetch_args *args;
535        args = (struct filemgr_prefetch_args *)
536               calloc(1, sizeof(struct filemgr_prefetch_args));
537        args->file = file;
538        args->duration = config->prefetch_duration;
539        args->log_callback = log_callback;
540
541        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
542        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
543    }
544    spin_unlock(&file->lock);
545}
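/*
 * Example (illustrative; 'cfg' and 'fname' are hypothetical): prefetching is
 * driven purely by the open-time configuration. filemgr_open() below invokes
 * filemgr_prefetch() whenever config->prefetch_duration > 0, and the prefetch
 * thread compares that value against elapsed seconds.
 *
 *     cfg.prefetch_duration = 10;   // warm the block cache for up to ~10 s
 *     filemgr_open_result r = filemgr_open(fname, get_filemgr_ops(), &cfg, NULL);
 */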
546
547fdb_status filemgr_does_file_exist(char *filename) {
548    struct filemgr_ops *ops = get_filemgr_ops();
549    int fd = ops->open(filename, O_RDONLY, 0444);
550    if (fd < 0) {
551        return (fdb_status) fd;
552    }
553    ops->close(fd);
554    return FDB_RESULT_SUCCESS;
555}
556
557filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
558                                 struct filemgr_config *config,
559                                 err_log_callback *log_callback)
560{
561    struct filemgr *file = NULL;
562    struct filemgr query;
563    struct hash_elem *e = NULL;
564    bool create = config->options & FILEMGR_CREATE;
565    int file_flag = 0x0;
566    int fd = -1;
567    fdb_status status;
568    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
569
570    filemgr_init(config);
571
572    // check whether file is already opened or not
573    query.filename = filename;
574    spin_lock(&filemgr_openlock);
575    e = hash_find(&hash, &query.e);
576
577    if (e) {
578        // already opened (return existing structure)
579        file = _get_entry(e, struct filemgr, e);
580
581        spin_lock(&file->lock);
582        file->ref_count++;
583
584        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
585            file_flag = O_RDWR;
586            if (create) {
587                file_flag |= O_CREAT;
588            }
589            *file->config = *config;
590            file->config->blocksize = global_config.blocksize;
591            file->config->ncacheblock = global_config.ncacheblock;
592            file_flag |= config->flag;
593            file->fd = file->ops->open(file->filename, file_flag, 0666);
594            if (file->fd < 0) {
595                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
596                    // A database file was manually deleted by the user.
597                    // Clean up global hash table, WAL index, and buffer cache.
598                    // Then, retry it with a create option below IFF it is not
599                    // a read-only open attempt
600                    struct hash_elem *ret;
601                    spin_unlock(&file->lock);
602                    ret = hash_remove(&hash, &file->e);
603                    fdb_assert(ret, 0, 0);
604                    filemgr_free_func(&file->e);
605                    if (!create) {
606                        _log_errno_str(ops, log_callback,
607                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
608                        spin_unlock(&filemgr_openlock);
609                        result.rv = FDB_RESULT_NO_SUCH_FILE;
610                        return result;
611                    }
612                } else {
613                    _log_errno_str(file->ops, log_callback,
614                                  (fdb_status)file->fd, "OPEN", filename);
615                    file->ref_count--;
616                    spin_unlock(&file->lock);
617                    spin_unlock(&filemgr_openlock);
618                    result.rv = file->fd;
619                    return result;
620                }
621            } else { // Reopening the closed file succeeded.
622                atomic_store_uint8_t(&file->status, FILE_NORMAL);
623                if (config->options & FILEMGR_SYNC) {
624                    file->fflags |= FILEMGR_SYNC;
625                } else {
626                    file->fflags &= ~FILEMGR_SYNC;
627                }
628                spin_unlock(&file->lock);
629                spin_unlock(&filemgr_openlock);
630                result.file = file;
631                result.rv = FDB_RESULT_SUCCESS;
632                return result;
633            }
634        } else { // file is already opened.
635
636            if (config->options & FILEMGR_SYNC) {
637                file->fflags |= FILEMGR_SYNC;
638            } else {
639                file->fflags &= ~FILEMGR_SYNC;
640            }
641
642            spin_unlock(&file->lock);
643            spin_unlock(&filemgr_openlock);
644            result.file = file;
645            result.rv = FDB_RESULT_SUCCESS;
646            return result;
647        }
648    }
649
650    file_flag = O_RDWR;
651    if (create) {
652        file_flag |= O_CREAT;
653    }
654    file_flag |= config->flag;
655    fd = ops->open(filename, file_flag, 0666);
656    if (fd < 0) {
657        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
658        spin_unlock(&filemgr_openlock);
659        result.rv = fd;
660        return result;
661    }
662    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
663    file->filename_len = strlen(filename);
664    file->filename = (char*)malloc(file->filename_len + 1);
665    strcpy(file->filename, filename);
666
667    file->ref_count = 1;
668
669    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
670    file->wal->flag = 0;
671
672    file->ops = ops;
673    file->blocksize = global_config.blocksize;
674    atomic_init_uint8_t(&file->status, FILE_NORMAL);
675    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
676    *file->config = *config;
677    file->config->blocksize = global_config.blocksize;
678    file->config->ncacheblock = global_config.ncacheblock;
679    file->new_file = NULL;
680    file->old_filename = NULL;
681    file->fd = fd;
682
683    cs_off_t offset = file->ops->goto_eof(file->fd);
684    if (offset == FDB_RESULT_SEEK_FAIL) {
685        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
686        file->ops->close(file->fd);
687        free(file->wal);
688        free(file->filename);
689        free(file->config);
690        free(file);
691        spin_unlock(&filemgr_openlock);
692        result.rv = FDB_RESULT_SEEK_FAIL;
693        return result;
694    }
695    atomic_init_uint64_t(&file->last_commit, offset);
696    atomic_init_uint64_t(&file->pos, offset);
697
698    file->bcache = NULL;
699    file->in_place_compaction = false;
700    file->kv_header = NULL;
701    file->prefetch_status = FILEMGR_PREFETCH_IDLE;
702
703    atomic_init_uint64_t(&file->header.bid, 0);
704    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
705    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
706    _init_op_stats(&file->header.op_stat);
707    status = _filemgr_read_header(file, log_callback);
708    if (status != FDB_RESULT_SUCCESS) {
709        _log_errno_str(file->ops, log_callback, status, "READ", filename);
710        file->ops->close(file->fd);
711        free(file->wal);
712        free(file->filename);
713        free(file->config);
714        free(file);
715        spin_unlock(&filemgr_openlock);
716        result.rv = status;
717        return result;
718    }
719
720    spin_init(&file->lock);
721
722#ifdef __FILEMGR_DATA_PARTIAL_LOCK
723    struct plock_ops pops;
724    struct plock_config pconfig;
725
726    pops.init_user = mutex_init_wrap;
727    pops.lock_user = mutex_lock_wrap;
728    pops.unlock_user = mutex_unlock_wrap;
729    pops.destroy_user = mutex_destroy_wrap;
730    pops.init_internal = spin_init_wrap;
731    pops.lock_internal = spin_lock_wrap;
732    pops.unlock_internal = spin_unlock_wrap;
733    pops.destroy_internal = spin_destroy_wrap;
734    pops.is_overlapped = _block_is_overlapped;
735
736    memset(&pconfig, 0x0, sizeof(pconfig));
737    pconfig.ops = &pops;
738    pconfig.sizeof_lock_internal = sizeof(spin_t);
739    pconfig.sizeof_lock_user = sizeof(mutex_t);
740    pconfig.sizeof_range = sizeof(bid_t);
741    pconfig.aux = NULL;
742    plock_init(&file->plock, &pconfig);
743#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
744    int i;
745    for (i=0;i<DLOCK_MAX;++i) {
746        mutex_init(&file->data_mutex[i]);
747    }
748#else
749    int i;
750    for (i=0;i<DLOCK_MAX;++i) {
751        spin_init(&file->data_spinlock[i]);
752    }
753#endif //__FILEMGR_DATA_PARTIAL_LOCK
754
755    mutex_init(&file->writer_lock.mutex);
756    file->writer_lock.locked = false;
757
758    // initialize WAL
759    if (!wal_is_initialized(file)) {
760        wal_init(file, FDB_WAL_NBUCKET);
761    }
762
763    // init global transaction for the file
764    file->global_txn.wrapper = (struct wal_txn_wrapper*)
765                               malloc(sizeof(struct wal_txn_wrapper));
766    file->global_txn.wrapper->txn = &file->global_txn;
767    file->global_txn.handle = NULL;
768    if (atomic_get_uint64_t(&file->pos)) {
769        file->global_txn.prev_hdr_bid =
770            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
771    } else {
772        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
773    }
774    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
775    list_init(file->global_txn.items);
776    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
777    wal_add_transaction(file, &file->global_txn);
778
779    hash_insert(&hash, &file->e);
780    if (config->prefetch_duration > 0) {
781        filemgr_prefetch(file, config, log_callback);
782    }
783    spin_unlock(&filemgr_openlock);
784
785    if (config->options & FILEMGR_SYNC) {
786        file->fflags |= FILEMGR_SYNC;
787    } else {
788        file->fflags &= ~FILEMGR_SYNC;
789    }
790
791    result.file = file;
792    result.rv = FDB_RESULT_SUCCESS;
793    return result;
794}
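// Example (illustrative sketch): typical open/close flow through the file
// manager; 'cfg' and 'log_cb' are hypothetical objects prepared by the caller.
//
//     filemgr_open_result r = filemgr_open((char *)"./test.fdb",
//                                          get_filemgr_ops(), &cfg, &log_cb);
//     if (r.rv != FDB_RESULT_SUCCESS) {
//         return (fdb_status)r.rv;      // open failed; r.file is NULL
//     }
//     struct filemgr *file = r.file;
//     // ... reads, writes, commits ...
//     filemgr_close(file, true,         // cleanup_cache_onclose
//                   "./test.fdb",       // orig_file_name
//                   &log_cb);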
795
796uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
797{
798    uint64_t ret;
799
800    spin_lock(&file->lock);
801
802    if (file->header.data == NULL) {
803        file->header.data = (void *)malloc(len);
804    }else if (file->header.size < len){
805        file->header.data = (void *)realloc(file->header.data, len);
806    }
807    memcpy(file->header.data, buf, len);
808    file->header.size = len;
809    ++(file->header.revnum);
810    ret = file->header.revnum;
811
812    spin_unlock(&file->lock);
813
814    return ret;
815}
816
817filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
818{
819    filemgr_header_revnum_t ret;
820    spin_lock(&file->lock);
821    ret = file->header.revnum;
822    spin_unlock(&file->lock);
823    return ret;
824}
825
826// 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
827// 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
828fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
829{
830    return file->header.seqnum;
831}
832
833void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
834{
835    file->header.seqnum = seqnum;
836}
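// Example (illustrative): per the comment above, the seqnum accessors are
// expected to be wrapped in the file manager mutex pair referenced there:
//
//     filemgr_mutex_lock(file);
//     fdb_seqnum_t sn = filemgr_get_seqnum(file);
//     filemgr_set_seqnum(file, sn + 1);
//     filemgr_mutex_unlock(file);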
837
838void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len)
839{
840    spin_lock(&file->lock);
841
842    if (file->header.size > 0) {
843        if (buf == NULL) {
844            buf = (void*)malloc(file->header.size);
845        }
846        memcpy(buf, file->header.data, file->header.size);
847    }
848    *len = file->header.size;
849
850    spin_unlock(&file->lock);
851
852    return buf;
853}
854
855fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
856                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
857                                err_log_callback *log_callback)
858{
859    uint8_t *_buf;
860    uint8_t marker[BLK_MARKER_SIZE];
861    filemgr_header_len_t hdr_len;
862    filemgr_magic_t magic;
863    fdb_status status = FDB_RESULT_SUCCESS;
864
865    if (!bid || bid == BLK_NOT_FOUND) {
866        *len = 0; // No other header available
867        return FDB_RESULT_SUCCESS;
868    }
869    _buf = (uint8_t *)_filemgr_get_temp_buf();
870
871    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
872
873    if (status != FDB_RESULT_SUCCESS) {
874        fdb_log(log_callback, status,
875                "Failed to read a database header with block id %" _F64 " in "
876                "a database file '%s'", bid, file->filename);
877        _filemgr_release_temp_buf(_buf);
878        return status;
879    }
880    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
881            BLK_MARKER_SIZE);
882
883    if (marker[0] != BLK_MARKER_DBHEADER) {
884        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
885                "A block marker of the database header block id %" _F64 " in "
886                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
887                bid, file->filename);
888        _filemgr_release_temp_buf(_buf);
889        return FDB_RESULT_READ_FAIL;
890    }
891    memcpy(&magic,
892            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
893            sizeof(magic));
894    magic = _endian_decode(magic);
895    if (magic != FILEMGR_MAGIC) {
896        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
897                "A block magic value of the database header block id %" _F64 " in "
898                "a database file '%s' does NOT match FILEMGR_MAGIC!",
899                bid, file->filename);
900        _filemgr_release_temp_buf(_buf);
901        return FDB_RESULT_READ_FAIL;
902    }
903    memcpy(&hdr_len,
904            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
905            sizeof(hdr_len), sizeof(hdr_len));
906    hdr_len = _endian_decode(hdr_len);
907
908    memcpy(buf, _buf, hdr_len);
909    *len = hdr_len;
910
911    if (seqnum) {
912        // copy default KVS's seqnum
913        fdb_seqnum_t _seqnum;
914        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
915               sizeof(_seqnum));
916        *seqnum = _endian_decode(_seqnum);
917    }
918
919    _filemgr_release_temp_buf(_buf);
920
921    return status;
922}
923
924uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
925                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
926                                   err_log_callback *log_callback)
927{
928    uint8_t *_buf;
929    uint8_t marker[BLK_MARKER_SIZE];
930    fdb_seqnum_t _seqnum;
931    filemgr_header_revnum_t _revnum;
932    filemgr_header_len_t hdr_len;
933    filemgr_magic_t magic;
934    bid_t _prev_bid, prev_bid;
935    int found = 0;
936
937    if (!bid || bid == BLK_NOT_FOUND) {
938        *len = 0; // No other header available
939        return bid;
940    }
941    _buf = (uint8_t *)_filemgr_get_temp_buf();
942
943    // Reverse scan the file for a previous DB header
944    do {
945        // Get prev_bid from the current header.
946        // Since the current header is already cached during the previous
947        // operation, no disk I/O will be triggered.
948        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
949                != FDB_RESULT_SUCCESS) {
950            break;
951        }
952
953        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
954               BLK_MARKER_SIZE);
955        memcpy(&magic,
956               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
957               sizeof(magic));
958        magic = _endian_decode(magic);
959
960        if (marker[0] != BLK_MARKER_DBHEADER ||
961            magic != FILEMGR_MAGIC) {
962            // not a header block
963            // this happens when this function is invoked between
964            // fdb_set() call and fdb_commit() call, so the last block
965            // in the file is not a header block
966            bid_t latest_hdr = filemgr_get_header_bid(file);
967            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
968                // get the latest header BID
969                bid = latest_hdr;
970            } else {
971                break;
972            }
973        } else {
974            memcpy(&_prev_bid,
975                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
976                       sizeof(hdr_len) - sizeof(_prev_bid),
977                   sizeof(_prev_bid));
978            prev_bid = _endian_decode(_prev_bid);
979            if (bid <= prev_bid) {
980                // no more prev header, or broken linked list
981                break;
982            }
983            bid = prev_bid;
984        }
985
986        // Read the prev header
987        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
988        if (fs != FDB_RESULT_SUCCESS) {
989            fdb_log(log_callback, fs,
990                    "Failed to read a previous database header with block id %" _F64 " in "
991                    "a database file '%s'", bid, file->filename);
992            break;
993        }
994
995        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
996               BLK_MARKER_SIZE);
997        if (marker[0] != BLK_MARKER_DBHEADER) {
998            if (bid) {
999                // broken linked list
1000                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1001                        "A block marker of the previous database header block id %"
1002                        _F64 " in "
1003                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1004                        bid, file->filename);
1005            }
1006            break;
1007        }
1008
1009        memcpy(&magic,
1010               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1011               sizeof(magic));
1012        magic = _endian_decode(magic);
1013        if (magic != FILEMGR_MAGIC) {
1014            // broken linked list
1015            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1016                    "A block magic value of the previous database header block id %" _F64 " in "
1017                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
1018                    bid, file->filename);
1019            break;
1020        }
1021
1022        memcpy(&hdr_len,
1023               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1024               sizeof(hdr_len), sizeof(hdr_len));
1025        hdr_len = _endian_decode(hdr_len);
1026
1027        if (buf) {
1028            memcpy(buf, _buf, hdr_len);
1029        }
1030        memcpy(&_revnum, _buf + hdr_len,
1031               sizeof(filemgr_header_revnum_t));
1032        memcpy(&_seqnum,
1033               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1034               sizeof(fdb_seqnum_t));
1035        *seqnum = _endian_decode(_seqnum);
1036        *len = hdr_len;
1037        found = 1;
1038        break;
1039    } while (false); // no repetition
1040
1041    if (!found) { // no other header found till end of file
1042        *len = 0;
1043    }
1044
1045    _filemgr_release_temp_buf(_buf);
1046
1047    return bid;
1048}
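// Example (illustrative sketch): walking the header chain backwards from the
// latest header. The loop ends once *len is set to 0 (no earlier header found).
//
//     uint8_t *hdr = (uint8_t *)_filemgr_get_temp_buf();
//     size_t hdr_len;
//     fdb_seqnum_t seqnum;
//     uint64_t bid = filemgr_get_header_bid(file);
//     do {
//         bid = filemgr_fetch_prev_header(file, bid, hdr, &hdr_len,
//                                         &seqnum, log_callback);
//     } while (hdr_len > 0);
//     _filemgr_release_temp_buf(hdr);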
1049
1050fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1051                         const char *orig_file_name,
1052                         err_log_callback *log_callback)
1053{
1054    int rv = FDB_RESULT_SUCCESS;
1055
1056    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1057                                  // filemgr_open() because file->lock won't
1058                                  // prevent the race condition.
1059
1060    // remove filemgr structure if no thread refers to the file
1061    spin_lock(&file->lock);
1062    if (--(file->ref_count) == 0) {
1063        if (global_config.ncacheblock > 0 &&
1064            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1065            spin_unlock(&file->lock);
1066            // discard all dirty blocks belonging to this file
1067            bcache_remove_dirty_blocks(file);
1068        } else {
1069            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1070            // then its dirty block entries will be cleaned up in either
1071            // filemgr_free_func() or register_file_removal() below.
1072            spin_unlock(&file->lock);
1073        }
1074
1075        if (wal_is_initialized(file)) {
1076            wal_close(file);
1077        }
1078
1079        spin_lock(&file->lock);
1080        rv = file->ops->close(file->fd);
1081        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1082            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1083
1084            bool foreground_deletion = false;
1085
1086            // immediately remove file if background remove function is not set
1087            if (!lazy_file_deletion_enabled ||
1088                (file->new_file && file->new_file->in_place_compaction)) {
1089                // TODO: to avoid the scenario below, we prevent background
1090                //       deletion of in-place compacted files at this time.
1091                // 1) In-place compacted from 'A' to 'A.1'.
1092                // 2) Request to delete 'A'.
1093                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1094                // 4) User opens DB file using its original name 'A', not 'A.1'.
1095                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1096                // 6) Crash!
1097                remove(file->filename);
1098                foreground_deletion = true;
1099            }
1100
1101            // we can release the lock because no one will open this file
1102            spin_unlock(&file->lock);
1103            struct hash_elem *ret = hash_remove(&hash, &file->e);
1104            fdb_assert(ret, 0, 0);
1105            spin_unlock(&filemgr_openlock);
1106
1107            if (foreground_deletion) {
1108                filemgr_free_func(&file->e);
1109            } else {
1110                register_file_removal(file, log_callback);
1111            }
1112            return (fdb_status) rv;
1113        } else {
1114            if (cleanup_cache_onclose) {
1115                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1116                if (file->in_place_compaction && orig_file_name) {
1117                    struct hash_elem *elem = NULL;
1118                    struct filemgr query;
1119                    uint32_t old_file_refcount = 0;
1120
1121                    query.filename = (char *)orig_file_name;
1122                    elem = hash_find(&hash, &query.e);
1123
1124                    if (file->old_filename) {
1125                        struct hash_elem *elem_old = NULL;
1126                        struct filemgr query_old;
1127                        struct filemgr *old_file = NULL;
1128
1129                        // get old file's ref count if exists
1130                        query_old.filename = file->old_filename;
1131                        elem_old = hash_find(&hash, &query_old.e);
1132                        if (elem_old) {
1133                            old_file = _get_entry(elem_old, struct filemgr, e);
1134                            old_file_refcount = old_file->ref_count;
1135                        }
1136                    }
1137
1138                    // If the old file is opened by another handle, renaming should be
1139                    // postponed. It will be renamed later by the handle referring
1140                    // to the old file.
1141                    if (!elem && old_file_refcount == 0 &&
1142                        is_file_removed(orig_file_name)) {
1143                        // If background file removal is not done yet, we postpone
1144                        // file renaming at this time.
1145                        if (rename(file->filename, orig_file_name) < 0) {
1146                            // Note that the renaming failure is not a critical
1147                            // issue because the last compacted file will be automatically
1148                            // identified and opened in the next fdb_open call.
1149                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1150                                           "CLOSE", file->filename);
1151                        }
1152                    }
1153                }
1154                spin_unlock(&file->lock);
1155                // Clean up global hash table, WAL index, and buffer cache.
1156                struct hash_elem *ret = hash_remove(&hash, &file->e);
1157                fdb_assert(ret, file, 0);
1158                spin_unlock(&filemgr_openlock);
1159                filemgr_free_func(&file->e);
1160                return (fdb_status) rv;
1161            } else {
1162                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1163            }
1164        }
1165    }
1166
1167    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1168
1169    spin_unlock(&file->lock);
1170    spin_unlock(&filemgr_openlock);
1171    return (fdb_status) rv;
1172}
1173
1174void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1175{
1176    // remove all cached blocks
1177    if (global_config.ncacheblock > 0 && file->bcache) {
1178        bcache_remove_dirty_blocks(file);
1179        bcache_remove_clean_blocks(file);
1180        if (bcache_remove_file(file)) {
1181            file->bcache = NULL;
1182        }
1183    }
1184}
1185
1186void filemgr_free_func(struct hash_elem *h)
1187{
1188    struct filemgr *file = _get_entry(h, struct filemgr, e);
1189
1190    spin_lock(&file->lock);
1191    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
1192        // prefetch thread is running
1193        void *ret;
1194        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
1195        spin_unlock(&file->lock);
1196        // wait
1197        thread_join(file->prefetch_tid, &ret);
1198    } else {
1199        spin_unlock(&file->lock);
1200    }
1201
1202    // remove all cached blocks
1203    if (global_config.ncacheblock > 0 && file->bcache) {
1204        bcache_remove_dirty_blocks(file);
1205        bcache_remove_clean_blocks(file);
1206        if (bcache_remove_file(file)) {
1207            file->bcache = NULL;
1208        }
1209    }
1210
1211    if (file->kv_header) {
1212        // multi KV instance mode & KV header exists
1213        file->free_kv_header(file);
1214    }
1215
1216    // free global transaction
1217    wal_remove_transaction(file, &file->global_txn);
1218    free(file->global_txn.items);
1219    free(file->global_txn.wrapper);
1220
1221    // destroy WAL
1222    if (wal_is_initialized(file)) {
1223        wal_shutdown(file);
1224        size_t i = 0;
1225        size_t num_all_shards = wal_get_num_all_shards(file);
1226        // Free all WAL shards (including compactor's shard)
1227        for (; i < num_all_shards; ++i) {
1228            hash_free(&file->wal->key_shards[i].hash_bykey);
1229            spin_destroy(&file->wal->key_shards[i].lock);
1230            hash_free(&file->wal->seq_shards[i].hash_byseq);
1231            spin_destroy(&file->wal->seq_shards[i].lock);
1232        }
1233        spin_destroy(&file->wal->lock);
1234        atomic_destroy_uint32_t(&file->wal->size);
1235        atomic_destroy_uint32_t(&file->wal->num_flushable);
1236        atomic_destroy_uint64_t(&file->wal->datasize);
1237        free(file->wal->key_shards);
1238        free(file->wal->seq_shards);
1239    }
1240    free(file->wal);
1241
1242    // free filename and header
1243    free(file->filename);
1244    if (file->header.data) free(file->header.data);
1245    // free old filename if any
1246    free(file->old_filename);
1247
1248    // destroy locks
1249    spin_destroy(&file->lock);
1250
1251#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1252    plock_destroy(&file->plock);
1253#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1254    int i;
1255    for (i=0;i<DLOCK_MAX;++i) {
1256        mutex_destroy(&file->data_mutex[i]);
1257    }
1258#else
1259    int i;
1260    for (i=0;i<DLOCK_MAX;++i) {
1261        spin_destroy(&file->data_spinlock[i]);
1262    }
1263#endif //__FILEMGR_DATA_PARTIAL_LOCK
1264
1265    mutex_destroy(&file->writer_lock.mutex);
1266
1267    // free file structure
1268    free(file->config);
1269    free(file);
1270}
1271
1272// permanently remove file from cache (not just close)
1273// LCOV_EXCL_START
1274void filemgr_remove_file(struct filemgr *file)
1275{
1276    struct hash_elem *ret;
1277
1278    fdb_assert(file, file, NULL);
1279    fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1280
1281    // remove from global hash table
1282    spin_lock(&filemgr_openlock);
1283    ret = hash_remove(&hash, &file->e);
1284    fdb_assert(ret, ret, NULL);
1285    spin_unlock(&filemgr_openlock);
1286
1287    if (!lazy_file_deletion_enabled ||
1288        (file->new_file && file->new_file->in_place_compaction)) {
1289        filemgr_free_func(&file->e);
1290    } else {
1291        register_file_removal(file, NULL);
1292    }
1293}
1294// LCOV_EXCL_STOP
1295
1296static
1297void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1298    struct filemgr *file = _get_entry(h, struct filemgr, e);
1299    void *ret;
1300    spin_lock(&file->lock);
1301    if (file->ref_count != 0) {
1302        ret = (void *)file;
1303    } else {
1304        ret = NULL;
1305    }
1306    spin_unlock(&file->lock);
1307    return ret;
1308}
1309
1310fdb_status filemgr_shutdown()
1311{
1312    fdb_status ret = FDB_RESULT_SUCCESS;
1313    void *open_file;
1314    if (filemgr_initialized) {
1315
1316#ifndef SPIN_INITIALIZER
1317        // Windows: check if spin lock is already destroyed.
1318        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1319            spin_lock(&initial_lock);
1320        } else {
1321            // filemgr is already shut down
1322            return ret;
1323        }
1324#else
1325        spin_lock(&initial_lock);
1326#endif
1327
1328        if (!filemgr_initialized) {
1329            // filemgr is already shut down
1330#ifdef SPIN_INITIALIZER
1331            spin_unlock(&initial_lock);
1332#endif
1333            return ret;
1334        }
1335
1336        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1337        if (!open_file) {
1338            hash_free_active(&hash, filemgr_free_func);
1339            if (global_config.ncacheblock > 0) {
1340                bcache_shutdown();
1341            }
1342            filemgr_initialized = 0;
1343#ifndef SPIN_INITIALIZER
1344            initial_lock_status = 0;
1345#else
1346            initial_lock = SPIN_INITIALIZER;
1347#endif
1348            _filemgr_shutdown_temp_buf();
1349            spin_unlock(&initial_lock);
1350#ifndef SPIN_INITIALIZER
1351            spin_destroy(&initial_lock);
1352#endif
1353        } else {
1354            spin_unlock(&initial_lock);
1355            ret = FDB_RESULT_FILE_IS_BUSY;
1356        }
1357    }
1358    return ret;
1359}
1360
1361bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1362{
1363    spin_lock(&file->lock);
1364    bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1365    atomic_add_uint64_t(&file->pos, file->blocksize);
1366
1367    if (global_config.ncacheblock <= 0) {
1368        // if block cache is turned off, write the allocated block before use
1369        uint8_t _buf = 0x0;
1370        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1371                                       atomic_get_uint64_t(&file->pos) - 1);
1372        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1373    }
1374    spin_unlock(&file->lock);
1375
1376    return bid;
1377}
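// Example (illustrative): a bid is simply an index of fixed-size blocks, so
// the byte offset of a freshly allocated block is:
//
//     bid_t bid = filemgr_alloc(file, log_callback);
//     uint64_t offset = bid * file->blocksize;   // first byte of the new block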
1378
1379void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1380                            bid_t *end, err_log_callback *log_callback)
1381{
1382    spin_lock(&file->lock);
1383    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1384    *end = *begin + nblock - 1;
1385    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1386
1387    if (global_config.ncacheblock <= 0) {
1388        // if block cache is turned off, write the allocated block before use
1389        uint8_t _buf = 0x0;
1390        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1391                                       atomic_get_uint64_t(&file->pos) - 1);
1392        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1393    }
1394    spin_unlock(&file->lock);
1395}
1396
1397// atomically allocate NBLOCK blocks only when the current file position is the same as nextbid
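// Note (added): returns the block id at the current end of file; when that id
// differs from 'nextbid', no blocks are allocated and both *begin and *end are
// set to BLK_NOT_FOUND, so the caller can detect the failed conditional
// allocation from the out-parameters.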
1398bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1399                                  bid_t *begin, bid_t *end,
1400                                  err_log_callback *log_callback)
1401{
1402    bid_t bid;
1403    spin_lock(&file->lock);
1404    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1405    if (bid == nextbid) {
1406        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1407        *end = *begin + nblock - 1;
1408        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1409
1410        if (global_config.ncacheblock <= 0) {
1411            // if block cache is turned off, write the allocated block before use
1412            uint8_t _buf = 0x0;
1413            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1414                                           atomic_get_uint64_t(&file->pos));
1415            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1416        }
1417    }else{
1418        *begin = BLK_NOT_FOUND;
1419        *end = BLK_NOT_FOUND;
1420    }
1421    spin_unlock(&file->lock);
1422    return bid;
1423}
1424
1425#ifdef __CRC32
1426INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1427{
1428    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1429        uint32_t crc_file, crc;
1430        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1431        crc_file = _endian_decode(crc_file);
1432        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1433        crc = chksum(buf, file->blocksize);
1434        if (crc != crc_file) {
1435            return FDB_RESULT_CHECKSUM_ERROR;
1436        }
1437    }
1438    return FDB_RESULT_SUCCESS;
1439}
1440#endif
1441
1442void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1443{
1444    if (global_config.ncacheblock > 0) {
1445        bcache_invalidate_block(file, bid);
1446    }
1447}
1448
1449fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1450                        err_log_callback *log_callback,
1451                        bool read_on_cache_miss)
1452{
1453    size_t lock_no;
1454    ssize_t r;
1455    uint64_t pos = bid * file->blocksize;
1456    fdb_status status = FDB_RESULT_SUCCESS;
1457    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
1458    fdb_assert(pos < curr_pos, pos, curr_pos);
1459
1460    if (global_config.ncacheblock > 0) {
1461        lock_no = bid % DLOCK_MAX;
1462        (void)lock_no;
1463
1464#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1465        plock_entry_t *plock_entry = NULL;
1466        bid_t is_writer = 0;
1467#endif
1468        bool locked = false;
1469        // Note: we don't need to grab a lock for committed blocks
1470        // because they are immutable, so no writer will interfere or
1471        // overwrite them with dirty data
1472        if (filemgr_is_writable(file, bid)) {
1473#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1474            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1475#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1476            mutex_lock(&file->data_mutex[lock_no]);
1477#else
1478            spin_lock(&file->data_spinlock[lock_no]);
1479#endif //__FILEMGR_DATA_PARTIAL_LOCK
1480            locked = true;
1481        }
1482
1483        r = bcache_read(file, bid, buf);
1484        if (r == 0) {
1485            // cache miss
1486            if (!read_on_cache_miss) {
1487                if (locked) {
1488#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1489                    plock_unlock(&file->plock, plock_entry);
1490#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1491                    mutex_unlock(&file->data_mutex[lock_no]);
1492#else
1493                    spin_unlock(&file->data_spinlock[lock_no]);
1494#endif //__FILEMGR_DATA_PARTIAL_LOCK
1495                }
1496                return FDB_RESULT_READ_FAIL;
1497            }
1498
1499            // if normal file, just read a block
1500            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1501            if (r != file->blocksize) {
1502                _log_errno_str(file->ops, log_callback,
1503                               (fdb_status) r, "READ", file->filename);
1504                if (locked) {
1505#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1506                    plock_unlock(&file->plock, plock_entry);
1507#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1508                    mutex_unlock(&file->data_mutex[lock_no]);
1509#else
1510                    spin_unlock(&file->data_spinlock[lock_no]);
1511#endif //__FILEMGR_DATA_PARTIAL_LOCK
1512                }
1513                return (fdb_status)r;
1514            }
1515#ifdef __CRC32
1516            status = _filemgr_crc32_check(file, buf);
1517            if (status != FDB_RESULT_SUCCESS) {
1518                _log_errno_str(file->ops, log_callback, status, "READ",
1519                        file->filename);
1520                if (locked) {
1521#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1522                    plock_unlock(&file->plock, plock_entry);
1523#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1524                    mutex_unlock(&file->data_mutex[lock_no]);
1525#else
1526                    spin_unlock(&file->data_spinlock[lock_no]);
1527#endif //__FILEMGR_DATA_PARTIAL_LOCK
1528                }
1529                return status;
1530            }
1531#endif
1532            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
1533            if (r != global_config.blocksize) {
1534                if (locked) {
1535#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1536                    plock_unlock(&file->plock, plock_entry);
1537#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1538                    mutex_unlock(&file->data_mutex[lock_no]);
1539#else
1540                    spin_unlock(&file->data_spinlock[lock_no]);
1541#endif //__FILEMGR_DATA_PARTIAL_LOCK
1542                }
1543                _log_errno_str(file->ops, log_callback,
1544                               (fdb_status) r, "WRITE", file->filename);
1545                return FDB_RESULT_WRITE_FAIL;
1546            }
1547        }
1548        if (locked) {
1549#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1550            plock_unlock(&file->plock, plock_entry);
1551#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1552            mutex_unlock(&file->data_mutex[lock_no]);
1553#else
1554            spin_unlock(&file->data_spinlock[lock_no]);
1555#endif //__FILEMGR_DATA_PARTIAL_LOCK
1556        }
1557    } else {
1558        if (!read_on_cache_miss) {
1559            return FDB_RESULT_READ_FAIL;
1560        }
1561
1562        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1563        if (r != file->blocksize) {
1564            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
1565                           file->filename);
1566            return (fdb_status)r;
1567        }
1568
1569#ifdef __CRC32
1570        status = _filemgr_crc32_check(file, buf);
1571        if (status != FDB_RESULT_SUCCESS) {
1572            _log_errno_str(file->ops, log_callback, status, "READ",
1573                           file->filename);
1574            return status;
1575        }
1576#endif
1577    }
1578    return status;
1579}
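
/*
 * Illustrative usage sketch (not part of the original source): reading one
 * committed block through the cache-aware path above. It assumes `file` is an
 * already-opened struct filemgr* and `bid` is a block id below file->pos.
 *
 *     void *buf = _filemgr_get_temp_buf();          // block-sized scratch buffer
 *     fdb_status s = filemgr_read(file, bid, buf,
 *                                 NULL,             // no error-log callback
 *                                 true);            // go to disk on cache miss
 *     if (s == FDB_RESULT_SUCCESS) {
 *         // buf now holds one file->blocksize block, CRC-checked when __CRC32
 *         // is enabled, and inserted into the block cache as a clean block
 *     }
 *     _filemgr_release_temp_buf(buf);
 */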
1580
1581fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
1582                                uint64_t offset, uint64_t len, void *buf,
1583                                err_log_callback *log_callback)
1584{
1585    fdb_assert(offset + len <= file->blocksize, offset + len, file);
1586
1587    size_t lock_no;
1588    ssize_t r = 0;
1589    uint64_t pos = bid * file->blocksize + offset;
1590    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
1591    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);
1592
1593    if (global_config.ncacheblock > 0) {
1594        lock_no = bid % DLOCK_MAX;
1595        (void)lock_no;
1596
1597        bool locked = false;
1598#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1599        plock_entry_t *plock_entry;
1600        bid_t is_writer = 1;
1601        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1602#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1603        mutex_lock(&file->data_mutex[lock_no]);
1604#else
1605        spin_lock(&file->data_spinlock[lock_no]);
1606#endif //__FILEMGR_DATA_PARTIAL_LOCK
1607        locked = true;
1608
1609        if (len == file->blocksize) {
1610            // write entire block .. we don't need to read previous block
1611            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
1612            if (r != global_config.blocksize) {
1613                if (locked) {
1614#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1615                    plock_unlock(&file->plock, plock_entry);
1616#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1617                    mutex_unlock(&file->data_mutex[lock_no]);
1618#else
1619                    spin_unlock(&file->data_spinlock[lock_no]);
1620#endif //__FILEMGR_DATA_PARTIAL_LOCK
1621                }
1622                _log_errno_str(file->ops, log_callback,
1623                               (fdb_status) r, "WRITE", file->filename);
1624                return FDB_RESULT_WRITE_FAIL;
1625            }
1626        } else {
1627            // try a partial write to the buffer cache first
1628            r = bcache_write_partial(file, bid, buf, offset, len);
1629            if (r == 0) {
1630                // cache miss
1631                // write partially .. we have to read previous contents of the block
1632                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
1633                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
1634                void *_buf = _filemgr_get_temp_buf();
1635
1636                if (bid >= cur_file_last_bid) {
1637                    // this is the first write to this block, so there is
1638                    // no previous content to read from the file.
1639                } else {
1640                    r = file->ops->pread(file->fd, _buf, file->blocksize,
1641                                         bid * file->blocksize);
1642                    if (r != file->blocksize) {
1643                        if (locked) {
1644#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1645                            plock_unlock(&file->plock, plock_entry);
1646#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1647                            mutex_unlock(&file->data_mutex[lock_no]);
1648#else
1649                            spin_unlock(&file->data_spinlock[lock_no]);
1650#endif //__FILEMGR_DATA_PARTIAL_LOCK
1651                        }
1652                        _filemgr_release_temp_buf(_buf);
1653                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
1654                                       "READ", file->filename);
1655                        return FDB_RESULT_READ_FAIL;
1656                    }
1657                }
1658                memcpy((uint8_t *)_buf + offset, buf, len);
1659                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
1660                if (r != global_config.blocksize) {
1661                    if (locked) {
1662#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1663                        plock_unlock(&file->plock, plock_entry);
1664#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1665                        mutex_unlock(&file->data_mutex[lock_no]);
1666#else
1667                        spin_unlock(&file->data_spinlock[lock_no]);
1668#endif //__FILEMGR_DATA_PARTIAL_LOCK
1669                    }
1670                    _filemgr_release_temp_buf(_buf);
1671                    _log_errno_str(file->ops, log_callback,
1672                            (fdb_status) r, "WRITE", file->filename);
1673                    return FDB_RESULT_WRITE_FAIL;
1674                }
1675
1676                _filemgr_release_temp_buf(_buf);
1677            } // cache miss
1678        } // full block or partial block
1679
1680        if (locked) {
1681#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1682            plock_unlock(&file->plock, plock_entry);
1683#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1684            mutex_unlock(&file->data_mutex[lock_no]);
1685#else
1686            spin_unlock(&file->data_spinlock[lock_no]);
1687#endif //__FILEMGR_DATA_PARTIAL_LOCK
1688        }
1689
1690    } else { // block cache disabled
1691
1692#ifdef __CRC32
1693        if (len == file->blocksize) {
1694            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
1695            if (marker == BLK_MARKER_BNODE) {
1696                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1697                uint32_t crc32 = chksum(buf, file->blocksize);
1698                crc32 = _endian_encode(crc32);
1699                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
1700            }
1701        }
1702#endif
1703
1704        r = file->ops->pwrite(file->fd, buf, len, pos);
1705        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
1706        if ((uint64_t)r != len) {
1707            return FDB_RESULT_WRITE_FAIL;
1708        }
1709    } // block cache check
1710    return FDB_RESULT_SUCCESS;
1711}
1712
1713fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1714                   err_log_callback *log_callback)
1715{
1716    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1717                                log_callback);
1718}
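
/*
 * Illustrative usage sketch (assumptions, not original code): a partial block
 * update goes through filemgr_write_offset(), which merges the new bytes with
 * the block's previous contents (from the block cache or from disk) before
 * marking the cached block dirty. `file`, `bid`, `data`, `doc_len`, and
 * `block_buf` are assumed to be provided by the caller.
 *
 *     // overwrite doc_len bytes starting 128 bytes into block bid
 *     fdb_status s = filemgr_write_offset(file, bid, 128, doc_len, data, NULL);
 *
 *     // writing a whole block is the full-length special case
 *     s = filemgr_write(file, bid, block_buf, NULL);
 */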
1719
1720fdb_status filemgr_commit(struct filemgr *file,
1721                          err_log_callback *log_callback)
1722{
1723    uint16_t header_len = file->header.size;
1724    uint16_t _header_len;
1725    bid_t _prev_bid;
1726    fdb_seqnum_t _seqnum;
1727    filemgr_header_revnum_t _revnum;
1728    int result = FDB_RESULT_SUCCESS;
1729    filemgr_magic_t magic = FILEMGR_MAGIC;
1730    filemgr_magic_t _magic;
1731
1732    if (global_config.ncacheblock > 0) {
1733        result = bcache_flush(file);
1734        if (result != FDB_RESULT_SUCCESS) {
1735            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1736                           "FLUSH", file->filename);
1737            return (fdb_status)result;
1738        }
1739    }
1740
1741    spin_lock(&file->lock);
1742
1743    if (file->header.size > 0 && file->header.data) {
1744        void *buf = _filemgr_get_temp_buf();
1745        uint8_t marker[BLK_MARKER_SIZE];
1746
1747        // [header data]:        'header_len' bytes   <---+
1748        // [header revnum]:      8 bytes                  |
1749        // [default KVS seqnum]: 8 bytes                  |
1750        // ...                                            |
1751        // (empty)                                    blocksize
1752        // ...                                            |
1753        // [prev header bid]:    8 bytes                  |
1754        // [header length]:      2 bytes                  |
1755        // [magic number]:       8 bytes                  |
1756        // [block marker]:       1 byte               <---+
1757
1758        // header data
1759        memcpy(buf, file->header.data, header_len);
1760        // header rev number
1761        _revnum = _endian_encode(file->header.revnum);
1762        memcpy((uint8_t *)buf + header_len, &_revnum,
1763               sizeof(filemgr_header_revnum_t));
1764        // file's sequence number (default KVS seqnum)
1765        _seqnum = _endian_encode(file->header.seqnum);
1766        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
1767               &_seqnum, sizeof(fdb_seqnum_t));
1768
1769        // prev header bid
1770        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
1771        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1772               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
1773               &_prev_bid, sizeof(_prev_bid));
1774        // header length
1775        _header_len = _endian_encode(header_len);
1776        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1777               - sizeof(header_len) - BLK_MARKER_SIZE),
1778               &_header_len, sizeof(header_len));
1779        // magic number
1780        _magic = _endian_encode(magic);
1781        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1782               - BLK_MARKER_SIZE), &_magic, sizeof(magic));
1783
1784        // marker
1785        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
1786        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1787               marker, BLK_MARKER_SIZE);
1788
1789        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
1790                                       atomic_get_uint64_t(&file->pos));
1791        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1792                       "WRITE", file->filename);
1793        if (rv != file->blocksize) {
1794            _filemgr_release_temp_buf(buf);
1795            spin_unlock(&file->lock);
1796            return FDB_RESULT_WRITE_FAIL;
1797        }
1798        atomic_store_uint64_t(&file->header.bid,
1799                              atomic_get_uint64_t(&file->pos) / file->blocksize);
1800        atomic_add_uint64_t(&file->pos, file->blocksize);
1801
1802        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
1803        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
1804
1805        _filemgr_release_temp_buf(buf);
1806    }
1807    // race condition?
1808    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
1809
1810    spin_unlock(&file->lock);
1811
1812    if (file->fflags & FILEMGR_SYNC) {
1813        result = file->ops->fsync(file->fd);
1814        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
1815    }
1816    return (fdb_status) result;
1817}
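
/*
 * Illustrative decoding sketch (assumption, not original code): given one
 * blocksize buffer `buf` holding a commit block laid out as in the diagram
 * above, the trailer fields can be read back from the end of the block:
 *
 *     uint8_t marker = *((uint8_t *)buf + blocksize - BLK_MARKER_SIZE);
 *     filemgr_magic_t magic;
 *     uint16_t header_len;
 *     bid_t prev_bid;
 *     memcpy(&magic, (uint8_t *)buf + blocksize - BLK_MARKER_SIZE
 *            - sizeof(magic), sizeof(magic));
 *     memcpy(&header_len, (uint8_t *)buf + blocksize - BLK_MARKER_SIZE
 *            - sizeof(magic) - sizeof(header_len), sizeof(header_len));
 *     memcpy(&prev_bid, (uint8_t *)buf + blocksize - BLK_MARKER_SIZE
 *            - sizeof(magic) - sizeof(header_len) - sizeof(prev_bid),
 *            sizeof(prev_bid));
 *     magic = _endian_decode(magic);
 *     header_len = _endian_decode(header_len);
 *     prev_bid = _endian_decode(prev_bid);
 *     // marker == BLK_MARKER_DBHEADER and magic == FILEMGR_MAGIC identify a
 *     // commit header; the first header_len bytes of buf are the header data.
 */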
1818
1819fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1820{
1821    fdb_status result = FDB_RESULT_SUCCESS;
1822    if (global_config.ncacheblock > 0) {
1823        result = bcache_flush(file);
1824        if (result != FDB_RESULT_SUCCESS) {
1825            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1826                           "FLUSH", file->filename);
1827            return result;
1828        }
1829    }
1830
1831    if (file->fflags & FILEMGR_SYNC) {
1832        int rv = file->ops->fsync(file->fd);
1833        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1834        return (fdb_status) rv;
1835    }
1836    return result;
1837}
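
/*
 * Illustrative note (assumption): filemgr_sync() can be called on its own,
 * e.g. after a batch of filemgr_write() calls, to flush dirty cached blocks;
 * the fsync itself only happens when the file was opened with FILEMGR_SYNC
 * set in file->fflags.
 *
 *     fdb_status s = filemgr_sync(file, NULL);
 *     // s is the fsync result when FILEMGR_SYNC is set, otherwise the result
 *     // of flushing the block cache.
 */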
1838
1839int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1840                                char *old_filename)
1841{
1842    int ret = 1;
1843    spin_lock(&file->lock);
1844    atomic_store_uint8_t(&file->status, status);
1845    if (old_filename) {
1846        if (!file->old_filename) {
1847            file->old_filename = old_filename;
1848        } else {
1849            ret = 0;
1850            fdb_assert(file->ref_count, file->ref_count, 0);
1851            free(old_filename);
1852        }
1853    }
1854    spin_unlock(&file->lock);
1855    return ret;
1856}
1857
1858void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1859                                  file_status_t status)
1860{
1861    spin_lock(&old_file->lock);
1862    old_file->new_file = new_file;
1863    atomic_store_uint8_t(&old_file->status, status);
1864    spin_unlock(&old_file->lock);
1865}
1866
1867// Check if there is a file, pending removal after being compacted away, that
1868// still points to cur_file as its new_file. If so, take a reference and return it.
1869static
1870void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1871    struct filemgr *cur_file = (struct filemgr *)ctx;
1872    struct filemgr *file = _get_entry(h, struct filemgr, e);
1873    spin_lock(&file->lock);
1874    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1875        file->new_file == cur_file) {
1876        // Incrementing reference counter below is the same as filemgr_open()
1877        // We need to do this to ensure that the pointer returned does not
1878        // get freed outside the filemgr_open lock
1879        file->ref_count++;
1880        spin_unlock(&file->lock);
1881        return (void *)file;
1882    }
1883    spin_unlock(&file->lock);
1884    return (void *)NULL;
1885}
1886
1887struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1888    struct filemgr *very_old_file;
1889    spin_lock(&filemgr_openlock);
1890    very_old_file = (struct filemgr *)hash_scan(&hash,
1891                                         _filemgr_check_stale_link, cur_file);
1892    spin_unlock(&filemgr_openlock);
1893    return very_old_file;
1894}
1895
1896char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1897                                     struct filemgr *new_file,
1898                                     filemgr_redirect_hdr_func
1899                                     redirect_header_func) {
1900    size_t old_header_len, new_header_len;
1901    uint16_t new_filename_len;
1902    char *past_filename;
1903    spin_lock(&very_old_file->lock);
1904    fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1905    fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1906    old_header_len = very_old_file->header.size;
1907    new_filename_len = strlen(new_file->filename);
1908    // Find out the new DB header length with new_file's filename
1909    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1910        + new_filename_len;
1911    // As we are going to change the new_filename field in the DB header of the
1912    // very_old_file, reallocate the DB header buf to accommodate a bigger value
1913    if (new_header_len > old_header_len) {
1914        very_old_file->header.data = realloc(very_old_file->header.data,
1915                new_header_len);
1916    }
1917    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1918    past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1919            new_file->filename, new_filename_len + 1); // Update in-memory header
1920    very_old_file->header.size = new_header_len;
1921    ++(very_old_file->header.revnum);
1922
1923    spin_unlock(&very_old_file->lock);
1924    return past_filename;
1925}
1926
1927void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1928{
1929    fdb_assert(new_file, new_file, old_file);
1930
1931    spin_lock(&old_file->lock);
1932    if (old_file->ref_count > 0) {
1933        // delay removing
1934        old_file->new_file = new_file;
1935        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1936        spin_unlock(&old_file->lock);
1937    } else {
1938        // immediately remove
1939        // LCOV_EXCL_START
1940        spin_unlock(&old_file->lock);
1941
1942        if (!lazy_file_deletion_enabled ||
1943            (old_file->new_file && old_file->new_file->in_place_compaction)) {
1944            remove(old_file->filename);
1945        }
1946        filemgr_remove_file(old_file);
1947        // LCOV_EXCL_STOP
1948    }
1949}
1950
1951// migrate default kv store stats over to new_file
1952struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1953                                              struct filemgr *new_file,
1954                                              struct kvs_info *kvs)
1955{
1956    kvs_ops_stat *ret = NULL;
1957    fdb_assert(new_file, new_file, old_file);
1958
1959    spin_lock(&old_file->lock);
1960    new_file->header.op_stat = old_file->header.op_stat;
1961    ret = &new_file->header.op_stat;
1962    spin_unlock(&old_file->lock);
1963    return ret;
1964}
1965
1966// Note: filemgr_openlock should be held before calling this function.
1967fdb_status filemgr_destroy_file(char *filename,
1968                                struct filemgr_config *config,
1969                                struct hash *destroy_file_set)
1970{
1971    struct filemgr *file = NULL;
1972    struct hash to_destroy_files;
1973    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
1974                                                  &to_destroy_files);
1975    struct filemgr query;
1976    struct hash_elem *e = NULL;
1977    fdb_status status = FDB_RESULT_SUCCESS;
1978    char *old_filename = NULL;
1979
1980    if (!destroy_file_set) { // top level or non-recursive call
1981        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
1982    }
1983
1984    query.filename = filename;
1985    // check whether file is already being destroyed in parent recursive call
1986    e = hash_find(destroy_set, &query.e);
1987    if (e) { // Duplicate filename found, nothing to be done in this call
1988        if (!destroy_file_set) { // top level or non-recursive call
1989            hash_free(destroy_set);
1990        }
1991        return status;
1992    } else {
1993        // Remember file. Stack value ok IFF single direction recursion
1994        hash_insert(destroy_set, &query.e);
1995    }
1996
1997    // check global list of known files to see if it is already opened or not
1998    e = hash_find(&hash, &query.e);
1999    if (e) {
2000        // already opened; use the existing in-memory structure
2001        file = _get_entry(e, struct filemgr, e);
2002
2003        spin_lock(&file->lock);
2004        if (file->ref_count) {
2005            spin_unlock(&file->lock);
2006            status = FDB_RESULT_FILE_IS_BUSY;
2007            if (!destroy_file_set) { // top level or non-recursive call
2008                hash_free(destroy_set);
2009            }
2010            return status;
2011        }
2012        spin_unlock(&file->lock);
2013        if (file->old_filename) {
2014            status = filemgr_destroy_file(file->old_filename, config,
2015                                          destroy_set);
2016            if (status != FDB_RESULT_SUCCESS) {
2017                if (!destroy_file_set) { // top level or non-recursive call
2018                    hash_free(destroy_set);
2019                }
2020                return status;
2021            }
2022        }
2023
2024        // Clean up the file both in memory and on disk
2025        e = hash_remove(&hash, &file->e);
2026        fdb_assert(e, e, 0);
2027        filemgr_free_func(&file->e);
2028        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2029            if (remove(filename)) {
2030                status = FDB_RESULT_FILE_REMOVE_FAIL;
2031            }
2032        }
2033    } else { // file not in memory, read on-disk to destroy older versions..
2034        file = (struct filemgr *)alca(struct filemgr, 1);
2035        file->filename = filename;
2036        file->ops = get_filemgr_ops();
2037        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2038        file->blocksize = global_config.blocksize;
2039        if (file->fd < 0) {
2040            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2041                if (!destroy_file_set) { // top level or non-recursive call
2042                    hash_free(destroy_set);
2043                }
2044                return FDB_RESULT_OPEN_FAIL;
2045            }
2046        } else { // file successfully opened, seek to end to get DB header
2047            cs_off_t offset = file->ops->goto_eof(file->fd);
2048            if (offset == FDB_RESULT_SEEK_FAIL) {
2049                if (!destroy_file_set) { // top level or non-recursive call
2050                    hash_free(destroy_set);
2051                }
2052                return FDB_RESULT_SEEK_FAIL;
2053            } else { // Need to read DB header which contains old filename
2054                atomic_store_uint64_t(&file->pos, offset);
2055                status = _filemgr_read_header(file, NULL);
2056                if (status != FDB_RESULT_SUCCESS) {
2057                    if (!destroy_file_set) { // top level or non-recursive call
2058                        hash_free(destroy_set);
2059                    }
2060                    file->ops->close(file->fd);
2061                    return status;
2062                }
2063                if (file->header.data) {
2064                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
2065                                                     file->header.data + 64);
2066                    uint16_t new_filename_len =
2067                                      _endian_decode(*new_filename_len_ptr);
2068                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
2069                                                     file->header.data + 66);
2070                    uint16_t old_filename_len =
2071                                      _endian_decode(*old_filename_len_ptr);
2072                    old_filename = (char *)file->header.data + 68
2073                                   + new_filename_len;
2074                    if (old_filename_len) {
2075                        status = filemgr_destroy_file(old_filename, config,
2076                                                      destroy_set);
2077                    }
2078                    free(file->header.data);
2079                }
2080                file->ops->close(file->fd);
2081                if (status == FDB_RESULT_SUCCESS) {
2082                    if (filemgr_does_file_exist(filename)
2083                                               == FDB_RESULT_SUCCESS) {
2084                        if (remove(filename)) {
2085                            status = FDB_RESULT_FILE_REMOVE_FAIL;
2086                        }
2087                    }
2088                }
2089            }
2090        }
2091    }
2092
2093    if (!destroy_file_set) { // top level or non-recursive call
2094        hash_free(destroy_set);
2095    }
2096
2097    return status;
2098}
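
/*
 * Illustrative usage sketch (assumption, not original code): a top-level call
 * passes NULL for destroy_file_set, so the function builds its own hash of
 * visited filenames and recurses through every old_filename recorded in the
 * on-disk DB headers. `filename` and `config` are assumed to come from the
 * caller; filemgr_openlock must be held, as noted above.
 *
 *     spin_lock(&filemgr_openlock);
 *     fdb_status s = filemgr_destroy_file(filename, &config, NULL);
 *     spin_unlock(&filemgr_openlock);
 *     // s == FDB_RESULT_FILE_IS_BUSY if some version is still referenced.
 */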
2099
2100bool filemgr_is_rollback_on(struct filemgr *file)
2101{
2102    bool rv;
2103    spin_lock(&file->lock);
2104    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2105    spin_unlock(&file->lock);
2106    return rv;
2107}
2108
2109void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2110{
2111    spin_lock(&file->lock);
2112    if (new_val) {
2113        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2114    } else {
2115        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2116    }
2117    spin_unlock(&file->lock);
2118}
2119
2120void filemgr_set_in_place_compaction(struct filemgr *file,
2121                                     bool in_place_compaction) {
2122    spin_lock(&file->lock);
2123    file->in_place_compaction = in_place_compaction;
2124    spin_unlock(&file->lock);
2125}
2126
2127bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2128{
2129    bool ret = false;
2130    spin_lock(&file->lock);
2131    ret = file->in_place_compaction;
2132    spin_unlock(&file->lock);
2133    return ret;
2134}
2135
2136void filemgr_mutex_openlock(struct filemgr_config *config)
2137{
2138    filemgr_init(config);
2139
2140    spin_lock(&filemgr_openlock);
2141}
2142
2143void filemgr_mutex_openunlock(void)
2144{
2145    spin_unlock(&filemgr_openlock);
2146}
2147
2148void filemgr_mutex_lock(struct filemgr *file)
2149{
2150    mutex_lock(&file->writer_lock.mutex);
2151    file->writer_lock.locked = true;
2152}
2153
2154bool filemgr_mutex_trylock(struct filemgr *file) {
2155    if (mutex_trylock(&file->writer_lock.mutex)) {
2156        file->writer_lock.locked = true;
2157        return true;
2158    }
2159    return false;
2160}
2161
2162void filemgr_mutex_unlock(struct filemgr *file)
2163{
2164    if (file->writer_lock.locked) {
2165        file->writer_lock.locked = false;
2166        mutex_unlock(&file->writer_lock.mutex);
2167    }
2168}
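
/*
 * Illustrative usage sketch (assumption): the writer lock is a plain mutex
 * plus a `locked` flag, so filemgr_mutex_unlock() is a no-op when the caller
 * did not actually acquire the lock.
 *
 *     if (filemgr_mutex_trylock(file)) {
 *         // ... single-writer work on `file` ...
 *         filemgr_mutex_unlock(file);
 *     } else {
 *         // another writer holds the lock; retry or fall back
 *     }
 */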
2169
2170void filemgr_set_dirty_root(struct filemgr *file,
2171                            bid_t dirty_idtree_root,
2172                            bid_t dirty_seqtree_root)
2173{
2174    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
2175    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
2176}
2177
2178bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2179{
2180    uint8_t marker[BLK_MARKER_SIZE];
2181    filemgr_magic_t magic;
2182    marker[0] = *(((uint8_t *)head_buffer)
2183                 + blocksize - BLK_MARKER_SIZE);
2184    if (marker[0] != BLK_MARKER_DBHEADER) {
2185        return false;
2186    }
2187
2188    memcpy(&magic, (uint8_t *) head_buffer
2189            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2190    magic = _endian_decode(magic);
2191
2192    return (magic == FILEMGR_MAGIC);
2193}
2194
2195void _kvs_stat_set(struct filemgr *file,
2196                   fdb_kvs_id_t kv_id,
2197                   struct kvs_stat stat)
2198{
2199    if (kv_id == 0) {
2200        spin_lock(&file->lock);
2201        file->header.stat = stat;
2202        spin_unlock(&file->lock);
2203    } else {
2204        struct avl_node *a;
2205        struct kvs_node query, *node;
2206        struct kvs_header *kv_header = file->kv_header;
2207
2208        spin_lock(&kv_header->lock);
2209        query.id = kv_id;
2210        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2211        if (a) {
2212            node = _get_entry(a, struct kvs_node, avl_id);
2213            node->stat = stat;
2214        }
2215        spin_unlock(&kv_header->lock);
2216    }
2217}
2218
2219void _kvs_stat_update_attr(struct filemgr *file,
2220                           fdb_kvs_id_t kv_id,
2221                           kvs_stat_attr_t attr,
2222                           int delta)
2223{
2224    spin_t *lock = NULL;
2225    struct kvs_stat *stat;
2226
2227    if (kv_id == 0) {
2228        stat = &file->header.stat;
2229        lock = &file->lock;
2230        spin_lock(lock);
2231    } else {
2232        struct avl_node *a;
2233        struct kvs_node query, *node;
2234        struct kvs_header *kv_header = file->kv_header;
2235
2236        lock = &kv_header->lock;
2237        spin_lock(lock);
2238        query.id = kv_id;
2239        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2240        if (!a) {
2241            // KV instance corresponding to the kv_id is already removed
2242            spin_unlock(lock);
2243            return;
2244        }
2245        node = _get_entry(a, struct kvs_node, avl_id);
2246        stat = &node->stat;
2247    }
2248
2249    if (attr == KVS_STAT_DATASIZE) {
2250        stat->datasize += delta;
2251    } else if (attr == KVS_STAT_NDOCS) {
2252        stat->ndocs += delta;
2253    } else if (attr == KVS_STAT_NLIVENODES) {
2254        stat->nlivenodes += delta;
2255    } else if (attr == KVS_STAT_WAL_NDELETES) {
2256        stat->wal_ndeletes += delta;
2257    } else if (attr == KVS_STAT_WAL_NDOCS) {
2258        stat->wal_ndocs += delta;
2259    }
2260    spin_unlock(lock);
2261}
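
/*
 * Illustrative usage sketch (assumption): per-KVS stats are adjusted with
 * signed deltas. For example, after inserting one document of `doclen` bytes
 * into KV store `kv_id` (both assumed to exist in the caller):
 *
 *     _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, 1);
 *     _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, (int)doclen);
 *     // and when that document is later purged:
 *     _kvs_stat_update_attr(file, kv_id, KVS_STAT_NDOCS, -1);
 *     _kvs_stat_update_attr(file, kv_id, KVS_STAT_DATASIZE, -(int)doclen);
 */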
2262
2263int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2264                            fdb_kvs_id_t kv_id,
2265                            struct kvs_stat *stat)
2266{
2267    int ret = 0;
2268    struct avl_node *a;
2269    struct kvs_node query, *node;
2270
2271    query.id = kv_id;
2272    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2273    if (a) {
2274        node = _get_entry(a, struct kvs_node, avl_id);
2275        *stat = node->stat;
2276    } else {
2277        ret = -1;
2278    }
2279    return ret;
2280}
2281
2282int _kvs_stat_get(struct filemgr *file,
2283                  fdb_kvs_id_t kv_id,
2284                  struct kvs_stat *stat)
2285{
2286    int ret = 0;
2287
2288    if (kv_id == 0) {
2289        spin_lock(&file->lock);
2290        *stat = file->header.stat;
2291        spin_unlock(&file->lock);
2292    } else {
2293        struct kvs_header *kv_header = file->kv_header;
2294
2295        spin_lock(&kv_header->lock);
2296        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2297        spin_unlock(&kv_header->lock);
2298    }
2299
2300    return ret;
2301}
2302
2303uint64_t _kvs_stat_get_sum(struct filemgr *file,
2304                           kvs_stat_attr_t attr)
2305{
2306    struct avl_node *a;
2307    struct kvs_node *node;
2308    struct kvs_header *kv_header = file->kv_header;
2309
2310    uint64_t ret = 0;
2311    spin_lock(&file->lock);
2312    if (attr == KVS_STAT_DATASIZE) {
2313        ret += file->header.stat.datasize;
2314    } else if (attr == KVS_STAT_NDOCS) {
2315        ret += file->header.stat.ndocs;
2316    } else if (attr == KVS_STAT_NLIVENODES) {
2317        ret += file->header.stat.nlivenodes;
2318    } else if (attr == KVS_STAT_WAL_NDELETES) {
2319        ret += file->header.stat.wal_ndeletes;
2320    } else if (attr == KVS_STAT_WAL_NDOCS) {
2321        ret += file->header.stat.wal_ndocs;
2322    }
2323    spin_unlock(&file->lock);
2324
2325    if (kv_header) {
2326        spin_lock(&kv_header->lock);
2327        a = avl_first(kv_header->idx_id);
2328        while (a) {
2329            node = _get_entry(a, struct kvs_node, avl_id);
2330            a = avl_next(&node->avl_id);
2331
2332            if (attr == KVS_STAT_DATASIZE) {
2333                ret += node->stat.datasize;
2334            } else if (attr == KVS_STAT_NDOCS) {
2335                ret += node->stat.ndocs;
2336            } else if (attr == KVS_STAT_NLIVENODES) {
2337                ret += node->stat.nlivenodes;
2338            } else if (attr == KVS_STAT_WAL_NDELETES) {
2339                ret += node->stat.wal_ndeletes;
2340            } else if (attr == KVS_STAT_WAL_NDOCS) {
2341                ret += node->stat.wal_ndocs;
2342            }
2343        }
2344        spin_unlock(&kv_header->lock);
2345    }
2346
2347    return ret;
2348}
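
/*
 * Illustrative usage sketch (assumption): the sum covers the default KV store
 * (file->header.stat) plus every named KV store registered in kv_header:
 *
 *     uint64_t total_datasize = _kvs_stat_get_sum(file, KVS_STAT_DATASIZE);
 *     uint64_t total_docs     = _kvs_stat_get_sum(file, KVS_STAT_NDOCS);
 */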
2349
2350int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2351                                fdb_kvs_id_t kv_id,
2352                                struct kvs_ops_stat *stat)
2353{
2354    int ret = 0;
2355    struct avl_node *a;
2356    struct kvs_node query, *node;
2357
2358    query.id = kv_id;
2359    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2360    if (a) {
2361        node = _get_entry(a, struct kvs_node, avl_id);
2362        *stat = node->op_stat;
2363    } else {
2364        ret = -1;
2365    }
2366    return ret;
2367}
2368
2369int _kvs_ops_stat_get(struct filemgr *file,
2370                      fdb_kvs_id_t kv_id,
2371                      struct kvs_ops_stat *stat)
2372{
2373    int ret = 0;
2374
2375    if (kv_id == 0) {
2376        spin_lock(&file->lock);
2377        *stat = file->header.op_stat;
2378        spin_unlock(&file->lock);
2379    } else {
2380        struct kvs_header *kv_header = file->kv_header;
2381
2382        spin_lock(&kv_header->lock);
2383        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2384        spin_unlock(&kv_header->lock);
2385    }
2386
2387    return ret;
2388}
2389
2390void _init_op_stats(struct kvs_ops_stat *stat) {
2391    atomic_init_uint64_t(&stat->num_sets, 0);
2392    atomic_init_uint64_t(&stat->num_dels, 0);
2393    atomic_init_uint64_t(&stat->num_commits, 0);
2394    atomic_init_uint64_t(&stat->num_compacts, 0);
2395    atomic_init_uint64_t(&stat->num_gets, 0);
2396    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
2397    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
2398}
2399
2400struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2401                                           struct kvs_info *kvs)
2402{
2403    struct kvs_ops_stat *stat = NULL;
2404    if (!kvs || kvs->id == 0) {
2405        return &file->header.op_stat;
2406    } else {
2407        struct kvs_header *kv_header = file->kv_header;
2408        struct avl_node *a;
2409        struct kvs_node query, *node;
2410        spin_lock(&kv_header->lock);
2411        query.id = kvs->id;
2412        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2413        if (a) {
2414            node = _get_entry(a, struct kvs_node, avl_id);
2415            stat = &node->op_stat;
2416        }
2417        spin_unlock(&kv_header->lock);
2418    }
2419    return stat;
2420}
2421
2422void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2423{
2424    size_t size_id = sizeof(fdb_kvs_id_t);
2425    fdb_kvs_id_t temp;
2426
2427    if (chunksize == size_id) {
2428        temp = *((fdb_kvs_id_t*)buf);
2429    } else if (chunksize < size_id) {
2430        temp = 0;
2431        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2432    } else { // chunksize > sizeof(fdb_kvs_id_t)
2433        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2434    }
2435    *id = _endian_decode(temp);
2436}
2437
2438void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2439{
2440    size_t size_id = sizeof(fdb_kvs_id_t);
2441    id = _endian_encode(id);
2442
2443    if (chunksize == size_id) {
2444        memcpy(buf, &id, size_id);
2445    } else if (chunksize < size_id) {
2446        memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2447    } else { // chunksize > sizeof(fdb_kvs_id_t)
2448        memset(buf, 0x0, chunksize - size_id);
2449        memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2450    }
2451}
2452
2453void buf2buf(size_t chunksize_src, void *buf_src,
2454             size_t chunksize_dst, void *buf_dst)
2455{
2456    if (chunksize_dst == chunksize_src) {
2457        memcpy(buf_dst, buf_src, chunksize_src);
2458    } else if (chunksize_dst < chunksize_src) {
2459        memcpy(buf_dst, (uint8_t*)buf_src + (chunksize_src - chunksize_dst),
2460               chunksize_dst);
2461    } else { // chunksize_dst > chunksize_src
2462        memset(buf_dst, 0x0, chunksize_dst - chunksize_src);
2463        memcpy((uint8_t*)buf_dst + (chunksize_dst - chunksize_src),
2464               buf_src, chunksize_src);
2465    }
2466}
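
/*
 * Illustrative round-trip sketch (assumption): with an 8-byte fdb_kvs_id_t and
 * a 4-byte key chunk, kvid2buf() keeps only the significant (low-order) bytes
 * of the encoded id, and buf2kvid() zero-extends them back, so ids that fit in
 * the chunk survive the round trip:
 *
 *     uint8_t chunk[4];
 *     fdb_kvs_id_t out, id = 0x1234;
 *     kvid2buf(sizeof(chunk), id, chunk);    // pack id into a 4-byte chunk
 *     buf2kvid(sizeof(chunk), chunk, &out);  // out == 0x1234 again
 */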
2467