xref: /6.0.3/forestdb/src/filemgr.cc (revision 1c75bd93)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22#include <sys/stat.h>
23#include <stdarg.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <sys/time.h>
26#endif
27
28#include "filemgr.h"
29#include "filemgr_ops.h"
30#include "hash_functions.h"
31#include "blockcache.h"
32#include "wal.h"
33#include "list.h"
34#include "fdb_internal.h"
35#include "time_utils.h"
36#include "encryption.h"
37#include "version.h"
38
39#include "memleak.h"
40
41#ifdef __DEBUG
42#ifndef __DEBUG_FILEMGR
43    #undef DBG
44    #undef DBGCMD
45    #undef DBGSW
46    #define DBG(...)
47    #define DBGCMD(...)
48    #define DBGSW(n, ...)
49#endif
50#endif
51
// NBUCKET must be power of 2
#define NBUCKET (1024)

// global static variables
#ifdef SPIN_INITIALIZER
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Platforms without a static spinlock initializer (Windows): the lock is
// created lazily in filemgr_init(), guarded by 'initial_lock_status'
// (0 = uninitialized, 1 = initialization in progress, 2 = ready).
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif


// one-time init flag for filemgr_init() (double-checked locking)
static volatile uint8_t filemgr_initialized = 0;
extern volatile uint8_t bgflusher_initialized;
static struct filemgr_config global_config;   // copy of config at first init
static struct hash hash;                      // table of all open files
static spin_t filemgr_openlock;               // protects 'hash' during open/close

static const int MAX_STAT_UPDATE_RETRIES = 5;

// Bookkeeping record appended after each pooled temp buffer; 'addr' points
// back at the start of the aligned allocation so it can be freed as a unit.
struct temp_buf_item{
    void *addr;
    struct list_elem le;
};
static struct list temp_buf;      // pool of reusable block-sized scratch buffers
static spin_t temp_buf_lock;      // protects 'temp_buf'

// Lazy (deferred) file deletion hooks — see filemgr_set_lazy_file_deletion().
static bool lazy_file_deletion_enabled = false;
static register_file_removal_func register_file_removal = NULL;
static check_file_removal_func is_file_removed = NULL;

// Superblock operation table; zeroed (disabled) until filemgr_set_sb_operation().
static struct sb_ops sb_ops;
84
// Lock-callback adapters: generic utilities take function pointers of
// signature void(*)(void*), so each wrapper casts the opaque pointer back
// to the concrete lock type before delegating.
static void spin_init_wrap(void *lock) {
    spin_init((spin_t*)lock);
}

static void spin_destroy_wrap(void *lock) {
    spin_destroy((spin_t*)lock);
}

static void spin_lock_wrap(void *lock) {
    spin_lock((spin_t*)lock);
}

static void spin_unlock_wrap(void *lock) {
    spin_unlock((spin_t*)lock);
}

// Mutex counterparts of the spinlock adapters above.
static void mutex_init_wrap(void *lock) {
    mutex_init((mutex_t*)lock);
}

static void mutex_destroy_wrap(void *lock) {
    mutex_destroy((mutex_t*)lock);
}

static void mutex_lock_wrap(void *lock) {
    mutex_lock((mutex_t*)lock);
}

