/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdarg.h>
#if !defined(WIN32) && !defined(_WIN32)
#include <sys/time.h>
#endif

#include "filemgr.h"
#include "filemgr_ops.h"
#include "hash_functions.h"
#include "blockcache.h"
#include "wal.h"
#include "list.h"
#include "fdb_internal.h"
#include "time_utils.h"

#include "memleak.h"

#ifdef __DEBUG
#ifndef __DEBUG_FILEMGR
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#endif
#endif

// NBUCKET must be power of 2
#define NBUCKET (1024)
#define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))

// global static variables
#ifdef SPIN_INITIALIZER
static spin_t initial_lock = SPIN_INITIALIZER;
#else
static volatile unsigned int initial_lock_status = 0;
static spin_t initial_lock;
#endif


static volatile uint8_t filemgr_initialized = 0;
static struct filemgr_config global_config;
static struct hash hash;
static spin_t filemgr_openlock;

// Pool node for reusable temp buffers; stored as a footer immediately
// after each blocksize-d buffer (see _filemgr_get_temp_buf).
struct temp_buf_item{
    void *addr;
    struct list_elem le;
};
static struct list temp_buf;
static spin_t temp_buf_lock;

static bool lazy_file_deletion_enabled = false;
static register_file_removal_func register_file_removal = NULL;
static check_file_removal_func is_file_removed = NULL;
78 
spin_init_wrap(void *lock)79 static void spin_init_wrap(void *lock) {
80     spin_init((spin_t*)lock);
81 }
82 
spin_destroy_wrap(void *lock)83 static void spin_destroy_wrap(void *lock) {
84     spin_destroy((spin_t*)lock);
85 }
86 
spin_lock_wrap(void *lock)87 static void spin_lock_wrap(void *lock) {
88     spin_lock((spin_t*)lock);
89 }
90 
spin_unlock_wrap(void *lock)91 static void spin_unlock_wrap(void *lock) {
92     spin_unlock((spin_t*)lock);
93 }
94 
mutex_init_wrap(void *lock)95 static void mutex_init_wrap(void *lock) {
96     mutex_init((mutex_t*)lock);
97 }
98 
mutex_destroy_wrap(void *lock)99 static void mutex_destroy_wrap(void *lock) {
100     mutex_destroy((mutex_t*)lock);
101 }
102 
mutex_lock_wrap(void *lock)103 static void mutex_lock_wrap(void *lock) {
104     mutex_lock((mutex_t*)lock);
105 }
106 
mutex_unlock_wrap(void *lock)107 static void mutex_unlock_wrap(void *lock) {
108     mutex_unlock((mutex_t*)lock);
109 }
110 
_kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)111 static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
112 {
113     struct kvs_node *aa, *bb;
114     aa = _get_entry(a, struct kvs_node, avl_id);
115     bb = _get_entry(b, struct kvs_node, avl_id);
116 
117     if (aa->id < bb->id) {
118         return -1;
119     } else if (aa->id > bb->id) {
120         return 1;
121     } else {
122         return 0;
123     }
124 }
125 
_block_is_overlapped(void *pbid1, void *pis_writer1, void *pbid2, void *pis_writer2, void *aux)126 static int _block_is_overlapped(void *pbid1, void *pis_writer1,
127                                 void *pbid2, void *pis_writer2,
128                                 void *aux)
129 {
130     (void)aux;
131     bid_t bid1, is_writer1, bid2, is_writer2;
132     bid1 = *(bid_t*)pbid1;
133     is_writer1 = *(bid_t*)pis_writer1;
134     bid2 = *(bid_t*)pbid2;
135     is_writer2 = *(bid_t*)pis_writer2;
136 
137     if (bid1 != bid2) {
138         // not overlapped
139         return 0;
140     } else {
141         // overlapped
142         if (!is_writer1 && !is_writer2) {
143             // both are readers
144             return 0;
145         } else {
146             return 1;
147         }
148     }
149 }
150 
// Format a message and forward it to the user-registered error-log callback
// (if any), then return 'status' unchanged so callers can write
// 'return fdb_log(cb, st, ...);'.
//
// log_callback : may be NULL, or have a NULL callback — then nothing is logged.
// status       : error code passed through to the callback and returned.
// format, ...  : printf-style message.
fdb_status fdb_log(err_log_callback *log_callback,
                   fdb_status status,
                   const char *format, ...)
{
    if (log_callback && log_callback->callback) {
        char msg[1024];
        va_list args;
        va_start(args, format);
        // vsnprintf bounds the write and always NUL-terminates; the original
        // vsprintf could overflow 'msg' on long messages (e.g. long filenames).
        // Overlong messages are silently truncated.
        vsnprintf(msg, sizeof(msg), format, args);
        va_end(args);
        log_callback->callback(status, msg, log_callback->ctx_data);
    }
    return status;
}
165 
// If 'io_error' is an error code (< 0), fetch the OS-level errno string via
// the filemgr ops and report "what failed, on which file, why" through
// fdb_log. No-op for non-negative (success) codes.
static void _log_errno_str(struct filemgr_ops *ops,
                           err_log_callback *log_callback,
                           fdb_status io_error,
                           const char *what,
                           const char *filename)
{
    if (io_error >= 0) {
        return; // not an error; nothing to report
    }
    char errno_msg[512];
    ops->get_errno_str(errno_msg, 512);
    fdb_log(log_callback, io_error,
            "Error in %s on a database file '%s', %s",
            what, filename, errno_msg);
}
179 
_file_hash(struct hash *hash, struct hash_elem *e)180 static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
181 {
182     struct filemgr *file = _get_entry(e, struct filemgr, e);
183     int len = strlen(file->filename);
184     return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
185 }
186 
_file_cmp(struct hash_elem *a, struct hash_elem *b)187 static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
188 {
189     struct filemgr *aa, *bb;
190     aa = _get_entry(a, struct filemgr, e);
191     bb = _get_entry(b, struct filemgr, e);
192     return strcmp(aa->filename, bb->filename);
193 }
194 
// One-time global initialization of the file-manager subsystem: snapshot the
// config, set up the block cache (if configured), the global file hash table,
// the temp-buffer pool, and the open lock. Safe to call from multiple threads
// and multiple times; only the first effective call performs the setup.
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        // Double-checked locking: re-test the flag under the lock so that
        // exactly one thread runs the setup even if several saw it unset above.
        spin_lock(&initial_lock);
        if (!filemgr_initialized) {
            global_config = *config;

            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
236 
// Configure lazy file deletion: when enabled, stale files are handed to
// 'regis_func' for deferred removal, and 'check_func' reports whether a
// file has already been removed. Writes three module-level globals; no
// internal locking — NOTE(review): presumably called once during startup
// before concurrent use; confirm with callers.
void filemgr_set_lazy_file_deletion(bool enable,
                                    register_file_removal_func regis_func,
                                    check_file_removal_func check_func)
{
    lazy_file_deletion_enabled = enable;
    register_file_removal = regis_func;
    is_file_removed = check_func;
}
245 
// Return a blocksize-d scratch buffer, reusing a pooled one when available.
// Fresh allocations are laid out as [blocksize bytes][temp_buf_item footer],
// so the footer can be recovered by pointer arithmetic from the buffer
// address alone (see _filemgr_release_temp_buf). Buffers are never freed
// here; they are recycled until _filemgr_shutdown_temp_buf.
static void * _filemgr_get_temp_buf()
{
    struct list_elem *e;
    struct temp_buf_item *item;

    spin_lock(&temp_buf_lock);
    e = list_pop_front(&temp_buf);
    if (e) {
        // Reuse a previously released buffer from the pool.
        item = _get_entry(e, struct temp_buf_item, le);
    } else {
        void *addr;

        // Sector-aligned allocation; the extra tail bytes hold the footer.
        malloc_align(addr, FDB_SECTOR_SIZE,
                     global_config.blocksize + sizeof(struct temp_buf_item));

        item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
        item->addr = addr;
    }
    spin_unlock(&temp_buf_lock);

    return item->addr;
}
268 
_filemgr_release_temp_buf(void *buf)269 static void _filemgr_release_temp_buf(void *buf)
270 {
271     struct temp_buf_item *item;
272 
273     spin_lock(&temp_buf_lock);
274     item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
275     list_push_front(&temp_buf, &item->le);
276     spin_unlock(&temp_buf_lock);
277 }
278 
_filemgr_shutdown_temp_buf()279 static void _filemgr_shutdown_temp_buf()
280 {
281     struct list_elem *e;
282     struct temp_buf_item *item;
283     size_t count=0;
284 
285     spin_lock(&temp_buf_lock);
286     e = list_begin(&temp_buf);
287     while(e){
288         item = _get_entry(e, struct temp_buf_item, le);
289         e = list_remove(&temp_buf, e);
290         free_align(item->addr);
291         count++;
292     }
293     spin_unlock(&temp_buf_lock);
294 }
295 
// Recover the most recent valid DB header from the end of the file.
//
// Scans blocks backwards from file->pos: first discards any non-block-aligned
// tail bytes (partial last-block write from a crash), then walks one block at
// a time looking for a block whose trailing marker byte is BLK_MARKER_DBHEADER
// with a matching magic value and CRC. On success the header contents, revnum,
// seqnum, and header bid are populated and FDB_RESULT_SUCCESS is returned.
// Each rejected block shrinks file->pos (and last_commit) by one block, so a
// corrupted tail is effectively truncated. If no valid header is found, the
// header fields are zeroed and the last failure status is returned
// (FDB_RESULT_SUCCESS if the file was empty to begin with).
static fdb_status _filemgr_read_header(struct filemgr *file,
                                       err_log_callback *log_callback)
{
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_magic_t magic;
    filemgr_header_len_t len;
    uint8_t *buf;
    uint32_t crc, crc_file;
    fdb_status status = FDB_RESULT_SUCCESS;

    // get temp buffer
    buf = (uint8_t *) _filemgr_get_temp_buf();

    if (atomic_get_uint64_t(&file->pos) > 0) {
        // Crash Recovery Test 1: unaligned last block write
        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
        if (remain) {
            // Drop the partial tail so the scan below starts on a block boundary.
            atomic_sub_uint64_t(&file->pos, remain);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
                "from a database file '%s'\n";
            DBG(msg, remain, file->filename);
            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
                    msg, remain, file->filename);
        }

        size_t block_counter = 0;
        do {
            // Read the last (not-yet-rejected) block of the file.
            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
                atomic_get_uint64_t(&file->pos) - file->blocksize);
            if (rv != file->blocksize) {
                status = FDB_RESULT_READ_FAIL;
                const char *msg = "Unable to read a database file '%s' with "
                    "blocksize %" _F64 "\n";
                DBG(msg, file->filename, file->blocksize);
                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
                break;
            }
            ++block_counter;
            // Block layout (from the tail): [marker][magic][len] precede the end.
            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
                   BLK_MARKER_SIZE);

            if (marker[0] == BLK_MARKER_DBHEADER) {
                // possible need for byte conversions here
                memcpy(&magic,
                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
                       sizeof(magic));
                magic = _endian_decode(magic);

                if (magic == FILEMGR_MAGIC) {
                    memcpy(&len,
                           buf + file->blocksize - BLK_MARKER_SIZE -
                           sizeof(magic) - sizeof(len),
                           sizeof(len));
                    len = _endian_decode(len);

                    // CRC covers the first (len - sizeof(crc)) bytes; the
                    // stored CRC is the last 4 bytes of the header region.
                    crc = chksum(buf, len - sizeof(crc));
                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
                    crc_file = _endian_decode(crc_file);
                    if (crc == crc_file) {
                        // Valid header found: copy it out along with the
                        // revnum and seqnum stored right after the header data.
                        file->header.data = (void *)malloc(len);

                        memcpy(file->header.data, buf, len);
                        memcpy(&file->header.revnum, buf + len,
                               sizeof(filemgr_header_revnum_t));
                        memcpy((void *) &file->header.seqnum,
                                buf + len + sizeof(filemgr_header_revnum_t),
                                sizeof(fdb_seqnum_t));
                        file->header.revnum =
                            _endian_decode(file->header.revnum);
                        file->header.seqnum =
                            _endian_decode(file->header.seqnum);
                        file->header.size = len;
                        atomic_store_uint64_t(&file->header.bid,
                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
                                              BLK_NOT_FOUND);
                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
                                              BLK_NOT_FOUND);
                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));

                        // release temp buffer
                        _filemgr_release_temp_buf(buf);

                        return FDB_RESULT_SUCCESS;
                    } else {
                        status = FDB_RESULT_CHECKSUM_ERROR;
                        const char *msg = "Crash Detected: CRC on disk %u != %u "
                            "in a database file '%s'\n";
                        DBG(msg, crc_file, crc, file->filename);
                        fdb_log(log_callback, status, msg, crc_file, crc,
                                file->filename);
                    }
                } else {
                    status = FDB_RESULT_FILE_CORRUPTION;
                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
                        " in a database file '%s'\n";
                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
                            file->filename);
                }
            } else {
                status = FDB_RESULT_NO_DB_HEADERS;
                // Only log for the very last block; older non-header blocks
                // are expected while scanning backwards.
                if (block_counter == 1) {
                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
                                      "in a database file '%s'\n";
                    DBG(msg, marker[0], file->filename);
                    fdb_log(log_callback, status, msg, marker[0], file->filename);
                }
            }

            // Reject this block and continue scanning the previous one.
            atomic_sub_uint64_t(&file->pos, file->blocksize);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
        } while (atomic_get_uint64_t(&file->pos));
    }

    // release temp buffer
    _filemgr_release_temp_buf(buf);

    // No valid header found (or empty file): reset header state.
    file->header.size = 0;
    file->header.revnum = 0;
    file->header.seqnum = 0;
    file->header.data = NULL;
    atomic_store_uint64_t(&file->header.bid, 0);
    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
    return status;
}
425 
filemgr_get_ref_count(struct filemgr *file)426 size_t filemgr_get_ref_count(struct filemgr *file)
427 {
428     size_t ret = 0;
429     spin_lock(&file->lock);
430     ret = file->ref_count;
431     spin_unlock(&file->lock);
432     return ret;
433 }
434 
filemgr_get_bcache_used_space(void)435 uint64_t filemgr_get_bcache_used_space(void)
436 {
437     uint64_t bcache_free_space = 0;
438     if (global_config.ncacheblock) { // If buffer cache is indeed configured
439         bcache_free_space = bcache_get_num_free_blocks();
440         bcache_free_space = (global_config.ncacheblock - bcache_free_space)
441                           * global_config.blocksize;
442     }
443     return bcache_free_space;
444 }
445 
// Arguments handed to _filemgr_prefetch_thread. Allocated by
// filemgr_prefetch and freed by the thread itself when it exits.
struct filemgr_prefetch_args {
    struct filemgr *file;
    uint64_t duration;              // time budget in seconds (compared against gettimeofday gap)
    err_log_callback *log_callback;
    void *aux;                      // unused by the code visible in this file
};
452 
// Prefetch thread body: reads the file backwards from last_commit in
// FILEMGR_PREFETCH_UNIT-sized chunks (block by block within each chunk,
// via filemgr_read — presumably populating the block cache; confirm).
// Terminates on: abort signal, time budget exhausted, insufficient free
// cache space, read failure, or reaching the start of the file. On exit it
// resets prefetch_status to IDLE and frees its own args struct.
static void *_filemgr_prefetch_thread(void *voidargs)
{
    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
    uint8_t *buf = alca(uint8_t, args->file->blocksize);
    uint64_t cur_pos = 0, i;
    uint64_t bcache_free_space;
    bid_t bid;
    bool terminate = false;
    struct timeval begin, cur, gap;

    spin_lock(&args->file->lock);
    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
    spin_unlock(&args->file->lock);
    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
        // file smaller than one prefetch unit: nothing to do
        terminate = true;
    } else {
        cur_pos -= FILEMGR_PREFETCH_UNIT;
    }
    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
    gettimeofday(&begin, NULL);
    while (!terminate) {
        // Read one unit forward, block by block, starting at cur_pos.
        for (i = cur_pos;
             i < cur_pos + FILEMGR_PREFETCH_UNIT;
             i += args->file->blocksize) {

            gettimeofday(&cur, NULL);
            gap = _utime_gap(begin, cur);
            bcache_free_space = bcache_get_num_free_blocks();
            bcache_free_space *= args->file->blocksize;

            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
                gap.tv_sec >= (int64_t)args->duration ||
                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
                // terminate thread when
                // 1. got abort signal
                // 2. time out
                // 3. not enough free space in block cache
                terminate = true;
                break;
            } else {
                bid = i / args->file->blocksize;
                if (filemgr_read(args->file, bid, buf, NULL, true)
                        != FDB_RESULT_SUCCESS) {
                    // 4. read failure
                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
                            "Prefetch thread failed to read a block with block id %" _F64
                            " from a database file '%s'", bid, args->file->filename);
                    terminate = true;
                    break;
                }
            }
        }

        // Step back one unit for the next iteration.
        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
            cur_pos -= FILEMGR_PREFETCH_UNIT;
        } else {
            // remaining space is less than FILEMGR_PREFETCH_UNIT
            terminate = true;
        }
    }

    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
    free(args);
    return NULL;
}
518 
519 // prefetch the given DB file
// prefetch the given DB file
// Spawns _filemgr_prefetch_thread if the file has committed data and the
// block cache has at least FILEMGR_PREFETCH_UNIT bytes free. The args
// struct passed to the thread is freed by the thread itself.
void filemgr_prefetch(struct filemgr *file,
                      struct filemgr_config *config,
                      err_log_callback *log_callback)
{
    uint64_t bcache_free_space;

    bcache_free_space = bcache_get_num_free_blocks();
    bcache_free_space *= file->blocksize;

    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
    spin_lock(&file->lock);
    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
        // invoke prefetch thread
        struct filemgr_prefetch_args *args;
        args = (struct filemgr_prefetch_args *)
               calloc(1, sizeof(struct filemgr_prefetch_args));
        // NOTE(review): calloc result is unchecked — on OOM the thread would
        // dereference NULL; consider handling allocation failure here.
        args->file = file;
        args->duration = config->prefetch_duration;
        args->log_callback = log_callback;

        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
    }
    spin_unlock(&file->lock);
}
546 
filemgr_does_file_exist(char *filename)547 fdb_status filemgr_does_file_exist(char *filename) {
548     struct filemgr_ops *ops = get_filemgr_ops();
549     int fd = ops->open(filename, O_RDONLY, 0444);
550     if (fd < 0) {
551         return (fdb_status) fd;
552     }
553     ops->close(fd);
554     return FDB_RESULT_SUCCESS;
555 }
556 
// Open (or attach to) a database file, under the global 'filemgr_openlock'.
// Three paths:
//   1) file already open and not closed: bump ref_count, update sync flag,
//      return the existing struct filemgr;
//   2) file in the hash table but FILE_CLOSED: re-open its descriptor in
//      place (retrying with O_CREAT if it was deleted out from under us and
//      'create' is set);
//   3) unknown file: open the OS file, build and initialize a new
//      struct filemgr, recover the latest header, and insert it into the
//      global hash table.
// On success result.file is set and result.rv == FDB_RESULT_SUCCESS; on
// failure result.file is NULL and result.rv carries the error code.
filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
                                 struct filemgr_config *config,
                                 err_log_callback *log_callback)
{
    struct filemgr *file = NULL;
    struct filemgr query;
    struct hash_elem *e = NULL;
    bool create = config->options & FILEMGR_CREATE;
    int file_flag = 0x0;
    int fd = -1;
    fdb_status status;
    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};

    // idempotent global setup (first caller wins)
    filemgr_init(config);

    // check whether file is already opened or not
    query.filename = filename;
    spin_lock(&filemgr_openlock);
    e = hash_find(&hash, &query.e);

    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        file->ref_count++;

        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
            file_flag = O_RDWR;
            if (create) {
                file_flag |= O_CREAT;
            }
            // refresh config, but keep the global blocksize/cache settings
            *file->config = *config;
            file->config->blocksize = global_config.blocksize;
            file->config->ncacheblock = global_config.ncacheblock;
            file_flag |= config->flag;
            file->fd = file->ops->open(file->filename, file_flag, 0666);
            if (file->fd < 0) {
                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
                    // A database file was manually deleted by the user.
                    // Clean up global hash table, WAL index, and buffer cache.
                    // Then, retry it with a create option below IFF it is not
                    // a read-only open attempt
                    struct hash_elem *ret;
                    spin_unlock(&file->lock);
                    ret = hash_remove(&hash, &file->e);
                    fdb_assert(ret, 0, 0);
                    filemgr_free_func(&file->e);
                    if (!create) {
                        _log_errno_str(ops, log_callback,
                                FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
                        spin_unlock(&filemgr_openlock);
                        result.rv = FDB_RESULT_NO_SUCH_FILE;
                        return result;
                    }
                    // fall through to the fresh-open path below
                } else {
                    // re-open failed for some other reason: undo the ref bump
                    _log_errno_str(file->ops, log_callback,
                                  (fdb_status)file->fd, "OPEN", filename);
                    file->ref_count--;
                    spin_unlock(&file->lock);
                    spin_unlock(&filemgr_openlock);
                    result.rv = file->fd;
                    return result;
                }
            } else { // Reopening the closed file is succeed.
                atomic_store_uint8_t(&file->status, FILE_NORMAL);
                if (config->options & FILEMGR_SYNC) {
                    file->fflags |= FILEMGR_SYNC;
                } else {
                    file->fflags &= ~FILEMGR_SYNC;
                }
                spin_unlock(&file->lock);
                spin_unlock(&filemgr_openlock);
                result.file = file;
                result.rv = FDB_RESULT_SUCCESS;
                return result;
            }
        } else { // file is already opened.

            if (config->options & FILEMGR_SYNC) {
                file->fflags |= FILEMGR_SYNC;
            } else {
                file->fflags &= ~FILEMGR_SYNC;
            }

            spin_unlock(&file->lock);
            spin_unlock(&filemgr_openlock);
            result.file = file;
            result.rv = FDB_RESULT_SUCCESS;
            return result;
        }
    }

    // Fresh open: the file is not (or no longer) in the hash table.
    file_flag = O_RDWR;
    if (create) {
        file_flag |= O_CREAT;
    }
    file_flag |= config->flag;
    fd = ops->open(filename, file_flag, 0666);
    if (fd < 0) {
        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
        spin_unlock(&filemgr_openlock);
        result.rv = fd;
        return result;
    }
    // NOTE(review): calloc/malloc results below are unchecked — OOM would
    // crash here; consider adding a cleanup path.
    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
    file->filename_len = strlen(filename);
    file->filename = (char*)malloc(file->filename_len + 1);
    strcpy(file->filename, filename);

    file->ref_count = 1;

    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
    file->wal->flag = 0;

    file->ops = ops;
    file->blocksize = global_config.blocksize;
    atomic_init_uint8_t(&file->status, FILE_NORMAL);
    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
    *file->config = *config;
    file->config->blocksize = global_config.blocksize;
    file->config->ncacheblock = global_config.ncacheblock;
    file->new_file = NULL;
    file->old_filename = NULL;
    file->fd = fd;

    // seek to EOF to learn the current file size
    cs_off_t offset = file->ops->goto_eof(file->fd);
    if (offset == FDB_RESULT_SEEK_FAIL) {
        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = FDB_RESULT_SEEK_FAIL;
        return result;
    }
    atomic_init_uint64_t(&file->last_commit, offset);
    atomic_init_uint64_t(&file->pos, offset);
    atomic_init_uint32_t(&file->throttling_delay, 0);

    file->bcache = NULL;
    file->in_place_compaction = false;
    file->kv_header = NULL;
    file->prefetch_status = FILEMGR_PREFETCH_IDLE;

    atomic_init_uint64_t(&file->header.bid, 0);
    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
    _init_op_stats(&file->header.op_stat);
    // recover the latest valid DB header (may truncate a corrupted tail)
    status = _filemgr_read_header(file, log_callback);
    if (status != FDB_RESULT_SUCCESS) {
        _log_errno_str(file->ops, log_callback, status, "READ", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = status;
        return result;
    }

    spin_init(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    // Partial (range) locking over data blocks via plock; spinlocks guard
    // plock internals, mutexes are handed to users.
    struct plock_ops pops;
    struct plock_config pconfig;

    pops.init_user = mutex_init_wrap;
    pops.lock_user = mutex_lock_wrap;
    pops.unlock_user = mutex_unlock_wrap;
    pops.destroy_user = mutex_destroy_wrap;
    pops.init_internal = spin_init_wrap;
    pops.lock_internal = spin_lock_wrap;
    pops.unlock_internal = spin_unlock_wrap;
    pops.destroy_internal = spin_destroy_wrap;
    pops.is_overlapped = _block_is_overlapped;

    memset(&pconfig, 0x0, sizeof(pconfig));
    pconfig.ops = &pops;
    pconfig.sizeof_lock_internal = sizeof(spin_t);
    pconfig.sizeof_lock_user = sizeof(mutex_t);
    pconfig.sizeof_range = sizeof(bid_t);
    pconfig.aux = NULL;
    plock_init(&file->plock, &pconfig);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_init(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_init(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_init(&file->writer_lock.mutex);
    file->writer_lock.locked = false;

    // initialize WAL
    if (!wal_is_initialized(file)) {
        wal_init(file, FDB_WAL_NBUCKET);
    }

    // init global transaction for the file
    file->global_txn.wrapper = (struct wal_txn_wrapper*)
                               malloc(sizeof(struct wal_txn_wrapper));
    file->global_txn.wrapper->txn = &file->global_txn;
    file->global_txn.handle = NULL;
    if (atomic_get_uint64_t(&file->pos)) {
        // last block of the file holds the most recent header
        file->global_txn.prev_hdr_bid =
            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
    } else {
        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
    }
    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
    list_init(file->global_txn.items);
    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
    wal_add_transaction(file, &file->global_txn);

    // publish the new file in the global table
    hash_insert(&hash, &file->e);
    if (config->prefetch_duration > 0) {
        filemgr_prefetch(file, config, log_callback);
    }
    spin_unlock(&filemgr_openlock);

    if (config->options & FILEMGR_SYNC) {
        file->fflags |= FILEMGR_SYNC;
    } else {
        file->fflags &= ~FILEMGR_SYNC;
    }

    result.file = file;
    result.rv = FDB_RESULT_SUCCESS;
    return result;
}
796 
// Replace the in-memory DB header image with 'buf' (len bytes), growing the
// backing allocation when needed, then bump and return the header revision
// number. Serialized by the file spinlock.
uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
{
    uint64_t ret;

    spin_lock(&file->lock);

    if (file->header.data == NULL) {
        // first header for this file handle
        file->header.data = (void *)malloc(len);
    }else if (file->header.size < len){
        // NOTE(review): realloc overwrites the pointer directly and neither
        // allocation is checked — on failure the old block leaks and memcpy
        // below writes through NULL / a too-small buffer. Fixing this needs
        // an error path in the interface; flagging rather than changing.
        file->header.data = (void *)realloc(file->header.data, len);
    }
    memcpy(file->header.data, buf, len);
    file->header.size = len;
    ++(file->header.revnum);
    ret = file->header.revnum;

    spin_unlock(&file->lock);

    return ret;
}
817 
filemgr_get_header_revnum(struct filemgr *file)818 filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
819 {
820     filemgr_header_revnum_t ret;
821     spin_lock(&file->lock);
822     ret = file->header.revnum;
823     spin_unlock(&file->lock);
824     return ret;
825 }
826 
827 // 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
828 // 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
// Return the default KVS sequence number of 'file'.
// Unsynchronized read: per the comment above, the caller must hold the
// filemgr mutex (filemgr_mutex_lock/filemgr_mutex_unlock).
fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
{
    return file->header.seqnum;
}
833 
// Set the default KVS sequence number of 'file'.
// Unsynchronized write: per the comment above, the caller must hold the
// filemgr mutex (filemgr_mutex_lock/filemgr_mutex_unlock).
void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
{
    file->header.seqnum = seqnum;
}
838 
filemgr_get_header(struct filemgr *file, void *buf, size_t *len, bid_t *header_bid, fdb_seqnum_t *seqnum, filemgr_header_revnum_t *header_revnum)839 void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
840                          bid_t *header_bid, fdb_seqnum_t *seqnum,
841                          filemgr_header_revnum_t *header_revnum)
842 {
843     spin_lock(&file->lock);
844 
845     if (file->header.size > 0) {
846         if (buf == NULL) {
847             buf = (void*)malloc(file->header.size);
848         }
849         memcpy(buf, file->header.data, file->header.size);
850     }
851 
852     if (len) {
853         *len = file->header.size;
854     }
855     if (header_bid) {
856         *header_bid = ((file->header.size > 0) ?
857                        atomic_get_uint64_t(&file->header.bid) : BLK_NOT_FOUND);
858     }
859     if (seqnum) {
860         *seqnum = file->header.seqnum;
861     }
862     if (header_revnum) {
863         *header_revnum = file->header.revnum;
864     }
865 
866     spin_unlock(&file->lock);
867 
868     return buf;
869 }
870 
// Read the DB header stored in block 'bid' of 'file' into caller-supplied
// 'buf' (must be able to hold one full block). On success '*len' receives
// the header payload length; optional '*seqnum' and '*header_revnum'
// receive the default KVS sequence number and the header revision number
// embedded right after the payload. A zero / BLK_NOT_FOUND bid is not an
// error: '*len' is set to 0 and FDB_RESULT_SUCCESS is returned.
// Block layout (from the end of the block): [marker][magic][hdr_len];
// header payload starts at offset 0, followed by revnum then seqnum.
fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                filemgr_header_revnum_t *header_revnum,
                                err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    fdb_status status = FDB_RESULT_SUCCESS;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return FDB_RESULT_SUCCESS;
    }
    // Scratch buffer from the global temp-buffer pool; must be released on
    // every exit path below.
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);

    if (status != FDB_RESULT_SUCCESS) {
        fdb_log(log_callback, status,
                "Failed to read a database header with block id %" _F64 " in "
                "a database file '%s'", bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return status;
    }
    // Validate the trailing block marker: must identify a DB header block.
    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
            BLK_MARKER_SIZE);

    if (marker[0] != BLK_MARKER_DBHEADER) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block marker of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Validate the on-disk magic value stored just before the marker.
    memcpy(&magic,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
            sizeof(magic));
    magic = _endian_decode(magic);
    if (magic != FILEMGR_MAGIC) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block magic value of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match FILEMGR_MAGIC!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Header payload length, stored just before the magic.
    memcpy(&hdr_len,
            _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
            sizeof(hdr_len), sizeof(hdr_len));
    hdr_len = _endian_decode(hdr_len);

    memcpy(buf, _buf, hdr_len);
    *len = hdr_len;

    if (header_revnum) {
        // copy the DB header revnum (stored immediately after the payload)
        filemgr_header_revnum_t _revnum;
        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
        *header_revnum = _endian_decode(_revnum);
    }
    if (seqnum) {
        // copy default KVS's seqnum (stored right after the revnum)
        fdb_seqnum_t _seqnum;
        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(_seqnum));
        *seqnum = _endian_decode(_seqnum);
    }

    _filemgr_release_temp_buf(_buf);

    return status;
}
946 
// Starting from header block 'bid', follow the backward link embedded in
// that header to locate the *previous* DB header. On success, copies its
// payload into 'buf' (optional, may be NULL), its length into '*len', and
// its default KVS sequence number into '*seqnum', returning the previous
// header's BID. If no valid previous header exists, '*len' is set to 0 and
// the BID where the scan stopped is returned.
// NOTE(review): 'seqnum' is dereferenced unconditionally while 'buf' is
// optional — callers presumably always pass a valid 'seqnum'; confirm.
uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                   err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    bid_t _prev_bid, prev_bid;
    int found = 0;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return bid;
    }
    // Scratch buffer from the temp-buffer pool; released before return.
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    // Reverse scan the file for a previous DB header
    do {
        // Get prev_bid from the current header.
        // Since the current header is already cached during the previous
        // operation, no disk I/O will be triggered.
        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
                != FDB_RESULT_SUCCESS) {
            break;
        }

        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);

        if (marker[0] != BLK_MARKER_DBHEADER ||
            magic != FILEMGR_MAGIC) {
            // not a header block
            // this happens when this function is invoked between
            // fdb_set() call and fdb_commit() call, so the last block
            // in the file is not a header block
            bid_t latest_hdr = filemgr_get_header_bid(file);
            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
                // get the latest header BID
                bid = latest_hdr;
            } else {
                break;
            }
        } else {
            // Extract the backward link: the BID of the previous header is
            // stored just before the hdr_len field at the end of the block.
            memcpy(&_prev_bid,
                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
                       sizeof(hdr_len) - sizeof(_prev_bid),
                   sizeof(_prev_bid));
            prev_bid = _endian_decode(_prev_bid);
            if (bid <= prev_bid) {
                // no more prev header, or broken linked list
                break;
            }
            bid = prev_bid;
        }

        // Read the prev header
        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
        if (fs != FDB_RESULT_SUCCESS) {
            fdb_log(log_callback, fs,
                    "Failed to read a previous database header with block id %" _F64 " in "
                    "a database file '%s'", bid, file->filename);
            break;
        }

        // Re-validate marker and magic on the previous header block.
        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        if (marker[0] != BLK_MARKER_DBHEADER) {
            if (bid) {
                // broken linked list
                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                        "A block marker of the previous database header block id %"
                        _F64 " in "
                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                        bid, file->filename);
            }
            break;
        }

        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);
        if (magic != FILEMGR_MAGIC) {
            // broken linked list
            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                    "A block magic value of the previous database header block id %" _F64 " in "
                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
                    bid, file->filename);
            break;
        }

        memcpy(&hdr_len,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
               sizeof(hdr_len), sizeof(hdr_len));
        hdr_len = _endian_decode(hdr_len);

        // Copy out payload (optional), revnum and seqnum trailing fields.
        if (buf) {
            memcpy(buf, _buf, hdr_len);
        }
        memcpy(&_revnum, _buf + hdr_len,
               sizeof(filemgr_header_revnum_t));
        memcpy(&_seqnum,
               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(fdb_seqnum_t));
        *seqnum = _endian_decode(_seqnum);
        *len = hdr_len;
        found = 1;
        break;
    } while (false); // no repetition

    if (!found) { // no other header found till end of file
        *len = 0;
    }

    _filemgr_release_temp_buf(_buf);

    return bid;
}
1072 
// Decrement 'file's reference count and, when it drops to zero, tear the
// file down: discard cached dirty blocks, close the WAL and the file
// descriptor, and then either free the filemgr entry, hand it to the lazy
// background-removal path, or keep it cached in FILE_CLOSED state,
// depending on the file status and 'cleanup_cache_onclose'.
// 'orig_file_name' (optional) is used to rename an in-place-compacted file
// back to its original name when no other handle refers to the old file.
// Returns the result of the underlying close operation.
fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
                         const char *orig_file_name,
                         err_log_callback *log_callback)
{
    int rv = FDB_RESULT_SUCCESS;

    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
                                  // filemgr_open() because file->lock won't
                                  // prevent the race condition.

    // remove filemgr structure if no thread refers to the file
    spin_lock(&file->lock);
    if (--(file->ref_count) == 0) {
        if (global_config.ncacheblock > 0 &&
            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
            spin_unlock(&file->lock);
            // discard all dirty blocks belonged to this file
            bcache_remove_dirty_blocks(file);
        } else {
            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
            // then its dirty block entries will be cleaned up in either
            // filemgr_free_func() or register_file_removal() below.
            spin_unlock(&file->lock);
        }

        if (wal_is_initialized(file)) {
            wal_close(file);
        }

        spin_lock(&file->lock);
        rv = file->ops->close(file->fd);
        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

            bool foreground_deletion = false;

            // immediately remove file if background remove function is not set
            if (!lazy_file_deletion_enabled ||
                (file->new_file && file->new_file->in_place_compaction)) {
                // TODO: to avoid the scenario below, we prevent background
                //       deletion of in-place compacted files at this time.
                // 1) In-place compacted from 'A' to 'A.1'.
                // 2) Request to delete 'A'.
                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
                // 4) User opens DB file using its original name 'A', not 'A.1'.
                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
                // 6) Crash!
                remove(file->filename);
                foreground_deletion = true;
            }

            // we can release lock because no one will open this file
            spin_unlock(&file->lock);
            struct hash_elem *ret = hash_remove(&hash, &file->e);
            fdb_assert(ret, 0, 0);
            spin_unlock(&filemgr_openlock);

            if (foreground_deletion) {
                filemgr_free_func(&file->e);
            } else {
                register_file_removal(file, log_callback);
            }
            return (fdb_status) rv;
        } else {
            if (cleanup_cache_onclose) {
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
                if (file->in_place_compaction && orig_file_name) {
                    struct hash_elem *elem = NULL;
                    struct filemgr query;
                    uint32_t old_file_refcount = 0;

                    query.filename = (char *)orig_file_name;
                    elem = hash_find(&hash, &query.e);

                    if (file->old_filename) {
                        struct hash_elem *elem_old = NULL;
                        struct filemgr query_old;
                        struct filemgr *old_file = NULL;

                        // get old file's ref count if exists
                        query_old.filename = file->old_filename;
                        elem_old = hash_find(&hash, &query_old.e);
                        if (elem_old) {
                            old_file = _get_entry(elem_old, struct filemgr, e);
                            old_file_refcount = old_file->ref_count;
                        }
                    }

                    // If old file is opened by other handle, renaming should be
                    // postponed. It will be renamed later by the handle referring
                    // to the old file.
                    if (!elem && old_file_refcount == 0 &&
                        is_file_removed(orig_file_name)) {
                        // If background file removal is not done yet, we postpone
                        // file renaming at this time.
                        if (rename(file->filename, orig_file_name) < 0) {
                            // Note that the renaming failure is not a critical
                            // issue because the last compacted file will be automatically
                            // identified and opened in the next fdb_open call.
                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
                                           "CLOSE", file->filename);
                        }
                    }
                }
                spin_unlock(&file->lock);
                // Clean up global hash table, WAL index, and buffer cache.
                struct hash_elem *ret = hash_remove(&hash, &file->e);
                fdb_assert(ret, file, 0);
                spin_unlock(&filemgr_openlock);
                filemgr_free_func(&file->e);
                return (fdb_status) rv;
            } else {
                // Keep the filemgr entry cached for a future reopen;
                // just mark the file as closed.
                atomic_store_uint8_t(&file->status, FILE_CLOSED);
            }
        }
    }

    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

    spin_unlock(&file->lock);
    spin_unlock(&filemgr_openlock);
    return (fdb_status) rv;
}
1196 
filemgr_remove_all_buffer_blocks(struct filemgr *file)1197 void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1198 {
1199     // remove all cached blocks
1200     if (global_config.ncacheblock > 0 && file->bcache) {
1201         bcache_remove_dirty_blocks(file);
1202         bcache_remove_clean_blocks(file);
1203         bcache_remove_file(file);
1204         file->bcache = NULL;
1205     }
1206 }
1207 
// Final destructor for a filemgr entry: stops the prefetch thread, drops
// all cached blocks, releases the KV header, global transaction, WAL,
// filename/header buffers, and every lock, then frees the structure
// itself. The caller must already have removed 'h' from the global hash
// table and ensured no other thread still references the file.
void filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    spin_lock(&file->lock);
    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread is running
        void *ret;
        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
        spin_unlock(&file->lock);
        // wait until the prefetch thread observes the abort flag and exits
        thread_join(file->prefetch_tid, &ret);
    } else {
        spin_unlock(&file->lock);
    }

    // remove all cached blocks
    if (global_config.ncacheblock > 0 && file->bcache) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
        file->bcache = NULL;
    }

    if (file->kv_header) {
        // multi KV intance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file);
        size_t i = 0;
        size_t num_all_shards = wal_get_num_all_shards(file);
        // Free all WAL shards (including compactor's shard)
        for (; i < num_all_shards; ++i) {
            hash_free(&file->wal->key_shards[i].hash_bykey);
            spin_destroy(&file->wal->key_shards[i].lock);
            hash_free(&file->wal->seq_shards[i].hash_byseq);
            spin_destroy(&file->wal->seq_shards[i].lock);
        }
        spin_destroy(&file->wal->lock);
        atomic_destroy_uint32_t(&file->wal->size);
        atomic_destroy_uint32_t(&file->wal->num_flushable);
        atomic_destroy_uint64_t(&file->wal->datasize);
        free(file->wal->key_shards);
        free(file->wal->seq_shards);
    }
    free(file->wal);

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);
    // free old filename if any
    free(file->old_filename);

    // destroy locks
    spin_destroy(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    // tear down the atomic counters before freeing the structure
    atomic_destroy_uint64_t(&file->pos);
    atomic_destroy_uint64_t(&file->last_commit);
    atomic_destroy_uint32_t(&file->throttling_delay);

    // free file structure
    free(file->config);
    free(file);
}
1296 
1297 // permanently remove file from cache (not just close)
1298 // LCOV_EXCL_START
filemgr_remove_file(struct filemgr *file)1299 void filemgr_remove_file(struct filemgr *file)
1300 {
1301     struct hash_elem *ret;
1302 
1303     fdb_assert(file, file, NULL);
1304     fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1305 
1306     // remove from global hash table
1307     spin_lock(&filemgr_openlock);
1308     ret = hash_remove(&hash, &file->e);
1309     fdb_assert(ret, ret, NULL);
1310     spin_unlock(&filemgr_openlock);
1311 
1312     if (!lazy_file_deletion_enabled ||
1313         (file->new_file && file->new_file->in_place_compaction)) {
1314         filemgr_free_func(&file->e);
1315     } else {
1316         register_file_removal(file, NULL);
1317     }
1318 }
1319 // LCOV_EXCL_STOP
1320 
1321 static
_filemgr_is_closed(struct hash_elem *h, void *ctx)1322 void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1323     struct filemgr *file = _get_entry(h, struct filemgr, e);
1324     void *ret;
1325     spin_lock(&file->lock);
1326     if (file->ref_count != 0) {
1327         ret = (void *)file;
1328     } else {
1329         ret = NULL;
1330     }
1331     spin_unlock(&file->lock);
1332     return ret;
1333 }
1334 
// Global module teardown: if no file is still referenced, free every
// cached filemgr entry, shut down the buffer cache and the temp-buffer
// pool, and reset the module to its uninitialized state. Returns
// FDB_RESULT_FILE_IS_BUSY if any file is still open; otherwise
// FDB_RESULT_SUCCESS (including when already shut down).
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    if (filemgr_initialized) {

#ifndef SPIN_INITIALIZER
        // Windows: check if spin lock is already destroyed.
        // (initial_lock_status 2 means initialized; swapping it to 1 claims
        // the lock for this shutdown attempt)
        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
            spin_lock(&initial_lock);
        } else {
            // filemgr is already shut down
            return ret;
        }
