// xref: /4.6.0/forestdb/src/docio.cc (revision 3f1b12bb)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21
22#include "docio.h"
23#include "wal.h"
24#include "fdb_internal.h"
25#include "version.h"
26#ifdef _DOC_COMP
27#include "snappy-c.h"
28#endif
29
30#include "memleak.h"
31
32void docio_init(struct docio_handle *handle,
33                struct filemgr *file,
34                bool compress_document_body)
35{
36    handle->file = file;
37    handle->curblock = BLK_NOT_FOUND;
38    handle->curpos = 0;
39    handle->cur_bmp_revnum_hash = 0;
40    handle->lastbid = BLK_NOT_FOUND;
41    handle->lastBmpRevnum = 0;
42    handle->compress_document_body = compress_document_body;
43    malloc_align(handle->readbuffer, FDB_SECTOR_SIZE, file->blocksize);
44}
45
46void docio_free(struct docio_handle *handle)
47{
48    free_align(handle->readbuffer);
49}
50
// _add_blk_marker(file, bid, blocksize, marker, log_callback)
//
// With __CRC32 defined: writes BLK_MARKER_SIZE bytes from 'marker' into block
// 'bid' at byte offset 'blocksize' via filemgr_write_offset() and evaluates
// to that call's result.
// Without __CRC32: nothing is written, the arguments are not evaluated, and
// the expression is simply FDB_RESULT_SUCCESS.
#ifndef __CRC32
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    FDB_RESULT_SUCCESS
#else
#define _add_blk_marker(file, bid, blocksize, marker, log_callback) \
    filemgr_write_offset((file), (bid), (blocksize), BLK_MARKER_SIZE, \
                         (marker), (false), (log_callback))