static void mutex_unlock_wrap(void *lock) {
    mutex_unlock((mutex_t*)lock);
}
116
117static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
118{
119    struct kvs_node *aa, *bb;
120    aa = _get_entry(a, struct kvs_node, avl_id);
121    bb = _get_entry(b, struct kvs_node, avl_id);
122
123    if (aa->id < bb->id) {
124        return -1;
125    } else if (aa->id > bb->id) {
126        return 1;
127    } else {
128        return 0;
129    }
130}
131
132static int _block_is_overlapped(void *pbid1, void *pis_writer1,
133                                void *pbid2, void *pis_writer2,
134                                void *aux)
135{
136    (void)aux;
137    bid_t bid1, is_writer1, bid2, is_writer2;
138    bid1 = *(bid_t*)pbid1;
139    is_writer1 = *(bid_t*)pis_writer1;
140    bid2 = *(bid_t*)pbid2;
141    is_writer2 = *(bid_t*)pis_writer2;
142
143    if (bid1 != bid2) {
144        // not overlapped
145        return 0;
146    } else {
147        // overlapped
148        if (!is_writer1 && !is_writer2) {
149            // both are readers
150            return 0;
151        } else {
152            return 1;
153        }
154    }
155}
156
157fdb_status fdb_log(err_log_callback *log_callback,
158                   fdb_status status,
159                   const char *format, ...)
160{
161    char msg[4096];
162    va_list args;
163    va_start(args, format);
164    vsprintf(msg, format, args);
165    va_end(args);
166
167    if (log_callback && log_callback->callback) {
168        log_callback->callback(status, msg, log_callback->ctx_data);
169    } else {
170        fprintf(stderr, "[FDB ERR] %s\n", msg);
171    }
172    return status;
173}
174
175static void _log_errno_str(struct filemgr_ops *ops,
176                           err_log_callback *log_callback,
177                           fdb_status io_error,
178                           const char *what,
179                           const char *filename)
180{
181    if (io_error < 0) {
182        char errno_msg[512];
183        ops->get_errno_str(errno_msg, 512);
184        fdb_log(log_callback, io_error,
185                "Error in %s on a database file '%s', %s", what, filename, errno_msg);
186    }
187}
188
189static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
190{
191    struct filemgr *file = _get_entry(e, struct filemgr, e);
192    int len = strlen(file->filename);
193
194    return get_checksum(reinterpret_cast<const uint8_t*>(file->filename), len) &
195                        ((unsigned)(NBUCKET-1));
196}
197
198static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
199{
200    struct filemgr *aa, *bb;
201    aa = _get_entry(a, struct filemgr, e);
202    bb = _get_entry(b, struct filemgr, e);
203    return strcmp(aa->filename, bb->filename);
204}
205
// Process-wide one-time initialization of the file manager: superblock ops,
// block cache, open-file hash table, temp-buffer pool, and global locks.
// Thread-safe via double-checked locking on 'filemgr_initialized'; only the
// first caller's 'config' is captured into 'global_config'.
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        spin_lock(&initial_lock);
        // re-check under the lock: another thread may have won the race
        if (!filemgr_initialized) {
            memset(&sb_ops, 0x0, sizeof(sb_ops));
            global_config = *config;

            // block cache is optional (ncacheblock <= 0 disables it)
            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
248
// Enable/disable lazy (deferred) file deletion and install the callbacks
// used to register a pending removal and to check whether a file has
// already been removed.
void filemgr_set_lazy_file_deletion(bool enable,
                                    register_file_removal_func regis_func,
                                    check_file_removal_func check_func)
{
    lazy_file_deletion_enabled = enable;
    register_file_removal = regis_func;
    is_file_removed = check_func;
}
257
// Install the superblock operation table (copied by value into the
// file-scope 'sb_ops'; callers that never install one leave superblock
// handling disabled — see _filemgr_load_sb()).
void filemgr_set_sb_operation(struct sb_ops ops)
{
    sb_ops = ops;
}
262
// Pop a block-sized scratch buffer from the shared pool, or allocate a new
// sector-aligned one when the pool is empty. The returned pointer must be
// given back via _filemgr_release_temp_buf().
static void * _filemgr_get_temp_buf()
{
    struct list_elem *e;
    struct temp_buf_item *item;

    spin_lock(&temp_buf_lock);
    e = list_pop_front(&temp_buf);
    if (e) {
        item = _get_entry(e, struct temp_buf_item, le);
    } else {
        void *addr = NULL;

        // single allocation holds the usable block followed by its
        // temp_buf_item bookkeeping record
        malloc_align(addr, FDB_SECTOR_SIZE,
                     global_config.blocksize + sizeof(struct temp_buf_item));

        // the item lives immediately past the usable block area and points
        // back at the allocation start so free_align() can release both
        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
        item->addr = addr;
    }
    spin_unlock(&temp_buf_lock);

    return item->addr;
}
285
286static void _filemgr_release_temp_buf(void *buf)
287{
288    struct temp_buf_item *item;
289
290    spin_lock(&temp_buf_lock);
291    item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
292    list_push_front(&temp_buf, &item->le);
293    spin_unlock(&temp_buf_lock);
294}
295
296static void _filemgr_shutdown_temp_buf()
297{
298    struct list_elem *e;
299    struct temp_buf_item *item;
300    size_t count=0;
301
302    spin_lock(&temp_buf_lock);
303    e = list_begin(&temp_buf);
304    while(e){
305        item = _get_entry(e, struct temp_buf_item, le);
306        e = list_remove(&temp_buf, e);
307        free_align(item->addr);
308        count++;
309    }
310    spin_unlock(&temp_buf_lock);
311}
312
// Read a block from the file, decrypting if necessary.
// Returns the number of bytes read (file->blocksize on success) or a
// negative fdb_status on read/decryption failure.
static ssize_t filemgr_read_block(struct filemgr *file, void *buf, bid_t bid) {
    ssize_t result = file->ops->pread(file->fd, buf, file->blocksize,
                                      file->blocksize*bid);
    if (file->encryption.ops && result > 0) {
        // a partial block cannot be decrypted; report it as a read failure
        if (result != (ssize_t)file->blocksize)
            return FDB_RESULT_READ_FAIL;
        // decryption happens in place in 'buf'
        fdb_status status = fdb_decrypt_block(&file->encryption, buf, result, bid);
        if (status != FDB_RESULT_SUCCESS)
            return status;
    }
    return result;
}
326
327// Write consecutive block(s) to the file, encrypting if necessary.
328ssize_t filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid) {
329    size_t blocksize = file->blocksize;
330    cs_off_t offset = start_bid * blocksize;
331    size_t nbytes = num_blocks * blocksize;
332    if (file->encryption.ops == NULL) {
333        return file->ops->pwrite(file->fd, buf, nbytes, offset);
334    } else {
335        uint8_t *encrypted_buf;
336        if (nbytes > 4096)
337            encrypted_buf = (uint8_t*)malloc(nbytes);
338        else
339            encrypted_buf = alca(uint8_t, nbytes); // most common case (writing single block)
340        if (!encrypted_buf)
341            return FDB_RESULT_ALLOC_FAIL;
342        fdb_status status = fdb_encrypt_blocks(&file->encryption,
343                                               encrypted_buf,
344                                               buf,
345                                               blocksize,
346                                               num_blocks,
347                                               start_bid);
348        if (nbytes > 4096)
349            free(encrypted_buf);
350        if (status != FDB_RESULT_SUCCESS)
351            return status;
352        return file->ops->pwrite(file->fd, encrypted_buf, nbytes, offset);
353    }
354}
355
// Returns non-zero if block 'bid' may be written: either the superblock's
// block-reuse machinery says so, or the block lies in the uncommitted tail
// region [last_commit, pos) of the file.
int filemgr_is_writable(struct filemgr *file, bid_t bid)
{
    if (sb_bmp_exists(file->sb) && sb_ops.is_writable) {
        // block reusing is enabled
        return sb_ops.is_writable(file, bid);
    } else {
        uint64_t pos = bid * file->blocksize;
        // Note that we don't need to grab file->lock here because
        // 1) both file->pos and file->last_commit are only incremented.
        // 2) file->last_commit is updated using the value of file->pos,
        //    and always equal to or smaller than file->pos.
        return (pos <  atomic_get_uint64_t(&file->pos) &&
                pos >= atomic_get_uint64_t(&file->last_commit));
    }
}
371
// Locate and load the most recent valid DB header.
//
// Starting from the candidate block (the superblock's last_hdr_bid when
// available, otherwise the file's last block), scan backwards — wrapping
// circularly to the end of the file — until a block passes all three
// checks: DB-header marker, valid magic, and CRC. On success the header
// payload is copied into file->header, crc_mode/version are recorded, and
// FDB_RESULT_SUCCESS is returned. If no valid header is found, file->header
// is reset to empty and the last failure status is returned. Unaligned
// trailing bytes (from a crash mid-write) are discarded up front.
static fdb_status _filemgr_read_header(struct filemgr *file,
                                       err_log_callback *log_callback)
{
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_magic_t magic = ver_get_latest_magic();
    filemgr_header_len_t len;
    uint8_t *buf;
    uint32_t crc, crc_file;
    bool check_crc32_open_rule = false;
    fdb_status status = FDB_RESULT_SUCCESS;
    bid_t hdr_bid, hdr_bid_local;
    size_t min_filesize = 0;

    // get temp buffer
    buf = (uint8_t *) _filemgr_get_temp_buf();

    // If a header is found crc_mode can change to reflect the file
    if (file->crc_mode == CRC32) {
        check_crc32_open_rule = true;
    }

    hdr_bid = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
    hdr_bid_local = hdr_bid;

    if (file->sb) {
        // superblock exists .. file size does not start from zero.
        min_filesize = file->sb->config->num_sb * file->blocksize;
        bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
        if (sb_last_hdr_bid != BLK_NOT_FOUND) {
            hdr_bid = hdr_bid_local = sb_last_hdr_bid;
        }
        // if header info does not exist in superblock,
        // get DB header at the end of the file.
    }

    if (atomic_get_uint64_t(&file->pos) > min_filesize) {
        // Crash Recovery Test 1: unaligned last block write
        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
        if (remain) {
            atomic_sub_uint64_t(&file->pos, remain);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
                "from a database file '%s'\n";
            DBG(msg, remain, file->filename);
            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
                    msg, remain, file->filename);
        }

        size_t block_counter = 0;
        do {
            ssize_t rv = filemgr_read_block(file, buf, hdr_bid_local);
            if (rv != (ssize_t)file->blocksize) {
                status = (fdb_status) rv;
                const char *msg = "Unable to read a database file '%s' with "
                    "blocksize %" _F64 "\n";
                DBG(msg, file->filename, file->blocksize);
                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
                break;
            }
            ++block_counter;
            // the block trailer is: [..., bmp revnum?, len, magic, marker]
            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
                   BLK_MARKER_SIZE);

            if (marker[0] == BLK_MARKER_DBHEADER) {
                // possible need for byte conversions here
                memcpy(&magic,
                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
                       sizeof(magic));
                magic = _endian_decode(magic);

                if (ver_is_valid_magic(magic)) {

                    memcpy(&len,
                           buf + file->blocksize - BLK_MARKER_SIZE -
                           sizeof(magic) - sizeof(len),
                           sizeof(len));
                    len = _endian_decode(len);

                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
                    crc_file = _endian_decode(crc_file);

                    // crc check and detect the crc_mode
                    if (detect_and_check_crc(reinterpret_cast<const uint8_t*>(buf),
                                             len - sizeof(crc),
                                             crc_file,
                                             &file->crc_mode)) {
                        // crc mode is detected and known.
                        // check the rules of opening legacy CRC
                        if (check_crc32_open_rule && file->crc_mode != CRC32) {
                            const char *msg = "Open of CRC32C file"
                                              " with forced CRC32\n";
                            status = FDB_RESULT_INVALID_ARGS;
                            DBG(msg);
                            fdb_log(log_callback, status, msg);
                            break;
                        } else {
                            // valid header found: copy payload + trailing
                            // revnum/seqnum into file->header
                            status = FDB_RESULT_SUCCESS;

                            file->header.data = (void *)malloc(file->blocksize);

                            memcpy(file->header.data, buf, len);
                            memcpy(&file->header.revnum, buf + len,
                                   sizeof(filemgr_header_revnum_t));
                            memcpy((void *) &file->header.seqnum,
                                    buf + len + sizeof(filemgr_header_revnum_t),
                                    sizeof(fdb_seqnum_t));

                            if (ver_superblock_support(magic)) {
                                // sb bmp revnum
                                uint64_t _bmp_revnum;
                                memcpy(&_bmp_revnum,
                                    (uint8_t *)buf + (file->blocksize
                                    - sizeof(filemgr_magic_t) - sizeof(len)
                                    - sizeof(bid_t) - sizeof(uint64_t)
                                    - sizeof(_bmp_revnum)
                                    - BLK_MARKER_SIZE),
                                    sizeof(_bmp_revnum));
                                atomic_store_uint64_t(&file->last_commit_bmp_revnum,
                                                      _endian_decode(_bmp_revnum));
                            }

                            file->header.revnum =
                                _endian_decode(file->header.revnum);
                            file->header.seqnum =
                                _endian_decode(file->header.seqnum.load());
                            file->header.size = len;
                            atomic_store_uint64_t(&file->header.bid, hdr_bid_local);
                            memset(&file->header.stat, 0x0, sizeof(file->header.stat));

                            // release temp buffer
                            _filemgr_release_temp_buf(buf);
                        }

                        file->version = magic;
                        return status;
                    } else {
                        // CRC mismatch: log the on-disk vs computed values
                        // and keep scanning for an older valid header
                        status = FDB_RESULT_CHECKSUM_ERROR;
                        uint32_t crc32 = 0, crc32c = 0;
                        crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
                                             len - sizeof(crc),
                                             CRC32);
#ifdef _CRC32C
                        crc32c = get_checksum(reinterpret_cast<const uint8_t*>(buf),
                                              len - sizeof(crc),
                                              CRC32C);
#endif
                        const char *msg = "Crash Detected: CRC on disk %u != (%u | %u) "
                            "in a database file '%s'\n";
                        DBG(msg, crc_file, crc32, crc32c, file->filename);
                        fdb_log(log_callback, status, msg, crc_file, crc32, crc32c,
                                file->filename);
                    }
                } else {
                    status = FDB_RESULT_FILE_CORRUPTION;
                    const char *msg = "Crash Detected: Wrong Magic %" _F64
                                      " in a database file '%s'\n";
                    fdb_log(log_callback, status, msg, magic, file->filename);
                }
            } else {
                status = FDB_RESULT_NO_DB_HEADERS;
                // only log for the very first (latest) block scanned
                if (block_counter == 1) {
                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
                                      "in a database file '%s'\n";
                    DBG(msg, marker[0], file->filename);
                    fdb_log(log_callback, status, msg, marker[0], file->filename);
                }
            }

            // this candidate failed: roll last_commit back to it and step
            // to the previous block
            atomic_store_uint64_t(&file->last_commit, hdr_bid_local * file->blocksize);
            // traverse headers in a circular manner
            if (hdr_bid_local) {
                hdr_bid_local--;
            } else {
                hdr_bid_local = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
            }
        } while (hdr_bid_local != hdr_bid);
    }

    // release temp buffer
    _filemgr_release_temp_buf(buf);

    // no valid header found: reset header state to empty
    file->header.size = 0;
    file->header.revnum = 0;
    file->header.seqnum = 0;
    file->header.data = NULL;
    atomic_store_uint64_t(&file->header.bid, 0);
    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
    file->version = magic;
    return status;
}
562
563size_t filemgr_get_ref_count(struct filemgr *file)
564{
565    size_t ret = 0;
566    spin_lock(&file->lock);
567    ret = atomic_get_uint32_t(&file->ref_count);
568    spin_unlock(&file->lock);
569    return ret;
570}
571
572uint64_t filemgr_get_bcache_used_space(void)
573{
574    uint64_t bcache_free_space = 0;
575    if (global_config.ncacheblock) { // If buffer cache is indeed configured
576        bcache_free_space = bcache_get_num_free_blocks();
577        bcache_free_space = (global_config.ncacheblock - bcache_free_space)
578                          * global_config.blocksize;
579    }
580    return bcache_free_space;
581}
582
// Arguments handed to _filemgr_prefetch_thread. Heap-allocated by
// filemgr_prefetch(); ownership passes to the thread, which frees it.
struct filemgr_prefetch_args {
    struct filemgr *file;           // file whose blocks are prefetched
    uint64_t duration;              // max seconds the prefetch may run
    err_log_callback *log_callback; // sink for read-failure logging
    void *aux;                      // not referenced by the prefetch thread
};
589
// Prefetch thread body: walks backwards from last_commit in units of
// FILEMGR_PREFETCH_UNIT, reading each block through filemgr_read() to warm
// the block cache. Terminates on abort signal, timeout, block-cache
// pressure, read failure, or when the start of the file is reached.
// Frees its own 'args' (allocated by filemgr_prefetch) before exiting.
static void *_filemgr_prefetch_thread(void *voidargs)
{
    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
    uint8_t *buf = alca(uint8_t, args->file->blocksize);
    uint64_t cur_pos = 0, i;
    uint64_t bcache_free_space;
    bid_t bid;
    bool terminate = false;
    struct timeval begin, cur, gap;

    spin_lock(&args->file->lock);
    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
    spin_unlock(&args->file->lock);
    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
        // file too small for even one prefetch unit
        terminate = true;
    } else {
        cur_pos -= FILEMGR_PREFETCH_UNIT;
    }
    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
    gettimeofday(&begin, NULL);
    while (!terminate) {
        // read the current unit forward, one block at a time
        for (i = cur_pos;
             i < cur_pos + FILEMGR_PREFETCH_UNIT;
             i += args->file->blocksize) {

            gettimeofday(&cur, NULL);
            gap = _utime_gap(begin, cur);
            bcache_free_space = bcache_get_num_free_blocks();
            bcache_free_space *= args->file->blocksize;

            if (atomic_get_uint8_t(&args->file->prefetch_status)
                == FILEMGR_PREFETCH_ABORT ||
                gap.tv_sec >= (int64_t)args->duration ||
                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
                // terminate thread when
                // 1. got abort signal
                // 2. time out
                // 3. not enough free space in block cache
                terminate = true;
                break;
            } else {
                bid = i / args->file->blocksize;
                if (filemgr_read(args->file, bid, buf, NULL, true)
                        != FDB_RESULT_SUCCESS) {
                    // 4. read failure
                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
                            "Prefetch thread failed to read a block with block id %" _F64
                            " from a database file '%s'", bid, args->file->filename);
                    terminate = true;
                    break;
                }
            }
        }

        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
            cur_pos -= FILEMGR_PREFETCH_UNIT;
        } else {
            // remaining space is less than FILEMGR_PREFETCH_UNIT
            terminate = true;
        }
    }

    // flip RUNNING back to IDLE (a concurrent ABORT request is left as-is)
    atomic_cas_uint8_t(&args->file->prefetch_status, FILEMGR_PREFETCH_RUNNING,
                       FILEMGR_PREFETCH_IDLE);
    free(args);
    return NULL;
}
657
658// prefetch the given DB file
659void filemgr_prefetch(struct filemgr *file,
660                      struct filemgr_config *config,
661                      err_log_callback *log_callback)
662{
663    uint64_t bcache_free_space;
664
665    bcache_free_space = bcache_get_num_free_blocks();
666    bcache_free_space *= file->blocksize;
667
668    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
669    spin_lock(&file->lock);
670    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
671        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
672        // invoke prefetch thread
673        struct filemgr_prefetch_args *args;
674        args = (struct filemgr_prefetch_args *)
675               calloc(1, sizeof(struct filemgr_prefetch_args));
676        args->file = file;
677        args->duration = config->prefetch_duration;
678        args->log_callback = log_callback;
679
680        if (atomic_cas_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE,
681                               FILEMGR_PREFETCH_RUNNING)) {
682            thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
683        }
684    }
685    spin_unlock(&file->lock);
686}
687
688fdb_status filemgr_does_file_exist(char *filename) {
689    struct filemgr_ops *ops = get_filemgr_ops();
690    int fd = ops->open(filename, O_RDONLY, 0444);
691    if (fd < 0) {
692        return (fdb_status) fd;
693    }
694    ops->close(fd);
695    return FDB_RESULT_SUCCESS;
696}
697
698static fdb_status _filemgr_load_sb(struct filemgr *file,
699                                   err_log_callback *log_callback)
700{
701    fdb_status status = FDB_RESULT_SUCCESS;
702    struct sb_config sconfig;
703
704    if (sb_ops.init && sb_ops.get_default_config && sb_ops.read_latest) {
705        sconfig = sb_ops.get_default_config();
706        if (filemgr_get_pos(file)) {
707            // existing file
708            status = sb_ops.read_latest(file, sconfig, log_callback);
709        } else {
710            // new file
711            status = sb_ops.init(file, sconfig, log_callback);
712        }
713    }
714
715    return status;
716}
717
718filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
719                                 struct filemgr_config *config,
720                                 err_log_callback *log_callback)
721{
722    struct filemgr *file = NULL;
723    struct filemgr query;
724    struct hash_elem *e = NULL;
725    bool create = config->options & FILEMGR_CREATE;
726    int file_flag = 0x0;
727    int fd = -1;
728    fdb_status status;
729    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
730
731    filemgr_init(config);
732
733    if (config->encryption_key.algorithm != FDB_ENCRYPTION_NONE && global_config.ncacheblock <= 0) {
734        // cannot use encryption without a block cache
735        result.rv = FDB_RESULT_CRYPTO_ERROR;
736        return result;
737    }
738
739    // check whether file is already opened or not
740    query.filename = filename;
741    spin_lock(&filemgr_openlock);
742    e = hash_find(&hash, &query.e);
743
744    if (e) {
745        // already opened (return existing structure)
746        file = _get_entry(e, struct filemgr, e);
747
748        if (atomic_incr_uint32_t(&file->ref_count) > 1 &&
749            atomic_get_uint8_t(&file->status) != FILE_CLOSED) {
750            spin_unlock(&filemgr_openlock);
751            result.file = file;
752            result.rv = FDB_RESULT_SUCCESS;
753            return result;
754        }
755
756        spin_lock(&file->lock);
757
758        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
759            file_flag = O_RDWR;
760            if (create) {
761                file_flag |= O_CREAT;
762            }
763            *file->config = *config;
764            file->config->blocksize = global_config.blocksize;
765            file->config->ncacheblock = global_config.ncacheblock;
766            file_flag |= config->flag;
767            file->fd = file->ops->open(file->filename, file_flag, 0666);
768            if (file->fd < 0) {
769                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
770                    // A database file was manually deleted by the user.
771                    // Clean up global hash table, WAL index, and buffer cache.
772                    // Then, retry it with a create option below IFF it is not
773                    // a read-only open attempt
774                    struct hash_elem *ret;
775                    spin_unlock(&file->lock);
776                    ret = hash_remove(&hash, &file->e);
777                    fdb_assert(ret, 0, 0);
778                    filemgr_free_func(&file->e);
779                    if (!create) {
780                        _log_errno_str(ops, log_callback,
781                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
782                        spin_unlock(&filemgr_openlock);
783                        result.rv = FDB_RESULT_NO_SUCH_FILE;
784                        return result;
785                    }
786                } else {
787                    _log_errno_str(file->ops, log_callback,
788                                  (fdb_status)file->fd, "OPEN", filename);
789                    atomic_decr_uint32_t(&file->ref_count);
790                    spin_unlock(&file->lock);
791                    spin_unlock(&filemgr_openlock);
792                    result.rv = file->fd;
793                    return result;
794                }
795            } else { // Reopening the closed file is succeed.
796                atomic_store_uint8_t(&file->status, FILE_NORMAL);
797                if (config->options & FILEMGR_SYNC) {
798                    file->fflags |= FILEMGR_SYNC;
799                } else {
800                    file->fflags &= ~FILEMGR_SYNC;
801                }
802
803                spin_unlock(&file->lock);
804                spin_unlock(&filemgr_openlock);
805
806                result.file = file;
807                result.rv = FDB_RESULT_SUCCESS;
808                return result;
809            }
810        } else { // file is already opened.
811
812            if (config->options & FILEMGR_SYNC) {
813                file->fflags |= FILEMGR_SYNC;
814            } else {
815                file->fflags &= ~FILEMGR_SYNC;
816            }
817
818            spin_unlock(&file->lock);
819            spin_unlock(&filemgr_openlock);
820            result.file = file;
821            result.rv = FDB_RESULT_SUCCESS;
822            return result;
823        }
824    }
825
826    file_flag = O_RDWR;
827    if (create) {
828        file_flag |= O_CREAT;
829    }
830    file_flag |= config->flag;
831    fd = ops->open(filename, file_flag, 0666);
832    if (fd < 0) {
833        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
834        spin_unlock(&filemgr_openlock);
835        result.rv = fd;
836        return result;
837    }
838    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
839    file->filename_len = strlen(filename);
840    file->filename = (char*)malloc(file->filename_len + 1);
841    strcpy(file->filename, filename);
842
843    atomic_init_uint32_t(&file->ref_count, 1);
844    file->stale_list = NULL;
845
846    status = fdb_init_encryptor(&file->encryption, &config->encryption_key);
847    if (status != FDB_RESULT_SUCCESS) {
848        ops->close(fd);
849        free(file);
850        spin_unlock(&filemgr_openlock);
851        result.rv = status;
852        return result;
853    }
854
855    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
856    file->wal->flag = 0;
857
858    file->ops = ops;
859    file->blocksize = global_config.blocksize;
860    atomic_init_uint8_t(&file->status, FILE_NORMAL);
861    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
862    *file->config = *config;
863    file->config->blocksize = global_config.blocksize;
864    file->config->ncacheblock = global_config.ncacheblock;
865    file->new_file = NULL;
866    file->prev_file = NULL;
867    file->old_filename = NULL;
868    file->fd = fd;
869
870    cs_off_t offset = file->ops->goto_eof(file->fd);
871    if (offset < 0) {
872        _log_errno_str(file->ops, log_callback, (fdb_status) offset, "SEEK_END", filename);
873        file->ops->close(file->fd);
874        free(file->wal);
875        free(file->filename);
876        free(file->config);
877        free(file);
878        spin_unlock(&filemgr_openlock);
879        result.rv = (fdb_status) offset;
880        return result;
881    }
882    atomic_init_uint64_t(&file->last_commit, offset);
883    atomic_init_uint64_t(&file->last_commit_bmp_revnum, 0);
884    atomic_init_uint64_t(&file->pos, offset);
885    atomic_init_uint32_t(&file->throttling_delay, 0);
886    atomic_init_uint64_t(&file->num_invalidated_blocks, 0);
887    atomic_init_uint8_t(&file->io_in_prog, 0);
888
889#ifdef _LATENCY_STATS
890    for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
891        filemgr_init_latency_stat(&file->lat_stats[i]);
892    }
893#endif // _LATENCY_STATS
894
895    file->bcache = NULL;
896    file->in_place_compaction = false;
897    file->kv_header = NULL;
898    atomic_init_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE);
899
900    atomic_init_uint64_t(&file->header.bid, 0);
901    _init_op_stats(&file->header.op_stat);
902
903    spin_init(&file->lock);
904    file->stale_list = (struct list*)calloc(1, sizeof(struct list));
905    list_init(file->stale_list);
906
907    filemgr_dirty_update_init(file);
908
909    spin_init(&file->fhandle_idx_lock);
910    avl_init(&file->fhandle_idx, NULL);
911
912#ifdef __FILEMGR_DATA_PARTIAL_LOCK
913    struct plock_ops pops;
914    struct plock_config pconfig;
915
916    pops.init_user = mutex_init_wrap;
917    pops.lock_user = mutex_lock_wrap;
918    pops.unlock_user = mutex_unlock_wrap;
919    pops.destroy_user = mutex_destroy_wrap;
920    pops.init_internal = spin_init_wrap;
921    pops.lock_internal = spin_lock_wrap;
922    pops.unlock_internal = spin_unlock_wrap;
923    pops.destroy_internal = spin_destroy_wrap;
924    pops.is_overlapped = _block_is_overlapped;
925
926    memset(&pconfig, 0x0, sizeof(pconfig));
927    pconfig.ops = &pops;
928    pconfig.sizeof_lock_internal = sizeof(spin_t);
929    pconfig.sizeof_lock_user = sizeof(mutex_t);
930    pconfig.sizeof_range = sizeof(bid_t);
931    pconfig.aux = NULL;
932    plock_init(&file->plock, &pconfig);
933#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
934    int i;
935    for (i=0;i<DLOCK_MAX;++i) {
936        mutex_init(&file->data_mutex[i]);
937    }
938#else
939    int i;
940    for (i=0;i<DLOCK_MAX;++i) {
941        spin_init(&file->data_spinlock[i]);
942    }
943#endif //__FILEMGR_DATA_PARTIAL_LOCK
944
945    mutex_init(&file->writer_lock.mutex);
946    file->writer_lock.locked = false;
947
948    // Note: CRC must be initialized before superblock loading
949    // initialize CRC mode
950    if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
951        file->crc_mode = CRC32;
952    } else {
953        file->crc_mode = CRC_DEFAULT;
954    }
955
956    do { // repeat until both superblock and DB header are correctly read
957        // init or load superblock
958        status = _filemgr_load_sb(file, log_callback);
959        // we can tolerate SB_READ_FAIL for old version file
960        if (status != FDB_RESULT_SB_READ_FAIL &&
961            status != FDB_RESULT_SUCCESS) {
962            _log_errno_str(file->ops, log_callback, status, "READ", file->filename);
963            file->ops->close(file->fd);
964            free(file->stale_list);
965            free(file->wal);
966            free(file->filename);
967            free(file->config);
968            free(file);
969            spin_unlock(&filemgr_openlock);
970            result.rv = status;
971            return result;
972        }
973
974        // read header
975        status = _filemgr_read_header(file, log_callback);
976        if (file->sb && status == FDB_RESULT_NO_DB_HEADERS) {
977            // this happens when user created & closed a file without any mutations,
978            // thus there is no other data but superblocks.
979            // we can also tolerate this case.
980        } else if (status != FDB_RESULT_SUCCESS) {
981            _log_errno_str(file->ops, log_callback, status, "READ", filename);
982            file->ops->close(file->fd);
983            if (file->sb) {
984                sb_ops.release(file);
985            }
986            free(file->stale_list);
987            free(file->wal);
988            free(file->filename);
989            free(file->config);
990            free(file);
991            spin_unlock(&filemgr_openlock);
992            result.rv = status;
993            return result;
994        }
995
996        if (file->sb &&
997            file->header.revnum != atomic_get_uint64_t(&file->sb->last_hdr_revnum)) {
998            // superblock exists but the corresponding DB header does not match.
999            // read another candidate.
1000            continue;
1001        }
1002
1003        break;
1004    } while (true);
1005
1006    // initialize WAL
1007    if (!wal_is_initialized(file)) {
1008        wal_init(file, FDB_WAL_NBUCKET);
1009    }
1010
1011    // init global transaction for the file
1012    file->global_txn.wrapper = (struct wal_txn_wrapper*)
1013                               malloc(sizeof(struct wal_txn_wrapper));
1014    file->global_txn.wrapper->txn = &file->global_txn;
1015    file->global_txn.handle = NULL;
1016    if (atomic_get_uint64_t(&file->pos)) {
1017        file->global_txn.prev_hdr_bid =
1018            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
1019    } else {
1020        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
1021    }
1022    file->global_txn.prev_revnum = 0;
1023    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
1024    list_init(file->global_txn.items);
1025    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
1026    wal_add_transaction(file, &file->global_txn);
1027
1028    hash_insert(&hash, &file->e);
1029    if (config->prefetch_duration > 0) {
1030        filemgr_prefetch(file, config, log_callback);
1031    }
1032
1033    spin_unlock(&filemgr_openlock);
1034
1035    if (config->options & FILEMGR_SYNC) {
1036        file->fflags |= FILEMGR_SYNC;
1037    } else {
1038        file->fflags &= ~FILEMGR_SYNC;
1039    }
1040
1041    result.file = file;
1042    result.rv = FDB_RESULT_SUCCESS;
1043    return result;
1044}
1045
1046uint64_t filemgr_update_header(struct filemgr *file,
1047                               void *buf,
1048                               size_t len,
1049                               bool inc_revnum)
1050{
1051    uint64_t ret;
1052
1053    spin_lock(&file->lock);
1054
1055    if (file->header.data == NULL) {
1056        file->header.data = (void *)malloc(file->blocksize);
1057    }
1058    memcpy(file->header.data, buf, len);
1059    file->header.size = len;
1060    if (inc_revnum) {
1061        ++(file->header.revnum);
1062    }
1063    ret = file->header.revnum;
1064
1065    spin_unlock(&file->lock);
1066
1067    return ret;
1068}
1069
1070filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
1071{
1072    filemgr_header_revnum_t ret;
1073    spin_lock(&file->lock);
1074    ret = file->header.revnum;
1075    spin_unlock(&file->lock);
1076    return ret;
1077}
1078
1079// 'filemgr_get_seqnum', 'filemgr_set_seqnum',
1080// 'filemgr_get_walflush_revnum', 'filemgr_set_walflush_revnum'
1081// have to be protected by 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
// Return the cached default-KVS sequence number.
// Caller must hold the filemgr mutex (see the note above this group of
// accessors); no locking is done here.
fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
{
    return file->header.seqnum;
}
1086
// Set the cached default-KVS sequence number.
// Caller must hold the filemgr mutex (see the note above this group of
// accessors); no locking is done here.
void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
{
    file->header.seqnum = seqnum;
}
1091
1092void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
1093                         bid_t *header_bid, fdb_seqnum_t *seqnum,
1094                         filemgr_header_revnum_t *header_revnum)
1095{
1096    spin_lock(&file->lock);
1097
1098    if (file->header.size > 0) {
1099        if (buf == NULL) {
1100            buf = (void*)malloc(file->header.size);
1101        }
1102        memcpy(buf, file->header.data, file->header.size);
1103    }
1104
1105    if (len) {
1106        *len = file->header.size;
1107    }
1108    if (header_bid) {
1109        *header_bid = filemgr_get_header_bid(file);
1110    }
1111    if (seqnum) {
1112        *seqnum = file->header.seqnum;
1113    }
1114    if (header_revnum) {
1115        *header_revnum = file->header.revnum;
1116    }
1117
1118    spin_unlock(&file->lock);
1119
1120    return buf;
1121}
1122
1123uint64_t filemgr_get_sb_bmp_revnum(struct filemgr *file)
1124{
1125    if (file->sb && sb_ops.get_bmp_revnum) {
1126        return sb_ops.get_bmp_revnum(file);
1127    } else {
1128        return 0;
1129    }
1130}
1131
// Read and decode the DB header stored in block 'bid' of 'file' into 'buf'.
//
// On success, '*len' and '*version' receive the header length and the
// file-format magic, and each non-NULL optional out-parameter
// ('header_revnum', 'seqnum', 'deltasize', 'sb_bmp_revnum') is filled from
// the header block. On any failure '*len' stays 0 and an error status is
// returned.
//
// Tail layout of a header block, derived from the offsets used below
// (fields listed from the end of the block backwards):
//   [marker][magic][hdr_len][prev bid][deltasize][bmp_revnum] ...
// while revnum and seqnum sit immediately after the hdr_len bytes of
// header data at the front of the block.
// NOTE(review): 'buf' is presumably at least one block in size — confirm
// caller contract.
fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                filemgr_header_revnum_t *header_revnum,
                                uint64_t *deltasize, uint64_t *version,
                                uint64_t *sb_bmp_revnum,
                                err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_header_len_t hdr_len;
    uint64_t _deltasize, _bmp_revnum;
    filemgr_magic_t magic;
    fdb_status status = FDB_RESULT_SUCCESS;

    *len = 0;

    if (!bid || bid == BLK_NOT_FOUND) {
        // No other header available
        return FDB_RESULT_SUCCESS;
    }

    // Borrow a block-sized scratch buffer for the raw block image.
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);

    if (status != FDB_RESULT_SUCCESS) {
        fdb_log(log_callback, status,
                "Failed to read a database header with block id %" _F64 " in "
                "a database file '%s'", bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return status;
    }
    // The last BLK_MARKER_SIZE bytes of the block identify its type.
    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
            BLK_MARKER_SIZE);

    if (marker[0] != BLK_MARKER_DBHEADER) {
        // Comment this warning log as of now because the circular block reuse
        // can cause false alarms as a previous stale header block can be reclaimed
        // and reused for incoming writes.
        /*
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block marker of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                bid, file->filename);
        */
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Magic (file-format version) sits just before the marker.
    memcpy(&magic,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
            sizeof(magic));
    magic = _endian_decode(magic);
    if (!ver_is_valid_magic(magic)) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block magic value of %" _F64 " in the database header block"
                "id %" _F64 " in a database file '%s'"
                "does NOT match FILEMGR_MAGIC %" _F64 "!",
                magic, bid, file->filename, ver_get_latest_magic());
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_FILE_CORRUPTION;
    }
    // Header length precedes the magic.
    memcpy(&hdr_len,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
            sizeof(hdr_len), sizeof(hdr_len));
    hdr_len = _endian_decode(hdr_len);

    memcpy(buf, _buf, hdr_len);
    *len = hdr_len;
    *version = magic;

    if (header_revnum) {
        // copy the DB header revnum
        filemgr_header_revnum_t _revnum;
        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
        *header_revnum = _endian_decode(_revnum);
    }
    if (seqnum) {
        // copy default KVS's seqnum
        fdb_seqnum_t _seqnum;
        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(_seqnum));
        *seqnum = _endian_decode(_seqnum);
    }

    // 'deltasize' only exists in files at or above magic version 001;
    // it is stored before the previous-header BID field (sizeof(bid)).
    if (ver_is_atleast_magic_001(magic)) {
        if (deltasize) {
            memcpy(&_deltasize, _buf + file->blocksize - BLK_MARKER_SIZE
                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
                    - sizeof(_deltasize), sizeof(_deltasize));
            *deltasize = _endian_decode(_deltasize);
        }
    }

    // Superblock bitmap revnum only exists in superblock-capable files;
    // it is stored just before the deltasize field.
    if (sb_bmp_revnum && ver_superblock_support(magic)) {
        memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
                - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
                - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
        *sb_bmp_revnum = _endian_decode(_bmp_revnum);
    }

    _filemgr_release_temp_buf(_buf);

    return status;
}
1236
// Locate and read the DB header that precedes the header at block 'bid',
// by following the previous-header BID embedded in that header's tail.
//
// Returns the BID of the previous header on success, filling '*len',
// '*seqnum', '*version' and the optional out-parameters ('revnum',
// 'deltasize', 'sb_bmp_revnum' — each may be NULL; 'buf' may be NULL to
// skip copying the header body). Returns BLK_NOT_FOUND (with *len == 0)
// when no valid previous header exists: read failure, broken link,
// or the previous header already reclaimed by circular block reuse.
// A 'bid' of 0 or BLK_NOT_FOUND is returned unchanged.
uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                   filemgr_header_revnum_t *revnum,
                                   uint64_t *deltasize, uint64_t *version,
                                   uint64_t *sb_bmp_revnum,
                                   err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum, cur_revnum, prev_revnum;
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    bid_t _prev_bid, prev_bid;
    uint64_t _deltasize, _bmp_revnum;
    int found = 0;

    *len = 0;

    if (!bid || bid == BLK_NOT_FOUND) {
        // No other header available
        return bid;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    // Reverse scan the file for a previous DB header
    do {
        // Get prev_bid from the current header.
        // Since the current header is already cached during the previous
        // operation, no disk I/O will be triggered.
        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
                != FDB_RESULT_SUCCESS) {
            break;
        }

        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);

        if (marker[0] != BLK_MARKER_DBHEADER ||
            !ver_is_valid_magic(magic)) {
            // not a header block
            // this happens when this function is invoked between
            // fdb_set() call and fdb_commit() call, so the last block
            // in the file is not a header block
            bid_t latest_hdr = filemgr_get_header_bid(file);
            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
                // get the latest header BID
                bid = latest_hdr;
            } else {
                break;
            }
            cur_revnum = file->header.revnum + 1;
        } else {
            // 'bid' points at a genuine header block: decode its length,
            // revnum, and the embedded previous-header BID.

            memcpy(&hdr_len,
                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
                   sizeof(hdr_len), sizeof(hdr_len));
            hdr_len = _endian_decode(hdr_len);

            memcpy(&_revnum, _buf + hdr_len,
                   sizeof(filemgr_header_revnum_t));
            cur_revnum = _endian_decode(_revnum);

            if (sb_bmp_exists(file->sb)) {
                // first check revnum
                if (cur_revnum <= sb_ops.get_min_live_revnum(file)) {
                    // previous headers already have been reclaimed
                    // no more logical prev header
                    break;
                }
            }

            memcpy(&_prev_bid,
                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
                       sizeof(hdr_len) - sizeof(_prev_bid),
                   sizeof(_prev_bid));
            prev_bid = _endian_decode(_prev_bid);
            bid = prev_bid;
        }

        // Read the prev header
        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
        if (fs != FDB_RESULT_SUCCESS) {
            fdb_log(log_callback, fs,
                    "Failed to read a previous database header with block id %"
                    _F64 " in "
                    "a database file '%s'", bid, file->filename);
            break;
        }

        // Validate the previous block's marker and magic before trusting it.
        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        if (marker[0] != BLK_MARKER_DBHEADER) {
            if (bid) {
                // broken linked list
                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                        "A block marker of the previous database header block id %"
                        _F64 " in "
                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                        bid, file->filename);
            }
            break;
        }

        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);
        if (!ver_is_valid_magic(magic)) {
            // broken linked list
            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                    "A block magic value of %" _F64
                    " of the previous database header block id %" _F64 " in "
                    "a database file '%s' does NOT match FILEMGR_MAGIC %"
                    _F64"!", magic,
                    bid, file->filename, ver_get_latest_magic());
            break;
        }

        memcpy(&hdr_len,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
               sizeof(hdr_len), sizeof(hdr_len));
        hdr_len = _endian_decode(hdr_len);

        if (buf) {
            memcpy(buf, _buf, hdr_len);
        }
        memcpy(&_revnum, _buf + hdr_len,
               sizeof(filemgr_header_revnum_t));
        prev_revnum = _endian_decode(_revnum);
        // Revnums must strictly decrease when walking backwards, and stay
        // at or above the minimum live revnum; otherwise the chain is bad.
        if (prev_revnum >= cur_revnum ||
            prev_revnum < sb_ops.get_min_live_revnum(file)) {
            // no more prev header, or broken linked list
            break;
        }

        memcpy(&_seqnum,
               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(fdb_seqnum_t));
        // 'deltasize' field only exists at or above magic version 001.
        if (ver_is_atleast_magic_001(magic)) {
            if (deltasize) {
                memcpy(&_deltasize,
                        _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic)
                       - sizeof(hdr_len) - sizeof(prev_bid) - sizeof(_deltasize),
                        sizeof(_deltasize));
                *deltasize = _endian_decode(_deltasize);
            }
        }

        // Superblock bitmap revnum only exists in superblock-capable files.
        if (sb_bmp_revnum && ver_superblock_support(magic)) {
            memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
                    - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
                    - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
            *sb_bmp_revnum = _endian_decode(_bmp_revnum);
        }

        if (revnum) {
            *revnum = prev_revnum;
        }
        *seqnum = _endian_decode(_seqnum);
        *len = hdr_len;
        *version = magic;
        found = 1;
        break;
    } while (false); // no repetition

    if (!found) { // no other header found till end of file
        *len = 0;
        bid = BLK_NOT_FOUND;
    }

    _filemgr_release_temp_buf(_buf);

    return bid;
}
1416
1417static void update_file_pointers(struct filemgr *file) {
1418    // Update new_file pointers of all previously redirected downstream files
1419    struct filemgr *temp = file->prev_file;
1420    while (temp != NULL) {
1421        spin_lock(&temp->lock);
1422        if (temp->new_file == file) {
1423            temp->new_file = file->new_file;
1424        }
1425        spin_unlock(&temp->lock);
1426        temp = temp->prev_file;
1427    }
1428    // Update prev_file pointer of the upstream file if any
1429    if (file->new_file != NULL) {
1430        spin_lock(&file->new_file->lock);
1431        file->new_file->prev_file = file->prev_file;
1432        spin_unlock(&file->new_file->lock);
1433    }
1434}
1435
// Drop one reference to 'file'. When the reference count reaches zero,
// tear the file down: discard dirty cache blocks, close the WAL, and then
// either
//  - delete the file now (FILE_REMOVED_PENDING, foreground path) or hand
//    it to the lazy background-deletion machinery,
//  - free all in-memory structures (cleanup_cache_onclose), possibly
//    renaming an in-place-compacted file back to 'orig_file_name', or
//  - just mark it FILE_CLOSED so a later filemgr_open() can revive it.
// Returns the result of the underlying close operation.
fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
                         const char *orig_file_name,
                         err_log_callback *log_callback)
{
    int rv = FDB_RESULT_SUCCESS;

    if (atomic_decr_uint32_t(&file->ref_count) > 0) {
        // File is still accessed by other readers or writers.
        return FDB_RESULT_SUCCESS;
    }

    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
                                  // filemgr_open() because file->lock won't
                                  // prevent the race condition.

    // remove filemgr structure if no thread refers to the file
    spin_lock(&file->lock);
    if (atomic_get_uint32_t(&file->ref_count) == 0) {
        if (global_config.ncacheblock > 0 &&
            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
            spin_unlock(&file->lock);
            // discard all dirty blocks belonged to this file
            bcache_remove_dirty_blocks(file);
        } else {
            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
            // then its dirty block entries will be cleaned up in either
            // filemgr_free_func() or register_file_removal() below.
            spin_unlock(&file->lock);
        }

        if (wal_is_initialized(file)) {
            wal_close(file, log_callback);
        }
#ifdef _LATENCY_STATS_DUMP_TO_FILE
        filemgr_dump_latency_stat(file, log_callback);
#endif // _LATENCY_STATS_DUMP_TO_FILE

        spin_lock(&file->lock);

        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {

            bool foreground_deletion = false;

            // immediately remove file if background remove function is not set
            if (!lazy_file_deletion_enabled ||
                (file->new_file && file->new_file->in_place_compaction)) {
                // TODO: to avoid the scenario below, we prevent background
                //       deletion of in-place compacted files at this time.
                // 1) In-place compacted from 'A' to 'A.1'.
                // 2) Request to delete 'A'.
                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
                // 4) User opens DB file using its original name 'A', not 'A.1'.
                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
                // 6) Crash!

                // As the file is already unlinked, the file will be removed
                // as soon as we close it.
                rv = file->ops->close(file->fd);
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
#if defined(WIN32) || defined(_WIN32)
                // For Windows, we need to manually remove the file.
                remove(file->filename);
#endif
                foreground_deletion = true;
            }

            // we can release lock because no one will open this file
            spin_unlock(&file->lock);
            struct hash_elem *ret = hash_remove(&hash, &file->e);
            fdb_assert(ret, 0, 0);

            update_file_pointers(file);
            spin_unlock(&filemgr_openlock);

            if (foreground_deletion) {
                filemgr_free_func(&file->e);
            } else {
                register_file_removal(file, log_callback);
            }
            return (fdb_status) rv;
        } else {
            // Normal (not pending-removal) close path.
            rv = file->ops->close(file->fd);
            if (cleanup_cache_onclose) {
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
                if (file->in_place_compaction && orig_file_name) {
                    struct hash_elem *elem = NULL;
                    struct filemgr query;
                    uint32_t old_file_refcount = 0;

                    query.filename = (char *)orig_file_name;
                    elem = hash_find(&hash, &query.e);

                    if (file->old_filename) {
                        struct hash_elem *elem_old = NULL;
                        struct filemgr query_old;
                        struct filemgr *old_file = NULL;

                        // get old file's ref count if exists
                        query_old.filename = file->old_filename;
                        elem_old = hash_find(&hash, &query_old.e);
                        if (elem_old) {
                            old_file = _get_entry(elem_old, struct filemgr, e);
                            old_file_refcount = atomic_get_uint32_t(&old_file->ref_count);
                        }
                    }

                    // If old file is opened by other handle, renaming should be
                    // postponed. It will be renamed later by the handle referring
                    // to the old file.
                    if (!elem && old_file_refcount == 0 &&
                        is_file_removed(orig_file_name)) {
                        // If background file removal is not done yet, we postpone
                        // file renaming at this time.
                        if (rename(file->filename, orig_file_name) < 0) {
                            // Note that the renaming failure is not a critical
                            // issue because the last compacted file will be automatically
                            // identified and opened in the next fdb_open call.
                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
                                           "CLOSE", file->filename);
                        }
                    }
                }
                spin_unlock(&file->lock);
                // Clean up global hash table, WAL index, and buffer cache.
                struct hash_elem *ret = hash_remove(&hash, &file->e);
                fdb_assert(ret, file, 0);

                update_file_pointers(file);
                spin_unlock(&filemgr_openlock);

                filemgr_free_func(&file->e);
                return (fdb_status) rv;
            } else {
                // Keep the structure cached so a later open can reuse it.
                atomic_store_uint8_t(&file->status, FILE_CLOSED);
            }
        }
    }

    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

    spin_unlock(&file->lock);
    spin_unlock(&filemgr_openlock);
    return (fdb_status) rv;
}
1581
1582void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1583{
1584    // remove all cached blocks
1585    if (global_config.ncacheblock > 0 &&
1586            file->bcache.load(std::memory_order_relaxed)) {
1587        bcache_remove_dirty_blocks(file);
1588        bcache_remove_clean_blocks(file);
1589        bcache_remove_file(file);
1590        file->bcache.store(NULL, std::memory_order_relaxed);
1591    }
1592}
1593
void _free_fhandle_idx(struct avl_tree *idx);
// Final destructor for a filemgr instance: stops any running prefetch
// thread, purges cached blocks, tears down the WAL, KV header, global
// transaction, superblock, dirty-update index, fhandle index, locks, and
// frees every heap allocation owned by 'file'.
// Callers invoke this only after the reference count has reached zero
// (see filemgr_close() / filemgr_remove_file()).
void filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    filemgr_prefetch_status_t prefetch_state =
                              atomic_get_uint8_t(&file->prefetch_status);

    // Signal the prefetch thread (if any) to abort, then wait for it.
    atomic_store_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_ABORT);
    if (prefetch_state == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread was running
        void *ret;
        // wait (the thread must have been created..)
        thread_join(file->prefetch_tid, &ret);
    }

    // remove all cached blocks
    if (global_config.ncacheblock > 0 &&
            file->bcache.load(std::memory_order_relaxed)) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
        file->bcache.store(NULL, std::memory_order_relaxed);
    }

    if (file->kv_header) {
        // multi KV intance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file, NULL);
        wal_destroy(file);
    }
    free(file->wal);