#else
        spin_lock(&initial_lock);
#endif

        // Re-check under the lock: another thread may have completed the
        // shutdown between the unlocked check above and acquiring the lock.
        if (!filemgr_initialized) {
            // filemgr is already shut down
            // NOTE(review): on the non-SPIN_INITIALIZER (Windows) path the
            // lock is not released here — presumably intentional since the
            // lock is being torn down; confirm.
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            return ret;
        }

        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        if (!open_file) {
            hash_free_active(&hash, filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
#ifndef SPIN_INITIALIZER
            initial_lock_status = 0;
#else
            initial_lock = SPIN_INITIALIZER;
#endif
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            spin_destroy(&initial_lock);
#endif
        } else {
            // at least one file is still open: refuse to shut down
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1385 
filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)1386 bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1387 {
1388     spin_lock(&file->lock);
1389     bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1390     atomic_add_uint64_t(&file->pos, file->blocksize);
1391 
1392     if (global_config.ncacheblock <= 0) {
1393         // if block cache is turned off, write the allocated block before use
1394         uint8_t _buf = 0x0;
1395         ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1396                                        atomic_get_uint64_t(&file->pos) - 1);
1397         _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1398     }
1399     spin_unlock(&file->lock);
1400 
1401     return bid;
1402 }
1403 
filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1404 void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1405                             bid_t *end, err_log_callback *log_callback)
1406 {
1407     spin_lock(&file->lock);
1408     *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1409     *end = *begin + nblock - 1;
1410     atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1411 
1412     if (global_config.ncacheblock <= 0) {
1413         // if block cache is turned off, write the allocated block before use
1414         uint8_t _buf = 0x0;
1415         ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1416                                        atomic_get_uint64_t(&file->pos) - 1);
1417         _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1418     }
1419     spin_unlock(&file->lock);
1420 }
1421 
1422 // atomically allocate NBLOCK blocks only when current file position is same to nextbid
filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1423 bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1424                                   bid_t *begin, bid_t *end,
1425                                   err_log_callback *log_callback)
1426 {
1427     bid_t bid;
1428     spin_lock(&file->lock);
1429     bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1430     if (bid == nextbid) {
1431         *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1432         *end = *begin + nblock - 1;
1433         atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1434 
1435         if (global_config.ncacheblock <= 0) {
1436             // if block cache is turned off, write the allocated block before use
1437             uint8_t _buf = 0x0;
1438             ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1439                                            atomic_get_uint64_t(&file->pos));
1440             _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1441         }
1442     }else{
1443         *begin = BLK_NOT_FOUND;
1444         *end = BLK_NOT_FOUND;
1445     }
1446     spin_unlock(&file->lock);
1447     return bid;
1448 }
1449 
#ifdef __CRC32
// Verify the CRC32 of a btree-node block. Blocks whose trailing marker is
// not BLK_MARKER_BNODE carry no CRC and pass unconditionally.
// Side effect (same as before): the CRC field inside 'buf' is overwritten
// with 0xff bytes while recomputing, mirroring what the writer did.
INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
{
    uint8_t *block = (uint8_t *)buf;
    if (block[file->blocksize - 1] != BLK_MARKER_BNODE) {
        return FDB_RESULT_SUCCESS;
    }
    uint32_t stored_crc;
    memcpy(&stored_crc, block + BTREE_CRC_OFFSET, sizeof(stored_crc));
    stored_crc = _endian_decode(stored_crc);
    // Mask the CRC field before recomputing, exactly as the writer does.
    memset(block + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
    uint32_t computed_crc = chksum(buf, file->blocksize);
    return (computed_crc == stored_crc) ? FDB_RESULT_SUCCESS
                                        : FDB_RESULT_CHECKSUM_ERROR;
}
#endif
1466 
filemgr_invalidate_block(struct filemgr *file, bid_t bid)1467 void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1468 {
1469     if (global_config.ncacheblock > 0) {
1470         bcache_invalidate_block(file, bid);
1471     }
1472 }
1473 
// Read block 'bid' of 'file' into 'buf' (one full block).
// With the buffer cache enabled, the block is served from the cache when
// possible; on a cache miss (and only if 'read_on_cache_miss' is true) it
// is read from disk, CRC-checked, and inserted into the cache as a clean
// block. Blocks that are still writable (uncommitted) are read under the
// per-block data lock because a concurrent writer may be mutating them;
// committed blocks are immutable and need no lock.
// Returns FDB_RESULT_SUCCESS, FDB_RESULT_READ_FAIL / a negative pread
// result, FDB_RESULT_CHECKSUM_ERROR, or FDB_RESULT_WRITE_FAIL (cache
// insertion failure).
fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
                        err_log_callback *log_callback,
                        bool read_on_cache_miss)
{
    size_t lock_no;
    ssize_t r;
    uint64_t pos = bid * file->blocksize;
    fdb_status status = FDB_RESULT_SUCCESS;
    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
    // reading past the current end of file is a caller bug
    fdb_assert(pos < curr_pos, pos, curr_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry = NULL;
        bid_t is_writer = 0;
#endif
        bool locked = false;
        // Note: we don't need to grab lock for committed blocks
        // because they are immutable so that no writer will interfere and
        // overwrite dirty data
        if (filemgr_is_writable(file, bid)) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_lock(&file->data_mutex[lock_no]);
#else
            spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
            locked = true;
        }

        r = bcache_read(file, bid, buf);
        if (r == 0) {
            // cache miss
            if (!read_on_cache_miss) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return FDB_RESULT_READ_FAIL;
            }

            // if normal file, just read a block
            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
            if (r != file->blocksize) {
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "READ", file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return (fdb_status)r;
            }
#ifdef __CRC32
            // verify the block's checksum before trusting / caching it
            status = _filemgr_crc32_check(file, buf);
            if (status != FDB_RESULT_SUCCESS) {
                _log_errno_str(file->ops, log_callback, status, "READ",
                        file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return status;
            }
#endif
            // populate the cache with the freshly read (clean) block
            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        }
        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }
    } else {
        // Buffer cache disabled: always go straight to disk.
        if (!read_on_cache_miss) {
            return FDB_RESULT_READ_FAIL;
        }

        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
        if (r != file->blocksize) {
            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
                           file->filename);
            return (fdb_status)r;
        }

