/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2015 Couchbase, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <stdlib.h>

#include "staleblock.h"
#include "btreeblock.h"
#include "list.h"
#include "docio.h"
#include "fdb_internal.h"
#include "version.h"
#include "time_utils.h"

#ifdef _DOC_COMP
#include "snappy-c.h"
#endif

#include "memleak.h"

// When true (default), the body of each in-memory stale-info document is
// stored snappy-compressed (only effective when built with _DOC_COMP).
static bool compress_inmem_stale_info = true;

// avl comparator for 'stale_info_commit' nodes:
// orders entries by ascending commit header revision number.
static int _inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct stale_info_commit *aa, *bb;
    aa = _get_entry(a, struct stale_info_commit, avl);
    bb = _get_entry(b, struct stale_info_commit, avl);

    if (aa->revnum < bb->revnum) {
        return -1;
    } else if (aa->revnum > bb->revnum) {
        return 1;
    } else {
        return 0;
    }
}

// Insert the stale-info system document 'doc' (located at 'doc_offset') into
// the in-memory stale info tree ('file->stale_info_tree'), under the commit
// identified by 'revnum'.
//
// - If 'system_doc_only' is false, a copy of the doc body is kept in memory
//   (snappy-compressed when _DOC_COMP and 'compress_inmem_stale_info' are on).
// - If 'system_doc_only' is true, only the doc's length/offset are recorded
//   (entry->ctx stays NULL); this is used when the stale regions themselves
//   are already tracked elsewhere (e.g. still present in 'mergetree') and we
//   only need to be able to mark the system doc itself stale later.
//
// On compression failure the entry (and a freshly created commit node, if
// any) is rolled back and the function returns without inserting anything.
static void fdb_add_inmem_stale_info(fdb_kvs_handle *handle,
                                     filemgr_header_revnum_t revnum,
                                     struct docio_object *doc,
                                     uint64_t doc_offset,
                                     bool system_doc_only)
{
    int ret;
    size_t buflen = 0;
    struct filemgr *file = handle->file;
    struct avl_node *a;
    struct stale_info_commit *item, query;
    struct stale_info_entry *entry;

    // search using revnum first
    query.revnum = revnum;
    a = avl_search(&file->stale_info_tree, &query.avl, _inmem_stale_cmp);
    if (a) {
        // already exist
        item = _get_entry(a, struct stale_info_commit, avl);
    } else {
        // not exist .. create a new one and insert into tree
        item = (struct stale_info_commit *)calloc(1, sizeof(struct stale_info_commit));
        item->revnum = revnum;
        list_init(&item->doc_list);
        avl_insert(&file->stale_info_tree, &item->avl, _inmem_stale_cmp);
        // mark the tree as populated so fdb_load_inmem_stale_info() skips
        // re-loading from disk
        file->stale_info_tree_loaded.store(true, std::memory_order_relaxed);
    }

    entry = (struct stale_info_entry *)calloc(1, sizeof(struct stale_info_entry));

    if (!system_doc_only) {
#ifdef _DOC_COMP
        if (compress_inmem_stale_info) {
            // allocate the worst-case compressed size; snappy_compress()
            // updates 'buflen' to the actual compressed length on success
            buflen = snappy_max_compressed_length(doc->length.bodylen);;
            entry->ctx = (void *)calloc(1, buflen);
            ret = snappy_compress((char*)doc->body, doc->length.bodylen,
                                  (char*)entry->ctx, &buflen);
            if (ret != 0) {
                fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
                        "(fdb_add_inmem_stale_info) "
                        "Compression error from a database file '%s'"
                        ": return value %d, header revnum %" _F64 ", "
                        "doc offset %" _F64 "\n",
                        file->filename, ret, revnum, doc_offset);
                if (!a) {
                    // 'item' is allocated in this function call.
                    avl_remove(&file->stale_info_tree, &item->avl);
                    free(item);
                }
                free(entry);
                return;
            }
        } else {
            // uncompressed in-memory copy of the doc body
            buflen = doc->length.bodylen;
            entry->ctx = (void *)calloc(1, buflen);
            memcpy(entry->ctx, doc->body, doc->length.bodylen);
        }
#else
        // no compression support built in .. keep a raw copy
        buflen = doc->length.bodylen;
        entry->ctx = (void *)calloc(1, buflen);
        memcpy(entry->ctx, doc->body, doc->length.bodylen);
#endif

        // 'ctxlen' is always the uncompressed body length
        entry->ctxlen = doc->length.bodylen;

    } else {
        // when 'system_doc_only' flag is set, just set to NULL.
        // we need the doc's length and offset info only.
        entry->ctx = NULL;
        entry->ctxlen = 0;
    }

    // 'comp_ctxlen' is the size of 'ctx' as stored (compressed length when
    // compression is in effect, otherwise equal to 'ctxlen'; 0 for
    // system-doc-only entries)
    entry->comp_ctxlen = buflen;
    entry->doclen = _fdb_get_docsize(doc->length);
    entry->offset = doc_offset;
    list_push_back(&item->doc_list, &entry->le);
}