#ifdef _LATENCY_STATS
    for (int x = 0; x < FDB_LATENCY_NUM_STATS; ++x) {
        filemgr_destroy_latency_stat(&file->lat_stats[x]);
    }
#endif // _LATENCY_STATS

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);
    // free old filename if any
    free(file->old_filename);

    // destroy locks
    spin_destroy(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    // free superblock
    if (sb_ops.release) {
        sb_ops.release(file);
    }

    // free dirty update index
    filemgr_dirty_update_free(file);

    // free fhandle idx
    _free_fhandle_idx(&file->fhandle_idx);
    spin_destroy(&file->fhandle_idx_lock);

    // free file structure
    struct list *stale_list = filemgr_get_stale_list(file);
    filemgr_clear_stale_list(file);
    free(stale_list);
    free(file->config);
    free(file);
}
1686
1687// permanently remove file from cache (not just close)
1688// LCOV_EXCL_START
1689void filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)
1690{
1691    struct hash_elem *ret;
1692
1693    if (!file || atomic_get_uint32_t(&file->ref_count) > 0) {
1694        return;
1695    }
1696
1697    // remove from global hash table
1698    spin_lock(&filemgr_openlock);
1699    ret = hash_remove(&hash, &file->e);
1700    fdb_assert(ret, ret, NULL);
1701    spin_unlock(&filemgr_openlock);
1702
1703    if (!lazy_file_deletion_enabled ||
1704        (file->new_file && file->new_file->in_place_compaction)) {
1705        filemgr_free_func(&file->e);
1706    } else {
1707        register_file_removal(file, log_callback);
1708    }
1709}
1710// LCOV_EXCL_STOP
1711
1712static
1713void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1714    struct filemgr *file = _get_entry(h, struct filemgr, e);
1715    void *ret;
1716    spin_lock(&file->lock);
1717    if (atomic_get_uint32_t(&file->ref_count) != 0) {
1718        ret = (void *)file;
1719    } else {
1720        ret = NULL;
1721    }
1722    spin_unlock(&file->lock);
1723    return ret;
1724}
1725
// Shut down the global filemgr subsystem: free every closed file entry,
// tear down the block cache, and reset the one-time initialization state
// so a later filemgr_init() can run again.
// Returns FDB_RESULT_FILE_IS_BUSY if any file still has open references;
// FDB_RESULT_SUCCESS otherwise (including when already shut down).
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    if (filemgr_initialized) {

#ifndef SPIN_INITIALIZER
        // Windows: check if spin lock is already destroyed.
        // initial_lock_status: 0 = uninitialized, 2 = initialized;
        // CAS 2 -> 1 claims shutdown; any other value means another
        // thread already shut down (or is shutting down) the module.
        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
            spin_lock(&initial_lock);
        } else {
            // filemgr is already shut down
            return ret;
        }
