xref: /6.6.0/forestdb/src/docio.cc (revision cc8ad991)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21
22#include "docio.h"
23#include "wal.h"
24#include "fdb_internal.h"
25#include "version.h"
26#ifdef _DOC_COMP
27#include "snappy-c.h"
28#endif
29
30#include "memleak.h"
31
32fdb_status docio_init(struct docio_handle *handle,
33                      struct filemgr *file,
34                      bool compress_document_body)
35{
36    handle->file = file;
37    handle->curblock = BLK_NOT_FOUND;
38    handle->curpos = 0;
39    handle->cur_bmp_revnum_hash = 0;
40    handle->lastbid = BLK_NOT_FOUND;
41    handle->lastBmpRevnum = 0;
42    handle->compress_document_body = compress_document_body;
43    malloc_align(handle->readbuffer, FDB_SECTOR_SIZE, file->blocksize);
44    if (!handle->readbuffer) {
45        fdb_log(NULL, FDB_RESULT_ALLOC_FAIL,
46                "(docio_init) malloc_align failed: "
47                "database file '%s'\n",
48                handle->file->filename);
49        return FDB_RESULT_ALLOC_FAIL;
50    }
51    return FDB_RESULT_SUCCESS;
52}
53
54void docio_free(struct docio_handle *handle)
55{
56    free_align(handle->readbuffer);
57}
58
// _add_blk_marker(file, bid, blocksize, marker, log_callback)
//
// With __CRC32: writes BLK_MARKER_SIZE bytes of 'marker' into block 'bid' at
// byte offset 'blocksize' through filemgr_write_offset() and evaluates to its
// status. Callers in this file pass the usable block size (block size minus
// the trailing marker/meta area), so the marker lands right behind the
// document bytes.
// Without __CRC32: no write is issued; the macro evaluates to
// FDB_RESULT_SUCCESS and its arguments are not evaluated.
#ifndef __CRC32
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    FDB_RESULT_SUCCESS
#else
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    filemgr_write_offset((file), (bid), (blocksize), BLK_MARKER_SIZE,\
                         (marker), (false), (log_callback))
