xref: /6.0.3/forestdb/src/wal.cc (revision 56236603)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdlib.h>
19#include <string.h>
20#include <stdint.h>
21
22#include "filemgr.h"
23#include "common.h"
24#include "hash.h"
25#include "docio.h"
26#include "wal.h"
27#include "hash_functions.h"
28#include "fdb_internal.h"
29
30#include "memleak.h"
31
32
33#ifdef __DEBUG
34#ifndef __DEBUG_WAL
35    #undef DBG
36    #undef DBGCMD
37    #undef DBGSW
38    #define DBG(...)
39    #define DBGCMD(...)
40    #define DBGSW(n, ...)
41#else
42# include "debug.h"
43#endif
44#endif
45
46INLINE int _wal_keycmp(void *key1, size_t keylen1, void *key2, size_t keylen2)
47{
48    if (keylen1 == keylen2) {
49        return memcmp(key1, key2, keylen1);
50    } else {
51        size_t len = MIN(keylen1, keylen2);
52        int cmp = memcmp(key1, key2, len);
53        if (cmp != 0) return cmp;
54        else {
55            return (int)((int)keylen1 - (int)keylen2);
56        }
57    }
58}
59
60INLINE int __wal_cmp_bykey(struct wal_item_header *aa,
61                           struct wal_item_header *bb,
62                           void *aux)
63{
64    struct _fdb_key_cmp_info *info = (struct _fdb_key_cmp_info *)aux;
65    if (info->kvs_config.custom_cmp) {
66        // custom compare function for variable-length key
67        if (info->kvs) {
68            // multi KV instance mode
69            // KV ID should be compared separately
70            size_t size_chunk = info->kvs->root->config.chunksize;
71            fdb_kvs_id_t a_id, b_id;
72            buf2kvid(size_chunk, aa->key, &a_id);
73            buf2kvid(size_chunk, bb->key, &b_id);
74
75            if (a_id < b_id) {
76                return -1;
77            } else if (a_id > b_id) {
78                return 1;
79            } else {
80                return info->kvs_config.custom_cmp(
81                            (uint8_t*)aa->key + size_chunk,
82                            aa->keylen - size_chunk,
83                            (uint8_t*)bb->key + size_chunk,
84                            bb->keylen - size_chunk);
85            }
86        } else {
87            return info->kvs_config.custom_cmp(aa->key, aa->keylen,
88                                               bb->key, bb->keylen);
89        }
90    } else {
91        return _wal_keycmp(aa->key, aa->keylen, bb->key, bb->keylen);
92    }
93}
94
95INLINE int _wal_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
96{
97    struct wal_item_header *aa, *bb;
98    aa = _get_entry(a, struct wal_item_header, avl_key);
99    bb = _get_entry(b, struct wal_item_header, avl_key);
100    return __wal_cmp_bykey(aa, bb, aux);
101}
102
103INLINE int _merge_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
104{
105    struct wal_cursor *aa, *bb;
106    aa = _get_entry(a, struct wal_cursor, avl_merge);
107    bb = _get_entry(b, struct wal_cursor, avl_merge);
108    return __wal_cmp_bykey(aa->item->header, bb->item->header, aux);
109}
110
111INLINE int _snap_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
112{
113    struct wal_item *aa, *bb;
114    aa = _get_entry(a, struct wal_item, avl_keysnap);
115    bb = _get_entry(b, struct wal_item, avl_keysnap);
116    return __wal_cmp_bykey(aa->header, bb->header, aux);
117}
118
119INLINE int __wal_cmp_byseq(struct wal_item *aa, struct wal_item *bb) {
120    if (aa->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
121        // multi KV instance mode
122        int size_chunk = aa->header->chunksize;
123        fdb_kvs_id_t id_aa, id_bb;
124        // KV ID is stored at the first 8 bytes in the key
125        buf2kvid(size_chunk, aa->header->key, &id_aa);
126        buf2kvid(size_chunk, bb->header->key, &id_bb);
127        if (id_aa < id_bb) {
128            return -1;
129        } else if (id_aa > id_bb) {
130            return 1;
131        } else {
132            return _CMP_U64(aa->seqnum, bb->seqnum);
133        }
134    }
135    return _CMP_U64(aa->seqnum, bb->seqnum);
136}
137
138INLINE int _wal_cmp_byseq(struct avl_node *a, struct avl_node *b, void *aux)
139{
140    struct wal_item *aa, *bb;
141    aa = _get_entry(a, struct wal_item, avl_seq);
142    bb = _get_entry(b, struct wal_item, avl_seq);
143    return __wal_cmp_byseq(aa, bb);
144}
145
146INLINE int _merge_cmp_byseq(struct avl_node *a, struct avl_node *b, void *aux)
147{
148    struct wal_cursor *aa, *bb;
149    aa = _get_entry(a, struct wal_cursor, avl_merge);
150    bb = _get_entry(b, struct wal_cursor, avl_merge);
151    return __wal_cmp_byseq(aa->item, bb->item);
152}
153
154INLINE int _wal_snap_cmp(struct avl_node *a, struct avl_node *b, void *aux)
155{
156    struct snap_handle *aa, *bb;
157    aa = _get_entry(a, struct snap_handle, avl_id);
158    bb = _get_entry(b, struct snap_handle, avl_id);
159
160    if (aa->id < bb->id) { // first compare by kv id
161        return -1;
162    } else if (aa->id > bb->id) {
163        return 1;
164    } else { // within same kv store compare by snapshot id
165        if (aa->snap_tag_idx < bb->snap_tag_idx) {
166            return -1;
167        } else if (aa->snap_tag_idx > bb->snap_tag_idx) {
168            return 1;
169        }
170    }
171    return 0;
172}
173
174fdb_status wal_init(struct filemgr *file, int nbucket)
175{
176    size_t num_shards;
177    atomic_init_uint8_t(&file->wal->isPopulated, 0);
178    file->wal->flag = WAL_FLAG_INITIALIZED;
179    atomic_init_uint32_t(&file->wal->size, 0);
180    atomic_init_uint32_t(&file->wal->num_flushable, 0);
181    atomic_init_uint64_t(&file->wal->datasize, 0);
182    atomic_init_uint64_t(&file->wal->mem_overhead, 0);
183    file->wal->wal_dirty = FDB_WAL_CLEAN;
184
185    list_init(&file->wal->txn_list);
186    spin_init(&file->wal->lock);
187
188    if (file->config->num_wal_shards) {
189        file->wal->num_shards = file->config->num_wal_shards;
190    } else {
191        file->wal->num_shards = DEFAULT_NUM_WAL_PARTITIONS;
192    }
193
194    num_shards = wal_get_num_shards(file);
195    file->wal->key_shards = (wal_shard *)
196        malloc(sizeof(struct wal_shard) * num_shards);
197
198    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
199        file->wal->seq_shards = (wal_shard *)
200            malloc(sizeof(struct wal_shard) * num_shards);
201    } else {
202        file->wal->seq_shards = NULL;
203    }
204
205    for (int i = num_shards - 1; i >= 0; --i) {
206        avl_init(&file->wal->key_shards[i]._map, NULL);
207        spin_init(&file->wal->key_shards[i].lock);
208        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
209            avl_init(&file->wal->seq_shards[i]._map, NULL);
210            spin_init(&file->wal->seq_shards[i].lock);
211        }
212    }
213
214    avl_init(&file->wal->wal_snapshot_tree, NULL);
215
216    DBG("wal item size %ld\n", sizeof(struct wal_item));
217    return FDB_RESULT_SUCCESS;
218}
219
220fdb_status wal_destroy(struct filemgr *file)
221{
222    size_t i = 0;
223    size_t num_shards = wal_get_num_shards(file);
224    // Free all WAL shards
225    for (; i < num_shards; ++i) {
226        spin_destroy(&file->wal->key_shards[i].lock);
227        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
228            spin_destroy(&file->wal->seq_shards[i].lock);
229        }
230    }
231    spin_destroy(&file->wal->lock);
232    free(file->wal->key_shards);
233    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
234        free(file->wal->seq_shards);
235    }
236    return FDB_RESULT_SUCCESS;
237}
238
239int wal_is_initialized(struct filemgr *file)
240{
241    return file->wal->flag & WAL_FLAG_INITIALIZED;
242}
243
244INLINE struct snap_handle * _wal_get_latest_snapshot(struct wal *_wal,
245                                                     fdb_kvs_id_t kv_id)
246{
247    struct avl_node *node;
248    struct snap_handle query, *shandle;
249    // In order to get the highest snapshot id in this kv store..
250    query.snap_tag_idx = 0; // search for snapshot id smaller than the smallest
251    query.id = kv_id + 1;  // in the next kv store.
252    node = avl_search_smaller(&_wal->wal_snapshot_tree, &query.avl_id,
253                              _wal_snap_cmp);
254    if (node) {
255        shandle = _get_entry(node, struct snap_handle, avl_id);
256        if (shandle->id == kv_id) {
257            return shandle;
258        }
259    }
260    return NULL;
261}
262
263INLINE struct snap_handle *_wal_snapshot_create(fdb_kvs_id_t kv_id,
264                                                wal_snapid_t snap_tag,
265                                                wal_snapid_t snap_flush_tag)
266{
267   struct snap_handle *shandle = (struct snap_handle *)
268                                   calloc(1, sizeof(struct snap_handle));
269   if (shandle) {
270       shandle->id = kv_id;
271       shandle->snap_tag_idx = snap_tag;
272       shandle->snap_stop_idx = snap_flush_tag;
273       atomic_init_uint16_t(&shandle->ref_cnt_kvs, 0);
274       atomic_init_uint64_t(&shandle->wal_ndocs, 0);
275       return shandle;
276   }
277   return NULL;
278}
279
280// When a snapshot reader has called wal_snapshot_open(), the ref count
281// on the snapshot handle will be incremented
282INLINE bool _wal_snap_is_immutable(struct snap_handle *shandle) {
283    return atomic_get_uint16_t(&shandle->ref_cnt_kvs);
284}
285
286/**
287 * Returns highest mutable snapshot or creates one if...
288 * No snapshot exists (First item for a given kv store is inserted)
289 * If the highest snapshot was made immutable by snapshot_open (Write barrier)
290 * If the highest snapshot was made un-readable by wal_flush (Read barrier)
291 */
292INLINE struct snap_handle * _wal_fetch_snapshot(struct wal *_wal,
293                                                fdb_kvs_id_t kv_id)
294{
295    struct snap_handle *open_snapshot;
296    wal_snapid_t snap_id, snap_flush_id = 0;
297    spin_lock(&_wal->lock);
298    open_snapshot = _wal_get_latest_snapshot(_wal, kv_id);
299    if (!open_snapshot || // if first WAL item inserted for KV store
300        _wal_snap_is_immutable(open_snapshot) ||//Write barrier (snapshot_open)
301        open_snapshot->is_flushed) { // wal_flushed (read-write barrier)
302        if (!open_snapshot) {
303            snap_id = 1; // begin snapshots id at 1
304            snap_flush_id = 0; // all past elements can be returned
305            DBG("Fresh KV id %" _F64 " Snapshot %" _F64 "- %" _F64"\n",
306                kv_id, snap_flush_id, snap_id);
307        } else { // read/write barrier means a new WAL snapshot gets created
308            snap_id = open_snapshot->snap_tag_idx + 1;
309            if (!open_snapshot->is_flushed) { // Write barrier only
310                snap_flush_id = open_snapshot->snap_stop_idx;
311                DBG("Write Barrier WAL KV id %" _F64 " Snapshot %" _F64
312                    " - %" _F64 "\n", kv_id, snap_flush_id, snap_id);
313            } else { // WAL flushed! Read & Write barrier
314                snap_flush_id = open_snapshot->snap_tag_idx;
315                DBG("Read-Write Barrier WAL KV id %" _F64 " Snapshot %" _F64
316                    "- %" _F64 "\n",
317                    kv_id, snap_flush_id, snap_id);
318            }
319        }
320        open_snapshot = _wal_snapshot_create(kv_id, snap_id, snap_flush_id);
321        avl_insert(&_wal->wal_snapshot_tree, &open_snapshot->avl_id,
322                   _wal_snap_cmp);
323    }
324    // Increment ndocs for garbage collection of the snapshot
325    // When no more docs refer to a snapshot, it can be safely deleted
326    atomic_incr_uint64_t(&open_snapshot->wal_ndocs);
327    spin_unlock(&_wal->lock);
328    return open_snapshot;
329}
330
331INLINE fdb_status _wal_snapshot_init(struct snap_handle *shandle,
332                                     filemgr *file,
333                                     fdb_txn *txn,
334                                     fdb_seqnum_t seqnum,
335                                     _fdb_key_cmp_info *key_cmp_info)
336{
337    struct list_elem *ee;
338    shandle->snap_txn = txn;
339    shandle->cmp_info = *key_cmp_info;
340    atomic_incr_uint16_t(&shandle->ref_cnt_kvs);
341    _kvs_stat_get(file, shandle->id, &shandle->stat);
342    if (seqnum == FDB_SNAPSHOT_INMEM) {
343        shandle->seqnum = fdb_kvs_get_seqnum(file, shandle->id);
344        shandle->is_persisted_snapshot = false;
345    } else {
346        shandle->stat.wal_ndocs = 0; // WAL copy will populate
347        shandle->stat.wal_ndeletes = 0; // these 2 stats
348        shandle->seqnum = seqnum;
349        shandle->is_persisted_snapshot = true;
350    }
351    avl_init(&shandle->key_tree, &shandle->cmp_info);
352    avl_init(&shandle->seq_tree, NULL);
353    shandle->global_txn = &file->global_txn;
354    list_init(&shandle->active_txn_list);
355    ee = list_begin(&file->wal->txn_list);
356    while (ee) {
357        struct wal_txn_wrapper *txn_wrapper;
358        fdb_txn *active_txn;
359        txn_wrapper = _get_entry(ee, struct wal_txn_wrapper, le);
360        active_txn = txn_wrapper->txn;
361        // except for global_txn
362        if (active_txn != &file->global_txn) {
363            txn_wrapper = (struct wal_txn_wrapper *)
364                calloc(1, sizeof(struct wal_txn_wrapper));
365            txn_wrapper->txn_id = active_txn->txn_id;
366            list_push_front(&shandle->active_txn_list, &txn_wrapper->le);
367        }
368        ee = list_next(ee);
369    }
370
371    return FDB_RESULT_SUCCESS;
372}
373
/**
 * Open a snapshot of the WAL for one KV store.
 *
 * Three outcomes, decided under the global WAL lock:
 *  - No usable WAL snapshot (none exists, it is empty, or it was flushed):
 *    create a private handle not linked into the global snapshot tree.
 *  - Latest snapshot already immutable (another reader opened it): share it
 *    by bumping its refcount.
 *  - Otherwise: make the latest snapshot immutable via _wal_snapshot_init().
 *
 * @param file         File manager instance.
 * @param txn          Transaction the snapshot is bound to.
 * @param kv_id        KV store id to snapshot.
 * @param seqnum       FDB_SNAPSHOT_INMEM or an explicit persisted seqnum.
 * @param key_cmp_info Key comparison configuration for the snapshot's trees.
 * @param shandle      [out] Receives the snapshot handle.
 * @return FDB_RESULT_SUCCESS or FDB_RESULT_ALLOC_FAIL.
 */
fdb_status wal_snapshot_open(struct filemgr *file,
                             fdb_txn *txn,
                             fdb_kvs_id_t kv_id,
                             fdb_seqnum_t seqnum,
                             _fdb_key_cmp_info *key_cmp_info,
                             struct snap_handle **shandle)
{
    struct wal *_wal = file->wal;
    struct snap_handle *_shandle;