#else
        spin_lock(&initial_lock);
#endif

        // Re-check under the lock: another thread may have finished
        // shutdown between the unlocked check above and lock acquisition.
        if (!filemgr_initialized) {
            // filemgr is already shut down
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            return ret;
        }

        // Scan for any file entry that still has a non-zero ref count;
        // shutdown is refused while any handle remains open.
        spin_lock(&filemgr_openlock);
        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        spin_unlock(&filemgr_openlock);
        if (!open_file) {
            hash_free_active(&hash, filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
#ifndef SPIN_INITIALIZER
            initial_lock_status = 0;
#else
            initial_lock = SPIN_INITIALIZER;
#endif
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            spin_destroy(&initial_lock);
#endif
        } else {
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1778
1779bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1780{
1781    spin_lock(&file->lock);
1782    bid_t bid = BLK_NOT_FOUND;
1783
1784    // block reusing is not allowed for being compacted file
1785    // for easy implementation.
1786    if (filemgr_get_file_status(file) == FILE_NORMAL &&
1787        file->sb && sb_ops.alloc_block) {
1788        bid = sb_ops.alloc_block(file);
1789    }
1790    if (bid == BLK_NOT_FOUND) {
1791        bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1792        atomic_add_uint64_t(&file->pos, file->blocksize);
1793    }
1794
1795    if (global_config.ncacheblock <= 0) {
1796        // if block cache is turned off, write the allocated block before use
1797        uint8_t _buf = 0x0;
1798        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1799                                       (bid+1) * file->blocksize - 1);
1800        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1801    }
1802    spin_unlock(&file->lock);
1803
1804    return bid;
1805}
1806
1807// Note that both alloc_multiple & alloc_multiple_cond are not used in
1808// the new version of DB file (with superblock support).
1809void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1810                            bid_t *end, err_log_callback *log_callback)
1811{
1812    spin_lock(&file->lock);
1813    *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1814    *end = *begin + nblock - 1;
1815    atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1816
1817    if (global_config.ncacheblock <= 0) {
1818        // if block cache is turned off, write the allocated block before use
1819        uint8_t _buf = 0x0;
1820        ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1821                                       atomic_get_uint64_t(&file->pos) - 1);
1822        _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1823    }
1824    spin_unlock(&file->lock);
1825}
1826
1827// atomically allocate NBLOCK blocks only when current file position is same to nextbid
1828bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1829                                  bid_t *begin, bid_t *end,
1830                                  err_log_callback *log_callback)
1831{
1832    bid_t bid;
1833    spin_lock(&file->lock);
1834    bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1835    if (bid == nextbid) {
1836        *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1837        *end = *begin + nblock - 1;
1838        atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1839
1840        if (global_config.ncacheblock <= 0) {
1841            // if block cache is turned off, write the allocated block before use
1842            uint8_t _buf = 0x0;
1843            ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1844                                           atomic_get_uint64_t(&file->pos));
1845            _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1846        }
1847    }else{
1848        *begin = BLK_NOT_FOUND;
1849        *end = BLK_NOT_FOUND;
1850    }
1851    spin_unlock(&file->lock);
1852    return bid;
1853}
1854
1855#ifdef __CRC32
1856INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1857{
1858    if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1859        uint32_t crc_file = 0;
1860        memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1861        crc_file = _endian_decode(crc_file);
1862        memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1863        if (!perform_integrity_check(reinterpret_cast<const uint8_t*>(buf),
1864                                     file->blocksize,
1865                                     crc_file,
1866                                     file->crc_mode)) {
1867            return FDB_RESULT_CHECKSUM_ERROR;
1868        }
1869    }
1870    return FDB_RESULT_SUCCESS;
1871}
1872#endif
1873
1874bool filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1875{
1876    bool ret;
1877    if (atomic_get_uint64_t(&file->last_commit) < bid * file->blocksize) {
1878        ret = true; // block invalidated was allocated recently (uncommitted)
1879    } else {
1880        ret = false; // a block from the past is invalidated (committed)
1881    }
1882    if (global_config.ncacheblock > 0) {
1883        bcache_invalidate_block(file, bid);
1884    }
1885    return ret;
1886}
1887
1888bool filemgr_is_fully_resident(struct filemgr *file)
1889{
1890    bool ret = false;
1891    if (global_config.ncacheblock > 0) {
1892        //TODO: A better thing to do is to track number of document blocks
1893        // and only compare those with the cached document block count
1894        double num_cached_blocks = (double)bcache_get_num_blocks(file);
1895        uint64_t num_blocks = atomic_get_uint64_t(&file->pos)
1896                                 / file->blocksize;
1897        double num_fblocks = (double)num_blocks;
1898        if (num_cached_blocks > num_fblocks * FILEMGR_RESIDENT_THRESHOLD) {
1899            ret = true;
1900        }
1901    }
1902    return ret;
1903}
1904
1905uint64_t filemgr_flush_immutable(struct filemgr *file,
1906                                   err_log_callback *log_callback)
1907{
1908    uint64_t ret = 0;
1909    if (global_config.ncacheblock > 0) {
1910        if (atomic_get_uint8_t(&file->io_in_prog)) {
1911            return 0;
1912        }
1913        ret = bcache_get_num_immutable(file);
1914        if (!ret) {
1915            return ret;
1916        }
1917        fdb_status rv = bcache_flush_immutable(file);
1918        if (rv != FDB_RESULT_SUCCESS) {
1919            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "WRITE",
1920                           file->filename);
1921        }
1922        return bcache_get_num_immutable(file);
1923    }
1924
1925    return ret;
1926}
1927
1928fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
1929                        err_log_callback *log_callback,
1930                        bool read_on_cache_miss)
1931{
1932    size_t lock_no;
1933    ssize_t r;
1934    uint64_t pos = bid * file->blocksize;
1935    fdb_status status = FDB_RESULT_SUCCESS;
1936    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
1937
1938    if (pos >= curr_pos) {
1939        const char *msg = "Read error: read offset %" _F64 " exceeds the file's "
1940                          "current offset %" _F64 " in a database file '%s'\n";
1941        fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, pos, curr_pos,
1942                file->filename);
1943        return FDB_RESULT_READ_FAIL;
1944    }
1945
1946    if (global_config.ncacheblock > 0) {
1947        lock_no = bid % DLOCK_MAX;
1948        (void)lock_no;
1949
1950#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1951        plock_entry_t *plock_entry = NULL;
1952        bid_t is_writer = 0;
1953#endif
1954        bool locked = false;
1955        // Note: we don't need to grab lock for committed blocks
1956        // because they are immutable so that no writer will interfere and
1957        // overwrite dirty data
1958        if (filemgr_is_writable(file, bid)) {
1959#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1960            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
1961#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1962            mutex_lock(&file->data_mutex[lock_no]);
1963#else
1964            spin_lock(&file->data_spinlock[lock_no]);
1965#endif //__FILEMGR_DATA_PARTIAL_LOCK
1966            locked = true;
1967        }
1968
1969        r = bcache_read(file, bid, buf);
1970        if (r == 0) {
1971            // cache miss
1972            if (!read_on_cache_miss) {
1973                if (locked) {
1974#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1975                    plock_unlock(&file->plock, plock_entry);
1976#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1977                    mutex_unlock(&file->data_mutex[lock_no]);
1978#else
1979                    spin_unlock(&file->data_spinlock[lock_no]);
1980#endif //__FILEMGR_DATA_PARTIAL_LOCK
1981                }
1982                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
1983                    "doesn't exist in the cache and read_on_cache_miss flag is turned on.\n";
1984                fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
1985                        file->filename);
1986                return FDB_RESULT_READ_FAIL;
1987            }
1988
1989            // if normal file, just read a block
1990            r = filemgr_read_block(file, buf, bid);
1991            if (r != (ssize_t)file->blocksize) {
1992                _log_errno_str(file->ops, log_callback,
1993                               (fdb_status) r, "READ", file->filename);
1994                if (locked) {
1995#ifdef __FILEMGR_DATA_PARTIAL_LOCK
1996                    plock_unlock(&file->plock, plock_entry);
1997#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1998                    mutex_unlock(&file->data_mutex[lock_no]);
1999#else
2000                    spin_unlock(&file->data_spinlock[lock_no]);
2001#endif //__FILEMGR_DATA_PARTIAL_LOCK
2002                }
2003                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2004                    "is not read correctly: only %d bytes read.\n";
2005                status = r < 0 ? (fdb_status)r : FDB_RESULT_READ_FAIL;
2006                fdb_log(log_callback, status, msg, bid, file->filename, r);
2007                if (!log_callback || !log_callback->callback) {
2008                    dbg_print_buf(buf, file->blocksize, true, 16);
2009                }
2010                return status;
2011            }
2012#ifdef __CRC32
2013            status = _filemgr_crc32_check(file, buf);
2014            if (status != FDB_RESULT_SUCCESS) {
2015                _log_errno_str(file->ops, log_callback, status, "READ",
2016                        file->filename);
2017                if (locked) {
2018#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2019                    plock_unlock(&file->plock, plock_entry);
2020#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2021                    mutex_unlock(&file->data_mutex[lock_no]);
2022#else
2023                    spin_unlock(&file->data_spinlock[lock_no]);
2024#endif //__FILEMGR_DATA_PARTIAL_LOCK
2025                }
2026                const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2027                    ": marker %x\n";
2028                fdb_log(log_callback, status, msg, bid,
2029                        file->filename, *((uint8_t*)buf + file->blocksize-1));
2030                if (!log_callback || !log_callback->callback) {
2031                    dbg_print_buf(buf, file->blocksize, true, 16);
2032                }
2033                return status;
2034            }
2035#endif
2036            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN, false);
2037            if (r != global_config.blocksize) {
2038                if (locked) {
2039#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2040                    plock_unlock(&file->plock, plock_entry);
2041#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2042                    mutex_unlock(&file->data_mutex[lock_no]);
2043#else
2044                    spin_unlock(&file->data_spinlock[lock_no]);
2045#endif //__FILEMGR_DATA_PARTIAL_LOCK
2046                }
2047                _log_errno_str(file->ops, log_callback,
2048                               (fdb_status) r, "WRITE", file->filename);
2049                const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2050                    "is not written in cache correctly: only %d bytes written.\n";
2051                status = r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2052                fdb_log(log_callback, status, msg, bid, file->filename, r);
2053                if (!log_callback || !log_callback->callback) {
2054                    dbg_print_buf(buf, file->blocksize, true, 16);
2055                }
2056                return status;
2057            }
2058        }
2059        if (locked) {
2060#ifdef __FILEMGR_DATA_PARTIAL_LOCK
2061            plock_unlock(&file->plock, plock_entry);
2062#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2063            mutex_unlock(&file->data_mutex[lock_no]);
2064#else
2065            spin_unlock(&file->data_spinlock[lock_no]);
2066#endif //__FILEMGR_DATA_PARTIAL_LOCK
2067        }
2068    } else {
2069        if (!read_on_cache_miss) {
2070            const char *msg = "Read error: BID %" _F64 " in a database file '%s':"
2071                "block cache is not enabled.\n";
2072            fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2073                    file->filename);
2074            return FDB_RESULT_READ_FAIL;
2075        }
2076
2077        r = filemgr_read_block(file, buf, bid);
2078        if (r != (ssize_t)file->blocksize) {
2079            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
2080                           file->filename);
2081            const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2082                "is not read correctly: only %d bytes read (block cache disabled).\n";
2083            status = (r < 0)? (fdb_status)r : FDB_RESULT_READ_FAIL;
2084            fdb_log(log_callback, status, msg, bid, file->filename, r);
2085            if (!log_callback || !log_callback->callback) {
2086                dbg_print_buf(buf, file->blocksize, true, 16);
2087            }
2088            return status;
2089        }
2090
2091#ifdef __CRC32
2092        status = _filemgr_crc32_check(file, buf);
2093        if (status != FDB_RESULT_SUCCESS) {
2094            _log_errno_str(file->ops, log_callback, status, "READ",
2095                           file->filename);
2096            const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2097                ": marker %x (block cache disabled)\n";
2098            fdb_log(log_callback, status, msg, bid,
2099                    file->filename, *((uint8_t*)buf + file->blocksize-1));
2100            if (!log_callback || !log_callback->callback) {
2101                dbg_print_buf(buf, file->blocksize, true, 16);
2102            }
2103            return status;
2104        }
2105#endif
2106    }
2107    return status;
2108}
2109
// Write 'len' bytes from 'buf' into block 'bid', starting at byte 'offset'
// within that block. With the block cache enabled the data is staged as a
// DIRTY cache block (for a partial write, the block's previous on-disk
// contents are merged in first); with the cache disabled, data goes straight
// to disk via pwrite (with a CRC stamped on full b-tree blocks).
// 'final_write' tells the cache this block will not be written again before
// flush. Returns FDB_RESULT_SUCCESS or a write/read failure status.
fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
                                uint64_t offset, uint64_t len, void *buf,
                                bool final_write,
                                err_log_callback *log_callback)
{
    size_t lock_no;
    ssize_t r = 0;
    uint64_t pos = bid * file->blocksize + offset;
    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);

    // The write must stay within a single block.
    if (offset + len > file->blocksize) {
        const char *msg = "Write error: trying to write the buffer data "
            "(offset: %" _F64 ", len: %" _F64 " that exceeds the block size "
            "%" _F64 " in a database file '%s'\n";
        fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, offset, len,
                file->blocksize, file->filename);
        return FDB_RESULT_WRITE_FAIL;
    }

    if (sb_bmp_exists(file->sb)) {
        // block reusing is enabled
        if (!sb_ops.is_writable(file, bid)) {
            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
                              "not identified as a reusable block in "
                              "a database file '%s'\n";
            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, file->filename);
            return FDB_RESULT_WRITE_FAIL;
        }
    } else if (pos < curr_commit_pos) {
        // stale blocks are not reused yet
        if (file->sb == NULL ||
            (file->sb && pos >= file->sb->config->num_sb * file->blocksize)) {
            // (non-sequential update is exceptionally allowed for superblocks)
            const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
                              "smaller than the current commit offset %" _F64 " in "
                              "a database file '%s'\n";
            fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, curr_commit_pos,
                    file->filename);
            return FDB_RESULT_WRITE_FAIL;
        }
    }

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

        bool locked = false;
        // Writers always take the per-block data lock (reader/writer plock,
        // mutex, or spinlock depending on build configuration).
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry;
        bid_t is_writer = 1;
        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
        mutex_lock(&file->data_mutex[lock_no]);