#endif
67
68INLINE fdb_status _docio_fill_zero(struct docio_handle *handle, bid_t bid,
69                                   size_t pos)
70{
71    // Fill next few bytes (sizeof(struct docio_length)) with zero
72    // to avoid false positive docio_length checksum during file scanning.
73    // (Note that the checksum value of zero-filled docio_length is 0x6F.)
74
75    size_t blocksize = handle->file->blocksize;
76    size_t len_size = sizeof(struct docio_length);
77    uint8_t *zerobuf = alca(uint8_t, len_size);
78
79#ifdef __CRC32
80    if (ver_non_consecutive_doc(handle->file->version)) {
81        // new version: support non-consecutive document block
82        blocksize -= DOCBLK_META_SIZE;
83    } else {
84        // old version: block marker only
85        blocksize -= BLK_MARKER_SIZE;
86    }
87#endif
88
89    if (pos + len_size <= blocksize) {
90        // enough space in the block
91        memset(zerobuf, 0x0, len_size);
92        return filemgr_write_offset(handle->file, bid, pos, len_size,
93                                    zerobuf, false, handle->log_callback);
94    } else {
95        // lack of space .. we don't need to fill zero bytes.
96        return FDB_RESULT_SUCCESS;
97    }
98}
99
100bid_t docio_append_doc_raw(struct docio_handle *handle, uint64_t size, void *buf)
101{
102    uint32_t offset;
103    uint8_t marker[BLK_MARKER_SIZE];
104    size_t blocksize = handle->file->blocksize;
105    size_t real_blocksize = blocksize;
106    size_t remaining_space;
107    err_log_callback *log_callback = handle->log_callback;
108    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
109    struct docblk_meta blk_meta;
110
111    memset(&blk_meta, 0x0, sizeof(blk_meta));
112    blk_meta.marker = BLK_MARKER_DOC;
113    (void)blk_meta;
114
115#ifdef __CRC32
116    if (non_consecutive) {
117        // new version: support non-consecutive document block
118        blocksize -= DOCBLK_META_SIZE;
119    } else {
120        // old version: block marker only
121        blocksize -= BLK_MARKER_SIZE;
122    }
123    memset(marker, BLK_MARKER_DOC, BLK_MARKER_SIZE);
124#endif
125
126    if (handle->curblock == BLK_NOT_FOUND) {
127        // allocate new block
128        handle->cur_bmp_revnum_hash =
129            filemgr_get_sb_bmp_revnum(handle->file) & BMP_REVNUM_MASK;
130        handle->curblock = filemgr_alloc(handle->file, log_callback);
131        handle->curpos = 0;
132    }
133    if (!filemgr_is_writable(handle->file, handle->curblock)) {
134        // mark remaining space in old block as stale
135        if (handle->curpos < real_blocksize) {
136            // this function will calculate block marker size automatically.
137            filemgr_mark_stale(handle->file,
138                               real_blocksize * handle->curblock + handle->curpos,
139                               blocksize - handle->curpos);
140        }
141        // allocate new block
142        handle->cur_bmp_revnum_hash =
143            filemgr_get_sb_bmp_revnum(handle->file) & BMP_REVNUM_MASK;
144        handle->curblock = filemgr_alloc(handle->file, log_callback);
145        handle->curpos = 0;
146    }
147    blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
148
149    remaining_space = blocksize - handle->curpos;
150    if (size <= remaining_space) {
151        fdb_status fs = FDB_RESULT_SUCCESS;
152        // simply append to current block
153        offset = handle->curpos;
154
155        if (non_consecutive) {
156            // set next BID
157            blk_meta.next_bid = BLK_NOT_FOUND;
158            // write meta
159            fs = filemgr_write_offset(handle->file, handle->curblock,
160                                      blocksize, sizeof(blk_meta), &blk_meta,
161                                      false, log_callback);
162        } else {
163            fs = _add_blk_marker(handle->file, handle->curblock, blocksize, marker,
164                                 log_callback);
165        }
166
167
168        if (fs != FDB_RESULT_SUCCESS) {
169            fdb_log(log_callback, fs,
170                    "Error in appending a doc block marker for a block id %" _F64
171                    " into a database file '%s'", handle->curblock,
172                    handle->file->filename);
173            return BLK_NOT_FOUND;
174        }
175        fs = filemgr_write_offset(handle->file, handle->curblock, offset, size,
176                                  buf, (size == remaining_space), log_callback);
177        if (fs != FDB_RESULT_SUCCESS) {
178            fdb_log(log_callback, fs,
179                    "Error in writing a doc block with id %" _F64 ", offset %d, size %"
180                    _F64 " to a database file '%s'", handle->curblock, offset, size,
181                    handle->file->filename);
182            return BLK_NOT_FOUND;
183        }
184        handle->curpos += size;
185
186        if (_docio_fill_zero(handle, handle->curblock, handle->curpos) !=
187            FDB_RESULT_SUCCESS) {
188            return BLK_NOT_FOUND;
189        }
190
191        return handle->curblock * real_blocksize + offset;
192
193    } else { // insufficient space to fit entire document into current block
194        bid_t begin, end, i, startpos;
195        bid_t *block_list, block_list_size = 0;
196        uint16_t *bmp_revnum_list;
197        uint32_t nblock = size / blocksize;
198        uint32_t remain = size % blocksize;
199        uint64_t remainsize = size;
200        fdb_status fs = FDB_RESULT_SUCCESS;
201
202        // as blocks may not be consecutive, we need to maintain
203        // the list of BIDs.
204        block_list = (bid_t *)alca(bid_t, nblock+1);
205        bmp_revnum_list = (uint16_t *)alca(uint16_t, nblock+1);
206
207#ifdef DOCIO_BLOCK_ALIGN
208        offset = blocksize - handle->curpos;
209        if (remain <= blocksize - handle->curpos &&
210            filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
211                                        nblock + ((remain>offset)?1:0), &begin, &end,
212                                        log_callback) == handle->curblock+1) {
213
214            // start from current block
215            if (begin != (handle->curblock + 1)) {
216                fdb_log(log_callback, fs,
217                        "Error in allocating blocks starting from block id %" _F64
218                        " in a database file '%s'", handle->curblock + 1,
219                        handle->file->filename);
220                return BLK_NOT_FOUND;
221            }
222
223            fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
224                                 marker, log_callback);
225            if (fs != FDB_RESULT_SUCCESS) {
226                fdb_log(log_callback, fs,
227                        "Error in appending a doc block marker for a block id %" _F64
228                        " into a database file '%s'", handle->curblock,
229                        handle->file->filename);
230                return BLK_NOT_FOUND;
231            }
232            if (offset > 0) {
233                fs = filemgr_write_offset(handle->file, handle->curblock,
234                                          handle->curpos, offset, buf,
235                                          true, // mark block as immutable
236                                          log_callback);
237                if (fs != FDB_RESULT_SUCCESS) {
238                    fdb_log(log_callback, fs,
239                            "Error in writing a doc block with id %" _F64 ", offset %d, "
240                            "size %" _F64 " to a database file '%s'", handle->curblock,
241                            offset, size, handle->file->filename);
242                    return BLK_NOT_FOUND;
243                }
244            }
245            remainsize -= offset;
246
247            startpos = handle->curblock * real_blocksize + handle->curpos;
248        } else {
249            // next block to be allocated is not continuous .. allocate new multiple blocks
250            filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
251                                   &begin, &end, log_callback);
252            offset = 0;
253
254            startpos = begin * real_blocksize;
255        }
256
257#else
258        // Simple append mode
259        // The given doc is appended at the byte offset right next the last doc.
260        // Note that block allocation can be non-consecutive.
261        offset = blocksize - handle->curpos;
262
263        if (non_consecutive) {
264            // new version: support non-consecutive allocation
265
266            bool new_block = false;
267            bool start_from_new_block = false;
268
269            if (remain > offset) {
270                // if the current block cannot accommodate the remaining length
271                // of the document, allocate an additional block.
272                new_block = true;
273            }
274
275            block_list_size = nblock + ((new_block)?1:0);
276            for (i=0; i<block_list_size; ++i) {
277                bmp_revnum_list[i] = filemgr_get_sb_bmp_revnum(handle->file) &
278                                     BMP_REVNUM_MASK;
279                block_list[i] = filemgr_alloc(handle->file, log_callback);
280
281                if (i == 0 && handle->curblock != BLK_NOT_FOUND &&
282                    block_list[i] > handle->curblock+1) {
283                    // if the first new allocated block is not consecutive
284                    // from the current block, start writing document from
285                    // the new block.
286                    start_from_new_block = true;
287                    // since we won't write into the current block,
288                    // allocate one more block if necessary.
289                    if (remain && !new_block) {
290                        new_block = true;
291                        block_list_size++;
292                    }
293                }
294            }
295
296            if (offset > 0 && !start_from_new_block) {
297                // start from the current block
298
299                // set next BID
300                blk_meta.next_bid = _endian_encode(block_list[0]);
301                // write meta
302                fs = filemgr_write_offset(handle->file, handle->curblock,
303                                          blocksize, sizeof(blk_meta), &blk_meta,
304                                          false, log_callback);
305                if (fs != FDB_RESULT_SUCCESS) {
306                    fdb_log(log_callback, fs,
307                            "Error in appending a doc block metadata for a block id %" _F64
308                            " into a database file '%s'", handle->curblock,
309                            handle->file->filename);
310                    return BLK_NOT_FOUND;
311                }
312
313                // write the front part of the doc
314                if (offset > 0) {
315                    fs = filemgr_write_offset(handle->file, handle->curblock,
316                                              handle->curpos, offset, buf,
317                                              true, // mark block as immutable
318                                              log_callback);
319                    if (fs != FDB_RESULT_SUCCESS) {
320                        fdb_log(log_callback, fs,
321                                "Error in writing a doc block with id %" _F64 ", offset %d, "
322                                "size %" _F64 " to a database file '%s'", handle->curblock,
323                                offset, size, handle->file->filename);
324                        return BLK_NOT_FOUND;
325                    }
326                }
327                remainsize -= offset;
328
329                startpos = handle->curblock * real_blocksize + handle->curpos;
330            } else {
331                // mark remaining space in the current block as stale
332                if (handle->curblock != BLK_NOT_FOUND &&
333                    handle->curpos < real_blocksize) {
334                    filemgr_mark_stale(handle->file,
335                                       real_blocksize * handle->curblock + handle->curpos,
336                                       blocksize - handle->curpos);
337                }
338                offset = 0;
339                startpos = block_list[0] * real_blocksize;
340            }
341
342        } else {
343            // old version: consecutive allocation only
344
345            if (filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
346                                            nblock + ((remain>offset)?1:0), &begin, &end,
347                                            log_callback) == handle->curblock+1) {
348                // start from current block
349                if (begin != (handle->curblock + 1)) {
350                    fdb_log(log_callback, fs,
351                            "Error in allocating blocks starting from block id %" _F64
352                            " in a database file '%s'", handle->curblock + 1,
353                            handle->file->filename);
354                    return BLK_NOT_FOUND;
355                }
356
357                fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
358                                     marker, log_callback);
359                if (fs != FDB_RESULT_SUCCESS) {
360                    fdb_log(log_callback, fs,
361                            "Error in appending a doc block marker for a block id %" _F64
362                            " into a database file '%s'", handle->curblock,
363                            handle->file->filename);
364                    return BLK_NOT_FOUND;
365                }
366                if (offset > 0) {
367                    fs = filemgr_write_offset(handle->file, handle->curblock,
368                                              handle->curpos, offset, buf,
369                                              true, // mark block as immutable
370                                              log_callback);
371                    if (fs != FDB_RESULT_SUCCESS) {
372                        fdb_log(log_callback, fs,
373                                "Error in writing a doc block with id %" _F64 ", offset %d, "
374                                "size %" _F64 " to a database file '%s'", handle->curblock,
375                                offset, size, handle->file->filename);
376                        return BLK_NOT_FOUND;
377                    }
378                }
379                remainsize -= offset;
380
381                startpos = handle->curblock * real_blocksize + handle->curpos;
382            } else {
383                // next block to be allocated is not continuous
384                // mark remaining space in the old block as stale
385                if (handle->curblock != BLK_NOT_FOUND &&
386                    handle->curpos < real_blocksize) {
387                    filemgr_mark_stale(handle->file,
388                                       real_blocksize * handle->curblock + handle->curpos,
389                                       blocksize - handle->curpos);
390                }
391                // allocate new multiple blocks
392                filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
393                                       &begin, &end, log_callback);
394                offset = 0;
395
396                startpos = begin * real_blocksize;
397            }
398
399            block_list_size = end - begin + 1;
400            for (i=0; i<block_list_size; ++i) {
401                block_list[i] = begin+i;
402            }
403
404        } // if (non_consecutive)
405
406#endif
407
408        for (i=0; i<block_list_size; ++i) {
409            handle->curblock = block_list[i];
410            handle->cur_bmp_revnum_hash = bmp_revnum_list[i];
411            blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
412
413            if (non_consecutive) {
414                if (i < block_list_size - 1) {
415                    blk_meta.next_bid = _endian_encode(block_list[i+1]);
416                } else {
417                    // the last block .. set next BID '0xffff...'
418                    memset(&blk_meta.next_bid, 0xff, sizeof(blk_meta.next_bid));
419                }
420            }
421
422            // write meta (new) or block marker (old)
423            if (non_consecutive) {
424                fs = filemgr_write_offset(handle->file, handle->curblock,
425                                          blocksize, sizeof(blk_meta), &blk_meta,
426                                          false, log_callback);
427            } else {
428                fs = _add_blk_marker(handle->file, block_list[i], blocksize, marker,
429                                     log_callback);
430            }
431            if (fs != FDB_RESULT_SUCCESS) {
432                fdb_log(log_callback, fs,
433                        "Error in appending a doc block marker for a block "
434                        "id %" _F64 " into a database file '%s'", block_list[i],
435                        handle->file->filename);
436                return BLK_NOT_FOUND;
437            }
438
439            if (remainsize >= blocksize) {
440                // write entire block
441
442                fs = filemgr_write_offset(handle->file, block_list[i], 0, blocksize,
443                                          (uint8_t *)buf + offset,
444                                          true, // mark block as immutable
445                                          log_callback);
446                if (fs != FDB_RESULT_SUCCESS) {
447                    fdb_log(log_callback, fs,
448                            "Error in writing an entire doc block with id %" _F64
449                            ", size %" _F64 " to a database file '%s'", block_list[i], blocksize,
450                            handle->file->filename);
451                    return BLK_NOT_FOUND;
452                }
453                offset += blocksize;
454                remainsize -= blocksize;
455                handle->curpos = blocksize;
456
457            } else {
458                // write rest of document
459                fdb_assert(i==block_list_size-1, i, block_list_size-1);
460
461                fs = filemgr_write_offset(handle->file, block_list[i], 0, remainsize,
462                                          (uint8_t *)buf + offset,
463                                          (remainsize == blocksize),
464                                          log_callback);
465                if (fs != FDB_RESULT_SUCCESS) {
466                    fdb_log(log_callback, fs,
467                            "Error in writing a doc block with id %" _F64 ", "
468                            "size %" _F64 " to a database file '%s'", block_list[i], remainsize,
469                            handle->file->filename);
470                    return BLK_NOT_FOUND;
471                }
472                offset += remainsize;
473                handle->curpos = remainsize;
474
475                if (_docio_fill_zero(handle, block_list[i], handle->curpos) !=
476                    FDB_RESULT_SUCCESS) {
477                    return BLK_NOT_FOUND;
478                }
479            }
480        }
481
482        return startpos;
483    }
484
485    return 0;
486}
487
// Conversion helpers for struct docio_length.
//
// With __ENDIAN_SAFE, both helpers return a copy of 'length' in which the
// keylen, metalen, bodylen and bodylen_ondisk fields have been passed through
// _endian_encode() / _endian_decode(); every other field is copied unchanged.
//
// Without __ENDIAN_SAFE no conversion is applied, and the helpers are
// identity macros. They must still expand to their argument: callers use
// them as expressions (`_length = _docio_length_encode(length);`), which an
// empty expansion would turn into a syntax error.
#ifdef __ENDIAN_SAFE
INLINE struct docio_length _docio_length_encode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_encode(length.keylen);
    ret.metalen = _endian_encode(length.metalen);
    ret.bodylen = _endian_encode(length.bodylen);
    ret.bodylen_ondisk = _endian_encode(length.bodylen_ondisk);
    return ret;
}
INLINE struct docio_length _docio_length_decode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_decode(length.keylen);
    ret.metalen = _endian_decode(length.metalen);
    ret.bodylen = _endian_decode(length.bodylen);
    ret.bodylen_ondisk = _endian_decode(length.bodylen_ondisk);
    return ret;
}
#else
#define _docio_length_encode(a) (a)
#define _docio_length_decode(a) (a)
#endif
513
514INLINE uint8_t _docio_length_checksum(struct docio_length length, struct docio_handle* handle)
515{
516    return uint8_t(get_checksum(reinterpret_cast<const uint8_t*>(&length),
517                                sizeof(keylen_t) + sizeof(uint16_t) + sizeof(uint32_t)*2,
518                                handle->file->crc_mode) & 0xff);
519}
520
521INLINE bid_t _docio_append_doc(struct docio_handle *handle, struct docio_object *doc)
522{
523    size_t _len;
524    uint32_t offset = 0;
525    uint32_t crc;
526    uint64_t docsize;
527    void *buf = NULL;
528    bid_t ret_offset;
529    fdb_seqnum_t _seqnum;
530    timestamp_t _timestamp;
531    struct docio_length length, _length;
532    err_log_callback *log_callback = handle->log_callback;
533
534    length = doc->length;
535    length.bodylen_ondisk = length.bodylen;
536
537#ifdef _DOC_COMP
538    int ret;
539    void *compbuf = NULL;
540    uint32_t compbuf_len = 0;
541    if (doc->length.bodylen > 0 && handle->compress_document_body) {
542        compbuf_len = snappy_max_compressed_length(length.bodylen);
543        compbuf = (void *)malloc(compbuf_len);
544
545        _len = compbuf_len;
546        ret = snappy_compress((char*)doc->body, length.bodylen, (char*)compbuf, &_len);
547        if (ret < 0) { // LCOV_EXCL_START
548            fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
549                    "Error in compressing the doc body of key '%s' from "
550                    "a database file '%s'",
551                    (char *) doc->key, handle->file->filename);
552            free(compbuf);
553            // we use BLK_NOT_FOUND for error code of appending instead of 0
554            // because document can be written at the byte offset 0
555            return BLK_NOT_FOUND;
556        } // LCOV_EXCL_STOP
557
558        length.bodylen_ondisk = compbuf_len = _len;
559        length.flag |= DOCIO_COMPRESSED;
560
561        docsize = sizeof(struct docio_length) + length.keylen + length.metalen;
562        docsize += compbuf_len;
563    } else {
564        docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
565        compbuf_len = length.bodylen;
566    }
567#else
568    docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
569#endif
570    docsize += sizeof(timestamp_t);
571
572    docsize += sizeof(fdb_seqnum_t);
573
574#ifdef __CRC32
575    docsize += sizeof(crc);
576#endif
577
578    doc->length = length;
579    buf = (void *)malloc(docsize);
580
581    _length = _docio_length_encode(length);
582
583    // calculate checksum of LENGTH using crc
584    _length.checksum = _docio_length_checksum(_length, handle);
585
586    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
587    offset += sizeof(struct docio_length);
588
589    // copy key
590    memcpy((uint8_t *)buf + offset, doc->key, length.keylen);
591    offset += length.keylen;
592
593    // copy timestamp
594    _timestamp = _endian_encode(doc->timestamp);
595    memcpy((uint8_t*)buf + offset, &_timestamp, sizeof(_timestamp));
596    offset += sizeof(_timestamp);
597
598    // copy seqeunce number (optional)
599    _seqnum = _endian_encode(doc->seqnum);
600    memcpy((uint8_t *)buf + offset, &_seqnum, sizeof(fdb_seqnum_t));
601    offset += sizeof(fdb_seqnum_t);
602
603    // copy metadata (optional)
604    if (length.metalen > 0) {
605        memcpy((uint8_t *)buf + offset, doc->meta, length.metalen);
606        offset += length.metalen;
607    }
608
609    // copy body (optional)
610    if (length.bodylen > 0) {
611#ifdef _DOC_COMP
612        if (length.flag & DOCIO_COMPRESSED) {
613            // compressed body
614            if (compbuf) {
615                memcpy((uint8_t*)buf + offset, compbuf, compbuf_len);
616                offset += compbuf_len;
617                free(compbuf);
618            }
619        } else {
620            memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
621            offset += length.bodylen;
622        }
623#else
624        memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
625        offset += length.bodylen;
626#endif
627    }
628
629#ifdef __CRC32
630    crc = get_checksum(reinterpret_cast<const uint8_t*>(buf),
631                       docsize - sizeof(crc),
632                       handle->file->crc_mode);
633    memcpy((uint8_t *)buf + offset, &crc, sizeof(crc));
634#endif
635
636    ret_offset = docio_append_doc_raw(handle, docsize, buf);
637    free(buf);
638
639    return ret_offset;
640}
641
642bid_t docio_append_commit_mark(struct docio_handle *handle, uint64_t doc_offset)
643{
644    // Note: should adapt DOCIO_COMMIT_MARK_SIZE if this function is modified.
645    uint32_t offset = 0;
646    uint64_t docsize;
647    uint64_t _doc_offset;
648    void *buf;
649    bid_t ret_offset;
650    struct docio_length length, _length;
651
652    memset(&length, 0, sizeof(struct docio_length));
653    length.flag = DOCIO_TXN_COMMITTED;
654
655    docsize = sizeof(struct docio_length) + sizeof(doc_offset);
656    buf = (void *)malloc(docsize);
657
658    _length = _docio_length_encode(length);
659
660    // calculate checksum of LENGTH using crc
661    _length.checksum = _docio_length_checksum(_length, handle);
662
663    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
664    offset += sizeof(struct docio_length);
665
666    // copy doc_offset
667    _doc_offset = _endian_encode(doc_offset);
668    memcpy((uint8_t *)buf + offset, &_doc_offset, sizeof(_doc_offset));
669
670    ret_offset = docio_append_doc_raw(handle, docsize, buf);
671    free(buf);
672
673    return ret_offset;
674}
675
676bid_t docio_append_doc(struct docio_handle *handle, struct docio_object *doc,
677                       uint8_t deleted, uint8_t txn_enabled)
678{
679    doc->length.flag = DOCIO_NORMAL;
680    if (deleted) {
681        doc->length.flag |= DOCIO_DELETED;
682    }
683    if (txn_enabled) {
684        doc->length.flag |= DOCIO_TXN_DIRTY;
685    }
686    return _docio_append_doc(handle, doc);
687}
688
689bid_t docio_append_doc_system(struct docio_handle *handle, struct docio_object *doc)
690{
691    doc->length.flag = DOCIO_NORMAL | DOCIO_SYSTEM;
692    return _docio_append_doc(handle, doc);
693}
694
695INLINE fdb_status _docio_read_through_buffer(struct docio_handle *handle,
696                                             bid_t bid,
697                                             err_log_callback *log_callback,
698                                             bool read_on_cache_miss)
699{
700    fdb_status status = FDB_RESULT_SUCCESS;
701
702    // if superblock's BMP revnum has been changed,
703    // then 'lastbid' should be reset as it might be reused.
704    if (handle->lastbid != BLK_NOT_FOUND &&
705        filemgr_get_sb_bmp_revnum(handle->file) != handle->lastBmpRevnum) {
706        handle->lastbid = BLK_NOT_FOUND;
707    }
708
709    // to reduce the overhead from memcpy the same block
710    if (handle->lastbid != bid) {
711        status = filemgr_read(handle->file, bid, handle->readbuffer,
712                              log_callback, read_on_cache_miss);
713        if (status != FDB_RESULT_SUCCESS) {
714            if (read_on_cache_miss) {
715                fdb_log(log_callback, status,
716                        "Error in reading a doc block with id %" _F64 " from "
717                        "a database file '%s'", bid, handle->file->filename);
718            }
719            // we must reset 'lastbid' here because now 'readbuffer'
720            // may contain other data unrelated to 'lastbid'.
721            handle->lastbid = BLK_NOT_FOUND;
722            return status;
723        }
724
725        if (filemgr_is_writable(handle->file, bid)) {
726            // this block can be modified later .. must be re-read
727            handle->lastbid = BLK_NOT_FOUND;
728        } else {
729            handle->lastbid = bid;
730            handle->lastBmpRevnum = filemgr_get_sb_bmp_revnum(handle->file);
731        }
732    }
733
734    return status;
735}
736
737INLINE bool _docio_check_buffer(struct docio_handle *handle, uint64_t bmp_revnum)
738{
739    size_t blocksize = handle->file->blocksize;
740    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
741    struct docblk_meta blk_meta;
742
743    if (non_consecutive) {
744        // new version: support non-consecutive document block
745        blocksize -= DOCBLK_META_SIZE;
746        memcpy(&blk_meta, (uint8_t*)handle->readbuffer + blocksize, sizeof(blk_meta));
747    } else {
748        // old version: block marker only
749        blocksize -= BLK_MARKER_SIZE;
750        memcpy(&blk_meta.marker, (uint8_t*)handle->readbuffer + blocksize,
751               sizeof(blk_meta.marker));
752    }
753
754    if (blk_meta.marker != BLK_MARKER_DOC) {
755        return false;
756    }
757
758    if (non_consecutive && bmp_revnum != (uint64_t)-1) {
759        uint16_t revnum_hash = _endian_decode(blk_meta.sb_bmp_revnum_hash);
760        if (revnum_hash == (bmp_revnum & BMP_REVNUM_MASK)) {
761            return true;
762        } else {
763            return false;
764        }
765    }
766    return true;
767}
768
769static int64_t _docio_read_length(struct docio_handle *handle,
770                                  uint64_t offset,
771                                  struct docio_length *length,
772                                  err_log_callback *log_callback,
773                                  bool read_on_cache_miss)
774{
775    size_t blocksize = handle->file->blocksize;
776    size_t real_blocksize = blocksize;
777    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
778    struct docblk_meta blk_meta;
779#ifdef __CRC32
780    if (non_consecutive) {
781        // new version: support non-consecutive document block
782        blocksize -= DOCBLK_META_SIZE;
783    } else {
784        // old version: block marker only
785        blocksize -= BLK_MARKER_SIZE;
786    }
787#endif
788
789    bid_t bid = offset / real_blocksize;
790    uint32_t pos = offset % real_blocksize;
791    void *buf = handle->readbuffer;
792    uint32_t restsize = 0;
793
794    if (blocksize > pos) {
795        restsize = blocksize - pos;
796    }
797
798    // read length structure
799    fdb_status fs = _docio_read_through_buffer(handle, bid, log_callback,
800                                               read_on_cache_miss);
801    if (fs != FDB_RESULT_SUCCESS) {
802        if (read_on_cache_miss) {
803            fdb_log(log_callback, fs,
804                    "Error in reading a doc length from offset %" _F64
805                    " in block id %" _F64
806                    " from a database file '%s'", offset, bid,
807                    handle->file->filename);
808        }
809        return (int64_t) fs;
810    }
811    if (!_docio_check_buffer(handle, (uint64_t)-1)) {
812        return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
813    }
814
815    if (restsize >= sizeof(struct docio_length)) {
816        memcpy(length, (uint8_t *)buf + pos, sizeof(struct docio_length));
817        pos += sizeof(struct docio_length);
818
819    } else {
820        if (restsize > 0) {
821            memcpy(length, (uint8_t *)buf + pos, restsize);
822        }
823        // read additional block
824        if (non_consecutive) {
825            memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
826            bid = _endian_decode(blk_meta.next_bid);
827            if (bid == BLK_NOT_FOUND) {
828                // Reached the last block. Simply return the offset that is passed to
829                // this function.
830                memset(length, 0x0, sizeof(struct docio_length));
831                return offset;
832            }
833        } else {
834            bid++;
835        }
836
837        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
838        if (fs != FDB_RESULT_SUCCESS) {
839            fdb_log(log_callback, fs,
840                    "Error in reading a doc length from an additional block "
841                    "offset %" _F64 " in block id %" _F64
842                    " from a database file '%s'", offset,
843                    bid, handle->file->filename);
844            return (int64_t) fs;
845        }
846        if (!_docio_check_buffer(handle, (uint64_t)-1)) {
847            return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
848        }
849        // memcpy rest of data
850        memcpy((uint8_t *)length + restsize, buf,
851               sizeof(struct docio_length) - restsize);
852        pos = sizeof(struct docio_length) - restsize;
853    }
854
855    return bid * real_blocksize + pos;
856}
857
858static int64_t _docio_read_doc_component(struct docio_handle *handle,
859                                         uint64_t offset,
860                                         uint32_t len,
861                                         void *buf_out,
862                                         err_log_callback *log_callback)
863{
864    uint32_t rest_len;
865    size_t blocksize = handle->file->blocksize;
866    size_t real_blocksize = blocksize;
867    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
868    struct docblk_meta blk_meta;
869#ifdef __CRC32
870    if (non_consecutive) {
871        // new version: support non-consecutive document block
872        blocksize -= DOCBLK_META_SIZE;
873    } else {
874        // old version: block marker only
875        blocksize -= BLK_MARKER_SIZE;
876    }
877#endif
878
879    bid_t bid = offset / real_blocksize;
880    uint32_t pos = offset % real_blocksize;
881    //uint8_t buf[handle->file->blocksize];
882    void *buf = handle->readbuffer;
883    uint32_t restsize;
884    fdb_status fs = FDB_RESULT_SUCCESS;
885
886    rest_len = len;
887
888    while(rest_len > 0) {
889        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
890        if (fs != FDB_RESULT_SUCCESS) {
891            fdb_log(log_callback, fs,
892                    "Error in reading a doc block with block id %" _F64 " from "
893                    "a database file '%s'", bid, handle->file->filename);
894            return (int64_t)fs;
895        }
896        restsize = blocksize - pos;
897
898        if (restsize >= rest_len) {
899            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, rest_len);
900            pos += rest_len;
901            rest_len = 0;
902        }else{
903            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, restsize);
904
905            if (non_consecutive) {
906                memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
907                bid = _endian_decode(blk_meta.next_bid);
908            } else {
909                bid++;
910            }
911
912            pos = 0;
913            rest_len -= restsize;
914
915            if (rest_len > 0 &&
916                bid >= filemgr_get_pos(handle->file) / handle->file->blocksize) {
917                // no more data in the file .. the file is corrupted
918                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
919                        "Fatal error!!! Database file '%s' is corrupted.",
920                        handle->file->filename);
921                return (int64_t)FDB_RESULT_FILE_CORRUPTION;
922            }
923        }
924    }
925
926    return bid * real_blocksize + pos;
927}
928
929#ifdef _DOC_COMP
930
931static int64_t _docio_read_doc_component_comp(struct docio_handle *handle,
932                                              uint64_t offset,
933                                              uint32_t len,
934                                              uint32_t comp_len,
935                                              void *buf_out,
936                                              void *comp_data_out,
937                                              err_log_callback *log_callback)
938{
939    int ret;
940    size_t uncomp_size;
941    int64_t _offset;
942
943    _offset = _docio_read_doc_component(handle, offset,
944                                        comp_len, comp_data_out, log_callback);
945    if (_offset < 0) {
946        fdb_log(log_callback, (fdb_status) _offset,
947                "Error in reading the file with offset %" _F64 ", length %d "
948                "from a database file '%s'", offset, len,
949                handle->file->filename);
950        return _offset;
951    }
952
953    uncomp_size = len;
954    ret = snappy_uncompress((char*)comp_data_out, comp_len,
955                            (char*)buf_out, &uncomp_size);
956    if (ret < 0) {
957        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
958                "Error in decompressing the data that was read with the file "
959                "offset %" _F64 ", length %d from a database file '%s'",
960                offset, len, handle->file->filename);
961        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
962    }
963    if (uncomp_size != len) {
964        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
965                "Error in decompressing the data with the file offset "
966                "%" _F64 " in a database file '%s', because the uncompressed length %d "
967                "is not same as the expected length %d",
968                offset, handle->file->filename, uncomp_size, len);
969        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
970    }
971    return _offset;
972}
973
974#endif
975
976fdb_status docio_read_doc_length(struct docio_handle *handle,
977                                 struct docio_length *length,
978                                 uint64_t offset)
979{
980    uint8_t checksum;
981    int64_t _offset;
982    struct docio_length _length, zero_length;
983    err_log_callback *log_callback = handle->log_callback;
984
985    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
986    if (_offset < 0) {
987        return (fdb_status) _offset;
988    }
989
990    memset(&zero_length, 0x0, sizeof(struct docio_length));
991    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
992        // If all the fields in docio_length are zero, then it means that the rest of
993        // the current block, which starts at offset, is zero-filled and can be skipped.
994        *length = zero_length;
995        return FDB_RESULT_SUCCESS;
996    }
997
998    // checksum check
999    checksum = _docio_length_checksum(_length, handle);
1000    if (checksum != _length.checksum) {
1001        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1002                "doc_length checksum mismatch error in a database file '%s'"
1003                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1004                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1005                checksum, _length.checksum, _length.keylen, _length.metalen,
1006                _length.bodylen, _length.bodylen_ondisk, offset);
1007        return FDB_RESULT_CHECKSUM_ERROR;
1008    }
1009
1010    *length = _docio_length_decode(_length);
1011    if (length->keylen == 0 || length->keylen > FDB_MAX_KEYLEN_INTERNAL) {
1012        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1013                "Error in decoding the doc length metadata in file %s"
1014                " crc %x keylen %d metalen %d bodylen %d "
1015                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1016                checksum, _length.keylen, _length.metalen,
1017                _length.bodylen, _length.bodylen_ondisk, offset);
1018        return FDB_RESULT_FILE_CORRUPTION;
1019    }
1020
1021    return FDB_RESULT_SUCCESS;
1022}
1023
1024fdb_status docio_read_doc_key(struct docio_handle *handle, uint64_t offset,
1025                              keylen_t *keylen, void *keybuf)
1026{
1027    uint8_t checksum;
1028    int64_t _offset;
1029    struct docio_length length, _length, zero_length;
1030    err_log_callback *log_callback = handle->log_callback;
1031
1032    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
1033    if (_offset < 0) {
1034        fdb_log(log_callback, (fdb_status) _offset,
1035                "Error in reading the doc length metadata with offset %" _F64 " from "
1036                "a database file '%s'",
1037                offset, handle->file->filename);
1038        return (fdb_status) _offset;
1039    }
1040
1041    memset(&zero_length, 0x0, sizeof(struct docio_length));
1042    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1043        // If all the fields in docio_length are zero, then it means that the rest of
1044        // the current block, which starts at offset, is zero-filled and can be skipped.
1045        *keylen = 0;
1046        return FDB_RESULT_SUCCESS;
1047    }
1048
1049    // checksum check
1050    checksum = _docio_length_checksum(_length, handle);
1051    if (checksum != _length.checksum) {
1052        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1053                "doc_length key checksum mismatch error in a database file '%s'"
1054                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1055                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1056                checksum, _length.checksum, _length.keylen, _length.metalen,
1057                _length.bodylen, _length.bodylen_ondisk, offset);
1058        return FDB_RESULT_CHECKSUM_ERROR;
1059    }
1060
1061    length = _docio_length_decode(_length);
1062    if (length.keylen == 0 || length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1063        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1064                "Error in decoding the doc key length metadata in file %s"
1065                " crc %x keylen %d metalen %d bodylen %d "
1066                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1067                checksum, _length.keylen, _length.metalen,
1068                _length.bodylen, _length.bodylen_ondisk, offset);
1069        return FDB_RESULT_FILE_CORRUPTION;
1070    }
1071
1072    _offset = _docio_read_doc_component(handle, _offset, length.keylen,
1073                                        keybuf, log_callback);
1074    if (_offset < 0) {
1075        fdb_log(log_callback, (fdb_status) _offset,
1076                "Error in reading a key with offset %" _F64 ", length %d "
1077                "from a database file '%s'", _offset, length.keylen,
1078                handle->file->filename);
1079        return (fdb_status) _offset;
1080    }
1081
1082    *keylen = length.keylen;
1083    return FDB_RESULT_SUCCESS;
1084}
1085
1086void free_docio_object(struct docio_object *doc, uint8_t key_alloc,
1087                       uint8_t meta_alloc, uint8_t body_alloc) {
1088    if (!doc) {
1089        return;
1090    }
1091
1092    if (key_alloc) {
1093        free(doc->key);
1094        doc->key = NULL;
1095    }
1096    if (meta_alloc) {
1097        free(doc->meta);
1098        doc->meta = NULL;
1099    }
1100    if (body_alloc) {
1101        free(doc->body);
1102        doc->body = NULL;
1103    }
1104}
1105
1106int64_t docio_read_doc_key_meta(struct docio_handle *handle, uint64_t offset,
1107                                struct docio_object *doc,
1108                                bool read_on_cache_miss)
1109{
1110    uint8_t checksum;
1111    int64_t _offset;
1112    int key_alloc = 0;
1113    int meta_alloc = 0;
1114    fdb_seqnum_t _seqnum;
1115    timestamp_t _timestamp;
1116    struct docio_length _length, zero_length;
1117    err_log_callback *log_callback = handle->log_callback;
1118
1119    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1120                                 read_on_cache_miss);
1121    if (_offset < 0) {
1122        if (read_on_cache_miss) {
1123            fdb_log(log_callback, (fdb_status) _offset,
1124                    "Error in reading the doc length metadata with offset %" _F64 " from "
1125                    "a database file '%s'",
1126                    offset, handle->file->filename);
1127        }
1128        return _offset;
1129    }
1130
1131    memset(&zero_length, 0x0, sizeof(struct docio_length));
1132    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1133        // If all the fields in docio_length are zero, then it means that the rest of
1134        // the current block, which starts at offset, is zero-filled and can be skipped.
1135        doc->length = zero_length;
1136        return (int64_t) FDB_RESULT_SUCCESS;
1137    }
1138
1139    // checksum check
1140    checksum = _docio_length_checksum(_length, handle);
1141    if (checksum != _length.checksum) {
1142        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1143                "doc_length meta checksum mismatch error in a database file '%s'"
1144                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1145                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1146                checksum, _length.checksum, _length.keylen, _length.metalen,
1147                _length.bodylen, _length.bodylen_ondisk, offset);
1148        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1149    }
1150
1151    doc->length = _docio_length_decode(_length);
1152    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1153        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1154                "Error in decoding the doc length metadata (key length: %d) from "
1155                "a database file '%s'", doc->length.keylen, handle->file->filename);
1156        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1157    }
1158
1159    if (doc->key == NULL) {
1160        doc->key = (void *)malloc(doc->length.keylen);
1161        key_alloc = 1;
1162    }
1163    if (doc->meta == NULL && doc->length.metalen) {
1164        doc->meta = (void *)malloc(doc->length.metalen);
1165        meta_alloc = 1;
1166    }
1167
1168    _offset = _docio_read_doc_component(handle, _offset, doc->length.keylen,
1169                                        doc->key, log_callback);
1170    if (_offset < 0) {
1171        fdb_log(log_callback, (fdb_status) _offset,
1172                "Error in reading a key with offset %" _F64 ", length %d "
1173                "from a database file '%s'", offset, doc->length.keylen,
1174                handle->file->filename);
1175        free_docio_object(doc, key_alloc, meta_alloc, 0);
1176        return _offset;
1177    }
1178
1179    // read timestamp
1180    _offset = _docio_read_doc_component(handle, _offset,
1181                                        sizeof(timestamp_t),
1182                                        &_timestamp, log_callback);
1183    if (_offset < 0) {
1184        fdb_log(log_callback, (fdb_status) _offset,
1185                "Error in reading a timestamp with offset %" _F64 ", length %d "
1186                "from a database file '%s'", offset, sizeof(timestamp_t),
1187                handle->file->filename);
1188        free_docio_object(doc, key_alloc, meta_alloc, 0);
1189        return _offset;
1190    }
1191    doc->timestamp = _endian_decode(_timestamp);
1192
1193    // copy sequence number (optional)
1194    _offset = _docio_read_doc_component(handle, _offset, sizeof(fdb_seqnum_t),
1195                                        (void *)&_seqnum, log_callback);
1196    if (_offset < 0) {
1197        fdb_log(log_callback, (fdb_status) _offset,
1198                "Error in reading a sequence number with offset %" _F64 ", length %d "
1199                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1200                handle->file->filename);
1201        free_docio_object(doc, key_alloc, meta_alloc, 0);
1202        return _offset;
1203    }
1204    doc->seqnum = _endian_decode(_seqnum);
1205
1206    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1207                                        doc->meta, log_callback);
1208    if (_offset < 0) {
1209        fdb_log(log_callback, (fdb_status) _offset,
1210                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1211                "from a database file '%s'", offset, doc->length.metalen,
1212                handle->file->filename);
1213        free_docio_object(doc, key_alloc, meta_alloc, 0);
1214        return _offset;
1215    }
1216
1217    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1218    free_docio_object(doc, 0, free_meta, 0);
1219
1220    return _offset;
1221}
1222
1223int64_t docio_read_doc(struct docio_handle *handle, uint64_t offset,
1224                       struct docio_object *doc,
1225                       bool read_on_cache_miss)
1226{
1227    uint8_t checksum;
1228    int64_t _offset;
1229    int key_alloc = 0;
1230    int meta_alloc = 0;
1231    int body_alloc = 0;
1232    fdb_seqnum_t _seqnum;
1233    timestamp_t _timestamp;
1234    void *comp_body = NULL;
1235    struct docio_length _length, zero_length;
1236    err_log_callback *log_callback = handle->log_callback;
1237
1238    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1239                                 read_on_cache_miss);
1240    if (_offset < 0) {
1241        if (read_on_cache_miss) {
1242            fdb_log(log_callback, (fdb_status) _offset,
1243                    "Error in reading the doc length metadata with offset %" _F64 " from "
1244                    "a database file '%s'",
1245                    offset, handle->file->filename);
1246        }
1247        return _offset;
1248    }
1249
1250    memset(&zero_length, 0x0, sizeof(struct docio_length));
1251    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1252        // If all the fields in docio_length are zero, then it means that the rest of
1253        // the current block, which starts at offset, is zero-filled and can be skipped.
1254        doc->length = zero_length;
1255        return (int64_t) FDB_RESULT_SUCCESS;
1256    }
1257
1258    // checksum check
1259    checksum = _docio_length_checksum(_length, handle);
1260    if (checksum != _length.checksum) {
1261        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1262                "doc_length body checksum mismatch error in a database file '%s'"
1263                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1264                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1265                checksum, _length.checksum, _length.keylen, _length.metalen,
1266                _length.bodylen, _length.bodylen_ondisk, offset);
1267        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1268    }
1269
1270    doc->length = _docio_length_decode(_length);
1271    if (doc->length.flag & DOCIO_TXN_COMMITTED) {
1272        // transaction commit mark
1273        // read the corresponding doc offset
1274
1275        // If TXN_COMMITTED flag is set, this doc is not an actual doc, but a
1276        // transaction commit marker. Thus, all lengths should be zero.
1277        if (doc->length.keylen || doc->length.metalen ||
1278            doc->length.bodylen || doc->length.bodylen_ondisk) {
1279            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1280                    "File corruption: Doc length fields in a transaction commit marker "
1281                    "was not zero in a database file '%s' offset %" _F64,
1282                    handle->file->filename, offset);
1283            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1284            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1285        }
1286
1287        uint64_t doc_offset;
1288        _offset = _docio_read_doc_component(handle, _offset,
1289                                            sizeof(doc_offset), &doc_offset,
1290                                            log_callback);
1291        if (_offset < 0) {
1292            fdb_log(log_callback, (fdb_status) _offset,
1293                    "Error in reading an offset of a committed doc from an offset %" _F64
1294                    " in a database file '%s'", offset, handle->file->filename);
1295            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1296            return _offset;
1297        }
1298        doc->doc_offset = _endian_decode(doc_offset);
1299        // The offset of the actual document that pointed by this commit marker
1300        // should not be greater than the file size.
1301        if (doc->doc_offset > filemgr_get_pos(handle->file)) {
1302            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1303                    "File corruption: Offset %" _F64 " of the actual doc pointed by the "
1304                    "commit marker is greater than the size %" _F64 " of a database file '%s'",
1305                    doc->doc_offset, filemgr_get_pos(handle->file),
1306                    handle->file->filename);
1307            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1308            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1309        }
1310        return _offset;
1311    }
1312
1313    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1314        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1315                "Error in decoding the doc length metadata (key length: %d) from "
1316                "a database file '%s' offset %" _F64, doc->length.keylen,
1317                handle->file->filename, offset);
1318        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1319    }
1320
1321    if (doc->key == NULL) {
1322        doc->key = (void *)malloc(doc->length.keylen);
1323        key_alloc = 1;
1324    }
1325    if (doc->meta == NULL && doc->length.metalen) {
1326        doc->meta = (void *)malloc(doc->length.metalen);
1327        meta_alloc = 1;
1328    }
1329    if (doc->body == NULL && doc->length.bodylen) {
1330        doc->body = (void *)malloc(doc->length.bodylen);
1331        body_alloc = 1;
1332    }
1333
1334    _offset = _docio_read_doc_component(handle, _offset,
1335                                        doc->length.keylen,
1336                                        doc->key,
1337                                        log_callback);
1338    if (_offset < 0) {
1339        fdb_log(log_callback, (fdb_status) _offset,
1340                "Error in reading a key with offset %" _F64 ", length %d "
1341                "from a database file '%s'", offset, doc->length.keylen,
1342                handle->file->filename);
1343        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1344        return _offset;
1345    }
1346
1347    // read timestamp
1348    _offset = _docio_read_doc_component(handle, _offset,
1349                                        sizeof(timestamp_t),
1350                                        &_timestamp,
1351                                        log_callback);
1352    if (_offset < 0) {
1353        fdb_log(log_callback, (fdb_status) _offset,
1354                "Error in reading a timestamp with offset %" _F64 ", length %d "
1355                "from a database file '%s'", offset, sizeof(timestamp_t),
1356                handle->file->filename);
1357        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1358        return _offset;
1359    }
1360    doc->timestamp = _endian_decode(_timestamp);
1361
1362    // copy seqeunce number (optional)
1363    _offset = _docio_read_doc_component(handle, _offset,
1364                                        sizeof(fdb_seqnum_t),
1365                                        (void *)&_seqnum,
1366                                        log_callback);
1367    if (_offset < 0) {
1368        fdb_log(log_callback, (fdb_status) _offset,
1369                "Error in reading a sequence number with offset %" _F64 ", length %d "
1370                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1371                handle->file->filename);
1372        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1373        return _offset;
1374    }
1375    doc->seqnum = _endian_decode(_seqnum);
1376
1377    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1378                                        doc->meta, log_callback);
1379    if (_offset < 0) {
1380        fdb_log(log_callback, (fdb_status) _offset,
1381                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1382                "from a database file '%s'", offset, doc->length.metalen,
1383                handle->file->filename);
1384        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1385        return _offset;
1386    }
1387
1388#ifdef _DOC_COMP
1389    if (doc->length.flag & DOCIO_COMPRESSED) {
1390        comp_body = (void*)malloc(doc->length.bodylen_ondisk);
1391        _offset = _docio_read_doc_component_comp(handle, _offset, doc->length.bodylen,
1392                                                 doc->length.bodylen_ondisk, doc->body,
1393                                                 comp_body, log_callback);
1394        if (_offset < 0) {
1395            fdb_log(log_callback, (fdb_status) _offset,
1396                    "Error in reading a compressed doc with offset %" _F64 ", length %d "
1397                    "from a database file '%s'", offset, doc->length.bodylen,
1398                    handle->file->filename);
1399            if (comp_body) {
1400                free(comp_body);
1401            }
1402            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1403            return _offset;
1404        }
1405    } else {
1406        _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1407                                            doc->body, log_callback);
1408        if (_offset < 0) {
1409            fdb_log(log_callback, (fdb_status) _offset,
1410                    "Error in reading a doc with offset %" _F64 ", length %d "
1411                    "from a database file '%s'", offset, doc->length.bodylen,
1412                    handle->file->filename);
1413            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1414            return _offset;
1415        }
1416    }
1417#else
1418    _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1419                                        doc->body, log_callback);
1420    if (_offset < 0) {
1421        fdb_log(log_callback, (fdb_status) _offset,
1422                "Error in reading a doc with offset %" _F64 ", length %d "
1423                "from a database file '%s'", offset, doc->length.bodylen,
1424                handle->file->filename);
1425        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1426        return _offset;
1427    }
1428#endif
1429
1430#ifdef __CRC32
1431    uint32_t crc_file, crc;
1432    _offset = _docio_read_doc_component(handle, _offset, sizeof(crc_file),
1433                                        (void *)&crc_file, log_callback);
1434    if (_offset < 0) {
1435        fdb_log(log_callback, (fdb_status) _offset,
1436                "Error in reading a doc's CRC value with offset %" _F64 ", length %d "
1437                "from a database file '%s'", offset, sizeof(crc_file),
1438                handle->file->filename);
1439        if (comp_body) {
1440            free(comp_body);
1441        }
1442        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1443        return _offset;
1444    }
1445
1446    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_length),
1447                       sizeof(_length),
1448                       handle->file->crc_mode);
1449    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->key),
1450                       doc->length.keylen,
1451                       crc,
1452                       handle->file->crc_mode);
1453    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_timestamp),
1454                       sizeof(timestamp_t),
1455                       crc,
1456                       handle->file->crc_mode);
1457    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_seqnum),
1458                       sizeof(fdb_seqnum_t),
1459                       crc,
1460                       handle->file->crc_mode);
1461    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->meta),
1462                       doc->length.metalen,
1463                       crc,
1464                       handle->file->crc_mode);
1465
1466    if (doc->length.flag & DOCIO_COMPRESSED) {
1467        crc = get_checksum(reinterpret_cast<const uint8_t*>(comp_body),
1468                           doc->length.bodylen_ondisk,
1469                           crc,
1470                           handle->file->crc_mode);
1471        if (comp_body) {
1472            free(comp_body);
1473        }
1474    } else {
1475        crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->body),
1476                           doc->length.bodylen,
1477                           crc,
1478                           handle->file->crc_mode);
1479    }
1480    if (crc != crc_file) {
1481        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1482                "doc_body checksum mismatch error in a database file '%s'"
1483                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1484                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1485                crc, crc_file, _length.keylen, _length.metalen,
1486                _length.bodylen, _length.bodylen_ondisk, offset);
1487        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1488        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1489    }
1490#endif
1491
1492    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1493    uint8_t free_body = body_alloc && !doc->length.bodylen;
1494    free_docio_object(doc, 0, free_meta, free_body);
1495
1496    return _offset;
1497}
1498
/**
 * Submit the prepared asynchronous read requests, wait for their completion
 * events, and decode one document from each completed AIO buffer.
 *
 * @param handle Doc I/O handle. Its read buffer and last-read block ID are
 *        temporarily pointed at each completed AIO buffer and restored after
 *        every document.
 * @param doc_array Output array of documents.
 * @param doc_idx Index of the first slot of doc_array to fill. One slot is
 *        consumed per completion event, in the order the events are returned,
 *        whether or not the document could be decoded.
 * @param aio_handle Async I/O handle that holds the prepared requests and
 *        receives the completion events.
 * @param size Number of prepared requests to submit.
 * @param sum_doc_size In/out accumulator; grows by the size of every document
 *        that was decoded successfully (minus the on-disk body length when
 *        keymeta_only is set).
 * @param keymeta_only If true, only key and metadata are read for each doc.
 * @return 'size' when every request was submitted and reaped; the negative
 *         value returned by aio_submit / aio_getevents on failure; the short
 *         submission count when fewer than 'size' requests were accepted (no
 *         completion is reaped in that case); 0 when async I/O is not compiled
 *         in for this platform.
 */
