1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <fcntl.h>
22 #include <sys/stat.h>
23 #include <stdarg.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <sys/time.h>
26 #include <libgen.h>
27 #endif
28 #include <time.h>
29 
30 #include "filemgr.h"
31 #include "filemgr_ops.h"
32 #include "hash_functions.h"
33 #include "blockcache.h"
34 #include "wal.h"
35 #include "list.h"
36 #include "fdb_internal.h"
37 #include "time_utils.h"
38 #include "encryption.h"
39 #include "version.h"
40 
41 #include "memleak.h"
42 
43 #ifdef __DEBUG
44 #ifndef __DEBUG_FILEMGR
45     #undef DBG
46     #undef DBGCMD
47     #undef DBGSW
48     #define DBG(...)
49     #define DBGCMD(...)
50     #define DBGSW(n, ...)
51 #endif
52 #endif
53 
// NBUCKET must be power of 2
// (_file_hash() masks the checksum with NBUCKET-1 to select a bucket)
#define NBUCKET (1024)

// global static variables
#ifdef SPIN_INITIALIZER
// Serializes the one-time global setup performed in filemgr_init().
static spin_t initial_lock = SPIN_INITIALIZER;
#else
// Without a static initializer, 'initial_lock' is set up at runtime inside
// filemgr_init(). This flag tracks that setup:
// 0 = not started, 1 = in progress, 2 = done.
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif


// Set to 1 by filemgr_init() once the global setup has completed.
static volatile uint8_t filemgr_initialized = 0;
// Defined in another translation unit.
extern volatile uint8_t bgflusher_initialized;
// Copy of the configuration passed to the first filemgr_init() call.
static struct filemgr_config global_config;
// Hash table of struct filemgr entries, hashed and compared by filename
// (see _file_hash() / _file_cmp()).
static struct hash hash;
// Held around lookups in 'hash'.
static spin_t filemgr_openlock;

// NOTE(review): not referenced in this part of the file; presumably a retry
// bound for stat updates -- confirm against its users.
static const int MAX_STAT_UPDATE_RETRIES = 5;

// Bookkeeping record for one pooled temporary buffer. The record is stored
// inside the same allocation as the buffer, right after the first
// global_config.blocksize bytes (see _filemgr_get_temp_buf()).
struct temp_buf_item{
    void *addr;             // start of the allocation
    struct list_elem le;    // link in the 'temp_buf' free list
};
// Free list of temporary buffers, protected by 'temp_buf_lock'.
static struct list temp_buf;
static spin_t temp_buf_lock;

// Lazy file deletion settings, installed by filemgr_set_lazy_file_deletion().
static bool lazy_file_deletion_enabled = false;
static register_file_removal_func register_file_removal = NULL;
static check_file_removal_func is_file_removed = NULL;

