xref: /4.0.0/forestdb/src/filemgr.cc (revision e5d8350a)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36
37#include "memleak.h"
38
39#ifdef __DEBUG
40#ifndef __DEBUG_FILEMGR
41    #undef DBG
42    #undef DBGCMD
43    #undef DBGSW
44    #define DBG(...)
45    #define DBGCMD(...)
46    #define DBGSW(n, ...)
47#endif
48#endif
49
// NBUCKET must be power of 2
#define NBUCKET (1024)
#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))

// global static variables
#ifdef SPIN_INITIALIZER
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Platforms without SPIN_INITIALIZER (Windows): 'initial_lock' is set up
// lazily. Status values: 0 = uninitialized, 1 = being initialized,
// 2 = ready (see filemgr_init).
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif


static volatile uint8_t filemgr_initialized = 0;
static struct filemgr_config global_config;  // copied from the first filemgr_init() call
static struct hash hash;                     // all open files, keyed by filename
static spin_t filemgr_openlock;              // guards 'hash' and the open path

// Node for the pool of reusable, sector-aligned temporary buffers.
struct temp_buf_item{
    void *addr;           // start of the aligned allocation containing this node
    struct list_elem le;  // linkage into 'temp_buf'
};
static struct list temp_buf;   // free-list of temp buffers
static spin_t temp_buf_lock;   // guards 'temp_buf'

static void _filemgr_free_func(struct hash_elem *h);
76
// Type-erased adapters over the spinlock/mutex primitives so that they can be
// installed as 'struct plock_ops' function pointers (see filemgr_open).
static void spin_init_wrap(void *lock) {
    spin_init((spin_t*)lock);
}

static void spin_destroy_wrap(void *lock) {
    spin_destroy((spin_t*)lock);
}

static void spin_lock_wrap(void *lock) {
    spin_lock((spin_t*)lock);
}

static void spin_unlock_wrap(void *lock) {
    spin_unlock((spin_t*)lock);
}

static void mutex_init_wrap(void *lock) {
    mutex_init((mutex_t*)lock);
}

static void mutex_destroy_wrap(void *lock) {
    mutex_destroy((mutex_t*)lock);
}

static void mutex_lock_wrap(void *lock) {
    mutex_lock((mutex_t*)lock);
}