#endif
59
60INLINE fdb_status _docio_fill_zero(struct docio_handle *handle, bid_t bid,
61                                   size_t pos)
62{
63    // Fill next few bytes (sizeof(struct docio_length)) with zero
64    // to avoid false positive docio_length checksum during file scanning.
65    // (Note that the checksum value of zero-filled docio_length is 0x6F.)
66
67    size_t blocksize = handle->file->blocksize;
68    size_t len_size = sizeof(struct docio_length);
69    uint8_t *zerobuf = alca(uint8_t, len_size);
70
71#ifdef __CRC32
72    if (ver_non_consecutive_doc(handle->file->version)) {
73        // new version: support non-consecutive document block
74        blocksize -= DOCBLK_META_SIZE;
75    } else {
76        // old version: block marker only
77        blocksize -= BLK_MARKER_SIZE;
78    }
79#endif
80
81    if (pos + len_size <= blocksize) {
82        // enough space in the block
83        memset(zerobuf, 0x0, len_size);
84        return filemgr_write_offset(handle->file, bid, pos, len_size,
85                                    zerobuf, false, handle->log_callback);
86    } else {
87        // lack of space .. we don't need to fill zero bytes.
88        return FDB_RESULT_SUCCESS;
89    }
90}
91
92bid_t docio_append_doc_raw(struct docio_handle *handle, uint64_t size, void *buf)
93{
94    uint32_t offset;
95    uint8_t marker[BLK_MARKER_SIZE];
96    size_t blocksize = handle->file->blocksize;
97    size_t real_blocksize = blocksize;
98    size_t remaining_space;
99    err_log_callback *log_callback = handle->log_callback;
100    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
101    struct docblk_meta blk_meta;
102
103    memset(&blk_meta, 0x0, sizeof(blk_meta));
104    blk_meta.marker = BLK_MARKER_DOC;
105    (void)blk_meta;
106
107#ifdef __CRC32
108    if (non_consecutive) {
109        // new version: support non-consecutive document block
110        blocksize -= DOCBLK_META_SIZE;
111    } else {
112        // old version: block marker only
113        blocksize -= BLK_MARKER_SIZE;
114    }
115    memset(marker, BLK_MARKER_DOC, BLK_MARKER_SIZE);
116#endif
117
118    if (handle->curblock == BLK_NOT_FOUND) {
119        // allocate new block
120        handle->cur_bmp_revnum_hash =
121            filemgr_get_sb_bmp_revnum(handle->file) & BMP_REVNUM_MASK;
122        handle->curblock = filemgr_alloc(handle->file, log_callback);
123        handle->curpos = 0;
124    }
125    if (!filemgr_is_writable(handle->file, handle->curblock)) {
126        // mark remaining space in old block as stale
127        if (handle->curpos < real_blocksize) {
128            // this function will calculate block marker size automatically.
129            filemgr_mark_stale(handle->file,
130                               real_blocksize * handle->curblock + handle->curpos,
131                               blocksize - handle->curpos);
132        }
133        // allocate new block
134        handle->cur_bmp_revnum_hash =
135            filemgr_get_sb_bmp_revnum(handle->file) & BMP_REVNUM_MASK;
136        handle->curblock = filemgr_alloc(handle->file, log_callback);
137        handle->curpos = 0;
138    }
139    blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
140
141    remaining_space = blocksize - handle->curpos;
142    if (size <= remaining_space) {
143        fdb_status fs = FDB_RESULT_SUCCESS;
144        // simply append to current block
145        offset = handle->curpos;
146
147        if (non_consecutive) {
148            // set next BID
149            blk_meta.next_bid = BLK_NOT_FOUND;
150            // write meta
151            fs = filemgr_write_offset(handle->file, handle->curblock,
152                                      blocksize, sizeof(blk_meta), &blk_meta,
153                                      false, log_callback);
154        } else {
155            fs = _add_blk_marker(handle->file, handle->curblock, blocksize, marker,
156                                 log_callback);
157        }
158
159
160        if (fs != FDB_RESULT_SUCCESS) {
161            fdb_log(log_callback, fs,
162                    "Error in appending a doc block marker for a block id %" _F64
163                    " into a database file '%s'", handle->curblock,
164                    handle->file->filename);
165            return BLK_NOT_FOUND;
166        }
167        fs = filemgr_write_offset(handle->file, handle->curblock, offset, size,
168                                  buf, (size == remaining_space), log_callback);
169        if (fs != FDB_RESULT_SUCCESS) {
170            fdb_log(log_callback, fs,
171                    "Error in writing a doc block with id %" _F64 ", offset %d, size %"
172                    _F64 " to a database file '%s'", handle->curblock, offset, size,
173                    handle->file->filename);
174            return BLK_NOT_FOUND;
175        }
176        handle->curpos += size;
177
178        if (_docio_fill_zero(handle, handle->curblock, handle->curpos) !=
179            FDB_RESULT_SUCCESS) {
180            return BLK_NOT_FOUND;
181        }
182
183        return handle->curblock * real_blocksize + offset;
184
185    } else { // insufficient space to fit entire document into current block
186        bid_t begin, end, i, startpos;
187        bid_t *block_list, block_list_size = 0;
188        uint16_t *bmp_revnum_list;
189        uint32_t nblock = size / blocksize;
190        uint32_t remain = size % blocksize;
191        uint64_t remainsize = size;
192        fdb_status fs = FDB_RESULT_SUCCESS;
193
194        // as blocks may not be consecutive, we need to maintain
195        // the list of BIDs.
196        block_list = (bid_t *)alca(bid_t, nblock+1);
197        bmp_revnum_list = (uint16_t *)alca(uint16_t, nblock+1);
198
199#ifdef DOCIO_BLOCK_ALIGN
200        offset = blocksize - handle->curpos;
201        if (remain <= blocksize - handle->curpos &&
202            filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
203                                        nblock + ((remain>offset)?1:0), &begin, &end,
204                                        log_callback) == handle->curblock+1) {
205
206            // start from current block
207            if (begin != (handle->curblock + 1)) {
208                fdb_log(log_callback, fs,
209                        "Error in allocating blocks starting from block id %" _F64
210                        " in a database file '%s'", handle->curblock + 1,
211                        handle->file->filename);
212                return BLK_NOT_FOUND;
213            }
214
215            fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
216                                 marker, log_callback);
217            if (fs != FDB_RESULT_SUCCESS) {
218                fdb_log(log_callback, fs,
219                        "Error in appending a doc block marker for a block id %" _F64
220                        " into a database file '%s'", handle->curblock,
221                        handle->file->filename);
222                return BLK_NOT_FOUND;
223            }
224            if (offset > 0) {
225                fs = filemgr_write_offset(handle->file, handle->curblock,
226                                          handle->curpos, offset, buf,
227                                          true, // mark block as immutable
228                                          log_callback);
229                if (fs != FDB_RESULT_SUCCESS) {
230                    fdb_log(log_callback, fs,
231                            "Error in writing a doc block with id %" _F64 ", offset %d, "
232                            "size %" _F64 " to a database file '%s'", handle->curblock,
233                            offset, size, handle->file->filename);
234                    return BLK_NOT_FOUND;
235                }
236            }
237            remainsize -= offset;
238
239            startpos = handle->curblock * real_blocksize + handle->curpos;
240        } else {
241            // next block to be allocated is not continuous .. allocate new multiple blocks
242            filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
243                                   &begin, &end, log_callback);
244            offset = 0;
245
246            startpos = begin * real_blocksize;
247        }
248
249#else
250        // Simple append mode
251        // The given doc is appended at the byte offset right next the last doc.
252        // Note that block allocation can be non-consecutive.
253        offset = blocksize - handle->curpos;
254
255        if (non_consecutive) {
256            // new version: support non-consecutive allocation
257
258            bool new_block = false;
259            bool start_from_new_block = false;
260
261            if (remain > offset) {
262                // if the current block cannot accommodate the remaining length
263                // of the document, allocate an additional block.
264                new_block = true;
265            }
266
267            block_list_size = nblock + ((new_block)?1:0);
268            for (i=0; i<block_list_size; ++i) {
269                bmp_revnum_list[i] = filemgr_get_sb_bmp_revnum(handle->file) &
270                                     BMP_REVNUM_MASK;
271                block_list[i] = filemgr_alloc(handle->file, log_callback);
272
273                if (i == 0 && handle->curblock != BLK_NOT_FOUND &&
274                    block_list[i] > handle->curblock+1) {
275                    // if the first new allocated block is not consecutive
276                    // from the current block, start writing document from
277                    // the new block.
278                    start_from_new_block = true;
279                    // since we won't write into the current block,
280                    // allocate one more block if necessary.
281                    if (remain && !new_block) {
282                        new_block = true;
283                        block_list_size++;
284                    }
285                }
286            }
287
288            if (offset > 0 && !start_from_new_block) {
289                // start from the current block
290
291                // set next BID
292                blk_meta.next_bid = _endian_encode(block_list[0]);
293                // write meta
294                fs = filemgr_write_offset(handle->file, handle->curblock,
295                                          blocksize, sizeof(blk_meta), &blk_meta,
296                                          false, log_callback);
297                if (fs != FDB_RESULT_SUCCESS) {
298                    fdb_log(log_callback, fs,
299                            "Error in appending a doc block metadata for a block id %" _F64
300                            " into a database file '%s'", handle->curblock,
301                            handle->file->filename);
302                    return BLK_NOT_FOUND;
303                }
304
305                // write the front part of the doc
306                if (offset > 0) {
307                    fs = filemgr_write_offset(handle->file, handle->curblock,
308                                              handle->curpos, offset, buf,
309                                              true, // mark block as immutable
310                                              log_callback);
311                    if (fs != FDB_RESULT_SUCCESS) {
312                        fdb_log(log_callback, fs,
313                                "Error in writing a doc block with id %" _F64 ", offset %d, "
314                                "size %" _F64 " to a database file '%s'", handle->curblock,
315                                offset, size, handle->file->filename);
316                        return BLK_NOT_FOUND;
317                    }
318                }
319                remainsize -= offset;
320
321                startpos = handle->curblock * real_blocksize + handle->curpos;
322            } else {
323                // mark remaining space in the current block as stale
324                if (handle->curblock != BLK_NOT_FOUND &&
325                    handle->curpos < real_blocksize) {
326                    filemgr_mark_stale(handle->file,
327                                       real_blocksize * handle->curblock + handle->curpos,
328                                       blocksize - handle->curpos);
329                }
330                offset = 0;
331                startpos = block_list[0] * real_blocksize;
332            }
333
334        } else {
335            // old version: consecutive allocation only
336
337            if (filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
338                                            nblock + ((remain>offset)?1:0), &begin, &end,
339                                            log_callback) == handle->curblock+1) {
340                // start from current block
341                if (begin != (handle->curblock + 1)) {
342                    fdb_log(log_callback, fs,
343                            "Error in allocating blocks starting from block id %" _F64
344                            " in a database file '%s'", handle->curblock + 1,
345                            handle->file->filename);
346                    return BLK_NOT_FOUND;
347                }
348
349                fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
350                                     marker, log_callback);
351                if (fs != FDB_RESULT_SUCCESS) {
352                    fdb_log(log_callback, fs,
353                            "Error in appending a doc block marker for a block id %" _F64
354                            " into a database file '%s'", handle->curblock,
355                            handle->file->filename);
356                    return BLK_NOT_FOUND;
357                }
358                if (offset > 0) {
359                    fs = filemgr_write_offset(handle->file, handle->curblock,
360                                              handle->curpos, offset, buf,
361                                              true, // mark block as immutable
362                                              log_callback);
363                    if (fs != FDB_RESULT_SUCCESS) {
364                        fdb_log(log_callback, fs,
365                                "Error in writing a doc block with id %" _F64 ", offset %d, "
366                                "size %" _F64 " to a database file '%s'", handle->curblock,
367                                offset, size, handle->file->filename);
368                        return BLK_NOT_FOUND;
369                    }
370                }
371                remainsize -= offset;
372
373                startpos = handle->curblock * real_blocksize + handle->curpos;
374            } else {
375                // next block to be allocated is not continuous
376                // mark remaining space in the old block as stale
377                if (handle->curblock != BLK_NOT_FOUND &&
378                    handle->curpos < real_blocksize) {
379                    filemgr_mark_stale(handle->file,
380                                       real_blocksize * handle->curblock + handle->curpos,
381                                       blocksize - handle->curpos);
382                }
383                // allocate new multiple blocks
384                filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
385                                       &begin, &end, log_callback);
386                offset = 0;
387
388                startpos = begin * real_blocksize;
389            }
390
391            block_list_size = end - begin + 1;
392            for (i=0; i<block_list_size; ++i) {
393                block_list[i] = begin+i;
394            }
395
396        } // if (non_consecutive)
397
398#endif
399
400        for (i=0; i<block_list_size; ++i) {
401            handle->curblock = block_list[i];
402            handle->cur_bmp_revnum_hash = bmp_revnum_list[i];
403            blk_meta.sb_bmp_revnum_hash = _endian_encode(handle->cur_bmp_revnum_hash);
404
405            if (non_consecutive) {
406                if (i < block_list_size - 1) {
407                    blk_meta.next_bid = _endian_encode(block_list[i+1]);
408                } else {
409                    // the last block .. set next BID '0xffff...'
410                    memset(&blk_meta.next_bid, 0xff, sizeof(blk_meta.next_bid));
411                }
412            }
413
414            // write meta (new) or block marker (old)
415            if (non_consecutive) {
416                fs = filemgr_write_offset(handle->file, handle->curblock,
417                                          blocksize, sizeof(blk_meta), &blk_meta,
418                                          false, log_callback);
419            } else {
420                fs = _add_blk_marker(handle->file, block_list[i], blocksize, marker,
421                                     log_callback);
422            }
423            if (fs != FDB_RESULT_SUCCESS) {
424                fdb_log(log_callback, fs,
425                        "Error in appending a doc block marker for a block "
426                        "id %" _F64 " into a database file '%s'", block_list[i],
427                        handle->file->filename);
428                return BLK_NOT_FOUND;
429            }
430
431            if (remainsize >= blocksize) {
432                // write entire block
433
434                fs = filemgr_write_offset(handle->file, block_list[i], 0, blocksize,
435                                          (uint8_t *)buf + offset,
436                                          true, // mark block as immutable
437                                          log_callback);
438                if (fs != FDB_RESULT_SUCCESS) {
439                    fdb_log(log_callback, fs,
440                            "Error in writing an entire doc block with id %" _F64
441                            ", size %" _F64 " to a database file '%s'", block_list[i], blocksize,
442                            handle->file->filename);
443                    return BLK_NOT_FOUND;
444                }
445                offset += blocksize;
446                remainsize -= blocksize;
447                handle->curpos = blocksize;
448
449            } else {
450                // write rest of document
451                fdb_assert(i==block_list_size-1, i, block_list_size-1);
452
453                fs = filemgr_write_offset(handle->file, block_list[i], 0, remainsize,
454                                          (uint8_t *)buf + offset,
455                                          (remainsize == blocksize),
456                                          log_callback);
457                if (fs != FDB_RESULT_SUCCESS) {
458                    fdb_log(log_callback, fs,
459                            "Error in writing a doc block with id %" _F64 ", "
460                            "size %" _F64 " to a database file '%s'", block_list[i], remainsize,
461                            handle->file->filename);
462                    return BLK_NOT_FOUND;
463                }
464                offset += remainsize;
465                handle->curpos = remainsize;
466
467                if (_docio_fill_zero(handle, block_list[i], handle->curpos) !=
468                    FDB_RESULT_SUCCESS) {
469                    return BLK_NOT_FOUND;
470                }
471            }
472        }
473
474        return startpos;
475    }
476
477    return 0;
478}
479
// _docio_length_encode() / _docio_length_decode()
//
// Convert the four length fields of a docio_length (keylen, metalen, bodylen,
// bodylen_ondisk) with _endian_encode() / _endian_decode(); all other members
// are copied unchanged. Without __ENDIAN_SAFE no conversion is performed and
// both names are identity macros, so they remain usable as expressions
// (e.g. `_length = _docio_length_encode(length);`).
#ifdef __ENDIAN_SAFE
INLINE struct docio_length _docio_length_encode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_encode(length.keylen);
    ret.metalen = _endian_encode(length.metalen);
    ret.bodylen = _endian_encode(length.bodylen);
    ret.bodylen_ondisk = _endian_encode(length.bodylen_ondisk);
    return ret;
}
INLINE struct docio_length _docio_length_decode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_decode(length.keylen);
    ret.metalen = _endian_decode(length.metalen);
    ret.bodylen = _endian_decode(length.bodylen);
    ret.bodylen_ondisk = _endian_decode(length.bodylen_ondisk);
    return ret;
}
#else
// Must expand to the argument itself: an empty expansion would turn every
// assignment from these macros into a syntax error.
#define _docio_length_encode(a) (a)
#define _docio_length_decode(a) (a)
#endif
505
506INLINE uint8_t _docio_length_checksum(struct docio_length length, struct docio_handle* handle)
507{
508    return uint8_t(get_checksum(reinterpret_cast<const uint8_t*>(&length),
509                                sizeof(keylen_t) + sizeof(uint16_t) + sizeof(uint32_t)*2,
510                                handle->file->crc_mode) & 0xff);
511}
512
513INLINE bid_t _docio_append_doc(struct docio_handle *handle, struct docio_object *doc)
514{
515    size_t _len;
516    uint32_t offset = 0;
517    uint32_t crc;
518    uint64_t docsize;
519    void *buf = NULL;
520    bid_t ret_offset;
521    fdb_seqnum_t _seqnum;
522    timestamp_t _timestamp;
523    struct docio_length length, _length;
524    err_log_callback *log_callback = handle->log_callback;
525
526    length = doc->length;
527    length.bodylen_ondisk = length.bodylen;
528
529#ifdef _DOC_COMP
530    int ret;
531    void *compbuf = NULL;
532    uint32_t compbuf_len = 0;
533    if (doc->length.bodylen > 0 && handle->compress_document_body) {
534        compbuf_len = snappy_max_compressed_length(length.bodylen);
535        compbuf = (void *)malloc(compbuf_len);
536
537        _len = compbuf_len;
538        ret = snappy_compress((char*)doc->body, length.bodylen, (char*)compbuf, &_len);
539        if (ret < 0) { // LCOV_EXCL_START
540            fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
541                    "Error in compressing the doc body of key '%s' from "
542                    "a database file '%s'",
543                    (char *) doc->key, handle->file->filename);
544            free(compbuf);
545            // we use BLK_NOT_FOUND for error code of appending instead of 0
546            // because document can be written at the byte offset 0
547            return BLK_NOT_FOUND;
548        } // LCOV_EXCL_STOP
549
550        length.bodylen_ondisk = compbuf_len = _len;
551        length.flag |= DOCIO_COMPRESSED;
552
553        docsize = sizeof(struct docio_length) + length.keylen + length.metalen;
554        docsize += compbuf_len;
555    } else {
556        docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
557        compbuf_len = length.bodylen;
558    }
559#else
560    docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
561#endif
562    docsize += sizeof(timestamp_t);
563
564    docsize += sizeof(fdb_seqnum_t);
565
566#ifdef __CRC32
567    docsize += sizeof(crc);
568#endif
569
570    doc->length = length;
571    buf = (void *)malloc(docsize);
572
573    _length = _docio_length_encode(length);
574
575    // calculate checksum of LENGTH using crc
576    _length.checksum = _docio_length_checksum(_length, handle);
577
578    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
579    offset += sizeof(struct docio_length);
580
581    // copy key
582    memcpy((uint8_t *)buf + offset, doc->key, length.keylen);
583    offset += length.keylen;
584
585    // copy timestamp
586    _timestamp = _endian_encode(doc->timestamp);
587    memcpy((uint8_t*)buf + offset, &_timestamp, sizeof(_timestamp));
588    offset += sizeof(_timestamp);
589
590    // copy seqeunce number (optional)
591    _seqnum = _endian_encode(doc->seqnum);
592    memcpy((uint8_t *)buf + offset, &_seqnum, sizeof(fdb_seqnum_t));
593    offset += sizeof(fdb_seqnum_t);
594
595    // copy metadata (optional)
596    if (length.metalen > 0) {
597        memcpy((uint8_t *)buf + offset, doc->meta, length.metalen);
598        offset += length.metalen;
599    }
600
601    // copy body (optional)
602    if (length.bodylen > 0) {
603#ifdef _DOC_COMP
604        if (length.flag & DOCIO_COMPRESSED) {
605            // compressed body
606            if (compbuf) {
607                memcpy((uint8_t*)buf + offset, compbuf, compbuf_len);
608                offset += compbuf_len;
609                free(compbuf);
610            }
611        } else {
612            memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
613            offset += length.bodylen;
614        }
615#else
616        memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
617        offset += length.bodylen;
618#endif
619    }
620
621#ifdef __CRC32
622    crc = get_checksum(reinterpret_cast<const uint8_t*>(buf),
623                       docsize - sizeof(crc),
624                       handle->file->crc_mode);
625    memcpy((uint8_t *)buf + offset, &crc, sizeof(crc));
626#endif
627
628    ret_offset = docio_append_doc_raw(handle, docsize, buf);
629    free(buf);
630
631    return ret_offset;
632}
633
634bid_t docio_append_commit_mark(struct docio_handle *handle, uint64_t doc_offset)
635{
636    // Note: should adapt DOCIO_COMMIT_MARK_SIZE if this function is modified.
637    uint32_t offset = 0;
638    uint64_t docsize;
639    uint64_t _doc_offset;
640    void *buf;
641    bid_t ret_offset;
642    struct docio_length length, _length;
643
644    memset(&length, 0, sizeof(struct docio_length));
645    length.flag = DOCIO_TXN_COMMITTED;
646
647    docsize = sizeof(struct docio_length) + sizeof(doc_offset);
648    buf = (void *)malloc(docsize);
649
650    _length = _docio_length_encode(length);
651
652    // calculate checksum of LENGTH using crc
653    _length.checksum = _docio_length_checksum(_length, handle);
654
655    memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
656    offset += sizeof(struct docio_length);
657
658    // copy doc_offset
659    _doc_offset = _endian_encode(doc_offset);
660    memcpy((uint8_t *)buf + offset, &_doc_offset, sizeof(_doc_offset));
661
662    ret_offset = docio_append_doc_raw(handle, docsize, buf);
663    free(buf);
664
665    return ret_offset;
666}
667
668bid_t docio_append_doc(struct docio_handle *handle, struct docio_object *doc,
669                       uint8_t deleted, uint8_t txn_enabled)
670{
671    doc->length.flag = DOCIO_NORMAL;
672    if (deleted) {
673        doc->length.flag |= DOCIO_DELETED;
674    }
675    if (txn_enabled) {
676        doc->length.flag |= DOCIO_TXN_DIRTY;
677    }
678    return _docio_append_doc(handle, doc);
679}
680
681bid_t docio_append_doc_system(struct docio_handle *handle, struct docio_object *doc)
682{
683    doc->length.flag = DOCIO_NORMAL | DOCIO_SYSTEM;
684    return _docio_append_doc(handle, doc);
685}
686
687INLINE fdb_status _docio_read_through_buffer(struct docio_handle *handle,
688                                             bid_t bid,
689                                             err_log_callback *log_callback,
690                                             bool read_on_cache_miss)
691{
692    fdb_status status = FDB_RESULT_SUCCESS;
693
694    // if superblock's BMP revnum has been changed,
695    // then 'lastbid' should be reset as it might be reused.
696    if (handle->lastbid != BLK_NOT_FOUND &&
697        filemgr_get_sb_bmp_revnum(handle->file) != handle->lastBmpRevnum) {
698        handle->lastbid = BLK_NOT_FOUND;
699    }
700
701    // to reduce the overhead from memcpy the same block
702    if (handle->lastbid != bid) {
703        status = filemgr_read(handle->file, bid, handle->readbuffer,
704                              log_callback, read_on_cache_miss);
705        if (status != FDB_RESULT_SUCCESS) {
706            if (read_on_cache_miss) {
707                fdb_log(log_callback, status,
708                        "Error in reading a doc block with id %" _F64 " from "
709                        "a database file '%s'", bid, handle->file->filename);
710            }
711            // we must reset 'lastbid' here because now 'readbuffer'
712            // may contain other data unrelated to 'lastbid'.
713            handle->lastbid = BLK_NOT_FOUND;
714            return status;
715        }
716
717        if (filemgr_is_writable(handle->file, bid)) {
718            // this block can be modified later .. must be re-read
719            handle->lastbid = BLK_NOT_FOUND;
720        } else {
721            handle->lastbid = bid;
722            handle->lastBmpRevnum = filemgr_get_sb_bmp_revnum(handle->file);
723        }
724    }
725
726    return status;
727}
728
729INLINE bool _docio_check_buffer(struct docio_handle *handle, uint64_t bmp_revnum)
730{
731    size_t blocksize = handle->file->blocksize;
732    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
733    struct docblk_meta blk_meta;
734
735    if (non_consecutive) {
736        // new version: support non-consecutive document block
737        blocksize -= DOCBLK_META_SIZE;
738        memcpy(&blk_meta, (uint8_t*)handle->readbuffer + blocksize, sizeof(blk_meta));
739    } else {
740        // old version: block marker only
741        blocksize -= BLK_MARKER_SIZE;
742        memcpy(&blk_meta.marker, (uint8_t*)handle->readbuffer + blocksize,
743               sizeof(blk_meta.marker));
744    }
745
746    if (blk_meta.marker != BLK_MARKER_DOC) {
747        return false;
748    }
749
750    if (non_consecutive && bmp_revnum != (uint64_t)-1) {
751        uint16_t revnum_hash = _endian_decode(blk_meta.sb_bmp_revnum_hash);
752        if (revnum_hash == (bmp_revnum & BMP_REVNUM_MASK)) {
753            return true;
754        } else {
755            return false;
756        }
757    }
758    return true;
759}
760
761static int64_t _docio_read_length(struct docio_handle *handle,
762                                  uint64_t offset,
763                                  struct docio_length *length,
764                                  err_log_callback *log_callback,
765                                  bool read_on_cache_miss)
766{
767    size_t blocksize = handle->file->blocksize;
768    size_t real_blocksize = blocksize;
769    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
770    struct docblk_meta blk_meta;
771#ifdef __CRC32
772    if (non_consecutive) {
773        // new version: support non-consecutive document block
774        blocksize -= DOCBLK_META_SIZE;
775    } else {
776        // old version: block marker only
777        blocksize -= BLK_MARKER_SIZE;
778    }
779#endif
780
781    bid_t bid = offset / real_blocksize;
782    uint32_t pos = offset % real_blocksize;
783    void *buf = handle->readbuffer;
784    uint32_t restsize = 0;
785
786    if (blocksize > pos) {
787        restsize = blocksize - pos;
788    }
789
790    // read length structure
791    fdb_status fs = _docio_read_through_buffer(handle, bid, log_callback,
792                                               read_on_cache_miss);
793    if (fs != FDB_RESULT_SUCCESS) {
794        if (read_on_cache_miss) {
795            fdb_log(log_callback, fs,
796                    "Error in reading a doc length from offset %" _F64
797                    " in block id %" _F64
798                    " from a database file '%s'", offset, bid,
799                    handle->file->filename);
800        }
801        return (int64_t) fs;
802    }
803    if (!_docio_check_buffer(handle, (uint64_t)-1)) {
804        return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
805    }
806
807    if (restsize >= sizeof(struct docio_length)) {
808        memcpy(length, (uint8_t *)buf + pos, sizeof(struct docio_length));
809        pos += sizeof(struct docio_length);
810
811    } else {
812        if (restsize > 0) {
813            memcpy(length, (uint8_t *)buf + pos, restsize);
814        }
815        // read additional block
816        if (non_consecutive) {
817            memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
818            bid = _endian_decode(blk_meta.next_bid);
819            if (bid == BLK_NOT_FOUND) {
820                // Reached the last block. Simply return the offset that is passed to
821                // this function.
822                memset(length, 0x0, sizeof(struct docio_length));
823                return offset;
824            }
825        } else {
826            bid++;
827        }
828
829        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
830        if (fs != FDB_RESULT_SUCCESS) {
831            fdb_log(log_callback, fs,
832                    "Error in reading a doc length from an additional block "
833                    "offset %" _F64 " in block id %" _F64
834                    " from a database file '%s'", offset,
835                    bid, handle->file->filename);
836            return (int64_t) fs;
837        }
838        if (!_docio_check_buffer(handle, (uint64_t)-1)) {
839            return (int64_t) FDB_RESULT_READ_FAIL; // Need to define a better error code
840        }
841        // memcpy rest of data
842        memcpy((uint8_t *)length + restsize, buf,
843               sizeof(struct docio_length) - restsize);
844        pos = sizeof(struct docio_length) - restsize;
845    }
846
847    return bid * real_blocksize + pos;
848}
849
850static int64_t _docio_read_doc_component(struct docio_handle *handle,
851                                         uint64_t offset,
852                                         uint32_t len,
853                                         void *buf_out,
854                                         err_log_callback *log_callback)
855{
856    uint32_t rest_len;
857    size_t blocksize = handle->file->blocksize;
858    size_t real_blocksize = blocksize;
859    bool non_consecutive = ver_non_consecutive_doc(handle->file->version);
860    struct docblk_meta blk_meta;
861#ifdef __CRC32
862    if (non_consecutive) {
863        // new version: support non-consecutive document block
864        blocksize -= DOCBLK_META_SIZE;
865    } else {
866        // old version: block marker only
867        blocksize -= BLK_MARKER_SIZE;
868    }
869#endif
870
871    bid_t bid = offset / real_blocksize;
872    uint32_t pos = offset % real_blocksize;
873    //uint8_t buf[handle->file->blocksize];
874    void *buf = handle->readbuffer;
875    uint32_t restsize;
876    fdb_status fs = FDB_RESULT_SUCCESS;
877
878    rest_len = len;
879
880    while(rest_len > 0) {
881        fs = _docio_read_through_buffer(handle, bid, log_callback, true);
882        if (fs != FDB_RESULT_SUCCESS) {
883            fdb_log(log_callback, fs,
884                    "Error in reading a doc block with block id %" _F64 " from "
885                    "a database file '%s'", bid, handle->file->filename);
886            return (int64_t)fs;
887        }
888        restsize = blocksize - pos;
889
890        if (restsize >= rest_len) {
891            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, rest_len);
892            pos += rest_len;
893            rest_len = 0;
894        }else{
895            memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, restsize);
896
897            if (non_consecutive) {
898                memcpy(&blk_meta, (uint8_t*)buf + blocksize, sizeof(blk_meta));
899                bid = _endian_decode(blk_meta.next_bid);
900            } else {
901                bid++;
902            }
903
904            pos = 0;
905            rest_len -= restsize;
906
907            if (rest_len > 0 &&
908                bid >= filemgr_get_pos(handle->file) / handle->file->blocksize) {
909                // no more data in the file .. the file is corrupted
910                fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
911                        "Fatal error!!! Database file '%s' is corrupted.",
912                        handle->file->filename);
913                return (int64_t)FDB_RESULT_FILE_CORRUPTION;
914            }
915        }
916    }
917
918    return bid * real_blocksize + pos;
919}
920
921#ifdef _DOC_COMP
922
923static int64_t _docio_read_doc_component_comp(struct docio_handle *handle,
924                                              uint64_t offset,
925                                              uint32_t len,
926                                              uint32_t comp_len,
927                                              void *buf_out,
928                                              void *comp_data_out,
929                                              err_log_callback *log_callback)
930{
931    int ret;
932    size_t uncomp_size;
933    int64_t _offset;
934
935    _offset = _docio_read_doc_component(handle, offset,
936                                        comp_len, comp_data_out, log_callback);
937    if (_offset < 0) {
938        fdb_log(log_callback, (fdb_status) _offset,
939                "Error in reading the file with offset %" _F64 ", length %d "
940                "from a database file '%s'", offset, len,
941                handle->file->filename);
942        return _offset;
943    }
944
945    uncomp_size = len;
946    ret = snappy_uncompress((char*)comp_data_out, comp_len,
947                            (char*)buf_out, &uncomp_size);
948    if (ret < 0) {
949        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
950                "Error in decompressing the data that was read with the file "
951                "offset %" _F64 ", length %d from a database file '%s'",
952                offset, len, handle->file->filename);
953        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
954    }
955    if (uncomp_size != len) {
956        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
957                "Error in decompressing the data with the file offset "
958                "%" _F64 " in a database file '%s', because the uncompressed length %d "
959                "is not same as the expected length %d",
960                offset, handle->file->filename, uncomp_size, len);
961        return (int64_t) FDB_RESULT_COMPRESSION_FAIL;
962    }
963    return _offset;
964}
965
966#endif
967
968fdb_status docio_read_doc_length(struct docio_handle *handle,
969                                 struct docio_length *length,
970                                 uint64_t offset)
971{
972    uint8_t checksum;
973    int64_t _offset;
974    struct docio_length _length, zero_length;
975    err_log_callback *log_callback = handle->log_callback;
976
977    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
978    if (_offset < 0) {
979        return (fdb_status) _offset;
980    }
981
982    memset(&zero_length, 0x0, sizeof(struct docio_length));
983    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
984        // If all the fields in docio_length are zero, then it means that the rest of
985        // the current block, which starts at offset, is zero-filled and can be skipped.
986        *length = zero_length;
987        return FDB_RESULT_SUCCESS;
988    }
989
990    // checksum check
991    checksum = _docio_length_checksum(_length, handle);
992    if (checksum != _length.checksum) {
993        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
994                "doc_length checksum mismatch error in a database file '%s'"
995                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
996                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
997                checksum, _length.checksum, _length.keylen, _length.metalen,
998                _length.bodylen, _length.bodylen_ondisk, offset);
999        return FDB_RESULT_CHECKSUM_ERROR;
1000    }
1001
1002    *length = _docio_length_decode(_length);
1003    if (length->keylen == 0 || length->keylen > FDB_MAX_KEYLEN_INTERNAL) {
1004        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1005                "Error in decoding the doc length metadata in file %s"
1006                " crc %x keylen %d metalen %d bodylen %d "
1007                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1008                checksum, _length.keylen, _length.metalen,
1009                _length.bodylen, _length.bodylen_ondisk, offset);
1010        return FDB_RESULT_FILE_CORRUPTION;
1011    }
1012
1013    return FDB_RESULT_SUCCESS;
1014}
1015
1016fdb_status docio_read_doc_key(struct docio_handle *handle, uint64_t offset,
1017                              keylen_t *keylen, void *keybuf)
1018{
1019    uint8_t checksum;
1020    int64_t _offset;
1021    struct docio_length length, _length, zero_length;
1022    err_log_callback *log_callback = handle->log_callback;
1023
1024    _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
1025    if (_offset < 0) {
1026        fdb_log(log_callback, (fdb_status) _offset,
1027                "Error in reading the doc length metadata with offset %" _F64 " from "
1028                "a database file '%s'",
1029                offset, handle->file->filename);
1030        return (fdb_status) _offset;
1031    }
1032
1033    memset(&zero_length, 0x0, sizeof(struct docio_length));
1034    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1035        // If all the fields in docio_length are zero, then it means that the rest of
1036        // the current block, which starts at offset, is zero-filled and can be skipped.
1037        *keylen = 0;
1038        return FDB_RESULT_SUCCESS;
1039    }
1040
1041    // checksum check
1042    checksum = _docio_length_checksum(_length, handle);
1043    if (checksum != _length.checksum) {
1044        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1045                "doc_length key checksum mismatch error in a database file '%s'"
1046                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1047                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1048                checksum, _length.checksum, _length.keylen, _length.metalen,
1049                _length.bodylen, _length.bodylen_ondisk, offset);
1050        return FDB_RESULT_CHECKSUM_ERROR;
1051    }
1052
1053    length = _docio_length_decode(_length);
1054    if (length.keylen == 0 || length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1055        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1056                "Error in decoding the doc key length metadata in file %s"
1057                " crc %x keylen %d metalen %d bodylen %d "
1058                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1059                checksum, _length.keylen, _length.metalen,
1060                _length.bodylen, _length.bodylen_ondisk, offset);
1061        return FDB_RESULT_FILE_CORRUPTION;
1062    }
1063
1064    _offset = _docio_read_doc_component(handle, _offset, length.keylen,
1065                                        keybuf, log_callback);
1066    if (_offset < 0) {
1067        fdb_log(log_callback, (fdb_status) _offset,
1068                "Error in reading a key with offset %" _F64 ", length %d "
1069                "from a database file '%s'", _offset, length.keylen,
1070                handle->file->filename);
1071        return (fdb_status) _offset;
1072    }
1073
1074    *keylen = length.keylen;
1075    return FDB_RESULT_SUCCESS;
1076}
1077
1078void free_docio_object(struct docio_object *doc, uint8_t key_alloc,
1079                       uint8_t meta_alloc, uint8_t body_alloc) {
1080    if (!doc) {
1081        return;
1082    }
1083
1084    if (key_alloc) {
1085        free(doc->key);
1086        doc->key = NULL;
1087    }
1088    if (meta_alloc) {
1089        free(doc->meta);
1090        doc->meta = NULL;
1091    }
1092    if (body_alloc) {
1093        free(doc->body);
1094        doc->body = NULL;
1095    }
1096}
1097
1098int64_t docio_read_doc_key_meta(struct docio_handle *handle, uint64_t offset,
1099                                struct docio_object *doc,
1100                                bool read_on_cache_miss)
1101{
1102    uint8_t checksum;
1103    int64_t _offset;
1104    int key_alloc = 0;
1105    int meta_alloc = 0;
1106    fdb_seqnum_t _seqnum;
1107    timestamp_t _timestamp;
1108    struct docio_length _length, zero_length;
1109    err_log_callback *log_callback = handle->log_callback;
1110
1111    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1112                                 read_on_cache_miss);
1113    if (_offset < 0) {
1114        if (read_on_cache_miss) {
1115            fdb_log(log_callback, (fdb_status) _offset,
1116                    "Error in reading the doc length metadata with offset %" _F64 " from "
1117                    "a database file '%s'",
1118                    offset, handle->file->filename);
1119        }
1120        return _offset;
1121    }
1122
1123    memset(&zero_length, 0x0, sizeof(struct docio_length));
1124    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1125        // If all the fields in docio_length are zero, then it means that the rest of
1126        // the current block, which starts at offset, is zero-filled and can be skipped.
1127        doc->length = zero_length;
1128        return (int64_t) FDB_RESULT_SUCCESS;
1129    }
1130
1131    // checksum check
1132    checksum = _docio_length_checksum(_length, handle);
1133    if (checksum != _length.checksum) {
1134        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1135                "doc_length meta checksum mismatch error in a database file '%s'"
1136                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1137                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1138                checksum, _length.checksum, _length.keylen, _length.metalen,
1139                _length.bodylen, _length.bodylen_ondisk, offset);
1140        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1141    }
1142
1143    doc->length = _docio_length_decode(_length);
1144    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1145        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1146                "Error in decoding the doc length metadata (key length: %d) from "
1147                "a database file '%s'", doc->length.keylen, handle->file->filename);
1148        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1149    }
1150
1151    if (doc->key == NULL) {
1152        doc->key = (void *)malloc(doc->length.keylen);
1153        key_alloc = 1;
1154    }
1155    if (doc->meta == NULL && doc->length.metalen) {
1156        doc->meta = (void *)malloc(doc->length.metalen);
1157        meta_alloc = 1;
1158    }
1159
1160    _offset = _docio_read_doc_component(handle, _offset, doc->length.keylen,
1161                                        doc->key, log_callback);
1162    if (_offset < 0) {
1163        fdb_log(log_callback, (fdb_status) _offset,
1164                "Error in reading a key with offset %" _F64 ", length %d "
1165                "from a database file '%s'", offset, doc->length.keylen,
1166                handle->file->filename);
1167        free_docio_object(doc, key_alloc, meta_alloc, 0);
1168        return _offset;
1169    }
1170
1171    // read timestamp
1172    _offset = _docio_read_doc_component(handle, _offset,
1173                                        sizeof(timestamp_t),
1174                                        &_timestamp, log_callback);
1175    if (_offset < 0) {
1176        fdb_log(log_callback, (fdb_status) _offset,
1177                "Error in reading a timestamp with offset %" _F64 ", length %d "
1178                "from a database file '%s'", offset, sizeof(timestamp_t),
1179                handle->file->filename);
1180        free_docio_object(doc, key_alloc, meta_alloc, 0);
1181        return _offset;
1182    }
1183    doc->timestamp = _endian_decode(_timestamp);
1184
1185    // copy sequence number (optional)
1186    _offset = _docio_read_doc_component(handle, _offset, sizeof(fdb_seqnum_t),
1187                                        (void *)&_seqnum, log_callback);
1188    if (_offset < 0) {
1189        fdb_log(log_callback, (fdb_status) _offset,
1190                "Error in reading a sequence number with offset %" _F64 ", length %d "
1191                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1192                handle->file->filename);
1193        free_docio_object(doc, key_alloc, meta_alloc, 0);
1194        return _offset;
1195    }
1196    doc->seqnum = _endian_decode(_seqnum);
1197
1198    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1199                                        doc->meta, log_callback);
1200    if (_offset < 0) {
1201        fdb_log(log_callback, (fdb_status) _offset,
1202                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1203                "from a database file '%s'", offset, doc->length.metalen,
1204                handle->file->filename);
1205        free_docio_object(doc, key_alloc, meta_alloc, 0);
1206        return _offset;
1207    }
1208
1209    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1210    free_docio_object(doc, 0, free_meta, 0);
1211
1212    return _offset;
1213}
1214
1215int64_t docio_read_doc(struct docio_handle *handle, uint64_t offset,
1216                       struct docio_object *doc,
1217                       bool read_on_cache_miss)
1218{
1219    uint8_t checksum;
1220    int64_t _offset;
1221    int key_alloc = 0;
1222    int meta_alloc = 0;
1223    int body_alloc = 0;
1224    fdb_seqnum_t _seqnum;
1225    timestamp_t _timestamp;
1226    void *comp_body = NULL;
1227    struct docio_length _length, zero_length;
1228    err_log_callback *log_callback = handle->log_callback;
1229
1230    _offset = _docio_read_length(handle, offset, &_length, log_callback,
1231                                 read_on_cache_miss);
1232    if (_offset < 0) {
1233        if (read_on_cache_miss) {
1234            fdb_log(log_callback, (fdb_status) _offset,
1235                    "Error in reading the doc length metadata with offset %" _F64 " from "
1236                    "a database file '%s'",
1237                    offset, handle->file->filename);
1238        }
1239        return _offset;
1240    }
1241
1242    memset(&zero_length, 0x0, sizeof(struct docio_length));
1243    if (memcmp(&_length, &zero_length, sizeof(struct docio_length)) == 0) {
1244        // If all the fields in docio_length are zero, then it means that the rest of
1245        // the current block, which starts at offset, is zero-filled and can be skipped.
1246        doc->length = zero_length;
1247        return (int64_t) FDB_RESULT_SUCCESS;
1248    }
1249
1250    // checksum check
1251    checksum = _docio_length_checksum(_length, handle);
1252    if (checksum != _length.checksum) {
1253        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1254                "doc_length body checksum mismatch error in a database file '%s'"
1255                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1256                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1257                checksum, _length.checksum, _length.keylen, _length.metalen,
1258                _length.bodylen, _length.bodylen_ondisk, offset);
1259        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1260    }
1261
1262    doc->length = _docio_length_decode(_length);
1263    if (doc->length.flag & DOCIO_TXN_COMMITTED) {
1264        // transaction commit mark
1265        // read the corresponding doc offset
1266
1267        // If TXN_COMMITTED flag is set, this doc is not an actual doc, but a
1268        // transaction commit marker. Thus, all lengths should be zero.
1269        if (doc->length.keylen || doc->length.metalen ||
1270            doc->length.bodylen || doc->length.bodylen_ondisk) {
1271            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1272                    "File corruption: Doc length fields in a transaction commit marker "
1273                    "was not zero in a database file '%s' offset %" _F64,
1274                    handle->file->filename, offset);
1275            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1276            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1277        }
1278
1279        uint64_t doc_offset;
1280        _offset = _docio_read_doc_component(handle, _offset,
1281                                            sizeof(doc_offset), &doc_offset,
1282                                            log_callback);
1283        if (_offset < 0) {
1284            fdb_log(log_callback, (fdb_status) _offset,
1285                    "Error in reading an offset of a committed doc from an offset %" _F64
1286                    " in a database file '%s'", offset, handle->file->filename);
1287            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1288            return _offset;
1289        }
1290        doc->doc_offset = _endian_decode(doc_offset);
1291        // The offset of the actual document that pointed by this commit marker
1292        // should not be greater than the file size.
1293        if (doc->doc_offset > filemgr_get_pos(handle->file)) {
1294            fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1295                    "File corruption: Offset %" _F64 " of the actual doc pointed by the "
1296                    "commit marker is greater than the size %" _F64 " of a database file '%s'",
1297                    doc->doc_offset, filemgr_get_pos(handle->file),
1298                    handle->file->filename);
1299            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1300            return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1301        }
1302        return _offset;
1303    }
1304
1305    if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1306        fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1307                "Error in decoding the doc length metadata (key length: %d) from "
1308                "a database file '%s' offset %" _F64, doc->length.keylen,
1309                handle->file->filename, offset);
1310        return (int64_t) FDB_RESULT_FILE_CORRUPTION;
1311    }
1312
1313    if (doc->key == NULL) {
1314        doc->key = (void *)malloc(doc->length.keylen);
1315        key_alloc = 1;
1316    }
1317    if (doc->meta == NULL && doc->length.metalen) {
1318        doc->meta = (void *)malloc(doc->length.metalen);
1319        meta_alloc = 1;
1320    }
1321    if (doc->body == NULL && doc->length.bodylen) {
1322        doc->body = (void *)malloc(doc->length.bodylen);
1323        body_alloc = 1;
1324    }
1325
1326    _offset = _docio_read_doc_component(handle, _offset,
1327                                        doc->length.keylen,
1328                                        doc->key,
1329                                        log_callback);
1330    if (_offset < 0) {
1331        fdb_log(log_callback, (fdb_status) _offset,
1332                "Error in reading a key with offset %" _F64 ", length %d "
1333                "from a database file '%s'", offset, doc->length.keylen,
1334                handle->file->filename);
1335        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1336        return _offset;
1337    }
1338
1339    // read timestamp
1340    _offset = _docio_read_doc_component(handle, _offset,
1341                                        sizeof(timestamp_t),
1342                                        &_timestamp,
1343                                        log_callback);
1344    if (_offset < 0) {
1345        fdb_log(log_callback, (fdb_status) _offset,
1346                "Error in reading a timestamp with offset %" _F64 ", length %d "
1347                "from a database file '%s'", offset, sizeof(timestamp_t),
1348                handle->file->filename);
1349        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1350        return _offset;
1351    }
1352    doc->timestamp = _endian_decode(_timestamp);
1353
1354    // copy seqeunce number (optional)
1355    _offset = _docio_read_doc_component(handle, _offset,
1356                                        sizeof(fdb_seqnum_t),
1357                                        (void *)&_seqnum,
1358                                        log_callback);
1359    if (_offset < 0) {
1360        fdb_log(log_callback, (fdb_status) _offset,
1361                "Error in reading a sequence number with offset %" _F64 ", length %d "
1362                "from a database file '%s'", offset, sizeof(fdb_seqnum_t),
1363                handle->file->filename);
1364        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1365        return _offset;
1366    }
1367    doc->seqnum = _endian_decode(_seqnum);
1368
1369    _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1370                                        doc->meta, log_callback);
1371    if (_offset < 0) {
1372        fdb_log(log_callback, (fdb_status) _offset,
1373                "Error in reading the doc metadata with offset %" _F64 ", length %d "
1374                "from a database file '%s'", offset, doc->length.metalen,
1375                handle->file->filename);
1376        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1377        return _offset;
1378    }
1379
1380#ifdef _DOC_COMP
1381    if (doc->length.flag & DOCIO_COMPRESSED) {
1382        comp_body = (void*)malloc(doc->length.bodylen_ondisk);
1383        _offset = _docio_read_doc_component_comp(handle, _offset, doc->length.bodylen,
1384                                                 doc->length.bodylen_ondisk, doc->body,
1385                                                 comp_body, log_callback);
1386        if (_offset < 0) {
1387            fdb_log(log_callback, (fdb_status) _offset,
1388                    "Error in reading a compressed doc with offset %" _F64 ", length %d "
1389                    "from a database file '%s'", offset, doc->length.bodylen,
1390                    handle->file->filename);
1391            if (comp_body) {
1392                free(comp_body);
1393            }
1394            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1395            return _offset;
1396        }
1397    } else {
1398        _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1399                                            doc->body, log_callback);
1400        if (_offset < 0) {
1401            fdb_log(log_callback, (fdb_status) _offset,
1402                    "Error in reading a doc with offset %" _F64 ", length %d "
1403                    "from a database file '%s'", offset, doc->length.bodylen,
1404                    handle->file->filename);
1405            free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1406            return _offset;
1407        }
1408    }
1409#else
1410    _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1411                                        doc->body, log_callback);
1412    if (_offset < 0) {
1413        fdb_log(log_callback, (fdb_status) _offset,
1414                "Error in reading a doc with offset %" _F64 ", length %d "
1415                "from a database file '%s'", offset, doc->length.bodylen,
1416                handle->file->filename);
1417        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1418        return _offset;
1419    }
1420#endif
1421
1422#ifdef __CRC32
1423    uint32_t crc_file, crc;
1424    _offset = _docio_read_doc_component(handle, _offset, sizeof(crc_file),
1425                                        (void *)&crc_file, log_callback);
1426    if (_offset < 0) {
1427        fdb_log(log_callback, (fdb_status) _offset,
1428                "Error in reading a doc's CRC value with offset %" _F64 ", length %d "
1429                "from a database file '%s'", offset, sizeof(crc_file),
1430                handle->file->filename);
1431        if (comp_body) {
1432            free(comp_body);
1433        }
1434        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1435        return _offset;
1436    }
1437
1438    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_length),
1439                       sizeof(_length),
1440                       handle->file->crc_mode);
1441    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->key),
1442                       doc->length.keylen,
1443                       crc,
1444                       handle->file->crc_mode);
1445    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_timestamp),
1446                       sizeof(timestamp_t),
1447                       crc,
1448                       handle->file->crc_mode);
1449    crc = get_checksum(reinterpret_cast<const uint8_t*>(&_seqnum),
1450                       sizeof(fdb_seqnum_t),
1451                       crc,
1452                       handle->file->crc_mode);
1453    crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->meta),
1454                       doc->length.metalen,
1455                       crc,
1456                       handle->file->crc_mode);
1457
1458    if (doc->length.flag & DOCIO_COMPRESSED) {
1459        crc = get_checksum(reinterpret_cast<const uint8_t*>(comp_body),
1460                           doc->length.bodylen_ondisk,
1461                           crc,
1462                           handle->file->crc_mode);
1463        if (comp_body) {
1464            free(comp_body);
1465        }
1466    } else {
1467        crc = get_checksum(reinterpret_cast<const uint8_t*>(doc->body),
1468                           doc->length.bodylen,
1469                           crc,
1470                           handle->file->crc_mode);
1471    }
1472    if (crc != crc_file) {
1473        fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1474                "doc_body checksum mismatch error in a database file '%s'"
1475                " crc %x != %x (crc in doc) keylen %d metalen %d bodylen %d "
1476                "bodylen_ondisk %d offset %" _F64, handle->file->filename,
1477                crc, crc_file, _length.keylen, _length.metalen,
1478                _length.bodylen, _length.bodylen_ondisk, offset);
1479        free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1480        return (int64_t) FDB_RESULT_CHECKSUM_ERROR;
1481    }
1482#endif
1483
1484    uint8_t free_meta = meta_alloc && !doc->length.metalen;
1485    uint8_t free_body = body_alloc && !doc->length.bodylen;
1486    free_docio_object(doc, 0, free_meta, free_body);
1487
1488    return _offset;
1489}
1490
/**
 * Submit the read requests already prepared in 'aio_handle', wait until every
 * one of them has completed, and decode one document per completed request
 * into 'doc_array', starting at slot 'doc_idx'.
 *
 * @param handle Docio handle; its read buffer is temporarily pointed at each
 *        completed AIO buffer while the corresponding doc is decoded.
 * @param doc_array Output array of doc objects.
 * @param doc_idx Index of the first slot in 'doc_array' to fill.
 * @param aio_handle Async I/O handle holding the prepared requests.
 * @param size Number of prepared requests to submit.
 * @param sum_doc_size In/out accumulator of the size of successfully decoded
 *        docs (the on-disk body length is excluded when 'keymeta_only' is set).
 * @param keymeta_only If true, only key and metadata are decoded.
 * @return 'size' once all requests have completed; the negative value reported
 *         by aio_submit / aio_getevents on failure; the (non-negative) count
 *         reported by aio_submit if it differs from 'size'; 0 when async I/O
 *         is not compiled in for this platform.
 */