    spin_lock(&_wal->lock);
    _shandle = _wal_get_latest_snapshot(_wal, kv_id);
    if (!_shandle || // No item exist in WAL for this KV Store
        !atomic_get_uint64_t(&_shandle->wal_ndocs) || // Empty snapshot
        _shandle->is_flushed) { // Latest snapshot has read-write barrier
        // This can happen when a new snapshot is attempted and WAL was flushed
        // and no mutations after WAL flush - the snapshot exists solely for
        // existing open snapshot iterators
        _shandle = _wal_snapshot_create(kv_id, 0, 0);
        if (!_shandle) { // LCOV_EXCL_START
            spin_unlock(&_wal->lock);
            return FDB_RESULT_ALLOC_FAIL;
        } // LCOV_EXCL_STOP
        // This snapshot is not inserted into global shared tree
        _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
        DBG("%s Persisted snapshot taken at %" _F64 " for kv id %" _F64 "\n",
            file->filename, _shandle->seqnum, kv_id);
    } else { // Take a snapshot of the latest WAL state for this KV Store
        if (_wal_snap_is_immutable(_shandle)) { // existing snapshot still open
            atomic_incr_uint16_t(&_shandle->ref_cnt_kvs); // ..just Clone it
        } else { // make this snapshot of the WAL immutable..
            _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
            DBG("%s Snapshot init %" _F64 " - %" _F64 " taken at %"
                _F64 " for kv id %" _F64 "\n",
                file->filename, _shandle->snap_stop_idx,
                _shandle->snap_tag_idx, _shandle->seqnum, kv_id);
        }
    }
    spin_unlock(&_wal->lock);
    *shandle = _shandle;
    return FDB_RESULT_SUCCESS;
}
416
417
/**
 * Decide whether a WAL item may be freed, or whether some open snapshot
 * still needs it.
 *
 * @param _item          The item being considered for removal.
 * @param covering_item  A newer item for the same key that supersedes _item,
 *                       or NULL; the scan stops at its snapshot.
 * @return true if no open snapshot can still read _item.
 */
INLINE bool _wal_can_discard(struct wal *_wal,
                             struct wal_item *_item,
                             struct wal_item *covering_item)
{
#ifndef _MVCC_WAL_ENABLE
    // Without MVCC the WAL is never shared, so discarding is always safe.
    // (Everything below is unreachable in this configuration.)
    return true; // if WAL is never shared, this can never be false
#endif // _MVCC_WAL_ENABLE
    struct snap_handle *shandle, *snext;
    wal_snapid_t snap_stop_idx;
    wal_snapid_t snap_tag_idx;
    fdb_kvs_id_t kv_id;
    bool ret = true;

    if (covering_item) { // stop until the covering item's snapshot is found
        snap_stop_idx = covering_item->shandle->snap_tag_idx;
    } else {
        snap_stop_idx = OPEN_SNAPSHOT_TAG;
    }

    shandle = _item->shandle;
    fdb_assert(shandle, _item->seqnum, covering_item);

    snap_tag_idx = shandle->snap_tag_idx;
    kv_id = shandle->id;

    if (_wal_snap_is_immutable(shandle)) {// its active snapshot is still open
        ret = false; // it cannot be discarded
    } else { // item's own snapshot is closed, but a later snapshot may need it
        struct avl_node *node;
        spin_lock(&_wal->lock);
        // Walk snapshots of the same KV store in tag order, newest-needed
        // first, looking for any open snapshot that can still see _item.
        node = avl_next(&shandle->avl_id);
        while (node) { // check snapshots taken later until its wal was flushed
            snext = _get_entry(node, struct snap_handle, avl_id);
            if (snext->id != kv_id) { // don't look beyond current kv store
                break;
            }

            if (snext->snap_stop_idx > snap_tag_idx) { // wal was flushed here.
                break; // From this snapshot onwards, this item is reflected..
            } // ..in the main index

            if (snext->snap_tag_idx == snap_stop_idx) {
                break; // we reached the covering item, need not examine further
            }

            if (_wal_snap_is_immutable(snext)) {
                ret = false; // a future snapshot needs this item!
                break;
            }
            node = avl_next(node);
        }
        spin_unlock(&_wal->lock);
    }
    return ret;
}
473
// Classification of a WAL mutation for stat bookkeeping; see
// _wal_update_stat() for how each value adjusts wal_ndocs/wal_ndeletes.
typedef enum _wal_update_type_t {
    _WAL_NEW_DEL, // A new deleted item inserted into WAL
    _WAL_NEW_SET, // A new non-deleted item inserted into WAL
    _WAL_SET_TO_DEL, // A set item updated to be deleted
    _WAL_DEL_TO_SET, // A deleted item updated to a set
    _WAL_DROP_DELETE, // A deleted item is de-duplicated or dropped
    _WAL_DROP_SET // A set item is de-duplicated or dropped
} _wal_update_type;
482
// Adjust per-KV-store WAL statistics (wal_ndocs / wal_ndeletes) for one
// mutation of the given type.
INLINE void _wal_update_stat(struct filemgr *file, fdb_kvs_id_t kv_id,
                             _wal_update_type type)
{
    switch (type) {
        case _WAL_NEW_DEL: // inserted deleted doc: ++wal_ndocs, ++wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
            // fall through: a new deleted doc is also a new doc
        case _WAL_NEW_SET: // inserted new doc: ++wal_ndocs
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
            break;
        case _WAL_SET_TO_DEL: // update prev doc to deleted: ++wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
            break;
        case _WAL_DEL_TO_SET: // update prev deleted doc to set: --wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
            break;
        case _WAL_DROP_DELETE: // drop deleted item: --wal_ndocs,--wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
            // fall through: dropping a deleted doc also drops a doc
        case _WAL_DROP_SET: // drop item: --wal_ndocs
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
            break;
    }
}
505
/**
 * Core WAL insert: index a document mutation in the in-memory WAL.
 *
 * The key hashes to one of num_shards key-index shards. Three cases:
 *  1) Header exists and an uncommitted item of the same txn/snapshot exists:
 *     update that item in place (re-index by seqnum, mark old doc stale).
 *  2) Header exists but no such item: create a new item under the header.
 *  3) No header: create header and item, insert into the shard's key index.
 *
 * @param txn              Transaction this mutation belongs to.
 * @param file             File manager instance owning the WAL.
 * @param cmp_info         Key comparison config (set as AVL aux per search).
 * @param doc              Document (key, seqnum, deleted flag, size_ondisk).
 * @param offset           On-disk offset of the doc, or BLK_NOT_FOUND.
 * @param caller           Who inserts (writer / compactor phase 1 or 2);
 *                         writers take the shard spinlocks themselves.
 * @param immediate_remove If true, deleted docs are pruned right away
 *                         (WAL_ACT_REMOVE) instead of logically removed.
 * @return FDB_RESULT_SUCCESS.
 */
INLINE fdb_status _wal_insert(fdb_txn *txn,
                              struct filemgr *file,
                              struct _fdb_key_cmp_info *cmp_info,
                              fdb_doc *doc,
                              uint64_t offset,
                              wal_insert_by caller,
                              bool immediate_remove)
{
    struct wal_item *item;
    struct wal_item_header query, *header;
    struct snap_handle *shandle;
    struct list_elem *le;
    struct avl_node *node;
    void *key = doc->key;
    size_t keylen = doc->keylen;
    size_t chk_sum;
    size_t shard_num;
    wal_snapid_t snap_tag;
    fdb_kvs_id_t kv_id;

    if (file->kv_header) { // multi KV instance mode
        buf2kvid(file->config->chunksize, doc->key, &kv_id);
    } else {
        kv_id = 0;
    }
    // Fetch (or create) the current mutable snapshot; this also bumps its
    // wal_ndocs refcount.
    shandle = _wal_fetch_snapshot(file->wal, kv_id);
    snap_tag = shandle->snap_tag_idx;
    query.key = key;
    query.keylen = keylen;
    chk_sum = get_checksum((uint8_t*)key, keylen);
    shard_num = chk_sum % file->wal->num_shards;
    if (caller == WAL_INS_WRITER) {
        spin_lock(&file->wal->key_shards[shard_num].lock);
    }

    // Since we can have a different custom comparison function per kv store
    // set the custom compare aux function every time before a search is done
    avl_set_aux(&file->wal->key_shards[shard_num]._map,
                (void *)cmp_info);
    node = avl_search(&file->wal->key_shards[shard_num]._map,
                      &query.avl_key, _wal_cmp_bykey);

    if (node) {
        // already exist .. retrieve header
        header = _get_entry(node, struct wal_item_header, avl_key);

        // find uncommitted item belonging to the same txn
        le = list_begin(&header->items);
        while (le) {
            item = _get_entry(le, struct wal_item, list_elem);

            // De-duplicate only an uncommitted item of the same transaction
            // within the same snapshot; compactor phase 1 never de-dups.
            if (item->txn_id == txn->txn_id
                && !(item->flag & WAL_ITEM_COMMITTED ||
                caller == WAL_INS_COMPACT_PHASE1) &&
                item->shandle->snap_tag_idx == snap_tag) {
                item->flag &= ~WAL_ITEM_FLUSH_READY;

                if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
                    // Re-index the item by new sequence number..
                    size_t seq_shard_num = item->seqnum % file->wal->num_shards;
                    if (caller == WAL_INS_WRITER) {
                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                    avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
                            &item->avl_seq);
                    if (caller == WAL_INS_WRITER) {
                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                    }

                    item->seqnum = doc->seqnum;
                    seq_shard_num = doc->seqnum % file->wal->num_shards;
                    if (caller == WAL_INS_WRITER) {
                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                    avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                            &item->avl_seq, _wal_cmp_byseq);
                    if (caller == WAL_INS_WRITER) {
                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                } else {
                    // just overwrite existing WAL item
                    item->seqnum = doc->seqnum;
                }

                // mark previous doc region as stale
                size_t doc_size_ondisk = doc->size_ondisk;
                uint32_t stale_len = item->doc_size;
                uint64_t stale_offset = item->offset;
                if (item->action == WAL_ACT_INSERT ||
                    item->action == WAL_ACT_LOGICAL_REMOVE) {
                    // insert or logical remove
                    filemgr_mark_stale(file, stale_offset, stale_len);
                }

                if (doc->deleted) {
                    if (item->txn_id == file->global_txn.txn_id &&
                        item->action == WAL_ACT_INSERT) {
                        _wal_update_stat(file, kv_id, _WAL_SET_TO_DEL);
                    }
                    if (offset != BLK_NOT_FOUND && !immediate_remove) {
                        // purge interval not met yet
                        item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
                    } else { // drop the deleted doc right away
                        item->action = WAL_ACT_REMOVE; // immediate prune index

                        if (offset != BLK_NOT_FOUND) {
                            // immediately mark as stale if offset is given
                            // (which means that a deletion mark was appended into
                            //  the file before calling wal_insert()).
                            filemgr_mark_stale(file, offset, doc_size_ondisk);
                        }
                        doc_size_ondisk = 0;
                    }
                } else {
                    if (item->txn_id == file->global_txn.txn_id &&
                        item->action != WAL_ACT_INSERT) {
                        _wal_update_stat(file, kv_id, _WAL_DEL_TO_SET);
                    }
                    item->action = WAL_ACT_INSERT;
                }
                // Account only the delta between old and new on-disk sizes.
                atomic_add_uint64_t(&file->wal->datasize,
                                    doc_size_ondisk - item->doc_size,
                                    std::memory_order_relaxed);
                item->doc_size = doc->size_ondisk;
                item->offset = offset;
                item->shandle = shandle;

                // move the item to the front of the list (header)
                list_remove(&header->items, &item->list_elem);
                list_push_front(&header->items, &item->list_elem);
                // De-dup: drop the extra snapshot reference taken by
                // _wal_fetch_snapshot() since no new item was added.
                atomic_decr_uint64_t(&shandle->wal_ndocs);
                break;
            }
            le = list_next(le);
        }

        if (le == NULL) {
            // not exist
            // create new item
            item = (struct wal_item *)calloc(1, sizeof(struct wal_item));
            // NOTE(review): unlike the new-header branch below, this path
            // never sets WAL_ITEM_COMMITTED for WAL_INS_COMPACT_PHASE1
            // callers — confirm whether that asymmetry is intentional.

            if (file->kv_header) { // multi KV instance mode
                item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
            }
            item->txn = txn;
            item->txn_id = txn->txn_id;
            if (txn->txn_id == file->global_txn.txn_id) {
                atomic_incr_uint32_t(&file->wal->num_flushable);
            }
            item->header = header;
            item->seqnum = doc->seqnum;

            if (doc->deleted) {
                if (item->txn_id == file->global_txn.txn_id) {
                    _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
                }
                if (offset != BLK_NOT_FOUND && !immediate_remove) {
                    // purge interval not met yet
                    item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
                } else { // compactor purge deleted doc
                    item->action = WAL_ACT_REMOVE; // immediate prune index

                    if (offset != BLK_NOT_FOUND) {
                        // immediately mark as stale if offset is given
                        // (which means that a deletion mark was appended into
                        //  the file before calling wal_insert()).
                        filemgr_mark_stale(file, offset, doc->size_ondisk);
                    }
                }
            } else {
                if (item->txn_id == file->global_txn.txn_id) {
                    _wal_update_stat(file, kv_id, _WAL_NEW_SET);
                }
                item->action = WAL_ACT_INSERT;
            }
            item->offset = offset;
            item->doc_size = doc->size_ondisk;
            item->shandle = shandle;
            if (item->action != WAL_ACT_REMOVE) {
                atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk,
                                    std::memory_order_relaxed);
            }

            if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
                size_t seq_shard_num = doc->seqnum % file->wal->num_shards;
                if (caller == WAL_INS_WRITER) {
                    spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                }
                avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                           &item->avl_seq, _wal_cmp_byseq);
                if (caller == WAL_INS_WRITER) {
                    spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                }
            }
            // insert into header's list
            list_push_front(&header->items, &item->list_elem);
            // also insert into transaction's list
            // NOTE(review): this push is unconditional, whereas the
            // new-header branch below guards it with
            // (caller == WAL_INS_WRITER || caller == WAL_INS_COMPACT_PHASE2)
            // — verify the difference is intended.
            list_push_back(txn->items, &item->list_elem_txn);

            atomic_incr_uint32_t(&file->wal->size);
            atomic_add_uint64_t(&file->wal->mem_overhead,
                                sizeof(struct wal_item), std::memory_order_relaxed);
        }
    } else {
        // not exist .. create new one
        // create new header and new item
        header = (struct wal_item_header*)malloc(sizeof(struct wal_item_header));
        list_init(&header->items);
        header->chunksize = file->config->chunksize;
        header->keylen = keylen;
        header->key = (void *)malloc(header->keylen);
        memcpy(header->key, key, header->keylen);

        avl_insert(&file->wal->key_shards[shard_num]._map,
                   &header->avl_key, _wal_cmp_bykey);

        item = (struct wal_item *)malloc(sizeof(struct wal_item));
        // entries inserted by compactor is already committed
        if (caller == WAL_INS_COMPACT_PHASE1) {
            item->flag = WAL_ITEM_COMMITTED;
        } else {
            item->flag = 0x0;
        }
        if (file->kv_header) { // multi KV instance mode
            item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
        }
        item->txn = txn;
        item->txn_id = txn->txn_id;
        if (txn->txn_id == file->global_txn.txn_id) {
            atomic_incr_uint32_t(&file->wal->num_flushable);
        }
        item->header = header;

        item->seqnum = doc->seqnum;

        if (doc->deleted) {
            if (item->txn_id == file->global_txn.txn_id) {
                _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
            }
            if (offset != BLK_NOT_FOUND && !immediate_remove) {// purge interval not met yet
                item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
            } else { // compactor purge deleted doc
                item->action = WAL_ACT_REMOVE; // immediate prune index

                if (offset != BLK_NOT_FOUND) {
                    // immediately mark as stale if offset is given
                    // (which means that an empty doc was appended before
                    //  calling wal_insert()).
                    filemgr_mark_stale(file, offset, doc->size_ondisk);
                }
            }
        } else {
            if (item->txn_id == file->global_txn.txn_id) {
                _wal_update_stat(file, kv_id, _WAL_NEW_SET);
            }
            item->action = WAL_ACT_INSERT;
        }
        item->offset = offset;
        item->doc_size = doc->size_ondisk;
        item->shandle = shandle;
        if (item->action != WAL_ACT_REMOVE) {
            atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk,
                                std::memory_order_relaxed);
        }

        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
            size_t seq_shard_num = doc->seqnum % file->wal->num_shards;
            if (caller == WAL_INS_WRITER) {
                spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
            }
            avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                       &item->avl_seq, _wal_cmp_byseq);
            if (caller == WAL_INS_WRITER) {
                spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
            }
        }

        // insert into header's list
        list_push_front(&header->items, &item->list_elem);
        if (caller == WAL_INS_WRITER || caller == WAL_INS_COMPACT_PHASE2) {
            // also insert into transaction's list
            list_push_back(txn->items, &item->list_elem_txn);
        }

        atomic_incr_uint32_t(&file->wal->size);
        atomic_add_uint64_t(&file->wal->mem_overhead,
                            sizeof(struct wal_item) + sizeof(struct wal_item_header) + keylen,
                            std::memory_order_relaxed);
    }

    if (caller == WAL_INS_WRITER) {
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    }

    return FDB_RESULT_SUCCESS;
}
802
803fdb_status wal_insert(fdb_txn *txn,
804                      struct filemgr *file,
805                      struct _fdb_key_cmp_info *cmp_info,
806                      fdb_doc *doc,
807                      uint64_t offset,
808                      wal_insert_by caller)
809{
810    return _wal_insert(txn, file, cmp_info, doc, offset, caller, false);
811}
812
813fdb_status wal_immediate_remove(fdb_txn *txn,
814                                struct filemgr *file,
815                                struct _fdb_key_cmp_info *cmp_info,
816                                fdb_doc *doc,
817                                uint64_t offset,
818                                wal_insert_by caller)
819{
820    return _wal_insert(txn, file, cmp_info, doc, offset, caller, true);
821}
822
823INLINE bool _wal_item_partially_committed(fdb_txn *global_txn,
824                                          struct list *active_txn_list,
825                                          fdb_txn *current_txn,
826                                          struct wal_item *item)
827{
828    bool partial_commit = false;
829
830    if (item->flag & WAL_ITEM_COMMITTED &&
831        item->txn != global_txn && item->txn != current_txn) {
832        struct wal_txn_wrapper *txn_wrapper;
833        struct list_elem *txn_elem = list_begin(active_txn_list);
834        while(txn_elem) {
835            txn_wrapper = _get_entry(txn_elem, struct wal_txn_wrapper, le);
836            if (txn_wrapper->txn_id == item->txn_id) {
837                partial_commit = true;
838                break;
839            }
840            txn_elem = list_next(txn_elem);
841        }
842    }
843    return partial_commit;
844}
845
846/**
847 * Since items are shared with current & future snapshots...
848 * Find item belonging to snapshot OR
849 * The item from the previous most recent snapshot
850 *
851 * TODO: Due to the fact that transactional items can overwrite
852 *       more recent items created upon fdb_end_trans, we must scan entire list
853 *       to find a qualifying item from the previous most recent snapshot
854 *       This is not efficient and we need a better way of ordering the list
855 */
/**
 * Return the WAL item under 'header' that is visible to snapshot 'shandle':
 * either the item tagged with the snapshot's own tag, or the newest shared
 * item from an earlier snapshot (but never one at/before the snapshot's
 * stop tag, i.e. pre-flush items). Returns NULL when nothing qualifies.
 */
