xref: /6.6.0/forestdb/src/filemgr.cc (revision 6428a5c4)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#include <libgen.h>
27#endif
28#include <time.h>
29
30#include "filemgr.h"
31#include "filemgr_ops.h"
32#include "hash_functions.h"
33#include "blockcache.h"
34#include "wal.h"
35#include "list.h"
36#include "fdb_internal.h"
37#include "time_utils.h"
38#include "encryption.h"
39#include "version.h"
40
41#include "memleak.h"
42
43#ifdef __DEBUG
44#ifndef __DEBUG_FILEMGR
45    #undef DBG
46    #undef DBGCMD
47    #undef DBGSW
48    #define DBG(...)
49    #define DBGCMD(...)
50    #define DBGSW(n, ...)
51#endif
52#endif
53
54// NBUCKET must be power of 2
55#define NBUCKET (1024)
56
57// global static variables
58#ifdef SPIN_INITIALIZER
59static spin_t initial_lock = SPIN_INITIALIZER;
60#else
61static volatile unsigned int initial_lock_status = 0;
62static spin_t initial_lock;
63#endif
64
65
66static volatile uint8_t filemgr_initialized = 0;
67extern volatile uint8_t bgflusher_initialized;
68static struct filemgr_config global_config;
69static struct hash hash;
70static spin_t filemgr_openlock;
71
72static const int MAX_STAT_UPDATE_RETRIES = 5;
73
74struct temp_buf_item{
75    void *addr;
76    struct list_elem le;
77};
78static struct list temp_buf;
79static spin_t temp_buf_lock;
80
81static bool lazy_file_deletion_enabled = false;
82static register_file_removal_func register_file_removal = NULL;
83static check_file_removal_func is_file_removed = NULL;
84
85static struct sb_ops sb_ops;
86
87static void spin_init_wrap(void *lock) {
88    spin_init((spin_t*)lock);
89}
90
91static void spin_destroy_wrap(void *lock) {
92    spin_destroy((spin_t*)lock);
93}
94
95static void spin_lock_wrap(void *lock) {
96    spin_lock((spin_t*)lock);
97}
98
99static void spin_unlock_wrap(void *lock) {
100    spin_unlock((spin_t*)lock);
101}
102
103static void mutex_init_wrap(void *lock) {
104    mutex_init((mutex_t*)lock);
105}
106
107static void mutex_destroy_wrap(void *lock) {
108    mutex_destroy((mutex_t*)lock);
109}
110
111static void mutex_lock_wrap(void *lock) {
112    mutex_lock((mutex_t*)lock);
113}
114
115static void mutex_unlock_wrap(void *lock) {
116    mutex_unlock((mutex_t*)lock);
117}
118
119static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
120{
121    struct kvs_node *aa, *bb;
122    aa = _get_entry(a, struct kvs_node, avl_id);
123    bb = _get_entry(b, struct kvs_node, avl_id);
124
125    if (aa->id < bb->id) {
126        return -1;
127    } else if (aa->id > bb->id) {
128        return 1;
129    } else {
130        return 0;
131    }
132}
133
134static int _block_is_overlapped(void *pbid1, void *pis_writer1,
135                                void *pbid2, void *pis_writer2,
136                                void *aux)
137{
138    (void)aux;
139    bid_t bid1, is_writer1, bid2, is_writer2;
140    bid1 = *(bid_t*)pbid1;
141    is_writer1 = *(bid_t*)pis_writer1;
142    bid2 = *(bid_t*)pbid2;
143    is_writer2 = *(bid_t*)pis_writer2;
144
145    if (bid1 != bid2) {
146        // not overlapped
147        return 0;
148    } else {
149        // overlapped
150        if (!is_writer1 && !is_writer2) {
151            // both are readers
152            return 0;
153        } else {
154            return 1;
155        }
156    }
157}
158
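// Format the current local time as an ISO-8601 style timestamp with
// millisecond precision, e.g. "2017-06-22T10:00:00.123-05:00". If the
// platform's strftime() cannot produce a "%z" offset, the timezone suffix
// is simply omitted.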
159void printISOTime(char* buffer, size_t buffer_len) {
160    struct tm* tm_now;
161    time_t rawtime;
162    time(&rawtime);
163    tm_now = localtime(&rawtime);
164
165    // 2017-06-22T10:00:00
166    size_t time_len = strftime(buffer, buffer_len,
167                               "%Y-%m-%dT%H:%M:%S", tm_now);
168
169    // Add milliseconds
170    timeval cur_time;
171    gettimeofday(&cur_time, NULL);
172    size_t milli = cur_time.tv_usec / 1000;
173    // 2017-06-22T10:00:00.123
174    sprintf(buffer + time_len, ".%03d", (int)milli);
175    time_len += 4;
176
177    // timezone offset format: -0500
178    char tz_offset_str[6];
179    size_t offset_len =  strftime(tz_offset_str, 6,
180                                  "%z", tm_now);
181    if (offset_len < 5) {
182        // Time zone info is not supported, skip it.
183        return;
184    }
185
186    // hour
187    strncat(buffer, tz_offset_str, 3);
188    // :
189    strcat(buffer, ":");
190    // min
191    strncat(buffer, tz_offset_str + 3, 2);
192    // final format: 2017-06-22T10:00:00.123-05:00
193}
194
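// Central logging helper: format the message, then either hand it to the
// user-registered callback (if any) or print a timestamped [ERRO]/[INFO]
// line to stderr. Returns 'status' unchanged so callers can write
// 'return fdb_log(...);'.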
195fdb_status fdb_log(err_log_callback *log_callback,
196                   fdb_status status,
197                   const char *format, ...)
198{
199    char msg[4096];
200    va_list args;
201    va_start(args, format);
202    vsnprintf(msg, sizeof(msg), format, args);
203    va_end(args);
204
205    if (log_callback && log_callback->callback) {
206        log_callback->callback(status, msg, log_callback->ctx_data);
207    } else {
208        char ISO_time_buffer[64];
209        printISOTime(ISO_time_buffer, 64);
210        if (status != FDB_RESULT_SUCCESS) {
211            fprintf(stderr, "%s [ERRO][FDB] %s\n", ISO_time_buffer, msg);
212        } else {
213            fprintf(stderr, "%s [INFO][FDB] %s\n", ISO_time_buffer, msg);
214        }
215    }
216    return status;
217}
218
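// For negative (error) status codes, append the OS errno string reported by
// the file ops layer to the log message.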
219static void _log_errno_str(struct filemgr_ops *ops,
220                           err_log_callback *log_callback,
221                           fdb_status io_error,
222                           const char *what,
223                           const char *filename)
224{
225    if (io_error < 0) {
226        char errno_msg[512];
227        ops->get_errno_str(errno_msg, 512);
228        fdb_log(log_callback, io_error,
229                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
230    }
231}
232
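// Hash and comparison functions for the global filename -> filemgr table.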
233static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
234{
235    struct filemgr *file = _get_entry(e, struct filemgr, e);
236    int len = strlen(file->filename);
237
238    return get_checksum(reinterpret_cast<const uint8_t*>(file->filename), len) &
239                        ((unsigned)(NBUCKET-1));
240}
241
242static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
243{
244    struct filemgr *aa, *bb;
245    aa = _get_entry(a, struct filemgr, e);
246    bb = _get_entry(b, struct filemgr, e);
247    return strcmp(aa->filename, bb->filename);
248}
249
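// One-time global initialization: block cache, filename hash table, temp
// buffer pool, and global locks. Uses double-checked locking; on Windows the
// spin lock itself is first created via an interlocked compare-and-swap
// because no static SPIN_INITIALIZER is available there.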
250void filemgr_init(struct filemgr_config *config)
251{
252    // global initialization
253    // initialized only once at first time
254    if (!filemgr_initialized) {
255#ifndef SPIN_INITIALIZER
256        // Note that only Windows passes through this routine
257        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
258            // atomically initialize spin lock only once
259            spin_init(&initial_lock);
260            initial_lock_status = 2;
261        } else {
262            // the others ... wait until initializing 'initial_lock' is done
263            while (initial_lock_status != 2) {
264                Sleep(1);
265            }
266        }
267#endif
268
269        spin_lock(&initial_lock);
270        if (!filemgr_initialized) {
271            memset(&sb_ops, 0x0, sizeof(sb_ops));
272            global_config = *config;
273
274            if (global_config.ncacheblock > 0)
275                bcache_init(global_config.ncacheblock, global_config.blocksize);
276
277            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
278
279            // initialize temp buffer
280            list_init(&temp_buf);
281            spin_init(&temp_buf_lock);
282
283            // initialize global lock
284            spin_init(&filemgr_openlock);
285
286            // set the initialize flag
287            filemgr_initialized = 1;
288        }
289        spin_unlock(&initial_lock);
290    }
291}
292
293void filemgr_set_lazy_file_deletion(bool enable,
294                                    register_file_removal_func regis_func,
295                                    check_file_removal_func check_func)
296{
297    lazy_file_deletion_enabled = enable;
298    register_file_removal = regis_func;
299    is_file_removed = check_func;
300}
301
302void filemgr_set_sb_operation(struct sb_ops ops)
303{
304    sb_ops = ops;
305}
306
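// Temp buffers are sector-aligned, blocksize-sized scratch blocks kept on a
// free list. The bookkeeping struct is placed directly after the usable
// block, which lets _filemgr_release_temp_buf() recover it from the buffer
// address alone.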
307static void * _filemgr_get_temp_buf()
308{
309    struct list_elem *e;
310    struct temp_buf_item *item;
311
312    spin_lock(&temp_buf_lock);
313    e = list_pop_front(&temp_buf);
314    if (e) {
315        item = _get_entry(e, struct temp_buf_item, le);
316    } else {
317        void *addr = NULL;
318
319        malloc_align(addr, FDB_SECTOR_SIZE,
320                     global_config.blocksize + sizeof(struct temp_buf_item));
321
322        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
323        item->addr = addr;
324    }
325    spin_unlock(&temp_buf_lock);
326
327    return item->addr;
328}
329
330static void _filemgr_release_temp_buf(void *buf)
331{
332    struct temp_buf_item *item;
333
334    spin_lock(&temp_buf_lock);
335    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
336    list_push_front(&temp_buf, &item->le);
337    spin_unlock(&temp_buf_lock);
338}
339
340static void _filemgr_shutdown_temp_buf()
341{
342    struct list_elem *e;
343    struct temp_buf_item *item;
344    size_t count=0;
345
346    spin_lock(&temp_buf_lock);
347    e = list_begin(&temp_buf);
348    while(e){
349        item = _get_entry(e, struct temp_buf_item, le);
350        e = list_remove(&temp_buf, e);
351        free_align(item->addr);
352        count++;
353    }
354    spin_unlock(&temp_buf_lock);
355}
356
357// Read a block from the file, decrypting if necessary.
358static ssize_t filemgr_read_block(struct filemgr *file, void *buf, bid_t bid) {
359    ssize_t result = file->ops->pread(file->fd, buf, file->blocksize,
360                                      file->blocksize*bid);
361    if (file->encryption.ops && result > 0) {
362        if (result != (ssize_t)file->blocksize)
363            return FDB_RESULT_READ_FAIL;
364        fdb_status status = fdb_decrypt_block(&file->encryption, buf, result, bid);
365        if (status != FDB_RESULT_SUCCESS)
366            return status;
367    }
368    return result;
369}
370
371// Write consecutive block(s) to the file, encrypting if necessary.
372ssize_t filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid) {
373    size_t blocksize = file->blocksize;
374    cs_off_t offset = start_bid * blocksize;
375    size_t nbytes = num_blocks * blocksize;
376    if (file->encryption.ops == NULL) {
377        return file->ops->pwrite(file->fd, buf, nbytes, offset);
378    } else {
379        uint8_t *encrypted_buf;
380        if (nbytes > 4096)
381            encrypted_buf = (uint8_t*)malloc(nbytes);
382        else
383            encrypted_buf = alca(uint8_t, nbytes); // most common case (writing single block)
384        if (!encrypted_buf)
385            return FDB_RESULT_ALLOC_FAIL;
386        fdb_status status = fdb_encrypt_blocks(&file->encryption,
387                                               encrypted_buf,
388                                               buf,
389                                               blocksize,
390                                               num_blocks,
391                                               start_bid);
392        ssize_t result = (status == FDB_RESULT_SUCCESS) ?
393            file->ops->pwrite(file->fd, encrypted_buf, nbytes, offset) : (ssize_t)status;
394        if (nbytes > 4096)
395            free(encrypted_buf); // free only after pwrite() has consumed the buffer
396        return result;
397    }
398}
399
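// A block is writable if it lies past the last committed offset (the
// append-only region) or, when block reuse via the superblock is active, if
// the superblock's bitmap marks it as reusable.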
400int filemgr_is_writable(struct filemgr *file, bid_t bid)
401{
402    if (sb_bmp_exists(file->sb) && sb_ops.is_writable) {
403        // block reusing is enabled
404        return sb_ops.is_writable(file, bid);
405    } else {
406        uint64_t pos = bid * file->blocksize;
407        // Note that we don't need to grab file->lock here because
408        // 1) both file->pos and file->last_commit are only incremented.
409        // 2) file->last_commit is updated using the value of file->pos,
410        //    and always equal to or smaller than file->pos.
411        return (pos <  atomic_get_uint64_t(&file->pos) &&
412                pos >= atomic_get_uint64_t(&file->last_commit));
413    }
414}
415
416uint64_t filemgr_get_sb_bmp_revnum(struct filemgr *file)
417{
418    if (file->sb && sb_ops.get_bmp_revnum) {
419        return sb_ops.get_bmp_revnum(file);
420    } else {
421        return 0;
422    }
423}
424
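// Locate and load the most recent valid DB header. Starting from the last
// block of the file (or from the header bid recorded in the superblock, if
// present), scan backwards: a candidate block must carry the DBHEADER
// marker, a known magic value, and a matching CRC. On success the header
// image, revnum, and seqnum are cached in file->header; otherwise the scan
// wraps around once and gives up with an appropriate error status.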
425static fdb_status _filemgr_read_header(struct filemgr *file,
426                                       err_log_callback *log_callback)
427{
428    uint8_t marker[BLK_MARKER_SIZE];
429    filemgr_magic_t magic = ver_get_latest_magic();
430    filemgr_header_len_t len;
431    uint8_t *buf;
432    uint32_t crc, crc_file;
433    bool check_crc32_open_rule = false;
434    fdb_status status = FDB_RESULT_SUCCESS;
435    bid_t hdr_bid, hdr_bid_local;
436    size_t min_filesize = 0;
437    size_t bad_header_count = 0;
438
439    // get temp buffer
440    buf = (uint8_t *) _filemgr_get_temp_buf();
441
442    // If a header is found crc_mode can change to reflect the file
443    if (file->crc_mode == CRC32) {
444        check_crc32_open_rule = true;
445    }
446
447    hdr_bid = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
448    hdr_bid_local = hdr_bid;
449
450    if (file->sb) {
451        // superblock exists .. file size does not start from zero.
452        min_filesize = file->sb->config->num_sb * file->blocksize;
453        bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
454        if (sb_last_hdr_bid != BLK_NOT_FOUND) {
455            hdr_bid = hdr_bid_local = sb_last_hdr_bid;
456        }
457        // if header info does not exist in superblock,
458        // get DB header at the end of the file.
459    }
460
461    if (atomic_get_uint64_t(&file->pos) > min_filesize) {
462        // Crash Recovery Test 1: unaligned last block write
463        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
464        if (remain) {
465            atomic_sub_uint64_t(&file->pos, remain);
466            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
467            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
468                "from a database file '%s'\n";
469            DBG(msg, remain, file->filename);
470            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
471                    msg, remain, file->filename);
472        }
473
474        size_t block_counter = 0;
475        do {
476            if (hdr_bid_local * file->blocksize >= file->pos) {
477                // Handling EOF scenario
478                status = FDB_RESULT_NO_DB_HEADERS;
479                const char *msg = "Unable to read block from file '%s' as EOF "
480                                  "reached\n";
481                fdb_log(log_callback, status, msg, file->filename);
482                break;
483            }
484            ssize_t rv = filemgr_read_block(file, buf, hdr_bid_local);
485            if (rv != (ssize_t)file->blocksize) {
486                status = (fdb_status) rv;
487                const char *msg = "Unable to read a database file '%s' with "
488                                  "blocksize %u\n";
489                DBG(msg, file->filename, file->blocksize);
490                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
491                break;
492            }
493            ++block_counter;
494            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
495                   BLK_MARKER_SIZE);
496
497            if (marker[0] == BLK_MARKER_DBHEADER) {
498                // possible need for byte conversions here
499                memcpy(&magic,
500                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
501                       sizeof(magic));
502                magic = _endian_decode(magic);
503
504                if (ver_is_valid_magic(magic)) {
505
506                    memcpy(&len,
507                           buf + file->blocksize - BLK_MARKER_SIZE -
508                           sizeof(magic) - sizeof(len),
509                           sizeof(len));
510                    len = _endian_decode(len);
511
512                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
513                    crc_file = _endian_decode(crc_file);
514
515                    // crc check and detect the crc_mode
516                    if (detect_and_check_crc(reinterpret_cast<const uint8_t*>(buf),
517                                             len - sizeof(crc),
518                                             crc_file,
519                                             &file->crc_mode)) {
520                        // crc mode is detected and known.
521                        // check the rules of opening legacy CRC
522                        if (check_crc32_open_rule && file->crc_mode != CRC32) {
523                            const char *msg = "Open of CRC32C file"
524                                              " with forced CRC32\n";
525                            status = FDB_RESULT_INVALID_ARGS;
526                            DBG(msg);
527                            fdb_log(log_callback, status, msg);
528                            break;
529                        } else {
530                            status = FDB_RESULT_SUCCESS;
531
532                            file->header.data = (void *)malloc(file->blocksize);
533
534                            memcpy(file->header.data, buf, len);
535                            memcpy(&file->header.revnum, buf + len,
536                                   sizeof(filemgr_header_revnum_t));
537                            memcpy((void *) &file->header.seqnum,
538                                    buf + len + sizeof(filemgr_header_revnum_t),
539                                    sizeof(fdb_seqnum_t));
540
541                            if (ver_superblock_support(magic)) {
542                                // last_writable_bmp_revnum should be same with
543                                // the current bmp_revnum (since it indicates the
544                                // 'bmp_revnum' of 'sb->cur_alloc_bid').
545                                atomic_store_uint64_t(&file->last_writable_bmp_revnum,
546                                                      filemgr_get_sb_bmp_revnum(file));
547                            }
548
549                            file->header.revnum =
550                                _endian_decode(file->header.revnum);
551                            file->header.seqnum =
552                                _endian_decode(file->header.seqnum.load());
553                            file->header.size = len;
554                            atomic_store_uint64_t(&file->header.bid, hdr_bid_local);
555                            memset(&file->header.stat, 0x0, sizeof(file->header.stat));
556
557                            // release temp buffer
558                            _filemgr_release_temp_buf(buf);
559                        }
560
561                        file->version = magic;
562                        return status;
563                    } else {
564                        status = FDB_RESULT_CHECKSUM_ERROR;
565                        uint32_t crc32 = 0, crc32c = 0;
566                        crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
567                                             len - sizeof(crc),
568                                             CRC32);
569#ifdef _CRC32C
570                        crc32c = get_checksum(reinterpret_cast<const uint8_t*>(buf),
571                                              len - sizeof(crc),
572                                              CRC32C);
573#endif
574                        const char *msg = "Crash Detected: CRC on disk %u != (%u | %u) "
575                            "in a database file '%s'\n";
576                        DBG(msg, crc_file, crc32, crc32c, file->filename);
577                        fdb_log(log_callback, status, msg, crc_file, crc32, crc32c,
578                                file->filename);
579                    }
580                } else {
581                    status = FDB_RESULT_FILE_CORRUPTION;
582                    const char *msg = "Crash Detected: Wrong Magic %" _F64
583                                      " in a database file '%s'\n";
584                    fdb_log(log_callback, status, msg, magic, file->filename);
585                }
586            } else {
587                status = FDB_RESULT_NO_DB_HEADERS;
588                if (block_counter == 1) {
589                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
590                                      "in a database file '%s'\n";
591                    DBG(msg, marker[0], file->filename);
592                    fdb_log(log_callback, status, msg, marker[0], file->filename);
593                }
594                // if the header was invalidated due to corruption detection, handle that
595                if (marker[0] == BLK_MARKER_BAD){
596                    bad_header_count++;
597                    if (file->config->num_keeping_headers &&
598                        bad_header_count >= file->config->num_keeping_headers){
599                        status = FDB_NONRECOVERABLE_ERR;
600                        fdb_log(log_callback, status,
601                                "Found too many (%d) bad dbheader blocks",
602                                (int)bad_header_count);
603                        break;
604                    }
605                }
606            }
607
608            atomic_store_uint64_t(&file->last_commit, hdr_bid_local * file->blocksize);
609            // traverse headers in a circular manner
610            if (hdr_bid_local) {
611                hdr_bid_local--;
612            } else {
613                hdr_bid_local = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
614            }
615        } while (hdr_bid_local != hdr_bid);
616    }
617
618    // release temp buffer
619    _filemgr_release_temp_buf(buf);
620
621    file->header.size = 0;
622    file->header.revnum = 0;
623    file->header.seqnum = 0;
624    file->header.data = NULL;
625    atomic_store_uint64_t(&file->header.bid, 0);
626    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
627    file->version = magic;
628    if (bad_header_count > 0  && status == FDB_RESULT_NO_DB_HEADERS)
629        status = FDB_NONRECOVERABLE_ERR;
630    return status;
631}
632
633size_t filemgr_get_ref_count(struct filemgr *file)
634{
635    size_t ret = 0;
636    spin_lock(&file->lock);
637    ret = atomic_get_uint32_t(&file->ref_count);
638    spin_unlock(&file->lock);
639    return ret;
640}
641
642uint64_t filemgr_get_bcache_used_space(void)
643{
644    uint64_t bcache_free_space = 0;
645    if (global_config.ncacheblock) { // If buffer cache is indeed configured
646        bcache_free_space = bcache_get_num_free_blocks();
647        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
648                          * global_config.blocksize;
649    }
650    return bcache_free_space;
651}
652
653struct filemgr_prefetch_args {
654    struct filemgr *file;
655    uint64_t duration;
656    err_log_callback *log_callback;
657    void *aux;
658};
659
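// Body of the prefetch thread: starting from last_commit, walk backwards in
// FILEMGR_PREFETCH_UNIT chunks and read each block (warming the block cache)
// until an abort is requested, the configured duration elapses, the cache
// runs out of free space, or a read fails.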
660static void *_filemgr_prefetch_thread(void *voidargs)
661{
662    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
663    uint8_t *buf = alca(uint8_t, args->file->blocksize);
664    uint64_t cur_pos = 0, i;
665    uint64_t bcache_free_space;
666    bid_t bid;
667    bool terminate = false;
668    struct timeval begin, cur, gap;
669
670    spin_lock(&args->file->lock);
671    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
672    spin_unlock(&args->file->lock);
673    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
674        terminate = true;
675    } else {
676        cur_pos -= FILEMGR_PREFETCH_UNIT;
677    }
678    // read backwards from the end of the file, in units of FILEMGR_PREFETCH_UNIT
679    gettimeofday(&begin, NULL);
680    while (!terminate) {
681        for (i = cur_pos;
682             i < cur_pos + FILEMGR_PREFETCH_UNIT;
683             i += args->file->blocksize) {
684
685            gettimeofday(&cur, NULL);
686            gap = _utime_gap(begin, cur);
687            bcache_free_space = bcache_get_num_free_blocks();
688            bcache_free_space *= args->file->blocksize;
689
690            if (atomic_get_uint8_t(&args->file->prefetch_status)
691                == FILEMGR_PREFETCH_ABORT ||
692                gap.tv_sec >= (int64_t)args->duration ||
693                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
694                // terminate thread when
695                // 1. got abort signal
696                // 2. time out
697                // 3. not enough free space in block cache
698                terminate = true;
699                break;
700            } else {
701                bid = i / args->file->blocksize;
702                if (filemgr_read(args->file, bid, buf, NULL, true)
703                        != FDB_RESULT_SUCCESS) {
704                    // 4. read failure
705                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
706                            "Prefetch thread failed to read a block with block id %" _F64
707                            " from a database file '%s'", bid, args->file->filename);
708                    terminate = true;
709                    break;
710                }
711            }
712        }
713
714        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
715            cur_pos -= FILEMGR_PREFETCH_UNIT;
716        } else {
717            // remaining space is less than FILEMGR_PREFETCH_UNIT
718            terminate = true;
719        }
720    }
721
722    atomic_cas_uint8_t(&args->file->prefetch_status, FILEMGR_PREFETCH_RUNNING,
723                       FILEMGR_PREFETCH_IDLE);
724    free(args);
725    return NULL;
726}
727
728// prefetch the given DB file
729void filemgr_prefetch(struct filemgr *file,
730                      struct filemgr_config *config,
731                      err_log_callback *log_callback)
732{
733    uint64_t bcache_free_space;
734
735    bcache_free_space = bcache_get_num_free_blocks();
736    bcache_free_space *= file->blocksize;
737
738    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
739    spin_lock(&file->lock);
740    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
741        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
742        // invoke prefetch thread
743        struct filemgr_prefetch_args *args;
744        args = (struct filemgr_prefetch_args *)
745               calloc(1, sizeof(struct filemgr_prefetch_args));
746        args->file = file;
747        args->duration = config->prefetch_duration;
748        args->log_callback = log_callback;
749
750        if (atomic_cas_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE,
751                               FILEMGR_PREFETCH_RUNNING)) {
752            thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
753        }
754    }
755    spin_unlock(&file->lock);
756}
757
758fdb_status filemgr_does_file_exist(char *filename) {
759    struct filemgr_ops *ops = get_filemgr_ops();
760    int fd = ops->open(filename, O_RDONLY, 0444);
761    if (fd < 0) {
762        return (fdb_status) fd;
763    }
764    ops->close(fd);
765    return FDB_RESULT_SUCCESS;
766}
767
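// If superblock operations have been registered (filemgr_set_sb_operation),
// either read the latest superblock from an existing file or initialize a
// fresh one for a zero-length file.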
768static fdb_status _filemgr_load_sb(struct filemgr *file,
769                                   err_log_callback *log_callback)
770{
771    fdb_status status = FDB_RESULT_SUCCESS;
772    struct sb_config sconfig;
773
774    if (sb_ops.init && sb_ops.get_default_config && sb_ops.read_latest) {
775        sconfig = sb_ops.get_default_config();
776        if (filemgr_get_pos(file)) {
777            // existing file
778            status = sb_ops.read_latest(file, sconfig, log_callback);
779        } else {
780            // new file
781            status = sb_ops.init(file, sconfig, log_callback);
782        }
783    }
784
785    return status;
786}
787
788static filemgr* get_instance_UNLOCKED(const char *filename)
789{
790    if (!filename) {
791        return NULL;
792    }
793
794    struct filemgr query;
795    struct hash_elem *e = NULL;
796    struct filemgr *file = NULL;
797
798    query.filename = (char*)filename;
799    e = hash_find(&hash, &query.e);
800    if (e) {
801        file = _get_entry(e, struct filemgr, e);
802    }
803    return file;
804}
805
806struct filemgr* filemgr_get_instance(const char* filename)
807{
808    spin_lock(&filemgr_openlock);
809    struct filemgr *file = get_instance_UNLOCKED(filename);
810    spin_unlock(&filemgr_openlock);
811
812    return file;
813}
814
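// Open (or re-open) a database file and return its shared filemgr instance.
// If the file is already present in the global hash table, its reference
// count is bumped and the existing struct is returned; otherwise a new
// struct is allocated, the superblock and latest DB header are loaded, the
// WAL and global transaction are initialized, and the instance is inserted
// into the table.
//
// Illustrative call sequence (a sketch, not code taken from this file;
// 'path', 'config', and 'log_cb' are caller-supplied):
//
//     filemgr_open_result r = filemgr_open(path, get_filemgr_ops(),
//                                          &config, log_cb);
//     if (r.rv == FDB_RESULT_SUCCESS) {
//         /* ... use r.file ... */
//         filemgr_close(r.file, true, path, log_cb);
//     }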
815filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
816                                 struct filemgr_config *config,
817                                 err_log_callback *log_callback)
818{
819    struct filemgr *file = NULL;
820    struct filemgr query;
821    struct hash_elem *e = NULL;
822    bool create = config->options & FILEMGR_CREATE;
823    int file_flag = 0x0;
824    int fd = -1;
825    fdb_status status;
826    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
827
828    filemgr_init(config);
829
830    if (config->encryption_key.algorithm != FDB_ENCRYPTION_NONE && global_config.ncacheblock <= 0) {
831        // cannot use encryption without a block cache
832        result.rv = FDB_RESULT_CRYPTO_ERROR;
833        return result;
834    }
835
836    // check whether file is already opened or not
837    query.filename = filename;
838    spin_lock(&filemgr_openlock);
839    e = hash_find(&hash, &query.e);
840
841    if (e) {
842        // already opened (return existing structure)
843        file = _get_entry(e, struct filemgr, e);
844
845        if (atomic_incr_uint32_t(&file->ref_count) > 1 &&
846            atomic_get_uint8_t(&file->status) != FILE_CLOSED) {
847            spin_unlock(&filemgr_openlock);
848            result.file = file;
849            result.rv = FDB_RESULT_SUCCESS;
850            return result;
851        }
852
853        spin_lock(&file->lock);
854
855        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
856            file_flag = O_RDWR;
857            if (create) {
858                file_flag |= O_CREAT;
859            }
860            *file->config = *config;
861            file->config->blocksize = global_config.blocksize;
862            file->config->ncacheblock = global_config.ncacheblock;
863            file_flag |= config->flag;
864            file->fd = file->ops->open(file->filename, file_flag, 0666);
865            if (file->fd < 0) {
866                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
867                    // A database file was manually deleted by the user.
868                    // Clean up global hash table, WAL index, and buffer cache.
869                    // Then, retry it with a create option below IFF it is not
870                    // a read-only open attempt
871                    struct hash_elem *ret;
872                    spin_unlock(&file->lock);
873                    ret = hash_remove(&hash, &file->e);
874                    fdb_assert(ret, 0, 0);
875                    filemgr_free_func(&file->e);
876                    if (!create) {
877                        _log_errno_str(ops, log_callback,
878                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
879                        spin_unlock(&filemgr_openlock);
880                        result.rv = FDB_RESULT_NO_SUCH_FILE;
881                        return result;
882                    }
883                } else {
884                    _log_errno_str(file->ops, log_callback,
885                                  (fdb_status)file->fd, "OPEN", filename);
886                    atomic_decr_uint32_t(&file->ref_count);
887                    spin_unlock(&file->lock);
888                    spin_unlock(&filemgr_openlock);
889                    result.rv = (fdb_status) file->fd;
890                    return result;
891                }
892            } else { // Reopening the closed file succeeded.
893                atomic_store_uint8_t(&file->status, FILE_NORMAL);
894                if (config->options & FILEMGR_SYNC) {
895                    file->fflags |= FILEMGR_SYNC;
896                } else {
897                    file->fflags &= ~FILEMGR_SYNC;
898                }
899
900                spin_unlock(&file->lock);
901                spin_unlock(&filemgr_openlock);
902
903                result.file = file;
904                result.rv = FDB_RESULT_SUCCESS;
905                return result;
906            }
907        } else { // file is already opened.
908
909            if (config->options & FILEMGR_SYNC) {
910                file->fflags |= FILEMGR_SYNC;
911            } else {
912                file->fflags &= ~FILEMGR_SYNC;
913            }
914
915            spin_unlock(&file->lock);
916            spin_unlock(&filemgr_openlock);
917            result.file = file;
918            result.rv = FDB_RESULT_SUCCESS;
919            return result;
920        }
921    }
922
923    file_flag = O_RDWR;
924    if (create) {
925        file_flag |= O_CREAT;
926    }
927    file_flag |= config->flag;
928    fd = ops->open(filename, file_flag, 0666);
929    if (fd < 0) {
930        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
931        spin_unlock(&filemgr_openlock);
932        result.rv = (fdb_status) fd;
933        return result;
934    }
935    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
936    file->filename_len = strlen(filename);
937    file->filename = (char*)malloc(file->filename_len + 1);
938    strcpy(file->filename, filename);
939
940    atomic_init_uint32_t(&file->ref_count, 1);
941    file->stale_list = NULL;
942
943    status = fdb_init_encryptor(&file->encryption, &config->encryption_key);
944    if (status != FDB_RESULT_SUCCESS) {
945        ops->close(fd);
946        free(file);
947        spin_unlock(&filemgr_openlock);
948        result.rv = status;
949        return result;
950    }
951
952    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
953    file->wal->flag = 0;
954
955    file->ops = ops;
956    file->blocksize = global_config.blocksize;
957    atomic_init_uint8_t(&file->status, FILE_NORMAL);
958    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
959    *file->config = *config;
960    file->config->blocksize = global_config.blocksize;
961    file->config->ncacheblock = global_config.ncacheblock;
962    file->old_filename = NULL;
963    file->new_filename = NULL;
964    file->fd = fd;
965
966    cs_off_t offset = file->ops->goto_eof(file->fd);
967    if (offset < 0) {
968        _log_errno_str(file->ops, log_callback, (fdb_status) offset, "SEEK_END", filename);
969        file->ops->close(file->fd);
970        free(file->wal);
971        free(file->filename);
972        free(file->config);
973        free(file);
974        spin_unlock(&filemgr_openlock);
975        result.rv = (fdb_status) offset;
976        return result;
977    }
978    atomic_init_uint64_t(&file->last_commit, offset);
979    atomic_init_uint64_t(&file->last_writable_bmp_revnum, 0);
980    atomic_init_uint64_t(&file->pos, offset);
981    atomic_init_uint32_t(&file->throttling_delay, 0);
982    atomic_init_uint64_t(&file->num_invalidated_blocks, 0);
983    atomic_init_uint8_t(&file->io_in_prog, 0);
984
985#ifdef _LATENCY_STATS
986    for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
987        filemgr_init_latency_stat(&file->lat_stats[i]);
988    }
989#endif // _LATENCY_STATS
990
991    file->bcache = NULL;
992    file->in_place_compaction = false;
993    file->kv_header = NULL;
994    atomic_init_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE);
995
996    atomic_init_uint64_t(&file->header.bid, 0);
997    _init_op_stats(&file->header.op_stat);
998
999    spin_init(&file->lock);
1000    file->stale_list = (struct list*)calloc(1, sizeof(struct list));
1001    list_init(file->stale_list);
1002    avl_init(&file->stale_info_tree, NULL);
1003    avl_init(&file->mergetree, NULL);
1004    file->stale_info_tree_loaded = false;
1005
1006    filemgr_dirty_update_init(file);
1007
1008    spin_init(&file->fhandle_idx_lock);
1009    avl_init(&file->fhandle_idx, NULL);
1010
1011#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1012    struct plock_ops pops;
1013    struct plock_config pconfig;
1014
1015    pops.init_user = mutex_init_wrap;
1016    pops.lock_user = mutex_lock_wrap;
1017    pops.unlock_user = mutex_unlock_wrap;
1018    pops.destroy_user = mutex_destroy_wrap;
1019    pops.init_internal = spin_init_wrap;
1020    pops.lock_internal = spin_lock_wrap;
1021    pops.unlock_internal = spin_unlock_wrap;
1022    pops.destroy_internal = spin_destroy_wrap;
1023    pops.is_overlapped = _block_is_overlapped;
1024
1025    memset(&pconfig, 0x0, sizeof(pconfig));
1026    pconfig.ops = &pops;
1027    pconfig.sizeof_lock_internal = sizeof(spin_t);
1028    pconfig.sizeof_lock_user = sizeof(mutex_t);
1029    pconfig.sizeof_range = sizeof(bid_t);
1030    pconfig.aux = NULL;
1031    plock_init(&file->plock, &pconfig);
1032#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1033    int i;
1034    for (i=0;i<DLOCK_MAX;++i) {
1035        mutex_init(&file->data_mutex[i]);
1036    }
1037#else
1038    int i;
1039    for (i=0;i<DLOCK_MAX;++i) {
1040        spin_init(&file->data_spinlock[i]);
1041    }
1042#endif //__FILEMGR_DATA_PARTIAL_LOCK
1043
1044    mutex_init(&file->writer_lock.mutex);
1045    file->writer_lock.locked = false;
1046
1047    // Note: CRC must be initialized before superblock loading
1048    // initialize CRC mode
1049    if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
1050        file->crc_mode = CRC32;
1051    } else {
1052        file->crc_mode = CRC_DEFAULT;
1053    }
1054
1055    do { // repeat until both superblock and DB header are correctly read
1056        // init or load superblock
1057        status = _filemgr_load_sb(file, log_callback);
1058        // we can tolerate SB_READ_FAIL for old version file
1059        if (status != FDB_RESULT_SB_READ_FAIL &&
1060            status != FDB_RESULT_SUCCESS) {
1061            _log_errno_str(file->ops, log_callback, status, "READ", file->filename);
1062            file->ops->close(file->fd);
1063            free(file->stale_list);
1064            free(file->wal);
1065            free(file->filename);
1066            free(file->config);
1067            free(file);
1068            spin_unlock(&filemgr_openlock);
1069            result.rv = status;
1070            return result;
1071        }
1072
1073        // read header
1074        status = _filemgr_read_header(file, log_callback);
1075        if (file->sb && status == FDB_RESULT_NO_DB_HEADERS) {
1076            // this happens when user created & closed a file without any mutations,
1077            // thus there is no other data but superblocks.
1078            // we can tolerate this case.
1079        } else if (status != FDB_RESULT_SUCCESS) {
1080            _log_errno_str(file->ops, log_callback, status, "READ", filename);
1081            file->ops->close(file->fd);
1082            if (file->sb) {
1083                sb_ops.release(file);
1084            }
1085            free(file->stale_list);
1086            free(file->wal);
1087            free(file->filename);
1088            free(file->config);
1089            free(file);
1090            spin_unlock(&filemgr_openlock);
1091            result.rv = status;
1092            return result;
1093        }
1094
1095        if (file->sb &&
1096            file->header.revnum != atomic_get_uint64_t(&file->sb->last_hdr_revnum)) {
1097            // superblock exists but the corresponding DB header does not match.
1098            // read another candidate.
1099            continue;
1100        }
1101
1102        break;
1103    } while (true);
1104
1105    // initialize WAL
1106    if (!wal_is_initialized(file)) {
1107        wal_init(file, FDB_WAL_NBUCKET);
1108    }
1109
1110    // init global transaction for the file
1111    file->global_txn.wrapper = (struct wal_txn_wrapper*)
1112                               malloc(sizeof(struct wal_txn_wrapper));
1113    file->global_txn.wrapper->txn = &file->global_txn;
1114    file->global_txn.handle = NULL;
1115    if (atomic_get_uint64_t(&file->pos)) {
1116        file->global_txn.prev_hdr_bid =
1117            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
1118    } else {
1119        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
1120    }
1121    file->global_txn.prev_revnum = 0;
1122    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
1123    list_init(file->global_txn.items);
1124    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
1125    wal_add_transaction(file, &file->global_txn);
1126
1127    hash_insert(&hash, &file->e);
1128    if (config->prefetch_duration > 0) {
1129        filemgr_prefetch(file, config, log_callback);
1130    }
1131
1132    spin_unlock(&filemgr_openlock);
1133
1134    if (config->options & FILEMGR_SYNC) {
1135        file->fflags |= FILEMGR_SYNC;
1136    } else {
1137        file->fflags &= ~FILEMGR_SYNC;
1138    }
1139
1140    result.file = file;
1141    result.rv = FDB_RESULT_SUCCESS;
1142    fdb_log(log_callback, FDB_RESULT_SUCCESS, "Forestdb opened database file %s",
1143            filename);
1144
1145    return result;
1146}
1147
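// Replace the in-memory copy of the DB header with 'buf' and, when
// 'inc_revnum' is set, bump the header revision number. Returns the
// resulting revnum.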
1148uint64_t filemgr_update_header(struct filemgr *file,
1149                               void *buf,
1150                               size_t len,
1151                               bool inc_revnum)
1152{
1153    uint64_t ret;
1154
1155    spin_lock(&file->lock);
1156
1157    if (file->header.data == NULL) {
1158        file->header.data = (void *)malloc(file->blocksize);
1159    }
1160    memcpy(file->header.data, buf, len);
1161    file->header.size = len;
1162    if (inc_revnum) {
1163        ++(file->header.revnum);
1164    }
1165    ret = file->header.revnum;
1166
1167    spin_unlock(&file->lock);
1168
1169    return ret;
1170}
1171
1172filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
1173{
1174    filemgr_header_revnum_t ret;
1175    spin_lock(&file->lock);
1176    ret = file->header.revnum;
1177    spin_unlock(&file->lock);
1178    return ret;
1179}
1180
1181// 'filemgr_get_seqnum', 'filemgr_set_seqnum',
1182// 'filemgr_get_walflush_revnum', 'filemgr_set_walflush_revnum'
1183// have to be protected by 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
1184fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
1185{
1186    return file->header.seqnum;
1187}
1188
1189void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
1190{
1191    file->header.seqnum = seqnum;
1192}
1193
1194void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
1195                         bid_t *header_bid, fdb_seqnum_t *seqnum,
1196                         filemgr_header_revnum_t *header_revnum)
1197{
1198    spin_lock(&file->lock);
1199
1200    if (file->header.size > 0) {
1201        if (buf == NULL) {
1202            buf = (void*)malloc(file->header.size);
1203        }
1204        memcpy(buf, file->header.data, file->header.size);
1205    }
1206
1207    if (len) {
1208        *len = file->header.size;
1209    }
1210    if (header_bid) {
1211        *header_bid = filemgr_get_header_bid(file);
1212    }
1213    if (seqnum) {
1214        *seqnum = file->header.seqnum;
1215    }
1216    if (header_revnum) {
1217        *header_revnum = file->header.revnum;
1218    }
1219
1220    spin_unlock(&file->lock);
1221
1222    return buf;
1223}
1224
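// Read the DB header stored in block 'bid' and decode its payload length,
// revision number, default KVS seqnum, delta size, and superblock bitmap
// revnum (the latter two only for file versions whose magic advertises
// them). The header payload (at most one block) is copied into 'buf'.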
1225fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
1226                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
1227                                filemgr_header_revnum_t *header_revnum,
1228                                uint64_t *deltasize, uint64_t *version,
1229                                uint64_t *sb_bmp_revnum,
1230                                err_log_callback *log_callback)
1231{
1232    uint8_t *_buf;
1233    uint8_t marker[BLK_MARKER_SIZE];
1234    filemgr_header_len_t hdr_len;
1235    uint64_t _deltasize, _bmp_revnum;
1236    filemgr_magic_t magic;
1237    fdb_status status = FDB_RESULT_SUCCESS;
1238
1239    *len = 0;
1240
1241    if (!bid || bid == BLK_NOT_FOUND) {
1242        // No other header available
1243        return FDB_RESULT_SUCCESS;
1244    }
1245
1246    _buf = (uint8_t *)_filemgr_get_temp_buf();
1247
1248    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1249
1250    if (status != FDB_RESULT_SUCCESS) {
1251        fdb_log(log_callback, status,
1252                "Failed to read a database header with block id %" _F64 " in "
1253                "a database file '%s'", bid, file->filename);
1254        _filemgr_release_temp_buf(_buf);
1255        return status;
1256    }
1257    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1258            BLK_MARKER_SIZE);
1259
1260    if (marker[0] != BLK_MARKER_DBHEADER) {
1261        // Comment this warning log as of now because the circular block reuse
1262        // can cause false alarms as a previous stale header block can be reclaimed
1263        // and reused for incoming writes.
1264        /*
1265        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1266                "A block marker of the database header block id %" _F64 " in "
1267                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1268                bid, file->filename);
1269        */
1270        _filemgr_release_temp_buf(_buf);
1271        return FDB_RESULT_READ_FAIL;
1272    }
1273    memcpy(&magic,
1274            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1275            sizeof(magic));
1276    magic = _endian_decode(magic);
1277    if (!ver_is_valid_magic(magic)) {
1278        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1279                "A block magic value of %" _F64 " in the database header block"
1280                "id %" _F64 " in a database file '%s'"
1281                "does NOT match FILEMGR_MAGIC %" _F64 "!",
1282                magic, bid, file->filename, ver_get_latest_magic());
1283        _filemgr_release_temp_buf(_buf);
1284        return FDB_RESULT_FILE_CORRUPTION;
1285    }
1286    memcpy(&hdr_len,
1287            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1288            sizeof(hdr_len), sizeof(hdr_len));
1289    hdr_len = _endian_decode(hdr_len);
1290
1291    memcpy(buf, _buf, hdr_len);
1292    *len = hdr_len;
1293    *version = magic;
1294
1295    if (header_revnum) {
1296        // copy the DB header revnum
1297        filemgr_header_revnum_t _revnum;
1298        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
1299        *header_revnum = _endian_decode(_revnum);
1300    }
1301    if (seqnum) {
1302        // copy default KVS's seqnum
1303        fdb_seqnum_t _seqnum;
1304        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1305               sizeof(_seqnum));
1306        *seqnum = _endian_decode(_seqnum);
1307    }
1308
1309    if (ver_is_atleast_magic_001(magic)) {
1310        if (deltasize) {
1311            memcpy(&_deltasize, _buf + file->blocksize - BLK_MARKER_SIZE
1312                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1313                    - sizeof(_deltasize), sizeof(_deltasize));
1314            *deltasize = _endian_decode(_deltasize);
1315        }
1316    }
1317
1318    if (sb_bmp_revnum && ver_superblock_support(magic)) {
1319        memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1320                - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1321                - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1322        *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1323    }
1324
1325    _filemgr_release_temp_buf(_buf);
1326
1327    return status;
1328}
1329
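// Step one link backwards through the on-disk chain of DB headers starting
// from 'bid'. Returns the bid of the previous header and fills the output
// parameters, or BLK_NOT_FOUND when the chain ends, is broken, or the
// previous header has already been reclaimed by circular block reuse.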
1330uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
1331                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
1332                                   filemgr_header_revnum_t *revnum,
1333                                   uint64_t *deltasize, uint64_t *version,
1334                                   uint64_t *sb_bmp_revnum,
1335                                   err_log_callback *log_callback)
1336{
1337    uint8_t *_buf;
1338    uint8_t marker[BLK_MARKER_SIZE];
1339    fdb_seqnum_t _seqnum;
1340    filemgr_header_revnum_t _revnum, cur_revnum, prev_revnum;
1341    filemgr_header_len_t hdr_len;
1342    filemgr_magic_t magic;
1343    bid_t _prev_bid, prev_bid;
1344    uint64_t _deltasize, _bmp_revnum;
1345    int found = 0;
1346
1347    *len = 0;
1348
1349    if (!bid || bid == BLK_NOT_FOUND) {
1350        // No other header available
1351        return bid;
1352    }
1353    _buf = (uint8_t *)_filemgr_get_temp_buf();
1354
1355    // Reverse scan the file for a previous DB header
1356    do {
1357        // Get prev_bid from the current header.
1358        // Since the current header is already cached during the previous
1359        // operation, no disk I/O will be triggered.
1360        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
1361                != FDB_RESULT_SUCCESS) {
1362            break;
1363        }
1364
1365        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1366               BLK_MARKER_SIZE);
1367        memcpy(&magic,
1368               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1369               sizeof(magic));
1370        magic = _endian_decode(magic);
1371
1372        if (marker[0] != BLK_MARKER_DBHEADER ||
1373            !ver_is_valid_magic(magic)) {
1374            // not a header block
1375            // this happens when this function is invoked between
1376            // fdb_set() call and fdb_commit() call, so the last block
1377            // in the file is not a header block
1378            bid_t latest_hdr = filemgr_get_header_bid(file);
1379            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
1380                // get the latest header BID
1381                bid = latest_hdr;
1382            } else {
1383                break;
1384            }
1385            cur_revnum = file->header.revnum + 1;
1386        } else {
1387
1388            memcpy(&hdr_len,
1389                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1390                   sizeof(hdr_len), sizeof(hdr_len));
1391            hdr_len = _endian_decode(hdr_len);
1392
1393            memcpy(&_revnum, _buf + hdr_len,
1394                   sizeof(filemgr_header_revnum_t));
1395            cur_revnum = _endian_decode(_revnum);
1396
1397            if (sb_bmp_exists(file->sb)) {
1398                // first check revnum
1399                if (cur_revnum <= sb_ops.get_min_live_revnum(file)) {
1400                    // previous headers already have been reclaimed
1401                    // no more logical prev header
1402                    break;
1403                }
1404            }
1405
1406            memcpy(&_prev_bid,
1407                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1408                       sizeof(hdr_len) - sizeof(_prev_bid),
1409                   sizeof(_prev_bid));
1410            prev_bid = _endian_decode(_prev_bid);
1411            bid = prev_bid;
1412        }
1413
1414        // Read the prev header
1415        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1416        if (fs != FDB_RESULT_SUCCESS) {
1417            fdb_log(log_callback, fs,
1418                    "Failed to read a previous database header with block id %"
1419                    _F64 " in "
1420                    "a database file '%s'", bid, file->filename);
1421            break;
1422        }
1423
1424        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1425               BLK_MARKER_SIZE);
1426        if (marker[0] != BLK_MARKER_DBHEADER) {
1427            if (bid) {
1428                // broken linked list
1429                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1430                        "A block marker of the previous database header block id %"
1431                        _F64 " in "
1432                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1433                        bid, file->filename);
1434            }
1435            break;
1436        }
1437
1438        memcpy(&magic,
1439               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1440               sizeof(magic));
1441        magic = _endian_decode(magic);
1442        if (!ver_is_valid_magic(magic)) {
1443            // broken linked list
1444            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1445                    "A block magic value of %" _F64
1446                    " of the previous database header block id %" _F64 " in "
1447                    "a database file '%s' does NOT match FILEMGR_MAGIC %"
1448                    _F64"!", magic,
1449                    bid, file->filename, ver_get_latest_magic());
1450            break;
1451        }
1452
1453        memcpy(&hdr_len,
1454               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1455               sizeof(hdr_len), sizeof(hdr_len));
1456        hdr_len = _endian_decode(hdr_len);
1457
1458        if (buf) {
1459            memcpy(buf, _buf, hdr_len);
1460        }
1461        memcpy(&_revnum, _buf + hdr_len,
1462               sizeof(filemgr_header_revnum_t));
1463        prev_revnum = _endian_decode(_revnum);
1464        if (prev_revnum >= cur_revnum ||
1465            prev_revnum < sb_ops.get_min_live_revnum(file)) {
1466            // no more prev header, or broken linked list
1467            break;
1468        }
1469
1470        memcpy(&_seqnum,
1471               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1472               sizeof(fdb_seqnum_t));
1473        if (ver_is_atleast_magic_001(magic)) {
1474            if (deltasize) {
1475                memcpy(&_deltasize,
1476                        _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic)
1477                       - sizeof(hdr_len) - sizeof(prev_bid) - sizeof(_deltasize),
1478                        sizeof(_deltasize));
1479                *deltasize = _endian_decode(_deltasize);
1480            }
1481        }
1482
1483        if (sb_bmp_revnum && ver_superblock_support(magic)) {
1484            memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1485                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1486                    - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1487            *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1488        }
1489
1490        if (revnum) {
1491            *revnum = prev_revnum;
1492        }
1493        *seqnum = _endian_decode(_seqnum);
1494        *len = hdr_len;
1495        *version = magic;
1496        found = 1;
1497        break;
1498    } while (false); // no repetition
1499
1500    if (!found) { // no other header found till end of file
1501        *len = 0;
1502        bid = BLK_NOT_FOUND;
1503    }
1504
1505    _filemgr_release_temp_buf(_buf);
1506
1507    return bid;
1508}
1509
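// Drop one reference to the file; if other handles still hold it, simply
// return success. The last closer discards the file's dirty blocks from the
// block cache, closes the WAL, and then either frees the filemgr instance
// right away or, for a file pending removal with lazy deletion enabled,
// hands it to the registered background-removal callback.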
1510fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1511                         const char *orig_file_name,
1512                         err_log_callback *log_callback)
1513{
1514    int rv = FDB_RESULT_SUCCESS;
1515
1516    if (atomic_decr_uint32_t(&file->ref_count) > 0) {
1517        // File is still accessed by other readers or writers.
1518        return FDB_RESULT_SUCCESS;
1519    }
1520
1521    fdb_log(log_callback, (fdb_status)rv, "ForestDB closed database file %s",
1522            file->filename);
1523
1524    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1525                                  // filemgr_open() because file->lock won't
1526                                  // prevent the race condition.
1527
1528    // remove filemgr structure if no thread refers to the file
1529    spin_lock(&file->lock);
1530    if (atomic_get_uint32_t(&file->ref_count) == 0) {
1531        if (global_config.ncacheblock > 0 &&
1532            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1533            spin_unlock(&file->lock);
1534            // discard all dirty blocks belonging to this file
1535            bcache_remove_dirty_blocks(file);
1536        } else {
1537            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1538            // then its dirty block entries will be cleaned up in either
1539            // filemgr_free_func() or register_file_removal() below.
1540            spin_unlock(&file->lock);
1541        }
1542
1543        if (wal_is_initialized(file)) {
1544            wal_close(file, log_callback);
1545        }
1546#ifdef _LATENCY_STATS_DUMP_TO_FILE
1547        filemgr_dump_latency_stat(file, log_callback);
1548#endif // _LATENCY_STATS_DUMP_TO_FILE
1549
1550        spin_lock(&file->lock);
1551
1552        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1553
1554            bool foreground_deletion = false;
1555            struct filemgr* new_file = get_instance_UNLOCKED(file->new_filename);
1556
1557            // immediately remove the file if lazy (background) deletion is disabled or it was compacted in place
1558            if (!lazy_file_deletion_enabled ||
1559                (new_file && new_file->in_place_compaction)) {
1560                // TODO: to avoid the scenario below, we prevent background
1561                //       deletion of in-place compacted files at this time.
1562                // 1) In-place compacted from 'A' to 'A.1'.
1563                // 2) Request to delete 'A'.
1564                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1565                // 4) User opens DB file using its original name 'A', not 'A.1'.
1566                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1567                // 6) Crash!
1568
1569                // As the file is already unlinked, the file will be removed
1570                // as soon as we close it.
1571                rv = file->ops->close(file->fd);
1572                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1573#if defined(WIN32) || defined(_WIN32)
1574                // For Windows, we need to manually remove the file.
1575                remove(file->filename);
1576#endif
1577                foreground_deletion = true;
1578            }
1579
1580            // we can release the lock because no one will open this file
1581            spin_unlock(&file->lock);
1582            struct hash_elem *ret = hash_remove(&hash, &file->e);
1583            fdb_assert(ret, 0, 0);
1584
1585            spin_unlock(&filemgr_openlock);
1586
1587            if (foreground_deletion) {
1588                filemgr_free_func(&file->e);
1589            } else {
1590                register_file_removal(file, log_callback);
1591            }
1592            return (fdb_status) rv;
1593        } else {
1594
1595            rv = file->ops->close(file->fd);
1596            if (cleanup_cache_onclose) {
1597                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1598                if (file->in_place_compaction && orig_file_name) {
1599                    struct hash_elem *elem = NULL;
1600                    struct filemgr query;
1601                    uint32_t old_file_refcount = 0;
1602
1603                    query.filename = (char *)orig_file_name;
1604                    elem = hash_find(&hash, &query.e);
1605
1606                    if (file->old_filename) {
1607                        struct hash_elem *elem_old = NULL;
1608                        struct filemgr query_old;
1609                        struct filemgr *old_file = NULL;
1610
1611                        // get old file's ref count if exists
1612                        query_old.filename = file->old_filename;
1613                        elem_old = hash_find(&hash, &query_old.e);
1614                        if (elem_old) {
1615                            old_file = _get_entry(elem_old, struct filemgr, e);
1616                            old_file_refcount = atomic_get_uint32_t(&old_file->ref_count);
1617                        }
1618                    }
1619
1620                    // If the old file is still opened by another handle, renaming
1621                    // should be postponed; it will be renamed later by the handle
1622                    // referring to the old file.
1623                    if (!elem && old_file_refcount == 0 &&
1624                        is_file_removed(orig_file_name)) {
1625                        // If background file removal is not done yet, we postpone
1626                        // file renaming at this time.
1627                        if (rename(file->filename, orig_file_name) < 0) {
1628                            // Note that the renaming failure is not a critical
1629                            // issue because the last compacted file will be automatically
1630                            // identified and opened in the next fdb_open call.
1631                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1632                                           "CLOSE", file->filename);
1633                        }
1634                    }
1635                }
1636                spin_unlock(&file->lock);
1637                // Clean up global hash table, WAL index, and buffer cache.
1638                struct hash_elem *ret = hash_remove(&hash, &file->e);
1639                fdb_assert(ret, file, 0);
1640
1641                spin_unlock(&filemgr_openlock);
1642
1643                filemgr_free_func(&file->e);
1644                return (fdb_status) rv;
1645            } else {
1646                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1647            }
1648        }
1649    }
1650
1651    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1652
1653    spin_unlock(&file->lock);
1654    spin_unlock(&filemgr_openlock);
1655    return (fdb_status) rv;
1656}
1657
1658void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1659{
1660    // remove all cached blocks
1661    if (global_config.ncacheblock > 0 &&
1662            file->bcache.load(std::memory_order_relaxed)) {
1663        bcache_remove_dirty_blocks(file);
1664        bcache_remove_clean_blocks(file);
1665        bcache_remove_file(file);
1666        file->bcache.store(NULL, std::memory_order_relaxed);
1667    }
1668}
1669
1670void _free_fhandle_idx(struct avl_tree *idx);
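// Final teardown of a filemgr instance: stops any running prefetch thread,
// drops its cached blocks, releases the KV header, WAL, superblock, stale
// block metadata and locks, and frees the structure itself.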
1671void filemgr_free_func(struct hash_elem *h)
1672{
1673    struct filemgr *file = _get_entry(h, struct filemgr, e);
1674
1675    filemgr_prefetch_status_t prefetch_state =
1676                              atomic_get_uint8_t(&file->prefetch_status);
1677
1678    atomic_store_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_ABORT);
1679    if (prefetch_state == FILEMGR_PREFETCH_RUNNING) {
1680        // prefetch thread was running
1681        void *ret;
1682        // wait (the thread must have been created..)
1683        thread_join(file->prefetch_tid, &ret);
1684    }
1685
1686    // remove all cached blocks
1687    if (global_config.ncacheblock > 0 &&
1688            file->bcache.load(std::memory_order_relaxed)) {
1689        bcache_remove_dirty_blocks(file);
1690        bcache_remove_clean_blocks(file);
1691        bcache_remove_file(file);
1692        file->bcache.store(NULL, std::memory_order_relaxed);
1693    }
1694
1695    if (file->kv_header) {
1696        // multi KV instance mode & KV header exists
1697        file->free_kv_header(file);
1698    }
1699
1700    // free global transaction
1701    wal_remove_transaction(file, &file->global_txn);
1702    free(file->global_txn.items);
1703    free(file->global_txn.wrapper);
1704
1705    // destroy WAL
1706    if (wal_is_initialized(file)) {
1707        wal_shutdown(file, NULL);
1708        wal_destroy(file);
1709    }
1710    free(file->wal);
1711
1712#ifdef _LATENCY_STATS
1713    for (int x = 0; x < FDB_LATENCY_NUM_STATS; ++x) {
1714        filemgr_destroy_latency_stat(&file->lat_stats[x]);
1715    }
1716#endif // _LATENCY_STATS
1717
1718    // free filename and header
1719    free(file->filename);
1720    if (file->header.data) free(file->header.data);
1721
1722    // free old/new filename if any
1723    free(file->old_filename);
1724    free(file->new_filename);
1725
1726    // destroy locks
1727    spin_destroy(&file->lock);
1728
1729#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1730    plock_destroy(&file->plock);
1731#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1732    int i;
1733    for (i=0;i<DLOCK_MAX;++i) {
1734        mutex_destroy(&file->data_mutex[i]);
1735    }
1736#else
1737    int i;
1738    for (i=0;i<DLOCK_MAX;++i) {
1739        spin_destroy(&file->data_spinlock[i]);
1740    }
1741#endif //__FILEMGR_DATA_PARTIAL_LOCK
1742
1743    mutex_destroy(&file->writer_lock.mutex);
1744
1745    // free superblock
1746    if (sb_ops.release) {
1747        sb_ops.release(file);
1748    }
1749
1750    // free dirty update index
1751    filemgr_dirty_update_free(file);
1752
1753    // free fhandle idx
1754    _free_fhandle_idx(&file->fhandle_idx);
1755    spin_destroy(&file->fhandle_idx_lock);
1756
1757    // free file structure
1758    struct list *stale_list = filemgr_get_stale_list(file);
1759    filemgr_clear_stale_list(file);
1760    filemgr_clear_stale_info_tree(file);
1761    filemgr_clear_mergetree(file);
1762    free(stale_list);
1763    free(file->config);
1764    free(file);
1765}
1766
1767// permanently remove file from cache (not just close)
1768// LCOV_EXCL_START
1769void filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)
1770{
1771    struct hash_elem *ret;
1772
1773    if (!file || atomic_get_uint32_t(&file->ref_count) > 0) {
1774        return;
1775    }
1776
1777    // remove from global hash table
1778    spin_lock(&filemgr_openlock);
1779    ret = hash_remove(&hash, &file->e);
1780    fdb_assert(ret, ret, NULL);
1781    spin_unlock(&filemgr_openlock);
1782
1783    struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1784
1785    if (!lazy_file_deletion_enabled ||
1786        (new_file && new_file->in_place_compaction)) {
1787        filemgr_free_func(&file->e);
1788    } else {
1789        register_file_removal(file, log_callback);
1790    }
1791}
1792// LCOV_EXCL_STOP
1793
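    // hash_scan() callback: returns the filemgr pointer if the file is still
    // referenced (i.e., not closed by all handles), or NULL otherwise.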
1794static
1795void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1796    struct filemgr *file = _get_entry(h, struct filemgr, e);
1797    void *ret;
1798    spin_lock(&file->lock);
1799    if (atomic_get_uint32_t(&file->ref_count) != 0) {
1800        ret = (void *)file;
1801    } else {
1802        ret = NULL;
1803    }
1804    spin_unlock(&file->lock);
1805    return ret;
1806}
1807
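    // Shut down the filemgr module only if no file is still referenced;
    // otherwise FDB_RESULT_FILE_IS_BUSY is returned.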
1808fdb_status filemgr_shutdown()
1809{
1810    fdb_status ret = FDB_RESULT_SUCCESS;
1811    void *open_file;
1812    if (filemgr_initialized) {
1813
1814#ifndef SPIN_INITIALIZER
1815        // Windows: check if spin lock is already destroyed.
1816        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1817            spin_lock(&initial_lock);
1818        } else {
1819            // filemgr is already shut down
1820            return ret;
1821        }
1822#else
1823        spin_lock(&initial_lock);
1824#endif
1825
1826        if (!filemgr_initialized) {
1827            // filemgr is already shut down
1828#ifdef SPIN_INITIALIZER
1829            spin_unlock(&initial_lock);
1830#endif
1831            return ret;
1832        }
1833
1834        spin_lock(&filemgr_openlock);
1835        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1836        spin_unlock(&filemgr_openlock);
1837        if (!open_file) {
1838            hash_free_active(&hash, filemgr_free_func);
1839            if (global_config.ncacheblock > 0) {
1840                bcache_shutdown();
1841            }
1842            filemgr_initialized = 0;
1843            _filemgr_shutdown_temp_buf();
1844            spin_unlock(&initial_lock);
1845#ifndef SPIN_INITIALIZER
1846            initial_lock_status = 0;
1847            spin_destroy(&initial_lock);
1848#endif
1849        } else {
1850            spin_unlock(&initial_lock);
1851            ret = FDB_RESULT_FILE_IS_BUSY;
1852        }
1853    }
1854    return ret;
1855}
1856
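    // Allocate a single block: prefer a reusable block from the superblock
    // (only while the file status is FILE_NORMAL), otherwise hand out the
    // block at the current end of the file and extend the file position.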
1857bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1858{
1859    spin_lock(&file->lock);
1860    bid_t bid = BLK_NOT_FOUND;
1861
1862    // Block reuse is not allowed for a file that is being compacted,
1863    // to keep the implementation simple.
1864    if (filemgr_get_file_status(file) == FILE_NORMAL &&
1865        file->sb && sb_ops.alloc_block) {
1866        bid = sb_ops.alloc_block(file);
1867    }
1868    if (bid == BLK_NOT_FOUND) {
1869        bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1870        atomic_add_uint64_t(&file->pos, file->blocksize);
1871    }
1872
1873    if (global_config.ncacheblock <= 0) {
1874        // if block cache is turned off, write the allocated block before use
1875        uint8_t _buf = 0x0;
1876        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1877                                       (bid+1) * file->blocksize - 1);
1878        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1879    }
1880    spin_unlock(&file->lock);
1881
1882    return bid;
1883}
1884
1885// Note that neither alloc_multiple nor alloc_multiple_cond is used in
1886// the new version of the DB file format (with superblock support).
1887void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1888                            bid_t *end, err_log_callback *log_callback)
1889{
1890    spin_lock(&file->lock);
1891    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1892    *end = *begin + nblock - 1;
1893    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1894
1895    if (global_config.ncacheblock <= 0) {
1896        // if block cache is turned off, write the allocated block before use
1897        uint8_t _buf = 0x0;
1898        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1899                                       atomic_get_uint64_t(&file->pos) - 1);
1900        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1901    }
1902    spin_unlock(&file->lock);
1903}
1904
1905// Atomically allocate 'nblock' blocks only when the current file position matches 'nextbid'
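    // If the current position does not match, nothing is allocated and
    // *begin/*end are set to BLK_NOT_FOUND; the bid corresponding to the
    // current file position is returned in either case.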
1906bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1907                                  bid_t *begin, bid_t *end,
1908                                  err_log_callback *log_callback)
1909{
1910    bid_t bid;
1911    spin_lock(&file->lock);
1912    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1913    if (bid == nextbid) {
1914        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1915        *end = *begin + nblock - 1;
1916        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1917
1918        if (global_config.ncacheblock <= 0) {
1919            // if block cache is turned off, write the allocated block before use
1920            uint8_t _buf = 0x0;
1921            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1922                                           atomic_get_uint64_t(&file->pos) - 1);
1923            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1924        }
1925    } else {
1926        *begin = BLK_NOT_FOUND;
1927        *end = BLK_NOT_FOUND;
1928    }
1929    spin_unlock(&file->lock);
1930    return bid;
1931}
1932
1933#ifdef __CRC32
1934INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1935{
1936    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1937        uint32_t crc_file = 0;
1938        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1939        crc_file = _endian_decode(crc_file);
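            // The checksum was computed with the CRC field itself filled with
            // 0xff (see the encode path in filemgr_write_offset()), so blank
            // the field the same way before verifying.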
1940        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1941        if (!perform_integrity_check(reinterpret_cast<const uint8_t*>(buf),
1942                                     file->blocksize,
1943                                     crc_file,
1944                                     file->crc_mode)) {
1945            return FDB_RESULT_CHECKSUM_ERROR;
1946        }
1947    }
1948    return FDB_RESULT_SUCCESS;
1949}
1950#endif
1951
1952fdb_status filemgr_invalidate_dbheader(struct filemgr *file, bid_t bid,
1953                                       err_log_callback *log_callback){
1954    uint8_t marker[BLK_MARKER_SIZE];
1955
1956    // get temp buffer
1957    uint8_t *buf = (uint8_t *) _filemgr_get_temp_buf();
1958
1959    // get the dbheader
1960    ssize_t rv = filemgr_read_block(file, buf, bid);
1961    if (rv != (ssize_t)file->blocksize) {
1962        _log_errno_str(file->ops, log_callback,
1963                       (fdb_status) rv, "READ", file->filename);
            _filemgr_release_temp_buf(buf);
1964        return FDB_RESULT_READ_FAIL;
1965    }
1966    // set the marker to an invalid value
1967    memset(marker, BLK_MARKER_BAD, BLK_MARKER_SIZE);
1968    memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1969           marker, BLK_MARKER_SIZE);
1970
1971    // write it back
1972    rv = filemgr_write_blocks(file, buf, 1, bid);
1973    _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1974                   "WRITE", file->filename);
1975    if (rv != (ssize_t)file->blocksize) {
1976        _filemgr_release_temp_buf(buf);
1977        spin_unlock(&file->lock);
1978        filemgr_clear_io_inprog(file);
1979        return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
1980    }
1981    _filemgr_release_temp_buf(buf);
1982    file->ops->fsync(file->fd);
1983    return FDB_RESULT_SUCCESS;
1984}

1985bool filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1986{
1987    bool ret;
1988    if (atomic_get_uint64_t(&file->last_commit) < bid * file->blocksize) {
1989        ret = true; // block invalidated was allocated recently (uncommitted)
1990    } else {
1991        ret = false; // a block from the past is invalidated (committed)
1992    }
1993    if (global_config.ncacheblock > 0) {
1994        bcache_invalidate_block(file, bid);
1995    }
1996    return ret;
1997}
1998
1999bool filemgr_is_fully_resident(struct filemgr *file)
2000{
2001    bool ret = false;
2002    if (global_config.ncacheblock > 0) {
2003        //TODO: A better thing to do is to track number of document blocks
2004        // and only compare those with the cached document block count
2005        double num_cached_blocks = (double)bcache_get_num_blocks(file);
2006        uint64_t num_blocks = atomic_get_uint64_t(&file->pos)
2007                                 / file->blocksize;
2008        double num_fblocks = (double)num_blocks;
2009        if (num_cached_blocks > num_fblocks * FILEMGR_RESIDENT_THRESHOLD) {
2010            ret = true;
2011        }
2012    }
2013    return ret;
2014}
2015
2016uint64_t filemgr_flush_immutable(struct filemgr *file,
2017                                   err_log_callback *log_callback)
2018{
2019    uint64_t ret = 0;
2020    if (global_config.ncacheblock > 0) {
2021        if (atomic_get_uint8_t(&file->io_in_prog)) {
2022            return 0;
2023        }
2024        ret = bcache_get_num_immutable(file);
2025        if (!ret) {
2026            return ret;
2027        }
2028        fdb_status rv = bcache_flush_immutable(file);
2029        if (rv != FDB_RESULT_SUCCESS) {
2030            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "WRITE",
2031                           file->filename);
2032        }
2033        return bcache_get_num_immutable(file);
2034    }
2035
2036    return ret;
2037}
2038
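    // Read a single block, preferring the block cache when it is enabled; on
    // a cache miss (and only if 'read_on_cache_miss' allows it), read the
    // block from disk, verify its checksum, and insert it into the cache as a
    // clean block.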
2039fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
2040                        err_log_callback *log_callback,
2041                        bool read_on_cache_miss)
2042{
2043    size_t lock_no;
2044    ssize_t r;
2045    uint64_t pos = bid * file->blocksize;
2046    fdb_status status = FDB_RESULT_SUCCESS;
2047    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
2048
2049    if (pos >= curr_pos) {
2050        const char *msg = "Read error: read offset %" _F64 " exceeds the file's "
2051                          "current offset %" _F64 " in a database file '%s'\n";
2052        fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, pos, curr_pos,
2053                file->filename);
2054        return FDB_RESULT_READ_FAIL;
2055    }
2056
2057    if (global_config.ncacheblock > 0) {
2058        lock_no = bid % DLOCK_MAX;
2059        (void)lock_no;
2060
2061#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2062        plock_entry_t *plock_entry = NULL;
2063        bid_t is_writer = 0;
2064#endif
2065        bool locked = false;
2066        // Note: we don't need to grab lock for committed blocks
2067        // because they are immutable so that no writer will interfere and
2068        // overwrite dirty data
2069        if (filemgr_is_writable(file, bid)) {
2070#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2071            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2072#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2073            mutex_lock(&file->data_mutex[lock_no]);
2074#else
2075            spin_lock(&file->data_spinlock[lock_no]);
2076#endif //__FILEMGR_DATA_PARTIAL_LOCK
2077            locked = true;
2078        }
2079
2080        r = bcache_read(file, bid, buf);
2081        if (r == 0) {
2082            // cache miss
2083            if (!read_on_cache_miss) {
2084                if (locked) {
2085#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2086                    plock_unlock(&file->plock, plock_entry);
2087#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2088                    mutex_unlock(&file->data_mutex[lock_no]);
2089#else
2090                    spin_unlock(&file->data_spinlock[lock_no]);
2091#endif //__FILEMGR_DATA_PARTIAL_LOCK
2092                }
2093                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2094                    "doesn't exist in the cache and the read_on_cache_miss flag is not set.\n";
2095                fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2096                        file->filename);
2097                return FDB_RESULT_READ_FAIL;
2098            }
2099
2100            // if normal file, just read a block
2101            r = filemgr_read_block(file, buf, bid);
2102            if (r != (ssize_t)file->blocksize) {
2103                _log_errno_str(file->ops, log_callback,
2104                               (fdb_status) r, "READ", file->filename);
2105                if (locked) {
2106#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2107                    plock_unlock(&file->plock, plock_entry);
2108#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2109                    mutex_unlock(&file->data_mutex[lock_no]);
2110#else
2111                    spin_unlock(&file->data_spinlock[lock_no]);
2112#endif //__FILEMGR_DATA_PARTIAL_LOCK
2113                }
2114                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2115                    "is not read correctly: only %d bytes read.\n";
2116                status = r < 0 ? (fdb_status)r : FDB_RESULT_READ_FAIL;
2117                fdb_log(log_callback, status, msg, bid, file->filename, r);
2118                if (!log_callback || !log_callback->callback) {
2119                    dbg_print_buf(buf, file->blocksize, true, 16);
2120                }
2121                return status;
2122            }
2123#ifdef __CRC32
2124            status = _filemgr_crc32_check(file, buf);
2125            if (status != FDB_RESULT_SUCCESS) {
2126                _log_errno_str(file->ops, log_callback, status, "READ",
2127                        file->filename);
2128                if (locked) {
2129#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2130                    plock_unlock(&file->plock, plock_entry);
2131#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2132                    mutex_unlock(&file->data_mutex[lock_no]);
2133#else
2134                    spin_unlock(&file->data_spinlock[lock_no]);
2135#endif //__FILEMGR_DATA_PARTIAL_LOCK
2136                }
2137                const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2138                    ": marker %x\n";
2139                fdb_log(log_callback, status, msg, bid,
2140                        file->filename, *((uint8_t*)buf + file->blocksize-1));
2141                if (!log_callback || !log_callback->callback) {
2142                    dbg_print_buf(buf, file->blocksize, true, 16);
2143                }
2144                return status;
2145            }
2146#endif
2147            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN, false);
2148            if (r != global_config.blocksize) {
2149                if (locked) {
2150#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2151                    plock_unlock(&file->plock, plock_entry);
2152#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2153                    mutex_unlock(&file->data_mutex[lock_no]);
2154#else
2155                    spin_unlock(&file->data_spinlock[lock_no]);
2156#endif //__FILEMGR_DATA_PARTIAL_LOCK
2157                }
2158                _log_errno_str(file->ops, log_callback,
2159                               (fdb_status) r, "WRITE", file->filename);
2160                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2161                    "is not written in cache correctly: only %d bytes written.\n";
2162                status = r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2163                fdb_log(log_callback, status, msg, bid, file->filename, r);
2164                if (!log_callback || !log_callback->callback) {
2165                    dbg_print_buf(buf, file->blocksize, true, 16);
2166                }
2167                return status;
2168            }
2169        }
2170        if (locked) {
2171#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2172            plock_unlock(&file->plock, plock_entry);
2173#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2174            mutex_unlock(&file->data_mutex[lock_no]);
2175#else
2176            spin_unlock(&file->data_spinlock[lock_no]);
2177#endif //__FILEMGR_DATA_PARTIAL_LOCK
2178        }
2179    } else {
2180        if (!read_on_cache_miss) {
2181            const char *msg = "Read error: BID %" _F64 " in a database file '%s': "
2182                "block cache is not enabled and the read_on_cache_miss flag is not set.\n";
2183            fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2184                    file->filename);
2185            return FDB_RESULT_READ_FAIL;
2186        }
2187
2188        r = filemgr_read_block(file, buf, bid);
2189        if (r != (ssize_t)file->blocksize) {
2190            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
2191                           file->filename);
2192            const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2193                "is not read correctly: only %d bytes read (block cache disabled).\n";
2194            status = (r < 0)? (fdb_status)r : FDB_RESULT_READ_FAIL;
2195            fdb_log(log_callback, status, msg, bid, file->filename, r);
2196            if (!log_callback || !log_callback->callback) {
2197                dbg_print_buf(buf, file->blocksize, true, 16);
2198            }
2199            return status;
2200        }
2201
2202#ifdef __CRC32
2203        status = _filemgr_crc32_check(file, buf);
2204        if (status != FDB_RESULT_SUCCESS) {
2205            _log_errno_str(file->ops, log_callback, status, "READ",
2206                           file->filename);
2207            const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2208                ": marker %x (block cache disabled)\n";
2209            fdb_log(log_callback, status, msg, bid,
2210                    file->filename, *((uint8_t*)buf + file->blocksize-1));
2211            if (!log_callback || !log_callback->callback) {
2212                dbg_print_buf(buf, file->blocksize, true, 16);
2213            }
2214            return status;
2215        }
2216#endif
2217    }
2218    return status;
2219}
2220
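    // Write 'len' bytes into block 'bid' starting at byte 'offset'. With the
    // block cache enabled, the data is staged as a dirty cache block (for a
    // partial write of an existing block, its previous contents are read
    // first); otherwise it is written directly to the file, adding a CRC for
    // full B+tree node blocks.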
2221fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
2222                                uint64_t offset, uint64_t len, void *buf,
2223                                bool final_write,
2224                                err_log_callback *log_callback)
2225{
2226    size_t lock_no;
2227    ssize_t r = 0;
2228    uint64_t pos = bid * file->blocksize + offset;
2229    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
2230
2231    if (offset + len > file->blocksize) {
2232        const char *msg = "Write error: trying to write the buffer data "
2233            "(offset: %" _F64 ", len: %" _F64 ") that exceeds the block size "
2234            "%" _F64 " in a database file '%s'\n";
2235        fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, offset, len,
2236                file->blocksize, file->filename);
2237        return FDB_RESULT_WRITE_FAIL;
2238    }
2239
2240    if (sb_bmp_exists(file->sb)) {
2241        // block reusing is enabled
2242        if (!sb_ops.is_writable(file, bid)) {
2243            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2244                              "not identified as a reusable block in "
2245                              "a database file '%s'\n";
2246            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, file->filename);
2247            return FDB_RESULT_WRITE_FAIL;
2248        }
2249    } else if (pos < curr_commit_pos) {
2250        // stale blocks are not reused yet
2251        if (file->sb == NULL ||
2252            (file->sb && pos >= file->sb->config->num_sb * file->blocksize)) {
2253            // (non-sequential update is exceptionally allowed for superblocks)
2254            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2255                              "smaller than the current commit offset %" _F64 " in "
2256                              "a database file '%s'\n";
2257            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, curr_commit_pos,
2258                    file->filename);
2259            return FDB_RESULT_WRITE_FAIL;
2260        }
2261    }
2262
2263    if (global_config.ncacheblock > 0) {
2264        lock_no = bid % DLOCK_MAX;
2265        (void)lock_no;
2266
2267        bool locked = false;
2268#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2269        plock_entry_t *plock_entry;
2270        bid_t is_writer = 1;
2271        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2272#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2273        mutex_lock(&file->data_mutex[lock_no]);
2274#else
2275        spin_lock(&file->data_spinlock[lock_no]);
2276#endif //__FILEMGR_DATA_PARTIAL_LOCK
2277        locked = true;
2278
2279        if (len == file->blocksize) {
2280            // write entire block .. we don't need to read previous block
2281            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY, final_write);
2282            if (r != global_config.blocksize) {
2283                if (locked) {
2284#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2285                    plock_unlock(&file->plock, plock_entry);
2286#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2287                    mutex_unlock(&file->data_mutex[lock_no]);
2288#else
2289                    spin_unlock(&file->data_spinlock[lock_no]);
2290#endif //__FILEMGR_DATA_PARTIAL_LOCK
2291                }
2292                _log_errno_str(file->ops, log_callback,
2293                               (fdb_status) r, "WRITE", file->filename);
2294                return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2295            }
2296        } else {
2297            // partially write buffer cache first
2298            r = bcache_write_partial(file, bid, buf, offset, len, final_write);
2299            if (r == 0) {
2300                // cache miss
2301                // write partially .. we have to read previous contents of the block
2302                int64_t cur_file_pos = file->ops->goto_eof(file->fd);
2303                if (cur_file_pos < 0) {
2304                    _log_errno_str(file->ops, log_callback,
2305                                   (fdb_status) cur_file_pos, "EOF", file->filename);
2306                    return (fdb_status) cur_file_pos;
2307                }
2308                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
2309                void *_buf = _filemgr_get_temp_buf();
2310
2311                if (bid >= cur_file_last_bid) {
2312                    // This is the first write to this block, so there is no
2313                    // need to read its previous contents from the file.
2314                } else {
2315                    r = filemgr_read_block(file, _buf, bid);
2316                    if (r != (ssize_t)file->blocksize) {
2317                        if (locked) {
2318#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2319                            plock_unlock(&file->plock, plock_entry);
2320#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2321                            mutex_unlock(&file->data_mutex[lock_no]);
2322#else
2323                            spin_unlock(&file->data_spinlock[lock_no]);
2324#endif //__FILEMGR_DATA_PARTIAL_LOCK
2325                        }
2326                        _filemgr_release_temp_buf(_buf);
2327                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
2328                                       "READ", file->filename);
2329                        return r < 0 ? (fdb_status) r : FDB_RESULT_READ_FAIL;
2330                    }
2331                }
2332                memcpy((uint8_t *)_buf + offset, buf, len);
2333                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY, final_write);
2334                if (r != global_config.blocksize) {
2335                    if (locked) {
2336#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2337                        plock_unlock(&file->plock, plock_entry);
2338#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2339                        mutex_unlock(&file->data_mutex[lock_no]);
2340#else
2341                        spin_unlock(&file->data_spinlock[lock_no]);
2342#endif //__FILEMGR_DATA_PARTIAL_LOCK
2343                    }
2344                    _filemgr_release_temp_buf(_buf);
2345                    _log_errno_str(file->ops, log_callback,
2346                            (fdb_status) r, "WRITE", file->filename);
2347                    return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2348                }
2349
2350                _filemgr_release_temp_buf(_buf);
2351            } // cache miss
2352        } // full block or partial block
2353
2354        if (locked) {
2355#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2356            plock_unlock(&file->plock, plock_entry);
2357#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2358            mutex_unlock(&file->data_mutex[lock_no]);
2359#else
2360            spin_unlock(&file->data_spinlock[lock_no]);
2361#endif //__FILEMGR_DATA_PARTIAL_LOCK
2362        }
2363    } else { // block cache disabled
2364
2365#ifdef __CRC32
2366        if (len == file->blocksize) {
2367            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
2368            if (marker == BLK_MARKER_BNODE) {
2369                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
2370                uint32_t crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
2371                                              file->blocksize,
2372                                              file->crc_mode);
2373                crc32 = _endian_encode(crc32);
2374                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
2375            }
2376        }
2377#endif
2378
2379        r = file->ops->pwrite(file->fd, buf, len, pos);
2380        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
2381        if ((uint64_t)r != len) {
2382            return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2383        }
2384    } // block cache check
2385    return FDB_RESULT_SUCCESS;
2386}
2387
2388fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
2389                   err_log_callback *log_callback)
2390{
2391    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
2392                                false, // TODO: track immutability of index blk
2393                                log_callback);
2394}
2395
2396fdb_status filemgr_commit(struct filemgr *file, bool sync,
2397                          err_log_callback *log_callback)
2398{
2399    // append header at the end of the file
2400    uint64_t bmp_revnum = 0;
2401    if (sb_ops.get_bmp_revnum) {
2402        bmp_revnum = sb_ops.get_bmp_revnum(file);
2403    }
2404    return filemgr_commit_bid(file, BLK_NOT_FOUND, bmp_revnum,
2405                              sync, log_callback);
2406}
2407
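    // Flush dirty cache blocks and write a DB header block, either appended at
    // the end of the file (bid == BLK_NOT_FOUND) or into the given reusable
    // block; then update last_commit and optionally fsync when 'sync' is set.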
2408fdb_status filemgr_commit_bid(struct filemgr *file, bid_t bid,
2409                              uint64_t bmp_revnum, bool sync,
2410                              err_log_callback *log_callback)
2411{
2412    struct avl_node *a;
2413    struct kvs_node *node;
2414    bid_t prev_bid, _prev_bid;
2415    uint64_t _deltasize, _bmp_revnum;
2416    fdb_seqnum_t _seqnum;
2417    filemgr_header_revnum_t _revnum;
2418    int result = FDB_RESULT_SUCCESS;
2419    bool block_reusing = false;
2420
2421    filemgr_set_io_inprog(file);
2422    if (global_config.ncacheblock > 0) {
2423        result = bcache_flush(file);
2424        if (result != FDB_RESULT_SUCCESS) {
2425            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2426                           "FLUSH", file->filename);
2427            filemgr_clear_io_inprog(file);
2428            return (fdb_status)result;
2429        }
2430    }
2431
2432    spin_lock(&file->lock);
2433
2434    uint16_t header_len = file->header.size;
2435    struct kvs_header *kv_header = file->kv_header;
2436    filemgr_magic_t magic = file->version;
2437
2438    if (file->header.size > 0 && file->header.data) {
2439        void *buf = _filemgr_get_temp_buf();
2440        uint8_t marker[BLK_MARKER_SIZE];
2441
2442        // [header data]:        'header_len' bytes   <---+
2443        // [header revnum]:      8 bytes                  |
2444        // [default KVS seqnum]: 8 bytes                  |
2445        // ...                                            |
2446        // (empty)                                    blocksize
2447        // ...                                            |
2448        // [SB bitmap revnum]:   8 bytes                  |
2449        // [Delta size]:         8 bytes                  |
2450        // [prev header bid]:    8 bytes                  |
2451        // [header length]:      2 bytes                  |
2452        // [magic number]:       8 bytes                  |
2453        // [block marker]:       1 byte               <---+
2454
2455        // header data
2456        memcpy(buf, file->header.data, header_len);
2457        // header rev number
2458        _revnum = _endian_encode(file->header.revnum);
2459        memcpy((uint8_t *)buf + header_len, &_revnum,
2460               sizeof(filemgr_header_revnum_t));
2461        // file's sequence number (default KVS seqnum)
2462        _seqnum = _endian_encode(file->header.seqnum.load());
2463        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
2464               &_seqnum, sizeof(fdb_seqnum_t));
2465
2466        // current header's sb bmp revision number
2467        if (file->sb) {
2468            _bmp_revnum = _endian_encode(bmp_revnum);
2469            memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2470                   - sizeof(header_len) - sizeof(_prev_bid)
2471                   - sizeof(_deltasize) - sizeof(_bmp_revnum)
2472                   - BLK_MARKER_SIZE),
2473                   &_bmp_revnum, sizeof(_bmp_revnum));
2474        }
2475
2476        // delta size since prior commit
2477        _deltasize = _endian_encode(file->header.stat.deltasize //index+data
2478                                  + wal_get_datasize(file)); // wal datasize
2479        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2480               - sizeof(header_len) - sizeof(_prev_bid)*2 - BLK_MARKER_SIZE),
2481               &_deltasize, sizeof(_deltasize));
2482
2483        // Reset in-memory delta size of the header for next commit...
2484        file->header.stat.deltasize = 0; // single kv store header
2485        if (kv_header) { // multi kv store stats
2486            a = avl_first(kv_header->idx_id);
2487            while (a) {
2488                node = _get_entry(a, struct kvs_node, avl_id);
2489                a = avl_next(&node->avl_id);
2490                node->stat.deltasize = 0;
2491            }
2492        }
2493
2494        // prev header bid
2495        prev_bid = atomic_get_uint64_t(&file->header.bid);
2496        _prev_bid = _endian_encode(prev_bid);
2497        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2498               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
2499               &_prev_bid, sizeof(_prev_bid));
2500        // header length
2501        header_len = _endian_encode(header_len);
2502        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2503               - sizeof(header_len) - BLK_MARKER_SIZE),
2504               &header_len, sizeof(header_len));
2505        // magic number
2506        magic = _endian_encode(magic);
2507        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2508               - BLK_MARKER_SIZE), &magic, sizeof(magic));
2509
2510        // marker
2511        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
2512        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
2513               marker, BLK_MARKER_SIZE);
2514
2515        if (bid == BLK_NOT_FOUND) {
2516            // append header at the end of file
2517            bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
2518            block_reusing = false;
2519        } else {
2520            // write header in the allocated (reused) block
2521            block_reusing = true;
2522            // we MUST invalidate the header block 'bid', since previous
2523            // contents of 'bid' may remain in block cache and cause data
2524            // inconsistency if reading header block hits the cache.
2525            bcache_invalidate_block(file, bid);
2526        }
2527
2528        ssize_t rv = filemgr_write_blocks(file, buf, 1, bid);
2529        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
2530                       "WRITE", file->filename);
2531        if (rv != (ssize_t)file->blocksize) {
2532            _filemgr_release_temp_buf(buf);
2533            spin_unlock(&file->lock);
2534            filemgr_clear_io_inprog(file);
2535            return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
2536        }
2537
2538        if (prev_bid) {
2539            // mark prev DB header as stale
2540            filemgr_add_stale_block(file, prev_bid * file->blocksize, file->blocksize);
2541        }
2542
2543        atomic_store_uint64_t(&file->header.bid, bid);
2544        if (!block_reusing) {
2545            atomic_add_uint64_t(&file->pos, file->blocksize);
2546        }
2547
2548        _filemgr_release_temp_buf(buf);
2549    }
2550
2551    if (sb_bmp_exists(file->sb) &&
2552        atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND &&
2553        atomic_get_uint8_t(&file->status) == FILE_NORMAL) {
2554        // block reusing is currently enabled
2555        atomic_store_uint64_t(&file->last_commit,
2556            atomic_get_uint64_t(&file->sb->cur_alloc_bid) * file->blocksize);
2557        // Some more blocks may be allocated after the header block (for
2558        // storing BMP data or system docs for stale info), so the block
2559        // pointed to by 'cur_alloc_bid' may carry a different BMP revision
2560        // number. Hence we have to use the up-to-date bmp_revnum here.
2561        /*
2562          sb_bmp_is_writable() used to report a block as not writable
2563          because the test 'bid >= last_commit' failed. The cause was
2564          that last_writable_bmp_revnum was being set in all cases,
2565          even when last_commit was set to the end of the file, which
2566          produced a bitmap version mismatch in the check for a
2567          writable block. As a workaround, last_writable_bmp_revnum is
2568          set to the current bitmap version only when last_commit is
2569          not set to the end of the file (i.e., only when block reuse
2570          is in effect).
2571        */
2572        atomic_store_uint64_t(&file->last_writable_bmp_revnum,
2573                              filemgr_get_sb_bmp_revnum(file));
2574    } else {
2575        atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
2576    }
2577    spin_unlock(&file->lock);
2578
2579    if (sync) {
2580        result = file->ops->fsync(file->fd);
2581        _log_errno_str(file->ops, log_callback, (fdb_status)result,
2582                       "FSYNC", file->filename);
2583    }
2584    filemgr_clear_io_inprog(file);
2585    return (fdb_status) result;
2586}
2587
2588fdb_status filemgr_sync(struct filemgr *file, bool sync_option,
2589                        err_log_callback *log_callback)
2590{
2591    fdb_status result = FDB_RESULT_SUCCESS;
2592    if (global_config.ncacheblock > 0) {
2593        result = bcache_flush(file);
2594        if (result != FDB_RESULT_SUCCESS) {
2595            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2596                           "FLUSH", file->filename);
2597            return result;
2598        }
2599    }
2600
2601    if (sync_option && file->fflags & FILEMGR_SYNC) {
2602        int rv = file->ops->fsync(file->fd);
2603        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
2604        return (fdb_status) rv;
2605    }
2606    return result;
2607}
2608
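// Clone 'clone_len' blocks from 'src_bid' in src_file to 'dst_bid' in
// dst_file via the file ops' copy_file_range; on success the destination
// file position is advanced to the end of the cloned range.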
2609fdb_status filemgr_copy_file_range(struct filemgr *src_file,
2610                                   struct filemgr *dst_file,
2611                                   bid_t src_bid, bid_t dst_bid,
2612                                   bid_t clone_len)
2613{
2614    uint32_t blocksize = src_file->blocksize;
2615    fdb_status fs = (fdb_status)dst_file->ops->copy_file_range(
2616                                            src_file->fs_type,
2617                                            src_file->fd,
2618                                            dst_file->fd,
2619                                            src_bid * blocksize,
2620                                            dst_bid * blocksize,
2621                                            clone_len * blocksize);
2622    if (fs != FDB_RESULT_SUCCESS) {
2623        return fs;
2624    }
2625    atomic_store_uint64_t(&dst_file->pos, (dst_bid + clone_len) * blocksize);
2626    return FDB_RESULT_SUCCESS;
2627}
2628
2629void filemgr_update_file_status(struct filemgr *file, file_status_t status)
2630{
2631    spin_lock(&file->lock);
2632    atomic_store_uint8_t(&file->status, status);
2633    spin_unlock(&file->lock);
2634}
2635
2636static void assign_old_filename(struct filemgr *file, const char *old_filename)
2637{
2638    free(file->old_filename);
2639    if (old_filename) {
2640        file->old_filename = (char*)malloc(strlen(old_filename) + 1);
2641        strcpy(file->old_filename, old_filename);
2642    } else {
2643        file->old_filename = NULL;
2644    }
2645}
2646
2647static void assign_new_filename(struct filemgr *file, const char *new_filename)
2648{
2649    free(file->new_filename);
2650    if (new_filename) {
2651        file->new_filename = (char*)malloc(strlen(new_filename) + 1);
2652        strcpy(file->new_filename, new_filename);
2653    } else {
2654        file->new_filename = NULL;
2655    }
2656}
2657
2658bool filemgr_update_file_linkage(struct filemgr *file,
2659                                 const char *old_filename,
2660                                 const char *new_filename)
2661{
2662
2663    bool ret = true;
2664    spin_lock(&file->lock);
2665    if (old_filename) {
2666        if (!file->old_filename) {
2667            assign_old_filename(file, old_filename);
2668        } else {
2669            ret = false;
2670            fdb_assert(atomic_get_uint32_t(&file->ref_count),
2671                       atomic_get_uint32_t(&file->ref_count), 0);
2672        }
2673    }
2674    if (new_filename) {
2675        assign_new_filename(file, new_filename);
2676    }
2677    spin_unlock(&file->lock);
2678    return ret;
2679}
2680
2681void filemgr_set_compaction_state(struct filemgr *old_file,
2682                                  struct filemgr *new_file,
2683                                  file_status_t status)
2684{
2685    if (old_file) {
2686        spin_lock(&old_file->lock);
2687        assign_new_filename(old_file, new_file ? new_file->filename : NULL);
2688        atomic_store_uint8_t(&old_file->status, status);
2689        spin_unlock(&old_file->lock);
2690
2691        if (new_file) {
2692            spin_lock(&new_file->lock);
2693            assign_old_filename(new_file, old_file->filename);
2694            spin_unlock(&new_file->lock);
2695        }
2696    }
2697}
2698
2699bool filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header,
2700                           void (*free_kv_header)(struct filemgr *file))
2701{
2702    bool ret;
2703    spin_lock(&file->lock);
2704
2705    if (!file->kv_header) {
2706        file->kv_header = kv_header;
2707        file->free_kv_header = free_kv_header;
2708        ret = true;
2709    } else {
2710        ret = false;
2711    }
2712
2713    spin_unlock(&file->lock);
2714
2715    return ret;
2716}
2717
2718struct kvs_header *filemgr_get_kv_header(struct filemgr *file)
2719{
2720    struct kvs_header *kv_header = NULL;
2721    spin_lock(&file->lock);
2722    kv_header = file->kv_header;
2723    spin_unlock(&file->lock);
2724    return kv_header;
2725}
2726
2727// Check if there is a file that still points to the old_file that is being
2728// compacted away. If so open the file and return its pointer.
2729static
2730void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
2731    struct filemgr *cur_file = (struct filemgr *)ctx;
2732    struct filemgr *file = _get_entry(h, struct filemgr, e);
2733    spin_lock(&file->lock);
2734    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
2735        !strcmp(file->new_filename, cur_file->filename)) {
2736        // Incrementing reference counter below is the same as filemgr_open()
2737        // We need to do this to ensure that the pointer returned does not
2738        // get freed outside the filemgr_open lock
2739        atomic_incr_uint32_t(&file->ref_count);
2740        spin_unlock(&file->lock);
2741        return (void *)file;
2742    }
2743    spin_unlock(&file->lock);
2744    return (void *)NULL;
2745}
2746
2747struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
2748    struct filemgr *very_old_file;
2749    spin_lock(&filemgr_openlock);
2750    very_old_file = (struct filemgr *)hash_scan(&hash,
2751                                         _filemgr_check_stale_link, cur_file);
2752    spin_unlock(&filemgr_openlock);
2753    return very_old_file;
2754}
2755
2756char *filemgr_redirect_old_file(struct filemgr *very_old_file,
2757                                struct filemgr *new_file,
2758                                filemgr_redirect_hdr_func
2759                                redirect_header_func) {
2760    if (!very_old_file || !new_file) {
2761        return NULL;
2762    }
2763
2764    size_t old_header_len, new_header_len;
2765    uint16_t new_filename_len;
2766    char *past_filename;
2767    spin_lock(&very_old_file->lock);
2768
2769    struct filemgr *new_file_of_very_old_file =
2770        filemgr_get_instance(very_old_file->new_filename);
2771
2772    if (very_old_file->header.size == 0 || !new_file_of_very_old_file) {
2773        spin_unlock(&very_old_file->lock);
2774        return NULL;
2775    }
2776
2777    old_header_len = very_old_file->header.size;
2778    new_filename_len = strlen(new_file->filename);
2779    // Find out the new DB header length with new_file's filename
2780    new_header_len = old_header_len
2781                     - strlen(new_file_of_very_old_file->filename)
2782                     + new_filename_len;
2783    // As we are going to change the new_filename field in the DB header of the
2784    // very_old_file, maybe reallocate the DB header buf to accommodate a bigger value
2785    if (new_header_len > old_header_len) {
2786        very_old_file->header.data = realloc(very_old_file->header.data,
2787                                             new_file->blocksize);
2788    }
2789    // Re-direct very_old_file to new_file
2790    assign_new_filename(very_old_file, new_file->filename);
2791    // Note that the old_filename of the new_file is not updated; this is
2792    // so that every file in the history is reachable from the current file.
2793
2794    past_filename = redirect_header_func(very_old_file,
2795                                         (uint8_t *)very_old_file->header.data,
2796                                         new_file); // Update in-memory header
2797    very_old_file->header.size = new_header_len;
2798    ++(very_old_file->header.revnum);
2799
2800    spin_unlock(&very_old_file->lock);
2801    return past_filename;
2802}
2803
2804#include <string>
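// Compare only the directory components of two file paths; returns 0 when
// both files live in the same directory. For example, "/data/a.fdb" vs
// "/data/b.fdb" compares "/data" with "/data" and yields 0, while a path
// with no separator is treated as residing in ".".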
2805int compare_directory_paths(char *file1, char *file2) {
2806    std::string path1(file1);
2807    std::string path2(file2);
2808    std::string dir1;
2809    std::string dir2;
2810    char sep = '/';
2811#ifdef _WIN32
2812    sep = '\\';
2813#endif
2814
2815    size_t i = path1.rfind(sep, path1.length());
2816    if (i != std::string::npos) {
2817        dir1 = path1.substr(0,i);
2818    } else {
2819        dir1 = ".";
2820    }
2821
2822    i = path2.rfind(sep, path2.length());
2823    if (i != std::string::npos) {
2824        dir2 = path2.substr(0,i);
2825    } else {
2826        dir2 = ".";
2827    }
2828    return dir1.compare(dir2);
2829}
2830
2831void filemgr_remove_pending(struct filemgr *old_file,
2832                            struct filemgr *new_file,
2833                            err_log_callback *log_callback)
2834{
2835    if (new_file == NULL) {
2836        return;
2837    }
2838
2839    int diff_dir = compare_directory_paths(old_file->filename, new_file->filename);
2840    spin_lock(&old_file->lock);
2841    if (atomic_get_uint32_t(&old_file->ref_count) > 0) {
2842        // delay removing
2843        assign_new_filename(old_file, new_file->filename);
2844        if (!diff_dir) {
2845            atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
2846        }
2847#if !(defined(WIN32) || defined(_WIN32))
2848        // Only for Posix
2849        int ret;
2850        /* if the old_file and new_file are in different directories
2851           do not remove the old_file
2852        */
2853        if (!diff_dir){
2854            ret = unlink(old_file->filename);
2855            _log_errno_str(old_file->ops, log_callback, (fdb_status)ret,
2856                       "UNLINK", old_file->filename);
2857        }
2858#endif
2859
2860        spin_unlock(&old_file->lock);
2861
2862        // Update new_file's old_filename
2863        spin_lock(&new_file->lock);
2864        assign_old_filename(new_file, old_file->filename);
2865        spin_unlock(&new_file->lock);
2866    } else {
2867        // immediately remove
2868        // LCOV_EXCL_START
2869        spin_unlock(&old_file->lock);
2870
2871        struct filemgr *new_file_of_old_file =
2872            filemgr_get_instance(old_file->new_filename);
2873
2874        if (!lazy_file_deletion_enabled ||
2875            (new_file_of_old_file && new_file_of_old_file->in_place_compaction)) {
2876            if (!diff_dir) {
2877                remove(old_file->filename);
2878            }
2879        }
2880        filemgr_remove_file(old_file, log_callback);
2881        // LCOV_EXCL_STOP
2882    }
2883}
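/*
 * Editorial summary of the policy above (derived from the code, not
 * authoritative): while the old file is still referenced it is redirected to
 * the new file, and it is marked FILE_REMOVED_PENDING and unlinked only when
 * it shares a directory with the new file; a cross-directory old file is left
 * on disk. Once the reference count is zero, the on-disk file is removed
 * right away (same-directory case only) unless lazy deletion is enabled and
 * the compaction was not in-place, and the in-memory instance is always torn
 * down via filemgr_remove_file().
 */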
2884
2885// migrate default kv store stats over to new_file
2886struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
2887                                              struct filemgr *new_file,
2888                                              struct kvs_info *kvs)
2889{
2890    kvs_ops_stat *ret = NULL;
2891    if (new_file == NULL) {
2892        return NULL;
2893    }
2894
2895    spin_lock(&old_file->lock);
2896    new_file->header.op_stat = old_file->header.op_stat;
2897    ret = &new_file->header.op_stat;
2898    spin_unlock(&old_file->lock);
2899    return ret;
2900}
2901
2902// Note: filemgr_openlock should be held before calling this function.
2903fdb_status filemgr_destroy_file(char *filename,
2904                                struct filemgr_config *config,
2905                                struct hash *destroy_file_set)
2906{
2907    struct filemgr *file = NULL;
2908    struct hash to_destroy_files;
2909    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
2910                                                  &to_destroy_files);
2911    struct filemgr query;
2912    struct hash_elem *e = NULL;
2913    fdb_status status = FDB_RESULT_SUCCESS;
2914    char *old_filename = NULL;
2915
2916    if (!destroy_file_set) { // top level or non-recursive call
2917        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
2918    }
2919
2920    query.filename = filename;
2921    // check whether file is already being destroyed in parent recursive call
2922    e = hash_find(destroy_set, &query.e);
2923    if (e) { // Duplicate filename found, nothing to be done in this call
2924        if (!destroy_file_set) { // top level or non-recursive call
2925            hash_free(destroy_set);
2926        }
2927        return status;
2928    } else {
2929        // Remember file. Stack value ok IFF single direction recursion
2930        hash_insert(destroy_set, &query.e);
2931    }
2932
2933    // check global list of known files to see if it is already opened or not
2934    e = hash_find(&hash, &query.e);
2935    if (e) {
2936        // already opened (return existing structure)
2937        file = _get_entry(e, struct filemgr, e);
2938
2939        spin_lock(&file->lock);
2940        if (atomic_get_uint32_t(&file->ref_count)) {
2941            spin_unlock(&file->lock);
2942            status = FDB_RESULT_FILE_IS_BUSY;
2943            if (!destroy_file_set) { // top level or non-recursive call
2944                hash_free(destroy_set);
2945            }
2946            return status;
2947        }
2948        spin_unlock(&file->lock);
2949        if (file->old_filename) {
2950            status = filemgr_destroy_file(file->old_filename, config,
2951                                          destroy_set);
2952            if (status != FDB_RESULT_SUCCESS) {
2953                if (!destroy_file_set) { // top level or non-recursive call
2954                    hash_free(destroy_set);
2955                }
2956                return status;
2957            }
2958        }
2959
2960        // Clean up the file both in memory and on disk
2961        e = hash_remove(&hash, &file->e);
2962        fdb_assert(e, e, 0);
2963        filemgr_free_func(&file->e);
2964        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2965            if (remove(filename)) {
2966                status = FDB_RESULT_FILE_REMOVE_FAIL;
2967            }
2968        }
2969    } else { // file not in memory; read the on-disk file to destroy older versions.
2970        file = (struct filemgr *)alca(struct filemgr, 1);
2971        memset(file, 0x0, sizeof(struct filemgr));
2972        file->filename = filename;
2973        file->ops = get_filemgr_ops();
2974        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2975        file->blocksize = global_config.blocksize;
2976        file->config = (struct filemgr_config *)alca(struct filemgr_config, 1);
2977        *file->config = *config;
2978        fdb_init_encryptor(&file->encryption, &config->encryption_key);
2979        if (file->fd < 0) {
2980            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2981                if (!destroy_file_set) { // top level or non-recursive call
2982                    hash_free(destroy_set);
2983                }
2984                return (fdb_status) file->fd;
2985            }
2986        } else { // file successfully opened, seek to end to get DB header
2987            cs_off_t offset = file->ops->goto_eof(file->fd);
2988            if (offset < 0) {
2989                if (!destroy_file_set) { // top level or non-recursive call
2990                    hash_free(destroy_set);
2991                }
2992                return (fdb_status) offset;
2993            } else { // Need to read DB header which contains old filename
2994                atomic_store_uint64_t(&file->pos, offset);
2995                // initialize CRC mode
2996                if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
2997                    file->crc_mode = CRC32;
2998                } else {
2999                    file->crc_mode = CRC_DEFAULT;
3000                }
3001
3002                status = _filemgr_load_sb(file, NULL);
3003                if (status != FDB_RESULT_SUCCESS) {
3004                    if (!destroy_file_set) { // top level or non-recursive call
3005                        hash_free(destroy_set);
3006                    }
3007                    file->ops->close(file->fd);
3008                    return status;
3009                }
3010
3011                status = _filemgr_read_header(file, NULL);
3012                if (status != FDB_RESULT_SUCCESS) {
3013                    if (!destroy_file_set) { // top level or non-recursive call
3014                        hash_free(destroy_set);
3015                    }
3016                    file->ops->close(file->fd);
3017                    if (sb_ops.release && file->sb) {
3018                        sb_ops.release(file);
3019                    }
3020                    return status;
3021                }
3022                if (file->header.data) {
3023                    size_t new_fnamelen_off =
3024                        ver_get_new_filename_off(file->version);
3025                    size_t old_fnamelen_off = new_fnamelen_off + 2;
3026                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
3027                                                     file->header.data
3028                                                     + new_fnamelen_off);
3029                    uint16_t new_filename_len =
3030                                      _endian_decode(*new_filename_len_ptr);
3031                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
3032                                                     file->header.data
3033                                                     + old_fnamelen_off);
3034                    uint16_t old_filename_len =
3035                                      _endian_decode(*old_filename_len_ptr);
3036                    old_filename = (char *)file->header.data + old_fnamelen_off
3037                                   + 2 + new_filename_len;
3038                    if (old_filename_len) {
3039                        status = filemgr_destroy_file(old_filename, config,
3040                                                      destroy_set);
3041                    }
3042                    free(file->header.data);
3043                }
3044                file->ops->close(file->fd);
3045                if (sb_ops.release && file->sb) {
3046                    sb_ops.release(file);
3047                }
3048                if (status == FDB_RESULT_SUCCESS) {
3049                    if (filemgr_does_file_exist(filename)
3050                                               == FDB_RESULT_SUCCESS) {
3051                        if (remove(filename)) {
3052                            status = FDB_RESULT_FILE_REMOVE_FAIL;
3053                        }
3054                    }
3055                }
3056            }
3057        }
3058    }
3059
3060    if (!destroy_file_set) { // top level or non-recursive call
3061        hash_free(destroy_set);
3062    }
3063
3064    return status;
3065}
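/*
 * Editorial sketch of the recursion above (file names are hypothetical):
 * destroying "db.2", whose header chains back to "db.1" and "db.0", walks
 *
 *   filemgr_destroy_file("db.2", cfg, NULL)       // top level: builds destroy_set
 *     -> filemgr_destroy_file("db.1", cfg, set)   // old_filename from header
 *        -> filemgr_destroy_file("db.0", cfg, set)
 *
 * Each name is inserted into destroy_set before recursing, so a name that is
 * encountered twice returns immediately, and only the top-level call frees
 * the set.
 */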
3066
3067bool filemgr_is_rollback_on(struct filemgr *file)
3068{
3069    bool rv;
3070    spin_lock(&file->lock);
3071    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
3072    spin_unlock(&file->lock);
3073    return rv;
3074}
3075
3076void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
3077{
3078    spin_lock(&file->lock);
3079    if (new_val) {
3080        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
3081    } else {
3082        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
3083    }
3084    spin_unlock(&file->lock);
3085}
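/*
 * Editorial usage sketch (not compiled): the rollback flag, like the other
 * fflags helpers below, follows a simple set / test / clear pattern guarded
 * by file->lock.
 */
#if 0
static void example_rollback_flag(struct filemgr *file) {
    filemgr_set_rollback(file, 1);          // set FILEMGR_ROLLBACK_IN_PROG
    if (filemgr_is_rollback_on(file)) {
        // concurrent writers/compactors would typically back off here
    }
    filemgr_set_rollback(file, 0);          // clear the flag again
}
#endif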
3086
3087void filemgr_set_cancel_compaction(struct filemgr *file, bool cancel)
3088{
3089    spin_lock(&file->lock);
3090    if (cancel) {
3091        file->fflags |= FILEMGR_CANCEL_COMPACTION;
3092    } else {
3093        file->fflags &= ~FILEMGR_CANCEL_COMPACTION;
3094    }
3095    spin_unlock(&file->lock);
3096}
3097
3098bool filemgr_is_compaction_cancellation_requested(struct filemgr *file)
3099{
3100    bool rv;
3101    spin_lock(&file->lock);
3102    rv = (file->fflags & FILEMGR_CANCEL_COMPACTION);
3103    spin_unlock(&file->lock);
3104    return rv;
3105}
3106
3107void filemgr_set_successfully_compacted(struct filemgr *file)
3108{
3109    spin_lock(&file->lock);
3110    file->fflags |= FILEMGR_SUCCESSFULLY_COMPACTED;
3111    spin_unlock(&file->lock);
3112}
3113
3114bool filemgr_is_successfully_compacted(struct filemgr *file)
3115{
3116    bool rv;
3117    spin_lock(&file->lock);
3118    rv = (file->fflags & FILEMGR_SUCCESSFULLY_COMPACTED);
3119    spin_unlock(&file->lock);
3120    return rv;
3121}
3122
3123void filemgr_set_in_place_compaction(struct filemgr *file,
3124                                     bool in_place_compaction) {
3125    spin_lock(&file->lock);
3126    file->in_place_compaction = in_place_compaction;
3127    spin_unlock(&file->lock);
3128}
3129
3130bool filemgr_is_in_place_compaction_set(struct filemgr *file)
3132{
3133    bool ret = false;
3134    spin_lock(&file->lock);
3135    ret = file->in_place_compaction;
3136    spin_unlock(&file->lock);
3137    return ret;
3138}
3139
3140void filemgr_mutex_openlock(struct filemgr_config *config)
3141{
3142    filemgr_init(config);
3143
3144    spin_lock(&filemgr_openlock);
3145}
3146
3147void filemgr_mutex_openunlock(void)
3148{
3149    spin_unlock(&filemgr_openlock);
3150}
3151
3152void filemgr_mutex_lock(struct filemgr *file)
3153{
3154    mutex_lock(&file->writer_lock.mutex);
3155    file->writer_lock.locked = true;
3156}
3157
3158bool filemgr_mutex_trylock(struct filemgr *file) {
3159    if (mutex_trylock(&file->writer_lock.mutex)) {
3160        file->writer_lock.locked = true;
3161        return true;
3162    }
3163    return false;
3164}
3165
3166void filemgr_mutex_unlock(struct filemgr *file)
3167{
3168    if (file->writer_lock.locked) {
3169        file->writer_lock.locked = false;
3170        mutex_unlock(&file->writer_lock.mutex);
3171    }
3172}
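/*
 * Editorial usage sketch (not compiled): writer_lock pairs a mutex with a
 * 'locked' flag, so filemgr_mutex_unlock() is a no-op unless the lock is
 * actually held.
 */
#if 0
static void example_writer_lock(struct filemgr *file) {
    if (!filemgr_mutex_trylock(file)) {
        filemgr_mutex_lock(file);   // fall back to the blocking variant
    }
    // ... perform writes while holding file->writer_lock ...
    filemgr_mutex_unlock(file);
}
#endif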
3173
3174bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
3175{
3176    uint8_t marker[BLK_MARKER_SIZE];
3177    filemgr_magic_t magic;
3178    marker[0] = *(((uint8_t *)head_buffer)
3179                 + blocksize - BLK_MARKER_SIZE);
3180    if (marker[0] != BLK_MARKER_DBHEADER) {
3181        return false;
3182    }
3183
3184    memcpy(&magic, (uint8_t *) head_buffer
3185            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
3186    magic = _endian_decode(magic);
3187
3188    return ver_is_valid_magic(magic);
3189}
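/*
 * Editorial note on the block-tail layout assumed above (inferred from the
 * reads in filemgr_is_commit_header(), not independently verified):
 *
 *   | ... header data ... | magic (sizeof(filemgr_magic_t)) | marker (BLK_MARKER_SIZE) |
 *                                                             ^ BLK_MARKER_DBHEADER
 *
 * The marker occupies the last BLK_MARKER_SIZE byte(s) of the block and the
 * encoded magic sits immediately before it.
 */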
3190
3191bool filemgr_is_cow_supported(struct filemgr *src, struct filemgr *dst)
3192{
3193    src->fs_type = src->ops->get_fs_type(src->fd);
3194    if (src->fs_type < 0) {
3195        return false;
3196    }
3197    dst->fs_type = dst->ops->get_fs_type(dst->fd);
3198    if (dst->fs_type < 0) {
3199        return false;
3200    }
3201    if (src->fs_type == dst->fs_type && src->fs_type != FILEMGR_FS_NO_COW) {
3202        return true;
3203    }
3204    return false;
3205}
3206
3207void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
3208{
3209    atomic_store_uint32_t(&file->throttling_delay, delay_us,
3210                          std::memory_order_relaxed);
3211}
3212
3213uint32_t filemgr_get_throttling_delay(struct filemgr *file)
3214{
3215    return atomic_get_uint32_t(&file->throttling_delay,
3216                               std::memory_order_relaxed);
3217}
3218
3219void filemgr_clear_stale_list(struct filemgr *file)
3220{
3221    if (file->stale_list) {
3222        // if the items in the list are not freed yet, release them first.
3223        struct list_elem *e;
3224        struct stale_data *item;
3225
3226        e = list_begin(file->stale_list);
3227        while (e) {
3228            item = _get_entry(e, struct stale_data, le);
3229            e = list_remove(file->stale_list, e);
3230            free(item);
3231        }
3232        file->stale_list = NULL;
3233    }
3234}
3235
3236void filemgr_clear_stale_info_tree(struct filemgr *file)
3237{
3238    struct avl_node *a;
3239    struct list_elem *e;
3240    struct stale_info_commit *commit;
3241    struct stale_info_entry *entry;
3242
3243    a = avl_first(&file->stale_info_tree);
3244    while (a) {
3245        commit = _get_entry(a, struct stale_info_commit, avl);
3246        a = avl_next(&commit->avl);
3247        avl_remove(&file->stale_info_tree, &commit->avl);
3248
3249        e = list_begin(&commit->doc_list);
3250        while (e) {
3251            entry = _get_entry(e, struct stale_info_entry, le);
3252            e = list_next(&entry->le);
3253            list_remove(&commit->doc_list, &entry->le);
3254            free(entry->ctx);
3255            free(entry);
3256        }
3257        free(commit);
3258    }
3259}
3260
3261void filemgr_clear_mergetree(struct filemgr *file)
3262{
3263    struct avl_node *a;
3264    struct stale_data *entry;
3265
3266    a = avl_first(&file->mergetree);
3267    while (a) {
3268        entry = _get_entry(a, struct stale_data, avl);
3269        a = avl_next(&entry->avl);
3270        avl_remove(&file->mergetree, &entry->avl);
3271        free(entry);
3272    }
3273}
3274
3275void filemgr_add_stale_block(struct filemgr *file,
3276                             bid_t pos,
3277                             size_t len)
3278{
3279    if (file->stale_list) {
3280        struct stale_data *item;
3281        struct list_elem *e;
3282
3283        e = list_end(file->stale_list);
3284
3285        if (e) {
3286            item = _get_entry(e, struct stale_data, le);
3287            if (item->pos + item->len == pos) {
3288                // merge with the previous entry if the regions are consecutive
3289                item->len += len;
3290                return;
3291            }
3292        }
3293
3294        item = (struct stale_data*)calloc(1, sizeof(struct stale_data));
3295        item->pos = pos;
3296        item->len = len;
3297        list_push_back(file->stale_list, &item->le);
3298    }
3299}
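/*
 * Editorial usage sketch (not compiled): adjacent stale regions are coalesced
 * into the tail entry of file->stale_list; offsets below are hypothetical.
 */
#if 0
static void example_add_stale_block(struct filemgr *file) {
    filemgr_add_stale_block(file, 1000, 200);   // new entry {pos=1000, len=200}
    filemgr_add_stale_block(file, 1200, 100);   // adjacent: tail becomes {pos=1000, len=300}
    filemgr_add_stale_block(file, 2000, 50);    // gap: appended as a separate entry
}
#endif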
3300
3301size_t filemgr_actual_stale_length(struct filemgr *file,
3302                                   bid_t offset,
3303                                   size_t length)
3304{
3305    size_t actual_len;
3306    bid_t start_bid, end_bid;
3307
3308    start_bid = offset / file->blocksize;
3309    end_bid = (offset + length) / file->blocksize;
3310
3311    actual_len = length + (end_bid - start_bid);
3312    if ((offset + actual_len) % file->blocksize ==
3313        file->blocksize - 1) {
3314        actual_len += 1;
3315    }
3316
3317    return actual_len;
3318}
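/*
 * Editorial worked example (assumes the usual one-byte block marker, which is
 * what the "+ (end_bid - start_bid)" term accounts for): with a 4096-byte
 * block size, offset = 4000 and length = 200 give start_bid = 0 and
 * end_bid = 1, so actual_len = 200 + 1 = 201 -- one extra byte for the marker
 * at the end of block 0 that the region spans. If the adjusted region would
 * end exactly on the last byte before a marker, i.e.
 * (offset + actual_len) % blocksize == blocksize - 1, one more byte is added
 * so the trailing marker is covered as well.
 */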
3319
3320// If a document is not stored in physically consecutive blocks,
3321// return all of its fragmented regions.
3322struct stale_regions filemgr_actual_stale_regions(struct filemgr *file,
3323                                                  bid_t offset,
3324                                                  size_t length)
3325{
3326    uint8_t *buf = alca(uint8_t, file->blocksize);
3327    size_t remaining = length;
3328    size_t real_blocksize = file->blocksize;
3329    size_t blocksize = real_blocksize;
3330    size_t cur_pos, space_in_block, count;
3331    bid_t cur_bid;
3332    bool non_consecutive = ver_non_consecutive_doc(file->version);
3333    struct docblk_meta blk_meta;
3334    struct stale_regions ret;
3335    struct stale_data *arr = NULL, *cur_region;
3336
3337    if (non_consecutive) {
3338        blocksize -= DOCBLK_META_SIZE;
3339
3340        cur_bid = offset / file->blocksize;
3341        // relative position in the block 'cur_bid'
3342        cur_pos = offset % file->blocksize;
3343
3344        count = 0;