// Populate the in-memory stale info tree from the on-disk stale-block b+tree,
// reading every stale-info system document of every commit header.
// Intended to run once per file open: an atomic CAS on
// 'file->stale_info_tree_loaded' lets subsequent callers return immediately
// without taking the filemgr mutex.
void fdb_load_inmem_stale_info(fdb_kvs_handle *handle)
{
    uint8_t keybuf[64];
    int64_t ret;
    bid_t offset, _offset, prev_offset;
    filemgr_header_revnum_t revnum, _revnum;
    btree_iterator bit;
    btree_result br;
    struct filemgr *file = handle->file;
    struct docio_object doc;
    bool expected = false;

    if (!std::atomic_compare_exchange_strong(&file->stale_info_tree_loaded,
                                             &expected, true)) {
        // stale info is already loaded (fast screening without mutex)
        return;
    }

    // first open of the DB file
    // should grab mutex to avoid race with other writer
    filemgr_mutex_lock(file);

    btree_iterator_init(handle->staletree, &bit, NULL);
    do {
        // each b+tree entry maps: commit revnum -> offset of the most recent
        // stale-info system doc for that commit
        br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
        btreeblk_end(handle->bhandle);
        if (br != BTREE_RESULT_SUCCESS) {
            break;
        }

        revnum = _endian_decode(_revnum);
        offset = _endian_decode(_offset);

        // follow the backward chain of system docs for this commit
        // (each doc's first 8 body bytes hold the previous doc offset)
        while (offset != BLK_NOT_FOUND) {
            memset(&doc, 0x0, sizeof(doc));
            // pre-allocated buffer for key
            doc.key = (void*)keybuf;

            ret = docio_read_doc(handle->dhandle, offset, &doc, true);
            if (ret <= 0) {
                // read fail .. escape
                fdb_log(NULL, (fdb_status)ret,
                        "Error in reading a stale region info document "
                        "from a database file '%s'"
                        ": revnum %" _F64 ", offset %" _F64 "\n",
                        file->filename, revnum, offset);
                offset = BLK_NOT_FOUND;
                continue;
            }

            fdb_add_inmem_stale_info(handle, revnum, &doc, offset, false);

            // fetch previous doc offset
            memcpy(&_offset, doc.body, sizeof(_offset));
            prev_offset = _endian_decode(_offset);

            // We don't need to free 'meta' as it will be NULL.
            free(doc.body);

            offset = prev_offset;
        }
    } while (true);
    btree_iterator_free(&bit);

    filemgr_mutex_unlock(file);
}

