xref: /6.0.3/forestdb/src/iterator.cc (revision 56236603)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22
23#include "libforestdb/forestdb.h"
24#include "fdb_internal.h"
25#include "hbtrie.h"
26#include "docio.h"
27#include "btreeblock.h"
28#include "common.h"
29#include "wal.h"
30#include "avltree.h"
31#include "list.h"
32#include "internal_types.h"
33#include "btree_var_kv_ops.h"
34#include "timing.h"
35
36#include "memleak.h"
37
38#ifdef __DEBUG
39#ifndef __DEBUG_FDB
40    #undef DBG
41    #undef DBGCMD
42    #undef DBGSW
43    #define DBG(...)
44    #define DBGCMD(...)
45    #define DBGSW(n, ...)
46#endif
47#endif
48
// Lexicographically compares two variable-length binary streams.
//
// The bytes shared by both keys (the first min(keylen1, keylen2) bytes) are
// compared with memcmp(); if they are identical, the shorter key sorts first.
//
// Returns a negative value if key1 < key2, zero if both keys are identical,
// and a positive value if key1 > key2. Only the sign is meaningful.
static int _fdb_keycmp(void *key1, size_t keylen1, void *key2, size_t keylen2)
{
    size_t common_len = (keylen1 < keylen2) ? keylen1 : keylen2;

    // Skip memcmp() for an empty common prefix: the buffers may be NULL when
    // the corresponding length is zero.
    if (common_len > 0) {
        int cmp = memcmp(key1, key2, common_len);
        if (cmp != 0) {
            return cmp;
        }
    }

    // Common prefix is identical, so order by length. The lengths are
    // compared directly rather than subtracted after a cast to int, which
    // could truncate (and flip the sign) for very large size_t values.
    return (keylen1 > keylen2) - (keylen1 < keylen2);
}
63
64static int _fdb_key_cmp(fdb_iterator *iterator, void *key1, size_t keylen1,
65                        void *key2, size_t keylen2) {
66    int cmp;
67    if (iterator->handle->kvs_config.custom_cmp) {
68        // custom compare function for variable length key
69        if (iterator->handle->kvs) {
70            // multi KV instance mode
71            // KV ID should be compared separately
72            size_t size_chunk = iterator->handle->config.chunksize;
73            fdb_kvs_id_t a_id, b_id;
74            buf2kvid(size_chunk, key1, &a_id);
75            buf2kvid(size_chunk, key2, &b_id);
76
77            if (a_id < b_id) {
78                cmp = -1;
79            } else if (a_id > b_id) {
80                cmp = 1;
81            } else {
82                if (keylen1 == size_chunk) { // key1 < key2
83                    return -1;
84                } else if (keylen2 == size_chunk) { // key1 > key2
85                    return 1;
86                }
87                cmp = iterator->handle->kvs_config.custom_cmp(
88                          (uint8_t*)key1 + size_chunk, keylen1 - size_chunk,
89                          (uint8_t*)key2 + size_chunk, keylen2 - size_chunk);
90            }
91        } else {
92            cmp = iterator->handle->kvs_config.custom_cmp(key1, keylen1,
93                                                       key2, keylen2);
94        }
95    } else {
96        cmp = _fdb_keycmp(key1, keylen1, key2, keylen2);
97    }
98    return cmp;
99}
100
101LIBFDB_API
102fdb_status fdb_iterator_init(fdb_kvs_handle *handle,
103                             fdb_iterator **ptr_iterator,
104                             const void *start_key,
105                             size_t start_keylen,
106                             const void *end_key,
107                             size_t end_keylen,
108                             fdb_iterator_opt_t opt)
109{
110    if (!handle) {
111        return FDB_RESULT_INVALID_HANDLE;
112    }
113
114    if (start_keylen > FDB_MAX_KEYLEN ||
115        (handle->kvs_config.custom_cmp &&
116           (start_keylen > handle->config.blocksize - HBTRIE_HEADROOM ||
117            end_keylen > handle->config.blocksize - HBTRIE_HEADROOM)) ||
118        end_keylen > FDB_MAX_KEYLEN) {
119        return FDB_RESULT_INVALID_ARGS;
120    }
121
122    if ((opt & FDB_ITR_SKIP_MIN_KEY && (!start_key || !start_keylen)) ||
123        (opt & FDB_ITR_SKIP_MAX_KEY && (!end_key || !end_keylen))) {
124        return FDB_RESULT_INVALID_ARGS;
125    }
126
127    hbtrie_result hr;
128    fdb_status fs;
129    LATENCY_STAT_START();
130
131    if (!handle->shandle) {
132        // If compaction is already done before this line,
133        // handle->file needs to be replaced with handle->new_file.
134        fdb_check_file_reopen(handle, NULL);
135        fdb_sync_db_header(handle);
136    }
137
138    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
139
140    if (!handle->shandle) {
141        // snapshot handle doesn't exist
142        // open a new handle to make the iterator handle as a snapshot
143        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
144        if (fs != FDB_RESULT_SUCCESS) {
145            fdb_log(&handle->log_callback, fs,
146                    "Failed to create an iterator instance due to the failure of "
147                    "open operation on the KV Store '%s' in a database file '%s'",
148                    _fdb_kvs_get_name(handle, handle->file),
149                    handle->file->filename);
150            return fs;
151        }
152        iterator->snapshot_handle = false;
153    } else {
154        // Snapshot handle exists
155        // We don't need to open a new handle.. just point to the snapshot handle.
156        iterator->handle = handle;
157        iterator->snapshot_handle = true;
158    }
159    iterator->opt = opt;
160
161    iterator->_key = (void*)malloc(FDB_MAX_KEYLEN_INTERNAL);
162    // set to zero the first <chunksize> bytes
163    memset(iterator->_key, 0x0, iterator->handle->config.chunksize);
164    iterator->_keylen = 0;
165    iterator->_offset = BLK_NOT_FOUND;
166    iterator->hbtrie_iterator = NULL;
167    iterator->seqtree_iterator = NULL;
168    iterator->seqtrie_iterator = NULL;
169
170    if (iterator->handle->kvs) {
171        // multi KV instance mode .. prepend KV ID
172        size_t size_chunk = handle->config.chunksize;
173        uint8_t *start_key_temp, *end_key_temp;
174
175        if (start_key == NULL) {
176            start_key_temp = alca(uint8_t, size_chunk);
177            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
178            start_key = start_key_temp;
179            start_keylen = size_chunk;
180        } else {
181            start_key_temp = alca(uint8_t, size_chunk + start_keylen);
182            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
183            memcpy(start_key_temp + size_chunk, start_key, start_keylen);
184            start_key = start_key_temp;
185            start_keylen += size_chunk;
186        }
187
188        if (end_key == NULL) {
189            // set end_key as NULL key of the next KV ID.
190            // NULL key doesn't actually exist so that the iterator ends
191            // at the last key of the current KV ID.
192            end_key_temp = alca(uint8_t, size_chunk);
193            kvid2buf(size_chunk, iterator->handle->kvs->id+1, end_key_temp);
194            end_key = end_key_temp;
195            end_keylen = size_chunk;
196        } else {
197            end_key_temp = alca(uint8_t, size_chunk + end_keylen);
198            kvid2buf(size_chunk, iterator->handle->kvs->id, end_key_temp);
199            memcpy(end_key_temp + size_chunk, end_key, end_keylen);
200            end_key = end_key_temp;
201            end_keylen += size_chunk;
202        }
203
204        iterator->start_key = (void*)malloc(start_keylen);
205        memcpy(iterator->start_key, start_key, start_keylen);
206        iterator->start_keylen = start_keylen;
207
208        iterator->end_key = (void*)malloc(end_keylen);
209        memcpy(iterator->end_key, end_key, end_keylen);
210        iterator->end_keylen = end_keylen;
211
212    } else { // single KV instance mode
213        if (start_key == NULL) {
214            iterator->start_key = NULL;
215            iterator->start_keylen = 0;
216        } else {
217            iterator->start_key = (void*)malloc(start_keylen);
218            memcpy(iterator->start_key, start_key, start_keylen);
219            iterator->start_keylen = start_keylen;
220        }
221
222        if (end_key == NULL) {
223            iterator->end_key = NULL;
224            end_keylen = 0;
225        }else{
226            iterator->end_key = (void*)malloc(end_keylen);
227            memcpy(iterator->end_key, end_key, end_keylen);
228        }
229        iterator->end_keylen = end_keylen;
230    }
231
232    // create an iterator handle for hb-trie
233    iterator->hbtrie_iterator = (struct hbtrie_iterator *)
234                                malloc(sizeof(struct hbtrie_iterator));
235    hr = hbtrie_iterator_init(iterator->handle->trie,
236                              iterator->hbtrie_iterator,
237                              (void *)start_key, start_keylen);
238    if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
239        _fdb_invalidate_dbheader(iterator->handle);
240        return FDB_RECOVERABLE_ERR;
241    }
242    assert(hr == HBTRIE_RESULT_SUCCESS);
243
244    wal_itr_init(iterator->handle->file, iterator->handle->shandle, true,
245                 &iterator->wal_itr);
246
247    if (start_key) {
248        struct wal_item query;
249        struct wal_item_header query_key;
250        query.header = &query_key;
251        query_key.key = iterator->start_key;
252        query_key.keylen = iterator->start_keylen;
253        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
254                                                       &query);
255    } else {
256        iterator->tree_cursor = wal_itr_first(iterator->wal_itr);
257    }
258    // to know reverse iteration endpoint store the start cursor
259    if (iterator->tree_cursor) {
260        iterator->tree_cursor_start = iterator->tree_cursor;
261    }
262    iterator->tree_cursor_prev = iterator->tree_cursor;
263    iterator->direction = FDB_ITR_DIR_NONE;
264    iterator->status = FDB_ITR_IDX;
265    iterator->_dhandle = NULL; // populated at the first iterator movement
266
267    *ptr_iterator = iterator;
268
269    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
270    fdb_iterator_next(iterator); // position cursor at first key
271
272    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_INIT);
273
274    return FDB_RESULT_SUCCESS;
275}
276
277LIBFDB_API
278fdb_status fdb_iterator_sequence_init(fdb_kvs_handle *handle,
279                                      fdb_iterator **ptr_iterator,
280                                      const fdb_seqnum_t start_seq,
281                                      const fdb_seqnum_t end_seq,
282                                      fdb_iterator_opt_t opt)
283{
284    if (!handle) {
285        return FDB_RESULT_INVALID_HANDLE;
286    }
287
288    if (ptr_iterator == NULL || (end_seq && start_seq > end_seq)) {
289        return FDB_RESULT_INVALID_ARGS;
290    }
291
292    fdb_status fs;
293    fdb_seqnum_t _start_seq = _endian_encode(start_seq);
294    fdb_kvs_id_t _kv_id;
295    size_t size_id, size_seq;
296    uint8_t *start_seq_kv;
297    struct wal_item query;
298    struct wal_item_header query_key;
299    LATENCY_STAT_START();
300
301    query.header = &query_key;
302
303    // Sequence trees are a must for byseq operations
304    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
305        return FDB_RESULT_INVALID_CONFIG;
306    }
307
308    if (!handle->shandle) {
309        // If compaction is already done before this line,
310        // handle->file needs to be replaced with handle->new_file.
311        fdb_check_file_reopen(handle, NULL);
312        fdb_sync_db_header(handle);
313    }
314
315    size_id = sizeof(fdb_kvs_id_t);
316    size_seq = sizeof(fdb_seqnum_t);
317    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
318
319    if (!handle->shandle) {
320        // snapshot handle doesn't exist
321        // open a new handle to make the iterator handle as a snapshot
322        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
323        if (fs != FDB_RESULT_SUCCESS) {
324            fdb_log(&handle->log_callback, fs,
325                    "Failed to create an sequence iterator instance due to the "
326                    "failure of "
327                    "open operation on the KV Store '%s' in a database file '%s'",
328                    _fdb_kvs_get_name(handle, handle->file),
329                    handle->file->filename);
330            return fs;
331        }
332        iterator->snapshot_handle = false;
333    } else {
334        // Snapshot handle exists
335        // We don't need to open a new handle.. just point to the snapshot handle.
336        iterator->handle = handle;
337        iterator->snapshot_handle = true;
338    }
339
340    iterator->hbtrie_iterator = NULL;
341    iterator->_key = NULL;
342    iterator->_keylen = 0;
343    iterator->opt = opt;
344    iterator->_offset = BLK_NOT_FOUND;
345    iterator->_seqnum = start_seq;
346
347    // For easy API call, treat zero seq as 0xffff...
348    // (because zero seq number is not used)
349    if (end_seq == 0) {
350        iterator->end_seqnum = SEQNUM_NOT_USED;
351    } else {
352        iterator->end_seqnum = end_seq;
353    }
354
355    iterator->start_seqnum = start_seq;
356
357    iterator->start_key = NULL;
358    iterator->end_key = NULL;
359
360    wal_itr_init(handle->file, iterator->handle->shandle, false,
361                 &iterator->wal_itr);
362
363    if (iterator->handle->kvs) {
364        int size_chunk = handle->config.chunksize;
365        // create an iterator handle for hb-trie
366        start_seq_kv = alca(uint8_t, size_chunk + size_seq);
367        _kv_id = _endian_encode(iterator->handle->kvs->id);
368        memcpy(start_seq_kv, &_kv_id, size_id);
369        memcpy(start_seq_kv + size_id, &_start_seq, size_seq);
370
371        iterator->seqtrie_iterator = (struct hbtrie_iterator *)
372                                     calloc(1, sizeof(struct hbtrie_iterator));
373        if (hbtrie_iterator_init(iterator->handle->seqtrie,
374                             iterator->seqtrie_iterator,
375                                  start_seq_kv, size_id + size_seq)
376            == HBTRIE_CORRUPTED_RECOVERING_ERR){
377            _fdb_invalidate_dbheader(iterator->handle);
378            return FDB_RECOVERABLE_ERR;
379        }
380
381        query_key.key = start_seq_kv;
382        kvid2buf(size_chunk, iterator->handle->kvs->id, start_seq_kv);
383        memcpy(start_seq_kv + size_chunk, &start_seq, size_seq);
384        query_key.keylen = size_chunk + size_seq;
385        query.seqnum = start_seq;
386        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
387                                                       &query);
388    } else {
389        // create an iterator handle for b-tree
390        iterator->seqtree_iterator = (struct btree_iterator *)
391                                     calloc(1, sizeof(struct btree_iterator));
392        btree_iterator_init(iterator->handle->seqtree,
393                            iterator->seqtree_iterator,
394                            (void *)(start_seq ? &_start_seq : NULL));
395        query_key.key = (void*)NULL;
396        query_key.keylen = 0;
397        query.seqnum = start_seq;
398        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
399                                                       &query);
400    }
401
402    // to know reverse iteration endpoint store the start cursor
403    if (iterator->tree_cursor) {
404        iterator->tree_cursor_start = iterator->tree_cursor;
405    }
406    iterator->tree_cursor_prev = iterator->tree_cursor;
407    iterator->direction = FDB_ITR_DIR_NONE;
408    iterator->status = FDB_ITR_IDX;
409    iterator->_dhandle = NULL; // populated at the first iterator movement
410
411    *ptr_iterator = iterator;
412
413    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
414    fdb_iterator_next(iterator); // position cursor at first key
415
416    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEQ_INIT);
417
418    return FDB_RESULT_SUCCESS;
419}
420
421static fdb_status _fdb_iterator_prev(fdb_iterator *iterator)
422{
423    int cmp;
424    void *key;
425    size_t keylen;
426    uint64_t offset;
427    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
428    struct docio_handle *dhandle;
429    struct wal_item *snap_item = NULL;
430
431    if (iterator->direction != FDB_ITR_REVERSE) {
432        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
433        if (iterator->tree_cursor) {
434            // just turn around
435            // WAL:   0  v  2->   4    (OLD state)
436            // TRIE:     1  2  3  4
437            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
438                                                  iterator->tree_cursor);
439            if (iterator->direction == FDB_ITR_FORWARD &&
440                iterator->status != FDB_ITR_WAL) {
441                iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
442            }
443            // WAL: <-0  v  2     4    (NEW state)
444            // TRIE:  0  1  2  3  4
445        } else if (iterator->tree_cursor_prev) { // gone past the end..
446            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
447                                             iterator->tree_cursor_prev);
448            iterator->status = FDB_ITR_IDX;
449        } // else Don't move - seek()/init() has already positioned cursor
450    }
451start:
452    key = iterator->_key;
453    dhandle = iterator->handle->dhandle;
454
455    // retrieve from hb-trie
456    if (iterator->_offset == BLK_NOT_FOUND) {
457        // no key waiting for being returned
458        // get next key from hb-trie (or idtree)
459        struct docio_object _doc;
460        // Move Main index Cursor backward...
461        int64_t _offset;
462        do {
463            hr = hbtrie_prev(iterator->hbtrie_iterator, key,
464                             &iterator->_keylen, (void*)&iterator->_offset);
465            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
466                _fdb_invalidate_dbheader(iterator->handle);
467                return FDB_RECOVERABLE_ERR;
468            }
469
470            btreeblk_end(iterator->handle->bhandle);
471            iterator->_offset = _endian_decode(iterator->_offset);
472            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
473                  hr != HBTRIE_RESULT_SUCCESS) {
474                break;
475            }
476            // deletion check
477            memset(&_doc, 0x0, sizeof(struct docio_object));
478            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset,
479                                              &_doc, true);
480            if (_offset <= 0) { // read fail
481                continue; // get prev doc
482            }
483            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
484                free(_doc.key);
485                free(_doc.meta);
486                continue; // get prev doc
487            }
488            free(_doc.key);
489            free(_doc.meta);
490            break;
491        } while (1);
492    }
493    keylen = iterator->_keylen;
494    offset = iterator->_offset;
495
496    if (hr != HBTRIE_RESULT_SUCCESS && !iterator->tree_cursor) {
497        return FDB_RESULT_ITERATOR_FAIL;
498    }
499
500    // Move the WAL cursor backward...
501    while (iterator->tree_cursor) {
502        if (iterator->status == FDB_ITR_WAL) {
503            iterator->tree_cursor_prev = iterator->tree_cursor;
504            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
505        }// else don't move - seek()/ init() has already positioned cursor
506
507        // get the current item of avl-tree
508        snap_item = iterator->tree_cursor;
509        if (!snap_item) {
510            if (hr == HBTRIE_RESULT_SUCCESS) {
511                break;
512            } else {
513                return FDB_RESULT_ITERATOR_FAIL;
514            }
515        }
516        if (hr == HBTRIE_RESULT_SUCCESS) {
517            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
518                               snap_item->header->keylen,
519                               key, keylen);
520        } else {
521            // no more docs in hb-trie
522            cmp = 1;
523        }
524
525        if (cmp >= 0) {
526            // key[WAL] >= key[hb-trie] .. take key[WAL] first
527            uint8_t drop_logical_deletes =
528                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
529                (iterator->opt & FDB_ITR_NO_DELETES);
530            iterator->status = FDB_ITR_WAL;
531            if (cmp > 0) {
532                if (snap_item->action == WAL_ACT_REMOVE ||
533                    drop_logical_deletes) {
534                    // this key is removed .. get prev key[WAL]
535                    continue;
536                }
537            } else { // same key found in WAL
538                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
539                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
540                    // the key is removed .. start over again
541                    goto start;
542                }
543            }
544
545            key = snap_item->header->key;
546            keylen = snap_item->header->keylen;
547            // key[hb-trie] is stashed in iterator->_key for future call
548            offset = snap_item->offset;
549        }
550        break;
551    }
552
553    if (offset == iterator->_offset) {
554        // take key[hb-trie] & and fetch the prev key[hb-trie] at next turn
555        iterator->_offset = BLK_NOT_FOUND;
556        iterator->status = FDB_ITR_IDX;
557    }
558
559    if (iterator->start_key) {
560        cmp = _fdb_key_cmp(iterator, iterator->start_key,
561                           iterator->start_keylen, key, keylen);
562
563        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
564            // current key (KEY) is lexicographically less than START_KEY
565            // OR it is the start key and user wishes to skip it..
566            // terminate the iteration
567            return FDB_RESULT_ITERATOR_FAIL;
568        }
569    }
570
571    if (iterator->end_key) {
572        cmp = _fdb_key_cmp(iterator, iterator->end_key,
573                           iterator->end_keylen, key, keylen);
574
575        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
576            // key is the end_key but users wishes to skip it, redo..
577            // OR current key (KEY) is lexicographically greater than END_KEY
578            goto start;
579        }
580    }
581
582    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
583    iterator->_get_offset = offset; // store for fdb_iterator_get()
584
585    return FDB_RESULT_SUCCESS;
586}
587
588static fdb_status _fdb_iterator_next(fdb_iterator *iterator)
589{
590    int cmp;
591    void *key;
592    size_t keylen;
593    uint64_t offset;
594    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
595    struct docio_handle *dhandle;
596    struct wal_item *snap_item = NULL;
597
598    if (iterator->direction != FDB_ITR_FORWARD) {
599        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
600        // just turn around and face forward..
601        if (iterator->tree_cursor) {
602            // WAL: <-0  v  2     4    (OLD state)
603            // TRIE:     1  2  3  4
604            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
605                                                  iterator->tree_cursor);
606            if (iterator->direction == FDB_ITR_REVERSE &&
607                iterator->status != FDB_ITR_WAL) {
608                iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
609            }
610            // WAL:   0  v  2->   4    (NEW state)
611            // TRIE:  0  1  2  3  4
612        } else if (iterator->tree_cursor_prev) {
613            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
614                                             iterator->tree_cursor_prev);
615            iterator->status = FDB_ITR_IDX;
616        } // else Don't move - seek()/init() has already positioned cursor
617    }
618
619start:
620    key = iterator->_key;
621    dhandle = iterator->handle->dhandle;
622
623    // retrieve from hb-trie
624    if (iterator->_offset == BLK_NOT_FOUND) {
625        // no key waiting for being returned
626        // get next key from hb-trie (or idtree)
627        struct docio_object _doc;
628        // Move Main index Cursor forward...
629        int64_t _offset;
630        do {
631            hr = hbtrie_next(iterator->hbtrie_iterator, key,
632                             &iterator->_keylen, (void*)&iterator->_offset);
633            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
634                _fdb_invalidate_dbheader(iterator->handle);
635                return FDB_RECOVERABLE_ERR;
636            }
637            btreeblk_end(iterator->handle->bhandle);
638            iterator->_offset = _endian_decode(iterator->_offset);
639            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
640                  hr != HBTRIE_RESULT_SUCCESS) {
641                break;
642            }
643            // deletion check
644            memset(&_doc, 0x0, sizeof(struct docio_object));
645            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset, &_doc,
646                                              true);
647            if (_offset <= 0) { // read fail
648                continue; // get next doc
649            }
650            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
651                free(_doc.key);
652                free(_doc.meta);
653                continue; // get next doc
654            }
655            free(_doc.key);
656            free(_doc.meta);
657            break;
658        } while (1);
659    }
660
661    keylen = iterator->_keylen;
662    offset = iterator->_offset;
663
664    if (hr != HBTRIE_RESULT_SUCCESS && iterator->tree_cursor == NULL) {
665        return FDB_RESULT_ITERATOR_FAIL;
666    }
667
668    // Move WAL Cursor forward...
669    while (iterator->tree_cursor) {
670        if (iterator->status == FDB_ITR_WAL) {
671            iterator->tree_cursor_prev = iterator->tree_cursor;
672            iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
673        } // else Don't move - seek()/ init() has already positioned cursor
674        snap_item = iterator->tree_cursor;
675        if (!snap_item) {
676            if (hr == HBTRIE_RESULT_SUCCESS) {
677                break;
678            } else { // no more keys in WAL or main index
679                return FDB_RESULT_ITERATOR_FAIL;
680            }
681        }
682        // Compare key[WAL] with key[hb-trie]
683        if (hr == HBTRIE_RESULT_SUCCESS) {
684            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
685                               snap_item->header->keylen,
686                               key, keylen);
687        } else {
688            // no more docs in hb-trie
689            cmp = -1;
690        }
691
692        if (cmp <= 0) {
693            // key[WAL] <= key[hb-trie] .. take key[WAL] first
694            uint8_t drop_logical_deletes =
695                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
696                (iterator->opt & FDB_ITR_NO_DELETES);
697            iterator->status = FDB_ITR_WAL;
698            if (cmp < 0) {
699                if (snap_item->action == WAL_ACT_REMOVE ||
700                    drop_logical_deletes) {
701                    // this key is removed .. get next key[WAL]
702                    continue;
703                }
704            } else { // Same key from trie also found from WAL
705                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
706                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
707                    // the key is removed .. start over again
708                    goto start;
709                }
710            }
711            key = snap_item->header->key;
712            keylen = snap_item->header->keylen;
713            // key[hb-trie] is stashed in iterator->key for next call
714            offset = snap_item->offset;
715        }
716        break;
717    }
718
719    if (offset == iterator->_offset) {
720        // take key[hb-trie] & and fetch the next key[hb-trie] at next turn
721        iterator->_offset = BLK_NOT_FOUND;
722        iterator->status = FDB_ITR_IDX;
723    }
724
725    if (iterator->start_key) {
726        cmp = _fdb_key_cmp(iterator, iterator->start_key,
727                           iterator->start_keylen, key, keylen);
728
729        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
730            // If user wishes to skip start key, redo first step
731            // OR current key (KEY) is lexicographically smaller than START_KEY
732            goto start;
733        }
734    }
735
736    if (iterator->end_key) {
737        cmp = _fdb_key_cmp(iterator, iterator->end_key, iterator->end_keylen,
738                           key, keylen);
739        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
740            // current key (KEY) is lexicographically greater than END_KEY
741            // OR it is the end_key and user wishes to skip it
742            // terminate the iteration
743            return FDB_RESULT_ITERATOR_FAIL;
744        }
745    }
746
747    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
748    iterator->_get_offset = offset; // store for fdb_iterator_get()
749
750    return FDB_RESULT_SUCCESS;
751}
752
753static
754bool _validate_range_limits(fdb_iterator *iterator,
755                            void *ret_key,
756                            const size_t ret_keylen)
757{
758    int cmp;
759    if (iterator->end_key) {
760        cmp = _fdb_key_cmp(iterator, ret_key, ret_keylen,
761                iterator->end_key, iterator->end_keylen);
762        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) ||
763                cmp > 0) { // greater than end_key OR at skipped MAX_KEY
764            return false;
765        }
766    }
767    if (iterator->start_key) {
768        cmp = _fdb_key_cmp(iterator, ret_key, ret_keylen,
769                iterator->start_key, iterator->start_keylen);
770        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) ||
771                cmp < 0) { // smaller than start_key OR at skipped MIN_KEY
772            return false;
773        }
774    }
775    return true;
776}
777
778static fdb_status _fdb_iterator_seek(fdb_iterator *iterator,
779                                     const void *seek_key,
780                                     const size_t seek_keylen,
781                                     const fdb_iterator_seek_opt_t seek_pref,
782                                     bool seek_min_max)
783{
784    if (!iterator || !iterator->handle) {
785        return FDB_RESULT_INVALID_HANDLE;
786    }
787
788    int cmp, cmp2; // intermediate results of comparison
789    int next_op = 0; // 0: none, -1: prev(), 1: next();
790    int size_chunk = iterator->handle->config.chunksize;
791    uint8_t *seek_key_kv;
792    int64_t _offset;
793    size_t seek_keylen_kv;
794    bool skip_wal = false, fetch_next = true, fetch_wal = true;
795    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
796    struct wal_item *snap_item = NULL, query;
797    struct wal_item_header query_header;
798    struct docio_object _doc;
799    fdb_status ret;
800    LATENCY_STAT_START();
801
802    iterator->_dhandle = NULL; // setup for get() to return FAIL
803
804    if (!seek_key || !iterator->_key ||
805        seek_keylen > FDB_MAX_KEYLEN ||
806        (iterator->handle->kvs_config.custom_cmp &&
807         seek_keylen > iterator->handle->config.blocksize - HBTRIE_HEADROOM)) {
808        return FDB_RESULT_INVALID_ARGS;
809    }
810
811    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
812        return FDB_RESULT_HANDLE_BUSY;
813    }
814
815    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
816                         std::memory_order_relaxed);
817
818    if (iterator->handle->kvs) {
819        seek_keylen_kv = seek_keylen + size_chunk;
820        seek_key_kv = alca(uint8_t, seek_keylen_kv);
821        kvid2buf(size_chunk, iterator->handle->kvs->id, seek_key_kv);
822        memcpy(seek_key_kv + size_chunk, seek_key, seek_keylen);
823    } else {
824        seek_keylen_kv = seek_keylen;
825        seek_key_kv = (uint8_t*)seek_key;
826    }
827
828    // disable seeking beyond the end key...
829    if (iterator->end_key) {
830        cmp = _fdb_key_cmp(iterator, (void *)iterator->end_key,
831                                    iterator->end_keylen,
832                                    (void *)seek_key_kv, seek_keylen_kv);
833        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
834            // seek the end key at this time,
835            // and call prev() next iff caller is seek_to_max()
836            if (seek_min_max) {
837                next_op = -1;
838            }
839        }
840        if (cmp < 0) {
841            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
842            return FDB_RESULT_ITERATOR_FAIL;
843        }
844    }
845
846    // disable seeking beyond the start key...
847    if (iterator->start_key) {
848        cmp = _fdb_key_cmp(iterator,
849                                  (void *)iterator->start_key,
850                                  iterator->start_keylen,
851                                  (void *)seek_key_kv, seek_keylen_kv);
852        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
853            // seek the start key at this time,
854            // and call next() next iff caller is seek_to_min()
855            if (seek_min_max) {
856                next_op = 1;
857            }
858        }
859        if (cmp > 0) {
860            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
861            return FDB_RESULT_ITERATOR_FAIL;
862        }
863    }
864
865    iterator->direction = FDB_ITR_FORWARD;
866
867    // reset HB+trie's iterator
868    hbtrie_iterator_free(iterator->hbtrie_iterator);
869    if (hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
870                         seek_key_kv, seek_keylen_kv)
871        == HBTRIE_CORRUPTED_RECOVERING_ERR){
872        _fdb_invalidate_dbheader(iterator->handle);
873        return FDB_RECOVERABLE_ERR;
874    }
875
876fetch_hbtrie:
877    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
878        // fetch next key
879        hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
880                         &iterator->_keylen, (void*)&iterator->_offset);
881        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
882            _fdb_invalidate_dbheader(iterator->handle);
883            return FDB_RECOVERABLE_ERR;
884        }
885        btreeblk_end(iterator->handle->bhandle);
886
887        if (hr == HBTRIE_RESULT_SUCCESS) {
888            cmp = _fdb_key_cmp(iterator,
889                               iterator->_key, iterator->_keylen,
890                               seek_key_kv, seek_keylen_kv);
891            if (cmp < 0) {
892                // key[HB+trie] < seek_key .. move forward
893                hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
894                                 &iterator->_keylen, (void*)&iterator->_offset);
895                if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
896                    _fdb_invalidate_dbheader(iterator->handle);
897                    return FDB_RECOVERABLE_ERR;
898                }
899                btreeblk_end(iterator->handle->bhandle);
900            }
901            iterator->_offset = _endian_decode(iterator->_offset);
902
903            while (iterator->opt & FDB_ITR_NO_DELETES &&
904                   hr == HBTRIE_RESULT_SUCCESS        &&
905                   fetch_next) {
906                fetch_next = false;
907                memset(&_doc, 0x0, sizeof(struct docio_object));
908                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
909                                                  iterator->_offset, &_doc,
910                                                  true);
911                if (_offset <= 0) { // read fail
912                    fetch_next = true; // get next
913                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
914                    free(_doc.key);
915                    free(_doc.meta);
916                    fetch_next = true; // get next
917                } else {
918                    free(_doc.key);
919                    free(_doc.meta);
920                }
921                if (fetch_next) {
922                    hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
923                                     &iterator->_keylen,
924                                     (void*)&iterator->_offset);
925                    if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
926                        _fdb_invalidate_dbheader(iterator->handle);
927                        return FDB_RECOVERABLE_ERR;
928                    }
929                    btreeblk_end(iterator->handle->bhandle);
930                    iterator->_offset = _endian_decode(iterator->_offset);
931                }
932            }
933        }
934    } else {
935        // fetch prev key
936        hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
937                         &iterator->_keylen, (void*)&iterator->_offset);
938        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
939            _fdb_invalidate_dbheader(iterator->handle);
940            return FDB_RECOVERABLE_ERR;
941        }
942
943        btreeblk_end(iterator->handle->bhandle);
944        if (hr == HBTRIE_RESULT_SUCCESS) {
945            cmp = _fdb_key_cmp(iterator,
946                               iterator->_key, iterator->_keylen,
947                               seek_key_kv, seek_keylen_kv);
948            if (cmp > 0) {
949                // key[HB+trie] > seek_key .. move backward
950                hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
951                                 &iterator->_keylen, (void*)&iterator->_offset);
952                if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
953                    _fdb_invalidate_dbheader(iterator->handle);
954                    return FDB_RECOVERABLE_ERR;
955                }
956
957                btreeblk_end(iterator->handle->bhandle);
958            }
959            iterator->_offset = _endian_decode(iterator->_offset);
960
961            while (iterator->opt & FDB_ITR_NO_DELETES &&
962                   hr == HBTRIE_RESULT_SUCCESS        &&
963                   fetch_next) {
964                fetch_next = false;
965                memset(&_doc, 0x0, sizeof(struct docio_object));
966                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
967                                                  iterator->_offset, &_doc,
968                                                  true);
969                if (_offset <= 0) { // read fail
970                    fetch_next = true; // get prev
971                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
972                    free(_doc.key);
973                    free(_doc.meta);
974                    fetch_next = true; // get prev
975                } else {
976                    free(_doc.key);
977                    free(_doc.meta);
978                }
979                if (fetch_next) {
980                    hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
981                                     &iterator->_keylen,
982                                     (void*)&iterator->_offset);
983                    if (hr ==  HBTRIE_CORRUPTED_RECOVERING_ERR){
984                        _fdb_invalidate_dbheader(iterator->handle);
985                        return FDB_RECOVERABLE_ERR;
986                    }
987
988                    btreeblk_end(iterator->handle->bhandle);
989                    iterator->_offset = _endian_decode(iterator->_offset);
990                }
991            }
992        }
993    }
994
995    if (hr == HBTRIE_RESULT_SUCCESS && // Validate iteration range limits..
996        !next_op) { // only if caller is not seek_to_max/min (handled later)
997        if (!_validate_range_limits(iterator, iterator->_key, iterator->_keylen)) {
998            hr = HBTRIE_RESULT_FAIL;
999        }
1000    }
1001
1002    if (iterator->handle->kvs) {
1003        fdb_kvs_id_t kv_id;
1004        buf2kvid(size_chunk, iterator->_key, &kv_id);
1005        if (iterator->handle->kvs->id != kv_id) {
1006            // seek is done beyond the KV ID
1007            hr = HBTRIE_RESULT_FAIL;
1008        }
1009    }
1010    if (hr == HBTRIE_RESULT_SUCCESS) {
1011        iterator->_get_offset = iterator->_offset;
1012        iterator->_dhandle = iterator->handle->dhandle;
1013    } else {
1014        // larger than the largest key or smaller than the smallest key
1015        iterator->_get_offset = BLK_NOT_FOUND;
1016        iterator->_dhandle = NULL;
1017    }
1018
1019    // HB+trie's iterator should fetch another entry next time
1020    iterator->_offset = BLK_NOT_FOUND;
1021    iterator->status = FDB_ITR_IDX;
1022
1023    // retrieve avl-tree
1024    query.header = &query_header;
1025    query_header.key = seek_key_kv;
1026    query_header.keylen = seek_keylen_kv;
1027
1028    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1029        if (fetch_wal) {
1030            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
1031                                                           &query);
1032            iterator->direction = FDB_ITR_FORWARD;
1033        }
1034        if (iterator->tree_cursor) {
1035            // skip deleted WAL entry
1036            do {
1037                if (!next_op && // only validate range if not skip max/min key mode
1038                    !_validate_range_limits(iterator,
1039                                            iterator->tree_cursor->header->key,
1040                                            iterator->tree_cursor->header->keylen)) {
1041                    iterator->tree_cursor = NULL;
1042                    break;
1043                }
1044                snap_item = iterator->tree_cursor;
1045                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
1046                    iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
1047                    snap_item->action == WAL_ACT_REMOVE) { // immediate purge
1048                    if (iterator->_dhandle) {
1049                        cmp = _fdb_key_cmp(iterator,
1050                                           snap_item->header->key,
1051                                           snap_item->header->keylen,
1052                                           iterator->_key, iterator->_keylen);
1053                        if (cmp == 0) {
1054                            // same doc exists in HB+trie
1055                            // move tree cursor
1056                            iterator->tree_cursor = wal_itr_next(
1057                                                     iterator->wal_itr);
1058                            // do not move tree cursor next time
1059                            fetch_wal = false;
1060                            // fetch next key[HB+trie]
1061                            goto fetch_hbtrie;
1062                        } else if (cmp > 0) {
1063                            break;
1064                        }
1065                    }
1066                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
1067                    continue;
1068                } else if (iterator->end_key &&
1069                           iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
1070                    cmp = _fdb_key_cmp(iterator,
1071                                       iterator->end_key, iterator->end_keylen,
1072                                       snap_item->header->key,
1073                                       snap_item->header->keylen);
1074                    if (cmp == 0 ||
1075                        // WAL cursor is positioned exactly at seeked end key
1076                        // but iterator must skip the end key!
1077                        // If hb+trie has an item, use that else return FAIL
1078                        (cmp < 0 &&// WAL key out of range...
1079                        !next_op)) { // Not called from fdb_iterator_seek_to_max
1080                        skip_wal = true;
1081                        iterator->status = FDB_ITR_WAL;
1082                    }
1083                }
1084                break;
1085            } while(iterator->tree_cursor);
1086        }
1087        iterator->tree_cursor_prev = iterator->tree_cursor;
1088        if (!iterator->tree_cursor) {
1089            // seek_key is larger than the largest key
1090            // set prev key to the largest key.
1091            // if prev operation is called next, tree_cursor will be set to
1092            // tree_cursor_prev.
1093            iterator->tree_cursor_prev = wal_itr_search_smaller(iterator->wal_itr,
1094                                                                &query);
1095            skip_wal = true;
1096        }
1097    } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1098        if (fetch_wal) {
1099            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1100                                                           &query);
1101            iterator->direction = FDB_ITR_REVERSE;
1102        }
1103        if (iterator->tree_cursor) {
1104            // skip deleted WAL entry
1105            do {
1106                if (!next_op && // only validate range if not skip max/min key mode
1107                    !_validate_range_limits(iterator,
1108                                            iterator->tree_cursor->header->key,
1109                                            iterator->tree_cursor->header->keylen)) {
1110                    iterator->tree_cursor = NULL;
1111                    break;
1112                }
1113                snap_item = iterator->tree_cursor;
1114                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
1115                     iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
1116                     snap_item->action == WAL_ACT_REMOVE) { //immediate purge
1117                    if (iterator->_dhandle) {
1118                        cmp = _fdb_key_cmp(iterator,
1119                                           snap_item->header->key,
1120                                           snap_item->header->keylen,
1121                                           iterator->_key, iterator->_keylen);
1122                        if (cmp == 0) {
1123                            // same doc exists in HB+trie
1124                            // move tree cursor
1125                            iterator->tree_cursor = wal_itr_prev(iterator->
1126                                                                 wal_itr);
1127                            // do not move tree cursor next time
1128                            fetch_wal = false;
1129                            // fetch next key[HB+trie]
1130                            goto fetch_hbtrie;
1131                        } else if (cmp < 0) {
1132                            break;
1133                        }
1134                    }
1135                    iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1136                    continue;
1137                } else if (iterator->start_key &&
1138                           iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
1139                    cmp = _fdb_key_cmp(iterator,
1140                                  snap_item->header->key,
1141                                  snap_item->header->keylen,
1142                                  iterator->start_key, iterator->start_keylen);
1143                    if (cmp == 0 ||
1144                        // WAL cursor is positioned exactly at seeked start key
1145                        // but iterator must skip the start key!
1146                        // If hb+trie has an item, use that else return FAIL
1147                        (cmp < 0 && // WAL key out of range and
1148                        !next_op)) { // Not called from fdb_iterator_seek_to_min
1149                        skip_wal = true;
1150                        iterator->status = FDB_ITR_WAL;
1151                    }
1152                }
1153                break;
1154            } while(iterator->tree_cursor);
1155        }
1156        iterator->tree_cursor_prev = iterator->tree_cursor;
1157        if (!iterator->tree_cursor) {
1158            // seek_key is smaller than the smallest key
1159            // Only allow fdb_iterator_next() call, fdb_iterator_prev() should
1160            // hit failure. To ensure this set the direction to as if
1161            // fdb_iterator_prev call has gone past the smallest key...
1162            iterator->tree_cursor_prev = wal_itr_search_greater(iterator->wal_itr,
1163                                                                &query);
1164            // since the current key[WAL] is smaller than seek_key,
1165            // skip key[WAL] this time
1166            skip_wal = true;
1167        }
1168    }
1169
1170    if (iterator->tree_cursor && !skip_wal) {
1171        bool take_wal = false;
1172        bool discard_hbtrie = false;
1173
1174        snap_item = iterator->tree_cursor;
1175
1176        if (hr == HBTRIE_RESULT_SUCCESS) {
1177            cmp = _fdb_key_cmp(iterator,
1178                               snap_item->header->key, snap_item->header->keylen,
1179                               iterator->_key, iterator->_keylen);
1180
1181            if (cmp == 0) {
1182                // same key exists in both HB+trie and WAL
1183                fdb_assert(snap_item->action != WAL_ACT_REMOVE,
1184                           snap_item->action, iterator->_offset);
1185                take_wal = true;
1186                discard_hbtrie = true;
1187            } else if (cmp < 0) { // key[WAL] < key[HB+trie]
1188                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1189                    // higher mode .. take smaller one (key[WAL]) first
1190                    take_wal = true;
1191                    discard_hbtrie = false;
1192                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1193                    take_wal = false;
1194                    discard_hbtrie = false;
1195                    // In seek_to_max call with skip_max_key option,
1196                    if (next_op < 0) {
1197                        // if key[HB+trie] is the largest key
1198                        // smaller than max key,
1199                        // do not call prev() next.
1200                        if (iterator->end_key) {
1201                            cmp2 = _fdb_key_cmp(iterator,
1202                                                iterator->_key,
1203                                                iterator->_keylen,
1204                                                iterator->end_key,
1205                                                iterator->end_keylen);
1206                        } else {
1207                            cmp2 = -1;
1208                        }
1209                        if (cmp2 < 0) {
1210                            next_op = 0;
1211                        }
1212                    }
1213                }
1214            } else { // key[HB+trie] < key[WAL]
1215                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1216                    // higher mode .. take smaller one (key[HB+trie]) first
1217                    take_wal = false;
1218                    discard_hbtrie = false;
1219                    // In seek_to_min call with skip_min_key option,
1220                    if (next_op > 0) {
1221                        // if key[HB+trie] is the smallest key
1222                        // larger than min key,
1223                        // do not call next() next.
1224                        if (iterator->start_key) {
1225                            cmp2 = _fdb_key_cmp(iterator,
1226                                                iterator->start_key,
1227                                                iterator->start_keylen,
1228                                                iterator->_key,
1229                                                iterator->_keylen);
1230                        } else {
1231                            cmp2 = -1;
1232                        }
1233                        if (cmp2 < 0) {
1234                            next_op = 0;
1235                        }
1236                    }
1237                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1238                    // lower mode .. discard smaller one (key[HB+trie])
1239                    take_wal = true;
1240                    discard_hbtrie = true;
1241                    // reset HB+trie's iterator to get the current
1242                    // key[HB+trie] one more time
1243                    hbtrie_iterator_free(iterator->hbtrie_iterator);
1244                    if (hbtrie_iterator_init(iterator->handle->trie,
1245                                             iterator->hbtrie_iterator,
1246                                             seek_key_kv, seek_keylen_kv)
1247                        == HBTRIE_CORRUPTED_RECOVERING_ERR){
1248                        _fdb_invalidate_dbheader(iterator->handle);
1249                        return FDB_RECOVERABLE_ERR;
1250                    }
1251                    iterator->_offset = BLK_NOT_FOUND;
1252                }
1253            }
1254        } else {
1255            // HB+trie seek fail (key[HB+trie] doesn't exist)
1256            take_wal = true;
1257            discard_hbtrie = true;
1258        }
1259
1260        if (take_wal) { // take key[WAL]
1261            if (!discard_hbtrie) { // do not skip the current key[HB+trie]
1262                // key[HB+trie] will be returned next time
1263                iterator->_offset = iterator->_get_offset;
1264            }
1265            iterator->_get_offset = snap_item->offset;
1266            iterator->_dhandle = iterator->handle->dhandle;
1267            iterator->status = FDB_ITR_WAL;
1268        }
1269    }
1270
1271    if (!iterator->_dhandle) {
1272        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1273        return FDB_RESULT_ITERATOR_FAIL;
1274    }
1275
1276    if (next_op < 0) {
1277        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1278        ret = fdb_iterator_prev(iterator);
1279    } else if (next_op > 0) {
1280        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1281        ret = fdb_iterator_next(iterator);
1282    } else {
1283        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1284        ret = FDB_RESULT_SUCCESS;
1285    }
1286
1287    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK);
1288    return ret;
1289}
1290
1291LIBFDB_API
1292fdb_status fdb_iterator_seek(fdb_iterator *iterator,
1293                             const void *seek_key,
1294                             const size_t seek_keylen,
1295                             const fdb_iterator_seek_opt_t seek_pref) {
1296    return _fdb_iterator_seek(iterator, seek_key, seek_keylen, seek_pref,
1297                              false /*not seek_to_min() or seek_to_max()*/);
1298}
1299
1300LIBFDB_API
1301fdb_status fdb_iterator_seek_to_min(fdb_iterator *iterator)
1302{
1303    if (!iterator || !iterator->handle) {
1304        return FDB_RESULT_INVALID_HANDLE;
1305    }
1306
1307    if (!iterator->_key) {
1308        return FDB_RESULT_INVALID_ARGS;
1309    }
1310
1311    size_t size_chunk = iterator->handle->config.chunksize;
1312    fdb_status ret;
1313    LATENCY_STAT_START();
1314
1315    // Initialize direction iteration to FORWARD just in case this function was
1316    // called right after fdb_iterator_init() so the cursor gets positioned
1317    // correctly
1318    iterator->direction = FDB_ITR_FORWARD;
1319    if (iterator->start_keylen > size_chunk) {
1320        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MIN_KEY) ?
1321                                      FDB_ITR_SEEK_HIGHER : FDB_ITR_SEEK_LOWER;
1322        fdb_status status = _fdb_iterator_seek(iterator,
1323                (uint8_t *)iterator->start_key + size_chunk,
1324                iterator->start_keylen - size_chunk,
1325                dir, true);// not regular seek
1326
1327        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_LOWER) {
1328            dir = FDB_ITR_SEEK_HIGHER;
1329            // It is possible that the min key specified during init does not
1330            // exist, so retry the seek with the HIGHER key
1331            return _fdb_iterator_seek(iterator,
1332                (uint8_t *)iterator->start_key + size_chunk,
1333                iterator->start_keylen - size_chunk,
1334                dir, true);// not regular seek
1335        }
1336        return status;
1337    }
1338
1339    // reset HB+trie iterator using start key
1340    hbtrie_iterator_free(iterator->hbtrie_iterator);
1341    if (hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1342                         iterator->start_key, iterator->start_keylen)
1343        == HBTRIE_CORRUPTED_RECOVERING_ERR){
1344        _fdb_invalidate_dbheader(iterator->handle);
1345        return FDB_RECOVERABLE_ERR;
1346    }
1347
1348    // reset WAL tree cursor using search because of the sharded nature of WAL
1349    if (iterator->tree_cursor_start) {
1350        iterator->tree_cursor_prev = iterator->tree_cursor =
1351                                     wal_itr_search_greater(iterator->wal_itr,
1352                                     iterator->tree_cursor_start);
1353        iterator->status = FDB_ITR_IDX; // WAL is already set
1354    }
1355
1356    ret = fdb_iterator_next(iterator);
1357    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MIN);
1358    return ret;
1359}
1360
1361fdb_status _fdb_iterator_seek_to_max_key(fdb_iterator *iterator) {
1362    int cmp;
1363
1364    if (!iterator || !iterator->handle) {
1365        return FDB_RESULT_INVALID_HANDLE;
1366    }
1367
1368    if (!iterator->_key) {
1369        return FDB_RESULT_INVALID_ARGS;
1370    }
1371
1372    size_t size_chunk = iterator->handle->config.chunksize;
1373
1374    // Initialize direction iteration to FORWARD just in case this function was
1375    // called right after fdb_iterator_init() so the cursor gets positioned
1376    // correctly
1377    iterator->direction = FDB_ITR_FORWARD;
1378    if (iterator->end_keylen > size_chunk) {
1379        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MAX_KEY) ?
1380                                      FDB_ITR_SEEK_LOWER : FDB_ITR_SEEK_HIGHER;
1381        fdb_status status = _fdb_iterator_seek(iterator,
1382                (uint8_t *)iterator->end_key + size_chunk,
1383                iterator->end_keylen - size_chunk,
1384                dir, true);// not regular seek
1385
1386        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_HIGHER) {
1387            dir = FDB_ITR_SEEK_LOWER;
1388            // It is possible that the max key specified during init does not
1389            // exist, so retry the seek with the LOWER key
1390            return _fdb_iterator_seek(iterator,
1391                    (uint8_t *)iterator->end_key + size_chunk,
1392                    iterator->end_keylen - size_chunk,
1393                    dir, true);// not regular seek
1394        }
1395        return status;
1396    }
1397    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1398
1399    if (iterator->end_key && iterator->end_keylen == size_chunk) {
1400        // end_key exists but end_keylen == size_id
1401        // it means that user doesn't assign end_key but
1402        // end_key is automatically assigned due to multi KVS mode.
1403
1404        // reset HB+trie's iterator using end_key
1405        hbtrie_iterator_free(iterator->hbtrie_iterator);
1406        hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1407                             iterator->end_key, iterator->end_keylen);
1408        // get first key
1409        if (hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
1410                        &iterator->_keylen, (void*)&iterator->_offset) ==
1411            HBTRIE_CORRUPTED_RECOVERING_ERR){
1412            _fdb_invalidate_dbheader(iterator->handle);
1413            return FDB_RECOVERABLE_ERR;;
1414        }
1415        iterator->_offset = _endian_decode(iterator->_offset);
1416        cmp = _fdb_key_cmp(iterator,
1417                               iterator->end_key, iterator->end_keylen,
1418                               iterator->_key, iterator->_keylen);
1419        if (cmp < 0) {
1420            // returned key is larger than the end key .. skip
1421            iterator->_offset = BLK_NOT_FOUND;
1422        }
1423    } else {
1424        // move HB+trie iterator's cursor to the last entry
1425        hbtrie_last(iterator->hbtrie_iterator);
1426    }
1427
1428    // also move WAL tree's cursor to the last entry
1429    struct wal_item_header hdr;
1430    struct wal_item query;
1431    query.header = &hdr;
1432    hdr.key = iterator->end_key;
1433    hdr.keylen = iterator->end_keylen;
1434    iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1435                                                   &query);
1436    iterator->tree_cursor_prev = iterator->tree_cursor;
1437    iterator->status = FDB_ITR_IDX;
1438
1439    return fdb_iterator_prev(iterator);
1440}
1441
1442fdb_status _fdb_iterator_seek_to_max_seq(fdb_iterator *iterator) {
1443    if (!iterator || !iterator->handle) {
1444        return FDB_RESULT_INVALID_HANDLE;
1445    }
1446
1447    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1448    iterator->_seqnum = iterator->end_seqnum;
1449
1450    if (iterator->handle->kvs) {
1451        // create an iterator handle for hb-trie
1452        uint8_t *end_seq_kv = alca(uint8_t, sizeof(size_t)*2);
1453        fdb_kvs_id_t _kv_id = _endian_encode(iterator->handle->kvs->id);
1454        memcpy(end_seq_kv, &_kv_id, sizeof(size_t));
1455        memcpy(end_seq_kv + sizeof(size_t), &iterator->end_seqnum,
1456                sizeof(size_t));
1457
1458        // reset HB+trie's seqtrie iterator using end_seq_kv
1459        hbtrie_iterator_free(iterator->seqtrie_iterator);
1460        if (hbtrie_iterator_init(iterator->handle->seqtrie,
1461                                 iterator->seqtrie_iterator,
1462                                 end_seq_kv, sizeof(size_t)*2)
1463            == HBTRIE_CORRUPTED_RECOVERING_ERR){
1464            _fdb_invalidate_dbheader(iterator->handle);
1465            return FDB_RECOVERABLE_ERR;
1466        }
1467    } else {
1468        // reset Btree iterator to end_seqnum
1469        btree_iterator_free(iterator->seqtree_iterator);
1470        // create an iterator handle for b-tree
1471        btree_iterator_init(iterator->handle->seqtree,
1472                            iterator->seqtree_iterator,
1473                            (void *)(&iterator->end_seqnum));
1474    }
1475
1476    if (iterator->end_seqnum != SEQNUM_NOT_USED) {
1477        struct wal_item query;
1478        struct wal_item_header query_key;
1479        size_t size_seq = sizeof(fdb_seqnum_t);
1480        size_t size_chunk = iterator->handle->config.chunksize;
1481        uint8_t *end_seq_kv = alca(uint8_t, size_chunk + size_seq);
1482        if (iterator->handle->kvs) {
1483            query_key.key = end_seq_kv;
1484            kvid2buf(size_chunk, iterator->handle->kvs->id, end_seq_kv);
1485            memcpy(end_seq_kv + size_chunk, &iterator->end_seqnum, size_seq);
1486            query_key.keylen = size_chunk + size_seq;
1487        } else {
1488            query_key.key = (void *) NULL;
1489            query_key.keylen = 0;
1490        }
1491        query.header = &query_key;
1492        query.seqnum = iterator->end_seqnum;
1493
1494        // reset WAL tree cursor using search because of the sharded WAL
1495        iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1496                                                       &query);
1497    } else { // no end_seqnum specified, just head to the last entry
1498        iterator->tree_cursor = wal_itr_last(iterator->wal_itr);
1499    }
1500
1501    if (iterator->tree_cursor) {
1502        struct wal_item *snap_item = iterator->tree_cursor;
1503        if (snap_item->seqnum == iterator->end_seqnum &&
1504            iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
1505            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1506        }
1507    }
1508
1509    if (iterator->tree_cursor) {
1510        // If WAL tree has an entry, skip Main index for reverse iteration..
1511        iterator->_offset = iterator->tree_cursor->offset;
1512    } else {
1513        iterator->_offset = BLK_NOT_FOUND; // fetch from main index
1514    }
1515
1516    iterator->tree_cursor_prev = iterator->tree_cursor;
1517    return fdb_iterator_prev(iterator);
1518}
1519
1520LIBFDB_API
1521fdb_status fdb_iterator_seek_to_max(fdb_iterator *iterator)
1522{
1523    if (!iterator || !iterator->handle) {
1524        return FDB_RESULT_INVALID_HANDLE;
1525    }
1526
1527    fdb_status ret;
1528    LATENCY_STAT_START();
1529
1530    if (!iterator->hbtrie_iterator) {
1531        ret = _fdb_iterator_seek_to_max_seq(iterator);
1532    } else {
1533        ret = _fdb_iterator_seek_to_max_key(iterator);
1534    }
1535    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MAX);
1536    return ret;
1537}
1538
1539static fdb_status _fdb_iterator_seq_prev(fdb_iterator *iterator)
1540{
1541    size_t size_id, size_seq, seq_kv_len;
1542    uint8_t *seq_kv;
1543    uint64_t offset = BLK_NOT_FOUND;
1544    btree_result br = BTREE_RESULT_FAIL;
1545    hbtrie_result hr;
1546    struct docio_object _doc;
1547    struct docio_handle *dhandle;
1548    struct wal_item *snap_item = NULL;
1549    fdb_seqnum_t seqnum;
1550    fdb_kvs_id_t kv_id;
1551
1552    size_id = sizeof(fdb_kvs_id_t);
1553    size_seq = sizeof(fdb_seqnum_t);
1554    seq_kv = alca(uint8_t, size_id + size_seq);
1555
1556    if (iterator->direction != FDB_ITR_REVERSE) {
1557        if (iterator->status == FDB_ITR_IDX) {
1558            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1559        }
1560        // re-position WAL key to previous key returned using search because of
1561        // sharded nature of wal (we cannot directly assign prev to cursor)
1562        if (iterator->tree_cursor_prev &&
1563            iterator->tree_cursor != iterator->tree_cursor_prev) {
1564            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1565                                                  iterator->tree_cursor_prev);
1566            iterator->status = FDB_ITR_IDX;
1567        } // else Don't move - seek()/init() has already positioned cursor
1568    }
1569
1570start_seq:
1571    seqnum = iterator->_seqnum;
1572    dhandle = iterator->handle->dhandle;
1573
1574    if (iterator->_offset == BLK_NOT_FOUND || // was iterating over btree
1575        !iterator->tree_cursor) { // WAL exhausted
1576        if (iterator->handle->kvs) { // multi KV instance mode
1577            hr = hbtrie_prev(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1578                             (void *)&offset);
1579            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
1580                _fdb_invalidate_dbheader(iterator->handle);
1581                return FDB_RECOVERABLE_ERR;
1582            }
1583
1584            if (hr == HBTRIE_RESULT_SUCCESS) {
1585                br = BTREE_RESULT_SUCCESS;
1586                buf2kvid(size_id, seq_kv, &kv_id);
1587                if (kv_id != iterator->handle->kvs->id) {
1588                    // iterator is beyond the boundary
1589                    br = BTREE_RESULT_FAIL;
1590                }
1591                memcpy(&seqnum, seq_kv + size_id, size_seq);
1592            } else {
1593                br = BTREE_RESULT_FAIL;
1594            }
1595        } else {
1596            br = btree_prev(iterator->seqtree_iterator, &seqnum,
1597                            (void *)&offset);
1598        }
1599        btreeblk_end(iterator->handle->bhandle);
1600        if (br == BTREE_RESULT_SUCCESS) {
1601            seqnum = _endian_decode(seqnum);
1602            iterator->_seqnum = seqnum;
1603            if (seqnum < iterator->start_seqnum) {
1604                return FDB_RESULT_ITERATOR_FAIL;
1605            }
1606            offset = _endian_decode(offset);
1607            iterator->status = FDB_ITR_IDX;
1608        } else {
1609            iterator->_offset = BLK_NOT_FOUND;
1610            // B-tree has no more items
1611            return FDB_RESULT_ITERATOR_FAIL;
1612        }
1613    } else while (iterator->tree_cursor) {
1614        if (iterator->status == FDB_ITR_WAL) {
1615            iterator->tree_cursor_prev = iterator->tree_cursor;
1616            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1617            if (!iterator->tree_cursor) {
1618                goto start_seq;
1619            }
1620        }// else don't move - seek()/ init() has already positioned cursor
1621
1622        iterator->status = FDB_ITR_WAL;
1623        // get the current item of avl tree
1624        snap_item = iterator->tree_cursor;
1625        uint8_t drop_logical_deletes =
1626            (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1627            (iterator->opt & FDB_ITR_NO_DELETES);
1628        if (snap_item->action == WAL_ACT_REMOVE ||
1629                drop_logical_deletes) {
1630            if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1631                return FDB_RESULT_ITERATOR_FAIL;
1632            }
1633            // this key is removed .. get prev key[WAL]
1634            continue;
1635        }
1636
1637        offset = snap_item->offset;
1638        iterator->_offset = offset; // WAL is not exhausted, ignore B-Tree
1639        iterator->_seqnum = snap_item->seqnum;
1640        break;
1641    }
1642
1643    // To prevent returning duplicate items from sequence iterator, only return
1644    // those b-tree items that exist in HB-trie but not WAL
1645    // (WAL items should have already been returned in reverse iteration)
1646    if (br == BTREE_RESULT_SUCCESS) {
1647        fdb_doc doc_kv;
1648        _doc.key = NULL;
1649        _doc.length.keylen = 0;
1650        _doc.meta = NULL;
1651        _doc.body = NULL;
1652
1653        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1654                                                  true);
1655        if (_offset <= 0) {
1656            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1657        }
1658        if (_doc.length.flag & DOCIO_DELETED &&
1659            (iterator->opt & FDB_ITR_NO_DELETES)) {
1660            free(_doc.key);
1661            free(_doc.meta);
1662            return FDB_RESULT_KEY_NOT_FOUND;
1663        }
1664
1665        doc_kv.key = _doc.key;
1666        doc_kv.keylen = _doc.length.keylen;
1667        doc_kv.seqnum = SEQNUM_NOT_USED;
1668        if (wal_find(iterator->handle->shandle->snap_txn,
1669                     iterator->handle->file,
1670                     &iterator->handle->shandle->cmp_info,
1671                     iterator->handle->shandle,
1672                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1673                     iterator->start_seqnum <= doc_kv.seqnum &&
1674                     doc_kv.seqnum <= iterator->end_seqnum) {
1675            free(_doc.key);
1676            free(_doc.meta);
1677            goto start_seq; // B-tree item exists in WAL, skip for now
1678        }
1679        // Also look in HB-Trie to eliminate duplicates
1680        uint64_t hboffset;
1681        struct docio_object _hbdoc;
1682        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1683                         (void *)&hboffset);
1684        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
1685            _fdb_invalidate_dbheader(iterator->handle);
1686            return FDB_RECOVERABLE_ERR;
1687        }
1688        btreeblk_end(iterator->handle->bhandle);
1689
1690        if (hr != HBTRIE_RESULT_SUCCESS) {
1691            free(_doc.key);
1692            free(_doc.meta);
1693            goto start_seq;
1694        } else { // If present in HB-trie ensure it's seqnum is in range
1695            int64_t _offset;
1696            _hbdoc.key = _doc.key;
1697            _hbdoc.meta = NULL;
1698            hboffset = _endian_decode(hboffset);
1699            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1700                                              hboffset, &_hbdoc, true);
1701            if (_offset <= 0) {
1702                free(_doc.key);
1703                free(_doc.meta);
1704                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1705            }
1706
1707            if (_doc.seqnum < _hbdoc.seqnum &&
1708                _hbdoc.seqnum <= iterator->end_seqnum) {
1709                free(_doc.key);
1710                free(_doc.meta);
1711                free(_hbdoc.meta);
1712                goto start_seq;
1713            }
1714            free(_hbdoc.meta);
1715        }
1716        free(_doc.key);
1717        free(_doc.meta);
1718    }
1719
1720    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
1721    iterator->_get_offset = offset; // store for fdb_iterator_get()
1722
1723    return FDB_RESULT_SUCCESS;
1724}
1725
1726static fdb_status _fdb_iterator_seq_next(fdb_iterator *iterator)
1727{
1728    size_t size_id, size_seq, seq_kv_len;
1729    uint8_t *seq_kv;
1730    uint64_t offset = BLK_NOT_FOUND;
1731    btree_result br = BTREE_RESULT_FAIL;
1732    hbtrie_result hr;
1733    struct docio_object _doc;
1734    struct docio_handle *dhandle;
1735    struct wal_item *snap_item = NULL;
1736    fdb_seqnum_t seqnum;
1737    fdb_kvs_id_t kv_id;
1738
1739    size_id = sizeof(fdb_kvs_id_t);
1740    size_seq = sizeof(fdb_seqnum_t);
1741    seq_kv = alca(uint8_t, size_id + size_seq);
1742
1743    if (iterator->direction != FDB_ITR_FORWARD) {
1744        if (iterator->status == FDB_ITR_IDX) {
1745            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1746        }
1747        // re-position WAL key to previous key returned
1748        if (iterator->tree_cursor_prev) {
1749            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
1750                                    iterator->tree_cursor_prev);
1751            iterator->status = FDB_ITR_IDX;
1752        } // else Don't move - seek()/init() has already positioned cursor
1753    }
1754
1755start_seq:
1756    seqnum = iterator->_seqnum;
1757    dhandle = iterator->handle->dhandle;
1758
1759    // retrieve from sequence b-tree first
1760    if (iterator->_offset == BLK_NOT_FOUND) {
1761        if (iterator->handle->kvs) { // multi KV instance mode
1762            hr = hbtrie_next(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1763                             (void *)&offset);
1764            if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
1765                _fdb_invalidate_dbheader(iterator->handle);
1766                return FDB_RECOVERABLE_ERR;
1767            }
1768            if (hr == HBTRIE_RESULT_SUCCESS) {
1769                br = BTREE_RESULT_SUCCESS;
1770                buf2kvid(size_id, seq_kv, &kv_id);
1771                if (kv_id != iterator->handle->kvs->id) {
1772                    // iterator is beyond the boundary
1773                    br = BTREE_RESULT_FAIL;
1774                }
1775                memcpy(&seqnum, seq_kv + size_id, size_seq);
1776            } else {
1777                br = BTREE_RESULT_FAIL;
1778            }
1779        } else {
1780            br = btree_next(iterator->seqtree_iterator, &seqnum,
1781                            (void *)&offset);
1782        }
1783        btreeblk_end(iterator->handle->bhandle);
1784        if (br == BTREE_RESULT_SUCCESS) {
1785            seqnum = _endian_decode(seqnum);
1786            iterator->_seqnum = seqnum;
1787            if (seqnum > iterator->end_seqnum) {
1788                return FDB_RESULT_ITERATOR_FAIL;
1789            }
1790            offset = _endian_decode(offset);
1791            iterator->_offset = BLK_NOT_FOUND; // continue with B-tree
1792            iterator->status = FDB_ITR_IDX;
1793        }
1794    }
1795
1796    if (br == BTREE_RESULT_FAIL) {
1797        if (iterator->tree_cursor == NULL) {
1798            return FDB_RESULT_ITERATOR_FAIL;
1799        } else {
1800            while (iterator->tree_cursor) {
1801                if (iterator->status == FDB_ITR_WAL) {
1802                    // save the current point for direction change
1803                    iterator->tree_cursor_prev = iterator->tree_cursor;
1804                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
1805                    if (!iterator->tree_cursor) {
1806                        return FDB_RESULT_ITERATOR_FAIL;
1807                    }
1808                }// else don't move - seek()/ init() already positioned cursor
1809                // get the current item of WAL tree
1810                iterator->status = FDB_ITR_WAL;
1811                snap_item = iterator->tree_cursor;
1812                uint8_t drop_logical_deletes =
1813                    (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1814                    (iterator->opt & FDB_ITR_NO_DELETES);
1815                if (snap_item->action == WAL_ACT_REMOVE ||
1816                    drop_logical_deletes) {
1817                    if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1818                        return FDB_RESULT_ITERATOR_FAIL;
1819                    }
1820                    // this key is removed .. get next key[WAL]
1821                    continue;
1822                }
1823                if (snap_item->seqnum < iterator->_seqnum) {
1824                    // smaller than the current seqnum .. get next key[WAL]
1825                    continue;
1826                }
1827                if (snap_item->seqnum > iterator->end_seqnum) {
1828                    // out-of-range .. iterator terminates
1829                    return FDB_RESULT_ITERATOR_FAIL;
1830                }
1831
1832                offset = snap_item->offset;
1833                iterator->_offset = offset; // stops b-tree lookups. favor wal
1834                iterator->_seqnum = snap_item->seqnum;
1835                break;
1836            }
1837        }
1838    }
1839
1840    // To prevent returning duplicate items from sequence iterator, only return
1841    // those b-tree items that exist in HB-trie but not WAL (visit WAL later)
1842    if (br == BTREE_RESULT_SUCCESS) {
1843        _doc.key = NULL;
1844        _doc.length.keylen = 0;
1845        _doc.meta = NULL;
1846        _doc.body = NULL;
1847
1848        fdb_doc doc_kv;
1849        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1850                                                  true);
1851        if (_offset <= 0) {
1852            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1853        }
1854        if (_doc.length.flag & DOCIO_DELETED && (iterator->opt & FDB_ITR_NO_DELETES)) {
1855            free(_doc.key);
1856            free(_doc.meta);
1857            return FDB_RESULT_KEY_NOT_FOUND;
1858        }
1859        doc_kv.key = _doc.key;
1860        doc_kv.keylen = _doc.length.keylen;
1861        doc_kv.seqnum = SEQNUM_NOT_USED; // search by key not seqnum
1862        if (wal_find(iterator->handle->shandle->snap_txn,
1863                    iterator->handle->file,
1864                    &iterator->handle->shandle->cmp_info,
1865                    iterator->handle->shandle,
1866                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1867                iterator->start_seqnum <= doc_kv.seqnum &&
1868                doc_kv.seqnum <= iterator->end_seqnum) {
1869            free(_doc.key);
1870            free(_doc.meta);
1871            goto start_seq; // B-tree item exists in WAL, skip for now
1872        }
1873        // Also look in HB-Trie to eliminate duplicates
1874        uint64_t hboffset;
1875        struct docio_object _hbdoc;
1876        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1877                         (void *)&hboffset);
1878        if (hr == HBTRIE_CORRUPTED_RECOVERING_ERR){
1879            _fdb_invalidate_dbheader(iterator->handle);
1880            return FDB_RECOVERABLE_ERR;
1881        }
1882        btreeblk_end(iterator->handle->bhandle);
1883
1884        if (hr != HBTRIE_RESULT_SUCCESS) {
1885            free(_doc.key);
1886            free(_doc.meta);
1887            goto start_seq;
1888        } else { // If present in HB-trie ensure it's seqnum is in range
1889            int64_t _offset;
1890            _hbdoc.key = _doc.key;
1891            _hbdoc.meta = NULL;
1892            hboffset = _endian_decode(hboffset);
1893            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1894                                              hboffset, &_hbdoc,
1895                                              true);
1896            if (_offset <= 0) {
1897                free(_doc.key);
1898                free(_doc.meta);
1899                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1900            }
1901            if (_doc.seqnum < _hbdoc.seqnum &&
1902                _hbdoc.seqnum <= iterator->end_seqnum) {
1903                free(_doc.key);
1904                free(_doc.meta);
1905                free(_hbdoc.meta);
1906                goto start_seq;
1907            }
1908            free(_hbdoc.meta);
1909        }
1910        free(_doc.key);
1911        free(_doc.meta);
1912    }
1913
1914    iterator->_dhandle = dhandle; // store for fdb_iterator_get
1915    iterator->_get_offset = offset; // store for fdb_iterator_get
1916
1917    return FDB_RESULT_SUCCESS;
1918}
1919
1920LIBFDB_API
1921fdb_status fdb_iterator_prev(fdb_iterator *iterator)
1922{
1923    if (!iterator || !iterator->handle) {
1924        return FDB_RESULT_INVALID_HANDLE;
1925    }
1926
1927    fdb_status result = FDB_RESULT_SUCCESS;
1928    LATENCY_STAT_START();
1929
1930    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1931        return FDB_RESULT_HANDLE_BUSY;
1932    }
1933
1934    if (iterator->hbtrie_iterator) {
1935        while ((result = _fdb_iterator_prev(iterator)) ==
1936                FDB_RESULT_KEY_NOT_FOUND);
1937    } else {
1938        while ((result = _fdb_iterator_seq_prev(iterator)) ==
1939                FDB_RESULT_KEY_NOT_FOUND);
1940    }
1941    if (result == FDB_RESULT_SUCCESS) {
1942        iterator->direction = FDB_ITR_REVERSE;
1943    } else {
1944        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1945        if (iterator->direction != FDB_ITR_DIR_NONE) {
1946            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1947                    iterator->status == FDB_ITR_IDX) {
1948                iterator->_offset = BLK_NOT_FOUND;
1949            }
1950        }
1951    }
1952
1953    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1954    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1955                         std::memory_order_relaxed);
1956    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_PREV);
1957    return result;
1958}
1959
1960LIBFDB_API
1961fdb_status fdb_iterator_next(fdb_iterator *iterator)
1962{
1963    if (!iterator || !iterator->handle) {
1964        return FDB_RESULT_INVALID_HANDLE;
1965    }
1966
1967    fdb_status result = FDB_RESULT_SUCCESS;
1968    LATENCY_STAT_START();
1969
1970    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1971        return FDB_RESULT_HANDLE_BUSY;
1972    }
1973
1974    if (iterator->hbtrie_iterator) {
1975        while ((result = _fdb_iterator_next(iterator)) ==
1976                FDB_RESULT_KEY_NOT_FOUND);
1977    } else {
1978        while ((result = _fdb_iterator_seq_next(iterator)) ==
1979                FDB_RESULT_KEY_NOT_FOUND);
1980    }
1981    if (result == FDB_RESULT_SUCCESS) {
1982        iterator->direction = FDB_ITR_FORWARD;
1983    } else {
1984        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1985        if (iterator->direction != FDB_ITR_DIR_NONE) {
1986            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1987                    iterator->status == FDB_ITR_IDX) {
1988                iterator->_offset = BLK_NOT_FOUND;
1989            }
1990        }
1991    }
1992
1993    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1994    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1995                         std::memory_order_relaxed);
1996    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_NEXT);
1997    return result;
1998}
1999
2000// DOC returned by this function must be freed by fdb_doc_free
2001// if it was allocated because the incoming doc was pointing to NULL
2002LIBFDB_API
2003fdb_status fdb_iterator_get(fdb_iterator *iterator, fdb_doc **doc)
2004{
2005    if (!iterator || !iterator->handle) {
2006        return FDB_RESULT_INVALID_HANDLE;
2007    }
2008
2009    if (!doc) {
2010        return FDB_RESULT_INVALID_ARGS;
2011    }
2012
2013    struct docio_object _doc;
2014    fdb_status ret = FDB_RESULT_SUCCESS;
2015    uint64_t offset;
2016    struct docio_handle *dhandle;
2017    size_t size_chunk = iterator->handle->config.chunksize;
2018    bool alloced_key, alloced_meta, alloced_body;
2019    LATENCY_STAT_START();
2020
2021    dhandle = iterator->_dhandle;
2022    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
2023        return FDB_RESULT_ITERATOR_FAIL;
2024    }
2025
2026    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
2027        return FDB_RESULT_HANDLE_BUSY;
2028    }
2029
2030    offset = iterator->_get_offset;
2031
2032    if (*doc == NULL) {
2033        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
2034        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
2035            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2036            return ret;
2037        } // LCOV_EXCL_STOP
2038        _doc.key = NULL;
2039        _doc.length.keylen = 0;
2040        _doc.meta = NULL;
2041        _doc.body = NULL;
2042        alloced_key = true;
2043        alloced_meta = true;
2044        alloced_body = true;
2045    } else {
2046        _doc.key = (*doc)->key;
2047        _doc.meta = (*doc)->meta;
2048        _doc.body = (*doc)->body;
2049        alloced_key = _doc.key ? false : true;
2050        alloced_meta = _doc.meta ? false : true;
2051        alloced_body = _doc.body ? false : true;
2052    }
2053
2054    int64_t _offset = docio_read_doc(dhandle, offset, &_doc, true);
2055    if (_offset <= 0) {
2056        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2057        return _offset < 0 ? (fdb_status) _offset : FDB_RESULT_KEY_NOT_FOUND;
2058    }
2059    if (_doc.length.flag & DOCIO_DELETED &&
2060        (iterator->opt & FDB_ITR_NO_DELETES)) {
2061        if (alloced_key) {
2062            free(_doc.key);
2063        }
2064        if (alloced_meta) {
2065            free(_doc.meta);
2066        }
2067        if (alloced_body) {
2068            free(_doc.body);
2069        }
2070        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2071        return FDB_RESULT_KEY_NOT_FOUND;
2072    }
2073
2074    if (iterator->handle->kvs) {
2075        // eliminate KV ID from key
2076        _doc.length.keylen -= size_chunk;
2077        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
2078    }
2079
2080    if (alloced_key) {
2081        (*doc)->key = _doc.key;
2082    }
2083    if (alloced_meta) {
2084        (*doc)->meta = _doc.meta;
2085    }
2086    if (alloced_body) {
2087        (*doc)->body = _doc.body;
2088    }
2089    (*doc)->keylen = _doc.length.keylen;
2090    (*doc)->metalen = _doc.length.metalen;
2091    (*doc)->bodylen = _doc.length.bodylen;
2092    (*doc)->seqnum = _doc.seqnum;
2093    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
2094    (*doc)->offset = offset;
2095
2096    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2097    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
2098                         std::memory_order_relaxed);
2099    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET);
2100    return ret;
2101}
2102
2103// DOC returned by this function must be freed using 'fdb_doc_free'
2104LIBFDB_API
2105fdb_status fdb_iterator_get_metaonly(fdb_iterator *iterator, fdb_doc **doc)
2106{
2107    if (!iterator || !iterator->handle) {
2108        return FDB_RESULT_INVALID_HANDLE;
2109    }
2110
2111    if (!doc) {
2112        return FDB_RESULT_INVALID_ARGS;
2113    }
2114
2115    struct docio_object _doc;
2116    fdb_status ret = FDB_RESULT_SUCCESS;
2117    uint64_t offset;
2118    int64_t _offset;
2119    struct docio_handle *dhandle;
2120    size_t size_chunk = iterator->handle->config.chunksize;
2121    bool alloced_key, alloced_meta;
2122    LATENCY_STAT_START();
2123
2124    dhandle = iterator->_dhandle;
2125    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
2126        return FDB_RESULT_ITERATOR_FAIL;
2127    }
2128
2129    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
2130        return FDB_RESULT_HANDLE_BUSY;
2131    }
2132
2133    offset = iterator->_get_offset;
2134
2135    if (*doc == NULL) {
2136        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
2137        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
2138            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2139            return ret;
2140        } // LCOV_EXCL_STOP
2141        _doc.key = NULL;
2142        _doc.length.keylen = 0;
2143        _doc.meta = NULL;
2144        _doc.body = NULL;
2145        alloced_key = true;
2146        alloced_meta = true;
2147    } else {
2148        _doc.key = (*doc)->key;
2149        _doc.meta = (*doc)->meta;
2150        _doc.body = NULL;
2151        alloced_key = _doc.key ? false : true;
2152        alloced_meta = _doc.meta ? false : true;
2153    }
2154
2155    _offset = docio_read_doc_key_meta(dhandle, offset, &_doc, true);
2156    if (_offset <= 0) {
2157        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2158        return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
2159    }
2160    if (_doc.length.flag & DOCIO_DELETED &&
2161            (iterator->opt & FDB_ITR_NO_DELETES)) {
2162        if (alloced_key) {
2163            free(_doc.key);
2164        }
2165        if (alloced_meta) {
2166            free(_doc.meta);
2167        }
2168        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2169        return FDB_RESULT_KEY_NOT_FOUND;
2170    }
2171
2172    if (iterator->handle->kvs) {
2173        // eliminate KV ID from key
2174        _doc.length.keylen -= size_chunk;
2175        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
2176    }
2177    if (alloced_key) {
2178        (*doc)->key = _doc.key;
2179    }
2180    if (alloced_meta) {
2181        (*doc)->meta = _doc.meta;
2182    }
2183    (*doc)->keylen = _doc.length.keylen;
2184    (*doc)->metalen = _doc.length.metalen;
2185    (*doc)->bodylen = _doc.length.bodylen;
2186    (*doc)->seqnum = _doc.seqnum;
2187    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
2188    (*doc)->offset = offset;
2189
2190    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2191    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
2192                         std::memory_order_relaxed);
2193    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET_META);
2194    return ret;
2195}
2196
2197LIBFDB_API
2198fdb_status fdb_iterator_close(fdb_iterator *iterator)
2199{
2200    if (!iterator || !iterator->handle) {
2201        return FDB_RESULT_INVALID_HANDLE;
2202    }
2203
2204    LATENCY_STAT_START();
2205    if (iterator->hbtrie_iterator) {
2206        hbtrie_iterator_free(iterator->hbtrie_iterator);
2207        free(iterator->hbtrie_iterator);
2208    }
2209
2210    if (iterator->seqtree_iterator) {
2211        btree_iterator_free(iterator->seqtree_iterator);
2212        free(iterator->seqtree_iterator);
2213    }
2214    if (iterator->seqtrie_iterator) {
2215        hbtrie_iterator_free(iterator->seqtrie_iterator);
2216        free(iterator->seqtrie_iterator);
2217    }
2218
2219    if (iterator->start_key) {
2220        free(iterator->start_key);
2221    }
2222
2223    if (iterator->end_key) {
2224        free(iterator->end_key);
2225    }
2226
2227    --iterator->handle->num_iterators; // Decrement the iterator counter of the KV handle
2228    wal_itr_close(iterator->wal_itr);
2229
2230    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_CLOSE);
2231
2232    if (!iterator->snapshot_handle) {
2233        // Close the opened handle in the iterator,
2234        // if the handle is not for snapshot.
2235        fdb_status fs = fdb_kvs_close(iterator->handle);
2236        if (fs != FDB_RESULT_SUCCESS) {
2237            fdb_log(&iterator->handle->log_callback, fs,
2238                    "Failed to close the KV Store from a database file '%s' as "
2239                    "part of closing the iterator",
2240                    iterator->handle->file->filename);
2241        }
2242    }
2243
2244    free(iterator->_key);
2245    free(iterator);
2246    return FDB_RESULT_SUCCESS;
2247}
2248