static void mutex_unlock_wrap(void *lock) {
    mutex_unlock((mutex_t*)lock);
}
108
109static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
110{
111    struct kvs_node *aa, *bb;
112    aa = _get_entry(a, struct kvs_node, avl_id);
113    bb = _get_entry(b, struct kvs_node, avl_id);
114
115    if (aa->id < bb->id) {
116        return -1;
117    } else if (aa->id > bb->id) {
118        return 1;
119    } else {
120        return 0;
121    }
122}
123
124static int _block_is_overlapped(void *pbid1, void *pis_writer1,
125                                void *pbid2, void *pis_writer2,
126                                void *aux)
127{
128    (void)aux;
129    bid_t bid1, is_writer1, bid2, is_writer2;
130    bid1 = *(bid_t*)pbid1;
131    is_writer1 = *(bid_t*)pis_writer1;
132    bid2 = *(bid_t*)pbid2;
133    is_writer2 = *(bid_t*)pis_writer2;
134
135    if (bid1 != bid2) {
136        // not overlapped
137        return 0;
138    } else {
139        // overlapped
140        if (!is_writer1 && !is_writer2) {
141            // both are readers
142            return 0;
143        } else {
144            return 1;
145        }
146    }
147}
148
149fdb_status fdb_log(err_log_callback *log_callback,
150                   fdb_status status,
151                   const char *format, ...)
152{
153    if (log_callback && log_callback->callback) {
154        char msg[1024];
155        va_list args;
156        va_start(args, format);
157        vsprintf(msg, format, args);
158        va_end(args);
159        log_callback->callback(status, msg, log_callback->ctx_data);
160    }
161    return status;
162}
163
164static void _log_errno_str(struct filemgr_ops *ops,
165                           err_log_callback *log_callback,
166                           fdb_status io_error,
167                           const char *what,
168                           const char *filename)
169{
170    if (io_error < 0) {
171        char errno_msg[512];
172        ops->get_errno_str(errno_msg, 512);
173        fdb_log(log_callback, io_error,
174                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
175    }
176}
177
178static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
179{
180    struct filemgr *file = _get_entry(e, struct filemgr, e);
181    int len = strlen(file->filename);
182    return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
183}
184
185static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
186{
187    struct filemgr *aa, *bb;
188    aa = _get_entry(a, struct filemgr, e);
189    bb = _get_entry(b, struct filemgr, e);
190    return strcmp(aa->filename, bb->filename);
191}
192
// Process-wide one-time initialization of the file manager: captures the
// global config, sets up the block cache, the open-file hash table, the
// temp-buffer pool, and the global locks. Safe to call from multiple
// threads; only the first caller performs the work.
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        // Double-checked locking: re-test the flag under the lock so only
        // one thread runs the initialization body.
        spin_lock(&initial_lock);
        if (!filemgr_initialized) {
            global_config = *config;

            // ncacheblock <= 0 means the block cache is disabled entirely.
            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
234
235static void * _filemgr_get_temp_buf()
236{
237    struct list_elem *e;
238    struct temp_buf_item *item;
239
240    spin_lock(&temp_buf_lock);
241    e = list_pop_front(&temp_buf);
242    if (e) {
243        item = _get_entry(e, struct temp_buf_item, le);
244    } else {
245        void *addr;
246
247        malloc_align(addr, FDB_SECTOR_SIZE,
248                     global_config.blocksize + sizeof(struct temp_buf_item));
249
250        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
251        item->addr = addr;
252    }
253    spin_unlock(&temp_buf_lock);
254
255    return item->addr;
256}
257
258static void _filemgr_release_temp_buf(void *buf)
259{
260    struct temp_buf_item *item;
261
262    spin_lock(&temp_buf_lock);
263    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
264    list_push_front(&temp_buf, &item->le);
265    spin_unlock(&temp_buf_lock);
266}
267
268static void _filemgr_shutdown_temp_buf()
269{
270    struct list_elem *e;
271    struct temp_buf_item *item;
272    size_t count=0;
273
274    spin_lock(&temp_buf_lock);
275    e = list_begin(&temp_buf);
276    while(e){
277        item = _get_entry(e, struct temp_buf_item, le);
278        e = list_remove(&temp_buf, e);
279        free_align(item->addr);
280        count++;
281    }
282    spin_unlock(&temp_buf_lock);
283}
284
// Scan backwards from the end of 'file' for the most recent valid DB header
// block and load it into file->header. While scanning, file->pos and
// file->last_commit are rewound past any partially written or corrupt
// trailing blocks left by a crash.
// Returns FDB_RESULT_SUCCESS when a header is found; otherwise the last
// failure status, with file->header reset to an empty state.
static fdb_status _filemgr_read_header(struct filemgr *file,
                                       err_log_callback *log_callback)
{
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_magic_t magic;
    filemgr_header_len_t len;
    uint8_t *buf;
    uint32_t crc, crc_file;
    fdb_status status = FDB_RESULT_SUCCESS;

    // get temp buffer
    buf = (uint8_t *) _filemgr_get_temp_buf();

    if (atomic_get_uint64_t(&file->pos) > 0) {
        // Crash Recovery Test 1: unaligned last block write
        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
        if (remain) {
            // Discard the trailing partial block left by a crash.
            atomic_sub_uint64_t(&file->pos, remain);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
                "from a database file '%s'\n";
            DBG(msg, remain, file->filename);
            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
                    msg, remain, file->filename);
        }

        size_t block_counter = 0;
        do {
            // Read the current last block of the file.
            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
                atomic_get_uint64_t(&file->pos) - file->blocksize);
            if (rv != file->blocksize) {
                status = FDB_RESULT_READ_FAIL;
                const char *msg = "Unable to read a database file '%s' with "
                    "blocksize %" _F64 "\n";
                DBG(msg, file->filename, file->blocksize);
                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
                break;
            }
            ++block_counter;
            // The last BLK_MARKER_SIZE bytes of a block identify its type.
            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
                   BLK_MARKER_SIZE);

            if (marker[0] == BLK_MARKER_DBHEADER) {
                // possible need for byte conversions here
                memcpy(&magic,
                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
                       sizeof(magic));
                magic = _endian_decode(magic);

                if (magic == FILEMGR_MAGIC) {
                    // Header length is stored just before the magic.
                    memcpy(&len,
                           buf + file->blocksize - BLK_MARKER_SIZE -
                           sizeof(magic) - sizeof(len),
                           sizeof(len));
                    len = _endian_decode(len);

                    // The last 4 bytes of the header payload hold its CRC.
                    crc = chksum(buf, len - sizeof(crc));
                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
                    crc_file = _endian_decode(crc_file);
                    if (crc == crc_file) {
                        // Valid header: cache the payload plus the revnum and
                        // seqnum stored immediately after it.
                        // NOTE(review): malloc result is not checked here.
                        file->header.data = (void *)malloc(len);

                        memcpy(file->header.data, buf, len);
                        memcpy(&file->header.revnum, buf + len,
                               sizeof(filemgr_header_revnum_t));
                        memcpy((void *) &file->header.seqnum,
                                buf + len + sizeof(filemgr_header_revnum_t),
                                sizeof(fdb_seqnum_t));
                        file->header.revnum =
                            _endian_decode(file->header.revnum);
                        file->header.seqnum =
                            _endian_decode(file->header.seqnum);
                        file->header.size = len;
                        atomic_store_uint64_t(&file->header.bid,
                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
                                              BLK_NOT_FOUND);
                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
                                              BLK_NOT_FOUND);
                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));

                        // release temp buffer
                        _filemgr_release_temp_buf(buf);

                        return FDB_RESULT_SUCCESS;
                    } else {
                        status = FDB_RESULT_CHECKSUM_ERROR;
                        const char *msg = "Crash Detected: CRC on disk %u != %u "
                            "in a database file '%s'\n";
                        DBG(msg, crc_file, crc, file->filename);
                        fdb_log(log_callback, status, msg, crc_file, crc,
                                file->filename);
                    }
                } else {
                    status = FDB_RESULT_FILE_CORRUPTION;
                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
                        " in a database file '%s'\n";
                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
                            file->filename);
                }
            } else {
                status = FDB_RESULT_NO_DB_HEADERS;
                // Only report on the very last block; earlier non-header
                // blocks are expected while scanning backwards.
                if (block_counter == 1) {
                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
                                      "in a database file '%s'\n";
                    DBG(msg, marker[0], file->filename);
                    fdb_log(log_callback, status, msg, marker[0], file->filename);
                }
            }

            // This block was not a valid header: rewind one block and retry.
            atomic_sub_uint64_t(&file->pos, file->blocksize);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
        } while (atomic_get_uint64_t(&file->pos));
    }

    // release temp buffer
    _filemgr_release_temp_buf(buf);

    // No valid header anywhere in the file: reset to an empty header.
    file->header.size = 0;
    file->header.revnum = 0;
    file->header.seqnum = 0;
    file->header.data = NULL;
    atomic_store_uint64_t(&file->header.bid, 0);
    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
    return status;
}
414
415size_t filemgr_get_ref_count(struct filemgr *file)
416{
417    size_t ret = 0;
418    spin_lock(&file->lock);
419    ret = file->ref_count;
420    spin_unlock(&file->lock);
421    return ret;
422}
423
424uint64_t filemgr_get_bcache_used_space(void)
425{
426    uint64_t bcache_free_space = 0;
427    if (global_config.ncacheblock) { // If buffer cache is indeed configured
428        bcache_free_space = bcache_get_num_free_blocks();
429        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
430                          * global_config.blocksize;
431    }
432    return bcache_free_space;
433}
434
// Arguments handed to the prefetch thread. Allocated by filemgr_prefetch
// and freed by _filemgr_prefetch_thread on exit.
struct filemgr_prefetch_args {
    struct filemgr *file;
    uint64_t duration;               // time budget in seconds (compared to gap.tv_sec)
    err_log_callback *log_callback;
    void *aux;                       // unused here; reserved for callers
};
441
// Prefetch thread entry point: walks the file backwards from the last commit
// position in FILEMGR_PREFETCH_UNIT-sized chunks, pulling each block into
// the block cache via filemgr_read. Stops when aborted, when the time budget
// expires, when cache space runs low, on a read failure, or at the start of
// the file. Frees 'voidargs' (a filemgr_prefetch_args) before returning.
static void *_filemgr_prefetch_thread(void *voidargs)
{
    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
    uint8_t *buf = alca(uint8_t, args->file->blocksize);
    uint64_t cur_pos = 0, i;
    uint64_t bcache_free_space;
    bid_t bid;
    bool terminate = false;
    struct timeval begin, cur, gap;

    spin_lock(&args->file->lock);
    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
    spin_unlock(&args->file->lock);
    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
        // File smaller than one prefetch unit: nothing to do.
        terminate = true;
    } else {
        cur_pos -= FILEMGR_PREFETCH_UNIT;
    }
    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
    gettimeofday(&begin, NULL);
    while (!terminate) {
        for (i = cur_pos;
             i < cur_pos + FILEMGR_PREFETCH_UNIT;
             i += args->file->blocksize) {

            gettimeofday(&cur, NULL);
            gap = _utime_gap(begin, cur);
            bcache_free_space = bcache_get_num_free_blocks();
            bcache_free_space *= args->file->blocksize;

            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
                gap.tv_sec >= (int64_t)args->duration ||
                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
                // terminate thread when
                // 1. got abort signal
                // 2. time out
                // 3. not enough free space in block cache
                terminate = true;
                break;
            } else {
                bid = i / args->file->blocksize;
                if (filemgr_read(args->file, bid, buf, NULL, true)
                        != FDB_RESULT_SUCCESS) {
                    // 4. read failure
                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
                            "Prefetch thread failed to read a block with block id %" _F64
                            " from a database file '%s'", bid, args->file->filename);
                    terminate = true;
                    break;
                }
            }
        }

        // Step one unit towards the beginning of the file.
        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
            cur_pos -= FILEMGR_PREFETCH_UNIT;
        } else {
            // remaining space is less than FILEMGR_PREFETCH_UNIT
            terminate = true;
        }
    }

    // Mark idle and release the argument struct allocated by filemgr_prefetch.
    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
    free(args);
    return NULL;
}
507
508// prefetch the given DB file
509void filemgr_prefetch(struct filemgr *file,
510                      struct filemgr_config *config,
511                      err_log_callback *log_callback)
512{
513    uint64_t bcache_free_space;
514
515    bcache_free_space = bcache_get_num_free_blocks();
516    bcache_free_space *= file->blocksize;
517
518    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
519    spin_lock(&file->lock);
520    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
521        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
522        // invoke prefetch thread
523        struct filemgr_prefetch_args *args;
524        args = (struct filemgr_prefetch_args *)
525               calloc(1, sizeof(struct filemgr_prefetch_args));
526        args->file = file;
527        args->duration = config->prefetch_duration;
528        args->log_callback = log_callback;
529
530        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
531        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
532    }
533    spin_unlock(&file->lock);
534}
535
536fdb_status filemgr_does_file_exist(char *filename) {
537    struct filemgr_ops *ops = get_filemgr_ops();
538    int fd = ops->open(filename, O_RDONLY, 0444);
539    if (fd < 0) {
540        return (fdb_status) fd;
541    }
542    ops->close(fd);
543    return FDB_RESULT_SUCCESS;
544}
545
// Open (or re-open, or attach to) the filemgr instance for 'filename'.
// If the file is already in the global hash, its ref_count is bumped and the
// existing instance is returned — re-opening the underlying fd first if it
// had been closed. Otherwise a new instance is created: the fd is opened,
// the latest DB header is recovered, locks/WAL/global transaction are set
// up, and the instance is published in the hash.
// On failure, result.rv carries the error and result.file is NULL.
filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
                                 struct filemgr_config *config,
                                 err_log_callback *log_callback)
{
    struct filemgr *file = NULL;
    struct filemgr query;
    struct hash_elem *e = NULL;
    bool create = config->options & FILEMGR_CREATE;
    int file_flag = 0x0;
    int fd = -1;
    fdb_status status;
    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};

    filemgr_init(config);

    // check whether file is already opened or not
    query.filename = filename;
    spin_lock(&filemgr_openlock);
    e = hash_find(&hash, &query.e);

    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        file->ref_count++;

        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
            file_flag = O_RDWR;
            if (create) {
                file_flag |= O_CREAT;
            }
            *file->config = *config;
            file->config->blocksize = global_config.blocksize;
            file->config->ncacheblock = global_config.ncacheblock;
            file_flag |= config->flag;
            file->fd = file->ops->open(file->filename, file_flag, 0666);
            if (file->fd < 0) {
                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
                    // A database file was manually deleted by the user.
                    // Clean up global hash table, WAL index, and buffer cache.
                    // Then, retry it with a create option below IFF it is not
                    // a read-only open attempt
                    struct hash_elem *ret;
                    spin_unlock(&file->lock);
                    ret = hash_remove(&hash, &file->e);
                    fdb_assert(ret, 0, 0);
                    _filemgr_free_func(&file->e);
                    if (!create) {
                        _log_errno_str(ops, log_callback,
                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
                        spin_unlock(&filemgr_openlock);
                        result.rv = FDB_RESULT_NO_SUCH_FILE;
                        return result;
                    }
                    // else: fall through to the fresh-open path below
                } else {
                    _log_errno_str(file->ops, log_callback,
                                  (fdb_status)file->fd, "OPEN", filename);
                    file->ref_count--;
                    spin_unlock(&file->lock);
                    spin_unlock(&filemgr_openlock);
                    result.rv = file->fd;
                    return result;
                }
            } else { // Reopening the closed file is succeed.
                atomic_store_uint8_t(&file->status, FILE_NORMAL);
                if (config->options & FILEMGR_SYNC) {
                    file->fflags |= FILEMGR_SYNC;
                } else {
                    file->fflags &= ~FILEMGR_SYNC;
                }
                spin_unlock(&file->lock);
                spin_unlock(&filemgr_openlock);
                result.file = file;
                result.rv = FDB_RESULT_SUCCESS;
                return result;
            }
        } else { // file is already opened.

            if (config->options & FILEMGR_SYNC) {
                file->fflags |= FILEMGR_SYNC;
            } else {
                file->fflags &= ~FILEMGR_SYNC;
            }

            spin_unlock(&file->lock);
            spin_unlock(&filemgr_openlock);
            result.file = file;
            result.rv = FDB_RESULT_SUCCESS;
            return result;
        }
    }

    // Fresh open: build a brand-new filemgr instance for this filename.
    file_flag = O_RDWR;
    if (create) {
        file_flag |= O_CREAT;
    }
    file_flag |= config->flag;
    fd = ops->open(filename, file_flag, 0666);
    if (fd < 0) {
        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
        spin_unlock(&filemgr_openlock);
        result.rv = fd;
        return result;
    }
    // NOTE(review): the calloc/malloc results below are not checked.
    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
    file->filename_len = strlen(filename);
    file->filename = (char*)malloc(file->filename_len + 1);
    strcpy(file->filename, filename);

    file->ref_count = 1;

    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
    file->wal->flag = 0;

    file->ops = ops;
    file->blocksize = global_config.blocksize;
    atomic_init_uint8_t(&file->status, FILE_NORMAL);
    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
    *file->config = *config;
    file->config->blocksize = global_config.blocksize;
    file->config->ncacheblock = global_config.ncacheblock;
    file->new_file = NULL;
    file->old_filename = NULL;
    file->fd = fd;

    // Current file size initializes both 'pos' and 'last_commit'.
    cs_off_t offset = file->ops->goto_eof(file->fd);
    if (offset == FDB_RESULT_SEEK_FAIL) {
        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = FDB_RESULT_SEEK_FAIL;
        return result;
    }
    atomic_init_uint64_t(&file->last_commit, offset);
    atomic_init_uint64_t(&file->pos, offset);

    file->bcache = NULL;
    file->in_place_compaction = false;
    file->kv_header = NULL;
    file->prefetch_status = FILEMGR_PREFETCH_IDLE;

    atomic_init_uint64_t(&file->header.bid, 0);
    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
    _init_op_stats(&file->header.op_stat);
    // Locate and load the most recent valid DB header (also rewinds 'pos'
    // past any crash-damaged trailing blocks).
    status = _filemgr_read_header(file, log_callback);
    if (status != FDB_RESULT_SUCCESS) {
        _log_errno_str(file->ops, log_callback, status, "READ", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = status;
        return result;
    }

    spin_init(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    // Per-block range locks backed by user mutexes + internal spinlocks.
    struct plock_ops pops;
    struct plock_config pconfig;

    pops.init_user = mutex_init_wrap;
    pops.lock_user = mutex_lock_wrap;
    pops.unlock_user = mutex_unlock_wrap;
    pops.destroy_user = mutex_destroy_wrap;
    pops.init_internal = spin_init_wrap;
    pops.lock_internal = spin_lock_wrap;
    pops.unlock_internal = spin_unlock_wrap;
    pops.destroy_internal = spin_destroy_wrap;
    pops.is_overlapped = _block_is_overlapped;

    memset(&pconfig, 0x0, sizeof(pconfig));
    pconfig.ops = &pops;
    pconfig.sizeof_lock_internal = sizeof(spin_t);
    pconfig.sizeof_lock_user = sizeof(mutex_t);
    pconfig.sizeof_range = sizeof(bid_t);
    pconfig.aux = NULL;
    plock_init(&file->plock, &pconfig);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_init(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_init(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_init(&file->writer_lock.mutex);
    file->writer_lock.locked = false;

    // initialize WAL
    if (!wal_is_initialized(file)) {
        wal_init(file, FDB_WAL_NBUCKET);
    }

    // init global transaction for the file
    file->global_txn.wrapper = (struct wal_txn_wrapper*)
                               malloc(sizeof(struct wal_txn_wrapper));
    file->global_txn.wrapper->txn = &file->global_txn;
    file->global_txn.handle = NULL;
    if (atomic_get_uint64_t(&file->pos)) {
        // bid of the block holding the most recent header
        // (same formula as header.bid in _filemgr_read_header)
        file->global_txn.prev_hdr_bid =
            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
    } else {
        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
    }
    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
    list_init(file->global_txn.items);
    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
    wal_add_transaction(file, &file->global_txn);

    // Publish the new instance and optionally start background prefetch.
    hash_insert(&hash, &file->e);
    if (config->prefetch_duration > 0) {
        filemgr_prefetch(file, config, log_callback);
    }
    spin_unlock(&filemgr_openlock);

    if (config->options & FILEMGR_SYNC) {
        file->fflags |= FILEMGR_SYNC;
    } else {
        file->fflags &= ~FILEMGR_SYNC;
    }

    result.file = file;
    result.rv = FDB_RESULT_SUCCESS;
    return result;
}
784
/**
 * Replace the in-memory copy of the latest DB header and bump its revision.
 *
 * @param file File whose cached header is updated (lock taken internally).
 * @param buf Serialized header bytes to copy in.
 * @param len Size of 'buf' in bytes.
 * @return The new header revision number.
 */
uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
{
    uint64_t ret;

    spin_lock(&file->lock);

    // Grow the cached header buffer if needed; it is reused across updates.
    // NOTE(review): malloc/realloc results are unchecked, and the
    // 'p = realloc(p, ...)' pattern leaks the old block on failure —
    // consider a temp-pointer realloc plus an explicit OOM policy.
    if (file->header.data == NULL) {
        file->header.data = (void *)malloc(len);
    }else if (file->header.size < len){
        file->header.data = (void *)realloc(file->header.data, len);
    }
    memcpy(file->header.data, buf, len);
    file->header.size = len;
    ++(file->header.revnum);
    ret = file->header.revnum;

    spin_unlock(&file->lock);

    return ret;
}
805
806filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
807{
808    filemgr_header_revnum_t ret;
809    spin_lock(&file->lock);
810    ret = file->header.revnum;
811    spin_unlock(&file->lock);
812    return ret;
813}
814
815// 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
816// 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
// Read the cached default-KVS sequence number. Not internally synchronized:
// the caller must hold the filemgr mutex (filemgr_mutex_lock/unlock).
fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
{
    return file->header.seqnum;
}
821
// Overwrite the cached default-KVS sequence number. Not internally
// synchronized: the caller must hold the filemgr mutex.
void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
{
    file->header.seqnum = seqnum;
}
826
827void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len)
828{
829    spin_lock(&file->lock);
830
831    if (file->header.size > 0) {
832        if (buf == NULL) {
833            buf = (void*)malloc(file->header.size);
834        }
835        memcpy(buf, file->header.data, file->header.size);
836    }
837    *len = file->header.size;
838
839    spin_unlock(&file->lock);
840
841    return buf;
842}
843
// Load the DB header stored in block 'bid' into 'buf', setting *len to the
// header length and, when 'seqnum' is non-NULL, *seqnum to the default KVS
// sequence number stored right after the header payload. The block marker
// and magic are validated before any data is copied out.
// NOTE(review): 'buf' is assumed large enough for the on-disk header length
// — confirm callers pass a blocksize-sized buffer.
fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    fdb_status status = FDB_RESULT_SUCCESS;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return FDB_RESULT_SUCCESS;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);

    if (status != FDB_RESULT_SUCCESS) {
        fdb_log(log_callback, status,
                "Failed to read a database header with block id %" _F64 " in "
                "a database file '%s'", bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return status;
    }
    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
            BLK_MARKER_SIZE);

    // Reject blocks not tagged as DB headers.
    if (marker[0] != BLK_MARKER_DBHEADER) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block marker of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    memcpy(&magic,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
            sizeof(magic));
    magic = _endian_decode(magic);
    if (magic != FILEMGR_MAGIC) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block magic value of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match FILEMGR_MAGIC!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Header length is stored just before the magic at the block's tail.
    memcpy(&hdr_len,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
            sizeof(hdr_len), sizeof(hdr_len));
    hdr_len = _endian_decode(hdr_len);

    memcpy(buf, _buf, hdr_len);
    *len = hdr_len;

    if (seqnum) {
        // copy default KVS's seqnum
        fdb_seqnum_t _seqnum;
        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(_seqnum));
        *seqnum = _endian_decode(_seqnum);
    }

    _filemgr_release_temp_buf(_buf);

    return status;
}
912
913uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
914                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
915                                   err_log_callback *log_callback)
916{
917    uint8_t *_buf;
918    uint8_t marker[BLK_MARKER_SIZE];
919    fdb_seqnum_t _seqnum;
920    filemgr_header_revnum_t _revnum;
921    filemgr_header_len_t hdr_len;
922    filemgr_magic_t magic;
923    bid_t _prev_bid, prev_bid;
924    int found = 0;
925
926    if (!bid || bid == BLK_NOT_FOUND) {
927        *len = 0; // No other header available
928        return bid;
929    }
930    _buf = (uint8_t *)_filemgr_get_temp_buf();
931
932    // Reverse scan the file for a previous DB header
933    do {
934        // Get prev_bid from the current header.
935        // Since the current header is already cached during the previous
936        // operation, no disk I/O will be triggered.
937        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
938                != FDB_RESULT_SUCCESS) {
939            break;
940        }
941
942        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
943               BLK_MARKER_SIZE);
944        memcpy(&magic,
945               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
946               sizeof(magic));
947        magic = _endian_decode(magic);
948
949        if (marker[0] != BLK_MARKER_DBHEADER ||
950            magic != FILEMGR_MAGIC) {
951            // not a header block
952            // this happens when this function is invoked between
953            // fdb_set() call and fdb_commit() call, so the last block
954            // in the file is not a header block
955            bid_t latest_hdr = filemgr_get_header_bid(file);
956            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
957                // get the latest header BID
958                bid = latest_hdr;
959            } else {
960                break;
961            }
962        } else {
963            memcpy(&_prev_bid,
964                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
965                       sizeof(hdr_len) - sizeof(_prev_bid),
966                   sizeof(_prev_bid));
967            prev_bid = _endian_decode(_prev_bid);
968            if (bid <= prev_bid) {
969                // no more prev header, or broken linked list
970                break;
971            }
972            bid = prev_bid;
973        }
974
975        // Read the prev header
976        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
977        if (fs != FDB_RESULT_SUCCESS) {
978            fdb_log(log_callback, fs,
979                    "Failed to read a previous database header with block id %" _F64 " in "
980                    "a database file '%s'", bid, file->filename);
981            break;
982        }
983
984        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
985               BLK_MARKER_SIZE);
986        if (marker[0] != BLK_MARKER_DBHEADER) {
987            if (bid) {
988                // broken linked list
989                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
990                        "A block marker of the previous database header block id %"
991                        _F64 " in "
992                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
993                        bid, file->filename);
994            }
995            break;
996        }
997
998        memcpy(&magic,
999               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1000               sizeof(magic));
1001        magic = _endian_decode(magic);
1002        if (magic != FILEMGR_MAGIC) {
1003            // broken linked list
1004            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1005                    "A block magic value of the previous database header block id %" _F64 " in "
1006                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
1007                    bid, file->filename);
1008            break;
1009        }
1010
1011        memcpy(&hdr_len,
1012               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1013               sizeof(hdr_len), sizeof(hdr_len));
1014        hdr_len = _endian_decode(hdr_len);
1015
1016        if (buf) {
1017            memcpy(buf, _buf, hdr_len);
1018        }
1019        memcpy(&_revnum, _buf + hdr_len,
1020               sizeof(filemgr_header_revnum_t));
1021        memcpy(&_seqnum,
1022               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1023               sizeof(fdb_seqnum_t));
1024        *seqnum = _endian_decode(_seqnum);
1025        *len = hdr_len;
1026        found = 1;
1027        break;
1028    } while (false); // no repetition
1029
1030    if (!found) { // no other header found till end of file
1031        *len = 0;
1032    }
1033
1034    _filemgr_release_temp_buf(_buf);
1035
1036    return bid;
1037}
1038
// Drop one reference to 'file'. When the last reference is released, the
// underlying fd is closed and, depending on file->status:
//  - FILE_REMOVED_PENDING: the file is deleted from disk and the filemgr
//    struct is freed;
//  - otherwise, if 'cleanup_cache_onclose' is set, the filemgr struct is
//    freed (optionally renaming an in-place-compacted file back to
//    'orig_file_name'); if not set, the struct stays cached as FILE_CLOSED.
// Lock order: filemgr_openlock is taken before file->lock throughout.
fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
                         const char *orig_file_name,
                         err_log_callback *log_callback)
{
    int rv = FDB_RESULT_SUCCESS;

    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
                                  // filemgr_open() because file->lock won't
                                  // prevent the race condition.

    // remove filemgr structure if no thread refers to the file
    spin_lock(&file->lock);
    if (--(file->ref_count) == 0) {
        spin_unlock(&file->lock);
        if (global_config.ncacheblock > 0) {
            // discard all dirty blocks belonged to this file
            bcache_remove_dirty_blocks(file);
        }

        if (wal_is_initialized(file)) {
            wal_close(file);
        }

        spin_lock(&file->lock);
        rv = file->ops->close(file->fd);
        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
            // remove file
            remove(file->filename);
            // we can release lock because no one will open this file
            spin_unlock(&file->lock);
            struct hash_elem *ret = hash_remove(&hash, &file->e);
            fdb_assert(ret, 0, 0);
            spin_unlock(&filemgr_openlock);
            _filemgr_free_func(&file->e);
            return (fdb_status) rv;
        } else {
            if (cleanup_cache_onclose) {
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
                if (file->in_place_compaction && orig_file_name) {
                    struct hash_elem *elem = NULL;
                    struct filemgr query;
                    uint32_t old_file_refcount = 0;

                    // check whether a file with the original name is still open
                    query.filename = (char *)orig_file_name;
                    elem = hash_find(&hash, &query.e);

                    if (file->old_filename) {
                        struct hash_elem *elem_old = NULL;
                        struct filemgr query_old;
                        struct filemgr *old_file = NULL;

                        // get old file's ref count if exists
                        query_old.filename = file->old_filename;
                        elem_old = hash_find(&hash, &query_old.e);
                        if (elem_old) {
                            old_file = _get_entry(elem_old, struct filemgr, e);
                            old_file_refcount = old_file->ref_count;
                        }
                    }
                    // If old file is opened by other handle, renaming should be
                    // postponed. It will be renamed later by the handle referring
                    // to the old file.
                    if (!elem && old_file_refcount == 0 &&
                        rename(file->filename, orig_file_name) < 0) {
                        // Note that the renaming failure is not a critical
                        // issue because the last compacted file will be automatically
                        // identified and opened in the next fdb_open call.
                        _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
                                       "CLOSE", file->filename);
                    }
                }
                spin_unlock(&file->lock);
                // Clean up global hash table, WAL index, and buffer cache.
                struct hash_elem *ret = hash_remove(&hash, &file->e);
                fdb_assert(ret, file, 0);
                spin_unlock(&filemgr_openlock);
                _filemgr_free_func(&file->e);
                return (fdb_status) rv;
            } else {
                // keep the filemgr struct cached for a later re-open
                atomic_store_uint8_t(&file->status, FILE_CLOSED);
            }
        }
    }

    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

    spin_unlock(&file->lock);
    spin_unlock(&filemgr_openlock);
    return (fdb_status) rv;
}
1130
// Tear down and free a filemgr structure. The caller must already have
// removed 'h' from the global hash table (see filemgr_close() /
// filemgr_remove_file()); no lock is held on entry. Order matters: the
// prefetch thread is joined first, then cached blocks are dropped, then
// KV header / WAL / filename / header buffers are freed, and locks are
// destroyed last.
static void _filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    spin_lock(&file->lock);
    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread is running; signal it to abort
        void *ret;
        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
        spin_unlock(&file->lock);
        // wait for the prefetch thread to observe the abort flag and exit
        thread_join(file->prefetch_tid, &ret);
    } else {
        spin_unlock(&file->lock);
    }

    // remove all cached blocks
    if (global_config.ncacheblock > 0) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
    }

    if (file->kv_header) {
        // multi KV intance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file);
        size_t i = 0;
        size_t num_all_shards = wal_get_num_all_shards(file);
        // Free all WAL shards (including compactor's shard)
        for (; i < num_all_shards; ++i) {
            hash_free(&file->wal->key_shards[i].hash_bykey);
            spin_destroy(&file->wal->key_shards[i].lock);
            hash_free(&file->wal->seq_shards[i].hash_byseq);
            spin_destroy(&file->wal->seq_shards[i].lock);
        }
        spin_destroy(&file->wal->lock);
        atomic_destroy_uint32_t(&file->wal->size);
        atomic_destroy_uint32_t(&file->wal->num_flushable);
        atomic_destroy_uint64_t(&file->wal->datasize);
        free(file->wal->key_shards);
        free(file->wal->seq_shards);
    }
    free(file->wal);

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);
    // free old filename if any
    free(file->old_filename);

    // destroy locks
    spin_destroy(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    // free file structure
    free(file->config);
    free(file);
}
1214
1215// permanently remove file from cache (not just close)
1216// LCOV_EXCL_START
1217void filemgr_remove_file(struct filemgr *file)
1218{
1219    struct hash_elem *ret;
1220
1221    fdb_assert(file, file, NULL);
1222    fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1223
1224    // remove from global hash table
1225    spin_lock(&filemgr_openlock);
1226    ret = hash_remove(&hash, &file->e);
1227    fdb_assert(ret, ret, NULL);
1228    spin_unlock(&filemgr_openlock);
1229
1230    _filemgr_free_func(&file->e);
1231}
1232// LCOV_EXCL_STOP
1233
1234static
1235void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1236    struct filemgr *file = _get_entry(h, struct filemgr, e);
1237    void *ret;
1238    spin_lock(&file->lock);
1239    if (file->ref_count != 0) {
1240        ret = (void *)file;
1241    } else {
1242        ret = NULL;
1243    }
1244    spin_unlock(&file->lock);
1245    return ret;
1246}
1247
// Shut the file manager down: if no file is still open, free every cached
// filemgr entry, shut down the block cache, and reset the init state.
// Returns FDB_RESULT_FILE_IS_BUSY if any file still has references.
// Uses double-checked locking on 'filemgr_initialized'; on Windows (no
// SPIN_INITIALIZER) 'initial_lock_status' (0=uninit, 1=busy, 2=ready) gates
// access to 'initial_lock' itself.
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    if (filemgr_initialized) {

#ifndef SPIN_INITIALIZER
        // Windows: check if spin lock is already destroyed.
        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
            spin_lock(&initial_lock);
        } else {
            // filemgr is already shut down
            return ret;
        }
