xref: /6.0.3/forestdb/src/superblock.cc (revision c3456ce2)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <string.h>
20#include <stdint.h>
21#include <stdlib.h>
22
23#include "libforestdb/forestdb.h"
24#include "superblock.h"
25#include "staleblock.h"
26#include "btreeblock.h"
27#include "list.h"
28#include "fdb_internal.h"
29#include "version.h"
30#include "blockcache.h"
31
32#include "memleak.h"
33
34/*
35 * << super block structure >>
36 *
37 * 0x00 [file version (magic number)]:           8 bytes
38 * 0x08 [super block revision number]:           8 bytes
39 * 0x10 [bitmap revision number]:                8 bytes
40 * 0x18 [BID for next allocation]:               8 bytes
41 * 0x20 [last header BID]:                       8 bytes
42 * 0x28 [last header revnum]:                    8 bytes
43 * 0x30 [min active header revnum]:              8 bytes
44 * 0x38 [min active header BID]:                 8 bytes
45 * 0x40 [# initial free blocks in the bitmap]:   8 bytes
46 * 0x48 [# free blocks in the bitmap]:           8 bytes
47 * 0x50 [bitmap size]:                           8 bytes
48 * 0x58 [reserved bitmap size (0 if not exist)]: 8 bytes
49 * 0x60 ... [bitmap doc offset]:                 8 bytes each
50 *      [CRC32]:                                 4 bytes
51 * ...
52 *      [block marker]:                          1 byte
53 */
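/*
 * Worked example of the layout above (illustrative; assumes the default
 * 4KB block size): a 32GB file holds 8M blocks, so the bitmap needs
 * 8M bits = 1MB. That fits in a single bitmap doc, so exactly one 8-byte
 * [bitmap doc offset] entry follows the fixed 0x60-byte header, and the
 * CRC32 is computed over the 0x68 bytes preceding it.
 */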
54
55
56/*
57 * << basic mask >>
58 * 0: 10000000
59 * 1: 01000000
60 *    ...
61 * 7: 00000001
62 */
63static uint8_t bmp_basic_mask[8];
64/*
65 * << 2d mask >>
66 * bmp_2d_mask[pos][len]
67 *
68 * 0,1: 10000000
69 * 0,2: 11000000
70 * 0,3: 11100000
71 *    ...
72 * 1,1: 01000000
73 * 1,2: 01100000
74 *    ...
75 * 3,1: 00010000
76 * 3,2: 00011000
77 *    ...
78 * 6,1: 00000010
79 * 6,2: 00000011
80 * 7,1: 00000001
81 */
82static uint8_t bmp_2d_mask[8][9];
83
84
85// x % 8
86#define mod8(x) ((x) & 0x7)
87// x / 8
88#define div8(x) ((x) >> 3)
89// round down to the nearest multiple of 8
90#define rd8(x) (((x) >> 3) << 3)
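// e.g., div8(13) == 1, mod8(13) == 5, rd8(13) == 8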
91
92struct bmp_idx_node {
93    uint64_t id;
94    struct avl_node avl;
95};
96
97INLINE int _bmp_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
98{
99    struct bmp_idx_node *aa, *bb;
100    aa = _get_entry(a, struct bmp_idx_node, avl);
101    bb = _get_entry(b, struct bmp_idx_node, avl);
102
103#ifdef __BIT_CMP
104    return _CMP_U64(aa->id, bb->id);
105#else
106    if (aa->id < bb->id) {
107        return -1;
108    } else if (aa->id > bb->id) {
109        return 1;
110    } else {
111        return 0;
112    }
113#endif
114}
115
116static void _add_bmp_idx(struct avl_tree *bmp_idx, bid_t bid, bid_t count)
117{
118    bid_t cur, start_id, stop_id;
119    struct avl_node *a;
120    struct bmp_idx_node *item, query;
121
122    // 256 blocks per node
123    start_id = bid >> 8;
124    stop_id = (bid+count-1) >> 8;
125
126    for (cur=start_id; cur<=stop_id; ++cur) {
127        query.id = cur;
128        a = avl_search(bmp_idx, &query.avl, _bmp_idx_cmp);
129        if (a) {
130            // already exists .. do nothing
131        } else {
132            // create a node
133            item = (struct bmp_idx_node*)calloc(1, sizeof(struct bmp_idx_node));
134            item->id = query.id;
135            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
136        }
137    }
138}
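/*
 * Example: _add_bmp_idx(idx, 300, 600) covers BIDs 300..899, so
 * start_id = 300 >> 8 = 1 and stop_id = 899 >> 8 = 3; index nodes
 * 1, 2, and 3 are created (unless they already exist).
 */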
139
140static void _free_bmp_idx(struct avl_tree *bmp_idx)
141{
142    // free all supplemental bmp idx nodes
143    struct avl_node *a;
144    struct bmp_idx_node *item;
145    a = avl_first(bmp_idx);
146    while (a) {
147        item = _get_entry(a, struct bmp_idx_node, avl);
148        a = avl_next(a);
149        avl_remove(bmp_idx, &item->avl);
150        free(item);
151    }
152}
153
154static void _construct_bmp_idx(struct avl_tree *bmp_idx,
155                               uint8_t *bmp,
156                               uint64_t bmp_size,
157                               bid_t start_bid)
158{
159    uint64_t i, node_idx;
160    uint64_t *bmp64 = (uint64_t*)bmp;
161    struct bmp_idx_node *item;
162
163    if (start_bid == BLK_NOT_FOUND) {
164        start_bid = 0;
165    }
166
167    // Since a single byte holds 8 bitmap bits, an 8-byte integer holds 64 of them.
168    // By casting the bitmap array to a uint64_t array, we can quickly check whether
169    // a group of 64 bitmap bits has at least one non-zero bit.
170    for (i=start_bid/64; i<bmp_size/64; ++i) {
171        // in this loop, 'i' denotes bitmap group number.
172        node_idx = i/4;
173        if (bmp64[i]) {
174            item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
175            item->id = node_idx;
176            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
177            // skip other bitmaps in the same bitmap index node
178            // (1 bitmap index node == 4 bitmap groups == 256 bitmaps)
179            i = (node_idx+1)*4;
180        }
181    }
182
183    // If there are remaining bitmap bits, check them one by one.
184    if (bmp_size % 64) {
185        uint8_t idx, off;
186        uint64_t start;
187
188        start = (bmp_size/64)*64;
189        if (start < start_bid) {
190            start = start_bid;
191        }
192
193        for (i=start; i<bmp_size; ++i) {
194            // in this loop, 'i' denotes bitmap number (i.e., BID).
195            idx = div8(i);
196            off = mod8(i);
197            if (bmp[idx] & bmp_basic_mask[off]) {
198                node_idx = i >> 8;
199                item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
200                item->id = node_idx;
201                avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
202                i = (node_idx+1)*256;
203            }
204        }
205    }
206}
207
208INLINE size_t _bmp_size_to_num_docs(uint64_t bmp_size)
209{
210    if (bmp_size) {
211        // 8 bitmaps per byte
212        uint64_t num_bits_per_doc = 8 * SB_MAX_BITMAP_DOC_SIZE;
213        return (bmp_size + num_bits_per_doc - 1) / num_bits_per_doc;
214    } else {
215        return 0;
216    }
217}
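/*
 * Example (assuming SB_MAX_BITMAP_DOC_SIZE is 1MB, as the comments below
 * indicate): num_bits_per_doc = 8 * 1,048,576 = 8,388,608, so a bitmap of
 * 10,000,000 bits needs ceil(10,000,000 / 8,388,608) = 2 docs.
 */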
218
219INLINE bool _is_bmp_set(uint8_t *bmp, bid_t bid)
220{
221    return (bmp[div8(bid)] & bmp_basic_mask[mod8(bid)]);
222}
223
224static void sb_bmp_barrier_on(struct superblock *sb)
225{
226    atomic_incr_uint64_t(&sb->bmp_rcount);
227    if (atomic_get_uint64_t(&sb->bmp_wcount)) {
228        // now bmp pointer & related variables are being changed.
229        // decrease count and grab lock until the change is done.
230        atomic_decr_uint64_t(&sb->bmp_rcount);
231        spin_lock(&sb->bmp_lock);
232
233        // got control: means that change is done.
234        // re-increase the count
235        atomic_incr_uint64_t(&sb->bmp_rcount);
236        spin_unlock(&sb->bmp_lock);
237    }
238}
239
240static void sb_bmp_barrier_off(struct superblock *sb)
241{
242    atomic_decr_uint64_t(&sb->bmp_rcount);
243}
244
245static void sb_bmp_change_begin(struct superblock *sb)
246{
247    // grab lock and increase writer count
248    // now new readers cannot increase the reader count
249    // (note that there is always one bitmap writer at a time)
250    spin_lock(&sb->bmp_lock);
251    atomic_incr_uint64_t(&sb->bmp_wcount);
252
253    // wait until previous BMP readers terminate
254    // (they are very short bitmap-access routines, so this
255    //  will not take long).
256    size_t spin = 0;
257    while (atomic_get_uint64_t(&sb->bmp_rcount)) {
258       if (++spin > 64) {
259#ifdef HAVE_SCHED_H
260            sched_yield();
261#elif _MSC_VER
262            SwitchToThread();
263#endif
264       }
265    }
266
267    // now 1) all previous readers terminated
268    //     2) new readers will be blocked
269}
270
271static void sb_bmp_change_end(struct superblock *sb)
272{
273    atomic_decr_uint64_t(&sb->bmp_wcount);
274    spin_unlock(&sb->bmp_lock);
275    // now resume all pending readers
276}
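/*
 * Usage sketch for the barrier pair above: readers wrap short bitmap
 * accesses with sb_bmp_barrier_on()/sb_bmp_barrier_off() (e.g., in
 * sb_bmp_is_writable()), while the single writer that swaps the bitmap
 * pointer brackets the swap with sb_bmp_change_begin()/_end() (e.g., in
 * sb_reclaim_reusable_blocks() and sb_switch_reserved_blocks()).
 */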
277
278void sb_bmp_append_doc(fdb_kvs_handle *handle)
279{
280    // == write bitmap into system docs ==
281    // calculate # docs (each doc holds up to 1MB of bitmap by default)
282    // (a 1MB bitmap covers a 32GB DB file with 4KB blocks)
283    size_t i;
284    uint64_t num_docs;
285    char doc_key[64];
286    struct superblock *sb = handle->file->sb;
287
288    // mark stale if previous doc offset exists
289    if (sb->bmp_doc_offset) {
290        for (i=0; i<sb->num_bmp_docs; ++i) {
291            filemgr_mark_stale(handle->file, sb->bmp_doc_offset[i],
292                _fdb_get_docsize(sb->bmp_docs[i].length));
293        }
294
295        free(sb->bmp_doc_offset);
296        free(sb->bmp_docs);
297        sb->bmp_doc_offset = NULL;
298        sb->bmp_docs = NULL;
299    }
300
301    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
302    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
303    if (num_docs) {
304        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
305        sb->bmp_docs = (struct docio_object*)
306                       calloc(num_docs, sizeof(struct docio_object));
307    }
308
309    // bitmap doc offsets
310    for (i=0; i<num_docs; ++i) {
311        // append a system doc for bitmap chunk
312        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
313        sprintf(doc_key, "bitmap_%" _F64 "_%d",
314                atomic_get_uint64_t(&sb->bmp_revnum), (int)i);
315        sb->bmp_docs[i].key = (void*)doc_key;
316        sb->bmp_docs[i].meta = NULL;
317        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
318
319        sb->bmp_docs[i].length.keylen = strlen(doc_key)+1;
320        sb->bmp_docs[i].length.metalen = 0;
321        if (i == num_docs - 1) {
322            // the last doc
323            sb->bmp_docs[i].length.bodylen =
324                (sb_bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
325        } else {
326            // otherwise: 1MB
327            sb->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
328        }
329        sb->bmp_docs[i].seqnum = 0;
330        sb->bmp_doc_offset[i] =
331            docio_append_doc_system(handle->dhandle, &sb->bmp_docs[i]);
332    }
333}
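/*
 * Example of the last-doc size above (assuming 1MB docs): a bitmap of
 * 20,000,000 bits is 2,500,000 bytes and needs 3 docs; docs 0 and 1 carry
 * 1,048,576 bytes each, and the last doc carries
 * 2,500,000 % 1,048,576 = 402,848 bytes.
 */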
334
335void sb_rsv_append_doc(fdb_kvs_handle *handle)
336{
337    size_t i;
338    uint64_t num_docs;
339    char doc_key[64];
340    struct superblock *sb = handle->file->sb;
341    struct sb_rsv_bmp *rsv = NULL;
342
343    if (!sb) {
344        return;
345    }
346
347    rsv = sb->rsv_bmp;
348    if (!rsv ||
349        atomic_get_uint32_t(&rsv->status) != SB_RSV_INITIALIZING) {
350        return;
351    }
352
353    rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
354    if (num_docs) {
355        rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
356        rsv->bmp_docs = (struct docio_object*)
357                        calloc(num_docs, sizeof(struct docio_object));
358    }
359
360    // bitmap doc offsets
361    for (i=0; i<num_docs; ++i) {
362        // append a system doc for bitmap chunk
363        memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
364        sprintf(doc_key, "bitmap_%" _F64 "_%d", rsv->bmp_revnum, (int)i);
365        rsv->bmp_docs[i].key = (void*)doc_key;
366        rsv->bmp_docs[i].meta = NULL;
367        rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
368
369        rsv->bmp_docs[i].length.keylen = strlen(doc_key)+1;
370        rsv->bmp_docs[i].length.metalen = 0;
371        if (i == num_docs - 1) {
372            // the last doc
373            rsv->bmp_docs[i].length.bodylen =
374                (rsv->bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
375        } else {
376            // otherwise: 1MB
377            rsv->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
378        }
379        rsv->bmp_docs[i].seqnum = 0;
380        rsv->bmp_doc_offset[i] =
381            docio_append_doc_system(handle->dhandle, &rsv->bmp_docs[i]);
382    }
383
384    // now 'rsv_bmp' is available.
385    atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
386}
387
388fdb_status sb_bmp_fetch_doc(fdb_kvs_handle *handle)
389{
390    // == read bitmap from system docs ==
391    size_t i;
392    uint64_t num_docs;
393    int64_t r_offset;
394    char doc_key[64];
395    struct superblock *sb = handle->file->sb;
396    struct sb_rsv_bmp *rsv = NULL;
397
398    // skip if previous bitmap exists OR
399    // there is no bitmap to be fetched (fast screening)
400    if (sb->bmp.load(std::memory_order_relaxed) ||
401        atomic_get_uint64_t(&sb->bmp_size) == 0) {
402        return FDB_RESULT_SUCCESS;
403    }
404
405    spin_lock(&sb->lock);
406
407    // check once again if superblock is already initialized
408    // while the thread was blocked by the lock.
409    if (sb->bmp.load(std::memory_order_relaxed)) {
410        spin_unlock(&sb->lock);
411        return FDB_RESULT_SUCCESS;
412    }
413
414    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
415    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
416    if (!num_docs) {
417        spin_unlock(&sb->lock);
418        return FDB_RESULT_SUCCESS;
419    }
420
421    free(sb->bmp);
422    sb->bmp = (uint8_t*)calloc(1, (sb_bmp_size+7) / 8);
423
424    for (i=0; i<num_docs; ++i) {
425        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
426        // pre-allocated buffer for key
427        sb->bmp_docs[i].key = (void*)doc_key;
428        // directly point to the bitmap
429        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
430
431        r_offset = docio_read_doc(handle->dhandle, sb->bmp_doc_offset[i],
432                                  &sb->bmp_docs[i], true);
433        if (r_offset <= 0) {
434            // read fail
435            free(sb->bmp);
436            sb->bmp = NULL;
437
438            spin_unlock(&sb->lock);
439            return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
440        }
441    }
442
443    _construct_bmp_idx(&sb->bmp_idx, sb->bmp, sb_bmp_size,
444                       atomic_get_uint64_t(&sb->cur_alloc_bid));
445
446    rsv = sb->rsv_bmp;
447    if (rsv && atomic_get_uint32_t(&rsv->status) == SB_RSV_INITIALIZING) {
448        // reserved bitmap exists
449        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
450        if (!num_docs) {
451            spin_unlock(&sb->lock);
452            return FDB_RESULT_SUCCESS;
453        }
454
455        rsv->bmp = (uint8_t*)calloc(1, (rsv->bmp_size+7) / 8);
456
457        for (i=0; i<num_docs; ++i) {
458
459            memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
460            // pre-allocated buffer for key
461            rsv->bmp_docs[i].key = (void*)doc_key;
462            // directly point to the (reserved) bitmap
463            rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
464
465            r_offset = docio_read_doc(handle->dhandle, rsv->bmp_doc_offset[i],
466                                      &rsv->bmp_docs[i], true);
467            if (r_offset <= 0) {
468                // read fail
469                free(rsv->bmp);
470                free(sb->bmp);
471                rsv->bmp = sb->bmp = NULL;
472
473                spin_unlock(&sb->lock);
474                return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
475            }
476        }
477
478        _construct_bmp_idx(&rsv->bmp_idx, rsv->bmp, rsv->bmp_size, 0);
479        atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
480    }
481
482    spin_unlock(&sb->lock);
483    return FDB_RESULT_SUCCESS;
484}
485
486bool sb_check_sync_period(fdb_kvs_handle *handle)
487{
488    struct superblock *sb = handle->file->sb;
489
490    if (sb && sb->num_alloc * handle->file->blocksize > SB_SYNC_PERIOD) {
491        return true;
492    }
493    return false;
494}
495
496bool sb_update_header(fdb_kvs_handle *handle)
497{
498    bool ret = false;
499    struct superblock *sb = handle->file->sb;
500
501    if (sb && atomic_get_uint64_t(&sb->last_hdr_bid) != handle->last_hdr_bid &&
502        atomic_get_uint64_t(&sb->last_hdr_revnum) < handle->cur_header_revnum) {
503
504        atomic_store_uint64_t(&sb->last_hdr_bid, handle->last_hdr_bid);
505        atomic_store_uint64_t(&sb->last_hdr_revnum, handle->cur_header_revnum);
506
507        uint64_t lw_revnum = atomic_get_uint64_t(&handle->file->last_writable_bmp_revnum);
508        if (lw_revnum == atomic_get_uint64_t(&sb->bmp_revnum) &&
509            sb->bmp_prev) {
510            free(sb->bmp_prev);
511            sb->bmp_prev = NULL;
512        }
513        ret = true;
514    }
515
516    return ret;
517}
518
519void sb_reset_num_alloc(fdb_kvs_handle *handle)
520{
521    if (handle->file->sb) {
522        handle->file->sb->num_alloc = 0;
523    }
524}
525
526fdb_status sb_sync_circular(fdb_kvs_handle *handle)
527{
528    uint64_t sb_revnum;
529    fdb_status fs;
530
531    sb_revnum = atomic_get_uint64_t(&handle->file->sb->revnum);
532    fs = sb_write(handle->file,
533                  sb_revnum % handle->file->sb->config->num_sb,
534                  &handle->log_callback);
535    return fs;
536}
537
538sb_decision_t sb_check_block_reusing(fdb_kvs_handle *handle)
539
540{
541    // start block reusing when
542    // 1) if blocks are not reused yet in this file:
543    //    when file size becomes larger than the threshold
544    // 2) otherwise:
545    //    when # free blocks decreases under the threshold
546
547    uint64_t live_datasize;
548    uint64_t filesize;
549    uint64_t ratio;
550    struct superblock *sb = handle->file->sb;
551
552    if (!sb) {
553        return SBD_NONE;
554    }
555
556    if (filemgr_get_file_status(handle->file) != FILE_NORMAL) {
557        // a file being compacted does not allow block reusing
558        return SBD_NONE;
559    }
560
561    uint64_t block_reusing_threshold =
562        atomic_get_uint64_t(&handle->file->config->block_reusing_threshold,
563                            std::memory_order_relaxed);
564    if (block_reusing_threshold == 0 || block_reusing_threshold >= 100) {
565        // circular block reusing is disabled
566        return SBD_NONE;
567    }
568
569    filesize = filemgr_get_pos(handle->file);
570    if (filesize < SB_MIN_BLOCK_REUSING_FILESIZE) {
571        return SBD_NONE;
572    }
573
574    // at least 'num_keeping_headers' headers should exist
575    // since the last block reuse
576    if (handle->cur_header_revnum <=
577        sb->min_live_hdr_revnum +
578        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
579                            std::memory_order_relaxed)) {
580        return SBD_NONE;
581    }
582
583    live_datasize = fdb_estimate_space_used(handle->fhandle);
584    if (filesize == 0 || live_datasize == 0 ||
585        live_datasize > filesize) {
586        return SBD_NONE;
587    }
588
589    ratio = (filesize - live_datasize) * 100 / filesize;
590
591    if (ratio > block_reusing_threshold) {
592        if (!sb_bmp_exists(sb)) {
593            // block reusing has not been started yet
594            return SBD_RECLAIM;
595        } else {
596            // stale blocks have already been reused before
597            if (sb->num_free_blocks == 0) {
598                if (sb->rsv_bmp) {
599                    // reserved bitmap exists
600                    return SBD_SWITCH;
601                } else {
602                    // re-reclaim
603                    return SBD_RECLAIM;
604                }
605            } else if ( (sb->num_free_blocks * 100 <
606                         sb->num_init_free_blocks * SB_PRE_RECLAIM_RATIO)) {
607                if (sb->num_init_free_blocks * handle->file->config->blocksize >
608                    SB_MIN_BLOCK_REUSING_FILESIZE)  {
609                    return SBD_RESERVE;
610                }
611            }
612        }
613    }
614
615    return SBD_NONE;
616}
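/*
 * Example of the decision above (threshold values are illustrative):
 * with filesize = 8GB and live_datasize = 2GB, ratio = 6/8 * 100 = 75.
 * If block_reusing_threshold is 65, block reuse is triggered: SBD_RECLAIM
 * if no bitmap exists yet; otherwise SBD_SWITCH, SBD_RECLAIM, or
 * SBD_RESERVE depending on how many free blocks remain.
 */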
617
618bool sb_reclaim_reusable_blocks(fdb_kvs_handle *handle)
619{
620    size_t i;
621    uint64_t num_blocks, bmp_size_byte;
622    stale_header_info sheader;
623    reusable_block_list blist;
624    struct superblock *sb = handle->file->sb;
625
626    // should flush all dirty blocks in cache
627    filemgr_sync(handle->file, false, &handle->log_callback);
628
629    sheader = fdb_get_smallest_active_header(handle);
630    if (sheader.bid == BLK_NOT_FOUND) {
631        return false;
632    }
633
634    // get reusable block list
635    blist = fdb_get_reusable_block(handle, sheader);
636
637    // update superblock's bitmap
638    uint8_t *new_bmp = NULL, *old_bmp = NULL;
639    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
640    // 8 bitmaps per byte
641    bmp_size_byte = (num_blocks+7) / 8;
642    fdb_assert(num_blocks >= SB_DEFAULT_NUM_SUPERBLOCKS,
643               num_blocks, SB_DEFAULT_NUM_SUPERBLOCKS);
644    new_bmp = (uint8_t*)calloc(1, bmp_size_byte);
645
646    // free pre-existing bmp index
647    _free_bmp_idx(&sb->bmp_idx);
648
649    for (i=0; i<blist.n_blocks; ++i) {
650        sb_bmp_set(new_bmp, blist.blocks[i].bid, blist.blocks[i].count);
651        if (i==0 &&
652            atomic_get_uint64_t(&sb->cur_alloc_bid) == BLK_NOT_FOUND) {
653            atomic_store_uint64_t(&sb->cur_alloc_bid, blist.blocks[i].bid);
654        }
655        sb->num_free_blocks += blist.blocks[i].count;
656        // add info for supplementary bmp index
657        _add_bmp_idx(&sb->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
658    }
659    free(blist.blocks);
660
661    sb_bmp_change_begin(sb);
662    old_bmp = sb->bmp.load(std::memory_order_relaxed);
663    sb->bmp.store(new_bmp, std::memory_order_relaxed);
664    atomic_store_uint64_t(&sb->bmp_size, num_blocks);
665    sb->min_live_hdr_revnum = sheader.revnum;
666    sb->min_live_hdr_bid = sheader.bid;
667    atomic_incr_uint64_t(&sb->bmp_revnum);
668    sb->num_init_free_blocks = sb->num_free_blocks;
669    sb_bmp_change_end(sb);
670    free(old_bmp);
671
672    return true;
673}
674
675bool sb_reserve_next_reusable_blocks(fdb_kvs_handle *handle)
676{
677    size_t i;
678    uint64_t num_blocks, bmp_size_byte;
679    stale_header_info sheader;
680    reusable_block_list blist;
681    struct superblock *sb = handle->file->sb;
682    struct sb_rsv_bmp *rsv = NULL;
683
684    if (sb->rsv_bmp) {
685        // next bitmap already reclaimed
686        return false;
687    }
688
689    sheader = fdb_get_smallest_active_header(handle);
690    if (sheader.bid == BLK_NOT_FOUND) {
691        return false;
692    }
693
694    // get reusable block list
695    blist = fdb_get_reusable_block(handle, sheader);
696
697    // calculate bitmap size
698    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
699    bmp_size_byte = (num_blocks+7) / 8;
700    if (num_blocks) {
701        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
702        rsv->bmp = (uint8_t*)calloc(1, bmp_size_byte);
703        rsv->cur_alloc_bid = BLK_NOT_FOUND;
704
705        // the initial status is 'INITIALIZING' so that 'rsv_bmp' is not
706        // available until executing sb_rsv_append_doc().
707        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
708        avl_init(&rsv->bmp_idx, NULL);
709        rsv->bmp_size = num_blocks;
710
711        for (i=0; i<blist.n_blocks; ++i) {
712            sb_bmp_set(rsv->bmp, blist.blocks[i].bid, blist.blocks[i].count);
713            if (i==0 && rsv->cur_alloc_bid == BLK_NOT_FOUND) {
714                rsv->cur_alloc_bid = blist.blocks[i].bid;
715            }
716            rsv->num_free_blocks += blist.blocks[i].count;
717            _add_bmp_idx(&rsv->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
718        }
719        free(blist.blocks);
720
721        rsv->min_live_hdr_revnum = sheader.revnum;
722        rsv->min_live_hdr_bid = sheader.bid;
723        rsv->bmp_revnum = atomic_get_uint64_t(&sb->bmp_revnum)+1;
724        sb->rsv_bmp = rsv;
725    }
726
727    return true;
728}
729
730static void _rsv_free(struct sb_rsv_bmp *rsv);
731void sb_return_reusable_blocks(fdb_kvs_handle *handle)
732{
733    uint64_t node_id;
734    bid_t cur;
735    struct superblock *sb = handle->file->sb;
736    struct sb_rsv_bmp *rsv;
737    struct avl_node *a;
738    struct bmp_idx_node *item, query;
739
740    if (!sb) {
741        // nothing to do without a superblock.
742        return;
743    }
744
745    // re-insert all remaining bitmap blocks into the stale list
746    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
747    for (cur = atomic_get_uint64_t(&sb->cur_alloc_bid); cur < sb_bmp_size; ++cur) {
748        if (_is_bmp_set(sb->bmp, cur)) {
749            filemgr_add_stale_block(handle->file, cur, 1);
750        }
751
752        if ((cur % 256) == 0 && cur > 0) {
753            // node ID changes
754            // remove & free current bmp node
755            node_id = cur / 256;
756            query.id = node_id - 1;
757            a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
758            if (a) {
759                item = _get_entry(a, struct bmp_idx_node, avl);
760                avl_remove(&sb->bmp_idx, a);
761                free(item);
762            }
763
764            // move to next bmp node
765            do {
766                a = avl_first(&sb->bmp_idx);
767                if (a) {
768                    item = _get_entry(a, struct bmp_idx_node, avl);
769                    if (item->id <= node_id) {
770                        avl_remove(&sb->bmp_idx, a);
771                        free(item);
772                        continue;
773                    }
774                    cur = item->id * 256;
775                    break;
776                }
777
778                // no more reusable block
779                cur = sb_bmp_size;
780                break;
781            } while (true);
782        }
783    }
784    sb->num_free_blocks = 0;
785    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
786
787    // do the same work for the reserved blocks, if they exist
788    rsv = sb->rsv_bmp;
789    if (rsv &&
790        atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
791
792        for (cur = rsv->cur_alloc_bid; cur < rsv->bmp_size; ++cur) {
793            if (_is_bmp_set(rsv->bmp, cur)) {
794                filemgr_add_stale_block(handle->file, cur, 1);
795            }
796
797            if ((cur % 256) == 0 && cur > 0) {
798                // node ID changes
799                // remove & free current bmp node
800                node_id = cur / 256;
801                query.id = node_id - 1;
802                a = avl_search(&rsv->bmp_idx, &query.avl, _bmp_idx_cmp);
803                if (a) {
804                    item = _get_entry(a, struct bmp_idx_node, avl);
805                    avl_remove(&rsv->bmp_idx, a);
806                    free(item);
807                }
808
809                // move to next bmp node
810                do {
811                    a = avl_first(&rsv->bmp_idx);
812                    if (a) {
813                        item = _get_entry(a, struct bmp_idx_node, avl);
814                        if (item->id <= node_id) {
815                            avl_remove(&rsv->bmp_idx, a);
816                            free(item);
817                            continue;
818                        }
819                        cur = item->id * 256;
820                        break;
821                    }
822
823                    // no more reusable block
824                    cur = rsv->bmp_size;
825                    break;
826                } while (true);
827            }
828        }
829        rsv->num_free_blocks = 0;
830        rsv->cur_alloc_bid = BLK_NOT_FOUND;
831
832        _free_bmp_idx(&rsv->bmp_idx);
833        _rsv_free(rsv);
834        free(rsv);
835        sb->rsv_bmp = NULL;
836    }
837
838    // re-store into stale tree using next header's revnum
839    filemgr_header_revnum_t revnum = handle->cur_header_revnum;
840    fdb_gather_stale_blocks(handle, revnum+1, BLK_NOT_FOUND, BLK_NOT_FOUND, 0, NULL, false);
841}
842
843void sb_bmp_mask_init()
844{
845    // preset masks to speed up bitmap set/clear operations
846    size_t i, pos, len;
847    for (i=0; i<8; ++i) {
848        bmp_basic_mask[i] = 0x1 << (7-i);
849    }
850    for (pos=0; pos<8; ++pos) {
851        for (len=0; len<9; ++len) {
852            bmp_2d_mask[pos][len] = 0x0;
853            if (len != 0 && pos+len <= 8) {
854                for (i=0; i<len; ++i) {
855                    bmp_2d_mask[pos][len] |= bmp_basic_mask[pos+i];
856                }
857            }
858        }
859    }
860}
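// Resulting values, e.g.: bmp_basic_mask[0] == 0x80, bmp_basic_mask[7] == 0x01,
// bmp_2d_mask[3][2] == 0x18, bmp_2d_mask[0][8] == 0xff.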
861
862INLINE void _sb_bmp_update(uint8_t *bmp, bid_t bid, uint64_t len, int mode)
863{
864    // mode==0: bitmap clear
865    // mode==1: bitmap set
866
867    uint64_t front_pos, front_len, rear_pos, rear_len;
868    uint64_t mid_pos, mid_len;
869
870    //      front_len        rear_len
871    //      <->              <-->
872    // 00000111 | 11111111 | 11110000
873    //      ^     <------> mid
874    //      front_pos
875
876    front_pos = bid;
877    front_len = 8 - mod8(front_pos);
878
879    if (front_len >= len) {
880        front_len = len;
881        rear_pos = rear_len = mid_pos = mid_len = 0;
882    } else {
883        rear_pos = rd8(bid + len);
884        rear_len = mod8(bid + len);
885
886        mid_pos = bid + front_len;
887        mid_len = len - front_len - rear_len;
888    }
889
890    // front bitmaps
891    if (front_len) {
892        if (mode) {
893            bmp[div8(front_pos)] |= bmp_2d_mask[mod8(front_pos)][front_len];
894        } else {
895            bmp[div8(front_pos)] &= ~bmp_2d_mask[mod8(front_pos)][front_len];
896        }
897    }
898    // rear bitmaps
899    if (rear_len) {
900        if (mode) {
901            bmp[div8(rear_pos)] |= bmp_2d_mask[mod8(rear_pos)][rear_len];
902        } else {
903            bmp[div8(rear_pos)] &= ~bmp_2d_mask[mod8(rear_pos)][rear_len];
904        }
905    }
906
907    // mid bitmaps
908    uint8_t mask = (mode)?(0xff):(0x0);
909    if (mid_len == 8) {
910        // 8 bitmaps (1 byte)
911        bmp[div8(mid_pos)] = mask;
912    } else if (mid_len < 64) {
913        // 16 ~ 56 bitmaps (2 ~ 7 bytes)
914        size_t i;
915        for (i=0; i<mid_len; i+=8) {
916            bmp[div8(mid_pos+i)] = mask;
917        }
918    } else {
919        // larger than 64 bitmaps (8 bytes)
920        memset(bmp + div8(mid_pos), mask, div8(mid_len));
921    }
922}
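/*
 * Worked example: _sb_bmp_update(bmp, 5, 20, 1) sets bits 5..24:
 *   front: pos 5,  len 3  -> bmp[0] |= bmp_2d_mask[5][3] (0x07)
 *   mid:   pos 8,  len 16 -> bmp[1] = bmp[2] = 0xff
 *   rear:  pos 24, len 1  -> bmp[3] |= bmp_2d_mask[0][1] (0x80)
 */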
923
924void sb_bmp_set(uint8_t *bmp, bid_t bid, uint64_t len)
925{
926    _sb_bmp_update(bmp, bid, len, 1);
927}
928
929void sb_bmp_clear(uint8_t *bmp, bid_t bid, uint64_t len)
930{
931    _sb_bmp_update(bmp, bid, len, 0);
932}
933
934bool sb_switch_reserved_blocks(struct filemgr *file)
935{
936    size_t i;
937    struct superblock *sb = file->sb;
938    struct sb_rsv_bmp *rsv = sb->rsv_bmp;
939
940    // reserved block should exist
941    if (!rsv) {
942        return false;
943    }
944    // should be in a normal status
945    if (!atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
946        return false;
947    }
948    // now status becomes 'VOID' so that rsv_bmp is not available.
949
950    // mark the previous system docs as stale
951    if (sb->bmp_doc_offset) {
952        for (i=0; i<sb->num_bmp_docs; ++i) {
953            filemgr_mark_stale(file, sb->bmp_doc_offset[i],
954                _fdb_get_docsize(sb->bmp_docs[i].length));
955        }
956
957        free(sb->bmp_doc_offset);
958        free(sb->bmp_docs);
959        sb->bmp_doc_offset = NULL;
960        sb->bmp_docs = NULL;
961    }
962
963    // should flush all dirty blocks in cache
964    filemgr_sync(file, false, NULL);
965
966    // free current bitmap idx
967    _free_bmp_idx(&sb->bmp_idx);
968
969    // temporarily keep current bitmap
970    sb_bmp_change_begin(sb);
971    uint8_t *old_prev_bmp = NULL;
972    if (sb->bmp_prev) {
973        old_prev_bmp = sb->bmp_prev;
974    }
975    sb->bmp_prev = sb->bmp;
976    sb->bmp_prev_size = atomic_get_uint64_t(&sb->bmp_size);
977
978    // copy all pointers from rsv to sb
979    atomic_store(&sb->bmp_revnum, rsv->bmp_revnum);
980    atomic_store_uint64_t(&sb->bmp_size, rsv->bmp_size);
981    sb->bmp = rsv->bmp;
982    sb->bmp_idx = rsv->bmp_idx;
983    sb->bmp_doc_offset = rsv->bmp_doc_offset;
984    sb->bmp_docs = rsv->bmp_docs;
985    sb->num_bmp_docs = rsv->num_bmp_docs;
986    sb->num_free_blocks = sb->num_init_free_blocks = rsv->num_free_blocks;
987    atomic_store_uint64_t(&sb->cur_alloc_bid, rsv->cur_alloc_bid);
988    sb->min_live_hdr_revnum = rsv->min_live_hdr_revnum;
989    sb->min_live_hdr_bid = rsv->min_live_hdr_bid;
990    sb_bmp_change_end(sb);
991
992    free(old_prev_bmp);
993    free(sb->rsv_bmp);
994    sb->rsv_bmp = NULL;
995
996    return true;
997}
998
999bid_t sb_alloc_block(struct filemgr *file)
1000{
1001    uint64_t i, node_idx, node_off, bmp_idx, bmp_off;
1002    bid_t ret = BLK_NOT_FOUND;
1003    struct superblock *sb = file->sb;
1004    struct avl_node *a;
1005    struct bmp_idx_node *item, query;
1006
1007    sb->num_alloc++;
1008sb_alloc_start_over:
1009    if (!sb_bmp_exists(sb)) {
1010        // no bitmap
1011        return BLK_NOT_FOUND;
1012    }
1013
1014    if (sb->num_free_blocks == 0) {
1015        bool switched = false;
1016        if (sb->rsv_bmp) {
1017            switched = sb_switch_reserved_blocks(file);
1018        }
1019        if (switched) {
1020            goto sb_alloc_start_over;
1021        } else {
1022            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1023            return BLK_NOT_FOUND;
1024        }
1025    }
1026
1027    ret = atomic_get_uint64_t(&sb->cur_alloc_bid);
1028    sb->num_free_blocks--;
1029
1030    if (sb->num_free_blocks == 0) {
1031        bool switched = false;
1032        if (sb->rsv_bmp) {
1033            switched = sb_switch_reserved_blocks(file);
1034        }
1035        if (!switched) {
1036            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1037        }
1038        return ret;
1039    }
1040
1041    // find allocable block in the same bmp idx node
1042    node_idx = ret >> 8;
1043    node_off = (ret & 0xff)+1;
1044    do {
1045        for (i=node_off; i<256; ++i) {
1046            bmp_idx = div8(i) + (node_idx * 32);
1047            bmp_off = mod8(i);
1048
1049            if (bmp_idx*8 + bmp_off >= atomic_get_uint64_t(&sb->bmp_size)) {
1050                break;
1051            }
1052
1053            if (sb->bmp[bmp_idx] & bmp_basic_mask[bmp_off]) {
1054                atomic_store_uint64_t(&sb->cur_alloc_bid, bmp_idx*8 + bmp_off);
1055                return ret;
1056            }
1057        }
1058
1059        // current bmp_node does not include any free block .. remove
1060        query.id = node_idx;
1061        a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
1062        if (a) {
1063            item = _get_entry(a, struct bmp_idx_node, avl);
1064            avl_remove(&sb->bmp_idx, a);
1065            free(item);
1066        }
1067
1068        // get next allocable bmp_node
1069        a = avl_first(&sb->bmp_idx);
1070        if (!a) {
1071            // no more free bmp_node
1072            sb->num_free_blocks = 0;
1073            bool switched = false;
1074            if (sb->rsv_bmp) {
1075                switched = sb_switch_reserved_blocks(file);
1076            }
1077            if (!switched) {
1078                atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1079            }
1080            break;
1081        }
1082        item = _get_entry(a, struct bmp_idx_node, avl);
1083        node_idx = item->id;
1084        node_off = 0;
1085    } while (true);
1086
1087    return ret;
1088}
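/*
 * Index math above: one bmp_idx node covers 256 BIDs, i.e., 32 bitmap
 * bytes, hence bmp_idx = div8(i) + node_idx*32. Example: after returning
 * BID 517 (node_idx = 2, node_off = 6), the scan starts at i = 6; if that
 * bit is set (bmp[64] & bmp_basic_mask[6]), the next allocation BID
 * becomes 64*8 + 6 = 518.
 */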
1089
1090bool sb_bmp_is_writable(struct filemgr *file, bid_t bid)
1091{
1092    if (bid < file->sb->config->num_sb) {
1093        // superblocks are always writable
1094        return true;
1095    }
1096
1097    bool ret = false;
1098    bid_t last_commit = atomic_get_uint64_t(&file->last_commit) / file->blocksize;
1099    uint64_t lw_bmp_revnum = atomic_get_uint64_t(&file->last_writable_bmp_revnum);
1100    struct superblock *sb = file->sb;
1101
1102    sb_bmp_barrier_on(sb);
1103
1104    uint8_t *sb_bmp = sb->bmp;
1105    if (atomic_get_uint64_t(&sb->bmp_revnum) == lw_bmp_revnum) {
1106        // Same bitmap revision number: there are 2 possible cases
1107        //
1108        // (1) normal case
1109        //               writable blocks
1110        //            <---->          <--->
1111        // +-------------------------------------------+
1112        // |       xxxxxxxxx          xxxxxxxxxx       | (x: reused blocks)
1113        // +-------------------------------------------+
1114        //            ^                   ^
1115        //            last_commit         cur_alloc
1116        //
1117        // (2) when file size grows after block reusing
1118        //                                   writable blocks
1119        //                             <------->       <--------->
1120        // +-------------------------------------------+---------+
1121        // |       xxxxxxxxx          xxxxxxxxxx       |         |
1122        // +-------------------------------------------+---------+
1123        //                             ^               ^         ^
1124        //                             last_commit     bmp_size  cur_alloc
1125
1126        if (bid < atomic_get_uint64_t(&sb->bmp_size)) {
1127            // BID is in the bitmap .. check if bitmap is set.
1128            if (_is_bmp_set(sb_bmp, bid) &&
1129                bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1130                bid >= last_commit) {
1131                ret = true;
1132            }
1133        } else {
1134            // BID is out-of-range of the bitmap
1135            if (bid >= last_commit) {
1136                ret = true;
1137            }
1138        }
1139    } else {
1140        // Different bitmap revision number: there are also 2 possible cases
1141        //
1142        // (1) normal case
1143        //     writable blocks                 writable blocks
1144        //         <---->                          <--->
1145        // +-------------------------------------------+
1146        // |       xxxxxxxxx          xxxxxxxxxx       |
1147        // +-------------------------------------------+
1148        //              ^                          ^
1149        //              cur_alloc                  last_commit
1150        //
1151        // (2) when file size grows after block reusing
1152        //         writable blocks             writable blocks
1153        //      <---->       <------>      <-----------x--------->
1154        // +-------------------------------------------+---------+
1155        // |    xxxxxx       xxxxxxxx                  |         |
1156        // +-------------------------------------------+---------+
1157        //                                 ^           ^         ^
1158        //                                 last_commit bmp_size  cur_alloc
1159
1160        // the block is writable if
1161        // 1) BID >= last_commit OR
1162        // 2) BID < cur_alloc_bid AND corresponding bitmap is set.
1163        if (bid >= last_commit) {
1164            // if prev_bmp exists, last commit position is still located on the
1165            // previous bitmap (since prev_bmp is released when a commit is invoked
1166            // in the new bitmap).
1167            // So in this case, we have to check both previous/current bitmaps.
1168            //
1169            // (1: previous bitmap (sb->bmp_prev), 2: current bitmap (sb->bmp))
1170            //
1171            //        writable blocks          writable blocks
1172            //    <-->      <---->   <-->          <> <> <>
1173            // +-------------------------------------------+
1174            // |  2222 1111 222222   2222   111111111 22 11|
1175            // +-------------------------------------------+
1176            //                                     ^
1177            //                                     last_commit
1178            if (sb->bmp_prev) {
1179                if (bid < sb->bmp_prev_size &&
1180                    _is_bmp_set(sb->bmp_prev, bid)) {
1181                    ret = true;
1182                }
1183                if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1184                    _is_bmp_set(sb_bmp, bid)) {
1185                    ret = true;
1186                }
1187                if (bid >= atomic_get_uint64_t(&sb->bmp_size)) {
1188                    // blocks newly allocated beyond the bitmap size:
1189                    // always writable
1190                    ret = true;
1191                }
1192            } else {
1193                // bmp_prev doesn't exist even though bmp_revnum is different
1194                // this happens on the first block reclaim only
1195                // so all blocks whose 'BID >= last_commit' are writable.
1196                ret = true;
1197            }
1198        }
1199
1200        if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1201            bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1202            _is_bmp_set(sb_bmp, bid)) {
1203            // 'cur_alloc_bid' is always smaller than 'bmp_size'
1204            // except for the case 'cur_alloc_bid == BLK_NOT_FOUND'
1205            ret = true;
1206        }
1207    }
1208
1209    sb_bmp_barrier_off(sb);
1210
1211    return ret;
1212}
1213
1214fdb_status sb_write(struct filemgr *file, size_t sb_no,
1215                    err_log_callback * log_callback)
1216{
1217    int r;
1218    int real_blocksize = file->blocksize;
1219    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1220    uint8_t *buf = alca(uint8_t, real_blocksize);
1221    uint32_t crc, _crc;
1222    uint64_t enc_u64;
1223    uint64_t num_docs;
1224    size_t i, offset;
1225    fdb_status fs;
1226
1227    memset(buf, 0x0, real_blocksize);
1228
1229    offset = 0;
1230    // magic number
1231    enc_u64 = _endian_encode(file->version);
1232    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1233    offset += sizeof(enc_u64);
1234
1235    // revision number
1236    uint64_t sb_revnum = atomic_get_uint64_t(&file->sb->revnum);
1237    enc_u64 = _endian_encode(sb_revnum);
1238    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1239    offset += sizeof(enc_u64);
1240
1241    // bitmap's revision number
1242    enc_u64 = _endian_encode(atomic_get_uint64_t(&file->sb->bmp_revnum));
1243    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1244    offset += sizeof(enc_u64);
1245
1246    // cur_alloc_bid
1247    bid_t sb_cur_alloc_bid = atomic_get_uint64_t(&file->sb->cur_alloc_bid);
1248    enc_u64 = _endian_encode(sb_cur_alloc_bid);
1249    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1250    offset += sizeof(enc_u64);
1251
1252    // last header bid
1253    bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
1254    enc_u64 = _endian_encode(sb_last_hdr_bid);
1255    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1256    offset += sizeof(enc_u64);
1257
1258    // last header rev number
1259    uint64_t sb_last_hdr_revnum = atomic_get_uint64_t(&file->sb->last_hdr_revnum);
1260    enc_u64 = _endian_encode(sb_last_hdr_revnum);
1261    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1262    offset += sizeof(enc_u64);
1263
1264    // minimum active header revnum
1265    enc_u64 = _endian_encode(file->sb->min_live_hdr_revnum);
1266    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1267    offset += sizeof(enc_u64);
1268
1269    // minimum active header BID
1270    enc_u64 = _endian_encode(file->sb->min_live_hdr_bid);
1271    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1272    offset += sizeof(enc_u64);
1273
1274    // # initial free blocks
1275    enc_u64 = _endian_encode(file->sb->num_init_free_blocks);
1276    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1277    offset += sizeof(enc_u64);
1278
1279    // # free blocks
1280    enc_u64 = _endian_encode(file->sb->num_free_blocks);
1281    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1282    offset += sizeof(enc_u64);
1283
1284    // bitmap size
1285    uint64_t sb_bmp_size = atomic_get_uint64_t(&file->sb->bmp_size);
1286    enc_u64 = _endian_encode(sb_bmp_size);
1287    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1288    offset += sizeof(enc_u64);
1289
1290    bool rsv_bmp_enabled = false;
1291
1292    if ( file->sb->rsv_bmp &&
1293         atomic_cas_uint32_t(&file->sb->rsv_bmp->status,
1294                             SB_RSV_READY, SB_RSV_WRITING) ) {
1295        rsv_bmp_enabled = true;
1296        // status becomes 'WRITING' so that switching will be postponed.
1297        // note that 'rsv_bmp' is not in use yet, so this won't block
1298        // any tasks other than switching.
1299    }
1300
1301    // reserved bitmap size (0 if not exist)
1302    if (rsv_bmp_enabled) {
1303        enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_size);
1304    } else {
1305        enc_u64 = 0;
1306    }
1307    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1308    offset += sizeof(enc_u64);
1309
1310    // bitmap doc offsets
1311    num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1312    for (i=0; i<num_docs; ++i) {
1313        enc_u64 = _endian_encode(file->sb->bmp_doc_offset[i]);
1314        memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1315        offset += sizeof(enc_u64);
1316    }
1317
1318    // reserved bitmap doc offsets
1319    if (rsv_bmp_enabled) {
1320        num_docs = _bmp_size_to_num_docs(file->sb->rsv_bmp->bmp_size);
1321        for (i=0; i<num_docs; ++i) {
1322            enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_doc_offset[i]);
1323            memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1324            offset += sizeof(enc_u64);
1325        }
1326
1327        atomic_store_uint32_t(&file->sb->rsv_bmp->status, SB_RSV_READY);
1328    }
1329
1330    // CRC
1331    crc = get_checksum(buf, offset, file->crc_mode);
1332    _crc = _endian_encode(crc);
1333    memcpy(buf + offset, &_crc, sizeof(_crc));
1334
1335    // set block marker
1336    memset(buf + blocksize, BLK_MARKER_SB, BLK_MARKER_SIZE);
1337
1338    // directly write a block bypassing block cache
1339    r = file->ops->pwrite(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1340    if (r != real_blocksize) {
1341        char errno_msg[512];
1342        file->ops->get_errno_str(errno_msg, 512);
1343        fs = FDB_RESULT_SB_RACE_CONDITION;
1344        fdb_log(log_callback, fs,
1345                "Failed to write the superblock (number: %" _F64 "), %s",
1346                sb_no, errno_msg);
1347        return fs;
1348    }
1349
1350    // increase superblock's revision number
1351    atomic_incr_uint64_t(&file->sb->revnum);
1352
1353    return FDB_RESULT_SUCCESS;
1354}
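/*
 * Capacity note (assuming the default 4KB block size): the fixed fields
 * take 0x60 bytes, so a single superblock block can hold roughly
 * (4096 - 96 - 4 - 1) / 8 ~= 499 bitmap doc offsets, i.e., about 499MB of
 * bitmap, covering a file of roughly 16TB.
 */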
1355
1356static void _rsv_free(struct sb_rsv_bmp *rsv)
1357{
1358    free(rsv->bmp);
1359    free(rsv->bmp_doc_offset);
1360    free(rsv->bmp_docs);
1361}
1362
1363static fdb_status _sb_read_given_no(struct filemgr *file,
1364                                    size_t sb_no,
1365                                    struct superblock *sb,
1366                                    err_log_callback *log_callback)
1367{
1368    int r;
1369    int real_blocksize = file->blocksize;
1370    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1371    size_t i, num_docs;
1372    uint8_t *buf = alca(uint8_t, real_blocksize);
1373    uint32_t crc_file, crc, _crc;
1374    uint64_t enc_u64, version, offset, dummy64;
1375    fdb_status fs;
1376    struct sb_rsv_bmp *rsv = NULL;
1377
1378    memset(buf, 0x0, real_blocksize);
1379    offset = 0;
1380
1381    // directly read a block bypassing block cache
1382    r = file->ops->pread(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1383    if (r != real_blocksize) {
1384        char errno_msg[512];
1385        file->ops->get_errno_str(errno_msg, 512);
1386        fs = FDB_RESULT_SB_READ_FAIL;
1387        fdb_log(log_callback, fs,
1388                "Failed to read the superblock: file read failure (SB No.: %" _F64 "), %s",
1389                sb_no, errno_msg);
1390        return fs;
1391    }
1392
1393    // block marker check
1394    if (buf[blocksize] != BLK_MARKER_SB) {
1395        fs = FDB_RESULT_SB_READ_FAIL;
1396        fdb_log(log_callback, fs,
1397                "Failed to read the superblock: "
1398                "incorrect block marker (marker: %x, SB No.: %" _F64 "). "
1399                "Note: this message might be a false alarm if upgrade is running.",
1400                buf[blocksize], sb_no);
1401        return fs;
1402    }
1403
1404    // magic number
1405    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1406    offset += sizeof(enc_u64);
1407    version = _endian_decode(enc_u64);
1408
1409    // version check
1410    if (!ver_superblock_support(version)) {
1411        fs = FDB_RESULT_SB_READ_FAIL;
1412        fdb_log(log_callback, fs,
1413                "Failed to read the superblock: "
1414                "not supported version (magic: %" _F64 ", SB No.: %" _F64 ")",
1415                version, sb_no);
1416        return fs;
1417    }
1418
1419    // revision number
1420    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1421    offset += sizeof(enc_u64);
1422    atomic_store_uint64_t(&sb->revnum, _endian_decode(enc_u64));
1423
1424    // bitmap's revision number
1425    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1426    offset += sizeof(enc_u64);
1427    atomic_store_uint64_t(&sb->bmp_revnum, _endian_decode(enc_u64));
1428
1429    // cur_alloc_bid
1430    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1431    offset += sizeof(enc_u64);
1432    atomic_store_uint64_t(&sb->cur_alloc_bid, _endian_decode(enc_u64));
1433
1434    // last header bid
1435    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1436    offset += sizeof(enc_u64);
1437    atomic_store_uint64_t(&sb->last_hdr_bid, _endian_decode(enc_u64));
1438
1439    // last header rev number
1440    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1441    offset += sizeof(enc_u64);
1442    atomic_store_uint64_t(&sb->last_hdr_revnum, _endian_decode(enc_u64));
1443
1444    // minimum active header revnum
1445    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1446    offset += sizeof(enc_u64);
1447    sb->min_live_hdr_revnum = _endian_decode(enc_u64);
1448
1449    // minimum active header BID
1450    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1451    offset += sizeof(enc_u64);
1452    sb->min_live_hdr_bid = _endian_decode(enc_u64);
1453
1454    // # initial free blocks
1455    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1456    offset += sizeof(enc_u64);
1457    sb->num_init_free_blocks = _endian_decode(enc_u64);
1458
1459    // # free blocks
1460    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1461    offset += sizeof(enc_u64);
1462    sb->num_free_blocks = _endian_decode(enc_u64);
1463
1464    // bitmap size
1465    uint64_t sb_bmp_size;
1466    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1467    offset += sizeof(enc_u64);
1468    sb_bmp_size = _endian_decode(enc_u64);
1469    atomic_store_uint64_t(&sb->bmp_size, sb_bmp_size);
1470
1471    // reserved bitmap size
1472    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1473    offset += sizeof(enc_u64);
1474    dummy64 = _endian_decode(enc_u64);
1475    if (dummy64) {
1476        // reserved bitmap array exists
1477        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
1478        rsv->bmp = NULL;
1479        rsv->bmp_size = dummy64;
1480        rsv->cur_alloc_bid = BLK_NOT_FOUND;
1481        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
1482    }
1483
1484    // temporarily set bitmap array to NULL
1485    // (it will be allocated by fetching function)
1486    sb->bmp = NULL;
1487
1488    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1489    if (num_docs) {
1490        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1491        sb->bmp_docs = (struct docio_object*)
1492                       calloc(num_docs, sizeof(struct docio_object));
1493    }
1494
1495    // read doc offsets
1496    for (i=0; i<num_docs; ++i) {
1497        memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1498        offset += sizeof(enc_u64);
1499        sb->bmp_doc_offset[i] = _endian_decode(enc_u64);
1500    }
1501
1502    // read reserved bmp docs if exist
1503    if (rsv) {
1504        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
1505        if (rsv->num_bmp_docs) {
1506            rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1507            rsv->bmp_docs = (struct docio_object*)
1508                            calloc(num_docs, sizeof(struct docio_object));
1509        }
1510
1511        // read doc offsets
1512        for (i=0; i<num_docs; ++i) {
1513            memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1514            offset += sizeof(enc_u64);
1515            rsv->bmp_doc_offset[i] = _endian_decode(enc_u64);
1516        }
1517        sb->rsv_bmp = rsv;
1518    }
1519
1520    // CRC
1521    crc = get_checksum(buf, offset, file->crc_mode);
1522    memcpy(&_crc, buf + offset, sizeof(_crc));
1523    crc_file = _endian_decode(_crc);
1524    if (crc != crc_file) {
1525        free(sb->bmp_doc_offset);
1526        free(sb->bmp_docs);
1527        sb->bmp_doc_offset = NULL;
1528        sb->bmp_docs = NULL;
1529
1530        fs = FDB_RESULT_SB_READ_FAIL;
1531        fdb_log(log_callback, fs,
1532                "Failed to read the superblock: "
1533                "CRC mismatch (crc in file: %x, crc calculated: %x, SB No.: %" _F64 ")",
1534                crc_file, crc, sb_no);
1535        return fs;
1536    }
1537
1538    return FDB_RESULT_SUCCESS;
1539}
1540
1541static void _sb_free(struct superblock *sb)
1542{
1543    if (sb->rsv_bmp) {
1544        atomic_store_uint32_t(&sb->rsv_bmp->status, SB_RSV_VOID);
1545        _free_bmp_idx(&sb->rsv_bmp->bmp_idx);
1546        _rsv_free(sb->rsv_bmp);
1547        free(sb->rsv_bmp);
1548    }
1549    _free_bmp_idx(&sb->bmp_idx);
1550
1551    free(sb->bmp);
1552    free(sb->bmp_prev);
1553    free(sb->bmp_doc_offset);
1554    // note that each docio object doesn't need to be freed
1555    // as key/body fields point to static memory regions.
1556    free(sb->bmp_docs);
1557    free(sb->config);
1558    spin_destroy(&sb->lock);
1559
1560    sb->bmp = NULL;
1561    sb->bmp_doc_offset = NULL;
1562    sb->bmp_docs = NULL;
1563    sb->config = NULL;
1564}
1565
1566void _sb_init(struct superblock *sb, struct sb_config sconfig)
1567{
1568    *sb->config = sconfig;
1569    atomic_init_uint64_t(&sb->revnum, 0);
1570    atomic_init_uint64_t(&sb->bmp_revnum, 0);
1571    atomic_init_uint64_t(&sb->bmp_size, 0);
1572    sb->bmp = NULL;
1573    atomic_init_uint64_t(&sb->bmp_rcount, 0);
1574    atomic_init_uint64_t(&sb->bmp_wcount, 0);
1575    spin_init(&sb->bmp_lock);
1576    sb->bmp_prev_size = 0;
1577    sb->bmp_prev = NULL;
1578    sb->bmp_doc_offset = NULL;
1579    sb->bmp_docs = NULL;
1580    sb->num_bmp_docs = 0;
1581    sb->num_init_free_blocks = 0;
1582    sb->num_free_blocks = 0;
1583    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1584    atomic_init_uint64_t(&sb->last_hdr_bid, BLK_NOT_FOUND);
1585    sb->min_live_hdr_revnum = 0;
1586    sb->min_live_hdr_bid = BLK_NOT_FOUND;
1587    atomic_init_uint64_t(&sb->last_hdr_revnum, 0);
1588    sb->num_alloc = 0;
1589    sb->rsv_bmp = NULL;
1590    avl_init(&sb->bmp_idx, NULL);
1591    spin_init(&sb->lock);
1592}
1593
1594INLINE void _sb_copy(struct superblock *dst, struct superblock *src)
1595{
1596    // since variables in 'src' won't be freed, just copy its pointers
1597    dst->config = src->config;
1598    atomic_store_uint64_t(&dst->revnum, atomic_get_uint64_t(&src->revnum));
1599    atomic_store_uint64_t(&dst->bmp_revnum, atomic_get_uint64_t(&src->bmp_revnum));
1600    atomic_store_uint64_t(&dst->bmp_size, atomic_get_uint64_t(&src->bmp_size));
1601    dst->bmp.store(src->bmp.load(std::memory_order_relaxed), std::memory_order_relaxed);
1602    atomic_store_uint64_t(&dst->bmp_rcount, atomic_get_uint64_t(&src->bmp_rcount));
1603    atomic_store_uint64_t(&dst->bmp_wcount, atomic_get_uint64_t(&src->bmp_wcount));
1604    spin_init(&dst->bmp_lock);
1605    dst->bmp_prev_size = src->bmp_prev_size;
1606    dst->bmp_prev = src->bmp_prev;
1607    dst->bmp_idx = src->bmp_idx;
1608    dst->bmp_doc_offset = src->bmp_doc_offset;
1609    dst->bmp_docs = src->bmp_docs;
1610    dst->num_bmp_docs = src->num_bmp_docs;
1611    dst->num_init_free_blocks = src->num_init_free_blocks;
1612    dst->num_free_blocks = src->num_free_blocks;
1613    dst->rsv_bmp = src->rsv_bmp;
1614    atomic_store_uint64_t(&dst->cur_alloc_bid, atomic_get_uint64_t(&src->cur_alloc_bid));
1615    atomic_store_uint64_t(&dst->last_hdr_bid, atomic_get_uint64_t(&src->last_hdr_bid));
1616    dst->min_live_hdr_revnum = src->min_live_hdr_revnum;
1617    dst->min_live_hdr_bid = src->min_live_hdr_bid;
1618    atomic_store_uint64_t(&dst->last_hdr_revnum, atomic_get_uint64_t(&src->last_hdr_revnum));
1619    dst->num_alloc = src->num_alloc;
1620    spin_init(&dst->lock);
1621}
1622
1623fdb_status sb_read_latest(struct filemgr *file,
1624                          struct sb_config sconfig,
1625                          err_log_callback *log_callback)
1626{
1627    size_t i, max_sb_no = sconfig.num_sb;
1628    uint64_t max_revnum = 0;
1629    uint64_t revnum_limit = (uint64_t)0xffffffffffffffff;
1630    fdb_status fs;
1631    struct superblock *sb_arr;
1632
1633    if (file->sb) {
1634        // Superblock was already read previously.
1635        // This means that there is some problem with the current superblock,
1636        // so we have to read another candidate.
1637
1638        // Note: 'sb->revnum' denotes the revnum of next superblock to be
1639        // written, so we need to subtract 1 from it to get the revnum of
1640        // the current superblock successfully read from the file.
1641        revnum_limit = atomic_get_uint64_t(&file->sb->revnum) - 1;
1642        sb_free(file);
1643    }
1644
1645    sb_arr = alca(struct superblock,
1646                  sconfig.num_sb * sizeof(struct superblock));
1647    memset(sb_arr, 0x0, sconfig.num_sb * sizeof(struct superblock));
1648
1649    // read all superblocks
1650    for (i=0; i<sconfig.num_sb; ++i) {
1651        sb_arr[i].config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1652        _sb_init(&sb_arr[i], sconfig);
1653        fs = _sb_read_given_no(file, i, &sb_arr[i], log_callback);
1654
1655        uint64_t cur_revnum = atomic_get_uint64_t(&sb_arr[i].revnum);
1656        if (fs == FDB_RESULT_SUCCESS &&
1657            cur_revnum >= max_revnum &&
1658            cur_revnum < revnum_limit) {
1659            max_sb_no = i;
1660            max_revnum = cur_revnum;
1661        }
1662    }
1663
1664    if (max_sb_no == sconfig.num_sb) {
1665        // all superblocks are broken
1666        fs = FDB_RESULT_SB_READ_FAIL;
1667        for (i=0; i<sconfig.num_sb; ++i) {
1668            _sb_free(&sb_arr[i]);
1669        }
1670        // no readable superblock
1671        // (if not corrupted, it may be a normal old version file)
1672        return fs;
1673    }
1674
1675    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1676    _sb_copy(file->sb, &sb_arr[max_sb_no]);
1677
1678    // set last commit position
1679    if (atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND) {
1680        atomic_store_uint64_t(&file->last_commit,
1681                              atomic_get_uint64_t(&file->sb->cur_alloc_bid) *
1682                              file->config->blocksize);
1683    } else {
1684        // otherwise, last_commit == file->pos
1685        // (already set by filemgr_open() function)
1686    }
1687
1688    atomic_incr_uint64_t(&file->sb->revnum);
1689    avl_init(&file->sb->bmp_idx, NULL);
1690
1691    // free the other superblocks
1692    for (i=0; i<sconfig.num_sb; ++i) {
1693        if (i != max_sb_no) {
1694            _sb_free(&sb_arr[i]);
1695        }
1696    }
1697
1698    return FDB_RESULT_SUCCESS;
1699}
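/*
 * Example (assuming the default of 4 superblock copies): if the copies
 * hold revnums {12, 13, 10, 11}, the copy with revnum 13 is selected,
 * sb->revnum is bumped to 14, and the next sb_sync_circular() call
 * rewrites slot 14 % 4 = 2.
 */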
1700
1701uint64_t sb_get_bmp_revnum(struct filemgr *file)
1702{
1703    if (file->sb) {
1704        return atomic_get_uint64_t(&file->sb->bmp_revnum);
1705    } else {
1706        return 0;
1707    }
1708}
1709
1710filemgr_header_revnum_t sb_get_min_live_revnum(struct filemgr *file)
1711{
1712    if (file->sb) {
1713        return file->sb->min_live_hdr_revnum;
1714    } else {
1715        return 0;
1716    }
1717}
1718
1719uint64_t sb_get_num_free_blocks(struct filemgr *file)
1720{
1721    if (file->sb) {
1722        return file->sb->num_free_blocks;
1723    } else {
1724        return 0;
1725    }
1726}
1727
1728struct sb_config sb_get_default_config()
1729{
1730    struct sb_config ret;
1731    ret.num_sb = SB_DEFAULT_NUM_SUPERBLOCKS;
1732    return ret;
1733}
1734
1735fdb_status sb_init(struct filemgr *file, struct sb_config sconfig,
1736                   err_log_callback * log_callback)
1737{
1738    size_t i;
1739    bid_t sb_bid;
1740    fdb_status fs;
1741
1742    // exit if superblock already exists.
1743    if (file->sb) {
1744        return FDB_RESULT_SUCCESS;
1745    }
1746    // no data should be written in the file before initialization of superblock.
1747    if (filemgr_get_pos(file) > 0) {
1748        return FDB_RESULT_SB_INIT_FAIL;
1749    }
1750
1751    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1752    file->sb->config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1753    file->version = ver_get_latest_magic();
1754    _sb_init(file->sb, sconfig);
1755
1756    // write initial superblocks
1757    for (i=0; i<file->sb->config->num_sb; ++i) {
1758        // allocate
1759        sb_bid = filemgr_alloc(file, log_callback);
1760        if (sb_bid != i) {
1761            // other data was written during sb_write .. error
1762            fs = FDB_RESULT_SB_RACE_CONDITION;
1763            fdb_log(log_callback, fs,
1764                    "Other writer interfered during sb_write (number: %" _F64 ")",
1765                    i);
1766            free(file->sb->config);
1767            free(file->sb);
1768            return fs;
1769        }
1770
1771        fs = sb_write(file, i, log_callback);
1772        if (fs != FDB_RESULT_SUCCESS) {
1773            return fs;
1774        }
1775    }
1776
1777    return FDB_RESULT_SUCCESS;
1778}
1779
1780fdb_status sb_free(struct filemgr *file)
1781{
1782    if (file->sb) {
1783        _sb_free(file->sb);
1784        free(file->sb);
1785        file->sb = NULL;
1786    }
1787
1788    return FDB_RESULT_SUCCESS;
1789}
1790
1791
1792