#ifdef __CRC32
        status = _filemgr_crc32_check(file, buf);
        if (status != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, status, "READ",
                           file->filename);
            return status;
        }
#endif
    }
    return status;
}
1605 
// Write 'len' bytes from 'buf' into block 'bid' of 'file', starting at byte
// 'offset' within that block.  With the block cache enabled the data is
// written into the cache as a dirty block (a partial write that misses the
// cache performs a read-modify-write of the whole block); with the cache
// disabled the bytes go straight to disk, stamping a CRC32 for full b-tree
// node blocks.  Returns FDB_RESULT_SUCCESS or a read/write failure code.
fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
                                uint64_t offset, uint64_t len, void *buf,
                                err_log_callback *log_callback)
{
    // The write must not cross a block boundary.
    fdb_assert(offset + len <= file->blocksize, offset + len, file);

    size_t lock_no;
    ssize_t r = 0;
    uint64_t pos = bid * file->blocksize + offset;
    // Data at or before the last commit point is immutable: only positions
    // past the last commit may be (re)written.
    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

        bool locked = false;
        // Serialize access to this block: per-block range lock, striped
        // mutex, or striped spinlock depending on build configuration.
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry;
        bid_t is_writer = 1;
        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
        mutex_lock(&file->data_mutex[lock_no]);
#else
        spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        locked = true;

        if (len == file->blocksize) {
            // write entire block .. we don't need to read previous block
            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        } else {
            // partially write buffer cache first
            r = bcache_write_partial(file, bid, buf, offset, len);
            if (r == 0) {
                // cache miss
                // write partially .. we have to read previous contents of the block
                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
                void *_buf = _filemgr_get_temp_buf();

                if (bid >= cur_file_last_bid) {
                    // this is the first time to write this block
                    // we don't need to read previous block from file.
                } else {
                    // Read the block's current on-disk contents so the bytes
                    // outside [offset, offset+len) are preserved.
                    r = file->ops->pread(file->fd, _buf, file->blocksize,
                                         bid * file->blocksize);
                    if (r != file->blocksize) {
                        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                            mutex_unlock(&file->data_mutex[lock_no]);
#else
                            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                        }
                        _filemgr_release_temp_buf(_buf);
                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
                                       "READ", file->filename);
                        return FDB_RESULT_READ_FAIL;
                    }
                }
                // Merge the new bytes into the block image, then cache the
                // whole block as dirty.
                memcpy((uint8_t *)_buf + offset, buf, len);
                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
                if (r != global_config.blocksize) {
                    if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                        plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                        mutex_unlock(&file->data_mutex[lock_no]);
#else
                        spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                    }
                    _filemgr_release_temp_buf(_buf);
                    _log_errno_str(file->ops, log_callback,
                            (fdb_status) r, "WRITE", file->filename);
                    return FDB_RESULT_WRITE_FAIL;
                }

                _filemgr_release_temp_buf(_buf);
            } // cache miss
        } // full block or partial block

        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }

    } else { // block cache disabled

#ifdef __CRC32
        // For a full-block write of a b-tree node (identified by the marker
        // in the block's last byte), compute a CRC32 over the block with the
        // CRC field pre-filled with 0xff, and store it in the CRC field.
        if (len == file->blocksize) {
            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
            if (marker == BLK_MARKER_BNODE) {
                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
                uint32_t crc32 = chksum(buf, file->blocksize);
                crc32 = _endian_encode(crc32);
                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
            }
        }
#endif

        r = file->ops->pwrite(file->fd, buf, len, pos);
        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
        if ((uint64_t)r != len) {
            return FDB_RESULT_WRITE_FAIL;
        }
    } // block cache check
    return FDB_RESULT_SUCCESS;
}
1737 
filemgr_write(struct filemgr *file, bid_t bid, void *buf, err_log_callback *log_callback)1738 fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1739                    err_log_callback *log_callback)
1740 {
1741     return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1742                                 log_callback);
1743 }
1744 
// Commit the in-memory DB header: flush all dirty cached blocks, append a
// header block at the current end of file, advance the last-commit position,
// and fsync() if the file was opened with FILEMGR_SYNC.
fdb_status filemgr_commit(struct filemgr *file,
                          err_log_callback *log_callback)
{
    uint16_t header_len = file->header.size;
    uint16_t _header_len;            // endian-encoded header length
    bid_t _prev_bid;                 // endian-encoded bid of previous header
    fdb_seqnum_t _seqnum;            // endian-encoded default KVS seqnum
    filemgr_header_revnum_t _revnum; // endian-encoded header revision number
    int result = FDB_RESULT_SUCCESS;
    filemgr_magic_t magic = FILEMGR_MAGIC;
    filemgr_magic_t _magic;          // endian-encoded magic number

    // Flush dirty blocks first so the header never points at unwritten data.
    if (global_config.ncacheblock > 0) {
        result = bcache_flush(file);
        if (result != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, (fdb_status) result,
                           "FLUSH", file->filename);
            return (fdb_status)result;
        }
    }

    spin_lock(&file->lock);

    // Only write a header block if there is header data to persist.
    if (file->header.size > 0 && file->header.data) {
        void *buf = _filemgr_get_temp_buf();
        uint8_t marker[BLK_MARKER_SIZE];

        // On-disk layout of the header block:
        // [header data]:        'header_len' bytes   <---+
        // [header revnum]:      8 bytes                  |
        // [default KVS seqnum]: 8 bytes                  |
        // ...                                            |
        // (empty)                                    blocksize
        // ...                                            |
        // [prev header bid]:    8 bytes                  |
        // [header length]:      2 bytes                  |
        // [magic number]:       8 bytes                  |
        // [block marker]:       1 byte               <---+

        // header data
        memcpy(buf, file->header.data, header_len);
        // header rev number
        _revnum = _endian_encode(file->header.revnum);
        memcpy((uint8_t *)buf + header_len, &_revnum,
               sizeof(filemgr_header_revnum_t));
        // file's sequence number (default KVS seqnum)
        _seqnum = _endian_encode(file->header.seqnum);
        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
               &_seqnum, sizeof(fdb_seqnum_t));

        // prev header bid (lets readers walk the chain of commit headers)
        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
               &_prev_bid, sizeof(_prev_bid));
        // header length
        _header_len = _endian_encode(header_len);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - BLK_MARKER_SIZE),
               &_header_len, sizeof(header_len));
        // magic number
        _magic = _endian_encode(magic);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - BLK_MARKER_SIZE), &_magic, sizeof(magic));

        // marker (last byte identifies this block as a DB header)
        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
               marker, BLK_MARKER_SIZE);

        // Append the header block at the current end-of-file position.
        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
                                       atomic_get_uint64_t(&file->pos));
        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
                       "WRITE", file->filename);
        if (rv != file->blocksize) {
            _filemgr_release_temp_buf(buf);
            spin_unlock(&file->lock);
            return FDB_RESULT_WRITE_FAIL;
        }
        // Record the new header's block id and advance the file position.
        atomic_store_uint64_t(&file->header.bid,
                              atomic_get_uint64_t(&file->pos) / file->blocksize);
        atomic_add_uint64_t(&file->pos, file->blocksize);

        // Dirty tree roots are now part of the committed header; reset them.
        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);

        _filemgr_release_temp_buf(buf);
    }
    // race condition?
    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));

    spin_unlock(&file->lock);

    // Durability: fsync only when the file was opened with the sync flag.
    if (file->fflags & FILEMGR_SYNC) {
        result = file->ops->fsync(file->fd);
        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
    }
    return (fdb_status) result;
}
1843 
filemgr_sync(struct filemgr *file, err_log_callback *log_callback)1844 fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1845 {
1846     fdb_status result = FDB_RESULT_SUCCESS;
1847     if (global_config.ncacheblock > 0) {
1848         result = bcache_flush(file);
1849         if (result != FDB_RESULT_SUCCESS) {
1850             _log_errno_str(file->ops, log_callback, (fdb_status) result,
1851                            "FLUSH", file->filename);
1852             return result;
1853         }
1854     }
1855 
1856     if (file->fflags & FILEMGR_SYNC) {
1857         int rv = file->ops->fsync(file->fd);
1858         _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1859         return (fdb_status) rv;
1860     }
1861     return result;
1862 }
1863 
filemgr_update_file_status(struct filemgr *file, file_status_t status, char *old_filename)1864 int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1865                                 char *old_filename)
1866 {
1867     int ret = 1;
1868     spin_lock(&file->lock);
1869     atomic_store_uint8_t(&file->status, status);
1870     if (old_filename) {
1871         if (!file->old_filename) {
1872             file->old_filename = old_filename;
1873         } else {
1874             ret = 0;
1875             fdb_assert(file->ref_count, file->ref_count, 0);
1876             free(old_filename);
1877         }
1878     }
1879     spin_unlock(&file->lock);
1880     return ret;
1881 }
1882 
filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file, file_status_t status)1883 void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1884                                   file_status_t status)
1885 {
1886     spin_lock(&old_file->lock);
1887     old_file->new_file = new_file;
1888     atomic_store_uint8_t(&old_file->status, status);
1889     spin_unlock(&old_file->lock);
1890 }
1891 
1892 // Check if there is a file that still points to the old_file that is being
1893 // compacted away. If so open the file and return its pointer.
1894 static
_filemgr_check_stale_link(struct hash_elem *h, void *ctx)1895 void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1896     struct filemgr *cur_file = (struct filemgr *)ctx;
1897     struct filemgr *file = _get_entry(h, struct filemgr, e);
1898     spin_lock(&file->lock);
1899     if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1900         file->new_file == cur_file) {
1901         // Incrementing reference counter below is the same as filemgr_open()
1902         // We need to do this to ensure that the pointer returned does not
1903         // get freed outside the filemgr_open lock
1904         file->ref_count++;
1905         spin_unlock(&file->lock);
1906         return (void *)file;
1907     }
1908     spin_unlock(&file->lock);
1909     return (void *)NULL;
1910 }
1911 
filemgr_search_stale_links(struct filemgr *cur_file)1912 struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1913     struct filemgr *very_old_file;
1914     spin_lock(&filemgr_openlock);
1915     very_old_file = (struct filemgr *)hash_scan(&hash,
1916                                          _filemgr_check_stale_link, cur_file);
1917     spin_unlock(&filemgr_openlock);
1918     return very_old_file;
1919 }
1920 
filemgr_redirect_old_file(struct filemgr *very_old_file, struct filemgr *new_file, filemgr_redirect_hdr_func redirect_header_func)1921 char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1922                                      struct filemgr *new_file,
1923                                      filemgr_redirect_hdr_func
1924                                      redirect_header_func) {
1925     size_t old_header_len, new_header_len;
1926     uint16_t new_filename_len;
1927     char *past_filename;
1928     spin_lock(&very_old_file->lock);
1929     fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1930     fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1931     old_header_len = very_old_file->header.size;
1932     new_filename_len = strlen(new_file->filename);
1933     // Find out the new DB header length with new_file's filename
1934     new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1935         + new_filename_len;
1936     // As we are going to change the new_filename field in the DB header of the
1937     // very_old_file, maybe reallocate DB header buf to accomodate bigger value
1938     if (new_header_len > old_header_len) {
1939         very_old_file->header.data = realloc(very_old_file->header.data,
1940                 new_header_len);
1941     }
1942     very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1943     past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1944             new_file->filename, new_filename_len + 1);//Update in-memory header
1945     very_old_file->header.size = new_header_len;
1946     ++(very_old_file->header.revnum);
1947 
1948     spin_unlock(&very_old_file->lock);
1949     return past_filename;
1950 }
1951 
filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)1952 void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1953 {
1954     fdb_assert(new_file, new_file, old_file);
1955 
1956     spin_lock(&old_file->lock);
1957     if (old_file->ref_count > 0) {
1958         // delay removing
1959         old_file->new_file = new_file;
1960         atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1961         spin_unlock(&old_file->lock);
1962     } else {
1963         // immediatly remove
1964         // LCOV_EXCL_START
1965         spin_unlock(&old_file->lock);
1966 
1967         if (!lazy_file_deletion_enabled ||
1968             (old_file->new_file && old_file->new_file->in_place_compaction)) {
1969             remove(old_file->filename);
1970         }
1971         filemgr_remove_file(old_file);
1972         // LCOV_EXCL_STOP
1973     }
1974 }
1975 
1976 // migrate default kv store stats over to new_file
filemgr_migrate_op_stats(struct filemgr *old_file, struct filemgr *new_file, struct kvs_info *kvs)1977 struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1978                                               struct filemgr *new_file,
1979                                               struct kvs_info *kvs)
1980 {
1981     kvs_ops_stat *ret = NULL;
1982     fdb_assert(new_file, new_file, old_file);
1983 
1984     spin_lock(&old_file->lock);
1985     new_file->header.op_stat = old_file->header.op_stat;
1986     ret = &new_file->header.op_stat;
1987     spin_unlock(&old_file->lock);
1988     return ret;
1989 }
1990 
// Note: filemgr_openlock should be held before calling this function.
// Destroy 'filename' and, recursively, every older version referenced via
// the old-filename field of its DB header.  'destroy_file_set' tracks the
// filenames already visited so the recursion terminates; the top-level
// caller passes NULL and this function manages the set locally.
fdb_status filemgr_destroy_file(char *filename,
                                struct filemgr_config *config,
                                struct hash *destroy_file_set)
{
    struct filemgr *file = NULL;
    struct hash to_destroy_files;
    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
                                                  &to_destroy_files);
    struct filemgr query;
    struct hash_elem *e = NULL;
    fdb_status status = FDB_RESULT_SUCCESS;
    char *old_filename = NULL;

    if (!destroy_file_set) { // top level or non-recursive call
        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
    }

    query.filename = filename;
    // check whether file is already being destroyed in parent recursive call
    e = hash_find(destroy_set, &query.e);
    if (e) { // Duplicate filename found, nothing to be done in this call
        if (!destroy_file_set) { // top level or non-recursive call
            hash_free(destroy_set);
        }
        return status;
    } else {
        // Remember file. Stack value ok IFF single direction recursion
        hash_insert(destroy_set, &query.e);
    }

    // check global list of known files to see if it is already opened or not
    e = hash_find(&hash, &query.e);
    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        // A file still referenced by someone cannot be destroyed.
        spin_lock(&file->lock);
        if (file->ref_count) {
            spin_unlock(&file->lock);
            status = FDB_RESULT_FILE_IS_BUSY;
            if (!destroy_file_set) { // top level or non-recursive call
                hash_free(destroy_set);
            }
            return status;
        }
        spin_unlock(&file->lock);
        // Recurse into the older version first, if one is recorded.
        if (file->old_filename) {
            status = filemgr_destroy_file(file->old_filename, config,
                                          destroy_set);
            if (status != FDB_RESULT_SUCCESS) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return status;
            }
        }

        // Cleanup file from in-memory as well as on-disk
        e = hash_remove(&hash, &file->e);
        fdb_assert(e, e, 0);
        filemgr_free_func(&file->e);
        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
            if (remove(filename)) {
                status = FDB_RESULT_FILE_REMOVE_FAIL;
            }
        }
    } else { // file not in memory, read on-disk to destroy older versions..
        // Use a stack-allocated filemgr just to read the DB header.
        file = (struct filemgr *)alca(struct filemgr, 1);
        file->filename = filename;
        file->ops = get_filemgr_ops();
        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
        file->blocksize = global_config.blocksize;
        if (file->fd < 0) {
            // A missing file is fine (nothing to destroy); any other open
            // failure is an error.
            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_OPEN_FAIL;
            }
        } else { // file successfully opened, seek to end to get DB header
            cs_off_t offset = file->ops->goto_eof(file->fd);
            if (offset == FDB_RESULT_SEEK_FAIL) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_SEEK_FAIL;
            } else { // Need to read DB header which contains old filename
                atomic_store_uint64_t(&file->pos, offset);
                status = _filemgr_read_header(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    return status;
                }
                if (file->header.data) {
                    // Fixed header-data offsets: new-filename length at byte
                    // 64, old-filename length at byte 66, the two names
                    // follow from byte 68.
                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 64);
                    uint16_t new_filename_len =
                                      _endian_decode(*new_filename_len_ptr);
                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 66);
                    uint16_t old_filename_len =
                                      _endian_decode(*old_filename_len_ptr);
                    old_filename = (char *)file->header.data + 68
                                   + new_filename_len;
                    if (old_filename_len) {
                        status = filemgr_destroy_file(old_filename, config,
                                                      destroy_set);
                    }
                    free(file->header.data);
                }
                file->ops->close(file->fd);
                if (status == FDB_RESULT_SUCCESS) {
                    if (filemgr_does_file_exist(filename)
                                               == FDB_RESULT_SUCCESS) {
                        if (remove(filename)) {
                            status = FDB_RESULT_FILE_REMOVE_FAIL;
                        }
                    }
                }
            }
        }
    }

    if (!destroy_file_set) { // top level or non-recursive call
        hash_free(destroy_set);
    }

    return status;
}
2124 
filemgr_is_rollback_on(struct filemgr *file)2125 bool filemgr_is_rollback_on(struct filemgr *file)
2126 {
2127     bool rv;
2128     spin_lock(&file->lock);
2129     rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2130     spin_unlock(&file->lock);
2131     return rv;
2132 }
2133 
filemgr_set_rollback(struct filemgr *file, uint8_t new_val)2134 void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2135 {
2136     spin_lock(&file->lock);
2137     if (new_val) {
2138         file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2139     } else {
2140         file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2141     }
2142     spin_unlock(&file->lock);
2143 }
2144 
// Mark (or clear) this file as being compacted in place; guarded by the
// per-file lock like the other per-file flags.
void filemgr_set_in_place_compaction(struct filemgr *file,
                                     bool in_place_compaction) {
    spin_lock(&file->lock);
    file->in_place_compaction = in_place_compaction;
    spin_unlock(&file->lock);
}
2151 
filemgr_is_in_place_compaction_set(struct filemgr *file)2152 bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2153 {
2154     bool ret = false;
2155     spin_lock(&file->lock);
2156     ret = file->in_place_compaction;
2157     spin_unlock(&file->lock);
2158     return ret;
2159 }
2160 
// Initialize the global filemgr state (if not done yet) and take the global
// open lock that serializes file open/close/destroy operations.
void filemgr_mutex_openlock(struct filemgr_config *config)
{
    filemgr_init(config);

    spin_lock(&filemgr_openlock);
}
2167 
// Release the global lock taken by filemgr_mutex_openlock().
void filemgr_mutex_openunlock(void)
{
    spin_unlock(&filemgr_openlock);
}
2172 
// Acquire this file's writer mutex and mark it held, so that
// filemgr_mutex_unlock() only releases a mutex that is actually locked.
void filemgr_mutex_lock(struct filemgr *file)
{
    mutex_lock(&file->writer_lock.mutex);
    file->writer_lock.locked = true;
}
2178 
filemgr_mutex_trylock(struct filemgr *file)2179 bool filemgr_mutex_trylock(struct filemgr *file) {
2180     if (mutex_trylock(&file->writer_lock.mutex)) {
2181         file->writer_lock.locked = true;
2182         return true;
2183     }
2184     return false;
2185 }
2186 
filemgr_mutex_unlock(struct filemgr *file)2187 void filemgr_mutex_unlock(struct filemgr *file)
2188 {
2189     if (file->writer_lock.locked) {
2190         file->writer_lock.locked = false;
2191         mutex_unlock(&file->writer_lock.mutex);
2192     }
2193 }
2194 
// Record the dirty (not-yet-committed) id-tree and seq-tree root bids in the
// in-memory header; filemgr_commit() resets both to BLK_NOT_FOUND.
void filemgr_set_dirty_root(struct filemgr *file,
                            bid_t dirty_idtree_root,
                            bid_t dirty_seqtree_root)
{
    atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
}
2202 
filemgr_is_commit_header(void *head_buffer, size_t blocksize)2203 bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2204 {
2205     uint8_t marker[BLK_MARKER_SIZE];
2206     filemgr_magic_t magic;
2207     marker[0] = *(((uint8_t *)head_buffer)
2208                  + blocksize - BLK_MARKER_SIZE);
2209     if (marker[0] != BLK_MARKER_DBHEADER) {
2210         return false;
2211     }
2212 
2213     memcpy(&magic, (uint8_t *) head_buffer
2214             + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2215     magic = _endian_decode(magic);
2216 
2217     return (magic == FILEMGR_MAGIC);
2218 }
2219 
// Set the per-file write throttling delay, in microseconds.
// NOTE(review): the parameter is 64-bit but the stored atomic is 32-bit
// (atomic_store_uint32_t), so values above UINT32_MAX are truncated —
// confirm callers stay within 32-bit range.
void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
{
    atomic_store_uint32_t(&file->throttling_delay, delay_us);
}
2224 
// Return the current write throttling delay (microseconds).
uint32_t filemgr_get_throttling_delay(struct filemgr *file)
{
    return atomic_get_uint32_t(&file->throttling_delay);
}
2229 
_kvs_stat_set(struct filemgr *file, fdb_kvs_id_t kv_id, struct kvs_stat stat)2230 void _kvs_stat_set(struct filemgr *file,
2231                    fdb_kvs_id_t kv_id,
2232                    struct kvs_stat stat)
2233 {
2234     if (kv_id == 0) {
2235         spin_lock(&file->lock);
2236         file->header.stat = stat;
2237         spin_unlock(&file->lock);
2238     } else {
2239         struct avl_node *a;
2240         struct kvs_node query, *node;
2241         struct kvs_header *kv_header = file->kv_header;
2242 
2243         spin_lock(&kv_header->lock);
2244         query.id = kv_id;
2245         a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2246         if (a) {
2247             node = _get_entry(a, struct kvs_node, avl_id);
2248             node->stat = stat;
2249         }
2250         spin_unlock(&kv_header->lock);
2251     }
2252 }
2253 
_kvs_stat_update_attr(struct filemgr *file, fdb_kvs_id_t kv_id, kvs_stat_attr_t attr, int delta)2254 void _kvs_stat_update_attr(struct filemgr *file,
2255                            fdb_kvs_id_t kv_id,
2256                            kvs_stat_attr_t attr,
2257                            int delta)
2258 {
2259     spin_t *lock = NULL;
2260     struct kvs_stat *stat;
2261 
2262     if (kv_id == 0) {
2263         stat = &file->header.stat;
2264         lock = &file->lock;
2265         spin_lock(lock);
2266     } else {
2267         struct avl_node *a;
2268         struct kvs_node query, *node;
2269         struct kvs_header *kv_header = file->kv_header;
2270 
2271         lock = &kv_header->lock;
2272         spin_lock(lock);
2273         query.id = kv_id;
2274         a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2275         if (!a) {
2276             // KV instance corresponding to the kv_id is already removed
2277             spin_unlock(lock);
2278             return;
2279         }
2280         node = _get_entry(a, struct kvs_node, avl_id);
2281         stat = &node->stat;
2282     }
2283 
2284     if (attr == KVS_STAT_DATASIZE) {
2285         stat->datasize += delta;
2286     } else if (attr == KVS_STAT_NDOCS) {
2287         stat->ndocs += delta;
2288     } else if (attr == KVS_STAT_NLIVENODES) {
2289         stat->nlivenodes += delta;
2290     } else if (attr == KVS_STAT_WAL_NDELETES) {
2291         stat->wal_ndeletes += delta;
2292     } else if (attr == KVS_STAT_WAL_NDOCS) {
2293         stat->wal_ndocs += delta;
2294     }
2295     spin_unlock(lock);
2296 }
2297 
_kvs_stat_get_kv_header(struct kvs_header *kv_header, fdb_kvs_id_t kv_id, struct kvs_stat *stat)2298 int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2299                             fdb_kvs_id_t kv_id,
2300                             struct kvs_stat *stat)
2301 {
2302     int ret = 0;
2303     struct avl_node *a;
2304     struct kvs_node query, *node;
2305 
2306     query.id = kv_id;
2307     a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2308     if (a) {
2309         node = _get_entry(a, struct kvs_node, avl_id);
2310         *stat = node->stat;
2311     } else {
2312         ret = -1;
2313     }
2314     return ret;
2315 }
2316 
// Read a snapshot of the stats for one KV store instance.
// kv_id == 0 denotes the default KV store, whose stats live in the file
// header; any other id is looked up in the file's kvs_header index.
// Returns 0 on success, -1 if the KV instance cannot be found.
int _kvs_stat_get(struct filemgr *file,
                  fdb_kvs_id_t kv_id,
                  struct kvs_stat *stat)
{
    int ret = 0;

