xref: /4.6.0/forestdb/src/iterator.cc (revision 7f953304)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <fcntl.h>
22
23#include "libforestdb/forestdb.h"
24#include "fdb_internal.h"
25#include "hbtrie.h"
26#include "docio.h"
27#include "btreeblock.h"
28#include "common.h"
29#include "wal.h"
30#include "avltree.h"
31#include "list.h"
32#include "internal_types.h"
33#include "btree_var_kv_ops.h"
34#include "timing.h"
35
36#include "memleak.h"
37
38#ifdef __DEBUG
39#ifndef __DEBUG_FDB
40    #undef DBG
41    #undef DBGCMD
42    #undef DBGSW
43    #define DBG(...)
44    #define DBGCMD(...)
45    #define DBGSW(n, ...)
46#endif
47#endif
48
49// lexicographically compares two variable-length binary streams
50static int _fdb_keycmp(void *key1, size_t keylen1, void *key2, size_t keylen2)
51{
52    if (keylen1 == keylen2) {
53        return memcmp(key1, key2, keylen1);
54    }else {
55        size_t len = MIN(keylen1, keylen2);
56        int cmp = memcmp(key1, key2, len);
57        if (cmp != 0) return cmp;
58        else {
59            return (int)((int)keylen1 - (int)keylen2);
60        }
61    }
62}
63
64static int _fdb_key_cmp(fdb_iterator *iterator, void *key1, size_t keylen1,
65                        void *key2, size_t keylen2) {
66    int cmp;
67    if (iterator->handle->kvs_config.custom_cmp) {
68        // custom compare function for variable length key
69        if (iterator->handle->kvs) {
70            // multi KV instance mode
71            // KV ID should be compared separately
72            size_t size_chunk = iterator->handle->config.chunksize;
73            fdb_kvs_id_t a_id, b_id;
74            buf2kvid(size_chunk, key1, &a_id);
75            buf2kvid(size_chunk, key2, &b_id);
76
77            if (a_id < b_id) {
78                cmp = -1;
79            } else if (a_id > b_id) {
80                cmp = 1;
81            } else {
82                if (keylen1 == size_chunk) { // key1 < key2
83                    return -1;
84                } else if (keylen2 == size_chunk) { // key1 > key2
85                    return 1;
86                }
87                cmp = iterator->handle->kvs_config.custom_cmp(
88                          (uint8_t*)key1 + size_chunk, keylen1 - size_chunk,
89                          (uint8_t*)key2 + size_chunk, keylen2 - size_chunk);
90            }
91        } else {
92            cmp = iterator->handle->kvs_config.custom_cmp(key1, keylen1,
93                                                       key2, keylen2);
94        }
95    } else {
96        cmp = _fdb_keycmp(key1, keylen1, key2, keylen2);
97    }
98    return cmp;
99}
100
101LIBFDB_API
102fdb_status fdb_iterator_init(fdb_kvs_handle *handle,
103                             fdb_iterator **ptr_iterator,
104                             const void *start_key,
105                             size_t start_keylen,
106                             const void *end_key,
107                             size_t end_keylen,
108                             fdb_iterator_opt_t opt)
109{
110    if (!handle) {
111        return FDB_RESULT_INVALID_HANDLE;
112    }
113
114    if (start_keylen > FDB_MAX_KEYLEN ||
115        (handle->kvs_config.custom_cmp &&
116           (start_keylen > handle->config.blocksize - HBTRIE_HEADROOM ||
117            end_keylen > handle->config.blocksize - HBTRIE_HEADROOM)) ||
118        end_keylen > FDB_MAX_KEYLEN) {
119        return FDB_RESULT_INVALID_ARGS;
120    }
121
122    if ((opt & FDB_ITR_SKIP_MIN_KEY && (!start_key || !start_keylen)) ||
123        (opt & FDB_ITR_SKIP_MAX_KEY && (!end_key || !end_keylen))) {
124        return FDB_RESULT_INVALID_ARGS;
125    }
126
127    hbtrie_result hr;
128    fdb_status fs;
129    LATENCY_STAT_START();
130
131    if (!handle->shandle) {
132        // If compaction is already done before this line,
133        // handle->file needs to be replaced with handle->new_file.
134        fdb_check_file_reopen(handle, NULL);
135        fdb_sync_db_header(handle);
136    }
137
138    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
139
140    if (!handle->shandle) {
141        // snapshot handle doesn't exist
142        // open a new handle to make the iterator handle as a snapshot
143        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
144        if (fs != FDB_RESULT_SUCCESS) {
145            fdb_log(&handle->log_callback, fs,
146                    "Failed to create an iterator instance due to the failure of "
147                    "open operation on the KV Store '%s' in a database file '%s'",
148                    _fdb_kvs_get_name(handle, handle->file),
149                    handle->file->filename);
150            return fs;
151        }
152        iterator->snapshot_handle = false;
153    } else {
154        // Snapshot handle exists
155        // We don't need to open a new handle.. just point to the snapshot handle.
156        iterator->handle = handle;
157        iterator->snapshot_handle = true;
158    }
159    iterator->opt = opt;
160
161    iterator->_key = (void*)malloc(FDB_MAX_KEYLEN_INTERNAL);
162    // set to zero the first <chunksize> bytes
163    memset(iterator->_key, 0x0, iterator->handle->config.chunksize);
164    iterator->_keylen = 0;
165    iterator->_offset = BLK_NOT_FOUND;
166    iterator->hbtrie_iterator = NULL;
167    iterator->seqtree_iterator = NULL;
168    iterator->seqtrie_iterator = NULL;
169
170    if (iterator->handle->kvs) {
171        // multi KV instance mode .. prepend KV ID
172        size_t size_chunk = handle->config.chunksize;
173        uint8_t *start_key_temp, *end_key_temp;
174
175        if (start_key == NULL) {
176            start_key_temp = alca(uint8_t, size_chunk);
177            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
178            start_key = start_key_temp;
179            start_keylen = size_chunk;
180        } else {
181            start_key_temp = alca(uint8_t, size_chunk + start_keylen);
182            kvid2buf(size_chunk, iterator->handle->kvs->id, start_key_temp);
183            memcpy(start_key_temp + size_chunk, start_key, start_keylen);
184            start_key = start_key_temp;
185            start_keylen += size_chunk;
186        }
187
188        if (end_key == NULL) {
189            // set end_key as NULL key of the next KV ID.
190            // NULL key doesn't actually exist so that the iterator ends
191            // at the last key of the current KV ID.
192            end_key_temp = alca(uint8_t, size_chunk);
193            kvid2buf(size_chunk, iterator->handle->kvs->id+1, end_key_temp);
194            end_key = end_key_temp;
195            end_keylen = size_chunk;
196        } else {
197            end_key_temp = alca(uint8_t, size_chunk + end_keylen);
198            kvid2buf(size_chunk, iterator->handle->kvs->id, end_key_temp);
199            memcpy(end_key_temp + size_chunk, end_key, end_keylen);
200            end_key = end_key_temp;
201            end_keylen += size_chunk;
202        }
203
204        iterator->start_key = (void*)malloc(start_keylen);
205        memcpy(iterator->start_key, start_key, start_keylen);
206        iterator->start_keylen = start_keylen;
207
208        iterator->end_key = (void*)malloc(end_keylen);
209        memcpy(iterator->end_key, end_key, end_keylen);
210        iterator->end_keylen = end_keylen;
211
212    } else { // single KV instance mode
213        if (start_key == NULL) {
214            iterator->start_key = NULL;
215            iterator->start_keylen = 0;
216        } else {
217            iterator->start_key = (void*)malloc(start_keylen);
218            memcpy(iterator->start_key, start_key, start_keylen);
219            iterator->start_keylen = start_keylen;
220        }
221
222        if (end_key == NULL) {
223            iterator->end_key = NULL;
224            end_keylen = 0;
225        }else{
226            iterator->end_key = (void*)malloc(end_keylen);
227            memcpy(iterator->end_key, end_key, end_keylen);
228        }
229        iterator->end_keylen = end_keylen;
230    }
231
232    // create an iterator handle for hb-trie
233    iterator->hbtrie_iterator = (struct hbtrie_iterator *)
234                                malloc(sizeof(struct hbtrie_iterator));
235    hr = hbtrie_iterator_init(iterator->handle->trie,
236                              iterator->hbtrie_iterator,
237                              (void *)start_key, start_keylen);
238    assert(hr == HBTRIE_RESULT_SUCCESS);
239
240    wal_itr_init(iterator->handle->file, iterator->handle->shandle, true,
241                 &iterator->wal_itr);
242
243    if (start_key) {
244        struct wal_item query;
245        struct wal_item_header query_key;
246        query.header = &query_key;
247        query_key.key = iterator->start_key;
248        query_key.keylen = iterator->start_keylen;
249        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
250                                                       &query);
251    } else {
252        iterator->tree_cursor = wal_itr_first(iterator->wal_itr);
253    }
254    // to know reverse iteration endpoint store the start cursor
255    if (iterator->tree_cursor) {
256        iterator->tree_cursor_start = iterator->tree_cursor;
257    }
258    iterator->tree_cursor_prev = iterator->tree_cursor;
259    iterator->direction = FDB_ITR_DIR_NONE;
260    iterator->status = FDB_ITR_IDX;
261    iterator->_dhandle = NULL; // populated at the first iterator movement
262
263    *ptr_iterator = iterator;
264
265    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
266    fdb_iterator_next(iterator); // position cursor at first key
267
268    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_INIT);
269
270    return FDB_RESULT_SUCCESS;
271}
272
273LIBFDB_API
274fdb_status fdb_iterator_sequence_init(fdb_kvs_handle *handle,
275                                      fdb_iterator **ptr_iterator,
276                                      const fdb_seqnum_t start_seq,
277                                      const fdb_seqnum_t end_seq,
278                                      fdb_iterator_opt_t opt)
279{
280    if (!handle) {
281        return FDB_RESULT_INVALID_HANDLE;
282    }
283
284    if (ptr_iterator == NULL || (end_seq && start_seq > end_seq)) {
285        return FDB_RESULT_INVALID_ARGS;
286    }
287
288    fdb_status fs;
289    fdb_seqnum_t _start_seq = _endian_encode(start_seq);
290    fdb_kvs_id_t _kv_id;
291    size_t size_id, size_seq;
292    uint8_t *start_seq_kv;
293    struct wal_item query;
294    struct wal_item_header query_key;
295    LATENCY_STAT_START();
296
297    query.header = &query_key;
298
299    // Sequence trees are a must for byseq operations
300    if (handle->config.seqtree_opt != FDB_SEQTREE_USE) {
301        return FDB_RESULT_INVALID_CONFIG;
302    }
303
304    if (!handle->shandle) {
305        // If compaction is already done before this line,
306        // handle->file needs to be replaced with handle->new_file.
307        fdb_check_file_reopen(handle, NULL);
308        fdb_sync_db_header(handle);
309    }
310
311    size_id = sizeof(fdb_kvs_id_t);
312    size_seq = sizeof(fdb_seqnum_t);
313    fdb_iterator *iterator = (fdb_iterator *)calloc(1, sizeof(fdb_iterator));
314
315    if (!handle->shandle) {
316        // snapshot handle doesn't exist
317        // open a new handle to make the iterator handle as a snapshot
318        fs = fdb_snapshot_open(handle, &iterator->handle, FDB_SNAPSHOT_INMEM);
319        if (fs != FDB_RESULT_SUCCESS) {
320            fdb_log(&handle->log_callback, fs,
321                    "Failed to create an sequence iterator instance due to the "
322                    "failure of "
323                    "open operation on the KV Store '%s' in a database file '%s'",
324                    _fdb_kvs_get_name(handle, handle->file),
325                    handle->file->filename);
326            return fs;
327        }
328        iterator->snapshot_handle = false;
329    } else {
330        // Snapshot handle exists
331        // We don't need to open a new handle.. just point to the snapshot handle.
332        iterator->handle = handle;
333        iterator->snapshot_handle = true;
334    }
335
336    iterator->hbtrie_iterator = NULL;
337    iterator->_key = NULL;
338    iterator->_keylen = 0;
339    iterator->opt = opt;
340    iterator->_offset = BLK_NOT_FOUND;
341    iterator->_seqnum = start_seq;
342
343    // For easy API call, treat zero seq as 0xffff...
344    // (because zero seq number is not used)
345    if (end_seq == 0) {
346        iterator->end_seqnum = SEQNUM_NOT_USED;
347    } else {
348        iterator->end_seqnum = end_seq;
349    }
350
351    iterator->start_seqnum = start_seq;
352
353    iterator->start_key = NULL;
354    iterator->end_key = NULL;
355
356    wal_itr_init(handle->file, iterator->handle->shandle, false,
357                 &iterator->wal_itr);
358
359    if (iterator->handle->kvs) {
360        int size_chunk = handle->config.chunksize;
361        // create an iterator handle for hb-trie
362        start_seq_kv = alca(uint8_t, size_chunk + size_seq);
363        _kv_id = _endian_encode(iterator->handle->kvs->id);
364        memcpy(start_seq_kv, &_kv_id, size_id);
365        memcpy(start_seq_kv + size_id, &_start_seq, size_seq);
366
367        iterator->seqtrie_iterator = (struct hbtrie_iterator *)
368                                     calloc(1, sizeof(struct hbtrie_iterator));
369        hbtrie_iterator_init(iterator->handle->seqtrie,
370                             iterator->seqtrie_iterator,
371                             start_seq_kv, size_id + size_seq);
372
373        query_key.key = start_seq_kv;
374        kvid2buf(size_chunk, iterator->handle->kvs->id, start_seq_kv);
375        memcpy(start_seq_kv + size_chunk, &start_seq, size_seq);
376        query_key.keylen = size_chunk + size_seq;
377        query.seqnum = start_seq;
378        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
379                                                       &query);
380    } else {
381        // create an iterator handle for b-tree
382        iterator->seqtree_iterator = (struct btree_iterator *)
383                                     calloc(1, sizeof(struct btree_iterator));
384        btree_iterator_init(iterator->handle->seqtree,
385                            iterator->seqtree_iterator,
386                            (void *)(start_seq ? &_start_seq : NULL));
387        query_key.key = (void*)NULL;
388        query_key.keylen = 0;
389        query.seqnum = start_seq;
390        iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
391                                                       &query);
392    }
393
394    // to know reverse iteration endpoint store the start cursor
395    if (iterator->tree_cursor) {
396        iterator->tree_cursor_start = iterator->tree_cursor;
397    }
398    iterator->tree_cursor_prev = iterator->tree_cursor;
399    iterator->direction = FDB_ITR_DIR_NONE;
400    iterator->status = FDB_ITR_IDX;
401    iterator->_dhandle = NULL; // populated at the first iterator movement
402
403    *ptr_iterator = iterator;
404
405    ++iterator->handle->num_iterators; // Increment the iterator counter of the KV handle
406    fdb_iterator_next(iterator); // position cursor at first key
407
408    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEQ_INIT);
409
410    return FDB_RESULT_SUCCESS;
411}
412
413static fdb_status _fdb_iterator_prev(fdb_iterator *iterator)
414{
415    int cmp;
416    void *key;
417    size_t keylen;
418    uint64_t offset;
419    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
420    struct docio_handle *dhandle;
421    struct wal_item *snap_item = NULL;
422
423    if (iterator->direction != FDB_ITR_REVERSE) {
424        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
425        if (iterator->tree_cursor) {
426            // just turn around
427            // WAL:   0  v  2->   4    (OLD state)
428            // TRIE:     1  2  3  4
429            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
430                                                  iterator->tree_cursor);
431            if (iterator->direction == FDB_ITR_FORWARD &&
432                iterator->status != FDB_ITR_WAL) {
433                iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
434            }
435            // WAL: <-0  v  2     4    (NEW state)
436            // TRIE:  0  1  2  3  4
437        } else if (iterator->tree_cursor_prev) { // gone past the end..
438            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
439                                             iterator->tree_cursor_prev);
440            iterator->status = FDB_ITR_IDX;
441        } // else Don't move - seek()/init() has already positioned cursor
442    }
443start:
444    key = iterator->_key;
445    dhandle = iterator->handle->dhandle;
446
447    // retrieve from hb-trie
448    if (iterator->_offset == BLK_NOT_FOUND) {
449        // no key waiting for being returned
450        // get next key from hb-trie (or idtree)
451        struct docio_object _doc;
452        // Move Main index Cursor backward...
453        int64_t _offset;
454        do {
455            hr = hbtrie_prev(iterator->hbtrie_iterator, key,
456                             &iterator->_keylen, (void*)&iterator->_offset);
457            btreeblk_end(iterator->handle->bhandle);
458            iterator->_offset = _endian_decode(iterator->_offset);
459            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
460                  hr != HBTRIE_RESULT_SUCCESS) {
461                break;
462            }
463            // deletion check
464            memset(&_doc, 0x0, sizeof(struct docio_object));
465            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset,
466                                              &_doc, true);
467            if (_offset <= 0) { // read fail
468                continue; // get prev doc
469            }
470            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
471                free(_doc.key);
472                free(_doc.meta);
473                continue; // get prev doc
474            }
475            free(_doc.key);
476            free(_doc.meta);
477            break;
478        } while (1);
479    }
480    keylen = iterator->_keylen;
481    offset = iterator->_offset;
482
483    if (hr != HBTRIE_RESULT_SUCCESS && !iterator->tree_cursor) {
484        return FDB_RESULT_ITERATOR_FAIL;
485    }
486
487    // Move the WAL cursor backward...
488    while (iterator->tree_cursor) {
489        if (iterator->status == FDB_ITR_WAL) {
490            iterator->tree_cursor_prev = iterator->tree_cursor;
491            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
492        }// else don't move - seek()/ init() has already positioned cursor
493
494        // get the current item of avl-tree
495        snap_item = iterator->tree_cursor;
496        if (!snap_item) {
497            if (hr == HBTRIE_RESULT_SUCCESS) {
498                break;
499            } else {
500                return FDB_RESULT_ITERATOR_FAIL;
501            }
502        }
503        if (hr == HBTRIE_RESULT_SUCCESS) {
504            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
505                               snap_item->header->keylen,
506                               key, keylen);
507        } else {
508            // no more docs in hb-trie
509            cmp = 1;
510        }
511
512        if (cmp >= 0) {
513            // key[WAL] >= key[hb-trie] .. take key[WAL] first
514            uint8_t drop_logical_deletes =
515                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
516                (iterator->opt & FDB_ITR_NO_DELETES);
517            iterator->status = FDB_ITR_WAL;
518            if (cmp > 0) {
519                if (snap_item->action == WAL_ACT_REMOVE ||
520                    drop_logical_deletes) {
521                    // this key is removed .. get prev key[WAL]
522                    continue;
523                }
524            } else { // same key found in WAL
525                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
526                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
527                    // the key is removed .. start over again
528                    goto start;
529                }
530            }
531
532            key = snap_item->header->key;
533            keylen = snap_item->header->keylen;
534            // key[hb-trie] is stashed in iterator->_key for future call
535            offset = snap_item->offset;
536        }
537        break;
538    }
539
540    if (offset == iterator->_offset) {
541        // take key[hb-trie] & and fetch the prev key[hb-trie] at next turn
542        iterator->_offset = BLK_NOT_FOUND;
543        iterator->status = FDB_ITR_IDX;
544    }
545
546    if (iterator->start_key) {
547        cmp = _fdb_key_cmp(iterator, iterator->start_key,
548                           iterator->start_keylen, key, keylen);
549
550        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
551            // current key (KEY) is lexicographically less than START_KEY
552            // OR it is the start key and user wishes to skip it..
553            // terminate the iteration
554            return FDB_RESULT_ITERATOR_FAIL;
555        }
556    }
557
558    if (iterator->end_key) {
559        cmp = _fdb_key_cmp(iterator, iterator->end_key,
560                           iterator->end_keylen, key, keylen);
561
562        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
563            // key is the end_key but users wishes to skip it, redo..
564            // OR current key (KEY) is lexicographically greater than END_KEY
565            goto start;
566        }
567    }
568
569    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
570    iterator->_get_offset = offset; // store for fdb_iterator_get()
571
572    return FDB_RESULT_SUCCESS;
573}
574
575static fdb_status _fdb_iterator_next(fdb_iterator *iterator)
576{
577    int cmp;
578    void *key;
579    size_t keylen;
580    uint64_t offset;
581    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
582    struct docio_handle *dhandle;
583    struct wal_item *snap_item = NULL;
584
585    if (iterator->direction != FDB_ITR_FORWARD) {
586        iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
587        // just turn around and face forward..
588        if (iterator->tree_cursor) {
589            // WAL: <-0  v  2     4    (OLD state)
590            // TRIE:     1  2  3  4
591            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
592                                                  iterator->tree_cursor);
593            if (iterator->direction == FDB_ITR_REVERSE &&
594                iterator->status != FDB_ITR_WAL) {
595                iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
596            }
597            // WAL:   0  v  2->   4    (NEW state)
598            // TRIE:  0  1  2  3  4
599        } else if (iterator->tree_cursor_prev) {
600            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
601                                             iterator->tree_cursor_prev);
602            iterator->status = FDB_ITR_IDX;
603        } // else Don't move - seek()/init() has already positioned cursor
604    }
605
606start:
607    key = iterator->_key;
608    dhandle = iterator->handle->dhandle;
609
610    // retrieve from hb-trie
611    if (iterator->_offset == BLK_NOT_FOUND) {
612        // no key waiting for being returned
613        // get next key from hb-trie (or idtree)
614        struct docio_object _doc;
615        // Move Main index Cursor forward...
616        int64_t _offset;
617        do {
618            hr = hbtrie_next(iterator->hbtrie_iterator, key,
619                             &iterator->_keylen, (void*)&iterator->_offset);
620            btreeblk_end(iterator->handle->bhandle);
621            iterator->_offset = _endian_decode(iterator->_offset);
622            if (!(iterator->opt & FDB_ITR_NO_DELETES) ||
623                  hr != HBTRIE_RESULT_SUCCESS) {
624                break;
625            }
626            // deletion check
627            memset(&_doc, 0x0, sizeof(struct docio_object));
628            _offset = docio_read_doc_key_meta(dhandle, iterator->_offset, &_doc,
629                                              true);
630            if (_offset <= 0) { // read fail
631                continue; // get next doc
632            }
633            if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
634                free(_doc.key);
635                free(_doc.meta);
636                continue; // get next doc
637            }
638            free(_doc.key);
639            free(_doc.meta);
640            break;
641        } while (1);
642    }
643
644    keylen = iterator->_keylen;
645    offset = iterator->_offset;
646
647    if (hr != HBTRIE_RESULT_SUCCESS && iterator->tree_cursor == NULL) {
648        return FDB_RESULT_ITERATOR_FAIL;
649    }
650
651    // Move WAL Cursor forward...
652    while (iterator->tree_cursor) {
653        if (iterator->status == FDB_ITR_WAL) {
654            iterator->tree_cursor_prev = iterator->tree_cursor;
655            iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
656        } // else Don't move - seek()/ init() has already positioned cursor
657        snap_item = iterator->tree_cursor;
658        if (!snap_item) {
659            if (hr == HBTRIE_RESULT_SUCCESS) {
660                break;
661            } else { // no more keys in WAL or main index
662                return FDB_RESULT_ITERATOR_FAIL;
663            }
664        }
665        // Compare key[WAL] with key[hb-trie]
666        if (hr == HBTRIE_RESULT_SUCCESS) {
667            cmp = _fdb_key_cmp(iterator, snap_item->header->key,
668                               snap_item->header->keylen,
669                               key, keylen);
670        } else {
671            // no more docs in hb-trie
672            cmp = -1;
673        }
674
675        if (cmp <= 0) {
676            // key[WAL] <= key[hb-trie] .. take key[WAL] first
677            uint8_t drop_logical_deletes =
678                (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
679                (iterator->opt & FDB_ITR_NO_DELETES);
680            iterator->status = FDB_ITR_WAL;
681            if (cmp < 0) {
682                if (snap_item->action == WAL_ACT_REMOVE ||
683                    drop_logical_deletes) {
684                    // this key is removed .. get next key[WAL]
685                    continue;
686                }
687            } else { // Same key from trie also found from WAL
688                iterator->_offset = BLK_NOT_FOUND; // drop key from trie
689                if (snap_item->action == WAL_ACT_REMOVE || drop_logical_deletes) {
690                    // the key is removed .. start over again
691                    goto start;
692                }
693            }
694            key = snap_item->header->key;
695            keylen = snap_item->header->keylen;
696            // key[hb-trie] is stashed in iterator->key for next call
697            offset = snap_item->offset;
698        }
699        break;
700    }
701
702    if (offset == iterator->_offset) {
703        // take key[hb-trie] & and fetch the next key[hb-trie] at next turn
704        iterator->_offset = BLK_NOT_FOUND;
705        iterator->status = FDB_ITR_IDX;
706    }
707
708    if (iterator->start_key) {
709        cmp = _fdb_key_cmp(iterator, iterator->start_key,
710                           iterator->start_keylen, key, keylen);
711
712        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) || cmp > 0) {
713            // If user wishes to skip start key, redo first step
714            // OR current key (KEY) is lexicographically smaller than START_KEY
715            goto start;
716        }
717    }
718
719    if (iterator->end_key) {
720        cmp = _fdb_key_cmp(iterator, iterator->end_key, iterator->end_keylen,
721                           key, keylen);
722        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) || cmp < 0) {
723            // current key (KEY) is lexicographically greater than END_KEY
724            // OR it is the end_key and user wishes to skip it
725            // terminate the iteration
726            return FDB_RESULT_ITERATOR_FAIL;
727        }
728    }
729
730    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
731    iterator->_get_offset = offset; // store for fdb_iterator_get()
732
733    return FDB_RESULT_SUCCESS;
734}
735
736static
737bool _validate_range_limits(fdb_iterator *iterator,
738                            void *ret_key,
739                            const size_t ret_keylen)
740{
741    int cmp;
742    if (iterator->end_key) {
743        cmp = _fdb_key_cmp(iterator, ret_key, ret_keylen,
744                iterator->end_key, iterator->end_keylen);
745        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) ||
746                cmp > 0) { // greater than end_key OR at skipped MAX_KEY
747            return false;
748        }
749    }
750    if (iterator->start_key) {
751        cmp = _fdb_key_cmp(iterator, ret_key, ret_keylen,
752                iterator->start_key, iterator->start_keylen);
753        if ((cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) ||
754                cmp < 0) { // smaller than start_key OR at skipped MIN_KEY
755            return false;
756        }
757    }
758    return true;
759}
760
761LIBFDB_API
762fdb_status fdb_iterator_seek(fdb_iterator *iterator,
763                             const void *seek_key,
764                             const size_t seek_keylen,
765                             const fdb_iterator_seek_opt_t seek_pref)
766{
767    if (!iterator || !iterator->handle) {
768        return FDB_RESULT_INVALID_HANDLE;
769    }
770
771    int cmp, cmp2; // intermediate results of comparison
772    int next_op = 0; // 0: none, -1: prev(), 1: next();
773    int size_chunk = iterator->handle->config.chunksize;
774    uint8_t *seek_key_kv;
775    int64_t _offset;
776    size_t seek_keylen_kv;
777    bool skip_wal = false, fetch_next = true, fetch_wal = true;
778    hbtrie_result hr = HBTRIE_RESULT_SUCCESS;
779    struct wal_item *snap_item = NULL, query;
780    struct wal_item_header query_header;
781    struct docio_object _doc;
782    fdb_status ret;
783    LATENCY_STAT_START();
784
785    iterator->_dhandle = NULL; // setup for get() to return FAIL
786
787    if (!seek_key || !iterator->_key ||
788        seek_keylen > FDB_MAX_KEYLEN ||
789        (iterator->handle->kvs_config.custom_cmp &&
790         seek_keylen > iterator->handle->config.blocksize - HBTRIE_HEADROOM)) {
791        return FDB_RESULT_INVALID_ARGS;
792    }
793
794    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
795        return FDB_RESULT_HANDLE_BUSY;
796    }
797
798    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
799                         std::memory_order_relaxed);
800
801    if (iterator->handle->kvs) {
802        seek_keylen_kv = seek_keylen + size_chunk;
803        seek_key_kv = alca(uint8_t, seek_keylen_kv);
804        kvid2buf(size_chunk, iterator->handle->kvs->id, seek_key_kv);
805        memcpy(seek_key_kv + size_chunk, seek_key, seek_keylen);
806    } else {
807        seek_keylen_kv = seek_keylen;
808        seek_key_kv = (uint8_t*)seek_key;
809    }
810
811    // disable seeking beyond the end key...
812    if (iterator->end_key) {
813        cmp = _fdb_key_cmp(iterator, (void *)iterator->end_key,
814                                    iterator->end_keylen,
815                                    (void *)seek_key_kv, seek_keylen_kv);
816        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
817            // seek the end key at this time,
818            // and call prev() next.
819            next_op = -1;
820        }
821        if (cmp < 0) {
822            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
823            return FDB_RESULT_ITERATOR_FAIL;
824        }
825    }
826
827    // disable seeking beyond the start key...
828    if (iterator->start_key) {
829        cmp = _fdb_key_cmp(iterator,
830                                  (void *)iterator->start_key,
831                                  iterator->start_keylen,
832                                  (void *)seek_key_kv, seek_keylen_kv);
833        if (cmp == 0 && iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
834            // seek the start key at this time,
835            // and call next() next.
836            next_op = 1;
837        }
838        if (cmp > 0) {
839            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
840            return FDB_RESULT_ITERATOR_FAIL;
841        }
842    }
843
844    iterator->direction = FDB_ITR_FORWARD;
845
846    // reset HB+trie's iterator
847    hbtrie_iterator_free(iterator->hbtrie_iterator);
848    hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
849                         seek_key_kv, seek_keylen_kv);
850
851fetch_hbtrie:
852    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
853        // fetch next key
854        hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
855                         &iterator->_keylen, (void*)&iterator->_offset);
856        btreeblk_end(iterator->handle->bhandle);
857
858        if (hr == HBTRIE_RESULT_SUCCESS) {
859            cmp = _fdb_key_cmp(iterator,
860                               iterator->_key, iterator->_keylen,
861                               seek_key_kv, seek_keylen_kv);
862            if (cmp < 0) {
863                // key[HB+trie] < seek_key .. move forward
864                hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
865                                 &iterator->_keylen, (void*)&iterator->_offset);
866                btreeblk_end(iterator->handle->bhandle);
867            }
868            iterator->_offset = _endian_decode(iterator->_offset);
869
870            while (iterator->opt & FDB_ITR_NO_DELETES &&
871                   hr == HBTRIE_RESULT_SUCCESS        &&
872                   fetch_next) {
873                fetch_next = false;
874                memset(&_doc, 0x0, sizeof(struct docio_object));
875                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
876                                                  iterator->_offset, &_doc,
877                                                  true);
878                if (_offset <= 0) { // read fail
879                    fetch_next = true; // get next
880                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
881                    free(_doc.key);
882                    free(_doc.meta);
883                    fetch_next = true; // get next
884                } else {
885                    free(_doc.key);
886                    free(_doc.meta);
887                }
888                if (fetch_next) {
889                    hr = hbtrie_next(iterator->hbtrie_iterator, iterator->_key,
890                                     &iterator->_keylen,
891                                     (void*)&iterator->_offset);
892                    btreeblk_end(iterator->handle->bhandle);
893                    iterator->_offset = _endian_decode(iterator->_offset);
894                }
895            }
896        }
897    } else {
898        // fetch prev key
899        hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
900                         &iterator->_keylen, (void*)&iterator->_offset);
901        btreeblk_end(iterator->handle->bhandle);
902        if (hr == HBTRIE_RESULT_SUCCESS) {
903            cmp = _fdb_key_cmp(iterator,
904                               iterator->_key, iterator->_keylen,
905                               seek_key_kv, seek_keylen_kv);
906            if (cmp > 0) {
907                // key[HB+trie] > seek_key .. move backward
908                hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
909                                 &iterator->_keylen, (void*)&iterator->_offset);
910                btreeblk_end(iterator->handle->bhandle);
911            }
912            iterator->_offset = _endian_decode(iterator->_offset);
913
914            while (iterator->opt & FDB_ITR_NO_DELETES &&
915                   hr == HBTRIE_RESULT_SUCCESS        &&
916                   fetch_next) {
917                fetch_next = false;
918                memset(&_doc, 0x0, sizeof(struct docio_object));
919                _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
920                                                  iterator->_offset, &_doc,
921                                                  true);
922                if (_offset <= 0) { // read fail
923                    fetch_next = true; // get prev
924                } else if (_doc.length.flag & DOCIO_DELETED) { // deleted doc
925                    free(_doc.key);
926                    free(_doc.meta);
927                    fetch_next = true; // get prev
928                } else {
929                    free(_doc.key);
930                    free(_doc.meta);
931                }
932                if (fetch_next) {
933                    hr = hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
934                                     &iterator->_keylen,
935                                     (void*)&iterator->_offset);
936                    btreeblk_end(iterator->handle->bhandle);
937                    iterator->_offset = _endian_decode(iterator->_offset);
938                }
939            }
940        }
941    }
942
943    if (hr == HBTRIE_RESULT_SUCCESS && // Validate iteration range limits..
944        !next_op) { // only if caller is not seek_to_max/min (handled later)
945        if (!_validate_range_limits(iterator, iterator->_key, iterator->_keylen)) {
946            hr = HBTRIE_RESULT_FAIL;
947        }
948    }
949
950    if (iterator->handle->kvs) {
951        fdb_kvs_id_t kv_id;
952        buf2kvid(size_chunk, iterator->_key, &kv_id);
953        if (iterator->handle->kvs->id != kv_id) {
954            // seek is done beyond the KV ID
955            hr = HBTRIE_RESULT_FAIL;
956        }
957    }
958    if (hr == HBTRIE_RESULT_SUCCESS) {
959        iterator->_get_offset = iterator->_offset;
960        iterator->_dhandle = iterator->handle->dhandle;
961    } else {
962        // larger than the largest key or smaller than the smallest key
963        iterator->_get_offset = BLK_NOT_FOUND;
964        iterator->_dhandle = NULL;
965    }
966
967    // HB+trie's iterator should fetch another entry next time
968    iterator->_offset = BLK_NOT_FOUND;
969    iterator->status = FDB_ITR_IDX;
970
971    // retrieve avl-tree
972    query.header = &query_header;
973    query_header.key = seek_key_kv;
974    query_header.keylen = seek_keylen_kv;
975
976    if (seek_pref == FDB_ITR_SEEK_HIGHER) {
977        if (fetch_wal) {
978            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
979                                                           &query);
980            iterator->direction = FDB_ITR_FORWARD;
981        }
982        if (iterator->tree_cursor) {
983            // skip deleted WAL entry
984            do {
985                if (!next_op && // only validate range if not skip max/min key mode
986                    !_validate_range_limits(iterator,
987                                            iterator->tree_cursor->header->key,
988                                            iterator->tree_cursor->header->keylen)) {
989                    iterator->tree_cursor = NULL;
990                    break;
991                }
992                snap_item = iterator->tree_cursor;
993                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
994                    iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
995                    snap_item->action == WAL_ACT_REMOVE) { // immediate purge
996                    if (iterator->_dhandle) {
997                        cmp = _fdb_key_cmp(iterator,
998                                           snap_item->header->key,
999                                           snap_item->header->keylen,
1000                                           iterator->_key, iterator->_keylen);
1001                        if (cmp == 0) {
1002                            // same doc exists in HB+trie
1003                            // move tree cursor
1004                            iterator->tree_cursor = wal_itr_next(
1005                                                     iterator->wal_itr);
1006                            // do not move tree cursor next time
1007                            fetch_wal = false;
1008                            // fetch next key[HB+trie]
1009                            goto fetch_hbtrie;
1010                        } else if (cmp > 0) {
1011                            break;
1012                        }
1013                    }
1014                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
1015                    continue;
1016                } else if (iterator->end_key &&
1017                           iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
1018                    cmp = _fdb_key_cmp(iterator,
1019                                       iterator->end_key, iterator->end_keylen,
1020                                       snap_item->header->key,
1021                                       snap_item->header->keylen);
1022                    if (cmp == 0 ||
1023                        // WAL cursor is positioned exactly at seeked end key
1024                        // but iterator must skip the end key!
1025                        // If hb+trie has an item, use that else return FAIL
1026                        (cmp < 0 &&// WAL key out of range...
1027                        !next_op)) { // Not called from fdb_iterator_seek_to_max
1028                        skip_wal = true;
1029                        iterator->status = FDB_ITR_WAL;
1030                    }
1031                }
1032                break;
1033            } while(iterator->tree_cursor);
1034        }
1035        iterator->tree_cursor_prev = iterator->tree_cursor;
1036        if (!iterator->tree_cursor) {
1037            // seek_key is larger than the largest key
1038            // set prev key to the largest key.
1039            // if prev operation is called next, tree_cursor will be set to
1040            // tree_cursor_prev.
1041            iterator->tree_cursor_prev = wal_itr_search_smaller(iterator->wal_itr,
1042                                                                &query);
1043            skip_wal = true;
1044        }
1045    } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1046        if (fetch_wal) {
1047            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1048                                                           &query);
1049            iterator->direction = FDB_ITR_REVERSE;
1050        }
1051        if (iterator->tree_cursor) {
1052            // skip deleted WAL entry
1053            do {
1054                if (!next_op && // only validate range if not skip max/min key mode
1055                    !_validate_range_limits(iterator,
1056                                            iterator->tree_cursor->header->key,
1057                                            iterator->tree_cursor->header->keylen)) {
1058                    iterator->tree_cursor = NULL;
1059                    break;
1060                }
1061                snap_item = iterator->tree_cursor;
1062                if ((snap_item->action == WAL_ACT_LOGICAL_REMOVE && // skip
1063                     iterator->opt & FDB_ITR_NO_DELETES) || //logical delete OR
1064                     snap_item->action == WAL_ACT_REMOVE) { //immediate purge
1065                    if (iterator->_dhandle) {
1066                        cmp = _fdb_key_cmp(iterator,
1067                                           snap_item->header->key,
1068                                           snap_item->header->keylen,
1069                                           iterator->_key, iterator->_keylen);
1070                        if (cmp == 0) {
1071                            // same doc exists in HB+trie
1072                            // move tree cursor
1073                            iterator->tree_cursor = wal_itr_prev(iterator->
1074                                                                 wal_itr);
1075                            // do not move tree cursor next time
1076                            fetch_wal = false;
1077                            // fetch next key[HB+trie]
1078                            goto fetch_hbtrie;
1079                        } else if (cmp < 0) {
1080                            break;
1081                        }
1082                    }
1083                    iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1084                    continue;
1085                } else if (iterator->start_key &&
1086                           iterator->opt & FDB_ITR_SKIP_MIN_KEY) {
1087                    cmp = _fdb_key_cmp(iterator,
1088                                  snap_item->header->key,
1089                                  snap_item->header->keylen,
1090                                  iterator->start_key, iterator->start_keylen);
1091                    if (cmp == 0 ||
1092                        // WAL cursor is positioned exactly at seeked start key
1093                        // but iterator must skip the start key!
1094                        // If hb+trie has an item, use that else return FAIL
1095                        (cmp < 0 && // WAL key out of range and
1096                        !next_op)) { // Not called from fdb_iterator_seek_to_min
1097                        skip_wal = true;
1098                        iterator->status = FDB_ITR_WAL;
1099                    }
1100                }
1101                break;
1102            } while(iterator->tree_cursor);
1103        }
1104        iterator->tree_cursor_prev = iterator->tree_cursor;
1105        if (!iterator->tree_cursor) {
1106            // seek_key is smaller than the smallest key
1107            // Only allow fdb_iterator_next() call, fdb_iterator_prev() should
1108            // hit failure. To ensure this set the direction to as if
1109            // fdb_iterator_prev call has gone past the smallest key...
1110            iterator->tree_cursor_prev = wal_itr_search_greater(iterator->wal_itr,
1111                                                                &query);
1112            // since the current key[WAL] is smaller than seek_key,
1113            // skip key[WAL] this time
1114            skip_wal = true;
1115        }
1116    }
1117
1118    if (iterator->tree_cursor && !skip_wal) {
1119        bool take_wal = false;
1120        bool discard_hbtrie = false;
1121
1122        snap_item = iterator->tree_cursor;
1123
1124        if (hr == HBTRIE_RESULT_SUCCESS) {
1125            cmp = _fdb_key_cmp(iterator,
1126                               snap_item->header->key, snap_item->header->keylen,
1127                               iterator->_key, iterator->_keylen);
1128
1129            if (cmp == 0) {
1130                // same key exists in both HB+trie and WAL
1131                fdb_assert(snap_item->action != WAL_ACT_REMOVE,
1132                           snap_item->action, iterator->_offset);
1133                take_wal = true;
1134                discard_hbtrie = true;
1135            } else if (cmp < 0) { // key[WAL] < key[HB+trie]
1136                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1137                    // higher mode .. take smaller one (key[WAL]) first
1138                    take_wal = true;
1139                    discard_hbtrie = false;
1140                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1141                    take_wal = false;
1142                    discard_hbtrie = false;
1143                    // In seek_to_max call with skip_max_key option,
1144                    if (next_op < 0) {
1145                        // if key[HB+trie] is the largest key
1146                        // smaller than max key,
1147                        // do not call prev() next.
1148                        if (iterator->end_key) {
1149                            cmp2 = _fdb_key_cmp(iterator,
1150                                                iterator->_key,
1151                                                iterator->_keylen,
1152                                                iterator->end_key,
1153                                                iterator->end_keylen);
1154                        } else {
1155                            cmp2 = -1;
1156                        }
1157                        if (cmp2 < 0) {
1158                            next_op = 0;
1159                        }
1160                    }
1161                }
1162            } else { // key[HB+trie] < key[WAL]
1163                if (seek_pref == FDB_ITR_SEEK_HIGHER) {
1164                    // higher mode .. take smaller one (key[HB+trie]) first
1165                    take_wal = false;
1166                    discard_hbtrie = false;
1167                    // In seek_to_min call with skip_min_key option,
1168                    if (next_op > 0) {
1169                        // if key[HB+trie] is the smallest key
1170                        // larger than min key,
1171                        // do not call next() next.
1172                        if (iterator->start_key) {
1173                            cmp2 = _fdb_key_cmp(iterator,
1174                                                iterator->start_key,
1175                                                iterator->start_keylen,
1176                                                iterator->_key,
1177                                                iterator->_keylen);
1178                        } else {
1179                            cmp2 = -1;
1180                        }
1181                        if (cmp2 < 0) {
1182                            next_op = 0;
1183                        }
1184                    }
1185                } else if (seek_pref == FDB_ITR_SEEK_LOWER) {
1186                    // lower mode .. discard smaller one (key[HB+trie])
1187                    take_wal = true;
1188                    discard_hbtrie = true;
1189                    // reset HB+trie's iterator to get the current
1190                    // key[HB+trie] one more time
1191                    hbtrie_iterator_free(iterator->hbtrie_iterator);
1192                    hbtrie_iterator_init(iterator->handle->trie,
1193                                         iterator->hbtrie_iterator,
1194                                         seek_key_kv, seek_keylen_kv);
1195                    iterator->_offset = BLK_NOT_FOUND;
1196                }
1197            }
1198        } else {
1199            // HB+trie seek fail (key[HB+trie] doesn't exist)
1200            take_wal = true;
1201            discard_hbtrie = true;
1202        }
1203
1204        if (take_wal) { // take key[WAL]
1205            if (!discard_hbtrie) { // do not skip the current key[HB+trie]
1206                // key[HB+trie] will be returned next time
1207                iterator->_offset = iterator->_get_offset;
1208            }
1209            iterator->_get_offset = snap_item->offset;
1210            iterator->_dhandle = iterator->handle->dhandle;
1211            iterator->status = FDB_ITR_WAL;
1212        }
1213    }
1214
1215    if (!iterator->_dhandle) {
1216        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1217        return FDB_RESULT_ITERATOR_FAIL;
1218    }
1219
1220    if (next_op < 0) {
1221        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1222        ret = fdb_iterator_prev(iterator);
1223    } else if (next_op > 0) {
1224        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1225        ret = fdb_iterator_next(iterator);
1226    } else {
1227        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1228        ret = FDB_RESULT_SUCCESS;
1229    }
1230
1231    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK);
1232    return ret;
1233}
1234
1235LIBFDB_API
1236fdb_status fdb_iterator_seek_to_min(fdb_iterator *iterator)
1237{
1238    if (!iterator || !iterator->handle) {
1239        return FDB_RESULT_INVALID_HANDLE;
1240    }
1241
1242    if (!iterator->_key) {
1243        return FDB_RESULT_INVALID_ARGS;
1244    }
1245
1246    size_t size_chunk = iterator->handle->config.chunksize;
1247    fdb_status ret;
1248    LATENCY_STAT_START();
1249
1250    // Initialize direction iteration to FORWARD just in case this function was
1251    // called right after fdb_iterator_init() so the cursor gets positioned
1252    // correctly
1253    iterator->direction = FDB_ITR_FORWARD;
1254    if (iterator->start_keylen > size_chunk) {
1255        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MIN_KEY) ?
1256                                      FDB_ITR_SEEK_HIGHER : FDB_ITR_SEEK_LOWER;
1257        fdb_status status = fdb_iterator_seek(iterator,
1258                (uint8_t *)iterator->start_key + size_chunk,
1259                iterator->start_keylen - size_chunk, dir);
1260        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_LOWER) {
1261            dir = FDB_ITR_SEEK_HIGHER;
1262            // It is possible that the min key specified during init does not
1263            // exist, so retry the seek with the HIGHER key
1264            return fdb_iterator_seek(iterator,
1265                (uint8_t *)iterator->start_key + size_chunk,
1266                iterator->start_keylen - size_chunk, dir);
1267        }
1268        return status;
1269    }
1270
1271    // reset HB+trie iterator using start key
1272    hbtrie_iterator_free(iterator->hbtrie_iterator);
1273    hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1274                         iterator->start_key, iterator->start_keylen);
1275
1276    // reset WAL tree cursor using search because of the sharded nature of WAL
1277    if (iterator->tree_cursor_start) {
1278        iterator->tree_cursor_prev = iterator->tree_cursor =
1279                                     wal_itr_search_greater(iterator->wal_itr,
1280                                     iterator->tree_cursor_start);
1281        iterator->status = FDB_ITR_IDX; // WAL is already set
1282    }
1283
1284    ret = fdb_iterator_next(iterator);
1285    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MIN);
1286    return ret;
1287}
1288
1289fdb_status _fdb_iterator_seek_to_max_key(fdb_iterator *iterator) {
1290    int cmp;
1291
1292    if (!iterator || !iterator->handle) {
1293        return FDB_RESULT_INVALID_HANDLE;
1294    }
1295
1296    if (!iterator->_key) {
1297        return FDB_RESULT_INVALID_ARGS;
1298    }
1299
1300    size_t size_chunk = iterator->handle->config.chunksize;
1301
1302    // Initialize direction iteration to FORWARD just in case this function was
1303    // called right after fdb_iterator_init() so the cursor gets positioned
1304    // correctly
1305    iterator->direction = FDB_ITR_FORWARD;
1306    if (iterator->end_keylen > size_chunk) {
1307        fdb_iterator_seek_opt_t dir = (iterator->opt & FDB_ITR_SKIP_MAX_KEY) ?
1308                                      FDB_ITR_SEEK_LOWER : FDB_ITR_SEEK_HIGHER;
1309        fdb_status status = fdb_iterator_seek(iterator,
1310                (uint8_t *)iterator->end_key + size_chunk,
1311                iterator->end_keylen - size_chunk, dir);
1312
1313        if (status != FDB_RESULT_SUCCESS && dir == FDB_ITR_SEEK_HIGHER) {
1314            dir = FDB_ITR_SEEK_LOWER;
1315            // It is possible that the max key specified during init does not
1316            // exist, so retry the seek with the LOWER key
1317            return fdb_iterator_seek(iterator,
1318                    (uint8_t *)iterator->end_key + size_chunk,
1319                    iterator->end_keylen - size_chunk, dir);
1320        }
1321        return status;
1322    }
1323    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1324
1325    if (iterator->end_key && iterator->end_keylen == size_chunk) {
1326        // end_key exists but end_keylen == size_id
1327        // it means that user doesn't assign end_key but
1328        // end_key is automatically assigned due to multi KVS mode.
1329
1330        // reset HB+trie's iterator using end_key
1331        hbtrie_iterator_free(iterator->hbtrie_iterator);
1332        hbtrie_iterator_init(iterator->handle->trie, iterator->hbtrie_iterator,
1333                             iterator->end_key, iterator->end_keylen);
1334        // get first key
1335        hbtrie_prev(iterator->hbtrie_iterator, iterator->_key,
1336                         &iterator->_keylen, (void*)&iterator->_offset);
1337        iterator->_offset = _endian_decode(iterator->_offset);
1338        cmp = _fdb_key_cmp(iterator,
1339                               iterator->end_key, iterator->end_keylen,
1340                               iterator->_key, iterator->_keylen);
1341        if (cmp < 0) {
1342            // returned key is larger than the end key .. skip
1343            iterator->_offset = BLK_NOT_FOUND;
1344        }
1345    } else {
1346        // move HB+trie iterator's cursor to the last entry
1347        hbtrie_last(iterator->hbtrie_iterator);
1348    }
1349
1350    // also move WAL tree's cursor to the last entry
1351    struct wal_item_header hdr;
1352    struct wal_item query;
1353    query.header = &hdr;
1354    hdr.key = iterator->end_key;
1355    hdr.keylen = iterator->end_keylen;
1356    iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1357                                                   &query);
1358    iterator->tree_cursor_prev = iterator->tree_cursor;
1359    iterator->status = FDB_ITR_IDX;
1360
1361    return fdb_iterator_prev(iterator);
1362}
1363
1364fdb_status _fdb_iterator_seek_to_max_seq(fdb_iterator *iterator) {
1365    if (!iterator || !iterator->handle) {
1366        return FDB_RESULT_INVALID_HANDLE;
1367    }
1368
1369    iterator->direction = FDB_ITR_REVERSE; // only reverse iteration possible
1370    iterator->_seqnum = iterator->end_seqnum;
1371
1372    if (iterator->handle->kvs) {
1373        // create an iterator handle for hb-trie
1374        uint8_t *end_seq_kv = alca(uint8_t, sizeof(size_t)*2);
1375        fdb_kvs_id_t _kv_id = _endian_encode(iterator->handle->kvs->id);
1376        memcpy(end_seq_kv, &_kv_id, sizeof(size_t));
1377        memcpy(end_seq_kv + sizeof(size_t), &iterator->end_seqnum,
1378                sizeof(size_t));
1379
1380        // reset HB+trie's seqtrie iterator using end_seq_kv
1381        hbtrie_iterator_free(iterator->seqtrie_iterator);
1382        hbtrie_iterator_init(iterator->handle->seqtrie,
1383                             iterator->seqtrie_iterator,
1384                             end_seq_kv, sizeof(size_t)*2);
1385    } else {
1386        // reset Btree iterator to end_seqnum
1387        btree_iterator_free(iterator->seqtree_iterator);
1388        // create an iterator handle for b-tree
1389        btree_iterator_init(iterator->handle->seqtree,
1390                            iterator->seqtree_iterator,
1391                            (void *)(&iterator->end_seqnum));
1392    }
1393
1394    if (iterator->end_seqnum != SEQNUM_NOT_USED) {
1395        struct wal_item query;
1396        struct wal_item_header query_key;
1397        size_t size_seq = sizeof(fdb_seqnum_t);
1398        size_t size_chunk = iterator->handle->config.chunksize;
1399        uint8_t *end_seq_kv = alca(uint8_t, size_chunk + size_seq);
1400        if (iterator->handle->kvs) {
1401            query_key.key = end_seq_kv;
1402            kvid2buf(size_chunk, iterator->handle->kvs->id, end_seq_kv);
1403            memcpy(end_seq_kv + size_chunk, &iterator->end_seqnum, size_seq);
1404            query_key.keylen = size_chunk + size_seq;
1405        } else {
1406            query_key.key = (void *) NULL;
1407            query_key.keylen = 0;
1408        }
1409        query.header = &query_key;
1410        query.seqnum = iterator->end_seqnum;
1411
1412        // reset WAL tree cursor using search because of the sharded WAL
1413        iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1414                                                       &query);
1415    } else { // no end_seqnum specified, just head to the last entry
1416        iterator->tree_cursor = wal_itr_last(iterator->wal_itr);
1417    }
1418
1419    if (iterator->tree_cursor) {
1420        struct wal_item *snap_item = iterator->tree_cursor;
1421        if (snap_item->seqnum == iterator->end_seqnum &&
1422            iterator->opt & FDB_ITR_SKIP_MAX_KEY) {
1423            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1424        }
1425    }
1426
1427    if (iterator->tree_cursor) {
1428        // If WAL tree has an entry, skip Main index for reverse iteration..
1429        iterator->_offset = iterator->tree_cursor->offset;
1430    } else {
1431        iterator->_offset = BLK_NOT_FOUND; // fetch from main index
1432    }
1433
1434    iterator->tree_cursor_prev = iterator->tree_cursor;
1435    return fdb_iterator_prev(iterator);
1436}
1437
1438LIBFDB_API
1439fdb_status fdb_iterator_seek_to_max(fdb_iterator *iterator)
1440{
1441    if (!iterator || !iterator->handle) {
1442        return FDB_RESULT_INVALID_HANDLE;
1443    }
1444
1445    fdb_status ret;
1446    LATENCY_STAT_START();
1447
1448    if (!iterator->hbtrie_iterator) {
1449        ret = _fdb_iterator_seek_to_max_seq(iterator);
1450    } else {
1451        ret = _fdb_iterator_seek_to_max_key(iterator);
1452    }
1453    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_SEEK_MAX);
1454    return ret;
1455}
1456
1457static fdb_status _fdb_iterator_seq_prev(fdb_iterator *iterator)
1458{
1459    size_t size_id, size_seq, seq_kv_len;
1460    uint8_t *seq_kv;
1461    uint64_t offset = BLK_NOT_FOUND;
1462    btree_result br = BTREE_RESULT_FAIL;
1463    hbtrie_result hr;
1464    struct docio_object _doc;
1465    struct docio_handle *dhandle;
1466    struct wal_item *snap_item = NULL;
1467    fdb_seqnum_t seqnum;
1468    fdb_kvs_id_t kv_id;
1469
1470    size_id = sizeof(fdb_kvs_id_t);
1471    size_seq = sizeof(fdb_seqnum_t);
1472    seq_kv = alca(uint8_t, size_id + size_seq);
1473
1474    if (iterator->direction != FDB_ITR_REVERSE) {
1475        if (iterator->status == FDB_ITR_IDX) {
1476            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1477        }
1478        // re-position WAL key to previous key returned using search because of
1479        // sharded nature of wal (we cannot directly assign prev to cursor)
1480        if (iterator->tree_cursor_prev &&
1481            iterator->tree_cursor != iterator->tree_cursor_prev) {
1482            iterator->tree_cursor = wal_itr_search_smaller(iterator->wal_itr,
1483                                                  iterator->tree_cursor_prev);
1484            iterator->status = FDB_ITR_IDX;
1485        } // else Don't move - seek()/init() has already positioned cursor
1486    }
1487
1488start_seq:
1489    seqnum = iterator->_seqnum;
1490    dhandle = iterator->handle->dhandle;
1491
1492    if (iterator->_offset == BLK_NOT_FOUND || // was iterating over btree
1493        !iterator->tree_cursor) { // WAL exhausted
1494        if (iterator->handle->kvs) { // multi KV instance mode
1495            hr = hbtrie_prev(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1496                             (void *)&offset);
1497            if (hr == HBTRIE_RESULT_SUCCESS) {
1498                br = BTREE_RESULT_SUCCESS;
1499                buf2kvid(size_id, seq_kv, &kv_id);
1500                if (kv_id != iterator->handle->kvs->id) {
1501                    // iterator is beyond the boundary
1502                    br = BTREE_RESULT_FAIL;
1503                }
1504                memcpy(&seqnum, seq_kv + size_id, size_seq);
1505            } else {
1506                br = BTREE_RESULT_FAIL;
1507            }
1508        } else {
1509            br = btree_prev(iterator->seqtree_iterator, &seqnum,
1510                            (void *)&offset);
1511        }
1512        btreeblk_end(iterator->handle->bhandle);
1513        if (br == BTREE_RESULT_SUCCESS) {
1514            seqnum = _endian_decode(seqnum);
1515            iterator->_seqnum = seqnum;
1516            if (seqnum < iterator->start_seqnum) {
1517                return FDB_RESULT_ITERATOR_FAIL;
1518            }
1519            offset = _endian_decode(offset);
1520            iterator->status = FDB_ITR_IDX;
1521        } else {
1522            iterator->_offset = BLK_NOT_FOUND;
1523            // B-tree has no more items
1524            return FDB_RESULT_ITERATOR_FAIL;
1525        }
1526    } else while (iterator->tree_cursor) {
1527        if (iterator->status == FDB_ITR_WAL) {
1528            iterator->tree_cursor_prev = iterator->tree_cursor;
1529            iterator->tree_cursor = wal_itr_prev(iterator->wal_itr);
1530            if (!iterator->tree_cursor) {
1531                goto start_seq;
1532            }
1533        }// else don't move - seek()/ init() has already positioned cursor
1534
1535        iterator->status = FDB_ITR_WAL;
1536        // get the current item of avl tree
1537        snap_item = iterator->tree_cursor;
1538        uint8_t drop_logical_deletes =
1539            (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1540            (iterator->opt & FDB_ITR_NO_DELETES);
1541        if (snap_item->action == WAL_ACT_REMOVE ||
1542                drop_logical_deletes) {
1543            if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1544                return FDB_RESULT_ITERATOR_FAIL;
1545            }
1546            // this key is removed .. get prev key[WAL]
1547            continue;
1548        }
1549
1550        offset = snap_item->offset;
1551        iterator->_offset = offset; // WAL is not exhausted, ignore B-Tree
1552        iterator->_seqnum = snap_item->seqnum;
1553        break;
1554    }
1555
1556    // To prevent returning duplicate items from sequence iterator, only return
1557    // those b-tree items that exist in HB-trie but not WAL
1558    // (WAL items should have already been returned in reverse iteration)
1559    if (br == BTREE_RESULT_SUCCESS) {
1560        fdb_doc doc_kv;
1561        _doc.key = NULL;
1562        _doc.length.keylen = 0;
1563        _doc.meta = NULL;
1564        _doc.body = NULL;
1565
1566        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1567                                                  true);
1568        if (_offset <= 0) {
1569            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1570        }
1571        if (_doc.length.flag & DOCIO_DELETED &&
1572            (iterator->opt & FDB_ITR_NO_DELETES)) {
1573            free(_doc.key);
1574            free(_doc.meta);
1575            return FDB_RESULT_KEY_NOT_FOUND;
1576        }
1577
1578        doc_kv.key = _doc.key;
1579        doc_kv.keylen = _doc.length.keylen;
1580        doc_kv.seqnum = SEQNUM_NOT_USED;
1581        if (wal_find(iterator->handle->shandle->snap_txn,
1582                     iterator->handle->file,
1583                     &iterator->handle->shandle->cmp_info,
1584                     iterator->handle->shandle,
1585                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1586                     iterator->start_seqnum <= doc_kv.seqnum &&
1587                     doc_kv.seqnum <= iterator->end_seqnum) {
1588            free(_doc.key);
1589            free(_doc.meta);
1590            goto start_seq; // B-tree item exists in WAL, skip for now
1591        }
1592        // Also look in HB-Trie to eliminate duplicates
1593        uint64_t hboffset;
1594        struct docio_object _hbdoc;
1595        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1596                         (void *)&hboffset);
1597        btreeblk_end(iterator->handle->bhandle);
1598
1599        if (hr != HBTRIE_RESULT_SUCCESS) {
1600            free(_doc.key);
1601            free(_doc.meta);
1602            goto start_seq;
1603        } else { // If present in HB-trie ensure it's seqnum is in range
1604            int64_t _offset;
1605            _hbdoc.key = _doc.key;
1606            _hbdoc.meta = NULL;
1607            hboffset = _endian_decode(hboffset);
1608            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1609                                              hboffset, &_hbdoc, true);
1610            if (_offset <= 0) {
1611                free(_doc.key);
1612                free(_doc.meta);
1613                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1614            }
1615
1616            if (_doc.seqnum < _hbdoc.seqnum &&
1617                _hbdoc.seqnum <= iterator->end_seqnum) {
1618                free(_doc.key);
1619                free(_doc.meta);
1620                free(_hbdoc.meta);
1621                goto start_seq;
1622            }
1623            free(_hbdoc.meta);
1624        }
1625        free(_doc.key);
1626        free(_doc.meta);
1627    }
1628
1629    iterator->_dhandle = dhandle; // store for fdb_iterator_get()
1630    iterator->_get_offset = offset; // store for fdb_iterator_get()
1631
1632    return FDB_RESULT_SUCCESS;
1633}
1634
1635static fdb_status _fdb_iterator_seq_next(fdb_iterator *iterator)
1636{
1637    size_t size_id, size_seq, seq_kv_len;
1638    uint8_t *seq_kv;
1639    uint64_t offset = BLK_NOT_FOUND;
1640    btree_result br = BTREE_RESULT_FAIL;
1641    hbtrie_result hr;
1642    struct docio_object _doc;
1643    struct docio_handle *dhandle;
1644    struct wal_item *snap_item = NULL;
1645    fdb_seqnum_t seqnum;
1646    fdb_kvs_id_t kv_id;
1647
1648    size_id = sizeof(fdb_kvs_id_t);
1649    size_seq = sizeof(fdb_seqnum_t);
1650    seq_kv = alca(uint8_t, size_id + size_seq);
1651
1652    if (iterator->direction != FDB_ITR_FORWARD) {
1653        if (iterator->status == FDB_ITR_IDX) {
1654            iterator->_offset = BLK_NOT_FOUND; // need to re-examine Trie/trees
1655        }
1656        // re-position WAL key to previous key returned
1657        if (iterator->tree_cursor_prev) {
1658            iterator->tree_cursor = wal_itr_search_greater(iterator->wal_itr,
1659                                    iterator->tree_cursor_prev);
1660            iterator->status = FDB_ITR_IDX;
1661        } // else Don't move - seek()/init() has already positioned cursor
1662    }
1663
1664start_seq:
1665    seqnum = iterator->_seqnum;
1666    dhandle = iterator->handle->dhandle;
1667
1668    // retrieve from sequence b-tree first
1669    if (iterator->_offset == BLK_NOT_FOUND) {
1670        if (iterator->handle->kvs) { // multi KV instance mode
1671            hr = hbtrie_next(iterator->seqtrie_iterator, seq_kv, &seq_kv_len,
1672                             (void *)&offset);
1673            if (hr == HBTRIE_RESULT_SUCCESS) {
1674                br = BTREE_RESULT_SUCCESS;
1675                buf2kvid(size_id, seq_kv, &kv_id);
1676                if (kv_id != iterator->handle->kvs->id) {
1677                    // iterator is beyond the boundary
1678                    br = BTREE_RESULT_FAIL;
1679                }
1680                memcpy(&seqnum, seq_kv + size_id, size_seq);
1681            } else {
1682                br = BTREE_RESULT_FAIL;
1683            }
1684        } else {
1685            br = btree_next(iterator->seqtree_iterator, &seqnum,
1686                            (void *)&offset);
1687        }
1688        btreeblk_end(iterator->handle->bhandle);
1689        if (br == BTREE_RESULT_SUCCESS) {
1690            seqnum = _endian_decode(seqnum);
1691            iterator->_seqnum = seqnum;
1692            if (seqnum > iterator->end_seqnum) {
1693                return FDB_RESULT_ITERATOR_FAIL;
1694            }
1695            offset = _endian_decode(offset);
1696            iterator->_offset = BLK_NOT_FOUND; // continue with B-tree
1697            iterator->status = FDB_ITR_IDX;
1698        }
1699    }
1700
1701    if (br == BTREE_RESULT_FAIL) {
1702        if (iterator->tree_cursor == NULL) {
1703            return FDB_RESULT_ITERATOR_FAIL;
1704        } else {
1705            while (iterator->tree_cursor) {
1706                if (iterator->status == FDB_ITR_WAL) {
1707                    // save the current point for direction change
1708                    iterator->tree_cursor_prev = iterator->tree_cursor;
1709                    iterator->tree_cursor = wal_itr_next(iterator->wal_itr);
1710                    if (!iterator->tree_cursor) {
1711                        return FDB_RESULT_ITERATOR_FAIL;
1712                    }
1713                }// else don't move - seek()/ init() already positioned cursor
1714                // get the current item of WAL tree
1715                iterator->status = FDB_ITR_WAL;
1716                snap_item = iterator->tree_cursor;
1717                uint8_t drop_logical_deletes =
1718                    (snap_item->action == WAL_ACT_LOGICAL_REMOVE) &&
1719                    (iterator->opt & FDB_ITR_NO_DELETES);
1720                if (snap_item->action == WAL_ACT_REMOVE ||
1721                    drop_logical_deletes) {
1722                    if (br == BTREE_RESULT_FAIL && !iterator->tree_cursor) {
1723                        return FDB_RESULT_ITERATOR_FAIL;
1724                    }
1725                    // this key is removed .. get next key[WAL]
1726                    continue;
1727                }
1728                if (snap_item->seqnum < iterator->_seqnum) {
1729                    // smaller than the current seqnum .. get next key[WAL]
1730                    continue;
1731                }
1732                if (snap_item->seqnum > iterator->end_seqnum) {
1733                    // out-of-range .. iterator terminates
1734                    return FDB_RESULT_ITERATOR_FAIL;
1735                }
1736
1737                offset = snap_item->offset;
1738                iterator->_offset = offset; // stops b-tree lookups. favor wal
1739                iterator->_seqnum = snap_item->seqnum;
1740                break;
1741            }
1742        }
1743    }
1744
1745    // To prevent returning duplicate items from sequence iterator, only return
1746    // those b-tree items that exist in HB-trie but not WAL (visit WAL later)
1747    if (br == BTREE_RESULT_SUCCESS) {
1748        _doc.key = NULL;
1749        _doc.length.keylen = 0;
1750        _doc.meta = NULL;
1751        _doc.body = NULL;
1752
1753        fdb_doc doc_kv;
1754        int64_t _offset = docio_read_doc_key_meta(dhandle, offset, &_doc,
1755                                                  true);
1756        if (_offset <= 0) {
1757            return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1758        }
1759        if (_doc.length.flag & DOCIO_DELETED && (iterator->opt & FDB_ITR_NO_DELETES)) {
1760            free(_doc.key);
1761            free(_doc.meta);
1762            return FDB_RESULT_KEY_NOT_FOUND;
1763        }
1764        doc_kv.key = _doc.key;
1765        doc_kv.keylen = _doc.length.keylen;
1766        doc_kv.seqnum = SEQNUM_NOT_USED; // search by key not seqnum
1767        if (wal_find(iterator->handle->shandle->snap_txn,
1768                    iterator->handle->file,
1769                    &iterator->handle->shandle->cmp_info,
1770                    iterator->handle->shandle,
1771                     &doc_kv, (uint64_t *) &_offset) == FDB_RESULT_SUCCESS &&
1772                iterator->start_seqnum <= doc_kv.seqnum &&
1773                doc_kv.seqnum <= iterator->end_seqnum) {
1774            free(_doc.key);
1775            free(_doc.meta);
1776            goto start_seq; // B-tree item exists in WAL, skip for now
1777        }
1778        // Also look in HB-Trie to eliminate duplicates
1779        uint64_t hboffset;
1780        struct docio_object _hbdoc;
1781        hr = hbtrie_find(iterator->handle->trie, _doc.key, _doc.length.keylen,
1782                         (void *)&hboffset);
1783        btreeblk_end(iterator->handle->bhandle);
1784
1785        if (hr != HBTRIE_RESULT_SUCCESS) {
1786            free(_doc.key);
1787            free(_doc.meta);
1788            goto start_seq;
1789        } else { // If present in HB-trie ensure it's seqnum is in range
1790            int64_t _offset;
1791            _hbdoc.key = _doc.key;
1792            _hbdoc.meta = NULL;
1793            hboffset = _endian_decode(hboffset);
1794            _offset = docio_read_doc_key_meta(iterator->handle->dhandle,
1795                                              hboffset, &_hbdoc,
1796                                              true);
1797            if (_offset <= 0) {
1798                free(_doc.key);
1799                free(_doc.meta);
1800                return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
1801            }
1802            if (_doc.seqnum < _hbdoc.seqnum &&
1803                _hbdoc.seqnum <= iterator->end_seqnum) {
1804                free(_doc.key);
1805                free(_doc.meta);
1806                free(_hbdoc.meta);
1807                goto start_seq;
1808            }
1809            free(_hbdoc.meta);
1810        }
1811        free(_doc.key);
1812        free(_doc.meta);
1813    }
1814
1815    iterator->_dhandle = dhandle; // store for fdb_iterator_get
1816    iterator->_get_offset = offset; // store for fdb_iterator_get
1817
1818    return FDB_RESULT_SUCCESS;
1819}
1820
1821LIBFDB_API
1822fdb_status fdb_iterator_prev(fdb_iterator *iterator)
1823{
1824    if (!iterator || !iterator->handle) {
1825        return FDB_RESULT_INVALID_HANDLE;
1826    }
1827
1828    fdb_status result = FDB_RESULT_SUCCESS;
1829    LATENCY_STAT_START();
1830
1831    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1832        return FDB_RESULT_HANDLE_BUSY;
1833    }
1834
1835    if (iterator->hbtrie_iterator) {
1836        while ((result = _fdb_iterator_prev(iterator)) ==
1837                FDB_RESULT_KEY_NOT_FOUND);
1838    } else {
1839        while ((result = _fdb_iterator_seq_prev(iterator)) ==
1840                FDB_RESULT_KEY_NOT_FOUND);
1841    }
1842    if (result == FDB_RESULT_SUCCESS) {
1843        iterator->direction = FDB_ITR_REVERSE;
1844    } else {
1845        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1846        if (iterator->direction != FDB_ITR_DIR_NONE) {
1847            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1848                    iterator->status == FDB_ITR_IDX) {
1849                iterator->_offset = BLK_NOT_FOUND;
1850            }
1851        }
1852    }
1853
1854    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1855    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1856                         std::memory_order_relaxed);
1857    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_PREV);
1858    return result;
1859}
1860
1861LIBFDB_API
1862fdb_status fdb_iterator_next(fdb_iterator *iterator)
1863{
1864    if (!iterator || !iterator->handle) {
1865        return FDB_RESULT_INVALID_HANDLE;
1866    }
1867
1868    fdb_status result = FDB_RESULT_SUCCESS;
1869    LATENCY_STAT_START();
1870
1871    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1872        return FDB_RESULT_HANDLE_BUSY;
1873    }
1874
1875    if (iterator->hbtrie_iterator) {
1876        while ((result = _fdb_iterator_next(iterator)) ==
1877                FDB_RESULT_KEY_NOT_FOUND);
1878    } else {
1879        while ((result = _fdb_iterator_seq_next(iterator)) ==
1880                FDB_RESULT_KEY_NOT_FOUND);
1881    }
1882    if (result == FDB_RESULT_SUCCESS) {
1883        iterator->direction = FDB_ITR_FORWARD;
1884    } else {
1885        iterator->_dhandle = NULL; // fail fdb_iterator_get also
1886        if (iterator->direction != FDB_ITR_DIR_NONE) {
1887            if ((iterator->seqtree_iterator || iterator->seqtrie_iterator) &&
1888                    iterator->status == FDB_ITR_IDX) {
1889                iterator->_offset = BLK_NOT_FOUND;
1890            }
1891        }
1892    }
1893
1894    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1895    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_moves,
1896                         std::memory_order_relaxed);
1897    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_NEXT);
1898    return result;
1899}
1900
1901// DOC returned by this function must be freed by fdb_doc_free
1902// if it was allocated because the incoming doc was pointing to NULL
1903LIBFDB_API
1904fdb_status fdb_iterator_get(fdb_iterator *iterator, fdb_doc **doc)
1905{
1906    if (!iterator || !iterator->handle) {
1907        return FDB_RESULT_INVALID_HANDLE;
1908    }
1909
1910    if (!doc) {
1911        return FDB_RESULT_INVALID_ARGS;
1912    }
1913
1914    struct docio_object _doc;
1915    fdb_status ret = FDB_RESULT_SUCCESS;
1916    uint64_t offset;
1917    struct docio_handle *dhandle;
1918    size_t size_chunk = iterator->handle->config.chunksize;
1919    bool alloced_key, alloced_meta, alloced_body;
1920    LATENCY_STAT_START();
1921
1922    dhandle = iterator->_dhandle;
1923    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
1924        return FDB_RESULT_ITERATOR_FAIL;
1925    }
1926
1927    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
1928        return FDB_RESULT_HANDLE_BUSY;
1929    }
1930
1931    offset = iterator->_get_offset;
1932
1933    if (*doc == NULL) {
1934        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
1935        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
1936            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1937            return ret;
1938        } // LCOV_EXCL_STOP
1939        _doc.key = NULL;
1940        _doc.length.keylen = 0;
1941        _doc.meta = NULL;
1942        _doc.body = NULL;
1943        alloced_key = true;
1944        alloced_meta = true;
1945        alloced_body = true;
1946    } else {
1947        _doc.key = (*doc)->key;
1948        _doc.meta = (*doc)->meta;
1949        _doc.body = (*doc)->body;
1950        alloced_key = _doc.key ? false : true;
1951        alloced_meta = _doc.meta ? false : true;
1952        alloced_body = _doc.body ? false : true;
1953    }
1954
1955    int64_t _offset = docio_read_doc(dhandle, offset, &_doc, true);
1956    if (_offset <= 0) {
1957        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1958        return _offset < 0 ? (fdb_status) _offset : FDB_RESULT_KEY_NOT_FOUND;
1959    }
1960    if (_doc.length.flag & DOCIO_DELETED &&
1961        (iterator->opt & FDB_ITR_NO_DELETES)) {
1962        if (alloced_key) {
1963            free(_doc.key);
1964        }
1965        if (alloced_meta) {
1966            free(_doc.meta);
1967        }
1968        if (alloced_body) {
1969            free(_doc.body);
1970        }
1971        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1972        return FDB_RESULT_KEY_NOT_FOUND;
1973    }
1974
1975    if (iterator->handle->kvs) {
1976        // eliminate KV ID from key
1977        _doc.length.keylen -= size_chunk;
1978        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
1979    }
1980
1981    if (alloced_key) {
1982        (*doc)->key = _doc.key;
1983    }
1984    if (alloced_meta) {
1985        (*doc)->meta = _doc.meta;
1986    }
1987    if (alloced_body) {
1988        (*doc)->body = _doc.body;
1989    }
1990    (*doc)->keylen = _doc.length.keylen;
1991    (*doc)->metalen = _doc.length.metalen;
1992    (*doc)->bodylen = _doc.length.bodylen;
1993    (*doc)->seqnum = _doc.seqnum;
1994    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
1995    (*doc)->offset = offset;
1996
1997    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
1998    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
1999                         std::memory_order_relaxed);
2000    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET);
2001    return ret;
2002}
2003
2004// DOC returned by this function must be freed using 'fdb_doc_free'
2005LIBFDB_API
2006fdb_status fdb_iterator_get_metaonly(fdb_iterator *iterator, fdb_doc **doc)
2007{
2008    if (!iterator || !iterator->handle) {
2009        return FDB_RESULT_INVALID_HANDLE;
2010    }
2011
2012    if (!doc) {
2013        return FDB_RESULT_INVALID_ARGS;
2014    }
2015
2016    struct docio_object _doc;
2017    fdb_status ret = FDB_RESULT_SUCCESS;
2018    uint64_t offset;
2019    int64_t _offset;
2020    struct docio_handle *dhandle;
2021    size_t size_chunk = iterator->handle->config.chunksize;
2022    bool alloced_key, alloced_meta;
2023    LATENCY_STAT_START();
2024
2025    dhandle = iterator->_dhandle;
2026    if (!dhandle || iterator->_get_offset == BLK_NOT_FOUND) {
2027        return FDB_RESULT_ITERATOR_FAIL;
2028    }
2029
2030    if (!atomic_cas_uint8_t(&iterator->handle->handle_busy, 0, 1)) {
2031        return FDB_RESULT_HANDLE_BUSY;
2032    }
2033
2034    offset = iterator->_get_offset;
2035
2036    if (*doc == NULL) {
2037        ret = fdb_doc_create(doc, NULL, 0, NULL, 0, NULL, 0);
2038        if (ret != FDB_RESULT_SUCCESS) { // LCOV_EXCL_START
2039            atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2040            return ret;
2041        } // LCOV_EXCL_STOP
2042        _doc.key = NULL;
2043        _doc.length.keylen = 0;
2044        _doc.meta = NULL;
2045        _doc.body = NULL;
2046        alloced_key = true;
2047        alloced_meta = true;
2048    } else {
2049        _doc.key = (*doc)->key;
2050        _doc.meta = (*doc)->meta;
2051        _doc.body = NULL;
2052        alloced_key = _doc.key ? false : true;
2053        alloced_meta = _doc.meta ? false : true;
2054    }
2055
2056    _offset = docio_read_doc_key_meta(dhandle, offset, &_doc, true);
2057    if (_offset <= 0) {
2058        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2059        return _offset < 0 ? (fdb_status)_offset : FDB_RESULT_KEY_NOT_FOUND;
2060    }
2061    if (_doc.length.flag & DOCIO_DELETED &&
2062            (iterator->opt & FDB_ITR_NO_DELETES)) {
2063        if (alloced_key) {
2064            free(_doc.key);
2065        }
2066        if (alloced_meta) {
2067            free(_doc.meta);
2068        }
2069        atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2070        return FDB_RESULT_KEY_NOT_FOUND;
2071    }
2072
2073    if (iterator->handle->kvs) {
2074        // eliminate KV ID from key
2075        _doc.length.keylen -= size_chunk;
2076        memmove(_doc.key, (uint8_t*)_doc.key + size_chunk, _doc.length.keylen);
2077    }
2078    if (alloced_key) {
2079        (*doc)->key = _doc.key;
2080    }
2081    if (alloced_meta) {
2082        (*doc)->meta = _doc.meta;
2083    }
2084    (*doc)->keylen = _doc.length.keylen;
2085    (*doc)->metalen = _doc.length.metalen;
2086    (*doc)->bodylen = _doc.length.bodylen;
2087    (*doc)->seqnum = _doc.seqnum;
2088    (*doc)->deleted = _doc.length.flag & DOCIO_DELETED;
2089    (*doc)->offset = offset;
2090
2091    atomic_cas_uint8_t(&iterator->handle->handle_busy, 1, 0);
2092    atomic_incr_uint64_t(&iterator->handle->op_stats->num_iterator_gets,
2093                         std::memory_order_relaxed);
2094    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_GET_META);
2095    return ret;
2096}
2097
2098LIBFDB_API
2099fdb_status fdb_iterator_close(fdb_iterator *iterator)
2100{
2101    if (!iterator || !iterator->handle) {
2102        return FDB_RESULT_INVALID_HANDLE;
2103    }
2104
2105    LATENCY_STAT_START();
2106    if (iterator->hbtrie_iterator) {
2107        hbtrie_iterator_free(iterator->hbtrie_iterator);
2108        free(iterator->hbtrie_iterator);
2109    }
2110
2111    if (iterator->seqtree_iterator) {
2112        btree_iterator_free(iterator->seqtree_iterator);
2113        free(iterator->seqtree_iterator);
2114    }
2115    if (iterator->seqtrie_iterator) {
2116        hbtrie_iterator_free(iterator->seqtrie_iterator);
2117        free(iterator->seqtrie_iterator);
2118    }
2119
2120    if (iterator->start_key) {
2121        free(iterator->start_key);
2122    }
2123
2124    if (iterator->end_key) {
2125        free(iterator->end_key);
2126    }
2127
2128    --iterator->handle->num_iterators; // Decrement the iterator counter of the KV handle
2129    wal_itr_close(iterator->wal_itr);
2130
2131    LATENCY_STAT_END(iterator->handle->file, FDB_LATENCY_ITR_CLOSE);
2132
2133    if (!iterator->snapshot_handle) {
2134        // Close the opened handle in the iterator,
2135        // if the handle is not for snapshot.
2136        fdb_status fs = fdb_kvs_close(iterator->handle);
2137        if (fs != FDB_RESULT_SUCCESS) {
2138            fdb_log(&iterator->handle->log_callback, fs,
2139                    "Failed to close the KV Store from a database file '%s' as "
2140                    "part of closing the iterator",
2141                    iterator->handle->file->filename);
2142        }
2143    }
2144
2145    free(iterator->_key);
2146    free(iterator);
2147    return FDB_RESULT_SUCCESS;
2148}
2149