// Superblock callbacks: zeroed in filemgr_init(), installed by
// filemgr_set_sb_operation(). Callers test each pointer before use.
static struct sb_ops sb_ops;
86 
spin_init_wrap(void *lock)87 static void spin_init_wrap(void *lock) {
88     spin_init((spin_t*)lock);
89 }
90 
spin_destroy_wrap(void *lock)91 static void spin_destroy_wrap(void *lock) {
92     spin_destroy((spin_t*)lock);
93 }
94 
spin_lock_wrap(void *lock)95 static void spin_lock_wrap(void *lock) {
96     spin_lock((spin_t*)lock);
97 }
98 
spin_unlock_wrap(void *lock)99 static void spin_unlock_wrap(void *lock) {
100     spin_unlock((spin_t*)lock);
101 }
102 
mutex_init_wrap(void *lock)103 static void mutex_init_wrap(void *lock) {
104     mutex_init((mutex_t*)lock);
105 }
106 
mutex_destroy_wrap(void *lock)107 static void mutex_destroy_wrap(void *lock) {
108     mutex_destroy((mutex_t*)lock);
109 }
110 
mutex_lock_wrap(void *lock)111 static void mutex_lock_wrap(void *lock) {
112     mutex_lock((mutex_t*)lock);
113 }
114 
mutex_unlock_wrap(void *lock)115 static void mutex_unlock_wrap(void *lock) {
116     mutex_unlock((mutex_t*)lock);
117 }
118 
_kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)119 static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
120 {
121     struct kvs_node *aa, *bb;
122     aa = _get_entry(a, struct kvs_node, avl_id);
123     bb = _get_entry(b, struct kvs_node, avl_id);
124 
125     if (aa->id < bb->id) {
126         return -1;
127     } else if (aa->id > bb->id) {
128         return 1;
129     } else {
130         return 0;
131     }
132 }
133 
_block_is_overlapped(void *pbid1, void *pis_writer1, void *pbid2, void *pis_writer2, void *aux)134 static int _block_is_overlapped(void *pbid1, void *pis_writer1,
135                                 void *pbid2, void *pis_writer2,
136                                 void *aux)
137 {
138     (void)aux;
139     bid_t bid1, is_writer1, bid2, is_writer2;
140     bid1 = *(bid_t*)pbid1;
141     is_writer1 = *(bid_t*)pis_writer1;
142     bid2 = *(bid_t*)pbid2;
143     is_writer2 = *(bid_t*)pis_writer2;
144 
145     if (bid1 != bid2) {
146         // not overlapped
147         return 0;
148     } else {
149         // overlapped
150         if (!is_writer1 && !is_writer2) {
151             // both are readers
152             return 0;
153         } else {
154             return 1;
155         }
156     }
157 }
158 
// Writes the current local time into 'buffer' as an ISO 8601 style string:
//   2017-06-22T10:00:00.123-05:00
//
// buffer     : destination; always NUL-terminated on return when
//              buffer_len > 0.
// buffer_len : capacity of 'buffer' in bytes. Every write is bounded by it;
//              if the full string does not fit, the output is cut short
//              (possibly to an empty string) instead of overflowing.
//
// The time zone suffix is omitted when strftime("%z") yields no usable
// offset on this platform.
void printISOTime(char* buffer, size_t buffer_len) {
    if (buffer == NULL || buffer_len == 0) {
        return;
    }
    buffer[0] = '\0';

    time_t rawtime;
    time(&rawtime);
    struct tm *tm_now = localtime(&rawtime);
    if (tm_now == NULL) {
        return;
    }

    // 2017-06-22T10:00:00
    size_t time_len = strftime(buffer, buffer_len,
                               "%Y-%m-%dT%H:%M:%S", tm_now);
    if (time_len == 0) {
        // The result did not fit; strftime leaves the buffer contents
        // indeterminate in that case, so reset it to an empty string.
        buffer[0] = '\0';
        return;
    }

    // Add milliseconds
    struct timeval cur_time;
    gettimeofday(&cur_time, NULL);
    int milli = (int)(cur_time.tv_usec / 1000);
    // 2017-06-22T10:00:00.123
    size_t remaining = buffer_len - time_len;
    int written = snprintf(buffer + time_len, remaining, ".%03d", milli);
    if (written < 0) {
        buffer[time_len] = '\0';
        return;
    }
    if ((size_t)written >= remaining) {
        // truncated by snprintf (still NUL-terminated); nothing more fits
        return;
    }
    time_len += (size_t)written;
    remaining = buffer_len - time_len;

    // timezone offset format: -0500
    char tz_offset_str[6];
    size_t offset_len = strftime(tz_offset_str, sizeof(tz_offset_str),
                                 "%z", tm_now);
    if (offset_len < 5) {
        // Time zone info is not supported, skip it.
        return;
    }

    // "-05:00" needs 6 characters plus the terminating NUL
    if (remaining < 7) {
        return;
    }
    // hour, ':', min
    snprintf(buffer + time_len, remaining, "%.3s:%.2s",
             tz_offset_str, tz_offset_str + 3);
    // final format: 2017-06-22T10:00:00.123-05:00
}
194 
fdb_log(err_log_callback *log_callback, fdb_status status, const char *format, ...)195 fdb_status fdb_log(err_log_callback *log_callback,
196                    fdb_status status,
197                    const char *format, ...)
198 {
199     char msg[4096];
200     va_list args;
201     va_start(args, format);
202     vsprintf(msg, format, args);
203     va_end(args);
204 
205     if (log_callback && log_callback->callback) {
206         log_callback->callback(status, msg, log_callback->ctx_data);
207     } else {
208         char ISO_time_buffer[64];
209         printISOTime(ISO_time_buffer, 64);
210         if (status != FDB_RESULT_SUCCESS) {
211             fprintf(stderr, "%s [ERRO][FDB] %s\n", ISO_time_buffer, msg);
212         } else {
213             fprintf(stderr, "%s [INFO][FDB] %s\n", ISO_time_buffer, msg);
214         }
215     }
216     return status;
217 }
218 
_log_errno_str(struct filemgr_ops *ops, err_log_callback *log_callback, fdb_status io_error, const char *what, const char *filename)219 static void _log_errno_str(struct filemgr_ops *ops,
220                            err_log_callback *log_callback,
221                            fdb_status io_error,
222                            const char *what,
223                            const char *filename)
224 {
225     if (io_error < 0) {
226         char errno_msg[512];
227         ops->get_errno_str(errno_msg, 512);
228         fdb_log(log_callback, io_error,
229                 "Error in %s on a database file '%s', %s", what, filename, errno_msg);
230     }
231 }
232 
_file_hash(struct hash *hash, struct hash_elem *e)233 static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
234 {
235     struct filemgr *file = _get_entry(e, struct filemgr, e);
236     int len = strlen(file->filename);
237 
238     return get_checksum(reinterpret_cast<const uint8_t*>(file->filename), len) &
239                         ((unsigned)(NBUCKET-1));
240 }
241 
_file_cmp(struct hash_elem *a, struct hash_elem *b)242 static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
243 {
244     struct filemgr *aa, *bb;
245     aa = _get_entry(a, struct filemgr, e);
246     bb = _get_entry(b, struct filemgr, e);
247     return strcmp(aa->filename, bb->filename);
248 }
249 
// Performs the process-wide, one-time setup of the file manager.
//
// config : configuration copied into 'global_config' by the call that
//          actually performs the setup; calls made after the setup has
//          completed leave the stored configuration untouched.
//
// The 'filemgr_initialized' flag is tested once without the lock as a fast
// path and again after acquiring 'initial_lock', so that only one caller
// runs the setup.
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            // publish "lock is ready" to the waiters below
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        spin_lock(&initial_lock);
        // re-check under the lock: another caller may have finished the setup
        if (!filemgr_initialized) {
            // start with no superblock callbacks installed
            memset(&sb_ops, 0x0, sizeof(sb_ops));
            global_config = *config;

            // the block cache is only created when cache blocks are configured
            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            // table of open files, hashed and compared by file name
            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
292 
// Installs the lazy file deletion settings in the file-scope globals.
//
// enable     : stored in 'lazy_file_deletion_enabled'
// regis_func : stored in 'register_file_removal'
// check_func : stored in 'is_file_removed'
//
// The values are stored as given; no locking and no NULL checks are
// performed here.
void filemgr_set_lazy_file_deletion(bool enable,
                                    register_file_removal_func regis_func,
                                    check_file_removal_func check_func)
{
    lazy_file_deletion_enabled = enable;
    register_file_removal = regis_func;
    is_file_removed = check_func;
}
301 
// Installs the superblock callback table by copying 'ops' into the
// file-scope 'sb_ops'. Callers of the individual callbacks check each
// function pointer before use, so a partially filled table is accepted.
void filemgr_set_sb_operation(struct sb_ops ops)
{
    sb_ops = ops;
}
306 
_filemgr_get_temp_buf()307 static void * _filemgr_get_temp_buf()
308 {
309     struct list_elem *e;
310     struct temp_buf_item *item;
311 
312     spin_lock(&temp_buf_lock);
313     e = list_pop_front(&temp_buf);
314     if (e) {
315         item = _get_entry(e, struct temp_buf_item, le);
316     } else {
317         void *addr = NULL;
318 
319         malloc_align(addr, FDB_SECTOR_SIZE,
320                      global_config.blocksize + sizeof(struct temp_buf_item));
321 
322         item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
323         item->addr = addr;
324     }
325     spin_unlock(&temp_buf_lock);
326 
327     return item->addr;
328 }
329 
_filemgr_release_temp_buf(void *buf)330 static void _filemgr_release_temp_buf(void *buf)
331 {
332     struct temp_buf_item *item;
333 
334     spin_lock(&temp_buf_lock);
335     item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
336     list_push_front(&temp_buf, &item->le);
337     spin_unlock(&temp_buf_lock);
338 }
339 
_filemgr_shutdown_temp_buf()340 static void _filemgr_shutdown_temp_buf()
341 {
342     struct list_elem *e;
343     struct temp_buf_item *item;
344     size_t count=0;
345 
346     spin_lock(&temp_buf_lock);
347     e = list_begin(&temp_buf);
348     while(e){
349         item = _get_entry(e, struct temp_buf_item, le);
350         e = list_remove(&temp_buf, e);
351         free_align(item->addr);
352         count++;
353     }
354     spin_unlock(&temp_buf_lock);
355 }
356 
357 // Read a block from the file, decrypting if necessary.
filemgr_read_block(struct filemgr *file, void *buf, bid_t bid)358 static ssize_t filemgr_read_block(struct filemgr *file, void *buf, bid_t bid) {
359     ssize_t result = file->ops->pread(file->fd, buf, file->blocksize,
360                                       file->blocksize*bid);
361     if (file->encryption.ops && result > 0) {
362         if (result != (ssize_t)file->blocksize)
363             return FDB_RESULT_READ_FAIL;
364         fdb_status status = fdb_decrypt_block(&file->encryption, buf, result, bid);
365         if (status != FDB_RESULT_SUCCESS)
366             return status;
367     }
368     return result;
369 }
370 
371 // Write consecutive block(s) to the file, encrypting if necessary.
filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid)372 ssize_t filemgr_write_blocks(struct filemgr *file, void *buf, unsigned num_blocks, bid_t start_bid) {
373     size_t blocksize = file->blocksize;
374     cs_off_t offset = start_bid * blocksize;
375     size_t nbytes = num_blocks * blocksize;
376     if (file->encryption.ops == NULL) {
377         return file->ops->pwrite(file->fd, buf, nbytes, offset);
378     } else {
379         uint8_t *encrypted_buf;
380         if (nbytes > 4096)
381             encrypted_buf = (uint8_t*)malloc(nbytes);
382         else
383             encrypted_buf = alca(uint8_t, nbytes); // most common case (writing single block)
384         if (!encrypted_buf)
385             return FDB_RESULT_ALLOC_FAIL;
386         fdb_status status = fdb_encrypt_blocks(&file->encryption,
387                                                encrypted_buf,
388                                                buf,
389                                                blocksize,
390                                                num_blocks,
391                                                start_bid);
392         if (nbytes > 4096)
393             free(encrypted_buf);
394         if (status != FDB_RESULT_SUCCESS)
395             return status;
396         return file->ops->pwrite(file->fd, encrypted_buf, nbytes, offset);
397     }
398 }
399 
filemgr_is_writable(struct filemgr *file, bid_t bid)400 int filemgr_is_writable(struct filemgr *file, bid_t bid)
401 {
402     if (sb_bmp_exists(file->sb) && sb_ops.is_writable) {
403         // block reusing is enabled
404         return sb_ops.is_writable(file, bid);
405     } else {
406         uint64_t pos = bid * file->blocksize;
407         // Note that we don't need to grab file->lock here because
408         // 1) both file->pos and file->last_commit are only incremented.
409         // 2) file->last_commit is updated using the value of file->pos,
410         //    and always equal to or smaller than file->pos.
411         return (pos <  atomic_get_uint64_t(&file->pos) &&
412                 pos >= atomic_get_uint64_t(&file->last_commit));
413     }
414 }
415 
filemgr_get_sb_bmp_revnum(struct filemgr *file)416 uint64_t filemgr_get_sb_bmp_revnum(struct filemgr *file)
417 {
418     if (file->sb && sb_ops.get_bmp_revnum) {
419         return sb_ops.get_bmp_revnum(file);
420     } else {
421         return 0;
422     }
423 }
424 
_filemgr_read_header(struct filemgr *file, err_log_callback *log_callback)425 static fdb_status _filemgr_read_header(struct filemgr *file,
426                                        err_log_callback *log_callback)
427 {
428     uint8_t marker[BLK_MARKER_SIZE];
429     filemgr_magic_t magic = ver_get_latest_magic();
430     filemgr_header_len_t len;
431     uint8_t *buf;
432     uint32_t crc, crc_file;
433     bool check_crc32_open_rule = false;
434     fdb_status status = FDB_RESULT_SUCCESS;
435     bid_t hdr_bid, hdr_bid_local;
436     size_t min_filesize = 0;
437     size_t bad_header_count = 0;
438 
439     // get temp buffer
440     buf = (uint8_t *) _filemgr_get_temp_buf();
441 
442     // If a header is found crc_mode can change to reflect the file
443     if (file->crc_mode == CRC32) {
444         check_crc32_open_rule = true;
445     }
446 
447     hdr_bid = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
448     hdr_bid_local = hdr_bid;
449 
450     if (file->sb) {
451         // superblock exists .. file size does not start from zero.
452         min_filesize = file->sb->config->num_sb * file->blocksize;
453         bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
454         if (sb_last_hdr_bid != BLK_NOT_FOUND) {
455             hdr_bid = hdr_bid_local = sb_last_hdr_bid;
456         }
457         // if header info does not exist in superblock,
458         // get DB header at the end of the file.
459     }
460 
461     if (atomic_get_uint64_t(&file->pos) > min_filesize) {
462         // Crash Recovery Test 1: unaligned last block write
463         uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
464         if (remain) {
465             atomic_sub_uint64_t(&file->pos, remain);
466             atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
467             const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
468                 "from a database file '%s'\n";
469             DBG(msg, remain, file->filename);
470             fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
471                     msg, remain, file->filename);
472         }
473 
474         size_t block_counter = 0;
475         do {
476             if (hdr_bid_local * file->blocksize >= file->pos) {
477                 // Handling EOF scenario
478                 status = FDB_RESULT_NO_DB_HEADERS;
479                 const char *msg = "Unable to read block from file '%s' as EOF "
480                                   "reached\n";
481                 fdb_log(log_callback, status, msg, file->filename);
482                 break;
483             }
484             ssize_t rv = filemgr_read_block(file, buf, hdr_bid_local);
485             if (rv != (ssize_t)file->blocksize) {
486                 status = (fdb_status) rv;
487                 const char *msg = "Unable to read a database file '%s' with "
488                                   "blocksize %u\n";
489                 DBG(msg, file->filename, file->blocksize);
490                 fdb_log(log_callback, status, msg, file->filename, file->blocksize);
491                 break;
492             }
493             ++block_counter;
494             memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
495                    BLK_MARKER_SIZE);
496 
497             if (marker[0] == BLK_MARKER_DBHEADER) {
498                 // possible need for byte conversions here
499                 memcpy(&magic,
500                        buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
501                        sizeof(magic));
502                 magic = _endian_decode(magic);
503 
504                 if (ver_is_valid_magic(magic)) {
505 
506                     memcpy(&len,
507                            buf + file->blocksize - BLK_MARKER_SIZE -
508                            sizeof(magic) - sizeof(len),
509                            sizeof(len));
510                     len = _endian_decode(len);
511 
512                     memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
513                     crc_file = _endian_decode(crc_file);
514 
515                     // crc check and detect the crc_mode
516                     if (detect_and_check_crc(reinterpret_cast<const uint8_t*>(buf),
517                                              len - sizeof(crc),
518                                              crc_file,
519                                              &file->crc_mode)) {
520                         // crc mode is detected and known.
521                         // check the rules of opening legacy CRC
522                         if (check_crc32_open_rule && file->crc_mode != CRC32) {
523                             const char *msg = "Open of CRC32C file"
524                                               " with forced CRC32\n";
525                             status = FDB_RESULT_INVALID_ARGS;
526                             DBG(msg);
527                             fdb_log(log_callback, status, msg);
528                             break;
529                         } else {
530                             status = FDB_RESULT_SUCCESS;
531 
532                             file->header.data = (void *)malloc(file->blocksize);
533 
534                             memcpy(file->header.data, buf, len);
535                             memcpy(&file->header.revnum, buf + len,
536                                    sizeof(filemgr_header_revnum_t));
537                             memcpy((void *) &file->header.seqnum,
538                                     buf + len + sizeof(filemgr_header_revnum_t),
539                                     sizeof(fdb_seqnum_t));
540 
541                             if (ver_superblock_support(magic)) {
542                                 // last_writable_bmp_revnum should be same with
543                                 // the current bmp_revnum (since it indicates the
544                                 // 'bmp_revnum' of 'sb->cur_alloc_bid').
545                                 atomic_store_uint64_t(&file->last_writable_bmp_revnum,
546                                                       filemgr_get_sb_bmp_revnum(file));
547                             }
548 
549                             file->header.revnum =
550                                 _endian_decode(file->header.revnum);
551                             file->header.seqnum =
552                                 _endian_decode(file->header.seqnum.load());
553                             file->header.size = len;
554                             atomic_store_uint64_t(&file->header.bid, hdr_bid_local);
555                             memset(&file->header.stat, 0x0, sizeof(file->header.stat));
556 
557                             // release temp buffer
558                             _filemgr_release_temp_buf(buf);
559                         }
560 
561                         file->version = magic;
562                         return status;
563                     } else {
564                         status = FDB_RESULT_CHECKSUM_ERROR;
565                         uint32_t crc32 = 0, crc32c = 0;
566                         crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
567                                              len - sizeof(crc),
568                                              CRC32);
569 #ifdef _CRC32C
570                         crc32c = get_checksum(reinterpret_cast<const uint8_t*>(buf),
571                                               len - sizeof(crc),
572                                               CRC32C);
573 #endif
574                         const char *msg = "Crash Detected: CRC on disk %u != (%u | %u) "
575                             "in a database file '%s'\n";
576                         DBG(msg, crc_file, crc32, crc32c, file->filename);
577                         fdb_log(log_callback, status, msg, crc_file, crc32, crc32c,
578                                 file->filename);
579                     }
580                 } else {
581                     status = FDB_RESULT_FILE_CORRUPTION;
582                     const char *msg = "Crash Detected: Wrong Magic %" _F64
583                                       " in a database file '%s'\n";
584                     fdb_log(log_callback, status, msg, magic, file->filename);
585                 }
586             } else {
587                 status = FDB_RESULT_NO_DB_HEADERS;
588                 if (block_counter == 1) {
589                     const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
590                                       "in a database file '%s'\n";
591                     DBG(msg, marker[0], file->filename);
592                     fdb_log(log_callback, status, msg, marker[0], file->filename);
593                 }
594                 // if the header was invalidated due to corruption detection, handle that
595                 if (marker[0] == BLK_MARKER_BAD){
596                     bad_header_count++;
597                     if (file->config->num_keeping_headers &&
598                         bad_header_count >= file->config->num_keeping_headers){
599                         status = FDB_NONRECOVERABLE_ERR;
600                         fdb_log(log_callback, status,
601                                 "Found too many (%d) bad dbheader blocks",
602                                 bad_header_count);
603                         break;
604                     }
605                 }
606             }
607 
608             atomic_store_uint64_t(&file->last_commit, hdr_bid_local * file->blocksize);
609             // traverse headers in a circular manner
610             if (hdr_bid_local) {
611                 hdr_bid_local--;
612             } else {
613                 hdr_bid_local = atomic_get_uint64_t(&file->pos) / file->blocksize - 1;
614             }
615         } while (hdr_bid_local != hdr_bid);
616     }
617 
618     // release temp buffer
619     _filemgr_release_temp_buf(buf);
620 
621     file->header.size = 0;
622     file->header.revnum = 0;
623     file->header.seqnum = 0;
624     file->header.data = NULL;
625     atomic_store_uint64_t(&file->header.bid, 0);
626     memset(&file->header.stat, 0x0, sizeof(file->header.stat));
627     file->version = magic;
628     if (bad_header_count > 0  && status == FDB_RESULT_NO_DB_HEADERS)
629         status = FDB_NONRECOVERABLE_ERR;
630     return status;
631 }
632 
filemgr_get_ref_count(struct filemgr *file)633 size_t filemgr_get_ref_count(struct filemgr *file)
634 {
635     size_t ret = 0;
636     spin_lock(&file->lock);
637     ret = atomic_get_uint32_t(&file->ref_count);
638     spin_unlock(&file->lock);
639     return ret;
640 }
641 
filemgr_get_bcache_used_space(void)642 uint64_t filemgr_get_bcache_used_space(void)
643 {
644     uint64_t bcache_free_space = 0;
645     if (global_config.ncacheblock) { // If buffer cache is indeed configured
646         bcache_free_space = bcache_get_num_free_blocks();
647         bcache_free_space = (global_config.ncacheblock - bcache_free_space)
648                           * global_config.blocksize;
649     }
650     return bcache_free_space;
651 }
652 
// Arguments handed to _filemgr_prefetch_thread(). The structure is allocated
// by filemgr_prefetch() and freed by the thread itself before it exits.
struct filemgr_prefetch_args {
    // file whose blocks are read
    struct filemgr *file;
    // time budget; compared against the elapsed tv_sec value, i.e. seconds
    uint64_t duration;
    // receives the diagnostic when a block read fails
    err_log_callback *log_callback;
    // not referenced by the prefetch code in this part of the file
    void *aux;
};
659 
_filemgr_prefetch_thread(void *voidargs)660 static void *_filemgr_prefetch_thread(void *voidargs)
661 {
662     struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
663     uint8_t *buf = alca(uint8_t, args->file->blocksize);
664     uint64_t cur_pos = 0, i;
665     uint64_t bcache_free_space;
666     bid_t bid;
667     bool terminate = false;
668     struct timeval begin, cur, gap;
669 
670     spin_lock(&args->file->lock);
671     cur_pos = atomic_get_uint64_t(&args->file->last_commit);
672     spin_unlock(&args->file->lock);
673     if (cur_pos < FILEMGR_PREFETCH_UNIT) {
674         terminate = true;
675     } else {
676         cur_pos -= FILEMGR_PREFETCH_UNIT;
677     }
678     // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
679     gettimeofday(&begin, NULL);
680     while (!terminate) {
681         for (i = cur_pos;
682              i < cur_pos + FILEMGR_PREFETCH_UNIT;
683              i += args->file->blocksize) {
684 
685             gettimeofday(&cur, NULL);
686             gap = _utime_gap(begin, cur);
687             bcache_free_space = bcache_get_num_free_blocks();
688             bcache_free_space *= args->file->blocksize;
689 
690             if (atomic_get_uint8_t(&args->file->prefetch_status)
691                 == FILEMGR_PREFETCH_ABORT ||
692                 gap.tv_sec >= (int64_t)args->duration ||
693                 bcache_free_space < FILEMGR_PREFETCH_UNIT) {
694                 // terminate thread when
695                 // 1. got abort signal
696                 // 2. time out
697                 // 3. not enough free space in block cache
698                 terminate = true;
699                 break;
700             } else {
701                 bid = i / args->file->blocksize;
702                 if (filemgr_read(args->file, bid, buf, NULL, true)
703                         != FDB_RESULT_SUCCESS) {
704                     // 4. read failure
705                     fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
706                             "Prefetch thread failed to read a block with block id %" _F64
707                             " from a database file '%s'", bid, args->file->filename);
708                     terminate = true;
709                     break;
710                 }
711             }
712         }
713 
714         if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
715             cur_pos -= FILEMGR_PREFETCH_UNIT;
716         } else {
717             // remaining space is less than FILEMGR_PREFETCH_UNIT
718             terminate = true;
719         }
720     }
721 
722     atomic_cas_uint8_t(&args->file->prefetch_status, FILEMGR_PREFETCH_RUNNING,
723                        FILEMGR_PREFETCH_IDLE);
724     free(args);
725     return NULL;
726 }
727 
728 // prefetch the given DB file
filemgr_prefetch(struct filemgr *file, struct filemgr_config *config, err_log_callback *log_callback)729 void filemgr_prefetch(struct filemgr *file,
730                       struct filemgr_config *config,
731                       err_log_callback *log_callback)
732 {
733     uint64_t bcache_free_space;
734 
735     bcache_free_space = bcache_get_num_free_blocks();
736     bcache_free_space *= file->blocksize;
737 
738     // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
739     spin_lock(&file->lock);
740     if (atomic_get_uint64_t(&file->last_commit) > 0 &&
741         bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
742         // invoke prefetch thread
743         struct filemgr_prefetch_args *args;
744         args = (struct filemgr_prefetch_args *)
745                calloc(1, sizeof(struct filemgr_prefetch_args));
746         args->file = file;
747         args->duration = config->prefetch_duration;
748         args->log_callback = log_callback;
749 
750         if (atomic_cas_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE,
751                                FILEMGR_PREFETCH_RUNNING)) {
752             thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
753         }
754     }
755     spin_unlock(&file->lock);
756 }
757 
filemgr_does_file_exist(char *filename)758 fdb_status filemgr_does_file_exist(char *filename) {
759     struct filemgr_ops *ops = get_filemgr_ops();
760     int fd = ops->open(filename, O_RDONLY, 0444);
761     if (fd < 0) {
762         return (fdb_status) fd;
763     }
764     ops->close(fd);
765     return FDB_RESULT_SUCCESS;
766 }
767 
_filemgr_load_sb(struct filemgr *file, err_log_callback *log_callback)768 static fdb_status _filemgr_load_sb(struct filemgr *file,
769                                    err_log_callback *log_callback)
770 {
771     fdb_status status = FDB_RESULT_SUCCESS;
772     struct sb_config sconfig;
773 
774     if (sb_ops.init && sb_ops.get_default_config && sb_ops.read_latest) {
775         sconfig = sb_ops.get_default_config();
776         if (filemgr_get_pos(file)) {
777             // existing file
778             status = sb_ops.read_latest(file, sconfig, log_callback);
779         } else {
780             // new file
781             status = sb_ops.init(file, sconfig, log_callback);
782         }
783     }
784 
785     return status;
786 }
787 
get_instance_UNLOCKED(const char *filename)788 static filemgr* get_instance_UNLOCKED(const char *filename)
789 {
790     if (!filename) {
791         return NULL;
792     }
793 
794     struct filemgr query;
795     struct hash_elem *e = NULL;
796     struct filemgr *file = NULL;
797 
798     query.filename = (char*)filename;
799     e = hash_find(&hash, &query.e);
800     if (e) {
801         file = _get_entry(e, struct filemgr, e);
802     }
803     return file;
804 }
805 
filemgr_get_instance(const char* filename)806 struct filemgr* filemgr_get_instance(const char* filename)
807 {
808     spin_lock(&filemgr_openlock);
809     struct filemgr *file = get_instance_UNLOCKED(filename);
810     spin_unlock(&filemgr_openlock);
811 
812     return file;
813 }
814 
filemgr_open(char *filename, struct filemgr_ops *ops, struct filemgr_config *config, err_log_callback *log_callback)815 filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
816                                  struct filemgr_config *config,
817                                  err_log_callback *log_callback)
818 {
819     struct filemgr *file = NULL;
820     struct filemgr query;
821     struct hash_elem *e = NULL;
822     bool create = config->options & FILEMGR_CREATE;
823     int file_flag = 0x0;
824     int fd = -1;
825     fdb_status status;
826     filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};
827 
828     filemgr_init(config);
829 
830     if (config->encryption_key.algorithm != FDB_ENCRYPTION_NONE && global_config.ncacheblock <= 0) {
831         // cannot use encryption without a block cache
832         result.rv = FDB_RESULT_CRYPTO_ERROR;
833         return result;
834     }
835 
836     // check whether file is already opened or not
837     query.filename = filename;
838     spin_lock(&filemgr_openlock);
839     e = hash_find(&hash, &query.e);
840 
841     if (e) {
842         // already opened (return existing structure)
843         file = _get_entry(e, struct filemgr, e);
844 
845         if (atomic_incr_uint32_t(&file->ref_count) > 1 &&
846             atomic_get_uint8_t(&file->status) != FILE_CLOSED) {
847             spin_unlock(&filemgr_openlock);
848             result.file = file;
849             result.rv = FDB_RESULT_SUCCESS;
850             return result;
851         }
852 
853         spin_lock(&file->lock);
854 
855         if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
856             file_flag = O_RDWR;
857             if (create) {
858                 file_flag |= O_CREAT;
859             }
860             *file->config = *config;
861             file->config->blocksize = global_config.blocksize;
862             file->config->ncacheblock = global_config.ncacheblock;
863             file_flag |= config->flag;
864             file->fd = file->ops->open(file->filename, file_flag, 0666);
865             if (file->fd < 0) {
866                 if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
867                     // A database file was manually deleted by the user.
868                     // Clean up global hash table, WAL index, and buffer cache.
869                     // Then, retry it with a create option below IFF it is not
870                     // a read-only open attempt
871                     struct hash_elem *ret;
872                     spin_unlock(&file->lock);
873                     ret = hash_remove(&hash, &file->e);
874                     fdb_assert(ret, 0, 0);
875                     filemgr_free_func(&file->e);
876                     if (!create) {
877                         _log_errno_str(ops, log_callback,
878                                 FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
879                         spin_unlock(&filemgr_openlock);
880                         result.rv = FDB_RESULT_NO_SUCH_FILE;
881                         return result;
882                     }
883                 } else {
884                     _log_errno_str(file->ops, log_callback,
885                                   (fdb_status)file->fd, "OPEN", filename);
886                     atomic_decr_uint32_t(&file->ref_count);
887                     spin_unlock(&file->lock);
888                     spin_unlock(&filemgr_openlock);
889                     result.rv = file->fd;
890                     return result;
891                 }
892             } else { // Reopening the closed file is succeed.
893                 atomic_store_uint8_t(&file->status, FILE_NORMAL);
894                 if (config->options & FILEMGR_SYNC) {
895                     file->fflags |= FILEMGR_SYNC;
896                 } else {
897                     file->fflags &= ~FILEMGR_SYNC;
898                 }
899 
900                 spin_unlock(&file->lock);
901                 spin_unlock(&filemgr_openlock);
902 
903                 result.file = file;
904                 result.rv = FDB_RESULT_SUCCESS;
905                 return result;
906             }
907         } else { // file is already opened.
908 
909             if (config->options & FILEMGR_SYNC) {
910                 file->fflags |= FILEMGR_SYNC;
911             } else {
912                 file->fflags &= ~FILEMGR_SYNC;
913             }
914 
915             spin_unlock(&file->lock);
916             spin_unlock(&filemgr_openlock);
917             result.file = file;
918             result.rv = FDB_RESULT_SUCCESS;
919             return result;
920         }
921     }
922 
923     file_flag = O_RDWR;
924     if (create) {
925         file_flag |= O_CREAT;
926     }
927     file_flag |= config->flag;
928     fd = ops->open(filename, file_flag, 0666);
929     if (fd < 0) {
930         _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
931         spin_unlock(&filemgr_openlock);
932         result.rv = fd;
933         return result;
934     }
935     file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
936     file->filename_len = strlen(filename);
937     file->filename = (char*)malloc(file->filename_len + 1);
938     strcpy(file->filename, filename);
939 
940     atomic_init_uint32_t(&file->ref_count, 1);
941     file->stale_list = NULL;
942 
943     status = fdb_init_encryptor(&file->encryption, &config->encryption_key);
944     if (status != FDB_RESULT_SUCCESS) {
945         ops->close(fd);
946         free(file);
947         spin_unlock(&filemgr_openlock);
948         result.rv = status;
949         return result;
950     }
951 
952     file->wal = (struct wal *)calloc(1, sizeof(struct wal));
953     file->wal->flag = 0;
954 
955     file->ops = ops;
956     file->blocksize = global_config.blocksize;
957     atomic_init_uint8_t(&file->status, FILE_NORMAL);
958     file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
959     *file->config = *config;
960     file->config->blocksize = global_config.blocksize;
961     file->config->ncacheblock = global_config.ncacheblock;
962     file->old_filename = NULL;
963     file->new_filename = NULL;
964     file->fd = fd;
965 
966     cs_off_t offset = file->ops->goto_eof(file->fd);
967     if (offset < 0) {
968         _log_errno_str(file->ops, log_callback, (fdb_status) offset, "SEEK_END", filename);
969         file->ops->close(file->fd);
970         free(file->wal);
971         free(file->filename);
972         free(file->config);
973         free(file);
974         spin_unlock(&filemgr_openlock);
975         result.rv = (fdb_status) offset;
976         return result;
977     }
978     atomic_init_uint64_t(&file->last_commit, offset);
979     atomic_init_uint64_t(&file->last_writable_bmp_revnum, 0);
980     atomic_init_uint64_t(&file->pos, offset);
981     atomic_init_uint32_t(&file->throttling_delay, 0);
982     atomic_init_uint64_t(&file->num_invalidated_blocks, 0);
983     atomic_init_uint8_t(&file->io_in_prog, 0);
984 
985 #ifdef _LATENCY_STATS
986     for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
987         filemgr_init_latency_stat(&file->lat_stats[i]);
988     }
989 #endif // _LATENCY_STATS
990 
991     file->bcache = NULL;
992     file->in_place_compaction = false;
993     file->kv_header = NULL;
994     atomic_init_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_IDLE);
995 
996     atomic_init_uint64_t(&file->header.bid, 0);
997     _init_op_stats(&file->header.op_stat);
998 
999     spin_init(&file->lock);
1000     file->stale_list = (struct list*)calloc(1, sizeof(struct list));
1001     list_init(file->stale_list);
1002     avl_init(&file->stale_info_tree, NULL);
1003     avl_init(&file->mergetree, NULL);
1004     file->stale_info_tree_loaded = false;
1005 
1006     filemgr_dirty_update_init(file);
1007 
1008     spin_init(&file->fhandle_idx_lock);
1009     avl_init(&file->fhandle_idx, NULL);
1010 
1011 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
1012     struct plock_ops pops;
1013     struct plock_config pconfig;
1014 
1015     pops.init_user = mutex_init_wrap;
1016     pops.lock_user = mutex_lock_wrap;
1017     pops.unlock_user = mutex_unlock_wrap;
1018     pops.destroy_user = mutex_destroy_wrap;
1019     pops.init_internal = spin_init_wrap;
1020     pops.lock_internal = spin_lock_wrap;
1021     pops.unlock_internal = spin_unlock_wrap;
1022     pops.destroy_internal = spin_destroy_wrap;
1023     pops.is_overlapped = _block_is_overlapped;
1024 
1025     memset(&pconfig, 0x0, sizeof(pconfig));
1026     pconfig.ops = &pops;
1027     pconfig.sizeof_lock_internal = sizeof(spin_t);
1028     pconfig.sizeof_lock_user = sizeof(mutex_t);
1029     pconfig.sizeof_range = sizeof(bid_t);
1030     pconfig.aux = NULL;
1031     plock_init(&file->plock, &pconfig);
1032 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
1033     int i;
1034     for (i=0;i<DLOCK_MAX;++i) {
1035         mutex_init(&file->data_mutex[i]);
1036     }
1037 #else
1038     int i;
1039     for (i=0;i<DLOCK_MAX;++i) {
1040         spin_init(&file->data_spinlock[i]);
1041     }
1042 #endif //__FILEMGR_DATA_PARTIAL_LOCK
1043 
1044     mutex_init(&file->writer_lock.mutex);
1045     file->writer_lock.locked = false;
1046 
1047     // Note: CRC must be initialized before superblock loading
1048     // initialize CRC mode
1049     if (file->config && file->config->options & FILEMGR_CREATE_CRC32) {
1050         file->crc_mode = CRC32;
1051     } else {
1052         file->crc_mode = CRC_DEFAULT;
1053     }
1054 
1055     do { // repeat until both superblock and DB header are correctly read
1056         // init or load superblock
1057         status = _filemgr_load_sb(file, log_callback);
1058         // we can tolerate SB_READ_FAIL for old version file
1059         if (status != FDB_RESULT_SB_READ_FAIL &&
1060             status != FDB_RESULT_SUCCESS) {
1061             _log_errno_str(file->ops, log_callback, status, "READ", file->filename);
1062             file->ops->close(file->fd);
1063             free(file->stale_list);
1064             free(file->wal);
1065             free(file->filename);
1066             free(file->config);
1067             free(file);
1068             spin_unlock(&filemgr_openlock);
1069             result.rv = status;
1070             return result;
1071         }
1072 
1073         // read header
1074         status = _filemgr_read_header(file, log_callback);
1075         if (file->sb && status == FDB_RESULT_NO_DB_HEADERS) {
1076             // this happens when user created & closed a file without any mutations,
1077             // thus there is no other data but superblocks.
1078             // we can tolerate this case.
1079         } else if (status != FDB_RESULT_SUCCESS) {
1080             _log_errno_str(file->ops, log_callback, status, "READ", filename);
1081             file->ops->close(file->fd);
1082             if (file->sb) {
1083                 sb_ops.release(file);
1084             }
1085             free(file->stale_list);
1086             free(file->wal);
1087             free(file->filename);
1088             free(file->config);
1089             free(file);
1090             spin_unlock(&filemgr_openlock);
1091             result.rv = status;
1092             return result;
1093         }
1094 
1095         if (file->sb &&
1096             file->header.revnum != atomic_get_uint64_t(&file->sb->last_hdr_revnum)) {
1097             // superblock exists but the corresponding DB header does not match.
1098             // read another candidate.
1099             continue;
1100         }
1101 
1102         break;
1103     } while (true);
1104 
1105     // initialize WAL
1106     if (!wal_is_initialized(file)) {
1107         wal_init(file, FDB_WAL_NBUCKET);
1108     }
1109 
1110     // init global transaction for the file
1111     file->global_txn.wrapper = (struct wal_txn_wrapper*)
1112                                malloc(sizeof(struct wal_txn_wrapper));
1113     file->global_txn.wrapper->txn = &file->global_txn;
1114     file->global_txn.handle = NULL;
1115     if (atomic_get_uint64_t(&file->pos)) {
1116         file->global_txn.prev_hdr_bid =
1117             (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
1118     } else {
1119         file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
1120     }
1121     file->global_txn.prev_revnum = 0;
1122     file->global_txn.items = (struct list *)malloc(sizeof(struct list));
1123     list_init(file->global_txn.items);
1124     file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
1125     wal_add_transaction(file, &file->global_txn);
1126 
1127     hash_insert(&hash, &file->e);
1128     if (config->prefetch_duration > 0) {
1129         filemgr_prefetch(file, config, log_callback);
1130     }
1131 
1132     spin_unlock(&filemgr_openlock);
1133 
1134     if (config->options & FILEMGR_SYNC) {
1135         file->fflags |= FILEMGR_SYNC;
1136     } else {
1137         file->fflags &= ~FILEMGR_SYNC;
1138     }
1139 
1140     result.file = file;
1141     result.rv = FDB_RESULT_SUCCESS;
1142     fdb_log(log_callback, FDB_RESULT_SUCCESS, "Forestdb opened database file %s",
1143             filename);
1144 
1145     return result;
1146 }
1147 
filemgr_update_header(struct filemgr *file, void *buf, size_t len, bool inc_revnum)1148 uint64_t filemgr_update_header(struct filemgr *file,
1149                                void *buf,
1150                                size_t len,
1151                                bool inc_revnum)
1152 {
1153     uint64_t ret;
1154 
1155     spin_lock(&file->lock);
1156 
1157     if (file->header.data == NULL) {
1158         file->header.data = (void *)malloc(file->blocksize);
1159     }
1160     memcpy(file->header.data, buf, len);
1161     file->header.size = len;
1162     if (inc_revnum) {
1163         ++(file->header.revnum);
1164     }
1165     ret = file->header.revnum;
1166 
1167     spin_unlock(&file->lock);
1168 
1169     return ret;
1170 }
1171 
filemgr_get_header_revnum(struct filemgr *file)1172 filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
1173 {
1174     filemgr_header_revnum_t ret;
1175     spin_lock(&file->lock);
1176     ret = file->header.revnum;
1177     spin_unlock(&file->lock);
1178     return ret;
1179 }
1180 
1181 // 'filemgr_get_seqnum', 'filemgr_set_seqnum',
1182 // 'filemgr_get_walflush_revnum', 'filemgr_set_walflush_revnum'
1183 // have to be protected by 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
filemgr_get_seqnum(struct filemgr *file)1184 fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
1185 {
1186     return file->header.seqnum;
1187 }
1188 
filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)1189 void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
1190 {
1191     file->header.seqnum = seqnum;
1192 }
1193 
filemgr_get_header(struct filemgr *file, void *buf, size_t *len, bid_t *header_bid, fdb_seqnum_t *seqnum, filemgr_header_revnum_t *header_revnum)1194 void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
1195                          bid_t *header_bid, fdb_seqnum_t *seqnum,
1196                          filemgr_header_revnum_t *header_revnum)
1197 {
1198     spin_lock(&file->lock);
1199 
1200     if (file->header.size > 0) {
1201         if (buf == NULL) {
1202             buf = (void*)malloc(file->header.size);
1203         }
1204         memcpy(buf, file->header.data, file->header.size);
1205     }
1206 
1207     if (len) {
1208         *len = file->header.size;
1209     }
1210     if (header_bid) {
1211         *header_bid = filemgr_get_header_bid(file);
1212     }
1213     if (seqnum) {
1214         *seqnum = file->header.seqnum;
1215     }
1216     if (header_revnum) {
1217         *header_revnum = file->header.revnum;
1218     }
1219 
1220     spin_unlock(&file->lock);
1221 
1222     return buf;
1223 }
1224 
filemgr_fetch_header(struct filemgr *file, uint64_t bid, void *buf, size_t *len, fdb_seqnum_t *seqnum, filemgr_header_revnum_t *header_revnum, uint64_t *deltasize, uint64_t *version, uint64_t *sb_bmp_revnum, err_log_callback *log_callback)1225 fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
1226                                 void *buf, size_t *len, fdb_seqnum_t *seqnum,
1227                                 filemgr_header_revnum_t *header_revnum,
1228                                 uint64_t *deltasize, uint64_t *version,
1229                                 uint64_t *sb_bmp_revnum,
1230                                 err_log_callback *log_callback)
1231 {
1232     uint8_t *_buf;
1233     uint8_t marker[BLK_MARKER_SIZE];
1234     filemgr_header_len_t hdr_len;
1235     uint64_t _deltasize, _bmp_revnum;
1236     filemgr_magic_t magic;
1237     fdb_status status = FDB_RESULT_SUCCESS;
1238 
1239     *len = 0;
1240 
1241     if (!bid || bid == BLK_NOT_FOUND) {
1242         // No other header available
1243         return FDB_RESULT_SUCCESS;
1244     }
1245 
1246     _buf = (uint8_t *)_filemgr_get_temp_buf();
1247 
1248     status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1249 
1250     if (status != FDB_RESULT_SUCCESS) {
1251         fdb_log(log_callback, status,
1252                 "Failed to read a database header with block id %" _F64 " in "
1253                 "a database file '%s'", bid, file->filename);
1254         _filemgr_release_temp_buf(_buf);
1255         return status;
1256     }
1257     memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1258             BLK_MARKER_SIZE);
1259 
1260     if (marker[0] != BLK_MARKER_DBHEADER) {
1261         // Comment this warning log as of now because the circular block reuse
1262         // can cause false alarms as a previous stale header block can be reclaimed
1263         // and reused for incoming writes.
1264         /*
1265         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1266                 "A block marker of the database header block id %" _F64 " in "
1267                 "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1268                 bid, file->filename);
1269         */
1270         _filemgr_release_temp_buf(_buf);
1271         return FDB_RESULT_READ_FAIL;
1272     }
1273     memcpy(&magic,
1274             _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1275             sizeof(magic));
1276     magic = _endian_decode(magic);
1277     if (!ver_is_valid_magic(magic)) {
1278         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1279                 "A block magic value of %" _F64 " in the database header block"
1280                 "id %" _F64 " in a database file '%s'"
1281                 "does NOT match FILEMGR_MAGIC %" _F64 "!",
1282                 magic, bid, file->filename, ver_get_latest_magic());
1283         _filemgr_release_temp_buf(_buf);
1284         return FDB_RESULT_FILE_CORRUPTION;
1285     }
1286     memcpy(&hdr_len,
1287             _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1288             sizeof(hdr_len), sizeof(hdr_len));
1289     hdr_len = _endian_decode(hdr_len);
1290 
1291     memcpy(buf, _buf, hdr_len);
1292     *len = hdr_len;
1293     *version = magic;
1294 
1295     if (header_revnum) {
1296         // copy the DB header revnum
1297         filemgr_header_revnum_t _revnum;
1298         memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
1299         *header_revnum = _endian_decode(_revnum);
1300     }
1301     if (seqnum) {
1302         // copy default KVS's seqnum
1303         fdb_seqnum_t _seqnum;
1304         memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1305                sizeof(_seqnum));
1306         *seqnum = _endian_decode(_seqnum);
1307     }
1308 
1309     if (ver_is_atleast_magic_001(magic)) {
1310         if (deltasize) {
1311             memcpy(&_deltasize, _buf + file->blocksize - BLK_MARKER_SIZE
1312                     - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1313                     - sizeof(_deltasize), sizeof(_deltasize));
1314             *deltasize = _endian_decode(_deltasize);
1315         }
1316     }
1317 
1318     if (sb_bmp_revnum && ver_superblock_support(magic)) {
1319         memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1320                 - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1321                 - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1322         *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1323     }
1324 
1325     _filemgr_release_temp_buf(_buf);
1326 
1327     return status;
1328 }
1329 
filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid, void *buf, size_t *len, fdb_seqnum_t *seqnum, filemgr_header_revnum_t *revnum, uint64_t *deltasize, uint64_t *version, uint64_t *sb_bmp_revnum, err_log_callback *log_callback)1330 uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
1331                                    void *buf, size_t *len, fdb_seqnum_t *seqnum,
1332                                    filemgr_header_revnum_t *revnum,
1333                                    uint64_t *deltasize, uint64_t *version,
1334                                    uint64_t *sb_bmp_revnum,
1335                                    err_log_callback *log_callback)
1336 {
1337     uint8_t *_buf;
1338     uint8_t marker[BLK_MARKER_SIZE];
1339     fdb_seqnum_t _seqnum;
1340     filemgr_header_revnum_t _revnum, cur_revnum, prev_revnum;
1341     filemgr_header_len_t hdr_len;
1342     filemgr_magic_t magic;
1343     bid_t _prev_bid, prev_bid;
1344     uint64_t _deltasize, _bmp_revnum;
1345     int found = 0;
1346 
1347     *len = 0;
1348 
1349     if (!bid || bid == BLK_NOT_FOUND) {
1350         // No other header available
1351         return bid;
1352     }
1353     _buf = (uint8_t *)_filemgr_get_temp_buf();
1354 
1355     // Reverse scan the file for a previous DB header
1356     do {
1357         // Get prev_bid from the current header.
1358         // Since the current header is already cached during the previous
1359         // operation, no disk I/O will be triggered.
1360         if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
1361                 != FDB_RESULT_SUCCESS) {
1362             break;
1363         }
1364 
1365         memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1366                BLK_MARKER_SIZE);
1367         memcpy(&magic,
1368                _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1369                sizeof(magic));
1370         magic = _endian_decode(magic);
1371 
1372         if (marker[0] != BLK_MARKER_DBHEADER ||
1373             !ver_is_valid_magic(magic)) {
1374             // not a header block
1375             // this happens when this function is invoked between
1376             // fdb_set() call and fdb_commit() call, so the last block
1377             // in the file is not a header block
1378             bid_t latest_hdr = filemgr_get_header_bid(file);
1379             if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
1380                 // get the latest header BID
1381                 bid = latest_hdr;
1382             } else {
1383                 break;
1384             }
1385             cur_revnum = file->header.revnum + 1;
1386         } else {
1387 
1388             memcpy(&hdr_len,
1389                    _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1390                    sizeof(hdr_len), sizeof(hdr_len));
1391             hdr_len = _endian_decode(hdr_len);
1392 
1393             memcpy(&_revnum, _buf + hdr_len,
1394                    sizeof(filemgr_header_revnum_t));
1395             cur_revnum = _endian_decode(_revnum);
1396 
1397             if (sb_bmp_exists(file->sb)) {
1398                 // first check revnum
1399                 if (cur_revnum <= sb_ops.get_min_live_revnum(file)) {
1400                     // previous headers already have been reclaimed
1401                     // no more logical prev header
1402                     break;
1403                 }
1404             }
1405 
1406             memcpy(&_prev_bid,
1407                    _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1408                        sizeof(hdr_len) - sizeof(_prev_bid),
1409                    sizeof(_prev_bid));
1410             prev_bid = _endian_decode(_prev_bid);
1411             bid = prev_bid;
1412         }
1413 
1414         // Read the prev header
1415         fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
1416         if (fs != FDB_RESULT_SUCCESS) {
1417             fdb_log(log_callback, fs,
1418                     "Failed to read a previous database header with block id %"
1419                     _F64 " in "
1420                     "a database file '%s'", bid, file->filename);
1421             break;
1422         }
1423 
1424         memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
1425                BLK_MARKER_SIZE);
1426         if (marker[0] != BLK_MARKER_DBHEADER) {
1427             if (bid) {
1428                 // broken linked list
1429                 fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1430                         "A block marker of the previous database header block id %"
1431                         _F64 " in "
1432                         "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
1433                         bid, file->filename);
1434             }
1435             break;
1436         }
1437 
1438         memcpy(&magic,
1439                _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
1440                sizeof(magic));
1441         magic = _endian_decode(magic);
1442         if (!ver_is_valid_magic(magic)) {
1443             // broken linked list
1444             fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1445                     "A block magic value of %" _F64
1446                     " of the previous database header block id %" _F64 " in "
1447                     "a database file '%s' does NOT match FILEMGR_MAGIC %"
1448                     _F64"!", magic,
1449                     bid, file->filename, ver_get_latest_magic());
1450             break;
1451         }
1452 
1453         memcpy(&hdr_len,
1454                _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
1455                sizeof(hdr_len), sizeof(hdr_len));
1456         hdr_len = _endian_decode(hdr_len);
1457 
1458         if (buf) {
1459             memcpy(buf, _buf, hdr_len);
1460         }
1461         memcpy(&_revnum, _buf + hdr_len,
1462                sizeof(filemgr_header_revnum_t));
1463         prev_revnum = _endian_decode(_revnum);
1464         if (prev_revnum >= cur_revnum ||
1465             prev_revnum < sb_ops.get_min_live_revnum(file)) {
1466             // no more prev header, or broken linked list
1467             break;
1468         }
1469 
1470         memcpy(&_seqnum,
1471                _buf + hdr_len + sizeof(filemgr_header_revnum_t),
1472                sizeof(fdb_seqnum_t));
1473         if (ver_is_atleast_magic_001(magic)) {
1474             if (deltasize) {
1475                 memcpy(&_deltasize,
1476                         _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic)
1477                        - sizeof(hdr_len) - sizeof(prev_bid) - sizeof(_deltasize),
1478                         sizeof(_deltasize));
1479                 *deltasize = _endian_decode(_deltasize);
1480             }
1481         }
1482 
1483         if (sb_bmp_revnum && ver_superblock_support(magic)) {
1484             memcpy(&_bmp_revnum, _buf + file->blocksize - BLK_MARKER_SIZE
1485                     - sizeof(magic) - sizeof(hdr_len) - sizeof(bid)
1486                     - sizeof(_deltasize) - sizeof(_bmp_revnum), sizeof(_bmp_revnum));
1487             *sb_bmp_revnum = _endian_decode(_bmp_revnum);
1488         }
1489 
1490         if (revnum) {
1491             *revnum = prev_revnum;
1492         }
1493         *seqnum = _endian_decode(_seqnum);
1494         *len = hdr_len;
1495         *version = magic;
1496         found = 1;
1497         break;
1498     } while (false); // no repetition
1499 
1500     if (!found) { // no other header found till end of file
1501         *len = 0;
1502         bid = BLK_NOT_FOUND;
1503     }
1504 
1505     _filemgr_release_temp_buf(_buf);
1506 
1507     return bid;
1508 }
1509 
filemgr_close(struct filemgr *file, bool cleanup_cache_onclose, const char *orig_file_name, err_log_callback *log_callback)1510 fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
1511                          const char *orig_file_name,
1512                          err_log_callback *log_callback)
1513 {
1514     int rv = FDB_RESULT_SUCCESS;
1515 
1516     if (atomic_decr_uint32_t(&file->ref_count) > 0) {
1517         // File is still accessed by other readers or writers.
1518         return FDB_RESULT_SUCCESS;
1519     }
1520 
1521     fdb_log(log_callback, (fdb_status)rv, "Forestdb closed database file %s",
1522             file->filename);
1523 
1524     spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
1525                                   // filemgr_open() because file->lock won't
1526                                   // prevent the race condition.
1527 
1528     // remove filemgr structure if no thread refers to the file
1529     spin_lock(&file->lock);
1530     if (atomic_get_uint32_t(&file->ref_count) == 0) {
1531         if (global_config.ncacheblock > 0 &&
1532             atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
1533             spin_unlock(&file->lock);
1534             // discard all dirty blocks belonged to this file
1535             bcache_remove_dirty_blocks(file);
1536         } else {
1537             // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
1538             // then its dirty block entries will be cleaned up in either
1539             // filemgr_free_func() or register_file_removal() below.
1540             spin_unlock(&file->lock);
1541         }
1542 
1543         if (wal_is_initialized(file)) {
1544             wal_close(file, log_callback);
1545         }
1546 #ifdef _LATENCY_STATS_DUMP_TO_FILE
1547         filemgr_dump_latency_stat(file, log_callback);
1548 #endif // _LATENCY_STATS_DUMP_TO_FILE
1549 
1550         spin_lock(&file->lock);
1551 
1552         if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
1553 
1554             bool foreground_deletion = false;
1555             struct filemgr* new_file = get_instance_UNLOCKED(file->new_filename);
1556 
1557             // immediately remove file if background remove function is not set
1558             if (!lazy_file_deletion_enabled ||
1559                 (new_file && new_file->in_place_compaction)) {
1560                 // TODO: to avoid the scenario below, we prevent background
1561                 //       deletion of in-place compacted files at this time.
1562                 // 1) In-place compacted from 'A' to 'A.1'.
1563                 // 2) Request to delete 'A'.
1564                 // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
1565                 // 4) User opens DB file using its original name 'A', not 'A.1'.
1566                 // 5) Old file 'A' is opened, and then background thread deletes 'A'.
1567                 // 6) Crash!
1568 
1569                 // As the file is already unlinked, the file will be removed
1570                 // as soon as we close it.
1571                 rv = file->ops->close(file->fd);
1572                 _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1573 #if defined(WIN32) || defined(_WIN32)
1574                 // For Windows, we need to manually remove the file.
1575                 remove(file->filename);
1576 #endif
1577                 foreground_deletion = true;
1578             }
1579 
1580             // we can release lock becuase no one will open this file
1581             spin_unlock(&file->lock);
1582             struct hash_elem *ret = hash_remove(&hash, &file->e);
1583             fdb_assert(ret, 0, 0);
1584 
1585             spin_unlock(&filemgr_openlock);
1586 
1587             if (foreground_deletion) {
1588                 filemgr_free_func(&file->e);
1589             } else {
1590                 register_file_removal(file, log_callback);
1591             }
1592             return (fdb_status) rv;
1593         } else {
1594 
1595             rv = file->ops->close(file->fd);
1596             if (cleanup_cache_onclose) {
1597                 _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1598                 if (file->in_place_compaction && orig_file_name) {
1599                     struct hash_elem *elem = NULL;
1600                     struct filemgr query;
1601                     uint32_t old_file_refcount = 0;
1602 
1603                     query.filename = (char *)orig_file_name;
1604                     elem = hash_find(&hash, &query.e);
1605 
1606                     if (file->old_filename) {
1607                         struct hash_elem *elem_old = NULL;
1608                         struct filemgr query_old;
1609                         struct filemgr *old_file = NULL;
1610 
1611                         // get old file's ref count if exists
1612                         query_old.filename = file->old_filename;
1613                         elem_old = hash_find(&hash, &query_old.e);
1614                         if (elem_old) {
1615                             old_file = _get_entry(elem_old, struct filemgr, e);
1616                             old_file_refcount = atomic_get_uint32_t(&old_file->ref_count);
1617                         }
1618                     }
1619 
1620                     // If old file is opened by other handle, renaming should be
1621                     // postponed. It will be renamed later by the handle referring
1622                     // to the old file.
1623                     if (!elem && old_file_refcount == 0 &&
1624                         is_file_removed(orig_file_name)) {
1625                         // If background file removal is not done yet, we postpone
1626                         // file renaming at this time.
1627                         if (rename(file->filename, orig_file_name) < 0) {
1628                             // Note that the renaming failure is not a critical
1629                             // issue because the last compacted file will be automatically
1630                             // identified and opened in the next fdb_open call.
1631                             _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
1632                                            "CLOSE", file->filename);
1633                         }
1634                     }
1635                 }
1636                 spin_unlock(&file->lock);
1637                 // Clean up global hash table, WAL index, and buffer cache.
1638                 struct hash_elem *ret = hash_remove(&hash, &file->e);
1639                 fdb_assert(ret, file, 0);
1640 
1641                 spin_unlock(&filemgr_openlock);
1642 
1643                 filemgr_free_func(&file->e);
1644                 return (fdb_status) rv;
1645             } else {
1646                 atomic_store_uint8_t(&file->status, FILE_CLOSED);
1647             }
1648         }
1649     }
1650 
1651     _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
1652 
1653     spin_unlock(&file->lock);
1654     spin_unlock(&filemgr_openlock);
1655     return (fdb_status) rv;
1656 }
1657 
filemgr_remove_all_buffer_blocks(struct filemgr *file)1658 void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1659 {
1660     // remove all cached blocks
1661     if (global_config.ncacheblock > 0 &&
1662             file->bcache.load(std::memory_order_relaxed)) {
1663         bcache_remove_dirty_blocks(file);
1664         bcache_remove_clean_blocks(file);
1665         bcache_remove_file(file);
1666         file->bcache.store(NULL, std::memory_order_relaxed);
1667     }
1668 }
1669 
// Forward declaration; the definition is not part of this section.
void _free_fhandle_idx(struct avl_tree *idx);

