1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2015 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22
23 #include "staleblock.h"
24 #include "btreeblock.h"
25 #include "list.h"
26 #include "docio.h"
27 #include "fdb_internal.h"
28 #include "version.h"
29 #include "time_utils.h"
30
31 #ifdef _DOC_COMP
32 #include "snappy-c.h"
33 #endif
34
35 #include "memleak.h"
36
// When true (and _DOC_COMP is enabled), in-memory copies of stale-region
// system doc bodies are kept snappy-compressed to reduce memory footprint;
// they are uncompressed on demand in fdb_get_reusable_block().
static bool compress_inmem_stale_info = true;
38
_inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)39 static int _inmem_stale_cmp(struct avl_node *a, struct avl_node *b, void *aux)
40 {
41 struct stale_info_commit *aa, *bb;
42 aa = _get_entry(a, struct stale_info_commit, avl);
43 bb = _get_entry(b, struct stale_info_commit, avl);
44
45 if (aa->revnum < bb->revnum) {
46 return -1;
47 } else if (aa->revnum > bb->revnum) {
48 return 1;
49 } else {
50 return 0;
51 }
52 }
53
fdb_add_inmem_stale_info(fdb_kvs_handle *handle, filemgr_header_revnum_t revnum, struct docio_object *doc, uint64_t doc_offset, bool system_doc_only)54 static void fdb_add_inmem_stale_info(fdb_kvs_handle *handle,
55 filemgr_header_revnum_t revnum,
56 struct docio_object *doc,
57 uint64_t doc_offset,
58 bool system_doc_only)
59 {
60 int ret;
61 size_t buflen = 0;
62 struct filemgr *file = handle->file;
63 struct avl_node *a;
64 struct stale_info_commit *item, query;
65 struct stale_info_entry *entry;
66
67 // search using revnum first
68 query.revnum = revnum;
69 a = avl_search(&file->stale_info_tree, &query.avl, _inmem_stale_cmp);
70 if (a) {
71 // already exist
72 item = _get_entry(a, struct stale_info_commit, avl);
73 } else {
74 // not exist .. create a new one and insert into tree
75 item = (struct stale_info_commit *)calloc(1, sizeof(struct stale_info_commit));
76 item->revnum = revnum;
77 list_init(&item->doc_list);
78 avl_insert(&file->stale_info_tree, &item->avl, _inmem_stale_cmp);
79 file->stale_info_tree_loaded.store(true, std::memory_order_relaxed);
80 }
81
82 entry = (struct stale_info_entry *)calloc(1, sizeof(struct stale_info_entry));
83
84 if (!system_doc_only) {
85 #ifdef _DOC_COMP
86 if (compress_inmem_stale_info) {
87 buflen = snappy_max_compressed_length(doc->length.bodylen);;
88 entry->ctx = (void *)calloc(1, buflen);
89 ret = snappy_compress((char*)doc->body, doc->length.bodylen,
90 (char*)entry->ctx, &buflen);
91 if (ret != 0) {
92 fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
93 "(fdb_add_inmem_stale_info) "
94 "Compression error from a database file '%s'"
95 ": return value %d, header revnum %" _F64 ", "
96 "doc offset %" _F64 "\n",
97 file->filename, ret, revnum, doc_offset);
98 if (!a) {
99 // 'item' is allocated in this function call.
100 avl_remove(&file->stale_info_tree, &item->avl);
101 free(item);
102 }
103 free(entry);
104 return;
105 }
106 } else {
107 buflen = doc->length.bodylen;
108 entry->ctx = (void *)calloc(1, buflen);
109 memcpy(entry->ctx, doc->body, doc->length.bodylen);
110 }
111 #else
112 buflen = doc->length.bodylen;
113 entry->ctx = (void *)calloc(1, buflen);
114 memcpy(entry->ctx, doc->body, doc->length.bodylen);
115 #endif
116
117 entry->ctxlen = doc->length.bodylen;
118
119 } else {
120 // when 'system_doc_only' flag is set, just set to NULL.
121 // we need the doc's length and offset info only.
122 entry->ctx = NULL;
123 entry->ctxlen = 0;
124 }
125
126 entry->comp_ctxlen = buflen;
127 entry->doclen = _fdb_get_docsize(doc->length);
128 entry->offset = doc_offset;
129 list_push_back(&item->doc_list, &entry->le);
130 }
131
// Populate the in-memory stale info tree ('file->stale_info_tree') from the
// on-disk stale-block B+tree. Runs at most once per file: the
// 'stale_info_tree_loaded' atomic CAS screens out every caller but the first,
// and the filemgr mutex serializes the actual load against writers.
void fdb_load_inmem_stale_info(fdb_kvs_handle *handle)
{
    uint8_t keybuf[64];
    int64_t ret;
    bid_t offset, _offset, prev_offset;
    filemgr_header_revnum_t revnum, _revnum;
    btree_iterator bit;
    btree_result br;
    struct filemgr *file = handle->file;
    struct docio_object doc;
    bool expected = false;

    if (!std::atomic_compare_exchange_strong(&file->stale_info_tree_loaded,
                                             &expected, true)) {
        // stale info is already loaded (fast screening without mutex)
        return;
    }

    // first open of the DB file
    // should grab mutex to avoid race with other writer
    filemgr_mutex_lock(file);

    // iterate every {revnum -> system doc offset} entry in the stale tree
    btree_iterator_init(handle->staletree, &bit, NULL);
    do {
        br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
        btreeblk_end(handle->bhandle);
        if (br != BTREE_RESULT_SUCCESS) {
            break;
        }

        revnum = _endian_decode(_revnum);
        offset = _endian_decode(_offset);

        // a commit may own a backward-linked chain of system docs: the first
        // 8 bytes of each doc body hold the previous doc's offset
        // (0xffff.. terminates the chain — see fdb_gather_stale_blocks()).
        while (offset != BLK_NOT_FOUND) {
            memset(&doc, 0x0, sizeof(doc));
            // pre-allocated buffer for key
            doc.key = (void*)keybuf;

            ret = docio_read_doc(handle->dhandle, offset, &doc, true);
            if (ret <= 0) {
                // read fail .. escape
                fdb_log(NULL, (fdb_status)ret,
                        "Error in reading a stale region info document "
                        "from a database file '%s'"
                        ": revnum %" _F64 ", offset %" _F64 "\n",
                        file->filename, revnum, offset);
                offset = BLK_NOT_FOUND;
                continue;
            }

            // keep an in-memory (possibly compressed) copy of the doc body
            fdb_add_inmem_stale_info(handle, revnum, &doc, offset, false);

            // fetch previous doc offset
            memcpy(&_offset, doc.body, sizeof(_offset));
            prev_offset = _endian_decode(_offset);

            // We don't need to free 'meta' as it will be NULL.
            free(doc.body);

            offset = prev_offset;
        }
    } while (true);
    btree_iterator_free(&bit);

    filemgr_mutex_unlock(file);
}
198
// Serialize accumulated stale regions into one or more system documents,
// append them to the file, and index each doc in the on-disk stale-block
// B+tree under 'revnum'. Also mirrors the doc into the in-memory stale info
// tree via fdb_add_inmem_stale_info().
//
// Sources of regions:
//   - 'file->stale_list' entries after 'e_last' (always; items are consumed),
//   - 'file->mergetree' (first loop only, when 'from_mergetree' is set;
//     items are NOT removed — see the long comment in the loop body).
//
// Updating the stale tree itself creates new stale blocks, so the function
// loops, writing an additional doc per iteration, until no new stale regions
// appear. NOTE(review): the caller is expected to hold the filemgr mutex so
// that 'revnum' cannot change underneath us (see comment near sprintf below).
void fdb_gather_stale_blocks(fdb_kvs_handle *handle,
                             filemgr_header_revnum_t revnum,
                             bid_t prev_hdr,
                             uint64_t kv_info_offset,
                             fdb_seqnum_t seqnum,
                             struct list_elem *e_last,
                             bool from_mergetree)
{
    int64_t delta;
    int r;
    uint32_t count = 0;
    uint32_t offset = 0, count_location;
    uint32_t bufsize = 8192;
    uint32_t _count, _len;
    uint64_t _pos, _kv_info_offset;
    uint8_t *buf = NULL;
    bid_t doc_offset, _doc_offset;
    bid_t _prev_hdr;
    bool gather_staleblocks = true;
    bool first_loop = true;
    filemgr_header_revnum_t _revnum;
    fdb_seqnum_t _seqnum;
    struct kvs_stat stat;

    /*
     * << stale block system doc structure >>
     * [previous doc offset]: 8 bytes (0xffff.. if not exist)
     * [previous header BID]: 8 bytes (0xffff.. if not exist)
     * [KVS info doc offset]: 8 bytes (0xffff.. if not exist)
     * [Default KVS seqnum]:  8 bytes
     * [# items]:             4 bytes
     * ---
     * [position]:            8 bytes
     * [length]:              4 bytes
     * ...
     */

    if (filemgr_get_stale_list(handle->file)) {
        struct avl_node *a;
        struct list_elem *e;
        struct stale_data *item;

        // Baseline the bhandle node counters so the deltas computed at the
        // end of this function measure only the index changes made here.
        // NOTE(review): 'ndeltanodes' is also baselined to stat.nlivenodes
        // (not a separate ndeltanodes stat) — matches the paired subtraction
        // below, but worth confirming against _kvs_stat semantics.
        r = _kvs_stat_get(handle->file, 0, &stat);
        handle->bhandle->nlivenodes = stat.nlivenodes;
        handle->bhandle->ndeltanodes = stat.nlivenodes;
        (void)r;

        buf = (uint8_t *)calloc(1, bufsize);
        _revnum = _endian_encode(revnum);

        // initial previous doc offset (0xffff.. == no previous doc)
        memset(buf, 0xff, sizeof(bid_t));
        count_location = sizeof(bid_t);

        // previous header BID
        if (prev_hdr == 0 || prev_hdr == BLK_NOT_FOUND) {
            // does not exist
            memset(&_prev_hdr, 0xff, sizeof(_prev_hdr));
        } else {
            _prev_hdr = _endian_encode(prev_hdr);
        }
        memcpy(buf + sizeof(bid_t), &_prev_hdr, sizeof(bid_t));
        count_location += sizeof(bid_t);

        // KVS info doc offset
        _kv_info_offset = _endian_encode(kv_info_offset);
        memcpy(buf + count_location, &_kv_info_offset, sizeof(uint64_t));
        count_location += sizeof(uint64_t);

        // default KVS seqnum
        _seqnum = _endian_encode(seqnum);
        memcpy(buf + count_location, &_seqnum, sizeof(fdb_seqnum_t));
        count_location += sizeof(fdb_seqnum_t);
        // 'count_location' now points just past the (not-yet-written) item
        // count; the count itself is back-filled once gathering is done.
        count_location += sizeof(count);

        while(gather_staleblocks) {
            // reserve space for
            // prev offset (8), prev header (8), kv_info_offset (8),
            // seqnum (8), count (4)
            offset = count_location;

            if (first_loop && from_mergetree) {
                // gather from mergetree
                a = avl_first(&handle->file->mergetree);
                while (a) {
                    item = _get_entry(a, struct stale_data, avl);

                    if (handle->staletree) {
                        count++;

                        _pos = _endian_encode(item->pos);
                        _len = _endian_encode(item->len);

                        memcpy(buf + offset, &_pos, sizeof(_pos));
                        offset += sizeof(_pos);
                        memcpy(buf + offset, &_len, sizeof(_len));
                        offset += sizeof(_len);

                        // grow before the next pair would overflow the buffer
                        // NOTE(review): realloc result is unchecked (file-wide
                        // allocation policy appears to assume success) — on
                        // OOM this would crash; confirm policy.
                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
                            bufsize *= 2;
                            buf = (uint8_t*)realloc(buf, bufsize);
                        }
                    }


                    // If 'from_mergetree' flag is set, it means that this
                    // function is called at the end of fdb_get_reusable_block(),
                    // and those items are remaining (non-reusable) regions after
                    // picking up reusable blocks from 'mergetree'.

                    // In the previous implementation, those items are converted
                    // and stored as a system document. The document is re-read in
                    // the next block reclaim, and then we reconstruct 'mergetree'
                    // from the document; this is unnecessary duplicated overhead.

                    // As an optimization, we can simply keep those items in
                    // 'mergetree' and use them in the next block reclaim, without
                    // reading the corresponding system document; this also reduces
                    // the commit latency much. Instead, to minimize memory
                    // consumption, we don't need to maintain in-memory copy of the
                    // system doc corresponding to the remaining items in the
                    // 'mergetree', that will be created below.

                    // do not remove the item
                    a = avl_next(&item->avl);
                }
            } else {
                // gather from stale_list (only items appended after 'e_last')
                if (e_last) {
                    e = list_next(e_last);
                } else {
                    e = list_begin(handle->file->stale_list);
                }
                while (e) {
                    item = _get_entry(e, struct stale_data, le);

                    if (handle->staletree) {
                        count++;

                        _pos = _endian_encode(item->pos);
                        _len = _endian_encode(item->len);

                        memcpy(buf + offset, &_pos, sizeof(_pos));
                        offset += sizeof(_pos);
                        memcpy(buf + offset, &_len, sizeof(_len));
                        offset += sizeof(_len);

                        // NOTE(review): unchecked realloc, same as above.
                        if (offset + sizeof(_pos) + sizeof(_len) >= bufsize) {
                            bufsize *= 2;
                            buf = (uint8_t*)realloc(buf, bufsize);
                        }
                    }

                    // stale_list items are consumed (unlike mergetree items)
                    e = list_remove(handle->file->stale_list, e);
                    free(item);
                }
            }

            gather_staleblocks = false;
            if (count) {
                char *doc_key = alca(char, 32);
                struct docio_object doc;

                // store count (back-fill the slot reserved above)
                _count = _endian_encode(count);
                memcpy(buf + count_location - sizeof(_count), &_count, sizeof(_count));

                // append a system doc
                memset(&doc, 0x0, sizeof(doc));
                // add one to 'revnum' to get the next revision number
                // (note that filemgr_mutex() is grabbed so that no other thread
                //  will change the 'revnum').
                sprintf(doc_key, "stale_blocks_%" _F64, revnum);
                doc.key = (void*)doc_key;
                doc.body = buf;
                doc.length.keylen = strlen(doc_key) + 1;
                doc.length.metalen = 0;
                doc.length.bodylen = offset;
                doc.seqnum = 0;
                doc_offset = docio_append_doc_system(handle->dhandle, &doc);

                // insert into stale-block tree
                _doc_offset = _endian_encode(doc_offset);
                btree_insert(handle->staletree, (void *)&_revnum, (void *)&_doc_offset);
                btreeblk_end(handle->bhandle);
                btreeblk_reset_subblock_info(handle->bhandle);

                if (from_mergetree && first_loop) {
                    // if from_mergetree flag is set and this is the first loop,
                    // stale regions in this document are already in mergetree
                    // so skip adding them into in-memory stale info tree.

                    // however, the system doc itself should be marked as stale
                    // when the doc is reclaimed, thus we instead add a dummy entry
                    // that containing doc offset, length info only.
                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, true);
                } else {
                    // add the doc into in-memory stale info tree
                    fdb_add_inmem_stale_info(handle, revnum, &doc, doc_offset, false);
                }

                if (list_begin(filemgr_get_stale_list(handle->file))) {
                    // updating stale tree brings another stale blocks.
                    // recursively update until there is no more stale block.

                    // note that infinite loop will not occur because
                    // 1) all updated index blocks for stale tree are still writable
                    // 2) incoming keys for stale tree (revnum) are monotonic
                    //    increasing order; most recently allocated node will be
                    //    updated again.

                    count = 0;
                    // save previous doc offset (links the next doc back to
                    // the one just written)
                    memcpy(buf, &_doc_offset, sizeof(_doc_offset));

                    // gather once again
                    gather_staleblocks = true;
                }
            }

            first_loop = false;
        } // gather stale blocks

        // publish the index-node deltas produced by this call
        delta = handle->bhandle->nlivenodes - stat.nlivenodes;
        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
        delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
        delta *= handle->config.blocksize;
        _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);

        free(buf);
    } else {
        btreeblk_reset_subblock_info(handle->bhandle);
    }
}
433
_reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)434 static int _reusable_offset_cmp(struct avl_node *a, struct avl_node *b, void *aux)
435 {
436 struct stale_data *aa, *bb;
437 aa = _get_entry(a, struct stale_data, avl);
438 bb = _get_entry(b, struct stale_data, avl);
439 return _CMP_U64(aa->pos, bb->pos);
440 }
441
// Insert the stale region [item_pos, item_pos + item_len) into 'tree'
// (keyed by position via _reusable_offset_cmp), then merge it with its
// immediate predecessor/successor when they overlap or are exactly adjacent.
// Invariant maintained: the tree always holds disjoint, maximal regions,
// so at most one merge on each side is ever needed.
static void _insert_n_merge(struct avl_tree *tree,
                            uint64_t item_pos,
                            uint32_t item_len)
{
    struct stale_data query, *item;
    struct avl_node *avl;

    // retrieve the tree first
    query.pos = item_pos;
    avl = avl_search(tree, &query.avl, _reusable_offset_cmp);
    if (avl) {
        // same offset already exists
        item = _get_entry(avl, struct stale_data, avl);
        // choose longer length
        if (item->len < item_len) {
            item->len = item_len;
        }
    } else {
        // does not exist .. create a new item
        item = (struct stale_data*)
               calloc(1, sizeof(struct stale_data));
        item->pos = item_pos;
        item->len = item_len;
        avl_insert(tree, &item->avl, _reusable_offset_cmp);
    }

    // check prev/next item to see if they can be merged
    struct avl_node *p_avl, *n_avl;
    struct stale_data*p_item, *n_item;
    p_avl = avl_prev(&item->avl);
    if (p_avl) {
        p_item = _get_entry(p_avl, struct stale_data, avl);
        // previous region reaches (or passes) the start of 'item'?
        if (p_item->pos + p_item->len >= item->pos) {

            if (p_item->pos + p_item->len >= item->pos + item->len) {
                // 'item' is included in p_item .. simply remove it
                // (do nothing)
            } else {
                // overlapping (or consecutive) .. merge two items
                // (extends p_item so its end becomes item->pos + item->len)
                p_item->len += item->len +
                               (item->pos - p_item->pos - p_item->len);
            }
            // remove current item; continue the successor check from the
            // surviving (merged) predecessor
            avl_remove(tree, &item->avl);
            free(item);
            item = p_item;
        }
    }

    n_avl = avl_next(&item->avl);
    if (n_avl) {
        n_item = _get_entry(n_avl, struct stale_data, avl);
        // 'item' reaches (or passes) the start of the next region?
        if (item->pos + item->len >= n_item->pos) {

            if (item->pos + item->len >= n_item->pos + n_item->len) {
                // 'n_item' is included in 'item' .. simply remove it
                // (do nothing)
            } else {
                // overlapping (or consecutive) .. merge two items
                // (extends item so its end becomes n_item->pos + n_item->len)
                item->len += n_item->len +
                             (n_item->pos - item->pos - item->len);
            }
            // remove next item
            avl_remove(tree, &n_item->avl);
            free(n_item);
        }
    }
}
510
511 // Parse & fetch stale regions from the buffer 'ctx', which is the body of
512 // a stale info system document (from either in-memory stale-block-tree or
513 // on-disk stale-block-tree). After fetching, insert those regions
514 // into 'mergetree'.
_fetch_stale_info_doc(void *ctx, struct avl_tree *mergetree, uint64_t &prev_offset_out, uint64_t &prev_hdr_out)515 static void _fetch_stale_info_doc(void *ctx,
516 struct avl_tree *mergetree,
517 uint64_t &prev_offset_out,
518 uint64_t &prev_hdr_out)
519 {
520 uint32_t i, count, _count, item_len;
521 uint64_t pos;
522 uint64_t item_pos;
523
524 pos = 0;
525
526 // get previous doc offset
527 memcpy(&prev_offset_out, ctx, sizeof(prev_offset_out));
528 prev_offset_out = _endian_decode(prev_offset_out);
529 pos += sizeof(prev_offset_out);
530
531 // get previous header BID
532 memcpy(&prev_hdr_out, (uint8_t*)ctx + pos, sizeof(prev_hdr_out));
533 prev_hdr_out = _endian_decode(prev_hdr_out);
534 (void)prev_hdr_out;
535 pos += sizeof(prev_hdr_out);
536
537 // Skip kv_info_offset and default KVS's seqnum
538 pos += sizeof(uint64_t) + sizeof(fdb_seqnum_t);
539
540 // get count;
541 memcpy(&_count, (uint8_t*)ctx + pos, sizeof(_count));
542 count = _endian_decode(_count);
543 pos += sizeof(_count);
544
545 // get a stale region and insert/merge into tree
546 for (i=0;i<count;++i) {
547 memcpy(&item_pos, (uint8_t*)ctx + pos, sizeof(item_pos));
548 item_pos = _endian_decode(item_pos);
549 pos += sizeof(item_pos);
550
551 memcpy(&item_len, (uint8_t*)ctx + pos, sizeof(item_len));
552 item_len = _endian_decode(item_len);
553 pos += sizeof(item_len);
554
555 _insert_n_merge(mergetree, item_pos, item_len);
556 }
557 }
558
// Compute the set of whole blocks that can be reused, considering all stale
// regions recorded up to (and including) header revision
// 'stale_header.revnum'.
//
// Steps:
//   1. Drain the in-memory stale info tree (preferred) or, if empty, scan
//      the on-disk stale-block tree, merging every recorded region into
//      'file->mergetree' via _insert_n_merge(). The system docs themselves
//      are merged in as stale regions too.
//   2. Remove the consumed revnums from the on-disk stale tree.
//   3. Walk the mergetree and carve out block-aligned spans as reusable
//      blocks; unaligned head/tail fragments stay in the mergetree.
//   4. Re-write the remaining (newly generated) stale regions via
//      fdb_gather_stale_blocks(..., from_mergetree=true).
//
// Returns a heap-allocated array of {bid, count} runs; caller owns
// ret.blocks and must free it. On allocation failure an empty list
// (n_blocks == 0, blocks == NULL) is returned.
reusable_block_list fdb_get_reusable_block(fdb_kvs_handle *handle,
                                           stale_header_info stale_header)
{
    int64_t delta;
    int r;
    uint8_t keybuf[64];
    uint32_t i;
    uint32_t item_len;
    uint32_t n_revnums, max_revnum_array = 256;
    uint64_t item_pos;
    btree_iterator bit;
    btree_result br;
    filemgr_header_revnum_t revnum_upto, prev_revnum = 0;
    filemgr_header_revnum_t revnum = 0, _revnum;
    filemgr_header_revnum_t *revnum_array;
    bid_t offset, _offset, prev_offset;
    bid_t prev_hdr = BLK_NOT_FOUND;
    bool stale_tree_scan = true;
    struct docio_object doc;
    struct avl_tree *mergetree = &handle->file->mergetree;
    struct avl_node *avl;
    struct stale_data *item;
    struct list_elem *e, *e_last;
    struct kvs_stat stat;

    revnum_upto = stale_header.revnum;

    // baseline index-node counters; deltas are published near the end
    // (same baselining pattern as fdb_gather_stale_blocks)
    r = _kvs_stat_get(handle->file, 0, &stat);
    handle->bhandle->nlivenodes = stat.nlivenodes;
    handle->bhandle->ndeltanodes = stat.nlivenodes;
    (void)r;

    // collects the revnums consumed here so they can be deleted from the
    // on-disk stale tree afterwards
    revnum_array = (filemgr_header_revnum_t *)
                   calloc(max_revnum_array, sizeof(filemgr_header_revnum_t));
    n_revnums = 0;

    // remember the last stale list item to be preserved
    e_last = list_end(handle->file->stale_list);

    avl = avl_first(&handle->file->stale_info_tree);
    if (avl) {
        // if in-memory stale info exists
        void *uncomp_buf = NULL;
        int r;
        size_t uncomp_buflen = 128*1024; // 128 KB by default;
        struct stale_info_commit *commit;
        struct stale_info_entry *entry;

        stale_tree_scan = false;

        if (compress_inmem_stale_info) {
            uncomp_buf = (void*)calloc(1, uncomp_buflen);
            if (!uncomp_buf) {
                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                        "(fdb_get_reusable_block) "
                        "calloc of 'uncomp_buf' failed: "
                        "database file '%s', "
                        "uncomp_buflen %d\n",
                        handle->file->filename,
                        (int)uncomp_buflen);
                free(revnum_array);

                reusable_block_list ret;
                ret.n_blocks = 0;
                ret.blocks = NULL;

                return ret;
            }
        }

        // consume commits in ascending revnum order, up to 'revnum_upto'
        while (avl) {
            commit = _get_entry(avl, struct stale_info_commit, avl);
            avl = avl_next(avl);

            prev_revnum = revnum;
            revnum = commit->revnum;
            if (revnum > revnum_upto) {
                // newer than the reclaim horizon: stop and keep this commit
                revnum = prev_revnum;
                break;
            }

            filemgr_header_revnum_t *new_revnum_array = revnum_array;
            revnum_array[n_revnums++] = revnum;
            if (n_revnums >= max_revnum_array) {
                max_revnum_array *= 2;
                new_revnum_array = (filemgr_header_revnum_t *)
                                   realloc(revnum_array, max_revnum_array *
                                           sizeof(filemgr_header_revnum_t));
            }
            if (!new_revnum_array) {
                // realloc() of revnum_array failed.
                fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                        "(fdb_get_reusable_block) "
                        "realloc of 'revnum_array' failed: "
                        "database file '%s', "
                        "max_revnum_array %d\n",
                        handle->file->filename,
                        (int)max_revnum_array);
                free(uncomp_buf);
                free(revnum_array);

                reusable_block_list ret;
                ret.n_blocks = 0;
                ret.blocks = NULL;

                return ret;
            }
            revnum_array = new_revnum_array;

            avl_remove(&handle->file->stale_info_tree, &commit->avl);

            // fold every recorded doc body of this commit into the mergetree
            e = list_begin(&commit->doc_list);
            while (e) {
                entry = _get_entry(e, struct stale_info_entry, le);
                e = list_next(&entry->le);
                list_remove(&commit->doc_list, &entry->le);

                if (entry->ctx) {
#ifdef _DOC_COMP
                    if (compress_inmem_stale_info) {
                        // uncompression
                        void* new_uncomp_buf = uncomp_buf;
                        if (uncomp_buflen < entry->ctxlen) {
                            uncomp_buflen = entry->ctxlen;
                            new_uncomp_buf = (void*)realloc(uncomp_buf, uncomp_buflen);
                        }

                        if (!new_uncomp_buf) {
                            // realloc() of uncomp_buf failed.
                            fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
                                    "(fdb_get_reusable_block) "
                                    "realloc of 'uncomp_buf' failed: "
                                    "database file '%s', "
                                    "uncomp_buflen %d, "
                                    "entry->ctxlen %d\n",
                                    handle->file->filename,
                                    (int)uncomp_buflen,
                                    (int)entry->ctxlen);
                            free(uncomp_buf);
                            free(revnum_array);

                            reusable_block_list ret;
                            ret.n_blocks = 0;
                            ret.blocks = NULL;

                            return ret;
                        }
                        uncomp_buf = new_uncomp_buf;

                        size_t len = uncomp_buflen;
                        r = snappy_uncompress((char*)entry->ctx, entry->comp_ctxlen,
                                              (char*)uncomp_buf, &len);
                        if (r != 0) {
                            fdb_log(NULL, FDB_RESULT_COMPRESSION_FAIL,
                                    "(fdb_get_reusable_block) "
                                    "Uncompression error from a database file '%s'"
                                    ": return value %d, header revnum %" _F64 ", "
                                    "doc offset %" _F64 "\n",
                                    handle->file->filename, r, revnum, entry->offset);
                            free(uncomp_buf);
                            free(revnum_array);

                            reusable_block_list ret;
                            ret.n_blocks = 0;
                            ret.blocks = NULL;

                            return ret;
                        }

                        // fetch the context
                        _fetch_stale_info_doc(uncomp_buf, mergetree,
                                              prev_offset, prev_hdr);
                    } else {
                        _fetch_stale_info_doc(entry->ctx, mergetree,
                                              prev_offset, prev_hdr);
                    }
#else
                    _fetch_stale_info_doc(entry->ctx, mergetree, prev_offset, prev_hdr);
#endif
                }

                // also insert/merge the system doc region
                struct stale_regions sr;

                sr = filemgr_actual_stale_regions(handle->file, entry->offset,
                                                  entry->doclen);

                if (sr.n_regions > 1) {
                    for (i=0; i<sr.n_regions; ++i){
                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
                    }
                    free(sr.regions);
                } else {
                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
                }

                free(entry->ctx);
                free(entry);
            }

            free(commit);
        }
        free(uncomp_buf);
    }

    if (stale_tree_scan) {
        // scan stale-block tree and get all stale regions
        // corresponding to commit headers whose seq number is
        // equal to or smaller than 'revnum_upto'
        btree_iterator_init(handle->staletree, &bit, NULL);
        do {
            br = btree_next(&bit, (void*)&_revnum, (void*)&_offset);
            btreeblk_end(handle->bhandle);
            if (br != BTREE_RESULT_SUCCESS) {
                break;
            }

            prev_revnum = revnum;
            revnum = _endian_decode(_revnum);
            if (revnum > revnum_upto) {
                revnum = prev_revnum;
                break;
            }

            // NOTE(review): this realloc result is not checked, unlike the
            // in-memory branch above — confirm whether that is intentional.
            revnum_array[n_revnums++] = revnum;
            if (n_revnums >= max_revnum_array) {
                max_revnum_array *= 2;
                revnum_array = (filemgr_header_revnum_t *)
                               realloc(revnum_array, max_revnum_array *
                                       sizeof(filemgr_header_revnum_t));
            }
            offset = _endian_decode(_offset);

            // follow the backward doc chain of this commit (see the doc
            // layout comment in fdb_gather_stale_blocks)
            while (offset != BLK_NOT_FOUND) {
                memset(&doc, 0x0, sizeof(doc));
                // pre-allocated buffer for key
                doc.key = (void*)keybuf;

                if (docio_read_doc(handle->dhandle, offset, &doc, true) <= 0) {
                    // read fail .. escape
                    offset = BLK_NOT_FOUND;
                    continue;
                }

                _fetch_stale_info_doc(doc.body, mergetree, prev_offset, prev_hdr);

                // also insert/merge the system doc region
                size_t length = _fdb_get_docsize(doc.length);
                struct stale_regions sr;

                sr = filemgr_actual_stale_regions(handle->file, offset, length);

                if (sr.n_regions > 1) {
                    for (i=0; i<sr.n_regions; ++i){
                        _insert_n_merge(mergetree, sr.regions[i].pos, sr.regions[i].len);
                    }
                    free(sr.regions);
                } else {
                    _insert_n_merge(mergetree, sr.region.pos, sr.region.len);
                }

                // We don't need to free 'meta' as it will be NULL.
                free(doc.body);

                offset = prev_offset;
            }
        } while (true);
        btree_iterator_free(&bit);
    }

    // remove merged commit headers
    for (i=0; i<n_revnums; ++i) {
        _revnum = _endian_encode(revnum_array[i]);
        btree_remove(handle->staletree, (void*)&_revnum);
        btreeblk_end(handle->bhandle);
    }

    delta = handle->bhandle->nlivenodes - stat.nlivenodes;
    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_NLIVENODES, delta);
    delta = handle->bhandle->ndeltanodes - stat.nlivenodes;
    delta *= handle->config.blocksize;
    _kvs_stat_update_attr(handle->file, 0, KVS_STAT_DELTASIZE, delta);

    // gather stale blocks generated by removing b+tree entries
    if (e_last) {
        e = list_next(e_last);
    } else {
        e = list_begin(handle->file->stale_list);
    }
    while (e) {
        item = _get_entry(e, struct stale_data, le);
        e = list_remove(handle->file->stale_list, e);

        _insert_n_merge(mergetree, item->pos, item->len);
        free(item);
    }

    // now merge stale regions as large as possible
    size_t n_blocks = 0;
    size_t blocksize = handle->file->blocksize;
    uint32_t max_blocks = 256;
    uint32_t front_margin;
    reusable_block_list ret;
    struct reusable_block *blocks_arr;

    blocks_arr = (struct reusable_block*)
        calloc(max_blocks, sizeof(struct reusable_block));

    avl = avl_first(mergetree);
    while (avl) {
        item = _get_entry(avl, struct stale_data, avl);
        avl = avl_next(avl);

        // A stale region can be represented as follows:
        //
        //  block x-1 |   block x  |  block x+1 | block x+2
        //  -----+----+------------+------------+--------+----
        //   ... |  A |      B     |      C     |    D   | ...
        //  -----+----+------------+------------+--------+----
        //
        // Only segment 'B' and 'C' can be reusable, and the other segments
        // (i.e., 'A' and 'D') should be re-inserted into stale-block tree.

        if (item->len < blocksize) {
            // whole region is smaller than a block .. skip this item
            //       |    block    | ...
            //  -----+---+-----+---+-------
            //   ... |   |/////|   | ...
            //  -----+---+-----+---+-------
            continue;
        }

        //            <------------ item_len ------------>
        //  block x-1 |   block x  |  block x+1 | block x+2
        //  -----+----+------------+------------+--------+----
        //   ... |  A |      B     |      C     |    D   | ...
        //  -----+----+------------+------------+--------+----
        //            ^
        //            item_pos
        if (item->pos % blocksize) {
            front_margin = blocksize - item->pos % blocksize;
        } else {
            front_margin = 0;
        }
        item_pos = item->pos + front_margin;
        item_len = item->len - front_margin;

        if (item_len < blocksize) {
            // Remaining length is smaller than a block. This means that there
            // is no reusable block in this region (even though the region size is
            // bigger than a block size) .. skip this item.
            //
            //       |   block x   |  block x+1  | ...
            //  -----+------+------+-------+-----+----
            //   ... |      |//////|///////|     | ...
            //  -----+------+------+-------+-----+----
            continue;
        }

        // calculate # blocks and add to 'blocks'
        blocks_arr[n_blocks].bid = item_pos / blocksize;
        blocks_arr[n_blocks].count = item_len / blocksize;
        n_blocks += 1;
        if (n_blocks >= max_blocks) {
            max_blocks *= 2;
            blocks_arr = (struct reusable_block*)
                realloc(blocks_arr, max_blocks * sizeof(struct reusable_block));
        }

        if (front_margin) {
            // adjust the existing item to indicate 'A' in above example
            item->len = front_margin;
        } else {
            // exactly aligned .. remove this item
            avl_remove(mergetree, &item->avl);
            free(item);
        }

        uint32_t remaining_len;
        remaining_len = item_len % blocksize;
        if (remaining_len) {
            // add a new item for the remaining region ('D' in above example)
            struct stale_data *new_item;
            new_item = (struct stale_data *)
                       calloc(1, sizeof(struct stale_data));
            new_item->pos = (blocks_arr[n_blocks-1].bid + blocks_arr[n_blocks-1].count)
                            * blocksize;
            new_item->len = remaining_len;
            avl_insert(mergetree, &new_item->avl, _reusable_offset_cmp);
            // resume iteration after the newly inserted tail fragment
            avl = avl_next(&new_item->avl);
        }
    }

    // re-write stale tree using the last revnum as a key
    // in this case, only stale regions newly generated by this function are gathered,
    // and prev_hdr is set to BLK_NOT_FOUND, as corresponding seq numbers are
    // already removed.

    // however, do not remove the remaining items in the merge-tree and continue to
    // merge them in the next block reclaim.
    fdb_gather_stale_blocks(handle, revnum, BLK_NOT_FOUND, BLK_NOT_FOUND,
                            0, e_last, true);

    free(revnum_array);

    ret.n_blocks = n_blocks;
    ret.blocks = blocks_arr;
    return ret;
}
968
fdb_rollback_stale_blocks(fdb_kvs_handle *handle, filemgr_header_revnum_t cur_revnum)969 void fdb_rollback_stale_blocks(fdb_kvs_handle *handle,
970 filemgr_header_revnum_t cur_revnum)
971 {
972 btree_result br;
973 filemgr_header_revnum_t i, _revnum;
974 struct avl_node *avl;
975 struct list_elem *elem;
976 struct stale_info_commit *commit, query;
977 struct stale_info_entry *entry;
978
979 if (handle->rollback_revnum == 0) {
980 return;
981 }
982
983 // remove from on-disk stale-tree
984 for (i = handle->rollback_revnum; i < cur_revnum; ++i) {
985 _revnum = _endian_encode(i);
986 br = btree_remove(handle->staletree, (void*)&_revnum);
987 // don't care the result
988 (void)br;
989 btreeblk_end(handle->bhandle);
990 }
991
992 // also remove from in-memory stale-tree
993 query.revnum = handle->rollback_revnum;
994 avl = avl_search(&handle->file->stale_info_tree,
995 &query.avl, _inmem_stale_cmp);
996 if (!avl) {
997 avl = avl_search_greater(&handle->file->stale_info_tree,
998 &query.avl, _inmem_stale_cmp);
999 }
1000 while (avl) {
1001 commit = _get_entry(avl, struct stale_info_commit, avl);
1002 avl = avl_next(avl);
1003
1004 avl_remove(&handle->file->stale_info_tree, &commit->avl);
1005
1006 elem = list_begin(&commit->doc_list);
1007 while (elem) {
1008 entry = _get_entry(elem, struct stale_info_entry, le);
1009 elem = list_remove(&commit->doc_list, &entry->le);
1010
1011 free(entry->ctx);
1012 free(entry);
1013 }
1014
1015 free(commit);
1016 }
1017 }
1018
1019