1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2010 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <fcntl.h>
22 #include <sys/stat.h>
23 #include <stdarg.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <sys/time.h>
26 #endif
27
28 #include "filemgr.h"
29 #include "filemgr_ops.h"
30 #include "hash_functions.h"
31 #include "blockcache.h"
32 #include "wal.h"
33 #include "list.h"
34 #include "fdb_internal.h"
35 #include "time_utils.h"
36
37 #include "memleak.h"
38
39 #ifdef __DEBUG
40 #ifndef __DEBUG_FILEMGR
41 #undef DBG
42 #undef DBGCMD
43 #undef DBGSW
44 #define DBG(...)
45 #define DBGCMD(...)
46 #define DBGSW(n, ...)
47 #endif
48 #endif
49
50 // NBUCKET must be power of 2
51 #define NBUCKET (1024)
52 #define FILEMGR_MAGIC (UINT64_C(0xdeadcafebeefbeef))
53
54 // global static variables
55 #ifdef SPIN_INITIALIZER
56 static spin_t initial_lock = SPIN_INITIALIZER;
57 #else
58 static volatile unsigned int initial_lock_status = 0;
59 static spin_t initial_lock;
60 #endif
61
62
63 static volatile uint8_t filemgr_initialized = 0;
64 static struct filemgr_config global_config;
65 static struct hash hash;
66 static spin_t filemgr_openlock;
67
68 struct temp_buf_item{
69 void *addr;
70 struct list_elem le;
71 };
72 static struct list temp_buf;
73 static spin_t temp_buf_lock;
74
75 static bool lazy_file_deletion_enabled = false;
76 static register_file_removal_func register_file_removal = NULL;
77 static check_file_removal_func is_file_removed = NULL;
78
spin_init_wrap(void *lock)79 static void spin_init_wrap(void *lock) {
80 spin_init((spin_t*)lock);
81 }
82
spin_destroy_wrap(void *lock)83 static void spin_destroy_wrap(void *lock) {
84 spin_destroy((spin_t*)lock);
85 }
86
spin_lock_wrap(void *lock)87 static void spin_lock_wrap(void *lock) {
88 spin_lock((spin_t*)lock);
89 }
90
spin_unlock_wrap(void *lock)91 static void spin_unlock_wrap(void *lock) {
92 spin_unlock((spin_t*)lock);
93 }
94
mutex_init_wrap(void *lock)95 static void mutex_init_wrap(void *lock) {
96 mutex_init((mutex_t*)lock);
97 }
98
mutex_destroy_wrap(void *lock)99 static void mutex_destroy_wrap(void *lock) {
100 mutex_destroy((mutex_t*)lock);
101 }
102
mutex_lock_wrap(void *lock)103 static void mutex_lock_wrap(void *lock) {
104 mutex_lock((mutex_t*)lock);
105 }
106
mutex_unlock_wrap(void *lock)107 static void mutex_unlock_wrap(void *lock) {
108 mutex_unlock((mutex_t*)lock);
109 }
110
_kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)111 static int _kvs_stat_cmp(struct avl_node *a, struct avl_node *b, void *aux)
112 {
113 struct kvs_node *aa, *bb;
114 aa = _get_entry(a, struct kvs_node, avl_id);
115 bb = _get_entry(b, struct kvs_node, avl_id);
116
117 if (aa->id < bb->id) {
118 return -1;
119 } else if (aa->id > bb->id) {
120 return 1;
121 } else {
122 return 0;
123 }
124 }
125
_block_is_overlapped(void *pbid1, void *pis_writer1, void *pbid2, void *pis_writer2, void *aux)126 static int _block_is_overlapped(void *pbid1, void *pis_writer1,
127 void *pbid2, void *pis_writer2,
128 void *aux)
129 {
130 (void)aux;
131 bid_t bid1, is_writer1, bid2, is_writer2;
132 bid1 = *(bid_t*)pbid1;
133 is_writer1 = *(bid_t*)pis_writer1;
134 bid2 = *(bid_t*)pbid2;
135 is_writer2 = *(bid_t*)pis_writer2;
136
137 if (bid1 != bid2) {
138 // not overlapped
139 return 0;
140 } else {
141 // overlapped
142 if (!is_writer1 && !is_writer2) {
143 // both are readers
144 return 0;
145 } else {
146 return 1;
147 }
148 }
149 }
150
// Format a log message and deliver it to the user-registered callback
// (if any). Always returns 'status' so callers can write
// 'return fdb_log(cb, status, ...)'.
//
// log_callback : optional callback descriptor; ignored when NULL or when
//                no callback function is registered.
// status       : error/status code passed through to the callback and
//                returned to the caller.
// format, ...  : printf-style message.
fdb_status fdb_log(err_log_callback *log_callback,
                   fdb_status status,
                   const char *format, ...)
{
    if (log_callback && log_callback->callback) {
        char msg[1024];
        va_list args;
        va_start(args, format);
        // Use vsnprintf (not vsprintf): a long filename or message must
        // truncate rather than overflow the fixed-size buffer. The result
        // is always NUL-terminated.
        vsnprintf(msg, sizeof(msg), format, args);
        va_end(args);
        log_callback->callback(status, msg, log_callback->ctx_data);
    }
    return status;
}
165
// If 'io_error' is a failure code (< 0), fetch the OS-level errno text via
// the file ops table and forward a combined message to the log callback.
static void _log_errno_str(struct filemgr_ops *ops,
                           err_log_callback *log_callback,
                           fdb_status io_error,
                           const char *what,
                           const char *filename)
{
    if (io_error >= 0) {
        return; // success / non-error codes are not logged
    }
    char errno_msg[512];
    ops->get_errno_str(errno_msg, 512);
    fdb_log(log_callback, io_error,
            "Error in %s on a database file '%s', %s",
            what, filename, errno_msg);
}
179
_file_hash(struct hash *hash, struct hash_elem *e)180 static uint32_t _file_hash(struct hash *hash, struct hash_elem *e)
181 {
182 struct filemgr *file = _get_entry(e, struct filemgr, e);
183 int len = strlen(file->filename);
184 return chksum(file->filename, len) & ((unsigned)(NBUCKET-1));
185 }
186
_file_cmp(struct hash_elem *a, struct hash_elem *b)187 static int _file_cmp(struct hash_elem *a, struct hash_elem *b)
188 {
189 struct filemgr *aa, *bb;
190 aa = _get_entry(a, struct filemgr, e);
191 bb = _get_entry(b, struct filemgr, e);
192 return strcmp(aa->filename, bb->filename);
193 }
194
// One-time global initialization of the file manager subsystem: snapshots
// the caller's config as the process default, sets up the (optional) block
// cache, the global file hash table, the temp-buffer pool, and the global
// open lock. Safe to call many times; only the first effective caller does
// the work (double-checked under 'initial_lock').
void filemgr_init(struct filemgr_config *config)
{
    // global initialization
    // initialized only once at first time
    if (!filemgr_initialized) {
#ifndef SPIN_INITIALIZER
        // Note that only Windows passes through this routine
        // (no static spin-lock initializer available there).
        if (InterlockedCompareExchange(&initial_lock_status, 1, 0) == 0) {
            // atomically initialize spin lock only once
            spin_init(&initial_lock);
            initial_lock_status = 2;
        } else {
            // the others ... wait until initializing 'initial_lock' is done
            while (initial_lock_status != 2) {
                Sleep(1);
            }
        }
#endif

        spin_lock(&initial_lock);
        if (!filemgr_initialized) { // re-check under the lock
            global_config = *config;

            // Block cache is optional: only created when configured.
            if (global_config.ncacheblock > 0)
                bcache_init(global_config.ncacheblock, global_config.blocksize);

            hash_init(&hash, NBUCKET, _file_hash, _file_cmp);

            // initialize temp buffer
            list_init(&temp_buf);
            spin_init(&temp_buf_lock);

            // initialize global lock
            spin_init(&filemgr_openlock);

            // set the initialize flag
            filemgr_initialized = 1;
        }
        spin_unlock(&initial_lock);
    }
}
236
// Configure lazy (deferred) file deletion: when enabled, files slated for
// removal are handed to 'regis_func' and their state queried via
// 'check_func' instead of being removed immediately.
void filemgr_set_lazy_file_deletion(bool enable,
                                    register_file_removal_func regis_func,
                                    check_file_removal_func check_func)
{
    register_file_removal = regis_func;
    is_file_removed = check_func;
    lazy_file_deletion_enabled = enable;
}
245
_filemgr_get_temp_buf()246 static void * _filemgr_get_temp_buf()
247 {
248 struct list_elem *e;
249 struct temp_buf_item *item;
250
251 spin_lock(&temp_buf_lock);
252 e = list_pop_front(&temp_buf);
253 if (e) {
254 item = _get_entry(e, struct temp_buf_item, le);
255 } else {
256 void *addr;
257
258 malloc_align(addr, FDB_SECTOR_SIZE,
259 global_config.blocksize + sizeof(struct temp_buf_item));
260
261 item = (struct temp_buf_item *)((uint8_t *) addr + global_config.blocksize);
262 item->addr = addr;
263 }
264 spin_unlock(&temp_buf_lock);
265
266 return item->addr;
267 }
268
_filemgr_release_temp_buf(void *buf)269 static void _filemgr_release_temp_buf(void *buf)
270 {
271 struct temp_buf_item *item;
272
273 spin_lock(&temp_buf_lock);
274 item = (struct temp_buf_item*)((uint8_t *)buf + global_config.blocksize);
275 list_push_front(&temp_buf, &item->le);
276 spin_unlock(&temp_buf_lock);
277 }
278
_filemgr_shutdown_temp_buf()279 static void _filemgr_shutdown_temp_buf()
280 {
281 struct list_elem *e;
282 struct temp_buf_item *item;
283 size_t count=0;
284
285 spin_lock(&temp_buf_lock);
286 e = list_begin(&temp_buf);
287 while(e){
288 item = _get_entry(e, struct temp_buf_item, le);
289 e = list_remove(&temp_buf, e);
290 free_align(item->addr);
291 count++;
292 }
293 spin_unlock(&temp_buf_lock);
294 }
295
// Scan the file backwards, block by block, for the most recent valid DB
// header: a block whose trailing marker is BLK_MARKER_DBHEADER, whose magic
// matches FILEMGR_MAGIC, and whose stored CRC matches the recomputed one.
// On success the header payload, revnum and seqnum are cached on 'file' and
// FDB_RESULT_SUCCESS is returned. Trailing partial bytes and rejected
// blocks are discarded by rewinding file->pos / file->last_commit (crash
// recovery). If no valid header is found, the in-memory header is reset
// and the last failure status is returned.
static fdb_status _filemgr_read_header(struct filemgr *file,
                                       err_log_callback *log_callback)
{
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_magic_t magic;
    filemgr_header_len_t len;
    uint8_t *buf;
    uint32_t crc, crc_file;
    fdb_status status = FDB_RESULT_SUCCESS;

    // get temp buffer
    buf = (uint8_t *) _filemgr_get_temp_buf();

    if (atomic_get_uint64_t(&file->pos) > 0) {
        // Crash Recovery Test 1: unaligned last block write
        uint64_t remain = atomic_get_uint64_t(&file->pos) % file->blocksize;
        if (remain) {
            // Drop the trailing non-block-aligned bytes before scanning.
            atomic_sub_uint64_t(&file->pos, remain);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
            const char *msg = "Crash Detected: %" _F64 " non-block aligned bytes discarded "
                              "from a database file '%s'\n";
            DBG(msg, remain, file->filename);
            fdb_log(log_callback, FDB_RESULT_READ_FAIL /* Need to add a better error code*/,
                    msg, remain, file->filename);
        }

        size_t block_counter = 0;
        do {
            // Read the current candidate block (the last not-yet-rejected one).
            ssize_t rv = file->ops->pread(file->fd, buf, file->blocksize,
                                          atomic_get_uint64_t(&file->pos) - file->blocksize);
            if (rv != file->blocksize) {
                status = FDB_RESULT_READ_FAIL;
                const char *msg = "Unable to read a database file '%s' with "
                                  "blocksize %" _F64 "\n";
                DBG(msg, file->filename, file->blocksize);
                fdb_log(log_callback, status, msg, file->filename, file->blocksize);
                break;
            }
            ++block_counter;
            memcpy(marker, buf + file->blocksize - BLK_MARKER_SIZE,
                   BLK_MARKER_SIZE);

            if (marker[0] == BLK_MARKER_DBHEADER) {
                // possible need for byte conversions here
                memcpy(&magic,
                       buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
                       sizeof(magic));
                magic = _endian_decode(magic);

                if (magic == FILEMGR_MAGIC) {
                    memcpy(&len,
                           buf + file->blocksize - BLK_MARKER_SIZE -
                           sizeof(magic) - sizeof(len),
                           sizeof(len));
                    len = _endian_decode(len);

                    // CRC covers the header payload minus the stored CRC itself.
                    crc = chksum(buf, len - sizeof(crc));
                    memcpy(&crc_file, buf + len - sizeof(crc), sizeof(crc));
                    crc_file = _endian_decode(crc_file);
                    if (crc == crc_file) {
                        // Valid header: cache it on the filemgr and return.
                        file->header.data = (void *)malloc(len);

                        memcpy(file->header.data, buf, len);
                        memcpy(&file->header.revnum, buf + len,
                               sizeof(filemgr_header_revnum_t));
                        memcpy((void *) &file->header.seqnum,
                               buf + len + sizeof(filemgr_header_revnum_t),
                               sizeof(fdb_seqnum_t));
                        file->header.revnum =
                            _endian_decode(file->header.revnum);
                        file->header.seqnum =
                            _endian_decode(file->header.seqnum);
                        file->header.size = len;
                        atomic_store_uint64_t(&file->header.bid,
                            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1);
                        atomic_store_uint64_t(&file->header.dirty_idtree_root,
                                              BLK_NOT_FOUND);
                        atomic_store_uint64_t(&file->header.dirty_seqtree_root,
                                              BLK_NOT_FOUND);
                        memset(&file->header.stat, 0x0, sizeof(file->header.stat));

                        // release temp buffer
                        _filemgr_release_temp_buf(buf);

                        return FDB_RESULT_SUCCESS;
                    } else {
                        status = FDB_RESULT_CHECKSUM_ERROR;
                        const char *msg = "Crash Detected: CRC on disk %u != %u "
                                          "in a database file '%s'\n";
                        DBG(msg, crc_file, crc, file->filename);
                        fdb_log(log_callback, status, msg, crc_file, crc,
                                file->filename);
                    }
                } else {
                    status = FDB_RESULT_FILE_CORRUPTION;
                    const char *msg = "Crash Detected: Wrong Magic %" _F64 " != %" _F64
                                      " in a database file '%s'\n";
                    DBG(msg, magic, FILEMGR_MAGIC, file->filename);
                    fdb_log(log_callback, status, msg, magic, FILEMGR_MAGIC,
                            file->filename);
                }
            } else {
                status = FDB_RESULT_NO_DB_HEADERS;
                // Only log for the very last block; earlier non-header blocks
                // are expected while scanning backwards.
                if (block_counter == 1) {
                    const char *msg = "Crash Detected: Last Block not DBHEADER %0.01x "
                                      "in a database file '%s'\n";
                    DBG(msg, marker[0], file->filename);
                    fdb_log(log_callback, status, msg, marker[0], file->filename);
                }
            }

            // Reject this block and step back one block.
            atomic_sub_uint64_t(&file->pos, file->blocksize);
            atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));
        } while (atomic_get_uint64_t(&file->pos));
    }

    // No valid header found (or empty file): reset in-memory header state.
    // release temp buffer
    _filemgr_release_temp_buf(buf);

    file->header.size = 0;
    file->header.revnum = 0;
    file->header.seqnum = 0;
    file->header.data = NULL;
    atomic_store_uint64_t(&file->header.bid, 0);
    atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
    atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);
    memset(&file->header.stat, 0x0, sizeof(file->header.stat));
    return status;
}
425
filemgr_get_ref_count(struct filemgr *file)426 size_t filemgr_get_ref_count(struct filemgr *file)
427 {
428 size_t ret = 0;
429 spin_lock(&file->lock);
430 ret = file->ref_count;
431 spin_unlock(&file->lock);
432 return ret;
433 }
434
filemgr_get_bcache_used_space(void)435 uint64_t filemgr_get_bcache_used_space(void)
436 {
437 uint64_t bcache_free_space = 0;
438 if (global_config.ncacheblock) { // If buffer cache is indeed configured
439 bcache_free_space = bcache_get_num_free_blocks();
440 bcache_free_space = (global_config.ncacheblock - bcache_free_space)
441 * global_config.blocksize;
442 }
443 return bcache_free_space;
444 }
445
// Heap-allocated argument bundle handed to _filemgr_prefetch_thread();
// the thread frees it when it terminates.
struct filemgr_prefetch_args {
    struct filemgr *file;           // file whose blocks are prefetched
    uint64_t duration;              // time budget for prefetching, in seconds
    err_log_callback *log_callback; // sink for read-failure logging
    void *aux;                      // not referenced by the prefetch thread in this file
};
452
// Background prefetch thread body. Starting just below last_commit and
// walking toward the file start in FILEMGR_PREFETCH_UNIT chunks, it reads
// every block through filemgr_read() to warm the block cache. It stops on:
// abort signal, time budget expiry, low block-cache space, a read failure,
// or reaching the start of the file. On exit it marks the file's prefetch
// state idle and frees its own argument struct.
static void *_filemgr_prefetch_thread(void *voidargs)
{
    struct filemgr_prefetch_args *args = (struct filemgr_prefetch_args*)voidargs;
    uint8_t *buf = alca(uint8_t, args->file->blocksize); // stack scratch block
    uint64_t cur_pos = 0, i;
    uint64_t bcache_free_space;
    bid_t bid;
    bool terminate = false;
    struct timeval begin, cur, gap;

    spin_lock(&args->file->lock);
    cur_pos = atomic_get_uint64_t(&args->file->last_commit);
    spin_unlock(&args->file->lock);
    if (cur_pos < FILEMGR_PREFETCH_UNIT) {
        // File smaller than one prefetch unit: nothing to do.
        terminate = true;
    } else {
        cur_pos -= FILEMGR_PREFETCH_UNIT;
    }
    // read backwards from the end of the file, in the unit of FILEMGR_PREFETCH_UNIT
    gettimeofday(&begin, NULL);
    while (!terminate) {
        for (i = cur_pos;
             i < cur_pos + FILEMGR_PREFETCH_UNIT;
             i += args->file->blocksize) {

            gettimeofday(&cur, NULL);
            gap = _utime_gap(begin, cur);
            bcache_free_space = bcache_get_num_free_blocks();
            bcache_free_space *= args->file->blocksize;

            if (args->file->prefetch_status == FILEMGR_PREFETCH_ABORT ||
                gap.tv_sec >= (int64_t)args->duration ||
                bcache_free_space < FILEMGR_PREFETCH_UNIT) {
                // terminate thread when
                // 1. got abort signal
                // 2. time out
                // 3. not enough free space in block cache
                terminate = true;
                break;
            } else {
                bid = i / args->file->blocksize;
                if (filemgr_read(args->file, bid, buf, NULL, true)
                    != FDB_RESULT_SUCCESS) {
                    // 4. read failure
                    fdb_log(args->log_callback, FDB_RESULT_READ_FAIL,
                            "Prefetch thread failed to read a block with block id %" _F64
                            " from a database file '%s'", bid, args->file->filename);
                    terminate = true;
                    break;
                }
            }
        }

        // Step back one more prefetch unit for the next pass.
        if (cur_pos >= FILEMGR_PREFETCH_UNIT) {
            cur_pos -= FILEMGR_PREFETCH_UNIT;
        } else {
            // remaining space is less than FILEMGR_PREFETCH_UNIT
            terminate = true;
        }
    }

    args->file->prefetch_status = FILEMGR_PREFETCH_IDLE;
    free(args); // allocated by filemgr_prefetch()
    return NULL;
}
518
// prefetch the given DB file
// Spawns _filemgr_prefetch_thread to warm the block cache, but only when
// the file has committed data and the cache has at least
// FILEMGR_PREFETCH_UNIT bytes free. Ownership of the heap-allocated
// argument struct passes to the thread, which frees it on exit.
void filemgr_prefetch(struct filemgr *file,
                      struct filemgr_config *config,
                      err_log_callback *log_callback)
{
    uint64_t bcache_free_space;

    bcache_free_space = bcache_get_num_free_blocks();
    bcache_free_space *= file->blocksize;

    // block cache should have free space larger than FILEMGR_PREFETCH_UNIT
    spin_lock(&file->lock);
    if (atomic_get_uint64_t(&file->last_commit) > 0 &&
        bcache_free_space >= FILEMGR_PREFETCH_UNIT) {
        // invoke prefetch thread
        struct filemgr_prefetch_args *args;
        args = (struct filemgr_prefetch_args *)
               calloc(1, sizeof(struct filemgr_prefetch_args));
        args->file = file;
        args->duration = config->prefetch_duration;
        args->log_callback = log_callback;

        file->prefetch_status = FILEMGR_PREFETCH_RUNNING;
        thread_create(&file->prefetch_tid, _filemgr_prefetch_thread, args);
    }
    spin_unlock(&file->lock);
}
546
filemgr_does_file_exist(char *filename)547 fdb_status filemgr_does_file_exist(char *filename) {
548 struct filemgr_ops *ops = get_filemgr_ops();
549 int fd = ops->open(filename, O_RDONLY, 0444);
550 if (fd < 0) {
551 return (fdb_status) fd;
552 }
553 ops->close(fd);
554 return FDB_RESULT_SUCCESS;
555 }
556
// Open (or re-open) a database file and return a shared filemgr handle.
// Three paths, all under the global filemgr_openlock:
//   1. file already open           -> bump ref_count, return existing handle
//   2. file in hash but FILE_CLOSED-> reopen the fd in place (or, if the
//      file was deleted on disk, purge the stale entry and fall through)
//   3. not in hash                 -> build a new filemgr: open fd, read the
//      latest header, init locks/WAL/global txn, publish into the hash
// The result struct carries either a filemgr pointer with
// FDB_RESULT_SUCCESS, or a NULL pointer with an error status.
filemgr_open_result filemgr_open(char *filename, struct filemgr_ops *ops,
                                 struct filemgr_config *config,
                                 err_log_callback *log_callback)
{
    struct filemgr *file = NULL;
    struct filemgr query;
    struct hash_elem *e = NULL;
    bool create = config->options & FILEMGR_CREATE;
    int file_flag = 0x0;
    int fd = -1;
    fdb_status status;
    filemgr_open_result result = {NULL, FDB_RESULT_OPEN_FAIL};

    filemgr_init(config);

    // check whether file is already opened or not
    query.filename = filename;
    spin_lock(&filemgr_openlock);
    e = hash_find(&hash, &query.e);

    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        file->ref_count++;

        if (atomic_get_uint8_t(&file->status) == FILE_CLOSED) { // if file was closed before
            file_flag = O_RDWR;
            if (create) {
                file_flag |= O_CREAT;
            }
            // Refresh the cached config, but keep the global block/cache sizes.
            *file->config = *config;
            file->config->blocksize = global_config.blocksize;
            file->config->ncacheblock = global_config.ncacheblock;
            file_flag |= config->flag;
            file->fd = file->ops->open(file->filename, file_flag, 0666);
            if (file->fd < 0) {
                if (file->fd == FDB_RESULT_NO_SUCH_FILE) {
                    // A database file was manually deleted by the user.
                    // Clean up global hash table, WAL index, and buffer cache.
                    // Then, retry it with a create option below IFF it is not
                    // a read-only open attempt
                    struct hash_elem *ret;
                    spin_unlock(&file->lock);
                    ret = hash_remove(&hash, &file->e);
                    fdb_assert(ret, 0, 0);
                    filemgr_free_func(&file->e);
                    if (!create) {
                        _log_errno_str(ops, log_callback,
                                       FDB_RESULT_NO_SUCH_FILE, "OPEN", filename);
                        spin_unlock(&filemgr_openlock);
                        result.rv = FDB_RESULT_NO_SUCH_FILE;
                        return result;
                    }
                    // else: fall through to the fresh-open path below
                } else {
                    // Reopen failed for some other reason: undo the ref bump.
                    _log_errno_str(file->ops, log_callback,
                                   (fdb_status)file->fd, "OPEN", filename);
                    file->ref_count--;
                    spin_unlock(&file->lock);
                    spin_unlock(&filemgr_openlock);
                    result.rv = file->fd;
                    return result;
                }
            } else { // Reopening the closed file is succeed.
                atomic_store_uint8_t(&file->status, FILE_NORMAL);
                if (config->options & FILEMGR_SYNC) {
                    file->fflags |= FILEMGR_SYNC;
                } else {
                    file->fflags &= ~FILEMGR_SYNC;
                }
                spin_unlock(&file->lock);
                spin_unlock(&filemgr_openlock);
                result.file = file;
                result.rv = FDB_RESULT_SUCCESS;
                return result;
            }
        } else { // file is already opened.

            if (config->options & FILEMGR_SYNC) {
                file->fflags |= FILEMGR_SYNC;
            } else {
                file->fflags &= ~FILEMGR_SYNC;
            }

            spin_unlock(&file->lock);
            spin_unlock(&filemgr_openlock);
            result.file = file;
            result.rv = FDB_RESULT_SUCCESS;
            return result;
        }
    }

    // Fresh open: no (usable) entry in the hash table.
    file_flag = O_RDWR;
    if (create) {
        file_flag |= O_CREAT;
    }
    file_flag |= config->flag;
    fd = ops->open(filename, file_flag, 0666);
    if (fd < 0) {
        _log_errno_str(ops, log_callback, (fdb_status)fd, "OPEN", filename);
        spin_unlock(&filemgr_openlock);
        result.rv = fd;
        return result;
    }
    file = (struct filemgr*)calloc(1, sizeof(struct filemgr));
    file->filename_len = strlen(filename);
    file->filename = (char*)malloc(file->filename_len + 1);
    strcpy(file->filename, filename);

    file->ref_count = 1;

    file->wal = (struct wal *)calloc(1, sizeof(struct wal));
    file->wal->flag = 0;

    file->ops = ops;
    file->blocksize = global_config.blocksize;
    atomic_init_uint8_t(&file->status, FILE_NORMAL);
    file->config = (struct filemgr_config*)malloc(sizeof(struct filemgr_config));
    *file->config = *config;
    file->config->blocksize = global_config.blocksize;
    file->config->ncacheblock = global_config.ncacheblock;
    file->new_file = NULL;
    file->old_filename = NULL;
    file->fd = fd;

    // Determine the current file size (also validates seekability).
    cs_off_t offset = file->ops->goto_eof(file->fd);
    if (offset == FDB_RESULT_SEEK_FAIL) {
        _log_errno_str(file->ops, log_callback, FDB_RESULT_SEEK_FAIL, "SEEK_END", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = FDB_RESULT_SEEK_FAIL;
        return result;
    }
    atomic_init_uint64_t(&file->last_commit, offset);
    atomic_init_uint64_t(&file->pos, offset);
    atomic_init_uint32_t(&file->throttling_delay, 0);

    file->bcache = NULL;
    file->in_place_compaction = false;
    file->kv_header = NULL;
    file->prefetch_status = FILEMGR_PREFETCH_IDLE;

    atomic_init_uint64_t(&file->header.bid, 0);
    atomic_init_uint64_t(&file->header.dirty_idtree_root, 0);
    atomic_init_uint64_t(&file->header.dirty_seqtree_root, 0);
    _init_op_stats(&file->header.op_stat);
    // Load the most recent valid DB header (may rewind file->pos).
    status = _filemgr_read_header(file, log_callback);
    if (status != FDB_RESULT_SUCCESS) {
        _log_errno_str(file->ops, log_callback, status, "READ", filename);
        file->ops->close(file->fd);
        free(file->wal);
        free(file->filename);
        free(file->config);
        free(file);
        spin_unlock(&filemgr_openlock);
        result.rv = status;
        return result;
    }

    spin_init(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    // Per-block-range partial locking (plock) configuration.
    struct plock_ops pops;
    struct plock_config pconfig;

    pops.init_user = mutex_init_wrap;
    pops.lock_user = mutex_lock_wrap;
    pops.unlock_user = mutex_unlock_wrap;
    pops.destroy_user = mutex_destroy_wrap;
    pops.init_internal = spin_init_wrap;
    pops.lock_internal = spin_lock_wrap;
    pops.unlock_internal = spin_unlock_wrap;
    pops.destroy_internal = spin_destroy_wrap;
    pops.is_overlapped = _block_is_overlapped;

    memset(&pconfig, 0x0, sizeof(pconfig));
    pconfig.ops = &pops;
    pconfig.sizeof_lock_internal = sizeof(spin_t);
    pconfig.sizeof_lock_user = sizeof(mutex_t);
    pconfig.sizeof_range = sizeof(bid_t);
    pconfig.aux = NULL;
    plock_init(&file->plock, &pconfig);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_init(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_init(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_init(&file->writer_lock.mutex);
    file->writer_lock.locked = false;

    // initialize WAL
    if (!wal_is_initialized(file)) {
        wal_init(file, FDB_WAL_NBUCKET);
    }

    // init global transaction for the file
    file->global_txn.wrapper = (struct wal_txn_wrapper*)
                               malloc(sizeof(struct wal_txn_wrapper));
    file->global_txn.wrapper->txn = &file->global_txn;
    file->global_txn.handle = NULL;
    if (atomic_get_uint64_t(&file->pos)) {
        // last header block id = (file size / blocksize) - 1
        file->global_txn.prev_hdr_bid =
            (atomic_get_uint64_t(&file->pos) / file->blocksize) - 1;
    } else {
        file->global_txn.prev_hdr_bid = BLK_NOT_FOUND;
    }
    file->global_txn.items = (struct list *)malloc(sizeof(struct list));
    list_init(file->global_txn.items);
    file->global_txn.isolation = FDB_ISOLATION_READ_COMMITTED;
    wal_add_transaction(file, &file->global_txn);

    // Publish the new entry; visible to other openers from here on.
    hash_insert(&hash, &file->e);
    if (config->prefetch_duration > 0) {
        filemgr_prefetch(file, config, log_callback);
    }
    spin_unlock(&filemgr_openlock);

    if (config->options & FILEMGR_SYNC) {
        file->fflags |= FILEMGR_SYNC;
    } else {
        file->fflags &= ~FILEMGR_SYNC;
    }

    result.file = file;
    result.rv = FDB_RESULT_SUCCESS;
    return result;
}
796
filemgr_update_header(struct filemgr *file, void *buf, size_t len)797 uint64_t filemgr_update_header(struct filemgr *file, void *buf, size_t len)
798 {
799 uint64_t ret;
800
801 spin_lock(&file->lock);
802
803 if (file->header.data == NULL) {
804 file->header.data = (void *)malloc(len);
805 }else if (file->header.size < len){
806 file->header.data = (void *)realloc(file->header.data, len);
807 }
808 memcpy(file->header.data, buf, len);
809 file->header.size = len;
810 ++(file->header.revnum);
811 ret = file->header.revnum;
812
813 spin_unlock(&file->lock);
814
815 return ret;
816 }
817
filemgr_get_header_revnum(struct filemgr *file)818 filemgr_header_revnum_t filemgr_get_header_revnum(struct filemgr *file)
819 {
820 filemgr_header_revnum_t ret;
821 spin_lock(&file->lock);
822 ret = file->header.revnum;
823 spin_unlock(&file->lock);
824 return ret;
825 }
826
827 // 'filemgr_get_seqnum' & 'filemgr_set_seqnum' have to be protected by
828 // 'filemgr_mutex_lock' & 'filemgr_mutex_unlock'.
filemgr_get_seqnum(struct filemgr *file)829 fdb_seqnum_t filemgr_get_seqnum(struct filemgr *file)
830 {
831 return file->header.seqnum;
832 }
833
// Store the default KV store's sequence number; caller must hold the
// filemgr mutex (see the note on filemgr_get_seqnum).
void filemgr_set_seqnum(struct filemgr *file, fdb_seqnum_t seqnum)
{
    file->header.seqnum = seqnum;
}
838
// Copy the cached DB header into 'buf' (allocating one if buf is NULL and
// a header exists) and report its size, block id, seqnum and revnum through
// the optional out-pointers. Returns 'buf' (possibly newly allocated;
// caller owns it in that case). All reads happen under the file lock.
void* filemgr_get_header(struct filemgr *file, void *buf, size_t *len,
                         bid_t *header_bid, fdb_seqnum_t *seqnum,
                         filemgr_header_revnum_t *header_revnum)
{
    spin_lock(&file->lock);

    size_t hdr_size = file->header.size;
    if (hdr_size > 0) {
        if (buf == NULL) {
            // No caller buffer: hand back a malloc'd copy.
            buf = (void*)malloc(hdr_size);
        }
        memcpy(buf, file->header.data, hdr_size);
    }

    if (len) {
        *len = hdr_size;
    }
    if (header_bid) {
        *header_bid = (hdr_size > 0)
                      ? atomic_get_uint64_t(&file->header.bid)
                      : BLK_NOT_FOUND;
    }
    if (seqnum) {
        *seqnum = file->header.seqnum;
    }
    if (header_revnum) {
        *header_revnum = file->header.revnum;
    }

    spin_unlock(&file->lock);

    return buf;
}
870
// Read the DB header stored in block 'bid' into caller buffer 'buf'
// (must hold at least hdr_len bytes). Validates the trailing block marker
// and magic before copying; on success fills *len and the optional
// *seqnum / *header_revnum out-params. A bid of 0 / BLK_NOT_FOUND means
// "no header available": *len is set to 0 and SUCCESS is returned.
fdb_status filemgr_fetch_header(struct filemgr *file, uint64_t bid,
                                void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                filemgr_header_revnum_t *header_revnum,
                                err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    fdb_status status = FDB_RESULT_SUCCESS;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return FDB_RESULT_SUCCESS;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    status = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);

    if (status != FDB_RESULT_SUCCESS) {
        fdb_log(log_callback, status,
                "Failed to read a database header with block id %" _F64 " in "
                "a database file '%s'", bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return status;
    }
    // The last BLK_MARKER_SIZE bytes of a block identify its kind.
    memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
           BLK_MARKER_SIZE);

    if (marker[0] != BLK_MARKER_DBHEADER) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block marker of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Magic number sits just before the marker.
    memcpy(&magic,
           _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
           sizeof(magic));
    magic = _endian_decode(magic);
    if (magic != FILEMGR_MAGIC) {
        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                "A block magic value of the database header block id %" _F64 " in "
                "a database file '%s' does NOT match FILEMGR_MAGIC!",
                bid, file->filename);
        _filemgr_release_temp_buf(_buf);
        return FDB_RESULT_READ_FAIL;
    }
    // Header length sits just before the magic.
    memcpy(&hdr_len,
           _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
           sizeof(hdr_len), sizeof(hdr_len));
    hdr_len = _endian_decode(hdr_len);

    memcpy(buf, _buf, hdr_len);
    *len = hdr_len;

    if (header_revnum) {
        // copy the DB header revnum (stored right after the payload)
        filemgr_header_revnum_t _revnum;
        memcpy(&_revnum, _buf + hdr_len, sizeof(_revnum));
        *header_revnum = _endian_decode(_revnum);
    }
    if (seqnum) {
        // copy default KVS's seqnum (stored after the revnum)
        fdb_seqnum_t _seqnum;
        memcpy(&_seqnum, _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(_seqnum));
        *seqnum = _endian_decode(_seqnum);
    }

    _filemgr_release_temp_buf(_buf);

    return status;
}
946
// Starting from header block 'bid', follow the on-disk previous-header
// link backwards to the prior DB header. If 'bid' is not itself a header
// block (this happens between fdb_set() and fdb_commit()), the walk is
// restarted from the latest known header bid. On success the header is
// copied into 'buf' (when non-NULL), *len and *seqnum are set, and the
// previous header's bid is returned. On any failure *len is set to 0 and
// the last bid examined is returned.
uint64_t filemgr_fetch_prev_header(struct filemgr *file, uint64_t bid,
                                   void *buf, size_t *len, fdb_seqnum_t *seqnum,
                                   err_log_callback *log_callback)
{
    uint8_t *_buf;
    uint8_t marker[BLK_MARKER_SIZE];
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    filemgr_header_len_t hdr_len;
    filemgr_magic_t magic;
    bid_t _prev_bid, prev_bid;
    int found = 0;

    if (!bid || bid == BLK_NOT_FOUND) {
        *len = 0; // No other header available
        return bid;
    }
    _buf = (uint8_t *)_filemgr_get_temp_buf();

    // Reverse scan the file for a previous DB header
    do {
        // Get prev_bid from the current header.
        // Since the current header is already cached during the previous
        // operation, no disk I/O will be triggered.
        if (filemgr_read(file, (bid_t)bid, _buf, log_callback, true)
            != FDB_RESULT_SUCCESS) {
            break;
        }

        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);

        if (marker[0] != BLK_MARKER_DBHEADER ||
            magic != FILEMGR_MAGIC) {
            // not a header block
            // this happens when this function is invoked between
            // fdb_set() call and fdb_commit() call, so the last block
            // in the file is not a header block
            bid_t latest_hdr = filemgr_get_header_bid(file);
            if (latest_hdr != BLK_NOT_FOUND && bid > latest_hdr) {
                // get the latest header BID
                bid = latest_hdr;
            } else {
                break;
            }
        } else {
            // Decode the previous-header bid stored inside this header.
            memcpy(&_prev_bid,
                   _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
                   sizeof(hdr_len) - sizeof(_prev_bid),
                   sizeof(_prev_bid));
            prev_bid = _endian_decode(_prev_bid);
            if (bid <= prev_bid) {
                // no more prev header, or broken linked list
                break;
            }
            bid = prev_bid;
        }

        // Read the prev header
        fdb_status fs = filemgr_read(file, (bid_t)bid, _buf, log_callback, true);
        if (fs != FDB_RESULT_SUCCESS) {
            fdb_log(log_callback, fs,
                    "Failed to read a previous database header with block id %" _F64 " in "
                    "a database file '%s'", bid, file->filename);
            break;
        }

        // Validate marker and magic of the target header block.
        memcpy(marker, _buf + file->blocksize - BLK_MARKER_SIZE,
               BLK_MARKER_SIZE);
        if (marker[0] != BLK_MARKER_DBHEADER) {
            if (bid) {
                // broken linked list
                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                        "A block marker of the previous database header block id %"
                        _F64 " in "
                        "a database file '%s' does NOT match BLK_MARKER_DBHEADER!",
                        bid, file->filename);
            }
            break;
        }

        memcpy(&magic,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic),
               sizeof(magic));
        magic = _endian_decode(magic);
        if (magic != FILEMGR_MAGIC) {
            // broken linked list
            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
                    "A block magic value of the previous database header block id %" _F64 " in "
                    "a database file '%s' does NOT match FILEMGR_MAGIC!",
                    bid, file->filename);
            break;
        }

        memcpy(&hdr_len,
               _buf + file->blocksize - BLK_MARKER_SIZE - sizeof(magic) -
               sizeof(hdr_len), sizeof(hdr_len));
        hdr_len = _endian_decode(hdr_len);

        // Copy out the header payload and its trailing revnum/seqnum.
        if (buf) {
            memcpy(buf, _buf, hdr_len);
        }
        memcpy(&_revnum, _buf + hdr_len,
               sizeof(filemgr_header_revnum_t));
        memcpy(&_seqnum,
               _buf + hdr_len + sizeof(filemgr_header_revnum_t),
               sizeof(fdb_seqnum_t));
        *seqnum = _endian_decode(_seqnum);
        *len = hdr_len;
        found = 1;
        break;
    } while (false); // no repetition

    if (!found) { // no other header found till end of file
        *len = 0;
    }

    _filemgr_release_temp_buf(_buf);

    return bid;
}
1072
// Close one reference to 'file'. When the last reference drops, the file
// descriptor is closed and, depending on file status, the filemgr entry is
// either kept cached (FILE_CLOSED), freed immediately, or handed to the
// background removal path (FILE_REMOVED_PENDING).
//
// 'cleanup_cache_onclose' forces full cleanup of the global hash entry, WAL,
// and buffer cache on last close. 'orig_file_name' (may be NULL) is the
// pre-compaction name used for in-place-compaction renaming.
fdb_status filemgr_close(struct filemgr *file, bool cleanup_cache_onclose,
                         const char *orig_file_name,
                         err_log_callback *log_callback)
{
    // Holds the result of file->ops->close(); returned to the caller.
    int rv = FDB_RESULT_SUCCESS;

    spin_lock(&filemgr_openlock); // Grab the filemgr lock to avoid the race with
                                  // filemgr_open() because file->lock won't
                                  // prevent the race condition.

    // remove filemgr structure if no thread refers to the file
    spin_lock(&file->lock);
    if (--(file->ref_count) == 0) {
        if (global_config.ncacheblock > 0 &&
            atomic_get_uint8_t(&file->status) != FILE_REMOVED_PENDING) {
            spin_unlock(&file->lock);
            // discard all dirty blocks belonged to this file
            bcache_remove_dirty_blocks(file);
        } else {
            // If the file is in pending removal (i.e., FILE_REMOVED_PENDING),
            // then its dirty block entries will be cleaned up in either
            // filemgr_free_func() or register_file_removal() below.
            spin_unlock(&file->lock);
        }

        if (wal_is_initialized(file)) {
            wal_close(file);
        }

        spin_lock(&file->lock);
        rv = file->ops->close(file->fd);
        if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING) {
            _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

            bool foreground_deletion = false;

            // immediately remove file if background remove function is not set
            if (!lazy_file_deletion_enabled ||
                (file->new_file && file->new_file->in_place_compaction)) {
                // TODO: to avoid the scenario below, we prevent background
                //       deletion of in-place compacted files at this time.
                // 1) In-place compacted from 'A' to 'A.1'.
                // 2) Request to delete 'A'.
                // 3) Close 'A.1'; since 'A' is not deleted yet, 'A.1' is not renamed.
                // 4) User opens DB file using its original name 'A', not 'A.1'.
                // 5) Old file 'A' is opened, and then background thread deletes 'A'.
                // 6) Crash!
                remove(file->filename);
                foreground_deletion = true;
            }

            // we can release lock because no one will open this file
            spin_unlock(&file->lock);
            struct hash_elem *ret = hash_remove(&hash, &file->e);
            fdb_assert(ret, 0, 0);
            spin_unlock(&filemgr_openlock);

            if (foreground_deletion) {
                filemgr_free_func(&file->e);
            } else {
                register_file_removal(file, log_callback);
            }
            return (fdb_status) rv;
        } else {
            if (cleanup_cache_onclose) {
                _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);
                if (file->in_place_compaction && orig_file_name) {
                    struct hash_elem *elem = NULL;
                    struct filemgr query;
                    uint32_t old_file_refcount = 0;

                    query.filename = (char *)orig_file_name;
                    elem = hash_find(&hash, &query.e);

                    if (file->old_filename) {
                        struct hash_elem *elem_old = NULL;
                        struct filemgr query_old;
                        struct filemgr *old_file = NULL;

                        // get old file's ref count if exists
                        query_old.filename = file->old_filename;
                        elem_old = hash_find(&hash, &query_old.e);
                        if (elem_old) {
                            old_file = _get_entry(elem_old, struct filemgr, e);
                            old_file_refcount = old_file->ref_count;
                        }
                    }

                    // If old file is opened by other handle, renaming should be
                    // postponed. It will be renamed later by the handle referring
                    // to the old file.
                    if (!elem && old_file_refcount == 0 &&
                        is_file_removed(orig_file_name)) {
                        // If background file removal is not done yet, we postpone
                        // file renaming at this time.
                        if (rename(file->filename, orig_file_name) < 0) {
                            // Note that the renaming failure is not a critical
                            // issue because the last compacted file will be automatically
                            // identified and opened in the next fdb_open call.
                            _log_errno_str(file->ops, log_callback, FDB_RESULT_FILE_RENAME_FAIL,
                                           "CLOSE", file->filename);
                        }
                    }
                }
                spin_unlock(&file->lock);
                // Clean up global hash table, WAL index, and buffer cache.
                struct hash_elem *ret = hash_remove(&hash, &file->e);
                fdb_assert(ret, file, 0);
                spin_unlock(&filemgr_openlock);
                filemgr_free_func(&file->e);
                return (fdb_status) rv;
            } else {
                // Keep the filemgr entry cached for future re-open.
                atomic_store_uint8_t(&file->status, FILE_CLOSED);
            }
        }
    }

    _log_errno_str(file->ops, log_callback, (fdb_status)rv, "CLOSE", file->filename);

    spin_unlock(&file->lock);
    spin_unlock(&filemgr_openlock);
    return (fdb_status) rv;
}
1196
filemgr_remove_all_buffer_blocks(struct filemgr *file)1197 void filemgr_remove_all_buffer_blocks(struct filemgr *file)
1198 {
1199 // remove all cached blocks
1200 if (global_config.ncacheblock > 0 && file->bcache) {
1201 bcache_remove_dirty_blocks(file);
1202 bcache_remove_clean_blocks(file);
1203 bcache_remove_file(file);
1204 file->bcache = NULL;
1205 }
1206 }
1207
// Final teardown of a filemgr instance. Stops any running prefetch thread,
// purges its blocks from the cache, destroys the WAL, and frees all memory
// and synchronization primitives, including the structure itself.
// Called after the file has been removed from the global hash table.
void filemgr_free_func(struct hash_elem *h)
{
    struct filemgr *file = _get_entry(h, struct filemgr, e);

    spin_lock(&file->lock);
    if (file->prefetch_status == FILEMGR_PREFETCH_RUNNING) {
        // prefetch thread is running
        void *ret;
        // signal the thread to stop, then wait for it outside the lock
        file->prefetch_status = FILEMGR_PREFETCH_ABORT;
        spin_unlock(&file->lock);
        // wait
        thread_join(file->prefetch_tid, &ret);
    } else {
        spin_unlock(&file->lock);
    }

    // remove all cached blocks
    if (global_config.ncacheblock > 0 && file->bcache) {
        bcache_remove_dirty_blocks(file);
        bcache_remove_clean_blocks(file);
        bcache_remove_file(file);
        file->bcache = NULL;
    }

    if (file->kv_header) {
        // multi KV intance mode & KV header exists
        file->free_kv_header(file);
    }

    // free global transaction
    wal_remove_transaction(file, &file->global_txn);
    free(file->global_txn.items);
    free(file->global_txn.wrapper);

    // destroy WAL
    if (wal_is_initialized(file)) {
        wal_shutdown(file);
        size_t i = 0;
        size_t num_all_shards = wal_get_num_all_shards(file);
        // Free all WAL shards (including compactor's shard)
        for (; i < num_all_shards; ++i) {
            hash_free(&file->wal->key_shards[i].hash_bykey);
            spin_destroy(&file->wal->key_shards[i].lock);
            hash_free(&file->wal->seq_shards[i].hash_byseq);
            spin_destroy(&file->wal->seq_shards[i].lock);
        }
        spin_destroy(&file->wal->lock);
        atomic_destroy_uint32_t(&file->wal->size);
        atomic_destroy_uint32_t(&file->wal->num_flushable);
        atomic_destroy_uint64_t(&file->wal->datasize);
        free(file->wal->key_shards);
        free(file->wal->seq_shards);
    }
    free(file->wal);

    // free filename and header
    free(file->filename);
    if (file->header.data) free(file->header.data);
    // free old filename if any
    free(file->old_filename);

    // destroy locks
    spin_destroy(&file->lock);

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
    plock_destroy(&file->plock);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        mutex_destroy(&file->data_mutex[i]);
    }
