xref: /4.0.0/forestdb/src/filemgr.cc (revision 41a273e3)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36
37#include "memleak.h"
38
39#ifdef __DEBUG
40#ifndef __DEBUG_FILEMGR
41    #undef DBG
42    #undef DBGCMD
43    #undef DBGSW
44    #define DBG(...)
45    #define DBGCMD(...)
46    #define DBGSW(n, ...)
47#endif
48#endif
49
50// NBUCKET must be a power of 2
51#define NBUCKET (1024)
52#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))
53
54// global static variables
55#ifdef SPIN_INITIALIZER
56static spin_t initial_lock = SPIN_INITIALIZER;
57#else
58static volatile unsigned int initial_lock_status = 0;
59static spin_t initial_lock;
60#endif
61
62
63static volatile uint8_t filemgr_initialized = 0;
64static struct filemgr_config global_config;
65static struct hash hash;
66static spin_t filemgr_openlock;
67
68struct temp_buf_item{
69    void *addr;
70    struct list_elem le;
71};
72static struct list temp_buf;
73static spin_t temp_buf_lock;
74
75static bool lazy_file_deletion_enabled = false;
76static register_file_removal_func register_file_removal = NULL;
77static check_file_removal_func is_file_removed = NULL;
78
79static void spin_init_wrap(void *lock) {
80    spin_init((spin_t*)lock);
81}
82
83static void spin_destroy_wrap(void *lock) {
84    spin_destroy((spin_t*)lock);
85}
86
87static void spin_lock_wrap(void *lock) {
88    spin_lock((spin_t*)lock);
89}
90
91static void spin_unlock_wrap(void *lock) {
92    spin_unlock((spin_t*)lock);
93}
94
95static void mutex_init_wrap(void *lock) {
96    mutex_init((mutex_t*)lock);
97}
98
99static void mutex_destroy_wrap(void *lock) {
100    mutex_destroy((mutex_t*)lock);
101}
102
103static void mutex_lock_wrap(void *lock) {
104    mutex_lock((mutex_t*)lock);
105}
106
107static void mutex_unlock_wrap(void *lock) {
108    mutex_unlock((mutex_t*)lock);
109}
110
111static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
112{
113    struct kvs_node *aa, *bb;
114    aa = _get_entry(a, struct kvs_node, avl_id);
115    bb = _get_entry(b, struct kvs_node, avl_id);
116
117    if (aa->id < bb->id) {
118        return -1;
119    } else if (aa->id > bb->id) {
120        return 1;
121    } else {
122        return 0;
123    }
124}
125
126static int _block_is_overlapped(void *pbid1, void *pis_writer1,
127                                void *pbid2, void *pis_writer2,
128                                void *aux)
129{
130    (void)aux;
131    bid_t bid1, is_writer1, bid2, is_writer2;
132    bid1 = *(bid_t*)pbid1;
133    is_writer1 = *(bid_t*)pis_writer1;
134    bid2 = *(bid_t*)pbid2;
135    is_writer2 = *(bid_t*)pis_writer2;
136
137    if (bid1 != bid2) {
138        // not overlapped
139        return 0;
140    } else {
141        // overlapped
142        if (!is_writer1 && !is_writer2) {
143            // both are readers
144            return 0;
145        } else {
146            return 1;
147        }
148    }
149}
150
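// Format a log message into a local buffer and forward it to the registered
// error log callback (if any), then return the given status so callers can
// write 'return fdb_log(...);' directly.
// Illustrative call (a sketch; 'log_callback' and 'file' stand for whatever the
// caller holds, and the callback may be NULL, in which case only the status is
// returned):
//   return fdb_log(log_callback, FDB_RESULT_READ_FAIL,
//                  "failed to read a block from '%s'", file->filename);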
151fdb_status fdb_log(err_log_callback *log_callback,
152                   fdb_status status,
153                   const char *format, ...)
154{
155    if (log_callback && log_callback->callback) {
156        char msg[1024];
157        va_list args;
158        va_start(args, format);
159        vsnprintf(msg, sizeof(msg), format, args);
160        va_end(args);
161        log_callback->callback(status, msg, log_callback->ctx_data);
162    }
163    return status;
164}
165
166static void _log_errno_str(struct filemgr_ops *ops,
167                           err_log_callback *log_callback,
168                           fdb_status io_error,
169                           const char *what,
170                           const char *filename)
171{
172    if (io_error < 0) {
173        char errno_msg[512];
174        ops->get_errno_str(errno_msg, 512);
175        fdb_log(log_callback, io_error,
176                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
177    }
178}
179
180static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
181{
182    struct filemgr *file = _get_entry(e, struct filemgr, e);
183    int len = strlen(file->filename);
184    return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
185}
186
187static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
188{
189    struct filemgr *aa, *bb;
190    aa = _get_entry(a, struct filemgr, e);
191    bb = _get_entry(b, struct filemgr, e);
192    return strcmp(aa->filename, bb->filename);
193}
194
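// One-time global initialization of the file manager: sets up the global
// config, the block cache (if configured), the open-file hash table, the temp
// buffer pool, and the global open lock. Uses double-checked locking; on
// Windows (no SPIN_INITIALIZER) the bootstrap spin lock itself is first
// initialized atomically via InterlockedCompareExchange().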
195void filemgr_init(struct filemgr_config *config)
196{
197    // global initialization
198    // initialized only once, on first call
199    if (!filemgr_initialized) {
200#ifndef SPIN_INITIALIZER
201        // Note that only Windows passes through this routine
202        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
203            // atomically initialize spin lock only once
204            spin_init(&initial_lock);
205            initial_lock_status = 2;
206        } else {
207            // other threads wait until 'initial_lock' initialization is done
208            while (initial_lock_status != 2) {
209                Sleep(1);
210            }
211        }
212#endif
213
214        spin_lock(&initial_lock);
215        if (!filemgr_initialized) {
216            global_config = *config;
217
218            if (global_config.ncacheblock > 0)
219                bcache_init(global_config.ncacheblock, global_config.blocksize);
220
221            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
222
223            // initialize temp buffer
224            list_init(&temp_buf);
225            spin_init(&temp_buf_lock);
226
227            // initialize global lock
228            spin_init(&filemgr_openlock);
229
230            // set the initialized flag
231            filemgr_initialized = 1;
232        }
233        spin_unlock(&initial_lock);
234    }
235}
236
237void filemgr_set_lazy_file_deletion(bool enable,
238                                    register_file_removal_func regis_func,
239                                    check_file_removal_func check_func)
240{
241    lazy_file_deletion_enabled = enable;
242    register_file_removal = regis_func;
243    is_file_removed = check_func;
244}
245
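// Temp buffers are blocksize-sized, FDB_SECTOR_SIZE-aligned allocations kept
// on a free list. The bookkeeping 'temp_buf_item' lives immediately after the
// usable blocksize bytes of each allocation, which is how the release and
// shutdown routines locate it from the buffer pointer alone.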
246static void * _filemgr_get_temp_buf()
247{
248    struct list_elem *e;
249    struct temp_buf_item *item;
250
251    spin_lock(&temp_buf_lock);
252    e = list_pop_front(&temp_buf);
253    if (e) {
254        item = _get_entry(e, struct temp_buf_item, le);
255    } else {
256        void *addr;
257
258        malloc_align(addr, FDB_SECTOR_SIZE,
259                     global_config.blocksize + sizeof(struct temp_buf_item));
260
261        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
262        item->addr = addr;
263    }
264    spin_unlock(&temp_buf_lock);
265
266    return item->addr;
267}
268
269static void _filemgr_release_temp_buf(void *buf)
270{
271    struct temp_buf_item *item;
272
273    spin_lock(&temp_buf_lock);
274    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
275    list_push_front(&temp_buf, &item->le);
276    spin_unlock(&temp_buf_lock);
277}
278
279static void _filemgr_shutdown_temp_buf()
280{
281    struct list_elem *e;
282    struct temp_buf_item *item;
283    size_t count=0;
284
285    spin_lock(&temp_buf_lock);
286    e = list_begin(&temp_buf);
287    while(e){
288        item = _get_entry(e, struct temp_buf_item, le);
289        e = list_remove(&temp_buf, e);
290        free_align(item->addr);
291        count++;
292    }
293    spin_unlock(&temp_buf_lock);
294}
295
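// Scan the file backwards, one block at a time from the current end, looking
// for the most recent valid DB header: the block must end with the DBHEADER
// marker, carry FILEMGR_MAGIC, and its CRC must match. Any trailing partial
// block and any trailing non-header or corrupt blocks are discarded by
// rewinding file->pos and file->last_commit. If no valid header is found, the
// in-memory header fields are zeroed and the most recent error status (or
// success, for an empty file) is returned.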
296static fdb_status _filemgr_read_header(struct filemgr *file,
297                                       err_log_callback *log_callback)
298{
299    uint8_t marker[BLK_MARKER_SIZE];
300    filemgr_magic_t magic;
301    filemgr_header_len_t len;
302    uint8_t *buf;
303    uint32_t crc, crc_file;
304    fdb_status status = FDB_RESULT_SUCCESS;
305
306    // get temp buffer
307    buf = (uint8_t *) _filemgr_get_temp_buf();
308
309    if (atomic_get_uint64_t(&file->pos) > 0) {
310        // Crash Recovery Test 1: unaligned last block write
311        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
312        if (remain) {
313            atomic_sub_uint64_t(&file->pos, remain);
314            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
315            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
316                "from a database file '%s'\n";
317            DBG(msg, remain, file->filename);
318            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
319                    msg, remain, file->filename);
320        }
321
322        size_t block_counter = 0;
323        do {
324            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
325                atomic_get_uint64_t(&file->pos) - file->blocksize);
326            if (rv != file->blocksize) {
327                status = FDB_RESULT_READ_FAIL;
328                const char *msg = "Unable to read a database file '%s' with "
329                    "blocksize %" _F64 "\n";
330                DBG(msg, file->filename, file->blocksize);
331                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
332                break;
333            }
334            ++block_counter;
335            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
336                   BLK_MARKER_SIZE);
337
338            if (marker[0] == BLK_MARKER_DBHEADER) {
339                // possible need for byte conversions here
340                memcpy(&magic,
341                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
342                       sizeof(magic));
343                magic = _endian_decode(magic);
344
345                if (magic == FILEMGR_MAGIC) {
346                    memcpy(&len,
347                           buf + file->blocksize - BLK_MARKER_SIZE -
348                           sizeof(magic) - sizeof(len),
349                           sizeof(len));
350                    len = _endian_decode(len);
351
352                    crc = chksum(buf, len - sizeof(crc));
353                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
354                    crc_file = _endian_decode(crc_file);
355                    if (crc == crc_file) {
356                        file->header.data = (void *)malloc(len);
357
358                        memcpy(file->header.data, buf, len);
359                        memcpy(&file->header.revnum, buf + len,
360                               sizeof(filemgr_header_revnum_t));
361                        memcpy((void *) &file->header.seqnum,
362                                buf + len + sizeof(filemgr_header_revnum_t),
363                                sizeof(fdb_seqnum_t));
364                        file->header.revnum =
365                            _endian_decode(file->header.revnum);
366                        file->header.seqnum =
367                            _endian_decode(file->header.seqnum);
368                        file->header.size = len;
369                        atomic_store_uint64_t(&file->header.bid,
370                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
371                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
372                                              BLK_NOT_FOUND);
373                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
374                                              BLK_NOT_FOUND);
375                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));
376
377                        // release temp buffer
378                        _filemgr_release_temp_buf(buf);
379
380                        return FDB_RESULT_SUCCESS;
381                    } else {
382                        status = FDB_RESULT_CHECKSUM_ERROR;
383                        const char *msg = "Crash Detected: CRC on disk %u != %u "
384                            "in a database file '%s'\n";
385                        DBG(msg, crc_file, crc, file->filename);
386                        fdb_log(log_callback, status, msg, crc_file, crc,
387                                file->filename);
388                    }
389                } else {
390                    status = FDB_RESULT_FILE_CORRUPTION;
391                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
392                        " in a database file '%s'\n";
393                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
394                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
395                            file->filename);
396                }
397            } else {
398                status = FDB_RESULT_NO_DB_HEADERS;
399                if (block_counter == 1) {
400                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
401                                      "in a database file '%s'\n";
402                    DBG(msg, marker[0], file->filename);
403                    fdb_log(log_callback, status, msg, marker[0], file->filename);
404                }
405            }
406
407            atomic_sub_uint64_t(&file->pos, file->blocksize);
408            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
409        } while (atomic_get_uint64_t(&file->pos));
410    }
411
412    // release temp buffer
413    _filemgr_release_temp_buf(buf);
414
415    file->header.size = 0;
416    file->header.revnum = 0;
417    file->header.seqnum = 0;
418    file->header.data = NULL;
419    atomic_store_uint64_t(&file->header.bid, 0);
420    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
421    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
422    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
423    return status;
424}
425
426size_t filemgr_get_ref_count(struct filemgr *file)
427{
428    size_t ret = 0;
429    spin_lock(&file->lock);
430    ret = file->ref_count;
431    spin_unlock(&file->lock);
432    return ret;
433}
434
435uint64_t filemgr_get_bcache_used_space(void)
436{
437    uint64_t bcache_free_space = 0;
438    if (global_config.ncacheblock) { // If buffer cache is indeed configured
439        bcache_free_space = bcache_get_num_free_blocks();
440        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
441                          * global_config.blocksize;
442    }
443    return bcache_free_space;
444}
445
446struct filemgr_prefetch_args {
447    struct filemgr *file;
448    uint64_t duration;
449    err_log_callback *log_callback;
450    void *aux;
451};
452
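// Prefetch thread body: walks backwards from last_commit in
// FILEMGR_PREFETCH_UNIT chunks, reading each block through filemgr_read() so
// it lands in the block cache. It stops on an abort signal, when the
// configured duration expires, when block cache free space drops below one
// prefetch unit, or on a read failure.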
453static void *_filemgr_prefetch_thread(void *voidargs)
454{
455    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
456    uint8_t *buf = alca(uint8_t, args->file->blocksize);
457    uint64_t cur_pos = 0, i;
458    uint64_t bcache_free_space;
459    bid_t bid;
460    bool terminate = false;
461    struct timeval begin, cur, gap;
462
463    spin_lock(&args->file->lock);
464    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
465    spin_unlock(&args->file->lock);
466    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
467        terminate = true;
468    } else {
469        cur_pos -= FILEMGR_PREFETCH_UNIT;
470    }
471    // read backwards from the end of the file, in units of FILEMGR_PREFETCH_UNIT
472    gettimeofday(&begin, NULL);
473    while (!terminate) {
474        for (i = cur_pos;
475             i < cur_pos + FILEMGR_PREFETCH_UNIT;
476             i += args->file->blocksize) {
477
478            gettimeofday(&cur, NULL);
479            gap = _utime_gap(begin, cur);
480            bcache_free_space = bcache_get_num_free_blocks();
481            bcache_free_space *= args->file->blocksize;
482
483            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
484                gap.tv_sec >= (int64_t)args->duration ||
485                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
486                // terminate thread when
487                // 1. got abort signal
488                // 2. time out
489                // 3. not enough free space in block cache
490                terminate = true;
491                break;
492            } else {
493                bid = i / args->file->blocksize;
494                if (filemgr_read(args->file, bid, buf, NULL, true)
495                        != FDB_RESULT_SUCCESS) {
496                    // 4. read failure
497                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
498                            "Prefetch thread failed to read a block with block id %" _F64
499                            " from a database file '%s'", bid, args->file->filename);
500                    terminate = true;
501                    break;
502                }
503            }
504        }
505
506        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
507            cur_pos -= FILEMGR_PREFETCH_UNIT;
508        } else {
509            // remaining space is less than FILEMGR_PREFETCH_UNIT
510            terminate = true;
511        }
512    }
513
514    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
515    free(args);
516    return NULL;
517}
518
519// prefetch the given DB file
520void filemgr_prefetch(struct filemgr *file,
521                      struct filemgr_config *config,
522                      err_log_callback *log_callback)
523{
524    uint64_t bcache_free_space;
525
526    bcache_free_space = bcache_get_num_free_blocks();
527    bcache_free_space *= file->blocksize;
528
529    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
530    spin_lock(&file->lock);
531    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
532        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
533        // invoke prefetch thread
534        struct filemgr_prefetch_args *args;
535        args = (struct filemgr_prefetch_args *)
536               calloc(1, sizeof(struct filemgr_prefetch_args));
537        args->file = file;
538        args->duration = config->prefetch_duration;
539        args->log_callback = log_callback;
540
541        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
542        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
543    }
544    spin_unlock(&file->lock);
545}
546
547fdb_status filemgr_does_file_exist(char *filename) {
548    struct filemgr_ops *ops = get_filemgr_ops();
549    int fd = ops->open(filename, O_RDONLY, 0444);
550    if (fd < 0) {
551        return (fdb_status) fd;
552    }
553    ops->close(fd);
554    return FDB_RESULT_SUCCESS;
555}
556
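// Open (or reuse) a filemgr instance for 'filename'. If the file is already in
// the global hash table its ref_count is bumped and, if it had been closed, the
// underlying fd is reopened; otherwise a new filemgr is created, the last DB
// header is recovered via _filemgr_read_header(), locks, the WAL, and the
// global transaction are initialized, and prefetching is started when
// configured. Illustrative usage (a sketch; variable names are the caller's):
//   struct filemgr_config fconfig = ...;  // caller-provided configuration
//   filemgr_open_result r = filemgr_open(path, get_filemgr_ops(), &fconfig, NULL);
//   if (r.rv != FDB_RESULT_SUCCESS) { /* handle error */ }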
557filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
558                                 struct filemgr_config *config,
559                                 err_log_callback *log_callback)
560{
561    struct filemgr *file = NULL;
562    struct filemgr query;
563    struct hash_elem *e = NULL;
564    bool create = config->options & FILEMGR_CREATE;
565    int file_flag = 0x0;
566    int fd = -1;
567    fdb_status status;
568    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
569
570    filemgr_init(config);
571
572    // check whether file is already opened or not
573    query.filename = filename;
574    spin_lock(&filemgr_openlock);
575    e = hash_find(&hash, &query.e);
576
577    if (e) {
578        // already opened (return existing structure)
579        file = _get_entry(e, struct filemgr, e);
580
581        spin_lock(&file->lock);
582        file->ref_count++;
583
584        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
585            file_flag = O_RDWR;
586            if (create) {
587                file_flag |= O_CREAT;
588            }
589            *file->config = *config;
590            file->config->blocksize = global_config.blocksize;
591            file->config->ncacheblock = global_config.ncacheblock;
592            file_flag |= config->flag;
593            file->fd = file->ops->open(file->filename, file_flag, 0666);
594            if (file->fd < 0) {
595                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
596                    // A database file was manually deleted by the user.
597                    // Clean up global hash table, WAL index, and buffer cache.
598                    // Then, retry it with a create option below IFF it is not
599                    // a read-only open attempt
600                    struct hash_elem *ret;
601                    spin_unlock(&file->lock);
602                    ret = hash_remove(&hash, &file->e);
603                    fdb_assert(ret, 0, 0);
604                    filemgr_free_func(&file->e);
605                    if (!create) {
606                        _log_errno_str(ops, log_callback,
607                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
608                        spin_unlock(&filemgr_openlock);
609                        result.rv = FDB_RESULT_NO_SUCH_FILE;
610                        return result;
611                    }
612                } else {
613                    _log_errno_str(file->ops, log_callback,
614                                  (fdb_status)file->fd, "OPEN", filename);
615                    file->ref_count--;
616                    spin_unlock(&file->lock);
617                    spin_unlock(&filemgr_openlock);
618                    result.rv = file->fd;
619                    return result;
620                }
621            } else { // Reopening the closed file succeeded.
622                atomic_store_uint8_t(&file->status, FILE_NORMAL);
623                if (config->options & FILEMGR_SYNC) {
624                    file->fflags |= FILEMGR_SYNC;
625                } else {
626                    file->fflags &= ~FILEMGR_SYNC;
627                }
628                spin_unlock(&file->lock);
629                spin_unlock(&filemgr_openlock);
630                result.file = file;
631                result.rv = FDB_RESULT_SUCCESS;
632                return result;
633            }
634        } else { // file is already opened.
635
636            if (config->options & FILEMGR_SYNC) {
637                file->fflags |= FILEMGR_SYNC;
638            } else {
639                file->fflags &= ~FILEMGR_SYNC;
640            }
641
642            spin_unlock(&file->lock);
643            spin_unlock(&filemgr_openlock);
644            result.file = file;
645            result.rv = FDB_RESULT_SUCCESS;
646            return result;
647        }
648    }
649
650    file_flag = O_RDWR;
651    if (create) {
652        file_flag |= O_CREAT;
653    }
654    file_flag |= config->flag;
655    fd = ops->open(filename, file_flag, 0666);
656    if (fd < 0) {
657        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
658        spin_unlock(&filemgr_openlock);
659        result.rv = fd;
660        return result;
661    }
662    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
663    file->filename_len = strlen(filename);
664    file->filename = (char*)malloc(file->filename_len + 1);
665    strcpy(file->filename, filename);
666
667    file->ref_count = 1;
668
669    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
670    file->wal->flag = 0;
671
672    file->ops = ops;
673    file->blocksize = global_config.blocksize;
674    atomic_init_uint8_t(&file->status, FILE_NORMAL);
675    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
676    *file->config = *config;
677    file->config->blocksize = global_config.blocksize;
678    file->config->ncacheblock = global_config.ncacheblock;
679    file->new_file = NULL;
680    file->old_filename = NULL;
681    file->fd = fd;
682
683    cs_off_t offset = file->ops->goto_eof(file->fd);
684    if (offset == FDB_RESULT_SEEK_FAIL) {
685        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
686        file->ops->close(file->fd);
687        free(file->wal);
688        free(file->filename);
689        free(file->config);
690        free(file);
691        spin_unlock(&filemgr_openlock);
692        result.rv = FDB_RESULT_SEEK_FAIL;
693        return result;
694    }
695    atomic_init_uint64_t(&file->last_commit, offset);
696    atomic_init_uint64_t(&file->pos, offset);
697    atomic_init_uint32_t(&file->throttling_delay, 0);
698
699    file->bcache = NULL;
700    file->in_place_compaction = false;
701    file->kv_header = NULL;
702    file->prefetch_status = FILEMGR_PREFETCH_IDLE;
703
704    atomic_init_uint64_t(&file->header.bid, 0);
705    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
706    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
707    _init_op_stats(&file->header.op_stat);
708    status = _filemgr_read_header(file, log_callback);
709    if (status != FDB_RESULT_SUCCESS) {
710        _log_errno_str(file->ops, log_callback, status, "READ", filename);
711        file->ops->close(file->fd);
712        free(file->wal);
713        free(file->filename);
714        free(file->config);
715        free(file);
716        spin_unlock(&filemgr_openlock);
717        result.rv = status;
718        return result;
719    }
720
721    spin_init(&file->lock);
722
723#ifdef __FILEMGR_DATA_PARTIAL_LOCK
724    struct plock_ops pops;
725    struct plock_config pconfig;
726
727    pops.init_user = mutex_init_wrap;
728    pops.lock_user = mutex_lock_wrap;
729    pops.unlock_user = mutex_unlock_wrap;
730    pops.destroy_user = mutex_destroy_wrap;
731    pops.init_internal = spin_init_wrap;
732    pops.lock_internal = spin_lock_wrap;
733    pops.unlock_internal = spin_unlock_wrap;
734    pops.destroy_internal = spin_destroy_wrap;
735    pops.is_overlapped = _block_is_overlapped;
736
737    memset(&pconfig, 0x0, sizeof(pconfig));
738    pconfig.ops = &pops;
739    pconfig.sizeof_lock_internal = sizeof(spin_t);
740    pconfig.sizeof_lock_user = sizeof(mutex_t);
741    pconfig.sizeof_range = sizeof(bid_t);
742    pconfig.aux = NULL;
743    plock_init(&file->plock, &pconfig);
744#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
745    int i;
746    for (i=0;i<DLOCK_MAX;++i) {
747        mutex_init(&file->data_mutex[i]);
748    }
749#else
750    int i;
751    for (i=0;i<DLOCK_MAX;++i) {
752        spin_init(&file->data_spinlock[i]);
753    }
754#endif //__FILEMGR_DATA_PARTIAL_LOCK
755
756    mutex_init(&file->writer_lock.mutex);
757    file->writer_lock.locked = false;
758
759    // initialize WAL
760    if (!wal_is_initialized(file)) {
761        wal_init(file, FDB_WAL_NBUCKET);
762    }
763
764    // init global transaction for the file
765    file->global_txn.wrapper = (struct wal_txn_wrapper*)
766                               malloc(sizeof(struct wal_txn_wrapper));
767    file->global_txn.wrapper->txn = &file->global_txn;
768    file->global_txn.handle = NULL;
769    if (atomic_get_uint64_t(&file->pos)) {
770        file->global_txn.prev_hdr_bid =
771            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
772    } else {
773        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
774    }
775    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
776    list_init(file->global_txn.items);
777    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
778    wal_add_transaction(file, &file->global_txn);
779
780    hash_insert(&hash, &file->e);
781    if (config->prefetch_duration > 0) {
782        filemgr_prefetch(file, config, log_callback);
783    }
784    spin_unlock(&filemgr_openlock);
785
786    if (config->options & FILEMGR_SYNC) {
787        file->fflags |= FILEMGR_SYNC;
788    } else {
789        file->fflags &= ~FILEMGR_SYNC;
790    }
791
792    result.file = file;
793    result.rv = FDB_RESULT_SUCCESS;
794    return result;
795}
796
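// Replace the cached in-memory copy of the DB header (growing the buffer if
// needed), bump the header revision number, and return the new revnum. The
// header is not written to disk here.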
797uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
798{
799    uint64_t ret;
800
801    spin_lock(&file->lock);
802
803    if (file->header.data == NULL) {
804        file->header.data = (void *)malloc(len);
805    }else if (file->header.size < len){
806        file->header.data = (void *)realloc(file->header.data, len);
807    }
808    memcpy(file->header.data, buf, len);
809    file->header.size = len;
810    ++(file->header.revnum);
811    ret = file->header.revnum;
812
813    spin_unlock(&file->lock);
814
815    return ret;
816}
817
818filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
819{
820    filemgr_header_revnum_t ret;
821    spin_lock(&file->lock);
822    ret = file->header.revnum;
823    spin_unlock(&file->lock);
824    return ret;
825}
826
827// 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
828// 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
829fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
830{
831    return file->header.seqnum;
832}
833
834void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
835{
836    file->header.seqnum = seqnum;
837}
838
839void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
840                         bid_t *header_bid, fdb_seqnum_t *seqnum,
841                         filemgr_header_revnum_t *header_revnum)
842{
843    spin_lock(&file->lock);
844
845    if (file->header.size > 0) {
846        if (buf == NULL) {
847            buf = (void*)malloc(file->header.size);
848        }
849        memcpy(buf, file->header.data, file->header.size);
850    }
851
852    if (len) {
853        *len = file->header.size;
854    }
855    if (header_bid) {
856        *header_bid = ((file->header.size > 0) ?
857                       atomic_get_uint64_t(&file->header.bid) : BLK_NOT_FOUND);
858    }
859    if (seqnum) {
860        *seqnum = file->header.seqnum;
861    }
862    if (header_revnum) {
863        *header_revnum = file->header.revnum;
864    }
865
866    spin_unlock(&file->lock);
867
868    return buf;
869}
870
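// Read the DB header stored in block 'bid': verify the block marker and magic,
// then copy the header body into 'buf' and optionally return its revision
// number and the default KVS seqnum, both decoded from the bytes that follow
// the header body.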
871fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
872                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
873                                filemgr_header_revnum_t *header_revnum,
874                                err_log_callback *log_callback)
875{
876    uint8_t *_buf;
877    uint8_t marker[BLK_MARKER_SIZE];
878    filemgr_header_len_t hdr_len;
879    filemgr_magic_t magic;
880    fdb_status status = FDB_RESULT_SUCCESS;
881
882    if (!bid || bid == BLK_NOT_FOUND) {
883        *len = 0; // No other header available
884        return FDB_RESULT_SUCCESS;
885    }
886    _buf = (uint8_t *)_filemgr_get_temp_buf();
887
888    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
889
890    if (status != FDB_RESULT_SUCCESS) {
891        fdb_log(log_callback, status,
892                "Failed to read a database header with block id %" _F64 " in "
893                "a database file '%s'", bid, file->filename);
894        _filemgr_release_temp_buf(_buf);
895        return status;
896    }
897    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
898            BLK_MARKER_SIZE);
899
900    if (marker[0] != BLK_MARKER_DBHEADER) {
901        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
902                "A block marker of the database header block id %" _F64 " in "
903                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
904                bid, file->filename);
905        _filemgr_release_temp_buf(_buf);
906        return FDB_RESULT_READ_FAIL;
907    }
908    memcpy(&magic,
909            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
910            sizeof(magic));
911    magic = _endian_decode(magic);
912    if (magic != FILEMGR_MAGIC) {
913        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
914                "A block magic value of the database header block id %" _F64 " in "
915                "a database file '%s' does NOT match FILEMGR_MAGIC!",
916                bid, file->filename);
917        _filemgr_release_temp_buf(_buf);
918        return FDB_RESULT_READ_FAIL;
919    }
920    memcpy(&hdr_len,
921            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
922            sizeof(hdr_len), sizeof(hdr_len));
923    hdr_len = _endian_decode(hdr_len);
924
925    memcpy(buf, _buf, hdr_len);
926    *len = hdr_len;
927
928    if (header_revnum) {
929        // copy the DB header revnum
930        filemgr_header_revnum_t _revnum;
931        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
932        *header_revnum = _endian_decode(_revnum);
933    }
934    if (seqnum) {
935        // copy default KVS's seqnum
936        fdb_seqnum_t _seqnum;
937        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
938               sizeof(_seqnum));
939        *seqnum = _endian_decode(_seqnum);
940    }
941
942    _filemgr_release_temp_buf(_buf);
943
944    return status;
945}
946
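// Starting from header block 'bid', follow the previous-header BID embedded in
// the header trailer to locate the immediately preceding DB header; validate
// its marker and magic, copy it out, and return its BID. If 'bid' itself is not
// a header block (e.g. when called between fdb_set() and fdb_commit()), the
// scan restarts from the latest known header BID. '*len' is set to 0 when no
// earlier valid header exists.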
947uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
948                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
949                                   err_log_callback *log_callback)
950{
951    uint8_t *_buf;
952    uint8_t marker[BLK_MARKER_SIZE];
953    fdb_seqnum_t _seqnum;
954    filemgr_header_revnum_t _revnum;
955    filemgr_header_len_t hdr_len;
956    filemgr_magic_t magic;
957    bid_t _prev_bid, prev_bid;
958    int found = 0;
959
960    if (!bid || bid == BLK_NOT_FOUND) {
961        *len = 0; // No other header available
962        return bid;
963    }
964    _buf = (uint8_t *)_filemgr_get_temp_buf();
965
966    // Reverse scan the file for a previous DB header
967    do {
968        // Get prev_bid from the current header.
969        // Since the current header is already cached during the previous
970        // operation, no disk I/O will be triggered.
971        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
972                != FDB_RESULT_SUCCESS) {
973            break;
974        }
975
976        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
977               BLK_MARKER_SIZE);
978        memcpy(&magic,
979               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
980               sizeof(magic));
981        magic = _endian_decode(magic);
982
983        if (marker[0] != BLK_MARKER_DBHEADER ||
984            magic != FILEMGR_MAGIC) {
985            // not a header block
986            // this happens when this function is invoked between
987            // fdb_set() call and fdb_commit() call, so the last block
988            // in the file is not a header block
989            bid_t latest_hdr = filemgr_get_header_bid(file);
990            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
991                // get the latest header BID
992                bid = latest_hdr;
993            } else {
994                break;
995            }
996        } else {
997            memcpy(&_prev_bid,
998                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
999                       sizeof(hdr_len) - sizeof(_prev_bid),
1000                   sizeof(_prev_bid));
1001            prev_bid = _endian_decode(_prev_bid);
1002            if (bid <= prev_bid) {
1003                // no more prev header, or broken linked list
1004                break;
1005            }
1006            bid = prev_bid;
1007        }
1008
1009        // Read the prev header
1010        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1011        if (fs != FDB_RESULT_SUCCESS) {
1012            fdb_log(log_callback, fs,
1013                    "Failed to read a previous database header with block id %" _F64 " in "
1014                    "a database file '%s'", bid, file->filename);
1015            break;
1016        }
1017
1018        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1019               BLK_MARKER_SIZE);
1020        if (marker[0] != BLK_MARKER_DBHEADER) {
1021            if (bid) {
1022                // broken linked list
1023                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1024                        "A block marker of the previous database header block id %"
1025                        _F64 " in "
1026                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1027                        bid, file->filename);
1028            }
1029            break;
1030        }
1031
1032        memcpy(&magic,
1033               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1034               sizeof(magic));
1035        magic = _endian_decode(magic);
1036        if (magic != FILEMGR_MAGIC) {
1037            // broken linked list
1038            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1039                    "A block magic value of the previous database header block id %" _F64 " in "
1040                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
1041                    bid, file->filename);
1042            break;
1043        }
1044
1045        memcpy(&hdr_len,
1046               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1047               sizeof(hdr_len), sizeof(hdr_len));
1048        hdr_len = _endian_decode(hdr_len);
1049
1050        if (buf) {
1051            memcpy(buf, _buf, hdr_len);
1052        }
1053        memcpy(&_revnum, _buf + hdr_len,
1054               sizeof(filemgr_header_revnum_t));
1055        memcpy(&_seqnum,
1056               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1057               sizeof(fdb_seqnum_t));
1058        *seqnum = _endian_decode(_seqnum);
1059        *len = hdr_len;
1060        found = 1;
1061        break;
1062    } while (false); // no repetition
1063
1064    if (!found) { // no earlier valid header found
1065        *len = 0;
1066    }
1067
1068    _filemgr_release_temp_buf(_buf);
1069
1070    return bid;
1071}
1072
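// Drop one reference to the file. When the count reaches zero, the dirty
// blocks cached for it are discarded and the WAL and the fd are closed; then
// either the file is removed right away (or handed to the lazy-deletion
// callback) if it is pending removal, the whole filemgr is freed when
// cleanup_cache_onclose is set (renaming an in-place-compacted file back to
// its original name when that is safe), or the entry is simply marked
// FILE_CLOSED so a later filemgr_open() can revive it.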
1073fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1074                         const char *orig_file_name,
1075                         err_log_callback *log_callback)
1076{
1077    int rv = FDB_RESULT_SUCCESS;
1078
1079    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1080                                  // filemgr_open() because file->lock won't
1081                                  // prevent the race condition.
1082
1083    // remove filemgr structure if no thread refers to the file
1084    spin_lock(&file->lock);
1085    if (--(file->ref_count) == 0) {
1086        if (global_config.ncacheblock > 0 &&
1087            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1088            spin_unlock(&file->lock);
1089            // discard all dirty blocks belonging to this file
1090            bcache_remove_dirty_blocks(file);
1091        } else {
1092            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1093            // then its dirty block entries will be cleaned up in either
1094            // filemgr_free_func() or register_file_removal() below.
1095            spin_unlock(&file->lock);
1096        }
1097
1098        if (wal_is_initialized(file)) {
1099            wal_close(file);
1100        }
1101
1102        spin_lock(&file->lock);
1103        rv = file->ops->close(file->fd);
1104        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1105            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1106
1107            bool foreground_deletion = false;
1108
1109            // immediately remove file if background remove function is not set
1110            if (!lazy_file_deletion_enabled ||
1111                (file->new_file && file->new_file->in_place_compaction)) {
1112                // TODO: to avoid the scenario below, we prevent background
1113                //       deletion of in-place compacted files at this time.
1114                // 1) In-place compacted from 'A' to 'A.1'.
1115                // 2) Request to delete 'A'.
1116                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1117                // 4) User opens DB file using its original name 'A', not 'A.1'.
1118                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1119                // 6) Crash!
1120                remove(file->filename);
1121                foreground_deletion = true;
1122            }
1123
1124            // we can release the lock because no one will open this file
1125            spin_unlock(&file->lock);
1126            struct hash_elem *ret = hash_remove(&hash, &file->e);
1127            fdb_assert(ret, 0, 0);
1128            spin_unlock(&filemgr_openlock);
1129
1130            if (foreground_deletion) {
1131                filemgr_free_func(&file->e);
1132            } else {
1133                register_file_removal(file, log_callback);
1134            }
1135            return (fdb_status) rv;
1136        } else {
1137            if (cleanup_cache_onclose) {
1138                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1139                if (file->in_place_compaction && orig_file_name) {
1140                    struct hash_elem *elem = NULL;
1141                    struct filemgr query;
1142                    uint32_t old_file_refcount = 0;
1143
1144                    query.filename = (char *)orig_file_name;
1145                    elem = hash_find(&hash, &query.e);
1146
1147                    if (file->old_filename) {
1148                        struct hash_elem *elem_old = NULL;
1149                        struct filemgr query_old;
1150                        struct filemgr *old_file = NULL;
1151
1152                        // get old file's ref count if exists
1153                        query_old.filename = file->old_filename;
1154                        elem_old = hash_find(&hash, &query_old.e);
1155                        if (elem_old) {
1156                            old_file = _get_entry(elem_old, struct filemgr, e);
1157                            old_file_refcount = old_file->ref_count;
1158                        }
1159                    }
1160
1161                    // If the old file is opened by another handle, renaming
1162                    // should be postponed; it will be renamed later by the
1163                    // handle referring to the old file.
1164                    if (!elem && old_file_refcount == 0 &&
1165                        is_file_removed(orig_file_name)) {
1166                        // If background file removal is not done yet, we postpone
1167                        // file renaming at this time.
1168                        if (rename(file->filename, orig_file_name) < 0) {
1169                            // Note that the renaming failure is not a critical
1170                            // issue because the last compacted file will be automatically
1171                            // identified and opened in the next fdb_open call.
1172                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1173                                           "CLOSE", file->filename);
1174                        }
1175                    }
1176                }
1177                spin_unlock(&file->lock);
1178                // Clean up global hash table, WAL index, and buffer cache.
1179                struct hash_elem *ret = hash_remove(&hash, &file->e);
1180                fdb_assert(ret, file, 0);
1181                spin_unlock(&filemgr_openlock);
1182                filemgr_free_func(&file->e);
1183                return (fdb_status) rv;
1184            } else {
1185                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1186            }
1187        }
1188    }
1189
1190    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1191
1192    spin_unlock(&file->lock);
1193    spin_unlock(&filemgr_openlock);
1194    return (fdb_status) rv;
1195}
1196
1197void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1198{
1199    // remove all cached blocks
1200    if (global_config.ncacheblock > 0 && file->bcache) {
1201        bcache_remove_dirty_blocks(file);
1202        bcache_remove_clean_blocks(file);
1203        bcache_remove_file(file);
1204        file->bcache = NULL;
1205    }
1206}
1207
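// Final teardown of a filemgr entry (called once it is out of the hash table):
// abort and join any running prefetch thread, drop all of its cached blocks,
// free the KV header, the global transaction, every WAL shard, all locks, and
// finally the filename/header/config buffers and the structure itself.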
1208void filemgr_free_func(struct hash_elem *h)
1209{
1210    struct filemgr *file = _get_entry(h, struct filemgr, e);
1211
1212    spin_lock(&file->lock);
1213    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
1214        // prefetch thread is running
1215        void *ret;
1216        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
1217        spin_unlock(&file->lock);
1218        // wait
1219        thread_join(file->prefetch_tid, &ret);
1220    } else {
1221        spin_unlock(&file->lock);
1222    }
1223
1224    // remove all cached blocks
1225    if (global_config.ncacheblock > 0 && file->bcache) {
1226        bcache_remove_dirty_blocks(file);
1227        bcache_remove_clean_blocks(file);
1228        bcache_remove_file(file);
1229        file->bcache = NULL;
1230    }
1231
1232    if (file->kv_header) {
1233        // multi KV instance mode & KV header exists
1234        file->free_kv_header(file);
1235    }
1236
1237    // free global transaction
1238    wal_remove_transaction(file, &file->global_txn);
1239    free(file->global_txn.items);
1240    free(file->global_txn.wrapper);
1241
1242    // destroy WAL
1243    if (wal_is_initialized(file)) {
1244        wal_shutdown(file);
1245        size_t i = 0;
1246        size_t num_all_shards = wal_get_num_all_shards(file);
1247        // Free all WAL shards (including compactor's shard)
1248        for (; i < num_all_shards; ++i) {
1249            hash_free(&file->wal->key_shards[i].hash_bykey);
1250            spin_destroy(&file->wal->key_shards[i].lock);
1251            hash_free(&file->wal->seq_shards[i].hash_byseq);
1252            spin_destroy(&file->wal->seq_shards[i].lock);
1253        }
1254        spin_destroy(&file->wal->lock);
1255        atomic_destroy_uint32_t(&file->wal->size);
1256        atomic_destroy_uint32_t(&file->wal->num_flushable);
1257        atomic_destroy_uint64_t(&file->wal->datasize);
1258        free(file->wal->key_shards);
1259        free(file->wal->seq_shards);
1260    }
1261    free(file->wal);
1262
1263    // free filename and header
1264    free(file->filename);
1265    if (file->header.data) free(file->header.data);
1266    // free old filename if any
1267    free(file->old_filename);
1268
1269    // destroy locks
1270    spin_destroy(&file->lock);
1271
1272#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1273    plock_destroy(&file->plock);
1274#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1275    int i;
1276    for (i=0;i<DLOCK_MAX;++i) {
1277        mutex_destroy(&file->data_mutex[i]);
1278    }
1279#else
1280    int i;
1281    for (i=0;i<DLOCK_MAX;++i) {
1282        spin_destroy(&file->data_spinlock[i]);
1283    }
1284#endif //__FILEMGR_DATA_PARTIAL_LOCK
1285
1286    mutex_destroy(&file->writer_lock.mutex);
1287
1288    atomic_destroy_uint64_t(&file->pos);
1289    atomic_destroy_uint64_t(&file->last_commit);
1290    atomic_destroy_uint32_t(&file->throttling_delay);
1291
1292    // free file structure
1293    free(file->config);
1294    free(file);
1295}
1296
1297// permanently remove file from cache (not just close)
1298// LCOV_EXCL_START
1299void filemgr_remove_file(struct filemgr *file)
1300{
1301    struct hash_elem *ret;
1302
1303    fdb_assert(file, file, NULL);
1304    fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1305
1306    // remove from global hash table
1307    spin_lock(&filemgr_openlock);
1308    ret = hash_remove(&hash, &file->e);
1309    fdb_assert(ret, ret, NULL);
1310    spin_unlock(&filemgr_openlock);
1311
1312    if (!lazy_file_deletion_enabled ||
1313        (file->new_file && file->new_file->in_place_compaction)) {
1314        filemgr_free_func(&file->e);
1315    } else {
1316        register_file_removal(file, NULL);
1317    }
1318}
1319// LCOV_EXCL_STOP
1320
1321static
1322void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1323    struct filemgr *file = _get_entry(h, struct filemgr, e);
1324    void *ret;
1325    spin_lock(&file->lock);
1326    if (file->ref_count != 0) {
1327        ret = (void *)file;
1328    } else {
1329        ret = NULL;
1330    }
1331    spin_unlock(&file->lock);
1332    return ret;
1333}
1334
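// Global shutdown: only proceeds when no filemgr entry still has a non-zero
// ref_count. Frees all remaining (closed) entries, shuts down the block cache
// and the temp buffer pool, and resets the init flag; otherwise it returns
// FDB_RESULT_FILE_IS_BUSY.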
1335fdb_status filemgr_shutdown()
1336{
1337    fdb_status ret = FDB_RESULT_SUCCESS;
1338    void *open_file;
1339    if (filemgr_initialized) {
1340
1341#ifndef SPIN_INITIALIZER
1342        // Windows: check if spin lock is already destroyed.
1343        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1344            spin_lock(&initial_lock);
1345        } else {
1346            // filemgr is already shut down
1347            return ret;
1348        }
1349#else
1350        spin_lock(&initial_lock);
1351#endif
1352
1353        if (!filemgr_initialized) {
1354            // filemgr is already shut down
1355#ifdef SPIN_INITIALIZER
1356            spin_unlock(&initial_lock);
1357#endif
1358            return ret;
1359        }
1360
1361        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1362        if (!open_file) {
1363            hash_free_active(&hash, filemgr_free_func);
1364            if (global_config.ncacheblock > 0) {
1365                bcache_shutdown();
1366            }
1367            filemgr_initialized = 0;
1368#ifndef SPIN_INITIALIZER
1369            initial_lock_status = 0;
1370#else
1371            initial_lock = SPIN_INITIALIZER;
1372#endif
1373            _filemgr_shutdown_temp_buf();
1374            spin_unlock(&initial_lock);
1375#ifndef SPIN_INITIALIZER
1376            spin_destroy(&initial_lock);
1377#endif
1378        } else {
1379            spin_unlock(&initial_lock);
1380            ret = FDB_RESULT_FILE_IS_BUSY;
1381        }
1382    }
1383    return ret;
1384}
1385
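// Allocate the next block by advancing file->pos by one blocksize and return
// its BID. When the block cache is disabled, a single byte is written at the
// end of the new block so the file is physically extended before use.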
1386bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1387{
1388    spin_lock(&file->lock);
1389    bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1390    atomic_add_uint64_t(&file->pos, file->blocksize);
1391
1392    if (global_config.ncacheblock <= 0) {
1393        // if block cache is turned off, write the allocated block before use
1394        uint8_t _buf = 0x0;
1395        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1396                                       atomic_get_uint64_t(&file->pos) - 1);
1397        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1398    }
1399    spin_unlock(&file->lock);
1400
1401    return bid;
1402}
1403
1404void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1405                            bid_t *end, err_log_callback *log_callback)
1406{
1407    spin_lock(&file->lock);
1408    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1409    *end = *begin + nblock - 1;
1410    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1411
1412    if (global_config.ncacheblock <= 0) {
1413        // if block cache is turned off, write the allocated block before use
1414        uint8_t _buf = 0x0;
1415        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1416                                       atomic_get_uint64_t(&file->pos) - 1);
1417        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1418    }
1419    spin_unlock(&file->lock);
1420}
1421
1422// atomically allocate NBLOCK blocks only when the current file position equals nextbid
1423bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1424                                  bid_t *begin, bid_t *end,
1425                                  err_log_callback *log_callback)
1426{
1427    bid_t bid;
1428    spin_lock(&file->lock);
1429    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1430    if (bid == nextbid) {
1431        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1432        *end = *begin + nblock - 1;
1433        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1434
1435        if (global_config.ncacheblock <= 0) {
1436            // if block cache is turned off, write the allocated block before use
1437            uint8_t _buf = 0x0;
1438            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1439                                           atomic_get_uint64_t(&file->pos) - 1);
1440            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1441        }
1442    }else{
1443        *begin = BLK_NOT_FOUND;
1444        *end = BLK_NOT_FOUND;
1445    }
1446    spin_unlock(&file->lock);
1447    return bid;
1448}
1449
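// When CRC32 support is compiled in, each B+tree node block stores a CRC at
// BTREE_CRC_OFFSET; _filemgr_crc32_check() recomputes the checksum over the
// whole block (with the on-disk CRC field overwritten with 0xff bytes first)
// and reports FDB_RESULT_CHECKSUM_ERROR on mismatch. Blocks that do not end
// with BLK_MARKER_BNODE are not checked.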
1450#ifdef __CRC32
1451INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1452{
1453    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1454        uint32_t crc_file, crc;
1455        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1456        crc_file = _endian_decode(crc_file);
1457        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1458        crc = chksum(buf, file->blocksize);
1459        if (crc != crc_file) {
1460            return FDB_RESULT_CHECKSUM_ERROR;
1461        }
1462    }
1463    return FDB_RESULT_SUCCESS;
1464}
1465#endif
1466
1467void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1468{
1469    if (global_config.ncacheblock > 0) {
1470        bcache_invalidate_block(file, bid);
1471    }
1472}
1473
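// Read one block: serve it from the block cache when possible; on a miss (and
// only if read_on_cache_miss is set) read it from disk, CRC-check it, and
// insert it into the cache as a clean block. Blocks that are still writable
// (not yet committed) are read under the per-block data lock, since a writer
// may be filling them concurrently.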
1474fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1475                        err_log_callback *log_callback,
1476                        bool read_on_cache_miss)
1477{
1478    size_t lock_no;
1479    ssize_t r;
1480    uint64_t pos = bid * file->blocksize;
1481    fdb_status status = FDB_RESULT_SUCCESS;
1482    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
1483    fdb_assert(pos < curr_pos, pos, curr_pos);
1484
1485    if (global_config.ncacheblock > 0) {
1486        lock_no = bid % DLOCK_MAX;
1487        (void)lock_no;
1488
1489#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1490        plock_entry_t *plock_entry = NULL;
1491        bid_t is_writer = 0;
1492#endif
1493        bool locked = false;
1494        // Note: we don't need to grab a lock for committed blocks
1495        // because they are immutable, so no writer will interfere or
1496        // overwrite them with dirty data
1497        if (filemgr_is_writable(file, bid)) {
1498#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1499            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1500#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1501            mutex_lock(&file->data_mutex[lock_no]);
1502#else
1503            spin_lock(&file->data_spinlock[lock_no]);
1504#endif //__FILEMGR_DATA_PARTIAL_LOCK
1505            locked = true;
1506        }
1507
1508        r = bcache_read(file, bid, buf);
1509        if (r == 0) {
1510            // cache miss
1511            if (!read_on_cache_miss) {
1512                if (locked) {
1513#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1514                    plock_unlock(&file->plock, plock_entry);
1515#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1516                    mutex_unlock(&file->data_mutex[lock_no]);
1517#else
1518                    spin_unlock(&file->data_spinlock[lock_no]);
1519#endif //__FILEMGR_DATA_PARTIAL_LOCK
1520                }
1521                return FDB_RESULT_READ_FAIL;
1522            }
1523
1524            // if normal file, just read a block
1525            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1526            if (r != file->blocksize) {
1527                _log_errno_str(file->ops, log_callback,
1528                               (fdb_status) r, "READ", file->filename);
1529                if (locked) {
1530#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1531                    plock_unlock(&file->plock, plock_entry);
1532#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1533                    mutex_unlock(&file->data_mutex[lock_no]);
1534#else
1535                    spin_unlock(&file->data_spinlock[lock_no]);
1536#endif //__FILEMGR_DATA_PARTIAL_LOCK
1537                }
1538                return (fdb_status)r;
1539            }
1540#ifdef __CRC32
1541            status = _filemgr_crc32_check(file, buf);
1542            if (status != FDB_RESULT_SUCCESS) {
1543                _log_errno_str(file->ops, log_callback, status, "READ",
1544                        file->filename);
1545                if (locked) {
1546#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1547                    plock_unlock(&file->plock, plock_entry);
1548#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1549                    mutex_unlock(&file->data_mutex[lock_no]);
1550#else
1551                    spin_unlock(&file->data_spinlock[lock_no]);
1552#endif //__FILEMGR_DATA_PARTIAL_LOCK
1553                }
1554                return status;
1555            }
1556#endif
1557            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
1558            if (r != global_config.blocksize) {
1559                if (locked) {
1560#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1561                    plock_unlock(&file->plock, plock_entry);
1562#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1563                    mutex_unlock(&file->data_mutex[lock_no]);
1564#else
1565                    spin_unlock(&file->data_spinlock[lock_no]);
1566#endif //__FILEMGR_DATA_PARTIAL_LOCK
1567                }
1568                _log_errno_str(file->ops, log_callback,
1569                               (fdb_status) r, "WRITE", file->filename);
1570                return FDB_RESULT_WRITE_FAIL;
1571            }
1572        }
1573        if (locked) {
1574#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1575            plock_unlock(&file->plock, plock_entry);
1576#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1577            mutex_unlock(&file->data_mutex[lock_no]);
1578#else
1579            spin_unlock(&file->data_spinlock[lock_no]);
1580#endif //__FILEMGR_DATA_PARTIAL_LOCK
1581        }
1582    } else {
1583        if (!read_on_cache_miss) {
1584            return FDB_RESULT_READ_FAIL;
1585        }
1586
1587        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1588        if (r != file->blocksize) {
1589            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
1590                           file->filename);
1591            return (fdb_status)r;
1592        }
1593
1594#ifdef __CRC32
1595        status = _filemgr_crc32_check(file, buf);
1596        if (status != FDB_RESULT_SUCCESS) {
1597            _log_errno_str(file->ops, log_callback, status, "READ",
1598                           file->filename);
1599            return status;
1600        }
1601#endif
1602    }
1603    return status;
1604}
1605
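// Write 'len' bytes from 'buf' at byte 'offset' within block 'bid'. Writes may
// only target blocks at or beyond the last commit point. With the block cache
// enabled, a full-block write is placed in the cache as a dirty block, while a
// partial write first tries bcache_write_partial() and, on a miss, performs a
// read-modify-write through a temporary buffer. With the cache disabled, the
// data is written directly with pwrite(); a full b+tree node block gets its
// CRC32 stamped first (when __CRC32 is enabled).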
1606fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
1607                                uint64_t offset, uint64_t len, void *buf,
1608                                err_log_callback *log_callback)
1609{
1610    fdb_assert(offset + len <= file->blocksize, offset + len, file);
1611
1612    size_t lock_no;
1613    ssize_t r = 0;
1614    uint64_t pos = bid * file->blocksize + offset;
1615    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
1616    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);
1617
1618    if (global_config.ncacheblock > 0) {
1619        lock_no = bid % DLOCK_MAX;
1620        (void)lock_no;
1621
1622        bool locked = false;
1623#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1624        plock_entry_t *plock_entry;
1625        bid_t is_writer = 1;
1626        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1627#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1628        mutex_lock(&file->data_mutex[lock_no]);
1629#else
1630        spin_lock(&file->data_spinlock[lock_no]);
1631#endif //__FILEMGR_DATA_PARTIAL_LOCK
1632        locked = true;
1633
1634        if (len == file->blocksize) {
1635            // write entire block .. we don't need to read previous block
1636            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
1637            if (r != global_config.blocksize) {
1638                if (locked) {
1639#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1640                    plock_unlock(&file->plock, plock_entry);
1641#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1642                    mutex_unlock(&file->data_mutex[lock_no]);
1643#else
1644                    spin_unlock(&file->data_spinlock[lock_no]);
1645#endif //__FILEMGR_DATA_PARTIAL_LOCK
1646                }
1647                _log_errno_str(file->ops, log_callback,
1648                               (fdb_status) r, "WRITE", file->filename);
1649                return FDB_RESULT_WRITE_FAIL;
1650            }
1651        } else {
1652            // partially write buffer cache first
1653            r = bcache_write_partial(file, bid, buf, offset, len);
1654            if (r == 0) {
1655                // cache miss
1656                // write partially .. we have to read previous contents of the block
1657                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
1658                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
1659                void *_buf = _filemgr_get_temp_buf();
1660
1661                if (bid >= cur_file_last_bid) {
1662                    // this is the first write to this block, so
1663                    // we don't need to read its previous contents from the file.
1664                } else {
1665                    r = file->ops->pread(file->fd, _buf, file->blocksize,
1666                                         bid * file->blocksize);
1667                    if (r != file->blocksize) {
1668                        if (locked) {
1669#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1670                            plock_unlock(&file->plock, plock_entry);
1671#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1672                            mutex_unlock(&file->data_mutex[lock_no]);
1673#else
1674                            spin_unlock(&file->data_spinlock[lock_no]);
1675#endif //__FILEMGR_DATA_PARTIAL_LOCK
1676                        }
1677                        _filemgr_release_temp_buf(_buf);
1678                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
1679                                       "READ", file->filename);
1680                        return FDB_RESULT_READ_FAIL;
1681                    }
1682                }
1683                memcpy((uint8_t *)_buf + offset, buf, len);
1684                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
1685                if (r != global_config.blocksize) {
1686                    if (locked) {
1687#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1688                        plock_unlock(&file->plock, plock_entry);
1689#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1690                        mutex_unlock(&file->data_mutex[lock_no]);
1691#else
1692                        spin_unlock(&file->data_spinlock[lock_no]);
1693#endif //__FILEMGR_DATA_PARTIAL_LOCK
1694                    }
1695                    _filemgr_release_temp_buf(_buf);
1696                    _log_errno_str(file->ops, log_callback,
1697                            (fdb_status) r, "WRITE", file->filename);
1698                    return FDB_RESULT_WRITE_FAIL;
1699                }
1700
1701                _filemgr_release_temp_buf(_buf);
1702            } // cache miss
1703        } // full block or partial block
1704
1705        if (locked) {
1706#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1707            plock_unlock(&file->plock, plock_entry);
1708#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1709            mutex_unlock(&file->data_mutex[lock_no]);
1710#else
1711            spin_unlock(&file->data_spinlock[lock_no]);
1712#endif //__FILEMGR_DATA_PARTIAL_LOCK
1713        }
1714
1715    } else { // block cache disabled
1716
1717#ifdef __CRC32
1718        if (len == file->blocksize) {
1719            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
1720            if (marker == BLK_MARKER_BNODE) {
1721                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1722                uint32_t crc32 = chksum(buf, file->blocksize);
1723                crc32 = _endian_encode(crc32);
1724                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
1725            }
1726        }
1727#endif
1728
1729        r = file->ops->pwrite(file->fd, buf, len, pos);
1730        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
1731        if ((uint64_t)r != len) {
1732            return FDB_RESULT_WRITE_FAIL;
1733        }
1734    } // block cache check
1735    return FDB_RESULT_SUCCESS;
1736}
1737
1738fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1739                   err_log_callback *log_callback)
1740{
1741    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1742                                log_callback);
1743}
1744
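// Commit the file: flush dirty cached blocks, then (if an in-memory header is
// present) append a DB header block at the current end of the file. The
// trailing fields of that block sit at fixed offsets from its end; for
// example, assuming the default 4096-byte blocksize and a 1-byte block marker:
//
//   block marker    : byte  4095
//   magic number    : bytes 4087..4094
//   header length   : bytes 4085..4086
//   prev header bid : bytes 4077..4084
//
// After a successful write, header.bid points at the new header block,
// file->pos advances by one block, and last_commit is set to the new pos;
// an fsync follows if the file was opened with FILEMGR_SYNC.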
1745fdb_status filemgr_commit(struct filemgr *file,
1746                          err_log_callback *log_callback)
1747{
1748    uint16_t header_len = file->header.size;
1749    uint16_t _header_len;
1750    bid_t _prev_bid;
1751    fdb_seqnum_t _seqnum;
1752    filemgr_header_revnum_t _revnum;
1753    int result = FDB_RESULT_SUCCESS;
1754    filemgr_magic_t magic = FILEMGR_MAGIC;
1755    filemgr_magic_t _magic;
1756
1757    if (global_config.ncacheblock > 0) {
1758        result = bcache_flush(file);
1759        if (result != FDB_RESULT_SUCCESS) {
1760            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1761                           "FLUSH", file->filename);
1762            return (fdb_status)result;
1763        }
1764    }
1765
1766    spin_lock(&file->lock);
1767
1768    if (file->header.size > 0 && file->header.data) {
1769        void *buf = _filemgr_get_temp_buf();
1770        uint8_t marker[BLK_MARKER_SIZE];
1771
1772        // [header data]:        'header_len' bytes   <---+
1773        // [header revnum]:      8 bytes                  |
1774        // [default KVS seqnum]: 8 bytes                  |
1775        // ...                                            |
1776        // (empty)                                    blocksize
1777        // ...                                            |
1778        // [prev header bid]:    8 bytes                  |
1779        // [header length]:      2 bytes                  |
1780        // [magic number]:       8 bytes                  |
1781        // [block marker]:       1 byte               <---+
1782
1783        // header data
1784        memcpy(buf, file->header.data, header_len);
1785        // header rev number
1786        _revnum = _endian_encode(file->header.revnum);
1787        memcpy((uint8_t *)buf + header_len, &_revnum,
1788               sizeof(filemgr_header_revnum_t));
1789        // file's sequence number (default KVS seqnum)
1790        _seqnum = _endian_encode(file->header.seqnum);
1791        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
1792               &_seqnum, sizeof(fdb_seqnum_t));
1793
1794        // prev header bid
1795        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
1796        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1797               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
1798               &_prev_bid, sizeof(_prev_bid));
1799        // header length
1800        _header_len = _endian_encode(header_len);
1801        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1802               - sizeof(header_len) - BLK_MARKER_SIZE),
1803               &_header_len, sizeof(header_len));
1804        // magic number
1805        _magic = _endian_encode(magic);
1806        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1807               - BLK_MARKER_SIZE), &_magic, sizeof(magic));
1808
1809        // marker
1810        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
1811        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1812               marker, BLK_MARKER_SIZE);
1813
1814        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
1815                                       atomic_get_uint64_t(&file->pos));
1816        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1817                       "WRITE", file->filename);
1818        if (rv != file->blocksize) {
1819            _filemgr_release_temp_buf(buf);
1820            spin_unlock(&file->lock);
1821            return FDB_RESULT_WRITE_FAIL;
1822        }
1823        atomic_store_uint64_t(&file->header.bid,
1824                              atomic_get_uint64_t(&file->pos) / file->blocksize);
1825        atomic_add_uint64_t(&file->pos, file->blocksize);
1826
1827        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
1828        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
1829
1830        _filemgr_release_temp_buf(buf);
1831    }
1832    // race condition?
1833    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
1834
1835    spin_unlock(&file->lock);
1836
1837    if (file->fflags & FILEMGR_SYNC) {
1838        result = file->ops->fsync(file->fd);
1839        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
1840    }
1841    return (fdb_status) result;
1842}
1843
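// Flush any dirty cached blocks to the file and, if the file was opened with
// FILEMGR_SYNC, fsync it. Unlike filemgr_commit(), no header block is written.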
1844fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1845{
1846    fdb_status result = FDB_RESULT_SUCCESS;
1847    if (global_config.ncacheblock > 0) {
1848        result = bcache_flush(file);
1849        if (result != FDB_RESULT_SUCCESS) {
1850            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1851                           "FLUSH", file->filename);
1852            return result;
1853        }
1854    }
1855
1856    if (file->fflags & FILEMGR_SYNC) {
1857        int rv = file->ops->fsync(file->fd);
1858        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1859        return (fdb_status) rv;
1860    }
1861    return result;
1862}
1863
1864int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1865                                char *old_filename)
1866{
1867    int ret = 1;
1868    spin_lock(&file->lock);
1869    atomic_store_uint8_t(&file->status, status);
1870    if (old_filename) {
1871        if (!file->old_filename) {
1872            file->old_filename = old_filename;
1873        } else {
1874            ret = 0;
1875            fdb_assert(file->ref_count, file->ref_count, 0);
1876            free(old_filename);
1877        }
1878    }
1879    spin_unlock(&file->lock);
1880    return ret;
1881}
1882
1883void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1884                                  file_status_t status)
1885{
1886    spin_lock(&old_file->lock);
1887    old_file->new_file = new_file;
1888    atomic_store_uint8_t(&old_file->status, status);
1889    spin_unlock(&old_file->lock);
1890}
1891
1892// Check if there is a file that still points to the old_file that is being
1893// compacted away. If so, bump its ref count (like filemgr_open()) and return its pointer.
1894static
1895void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1896    struct filemgr *cur_file = (struct filemgr *)ctx;
1897    struct filemgr *file = _get_entry(h, struct filemgr, e);
1898    spin_lock(&file->lock);
1899    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1900        file->new_file == cur_file) {
1901        // Incrementing reference counter below is the same as filemgr_open()
1902        // We need to do this to ensure that the pointer returned does not
1903        // get freed outside the filemgr_open lock
1904        file->ref_count++;
1905        spin_unlock(&file->lock);
1906        return (void *)file;
1907    }
1908    spin_unlock(&file->lock);
1909    return (void *)NULL;
1910}
1911
1912struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1913    struct filemgr *very_old_file;
1914    spin_lock(&filemgr_openlock);
1915    very_old_file = (struct filemgr *)hash_scan(&hash,
1916                                         _filemgr_check_stale_link, cur_file);
1917    spin_unlock(&filemgr_openlock);
1918    return very_old_file;
1919}
1920
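// Rewrite the new_filename field inside very_old_file's in-memory DB header so
// that it points to new_file instead of the intermediate file it used to
// reference. The header buffer is grown if the new name is longer, the header
// size and revision number are updated, and the filename previously recorded
// in the header (as reported by redirect_header_func) is returned.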
1921char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1922                                     struct filemgr *new_file,
1923                                     filemgr_redirect_hdr_func
1924                                     redirect_header_func) {
1925    size_t old_header_len, new_header_len;
1926    uint16_t new_filename_len;
1927    char *past_filename;
1928    spin_lock(&very_old_file->lock);
1929    fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1930    fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1931    old_header_len = very_old_file->header.size;
1932    new_filename_len = strlen(new_file->filename);
1933    // Find out the new DB header length with new_file's filename
1934    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1935        + new_filename_len;
1936    // As we are going to change the new_filename field in the DB header of the
1937    // very_old_file, reallocate the DB header buf if needed to accommodate the bigger value
1938    if (new_header_len > old_header_len) {
1939        very_old_file->header.data = realloc(very_old_file->header.data,
1940                new_header_len);
1941    }
1942    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1943    past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1944            new_file->filename, new_filename_len + 1);//Update in-memory header
1945    very_old_file->header.size = new_header_len;
1946    ++(very_old_file->header.revnum);
1947
1948    spin_unlock(&very_old_file->lock);
1949    return past_filename;
1950}
1951
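// Mark old_file for removal once it is no longer referenced. If readers still
// hold references, the file is flagged FILE_REMOVED_PENDING and linked to
// new_file so the last close can clean it up; otherwise it is removed right
// away (the on-disk unlink may be deferred when lazy file deletion is enabled
// and the new file is not an in-place compaction target).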
1952void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1953{
1954    fdb_assert(new_file, new_file, old_file);
1955
1956    spin_lock(&old_file->lock);
1957    if (old_file->ref_count > 0) {
1958        // delay removing
1959        old_file->new_file = new_file;
1960        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1961        spin_unlock(&old_file->lock);
1962    } else {
1963        // immediately remove
1964        // LCOV_EXCL_START
1965        spin_unlock(&old_file->lock);
1966
1967        if (!lazy_file_deletion_enabled ||
1968            (old_file->new_file && old_file->new_file->in_place_compaction)) {
1969            remove(old_file->filename);
1970        }
1971        filemgr_remove_file(old_file);
1972        // LCOV_EXCL_STOP
1973    }
1974}
1975
1976// migrate default kv store stats over to new_file
1977struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1978                                              struct filemgr *new_file,
1979                                              struct kvs_info *kvs)
1980{
1981    kvs_ops_stat *ret = NULL;
1982    fdb_assert(new_file, new_file, old_file);
1983
1984    spin_lock(&old_file->lock);
1985    new_file->header.op_stat = old_file->header.op_stat;
1986    ret = &new_file->header.op_stat;
1987    spin_unlock(&old_file->lock);
1988    return ret;
1989}
1990
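// Destroy a database file and, recursively, the older versions it still
// references through the old_filename chain in its DB header. The
// destroy_file_set hash tracks filenames already visited so that a chain (or a
// repeated name) is only processed once; a file that is currently open with a
// non-zero reference count makes the call fail with FDB_RESULT_FILE_IS_BUSY.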
1991// Note: filemgr_openlock should be held before calling this function.
1992fdb_status filemgr_destroy_file(char *filename,
1993                                struct filemgr_config *config,
1994                                struct hash *destroy_file_set)
1995{
1996    struct filemgr *file = NULL;
1997    struct hash to_destroy_files;
1998    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
1999                                                  &to_destroy_files);
2000    struct filemgr query;
2001    struct hash_elem *e = NULL;
2002    fdb_status status = FDB_RESULT_SUCCESS;
2003    char *old_filename = NULL;
2004
2005    if (!destroy_file_set) { // top level or non-recursive call
2006        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
2007    }
2008
2009    query.filename = filename;
2010    // check whether file is already being destroyed in parent recursive call
2011    e = hash_find(destroy_set, &query.e);
2012    if (e) { // Duplicate filename found, nothing to be done in this call
2013        if (!destroy_file_set) { // top level or non-recursive call
2014            hash_free(destroy_set);
2015        }
2016        return status;
2017    } else {
2018        // Remember file. Stack value ok IFF single direction recursion
2019        hash_insert(destroy_set, &query.e);
2020    }
2021
2022    // check global list of known files to see if it is already opened or not
2023    e = hash_find(&hash, &query.e);
2024    if (e) {
2025        // already opened (return existing structure)
2026        file = _get_entry(e, struct filemgr, e);
2027
2028        spin_lock(&file->lock);
2029        if (file->ref_count) {
2030            spin_unlock(&file->lock);
2031            status = FDB_RESULT_FILE_IS_BUSY;
2032            if (!destroy_file_set) { // top level or non-recursive call
2033                hash_free(destroy_set);
2034            }
2035            return status;
2036        }
2037        spin_unlock(&file->lock);
2038        if (file->old_filename) {
2039            status = filemgr_destroy_file(file->old_filename, config,
2040                                          destroy_set);
2041            if (status != FDB_RESULT_SUCCESS) {
2042                if (!destroy_file_set) { // top level or non-recursive call
2043                    hash_free(destroy_set);
2044                }
2045                return status;
2046            }
2047        }
2048
2049        // Cleanup file from in-memory as well as on-disk
2050        e = hash_remove(&hash, &file->e);
2051        fdb_assert(e, e, 0);
2052        filemgr_free_func(&file->e);
2053        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2054            if (remove(filename)) {
2055                status = FDB_RESULT_FILE_REMOVE_FAIL;
2056            }
2057        }
2058    } else { // file not in memory, read on-disk to destroy older versions..
2059        file = (struct filemgr *)alca(struct filemgr, 1);
2060        file->filename = filename;
2061        file->ops = get_filemgr_ops();
2062        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2063        file->blocksize = global_config.blocksize;
2064        if (file->fd < 0) {
2065            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2066                if (!destroy_file_set) { // top level or non-recursive call
2067                    hash_free(destroy_set);
2068                }
2069                return FDB_RESULT_OPEN_FAIL;
2070            }
2071        } else { // file successfully opened, seek to end to get DB header
2072            cs_off_t offset = file->ops->goto_eof(file->fd);
2073            if (offset == FDB_RESULT_SEEK_FAIL) {
2074                if (!destroy_file_set) { // top level or non-recursive call
2075                    hash_free(destroy_set);
2076                }
2077                return FDB_RESULT_SEEK_FAIL;
2078            } else { // Need to read DB header which contains old filename
2079                atomic_store_uint64_t(&file->pos, offset);
2080                status = _filemgr_read_header(file, NULL);
2081                if (status != FDB_RESULT_SUCCESS) {
2082                    if (!destroy_file_set) { // top level or non-recursive call
2083                        hash_free(destroy_set);
2084                    }
2085                    file->ops->close(file->fd);
2086                    return status;
2087                }
2088                if (file->header.data) {
2089                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
2090                                                     file->header.data + 64);
2091                    uint16_t new_filename_len =
2092                                      _endian_decode(*new_filename_len_ptr);
2093                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
2094                                                     file->header.data + 66);
2095                    uint16_t old_filename_len =
2096                                      _endian_decode(*old_filename_len_ptr);
2097                    old_filename = (char *)file->header.data + 68
2098                                   + new_filename_len;
2099                    if (old_filename_len) {
2100                        status = filemgr_destroy_file(old_filename, config,
2101                                                      destroy_set);
2102                    }
2103                    free(file->header.data);
2104                }
2105                file->ops->close(file->fd);
2106                if (status == FDB_RESULT_SUCCESS) {
2107                    if (filemgr_does_file_exist(filename)
2108                                               == FDB_RESULT_SUCCESS) {
2109                        if (remove(filename)) {
2110                            status = FDB_RESULT_FILE_REMOVE_FAIL;
2111                        }
2112                    }
2113                }
2114            }
2115        }
2116    }
2117
2118    if (!destroy_file_set) { // top level or non-recursive call
2119        hash_free(destroy_set);
2120    }
2121
2122    return status;
2123}
2124
2125bool filemgr_is_rollback_on(struct filemgr *file)
2126{
2127    bool rv;
2128    spin_lock(&file->lock);
2129    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2130    spin_unlock(&file->lock);
2131    return rv;
2132}
2133
2134void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2135{
2136    spin_lock(&file->lock);
2137    if (new_val) {
2138        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2139    } else {
2140        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2141    }
2142    spin_unlock(&file->lock);
2143}
2144
2145void filemgr_set_in_place_compaction(struct filemgr *file,
2146                                     bool in_place_compaction) {
2147    spin_lock(&file->lock);
2148    file->in_place_compaction = in_place_compaction;
2149    spin_unlock(&file->lock);
2150}
2151
2152bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2153{
2154    bool ret = false;
2155    spin_lock(&file->lock);
2156    ret = file->in_place_compaction;
2157    spin_unlock(&file->lock);
2158    return ret;
2159}
2160
2161void filemgr_mutex_openlock(struct filemgr_config *config)
2162{
2163    filemgr_init(config);
2164
2165    spin_lock(&filemgr_openlock);
2166}
2167
2168void filemgr_mutex_openunlock(void)
2169{
2170    spin_unlock(&filemgr_openlock);
2171}
2172
2173void filemgr_mutex_lock(struct filemgr *file)
2174{
2175    mutex_lock(&file->writer_lock.mutex);
2176    file->writer_lock.locked = true;
2177}
2178
2179bool filemgr_mutex_trylock(struct filemgr *file) {
2180    if (mutex_trylock(&file->writer_lock.mutex)) {
2181        file->writer_lock.locked = true;
2182        return true;
2183    }
2184    return false;
2185}
2186
2187void filemgr_mutex_unlock(struct filemgr *file)
2188{
2189    if (file->writer_lock.locked) {
2190        file->writer_lock.locked = false;
2191        mutex_unlock(&file->writer_lock.mutex);
2192    }
2193}
2194
2195void filemgr_set_dirty_root(struct filemgr *file,
2196                            bid_t dirty_idtree_root,
2197                            bid_t dirty_seqtree_root)
2198{
2199    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
2200    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
2201}
2202
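// A block is a commit header iff its last byte is BLK_MARKER_DBHEADER and the
// sizeof(filemgr_magic_t) bytes immediately before it decode to FILEMGR_MAGIC.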
2203bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2204{
2205    uint8_t marker[BLK_MARKER_SIZE];
2206    filemgr_magic_t magic;
2207    marker[0] = *(((uint8_t *)head_buffer)
2208                 + blocksize - BLK_MARKER_SIZE);
2209    if (marker[0] != BLK_MARKER_DBHEADER) {
2210        return false;
2211    }
2212
2213    memcpy(&magic, (uint8_t *) head_buffer
2214            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2215    magic = _endian_decode(magic);
2216
2217    return (magic == FILEMGR_MAGIC);
2218}
2219
2220void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
2221{
2222    atomic_store_uint32_t(&file->throttling_delay, delay_us);
2223}
2224
2225uint32_t filemgr_get_throttling_delay(struct filemgr *file)
2226{
2227    return atomic_get_uint32_t(&file->throttling_delay);
2228}
2229
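// KV store statistics are kept in two places: stats for the default KV store
// (kv_id == 0) live in file->header.stat and are protected by file->lock,
// while stats for named KV stores live in per-store kvs_node entries of the
// kv_header->idx_id AVL tree and are protected by kv_header->lock. The helpers
// below follow that split.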
2230void _kvs_stat_set(struct filemgr *file,
2231                   fdb_kvs_id_t kv_id,
2232                   struct kvs_stat stat)
2233{
2234    if (kv_id == 0) {
2235        spin_lock(&file->lock);
2236        file->header.stat = stat;
2237        spin_unlock(&file->lock);
2238    } else {
2239        struct avl_node *a;
2240        struct kvs_node query, *node;
2241        struct kvs_header *kv_header = file->kv_header;
2242
2243        spin_lock(&kv_header->lock);
2244        query.id = kv_id;
2245        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2246        if (a) {
2247            node = _get_entry(a, struct kvs_node, avl_id);
2248            node->stat = stat;
2249        }
2250        spin_unlock(&kv_header->lock);
2251    }
2252}
2253
2254void _kvs_stat_update_attr(struct filemgr *file,
2255                           fdb_kvs_id_t kv_id,
2256                           kvs_stat_attr_t attr,
2257                           int delta)
2258{
2259    spin_t *lock = NULL;
2260    struct kvs_stat *stat;
2261
2262    if (kv_id == 0) {
2263        stat = &file->header.stat;
2264        lock = &file->lock;
2265        spin_lock(lock);
2266    } else {
2267        struct avl_node *a;
2268        struct kvs_node query, *node;
2269        struct kvs_header *kv_header = file->kv_header;
2270
2271        lock = &kv_header->lock;
2272        spin_lock(lock);
2273        query.id = kv_id;
2274        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2275        if (!a) {
2276            // KV instance corresponding to the kv_id is already removed
2277            spin_unlock(lock);
2278            return;
2279        }
2280        node = _get_entry(a, struct kvs_node, avl_id);
2281        stat = &node->stat;
2282    }
2283
2284    if (attr == KVS_STAT_DATASIZE) {
2285        stat->datasize += delta;
2286    } else if (attr == KVS_STAT_NDOCS) {
2287        stat->ndocs += delta;
2288    } else if (attr == KVS_STAT_NLIVENODES) {
2289        stat->nlivenodes += delta;
2290    } else if (attr == KVS_STAT_WAL_NDELETES) {
2291        stat->wal_ndeletes += delta;
2292    } else if (attr == KVS_STAT_WAL_NDOCS) {
2293        stat->wal_ndocs += delta;
2294    }
2295    spin_unlock(lock);
2296}
2297
2298int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2299                            fdb_kvs_id_t kv_id,
2300                            struct kvs_stat *stat)
2301{
2302    int ret = 0;
2303    struct avl_node *a;
2304    struct kvs_node query, *node;
2305
2306    query.id = kv_id;
2307    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2308    if (a) {
2309        node = _get_entry(a, struct kvs_node, avl_id);
2310        *stat = node->stat;
2311    } else {
2312        ret = -1;
2313    }
2314    return ret;
2315}
2316
2317int _kvs_stat_get(struct filemgr *file,
2318                  fdb_kvs_id_t kv_id,
2319                  struct kvs_stat *stat)
2320{
2321    int ret = 0;
2322
2323    if (kv_id == 0) {
2324        spin_lock(&file->lock);
2325        *stat = file->header.stat;
2326        spin_unlock(&file->lock);
2327    } else {
2328        struct kvs_header *kv_header = file->kv_header;
2329
2330        spin_lock(&kv_header->lock);
2331        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2332        spin_unlock(&kv_header->lock);
2333    }
2334
2335    return ret;
2336}
2337
2338uint64_t _kvs_stat_get_sum(struct filemgr *file,
2339                           kvs_stat_attr_t attr)
2340{
2341    struct avl_node *a;
2342    struct kvs_node *node;
2343    struct kvs_header *kv_header = file->kv_header;
2344
2345    uint64_t ret = 0;
2346    spin_lock(&file->lock);
2347    if (attr == KVS_STAT_DATASIZE) {
2348        ret += file->header.stat.datasize;
2349    } else if (attr == KVS_STAT_NDOCS) {
2350        ret += file->header.stat.ndocs;
2351    } else if (attr == KVS_STAT_NLIVENODES) {
2352        ret += file->header.stat.nlivenodes;
2353    } else if (attr == KVS_STAT_WAL_NDELETES) {
2354        ret += file->header.stat.wal_ndeletes;
2355    } else if (attr == KVS_STAT_WAL_NDOCS) {
2356        ret += file->header.stat.wal_ndocs;
2357    }
2358    spin_unlock(&file->lock);
2359
2360    if (kv_header) {
2361        spin_lock(&kv_header->lock);
2362        a = avl_first(kv_header->idx_id);
2363        while (a) {
2364            node = _get_entry(a, struct kvs_node, avl_id);
2365            a = avl_next(&node->avl_id);
2366
2367            if (attr == KVS_STAT_DATASIZE) {
2368                ret += node->stat.datasize;
2369            } else if (attr == KVS_STAT_NDOCS) {
2370                ret += node->stat.ndocs;
2371            } else if (attr == KVS_STAT_NLIVENODES) {
2372                ret += node->stat.nlivenodes;
2373            } else if (attr == KVS_STAT_WAL_NDELETES) {
2374                ret += node->stat.wal_ndeletes;
2375            } else if (attr == KVS_STAT_WAL_NDOCS) {
2376                ret += node->stat.wal_ndocs;
2377            }
2378        }
2379        spin_unlock(&kv_header->lock);
2380    }
2381
2382    return ret;
2383}
2384
2385int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2386                                fdb_kvs_id_t kv_id,
2387                                struct kvs_ops_stat *stat)
2388{
2389    int ret = 0;
2390    struct avl_node *a;
2391    struct kvs_node query, *node;
2392
2393    query.id = kv_id;
2394    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2395    if (a) {
2396        node = _get_entry(a, struct kvs_node, avl_id);
2397        *stat = node->op_stat;
2398    } else {
2399        ret = -1;
2400    }
2401    return ret;
2402}
2403
2404int _kvs_ops_stat_get(struct filemgr *file,
2405                      fdb_kvs_id_t kv_id,
2406                      struct kvs_ops_stat *stat)
2407{
2408    int ret = 0;
2409
2410    if (kv_id == 0) {
2411        spin_lock(&file->lock);
2412        *stat = file->header.op_stat;
2413        spin_unlock(&file->lock);
2414    } else {
2415        struct kvs_header *kv_header = file->kv_header;
2416
2417        spin_lock(&kv_header->lock);
2418        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2419        spin_unlock(&kv_header->lock);
2420    }
2421
2422    return ret;
2423}
2424
2425void _init_op_stats(struct kvs_ops_stat *stat) {
2426    atomic_init_uint64_t(&stat->num_sets, 0);
2427    atomic_init_uint64_t(&stat->num_dels, 0);
2428    atomic_init_uint64_t(&stat->num_commits, 0);
2429    atomic_init_uint64_t(&stat->num_compacts, 0);
2430    atomic_init_uint64_t(&stat->num_gets, 0);
2431    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
2432    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
2433}
2434
2435struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2436                                           struct kvs_info *kvs)
2437{
2438    struct kvs_ops_stat *stat = NULL;
2439    if (!kvs || kvs->id == 0) {
2440        return &file->header.op_stat;
2441    } else {
2442        struct kvs_header *kv_header = file->kv_header;
2443        struct avl_node *a;
2444        struct kvs_node query, *node;
2445        spin_lock(&kv_header->lock);
2446        query.id = kvs->id;
2447        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2448        if (a) {
2449            node = _get_entry(a, struct kvs_node, avl_id);
2450            stat = &node->op_stat;
2451        }
2452        spin_unlock(&kv_header->lock);
2453    }
2454    return stat;
2455}
2456
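// Helpers for converting between fixed-size key chunks and fdb_kvs_id_t
// values. A chunk may be narrower or wider than fdb_kvs_id_t; the copies below
// keep the least-significant end of the encoded value, assuming
// _endian_encode()/_endian_decode() use a big-endian layout, so the numeric id
// is preserved whenever it fits in the smaller width. For example, with an
// 8-byte fdb_kvs_id_t and a 4-byte chunk, buf2kvid() copies the 4 chunk bytes
// into the last 4 bytes of a zeroed 8-byte temporary before decoding.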
2457void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2458{
2459    size_t size_id = sizeof(fdb_kvs_id_t);
2460    fdb_kvs_id_t temp;
2461
2462    if (chunksize == size_id) {
2463        temp = *((fdb_kvs_id_t*)buf);
2464    } else if (chunksize < size_id) {
2465        temp = 0;
2466        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2467    } else { // chunksize > sizeof(fdb_kvs_id_t)
2468        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2469    }
2470    *id = _endian_decode(temp);
2471}
2472
2473void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2474{
2475    size_t size_id = sizeof(fdb_kvs_id_t);
2476    id = _endian_encode(id);
2477
2478    if (chunksize == size_id) {
2479        memcpy(buf, &id, size_id);
2480    } else if (chunksize < size_id) {
2481        memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2482    } else { // chunksize > sizeof(fdb_kvs_id_t)
2483        memset(buf, 0x0, chunksize - size_id);
2484        memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2485    }
2486}
2487
2488void buf2buf(size_t chunksize_src, void *buf_src,
2489             size_t chunksize_dst, void *buf_dst)
2490{
2491    if (chunksize_dst == chunksize_src) {
2492        memcpy(buf_dst, buf_src, chunksize_src);
2493    } else if (chunksize_dst < chunksize_src) {
2494        memcpy(buf_dst, (uint8_t*)buf_src + (chunksize_src - chunksize_dst),
2495               chunksize_dst);
2496    } else { // chunksize_dst > chunksize_src
2497        memset(buf_dst, 0x0, chunksize_dst - chunksize_src);
2498        memcpy((uint8_t*)buf_dst + (chunksize_dst - chunksize_src),
2499               buf_src, chunksize_src);
2500    }
2501}
2502