static int _submit_async_io_requests(struct docio_handle *handle,
                                     struct docio_object *doc_array,
                                     size_t doc_idx,
                                     struct async_io_handle *aio_handle,
                                     int size,
                                     size_t *sum_doc_size,
                                     bool keymeta_only)
{
#ifdef _ASYNC_IO
#if !defined(WIN32) && !defined(_WIN32)
    struct io_event* io_evt = NULL;
    uint8_t *buf = NULL;
    uint64_t offset = 0;
    // Must be signed: the doc read routines report failures as negative
    // status codes. Held in an unsigned variable those codes would turn into
    // huge positive values and a failed read would be treated as a success.
    int64_t _offset = 0;
    int num_events = 0;

    int num_sub = handle->file->ops->aio_submit(aio_handle, size);
    if (num_sub < 0) {
        // Submission failed outright.
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s",
                handle->file->filename, errno_msg);
        return num_sub;
    } else if (num_sub != size) {
        // Only part of the batch was accepted; report it and let the caller
        // decide (nothing is reaped here).
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s, "
                "%d requests were submitted, but only %d requests were processed",
                handle->file->filename, errno_msg, size, num_sub);
        return num_sub;
    }

    // Keep reaping until every submitted request has produced an event.
    while (num_sub > 0) {
        num_events = handle->file->ops->aio_getevents(aio_handle, 1,
                                                      num_sub, (unsigned int) -1);
        if (num_events < 0) {
            char errno_msg[512];
            handle->file->ops->get_errno_str(errno_msg, 512);
            fdb_log(handle->log_callback, (fdb_status) num_events,
                    "Error in getting async I/O events from the completion queue "
                    "for a file '%s', errno msg: %s", handle->file->filename, errno_msg);
            return num_events;
        }
        num_sub -= num_events;
        for (io_evt = aio_handle->events; num_events > 0; --num_events, ++io_evt) {
            buf = (uint8_t *) io_evt->obj->u.c.buf;
            offset = *((uint64_t *) io_evt->data); // Original offset.

            // Set the docio handle's buffer to the AIO buffer to read
            // a doc from the AIO buffer. If additional blocks need to be
            // read, then they will be sequentially read through the synchronous
            // I/O path (i.e., buffer cache -> disk read if cache miss).
            // As these additional blocks are sequential reads, we don't expect
            // asynchronous I/O to give us performance boost.
            void *tmp_buffer = handle->readbuffer;
            handle->readbuffer = buf;
            handle->lastbid = offset / aio_handle->block_size;
            memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
            if (keymeta_only) {
                _offset = docio_read_doc_key_meta(handle, offset,
                                                  &doc_array[doc_idx], true);
            } else {
                _offset = docio_read_doc(handle, offset, &doc_array[doc_idx],
                                         true);
            }

            // Always hand the handle its own buffer back, and invalidate the
            // cached block ID so the next read does not reuse the AIO data.
            handle->readbuffer = tmp_buffer;
            handle->lastbid = BLK_NOT_FOUND;

            if (_offset > 0) {
                // Only successfully decoded documents contribute to the total.
                (*sum_doc_size) += _fdb_get_docsize(doc_array[doc_idx].length);
                if (keymeta_only) {
                    (*sum_doc_size) -= doc_array[doc_idx].length.bodylen_ondisk;
                }
            }
            // The slot is consumed even when the read failed.
            ++doc_idx;
        }
    }
    return size;