INLINE struct wal_item *_wal_get_snap_item(struct wal_item_header *header,
                                           struct snap_handle *shandle)
{
    struct wal_item *item;
    struct wal_item *max_shared_item = NULL; // best shared candidate so far
    fdb_txn *txn = shandle->snap_txn;
    wal_snapid_t tag = shandle->snap_tag_idx;
    wal_snapid_t snap_stop_tag = shandle->snap_stop_idx;
    struct list_elem *le = list_begin(&header->items);

    // discard wal keys that have no items in them
    if (!le) {
        return NULL;
    }

    for (; le; le = list_next(le)) {
        item = _get_entry(le, struct wal_item, list_elem);
        // Skip uncommitted items belonging to other transactions;
        // they are invisible to this snapshot.
        if (item->txn_id != txn->txn_id && !(item->flag & WAL_ITEM_COMMITTED)) {
            continue;
        }
        if (item->shandle->snap_tag_idx > tag) {
            continue; // this item was inserted after snapshot creation -> skip
        }
        // Skip items committed by transactions that were still active when
        // the snapshot was taken (partially-committed items).
        if (_wal_item_partially_committed(shandle->global_txn,
                                          &shandle->active_txn_list,
                                          txn, item)) {
            continue;
        }
        if (item->shandle->snap_tag_idx == tag) {// Found exact snapshot item
            max_shared_item = item; // look no further
            break;
        }

        // if my snapshot was taken after a WAL flush..
        if (item->shandle->snap_tag_idx <= snap_stop_tag) {
            continue; // then do not consider pre-flush items
        }
        // Shared item from an older (post-flush) snapshot: keep the one
        // with the highest snapshot tag, i.e. the most recent shared item.
        if (item->shandle->snap_tag_idx < tag) {
            if (!max_shared_item) {
                max_shared_item = item;
            } else if (item->shandle->snap_tag_idx >
                       max_shared_item->shandle->snap_tag_idx) {
                max_shared_item = item;
            }
        }
    }
    return (struct wal_item *)max_shared_item;
}
904
/**
 * Core WAL lookup. Searches by key when doc->key is set (or seqnum is
 * unused), otherwise by sequence number (requires FDB_SEQTREE_USE).
 *
 * On success, sets *offset to the doc's on-disk offset and updates
 * doc->deleted (and doc->seqnum on the key path). For immediately purged
 * deletes (WAL_ACT_REMOVE), returns SUCCESS with *offset == BLK_NOT_FOUND
 * so the caller neither consults the main index nor tries to read the doc.
 *
 * @param shandle Non-NULL for snapshot reads (visibility via
 *                _wal_get_snap_item); NULL for regular reads.
 * @return FDB_RESULT_SUCCESS, FDB_RESULT_KEY_NOT_FOUND, or
 *         FDB_RESULT_INVALID_CONFIG (seqnum lookup without seq trees).
 */
