xref: /6.6.0/forestdb/src/superblock.cc (revision 74dd9214)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <string.h>
20#include <stdint.h>
21#include <stdlib.h>
22
23#include "libforestdb/forestdb.h"
24#include "superblock.h"
25#include "staleblock.h"
26#include "btreeblock.h"
27#include "list.h"
28#include "fdb_internal.h"
29#include "version.h"
30#include "blockcache.h"
31
32#include "memleak.h"
33
34/*
35 * << super block structure >>
36 *
37 * 0x00 [file version (magic number)]:           8 bytes
38 * 0x08 [super block revision number]:           8 bytes
39 * 0x10 [bitmap revision number]:                8 bytes
40 * 0x18 [BID for next allocation]:               8 bytes
41 * 0x20 [last header BID]:                       8 bytes
42 * 0x28 [last header revnum]:                    8 bytes
43 * 0x30 [min active header revnum]:              8 bytes
44 * 0x38 [min active header BID]:                 8 bytes
45 * 0x40 [# initial free blocks in the bitmap]:   8 bytes
46 * 0x48 [# free blocks in the bitmap]:           8 bytes
47 * 0x50 [bitmap size]:                           8 bytes
48 * 0x58 [reserved bitmap size (0 if not exist)]: 8 bytes
49 * 0x60 ... [bitmap doc offset]:                 8 bytes each
50 *      [CRC32]:                                 4 bytes
51 * ...
52 *      [block marker]:                          1 byte
53 */
54
55
56/*
57 * << basic mask >>
58 * 0: 10000000
59 * 1: 01000000
60 *    ...
61 * 7: 00000001
62 */
63static uint8_t bmp_basic_mask[8];
64/*
65 * << 2d mask >>
66 * bmp_2d_mask[pos][len]
67 *
68 * 0,1: 10000000
69 * 0,2: 11000000
70 * 0,3: 11100000
71 *    ...
72 * 1,1: 01000000
73 * 1,2: 01100000
74 *    ...
75 * 3,1: 00010000
76 * 3,2: 00011000
77 *    ...
78 * 6,1: 00000010
79 * 6,2: 00000011
80 * 7,1: 00000001
81 */
82static uint8_t bmp_2d_mask[8][9];
83
84
85// x % 8
86#define mod8(x) ((x) & 0x7)
87// x / 8
88#define div8(x) ((x) >> 3)
89// rounding down (to the nearest 8)
90#define rd8(x) (((x) >> 3) << 3)
91
92struct bmp_idx_node {
93    uint64_t id;
94    struct avl_node avl;
95};
96
97INLINE int _bmp_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
98{
99    struct bmp_idx_node *aa, *bb;
100    aa = _get_entry(a, struct bmp_idx_node, avl);
101    bb = _get_entry(b, struct bmp_idx_node, avl);
102
103#ifdef __BIT_CMP
104    return _CMP_U64(aa->id, bb->id);
105#else
106    if (aa->id < bb->id) {
107        return -1;
108    } else if (aa->id > bb->id) {
109        return 1;
110    } else {
111        return 0;
112    }
113#endif
114}
115
116static void _add_bmp_idx(struct avl_tree *bmp_idx, bid_t bid, bid_t count)
117{
118    bid_t cur, start_id, stop_id;
119    struct avl_node *a;
120    struct bmp_idx_node *item, query;
121
122    // 256 blocks per node
123    start_id = bid >> 8;
124    stop_id = (bid+count-1) >> 8;
125
126    for (cur=start_id; cur<=stop_id; ++cur) {
127        query.id = cur;
128        a = avl_search(bmp_idx, &query.avl, _bmp_idx_cmp);
129        if (a) {
130            // already exists .. do nothing
131        } else {
132            // create a node
133            item = (struct bmp_idx_node*)calloc(1, sizeof(struct bmp_idx_node));
134            item->id = query.id;
135            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
136        }
137    }
138}
139
140static void _free_bmp_idx(struct avl_tree *bmp_idx)
141{
142    // free all supplemental bmp idx nodes
143    struct avl_node *a;
144    struct bmp_idx_node *item;
145    a = avl_first(bmp_idx);
146    while (a) {
147        item = _get_entry(a, struct bmp_idx_node, avl);
148        a = avl_next(a);
149        avl_remove(bmp_idx, &item->avl);
150        free(item);
151    }
152}
153
154static void _construct_bmp_idx(struct avl_tree *bmp_idx,
155                               uint8_t *bmp,
156                               uint64_t bmp_size,
157                               bid_t start_bid)
158{
159    uint64_t i, node_idx;
160    uint64_t *bmp64 = (uint64_t*)bmp;
161    struct bmp_idx_node *item;
162
163    if (start_bid == BLK_NOT_FOUND) {
164        start_bid = 0;
165    }
166
167    // Since a single byte includes 8 bitmaps, an 8-byte integer contains 64 bitmaps.
168    // By converting bitmap array to uint64_t array, we can quickly verify if a
169    // 64-bitmap-group has at least one non-zero bit or not.
170    for (i=start_bid/64; i<bmp_size/64; ++i) {
171        // in this loop, 'i' denotes bitmap group number.
172        node_idx = i/4;
173        if (bmp64[i]) {
174            item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
175            item->id = node_idx;
176            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
177            // skip other bitmaps in the same bitmap index node
178            // (1 bitmap index node == 4 bitmap groups == 256 bitmaps)
179            i = (node_idx+1)*4;
180        }
181    }
182
183    // If there are remaining bitmaps, check if they are non-zero or not one by one.
184    if (bmp_size % 64) {
185        uint8_t idx, off;
186        uint64_t start;
187
188        start = (bmp_size/64)*64;
189        if (start < start_bid) {
190            start = start_bid;
191        }
192
193        for (i=start; i<bmp_size; ++i) {
194            // in this loop, 'i' denotes bitmap number (i.e., BID).
195            idx = div8(i);
196            off = mod8(i);
197            if (bmp[idx] & bmp_basic_mask[off]) {
198                node_idx = i >> 8;
199                item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
200                item->id = node_idx;
201                avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
202                i = (node_idx+1)*256;
203            }
204        }
205    }
206}
207
208INLINE size_t _bmp_size_to_num_docs(uint64_t bmp_size)
209{
210    if (bmp_size) {
211        // 8 bitmaps per byte
212        uint64_t num_bits_per_doc = 8 * SB_MAX_BITMAP_DOC_SIZE;
213        return (bmp_size + num_bits_per_doc - 1) / num_bits_per_doc;
214    } else {
215        return 0;
216    }
217}
218
219INLINE bool _is_bmp_set(uint8_t *bmp, bid_t bid)
220{
221    return (bmp[div8(bid)] & bmp_basic_mask[mod8(bid)]);
222}
223
224static void sb_bmp_barrier_on(struct superblock *sb)
225{
226    atomic_incr_uint64_t(&sb->bmp_rcount);
227    if (atomic_get_uint64_t(&sb->bmp_wcount)) {
228        // now bmp pointer & related variables are being changed.
229        // decrease count and grab lock until the change is done.
230        atomic_decr_uint64_t(&sb->bmp_rcount);
231        spin_lock(&sb->bmp_lock);
232
233        // got control: means that change is done.
234        // re-increase the count
235        atomic_incr_uint64_t(&sb->bmp_rcount);
236        spin_unlock(&sb->bmp_lock);
237    }
238}
239
240static void sb_bmp_barrier_off(struct superblock *sb)
241{
242    atomic_decr_uint64_t(&sb->bmp_rcount);
243}
244
245static void sb_bmp_change_begin(struct superblock *sb)
246{
247    // grab lock and increase writer count
248    // now new readers cannot increase the reader count
249    // (note that there is always one bitmap writer at a time)
250    spin_lock(&sb->bmp_lock);
251    atomic_incr_uint64_t(&sb->bmp_wcount);
252
253    // wait until previous BMP readers terminate
254    // (they are very short bitmap accessing routines so that
255    //  will not take long time).
256    size_t spin = 0;
257    while (atomic_get_uint64_t(&sb->bmp_rcount)) {
258       if (++spin > 64) {
259#ifdef HAVE_SCHED_H
260            sched_yield();
261#elif _MSC_VER
262            SwitchToThread();
263#endif
264       }
265    }
266
267    // now 1) all previous readers terminated
268    //     2) new readers will be blocked
269}
270
271static void sb_bmp_change_end(struct superblock *sb)
272{
273    atomic_decr_uint64_t(&sb->bmp_wcount);
274    spin_unlock(&sb->bmp_lock);
275    // now resume all pending readers
276}
277
278void sb_bmp_append_doc(fdb_kvs_handle *handle)
279{
280    // == write bitmap into system docs ==
281    // calculate # docs (1MB by default)
282    // (1MB bitmap covers 32GB DB file)
283    size_t i;
284    uint64_t num_docs;
285    char doc_key[64];
286    struct superblock *sb = handle->file->sb;
287
288    // mark stale if previous doc offset exists
289    if (sb->bmp_doc_offset) {
290        for (i=0; i<sb->num_bmp_docs; ++i) {
291            filemgr_mark_stale(handle->file, sb->bmp_doc_offset[i],
292                _fdb_get_docsize(sb->bmp_docs[i].length));
293        }
294
295        free(sb->bmp_doc_offset);
296        free(sb->bmp_docs);
297        sb->bmp_doc_offset = NULL;
298        sb->bmp_docs = NULL;
299    }
300
301    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
302    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
303    if (num_docs) {
304        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
305        sb->bmp_docs = (struct docio_object*)
306                       calloc(num_docs, sizeof(struct docio_object));
307    }
308
309    // bitmap doc offsets
310    for (i=0; i<num_docs; ++i) {
311        // append a system doc for bitmap chunk
312        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
313        sprintf(doc_key, "bitmap_%" _F64 "_%d",
314                atomic_get_uint64_t(&sb->bmp_revnum), (int)i);
315        sb->bmp_docs[i].key = (void*)doc_key;
316        sb->bmp_docs[i].meta = NULL;
317        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
318
319        sb->bmp_docs[i].length.keylen = strlen(doc_key)+1;
320        sb->bmp_docs[i].length.metalen = 0;
321        if (i == num_docs - 1) {
322            // the last doc
323            sb->bmp_docs[i].length.bodylen =
324                (sb_bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
325        } else {
326            // otherwise: 1MB
327            sb->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
328        }
329        sb->bmp_docs[i].seqnum = 0;
330        sb->bmp_doc_offset[i] =
331            docio_append_doc_system(handle->dhandle, &sb->bmp_docs[i]);
332    }
333}
334
335void sb_rsv_append_doc(fdb_kvs_handle *handle)
336{
337    size_t i;
338    uint64_t num_docs;
339    char doc_key[64];
340    struct superblock *sb = handle->file->sb;
341    struct sb_rsv_bmp *rsv = NULL;
342
343    if (!sb) {
344        return;
345    }
346
347    rsv = sb->rsv_bmp;
348    if (!rsv ||
349        atomic_get_uint32_t(&rsv->status) != SB_RSV_INITIALIZING) {
350        return;
351    }
352
353    rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
354    if (num_docs) {
355        rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
356        rsv->bmp_docs = (struct docio_object*)
357                        calloc(num_docs, sizeof(struct docio_object));
358    }
359
360    // bitmap doc offsets
361    for (i=0; i<num_docs; ++i) {
362        // append a system doc for bitmap chunk
363        memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
364        sprintf(doc_key, "bitmap_%" _F64 "_%d", rsv->bmp_revnum, (int)i);
365        rsv->bmp_docs[i].key = (void*)doc_key;
366        rsv->bmp_docs[i].meta = NULL;
367        rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
368
369        rsv->bmp_docs[i].length.keylen = strlen(doc_key)+1;
370        rsv->bmp_docs[i].length.metalen = 0;
371        if (i == num_docs - 1) {
372            // the last doc
373            rsv->bmp_docs[i].length.bodylen =
374                (rsv->bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
375        } else {
376            // otherwise: 1MB
377            rsv->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
378        }
379        rsv->bmp_docs[i].seqnum = 0;
380        rsv->bmp_doc_offset[i] =
381            docio_append_doc_system(handle->dhandle, &rsv->bmp_docs[i]);
382    }
383
384    // now 'rsv_bmp' is available.
385    atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
386}
387
388fdb_status sb_bmp_fetch_doc(fdb_kvs_handle *handle)
389{
390    // == read bitmap from system docs ==
391    size_t i;
392    uint64_t num_docs;
393    int64_t r_offset;
394    char doc_key[64];
395    struct superblock *sb = handle->file->sb;
396    struct sb_rsv_bmp *rsv = NULL;
397
398    // skip if previous bitmap exists OR
399    // there is no bitmap to be fetched (fast screening)
400    if (sb->bmp.load(std::memory_order_relaxed) ||
401        atomic_get_uint64_t(&sb->bmp_size) == 0) {
402        return FDB_RESULT_SUCCESS;
403    }
404
405    spin_lock(&sb->lock);
406
407    // check once again if superblock is already initialized
408    // while the thread was blocked by the lock.
409    if (sb->bmp.load(std::memory_order_relaxed)) {
410        spin_unlock(&sb->lock);
411        return FDB_RESULT_SUCCESS;
412    }
413
414    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
415    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
416    if (!num_docs) {
417        spin_unlock(&sb->lock);
418        return FDB_RESULT_SUCCESS;
419    }
420
421    free(sb->bmp);
422    sb->bmp = (uint8_t*)calloc(1, (sb_bmp_size+7) / 8);
423
424    for (i=0; i<num_docs; ++i) {
425        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
426        // pre-allocated buffer for key
427        sb->bmp_docs[i].key = (void*)doc_key;
428        // directly point to the bitmap
429        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
430
431        r_offset = docio_read_doc(handle->dhandle, sb->bmp_doc_offset[i],
432                                  &sb->bmp_docs[i], true);
433        if (r_offset <= 0) {
434            // read fail
435            free(sb->bmp);
436            sb->bmp = NULL;
437
438            spin_unlock(&sb->lock);
439            return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
440        }
441    }
442
443    _construct_bmp_idx(&sb->bmp_idx, sb->bmp, sb_bmp_size,
444                       atomic_get_uint64_t(&sb->cur_alloc_bid));
445
446    rsv = sb->rsv_bmp;
447    if (rsv && atomic_get_uint32_t(&rsv->status) == SB_RSV_INITIALIZING) {
448        // reserved bitmap exists
449        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
450        if (!num_docs) {
451            spin_unlock(&sb->lock);
452            return FDB_RESULT_SUCCESS;
453        }
454
455        rsv->bmp = (uint8_t*)calloc(1, (rsv->bmp_size+7) / 8);
456
457        for (i=0; i<num_docs; ++i) {
458
459            memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
460            // pre-allocated buffer for key
461            rsv->bmp_docs[i].key = (void*)doc_key;
462            // directly point to the (reserved) bitmap
463            rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
464
465            r_offset = docio_read_doc(handle->dhandle, rsv->bmp_doc_offset[i],
466                                      &rsv->bmp_docs[i], true);
467            if (r_offset <= 0) {
468                // read fail
469                free(rsv->bmp);
470                free(sb->bmp);
471                rsv->bmp = sb->bmp = NULL;
472
473                spin_unlock(&sb->lock);
474                return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
475            }
476        }
477
478        _construct_bmp_idx(&rsv->bmp_idx, rsv->bmp, rsv->bmp_size, 0);
479        atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
480    }
481
482    spin_unlock(&sb->lock);
483    return FDB_RESULT_SUCCESS;
484}
485
486bool sb_check_sync_period(fdb_kvs_handle *handle)
487{
488    struct superblock *sb = handle->file->sb;
489
490    if (sb && sb->num_alloc * handle->file->blocksize > SB_SYNC_PERIOD) {
491        return true;
492    }
493    return false;
494}
495
496bool sb_update_header(fdb_kvs_handle *handle)
497{
498    bool ret = false;
499    struct superblock *sb = handle->file->sb;
500
501    bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
502    if (sb && atomic_get_uint64_t(&sb->last_hdr_bid) != last_hdr_bid &&
503        atomic_get_uint64_t(&sb->last_hdr_revnum) < handle->cur_header_revnum) {
504
505        atomic_store_uint64_t(&sb->last_hdr_bid, last_hdr_bid);
506        atomic_store_uint64_t(&sb->last_hdr_revnum, handle->cur_header_revnum);
507
508        uint64_t lw_revnum = atomic_get_uint64_t(&handle->file->last_writable_bmp_revnum);
509        if (lw_revnum == atomic_get_uint64_t(&sb->bmp_revnum) &&
510            sb->bmp_prev) {
511            free(sb->bmp_prev);
512            sb->bmp_prev = NULL;
513        }
514        ret = true;
515    }
516
517    return ret;
518}
519
520void sb_reset_num_alloc(fdb_kvs_handle *handle)
521{
522    if (handle->file->sb) {
523        handle->file->sb->num_alloc = 0;
524    }
525}
526
527fdb_status sb_sync_circular(fdb_kvs_handle *handle)
528{
529    uint64_t sb_revnum;
530    fdb_status fs;
531
532    sb_revnum = atomic_get_uint64_t(&handle->file->sb->revnum);
533    fs = sb_write(handle->file,
534                  sb_revnum % handle->file->sb->config->num_sb,
535                  &handle->log_callback);
536    return fs;
537}
538
539sb_decision_t sb_check_block_reusing(fdb_kvs_handle *handle)
540
541{
542    // start block reusing when
543    // 1) if blocks are not reused yet in this file:
544    //    when file size becomes larger than the threshold
545    // 2) otherwise:
546    //    when # free blocks decreases under the threshold
547
548    uint64_t live_datasize;
549    uint64_t filesize;
550    uint64_t ratio;
551    struct superblock *sb = handle->file->sb;
552
553    if (!sb) {
554        return SBD_NONE;
555    }
556
557    if (filemgr_get_file_status(handle->file) != FILE_NORMAL) {
558        // being compacted file does not allow block reusing
559        return SBD_NONE;
560    }
561
562    uint64_t block_reusing_threshold =
563        atomic_get_uint64_t(&handle->file->config->block_reusing_threshold,
564                            std::memory_order_relaxed);
565    if (block_reusing_threshold == 0 || block_reusing_threshold >= 100) {
566        // circular block reusing is disabled
567        return SBD_NONE;
568    }
569
570    filesize = filemgr_get_pos(handle->file);
571    if (filesize < SB_MIN_BLOCK_REUSING_FILESIZE) {
572        return SBD_NONE;
573    }
574
575    // at least # keeping headers should exist
576    // since the last block reusing
577    if (handle->cur_header_revnum <=
578        sb->min_live_hdr_revnum +
579        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
580                            std::memory_order_relaxed)) {
581        return SBD_NONE;
582    }
583
584    live_datasize = fdb_estimate_space_used(handle->fhandle);
585    if (filesize == 0 || live_datasize == 0 ||
586        live_datasize > filesize) {
587        return SBD_NONE;
588    }
589
590    ratio = (filesize - live_datasize) * 100 / filesize;
591
592    if (ratio > block_reusing_threshold) {
593        if (!sb_bmp_exists(sb)) {
594            // block reusing has not been started yet
595            return SBD_RECLAIM;
596        } else {
597            // stale blocks are already being reused before
598            if (sb->num_free_blocks == 0) {
599                if (sb->rsv_bmp) {
600                    // reserved bitmap exists
601                    return SBD_SWITCH;
602                } else {
603                    // re-reclaim
604                    return SBD_RECLAIM;
605                }
606            } else if ( (sb->num_free_blocks * 100 <
607                         sb->num_init_free_blocks * SB_PRE_RECLAIM_RATIO)) {
608                if (sb->num_init_free_blocks * handle->file->config->blocksize >
609                    SB_MIN_BLOCK_REUSING_FILESIZE)  {
610                    return SBD_RESERVE;
611                }
612            }
613        }
614    }
615
616    return SBD_NONE;
617}
618
619bool sb_reclaim_reusable_blocks(fdb_kvs_handle *handle)
620{
621    size_t i;
622    uint64_t num_blocks, bmp_size_byte;
623    stale_header_info sheader;
624    reusable_block_list blist;
625    struct superblock *sb = handle->file->sb;
626
627    // should flush all dirty blocks in cache
628    filemgr_sync(handle->file, false, &handle->log_callback);
629
630    sheader = fdb_get_smallest_active_header(handle);
631    if (sheader.bid == BLK_NOT_FOUND) {
632        return false;
633    }
634
635    // get reusable block list
636    blist = fdb_get_reusable_block(handle, sheader);
637    if (blist.n_blocks == 0) {
638        return false;
639    }
640
641    // update superblock's bitmap
642    uint8_t *new_bmp = NULL, *old_bmp = NULL;
643    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
644    // 8 bitmaps per byte
645    bmp_size_byte = (num_blocks+7) / 8;
646    fdb_assert(num_blocks >= SB_DEFAULT_NUM_SUPERBLOCKS,
647               num_blocks, SB_DEFAULT_NUM_SUPERBLOCKS);
648    new_bmp = (uint8_t*)calloc(1, bmp_size_byte);
649
650    // free pre-existing bmp index
651    _free_bmp_idx(&sb->bmp_idx);
652
653    for (i=0; i<blist.n_blocks; ++i) {
654        sb_bmp_set(new_bmp, blist.blocks[i].bid, blist.blocks[i].count);
655        if (i==0 &&
656            atomic_get_uint64_t(&sb->cur_alloc_bid) == BLK_NOT_FOUND) {
657            atomic_store_uint64_t(&sb->cur_alloc_bid, blist.blocks[i].bid);
658        }
659        sb->num_free_blocks += blist.blocks[i].count;
660        // add info for supplementary bmp index
661        _add_bmp_idx(&sb->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
662    }
663    free(blist.blocks);
664
665    sb_bmp_change_begin(sb);
666    old_bmp = sb->bmp.load(std::memory_order_relaxed);
667    sb->bmp.store(new_bmp, std::memory_order_relaxed);
668    atomic_store_uint64_t(&sb->bmp_size, num_blocks);
669    sb->min_live_hdr_revnum = sheader.revnum;
670    sb->min_live_hdr_bid = sheader.bid;
671    atomic_incr_uint64_t(&sb->bmp_revnum);
672    sb->num_init_free_blocks = sb->num_free_blocks;
673    sb_bmp_change_end(sb);
674    free(old_bmp);
675
676    return true;
677}
678
679bool sb_reserve_next_reusable_blocks(fdb_kvs_handle *handle)
680{
681    size_t i;
682    uint64_t num_blocks, bmp_size_byte;
683    stale_header_info sheader;
684    reusable_block_list blist;
685    struct superblock *sb = handle->file->sb;
686    struct sb_rsv_bmp *rsv = NULL;
687
688    if (sb->rsv_bmp) {
689        // next bitmap already reclaimed
690        return false;
691    }
692
693    sheader = fdb_get_smallest_active_header(handle);
694    if (sheader.bid == BLK_NOT_FOUND) {
695        return false;
696    }
697
698    // get reusable block list
699    blist = fdb_get_reusable_block(handle, sheader);
700    if (blist.n_blocks == 0) {
701        return false;
702    }
703
704    // calculate bitmap size
705    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
706    bmp_size_byte = (num_blocks+7) / 8;
707    if (num_blocks) {
708        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
709        rsv->bmp = (uint8_t*)calloc(1, bmp_size_byte);
710        rsv->cur_alloc_bid = BLK_NOT_FOUND;
711
712        // the initial status is 'INITIALIZING' so that 'rsv_bmp' is not
713        // available until executing sb_rsv_append_doc().
714        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
715        avl_init(&rsv->bmp_idx, NULL);
716        rsv->bmp_size = num_blocks;
717
718        for (i=0; i<blist.n_blocks; ++i) {
719            sb_bmp_set(rsv->bmp, blist.blocks[i].bid, blist.blocks[i].count);
720            if (i==0 && rsv->cur_alloc_bid == BLK_NOT_FOUND) {
721                rsv->cur_alloc_bid = blist.blocks[i].bid;
722            }
723            rsv->num_free_blocks += blist.blocks[i].count;
724            _add_bmp_idx(&rsv->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
725        }
726        free(blist.blocks);
727
728        rsv->min_live_hdr_revnum = sheader.revnum;
729        rsv->min_live_hdr_bid = sheader.bid;
730        rsv->bmp_revnum = atomic_get_uint64_t(&sb->bmp_revnum)+1;
731        sb->rsv_bmp = rsv;
732    }
733
734    return true;
735}
736
737static void _rsv_free(struct sb_rsv_bmp *rsv);
738void sb_return_reusable_blocks(fdb_kvs_handle *handle)
739{
740    uint64_t node_id;
741    bid_t cur;
742    struct superblock *sb = handle->file->sb;
743    struct sb_rsv_bmp *rsv;
744    struct avl_node *a;
745    struct bmp_idx_node *item, query;
746
747    if (!sb) {
748        // we don't need to do this.
749        return;
750    }
751
752    // re-insert all remaining bitmap into stale list
753    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
754    for (cur = atomic_get_uint64_t(&sb->cur_alloc_bid); cur < sb_bmp_size; ++cur) {
755        if (_is_bmp_set(sb->bmp, cur)) {
756            filemgr_add_stale_block(handle->file, cur, 1);
757        }
758
759        if ((cur % 256) == 0 && cur > 0) {
760            // node ID changes
761            // remove & free current bmp node
762            node_id = cur / 256;
763            query.id = node_id - 1;
764            a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
765            if (a) {
766                item = _get_entry(a, struct bmp_idx_node, avl);
767                avl_remove(&sb->bmp_idx, a);
768                free(item);
769            }
770
771            // move to next bmp node
772            do {
773                a = avl_first(&sb->bmp_idx);
774                if (a) {
775                    item = _get_entry(a, struct bmp_idx_node, avl);
776                    if (item->id <= node_id) {
777                        avl_remove(&sb->bmp_idx, a);
778                        free(item);
779                        continue;
780                    }
781                    cur = item->id * 256;
782                    break;
783                }
784
785                // no more reusable block
786                cur = sb_bmp_size;
787                break;
788            } while (true);
789        }
790    }
791    sb->num_free_blocks = 0;
792    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
793
794    // do the same work for the reserved blocks if exist
795    rsv = sb->rsv_bmp;
796    if (rsv &&
797        atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
798
799        for (cur = rsv->cur_alloc_bid; cur < rsv->bmp_size; ++cur) {
800            if (_is_bmp_set(rsv->bmp, cur)) {
801                filemgr_add_stale_block(handle->file, cur, 1);
802            }
803
804            if ((cur % 256) == 0 && cur > 0) {
805                // node ID changes
806                // remove & free current bmp node
807                node_id = cur / 256;
808                query.id = node_id - 1;
809                a = avl_search(&rsv->bmp_idx, &query.avl, _bmp_idx_cmp);
810                if (a) {
811                    item = _get_entry(a, struct bmp_idx_node, avl);
812                    avl_remove(&rsv->bmp_idx, a);
813                    free(item);
814                }
815
816                // move to next bmp node
817                do {
818                    a = avl_first(&rsv->bmp_idx);
819                    if (a) {
820                        item = _get_entry(a, struct bmp_idx_node, avl);
821                        if (item->id <= node_id) {
822                            avl_remove(&rsv->bmp_idx, a);
823                            free(item);
824                            continue;
825                        }
826                        cur = item->id * 256;
827                        break;
828                    }
829
830                    // no more reusable block
831                    cur = rsv->bmp_size;
832                    break;
833                } while (true);
834            }
835        }
836        rsv->num_free_blocks = 0;
837        rsv->cur_alloc_bid = BLK_NOT_FOUND;
838
839        _free_bmp_idx(&rsv->bmp_idx);
840        _rsv_free(rsv);
841        free(rsv);
842        sb->rsv_bmp = NULL;
843    }
844
845    // re-store into stale tree using next header's revnum
846    filemgr_header_revnum_t revnum = handle->cur_header_revnum;
847    fdb_gather_stale_blocks(handle, revnum+1, BLK_NOT_FOUND, BLK_NOT_FOUND, 0, NULL, false);
848}
849
850void sb_bmp_mask_init()
851{
852    // preset masks to speed up bitmap set/clear operations
853    size_t i, pos, len;
854    for (i=0; i<8; ++i) {
855        bmp_basic_mask[i] = 0x1 << (7-i);
856    }
857    for (pos=0; pos<8; ++pos) {
858        for (len=0; len<9; ++len) {
859            bmp_2d_mask[pos][len] = 0x0;
860            if (len != 0 && pos+len <= 8) {
861                for (i=0; i<len; ++i) {
862                    bmp_2d_mask[pos][len] |= bmp_basic_mask[pos+i];
863                }
864            }
865        }
866    }
867}
868
869INLINE void _sb_bmp_update(uint8_t *bmp, bid_t bid, uint64_t len, int mode)
870{
871    // mode==0: bitmap clear
872    // mode==1: bitmap set
873
874    uint64_t front_pos, front_len, rear_pos, rear_len;
875    uint64_t mid_pos, mid_len;
876
877    //      front_len        rear_len
878    //      <->              <-->
879    // 00000111 | 11111111 | 11110000
880    //      ^     <------> mid
881    //      front_pos
882
883    front_pos = bid;
884    front_len = 8 - mod8(front_pos);
885
886    if (front_len >= len) {
887        front_len = len;
888        rear_pos = rear_len = mid_pos = mid_len = 0;
889    } else {
890        rear_pos = rd8(bid + len);
891        rear_len = mod8(bid + len);
892
893        mid_pos = bid + front_len;
894        mid_len = len - front_len - rear_len;
895    }
896
897    // front bitmaps
898    if (front_len) {
899        if (mode) {
900            bmp[div8(front_pos)] |= bmp_2d_mask[mod8(front_pos)][front_len];
901        } else {
902            bmp[div8(front_pos)] &= ~bmp_2d_mask[mod8(front_pos)][front_len];
903        }
904    }
905    // rear bitmaps
906    if (rear_len) {
907        if (mode) {
908            bmp[div8(rear_pos)] |= bmp_2d_mask[mod8(rear_pos)][rear_len];
909        } else {
910            bmp[div8(rear_pos)] &= ~bmp_2d_mask[mod8(rear_pos)][rear_len];
911        }
912    }
913
914    // mid bitmaps
915    uint8_t mask = (mode)?(0xff):(0x0);
916    if (mid_len == 8) {
917        // 8 bitmaps (1 byte)
918        bmp[div8(mid_pos)] = mask;
919    } else if (mid_len < 64) {
920        // 16 ~ 56 bitmaps (2 ~ 7 bytes)
921        size_t i;
922        for (i=0; i<mid_len; i+=8) {
923            bmp[div8(mid_pos+i)] = mask;
924        }
925    } else {
926        // larger than 64 bitmaps (8 bytes)
927        memset(bmp + div8(mid_pos), mask, div8(mid_len));
928    }
929}
930
931void sb_bmp_set(uint8_t *bmp, bid_t bid, uint64_t len)
932{
933    _sb_bmp_update(bmp, bid, len, 1);
934}
935
936void sb_bmp_clear(uint8_t *bmp, bid_t bid, uint64_t len)
937{
938    _sb_bmp_update(bmp, bid, len, 0);
939}
940
941bool sb_switch_reserved_blocks(struct filemgr *file)
942{
943    size_t i;
944    struct superblock *sb = file->sb;
945    struct sb_rsv_bmp *rsv = sb->rsv_bmp;
946
947    // reserved block should exist
948    if (!rsv) {
949        return false;
950    }
951    // should be in a normal status
952    if (!atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
953        return false;
954    }
955    // now status becomes 'VOID' so that rsv_bmp is not available.
956
957    // mark stale previous system docs
958    if (sb->bmp_doc_offset) {
959        for (i=0; i<sb->num_bmp_docs; ++i) {
960            filemgr_mark_stale(file, sb->bmp_doc_offset[i],
961                _fdb_get_docsize(sb->bmp_docs[i].length));
962        }
963
964        free(sb->bmp_doc_offset);
965        free(sb->bmp_docs);
966        sb->bmp_doc_offset = NULL;
967        sb->bmp_docs = NULL;
968    }
969
970    // should flush all dirty blocks in cache
971    filemgr_sync(file, false, NULL);
972
973    // free current bitmap idx
974    _free_bmp_idx(&sb->bmp_idx);
975
976    // temporarily keep current bitmap
977    sb_bmp_change_begin(sb);
978    uint8_t *old_prev_bmp = NULL;
979    if (sb->bmp_prev) {
980        old_prev_bmp = sb->bmp_prev;
981    }
982    sb->bmp_prev = sb->bmp;
983    sb->bmp_prev_size = atomic_get_uint64_t(&sb->bmp_size);
984
985    // copy all pointers from rsv to sb
986    atomic_store(&sb->bmp_revnum, rsv->bmp_revnum);
987    atomic_store_uint64_t(&sb->bmp_size, rsv->bmp_size);
988    sb->bmp = rsv->bmp;
989    sb->bmp_idx = rsv->bmp_idx;
990    sb->bmp_doc_offset = rsv->bmp_doc_offset;
991    sb->bmp_docs = rsv->bmp_docs;
992    sb->num_bmp_docs = rsv->num_bmp_docs;
993    sb->num_free_blocks = sb->num_init_free_blocks = rsv->num_free_blocks;
994    atomic_store_uint64_t(&sb->cur_alloc_bid, rsv->cur_alloc_bid);
995    sb->min_live_hdr_revnum = rsv->min_live_hdr_revnum;
996    sb->min_live_hdr_bid = rsv->min_live_hdr_bid;
997    sb_bmp_change_end(sb);
998
999    free(old_prev_bmp);
1000    free(sb->rsv_bmp);
1001    sb->rsv_bmp = NULL;
1002
1003    return true;
1004}
1005
1006bid_t sb_alloc_block(struct filemgr *file)
1007{
1008    uint64_t i, node_idx, node_off, bmp_idx, bmp_off;
1009    bid_t ret = BLK_NOT_FOUND;
1010    struct superblock *sb = file->sb;
1011    struct avl_node *a;
1012    struct bmp_idx_node *item, query;
1013
1014    sb->num_alloc++;
1015sb_alloc_start_over:
1016    if (!sb_bmp_exists(sb)) {
1017        // no bitmap
1018        return BLK_NOT_FOUND;
1019    }
1020
1021    if (sb->num_free_blocks == 0) {
1022        bool switched = false;
1023        if (sb->rsv_bmp) {
1024            switched = sb_switch_reserved_blocks(file);
1025        }
1026        if (switched) {
1027            goto sb_alloc_start_over;
1028        } else {
1029            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1030            return BLK_NOT_FOUND;
1031        }
1032    }
1033
1034    ret = atomic_get_uint64_t(&sb->cur_alloc_bid);
1035    sb->num_free_blocks--;
1036
1037    if (sb->num_free_blocks == 0) {
1038        bool switched = false;
1039        if (sb->rsv_bmp) {
1040            switched = sb_switch_reserved_blocks(file);
1041        }
1042        if (!switched) {
1043            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1044        }
1045        return ret;
1046    }
1047
1048    // find allocable block in the same bmp idx node
1049    node_idx = ret >> 8;
1050    node_off = (ret & 0xff)+1;
1051    do {
1052        for (i=node_off; i<256; ++i) {
1053            bmp_idx = div8(i) + (node_idx * 32);
1054            bmp_off = mod8(i);
1055
1056            if (bmp_idx*8 + bmp_off >= atomic_get_uint64_t(&sb->bmp_size)) {
1057                break;
1058            }
1059
1060            if (sb->bmp[bmp_idx] & bmp_basic_mask[bmp_off]) {
1061                atomic_store_uint64_t(&sb->cur_alloc_bid, bmp_idx*8 + bmp_off);
1062                return ret;
1063            }
1064        }
1065
1066        // current bmp_node does not include any free block .. remove
1067        query.id = node_idx;
1068        a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
1069        if (a) {
1070            item = _get_entry(a, struct bmp_idx_node, avl);
1071            avl_remove(&sb->bmp_idx, a);
1072            free(item);
1073        }
1074
1075        // get next allocable bmp_node
1076        a = avl_first(&sb->bmp_idx);
1077        if (!a) {
1078            // no more free bmp_node
1079            sb->num_free_blocks = 0;
1080            bool switched = false;
1081            if (sb->rsv_bmp) {
1082                switched = sb_switch_reserved_blocks(file);
1083            }
1084            if (!switched) {
1085                atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1086            }
1087            break;
1088        }
1089        item = _get_entry(a, struct bmp_idx_node, avl);
1090        node_idx = item->id;
1091        node_off = 0;
1092    } while (true);
1093
1094    return ret;
1095}
1096
1097bool sb_bmp_is_writable(struct filemgr *file, bid_t bid)
1098{
1099    if (bid < file->sb->config->num_sb) {
1100        // superblocks are always writable
1101        return true;
1102    }
1103
1104    bool ret = false;
1105    bid_t last_commit = atomic_get_uint64_t(&file->last_commit) / file->blocksize;
1106    uint64_t lw_bmp_revnum = atomic_get_uint64_t(&file->last_writable_bmp_revnum);
1107    struct superblock *sb = file->sb;
1108
1109    sb_bmp_barrier_on(sb);
1110
1111    uint8_t *sb_bmp = sb->bmp;
1112    if (atomic_get_uint64_t(&sb->bmp_revnum) == lw_bmp_revnum) {
1113        // Same bitmap revision number: there are 2 possible cases
1114        //
1115        // (1) normal case
1116        //               writable blocks
1117        //            <---->          <--->
1118        // +-------------------------------------------+
1119        // |       xxxxxxxxx          xxxxxxxxxx       | (x: reused blocks)
1120        // +-------------------------------------------+
1121        //            ^                   ^
1122        //            last_commit         cur_alloc
1123        //
1124        // (2) when file size grows after block reusing
1125        //                                   writable blocks
1126        //                             <------->       <--------->
1127        // +-------------------------------------------+---------+
1128        // |       xxxxxxxxx          xxxxxxxxxx       |         |
1129        // +-------------------------------------------+---------+
1130        //                             ^               ^         ^
1131        //                             last_commit     bmp_size  cur_alloc
1132
1133        if (bid < atomic_get_uint64_t(&sb->bmp_size)) {
1134            // BID is in the bitmap .. check if bitmap is set.
1135            if (_is_bmp_set(sb_bmp, bid) &&
1136                bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1137                bid >= last_commit) {
1138                ret = true;
1139            }
1140        } else {
1141            // BID is out-of-range of the bitmap
1142            if (bid >= last_commit) {
1143                ret = true;
1144            }
1145        }
1146    } else {
1147        // Different bitmap revision number: there are also 2 possible cases
1148        //
1149        // (1) normal case
1150        //     writable blocks                 writable blocks
1151        //         <---->                          <--->
1152        // +-------------------------------------------+
1153        // |       xxxxxxxxx          xxxxxxxxxx       |
1154        // +-------------------------------------------+
1155        //              ^                          ^
1156        //              cur_alloc                  last_commit
1157        //
1158        // (2) when file size grows after block reusing
1159        //         writable blocks             writable blocks
1160        //      <---->       <------>      <-----------x--------->
1161        // +-------------------------------------------+---------+
1162        // |    xxxxxx       xxxxxxxx                  |         |
1163        // +-------------------------------------------+---------+
1164        //                                 ^           ^         ^
1165        //                                 last_commit bmp_size  cur_alloc
1166
1167        // the block is writable if
1168        // 1) BID >= last_commit OR
1169        // 2) BID < cur_alloc_bid AND corresponding bitmap is set.
1170        if (bid >= last_commit) {
1171            // if prev_bmp exists, last commit position is still located on the
1172            // previous bitmap (since prev_bmp is released when a commit is invoked
1173            // in the new bitmap).
1174            // So in this case, we have to check both previous/current bitmaps.
1175            //
1176            // (1: previous bitmap (sb->bmp_prev), 2: current bitmap (sb->bmp))
1177            //
1178            //        writable blocks          writable blocks
1179            //    <-->      <---->   <-->          <> <> <>
1180            // +-------------------------------------------+
1181            // |  2222 1111 222222   2222   111111111 22 11|
1182            // +-------------------------------------------+
1183            //                                     ^
1184            //                                     last_commit
1185            if (sb->bmp_prev) {
1186                if (bid < sb->bmp_prev_size &&
1187                    _is_bmp_set(sb->bmp_prev, bid)) {
1188                    ret = true;
1189                }
1190                if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1191                    _is_bmp_set(sb_bmp, bid)) {
1192                    ret = true;
1193                }
1194                if (bid >= atomic_get_uint64_t(&sb->bmp_size)) {
1195                    // blocks newly allocated beyond the bitmap size:
1196                    // always writable
1197                    ret = true;
1198                }
1199            } else {
1200                // bmp_prev doesn't exist even though bmp_revnum is different
1201                // this happens on the first block reclaim only
1202                // so all blocks whose 'BID >= last_commit' are writable.
1203                ret = true;
1204            }
1205        }
1206
1207        if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1208            bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1209            _is_bmp_set(sb_bmp, bid)) {
1210            // 'cur_alloc_bid' is always smaller than 'bmp_size'
1211            // except for the case 'cur_alloc_bid == BLK_NOT_FOUND'
1212            ret = true;
1213        }
1214    }
1215
1216    sb_bmp_barrier_off(sb);
1217
1218    return ret;
1219}
1220
1221fdb_status sb_write(struct filemgr *file, size_t sb_no,
1222                    err_log_callback * log_callback)
1223{
1224    int r;
1225    int real_blocksize = file->blocksize;
1226    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1227    uint8_t *buf = alca(uint8_t, real_blocksize);
1228    uint32_t crc, _crc;
1229    uint64_t enc_u64;
1230    uint64_t num_docs;
1231    size_t i, offset;
1232    fdb_status fs;
1233
1234    memset(buf, 0x0, real_blocksize);
1235
1236    offset = 0;
1237    // magic number
1238    enc_u64 = _endian_encode(file->version);
1239    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1240    offset += sizeof(enc_u64);
1241
1242    // revision number
1243    uint64_t sb_revnum = atomic_get_uint64_t(&file->sb->revnum);
1244    enc_u64 = _endian_encode(sb_revnum);
1245    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1246    offset += sizeof(enc_u64);
1247
1248    // bitmap's revision number
1249    enc_u64 = _endian_encode(atomic_get_uint64_t(&file->sb->bmp_revnum));
1250    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1251    offset += sizeof(enc_u64);
1252
1253    // cur_alloc_bid
1254    bid_t sb_cur_alloc_bid = atomic_get_uint64_t(&file->sb->cur_alloc_bid);
1255    enc_u64 = _endian_encode(sb_cur_alloc_bid);
1256    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1257    offset += sizeof(enc_u64);
1258
1259    // last header bid
1260    bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
1261    enc_u64 = _endian_encode(sb_last_hdr_bid);
1262    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1263    offset += sizeof(enc_u64);
1264
1265    // last header rev number
1266    uint64_t sb_last_hdr_revnum = atomic_get_uint64_t(&file->sb->last_hdr_revnum);
1267    enc_u64 = _endian_encode(sb_last_hdr_revnum);
1268    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1269    offset += sizeof(enc_u64);
1270
1271    // minimum active header revnum
1272    enc_u64 = _endian_encode(file->sb->min_live_hdr_revnum);
1273    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1274    offset += sizeof(enc_u64);
1275
1276    // minimum active header BID
1277    enc_u64 = _endian_encode(file->sb->min_live_hdr_bid);
1278    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1279    offset += sizeof(enc_u64);
1280
1281    // # initial free blocks
1282    enc_u64 = _endian_encode(file->sb->num_init_free_blocks);
1283    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1284    offset += sizeof(enc_u64);
1285
1286    // # free blocks
1287    enc_u64 = _endian_encode(file->sb->num_free_blocks);
1288    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1289    offset += sizeof(enc_u64);
1290
1291    // bitmap size
1292    uint64_t sb_bmp_size = atomic_get_uint64_t(&file->sb->bmp_size);
1293    enc_u64 = _endian_encode(sb_bmp_size);
1294    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1295    offset += sizeof(enc_u64);
1296
1297    bool rsv_bmp_enabled = false;
1298
1299    if ( file->sb->rsv_bmp &&
1300         atomic_cas_uint32_t(&file->sb->rsv_bmp->status,
1301                             SB_RSV_READY, SB_RSV_WRITING) ) {
1302        rsv_bmp_enabled = true;
1303        // status becomes 'WRITING' so that switching will be postponed.
1304        // note that 'rsv_bmp' is not currently used yet so that
1305        // it won't block any other tasks except for switching.
1306    }
1307
1308    // reserved bitmap size (0 if not exist)
1309    if (rsv_bmp_enabled) {
1310        enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_size);
1311    } else {
1312        enc_u64 = 0;
1313    }
1314    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1315    offset += sizeof(enc_u64);
1316
1317    // bitmap doc offsets
1318    num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1319    for (i=0; i<num_docs; ++i) {
1320        enc_u64 = _endian_encode(file->sb->bmp_doc_offset[i]);
1321        memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1322        offset += sizeof(enc_u64);
1323    }
1324
1325    // reserved bitmap doc offsets
1326    if (rsv_bmp_enabled) {
1327        num_docs = _bmp_size_to_num_docs(file->sb->rsv_bmp->bmp_size);
1328        for (i=0; i<num_docs; ++i) {
1329            enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_doc_offset[i]);
1330            memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1331            offset += sizeof(enc_u64);
1332        }
1333
1334        atomic_store_uint32_t(&file->sb->rsv_bmp->status, SB_RSV_READY);
1335    }
1336
1337    // CRC
1338    crc = get_checksum(buf, offset, file->crc_mode);
1339    _crc = _endian_encode(crc);
1340    memcpy(buf + offset, &_crc, sizeof(_crc));
1341
1342    // set block marker
1343    memset(buf + blocksize, BLK_MARKER_SB, BLK_MARKER_SIZE);
1344
1345    // directly write a block bypassing block cache
1346    r = file->ops->pwrite(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1347    if (r != real_blocksize) {
1348        char errno_msg[512];
1349        file->ops->get_errno_str(errno_msg, 512);
1350        fs = FDB_RESULT_SB_RACE_CONDITION;
1351        fdb_log(log_callback, fs,
1352                "Failed to write the superblock (number: %" _F64 "), %s",
1353                sb_no, errno_msg);
1354        return fs;
1355    }
1356
1357    // increase superblock's revision number
1358    atomic_incr_uint64_t(&file->sb->revnum);
1359
1360    return FDB_RESULT_SUCCESS;
1361}
1362
1363static void _rsv_free(struct sb_rsv_bmp *rsv)
1364{
1365    free(rsv->bmp);
1366    free(rsv->bmp_doc_offset);
1367    free(rsv->bmp_docs);
1368}
1369
1370static fdb_status _sb_read_given_no(struct filemgr *file,
1371                                    size_t sb_no,
1372                                    struct superblock *sb,
1373                                    err_log_callback *log_callback)
1374{
1375    int r;
1376    int real_blocksize = file->blocksize;
1377    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1378    size_t i, num_docs;
1379    uint8_t *buf = alca(uint8_t, real_blocksize);
1380    uint32_t crc_file, crc, _crc;
1381    uint64_t enc_u64, version, offset, dummy64;
1382    fdb_status fs;
1383    struct sb_rsv_bmp *rsv = NULL;
1384
1385    memset(buf, 0x0, real_blocksize);
1386    offset = 0;
1387
1388    // directly read a block bypassing block cache
1389    r = file->ops->pread(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1390    if (r != real_blocksize) {
1391        char errno_msg[512];
1392        file->ops->get_errno_str(errno_msg, 512);
1393        fs = FDB_RESULT_SB_READ_FAIL;
1394        fdb_log(log_callback, fs,
1395                "Failed to read the superblock: file read failure (SB No.: %" _F64 "), %s",
1396                sb_no, errno_msg);
1397        return fs;
1398    }
1399
1400    // block marker check
1401    if (buf[blocksize] != BLK_MARKER_SB) {
1402        fs = FDB_RESULT_SB_READ_FAIL;
1403        fdb_log(log_callback, fs,
1404                "Failed to read the superblock: "
1405                "incorrect block marker (marker: %x, SB No.: %" _F64 "). "
1406                "Note: this message might be a false alarm if upgrade is running.",
1407                buf[blocksize], sb_no);
1408        return fs;
1409    }
1410
1411    // magic number
1412    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1413    offset += sizeof(enc_u64);
1414    version = _endian_decode(enc_u64);
1415
1416    // version check
1417    if (!ver_superblock_support(version)) {
1418        fs = FDB_RESULT_SB_READ_FAIL;
1419        fdb_log(log_callback, fs,
1420                "Failed to read the superblock: "
1421                "not supported version (magic: %" _F64 ", SB No.: %" _F64 ")",
1422                version, sb_no);
1423        return fs;
1424    }
1425
1426    // revision number
1427    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1428    offset += sizeof(enc_u64);
1429    atomic_store_uint64_t(&sb->revnum, _endian_decode(enc_u64));
1430
1431    // bitmap's revision number
1432    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1433    offset += sizeof(enc_u64);
1434    atomic_store_uint64_t(&sb->bmp_revnum, _endian_decode(enc_u64));
1435
1436    // cur_alloc_bid
1437    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1438    offset += sizeof(enc_u64);
1439    atomic_store_uint64_t(&sb->cur_alloc_bid, _endian_decode(enc_u64));
1440
1441    // last header bid
1442    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1443    offset += sizeof(enc_u64);
1444    atomic_store_uint64_t(&sb->last_hdr_bid, _endian_decode(enc_u64));
1445
1446    // last header rev number
1447    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1448    offset += sizeof(enc_u64);
1449    atomic_store_uint64_t(&sb->last_hdr_revnum, _endian_decode(enc_u64));
1450
1451    // minimum active header revnum
1452    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1453    offset += sizeof(enc_u64);
1454    sb->min_live_hdr_revnum = _endian_decode(enc_u64);
1455
1456    // minimum active header BID
1457    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1458    offset += sizeof(enc_u64);
1459    sb->min_live_hdr_bid = _endian_decode(enc_u64);
1460
1461    // # initial free blocks
1462    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1463    offset += sizeof(enc_u64);
1464    sb->num_init_free_blocks = _endian_decode(enc_u64);
1465
1466    // # free blocks
1467    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1468    offset += sizeof(enc_u64);
1469    sb->num_free_blocks = _endian_decode(enc_u64);
1470
1471    // bitmap size
1472    uint64_t sb_bmp_size;
1473    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1474    offset += sizeof(enc_u64);
1475    sb_bmp_size = _endian_decode(enc_u64);
1476    atomic_store_uint64_t(&sb->bmp_size, sb_bmp_size);
1477
1478    // reserved bitmap size
1479    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1480    offset += sizeof(enc_u64);
1481    dummy64 = _endian_decode(enc_u64);
1482    if (dummy64) {
1483        // reserved bitmap array exists
1484        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
1485        rsv->bmp = NULL;
1486        rsv->bmp_size = dummy64;
1487        rsv->cur_alloc_bid = BLK_NOT_FOUND;
1488        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
1489    }
1490
1491    // temporarily set bitmap array to NULL
1492    // (it will be allocated by fetching function)
1493    sb->bmp = NULL;
1494
1495    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1496    if (num_docs) {
1497        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1498        sb->bmp_docs = (struct docio_object*)
1499                       calloc(num_docs, sizeof(struct docio_object));
1500    }
1501
1502    // read doc offsets
1503    for (i=0; i<num_docs; ++i) {
1504        memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1505        offset += sizeof(enc_u64);
1506        sb->bmp_doc_offset[i] = _endian_decode(enc_u64);
1507    }
1508
1509    // read reserved bmp docs if exist
1510    if (rsv) {
1511        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
1512        if (rsv->num_bmp_docs) {
1513            rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1514            rsv->bmp_docs = (struct docio_object*)
1515                            calloc(num_docs, sizeof(struct docio_object));
1516        }
1517
1518        // read doc offsets
1519        for (i=0; i<num_docs; ++i) {
1520            memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1521            offset += sizeof(enc_u64);
1522            rsv->bmp_doc_offset[i] = _endian_decode(enc_u64);
1523        }
1524        sb->rsv_bmp = rsv;
1525    }
1526
1527    // CRC
1528    crc = get_checksum(buf, offset, file->crc_mode);
1529    memcpy(&_crc, buf + offset, sizeof(_crc));
1530    crc_file = _endian_decode(_crc);
1531    if (crc != crc_file) {
1532        free(sb->bmp_doc_offset);
1533        free(sb->bmp_docs);
1534        sb->bmp_doc_offset = NULL;
1535        sb->bmp_docs = NULL;
1536
1537        fs = FDB_RESULT_SB_READ_FAIL;
1538        fdb_log(log_callback, fs,
1539                "Failed to read the superblock: "
1540                "not supported version (magic: %" _F64 ", SB No.: %" _F64 ")",
1541                version, sb_no);
1542        return fs;
1543    }
1544
1545    return FDB_RESULT_SUCCESS;
1546}
1547
1548static void _sb_free(struct superblock *sb)
1549{
1550    if (sb->rsv_bmp) {
1551        atomic_store_uint32_t(&sb->rsv_bmp->status, SB_RSV_VOID);
1552        _free_bmp_idx(&sb->rsv_bmp->bmp_idx);
1553        _rsv_free(sb->rsv_bmp);
1554        free(sb->rsv_bmp);
1555    }
1556    _free_bmp_idx(&sb->bmp_idx);
1557
1558    free(sb->bmp);
1559    free(sb->bmp_prev);
1560    free(sb->bmp_doc_offset);
1561    // note that each docio object doesn't need to be freed
1562    // as key/body fields point to static memory regions.
1563    free(sb->bmp_docs);
1564    free(sb->config);
1565    spin_destroy(&sb->lock);
1566
1567    sb->bmp = NULL;
1568    sb->bmp_doc_offset = NULL;
1569    sb->bmp_docs = NULL;
1570    sb->config = NULL;
1571}
1572
1573void _sb_init(struct superblock *sb, struct sb_config sconfig)
1574{
1575    *sb->config = sconfig;
1576    atomic_init_uint64_t(&sb->revnum, 0);
1577    atomic_init_uint64_t(&sb->bmp_revnum, 0);
1578    atomic_init_uint64_t(&sb->bmp_size, 0);
1579    sb->bmp = NULL;
1580    atomic_init_uint64_t(&sb->bmp_rcount, 0);
1581    atomic_init_uint64_t(&sb->bmp_wcount, 0);
1582    spin_init(&sb->bmp_lock);
1583    sb->bmp_prev_size = 0;
1584    sb->bmp_prev = NULL;
1585    sb->bmp_doc_offset = NULL;
1586    sb->bmp_docs = NULL;
1587    sb->num_bmp_docs = 0;
1588    sb->num_init_free_blocks = 0;
1589    sb->num_free_blocks = 0;
1590    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1591    atomic_init_uint64_t(&sb->last_hdr_bid, BLK_NOT_FOUND);
1592    sb->min_live_hdr_revnum = 0;
1593    sb->min_live_hdr_bid = BLK_NOT_FOUND;
1594    atomic_init_uint64_t(&sb->last_hdr_revnum, 0);
1595    sb->num_alloc = 0;
1596    sb->rsv_bmp = NULL;
1597    avl_init(&sb->bmp_idx, NULL);
1598    spin_init(&sb->lock);
1599}
1600
1601INLINE void _sb_copy(struct superblock *dst, struct superblock *src)
1602{
1603    // since variables in 'src' won't be freed, just copy its pointers
1604    dst->config = src->config;
1605    atomic_store_uint64_t(&dst->revnum, atomic_get_uint64_t(&src->revnum));
1606    atomic_store_uint64_t(&dst->bmp_revnum, atomic_get_uint64_t(&src->bmp_revnum));
1607    atomic_store_uint64_t(&dst->bmp_size, atomic_get_uint64_t(&src->bmp_size));
1608    dst->bmp.store(src->bmp.load(std::memory_order_relaxed), std::memory_order_relaxed);
1609    atomic_store_uint64_t(&dst->bmp_rcount, atomic_get_uint64_t(&src->bmp_rcount));
1610    atomic_store_uint64_t(&dst->bmp_wcount, atomic_get_uint64_t(&src->bmp_wcount));
1611    spin_init(&dst->bmp_lock);
1612    dst->bmp_prev_size = src->bmp_prev_size;
1613    dst->bmp_prev = src->bmp_prev;
1614    dst->bmp_idx = src->bmp_idx;
1615    dst->bmp_doc_offset = src->bmp_doc_offset;
1616    dst->bmp_docs = src->bmp_docs;
1617    dst->num_bmp_docs = src->num_bmp_docs;
1618    dst->num_init_free_blocks = src->num_init_free_blocks;
1619    dst->num_free_blocks = src->num_free_blocks;
1620    dst->rsv_bmp = src->rsv_bmp;
1621    atomic_store_uint64_t(&dst->cur_alloc_bid, atomic_get_uint64_t(&src->cur_alloc_bid));
1622    atomic_store_uint64_t(&dst->last_hdr_bid, atomic_get_uint64_t(&src->last_hdr_bid));
1623    dst->min_live_hdr_revnum = src->min_live_hdr_revnum;
1624    dst->min_live_hdr_bid = src->min_live_hdr_bid;
1625    atomic_store_uint64_t(&dst->last_hdr_revnum, atomic_get_uint64_t(&src->last_hdr_revnum));
1626    dst->num_alloc = src->num_alloc;
1627    spin_init(&dst->lock);
1628}
1629
1630fdb_status sb_read_latest(struct filemgr *file,
1631                          struct sb_config sconfig,
1632                          err_log_callback *log_callback)
1633{
1634    size_t i, max_sb_no = sconfig.num_sb;
1635    uint64_t max_revnum = 0;
1636    uint64_t revnum_limit = (uint64_t)0xffffffffffffffff;
1637    fdb_status fs;
1638    struct superblock *sb_arr;
1639
1640    if (file->sb) {
1641        // Superblock is already read previously.
1642        // This means that there are some problems with the current superblock
1643        // so that we have to read another candidate.
1644
1645        // Note: 'sb->revnum' denotes the revnum of next superblock to be
1646        // written, so we need to subtract 1 from it to get the revnum of
1647        // the current superblock successfully read from the file.
1648        revnum_limit = atomic_get_uint64_t(&file->sb->revnum) - 1;
1649        sb_free(file);
1650    }
1651
1652    sb_arr = alca(struct superblock,
1653                  sconfig.num_sb * sizeof(struct superblock));
1654    memset(sb_arr, 0x0, sconfig.num_sb * sizeof(struct superblock));
1655
1656    // read all superblocks
1657    for (i=0; i<sconfig.num_sb; ++i) {
1658        sb_arr[i].config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1659        _sb_init(&sb_arr[i], sconfig);
1660        fs = _sb_read_given_no(file, i, &sb_arr[i], log_callback);
1661
1662        uint64_t cur_revnum = atomic_get_uint64_t(&sb_arr[i].revnum);
1663        if (fs == FDB_RESULT_SUCCESS &&
1664            cur_revnum >= max_revnum &&
1665            cur_revnum < revnum_limit) {
1666            max_sb_no = i;
1667            max_revnum = cur_revnum;
1668        }
1669    }
1670
1671    if (max_sb_no == sconfig.num_sb) {
1672        // all superblocks are broken
1673        fs = FDB_RESULT_SB_READ_FAIL;
1674        for (i=0; i<sconfig.num_sb; ++i) {
1675            _sb_free(&sb_arr[i]);
1676        }
1677        // no readable superblock
1678        // (if not corrupted, it may be a normal old version file)
1679        return fs;
1680    }
1681
1682    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1683    _sb_copy(file->sb, &sb_arr[max_sb_no]);
1684
1685    // set last commit position
1686    if (atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND) {
1687        atomic_store_uint64_t(&file->last_commit,
1688                              atomic_get_uint64_t(&file->sb->cur_alloc_bid) *
1689                              file->config->blocksize);
1690    } else {
1691        // otherwise, last_commit == file->pos
1692        // (already set by filemgr_open() function)
1693    }
1694
1695    atomic_incr_uint64_t(&file->sb->revnum);
1696    avl_init(&file->sb->bmp_idx, NULL);
1697
1698    // free the other superblocks
1699    for (i=0; i<sconfig.num_sb; ++i) {
1700        if (i != max_sb_no) {
1701            _sb_free(&sb_arr[i]);
1702        }
1703    }
1704
1705    return FDB_RESULT_SUCCESS;
1706}
1707
1708uint64_t sb_get_bmp_revnum(struct filemgr *file)
1709{
1710    if (file->sb) {
1711        return atomic_get_uint64_t(&file->sb->bmp_revnum);
1712    } else {
1713        return 0;
1714    }
1715}
1716
1717filemgr_header_revnum_t sb_get_min_live_revnum(struct filemgr *file)
1718{
1719    if (file->sb) {
1720        return file->sb->min_live_hdr_revnum;
1721    } else {
1722        return 0;
1723    }
1724}
1725
1726uint64_t sb_get_num_free_blocks(struct filemgr *file)
1727{
1728    if (file->sb) {
1729        return file->sb->num_free_blocks;
1730    } else {
1731        return 0;
1732    }
1733}
1734
1735struct sb_config sb_get_default_config()
1736{
1737    struct sb_config ret;
1738    ret.num_sb = SB_DEFAULT_NUM_SUPERBLOCKS;
1739    return ret;
1740}
1741
1742fdb_status sb_init(struct filemgr *file, struct sb_config sconfig,
1743                   err_log_callback * log_callback)
1744{
1745    size_t i;
1746    bid_t sb_bid;
1747    fdb_status fs;
1748
1749    // exit if superblock already exists.
1750    if (file->sb) {
1751        return FDB_RESULT_SUCCESS;
1752    }
1753    // no data should be written in the file before initialization of superblock.
1754    if (filemgr_get_pos(file) > 0) {
1755        return FDB_RESULT_SB_INIT_FAIL;
1756    }
1757
1758    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1759    file->sb->config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1760    file->version = ver_get_latest_magic();
1761    _sb_init(file->sb, sconfig);
1762
1763    // write initial superblocks
1764    for (i=0; i<file->sb->config->num_sb; ++i) {
1765        // allocate
1766        sb_bid = filemgr_alloc(file, log_callback);
1767        if (sb_bid != i) {
1768            // other data was written during sb_write .. error
1769            fs = FDB_RESULT_SB_RACE_CONDITION;
1770            fdb_log(log_callback, fs,
1771                    "Other writer interfered during sb_write (number: %" _F64 ")",
1772                    i);
1773            free(file->sb->config);
1774            free(file->sb);
1775            return fs;
1776        }
1777
1778        fs = sb_write(file, i, log_callback);
1779        if (fs != FDB_RESULT_SUCCESS) {
1780            return fs;
1781        }
1782    }
1783
1784    return FDB_RESULT_SUCCESS;
1785}
1786
1787fdb_status sb_free(struct filemgr *file)
1788{
1789    if (file->sb) {
1790        _sb_free(file->sb);
1791        free(file->sb);
1792        file->sb = NULL;
1793    }
1794
1795    return FDB_RESULT_SUCCESS;
1796}
1797
1798
1799