#else // Plan to implement async I/O in other OSs (e.g., Windows, OSx)
    return 0;
#endif
#else // Async I/O is not supported in the current OS.
    return 0;
#endif
}
1592
1593size_t docio_batch_read_docs(struct docio_handle *handle,
1594                             uint64_t *offset_array,
1595                             struct docio_object *doc_array,
1596                             size_t array_size,
1597                             size_t data_size_threshold,
1598                             size_t batch_size_threshold,
1599                             struct async_io_handle *aio_handle,
1600                             bool keymeta_only)
1601{
1602    size_t i = 0;
1603    size_t sum_doc_size = 0;
1604    size_t doc_idx = 0;
1605    size_t block_size = handle->file->blocksize;
1606    uint64_t _offset = 0;
1607    int aio_size = 0;
1608    bool read_fail = false;
1609    bool read_on_cache_miss = true;
1610
1611    if (aio_handle) {
1612        // If async I/O is supported, we will then read non-resident docs from disk
1613        // by using async I/O operations.
1614        read_on_cache_miss = false;
1615    }
1616
1617    for (i = 0; i < array_size && i < batch_size_threshold &&
1618           sum_doc_size < data_size_threshold; ++i) {
1619        memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
1620        if (keymeta_only) {
1621            _offset = docio_read_doc_key_meta(handle, offset_array[i], &doc_array[doc_idx],
1622                                              read_on_cache_miss);
1623        } else {
1624            _offset = docio_read_doc(handle, offset_array[i], &doc_array[doc_idx],
1625                                     read_on_cache_miss);
1626        }
1627        if (_offset <= 0) {
1628            if (aio_handle) {
1629                // The page is not resident in the cache. Prepare and perform Async I/O
1630                handle->file->ops->aio_prep_read(aio_handle, aio_size,
1631                                                 block_size, offset_array[i]);
1632                if (++aio_size == (int) aio_handle->queue_depth) {
1633                    int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1634                                                            aio_handle, aio_size,
1635                                                            &sum_doc_size,
1636                                                            keymeta_only);
1637                    if (num_sub < 0 || num_sub != aio_size) {
1638                        read_fail = true;
1639                        break;
1640                    }
1641                    aio_size = 0;
1642                    doc_idx += num_sub;
1643                }
1644            } else {
1645                ++doc_idx; // Error in reading a doc.
1646            }
1647        } else {
1648            sum_doc_size += _fdb_get_docsize(doc_array[doc_idx].length);
1649            if (keymeta_only) {
1650                sum_doc_size -= doc_array[doc_idx].length.bodylen_ondisk;
1651            }
1652            ++doc_idx;
1653        }
1654    }
1655
1656    if (aio_size && !read_fail) {
1657        int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1658                                                aio_handle, aio_size,
1659                                                &sum_doc_size, keymeta_only);
1660        if (num_sub < 0) {
1661            read_fail = true;
1662        } else {
1663            doc_idx += num_sub;
1664        }
1665    }
1666
1667    if (read_fail) {
1668        for (i = 0; i < batch_size_threshold; ++i) {
1669            free(doc_array[i].key);
1670            free(doc_array[i].meta);
1671            free(doc_array[i].body);
1672            doc_array[i].key = doc_array[i].meta = doc_array[i].body = NULL;
1673        }
1674        return (size_t) -1;
1675    }
1676
1677    return doc_idx;
1678}
1679
1680bool docio_check_buffer(struct docio_handle *handle,
1681                        bid_t bid,
1682                        uint64_t sb_bmp_revnum)
1683{
1684    err_log_callback *log_callback = handle->log_callback;
1685    _docio_read_through_buffer(handle, bid, log_callback, true);
1686    return _docio_check_buffer(handle, sb_bmp_revnum);
1687}
1688
1689