static fdb_status _wal_find(fdb_txn *txn,
                            struct filemgr *file,
                            fdb_kvs_id_t kv_id,
                            struct _fdb_key_cmp_info *cmp_info,
                            struct snap_handle *shandle,
                            fdb_doc *doc,
                            uint64_t *offset)
{
    struct wal_item item_query, *item = NULL;
    struct wal_item_header query, *header = NULL;
    struct list_elem *le = NULL, *_le;
    struct avl_node *node = NULL;
    void *key = doc->key;
    size_t keylen = doc->keylen;

    if (doc->seqnum == SEQNUM_NOT_USED || (key && keylen>0)) {
        // Key path: shard is chosen by key checksum; hold that shard's
        // lock for the whole search.
        size_t chk_sum = get_checksum((uint8_t*)key, keylen);
        size_t shard_num = chk_sum % file->wal->num_shards;
        spin_lock(&file->wal->key_shards[shard_num].lock);
        // search by key
        query.key = key;
        query.keylen = keylen;
        avl_set_aux(&file->wal->key_shards[shard_num]._map,
                    (void *)cmp_info);
        node = avl_search(&file->wal->key_shards[shard_num]._map,
                          &query.avl_key, _wal_cmp_bykey);
        if (node) {
            struct wal_item *committed_item = NULL;
            // retrieve header
            header = _get_entry(node, struct wal_item_header, avl_key);
            if (shandle) {
                item = _wal_get_snap_item(header, shandle);
            } else { // regular non-snapshot lookup
                for (le = list_begin(&header->items);
                     le; le = _le) {
                    item = _get_entry(le, struct wal_item, list_elem);
                    // Items get ordered as follows in the header's list..
                    // (begin) 6 --- 5 --- 4 --- 1 --- 2 --- 3 <-- (end)
                    //  Uncommitted items-->     <--- Committed items
                    if (!committed_item) {
                        if (item->flag & WAL_ITEM_COMMITTED) {
                            committed_item = item;
                            _le = list_end(&header->items);
                            if (_le == le) { // just one element at the end
                                _le = NULL; // process current element & exit
                            } else { // current element is not the last item..
                                continue; // start reverse scan from the end
                            }
                        } else { // uncommitted items - still continue forward
                            _le = list_next(le);
                        }
                    } else { // reverse scan list over committed items..
                        _le = list_prev(le);
                        // is it back to the first committed item..
                        if (_le == &committed_item->list_elem) {
                            _le = NULL; // need not re-iterate over uncommitted
                        }
                    }
                    if (item->flag & WAL_ITEM_FLUSHED_OUT) {
                        item = NULL; // item reflected in main index and is not
                        break; // to be returned for non-snapshot reads
                    }
                    // only committed items can be seen by the other handles, OR
                    // items belonging to the same txn can be found, OR
                    // a transaction's isolation level is read uncommitted.
                    if ((item->flag & WAL_ITEM_COMMITTED) ||
                        (item->txn_id == txn->txn_id) ||
                        (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
                        break;
                    } else {
                        item = NULL;
                    }
                } // done for all items in the header's list
            } // done for regular (non-snapshot) lookup
            if (item) {
                *offset = item->offset;
                if (item->action == WAL_ACT_INSERT) {
                    doc->deleted = false;
                } else {
                    doc->deleted = true;
                    if (item->action == WAL_ACT_REMOVE) {
                        // Immediately deleted & purged docs have no real
                        // presence on-disk. wal_find must return SUCCESS
                        // here to indicate that the doc was deleted to
                        // prevent main index lookup. Also, it must set the
                        // offset to BLK_NOT_FOUND to ensure that caller
                        // does NOT attempt to fetch the doc OR its
                        // metadata from file.
                        *offset = BLK_NOT_FOUND;
                    }
                }
                doc->seqnum = item->seqnum;
                spin_unlock(&file->wal->key_shards[shard_num].lock);
                return FDB_RESULT_SUCCESS;
            }
        }
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    } else {
        if (file->config->seqtree_opt != FDB_SEQTREE_USE) {
            return FDB_RESULT_INVALID_CONFIG;
        }
        // search by seqnum
        struct wal_item_header temp_header;

        if (file->kv_header) { // multi KV instance mode
            // Seqnum comparison is scoped per-KVS: encode kv_id as the
            // key prefix so _wal_cmp_byseq can disambiguate.
            temp_header.key = (void*)alca(uint8_t, file->config->chunksize);
            kvid2buf(file->config->chunksize, kv_id, temp_header.key);
            item_query.header = &temp_header;
        }
        item_query.seqnum = doc->seqnum;

        size_t shard_num = doc->seqnum % file->wal->num_shards;
        spin_lock(&file->wal->seq_shards[shard_num].lock);
        node = avl_search(&file->wal->seq_shards[shard_num]._map,
                          &item_query.avl_seq, _wal_cmp_byseq);
        if (node) {
            item = _get_entry(node, struct wal_item, avl_seq);
            // Same visibility rules as the key path.
            if ((item->flag & WAL_ITEM_COMMITTED) ||
                (item->txn_id == txn->txn_id) ||
                (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
                *offset = item->offset;
                if (item->action == WAL_ACT_INSERT) {
                    doc->deleted = false;
                } else {
                    doc->deleted = true;
                    if (item->action == WAL_ACT_REMOVE) {
                        // Immediately deleted & purged doc have no real
                        // presence on-disk. wal_find must return SUCCESS
                        // here to indicate that the doc was deleted to
                        // prevent main index lookup. Also, it must set the
                        // offset to BLK_NOT_FOUND to ensure that caller
                        // does NOT attempt to fetch the doc OR its
                        // metadata from file.
                        *offset = BLK_NOT_FOUND;
                    }
                }
                spin_unlock(&file->wal->seq_shards[shard_num].lock);
                return FDB_RESULT_SUCCESS;
            }
        }
        spin_unlock(&file->wal->seq_shards[shard_num].lock);
    }

    return FDB_RESULT_KEY_NOT_FOUND;
}
1050
1051static
1052fdb_status _wal_snap_find(struct snap_handle *shandle, fdb_doc *doc,
1053                          uint64_t *offset);
1054
1055fdb_status wal_find(fdb_txn *txn, struct filemgr *file,
1056                    struct _fdb_key_cmp_info *cmp_info,
1057                    struct snap_handle *shandle,
1058                    fdb_doc *doc, uint64_t *offset)
1059{
1060    if (shandle) {
1061        if (shandle->is_persisted_snapshot) {
1062            return _wal_snap_find(shandle, doc, offset);
1063        }
1064    }
1065    return _wal_find(txn, file, 0, cmp_info, shandle, doc, offset);
1066}
1067
1068fdb_status wal_find_kv_id(fdb_txn *txn,
1069                          struct filemgr *file,
1070                          fdb_kvs_id_t kv_id,
1071                          struct _fdb_key_cmp_info *cmp_info,
1072                          struct snap_handle *shandle,
1073                          fdb_doc *doc,
1074                          uint64_t *offset)
1075{
1076    if (shandle) {
1077        if (shandle->is_persisted_snapshot) {
1078            return _wal_snap_find(shandle, doc, offset);
1079        }
1080    }
1081    return _wal_find(txn, file, kv_id, cmp_info, shandle, doc, offset);
1082}
1083
// Free a WAL item and drop its reference on the owning snapshot handle;
// when the snapshot's doc count hits zero, the snapshot handle itself is
// removed from the snapshot tree and destroyed (with its active-txn list).
// Pre-condition: writer lock (filemgr mutex) must be held for this call
// Readers can interleave without lock
INLINE void _wal_free_item(struct wal_item *item, struct wal *_wal) {
    struct snap_handle *shandle = item->shandle;
    // atomic_decr returns the new count; zero means this was the last item
    if (!atomic_decr_uint64_t(&shandle->wal_ndocs)) {
        spin_lock(&_wal->lock);
        DBG("%s Last item removed from snapshot %" _F64 "-%" _F64 " %" _F64
                " kv id %" _F64 ". Destroy snapshot handle..\n",
                shandle->snap_txn && shandle->snap_txn->handle ?
                shandle->snap_txn->handle->file->filename : "",
                shandle->snap_stop_idx, shandle->snap_tag_idx,
                shandle->seqnum, shandle->id);
        avl_remove(&_wal->wal_snapshot_tree, &shandle->avl_id);
        // Free the snapshot's copy of the active transaction list.
        for (struct list_elem *e = list_begin(&shandle->active_txn_list); e;) {
            struct list_elem *e_next = list_next(e);
            struct wal_txn_wrapper *active_txn = _get_entry(e,
                                                 struct wal_txn_wrapper, le);
            free(active_txn);
            e = e_next;
        }
        free(shandle);
        spin_unlock(&_wal->lock);
    }
    // Scrub the item before freeing to help catch use-after-free.
    memset(item, 0, sizeof(struct wal_item));
    free(item);
}
1110
// move all uncommitted items into 'new_file'
/**
 * Migrate all uncommitted (transactional) WAL items from old_file to
 * new_file during compaction: each doc is re-appended to new_file via
 * 'move_doc' and re-inserted into new_file's WAL, then removed from
 * old_file's WAL and stats. Finally, all non-global transactions in the
 * old file's txn list are moved to the new file's list.
 *
 * Pre-condition: the caller (compactor) holds the filemgr locks on both
 * files, so per-shard locks can be taken one at a time safely.
 *
 * @return FDB_RESULT_SUCCESS, or the (negative) move_doc error /
 *         FDB_RESULT_READ_FAIL when a doc move fails.
 */
fdb_status wal_txn_migration(void *dbhandle,
                             void *new_dhandle,
                             struct filemgr *old_file,
                             struct filemgr *new_file,
                             wal_doc_move_func *move_doc)
{
    int64_t offset;
    fdb_doc doc;
    fdb_txn *txn;
    struct wal_txn_wrapper *txn_wrapper;
    struct wal_item_header *header;
    struct wal_item *item;
    struct avl_node *node;
    struct list_elem *e;
    size_t i = 0;
    size_t num_shards = old_file->wal->num_shards;
    uint64_t mem_overhead = 0;
    struct _fdb_key_cmp_info cmp_info;

    // Note that the caller (i.e., compactor) alreay owns the locks on
    // both old_file and new_file filemgr instances. Therefore, it is OK to
    // grab each partition lock individually and move all uncommitted items
    // to the new_file filemgr instance.

    for (; i < num_shards; ++i) {
        spin_lock(&old_file->wal->key_shards[i].lock);
        node = avl_first(&old_file->wal->key_shards[i]._map);
        while(node) {
            header = _get_entry(node, struct wal_item_header, avl_key);
            // Walk the header's item list from the tail (committed end)
            // towards the front; only uncommitted items are migrated.
            e = list_end(&header->items);
            while(e) {
                item = _get_entry(e, struct wal_item, list_elem);
                if (!(item->flag & WAL_ITEM_COMMITTED)) {
                    // not committed yet
                    // move doc
                    offset = move_doc(dbhandle, new_dhandle, item, &doc);
                    if (offset <= 0) {
                        spin_unlock(&old_file->wal->key_shards[i].lock);
                        return offset < 0 ? (fdb_status) offset : FDB_RESULT_READ_FAIL;
                    }
                    // Note that all items belonging to global_txn should be
                    // flushed before calling this function
                    // (migrate transactional items only).
                    fdb_assert(item->txn != &old_file->global_txn,
                               (uint64_t)item->txn, 0);
                    cmp_info.kvs_config = item->txn->handle->kvs_config;
                    cmp_info.kvs = item->txn->handle->kvs;
                    // insert into new_file's WAL
                    wal_insert(item->txn, new_file, &cmp_info, &doc, offset,
                               WAL_INS_WRITER);

                    if (old_file->config->seqtree_opt == FDB_SEQTREE_USE) {
                        // remove from seq map
                        size_t shard_num = item->seqnum % num_shards;
                        spin_lock(&old_file->wal->seq_shards[shard_num].lock);
                        avl_remove(&old_file->wal->seq_shards[shard_num]._map,
                                &item->avl_seq);
                        spin_unlock(&old_file->wal->seq_shards[shard_num].lock);
                    }

                    // remove from header's list
                    e = list_remove_reverse(&header->items, e);
                    // remove from transaction's list
                    list_remove(item->txn->items, &item->list_elem_txn);
                    // decrease num_flushable of old_file if non-transactional update
                    if (item->txn_id == old_file->global_txn.txn_id) {
                        atomic_decr_uint32_t(&old_file->wal->num_flushable);
                    }
                    if (item->action != WAL_ACT_REMOVE) {
                        atomic_sub_uint64_t(&old_file->wal->datasize, item->doc_size,
                                            std::memory_order_relaxed);
                    }
                    // free item
                    free(item);
                    // free doc
                    free(doc.key);
                    free(doc.meta);
                    free(doc.body);
                    atomic_decr_uint32_t(&old_file->wal->size);
                    mem_overhead += sizeof(struct wal_item);
                } else {
                    e = list_prev(e);
                }
            }

            if (list_begin(&header->items) == NULL) {
                // header's list becomes empty
                // remove from key map
                node = avl_next(node);
                avl_remove(&old_file->wal->key_shards[i]._map,
                           &header->avl_key);
                mem_overhead += header->keylen + sizeof(struct wal_item_header);
                // free key & header
                free(header->key);
                free(header);
            } else {
                node = avl_next(node);
            }
        }
        spin_unlock(&old_file->wal->key_shards[i].lock);
    }
    atomic_sub_uint64_t(&old_file->wal->mem_overhead, mem_overhead,
                        std::memory_order_relaxed);

    spin_lock(&old_file->wal->lock);

    // migrate all entries in txn list
    e = list_begin(&old_file->wal->txn_list);
    while(e) {
        txn_wrapper = _get_entry(e, struct wal_txn_wrapper, le);
        txn = txn_wrapper->txn;
        // except for global_txn
        if (txn != &old_file->global_txn) {
            e = list_remove(&old_file->wal->txn_list, &txn_wrapper->le);
            list_push_front(&new_file->wal->txn_list, &txn_wrapper->le);
            // remove previous header info & revnum
            txn->prev_hdr_bid = BLK_NOT_FOUND;
            txn->prev_revnum = 0;
        } else {
            e = list_next(e);
        }
    }

    spin_unlock(&old_file->wal->lock);

    return FDB_RESULT_SUCCESS;
}
1239
/**
 * Commit all items belonging to 'txn': mark each item committed, update
 * WAL stats, optionally append a commit mark via 'func', move the item to
 * the committed (tail) end of its header's list, and de-duplicate older
 * committed items for the same key that no snapshot still needs.
 *
 * @param func Optional callback appending a commit mark for each item
 *             offset (used for transactional commits); errors abort the
 *             commit with the callback's status.
 * @return FDB_RESULT_SUCCESS or the first commit-mark failure status.
 */
fdb_status wal_commit(fdb_txn *txn, struct filemgr *file,
                      wal_commit_mark_func *func,
                      err_log_callback *log_callback)
{
    int can_overwrite;
    struct wal_item *item, *_item;
    struct list_elem *e1, *e2;
    fdb_kvs_id_t kv_id;
    fdb_status status = FDB_RESULT_SUCCESS;
    size_t shard_num;
    uint64_t mem_overhead = 0;

    e1 = list_begin(txn->items);
    while(e1) {
        item = _get_entry(e1, struct wal_item, list_elem_txn);
        fdb_assert(item->txn_id == txn->txn_id, item->txn_id, txn->txn_id);
        // Grab the WAL key shard lock.
        shard_num = get_checksum((uint8_t*)item->header->key,
                                 item->header->keylen) %
                                 file->wal->num_shards;
        spin_lock(&file->wal->key_shards[shard_num].lock);

        if (!(item->flag & WAL_ITEM_COMMITTED)) {
            // get KVS ID
            if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
                buf2kvid(item->header->chunksize, item->header->key, &kv_id);
            } else {
                kv_id = 0;
            }

            item->flag |= WAL_ITEM_COMMITTED;
            if (item->txn != &file->global_txn) {
                // increase num_flushable if it is transactional update
                atomic_incr_uint32_t(&file->wal->num_flushable);
                // Also since a transaction doc was committed
                // update global WAL stats to reflect this change..
                if (item->action == WAL_ACT_INSERT) {
                    _wal_update_stat(file, kv_id, _WAL_NEW_SET);
                } else {
                    _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
                }
            }
            // append commit mark if necessary
            if (func) {
                status = func(txn->handle, item->offset);
                if (status != FDB_RESULT_SUCCESS) {
                    fdb_log(log_callback, status,
                            "Error in appending a commit mark at offset %"
                            _F64 " in "
                            "a database file '%s'", item->offset,
                            file->filename);
                    spin_unlock(&file->wal->key_shards[shard_num].lock);
                    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
                                        std::memory_order_relaxed);
                    return status;
                }
            }
            // remove previously committed item if no snapshots refer to it,
            // move the committed item to the end of the wal_item_header's list
            list_remove(&item->header->items, &item->list_elem);
            list_push_back(&item->header->items, &item->list_elem);
            // now reverse scan among other committed items to de-duplicate..
            e2 = list_prev(&item->list_elem);
            while(e2) {
                _item = _get_entry(e2, struct wal_item, list_elem);
                if (!(_item->flag & WAL_ITEM_COMMITTED)) {
                    // reached the uncommitted region; stop de-duplicating
                    break;
                }
                e2 = list_prev(e2);
                can_overwrite = (item->shandle == _item->shandle ||
                                 _wal_can_discard(file->wal, _item, item));
                if (!can_overwrite) {
                    item = _item; // new covering item found
                    continue;
                }
                // committed but not flush-ready
                // (flush-readied item will be removed by flushing)
                if (!(_item->flag & WAL_ITEM_FLUSH_READY)) {
                    // remove from list & hash
                    list_remove(&item->header->items, &_item->list_elem);
                    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
                        size_t seq_shard_num = _item->seqnum
                                             % file->wal->num_shards;
                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
                                   &_item->avl_seq);
                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                    }

                    // mark previous doc region as stale
                    uint32_t stale_len = _item->doc_size;
                    uint64_t stale_offset = _item->offset;
                    if (_item->action == WAL_ACT_INSERT ||
                        _item->action == WAL_ACT_LOGICAL_REMOVE) {
                        // insert or logical remove
                        filemgr_mark_stale(file, stale_offset, stale_len);
                    }

                    atomic_decr_uint32_t(&file->wal->size);
                    atomic_decr_uint32_t(&file->wal->num_flushable);
                    if (item->action != WAL_ACT_REMOVE) {
                        atomic_sub_uint64_t(&file->wal->datasize,
                                            _item->doc_size,
                                            std::memory_order_relaxed);
                    }
                    // simply reduce the stat count...
                    if (_item->action == WAL_ACT_INSERT) {
                        _wal_update_stat(file, kv_id, _WAL_DROP_SET);
                    } else {
                        _wal_update_stat(file, kv_id, _WAL_DROP_DELETE);
                    }
                    mem_overhead += sizeof(struct wal_item);
                    _wal_free_item(_item, file->wal);
                } else {
                    fdb_log(log_callback, status,
                            "Wal commit called when wal_flush in progress."
                            "item seqnum %" _F64
                            " keylen %d flags %x action %d"
                            "%s", _item->seqnum, item->header->keylen,
                            atomic_get_uint8_t(&_item->flag),
                            _item->action, file->filename);
                }
            }
        }

        // remove from transaction's list
        e1 = list_remove(txn->items, e1);
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    }
    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
                        std::memory_order_relaxed);

    return status;
}
1374
1375static int _wal_flush_cmp(struct avl_node *a, struct avl_node *b, void *aux)
1376{
1377    struct wal_item *aa, *bb;
1378    aa = _get_entry(a, struct wal_item, avl_flush);
1379    bb = _get_entry(b, struct wal_item, avl_flush);
1380
1381    if (aa->old_offset < bb->old_offset) {
1382        return -1;
1383    } else if (aa->old_offset > bb->old_offset) {
1384        return 1;
1385    } else {
1386        // old_offset can be 0 if the document was newly inserted
1387        if (aa->offset < bb->offset) {
1388            return -1;
1389        } else if (aa->offset > bb->offset) {
1390            return 1;
1391        } else {
1392            // Note: get_old_offset() may return same old_offset on different keys;
1393            // this is because hbtrie_find_offset() (internally called by
1394            // get_old_offset()) does not compare entire key string but just prefix
1395            // only due to performance issue.
1396            // As a result, this case (keys are different but both old_offset and
1397            // offset are same) very rarely happens and causes crash.
1398            // In this case, we need to additionally compare sequence numbers
1399            // to distinguish those two different items.
1400            if (aa->seqnum < bb->seqnum) {
1401                return -1;
1402            } else if (aa->seqnum > bb->seqnum) {
1403                return 1;
1404            } else {
1405                return 0;
1406            }
1407        }
1408    }
1409}
1410
// Remove one WAL item from all WAL data structures (header's item list and,
// if seq trees are enabled, the seq-shard map), roll back the KVS/WAL stats
// it contributed, and free it via _wal_free_item (which also drops the
// snapshot reference). Caller must hold the key shard lock for 'shard_num'.
INLINE void _wal_release_item(struct filemgr *file, size_t shard_num,
                              fdb_kvs_id_t kv_id, struct wal_item *item) {
    list_remove(&item->header->items, &item->list_elem);
    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
        size_t seq_shard_num;
        seq_shard_num = item->seqnum % file->wal->num_shards;
        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
                &item->avl_seq);
        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
    }

    // Deletions (logical or immediate) were counted as WAL deletes.
    if (item->action == WAL_ACT_LOGICAL_REMOVE ||
        item->action == WAL_ACT_REMOVE) {
        _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
    }
    _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
    atomic_decr_uint32_t(&file->wal->size);
    atomic_decr_uint32_t(&file->wal->num_flushable);
    // Immediate removes never contributed to datasize (see _wal_insert).
    if (item->action != WAL_ACT_REMOVE) {
        atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
                            std::memory_order_relaxed);
    }
    _wal_free_item(item, file->wal);
}
1436
1437INLINE list_elem *_wal_release_items(struct filemgr *file, size_t shard_num,
1438                                     struct wal_item *item) {
1439    fdb_kvs_id_t kv_id;
1440    uint64_t mem_overhead = 0;
1441    struct list_elem *le = &item->list_elem;
1442    struct wal_item_header *header = item->header;
1443
1444    // get KVS ID
1445    if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
1446        buf2kvid(item->header->chunksize, item->header->key, &kv_id);
1447    } else {
1448        kv_id = 0;
1449    }
1450    le = list_prev(le);
1451    if (_wal_can_discard(file->wal, item, NULL)) {
1452        _wal_release_item(file, shard_num, kv_id, item);
1453        mem_overhead += sizeof(struct wal_item);
1454        item = NULL;
1455    } else {
1456        item->flag &= ~WAL_ITEM_FLUSH_READY;
1457        item->flag |= WAL_ITEM_FLUSHED_OUT;
1458    }
1459    // try to cleanup items from prior snapshots as well..
1460    while (le) {
1461        struct wal_item *sitem = _get_entry(le, struct wal_item, list_elem);
1462        if (!(sitem->flag & WAL_ITEM_COMMITTED)) { // uncommitted items will
1463            le = NULL; // be flushed in the next wal_flush operation
1464            break;
1465        }
1466        le = list_prev(le);
1467        if (_wal_can_discard(file->wal, sitem, item)) {
1468            _wal_release_item(file, shard_num, kv_id, sitem);
1469            mem_overhead += sizeof(struct wal_item);
1470        } else {
1471            item = sitem; // this is the latest and greatest item
1472            item->flag &= ~WAL_ITEM_FLUSH_READY;
1473            item->flag |= WAL_ITEM_FLUSHED_OUT;
1474        }
1475    }
1476    if (list_begin(&header->items) == NULL) {
1477        // wal_item_header becomes empty
1478        // free header and remove from key map
1479        avl_remove(&file->wal->key_shards[shard_num]._map,
1480                &header->avl_key);
1481        mem_overhead = sizeof(wal_item_header) + header->keylen;
1482        free(header->key);
1483        free(header);
1484        le = NULL;
1485    }
1486    atomic_sub_uint64_t(&file->wal->mem_overhead,
1487                        mem_overhead + sizeof(struct wal_item),
1488                        std::memory_order_relaxed);
1489    return le;
1490}
1491
1492// Mark all snapshots are flushed to indicate that all items have been
1493// reflected in the main index and future snapshots must not access these
1494INLINE void _wal_snap_mark_flushed(struct wal *_wal)
1495{
1496    struct avl_node *a;
1497    spin_lock(&_wal->lock);
1498    for (a = avl_first(&_wal->wal_snapshot_tree);
1499         a; a = avl_next(a)) {
1500        struct snap_handle *shandle = _get_entry(a, struct snap_handle, avl_id);
1501        shandle->is_flushed = true;
1502    }
1503    spin_unlock(&_wal->lock);
1504}
1505
1506#define WAL_SORTED_FLUSH ((void *)1) // stored in aux if avl tree is used
1507
1508INLINE bool _wal_are_items_sorted(union wal_flush_items *flush_items)
1509{
1510    return (flush_items->tree.aux == WAL_SORTED_FLUSH);
1511}
1512
1513fdb_status wal_release_flushed_items(struct filemgr *file,
1514                                     union wal_flush_items *flush_items)
1515{
1516    struct wal_item *item;
1517    size_t shard_num;
1518
1519    _wal_snap_mark_flushed(file->wal); // Read-write barrier: items are in trie
1520
1521    if (_wal_are_items_sorted(flush_items)) {
1522        struct avl_tree *tree = &flush_items->tree;
1523        // scan and remove entries in the avl-tree
1524        while (1) {
1525            struct avl_node *a;
1526            if ((a = avl_first(tree)) == NULL) {
1527                break;
1528            }
1529            item = _get_entry(a, struct wal_item, avl_flush);
1530            avl_remove(tree, &item->avl_flush);
1531
1532            // Grab the WAL key shard lock.
1533            shard_num = get_checksum((uint8_t*)item->header->key,
1534                                     item->header->keylen)
1535                                     % file->wal->num_shards;
1536            spin_lock(&file->wal->key_shards[shard_num].lock);
1537
1538            _wal_release_items(file, shard_num, item);
1539
1540            spin_unlock(&file->wal->key_shards[shard_num].lock);
1541        }
1542    } else {
1543        struct list *list_head = &flush_items->list;
1544        // scan and remove entries in the avl-tree
1545        while (1) {
1546            struct list_elem *a;
1547            if ((a = list_begin(list_head)) == NULL) {
1548                break;
1549            }
1550            item = _get_entry(a, struct wal_item, list_elem_flush);
1551            list_remove(list_head, &item->list_elem_flush);
1552
1553            // Grab the WAL key shard lock.
1554            shard_num = get_checksum((uint8_t*)item->header->key,
1555                                     item->header->keylen)
1556                                     % file->wal->num_shards;
1557            spin_lock(&file->wal->key_shards[shard_num].lock);
1558            _wal_release_items(file, shard_num, item);
1559            spin_unlock(&file->wal->key_shards[shard_num].lock);
1560        }
1561    }
1562
1563    return FDB_RESULT_SUCCESS;
1564}
1565
// Flush a single WAL item into the main index via 'flush_func'.
// Items whose WAL_ITEM_FLUSH_READY flag was cleared after being queued
// (i.e. updated again since insertion into the flush tree/list) are skipped.
// Stale seqnums and per-KVS stat deltas are accumulated into the two avl
// trees for batched post-processing by the caller. On failure the error is
// logged through the handle's callback and returned unchanged.
INLINE fdb_status _wal_do_flush(struct wal_item *item,
                                wal_flush_func *flush_func,
                                void *dbhandle,
                                struct avl_tree *stale_seqnum_list,
                                struct avl_tree *kvs_delta_stats)
{
    // check whether this item is updated after insertion into tree
    if (item->flag & WAL_ITEM_FLUSH_READY) {
        fdb_status fs = flush_func(dbhandle, item, stale_seqnum_list, kvs_delta_stats);
        if (fs != FDB_RESULT_SUCCESS) {
            fdb_kvs_handle *handle = (fdb_kvs_handle *) dbhandle;
            fdb_log(&handle->log_callback, fs,
                    "Failed to flush WAL item (key '%s') into a database file '%s'",
                    (const char *) item->header->key, handle->file->filename);
            return fs;
        }
    }
    return FDB_RESULT_SUCCESS;
}
1585
// Saved index root block IDs, captured before a WAL flush so the indexes
// can be rolled back if the flush fails partway through.
struct fdb_root_info {
    bid_t orig_id_root;     // root of the main (by-key) trie
    bid_t orig_seq_root;    // root of the seq index (seqtrie in multi-KVS mode)
    bid_t orig_stale_root;  // root of the stale-block tree, if present
};
1591
1592INLINE void _wal_backup_root_info(void *voidhandle,
1593                                  struct fdb_root_info *root_info)
1594{
1595    fdb_kvs_handle *handle = (fdb_kvs_handle*)voidhandle;
1596
1597    root_info->orig_id_root = handle->trie->root_bid;
1598    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1599        if (handle->kvs) {
1600            root_info->orig_seq_root = handle->seqtrie->root_bid;
1601        } else {
1602            root_info->orig_seq_root = handle->seqtree->root_bid;
1603        }
1604    }
1605    if (handle->staletree) {
1606        root_info->orig_stale_root = handle->staletree->root_bid;
1607    }
1608}
1609
1610INLINE void _wal_restore_root_info(void *voidhandle,
1611                                   struct fdb_root_info *root_info)
1612{
1613    fdb_kvs_handle *handle = (fdb_kvs_handle*)voidhandle;
1614
1615    handle->trie->root_bid = root_info->orig_id_root;
1616    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1617        if (handle->kvs) {
1618            handle->seqtrie->root_bid = root_info->orig_seq_root;
1619        } else {
1620            handle->seqtree->root_bid = root_info->orig_seq_root;
1621        }
1622    }
1623    if (handle->staletree) {
1624        handle->staletree->root_bid = root_info->orig_stale_root;
1625    }
1626}
1627
1628static fdb_status _wal_flush(struct filemgr *file,
1629                             void *dbhandle,
1630                             wal_flush_func *flush_func,
1631                             wal_get_old_offset_func *get_old_offset,
1632                             wal_flush_seq_purge_func *seq_purge_func,
1633                             wal_flush_kvs_delta_stats_func *delta_stats_func,
1634                             union wal_flush_items *flush_items,
1635                             bool by_compactor)
1636{
1637    struct avl_tree *tree = &flush_items->tree;
1638    struct list *list_head = &flush_items->list;
1639    struct list_elem *ee, *ee_prev;
1640    struct avl_node *a, *a_next;
1641    struct wal_item *item;
1642    struct wal_item_header *header;
1643    struct fdb_root_info root_info;
1644    size_t i = 0;
1645    size_t num_shards = file->wal->num_shards;
1646    bool do_sort = !filemgr_is_fully_resident(file);
1647
1648    if (do_sort) {
1649        avl_init(tree, WAL_SORTED_FLUSH);
1650    } else {
1651        list_init(list_head);
1652    }
1653
1654    memset(&root_info, 0xff, sizeof(root_info));
1655    _wal_backup_root_info(dbhandle, &root_info);
1656
1657    for (; i < num_shards; ++i) {
1658        spin_lock(&file->wal->key_shards[i].lock);
1659        a = avl_first(&file->wal->key_shards[i]._map);
1660        while (a) {
1661            a_next = avl_next(a);
1662            header = _get_entry(a, struct wal_item_header, avl_key);
1663            ee = list_end(&header->items);
1664            while (ee) {
1665                ee_prev = list_prev(ee);
1666                item = _get_entry(ee, struct wal_item, list_elem);
1667                // committed but not flushed items
1668                if (!(item->flag & WAL_ITEM_COMMITTED)) {
1669                    break;
1670                }
1671                // Don't re-flush flushed items, try to free them up instead
1672                if (item->flag & WAL_ITEM_FLUSHED_OUT) {
1673                    _wal_release_items(file, i, item);
1674                    break; // most recent item is already reflected in trie
1675                }
1676                if (!(item->flag & WAL_ITEM_FLUSH_READY)) {
1677                    item->flag |= WAL_ITEM_FLUSH_READY;
1678                    // if WAL_ITEM_FLUSH_READY flag is set,
1679                    // this item becomes immutable, so that
1680                    // no other concurrent thread modifies it.
1681                    if (by_compactor) {
1682                        // During the first phase of compaction, we don't need
1683                        // to retrieve the old offsets of WAL items because they
1684                        // are all new insertions into new file's hbtrie index.
1685                        item->old_offset = 0;
1686                        if (do_sort) {
1687                            avl_insert(tree, &item->avl_flush, _wal_flush_cmp);
1688                        } else {
1689                            list_push_back(list_head, &item->list_elem_flush);
1690                        }
1691                    } else {
1692                        spin_unlock(&file->wal->key_shards[i].lock);
1693                        if (get_old_offset(dbhandle, item, &(item->old_offset))
1694                            == FDB_RECOVERABLE_ERR)
1695                            return FDB_RECOVERABLE_ERR;
1696                        spin_lock(&file->wal->key_shards[i].lock);
1697                        if (item->old_offset == item->offset) {
1698                            // Sometimes if there are uncommitted transactional
1699                            // items along with flushed committed items when
1700                            // file was closed, wal_restore can end up inserting
1701                            // already flushed items back into WAL.
1702                            // We should not try to flush them back again
1703                            item->flag |= WAL_ITEM_FLUSHED_OUT;
1704                        }
1705                        if (item->old_offset == 0 && // doc not in main index
1706                            item->action == WAL_ACT_REMOVE) {// insert & delete
1707                            item->old_offset = BLK_NOT_FOUND;
1708                            item->flag |= WAL_ITEM_FLUSHED_OUT;
1709                        }
1710                        if (do_sort) {
1711                            avl_insert(tree, &item->avl_flush, _wal_flush_cmp);
1712                        } else {
1713                            list_push_back(list_head, &item->list_elem_flush);
1714                        }
1715                        break; // only pick one item per key
1716                    }
1717                }
1718                ee = ee_prev;
1719            }
1720            a = a_next;
1721        }
1722        spin_unlock(&file->wal->key_shards[i].lock);
1723    }
1724
1725    filemgr_set_io_inprog(file); // MB-16622:prevent parallel writes by flusher
1726    fdb_status fs = FDB_RESULT_SUCCESS;
1727    struct avl_tree stale_seqnum_list;
1728    struct avl_tree kvs_delta_stats;
1729    avl_init(&stale_seqnum_list, NULL);
1730    avl_init(&kvs_delta_stats, NULL);
1731
1732    // scan and flush entries in the avl-tree or list
1733    if (do_sort) {
1734        struct avl_node *a = avl_first(tree);
1735        while (a) {
1736            item = _get_entry(a, struct wal_item, avl_flush);
1737            a = avl_next(a);
1738            if (item->flag & WAL_ITEM_FLUSHED_OUT) {
1739                continue; // need not flush this item into main index..
1740            } // item exists solely for in-memory snapshots
1741            fs = _wal_do_flush(item, flush_func, dbhandle,
1742                               &stale_seqnum_list, &kvs_delta_stats);
1743            if (fs != FDB_RESULT_SUCCESS) {
1744                _wal_restore_root_info(dbhandle, &root_info);
1745                break;
1746            }
1747        }
1748    } else {
1749        struct list_elem *a = list_begin(list_head);
1750        while (a) {
1751            item = _get_entry(a, struct wal_item, list_elem_flush);
1752            a = list_next(a);
1753            if (item->flag & WAL_ITEM_FLUSHED_OUT) {
1754                continue; // need not flush this item into main index..
1755            } // item exists solely for in-memory snapshots
1756            fs = _wal_do_flush(item, flush_func, dbhandle,
1757                               &stale_seqnum_list, &kvs_delta_stats);
1758            if (fs != FDB_RESULT_SUCCESS) {
1759                _wal_restore_root_info(dbhandle, &root_info);
1760                break;
1761            }
1762        }
1763    }
1764
1765    // Remove all stale seq entries from the seq tree
1766    if (seq_purge_func(dbhandle, &stale_seqnum_list, &kvs_delta_stats) ==
1767        FDB_RECOVERABLE_ERR)
1768        return FDB_RECOVERABLE_ERR;
1769    // Update each KV store stats after WAL flush
1770    delta_stats_func(file, &kvs_delta_stats);
1771
1772    filemgr_clear_io_inprog(file);
1773    return fs;
1774}
1775
1776fdb_status wal_flush(struct filemgr *file,
1777                     void *dbhandle,
1778                     wal_flush_func *flush_func,
1779                     wal_get_old_offset_func *get_old_offset,
1780                     wal_flush_seq_purge_func *seq_purge_func,
1781                     wal_flush_kvs_delta_stats_func *delta_stats_func,
1782                     union wal_flush_items *flush_items)
1783{
1784    return _wal_flush(file, dbhandle, flush_func, get_old_offset,
1785                      seq_purge_func, delta_stats_func,
1786                      flush_items, false);
1787}
1788
1789fdb_status wal_flush_by_compactor(struct filemgr *file,
1790                                  void *dbhandle,
1791                                  wal_flush_func *flush_func,
1792                                  wal_get_old_offset_func *get_old_offset,
1793                                  wal_flush_seq_purge_func *seq_purge_func,
1794                                  wal_flush_kvs_delta_stats_func *delta_stats_func,
1795                                  union wal_flush_items *flush_items)
1796{
1797    return _wal_flush(file, dbhandle, flush_func, get_old_offset,
1798                      seq_purge_func, delta_stats_func,
1799                      flush_items, true);
1800}
1801
1802fdb_status wal_snapshot_clone(struct snap_handle *shandle_in,
1803                              struct snap_handle **shandle_out,
1804                              fdb_seqnum_t seqnum)
1805{
1806    if (seqnum == FDB_SNAPSHOT_INMEM ||
1807        shandle_in->seqnum == seqnum) {
1808        atomic_incr_uint16_t(&shandle_in->ref_cnt_kvs);
1809        *shandle_out = shandle_in;
1810        return FDB_RESULT_SUCCESS;
1811    }
1812    return FDB_RESULT_INVALID_ARGS;
1813}
1814
// Copy out the KV store statistics cached in a snapshot handle.
fdb_status snap_get_stat(struct snap_handle *shandle, struct kvs_stat *stat)
{
    *stat = shandle->stat;
    return FDB_RESULT_SUCCESS;
}
1820
1821fdb_status wal_dur_snapshot_open(fdb_seqnum_t seqnum,
1822                                 _fdb_key_cmp_info *key_cmp_info,
1823                                 struct filemgr *file, fdb_txn *txn,
1824                                 struct snap_handle **shandle)
1825{
1826    struct snap_handle *_shandle;
1827    fdb_kvs_id_t kv_id;
1828    fdb_assert(seqnum != FDB_SNAPSHOT_INMEM, seqnum, key_cmp_info->kvs);
1829    if (!key_cmp_info->kvs) {
1830        kv_id = 0;
1831    } else {
1832        kv_id = key_cmp_info->kvs->id;
1833    }
1834    _shandle = _wal_snapshot_create(kv_id, 0, 0);
1835    if (!_shandle) { // LCOV_EXCL_START
1836        return FDB_RESULT_ALLOC_FAIL;
1837    } // LCOV_EXCL_STOP
1838    spin_lock(&file->wal->lock);
1839    _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
1840    spin_unlock(&file->wal->lock);
1841    *shandle = _shandle;
1842    return FDB_RESULT_SUCCESS;
1843}
1844
1845fdb_status wal_snap_insert(struct snap_handle *shandle, fdb_doc *doc,
1846                           uint64_t offset)
1847{
1848    struct wal_item query;
1849    struct wal_item_header query_hdr;
1850    struct wal_item *item;
1851    struct avl_node *node;
1852    query_hdr.key = doc->key;
1853    query_hdr.keylen = doc->keylen;
1854    query.header = &query_hdr;
1855    node = avl_search(&shandle->key_tree, &query.avl_keysnap, _snap_cmp_bykey);
1856
1857    if (!node) {
1858        item = (struct wal_item *) calloc(1, sizeof(struct wal_item));
1859        item->header = (struct wal_item_header *) malloc(
1860                                  sizeof(struct wal_item_header));
1861        item->header->key = doc->key;
1862        item->header->keylen = doc->keylen;
1863        item->seqnum = doc->seqnum;
1864        if (doc->deleted) {
1865            if (!offset) { // deleted item can never be at offset 0
1866                item->action = WAL_ACT_REMOVE; // must be a purged item
1867            } else {
1868                item->action = WAL_ACT_LOGICAL_REMOVE;
1869            }
1870        } else {
1871            item->action = WAL_ACT_INSERT;
1872        }
1873        item->offset = offset;
1874        avl_insert(&shandle->key_tree, &item->avl_keysnap, _snap_cmp_bykey);
1875        avl_insert(&shandle->seq_tree, &item->avl_seq, _wal_cmp_byseq);
1876
1877        // Note: same logic in wal_commit
1878        shandle->stat.wal_ndocs++;
1879        if (doc->deleted) {
1880            shandle->stat.wal_ndeletes++;
1881        }
1882        item->shandle = shandle;
1883    } else {
1884        // replace existing node with new values so there are no duplicates
1885        item = _get_entry(node, struct wal_item, avl_keysnap);
1886        free(item->header->key);
1887        item->header->key = doc->key;
1888        item->header->keylen = doc->keylen;
1889        if (item->seqnum != doc->seqnum) { // Re-index duplicate into seqtree
1890            item->seqnum = doc->seqnum;
1891            avl_remove(&shandle->seq_tree, &item->avl_seq);
1892            avl_insert(&shandle->seq_tree, &item->avl_seq, _wal_cmp_byseq);
1893        }
1894
1895        // Note: same logic in wal_commit
1896        if (item->action == WAL_ACT_INSERT &&
1897            doc->deleted) {
1898            shandle->stat.wal_ndeletes++;
1899        } else if (item->action == WAL_ACT_LOGICAL_REMOVE &&
1900                   !doc->deleted) {
1901            shandle->stat.wal_ndeletes--;
1902        }
1903
1904        item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
1905        item->offset = offset;
1906    }
1907    return FDB_RESULT_SUCCESS;
1908}
1909
// Populate a snapshot by copying the latest eligible version of each key
// from the live WAL into the snapshot's own trees via wal_snap_insert().
// Skips items belonging to other KV stores (multi-KV mode), uncommitted
// items from foreign transactions, partially committed items and items
// newer than the snapshot's seqnum. Each copied key is duplicated on the
// heap and owned by the snapshot (freed in fdb_snapshot_close).
fdb_status wal_copyto_snapshot(struct filemgr *file,
                               struct snap_handle *shandle,
                               bool is_multi_kv)
{
    struct list_elem *ee;
    struct avl_node *a;
    struct wal_item *item;
    struct wal_item_header *header;
    fdb_kvs_id_t kv_id = 0;
    fdb_doc doc;
    size_t i = 0;
    size_t num_shards = file->wal->num_shards;

    shandle->stat.wal_ndocs = 0; // WAL copy will populate
    shandle->stat.wal_ndeletes = 0; // these 2 stats

    // Get the list of active transactions now
    for (; i < num_shards; ++i) {
        spin_lock(&file->wal->key_shards[i].lock);
        a = avl_first(&file->wal->key_shards[i]._map);
        while (a) {
            header = _get_entry(a, struct wal_item_header, avl_key);
            if (is_multi_kv) {
                // keys are prefixed with the KVS id; filter foreign stores
                buf2kvid(header->chunksize, header->key, &kv_id);
                if (kv_id != shandle->id) {
                    a = avl_next(a);
                    continue;
                }
            }
            ee = list_begin(&header->items);
            while (ee) {
                uint64_t offset;
                item = _get_entry(ee, struct wal_item, list_elem);
                // Skip any uncommitted item, if not part of either global or
                // the current transaction
                if (!(item->flag & WAL_ITEM_COMMITTED) &&
                        item->txn != &file->global_txn &&
                        item->txn != shandle->snap_txn) {
                    ee = list_next(ee);
                    continue;
                }
                // Skip the partially committed items too.
                if (_wal_item_partially_committed(shandle->global_txn,
                                                  &shandle->active_txn_list,
                                                  shandle->snap_txn, item)) {
                    ee = list_next(ee);
                    continue;
                }

                // too new for this snapshot's point in time
                if (item->seqnum > shandle->seqnum) {
                    ee = list_next(ee);
                    continue;
                }

                doc.keylen = item->header->keylen;
                // NOTE(review): malloc result is used unchecked below — on
                // OOM this would crash; consider propagating ALLOC_FAIL.
                doc.key = malloc(doc.keylen); // (freed in fdb_snapshot_close)
                memcpy(doc.key, item->header->key, doc.keylen);
                doc.seqnum = item->seqnum;
                doc.deleted = (item->action == WAL_ACT_LOGICAL_REMOVE ||
                               item->action == WAL_ACT_REMOVE);
                if (item->action == WAL_ACT_REMOVE) {
                    // purged item: no document body remains
                    offset = 0;
                } else {
                    offset = item->offset;
                }

                wal_snap_insert(shandle, &doc, offset);
                break; // We just require a single latest copy in the snapshot
            }
            a = avl_next(a);
        }
        spin_unlock(&file->wal->key_shards[i].lock);
    }
    return FDB_RESULT_SUCCESS;
}
1985
1986static
1987fdb_status _wal_snap_find(struct snap_handle *shandle, fdb_doc *doc,
1988                          uint64_t *offset)
1989{
1990    struct wal_item query, *item;
1991    struct avl_node *node;
1992    if (doc->seqnum == SEQNUM_NOT_USED || (doc->key && doc->keylen > 0)) {
1993        if (!shandle->key_tree.root) {
1994            return FDB_RESULT_KEY_NOT_FOUND;
1995        }
1996        struct wal_item_header query_hdr;
1997        query.header = &query_hdr;
1998        // search by key
1999        query_hdr.key = doc->key;
2000        query_hdr.keylen = doc->keylen;
2001        node = avl_search(&shandle->key_tree, &query.avl_keysnap,
2002                          _snap_cmp_bykey);
2003        if (!node) {
2004            return FDB_RESULT_KEY_NOT_FOUND;
2005        } else {
2006            item = _get_entry(node, struct wal_item, avl_keysnap);
2007            *offset = item->offset;
2008            if (item->action == WAL_ACT_INSERT) {
2009                doc->deleted = false;
2010            } else {
2011                doc->deleted = true;
2012                if (item->action == WAL_ACT_REMOVE) {
2013                    *offset = BLK_NOT_FOUND;
2014                }
2015            }
2016            doc->seqnum = item->seqnum;
2017            return FDB_RESULT_SUCCESS;
2018        }
2019    } else if (shandle->seq_tree.root) {
2020        // search by sequence number
2021        query.seqnum = doc->seqnum;
2022        node = avl_search(&shandle->seq_tree, &query.avl_seq, _wal_cmp_byseq);
2023        if (!node) {
2024            return FDB_RESULT_KEY_NOT_FOUND;
2025        } else {
2026            item = _get_entry(node, struct wal_item, avl_seq);
2027            *offset = item->offset;
2028            if (item->action == WAL_ACT_INSERT) {
2029                doc->deleted = false;
2030            } else {
2031                doc->deleted = true;
2032                if (item->action == WAL_ACT_REMOVE) {
2033                    *offset = BLK_NOT_FOUND;
2034                }
2035            }
2036            return FDB_RESULT_SUCCESS;
2037        }
2038    }
2039    return FDB_RESULT_KEY_NOT_FOUND;
2040}
2041
// Drop one reference on a snapshot handle; when the last reference is
// released, free all snapshot-owned items (keys, headers), the recorded
// active-transaction list, and the handle itself.
fdb_status wal_snapshot_close(struct snap_handle *shandle,
                              struct filemgr *file)
{
    if (!atomic_decr_uint16_t(&shandle->ref_cnt_kvs)) {
        struct avl_node *a, *nexta;
        // NOTE(review): handles with snap_tag_idx set (KVS had WAL items) are
        // not freed here when non-persisted — presumably reclaimed elsewhere
        // via the WAL's snapshot tree; confirm against wal_shutdown/flush.
        if (!shandle->is_persisted_snapshot &&
            shandle->snap_tag_idx) { // the KVS did have items in WAL..
            return FDB_RESULT_SUCCESS;
        }
        for (a = avl_first(&shandle->key_tree);
             a; a = nexta) {
            struct wal_item *item = _get_entry(a, struct wal_item, avl_keysnap);
            nexta = avl_next(a);
            avl_remove(&shandle->key_tree, &item->avl_keysnap);
            // keys and headers were allocated for the snapshot (wal_snap_insert)
            free(item->header->key);
            free(item->header);
            free(item);
        }
        for (struct list_elem *e = list_begin(&shandle->active_txn_list); e;) {
            struct list_elem *e_next = list_next(e);
            struct wal_txn_wrapper *active_txn = _get_entry(e,
                                                   struct wal_txn_wrapper, le);
            free(active_txn);
            e = e_next;
        }
        free(shandle);
    }
    return FDB_RESULT_SUCCESS;
}
2071
2072fdb_status wal_itr_init(struct filemgr *file,
2073                        struct snap_handle *shandle,
2074                        bool by_key,
2075                        struct wal_iterator **wal_iterator)
2076{
2077    struct wal_iterator *wal_itr = (struct wal_iterator *)
2078                      malloc(sizeof(struct wal_iterator));
2079    if (!wal_itr) { // LCOV_EXCL_START
2080        return FDB_RESULT_ALLOC_FAIL;
2081    } // LCOV_EXCL_STOP
2082
2083    // If key_cmp_info is non-null it implies key-range iteration
2084    if (by_key) {
2085        wal_itr->map_shards = file->wal->key_shards;
2086        avl_init(&wal_itr->merge_tree, &shandle->cmp_info);
2087        wal_itr->by_key = true;
2088    } else {
2089        // Otherwise wal iteration is requested over sequence range
2090        if (file->config->seqtree_opt != FDB_SEQTREE_USE) {
2091            return FDB_RESULT_INVALID_CONFIG;
2092        }
2093        wal_itr->map_shards = file->wal->seq_shards;
2094        avl_init(&wal_itr->merge_tree, NULL);
2095        wal_itr->by_key = false;
2096    }
2097
2098    if (shandle->cmp_info.kvs) {
2099        wal_itr->multi_kvs = true;
2100    } else {
2101        wal_itr->multi_kvs = false;
2102    }
2103    wal_itr->cursor_pos = NULL;
2104    wal_itr->item_prev = NULL;
2105
2106    wal_itr->num_shards = file->wal->num_shards;
2107    if (!shandle->is_persisted_snapshot) {
2108        wal_itr->cursors = (struct wal_cursor *)calloc(wal_itr->num_shards,
2109                           sizeof(struct wal_cursor));
2110    } else {
2111        wal_itr->cursors = NULL;
2112    }
2113    wal_itr->shandle = shandle;
2114    wal_itr->_wal = file->wal;
2115    wal_itr->direction = FDB_ITR_DIR_NONE;
2116    *wal_iterator = wal_itr;
2117    return FDB_RESULT_SUCCESS;
2118}
2119
2120INLINE bool _wal_is_my_kvs(struct wal_item_header *header,
2121                           struct wal_iterator *wal_itr)
2122{
2123    if (wal_itr->multi_kvs) {
2124        fdb_kvs_id_t kv_id;
2125        buf2kvid(header->chunksize, header->key, &kv_id);
2126        if (kv_id != wal_itr->shandle->id) {
2127            return false;
2128        }
2129    }
2130    return true;
2131}
2132
// Position the iterator at the smallest key >= query (or the first key when
// query is NULL) across all WAL key shards for a live snapshot.
// Each shard is scanned under its own lock for the first header that belongs
// to this snapshot's KVS and has a visible item; the per-shard winners are
// merged through the iterator's merge tree and the overall minimum returned.
static
struct wal_item *_wal_itr_search_greater_bykey(struct wal_iterator *wal_itr,
                                               struct wal_item *query)
{
    struct avl_node *a = NULL;
    struct wal_cursor *cursor;