// Tear down and free a file manager instance.
//
// 'h' is the hash element embedded in the 'struct filemgr' to destroy (this
// signature lets the function be used as a hash_free_active() callback, see
// filemgr_shutdown()). The routine stops a running prefetch thread, drops the
// file's block cache entries, releases the KV header, global transaction,
// WAL, latency stats, file names, locks, superblock, dirty-update index,
// file-handle index and stale-block bookkeeping, and finally frees the
// structure itself. 'file' must not be used after this call returns.
//
// Note that this function does not call file->ops->close(); callers in this
// section close the descriptor themselves before invoking it.
void filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    // Remember the prefetch state before overwriting it, so that we know
    // whether there is a thread to join.
    filemgr_prefetch_status_t prefetch_state =
                              atomic_get_uint8_t(&file->prefetch_status);

    // Request abort unconditionally; join only if the thread was running.
    atomic_store_uint8_t(&file->prefetch_status, FILEMGR_PREFETCH_ABORT);
    if (prefetch_state == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread was running
        void *ret;
        // wait (the thread must have been created..)
        thread_join(file->prefetch_tid, &ret);
    }

    // remove all cached blocks
    // (same sequence as filemgr_remove_all_buffer_blocks())
    if (global_config.ncacheblock > 0 &&
            file->bcache.load(std::memory_order_relaxed)) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
        file->bcache.store(NULL, std::memory_order_relaxed);
    }

    if (file->kv_header) {
        // multi KV instance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    // (detached from the WAL first, while the WAL still exists)
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file, NULL);
        wal_destroy(file);
    }
    // the WAL container itself is freed whether or not it was initialized
    free(file->wal);