// Serialize the currently gathered stale regions into one or more stale-info
// system documents keyed by 'revnum', append them to the file, and index them
// in the on-disk stale-block b+tree as well as the in-memory stale info tree.
//
// Sources of stale regions:
// - if 'from_mergetree' is set (call at the end of fdb_get_reusable_block()),
//   the first pass gathers the remaining non-reusable regions from
//   'file->mergetree' WITHOUT removing them;
// - otherwise (and on subsequent passes) regions are drained from
//   'file->stale_list', starting after 'e_last' if given.
//
// Writing the system doc and updating the stale tree can itself create new
// stale blocks, so the loop repeats until the stale list stays empty; each
// follow-up doc records the previous doc's offset in its first 8 bytes,
// forming a backward chain. 'prev_hdr', 'kv_info_offset' and 'seqnum' are
// embedded in the doc header fields (0xff..ff when not available).
void fdb_gather_stale_blocks(fdb_kvs_handle *handle,
                             filemgr_header_revnum_t revnum,
                             bid_t prev_hdr,
                             uint64_t kv_info_offset,
                             fdb_seqnum_t seqnum,
                             struct list_elem *e_last,
                             bool from_mergetree)
{
    int64_t delta;
    int r;
    uint32_t count = 0;
    uint32_t offset = 0, count_location;
    uint32_t bufsize = 8192;
    uint32_t _count, _len;
    uint64_t _pos, _kv_info_offset;
    uint8_t *buf = NULL;
    bid_t doc_offset, _doc_offset;
    bid_t _prev_hdr;
    bool gather_staleblocks = true;
    bool first_loop = true;
    filemgr_header_revnum_t _revnum;
    fdb_seqnum_t _seqnum;
    struct kvs_stat stat;

    /*
     * << stale block system doc structure >>
     * [previous doc offset]: 8 bytes (0xffff.. if not exist)
     * [previous header BID]: 8 bytes (0xffff.. if not exist)
     * [KVS info doc offset]: 8 bytes (0xffff.. if not exist)
     * [Default KVS seqnum]:  8 bytes
     * [# items]:             4 bytes
     * ---
     * [position]:            8 bytes
     * [length]:              4 bytes
     * ...
     */

    if (filemgr_get_stale_list(handle->file)) {
        struct avl_node *a;
        struct list_elem *e;
        struct stale_data *item;

        // snapshot node-count stats so the deltas produced by the b+tree
        // updates below can be accumulated at the end.
        // NOTE(review): 'ndeltanodes' is seeded from stat.nlivenodes (not
        // stat.ndeltanodes) — both here and in fdb_get_reusable_block(); the
        // delta below subtracts the same baseline so the computation is
        // self-consistent, but confirm this is intentional.
        r = _kvs_stat_get(handle->file, 0, &stat);
        handle->bhandle->nlivenodes = stat.nlivenodes;
        handle->bhandle->ndeltanodes = stat.nlivenodes;
        (void)r;

        buf = (uint8_t *)calloc(1, bufsize);
        _revnum = _endian_encode(revnum);

        // initial previous doc offset
        memset(buf, 0xff, sizeof(bid_t));
        count_location = sizeof(bid_t);

        // previous header BID
        if (prev_hdr == 0 || prev_hdr == BLK_NOT_FOUND) {
            // does not exist
            memset(&_prev_hdr, 0xff, sizeof(_prev_hdr));
        } else {
            _prev_hdr = _endian_encode(prev_hdr);
        }
        memcpy(buf + sizeof(bid_t), &_prev_hdr, sizeof(bid_t));
        count_location += sizeof(bid_t);

        // KVS info doc offset
        _kv_info_offset = _endian_encode(kv_info_offset);
        memcpy(buf + count_location, &_kv_info_offset, sizeof(uint64_t));
        count_location += sizeof(uint64_t);

        // default KVS seqnum
        _seqnum = _endian_encode(seqnum);
        memcpy(buf + count_location, &_seqnum, sizeof(fdb_seqnum_t));
        count_location += sizeof(fdb_seqnum_t);
        count_location += sizeof(count);

        while(gather_staleblocks) {
            // reserve space for
            // prev offset (8), prev header (8), kv_info_offset (8),
            // seqnum (8), count (4)
            offset = count_location;

            if (first_loop && from_mergetree) {
                // gather from mergetree
                a = avl_first(&handle->file->mergetree);
                while (a) {
                    item = _get_entry(a, struct stale_data, avl);

                    if (handle->staletree) {
                        count++;

                        _pos = _endian_encode(item->pos);
                        _len = _endian_encode(item->len);

                        memcpy(buf + offset, &_pos, sizeof(_pos));
                        offset += sizeof(_pos);
                        memcpy(buf + offset, &_len, sizeof(_len));
                        offset += sizeof(_len);

                        // grow the buffer ahead of the next item if needed
                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
                            bufsize *= 2;
                            buf = (uint8_t*)realloc(buf, bufsize);
                        }
                    }

                    // If 'from_mergetree' flag is set, it means that this
                    // function is called at the end of fdb_get_reusable_block(),
                    // and those items are remaining (non-reusable) regions after
                    // picking up reusable blocks from 'mergetree'.

                    // In the previous implementation, those items are converted
                    // and stored as a system document. The document is re-read in
                    // the next block reclaim, and then we reconstruct 'mergetree'
                    // from the document; this is unnecessary duplicated overhead.

                    // As an optimization, we can simply keep those items in
                    // 'mergetree' and use them in the next block reclaim, without
                    // reading the corresponding system document; this also reduces
                    // the commit latency much. Instead, to minimize memory
                    // consumption, we don't need to maintain in-memory copy of the
                    // system doc corresponding to the remaining items in the
                    // 'mergetree', that will be created below.

                    // do not remove the item
                    a = avl_next(&item->avl);
                }
            } else {
                // gather from stale_list
                if (e_last) {
                    e = list_next(e_last);
                } else {
                    e = list_begin(handle->file->stale_list);
                }
                while (e) {
                    item = _get_entry(e, struct stale_data, le);

                    if (handle->staletree) {
                        count++;

                        _pos = _endian_encode(item->pos);
                        _len = _endian_encode(item->len);

                        memcpy(buf + offset, &_pos, sizeof(_pos));
                        offset += sizeof(_pos);
                        memcpy(buf + offset, &_len, sizeof(_len));
                        offset += sizeof(_len);

                        // grow the buffer ahead of the next item if needed
                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
                            bufsize *= 2;
                            buf = (uint8_t*)realloc(buf, bufsize);
                        }
                    }

                    // stale_list items are consumed (removed and freed)
                    e = list_remove(handle->file->stale_list, e);
                    free(item);
                }
            }

            gather_staleblocks = false;
            if (count) {
                char *doc_key = alca(char, 32);
                struct docio_object doc;

                // store count
                _count = _endian_encode(count);
                memcpy(buf + count_location - sizeof(_count), &_count,
                       sizeof(_count));

                // append a system doc
                memset(&doc, 0x0, sizeof(doc));
                // 'revnum' is stable here: filemgr_mutex() is grabbed so that
                // no other thread will change it while we build the doc key.
                sprintf(doc_key, "stale_blocks_%" _F64, revnum);
                doc.key = (void*)doc_key;
                doc.body = buf;
                doc.length.keylen = strlen(doc_key) + 1;
                doc.length.metalen = 0;
                doc.length.bodylen = offset;
                doc.seqnum = 0;
                doc_offset = docio_append_doc_system(handle->dhandle, &doc);

                // insert into stale-block tree
                _doc_offset = _endian_encode(doc_offset);
                btree_insert(handle->staletree, (void *)&_revnum, (void *)&_doc_offset);
                btreeblk_end(handle->bhandle);
                btreeblk_reset_subblock_info(handle->bhandle);

                if (from_mergetree && first_loop) {
                    // if from_mergetree flag is set and this is the first loop,
                    // stale regions in this document are already in mergetree
                    // so skip adding them into in-memory stale info tree.

                    // however, the system doc itself should be marked as stale
                    // when the doc is reclaimed, thus we instead add a dummy entry
                    // that containing doc offset, length info only.
                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, true);
                } else {
                    // add the doc into in-memory stale info tree
                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, false);
                }

                if (list_begin(filemgr_get_stale_list(handle->file))) {
                    // updating stale tree brings another stale blocks.
                    // recursively update until there is no more stale block.

                    // note that infinite loop will not occur because
                    // 1) all updated index blocks for stale tree are still writable
                    // 2) incoming keys for stale tree (revnum) are monotonic
                    //    increasing order; most recently allocated node will be
                    //    updated again.

                    count = 0;
                    // save previous doc offset
                    memcpy(buf, &_doc_offset, sizeof(_doc_offset));

                    // gather once again
                    gather_staleblocks = true;
                }
            }

            first_loop = false;
        } // gather stale blocks

        // account the b+tree node-count / delta-size changes made above
        delta = handle->bhandle->nlivenodes - stat.nlivenodes;
        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
        delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
        delta *= handle->config.blocksize;
        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);

        free(buf);
    } else {
        btreeblk_reset_subblock_info(handle->bhandle);
    }
}

