xref: /6.0.3/forestdb/src/superblock.cc (revision 5b78091c)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <string.h>
20#include <stdint.h>
21#include <stdlib.h>
22
23#include "libforestdb/forestdb.h"
24#include "superblock.h"
25#include "staleblock.h"
26#include "btreeblock.h"
27#include "list.h"
28#include "fdb_internal.h"
29#include "version.h"
30#include "blockcache.h"
31
32#include "memleak.h"
33
34/*
35 * << super block structure >>
36 *
37 * 0x00 [file version (magic number)]:           8 bytes
38 * 0x08 [super block revision number]:           8 bytes
39 * 0x10 [bitmap revision number]:                8 bytes
40 * 0x18 [BID for next allocation]:               8 bytes
41 * 0x20 [last header BID]:                       8 bytes
42 * 0x28 [last header revnum]:                    8 bytes
43 * 0x30 [min active header revnum]:              8 bytes
44 * 0x38 [min active header BID]:                 8 bytes
45 * 0x40 [# initial free blocks in the bitmap]:   8 bytes
46 * 0x48 [# free blocks in the bitmap]:           8 bytes
47 * 0x50 [bitmap size]:                           8 bytes
48 * 0x58 [reserved bitmap size (0 if not exist)]: 8 bytes
49 * 0x60 ... [bitmap doc offset]:                 8 bytes each
50 *      [CRC32]:                                 4 bytes
51 * ...
52 *      [block marker]:                          1 byte
53 */
54
55
/*
 * << basic mask >>
 * bmp_basic_mask[i] selects the single bit at position 'i' of a byte,
 * where position 0 is the most significant bit.
 * The table is filled at run time by sb_bmp_mask_init().
 *
 * 0: 10000000
 * 1: 01000000
 *    ...
 * 7: 00000001
 */
static uint8_t bmp_basic_mask[8];
/*
 * << 2d mask >>
 * bmp_2d_mask[pos][len]
 * selects 'len' consecutive bits of a byte starting at bit position 'pos'
 * (position 0 is the most significant bit). Entries with len == 0 or with
 * pos+len > 8 are left as zero by sb_bmp_mask_init().
 *
 * 0,1: 10000000
 * 0,2: 11000000
 * 0,3: 11100000
 *    ...
 * 1,1: 01000000
 * 1,2: 01100000
 *    ...
 * 3,1: 00010000
 * 3,2: 00011000
 *    ...
 * 6,1: 00000010
 * 6,2: 00000011
 * 7,1: 00000001
 */
