1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 
22 #include "docio.h"
23 #include "wal.h"
24 #include "fdb_internal.h"
25 #ifdef _DOC_COMP
26 #include "snappy-c.h"
27 #endif
28 
29 #include "memleak.h"
30 
docio_init(struct docio_handle *handle, struct filemgr *file, bool compress_document_body)31 void docio_init(struct docio_handle *handle,
32                 struct filemgr *file,
33                 bool compress_document_body)
34 {
35     handle->file = file;
36     handle->curblock = BLK_NOT_FOUND;
37     handle->curpos = 0;
38     handle->lastbid = BLK_NOT_FOUND;
39     handle->compress_document_body = compress_document_body;
40     malloc_align(handle->readbuffer, FDB_SECTOR_SIZE, file->blocksize);
41 }
42 
docio_free(struct docio_handle *handle)43 void docio_free(struct docio_handle *handle)
44 {
45     free_align(handle->readbuffer);
46 }
47 
// _add_blk_marker(file, bid, blocksize, marker, log_callback)
//
// With __CRC32 defined this forwards to filemgr_write_offset(), passing
// `blocksize` as the offset argument, BLK_MARKER_SIZE as the length and
// `marker` as the data. Without __CRC32 no write is issued: the macro
// evaluates to FDB_RESULT_SUCCESS and none of its arguments are evaluated.
#ifndef __CRC32
#define _add_blk_marker(f, blk, off, mark, cb) \
    FDB_RESULT_SUCCESS
#else
#define _add_blk_marker(f, blk, off, mark, cb) \
    filemgr_write_offset((f), (blk), (off), BLK_MARKER_SIZE, (mark), (cb))
#endif
55 
_docio_fill_zero(struct docio_handle *handle, bid_t bid, size_t pos)56 INLINE fdb_status _docio_fill_zero(struct docio_handle *handle, bid_t bid,
57                                    size_t pos)
58 {
59     // Fill next few bytes (sizeof(struct docio_length)) with zero
60     // to avoid false positive docio_length checksum during file scanning.
61     // (Note that the checksum value of zero-filled docio_length is 0x6F.)
62 
63     size_t blocksize = handle->file->blocksize;
64     size_t len_size = sizeof(struct docio_length);
65     uint8_t *zerobuf = alca(uint8_t, len_size);
66 
67 #ifdef __CRC32
68     blocksize -= BLK_MARKER_SIZE;
69 #endif
70 
71     if (pos + len_size <= blocksize) {
72         // enough space in the block
73         memset(zerobuf, 0x0, len_size);
74         return filemgr_write_offset(handle->file, bid, pos, len_size,
75                                     zerobuf, handle->log_callback);
76     } else {
77         // lack of space .. we don't need to fill zero bytes.
78         return FDB_RESULT_SUCCESS;
79     }
80 }
81 
docio_append_doc_raw(struct docio_handle *handle, uint64_t size, void *buf)82 bid_t docio_append_doc_raw(struct docio_handle *handle, uint64_t size, void *buf)
83 {
84     uint32_t offset;
85     uint8_t marker[BLK_MARKER_SIZE];
86     size_t blocksize = handle->file->blocksize;
87     size_t real_blocksize = blocksize;
88     err_log_callback *log_callback = handle->log_callback;
89 #ifdef __CRC32
90     blocksize -= BLK_MARKER_SIZE;
91     memset(marker, BLK_MARKER_DOC, BLK_MARKER_SIZE);
92 #endif
93 
94     if (handle->curblock == BLK_NOT_FOUND) {
95         // allocate new block
96         handle->curblock = filemgr_alloc(handle->file, log_callback);
97         handle->curpos = 0;
98     }
99     if (!filemgr_is_writable(handle->file, handle->curblock)) {
100         // allocate new block
101         handle->curblock = filemgr_alloc(handle->file, log_callback);
102         handle->curpos = 0;
103     }
104 
105     if (size <= blocksize - handle->curpos) {
106         fdb_status fs = FDB_RESULT_SUCCESS;
107         // simply append to current block
108         offset = handle->curpos;
109         fs = _add_blk_marker(handle->file, handle->curblock, blocksize, marker,
110                              log_callback);
111         if (fs != FDB_RESULT_SUCCESS) {
112             fdb_log(log_callback, fs,
113                     "Error in appending a doc block marker for a block id %" _F64
114                     " into a database file '%s'", handle->curblock,
115                     handle->file->filename);
116             return BLK_NOT_FOUND;
117         }
118         fs = filemgr_write_offset(handle->file, handle->curblock, offset, size,
119                                   buf, log_callback);
120         if (fs != FDB_RESULT_SUCCESS) {
121             fdb_log(log_callback, fs,
122                     "Error in writing a doc block with id %" _F64 ", offset %d, size %"
123                     _F64 " to a database file '%s'", handle->curblock, offset, size,
124                     handle->file->filename);
125             return BLK_NOT_FOUND;
126         }
127         handle->curpos += size;
128 
129         if (_docio_fill_zero(handle, handle->curblock, handle->curpos) !=
130             FDB_RESULT_SUCCESS) {
131             return BLK_NOT_FOUND;
132         }
133 
134         return handle->curblock * real_blocksize + offset;
135 
136     } else {
137         // not simply fitted into current block
138         bid_t begin, end, i, startpos;
139         uint32_t nblock = size / blocksize;
140         uint32_t remain = size % blocksize;
141         uint64_t remainsize = size;
142         fdb_status fs = FDB_RESULT_SUCCESS;
143 
144 #ifdef DOCIO_BLOCK_ALIGN
145         offset = blocksize - handle->curpos;
146         if (remain <= blocksize - handle->curpos &&
147             filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
148                                         nblock + ((remain>offset)?1:0), &begin, &end,
149                                         log_callback) == handle->curblock+1) {
150 
151             // start from current block
152             fdb_assert(begin == handle->curblock + 1, begin, handle->curblock+1);
153 
154             fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
155                                  marker, log_callback);
156             if (fs != FDB_RESULT_SUCCESS) {
157                 fdb_log(log_callback, fs,
158                         "Error in appending a doc block marker for a block id %" _F64
159                         " into a database file '%s'", handle->curblock,
160                         handle->file->filename);
161                 return BLK_NOT_FOUND;
162             }
163             if (offset > 0) {
164                 fs = filemgr_write_offset(handle->file, handle->curblock,
165                                           handle->curpos, offset, buf, log_callback);
166                 if (fs != FDB_RESULT_SUCCESS) {
167                     fdb_log(log_callback, fs,
168                             "Error in writing a doc block with id %" _F64 ", offset %d, "
169                             "size %" _F64 " to a database file '%s'", handle->curblock,
170                             offset, size, handle->file->filename);
171                     return BLK_NOT_FOUND;
172                 }
173             }
174             remainsize -= offset;
175 
176             startpos = handle->curblock * real_blocksize + handle->curpos;
177         } else {
178             // next block to be allocated is not continuous .. allocate new multiple blocks
179             filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
180                                    &begin, &end, log_callback);
181             offset = 0;
182 
183             startpos = begin * real_blocksize;
184         }
185 
186 #else
187         // simple append mode .. always append at the end of file
188         offset = blocksize - handle->curpos;
189         if (filemgr_alloc_multiple_cond(handle->file, handle->curblock+1,
190                                         nblock + ((remain>offset)?1:0), &begin, &end,
191                                         log_callback) == handle->curblock+1) {
192             // start from current block
193             fdb_assert(begin == handle->curblock + 1, begin, handle->curblock+1);
194 
195             fs = _add_blk_marker(handle->file, handle->curblock, blocksize,
196                                  marker, log_callback);
197             if (fs != FDB_RESULT_SUCCESS) {
198                 fdb_log(log_callback, fs,
199                         "Error in appending a doc block marker for a block id %" _F64
200                         " into a database file '%s'", handle->curblock,
201                         handle->file->filename);
202                 return BLK_NOT_FOUND;
203             }
204             if (offset > 0) {
205                 fs = filemgr_write_offset(handle->file, handle->curblock,
206                                           handle->curpos, offset, buf, log_callback);
207                 if (fs != FDB_RESULT_SUCCESS) {
208                     fdb_log(log_callback, fs,
209                             "Error in writing a doc block with id %" _F64 ", offset %d, "
210                             "size %" _F64 " to a database file '%s'", handle->curblock,
211                             offset, size, handle->file->filename);
212                     return BLK_NOT_FOUND;
213                 }
214             }
215             remainsize -= offset;
216 
217             startpos = handle->curblock * real_blocksize + handle->curpos;
218         } else {
219             // next block to be allocated is not continuous .. allocate new multiple blocks
220             filemgr_alloc_multiple(handle->file, nblock+((remain>0)?1:0),
221                                    &begin, &end, log_callback);
222             offset = 0;
223 
224             startpos = begin * real_blocksize;
225         }
226 
227 #endif
228 
229         for (i=begin; i<=end; ++i) {
230             handle->curblock = i;
231             if (remainsize >= blocksize) {
232                 // write entire block
233                 fs = _add_blk_marker(handle->file, i, blocksize, marker,
234                                      log_callback);
235                 if (fs != FDB_RESULT_SUCCESS) {
236                     fdb_log(log_callback, fs,
237                             "Error in appending a doc block marker for a block "
238                             "id %" _F64 " into a database file '%s'", i,
239                             handle->file->filename);
240                     return BLK_NOT_FOUND;
241                 }
242                 fs = filemgr_write_offset(handle->file, i, 0, blocksize,
243                                           (uint8_t *)buf + offset, log_callback);
244                 if (fs != FDB_RESULT_SUCCESS) {
245                     fdb_log(log_callback, fs,
246                             "Error in writing an entire doc block with id %" _F64
247                             ", size %" _F64 " to a database file '%s'", i, blocksize,
248                             handle->file->filename);
249                     return BLK_NOT_FOUND;
250                 }
251                 offset += blocksize;
252                 remainsize -= blocksize;
253                 handle->curpos = blocksize;
254 
255             } else {
256                 // write rest of document
257                 fdb_assert(i==end, i, end);
258                 fs = _add_blk_marker(handle->file, i, blocksize, marker,
259                                      log_callback);
260                 if (fs != FDB_RESULT_SUCCESS) {
261                     fdb_log(log_callback, fs,
262                             "Error in appending a doc block marker for a block "
263                             "id %" _F64 " into a database file '%s'", i,
264                             handle->file->filename);
265                     return BLK_NOT_FOUND;
266                 }
267                 fs = filemgr_write_offset(handle->file, i, 0, remainsize,
268                                           (uint8_t *)buf + offset, log_callback);
269                 if (fs != FDB_RESULT_SUCCESS) {
270                     fdb_log(log_callback, fs,
271                             "Error in writing a doc block with id %" _F64 ", "
272                             "size %" _F64 " to a database file '%s'", i, remainsize,
273                             handle->file->filename);
274                     return BLK_NOT_FOUND;
275                 }
276                 offset += remainsize;
277                 handle->curpos = remainsize;
278 
279                 if (_docio_fill_zero(handle, i, handle->curpos) !=
280                     FDB_RESULT_SUCCESS) {
281                     return BLK_NOT_FOUND;
282                 }
283             }
284         }
285 
286         return startpos;
287     }
288 
289     return 0;
290 }
291 
#ifdef __ENDIAN_SAFE
/**
 * Return a copy of `length` whose keylen, metalen, bodylen and
 * bodylen_ondisk fields have been passed through _endian_encode().
 * All other fields are copied unchanged.
 */