    if (kv_id == 0) {
        spin_lock(&file->lock);
        *stat = file->header.stat;
        spin_unlock(&file->lock);
    } else {
        struct kvs_header *kv_header = file->kv_header;

        // Defensive: a lookup for a named KV store on a file whose
        // kvs_header is not attached would otherwise dereference NULL
        // when taking kv_header->lock.
        if (!kv_header) {
            return -1;
        }
        spin_lock(&kv_header->lock);
        ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
        spin_unlock(&kv_header->lock);
    }

    return ret;
}
2337 
_kvs_stat_get_sum(struct filemgr *file, kvs_stat_attr_t attr)2338 uint64_t _kvs_stat_get_sum(struct filemgr *file,
2339                            kvs_stat_attr_t attr)
2340 {
2341     struct avl_node *a;
2342     struct kvs_node *node;
2343     struct kvs_header *kv_header = file->kv_header;
2344 
2345     uint64_t ret = 0;
2346     spin_lock(&file->lock);
2347     if (attr == KVS_STAT_DATASIZE) {
2348         ret += file->header.stat.datasize;
2349     } else if (attr == KVS_STAT_NDOCS) {
2350         ret += file->header.stat.ndocs;
2351     } else if (attr == KVS_STAT_NLIVENODES) {
2352         ret += file->header.stat.nlivenodes;
2353     } else if (attr == KVS_STAT_WAL_NDELETES) {
2354         ret += file->header.stat.wal_ndeletes;
2355     } else if (attr == KVS_STAT_WAL_NDOCS) {
2356         ret += file->header.stat.wal_ndocs;
2357     }
2358     spin_unlock(&file->lock);
2359 
2360     if (kv_header) {
2361         spin_lock(&kv_header->lock);
2362         a = avl_first(kv_header->idx_id);
2363         while (a) {
2364             node = _get_entry(a, struct kvs_node, avl_id);
2365             a = avl_next(&node->avl_id);
2366 
2367             if (attr == KVS_STAT_DATASIZE) {
2368                 ret += node->stat.datasize;
2369             } else if (attr == KVS_STAT_NDOCS) {
2370                 ret += node->stat.ndocs;
2371             } else if (attr == KVS_STAT_NLIVENODES) {
2372                 ret += node->stat.nlivenodes;
2373             } else if (attr == KVS_STAT_WAL_NDELETES) {
2374                 ret += node->stat.wal_ndeletes;
2375             } else if (attr == KVS_STAT_WAL_NDOCS) {
2376                 ret += node->stat.wal_ndocs;
2377             }
2378         }
2379         spin_unlock(&kv_header->lock);
2380     }
2381 
2382     return ret;
2383 }
2384 
// Copy the operational stats of KV instance 'kv_id' out of the given
// kvs_header. Callers serialize access by holding kv_header->lock
// (see _kvs_ops_stat_get). Returns 0 on success, -1 when not found.
int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
                                fdb_kvs_id_t kv_id,
                                struct kvs_ops_stat *stat)
{
    struct kvs_node query;
    struct avl_node *found;