#else
    int i;
    for (i=0;i<DLOCK_MAX;++i) {
        spin_destroy(&file->data_spinlock[i]);
    }
#endif //__FILEMGR_DATA_PARTIAL_LOCK

    mutex_destroy(&file->writer_lock.mutex);

    atomic_destroy_uint64_t(&file->pos);
    atomic_destroy_uint64_t(&file->last_commit);
    atomic_destroy_uint32_t(&file->throttling_delay);

    // free file structure
    free(file->config);
    free(file);
}
1296
1297 // permanently remove file from cache (not just close)
1298 // LCOV_EXCL_START
filemgr_remove_file(struct filemgr *file)1299 void filemgr_remove_file(struct filemgr *file)
1300 {
1301 struct hash_elem *ret;
1302
1303 fdb_assert(file, file, NULL);
1304 fdb_assert(file->ref_count <= 0, file->ref_count, 0);
1305
1306 // remove from global hash table
1307 spin_lock(&filemgr_openlock);
1308 ret = hash_remove(&hash, &file->e);
1309 fdb_assert(ret, ret, NULL);
1310 spin_unlock(&filemgr_openlock);
1311
1312 if (!lazy_file_deletion_enabled ||
1313 (file->new_file && file->new_file->in_place_compaction)) {
1314 filemgr_free_func(&file->e);
1315 } else {
1316 register_file_removal(file, NULL);
1317 }
1318 }
1319 // LCOV_EXCL_STOP
1320
1321 static
_filemgr_is_closed(struct hash_elem *h, void *ctx)1322 void *_filemgr_is_closed(struct hash_elem *h, void *ctx) {
1323 struct filemgr *file = _get_entry(h, struct filemgr, e);
1324 void *ret;
1325 spin_lock(&file->lock);
1326 if (file->ref_count != 0) {
1327 ret = (void *)file;
1328 } else {
1329 ret = NULL;
1330 }
1331 spin_unlock(&file->lock);
1332 return ret;
1333 }
1334
// Shut down the filemgr module: frees all cached (closed) files, the block
// cache, and temp buffers. Fails with FDB_RESULT_FILE_IS_BUSY if any file is
// still open. On Windows (no SPIN_INITIALIZER), 'initial_lock_status' acts as
// a tri-state guard (0=uninit, 1=shutting down/init, 2=ready) so the spin
// lock itself can be destroyed safely.
fdb_status filemgr_shutdown()
{
    fdb_status ret = FDB_RESULT_SUCCESS;
    void *open_file;
    if (filemgr_initialized) {

#ifndef SPIN_INITIALIZER
        // Windows: check if spin lock is already destroyed.
        if (InterlockedCompareExchange(&initial_lock_status, 1, 2) == 2) {
            spin_lock(&initial_lock);
        } else {
            // filemgr is already shut down
            return ret;
        }
#else
        spin_lock(&initial_lock);
#endif

        // Re-check under the lock: another thread may have completed
        // shutdown between the flag test above and acquiring the lock.
        if (!filemgr_initialized) {
            // filemgr is already shut down
#ifdef SPIN_INITIALIZER
            spin_unlock(&initial_lock);
#endif
            return ret;
        }

        // Scan for any file that still has a non-zero ref count.
        open_file = hash_scan(&hash, _filemgr_is_closed, NULL);
        if (!open_file) {
            hash_free_active(&hash, filemgr_free_func);
            if (global_config.ncacheblock > 0) {
                bcache_shutdown();
            }
            filemgr_initialized = 0;
#ifndef SPIN_INITIALIZER
            initial_lock_status = 0;
#else
            initial_lock = SPIN_INITIALIZER;
#endif
            _filemgr_shutdown_temp_buf();
            spin_unlock(&initial_lock);
#ifndef SPIN_INITIALIZER
            spin_destroy(&initial_lock);
#endif
        } else {
            // at least one file is still open; abort the shutdown
            spin_unlock(&initial_lock);
            ret = FDB_RESULT_FILE_IS_BUSY;
        }
    }
    return ret;
}
1385
filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)1386 bid_t filemgr_alloc(struct filemgr *file, err_log_callback *log_callback)
1387 {
1388 spin_lock(&file->lock);
1389 bid_t bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1390 atomic_add_uint64_t(&file->pos, file->blocksize);
1391
1392 if (global_config.ncacheblock <= 0) {
1393 // if block cache is turned off, write the allocated block before use
1394 uint8_t _buf = 0x0;
1395 ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1396 atomic_get_uint64_t(&file->pos) - 1);
1397 _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1398 }
1399 spin_unlock(&file->lock);
1400
1401 return bid;
1402 }
1403
filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1404 void filemgr_alloc_multiple(struct filemgr *file, int nblock, bid_t *begin,
1405 bid_t *end, err_log_callback *log_callback)
1406 {
1407 spin_lock(&file->lock);
1408 *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1409 *end = *begin + nblock - 1;
1410 atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1411
1412 if (global_config.ncacheblock <= 0) {
1413 // if block cache is turned off, write the allocated block before use
1414 uint8_t _buf = 0x0;
1415 ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1416 atomic_get_uint64_t(&file->pos) - 1);
1417 _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1418 }
1419 spin_unlock(&file->lock);
1420 }
1421
1422 // atomically allocate NBLOCK blocks only when current file position is same to nextbid
filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock, bid_t *begin, bid_t *end, err_log_callback *log_callback)1423 bid_t filemgr_alloc_multiple_cond(struct filemgr *file, bid_t nextbid, int nblock,
1424 bid_t *begin, bid_t *end,
1425 err_log_callback *log_callback)
1426 {
1427 bid_t bid;
1428 spin_lock(&file->lock);
1429 bid = atomic_get_uint64_t(&file->pos) / file->blocksize;
1430 if (bid == nextbid) {
1431 *begin = atomic_get_uint64_t(&file->pos) / file->blocksize;
1432 *end = *begin + nblock - 1;
1433 atomic_add_uint64_t(&file->pos, file->blocksize * nblock);
1434
1435 if (global_config.ncacheblock <= 0) {
1436 // if block cache is turned off, write the allocated block before use
1437 uint8_t _buf = 0x0;
1438 ssize_t rv = file->ops->pwrite(file->fd, &_buf, 1,
1439 atomic_get_uint64_t(&file->pos));
1440 _log_errno_str(file->ops, log_callback, (fdb_status) rv, "WRITE", file->filename);
1441 }
1442 }else{
1443 *begin = BLK_NOT_FOUND;
1444 *end = BLK_NOT_FOUND;
1445 }
1446 spin_unlock(&file->lock);
1447 return bid;
1448 }
1449
1450 #ifdef __CRC32
_filemgr_crc32_check(struct filemgr *file, void *buf)1451 INLINE fdb_status _filemgr_crc32_check(struct filemgr *file, void *buf)
1452 {
1453 if ( *((uint8_t*)buf + file->blocksize-1) == BLK_MARKER_BNODE ) {
1454 uint32_t crc_file, crc;
1455 memcpy(&crc_file, (uint8_t *) buf + BTREE_CRC_OFFSET, sizeof(crc_file));
1456 crc_file = _endian_decode(crc_file);
1457 memset((uint8_t *) buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
1458 crc = chksum(buf, file->blocksize);
1459 if (crc != crc_file) {
1460 return FDB_RESULT_CHECKSUM_ERROR;
1461 }
1462 }
1463 return FDB_RESULT_SUCCESS;
1464 }
1465 #endif
1466
filemgr_invalidate_block(struct filemgr *file, bid_t bid)1467 void filemgr_invalidate_block(struct filemgr *file, bid_t bid)
1468 {
1469 if (global_config.ncacheblock > 0) {
1470 bcache_invalidate_block(file, bid);
1471 }
1472 }
1473
// Read block 'bid' into 'buf' (file->blocksize bytes).
//
// With the block cache enabled, the cache is consulted first; on a miss the
// block is read from disk, CRC-checked (if __CRC32), and inserted into the
// cache as clean. Writable (uncommitted) blocks are protected with a
// per-block data lock while being read, since a writer may still mutate
// them; committed blocks are immutable and need no lock.
//
// 'read_on_cache_miss' == false turns a cache miss into FDB_RESULT_READ_FAIL
// instead of touching the disk.
fdb_status filemgr_read(struct filemgr *file, bid_t bid, void *buf,
                        err_log_callback *log_callback,
                        bool read_on_cache_miss)
{
    size_t lock_no;
    ssize_t r;
    uint64_t pos = bid * file->blocksize;
    fdb_status status = FDB_RESULT_SUCCESS;
    uint64_t curr_pos = atomic_get_uint64_t(&file->pos);
    // reading past the logical end of file is a programming error
    fdb_assert(pos < curr_pos, pos, curr_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry = NULL;
        bid_t is_writer = 0;
#endif
        bool locked = false;
        // Note: we don't need to grab lock for committed blocks
        // because they are immutable so that no writer will interfere and
        // overwrite dirty data
        if (filemgr_is_writable(file, bid)) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_lock(&file->data_mutex[lock_no]);
#else
            spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
            locked = true;
        }

        r = bcache_read(file, bid, buf);
        if (r == 0) {
            // cache miss
            if (!read_on_cache_miss) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return FDB_RESULT_READ_FAIL;
            }

            // if normal file, just read a block
            r = file->ops->pread(file->fd, buf, file->blocksize, pos);
            if (r != file->blocksize) {
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "READ", file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return (fdb_status)r;
            }
#ifdef __CRC32
            status = _filemgr_crc32_check(file, buf);
            if (status != FDB_RESULT_SUCCESS) {
                _log_errno_str(file->ops, log_callback, status, "READ",
                               file->filename);
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                return status;
            }
#endif
            // populate the cache with the freshly read (clean) block
            r = bcache_write(file, bid, buf, BCACHE_REQ_CLEAN);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        }
        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }
    } else {
        // block cache disabled: always read straight from disk
        if (!read_on_cache_miss) {
            return FDB_RESULT_READ_FAIL;
        }

        r = file->ops->pread(file->fd, buf, file->blocksize, pos);
        if (r != file->blocksize) {
            _log_errno_str(file->ops, log_callback, (fdb_status) r, "READ",
                           file->filename);
            return (fdb_status)r;
        }

#ifdef __CRC32
        status = _filemgr_crc32_check(file, buf);
        if (status != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, status, "READ",
                           file->filename);
            return status;
        }
