xref: /4.0.0/forestdb/src/filemgr.cc (revision 17f72b63)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36
37#include "memleak.h"
38
39#ifdef __DEBUG
40#ifndef __DEBUG_FILEMGR
41    #undef DBG
42    #undef DBGCMD
43    #undef DBGSW
44    #define DBG(...)
45    #define DBGCMD(...)
46    #define DBGSW(n, ...)
47#endif
48#endif
49
50// NBUCKET must be power of 2
51#define NBUCKET (1024)
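// ((NBUCKET - 1) is used as a bit mask in _file_hash() below; masking is
//  equivalent to '% NBUCKET' only when NBUCKET is a power of 2.)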
52#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))
53
54// global static variables
55#ifdef SPIN_INITIALIZER
56static spin_t initial_lock = SPIN_INITIALIZER;
57#else
58static volatile unsigned int initial_lock_status = 0;
59static spin_t initial_lock;
60#endif
61
62
63static volatile uint8_t filemgr_initialized = 0;
64static struct filemgr_config global_config;
65static struct hash hash;
66static spin_t filemgr_openlock;
67
68struct temp_buf_item{
69    void *addr;
70    struct list_elem le;
71};
72static struct list temp_buf;
73static spin_t temp_buf_lock;
74
75static bool lazy_file_deletion_enabled = false;
76static register_file_removal_func register_file_removal = NULL;
77static check_file_removal_func is_file_removed = NULL;
78
79static void spin_init_wrap(void *lock) {
80    spin_init((spin_t*)lock);
81}
82
83static void spin_destroy_wrap(void *lock) {
84    spin_destroy((spin_t*)lock);
85}
86
87static void spin_lock_wrap(void *lock) {
88    spin_lock((spin_t*)lock);
89}
90
91static void spin_unlock_wrap(void *lock) {
92    spin_unlock((spin_t*)lock);
93}
94
95static void mutex_init_wrap(void *lock) {
96    mutex_init((mutex_t*)lock);
97}
98
99static void mutex_destroy_wrap(void *lock) {
100    mutex_destroy((mutex_t*)lock);
101}
102
103static void mutex_lock_wrap(void *lock) {
104    mutex_lock((mutex_t*)lock);
105}
106
107static void mutex_unlock_wrap(void *lock) {
108    mutex_unlock((mutex_t*)lock);
109}
110
111static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
112{
113    struct kvs_node *aa, *bb;
114    aa = _get_entry(a, struct kvs_node, avl_id);
115    bb = _get_entry(b, struct kvs_node, avl_id);
116
117    if (aa->id < bb->id) {
118        return -1;
119    } else if (aa->id > bb->id) {
120        return 1;
121    } else {
122        return 0;
123    }
124}
125
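// Conflict test used by the partial-range lock (plock): two accesses overlap
// only when they refer to the same block id AND at least one of them is a
// writer; two concurrent readers of the same block do not conflict.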
126static int _block_is_overlapped(void *pbid1, void *pis_writer1,
127                                void *pbid2, void *pis_writer2,
128                                void *aux)
129{
130    (void)aux;
131    bid_t bid1, is_writer1, bid2, is_writer2;
132    bid1 = *(bid_t*)pbid1;
133    is_writer1 = *(bid_t*)pis_writer1;
134    bid2 = *(bid_t*)pbid2;
135    is_writer2 = *(bid_t*)pis_writer2;
136
137    if (bid1 != bid2) {
138        // not overlapped
139        return 0;
140    } else {
141        // overlapped
142        if (!is_writer1 && !is_writer2) {
143            // both are readers
144            return 0;
145        } else {
146            return 1;
147        }
148    }
149}
150
151fdb_status fdb_log(err_log_callback *log_callback,
152                   fdb_status status,
153                   const char *format, ...)
154{
155    if (log_callback && log_callback->callback) {
156        char msg[1024];
157        va_list args;
158        va_start(args, format);
159        vsnprintf(msg, sizeof(msg), format, args);
160        va_end(args);
161        log_callback->callback(status, msg, log_callback->ctx_data);
162    }
163    return status;
164}
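// Caller-side sketch (illustrative, not part of the original source): since
// fdb_log() returns the status it was given, it can be used directly where a
// status is propagated, e.g.
//
//   return fdb_log(log_callback, FDB_RESULT_READ_FAIL,
//                  "failed to read block %" _F64 " of '%s'", bid, filename);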
165
166static void _log_errno_str(struct filemgr_ops *ops,
167                           err_log_callback *log_callback,
168                           fdb_status io_error,
169                           const char *what,
170                           const char *filename)
171{
172    if (io_error < 0) {
173        char errno_msg[512];
174        ops->get_errno_str(errno_msg, 512);
175        fdb_log(log_callback, io_error,
176                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
177    }
178}
179
180static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
181{
182    struct filemgr *file = _get_entry(e, struct filemgr, e);
183    int len = strlen(file->filename);
184    return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
185}
186
187static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
188{
189    struct filemgr *aa, *bb;
190    aa = _get_entry(a, struct filemgr, e);
191    bb = _get_entry(b, struct filemgr, e);
192    return strcmp(aa->filename, bb->filename);
193}
194
195void filemgr_init(struct filemgr_config *config)
196{
197    // global initialization
198    // initialized only once at first time
199    if (!filemgr_initialized) {
200#ifndef SPIN_INITIALIZER
201        // Note that only Windows passes through this routine
202        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
203            // atomically initialize spin lock only once
204            spin_init(&initial_lock);
205            initial_lock_status = 2;
206        } else {
207            // the others wait until the initialization of 'initial_lock' is done
208            while (initial_lock_status != 2) {
209                Sleep(1);
210            }
211        }
212#endif
213
214        spin_lock(&initial_lock);
215        if (!filemgr_initialized) {
216            global_config = *config;
217
218            if (global_config.ncacheblock > 0)
219                bcache_init(global_config.ncacheblock, global_config.blocksize);
220
221            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
222
223            // initialize temp buffer
224            list_init(&temp_buf);
225            spin_init(&temp_buf_lock);
226
227            // initialize global lock
228            spin_init(&filemgr_openlock);
229
230            // set the initialized flag
231            filemgr_initialized = 1;
232        }
233        spin_unlock(&initial_lock);
234    }
235}
236
237void filemgr_set_lazy_file_deletion(bool enable,
238                                    register_file_removal_func regis_func,
239                                    check_file_removal_func check_func)
240{
241    lazy_file_deletion_enabled = enable;
242    register_file_removal = regis_func;
243    is_file_removed = check_func;
244}
245
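// Temp buffer layout: each buffer is a single aligned allocation of
// (blocksize + sizeof(struct temp_buf_item)) bytes. The first 'blocksize'
// bytes are handed to the caller; the temp_buf_item bookkeeping struct lives
// immediately after them, which is how _filemgr_release_temp_buf() recovers
// the item from the buffer address alone.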
246static void * _filemgr_get_temp_buf()
247{
248    struct list_elem *e;
249    struct temp_buf_item *item;
250
251    spin_lock(&temp_buf_lock);
252    e = list_pop_front(&temp_buf);
253    if (e) {
254        item = _get_entry(e, struct temp_buf_item, le);
255    } else {
256        void *addr;
257
258        malloc_align(addr, FDB_SECTOR_SIZE,
259                     global_config.blocksize + sizeof(struct temp_buf_item));
260
261        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
262        item->addr = addr;
263    }
264    spin_unlock(&temp_buf_lock);
265
266    return item->addr;
267}
268
269static void _filemgr_release_temp_buf(void *buf)
270{
271    struct temp_buf_item *item;
272
273    spin_lock(&temp_buf_lock);
274    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
275    list_push_front(&temp_buf, &item->le);
276    spin_unlock(&temp_buf_lock);
277}
278
279static void _filemgr_shutdown_temp_buf()
280{
281    struct list_elem *e;
282    struct temp_buf_item *item;
283    size_t count=0;
284
285    spin_lock(&temp_buf_lock);
286    e = list_begin(&temp_buf);
287    while (e) {
288        item = _get_entry(e, struct temp_buf_item, le);
289        e = list_remove(&temp_buf, e);
290        free_align(item->addr);
291        count++;
292    }
293    spin_unlock(&temp_buf_lock);
294}
295
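// On-disk layout of a DB header block, as implied by the parsing below
// (B = blocksize):
//
//   [0 .. len-1]                        header data; its last 4 bytes are a
//                                       CRC32 over the first (len - 4) bytes
//   [len]                               header revnum (filemgr_header_revnum_t)
//   [len + sizeof(revnum)]              default KVS seqnum (fdb_seqnum_t)
//   ...
//   [B - marker - magic - len field]    header length 'len'
//   [B - marker - magic]                FILEMGR_MAGIC
//   [B - BLK_MARKER_SIZE .. B-1]        BLK_MARKER_DBHEADER marker
//
// filemgr_fetch_prev_header() additionally reads a previous-header BID stored
// just before the length field.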
296static fdb_status _filemgr_read_header(struct filemgr *file,
297                                       err_log_callback *log_callback)
298{
299    uint8_t marker[BLK_MARKER_SIZE];
300    filemgr_magic_t magic;
301    filemgr_header_len_t len;
302    uint8_t *buf;
303    uint32_t crc, crc_file;
304    fdb_status status = FDB_RESULT_SUCCESS;
305
306    // get temp buffer
307    buf = (uint8_t *) _filemgr_get_temp_buf();
308
309    if (atomic_get_uint64_t(&file->pos) > 0) {
310        // Crash Recovery Test 1: unaligned last block write
311        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
312        if (remain) {
313            atomic_sub_uint64_t(&file->pos, remain);
314            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
315            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
316                "from a database file '%s'\n";
317            DBG(msg, remain, file->filename);
318            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
319                    msg, remain, file->filename);
320        }
321
322        size_t block_counter = 0;
323        do {
324            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
325                atomic_get_uint64_t(&file->pos) - file->blocksize);
326            if (rv != file->blocksize) {
327                status = FDB_RESULT_READ_FAIL;
328                const char *msg = "Unable to read a database file '%s' with "
329                    "blocksize %" _F64 "\n";
330                DBG(msg, file->filename, file->blocksize);
331                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
332                break;
333            }
334            ++block_counter;
335            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
336                   BLK_MARKER_SIZE);
337
338            if (marker[0] == BLK_MARKER_DBHEADER) {
339                // possible need for byte conversions here
340                memcpy(&magic,
341                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
342                       sizeof(magic));
343                magic = _endian_decode(magic);
344
345                if (magic == FILEMGR_MAGIC) {
346                    memcpy(&len,
347                           buf + file->blocksize - BLK_MARKER_SIZE -
348                           sizeof(magic) - sizeof(len),
349                           sizeof(len));
350                    len = _endian_decode(len);
351
352                    crc = chksum(buf, len - sizeof(crc));
353                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
354                    crc_file = _endian_decode(crc_file);
355                    if (crc == crc_file) {
356                        file->header.data = (void *)malloc(len);
357
358                        memcpy(file->header.data, buf, len);
359                        memcpy(&file->header.revnum, buf + len,
360                               sizeof(filemgr_header_revnum_t));
361                        memcpy((void *) &file->header.seqnum,
362                                buf + len + sizeof(filemgr_header_revnum_t),
363                                sizeof(fdb_seqnum_t));
364                        file->header.revnum =
365                            _endian_decode(file->header.revnum);
366                        file->header.seqnum =
367                            _endian_decode(file->header.seqnum);
368                        file->header.size = len;
369                        atomic_store_uint64_t(&file->header.bid,
370                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
371                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
372                                              BLK_NOT_FOUND);
373                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
374                                              BLK_NOT_FOUND);
375                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));
376
377                        // release temp buffer
378                        _filemgr_release_temp_buf(buf);
379
380                        return FDB_RESULT_SUCCESS;
381                    } else {
382                        status = FDB_RESULT_CHECKSUM_ERROR;
383                        const char *msg = "Crash Detected: CRC on disk %u != %u "
384                            "in a database file '%s'\n";
385                        DBG(msg, crc_file, crc, file->filename);
386                        fdb_log(log_callback, status, msg, crc_file, crc,
387                                file->filename);
388                    }
389                } else {
390                    status = FDB_RESULT_FILE_CORRUPTION;
391                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
392                        " in a database file '%s'\n";
393                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
394                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
395                            file->filename);
396                }
397            } else {
398                status = FDB_RESULT_NO_DB_HEADERS;
399                if (block_counter == 1) {
400                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
401                                      "in a database file '%s'\n";
402                    DBG(msg, marker[0], file->filename);
403                    fdb_log(log_callback, status, msg, marker[0], file->filename);
404                }
405            }
406
407            atomic_sub_uint64_t(&file->pos, file->blocksize);
408            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
409        } while (atomic_get_uint64_t(&file->pos));
410    }
411
412    // release temp buffer
413    _filemgr_release_temp_buf(buf);
414
415    file->header.size = 0;
416    file->header.revnum = 0;
417    file->header.seqnum = 0;
418    file->header.data = NULL;
419    atomic_store_uint64_t(&file->header.bid, 0);
420    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
421    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
422    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
423    return status;
424}
425
426size_t filemgr_get_ref_count(struct filemgr *file)
427{
428    size_t ret = 0;
429    spin_lock(&file->lock);
430    ret = file->ref_count;
431    spin_unlock(&file->lock);
432    return ret;
433}
434
435uint64_t filemgr_get_bcache_used_space(void)
436{
437    uint64_t bcache_free_space = 0;
438    if (global_config.ncacheblock) { // If buffer cache is indeed configured
439        bcache_free_space = bcache_get_num_free_blocks();
440        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
441                          * global_config.blocksize;
442    }
443    return bcache_free_space;
444}
445
446struct filemgr_prefetch_args {
447    struct filemgr *file;
448    uint64_t duration;
449    err_log_callback *log_callback;
450    void *aux;
451};
452
453static void *_filemgr_prefetch_thread(void *voidargs)
454{
455    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
456    uint8_t *buf = alca(uint8_t, args->file->blocksize);
457    uint64_t cur_pos = 0, i;
458    uint64_t bcache_free_space;
459    bid_t bid;
460    bool terminate = false;
461    struct timeval begin, cur, gap;
462
463    spin_lock(&args->file->lock);
464    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
465    spin_unlock(&args->file->lock);
466    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
467        terminate = true;
468    } else {
469        cur_pos -= FILEMGR_PREFETCH_UNIT;
470    }
471    // read backwards from the end of the file, in units of FILEMGR_PREFETCH_UNIT
472    gettimeofday(&begin, NULL);
473    while (!terminate) {
474        for (i = cur_pos;
475             i < cur_pos + FILEMGR_PREFETCH_UNIT;
476             i += args->file->blocksize) {
477
478            gettimeofday(&cur, NULL);
479            gap = _utime_gap(begin, cur);
480            bcache_free_space = bcache_get_num_free_blocks();
481            bcache_free_space *= args->file->blocksize;
482
483            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
484                gap.tv_sec >= (int64_t)args->duration ||
485                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
486                // terminate thread when
487                // 1. got abort signal
488                // 2. time out
489                // 3. not enough free space in block cache
490                terminate = true;
491                break;
492            } else {
493                bid = i / args->file->blocksize;
494                if (filemgr_read(args->file, bid, buf, NULL, true)
495                        != FDB_RESULT_SUCCESS) {
496                    // 4. read failure
497                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
498                            "Prefetch thread failed to read a block with block id %" _F64
499                            " from a database file '%s'", bid, args->file->filename);
500                    terminate = true;
501                    break;
502                }
503            }
504        }
505
506        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
507            cur_pos -= FILEMGR_PREFETCH_UNIT;
508        } else {
509            // remaining space is less than FILEMGR_PREFETCH_UNIT
510            terminate = true;
511        }
512    }
513
514    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
515    free(args);
516    return NULL;
517}
518
519// prefetch the given DB file
520void filemgr_prefetch(struct filemgr *file,
521                      struct filemgr_config *config,
522                      err_log_callback *log_callback)
523{
524    uint64_t bcache_free_space;
525
526    bcache_free_space = bcache_get_num_free_blocks();
527    bcache_free_space *= file->blocksize;
528
529    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
530    spin_lock(&file->lock);
531    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
532        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
533        // invoke prefetch thread
534        struct filemgr_prefetch_args *args;
535        args = (struct filemgr_prefetch_args *)
536               calloc(1, sizeof(struct filemgr_prefetch_args));
537        args->file = file;
538        args->duration = config->prefetch_duration;
539        args->log_callback = log_callback;
540
541        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
542        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
543    }
544    spin_unlock(&file->lock);
545}
546
547fdb_status filemgr_does_file_exist(char *filename) {
548    struct filemgr_ops *ops = get_filemgr_ops();
549    int fd = ops->open(filename, O_RDONLY, 0444);
550    if (fd < 0) {
551        return (fdb_status) fd;
552    }
553    ops->close(fd);
554    return FDB_RESULT_SUCCESS;
555}
556
557filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
558                                 struct filemgr_config *config,
559                                 err_log_callback *log_callback)
560{
561    struct filemgr *file = NULL;
562    struct filemgr query;
563    struct hash_elem *e = NULL;
564    bool create = config->options & FILEMGR_CREATE;
565    int file_flag = 0x0;
566    int fd = -1;
567    fdb_status status;
568    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
569
570    filemgr_init(config);
571
572    // check whether file is already opened or not
573    query.filename = filename;
574    spin_lock(&filemgr_openlock);
575    e = hash_find(&hash, &query.e);
576
577    if (e) {
578        // already opened (return existing structure)
579        file = _get_entry(e, struct filemgr, e);
580
581        spin_lock(&file->lock);
582        file->ref_count++;
583
584        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
585            file_flag = O_RDWR;
586            if (create) {
587                file_flag |= O_CREAT;
588            }
589            *file->config = *config;
590            file->config->blocksize = global_config.blocksize;
591            file->config->ncacheblock = global_config.ncacheblock;
592            file_flag |= config->flag;
593            file->fd = file->ops->open(file->filename, file_flag, 0666);
594            if (file->fd < 0) {
595                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
596                    // A database file was manually deleted by the user.
597                    // Clean up global hash table, WAL index, and buffer cache.
598                    // Then, retry it with a create option below IFF it is not
599                    // a read-only open attempt
600                    struct hash_elem *ret;
601                    spin_unlock(&file->lock);
602                    ret = hash_remove(&hash, &file->e);
603                    fdb_assert(ret, 0, 0);
604                    filemgr_free_func(&file->e);
605                    if (!create) {
606                        _log_errno_str(ops, log_callback,
607                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
608                        spin_unlock(&filemgr_openlock);
609                        result.rv = FDB_RESULT_NO_SUCH_FILE;
610                        return result;
611                    }
612                } else {
613                    _log_errno_str(file->ops, log_callback,
614                                  (fdb_status)file->fd, "OPEN", filename);
615                    file->ref_count--;
616                    spin_unlock(&file->lock);
617                    spin_unlock(&filemgr_openlock);
618                    result.rv = file->fd;
619                    return result;
620                }
621            } else { // Reopening the closed file succeeded.
622                atomic_store_uint8_t(&file->status, FILE_NORMAL);
623                if (config->options & FILEMGR_SYNC) {
624                    file->fflags |= FILEMGR_SYNC;
625                } else {
626                    file->fflags &= ~FILEMGR_SYNC;
627                }
628                spin_unlock(&file->lock);
629                spin_unlock(&filemgr_openlock);
630                result.file = file;
631                result.rv = FDB_RESULT_SUCCESS;
632                return result;
633            }
634        } else { // file is already opened.
635
636            if (config->options & FILEMGR_SYNC) {
637                file->fflags |= FILEMGR_SYNC;
638            } else {
639                file->fflags &= ~FILEMGR_SYNC;
640            }
641
642            spin_unlock(&file->lock);
643            spin_unlock(&filemgr_openlock);
644            result.file = file;
645            result.rv = FDB_RESULT_SUCCESS;
646            return result;
647        }
648    }
649
650    file_flag = O_RDWR;
651    if (create) {
652        file_flag |= O_CREAT;
653    }
654    file_flag |= config->flag;
655    fd = ops->open(filename, file_flag, 0666);
656    if (fd < 0) {
657        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
658        spin_unlock(&filemgr_openlock);
659        result.rv = fd;
660        return result;
661    }
662    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
663    file->filename_len = strlen(filename);
664    file->filename = (char*)malloc(file->filename_len + 1);
665    strcpy(file->filename, filename);
666
667    file->ref_count = 1;
668
669    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
670    file->wal->flag = 0;
671
672    file->ops = ops;
673    file->blocksize = global_config.blocksize;
674    atomic_init_uint8_t(&file->status, FILE_NORMAL);
675    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
676    *file->config = *config;
677    file->config->blocksize = global_config.blocksize;
678    file->config->ncacheblock = global_config.ncacheblock;
679    file->new_file = NULL;
680    file->old_filename = NULL;
681    file->fd = fd;
682
683    cs_off_t offset = file->ops->goto_eof(file->fd);
684    if (offset == FDB_RESULT_SEEK_FAIL) {
685        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
686        file->ops->close(file->fd);
687        free(file->wal);
688        free(file->filename);
689        free(file->config);
690        free(file);
691        spin_unlock(&filemgr_openlock);
692        result.rv = FDB_RESULT_SEEK_FAIL;
693        return result;
694    }
695    atomic_init_uint64_t(&file->last_commit, offset);
696    atomic_init_uint64_t(&file->pos, offset);
697
698    file->bcache = NULL;
699    file->in_place_compaction = false;
700    file->kv_header = NULL;
701    file->prefetch_status = FILEMGR_PREFETCH_IDLE;
702
703    atomic_init_uint64_t(&file->header.bid, 0);
704    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
705    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
706    _init_op_stats(&file->header.op_stat);
707    status = _filemgr_read_header(file, log_callback);
708    if (status != FDB_RESULT_SUCCESS) {
709        _log_errno_str(file->ops, log_callback, status, "READ", filename);
710        file->ops->close(file->fd);
711        free(file->wal);
712        free(file->filename);
713        free(file->config);
714        free(file);
715        spin_unlock(&filemgr_openlock);
716        result.rv = status;
717        return result;
718    }
719
720    spin_init(&file->lock);
721
722#ifdef __FILEMGR_DATA_PARTIAL_LOCK
723    struct plock_ops pops;
724    struct plock_config pconfig;
725
726    pops.init_user = mutex_init_wrap;
727    pops.lock_user = mutex_lock_wrap;
728    pops.unlock_user = mutex_unlock_wrap;
729    pops.destroy_user = mutex_destroy_wrap;
730    pops.init_internal = spin_init_wrap;
731    pops.lock_internal = spin_lock_wrap;
732    pops.unlock_internal = spin_unlock_wrap;
733    pops.destroy_internal = spin_destroy_wrap;
734    pops.is_overlapped = _block_is_overlapped;
735
736    memset(&pconfig, 0x0, sizeof(pconfig));
737    pconfig.ops = &pops;
738    pconfig.sizeof_lock_internal = sizeof(spin_t);
739    pconfig.sizeof_lock_user = sizeof(mutex_t);
740    pconfig.sizeof_range = sizeof(bid_t);
741    pconfig.aux = NULL;
742    plock_init(&file->plock, &pconfig);
743#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
744    int i;
745    for (i=0;i<DLOCK_MAX;++i) {
746        mutex_init(&file->data_mutex[i]);
747    }
748#else
749    int i;
750    for (i=0;i<DLOCK_MAX;++i) {
751        spin_init(&file->data_spinlock[i]);
752    }
753#endif //__FILEMGR_DATA_PARTIAL_LOCK
754
755    mutex_init(&file->writer_lock.mutex);
756    file->writer_lock.locked = false;
757
758    // initialize WAL
759    if (!wal_is_initialized(file)) {
760        wal_init(file, FDB_WAL_NBUCKET);
761    }
762
763    // init global transaction for the file
764    file->global_txn.wrapper = (struct wal_txn_wrapper*)
765                               malloc(sizeof(struct wal_txn_wrapper));
766    file->global_txn.wrapper->txn = &file->global_txn;
767    file->global_txn.handle = NULL;
768    if (atomic_get_uint64_t(&file->pos)) {
769        file->global_txn.prev_hdr_bid =
770            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
771    } else {
772        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
773    }
774    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
775    list_init(file->global_txn.items);
776    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
777    wal_add_transaction(file, &file->global_txn);
778
779    hash_insert(&hash, &file->e);
780    if (config->prefetch_duration > 0) {
781        filemgr_prefetch(file, config, log_callback);
782    }
783    spin_unlock(&filemgr_openlock);
784
785    if (config->options & FILEMGR_SYNC) {
786        file->fflags |= FILEMGR_SYNC;
787    } else {
788        file->fflags &= ~FILEMGR_SYNC;
789    }
790
791    result.file = file;
792    result.rv = FDB_RESULT_SUCCESS;
793    return result;
794}
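// Caller-side sketch (illustrative; the variable names are not from the
// original source): filemgr_open() returns both a status and a handle, so
// callers are expected to check 'rv' before touching 'file':
//
//   filemgr_open_result r = filemgr_open(fname, get_filemgr_ops(),
//                                        &fconfig, log_callback);
//   if (r.rv != FDB_RESULT_SUCCESS) {
//       return (fdb_status) r.rv;   // open failed
//   }
//   // ... use r.file ...
//   filemgr_close(r.file, true, fname, log_callback);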
795
796uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
797{
798    uint64_t ret;
799
800    spin_lock(&file->lock);
801
802    if (file->header.data == NULL) {
803        file->header.data = (void *)malloc(len);
804    } else if (file->header.size < len) {
805        file->header.data = (void *)realloc(file->header.data, len);
806    }
807    memcpy(file->header.data, buf, len);
808    file->header.size = len;
809    ++(file->header.revnum);
810    ret = file->header.revnum;
811
812    spin_unlock(&file->lock);
813
814    return ret;
815}
816
817filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
818{
819    filemgr_header_revnum_t ret;
820    spin_lock(&file->lock);
821    ret = file->header.revnum;
822    spin_unlock(&file->lock);
823    return ret;
824}
825
826// 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
827// 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
828fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
829{
830    return file->header.seqnum;
831}
832
833void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
834{
835    file->header.seqnum = seqnum;
836}
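// Minimal caller sketch of the locking contract stated above (illustrative):
//
//   filemgr_mutex_lock(file);
//   fdb_seqnum_t seq = filemgr_get_seqnum(file);
//   filemgr_set_seqnum(file, seq + 1);
//   filemgr_mutex_unlock(file);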
837
838void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
839                         bid_t *header_bid, fdb_seqnum_t *seqnum,
840                         filemgr_header_revnum_t *header_revnum)
841{
842    spin_lock(&file->lock);
843
844    if (file->header.size > 0) {
845        if (buf == NULL) {
846            buf = (void*)malloc(file->header.size);
847        }
848        memcpy(buf, file->header.data, file->header.size);
849    }
850
851    if (len) {
852        *len = file->header.size;
853    }
854    if (header_bid) {
855        *header_bid = ((file->header.size > 0) ?
856                       atomic_get_uint64_t(&file->header.bid) : BLK_NOT_FOUND);
857    }
858    if (seqnum) {
859        *seqnum = file->header.seqnum;
860    }
861    if (header_revnum) {
862        *header_revnum = file->header.revnum;
863    }
864
865    spin_unlock(&file->lock);
866
867    return buf;
868}
869
870fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
871                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
872                                filemgr_header_revnum_t *header_revnum,
873                                err_log_callback *log_callback)
874{
875    uint8_t *_buf;
876    uint8_t marker[BLK_MARKER_SIZE];
877    filemgr_header_len_t hdr_len;
878    filemgr_magic_t magic;
879    fdb_status status = FDB_RESULT_SUCCESS;
880
881    if (!bid || bid == BLK_NOT_FOUND) {
882        *len = 0; // No other header available
883        return FDB_RESULT_SUCCESS;
884    }
885    _buf = (uint8_t *)_filemgr_get_temp_buf();
886
887    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
888
889    if (status != FDB_RESULT_SUCCESS) {
890        fdb_log(log_callback, status,
891                "Failed to read a database header with block id %" _F64 " in "
892                "a database file '%s'", bid, file->filename);
893        _filemgr_release_temp_buf(_buf);
894        return status;
895    }
896    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
897            BLK_MARKER_SIZE);
898
899    if (marker[0] != BLK_MARKER_DBHEADER) {
900        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
901                "A block marker of the database header block id %" _F64 " in "
902                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
903                bid, file->filename);
904        _filemgr_release_temp_buf(_buf);
905        return FDB_RESULT_READ_FAIL;
906    }
907    memcpy(&magic,
908            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
909            sizeof(magic));
910    magic = _endian_decode(magic);
911    if (magic != FILEMGR_MAGIC) {
912        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
913                "A block magic value of the database header block id %" _F64 " in "
914                "a database file '%s' does NOT match FILEMGR_MAGIC!",
915                bid, file->filename);
916        _filemgr_release_temp_buf(_buf);
917        return FDB_RESULT_READ_FAIL;
918    }
919    memcpy(&hdr_len,
920            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
921            sizeof(hdr_len), sizeof(hdr_len));
922    hdr_len = _endian_decode(hdr_len);
923
924    memcpy(buf, _buf, hdr_len);
925    *len = hdr_len;
926
927    if (header_revnum) {
928        // copy the DB header revnum
929        filemgr_header_revnum_t _revnum;
930        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
931        *header_revnum = _endian_decode(_revnum);
932    }
933    if (seqnum) {
934        // copy default KVS's seqnum
935        fdb_seqnum_t _seqnum;
936        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
937               sizeof(_seqnum));
938        *seqnum = _endian_decode(_seqnum);
939    }
940
941    _filemgr_release_temp_buf(_buf);
942
943    return status;
944}
945
946uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
947                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
948                                   err_log_callback *log_callback)
949{
950    uint8_t *_buf;
951    uint8_t marker[BLK_MARKER_SIZE];
952    fdb_seqnum_t _seqnum;
953    filemgr_header_revnum_t _revnum;
954    filemgr_header_len_t hdr_len;
955    filemgr_magic_t magic;
956    bid_t _prev_bid, prev_bid;
957    int found = 0;
958
959    if (!bid || bid == BLK_NOT_FOUND) {
960        *len = 0; // No other header available
961        return bid;
962    }
963    _buf = (uint8_t *)_filemgr_get_temp_buf();
964
965    // Reverse scan the file for a previous DB header
966    do {
967        // Get prev_bid from the current header.
968        // Since the current header is already cached during the previous
969        // operation, no disk I/O will be triggered.
970        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
971                != FDB_RESULT_SUCCESS) {
972            break;
973        }
974
975        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
976               BLK_MARKER_SIZE);
977        memcpy(&magic,
978               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
979               sizeof(magic));
980        magic = _endian_decode(magic);
981
982        if (marker[0] != BLK_MARKER_DBHEADER ||
983            magic != FILEMGR_MAGIC) {
984            // not a header block
985            // this happens when this function is invoked between
986            // fdb_set() call and fdb_commit() call, so the last block
987            // in the file is not a header block
988            bid_t latest_hdr = filemgr_get_header_bid(file);
989            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
990                // get the latest header BID
991                bid = latest_hdr;
992            } else {
993                break;
994            }
995        } else {
996            memcpy(&_prev_bid,
997                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
998                       sizeof(hdr_len) - sizeof(_prev_bid),
999                   sizeof(_prev_bid));
1000            prev_bid = _endian_decode(_prev_bid);
1001            if (bid <= prev_bid) {
1002                // no more prev header, or broken linked list
1003                break;
1004            }
1005            bid = prev_bid;
1006        }
1007
1008        // Read the prev header
1009        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1010        if (fs != FDB_RESULT_SUCCESS) {
1011            fdb_log(log_callback, fs,
1012                    "Failed to read a previous database header with block id %" _F64 " in "
1013                    "a database file '%s'", bid, file->filename);
1014            break;
1015        }
1016
1017        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1018               BLK_MARKER_SIZE);
1019        if (marker[0] != BLK_MARKER_DBHEADER) {
1020            if (bid) {
1021                // broken linked list
1022                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1023                        "A block marker of the previous database header block id %"
1024                        _F64 " in "
1025                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1026                        bid, file->filename);
1027            }
1028            break;
1029        }
1030
1031        memcpy(&magic,
1032               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1033               sizeof(magic));
1034        magic = _endian_decode(magic);
1035        if (magic != FILEMGR_MAGIC) {
1036            // broken linked list
1037            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1038                    "A block magic value of the previous database header block id %" _F64 " in "
1039                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
1040                    bid, file->filename);
1041            break;
1042        }
1043
1044        memcpy(&hdr_len,
1045               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1046               sizeof(hdr_len), sizeof(hdr_len));
1047        hdr_len = _endian_decode(hdr_len);
1048
1049        if (buf) {
1050            memcpy(buf, _buf, hdr_len);
1051        }
1052        memcpy(&_revnum, _buf + hdr_len,
1053               sizeof(filemgr_header_revnum_t));
1054        memcpy(&_seqnum,
1055               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1056               sizeof(fdb_seqnum_t));
1057        *seqnum = _endian_decode(_seqnum);
1058        *len = hdr_len;
1059        found = 1;
1060        break;
1061    } while (false); // no repetition
1062
1063    if (!found) { // no other header found till end of file
1064        *len = 0;
1065    }
1066
1067    _filemgr_release_temp_buf(_buf);
1068
1069    return bid;
1070}
1071
1072fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1073                         const char *orig_file_name,
1074                         err_log_callback *log_callback)
1075{
1076    int rv = FDB_RESULT_SUCCESS;
1077
1078    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1079                                  // filemgr_open() because file->lock won't
1080                                  // prevent the race condition.
1081
1082    // remove filemgr structure if no thread refers to the file
1083    spin_lock(&file->lock);
1084    if (--(file->ref_count) == 0) {
1085        if (global_config.ncacheblock > 0 &&
1086            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1087            spin_unlock(&file->lock);
1088            // discard all dirty blocks belonging to this file
1089            bcache_remove_dirty_blocks(file);
1090        } else {
1091            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1092            // then its dirty block entries will be cleaned up in either
1093            // filemgr_free_func() or register_file_removal() below.
1094            spin_unlock(&file->lock);
1095        }
1096
1097        if (wal_is_initialized(file)) {
1098            wal_close(file);
1099        }
1100
1101        spin_lock(&file->lock);
1102        rv = file->ops->close(file->fd);
1103        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1104            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1105
1106            bool foreground_deletion = false;
1107
1108            // immediately remove file if background remove function is not set
1109            if (!lazy_file_deletion_enabled ||
1110                (file->new_file && file->new_file->in_place_compaction)) {
1111                // TODO: to avoid the scenario below, we prevent background
1112                //       deletion of in-place compacted files at this time.
1113                // 1) In-place compacted from 'A' to 'A.1'.
1114                // 2) Request to delete 'A'.
1115                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1116                // 4) User opens DB file using its original name 'A', not 'A.1'.
1117                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1118                // 6) Crash!
1119                remove(file->filename);
1120                foreground_deletion = true;
1121            }
1122
1123            // we can release the lock because no one will open this file
1124            spin_unlock(&file->lock);
1125            struct hash_elem *ret = hash_remove(&hash, &file->e);
1126            fdb_assert(ret, 0, 0);
1127            spin_unlock(&filemgr_openlock);
1128
1129            if (foreground_deletion) {
1130                filemgr_free_func(&file->e);
1131            } else {
1132                register_file_removal(file, log_callback);
1133            }
1134            return (fdb_status) rv;
1135        } else {
1136            if (cleanup_cache_onclose) {
1137                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1138                if (file->in_place_compaction && orig_file_name) {
1139                    struct hash_elem *elem = NULL;
1140                    struct filemgr query;
1141                    uint32_t old_file_refcount = 0;
1142
1143                    query.filename = (char *)orig_file_name;
1144                    elem = hash_find(&hash, &query.e);
1145
1146                    if (file->old_filename) {
1147                        struct hash_elem *elem_old = NULL;
1148                        struct filemgr query_old;
1149                        struct filemgr *old_file = NULL;
1150
1151                        // get old file's ref count if exists
1152                        query_old.filename = file->old_filename;
1153                        elem_old = hash_find(&hash, &query_old.e);
1154                        if (elem_old) {
1155                            old_file = _get_entry(elem_old, struct filemgr, e);
1156                            old_file_refcount = old_file->ref_count;
1157                        }
1158                    }
1159
1160                    // If the old file is opened by another handle, renaming should be
1161                    // postponed. It will be renamed later by the handle referring
1162                    // to the old file.
1163                    if (!elem && old_file_refcount == 0 &&
1164                        is_file_removed(orig_file_name)) {
1165                        // If background file removal is not done yet, we postpone
1166                        // file renaming at this time.
1167                        if (rename(file->filename, orig_file_name) < 0) {
1168                            // Note that the renaming failure is not a critical
1169                            // issue because the last compacted file will be automatically
1170                            // identified and opened in the next fdb_open call.
1171                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1172                                           "CLOSE", file->filename);
1173                        }
1174                    }
1175                }
1176                spin_unlock(&file->lock);
1177                // Clean up global hash table, WAL index, and buffer cache.
1178                struct hash_elem *ret = hash_remove(&hash, &file->e);
1179                fdb_assert(ret, file, 0);
1180                spin_unlock(&filemgr_openlock);
1181                filemgr_free_func(&file->e);
1182                return (fdb_status) rv;
1183            } else {
1184                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1185            }
1186        }
1187    }
1188
1189    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1190
1191    spin_unlock(&file->lock);
1192    spin_unlock(&filemgr_openlock);
1193    return (fdb_status) rv;
1194}
1195
1196void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1197{
1198    // remove all cached blocks
1199    if (global_config.ncacheblock > 0 && file->bcache) {
1200        bcache_remove_dirty_blocks(file);
1201        bcache_remove_clean_blocks(file);
1202        if (bcache_remove_file(file)) {
1203            file->bcache = NULL;
1204        }
1205    }
1206}
1207
1208void filemgr_free_func(struct hash_elem *h)
1209{
1210    struct filemgr *file = _get_entry(h, struct filemgr, e);
1211
1212    spin_lock(&file->lock);
1213    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
1214        // prefetch thread is running
1215        void *ret;
1216        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
1217        spin_unlock(&file->lock);
1218        // wait
1219        thread_join(file->prefetch_tid, &ret);
1220    } else {
1221        spin_unlock(&file->lock);
1222    }
1223
1224    // remove all cached blocks
1225    if (global_config.ncacheblock > 0 && file->bcache) {
1226        bcache_remove_dirty_blocks(file);
1227        bcache_remove_clean_blocks(file);
1228        if (bcache_remove_file(file)) {
1229            file->bcache = NULL;
1230        }
1231    }
1232
1233    if (file->kv_header) {
1234        // multi KV instance mode & KV header exists
1235        file->free_kv_header(file);
1236    }
1237
1238    // free global transaction
1239    wal_remove_transaction(file, &file->global_txn);
1240    free(file->global_txn.items);
1241    free(file->global_txn.wrapper);
1242
1243    // destroy WAL
1244    if (wal_is_initialized(file)) {
1245        wal_shutdown(file);
1246        size_t i = 0;
1247        size_t num_all_shards = wal_get_num_all_shards(file);
1248        // Free all WAL shards (including compactor's shard)
1249        for (; i < num_all_shards; ++i) {
1250            hash_free(&file->wal->key_shards[i].hash_bykey);
1251            spin_destroy(&file->wal->key_shards[i].lock);
1252            hash_free(&file->wal->seq_shards[i].hash_byseq);
1253            spin_destroy(&file->wal->seq_shards[i].lock);
1254        }
1255        spin_destroy(&file->wal->lock);
1256        atomic_destroy_uint32_t(&file->wal->size);
1257        atomic_destroy_uint32_t(&file->wal->num_flushable);
1258        atomic_destroy_uint64_t(&file->wal->datasize);
1259        free(file->wal->key_shards);
1260        free(file->wal->seq_shards);
1261    }
1262    free(file->wal);
1263
1264    // free filename and header
1265    free(file->filename);
1266    if (file->header.data) free(file->header.data);
1267    // free old filename if any
1268    free(file->old_filename);
1269
1270    // destroy locks
1271    spin_destroy(&file->lock);
1272
1273#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1274    plock_destroy(&file->plock);
1275#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1276    int i;
1277    for (i=0;i<DLOCK_MAX;++i) {
1278        mutex_destroy(&file->data_mutex[i]);
1279    }
1280#else
1281    int i;
1282    for (i=0;i<DLOCK_MAX;++i) {
1283        spin_destroy(&file->data_spinlock[i]);
1284    }
1285#endif //__FILEMGR_DATA_PARTIAL_LOCK
1286
1287    mutex_destroy(&file->writer_lock.mutex);
1288
1289    // free file structure
1290    free(file->config);
1291    free(file);
1292}
1293
1294// permanently remove file from cache (not just close)
1295// LCOV_EXCL_START
1296void filemgr_remove_file(struct filemgr *file)
1297{
1298    struct hash_elem *ret;
1299
1300    fdb_assert(file, file, NULL);
1301    fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1302
1303    // remove from global hash table
1304    spin_lock(&filemgr_openlock);
1305    ret = hash_remove(&hash, &file->e);
1306    fdb_assert(ret, ret, NULL);
1307    spin_unlock(&filemgr_openlock);
1308
1309    if (!lazy_file_deletion_enabled ||
1310        (file->new_file && file->new_file->in_place_compaction)) {
1311        filemgr_free_func(&file->e);
1312    } else {
1313        register_file_removal(file, NULL);
1314    }
1315}
1316// LCOV_EXCL_STOP
1317
1318static
1319void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1320    struct filemgr *file = _get_entry(h, struct filemgr, e);
1321    void *ret;
1322    spin_lock(&file->lock);
1323    if (file->ref_count != 0) {
1324        ret = (void *)file;
1325    } else {
1326        ret = NULL;
1327    }
1328    spin_unlock(&file->lock);
1329    return ret;
1330}
1331
1332fdb_status filemgr_shutdown()
1333{
1334    fdb_status ret = FDB_RESULT_SUCCESS;
1335    void *open_file;
1336    if (filemgr_initialized) {
1337
1338#ifndef SPIN_INITIALIZER
1339        // Windows: check if spin lock is already destroyed.
1340        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1341            spin_lock(&initial_lock);
1342        } else {
1343            // filemgr is already shut down
1344            return ret;
1345        }
1346#else
1347        spin_lock(&initial_lock);
1348#endif
1349
1350        if (!filemgr_initialized) {
1351            // filemgr is already shut down
1352#ifdef SPIN_INITIALIZER
1353            spin_unlock(&initial_lock);
1354#endif
1355            return ret;
1356        }
1357
1358        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1359        if (!open_file) {
1360            hash_free_active(&hash, filemgr_free_func);
1361            if (global_config.ncacheblock > 0) {
1362                bcache_shutdown();
1363            }
1364            filemgr_initialized = 0;
1365#ifndef SPIN_INITIALIZER
1366            initial_lock_status = 0;
1367#else
1368            initial_lock = SPIN_INITIALIZER;
1369#endif
1370            _filemgr_shutdown_temp_buf();
1371            spin_unlock(&initial_lock);
1372#ifndef SPIN_INITIALIZER
1373            spin_destroy(&initial_lock);
1374#endif
1375        } else {
1376            spin_unlock(&initial_lock);
1377            ret = FDB_RESULT_FILE_IS_BUSY;
1378        }
1379    }
1380    return ret;
1381}
1382
1383bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1384{
1385    spin_lock(&file->lock);
1386    bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1387    atomic_add_uint64_t(&file->pos, file->blocksize);
1388
1389    if (global_config.ncacheblock <= 0) {
1390        // if block cache is turned off, write the allocated block before use
1391        uint8_t _buf = 0x0;
1392        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1393                                       atomic_get_uint64_t(&file->pos) - 1);
1394        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1395    }
1396    spin_unlock(&file->lock);
1397
1398    return bid;
1399}
1400
1401void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1402                            bid_t *end, err_log_callback *log_callback)
1403{
1404    spin_lock(&file->lock);
1405    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1406    *end = *begin + nblock - 1;
1407    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1408
1409    if (global_config.ncacheblock <= 0) {
1410        // if block cache is turned off, write the allocated block before use
1411        uint8_t _buf = 0x0;
1412        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1413                                       atomic_get_uint64_t(&file->pos) - 1);
1414        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1415    }
1416    spin_unlock(&file->lock);
1417}
1418
1419// atomically allocate NBLOCK blocks only when the current file position equals nextbid
1420bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1421                                  bid_t *begin, bid_t *end,
1422                                  err_log_callback *log_callback)
1423{
1424    bid_t bid;
1425    spin_lock(&file->lock);
1426    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1427    if (bid == nextbid) {
1428        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1429        *end = *begin + nblock - 1;
1430        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1431
1432        if (global_config.ncacheblock <= 0) {
1433            // if block cache is turned off, write the allocated block before use
1434            uint8_t _buf = 0x0;
1435            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1436                                           atomic_get_uint64_t(&file->pos));
1437            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1438        }
1439    } else {
1440        *begin = BLK_NOT_FOUND;
1441        *end = BLK_NOT_FOUND;
1442    }
1443    spin_unlock(&file->lock);
1444    return bid;
1445}
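// Illustrative caller pattern (not from the original source): the conditional
// allocation succeeds only if no other writer advanced the file position in
// the meantime, which the caller can detect via BLK_NOT_FOUND:
//
//   bid_t begin, end;
//   bid_t cur = filemgr_alloc_multiple_cond(file, expected_bid, nblock,
//                                           &begin, &end, log_callback);
//   if (begin == BLK_NOT_FOUND) {
//       // cur != expected_bid: another writer got there first; fall back to
//       // a regular, non-contiguous allocation such as filemgr_alloc().
//   }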
1446
1447#ifdef __CRC32
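// Verifies the CRC32 of a B-tree node block: the stored CRC is expected to
// have been computed with the CRC field itself filled with 0xff, so the check
// below re-fills that field before recomputing the checksum. Blocks whose end
// marker is not BLK_MARKER_BNODE are not checked here.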
1448INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1449{
1450    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1451        uint32_t crc_file, crc;
1452        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1453        crc_file = _endian_decode(crc_file);
1454        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1455        crc = chksum(buf, file->blocksize);
1456        if (crc != crc_file) {
1457            return FDB_RESULT_CHECKSUM_ERROR;
1458        }
1459    }
1460    return FDB_RESULT_SUCCESS;
1461}
1462#endif
1463
1464void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1465{
1466    if (global_config.ncacheblock > 0) {
1467        bcache_invalidate_block(file, bid);
1468    }
1469}
1470
1471fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1472                        err_log_callback *log_callback,
1473                        bool read_on_cache_miss)
1474{
1475    size_t lock_no;
1476    ssize_t r;
1477    uint64_t pos = bid * file->blocksize;
1478    fdb_status status = FDB_RESULT_SUCCESS;
1479    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
1480    fdb_assert(pos < curr_pos, pos, curr_pos);
1481
1482    if (global_config.ncacheblock > 0) {
1483        lock_no = bid % DLOCK_MAX;
1484        (void)lock_no;
1485
1486#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1487        plock_entry_t *plock_entry = NULL;
1488        bid_t is_writer = 0;
1489#endif
1490        bool locked = false;
1491        // Note: we don't need to grab a lock for committed blocks
1492        // because they are immutable, so no writer can interfere and
1493        // overwrite them with dirty data
1494        if (filemgr_is_writable(file, bid)) {
1495#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1496            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1497#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1498            mutex_lock(&file->data_mutex[lock_no]);
1499#else
1500            spin_lock(&file->data_spinlock[lock_no]);
1501#endif //__FILEMGR_DATA_PARTIAL_LOCK
1502            locked = true;
1503        }
1504
1505        r = bcache_read(file, bid, buf);
1506        if (r == 0) {
1507            // cache miss
1508            if (!read_on_cache_miss) {
1509                if (locked) {
1510#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1511                    plock_unlock(&file->plock, plock_entry);
1512#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1513                    mutex_unlock(&file->data_mutex[lock_no]);
1514#else
1515                    spin_unlock(&file->data_spinlock[lock_no]);
1516#endif //__FILEMGR_DATA_PARTIAL_LOCK
1517                }
1518                return FDB_RESULT_READ_FAIL;
1519            }
1520
1521            // if normal file, just read a block
1522            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1523            if (r != file->blocksize) {
1524                _log_errno_str(file->ops, log_callback,
1525                               (fdb_status) r, "READ", file->filename);
1526                if (locked) {
1527#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1528                    plock_unlock(&file->plock, plock_entry);
1529#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1530                    mutex_unlock(&file->data_mutex[lock_no]);
1531#else
1532                    spin_unlock(&file->data_spinlock[lock_no]);
1533#endif //__FILEMGR_DATA_PARTIAL_LOCK
1534                }
1535                return (fdb_status)r;
1536            }
1537#ifdef __CRC32
1538            status = _filemgr_crc32_check(file, buf);
1539            if (status != FDB_RESULT_SUCCESS) {
1540                _log_errno_str(file->ops, log_callback, status, "READ",
1541                        file->filename);
1542                if (locked) {
1543#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1544                    plock_unlock(&file->plock, plock_entry);
1545#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1546                    mutex_unlock(&file->data_mutex[lock_no]);
1547#else
1548                    spin_unlock(&file->data_spinlock[lock_no]);
1549#endif //__FILEMGR_DATA_PARTIAL_LOCK
1550                }
1551                return status;
1552            }
1553#endif
1554            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
1555            if (r != global_config.blocksize) {
1556                if (locked) {
1557#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1558                    plock_unlock(&file->plock, plock_entry);
1559#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1560                    mutex_unlock(&file->data_mutex[lock_no]);
1561#else
1562                    spin_unlock(&file->data_spinlock[lock_no]);
1563#endif //__FILEMGR_DATA_PARTIAL_LOCK
1564                }
1565                _log_errno_str(file->ops, log_callback,
1566                               (fdb_status) r, "WRITE", file->filename);
1567                return FDB_RESULT_WRITE_FAIL;
1568            }
1569        }
1570        if (locked) {
1571#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1572            plock_unlock(&file->plock, plock_entry);
1573#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1574            mutex_unlock(&file->data_mutex[lock_no]);
1575#else
1576            spin_unlock(&file->data_spinlock[lock_no]);
1577#endif //__FILEMGR_DATA_PARTIAL_LOCK
1578        }
1579    } else {
1580        if (!read_on_cache_miss) {
1581            return FDB_RESULT_READ_FAIL;
1582        }
1583
1584        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
1585        if (r != file->blocksize) {
1586            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
1587                           file->filename);
1588            return (fdb_status)r;
1589        }
1590
1591#ifdef __CRC32
1592        status = _filemgr_crc32_check(file, buf);
1593        if (status != FDB_RESULT_SUCCESS) {
1594            _log_errno_str(file->ops, log_callback, status, "READ",
1595                           file->filename);
1596            return status;
1597        }
1598#endif
1599    }
1600    return status;
1601}
1602
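// Write 'len' bytes from 'buf' at byte 'offset' within block 'bid'. The write
// must stay inside one block and land at or beyond the last commit point.
// With the block cache enabled, a full-block write goes straight into the
// cache as a dirty block; a partial write tries bcache_write_partial() first
// and, on a cache miss, falls back to read-modify-write through a temporary
// buffer. With the cache disabled, b-tree node blocks get their CRC32
// recomputed before the raw pwrite.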
1603fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
1604                                uint64_t offset, uint64_t len, void *buf,
1605                                err_log_callback *log_callback)
1606{
1607    fdb_assert(offset + len <= file->blocksize, offset + len, file);
1608
1609    size_t lock_no;
1610    ssize_t r = 0;
1611    uint64_t pos = bid * file->blocksize + offset;
1612    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
1613    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);
1614
1615    if (global_config.ncacheblock > 0) {
1616        lock_no = bid % DLOCK_MAX;
1617        (void)lock_no;
1618
1619        bool locked = false;
1620#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1621        plock_entry_t *plock_entry;
1622        bid_t is_writer = 1;
1623        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1624#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1625        mutex_lock(&file->data_mutex[lock_no]);
1626#else
1627        spin_lock(&file->data_spinlock[lock_no]);
1628#endif //__FILEMGR_DATA_PARTIAL_LOCK
1629        locked = true;
1630
1631        if (len == file->blocksize) {
1632            // write entire block .. we don't need to read previous block
1633            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
1634            if (r != global_config.blocksize) {
1635                if (locked) {
1636#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1637                    plock_unlock(&file->plock, plock_entry);
1638#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1639                    mutex_unlock(&file->data_mutex[lock_no]);
1640#else
1641                    spin_unlock(&file->data_spinlock[lock_no]);
1642#endif //__FILEMGR_DATA_PARTIAL_LOCK
1643                }
1644                _log_errno_str(file->ops, log_callback,
1645                               (fdb_status) r, "WRITE", file->filename);
1646                return FDB_RESULT_WRITE_FAIL;
1647            }
1648        } else {
1649            // partially write buffer cache first
1650            r = bcache_write_partial(file, bid, buf, offset, len);
1651            if (r == 0) {
1652                // cache miss
1653                // write partially .. we have to read previous contents of the block
1654                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
1655                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
1656                void *_buf = _filemgr_get_temp_buf();
1657
1658                if (bid >= cur_file_last_bid) {
1659                    // this is the first time this block is written, so
1660                    // we don't need to read its previous contents from the file.
1661                } else {
1662                    r = file->ops->pread(file->fd, _buf, file->blocksize,
1663                                         bid * file->blocksize);
1664                    if (r != file->blocksize) {
1665                        if (locked) {
1666#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1667                            plock_unlock(&file->plock, plock_entry);
1668#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1669                            mutex_unlock(&file->data_mutex[lock_no]);
1670#else
1671                            spin_unlock(&file->data_spinlock[lock_no]);
1672#endif //__FILEMGR_DATA_PARTIAL_LOCK
1673                        }
1674                        _filemgr_release_temp_buf(_buf);
1675                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
1676                                       "READ", file->filename);
1677                        return FDB_RESULT_READ_FAIL;
1678                    }
1679                }
1680                memcpy((uint8_t *)_buf + offset, buf, len);
1681                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
1682                if (r != global_config.blocksize) {
1683                    if (locked) {
1684#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1685                        plock_unlock(&file->plock, plock_entry);
1686#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1687                        mutex_unlock(&file->data_mutex[lock_no]);
1688#else
1689                        spin_unlock(&file->data_spinlock[lock_no]);
1690#endif //__FILEMGR_DATA_PARTIAL_LOCK
1691                    }
1692                    _filemgr_release_temp_buf(_buf);
1693                    _log_errno_str(file->ops, log_callback,
1694                            (fdb_status) r, "WRITE", file->filename);
1695                    return FDB_RESULT_WRITE_FAIL;
1696                }
1697
1698                _filemgr_release_temp_buf(_buf);
1699            } // cache miss
1700        } // full block or partial block
1701
1702        if (locked) {
1703#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1704            plock_unlock(&file->plock, plock_entry);
1705#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1706            mutex_unlock(&file->data_mutex[lock_no]);
1707#else
1708            spin_unlock(&file->data_spinlock[lock_no]);
1709#endif //__FILEMGR_DATA_PARTIAL_LOCK
1710        }
1711
1712    } else { // block cache disabled
1713
1714#ifdef __CRC32
1715        if (len == file->blocksize) {
1716            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
1717            if (marker == BLK_MARKER_BNODE) {
1718                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1719                uint32_t crc32 = chksum(buf, file->blocksize);
1720                crc32 = _endian_encode(crc32);
1721                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
1722            }
1723        }
1724#endif
1725
1726        r = file->ops->pwrite(file->fd, buf, len, pos);
1727        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
1728        if ((uint64_t)r != len) {
1729            return FDB_RESULT_WRITE_FAIL;
1730        }
1731    } // block cache check
1732    return FDB_RESULT_SUCCESS;
1733}
1734
1735fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1736                   err_log_callback *log_callback)
1737{
1738    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1739                                log_callback);
1740}
1741
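// Commit: flush dirty cached blocks, append a DB header block (layout
// sketched in the comment inside) at the current end of file, record its
// position in header.bid, advance file->pos, update last_commit, and fsync
// when the file was opened with FILEMGR_SYNC.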
1742fdb_status filemgr_commit(struct filemgr *file,
1743                          err_log_callback *log_callback)
1744{
1745    uint16_t header_len = file->header.size;
1746    uint16_t _header_len;
1747    bid_t _prev_bid;
1748    fdb_seqnum_t _seqnum;
1749    filemgr_header_revnum_t _revnum;
1750    int result = FDB_RESULT_SUCCESS;
1751    filemgr_magic_t magic = FILEMGR_MAGIC;
1752    filemgr_magic_t _magic;
1753
1754    if (global_config.ncacheblock > 0) {
1755        result = bcache_flush(file);
1756        if (result != FDB_RESULT_SUCCESS) {
1757            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1758                           "FLUSH", file->filename);
1759            return (fdb_status)result;
1760        }
1761    }
1762
1763    spin_lock(&file->lock);
1764
1765    if (file->header.size > 0 && file->header.data) {
1766        void *buf = _filemgr_get_temp_buf();
1767        uint8_t marker[BLK_MARKER_SIZE];
1768
1769        // [header data]:        'header_len' bytes   <---+
1770        // [header revnum]:      8 bytes                  |
1771        // [default KVS seqnum]: 8 bytes                  |
1772        // ...                                            |
1773        // (empty)                                    blocksize
1774        // ...                                            |
1775        // [prev header bid]:    8 bytes                  |
1776        // [header length]:      2 bytes                  |
1777        // [magic number]:       8 bytes                  |
1778        // [block marker]:       1 byte               <---+
1779
1780        // header data
1781        memcpy(buf, file->header.data, header_len);
1782        // header rev number
1783        _revnum = _endian_encode(file->header.revnum);
1784        memcpy((uint8_t *)buf + header_len, &_revnum,
1785               sizeof(filemgr_header_revnum_t));
1786        // file's sequence number (default KVS seqnum)
1787        _seqnum = _endian_encode(file->header.seqnum);
1788        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
1789               &_seqnum, sizeof(fdb_seqnum_t));
1790
1791        // prev header bid
1792        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
1793        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1794               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
1795               &_prev_bid, sizeof(_prev_bid));
1796        // header length
1797        _header_len = _endian_encode(header_len);
1798        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1799               - sizeof(header_len) - BLK_MARKER_SIZE),
1800               &_header_len, sizeof(header_len));
1801        // magic number
1802        _magic = _endian_encode(magic);
1803        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
1804               - BLK_MARKER_SIZE), &_magic, sizeof(magic));
1805
1806        // marker
1807        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
1808        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1809               marker, BLK_MARKER_SIZE);
1810
1811        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
1812                                       atomic_get_uint64_t(&file->pos));
1813        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1814                       "WRITE", file->filename);
1815        if (rv != file->blocksize) {
1816            _filemgr_release_temp_buf(buf);
1817            spin_unlock(&file->lock);
1818            return FDB_RESULT_WRITE_FAIL;
1819        }
1820        atomic_store_uint64_t(&file->header.bid,
1821                              atomic_get_uint64_t(&file->pos) / file->blocksize);
1822        atomic_add_uint64_t(&file->pos, file->blocksize);
1823
1824        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
1825        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
1826
1827        _filemgr_release_temp_buf(buf);
1828    }
1829    // race condition?
1830    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
1831
1832    spin_unlock(&file->lock);
1833
1834    if (file->fflags & FILEMGR_SYNC) {
1835        result = file->ops->fsync(file->fd);
1836        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
1837    }
1838    return (fdb_status) result;
1839}
1840
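// Flush dirty cached blocks to disk and, if the file was opened with
// FILEMGR_SYNC, fsync the descriptor. Unlike filemgr_commit(), no header
// block is appended.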
1841fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1842{
1843    fdb_status result = FDB_RESULT_SUCCESS;
1844    if (global_config.ncacheblock > 0) {
1845        result = bcache_flush(file);
1846        if (result != FDB_RESULT_SUCCESS) {
1847            _log_errno_str(file->ops, log_callback, (fdb_status) result,
1848                           "FLUSH", file->filename);
1849            return result;
1850        }
1851    }
1852
1853    if (file->fflags & FILEMGR_SYNC) {
1854        int rv = file->ops->fsync(file->fd);
1855        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1856        return (fdb_status) rv;
1857    }
1858    return result;
1859}
1860
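// Set the file status and, if given, record 'old_filename' as the file this
// one was compacted from. Returns 0 (and frees 'old_filename') when an old
// filename is already recorded, 1 otherwise.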
1861int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1862                                char *old_filename)
1863{
1864    int ret = 1;
1865    spin_lock(&file->lock);
1866    atomic_store_uint8_t(&file->status, status);
1867    if (old_filename) {
1868        if (!file->old_filename) {
1869            file->old_filename = old_filename;
1870        } else {
1871            ret = 0;
1872            fdb_assert(file->ref_count, file->ref_count, 0);
1873            free(old_filename);
1874        }
1875    }
1876    spin_unlock(&file->lock);
1877    return ret;
1878}
1879
1880void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1881                                  file_status_t status)
1882{
1883    spin_lock(&old_file->lock);
1884    old_file->new_file = new_file;
1885    atomic_store_uint8_t(&old_file->status, status);
1886    spin_unlock(&old_file->lock);
1887}
1888
1889// Check if there is a file that still points to the old_file that is being
1890// compacted away. If so, open the file and return its pointer.
1891static
1892void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1893    struct filemgr *cur_file = (struct filemgr *)ctx;
1894    struct filemgr *file = _get_entry(h, struct filemgr, e);
1895    spin_lock(&file->lock);
1896    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1897        file->new_file == cur_file) {
1898        // Incrementing the reference counter below has the same effect as filemgr_open().
1899        // We need to do this to ensure that the pointer returned does not
1900        // get freed outside the filemgr_openlock.
1901        file->ref_count++;
1902        spin_unlock(&file->lock);
1903        return (void *)file;
1904    }
1905    spin_unlock(&file->lock);
1906    return (void *)NULL;
1907}
1908
1909struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1910    struct filemgr *very_old_file;
1911    spin_lock(&filemgr_openlock);
1912    very_old_file = (struct filemgr *)hash_scan(&hash,
1913                                         _filemgr_check_stale_link, cur_file);
1914    spin_unlock(&filemgr_openlock);
1915    return very_old_file;
1916}
1917
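// Rewrite the in-memory DB header of 'very_old_file' so that its embedded
// "new filename" field refers to 'new_file', growing the header buffer first
// if the new name is longer. The header revision number is bumped and the
// filename previously stored in the header is returned.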
1918char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1919                                     struct filemgr *new_file,
1920                                     filemgr_redirect_hdr_func
1921                                     redirect_header_func) {
1922    size_t old_header_len, new_header_len;
1923    uint16_t new_filename_len;
1924    char *past_filename;
1925    spin_lock(&very_old_file->lock);
1926    fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1927    fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1928    old_header_len = very_old_file->header.size;
1929    new_filename_len = strlen(new_file->filename);
1930    // Find out the new DB header length with new_file's filename
1931    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1932        + new_filename_len;
1933    // As we are going to change the new_filename field in the DB header of the
1934    // very_old_file, reallocate the DB header buffer if needed to accommodate the bigger value
1935    if (new_header_len > old_header_len) {
1936        very_old_file->header.data = realloc(very_old_file->header.data,
1937                new_header_len);
1938    }
1939    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1940    past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1941            new_file->filename, new_filename_len + 1); // Update in-memory header
1942    very_old_file->header.size = new_header_len;
1943    ++(very_old_file->header.revnum);
1944
1945    spin_unlock(&very_old_file->lock);
1946    return past_filename;
1947}
1948
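// Request removal of 'old_file' once compaction has moved data to 'new_file'.
// If readers still hold references, the file is only marked
// FILE_REMOVED_PENDING and redirected to 'new_file'; otherwise it is removed
// right away (the on-disk unlink is skipped when lazy file deletion is
// enabled, unless the new file is an in-place compaction target).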
1949void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1950{
1951    fdb_assert(new_file, new_file, old_file);
1952
1953    spin_lock(&old_file->lock);
1954    if (old_file->ref_count > 0) {
1955        // delay removing
1956        old_file->new_file = new_file;
1957        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1958        spin_unlock(&old_file->lock);
1959    } else {
1960        // immediately remove
1961        // LCOV_EXCL_START
1962        spin_unlock(&old_file->lock);
1963
1964        if (!lazy_file_deletion_enabled ||
1965            (old_file->new_file && old_file->new_file->in_place_compaction)) {
1966            remove(old_file->filename);
1967        }
1968        filemgr_remove_file(old_file);
1969        // LCOV_EXCL_STOP
1970    }
1971}
1972
1973// migrate default kv store stats over to new_file
1974struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1975                                              struct filemgr *new_file,
1976                                              struct kvs_info *kvs)
1977{
1978    kvs_ops_stat *ret = NULL;
1979    fdb_assert(new_file, new_file, old_file);
1980
1981    spin_lock(&old_file->lock);
1982    new_file->header.op_stat = old_file->header.op_stat;
1983    ret = &new_file->header.op_stat;
1984    spin_unlock(&old_file->lock);
1985    return ret;
1986}
1987
1988// Note: filemgr_openlock should be held before calling this function.
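// It recursively destroys 'filename' together with the chain of older files
// referenced from its DB header. 'destroy_file_set' collects filenames already
// visited so that recursion terminates; the top-level caller passes NULL. A
// file that is still open with live references makes the call fail with
// FDB_RESULT_FILE_IS_BUSY.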
1989fdb_status filemgr_destroy_file(char *filename,
1990                                struct filemgr_config *config,
1991                                struct hash *destroy_file_set)
1992{
1993    struct filemgr *file = NULL;
1994    struct hash to_destroy_files;
1995    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
1996                                                  &to_destroy_files);
1997    struct filemgr query;
1998    struct hash_elem *e = NULL;
1999    fdb_status status = FDB_RESULT_SUCCESS;
2000    char *old_filename = NULL;
2001
2002    if (!destroy_file_set) { // top level or non-recursive call
2003        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
2004    }
2005
2006    query.filename = filename;
2007    // check whether file is already being destroyed in parent recursive call
2008    e = hash_find(destroy_set, &query.e);
2009    if (e) { // Duplicate filename found, nothing to be done in this call
2010        if (!destroy_file_set) { // top level or non-recursive call
2011            hash_free(destroy_set);
2012        }
2013        return status;
2014    } else {
2015        // Remember file. Stack value ok IFF single direction recursion
2016        hash_insert(destroy_set, &query.e);
2017    }
2018
2019    // check global list of known files to see if it is already opened or not
2020    e = hash_find(&hash, &query.e);
2021    if (e) {
2022        // already opened (return existing structure)
2023        file = _get_entry(e, struct filemgr, e);
2024
2025        spin_lock(&file->lock);
2026        if (file->ref_count) {
2027            spin_unlock(&file->lock);
2028            status = FDB_RESULT_FILE_IS_BUSY;
2029            if (!destroy_file_set) { // top level or non-recursive call
2030                hash_free(destroy_set);
2031            }
2032            return status;
2033        }
2034        spin_unlock(&file->lock);
2035        if (file->old_filename) {
2036            status = filemgr_destroy_file(file->old_filename, config,
2037                                          destroy_set);
2038            if (status != FDB_RESULT_SUCCESS) {
2039                if (!destroy_file_set) { // top level or non-recursive call
2040                    hash_free(destroy_set);
2041                }
2042                return status;
2043            }
2044        }
2045
2046        // Cleanup file from in-memory as well as on-disk
2047        e = hash_remove(&hash, &file->e);
2048        fdb_assert(e, e, 0);
2049        filemgr_free_func(&file->e);
2050        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2051            if (remove(filename)) {
2052                status = FDB_RESULT_FILE_REMOVE_FAIL;
2053            }
2054        }
2055    } else { // file not in memory, read on-disk to destroy older versions..
2056        file = (struct filemgr *)alca(struct filemgr, 1);
2057        file->filename = filename;
2058        file->ops = get_filemgr_ops();
2059        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2060        file->blocksize = global_config.blocksize;
2061        if (file->fd < 0) {
2062            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2063                if (!destroy_file_set) { // top level or non-recursive call
2064                    hash_free(destroy_set);
2065                }
2066                return FDB_RESULT_OPEN_FAIL;
2067            }
2068        } else { // file successfully opened, seek to end to get DB header
2069            cs_off_t offset = file->ops->goto_eof(file->fd);
2070            if (offset == FDB_RESULT_SEEK_FAIL) {
2071                if (!destroy_file_set) { // top level or non-recursive call
2072                    hash_free(destroy_set);
2073                }
2074                return FDB_RESULT_SEEK_FAIL;
2075            } else { // Need to read DB header which contains old filename
2076                atomic_store_uint64_t(&file->pos, offset);
2077                status = _filemgr_read_header(file, NULL);
2078                if (status != FDB_RESULT_SUCCESS) {
2079                    if (!destroy_file_set) { // top level or non-recursive call
2080                        hash_free(destroy_set);
2081                    }
2082                    file->ops->close(file->fd);
2083                    return status;
2084                }
2085                if (file->header.data) {
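                    // As parsed here, the header body stores a 2-byte encoded
                    // new-filename length at offset 64, a 2-byte encoded
                    // old-filename length at offset 66, and the filenames
                    // themselves (new one first) starting at offset 68.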
2086                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
2087                                                     file->header.data + 64);
2088                    uint16_t new_filename_len =
2089                                      _endian_decode(*new_filename_len_ptr);
2090                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
2091                                                     file->header.data + 66);
2092                    uint16_t old_filename_len =
2093                                      _endian_decode(*old_filename_len_ptr);
2094                    old_filename = (char *)file->header.data + 68
2095                                   + new_filename_len;
2096                    if (old_filename_len) {
2097                        status = filemgr_destroy_file(old_filename, config,
2098                                                      destroy_set);
2099                    }
2100                    free(file->header.data);
2101                }
2102                file->ops->close(file->fd);
2103                if (status == FDB_RESULT_SUCCESS) {
2104                    if (filemgr_does_file_exist(filename)
2105                                               == FDB_RESULT_SUCCESS) {
2106                        if (remove(filename)) {
2107                            status = FDB_RESULT_FILE_REMOVE_FAIL;
2108                        }
2109                    }
2110                }
2111            }
2112        }
2113    }
2114
2115    if (!destroy_file_set) { // top level or non-recursive call
2116        hash_free(destroy_set);
2117    }
2118
2119    return status;
2120}
2121
2122bool filemgr_is_rollback_on(struct filemgr *file)
2123{
2124    bool rv;
2125    spin_lock(&file->lock);
2126    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2127    spin_unlock(&file->lock);
2128    return rv;
2129}
2130
2131void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2132{
2133    spin_lock(&file->lock);
2134    if (new_val) {
2135        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2136    } else {
2137        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2138    }
2139    spin_unlock(&file->lock);
2140}
2141
2142void filemgr_set_in_place_compaction(struct filemgr *file,
2143                                     bool in_place_compaction) {
2144    spin_lock(&file->lock);
2145    file->in_place_compaction = in_place_compaction;
2146    spin_unlock(&file->lock);
2147}
2148
2149bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2150{
2151    bool ret = false;
2152    spin_lock(&file->lock);
2153    ret = file->in_place_compaction;
2154    spin_unlock(&file->lock);
2155    return ret;
2156}
2157
2158void filemgr_mutex_openlock(struct filemgr_config *config)
2159{
2160    filemgr_init(config);
2161
2162    spin_lock(&filemgr_openlock);
2163}
2164
2165void filemgr_mutex_openunlock(void)
2166{
2167    spin_unlock(&filemgr_openlock);
2168}
2169
2170void filemgr_mutex_lock(struct filemgr *file)
2171{
2172    mutex_lock(&file->writer_lock.mutex);
2173    file->writer_lock.locked = true;
2174}
2175
2176bool filemgr_mutex_trylock(struct filemgr *file) {
2177    if (mutex_trylock(&file->writer_lock.mutex)) {
2178        file->writer_lock.locked = true;
2179        return true;
2180    }
2181    return false;
2182}
2183
2184void filemgr_mutex_unlock(struct filemgr *file)
2185{
2186    if (file->writer_lock.locked) {
2187        file->writer_lock.locked = false;
2188        mutex_unlock(&file->writer_lock.mutex);
2189    }
2190}
2191
2192void filemgr_set_dirty_root(struct filemgr *file,
2193                            bid_t dirty_idtree_root,
2194                            bid_t dirty_seqtree_root)
2195{
2196    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
2197    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
2198}
2199
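// A commit (DB header) block is recognized by BLK_MARKER_DBHEADER in its
// last byte plus the FILEMGR_MAGIC value stored immediately before the marker.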
2200bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2201{
2202    uint8_t marker[BLK_MARKER_SIZE];
2203    filemgr_magic_t magic;
2204    marker[0] = *(((uint8_t *)head_buffer)
2205                 + blocksize - BLK_MARKER_SIZE);
2206    if (marker[0] != BLK_MARKER_DBHEADER) {
2207        return false;
2208    }
2209
2210    memcpy(&magic, (uint8_t *) head_buffer
2211            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2212    magic = _endian_decode(magic);
2213
2214    return (magic == FILEMGR_MAGIC);
2215}
2216
2217void _kvs_stat_set(struct filemgr *file,
2218                   fdb_kvs_id_t kv_id,
2219                   struct kvs_stat stat)
2220{
2221    if (kv_id == 0) {
2222        spin_lock(&file->lock);
2223        file->header.stat = stat;
2224        spin_unlock(&file->lock);
2225    } else {
2226        struct avl_node *a;
2227        struct kvs_node query, *node;
2228        struct kvs_header *kv_header = file->kv_header;
2229
2230        spin_lock(&kv_header->lock);
2231        query.id = kv_id;
2232        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2233        if (a) {
2234            node = _get_entry(a, struct kvs_node, avl_id);
2235            node->stat = stat;
2236        }
2237        spin_unlock(&kv_header->lock);
2238    }
2239}
2240
2241void _kvs_stat_update_attr(struct filemgr *file,
2242                           fdb_kvs_id_t kv_id,
2243                           kvs_stat_attr_t attr,
2244                           int delta)
2245{
2246    spin_t *lock = NULL;
2247    struct kvs_stat *stat;
2248
2249    if (kv_id == 0) {
2250        stat = &file->header.stat;
2251        lock = &file->lock;
2252        spin_lock(lock);
2253    } else {
2254        struct avl_node *a;
2255        struct kvs_node query, *node;
2256        struct kvs_header *kv_header = file->kv_header;
2257
2258        lock = &kv_header->lock;
2259        spin_lock(lock);
2260        query.id = kv_id;
2261        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2262        if (!a) {
2263            // KV instance corresponding to the kv_id is already removed
2264            spin_unlock(lock);
2265            return;
2266        }
2267        node = _get_entry(a, struct kvs_node, avl_id);
2268        stat = &node->stat;
2269    }
2270
2271    if (attr == KVS_STAT_DATASIZE) {
2272        stat->datasize += delta;
2273    } else if (attr == KVS_STAT_NDOCS) {
2274        stat->ndocs += delta;
2275    } else if (attr == KVS_STAT_NLIVENODES) {
2276        stat->nlivenodes += delta;
2277    } else if (attr == KVS_STAT_WAL_NDELETES) {
2278        stat->wal_ndeletes += delta;
2279    } else if (attr == KVS_STAT_WAL_NDOCS) {
2280        stat->wal_ndocs += delta;
2281    }
2282    spin_unlock(lock);
2283}
2284
2285int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2286                            fdb_kvs_id_t kv_id,
2287                            struct kvs_stat *stat)
2288{
2289    int ret = 0;
2290    struct avl_node *a;
2291    struct kvs_node query, *node;
2292
2293    query.id = kv_id;
2294    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2295    if (a) {
2296        node = _get_entry(a, struct kvs_node, avl_id);
2297        *stat = node->stat;
2298    } else {
2299        ret = -1;
2300    }
2301    return ret;
2302}
2303
2304int _kvs_stat_get(struct filemgr *file,
2305                  fdb_kvs_id_t kv_id,
2306                  struct kvs_stat *stat)
2307{
2308    int ret = 0;
2309
2310    if (kv_id == 0) {
2311        spin_lock(&file->lock);
2312        *stat = file->header.stat;
2313        spin_unlock(&file->lock);
2314    } else {
2315        struct kvs_header *kv_header = file->kv_header;
2316
2317        spin_lock(&kv_header->lock);
2318        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2319        spin_unlock(&kv_header->lock);
2320    }
2321
2322    return ret;
2323}
2324
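// Sum a single stat attribute over the default KVS (kept in the file header)
// and every named KVS instance registered in file->kv_header.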
2325uint64_t _kvs_stat_get_sum(struct filemgr *file,
2326                           kvs_stat_attr_t attr)
2327{
2328    struct avl_node *a;
2329    struct kvs_node *node;
2330    struct kvs_header *kv_header = file->kv_header;
2331
2332    uint64_t ret = 0;
2333    spin_lock(&file->lock);
2334    if (attr == KVS_STAT_DATASIZE) {
2335        ret += file->header.stat.datasize;
2336    } else if (attr == KVS_STAT_NDOCS) {
2337        ret += file->header.stat.ndocs;
2338    } else if (attr == KVS_STAT_NLIVENODES) {
2339        ret += file->header.stat.nlivenodes;
2340    } else if (attr == KVS_STAT_WAL_NDELETES) {
2341        ret += file->header.stat.wal_ndeletes;
2342    } else if (attr == KVS_STAT_WAL_NDOCS) {
2343        ret += file->header.stat.wal_ndocs;
2344    }
2345    spin_unlock(&file->lock);
2346
2347    if (kv_header) {
2348        spin_lock(&kv_header->lock);
2349        a = avl_first(kv_header->idx_id);
2350        while (a) {
2351            node = _get_entry(a, struct kvs_node, avl_id);
2352            a = avl_next(&node->avl_id);
2353
2354            if (attr == KVS_STAT_DATASIZE) {
2355                ret += node->stat.datasize;
2356            } else if (attr == KVS_STAT_NDOCS) {
2357                ret += node->stat.ndocs;
2358            } else if (attr == KVS_STAT_NLIVENODES) {
2359                ret += node->stat.nlivenodes;
2360            } else if (attr == KVS_STAT_WAL_NDELETES) {
2361                ret += node->stat.wal_ndeletes;
2362            } else if (attr == KVS_STAT_WAL_NDOCS) {
2363                ret += node->stat.wal_ndocs;
2364            }
2365        }
2366        spin_unlock(&kv_header->lock);
2367    }
2368
2369    return ret;
2370}
2371
2372int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2373                                fdb_kvs_id_t kv_id,
2374                                struct kvs_ops_stat *stat)
2375{
2376    int ret = 0;
2377    struct avl_node *a;
2378    struct kvs_node query, *node;
2379
2380    query.id = kv_id;
2381    a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2382    if (a) {
2383        node = _get_entry(a, struct kvs_node, avl_id);
2384        *stat = node->op_stat;
2385    } else {
2386        ret = -1;
2387    }
2388    return ret;
2389}
2390
2391int _kvs_ops_stat_get(struct filemgr *file,
2392                      fdb_kvs_id_t kv_id,
2393                      struct kvs_ops_stat *stat)
2394{
2395    int ret = 0;
2396
2397    if (kv_id == 0) {
2398        spin_lock(&file->lock);
2399        *stat = file->header.op_stat;
2400        spin_unlock(&file->lock);
2401    } else {
2402        struct kvs_header *kv_header = file->kv_header;
2403
2404        spin_lock(&kv_header->lock);
2405        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2406        spin_unlock(&kv_header->lock);
2407    }
2408
2409    return ret;
2410}
2411
2412void _init_op_stats(struct kvs_ops_stat *stat) {
2413    atomic_init_uint64_t(&stat->num_sets, 0);
2414    atomic_init_uint64_t(&stat->num_dels, 0);
2415    atomic_init_uint64_t(&stat->num_commits, 0);
2416    atomic_init_uint64_t(&stat->num_compacts, 0);
2417    atomic_init_uint64_t(&stat->num_gets, 0);
2418    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
2419    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
2420}
2421
2422struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2423                                           struct kvs_info *kvs)
2424{
2425    struct kvs_ops_stat *stat = NULL;
2426    if (!kvs || kvs->id == 0) {
2427        return &file->header.op_stat;
2428    } else {
2429        struct kvs_header *kv_header = file->kv_header;
2430        struct avl_node *a;
2431        struct kvs_node query, *node;
2432        spin_lock(&kv_header->lock);
2433        query.id = kvs->id;
2434        a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2435        if (a) {
2436            node = _get_entry(a, struct kvs_node, avl_id);
2437            stat = &node->op_stat;
2438        }
2439        spin_unlock(&kv_header->lock);
2440    }
2441    return stat;
2442}
2443
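// buf2kvid / kvid2buf / buf2buf convert between a raw key chunk of
// 'chunksize' bytes and an fdb_kvs_id_t (or a chunk of another size). Values
// are treated as big-endian integers: a smaller destination keeps the
// least-significant bytes, a larger destination is zero-padded on the
// most-significant side.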
2444void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2445{
2446    size_t size_id = sizeof(fdb_kvs_id_t);
2447    fdb_kvs_id_t temp;
2448
2449    if (chunksize == size_id) {
2450        temp = *((fdb_kvs_id_t*)buf);
2451    } else if (chunksize < size_id) {
2452        temp = 0;
2453        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2454    } else { // chunksize > sizeof(fdb_kvs_id_t)
2455        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2456    }
2457    *id = _endian_decode(temp);
2458}
2459
2460void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2461{
2462    size_t size_id = sizeof(fdb_kvs_id_t);
2463    id = _endian_encode(id);
2464
2465    if (chunksize == size_id) {
2466        memcpy(buf, &id, size_id);
2467    } else if (chunksize < size_id) {
2468        memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2469    } else { // chunksize > sizeof(fdb_kvs_id_t)
2470        memset(buf, 0x0, chunksize - size_id);
2471        memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2472    }
2473}
2474
2475void buf2buf(size_t chunksize_src, void *buf_src,
2476             size_t chunksize_dst, void *buf_dst)
2477{
2478    if (chunksize_dst == chunksize_src) {
2479        memcpy(buf_dst, buf_src, chunksize_src);
2480    } else if (chunksize_dst < chunksize_src) {
2481        memcpy(buf_dst, (uint8_t*)buf_src + (chunksize_src - chunksize_dst),
2482               chunksize_dst);
2483    } else { // chunksize_dst > chunksize_src
2484        memset(buf_dst, 0x0, chunksize_dst - chunksize_src);
2485        memcpy((uint8_t*)buf_dst + (chunksize_dst - chunksize_src),
2486               buf_src, chunksize_src);
2487    }
2488}
2489