// avl comparator for 'stale_data' nodes: orders regions by ascending
// file position.
static int _reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct stale_data *aa, *bb;
    aa = _get_entry(a, struct stale_data, avl);
    bb = _get_entry(b, struct stale_data, avl);
    return _CMP_U64(aa->pos, bb->pos);
}

// Insert the region [item_pos, item_pos + item_len) into 'tree' (keyed by
// position), then coalesce it with its predecessor and successor whenever the
// regions overlap or are exactly adjacent, so the tree always holds
// maximally-merged, position-sorted stale regions.
static void _insert_n_merge(struct avl_tree *tree,
                            uint64_t item_pos,
                            uint32_t item_len)
{
    struct stale_data query, *item;
    struct avl_node *avl;

    // retrieve the tree first
    query.pos = item_pos;
    avl = avl_search(tree, &query.avl, _reusable_offset_cmp);
    if (avl) {
        // same offset already exists
        item = _get_entry(avl, struct stale_data, avl);
        // choose longer length
        if (item->len < item_len) {
            item->len = item_len;
        }
    } else {
        // does not exist .. create a new item
        item = (struct stale_data*)
               calloc(1, sizeof(struct stale_data));
        item->pos = item_pos;
        item->len = item_len;
        avl_insert(tree, &item->avl, _reusable_offset_cmp);
    }

    // check prev/next item to see if they can be merged
    struct avl_node *p_avl, *n_avl;
    struct stale_data*p_item, *n_item;
    p_avl = avl_prev(&item->avl);
    if (p_avl) {
        p_item = _get_entry(p_avl, struct stale_data, avl);
        if (p_item->pos + p_item->len >= item->pos) {

            if (p_item->pos + p_item->len >= item->pos + item->len) {
                // 'item' is included in p_item .. simply remove it
                // (do nothing)
            } else {
                // overlapping (or consecutive) .. merge two items
                p_item->len += item->len +
                               (item->pos - p_item->pos - p_item->len);
            }
            // remove current item
            avl_remove(tree, &item->avl);
            free(item);
            item = p_item;
        }
    }

    n_avl = avl_next(&item->avl);
    if (n_avl) {
        n_item = _get_entry(n_avl, struct stale_data, avl);
        if (item->pos + item->len >= n_item->pos) {

            if (item->pos + item->len >= n_item->pos + n_item->len) {
                // 'n_item' is included in 'item' .. simply remove it
                // (do nothing)
            } else {
                // overlapping (or consecutive) .. merge two items
                item->len += n_item->len +
                             (n_item->pos - item->pos - item->len);
            }
            // remove next item
            avl_remove(tree, &n_item->avl);
            free(n_item);
        }
    }
}