INLINE struct docio_length _docio_length_encode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_encode(length.keylen);
    ret.metalen = _endian_encode(length.metalen);
    ret.bodylen = _endian_encode(length.bodylen);
    ret.bodylen_ondisk = _endian_encode(length.bodylen_ondisk);
    return ret;
}

/**
 * Return a copy of `length` whose keylen, metalen, bodylen and
 * bodylen_ondisk fields have been passed through _endian_decode().
 * All other fields are copied unchanged.
 */
INLINE struct docio_length _docio_length_decode(struct docio_length length)
{
    struct docio_length ret;
    ret = length;
    ret.keylen = _endian_decode(length.keylen);
    ret.metalen = _endian_decode(length.metalen);
    ret.bodylen = _endian_decode(length.bodylen);
    ret.bodylen_ondisk = _endian_decode(length.bodylen_ondisk);
    return ret;
}
#else
// No byte-order conversion: the helpers must still evaluate to their
// argument, because call sites assign the result
// (e.g. `_length = _docio_length_encode(length);`). An empty expansion
// would turn those statements into syntax errors.
#define _docio_length_encode(a) (a)
#define _docio_length_decode(a) (a)
#endif
317 
_docio_length_checksum(struct docio_length length)318 INLINE uint8_t _docio_length_checksum(struct docio_length length)
319 {
320     return (uint8_t)(
321         chksum(&length,
322                sizeof(keylen_t) + sizeof(uint16_t) + sizeof(uint32_t)*2)
323         & 0xff);
324 }
325 
_docio_append_doc(struct docio_handle *handle, struct docio_object *doc)326 INLINE bid_t _docio_append_doc(struct docio_handle *handle, struct docio_object *doc)
327 {
328     size_t _len;
329     uint32_t offset = 0;
330     uint32_t crc;
331     uint64_t docsize;
332     void *buf;
333     bid_t ret_offset;
334     fdb_seqnum_t _seqnum;
335     timestamp_t _timestamp;
336     struct docio_length length, _length;
337     err_log_callback *log_callback = handle->log_callback;
338 
339     length = doc->length;
340     length.bodylen_ondisk = length.bodylen;
341 
342 #ifdef _DOC_COMP
343     int ret;
344     void *compbuf = NULL;
345     uint32_t compbuf_len;
346     if (doc->length.bodylen > 0 && handle->compress_document_body) {
347         compbuf_len = snappy_max_compressed_length(length.bodylen);
348         compbuf = (void *)malloc(compbuf_len);
349 
350         _len = compbuf_len;
351         ret = snappy_compress((char*)doc->body, length.bodylen, (char*)compbuf, &_len);
352         if (ret < 0) { // LCOV_EXCL_START
353             fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
354                     "Error in compressing the doc body of key '%s' from "
355                     "a database file '%s'",
356                     (char *) doc->key, handle->file->filename);
357             free(compbuf);
358             // we use BLK_NOT_FOUND for error code of appending instead of 0
359             // because document can be written at the byte offset 0
360             return BLK_NOT_FOUND;
361         } // LCOV_EXCL_STOP
362 
363         length.bodylen_ondisk = compbuf_len = _len;
364         length.flag |= DOCIO_COMPRESSED;
365 
366         docsize = sizeof(struct docio_length) + length.keylen + length.metalen;
367         docsize += compbuf_len;
368     } else {
369         docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
370         compbuf_len = length.bodylen;
371     }
372 #else
373     docsize = sizeof(struct docio_length) + length.keylen + length.metalen + length.bodylen;
374 #endif
375     docsize += sizeof(timestamp_t);
376 
377     docsize += sizeof(fdb_seqnum_t);
378 
379 #ifdef __CRC32
380     docsize += sizeof(crc);
381 #endif
382 
383     doc->length = length;
384     buf = (void *)malloc(docsize);
385 
386     _length = _docio_length_encode(length);
387 
388     // calculate checksum of LENGTH using crc
389     _length.checksum = _docio_length_checksum(_length);
390 
391     memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
392     offset += sizeof(struct docio_length);
393 
394     // copy key
395     memcpy((uint8_t *)buf + offset, doc->key, length.keylen);
396     offset += length.keylen;
397 
398     // copy timestamp
399     _timestamp = _endian_encode(doc->timestamp);
400     memcpy((uint8_t*)buf + offset, &_timestamp, sizeof(_timestamp));
401     offset += sizeof(_timestamp);
402 
403     // copy seqeunce number (optional)
404     _seqnum = _endian_encode(doc->seqnum);
405     memcpy((uint8_t *)buf + offset, &_seqnum, sizeof(fdb_seqnum_t));
406     offset += sizeof(fdb_seqnum_t);
407 
408     // copy metadata (optional)
409     if (length.metalen > 0) {
410         memcpy((uint8_t *)buf + offset, doc->meta, length.metalen);
411         offset += length.metalen;
412     }
413 
414     // copy body (optional)
415     if (length.bodylen > 0) {
416 #ifdef _DOC_COMP
417         if (length.flag & DOCIO_COMPRESSED) {
418             // compressed body
419             memcpy((uint8_t*)buf + offset, compbuf, compbuf_len);
420             offset += compbuf_len;
421             free(compbuf);
422         } else {
423             memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
424             offset += length.bodylen;
425         }
426 #else
427         memcpy((uint8_t *)buf + offset, doc->body, length.bodylen);
428         offset += length.bodylen;
429 #endif
430     }
431 
432 #ifdef __CRC32
433     crc = chksum(buf, docsize - sizeof(crc));
434     memcpy((uint8_t *)buf + offset, &crc, sizeof(crc));
435 #endif
436 
437     ret_offset = docio_append_doc_raw(handle, docsize, buf);
438     free(buf);
439 
440     return ret_offset;
441 }
442 
docio_append_commit_mark(struct docio_handle *handle, uint64_t doc_offset)443 bid_t docio_append_commit_mark(struct docio_handle *handle, uint64_t doc_offset)
444 {
445     uint32_t offset = 0;
446     uint64_t docsize;
447     uint64_t _doc_offset;
448     void *buf;
449     bid_t ret_offset;
450     struct docio_length length, _length;
451 
452     memset(&length, 0, sizeof(struct docio_length));
453     length.flag = DOCIO_TXN_COMMITTED;
454 
455     docsize = sizeof(struct docio_length) + sizeof(doc_offset);
456     buf = (void *)malloc(docsize);
457 
458     _length = _docio_length_encode(length);
459 
460     // calculate checksum of LENGTH using crc
461     _length.checksum = _docio_length_checksum(_length);
462 
463     memcpy((uint8_t *)buf + offset, &_length, sizeof(struct docio_length));
464     offset += sizeof(struct docio_length);
465 
466     // copy doc_offset
467     _doc_offset = _endian_encode(doc_offset);
468     memcpy((uint8_t *)buf + offset, &_doc_offset, sizeof(_doc_offset));
469 
470     ret_offset = docio_append_doc_raw(handle, docsize, buf);
471     free(buf);
472 
473     return ret_offset;
474 }
475 
docio_append_doc(struct docio_handle *handle, struct docio_object *doc, uint8_t deleted, uint8_t txn_enabled)476 bid_t docio_append_doc(struct docio_handle *handle, struct docio_object *doc,
477                        uint8_t deleted, uint8_t txn_enabled)
478 {
479     doc->length.flag = DOCIO_NORMAL;
480     if (deleted) {
481         doc->length.flag |= DOCIO_DELETED;
482     }
483     if (txn_enabled) {
484         doc->length.flag |= DOCIO_TXN_DIRTY;
485     }
486     return _docio_append_doc(handle, doc);
487 }
488 
docio_append_doc_system(struct docio_handle *handle, struct docio_object *doc)489 bid_t docio_append_doc_system(struct docio_handle *handle, struct docio_object *doc)
490 {
491     doc->length.flag = DOCIO_NORMAL | DOCIO_SYSTEM;
492     return _docio_append_doc(handle, doc);
493 }
494 
_docio_read_through_buffer(struct docio_handle *handle, bid_t bid, err_log_callback *log_callback, bool read_on_cache_miss)495 INLINE fdb_status _docio_read_through_buffer(struct docio_handle *handle,
496                                              bid_t bid,
497                                              err_log_callback *log_callback,
498                                              bool read_on_cache_miss)
499 {
500     fdb_status status = FDB_RESULT_SUCCESS;
501     // to reduce the overhead from memcpy the same block
502     if (handle->lastbid != bid) {
503         status = filemgr_read(handle->file, bid, handle->readbuffer,
504                               log_callback, read_on_cache_miss);
505         if (status != FDB_RESULT_SUCCESS) {
506             if (read_on_cache_miss) {
507                 fdb_log(log_callback, status,
508                         "Error in reading a doc block with id %" _F64 " from "
509                         "a database file '%s'", bid, handle->file->filename);
510             }
511             return status;
512         }
513 
514         if (filemgr_is_writable(handle->file, bid)) {
515             // this block can be modified later .. must be re-read
516             handle->lastbid = BLK_NOT_FOUND;
517         }else{
518             handle->lastbid = bid;
519         }
520     }
521 
522     return status;
523 }
524 
_docio_check_buffer(struct docio_handle *handle)525 INLINE int _docio_check_buffer(struct docio_handle *handle)
526 {
527     uint8_t marker[BLK_MARKER_SIZE];
528     marker[0] = *(((uint8_t *)handle->readbuffer)
529                  + handle->file->blocksize - BLK_MARKER_SIZE);
530     return (marker[0] == BLK_MARKER_DOC);
531 }
532 
_docio_read_length(struct docio_handle *handle, uint64_t offset, struct docio_length *length, err_log_callback *log_callback, bool read_on_cache_miss)533 static uint64_t _docio_read_length(struct docio_handle *handle,
534                                    uint64_t offset,
535                                    struct docio_length *length,
536                                    err_log_callback *log_callback,
537                                    bool read_on_cache_miss)
538 {
539     size_t blocksize = handle->file->blocksize;
540     size_t real_blocksize = blocksize;
541 #ifdef __CRC32
542     blocksize -= BLK_MARKER_SIZE;
543 #endif
544 
545     uint64_t file_pos = filemgr_get_pos(handle->file);
546     if (file_pos < (offset + sizeof(struct docio_length))) {
547         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
548                 "Read request with offset %" _F64 " and size %d exceeds the current "
549                 "size %" _F64 " of a database file '%s'",
550                 offset, sizeof(struct docio_length), file_pos,
551                 handle->file->filename);
552         return offset;
553     }
554 
555     bid_t bid = offset / real_blocksize;
556     uint32_t pos = offset % real_blocksize;
557     void *buf = handle->readbuffer;
558     uint32_t restsize;
559 
560     restsize = blocksize - pos;
561     // read length structure
562     fdb_status fs = _docio_read_through_buffer(handle, bid, log_callback,
563                                                read_on_cache_miss);
564     if (fs != FDB_RESULT_SUCCESS) {
565         if (read_on_cache_miss) {
566             fdb_log(log_callback, fs,
567                     "Error in reading a doc length from a block with block id %" _F64
568                     " from a database file '%s'", bid, handle->file->filename);
569         }
570         return offset;
571     }
572     if (!_docio_check_buffer(handle)) {
573         return offset;
574     }
575 
576     if (restsize >= sizeof(struct docio_length)) {
577         memcpy(length, (uint8_t *)buf + pos, sizeof(struct docio_length));
578         pos += sizeof(struct docio_length);
579 
580     } else {
581         memcpy(length, (uint8_t *)buf + pos, restsize);
582         // read additional block
583         bid++;
584         fs = _docio_read_through_buffer(handle, bid, log_callback, true);
585         if (fs != FDB_RESULT_SUCCESS) {
586             fdb_log(log_callback, fs,
587                     "Error in reading a doc length from an additional block "
588                     "with block id %" _F64 " from a database file '%s'",
589                     bid, handle->file->filename);
590             return offset;
591         }
592         if (!_docio_check_buffer(handle)) {
593             return offset;
594         }
595         // memcpy rest of data
596         memcpy((uint8_t *)length + restsize, buf, sizeof(struct docio_length) - restsize);
597         pos = sizeof(struct docio_length) - restsize;
598     }
599 
600     return bid * real_blocksize + pos;
601 }
602 
_docio_read_doc_component(struct docio_handle *handle, uint64_t offset, uint32_t len, void *buf_out, err_log_callback *log_callback)603 static uint64_t _docio_read_doc_component(struct docio_handle *handle,
604                                           uint64_t offset,
605                                           uint32_t len,
606                                           void *buf_out,
607                                           err_log_callback *log_callback)
608 {
609     uint32_t rest_len;
610     size_t blocksize = handle->file->blocksize;
611     size_t real_blocksize = blocksize;
612 #ifdef __CRC32
613     blocksize -= BLK_MARKER_SIZE;
614 #endif
615 
616     bid_t bid = offset / real_blocksize;
617     uint32_t pos = offset % real_blocksize;
618     //uint8_t buf[handle->file->blocksize];
619     void *buf = handle->readbuffer;
620     uint32_t restsize;
621     fdb_status fs = FDB_RESULT_SUCCESS;
622 
623     rest_len = len;
624 
625     while(rest_len > 0) {
626         fs = _docio_read_through_buffer(handle, bid, log_callback, true);
627         if (fs != FDB_RESULT_SUCCESS) {
628             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
629                     "Error in reading a doc block with block id %" _F64 " from "
630                     "a database file '%s'", bid, handle->file->filename);
631             return 0;
632         }
633         restsize = blocksize - pos;
634 
635         if (restsize >= rest_len) {
636             memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, rest_len);
637             pos += rest_len;
638             rest_len = 0;
639         }else{
640             memcpy((uint8_t *)buf_out + (len - rest_len), (uint8_t *)buf + pos, restsize);
641             bid++;
642             pos = 0;
643             rest_len -= restsize;
644 
645             if (rest_len > 0 &&
646                 bid >= filemgr_get_pos(handle->file) / handle->file->blocksize) {
647                 // no more data in the file .. the file is corrupted
648                 fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
649                         "Fatal error!!! Database file '%s' is corrupted.",
650                         handle->file->filename);
651                 // TODO: Need to return a better error code.
652                 return 0;
653             }
654         }
655     }
656 
657     return bid * real_blocksize + pos;
658 }
659 
#ifdef _DOC_COMP