#else
        spin_lock(&initial_lock);
#endif

        if (!filemgr_initialized) {
            // filemgr is already shut down
            // NOTE(review): in the non-SPIN_INITIALIZER (Windows) path this
            // early return appears to leave 'initial_lock' held and
            // 'initial_lock_status' stuck at 1 — TODO confirm intended.
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            return ret;
        }

        // scan for any filemgr entry that is still referenced
        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        if (!open_file) {
            hash_free_active(&hash, _filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
#ifndef SPIN_INITIALIZER
            initial_lock_status = 0;
#else
            initial_lock = SPIN_INITIALIZER;
#endif
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            spin_destroy(&initial_lock);
#endif
        } else {
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1298
1299bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1300{
1301    spin_lock(&file->lock);
1302    bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1303    atomic_add_uint64_t(&file->pos, file->blocksize);
1304
1305    if (global_config.ncacheblock <= 0) {
1306        // if block cache is turned off, write the allocated block before use
1307        uint8_t _buf = 0x0;
1308        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1309                                       atomic_get_uint64_t(&file->pos) - 1);
1310        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1311    }
1312    spin_unlock(&file->lock);
1313
1314    return bid;
1315}
1316
1317void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1318                            bid_t *end, err_log_callback *log_callback)
1319{
1320    spin_lock(&file->lock);
1321    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1322    *end = *begin + nblock - 1;
1323    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1324
1325    if (global_config.ncacheblock <= 0) {
1326        // if block cache is turned off, write the allocated block before use
1327        uint8_t _buf = 0x0;
1328        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1329                                       atomic_get_uint64_t(&file->pos) - 1);
1330        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1331    }
1332    spin_unlock(&file->lock);
1333}
1334
1335// atomically allocate NBLOCK blocks only when current file position is same to nextbid
1336bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1337                                  bid_t *begin, bid_t *end,
1338                                  err_log_callback *log_callback)
1339{
1340    bid_t bid;
1341    spin_lock(&file->lock);
1342    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1343    if (bid == nextbid) {
1344        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1345        *end = *begin + nblock - 1;
1346        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1347
1348        if (global_config.ncacheblock <= 0) {
1349            // if block cache is turned off, write the allocated block before use
1350            uint8_t _buf = 0x0;
1351            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1352                                           atomic_get_uint64_t(&file->pos));
1353            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1354        }
1355    }else{
1356        *begin = BLK_NOT_FOUND;
1357        *end = BLK_NOT_FOUND;
1358    }
1359    spin_unlock(&file->lock);
1360    return bid;
1361}
1362
#ifdef __CRC32
// Verify the CRC32 of a B+tree node block. Non-bnode blocks (last byte is
// not BLK_MARKER_BNODE) are accepted as-is. Note: the CRC field inside 'buf'
// is overwritten with 0xff during verification, matching the value it held
// when the checksum was originally computed on write.
INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
{
    uint8_t last_byte = *((uint8_t *)buf + file->blocksize - 1);
    if (last_byte != BLK_MARKER_BNODE) {
        return FDB_RESULT_SUCCESS; // not a bnode block; nothing to verify
    }

    uint32_t crc_stored;
    memcpy(&crc_stored, (uint8_t *)buf + BTREE_CRC_OFFSET, sizeof(crc_stored));
    crc_stored = _endian_decode(crc_stored);

    // Neutralize the CRC field before recomputing, as the writer did.
    memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
    uint32_t crc_computed = chksum(buf, file->blocksize);

    return (crc_computed == crc_stored) ? FDB_RESULT_SUCCESS
                                        : FDB_RESULT_CHECKSUM_ERROR;
}
#endif
1379
1380void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1381{
1382    if (global_config.ncacheblock > 0) {
1383        bcache_invalidate_block(file, bid);
1384    }
1385}
1386
// Read block 'bid' of 'file' into 'buf' (one block large).
// With the block cache enabled, the cache is consulted first; on a miss the
// block is read from disk (only if 'read_on_cache_miss' is set), CRC-checked,
// and inserted into the cache as a clean block. Uncommitted (still writable)
// blocks are read under the per-block data lock because a writer may be
// mutating them; committed blocks are immutable and need no lock.
// Returns FDB_RESULT_SUCCESS, FDB_RESULT_READ_FAIL (miss with
// read_on_cache_miss == false), a checksum/IO error, or
// FDB_RESULT_WRITE_FAIL if the cache insert fails.
fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
                        err_log_callback *log_callback,
                        bool read_on_cache_miss)
{
    size_t lock_no;
    ssize_t r;
    uint64_t pos = bid * file->blocksize;
    fdb_status status = FDB_RESULT_SUCCESS;
    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
    // reading beyond the allocated end of file is a programming error
    fdb_assert(pos < curr_pos, pos, curr_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry = NULL;
        bid_t is_writer = 0;
#endif
        bool locked = false;
        // Note: we don't need to grab lock for committed blocks
        // because they are immutable so that no writer will interfere and
        // overwrite dirty data
        if (filemgr_is_writable(file, bid)) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_lock(&file->data_mutex[lock_no]);
#else
            spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
            locked = true;
        }

        r = bcache_read(file, bid, buf);
        if (r == 0) {
            // cache miss
            if (!read_on_cache_miss) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return FDB_RESULT_READ_FAIL;
            }

            // if normal file, just read a block
            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
            if (r != file->blocksize) {
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "READ", file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return (fdb_status)r;
            }
#ifdef __CRC32
            // verify the on-disk checksum before trusting the block
            status = _filemgr_crc32_check(file, buf);
            if (status != FDB_RESULT_SUCCESS) {
                _log_errno_str(file->ops, log_callback, status, "READ",
                        file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return status;
            }
#endif
            // populate the cache with the freshly-read (clean) block
            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        }
        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }
    } else {
        // block cache disabled: always read straight from disk
        if (!read_on_cache_miss) {
            return FDB_RESULT_READ_FAIL;
        }

        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
        if (r != file->blocksize) {
            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
                           file->filename);
            return (fdb_status)r;
        }

