xref: /6.6.0/forestdb/src/filemgr.cc (revision e4615599)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36#include "encryption.h"
37#include "version.h"
38
39#include "memleak.h"
40
41#ifdef __DEBUG
42#ifndef __DEBUG_FILEMGR
43    #undef DBG
44    #undef DBGCMD
45    #undef DBGSW
46    #define DBG(...)
47    #define DBGCMD(...)
48    #define DBGSW(n, ...)
49#endif
50#endif
51
52// NBUCKET must be a power of 2 (_file_hash masks hash values with NBUCKET-1)
53#define NBUCKET (1024)
54
55// global static variables
56#ifdef SPIN_INITIALIZER
57static spin_t initial_lock = SPIN_INITIALIZER;
58#else
59static volatile unsigned int initial_lock_status = 0;
60static spin_t initial_lock;
61#endif
62
63
64static volatile uint8_t filemgr_initialized = 0;
65extern volatile uint8_t bgflusher_initialized;
66static struct filemgr_config global_config;
67static struct hash hash;
68static spin_t filemgr_openlock;
69
70static const int MAX_STAT_UPDATE_RETRIES = 5;
71
72struct temp_buf_item{
73    void *addr;
74    struct list_elem le;
75};
76static struct list temp_buf;
77static spin_t temp_buf_lock;
78
79static bool lazy_file_deletion_enabled = false;
80static register_file_removal_func register_file_removal = NULL;
81static check_file_removal_func is_file_removed = NULL;
82
83static struct sb_ops sb_ops;
84
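// Thin adapters that expose spin_t/mutex_t operations through the generic
// void* lock callbacks expected by the partial-range lock (plock) layer;
// see the plock_ops wiring under __FILEMGR_DATA_PARTIAL_LOCK in filemgr_open().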
85static void spin_init_wrap(void *lock) {
86    spin_init((spin_t*)lock);
87}
88
89static void spin_destroy_wrap(void *lock) {
90    spin_destroy((spin_t*)lock);
91}
92
93static void spin_lock_wrap(void *lock) {
94    spin_lock((spin_t*)lock);
95}
96
97static void spin_unlock_wrap(void *lock) {
98    spin_unlock((spin_t*)lock);
99}
100
101static void mutex_init_wrap(void *lock) {
102    mutex_init((mutex_t*)lock);
103}
104
105static void mutex_destroy_wrap(void *lock) {
106    mutex_destroy((mutex_t*)lock);
107}
108
109static void mutex_lock_wrap(void *lock) {
110    mutex_lock((mutex_t*)lock);
111}
112
113static void mutex_unlock_wrap(void *lock) {
114    mutex_unlock((mutex_t*)lock);
115}
116
117static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
118{
119    struct kvs_node *aa, *bb;
120    aa = _get_entry(a, struct kvs_node, avl_id);
121    bb = _get_entry(b, struct kvs_node, avl_id);
122
123    if (aa->id < bb->id) {
124        return -1;
125    } else if (aa->id > bb->id) {
126        return 1;
127    } else {
128        return 0;
129    }
130}
131
132static int _block_is_overlapped(void *pbid1, void *pis_writer1,
133                                void *pbid2, void *pis_writer2,
134                                void *aux)
135{
136    (void)aux;
137    bid_t bid1, is_writer1, bid2, is_writer2;
138    bid1 = *(bid_t*)pbid1;
139    is_writer1 = *(bid_t*)pis_writer1;
140    bid2 = *(bid_t*)pbid2;
141    is_writer2 = *(bid_t*)pis_writer2;
142
143    if (bid1 != bid2) {
144        // not overlapped
145        return 0;
146    } else {
147        // overlapped
148        if (!is_writer1 && !is_writer2) {
149            // both are readers
150            return 0;
151        } else {
152            return 1;
153        }
154    }
155}
156
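// Format a log message and route it to the caller-supplied callback if one is
// registered; otherwise fall back to stderr. The status code is returned
// unchanged, so call sites can simply write 'return fdb_log(cb, status, ...);'.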
157fdb_status fdb_log(err_log_callback *log_callback,
158                   fdb_status status,
159                   const char *format, ...)
160{
161    char msg[4096];
162    va_list args;
163    va_start(args, format);
164    vsnprintf(msg, sizeof(msg), format, args);
165    va_end(args);
166
167    if (log_callback && log_callback->callback) {
168        log_callback->callback(status, msg, log_callback->ctx_data);
169    } else {
170        if (status != FDB_RESULT_SUCCESS) {
171            fprintf(stderr, "[FDB ERR] %s\n", msg);
172        } else {
173            fprintf(stderr, "[FDB INFO] %s\n", msg);
174        }
175    }
176    return status;
177}
178
179static void _log_errno_str(struct filemgr_ops *ops,
180                           err_log_callback *log_callback,
181                           fdb_status io_error,
182                           const char *what,
183                           const char *filename)
184{
185    if (io_error < 0) {
186        char errno_msg[512];
187        ops->get_errno_str(errno_msg, 512);
188        fdb_log(log_callback, io_error,
189                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
190    }
191}
192
193static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
194{
195    struct filemgr *file = _get_entry(e, struct filemgr, e);
196    int len = strlen(file->filename);
197
198    return get_checksum(reinterpret_cast<const uint8_t*>(file->filename), len) &
199                        ((unsigned)(NBUCKET-1));
200}
201
202static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
203{
204    struct filemgr *aa, *bb;
205    aa = _get_entry(a, struct filemgr, e);
206    bb = _get_entry(b, struct filemgr, e);
207    return strcmp(aa->filename, bb->filename);
208}
209
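// One-time global initialization guarded by 'initial_lock'. On platforms
// without SPIN_INITIALIZER (i.e., Windows), the lock itself is lazily created
// via an interlocked compare-and-swap before the double-checked init below.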
210void filemgr_init(struct filemgr_config *config)
211{
212    // global initialization
213    // initialized only once, on the first call
214    if (!filemgr_initialized) {
215#ifndef SPIN_INITIALIZER
216        // Note that only Windows passes through this routine
217        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
218            // atomically initialize spin lock only once
219            spin_init(&initial_lock);
220            initial_lock_status = 2;
221        } else {
222            // the others ... wait until initializing 'initial_lock' is done
223            while (initial_lock_status != 2) {
224                Sleep(1);
225            }
226        }
227#endif
228
229        spin_lock(&initial_lock);
230        if (!filemgr_initialized) {
231            memset(&sb_ops, 0x0, sizeof(sb_ops));
232            global_config = *config;
233
234            if (global_config.ncacheblock > 0)
235                bcache_init(global_config.ncacheblock, global_config.blocksize);
236
237            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);
238
239            // initialize temp buffer
240            list_init(&temp_buf);
241            spin_init(&temp_buf_lock);
242
243            // initialize global lock
244            spin_init(&filemgr_openlock);
245
246            // set the initialize flag
247            filemgr_initialized = 1;
248        }
249        spin_unlock(&initial_lock);
250    }
251}
252
253void filemgr_set_lazy_file_deletion(bool enable,
254                                    register_file_removal_func regis_func,
255                                    check_file_removal_func check_func)
256{
257    lazy_file_deletion_enabled = enable;
258    register_file_removal = regis_func;
259    is_file_removed = check_func;
260}
261
262void filemgr_set_sb_operation(struct sb_ops ops)
263{
264    sb_ops = ops;
265}
266
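// Temp buffer pool: each entry is one 'blocksize' buffer aligned to
// FDB_SECTOR_SIZE, with its temp_buf_item bookkeeping stored immediately after
// the block so the original allocation can be recovered on release/shutdown.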
267static void * _filemgr_get_temp_buf()
268{
269    struct list_elem *e;
270    struct temp_buf_item *item;
271
272    spin_lock(&temp_buf_lock);
273    e = list_pop_front(&temp_buf);
274    if (e) {
275        item = _get_entry(e, struct temp_buf_item, le);
276    } else {
277        void *addr = NULL;
278
279        malloc_align(addr, FDB_SECTOR_SIZE,
280                     global_config.blocksize + sizeof(struct temp_buf_item));
281
282        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
283        item->addr = addr;
284    }
285    spin_unlock(&temp_buf_lock);
286
287    return item->addr;
288}
289
290static void _filemgr_release_temp_buf(void *buf)
291{
292    struct temp_buf_item *item;
293
294    spin_lock(&temp_buf_lock);
295    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
296    list_push_front(&temp_buf, &item->le);
297    spin_unlock(&temp_buf_lock);
298}
299
300static void _filemgr_shutdown_temp_buf()
301{
302    struct list_elem *e;
303    struct temp_buf_item *item;
304    size_t count=0;
305
306    spin_lock(&temp_buf_lock);
307    e = list_begin(&temp_buf);
308    while(e){
309        item = _get_entry(e, struct temp_buf_item, le);
310        e = list_remove(&temp_buf, e);
311        free_align(item->addr);
312        count++;
313    }
314    spin_unlock(&temp_buf_lock);
315}
316
317// Read a block from the file, decrypting if necessary.
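// On success the full block size is returned; anything else (a short read or a
// negative fdb_status error) is treated as a failure by callers, which compare
// the result against file->blocksize before trusting the buffer contents.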
318static ssize_t filemgr_read_block(struct filemgr *file, void *buf, bid_t bid) {
319    ssize_t result = file->ops->pread(file->fd, buf, file->blocksize,
320                                      file->blocksize*bid);
321    if (file->encryption.ops && result > 0) {
322        if (result != (ssize_t)file->blocksize)
323            return FDB_RESULT_READ_FAIL;
324        fdb_status status = fdb_decrypt_block(&file->encryption, buf, result, bid);
325        if (status != FDB_RESULT_SUCCESS)
326            return status;
327    }
328    return result;
329}
330
331// Write consecutive block(s) to the file, encrypting if necessary.
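// The caller's buffer is never modified: when encryption is enabled the blocks
// are encrypted into a scratch buffer (stack-allocated via alca() for writes of
// up to 4 KB, the common single-block case; heap-allocated otherwise) and that
// scratch buffer is what gets written to disk.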
332ssize_t filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid) {
333    size_t blocksize = file->blocksize;
334    cs_off_t offset = start_bid * blocksize;
335    size_t nbytes = num_blocks * blocksize;
336    if (file->encryption.ops == NULL) {
337        return file->ops->pwrite(file->fd, buf, nbytes, offset);
338    } else {
339        uint8_t *encrypted_buf;
340        if (nbytes > 4096)
341            encrypted_buf = (uint8_t*)malloc(nbytes);
342        else
343            encrypted_buf = alca(uint8_t, nbytes); // most common case (writing single block)
344        if (!encrypted_buf)
345            return FDB_RESULT_ALLOC_FAIL;
346        fdb_status status = fdb_encrypt_blocks(&file->encryption,
347                                               encrypted_buf,
348                                               buf,
349                                               blocksize,
350                                               num_blocks,
351                                               start_bid);
        if (status != FDB_RESULT_SUCCESS) {
            if (nbytes > 4096)
                free(encrypted_buf);
            return status;
        }
        // write the encrypted data before releasing the heap buffer (if any);
        // freeing it prior to the pwrite() would hand freed memory to the write
        ssize_t rv = file->ops->pwrite(file->fd, encrypted_buf, nbytes, offset);
        if (nbytes > 4096)
            free(encrypted_buf);
        return rv;
357    }
358}
359
360int filemgr_is_writable(struct filemgr *file, bid_t bid)
361{
362    if (sb_bmp_exists(file->sb) && sb_ops.is_writable) {
363        // block reusing is enabled
364        return sb_ops.is_writable(file, bid);
365    } else {
366        uint64_t pos = bid * file->blocksize;
367        // Note that we don't need to grab file->lock here because
368        // 1) both file->pos and file->last_commit are only incremented.
369        // 2) file->last_commit is updated using the value of file->pos,
370        //    and always equal to or smaller than file->pos.
371        return (pos <  atomic_get_uint64_t(&file->pos) &&
372                pos >= atomic_get_uint64_t(&file->last_commit));
373    }
374}
375
376uint64_t filemgr_get_sb_bmp_revnum(struct filemgr *file)
377{
378    if (file->sb && sb_ops.get_bmp_revnum) {
379        return sb_ops.get_bmp_revnum(file);
380    } else {
381        return 0;
382    }
383}
384
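// Locate and load the most recent valid DB header. Scanning starts at the last
// block of the file (or at the superblock's last_hdr_bid hint, if present) and
// walks backwards in a circular manner, accepting the first block whose
// DBHEADER marker, magic value, and CRC all check out; 'file->last_commit' is
// rolled back past every rejected candidate along the way.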
385static fdb_status _filemgr_read_header(struct filemgr *file,
386                                       err_log_callback *log_callback)
387{
388    uint8_t marker[BLK_MARKER_SIZE];
389    filemgr_magic_t magic = ver_get_latest_magic();
390    filemgr_header_len_t len;
391    uint8_t *buf;
392    uint32_t crc, crc_file;
393    bool check_crc32_open_rule = false;
394    fdb_status status = FDB_RESULT_SUCCESS;
395    bid_t hdr_bid, hdr_bid_local;
396    size_t min_filesize = 0;
397
398    // get temp buffer
399    buf = (uint8_t *) _filemgr_get_temp_buf();
400
401    // If a header is found crc_mode can change to reflect the file
402    if (file->crc_mode == CRC32) {
403        check_crc32_open_rule = true;
404    }
405
406    hdr_bid = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
407    hdr_bid_local = hdr_bid;
408
409    if (file->sb) {
410        // superblock exists .. file size does not start from zero.
411        min_filesize = file->sb->config->num_sb * file->blocksize;
412        bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
413        if (sb_last_hdr_bid != BLK_NOT_FOUND) {
414            hdr_bid = hdr_bid_local = sb_last_hdr_bid;
415        }
416        // if header info does not exist in superblock,
417        // get DB header at the end of the file.
418    }
419
420    if (atomic_get_uint64_t(&file->pos) > min_filesize) {
421        // Crash Recovery Test 1: unaligned last block write
422        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
423        if (remain) {
424            atomic_sub_uint64_t(&file->pos, remain);
425            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
426            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
427                "from a database file '%s'\n";
428            DBG(msg, remain, file->filename);
429            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
430                    msg, remain, file->filename);
431        }
432
433        size_t block_counter = 0;
434        do {
435            if (hdr_bid_local * file->blocksize >= file->pos) {
436                // Handling EOF scenario
437                status = FDB_RESULT_NO_DB_HEADERS;
438                const char *msg = "Unable to read block from file '%s' as EOF "
439                                  "reached\n";
440                fdb_log(log_callback, status, msg, file->filename);
441                break;
442            }
443            ssize_t rv = filemgr_read_block(file, buf, hdr_bid_local);
444            if (rv != (ssize_t)file->blocksize) {
445                status = (fdb_status) rv;
446                const char *msg = "Unable to read a database file '%s' with "
447                                  "blocksize %u\n";
448                DBG(msg, file->filename, file->blocksize);
449                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
450                break;
451            }
452            ++block_counter;
453            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
454                   BLK_MARKER_SIZE);
455
456            if (marker[0] == BLK_MARKER_DBHEADER) {
457                // possible need for byte conversions here
458                memcpy(&magic,
459                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
460                       sizeof(magic));
461                magic = _endian_decode(magic);
462
463                if (ver_is_valid_magic(magic)) {
464
465                    memcpy(&len,
466                           buf + file->blocksize - BLK_MARKER_SIZE -
467                           sizeof(magic) - sizeof(len),
468                           sizeof(len));
469                    len = _endian_decode(len);
470
471                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
472                    crc_file = _endian_decode(crc_file);
473
474                    // crc check and detect the crc_mode
475                    if (detect_and_check_crc(reinterpret_cast<const uint8_t*>(buf),
476                                             len - sizeof(crc),
477                                             crc_file,
478                                             &file->crc_mode)) {
479                        // crc mode is detected and known.
480                        // check the rules of opening legacy CRC
481                        if (check_crc32_open_rule && file->crc_mode != CRC32) {
482                            const char *msg = "Open of CRC32C file"
483                                              " with forced CRC32\n";
484                            status = FDB_RESULT_INVALID_ARGS;
485                            DBG(msg);
486                            fdb_log(log_callback, status, msg);
487                            break;
488                        } else {
489                            status = FDB_RESULT_SUCCESS;
490
491                            file->header.data = (void *)malloc(file->blocksize);
492
493                            memcpy(file->header.data, buf, len);
494                            memcpy(&file->header.revnum, buf + len,
495                                   sizeof(filemgr_header_revnum_t));
496                            memcpy((void *) &file->header.seqnum,
497                                    buf + len + sizeof(filemgr_header_revnum_t),
498                                    sizeof(fdb_seqnum_t));
499
500                            if (ver_superblock_support(magic)) {
501                                // last_writable_bmp_revnum should be same with
502                                // the current bmp_revnum (since it indicates the
503                                // 'bmp_revnum' of 'sb->cur_alloc_bid').
504                                atomic_store_uint64_t(&file->last_writable_bmp_revnum,
505                                                      filemgr_get_sb_bmp_revnum(file));
506                            }
507
508                            file->header.revnum =
509                                _endian_decode(file->header.revnum);
510                            file->header.seqnum =
511                                _endian_decode(file->header.seqnum.load());
512                            file->header.size = len;
513                            atomic_store_uint64_t(&file->header.bid, hdr_bid_local);
514                            memset(&file->header.stat, 0x0, sizeof(file->header.stat));
515
516                            // release temp buffer
517                            _filemgr_release_temp_buf(buf);
518                        }
519
520                        file->version = magic;
521                        return status;
522                    } else {
523                        status = FDB_RESULT_CHECKSUM_ERROR;
524                        uint32_t crc32 = 0, crc32c = 0;
525                        crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
526                                             len - sizeof(crc),
527                                             CRC32);
528#ifdef _CRC32C
529                        crc32c = get_checksum(reinterpret_cast<const uint8_t*>(buf),
530                                              len - sizeof(crc),
531                                              CRC32C);
532#endif
533                        const char *msg = "Crash Detected: CRC on disk %u != (%u | %u) "
534                            "in a database file '%s'\n";
535                        DBG(msg, crc_file, crc32, crc32c, file->filename);
536                        fdb_log(log_callback, status, msg, crc_file, crc32, crc32c,
537                                file->filename);
538                    }
539                } else {
540                    status = FDB_RESULT_FILE_CORRUPTION;
541                    const char *msg = "Crash Detected: Wrong Magic %" _F64
542                                      " in a database file '%s'\n";
543                    fdb_log(log_callback, status, msg, magic, file->filename);
544                }
545            } else {
546                status = FDB_RESULT_NO_DB_HEADERS;
547                if (block_counter == 1) {
548                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
549                                      "in a database file '%s'\n";
550                    DBG(msg, marker[0], file->filename);
551                    fdb_log(log_callback, status, msg, marker[0], file->filename);
552                }
553            }
554
555            atomic_store_uint64_t(&file->last_commit, hdr_bid_local * file->blocksize);
556            // traverse headers in a circular manner
557            if (hdr_bid_local) {
558                hdr_bid_local--;
559            } else {
560                hdr_bid_local = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
561            }
562        } while (hdr_bid_local != hdr_bid);
563    }
564
565    // release temp buffer
566    _filemgr_release_temp_buf(buf);
567
568    file->header.size = 0;
569    file->header.revnum = 0;
570    file->header.seqnum = 0;
571    file->header.data = NULL;
572    atomic_store_uint64_t(&file->header.bid, 0);
573    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
574    file->version = magic;
575    return status;
576}
577
578size_t filemgr_get_ref_count(struct filemgr *file)
579{
580    size_t ret = 0;
581    spin_lock(&file->lock);
582    ret = atomic_get_uint32_t(&file->ref_count);
583    spin_unlock(&file->lock);
584    return ret;
585}
586
587uint64_t filemgr_get_bcache_used_space(void)
588{
589    uint64_t bcache_free_space = 0;
590    if (global_config.ncacheblock) { // If buffer cache is indeed configured
591        bcache_free_space = bcache_get_num_free_blocks();
592        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
593                          * global_config.blocksize;
594    }
595    return bcache_free_space;
596}
597
598struct filemgr_prefetch_args {
599    struct filemgr *file;
600    uint64_t duration;
601    err_log_callback *log_callback;
602    void *aux;
603};
604
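// Prefetch worker: walks backwards from 'last_commit' in FILEMGR_PREFETCH_UNIT
// chunks, pulling blocks into the block cache until it is aborted, the
// configured duration expires, cache space runs low, or a read fails.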
605static void *_filemgr_prefetch_thread(void *voidargs)
606{
607    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
608    uint8_t *buf = alca(uint8_t, args->file->blocksize);
609    uint64_t cur_pos = 0, i;
610    uint64_t bcache_free_space;
611    bid_t bid;
612    bool terminate = false;
613    struct timeval begin, cur, gap;
614
615    spin_lock(&args->file->lock);
616    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
617    spin_unlock(&args->file->lock);
618    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
619        terminate = true;
620    } else {
621        cur_pos -= FILEMGR_PREFETCH_UNIT;
622    }
623    // read backwards from the end of the file, in units of FILEMGR_PREFETCH_UNIT
624    gettimeofday(&begin, NULL);
625    while (!terminate) {
626        for (i = cur_pos;
627             i < cur_pos + FILEMGR_PREFETCH_UNIT;
628             i += args->file->blocksize) {
629
630            gettimeofday(&cur, NULL);
631            gap = _utime_gap(begin, cur);
632            bcache_free_space = bcache_get_num_free_blocks();
633            bcache_free_space *= args->file->blocksize;
634
635            if (atomic_get_uint8_t(&args->file->prefetch_status)
636                == FILEMGR_PREFETCH_ABORT ||
637                gap.tv_sec >= (int64_t)args->duration ||
638                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
639                // terminate thread when
640                // 1. got abort signal
641                // 2. time out
642                // 3. not enough free space in block cache
643                terminate = true;
644                break;
645            } else {
646                bid = i / args->file->blocksize;
647                if (filemgr_read(args->file, bid, buf, NULL, true)
648                        != FDB_RESULT_SUCCESS) {
649                    // 4. read failure
650                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
651                            "Prefetch thread failed to read a block with block id %" _F64
652                            " from a database file '%s'", bid, args->file->filename);
653                    terminate = true;
654                    break;
655                }
656            }
657        }
658
659        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
660            cur_pos -= FILEMGR_PREFETCH_UNIT;
661        } else {
662            // remaining space is less than FILEMGR_PREFETCH_UNIT
663            terminate = true;
664        }
665    }
666
667    atomic_cas_uint8_t(&args->file->prefetch_status, FILEMGR_PREFETCH_RUNNING,
668                       FILEMGR_PREFETCH_IDLE);
669    free(args);
670    return NULL;
671}
672
673// prefetch the given DB file
674void filemgr_prefetch(struct filemgr *file,
675                      struct filemgr_config *config,
676                      err_log_callback *log_callback)
677{
678    uint64_t bcache_free_space;
679
680    bcache_free_space = bcache_get_num_free_blocks();
681    bcache_free_space *= file->blocksize;
682
683    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
684    spin_lock(&file->lock);
685    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
686        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
687        // invoke prefetch thread
688        struct filemgr_prefetch_args *args;
689        args = (struct filemgr_prefetch_args *)
690               calloc(1, sizeof(struct filemgr_prefetch_args));
691        args->file = file;
692        args->duration = config->prefetch_duration;
693        args->log_callback = log_callback;
694
695        if (atomic_cas_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE,
696                               FILEMGR_PREFETCH_RUNNING)) {
697            thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
698        }
699    }
700    spin_unlock(&file->lock);
701}
702
703fdb_status filemgr_does_file_exist(char *filename) {
704    struct filemgr_ops *ops = get_filemgr_ops();
705    int fd = ops->open(filename, O_RDONLY, 0444);
706    if (fd < 0) {
707        return (fdb_status) fd;
708    }
709    ops->close(fd);
710    return FDB_RESULT_SUCCESS;
711}
712
713static fdb_status _filemgr_load_sb(struct filemgr *file,
714                                   err_log_callback *log_callback)
715{
716    fdb_status status = FDB_RESULT_SUCCESS;
717    struct sb_config sconfig;
718
719    if (sb_ops.init && sb_ops.get_default_config && sb_ops.read_latest) {
720        sconfig = sb_ops.get_default_config();
721        if (filemgr_get_pos(file)) {
722            // existing file
723            status = sb_ops.read_latest(file, sconfig, log_callback);
724        } else {
725            // new file
726            status = sb_ops.init(file, sconfig, log_callback);
727        }
728    }
729
730    return status;
731}
732
733static filemgr* get_instance_UNLOCKED(const char *filename)
734{
735    if (!filename) {
736        return NULL;
737    }
738
739    struct filemgr query;
740    struct hash_elem *e = NULL;
741    struct filemgr *file = NULL;
742
743    query.filename = (char*)filename;
744    e = hash_find(&hash, &query.e);
745    if (e) {
746        file = _get_entry(e, struct filemgr, e);
747    }
748    return file;
749}
750
751struct filemgr* filemgr_get_instance(const char* filename)
752{
753    spin_lock(&filemgr_openlock);
754    struct filemgr *file = get_instance_UNLOCKED(filename);
755    spin_unlock(&filemgr_openlock);
756
757    return file;
758}
759
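// Open (or re-open) a file manager instance. If the file is already present in
// the global hash, its reference count is bumped and the existing structure is
// returned; otherwise a new filemgr is built: the file is opened, the
// encryptor, superblock, and latest DB header are loaded, and the WAL plus the
// global transaction are initialized. A rough caller-side sketch (illustrative
// only; 'fname', 'cfg', and 'log' are placeholder variables, not code from
// this file):
//
//     filemgr_open_result r = filemgr_open(fname, get_filemgr_ops(), &cfg, log);
//     if (r.rv != FDB_RESULT_SUCCESS) { /* handle error; r.file is NULL */ }
//     /* ... use r.file ... */
//     filemgr_close(r.file, true, fname, log);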
760filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
761                                 struct filemgr_config *config,
762                                 err_log_callback *log_callback)
763{
764    struct filemgr *file = NULL;
765    struct filemgr query;
766    struct hash_elem *e = NULL;
767    bool create = config->options & FILEMGR_CREATE;
768    int file_flag = 0x0;
769    int fd = -1;
770    fdb_status status;
771    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
772
773    filemgr_init(config);
774
775    if (config->encryption_key.algorithm != FDB_ENCRYPTION_NONE && global_config.ncacheblock <= 0) {
776        // cannot use encryption without a block cache
777        result.rv = FDB_RESULT_CRYPTO_ERROR;
778        return result;
779    }
780
781    // check whether file is already opened or not
782    query.filename = filename;
783    spin_lock(&filemgr_openlock);
784    e = hash_find(&hash, &query.e);
785
786    if (e) {
787        // already opened (return existing structure)
788        file = _get_entry(e, struct filemgr, e);
789
790        if (atomic_incr_uint32_t(&file->ref_count) > 1 &&
791            atomic_get_uint8_t(&file->status) != FILE_CLOSED) {
792            spin_unlock(&filemgr_openlock);
793            result.file = file;
794            result.rv = FDB_RESULT_SUCCESS;
795            return result;
796        }
797
798        spin_lock(&file->lock);
799
800        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
801            file_flag = O_RDWR;
802            if (create) {
803                file_flag |= O_CREAT;
804            }
805            *file->config = *config;
806            file->config->blocksize = global_config.blocksize;
807            file->config->ncacheblock = global_config.ncacheblock;
808            file_flag |= config->flag;
809            file->fd = file->ops->open(file->filename, file_flag, 0666);
810            if (file->fd < 0) {
811                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
812                    // A database file was manually deleted by the user.
813                    // Clean up global hash table, WAL index, and buffer cache.
814                    // Then, retry it with a create option below IFF it is not
815                    // a read-only open attempt
816                    struct hash_elem *ret;
817                    spin_unlock(&file->lock);
818                    ret = hash_remove(&hash, &file->e);
819                    fdb_assert(ret, 0, 0);
820                    filemgr_free_func(&file->e);
821                    if (!create) {
822                        _log_errno_str(ops, log_callback,
823                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
824                        spin_unlock(&filemgr_openlock);
825                        result.rv = FDB_RESULT_NO_SUCH_FILE;
826                        return result;
827                    }
828                } else {
829                    _log_errno_str(file->ops, log_callback,
830                                  (fdb_status)file->fd, "OPEN", filename);
831                    atomic_decr_uint32_t(&file->ref_count);
832                    spin_unlock(&file->lock);
833                    spin_unlock(&filemgr_openlock);
834                    result.rv = file->fd;
835                    return result;
836                }
837            } else { // Reopening the closed file succeeded.
838                atomic_store_uint8_t(&file->status, FILE_NORMAL);
839                if (config->options & FILEMGR_SYNC) {
840                    file->fflags |= FILEMGR_SYNC;
841                } else {
842                    file->fflags &= ~FILEMGR_SYNC;
843                }
844
845                spin_unlock(&file->lock);
846                spin_unlock(&filemgr_openlock);
847
848                result.file = file;
849                result.rv = FDB_RESULT_SUCCESS;
850                return result;
851            }
852        } else { // file is already opened.
853
854            if (config->options & FILEMGR_SYNC) {
855                file->fflags |= FILEMGR_SYNC;
856            } else {
857                file->fflags &= ~FILEMGR_SYNC;
858            }
859
860            spin_unlock(&file->lock);
861            spin_unlock(&filemgr_openlock);
862            result.file = file;
863            result.rv = FDB_RESULT_SUCCESS;
864            return result;
865        }
866    }
867
868    file_flag = O_RDWR;
869    if (create) {
870        file_flag |= O_CREAT;
871    }
872    file_flag |= config->flag;
873    fd = ops->open(filename, file_flag, 0666);
874    if (fd < 0) {
875        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
876        spin_unlock(&filemgr_openlock);
877        result.rv = fd;
878        return result;
879    }
880    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
881    file->filename_len = strlen(filename);
882    file->filename = (char*)malloc(file->filename_len + 1);
883    strcpy(file->filename, filename);
884
885    atomic_init_uint32_t(&file->ref_count, 1);
886    file->stale_list = NULL;
887
888    status = fdb_init_encryptor(&file->encryption, &config->encryption_key);
889    if (status != FDB_RESULT_SUCCESS) {
890        ops->close(fd);
891        free(file);
892        spin_unlock(&filemgr_openlock);
893        result.rv = status;
894        return result;
895    }
896
897    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
898    file->wal->flag = 0;
899
900    file->ops = ops;
901    file->blocksize = global_config.blocksize;
902    atomic_init_uint8_t(&file->status, FILE_NORMAL);
903    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
904    *file->config = *config;
905    file->config->blocksize = global_config.blocksize;
906    file->config->ncacheblock = global_config.ncacheblock;
907    file->old_filename = NULL;
908    file->new_filename = NULL;
909    file->fd = fd;
910
911    cs_off_t offset = file->ops->goto_eof(file->fd);
912    if (offset < 0) {
913        _log_errno_str(file->ops, log_callback, (fdb_status) offset, "SEEK_END", filename);
914        file->ops->close(file->fd);
915        free(file->wal);
916        free(file->filename);
917        free(file->config);
918        free(file);
919        spin_unlock(&filemgr_openlock);
920        result.rv = (fdb_status) offset;
921        return result;
922    }
923    atomic_init_uint64_t(&file->last_commit, offset);
924    atomic_init_uint64_t(&file->last_writable_bmp_revnum, 0);
925    atomic_init_uint64_t(&file->pos, offset);
926    atomic_init_uint32_t(&file->throttling_delay, 0);
927    atomic_init_uint64_t(&file->num_invalidated_blocks, 0);
928    atomic_init_uint8_t(&file->io_in_prog, 0);
929
930#ifdef _LATENCY_STATS
931    for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
932        filemgr_init_latency_stat(&file->lat_stats[i]);
933    }
934#endif // _LATENCY_STATS
935
936    file->bcache = NULL;
937    file->in_place_compaction = false;
938    file->kv_header = NULL;
939    atomic_init_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE);
940
941    atomic_init_uint64_t(&file->header.bid, 0);
942    _init_op_stats(&file->header.op_stat);
943
944    spin_init(&file->lock);
945    file->stale_list = (struct list*)calloc(1, sizeof(struct list));
946    list_init(file->stale_list);
947    avl_init(&file->stale_info_tree, NULL);
948    avl_init(&file->mergetree, NULL);
949    file->stale_info_tree_loaded = false;
950
951    filemgr_dirty_update_init(file);
952
953    spin_init(&file->fhandle_idx_lock);
954    avl_init(&file->fhandle_idx, NULL);
955
956#ifdef __FILEMGR_DATA_PARTIAL_LOCK
957    struct plock_ops pops;
958    struct plock_config pconfig;
959
960    pops.init_user = mutex_init_wrap;
961    pops.lock_user = mutex_lock_wrap;
962    pops.unlock_user = mutex_unlock_wrap;
963    pops.destroy_user = mutex_destroy_wrap;
964    pops.init_internal = spin_init_wrap;
965    pops.lock_internal = spin_lock_wrap;
966    pops.unlock_internal = spin_unlock_wrap;
967    pops.destroy_internal = spin_destroy_wrap;
968    pops.is_overlapped = _block_is_overlapped;
969
970    memset(&pconfig, 0x0, sizeof(pconfig));
971    pconfig.ops = &pops;
972    pconfig.sizeof_lock_internal = sizeof(spin_t);
973    pconfig.sizeof_lock_user = sizeof(mutex_t);
974    pconfig.sizeof_range = sizeof(bid_t);
975    pconfig.aux = NULL;
976    plock_init(&file->plock, &pconfig);
977#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
978    int i;
979    for (i=0;i<DLOCK_MAX;++i) {
980        mutex_init(&file->data_mutex[i]);
981    }
982#else
983    int i;
984    for (i=0;i<DLOCK_MAX;++i) {
985        spin_init(&file->data_spinlock[i]);
986    }
987#endif //__FILEMGR_DATA_PARTIAL_LOCK
988
989    mutex_init(&file->writer_lock.mutex);
990    file->writer_lock.locked = false;
991
992    // Note: CRC must be initialized before superblock loading
993    // initialize CRC mode
994    if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
995        file->crc_mode = CRC32;
996    } else {
997        file->crc_mode = CRC_DEFAULT;
998    }
999
1000    do { // repeat until both superblock and DB header are correctly read
1001        // init or load superblock
1002        status = _filemgr_load_sb(file, log_callback);
1003        // we can tolerate SB_READ_FAIL for old version file
1004        if (status != FDB_RESULT_SB_READ_FAIL &&
1005            status != FDB_RESULT_SUCCESS) {
1006            _log_errno_str(file->ops, log_callback, status, "READ", file->filename);
1007            file->ops->close(file->fd);
1008            free(file->stale_list);
1009            free(file->wal);
1010            free(file->filename);
1011            free(file->config);
1012            free(file);
1013            spin_unlock(&filemgr_openlock);
1014            result.rv = status;
1015            return result;
1016        }
1017
1018        // read header
1019        status = _filemgr_read_header(file, log_callback);
1020        if (file->sb && status == FDB_RESULT_NO_DB_HEADERS) {
1021            // this happens when a user created & closed a file without any mutations,
1022            // so there is no data in the file other than the superblocks.
1023            // we can tolerate this case.
1024        } else if (status != FDB_RESULT_SUCCESS) {
1025            _log_errno_str(file->ops, log_callback, status, "READ", filename);
1026            file->ops->close(file->fd);
1027            if (file->sb) {
1028                sb_ops.release(file);
1029            }
1030            free(file->stale_list);
1031            free(file->wal);
1032            free(file->filename);
1033            free(file->config);
1034            free(file);
1035            spin_unlock(&filemgr_openlock);
1036            result.rv = status;
1037            return result;
1038        }
1039
1040        if (file->sb &&
1041            file->header.revnum != atomic_get_uint64_t(&file->sb->last_hdr_revnum)) {
1042            // superblock exists but the corresponding DB header does not match.
1043            // read another candidate.
1044            continue;
1045        }
1046
1047        break;
1048    } while (true);
1049
1050    // initialize WAL
1051    if (!wal_is_initialized(file)) {
1052        wal_init(file, FDB_WAL_NBUCKET);
1053    }
1054
1055    // init global transaction for the file
1056    file->global_txn.wrapper = (struct wal_txn_wrapper*)
1057                               malloc(sizeof(struct wal_txn_wrapper));
1058    file->global_txn.wrapper->txn = &file->global_txn;
1059    file->global_txn.handle = NULL;
1060    if (atomic_get_uint64_t(&file->pos)) {
1061        file->global_txn.prev_hdr_bid =
1062            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
1063    } else {
1064        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
1065    }
1066    file->global_txn.prev_revnum = 0;
1067    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
1068    list_init(file->global_txn.items);
1069    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
1070    wal_add_transaction(file, &file->global_txn);
1071
1072    hash_insert(&hash, &file->e);
1073    if (config->prefetch_duration > 0) {
1074        filemgr_prefetch(file, config, log_callback);
1075    }
1076
1077    spin_unlock(&filemgr_openlock);
1078
1079    if (config->options & FILEMGR_SYNC) {
1080        file->fflags |= FILEMGR_SYNC;
1081    } else {
1082        file->fflags &= ~FILEMGR_SYNC;
1083    }
1084
1085    result.file = file;
1086    result.rv = FDB_RESULT_SUCCESS;
1087    fdb_log(log_callback, FDB_RESULT_SUCCESS, "Forestdb opened database file %s",
1088            filename);
1089
1090    return result;
1091}
1092
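// Replace the cached header image with 'buf' (len bytes) under file->lock and,
// when 'inc_revnum' is set, bump the header revision number; the (possibly
// incremented) revnum is returned.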
1093uint64_t filemgr_update_header(struct filemgr *file,
1094                               void *buf,
1095                               size_t len,
1096                               bool inc_revnum)
1097{
1098    uint64_t ret;
1099
1100    spin_lock(&file->lock);
1101
1102    if (file->header.data == NULL) {
1103        file->header.data = (void *)malloc(file->blocksize);
1104    }
1105    memcpy(file->header.data, buf, len);
1106    file->header.size = len;
1107    if (inc_revnum) {
1108        ++(file->header.revnum);
1109    }
1110    ret = file->header.revnum;
1111
1112    spin_unlock(&file->lock);
1113
1114    return ret;
1115}
1116
1117filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
1118{
1119    filemgr_header_revnum_t ret;
1120    spin_lock(&file->lock);
1121    ret = file->header.revnum;
1122    spin_unlock(&file->lock);
1123    return ret;
1124}
1125
1126// 'filemgr_get_seqnum', 'filemgr_set_seqnum',
1127// 'filemgr_get_walflush_revnum', 'filemgr_set_walflush_revnum'
1128// have to be protected by 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
1129fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
1130{
1131    return file->header.seqnum;
1132}
1133
1134void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
1135{
1136    file->header.seqnum = seqnum;
1137}
1138
1139void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
1140                         bid_t *header_bid, fdb_seqnum_t *seqnum,
1141                         filemgr_header_revnum_t *header_revnum)
1142{
1143    spin_lock(&file->lock);
1144
1145    if (file->header.size > 0) {
1146        if (buf == NULL) {
1147            buf = (void*)malloc(file->header.size);
1148        }
1149        memcpy(buf, file->header.data, file->header.size);
1150    }
1151
1152    if (len) {
1153        *len = file->header.size;
1154    }
1155    if (header_bid) {
1156        *header_bid = filemgr_get_header_bid(file);
1157    }
1158    if (seqnum) {
1159        *seqnum = file->header.seqnum;
1160    }
1161    if (header_revnum) {
1162        *header_revnum = file->header.revnum;
1163    }
1164
1165    spin_unlock(&file->lock);
1166
1167    return buf;
1168}
1169
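// Read the DB header stored in block 'bid' and decode its trailing metadata.
// As the offsets below show, a header block ends with (from the last byte
// backwards): block marker, magic, header length, previous header BID,
// delta size, and superblock bitmap revision number, while the header body,
// its revnum, and the default KVS seqnum sit at the start of the block.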
1170fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
1171                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
1172                                filemgr_header_revnum_t *header_revnum,
1173                                uint64_t *deltasize, uint64_t *version,
1174                                uint64_t *sb_bmp_revnum,
1175                                err_log_callback *log_callback)
1176{
1177    uint8_t *_buf;
1178    uint8_t marker[BLK_MARKER_SIZE];
1179    filemgr_header_len_t hdr_len;
1180    uint64_t _deltasize, _bmp_revnum;
1181    filemgr_magic_t magic;
1182    fdb_status status = FDB_RESULT_SUCCESS;
1183
1184    *len = 0;
1185
1186    if (!bid || bid == BLK_NOT_FOUND) {
1187        // No other header available
1188        return FDB_RESULT_SUCCESS;
1189    }
1190
1191    _buf = (uint8_t *)_filemgr_get_temp_buf();
1192
1193    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1194
1195    if (status != FDB_RESULT_SUCCESS) {
1196        fdb_log(log_callback, status,
1197                "Failed to read a database header with block id %" _F64 " in "
1198                "a database file '%s'", bid, file->filename);
1199        _filemgr_release_temp_buf(_buf);
1200        return status;
1201    }
1202    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1203            BLK_MARKER_SIZE);
1204
1205    if (marker[0] != BLK_MARKER_DBHEADER) {
1206        // Comment this warning log as of now because the circular block reuse
1207        // can cause false alarms as a previous stale header block can be reclaimed
1208        // and reused for incoming writes.
1209        /*
1210        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1211                "A block marker of the database header block id %" _F64 " in "
1212                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1213                bid, file->filename);
1214        */
1215        _filemgr_release_temp_buf(_buf);
1216        return FDB_RESULT_READ_FAIL;
1217    }
1218    memcpy(&magic,
1219            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1220            sizeof(magic));
1221    magic = _endian_decode(magic);
1222    if (!ver_is_valid_magic(magic)) {
1223        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1224                "A block magic value of %" _F64 " in the database header block"
1225                "id %" _F64 " in a database file '%s'"
1226                "does NOT match FILEMGR_MAGIC %" _F64 "!",
1227                magic, bid, file->filename, ver_get_latest_magic());
1228        _filemgr_release_temp_buf(_buf);
1229        return FDB_RESULT_FILE_CORRUPTION;
1230    }
1231    memcpy(&hdr_len,
1232            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1233            sizeof(hdr_len), sizeof(hdr_len));
1234    hdr_len = _endian_decode(hdr_len);
1235
1236    memcpy(buf, _buf, hdr_len);
1237    *len = hdr_len;
1238    *version = magic;
1239
1240    if (header_revnum) {
1241        // copy the DB header revnum
1242        filemgr_header_revnum_t _revnum;
1243        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
1244        *header_revnum = _endian_decode(_revnum);
1245    }
1246    if (seqnum) {
1247        // copy default KVS's seqnum
1248        fdb_seqnum_t _seqnum;
1249        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1250               sizeof(_seqnum));
1251        *seqnum = _endian_decode(_seqnum);
1252    }
1253
1254    if (ver_is_atleast_magic_001(magic)) {
1255        if (deltasize) {
1256            memcpy(&_deltasize, _buf + file->blocksize - BLK_MARKER_SIZE
1257                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1258                    - sizeof(_deltasize), sizeof(_deltasize));
1259            *deltasize = _endian_decode(_deltasize);
1260        }
1261    }
1262
1263    if (sb_bmp_revnum && ver_superblock_support(magic)) {
1264        memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1265                - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1266                - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1267        *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1268    }
1269
1270    _filemgr_release_temp_buf(_buf);
1271
1272    return status;
1273}
1274
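// Follow the on-disk chain of headers one step backwards: read the header at
// 'bid', extract its previous-header BID, then load and validate that previous
// header. Returns its BID (or BLK_NOT_FOUND if no valid, still-live
// predecessor exists) and fills the out parameters on success.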
1275uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
1276                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
1277                                   filemgr_header_revnum_t *revnum,
1278                                   uint64_t *deltasize, uint64_t *version,
1279                                   uint64_t *sb_bmp_revnum,
1280                                   err_log_callback *log_callback)
1281{
1282    uint8_t *_buf;
1283    uint8_t marker[BLK_MARKER_SIZE];
1284    fdb_seqnum_t _seqnum;
1285    filemgr_header_revnum_t _revnum, cur_revnum, prev_revnum;
1286    filemgr_header_len_t hdr_len;
1287    filemgr_magic_t magic;
1288    bid_t _prev_bid, prev_bid;
1289    uint64_t _deltasize, _bmp_revnum;
1290    int found = 0;
1291
1292    *len = 0;
1293
1294    if (!bid || bid == BLK_NOT_FOUND) {
1295        // No other header available
1296        return bid;
1297    }
1298    _buf = (uint8_t *)_filemgr_get_temp_buf();
1299
1300    // Reverse scan the file for a previous DB header
1301    do {
1302        // Get prev_bid from the current header.
1303        // Since the current header is already cached during the previous
1304        // operation, no disk I/O will be triggered.
1305        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
1306                != FDB_RESULT_SUCCESS) {
1307            break;
1308        }
1309
1310        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1311               BLK_MARKER_SIZE);
1312        memcpy(&magic,
1313               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1314               sizeof(magic));
1315        magic = _endian_decode(magic);
1316
1317        if (marker[0] != BLK_MARKER_DBHEADER ||
1318            !ver_is_valid_magic(magic)) {
1319            // not a header block
1320            // this happens when this function is invoked between
1321            // fdb_set() call and fdb_commit() call, so the last block
1322            // in the file is not a header block
1323            bid_t latest_hdr = filemgr_get_header_bid(file);
1324            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
1325                // get the latest header BID
1326                bid = latest_hdr;
1327            } else {
1328                break;
1329            }
1330            cur_revnum = file->header.revnum + 1;
1331        } else {
1332
1333            memcpy(&hdr_len,
1334                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1335                   sizeof(hdr_len), sizeof(hdr_len));
1336            hdr_len = _endian_decode(hdr_len);
1337
1338            memcpy(&_revnum, _buf + hdr_len,
1339                   sizeof(filemgr_header_revnum_t));
1340            cur_revnum = _endian_decode(_revnum);
1341
1342            if (sb_bmp_exists(file->sb)) {
1343                // first check revnum
1344                if (cur_revnum <= sb_ops.get_min_live_revnum(file)) {
1345                    // previous headers already have been reclaimed
1346                    // no more logical prev header
1347                    break;
1348                }
1349            }
1350
1351            memcpy(&_prev_bid,
1352                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1353                       sizeof(hdr_len) - sizeof(_prev_bid),
1354                   sizeof(_prev_bid));
1355            prev_bid = _endian_decode(_prev_bid);
1356            bid = prev_bid;
1357        }
1358
1359        // Read the prev header
1360        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1361        if (fs != FDB_RESULT_SUCCESS) {
1362            fdb_log(log_callback, fs,
1363                    "Failed to read a previous database header with block id %"
1364                    _F64 " in "
1365                    "a database file '%s'", bid, file->filename);
1366            break;
1367        }
1368
1369        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1370               BLK_MARKER_SIZE);
1371        if (marker[0] != BLK_MARKER_DBHEADER) {
1372            if (bid) {
1373                // broken linked list
1374                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1375                        "A block marker of the previous database header block id %"
1376                        _F64 " in "
1377                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1378                        bid, file->filename);
1379            }
1380            break;
1381        }
1382
1383        memcpy(&magic,
1384               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1385               sizeof(magic));
1386        magic = _endian_decode(magic);
1387        if (!ver_is_valid_magic(magic)) {
1388            // broken linked list
1389            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1390                    "A block magic value of %" _F64
1391                    " of the previous database header block id %" _F64 " in "
1392                    "a database file '%s' does NOT match FILEMGR_MAGIC %"
1393                    _F64"!", magic,
1394                    bid, file->filename, ver_get_latest_magic());
1395            break;
1396        }
1397
1398        memcpy(&hdr_len,
1399               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1400               sizeof(hdr_len), sizeof(hdr_len));
1401        hdr_len = _endian_decode(hdr_len);
1402
1403        if (buf) {
1404            memcpy(buf, _buf, hdr_len);
1405        }
1406        memcpy(&_revnum, _buf + hdr_len,
1407               sizeof(filemgr_header_revnum_t));
1408        prev_revnum = _endian_decode(_revnum);
1409        if (prev_revnum >= cur_revnum ||
1410            prev_revnum < sb_ops.get_min_live_revnum(file)) {
1411            // no more prev header, or broken linked list
1412            break;
1413        }
1414
1415        memcpy(&_seqnum,
1416               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1417               sizeof(fdb_seqnum_t));
1418        if (ver_is_atleast_magic_001(magic)) {
1419            if (deltasize) {
1420                memcpy(&_deltasize,
1421                        _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic)
1422                       - sizeof(hdr_len) - sizeof(prev_bid) - sizeof(_deltasize),
1423                        sizeof(_deltasize));
1424                *deltasize = _endian_decode(_deltasize);
1425            }
1426        }
1427
1428        if (sb_bmp_revnum && ver_superblock_support(magic)) {
1429            memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1430                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1431                    - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1432            *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1433        }
1434
1435        if (revnum) {
1436            *revnum = prev_revnum;
1437        }
1438        *seqnum = _endian_decode(_seqnum);
1439        *len = hdr_len;
1440        *version = magic;
1441        found = 1;
1442        break;
1443    } while (false); // no repetition
1444
1445    if (!found) { // no other header found till end of file
1446        *len = 0;
1447        bid = BLK_NOT_FOUND;
1448    }
1449
1450    _filemgr_release_temp_buf(_buf);
1451
1452    return bid;
1453}
1454
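// Drop one reference to the file. When the last reference goes away, dirty
// block cache entries are discarded or handed over, the WAL is closed, and the
// underlying file descriptor is closed; files marked FILE_REMOVED_PENDING are
// either deleted immediately or passed to the lazy background removal path,
// and in-place-compacted files may be renamed back to their original name.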
1455fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1456                         const char *orig_file_name,
1457                         err_log_callback *log_callback)
1458{
1459    int rv = FDB_RESULT_SUCCESS;
1460
1461    if (atomic_decr_uint32_t(&file->ref_count) > 0) {
1462        // File is still accessed by other readers or writers.
1463        return FDB_RESULT_SUCCESS;
1464    }
1465
1466    fdb_log(log_callback, (fdb_status)rv, "Forestdb closed database file %s",
1467            file->filename);
1468
1469    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1470                                  // filemgr_open() because file->lock won't
1471                                  // prevent the race condition.
1472
1473    // remove filemgr structure if no thread refers to the file
1474    spin_lock(&file->lock);
1475    if (atomic_get_uint32_t(&file->ref_count) == 0) {
1476        if (global_config.ncacheblock > 0 &&
1477            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1478            spin_unlock(&file->lock);
1479            // discard all dirty blocks belonged to this file
1480            bcache_remove_dirty_blocks(file);
1481        } else {
1482            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1483            // then its dirty block entries will be cleaned up in either
1484            // filemgr_free_func() or register_file_removal() below.
1485            spin_unlock(&file->lock);
1486        }
1487
1488        if (wal_is_initialized(file)) {
1489            wal_close(file, log_callback);
1490        }
1491#ifdef _LATENCY_STATS_DUMP_TO_FILE
1492        filemgr_dump_latency_stat(file, log_callback);
1493#endif // _LATENCY_STATS_DUMP_TO_FILE
1494
1495        spin_lock(&file->lock);
1496
1497        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1498
1499            bool foreground_deletion = false;
1500            struct filemgr* new_file = get_instance_UNLOCKED(file->new_filename);
1501
1502            // immediately remove file if background remove function is not set
1503            if (!lazy_file_deletion_enabled ||
1504                (new_file && new_file->in_place_compaction)) {
1505                // TODO: to avoid the scenario below, we prevent background
1506                //       deletion of in-place compacted files at this time.
1507                // 1) In-place compacted from 'A' to 'A.1'.
1508                // 2) Request to delete 'A'.
1509                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1510                // 4) User opens DB file using its original name 'A', not 'A.1'.
1511                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1512                // 6) Crash!
1513
1514                // As the file is already unlinked, the file will be removed
1515                // as soon as we close it.
1516                rv = file->ops->close(file->fd);
1517                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1518#if defined(WIN32) || defined(_WIN32)
1519                // For Windows, we need to manually remove the file.
1520                remove(file->filename);
1521#endif
1522                foreground_deletion = true;
1523            }
1524
1525            // we can release the lock because no one will open this file
1526            spin_unlock(&file->lock);
1527            struct hash_elem *ret = hash_remove(&hash, &file->e);
1528            fdb_assert(ret, 0, 0);
1529
1530            spin_unlock(&filemgr_openlock);
1531
1532            if (foreground_deletion) {
1533                filemgr_free_func(&file->e);
1534            } else {
1535                register_file_removal(file, log_callback);
1536            }
1537            return (fdb_status) rv;
1538        } else {
1539
1540            rv = file->ops->close(file->fd);
1541            if (cleanup_cache_onclose) {
1542                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1543                if (file->in_place_compaction && orig_file_name) {
1544                    struct hash_elem *elem = NULL;
1545                    struct filemgr query;
1546                    uint32_t old_file_refcount = 0;
1547
1548                    query.filename = (char *)orig_file_name;
1549                    elem = hash_find(&hash, &query.e);
1550
1551                    if (file->old_filename) {
1552                        struct hash_elem *elem_old = NULL;
1553                        struct filemgr query_old;
1554                        struct filemgr *old_file = NULL;
1555
1556                        // get old file's ref count if exists
1557                        query_old.filename = file->old_filename;
1558                        elem_old = hash_find(&hash, &query_old.e);
1559                        if (elem_old) {
1560                            old_file = _get_entry(elem_old, struct filemgr, e);
1561                            old_file_refcount = atomic_get_uint32_t(&old_file->ref_count);
1562                        }
1563                    }
1564
1565                    // If old file is opened by other handle, renaming should be
1566                    // postponed. It will be renamed later by the handle referring
1567                    // to the old file.
1568                    if (!elem && old_file_refcount == 0 &&
1569                        is_file_removed(orig_file_name)) {
1570                        // If background file removal is not done yet, we postpone
1571                        // file renaming at this time.
1572                        if (rename(file->filename, orig_file_name) < 0) {
1573                            // Note that the renaming failure is not a critical
1574                            // issue because the last compacted file will be automatically
1575                            // identified and opened in the next fdb_open call.
1576                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1577                                           "CLOSE", file->filename);
1578                        }
1579                    }
1580                }
1581                spin_unlock(&file->lock);
1582                // Clean up global hash table, WAL index, and buffer cache.
1583                struct hash_elem *ret = hash_remove(&hash, &file->e);
1584                fdb_assert(ret, file, 0);
1585
1586                spin_unlock(&filemgr_openlock);
1587
1588                filemgr_free_func(&file->e);
1589                return (fdb_status) rv;
1590            } else {
1591                atomic_store_uint8_t(&file->status, FILE_CLOSED);
1592            }
1593        }
1594    }
1595
1596    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1597
1598    spin_unlock(&file->lock);
1599    spin_unlock(&filemgr_openlock);
1600    return (fdb_status) rv;
1601}
1602
1603void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1604{
1605    // remove all cached blocks
1606    if (global_config.ncacheblock > 0 &&
1607            file->bcache.load(std::memory_order_relaxed)) {
1608        bcache_remove_dirty_blocks(file);
1609        bcache_remove_clean_blocks(file);
1610        bcache_remove_file(file);
1611        file->bcache.store(NULL, std::memory_order_relaxed);
1612    }
1613}
1614
1615void _free_fhandle_idx(struct avl_tree *idx);
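// Frees a filemgr instance and everything it owns. Rough teardown order,
// mirroring the code below: abort and join the prefetch thread, purge this
// file's blocks from the buffer cache, free the KV header and the global
// transaction, shut down and destroy the WAL, release latency stats,
// filenames, per-file locks, the superblock, the dirty-update index, the
// fhandle index, and finally the stale-block bookkeeping, the config, and
// the structure itself.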
1616void filemgr_free_func(struct hash_elem *h)
1617{
1618    struct filemgr *file = _get_entry(h, struct filemgr, e);
1619
1620    filemgr_prefetch_status_t prefetch_state =
1621                              atomic_get_uint8_t(&file->prefetch_status);
1622
1623    atomic_store_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_ABORT);
1624    if (prefetch_state == FILEMGR_PREFETCH_RUNNING) {
1625        // prefetch thread was running
1626        void *ret;
1627        // wait for it to finish (the thread must have been created)
1628        thread_join(file->prefetch_tid, &ret);
1629    }
1630
1631    // remove all cached blocks
1632    if (global_config.ncacheblock > 0 &&
1633            file->bcache.load(std::memory_order_relaxed)) {
1634        bcache_remove_dirty_blocks(file);
1635        bcache_remove_clean_blocks(file);
1636        bcache_remove_file(file);
1637        file->bcache.store(NULL, std::memory_order_relaxed);
1638    }
1639
1640    if (file->kv_header) {
1641        // multi KV instance mode & KV header exists
1642        file->free_kv_header(file);
1643    }
1644
1645    // free global transaction
1646    wal_remove_transaction(file, &file->global_txn);
1647    free(file->global_txn.items);
1648    free(file->global_txn.wrapper);
1649
1650    // destroy WAL
1651    if (wal_is_initialized(file)) {
1652        wal_shutdown(file, NULL);
1653        wal_destroy(file);
1654    }
1655    free(file->wal);
1656
1657#ifdef _LATENCY_STATS
1658    for (int x = 0; x < FDB_LATENCY_NUM_STATS; ++x) {
1659        filemgr_destroy_latency_stat(&file->lat_stats[x]);
1660    }
1661#endif // _LATENCY_STATS
1662
1663    // free filename and header
1664    free(file->filename);
1665    if (file->header.data) free(file->header.data);
1666
1667    // free old/new filename if any
1668    free(file->old_filename);
1669    free(file->new_filename);
1670
1671    // destroy locks
1672    spin_destroy(&file->lock);
1673
1674#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1675    plock_destroy(&file->plock);
1676#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1677    int i;
1678    for (i=0;i<DLOCK_MAX;++i) {
1679        mutex_destroy(&file->data_mutex[i]);
1680    }
1681#else
1682    int i;
1683    for (i=0;i<DLOCK_MAX;++i) {
1684        spin_destroy(&file->data_spinlock[i]);
1685    }
1686#endif //__FILEMGR_DATA_PARTIAL_LOCK
1687
1688    mutex_destroy(&file->writer_lock.mutex);
1689
1690    // free superblock
1691    if (sb_ops.release) {
1692        sb_ops.release(file);
1693    }
1694
1695    // free dirty update index
1696    filemgr_dirty_update_free(file);
1697
1698    // free fhandle idx
1699    _free_fhandle_idx(&file->fhandle_idx);
1700    spin_destroy(&file->fhandle_idx_lock);
1701
1702    // free file structure
1703    struct list *stale_list = filemgr_get_stale_list(file);
1704    filemgr_clear_stale_list(file);
1705    filemgr_clear_stale_info_tree(file);
1706    filemgr_clear_mergetree(file);
1707    free(stale_list);
1708    free(file->config);
1709    free(file);
1710}
1711
1712// permanently remove file from cache (not just close)
1713// LCOV_EXCL_START
1714void filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)
1715{
1716    struct hash_elem *ret;
1717
1718    if (!file || atomic_get_uint32_t(&file->ref_count) > 0) {
1719        return;
1720    }
1721
1722    // remove from global hash table
1723    spin_lock(&filemgr_openlock);
1724    ret = hash_remove(&hash, &file->e);
1725    fdb_assert(ret, ret, NULL);
1726    spin_unlock(&filemgr_openlock);
1727
1728    struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1729
1730    if (!lazy_file_deletion_enabled ||
1731        (new_file && new_file->in_place_compaction)) {
1732        filemgr_free_func(&file->e);
1733    } else {
1734        register_file_removal(file, log_callback);
1735    }
1736}
1737// LCOV_EXCL_STOP
1738
1739static
1740void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1741    struct filemgr *file = _get_entry(h, struct filemgr, e);
1742    void *ret;
1743    spin_lock(&file->lock);
1744    if (atomic_get_uint32_t(&file->ref_count) != 0) {
1745        ret = (void *)file;
1746    } else {
1747        ret = NULL;
1748    }
1749    spin_unlock(&file->lock);
1750    return ret;
1751}
1752
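// Shuts down the global filemgr module. Shutdown only proceeds when no
// registered file is still referenced; if any file is in use, the global
// state is left intact and FDB_RESULT_FILE_IS_BUSY is returned.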
1753fdb_status filemgr_shutdown()
1754{
1755    fdb_status ret = FDB_RESULT_SUCCESS;
1756    void *open_file;
1757    if (filemgr_initialized) {
1758
1759#ifndef SPIN_INITIALIZER
1760        // Windows: check if spin lock is already destroyed.
1761        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
1762            spin_lock(&initial_lock);
1763        } else {
1764            // filemgr is already shut down
1765            return ret;
1766        }
1767#else
1768        spin_lock(&initial_lock);
1769#endif
1770
1771        if (!filemgr_initialized) {
1772            // filemgr is already shut down
1773#ifdef SPIN_INITIALIZER
1774            spin_unlock(&initial_lock);
1775#endif
1776            return ret;
1777        }
1778
1779        spin_lock(&filemgr_openlock);
1780        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
1781        spin_unlock(&filemgr_openlock);
1782        if (!open_file) {
1783            hash_free_active(&hash, filemgr_free_func);
1784            if (global_config.ncacheblock > 0) {
1785                bcache_shutdown();
1786            }
1787            filemgr_initialized = 0;
1788#ifndef SPIN_INITIALIZER
1789            initial_lock_status = 0;
1790#else
1791            initial_lock = SPIN_INITIALIZER;
1792#endif
1793            _filemgr_shutdown_temp_buf();
1794            spin_unlock(&initial_lock);
1795#ifndef SPIN_INITIALIZER
1796            spin_destroy(&initial_lock);
1797#endif
1798        } else {
1799            spin_unlock(&initial_lock);
1800            ret = FDB_RESULT_FILE_IS_BUSY;
1801        }
1802    }
1803    return ret;
1804}
1805
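// Allocates a single block. If the file has a superblock, is in FILE_NORMAL
// status, and a reusable block is available, the block comes from the
// superblock allocator; otherwise it is appended at the end of the file.
// When the block cache is disabled, the last byte of the new block is
// pre-written so that the file is physically extended before the block is used.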
1806bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1807{
1808    spin_lock(&file->lock);
1809    bid_t bid = BLK_NOT_FOUND;
1810
1811    // block reuse is not allowed for a file that is being compacted,
1812    // to keep the implementation simple.
1813    if (filemgr_get_file_status(file) == FILE_NORMAL &&
1814        file->sb && sb_ops.alloc_block) {
1815        bid = sb_ops.alloc_block(file);
1816    }
1817    if (bid == BLK_NOT_FOUND) {
1818        bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1819        atomic_add_uint64_t(&file->pos, file->blocksize);
1820    }
1821
1822    if (global_config.ncacheblock <= 0) {
1823        // if block cache is turned off, write the allocated block before use
1824        uint8_t _buf = 0x0;
1825        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1826                                       (bid+1) * file->blocksize - 1);
1827        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1828    }
1829    spin_unlock(&file->lock);
1830
1831    return bid;
1832}
1833
1834// Note that neither alloc_multiple nor alloc_multiple_cond is used in
1835// the new version of the DB file (with superblock support).
1836void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1837                            bid_t *end, err_log_callback *log_callback)
1838{
1839    spin_lock(&file->lock);
1840    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1841    *end = *begin + nblock - 1;
1842    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1843
1844    if (global_config.ncacheblock <= 0) {
1845        // if block cache is turned off, write the allocated block before use
1846        uint8_t _buf = 0x0;
1847        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1848                                       atomic_get_uint64_t(&file->pos) - 1);
1849        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1850    }
1851    spin_unlock(&file->lock);
1852}
1853
1854// atomically allocate 'nblock' blocks only when the current file position corresponds to nextbid
1855bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1856                                  bid_t *begin, bid_t *end,
1857                                  err_log_callback *log_callback)
1858{
1859    bid_t bid;
1860    spin_lock(&file->lock);
1861    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1862    if (bid == nextbid) {
1863        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1864        *end = *begin + nblock - 1;
1865        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1866
1867        if (global_config.ncacheblock <= 0) {
1868            // if block cache is turned off, write the allocated block before use
1869            uint8_t _buf = 0x0;
1870            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1871                                           atomic_get_uint64_t(&file->pos));
1872            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1873        }
1874    } else {
1875        *begin = BLK_NOT_FOUND;
1876        *end = BLK_NOT_FOUND;
1877    }
1878    spin_unlock(&file->lock);
1879    return bid;
1880}
1881
1882#ifdef __CRC32
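// Verifies the checksum of an index (B+tree node) block. The stored CRC is
// computed with the CRC field itself filled with 0xff bytes, so the check
// re-fills that field before recomputing (mirroring the write path in
// filemgr_write_offset).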
1883INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1884{
1885    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1886        uint32_t crc_file = 0;
1887        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1888        crc_file = _endian_decode(crc_file);
1889        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1890        if (!perform_integrity_check(reinterpret_cast<const uint8_t*>(buf),
1891                                     file->blocksize,
1892                                     crc_file,
1893                                     file->crc_mode)) {
1894            return FDB_RESULT_CHECKSUM_ERROR;
1895        }
1896    }
1897    return FDB_RESULT_SUCCESS;
1898}
1899#endif
1900
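// Invalidates a block in the buffer cache (if the cache is enabled).
// Returns true when the invalidated block lies beyond the last committed
// offset (i.e., it was allocated after the last commit and is still
// uncommitted), and false otherwise.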
1901bool filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1902{
1903    bool ret;
1904    if (atomic_get_uint64_t(&file->last_commit) < bid * file->blocksize) {
1905        ret = true; // the invalidated block was allocated recently (uncommitted)
1906    } else {
1907        ret = false; // a block from the past is invalidated (committed)
1908    }
1909    if (global_config.ncacheblock > 0) {
1910        bcache_invalidate_block(file, bid);
1911    }
1912    return ret;
1913}
1914
1915bool filemgr_is_fully_resident(struct filemgr *file)
1916{
1917    bool ret = false;
1918    if (global_config.ncacheblock > 0) {
1919        // TODO: A better approach is to track the number of document blocks
1920        // and compare only those against the cached document block count
1921        double num_cached_blocks = (double)bcache_get_num_blocks(file);
1922        uint64_t num_blocks = atomic_get_uint64_t(&file->pos)
1923                                 / file->blocksize;
1924        double num_fblocks = (double)num_blocks;
1925        if (num_cached_blocks > num_fblocks * FILEMGR_RESIDENT_THRESHOLD) {
1926            ret = true;
1927        }
1928    }
1929    return ret;
1930}
1931
1932uint64_t filemgr_flush_immutable(struct filemgr *file,
1933                                   err_log_callback *log_callback)
1934{
1935    uint64_t ret = 0;
1936    if (global_config.ncacheblock > 0) {
1937        if (atomic_get_uint8_t(&file->io_in_prog)) {
1938            return 0;
1939        }
1940        ret = bcache_get_num_immutable(file);
1941        if (!ret) {
1942            return ret;
1943        }
1944        fdb_status rv = bcache_flush_immutable(file);
1945        if (rv != FDB_RESULT_SUCCESS) {
1946            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "WRITE",
1947                           file->filename);
1948        }
1949        return bcache_get_num_immutable(file);
1950    }
1951
1952    return ret;
1953}
1954
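// Reads one block. With the block cache enabled, the cache is consulted
// first; on a miss (and if read_on_cache_miss allows it) the block is read
// from disk, its checksum is verified for index blocks, and it is inserted
// into the cache as a clean block. Blocks that are still writable
// (uncommitted) are read under a per-block data lock because a writer may
// be updating them concurrently; committed blocks are immutable and need
// no locking.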
1955fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1956                        err_log_callback *log_callback,
1957                        bool read_on_cache_miss)
1958{
1959    size_t lock_no;
1960    ssize_t r;
1961    uint64_t pos = bid * file->blocksize;
1962    fdb_status status = FDB_RESULT_SUCCESS;
1963    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
1964
1965    if (pos >= curr_pos) {
1966        const char *msg = "Read error: read offset %" _F64 " exceeds the file's "
1967                          "current offset %" _F64 " in a database file '%s'\n";
1968        fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, pos, curr_pos,
1969                file->filename);
1970        return FDB_RESULT_READ_FAIL;
1971    }
1972
1973    if (global_config.ncacheblock > 0) {
1974        lock_no = bid % DLOCK_MAX;
1975        (void)lock_no;
1976
1977#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1978        plock_entry_t *plock_entry = NULL;
1979        bid_t is_writer = 0;
1980#endif
1981        bool locked = false;
1982        // Note: we don't need to grab lock for committed blocks
1983        // because they are immutable so that no writer will interfere and
1984        // overwrite dirty data
1985        if (filemgr_is_writable(file, bid)) {
1986#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1987            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1988#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1989            mutex_lock(&file->data_mutex[lock_no]);
1990#else
1991            spin_lock(&file->data_spinlock[lock_no]);
1992#endif //__FILEMGR_DATA_PARTIAL_LOCK
1993            locked = true;
1994        }
1995
1996        r = bcache_read(file, bid, buf);
1997        if (r == 0) {
1998            // cache miss
1999            if (!read_on_cache_miss) {
2000                if (locked) {
2001#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2002                    plock_unlock(&file->plock, plock_entry);
2003#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2004                    mutex_unlock(&file->data_mutex[lock_no]);
2005#else
2006                    spin_unlock(&file->data_spinlock[lock_no]);
2007#endif //__FILEMGR_DATA_PARTIAL_LOCK
2008                }
2009                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2010                    "doesn't exist in the cache and read_on_cache_miss flag is turned on.\n";
2011                fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2012                        file->filename);
2013                return FDB_RESULT_READ_FAIL;
2014            }
2015
2016            // if normal file, just read a block
2017            r = filemgr_read_block(file, buf, bid);
2018            if (r != (ssize_t)file->blocksize) {
2019                _log_errno_str(file->ops, log_callback,
2020                               (fdb_status) r, "READ", file->filename);
2021                if (locked) {
2022#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2023                    plock_unlock(&file->plock, plock_entry);
2024#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2025                    mutex_unlock(&file->data_mutex[lock_no]);
2026#else
2027                    spin_unlock(&file->data_spinlock[lock_no]);
2028#endif //__FILEMGR_DATA_PARTIAL_LOCK
2029                }
2030                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2031                    "is not read correctly: only %d bytes read.\n";
2032                status = r < 0 ? (fdb_status)r : FDB_RESULT_READ_FAIL;
2033                fdb_log(log_callback, status, msg, bid, file->filename, r);
2034                if (!log_callback || !log_callback->callback) {
2035                    dbg_print_buf(buf, file->blocksize, true, 16);
2036                }
2037                return status;
2038            }
2039#ifdef __CRC32
2040            status = _filemgr_crc32_check(file, buf);
2041            if (status != FDB_RESULT_SUCCESS) {
2042                _log_errno_str(file->ops, log_callback, status, "READ",
2043                        file->filename);
2044                if (locked) {
2045#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2046                    plock_unlock(&file->plock, plock_entry);
2047#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2048                    mutex_unlock(&file->data_mutex[lock_no]);
2049#else
2050                    spin_unlock(&file->data_spinlock[lock_no]);
2051#endif //__FILEMGR_DATA_PARTIAL_LOCK
2052                }
2053                const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2054                    ": marker %x\n";
2055                fdb_log(log_callback, status, msg, bid,
2056                        file->filename, *((uint8_t*)buf + file->blocksize-1));
2057                if (!log_callback || !log_callback->callback) {
2058                    dbg_print_buf(buf, file->blocksize, true, 16);
2059                }
2060                return status;
2061            }
2062#endif
2063            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN, false);
2064            if (r != global_config.blocksize) {
2065                if (locked) {
2066#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2067                    plock_unlock(&file->plock, plock_entry);
2068#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2069                    mutex_unlock(&file->data_mutex[lock_no]);
2070#else
2071                    spin_unlock(&file->data_spinlock[lock_no]);
2072#endif //__FILEMGR_DATA_PARTIAL_LOCK
2073                }
2074                _log_errno_str(file->ops, log_callback,
2075                               (fdb_status) r, "WRITE", file->filename);
2076                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2077                    "is not written in cache correctly: only %d bytes written.\n";
2078                status = r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2079                fdb_log(log_callback, status, msg, bid, file->filename, r);
2080                if (!log_callback || !log_callback->callback) {
2081                    dbg_print_buf(buf, file->blocksize, true, 16);
2082                }
2083                return status;
2084            }
2085        }
2086        if (locked) {
2087#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2088            plock_unlock(&file->plock, plock_entry);
2089#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2090            mutex_unlock(&file->data_mutex[lock_no]);
2091#else
2092            spin_unlock(&file->data_spinlock[lock_no]);
2093#endif //__FILEMGR_DATA_PARTIAL_LOCK
2094        }
2095    } else {
2096        if (!read_on_cache_miss) {
2097            const char *msg = "Read error: BID %" _F64 " in a database file '%s':"
2098                "block cache is not enabled.\n";
2099            fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2100                    file->filename);
2101            return FDB_RESULT_READ_FAIL;
2102        }
2103
2104        r = filemgr_read_block(file, buf, bid);
2105        if (r != (ssize_t)file->blocksize) {
2106            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
2107                           file->filename);
2108            const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2109                "is not read correctly: only %d bytes read (block cache disabled).\n";
2110            status = (r < 0)? (fdb_status)r : FDB_RESULT_READ_FAIL;
2111            fdb_log(log_callback, status, msg, bid, file->filename, r);
2112            if (!log_callback || !log_callback->callback) {
2113                dbg_print_buf(buf, file->blocksize, true, 16);
2114            }
2115            return status;
2116        }
2117
2118#ifdef __CRC32
2119        status = _filemgr_crc32_check(file, buf);
2120        if (status != FDB_RESULT_SUCCESS) {
2121            _log_errno_str(file->ops, log_callback, status, "READ",
2122                           file->filename);
2123            const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2124                ": marker %x (block cache disabled)\n";
2125            fdb_log(log_callback, status, msg, bid,
2126                    file->filename, *((uint8_t*)buf + file->blocksize-1));
2127            if (!log_callback || !log_callback->callback) {
2128                dbg_print_buf(buf, file->blocksize, true, 16);
2129            }
2130            return status;
2131        }
2132#endif
2133    }
2134    return status;
2135}
2136
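// Writes 'len' bytes at byte 'offset' within block 'bid'. A write may not
// cross a block boundary. When the superblock bitmap is in use, the target
// block must be marked writable (reusable) by the superblock; otherwise,
// writes behind the last commit offset are rejected, except within the
// superblock region itself. With the block cache enabled, a full-block
// write goes straight into the cache as a dirty block, while a partial
// write performs a read-modify-write through a temporary buffer. With the
// cache disabled, index blocks get their CRC stamped and the data is
// written directly to disk.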
2137fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
2138                                uint64_t offset, uint64_t len, void *buf,
2139                                bool final_write,
2140                                err_log_callback *log_callback)
2141{
2142    size_t lock_no;
2143    ssize_t r = 0;
2144    uint64_t pos = bid * file->blocksize + offset;
2145    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
2146
2147    if (offset + len > file->blocksize) {
2148        const char *msg = "Write error: trying to write the buffer data "
2149            "(offset: %" _F64 ", len: %" _F64 " that exceeds the block size "
2150            "%" _F64 " in a database file '%s'\n";
2151        fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, offset, len,
2152                file->blocksize, file->filename);
2153        return FDB_RESULT_WRITE_FAIL;
2154    }
2155
2156    if (sb_bmp_exists(file->sb)) {
2157        // block reusing is enabled
2158        if (!sb_ops.is_writable(file, bid)) {
2159            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2160                              "not identified as a reusable block in "
2161                              "a database file '%s'\n";
2162            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, file->filename);
2163            return FDB_RESULT_WRITE_FAIL;
2164        }
2165    } else if (pos < curr_commit_pos) {
2166        // stale blocks are not reused yet
2167        if (file->sb == NULL ||
2168            (file->sb && pos >= file->sb->config->num_sb * file->blocksize)) {
2169            // (non-sequential update is exceptionally allowed for superblocks)
2170            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2171                              "smaller than the current commit offset %" _F64 " in "
2172                              "a database file '%s'\n";
2173            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, curr_commit_pos,
2174                    file->filename);
2175            return FDB_RESULT_WRITE_FAIL;
2176        }
2177    }
2178
2179    if (global_config.ncacheblock > 0) {
2180        lock_no = bid % DLOCK_MAX;
2181        (void)lock_no;
2182
2183        bool locked = false;
2184#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2185        plock_entry_t *plock_entry;
2186        bid_t is_writer = 1;
2187        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2188#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2189        mutex_lock(&file->data_mutex[lock_no]);
2190#else
2191        spin_lock(&file->data_spinlock[lock_no]);
2192#endif //__FILEMGR_DATA_PARTIAL_LOCK
2193        locked = true;
2194
2195        if (len == file->blocksize) {
2196            // write entire block .. we don't need to read previous block
2197            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY, final_write);
2198            if (r != global_config.blocksize) {
2199                if (locked) {
2200#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2201                    plock_unlock(&file->plock, plock_entry);
2202#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2203                    mutex_unlock(&file->data_mutex[lock_no]);
2204#else
2205                    spin_unlock(&file->data_spinlock[lock_no]);
2206#endif //__FILEMGR_DATA_PARTIAL_LOCK
2207                }
2208                _log_errno_str(file->ops, log_callback,
2209                               (fdb_status) r, "WRITE", file->filename);
2210                return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2211            }
2212        } else {
2213            // partially write buffer cache first
2214            r = bcache_write_partial(file, bid, buf, offset, len, final_write);
2215            if (r == 0) {
2216                // cache miss
2217                // write partially .. we have to read previous contents of the block
2218                int64_t cur_file_pos = file->ops->goto_eof(file->fd);
2219                if (cur_file_pos < 0) {
2220                    _log_errno_str(file->ops, log_callback,
2221                                   (fdb_status) cur_file_pos, "EOF", file->filename);
2222                    return (fdb_status) cur_file_pos;
2223                }
2224                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
2225                void *_buf = _filemgr_get_temp_buf();
2226
2227                if (bid >= cur_file_last_bid) {
2228                    // this is the first write to this block, so we don't
2229                    // need to read its previous contents from the file.
2230                } else {
2231                    r = filemgr_read_block(file, _buf, bid);
2232                    if (r != (ssize_t)file->blocksize) {
2233                        if (locked) {
2234#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2235                            plock_unlock(&file->plock, plock_entry);
2236#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2237                            mutex_unlock(&file->data_mutex[lock_no]);
2238#else
2239                            spin_unlock(&file->data_spinlock[lock_no]);
2240#endif //__FILEMGR_DATA_PARTIAL_LOCK
2241                        }
2242                        _filemgr_release_temp_buf(_buf);
2243                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
2244                                       "READ", file->filename);
2245                        return r < 0 ? (fdb_status) r : FDB_RESULT_READ_FAIL;
2246                    }
2247                }
2248                memcpy((uint8_t *)_buf + offset, buf, len);
2249                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY, final_write);
2250                if (r != global_config.blocksize) {
2251                    if (locked) {
2252#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2253                        plock_unlock(&file->plock, plock_entry);
2254#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2255                        mutex_unlock(&file->data_mutex[lock_no]);
2256#else
2257                        spin_unlock(&file->data_spinlock[lock_no]);
2258#endif //__FILEMGR_DATA_PARTIAL_LOCK
2259                    }
2260                    _filemgr_release_temp_buf(_buf);
2261                    _log_errno_str(file->ops, log_callback,
2262                            (fdb_status) r, "WRITE", file->filename);
2263                    return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2264                }
2265
2266                _filemgr_release_temp_buf(_buf);
2267            } // cache miss
2268        } // full block or partial block
2269
2270        if (locked) {
2271#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2272            plock_unlock(&file->plock, plock_entry);
2273#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2274            mutex_unlock(&file->data_mutex[lock_no]);
2275#else
2276            spin_unlock(&file->data_spinlock[lock_no]);
2277#endif //__FILEMGR_DATA_PARTIAL_LOCK
2278        }
2279    } else { // block cache disabled
2280
2281#ifdef __CRC32
2282        if (len == file->blocksize) {
2283            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
2284            if (marker == BLK_MARKER_BNODE) {
2285                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
2286                uint32_t crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
2287                                              file->blocksize,
2288                                              file->crc_mode);
2289                crc32 = _endian_encode(crc32);
2290                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
2291            }
2292        }
2293#endif
2294
2295        r = file->ops->pwrite(file->fd, buf, len, pos);
2296        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
2297        if ((uint64_t)r != len) {
2298            return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2299        }
2300    } // block cache check
2301    return FDB_RESULT_SUCCESS;
2302}
2303
2304fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
2305                   err_log_callback *log_callback)
2306{
2307    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
2308                                false, // TODO: track immutability of index blk
2309                                log_callback);
2310}
2311
2312fdb_status filemgr_commit(struct filemgr *file, bool sync,
2313                          err_log_callback *log_callback)
2314{
2315    // append header at the end of the file
2316    uint64_t bmp_revnum = 0;
2317    if (sb_ops.get_bmp_revnum) {
2318        bmp_revnum = sb_ops.get_bmp_revnum(file);
2319    }
2320    return filemgr_commit_bid(file, BLK_NOT_FOUND, bmp_revnum,
2321                              sync, log_callback);
2322}
2323
2324fdb_status filemgr_commit_bid(struct filemgr *file, bid_t bid,
2325                              uint64_t bmp_revnum, bool sync,
2326                              err_log_callback *log_callback)
2327{
2328    struct avl_node *a;
2329    struct kvs_node *node;
2330    bid_t prev_bid, _prev_bid;
2331    uint64_t _deltasize, _bmp_revnum;
2332    fdb_seqnum_t _seqnum;
2333    filemgr_header_revnum_t _revnum;
2334    int result = FDB_RESULT_SUCCESS;
2335    bool block_reusing = false;
2336
2337    filemgr_set_io_inprog(file);
2338    if (global_config.ncacheblock > 0) {
2339        result = bcache_flush(file);
2340        if (result != FDB_RESULT_SUCCESS) {
2341            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2342                           "FLUSH", file->filename);
2343            filemgr_clear_io_inprog(file);
2344            return (fdb_status)result;
2345        }
2346    }
2347
2348    spin_lock(&file->lock);
2349
2350    uint16_t header_len = file->header.size;
2351    struct kvs_header *kv_header = file->kv_header;
2352    filemgr_magic_t magic = file->version;
2353
2354    if (file->header.size > 0 && file->header.data) {
2355        void *buf = _filemgr_get_temp_buf();
2356        uint8_t marker[BLK_MARKER_SIZE];
2357
2358        // [header data]:        'header_len' bytes   <---+
2359        // [header revnum]:      8 bytes                  |
2360        // [default KVS seqnum]: 8 bytes                  |
2361        // ...                                            |
2362        // (empty)                                    blocksize
2363        // ...                                            |
2364        // [SB bitmap revnum]:   8 bytes                  |
2365        // [Delta size]:         8 bytes                  |
2366        // [prev header bid]:    8 bytes                  |
2367        // [header length]:      2 bytes                  |
2368        // [magic number]:       8 bytes                  |
2369        // [block marker]:       1 byte               <---+
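        // Illustration of the trailing-field offsets used in the memcpy()
        // calls below (fields are laid out back-to-front from the block end):
        //   marker  @ blocksize - BLK_MARKER_SIZE
        //   magic   @ blocksize - BLK_MARKER_SIZE - sizeof(magic)
        //   hdr_len @ blocksize - BLK_MARKER_SIZE - sizeof(magic) - sizeof(header_len)
        // and so on for the prev header bid, delta size, and SB bitmap revnum.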
2370
2371        // header data
2372        memcpy(buf, file->header.data, header_len);
2373        // header rev number
2374        _revnum = _endian_encode(file->header.revnum);
2375        memcpy((uint8_t *)buf + header_len, &_revnum,
2376               sizeof(filemgr_header_revnum_t));
2377        // file's sequence number (default KVS seqnum)
2378        _seqnum = _endian_encode(file->header.seqnum.load());
2379        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
2380               &_seqnum, sizeof(fdb_seqnum_t));
2381
2382        // current header's sb bmp revision number
2383        if (file->sb) {
2384            _bmp_revnum = _endian_encode(bmp_revnum);
2385            memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2386                   - sizeof(header_len) - sizeof(_prev_bid)
2387                   - sizeof(_deltasize) - sizeof(_bmp_revnum)
2388                   - BLK_MARKER_SIZE),
2389                   &_bmp_revnum, sizeof(_bmp_revnum));
2390        }
2391
2392        // delta size since prior commit
2393        _deltasize = _endian_encode(file->header.stat.deltasize //index+data
2394                                  + wal_get_datasize(file)); // wal datasize
2395        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2396               - sizeof(header_len) - sizeof(_prev_bid)*2 - BLK_MARKER_SIZE),
2397               &_deltasize, sizeof(_deltasize));
2398
2399        // Reset in-memory delta size of the header for next commit...
2400        file->header.stat.deltasize = 0; // single kv store header
2401        if (kv_header) { // multi kv store stats
2402            a = avl_first(kv_header->idx_id);
2403            while (a) {
2404                node = _get_entry(a, struct kvs_node, avl_id);
2405                a = avl_next(&node->avl_id);
2406                node->stat.deltasize = 0;
2407            }
2408        }
2409
2410        // prev header bid
2411        prev_bid = atomic_get_uint64_t(&file->header.bid);
2412        _prev_bid = _endian_encode(prev_bid);
2413        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2414               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
2415               &_prev_bid, sizeof(_prev_bid));
2416        // header length
2417        header_len = _endian_encode(header_len);
2418        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2419               - sizeof(header_len) - BLK_MARKER_SIZE),
2420               &header_len, sizeof(header_len));
2421        // magic number
2422        magic = _endian_encode(magic);
2423        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
2424               - BLK_MARKER_SIZE), &magic, sizeof(magic));
2425
2426        // marker
2427        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
2428        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
2429               marker, BLK_MARKER_SIZE);
2430
2431        if (bid == BLK_NOT_FOUND) {
2432            // append header at the end of file
2433            bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
2434            block_reusing = false;
2435        } else {
2436            // write header in the allocated (reused) block
2437            block_reusing = true;
2438            // we MUST invalidate the header block 'bid', since previous
2439            // contents of 'bid' may remain in block cache and cause data
2440            // inconsistency if reading header block hits the cache.
2441            bcache_invalidate_block(file, bid);
2442        }
2443
2444        ssize_t rv = filemgr_write_blocks(file, buf, 1, bid);
2445        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
2446                       "WRITE", file->filename);
2447        if (rv != (ssize_t)file->blocksize) {
2448            _filemgr_release_temp_buf(buf);
2449            spin_unlock(&file->lock);
2450            filemgr_clear_io_inprog(file);
2451            return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
2452        }
2453
2454        if (prev_bid) {
2455            // mark prev DB header as stale
2456            filemgr_add_stale_block(file, prev_bid * file->blocksize, file->blocksize);
2457        }
2458
2459        atomic_store_uint64_t(&file->header.bid, bid);
2460        if (!block_reusing) {
2461            atomic_add_uint64_t(&file->pos, file->blocksize);
2462        }
2463
2464        _filemgr_release_temp_buf(buf);
2465    }
2466
2467    if (sb_bmp_exists(file->sb) &&
2468        atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND &&
2469        atomic_get_uint8_t(&file->status) == FILE_NORMAL) {
2470        // block reusing is currently enabled
2471        atomic_store_uint64_t(&file->last_commit,
2472            atomic_get_uint64_t(&file->sb->cur_alloc_bid) * file->blocksize);
2473    } else {
2474        atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
2475    }
2476    if (file->sb) {
2477        // Some more blocks may be allocated after the header block
2478        // (for storing BMP data or system docs for stale info),
2479        // so the block pointed to by 'cur_alloc_bid' may have a
2480        // different BMP revision number. Hence we have to use the
2481        // up-to-date bmp_revnum here.
2482        atomic_store_uint64_t(&file->last_writable_bmp_revnum,
2483                              filemgr_get_sb_bmp_revnum(file));
2484    }
2485
2486    spin_unlock(&file->lock);
2487
2488    if (sync) {
2489        result = file->ops->fsync(file->fd);
2490        _log_errno_str(file->ops, log_callback, (fdb_status)result,
2491                       "FSYNC", file->filename);
2492    }
2493    filemgr_clear_io_inprog(file);
2494    return (fdb_status) result;
2495}
2496
2497fdb_status filemgr_sync(struct filemgr *file, bool sync_option,
2498                        err_log_callback *log_callback)
2499{
2500    fdb_status result = FDB_RESULT_SUCCESS;
2501    if (global_config.ncacheblock > 0) {
2502        result = bcache_flush(file);
2503        if (result != FDB_RESULT_SUCCESS) {
2504            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2505                           "FLUSH", file->filename);
2506            return result;
2507        }
2508    }
2509
2510    if (sync_option && file->fflags & FILEMGR_SYNC) {
2511        int rv = file->ops->fsync(file->fd);
2512        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
2513        return (fdb_status) rv;
2514    }
2515    return result;
2516}
2517
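// Clones 'clone_len' blocks from src_file starting at src_bid into dst_file
// at dst_bid using the underlying filesystem's copy/clone facility, then
// sets dst_file's in-memory position to the end of the copied range.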
2518fdb_status filemgr_copy_file_range(struct filemgr *src_file,
2519                                   struct filemgr *dst_file,
2520                                   bid_t src_bid, bid_t dst_bid,
2521                                   bid_t clone_len)
2522{
2523    uint32_t blocksize = src_file->blocksize;
2524    fdb_status fs = (fdb_status)dst_file->ops->copy_file_range(
2525                                            src_file->fs_type,
2526                                            src_file->fd,
2527                                            dst_file->fd,
2528                                            src_bid * blocksize,
2529                                            dst_bid * blocksize,
2530                                            clone_len * blocksize);
2531    if (fs != FDB_RESULT_SUCCESS) {
2532        return fs;
2533    }
2534    atomic_store_uint64_t(&dst_file->pos, (dst_bid + clone_len) * blocksize);
2535    return FDB_RESULT_SUCCESS;
2536}
2537
2538void filemgr_update_file_status(struct filemgr *file, file_status_t status)
2539{
2540    spin_lock(&file->lock);
2541    atomic_store_uint8_t(&file->status, status);
2542    spin_unlock(&file->lock);
2543}
2544
2545static void assign_old_filename(struct filemgr *file, const char *old_filename)
2546{
2547    free(file->old_filename);
2548    if (old_filename) {
2549        file->old_filename = (char*)malloc(strlen(old_filename) + 1);
2550        strcpy(file->old_filename, old_filename);
2551    } else {
2552        file->old_filename = NULL;
2553    }
2554}
2555
2556static void assign_new_filename(struct filemgr *file, const char *new_filename)
2557{
2558    free(file->new_filename);
2559    if (new_filename) {
2560        file->new_filename = (char*)malloc(strlen(new_filename) + 1);
2561        strcpy(file->new_filename, new_filename);
2562    } else {
2563        file->new_filename = NULL;
2564    }
2565}
2566
2567bool filemgr_update_file_linkage(struct filemgr *file,
2568                                 const char *old_filename,
2569                                 const char *new_filename)
2570{
2571
2572    bool ret = true;
2573    spin_lock(&file->lock);
2574    if (old_filename) {
2575        if (!file->old_filename) {
2576            assign_old_filename(file, old_filename);
2577        } else {
2578            ret = false;
2579            fdb_assert(atomic_get_uint32_t(&file->ref_count),
2580                       atomic_get_uint32_t(&file->ref_count), 0);
2581        }
2582    }
2583    if (new_filename) {
2584        assign_new_filename(file, new_filename);
2585    }
2586    spin_unlock(&file->lock);
2587    return ret;
2588}
2589
2590void filemgr_set_compaction_state(struct filemgr *old_file,
2591                                  struct filemgr *new_file,
2592                                  file_status_t status)
2593{
2594    if (old_file) {
2595        spin_lock(&old_file->lock);
2596        assign_new_filename(old_file, new_file ? new_file->filename : NULL);
2597        atomic_store_uint8_t(&old_file->status, status);
2598        spin_unlock(&old_file->lock);
2599
2600        if (new_file) {
2601            spin_lock(&new_file->lock);
2602            assign_old_filename(new_file, old_file->filename);
2603            spin_unlock(&new_file->lock);
2604        }
2605    }
2606}
2607
2608bool filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header,
2609                           void (*free_kv_header)(struct filemgr *file))
2610{
2611    bool ret;
2612    spin_lock(&file->lock);
2613
2614    if (!file->kv_header) {
2615        file->kv_header = kv_header;
2616        file->free_kv_header = free_kv_header;
2617        ret = true;
2618    } else {
2619        ret = false;
2620    }
2621
2622    spin_unlock(&file->lock);
2623
2624    return ret;
2625}
2626
2627struct kvs_header *filemgr_get_kv_header(struct filemgr *file)
2628{
2629    struct kvs_header *kv_header = NULL;
2630    spin_lock(&file->lock);
2631    kv_header = file->kv_header;
2632    spin_unlock(&file->lock);
2633    return kv_header;
2634}
2635
2636// Check if there is a file that still points to the old_file that is being
2637// compacted away. If so, open the file and return its pointer.
2638static
2639void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
2640    struct filemgr *cur_file = (struct filemgr *)ctx;
2641    struct filemgr *file = _get_entry(h, struct filemgr, e);
2642    spin_lock(&file->lock);
2643    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
2644        !strcmp(file->new_filename, cur_file->filename)) {
2645        // Incrementing reference counter below is the same as filemgr_open()
2646        // We need to do this to ensure that the pointer returned does not
2647        // get freed outside the filemgr_open lock
2648        atomic_incr_uint32_t(&file->ref_count);
2649        spin_unlock(&file->lock);
2650        return (void *)file;
2651    }
2652    spin_unlock(&file->lock);
2653    return (void *)NULL;
2654}
2655
2656struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
2657    struct filemgr *very_old_file;
2658    spin_lock(&filemgr_openlock);
2659    very_old_file = (struct filemgr *)hash_scan(&hash,
2660                                         _filemgr_check_stale_link, cur_file);
2661    spin_unlock(&filemgr_openlock);
2662    return very_old_file;
2663}
2664
2665char *filemgr_redirect_old_file(struct filemgr *very_old_file,
2666                                struct filemgr *new_file,
2667                                filemgr_redirect_hdr_func
2668                                redirect_header_func) {
2669    if (!very_old_file || !new_file) {
2670        return NULL;
2671    }
2672
2673    size_t old_header_len, new_header_len;
2674    uint16_t new_filename_len;
2675    char *past_filename;
2676    spin_lock(&very_old_file->lock);
2677
2678    struct filemgr *new_file_of_very_old_file =
2679        filemgr_get_instance(very_old_file->new_filename);
2680
2681    if (very_old_file->header.size == 0 || !new_file_of_very_old_file) {
2682        spin_unlock(&very_old_file->lock);
2683        return NULL;
2684    }
2685
2686    old_header_len = very_old_file->header.size;
2687    new_filename_len = strlen(new_file->filename);
2688    // Find out the new DB header length with new_file's filename
2689    new_header_len = old_header_len
2690                     - strlen(new_file_of_very_old_file->filename)
2691                     + new_filename_len;
2692    // As we are going to change the new_filename field in the DB header of the
2693    // very_old_file, we may need to reallocate the header buffer to accommodate a bigger value
2694    if (new_header_len > old_header_len) {
2695        very_old_file->header.data = realloc(very_old_file->header.data,
2696                                             new_file->blocksize);
2697    }
2698    // Re-direct very_old_file to new_file
2699    assign_new_filename(very_old_file, new_file->filename);
2700    // Note that the old_filename of the new_file is not updated; this is
2701    // so that every file in the history remains reachable from the current file.
2702
2703    past_filename = redirect_header_func(very_old_file,
2704                                         (uint8_t *)very_old_file->header.data,
2705                                         new_file);//Update in-memory header
2706    very_old_file->header.size = new_header_len;
2707    ++(very_old_file->header.revnum);
2708
2709    spin_unlock(&very_old_file->lock);
2710    return past_filename;
2711}
2712
2713void filemgr_remove_pending(struct filemgr *old_file,
2714                            struct filemgr *new_file,
2715                            err_log_callback *log_callback)
2716{
2717    if (new_file == NULL) {
2718        return;
2719    }
2720
2721    spin_lock(&old_file->lock);
2722    if (atomic_get_uint32_t(&old_file->ref_count) > 0) {
2723        // delay removing
2724        assign_new_filename(old_file, new_file->filename);
2725        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
2726
2727#if !(defined(WIN32) || defined(_WIN32))
2728        // Only for Posix
2729        int ret;
2730        ret = unlink(old_file->filename);
2731        _log_errno_str(old_file->ops, log_callback, (fdb_status)ret,
2732                       "UNLINK", old_file->filename);
2733#endif
2734
2735        spin_unlock(&old_file->lock);
2736
2737        // Update new_file's old_filename
2738        spin_lock(&new_file->lock);
2739        assign_old_filename(new_file, old_file->filename);
2740        spin_unlock(&new_file->lock);
2741    } else {
2742        // immediately remove
2743        // LCOV_EXCL_START
2744        spin_unlock(&old_file->lock);
2745
2746        struct filemgr *new_file_of_old_file =
2747            filemgr_get_instance(old_file->new_filename);
2748
2749        if (!lazy_file_deletion_enabled ||
2750            (new_file_of_old_file && new_file_of_old_file->in_place_compaction)) {
2751            remove(old_file->filename);
2752        }
2753        filemgr_remove_file(old_file, log_callback);
2754        // LCOV_EXCL_STOP
2755    }
2756}
2757
2758// migrate default kv store stats over to new_file
2759struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
2760                                              struct filemgr *new_file,
2761                                              struct kvs_info *kvs)
2762{
2763    kvs_ops_stat *ret = NULL;
2764    if (new_file == NULL) {
2765        return NULL;
2766    }
2767
2768    spin_lock(&old_file->lock);
2769    new_file->header.op_stat = old_file->header.op_stat;
2770    ret = &new_file->header.op_stat;
2771    spin_unlock(&old_file->lock);
2772    return ret;
2773}
2774
2775// Note: filemgr_openlock should be held before calling this function.
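// Destroys 'filename' and, recursively, any older files reachable through
// the old_filename chain (found either via an in-memory filemgr entry or by
// reading the on-disk DB header). 'destroy_file_set' tracks files already
// visited so the recursion terminates and no file is processed twice.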
2776fdb_status filemgr_destroy_file(char *filename,
2777                                struct filemgr_config *config,
2778                                struct hash *destroy_file_set)
2779{
2780    struct filemgr *file = NULL;
2781    struct hash to_destroy_files;
2782    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
2783                                                  &to_destroy_files);
2784    struct filemgr query;
2785    struct hash_elem *e = NULL;
2786    fdb_status status = FDB_RESULT_SUCCESS;
2787    char *old_filename = NULL;
2788
2789    if (!destroy_file_set) { // top level or non-recursive call
2790        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
2791    }
2792
2793    query.filename = filename;
2794    // check whether file is already being destroyed in parent recursive call
2795    e = hash_find(destroy_set, &query.e);
2796    if (e) { // Duplicate filename found, nothing to be done in this call
2797        if (!destroy_file_set) { // top level or non-recursive call
2798            hash_free(destroy_set);
2799        }
2800        return status;
2801    } else {
2802        // Remember file. Stack value ok IFF single direction recursion
2803        hash_insert(destroy_set, &query.e);
2804    }
2805
2806    // check global list of known files to see if it is already opened or not
2807    e = hash_find(&hash, &query.e);
2808    if (e) {
2809        // already opened (return existing structure)
2810        file = _get_entry(e, struct filemgr, e);
2811
2812        spin_lock(&file->lock);
2813        if (atomic_get_uint32_t(&file->ref_count)) {
2814            spin_unlock(&file->lock);
2815            status = FDB_RESULT_FILE_IS_BUSY;
2816            if (!destroy_file_set) { // top level or non-recursive call
2817                hash_free(destroy_set);
2818            }
2819            return status;
2820        }
2821        spin_unlock(&file->lock);
2822        if (file->old_filename) {
2823            status = filemgr_destroy_file(file->old_filename, config,
2824                                          destroy_set);
2825            if (status != FDB_RESULT_SUCCESS) {
2826                if (!destroy_file_set) { // top level or non-recursive call
2827                    hash_free(destroy_set);
2828                }
2829                return status;
2830            }
2831        }
2832
2833        // Cleanup file from in-memory as well as on-disk
2834        e = hash_remove(&hash, &file->e);
2835        fdb_assert(e, e, 0);
2836        filemgr_free_func(&file->e);
2837        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
2838            if (remove(filename)) {
2839                status = FDB_RESULT_FILE_REMOVE_FAIL;
2840            }
2841        }
2842    } else { // file not in memory, read on-disk to destroy older versions..
2843        file = (struct filemgr *)alca(struct filemgr, 1);
2844        memset(file, 0x0, sizeof(struct filemgr));
2845        file->filename = filename;
2846        file->ops = get_filemgr_ops();
2847        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
2848        file->blocksize = global_config.blocksize;
2849        file->config = (struct filemgr_config *)alca(struct filemgr_config, 1);
2850        *file->config = *config;
2851        fdb_init_encryptor(&file->encryption, &config->encryption_key);
2852        if (file->fd < 0) {
2853            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
2854                if (!destroy_file_set) { // top level or non-recursive call
2855                    hash_free(destroy_set);
2856                }
2857                return (fdb_status) file->fd;
2858            }
2859        } else { // file successfully opened, seek to end to get DB header
2860            cs_off_t offset = file->ops->goto_eof(file->fd);
2861            if (offset < 0) {
2862                if (!destroy_file_set) { // top level or non-recursive call
2863                    hash_free(destroy_set);
2864                }
2865                return (fdb_status) offset;
2866            } else { // Need to read DB header which contains old filename
2867                atomic_store_uint64_t(&file->pos, offset);
2868                // initialize CRC mode
2869                if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
2870                    file->crc_mode = CRC32;
2871                } else {
2872                    file->crc_mode = CRC_DEFAULT;
2873                }
2874
2875                status = _filemgr_load_sb(file, NULL);
2876                if (status != FDB_RESULT_SUCCESS) {
2877                    if (!destroy_file_set) { // top level or non-recursive call
2878                        hash_free(destroy_set);
2879                    }
2880                    file->ops->close(file->fd);
2881                    return status;
2882                }
2883
2884                status = _filemgr_read_header(file, NULL);
2885                if (status != FDB_RESULT_SUCCESS) {
2886                    if (!destroy_file_set) { // top level or non-recursive call
2887                        hash_free(destroy_set);
2888                    }
2889                    file->ops->close(file->fd);
2890                    if (sb_ops.release && file->sb) {
2891                        sb_ops.release(file);
2892                    }
2893                    return status;
2894                }
2895                if (file->header.data) {
2896                    size_t new_fnamelen_off =
2897                        ver_get_new_filename_off(file->version);
2898                    size_t old_fnamelen_off = new_fnamelen_off + 2;
2899                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
2900                                                     file->header.data
2901                                                     + new_fnamelen_off);
2902                    uint16_t new_filename_len =
2903                                      _endian_decode(*new_filename_len_ptr);
2904                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
2905                                                     file->header.data
2906                                                     + old_fnamelen_off);
2907                    uint16_t old_filename_len =
2908                                      _endian_decode(*old_filename_len_ptr);
2909                    old_filename = (char *)file->header.data + old_fnamelen_off
2910                                   + 2 + new_filename_len;
2911                    if (old_filename_len) {
2912                        status = filemgr_destroy_file(old_filename, config,
2913                                                      destroy_set);
2914                    }
2915                    free(file->header.data);
2916                }
2917                file->ops->close(file->fd);
2918                if (sb_ops.release && file->sb) {
2919                    sb_ops.release(file);
2920                }
2921                if (status == FDB_RESULT_SUCCESS) {
2922                    if (filemgr_does_file_exist(filename)
2923                                               == FDB_RESULT_SUCCESS) {
2924                        if (remove(filename)) {
2925                            status = FDB_RESULT_FILE_REMOVE_FAIL;
2926                        }
2927                    }
2928                }
2929            }
2930        }
2931    }
2932
2933    if (!destroy_file_set) { // top level or non-recursive call
2934        hash_free(destroy_set);
2935    }
2936
2937    return status;
2938}
2939
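// The next two functions query and set the FILEMGR_ROLLBACK_IN_PROG flag;
// both take file->lock so the flag stays consistent with other fflags updates.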
2940bool filemgr_is_rollback_on(struct filemgr *file)
2941{
2942    bool rv;
2943    spin_lock(&file->lock);
2944    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2945    spin_unlock(&file->lock);
2946    return rv;
2947}
2948
2949void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2950{
2951    spin_lock(&file->lock);
2952    if (new_val) {
2953        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2954    } else {
2955        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2956    }
2957    spin_unlock(&file->lock);
2958}
2959
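// The following pair sets and polls the FILEMGR_CANCEL_COMPACTION flag under
// file->lock, letting a running compaction task check whether cancellation
// has been requested.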
2960void filemgr_set_cancel_compaction(struct filemgr *file, bool cancel)
2961{
2962    spin_lock(&file->lock);
2963    if (cancel) {
2964        file->fflags |= FILEMGR_CANCEL_COMPACTION;
2965    } else {
2966        file->fflags &= ~FILEMGR_CANCEL_COMPACTION;
2967    }
2968    spin_unlock(&file->lock);
2969}
2970
2971bool filemgr_is_compaction_cancellation_requested(struct filemgr *file)
2972{
2973    bool rv;
2974    spin_lock(&file->lock);
2975    rv = (file->fflags & FILEMGR_CANCEL_COMPACTION);
2976    spin_unlock(&file->lock);
2977    return rv;
2978}
2979
2980void filemgr_set_in_place_compaction(struct filemgr *file,
2981                                     bool in_place_compaction) {
2982    spin_lock(&file->lock);
2983    file->in_place_compaction = in_place_compaction;
2984    spin_unlock(&file->lock);
2985}
2986
2987bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2989{
2990    bool ret = false;
2991    spin_lock(&file->lock);
2992    ret = file->in_place_compaction;
2993    spin_unlock(&file->lock);
2994    return ret;
2995}
2996
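// Make sure the file manager globals are initialized for this config, then
// take the global spin lock (filemgr_openlock) that serializes file-open paths.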
2997void filemgr_mutex_openlock(struct filemgr_config *config)
2998{
2999    filemgr_init(config);
3000
3001    spin_lock(&filemgr_openlock);
3002}
3003
3004void filemgr_mutex_openunlock(void)
3005{
3006    spin_unlock(&filemgr_openlock);
3007}
3008
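// Per-file writer lock. The 'locked' flag is set while the writer mutex is
// held so that filemgr_mutex_unlock() becomes a no-op when the caller does
// not actually hold it.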
3009void filemgr_mutex_lock(struct filemgr *file)
3010{
3011    mutex_lock(&file->writer_lock.mutex);
3012    file->writer_lock.locked = true;
3013}
3014
3015bool filemgr_mutex_trylock(struct filemgr *file) {
3016    if (mutex_trylock(&file->writer_lock.mutex)) {
3017        file->writer_lock.locked = true;
3018        return true;
3019    }
3020    return false;
3021}
3022
3023void filemgr_mutex_unlock(struct filemgr *file)
3024{
3025    if (file->writer_lock.locked) {
3026        file->writer_lock.locked = false;
3027        mutex_unlock(&file->writer_lock.mutex);
3028    }
3029}
3030
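// A commit header block ends with a one-byte BLK_MARKER_DBHEADER marker, and
// the file magic sits immediately before that marker; both are checked here.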
3031bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
3032{
3033    uint8_t marker[BLK_MARKER_SIZE];
3034    filemgr_magic_t magic;
3035    marker[0] = *(((uint8_t *)head_buffer)
3036                 + blocksize - BLK_MARKER_SIZE);
3037    if (marker[0] != BLK_MARKER_DBHEADER) {
3038        return false;
3039    }
3040
3041    memcpy(&magic, (uint8_t *) head_buffer
3042            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
3043    magic = _endian_decode(magic);
3044
3045    return ver_is_valid_magic(magic);
3046}
3047
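// Copy-on-write block sharing is reported as usable only when the source and
// destination files live on the same filesystem type and that type is not
// FILEMGR_FS_NO_COW.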
3048bool filemgr_is_cow_supported(struct filemgr *src, struct filemgr *dst)
3049{
3050    src->fs_type = src->ops->get_fs_type(src->fd);
3051    if (src->fs_type < 0) {
3052        return false;
3053    }
3054    dst->fs_type = dst->ops->get_fs_type(dst->fd);
3055    if (dst->fs_type < 0) {
3056        return false;
3057    }
3058    if (src->fs_type == dst->fs_type && src->fs_type != FILEMGR_FS_NO_COW) {
3059        return true;
3060    }
3061    return false;
3062}
3063
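// The throttling delay is expressed in microseconds but kept in a 32-bit
// atomic, so the 64-bit argument is truncated on store; both accessors use
// relaxed memory ordering.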
3064void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
3065{
3066    atomic_store_uint32_t(&file->throttling_delay, delay_us,
3067                          std::memory_order_relaxed);
3068}
3069
3070uint32_t filemgr_get_throttling_delay(struct filemgr *file)
3071{
3072    return atomic_get_uint32_t(&file->throttling_delay,
3073                               std::memory_order_relaxed);
3074}
3075
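// Free every stale_data item still queued on file->stale_list and clear the
// list pointer.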
3076void filemgr_clear_stale_list(struct filemgr *file)
3077{
3078    if (file->stale_list) {
3079        // if the items in the list are not freed yet, release them first.
3080        struct list_elem *e;
3081        struct stale_data *item;
3082
3083        e = list_begin(file->stale_list);
3084        while (e) {
3085            item = _get_entry(e, struct stale_data, le);
3086            e = list_remove(file->stale_list, e);
3087            free(item);
3088        }
3089        file->stale_list = NULL;
3090    }
3091}
3092
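// Tear down the per-commit stale-info tree: for each commit node, free every
// doc-list entry (and its ctx buffer) before freeing the commit node itself.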
3093void filemgr_clear_stale_info_tree(struct filemgr *file)
3094{
3095    struct avl_node *a;
3096    struct list_elem *e;
3097    struct stale_info_commit *commit;
3098    struct stale_info_entry *entry;
3099
3100    a = avl_first(&file->stale_info_tree);
3101    while (a) {
3102        commit = _get_entry(a, struct stale_info_commit, avl);
3103        a = avl_next(&commit->avl);
3104        avl_remove(&file->stale_info_tree, &commit->avl);
3105
3106        e = list_begin(&commit->doc_list);
3107        while (e) {
3108            entry = _get_entry(e, struct stale_info_entry, le);
3109            e = list_next(&entry->le);
3110            list_remove(&commit->doc_list, &entry->le);
3111            free(entry->ctx);
3112            free(entry);
3113        }
3114        free(commit);
3115    }
3116}
3117
3118void filemgr_clear_mergetree(struct filemgr *file)
3119{
3120    struct avl_node *a;
3121    struct stale_data *entry;
3122
3123    a = avl_first(&file->mergetree);
3124    while (a) {
3125        entry = _get_entry(a, struct stale_data, avl);
3126        a = avl_next(&entry->avl);
3127        avl_remove(&file->mergetree, &entry->avl);
3128        free(entry);
3129    }
3130}
3131
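// Append a stale region to the in-memory stale list. When the new region
// starts exactly where the tail entry ends, the two are merged instead of
// allocating a new item.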
3132void filemgr_add_stale_block(struct filemgr *file,
3133                             bid_t pos,
3134                             size_t len)
3135{
3136    if (file->stale_list) {
3137        struct stale_data *item;
3138        struct list_elem *e;
3139
3140        e = list_end(file->stale_list);
3141
3142        if (e) {
3143            item = _get_entry(e, struct stale_data, le);
3144            if (item->pos + item->len == pos) {
3145                // merge if consecutive with the previous item
3146                item->len += len;
3147                return;
3148            }
3149        }
3150
3151        item = (struct stale_data*)calloc(1, sizeof(struct stale_data));
3152        item->pos = pos;
3153        item->len = len;
3154        list_push_back(file->stale_list, &item->le);
3155    }
3156}
3157
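// Convert a document's logical length into the number of on-disk bytes that
// become stale, adding one byte per crossed block for the block marker and
// absorbing the final marker when the region stops right in front of it.
// Illustrative arithmetic (assuming a 4096-byte block size): offset=0,
// length=4095 gives end_bid=0, so actual_len stays 4095; since
// (0 + 4095) % 4096 == 4095, the marker byte is added and the whole
// 4096-byte block is reported stale.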
3158size_t filemgr_actual_stale_length(struct filemgr *file,
3159                                   bid_t offset,
3160                                   size_t length)
3161{
3162    size_t actual_len;
3163    bid_t start_bid, end_bid;
3164
3165    start_bid = offset / file->blocksize;
3166    end_bid = (offset + length) / file->blocksize;
3167
3168    actual_len = length + (end_bid - start_bid);
3169    if ((offset + actual_len) % file->blocksize ==
3170        file->blocksize - 1) {
3171        actual_len += 1;
3172    }
3173
3174    return actual_len;
3175}
3176
3177// if a document is not physically consecutive,
3178// return all fragmented regions.
3179struct stale_regions filemgr_actual_stale_regions(struct filemgr *file,
3180                                                  bid_t offset,
3181                                                  size_t length)
3182{
3183    uint8_t *buf = alca(uint8_t, file->blocksize);
3184    size_t remaining = length;
3185    size_t real_blocksize = file->blocksize;
3186    size_t blocksize = real_blocksize;
3187    size_t cur_pos, space_in_block, count;
3188    bid_t cur_bid;
3189    bool non_consecutive = ver_non_consecutive_doc(file->version);
3190    struct docblk_meta blk_meta;
3191    struct stale_regions ret;
3192    struct stale_data *arr = NULL, *cur_region;
3193
3194    if (non_consecutive) {
3195        blocksize -= DOCBLK_META_SIZE;
3196
3197        cur_bid = offset / file->blocksize;
3198        // relative position in the block 'cur_bid'
3199        cur_pos = offset % file->blocksize;
3200
3201        count = 0;
3202        while (remaining) {
3203                // more than one stale region; allocate an array
3204                // more than one stale region .. allocate array
3205                size_t arr_size = (length / blocksize) + 2;
3206                arr = (struct stale_data *)calloc(arr_size, sizeof(struct stale_data));
3207                arr[0] = ret.region;
3208                ret.regions = arr;
3209            }
3210
3211            if (count == 0) {
3212                // Since n_regions will be 1 in most cases,
3213                // we do not allocate heap memory when 'n_regions==1'.
3214                cur_region = &ret.region;
3215            } else {
3216                cur_region = &ret.regions[count];
3217            }
3218            cur_region->pos = (cur_bid * real_blocksize) + cur_pos;
3219
3220            // subtract data size in the current block
3221            space_in_block = blocksize - cur_pos;
3222            if (space_in_block <= remaining) {
3223                // rest of the current block (including block meta)
3224                cur_region->len = real_blocksize - cur_pos;
3225                remaining -= space_in_block;
3226            } else {
3227                cur_region->len = remaining;
3228                remaining = 0;
3229            }
3230            count++;
3231
3232            if (remaining) {
3233                // get next BID
3234                filemgr_read(file, cur_bid, (void *)buf, NULL, true);
3235                memcpy(&blk_meta, buf + blocksize, sizeof(blk_meta));
3236                cur_bid = _endian_decode(blk_meta.next_bid);
3237                cur_pos = 0; // beginning of the block
3238            }
3239        }
3240        ret.n_regions = count;
3241
3242    } else {
3243        // doc blocks are consecutive; always return a single region.
3244        ret.n_regions = 1;
3245        ret.region.pos = offset;
3246        ret.region.len = filemgr_actual_stale_length(file, offset, length);
3247    }
3248
3249    return ret;
3250}
3251
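// Record a document region as stale: expand (offset, length) into its actual
// on-disk regions and push each one onto the stale list. Nothing is recorded
// when the file has no stale list or the length is zero.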
3252void filemgr_mark_stale(struct filemgr *file,
3253                        bid_t offset,
3254                        size_t length)
3255{
3256    if (file->stale_list && length) {
3257        size_t i;
3258        struct stale_regions sr;
3259
3260        sr = filemgr_actual_stale_regions(file, offset, length);
3261
3262        if (sr.n_regions > 1) {
3263            for (i=0; i<sr.n_regions; ++i){
3264                filemgr_add_stale_block(file, sr.regions[i].pos, sr.regions[i].len);
3265            }
3266            free(sr.regions);
3267        } else if (sr.n_regions == 1) {
3268            filemgr_add_stale_block(file, sr.region.pos, sr.region.len);
3269        }
3270    }
3271}
3272
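// AVL comparator for the file-handle index: nodes are ordered by the numeric
// value of the fhandle pointer itself.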
3273INLINE int _fhandle_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
3274{
3275    uint64_t aaa, bbb;
3276    struct filemgr_fhandle_idx_node *aa, *bb;
3277    aa = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3278    bb = _get_entry(b, struct filemgr_fhandle_idx_node, avl);
3279    aaa = (uint64_t)aa->fhandle;
3280    bbb = (uint64_t)bb->fhandle;
3281
3282#ifdef __BIT_CMP
3283    return _CMP_U64(aaa, bbb);
3284#else
3285    if (aaa < bbb) {
3286        return -1;
3287    } else if (aaa > bbb) {
3288        return 1;
3289    } else {
3290        return 0;
3291    }
3292#endif
3293}
3294
3295void _free_fhandle_idx(struct avl_tree *idx)
3296{
3297    struct avl_node *a;
3298    struct filemgr_fhandle_idx_node *item;
3299
3300    a = avl_first(idx);
3301    while (a) {
3302        item = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3303        a = avl_next(a);
3304        avl_remove(idx, &item->avl);
3305        free(item);
3306    }
3307}
3308
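// Register a handle pointer in the per-file handle index under
// fhandle_idx_lock. Returns true if a new node was inserted, or false if the
// handle was already present.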
3309bool filemgr_fhandle_add(struct filemgr *file, void *fhandle)
3310{
3311    bool ret;
3312    struct filemgr_fhandle_idx_node *item, query;
3313    struct avl_node *a;
3314
3315    spin_lock(&file->fhandle_idx_lock);
3316
3317    query.fhandle = fhandle;
3318    a = avl_search(&file->fhandle_idx, &query.avl, _fhandle_idx_cmp);
3319    if (!a) {
3320        // does not exist; create a node and insert it
3321        item = (struct filemgr_fhandle_idx_node *)calloc(1, sizeof(struct filemgr_fhandle_idx_node));
3322        item->fhandle = fhandle;
3323        avl_insert(&file->fhandle_idx, &item->avl, _fhandle_idx_cmp);
3324        ret = true;
3325    } else {
3326        ret = false;
3327    }
3328
3329    spin_unlock(&file->fhandle_idx_lock);
3330    return ret;
3331}
3332
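// Remove a previously registered handle pointer from the per-file index,
// under the same fhandle_idx_lock.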
3333bool filemgr_fhandle_remove(struct filemgr *file, void *fhandle)
3334{
3335    bool ret;
3336    struct filemgr_fhandle_idx_node *item, query;
3337    struct avl_node *a;
3338
3339    spin_lock(&file->fhandle_idx_lock);
3340
3341    query.fhandle = fhandle;
3342    a = avl_search(&file->fhandle_idx, &query.avl, _fhandle_idx_cmp);
3343    if (a) {
3344        // exists; remove and free the item
3345        item = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3346        avl_remove(&file->