static int _submit_async_io_requests(struct docio_handle *handle,
                                     struct docio_object *doc_array,
                                     size_t doc_idx,
                                     struct async_io_handle *aio_handle,
                                     int size,
                                     size_t *sum_doc_size,
                                     bool keymeta_only)
{
#ifdef _ASYNC_IO
#if !defined(WIN32) && !defined(_WIN32)
    struct io_event* io_evt = NULL;
    uint8_t *buf = NULL;
    uint64_t offset = 0;
    // Must be signed: the doc read routines report failures as negative
    // status codes, which an unsigned variable would turn into huge positive
    // "offsets" and make the failure check below unreachable.
    int64_t _offset = 0;
    int num_events = 0;

    int num_sub = handle->file->ops->aio_submit(aio_handle, size);
    if (num_sub < 0) {
        // Error loggings
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s",
                handle->file->filename, errno_msg);
        return num_sub;
    } else if (num_sub != size) {
        // Partial submission: report it and let the caller treat a return
        // value different from 'size' as a failure.
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s, "
                "%d requests were submitted, but only %d requests were processed",
                handle->file->filename, errno_msg, size, num_sub);
        return num_sub;
    }

    // Reap completion events until every submitted request is accounted for.
    while (num_sub > 0) {
        num_events = handle->file->ops->aio_getevents(aio_handle, 1,
                                                      num_sub, (unsigned int) -1);
        if (num_events < 0) {
            // Error loggings
            char errno_msg[512];
            handle->file->ops->get_errno_str(errno_msg, 512);
            fdb_log(handle->log_callback, (fdb_status) num_events,
                    "Error in getting async I/O events from the completion queue "
                    "for a file '%s', errno msg: %s", handle->file->filename, errno_msg);
            return num_events;
        }
        num_sub -= num_events;
        for (io_evt = aio_handle->events; num_events > 0; --num_events, ++io_evt) {
            buf = (uint8_t *) io_evt->obj->u.c.buf;
            offset = *((uint64_t *) io_evt->data); // Original offset.

            // Set the docio handle's buffer to the AIO buffer to read
            // a doc from the AIO buffer. If additional blocks need to be
            // read, then they will be sequentially read through the synchronous
            // I/O path (i.e., buffer cache -> disk read if cache miss).
            // As these additional blocks are sequential reads, we don't expect
            // asynchronous I/O to give us performance boost.
            void *tmp_buffer = handle->readbuffer;
            handle->readbuffer = buf;
            handle->lastbid = offset / aio_handle->block_size;
            memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
            if (keymeta_only) {
                _offset = docio_read_doc_key_meta(handle, offset,
                                                  &doc_array[doc_idx], true);
            } else {
                _offset = docio_read_doc(handle, offset, &doc_array[doc_idx],
                                         true);
            }

            // Always hand the handle its own buffer back, whether or not the
            // decode succeeded, and invalidate the cached block id.
            handle->readbuffer = tmp_buffer;
            handle->lastbid = BLK_NOT_FOUND;

            if (_offset > 0) {
                (*sum_doc_size) += _fdb_get_docsize(doc_array[doc_idx].length);
                if (keymeta_only) {
                    (*sum_doc_size) -= doc_array[doc_idx].length.bodylen_ondisk;
                }
            }
            // A failed decode still consumes its slot; it is just not counted
            // in the size accumulator.
            ++doc_idx;
        }
    }
    return size;
