xref: /6.0.3/forestdb/src/docio.cc (revision 3f8ea205)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21
22#include "docio.h"
23#include "wal.h"
24#include "fdb_internal.h"
25#include "version.h"
26#ifdef _DOC_COMP
27#include "snappy-c.h"
28#endif
29
30#include "memleak.h"
31
32void docio_init(struct docio_handle *handle,
33                struct filemgr *file,
34                bool compress_document_body)
35{
36    handle->file = file;
37    handle->curblock = BLK_NOT_FOUND;
38    handle->curpos = 0;
39    handle->cur_bmp_revnum_hash = 0;
40    handle->lastbid = BLK_NOT_FOUND;
41    handle->compress_document_body = compress_document_body;
42    malloc_align(handle->readbuffer, FDB_SECTOR_SIZE, file->blocksize);
43}
44
45void docio_free(struct docio_handle *handle)
46{
47    free_align(handle->readbuffer);
48}
49
// _add_blk_marker(file, bid, blocksize, marker, log_callback)
//
// With __CRC32: writes the BLK_MARKER_SIZE bytes at 'marker' to byte offset
// 'blocksize' of block 'bid' via filemgr_write_offset() (final-write flag
// false) and evaluates to that call's status.
// Without __CRC32: evaluates to FDB_RESULT_SUCCESS and writes nothing; the
// arguments are not evaluated.
#ifndef __CRC32
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    FDB_RESULT_SUCCESS
#else
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    filemgr_write_offset((file), (bid), (blocksize), BLK_MARKER_SIZE,\
                         (marker), (false), (log_callback))