    query.id = kv_id;
    found = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
    if (!found) {
        // No KV instance with this id in the index.
        return -1;
    }
    *stat = _get_entry(found, struct kvs_node, avl_id)->op_stat;
    return 0;
}
2403 
// Read a snapshot of the operational stats for one KV store instance.
// kv_id == 0 denotes the default KV store (stats in the file header);
// any other id is looked up in the file's kvs_header index.
// Returns 0 on success, -1 if the KV instance cannot be found.
int _kvs_ops_stat_get(struct filemgr *file,
                      fdb_kvs_id_t kv_id,
                      struct kvs_ops_stat *stat)
{
    int ret = 0;

    if (kv_id == 0) {
        spin_lock(&file->lock);
        *stat = file->header.op_stat;
        spin_unlock(&file->lock);
    } else {
        struct kvs_header *kv_header = file->kv_header;

        // Defensive: avoid dereferencing NULL kv_header->lock when the
        // file has no multi-KV header attached.
        if (!kv_header) {
            return -1;
        }
        spin_lock(&kv_header->lock);
        ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
        spin_unlock(&kv_header->lock);
    }

    return ret;
}
2424 
// Reset every operation counter in a kvs_ops_stat record to zero.
// Counters are initialized through atomic_init_uint64_t (they are
// atomic 64-bit fields — see fdb_internal.h for the exact type).
void _init_op_stats(struct kvs_ops_stat *stat) {
    atomic_init_uint64_t(&stat->num_sets, 0);
    atomic_init_uint64_t(&stat->num_dels, 0);
    atomic_init_uint64_t(&stat->num_commits, 0);
    atomic_init_uint64_t(&stat->num_compacts, 0);
    atomic_init_uint64_t(&stat->num_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_gets, 0);
    atomic_init_uint64_t(&stat->num_iterator_moves, 0);
}
2434 
filemgr_get_ops_stats(struct filemgr *file, struct kvs_info *kvs)2435 struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2436                                            struct kvs_info *kvs)
2437 {
2438     struct kvs_ops_stat *stat = NULL;
2439     if (!kvs || (kvs && kvs->id == 0)) {
2440         return &file->header.op_stat;
2441     } else {
2442         struct kvs_header *kv_header = file->kv_header;
2443         struct avl_node *a;
2444         struct kvs_node query, *node;
2445         spin_lock(&kv_header->lock);
2446         query.id = kvs->id;
2447         a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2448         if (a) {
2449             node = _get_entry(a, struct kvs_node, avl_id);
2450             stat = &node->op_stat;
2451         }
2452         spin_unlock(&kv_header->lock);
2453     }
2454     return stat;
2455 }
2456 
// Decode a KV store id from the first 'chunksize' bytes of 'buf'.
// A chunk narrower than fdb_kvs_id_t fills only the trailing bytes of
// the temporary (the rest stay zero); a wider chunk ignores its leading
// pad bytes. The raw value is then passed through _endian_decode.
void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
{
    size_t size_id = sizeof(fdb_kvs_id_t);
    fdb_kvs_id_t temp = 0;

    if (chunksize >= size_id) {
        // Exact or wide chunk: the significant bytes are the trailing
        // size_id bytes. memcpy replaces the old chunksize==size_id path's
        // direct '*(fdb_kvs_id_t*)buf' dereference, which is unsafe when
        // 'buf' points at an arbitrary (unaligned) offset inside a key
        // buffer (misaligned access / strict-aliasing violation).
        memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
    } else { // chunksize < sizeof(fdb_kvs_id_t)
        memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
    }
    *id = _endian_decode(temp);
}
2472 
// Encode a KV store id into the first 'chunksize' bytes of 'buf'.
// The id is passed through _endian_encode first; a narrow chunk keeps
// only the trailing bytes of the encoded value, a wide chunk zero-pads
// its leading bytes. Inverse of buf2kvid.
void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
{
    size_t size_id = sizeof(fdb_kvs_id_t);
    fdb_kvs_id_t encoded = _endian_encode(id);
    uint8_t *dst = (uint8_t*)buf;

    if (chunksize <= size_id) {
        // Exact or narrow chunk: emit the trailing bytes of the encoding
        // (for chunksize == size_id this copies the whole value).
        memcpy(dst, (uint8_t*)&encoded + (size_id - chunksize), chunksize);
    } else { // chunksize > sizeof(fdb_kvs_id_t)
        memset(dst, 0x0, chunksize - size_id);
        memcpy(dst + (chunksize - size_id), &encoded, size_id);
    }
}
2487 
// Re-chunk a value: copy a chunk of 'chunksize_src' bytes into a chunk
// of 'chunksize_dst' bytes, preserving the trailing bytes. A narrower
// destination keeps only the source's trailing bytes; a wider one
// zero-pads its leading bytes.
void buf2buf(size_t chunksize_src, void *buf_src,
             size_t chunksize_dst, void *buf_dst)
{
    uint8_t *src = (uint8_t*)buf_src;
    uint8_t *dst = (uint8_t*)buf_dst;

    if (chunksize_dst <= chunksize_src) {
        // Equal or shrinking: take the trailing chunksize_dst bytes
        // (offset is 0 when the sizes match).
        memcpy(dst, src + (chunksize_src - chunksize_dst), chunksize_dst);
    } else { // chunksize_dst > chunksize_src
        memset(dst, 0x0, chunksize_dst - chunksize_src);
        memcpy(dst + (chunksize_dst - chunksize_src), src, chunksize_src);
    }
}
2502