static uint8_t bmp_2d_mask[8][9];
83
84
// x % 8 (bit position inside the byte that holds bitmap entry x)
#define mod8(x) ((x) & 0x7)
// x / 8 (index of the byte that holds bitmap entry x)
#define div8(x) ((x) >> 3)
// rounding down (to the nearest multiple of 8)
#define rd8(x) (((x) >> 3) << 3)
91
// One entry of the supplemental bitmap index. An entry with a given 'id'
// stands for the 256 consecutive blocks [id*256, id*256+255]
// (see _add_bmp_idx(), which derives the id as 'bid >> 8').
struct bmp_idx_node {
    // index node number (block ID divided by 256)
    uint64_t id;
    // hook that links this entry into the AVL tree
    struct avl_node avl;
};
96
97INLINE int _bmp_idx_cmp(struct avl_node *a, struct avl_node *b, void *aux)
98{
99    struct bmp_idx_node *aa, *bb;
100    aa = _get_entry(a, struct bmp_idx_node, avl);
101    bb = _get_entry(b, struct bmp_idx_node, avl);
102
103#ifdef __BIT_CMP
104    return _CMP_U64(aa->id, bb->id);
105#else
106    if (aa->id < bb->id) {
107        return -1;
108    } else if (aa->id > bb->id) {
109        return 1;
110    } else {
111        return 0;
112    }
113#endif
114}
115
116static void _add_bmp_idx(struct avl_tree *bmp_idx, bid_t bid, bid_t count)
117{
118    bid_t cur, start_id, stop_id;
119    struct avl_node *a;
120    struct bmp_idx_node *item, query;
121
122    // 256 blocks per node
123    start_id = bid >> 8;
124    stop_id = (bid+count-1) >> 8;
125
126    for (cur=start_id; cur<=stop_id; ++cur) {
127        query.id = cur;
128        a = avl_search(bmp_idx, &query.avl, _bmp_idx_cmp);
129        if (a) {
130            // already exists .. do nothing
131        } else {
132            // create a node
133            item = (struct bmp_idx_node*)calloc(1, sizeof(struct bmp_idx_node));
134            item->id = query.id;
135            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
136        }
137    }
138}
139
140static void _free_bmp_idx(struct avl_tree *bmp_idx)
141{
142    // free all supplemental bmp idx nodes
143    struct avl_node *a;
144    struct bmp_idx_node *item;
145    a = avl_first(bmp_idx);
146    while (a) {
147        item = _get_entry(a, struct bmp_idx_node, avl);
148        a = avl_next(a);
149        avl_remove(bmp_idx, &item->avl);
150        free(item);
151    }
152}
153
154static void _construct_bmp_idx(struct avl_tree *bmp_idx,
155                               uint8_t *bmp,
156                               uint64_t bmp_size,
157                               bid_t start_bid)
158{
159    uint64_t i, node_idx;
160    uint64_t *bmp64 = (uint64_t*)bmp;
161    struct bmp_idx_node *item;
162
163    if (start_bid == BLK_NOT_FOUND) {
164        start_bid = 0;
165    }
166
167    // Since a single byte includes 8 bitmaps, an 8-byte integer contains 64 bitmaps.
168    // By converting bitmap array to uint64_t array, we can quickly verify if a
169    // 64-bitmap-group has at least one non-zero bit or not.
170    for (i=start_bid/64; i<bmp_size/64; ++i) {
171        // in this loop, 'i' denotes bitmap group number.
172        node_idx = i/4;
173        if (bmp64[i]) {
174            item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
175            item->id = node_idx;
176            avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
177            // skip other bitmaps in the same bitmap index node
178            // (1 bitmap index node == 4 bitmap groups == 256 bitmaps)
179            i = (node_idx+1)*4;
180        }
181    }
182
183    // If there are remaining bitmaps, check if they are non-zero or not one by one.
184    if (bmp_size % 64) {
185        uint8_t idx, off;
186        uint64_t start;
187
188        start = (bmp_size/64)*64;
189        if (start < start_bid) {
190            start = start_bid;
191        }
192
193        for (i=start; i<bmp_size; ++i) {
194            // in this loop, 'i' denotes bitmap number (i.e., BID).
195            idx = div8(i);
196            off = mod8(i);
197            if (bmp[idx] & bmp_basic_mask[off]) {
198                node_idx = i >> 8;
199                item = (struct bmp_idx_node *)calloc(1, sizeof(struct bmp_idx_node));
200                item->id = node_idx;
201                avl_insert(bmp_idx, &item->avl, _bmp_idx_cmp);
202                i = (node_idx+1)*256;
203            }
204        }
205    }
206}
207
208INLINE size_t _bmp_size_to_num_docs(uint64_t bmp_size)
209{
210    if (bmp_size) {
211        // 8 bitmaps per byte
212        uint64_t num_bits_per_doc = 8 * SB_MAX_BITMAP_DOC_SIZE;
213        return (bmp_size + num_bits_per_doc - 1) / num_bits_per_doc;
214    } else {
215        return 0;
216    }
217}
218
219INLINE bool _is_bmp_set(uint8_t *bmp, bid_t bid)
220{
221    return (bmp[div8(bid)] & bmp_basic_mask[mod8(bid)]);
222}
223
224static void sb_bmp_barrier_on(struct superblock *sb)
225{
226    atomic_incr_uint64_t(&sb->bmp_rcount);
227    if (atomic_get_uint64_t(&sb->bmp_wcount)) {
228        // now bmp pointer & related variables are being changed.
229        // decrease count and grab lock until the change is done.
230        atomic_decr_uint64_t(&sb->bmp_rcount);
231        spin_lock(&sb->bmp_lock);
232
233        // got control: means that change is done.
234        // re-increase the count
235        atomic_incr_uint64_t(&sb->bmp_rcount);
236        spin_unlock(&sb->bmp_lock);
237    }
238}
239
240static void sb_bmp_barrier_off(struct superblock *sb)
241{
242    atomic_decr_uint64_t(&sb->bmp_rcount);
243}
244
245static void sb_bmp_change_begin(struct superblock *sb)
246{
247    // grab lock and increase writer count
248    // now new readers cannot increase the reader count
249    // (note that there is always one bitmap writer at a time)
250    spin_lock(&sb->bmp_lock);
251    atomic_incr_uint64_t(&sb->bmp_wcount);
252
253    // wait until previous BMP readers terminate
254    // (they are very short bitmap accessing routines so that
255    //  will not take long time).
256    size_t spin = 0;
257    while (atomic_get_uint64_t(&sb->bmp_rcount)) {
258       if (++spin > 64) {
259#ifdef HAVE_SCHED_H
260            sched_yield();
261#elif _MSC_VER
262            SwitchToThread();
263#endif
264       }
265    }
266
267    // now 1) all previous readers terminated
268    //     2) new readers will be blocked
269}
270
271static void sb_bmp_change_end(struct superblock *sb)
272{
273    atomic_decr_uint64_t(&sb->bmp_wcount);
274    spin_unlock(&sb->bmp_lock);
275    // now resume all pending readers
276}
277
278void sb_bmp_append_doc(fdb_kvs_handle *handle)
279{
280    // == write bitmap into system docs ==
281    // calculate # docs (1MB by default)
282    // (1MB bitmap covers 32GB DB file)
283    size_t i;
284    uint64_t num_docs;
285    char doc_key[64];
286    struct superblock *sb = handle->file->sb;
287
288    // mark stale if previous doc offset exists
289    if (sb->bmp_doc_offset) {
290        for (i=0; i<sb->num_bmp_docs; ++i) {
291            filemgr_mark_stale(handle->file, sb->bmp_doc_offset[i],
292                _fdb_get_docsize(sb->bmp_docs[i].length));
293        }
294
295        free(sb->bmp_doc_offset);
296        free(sb->bmp_docs);
297        sb->bmp_doc_offset = NULL;
298        sb->bmp_docs = NULL;
299    }
300
301    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
302    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
303    if (num_docs) {
304        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
305        sb->bmp_docs = (struct docio_object*)
306                       calloc(num_docs, sizeof(struct docio_object));
307    }
308
309    // bitmap doc offsets
310    for (i=0; i<num_docs; ++i) {
311        // append a system doc for bitmap chunk
312        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
313        sprintf(doc_key, "bitmap_%" _F64 "_%d",
314                atomic_get_uint64_t(&sb->bmp_revnum), (int)i);
315        sb->bmp_docs[i].key = (void*)doc_key;
316        sb->bmp_docs[i].meta = NULL;
317        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
318
319        sb->bmp_docs[i].length.keylen = strlen(doc_key)+1;
320        sb->bmp_docs[i].length.metalen = 0;
321        if (i == num_docs - 1) {
322            // the last doc
323            sb->bmp_docs[i].length.bodylen =
324                (sb_bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
325        } else {
326            // otherwise: 1MB
327            sb->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
328        }
329        sb->bmp_docs[i].seqnum = 0;
330        sb->bmp_doc_offset[i] =
331            docio_append_doc_system(handle->dhandle, &sb->bmp_docs[i]);
332    }
333}
334
335void sb_rsv_append_doc(fdb_kvs_handle *handle)
336{
337    size_t i;
338    uint64_t num_docs;
339    char doc_key[64];
340    struct superblock *sb = handle->file->sb;
341    struct sb_rsv_bmp *rsv = NULL;
342
343    if (!sb) {
344        return;
345    }
346
347    rsv = sb->rsv_bmp;
348    if (!rsv ||
349        atomic_get_uint32_t(&rsv->status) != SB_RSV_INITIALIZING) {
350        return;
351    }
352
353    rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
354    if (num_docs) {
355        rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
356        rsv->bmp_docs = (struct docio_object*)
357                        calloc(num_docs, sizeof(struct docio_object));
358    }
359
360    // bitmap doc offsets
361    for (i=0; i<num_docs; ++i) {
362        // append a system doc for bitmap chunk
363        memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
364        sprintf(doc_key, "bitmap_%" _F64 "_%d", rsv->bmp_revnum, (int)i);
365        rsv->bmp_docs[i].key = (void*)doc_key;
366        rsv->bmp_docs[i].meta = NULL;
367        rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
368
369        rsv->bmp_docs[i].length.keylen = strlen(doc_key)+1;
370        rsv->bmp_docs[i].length.metalen = 0;
371        if (i == num_docs - 1) {
372            // the last doc
373            rsv->bmp_docs[i].length.bodylen =
374                (rsv->bmp_size / 8) % SB_MAX_BITMAP_DOC_SIZE;
375        } else {
376            // otherwise: 1MB
377            rsv->bmp_docs[i].length.bodylen = SB_MAX_BITMAP_DOC_SIZE;
378        }
379        rsv->bmp_docs[i].seqnum = 0;
380        rsv->bmp_doc_offset[i] =
381            docio_append_doc_system(handle->dhandle, &rsv->bmp_docs[i]);
382    }
383
384    // now 'rsv_bmp' is available.
385    atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
386}
387
388fdb_status sb_bmp_fetch_doc(fdb_kvs_handle *handle)
389{
390    // == read bitmap from system docs ==
391    size_t i;
392    uint64_t num_docs;
393    int64_t r_offset;
394    char doc_key[64];
395    struct superblock *sb = handle->file->sb;
396    struct sb_rsv_bmp *rsv = NULL;
397
398    // skip if previous bitmap exists OR
399    // there is no bitmap to be fetched (fast screening)
400    if (sb->bmp.load(std::memory_order_relaxed) ||
401        atomic_get_uint64_t(&sb->bmp_size) == 0) {
402        return FDB_RESULT_SUCCESS;
403    }
404
405    spin_lock(&sb->lock);
406
407    // check once again if superblock is already initialized
408    // while the thread was blocked by the lock.
409    if (sb->bmp.load(std::memory_order_relaxed)) {
410        spin_unlock(&sb->lock);
411        return FDB_RESULT_SUCCESS;
412    }
413
414    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
415    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
416    if (!num_docs) {
417        spin_unlock(&sb->lock);
418        return FDB_RESULT_SUCCESS;
419    }
420
421    free(sb->bmp);
422    sb->bmp = (uint8_t*)calloc(1, (sb_bmp_size+7) / 8);
423
424    for (i=0; i<num_docs; ++i) {
425        memset(&sb->bmp_docs[i], 0x0, sizeof(struct docio_object));
426        // pre-allocated buffer for key
427        sb->bmp_docs[i].key = (void*)doc_key;
428        // directly point to the bitmap
429        sb->bmp_docs[i].body = sb->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
430
431        r_offset = docio_read_doc(handle->dhandle, sb->bmp_doc_offset[i],
432                                  &sb->bmp_docs[i], true);
433        if (r_offset <= 0) {
434            // read fail
435            free(sb->bmp);
436            sb->bmp = NULL;
437
438            spin_unlock(&sb->lock);
439            return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
440        }
441    }
442
443    _construct_bmp_idx(&sb->bmp_idx, sb->bmp, sb_bmp_size,
444                       atomic_get_uint64_t(&sb->cur_alloc_bid));
445
446    rsv = sb->rsv_bmp;
447    if (rsv && atomic_get_uint32_t(&rsv->status) == SB_RSV_INITIALIZING) {
448        // reserved bitmap exists
449        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
450        if (!num_docs) {
451            spin_unlock(&sb->lock);
452            return FDB_RESULT_SUCCESS;
453        }
454
455        rsv->bmp = (uint8_t*)calloc(1, (rsv->bmp_size+7) / 8);
456
457        for (i=0; i<num_docs; ++i) {
458
459            memset(&rsv->bmp_docs[i], 0x0, sizeof(struct docio_object));
460            // pre-allocated buffer for key
461            rsv->bmp_docs[i].key = (void*)doc_key;
462            // directly point to the (reserved) bitmap
463            rsv->bmp_docs[i].body = rsv->bmp + (i * SB_MAX_BITMAP_DOC_SIZE);
464
465            r_offset = docio_read_doc(handle->dhandle, rsv->bmp_doc_offset[i],
466                                      &rsv->bmp_docs[i], true);
467            if (r_offset <= 0) {
468                // read fail
469                free(rsv->bmp);
470                free(sb->bmp);
471                rsv->bmp = sb->bmp = NULL;
472
473                spin_unlock(&sb->lock);
474                return r_offset < 0 ? (fdb_status) r_offset : FDB_RESULT_SB_READ_FAIL;
475            }
476        }
477
478        _construct_bmp_idx(&rsv->bmp_idx, rsv->bmp, rsv->bmp_size, 0);
479        atomic_store_uint32_t(&rsv->status, SB_RSV_READY);
480    }
481
482    spin_unlock(&sb->lock);
483    return FDB_RESULT_SUCCESS;
484}
485
486bool sb_check_sync_period(fdb_kvs_handle *handle)
487{
488    struct superblock *sb = handle->file->sb;
489
490    if (sb && sb->num_alloc * handle->file->blocksize > SB_SYNC_PERIOD) {
491        return true;
492    }
493    return false;
494}
495
496bool sb_update_header(fdb_kvs_handle *handle)
497{
498    bool ret = false;
499    struct superblock *sb = handle->file->sb;
500
501    bid_t last_hdr_bid = atomic_get_uint64_t(&handle->last_hdr_bid);
502    if (sb && atomic_get_uint64_t(&sb->last_hdr_bid) != last_hdr_bid &&
503        atomic_get_uint64_t(&sb->last_hdr_revnum) < handle->cur_header_revnum) {
504
505        atomic_store_uint64_t(&sb->last_hdr_bid, last_hdr_bid);
506        atomic_store_uint64_t(&sb->last_hdr_revnum, handle->cur_header_revnum);
507
508        uint64_t lw_revnum = atomic_get_uint64_t(&handle->file->last_writable_bmp_revnum);
509        if (lw_revnum == atomic_get_uint64_t(&sb->bmp_revnum) &&
510            sb->bmp_prev) {
511            free(sb->bmp_prev);
512            sb->bmp_prev = NULL;
513        }
514        ret = true;
515    }
516
517    return ret;
518}
519
520void sb_reset_num_alloc(fdb_kvs_handle *handle)
521{
522    if (handle->file->sb) {
523        handle->file->sb->num_alloc = 0;
524    }
525}
526
527fdb_status sb_sync_circular(fdb_kvs_handle *handle)
528{
529    uint64_t sb_revnum;
530    fdb_status fs;
531
532    sb_revnum = atomic_get_uint64_t(&handle->file->sb->revnum);
533    fs = sb_write(handle->file,
534                  sb_revnum % handle->file->sb->config->num_sb,
535                  &handle->log_callback);
536    return fs;
537}
538
539sb_decision_t sb_check_block_reusing(fdb_kvs_handle *handle)
540
541{
542    // start block reusing when
543    // 1) if blocks are not reused yet in this file:
544    //    when file size becomes larger than the threshold
545    // 2) otherwise:
546    //    when # free blocks decreases under the threshold
547
548    uint64_t live_datasize;
549    uint64_t filesize;
550    uint64_t ratio;
551    struct superblock *sb = handle->file->sb;
552
553    if (!sb) {
554        return SBD_NONE;
555    }
556
557    if (filemgr_get_file_status(handle->file) != FILE_NORMAL) {
558        // being compacted file does not allow block reusing
559        return SBD_NONE;
560    }
561
562    uint64_t block_reusing_threshold =
563        atomic_get_uint64_t(&handle->file->config->block_reusing_threshold,
564                            std::memory_order_relaxed);
565    if (block_reusing_threshold == 0 || block_reusing_threshold >= 100) {
566        // circular block reusing is disabled
567        return SBD_NONE;
568    }
569
570    filesize = filemgr_get_pos(handle->file);
571    if (filesize < SB_MIN_BLOCK_REUSING_FILESIZE) {
572        return SBD_NONE;
573    }
574
575    // at least # keeping headers should exist
576    // since the last block reusing
577    if (handle->cur_header_revnum <=
578        sb->min_live_hdr_revnum +
579        atomic_get_uint64_t(&handle->file->config->num_keeping_headers,
580                            std::memory_order_relaxed)) {
581        return SBD_NONE;
582    }
583
584    live_datasize = fdb_estimate_space_used(handle->fhandle);
585    if (filesize == 0 || live_datasize == 0 ||
586        live_datasize > filesize) {
587        return SBD_NONE;
588    }
589
590    ratio = (filesize - live_datasize) * 100 / filesize;
591
592    if (ratio > block_reusing_threshold) {
593        if (!sb_bmp_exists(sb)) {
594            // block reusing has not been started yet
595            return SBD_RECLAIM;
596        } else {
597            // stale blocks are already being reused before
598            if (sb->num_free_blocks == 0) {
599                if (sb->rsv_bmp) {
600                    // reserved bitmap exists
601                    return SBD_SWITCH;
602                } else {
603                    // re-reclaim
604                    return SBD_RECLAIM;
605                }
606            } else if ( (sb->num_free_blocks * 100 <
607                         sb->num_init_free_blocks * SB_PRE_RECLAIM_RATIO)) {
608                if (sb->num_init_free_blocks * handle->file->config->blocksize >
609                    SB_MIN_BLOCK_REUSING_FILESIZE)  {
610                    return SBD_RESERVE;
611                }
612            }
613        }
614    }
615
616    return SBD_NONE;
617}
618
619bool sb_reclaim_reusable_blocks(fdb_kvs_handle *handle)
620{
621    size_t i;
622    uint64_t num_blocks, bmp_size_byte;
623    stale_header_info sheader;
624    reusable_block_list blist;
625    struct superblock *sb = handle->file->sb;
626
627    // should flush all dirty blocks in cache
628    filemgr_sync(handle->file, false, &handle->log_callback);
629
630    sheader = fdb_get_smallest_active_header(handle);
631    if (sheader.bid == BLK_NOT_FOUND) {
632        return false;
633    }
634
635    // get reusable block list
636    blist = fdb_get_reusable_block(handle, sheader);
637
638    // update superblock's bitmap
639    uint8_t *new_bmp = NULL, *old_bmp = NULL;
640    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
641    // 8 bitmaps per byte
642    bmp_size_byte = (num_blocks+7) / 8;
643    fdb_assert(num_blocks >= SB_DEFAULT_NUM_SUPERBLOCKS,
644               num_blocks, SB_DEFAULT_NUM_SUPERBLOCKS);
645    new_bmp = (uint8_t*)calloc(1, bmp_size_byte);
646
647    // free pre-existing bmp index
648    _free_bmp_idx(&sb->bmp_idx);
649
650    for (i=0; i<blist.n_blocks; ++i) {
651        sb_bmp_set(new_bmp, blist.blocks[i].bid, blist.blocks[i].count);
652        if (i==0 &&
653            atomic_get_uint64_t(&sb->cur_alloc_bid) == BLK_NOT_FOUND) {
654            atomic_store_uint64_t(&sb->cur_alloc_bid, blist.blocks[i].bid);
655        }
656        sb->num_free_blocks += blist.blocks[i].count;
657        // add info for supplementary bmp index
658        _add_bmp_idx(&sb->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
659    }
660    free(blist.blocks);
661
662    sb_bmp_change_begin(sb);
663    old_bmp = sb->bmp.load(std::memory_order_relaxed);
664    sb->bmp.store(new_bmp, std::memory_order_relaxed);
665    atomic_store_uint64_t(&sb->bmp_size, num_blocks);
666    sb->min_live_hdr_revnum = sheader.revnum;
667    sb->min_live_hdr_bid = sheader.bid;
668    atomic_incr_uint64_t(&sb->bmp_revnum);
669    sb->num_init_free_blocks = sb->num_free_blocks;
670    sb_bmp_change_end(sb);
671    free(old_bmp);
672
673    return true;
674}
675
676bool sb_reserve_next_reusable_blocks(fdb_kvs_handle *handle)
677{
678    size_t i;
679    uint64_t num_blocks, bmp_size_byte;
680    stale_header_info sheader;
681    reusable_block_list blist;
682    struct superblock *sb = handle->file->sb;
683    struct sb_rsv_bmp *rsv = NULL;
684
685    if (sb->rsv_bmp) {
686        // next bitmap already reclaimed
687        return false;
688    }
689
690    sheader = fdb_get_smallest_active_header(handle);
691    if (sheader.bid == BLK_NOT_FOUND) {
692        return false;
693    }
694
695    // get reusable block list
696    blist = fdb_get_reusable_block(handle, sheader);
697
698    // calculate bitmap size
699    num_blocks = filemgr_get_pos(handle->file) / handle->file->blocksize;
700    bmp_size_byte = (num_blocks+7) / 8;
701    if (num_blocks) {
702        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
703        rsv->bmp = (uint8_t*)calloc(1, bmp_size_byte);
704        rsv->cur_alloc_bid = BLK_NOT_FOUND;
705
706        // the initial status is 'INITIALIZING' so that 'rsv_bmp' is not
707        // available until executing sb_rsv_append_doc().
708        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
709        avl_init(&rsv->bmp_idx, NULL);
710        rsv->bmp_size = num_blocks;
711
712        for (i=0; i<blist.n_blocks; ++i) {
713            sb_bmp_set(rsv->bmp, blist.blocks[i].bid, blist.blocks[i].count);
714            if (i==0 && rsv->cur_alloc_bid == BLK_NOT_FOUND) {
715                rsv->cur_alloc_bid = blist.blocks[i].bid;
716            }
717            rsv->num_free_blocks += blist.blocks[i].count;
718            _add_bmp_idx(&rsv->bmp_idx, blist.blocks[i].bid, blist.blocks[i].count);
719        }
720        free(blist.blocks);
721
722        rsv->min_live_hdr_revnum = sheader.revnum;
723        rsv->min_live_hdr_bid = sheader.bid;
724        rsv->bmp_revnum = atomic_get_uint64_t(&sb->bmp_revnum)+1;
725        sb->rsv_bmp = rsv;
726    }
727
728    return true;
729}
730
731static void _rsv_free(struct sb_rsv_bmp *rsv);
732void sb_return_reusable_blocks(fdb_kvs_handle *handle)
733{
734    uint64_t node_id;
735    bid_t cur;
736    struct superblock *sb = handle->file->sb;
737    struct sb_rsv_bmp *rsv;
738    struct avl_node *a;
739    struct bmp_idx_node *item, query;
740
741    if (!sb) {
742        // we don't need to do this.
743        return;
744    }
745
746    // re-insert all remaining bitmap into stale list
747    uint64_t sb_bmp_size = atomic_get_uint64_t(&sb->bmp_size);
748    for (cur = atomic_get_uint64_t(&sb->cur_alloc_bid); cur < sb_bmp_size; ++cur) {
749        if (_is_bmp_set(sb->bmp, cur)) {
750            filemgr_add_stale_block(handle->file, cur, 1);
751        }
752
753        if ((cur % 256) == 0 && cur > 0) {
754            // node ID changes
755            // remove & free current bmp node
756            node_id = cur / 256;
757            query.id = node_id - 1;
758            a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
759            if (a) {
760                item = _get_entry(a, struct bmp_idx_node, avl);
761                avl_remove(&sb->bmp_idx, a);
762                free(item);
763            }
764
765            // move to next bmp node
766            do {
767                a = avl_first(&sb->bmp_idx);
768                if (a) {
769                    item = _get_entry(a, struct bmp_idx_node, avl);
770                    if (item->id <= node_id) {
771                        avl_remove(&sb->bmp_idx, a);
772                        free(item);
773                        continue;
774                    }
775                    cur = item->id * 256;
776                    break;
777                }
778
779                // no more reusable block
780                cur = sb_bmp_size;
781                break;
782            } while (true);
783        }
784    }
785    sb->num_free_blocks = 0;
786    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
787
788    // do the same work for the reserved blocks if exist
789    rsv = sb->rsv_bmp;
790    if (rsv &&
791        atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
792
793        for (cur = rsv->cur_alloc_bid; cur < rsv->bmp_size; ++cur) {
794            if (_is_bmp_set(rsv->bmp, cur)) {
795                filemgr_add_stale_block(handle->file, cur, 1);
796            }
797
798            if ((cur % 256) == 0 && cur > 0) {
799                // node ID changes
800                // remove & free current bmp node
801                node_id = cur / 256;
802                query.id = node_id - 1;
803                a = avl_search(&rsv->bmp_idx, &query.avl, _bmp_idx_cmp);
804                if (a) {
805                    item = _get_entry(a, struct bmp_idx_node, avl);
806                    avl_remove(&rsv->bmp_idx, a);
807                    free(item);
808                }
809
810                // move to next bmp node
811                do {
812                    a = avl_first(&rsv->bmp_idx);
813                    if (a) {
814                        item = _get_entry(a, struct bmp_idx_node, avl);
815                        if (item->id <= node_id) {
816                            avl_remove(&rsv->bmp_idx, a);
817                            free(item);
818                            continue;
819                        }
820                        cur = item->id * 256;
821                        break;
822                    }
823
824                    // no more reusable block
825                    cur = rsv->bmp_size;
826                    break;
827                } while (true);
828            }
829        }
830        rsv->num_free_blocks = 0;
831        rsv->cur_alloc_bid = BLK_NOT_FOUND;
832
833        _free_bmp_idx(&rsv->bmp_idx);
834        _rsv_free(rsv);
835        free(rsv);
836        sb->rsv_bmp = NULL;
837    }
838
839    // re-store into stale tree using next header's revnum
840    filemgr_header_revnum_t revnum = handle->cur_header_revnum;
841    fdb_gather_stale_blocks(handle, revnum+1, BLK_NOT_FOUND, BLK_NOT_FOUND, 0, NULL, false);
842}
843
844void sb_bmp_mask_init()
845{
846    // preset masks to speed up bitmap set/clear operations
847    size_t i, pos, len;
848    for (i=0; i<8; ++i) {
849        bmp_basic_mask[i] = 0x1 << (7-i);
850    }
851    for (pos=0; pos<8; ++pos) {
852        for (len=0; len<9; ++len) {
853            bmp_2d_mask[pos][len] = 0x0;
854            if (len != 0 && pos+len <= 8) {
855                for (i=0; i<len; ++i) {
856                    bmp_2d_mask[pos][len] |= bmp_basic_mask[pos+i];
857                }
858            }
859        }
860    }
861}
862
863INLINE void _sb_bmp_update(uint8_t *bmp, bid_t bid, uint64_t len, int mode)
864{
865    // mode==0: bitmap clear
866    // mode==1: bitmap set
867
868    uint64_t front_pos, front_len, rear_pos, rear_len;
869    uint64_t mid_pos, mid_len;
870
871    //      front_len        rear_len
872    //      <->              <-->
873    // 00000111 | 11111111 | 11110000
874    //      ^     <------> mid
875    //      front_pos
876
877    front_pos = bid;
878    front_len = 8 - mod8(front_pos);
879
880    if (front_len >= len) {
881        front_len = len;
882        rear_pos = rear_len = mid_pos = mid_len = 0;
883    } else {
884        rear_pos = rd8(bid + len);
885        rear_len = mod8(bid + len);
886
887        mid_pos = bid + front_len;
888        mid_len = len - front_len - rear_len;
889    }
890
891    // front bitmaps
892    if (front_len) {
893        if (mode) {
894            bmp[div8(front_pos)] |= bmp_2d_mask[mod8(front_pos)][front_len];
895        } else {
896            bmp[div8(front_pos)] &= ~bmp_2d_mask[mod8(front_pos)][front_len];
897        }
898    }
899    // rear bitmaps
900    if (rear_len) {
901        if (mode) {
902            bmp[div8(rear_pos)] |= bmp_2d_mask[mod8(rear_pos)][rear_len];
903        } else {
904            bmp[div8(rear_pos)] &= ~bmp_2d_mask[mod8(rear_pos)][rear_len];
905        }
906    }
907
908    // mid bitmaps
909    uint8_t mask = (mode)?(0xff):(0x0);
910    if (mid_len == 8) {
911        // 8 bitmaps (1 byte)
912        bmp[div8(mid_pos)] = mask;
913    } else if (mid_len < 64) {
914        // 16 ~ 56 bitmaps (2 ~ 7 bytes)
915        size_t i;
916        for (i=0; i<mid_len; i+=8) {
917            bmp[div8(mid_pos+i)] = mask;
918        }
919    } else {
920        // larger than 64 bitmaps (8 bytes)
921        memset(bmp + div8(mid_pos), mask, div8(mid_len));
922    }
923}
924
925void sb_bmp_set(uint8_t *bmp, bid_t bid, uint64_t len)
926{
927    _sb_bmp_update(bmp, bid, len, 1);
928}
929
930void sb_bmp_clear(uint8_t *bmp, bid_t bid, uint64_t len)
931{
932    _sb_bmp_update(bmp, bid, len, 0);
933}
934
935bool sb_switch_reserved_blocks(struct filemgr *file)
936{
937    size_t i;
938    struct superblock *sb = file->sb;
939    struct sb_rsv_bmp *rsv = sb->rsv_bmp;
940
941    // reserved block should exist
942    if (!rsv) {
943        return false;
944    }
945    // should be in a normal status
946    if (!atomic_cas_uint32_t(&rsv->status, SB_RSV_READY, SB_RSV_VOID)) {
947        return false;
948    }
949    // now status becomes 'VOID' so that rsv_bmp is not available.
950
951    // mark stale previous system docs
952    if (sb->bmp_doc_offset) {
953        for (i=0; i<sb->num_bmp_docs; ++i) {
954            filemgr_mark_stale(file, sb->bmp_doc_offset[i],
955                _fdb_get_docsize(sb->bmp_docs[i].length));
956        }
957
958        free(sb->bmp_doc_offset);
959        free(sb->bmp_docs);
960        sb->bmp_doc_offset = NULL;
961        sb->bmp_docs = NULL;
962    }
963
964    // should flush all dirty blocks in cache
965    filemgr_sync(file, false, NULL);
966
967    // free current bitmap idx
968    _free_bmp_idx(&sb->bmp_idx);
969
970    // temporarily keep current bitmap
971    sb_bmp_change_begin(sb);
972    uint8_t *old_prev_bmp = NULL;
973    if (sb->bmp_prev) {
974        old_prev_bmp = sb->bmp_prev;
975    }
976    sb->bmp_prev = sb->bmp;
977    sb->bmp_prev_size = atomic_get_uint64_t(&sb->bmp_size);
978
979    // copy all pointers from rsv to sb
980    atomic_store(&sb->bmp_revnum, rsv->bmp_revnum);
981    atomic_store_uint64_t(&sb->bmp_size, rsv->bmp_size);
982    sb->bmp = rsv->bmp;
983    sb->bmp_idx = rsv->bmp_idx;
984    sb->bmp_doc_offset = rsv->bmp_doc_offset;
985    sb->bmp_docs = rsv->bmp_docs;
986    sb->num_bmp_docs = rsv->num_bmp_docs;
987    sb->num_free_blocks = sb->num_init_free_blocks = rsv->num_free_blocks;
988    atomic_store_uint64_t(&sb->cur_alloc_bid, rsv->cur_alloc_bid);
989    sb->min_live_hdr_revnum = rsv->min_live_hdr_revnum;
990    sb->min_live_hdr_bid = rsv->min_live_hdr_bid;
991    sb_bmp_change_end(sb);
992
993    free(old_prev_bmp);
994    free(sb->rsv_bmp);
995    sb->rsv_bmp = NULL;
996
997    return true;
998}
999
1000bid_t sb_alloc_block(struct filemgr *file)
1001{
1002    uint64_t i, node_idx, node_off, bmp_idx, bmp_off;
1003    bid_t ret = BLK_NOT_FOUND;
1004    struct superblock *sb = file->sb;
1005    struct avl_node *a;
1006    struct bmp_idx_node *item, query;
1007
1008    sb->num_alloc++;
1009sb_alloc_start_over:
1010    if (!sb_bmp_exists(sb)) {
1011        // no bitmap
1012        return BLK_NOT_FOUND;
1013    }
1014
1015    if (sb->num_free_blocks == 0) {
1016        bool switched = false;
1017        if (sb->rsv_bmp) {
1018            switched = sb_switch_reserved_blocks(file);
1019        }
1020        if (switched) {
1021            goto sb_alloc_start_over;
1022        } else {
1023            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1024            return BLK_NOT_FOUND;
1025        }
1026    }
1027
1028    ret = atomic_get_uint64_t(&sb->cur_alloc_bid);
1029    sb->num_free_blocks--;
1030
1031    if (sb->num_free_blocks == 0) {
1032        bool switched = false;
1033        if (sb->rsv_bmp) {
1034            switched = sb_switch_reserved_blocks(file);
1035        }
1036        if (!switched) {
1037            atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1038        }
1039        return ret;
1040    }
1041
1042    // find allocable block in the same bmp idx node
1043    node_idx = ret >> 8;
1044    node_off = (ret & 0xff)+1;
1045    do {
1046        for (i=node_off; i<256; ++i) {
1047            bmp_idx = div8(i) + (node_idx * 32);
1048            bmp_off = mod8(i);
1049
1050            if (bmp_idx*8 + bmp_off >= atomic_get_uint64_t(&sb->bmp_size)) {
1051                break;
1052            }
1053
1054            if (sb->bmp[bmp_idx] & bmp_basic_mask[bmp_off]) {
1055                atomic_store_uint64_t(&sb->cur_alloc_bid, bmp_idx*8 + bmp_off);
1056                return ret;
1057            }
1058        }
1059
1060        // current bmp_node does not include any free block .. remove
1061        query.id = node_idx;
1062        a = avl_search(&sb->bmp_idx, &query.avl, _bmp_idx_cmp);
1063        if (a) {
1064            item = _get_entry(a, struct bmp_idx_node, avl);
1065            avl_remove(&sb->bmp_idx, a);
1066            free(item);
1067        }
1068
1069        // get next allocable bmp_node
1070        a = avl_first(&sb->bmp_idx);
1071        if (!a) {
1072            // no more free bmp_node
1073            sb->num_free_blocks = 0;
1074            bool switched = false;
1075            if (sb->rsv_bmp) {
1076                switched = sb_switch_reserved_blocks(file);
1077            }
1078            if (!switched) {
1079                atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1080            }
1081            break;
1082        }
1083        item = _get_entry(a, struct bmp_idx_node, avl);
1084        node_idx = item->id;
1085        node_off = 0;
1086    } while (true);
1087
1088    return ret;
1089}
1090
1091bool sb_bmp_is_writable(struct filemgr *file, bid_t bid)
1092{
1093    if (bid < file->sb->config->num_sb) {
1094        // superblocks are always writable
1095        return true;
1096    }
1097
1098    bool ret = false;
1099    bid_t last_commit = atomic_get_uint64_t(&file->last_commit) / file->blocksize;
1100    uint64_t lw_bmp_revnum = atomic_get_uint64_t(&file->last_writable_bmp_revnum);
1101    struct superblock *sb = file->sb;
1102
1103    sb_bmp_barrier_on(sb);
1104
1105    uint8_t *sb_bmp = sb->bmp;
1106    if (atomic_get_uint64_t(&sb->bmp_revnum) == lw_bmp_revnum) {
1107        // Same bitmap revision number: there are 2 possible cases
1108        //
1109        // (1) normal case
1110        //               writable blocks
1111        //            <---->          <--->
1112        // +-------------------------------------------+
1113        // |       xxxxxxxxx          xxxxxxxxxx       | (x: reused blocks)
1114        // +-------------------------------------------+
1115        //            ^                   ^
1116        //            last_commit         cur_alloc
1117        //
1118        // (2) when file size grows after block reusing
1119        //                                   writable blocks
1120        //                             <------->       <--------->
1121        // +-------------------------------------------+---------+
1122        // |       xxxxxxxxx          xxxxxxxxxx       |         |
1123        // +-------------------------------------------+---------+
1124        //                             ^               ^         ^
1125        //                             last_commit     bmp_size  cur_alloc
1126
1127        if (bid < atomic_get_uint64_t(&sb->bmp_size)) {
1128            // BID is in the bitmap .. check if bitmap is set.
1129            if (_is_bmp_set(sb_bmp, bid) &&
1130                bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1131                bid >= last_commit) {
1132                ret = true;
1133            }
1134        } else {
1135            // BID is out-of-range of the bitmap
1136            if (bid >= last_commit) {
1137                ret = true;
1138            }
1139        }
1140    } else {
1141        // Different bitmap revision number: there are also 2 possible cases
1142        //
1143        // (1) normal case
1144        //     writable blocks                 writable blocks
1145        //         <---->                          <--->
1146        // +-------------------------------------------+
1147        // |       xxxxxxxxx          xxxxxxxxxx       |
1148        // +-------------------------------------------+
1149        //              ^                          ^
1150        //              cur_alloc                  last_commit
1151        //
1152        // (2) when file size grows after block reusing
1153        //         writable blocks             writable blocks
1154        //      <---->       <------>      <-----------x--------->
1155        // +-------------------------------------------+---------+
1156        // |    xxxxxx       xxxxxxxx                  |         |
1157        // +-------------------------------------------+---------+
1158        //                                 ^           ^         ^
1159        //                                 last_commit bmp_size  cur_alloc
1160
1161        // the block is writable if
1162        // 1) BID >= last_commit OR
1163        // 2) BID < cur_alloc_bid AND corresponding bitmap is set.
1164        if (bid >= last_commit) {
1165            // if prev_bmp exists, last commit position is still located on the
1166            // previous bitmap (since prev_bmp is released when a commit is invoked
1167            // in the new bitmap).
1168            // So in this case, we have to check both previous/current bitmaps.
1169            //
1170            // (1: previous bitmap (sb->bmp_prev), 2: current bitmap (sb->bmp))
1171            //
1172            //        writable blocks          writable blocks
1173            //    <-->      <---->   <-->          <> <> <>
1174            // +-------------------------------------------+
1175            // |  2222 1111 222222   2222   111111111 22 11|
1176            // +-------------------------------------------+
1177            //                                     ^
1178            //                                     last_commit
1179            if (sb->bmp_prev) {
1180                if (bid < sb->bmp_prev_size &&
1181                    _is_bmp_set(sb->bmp_prev, bid)) {
1182                    ret = true;
1183                }
1184                if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1185                    _is_bmp_set(sb_bmp, bid)) {
1186                    ret = true;
1187                }
1188                if (bid >= atomic_get_uint64_t(&sb->bmp_size)) {
1189                    // blocks newly allocated beyond the bitmap size:
1190                    // always writable
1191                    ret = true;
1192                }
1193            } else {
1194                // bmp_prev doesn't exist even though bmp_revnum is different
1195                // this happens on the first block reclaim only
1196                // so all blocks whose 'BID >= last_commit' are writable.
1197                ret = true;
1198            }
1199        }
1200
1201        if (bid < atomic_get_uint64_t(&sb->bmp_size) &&
1202            bid < atomic_get_uint64_t(&sb->cur_alloc_bid) &&
1203            _is_bmp_set(sb_bmp, bid)) {
1204            // 'cur_alloc_bid' is always smaller than 'bmp_size'
1205            // except for the case 'cur_alloc_bid == BLK_NOT_FOUND'
1206            ret = true;
1207        }
1208    }
1209
1210    sb_bmp_barrier_off(sb);
1211
1212    return ret;
1213}
1214
1215fdb_status sb_write(struct filemgr *file, size_t sb_no,
1216                    err_log_callback * log_callback)
1217{
1218    int r;
1219    int real_blocksize = file->blocksize;
1220    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1221    uint8_t *buf = alca(uint8_t, real_blocksize);
1222    uint32_t crc, _crc;
1223    uint64_t enc_u64;
1224    uint64_t num_docs;
1225    size_t i, offset;
1226    fdb_status fs;
1227
1228    memset(buf, 0x0, real_blocksize);
1229
1230    offset = 0;
1231    // magic number
1232    enc_u64 = _endian_encode(file->version);
1233    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1234    offset += sizeof(enc_u64);
1235
1236    // revision number
1237    uint64_t sb_revnum = atomic_get_uint64_t(&file->sb->revnum);
1238    enc_u64 = _endian_encode(sb_revnum);
1239    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1240    offset += sizeof(enc_u64);
1241
1242    // bitmap's revision number
1243    enc_u64 = _endian_encode(atomic_get_uint64_t(&file->sb->bmp_revnum));
1244    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1245    offset += sizeof(enc_u64);
1246
1247    // cur_alloc_bid
1248    bid_t sb_cur_alloc_bid = atomic_get_uint64_t(&file->sb->cur_alloc_bid);
1249    enc_u64 = _endian_encode(sb_cur_alloc_bid);
1250    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1251    offset += sizeof(enc_u64);
1252
1253    // last header bid
1254    bid_t sb_last_hdr_bid = atomic_get_uint64_t(&file->sb->last_hdr_bid);
1255    enc_u64 = _endian_encode(sb_last_hdr_bid);
1256    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1257    offset += sizeof(enc_u64);
1258
1259    // last header rev number
1260    uint64_t sb_last_hdr_revnum = atomic_get_uint64_t(&file->sb->last_hdr_revnum);
1261    enc_u64 = _endian_encode(sb_last_hdr_revnum);
1262    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1263    offset += sizeof(enc_u64);
1264
1265    // minimum active header revnum
1266    enc_u64 = _endian_encode(file->sb->min_live_hdr_revnum);
1267    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1268    offset += sizeof(enc_u64);
1269
1270    // minimum active header BID
1271    enc_u64 = _endian_encode(file->sb->min_live_hdr_bid);
1272    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1273    offset += sizeof(enc_u64);
1274
1275    // # initial free blocks
1276    enc_u64 = _endian_encode(file->sb->num_init_free_blocks);
1277    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1278    offset += sizeof(enc_u64);
1279
1280    // # free blocks
1281    enc_u64 = _endian_encode(file->sb->num_free_blocks);
1282    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1283    offset += sizeof(enc_u64);
1284
1285    // bitmap size
1286    uint64_t sb_bmp_size = atomic_get_uint64_t(&file->sb->bmp_size);
1287    enc_u64 = _endian_encode(sb_bmp_size);
1288    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1289    offset += sizeof(enc_u64);
1290
1291    bool rsv_bmp_enabled = false;
1292
1293    if ( file->sb->rsv_bmp &&
1294         atomic_cas_uint32_t(&file->sb->rsv_bmp->status,
1295                             SB_RSV_READY, SB_RSV_WRITING) ) {
1296        rsv_bmp_enabled = true;
1297        // status becomes 'WRITING' so that switching will be postponed.
1298        // note that 'rsv_bmp' is not currently used yet so that
1299        // it won't block any other tasks except for switching.
1300    }
1301
1302    // reserved bitmap size (0 if not exist)
1303    if (rsv_bmp_enabled) {
1304        enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_size);
1305    } else {
1306        enc_u64 = 0;
1307    }
1308    memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1309    offset += sizeof(enc_u64);
1310
1311    // bitmap doc offsets
1312    num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1313    for (i=0; i<num_docs; ++i) {
1314        enc_u64 = _endian_encode(file->sb->bmp_doc_offset[i]);
1315        memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1316        offset += sizeof(enc_u64);
1317    }
1318
1319    // reserved bitmap doc offsets
1320    if (rsv_bmp_enabled) {
1321        num_docs = _bmp_size_to_num_docs(file->sb->rsv_bmp->bmp_size);
1322        for (i=0; i<num_docs; ++i) {
1323            enc_u64 = _endian_encode(file->sb->rsv_bmp->bmp_doc_offset[i]);
1324            memcpy(buf + offset, &enc_u64, sizeof(enc_u64));
1325            offset += sizeof(enc_u64);
1326        }
1327
1328        atomic_store_uint32_t(&file->sb->rsv_bmp->status, SB_RSV_READY);
1329    }
1330
1331    // CRC
1332    crc = get_checksum(buf, offset, file->crc_mode);
1333    _crc = _endian_encode(crc);
1334    memcpy(buf + offset, &_crc, sizeof(_crc));
1335
1336    // set block marker
1337    memset(buf + blocksize, BLK_MARKER_SB, BLK_MARKER_SIZE);
1338
1339    // directly write a block bypassing block cache
1340    r = file->ops->pwrite(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1341    if (r != real_blocksize) {
1342        char errno_msg[512];
1343        file->ops->get_errno_str(errno_msg, 512);
1344        fs = FDB_RESULT_SB_RACE_CONDITION;
1345        fdb_log(log_callback, fs,
1346                "Failed to write the superblock (number: %" _F64 "), %s",
1347                sb_no, errno_msg);
1348        return fs;
1349    }
1350
1351    // increase superblock's revision number
1352    atomic_incr_uint64_t(&file->sb->revnum);
1353
1354    return FDB_RESULT_SUCCESS;
1355}
1356
1357static void _rsv_free(struct sb_rsv_bmp *rsv)
1358{
1359    free(rsv->bmp);
1360    free(rsv->bmp_doc_offset);
1361    free(rsv->bmp_docs);
1362}
1363
1364static fdb_status _sb_read_given_no(struct filemgr *file,
1365                                    size_t sb_no,
1366                                    struct superblock *sb,
1367                                    err_log_callback *log_callback)
1368{
1369    int r;
1370    int real_blocksize = file->blocksize;
1371    int blocksize = file->blocksize - BLK_MARKER_SIZE;
1372    size_t i, num_docs;
1373    uint8_t *buf = alca(uint8_t, real_blocksize);
1374    uint32_t crc_file, crc, _crc;
1375    uint64_t enc_u64, version, offset, dummy64;
1376    fdb_status fs;
1377    struct sb_rsv_bmp *rsv = NULL;
1378
1379    memset(buf, 0x0, real_blocksize);
1380    offset = 0;
1381
1382    // directly read a block bypassing block cache
1383    r = file->ops->pread(file->fd, buf, real_blocksize, sb_no * real_blocksize);
1384    if (r != real_blocksize) {
1385        char errno_msg[512];
1386        file->ops->get_errno_str(errno_msg, 512);
1387        fs = FDB_RESULT_SB_READ_FAIL;
1388        fdb_log(log_callback, fs,
1389                "Failed to read the superblock: file read failure (SB No.: %" _F64 "), %s",
1390                sb_no, errno_msg);
1391        return fs;
1392    }
1393
1394    // block marker check
1395    if (buf[blocksize] != BLK_MARKER_SB) {
1396        fs = FDB_RESULT_SB_READ_FAIL;
1397        fdb_log(log_callback, fs,
1398                "Failed to read the superblock: "
1399                "incorrect block marker (marker: %x, SB No.: %" _F64 "). "
1400                "Note: this message might be a false alarm if upgrade is running.",
1401                buf[blocksize], sb_no);
1402        return fs;
1403    }
1404
1405    // magic number
1406    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1407    offset += sizeof(enc_u64);
1408    version = _endian_decode(enc_u64);
1409
1410    // version check
1411    if (!ver_superblock_support(version)) {
1412        fs = FDB_RESULT_SB_READ_FAIL;
1413        fdb_log(log_callback, fs,
1414                "Failed to read the superblock: "
1415                "not supported version (magic: %" _F64 ", SB No.: %" _F64 ")",
1416                version, sb_no);
1417        return fs;
1418    }
1419
1420    // revision number
1421    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1422    offset += sizeof(enc_u64);
1423    atomic_store_uint64_t(&sb->revnum, _endian_decode(enc_u64));
1424
1425    // bitmap's revision number
1426    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1427    offset += sizeof(enc_u64);
1428    atomic_store_uint64_t(&sb->bmp_revnum, _endian_decode(enc_u64));
1429
1430    // cur_alloc_bid
1431    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1432    offset += sizeof(enc_u64);
1433    atomic_store_uint64_t(&sb->cur_alloc_bid, _endian_decode(enc_u64));
1434
1435    // last header bid
1436    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1437    offset += sizeof(enc_u64);
1438    atomic_store_uint64_t(&sb->last_hdr_bid, _endian_decode(enc_u64));
1439
1440    // last header rev number
1441    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1442    offset += sizeof(enc_u64);
1443    atomic_store_uint64_t(&sb->last_hdr_revnum, _endian_decode(enc_u64));
1444
1445    // minimum active header revnum
1446    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1447    offset += sizeof(enc_u64);
1448    sb->min_live_hdr_revnum = _endian_decode(enc_u64);
1449
1450    // minimum active header BID
1451    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1452    offset += sizeof(enc_u64);
1453    sb->min_live_hdr_bid = _endian_decode(enc_u64);
1454
1455    // # initial free blocks
1456    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1457    offset += sizeof(enc_u64);
1458    sb->num_init_free_blocks = _endian_decode(enc_u64);
1459
1460    // # free blocks
1461    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1462    offset += sizeof(enc_u64);
1463    sb->num_free_blocks = _endian_decode(enc_u64);
1464
1465    // bitmap size
1466    uint64_t sb_bmp_size;
1467    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1468    offset += sizeof(enc_u64);
1469    sb_bmp_size = _endian_decode(enc_u64);
1470    atomic_store_uint64_t(&sb->bmp_size, sb_bmp_size);
1471
1472    // reserved bitmap size
1473    memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1474    offset += sizeof(enc_u64);
1475    dummy64 = _endian_decode(enc_u64);
1476    if (dummy64) {
1477        // reserved bitmap array exists
1478        rsv = (struct sb_rsv_bmp*)calloc(1, sizeof(struct sb_rsv_bmp));
1479        rsv->bmp = NULL;
1480        rsv->bmp_size = dummy64;
1481        rsv->cur_alloc_bid = BLK_NOT_FOUND;
1482        atomic_init_uint32_t(&rsv->status, SB_RSV_INITIALIZING);
1483    }
1484
1485    // temporarily set bitmap array to NULL
1486    // (it will be allocated by fetching function)
1487    sb->bmp = NULL;
1488
1489    sb->num_bmp_docs = num_docs = _bmp_size_to_num_docs(sb_bmp_size);
1490    if (num_docs) {
1491        sb->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1492        sb->bmp_docs = (struct docio_object*)
1493                       calloc(num_docs, sizeof(struct docio_object));
1494    }
1495
1496    // read doc offsets
1497    for (i=0; i<num_docs; ++i) {
1498        memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1499        offset += sizeof(enc_u64);
1500        sb->bmp_doc_offset[i] = _endian_decode(enc_u64);
1501    }
1502
1503    // read reserved bmp docs if exist
1504    if (rsv) {
1505        rsv->num_bmp_docs = num_docs = _bmp_size_to_num_docs(rsv->bmp_size);
1506        if (rsv->num_bmp_docs) {
1507            rsv->bmp_doc_offset = (bid_t*)calloc(num_docs, sizeof(bid_t));
1508            rsv->bmp_docs = (struct docio_object*)
1509                            calloc(num_docs, sizeof(struct docio_object));
1510        }
1511
1512        // read doc offsets
1513        for (i=0; i<num_docs; ++i) {
1514            memcpy(&enc_u64, buf + offset, sizeof(enc_u64));
1515            offset += sizeof(enc_u64);
1516            rsv->bmp_doc_offset[i] = _endian_decode(enc_u64);
1517        }
1518        sb->rsv_bmp = rsv;
1519    }
1520
1521    // CRC
1522    crc = get_checksum(buf, offset, file->crc_mode);
1523    memcpy(&_crc, buf + offset, sizeof(_crc));
1524    crc_file = _endian_decode(_crc);
1525    if (crc != crc_file) {
1526        free(sb->bmp_doc_offset);
1527        free(sb->bmp_docs);
1528        sb->bmp_doc_offset = NULL;
1529        sb->bmp_docs = NULL;
1530
1531        fs = FDB_RESULT_SB_READ_FAIL;
1532        fdb_log(log_callback, fs,
1533                "Failed to read the superblock: "
1534                "not supported version (magic: %" _F64 ", SB No.: %" _F64 ")",
1535                version, sb_no);
1536        return fs;
1537    }
1538
1539    return FDB_RESULT_SUCCESS;
1540}
1541
1542static void _sb_free(struct superblock *sb)
1543{
1544    if (sb->rsv_bmp) {
1545        atomic_store_uint32_t(&sb->rsv_bmp->status, SB_RSV_VOID);
1546        _free_bmp_idx(&sb->rsv_bmp->bmp_idx);
1547        _rsv_free(sb->rsv_bmp);
1548        free(sb->rsv_bmp);
1549    }
1550    _free_bmp_idx(&sb->bmp_idx);
1551
1552    free(sb->bmp);
1553    free(sb->bmp_prev);
1554    free(sb->bmp_doc_offset);
1555    // note that each docio object doesn't need to be freed
1556    // as key/body fields point to static memory regions.
1557    free(sb->bmp_docs);
1558    free(sb->config);
1559    spin_destroy(&sb->lock);
1560
1561    sb->bmp = NULL;
1562    sb->bmp_doc_offset = NULL;
1563    sb->bmp_docs = NULL;
1564    sb->config = NULL;
1565}
1566
1567void _sb_init(struct superblock *sb, struct sb_config sconfig)
1568{
1569    *sb->config = sconfig;
1570    atomic_init_uint64_t(&sb->revnum, 0);
1571    atomic_init_uint64_t(&sb->bmp_revnum, 0);
1572    atomic_init_uint64_t(&sb->bmp_size, 0);
1573    sb->bmp = NULL;
1574    atomic_init_uint64_t(&sb->bmp_rcount, 0);
1575    atomic_init_uint64_t(&sb->bmp_wcount, 0);
1576    spin_init(&sb->bmp_lock);
1577    sb->bmp_prev_size = 0;
1578    sb->bmp_prev = NULL;
1579    sb->bmp_doc_offset = NULL;
1580    sb->bmp_docs = NULL;
1581    sb->num_bmp_docs = 0;
1582    sb->num_init_free_blocks = 0;
1583    sb->num_free_blocks = 0;
1584    atomic_store_uint64_t(&sb->cur_alloc_bid, BLK_NOT_FOUND);
1585    atomic_init_uint64_t(&sb->last_hdr_bid, BLK_NOT_FOUND);
1586    sb->min_live_hdr_revnum = 0;
1587    sb->min_live_hdr_bid = BLK_NOT_FOUND;
1588    atomic_init_uint64_t(&sb->last_hdr_revnum, 0);
1589    sb->num_alloc = 0;
1590    sb->rsv_bmp = NULL;
1591    avl_init(&sb->bmp_idx, NULL);
1592    spin_init(&sb->lock);
1593}
1594
1595INLINE void _sb_copy(struct superblock *dst, struct superblock *src)
1596{
1597    // since variables in 'src' won't be freed, just copy its pointers
1598    dst->config = src->config;
1599    atomic_store_uint64_t(&dst->revnum, atomic_get_uint64_t(&src->revnum));
1600    atomic_store_uint64_t(&dst->bmp_revnum, atomic_get_uint64_t(&src->bmp_revnum));
1601    atomic_store_uint64_t(&dst->bmp_size, atomic_get_uint64_t(&src->bmp_size));
1602    dst->bmp.store(src->bmp.load(std::memory_order_relaxed), std::memory_order_relaxed);
1603    atomic_store_uint64_t(&dst->bmp_rcount, atomic_get_uint64_t(&src->bmp_rcount));
1604    atomic_store_uint64_t(&dst->bmp_wcount, atomic_get_uint64_t(&src->bmp_wcount));
1605    spin_init(&dst->bmp_lock);
1606    dst->bmp_prev_size = src->bmp_prev_size;
1607    dst->bmp_prev = src->bmp_prev;
1608    dst->bmp_idx = src->bmp_idx;
1609    dst->bmp_doc_offset = src->bmp_doc_offset;
1610    dst->bmp_docs = src->bmp_docs;
1611    dst->num_bmp_docs = src->num_bmp_docs;
1612    dst->num_init_free_blocks = src->num_init_free_blocks;
1613    dst->num_free_blocks = src->num_free_blocks;
1614    dst->rsv_bmp = src->rsv_bmp;
1615    atomic_store_uint64_t(&dst->cur_alloc_bid, atomic_get_uint64_t(&src->cur_alloc_bid));
1616    atomic_store_uint64_t(&dst->last_hdr_bid, atomic_get_uint64_t(&src->last_hdr_bid));
1617    dst->min_live_hdr_revnum = src->min_live_hdr_revnum;
1618    dst->min_live_hdr_bid = src->min_live_hdr_bid;
1619    atomic_store_uint64_t(&dst->last_hdr_revnum, atomic_get_uint64_t(&src->last_hdr_revnum));
1620    dst->num_alloc = src->num_alloc;
1621    spin_init(&dst->lock);
1622}
1623
1624fdb_status sb_read_latest(struct filemgr *file,
1625                          struct sb_config sconfig,
1626                          err_log_callback *log_callback)
1627{
1628    size_t i, max_sb_no = sconfig.num_sb;
1629    uint64_t max_revnum = 0;
1630    uint64_t revnum_limit = (uint64_t)0xffffffffffffffff;
1631    fdb_status fs;
1632    struct superblock *sb_arr;
1633
1634    if (file->sb) {
1635        // Superblock is already read previously.
1636        // This means that there are some problems with the current superblock
1637        // so that we have to read another candidate.
1638
1639        // Note: 'sb->revnum' denotes the revnum of next superblock to be
1640        // written, so we need to subtract 1 from it to get the revnum of
1641        // the current superblock successfully read from the file.
1642        revnum_limit = atomic_get_uint64_t(&file->sb->revnum) - 1;
1643        sb_free(file);
1644    }
1645
1646    sb_arr = alca(struct superblock,
1647                  sconfig.num_sb * sizeof(struct superblock));
1648    memset(sb_arr, 0x0, sconfig.num_sb * sizeof(struct superblock));
1649
1650    // read all superblocks
1651    for (i=0; i<sconfig.num_sb; ++i) {
1652        sb_arr[i].config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1653        _sb_init(&sb_arr[i], sconfig);
1654        fs = _sb_read_given_no(file, i, &sb_arr[i], log_callback);
1655
1656        uint64_t cur_revnum = atomic_get_uint64_t(&sb_arr[i].revnum);
1657        if (fs == FDB_RESULT_SUCCESS &&
1658            cur_revnum >= max_revnum &&
1659            cur_revnum < revnum_limit) {
1660            max_sb_no = i;
1661            max_revnum = cur_revnum;
1662        }
1663    }
1664
1665    if (max_sb_no == sconfig.num_sb) {
1666        // all superblocks are broken
1667        fs = FDB_RESULT_SB_READ_FAIL;
1668        for (i=0; i<sconfig.num_sb; ++i) {
1669            _sb_free(&sb_arr[i]);
1670        }
1671        // no readable superblock
1672        // (if not corrupted, it may be a normal old version file)
1673        return fs;
1674    }
1675
1676    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1677    _sb_copy(file->sb, &sb_arr[max_sb_no]);
1678
1679    // set last commit position
1680    if (atomic_get_uint64_t(&file->sb->cur_alloc_bid) != BLK_NOT_FOUND) {
1681        atomic_store_uint64_t(&file->last_commit,
1682                              atomic_get_uint64_t(&file->sb->cur_alloc_bid) *
1683                              file->config->blocksize);
1684    } else {
1685        // otherwise, last_commit == file->pos
1686        // (already set by filemgr_open() function)
1687    }
1688
1689    atomic_incr_uint64_t(&file->sb->revnum);
1690    avl_init(&file->sb->bmp_idx, NULL);
1691
1692    // free the other superblocks
1693    for (i=0; i<sconfig.num_sb; ++i) {
1694        if (i != max_sb_no) {
1695            _sb_free(&sb_arr[i]);
1696        }
1697    }
1698
1699    return FDB_RESULT_SUCCESS;
1700}
1701
1702uint64_t sb_get_bmp_revnum(struct filemgr *file)
1703{
1704    if (file->sb) {
1705        return atomic_get_uint64_t(&file->sb->bmp_revnum);
1706    } else {
1707        return 0;
1708    }
1709}
1710
1711filemgr_header_revnum_t sb_get_min_live_revnum(struct filemgr *file)
1712{
1713    if (file->sb) {
1714        return file->sb->min_live_hdr_revnum;
1715    } else {
1716        return 0;
1717    }
1718}
1719
1720uint64_t sb_get_num_free_blocks(struct filemgr *file)
1721{
1722    if (file->sb) {
1723        return file->sb->num_free_blocks;
1724    } else {
1725        return 0;
1726    }
1727}
1728
1729struct sb_config sb_get_default_config()
1730{
1731    struct sb_config ret;
1732    ret.num_sb = SB_DEFAULT_NUM_SUPERBLOCKS;
1733    return ret;
1734}
1735
1736fdb_status sb_init(struct filemgr *file, struct sb_config sconfig,
1737                   err_log_callback * log_callback)
1738{
1739    size_t i;
1740    bid_t sb_bid;
1741    fdb_status fs;
1742
1743    // exit if superblock already exists.
1744    if (file->sb) {
1745        return FDB_RESULT_SUCCESS;
1746    }
1747    // no data should be written in the file before initialization of superblock.
1748    if (filemgr_get_pos(file) > 0) {
1749        return FDB_RESULT_SB_INIT_FAIL;
1750    }
1751
1752    file->sb = (struct superblock*)calloc(1, sizeof(struct superblock));
1753    file->sb->config = (struct sb_config*)calloc(1, sizeof(struct sb_config));
1754    file->version = ver_get_latest_magic();
1755    _sb_init(file->sb, sconfig);
1756
1757    // write initial superblocks
1758    for (i=0; i<file->sb->config->num_sb; ++i) {
1759        // allocate
1760        sb_bid = filemgr_alloc(file, log_callback);
1761        if (sb_bid != i) {
1762            // other data was written during sb_write .. error
1763            fs = FDB_RESULT_SB_RACE_CONDITION;
1764            fdb_log(log_callback, fs,
1765                    "Other writer interfered during sb_write (number: %" _F64 ")",
1766                    i);
1767            free(file->sb->config);
1768            free(file->sb);
1769            return fs;
1770        }
1771
1772        fs = sb_write(file, i, log_callback);
1773        if (fs != FDB_RESULT_SUCCESS) {
1774            return fs;
1775        }
1776    }
1777
1778    return FDB_RESULT_SUCCESS;
1779}
1780
1781fdb_status sb_free(struct filemgr *file)
1782{
1783    if (file->sb) {
1784        _sb_free(file->sb);
1785        free(file->sb);
1786        file->sb = NULL;
1787    }
1788
1789    return FDB_RESULT_SUCCESS;
1790}
1791
1792
1793