// Parse & fetch stale regions from the buffer 'ctx', which is the body of
// a stale info system document (from either in-memory stale-block-tree or
// on-disk stale-block-tree). After fetching, insert those regions
// into 'mergetree'.
static void _fetch_stale_info_doc(void *ctx,
                                  struct avl_tree *mergetree,
                                  uint64_t &prev_offset_out,
                                  uint64_t &prev_hdr_out)
{
    // Layout of 'ctx' (see fdb_gather_stale_blocks()):
    //   [prev doc offset (8)] [prev header BID (8)]
    //   [kv_info_offset (8)] [default KVS seqnum (8)] [count (4)]
    //   then 'count' pairs of [pos (8)][len (4)].
    uint32_t i, count, _count, item_len;
    uint64_t pos;
    uint64_t item_pos;

    pos = 0;

    // get previous doc offset
    memcpy(&prev_offset_out, ctx, sizeof(prev_offset_out));
    prev_offset_out = _endian_decode(prev_offset_out);
    pos += sizeof(prev_offset_out);

    // get previous header BID
    memcpy(&prev_hdr_out, (uint8_t*)ctx + pos, sizeof(prev_hdr_out));
    prev_hdr_out = _endian_decode(prev_hdr_out);
    (void)prev_hdr_out;
    pos += sizeof(prev_hdr_out);

    // Skip kv_info_offset and default KVS's seqnum
    pos += sizeof(uint64_t) + sizeof(fdb_seqnum_t);

    // get count;
    memcpy(&_count, (uint8_t*)ctx + pos, sizeof(_count));
    count = _endian_decode(_count);
    pos += sizeof(_count);

    // get a stale region and insert/merge into tree
    for (i=0;i<count;++i) {
        memcpy(&item_pos, (uint8_t*)ctx + pos, sizeof(item_pos));
        item_pos = _endian_decode(item_pos);
        pos += sizeof(item_pos);

        memcpy(&item_len, (uint8_t*)ctx + pos, sizeof(item_len));
        item_len = _endian_decode(item_len);
        pos += sizeof(item_len);

        _insert_n_merge(mergetree, item_pos, item_len);
    }
}

