xref: /6.0.3/forestdb/src/filemgr.cc (revision a54fc06a)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27#include <time.h>
28
29#include "filemgr.h"
30#include "filemgr_ops.h"
31#include "hash_functions.h"
32#include "blockcache.h"
33#include "wal.h"
34#include "list.h"
35#include "fdb_internal.h"
36#include "time_utils.h"
37#include "encryption.h"
38#include "version.h"
39
40#include "memleak.h"
41
42#ifdef __DEBUG
43#ifndef __DEBUG_FILEMGR
44    #undef DBG
45    #undef DBGCMD
46    #undef DBGSW
47    #define DBG(...)
48    #define DBGCMD(...)
49    #define DBGSW(n, ...)
50#endif
51#endif
52
53// NBUCKET must be a power of 2
54#define NBUCKET (1024)
55
56// global static variables
57#ifdef SPIN_INITIALIZER
58static spin_t initial_lock = SPIN_INITIALIZER;
59#else
60static volatile unsigned int initial_lock_status = 0;
61static spin_t initial_lock;
62#endif
63
64
65static volatile uint8_t filemgr_initialized = 0;
66extern volatile uint8_t bgflusher_initialized;
67static struct filemgr_config global_config;
68static struct hash hash;
69static spin_t filemgr_openlock;
70
71static const int MAX_STAT_UPDATE_RETRIES = 5;
72
73struct temp_buf_item{
74    void *addr;
75    struct list_elem le;
76};
77static struct list temp_buf;
78static spin_t temp_buf_lock;
79
80static bool lazy_file_deletion_enabled = false;
81static register_file_removal_func register_file_removal = NULL;
82static check_file_removal_func is_file_removed = NULL;
83
84static struct sb_ops sb_ops;
85
86static void spin_init_wrap(void *lock) {
87    spin_init((spin_t*)lock);
88}
89
90static void spin_destroy_wrap(void *lock) {
91    spin_destroy((spin_t*)lock);
92}
93
94static void spin_lock_wrap(void *lock) {
95    spin_lock((spin_t*)lock);
96}
97
98static void spin_unlock_wrap(void *lock) {
99    spin_unlock((spin_t*)lock);
100}
101
102static void mutex_init_wrap(void *lock) {
103    mutex_init((mutex_t*)lock);
104}
105
106static void mutex_destroy_wrap(void *lock) {
107    mutex_destroy((mutex_t*)lock);
108}
109
110static void mutex_lock_wrap(void *lock) {
111    mutex_lock((mutex_t*)lock);
112}
113
114static void mutex_unlock_wrap(void *lock) {
115    mutex_unlock((mutex_t*)lock);
116}
117
118static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
119{
120    struct kvs_node *aa, *bb;
121    aa = _get_entry(a, struct kvs_node, avl_id);
122    bb = _get_entry(b, struct kvs_node, avl_id);
123
124    if (aa->id < bb->id) {
125        return -1;
126    } else if (aa->id > bb->id) {
127        return 1;
128    } else {
129        return 0;
130    }
131}
132
133static int _block_is_overlapped(void *pbid1, void *pis_writer1,
134                                void *pbid2, void *pis_writer2,
135                                void *aux)
136{
137    (void)aux;
138    bid_t bid1, is_writer1, bid2, is_writer2;
139    bid1 = *(bid_t*)pbid1;
140    is_writer1 = *(bid_t*)pis_writer1;
141    bid2 = *(bid_t*)pbid2;
142    is_writer2 = *(bid_t*)pis_writer2;
143
144    if (bid1 != bid2) {
145        // not overlapped
146        return 0;
147    } else {
148        // overlapped
149        if (!is_writer1 && !is_writer2) {
150            // both are readers
151            return 0;
152        } else {
153            return 1;
154        }
155    }
156}
157
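// Render the current local time into 'buffer' as an ISO-8601 style timestamp with
// millisecond precision and, when the platform supports it, the timezone offset,
// e.g. 2017-06-22T10:00:00.123-05:00.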
158void printISOTime(char* buffer, size_t buffer_len) {
159    struct tm* tm_now;
160    time_t rawtime;
161    time(&rawtime);
162    tm_now = localtime(&rawtime);
163
164    // 2017-06-22T10:00:00
165    size_t time_len = strftime(buffer, buffer_len,
166                               "%Y-%m-%dT%H:%M:%S", tm_now);
167
168    // Add milliseconds
169    timeval cur_time;
170    gettimeofday(&cur_time, NULL);
171    size_t milli = cur_time.tv_usec / 1000;
172    // 2017-06-22T10:00:00.123
173    sprintf(buffer + time_len, ".%03d", (int)milli);
174    time_len += 4;
175
176    // timezone offset format: -0500
177    char tz_offset_str[6];
178    size_t offset_len =  strftime(tz_offset_str, 6,
179                                  "%z", tm_now);
180    if (offset_len < 5) {
181        // Time zone info is not supported, skip it.
182        return;
183    }
184
185    // hour
186    strncat(buffer, tz_offset_str, 3);
187    // :
188    strcat(buffer, ":");
189    // min
190    strncat(buffer, tz_offset_str + 3, 2);
191    // final format: 2017-06-22T10:00:00.123-05:00
192}
193
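// Format a log message and hand it to the user-registered callback if one is set;
// otherwise print it to stderr prefixed with an ISO timestamp and an [ERRO]/[INFO] tag.
// Always returns 'status' so callers can simply 'return fdb_log(...)'.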
194fdb_status fdb_log(err_log_callback *log_callback,
195                   fdb_status status,
196                   const char *format, ...)
197{
198    char msg[4096];
199    va_list args;
200    va_start(args, format);
201    vsnprintf(msg, sizeof(msg), format, args); // bound the formatted message to the local buffer
202    va_end(args);
203
204    if (log_callback && log_callback->callback) {
205        log_callback->callback(status, msg, log_callback->ctx_data);
206    } else {
207        char ISO_time_buffer[64];
208        printISOTime(ISO_time_buffer, 64);
209        if (status != FDB_RESULT_SUCCESS) {
210            fprintf(stderr, "%s [ERRO][FDB] %s\n", ISO_time_buffer, msg);
211        } else {
212            fprintf(stderr, "%s [INFO][FDB] %s\n", ISO_time_buffer, msg);
213        }
214    }
215    return status;
216}
217
218static void _log_errno_str(struct filemgr_ops *ops,
219                           err_log_callback *log_callback,
220                           fdb_status io_error,
221                           const char *what,
222                           const char *filename)
223{
224    if (io_error < 0) {
225        char errno_msg[512];
226        ops->get_errno_str(errno_msg, 512);
227        fdb_log(log_callback, io_error,
228                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
229    }
230}
231
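// Hash an open-file entry by its filename into one of NBUCKET buckets
// (NBUCKET is a power of two, so masking with NBUCKET-1 is a cheap modulo).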
232static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
233{
234    struct filemgr *file = _get_entry(e, struct filemgr, e);
235    int len = strlen(file->filename);
236
237    return get_checksum(reinterpret_cast<const uint8_t*>(file->filename), len) &
238                        ((unsigned)(NBUCKET-1));
239}
240
241static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
242{
243    struct filemgr *aa, *bb;
244    aa = _get_entry(a, struct filemgr, e);
245    bb = _get_entry(b, struct filemgr, e);
246    return strcmp(aa->filename, bb->filename);
247}
248
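// One-time global initialization of the file manager: block cache, open-file hash
// table, temp buffer pool, and the global open lock. Guarded by 'initial_lock' plus a
// double check of 'filemgr_initialized' so concurrent callers initialize it only once.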
249void filemgr_init(struct filemgr_config *config)
250{
251    // global initialization
252    // initialized only once at first time
253    if (!filemgr_initialized) {
254#ifndef SPIN_INITIALIZER
255        // Note that only Windows passes through this routine
256        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
257            // atomically initialize spin lock only once
258            spin_init(&initial_lock);
259            initial_lock_status = 2;
260        } else {
261            // the others ... wait until initializing 'initial_lock' is done
262            while (initial_lock_status != 2) {
263                Sleep(1);
264            }
265        }
266#endif
267
268        spin_lock(&initial_lock);
269        if (!filemgr_initialized) {
270            memset(&sb_ops, 0x0, sizeof(sb_ops));
271            global_config = *config;
272
273            if (global_config.ncacheblock > 0)
274                bcache_init(global_config.ncacheblock, global_config.blocksize);
275
276            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
277
278            // initialize temp buffer
279            list_init(&temp_buf);
280            spin_init(&temp_buf_lock);
281
282            // initialize global lock
283            spin_init(&filemgr_openlock);
284
285            // set the initialize flag
286            filemgr_initialized = 1;
287        }
288        spin_unlock(&initial_lock);
289    }
290}
291
292void filemgr_set_lazy_file_deletion(bool enable,
293                                    register_file_removal_func regis_func,
294                                    check_file_removal_func check_func)
295{
296    lazy_file_deletion_enabled = enable;
297    register_file_removal = regis_func;
298    is_file_removed = check_func;
299}
300
301void filemgr_set_sb_operation(struct sb_ops ops)
302{
303    sb_ops = ops;
304}
305
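// Return a sector-aligned, block-sized scratch buffer, reusing one from the free list
// when possible. The temp_buf_item bookkeeping record lives right after the block so
// that _filemgr_release_temp_buf() can locate it from the buffer address alone.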
306static void * _filemgr_get_temp_buf()
307{
308    struct list_elem *e;
309    struct temp_buf_item *item;
310
311    spin_lock(&temp_buf_lock);
312    e = list_pop_front(&temp_buf);
313    if (e) {
314        item = _get_entry(e, struct temp_buf_item, le);
315    } else {
316        void *addr = NULL;
317
318        malloc_align(addr, FDB_SECTOR_SIZE,
319                     global_config.blocksize + sizeof(struct temp_buf_item));
320
321        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
322        item->addr = addr;
323    }
324    spin_unlock(&temp_buf_lock);
325
326    return item->addr;
327}
328
329static void _filemgr_release_temp_buf(void *buf)
330{
331    struct temp_buf_item *item;
332
333    spin_lock(&temp_buf_lock);
334    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
335    list_push_front(&temp_buf, &item->le);
336    spin_unlock(&temp_buf_lock);
337}
338
339static void _filemgr_shutdown_temp_buf()
340{
341    struct list_elem *e;
342    struct temp_buf_item *item;
343    size_t count=0;
344
345    spin_lock(&temp_buf_lock);
346    e = list_begin(&temp_buf);
347    while(e){
348        item = _get_entry(e, struct temp_buf_item, le);
349        e = list_remove(&temp_buf, e);
350        free_align(item->addr);
351        count++;
352    }
353    spin_unlock(&temp_buf_lock);
354}
355
356// Read a block from the file, decrypting if necessary.
357static ssize_t filemgr_read_block(struct filemgr *file, void *buf, bid_t bid) {
358    ssize_t result = file->ops->pread(file->fd, buf, file->blocksize,
359                                      file->blocksize*bid);
360    if (file->encryption.ops && result > 0) {
361        if (result != (ssize_t)file->blocksize)
362            return FDB_RESULT_READ_FAIL;
363        fdb_status status = fdb_decrypt_block(&file->encryption, buf, result, bid);
364        if (status != FDB_RESULT_SUCCESS)
365            return status;
366    }
367    return result;
368}
369
370// Write consecutive block(s) to the file, encrypting if necessary.
371ssize_t filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid) {
372    size_t blocksize = file->blocksize;
373    cs_off_t offset = start_bid * blocksize;
374    size_t nbytes = num_blocks * blocksize;
375    if (file->encryption.ops == NULL) {
376        return file->ops->pwrite(file->fd, buf, nbytes, offset);
377    } else {
378        uint8_t *encrypted_buf;
379        if (nbytes > 4096)
380            encrypted_buf = (uint8_t*)malloc(nbytes);
381        else
382            encrypted_buf = alca(uint8_t, nbytes); // most common case (writing single block)
383        if (!encrypted_buf)
384            return FDB_RESULT_ALLOC_FAIL;
385        fdb_status status = fdb_encrypt_blocks(&file->encryption,
386                                               encrypted_buf,
387                                               buf,
388                                               blocksize,
389                                               num_blocks,
390                                               start_bid);
391        ssize_t result = (status != FDB_RESULT_SUCCESS) ? (ssize_t)status
392                         : file->ops->pwrite(file->fd, encrypted_buf, nbytes, offset);
393        if (nbytes > 4096)
394            free(encrypted_buf); // release the heap buffer only after the write has consumed it
395        return result;
396    }
397}
398
399int filemgr_is_writable(struct filemgr *file, bid_t bid)
400{
401    if (sb_bmp_exists(file->sb) && sb_ops.is_writable) {
402        // block reusing is enabled
403        return sb_ops.is_writable(file, bid);
404    } else {
405        uint64_t pos = bid * file->blocksize;
406        // Note that we don't need to grab file->lock here because
407        // 1) both file->pos and file->last_commit are only incremented.
408        // 2) file->last_commit is updated using the value of file->pos,
409        //    and always equal to or smaller than file->pos.
410        return (pos <  atomic_get_uint64_t(&file->pos) &&
411                pos >= atomic_get_uint64_t(&file->last_commit));
412    }
413}
414
415uint64_t filemgr_get_sb_bmp_revnum(struct filemgr *file)
416{
417    if (file->sb && sb_ops.get_bmp_revnum) {
418        return sb_ops.get_bmp_revnum(file);
419    } else {
420        return 0;
421    }
422}
423
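// Locate and load the most recent valid DB header. Starting from the last block of the
// file (or from the superblock's last header BID, if present), scan backwards until a
// block carrying the DB-header marker, a valid magic value, and a matching CRC is found;
// on success the header is copied into file->header, otherwise the header fields are cleared.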
424static fdb_status _filemgr_read_header(struct filemgr *file,
425                                       err_log_callback *log_callback)
426{
427    uint8_t marker[BLK_MARKER_SIZE];
428    filemgr_magic_t magic = ver_get_latest_magic();
429    filemgr_header_len_t len;
430    uint8_t *buf;
431    uint32_t crc, crc_file;
432    bool check_crc32_open_rule = false;
433    fdb_status status = FDB_RESULT_SUCCESS;
434    bid_t hdr_bid, hdr_bid_local;
435    size_t min_filesize = 0;
436
437    // get temp buffer
438    buf = (uint8_t *) _filemgr_get_temp_buf();
439
440    // If a header is found crc_mode can change to reflect the file
441    if (file->crc_mode == CRC32) {
442        check_crc32_open_rule = true;
443    }
444
445    hdr_bid = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
446    hdr_bid_local = hdr_bid;
447
448    if (file->sb) {
449        // superblock exists .. file size does not start from zero.
450        min_filesize = file->sb->config->num_sb * file->blocksize;
451        bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
452        if (sb_last_hdr_bid != BLK_NOT_FOUND) {
453            hdr_bid = hdr_bid_local = sb_last_hdr_bid;
454        }
455        // if header info does not exist in superblock,
456        // get DB header at the end of the file.
457    }
458
459    if (atomic_get_uint64_t(&file->pos) > min_filesize) {
460        // Crash Recovery Test 1: unaligned last block write
461        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
462        if (remain) {
463            atomic_sub_uint64_t(&file->pos, remain);
464            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
465            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
466                "from a database file '%s'\n";
467            DBG(msg, remain, file->filename);
468            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
469                    msg, remain, file->filename);
470        }
471
472        size_t block_counter = 0;
473        do {
474            if (hdr_bid_local * file->blocksize >= file->pos) {
475                // Handling EOF scenario
476                status = FDB_RESULT_NO_DB_HEADERS;
477                const char *msg = "Unable to read block from file '%s' as EOF "
478                                  "reached\n";
479                fdb_log(log_callback, status, msg, file->filename);
480                break;
481            }
482            ssize_t rv = filemgr_read_block(file, buf, hdr_bid_local);
483            if (rv != (ssize_t)file->blocksize) {
484                status = (fdb_status) rv;
485                const char *msg = "Unable to read a database file '%s' with "
486                                  "blocksize %u\n";
487                DBG(msg, file->filename, file->blocksize);
488                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
489                break;
490            }
491            ++block_counter;
492            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
493                   BLK_MARKER_SIZE);
494
495            if (marker[0] == BLK_MARKER_DBHEADER) {
496                // possible need for byte conversions here
497                memcpy(&magic,
498                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
499                       sizeof(magic));
500                magic = _endian_decode(magic);
501
502                if (ver_is_valid_magic(magic)) {
503
504                    memcpy(&len,
505                           buf + file->blocksize - BLK_MARKER_SIZE -
506                           sizeof(magic) - sizeof(len),
507                           sizeof(len));
508                    len = _endian_decode(len);
509
510                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
511                    crc_file = _endian_decode(crc_file);
512
513                    // crc check and detect the crc_mode
514                    if (detect_and_check_crc(reinterpret_cast<const uint8_t*>(buf),
515                                             len - sizeof(crc),
516                                             crc_file,
517                                             &file->crc_mode)) {
518                        // crc mode is detected and known.
519                        // check the rules of opening legacy CRC
520                        if (check_crc32_open_rule && file->crc_mode != CRC32) {
521                            const char *msg = "Open of CRC32C file"
522                                              " with forced CRC32\n";
523                            status = FDB_RESULT_INVALID_ARGS;
524                            DBG(msg);
525                            fdb_log(log_callback, status, msg);
526                            break;
527                        } else {
528                            status = FDB_RESULT_SUCCESS;
529
530                            file->header.data = (void *)malloc(file->blocksize);
531
532                            memcpy(file->header.data, buf, len);
533                            memcpy(&file->header.revnum, buf + len,
534                                   sizeof(filemgr_header_revnum_t));
535                            memcpy((void *) &file->header.seqnum,
536                                    buf + len + sizeof(filemgr_header_revnum_t),
537                                    sizeof(fdb_seqnum_t));
538
539                            if (ver_superblock_support(magic)) {
540                                // last_writable_bmp_revnum should be same with
541                                // the current bmp_revnum (since it indicates the
542                                // 'bmp_revnum' of 'sb->cur_alloc_bid').
543                                atomic_store_uint64_t(&file->last_writable_bmp_revnum,
544                                                      filemgr_get_sb_bmp_revnum(file));
545                            }
546
547                            file->header.revnum =
548                                _endian_decode(file->header.revnum);
549                            file->header.seqnum =
550                                _endian_decode(file->header.seqnum.load());
551                            file->header.size = len;
552                            atomic_store_uint64_t(&file->header.bid, hdr_bid_local);
553                            memset(&file->header.stat, 0x0, sizeof(file->header.stat));
554
555                            // release temp buffer
556                            _filemgr_release_temp_buf(buf);
557                        }
558
559                        file->version = magic;
560                        return status;
561                    } else {
562                        status = FDB_RESULT_CHECKSUM_ERROR;
563                        uint32_t crc32 = 0, crc32c = 0;
564                        crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
565                                             len - sizeof(crc),
566                                             CRC32);
567#ifdef _CRC32C
568                        crc32c = get_checksum(reinterpret_cast<const uint8_t*>(buf),
569                                              len - sizeof(crc),
570                                              CRC32C);
571#endif
572                        const char *msg = "Crash Detected: CRC on disk %u != (%u | %u) "
573                            "in a database file '%s'\n";
574                        DBG(msg, crc_file, crc32, crc32c, file->filename);
575                        fdb_log(log_callback, status, msg, crc_file, crc32, crc32c,
576                                file->filename);
577                    }
578                } else {
579                    status = FDB_RESULT_FILE_CORRUPTION;
580                    const char *msg = "Crash Detected: Wrong Magic %" _F64
581                                      " in a database file '%s'\n";
582                    fdb_log(log_callback, status, msg, magic, file->filename);
583                }
584            } else {
585                status = FDB_RESULT_NO_DB_HEADERS;
586                if (block_counter == 1) {
587                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
588                                      "in a database file '%s'\n";
589                    DBG(msg, marker[0], file->filename);
590                    fdb_log(log_callback, status, msg, marker[0], file->filename);
591                }
592            }
593
594            atomic_store_uint64_t(&file->last_commit, hdr_bid_local * file->blocksize);
595            // traverse headers in a circular manner
596            if (hdr_bid_local) {
597                hdr_bid_local--;
598            } else {
599                hdr_bid_local = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
600            }
601        } while (hdr_bid_local != hdr_bid);
602    }
603
604    // release temp buffer
605    _filemgr_release_temp_buf(buf);
606
607    file->header.size = 0;
608    file->header.revnum = 0;
609    file->header.seqnum = 0;
610    file->header.data = NULL;
611    atomic_store_uint64_t(&file->header.bid, 0);
612    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
613    file->version = magic;
614    return status;
615}
616
617size_t filemgr_get_ref_count(struct filemgr *file)
618{
619    size_t ret = 0;
620    spin_lock(&file->lock);
621    ret = atomic_get_uint32_t(&file->ref_count);
622    spin_unlock(&file->lock);
623    return ret;
624}
625
626uint64_t filemgr_get_bcache_used_space(void)
627{
628    uint64_t bcache_free_space = 0;
629    if (global_config.ncacheblock) { // If buffer cache is indeed configured
630        bcache_free_space = bcache_get_num_free_blocks();
631        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
632                          * global_config.blocksize;
633    }
634    return bcache_free_space;
635}
636
637struct filemgr_prefetch_args {
638    struct filemgr *file;
639    uint64_t duration;
640    err_log_callback *log_callback;
641    void *aux;
642};
643
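// Background prefetch thread: walk the file backwards from the last commit point in
// FILEMGR_PREFETCH_UNIT chunks, reading each block so it gets populated into the block
// cache. Terminates on an abort signal, timeout, low cache space, or a read failure.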
644static void *_filemgr_prefetch_thread(void *voidargs)
645{
646    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
647    uint8_t *buf = alca(uint8_t, args->file->blocksize);
648    uint64_t cur_pos = 0, i;
649    uint64_t bcache_free_space;
650    bid_t bid;
651    bool terminate = false;
652    struct timeval begin, cur, gap;
653
654    spin_lock(&args->file->lock);
655    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
656    spin_unlock(&args->file->lock);
657    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
658        terminate = true;
659    } else {
660        cur_pos -= FILEMGR_PREFETCH_UNIT;
661    }
662    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
663    gettimeofday(&begin, NULL);
664    while (!terminate) {
665        for (i = cur_pos;
666             i < cur_pos + FILEMGR_PREFETCH_UNIT;
667             i += args->file->blocksize) {
668
669            gettimeofday(&cur, NULL);
670            gap = _utime_gap(begin, cur);
671            bcache_free_space = bcache_get_num_free_blocks();
672            bcache_free_space *= args->file->blocksize;
673
674            if (atomic_get_uint8_t(&args->file->prefetch_status)
675                == FILEMGR_PREFETCH_ABORT ||
676                gap.tv_sec >= (int64_t)args->duration ||
677                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
678                // terminate thread when
679                // 1. got abort signal
680                // 2. time out
681                // 3. not enough free space in block cache
682                terminate = true;
683                break;
684            } else {
685                bid = i / args->file->blocksize;
686                if (filemgr_read(args->file, bid, buf, NULL, true)
687                        != FDB_RESULT_SUCCESS) {
688                    // 4. read failure
689                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
690                            "Prefetch thread failed to read a block with block id %" _F64
691                            " from a database file '%s'", bid, args->file->filename);
692                    terminate = true;
693                    break;
694                }
695            }
696        }
697
698        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
699            cur_pos -= FILEMGR_PREFETCH_UNIT;
700        } else {
701            // remaining space is less than FILEMGR_PREFETCH_UNIT
702            terminate = true;
703        }
704    }
705
706    atomic_cas_uint8_t(&args->file->prefetch_status, FILEMGR_PREFETCH_RUNNING,
707                       FILEMGR_PREFETCH_IDLE);
708    free(args);
709    return NULL;
710}
711
712// prefetch the given DB file
713void filemgr_prefetch(struct filemgr *file,
714                      struct filemgr_config *config,
715                      err_log_callback *log_callback)
716{
717    uint64_t bcache_free_space;
718
719    bcache_free_space = bcache_get_num_free_blocks();
720    bcache_free_space *= file->blocksize;
721
722    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
723    spin_lock(&file->lock);
724    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
725        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
726        // invoke prefetch thread
727        struct filemgr_prefetch_args *args;
728        args = (struct filemgr_prefetch_args *)
729               calloc(1, sizeof(struct filemgr_prefetch_args));
730        args->file = file;
731        args->duration = config->prefetch_duration;
732        args->log_callback = log_callback;
733
734        if (atomic_cas_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE,
735                               FILEMGR_PREFETCH_RUNNING)) {
736            thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
737        }
738    }
739    spin_unlock(&file->lock);
740}
741
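// Check whether 'filename' can be opened read-only; returns FDB_RESULT_SUCCESS if so,
// or the negative error code reported by open() otherwise.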
742fdb_status filemgr_does_file_exist(char *filename) {
743    struct filemgr_ops *ops = get_filemgr_ops();
744    int fd = ops->open(filename, O_RDONLY, 0444);
745    if (fd < 0) {
746        return (fdb_status) fd;
747    }
748    ops->close(fd);
749    return FDB_RESULT_SUCCESS;
750}
751
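// Load the superblock of an existing (non-empty) file, or initialize a fresh superblock
// for a newly created file, using the registered superblock operations (if any).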
752static fdb_status _filemgr_load_sb(struct filemgr *file,
753                                   err_log_callback *log_callback)
754{
755    fdb_status status = FDB_RESULT_SUCCESS;
756    struct sb_config sconfig;
757
758    if (sb_ops.init && sb_ops.get_default_config && sb_ops.read_latest) {
759        sconfig = sb_ops.get_default_config();
760        if (filemgr_get_pos(file)) {
761            // existing file
762            status = sb_ops.read_latest(file, sconfig, log_callback);
763        } else {
764            // new file
765            status = sb_ops.init(file, sconfig, log_callback);
766        }
767    }
768
769    return status;
770}
771
772static filemgr* get_instance_UNLOCKED(const char *filename)
773{
774    if (!filename) {
775        return NULL;
776    }
777
778    struct filemgr query;
779    struct hash_elem *e = NULL;
780    struct filemgr *file = NULL;
781
782    query.filename = (char*)filename;
783    e = hash_find(&hash, &query.e);
784    if (e) {
785        file = _get_entry(e, struct filemgr, e);
786    }
787    return file;
788}
789
790struct filemgr* filemgr_get_instance(const char* filename)
791{
792    spin_lock(&filemgr_openlock);
793    struct filemgr *file = get_instance_UNLOCKED(filename);
794    spin_unlock(&filemgr_openlock);
795
796    return file;
797}
798
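// Open (or create) a database file and return a shared filemgr instance. If the file is
// already registered in the global hash table its reference count is simply bumped;
// otherwise a new structure is set up: the file is opened, the encryptor, WAL, locks,
// superblock, latest DB header, and global transaction are initialized, and the entry
// is inserted into the hash table.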
799filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
800                                 struct filemgr_config *config,
801                                 err_log_callback *log_callback)
802{
803    struct filemgr *file = NULL;
804    struct filemgr query;
805    struct hash_elem *e = NULL;
806    bool create = config->options & FILEMGR_CREATE;
807    int file_flag = 0x0;
808    int fd = -1;
809    fdb_status status;
810    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
811
812    filemgr_init(config);
813
814    if (config->encryption_key.algorithm != FDB_ENCRYPTION_NONE && global_config.ncacheblock <= 0) {
815        // cannot use encryption without a block cache
816        result.rv = FDB_RESULT_CRYPTO_ERROR;
817        return result;
818    }
819
820    // check whether file is already opened or not
821    query.filename = filename;
822    spin_lock(&filemgr_openlock);
823    e = hash_find(&hash, &query.e);
824
825    if (e) {
826        // already opened (return existing structure)
827        file = _get_entry(e, struct filemgr, e);
828
829        if (atomic_incr_uint32_t(&file->ref_count) > 1 &&
830            atomic_get_uint8_t(&file->status) != FILE_CLOSED) {
831            spin_unlock(&filemgr_openlock);
832            result.file = file;
833            result.rv = FDB_RESULT_SUCCESS;
834            return result;
835        }
836
837        spin_lock(&file->lock);
838
839        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
840            file_flag = O_RDWR;
841            if (create) {
842                file_flag |= O_CREAT;
843            }
844            *file->config = *config;
845            file->config->blocksize = global_config.blocksize;
846            file->config->ncacheblock = global_config.ncacheblock;
847            file_flag |= config->flag;
848            file->fd = file->ops->open(file->filename, file_flag, 0666);
849            if (file->fd < 0) {
850                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
851                    // A database file was manually deleted by the user.
852                    // Clean up global hash table, WAL index, and buffer cache.
853                    // Then, retry it with a create option below IFF it is not
854                    // a read-only open attempt
855                    struct hash_elem *ret;
856                    spin_unlock(&file->lock);
857                    ret = hash_remove(&hash, &file->e);
858                    fdb_assert(ret, 0, 0);
859                    filemgr_free_func(&file->e);
860                    if (!create) {
861                        _log_errno_str(ops, log_callback,
862                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
863                        spin_unlock(&filemgr_openlock);
864                        result.rv = FDB_RESULT_NO_SUCH_FILE;
865                        return result;
866                    }
867                } else {
868                    _log_errno_str(file->ops, log_callback,
869                                  (fdb_status)file->fd, "OPEN", filename);
870                    atomic_decr_uint32_t(&file->ref_count);
871                    spin_unlock(&file->lock);
872                    spin_unlock(&filemgr_openlock);
873                    result.rv = file->fd;
874                    return result;
875                }
876            } else { // Reopening the closed file succeeded.
877                atomic_store_uint8_t(&file->status, FILE_NORMAL);
878                if (config->options & FILEMGR_SYNC) {
879                    file->fflags |= FILEMGR_SYNC;
880                } else {
881                    file->fflags &= ~FILEMGR_SYNC;
882                }
883
884                spin_unlock(&file->lock);
885                spin_unlock(&filemgr_openlock);
886
887                result.file = file;
888                result.rv = FDB_RESULT_SUCCESS;
889                return result;
890            }
891        } else { // file is already opened.
892
893            if (config->options & FILEMGR_SYNC) {
894                file->fflags |= FILEMGR_SYNC;
895            } else {
896                file->fflags &= ~FILEMGR_SYNC;
897            }
898
899            spin_unlock(&file->lock);
900            spin_unlock(&filemgr_openlock);
901            result.file = file;
902            result.rv = FDB_RESULT_SUCCESS;
903            return result;
904        }
905    }
906
907    file_flag = O_RDWR;
908    if (create) {
909        file_flag |= O_CREAT;
910    }
911    file_flag |= config->flag;
912    fd = ops->open(filename, file_flag, 0666);
913    if (fd < 0) {
914        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
915        spin_unlock(&filemgr_openlock);
916        result.rv = fd;
917        return result;
918    }
919    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
920    file->filename_len = strlen(filename);
921    file->filename = (char*)malloc(file->filename_len + 1);
922    strcpy(file->filename, filename);
923
924    atomic_init_uint32_t(&file->ref_count, 1);
925    file->stale_list = NULL;
926
927    status = fdb_init_encryptor(&file->encryption, &config->encryption_key);
928    if (status != FDB_RESULT_SUCCESS) {
929        ops->close(fd);
930        free(file);
931        spin_unlock(&filemgr_openlock);
932        result.rv = status;
933        return result;
934    }
935
936    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
937    file->wal->flag = 0;
938
939    file->ops = ops;
940    file->blocksize = global_config.blocksize;
941    atomic_init_uint8_t(&file->status, FILE_NORMAL);
942    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
943    *file->config = *config;
944    file->config->blocksize = global_config.blocksize;
945    file->config->ncacheblock = global_config.ncacheblock;
946    file->old_filename = NULL;
947    file->new_filename = NULL;
948    file->fd = fd;
949
950    cs_off_t offset = file->ops->goto_eof(file->fd);
951    if (offset < 0) {
952        _log_errno_str(file->ops, log_callback, (fdb_status) offset, "SEEK_END", filename);
953        file->ops->close(file->fd);
954        free(file->wal);
955        free(file->filename);
956        free(file->config);
957        free(file);
958        spin_unlock(&filemgr_openlock);
959        result.rv = (fdb_status) offset;
960        return result;
961    }
962    atomic_init_uint64_t(&file->last_commit, offset);
963    atomic_init_uint64_t(&file->last_writable_bmp_revnum, 0);
964    atomic_init_uint64_t(&file->pos, offset);
965    atomic_init_uint32_t(&file->throttling_delay, 0);
966    atomic_init_uint64_t(&file->num_invalidated_blocks, 0);
967    atomic_init_uint8_t(&file->io_in_prog, 0);
968
969#ifdef _LATENCY_STATS
970    for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
971        filemgr_init_latency_stat(&file->lat_stats[i]);
972    }
973#endif // _LATENCY_STATS
974
975    file->bcache = NULL;
976    file->in_place_compaction = false;
977    file->kv_header = NULL;
978    atomic_init_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE);
979
980    atomic_init_uint64_t(&file->header.bid, 0);
981    _init_op_stats(&file->header.op_stat);
982
983    spin_init(&file->lock);
984    file->stale_list = (struct list*)calloc(1, sizeof(struct list));
985    list_init(file->stale_list);
986    avl_init(&file->stale_info_tree, NULL);
987    avl_init(&file->mergetree, NULL);
988    file->stale_info_tree_loaded = false;
989
990    filemgr_dirty_update_init(file);
991
992    spin_init(&file->fhandle_idx_lock);
993    avl_init(&file->fhandle_idx, NULL);
994
995#ifdef __FILEMGR_DATA_PARTIAL_LOCK
996    struct plock_ops pops;
997    struct plock_config pconfig;
998
999    pops.init_user = mutex_init_wrap;
1000    pops.lock_user = mutex_lock_wrap;
1001    pops.unlock_user = mutex_unlock_wrap;
1002    pops.destroy_user = mutex_destroy_wrap;
1003    pops.init_internal = spin_init_wrap;
1004    pops.lock_internal = spin_lock_wrap;
1005    pops.unlock_internal = spin_unlock_wrap;
1006    pops.destroy_internal = spin_destroy_wrap;
1007    pops.is_overlapped = _block_is_overlapped;
1008
1009    memset(&pconfig, 0x0, sizeof(pconfig));
1010    pconfig.ops = &pops;
1011    pconfig.sizeof_lock_internal = sizeof(spin_t);
1012    pconfig.sizeof_lock_user = sizeof(mutex_t);
1013    pconfig.sizeof_range = sizeof(bid_t);
1014    pconfig.aux = NULL;
1015    plock_init(&file->plock, &pconfig);
1016#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1017    int i;
1018    for (i=0;i<DLOCK_MAX;++i) {
1019        mutex_init(&file->data_mutex[i]);
1020    }
1021#else
1022    int i;
1023    for (i=0;i<DLOCK_MAX;++i) {
1024        spin_init(&file->data_spinlock[i]);
1025    }
1026#endif //__FILEMGR_DATA_PARTIAL_LOCK
1027
1028    mutex_init(&file->writer_lock.mutex);
1029    file->writer_lock.locked = false;
1030
1031    // Note: CRC must be initialized before superblock loading
1032    // initialize CRC mode
1033    if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
1034        file->crc_mode = CRC32;
1035    } else {
1036        file->crc_mode = CRC_DEFAULT;
1037    }
1038
1039    do { // repeat until both superblock and DB header are correctly read
1040        // init or load superblock
1041        status = _filemgr_load_sb(file, log_callback);
1042        // we can tolerate SB_READ_FAIL for old version file
1043        if (status != FDB_RESULT_SB_READ_FAIL &&
1044            status != FDB_RESULT_SUCCESS) {
1045            _log_errno_str(file->ops, log_callback, status, "READ", file->filename);
1046            file->ops->close(file->fd);
1047            free(file->stale_list);
1048            free(file->wal);
1049            free(file->filename);
1050            free(file->config);
1051            free(file);
1052            spin_unlock(&filemgr_openlock);
1053            result.rv = status;
1054            return result;
1055        }
1056
1057        // read header
1058        status = _filemgr_read_header(file, log_callback);
1059        if (file->sb && status == FDB_RESULT_NO_DB_HEADERS) {
1060            // this happens when user created & closed a file without any mutations,
1061            // thus there is no other data but superblocks.
1062            // we can tolerate this case.
1063        } else if (status != FDB_RESULT_SUCCESS) {
1064            _log_errno_str(file->ops, log_callback, status, "READ", filename);
1065            file->ops->close(file->fd);
1066            if (file->sb) {
1067                sb_ops.release(file);
1068            }
1069            free(file->stale_list);
1070            free(file->wal);
1071            free(file->filename);
1072            free(file->config);
1073            free(file);
1074            spin_unlock(&filemgr_openlock);
1075            result.rv = status;
1076            return result;
1077        }
1078
1079        if (file->sb &&
1080            file->header.revnum != atomic_get_uint64_t(&file->sb->last_hdr_revnum)) {
1081            // superblock exists but the corresponding DB header does not match.
1082            // read another candidate.
1083            continue;
1084        }
1085
1086        break;
1087    } while (true);
1088
1089    // initialize WAL
1090    if (!wal_is_initialized(file)) {
1091        wal_init(file, FDB_WAL_NBUCKET);
1092    }
1093
1094    // init global transaction for the file
1095    file->global_txn.wrapper = (struct wal_txn_wrapper*)
1096                               malloc(sizeof(struct wal_txn_wrapper));
1097    file->global_txn.wrapper->txn = &file->global_txn;
1098    file->global_txn.handle = NULL;
1099    if (atomic_get_uint64_t(&file->pos)) {
1100        file->global_txn.prev_hdr_bid =
1101            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
1102    } else {
1103        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
1104    }
1105    file->global_txn.prev_revnum = 0;
1106    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
1107    list_init(file->global_txn.items);
1108    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
1109    wal_add_transaction(file, &file->global_txn);
1110
1111    hash_insert(&hash, &file->e);
1112    if (config->prefetch_duration > 0) {
1113        filemgr_prefetch(file, config, log_callback);
1114    }
1115
1116    spin_unlock(&filemgr_openlock);
1117
1118    if (config->options & FILEMGR_SYNC) {
1119        file->fflags |= FILEMGR_SYNC;
1120    } else {
1121        file->fflags &= ~FILEMGR_SYNC;
1122    }
1123
1124    result.file = file;
1125    result.rv = FDB_RESULT_SUCCESS;
1126    fdb_log(log_callback, FDB_RESULT_SUCCESS, "Forestdb opened database file %s",
1127            filename);
1128
1129    return result;
1130}
1131
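// Replace the cached in-memory DB header with the 'len' bytes in 'buf', optionally
// incrementing the header revision number; returns the resulting revision number.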
1132uint64_t filemgr_update_header(struct filemgr *file,
1133                               void *buf,
1134                               size_t len,
1135                               bool inc_revnum)
1136{
1137    uint64_t ret;
1138
1139    spin_lock(&file->lock);
1140
1141    if (file->header.data == NULL) {
1142        file->header.data = (void *)malloc(file->blocksize);
1143    }
1144    memcpy(file->header.data, buf, len);
1145    file->header.size = len;
1146    if (inc_revnum) {
1147        ++(file->header.revnum);
1148    }
1149    ret = file->header.revnum;
1150
1151    spin_unlock(&file->lock);
1152
1153    return ret;
1154}
1155
1156filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
1157{
1158    filemgr_header_revnum_t ret;
1159    spin_lock(&file->lock);
1160    ret = file->header.revnum;
1161    spin_unlock(&file->lock);
1162    return ret;
1163}
1164
1165// 'filemgr_get_seqnum', 'filemgr_set_seqnum',
1166// 'filemgr_get_walflush_revnum', 'filemgr_set_walflush_revnum'
1167// have to be protected by 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
1168fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
1169{
1170    return file->header.seqnum;
1171}
1172
1173void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
1174{
1175    file->header.seqnum = seqnum;
1176}
1177
1178void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
1179                         bid_t *header_bid, fdb_seqnum_t *seqnum,
1180                         filemgr_header_revnum_t *header_revnum)
1181{
1182    spin_lock(&file->lock);
1183
1184    if (file->header.size > 0) {
1185        if (buf == NULL) {
1186            buf = (void*)malloc(file->header.size);
1187        }
1188        memcpy(buf, file->header.data, file->header.size);
1189    }
1190
1191    if (len) {
1192        *len = file->header.size;
1193    }
1194    if (header_bid) {
1195        *header_bid = filemgr_get_header_bid(file);
1196    }
1197    if (seqnum) {
1198        *seqnum = file->header.seqnum;
1199    }
1200    if (header_revnum) {
1201        *header_revnum = file->header.revnum;
1202    }
1203
1204    spin_unlock(&file->lock);
1205
1206    return buf;
1207}
1208
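// Read the DB header stored in block 'bid' into 'buf', along with its revision number,
// the default KVS sequence number, and (for file versions that support them) the delta
// size and superblock bitmap revision number from the block trailer.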
1209fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
1210                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
1211                                filemgr_header_revnum_t *header_revnum,
1212                                uint64_t *deltasize, uint64_t *version,
1213                                uint64_t *sb_bmp_revnum,
1214                                err_log_callback *log_callback)
1215{
1216    uint8_t *_buf;
1217    uint8_t marker[BLK_MARKER_SIZE];
1218    filemgr_header_len_t hdr_len;
1219    uint64_t _deltasize, _bmp_revnum;
1220    filemgr_magic_t magic;
1221    fdb_status status = FDB_RESULT_SUCCESS;
1222
1223    *len = 0;
1224
1225    if (!bid || bid == BLK_NOT_FOUND) {
1226        // No other header available
1227        return FDB_RESULT_SUCCESS;
1228    }
1229
1230    _buf = (uint8_t *)_filemgr_get_temp_buf();
1231
1232    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1233
1234    if (status != FDB_RESULT_SUCCESS) {
1235        fdb_log(log_callback, status,
1236                "Failed to read a database header with block id %" _F64 " in "
1237                "a database file '%s'", bid, file->filename);
1238        _filemgr_release_temp_buf(_buf);
1239        return status;
1240    }
1241    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1242            BLK_MARKER_SIZE);
1243
1244    if (marker[0] != BLK_MARKER_DBHEADER) {
1245        // Comment this warning log as of now because the circular block reuse
1246        // can cause false alarms as a previous stale header block can be reclaimed
1247        // and reused for incoming writes.
1248        /*
1249        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1250                "A block marker of the database header block id %" _F64 " in "
1251                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1252                bid, file->filename);
1253        */
1254        _filemgr_release_temp_buf(_buf);
1255        return FDB_RESULT_READ_FAIL;
1256    }
1257    memcpy(&magic,
1258            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1259            sizeof(magic));
1260    magic = _endian_decode(magic);
1261    if (!ver_is_valid_magic(magic)) {
1262        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1263                "A block magic value of %" _F64 " in the database header block"
1264                "id %" _F64 " in a database file '%s'"
1265                "does NOT match FILEMGR_MAGIC %" _F64 "!",
1266                magic, bid, file->filename, ver_get_latest_magic());
1267        _filemgr_release_temp_buf(_buf);
1268        return FDB_RESULT_FILE_CORRUPTION;
1269    }
1270    memcpy(&hdr_len,
1271            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1272            sizeof(hdr_len), sizeof(hdr_len));
1273    hdr_len = _endian_decode(hdr_len);
1274
1275    memcpy(buf, _buf, hdr_len);
1276    *len = hdr_len;
1277    *version = magic;
1278
1279    if (header_revnum) {
1280        // copy the DB header revnum
1281        filemgr_header_revnum_t _revnum;
1282        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
1283        *header_revnum = _endian_decode(_revnum);
1284    }
1285    if (seqnum) {
1286        // copy default KVS's seqnum
1287        fdb_seqnum_t _seqnum;
1288        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1289               sizeof(_seqnum));
1290        *seqnum = _endian_decode(_seqnum);
1291    }
1292
1293    if (ver_is_atleast_magic_001(magic)) {
1294        if (deltasize) {
1295            memcpy(&_deltasize, _buf + file->blocksize - BLK_MARKER_SIZE
1296                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1297                    - sizeof(_deltasize), sizeof(_deltasize));
1298            *deltasize = _endian_decode(_deltasize);
1299        }
1300    }
1301
1302    if (sb_bmp_revnum && ver_superblock_support(magic)) {
1303        memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1304                - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1305                - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1306        *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1307    }
1308
1309    _filemgr_release_temp_buf(_buf);
1310
1311    return status;
1312}
1313
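// Given the BID of a DB header block, follow the previous-header link stored in its
// trailer and load that previous header; returns its BID, or BLK_NOT_FOUND when no
// earlier header survives (e.g. it has been reclaimed by circular block reuse).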
1314uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
1315                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
1316                                   filemgr_header_revnum_t *revnum,
1317                                   uint64_t *deltasize, uint64_t *version,
1318                                   uint64_t *sb_bmp_revnum,
1319                                   err_log_callback *log_callback)
1320{
1321    uint8_t *_buf;
1322    uint8_t marker[BLK_MARKER_SIZE];
1323    fdb_seqnum_t _seqnum;
1324    filemgr_header_revnum_t _revnum, cur_revnum, prev_revnum;
1325    filemgr_header_len_t hdr_len;
1326    filemgr_magic_t magic;
1327    bid_t _prev_bid, prev_bid;
1328    uint64_t _deltasize, _bmp_revnum;
1329    int found = 0;
1330
1331    *len = 0;
1332
1333    if (!bid || bid == BLK_NOT_FOUND) {
1334        // No other header available
1335        return bid;
1336    }
1337    _buf = (uint8_t *)_filemgr_get_temp_buf();
1338
1339    // Reverse scan the file for a previous DB header
1340    do {
1341        // Get prev_bid from the current header.
1342        // Since the current header is already cached during the previous
1343        // operation, no disk I/O will be triggered.
1344        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
1345                != FDB_RESULT_SUCCESS) {
1346            break;
1347        }
1348
1349        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1350               BLK_MARKER_SIZE);
1351        memcpy(&magic,
1352               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1353               sizeof(magic));
1354        magic = _endian_decode(magic);
1355
1356        if (marker[0] != BLK_MARKER_DBHEADER ||
1357            !ver_is_valid_magic(magic)) {
1358            // not a header block
1359            // this happens when this function is invoked between
1360            // fdb_set() call and fdb_commit() call, so the last block
1361            // in the file is not a header block
1362            bid_t latest_hdr = filemgr_get_header_bid(file);
1363            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
1364                // get the latest header BID
1365                bid = latest_hdr;
1366            } else {
1367                break;
1368            }
1369            cur_revnum = file->header.revnum + 1;
1370        } else {
1371
1372            memcpy(&hdr_len,
1373                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1374                   sizeof(hdr_len), sizeof(hdr_len));
1375            hdr_len = _endian_decode(hdr_len);
1376
1377            memcpy(&_revnum, _buf + hdr_len,
1378                   sizeof(filemgr_header_revnum_t));
1379            cur_revnum = _endian_decode(_revnum);
1380
1381            if (sb_bmp_exists(file->sb)) {
1382                // first check revnum
1383                if (cur_revnum <= sb_ops.get_min_live_revnum(file)) {
1384                    // previous headers already have been reclaimed
1385                    // no more logical prev header
1386                    break;
1387                }
1388            }
1389
1390            memcpy(&_prev_bid,
1391                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1392                       sizeof(hdr_len) - sizeof(_prev_bid),
1393                   sizeof(_prev_bid));
1394            prev_bid = _endian_decode(_prev_bid);
1395            bid = prev_bid;
1396        }
1397
1398        // Read the prev header
1399        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1400        if (fs != FDB_RESULT_SUCCESS) {
1401            fdb_log(log_callback, fs,
1402                    "Failed to read a previous database header with block id %"
1403                    _F64 " in "
1404                    "a database file '%s'", bid, file->filename);
1405            break;
1406        }
1407
1408        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1409               BLK_MARKER_SIZE);
1410        if (marker[0] != BLK_MARKER_DBHEADER) {
1411            if (bid) {
1412                // broken linked list
1413                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1414                        "A block marker of the previous database header block id %"
1415                        _F64 " in "
1416                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1417                        bid, file->filename);
1418            }
1419            break;
1420        }
1421
1422        memcpy(&magic,
1423               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1424               sizeof(magic));
1425        magic = _endian_decode(magic);
1426        if (!ver_is_valid_magic(magic)) {
1427            // broken linked list
1428            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1429                    "A block magic value of %" _F64
1430                    " of the previous database header block id %" _F64 " in "
1431                    "a database file '%s' does NOT match FILEMGR_MAGIC %"
1432                    _F64"!", magic,
1433                    bid, file->filename, ver_get_latest_magic());
1434            break;
1435        }
1436
1437        memcpy(&hdr_len,
1438               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1439               sizeof(hdr_len), sizeof(hdr_len));
1440        hdr_len = _endian_decode(hdr_len);
1441
1442        if (buf) {
1443            memcpy(buf, _buf, hdr_len);
1444        }
1445        memcpy(&_revnum, _buf + hdr_len,
1446               sizeof(filemgr_header_revnum_t));
1447        prev_revnum = _endian_decode(_revnum);
1448        if (prev_revnum >= cur_revnum ||
1449            prev_revnum < sb_ops.get_min_live_revnum(file)) {
1450            // no more prev header, or broken linked list
1451            break;
1452        }
1453
1454        memcpy(&_seqnum,
1455               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1456               sizeof(fdb_seqnum_t));
1457        if (ver_is_atleast_magic_001(magic)) {
1458            if (deltasize) {
1459                memcpy(&_deltasize,
1460                        _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic)
1461                       - sizeof(hdr_len) - sizeof(prev_bid) - sizeof(_deltasize),
1462                        sizeof(_deltasize));
1463                *deltasize = _endian_decode(_deltasize);
1464            }
1465        }
1466
1467        if (sb_bmp_revnum && ver_superblock_support(magic)) {
1468            memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1469                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1470                    - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1471            *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1472        }
1473
1474        if (revnum) {
1475            *revnum = prev_revnum;
1476        }
1477        *seqnum = _endian_decode(_seqnum);
1478        *len = hdr_len;
1479        *version = magic;
1480        found = 1;
1481        break;
1482    } while (false); // no repetition
1483
1484    if (!found) { // no other header found till end of file
1485        *len = 0;
1486        bid = BLK_NOT_FOUND;
1487    }
1488
1489    _filemgr_release_temp_buf(_buf);
1490
1491    return bid;
1492}
1493
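// Release one reference to the file. When the last reference is dropped, dirty cache
// blocks are discarded, the WAL and file descriptor are closed, and a file pending
// removal is either deleted immediately or handed to the lazy file-removal path.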
1494fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1495                         const char *orig_file_name,
1496                         err_log_callback *log_callback)
1497{
1498    int rv = FDB_RESULT_SUCCESS;
1499
1500    if (atomic_decr_uint32_t(&file->ref_count) > 0) {
1501        // File is still accessed by other readers or writers.
1502        return FDB_RESULT_SUCCESS;
1503    }
1504
1505    fdb_log(log_callback, (fdb_status)rv, "Forestdb closed database file %s",
1506            file->filename);
1507
1508    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1509                                  // filemgr_open() because file->lock won't
1510                                  // prevent the race condition.
1511
1512    // remove filemgr structure if no thread refers to the file
1513    spin_lock(&file->lock);
1514    if (atomic_get_uint32_t(&file->ref_count) == 0) {
1515        if (global_config.ncacheblock > 0 &&
1516            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1517            spin_unlock(&file->lock);
1518            // discard all dirty blocks belonged to this file
1519            bcache_remove_dirty_blocks(file);
1520        } else {
1521            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1522            // then its dirty block entries will be cleaned up in either
1523            // filemgr_free_func() or register_file_removal() below.
1524            spin_unlock(&file->lock);
1525        }
1526
1527        if (wal_is_initialized(file)) {
1528            wal_close(file, log_callback);
1529        }
1530#ifdef _LATENCY_STATS_DUMP_TO_FILE
1531        filemgr_dump_latency_stat(file, log_callback);
1532#endif // _LATENCY_STATS_DUMP_TO_FILE
1533
1534        spin_lock(&file->lock);
1535
1536        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1537
1538            bool foreground_deletion = false;
1539            struct filemgr* new_file = get_instance_UNLOCKED(file->new_filename);
1540
1541            // immediately remove the file if lazy (background) file deletion is not enabled
1542            if (!lazy_file_deletion_enabled ||
1543                (new_file && new_file->in_place_compaction)) {
1544                // TODO: to avoid the scenario below, we prevent background
1545                //       deletion of in-place compacted files at this time.
1546                // 1) In-place compacted from 'A' to 'A.1'.
1547                // 2) Request to delete 'A'.
1548                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1549                // 4) User opens DB file using its original name 'A', not 'A.1'.
1550                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1551                // 6) Crash!
1552
1553                // As the file is already unlinked, the file will be removed
1554                // as soon as we close it.
1555                rv = file->ops->close(file->fd);
1556                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1557#if defined(WIN32) || defined(_WIN32)
1558                // For Windows, we need to manually remove the file.
1559                remove(file->filename);
1560#endif
1561                foreground_deletion = true;
1562            }
1563
1564            // we can release the lock because no one will open this file
1565            spin_unlock(&file->lock);
1566            struct hash_elem *ret = hash_remove(&hash, &file->e);
1567            fdb_assert(ret, 0, 0);
1568
1569            spin_unlock(&filemgr_openlock);
1570
1571            if (foreground_deletion) {
1572                filemgr_free_func(&file->e);
1573            } else {
1574                register_file_removal(file, log_callback);
1575            }
1576            return (fdb_status) rv;
1577        } else {
1578
1579            rv = file->ops->close(file->fd);
1580            if (cleanup_cache_onclose) {
1581                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1582                if (file->in_place_compaction && orig_file_name) {
1583                    struct hash_elem *elem = NULL;
1584                    struct filemgr query;
1585                    uint32_t old_file_refcount = 0;
1586
1587                    query.filename = (char *)orig_file_name;
1588                    elem = hash_find(&hash, &query.e);
1589
1590                    if (file->old_filename) {
1591                        struct hash_elem *elem_old = NULL;
1592                        struct filemgr query_old;
1593                        struct filemgr *old_file = NULL;
1594
1595                        // get old file's ref count if exists
1596                        query_old.filename = file->old_filename;
1597                        elem_old = hash_find(&hash, &query_old.e);
1598                        if (elem_old) {
1599                            old_file = _get_entry(elem_old, struct filemgr, e);
1600                            old_file_refcount = atomic_get_uint32_t(&old_file->ref_count);
1601                        }
1602                    }
1603
1604                    // If the old file is opened by another handle, renaming should be
1605                    // postponed. It will be renamed later by the handle referring
1606                    // to the old file.
1607                    if (!elem && old_file_refcount == 0 &&
1608                        is_file_removed(orig_file_name)) {
1609                        // Rename only if the background removal of the original
1610                        // file has already completed; otherwise postpone renaming.
1611                        if (rename(file->filename, orig_file_name) < 0) {
1612                            // Note that the renaming failure is not a critical
1613                            // issue because the last compacted file will be automatically
1614                            // identified and opened in the next fdb_open call.
1615                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1616                                           "CLOSE", file->filename);
1617                        }
1618                    }
1619                }
1620                spin_unlock(&file->lock);
1621                // Clean up global hash table, WAL index, and buffer cache.
1622                struct hash_elem *ret = hash_remove(&hash, &file->e);
1623                fdb_assert(ret, file, 0);
1624
1625                spin_unlock(&filemgr_openlock);
1626
1627                filemgr_free_func(&file->e);
1628                return (fdb_status) rv;
1629            } else {
1630                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1631            }
1632        }
1633    }
1634
1635    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1636
1637    spin_unlock(&file->lock);
1638    spin_unlock(&filemgr_openlock);
1639    return (fdb_status) rv;
1640}
1641
1642void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1643{
1644    // remove all cached blocks
1645    if (global_config.ncacheblock > 0 &&
1646            file->bcache.load(std::memory_order_relaxed)) {
1647        bcache_remove_dirty_blocks(file);
1648        bcache_remove_clean_blocks(file);
1649        bcache_remove_file(file);
1650        file->bcache.store(NULL, std::memory_order_relaxed);
1651    }
1652}
1653
1654void _free_fhandle_idx(struct avl_tree *idx);
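// Releases every resource owned by a filemgr instance: stops any prefetch
// thread, drops cached blocks, frees the KV header, the global transaction,
// the WAL, latency stats, filenames, per-file locks, the superblock, the
// dirty-update index, the fhandle index, and the stale-block structures.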
1655void filemgr_free_func(struct hash_elem *h)
1656{
1657    struct filemgr *file = _get_entry(h, struct filemgr, e);
1658
1659    filemgr_prefetch_status_t prefetch_state =
1660                              atomic_get_uint8_t(&file->prefetch_status);
1661
1662    atomic_store_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_ABORT);
1663    if (prefetch_state == FILEMGR_PREFETCH_RUNNING) {
1664        // prefetch thread was running
1665        void *ret;
1666        // wait for the prefetch thread to finish (it must have been created)
1667        thread_join(file->prefetch_tid, &ret);
1668    }
1669
1670    // remove all cached blocks
1671    if (global_config.ncacheblock > 0 &&
1672            file->bcache.load(std::memory_order_relaxed)) {
1673        bcache_remove_dirty_blocks(file);
1674        bcache_remove_clean_blocks(file);
1675        bcache_remove_file(file);
1676        file->bcache.store(NULL, std::memory_order_relaxed);
1677    }
1678
1679    if (file->kv_header) {
1680        // multi KV instance mode & KV header exists
1681        file->free_kv_header(file);
1682    }
1683
1684    // free global transaction
1685    wal_remove_transaction(file, &file->global_txn);
1686    free(file->global_txn.items);
1687    free(file->global_txn.wrapper);
1688
1689    // destroy WAL
1690    if (wal_is_initialized(file)) {
1691        wal_shutdown(file, NULL);
1692        wal_destroy(file);
1693    }
1694    free(file->wal);
1695
1696#ifdef _LATENCY_STATS
1697    for (int x = 0; x < FDB_LATENCY_NUM_STATS; ++x) {
1698        filemgr_destroy_latency_stat(&file->lat_stats[x]);
1699    }
1700#endif // _LATENCY_STATS
1701
1702    // free filename and header
1703    free(file->filename);
1704    if (file->header.data) free(file->header.data);
1705
1706    // free old/new filename if any
1707    free(file->old_filename);
1708    free(file->new_filename);
1709
1710    // destroy locks
1711    spin_destroy(&file->lock);
1712
1713#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1714    plock_destroy(&file->plock);
1715#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1716    int i;
1717    for (i=0;i<DLOCK_MAX;++i) {
1718        mutex_destroy(&file->data_mutex[i]);
1719    }
1720#else
1721    int i;
1722    for (i=0;i<DLOCK_MAX;++i) {
1723        spin_destroy(&file->data_spinlock[i]);
1724    }
1725#endif //__FILEMGR_DATA_PARTIAL_LOCK
1726
1727    mutex_destroy(&file->writer_lock.mutex);
1728
1729    // free superblock
1730    if (sb_ops.release) {
1731        sb_ops.release(file);
1732    }
1733
1734    // free dirty update index
1735    filemgr_dirty_update_free(file);
1736
1737    // free fhandle idx
1738    _free_fhandle_idx(&file->fhandle_idx);
1739    spin_destroy(&file->fhandle_idx_lock);
1740
1741    // free file structure
1742    struct list *stale_list = filemgr_get_stale_list(file);
1743    filemgr_clear_stale_list(file);
1744    filemgr_clear_stale_info_tree(file);
1745    filemgr_clear_mergetree(file);
1746    free(stale_list);
1747    free(file->config);
1748    free(file);
1749}
1750
1751// permanently remove file from cache (not just close)
1752// LCOV_EXCL_START
1753void filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)
1754{
1755    struct hash_elem *ret;
1756
1757    if (!file || atomic_get_uint32_t(&file->ref_count) > 0) {
1758        return;
1759    }
1760
1761    // remove from global hash table
1762    spin_lock(&filemgr_openlock);
1763    ret = hash_remove(&hash, &file->e);
1764    fdb_assert(ret, ret, NULL);
1765    spin_unlock(&filemgr_openlock);
1766
1767    struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1768
1769    if (!lazy_file_deletion_enabled ||
1770        (new_file && new_file->in_place_compaction)) {
1771        filemgr_free_func(&file->e);
1772    } else {
1773        register_file_removal(file, log_callback);
1774    }
1775}
1776// LCOV_EXCL_STOP
1777
1778static
1779void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1780    struct filemgr *file = _get_entry(h, struct filemgr, e);
1781    void *ret;
1782    spin_lock(&file->lock);
1783    if (atomic_get_uint32_t(&file->ref_count) != 0) {
1784        ret = (void *)file;
1785    } else {
1786        ret = NULL;
1787    }
1788    spin_unlock(&file->lock);
1789    return ret;
1790}
1791
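// Shuts down the filemgr module: if no open file remains, all filemgr
// instances and the block cache are torn down; otherwise
// FDB_RESULT_FILE_IS_BUSY is returned and the module stays initialized.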
1792fdb_status filemgr_shutdown()
1793{
1794    fdb_status ret = FDB_RESULT_SUCCESS;
1795    void *open_file;
1796    if (filemgr_initialized) {
1797
1798#ifndef SPIN_INITIALIZER
1799        // Windows: check if spin lock is already destroyed.
1800        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1801            spin_lock(&initial_lock);
1802        } else {
1803            // filemgr is already shut down
1804            return ret;
1805        }
1806#else
1807        spin_lock(&initial_lock);
1808#endif
1809
1810        if (!filemgr_initialized) {
1811            // filemgr is already shut down
1812#ifdef SPIN_INITIALIZER
1813            spin_unlock(&initial_lock);
1814#endif
1815            return ret;
1816        }
1817
1818        spin_lock(&filemgr_openlock);
1819        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1820        spin_unlock(&filemgr_openlock);
1821        if (!open_file) {
1822            hash_free_active(&hash, filemgr_free_func);
1823            if (global_config.ncacheblock > 0) {
1824                bcache_shutdown();
1825            }
1826            filemgr_initialized = 0;
1827#ifndef SPIN_INITIALIZER
1828            initial_lock_status = 0;
1829#else
1830            initial_lock = SPIN_INITIALIZER;
1831#endif
1832            _filemgr_shutdown_temp_buf();
1833            spin_unlock(&initial_lock);
1834#ifndef SPIN_INITIALIZER
1835            spin_destroy(&initial_lock);
1836#endif
1837        } else {
1838            spin_unlock(&initial_lock);
1839            ret = FDB_RESULT_FILE_IS_BUSY;
1840        }
1841    }
1842    return ret;
1843}
1844
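// Allocates a single block. A reusable (stale) block is taken from the
// superblock allocator when the file is in FILE_NORMAL state; otherwise the
// block is appended at the end of the file. With the block cache disabled,
// the last byte of the new block is written so the file is extended on disk.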
1845bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1846{
1847    spin_lock(&file->lock);
1848    bid_t bid = BLK_NOT_FOUND;
1849
1850    // block reuse is not allowed for a file being compacted,
1851    // to keep the implementation simple.
1852    if (filemgr_get_file_status(file) == FILE_NORMAL &&
1853        file->sb && sb_ops.alloc_block) {
1854        bid = sb_ops.alloc_block(file);
1855    }
1856    if (bid == BLK_NOT_FOUND) {
1857        bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1858        atomic_add_uint64_t(&file->pos, file->blocksize);
1859    }
1860
1861    if (global_config.ncacheblock <= 0) {
1862        // if block cache is turned off, write the allocated block before use
1863        uint8_t _buf = 0x0;
1864        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1865                                       (bid+1) * file->blocksize - 1);
1866        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1867    }
1868    spin_unlock(&file->lock);
1869
1870    return bid;
1871}
1872
1873// Note that neither alloc_multiple nor alloc_multiple_cond is used in
1874// the new version of the DB file (with superblock support).
1875void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1876                            bid_t *end, err_log_callback *log_callback)
1877{
1878    spin_lock(&file->lock);
1879    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1880    *end = *begin + nblock - 1;
1881    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1882
1883    if (global_config.ncacheblock <= 0) {
1884        // if block cache is turned off, write the allocated block before use
1885        uint8_t _buf = 0x0;
1886        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1887                                       atomic_get_uint64_t(&file->pos) - 1);
1888        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1889    }
1890    spin_unlock(&file->lock);
1891}
1892
1893// atomically allocate 'nblock' blocks only when the current file position is the same as 'nextbid'
1894bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1895                                  bid_t *begin, bid_t *end,
1896                                  err_log_callback *log_callback)
1897{
1898    bid_t bid;
1899    spin_lock(&file->lock);
1900    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1901    if (bid == nextbid) {
1902        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1903        *end = *begin + nblock - 1;
1904        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1905
1906        if (global_config.ncacheblock <= 0) {
1907            // if block cache is turned off, write the allocated block before use
1908            uint8_t _buf = 0x0;
1909            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1910                                           atomic_get_uint64_t(&file->pos) - 1);
1911            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1912        }
1913    } else {
1914        *begin = BLK_NOT_FOUND;
1915        *end = BLK_NOT_FOUND;
1916    }
1917    spin_unlock(&file->lock);
1918    return bid;
1919}
1920
1921#ifdef __CRC32
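// For blocks carrying the B+tree node marker, recompute the checksum over
// the block (with the CRC field overwritten by 0xff bytes) and compare it
// against the on-disk value; a mismatch is reported as FDB_RESULT_CHECKSUM_ERROR.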
1922INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1923{
1924    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1925        uint32_t crc_file = 0;
1926        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1927        crc_file = _endian_decode(crc_file);
1928        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1929        if (!perform_integrity_check(reinterpret_cast<const uint8_t*>(buf),
1930                                     file->blocksize,
1931                                     crc_file,
1932                                     file->crc_mode)) {
1933            return FDB_RESULT_CHECKSUM_ERROR;
1934        }
1935    }
1936    return FDB_RESULT_SUCCESS;
1937}
1938#endif
1939
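// Invalidates 'bid' in the block cache (if enabled) and returns true when
// the block lies beyond the last commit point, i.e. it was still uncommitted.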
1940bool filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1941{
1942    bool ret;
1943    if (atomic_get_uint64_t(&file->last_commit) < bid * file->blocksize) {
1944        ret = true; // block invalidated was allocated recently (uncommitted)
1945    } else {
1946        ret = false; // a block from the past is invalidated (committed)
1947    }
1948    if (global_config.ncacheblock > 0) {
1949        bcache_invalidate_block(file, bid);
1950    }
1951    return ret;
1952}
1953
1954bool filemgr_is_fully_resident(struct filemgr *file)
1955{
1956    bool ret = false;
1957    if (global_config.ncacheblock > 0) {
1958        //TODO: A better thing to do is to track number of document blocks
1959        // and only compare those with the cached document block count
1960        double num_cached_blocks = (double)bcache_get_num_blocks(file);
1961        uint64_t num_blocks = atomic_get_uint64_t(&file->pos)
1962                                 / file->blocksize;
1963        double num_fblocks = (double)num_blocks;
1964        if (num_cached_blocks > num_fblocks * FILEMGR_RESIDENT_THRESHOLD) {
1965            ret = true;
1966        }
1967    }
1968    return ret;
1969}
1970
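// Flushes immutable dirty blocks of 'file' from the block cache to disk,
// unless other file I/O is already in progress, and returns the number of
// immutable blocks that still remain in the cache afterwards.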
1971uint64_t filemgr_flush_immutable(struct filemgr *file,
1972                                   err_log_callback *log_callback)
1973{
1974    uint64_t ret = 0;
1975    if (global_config.ncacheblock > 0) {
1976        if (atomic_get_uint8_t(&file->io_in_prog)) {
1977            return 0;
1978        }
1979        ret = bcache_get_num_immutable(file);
1980        if (!ret) {
1981            return ret;
1982        }
1983        fdb_status rv = bcache_flush_immutable(file);
1984        if (rv != FDB_RESULT_SUCCESS) {
1985            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "WRITE",
1986                           file->filename);
1987        }
1988        return bcache_get_num_immutable(file);
1989    }
1990
1991    return ret;
1992}
1993
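// Reads block 'bid' into 'buf', preferring the block cache. On a cache miss
// the block is read from disk (only if 'read_on_cache_miss' allows it), its
// CRC is verified, and the clean copy is inserted into the cache. Writable
// (uncommitted) blocks are read under a per-block data lock since a writer
// may still be updating them; committed blocks are immutable and need no lock.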
1994fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1995                        err_log_callback *log_callback,
1996                        bool read_on_cache_miss)
1997{
1998    size_t lock_no;
1999    ssize_t r;
2000    uint64_t pos = bid * file->blocksize;
2001    fdb_status status = FDB_RESULT_SUCCESS;
2002    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
2003
2004    if (pos >= curr_pos) {
2005        const char *msg = "Read error: read offset %" _F64 " exceeds the file's "
2006                          "current offset %" _F64 " in a database file '%s'\n";
2007        fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, pos, curr_pos,
2008                file->filename);
2009        return FDB_RESULT_READ_FAIL;
2010    }
2011
2012    if (global_config.ncacheblock > 0) {
2013        lock_no = bid % DLOCK_MAX;
2014        (void)lock_no;
2015
2016#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2017        plock_entry_t *plock_entry = NULL;
2018        bid_t is_writer = 0;
2019#endif
2020        bool locked = false;
2021        // Note: we don't need to grab a lock for committed blocks
2022        // because they are immutable, so no writer will interfere and
2023        // overwrite them with dirty data
2024        if (filemgr_is_writable(file, bid)) {
2025#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2026            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2027#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2028            mutex_lock(&file->data_mutex[lock_no]);
2029#else
2030            spin_lock(&file->data_spinlock[lock_no]);
2031#endif //__FILEMGR_DATA_PARTIAL_LOCK
2032            locked = true;
2033        }
2034
2035        r = bcache_read(file, bid, buf);
2036        if (r == 0) {
2037            // cache miss
2038            if (!read_on_cache_miss) {
2039                if (locked) {
2040#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2041                    plock_unlock(&file->plock, plock_entry);
2042#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2043                    mutex_unlock(&file->data_mutex[lock_no]);
2044#else
2045                    spin_unlock(&file->data_spinlock[lock_no]);
2046#endif //__FILEMGR_DATA_PARTIAL_LOCK
2047                }
2048                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2049                    "doesn't exist in the cache and the read_on_cache_miss flag is turned off.\n";
2050                fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2051                        file->filename);
2052                return FDB_RESULT_READ_FAIL;
2053            }
2054
2055            // if normal file, just read a block
2056            r = filemgr_read_block(file, buf, bid);
2057            if (r != (ssize_t)file->blocksize) {
2058                _log_errno_str(file->ops, log_callback,
2059                               (fdb_status) r, "READ", file->filename);
2060                if (locked) {
2061#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2062                    plock_unlock(&file->plock, plock_entry);
2063#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2064                    mutex_unlock(&file->data_mutex[lock_no]);
2065#else
2066                    spin_unlock(&file->data_spinlock[lock_no]);
2067#endif //__FILEMGR_DATA_PARTIAL_LOCK
2068                }
2069                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2070                    "is not read correctly: only %d bytes read.\n";
2071                status = r < 0 ? (fdb_status)r : FDB_RESULT_READ_FAIL;
2072                fdb_log(log_callback, status, msg, bid, file->filename, r);
2073                if (!log_callback || !log_callback->callback) {
2074                    dbg_print_buf(buf, file->blocksize, true, 16);
2075                }
2076                return status;
2077            }
2078#ifdef __CRC32
2079            status = _filemgr_crc32_check(file, buf);
2080            if (status != FDB_RESULT_SUCCESS) {
2081                _log_errno_str(file->ops, log_callback, status, "READ",
2082                        file->filename);
2083                if (locked) {
2084#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2085                    plock_unlock(&file->plock, plock_entry);
2086#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2087                    mutex_unlock(&file->data_mutex[lock_no]);
2088#else
2089                    spin_unlock(&file->data_spinlock[lock_no]);
2090#endif //__FILEMGR_DATA_PARTIAL_LOCK
2091                }
2092                const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2093                    ": marker %x\n";
2094                fdb_log(log_callback, status, msg, bid,
2095                        file->filename, *((uint8_t*)buf + file->blocksize-1));
2096                if (!log_callback || !log_callback->callback) {
2097                    dbg_print_buf(buf, file->blocksize, true, 16);
2098                }
2099                return status;
2100            }
2101#endif
2102            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN, false);
2103            if (r != global_config.blocksize) {
2104                if (locked) {
2105#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2106                    plock_unlock(&file->plock, plock_entry);
2107#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2108                    mutex_unlock(&file->data_mutex[lock_no]);
2109#else
2110                    spin_unlock(&file->data_spinlock[lock_no]);
2111#endif //__FILEMGR_DATA_PARTIAL_LOCK
2112                }
2113                _log_errno_str(file->ops, log_callback,
2114                               (fdb_status) r, "WRITE", file->filename);
2115                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2116                    "is not written in cache correctly: only %d bytes written.\n";
2117                status = r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2118                fdb_log(log_callback, status, msg, bid, file->filename, r);
2119                if (!log_callback || !log_callback->callback) {
2120                    dbg_print_buf(buf, file->blocksize, true, 16);
2121                }
2122                return status;
2123            }
2124        }
2125        if (locked) {
2126#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2127            plock_unlock(&file->plock, plock_entry);
2128#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2129            mutex_unlock(&file->data_mutex[lock_no]);
2130#else
2131            spin_unlock(&file->data_spinlock[lock_no]);
2132#endif //__FILEMGR_DATA_PARTIAL_LOCK
2133        }
2134    } else {
2135        if (!read_on_cache_miss) {
2136            const char *msg = "Read error: BID %" _F64 " in a database file '%s':"
2137                " block cache is not enabled.\n";
2138            fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2139                    file->filename);
2140            return FDB_RESULT_READ_FAIL;
2141        }
2142
2143        r = filemgr_read_block(file, buf, bid);
2144        if (r != (ssize_t)file->blocksize) {
2145            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
2146                           file->filename);
2147            const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2148                "is not read correctly: only %d bytes read (block cache disabled).\n";
2149            status = (r < 0)? (fdb_status)r : FDB_RESULT_READ_FAIL;
2150            fdb_log(log_callback, status, msg, bid, file->filename, r);
2151            if (!log_callback || !log_callback->callback) {
2152                dbg_print_buf(buf, file->blocksize, true, 16);
2153            }
2154            return status;
2155        }
2156
2157#ifdef __CRC32
2158        status = _filemgr_crc32_check(file, buf);
2159        if (status != FDB_RESULT_SUCCESS) {
2160            _log_errno_str(file->ops, log_callback, status, "READ",
2161                           file->filename);
2162            const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2163                ": marker %x (block cache disabled)\n";
2164            fdb_log(log_callback, status, msg, bid,
2165                    file->filename, *((uint8_t*)buf + file->blocksize-1));
2166            if (!log_callback || !log_callback->callback) {
2167                dbg_print_buf(buf, file->blocksize, true, 16);
2168            }
2169            return status;
2170        }
2171#endif
2172    }
2173    return status;
2174}
2175
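// Writes 'len' bytes at byte 'offset' inside block 'bid'. Writes behind the
// last commit point are rejected unless the superblock marks the block as
// reusable (or the target is a superblock itself). With the block cache
// enabled, full blocks go straight to the cache and partial writes are first
// merged with the block's previous contents; with the cache disabled, the
// data is written directly to disk, adding a CRC for full index blocks.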
2176fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
2177                                uint64_t offset, uint64_t len, void *buf,
2178                                bool final_write,
2179                                err_log_callback *log_callback)
2180{
2181    size_t lock_no;
2182    ssize_t r = 0;
2183    uint64_t pos = bid * file->blocksize + offset;
2184    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
2185
2186    if (offset + len > file->blocksize) {
2187        const char *msg = "Write error: trying to write the buffer data "
2188            "(offset: %" _F64 ", len: %" _F64 ") that exceeds the block size "
2189            "%" _F64 " in a database file '%s'\n";
2190        fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, offset, len,
2191                file->blocksize, file->filename);
2192        return FDB_RESULT_WRITE_FAIL;
2193    }
2194
2195    if (sb_bmp_exists(file->sb)) {
2196        // block reusing is enabled
2197        if (!sb_ops.is_writable(file, bid)) {
2198            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2199                              "not identified as a reusable block in "
2200                              "a database file '%s'\n";
2201            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, file->filename);
2202            return FDB_RESULT_WRITE_FAIL;
2203        }
2204    } else if (pos < curr_commit_pos) {
2205        // stale blocks are not reused yet
2206        if (file->sb == NULL ||
2207            (file->sb && pos >= file->sb->config->num_sb * file->blocksize)) {
2208            // (non-sequential update is exceptionally allowed for superblocks)
2209            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2210                              "smaller than the current commit offset %" _F64 " in "
2211                              "a database file '%s'\n";
2212            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, curr_commit_pos,
2213                    file->filename);
2214            return FDB_RESULT_WRITE_FAIL;
2215        }
2216    }
2217
2218    if (global_config.ncacheblock > 0) {
2219        lock_no = bid % DLOCK_MAX;
2220        (void)lock_no;
2221
2222        bool locked = false;
2223#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2224        plock_entry_t *plock_entry;
2225        bid_t is_writer = 1;
2226        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2227#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2228        mutex_lock(&file->data_mutex[lock_no]);
2229#else
2230        spin_lock(&file->data_spinlock[lock_no]);
2231#endif //__FILEMGR_DATA_PARTIAL_LOCK
2232        locked = true;
2233
2234        if (len == file->blocksize) {
2235            // write entire block .. we don't need to read previous block
2236            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY, final_write);
2237            if (r != global_config.blocksize) {
2238                if (locked) {
2239#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2240                    plock_unlock(&file->plock, plock_entry);
2241#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2242                    mutex_unlock(&file->data_mutex[lock_no]);
2243#else
2244                    spin_unlock(&file->data_spinlock[lock_no]);
2245#endif //__FILEMGR_DATA_PARTIAL_LOCK
2246                }
2247                _log_errno_str(file->ops, log_callback,
2248                               (fdb_status) r, "WRITE", file->filename);
2249                return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2250            }
2251        } else {
2252            // partially write buffer cache first
2253            r = bcache_write_partial(file, bid, buf, offset, len, final_write);
2254            if (r == 0) {
2255                // cache miss
2256                // write partially .. we have to read previous contents of the block
2257                int64_t cur_file_pos = file->ops->goto_eof(file->fd);
2258                if (cur_file_pos < 0) {
2259                    _log_errno_str(file->ops, log_callback,
2260                                   (fdb_status) cur_file_pos, "EOF", file->filename);
2261                    return (fdb_status) cur_file_pos;
2262                }
2263                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
2264                void *_buf = _filemgr_get_temp_buf();
2265
2266                if (bid >= cur_file_last_bid) {
2267                    // this is the first time this block is written, so
2268                    // we don't need to read its previous contents from the file.
2269                } else {
2270                    r = filemgr_read_block(file, _buf, bid);
2271                    if (r != (ssize_t)file->blocksize) {
2272                        if (locked) {
2273#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2274                            plock_unlock(&file->plock, plock_entry);
2275#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2276                            mutex_unlock(&file->data_mutex[lock_no]);
2277#else
2278                            spin_unlock(&file->data_spinlock[lock_no]);
2279#endif //__FILEMGR_DATA_PARTIAL_LOCK
2280                        }
2281                        _filemgr_release_temp_buf(_buf);
2282                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
2283                                       "READ", file->filename);
2284                        return r < 0 ? (fdb_status) r : FDB_RESULT_READ_FAIL;
2285                    }
2286                }
2287                memcpy((uint8_t *)_buf + offset, buf, len);
2288                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY, final_write);
2289                if (r != global_config.blocksize) {
2290                    if (locked) {
2291#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2292                        plock_unlock(&file->plock, plock_entry);
2293#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2294                        mutex_unlock(&file->data_mutex[lock_no]);
2295#else
2296                        spin_unlock(&file->data_spinlock[lock_no]);
2297#endif //__FILEMGR_DATA_PARTIAL_LOCK
2298                    }
2299                    _filemgr_release_temp_buf(_buf);
2300                    _log_errno_str(file->ops, log_callback,
2301                            (fdb_status) r, "WRITE", file->filename);
2302                    return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2303                }
2304
2305                _filemgr_release_temp_buf(_buf);
2306            } // cache miss
2307        } // full block or partial block
2308
2309        if (locked) {
2310#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2311            plock_unlock(&file->plock, plock_entry);
2312#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2313            mutex_unlock(&file->data_mutex[lock_no]);
2314#else
2315            spin_unlock(&file->data_spinlock[lock_no]);
2316#endif //__FILEMGR_DATA_PARTIAL_LOCK
2317        }
2318    } else { // block cache disabled
2319
2320#ifdef __CRC32
2321        if (len == file->blocksize) {
2322            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
2323            if (marker == BLK_MARKER_BNODE) {
2324                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
2325                uint32_t crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
2326                                              file->blocksize,
2327                                              file->crc_mode);
2328                crc32 = _endian_encode(crc32);
2329                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
2330            }
2331        }
2332#endif
2333
2334        r = file->ops->pwrite(file->fd, buf, len, pos);
2335        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
2336        if ((uint64_t)r != len) {
2337            return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2338        }
2339    } // block cache check
2340    return FDB_RESULT_SUCCESS;
2341}
2342
2343fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
2344                   err_log_callback *log_callback)
2345{
2346    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
2347                                false, // TODO: track immutability of index blk
2348                                log_callback);
2349}
2350
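// Illustrative allocate/write/commit sequence (a sketch only; 'file', 'buf',
// and 'log_callback' are assumed to be provided by the caller, and 'buf'
// must hold one full block of data):
//
//   bid_t bid = filemgr_alloc(file, log_callback);           // pick a block
//   filemgr_write(file, bid, buf, log_callback);             // fill it
//   fdb_status s = filemgr_commit(file, true, log_callback); // persist + fsync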
2351fdb_status filemgr_commit(struct filemgr *file, bool sync,
2352                          err_log_callback *log_callback)
2353{
2354    // append header at the end of the file
2355    uint64_t bmp_revnum = 0;
2356    if (sb_ops.get_bmp_revnum) {
2357        bmp_revnum = sb_ops.get_bmp_revnum(file);
2358    }
2359    return filemgr_commit_bid(file, BLK_NOT_FOUND, bmp_revnum,
2360                              sync, log_callback);
2361}
2362
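// Flushes the block cache and then writes a DB header block, either appended
// at the end of the file (bid == BLK_NOT_FOUND) or into the reused block
// 'bid'; the previous header block is marked stale, last_commit is advanced,
// and the file is fsynced when 'sync' is set.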
2363fdb_status filemgr_commit_bid(struct filemgr *file, bid_t bid,
2364                              uint64_t bmp_revnum, bool sync,
2365                              err_log_callback *log_callback)
2366{
2367    struct avl_node *a;
2368    struct kvs_node *node;
2369    bid_t prev_bid, _prev_bid;
2370    uint64_t _deltasize, _bmp_revnum;
2371    fdb_seqnum_t _seqnum;
2372    filemgr_header_revnum_t _revnum;
2373    int result = FDB_RESULT_SUCCESS;
2374    bool block_reusing = false;
2375
2376    filemgr_set_io_inprog(file);
2377    if (global_config.ncacheblock > 0) {
2378        result = bcache_flush(file);
2379        if (result != FDB_RESULT_SUCCESS) {
2380            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2381                           "FLUSH", file->filename);
2382            filemgr_clear_io_inprog(file);
2383            return (fdb_status)result;
2384        }
2385    }
2386
2387    spin_lock(&file->lock);
2388
2389    uint16_t header_len = file->header.size;
2390    struct kvs_header *kv_header = file->kv_header;
2391    filemgr_magic_t magic = file->version;
2392
2393    if (file->header.size > 0 && file->header.data) {
2394        void *buf = _filemgr_get_temp_buf();
2395        uint8_t marker[BLK_MARKER_SIZE];
2396
2397        // [header data]:        'header_len' bytes   <---+
2398        // [header revnum]:      8 bytes                  |
2399        // [default KVS seqnum]: 8 bytes                  |
2400        // ...                                            |
2401        // (empty)                                    blocksize
2402        // ...                                            |
2403        // [SB bitmap revnum]:   8 bytes                  |
2404        // [Delta size]:         8 bytes                  |
2405        // [prev header bid]:    8 bytes                  |
2406        // [header length]:      2 bytes                  |
2407        // [magic number]:       8 bytes                  |
2408        // [block marker]:       1 byte               <---+
2409
2410        // header data
2411        memcpy(buf, file->header.data, header_len);
2412        // header rev number
2413        _revnum = _endian_encode(file->header.revnum);
2414        memcpy((uint8_t *)buf + header_len, &_revnum,
2415               sizeof(filemgr_header_revnum_t));
2416        // file's sequence number (default KVS seqnum)
2417        _seqnum = _endian_encode(file->header.seqnum.load());
2418        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
2419               &_seqnum, sizeof(fdb_seqnum_t));
2420
2421        // current header's sb bmp revision number
2422        if (file->sb) {
2423            _bmp_revnum = _endian_encode(bmp_revnum);
2424            memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2425                   - sizeof(header_len) - sizeof(_prev_bid)
2426                   - sizeof(_deltasize) - sizeof(_bmp_revnum)
2427                   - BLK_MARKER_SIZE),
2428                   &_bmp_revnum, sizeof(_bmp_revnum));
2429        }
2430
2431        // delta size since prior commit
2432        _deltasize = _endian_encode(file->header.stat.deltasize //index+data
2433                                  + wal_get_datasize(file)); // wal datasize
2434        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2435               - sizeof(header_len) - sizeof(_prev_bid)*2 - BLK_MARKER_SIZE),
2436               &_deltasize, sizeof(_deltasize));
2437
2438        // Reset in-memory delta size of the header for next commit...
2439        file->header.stat.deltasize = 0; // single kv store header
2440        if (kv_header) { // multi kv store stats
2441            a = avl_first(kv_header->idx_id);
2442            while (a) {
2443                node = _get_entry(a, struct kvs_node, avl_id);
2444                a = avl_next(&node->avl_id);
2445                node->stat.deltasize = 0;
2446            }
2447        }
2448
2449        // prev header bid
2450        prev_bid = atomic_get_uint64_t(&file->header.bid);
2451        _prev_bid = _endian_encode(prev_bid);
2452        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2453               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
2454               &_prev_bid, sizeof(_prev_bid));
2455        // header length
2456        header_len = _endian_encode(header_len);
2457        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2458               - sizeof(header_len) - BLK_MARKER_SIZE),
2459               &header_len, sizeof(header_len));
2460        // magic number
2461        magic = _endian_encode(magic);
2462        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2463               - BLK_MARKER_SIZE), &magic, sizeof(magic));
2464
2465        // marker
2466        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
2467        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
2468               marker, BLK_MARKER_SIZE);
2469
2470        if (bid == BLK_NOT_FOUND) {
2471            // append header at the end of file
2472            bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
2473            block_reusing = false;
2474        } else {
2475            // write header in the allocated (reused) block
2476            block_reusing = true;
2477            // we MUST invalidate the header block 'bid', since previous
2478            // contents of 'bid' may remain in block cache and cause data
2479            // inconsistency if reading header block hits the cache.
2480            bcache_invalidate_block(file, bid);
2481        }
2482
2483        ssize_t rv = filemgr_write_blocks(file, buf, 1, bid);
2484        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
2485                       "WRITE", file->filename);
2486        if (rv != (ssize_t)file->blocksize) {
2487            _filemgr_release_temp_buf(buf);
2488            spin_unlock(&file->lock);
2489            filemgr_clear_io_inprog(file);
2490            return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
2491        }
2492
2493        if (prev_bid) {
2494            // mark prev DB header as stale
2495            filemgr_add_stale_block(file, prev_bid * file->blocksize, file->blocksize);
2496        }
2497
2498        atomic_store_uint64_t(&file->header.bid, bid);
2499        if (!block_reusing) {
2500            atomic_add_uint64_t(&file->pos, file->blocksize);
2501        }
2502
2503        _filemgr_release_temp_buf(buf);
2504    }
2505
2506    if (sb_bmp_exists(file->sb) &&
2507        atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND &&
2508        atomic_get_uint8_t(&file->status) == FILE_NORMAL) {
2509        // block reusing is currently enabled
2510        atomic_store_uint64_t(&file->last_commit,
2511            atomic_get_uint64_t(&file->sb->cur_alloc_bid) * file->blocksize);
2512        // Some more blocks may be allocated after the header block
2513        // (for storing BMP data or system docs for stale info),
2514        // so the block pointed to by 'cur_alloc_bid' may have a
2515        // different BMP revision number. Hence we have to use the
2516        // up-to-date bmp_revnum here.
2517        /*
2518          sb_bmp_is_writable() was reporting a block is not writable.
2519          This is because the test bid >= last_commit fails. This is due to
2520          the last_writable_bmp_revnum getting set in all cases, even when
2521          last_commit was set to the end of file.
2522          This causes a bitmap version mismatch in the check for a writable
2523          block. This fix works around the failure by setting the
2524          last_writable_bmp_revnum to the current bitmap version only when
2525          last_commit is not set to the end of the file.
2526        */
2527        atomic_store_uint64_t(&file->last_writable_bmp_revnum,
2528                              filemgr_get_sb_bmp_revnum(file));
2529    } else {
2530        atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
2531    }
2532    spin_unlock(&file->lock);
2533
2534    if (sync) {
2535        result = file->ops->fsync(file->fd);
2536        _log_errno_str(file->ops, log_callback, (fdb_status)result,
2537                       "FSYNC", file->filename);
2538    }
2539    filemgr_clear_io_inprog(file);
2540    return (fdb_status) result;
2541}
2542
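// Flushes dirty cached blocks to disk and, when 'sync_option' is set and the
// file was opened with FILEMGR_SYNC, issues an fsync on the file descriptor.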
2543fdb_status filemgr_sync(struct filemgr *file, bool sync_option,
2544                        err_log_callback *log_callback)
2545{
2546    fdb_status result = FDB_RESULT_SUCCESS;
2547    if (global_config.ncacheblock > 0) {
2548        result = bcache_flush(file);
2549        if (result != FDB_RESULT_SUCCESS) {
2550            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2551                           "FLUSH", file->filename);
2552            return result;
2553        }
2554    }
2555
2556    if (sync_option && file->fflags & FILEMGR_SYNC) {
2557        int rv = file->ops->fsync(file->fd);
2558        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
2559        return (fdb_status) rv;
2560    }
2561    return result;
2562}
2563
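// Clones 'clone_len' blocks from 'src_file' to 'dst_file' via the filesystem
// copy_file_range operation and advances dst_file's position past the range.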
2564fdb_status filemgr_copy_file_range(struct filemgr *src_file,
2565                                   struct filemgr *dst_file,
2566                                   bid_t src_bid, bid_t dst_bid,
2567                                   bid_t clone_len)
2568{
2569    uint32_t blocksize = src_file->blocksize;
2570    fdb_status fs = (fdb_status)dst_file->ops->copy_file_range(
2571                                            src_file->fs_type,
2572                                            src_file->fd,
2573                                            dst_file->fd,
2574                                            src_bid * blocksize,
2575                                            dst_bid * blocksize,
2576                                            clone_len * blocksize);
2577    if (fs != FDB_RESULT_SUCCESS) {
2578        return fs;
2579    }
2580    atomic_store_uint64_t(&dst_file->pos, (dst_bid + clone_len) * blocksize);
2581    return FDB_RESULT_SUCCESS;
2582}
2583
2584void filemgr_update_file_status(struct filemgr *file, file_status_t status)
2585{
2586    spin_lock(&file->lock);
2587    atomic_store_uint8_t(&file->status, status);
2588    spin_unlock(&file->lock);
2589}
2590
2591static void assign_old_filename(struct filemgr *file, const char *old_filename)
2592{
2593    free(file->old_filename);
2594    if (old_filename) {
2595        file->old_filename = (char*)malloc(strlen(old_filename) + 1);
2596        strcpy(file->old_filename, old_filename);
2597    } else {
2598        file->old_filename = NULL;
2599    }
2600}
2601
2602static void assign_new_filename(struct filemgr *file, const char *new_filename)
2603{
2604    free(file->new_filename);
2605    if (new_filename) {
2606        file->new_filename = (char*)malloc(strlen(new_filename) + 1);
2607        strcpy(file->new_filename, new_filename);
2608    } else {
2609        file->new_filename = NULL;
2610    }
2611}
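// Updates compaction linkage: old_filename is assigned only if it is not
// already set (otherwise false is returned), while new_filename is always
// overwritten with the given value.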
2612
2613bool filemgr_update_file_linkage(struct filemgr *file,
2614                                 const char *old_filename,
2615                                 const char *new_filename)
2616{
2617
2618    bool ret = true;
2619    spin_lock(&file->lock);
2620    if (old_filename) {
2621        if (!file->old_filename) {
2622            assign_old_filename(file, old_filename);
2623        } else {
2624            ret = false;
2625            fdb_assert(atomic_get_uint32_t(&file->ref_count),
2626                       atomic_get_uint32_t(&file->ref_count), 0);
2627        }
2628    }
2629    if (new_filename) {
2630        assign_new_filename(file, new_filename);
2631    }
2632    spin_unlock(&file->lock);
2633    return ret;
2634}
2635
2636void filemgr_set_compaction_state(struct filemgr *old_file,
2637                                  struct filemgr *new_file,
2638                                  file_status_t status)
2639{
2640    if (old_file) {
2641        spin_lock(&old_file->lock);
2642        assign_new_filename(old_file, new_file ? new_file->filename : NULL);
2643        atomic_store_uint8_t(&old_file->status, status);
2644        spin_unlock(&old_file->lock);
2645
2646        if (new_file) {
2647            spin_lock(&new_file->lock);
2648            assign_old_filename(new_file, old_file->filename);
2649            spin_unlock(&new_file->lock);
2650        }
2651    }
2652}
2653
2654bool filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header,
2655                           void (*free_kv_header)(struct filemgr *file))
2656{
2657    bool ret;
2658    spin_lock(&file->lock);
2659
2660    if (!file->kv_header) {
2661        file->kv_header = kv_header;
2662        file->free_kv_header = free_kv_header;
2663        ret = true;
2664    } else {
2665        ret = false;
2666    }
2667
2668    spin_unlock(&file->lock);
2669
2670    return ret;
2671}
2672
2673struct kvs_header *filemgr_get_kv_header(struct filemgr *file)
2674{
2675    struct kvs_header *kv_header = NULL;
2676    spin_lock(&file->lock);
2677    kv_header = file->kv_header;
2678    spin_unlock(&file->lock);
2679    return kv_header;
2680}
2681
2682// Check if there is a file that still points to the old_file that is being
2683// compacted away. If so open the file and return its pointer.
2684static
2685void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
2686    struct filemgr *cur_file = (struct filemgr *)ctx;
2687    struct filemgr *file = _get_entry(h, struct filemgr, e);
2688    spin_lock(&file->lock);
2689    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
2690        !strcmp(file->new_filename, cur_file->filename)) {
2691        // Incrementing reference counter below is the same as filemgr_open()
2692        // We need to do this to ensure that the pointer returned does not
2693        // get freed outside the filemgr_open lock
2694        atomic_incr_uint32_t(&file->ref_count);
2695        spin_unlock(&file->lock);
2696        return (void *)file;
2697    }
2698    spin_unlock(&file->lock);
2699    return (void *)NULL;
2700}
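// Scans all open files for one that is pending removal and whose new_filename
// still points at 'cur_file'; if found, it is returned with its reference
// count already incremented, so the caller is responsible for closing it.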
2701
2702struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
2703    struct filemgr *very_old_file;
2704    spin_lock(&filemgr_openlock);
2705    very_old_file = (struct filemgr *)hash_scan(&hash,
2706                                         _filemgr_check_stale_link, cur_file);
2707    spin_unlock(&filemgr_openlock);
2708    return very_old_file;
2709}
2710
2711char *filemgr_redirect_old_file(struct filemgr *very_old_file,
2712                                struct filemgr *new_file,
2713                                filemgr_redirect_hdr_func
2714                                redirect_header_func) {
2715    if (!very_old_file || !new_file) {
2716        return NULL;
2717    }
2718
2719    size_t old_header_len, new_header_len;
2720    uint16_t new_filename_len;
2721    char *past_filename;
2722    spin_lock(&very_old_file->lock);
2723
2724    struct filemgr *new_file_of_very_old_file =
2725        filemgr_get_instance(very_old_file->new_filename);
2726
2727    if (very_old_file->header.size == 0 || !new_file_of_very_old_file) {
2728        spin_unlock(&very_old_file->lock);
2729        return NULL;
2730    }
2731
2732    old_header_len = very_old_file->header.size;
2733    new_filename_len = strlen(new_file->filename);
2734    // Find out the new DB header length with new_file's filename
2735    new_header_len = old_header_len
2736                     - strlen(new_file_of_very_old_file->filename)
2737                     + new_filename_len;
2738    // As we are going to change the new_filename field in the DB header of the
2739    // very_old_file, reallocate the DB header buf if needed to accommodate a bigger value
2740    if (new_header_len > old_header_len) {
2741        very_old_file->header.data = realloc(very_old_file->header.data,
2742                                             new_file->blocksize);
2743    }
2744    // Re-direct very_old_file to new_file
2745    assign_new_filename(very_old_file, new_file->filename);
2746    // Note that the old_filename of the new_file is not updated; this
2747    // is so that every file in the history is reachable from the current file.
2748
2749    past_filename = redirect_header_func(very_old_file,
2750                                         (uint8_t *)very_old_file->header.data,
2751                                         new_file);//Update in-memory header
2752    very_old_file->header.size = new_header_len;
2753    ++(very_old_file->header.revnum);
2754
2755    spin_unlock(&very_old_file->lock);
2756    return past_filename;
2757}
2758
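// Schedules 'old_file' for removal after compaction: if it is still
// referenced, it is marked FILE_REMOVED_PENDING (and unlinked immediately on
// POSIX); otherwise it is removed right away, either in the foreground or via
// the lazy file-deletion path.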
2759void filemgr_remove_pending(struct filemgr *old_file,
2760                            struct filemgr *new_file,
2761                            err_log_callback *log_callback)
2762{
2763    if (new_file == NULL) {
2764        return;
2765    }
2766
2767    spin_lock(&old_file->lock);
2768    if (atomic_get_uint32_t(&old_file->ref_count) > 0) {
2769        // delay removing
2770        assign_new_filename(old_file, new_file->filename);
2771        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
2772
2773#if !(defined(WIN32) || defined(_WIN32))
2774        // Only for Posix
2775        int ret;
2776        ret = unlink(old_file->filename);
2777        _log_errno_str(old_file->ops, log_callback, (fdb_status)ret,
2778                       "UNLINK", old_file->filename);
2779#endif
2780
2781        spin_unlock(&old_file->lock);
2782
2783        // Update new_file's old_filename
2784        spin_lock(&new_file->lock);
2785        assign_old_filename(new_file, old_file->filename);
2786        spin_unlock(&new_file->lock);
2787    } else {
2788        // immediately remove
2789        // LCOV_EXCL_START
2790        spin_unlock(&old_file->lock);
2791
2792        struct filemgr *new_file_of_old_file =
2793            filemgr_get_instance(old_file->new_filename);
2794
2795        if (!lazy_file_deletion_enabled ||
2796            (new_file_of_old_file && new_file_of_old_file->in_place_compaction)) {
2797            remove(old_file->filename);
2798        }
2799        filemgr_remove_file(old_file, log_callback);
2800        // LCOV_EXCL_STOP
2801    }
2802}
2803
2804// migrate default kv store stats over to new_file
2805struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
2806                                              struct filemgr *new_file,
2807                                              struct kvs_info *kvs)
2808{
2809    kvs_ops_stat *ret = NULL;
2810    if (new_file == NULL) {
2811        return NULL;
2812    }
2813
2814    spin_lock(&old_file->lock);
2815    new_file->header.op_stat = old_file->header.op_stat;
2816    ret = &new_file->header.op_stat;
2817    spin_unlock(&old_file->lock);
2818    return ret;
2819}
2820
2821// Note: filemgr_openlock should be held before calling this function.
2822fdb_status filemgr_destroy_file(char *filename,
2823                                struct filemgr_config *config,
2824                                struct hash *destroy_file_set)
2825{
2826    struct filemgr *file = NULL;
2827    struct hash to_destroy_files;
2828    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
2829                                                  &to_destroy_files);
2830    struct filemgr query;
2831    struct hash_elem *e = NULL;
2832    fdb_status status = FDB_RESULT_SUCCESS;
2833    char *old_filename = NULL;
2834
2835    if (!destroy_file_set) { // top level or non-recursive call
2836        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
2837    }
2838
2839    query.filename = filename;
2840    // check whether file is already being destroyed in parent recursive call
2841    e = hash_find(destroy_set, &query.e);
2842    if (e) { // Duplicate filename found, nothing to be done in this call
2843        if (!destroy_file_set) { // top level or non-recursive call
2844            hash_free(destroy_set);
2845        }
2846        return status;
2847    } else {
2848        // Remember file. Stack value ok IFF single direction recursion
2849        hash_insert(destroy_set, &query.e);
2850    }
2851
2852    // check global list of known files to see if it is already opened or not
2853    e = hash_find(&hash, &query.e);
2854    if (e) {
2855        // already opened (return existing structure)
2856        file = _get_entry(e, struct filemgr, e);
2857
2858        spin_lock(&file->lock);
2859        if (atomic_get_uint32_t(&file->ref_count)) {
2860            spin_unlock(&file->lock);
2861            status = FDB_RESULT_FILE_IS_BUSY;
2862            if (!destroy_file_set) { // top level or non-recursive call
2863                hash_free(destroy_set);
2864            }
2865            return status;
2866        }
2867        spin_unlock(&file->lock);
2868        if (file->old_filename) {
2869            status = filemgr_destroy_file(file->old_filename, config,
2870                                          destroy_set);
2871            if (status != FDB_RESULT_SUCCESS) {
2872                if (!destroy_file_set) { // top level or non-recursive call
2873                    hash_free(destroy_set);
2874                }
2875                return status;
2876            }
2877        }
2878
2879        // Cleanup file from in-memory as well as on-disk
2880        e = hash_remove(&hash, &file->e);
2881        fdb_assert(e, e, 0);
2882        filemgr_free_func(&file->e);
2883        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2884            if (remove(filename)) {
2885                status = FDB_RESULT_FILE_REMOVE_FAIL;
2886            }
2887        }
2888    } else { // file not in memory; read it from disk to destroy older versions
2889        file = (struct filemgr *)alca(struct filemgr, 1);
2890        memset(file, 0x0, sizeof(struct filemgr));
2891        file->filename = filename;
2892        file->ops = get_filemgr_ops();
2893        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2894        file->blocksize = global_config.blocksize;
2895        file->config = (struct filemgr_config *)alca(struct filemgr_config, 1);
2896        *file->config = *config;
2897        fdb_init_encryptor(&file->encryption, &config->encryption_key);
2898        if (file->fd < 0) {
2899            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2900                if (!destroy_file_set) { // top level or non-recursive call
2901                    hash_free(destroy_set);
2902                }
2903                return (fdb_status) file->fd;
2904            }
2905        } else { // file successfully opened, seek to end to get DB header
2906            cs_off_t offset = file->ops->goto_eof(file->fd);
2907            if (offset < 0) {
2908                if (!destroy_file_set) { // top level or non-recursive call
2909                    hash_free(destroy_set);
2910                }
2911                return (fdb_status) offset;
2912            } else { // Need to read DB header which contains old filename
2913                atomic_store_uint64_t(&file->pos, offset);
2914                // initialize CRC mode
2915                if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
2916                    file->crc_mode = CRC32;
2917                } else {
2918                    file->crc_mode = CRC_DEFAULT;
2919                }
2920
2921                status = _filemgr_load_sb(file, NULL);
2922                if (status != FDB_RESULT_SUCCESS) {
2923                    if (!destroy_file_set) { // top level or non-recursive call
2924                        hash_free(destroy_set);
2925                    }
2926                    file->ops->close(file->fd);
2927                    return status;
2928                }
2929
2930                status = _filemgr_read_header(file, NULL);
2931                if (status != FDB_RESULT_SUCCESS) {
2932                    if (!destroy_file_set) { // top level or non-recursive call
2933                        hash_free(destroy_set);
2934                    }
2935                    file->ops->close(file->fd);
2936                    if (sb_ops.release && file->sb) {
2937                        sb_ops.release(file);
2938                    }
2939                    return status;
2940                }
2941                if (file->header.data) {
2942                    size_t new_fnamelen_off =
2943                        ver_get_new_filename_off(file->version);
2944                    size_t old_fnamelen_off = new_fnamelen_off + 2;
2945                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
2946                                                     file->header.data
2947                                                     + new_fnamelen_off);
2948                    uint16_t new_filename_len =
2949                                      _endian_decode(*new_filename_len_ptr);
2950                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
2951                                                     file->header.data
2952                                                     + old_fnamelen_off);
2953                    uint16_t old_filename_len =
2954                                      _endian_decode(*old_filename_len_ptr);
2955                    old_filename = (char *)file->header.data + old_fnamelen_off
2956                                   + 2 + new_filename_len;
2957                    if (old_filename_len) {
2958                        status = filemgr_destroy_file(old_filename, config,
2959                                                      destroy_set);
2960                    }
2961                    free(file->header.data);
2962                }
2963                file->ops->close(file->fd);
2964                if (sb_ops.release && file->sb) {
2965                    sb_ops.release(file);
2966                }
2967                if (status == FDB_RESULT_SUCCESS) {
2968                    if (filemgr_does_file_exist(filename)
2969                                               == FDB_RESULT_SUCCESS) {
2970                        if (remove(filename)) {
2971                            status = FDB_RESULT_FILE_REMOVE_FAIL;
2972                        }
2973                    }
2974                }
2975            }
2976        }
2977    }
2978
2979    if (!destroy_file_set) { // top level or non-recursive call
2980        hash_free(destroy_set);
2981    }
2982
2983    return status;
2984}
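// Usage sketch (assumption, not taken from this file): a top-level caller
// passes NULL for 'destroy_file_set' so a fresh hash set is built to track
// the recursion over old_filename chains, and holds filemgr_openlock for the
// duration, per the note above. 'fname' and 'config' are placeholders:
//
//     filemgr_mutex_openlock(&config);            // acquires filemgr_openlock
//     fdb_status s = filemgr_destroy_file(fname, &config, NULL);
//     filemgr_mutex_openunlock();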
2985
2986bool filemgr_is_rollback_on(struct filemgr *file)
2987{
2988    bool rv;
2989    spin_lock(&file->lock);
2990    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2991    spin_unlock(&file->lock);
2992    return rv;
2993}
2994
2995void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2996{
2997    spin_lock(&file->lock);
2998    if (new_val) {
2999        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
3000    } else {
3001        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
3002    }
3003    spin_unlock(&file->lock);
3004}
3005
3006void filemgr_set_cancel_compaction(struct filemgr *file, bool cancel)
3007{
3008    spin_lock(&file->lock);
3009    if (cancel) {
3010        file->fflags |= FILEMGR_CANCEL_COMPACTION;
3011    } else {
3012        file->fflags &= ~FILEMGR_CANCEL_COMPACTION;
3013    }
3014    spin_unlock(&file->lock);
3015}
3016
3017bool filemgr_is_compaction_cancellation_requested(struct filemgr *file)
3018{
3019    bool rv;
3020    spin_lock(&file->lock);
3021    rv = (file->fflags & FILEMGR_CANCEL_COMPACTION);
3022    spin_unlock(&file->lock);
3023    return rv;
3024}
3025
3026void filemgr_set_successfully_compacted(struct filemgr *file)
3027{
3028    spin_lock(&file->lock);
3029    file->fflags |= FILEMGR_SUCCESSFULLY_COMPACTED;
3030    spin_unlock(&file->lock);
3031}
3032
3033bool filemgr_is_successfully_compacted(struct filemgr *file)
3034{
3035    bool rv;
3036    spin_lock(&file->lock);
3037    rv = (file->fflags & FILEMGR_SUCCESSFULLY_COMPACTED);
3038    spin_unlock(&file->lock);
3039    return rv;
3040}
3041
3042void filemgr_set_in_place_compaction(struct filemgr *file,
3043                                     bool in_place_compaction) {
3044    spin_lock(&file->lock);
3045    file->in_place_compaction = in_place_compaction;
3046    spin_unlock(&file->lock);
3047}
3048
3049bool filemgr_is_in_place_compaction_set(struct filemgr *file)
3050
3051{
3052    bool ret = false;
3053    spin_lock(&file->lock);
3054    ret = file->in_place_compaction;
3055    spin_unlock(&file->lock);
3056    return ret;
3057}
3058
3059void filemgr_mutex_openlock(struct filemgr_config *config)
3060{
3061    filemgr_init(config);
3062
3063    spin_lock(&filemgr_openlock);
3064}
3065
3066void filemgr_mutex_openunlock(void)
3067{
3068    spin_unlock(&filemgr_openlock);
3069}
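// Note (editorial): filemgr_mutex_openlock() first runs filemgr_init() so the
// global state is ready, then takes the global 'filemgr_openlock' spinlock,
// which serializes open/close/destroy operations on the global hash of open
// files; filemgr_mutex_openunlock() releases it. Callers are expected to pair
// the two calls.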
3070
3071void filemgr_mutex_lock(struct filemgr *file)
3072{
3073    mutex_lock(&file->writer_lock.mutex);
3074    file->writer_lock.locked = true;
3075}
3076
3077bool filemgr_mutex_trylock(struct filemgr *file) {
3078    if (mutex_trylock(&file->writer_lock.mutex)) {
3079        file->writer_lock.locked = true;
3080        return true;
3081    }
3082    return false;
3083}
3084
3085void filemgr_mutex_unlock(struct filemgr *file)
3086{
3087    if (file->writer_lock.locked) {
3088        file->writer_lock.locked = false;
3089        mutex_unlock(&file->writer_lock.mutex);
3090    }
3091}
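// Usage sketch (assumption, not taken from this file): the writer lock
// brackets a batch of writes and the commit of a DB header; the 'locked'
// flag lets filemgr_mutex_unlock() tolerate an unlock without a matching
// lock:
//
//     filemgr_mutex_lock(file);
//     // ... append doc blocks, write and commit a header ...
//     filemgr_mutex_unlock(file);
//
//     // opportunistic variant:
//     if (filemgr_mutex_trylock(file)) {
//         // ... optional work while the writer lock is free ...
//         filemgr_mutex_unlock(file);
//     }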
3092
3093bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
3094{
3095    uint8_t marker[BLK_MARKER_SIZE];
3096    filemgr_magic_t magic;
3097    marker[0] = *(((uint8_t *)head_buffer)
3098                 + blocksize - BLK_MARKER_SIZE);
3099    if (marker[0] != BLK_MARKER_DBHEADER) {
3100        return false;
3101    }
3102
3103    memcpy(&magic, (uint8_t *) head_buffer
3104            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
3105    magic = _endian_decode(magic);
3106
3107    return ver_is_valid_magic(magic);
3108}
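// Block layout assumed by filemgr_is_commit_header(), derived from the reads
// above: a commit (DB header) block ends with a BLK_MARKER_SIZE-byte marker
// (only its first byte is inspected here), preceded by the file-format magic:
//
//     [ header data ... | magic (sizeof(filemgr_magic_t)) | marker ]
//     ^ offset 0                                    blocksize - 1 ^
//
// The marker byte must equal BLK_MARKER_DBHEADER and the endian-decoded magic
// must satisfy ver_is_valid_magic() for the block to count as a commit header.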
3109
3110bool filemgr_is_cow_supported(struct filemgr *src, struct filemgr *dst)
3111{
3112    src->fs_type = src->ops->get_fs_type(src->fd);
3113    if (src->fs_type < 0) {
3114        return false;
3115    }
3116    dst->fs_type = dst->ops->get_fs_type(dst->fd);
3117    if (dst->fs_type < 0) {
3118        return false;
3119    }
3120    if (src->fs_type == dst->fs_type && src->fs_type != FILEMGR_FS_NO_COW) {
3121        return true;
3122    }
3123    return false;
3124}
3125
3126void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
3127{
3128    atomic_store_uint32_t(&file->throttling_delay, delay_us,
3129                          std::memory_order_relaxed);
3130}
3131
3132uint32_t filemgr_get_throttling_delay(struct filemgr *file)
3133{
3134    return atomic_get_uint32_t(&file->throttling_delay,
3135                               std::memory_order_relaxed);
3136}
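// Note (editorial): the delay is kept in a 32-bit atomic with relaxed memory
// ordering, so the uint64_t argument of filemgr_set_throttling_delay() is
// effectively truncated to 32 bits of microseconds. A hypothetical writer
// could apply it as follows (sketch, not part of this file; POSIX usleep):
//
//     uint32_t delay = filemgr_get_throttling_delay(file);
//     if (delay) {
//         usleep(delay);  // back off for 'delay' microseconds
//     }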
3137
3138void filemgr_clear_stale_list(struct filemgr *file)
3139{
3140    if (file->stale_list) {
3141        // if the items in the list are not freed yet, release them first.
3142        struct list_elem *e;
3143        struct stale_data *item;
3144
3145        e = list_begin(file->stale_list);
3146        while (e) {
3147            item = _get_entry(e, struct stale_data, le);
3148            e = list_remove(file->stale_list, e);
3149            free(item);
3150        }
3151        file->stale_list = NULL;
3152    }
3153}
3154
3155void filemgr_clear_stale_info_tree(struct filemgr *file)
3156{
3157    struct avl_node *a;
3158    struct list_elem *e;
3159    struct stale_info_commit *commit;
3160    struct stale_info_entry *entry;
3161
3162    a = avl_first(&file->stale_info_tree);
3163    while (a) {
3164        commit = _get_entry(a, struct stale_info_commit, avl);
3165        a = avl_next(&commit->avl);
3166        avl_remove(&file->stale_info_tree, &commit->avl);
3167
3168        e = list_begin(&commit->doc_list);
3169        while (e) {
3170            entry = _get_entry(e, struct stale_info_entry, le);
3171            e = list_next(&entry->le);
3172            list_remove(&commit->doc_list, &entry->le);
3173            free(entry->ctx);
3174            free(entry);
3175        }
3176        free(commit);
3177    }
3178}
3179
3180void filemgr_clear_mergetree(struct filemgr *file)
3181{
3182    struct avl_node *a;
3183    struct stale_data *entry;
3184
3185    a = avl_first(&file->mergetree);
3186    while (a) {
3187        entry = _get_entry(a, struct stale_data, avl);
3188        a = avl_next(&entry->avl);
3189        avl_remove(&file->mergetree, &entry->avl);
3190        free(entry);
3191    }
3192}
3193
3194void filemgr_add_stale_block(struct filemgr *file,
3195                             bid_t pos,
3196                             size_t len)
3197{
3198    if (file->stale_list) {
3199        struct stale_data *item;
3200        struct list_elem *e;
3201
3202        e = list_end(file->stale_list);
3203
3204        if (e) {
3205            item = _get_entry(e, struct stale_data, le);
3206            if (item->pos + item->len == pos) {
3207                // merge if consecutive item
3208                item->len += len;
3209                return;
3210            }
3211        }
3212
3213        item = (struct stale_data*)calloc(1, sizeof(struct stale_data));
3214        item->pos = pos;
3215        item->len = len;
3216        list_push_back(file->stale_list, &item->le);
3217    }
3218}
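// Example (illustrative values): consecutive regions are merged into the last
// list entry, so two adjacent calls collapse into a single item:
//
//     filemgr_add_stale_block(file, 4096, 100); // appends {pos=4096, len=100}
//     filemgr_add_stale_block(file, 4196, 50);  // merged -> {pos=4096, len=150}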
3219
3220size_t filemgr_actual_stale_length(struct filemgr *file,
3221                                   bid_t offset,
3222                                   size_t length)
3223{
3224    size_t actual_len;
3225    bid_t start_bid, end_bid;
3226
3227    start_bid = offset / file->blocksize;
3228    end_bid = (offset + length) / file->blocksize;
3229
3230    actual_len = length + (end_bid - start_bid);
3231    if ((offset + actual_len) % file->blocksize ==
3232        file->blocksize - 1) {
3233        actual_len += 1;
3234    }
3235
3236    return actual_len;
3237}
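// Worked example (illustrative, assuming file->blocksize == 4096): one byte is
// added per block-boundary crossing, accounting for the one-byte block marker,
// plus one more byte when the region ends immediately before such a marker:
//
//     offset = 4000, length = 200  -> crosses one boundary      -> actual_len = 201
//     offset = 0,    length = 4095 -> end falls on marker byte  -> actual_len = 4096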
3238
3239// If a document is not stored in physically consecutive blocks,
3240// return all of its fragmented regions.
3241struct stale_regions filemgr_actual_stale_regions(struct filemgr *file,
3242                                                  bid_t offset,
3243                                                  size_t length)
3244{
3245    uint8_t *buf = alca(uint8_t, file->blocksize);
3246    size_t remaining = length;
3247    size_t real_blocksize = file->blocksize;
3248    size_t blocksize = real_blocksize;
3249    size_t cur_pos, space_in_block, count;
3250    bid_t cur_bid;
3251    bool non_consecutive = ver_non_consecutive_doc(file->version);
3252    struct docblk_meta blk_meta;
3253    struct stale_regions ret;
3254    struct stale_data *arr = NULL, *cur_region;
3255
3256    if (non_consecutive) {
3257        blocksize -= DOCBLK_META_SIZE;
3258
3259        cur_bid = offset / file->blocksize;
3260        // relative position in the block 'cur_bid'
3261        cur_pos = offset % file->blocksize;
3262
3263        count = 0;
3264        while (remaining) {
3265            if (count == 1) {
3266                // more than one stale region; allocate an array to hold them
3267                size_t arr_size = (length / blocksize) + 2;
3268                arr = (struct stale_data *)calloc(arr_size, sizeof(struct stale_data));
3269                arr[0] = ret.region;
3270                ret.regions = arr;
3271            }
3272
3273            if (count == 0) {
3274                // Since n_regions will be 1 in most cases,
3275                // we do not allocate heap memory when 'n_regions==1'.
3276                cur_region = &ret.region;
3277            } else {
3278                cur_region = &ret.regions[count];
3279            }
3280            cur_region->pos = (cur_bid * real_blocksize) + cur_pos;
3281
3282            // subtract data size in the current block
3283            space_in_block = blocksize - cur_pos;
3284            if (space_in_block <= remaining) {
3285                // rest of the current block (including block meta)
3286                cur_region->len = real_blocksize - cur_pos;
3287                remaining -= space_in_block;
3288            } else {
3289                cur_region->len = remaining;
3290                remaining = 0;
3291            }
3292            count++;
3293
3294            if (remaining) {
3295                // get next BID
3296                filemgr_read(file, cur_bid, (void *)buf, NULL, true);
3297                memcpy(&blk_meta, buf + blocksize, sizeof(blk_meta));
3298                cur_bid = _endian_decode(blk_meta.next_bid);
3299                cur_pos = 0; // beginning of the block
3300            }
3301        }
3302        ret.n_regions = count;
3303
3304    } else {
3305        // doc blocks are consecutive; always return a single region.
3306        ret.n_regions = 1;
3307        ret.region.pos = offset;
3308        ret.region.len = filemgr_actual_stale_length(file, offset, length);
3309    }
3310
3311    return ret;
3312}
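// Note (editorial): when n_regions == 1 the result lives in the inline
// 'region' field and nothing is heap-allocated; when n_regions > 1 the caller
// owns the calloc'd 'regions' array and must free() it, as
// filemgr_mark_stale() below does.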
3313
3314void filemgr_mark_stale(struct filemgr *file,
3315                        bid_t offset,
3316                        size_t length)
3317{
3318    if (file->stale_list && length) {
3319        size_t i;
3320        struct stale_regions sr;
3321
3322        sr = filemgr_actual_stale_regions(file, offset, length);
3323
3324        if (sr.n_regions > 1) {
3325            for (i=0; i<sr.n_regions; ++i){
3326                filemgr_add_stale_block(file, sr.regions[i].pos, sr.regions[i].len);
3327            }
3328            free(sr.regions);
3329        } else if (sr.n_regions == 1) {
3330            filemgr_add_stale_block(file, sr.region.pos, sr.region.len);
3331        }
3332    }
3333}
3334
3335INLINE int _fhandle_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
3336{
3337    uint64_t aaa, bbb;
3338    struct filemgr_fhandle_idx_node *aa, *bb;
3339    aa = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3340    bb = _get_entry(b, struct filemgr_fhandle_idx_node, avl);
3341    aaa = (uint64_t)aa->fhandle;
3342    bbb = (uint64_t)bb->fhandle;
3343
3344#ifdef __BIT_CMP
3345    return _CMP_U64(aaa, bbb);
3346#else
3347    if (aaa < bbb) {
3348        return -1;
3349    } else if (aaa > bbb) {
3350        return 1;