    // search is a stateless operation, so re-initialize shard's merge-sort tree
    avl_init(&wal_itr->merge_tree, (void*)&wal_itr->shandle->cmp_info);
    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
        struct wal_item *item = NULL;
        spin_lock(&wal_itr->map_shards[i].lock);
        if (query) {
            // the shard map's comparator needs the cmp info as aux
            avl_set_aux(&wal_itr->map_shards[i]._map,
                        (void*)&wal_itr->shandle->cmp_info);
            a = avl_search_greater(&wal_itr->map_shards[i]._map,
                                   &query->header->avl_key,
                                   _wal_cmp_bykey);
        } else {
            a = avl_first(&wal_itr->map_shards[i]._map);
        }
        if (a) {
            // advance past headers with no item visible to this snapshot
            do {
                struct wal_item_header *header;
                header = _get_entry(a, struct wal_item_header, avl_key);
                if (!_wal_is_my_kvs(header, wal_itr)) {
                    item = NULL;
                    break;
                }
                item = _wal_get_snap_item(header, wal_itr->shandle);
            } while (!item && (a = avl_next(a)));
        }
        spin_unlock(&wal_itr->map_shards[i].lock);
        if (item) {
            wal_itr->cursors[i].item = item;
            // re-insert into the merge-sorted AVL tree across all shards
            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
                       _merge_cmp_bykey);
        } else {
            wal_itr->cursors[i].item = NULL;
        }
    } // done for all WAL shards

    wal_itr->cursor_pos = avl_first(&wal_itr->merge_tree);

    if (!wal_itr->cursor_pos) {
        wal_itr->item_prev = NULL;
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    // save the current cursor position for reverse iteration
    wal_itr->item_prev = cursor->item;
    return cursor->item;
}
2187
2188static
2189struct wal_item *_wal_itr_search_greater_byseq(struct wal_iterator *wal_itr,
2190                                               struct wal_item *query)
2191{
2192    struct avl_node *a = NULL;
2193    struct wal_cursor *cursor;
2194
2195    // search is a stateless operation, so re-initialize shard's merge-sort tree
2196    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
2197    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
2198        struct wal_item *item = NULL, *_item;
2199        if (query) {
2200            spin_lock(&wal_itr->map_shards[i].lock);
2201            a = avl_search_greater(&wal_itr->map_shards[i]._map, &query->avl_seq,
2202                                   _wal_cmp_byseq);
2203        } else {
2204            a = avl_first(&wal_itr->map_shards[i]._map);
2205        }
2206        while (a) {
2207            item = _get_entry(a, struct wal_item, avl_seq);
2208            if (!_wal_is_my_kvs(item->header, wal_itr)) {
2209                item = NULL;
2210                break;
2211            }
2212            _item = _wal_get_snap_item(item->header, wal_itr->shandle);
2213            if (item == _item) {
2214                break;
2215            } else {
2216                item = NULL;
2217            }
2218            a = avl_next(a);
2219        }
2220        spin_unlock(&wal_itr->map_shards[i].lock);
2221        if (item) {
2222            wal_itr->cursors[i].item = item;
2223            // re-insert into the merge-sorted AVL tree across all shards
2224            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
2225                       _merge_cmp_byseq);
2226        } else {
2227            wal_itr->cursors[i].item = NULL;
2228        }
2229    } // done for all WAL shards
2230
2231    wal_itr->cursor_pos = avl_first(&wal_itr->merge_tree);
2232    if (!wal_itr->cursor_pos) {
2233        return NULL;
2234    }
2235    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2236    // save the current cursor position for reverse iteration
2237    wal_itr->item_prev = cursor->item;
2238    return cursor->item;
2239}
2240
2241struct wal_item* wal_itr_search_greater(struct wal_iterator *wal_itr,
2242                                        struct wal_item *query)
2243{
2244    if (wal_itr->shandle->is_persisted_snapshot) {
2245        struct avl_node *a;
2246        if (wal_itr->by_key) {
2247            a = avl_search_greater(&wal_itr->shandle->key_tree,
2248                                   &query->avl_keysnap,
2249                                   _snap_cmp_bykey);
2250            wal_itr->cursor_pos = a;
2251            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2252        } else {
2253            a = avl_search_greater(&wal_itr->shandle->seq_tree,
2254                                   &query->avl_seq,
2255                                   _wal_cmp_byseq);
2256            wal_itr->cursor_pos = a;
2257            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2258        }
2259    }
2260    if (wal_itr->shandle->snap_tag_idx) {
2261        wal_itr->direction = FDB_ITR_FORWARD;
2262        if (wal_itr->by_key) {
2263            return _wal_itr_search_greater_bykey(wal_itr, query);
2264        } else {
2265            return _wal_itr_search_greater_byseq(wal_itr, query);
2266        }
2267    } // else no items in WAL in snapshot..
2268    return NULL;
2269}
2270
// Stateless reverse-seek by key: for each WAL key shard, find the greatest
// key <= 'query' (or the shard's last key when 'query' is NULL) that has an
// item visible in this snapshot, merge-sort the per-shard candidates, and
// return the overall greatest.  Returns NULL when no shard has a match.
static
struct wal_item* _wal_itr_search_smaller_bykey(struct wal_iterator *wal_itr,
                                               struct wal_item *query)
{
    struct avl_node *a = NULL;
    struct wal_cursor *cursor;