#endif
    }
    return status;
}
1605
// Write 'len' bytes from 'buf' into block 'bid' at byte 'offset' within the
// block ('offset' + 'len' must not exceed the block size).
//
// With the block cache enabled, the write lands in the cache as a dirty
// block. A partial write that misses the cache first reads the existing
// block contents from disk (unless the block is beyond EOF), merges, and
// inserts the merged block as dirty. Without the cache, data is written
// directly via pwrite; a full-block write of a bnode gets its CRC32 stamped
// in-place first (if __CRC32).
fdb_status filemgr_write_offset(struct filemgr *file, bid_t bid,
                                uint64_t offset, uint64_t len, void *buf,
                                err_log_callback *log_callback)
{
    fdb_assert(offset + len <= file->blocksize, offset + len, file);

    size_t lock_no;
    ssize_t r = 0;
    uint64_t pos = bid * file->blocksize + offset;
    uint64_t curr_commit_pos = atomic_get_uint64_t(&file->last_commit);
    // writes must never touch already-committed (immutable) regions
    fdb_assert(pos >= curr_commit_pos, pos, curr_commit_pos);

    if (global_config.ncacheblock > 0) {
        lock_no = bid % DLOCK_MAX;
        (void)lock_no;

        bool locked = false;
        // writers always take the per-block data lock
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
        plock_entry_t *plock_entry;
        bid_t is_writer = 1;
        plock_entry = plock_lock(&file->plock, &bid, &is_writer);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
        mutex_lock(&file->data_mutex[lock_no]);
#else
        spin_lock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        locked = true;

        if (len == file->blocksize) {
            // write entire block .. we don't need to read previous block
            r = bcache_write(file, bid, buf, BCACHE_REQ_DIRTY);
            if (r != global_config.blocksize) {
                if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                    plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                    mutex_unlock(&file->data_mutex[lock_no]);
#else
                    spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                }
                _log_errno_str(file->ops, log_callback,
                               (fdb_status) r, "WRITE", file->filename);
                return FDB_RESULT_WRITE_FAIL;
            }
        } else {
            // partially write buffer cache first
            r = bcache_write_partial(file, bid, buf, offset, len);
            if (r == 0) {
                // cache miss
                // write partially .. we have to read previous contents of the block
                uint64_t cur_file_pos = file->ops->goto_eof(file->fd);
                bid_t cur_file_last_bid = cur_file_pos / file->blocksize;
                void *_buf = _filemgr_get_temp_buf();

                if (bid >= cur_file_last_bid) {
                    // this is the first time to write this block
                    // we don't need to read previous block from file.
                } else {
                    r = file->ops->pread(file->fd, _buf, file->blocksize,
                                         bid * file->blocksize);
                    if (r != file->blocksize) {
                        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                            mutex_unlock(&file->data_mutex[lock_no]);
#else
                            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                        }
                        _filemgr_release_temp_buf(_buf);
                        _log_errno_str(file->ops, log_callback, (fdb_status) r,
                                       "READ", file->filename);
                        return FDB_RESULT_READ_FAIL;
                    }
                }
                // merge the new bytes into the (possibly stale) block image
                memcpy((uint8_t *)_buf + offset, buf, len);
                r = bcache_write(file, bid, _buf, BCACHE_REQ_DIRTY);
                if (r != global_config.blocksize) {
                    if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
                        plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
                        mutex_unlock(&file->data_mutex[lock_no]);
#else
                        spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
                    }
                    _filemgr_release_temp_buf(_buf);
                    _log_errno_str(file->ops, log_callback,
                                   (fdb_status) r, "WRITE", file->filename);
                    return FDB_RESULT_WRITE_FAIL;
                }

                _filemgr_release_temp_buf(_buf);
            } // cache miss
        } // full block or partial block

        if (locked) {
#ifdef __FILEMGR_DATA_PARTIAL_LOCK
            plock_unlock(&file->plock, plock_entry);
#elif defined(__FILEMGR_DATA_MUTEX_LOCK)
            mutex_unlock(&file->data_mutex[lock_no]);
#else
            spin_unlock(&file->data_spinlock[lock_no]);
#endif //__FILEMGR_DATA_PARTIAL_LOCK
        }

    } else { // block cache disabled

#ifdef __CRC32
        // full-block bnode write: stamp CRC32 into the block before writing
        if (len == file->blocksize) {
            uint8_t marker = *((uint8_t*)buf + file->blocksize - 1);
            if (marker == BLK_MARKER_BNODE) {
                memset((uint8_t *)buf + BTREE_CRC_OFFSET, 0xff, BTREE_CRC_FIELD_LEN);
                uint32_t crc32 = chksum(buf, file->blocksize);
                crc32 = _endian_encode(crc32);
                memcpy((uint8_t *)buf + BTREE_CRC_OFFSET, &crc32, sizeof(crc32));
            }
        }
#endif

        r = file->ops->pwrite(file->fd, buf, len, pos);
        _log_errno_str(file->ops, log_callback, (fdb_status) r, "WRITE", file->filename);
        if ((uint64_t)r != len) {
            return FDB_RESULT_WRITE_FAIL;
        }
    } // block cache check
    return FDB_RESULT_SUCCESS;
}
1737
filemgr_write(struct filemgr *file, bid_t bid, void *buf, err_log_callback *log_callback)1738 fdb_status filemgr_write(struct filemgr *file, bid_t bid, void *buf,
1739 err_log_callback *log_callback)
1740 {
1741 return filemgr_write_offset(file, bid, 0, file->blocksize, buf,
1742 log_callback);
1743 }
1744
// Commit the current in-memory DB header to disk: flush dirty cached blocks,
// append a header block at the end of the file (with the trailer layout
// sketched below), advance file->pos and file->last_commit, and fsync when
// the file was opened with FILEMGR_SYNC.
fdb_status filemgr_commit(struct filemgr *file,
                          err_log_callback *log_callback)
{
    uint16_t header_len = file->header.size;
    uint16_t _header_len;
    bid_t _prev_bid;
    fdb_seqnum_t _seqnum;
    filemgr_header_revnum_t _revnum;
    int result = FDB_RESULT_SUCCESS;
    filemgr_magic_t magic = FILEMGR_MAGIC;
    filemgr_magic_t _magic;

    // all dirty blocks must be on disk before the header that references them
    if (global_config.ncacheblock > 0) {
        result = bcache_flush(file);
        if (result != FDB_RESULT_SUCCESS) {
            _log_errno_str(file->ops, log_callback, (fdb_status) result,
                           "FLUSH", file->filename);
            return (fdb_status)result;
        }
    }

    spin_lock(&file->lock);

    if (file->header.size > 0 && file->header.data) {
        void *buf = _filemgr_get_temp_buf();
        uint8_t marker[BLK_MARKER_SIZE];

        // [header data]:        'header_len' bytes   <---+
        // [header revnum]:      8 bytes                  |
        // [default KVS seqnum]: 8 bytes                  |
        //  ...                                           |
        // (empty)                                    blocksize
        //  ...                                           |
        // [prev header bid]:    8 bytes                  |
        // [header length]:      2 bytes                  |
        // [magic number]:       8 bytes                  |
        // [block marker]:       1 byte               <---+

        // header data
        memcpy(buf, file->header.data, header_len);
        // header rev number
        _revnum = _endian_encode(file->header.revnum);
        memcpy((uint8_t *)buf + header_len, &_revnum,
               sizeof(filemgr_header_revnum_t));
        // file's sequence number (default KVS seqnum)
        _seqnum = _endian_encode(file->header.seqnum);
        memcpy((uint8_t *)buf + header_len + sizeof(filemgr_header_revnum_t),
               &_seqnum, sizeof(fdb_seqnum_t));

        // prev header bid
        _prev_bid = _endian_encode(atomic_get_uint64_t(&file->header.bid));
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - sizeof(_prev_bid) - BLK_MARKER_SIZE),
               &_prev_bid, sizeof(_prev_bid));
        // header length
        _header_len = _endian_encode(header_len);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - sizeof(header_len) - BLK_MARKER_SIZE),
               &_header_len, sizeof(header_len));
        // magic number
        _magic = _endian_encode(magic);
        memcpy((uint8_t *)buf + (file->blocksize - sizeof(filemgr_magic_t)
               - BLK_MARKER_SIZE), &_magic, sizeof(magic));

        // marker
        memset(marker, BLK_MARKER_DBHEADER, BLK_MARKER_SIZE);
        memcpy((uint8_t *)buf + file->blocksize - BLK_MARKER_SIZE,
               marker, BLK_MARKER_SIZE);

        // append the header block at the current end of file
        ssize_t rv = file->ops->pwrite(file->fd, buf, file->blocksize,
                                       atomic_get_uint64_t(&file->pos));
        _log_errno_str(file->ops, log_callback, (fdb_status) rv,
                       "WRITE", file->filename);
        if (rv != file->blocksize) {
            _filemgr_release_temp_buf(buf);
            spin_unlock(&file->lock);
            return FDB_RESULT_WRITE_FAIL;
        }
        // record the new header's BID and advance the end-of-file position
        atomic_store_uint64_t(&file->header.bid,
                              atomic_get_uint64_t(&file->pos) / file->blocksize);
        atomic_add_uint64_t(&file->pos, file->blocksize);

        // dirty tree roots are now persisted; reset the in-memory copies
        atomic_store_uint64_t(&file->header.dirty_idtree_root, BLK_NOT_FOUND);
        atomic_store_uint64_t(&file->header.dirty_seqtree_root, BLK_NOT_FOUND);

        _filemgr_release_temp_buf(buf);
    }
    // race condition?
    atomic_store_uint64_t(&file->last_commit, atomic_get_uint64_t(&file->pos));

    spin_unlock(&file->lock);

    if (file->fflags & FILEMGR_SYNC) {
        result = file->ops->fsync(file->fd);
        _log_errno_str(file->ops, log_callback, (fdb_status)result, "FSYNC", file->filename);
    }
    return (fdb_status) result;
}
1843
filemgr_sync(struct filemgr *file, err_log_callback *log_callback)1844 fdb_status filemgr_sync(struct filemgr *file, err_log_callback *log_callback)
1845 {
1846 fdb_status result = FDB_RESULT_SUCCESS;
1847 if (global_config.ncacheblock > 0) {
1848 result = bcache_flush(file);
1849 if (result != FDB_RESULT_SUCCESS) {
1850 _log_errno_str(file->ops, log_callback, (fdb_status) result,
1851 "FLUSH", file->filename);
1852 return result;
1853 }
1854 }
1855
1856 if (file->fflags & FILEMGR_SYNC) {
1857 int rv = file->ops->fsync(file->fd);
1858 _log_errno_str(file->ops, log_callback, (fdb_status)rv, "FSYNC", file->filename);
1859 return (fdb_status) rv;
1860 }
1861 return result;
1862 }
1863
filemgr_update_file_status(struct filemgr *file, file_status_t status, char *old_filename)1864 int filemgr_update_file_status(struct filemgr *file, file_status_t status,
1865 char *old_filename)
1866 {
1867 int ret = 1;
1868 spin_lock(&file->lock);
1869 atomic_store_uint8_t(&file->status, status);
1870 if (old_filename) {
1871 if (!file->old_filename) {
1872 file->old_filename = old_filename;
1873 } else {
1874 ret = 0;
1875 fdb_assert(file->ref_count, file->ref_count, 0);
1876 free(old_filename);
1877 }
1878 }
1879 spin_unlock(&file->lock);
1880 return ret;
1881 }
1882
filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file, file_status_t status)1883 void filemgr_set_compaction_state(struct filemgr *old_file, struct filemgr *new_file,
1884 file_status_t status)
1885 {
1886 spin_lock(&old_file->lock);
1887 old_file->new_file = new_file;
1888 atomic_store_uint8_t(&old_file->status, status);
1889 spin_unlock(&old_file->lock);
1890 }
1891
1892 // Check if there is a file that still points to the old_file that is being
1893 // compacted away. If so open the file and return its pointer.
1894 static
_filemgr_check_stale_link(struct hash_elem *h, void *ctx)1895 void *_filemgr_check_stale_link(struct hash_elem *h, void *ctx) {
1896 struct filemgr *cur_file = (struct filemgr *)ctx;
1897 struct filemgr *file = _get_entry(h, struct filemgr, e);
1898 spin_lock(&file->lock);
1899 if (atomic_get_uint8_t(&file->status) == FILE_REMOVED_PENDING &&
1900 file->new_file == cur_file) {
1901 // Incrementing reference counter below is the same as filemgr_open()
1902 // We need to do this to ensure that the pointer returned does not
1903 // get freed outside the filemgr_open lock
1904 file->ref_count++;
1905 spin_unlock(&file->lock);
1906 return (void *)file;
1907 }
1908 spin_unlock(&file->lock);
1909 return (void *)NULL;
1910 }
1911
filemgr_search_stale_links(struct filemgr *cur_file)1912 struct filemgr *filemgr_search_stale_links(struct filemgr *cur_file) {
1913 struct filemgr *very_old_file;
1914 spin_lock(&filemgr_openlock);
1915 very_old_file = (struct filemgr *)hash_scan(&hash,
1916 _filemgr_check_stale_link, cur_file);
1917 spin_unlock(&filemgr_openlock);
1918 return very_old_file;
1919 }
1920
filemgr_redirect_old_file(struct filemgr *very_old_file, struct filemgr *new_file, filemgr_redirect_hdr_func redirect_header_func)1921 char *filemgr_redirect_old_file(struct filemgr *very_old_file,
1922 struct filemgr *new_file,
1923 filemgr_redirect_hdr_func
1924 redirect_header_func) {
1925 size_t old_header_len, new_header_len;
1926 uint16_t new_filename_len;
1927 char *past_filename;
1928 spin_lock(&very_old_file->lock);
1929 fdb_assert(very_old_file->header.size, very_old_file->header.size, 0);
1930 fdb_assert(very_old_file->new_file, very_old_file->new_file, 0);
1931 old_header_len = very_old_file->header.size;
1932 new_filename_len = strlen(new_file->filename);
1933 // Find out the new DB header length with new_file's filename
1934 new_header_len = old_header_len - strlen(very_old_file->new_file->filename)
1935 + new_filename_len;
1936 // As we are going to change the new_filename field in the DB header of the
1937 // very_old_file, maybe reallocate DB header buf to accomodate bigger value
1938 if (new_header_len > old_header_len) {
1939 very_old_file->header.data = realloc(very_old_file->header.data,
1940 new_header_len);
1941 }
1942 very_old_file->new_file = new_file; // Re-direct very_old_file to new_file
1943 past_filename = redirect_header_func((uint8_t *)very_old_file->header.data,
1944 new_file->filename, new_filename_len + 1);//Update in-memory header
1945 very_old_file->header.size = new_header_len;
1946 ++(very_old_file->header.revnum);
1947
1948 spin_unlock(&very_old_file->lock);
1949 return past_filename;
1950 }
1951
filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)1952 void filemgr_remove_pending(struct filemgr *old_file, struct filemgr *new_file)
1953 {
1954 fdb_assert(new_file, new_file, old_file);
1955
1956 spin_lock(&old_file->lock);
1957 if (old_file->ref_count > 0) {
1958 // delay removing
1959 old_file->new_file = new_file;
1960 atomic_store_uint8_t(&old_file->status, FILE_REMOVED_PENDING);
1961 spin_unlock(&old_file->lock);
1962 } else {
1963 // immediatly remove
1964 // LCOV_EXCL_START
1965 spin_unlock(&old_file->lock);
1966
1967 if (!lazy_file_deletion_enabled ||
1968 (old_file->new_file && old_file->new_file->in_place_compaction)) {
1969 remove(old_file->filename);
1970 }
1971 filemgr_remove_file(old_file);
1972 // LCOV_EXCL_STOP
1973 }
1974 }
1975
1976 // migrate default kv store stats over to new_file
filemgr_migrate_op_stats(struct filemgr *old_file, struct filemgr *new_file, struct kvs_info *kvs)1977 struct kvs_ops_stat *filemgr_migrate_op_stats(struct filemgr *old_file,
1978 struct filemgr *new_file,
1979 struct kvs_info *kvs)
1980 {
1981 kvs_ops_stat *ret = NULL;
1982 fdb_assert(new_file, new_file, old_file);
1983
1984 spin_lock(&old_file->lock);
1985 new_file->header.op_stat = old_file->header.op_stat;
1986 ret = &new_file->header.op_stat;
1987 spin_unlock(&old_file->lock);
1988 return ret;
1989 }
1990
// Note: filemgr_openlock should be held before calling this function.
// Destroy 'filename' and, recursively, every older version it links to via
// the old_filename chain — whether those versions are currently open in
// memory or exist only on disk. 'destroy_file_set' carries the set of
// filenames already being destroyed higher up the recursion (pass NULL at the
// top level); it prevents infinite recursion on cyclic links.
// Returns FDB_RESULT_SUCCESS or the first error encountered.
fdb_status filemgr_destroy_file(char *filename,
                                struct filemgr_config *config,
                                struct hash *destroy_file_set)
{
    struct filemgr *file = NULL;
    struct hash to_destroy_files;
    // Top-level call owns a fresh local set; recursive calls reuse the
    // caller's set.
    struct hash *destroy_set = (destroy_file_set ? destroy_file_set :
                                                  &to_destroy_files);
    struct filemgr query;
    struct hash_elem *e = NULL;
    fdb_status status = FDB_RESULT_SUCCESS;
    char *old_filename = NULL;