#ifdef _LATENCY_STATS
    for (int x = 0; x < FDB_LATENCY_NUM_STATS; ++x) {
        filemgr_destroy_latency_stat(&file->lat_stats[x]);
    }
#endif // _LATENCY_STATS

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);

    // free old/new filename if any
    free(file->old_filename);
    free(file->new_filename);

    // destroy locks
    spin_destroy(&file->lock);

    // Exactly one of the three data-lock flavors below is compiled in,
    // selected at build time.
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    // free superblock
    // (the release hook is optional; it may be unset)
    if (sb_ops.release) {
        sb_ops.release(file);
    }

    // free dirty update index
    filemgr_dirty_update_free(file);

    // free fhandle idx
    _free_fhandle_idx(&file->fhandle_idx);
    spin_destroy(&file->fhandle_idx_lock);

    // free file structure
    // The stale list pointer is fetched before the clear calls and freed
    // after them, so the clear routines still see a valid list.
    struct list *stale_list = filemgr_get_stale_list(file);
    filemgr_clear_stale_list(file);
    filemgr_clear_stale_info_tree(file);
    filemgr_clear_mergetree(file);
    free(stale_list);
    free(file->config);
    free(file);
}
1766 
1767 // permanently remove file from cache (not just close)
1768 // LCOV_EXCL_START
filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)1769 void filemgr_remove_file(struct filemgr *file, err_log_callback *log_callback)
1770 {
1771     struct hash_elem *ret;
1772 
1773     if (!file || atomic_get_uint32_t(&file->ref_count) > 0) {
1774         return;
1775     }
1776 
1777     // remove from global hash table
1778     spin_lock(&filemgr_openlock);
1779     ret = hash_remove(&hash, &file->e);
1780     fdb_assert(ret, ret, NULL);
1781     spin_unlock(&filemgr_openlock);
1782 
1783     struct filemgr *new_file = filemgr_get_instance(file->new_filename);
1784 
1785     if (!lazy_file_deletion_enabled ||
1786         (new_file && new_file->in_place_compaction)) {
1787         filemgr_free_func(&file->e);
1788     } else {
1789         register_file_removal(file, log_callback);
1790     }
1791 }
1792 // LCOV_EXCL_STOP
1793 
1794 static
_filemgr_is_closed(struct hash_elem *h, void *ctx)1795 void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1796     struct filemgr *file = _get_entry(h, struct filemgr, e);
1797     void *ret;
1798     spin_lock(&file->lock);
1799     if (atomic_get_uint32_t(&file->ref_count) != 0) {
1800         ret = (void *)file;
1801     } else {
1802         ret = NULL;
1803     }
1804     spin_unlock(&file->lock);
1805     return ret;
1806 }
1807 
// Shut down the global file manager.
//
// Returns FDB_RESULT_SUCCESS when the manager was not initialized, was
// already shut down, or was shut down by this call. Returns
// FDB_RESULT_FILE_IS_BUSY (and leaves everything intact) when the hash scan
// finds at least one file whose ref_count is non-zero.
//
// On a successful shutdown every file still present in the global hash table
// is destroyed through filemgr_free_func(), the block cache is shut down if
// it is enabled, and the temp buffer pool is released.
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    // Unlocked fast check; re-checked below once initial_lock is held.
    if (filemgr_initialized) {

#ifndef SPIN_INITIALIZER
        // Windows: check if spin lock is already destroyed.
        // The compare-exchange moves initial_lock_status from 2 to 1 and
        // succeeds only if it was 2.
        // NOTE(review): presumably 2 means "initial_lock is initialized";
        // confirm against the initialization code.
        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
            spin_lock(&initial_lock);
        } else {
            // filemgr is already shut down
            return ret;
        }