#else
        spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        locked = true;

        if (len == file->blocksize) {
            // write entire block .. we don't need to read previous block
            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY, final_write);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
            }
        } else {
            // partially write buffer cache first
            r = bcache_write_partial(file, bid, buf, offset, len, final_write);
            if (r == 0) {
                // cache miss
                // write partially .. we have to read previous contents of the block
                int64_t cur_file_pos = file->ops->goto_eof(file->fd);
                if (cur_file_pos < 0) {
                    _log_errno_str(file->ops, log_callback,
                                   (fdb_status) cur_file_pos, "EOF", file->filename);
                    return (fdb_status) cur_file_pos;
                }
                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
                void *_buf = _filemgr_get_temp_buf();

                if (bid >= cur_file_last_bid) {
                    // this is the first time to write this block
                    // we don't need to read previous block from file.
                } else {
                    r = filemgr_read_block(file, _buf, bid);
                    if (r != (ssize_t)file->blocksize) {
                        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                            mutex_unlock(&file->data_mutex[lock_no]);
#else
                            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                        }
                        _filemgr_release_temp_buf(_buf);
                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
                                       "READ", file->filename);
                        return r < 0 ? (fdb_status) r : FDB_RESULT_READ_FAIL;
                    }
                }
                // Merge the new bytes into the old block image, then stage
                // the whole block as dirty in the cache.
                memcpy((uint8_t *)_buf + offset, buf, len);
                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY, final_write);
                if (r != global_config.blocksize) {
                    if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                        plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                        mutex_unlock(&file->data_mutex[lock_no]);
#else
                        spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                    }
                    _filemgr_release_temp_buf(_buf);
                    _log_errno_str(file->ops, log_callback,
                            (fdb_status) r, "WRITE", file->filename);
                    return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
                }

                _filemgr_release_temp_buf(_buf);
            } // cache miss
        } // full block or partial block

        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }
    } else { // block cache disabled

#ifdef __CRC32
        // Full b-tree blocks get their CRC recomputed and stamped into the
        // block image before hitting disk.
        if (len == file->blocksize) {
            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
            if (marker == BLK_MARKER_BNODE) {
                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
                uint32_t crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
                                              file->blocksize,
                                              file->crc_mode);
                crc32 = _endian_encode(crc32);
                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
            }
        }