// Collect all stale regions belonging to commit headers whose revision number
// is <= 'stale_header.revnum', merge them in 'file->mergetree', and carve out
// the block-aligned portions as a list of reusable blocks.
//
// Preferred source is the in-memory stale info tree (entries are consumed and
// freed); only when it is empty does the function fall back to scanning the
// on-disk stale-block b+tree. Consumed commit revnums are then removed from
// the b+tree. Unaligned leftovers stay in 'mergetree' and are re-persisted by
// the trailing fdb_gather_stale_blocks() call (with from_mergetree=true).
//
// Returns a reusable_block_list; caller owns ret.blocks (malloc'd) and must
// free it. On allocation/uncompression failure an empty list is returned.
reusable_block_list fdb_get_reusable_block(fdb_kvs_handle *handle,
                                           stale_header_info stale_header)
{
    int64_t delta;
    int r;
    uint8_t keybuf[64];
    uint32_t i;
    uint32_t item_len;
    uint32_t n_revnums, max_revnum_array = 256;
    uint64_t item_pos;
    btree_iterator bit;
    btree_result br;
    filemgr_header_revnum_t revnum_upto, prev_revnum = 0;
    filemgr_header_revnum_t revnum = 0, _revnum;
    filemgr_header_revnum_t *revnum_array;
    bid_t offset, _offset, prev_offset;
    bid_t prev_hdr = BLK_NOT_FOUND;
    bool stale_tree_scan = true;
    struct docio_object doc;
    struct avl_tree *mergetree = &handle->file->mergetree;
    struct avl_node *avl;
    struct stale_data *item;
    struct list_elem *e, *e_last;
    struct kvs_stat stat;

    revnum_upto = stale_header.revnum;

    // snapshot node-count stats (see matching NOTE in
    // fdb_gather_stale_blocks(): 'ndeltanodes' baseline uses stat.nlivenodes)
    r = _kvs_stat_get(handle->file, 0, &stat);
    handle->bhandle->nlivenodes = stat.nlivenodes;
    handle->bhandle->ndeltanodes = stat.nlivenodes;
    (void)r;

    revnum_array = (filemgr_header_revnum_t *)
                   calloc(max_revnum_array, sizeof(filemgr_header_revnum_t));
    n_revnums = 0;

    // remember the last stale list item to be preserved
    e_last = list_end(handle->file->stale_list);

    avl = avl_first(&handle->file->stale_info_tree);
    if (avl) {
        // if in-memory stale info exists
        void *uncomp_buf = NULL;
        int r;
        size_t uncomp_buflen = 128*1024; // 128 KB by default;
        struct stale_info_commit *commit;
        struct stale_info_entry *entry;

        stale_tree_scan = false;

        if (compress_inmem_stale_info) {
            // scratch buffer for snappy uncompression; grown on demand below
            uncomp_buf = (void*)calloc(1, uncomp_buflen);
            if (!uncomp_buf) {
                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                        "(fdb_get_reusable_block) "
                        "calloc of 'uncomp_buf' failed: "
                        "database file '%s', "
                        "uncomp_buflen %d\n",
                        handle->file->filename,
                        (int)uncomp_buflen);
                free(revnum_array);

                reusable_block_list ret;
                ret.n_blocks = 0;
                ret.blocks = NULL;

                return ret;
            }
        }

        // consume commits in ascending revnum order, up to 'revnum_upto'
        while (avl) {
            commit = _get_entry(avl, struct stale_info_commit, avl);
            avl = avl_next(avl);

            prev_revnum = revnum;
            revnum = commit->revnum;
            if (revnum > revnum_upto) {
                // beyond the reclaim horizon .. keep 'revnum' at the last
                // consumed value for the final fdb_gather_stale_blocks() call
                revnum = prev_revnum;
                break;
            }

            filemgr_header_revnum_t *new_revnum_array = revnum_array;
            revnum_array[n_revnums++] = revnum;
            if (n_revnums >= max_revnum_array) {
                max_revnum_array *= 2;
                new_revnum_array = (filemgr_header_revnum_t *)
                                   realloc(revnum_array, max_revnum_array *
                                           sizeof(filemgr_header_revnum_t));
            }
            if (!new_revnum_array) {
                // realloc() of revnum_array failed.
                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                        "(fdb_get_reusable_block) "
                        "realloc of 'revnum_array' failed: "
                        "database file '%s', "
                        "max_revnum_array %d\n",
                        handle->file->filename,
                        (int)max_revnum_array);
                free(uncomp_buf);
                free(revnum_array);

                reusable_block_list ret;
                ret.n_blocks = 0;
                ret.blocks = NULL;

                return ret;
            }
            revnum_array = new_revnum_array;

            avl_remove(&handle->file->stale_info_tree, &commit->avl);

            e = list_begin(&commit->doc_list);
            while (e) {
                entry = _get_entry(e, struct stale_info_entry, le);
                e = list_next(&entry->le);
                list_remove(&commit->doc_list, &entry->le);

                // entry->ctx is NULL for 'system doc only' entries; those
                // contribute just their own doc region below
                if (entry->ctx) {
#ifdef _DOC_COMP
                    if (compress_inmem_stale_info) {
                        // uncompression
                        void* new_uncomp_buf = uncomp_buf;
                        if (uncomp_buflen < entry->ctxlen) {
                            // grow to the uncompressed length of this entry
                            uncomp_buflen = entry->ctxlen;
                            new_uncomp_buf = (void*)realloc(uncomp_buf, uncomp_buflen);
                        }

                        if (!new_uncomp_buf) {
                            // realloc() of uncomp_buf failed.
                            fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                                    "(fdb_get_reusable_block) "
                                    "realloc of 'uncomp_buf' failed: "
                                    "database file '%s', "
                                    "uncomp_buflen %d, "
                                    "entry->ctxlen %d\n",
                                    handle->file->filename,
                                    (int)uncomp_buflen,
                                    (int)entry->ctxlen);
                            free(uncomp_buf);
                            free(revnum_array);

                            reusable_block_list ret;
                            ret.n_blocks = 0;
                            ret.blocks = NULL;

                            return ret;
                        }
                        uncomp_buf = new_uncomp_buf;

                        size_t len = uncomp_buflen;
                        r = snappy_uncompress((char*)entry->ctx, entry->comp_ctxlen,
                                              (char*)uncomp_buf, &len);
                        if (r != 0) {
                            fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
                                    "(fdb_get_reusable_block) "
                                    "Uncompression error from a database file '%s'"
                                    ": return value %d, header revnum %" _F64 ", "
                                    "doc offset %" _F64 "\n",
                                    handle->file->filename, r, revnum, entry->offset);
                            free(uncomp_buf);
                            free(revnum_array);

                            reusable_block_list ret;
                            ret.n_blocks = 0;
                            ret.blocks = NULL;

                            return ret;
                        }

                        // fetch the context
                        _fetch_stale_info_doc(uncomp_buf, mergetree,
                                              prev_offset, prev_hdr);
                    } else {
                        _fetch_stale_info_doc(entry->ctx, mergetree,
                                              prev_offset, prev_hdr);
                    }
#else
                    _fetch_stale_info_doc(entry->ctx, mergetree, prev_offset, prev_hdr);
#endif
                }

                // also insert/merge the system doc region
                struct stale_regions sr;

                sr = filemgr_actual_stale_regions(handle->file, entry->offset,
                                                  entry->doclen);

                if (sr.n_regions > 1) {
                    for (i=0; i<sr.n_regions; ++i){
                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
                    }
                    // multi-region result is heap-allocated by
                    // filemgr_actual_stale_regions()
                    free(sr.regions);
                } else {
                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
                }

                free(entry->ctx);
                free(entry);
            }

            free(commit);
        }
        free(uncomp_buf);
    }

    if (stale_tree_scan) {
        // scan stale-block tree and get all stale regions
        // corresponding to commit headers whose seq number is
        // equal to or smaller than 'revnum_upto'
        btree_iterator_init(handle->staletree, &bit, NULL);
        do {
            br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
            btreeblk_end(handle->bhandle);
            if (br != BTREE_RESULT_SUCCESS) {
                break;
            }

            prev_revnum = revnum;
            revnum = _endian_decode(_revnum);
            if (revnum > revnum_upto) {
                revnum = prev_revnum;
                break;
            }

            // NOTE(review): unlike the in-memory path above, this realloc
            // result is not checked for NULL — confirm whether that is
            // acceptable here.
            revnum_array[n_revnums++] = revnum;
            if (n_revnums >= max_revnum_array) {
                max_revnum_array *= 2;
                revnum_array = (filemgr_header_revnum_t *)
                               realloc(revnum_array, max_revnum_array *
                                       sizeof(filemgr_header_revnum_t));
            }
            offset = _endian_decode(_offset);

            // follow the backward chain of system docs for this commit
            while (offset != BLK_NOT_FOUND) {
                memset(&doc, 0x0, sizeof(doc));
                // pre-allocated buffer for key
                doc.key = (void*)keybuf;

                if (docio_read_doc(handle->dhandle, offset, &doc, true) <= 0) {
                    // read fail .. escape
                    offset = BLK_NOT_FOUND;
                    continue;
                }

                // 'prev_offset' is set from the doc body's first field
                _fetch_stale_info_doc(doc.body, mergetree, prev_offset, prev_hdr);

                // also insert/merge the system doc region
                size_t length = _fdb_get_docsize(doc.length);
                struct stale_regions sr;

                sr = filemgr_actual_stale_regions(handle->file, offset, length);

                if (sr.n_regions > 1) {
                    for (i=0; i<sr.n_regions; ++i){
                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
                    }
                    free(sr.regions);
                } else {
                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
                }

                // We don't need to free 'meta' as it will be NULL.
                free(doc.body);

                offset = prev_offset;
            }
        } while (true);
        btree_iterator_free(&bit);
    }

    // remove merged commit headers
    for (i=0; i<n_revnums; ++i) {
        _revnum = _endian_encode(revnum_array[i]);
        btree_remove(handle->staletree, (void*)&_revnum);
        btreeblk_end(handle->bhandle);
    }

    // account the b+tree node-count / delta-size changes made above
    delta = handle->bhandle->nlivenodes - stat.nlivenodes;
    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
    delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
    delta *= handle->config.blocksize;
    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);

    // gather stale blocks generated by removing b+tree entries
    if (e_last) {
        e = list_next(e_last);
    } else {
        e = list_begin(handle->file->stale_list);
    }
    while (e) {
        item = _get_entry(e, struct stale_data, le);
        e = list_remove(handle->file->stale_list, e);

        _insert_n_merge(mergetree, item->pos, item->len);
        free(item);
    }

    // now merge stale regions as large as possible
    size_t n_blocks = 0;
    size_t blocksize = handle->file->blocksize;
    uint32_t max_blocks = 256;
    uint32_t front_margin;
    reusable_block_list ret;
    struct reusable_block *blocks_arr;

    blocks_arr = (struct reusable_block*)
        calloc(max_blocks, sizeof(struct reusable_block));

    avl = avl_first(mergetree);
    while (avl) {
        item = _get_entry(avl, struct stale_data, avl);
        avl = avl_next(avl);

        // A stale region can be represented as follows:
        //
        //  block x-1 |   block x  |  block x+1 | block x+2
        //  -----+----+------------+------------+--------+----
        //   ... |  A |      B     |      C     |    D   | ...
        //  -----+----+------------+------------+--------+----
        //
        // Only segment 'B' and 'C' can be reusable, and the other segments
        // (i.e., 'A' and 'D') should be re-inserted into stale-block tree.

        if (item->len < blocksize) {
            // whole region is smaller than a block .. skip this item
            //       |    block    | ...
            //  -----+---+-----+---+-------
            //   ... |   |/////|   | ...
            //  -----+---+-----+---+-------
            continue;
        }

        //             <------------ item_len ------------>
        //  block x-1 |   block x  |  block x+1 | block x+2
        //  -----+----+------------+------------+--------+----
        //   ... |  A |      B     |      C     |    D   | ...
        //  -----+----+------------+------------+--------+----
        //            ^
        //            item_pos
        if (item->pos % blocksize) {
            // round up to the next block boundary
            front_margin = blocksize - item->pos % blocksize;
        } else {
            front_margin = 0;
        }
        item_pos = item->pos + front_margin;
        item_len = item->len - front_margin;

        if (item_len < blocksize) {
            // Remaining length is smaller than a block. This means that there
            // is no reusable block in this region (even though the region size is
            // bigger than a block size) .. skip this item.
            //
            //       |   block x   |  block x+1  | ...
            //  -----+------+------+-------+-----+----
            //   ... |      |//////|///////|     | ...
            //  -----+------+------+-------+-----+----
            continue;
        }

        // calculate # blocks and add to 'blocks'
        blocks_arr[n_blocks].bid = item_pos / blocksize;
        blocks_arr[n_blocks].count = item_len / blocksize;
        n_blocks += 1;
        if (n_blocks >= max_blocks) {
            max_blocks *= 2;
            blocks_arr = (struct reusable_block*)
                realloc(blocks_arr, max_blocks * sizeof(struct reusable_block));
        }

        if (front_margin) {
            // adjust the existing item to indicate 'A' in above example
            item->len = front_margin;
        } else {
            // exactly aligned .. remove this item
            avl_remove(mergetree, &item->avl);
            free(item);
        }

        uint32_t remaining_len;
        remaining_len = item_len % blocksize;
        if (remaining_len) {
            // add a new item for the remaining region ('D' in above example)
            struct stale_data *new_item;
            new_item = (struct stale_data *)
                       calloc(1, sizeof(struct stale_data));
            new_item->pos = (blocks_arr[n_blocks-1].bid + blocks_arr[n_blocks-1].count)
                            * blocksize;
            new_item->len = remaining_len;
            avl_insert(mergetree, &new_item->avl, _reusable_offset_cmp);
            // continue iteration right after the newly inserted remainder
            avl = avl_next(&new_item->avl);
        }
    }

    // re-write stale tree using the last revnum as a key
    // in this case, only stale regions newly generated by this function are gathered,
    // and prev_hdr is set to BLK_NOT_FOUND, as corresponding seq numbers are
    // already removed.

    // however, do not remove the remaining items in the merge-tree and continue to
    // merge them in the next block reclaim.
    fdb_gather_stale_blocks(handle, revnum, BLK_NOT_FOUND, BLK_NOT_FOUND,
                            0, e_last, true);

    free(revnum_array);

    ret.n_blocks = n_blocks;
    ret.blocks = blocks_arr;
    return ret;
}