    if (!destroy_file_set) { // top level or non-recursive call
        hash_init(destroy_set, NBUCKET, _file_hash, _file_cmp);
    }

    query.filename = filename;
    // check whether file is already being destroyed in parent recursive call
    e = hash_find(destroy_set, &query.e);
    if (e) { // Duplicate filename found, nothing to be done in this call
        if (!destroy_file_set) { // top level or non-recursive call
            hash_free(destroy_set);
        }
        return status;
    } else {
        // Remember file. Stack value ok IFF single direction recursion
        hash_insert(destroy_set, &query.e);
    }

    // check global list of known files to see if it is already opened or not
    e = hash_find(&hash, &query.e);
    if (e) {
        // already opened (return existing structure)
        file = _get_entry(e, struct filemgr, e);

        spin_lock(&file->lock);
        if (file->ref_count) {
            // Cannot destroy a file that clients still hold open.
            spin_unlock(&file->lock);
            status = FDB_RESULT_FILE_IS_BUSY;
            if (!destroy_file_set) { // top level or non-recursive call
                hash_free(destroy_set);
            }
            return status;
        }
        spin_unlock(&file->lock);
        if (file->old_filename) {
            // Recursively destroy the previous (pre-compaction) version first.
            status = filemgr_destroy_file(file->old_filename, config,
                                          destroy_set);
            if (status != FDB_RESULT_SUCCESS) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return status;
            }
        }

