xref: /6.0.3/forestdb/src/iterator.cc (revision 5403f419)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22
23#include "libforestdb/forestdb.h"
24#include "fdb_internal.h"
25#include "hbtrie.h"
26#include "docio.h"
27#include "btreeblock.h"
28#include "common.h"
29#include "wal.h"
30#include "avltree.h"
31#include "list.h"
32#include "internal_types.h"
33#include "btree_var_kv_ops.h"
34#include "timing.h"
35
36#include "memleak.h"
37
38#ifdef __DEBUG
39#ifndef __DEBUG_FDB
40    #undef DBG
41    #undef DBGCMD
42    #undef DBGSW
43    #define DBG(...)
44    #define DBGCMD(...)
45    #define DBGSW(n, ...)
46#endif
47#endif
48
49// lexicographically compares two variable-length binary streams
50static int _fdb_keycmp(void *key1, size_t keylen1, void *key2, size_t keylen2)
51{
52    if (keylen1 == keylen2) {
53        return memcmp(key1, key2, keylen1);
54    }else {
55        size_t len = MIN(keylen1, keylen2);
56        int cmp = memcmp(key1, key2, len);
57        if (cmp != 0) return cmp;
58        else {
59            return (int)((int)keylen1 - (int)keylen2);
60        }
61    }
62}
63
64static int _fdb_key_cmp(fdb_iterator *iterator, void *key1, size_t keylen1,
65                        void *key2, size_t keylen2) {
66    int cmp;
67    if (iterator->handle->kvs_config.custom_cmp) {
68        // custom compare function for variable length key
69        if (iterator->handle->kvs) {
70            // multi KV instance mode
71            // KV ID should be compared separately
72            size_t size_chunk = iterator->handle->config.chunksize;
73            fdb_kvs_id_t a_id, b_id;
74            buf2kvid(size_chunk, key1, &a_id);
75            buf2kvid(size_chunk, key2, &b_id);
76
77            if (a_id < b_id) {
78                cmp = -1;
79            } else if (a_id > b_id) {
80                cmp = 1;
81            } else {
82                if (keylen1 == size_chunk) { // key1 < key2
83                    return -1;
84                } else if (keylen2 == size_chunk) { // key1 > key2
85                    return 1;
86                }
87                cmp = iterator->handle->kvs_config.custom_cmp(
88                          (uint8_t*)key1 + size_chunk, keylen1 - size_chunk,
89                          (uint8_t*)key2 + size_chunk, keylen2 - size_chunk);
90            }
91        } else {
92            cmp = iterator->handle->kvs_config.custom_cmp(key1, keylen1,
93                                                       key2, keylen2);
94        }
95    } else {
96        cmp = _fdb_keycmp(key1, keylen1, key2, keylen2);
97    }
98    return cmp;
99}
100
101LIBFDB_API
102fdb_status fdb_iterator_init(fdb_kvs_handle *handle,
103                             fdb_iterator **ptr_iterator,
104                             const void *start_key,
105                             size_t start_keylen,
106                             const void *end_key,
107                             size_t end_keylen,
108                             fdb_iterator_opt_t opt)
109{
110    if (handle == NULL ||
111        start_keylen > FDB_MAX_KEYLEN ||
112        (handle->kvs_config.custom_cmp &&
113           (start_keylen > handle->config.blocksize - HBTRIE_HEADROOM ||
114            end_keylen > handle->config.blocksize - HBTRIE_HEADROOM) ) ||
115        end_keylen > FDB_MAX_KEYLEN) {
116        return FDB_RESULT_INVALID_ARGS;
117    }
118
119    if ((opt & FDB_ITR_SKIP_MIN_KEY && (!start_key || !start_keylen)) ||
120        (opt & FDB_ITR_SKIP_MAX_KEY && (!end_key || !end_keylen))) {
121        return FDB_RESULT_INVALID_ARGS;
122    }
123
124    hbtrie_result hr;
125    fdb_status fs;
126    LATENCY_STAT_START();
127
128    if (!handle->shandle) {
129        // If compaction is already done before this line,
130        // handle->file needs to be replaced with handle->new_file.
131        fdb_check_file_reopen(handle, NULL);
132        fdb_sync_db_header(handle);
133    }
134
135    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
136
137    if (!handle->shandle) {
138        // snapshot handle doesn't exist
139        // open a new handle to make the iterator handle as a snapshot
140        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
141        if (fs != FDB_RESULT_SUCCESS) {
142            fdb_log(&handle->log_callback, fs,
143                    "Failed to create an iterator instance due to the failure of "
144                    "open operation on the KV Store '%s' in a database file '%s'",
145                    _fdb_kvs_get_name(handle, handle->file),
146                    handle->file->filename);
147            return fs;
148        }
149        iterator->snapshot_handle = false;
150    } else {
151        // Snapshot handle exists
152        // We don't need to open a new handle.. just point to the snapshot handle.
153        iterator->handle = handle;
154        iterator->snapshot_handle = true;
155    }
156    iterator->opt = opt;
157
158    iterator->_key = (void*)malloc(FDB_MAX_KEYLEN_INTERNAL);
159    // set to zero the first <chunksize> bytes
160    memset(iterator->_key, 0x0, iterator->handle->config.chunksize);
161    iterator->_keylen = 0;
162    iterator->_offset = BLK_NOT_FOUND;
163    iterator->hbtrie_iterator = NULL;
164    iterator->seqtree_iterator = NULL;
165    iterator->seqtrie_iterator = NULL;
166
167    if (iterator->handle->kvs) {
168        // multi KV instance mode .. prepend KV ID
169        size_t size_chunk = handle->config.chunksize;
170        uint8_t *start_key_temp, *end_key_temp;
171
172        if (start_key == NULL) {
173            start_key_temp = alca(uint8_t, size_chunk);
174            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
175            start_key = start_key_temp;
176            start_keylen = size_chunk;
177        } else {
178            start_key_temp = alca(uint8_t, size_chunk + start_keylen);
179            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
180            memcpy(start_key_temp + size_chunk, start_key, start_keylen);
181            start_key = start_key_temp;
182            start_keylen += size_chunk;
183        }
184
185        if (end_key == NULL) {
186            // set end_key as NULL key of the next KV ID.
187            // NULL key doesn't actually exist so that the iterator ends
188            // at the last key of the current KV ID.
189            end_key_temp = alca(uint8_t, size_chunk);
190            kvid2buf(size_chunk, iterator->handle->kvs->id+1, end_key_temp);
191            end_key = end_key_temp;
192            end_keylen = size_chunk;
193        } else {
194            end_key_temp = alca(uint8_t, size_chunk + end_keylen);
195            kvid2buf(size_chunk, iterator->handle->kvs->id, end_key_temp);
196            memcpy(end_key_temp + size_chunk, end_key, end_keylen);
197            end_key = end_key_temp;
198            end_keylen += size_chunk;
199        }
200
201        iterator->start_key = (void*)malloc(start_keylen);
202        memcpy(iterator->start_key, start_key, start_keylen);
203        iterator->start_keylen = start_keylen;
204
205        iterator->end_key = (void*)malloc(end_keylen);
206        memcpy(iterator->end_key, end_key, end_keylen);
207        iterator->end_keylen = end_keylen;
208
209    } else { // single KV instance mode
210        if (start_key == NULL) {
211            iterator->start_key = NULL;
212            iterator->start_keylen = 0;
213        } else {
214            iterator->start_key = (void*)malloc(start_keylen);
215            memcpy(iterator->start_key, start_key, start_keylen);
216            iterator->start_keylen = start_keylen;
217        }
218
219        if (end_key == NULL) {
220            iterator->end_key = NULL;
221            end_keylen = 0;
222        }else{
223            iterator->end_key = (void*)malloc(end_keylen);
224            memcpy(iterator->end_key, end_key, end_keylen);
225        }
226        iterator->end_keylen = end_keylen;
227    }
228
229    // create an iterator handle for hb-trie
230    iterator->hbtrie_iterator = (struct hbtrie_iterator *)
231                                malloc(sizeof(struct hbtrie_iterator));
232    hr = hbtrie_iterator_init(iterator->handle->trie,
233                              iterator->hbtrie_iterator,
234                              (void *)start_key, start_keylen);
235    assert(hr == HBTRIE_RESULT_SUCCESS);
236
237    wal_itr_init(iterator->handle->file, iterator->handle->shandle, true,
238                 &iterator->wal_itr);
239
240    if (start_key) {
241        struct wal_item query;
242        struct wal_item_header query_key;
243        query.header = &query_key;
244        query_key.key = iterator->start_key;
245        query_key.keylen = iterator->start_keylen;
246        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
247                                                       &query);
248    } else {
249        iterator->tree_cursor = wal_itr_first(iterator->wal_itr);
250    }
251    // to know reverse iteration endpoint store the start cursor
252    if (iterator->tree_cursor) {
253        iterator->tree_cursor_start = iterator->tree_cursor;
254    }
255    iterator->tree_cursor_prev = iterator->tree_cursor;
256    iterator->direction = FDB_ITR_DIR_NONE;
257    iterator->status = FDB_ITR_IDX;
258    iterator->_dhandle = NULL; // populated at the first iterator movement
259
260    *ptr_iterator = iterator;
261
262    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
263    fdb_iterator_next(iterator); // position cursor at first key
264
265    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_INIT);
266
267    return FDB_RESULT_SUCCESS;
268}
269
270LIBFDB_API
271fdb_status fdb_iterator_sequence_init(fdb_kvs_handle *handle,
272                                      fdb_iterator **ptr_iterator,
273                                      const fdb_seqnum_t start_seq,
274                                      const fdb_seqnum_t end_seq,
275                                      fdb_iterator_opt_t opt)
276{
277    if (handle == NULL || ptr_iterator == NULL ||
278        (end_seq && start_seq > end_seq)) {
279        return FDB_RESULT_INVALID_ARGS;
280    }
281
282    fdb_status fs;
283    fdb_seqnum_t _start_seq = _endian_encode(start_seq);
284    fdb_kvs_id_t _kv_id;
285    size_t size_id, size_seq;
286    uint8_t *start_seq_kv;
287    struct wal_item query;
288    struct wal_item_header query_key;
289    LATENCY_STAT_START();
290
291    query.header = &query_key;
292
293    // Sequence trees are a must for byseq operations
294    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
295        return FDB_RESULT_INVALID_CONFIG;
296    }
297
298    if (!handle->shandle) {
299        // If compaction is already done before this line,
300        // handle->file needs to be replaced with handle->new_file.
301        fdb_check_file_reopen(handle, NULL);
302        fdb_sync_db_header(handle);
303    }
304
305    size_id = sizeof(fdb_kvs_id_t);
306    size_seq = sizeof(fdb_seqnum_t);
307    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
308
309    if (!handle->shandle) {
310        // snapshot handle doesn't exist
311        // open a new handle to make the iterator handle as a snapshot
312        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
313        if (fs != FDB_RESULT_SUCCESS) {
314            fdb_log(&handle->log_callback, fs,
315                    "Failed to create an sequence iterator instance due to the "
316                    "failure of "
317                    "open operation on the KV Store '%s' in a database file '%s'",
318                    _fdb_kvs_get_name(handle, handle->file),
319                    handle->file->filename);
320            return fs;
321        }
322        iterator->snapshot_handle = false;
323    } else {
324        // Snapshot handle exists
325        // We don't need to open a new handle.. just point to the snapshot handle.
326        iterator->handle = handle;
327        iterator->snapshot_handle = true;
328    }
329
330    iterator->hbtrie_iterator = NULL;
331    iterator->_key = NULL;
332    iterator->_keylen = 0;
333    iterator->opt = opt;
334    iterator->_offset = BLK_NOT_FOUND;
335    iterator->_seqnum = start_seq;
336
337    // For easy API call, treat zero seq as 0xffff...
338    // (because zero seq number is not used)
339    if (end_seq == 0) {
340        iterator->end_seqnum = SEQNUM_NOT_USED;
341    } else {
342        iterator->end_seqnum = end_seq;
343    }
344
345    iterator->start_seqnum = start_seq;
346
347    iterator->start_key = NULL;
348    iterator->end_key = NULL;
349
350    wal_itr_init(handle->file, iterator->handle->shandle, false,
351                 &iterator->wal_itr);
352
353    if (iterator->handle->kvs) {
354        int size_chunk = handle->config.chunksize;
355        // create an iterator handle for hb-trie
356        start_seq_kv = alca(uint8_t, size_chunk + size_seq);
357        _kv_id = _endian_encode(iterator->handle->kvs->id);
358        memcpy(start_seq_kv, &_kv_id, size_id);
359        memcpy(start_seq_kv + size_id, &_start_seq, size_seq);
360
361        iterator->seqtrie_iterator = (struct hbtrie_iterator *)
362                                     calloc(1, sizeof(struct hbtrie_iterator));
363        hbtrie_iterator_init(iterator->handle->seqtrie,
364                             iterator->seqtrie_iterator,
365                             start_seq_kv, size_id + size_seq);
366
367        query_key.key = start_seq_kv;
368        kvid2buf(size_chunk, iterator->handle->kvs->id, start_seq_kv);
369        memcpy(start_seq_kv + size_chunk, &start_seq, size_seq);
370        query_key.keylen = size_chunk + size_seq;
371        query.seqnum = start_seq;
372        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
373                                                       &query);
374    } else {
375        // create an iterator handle for b-tree
376        iterator->seqtree_iterator = (struct btree_iterator *)
377                                     calloc(1, sizeof(struct btree_iterator));
378        btree_iterator_init(iterator->handle->seqtree,
379                            iterator->seqtree_iterator,
380                            (void *)(start_seq ? &_start_seq : NULL));
381        query_key.key = (void*)NULL;
382        query_key.keylen = 0;
383        query.seqnum = start_seq;
384        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
385                                                       &query);
386    }
387
388    // to know reverse iteration endpoint store the start cursor
389    if (iterator->tree_cursor) {
390        iterator->tree_cursor_start = iterator->tree_cursor;
391    }
392    iterator->tree_cursor_prev = iterator->tree_cursor;
393    iterator->direction = FDB_ITR_DIR_NONE;
394    iterator->status = FDB_ITR_IDX;
395    iterator->_dhandle = NULL; // populated at the first iterator movement
396
397    *ptr_iterator = iterator;
398
399    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
400    fdb_iterator_next(iterator); // position cursor at first key
401
402    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEQ_INIT);
403
404    return FDB_RESULT_SUCCESS;
405}
406
407static fdb_status _fdb_iterator_prev(fdb_iterator *iterator)
408{
409    int cmp;
410    void *key;
411    size_t keylen;
412    uint64_t offset;
413    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
414    struct docio_handle *dhandle;
415    struct wal_item *snap_item = NULL;
416
417    if (iterator->direction != FDB_ITR_REVERSE) {
418        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
419        if (iterator->tree_cursor) {
420            // just turn around
421            // WAL:   0  v  2->   4    (OLD state)
422            // TRIE:     1  2  3  4
423            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
424                                                  iterator->tree_cursor);
425            if (iterator->direction == FDB_ITR_FORWARD &&
426                iterator->status != FDB_ITR_WAL) {
427                iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
428            }
429            // WAL: <-0  v  2     4    (NEW state)
430            // TRIE:  0  1  2  3  4
431        } else if (iterator->tree_cursor_prev) { // gone past the end..
432            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
433                                             iterator->tree_cursor_prev);
434            iterator->status = FDB_ITR_IDX;
435        } // else Don't move - seek()/init() has already positioned cursor
436    }
437start:
438    key = iterator->_key;
439    dhandle = iterator->handle->dhandle;
440
441    // retrieve from hb-trie
442    if (iterator->_offset == BLK_NOT_FOUND) {
443        // no key waiting for being returned
444        // get next key from hb-trie (or idtree)
445        struct docio_object _doc;
446        // Move Main index Cursor backward...
447        int64_t _offset;
448        do {
449            hr = hbtrie_prev(iterator->hbtrie_iterator, key,
450                             &iterator->_keylen, (void*)&iterator->_offset);
451            btreeblk_end(iterator->handle->bhandle);
452            iterator->_offset = _endian_decode(iterator->_offset);
453            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
454                  hr != HBTRIE_RESULT_SUCCESS) {
455                break;
456            }
457            // deletion check
458            memset(&_doc, 0x0, sizeof(struct docio_object));
459            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset,
460                                              &_doc, true);
461            if (_offset <= 0) { // read fail
462                continue; // get prev doc
463            }
464            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
465                free(_doc.key);
466                free(_doc.meta);
467                continue; // get prev doc
468            }
469            free(_doc.key);
470            free(_doc.meta);
471            break;
472        } while (1);
473    }
474    keylen = iterator->_keylen;
475    offset = iterator->_offset;
476
477    if (hr != HBTRIE_RESULT_SUCCESS && !iterator->tree_cursor) {
478        return FDB_RESULT_ITERATOR_FAIL;
479    }
480
481    // Move the WAL cursor backward...
482    while (iterator->tree_cursor) {
483        if (iterator->status == FDB_ITR_WAL) {
484            iterator->tree_cursor_prev = iterator->tree_cursor;
485            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
486        }// else don't move - seek()/ init() has already positioned cursor
487
488        // get the current item of avl-tree
489        snap_item = iterator->tree_cursor;
490        if (!snap_item) {
491            if (hr == HBTRIE_RESULT_SUCCESS) {
492                break;
493            } else {
494                return FDB_RESULT_ITERATOR_FAIL;
495            }
496        }
497        if (hr == HBTRIE_RESULT_SUCCESS) {
498            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
499                               snap_item->header->keylen,
500                               key, keylen);
501        } else {
502            // no more docs in hb-trie
503            cmp = 1;
504        }
505
506        if (cmp >= 0) {
507            // key[WAL] >= key[hb-trie] .. take key[WAL] first
508            uint8_t drop_logical_deletes =
509                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
510                (iterator->opt & FDB_ITR_NO_DELETES);
511            iterator->status = FDB_ITR_WAL;
512            if (cmp > 0) {
513                if (snap_item->action == WAL_ACT_REMOVE ||
514                    drop_logical_deletes) {
515                    // this key is removed .. get prev key[WAL]
516                    continue;
517                }
518            } else { // same key found in WAL
519                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
520                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
521                    // the key is removed .. start over again
522                    goto start;
523                }
524            }
525
526            key = snap_item->header->key;
527            keylen = snap_item->header->keylen;
528            // key[hb-trie] is stashed in iterator->_key for future call
529            offset = snap_item->offset;
530        }
531        break;
532    }
533
534    if (offset == iterator->_offset) {
535        // take key[hb-trie] & and fetch the prev key[hb-trie] at next turn
536        iterator->_offset = BLK_NOT_FOUND;
537        iterator->status = FDB_ITR_IDX;
538    }
539
540    if (iterator->start_key) {
541        cmp = _fdb_key_cmp(iterator, iterator->start_key,
542                           iterator->start_keylen, key, keylen);
543
544        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
545            // current key (KEY) is lexicographically less than START_KEY
546            // OR it is the start key and user wishes to skip it..
547            // terminate the iteration
548            return FDB_RESULT_ITERATOR_FAIL;
549        }
550    }
551
552    if (iterator->end_key) {
553        cmp = _fdb_key_cmp(iterator, iterator->end_key,
554                           iterator->end_keylen, key, keylen);
555
556        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
557            // key is the end_key but users wishes to skip it, redo..
558            // OR current key (KEY) is lexicographically greater than END_KEY
559            goto start;
560        }
561    }
562
563    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
564    iterator->_get_offset = offset; // store for fdb_iterator_get()
565
566    return FDB_RESULT_SUCCESS;
567}
568
569static fdb_status _fdb_iterator_next(fdb_iterator *iterator)
570{
571    int cmp;
572    void *key;
573    size_t keylen;
574    uint64_t offset;
575    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
576    struct docio_handle *dhandle;
577    struct wal_item *snap_item = NULL;
578
579    if (iterator->direction != FDB_ITR_FORWARD) {
580        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
581        // just turn around and face forward..
582        if (iterator->tree_cursor) {
583            // WAL: <-0  v  2     4    (OLD state)
584            // TRIE:     1  2  3  4
585            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
586                                                  iterator->tree_cursor);
587            if (iterator->direction == FDB_ITR_REVERSE &&
588                iterator->status != FDB_ITR_WAL) {
589                iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
590            }
591            // WAL:   0  v  2->   4    (NEW state)
592            // TRIE:  0  1  2  3  4
593        } else if (iterator->tree_cursor_prev) {
594            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
595                                             iterator->tree_cursor_prev);
596            iterator->status = FDB_ITR_IDX;
597        } // else Don't move - seek()/init() has already positioned cursor
598    }
599
600start:
601    key = iterator->_key;
602    dhandle = iterator->handle->dhandle;
603
604    // retrieve from hb-trie
605    if (iterator->_offset == BLK_NOT_FOUND) {
606        // no key waiting for being returned
607        // get next key from hb-trie (or idtree)
608        struct docio_object _doc;
609        // Move Main index Cursor forward...
610        int64_t _offset;
611        do {
612            hr = hbtrie_next(iterator->hbtrie_iterator, key,
613                             &iterator->_keylen, (void*)&iterator->_offset);
614            btreeblk_end(iterator->handle->bhandle);
615            iterator->_offset = _endian_decode(iterator->_offset);
616            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
617                  hr != HBTRIE_RESULT_SUCCESS) {
618                break;
619            }
620            // deletion check
621            memset(&_doc, 0x0, sizeof(struct docio_object));
622            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset, &_doc,
623                                              true);
624            if (_offset <= 0) { // read fail
625                continue; // get next doc
626            }
627            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
628                free(_doc.key);
629                free(_doc.meta);
630                continue; // get next doc
631            }
632            free(_doc.key);
633            free(_doc.meta);
634            break;
635        } while (1);
636    }
637
638    keylen = iterator->_keylen;
639    offset = iterator->_offset;
640
641    if (hr != HBTRIE_RESULT_SUCCESS && iterator->tree_cursor == NULL) {
642        return FDB_RESULT_ITERATOR_FAIL;
643    }
644
645    // Move WAL Cursor forward...
646    while (iterator->tree_cursor) {
647        if (iterator->status == FDB_ITR_WAL) {
648            iterator->tree_cursor_prev = iterator->tree_cursor;
649            iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
650        } // else Don't move - seek()/ init() has already positioned cursor
651        snap_item = iterator->tree_cursor;
652        if (!snap_item) {
653            if (hr == HBTRIE_RESULT_SUCCESS) {
654                break;
655            } else { // no more keys in WAL or main index
656                return FDB_RESULT_ITERATOR_FAIL;
657            }
658        }
659        // Compare key[WAL] with key[hb-trie]
660        if (hr == HBTRIE_RESULT_SUCCESS) {
661            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
662                               snap_item->header->keylen,
663                               key, keylen);
664        } else {
665            // no more docs in hb-trie
666            cmp = -1;
667        }
668
669        if (cmp <= 0) {
670            // key[WAL] <= key[hb-trie] .. take key[WAL] first
671            uint8_t drop_logical_deletes =
672                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
673                (iterator->opt & FDB_ITR_NO_DELETES);
674            iterator->status = FDB_ITR_WAL;
675            if (cmp < 0) {
676                if (snap_item->action == WAL_ACT_REMOVE ||
677                    drop_logical_deletes) {
678                    // this key is removed .. get next key[WAL]
679                    continue;
680                }
681            } else { // Same key from trie also found from WAL
682                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
683                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
684                    // the key is removed .. start over again
685                    goto start;
686                }
687            }
688            key = snap_item->header->key;
689            keylen = snap_item->header->keylen;
690            // key[hb-trie] is stashed in iterator->key for next call
691            offset = snap_item->offset;
692        }
693        break;
694    }
695
696    if (offset == iterator->_offset) {
697        // take key[hb-trie] & and fetch the next key[hb-trie] at next turn
698        iterator->_offset = BLK_NOT_FOUND;
699        iterator->status = FDB_ITR_IDX;
700    }
701
702    if (iterator->start_key) {
703        cmp = _fdb_key_cmp(iterator, iterator->start_key,
704                           iterator->start_keylen, key, keylen);
705
706        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
707            // If user wishes to skip start key, redo first step
708            // OR current key (KEY) is lexicographically smaller than START_KEY
709            goto start;
710        }
711    }
712
713    if (iterator->end_key) {
714        cmp = _fdb_key_cmp(iterator, iterator->end_key, iterator->end_keylen,
715                           key, keylen);
716        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
717            // current key (KEY) is lexicographically greater than END_KEY
718            // OR it is the end_key and user wishes to skip it
719            // terminate the iteration
720            return FDB_RESULT_ITERATOR_FAIL;
721        }
722    }
723
724    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
725    iterator->_get_offset = offset; // store for fdb_iterator_get()
726
727    return FDB_RESULT_SUCCESS;
728}
729
730LIBFDB_API
731fdb_status fdb_iterator_seek(fdb_iterator *iterator,
732                             const void *seek_key,
733                             const size_t seek_keylen,
734                             const fdb_iterator_seek_opt_t seek_pref)
735{
736    if (!iterator) {
737        return FDB_RESULT_INVALID_ARGS;
738    }
739
740    int cmp, cmp2; // intermediate results of comparison
741    int next_op = 0; // 0: none, -1: prev(), 1: next();
742    int size_chunk = iterator->handle->config.chunksize;
743    uint8_t *seek_key_kv;
744    int64_t _offset;
745    size_t seek_keylen_kv;
746    bool skip_wal = false, fetch_next = true, fetch_wal = true;
747    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
748    struct wal_item *snap_item = NULL, query;
749    struct wal_item_header query_header;
750    struct docio_object _doc;
751    fdb_status ret;
752    LATENCY_STAT_START();
753
754    iterator->_dhandle = NULL; // setup for get() to return FAIL
755
756    if (!iterator || !seek_key || !iterator->_key ||
757        seek_keylen > FDB_MAX_KEYLEN ||
758        (iterator->handle->kvs_config.custom_cmp &&
759         seek_keylen > iterator->handle->config.blocksize - HBTRIE_HEADROOM)) {
760        return FDB_RESULT_INVALID_ARGS;
761    }
762
763    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
764        return FDB_RESULT_HANDLE_BUSY;
765    }
766
767    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
768                         std::memory_order_relaxed);
769
770    if (iterator->handle->kvs) {
771        seek_keylen_kv = seek_keylen + size_chunk;
772        seek_key_kv = alca(uint8_t, seek_keylen_kv);
773        kvid2buf(size_chunk, iterator->handle->kvs->id, seek_key_kv);
774        memcpy(seek_key_kv + size_chunk, seek_key, seek_keylen);
775    } else {
776        seek_keylen_kv = seek_keylen;
777        seek_key_kv = (uint8_t*)seek_key;
778    }
779
780    // disable seeking beyond the end key...
781    if (iterator->end_key) {
782        cmp = _fdb_key_cmp(iterator, (void *)iterator->end_key,
783                                    iterator->end_keylen,
784                                    (void *)seek_key_kv, seek_keylen_kv);
785        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
786            // seek the end key at this time,
787            // and call prev() next.
788            next_op = -1;
789        }
790        if (cmp < 0) {
791            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
792            return FDB_RESULT_ITERATOR_FAIL;
793        }
794    }
795
796    // disable seeking beyond the start key...
797    if (iterator->start_key) {
798        cmp = _fdb_key_cmp(iterator,
799                                  (void *)iterator->start_key,
800                                  iterator->start_keylen,
801                                  (void *)seek_key_kv, seek_keylen_kv);
802        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
803            // seek the start key at this time,
804            // and call next() next.
805            next_op = 1;
806        }
807        if (cmp > 0) {
808            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
809            return FDB_RESULT_ITERATOR_FAIL;
810        }
811    }
812
813    iterator->direction = FDB_ITR_FORWARD;
814
815    // reset HB+trie's iterator
816    hbtrie_iterator_free(iterator->hbtrie_iterator);
817    hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
818                         seek_key_kv, seek_keylen_kv);
819
820fetch_hbtrie:
821    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
822        // fetch next key
823        hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
824                         &iterator->_keylen, (void*)&iterator->_offset);
825        btreeblk_end(iterator->handle->bhandle);
826
827        if (hr == HBTRIE_RESULT_SUCCESS) {
828            cmp = _fdb_key_cmp(iterator,
829                               iterator->_key, iterator->_keylen,
830                               seek_key_kv, seek_keylen_kv);
831            if (cmp < 0) {
832                // key[HB+trie] < seek_key .. move forward
833                hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
834                                 &iterator->_keylen, (void*)&iterator->_offset);
835                btreeblk_end(iterator->handle->bhandle);
836            }
837            iterator->_offset = _endian_decode(iterator->_offset);
838
839            while (iterator->opt & FDB_ITR_NO_DELETES &&
840                   hr == HBTRIE_RESULT_SUCCESS        &&
841                   fetch_next) {
842                fetch_next = false;
843                memset(&_doc, 0x0, sizeof(struct docio_object));
844                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
845                                                  iterator->_offset, &_doc,
846                                                  true);
847                if (_offset <= 0) { // read fail
848                    fetch_next = true; // get next
849                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
850                    free(_doc.key);
851                    free(_doc.meta);
852                    fetch_next = true; // get next
853                } else {
854                    free(_doc.key);
855                    free(_doc.meta);
856                }
857                if (fetch_next) {
858                    hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
859                                     &iterator->_keylen,
860                                     (void*)&iterator->_offset);
861                    btreeblk_end(iterator->handle->bhandle);
862                    iterator->_offset = _endian_decode(iterator->_offset);
863                }
864            }
865        }
866    } else {
867        // fetch prev key
868        hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
869                         &iterator->_keylen, (void*)&iterator->_offset);
870        btreeblk_end(iterator->handle->bhandle);
871        if (hr == HBTRIE_RESULT_SUCCESS) {
872            cmp = _fdb_key_cmp(iterator,
873                               iterator->_key, iterator->_keylen,
874                               seek_key_kv, seek_keylen_kv);
875            if (cmp > 0) {
876                // key[HB+trie] > seek_key .. move backward
877                hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
878                                 &iterator->_keylen, (void*)&iterator->_offset);
879                btreeblk_end(iterator->handle->bhandle);
880            }
881            iterator->_offset = _endian_decode(iterator->_offset);
882
883            while (iterator->opt & FDB_ITR_NO_DELETES &&
884                   hr == HBTRIE_RESULT_SUCCESS        &&
885                   fetch_next) {
886                fetch_next = false;
887                memset(&_doc, 0x0, sizeof(struct docio_object));
888                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
889                                                  iterator->_offset, &_doc,
890                                                  true);
891                if (_offset <= 0) { // read fail
892                    fetch_next = true; // get prev
893                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
894                    free(_doc.key);
895                    free(_doc.meta);
896                    fetch_next = true; // get prev
897                } else {
898                    free(_doc.key);
899                    free(_doc.meta);
900                }
901                if (fetch_next) {
902                    hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
903                                     &iterator->_keylen,
904                                     (void*)&iterator->_offset);
905                    btreeblk_end(iterator->handle->bhandle);
906                    iterator->_offset = _endian_decode(iterator->_offset);
907                }
908            }
909        }
910    }
911
912    if (hr == HBTRIE_RESULT_SUCCESS && // Validate iteration range limits..
913        !next_op) { // only if caller is not seek_to_max/min (handled later)
914        if (iterator->end_key) {
915            cmp = _fdb_key_cmp(iterator, iterator->_key, iterator->_keylen,
916                               iterator->end_key, iterator->end_keylen);
917            if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) ||
918                 cmp > 0) { // greater than end_key OR at skipped MAX_KEY
919                hr = HBTRIE_RESULT_FAIL;
920            }
921        }
922        if (iterator->start_key) {
923            cmp = _fdb_key_cmp(iterator,iterator->_key, iterator->_keylen,
924                               iterator->start_key, iterator->start_keylen);
925            if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) ||
926                 cmp < 0) { // smaller than start_key OR at skipped MIN_KEY
927                hr = HBTRIE_RESULT_FAIL;
928            }
929        }
930    }
931
932    if (iterator->handle->kvs) {
933        fdb_kvs_id_t kv_id;
934        buf2kvid(size_chunk, iterator->_key, &kv_id);
935        if (iterator->handle->kvs->id != kv_id) {
936            // seek is done beyond the KV ID
937            hr = HBTRIE_RESULT_FAIL;
938        }
939    }
940    if (hr == HBTRIE_RESULT_SUCCESS) {
941        iterator->_get_offset = iterator->_offset;
942        iterator->_dhandle = iterator->handle->dhandle;
943    } else {
944        // larger than the largest key or smaller than the smallest key
945        iterator->_get_offset = BLK_NOT_FOUND;
946        iterator->_dhandle = NULL;
947    }
948
949    // HB+trie's iterator should fetch another entry next time
950    iterator->_offset = BLK_NOT_FOUND;
951    iterator->status = FDB_ITR_IDX;
952
953    // retrieve avl-tree
954    query.header = &query_header;
955    query_header.key = seek_key_kv;
956    query_header.keylen = seek_keylen_kv;
957
958    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
959        if (fetch_wal) {
960            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
961                                                           &query);
962            iterator->direction = FDB_ITR_FORWARD;
963        }
964        if (iterator->tree_cursor) {
965            // skip deleted WAL entry
966            do {
967                snap_item = iterator->tree_cursor;
968                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
969                    iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
970                    snap_item->action == WAL_ACT_REMOVE) { // immediate purge
971                    if (iterator->_dhandle) {
972                        cmp = _fdb_key_cmp(iterator,
973                                           snap_item->header->key,
974                                           snap_item->header->keylen,
975                                           iterator->_key, iterator->_keylen);
976                        if (cmp == 0) {
977                            // same doc exists in HB+trie
978                            // move tree cursor
979                            iterator->tree_cursor = wal_itr_next(
980                                                     iterator->wal_itr);
981                            // do not move tree cursor next time
982                            fetch_wal = false;
983                            // fetch next key[HB+trie]
984                            goto fetch_hbtrie;
985                        } else if (cmp > 0) {
986                            break;
987                        }
988                    }
989                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
990                    continue;
991                } else if (iterator->end_key &&
992                           iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
993                    cmp = _fdb_key_cmp(iterator,
994                                       iterator->end_key, iterator->end_keylen,
995                                       snap_item->header->key,
996                                       snap_item->header->keylen);
997                    if (cmp == 0 ||
998                        // WAL cursor is positioned exactly at seeked end key
999                        // but iterator must skip the end key!
1000                        // If hb+trie has an item, use that else return FAIL
1001                        (cmp < 0 &&// WAL key out of range...
1002                        !next_op)) { // Not called from fdb_iterator_seek_to_max
1003                        skip_wal = true;
1004                        iterator->status = FDB_ITR_WAL;
1005                    }
1006                }
1007                break;
1008            } while(iterator->tree_cursor);
1009        }
1010        iterator->tree_cursor_prev = iterator->tree_cursor;
1011        if (!iterator->tree_cursor) {
1012            // seek_key is larger than the largest key
1013            // set prev key to the largest key.
1014            // if prev operation is called next, tree_cursor will be set to
1015            // tree_cursor_prev.
1016            iterator->tree_cursor_prev = wal_itr_search_smaller(iterator->wal_itr,
1017                                                                &query);
1018        }
1019    } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1020        if (fetch_wal) {
1021            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1022                                                           &query);
1023            iterator->direction = FDB_ITR_REVERSE;
1024        }
1025        if (iterator->tree_cursor) {
1026            // skip deleted WAL entry
1027            do {
1028                snap_item = iterator->tree_cursor;
1029                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
1030                     iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
1031                     snap_item->action == WAL_ACT_REMOVE) { //immediate purge
1032                    if (iterator->_dhandle) {
1033                        cmp = _fdb_key_cmp(iterator,
1034                                           snap_item->header->key,
1035                                           snap_item->header->keylen,
1036                                           iterator->_key, iterator->_keylen);
1037                        if (cmp == 0) {
1038                            // same doc exists in HB+trie
1039                            // move tree cursor
1040                            iterator->tree_cursor = wal_itr_prev(iterator->
1041                                                                 wal_itr);
1042                            // do not move tree cursor next time
1043                            fetch_wal = false;
1044                            // fetch next key[HB+trie]
1045                            goto fetch_hbtrie;
1046                        } else if (cmp < 0) {
1047                            break;
1048                        }
1049                    }
1050                    iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1051                    continue;
1052                } else if (iterator->start_key &&
1053                           iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
1054                    cmp = _fdb_key_cmp(iterator,
1055                                  snap_item->header->key,
1056                                  snap_item->header->keylen,
1057                                  iterator->start_key, iterator->start_keylen);
1058                    if (cmp == 0 ||
1059                        // WAL cursor is positioned exactly at seeked start key
1060                        // but iterator must skip the start key!
1061                        // If hb+trie has an item, use that else return FAIL
1062                        (cmp < 0 && // WAL key out of range and
1063                        !next_op)) { // Not called from fdb_iterator_seek_to_min
1064                        skip_wal = true;
1065                        iterator->status = FDB_ITR_WAL;
1066                    }
1067                }
1068                break;
1069            } while(iterator->tree_cursor);
1070        }
1071        iterator->tree_cursor_prev = iterator->tree_cursor;
1072        if (!iterator->tree_cursor) {
1073            // seek_key is smaller than the smallest key
1074            // Only allow fdb_iterator_next() call, fdb_iterator_prev() should
1075            // hit failure. To ensure this set the direction to as if
1076            // fdb_iterator_prev call has gone past the smallest key...
1077            iterator->tree_cursor_prev = wal_itr_search_greater(iterator->wal_itr,
1078                                                                &query);
1079            // since the current key[WAL] is larger than seek_key,
1080            // skip key[WAL] this time
1081            skip_wal = true;
1082        }
1083    }
1084
1085    if (iterator->tree_cursor && !skip_wal) {
1086        bool take_wal = false;
1087        bool discard_hbtrie = false;
1088
1089        snap_item = iterator->tree_cursor;
1090
1091        if (hr == HBTRIE_RESULT_SUCCESS) {
1092            cmp = _fdb_key_cmp(iterator,
1093                               snap_item->header->key, snap_item->header->keylen,
1094                               iterator->_key, iterator->_keylen);
1095
1096            if (cmp == 0) {
1097                // same key exists in both HB+trie and WAL
1098                fdb_assert(snap_item->action != WAL_ACT_REMOVE,
1099                           snap_item->action, iterator->_offset);
1100                take_wal = true;
1101                discard_hbtrie = true;
1102            } else if (cmp < 0) { // key[WAL] < key[HB+trie]
1103                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1104                    // higher mode .. take smaller one (key[WAL]) first
1105                    take_wal = true;
1106                    discard_hbtrie = false;
1107                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1108                    take_wal = false;
1109                    discard_hbtrie = false;
1110                    // In seek_to_max call with skip_max_key option,
1111                    if (next_op < 0) {
1112                        // if key[HB+trie] is the largest key
1113                        // smaller than max key,
1114                        // do not call prev() next.
1115                        if (iterator->end_key) {
1116                            cmp2 = _fdb_key_cmp(iterator,
1117                                                iterator->_key,
1118                                                iterator->_keylen,
1119                                                iterator->end_key,
1120                                                iterator->end_keylen);
1121                        } else {
1122                            cmp2 = -1;
1123                        }
1124                        if (cmp2 < 0) {
1125                            next_op = 0;
1126                        }
1127                    }
1128                }
1129            } else { // key[HB+trie] < key[WAL]
1130                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1131                    // higher mode .. take smaller one (key[HB+trie]) first
1132                    take_wal = false;
1133                    discard_hbtrie = false;
1134                    // In seek_to_min call with skip_min_key option,
1135                    if (next_op > 0) {
1136                        // if key[HB+trie] is the smallest key
1137                        // larger than min key,
1138                        // do not call next() next.
1139                        if (iterator->start_key) {
1140                            cmp2 = _fdb_key_cmp(iterator,
1141                                                iterator->start_key,
1142                                                iterator->start_keylen,
1143                                                iterator->_key,
1144                                                iterator->_keylen);
1145                        } else {
1146                            cmp2 = -1;
1147                        }
1148                        if (cmp2 < 0) {
1149                            next_op = 0;
1150                        }
1151                    }
1152                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1153                    // lower mode .. discard smaller one (key[HB+trie])
1154                    take_wal = true;
1155                    discard_hbtrie = true;
1156                    // reset HB+trie's iterator to get the current
1157                    // key[HB+trie] one more time
1158                    hbtrie_iterator_free(iterator->hbtrie_iterator);
1159                    hbtrie_iterator_init(iterator->handle->trie,
1160                                         iterator->hbtrie_iterator,
1161                                         seek_key_kv, seek_keylen_kv);
1162                    iterator->_offset = BLK_NOT_FOUND;
1163                }
1164            }
1165        } else {
1166            // HB+trie seek fail (key[HB+trie] doesn't exist)
1167            take_wal = true;
1168            discard_hbtrie = true;
1169        }
1170
1171        if (take_wal) { // take key[WAL]
1172            if (!discard_hbtrie) { // do not skip the current key[HB+trie]
1173                // key[HB+trie] will be returned next time
1174                iterator->_offset = iterator->_get_offset;
1175            }
1176            iterator->_get_offset = snap_item->offset;
1177            iterator->_dhandle = iterator->handle->dhandle;
1178            iterator->status = FDB_ITR_WAL;
1179        }
1180    }
1181
1182    if (!iterator->_dhandle) {
1183        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1184        return FDB_RESULT_ITERATOR_FAIL;
1185    }
1186
1187    if (next_op < 0) {
1188        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1189        ret = fdb_iterator_prev(iterator);
1190    } else if (next_op > 0) {
1191        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1192        ret = fdb_iterator_next(iterator);
1193    } else {
1194        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1195        ret = FDB_RESULT_SUCCESS;
1196    }
1197
1198    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK);
1199    return ret;
1200}
1201
1202LIBFDB_API
1203fdb_status fdb_iterator_seek_to_min(fdb_iterator *iterator)
1204{
1205    if (!iterator || !iterator->_key) {
1206        return FDB_RESULT_INVALID_ARGS;
1207    }
1208
1209    size_t size_chunk = iterator->handle->config.chunksize;
1210    fdb_status ret;
1211    LATENCY_STAT_START();
1212
1213    // Initialize direction iteration to FORWARD just in case this function was
1214    // called right after fdb_iterator_init() so the cursor gets positioned
1215    // correctly
1216    iterator->direction = FDB_ITR_FORWARD;
1217    if (iterator->start_keylen > size_chunk) {
1218        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MIN_KEY) ?
1219                                      FDB_ITR_SEEK_HIGHER : FDB_ITR_SEEK_LOWER;
1220        fdb_status status = fdb_iterator_seek(iterator,
1221                (uint8_t *)iterator->start_key + size_chunk,
1222                iterator->start_keylen - size_chunk, dir);
1223        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_LOWER) {
1224            dir = FDB_ITR_SEEK_HIGHER;
1225            // It is possible that the min key specified during init does not
1226            // exist, so retry the seek with the HIGHER key
1227            return fdb_iterator_seek(iterator,
1228                (uint8_t *)iterator->start_key + size_chunk,
1229                iterator->start_keylen - size_chunk, dir);
1230        }
1231        return status;
1232    }
1233
1234    // reset HB+trie iterator using start key
1235    hbtrie_iterator_free(iterator->hbtrie_iterator);
1236    hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1237                         iterator->start_key, iterator->start_keylen);
1238
1239    // reset WAL tree cursor using search because of the sharded nature of WAL
1240    if (iterator->tree_cursor_start) {
1241        iterator->tree_cursor_prev = iterator->tree_cursor =
1242                                     wal_itr_search_greater(iterator->wal_itr,
1243                                     iterator->tree_cursor_start);
1244        iterator->status = FDB_ITR_IDX; // WAL is already set
1245    }
1246
1247    ret = fdb_iterator_next(iterator);
1248    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MIN);
1249    return ret;
1250}
1251
1252fdb_status _fdb_iterator_seek_to_max_key(fdb_iterator *iterator) {
1253    int cmp;
1254    size_t size_chunk = iterator->handle->config.chunksize;
1255    if (!iterator || !iterator->_key) {
1256        return FDB_RESULT_INVALID_ARGS;
1257    }
1258
1259    // Initialize direction iteration to FORWARD just in case this function was
1260    // called right after fdb_iterator_init() so the cursor gets positioned
1261    // correctly
1262    iterator->direction = FDB_ITR_FORWARD;
1263    if (iterator->end_keylen > size_chunk) {
1264        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MAX_KEY) ?
1265                                      FDB_ITR_SEEK_LOWER : FDB_ITR_SEEK_HIGHER;
1266        fdb_status status = fdb_iterator_seek(iterator,
1267                (uint8_t *)iterator->end_key + size_chunk,
1268                iterator->end_keylen - size_chunk, dir);
1269
1270        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_HIGHER) {
1271            dir = FDB_ITR_SEEK_LOWER;
1272            // It is possible that the max key specified during init does not
1273            // exist, so retry the seek with the LOWER key
1274            return fdb_iterator_seek(iterator,
1275                    (uint8_t *)iterator->end_key + size_chunk,
1276                    iterator->end_keylen - size_chunk, dir);
1277        }
1278        return status;
1279    }
1280    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1281
1282    if (iterator->end_key && iterator->end_keylen == size_chunk) {
1283        // end_key exists but end_keylen == size_id
1284        // it means that user doesn't assign end_key but
1285        // end_key is automatically assigned due to multi KVS mode.
1286
1287        // reset HB+trie's iterator using end_key
1288        hbtrie_iterator_free(iterator->hbtrie_iterator);
1289        hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1290                             iterator->end_key, iterator->end_keylen);
1291        // get first key
1292        hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
1293                         &iterator->_keylen, (void*)&iterator->_offset);
1294        iterator->_offset = _endian_decode(iterator->_offset);
1295        cmp = _fdb_key_cmp(iterator,
1296                               iterator->end_key, iterator->end_keylen,
1297                               iterator->_key, iterator->_keylen);
1298        if (cmp < 0) {
1299            // returned key is larger than the end key .. skip
1300            iterator->_offset = BLK_NOT_FOUND;
1301        }
1302    } else {
1303        // move HB+trie iterator's cursor to the last entry
1304        hbtrie_last(iterator->hbtrie_iterator);
1305    }
1306
1307    // also move WAL tree's cursor to the last entry
1308    struct wal_item_header hdr;
1309    struct wal_item query;
1310    query.header = &hdr;
1311    hdr.key = iterator->end_key;
1312    hdr.keylen = iterator->end_keylen;
1313    iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1314                                                   &query);
1315    iterator->tree_cursor_prev = iterator->tree_cursor;
1316    iterator->status = FDB_ITR_IDX;
1317
1318    return fdb_iterator_prev(iterator);
1319}
1320
1321fdb_status _fdb_iterator_seek_to_max_seq(fdb_iterator *iterator) {
1322    if (!iterator) {
1323        return FDB_RESULT_INVALID_ARGS;
1324    }
1325
1326    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1327    iterator->_seqnum = iterator->end_seqnum;
1328
1329    if (iterator->handle->kvs) {
1330        // create an iterator handle for hb-trie
1331        uint8_t *end_seq_kv = alca(uint8_t, sizeof(size_t)*2);
1332        fdb_kvs_id_t _kv_id = _endian_encode(iterator->handle->kvs->id);
1333        memcpy(end_seq_kv, &_kv_id, sizeof(size_t));
1334        memcpy(end_seq_kv + sizeof(size_t), &iterator->end_seqnum,
1335                sizeof(size_t));
1336
1337        // reset HB+trie's seqtrie iterator using end_seq_kv
1338        hbtrie_iterator_free(iterator->seqtrie_iterator);
1339        hbtrie_iterator_init(iterator->handle->seqtrie,
1340                             iterator->seqtrie_iterator,
1341                             end_seq_kv, sizeof(size_t)*2);
1342    } else {
1343        // reset Btree iterator to end_seqnum
1344        btree_iterator_free(iterator->seqtree_iterator);
1345        // create an iterator handle for b-tree
1346        btree_iterator_init(iterator->handle->seqtree,
1347                            iterator->seqtree_iterator,
1348                            (void *)(&iterator->end_seqnum));
1349    }
1350
1351    if (iterator->end_seqnum != SEQNUM_NOT_USED) {
1352        struct wal_item query;
1353        struct wal_item_header query_key;
1354        size_t size_seq = sizeof(fdb_seqnum_t);
1355        size_t size_chunk = iterator->handle->config.chunksize;
1356        uint8_t *end_seq_kv = alca(uint8_t, size_chunk + size_seq);
1357        if (iterator->handle->kvs) {
1358            query_key.key = end_seq_kv;
1359            kvid2buf(size_chunk, iterator->handle->kvs->id, end_seq_kv);
1360            memcpy(end_seq_kv + size_chunk, &iterator->end_seqnum, size_seq);
1361            query_key.keylen = size_chunk + size_seq;
1362        } else {
1363            query_key.key = (void *) NULL;
1364            query_key.keylen = 0;
1365        }
1366        query.header = &query_key;
1367        query.seqnum = iterator->end_seqnum;
1368
1369        // reset WAL tree cursor using search because of the sharded WAL
1370        iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1371                                                       &query);
1372    } else { // no end_seqnum specified, just head to the last entry
1373        iterator->tree_cursor = wal_itr_last(iterator->wal_itr);
1374    }
1375
1376    if (iterator->tree_cursor) {
1377        struct wal_item *snap_item = iterator->tree_cursor;
1378        if (snap_item->seqnum == iterator->end_seqnum &&
1379            iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
1380            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1381        }
1382    }
1383
1384    if (iterator->tree_cursor) {
1385        // If WAL tree has an entry, skip Main index for reverse iteration..
1386        iterator->_offset = iterator->tree_cursor->offset;
1387    } else {
1388        iterator->_offset = BLK_NOT_FOUND; // fetch from main index
1389    }
1390
1391    iterator->tree_cursor_prev = iterator->tree_cursor;
1392    return fdb_iterator_prev(iterator);
1393}
1394
1395LIBFDB_API
1396fdb_status fdb_iterator_seek_to_max(fdb_iterator *iterator)
1397{
1398    if (!iterator) {
1399        return FDB_RESULT_INVALID_ARGS;
1400    }
1401
1402    fdb_status ret;
1403    LATENCY_STAT_START();
1404
1405    if (!iterator->hbtrie_iterator) {
1406        ret = _fdb_iterator_seek_to_max_seq(iterator);
1407    } else {
1408        ret = _fdb_iterator_seek_to_max_key(iterator);
1409    }
1410    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MAX);
1411    return ret;
1412}
1413
1414static fdb_status _fdb_iterator_seq_prev(fdb_iterator *iterator)
1415{
1416    size_t size_id, size_seq, seq_kv_len;
1417    uint8_t *seq_kv;
1418    uint64_t offset = BLK_NOT_FOUND;
1419    btree_result br = BTREE_RESULT_FAIL;
1420    hbtrie_result hr;
1421    struct docio_object _doc;
1422    struct docio_handle *dhandle;
1423    struct wal_item *snap_item = NULL;
1424    fdb_seqnum_t seqnum;
1425    fdb_kvs_id_t kv_id;
1426
1427    size_id = sizeof(fdb_kvs_id_t);
1428    size_seq = sizeof(fdb_seqnum_t);
1429    seq_kv = alca(uint8_t, size_id + size_seq);
1430
1431    if (iterator->direction != FDB_ITR_REVERSE) {
1432        if (iterator->status == FDB_ITR_IDX) {
1433            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1434        }
1435        // re-position WAL key to previous key returned using search because of
1436        // sharded nature of wal (we cannot directly assign prev to cursor)
1437        if (iterator->tree_cursor_prev &&
1438            iterator->tree_cursor != iterator->tree_cursor_prev) {
1439            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1440                                                  iterator->tree_cursor_prev);
1441            iterator->status = FDB_ITR_IDX;
1442        } // else Don't move - seek()/init() has already positioned cursor
1443    }
1444
1445start_seq:
1446    seqnum = iterator->_seqnum;
1447    dhandle = iterator->handle->dhandle;
1448
1449    if (iterator->_offset == BLK_NOT_FOUND || // was iterating over btree
1450        !iterator->tree_cursor) { // WAL exhausted
1451        if (iterator->handle->kvs) { // multi KV instance mode
1452            hr = hbtrie_prev(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1453                             (void *)&offset);
1454            if (hr == HBTRIE_RESULT_SUCCESS) {
1455                br = BTREE_RESULT_SUCCESS;
1456                buf2kvid(size_id, seq_kv, &kv_id);
1457                if (kv_id != iterator->handle->kvs->id) {
1458                    // iterator is beyond the boundary
1459                    br = BTREE_RESULT_FAIL;
1460                }
1461                memcpy(&seqnum, seq_kv + size_id, size_seq);
1462            } else {
1463                br = BTREE_RESULT_FAIL;
1464            }
1465        } else {
1466            br = btree_prev(iterator->seqtree_iterator, &seqnum,
1467                            (void *)&offset);
1468        }
1469        btreeblk_end(iterator->handle->bhandle);
1470        if (br == BTREE_RESULT_SUCCESS) {
1471            seqnum = _endian_decode(seqnum);
1472            iterator->_seqnum = seqnum;
1473            if (seqnum < iterator->start_seqnum) {
1474                return FDB_RESULT_ITERATOR_FAIL;
1475            }
1476            offset = _endian_decode(offset);
1477            iterator->status = FDB_ITR_IDX;
1478        } else {
1479            iterator->_offset = BLK_NOT_FOUND;
1480            // B-tree has no more items
1481            return FDB_RESULT_ITERATOR_FAIL;
1482        }
1483    } else while (iterator->tree_cursor) {
1484        if (iterator->status == FDB_ITR_WAL) {
1485            iterator->tree_cursor_prev = iterator->tree_cursor;
1486            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1487            if (!iterator->tree_cursor) {
1488                goto start_seq;
1489            }
1490        }// else don't move - seek()/ init() has already positioned cursor
1491
1492        iterator->status = FDB_ITR_WAL;
1493        // get the current item of avl tree
1494        snap_item = iterator->tree_cursor;
1495        uint8_t drop_logical_deletes =
1496            (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1497            (iterator->opt & FDB_ITR_NO_DELETES);
1498        if (snap_item->action == WAL_ACT_REMOVE ||
1499                drop_logical_deletes) {
1500            if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1501                return FDB_RESULT_ITERATOR_FAIL;
1502            }
1503            // this key is removed .. get prev key[WAL]
1504            continue;
1505        }
1506
1507        offset = snap_item->offset;
1508        iterator->_offset = offset; // WAL is not exhausted, ignore B-Tree
1509        iterator->_seqnum = snap_item->seqnum;
1510        break;
1511    }
1512
1513    // To prevent returning duplicate items from sequence iterator, only return
1514    // those b-tree items that exist in HB-trie but not WAL
1515    // (WAL items should have already been returned in reverse iteration)
1516    if (br == BTREE_RESULT_SUCCESS) {
1517        fdb_doc doc_kv;
1518        _doc.key = NULL;
1519        _doc.length.keylen = 0;
1520        _doc.meta = NULL;
1521        _doc.body = NULL;
1522
1523        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1524                                                  true);
1525        if (_offset <= 0) {
1526            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1527        }
1528        if (_doc.length.flag & DOCIO_DELETED &&
1529            (iterator->opt & FDB_ITR_NO_DELETES)) {
1530            free(_doc.key);
1531            free(_doc.meta);
1532            return FDB_RESULT_KEY_NOT_FOUND;
1533        }
1534
1535        doc_kv.key = _doc.key;
1536        doc_kv.keylen = _doc.length.keylen;
1537        doc_kv.seqnum = SEQNUM_NOT_USED;
1538        if (wal_find(iterator->handle->shandle->snap_txn,
1539                     iterator->handle->file,
1540                     &iterator->handle->shandle->cmp_info,
1541                     iterator->handle->shandle,
1542                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1543                     iterator->start_seqnum <= doc_kv.seqnum &&
1544                     doc_kv.seqnum <= iterator->end_seqnum) {
1545            free(_doc.key);
1546            free(_doc.meta);
1547            goto start_seq; // B-tree item exists in WAL, skip for now
1548        }
1549        // Also look in HB-Trie to eliminate duplicates
1550        uint64_t hboffset;
1551        struct docio_object _hbdoc;
1552        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1553                         (void *)&hboffset);
1554        btreeblk_end(iterator->handle->bhandle);
1555
1556        if (hr != HBTRIE_RESULT_SUCCESS) {
1557            free(_doc.key);
1558            free(_doc.meta);
1559            goto start_seq;
1560        } else { // If present in HB-trie ensure it's seqnum is in range
1561            int64_t _offset;
1562            _hbdoc.key = _doc.key;
1563            _hbdoc.meta = NULL;
1564            hboffset = _endian_decode(hboffset);
1565            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1566                                              hboffset, &_hbdoc, true);
1567            if (_offset <= 0) {
1568                free(_doc.key);
1569                free(_doc.meta);
1570                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1571            }
1572
1573            if (_doc.seqnum < _hbdoc.seqnum &&
1574                _hbdoc.seqnum <= iterator->end_seqnum) {
1575                free(_doc.key);
1576                free(_doc.meta);
1577                free(_hbdoc.meta);
1578                goto start_seq;
1579            }
1580            free(_hbdoc.meta);
1581        }
1582        free(_doc.key);
1583        free(_doc.meta);
1584    }
1585
1586    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
1587    iterator->_get_offset = offset; // store for fdb_iterator_get()
1588
1589    return FDB_RESULT_SUCCESS;
1590}
1591
1592static fdb_status _fdb_iterator_seq_next(fdb_iterator *iterator)
1593{
1594    size_t size_id, size_seq, seq_kv_len;
1595    uint8_t *seq_kv;
1596    uint64_t offset = BLK_NOT_FOUND;
1597    btree_result br = BTREE_RESULT_FAIL;
1598    hbtrie_result hr;
1599    struct docio_object _doc;
1600    struct docio_handle *dhandle;
1601    struct wal_item *snap_item = NULL;
1602    fdb_seqnum_t seqnum;
1603    fdb_kvs_id_t kv_id;
1604
1605    size_id = sizeof(fdb_kvs_id_t);
1606    size_seq = sizeof(fdb_seqnum_t);
1607    seq_kv = alca(uint8_t, size_id + size_seq);
1608
1609    if (iterator->direction != FDB_ITR_FORWARD) {
1610        if (iterator->status == FDB_ITR_IDX) {
1611            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1612        }
1613        // re-position WAL key to previous key returned
1614        if (iterator->tree_cursor_prev) {
1615            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
1616                                    iterator->tree_cursor_prev);
1617            iterator->status = FDB_ITR_IDX;
1618        } // else Don't move - seek()/init() has already positioned cursor
1619    }
1620
1621start_seq:
1622    seqnum = iterator->_seqnum;
1623    dhandle = iterator->handle->dhandle;
1624
1625    // retrieve from sequence b-tree first
1626    if (iterator->_offset == BLK_NOT_FOUND) {
1627        if (iterator->handle->kvs) { // multi KV instance mode
1628            hr = hbtrie_next(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1629                             (void *)&offset);
1630            if (hr == HBTRIE_RESULT_SUCCESS) {
1631                br = BTREE_RESULT_SUCCESS;
1632                buf2kvid(size_id, seq_kv, &kv_id);
1633                if (kv_id != iterator->handle->kvs->id) {
1634                    // iterator is beyond the boundary
1635                    br = BTREE_RESULT_FAIL;
1636                }
1637                memcpy(&seqnum, seq_kv + size_id, size_seq);
1638            } else {
1639                br = BTREE_RESULT_FAIL;
1640            }
1641        } else {
1642            br = btree_next(iterator->seqtree_iterator, &seqnum,
1643                            (void *)&offset);
1644        }
1645        btreeblk_end(iterator->handle->bhandle);
1646        if (br == BTREE_RESULT_SUCCESS) {
1647            seqnum = _endian_decode(seqnum);
1648            iterator->_seqnum = seqnum;
1649            if (seqnum > iterator->end_seqnum) {
1650                return FDB_RESULT_ITERATOR_FAIL;
1651            }
1652            offset = _endian_decode(offset);
1653            iterator->_offset = BLK_NOT_FOUND; // continue with B-tree
1654            iterator->status = FDB_ITR_IDX;
1655        }
1656    }
1657
1658    if (br == BTREE_RESULT_FAIL) {
1659        if (iterator->tree_cursor == NULL) {
1660            return FDB_RESULT_ITERATOR_FAIL;
1661        } else {
1662            while (iterator->tree_cursor) {
1663                if (iterator->status == FDB_ITR_WAL) {
1664                    // save the current point for direction change
1665                    iterator->tree_cursor_prev = iterator->tree_cursor;
1666                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
1667                    if (!iterator->tree_cursor) {
1668                        return FDB_RESULT_ITERATOR_FAIL;
1669                    }
1670                }// else don't move - seek()/ init() already positioned cursor
1671                // get the current item of WAL tree
1672                iterator->status = FDB_ITR_WAL;
1673                snap_item = iterator->tree_cursor;
1674                uint8_t drop_logical_deletes =
1675                    (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1676                    (iterator->opt & FDB_ITR_NO_DELETES);
1677                if (snap_item->action == WAL_ACT_REMOVE ||
1678                    drop_logical_deletes) {
1679                    if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1680                        return FDB_RESULT_ITERATOR_FAIL;
1681                    }
1682                    // this key is removed .. get next key[WAL]
1683                    continue;
1684                }
1685                if (snap_item->seqnum < iterator->_seqnum) {
1686                    // smaller than the current seqnum .. get next key[WAL]
1687                    continue;
1688                }
1689                if (snap_item->seqnum > iterator->end_seqnum) {
1690                    // out-of-range .. iterator terminates
1691                    return FDB_RESULT_ITERATOR_FAIL;
1692                }
1693
1694                offset = snap_item->offset;
1695                iterator->_offset = offset; // stops b-tree lookups. favor wal
1696                iterator->_seqnum = snap_item->seqnum;
1697                break;
1698            }
1699        }
1700    }
1701
1702    // To prevent returning duplicate items from sequence iterator, only return
1703    // those b-tree items that exist in HB-trie but not WAL (visit WAL later)
1704    if (br == BTREE_RESULT_SUCCESS) {
1705        _doc.key = NULL;
1706        _doc.length.keylen = 0;
1707        _doc.meta = NULL;
1708        _doc.body = NULL;
1709
1710        fdb_doc doc_kv;
1711        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1712                                                  true);
1713        if (_offset <= 0) {
1714            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1715        }
1716        if (_doc.length.flag & DOCIO_DELETED && (iterator->opt & FDB_ITR_NO_DELETES)) {
1717            free(_doc.key);
1718            free(_doc.meta);
1719            return FDB_RESULT_KEY_NOT_FOUND;
1720        }
1721        doc_kv.key = _doc.key;
1722        doc_kv.keylen = _doc.length.keylen;
1723        doc_kv.seqnum = SEQNUM_NOT_USED; // search by key not seqnum
1724        if (wal_find(iterator->handle->shandle->snap_txn,
1725                    iterator->handle->file,
1726                    &iterator->handle->shandle->cmp_info,
1727                    iterator->handle->shandle,
1728                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1729                iterator->start_seqnum <= doc_kv.seqnum &&
1730                doc_kv.seqnum <= iterator->end_seqnum) {
1731            free(_doc.key);
1732            free(_doc.meta);
1733            goto start_seq; // B-tree item exists in WAL, skip for now
1734        }
1735        // Also look in HB-Trie to eliminate duplicates
1736        uint64_t hboffset;
1737        struct docio_object _hbdoc;
1738        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1739                         (void *)&hboffset);
1740        btreeblk_end(iterator->handle->bhandle);
1741
1742        if (hr != HBTRIE_RESULT_SUCCESS) {
1743            free(_doc.key);
1744            free(_doc.meta);
1745            goto start_seq;
1746        } else { // If present in HB-trie ensure it's seqnum is in range
1747            int64_t _offset;
1748            _hbdoc.key = _doc.key;
1749            _hbdoc.meta = NULL;
1750            hboffset = _endian_decode(hboffset);
1751            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1752                                              hboffset, &_hbdoc,
1753                                              true);
1754            if (_offset <= 0) {
1755                free(_doc.key);
1756                free(_doc.meta);
1757                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1758            }
1759            if (_doc.seqnum < _hbdoc.seqnum &&
1760                _hbdoc.seqnum <= iterator->end_seqnum) {
1761                free(_doc.key);
1762                free(_doc.meta);
1763                free(_hbdoc.meta);
1764                goto start_seq;
1765            }
1766            free(_hbdoc.meta);
1767        }
1768        free(_doc.key);
1769        free(_doc.meta);
1770    }
1771
1772    iterator->_dhandle = dhandle; // store for fdb_iterator_get
1773    iterator->_get_offset = offset; // store for fdb_iterator_get
1774
1775    return FDB_RESULT_SUCCESS;
1776}
1777
1778LIBFDB_API
1779fdb_status fdb_iterator_prev(fdb_iterator *iterator)
1780{
1781    if (!iterator) {
1782        return FDB_RESULT_INVALID_ARGS;
1783    }
1784
1785    fdb_status result = FDB_RESULT_SUCCESS;
1786    LATENCY_STAT_START();
1787
1788    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1789        return FDB_RESULT_HANDLE_BUSY;
1790    }
1791
1792    if (iterator->hbtrie_iterator) {
1793        while ((result = _fdb_iterator_prev(iterator)) ==
1794                FDB_RESULT_KEY_NOT_FOUND);
1795    } else {
1796        while ((result = _fdb_iterator_seq_prev(iterator)) ==
1797                FDB_RESULT_KEY_NOT_FOUND);
1798    }
1799    if (result == FDB_RESULT_SUCCESS) {
1800        iterator->direction = FDB_ITR_REVERSE;
1801    } else {
1802        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1803        if (iterator->direction != FDB_ITR_DIR_NONE) {
1804            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1805                    iterator->status == FDB_ITR_IDX) {
1806                iterator->_offset = BLK_NOT_FOUND;
1807            }
1808        }
1809    }
1810
1811    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1812    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1813                         std::memory_order_relaxed);
1814    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_PREV);
1815    return result;
1816}
1817
1818LIBFDB_API
1819fdb_status fdb_iterator_next(fdb_iterator *iterator)
1820{
1821    if (!iterator) {
1822        return FDB_RESULT_INVALID_ARGS;
1823    }
1824
1825    fdb_status result = FDB_RESULT_SUCCESS;
1826    LATENCY_STAT_START();
1827
1828    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1829        return FDB_RESULT_HANDLE_BUSY;
1830    }
1831
1832    if (iterator->hbtrie_iterator) {
1833        while ((result = _fdb_iterator_next(iterator)) ==
1834                FDB_RESULT_KEY_NOT_FOUND);
1835    } else {
1836        while ((result = _fdb_iterator_seq_next(iterator)) ==
1837                FDB_RESULT_KEY_NOT_FOUND);
1838    }
1839    if (result == FDB_RESULT_SUCCESS) {
1840        iterator->direction = FDB_ITR_FORWARD;
1841    } else {
1842        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1843        if (iterator->direction != FDB_ITR_DIR_NONE) {
1844            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1845                    iterator->status == FDB_ITR_IDX) {
1846                iterator->_offset = BLK_NOT_FOUND;
1847            }
1848        }
1849    }
1850
1851    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1852    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1853                         std::memory_order_relaxed);
1854    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_NEXT);
1855    return result;
1856}
1857
1858// DOC returned by this function must be freed by fdb_doc_free
1859// if it was allocated because the incoming doc was pointing to NULL
1860LIBFDB_API
1861fdb_status fdb_iterator_get(fdb_iterator *iterator, fdb_doc **doc)
1862{
1863    if (!iterator || !doc) {
1864        return FDB_RESULT_INVALID_ARGS;
1865    }
1866
1867    struct docio_object _doc;
1868    fdb_status ret = FDB_RESULT_SUCCESS;
1869    uint64_t offset;
1870    struct docio_handle *dhandle;
1871    size_t size_chunk = iterator->handle->config.chunksize;
1872    bool alloced_key, alloced_meta, alloced_body;
1873    LATENCY_STAT_START();
1874
1875    dhandle = iterator->_dhandle;
1876    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
1877        return FDB_RESULT_ITERATOR_FAIL;
1878    }
1879
1880    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1881        return FDB_RESULT_HANDLE_BUSY;
1882    }
1883
1884    offset = iterator->_get_offset;
1885
1886    if (*doc == NULL) {
1887        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
1888        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
1889            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1890            return ret;
1891        } // LCOV_EXCL_STOP
1892        _doc.key = NULL;
1893        _doc.length.keylen = 0;
1894        _doc.meta = NULL;
1895        _doc.body = NULL;
1896        alloced_key = true;
1897        alloced_meta = true;
1898        alloced_body = true;
1899    } else {
1900        _doc.key = (*doc)->key;
1901        _doc.meta = (*doc)->meta;
1902        _doc.body = (*doc)->body;
1903        alloced_key = _doc.key ? false : true;
1904        alloced_meta = _doc.meta ? false : true;
1905        alloced_body = _doc.body ? false : true;
1906    }
1907
1908    int64_t _offset = docio_read_doc(dhandle, offset, &_doc, true);
1909    if (_offset <= 0) {
1910        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1911        return _offset < 0 ? (fdb_status) _offset : FDB_RESULT_KEY_NOT_FOUND;
1912    }
1913    if (_doc.length.flag & DOCIO_DELETED &&
1914        (iterator->opt & FDB_ITR_NO_DELETES)) {
1915        if (alloced_key) {
1916            free(_doc.key);
1917        }
1918        if (alloced_meta) {
1919            free(_doc.meta);
1920        }
1921        if (alloced_body) {
1922            free(_doc.body);
1923        }
1924        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1925        return FDB_RESULT_KEY_NOT_FOUND;
1926    }
1927
1928    if (iterator->handle->kvs) {
1929        // eliminate KV ID from key
1930        _doc.length.keylen -= size_chunk;
1931        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
1932    }
1933
1934    if (alloced_key) {
1935        (*doc)->key = _doc.key;
1936    }
1937    if (alloced_meta) {
1938        (*doc)->meta = _doc.meta;
1939    }
1940    if (alloced_body) {
1941        (*doc)->body = _doc.body;
1942    }
1943    (*doc)->keylen = _doc.length.keylen;
1944    (*doc)->metalen = _doc.length.metalen;
1945    (*doc)->bodylen = _doc.length.bodylen;
1946    (*doc)->seqnum = _doc.seqnum;
1947    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
1948    (*doc)->offset = offset;
1949
1950    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1951    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
1952                         std::memory_order_relaxed);
1953    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET);
1954    return ret;
1955}
1956
1957// DOC returned by this function must be freed using 'fdb_doc_free'
1958LIBFDB_API
1959fdb_status fdb_iterator_get_metaonly(fdb_iterator *iterator, fdb_doc **doc)
1960{
1961    if (!iterator || !doc) {
1962        return FDB_RESULT_INVALID_ARGS;
1963    }
1964
1965    struct docio_object _doc;
1966    fdb_status ret = FDB_RESULT_SUCCESS;
1967    uint64_t offset;
1968    int64_t _offset;
1969    struct docio_handle *dhandle;
1970    size_t size_chunk = iterator->handle->config.chunksize;
1971    bool alloced_key, alloced_meta;
1972    LATENCY_STAT_START();
1973
1974    dhandle = iterator->_dhandle;
1975    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
1976        return FDB_RESULT_ITERATOR_FAIL;
1977    }
1978
1979    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1980        return FDB_RESULT_HANDLE_BUSY;
1981    }
1982
1983    offset = iterator->_get_offset;
1984
1985    if (*doc == NULL) {
1986        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
1987        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
1988            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1989            return ret;
1990        } // LCOV_EXCL_STOP
1991        _doc.key = NULL;
1992        _doc.length.keylen = 0;
1993        _doc.meta = NULL;
1994        _doc.body = NULL;
1995        alloced_key = true;
1996        alloced_meta = true;
1997    } else {
1998        _doc.key = (*doc)->key;
1999        _doc.meta = (*doc)->meta;
2000        _doc.body = NULL;
2001        alloced_key = _doc.key ? false : true;
2002        alloced_meta = _doc.meta ? false : true;
2003    }
2004
2005    _offset = docio_read_doc_key_meta(dhandle, offset, &_doc, true);
2006    if (_offset <= 0) {
2007        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2008        return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
2009    }
2010    if (_doc.length.flag & DOCIO_DELETED &&
2011            (iterator->opt & FDB_ITR_NO_DELETES)) {
2012        if (alloced_key) {
2013            free(_doc.key);
2014        }
2015        if (alloced_meta) {
2016            free(_doc.meta);
2017        }
2018        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2019        return FDB_RESULT_KEY_NOT_FOUND;
2020    }
2021
2022    if (iterator->handle->kvs) {
2023        // eliminate KV ID from key
2024        _doc.length.keylen -= size_chunk;
2025        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
2026    }
2027    if (alloced_key) {
2028        (*doc)->key = _doc.key;
2029    }
2030    if (alloced_meta) {
2031        (*doc)->meta = _doc.meta;
2032    }
2033    (*doc)->keylen = _doc.length.keylen;
2034    (*doc)->metalen = _doc.length.metalen;
2035    (*doc)->bodylen = _doc.length.bodylen;
2036    (*doc)->seqnum = _doc.seqnum;
2037    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
2038    (*doc)->offset = offset;
2039
2040    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2041    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
2042                         std::memory_order_relaxed);
2043    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET_META);
2044    return ret;
2045}
2046
2047LIBFDB_API
2048fdb_status fdb_iterator_close(fdb_iterator *iterator)
2049{
2050    if (!iterator) {
2051        return FDB_RESULT_INVALID_ARGS;
2052    }
2053
2054    LATENCY_STAT_START();
2055    if (iterator->hbtrie_iterator) {
2056        hbtrie_iterator_free(iterator->hbtrie_iterator);
2057        free(iterator->hbtrie_iterator);
2058    }
2059
2060    if (iterator->seqtree_iterator) {
2061        btree_iterator_free(iterator->seqtree_iterator);
2062        free(iterator->seqtree_iterator);
2063    }
2064    if (iterator->seqtrie_iterator) {
2065        hbtrie_iterator_free(iterator->seqtrie_iterator);
2066        free(iterator->seqtrie_iterator);
2067    }
2068
2069    if (iterator->start_key) {
2070        free(iterator->start_key);
2071    }
2072
2073    if (iterator->end_key) {
2074        free(iterator->end_key);
2075    }
2076
2077    --iterator->handle->num_iterators; // Decrement the iterator counter of the KV handle
2078    wal_itr_close(iterator->wal_itr);
2079
2080    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_CLOSE);
2081
2082    if (!iterator->snapshot_handle) {
2083        // Close the opened handle in the iterator,
2084        // if the handle is not for snapshot.
2085        fdb_status fs = fdb_kvs_close(iterator->handle);
2086        if (fs != FDB_RESULT_SUCCESS) {
2087            fdb_log(&iterator->handle->log_callback, fs,
2088                    "Failed to close the KV Store from a database file '%s' as "
2089                    "part of closing the iterator",
2090                    iterator->handle->file->filename);
2091        }
2092    }
2093
2094    free(iterator->_key);
2095    free(iterator);
2096    return FDB_RESULT_SUCCESS;
2097}
2098