    // search is a stateless operation, so re-initialize shard's merge-sort tree
    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
        struct wal_item *item = NULL;
        spin_lock(&wal_itr->map_shards[i].lock);
        if (query) {
            // install this snapshot's compare info so a custom key
            // comparator (if configured) is used by the search
            avl_set_aux(&wal_itr->map_shards[i]._map,
                        (void*)&wal_itr->shandle->cmp_info);
            a = avl_search_smaller(&wal_itr->map_shards[i]._map,
                                   &query->header->avl_key,
                                   _wal_cmp_bykey);
        } else { // no item implies search to last key
            a = avl_last(&wal_itr->map_shards[i]._map);
        }
        if (a) {
            // walk backwards until an item visible in this snapshot is
            // found, or the entries stop belonging to this iterator's KVS
            do {
                struct wal_item_header *header;
                header = _get_entry(a, struct wal_item_header, avl_key);
                if (!_wal_is_my_kvs(header, wal_itr)) {
                    item = NULL;
                    break;
                }
                item = _wal_get_snap_item(header, wal_itr->shandle);
            } while (!item && (a = avl_prev(a)));
        }
        spin_unlock(&wal_itr->map_shards[i].lock);
        if (item) {
            wal_itr->cursors[i].item = item;
            // re-insert into the merge-sorted AVL tree across all shards
            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
                       _merge_cmp_bykey);
        } else {
            wal_itr->cursors[i].item = NULL;
        }
    } // done for all WAL shards

    // for a reverse seek the answer is the greatest candidate of all shards
    wal_itr->cursor_pos = avl_last(&wal_itr->merge_tree);
    if (!wal_itr->cursor_pos) {
        wal_itr->item_prev = NULL;
        return NULL;
    }

    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    // save the current cursor position for reverse iteration
    wal_itr->item_prev = cursor->item;
    return cursor->item;
}
2325
// Stateless reverse-seek by sequence number: in every WAL shard find the
// greatest seqnum <= 'query' (or the shard's last entry when 'query' is
// NULL) whose item is visible in this snapshot, merge-sort the per-shard
// winners, and return the overall greatest.  NULL when nothing matches.
static
struct wal_item *_wal_itr_search_smaller_byseq(struct wal_iterator *wal_itr,
                                               struct wal_item *query)
{
    struct avl_node *a = NULL;
    struct wal_cursor *cursor;