#else // Plan to implement async I/O in other OSs (e.g., Windows, OSx)
    return 0;
#endif
#else // Async I/O is not supported in the current OS.
    return 0;
#endif
}
1584
1585size_t docio_batch_read_docs(struct docio_handle *handle,
1586                             uint64_t *offset_array,
1587                             struct docio_object *doc_array,
1588                             size_t array_size,
1589                             size_t data_size_threshold,
1590                             size_t batch_size_threshold,
1591                             struct async_io_handle *aio_handle,
1592                             bool keymeta_only)
1593{
1594    size_t i = 0;
1595    size_t sum_doc_size = 0;
1596    size_t doc_idx = 0;
1597    size_t block_size = handle->file->blocksize;
1598    uint64_t _offset = 0;
1599    int aio_size = 0;
1600    bool read_fail = false;
1601    bool read_on_cache_miss = true;
1602
1603    if (aio_handle) {
1604        // If async I/O is supported, we will then read non-resident docs from disk
1605        // by using async I/O operations.
1606        read_on_cache_miss = false;
1607    }
1608
1609    for (i = 0; i < array_size && i < batch_size_threshold &&
1610           sum_doc_size < data_size_threshold; ++i) {
1611        memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
1612        if (keymeta_only) {
1613            _offset = docio_read_doc_key_meta(handle, offset_array[i], &doc_array[doc_idx],
1614                                              read_on_cache_miss);
1615        } else {
1616            _offset = docio_read_doc(handle, offset_array[i], &doc_array[doc_idx],
1617                                     read_on_cache_miss);
1618        }
1619        if (_offset <= 0) {
1620            if (aio_handle) {
1621                // The page is not resident in the cache. Prepare and perform Async I/O
1622                handle->file->ops->aio_prep_read(aio_handle, aio_size,
1623                                                 block_size, offset_array[i]);
1624                if (++aio_size == (int) aio_handle->queue_depth) {
1625                    int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1626                                                            aio_handle, aio_size,
1627                                                            &sum_doc_size,
1628                                                            keymeta_only);
1629                    if (num_sub < 0 || num_sub != aio_size) {
1630                        read_fail = true;
1631                        break;
1632                    }
1633                    aio_size = 0;
1634                    doc_idx += num_sub;
1635                }
1636            } else {
1637                ++doc_idx; // Error in reading a doc.
1638            }
1639        } else {
1640            sum_doc_size += _fdb_get_docsize(doc_array[doc_idx].length);
1641            if (keymeta_only) {
1642                sum_doc_size -= doc_array[doc_idx].length.bodylen_ondisk;
1643            }
1644            ++doc_idx;
1645        }
1646    }
1647
1648    if (aio_size && !read_fail) {
1649        int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1650                                                aio_handle, aio_size,
1651                                                &sum_doc_size, keymeta_only);
1652        if (num_sub < 0) {
1653            read_fail = true;
1654        } else {
1655            doc_idx += num_sub;
1656        }
1657    }
1658
1659    if (read_fail) {
1660        for (i = 0; i < batch_size_threshold; ++i) {
1661            free(doc_array[i].key);
1662            free(doc_array[i].meta);
1663            free(doc_array[i].body);
1664            doc_array[i].key = doc_array[i].meta = doc_array[i].body = NULL;
1665        }
1666        return (size_t) -1;
1667    }
1668
1669    return doc_idx;
1670}
1671
1672bool docio_check_buffer(struct docio_handle *handle,
1673                        bid_t bid,
1674                        uint64_t sb_bmp_revnum)
1675{
1676    err_log_callback *log_callback = handle->log_callback;
1677    _docio_read_through_buffer(handle, bid, log_callback, true);
1678    return _docio_check_buffer(handle, sb_bmp_revnum);
1679}
1680
1681