#endif
58
59INLINE fdb_status _docio_fill_zero(struct docio_handle *handle, bid_t bid,
60                                   size_t pos)
61{
62    // Fill next few bytes (sizeof(struct docio_length)) with zero
63    // to avoid false positive docio_length checksum during file scanning.
64    // (Note that the checksum value of zero-filled docio_length is 0x6F.)
65
66    size_t blocksize = handle->file->blocksize;
67    size_t len_size = sizeof(struct docio_length);
68    uint8_t *zerobuf = alca(uint8_t, len_size);
69
70#ifdef __CRC32
71    if (ver_non_consecutive_doc(handle->file->version)) {
72        // new version: support non-consecutive document block
73        blocksize -= DOCBLK_META_SIZE;
74    } else {
75        // old version: block marker only
76        blocksize -= BLK_MARKER_SIZE;
77    }
78#endif
79
80    if (pos + len_size <= blocksize) {
81        // enough space in the block
82        memset(zerobuf, 0x0, len_size);
83        return filemgr_write_offset(handle->file, bid, pos, len_size,
84                                    zerobuf, false, handle->log_callback);
85    } else {
86        // lack of space .. we don't need to fill zero bytes.
87        return FDB_RESULT_SUCCESS;
88    }
89}
90
91bid_t docio_append_doc_raw(struct docio_handle *handle, uint64_t size, void *buf)
92{
93    uint32_t offset;
94    uint8_t marker[BLK_MARKER_SIZE];
95    size_t blocksize = handle->file->blocksize;
96    size_t real_blocksize = blocksize;
97    size_t remaining_space;
98    err_log_callback *log_callback = handle->log_callback;
99    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
100    struct docblk_meta blk_meta;
101
102    memset(&blk_meta, 0x0, sizeof(blk_meta));
103    blk_meta.marker = BLK_MARKER_DOC;
104    (void)blk_meta;
105
106#ifdef __CRC32
107    if (non_consecutive) {
108        // new version: support non-consecutive document block
109        blocksize -= DOCBLK_META_SIZE;
110    } else {
111        // old version: block marker only
112        blocksize -= BLK_MARKER_SIZE;
113    }
114    memset(marker, BLK_MARKER_DOC, BLK_MARKER_SIZE);
115#endif
116
117    if (handle->curblock == BLK_NOT_FOUND) {
118        // allocate new block
119        handle->cur_bmp_revnum_hash =
120            filemgr_get_sb_bmp_revnum(handle->file) & 0xff;
121        handle->curblock = filemgr_alloc(handle->file, log_callback);
122        handle->curpos = 0;
123    }
124    if (!filemgr_is_writable(handle->file, handle->curblock)) {
125        // mark remaining space in old block as stale
126        if (handle->curpos < real_blocksize) {
127            // this function will calculate block marker size automatically.
128            filemgr_mark_stale(handle->file,
129                               real_blocksize * handle->curblock + handle->curpos,
130                               blocksize - handle->curpos);
131        }
132        // allocate new block
133        handle->cur_bmp_revnum_hash =
134            filemgr_get_sb_bmp_revnum(handle->file) & 0xff;
135        handle->curblock = filemgr_alloc(handle->file, log_callback);
136        handle->curpos = 0;
137    }
138    blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
139
140    remaining_space = blocksize - handle->curpos;
141    if (size <= remaining_space) {
142        fdb_status fs = FDB_RESULT_SUCCESS;
143        // simply append to current block
144        offset = handle->curpos;
145
146        if (non_consecutive) {
147            // set next BID
148            blk_meta.next_bid = BLK_NOT_FOUND;
149            // write meta
150            fs = filemgr_write_offset(handle->file, handle->curblock,
151                                      blocksize, sizeof(blk_meta), &blk_meta,
152                                      false, log_callback);
153        } else {
154            fs = _add_blk_marker(handle->file, handle->curblock, blocksize, marker,
155                                 log_callback);
156        }
157
158
159        if (fs != FDB_RESULT_SUCCESS) {
160            fdb_log(log_callback, fs,
161                    "Error in appending a doc block marker for a block id %" _F64
162                    " into a database file '%s'", handle->curblock,
163                    handle->file->filename);
164            return BLK_NOT_FOUND;
165        }
166        fs = filemgr_write_offset(handle->file, handle->curblock, offset, size,
167                                  buf, (size == remaining_space), log_callback);
168        if (fs != FDB_RESULT_SUCCESS) {
169            fdb_log(log_callback, fs,
170                    "Error in writing a doc block with id %" _F64 ", offset %d, size %"
171                    _F64 " to a database file '%s'", handle->curblock, offset, size,
172                    handle->file->filename);
173            return BLK_NOT_FOUND;
174        }
175        handle->curpos += size;
176
177        if (_docio_fill_zero(handle, handle->curblock, handle->curpos) !=
178            FDB_RESULT_SUCCESS) {
179            return BLK_NOT_FOUND;
180        }
181
182        return handle->curblock * real_blocksize + offset;
183
184    } else { // insufficient space to fit entire document into current block
185        bid_t begin, end, i, startpos;
186        bid_t *block_list, block_list_size = 0;
187        uint16_t *bmp_revnum_list;
188        uint32_t nblock = size / blocksize;
189        uint32_t remain = size % blocksize;
190        uint64_t remainsize = size;
191        fdb_status fs = FDB_RESULT_SUCCESS;
192
193        // as blocks may not be consecutive, we need to maintain
194        // the list of BIDs.
195        block_list = (bid_t *)alca(bid_t, nblock+1);
196        bmp_revnum_list = (uint16_t *)alca(uint16_t, nblock+1);
197
198#ifdef DOCIO_BLOCK_ALIGN
199        offset = blocksize - handle->curpos;
200        if (remain <= blocksize - handle->curpos &&
201            filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
202                                        nblock + ((remain>offset)?1:0), &begin, &end,
203                                        log_callback) == handle->curblock+1) {
204
205            // start from current block
206            if (begin != (handle->curblock + 1)) {
207                fdb_log(log_callback, fs,
208                        "Error in allocating blocks starting from block id %" _F64
209                        " in a database file '%s'", handle->curblock + 1,
210                        handle->file->filename);
211                return BLK_NOT_FOUND;
212            }
213
214            fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
215                                 marker, log_callback);
216            if (fs != FDB_RESULT_SUCCESS) {
217                fdb_log(log_callback, fs,
218                        "Error in appending a doc block marker for a block id %" _F64
219                        " into a database file '%s'", handle->curblock,
220                        handle->file->filename);
221                return BLK_NOT_FOUND;
222            }
223            if (offset > 0) {
224                fs = filemgr_write_offset(handle->file, handle->curblock,
225                                          handle->curpos, offset, buf,
226                                          true, // mark block as immutable
227                                          log_callback);
228                if (fs != FDB_RESULT_SUCCESS) {
229                    fdb_log(log_callback, fs,
230                            "Error in writing a doc block with id %" _F64 ", offset %d, "
231                            "size %" _F64 " to a database file '%s'", handle->curblock,
232                            offset, size, handle->file->filename);
233                    return BLK_NOT_FOUND;
234                }
235            }
236            remainsize -= offset;
237
238            startpos = handle->curblock * real_blocksize + handle->curpos;
239        } else {
240            // next block to be allocated is not continuous .. allocate new multiple blocks
241            filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
242                                   &begin, &end, log_callback);
243            offset = 0;
244
245            startpos = begin * real_blocksize;
246        }
247
248#else
249        // Simple append mode
250        // The given doc is appended at the byte offset right next the last doc.
251        // Note that block allocation can be non-consecutive.
252        offset = blocksize - handle->curpos;
253
254        if (non_consecutive) {
255            // new version: support non-consecutive allocation
256
257            bool new_block = false;
258            bool start_from_new_block = false;
259
260            if (remain > offset) {
261                // if the current block cannot accommodate the remaining length
262                // of the document, allocate an additional block.
263                new_block = true;
264            }
265
266            block_list_size = nblock + ((new_block)?1:0);
267            for (i=0; i<block_list_size; ++i) {
268                bmp_revnum_list[i] = filemgr_get_sb_bmp_revnum(handle->file) & 0xff;
269                block_list[i] = filemgr_alloc(handle->file, log_callback);
270
271                if (i == 0 && handle->curblock != BLK_NOT_FOUND &&
272                    block_list[i] > handle->curblock+1) {
273                    // if the first new allocated block is not consecutive
274                    // from the current block, start writing document from
275                    // the new block.
276                    start_from_new_block = true;
277                    // since we won't write into the current block,
278                    // allocate one more block if necessary.
279                    if (remain && !new_block) {
280                        new_block = true;
281                        block_list_size++;
282                    }
283                }
284            }
285
286            if (offset > 0 && !start_from_new_block) {
287                // start from the current block
288
289                // set next BID
290                blk_meta.next_bid = _endian_encode(block_list[0]);
291                // write meta
292                fs = filemgr_write_offset(handle->file, handle->curblock,
293                                          blocksize, sizeof(blk_meta), &blk_meta,
294                                          false, log_callback);
295                if (fs != FDB_RESULT_SUCCESS) {
296                    fdb_log(log_callback, fs,
297                            "Error in appending a doc block metadata for a block id %" _F64
298                            " into a database file '%s'", handle->curblock,
299                            handle->file->filename);
300                    return BLK_NOT_FOUND;
301                }
302
303                // write the front part of the doc
304                if (offset > 0) {
305                    fs = filemgr_write_offset(handle->file, handle->curblock,
306                                              handle->curpos, offset, buf,
307                                              true, // mark block as immutable
308                                              log_callback);
309                    if (fs != FDB_RESULT_SUCCESS) {
310                        fdb_log(log_callback, fs,
311                                "Error in writing a doc block with id %" _F64 ", offset %d, "
312                                "size %" _F64 " to a database file '%s'", handle->curblock,
313                                offset, size, handle->file->filename);
314                        return BLK_NOT_FOUND;
315                    }
316                }
317                remainsize -= offset;
318
319                startpos = handle->curblock * real_blocksize + handle->curpos;
320            } else {
321                // mark remaining space in the current block as stale
322                if (handle->curblock != BLK_NOT_FOUND &&
323                    handle->curpos < real_blocksize) {
324                    filemgr_mark_stale(handle->file,
325                                       real_blocksize * handle->curblock + handle->curpos,
326                                       blocksize - handle->curpos);
327                }
328                offset = 0;
329                startpos = block_list[0] * real_blocksize;
330            }
331
332        } else {
333            // old version: consecutive allocation only
334
335            if (filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
336                                            nblock + ((remain>offset)?1:0), &begin, &end,
337                                            log_callback) == handle->curblock+1) {
338                // start from current block
339                if (begin != (handle->curblock + 1)) {
340                    fdb_log(log_callback, fs,
341                            "Error in allocating blocks starting from block id %" _F64
342                            " in a database file '%s'", handle->curblock + 1,
343                            handle->file->filename);
344                    return BLK_NOT_FOUND;
345                }
346
347                fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
348                                     marker, log_callback);
349                if (fs != FDB_RESULT_SUCCESS) {
350                    fdb_log(log_callback, fs,
351                            "Error in appending a doc block marker for a block id %" _F64
352                            " into a database file '%s'", handle->curblock,
353                            handle->file->filename);
354                    return BLK_NOT_FOUND;
355                }
356                if (offset > 0) {
357                    fs = filemgr_write_offset(handle->file, handle->curblock,
358                                              handle->curpos, offset, buf,
359                                              true, // mark block as immutable
360                                              log_callback);
361                    if (fs != FDB_RESULT_SUCCESS) {
362                        fdb_log(log_callback, fs,
363                                "Error in writing a doc block with id %" _F64 ", offset %d, "
364                                "size %" _F64 " to a database file '%s'", handle->curblock,
365                                offset, size, handle->file->filename);
366                        return BLK_NOT_FOUND;
367                    }
368                }
369                remainsize -= offset;
370
371                startpos = handle->curblock * real_blocksize + handle->curpos;
372            } else {
373                // next block to be allocated is not continuous
374                // mark remaining space in the old block as stale
375                if (handle->curblock != BLK_NOT_FOUND &&
376                    handle->curpos < real_blocksize) {
377                    filemgr_mark_stale(handle->file,
378                                       real_blocksize * handle->curblock + handle->curpos,
379                                       blocksize - handle->curpos);
380                }
381                // allocate new multiple blocks
382                filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
383                                       &begin, &end, log_callback);
384                offset = 0;
385
386                startpos = begin * real_blocksize;
387            }
388
389            block_list_size = end - begin + 1;
390            for (i=0; i<block_list_size; ++i) {
391                block_list[i] = begin+i;
392            }
393
394        } // if (non_consecutive)
395
396#endif
397
398        for (i=0; i<block_list_size; ++i) {
399            handle->curblock = block_list[i];
400            handle->cur_bmp_revnum_hash = bmp_revnum_list[i];
401            blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
402
403            if (non_consecutive) {
404                if (i < block_list_size - 1) {
405                    blk_meta.next_bid = _endian_encode(block_list[i+1]);
406                } else {
407                    // the last block .. set next BID '0xffff...'
408                    memset(&blk_meta.next_bid, 0xff, sizeof(blk_meta.next_bid));
409                }
410            }
411
412            // write meta (new) or block marker (old)
413            if (non_consecutive) {
414                fs = filemgr_write_offset(handle->file, handle->curblock,
415                                          blocksize, sizeof(blk_meta), &blk_meta,
416                                          false, log_callback);
417            } else {
418                fs = _add_blk_marker(handle->file, block_list[i], blocksize, marker,
419                                     log_callback);
420            }
421            if (fs != FDB_RESULT_SUCCESS) {
422                fdb_log(log_callback, fs,
423                        "Error in appending a doc block marker for a block "
424                        "id %" _F64 " into a database file '%s'", block_list[i],
425                        handle->file->filename);
426                return BLK_NOT_FOUND;
427            }
428
429            if (remainsize >= blocksize) {
430                // write entire block
431
432                fs = filemgr_write_offset(handle->file, block_list[i], 0, blocksize,
433                                          (uint8_t *)buf + offset,
434                                          true, // mark block as immutable
435                                          log_callback);
436                if (fs != FDB_RESULT_SUCCESS) {
437                    fdb_log(log_callback, fs,
438                            "Error in writing an entire doc block with id %" _F64
439                            ", size %" _F64 " to a database file '%s'", block_list[i], blocksize,
440                            handle->file->filename);
441                    return BLK_NOT_FOUND;
442                }
443                offset += blocksize;
444                remainsize -= blocksize;
445                handle->curpos = blocksize;
446
447            } else {
448                // write rest of document
449                fdb_assert(i==block_list_size-1, i, block_list_size-1);
450
451                fs = filemgr_write_offset(handle->file, block_list[i], 0, remainsize,
452                                          (uint8_t *)buf + offset,
453                                          (remainsize == blocksize),
454                                          log_callback);
455                if (fs != FDB_RESULT_SUCCESS) {
456                    fdb_log(log_callback, fs,
457                            "Error in writing a doc block with id %" _F64 ", "
458                            "size %" _F64 " to a database file '%s'", block_list[i], remainsize,
459                            handle->file->filename);
460                    return BLK_NOT_FOUND;
461                }
462                offset += remainsize;
463                handle->curpos = remainsize;
464
465                if (_docio_fill_zero(handle, block_list[i], handle->curpos) !=
466                    FDB_RESULT_SUCCESS) {
467                    return BLK_NOT_FOUND;
468                }
469            }
470        }
471
472        return startpos;
473    }
474
475    return 0;
476}
477
// Conversion of the multi-byte fields of a docio_length (keylen, metalen,
// bodylen, bodylen_ondisk) between in-memory and stored byte order.
// All remaining members are copied unchanged. Both helpers take and return
// the struct by value.
#ifdef __ENDIAN_SAFE
INLINE struct docio_length _docio_length_encode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_encode(length.keylen);
    ret.metalen = _endian_encode(length.metalen);
    ret.bodylen = _endian_encode(length.bodylen);
    ret.bodylen_ondisk = _endian_encode(length.bodylen_ondisk);
    return ret;
}
INLINE struct docio_length _docio_length_decode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_decode(length.keylen);
    ret.metalen = _endian_decode(length.metalen);
    ret.bodylen = _endian_decode(length.bodylen);
    ret.bodylen_ondisk = _endian_decode(length.bodylen_ondisk);
    return ret;
}
#else
// No conversion needed. The macros must still evaluate to their argument,
// because callers use them as expressions
// (e.g. "_length = _docio_length_encode(length);").
#define _docio_length_encode(a) (a)
#define _docio_length_decode(a) (a)
#endif
503
504INLINE uint8_t _docio_length_checksum(struct docio_length length, struct docio_handle* handle)
505{
506    return uint8_t(get_checksum(reinterpret_cast<const uint8_t*>(&length),
507                                sizeof(keylen_t) + sizeof(uint16_t) + sizeof(uint32_t)*2,
508                                handle->file->crc_mode) & 0xff);
509}
510
511INLINE bid_t _docio_append_doc(struct docio_handle *handle, struct docio_object *doc)
512{
513    size_t _len;
514    uint32_t offset = 0;
515    uint32_t crc;
516    uint64_t docsize;
517    void *buf = NULL;
518    bid_t ret_offset;
519    fdb_seqnum_t _seqnum;
520    timestamp_t _timestamp;
521    struct docio_length length, _length;
522    err_log_callback *log_callback = handle->log_callback;
523
524    length = doc->length;
525    length.bodylen_ondisk = length.bodylen;
526
527#ifdef _DOC_COMP
528    int ret;
529    void *compbuf = NULL;
530    uint32_t compbuf_len = 0;
531    if (doc->length.bodylen > 0 && handle->compress_document_body) {
532        compbuf_len = snappy_max_compressed_length(length.bodylen);
533        compbuf = (void *)malloc(compbuf_len);
534
535        _len = compbuf_len;
536        ret = snappy_compress((char*)doc->body, length.bodylen, (char*)compbuf, &_len);
537        if (ret < 0) { // LCOV_EXCL_START
538            fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
539                    "Error in compressing the doc body of key '%s' from "
540                    "a database file '%s'",
541                    (char *) doc->key, handle->file->filename);
542            free(compbuf);
543            // we use BLK_NOT_FOUND for error code of appending instead of 0
544            // because document can be written at the byte offset 0
545            return BLK_NOT_FOUND;
546        } // LCOV_EXCL_STOP
547
548        length.bodylen_ondisk = compbuf_len = _len;
549        length.flag |= DOCIO_COMPRESSED;
550
551        docsize = sizeof(struct docio_length) + length.keylen + length.metalen;
552        docsize += compbuf_len;
553    } else {
554        docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
555        compbuf_len = length.bodylen;
556    }
557#else
558    docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
559#endif
560    docsize += sizeof(timestamp_t);
561
562    docsize += sizeof(fdb_seqnum_t);
563
564#ifdef __CRC32
565    docsize += sizeof(crc);
566#endif
567
568    doc->length = length;
569    buf = (void *)malloc(docsize);
570
571    _length = _docio_length_encode(length);
572
573    // calculate checksum of LENGTH using crc
574    _length.checksum = _docio_length_checksum(_length, handle);
575
576    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
577    offset += sizeof(struct docio_length);
578
579    // copy key
580    memcpy((uint8_t *)buf + offset, doc->key, length.keylen);
581    offset += length.keylen;
582
583    // copy timestamp
584    _timestamp = _endian_encode(doc->timestamp);
585    memcpy((uint8_t*)buf + offset, &_timestamp, sizeof(_timestamp));
586    offset += sizeof(_timestamp);
587
588    // copy seqeunce number (optional)
589    _seqnum = _endian_encode(doc->seqnum);
590    memcpy((uint8_t *)buf + offset, &_seqnum, sizeof(fdb_seqnum_t));
591    offset += sizeof(fdb_seqnum_t);
592
593    // copy metadata (optional)
594    if (length.metalen > 0) {
595        memcpy((uint8_t *)buf + offset, doc->meta, length.metalen);
596        offset += length.metalen;
597    }
598
599    // copy body (optional)
600    if (length.bodylen > 0) {
601#ifdef _DOC_COMP
602        if (length.flag & DOCIO_COMPRESSED) {
603            // compressed body
604            if (compbuf) {
605                memcpy((uint8_t*)buf + offset, compbuf, compbuf_len);
606                offset += compbuf_len;
607                free(compbuf);
608            }
609        } else {
610            memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
611            offset += length.bodylen;
612        }
613#else
614        memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
615        offset += length.bodylen;
616#endif
617    }
618
619#ifdef __CRC32
620    crc = get_checksum(reinterpret_cast<const uint8_t*>(buf),
621                       docsize - sizeof(crc),
622                       handle->file->crc_mode);
623    memcpy((uint8_t *)buf + offset, &crc, sizeof(crc));
624#endif
625
626    ret_offset = docio_append_doc_raw(handle, docsize, buf);
627    free(buf);
628
629    return ret_offset;
630}
631
632bid_t docio_append_commit_mark(struct docio_handle *handle, uint64_t doc_offset)
633{
634    // Note: should adapt DOCIO_COMMIT_MARK_SIZE if this function is modified.
635    uint32_t offset = 0;
636    uint64_t docsize;
637    uint64_t _doc_offset;
638    void *buf;
639    bid_t ret_offset;
640    struct docio_length length, _length;
641
642    memset(&length, 0, sizeof(struct docio_length));
643    length.flag = DOCIO_TXN_COMMITTED;
644
645    docsize = sizeof(struct docio_length) + sizeof(doc_offset);
646    buf = (void *)malloc(docsize);
647
648    _length = _docio_length_encode(length);
649
650    // calculate checksum of LENGTH using crc
651    _length.checksum = _docio_length_checksum(_length, handle);
652
653    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
654    offset += sizeof(struct docio_length);
655
656    // copy doc_offset
657    _doc_offset = _endian_encode(doc_offset);
658    memcpy((uint8_t *)buf + offset, &_doc_offset, sizeof(_doc_offset));
659
660    ret_offset = docio_append_doc_raw(handle, docsize, buf);
661    free(buf);
662
663    return ret_offset;
664}
665
666bid_t docio_append_doc(struct docio_handle *handle, struct docio_object *doc,
667                       uint8_t deleted, uint8_t txn_enabled)
668{
669    doc->length.flag = DOCIO_NORMAL;
670    if (deleted) {
671        doc->length.flag |= DOCIO_DELETED;
672    }
673    if (txn_enabled) {
674        doc->length.flag |= DOCIO_TXN_DIRTY;
675    }
676    return _docio_append_doc(handle, doc);
677}
678
679bid_t docio_append_doc_system(struct docio_handle *handle, struct docio_object *doc)
680{
681    doc->length.flag = DOCIO_NORMAL | DOCIO_SYSTEM;
682    return _docio_append_doc(handle, doc);
683}
684
685INLINE fdb_status _docio_read_through_buffer(struct docio_handle *handle,
686                                             bid_t bid,
687                                             err_log_callback *log_callback,
688                                             bool read_on_cache_miss)
689{
690    fdb_status status = FDB_RESULT_SUCCESS;
691    // to reduce the overhead from memcpy the same block
692    if (handle->lastbid != bid) {
693        status = filemgr_read(handle->file, bid, handle->readbuffer,
694                              log_callback, read_on_cache_miss);
695        if (status != FDB_RESULT_SUCCESS) {
696            if (read_on_cache_miss) {
697                fdb_log(log_callback, status,
698                        "Error in reading a doc block with id %" _F64 " from "
699                        "a database file '%s'", bid, handle->file->filename);
700            }
701            // we must reset 'lastbid' here because now 'readbuffer'
702            // may contain other data unrelated to 'lastbid'.
703            handle->lastbid = BLK_NOT_FOUND;
704            return status;
705        }
706
707        if (filemgr_is_writable(handle->file, bid)) {
708            // this block can be modified later .. must be re-read
709            handle->lastbid = BLK_NOT_FOUND;
710        } else {
711            handle->lastbid = bid;
712        }
713    }
714
715    return status;
716}
717
718INLINE bool _docio_check_buffer(struct docio_handle *handle, uint64_t bmp_revnum)
719{
720    size_t blocksize = handle->file->blocksize;
721    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
722    struct docblk_meta blk_meta;
723
724    if (non_consecutive) {
725        // new version: support non-consecutive document block
726        blocksize -= DOCBLK_META_SIZE;
727        memcpy(&blk_meta, (uint8_t*)handle->readbuffer + blocksize, sizeof(blk_meta));
728    } else {
729        // old version: block marker only
730        blocksize -= BLK_MARKER_SIZE;
731        memcpy(&blk_meta.marker, (uint8_t*)handle->readbuffer + blocksize,
732               sizeof(blk_meta.marker));
733    }
734
735    if (blk_meta.marker != BLK_MARKER_DOC) {
736        return false;
737    }
738
739    if (non_consecutive && bmp_revnum != (uint64_t)-1) {
740        uint16_t revnum_hash = _endian_decode(blk_meta.sb_bmp_revnum_hash);
741        if (revnum_hash == (bmp_revnum & 0xff)) {
742            return true;
743        } else {
744            return false;
745        }
746    }
747    return true;
748}
749
750static int64_t _docio_read_length(struct docio_handle *handle,
751                                  uint64_t offset,
752                                  struct docio_length *length,
753                                  err_log_callback *log_callback,
754                                  bool read_on_cache_miss)
755{
756    size_t blocksize = handle->file->blocksize;
757    size_t real_blocksize = blocksize;
758    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
759    struct docblk_meta blk_meta;
760#ifdef __CRC32
761    if (non_consecutive) {
762        // new version: support non-consecutive document block
763        blocksize -= DOCBLK_META_SIZE;
764    } else {
765        // old version: block marker only
766        blocksize -= BLK_MARKER_SIZE;
767    }
768#endif
769
770    bid_t bid = offset / real_blocksize;
771    uint32_t pos = offset % real_blocksize;
772    void *buf = handle->readbuffer;
773    uint32_t restsize;
774
775    restsize = blocksize - pos;
776    // read length structure
777    fdb_status fs = _docio_read_through_buffer(handle, bid, log_callback,
778                                               read_on_cache_miss);
779    if (fs != FDB_RESULT_SUCCESS) {
780        if (read_on_cache_miss) {
781            fdb_log(log_callback, fs,
782                    "Error in reading a doc length from offset %" _F64
783                    " in block id %" _F64
784                    " from a database file '%s'", offset, bid,
785                    handle->file->filename);
786        }
787        return (int64_t) fs;
788    }
789    if (!_docio_check_buffer(handle, (uint64_t)-1)) {
790        return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
791    }
792
793    if (restsize >= sizeof(struct docio_length)) {
794        memcpy(length, (uint8_t *)buf + pos, sizeof(struct docio_length));
795        pos += sizeof(struct docio_length);
796
797    } else {
798        memcpy(length, (uint8_t *)buf + pos, restsize);
799        // read additional block
800        if (non_consecutive) {
801            memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
802            bid = _endian_decode(blk_meta.next_bid);
803            if (bid == BLK_NOT_FOUND) {
804                // Reached the last block. Simply return the offset that is passed to
805                // this function.
806                memset(length, 0x0, sizeof(struct docio_length));
807                return offset;
808            }
809        } else {
810            bid++;
811        }
812
813        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
814        if (fs != FDB_RESULT_SUCCESS) {
815            fdb_log(log_callback, fs,
816                    "Error in reading a doc length from an additional block "
817                    "offset %" _F64 " in block id %" _F64
818                    " from a database file '%s'", offset,
819                    bid, handle->file->filename);
820            return (int64_t) fs;
821        }
822        if (!_docio_check_buffer(handle, (uint64_t)-1)) {
823            return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
824        }
825        // memcpy rest of data
826        memcpy((uint8_t *)length + restsize, buf,
827               sizeof(struct docio_length) - restsize);
828        pos = sizeof(struct docio_length) - restsize;
829    }
830
831    return bid * real_blocksize + pos;
832}
833
834static int64_t _docio_read_doc_component(struct docio_handle *handle,
835                                         uint64_t offset,
836                                         uint32_t len,
837                                         void *buf_out,
838                                         err_log_callback *log_callback)
839{
840    uint32_t rest_len;
841    size_t blocksize = handle->file->blocksize;
842    size_t real_blocksize = blocksize;
843    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
844    struct docblk_meta blk_meta;
845#ifdef __CRC32
846    if (non_consecutive) {
847        // new version: support non-consecutive document block
848        blocksize -= DOCBLK_META_SIZE;
849    } else {
850        // old version: block marker only
851        blocksize -= BLK_MARKER_SIZE;
852    }
853#endif
854
855    bid_t bid = offset / real_blocksize;
856    uint32_t pos = offset % real_blocksize;
857    //uint8_t buf[handle->file->blocksize];
858    void *buf = handle->readbuffer;
859    uint32_t restsize;
860    fdb_status fs = FDB_RESULT_SUCCESS;
861
862    rest_len = len;
863
864    while(rest_len > 0) {
865        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
866        if (fs != FDB_RESULT_SUCCESS) {
867            fdb_log(log_callback, fs,
868                    "Error in reading a doc block with block id %" _F64 " from "
869                    "a database file '%s'", bid, handle->file->filename);
870            return (int64_t)fs;
871        }
872        restsize = blocksize - pos;
873
874        if (restsize >= rest_len) {
875            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, rest_len);
876            pos += rest_len;
877            rest_len = 0;
878        }else{
879            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, restsize);
880
881            if (non_consecutive) {
882                memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
883                bid = _endian_decode(blk_meta.next_bid);
884            } else {
885                bid++;
886            }
887
888            pos = 0;
889            rest_len -= restsize;
890
891            if (rest_len > 0 &&
892                bid >= filemgr_get_pos(handle->file) / handle->file->blocksize) {
893                // no more data in the file .. the file is corrupted
894                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
895                        "Fatal error!!! Database file '%s' is corrupted.",
896                        handle->file->filename);
897                return (int64_t)FDB_RESULT_FILE_CORRUPTION;
898            }
899        }
900    }
901
902    return bid * real_blocksize + pos;
903}
904
905#ifdef _DOC_COMP
906
907static int64_t _docio_read_doc_component_comp(struct docio_handle *handle,
908                                              uint64_t offset,
909                                              uint32_t len,
910                                              uint32_t comp_len,
911                                              void *buf_out,
912                                              void *comp_data_out,
913                                              err_log_callback *log_callback)
914{
915    int ret;
916    size_t uncomp_size;
917    int64_t _offset;
918
919    _offset = _docio_read_doc_component(handle, offset,
920                                        comp_len, comp_data_out, log_callback);
921    if (_offset < 0) {
922        fdb_log(log_callback, (fdb_status) _offset,
923                "Error in reading the file with offset %" _F64 ", length %d "
924                "from a database file '%s'", offset, len,
925                handle->file->filename);
926        return _offset;
927    }
928
929    uncomp_size = len;
930    ret = snappy_uncompress((char*)comp_data_out, comp_len,
931                            (char*)buf_out, &uncomp_size);
932    if (ret < 0) {
933        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
934                "Error in decompressing the data that was read with the file "
935                "offset %" _F64 ", length %d from a database file '%s'",
936                offset, len, handle->file->filename);
937        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
938    }
939    if (uncomp_size != len) {
940        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
941                "Error in decompressing the data with the file offset "
942                "%" _F64 " in a database file '%s', because the uncompressed length %d "
943                "is not same as the expected length %d",
944                offset, handle->file->filename, uncomp_size, len);
945        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
946    }
947    return _offset;
948}
949
950#endif
951
952fdb_status docio_read_doc_length(struct docio_handle *handle,
953                                 struct docio_length *length,
954                                 uint64_t offset)
955{
956    uint8_t checksum;
957    int64_t _offset;
958    struct docio_length _length, zero_length;
959    err_log_callback *log_callback = handle->log_callback;
960
961    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
962    if (_offset < 0) {
963        return (fdb_status) _offset;
964    }
965
966    memset(&zero_length, 0x0, sizeof(struct docio_length));
967    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
968        // If all the fields in docio_length are zero, then it means that the rest of
969        // the current block, which starts at offset, is zero-filled and can be skipped.
970        *length = zero_length;
971        return FDB_RESULT_SUCCESS;
972    }
973
974    // checksum check
975    checksum = _docio_length_checksum(_length, handle);
976    if (checksum != _length.checksum) {
977        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
978                "doc_length checksum mismatch error in a database file '%s'"
979                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
980                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
981                checksum, _length.checksum, _length.keylen, _length.metalen,
982                _length.bodylen, _length.bodylen_ondisk, offset);
983        return FDB_RESULT_CHECKSUM_ERROR;
984    }
985
986    *length = _docio_length_decode(_length);
987    if (length->keylen == 0 || length->keylen > FDB_MAX_KEYLEN_INTERNAL) {
988        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
989                "Error in decoding the doc length metadata in file %s"
990                " crc %x keylen %d metalen %d bodylen %d "
991                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
992                checksum, _length.keylen, _length.metalen,
993                _length.bodylen, _length.bodylen_ondisk, offset);
994        return FDB_RESULT_FILE_CORRUPTION;
995    }
996
997    return FDB_RESULT_SUCCESS;
998}
999
1000fdb_status docio_read_doc_key(struct docio_handle *handle, uint64_t offset,
1001                              keylen_t *keylen, void *keybuf)
1002{
1003    uint8_t checksum;
1004    int64_t _offset;
1005    struct docio_length length, _length, zero_length;
1006    err_log_callback *log_callback = handle->log_callback;
1007
1008    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
1009    if (_offset < 0) {
1010        fdb_log(log_callback, (fdb_status) _offset,
1011                "Error in reading the doc length metadata with offset %" _F64 " from "
1012                "a database file '%s'",
1013                offset, handle->file->filename);
1014        return (fdb_status) _offset;
1015    }
1016
1017    memset(&zero_length, 0x0, sizeof(struct docio_length));
1018    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1019        // If all the fields in docio_length are zero, then it means that the rest of
1020        // the current block, which starts at offset, is zero-filled and can be skipped.
1021        *keylen = 0;
1022        return FDB_RESULT_SUCCESS;
1023    }
1024
1025    // checksum check
1026    checksum = _docio_length_checksum(_length, handle);
1027    if (checksum != _length.checksum) {
1028        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1029                "doc_length key checksum mismatch error in a database file '%s'"
1030                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1031                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1032                checksum, _length.checksum, _length.keylen, _length.metalen,
1033                _length.bodylen, _length.bodylen_ondisk, offset);
1034        return FDB_RESULT_CHECKSUM_ERROR;
1035    }
1036
1037    length = _docio_length_decode(_length);
1038    if (length.keylen == 0 || length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1039        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1040                "Error in decoding the doc key length metadata in file %s"
1041                " crc %x keylen %d metalen %d bodylen %d "
1042                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1043                checksum, _length.keylen, _length.metalen,
1044                _length.bodylen, _length.bodylen_ondisk, offset);
1045        return FDB_RESULT_FILE_CORRUPTION;
1046    }
1047
1048    _offset = _docio_read_doc_component(handle, _offset, length.keylen,
1049                                        keybuf, log_callback);
1050    if (_offset < 0) {
1051        fdb_log(log_callback, (fdb_status) _offset,
1052                "Error in reading a key with offset %" _F64 ", length %d "
1053                "from a database file '%s'", _offset, length.keylen,
1054                handle->file->filename);
1055        return (fdb_status) _offset;
1056    }
1057
1058    *keylen = length.keylen;
1059    return FDB_RESULT_SUCCESS;
1060}
1061
1062void free_docio_object(struct docio_object *doc, uint8_t key_alloc,
1063                       uint8_t meta_alloc, uint8_t body_alloc) {
1064    if (!doc) {
1065        return;
1066    }
1067
1068    if (key_alloc) {
1069        free(doc->key);
1070        doc->key = NULL;
1071    }
1072    if (meta_alloc) {
1073        free(doc->meta);
1074        doc->meta = NULL;
1075    }
1076    if (body_alloc) {
1077        free(doc->body);
1078        doc->body = NULL;
1079    }
1080}
1081
1082int64_t docio_read_doc_key_meta(struct docio_handle *handle, uint64_t offset,
1083                                struct docio_object *doc,
1084                                bool read_on_cache_miss)
1085{
1086    uint8_t checksum;
1087    int64_t _offset;
1088    int key_alloc = 0;
1089    int meta_alloc = 0;
1090    fdb_seqnum_t _seqnum;
1091    timestamp_t _timestamp;
1092    struct docio_length _length, zero_length;
1093    err_log_callback *log_callback = handle->log_callback;
1094
1095    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1096                                 read_on_cache_miss);
1097    if (_offset < 0) {
1098        if (read_on_cache_miss) {
1099            fdb_log(log_callback, (fdb_status) _offset,
1100                    "Error in reading the doc length metadata with offset %" _F64 " from "
1101                    "a database file '%s'",
1102                    offset, handle->file->filename);
1103        }
1104        return _offset;
1105    }
1106
1107    memset(&zero_length, 0x0, sizeof(struct docio_length));
1108    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1109        // If all the fields in docio_length are zero, then it means that the rest of
1110        // the current block, which starts at offset, is zero-filled and can be skipped.
1111        doc->length = zero_length;
1112        return (int64_t) FDB_RESULT_SUCCESS;
1113    }
1114
1115    // checksum check
1116    checksum = _docio_length_checksum(_length, handle);
1117    if (checksum != _length.checksum) {
1118        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1119                "doc_length meta checksum mismatch error in a database file '%s'"
1120                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1121                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1122                checksum, _length.checksum, _length.keylen, _length.metalen,
1123                _length.bodylen, _length.bodylen_ondisk, offset);
1124        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1125    }
1126
1127    doc->length = _docio_length_decode(_length);
1128    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1129        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1130                "Error in decoding the doc length metadata (key length: %d) from "
1131                "a database file '%s'", doc->length.keylen, handle->file->filename);
1132        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1133    }
1134
1135    if (doc->key == NULL) {
1136        doc->key = (void *)malloc(doc->length.keylen);
1137        key_alloc = 1;
1138    }
1139    if (doc->meta == NULL && doc->length.metalen) {
1140        doc->meta = (void *)malloc(doc->length.metalen);
1141        meta_alloc = 1;
1142    }
1143
1144    _offset = _docio_read_doc_component(handle, _offset, doc->length.keylen,
1145                                        doc->key, log_callback);
1146    if (_offset < 0) {
1147        fdb_log(log_callback, (fdb_status) _offset,
1148                "Error in reading a key with offset %" _F64 ", length %d "
1149                "from a database file '%s'", offset, doc->length.keylen,
1150                handle->file->filename);
1151        free_docio_object(doc, key_alloc, meta_alloc, 0);
1152        return _offset;
1153    }
1154
1155    // read timestamp
1156    _offset = _docio_read_doc_component(handle, _offset,
1157                                        sizeof(timestamp_t),
1158                                        &_timestamp, log_callback);
1159    if (_offset < 0) {
1160        fdb_log(log_callback, (fdb_status) _offset,
1161                "Error in reading a timestamp with offset %" _F64 ", length %d "
1162                "from a database file '%s'", offset, sizeof(timestamp_t),
1163                handle->file->filename);
1164        free_docio_object(doc, key_alloc, meta_alloc, 0);
1165        return _offset;
1166    }
1167    doc->timestamp = _endian_decode(_timestamp);
1168
1169    // copy sequence number (optional)
1170    _offset = _docio_read_doc_component(handle, _offset, sizeof(fdb_seqnum_t),
1171                                        (void *)&_seqnum, log_callback);
1172    if (_offset < 0) {
1173        fdb_log(log_callback, (fdb_status) _offset,
1174                "Error in reading a sequence number with offset %" _F64 ", length %d "
1175                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1176                handle->file->filename);
1177        free_docio_object(doc, key_alloc, meta_alloc, 0);
1178        return _offset;
1179    }
1180    doc->seqnum = _endian_decode(_seqnum);
1181
1182    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1183                                        doc->meta, log_callback);
1184    if (_offset < 0) {
1185        fdb_log(log_callback, (fdb_status) _offset,
1186                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1187                "from a database file '%s'", offset, doc->length.metalen,
1188                handle->file->filename);
1189        free_docio_object(doc, key_alloc, meta_alloc, 0);
1190        return _offset;
1191    }
1192
1193    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1194    free_docio_object(doc, 0, free_meta, 0);
1195
1196    return _offset;
1197}
1198
1199int64_t docio_read_doc(struct docio_handle *handle, uint64_t offset,
1200                       struct docio_object *doc,
1201                       bool read_on_cache_miss)
1202{
1203    uint8_t checksum;
1204    int64_t _offset;
1205    int key_alloc = 0;
1206    int meta_alloc = 0;
1207    int body_alloc = 0;
1208    fdb_seqnum_t _seqnum;
1209    timestamp_t _timestamp;
1210    void *comp_body = NULL;
1211    struct docio_length _length, zero_length;
1212    err_log_callback *log_callback = handle->log_callback;
1213
1214    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1215                                 read_on_cache_miss);
1216    if (_offset < 0) {
1217        if (read_on_cache_miss) {
1218            fdb_log(log_callback, (fdb_status) _offset,
1219                    "Error in reading the doc length metadata with offset %" _F64 " from "
1220                    "a database file '%s'",
1221                    offset, handle->file->filename);
1222        }
1223        return _offset;
1224    }
1225
1226    memset(&zero_length, 0x0, sizeof(struct docio_length));
1227    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1228        // If all the fields in docio_length are zero, then it means that the rest of
1229        // the current block, which starts at offset, is zero-filled and can be skipped.
1230        doc->length = zero_length;
1231        return (int64_t) FDB_RESULT_SUCCESS;
1232    }
1233
1234    // checksum check
1235    checksum = _docio_length_checksum(_length, handle);
1236    if (checksum != _length.checksum) {
1237        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1238                "doc_length body checksum mismatch error in a database file '%s'"
1239                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1240                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1241                checksum, _length.checksum, _length.keylen, _length.metalen,
1242                _length.bodylen, _length.bodylen_ondisk, offset);
1243        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1244    }
1245
1246    doc->length = _docio_length_decode(_length);
1247    if (doc->length.flag & DOCIO_TXN_COMMITTED) {
1248        // transaction commit mark
1249        // read the corresponding doc offset
1250
1251        // If TXN_COMMITTED flag is set, this doc is not an actual doc, but a
1252        // transaction commit marker. Thus, all lengths should be zero.
1253        if (doc->length.keylen || doc->length.metalen ||
1254            doc->length.bodylen || doc->length.bodylen_ondisk) {
1255            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1256                    "File corruption: Doc length fields in a transaction commit marker "
1257                    "was not zero in a database file '%s' offset %" _F64,
1258                    handle->file->filename, offset);
1259            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1260            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1261        }
1262
1263        uint64_t doc_offset;
1264        _offset = _docio_read_doc_component(handle, _offset,
1265                                            sizeof(doc_offset), &doc_offset,
1266                                            log_callback);
1267        if (_offset < 0) {
1268            fdb_log(log_callback, (fdb_status) _offset,
1269                    "Error in reading an offset of a committed doc from an offset %" _F64
1270                    " in a database file '%s'", offset, handle->file->filename);
1271            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1272            return _offset;
1273        }
1274        doc->doc_offset = _endian_decode(doc_offset);
1275        // The offset of the actual document that pointed by this commit marker
1276        // should not be greater than the file size.
1277        if (doc->doc_offset > filemgr_get_pos(handle->file)) {
1278            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1279                    "File corruption: Offset %" _F64 " of the actual doc pointed by the "
1280                    "commit marker is greater than the size %" _F64 " of a database file '%s'",
1281                    doc->doc_offset, filemgr_get_pos(handle->file),
1282                    handle->file->filename);
1283            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1284            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1285        }
1286        return _offset;
1287    }
1288
1289    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1290        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1291                "Error in decoding the doc length metadata (key length: %d) from "
1292                "a database file '%s' offset %" _F64, doc->length.keylen,
1293                handle->file->filename, offset);
1294        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1295    }
1296
1297    if (doc->key == NULL) {
1298        doc->key = (void *)malloc(doc->length.keylen);
1299        key_alloc = 1;
1300    }
1301    if (doc->meta == NULL && doc->length.metalen) {
1302        doc->meta = (void *)malloc(doc->length.metalen);
1303        meta_alloc = 1;
1304    }
1305    if (doc->body == NULL && doc->length.bodylen) {
1306        doc->body = (void *)malloc(doc->length.bodylen);
1307        body_alloc = 1;
1308    }
1309
1310    _offset = _docio_read_doc_component(handle, _offset,
1311                                        doc->length.keylen,
1312                                        doc->key,
1313                                        log_callback);
1314    if (_offset < 0) {
1315        fdb_log(log_callback, (fdb_status) _offset,
1316                "Error in reading a key with offset %" _F64 ", length %d "
1317                "from a database file '%s'", offset, doc->length.keylen,
1318                handle->file->filename);
1319        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1320        return _offset;
1321    }
1322
1323    // read timestamp
1324    _offset = _docio_read_doc_component(handle, _offset,
1325                                        sizeof(timestamp_t),
1326                                        &_timestamp,
1327                                        log_callback);
1328    if (_offset < 0) {
1329        fdb_log(log_callback, (fdb_status) _offset,
1330                "Error in reading a timestamp with offset %" _F64 ", length %d "
1331                "from a database file '%s'", offset, sizeof(timestamp_t),
1332                handle->file->filename);
1333        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1334        return _offset;
1335    }
1336    doc->timestamp = _endian_decode(_timestamp);
1337
1338    // copy seqeunce number (optional)
1339    _offset = _docio_read_doc_component(handle, _offset,
1340                                        sizeof(fdb_seqnum_t),
1341                                        (void *)&_seqnum,
1342                                        log_callback);
1343    if (_offset < 0) {
1344        fdb_log(log_callback, (fdb_status) _offset,
1345                "Error in reading a sequence number with offset %" _F64 ", length %d "
1346                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1347                handle->file->filename);
1348        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1349        return _offset;
1350    }
1351    doc->seqnum = _endian_decode(_seqnum);
1352
1353    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1354                                        doc->meta, log_callback);
1355    if (_offset < 0) {
1356        fdb_log(log_callback, (fdb_status) _offset,
1357                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1358                "from a database file '%s'", offset, doc->length.metalen,
1359                handle->file->filename);
1360        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1361        return _offset;
1362    }
1363
1364#ifdef _DOC_COMP
1365    if (doc->length.flag & DOCIO_COMPRESSED) {
1366        comp_body = (void*)malloc(doc->length.bodylen_ondisk);
1367        _offset = _docio_read_doc_component_comp(handle, _offset, doc->length.bodylen,
1368                                                 doc->length.bodylen_ondisk, doc->body,
1369                                                 comp_body, log_callback);
1370        if (_offset < 0) {
1371            fdb_log(log_callback, (fdb_status) _offset,
1372                    "Error in reading a compressed doc with offset %" _F64 ", length %d "
1373                    "from a database file '%s'", offset, doc->length.bodylen,
1374                    handle->file->filename);
1375            if (comp_body) {
1376                free(comp_body);
1377            }
1378            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1379            return _offset;
1380        }
1381    } else {
1382        _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1383                                            doc->body, log_callback);
1384        if (_offset < 0) {
1385            fdb_log(log_callback, (fdb_status) _offset,
1386                    "Error in reading a doc with offset %" _F64 ", length %d "
1387                    "from a database file '%s'", offset, doc->length.bodylen,
1388                    handle->file->filename);
1389            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1390            return _offset;
1391        }
1392    }
1393#else
1394    _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1395                                        doc->body, log_callback);
1396    if (_offset < 0) {
1397        fdb_log(log_callback, (fdb_status) _offset,
1398                "Error in reading a doc with offset %" _F64 ", length %d "
1399                "from a database file '%s'", offset, doc->length.bodylen,
1400                handle->file->filename);
1401        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1402        return _offset;
1403    }
1404#endif
1405
1406#ifdef __CRC32
1407    uint32_t crc_file, crc;
1408    _offset = _docio_read_doc_component(handle, _offset, sizeof(crc_file),
1409                                        (void *)&crc_file, log_callback);
1410    if (_offset < 0) {
1411        fdb_log(log_callback, (fdb_status) _offset,
1412                "Error in reading a doc's CRC value with offset %" _F64 ", length %d "
1413                "from a database file '%s'", offset, sizeof(crc_file),
1414                handle->file->filename);
1415        if (comp_body) {
1416            free(comp_body);
1417        }
1418        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1419        return _offset;
1420    }
1421
1422    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_length),
1423                       sizeof(_length),
1424                       handle->file->crc_mode);
1425    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->key),
1426                       doc->length.keylen,
1427                       crc,
1428                       handle->file->crc_mode);
1429    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_timestamp),
1430                       sizeof(timestamp_t),
1431                       crc,
1432                       handle->file->crc_mode);
1433    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_seqnum),
1434                       sizeof(fdb_seqnum_t),
1435                       crc,
1436                       handle->file->crc_mode);
1437    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->meta),
1438                       doc->length.metalen,
1439                       crc,
1440                       handle->file->crc_mode);
1441
1442    if (doc->length.flag & DOCIO_COMPRESSED) {
1443        crc = get_checksum(reinterpret_cast<const uint8_t*>(comp_body),
1444                           doc->length.bodylen_ondisk,
1445                           crc,
1446                           handle->file->crc_mode);
1447        if (comp_body) {
1448            free(comp_body);
1449        }
1450    } else {
1451        crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->body),
1452                           doc->length.bodylen,
1453                           crc,
1454                           handle->file->crc_mode);
1455    }
1456    if (crc != crc_file) {
1457        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1458                "doc_body checksum mismatch error in a database file '%s'"
1459                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1460                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1461                crc, crc_file, _length.keylen, _length.metalen,
1462                _length.bodylen, _length.bodylen_ondisk, offset);
1463        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1464        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1465    }
1466#endif
1467
1468    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1469    uint8_t free_body = body_alloc && !doc->length.bodylen;
1470    free_docio_object(doc, 0, free_meta, free_body);
1471
1472    return _offset;
1473}
1474
// Submits the 'size' read requests that were already prepared in 'aio_handle',
// collects their completion events, and decodes one document out of each
// completed buffer into 'doc_array'.
//
// handle       : docio handle; its 'readbuffer' and 'lastbid' fields are
//                temporarily pointed at each completed AIO buffer and are
//                restored after every document read.
// doc_array    : output array of documents.
// doc_idx      : slot of 'doc_array' that receives the first completed
//                document; one slot is consumed per completion event, in the
//                order events appear in 'aio_handle->events', including
//                events whose document could not be decoded.
// aio_handle   : async I/O handle holding the prepared requests and events.
// size         : number of prepared requests to submit.
// sum_doc_size : in/out accumulator of the sizes of successfully read docs.
// keymeta_only : if true, only key and metadata are read, and the on-disk
//                body length is excluded from the accumulated size.
//
// Returns 'size' when all requests were submitted and reaped; a negative value
// if submission or event retrieval failed; the (smaller) number of accepted
// requests if submission was partial, in which case no document is read;
// and 0 when async I/O is not compiled in.
static int _submit_async_io_requests(struct docio_handle *handle,
                                     struct docio_object *doc_array,
                                     size_t doc_idx,
                                     struct async_io_handle *aio_handle,
                                     int size,
                                     size_t *sum_doc_size,
                                     bool keymeta_only)
{
#ifdef _ASYNC_IO
#if !defined(WIN32) && !defined(_WIN32)
    struct io_event* io_evt = NULL;
    uint8_t *buf = NULL;
    uint64_t offset = 0;
    // Must be signed: the doc readers report failures as negative values, and
    // an unsigned variable would make the '<= 0' failure test below miss them.
    int64_t _offset = 0;
    int num_events = 0;

    int num_sub = handle->file->ops->aio_submit(aio_handle, size);
    if (num_sub < 0) {
        // Error loggings
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s",
                handle->file->filename, errno_msg);
        return num_sub;
    } else if (num_sub != size) {
        // Error loggings
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s, "
                "%d requests were submitted, but only %d requests were processed",
                handle->file->filename, errno_msg, size, num_sub);
        return num_sub;
    }

    // Keep reaping until every submitted request has produced an event.
    while (num_sub > 0) {
        // NOTE(review): arguments are presumably (min events, max events,
        // timeout) -- confirm against the filemgr ops definition.
        num_events = handle->file->ops->aio_getevents(aio_handle, 1,
                                                      num_sub, (unsigned int) -1);
        if (num_events < 0) {
            // Error loggings
            char errno_msg[512];
            handle->file->ops->get_errno_str(errno_msg, 512);
            fdb_log(handle->log_callback, (fdb_status) num_events,
                    "Error in getting async I/O events from the completion queue "
                    "for a file '%s', errno msg: %s", handle->file->filename, errno_msg);
            return num_events;
        }
        num_sub -= num_events;
        for (io_evt = aio_handle->events; num_events > 0; --num_events, ++io_evt) {
            buf = (uint8_t *) io_evt->obj->u.c.buf;
            offset = *((uint64_t *) io_evt->data); // Original offset.

            // Set the docio handle's buffer to the AIO buffer to read
            // a doc from the AIO buffer. If additional blocks need to be
            // read, then they will be sequentially read through the synchronous
            // I/O path (i.e., buffer cache -> disk read if cache miss).
            // As these additional blocks are sequential reads, we don't expect
            // asynchronous I/O to give us performance boost.
            void *tmp_buffer = handle->readbuffer;
            handle->readbuffer = buf;
            handle->lastbid = offset / aio_handle->block_size;
            memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
            if (keymeta_only) {
                _offset = docio_read_doc_key_meta(handle, offset,
                                                  &doc_array[doc_idx], true);
            } else {
                _offset = docio_read_doc(handle, offset, &doc_array[doc_idx],
                                         true);
            }

            // Put the handle's own buffer back regardless of the outcome.
            handle->readbuffer = tmp_buffer;
            handle->lastbid = BLK_NOT_FOUND;

            if (_offset > 0) {
                // Only successfully decoded docs contribute to the total.
                (*sum_doc_size) += _fdb_get_docsize(doc_array[doc_idx].length);
                if (keymeta_only) {
                    (*sum_doc_size) -= doc_array[doc_idx].length.bodylen_ondisk;
                }
            }
            // The slot is consumed whether or not the read succeeded.
            ++doc_idx;
        }
    }
    return size;