#ifdef __CRC32
        status = _filemgr_crc32_check(file, buf);
        if (status != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, status, "READ",
                           file->filename);
            return status;
        }
#endif
    }
    return status;
}
1518
// Write 'len' bytes from 'buf' into block 'bid' at byte 'offset' within the
// block ('offset' + 'len' must not exceed the block size; writes below the
// last commit position are asserted against).
// With the block cache enabled the write lands in the cache as a dirty
// block; a partial write that misses the cache first reads the block's
// previous contents from disk (unless the block is beyond the current EOF).
// With the cache disabled the data is written straight to disk, stamping a
// CRC32 into full bnode blocks first.
fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
                                uint64_t offset, uint64_t len, void *buf,
                                err_log_callback *log_callback)
{
    fdb_assert(offset + len <= file->blocksize, offset + len, file);

    size_t lock_no;
    ssize_t r = 0;
    uint64_t pos = bid * file->blocksize + offset;
    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
    // committed regions are immutable; only post-commit space may be written
    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

        // the per-block data lock is always taken on the write path
        // (readers of writable blocks take the same lock; see filemgr_read)
        bool locked = false;
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry;
        bid_t is_writer = 1;
        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
        mutex_lock(&file->data_mutex[lock_no]);
#else
        spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        locked = true;

        if (len == file->blocksize) {
            // write entire block .. we don't need to read previous block
            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        } else {
            // partially write buffer cache first
            r = bcache_write_partial(file, bid, buf, offset, len);
            if (r == 0) {
                // cache miss
                // write partially .. we have to read previous contents of the block
                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
                void *_buf = _filemgr_get_temp_buf();

                if (bid >= cur_file_last_bid) {
                    // this is the first time to write this block
                    // we don't need to read previous block from file.
                } else {
                    r = file->ops->pread(file->fd, _buf, file->blocksize,
                                         bid * file->blocksize);
                    if (r != file->blocksize) {
                        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                            mutex_unlock(&file->data_mutex[lock_no]);
#else
                            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                        }
                        _filemgr_release_temp_buf(_buf);
                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
                                       "READ", file->filename);
                        return FDB_RESULT_READ_FAIL;
                    }
                }
                // merge the caller's bytes into the block image and cache it dirty
                memcpy((uint8_t *)_buf + offset, buf, len);
                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
                if (r != global_config.blocksize) {
                    if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                        plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                        mutex_unlock(&file->data_mutex[lock_no]);
#else
                        spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                    }
                    _filemgr_release_temp_buf(_buf);
                    _log_errno_str(file->ops, log_callback,
                            (fdb_status) r, "WRITE", file->filename);
                    return FDB_RESULT_WRITE_FAIL;
                }

                _filemgr_release_temp_buf(_buf);
            } // cache miss
        } // full block or partial block

        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }

    } else { // block cache disabled

#ifdef __CRC32
        // stamp CRC32 into full bnode blocks before they hit disk
        if (len == file->blocksize) {
            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
            if (marker == BLK_MARKER_BNODE) {
                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
                uint32_t crc32 = chksum(buf, file->blocksize);
                crc32 = _endian_encode(crc32);
                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
            }
        }
#endif

        r = file->ops->pwrite(file->fd, buf, len, pos);
        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
        if ((uint64_t)r != len) {
            return FDB_RESULT_WRITE_FAIL;
        }
    } // block cache check
    return FDB_RESULT_SUCCESS;
}
1650
// Write one full block: convenience wrapper around filemgr_write_offset()
// covering the entire block (offset 0, length == blocksize).
fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
                   err_log_callback *log_callback)
{
    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
                                log_callback);
}
1657
// Commit the current in-memory DB header to disk:
//   1. flush dirty cache blocks (if block cache enabled),
//   2. append a header block at the current end of file,
//   3. advance file->pos / header.bid and record last_commit,
//   4. optionally fsync when the file was opened with FILEMGR_SYNC.
// Returns FDB_RESULT_SUCCESS, or the first flush/write/fsync error.
fdb_status filemgr_commit(struct filemgr *file,
                          err_log_callback *log_callback)
{
    uint16_t header_len = file->header.size;
    uint16_t _header_len;            // endian-encoded copy of header_len
    bid_t _prev_bid;                 // endian-encoded bid of previous header
    fdb_seqnum_t _seqnum;            // endian-encoded default-KVS seqnum
    filemgr_header_revnum_t _revnum; // endian-encoded header revision number
    int result = FDB_RESULT_SUCCESS;
    filemgr_magic_t magic = FILEMGR_MAGIC;
    filemgr_magic_t _magic;          // endian-encoded magic number

    // Flush any dirty cached blocks first so the on-disk state the new
    // header describes is actually durable before the header is appended.
    if (global_config.ncacheblock > 0) {
        result = bcache_flush(file);
        if (result != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, (fdb_status) result,
                           "FLUSH", file->filename);
            return (fdb_status)result;
        }
    }

    spin_lock(&file->lock);

    // Only append a header block when there is an in-memory header to write.
    if (file->header.size > 0 && file->header.data) {
        void *buf = _filemgr_get_temp_buf();
        uint8_t marker[BLK_MARKER_SIZE];

        // On-disk layout of a commit header block:
        // [header data]:        'header_len' bytes   <---+
        // [header revnum]:      8 bytes                  |
        // [default KVS seqnum]: 8 bytes                  |
        // ...                                            |
        // (empty)                                    blocksize
        // ...                                            |
        // [prev header bid]:    8 bytes                  |
        // [header length]:      2 bytes                  |
        // [magic number]:       8 bytes                  |
        // [block marker]:       1 byte               <---+

        // header data
        memcpy(buf, file->header.data, header_len);
        // header rev number
        _revnum = _endian_encode(file->header.revnum);
        memcpy((uint8_t *)buf + header_len, &_revnum,
               sizeof(filemgr_header_revnum_t));
        // file's sequence number (default KVS seqnum)
        _seqnum = _endian_encode(file->header.seqnum);
        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
               &_seqnum, sizeof(fdb_seqnum_t));

        // prev header bid (lets recovery walk the header chain backwards)
        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
               &_prev_bid, sizeof(_prev_bid));
        // header length
        _header_len = _endian_encode(header_len);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - BLK_MARKER_SIZE),
               &_header_len, sizeof(header_len));
        // magic number
        _magic = _endian_encode(magic);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - BLK_MARKER_SIZE), &_magic, sizeof(magic));

        // marker byte identifying this block as a DB header
        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
               marker, BLK_MARKER_SIZE);

        // Append the header block at the current end of file.
        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
                                       atomic_get_uint64_t(&file->pos));
        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
                       "WRITE", file->filename);
        if (rv != file->blocksize) {
            _filemgr_release_temp_buf(buf);
            spin_unlock(&file->lock);
            return FDB_RESULT_WRITE_FAIL;
        }
        // Record the bid of the header just written, then bump the file
        // position past the appended block.
        atomic_store_uint64_t(&file->header.bid,
                              atomic_get_uint64_t(&file->pos) / file->blocksize);
        atomic_add_uint64_t(&file->pos, file->blocksize);

        // Dirty root pointers are now committed; reset the dirty markers.
        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);

        _filemgr_release_temp_buf(buf);
    }
    // race condition?
    // NOTE(review): the original author's question above — this store does
    // happen under file->lock; verify all readers of last_commit either hold
    // the same lock or tolerate a stale value.
    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));

    spin_unlock(&file->lock);

    if (file->fflags & FILEMGR_SYNC) {
        result = file->ops->fsync(file->fd);
        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
    }
    return (fdb_status) result;
}
1756
1757fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1758{
1759    fdb_status result = FDB_RESULT_SUCCESS;
1760    if (global_config.ncacheblock > 0) {
1761        result = bcache_flush(file);
1762        if (result != FDB_RESULT_SUCCESS) {
1763            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1764                           "FLUSH", file->filename);
1765            return result;
1766        }
1767    }
1768
1769    if (file->fflags & FILEMGR_SYNC) {
1770        int rv = file->ops->fsync(file->fd);
1771        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1772        return (fdb_status) rv;
1773    }
1774    return result;
1775}
1776
1777int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1778                                char *old_filename)
1779{
1780    int ret = 1;
1781    spin_lock(&file->lock);
1782    atomic_store_uint8_t(&file->status, status);
1783    if (old_filename) {
1784        if (!file->old_filename) {
1785            file->old_filename = old_filename;
1786        } else {
1787            ret = 0;
1788            fdb_assert(file->ref_count, file->ref_count, 0);
1789            free(old_filename);
1790        }
1791    }
1792    spin_unlock(&file->lock);
1793    return ret;
1794}
1795
1796void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1797                                  file_status_t status)
1798{
1799    spin_lock(&old_file->lock);
1800    old_file->new_file = new_file;
1801    atomic_store_uint8_t(&old_file->status, status);
1802    spin_unlock(&old_file->lock);
1803}
1804
1805// Check if there is a file that still points to the old_file that is being
1806// compacted away. If so open the file and return its pointer.
1807static
1808void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1809    struct filemgr *cur_file = (struct filemgr *)ctx;
1810    struct filemgr *file = _get_entry(h, struct filemgr, e);
1811    spin_lock(&file->lock);
1812    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1813        file->new_file == cur_file) {
1814        // Incrementing reference counter below is the same as filemgr_open()
1815        // We need to do this to ensure that the pointer returned does not
1816        // get freed outside the filemgr_open lock
1817        file->ref_count++;
1818        spin_unlock(&file->lock);
1819        return (void *)file;
1820    }
1821    spin_unlock(&file->lock);
1822    return (void *)NULL;
1823}
1824
1825struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1826    struct filemgr *very_old_file;
1827    spin_lock(&filemgr_openlock);
1828    very_old_file = (struct filemgr *)hash_scan(&hash,
1829                                         _filemgr_check_stale_link, cur_file);
1830    spin_unlock(&filemgr_openlock);
1831    return very_old_file;
1832}
1833
1834char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1835                                     struct filemgr *new_file,
1836                                     filemgr_redirect_hdr_func
1837                                     redirect_header_func) {
1838    size_t old_header_len, new_header_len;
1839    uint16_t new_filename_len;
1840    char *past_filename;
1841    spin_lock(&very_old_file->lock);
1842    fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1843    fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1844    old_header_len = very_old_file->header.size;
1845    new_filename_len = strlen(new_file->filename);
1846    // Find out the new DB header length with new_file's filename
1847    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1848        + new_filename_len;
1849    // As we are going to change the new_filename field in the DB header of the
1850    // very_old_file, maybe reallocate DB header buf to accomodate bigger value
1851    if (new_header_len > old_header_len) {
1852        very_old_file->header.data = realloc(very_old_file->header.data,
1853                new_header_len);
1854    }
1855    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1856    past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1857            new_file->filename, new_filename_len + 1);//Update in-memory header
1858    very_old_file->header.size = new_header_len;
1859    ++(very_old_file->header.revnum);
1860
1861    spin_unlock(&very_old_file->lock);
1862    return past_filename;
1863}
1864
1865void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1866{
1867    fdb_assert(new_file, new_file, old_file);
1868
1869    spin_lock(&old_file->lock);
1870    if (old_file->ref_count > 0) {
1871        // delay removing
1872        old_file->new_file = new_file;
1873        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1874        spin_unlock(&old_file->lock);
1875    } else {
1876        // immediatly remove
1877        // LCOV_EXCL_START
1878        spin_unlock(&old_file->lock);
1879        remove(old_file->filename);
1880        filemgr_remove_file(old_file);
1881        // LCOV_EXCL_STOP
1882    }
1883}
1884
1885// migrate default kv store stats over to new_file
1886struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1887                                              struct filemgr *new_file,
1888                                              struct kvs_info *kvs)
1889{
1890    kvs_ops_stat *ret = NULL;
1891    fdb_assert(new_file, new_file, old_file);
1892
1893    spin_lock(&old_file->lock);
1894    new_file->header.op_stat = old_file->header.op_stat;
1895    ret = &new_file->header.op_stat;
1896    spin_unlock(&old_file->lock);
1897    return ret;
1898}
1899
// Destroy 'filename' and, recursively, the chain of older files it was
// compacted from. 'destroy_file_set' is NULL on the top-level call; the
// recursion passes a shared hash of filenames already being destroyed so
// the same file is not processed twice (compaction chains are acyclic).
// Returns FDB_RESULT_SUCCESS, FDB_RESULT_FILE_IS_BUSY if the file is still
// referenced, or the first open/seek/read/remove error.
// Note: filemgr_openlock should be held before calling this function.
fdb_status filemgr_destroy_file(char *filename,
                                struct filemgr_config *config,
                                struct hash *destroy_file_set)
{
    struct filemgr *file = NULL;
    struct hash to_destroy_files;  // only used by the top-level call
    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
                                                  &to_destroy_files);
    struct filemgr query;
    struct hash_elem *e = NULL;
    fdb_status status = FDB_RESULT_SUCCESS;
    char *old_filename = NULL;

    if (!destroy_file_set) { // top level or non-recursive call
        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
    }

    query.filename = filename;
    // check whether file is already being destroyed in parent recursive call
    e = hash_find(destroy_set, &query.e);
    if (e) { // Duplicate filename found, nothing to be done in this call
        if (!destroy_file_set) { // top level or non-recursive call
            hash_free(destroy_set);
        }
        return status;
    } else {
        // Remember file. Stack value ok IFF single direction recursion
        hash_insert(destroy_set, &query.e);
    }

    // check global list of known files to see if it is already opened or not
    e = hash_find(&hash, &query.e);
    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        if (file->ref_count) {
            // Someone still holds this file open; refuse to destroy it.
            spin_unlock(&file->lock);
            status = FDB_RESULT_FILE_IS_BUSY;
            if (!destroy_file_set) { // top level or non-recursive call
                hash_free(destroy_set);
            }
            return status;
        }
        spin_unlock(&file->lock);
        // NOTE(review): old_filename is read after dropping file->lock;
        // relies on the caller holding filemgr_openlock to keep the entry
        // stable — confirm no other path mutates old_filename concurrently.
        if (file->old_filename) {
            // Recurse into the previous generation of this file first.
            status = filemgr_destroy_file(file->old_filename, config,
                                          destroy_set);
            if (status != FDB_RESULT_SUCCESS) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return status;
            }
        }

        // Cleanup file from in-memory as well as on-disk
        e = hash_remove(&hash, &file->e);
        fdb_assert(e, e, 0);
        _filemgr_free_func(&file->e);
        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
            if (remove(filename)) {
                status = FDB_RESULT_FILE_REMOVE_FAIL;
            }
        }
    } else { // file not in memory, read on-disk to destroy older versions..
        // Build a throw-away stack-allocated filemgr just to read the header.
        file = (struct filemgr *)alca(struct filemgr, 1);
        file->filename = filename;
        file->ops = get_filemgr_ops();
        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
        file->blocksize = global_config.blocksize;
        if (file->fd < 0) {
            // A missing file is fine (nothing to destroy); any other open
            // error is reported to the caller.
            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_OPEN_FAIL;
            }
        } else { // file successfully opened, seek to end to get DB header
            cs_off_t offset = file->ops->goto_eof(file->fd);
            if (offset == FDB_RESULT_SEEK_FAIL) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_SEEK_FAIL;
            } else { // Need to read DB header which contains old filename
                atomic_store_uint64_t(&file->pos, offset);
                status = _filemgr_read_header(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    return status;
                }
                if (file->header.data) {
                    // Fixed offsets into the serialized DB header:
                    // presumably 64 = new_filename length, 66 = old_filename
                    // length, 68 = start of the filename strings — TODO
                    // confirm against the header writer.
                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 64);
                    uint16_t new_filename_len =
                                      _endian_decode(*new_filename_len_ptr);
                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 66);
                    uint16_t old_filename_len =
                                      _endian_decode(*old_filename_len_ptr);
                    old_filename = (char *)file->header.data + 68
                                   + new_filename_len;
                    if (old_filename_len) {
                        // Recurse into the previous generation found on disk.
                        status = filemgr_destroy_file(old_filename, config,
                                                      destroy_set);
                    }
                    free(file->header.data);
                }
                file->ops->close(file->fd);
                if (status == FDB_RESULT_SUCCESS) {
                    if (filemgr_does_file_exist(filename)
                                               == FDB_RESULT_SUCCESS) {
                        if (remove(filename)) {
                            status = FDB_RESULT_FILE_REMOVE_FAIL;
                        }
                    }
                }
            }
        }
    }

    if (!destroy_file_set) { // top level or non-recursive call
        hash_free(destroy_set);
    }

    return status;
}
2033
2034bool filemgr_is_rollback_on(struct filemgr *file)
2035{
2036    bool rv;
2037    spin_lock(&file->lock);
2038    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2039    spin_unlock(&file->lock);
2040    return rv;
2041}
2042
2043void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2044{
2045    spin_lock(&file->lock);
2046    if (new_val) {
2047        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2048    } else {
2049        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2050    }
2051    spin_unlock(&file->lock);
2052}
2053
// Record whether this file is part of an in-place compaction; the flag is
// read back via filemgr_is_in_place_compaction_set().
void filemgr_set_in_place_compaction(struct filemgr *file,
                                     bool in_place_compaction) {
    spin_lock(&file->lock);
    file->in_place_compaction = in_place_compaction;
    spin_unlock(&file->lock);
}
2060
2061bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2062{
2063    bool ret = false;
2064    spin_lock(&file->lock);
2065    ret = file->in_place_compaction;
2066    spin_unlock(&file->lock);
2067    return ret;
2068}
2069
// Initialize the filemgr subsystem with 'config' (filemgr_init) and then
// acquire the global open lock that serializes open/scan operations on the
// file table; released via filemgr_mutex_openunlock().
void filemgr_mutex_openlock(struct filemgr_config *config)
{
    filemgr_init(config);

    spin_lock(&filemgr_openlock);
}
2076
// Release the global open lock taken by filemgr_mutex_openlock().
void filemgr_mutex_openunlock(void)
{
    spin_unlock(&filemgr_openlock);
}
2081
// Acquire this file's writer mutex (blocking) and record that it is held,
// so filemgr_mutex_unlock() knows there is a lock to release.
void filemgr_mutex_lock(struct filemgr *file)
{
    mutex_lock(&file->writer_lock.mutex);
    file->writer_lock.locked = true;
}
2087
2088bool filemgr_mutex_trylock(struct filemgr *file) {
2089    if (mutex_trylock(&file->writer_lock.mutex)) {
2090        file->writer_lock.locked = true;
2091        return true;
2092    }
2093    return false;
2094}
2095
2096void filemgr_mutex_unlock(struct filemgr *file)
2097{
2098    if (file->writer_lock.locked) {
2099        file->writer_lock.locked = false;
2100        mutex_unlock(&file->writer_lock.mutex);
2101    }
2102}
2103
// Publish the dirty (uncommitted) id-tree and seq-tree root BIDs in the
// in-memory header; filemgr_commit() resets both back to BLK_NOT_FOUND
// once a header block is written.
void filemgr_set_dirty_root(struct filemgr *file,
                            bid_t dirty_idtree_root,
                            bid_t dirty_seqtree_root)
{
    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
}
2111
2112bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2113{
2114    uint8_t marker[BLK_MARKER_SIZE];
2115    filemgr_magic_t magic;
2116    marker[0] = *(((uint8_t *)head_buffer)
2117                 + blocksize - BLK_MARKER_SIZE);
2118    if (marker[0] != BLK_MARKER_DBHEADER) {
2119        return false;
2120    }
2121
2122    memcpy(&magic, (uint8_t *) head_buffer
2123            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2124    magic = _endian_decode(magic);
2125
2126    return (magic == FILEMGR_MAGIC);
2127}
2128
2129void _kvs_stat_set(struct filemgr *file,
2130                   fdb_kvs_id_t kv_id,
2131                   struct kvs_stat stat)
2132{
2133    if (kv_id == 0) {
2134        spin_lock(&file->lock);
2135        file->header.stat = stat;
2136        spin_unlock(&file->lock);
2137    } else {
2138        struct avl_node *a;
2139        struct kvs_node query, *node;
2140        struct kvs_header *kv_header = file->kv_header;
2141
2142        spin_lock(&kv_header->lock);
2143        query.id = kv_id;
2144        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2145        if (a) {
2146            node = _get_entry(a, struct kvs_node, avl_id);
2147            node->stat = stat;
2148        }
2149        spin_unlock(&kv_header->lock);
2150    }
2151}
2152
2153void _kvs_stat_update_attr(struct filemgr *file,
2154                           fdb_kvs_id_t kv_id,
2155                           kvs_stat_attr_t attr,
2156                           int delta)
2157{
2158    spin_t *lock = NULL;
2159    struct kvs_stat *stat;
2160
2161    if (kv_id == 0) {
2162        stat = &file->header.stat;
2163        lock = &file->lock;
2164        spin_lock(lock);
2165    } else {
2166        struct avl_node *a;
2167        struct kvs_node query, *node;
2168        struct kvs_header *kv_header = file->kv_header;
2169
2170        lock = &kv_header->lock;
2171        spin_lock(lock);
2172        query.id = kv_id;
2173        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2174        if (!a) {
2175            // KV instance corresponding to the kv_id is already removed
2176            spin_unlock(lock);
2177            return;
2178        }
2179        node = _get_entry(a, struct kvs_node, avl_id);
2180        stat = &node->stat;
2181    }
2182
2183    if (attr == KVS_STAT_DATASIZE) {
2184        stat->datasize += delta;
2185    } else if (attr == KVS_STAT_NDOCS) {
2186        stat->ndocs += delta;
2187    } else if (attr == KVS_STAT_NLIVENODES) {
2188        stat->nlivenodes += delta;
2189    } else if (attr == KVS_STAT_WAL_NDELETES) {
2190        stat->wal_ndeletes += delta;
2191    } else if (attr == KVS_STAT_WAL_NDOCS) {
2192        stat->wal_ndocs += delta;
2193    }
2194    spin_unlock(lock);
2195}
2196
2197int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2198                            fdb_kvs_id_t kv_id,
2199                            struct kvs_stat *stat)
2200{
2201    int ret = 0;
2202    struct avl_node *a;
2203    struct kvs_node query, *node;
2204
2205    query.id = kv_id;
2206    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2207    if (a) {
2208        node = _get_entry(a, struct kvs_node, avl_id);
2209        *stat = node->stat;
2210    } else {
2211        ret = -1;
2212    }
2213    return ret;
2214}
2215
2216int _kvs_stat_get(struct filemgr *file,
2217                  fdb_kvs_id_t kv_id,
2218                  struct kvs_stat *stat)
2219{
2220    int ret = 0;
2221
2222    if (kv_id == 0) {
2223        spin_lock(&file->lock);
2224        *stat = file->header.stat;
2225        spin_unlock(&file->lock);
2226    } else {
2227        struct kvs_header *kv_header = file->kv_header;
2228
2229        spin_lock(&kv_header->lock);
2230        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2231        spin_unlock(&kv_header->lock);
2232    }
2233
2234    return ret;
2235}
2236
2237uint64_t _kvs_stat_get_sum(struct filemgr *file,
2238                           kvs_stat_attr_t attr)
2239{
2240    struct avl_node *a;
2241    struct kvs_node *node;
2242    struct kvs_header *kv_header = file->kv_header;
2243
2244    uint64_t ret = 0;
2245    spin_lock(&file->lock);
2246    if (attr == KVS_STAT_DATASIZE) {
2247        ret += file->header.stat.datasize;
2248    } else if (attr == KVS_STAT_NDOCS) {
2249        ret += file->header.stat.ndocs;
2250    } else if (attr == KVS_STAT_NLIVENODES) {
2251        ret += file->header.stat.nlivenodes;
2252    } else if (attr == KVS_STAT_WAL_NDELETES) {
2253        ret += file->header.stat.wal_ndeletes;
2254    } else if (attr == KVS_STAT_WAL_NDOCS) {
2255        ret += file->header.stat.wal_ndocs;
2256    }
2257    spin_unlock(&file->lock);
2258
2259    if (kv_header) {
2260        spin_lock(&kv_header->lock);
2261        a = avl_first(kv_header->idx_id);
2262        while (a) {
2263            node = _get_entry(a, struct kvs_node, avl_id);
2264            a = avl_next(&node->avl_id);
2265
2266            if (attr == KVS_STAT_DATASIZE) {
2267                ret += node->stat.datasize;
2268            } else if (attr == KVS_STAT_NDOCS) {
2269                ret += node->stat.ndocs;
2270            } else if (attr == KVS_STAT_NLIVENODES) {
2271                ret += node->stat.nlivenodes;
2272            } else if (attr == KVS_STAT_WAL_NDELETES) {
2273                ret += node->stat.wal_ndeletes;
2274            } else if (attr == KVS_STAT_WAL_NDOCS) {
2275                ret += node->stat.wal_ndocs;
2276            }
2277        }
2278        spin_unlock(&kv_header->lock);
2279    }
2280
2281    return ret;
2282}
2283
2284int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2285                                fdb_kvs_id_t kv_id,
2286                                struct kvs_ops_stat *stat)
2287{
2288    int ret = 0;
2289    struct avl_node *a;
2290    struct kvs_node query, *node;
2291
2292    query.id = kv_id;
2293    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2294    if (a) {
2295        node = _get_entry(a, struct kvs_node, avl_id);
2296        *stat = node->op_stat;
2297    } else {
2298        ret = -1;
2299    }
2300    return ret;
2301}
2302
2303int _kvs_ops_stat_get(struct filemgr *file,
2304                      fdb_kvs_id_t kv_id,
2305                      struct kvs_ops_stat *stat)
2306{
2307    int ret = 0;
2308
2309    if (kv_id == 0) {
2310        spin_lock(&file->lock);
2311        *stat = file->header.op_stat;
2312        spin_unlock(&file->lock);
2313    } else {
2314        struct kvs_header *kv_header = file->kv_header;
2315
2316        spin_lock(&kv_header->lock);
2317        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2318        spin_unlock(&kv_header->lock);
2319    }
2320
2321    return ret;
2322}
2323
// Zero-initialize every per-KVS operational counter in 'stat'.
void _init_op_stats(struct kvs_ops_stat *stat) {
    atomic_init_uint64_t(&stat->num_sets, 0);
    atomic_init_uint64_t(&stat->num_dels, 0);
    atomic_init_uint64_t(&stat->num_commits, 0);
    atomic_init_uint64_t(&stat->num_compacts, 0);
    atomic_init_uint64_t(&stat->num_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
}
2333
2334struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2335                                           struct kvs_info *kvs)
2336{
2337    struct kvs_ops_stat *stat = NULL;
2338    if (!kvs || (kvs && kvs->id == 0)) {
2339        return &file->header.op_stat;
2340    } else {
2341        struct kvs_header *kv_header = file->kv_header;
2342        struct avl_node *a;
2343        struct kvs_node query, *node;
2344        spin_lock(&kv_header->lock);
2345        query.id = kvs->id;
2346        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2347        if (a) {
2348            node = _get_entry(a, struct kvs_node, avl_id);
2349            stat = &node->op_stat;
2350        }
2351        spin_unlock(&kv_header->lock);
2352    }
2353    return stat;
2354}
2355
2356void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2357{
2358    size_t size_id = sizeof(fdb_kvs_id_t);
2359    fdb_kvs_id_t temp;
2360
2361    if (chunksize == size_id) {
2362        temp = *((fdb_kvs_id_t*)buf);
2363    } else if (chunksize < size_id) {
2364        temp = 0;
2365        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2366    } else { // chunksize > sizeof(fdb_kvs_id_t)
2367        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2368    }
2369    *id = _endian_decode(temp);
2370}
2371
2372void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2373{
2374    size_t size_id = sizeof(fdb_kvs_id_t);
2375    id = _endian_encode(id);
2376
2377    if (chunksize == size_id) {
2378        memcpy(buf, &id, size_id);
2379    } else if (chunksize < size_id) {
2380        memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2381    } else { // chunksize > sizeof(fdb_kvs_id_t)
2382        memset(buf, 0x0, chunksize - size_id);
2383        memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2384    }
2385}
2386
// Copy a chunk between buffers of possibly different widths, preserving
// right-aligned (trailing-byte) significance: a narrower destination keeps
// the trailing bytes of the source; a wider destination is zero-padded at
// the front with the source copied into its tail.
void buf2buf(size_t chunksize_src, void *buf_src,
             size_t chunksize_dst, void *buf_dst)
{
    uint8_t *src = (uint8_t *)buf_src;
    uint8_t *dst = (uint8_t *)buf_dst;

    if (chunksize_dst < chunksize_src) {
        memcpy(dst, src + (chunksize_src - chunksize_dst), chunksize_dst);
    } else if (chunksize_dst > chunksize_src) {
        memset(dst, 0x0, chunksize_dst - chunksize_src);
        memcpy(dst + (chunksize_dst - chunksize_src), src, chunksize_src);
    } else {
        memcpy(dst, src, chunksize_src);
    }
}
2401