/**
 * Read a snappy-compressed document component and decompress it.
 *
 * `comp_len` compressed bytes are read from `offset` into `comp_data_out`
 * and then decompressed into `buf_out`, which must hold `len` bytes. The
 * decompressed size is asserted to equal `len`.
 *
 * @param handle        docio handle
 * @param offset        byte offset in the file where the compressed data
 *                      starts
 * @param len           expected decompressed length
 * @param comp_len      length of the compressed data on disk
 * @param buf_out       destination for the decompressed data
 * @param comp_data_out scratch buffer receiving the compressed data
 * @param log_callback  callback used for error logging
 * @return byte offset following the compressed data, or 0 if the read or
 *         the decompression failed
 */
static uint64_t _docio_read_doc_component_comp(struct docio_handle *handle,
                                               uint64_t offset,
                                               uint32_t len,
                                               uint32_t comp_len,
                                               void *buf_out,
                                               void *comp_data_out,
                                               err_log_callback *log_callback)
{
    int ret;
    size_t uncomp_size;
    uint64_t _offset;

    _offset = _docio_read_doc_component(handle, offset,
                                        comp_len, comp_data_out, log_callback);
    if (_offset == 0) {
        fdb_log(log_callback, FDB_RESULT_READ_FAIL,
                "Error in reading the file with offset %" _F64 ", length %d "
                "from a database file '%s'", offset, len,
                handle->file->filename);
        return 0;
    }

    uncomp_size = len;
    ret = snappy_uncompress((char*)comp_data_out, comp_len,
                            (char*)buf_out, &uncomp_size);
    // snappy status codes are non-negative; anything but SNAPPY_OK is a
    // failure
    if (ret != SNAPPY_OK) {
        fdb_log(log_callback, FDB_RESULT_COMPRESSION_FAIL,
                "Error in decompressing the data that was read with the file "
                "offset %" _F64 ", length %d from a database file '%s'",
                offset, len, handle->file->filename);
        return 0;
    }

    fdb_assert(uncomp_size == len, uncomp_size, len);
    return _offset;
}