    // search is a stateless operation, so re-initialize shard's merge-sort tree
    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
        struct wal_item *item = NULL, *_item;
        spin_lock(&wal_itr->map_shards[i].lock);
        if (query) {
            a = avl_search_smaller(&wal_itr->map_shards[i]._map,
                                   &query->avl_seq, _wal_cmp_byseq);
        } else {
            a = avl_last(&wal_itr->map_shards[i]._map);
        }
        // walk backwards past entries that are not visible in this snapshot
        while (a) {
            item = _get_entry(a, struct wal_item, avl_seq);

            if (!_wal_is_my_kvs(item->header, wal_itr)) {
                item = NULL;
                break;
            }
            _item = _wal_get_snap_item(item->header, wal_itr->shandle);
            if (item == _item) {
                break; // exactly this item is the snapshot-visible one
            } else {
                item = NULL;
            }
            a = avl_prev(a);
        }
        spin_unlock(&wal_itr->map_shards[i].lock);
        if (item) {
            wal_itr->cursors[i].item = item;
            // re-insert into the merge-sorted AVL tree across all shards
            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
                       _merge_cmp_byseq);
        } else {
            wal_itr->cursors[i].item = NULL;
        }
    } // done for all WAL shards

    // for a reverse seek the answer is the greatest candidate of all shards
    wal_itr->cursor_pos = avl_last(&wal_itr->merge_tree);
    if (!wal_itr->cursor_pos) {
        wal_itr->item_prev = NULL;
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    // save the current cursor position for reverse iteration
    wal_itr->item_prev = cursor->item;
    return cursor->item;
}
2380
2381struct wal_item* wal_itr_search_smaller(struct wal_iterator *wal_itr,
2382                                        struct wal_item *query)
2383{
2384    if (wal_itr->shandle->is_persisted_snapshot) {
2385        struct avl_node *a;
2386        if (wal_itr->by_key) {
2387            a = avl_search_smaller(&wal_itr->shandle->key_tree,
2388                                   &query->avl_keysnap,
2389                                   _snap_cmp_bykey);
2390            wal_itr->cursor_pos = a;
2391            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2392        } else {
2393            a = avl_search_smaller(&wal_itr->shandle->seq_tree,
2394                                   &query->avl_seq,
2395                                   _wal_cmp_byseq);
2396            wal_itr->cursor_pos = a;
2397            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2398        }
2399    }
2400
2401    if (wal_itr->shandle->snap_tag_idx) {
2402        wal_itr->direction = FDB_ITR_REVERSE;
2403        if (!wal_itr->by_key) {
2404            return _wal_itr_search_smaller_byseq(wal_itr, query);
2405        } else {
2406            return _wal_itr_search_smaller_bykey(wal_itr, query);
2407        }
2408    } // else no items in WAL in for this snapshot..
2409    return NULL;
2410}
2411
// The following iterator movements are stateful..
// Advance the by-key iterator one step forward: refill the shard the
// current cursor came from with that shard's next snapshot-visible key,
// re-insert it into the merge tree, and return the smallest key greater
// than the one just consumed.  Returns NULL at the end of iteration.
static
struct wal_item *_wal_itr_next_bykey(struct wal_iterator *wal_itr)
{
    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
                                           struct wal_cursor, avl_merge);
    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
    struct wal_item_header *header = cur_item.item->header;
    // cursors[] is indexed by shard, so pointer arithmetic yields the
    // shard number the current item came from
    size_t cur_shard_num = cursor - wal_itr->cursors;
    struct wal_item *item = NULL;

    wal_itr->item_prev = cursor->item; // save for direction change

    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
    // install this snapshot's compare info so a custom key comparator
    // (if configured) is used while traversing the key map
    avl_set_aux(&wal_itr->map_shards[cur_shard_num]._map,
            (void*)&wal_itr->shandle->cmp_info);
    struct avl_node *a = avl_next(&header->avl_key);
    if (a) {
        // walk forward until an item visible in this snapshot is found,
        // or the entries stop belonging to this iterator's KVS
        do {
            header = _get_entry(a, struct wal_item_header, avl_key);
            if (!_wal_is_my_kvs(header, wal_itr)) {
                item = NULL;
                break;
            }
            item = _wal_get_snap_item(header, wal_itr->shandle);
        } while (!item && (a = avl_next(a)));
    }
    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
    if (item) {
        // re-insert this merge sorted item back into merge-sort tree..
        wal_itr->cursors[cur_shard_num].item = item;
        avl_insert(&wal_itr->merge_tree,
                   &wal_itr->cursors[cur_shard_num].avl_merge,
                   _merge_cmp_bykey);
    } else {
        wal_itr->cursors[cur_shard_num].item = NULL;
    }

    // next position is the smallest key greater than the consumed one
    wal_itr->cursor_pos = avl_search_greater(&wal_itr->merge_tree,
                                             &cur_item.avl_merge,
                                             _merge_cmp_bykey);
    if (!wal_itr->cursor_pos) {
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    return cursor->item;
}
2460
// Advance the by-seqnum iterator one step forward: refill the current
// cursor's shard with its next snapshot-visible seqnum, re-insert it into
// the merge tree, and return the smallest seqnum greater than the one just
// consumed.  Returns NULL at the end of iteration.
static
struct wal_item *_wal_itr_next_byseq(struct wal_iterator *wal_itr)
{
    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
                                           struct wal_cursor, avl_merge);
    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
    // cursors[] is indexed by shard, so the offset is the shard number
    size_t cur_shard_num = cursor - wal_itr->cursors;
    struct wal_item *item = NULL, *_item;

    wal_itr->item_prev = cursor->item; // save for direction change

    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
    struct avl_node *a = avl_next(&cur_item.item->avl_seq);
    // skip forward past entries that are not visible in this snapshot
    while (a) {
        item = _get_entry(a, struct wal_item, avl_seq);
        if (!_wal_is_my_kvs(item->header, wal_itr)) {
            item = NULL;
            break;
        }
        _item = _wal_get_snap_item(item->header, wal_itr->shandle);
        if (item == _item) {
            break; // exactly this item is the snapshot-visible one
        } else {
            item = NULL;
        }
        a = avl_next(a);
    }
    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
    if (item) {
        wal_itr->cursors[cur_shard_num].item = item;
        // re-insert this merge sorted item back into merge-sort tree..
        avl_insert(&wal_itr->merge_tree,
                   &wal_itr->cursors[cur_shard_num].avl_merge,
                   _merge_cmp_byseq);
    } else {
        wal_itr->cursors[cur_shard_num].item = NULL;
    }

    // next position is the smallest seqnum greater than the consumed one
    wal_itr->cursor_pos = avl_search_greater(&wal_itr->merge_tree,
                                             &cur_item.avl_merge,
                                             _merge_cmp_byseq);
    if (!wal_itr->cursor_pos) {
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    return cursor->item;
}
2509
2510struct wal_item* wal_itr_next(struct wal_iterator *wal_itr)
2511{
2512    struct wal_item *result = NULL;
2513    if (wal_itr->shandle->is_persisted_snapshot) {
2514        wal_itr->cursor_pos = avl_next(wal_itr->cursor_pos);
2515        if (wal_itr->by_key) {
2516            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2517                                                struct wal_item, avl_keysnap) : NULL;
2518        } else {
2519            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2520                                                struct wal_item, avl_seq) : NULL;
2521        }
2522    }
2523
2524    if (!wal_itr->shandle->snap_tag_idx) { // no items in WAL in snapshot..
2525        return NULL;
2526    }
2527    if (wal_itr->direction == FDB_ITR_FORWARD) {
2528        if (!wal_itr->cursor_pos) {
2529            return result;
2530        }
2531        if (wal_itr->by_key) {
2532            result = _wal_itr_next_bykey(wal_itr);
2533        } else {
2534            result = _wal_itr_next_byseq(wal_itr);
2535        }
2536    } else { // change of direction involves searching across all shards..
2537        if (!wal_itr->item_prev) {
2538            return result;
2539        }
2540        if (wal_itr->by_key) {
2541            result = _wal_itr_search_greater_bykey(wal_itr,
2542                                                   wal_itr->item_prev);
2543        } else {
2544            result = _wal_itr_search_greater_byseq(wal_itr,
2545                                                   wal_itr->item_prev);
2546        }
2547    }
2548    wal_itr->direction = FDB_ITR_FORWARD;
2549    return result;
2550}
2551
// Step the by-key iterator one position backwards: refill the shard the
// current cursor came from with that shard's previous snapshot-visible
// key, re-insert it into the merge tree, and return the greatest key
// smaller than the one just consumed.  Returns NULL at the beginning.
static
struct wal_item *_wal_itr_prev_bykey(struct wal_iterator *wal_itr)
{

    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
                                           struct wal_cursor, avl_merge);
    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
    struct wal_item_header *header = cur_item.item->header;
    // cursors[] is indexed by shard, so the offset is the shard number
    size_t cur_shard_num = cursor - wal_itr->cursors;
    struct wal_item *item = NULL;

    wal_itr->item_prev = cursor->item; // save for direction change

    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
    // install this snapshot's compare info so a custom key comparator
    // (if configured) is used while traversing the key map
    avl_set_aux(&wal_itr->map_shards[cur_shard_num]._map,
                (void*)&wal_itr->shandle->cmp_info);
    struct avl_node *a = avl_prev(&header->avl_key);
    if (a) {
        // walk backwards until an item visible in this snapshot is found,
        // or the entries stop belonging to this iterator's KVS
        do {
            header = _get_entry(a, struct wal_item_header, avl_key);
            if (!_wal_is_my_kvs(header, wal_itr)) {
                item = NULL;
                break;
            }
            item = _wal_get_snap_item(header, wal_itr->shandle);
        } while (!item && (a = avl_prev(a)));
    }
    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
    if (item) {
        // re-insert this merge sorted item back into merge-sort tree..
        wal_itr->cursors[cur_shard_num].item = item;
        avl_insert(&wal_itr->merge_tree,
                   &wal_itr->cursors[cur_shard_num].avl_merge,
                   _merge_cmp_bykey);
    } else {
        wal_itr->cursors[cur_shard_num].item = NULL;
    }

    // previous position is the greatest key smaller than the consumed one
    wal_itr->cursor_pos = avl_search_smaller(&wal_itr->merge_tree,
                                             &cur_item.avl_merge,
                                             _merge_cmp_bykey);
    if (!wal_itr->cursor_pos) {
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    return cursor->item;
}
2600
// Step the by-seqnum iterator one position backwards: refill the current
// cursor's shard with its previous snapshot-visible seqnum, re-insert it
// into the merge tree, and return the greatest seqnum smaller than the
// one just consumed.  Returns NULL at the beginning of iteration.
static
struct wal_item *_wal_itr_prev_byseq(struct wal_iterator *wal_itr)
{
    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
                                           struct wal_cursor, avl_merge);
    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
    // cursors[] is indexed by shard, so the offset is the shard number
    size_t cur_shard_num = cursor - wal_itr->cursors;
    struct wal_item *item = NULL, *_item;

    wal_itr->item_prev = cursor->item; // save for direction change

    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
    struct avl_node *a = avl_prev(&cur_item.item->avl_seq);
    // skip backwards past entries that are not visible in this snapshot
    while (a) {
        item = _get_entry(a, struct wal_item, avl_seq);
        if (!_wal_is_my_kvs(item->header, wal_itr)) {
            item = NULL;
            break;
        }
        _item = _wal_get_snap_item(item->header, wal_itr->shandle);
        if (item == _item) {
            break; // exactly this item is the snapshot-visible one
        } else {
            item = NULL;
        }
        a = avl_prev(a);
    }
    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
    if (item) {
        wal_itr->cursors[cur_shard_num].item = item;
        // re-insert this merge sorted item back into merge-sort tree..
        avl_insert(&wal_itr->merge_tree,
                &wal_itr->cursors[cur_shard_num].avl_merge,
                _merge_cmp_byseq);
    } else {
        wal_itr->cursors[cur_shard_num].item = NULL;
    }

    // previous position is the greatest seqnum smaller than the consumed one
    wal_itr->cursor_pos = avl_search_smaller(&wal_itr->merge_tree,
            &cur_item.avl_merge,
            _merge_cmp_byseq);
    if (!wal_itr->cursor_pos) {
        return NULL;
    }
    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
    return cursor->item;
}
2649
2650struct wal_item* wal_itr_prev(struct wal_iterator *wal_itr)
2651{
2652    struct wal_item *result = NULL;
2653    if (wal_itr->shandle->is_persisted_snapshot) {
2654        wal_itr->cursor_pos = avl_prev(wal_itr->cursor_pos);
2655        if (wal_itr->by_key) {
2656            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2657                    struct wal_item, avl_keysnap) : NULL;
2658        } else {
2659            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2660                    struct wal_item, avl_seq) : NULL;
2661        }
2662    }
2663
2664    if (!wal_itr->shandle->snap_tag_idx) { // no items in WAL in snapshot..
2665        return NULL;
2666    }
2667    if (wal_itr->direction == FDB_ITR_REVERSE) {
2668        if (!wal_itr->cursor_pos) {
2669            return result;
2670        }
2671        if (wal_itr->by_key) {
2672            result = _wal_itr_prev_bykey(wal_itr);
2673        } else {
2674            result = _wal_itr_prev_byseq(wal_itr);
2675        }
2676    } else { // change of direction involves searching across all shards..
2677        if (!wal_itr->item_prev) {
2678            return result;
2679        }
2680        if (wal_itr->by_key) {
2681            result = _wal_itr_search_smaller_bykey(wal_itr,
2682                    wal_itr->item_prev);
2683        } else {
2684            result = _wal_itr_search_smaller_byseq(wal_itr,
2685                    wal_itr->item_prev);
2686        }
2687    }
2688    wal_itr->direction = FDB_ITR_REVERSE;
2689    return result;
2690}
2691
/**TODO:
 * Sequence iteration can currently be O(n^2) when there is a huge number
 * of updates; the following set_first/set_last functions are placeholders
 * intended to address that complexity.
 */