#else
        spin_lock(&initial_lock);
#endif

        if (!filemgr_initialized) {
            // filemgr is already shut down
            // NOTE(review): without SPIN_INITIALIZER this path returns while
            // still holding initial_lock -- confirm this is intended.
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            return ret;
        }

        // Look for any file that is still referenced; the scan callback
        // returns the first file whose ref_count != 0, or NULL if none.
        spin_lock(&filemgr_openlock);
        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        spin_unlock(&filemgr_openlock);
        if (!open_file) {
            // No file is in use: free every remaining instance.
            hash_free_active(&hash, filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            // Reset the status word and destroy the dynamically
            // initialized lock.
            initial_lock_status = 0;
            spin_destroy(&initial_lock);
#endif
        } else {
            // NOTE(review): without SPIN_INITIALIZER, initial_lock_status is
            // left at 1 here (it was changed from 2 above and is not
            // restored), so a later call takes the "already shut down"
            // branch -- verify against the initialization code.
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1856 
filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)1857 bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1858 {
1859     spin_lock(&file->lock);
1860     bid_t bid = BLK_NOT_FOUND;
1861 
1862     // block reusing is not allowed for being compacted file
1863     // for easy implementation.
1864     if (filemgr_get_file_status(file) == FILE_NORMAL &&
1865         file->sb && sb_ops.alloc_block) {
1866         bid = sb_ops.alloc_block(file);
1867     }
1868     if (bid == BLK_NOT_FOUND) {
1869         bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1870         atomic_add_uint64_t(&file->pos, file->blocksize);
1871     }
1872 
1873     if (global_config.ncacheblock <= 0) {
1874         // if block cache is turned off, write the allocated block before use
1875         uint8_t _buf = 0x0;
1876         ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1877                                        (bid+1) * file->blocksize - 1);
1878         _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1879     }
1880     spin_unlock(&file->lock);
1881 
1882     return bid;
1883 }
1884 
1885 // Note that both alloc_multiple & alloc_multiple_cond are not used in
1886 // the new version of DB file (with superblock support).
filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1887 void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1888                             bid_t *end, err_log_callback *log_callback)
1889 {
1890     spin_lock(&file->lock);
1891     *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1892     *end = *begin + nblock - 1;
1893     atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1894 
1895     if (global_config.ncacheblock <= 0) {
1896         // if block cache is turned off, write the allocated block before use
1897         uint8_t _buf = 0x0;
1898         ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1899                                        atomic_get_uint64_t(&file->pos) - 1);
1900         _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1901     }
1902     spin_unlock(&file->lock);
1903 }
1904 
1905 // atomically allocate NBLOCK blocks only when current file position is same to nextbid
filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1906 bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1907                                   bid_t *begin, bid_t *end,
1908                                   err_log_callback *log_callback)
1909 {
1910     bid_t bid;
1911     spin_lock(&file->lock);
1912     bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1913     if (bid == nextbid) {
1914         *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1915         *end = *begin + nblock - 1;
1916         atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1917 
1918         if (global_config.ncacheblock <= 0) {
1919             // if block cache is turned off, write the allocated block before use
1920             uint8_t _buf = 0x0;
1921             ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1922                                            atomic_get_uint64_t(&file->pos));
1923             _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1924         }
1925     }else{
1926         *begin = BLK_NOT_FOUND;
1927         *end = BLK_NOT_FOUND;
1928     }
1929     spin_unlock(&file->lock);
1930     return bid;
1931 }
1932 
1933 #ifdef __CRC32
_filemgr_crc32_check(struct filemgr *file, void *buf)1934 INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1935 {
1936     if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1937         uint32_t crc_file = 0;
1938         memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1939         crc_file = _endian_decode(crc_file);
1940         memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1941         if (!perform_integrity_check(reinterpret_cast<const uint8_t*>(buf),
1942                                      file->blocksize,
1943                                      crc_file,
1944                                      file->crc_mode)) {
1945             return FDB_RESULT_CHECKSUM_ERROR;
1946         }
1947     }
1948     return FDB_RESULT_SUCCESS;
1949 }
1950 #endif
1951 
filemgr_invalidate_dbheader(struct filemgr *file, bid_t bid, err_log_callback *log_callback)1952 fdb_status filemgr_invalidate_dbheader(struct filemgr *file, bid_t bid,
1953                                        err_log_callback *log_callback){
1954     uint8_t marker[BLK_MARKER_SIZE];
1955 
1956     // get temp buffer
1957     uint8_t *buf = (uint8_t *) _filemgr_get_temp_buf();
1958 
1959     // get the dbheader
1960     ssize_t rv = filemgr_read_block(file, buf, bid);
1961     if (rv != (ssize_t)file->blocksize) {
1962         _log_errno_str(file->ops, log_callback,
1963                        (fdb_status) rv, "READ", file->filename);
1964         return FDB_RESULT_READ_FAIL;
1965     }
1966     // set the marker to an invalid value
1967     memset(marker, BLK_MARKER_BAD, BLK_MARKER_SIZE);
1968     memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
1969            marker, BLK_MARKER_SIZE);
1970 
1971     // write it back
1972     rv = filemgr_write_blocks(file, buf, 1, bid);
1973     _log_errno_str(file->ops, log_callback, (fdb_status) rv,
1974                    "WRITE", file->filename);
1975     if (rv != (ssize_t)file->blocksize) {
1976         _filemgr_release_temp_buf(buf);
1977         spin_unlock(&file->lock);
1978         filemgr_clear_io_inprog(file);
1979         return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
1980     }
1981     _filemgr_release_temp_buf(buf);
1982     file->ops->fsync(file->fd);
1983     return FDB_RESULT_SUCCESS;
1984 }
filemgr_invalidate_block(struct filemgr *file, bid_t bid)1985 bool filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1986 {
1987     bool ret;
1988     if (atomic_get_uint64_t(&file->last_commit) < bid * file->blocksize) {
1989         ret = true; // block invalidated was allocated recently (uncommitted)
1990     } else {
1991         ret = false; // a block from the past is invalidated (committed)
1992     }
1993     if (global_config.ncacheblock > 0) {
1994         bcache_invalidate_block(file, bid);
1995     }
1996     return ret;
1997 }
1998 
filemgr_is_fully_resident(struct filemgr *file)1999 bool filemgr_is_fully_resident(struct filemgr *file)
2000 {
2001     bool ret = false;
2002     if (global_config.ncacheblock > 0) {
2003         //TODO: A better thing to do is to track number of document blocks
2004         // and only compare those with the cached document block count
2005         double num_cached_blocks = (double)bcache_get_num_blocks(file);
2006         uint64_t num_blocks = atomic_get_uint64_t(&file->pos)
2007                                  / file->blocksize;
2008         double num_fblocks = (double)num_blocks;
2009         if (num_cached_blocks > num_fblocks * FILEMGR_RESIDENT_THRESHOLD) {
2010             ret = true;
2011         }
2012     }
2013     return ret;
2014 }
2015 
filemgr_flush_immutable(struct filemgr *file, err_log_callback *log_callback)2016 uint64_t filemgr_flush_immutable(struct filemgr *file,
2017                                    err_log_callback *log_callback)
2018 {
2019     uint64_t ret = 0;
2020     if (global_config.ncacheblock > 0) {
2021         if (atomic_get_uint8_t(&file->io_in_prog)) {
2022             return 0;
2023         }
2024         ret = bcache_get_num_immutable(file);
2025         if (!ret) {
2026             return ret;
2027         }
2028         fdb_status rv = bcache_flush_immutable(file);
2029         if (rv != FDB_RESULT_SUCCESS) {
2030             _log_errno_str(file->ops, log_callback, (fdb_status)rv, "WRITE",
2031                            file->filename);
2032         }
2033         return bcache_get_num_immutable(file);
2034     }
2035 
2036     return ret;
2037 }
2038 
filemgr_read(struct filemgr *file, bid_t bid, void *buf, err_log_callback *log_callback, bool read_on_cache_miss)2039 fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
2040                         err_log_callback *log_callback,
2041                         bool read_on_cache_miss)
2042 {
2043     size_t lock_no;
2044     ssize_t r;
2045     uint64_t pos = bid * file->blocksize;
2046     fdb_status status = FDB_RESULT_SUCCESS;
2047     uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
2048 
2049     if (pos >= curr_pos) {
2050         const char *msg = "Read error: read offset %" _F64 " exceeds the file's "
2051                           "current offset %" _F64 " in a database file '%s'\n";
2052         fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, pos, curr_pos,
2053                 file->filename);
2054         return FDB_RESULT_READ_FAIL;
2055     }
2056 
2057     if (global_config.ncacheblock > 0) {
2058         lock_no = bid % DLOCK_MAX;
2059         (void)lock_no;
2060 
2061 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2062         plock_entry_t *plock_entry = NULL;
2063         bid_t is_writer = 0;
2064 #endif
2065         bool locked = false;
2066         // Note: we don't need to grab lock for committed blocks
2067         // because they are immutable so that no writer will interfere and
2068         // overwrite dirty data
2069         if (filemgr_is_writable(file, bid)) {
2070 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2071             plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2072 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2073             mutex_lock(&file->data_mutex[lock_no]);
2074 #else
2075             spin_lock(&file->data_spinlock[lock_no]);
2076 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2077             locked = true;
2078         }
2079 
2080         r = bcache_read(file, bid, buf);
2081         if (r == 0) {
2082             // cache miss
2083             if (!read_on_cache_miss) {
2084                 if (locked) {
2085 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2086                     plock_unlock(&file->plock, plock_entry);
2087 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2088                     mutex_unlock(&file->data_mutex[lock_no]);
2089 #else
2090                     spin_unlock(&file->data_spinlock[lock_no]);
2091 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2092                 }
2093                 const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2094                     "doesn't exist in the cache and read_on_cache_miss flag is turned on.\n";
2095                 fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2096                         file->filename);
2097                 return FDB_RESULT_READ_FAIL;
2098             }
2099 
2100             // if normal file, just read a block
2101             r = filemgr_read_block(file, buf, bid);
2102             if (r != (ssize_t)file->blocksize) {
2103                 _log_errno_str(file->ops, log_callback,
2104                                (fdb_status) r, "READ", file->filename);
2105                 if (locked) {
2106 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2107                     plock_unlock(&file->plock, plock_entry);
2108 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2109                     mutex_unlock(&file->data_mutex[lock_no]);
2110 #else
2111                     spin_unlock(&file->data_spinlock[lock_no]);
2112 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2113                 }
2114                 const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2115                     "is not read correctly: only %d bytes read.\n";
2116                 status = r < 0 ? (fdb_status)r : FDB_RESULT_READ_FAIL;
2117                 fdb_log(log_callback, status, msg, bid, file->filename, r);
2118                 if (!log_callback || !log_callback->callback) {
2119                     dbg_print_buf(buf, file->blocksize, true, 16);
2120                 }
2121                 return status;
2122             }
2123 #ifdef __CRC32
2124             status = _filemgr_crc32_check(file, buf);
2125             if (status != FDB_RESULT_SUCCESS) {
2126                 _log_errno_str(file->ops, log_callback, status, "READ",
2127                         file->filename);
2128                 if (locked) {
2129 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2130                     plock_unlock(&file->plock, plock_entry);
2131 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2132                     mutex_unlock(&file->data_mutex[lock_no]);
2133 #else
2134                     spin_unlock(&file->data_spinlock[lock_no]);
2135 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2136                 }
2137                 const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2138                     ": marker %x\n";
2139                 fdb_log(log_callback, status, msg, bid,
2140                         file->filename, *((uint8_t*)buf + file->blocksize-1));
2141                 if (!log_callback || !log_callback->callback) {
2142                     dbg_print_buf(buf, file->blocksize, true, 16);
2143                 }
2144                 return status;
2145             }
2146 #endif
2147             r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN, false);
2148             if (r != global_config.blocksize) {
2149                 if (locked) {
2150 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2151                     plock_unlock(&file->plock, plock_entry);
2152 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2153                     mutex_unlock(&file->data_mutex[lock_no]);
2154 #else
2155                     spin_unlock(&file->data_spinlock[lock_no]);
2156 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2157                 }
2158                 _log_errno_str(file->ops, log_callback,
2159                                (fdb_status) r, "WRITE", file->filename);
2160                 const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2161                     "is not written in cache correctly: only %d bytes written.\n";
2162                 status = r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2163                 fdb_log(log_callback, status, msg, bid, file->filename, r);
2164                 if (!log_callback || !log_callback->callback) {
2165                     dbg_print_buf(buf, file->blocksize, true, 16);
2166                 }
2167                 return status;
2168             }
2169         }
2170         if (locked) {
2171 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2172             plock_unlock(&file->plock, plock_entry);
2173 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2174             mutex_unlock(&file->data_mutex[lock_no]);
2175 #else
2176             spin_unlock(&file->data_spinlock[lock_no]);
2177 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2178         }
2179     } else {
2180         if (!read_on_cache_miss) {
2181             const char *msg = "Read error: BID %" _F64 " in a database file '%s':"
2182                 "block cache is not enabled.\n";
2183             fdb_log(log_callback, FDB_RESULT_READ_FAIL, msg, bid,
2184                     file->filename);
2185             return FDB_RESULT_READ_FAIL;
2186         }
2187 
2188         r = filemgr_read_block(file, buf, bid);
2189         if (r != (ssize_t)file->blocksize) {
2190             _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
2191                            file->filename);
2192             const char *msg = "Read error: BID %" _F64 " in a database file '%s' "
2193                 "is not read correctly: only %d bytes read (block cache disabled).\n";
2194             status = (r < 0)? (fdb_status)r : FDB_RESULT_READ_FAIL;
2195             fdb_log(log_callback, status, msg, bid, file->filename, r);
2196             if (!log_callback || !log_callback->callback) {
2197                 dbg_print_buf(buf, file->blocksize, true, 16);
2198             }
2199             return status;
2200         }
2201 
2202 #ifdef __CRC32
2203         status = _filemgr_crc32_check(file, buf);
2204         if (status != FDB_RESULT_SUCCESS) {
2205             _log_errno_str(file->ops, log_callback, status, "READ",
2206                            file->filename);
2207             const char *msg = "Read error: checksum error on BID %" _F64 " in a database file '%s' "
2208                 ": marker %x (block cache disabled)\n";
2209             fdb_log(log_callback, status, msg, bid,
2210                     file->filename, *((uint8_t*)buf + file->blocksize-1));
2211             if (!log_callback || !log_callback->callback) {
2212                 dbg_print_buf(buf, file->blocksize, true, 16);
2213             }
2214             return status;
2215         }
2216 #endif
2217     }
2218     return status;
2219 }
2220 
filemgr_write_offset(struct filemgr *file, bid_t bid, uint64_t offset, uint64_t len, void *buf, bool final_write, err_log_callback *log_callback)2221 fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
2222                                 uint64_t offset, uint64_t len, void *buf,
2223                                 bool final_write,
2224                                 err_log_callback *log_callback)
2225 {
2226     size_t lock_no;
2227     ssize_t r = 0;
2228     uint64_t pos = bid * file->blocksize + offset;
2229     uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
2230 
2231     if (offset + len > file->blocksize) {
2232         const char *msg = "Write error: trying to write the buffer data "
2233             "(offset: %" _F64 ", len: %" _F64 " that exceeds the block size "
2234             "%" _F64 " in a database file '%s'\n";
2235         fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, offset, len,
2236                 file->blocksize, file->filename);
2237         return FDB_RESULT_WRITE_FAIL;
2238     }
2239 
2240     if (sb_bmp_exists(file->sb)) {
2241         // block reusing is enabled
2242         if (!sb_ops.is_writable(file, bid)) {
2243             const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2244                               "not identified as a reusable block in "
2245                               "a database file '%s'\n";
2246             fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, file->filename);
2247             return FDB_RESULT_WRITE_FAIL;
2248         }
2249     } else if (pos < curr_commit_pos) {
2250         // stale blocks are not reused yet
2251         if (file->sb == NULL ||
2252             (file->sb && pos >= file->sb->config->num_sb * file->blocksize)) {
2253             // (non-sequential update is exceptionally allowed for superblocks)
2254             const char *msg = "Write error: trying to write at the offset %" _F64 " that is "
2255                               "smaller than the current commit offset %" _F64 " in "
2256                               "a database file '%s'\n";
2257             fdb_log(log_callback, FDB_RESULT_WRITE_FAIL, msg, pos, curr_commit_pos,
2258                     file->filename);
2259             return FDB_RESULT_WRITE_FAIL;
2260         }
2261     }
2262 
2263     if (global_config.ncacheblock > 0) {
2264         lock_no = bid % DLOCK_MAX;
2265         (void)lock_no;
2266 
2267         bool locked = false;
2268 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2269         plock_entry_t *plock_entry;
2270         bid_t is_writer = 1;
2271         plock_entry = plock_lock(&file->plock, &bid, &is_writer);
2272 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2273         mutex_lock(&file->data_mutex[lock_no]);
2274 #else
2275         spin_lock(&file->data_spinlock[lock_no]);
2276 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2277         locked = true;
2278 
2279         if (len == file->blocksize) {
2280             // write entire block .. we don't need to read previous block
2281             r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY, final_write);
2282             if (r != global_config.blocksize) {
2283                 if (locked) {
2284 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2285                     plock_unlock(&file->plock, plock_entry);
2286 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2287                     mutex_unlock(&file->data_mutex[lock_no]);
2288 #else
2289                     spin_unlock(&file->data_spinlock[lock_no]);
2290 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2291                 }
2292                 _log_errno_str(file->ops, log_callback,
2293                                (fdb_status) r, "WRITE", file->filename);
2294                 return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2295             }
2296         } else {
2297             // partially write buffer cache first
2298             r = bcache_write_partial(file, bid, buf, offset, len, final_write);
2299             if (r == 0) {
2300                 // cache miss
2301                 // write partially .. we have to read previous contents of the block
2302                 int64_t cur_file_pos = file->ops->goto_eof(file->fd);
2303                 if (cur_file_pos < 0) {
2304                     _log_errno_str(file->ops, log_callback,
2305                                    (fdb_status) cur_file_pos, "EOF", file->filename);
2306                     return (fdb_status) cur_file_pos;
2307                 }
2308                 bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
2309                 void *_buf = _filemgr_get_temp_buf();
2310 
2311                 if (bid >= cur_file_last_bid) {
2312                     // this is the first time to write this block
2313                     // we don't need to read previous block from file.
2314                 } else {
2315                     r = filemgr_read_block(file, _buf, bid);
2316                     if (r != (ssize_t)file->blocksize) {
2317                         if (locked) {
2318 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2319                             plock_unlock(&file->plock, plock_entry);
2320 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2321                             mutex_unlock(&file->data_mutex[lock_no]);
2322 #else
2323                             spin_unlock(&file->data_spinlock[lock_no]);
2324 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2325                         }
2326                         _filemgr_release_temp_buf(_buf);
2327                         _log_errno_str(file->ops, log_callback, (fdb_status) r,
2328                                        "READ", file->filename);
2329                         return r < 0 ? (fdb_status) r : FDB_RESULT_READ_FAIL;
2330                     }
2331                 }
2332                 memcpy((uint8_t *)_buf + offset, buf, len);
2333                 r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY, final_write);
2334                 if (r != global_config.blocksize) {
2335                     if (locked) {
2336 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2337                         plock_unlock(&file->plock, plock_entry);
2338 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2339                         mutex_unlock(&file->data_mutex[lock_no]);
2340 #else
2341                         spin_unlock(&file->data_spinlock[lock_no]);
2342 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2343                     }
2344                     _filemgr_release_temp_buf(_buf);
2345                     _log_errno_str(file->ops, log_callback,
2346                             (fdb_status) r, "WRITE", file->filename);
2347                     return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2348                 }
2349 
2350                 _filemgr_release_temp_buf(_buf);
2351             } // cache miss
2352         } // full block or partial block
2353 
2354         if (locked) {
2355 #ifdef __FILEMGR_DATA_PARTIAL_LOCK
2356             plock_unlock(&file->plock, plock_entry);
2357 #elif defined(__FILEMGR_DATA_MUTEX_LOCK)
2358             mutex_unlock(&file->data_mutex[lock_no]);
2359 #else
2360             spin_unlock(&file->data_spinlock[lock_no]);
2361 #endif //__FILEMGR_DATA_PARTIAL_LOCK
2362         }
2363     } else { // block cache disabled
2364 
2365 #ifdef __CRC32
2366         if (len == file->blocksize) {
2367             uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
2368             if (marker == BLK_MARKER_BNODE) {
2369                 memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
2370                 uint32_t crc32 = get_checksum(reinterpret_cast<const uint8_t*>(buf),
2371                                               file->blocksize,
2372                                               file->crc_mode);
2373                 crc32 = _endian_encode(crc32);
2374                 memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
2375             }
2376         }
2377 #endif
2378 
2379         r = file->ops->pwrite(file->fd, buf, len, pos);
2380         _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
2381         if ((uint64_t)r != len) {
2382             return r < 0 ? (fdb_status) r : FDB_RESULT_WRITE_FAIL;
2383         }
2384     } // block cache check
2385     return FDB_RESULT_SUCCESS;
2386 }
2387 
filemgr_write(struct filemgr *file, bid_t bid, void *buf, err_log_callback *log_callback)2388 fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
2389                    err_log_callback *log_callback)
2390 {
2391     return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
2392                                 false, // TODO: track immutability of index blk
2393                                 log_callback);
2394 }
2395 
filemgr_commit(struct filemgr *file, bool sync, err_log_callback *log_callback)2396 fdb_status filemgr_commit(struct filemgr *file, bool sync,
2397                           err_log_callback *log_callback)
2398 {
2399     // append header at the end of the file
2400     uint64_t bmp_revnum = 0;
2401     if (sb_ops.get_bmp_revnum) {
2402         bmp_revnum = sb_ops.get_bmp_revnum(file);
2403     }
2404     return filemgr_commit_bid(file, BLK_NOT_FOUND, bmp_revnum,
2405                               sync, log_callback);
2406 }
2407 
// Writes the in-memory DB header of 'file' as a header block and updates
// the commit bookkeeping (header.bid, pos, last_commit).
//
// Steps, in order:
//  1. Marks I/O as in progress and, when the block cache is configured
//     (global_config.ncacheblock > 0), flushes it via bcache_flush().
//  2. Under file->lock, if a header exists (header.size > 0 and header.data
//     is set), assembles the header block in a temp buffer and writes it
//     with filemgr_write_blocks().
//  3. Under the same lock, updates last_commit (and, when block reuse is
//     active, last_writable_bmp_revnum).
//  4. After releasing the lock, calls fsync when 'sync' is true.
//
// Parameters:
//  file         - file to commit.
//  bid          - block ID to write the header into; BLK_NOT_FOUND means
//                 "append at the current end position (file->pos)".
//  bmp_revnum   - bitmap revision number stored in the header block; it is
//                 only written when file->sb is non-NULL.
//  sync         - when true, file->ops->fsync() is called at the end.
//  log_callback - passed through to _log_errno_str().
//
// Returns:
//  - the bcache_flush() status if the flush fails;
//  - the negative write return value, or FDB_RESULT_WRITE_FAIL for a short
//    write, if writing the header block fails;
//  - otherwise the fsync() result when 'sync' is true, or
//    FDB_RESULT_SUCCESS when it is false.
fdb_status filemgr_commit_bid(struct filemgr *file, bid_t bid,
                              uint64_t bmp_revnum, bool sync,
                              err_log_callback *log_callback)
{
    struct avl_node *a;
    struct kvs_node *node;
    bid_t prev_bid, _prev_bid;
    uint64_t _deltasize, _bmp_revnum;
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    int result = FDB_RESULT_SUCCESS;
    bool block_reusing = false;

    // Every return path below must pair this with filemgr_clear_io_inprog().
    filemgr_set_io_inprog(file);
    if (global_config.ncacheblock > 0) {
        // block cache is enabled: flush it before writing the header
        result = bcache_flush(file);
        if (result != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, (fdb_status) result,
                           "FLUSH", file->filename);
            filemgr_clear_io_inprog(file);
            return (fdb_status)result;
        }
    }

    spin_lock(&file->lock);

    // Snapshot the header fields while holding the lock.
    uint16_t header_len = file->header.size;
    struct kvs_header *kv_header = file->kv_header;
    filemgr_magic_t magic = file->version;

    if (file->header.size > 0 && file->header.data) {
        void *buf = _filemgr_get_temp_buf();
        uint8_t marker[BLK_MARKER_SIZE];

        // Layout of the header block being assembled in 'buf':
        //
        // [header data]:        'header_len' bytes   <---+
        // [header revnum]:      8 bytes                  |
        // [default KVS seqnum]: 8 bytes                  |
        // ...                                            |
        // (empty)                                    blocksize
        // ...                                            |
        // [SB bitmap revnum]:   8 bytes                  |
        // [Delta size]:         8 bytes                  |
        // [prev header bid]:    8 bytes                  |
        // [header length]:      2 bytes                  |
        // [magic number]:       8 bytes                  |
        // [block marker]:       1 byte               <---+
        //
        // The leading fields are placed at offsets from the start of the
        // block; the trailing fields are placed at offsets computed
        // backwards from the end of the block.

        // header data
        memcpy(buf, file->header.data, header_len);
        // header rev number
        _revnum = _endian_encode(file->header.revnum);
        memcpy((uint8_t *)buf + header_len, &_revnum,
               sizeof(filemgr_header_revnum_t));
        // file's sequence number (default KVS seqnum)
        _seqnum = _endian_encode(file->header.seqnum.load());
        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
               &_seqnum, sizeof(fdb_seqnum_t));

        // current header's sb bmp revision number
        // NOTE(review): when file->sb is NULL this slot is not written here,
        // so it keeps whatever the temp buffer contained -- confirm readers
        // ignore it in that case.
        if (file->sb) {
            _bmp_revnum = _endian_encode(bmp_revnum);
            memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
                   - sizeof(header_len) - sizeof(_prev_bid)
                   - sizeof(_deltasize) - sizeof(_bmp_revnum)
                   - BLK_MARKER_SIZE),
                   &_bmp_revnum, sizeof(_bmp_revnum));
        }

        // delta size since prior commit
        // (the offset below uses sizeof(_prev_bid)*2 to skip both the
        //  prev-header-bid slot and the delta-size slot itself)
        _deltasize = _endian_encode(file->header.stat.deltasize //index+data
                                  + wal_get_datasize(file)); // wal datasize
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid)*2 - BLK_MARKER_SIZE),
               &_deltasize, sizeof(_deltasize));

        // Reset in-memory delta size of the header for next commit...
        file->header.stat.deltasize = 0; // single kv store header
        if (kv_header) { // multi kv store stats
            // walk every KVS node in the ID index and clear its delta size
            a = avl_first(kv_header->idx_id);
            while (a) {
                node = _get_entry(a, struct kvs_node, avl_id);
                a = avl_next(&node->avl_id);
                node->stat.deltasize = 0;
            }
        }

        // prev header bid
        prev_bid = atomic_get_uint64_t(&file->header.bid);
        _prev_bid = _endian_encode(prev_bid);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
               &_prev_bid, sizeof(_prev_bid));
        // header length
        // ('header_len' is overwritten with its encoded form here; from this
        //  point on it is only used as the source of this copy and in sizeof)
        header_len = _endian_encode(header_len);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - BLK_MARKER_SIZE),
               &header_len, sizeof(header_len));
        // magic number
        magic = _endian_encode(magic);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - BLK_MARKER_SIZE), &magic, sizeof(magic));

        // marker: the last BLK_MARKER_SIZE bytes are set to BLK_MARKER_DBHEADER
        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
               marker, BLK_MARKER_SIZE);

        if (bid == BLK_NOT_FOUND) {
            // append header at the end of file
            bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
            block_reusing = false;
        } else {
            // write header in the allocated (reused) block
            block_reusing = true;
            // we MUST invalidate the header block 'bid', since previous
            // contents of 'bid' may remain in block cache and cause data
            // inconsistency if reading header block hits the cache.
            bcache_invalidate_block(file, bid);
        }

        // Write exactly one block; anything other than 'blocksize' bytes
        // written is treated as a failure below.
        ssize_t rv = filemgr_write_blocks(file, buf, 1, bid);
        // Called unconditionally with the write result.
        // NOTE(review): presumably this only logs for error values -- confirm.
        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
                       "WRITE", file->filename);
        if (rv != (ssize_t)file->blocksize) {
            // undo everything acquired so far before bailing out
            _filemgr_release_temp_buf(buf);
            spin_unlock(&file->lock);
            filemgr_clear_io_inprog(file);
            return rv < 0 ? (fdb_status) rv : FDB_RESULT_WRITE_FAIL;
        }

        if (prev_bid) {
            // mark prev DB header as stale
            filemgr_add_stale_block(file, prev_bid * file->blocksize, file->blocksize);
        }

        // Publish the new header location; the end position only advances
        // when the header was appended rather than written into a reused
        // block.
        atomic_store_uint64_t(&file->header.bid, bid);
        if (!block_reusing) {
            atomic_add_uint64_t(&file->pos, file->blocksize);
        }

        _filemgr_release_temp_buf(buf);
    }

    if (sb_bmp_exists(file->sb) &&
        atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND &&
        atomic_get_uint8_t(&file->status) == FILE_NORMAL) {
        // block reusing is currently enabled
        atomic_store_uint64_t(&file->last_commit,
            atomic_get_uint64_t(&file->sb->cur_alloc_bid) * file->blocksize);
        // More blocks may have been allocated after the header block
        // (for storing BMP data or system docs for stale info), so the
        // block pointed to by 'cur_alloc_bid' may have a different BMP
        // revision number. Hence we have to use the up-to-date
        // bmp_revnum here.
        /*
          sb_bmp_is_writable() was reporting a block as not writable
          because its 'bid >= last_commit' test failed. The cause was that
          last_writable_bmp_revnum used to be set in all cases, even when
          last_commit was set to the end of the file, which produced a
          bitmap version mismatch in the writable-block check.
          As a workaround, last_writable_bmp_revnum is set to the current
          bitmap version only when last_commit is not set to the end of the
          file (i.e. only in this branch).
        */
        atomic_store_uint64_t(&file->last_writable_bmp_revnum,
                              filemgr_get_sb_bmp_revnum(file));
    } else {
        // no block reuse in effect: the last commit point is the end position
        atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
    }
    spin_unlock(&file->lock);

    if (sync) {
        // fsync is issued after file->lock has been released; its result
        // becomes the return value of this function.
        result = file->ops->fsync(file->fd);
        _log_errno_str(file->ops, log_callback, (fdb_status)result,
                       "FSYNC", file->filename);
    }
    filemgr_clear_io_inprog(file);
    return (fdb_status) result;
}
2587 
filemgr_sync(struct filemgr *file, bool sync_option, err_log_callback *log_callback)2588 fdb_status filemgr_sync(struct filemgr *file, bool sync_option,
2589                         err_log_callback *log_callback)
2590 {
2591     fdb_status result = FDB_RESULT_SUCCESS;
2592     if (global_config.ncacheblock > 0) {
2593         result = bcache_flush(file);
2594         if (result != FDB_RESULT_SUCCESS) {
2595             _log_errno_str(file->ops, log_callback, (fdb_status) result,
2596                            "FLUSH", file->filename);
2597             return result;
2598         }
2599     }
2600 
2601     if (sync_option && file->fflags & FILEMGR_SYNC) {
2602         int rv = file->ops->fsync(file->fd);
2603         _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
2604         return (fdb_status) rv;
2605     }
2606     return result;
2607 }
2608 
filemgr_copy_file_range(struct filemgr *src_file, struct filemgr *dst_file, bid_t src_bid, bid_t dst_bid, bid_t clone_len)2609 fdb_status filemgr_copy_file_range(struct filemgr *src_file,
2610                                    struct filemgr *dst_file,
2611                                    bid_t src_bid, bid_t dst_bid,
2612                                    bid_t clone_len)
2613 {
2614     uint32_t blocksize = src_file->blocksize;
2615     fdb_status fs = (fdb_status)dst_file->ops->copy_file_range(
2616                                             src_file->fs_type,
2617                                             src_file->fd,
2618                                             dst_file->fd,
2619                                             src_bid * blocksize,
2620                                             dst_bid * blocksize,
2621                                             clone_len * blocksize);
2622     if (fs != FDB_RESULT_SUCCESS) {
2623         return fs;
2624     }
2625     atomic_store_uint64_t(&dst_file->pos, (dst_bid + clone_len) * blocksize);
2626     return FDB_RESULT_SUCCESS;
2627 }
2628 
filemgr_update_file_status(struct filemgr *file, file_status_t status)2629 void filemgr_update_file_status(struct filemgr *file, file_status_t status)
2630 {
2631     spin_lock(&file->lock);
2632     atomic_store_uint8_t(&file->status, status);
2633     spin_unlock(&file->lock);
2634 }
2635 
assign_old_filename(struct filemgr *file, const char *old_filename)2636 static void assign_old_filename(struct filemgr *file, const char *old_filename)
2637 {
2638     free(file->old_filename);
2639     if (old_filename) {
2640         file->old_filename = (char*)malloc(strlen(old_filename) + 1);
2641         strcpy(file->old_filename, old_filename);
2642     } else {
2643         file->old_filename = NULL;
2644     }
2645 }
2646 
assign_new_filename(struct filemgr *file, const char *new_filename)2647 static void assign_new_filename(struct filemgr *file, const char *new_filename)
2648 {
2649     free(file->new_filename);
2650     if (new_filename) {
2651         file->new_filename = (char*)malloc(strlen(new_filename) + 1);
2652         strcpy(file->new_filename, new_filename);
2653     } else {
2654         file->new_filename = NULL;
2655     }
2656 }
2657 
filemgr_update_file_linkage(struct filemgr *file, const char *old_filename, const char *new_filename)2658 bool filemgr_update_file_linkage(struct filemgr *file,
2659                                  const char *old_filename,
2660                                  const char *new_filename)
2661 {
2662 
2663     bool ret = true;
2664     spin_lock(&file->lock);
2665     if (old_filename) {
2666         if (!file->old_filename) {
2667             assign_old_filename(file, old_filename);
2668         } else {
2669             ret = false;
2670             fdb_assert(atomic_get_uint32_t(&file->ref_count),
2671                        atomic_get_uint32_t(&file->ref_count), 0);
2672         }
2673     }
2674     if (new_filename) {
2675         assign_new_filename(file, new_filename);
2676     }
2677     spin_unlock(&file->lock);
2678     return ret;
2679 }
2680 
filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file, file_status_t status)2681 void filemgr_set_compaction_state(struct filemgr *old_file,
2682                                   struct filemgr *new_file,
2683                                   file_status_t status)
2684 {
2685     if (old_file) {
2686         spin_lock(&old_file->lock);
2687         assign_new_filename(old_file, new_file ? new_file->filename : NULL);
2688         atomic_store_uint8_t(&old_file->status, status);
2689         spin_unlock(&old_file->lock);
2690 
2691         if (new_file) {
2692             spin_lock(&new_file->lock);
2693             assign_old_filename(new_file, old_file->filename);
2694             spin_unlock(&new_file->lock);
2695         }
2696     }
2697 }
2698 
filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header, void (*free_kv_header)(struct filemgr *file))2699 bool filemgr_set_kv_header(struct filemgr *file, struct kvs_header *kv_header,
2700                            void (*free_kv_header)(struct filemgr *file))
2701 {
2702     bool ret;
2703     spin_lock(&file->lock);
2704 
2705     if (!file->kv_header) {
2706         file->kv_header = kv_header;
2707         file->free_kv_header = free_kv_header;
2708         ret = true;
2709     } else {
2710         ret = false;
2711     }
2712 
2713     spin_unlock(&file->lock);
2714 
2715     return ret;
2716 }
2717 
filemgr_get_kv_header(struct filemgr *file)2718 struct kvs_header *filemgr_get_kv_header(struct filemgr *file)
2719 {
2720     struct kvs_header *kv_header = NULL;
2721     spin_lock(&file->lock);
2722     kv_header = file->kv_header;
2723     spin_unlock(&file->lock);
2724     return kv_header;
2725 }
2726 
2727 // Check if there is a file that still points to the old_file that is being
2728 // compacted away. If so open the file and return its pointer.
2729 static
_filemgr_check_stale_link(struct hash_elem *h, void *ctx)2730 void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
2731     struct filemgr *cur_file = (struct filemgr *)ctx;
2732     struct filemgr *file = _get_entry(h, struct filemgr, e);
2733     spin_lock(&file->lock);
2734     if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
2735         !strcmp(file->new_filename, cur_file->filename)) {
2736         // Incrementing reference counter below is the same as filemgr_open()
2737         // We need to do this to ensure that the pointer returned does not
2738         // get freed outside the filemgr_open lock
2739         atomic_incr_uint32_t(&file->ref_count);
2740         spin_unlock(&file->lock);
2741         return (void *)file;
2742     }
2743     spin_unlock(&file->lock);
2744     return (void *)NULL;
2745 }
2746 
filemgr_search_stale_links(struct filemgr *cur_file)2747 struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
2748     struct filemgr *very_old_file;
2749     spin_lock(&filemgr_openlock);
2750     very_old_file = (struct filemgr *)hash_scan(&hash,
2751                                          _filemgr_check_stale_link, cur_file);
2752     spin_unlock(&filemgr_openlock);
2753     return very_old_file;
2754 }
2755 
filemgr_redirect_old_file(struct filemgr *very_old_file, struct filemgr *new_file, filemgr_redirect_hdr_func redirect_header_func)2756 char *filemgr_redirect_old_file(struct filemgr *very_old_file,
2757                                 struct filemgr *new_file,
2758                                 filemgr_redirect_hdr_func
2759                                 redirect_header_func) {
2760     if (!very_old_file || !new_file) {
2761         return NULL;
2762     }
2763 
2764     size_t old_header_len, new_header_len;
2765     uint16_t new_filename_len;
2766     char *past_filename;
2767     spin_lock(&very_old_file->lock);
2768 
2769     struct filemgr *new_file_of_very_old_file =
2770         filemgr_get_instance(very_old_file->new_filename);
2771 
2772     if (very_old_file->header.size == 0 || !new_file_of_very_old_file) {
2773         spin_unlock(&very_old_file->lock);
2774         return NULL;
2775     }
2776 
2777     old_header_len = very_old_file->header.size;
2778     new_filename_len = strlen(new_file->filename);
2779     // Find out the new DB header length with new_file's filename
2780     new_header_len = old_header_len
2781                      - strlen(new_file_of_very_old_file->filename)
2782                      + new_filename_len;
2783     // As we are going to change the new_filename field in the DB header of the
2784     // very_old_file, maybe reallocate DB header buf to accomodate bigger value
2785     if (new_header_len > old_header_len) {
2786         very_old_file->header.data = realloc(very_old_file->header.data,
2787                                              new_file->blocksize);
2788     }
2789     // Re-direct very_old_file to new_file
2790     assign_new_filename(very_old_file, new_file->filename);
2791     // Note that the old_filename of the new_file is not updated, this
2792     // is so that every file in the history is reachable from the current file.
2793 
2794     past_filename = redirect_header_func(very_old_file,
2795                                          (uint8_t *)very_old_file->header.data,
2796                                          new_file);//Update in-memory header
2797     very_old_file->header.size = new_header_len;
2798     ++(very_old_file->header.revnum);
2799 
2800     spin_unlock(&very_old_file->lock);
2801     return past_filename;
2802 }
2803 
2804 #include <string>
// Returns the part of 'path' that precedes its last 'sep' character, or "."
// when 'path' contains no separator at all.
static std::string filemgr_dir_of_path(const std::string &path, char sep) {
    const size_t cut = path.rfind(sep);
    if (cut == std::string::npos) {
        return std::string(".");
    }
    return path.substr(0, cut);
}