        // Cleanup file from in-memory as well as on-disk
        e = hash_remove(&hash, &file->e);
        fdb_assert(e, e, 0);
        filemgr_free_func(&file->e);
        if (filemgr_does_file_exist(filename) == FDB_RESULT_SUCCESS) {
            if (remove(filename)) {
                status = FDB_RESULT_FILE_REMOVE_FAIL;
            }
        }
    } else { // file not in memory, read on-disk to destroy older versions..
        // Stack-allocated scratch filemgr; only the fields needed to read the
        // DB header are filled in.
        file = (struct filemgr *)alca(struct filemgr, 1);
        file->filename = filename;
        file->ops = get_filemgr_ops();
        file->fd = file->ops->open(file->filename, O_RDWR, 0666);
        file->blocksize = global_config.blocksize;
        if (file->fd < 0) {
            // A missing file is not an error here (nothing left to destroy);
            // any other open failure is reported.
            if (file->fd != FDB_RESULT_NO_SUCH_FILE) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_OPEN_FAIL;
            }
        } else { // file successfully opened, seek to end to get DB header
            cs_off_t offset = file->ops->goto_eof(file->fd);
            if (offset == FDB_RESULT_SEEK_FAIL) {
                if (!destroy_file_set) { // top level or non-recursive call
                    hash_free(destroy_set);
                }
                return FDB_RESULT_SEEK_FAIL;
            } else { // Need to read DB header which contains old filename
                atomic_store_uint64_t(&file->pos, offset);
                status = _filemgr_read_header(file, NULL);
                if (status != FDB_RESULT_SUCCESS) {
                    if (!destroy_file_set) { // top level or non-recursive call
                        hash_free(destroy_set);
                    }
                    file->ops->close(file->fd);
                    return status;
                }
                if (file->header.data) {
                    // NOTE(review): offsets 64/66/68 presumably mirror the
                    // on-disk DB header layout (new_filename_len at +64,
                    // old_filename_len at +66, filenames from +68) — confirm
                    // against the header serialization code.
                    uint16_t *new_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 64);
                    uint16_t new_filename_len =
                                      _endian_decode(*new_filename_len_ptr);
                    uint16_t *old_filename_len_ptr = (uint16_t *)((char *)
                                                     file->header.data + 66);
                    uint16_t old_filename_len =
                                      _endian_decode(*old_filename_len_ptr);
                    old_filename = (char *)file->header.data + 68
                                   + new_filename_len;
                    if (old_filename_len) {
                        // Destroy the older version referenced by the header.
                        status = filemgr_destroy_file(old_filename, config,
                                                      destroy_set);
                    }
                    free(file->header.data);
                }
                file->ops->close(file->fd);
                if (status == FDB_RESULT_SUCCESS) {
                    if (filemgr_does_file_exist(filename)
                                               == FDB_RESULT_SUCCESS) {
                        if (remove(filename)) {
                            status = FDB_RESULT_FILE_REMOVE_FAIL;
                        }
                    }
                }
            }
        }
    }

    if (!destroy_file_set) { // top level or non-recursive call
        hash_free(destroy_set);
    }

    return status;
}
2124
filemgr_is_rollback_on(struct filemgr *file)2125 bool filemgr_is_rollback_on(struct filemgr *file)
2126 {
2127 bool rv;
2128 spin_lock(&file->lock);
2129 rv = (file->fflags & FILEMGR_ROLLBACK_IN_PROG);
2130 spin_unlock(&file->lock);
2131 return rv;
2132 }
2133
filemgr_set_rollback(struct filemgr *file, uint8_t new_val)2134 void filemgr_set_rollback(struct filemgr *file, uint8_t new_val)
2135 {
2136 spin_lock(&file->lock);
2137 if (new_val) {
2138 file->fflags |= FILEMGR_ROLLBACK_IN_PROG;
2139 } else {
2140 file->fflags &= ~FILEMGR_ROLLBACK_IN_PROG;
2141 }
2142 spin_unlock(&file->lock);
2143 }
2144
filemgr_set_in_place_compaction(struct filemgr *file, bool in_place_compaction)2145 void filemgr_set_in_place_compaction(struct filemgr *file,
2146 bool in_place_compaction) {
2147 spin_lock(&file->lock);
2148 file->in_place_compaction = in_place_compaction;
2149 spin_unlock(&file->lock);
2150 }
2151
filemgr_is_in_place_compaction_set(struct filemgr *file)2152 bool filemgr_is_in_place_compaction_set(struct filemgr *file)
2153 {
2154 bool ret = false;
2155 spin_lock(&file->lock);
2156 ret = file->in_place_compaction;
2157 spin_unlock(&file->lock);
2158 return ret;
2159 }
2160
filemgr_mutex_openlock(struct filemgr_config *config)2161 void filemgr_mutex_openlock(struct filemgr_config *config)
2162 {
2163 filemgr_init(config);
2164
2165 spin_lock(&filemgr_openlock);
2166 }
2167
filemgr_mutex_openunlock(void)2168 void filemgr_mutex_openunlock(void)
2169 {
2170 spin_unlock(&filemgr_openlock);
2171 }
2172
filemgr_mutex_lock(struct filemgr *file)2173 void filemgr_mutex_lock(struct filemgr *file)
2174 {
2175 mutex_lock(&file->writer_lock.mutex);
2176 file->writer_lock.locked = true;
2177 }
2178
filemgr_mutex_trylock(struct filemgr *file)2179 bool filemgr_mutex_trylock(struct filemgr *file) {
2180 if (mutex_trylock(&file->writer_lock.mutex)) {
2181 file->writer_lock.locked = true;
2182 return true;
2183 }
2184 return false;
2185 }
2186
filemgr_mutex_unlock(struct filemgr *file)2187 void filemgr_mutex_unlock(struct filemgr *file)
2188 {
2189 if (file->writer_lock.locked) {
2190 file->writer_lock.locked = false;
2191 mutex_unlock(&file->writer_lock.mutex);
2192 }
2193 }
2194
filemgr_set_dirty_root(struct filemgr *file, bid_t dirty_idtree_root, bid_t dirty_seqtree_root)2195 void filemgr_set_dirty_root(struct filemgr *file,
2196 bid_t dirty_idtree_root,
2197 bid_t dirty_seqtree_root)
2198 {
2199 atomic_store_uint64_t(&file->header.dirty_idtree_root, dirty_idtree_root);
2200 atomic_store_uint64_t(&file->header.dirty_seqtree_root, dirty_seqtree_root);
2201 }
2202
filemgr_is_commit_header(void *head_buffer, size_t blocksize)2203 bool filemgr_is_commit_header(void *head_buffer, size_t blocksize)
2204 {
2205 uint8_t marker[BLK_MARKER_SIZE];
2206 filemgr_magic_t magic;
2207 marker[0] = *(((uint8_t *)head_buffer)
2208 + blocksize - BLK_MARKER_SIZE);
2209 if (marker[0] != BLK_MARKER_DBHEADER) {
2210 return false;
2211 }
2212
2213 memcpy(&magic, (uint8_t *) head_buffer
2214 + blocksize - BLK_MARKER_SIZE - sizeof(magic), sizeof(magic));
2215 magic = _endian_decode(magic);
2216
2217 return (magic == FILEMGR_MAGIC);
2218 }
2219
filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)2220 void filemgr_set_throttling_delay(struct filemgr *file, uint64_t delay_us)
2221 {
2222 atomic_store_uint32_t(&file->throttling_delay, delay_us);
2223 }
2224
filemgr_get_throttling_delay(struct filemgr *file)2225 uint32_t filemgr_get_throttling_delay(struct filemgr *file)
2226 {
2227 return atomic_get_uint32_t(&file->throttling_delay);
2228 }
2229
_kvs_stat_set(struct filemgr *file, fdb_kvs_id_t kv_id, struct kvs_stat stat)2230 void _kvs_stat_set(struct filemgr *file,
2231 fdb_kvs_id_t kv_id,
2232 struct kvs_stat stat)
2233 {
2234 if (kv_id == 0) {
2235 spin_lock(&file->lock);
2236 file->header.stat = stat;
2237 spin_unlock(&file->lock);
2238 } else {
2239 struct avl_node *a;
2240 struct kvs_node query, *node;
2241 struct kvs_header *kv_header = file->kv_header;
2242
2243 spin_lock(&kv_header->lock);
2244 query.id = kv_id;
2245 a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2246 if (a) {
2247 node = _get_entry(a, struct kvs_node, avl_id);
2248 node->stat = stat;
2249 }
2250 spin_unlock(&kv_header->lock);
2251 }
2252 }
2253
_kvs_stat_update_attr(struct filemgr *file, fdb_kvs_id_t kv_id, kvs_stat_attr_t attr, int delta)2254 void _kvs_stat_update_attr(struct filemgr *file,
2255 fdb_kvs_id_t kv_id,
2256 kvs_stat_attr_t attr,
2257 int delta)
2258 {
2259 spin_t *lock = NULL;
2260 struct kvs_stat *stat;
2261
2262 if (kv_id == 0) {
2263 stat = &file->header.stat;
2264 lock = &file->lock;
2265 spin_lock(lock);
2266 } else {
2267 struct avl_node *a;
2268 struct kvs_node query, *node;
2269 struct kvs_header *kv_header = file->kv_header;
2270
2271 lock = &kv_header->lock;
2272 spin_lock(lock);
2273 query.id = kv_id;
2274 a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2275 if (!a) {
2276 // KV instance corresponding to the kv_id is already removed
2277 spin_unlock(lock);
2278 return;
2279 }
2280 node = _get_entry(a, struct kvs_node, avl_id);
2281 stat = &node->stat;
2282 }
2283
2284 if (attr == KVS_STAT_DATASIZE) {
2285 stat->datasize += delta;
2286 } else if (attr == KVS_STAT_NDOCS) {
2287 stat->ndocs += delta;
2288 } else if (attr == KVS_STAT_NLIVENODES) {
2289 stat->nlivenodes += delta;
2290 } else if (attr == KVS_STAT_WAL_NDELETES) {
2291 stat->wal_ndeletes += delta;
2292 } else if (attr == KVS_STAT_WAL_NDOCS) {
2293 stat->wal_ndocs += delta;
2294 }
2295 spin_unlock(lock);
2296 }
2297
_kvs_stat_get_kv_header(struct kvs_header *kv_header, fdb_kvs_id_t kv_id, struct kvs_stat *stat)2298 int _kvs_stat_get_kv_header(struct kvs_header *kv_header,
2299 fdb_kvs_id_t kv_id,
2300 struct kvs_stat *stat)
2301 {
2302 int ret = 0;
2303 struct avl_node *a;
2304 struct kvs_node query, *node;
2305
2306 query.id = kv_id;
2307 a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2308 if (a) {
2309 node = _get_entry(a, struct kvs_node, avl_id);
2310 *stat = node->stat;
2311 } else {
2312 ret = -1;
2313 }
2314 return ret;
2315 }
2316
_kvs_stat_get(struct filemgr *file, fdb_kvs_id_t kv_id, struct kvs_stat *stat)2317 int _kvs_stat_get(struct filemgr *file,
2318 fdb_kvs_id_t kv_id,
2319 struct kvs_stat *stat)
2320 {
2321 int ret = 0;
2322
2323 if (kv_id == 0) {
2324 spin_lock(&file->lock);
2325 *stat = file->header.stat;
2326 spin_unlock(&file->lock);
2327 } else {
2328 struct kvs_header *kv_header = file->kv_header;
2329
2330 spin_lock(&kv_header->lock);
2331 ret = _kvs_stat_get_kv_header(kv_header, kv_id, stat);
2332 spin_unlock(&kv_header->lock);
2333 }
2334
2335 return ret;
2336 }
2337
_kvs_stat_get_sum(struct filemgr *file, kvs_stat_attr_t attr)2338 uint64_t _kvs_stat_get_sum(struct filemgr *file,
2339 kvs_stat_attr_t attr)
2340 {
2341 struct avl_node *a;
2342 struct kvs_node *node;
2343 struct kvs_header *kv_header = file->kv_header;
2344
2345 uint64_t ret = 0;
2346 spin_lock(&file->lock);
2347 if (attr == KVS_STAT_DATASIZE) {
2348 ret += file->header.stat.datasize;
2349 } else if (attr == KVS_STAT_NDOCS) {
2350 ret += file->header.stat.ndocs;
2351 } else if (attr == KVS_STAT_NLIVENODES) {
2352 ret += file->header.stat.nlivenodes;
2353 } else if (attr == KVS_STAT_WAL_NDELETES) {
2354 ret += file->header.stat.wal_ndeletes;
2355 } else if (attr == KVS_STAT_WAL_NDOCS) {
2356 ret += file->header.stat.wal_ndocs;
2357 }
2358 spin_unlock(&file->lock);
2359
2360 if (kv_header) {
2361 spin_lock(&kv_header->lock);
2362 a = avl_first(kv_header->idx_id);
2363 while (a) {
2364 node = _get_entry(a, struct kvs_node, avl_id);
2365 a = avl_next(&node->avl_id);
2366
2367 if (attr == KVS_STAT_DATASIZE) {
2368 ret += node->stat.datasize;
2369 } else if (attr == KVS_STAT_NDOCS) {
2370 ret += node->stat.ndocs;
2371 } else if (attr == KVS_STAT_NLIVENODES) {
2372 ret += node->stat.nlivenodes;
2373 } else if (attr == KVS_STAT_WAL_NDELETES) {
2374 ret += node->stat.wal_ndeletes;
2375 } else if (attr == KVS_STAT_WAL_NDOCS) {
2376 ret += node->stat.wal_ndocs;
2377 }
2378 }
2379 spin_unlock(&kv_header->lock);
2380 }
2381
2382 return ret;
2383 }
2384
_kvs_ops_stat_get_kv_header(struct kvs_header *kv_header, fdb_kvs_id_t kv_id, struct kvs_ops_stat *stat)2385 int _kvs_ops_stat_get_kv_header(struct kvs_header *kv_header,
2386 fdb_kvs_id_t kv_id,
2387 struct kvs_ops_stat *stat)
2388 {
2389 int ret = 0;
2390 struct avl_node *a;
2391 struct kvs_node query, *node;
2392
2393 query.id = kv_id;
2394 a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2395 if (a) {
2396 node = _get_entry(a, struct kvs_node, avl_id);
2397 *stat = node->op_stat;
2398 } else {
2399 ret = -1;
2400 }
2401 return ret;
2402 }
2403
_kvs_ops_stat_get(struct filemgr *file, fdb_kvs_id_t kv_id, struct kvs_ops_stat *stat)2404 int _kvs_ops_stat_get(struct filemgr *file,
2405 fdb_kvs_id_t kv_id,
2406 struct kvs_ops_stat *stat)
2407 {
2408 int ret = 0;
2409
2410 if (kv_id == 0) {
2411 spin_lock(&file->lock);
2412 *stat = file->header.op_stat;
2413 spin_unlock(&file->lock);
2414 } else {
2415 struct kvs_header *kv_header = file->kv_header;
2416
2417 spin_lock(&kv_header->lock);
2418 ret = _kvs_ops_stat_get_kv_header(kv_header, kv_id, stat);
2419 spin_unlock(&kv_header->lock);
2420 }
2421
2422 return ret;
2423 }
2424
_init_op_stats(struct kvs_ops_stat *stat)2425 void _init_op_stats(struct kvs_ops_stat *stat) {
2426 atomic_init_uint64_t(&stat->num_sets, 0);
2427 atomic_init_uint64_t(&stat->num_dels, 0);
2428 atomic_init_uint64_t(&stat->num_commits, 0);
2429 atomic_init_uint64_t(&stat->num_compacts, 0);
2430 atomic_init_uint64_t(&stat->num_gets, 0);
2431 atomic_init_uint64_t(&stat->num_iterator_gets, 0);
2432 atomic_init_uint64_t(&stat->num_iterator_moves, 0);
2433 }
2434
filemgr_get_ops_stats(struct filemgr *file, struct kvs_info *kvs)2435 struct kvs_ops_stat *filemgr_get_ops_stats(struct filemgr *file,
2436 struct kvs_info *kvs)
2437 {
2438 struct kvs_ops_stat *stat = NULL;
2439 if (!kvs || (kvs && kvs->id == 0)) {
2440 return &file->header.op_stat;
2441 } else {
2442 struct kvs_header *kv_header = file->kv_header;
2443 struct avl_node *a;
2444 struct kvs_node query, *node;
2445 spin_lock(&kv_header->lock);
2446 query.id = kvs->id;
2447 a = avl_search(kv_header->idx_id, &query.avl_id, _kvs_stat_cmp);
2448 if (a) {
2449 node = _get_entry(a, struct kvs_node, avl_id);
2450 stat = &node->op_stat;
2451 }
2452 spin_unlock(&kv_header->lock);
2453 }
2454 return stat;
2455 }
2456
buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)2457 void buf2kvid(size_t chunksize, void *buf, fdb_kvs_id_t *id)
2458 {
2459 size_t size_id = sizeof(fdb_kvs_id_t);
2460 fdb_kvs_id_t temp;
2461
2462 if (chunksize == size_id) {
2463 temp = *((fdb_kvs_id_t*)buf);
2464 } else if (chunksize < size_id) {
2465 temp = 0;
2466 memcpy((uint8_t*)&temp + (size_id - chunksize), buf, chunksize);
2467 } else { // chunksize > sizeof(fdb_kvs_id_t)
2468 memcpy(&temp, (uint8_t*)buf + (chunksize - size_id), size_id);
2469 }
2470 *id = _endian_decode(temp);
2471 }
2472
kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)2473 void kvid2buf(size_t chunksize, fdb_kvs_id_t id, void *buf)
2474 {
2475 size_t size_id = sizeof(fdb_kvs_id_t);
2476 id = _endian_encode(id);
2477
2478 if (chunksize == size_id) {
2479 memcpy(buf, &id, size_id);
2480 } else if (chunksize < size_id) {
2481 memcpy(buf, (uint8_t*)&id + (size_id - chunksize), chunksize);
2482 } else { // chunksize > sizeof(fdb_kvs_id_t)
2483 memset(buf, 0x0, chunksize - size_id);
2484 memcpy((uint8_t*)buf + (chunksize - size_id), &id, size_id);
2485 }
2486 }
2487
// Copy a 'chunksize_src'-byte chunk into a 'chunksize_dst'-byte chunk,
// preserving the trailing (least-significant) bytes: shrinking drops the
// front of the source, growing zero-pads the front of the destination.
void buf2buf(size_t chunksize_src, void *buf_src,
             size_t chunksize_dst, void *buf_dst)
{
    uint8_t *src = (uint8_t *) buf_src;
    uint8_t *dst = (uint8_t *) buf_dst;

    if (chunksize_dst < chunksize_src) {
        // Shrinking: keep only the trailing bytes of the source.
        memcpy(dst, src + (chunksize_src - chunksize_dst), chunksize_dst);
    } else if (chunksize_dst > chunksize_src) {
        // Growing: zero-pad the front, copy the source into the tail.
        size_t pad = chunksize_dst - chunksize_src;
        memset(dst, 0x0, pad);
        memcpy(dst + pad, src, chunksize_src);
    } else {
        memcpy(dst, src, chunksize_src);
    }
}
2502