// TODO: unimplemented hook for cheap repositioning to the first element
// (see the complexity note above); currently a no-op that always reports
// success and ignores 'elem'.
fdb_status wal_itr_set_first(struct wal_iterator *wal_itr,
        struct wal_item *elem)
{
    return FDB_RESULT_SUCCESS;
}
2701
// TODO: unimplemented hook for cheap repositioning to the last element
// (see the complexity note above); currently a no-op that always reports
// success and ignores 'elem'.
fdb_status wal_itr_set_last(struct wal_iterator *wal_itr,
        struct wal_item *elem)
{
    return FDB_RESULT_SUCCESS;
}
2707
2708struct wal_item *_wal_itr_first_bykey(struct wal_iterator *wal_itr)
2709{
2710    struct wal_item_header dummy_key;
2711    struct wal_item dummy_item;
2712    fdb_kvs_id_t kv_id = wal_itr->shandle->id;
2713    dummy_key.key = &kv_id;
2714    dummy_key.keylen = sizeof(fdb_kvs_id_t);
2715    dummy_item.header = &dummy_key;
2716    if (wal_itr->multi_kvs) {
2717        return _wal_itr_search_greater_bykey(wal_itr, &dummy_item);
2718    } // else we are in single kv instance mode
2719    return _wal_itr_search_greater_bykey(wal_itr, NULL);
2720}
2721
// Seek to the lowest sequence number in the snapshot: a NULL query makes
// the byseq search start from every shard's first entry.
struct wal_item* _wal_itr_first_byseq(struct wal_iterator *wal_itr)
{
    return _wal_itr_search_greater_byseq(wal_itr, NULL);
}
2726
2727struct wal_item* wal_itr_first(struct wal_iterator *wal_itr) {
2728    if (wal_itr->shandle->is_persisted_snapshot) {
2729        struct avl_node *a;
2730        if (wal_itr->by_key) {
2731            a = avl_first(&wal_itr->shandle->key_tree);
2732            wal_itr->cursor_pos = a;
2733            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2734        } else {
2735            a = avl_first(&wal_itr->shandle->seq_tree);
2736            wal_itr->cursor_pos = a;
2737            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2738        }
2739    }
2740
2741    if (wal_itr->shandle->snap_tag_idx) {
2742        wal_itr->direction = FDB_ITR_FORWARD;
2743        if (wal_itr->by_key) {
2744            return _wal_itr_first_bykey(wal_itr);
2745        } else {
2746            return _wal_itr_first_byseq(wal_itr);
2747        }
2748    } // else no items in WAL for this snapshot
2749    return NULL;
2750}
2751
2752struct wal_item *_wal_itr_last_bykey(struct wal_iterator *wal_itr)
2753{
2754    struct wal_item_header dummy_key;
2755    struct wal_item dummy_item;
2756    fdb_kvs_id_t kv_id = wal_itr->shandle->id + 1; // set to next higher KVS
2757    dummy_key.key = &kv_id;
2758    dummy_key.keylen = sizeof(fdb_kvs_id_t);
2759    dummy_item.header = &dummy_key;
2760    if (wal_itr->multi_kvs) {
2761        return _wal_itr_search_smaller_bykey(wal_itr, &dummy_item);
2762    } // else search go to last element in single kv instance mode..
2763    return _wal_itr_search_smaller_bykey(wal_itr, NULL);
2764}
2765
// Seek to the highest sequence number in the snapshot: a NULL query makes
// the byseq reverse-search start from every shard's last entry.
struct wal_item *_wal_itr_last_byseq(struct wal_iterator *wal_itr)
{
    return _wal_itr_search_smaller_byseq(wal_itr, NULL);
}
2770
2771struct wal_item* wal_itr_last(struct wal_iterator *wal_itr) {
2772    if (wal_itr->shandle->is_persisted_snapshot) {
2773        struct avl_node *a;
2774        if (wal_itr->by_key) {
2775            a = avl_last(&wal_itr->shandle->key_tree);
2776            wal_itr->cursor_pos = a;
2777            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2778        } else {
2779            a = avl_last(&wal_itr->shandle->seq_tree);
2780            wal_itr->cursor_pos = a;
2781            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2782        }
2783    }
2784
2785    if (wal_itr->shandle->snap_tag_idx) { // no items in WAL in snapshot..
2786        wal_itr->direction = FDB_ITR_REVERSE;
2787        if (wal_itr->by_key) {
2788            return _wal_itr_last_bykey(wal_itr);
2789        } else {
2790            return _wal_itr_last_byseq(wal_itr);
2791        }
2792    }
2793    return NULL;
2794}
2795
// Release an iterator: frees the per-shard cursor array and then the
// iterator struct itself.  Does not free the snapshot handle the iterator
// referenced; that remains the caller's responsibility.
fdb_status wal_itr_close(struct wal_iterator *wal_itr)
{
    free(wal_itr->cursors);
    free(wal_itr);
    return FDB_RESULT_SUCCESS;
}
2802
// discard entries in txn
// Walks 'txn->items' and removes every WAL entry of this transaction:
// unlinks it from its seq-map shard (when sequence trees are in use) and
// from its key header's item list, frees headers that become empty, marks
// the on-disk doc region stale for non-remove actions, and keeps the WAL
// size / num_flushable / datasize / mem_overhead counters in sync.
fdb_status wal_discard(struct filemgr *file, fdb_txn *txn)
{
    struct wal_item *item;
    struct list_elem *e;
    size_t shard_num, seq_shard_num;
    uint64_t mem_overhead = 0; // bookkeeping bytes released, applied at the end

    e = list_begin(txn->items);
    while(e) {
        item = _get_entry(e, struct wal_item, list_elem_txn);
        // the key shard is selected by a checksum of the document key
        shard_num = get_checksum((uint8_t*)item->header->key,
                                 item->header->keylen) %
                                 file->wal->num_shards;
        spin_lock(&file->wal->key_shards[shard_num].lock);

        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
            // remove from seq map
            seq_shard_num = item->seqnum % file->wal->num_shards;
            spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
            avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
                       &item->avl_seq);
            spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
        }

        // remove from header's list
        list_remove(&item->header->items, &item->list_elem);
        // remove header if empty
        if (list_begin(&item->header->items) == NULL) {
            //remove from key map
            avl_remove(&file->wal->key_shards[shard_num]._map,
                       &item->header->avl_key);
            mem_overhead += sizeof(struct wal_item_header) + item->header->keylen;
            // free key and header
            free(item->header->key);
            free(item->header);
        }
        // remove from txn's list
        e = list_remove(txn->items, e);
        // items of the global txn, or already-committed items, were counted
        // as flushable; undo that accounting
        if (item->txn_id == file->global_txn.txn_id ||
            item->flag & WAL_ITEM_COMMITTED) {
            atomic_decr_uint32_t(&file->wal->num_flushable);
        }
        if (item->action != WAL_ACT_REMOVE) {
            atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
                                std::memory_order_relaxed);
            // mark as stale if the item is not an immediate remove
            filemgr_mark_stale(file, item->offset, item->doc_size);
        }

        // free
        free(item);
        atomic_decr_uint32_t(&file->wal->size);
        mem_overhead += sizeof(struct wal_item);
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    }
    // apply the accumulated overhead reduction once, outside the shard locks
    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
                        std::memory_order_relaxed);

    return FDB_RESULT_SUCCESS;
}
2864
// Selects which WAL entries _wal_close() drops.
typedef enum wal_discard_type {
    WAL_DISCARD_UNCOMMITTED_ONLY, // drop only items without WAL_ITEM_COMMITTED
    WAL_DISCARD_ALL,              // drop every item
    WAL_DISCARD_KV_INS,           // drop only items of one KV store instance
} wal_discard_t;
2870
2871// discard all entries
2872static fdb_status _wal_close(struct filemgr *file,
2873                             wal_discard_t type, void *aux,
2874                             err_log_callback *log_callback)
2875{
2876    struct wal_item *item;
2877    struct wal_item_header *header;
2878    struct list_elem *e;
2879    struct avl_node *a, *next_a;
2880    struct snap_handle *shandle;
2881    fdb_kvs_id_t kv_id, kv_id_req;
2882    bool committed;
2883    size_t i = 0, seq_shard_num;
2884    size_t num_shards = wal_get_num_shards(file);
2885    uint64_t mem_overhead = 0;
2886    struct snap_handle query;
2887
2888    if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
2889        if (aux == NULL) { // aux must contain pointer to KV ID
2890            return FDB_RESULT_INVALID_ARGS;
2891        }
2892        kv_id_req = *(fdb_kvs_id_t*)aux;
2893        query.id = kv_id_req;
2894        query.snap_tag_idx = 0;
2895        a = avl_search_greater(&file->wal->wal_snapshot_tree,
2896                               &query.avl_id, _wal_snap_cmp);
2897        if (a) {
2898            shandle = _get_entry(a, struct snap_handle, avl_id);
2899            if (shandle->id != kv_id_req) {
2900                a = NULL;
2901            }
2902        }
2903        // cleanup any snapshot handles not reclaimed by wal_flush
2904        for (next_a = NULL; a; a = next_a) {
2905            shandle = _get_entry(a, struct snap_handle, avl_id);
2906            if (_wal_snap_is_immutable(shandle)) {
2907                fdb_log(log_callback, FDB_RESULT_INVALID_ARGS,
2908                        "WAL closed before snapshot close in kv id %" _F64
2909                        " in file %s", shandle->id, file->filename);
2910            }
2911            if (shandle->id != kv_id_req) {
2912                break;
2913            }
2914            next_a = avl_next(a);
2915            avl_remove(&file->wal->wal_snapshot_tree, a);
2916            for (struct list_elem *e = list_begin(&shandle->active_txn_list);
2917                 e;) {
2918                struct list_elem *e_next = list_next(e);
2919                struct wal_txn_wrapper *active_txn = _get_entry(e,
2920                        struct wal_txn_wrapper, le);
2921                free(active_txn);
2922                e = e_next;
2923            }
2924            free(shandle);
2925        }
2926    } else {
2927        // cleanup all snapshot handles not reclaimed by wal_flush
2928        for (a = avl_first(&file->wal->wal_snapshot_tree), next_a = NULL;
2929             a; a = next_a) {
2930            shandle = _get_entry(a, struct snap_handle, avl_id);
2931            if (_wal_snap_is_immutable(shandle)) {
2932                fdb_log(log_callback, FDB_RESULT_INVALID_ARGS,
2933                        "WAL closed before snapshot close in kv id %" _F64
2934                        " with %" _F64 " docs in file %s", shandle->id,
2935                        atomic_get_uint64_t(&shandle->wal_ndocs), file->filename);
2936            }
2937            next_a = avl_next(a);
2938            avl_remove(&file->wal->wal_snapshot_tree, a);
2939            for (struct list_elem *e = list_begin(&shandle->active_txn_list);
2940                 e;) {
2941                struct list_elem *e_next = list_next(e);
2942                struct wal_txn_wrapper *active_txn = _get_entry(e,
2943                        struct wal_txn_wrapper, le);
2944                free(active_txn);
2945                e = e_next;
2946            }
2947            free(shandle);
2948        }
2949    }
2950
2951    for (; i < num_shards; ++i) {
2952        spin_lock(&file->wal->key_shards[i].lock);
2953        a = avl_first(&file->wal->key_shards[i]._map);
2954        while (a) {
2955            header = _get_entry(a, struct wal_item_header, avl_key);
2956            if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
2957                buf2kvid(header->chunksize, header->key, &kv_id);
2958                // begin while loop only on matching KV ID
2959                e = (kv_id == kv_id_req)?(list_begin(&header->items)):(NULL);
2960            } else {
2961                kv_id = 0;
2962                e = list_begin(&header->items);
2963            }
2964
2965            committed = false;
2966            while (e) {
2967                item = _get_entry(e, struct wal_item, list_elem);
2968                if ( type == WAL_DISCARD_ALL ||
2969                     (type == WAL_DISCARD_UNCOMMITTED_ONLY &&
2970                      !(item->flag & WAL_ITEM_COMMITTED)) ||
2971                     type == WAL_DISCARD_KV_INS) {
2972                    // remove from header's list
2973                    e = list_remove(&header->items, e);
2974                    if (!(item->flag & WAL_ITEM_COMMITTED)) {
2975                        // and also remove from transaction's list
2976                        list_remove(item->txn->items, &item->list_elem_txn);
2977                        if (item->action != WAL_ACT_REMOVE) {
2978                            // mark as stale if item is not committed and not an immediate remove
2979                            filemgr_mark_stale(file, item->offset, item->doc_size);
2980                        }
2981                    } else {
2982                        // committed item exists and will be removed
2983                        committed = true;
2984                    }
2985
2986                    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
2987                        // remove from seq hash table
2988                        seq_shard_num = item->seqnum % num_shards;
2989                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
2990                        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
2991                                   &item->avl_seq);
2992                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
2993                    }
2994
2995                    if (item->action != WAL_ACT_REMOVE) {
2996                        atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
2997                                            std::memory_order_relaxed);
2998                    }
2999                    if (item->txn_id == file->global_txn.txn_id || committed) {
3000                        if (item->action != WAL_ACT_INSERT) {
3001                            _wal_update_stat(file, kv_id, _WAL_DROP_DELETE);
3002                        } else {
3003                            _wal_update_stat(file, kv_id, _WAL_DROP_SET);
3004                        }
3005                        atomic_decr_uint32_t(&file->wal->num_flushable);
3006                    }
3007                    free(item);
3008                    atomic_decr_uint32_t(&file->wal->size);
3009                    mem_overhead += sizeof(struct wal_item);
3010                } else {
3011                    e = list_next(e);
3012                }
3013            }
3014            a = avl_next(a);
3015
3016            if (list_begin(&header->items) == NULL) {
3017                // wal_item_header becomes empty
3018                // free header and remove from key map
3019                avl_remove(&file->wal->key_shards[i]._map,
3020                           &header->avl_key);
3021                mem_overhead += sizeof(struct wal_item_header) + head