#endif

        r = file->ops->pwrite(file->fd, buf, len, pos);
        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
        if ((uint64_t)r != len) {
            return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
        }
    } // block cache check
    return FDB_RESULT_SUCCESS;
}
2276
2277fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
2278                   err_log_callback *log_callback)
2279{
2280    return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
2281                                false, // TODO: track immutability of index blk
2282                                log_callback);
2283}
2284
2285fdb_status filemgr_commit(struct filemgr *file, bool sync,
2286                          err_log_callback *log_callback)
2287{
2288    // append header at the end of the file
2289    uint64_t bmp_revnum = 0;
2290    if (sb_ops.get_bmp_revnum) {
2291        bmp_revnum = sb_ops.get_bmp_revnum(file);
2292    }
2293    return filemgr_commit_bid(file, BLK_NOT_FOUND, bmp_revnum,
2294                              sync, log_callback);
2295}
2296
// Commit the file: flush the block cache, then write a DB header block to
// 'bid' (or append it at EOF when bid == BLK_NOT_FOUND, i.e. no block
// reuse). The header block layout is documented inline below. Afterwards
// 'last_commit' is advanced and, if 'sync' is set, the file is fsync'ed.
// Returns FDB_RESULT_SUCCESS or the first failing I/O status.
fdb_status filemgr_commit_bid(struct filemgr *file, bid_t bid,
                              uint64_t bmp_revnum, bool sync,
                              err_log_callback *log_callback)
{
    struct avl_node *a;
    struct kvs_node *node;
    bid_t prev_bid, _prev_bid;
    uint64_t _deltasize, _bmp_revnum;
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    int result = FDB_RESULT_SUCCESS;
    bool block_reusing = false;

    filemgr_set_io_inprog(file);
    // Flush all dirty cache blocks to disk before the header is written,
    // so the header never points at data that is not yet durable.
    if (global_config.ncacheblock > 0) {
        result = bcache_flush(file);
        if (result != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, (fdb_status) result,
                           "FLUSH", file->filename);
            filemgr_clear_io_inprog(file);
            return (fdb_status)result;
        }
    }

    spin_lock(&file->lock);

    uint16_t header_len = file->header.size;
    struct kvs_header *kv_header = file->kv_header;
    filemgr_magic_t magic = file->version;

    if (file->header.size > 0 && file->header.data) {
        void *buf = _filemgr_get_temp_buf();
        uint8_t marker[BLK_MARKER_SIZE];

        // [header data]:        'header_len' bytes   <---+
        // [header revnum]:      8 bytes                  |
        // [default KVS seqnum]: 8 bytes                  |
        // ...                                            |
        // (empty)                                    blocksize
        // ...                                            |
        // [SB bitmap revnum]:   8 bytes                  |
        // [Delta size]:         8 bytes                  |
        // [prev header bid]:    8 bytes                  |
        // [header length]:      2 bytes                  |
        // [magic number]:       8 bytes                  |
        // [block marker]:       1 byte               <---+

        // header data
        memcpy(buf, file->header.data, header_len);
        // header rev number
        _revnum = _endian_encode(file->header.revnum);
        memcpy((uint8_t *)buf + header_len, &_revnum,
               sizeof(filemgr_header_revnum_t));
        // file's sequence number (default KVS seqnum)
        _seqnum = _endian_encode(file->header.seqnum.load());
        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
               &_seqnum, sizeof(fdb_seqnum_t));

        // current header's sb bmp revision number
        if (file->sb) {
            _bmp_revnum = _endian_encode(bmp_revnum);
            memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
                   - sizeof(header_len) - sizeof(_prev_bid)
                   - sizeof(_deltasize) - sizeof(_bmp_revnum)
                   - BLK_MARKER_SIZE),
                   &_bmp_revnum, sizeof(_bmp_revnum));
        }

        // delta size since prior commit
        _deltasize = _endian_encode(file->header.stat.deltasize //index+data
                                  + wal_get_datasize(file)); // wal datasize
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid)*2 - BLK_MARKER_SIZE),
               &_deltasize, sizeof(_deltasize));

        // Reset in-memory delta size of the header for next commit...
        file->header.stat.deltasize = 0; // single kv store header
        if (kv_header) { // multi kv store stats
            a = avl_first(kv_header->idx_id);
            while (a) {
                node = _get_entry(a, struct kvs_node, avl_id);
                a = avl_next(&node->avl_id);
                node->stat.deltasize = 0;
            }
        }

        // prev header bid
        prev_bid = atomic_get_uint64_t(&file->header.bid);
        _prev_bid = _endian_encode(prev_bid);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
               &_prev_bid, sizeof(_prev_bid));
        // header length
        header_len = _endian_encode(header_len);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - BLK_MARKER_SIZE),
               &header_len, sizeof(header_len));
        // magic number
        magic = _endian_encode(magic);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - BLK_MARKER_SIZE), &magic, sizeof(magic));

        // marker
        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
               marker, BLK_MARKER_SIZE);

        if (bid == BLK_NOT_FOUND) {
            // append header at the end of file
            bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
            block_reusing = false;
        } else {
            // write header in the allocated (reused) block
            block_reusing = true;
            // we MUST invalidate the header block 'bid', since previous
            // contents of 'bid' may remain in block cache and cause data
            // inconsistency if reading header block hits the cache.
            bcache_invalidate_block(file, bid);
        }

        ssize_t rv = filemgr_write_blocks(file, buf, 1, bid);
        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
                       "WRITE", file->filename);
        if (rv != (ssize_t)file->blocksize) {
            _filemgr_release_temp_buf(buf);
            spin_unlock(&file->lock);
            filemgr_clear_io_inprog(file);
            return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
        }

        if (prev_bid) {
            // mark prev DB header as stale
            filemgr_add_stale_block(file, prev_bid * file->blocksize, file->blocksize);
        }

        atomic_store_uint64_t(&file->header.bid, bid);
        if (!block_reusing) {
            atomic_add_uint64_t(&file->pos, file->blocksize);
        }

        _filemgr_release_temp_buf(buf);
    }

    // Advance the commit boundary: with block reuse active the boundary is
    // the superblock's current allocation cursor; otherwise it is EOF.
    if (sb_bmp_exists(file->sb) &&
        atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND &&
        atomic_get_uint8_t(&file->status) == FILE_NORMAL) {
        // block reusing is currently enabled
        atomic_store_uint64_t(&file->last_commit,
                              atomic_get_uint64_t(&file->sb->cur_alloc_bid) * file->blocksize);
    } else {
        atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
    }
    if (file->sb) {
        atomic_store_uint64_t(&file->last_commit_bmp_revnum,
                              bmp_revnum);
    }

    spin_unlock(&file->lock);

    if (sync) {
        result = file->ops->fsync(file->fd);
        _log_errno_str(file->ops, log_callback, (fdb_status)result,
                       "FSYNC", file->filename);
    }
    filemgr_clear_io_inprog(file);
    return (fdb_status) result;
}
2464
2465fdb_status filemgr_sync(struct filemgr *file, bool sync_option,
2466                        err_log_callback *log_callback)
2467{
2468    fdb_status result = FDB_RESULT_SUCCESS;
2469    if (global_config.ncacheblock > 0) {
2470        result = bcache_flush(file);
2471        if (result != FDB_RESULT_SUCCESS) {
2472            _log_errno_str(file->ops, log_callback, (fdb_status) result,
2473                           "FLUSH", file->filename);
2474            return result;
2475        }
2476    }
2477
2478    if (sync_option && file->fflags & FILEMGR_SYNC) {
2479        int rv = file->ops->fsync(file->fd);
2480        _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
2481        return (fdb_status) rv;
2482    }
2483    return result;
2484}
2485
2486fdb_status filemgr_copy_file_range(struct filemgr *src_file,
2487                                   struct filemgr *dst_file,
2488                                   bid_t src_bid, bid_t dst_bid,
2489                                   bid_t clone_len)
2490{
2491    uint32_t blocksize = src_file->blocksize;
2492    fdb_status fs = (fdb_status)dst_file->ops->copy_file_range(
2493                                            src_file->fs_type,
2494                                            src_file->fd,
2495                                            dst_file->fd,
2496                                            src_bid * blocksize,
2497                                            dst_bid * blocksize,
2498                                            clone_len * blocksize);
2499    if (fs != FDB_RESULT_SUCCESS) {
2500        return fs;
2501    }
2502    atomic_store_uint64_t(&dst_file->pos, (dst_bid + clone_len) * blocksize);
2503    return FDB_RESULT_SUCCESS;
2504}
2505
2506int filemgr_update_file_status(struct filemgr *file, file_status_t status,
2507                                char *old_filename)
2508{
2509    int ret = 1;
2510    spin_lock(&file->lock);
2511    atomic_store_uint8_t(&file->status, status);
2512    if (old_filename) {
2513        if (!file->old_filename) {
2514            file->old_filename = old_filename;
2515        } else {
2516            ret = 0;
2517            fdb_assert(atomic_get_uint32_t(&file->ref_count),
2518                       atomic_get_uint32_t(&file->ref_count), 0);
2519        }
2520    }
2521    spin_unlock(&file->lock);
2522    return ret;
2523}
2524
2525void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
2526                                  file_status_t status)
2527{
2528    spin_lock(&old_file->lock);
2529    old_file->new_file = new_file;
2530    atomic_store_uint8_t(&old_file->status, status);
2531    spin_unlock(&old_file->lock);
2532
2533    if (new_file) {
2534        spin_lock(&new_file->lock);
2535        new_file->prev_file = old_file;
2536        spin_unlock(&new_file->lock);
2537    }
2538}
2539
2540bool filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header,
2541                           void (*free_kv_header)(struct filemgr *file))
2542{
2543    bool ret;
2544    spin_lock(&file->lock);
2545
2546    if (!file->kv_header) {
2547        file->kv_header = kv_header;
2548        file->free_kv_header = free_kv_header;
2549        ret = true;
2550    } else {
2551        ret = false;
2552    }
2553
2554    spin_unlock(&file->lock);
2555
2556    return ret;
2557}
2558
2559struct kvs_header *filemgr_get_kv_header(struct filemgr *file)
2560{
2561    struct kvs_header *kv_header = NULL;
2562    spin_lock(&file->lock);
2563    kv_header = file->kv_header;
2564    spin_unlock(&file->lock);
2565    return kv_header;
2566}
2567
2568// Check if there is a file that still points to the old_file that is being
2569// compacted away. If so open the file and return its pointer.
2570static
2571void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
2572    struct filemgr *cur_file = (struct filemgr *)ctx;
2573    struct filemgr *file = _get_entry(h, struct filemgr, e);
2574    spin_lock(&file->lock);
2575    if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
2576        file->new_file == cur_file) {
2577        // Incrementing reference counter below is the same as filemgr_open()
2578        // We need to do this to ensure that the pointer returned does not
2579        // get freed outside the filemgr_open lock
2580        atomic_incr_uint32_t(&file->ref_count);
2581        spin_unlock(&file->lock);
2582        return (void *)file;
2583    }
2584    spin_unlock(&file->lock);
2585    return (void *)NULL;
2586}
2587
2588struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
2589    struct filemgr *very_old_file;
2590    spin_lock(&filemgr_openlock);
2591    very_old_file = (struct filemgr *)hash_scan(&hash,
2592                                         _filemgr_check_stale_link, cur_file);
2593    spin_unlock(&filemgr_openlock);
2594    return very_old_file;
2595}
2596
2597char *filemgr_redirect_old_file(struct filemgr *very_old_file,
2598                                     struct filemgr *new_file,
2599                                     filemgr_redirect_hdr_func
2600                                     redirect_header_func) {
2601    size_t old_header_len, new_header_len;
2602    uint16_t new_filename_len;
2603    char *past_filename;
2604    spin_lock(&very_old_file->lock);
2605
2606    if (very_old_file->header.size == 0 || very_old_file->new_file == NULL) {
2607        spin_unlock(&very_old_file->lock);
2608        return NULL;
2609    }
2610
2611    old_header_len = very_old_file->header.size;
2612    new_filename_len = strlen(new_file->filename);
2613    // Find out the new DB header length with new_file's filename
2614    new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
2615        + new_filename_len;
2616    // As we are going to change the new_filename field in the DB header of the
2617    // very_old_file, maybe reallocate DB header buf to accomodate bigger value
2618    if (new_header_len > old_header_len) {
2619        very_old_file->header.data = realloc(very_old_file->header.data,
2620                new_file->blocksize);
2621    }
2622    very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
2623    // Note that the prev_file pointer of the new_file is not updated, this
2624    // is so that every file in the history is reachable from the current file.
2625
2626    past_filename = redirect_header_func(very_old_file,
2627                                         (uint8_t *)very_old_file->header.data,
2628                                         new_file);//Update in-memory header
2629    very_old_file->header.size = new_header_len;
2630    ++(very_old_file->header.revnum);
2631
2632    spin_unlock(&very_old_file->lock);
2633    return past_filename;
2634}
2635
// Arrange removal of 'old_file' after it has been compacted into 'new_file'.
// If the old file is still referenced, mark it FILE_REMOVED_PENDING so the
// last closer performs the removal; otherwise remove it right away.
void filemgr_remove_pending(struct filemgr *old_file,
                            struct filemgr *new_file,
                            err_log_callback *log_callback)
{
    // Nothing to do unless a successor file exists.
    if (new_file == NULL) {
        return;
    }

    spin_lock(&old_file->lock);
    if (atomic_get_uint32_t(&old_file->ref_count) > 0) {
        // delay removing
        old_file->new_file = new_file;
        atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);

#if !(defined(WIN32) || defined(_WIN32))
        // Only for Posix
        // Unlink now: open descriptors keep the data accessible until the
        // remaining handles close (POSIX semantics).
        int ret;
        ret = unlink(old_file->filename);
        _log_errno_str(old_file->ops, log_callback, (fdb_status)ret,
                       "UNLINK", old_file->filename);
#endif

        spin_unlock(&old_file->lock);
    } else {
        // immediatly remove
        // LCOV_EXCL_START
        spin_unlock(&old_file->lock);

        // With lazy deletion enabled, the physical remove() is normally
        // deferred -- except for in-place compaction, where the name must be
        // freed immediately.
        if (!lazy_file_deletion_enabled ||
            (old_file->new_file && old_file->new_file->in_place_compaction)) {
            remove(old_file->filename);
        }
        filemgr_remove_file(old_file, log_callback);
        // LCOV_EXCL_STOP
    }
}
2672
2673// migrate default kv store stats over to new_file
2674struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
2675                                              struct filemgr *new_file,
2676                                              struct kvs_info *kvs)
2677{
2678    kvs_ops_stat *ret = NULL;
2679    if (new_file == NULL) {
2680        return NULL;
2681    }
2682
2683    spin_lock(&old_file->lock);
2684    new_file->header.op_stat = old_file->header.op_stat;
2685    ret = &new_file->header.op_stat;
2686    spin_unlock(&old_file->lock);
2687    return ret;
2688}
2689
// Note: filemgr_openlock should be held before calling this function.
// Destroy 'filename' and, recursively, every older file version it points
// to via the 'old_filename' chain. 'destroy_file_set' tracks filenames
// already visited in the current recursion (NULL for the top-level call,
// in which case a local set is created and freed here).
// Returns FDB_RESULT_SUCCESS, FDB_RESULT_FILE_IS_BUSY if any version is
// still referenced, or an I/O / remove error status.
fdb_status filemgr_destroy_file(char *filename,
                                struct filemgr_config *config,
                                struct hash *destroy_file_set)
{
    struct filemgr *file = NULL;
    struct hash to_destroy_files;
    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
                                                  &to_destroy_files);
    struct filemgr query;
    struct hash_elem *e = NULL;
    fdb_status status = FDB_RESULT_SUCCESS;
    char *old_filename = NULL;

    if (!destroy_file_set) { // top level or non-recursive call
        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
    }

    query.filename = filename;
    // check whether file is already being destroyed in parent recursive call
    e = hash_find(destroy_set, &query.e);
    if (e) { // Duplicate filename found, nothing to be done in this call
        if (!destroy_file_set) { // top level or non-recursive call
            hash_free(destroy_set);
        }
        return status;
    } else {
        // Remember file. Stack value ok IFF single direction recursion
        hash_insert(destroy_set, &query.e);
    }

    // check global list of known files to see if it is already opened or not
    e = hash_find(&hash, &query.e);
    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        // A referenced file cannot be destroyed.
        spin_lock(&file->lock);
        if (atomic_get_uint32_t(&file->ref_count)) {
            spin_unlock(&file->lock);
            status = FDB_RESULT_FILE_IS_BUSY;
            if (!destroy_file_set) { // top level or non-recursive call
                hash_free(destroy_set);
            }
            return status;
        }
        spin_unlock(&file->lock);
        // Recurse into the previous file version first.
        if (file->old_filename) {
            status = filemgr_destroy_file(file->old_filename, config,
                                          destroy_set);
            if (status != FDB_RESULT_SUCCESS) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return status;
            }
        }

        // Cleanup file from in-memory as well as on-disk
        e = hash_remove(&hash, &file->e);
        fdb_assert(e, e, 0);
        filemgr_free_func(&file->e);
        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
            if (remove(filename)) {
                status = FDB_RESULT_FILE_REMOVE_FAIL;
            }
        }
    } else { // file not in memory, read on-disk to destroy older versions..
        // Build a throwaway stack-allocated filemgr just to read the header.
        file = (struct filemgr *)alca(struct filemgr, 1);
        memset(file, 0x0, sizeof(struct filemgr));
        file->filename = filename;
        file->ops = get_filemgr_ops();
        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
        file->blocksize = global_config.blocksize;
        file->config = (struct filemgr_config *)alca(struct filemgr_config, 1);
        *file->config = *config;
        fdb_init_encryptor(&file->encryption, &config->encryption_key);
        if (file->fd < 0) {
            // NOTE(review): 'fd' here doubles as an error status on failure;
            // a missing file is not an error (nothing left to destroy).
            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return (fdb_status) file->fd;
            }
        } else { // file successfully opened, seek to end to get DB header
            cs_off_t offset = file->ops->goto_eof(file->fd);
            if (offset < 0) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return (fdb_status) offset;
            } else { // Need to read DB header which contains old filename
                atomic_store_uint64_t(&file->pos, offset);
                // initialize CRC mode
                if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
                    file->crc_mode = CRC32;
                } else {
                    file->crc_mode = CRC_DEFAULT;
                }

                status = _filemgr_load_sb(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    return status;
                }

                status = _filemgr_read_header(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    if (sb_ops.release && file->sb) {
                        sb_ops.release(file);
                    }
                    return status;
                }
                if (file->header.data) {
                    // Decode the new/old filename fields packed inside the
                    // DB header: [new_len(2B)][old_len(2B)][new_name][old_name]
                    size_t new_fnamelen_off = ver_get_new_filename_off(file->
                                                                      version);
                    size_t old_fnamelen_off = new_fnamelen_off + 2;
                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data
                                                     + new_fnamelen_off);
                    uint16_t new_filename_len =
                                      _endian_decode(*new_filename_len_ptr);
                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data
                                                     + old_fnamelen_off);
                    uint16_t old_filename_len =
                                      _endian_decode(*old_filename_len_ptr);
                    old_filename = (char *)file->header.data + old_fnamelen_off
                                   + 2 + new_filename_len;
                    if (old_filename_len) {
                        // Recurse into the older on-disk version.
                        status = filemgr_destroy_file(old_filename, config,
                                                      destroy_set);
                    }
                    free(file->header.data);
                }
                file->ops->close(file->fd);
                if (sb_ops.release && file->sb) {
                    sb_ops.release(file);
                }
                if (status == FDB_RESULT_SUCCESS) {
                    if (filemgr_does_file_exist(filename)
                                               == FDB_RESULT_SUCCESS) {
                        if (remove(filename)) {
                            status = FDB_RESULT_FILE_REMOVE_FAIL;
                        }
                    }
                }
            }
        }
    }

    if (!destroy_file_set) { // top level or non-recursive call
        hash_free(destroy_set);
    }

    return status;
}
2854
2855bool filemgr_is_rollback_on(struct filemgr *file)
2856{
2857    bool rv;
2858    spin_lock(&file->lock);
2859    rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2860    spin_unlock(&file->lock);
2861    return rv;
2862}
2863
2864void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2865{
2866    spin_lock(&file->lock);
2867    if (new_val) {
2868        file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2869    } else {
2870        file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2871    }
2872    spin_unlock(&file->lock);
2873}
2874
2875void filemgr_set_cancel_compaction(struct filemgr *file, bool cancel)
2876{
2877    spin_lock(&file->lock);
2878    if (cancel) {
2879        file->fflags |= FILEMGR_CANCEL_COMPACTION;
2880    } else {
2881        file->fflags &= ~FILEMGR_CANCEL_COMPACTION;
2882    }
2883    spin_unlock(&file->lock);
2884}
2885
2886bool filemgr_is_compaction_cancellation_requested(struct filemgr *file)
2887{
2888    bool rv;
2889    spin_lock(&file->lock);
2890    rv = (file->fflags & FILEMGR_CANCEL_COMPACTION);
2891    spin_unlock(&file->lock);
2892    return rv;
2893}
2894
2895void filemgr_set_in_place_compaction(struct filemgr *file,
2896                                     bool in_place_compaction) {
2897    spin_lock(&file->lock);
2898    file->in_place_compaction = in_place_compaction;
2899    spin_unlock(&file->lock);
2900}
2901
2902bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2903
2904{
2905    bool ret = false;
2906    spin_lock(&file->lock);
2907    ret = file->in_place_compaction;
2908    spin_unlock(&file->lock);
2909    return ret;
2910}
2911
2912void filemgr_mutex_openlock(struct filemgr_config *config)
2913{
2914    filemgr_init(config);
2915
2916    spin_lock(&filemgr_openlock);
2917}
2918
2919void filemgr_mutex_openunlock(void)
2920{
2921    spin_unlock(&filemgr_openlock);
2922}
2923
2924void filemgr_mutex_lock(struct filemgr *file)
2925{
2926    mutex_lock(&file->writer_lock.mutex);
2927    file->writer_lock.locked = true;
2928}
2929
2930bool filemgr_mutex_trylock(struct filemgr *file) {
2931    if (mutex_trylock(&file->writer_lock.mutex)) {
2932        file->writer_lock.locked = true;
2933        return true;
2934    }
2935    return false;
2936}
2937
2938void filemgr_mutex_unlock(struct filemgr *file)
2939{
2940    if (file->writer_lock.locked) {
2941        file->writer_lock.locked = false;
2942        mutex_unlock(&file->writer_lock.mutex);
2943    }
2944}
2945
2946bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2947{
2948    uint8_t marker[BLK_MARKER_SIZE];
2949    filemgr_magic_t magic;
2950    marker[0] = *(((uint8_t *)head_buffer)
2951                 + blocksize - BLK_MARKER_SIZE);
2952    if (marker[0] != BLK_MARKER_DBHEADER) {
2953        return false;
2954    }
2955
2956    memcpy(&magic, (uint8_t *) head_buffer
2957            + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2958    magic = _endian_decode(magic);
2959
2960    return ver_is_valid_magic(magic);
2961}
2962
2963bool filemgr_is_cow_supported(struct filemgr *src, struct filemgr *dst)
2964{
2965    src->fs_type = src->ops->get_fs_type(src->fd);
2966    if (src->fs_type < 0) {
2967        return false;
2968    }
2969    dst->fs_type = dst->ops->get_fs_type(dst->fd);
2970    if (dst->fs_type < 0) {
2971        return false;
2972    }
2973    if (src->fs_type == dst->fs_type && src->fs_type != FILEMGR_FS_NO_COW) {
2974        return true;
2975    }
2976    return false;
2977}
2978
2979void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
2980{
2981    atomic_store_uint32_t(&file->throttling_delay, delay_us,
2982                          std::memory_order_relaxed);
2983}
2984
2985uint32_t filemgr_get_throttling_delay(struct filemgr *file)
2986{
2987    return atomic_get_uint32_t(&file->throttling_delay,
2988                               std::memory_order_relaxed);
2989}
2990
2991void filemgr_clear_stale_list(struct filemgr *file)
2992{
2993    if (file->stale_list) {
2994        // if the items in the list are not freed yet, release them first.
2995        struct list_elem *e;
2996        struct stale_data *item;
2997
2998        e = list_begin(file->stale_list);
2999        while (e) {
3000            item = _get_entry(e, struct stale_data, le);
3001            e = list_remove(file->stale_list, e);
3002            free(item);
3003        }
3004        file->stale_list = NULL;
3005    }
3006}
3007
3008void filemgr_add_stale_block(struct filemgr *file,
3009                             bid_t pos,
3010                             size_t len)
3011{
3012    if (file->stale_list) {
3013        struct stale_data *item;
3014        struct list_elem *e;
3015
3016        e = list_end(file->stale_list);
3017
3018        if (e) {
3019            item = _get_entry(e, struct stale_data, le);
3020            if (item->pos + item->len == pos) {
3021                // merge if consecutive item
3022                item->len += len;
3023                return;
3024            }
3025        }
3026
3027        item = (struct stale_data*)calloc(1, sizeof(struct stale_data));
3028        item->pos = pos;
3029        item->len = len;
3030        list_push_back(file->stale_list, &item->le);
3031    }
3032}
3033
3034size_t filemgr_actual_stale_length(struct filemgr *file,
3035                                   bid_t offset,
3036                                   size_t length)
3037{
3038    size_t actual_len;
3039    bid_t start_bid, end_bid;
3040
3041    start_bid = offset / file->blocksize;
3042    end_bid = (offset + length) / file->blocksize;
3043
3044    actual_len = length + (end_bid - start_bid);
3045    if ((offset + actual_len) % file->blocksize ==
3046        file->blocksize - 1) {
3047        actual_len += 1;
3048    }
3049
3050    return actual_len;
3051}
3052
// if a document is not physically consecutive,
// return all fragmented regions.
// For file versions that support non-consecutive doc blocks, walk the
// next_bid chain in each block's trailing metadata and emit one stale
// region per fragment; otherwise return a single region covering the
// whole document. When more than one region exists, 'ret.regions' is a
// heap array the caller must free; a single region is returned by value
// in 'ret.region' with no allocation.
struct stale_regions filemgr_actual_stale_regions(struct filemgr *file,
                                                  bid_t offset,
                                                  size_t length)
{
    uint8_t *buf = alca(uint8_t, file->blocksize);  // scratch for block reads
    size_t remaining = length;
    size_t real_blocksize = file->blocksize;
    size_t blocksize = real_blocksize;  // payload bytes per block (adjusted below)
    size_t cur_pos, space_in_block, count;
    bid_t cur_bid;
    bool non_consecutive = ver_non_consecutive_doc(file->version);
    struct docblk_meta blk_meta;
    struct stale_regions ret;
    struct stale_data *arr = NULL, *cur_region;

    if (non_consecutive) {
        // Exclude the per-block doc metadata (which holds the next_bid link)
        // from the usable payload size.
        blocksize -= DOCBLK_META_SIZE;

        cur_bid = offset / file->blocksize;
        // relative position in the block 'cur_bid'
        cur_pos = offset % file->blocksize;

        count = 0;
        while (remaining) {
            if (count == 1) {
                // more than one stale region .. allocate array
                size_t arr_size = (length / blocksize) + 2;
                arr = (struct stale_data *)calloc(arr_size, sizeof(struct stale_data));
                arr[0] = ret.region;
                ret.regions = arr;
            }

            if (count == 0) {
                // Since n_regions will be 1 in most cases,
                // we do not allocate heap memory when 'n_regions==1'.
                cur_region = &ret.region;
            } else {
                cur_region = &ret.regions[count];
            }
            cur_region->pos = (cur_bid * real_blocksize) + cur_pos;

            // subtract data size in the current block
            space_in_block = blocksize - cur_pos;
            if (space_in_block <= remaining) {
                // rest of the current block (including block meta)
                cur_region->len = real_blocksize - cur_pos;
                remaining -= space_in_block;
            } else {
                cur_region->len = remaining;
                remaining = 0;
            }
            count++;

            if (remaining) {
                // get next BID
                // Read the current block to extract the chain link stored in
                // its trailing metadata.
                filemgr_read(file, cur_bid, (void *)buf, NULL, true);
                memcpy(&blk_meta, buf + blocksize, sizeof(blk_meta));
                cur_bid = _endian_decode(blk_meta.next_bid);
                cur_pos = 0; // beginning of the block
            }
        }
        ret.n_regions = count;

    } else {
        // doc blocks are consecutive .. always return a single region.
        ret.n_regions = 1;
        ret.region.pos = offset;
        ret.region.len = filemgr_actual_stale_length(file, offset, length);
    }

    return ret;
}
3127
3128void filemgr_mark_stale(struct filemgr *file,
3129                        bid_t offset,
3130                        size_t length)
3131{
3132    if (file->stale_list && length) {
3133        size_t i;
3134        struct stale_regions sr;
3135
3136        sr = filemgr_actual_stale_regions(file, offset, length);
3137
3138        if (sr.n_regions > 1) {
3139            for (i=0; i<sr.n_regions; ++i){
3140                filemgr_add_stale_block(file, sr.regions[i].pos, sr.regions[i].len);
3141            }
3142            free(sr.regions);
3143        } else if (sr.n_regions == 1) {
3144            filemgr_add_stale_block(file, sr.region.pos, sr.region.len);
3145        }
3146    }
3147}
3148
3149INLINE int _fhandle_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
3150{
3151    uint64_t aaa, bbb;
3152    struct filemgr_fhandle_idx_node *aa, *bb;
3153    aa = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3154    bb = _get_entry(b, struct filemgr_fhandle_idx_node, avl);
3155    aaa = (uint64_t)aa->fhandle;
3156    bbb = (uint64_t)bb->fhandle;
3157
3158#ifdef __BIT_CMP
3159    return _CMP_U64(aaa, bbb);
3160#else
3161    if (aaa < bbb) {
3162        return -1;
3163    } else if (aaa > bbb) {
3164        return 1;
3165    } else {
3166        return 0;
3167    }
3168#endif
3169}
3170
3171void _free_fhandle_idx(struct avl_tree *idx)
3172{
3173    struct avl_node *a;
3174    struct filemgr_fhandle_idx_node *item;
3175
3176    a = avl_first(idx);
3177    while (a) {
3178        item = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3179        a = avl_next(a);
3180        avl_remove(idx, &item->avl);
3181        free(item);
3182    }
3183}
3184
3185bool filemgr_fhandle_add(struct filemgr *file, void *fhandle)
3186{
3187    bool ret;
3188    struct filemgr_fhandle_idx_node *item, query;
3189    struct avl_node *a;
3190
3191    spin_lock(&file->fhandle_idx_lock);
3192
3193    query.fhandle = fhandle;
3194    a = avl_search(&file->fhandle_idx, &query.avl, _fhandle_idx_cmp);
3195    if (!a) {
3196        // not exist, create a node and insert
3197        item = (struct filemgr_fhandle_idx_node *)calloc(1, sizeof(struct filemgr_fhandle_idx_node));
3198        item->fhandle = fhandle;
3199        avl_insert(&file->fhandle_idx, &item->avl, _fhandle_idx_cmp);
3200        ret = true;
3201    } else {
3202        ret = false;
3203    }
3204
3205    spin_unlock(&file->fhandle_idx_lock);
3206    return ret;
3207}
3208
3209bool filemgr_fhandle_remove(struct filemgr *file, void *fhandle)
3210{
3211    bool ret;
3212    struct filemgr_fhandle_idx_node *item, query;
3213    struct avl_node *a;
3214
3215    spin_lock(&file->fhandle_idx_lock);
3216
3217    query.fhandle = fhandle;
3218    a = avl_search(&file->fhandle_idx, &query.avl, _fhandle_idx_cmp);
3219    if (a) {
3220        // exist, remove & free the item
3221        item = _get_entry(a, struct filemgr_fhandle_idx_node, avl);
3222        avl_remove(&file->fhandle_idx, &item->avl);
3223        free(item);
3224        ret = true;
3225    } else {
3226        ret = false;
3227    }
3228
3229    spin_unlock(&file->fhandle_idx_lock);
3230    return ret;
3231}
3232
3233static void _filemgr_dirty_update_remove_node(struct filemgr *file,
3234                                              struct filemgr_dirty_update_node *node);
3235
3236void filemgr_dirty_update_init(struct filemgr *file)
3237{
3238    avl_init(&file->dirty_update_idx, NULL);
3239    spin_init(&file->dirty_update_lock);
3240    atomic_init_uint64_t(&file->dirty_update_counter, 0);
3241    file->latest_dirty_update = NULL;
3242}
3243
3244void filemgr_dirty_update_free(struct filemgr *file)
3245{
3246    struct avl_node *a = avl_first(&file->dirty_update_idx);
3247    struct filemgr_dirty_update_node *node;
3248
3249    while (a) {
3250        node = _get_entry(a, struct filemgr_dirty_update_node, avl);
3251        a = avl_next(a);
3252        avl_remove(&file->dirty_update_idx, &node->avl);
3253        _filemgr_dirty_update_remove_node(file, node);
3254    }
3255    spin_destroy(&file->dirty_update_lock);
3256}
3257
3258INLINE int _dirty_update_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
3259{
3260    struct filemgr_dirty_update_node *aa, *bb;
3261    aa = _get_entry(a, struct filemgr_dirty_update_node, avl);
3262    bb = _get_entry(b, struct filemgr_dirty_update_node, avl);
3263
3264    return _CMP_U64(aa->id, bb->id);
3265}
3266
3267INLINE int _dirty_blocks_cmp(struct avl_node *a, struct avl_node *b, void *aux)
3268{
3269    struct filemgr_dirty_update_block *aa, *bb;
3270    aa = _get_entry(a, struct filemgr_dirty_update_block, avl);
3271    bb = _get_entry(b, struct filemgr_dirty_update_block, avl);
3272
3273    return _CMP_U64(aa->bid, bb->bid);
3274}
3275
3276struct filemgr_dirty_update_node *filemgr_dirty_update_new_node(struct filemgr *file)
3277{
3278    struct filemgr_dirty_update_node *node;
3279
3280    node = (struct filemgr_dirty_update_node *)
3281           calloc(1, sizeof(struct filemgr_dirty_update_node));
3282    node->id = atomic_incr_uint64_t(&file->dirty_update_counter);
3283    node->immutable = false; // currently being written
3284    node->expired = false;
3285    atomic_init_uint32_t(&node->ref_count, 0);
3286    node->idtree_root = node->seqtree_root = BLK_NOT_FOUND;
3287    avl_init(&node->dirty_blocks, NULL);
3288
3289    spin_lock(&file->dirty_update_lock);
3290    avl_insert(&file->dirty_update_idx, &node->avl, _dirty_update_idx_cmp);
3291    spin_unlock(&file->dirty_update_lock);
3292
3293    return node;
3294}
3295
3296struct filemgr_dirty_update_node *filemgr_dirty_update_get_latest(struct filemgr *file)
3297{
3298    struct filemgr_dirty_update_node *node = NULL;
3299
3300    // find the first immutable node from the end
3301    spin_lock(&file->dirty_update_lock);
3302
3303    node = file->latest_dirty_update;
3304    if (node) {
3305        atomic_incr_uint32_t(&node->ref_count);
3306    }
3307
3308    spin_unlock(&file->dirty_update_lock);
3309    return node;
3310}
3311
3312void filemgr_dirty_update_inc_ref_count(struct filemgr_dirty_update_node *node)
3313{
3314    if (!node) {
3315        return;
3316    }
3317    atomic_incr_uint32_t(&node->ref_count);
3318}
3319
3320INLINE void filemgr_dirty_update_flush(struct filemgr *file,
3321                                       struct filemgr_dirty_update_node *node,
3322                                       err_log_callback *log_callback)
3323{
3324    struct avl_node *a;
3325    struct filemgr_dirty_update_block *block;
3326
3327    if (!node) {
3328        return;
3329    }
3330
3331    // Flush all dirty blocks belonging to this dirty update entry
3332    a = <