#endif
700 
701 // return length.keylen = 0 if failure
docio_read_doc_length(struct docio_handle *handle, uint64_t offset)702 struct docio_length docio_read_doc_length(struct docio_handle *handle, uint64_t offset)
703 {
704     uint8_t checksum;
705     uint64_t _offset;
706     struct docio_length length, _length;
707     err_log_callback *log_callback = handle->log_callback;
708 
709     _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
710     if (_offset == offset) {
711         length.keylen = 0;
712         return length;
713     }
714 
715     // checksum check
716     checksum = _docio_length_checksum(_length);
717     if (checksum != _length.checksum) {
718         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
719                 "doc_length checksum mismatch error in a database file '%s'",
720                 handle->file->filename);
721         length.keylen = 0;
722         return length;
723     }
724 
725     length = _docio_length_decode(_length);
726     if (length.keylen == 0 || length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
727         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
728                 "Error in decoding the doc length metadata (key length: %d) from "
729                 "a database file '%s'", length.keylen, handle->file->filename);
730         length.keylen = 0;
731         return length;
732     }
733 
734     // document size check
735     if (offset + sizeof(struct docio_length) +
736         length.keylen + length.metalen + length.bodylen_ondisk >
737         filemgr_get_pos(handle->file)) {
738         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
739                 "Fatal error!!! Database file '%s' is corrupted.",
740                 handle->file->filename);
741         length.keylen = 0;
742         return length;
743     }
744 
745     return length;
746 }
747 
748 // return length.keylen = 0 if failure
docio_read_doc_key(struct docio_handle *handle, uint64_t offset, keylen_t *keylen, void *keybuf)749 void docio_read_doc_key(struct docio_handle *handle, uint64_t offset,
750                         keylen_t *keylen, void *keybuf)
751 {
752     uint8_t checksum;
753     uint64_t _offset;
754     struct docio_length length, _length;
755     err_log_callback *log_callback = handle->log_callback;
756 
757     _offset = _docio_read_length(handle, offset, &_length, log_callback, true);
758     if (_offset == offset) {
759         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
760                 "Error in reading the doc length metadata with offset %" _F64 " from "
761                 "a database file '%s'",
762                 offset, handle->file->filename);
763         *keylen = 0;
764         return;
765     }
766 
767     // checksum check
768     checksum = _docio_length_checksum(_length);
769     if (checksum != _length.checksum) {
770         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
771                 "doc_length checksum mismatch error in a database file '%s'",
772                 handle->file->filename);
773         *keylen = 0;
774         return;
775     }
776 
777     length = _docio_length_decode(_length);
778     if (length.keylen == 0 || length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
779         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
780                 "Error in decoding the doc length metadata (key length: %d) from "
781                 "a database file '%s'", length.keylen, handle->file->filename);
782         *keylen = 0;
783         return;
784     }
785 
786     // document size check
787     if (offset + sizeof(struct docio_length) +
788         length.keylen + length.metalen + length.bodylen_ondisk >
789         filemgr_get_pos(handle->file)) {
790         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
791                 "Fatal error!!! Database file '%s' is corrupted.",
792                 handle->file->filename);
793         *keylen = 0;
794         return;
795     }
796 
797     _offset = _docio_read_doc_component(handle, _offset, length.keylen,
798                                         keybuf, log_callback);
799     if (_offset == 0) {
800         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
801                 "Error in reading a key with offset %" _F64 ", length %d "
802                 "from a database file '%s'", _offset, length.keylen,
803                 handle->file->filename);
804         *keylen = 0;
805         return;
806     }
807     *keylen = length.keylen;
808 }
809 
free_docio_object(struct docio_object *doc, uint8_t key_alloc, uint8_t meta_alloc, uint8_t body_alloc)810 void free_docio_object(struct docio_object *doc, uint8_t key_alloc,
811                        uint8_t meta_alloc, uint8_t body_alloc) {
812     if (!doc) {
813         return;
814     }
815 
816     if (key_alloc) {
817         free(doc->key);
818         doc->key = NULL;
819     }
820     if (meta_alloc) {
821         free(doc->meta);
822         doc->meta = NULL;
823     }
824     if (body_alloc) {
825         free(doc->body);
826         doc->body = NULL;
827     }
828 }
829 
docio_read_doc_key_meta(struct docio_handle *handle, uint64_t offset, struct docio_object *doc, bool read_on_cache_miss)830 uint64_t docio_read_doc_key_meta(struct docio_handle *handle, uint64_t offset,
831                                  struct docio_object *doc,
832                                  bool read_on_cache_miss)
833 {
834     uint8_t checksum;
835     uint64_t _offset;
836     int key_alloc = 0;
837     int meta_alloc = 0;
838     fdb_seqnum_t _seqnum;
839     timestamp_t _timestamp;
840     struct docio_length _length;
841     err_log_callback *log_callback = handle->log_callback;
842 
843     _offset = _docio_read_length(handle, offset, &_length, log_callback,
844                                  read_on_cache_miss);
845     if (_offset == offset) {
846         if (read_on_cache_miss) {
847             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
848                     "Error in reading the doc length metadata with offset %" _F64 " from "
849                     "a database file '%s'",
850                     offset, handle->file->filename);
851         }
852         return offset;
853     }
854 
855     // checksum check
856     checksum = _docio_length_checksum(_length);
857     if (checksum != _length.checksum) {
858         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
859                 "doc_length checksum mismatch error in a database file '%s'",
860                 handle->file->filename);
861         return offset;
862     }
863 
864     doc->length = _docio_length_decode(_length);
865     if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
866         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
867                 "Error in decoding the doc length metadata (key length: %d) from "
868                 "a database file '%s'", doc->length.keylen, handle->file->filename);
869         return offset;
870     }
871 
872     // document size check
873     if (offset + sizeof(struct docio_length) +
874         doc->length.keylen + doc->length.metalen + doc->length.bodylen_ondisk >
875         filemgr_get_pos(handle->file)) {
876         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
877                 "Fatal error!!! Database file '%s' is corrupted.",
878                 handle->file->filename);
879         return offset;
880     }
881 
882     if (doc->key == NULL) {
883         doc->key = (void *)malloc(doc->length.keylen);
884         key_alloc = 1;
885     }
886     if (doc->meta == NULL && doc->length.metalen) {
887         doc->meta = (void *)malloc(doc->length.metalen);
888         meta_alloc = 1;
889     }
890 
891     fdb_assert(doc->key, handle, doc->length.keylen);
892 
893     _offset = _docio_read_doc_component(handle, _offset, doc->length.keylen,
894                                         doc->key, log_callback);
895     if (_offset == 0) {
896         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
897                 "Error in reading a key with offset %" _F64 ", length %d "
898                 "from a database file '%s'", _offset, doc->length.keylen,
899                 handle->file->filename);
900         free_docio_object(doc, key_alloc, meta_alloc, 0);
901         return offset;
902     }
903 
904     // read timestamp
905     _offset = _docio_read_doc_component(handle, _offset,
906                                         sizeof(timestamp_t),
907                                         &_timestamp, log_callback);
908     if (_offset == 0) {
909         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
910                 "Error in reading a timestamp with offset %" _F64 ", length %d "
911                 "from a database file '%s'", _offset, sizeof(timestamp_t),
912                 handle->file->filename);
913         free_docio_object(doc, key_alloc, meta_alloc, 0);
914         return offset;
915     }
916     doc->timestamp = _endian_decode(_timestamp);
917 
918     // copy sequence number (optional)
919     _offset = _docio_read_doc_component(handle, _offset, sizeof(fdb_seqnum_t),
920                                         (void *)&_seqnum, log_callback);
921     if (_offset == 0) {
922         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
923                 "Error in reading a sequence number with offset %" _F64 ", length %d "
924                 "from a database file '%s'", _offset, sizeof(fdb_seqnum_t),
925                 handle->file->filename);
926         free_docio_object(doc, key_alloc, meta_alloc, 0);
927         return offset;
928     }
929     doc->seqnum = _endian_decode(_seqnum);
930 
931     _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
932                                         doc->meta, log_callback);
933     if (_offset == 0) {
934         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
935                 "Error in reading the doc metadata with offset %" _F64 ", length %d "
936                 "from a database file '%s'", _offset, doc->length.metalen,
937                 handle->file->filename);
938         free_docio_object(doc, key_alloc, meta_alloc, 0);
939         return offset;
940     }
941 
942     uint8_t free_meta = meta_alloc && !doc->length.metalen;
943     free_docio_object(doc, 0, free_meta, 0);
944 
945     return _offset;
946 }
947 
docio_read_doc(struct docio_handle *handle, uint64_t offset, struct docio_object *doc, bool read_on_cache_miss)948 uint64_t docio_read_doc(struct docio_handle *handle, uint64_t offset,
949                         struct docio_object *doc,
950                         bool read_on_cache_miss)
951 {
952     uint8_t checksum;
953     uint64_t _offset;
954     int key_alloc = 0;
955     int meta_alloc = 0;
956     int body_alloc = 0;
957     fdb_seqnum_t _seqnum;
958     timestamp_t _timestamp;
959     void *comp_body = NULL;
960     struct docio_length _length;
961     err_log_callback *log_callback = handle->log_callback;
962 
963     _offset = _docio_read_length(handle, offset, &_length, log_callback,
964                                  read_on_cache_miss);
965     if (_offset == offset) {
966         if (read_on_cache_miss) {
967             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
968                     "Error in reading the doc length metadata with offset %" _F64 " from "
969                     "a database file '%s'",
970                     offset, handle->file->filename);
971         }
972         return offset;
973     }
974 
975     // checksum check
976     checksum = _docio_length_checksum(_length);
977     if (checksum != _length.checksum) {
978         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
979                 "doc_length checksum mismatch error in a database file '%s'",
980                 handle->file->filename);
981         return offset;
982     }
983 
984     doc->length = _docio_length_decode(_length);
985     if (doc->length.flag & DOCIO_TXN_COMMITTED) {
986         // transaction commit mark
987         // read the corresponding doc offset
988 
989         // If TXN_COMMITTED flag is set, this doc is not an actual doc, but a
990         // transaction commit marker. Thus, all lengths should be zero.
991         if (doc->length.keylen || doc->length.metalen ||
992             doc->length.bodylen || doc->length.bodylen_ondisk) {
993             fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
994                     "File corruption: Doc length fields in a transaction commit marker "
995                     "was not zero in a database file '%s'", handle->file->filename);
996             free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
997             return offset;
998         }
999 
1000         uint64_t doc_offset;
1001         _offset = _docio_read_doc_component(handle, _offset,
1002                                             sizeof(doc_offset), &doc_offset,
1003                                             log_callback);
1004         if (_offset == 0) {
1005             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1006                     "Error in reading an offset of a committed doc from an offset %" _F64
1007                     " in a database file '%s'", _offset, handle->file->filename);
1008             free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1009             return offset;
1010         }
1011         doc->doc_offset = _endian_decode(doc_offset);
1012         // The offset of the actual document that pointed by this commit marker
1013         // should not be greater than the file size.
1014         if (doc->doc_offset > filemgr_get_pos(handle->file)) {
1015             fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1016                     "File corruption: Offset %" _F64 " of the actual doc pointed by the "
1017                     "commit marker is greater than the size %" _F64 " of a database file '%s'",
1018                     doc->doc_offset, filemgr_get_pos(handle->file),
1019                     handle->file->filename);
1020             free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1021             return offset;
1022         }
1023         return _offset;
1024     }
1025 
1026     if (doc->length.keylen == 0 || doc->length.keylen > FDB_MAX_KEYLEN_INTERNAL) {
1027         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1028                 "Error in decoding the doc length metadata (key length: %d) from "
1029                 "a database file '%s'", doc->length.keylen, handle->file->filename);
1030         return offset;
1031     }
1032 
1033     // document size check
1034     if (offset + sizeof(struct docio_length) +
1035         doc->length.keylen + doc->length.metalen + doc->length.bodylen_ondisk >
1036         filemgr_get_pos(handle->file)) {
1037         fdb_log(log_callback, FDB_RESULT_FILE_CORRUPTION,
1038                 "Fatal error!!! Database file '%s' is corrupted.",
1039                 handle->file->filename);
1040         return offset;
1041     }
1042 
1043     if (doc->key == NULL) {
1044         doc->key = (void *)malloc(doc->length.keylen);
1045         key_alloc = 1;
1046     }
1047     if (doc->meta == NULL && doc->length.metalen) {
1048         doc->meta = (void *)malloc(doc->length.metalen);
1049         meta_alloc = 1;
1050     }
1051     if (doc->body == NULL && doc->length.bodylen) {
1052         doc->body = (void *)malloc(doc->length.bodylen);
1053         body_alloc = 1;
1054     }
1055 
1056     fdb_assert(doc->key, handle, doc->length.keylen);
1057 
1058     _offset = _docio_read_doc_component(handle, _offset,
1059                                         doc->length.keylen,
1060                                         doc->key,
1061                                         log_callback);
1062     if (_offset == 0) {
1063         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1064                 "Error in reading a key with offset %" _F64 ", length %d "
1065                 "from a database file '%s'", _offset, doc->length.keylen,
1066                 handle->file->filename);
1067         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1068         return offset;
1069     }
1070 
1071     // read timestamp
1072     _offset = _docio_read_doc_component(handle, _offset,
1073                                         sizeof(timestamp_t),
1074                                         &_timestamp,
1075                                         log_callback);
1076     if (_offset == 0) {
1077         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1078                 "Error in reading a timestamp with offset %" _F64 ", length %d "
1079                 "from a database file '%s'", _offset, sizeof(timestamp_t),
1080                 handle->file->filename);
1081         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1082         return offset;
1083     }
1084     doc->timestamp = _endian_decode(_timestamp);
1085 
1086     // copy seqeunce number (optional)
1087     _offset = _docio_read_doc_component(handle, _offset,
1088                                         sizeof(fdb_seqnum_t),
1089                                         (void *)&_seqnum,
1090                                         log_callback);
1091     if (_offset == 0) {
1092         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1093                 "Error in reading a sequence number with offset %" _F64 ", length %d "
1094                 "from a database file '%s'", _offset, sizeof(fdb_seqnum_t),
1095                 handle->file->filename);
1096         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1097         return offset;
1098     }
1099     doc->seqnum = _endian_decode(_seqnum);
1100 
1101     _offset = _docio_read_doc_component(handle, _offset, doc->length.metalen,
1102                                         doc->meta, log_callback);
1103     if (_offset == 0) {
1104         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1105                 "Error in reading the doc metadata with offset %" _F64 ", length %d "
1106                 "from a database file '%s'", _offset, doc->length.metalen,
1107                 handle->file->filename);
1108         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1109         return offset;
1110     }
1111 
1112 #ifdef _DOC_COMP
1113     if (doc->length.flag & DOCIO_COMPRESSED) {
1114         comp_body = (void*)malloc(doc->length.bodylen_ondisk);
1115         _offset = _docio_read_doc_component_comp(handle, _offset, doc->length.bodylen,
1116                                                  doc->length.bodylen_ondisk, doc->body,
1117                                                  comp_body, log_callback);
1118         if (_offset == 0) {
1119             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1120                     "Error in reading a compressed doc with offset %" _F64 ", length %d "
1121                     "from a database file '%s'", _offset, doc->length.bodylen,
1122                     handle->file->filename);
1123             if (comp_body) {
1124                 free(comp_body);
1125             }
1126             free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1127             return offset;
1128         }
1129     } else {
1130         _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1131                                             doc->body, log_callback);
1132         if (_offset == 0) {
1133             fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1134                     "Error in reading a doc with offset %" _F64 ", length %d "
1135                     "from a database file '%s'", _offset, doc->length.bodylen,
1136                     handle->file->filename);
1137             free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1138             return offset;
1139         }
1140     }
1141 #else
1142     _offset = _docio_read_doc_component(handle, _offset, doc->length.bodylen,
1143                                         doc->body, log_callback);
1144     if (_offset == 0) {
1145         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1146                 "Error in reading a doc with offset %" _F64 ", length %d "
1147                 "from a database file '%s'", _offset, doc->length.bodylen,
1148                 handle->file->filename);
1149         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1150         return offset;
1151     }
1152 #endif
1153 
1154 #ifdef __CRC32
1155     uint32_t crc_file, crc;
1156     _offset = _docio_read_doc_component(handle, _offset, sizeof(crc_file),
1157                                         (void *)&crc_file, log_callback);
1158     if (_offset == 0) {
1159         fdb_log(log_callback, FDB_RESULT_READ_FAIL,
1160                 "Error in reading a doc's CRC value with offset %" _F64 ", length %d "
1161                 "from a database file '%s'", _offset, sizeof(crc_file),
1162                 handle->file->filename);
1163         if (comp_body) {
1164             free(comp_body);
1165         }
1166         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1167         return offset;
1168     }
1169 
1170     crc = chksum((void *)&_length, sizeof(_length));
1171     crc = chksum_scd(doc->key, doc->length.keylen, crc);
1172     crc = chksum_scd((void *)&_timestamp, sizeof(timestamp_t), crc);
1173     crc = chksum_scd((void *)&_seqnum, sizeof(fdb_seqnum_t), crc);
1174     crc = chksum_scd(doc->meta, doc->length.metalen, crc);
1175     if (doc->length.flag & DOCIO_COMPRESSED) {
1176         crc = chksum_scd(comp_body, doc->length.bodylen_ondisk, crc);
1177         if (comp_body) {
1178             free(comp_body);
1179         }
1180     } else {
1181         crc = chksum_scd(doc->body, doc->length.bodylen, crc);
1182     }
1183     if (crc != crc_file) {
1184         fdb_log(log_callback, FDB_RESULT_CHECKSUM_ERROR,
1185                 "doc_body checksum mismatch error in a database file '%s'",
1186                 handle->file->filename);
1187         free_docio_object(doc, key_alloc, meta_alloc, body_alloc);
1188         return offset;
1189     }
1190 #endif
1191 
1192     uint8_t free_meta = meta_alloc && !doc->length.metalen;
1193     uint8_t free_body = body_alloc && !doc->length.bodylen;
1194     free_docio_object(doc, 0, free_meta, free_body);
1195 
1196     return _offset;
1197 }
1198 
// Submits the 'size' async read requests already prepared on 'aio_handle',
// waits for their completion events and parses one document out of each
// returned buffer into 'doc_array', starting at index 'doc_idx'.
//
// Each completed request consumes one 'doc_array' slot, whether or not the
// document could be parsed; for every successfully parsed document its size
// (_fdb_get_docsize) is added to '*sum_doc_size'. With 'keymeta_only' set,
// only key and metadata are read (docio_read_doc_key_meta), otherwise the
// whole document (docio_read_doc).
//
// Returns 'size' on success, or the negative value returned by aio_submit /
// aio_getevents on failure. Builds without async I/O support return 0.
static int _submit_async_io_requests(struct docio_handle *handle,
                                     struct docio_object *doc_array,
                                     size_t doc_idx,
                                     struct async_io_handle *aio_handle,
                                     int size,
                                     size_t *sum_doc_size,
                                     bool keymeta_only)
{
#ifdef _ASYNC_IO
#if !defined(WIN32) && !defined(_WIN32)
    struct io_event* io_evt = NULL;
    uint8_t *buf = NULL;
    uint64_t offset = 0, _offset = 0;
    int num_events = 0;

    int num_sub = handle->file->ops->aio_submit(aio_handle, size);
    if (num_sub < 0) {
        // Error loggings
        char errno_msg[512];
        handle->file->ops->get_errno_str(errno_msg, 512);
        fdb_log(handle->log_callback, (fdb_status) num_sub,
                "Error in submitting async I/O requests to a file '%s', errno msg: %s",
                handle->file->filename, errno_msg);
        return num_sub;
    }
    fdb_assert(num_sub == size, num_sub, size);

    // Drain the completion queue until every submitted request is accounted for.
    while (num_sub > 0) {
        num_events = handle->file->ops->aio_getevents(aio_handle, 1,
                                                      num_sub, (unsigned int) -1);
        if (num_events < 0) {
            // Error loggings. The failure code is 'num_events'; 'num_sub' is
            // merely the count of still-outstanding requests.
            char errno_msg[512];
            handle->file->ops->get_errno_str(errno_msg, 512);
            fdb_log(handle->log_callback, (fdb_status) num_events,
                    "Error in getting async I/O events from the completion queue "
                    "for a file '%s', errno msg: %s", handle->file->filename, errno_msg);
            return num_events;
        }
        num_sub -= num_events;
        for (io_evt = aio_handle->events; num_events > 0; --num_events, ++io_evt) {
            buf = (uint8_t *) io_evt->obj->u.c.buf;
            offset = *((uint64_t *) io_evt->data); // Original offset.

            // Set the docio handle's buffer to the AIO buffer to read
            // a doc from the AIO buffer. If additional blocks need to be
            // read, then they will be sequentially read through the synchronous
            // I/O path (i.e., buffer cache -> disk read if cache miss).
            // As these additional blocks are sequential reads, we don't expect
            // asynchronous I/O to give us performance boost.
            void *tmp_buffer = handle->readbuffer;
            handle->readbuffer = buf;
            handle->lastbid = offset / aio_handle->block_size;
            memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
            if (keymeta_only) {
                _offset = docio_read_doc_key_meta(handle, offset,
                                                  &doc_array[doc_idx], true);
            } else {
                _offset = docio_read_doc(handle, offset, &doc_array[doc_idx],
                                         true);
            }

            // Always hand the handle its own read buffer back before moving on.
            handle->readbuffer = tmp_buffer;
            handle->lastbid = BLK_NOT_FOUND;

            // An unchanged offset means the doc could not be read; its slot is
            // still consumed, but it does not count towards the size total.
            if (_offset != offset) {
                (*sum_doc_size) += _fdb_get_docsize(doc_array[doc_idx].length);
            }
            ++doc_idx;
        }
    }
    return size;
#else // Plan to implement async I/O in other OSs (e.g., Windows, OSx)
    return 0;
#endif
#else // Async I/O is not supported in the current OS.
    return 0;
#endif
}
1281 
docio_batch_read_docs(struct docio_handle *handle, uint64_t *offset_array, struct docio_object *doc_array, size_t array_size, size_t data_size_threshold, size_t batch_size_threshold, struct async_io_handle *aio_handle, bool keymeta_only)1282 size_t docio_batch_read_docs(struct docio_handle *handle,
1283                              uint64_t *offset_array,
1284                              struct docio_object *doc_array,
1285                              size_t array_size,
1286                              size_t data_size_threshold,
1287                              size_t batch_size_threshold,
1288                              struct async_io_handle *aio_handle,
1289                              bool keymeta_only)
1290 {
1291     size_t i = 0;
1292     size_t sum_doc_size = 0;
1293     size_t doc_idx = 0;
1294     size_t block_size = handle->file->blocksize;
1295     uint64_t _offset = 0;
1296     int aio_size = 0;
1297     bool read_fail = false;
1298     bool read_on_cache_miss = true;
1299 
1300     if (aio_handle) {
1301         // If async I/O is supported, we will then read non-resident docs from disk
1302         // by using async I/O operations.
1303         read_on_cache_miss = false;
1304     }
1305 
1306     for (i = 0; i < array_size && i < batch_size_threshold &&
1307            sum_doc_size < data_size_threshold; ++i) {
1308         memset(&doc_array[doc_idx], 0x0, sizeof(struct docio_object));
1309         if (keymeta_only) {
1310             _offset = docio_read_doc_key_meta(handle, offset_array[i], &doc_array[doc_idx],
1311                                               read_on_cache_miss);
1312         } else {
1313             _offset = docio_read_doc(handle, offset_array[i], &doc_array[doc_idx],
1314                                      read_on_cache_miss);
1315         }
1316         if (_offset == offset_array[i]) {
1317             if (aio_handle) {
1318                 // The page is not resident in the cache. Prepare and perform Async I/O
1319                 handle->file->ops->aio_prep_read(aio_handle, aio_size,
1320                                                  block_size, offset_array[i]);
1321                 if (++aio_size == (int) aio_handle->queue_depth) {
1322                     int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1323                                                             aio_handle, aio_size,
1324                                                             &sum_doc_size,
1325                                                             keymeta_only);
1326                     if (num_sub < 0) {
1327                         read_fail = true;
1328                         break;
1329                     }
1330                     fdb_assert(num_sub == aio_size, num_sub, aio_size);
1331                     aio_size = 0;
1332                     doc_idx += num_sub;
1333                 }
1334             } else {
1335                 ++doc_idx; // Error in reading a doc.
1336             }
1337         } else {
1338             sum_doc_size += _fdb_get_docsize(doc_array[doc_idx].length);
1339             ++doc_idx;
1340         }
1341     }
1342 
1343     if (aio_size && !read_fail) {
1344         int num_sub = _submit_async_io_requests(handle, doc_array, doc_idx,
1345                                                 aio_handle, aio_size,
1346                                                 &sum_doc_size, keymeta_only);
1347         if (num_sub < 0) {
1348             read_fail = true;
1349         } else {
1350             doc_idx += num_sub;
1351         }
1352     }
1353 
1354     if (read_fail) {
1355         for (i = 0; i < batch_size_threshold; ++i) {
1356             free(doc_array[i].key);
1357             free(doc_array[i].meta);
1358             free(doc_array[i].body);
1359             doc_array[i].key = doc_array[i].meta = doc_array[i].body = NULL;
1360         }
1361         return (size_t) -1;
1362     }
1363 
1364     return doc_idx;
1365 }
1366 
docio_check_buffer(struct docio_handle *handle, bid_t bid)1367 int docio_check_buffer(struct docio_handle *handle, bid_t bid)
1368 {
1369     err_log_callback *log_callback = handle->log_callback;
1370     _docio_read_through_buffer(handle, bid, log_callback, true);
1371     return _docio_check_buffer(handle);
1372 }
1373 
1374