// Undo stale-block bookkeeping for a rollback: remove the stale-tree entries
// of every commit header revision in [handle->rollback_revnum, cur_revnum)
// from the on-disk stale-block b+tree, and free the corresponding commits
// (revnum >= rollback_revnum) from the in-memory stale info tree.
// No-op when handle->rollback_revnum is 0.
void fdb_rollback_stale_blocks(fdb_kvs_handle *handle,
                               filemgr_header_revnum_t cur_revnum)
{
    btree_result br;
    filemgr_header_revnum_t i, _revnum;
    struct avl_node *avl;
    struct list_elem *elem;
    struct stale_info_commit *commit, query;
    struct stale_info_entry *entry;

    if (handle->rollback_revnum == 0) {
        return;
    }

    // remove from on-disk stale-tree
    for (i = handle->rollback_revnum; i < cur_revnum; ++i) {
        _revnum = _endian_encode(i);
        br = btree_remove(handle->staletree, (void*)&_revnum);
        // don't care the result
        (void)br;
        btreeblk_end(handle->bhandle);
    }

    // also remove from in-memory stale-tree:
    // start at the exact rollback revnum, or the smallest greater one
    query.revnum = handle->rollback_revnum;
    avl = avl_search(&handle->file->stale_info_tree,
                     &query.avl, _inmem_stale_cmp);
    if (!avl) {
        avl = avl_search_greater(&handle->file->stale_info_tree,
                                 &query.avl, _inmem_stale_cmp);
    }
    while (avl) {
        commit = _get_entry(avl, struct stale_info_commit, avl);
        avl = avl_next(avl);

        avl_remove(&handle->file->stale_info_tree, &commit->avl);

        // free every stale-info entry owned by this commit
        elem = list_begin(&commit->doc_list);
        while (elem) {
            entry = _get_entry(elem, struct stale_info_entry, le);
            elem = list_remove(&commit->doc_list, &entry->le);

            free(entry->ctx);
            free(entry);
        }

        free(commit);
    }
}