#else // Plan to implement async I/O in other OSs (e.g., Windows, OSx)
    return 0;
#endif
#else // Async I/O is not supported in the current OS.
    return 0;
#endif
}
1568
1569size_t docio_batch_read_docs(struct docio_handle *handle,
1570                             uint64_t *offset_array,
1571                             struct docio_object *doc_array,
1572                             size_t array_size,
1573                             size_t data_size_threshold,
1574                             size_t batch_size_threshold,
1575                             struct async_io_handle *aio_handle,
1576                             bool keymeta_only)
1577{
1578    size_t i = 0;
1579    size_t sum_doc_size = 0;
1580    size_t doc_idx = 0;
1581    size_t block_size = handle->file->blocksize;
1582    uint64_t _offset = 0;
1583    int aio_size = 0;
1584    bool read_fail = false;
1585    bool read_on_cache_miss = true;
1586
1587    if (aio_handle) {
1588        // If async I/O is supported, we will then read non-resident docs from disk
1589        // by using async I/O operations.
1590        read_on_cache_miss = false;
1591    }
1592
1593    for (i = 0; i < array_size && i < batch_size_threshold &&
1594           sum_doc_size < data_size_threshold; ++i) {
1595        memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
1596        if (keymeta_only) {
1597            _offset = docio_read_doc_key_meta(handle, offset_array[i], &doc_array[doc_idx],
1598                                              read_on_cache_miss);
1599        } else {
1600            _offset = docio_read_doc(handle, offset_array[i], &doc_array[doc_idx],
1601                                     read_on_cache_miss);
1602        }
1603        if (_offset <= 0) {
1604            if (aio_handle) {
1605                // The page is not resident in the cache. Prepare and perform Async I/O
1606                handle->file->ops->aio_prep_read(aio_handle, aio_size,
1607                                                 block_size, offset_array[i]);
1608                if (++aio_size == (int) aio_handle->queue_depth) {
1609                    int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1610                                                            aio_handle, aio_size,
1611                                                            &sum_doc_size,
1612                                                            keymeta_only);
1613                    if (num_sub < 0 || num_sub != aio_size) {
1614                        read_fail = true;
1615                        break;
1616                    }
1617                    aio_size = 0;
1618                    doc_idx += num_sub;
1619                }
1620            } else {
1621                ++doc_idx; // Error in reading a doc.
1622            }
1623        } else {
1624            sum_doc_size += _fdb_get_docsize(doc_array[doc_idx].length);
1625            if (keymeta_only) {
1626                sum_doc_size -= doc_array[doc_idx].length.bodylen_ondisk;
1627            }
1628            ++doc_idx;
1629        }
1630    }
1631
1632    if (aio_size && !read_fail) {
1633        int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1634                                                aio_handle, aio_size,
1635                                                &sum_doc_size, keymeta_only);
1636        if (num_sub < 0) {
1637            read_fail = true;
1638        } else {
1639            doc_idx += num_sub;
1640        }
1641    }
1642
1643    if (read_fail) {
1644        for (i = 0; i < batch_size_threshold; ++i) {
1645            free(doc_array[i].key);
1646            free(doc_array[i].meta);
1647            free(doc_array[i].body);
1648            doc_array[i].key = doc_array[i].meta = doc_array[i].body = NULL;
1649        }
1650        return (size_t) -1;
1651    }
1652
1653    return doc_idx;
1654}
1655
1656bool docio_check_buffer(struct docio_handle *handle,
1657                        bid_t bid,
1658                        uint64_t sb_bmp_revnum)
1659{
1660    err_log_callback *log_callback = handle->log_callback;
1661    _docio_read_through_buffer(handle, bid, log_callback, true);
1662    return _docio_check_buffer(handle, sb_bmp_revnum);
1663}
1664
1665