// Compares the directory portions of two file paths.
//
// The directory portion is everything before the last path separator
// ('\\' when _WIN32 is defined, '/' otherwise); a path without a separator
// is treated as living in ".". The comparison is purely textual: paths are
// not normalized or resolved.
//
// Returns 0 when both directory strings are identical, otherwise the
// non-zero result of std::string::compare().
int compare_directory_paths(char *file1, char *file2) {
#ifdef _WIN32
    const char sep = '\\';
#else
    const char sep = '/';
#endif

    const std::string dir1 = filemgr_dir_of_path(std::string(file1), sep);
    const std::string dir2 = filemgr_dir_of_path(std::string(file2), sep);

    return dir1.compare(dir2);
}
2830 
filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file, err_log_callback *log_callback)2831 void filemgr_remove_pending(struct filemgr *old_file,
2832                             struct filemgr *new_file,
2833                             err_log_callback *log_callback)
2834 {
2835     if (new_file == NULL) {
2836         return;
2837     }
2838 
2839     int diff_dir = compare_directory_paths(old_file->filename, new_file->filename);
2840     spin_lock(&old_file->lock);
2841     if (atomic_get_uint32_t(&old_file->ref_count) > 0) {
2842         // delay removing
2843         assign_new_filename(old_file, new_file->filename);
2844         if (!diff_dir) {
2845             atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
2846         }
2847 #if !(defined(WIN32) || defined(_WIN32))
2848         // Only for Posix
2849         int ret;
2850         /* if the old_file and new_file are in different directories
2851            do not remove the old_file
2852         */
2853         if (!diff_dir){
2854             ret = unlink(old_file->filename);
2855             _log_errno_str(old_file->ops, log_callback, (fdb_status)ret,
2856                        "UNLINK", old_file->filename);
2857         }
2858 #endif
2859 
2860         spin_unlock(&old_file->lock);
2861 
2862         // Update new_file's old_filename
2863         spin_lock(&new_file->lock);
2864         assign_old_filename(new_file, old_file->filename);
2865         spin_unlock(&new_file->lock);
2866     } else {
2867         // immediatly remove
2868         // LCOV_EXCL_START
2869         spin_unlock(&old_file->lock);
2870 
2871         struct filemgr *new_file_of_old_file =
2872             filemgr_get_instance(old_file->new_filename);
2873 
2874         if (!lazy_file_deletion_enabled ||
2875             (new_file_of_old_file && new_file_of_old_file->in_place_compaction)) {
2876             if (!diff_dir) {
2877                 remove(old_file->filename);
2878             }
2879         }
2880         filemgr_remove_file(old_file, log_callback);
2881         // LCOV_EXCL_STOP
2882     }
2883 }
2884 
2885 // migrate default kv store stats over to new_file
filemgr_migrate_op_stats(struct filemgr *old_file, struct filemgr *new_file, struct kvs_info *kvs)2886 struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
2887                                               struct filemgr *new_file,
2888                                               struct kvs_info *kvs)
2889 {
2890     kvs_ops_stat *ret = NULL;
2891     if (new_file == NULL) {
2892         return NULL;
2893     }
2894 
2895     spin_lock(&old_file->lock);
2896     new_file->header.op_stat = old_file->header.op_stat;
2897     ret = &new_file->header.op_stat;
2898     spin_unlock(&old_file->lock);
2899     return ret;
2900 }
2901 
2902 // Note: filemgr_openlock should be held before calling this function.
filemgr_destroy_file(char *filename, struct filemgr_config *config, struct hash *destroy_file_set)2903 fdb_status filemgr_destroy_file(char *filename,
2904                                 struct filemgr_config *config,
2905                                 struct hash *destroy_file_set)
2906 {
2907     struct filemgr *file = NULL;
2908     struct hash to_destroy_files;
2909     struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
2910                                                   &to_destroy_files);
2911     struct filemgr query;
2912     struct hash_elem *e = NULL;
2913     fdb_status status = FDB_RESULT_SUCCESS;
2914     char *old_filename = NULL;
2915 
2916     if (!destroy_file_set)