xref: /6.0.3/forestdb/src/wal.cc (revision 5b0b2b2e)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include <stdlib.h>
#include <string.h>
#include <stdint.h>

#include "filemgr.h"
#include "common.h"
#include "hash.h"
#include "docio.h"
#include "wal.h"
#include "hash_functions.h"
#include "fdb_internal.h"

#include "memleak.h"


#ifdef __DEBUG
#ifndef __DEBUG_WAL
    #undef DBG
    #undef DBGCMD
    #undef DBGSW
    #define DBG(...)
    #define DBGCMD(...)
    #define DBGSW(n, ...)
#else
# include "debug.h"
#endif
#endif

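// Default key comparator: lexicographic memcmp() over the common prefix,
// with ties broken by key length (the shorter key sorts first).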
INLINE int _wal_keycmp(void *key1, size_t keylen1, void *key2, size_t keylen2)
{
    if (keylen1 == keylen2) {
        return memcmp(key1, key2, keylen1);
    } else {
        size_t len = MIN(keylen1, keylen2);
        int cmp = memcmp(key1, key2, len);
        if (cmp != 0) return cmp;
        else {
            return (int)((int)keylen1 - (int)keylen2);
        }
    }
}

INLINE int __wal_cmp_bykey(struct wal_item_header *aa,
                           struct wal_item_header *bb,
                           void *aux)
{
    struct _fdb_key_cmp_info *info = (struct _fdb_key_cmp_info *)aux;
    if (info->kvs_config.custom_cmp) {
        // custom compare function for variable-length key
        if (info->kvs) {
            // multi KV instance mode
            // KV ID should be compared separately
            size_t size_chunk = info->kvs->root->config.chunksize;
            fdb_kvs_id_t a_id, b_id;
            buf2kvid(size_chunk, aa->key, &a_id);
            buf2kvid(size_chunk, bb->key, &b_id);

            if (a_id < b_id) {
                return -1;
            } else if (a_id > b_id) {
                return 1;
            } else {
                return info->kvs_config.custom_cmp(
                            (uint8_t*)aa->key + size_chunk,
                            aa->keylen - size_chunk,
                            (uint8_t*)bb->key + size_chunk,
                            bb->keylen - size_chunk);
            }
        } else {
            return info->kvs_config.custom_cmp(aa->key, aa->keylen,
                                               bb->key, bb->keylen);
        }
    } else {
        return _wal_keycmp(aa->key, aa->keylen, bb->key, bb->keylen);
    }
}

INLINE int _wal_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct wal_item_header *aa, *bb;
    aa = _get_entry(a, struct wal_item_header, avl_key);
    bb = _get_entry(b, struct wal_item_header, avl_key);
    return __wal_cmp_bykey(aa, bb, aux);
}

INLINE int _merge_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct wal_cursor *aa, *bb;
    aa = _get_entry(a, struct wal_cursor, avl_merge);
    bb = _get_entry(b, struct wal_cursor, avl_merge);
    return __wal_cmp_bykey(aa->item->header, bb->item->header, aux);
}

INLINE int _snap_cmp_bykey(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct wal_item *aa, *bb;
    aa = _get_entry(a, struct wal_item, avl_keysnap);
    bb = _get_entry(b, struct wal_item, avl_keysnap);
    return __wal_cmp_bykey(aa->header, bb->header, aux);
}

INLINE int __wal_cmp_byseq(struct wal_item *aa, struct wal_item *bb) {
    if (aa->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
        // multi KV instance mode
        int size_chunk = aa->header->chunksize;
        fdb_kvs_id_t id_aa, id_bb;
        // KV ID is stored at the first 8 bytes in the key
        buf2kvid(size_chunk, aa->header->key, &id_aa);
        buf2kvid(size_chunk, bb->header->key, &id_bb);
        if (id_aa < id_bb) {
            return -1;
        } else if (id_aa > id_bb) {
            return 1;
        } else {
            return _CMP_U64(aa->seqnum, bb->seqnum);
        }
    }
    return _CMP_U64(aa->seqnum, bb->seqnum);
}

INLINE int _wal_cmp_byseq(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct wal_item *aa, *bb;
    aa = _get_entry(a, struct wal_item, avl_seq);
    bb = _get_entry(b, struct wal_item, avl_seq);
    return __wal_cmp_byseq(aa, bb);
}

INLINE int _merge_cmp_byseq(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct wal_cursor *aa, *bb;
    aa = _get_entry(a, struct wal_cursor, avl_merge);
    bb = _get_entry(b, struct wal_cursor, avl_merge);
    return __wal_cmp_byseq(aa->item, bb->item);
}

INLINE int _wal_snap_cmp(struct avl_node *a, struct avl_node *b, void *aux)
{
    struct snap_handle *aa, *bb;
    aa = _get_entry(a, struct snap_handle, avl_id);
    bb = _get_entry(b, struct snap_handle, avl_id);

    if (aa->id < bb->id) { // first compare by kv id
        return -1;
    } else if (aa->id > bb->id) {
        return 1;
    } else { // within same kv store compare by snapshot id
        if (aa->snap_tag_idx < bb->snap_tag_idx) {
            return -1;
        } else if (aa->snap_tag_idx > bb->snap_tag_idx) {
            return 1;
        }
    }
    return 0;
}

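// Initialize the in-memory WAL for a file: global counters plus an array of
// hash-partitioned shards (key shards always; sequence shards only when
// sequence trees are enabled), each protected by its own spinlock.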
fdb_status wal_init(struct filemgr *file, int nbucket)
{
    size_t num_shards;

    file->wal->flag = WAL_FLAG_INITIALIZED;
    atomic_init_uint32_t(&file->wal->size, 0);
    atomic_init_uint32_t(&file->wal->num_flushable, 0);
    atomic_init_uint64_t(&file->wal->datasize, 0);
    atomic_init_uint64_t(&file->wal->mem_overhead, 0);
    file->wal->wal_dirty = FDB_WAL_CLEAN;

    list_init(&file->wal->txn_list);
    spin_init(&file->wal->lock);

    if (file->config->num_wal_shards) {
        file->wal->num_shards = file->config->num_wal_shards;
    } else {
        file->wal->num_shards = DEFAULT_NUM_WAL_PARTITIONS;
    }

    num_shards = wal_get_num_shards(file);
    file->wal->key_shards = (wal_shard *)
        malloc(sizeof(struct wal_shard) * num_shards);

    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
        file->wal->seq_shards = (wal_shard *)
            malloc(sizeof(struct wal_shard) * num_shards);
    } else {
        file->wal->seq_shards = NULL;
    }

    for (int i = num_shards - 1; i >= 0; --i) {
        avl_init(&file->wal->key_shards[i]._map, NULL);
        spin_init(&file->wal->key_shards[i].lock);
        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
            avl_init(&file->wal->seq_shards[i]._map, NULL);
            spin_init(&file->wal->seq_shards[i].lock);
        }
    }

    avl_init(&file->wal->wal_snapshot_tree, NULL);

    DBG("wal item size %ld\n", sizeof(struct wal_item));
    return FDB_RESULT_SUCCESS;
}

fdb_status wal_destroy(struct filemgr *file)
{
    size_t i = 0;
    size_t num_shards = wal_get_num_shards(file);
    // Free all WAL shards
    for (; i < num_shards; ++i) {
        spin_destroy(&file->wal->key_shards[i].lock);
        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
            spin_destroy(&file->wal->seq_shards[i].lock);
        }
    }
    spin_destroy(&file->wal->lock);
    free(file->wal->key_shards);
    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
        free(file->wal->seq_shards);
    }
    return FDB_RESULT_SUCCESS;
}

int wal_is_initialized(struct filemgr *file)
{
    return file->wal->flag & WAL_FLAG_INITIALIZED;
}

INLINE struct snap_handle * _wal_get_latest_snapshot(struct wal *_wal,
                                                     fdb_kvs_id_t kv_id)
{
    struct avl_node *node;
    struct snap_handle query, *shandle;
    // To get the highest snapshot id in this kv store..
    query.snap_tag_idx = 0; // ..search for the entry just smaller than the
    query.id = kv_id + 1;   // smallest possible entry of the next kv store.
    node = avl_search_smaller(&_wal->wal_snapshot_tree, &query.avl_id,
                              _wal_snap_cmp);
    if (node) {
        shandle = _get_entry(node, struct snap_handle, avl_id);
        if (shandle->id == kv_id) {
            return shandle;
        }
    }
    return NULL;
}

INLINE struct snap_handle *_wal_snapshot_create(fdb_kvs_id_t kv_id,
                                                wal_snapid_t snap_tag,
                                                wal_snapid_t snap_flush_tag)
{
   struct snap_handle *shandle = (struct snap_handle *)
                                   calloc(1, sizeof(struct snap_handle));
   if (shandle) {
       shandle->id = kv_id;
       shandle->snap_tag_idx = snap_tag;
       shandle->snap_stop_idx = snap_flush_tag;
       atomic_init_uint16_t(&shandle->ref_cnt_kvs, 0);
       atomic_init_uint64_t(&shandle->wal_ndocs, 0);
       return shandle;
   }
   return NULL;
}

// When a snapshot reader has called wal_snapshot_open(), the ref count
// on the snapshot handle will be incremented
INLINE bool _wal_snap_is_immutable(struct snap_handle *shandle) {
    return atomic_get_uint16_t(&shandle->ref_cnt_kvs);
}

/**
 * Returns highest mutable snapshot or creates one if...
 * No snapshot exists (First item for a given kv store is inserted)
 * If the highest snapshot was made immutable by snapshot_open (Write barrier)
 * If the highest snapshot was made un-readable by wal_flush (Read barrier)
 */
INLINE struct snap_handle * _wal_fetch_snapshot(struct wal *_wal,
                                                fdb_kvs_id_t kv_id)
{
    struct snap_handle *open_snapshot;
    wal_snapid_t snap_id, snap_flush_id = 0;
    spin_lock(&_wal->lock);
    open_snapshot = _wal_get_latest_snapshot(_wal, kv_id);
    if (!open_snapshot || // if first WAL item inserted for KV store
        _wal_snap_is_immutable(open_snapshot) ||//Write barrier (snapshot_open)
        open_snapshot->is_flushed) { // wal_flushed (read-write barrier)
        if (!open_snapshot) {
            snap_id = 1; // begin snapshot ids at 1
            snap_flush_id = 0; // all past elements can be returned
            DBG("Fresh KV id %" _F64 " Snapshot %" _F64 "- %" _F64"\n",
                kv_id, snap_flush_id, snap_id);
        } else { // read/write barrier means a new WAL snapshot gets created
            snap_id = open_snapshot->snap_tag_idx + 1;
            if (!open_snapshot->is_flushed) { // Write barrier only
                snap_flush_id = open_snapshot->snap_stop_idx;
                DBG("Write Barrier WAL KV id %" _F64 " Snapshot %" _F64
                    " - %" _F64 "\n", kv_id, snap_flush_id, snap_id);
            } else { // WAL flushed! Read & Write barrier
                snap_flush_id = open_snapshot->snap_tag_idx;
                DBG("Read-Write Barrier WAL KV id %" _F64 " Snapshot %" _F64
                    "- %" _F64 "\n",
                    kv_id, snap_flush_id, snap_id);
            }
        }
        open_snapshot = _wal_snapshot_create(kv_id, snap_id, snap_flush_id);
        avl_insert(&_wal->wal_snapshot_tree, &open_snapshot->avl_id,
                   _wal_snap_cmp);
    }
    // Increment ndocs for garbage collection of the snapshot
    // When no more docs refer to a snapshot, it can be safely deleted
    atomic_incr_uint64_t(&open_snapshot->wal_ndocs);
    spin_unlock(&_wal->lock);
    return open_snapshot;
}

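// Pin the snapshot handle (the ref count is what makes it immutable),
// capture the KV store's stats and sequence number, and record every
// transaction active at snapshot creation so that items committed by those
// transactions can later be skipped by this snapshot's readers.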
INLINE fdb_status _wal_snapshot_init(struct snap_handle *shandle,
                                     filemgr *file,
                                     fdb_txn *txn,
                                     fdb_seqnum_t seqnum,
                                     _fdb_key_cmp_info *key_cmp_info)
{
    struct list_elem *ee;
    shandle->snap_txn = txn;
    shandle->cmp_info = *key_cmp_info;
    atomic_incr_uint16_t(&shandle->ref_cnt_kvs);
    _kvs_stat_get(file, shandle->id, &shandle->stat);
    if (seqnum == FDB_SNAPSHOT_INMEM) {
        shandle->seqnum = fdb_kvs_get_seqnum(file, shandle->id);
        shandle->is_persisted_snapshot = false;
    } else {
        shandle->stat.wal_ndocs = 0; // WAL copy will populate
        shandle->stat.wal_ndeletes = 0; // these 2 stats
        shandle->seqnum = seqnum;
        shandle->is_persisted_snapshot = true;
    }
    avl_init(&shandle->key_tree, &shandle->cmp_info);
    avl_init(&shandle->seq_tree, NULL);
    shandle->global_txn = &file->global_txn;
    list_init(&shandle->active_txn_list);
    ee = list_begin(&file->wal->txn_list);
    while (ee) {
        struct wal_txn_wrapper *txn_wrapper;
        fdb_txn *active_txn;
        txn_wrapper = _get_entry(ee, struct wal_txn_wrapper, le);
        active_txn = txn_wrapper->txn;
        // except for global_txn
        if (active_txn != &file->global_txn) {
            txn_wrapper = (struct wal_txn_wrapper *)
                calloc(1, sizeof(struct wal_txn_wrapper));
            txn_wrapper->txn = active_txn;
            list_push_front(&shandle->active_txn_list, &txn_wrapper->le);
        }
        ee = list_next(ee);
    }

    return FDB_RESULT_SUCCESS;
}

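// Open a WAL snapshot for a reader. If the latest shared snapshot is usable,
// it is either cloned (a plain ref-count bump) or initialized in place;
// otherwise a standalone empty handle is created, which is never inserted
// into the global shared snapshot tree.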
fdb_status wal_snapshot_open(struct filemgr *file,
                             fdb_txn *txn,
                             fdb_kvs_id_t kv_id,
                             fdb_seqnum_t seqnum,
                             _fdb_key_cmp_info *key_cmp_info,
                             struct snap_handle **shandle)
{
    struct wal *_wal = file->wal;
    struct snap_handle *_shandle;

    spin_lock(&_wal->lock);
    _shandle = _wal_get_latest_snapshot(_wal, kv_id);
    if (!_shandle || // No item exists in WAL for this KV Store
        !atomic_get_uint64_t(&_shandle->wal_ndocs) || // Empty snapshot
        _shandle->is_flushed) { // Latest snapshot has read-write barrier
        // This can happen when a new snapshot is attempted after a WAL flush
        // with no mutations since; the old snapshot exists solely for
        // already-open snapshot iterators
        _shandle = _wal_snapshot_create(kv_id, 0, 0);
        if (!_shandle) { // LCOV_EXCL_START
            spin_unlock(&_wal->lock);
            return FDB_RESULT_ALLOC_FAIL;
        } // LCOV_EXCL_STOP
        // This snapshot is not inserted into global shared tree
        _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
        DBG("%s Persisted snapshot taken at %" _F64 " for kv id %" _F64 "\n",
            file->filename, _shandle->seqnum, kv_id);
    } else { // Take a snapshot of the latest WAL state for this KV Store
        if (_wal_snap_is_immutable(_shandle)) { // existing snapshot still open
            atomic_incr_uint16_t(&_shandle->ref_cnt_kvs); // ..just Clone it
        } else { // make this snapshot of the WAL immutable..
            _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
            DBG("%s Snapshot init %" _F64 " - %" _F64 " taken at %"
                _F64 " for kv id %" _F64 "\n",
                file->filename, _shandle->snap_stop_idx,
                _shandle->snap_tag_idx, _shandle->seqnum, kv_id);
        }
    }
    spin_unlock(&_wal->lock);
    *shandle = _shandle;
    return FDB_RESULT_SUCCESS;
}


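// Decide whether a WAL item can be freed, i.e. no open snapshot still needs
// it. Note: when _MVCC_WAL_ENABLE is not defined the WAL is never shared, so
// the unconditional early return below makes the rest of the function
// unreachable.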
INLINE bool _wal_can_discard(struct wal *_wal,
                             struct wal_item *_item,
                             struct wal_item *covering_item)
{
#ifndef _MVCC_WAL_ENABLE
    return true; // if WAL is never shared, this can never be false
#endif // _MVCC_WAL_ENABLE
    struct snap_handle *shandle, *snext;
    wal_snapid_t snap_stop_idx;
    wal_snapid_t snap_tag_idx;
    fdb_kvs_id_t kv_id;
    bool ret = true;

    if (covering_item) { // stop once the covering item's snapshot is found
        snap_stop_idx = covering_item->shandle->snap_tag_idx;
    } else {
        snap_stop_idx = OPEN_SNAPSHOT_TAG;
    }

    shandle = _item->shandle;
    fdb_assert(shandle, _item->seqnum, covering_item);

    snap_tag_idx = shandle->snap_tag_idx;
    kv_id = shandle->id;

    if (_wal_snap_is_immutable(shandle)) {// its active snapshot is still open
        ret = false; // it cannot be discarded
    } else { // item's own snapshot is closed, but a later snapshot may need it
        struct avl_node *node;
        spin_lock(&_wal->lock);
        node = avl_next(&shandle->avl_id);
        while (node) { // check snapshots taken later, until the WAL was flushed
            snext = _get_entry(node, struct snap_handle, avl_id);
            if (snext->id != kv_id) { // don't look beyond current kv store
                break;
            }

            if (snext->snap_stop_idx > snap_tag_idx) { // wal was flushed here.
                break; // From this snapshot onwards, this item is reflected..
            } // ..in the main index

            if (snext->snap_tag_idx == snap_stop_idx) {
                break; // we reached the covering item, need not examine further
            }

            if (_wal_snap_is_immutable(snext)) {
                ret = false; // a future snapshot needs this item!
                break;
            }
            node = avl_next(node);
        }
        spin_unlock(&_wal->lock);
    }
    return ret;
}

typedef enum _wal_update_type_t {
    _WAL_NEW_DEL, // A new deleted item inserted into WAL
    _WAL_NEW_SET, // A new non-deleted item inserted into WAL
    _WAL_SET_TO_DEL, // A set item updated to be deleted
    _WAL_DEL_TO_SET, // A deleted item updated to a set
    _WAL_DROP_DELETE, // A deleted item is de-duplicated or dropped
    _WAL_DROP_SET // A set item is de-duplicated or dropped
} _wal_update_type;

INLINE void _wal_update_stat(struct filemgr *file, fdb_kvs_id_t kv_id,
                             _wal_update_type type)
{
    switch (type) {
        case _WAL_NEW_DEL: // inserted deleted doc: ++wal_ndocs, ++wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
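            // fall through: a new deleted doc also counts toward wal_ndocs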
        case _WAL_NEW_SET: // inserted new doc: ++wal_ndocs
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
            break;
        case _WAL_SET_TO_DEL: // update prev doc to deleted: ++wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
            break;
        case _WAL_DEL_TO_SET: // update prev deleted doc to set: --wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
            break;
        case _WAL_DROP_DELETE: // drop deleted item: --wal_ndocs,--wal_ndeletes
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
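            // fall through: dropping a deleted doc also decrements wal_ndocs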
        case _WAL_DROP_SET: // drop item: --wal_ndocs
            _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
            break;
    }
}

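// Core insert path. Fetches the current mutable snapshot for the item's KV
// store and hashes the key to a shard, then either updates an existing
// uncommitted item of the same transaction and snapshot in place (marking
// the old doc region stale) or allocates a new item (and, for a first-time
// key, a new header).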
INLINE fdb_status _wal_insert(fdb_txn *txn,
                              struct filemgr *file,
                              struct _fdb_key_cmp_info *cmp_info,
                              fdb_doc *doc,
                              uint64_t offset,
                              wal_insert_by caller,
                              bool immediate_remove)
{
    struct wal_item *item;
    struct wal_item_header query, *header;
    struct snap_handle *shandle;
    struct list_elem *le;
    struct avl_node *node;
    void *key = doc->key;
    size_t keylen = doc->keylen;
    size_t chk_sum;
    size_t shard_num;
    wal_snapid_t snap_tag;
    fdb_kvs_id_t kv_id;

    if (file->kv_header) { // multi KV instance mode
        buf2kvid(file->config->chunksize, doc->key, &kv_id);
    } else {
        kv_id = 0;
    }
    shandle = _wal_fetch_snapshot(file->wal, kv_id);
    snap_tag = shandle->snap_tag_idx;
    query.key = key;
    query.keylen = keylen;
    chk_sum = get_checksum((uint8_t*)key, keylen);
    shard_num = chk_sum % file->wal->num_shards;
    if (caller == WAL_INS_WRITER) {
        spin_lock(&file->wal->key_shards[shard_num].lock);
    }

    // Since we can have a different custom comparison function per kv store
    // set the custom compare aux function every time before a search is done
    avl_set_aux(&file->wal->key_shards[shard_num]._map,
                (void *)cmp_info);
    node = avl_search(&file->wal->key_shards[shard_num]._map,
                      &query.avl_key, _wal_cmp_bykey);

    if (node) {
        // already exist .. retrieve header
        header = _get_entry(node, struct wal_item_header, avl_key);

        // find uncommitted item belonging to the same txn
        le = list_begin(&header->items);
        while (le) {
            item = _get_entry(le, struct wal_item, list_elem);

            if (item->txn == txn && !(item->flag & WAL_ITEM_COMMITTED ||
                caller == WAL_INS_COMPACT_PHASE1) &&
                item->shandle->snap_tag_idx == snap_tag) {
                item->flag &= ~WAL_ITEM_FLUSH_READY;

                if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
                    // Re-index the item by new sequence number..
                    size_t seq_shard_num = item->seqnum % file->wal->num_shards;
                    if (caller == WAL_INS_WRITER) {
                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                    avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
                            &item->avl_seq);
                    if (caller == WAL_INS_WRITER) {
                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                    }

                    item->seqnum = doc->seqnum;
                    seq_shard_num = doc->seqnum % file->wal->num_shards;
                    if (caller == WAL_INS_WRITER) {
                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                    avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                            &item->avl_seq, _wal_cmp_byseq);
                    if (caller == WAL_INS_WRITER) {
                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                    }
                } else {
                    // just overwrite existing WAL item
                    item->seqnum = doc->seqnum;
                }

                // mark previous doc region as stale
                size_t doc_size_ondisk = doc->size_ondisk;
                uint32_t stale_len = item->doc_size;
                uint64_t stale_offset = item->offset;
                if (item->action == WAL_ACT_INSERT ||
                    item->action == WAL_ACT_LOGICAL_REMOVE) {
                    // insert or logical remove
                    filemgr_mark_stale(file, stale_offset, stale_len);
                }

                if (doc->deleted) {
                    if (item->txn == &file->global_txn &&
                        item->action == WAL_ACT_INSERT) {
                        _wal_update_stat(file, kv_id, _WAL_SET_TO_DEL);
                    }
                    if (offset != BLK_NOT_FOUND && !immediate_remove) {
                        // purge interval not met yet
                        item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
                    } else { // drop the deleted doc right away
                        item->action = WAL_ACT_REMOVE; // immediate prune index

                        if (offset != BLK_NOT_FOUND) {
                            // immediately mark as stale if offset is given
                            // (which means that a deletion mark was appended into
                            //  the file before calling wal_insert()).
                            filemgr_mark_stale(file, offset, doc_size_ondisk);
                        }
                        doc_size_ondisk = 0;
                    }
                } else {
                    if (item->txn == &file->global_txn &&
                        item->action != WAL_ACT_INSERT) {
                        _wal_update_stat(file, kv_id, _WAL_DEL_TO_SET);
                    }
                    item->action = WAL_ACT_INSERT;
                }
                atomic_add_uint64_t(&file->wal->datasize,
                                    doc_size_ondisk - item->doc_size,
                                    std::memory_order_relaxed);
                item->doc_size = doc->size_ondisk;
                item->offset = offset;
                item->shandle = shandle;

                // move the item to the front of the list (header)
                list_remove(&header->items, &item->list_elem);
                list_push_front(&header->items, &item->list_elem);
                atomic_decr_uint64_t(&shandle->wal_ndocs);
                break;
            }
            le = list_next(le);
        }

        if (le == NULL) {
            // not exist
            // create new item
            item = (struct wal_item *)calloc(1, sizeof(struct wal_item));

            if (file->kv_header) { // multi KV instance mode
                item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
            }
            item->txn = txn;
            if (txn == &file->global_txn) {
                atomic_incr_uint32_t(&file->wal->num_flushable);
            }
            item->header = header;
            item->seqnum = doc->seqnum;

            if (doc->deleted) {
                if (item->txn == &file->global_txn) {
                    _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
                }
                if (offset != BLK_NOT_FOUND && !immediate_remove) {
                    // purge interval not met yet
                    item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
                } else { // compactor purge deleted doc
                    item->action = WAL_ACT_REMOVE; // immediate prune index

                    if (offset != BLK_NOT_FOUND) {
                        // immediately mark as stale if offset is given
                        // (which means that a deletion mark was appended into
                        //  the file before calling wal_insert()).
                        filemgr_mark_stale(file, offset, doc->size_ondisk);
                    }
                }
            } else {
                if (item->txn == &file->global_txn) {
                    _wal_update_stat(file, kv_id, _WAL_NEW_SET);
                }
                item->action = WAL_ACT_INSERT;
            }
            item->offset = offset;
            item->doc_size = doc->size_ondisk;
            item->shandle = shandle;
            if (item->action != WAL_ACT_REMOVE) {
                atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk,
                                    std::memory_order_relaxed);
            }

            if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
                size_t seq_shard_num = doc->seqnum % file->wal->num_shards;
                if (caller == WAL_INS_WRITER) {
                    spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
                }
                avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                           &item->avl_seq, _wal_cmp_byseq);
                if (caller == WAL_INS_WRITER) {
                    spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
                }
            }
            // insert into header's list
            list_push_front(&header->items, &item->list_elem);
            // also insert into transaction's list
            list_push_back(txn->items, &item->list_elem_txn);

            atomic_incr_uint32_t(&file->wal->size);
            atomic_add_uint64_t(&file->wal->mem_overhead,
                                sizeof(struct wal_item), std::memory_order_relaxed);
        }
    } else {
        // not exist .. create new one
        // create new header and new item
        header = (struct wal_item_header*)malloc(sizeof(struct wal_item_header));
        list_init(&header->items);
        header->chunksize = file->config->chunksize;
        header->keylen = keylen;
        header->key = (void *)malloc(header->keylen);
        memcpy(header->key, key, header->keylen);

        avl_insert(&file->wal->key_shards[shard_num]._map,
                   &header->avl_key, _wal_cmp_bykey);

        item = (struct wal_item *)malloc(sizeof(struct wal_item));
        // entries inserted by the compactor are already committed
        if (caller == WAL_INS_COMPACT_PHASE1) {
            item->flag = WAL_ITEM_COMMITTED;
        } else {
            item->flag = 0x0;
        }
        if (file->kv_header) { // multi KV instance mode
            item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
        }
        item->txn = txn;
        if (txn == &file->global_txn) {
            atomic_incr_uint32_t(&file->wal->num_flushable);
        }
        item->header = header;

        item->seqnum = doc->seqnum;

        if (doc->deleted) {
            if (item->txn == &file->global_txn) {
                _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
            }
            if (offset != BLK_NOT_FOUND && !immediate_remove) {// purge interval not met yet
                item->action = WAL_ACT_LOGICAL_REMOVE;// insert deleted
            } else { // compactor purge deleted doc
                item->action = WAL_ACT_REMOVE; // immediate prune index

                if (offset != BLK_NOT_FOUND) {
                    // immediately mark as stale if offset is given
                    // (which means that an empty doc was appended before
                    //  calling wal_insert()).
                    filemgr_mark_stale(file, offset, doc->size_ondisk);
                }
            }
        } else {
            if (item->txn == &file->global_txn) {
                _wal_update_stat(file, kv_id, _WAL_NEW_SET);
            }
            item->action = WAL_ACT_INSERT;
        }
        item->offset = offset;
        item->doc_size = doc->size_ondisk;
        item->shandle = shandle;
        if (item->action != WAL_ACT_REMOVE) {
            atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk,
                                std::memory_order_relaxed);
        }

        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
            size_t seq_shard_num = doc->seqnum % file->wal->num_shards;
            if (caller == WAL_INS_WRITER) {
                spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
            }
            avl_insert(&file->wal->seq_shards[seq_shard_num]._map,
                       &item->avl_seq, _wal_cmp_byseq);
            if (caller == WAL_INS_WRITER) {
                spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
            }
        }

        // insert into header's list
        list_push_front(&header->items, &item->list_elem);
        if (caller == WAL_INS_WRITER || caller == WAL_INS_COMPACT_PHASE2) {
            // also insert into transaction's list
            list_push_back(txn->items, &item->list_elem_txn);
        }

        atomic_incr_uint32_t(&file->wal->size);
        atomic_add_uint64_t(&file->wal->mem_overhead,
                            sizeof(struct wal_item) + sizeof(struct wal_item_header) + keylen,
                            std::memory_order_relaxed);
    }

    if (caller == WAL_INS_WRITER) {
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    }

    return FDB_RESULT_SUCCESS;
}

fdb_status wal_insert(fdb_txn *txn,
                      struct filemgr *file,
                      struct _fdb_key_cmp_info *cmp_info,
                      fdb_doc *doc,
                      uint64_t offset,
                      wal_insert_by caller)
{
    return _wal_insert(txn, file, cmp_info, doc, offset, caller, false);
}

fdb_status wal_immediate_remove(fdb_txn *txn,
                                struct filemgr *file,
                                struct _fdb_key_cmp_info *cmp_info,
                                fdb_doc *doc,
                                uint64_t offset,
                                wal_insert_by caller)
{
    return _wal_insert(txn, file, cmp_info, doc, offset, caller, true);
}

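// An item is "partially committed" if it was committed by a transaction
// (other than the global or current one) that was still in flight when the
// snapshot was taken; such items must remain invisible to that snapshot.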
INLINE bool _wal_item_partially_committed(fdb_txn *global_txn,
                                          struct list *active_txn_list,
                                          fdb_txn *current_txn,
                                          struct wal_item *item)
{
    bool partial_commit = false;

    if (item->flag & WAL_ITEM_COMMITTED &&
        item->txn != global_txn && item->txn != current_txn) {
        struct wal_txn_wrapper *txn_wrapper;
        struct list_elem *txn_elem = list_begin(active_txn_list);
        while(txn_elem) {
            txn_wrapper = _get_entry(txn_elem, struct wal_txn_wrapper, le);
            if (txn_wrapper->txn == item->txn) {
                partial_commit = true;
                break;
            }
            txn_elem = list_next(txn_elem);
        }
    }
    return partial_commit;
}

/**
 * Since items are shared with current & future snapshots...
 * Find item belonging to snapshot OR
 * The item from the previous most recent snapshot
 *
 * TODO: Due to the fact that transactional items can overwrite
 *       more recent items created upon fdb_end_trans, we must scan entire list
 *       to find a qualifying item from the previous most recent snapshot
 *       This is not efficient and we need a better way of ordering the list
 */
INLINE struct wal_item *_wal_get_snap_item(struct wal_item_header *header,
                                           struct snap_handle *shandle)
{
    struct wal_item *item;
    struct wal_item *max_shared_item = NULL;
    fdb_txn *txn = shandle->snap_txn;
    wal_snapid_t tag = shandle->snap_tag_idx;
    wal_snapid_t snap_stop_tag = shandle->snap_stop_idx;
    struct list_elem *le = list_begin(&header->items);

    // discard wal keys that have no items in them
    if (!le) {
        return NULL;
    }

    for (; le; le = list_next(le)) {
        item = _get_entry(le, struct wal_item, list_elem);
        if (item->txn != txn && !(item->flag & WAL_ITEM_COMMITTED)) {
            continue;
        }
        if (item->shandle->snap_tag_idx > tag) {
            continue; // this item was inserted after snapshot creation -> skip
        }
        if (_wal_item_partially_committed(shandle->global_txn,
                                          &shandle->active_txn_list,
                                          txn, item)) {
            continue;
        }
        if (item->shandle->snap_tag_idx == tag) {// Found exact snapshot item
            max_shared_item = item; // look no further
            break;
        }

        // if my snapshot was taken after a WAL flush..
        if (item->shandle->snap_tag_idx <= snap_stop_tag) {
            continue; // then do not consider pre-flush items
        }
        if (item->shandle->snap_tag_idx < tag) {
            if (!max_shared_item) {
                max_shared_item = item;
            } else if (item->shandle->snap_tag_idx >
                       max_shared_item->shandle->snap_tag_idx) {
                max_shared_item = item;
            }
        }
    }
    return (struct wal_item *)max_shared_item;
}

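// Lookup helper shared by wal_find() and wal_find_kv_id(). Searches by key
// when a key is supplied (or no seqnum is given); otherwise searches the
// sequence shards, which requires FDB_SEQTREE_USE.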
static fdb_status _wal_find(fdb_txn *txn,
                            struct filemgr *file,
                            fdb_kvs_id_t kv_id,
                            struct _fdb_key_cmp_info *cmp_info,
                            struct snap_handle *shandle,
                            fdb_doc *doc,
                            uint64_t *offset)
{
    struct wal_item item_query, *item = NULL;
    struct wal_item_header query, *header = NULL;
    struct list_elem *le = NULL, *_le;
    struct avl_node *node = NULL;
    void *key = doc->key;
    size_t keylen = doc->keylen;

    if (doc->seqnum == SEQNUM_NOT_USED || (key && keylen>0)) {
        size_t chk_sum = get_checksum((uint8_t*)key, keylen);
        size_t shard_num = chk_sum % file->wal->num_shards;
        spin_lock(&file->wal->key_shards[shard_num].lock);
        // search by key
        query.key = key;
        query.keylen = keylen;
        avl_set_aux(&file->wal->key_shards[shard_num]._map,
                    (void *)cmp_info);
        node = avl_search(&file->wal->key_shards[shard_num]._map,
                          &query.avl_key, _wal_cmp_bykey);
        if (node) {
            struct wal_item *committed_item = NULL;
            // retrieve header
            header = _get_entry(node, struct wal_item_header, avl_key);
            if (shandle) {
                item = _wal_get_snap_item(header, shandle);
            } else { // regular non-snapshot lookup
                for (le = list_begin(&header->items);
                     le; le = _le) {
                    item = _get_entry(le, struct wal_item, list_elem);
                    // Items get ordered as follows in the header's list..
                    // (begin) 6 --- 5 --- 4 --- 1 --- 2 --- 3 <-- (end)
                    //  Uncommitted items-->     <--- Committed items
                    if (!committed_item) {
                        if (item->flag & WAL_ITEM_COMMITTED) {
                            committed_item = item;
                            _le = list_end(&header->items);
                            if (_le == le) { // just one element at the end
                                _le = NULL; // process current element & exit
                            } else { // current element is not the last item..
                                continue; // start reverse scan from the end
                            }
                        } else { // uncommitted items - still continue forward
                            _le = list_next(le);
                        }
                    } else { // reverse scan list over committed items..
                        _le = list_prev(le);
                        // is it back to the first committed item..
                        if (_le == &committed_item->list_elem) {
                            _le = NULL; // need not re-iterate over uncommitted
                        }
                    }
                    if (item->flag & WAL_ITEM_FLUSHED_OUT) {
                        item = NULL; // item reflected in main index and is not
                        break; // to be returned for non-snapshot reads
                    }
                    // only committed items can be seen by the other handles, OR
                    // items belonging to the same txn can be found, OR
                    // a transaction's isolation level is read uncommitted.
                    if ((item->flag & WAL_ITEM_COMMITTED) ||
                        (item->txn == txn) ||
                        (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
                        break;
                    } else {
                        item = NULL;
                    }
                } // done for all items in the header's list
            } // done for regular (non-snapshot) lookup
            if (item) {
                *offset = item->offset;
                if (item->action == WAL_ACT_INSERT) {
                    doc->deleted = false;
                } else {
                    doc->deleted = true;
                    if (item->action == WAL_ACT_REMOVE) {
                        // Immediately deleted & purged docs have no real
                        // presence on-disk. wal_find must return SUCCESS
                        // here to indicate that the doc was deleted to
                        // prevent main index lookup. Also, it must set the
                        // offset to BLK_NOT_FOUND to ensure that caller
                        // does NOT attempt to fetch the doc OR its
                        // metadata from file.
                        *offset = BLK_NOT_FOUND;
                    }
                }
                doc->seqnum = item->seqnum;
                spin_unlock(&file->wal->key_shards[shard_num].lock);
                return FDB_RESULT_SUCCESS;
            }
        }
        spin_unlock(&file->wal->key_shards[shard_num].lock);
    } else {
        if (file->config->seqtree_opt != FDB_SEQTREE_USE) {
            return FDB_RESULT_INVALID_CONFIG;
        }
        // search by seqnum
        struct wal_item_header temp_header;

        if (file->kv_header) { // multi KV instance mode
            temp_header.key = (void*)alca(uint8_t, file->config->chunksize);
            kvid2buf(file->config->chunksize, kv_id, temp_header.key);
            item_query.header = &temp_header;
        }
        item_query.seqnum = doc->seqnum;

        size_t shard_num = doc->seqnum % file->wal->num_shards;
        spin_lock(&file->wal->seq_shards[shard_num].lock);
        node = avl_search(&file->wal->seq_shards[shard_num]._map,
                          &item_query.avl_seq, _wal_cmp_byseq);
        if (node) {
            item = _get_entry(node, struct wal_item, avl_seq);
            if ((item->flag & WAL_ITEM_COMMITTED) ||
                (item->txn == txn) ||
                (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
                *offset = item->offset;
                if (item->action == WAL_ACT_INSERT) {
                    doc->deleted = false;
                } else {
                    doc->deleted = true;
                    if (item->action == WAL_ACT_REMOVE) {
                        // Immediately deleted & purged docs have no real
                        // presence on-disk. wal_find must return SUCCESS
                        // here to indicate that the doc was deleted to
                        // prevent main index lookup. Also, it must set the
                        // offset to BLK_NOT_FOUND to ensure that caller
                        // does NOT attempt to fetch the doc OR its
                        // metadata from file.
                        *offset = BLK_NOT_FOUND;
                    }
                }
                spin_unlock(&file->wal->seq_shards[shard_num].lock);
                return FDB_RESULT_SUCCESS;
            }
        }
        spin_unlock(&file->wal->seq_shards[shard_num].lock);
    }

    return FDB_RESULT_KEY_NOT_FOUND;
}

static
fdb_status _wal_snap_find(struct snap_handle *shandle, fdb_doc *doc,
                          uint64_t *offset);

fdb_status wal_find(fdb_txn *txn, struct filemgr *file,
                    struct _fdb_key_cmp_info *cmp_info,
                    struct snap_handle *shandle,
                    fdb_doc *doc, uint64_t *offset)
{
    if (shandle) {
        if (shandle->is_persisted_snapshot) {
            return _wal_snap_find(shandle, doc, offset);
        }
    }
    return _wal_find(txn, file, 0, cmp_info, shandle, doc, offset);
}

fdb_status wal_find_kv_id(fdb_txn *txn,
                          struct filemgr *file,
                          fdb_kvs_id_t kv_id,
                          struct _fdb_key_cmp_info *cmp_info,
                          struct snap_handle *shandle,
                          fdb_doc *doc,
                          uint64_t *offset)
{
    if (shandle) {
        if (shandle->is_persisted_snapshot) {
            return _wal_snap_find(shandle, doc, offset);
        }
    }
    return _wal_find(txn, file, kv_id, cmp_info, shandle, doc, offset);
}

// Pre-condition: writer lock (filemgr mutex) must be held for this call
// Readers can interleave without lock
INLINE void _wal_free_item(struct wal_item *item, struct wal *_wal) {
    struct snap_handle *shandle = item->shandle;
    if (!atomic_decr_uint64_t(&shandle->wal_ndocs)) {
        spin_lock(&_wal->lock);
        DBG("%s Last item removed from snapshot %" _F64 "-%" _F64 " %" _F64
                " kv id %" _F64 ". Destroy snapshot handle..\n",
                shandle->snap_txn && shandle->snap_txn->handle ?
                shandle->snap_txn->handle->file->filename : "",
                shandle->snap_stop_idx, shandle->snap_tag_idx,
                shandle->seqnum, shandle->id);
        avl_remove(&_wal->wal_snapshot_tree, &shandle->avl_id);
        for (struct list_elem *e = list_begin(&shandle->active_txn_list); e;) {
            struct list_elem *e_next = list_next(e);
            struct wal_txn_wrapper *active_txn = _get_entry(e,
                                                 struct wal_txn_wrapper, le);
            free(active_txn);
            e = e_next;
        }
        free(shandle);
        spin_unlock(&_wal->lock);
    }
    memset(item, 0, sizeof(struct wal_item));
    free(item);
}

// move all uncommitted items into 'new_file'
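// Used during compaction handover: committed (non-transactional) items are
// expected to have been flushed already, so only live transactional items
// remain to be carried over to the new file's WAL and txn list.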
fdb_status wal_txn_migration(void *dbhandle,
                             void *new_dhandle,
                             struct filemgr *old_file,
                             struct filemgr *new_file,
                             wal_doc_move_func *move_doc)
{
    int64_t offset;
    fdb_doc doc;
    fdb_txn *txn;
    struct wal_txn_wrapper *txn_wrapper;
    struct wal_item_header *header;
    struct wal_item *item;
    struct avl_node *node;
    struct list_elem *e;
    size_t i = 0;
    size_t num_shards = old_file->wal->num_shards;
    uint64_t mem_overhead = 0;
    struct _fdb_key_cmp_info cmp_info;

    // Note that the caller (i.e., compactor) already owns the locks on
    // both old_file and new_file filemgr instances. Therefore, it is OK to
    // grab each partition lock individually and move all uncommitted items
    // to the new_file filemgr instance.

    for (; i < num_shards; ++i) {
        spin_lock(&old_file->wal->key_shards[i].lock);
        node = avl_first(&old_file->wal->key_shards[i]._map);
        while(node) {
            header = _get_entry(node, struct wal_item_header, avl_key);
            e = list_end(&header->items);
            while(e) {
                item = _get_entry(e, struct wal_item, list_elem);
                if (!(item->flag & WAL_ITEM_COMMITTED)) {
                    // not committed yet
                    // move doc
                    offset = move_doc(dbhandle, new_dhandle, item, &doc);
                    if (offset <= 0) {
                        spin_unlock(&old_file->wal->key_shards[i].lock);
                        return offset < 0 ? (fdb_status) offset : FDB_RESULT_READ_FAIL;
                    }
                    // Note that all items belonging to global_txn should be
                    // flushed before calling this function
                    // (migrate transactional items only).
                    fdb_assert(item->txn != &old_file->global_txn,
                               (uint64_t)item->txn, 0);
                    cmp_info.kvs_config = item->txn->handle->kvs_config;
                    cmp_info.kvs = item->txn->handle->kvs;
                    // insert into new_file's WAL
                    wal_insert(item->txn, new_file, &cmp_info, &doc, offset,
                               WAL_INS_WRITER);

                    if (old_file->config->seqtree_opt == FDB_SEQTREE_USE) {
                        // remove from seq map
                        size_t shard_num = item->seqnum % num_shards;
                        spin_lock(&old_file->wal->seq_shards[shard_num].lock);
                        avl_remove(&old_file->wal->seq_shards[shard_num]._map,
                                &item->avl_seq);
                        spin_unlock(&old_file->wal->seq_shards[shard_num].lock);
                    }

                    // remove from header's list
                    e = list_remove_reverse(&header->items, e);
                    // remove from transaction's list
                    list_remove(item->txn->items, &item->list_elem_txn);
                    // decrease num_flushable of old_file if non-transactional update
                    if (item->txn == &old_file->global_txn) {
                        atomic_decr_uint32_t(&old_file->wal->num_flushable);
                    }
                    if (item->action != WAL_ACT_REMOVE) {
                        atomic_sub_uint64_t(&old_file->wal->datasize, item->doc_size,
                                            std::memory_order_relaxed);
                    }
                    // free item
                    free(item);
                    // free doc
                    free(doc.key);
                    free(doc.meta);
                    free(doc.body);
                    atomic_decr_uint32_t(&old_file->wal->size);
                    mem_overhead += sizeof(struct wal_item);
                } else {
                    e = list_prev(e);
                }
            }

            if (list_begin(&header->items) == NULL) {
                // header's list becomes empty
                // remove from key map
                node = avl_next(node);
                avl_remove(&old_file->wal->key_shards[i]._map,
                           &header->avl_key);
                mem_overhead += header->keylen + sizeof(struct wal_item_header);
                // free key & header
                free(header->key);
                free(header);
            } else {
                node = avl_next(node);
            }
        }
        spin_unlock(&old_file->wal->key_shards[i].lock);
    }
    atomic_sub_uint64_t(&old_file->wal->mem_overhead, mem_overhead,
                        std::memory_order_relaxed);

    spin_lock(&old_file->wal->lock);

    // migrate all entries in txn list
    e = list_begin(&old_file->wal->txn_list);
    while(e) {
        txn_wrapper = _get_entry(e, struct wal_txn_wrapper, le);
        txn = txn_wrapper->txn;
        // except for global_txn
        if (txn != &old_file->global_txn) {
            e = list_remove(&old_file->wal->txn_list, &txn_wrapper->le);
            list_push_front(&new_file->wal->txn_list, &txn_wrapper->le);
            // remove previous header info & revnum
            txn->prev_hdr_bid = BLK_NOT_FOUND;
            txn->prev_revnum = 0;
        } else {
            e = list_next(e);
        }
    }

    spin_unlock(&old_file->wal->lock);

    return FDB_RESULT_SUCCESS;
}

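// Commit a transaction's WAL items: mark each item committed (optionally
// appending a commit mark to the file), move it to the committed end of its
// header's list, then reverse-scan to de-duplicate older committed versions
// whose snapshots no longer need them, reclaiming their space and stats.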
1237fdb_status wal_commit(fdb_txn *txn, struct filemgr *file,
1238                      wal_commit_mark_func *func,
1239                      err_log_callback *log_callback)
1240{
1241    int can_overwrite;
1242    struct wal_item *item, *_item;
1243    struct list_elem *e1, *e2;
1244    fdb_kvs_id_t kv_id;
1245    fdb_status status = FDB_RESULT_SUCCESS;
1246    size_t shard_num;
1247    uint64_t mem_overhead = 0;
1248
1249    e1 = list_begin(txn->items);
1250    while(e1) {
1251        item = _get_entry(e1, struct wal_item, list_elem_txn);
1252        fdb_assert(item->txn == txn, item->txn, txn);
1253        // Grab the WAL key shard lock.
1254        shard_num = get_checksum((uint8_t*)item->header->key,
1255                                 item->header->keylen) %
1256                                 file->wal->num_shards;
1257        spin_lock(&file->wal->key_shards[shard_num].lock);
1258
1259        if (!(item->flag & WAL_ITEM_COMMITTED)) {
1260            // get KVS ID
1261            if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
1262                buf2kvid(item->header->chunksize, item->header->key, &kv_id);
1263            } else {
1264                kv_id = 0;
1265            }
1266
1267            item->flag |= WAL_ITEM_COMMITTED;
1268            if (item->txn != &file->global_txn) {
1269                // increase num_flushable if it is transactional update
1270                atomic_incr_uint32_t(&file->wal->num_flushable);
1271                // Also since a transaction doc was committed
1272                // update global WAL stats to reflect this change..
1273                if (item->action == WAL_ACT_INSERT) {
1274                    _wal_update_stat(file, kv_id, _WAL_NEW_SET);
1275                } else {
1276                    _wal_update_stat(file, kv_id, _WAL_NEW_DEL);
1277                }
1278            }
1279            // append commit mark if necessary
1280            if (func) {
1281                status = func(txn->handle, item->offset);
1282                if (status != FDB_RESULT_SUCCESS) {
1283                    fdb_log(log_callback, status,
1284                            "Error in appending a commit mark at offset %"
1285                            _F64 " in "
1286                            "a database file '%s'", item->offset,
1287                            file->filename);
1288                    spin_unlock(&file->wal->key_shards[shard_num].lock);
1289                    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
1290                                        std::memory_order_relaxed);
1291                    return status;
1292                }
1293            }
1294            // remove previously committed item if no snapshots refer to it,
1295            // move the committed item to the end of the wal_item_header's list
1296            list_remove(&item->header->items, &item->list_elem);
1297            list_push_back(&item->header->items, &item->list_elem);
1298            // now reverse scan among other committed items to de-duplicate..
1299            e2 = list_prev(&item->list_elem);
1300            while(e2) {
1301                _item = _get_entry(e2, struct wal_item, list_elem);
1302                if (!(_item->flag & WAL_ITEM_COMMITTED)) {
1303                    break;
1304                }
1305                e2 = list_prev(e2);
1306                can_overwrite = (item->shandle == _item->shandle ||
1307                                 _wal_can_discard(file->wal, _item, item));
1308                if (!can_overwrite) {
1309                    item = _item; // new covering item found
1310                    continue;
1311                }
1312                // committed but not flush-ready
1313                // (a flush-ready item will be removed by the flush itself)
1314                if (!(_item->flag & WAL_ITEM_FLUSH_READY)) {
1315                    // remove from list & hash
1316                    list_remove(&item->header->items, &_item->list_elem);
1317                    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
1318                        size_t seq_shard_num = _item->seqnum
1319                                             % file->wal->num_shards;
1320                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
1321                        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
1322                                   &_item->avl_seq);
1323                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
1324                    }
1325
1326                    // mark previous doc region as stale
1327                    uint32_t stale_len = _item->doc_size;
1328                    uint64_t stale_offset = _item->offset;
1329                    if (_item->action == WAL_ACT_INSERT ||
1330                        _item->action == WAL_ACT_LOGICAL_REMOVE) {
1331                        // insert or logical remove
1332                        filemgr_mark_stale(file, stale_offset, stale_len);
1333                    }
1334
1335                    atomic_decr_uint32_t(&file->wal->size);
1336                    atomic_decr_uint32_t(&file->wal->num_flushable);
1337                    if (item->action != WAL_ACT_REMOVE) {
1338                        atomic_sub_uint64_t(&file->wal->datasize,
1339                                            _item->doc_size,
1340                                            std::memory_order_relaxed);
1341                    }
1342                    // simply reduce the stat count...
1343                    if (_item->action == WAL_ACT_INSERT) {
1344                        _wal_update_stat(file, kv_id, _WAL_DROP_SET);
1345                    } else {
1346                        _wal_update_stat(file, kv_id, _WAL_DROP_DELETE);
1347                    }
1348                    mem_overhead += sizeof(struct wal_item);
1349                    _wal_free_item(_item, file->wal);
1350                } else {
1351                    fdb_log(log_callback, status,
1352                            "WAL commit called while wal_flush is in progress. "
1353                            "item seqnum %" _F64
1354                            " keylen %d flags %x action %d"
1355                            " file %s", _item->seqnum, item->header->keylen,
1356                            atomic_get_uint8_t(&_item->flag),
1357                            _item->action, file->filename);
1358                }
1359            }
1360        }
1361
1362        // remove from transaction's list
1363        e1 = list_remove(txn->items, e1);
1364        spin_unlock(&file->wal->key_shards[shard_num].lock);
1365    }
1366    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
1367                        std::memory_order_relaxed);
1368
1369    return status;
1370}
1371
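// Comparator for the sorted flush set: orders items primarily by their old
// offset in the main index so the b+tree is visited near-sequentially during
// flush, then by the new document offset, and finally by seqnum as a
// tie-breaker (see the note below for why the tie-breaker is needed).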
1372static int _wal_flush_cmp(struct avl_node *a, struct avl_node *b, void *aux)
1373{
1374    struct wal_item *aa, *bb;
1375    aa = _get_entry(a, struct wal_item, avl_flush);
1376    bb = _get_entry(b, struct wal_item, avl_flush);
1377
1378    if (aa->old_offset < bb->old_offset) {
1379        return -1;
1380    } else if (aa->old_offset > bb->old_offset) {
1381        return 1;
1382    } else {
1383        // old_offset can be 0 if the document was newly inserted
1384        if (aa->offset < bb->offset) {
1385            return -1;
1386        } else if (aa->offset > bb->offset) {
1387            return 1;
1388        } else {
1389            // Note: get_old_offset() may return the same old_offset for
1390            // different keys, because hbtrie_find_offset() (called internally
1391            // by get_old_offset()) compares only a key prefix rather than the
1392            // entire key string, for performance reasons.
1393            // As a result this case (different keys whose old_offset and
1394            // offset are both identical) can occur, although very rarely,
1395            // and used to cause a crash; compare sequence numbers as a
1396            // final tie-breaker to distinguish the two items.
1397            if (aa->seqnum < bb->seqnum) {
1398                return -1;
1399            } else if (aa->seqnum > bb->seqnum) {
1400                return 1;
1401            } else {
1402                return 0;
1403            }
1404        }
1405    }
1406}
1407
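// Unlink one committed WAL item from its header's item list and from the
// sequence-number shard map, roll back the per-KVS stats it contributed,
// decrement the WAL's size and num_flushable counters, and free the item.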
1408INLINE void _wal_release_item(struct filemgr *file, size_t shard_num,
1409                              fdb_kvs_id_t kv_id, struct wal_item *item) {
1410    list_remove(&item->header->items, &item->list_elem);
1411    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
1412        size_t seq_shard_num;
1413        seq_shard_num = item->seqnum % file->wal->num_shards;
1414        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
1415        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
1416                &item->avl_seq);
1417        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
1418    }
1419
1420    if (item->action == WAL_ACT_LOGICAL_REMOVE ||
1421        item->action == WAL_ACT_REMOVE) {
1422        _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
1423    }
1424    _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
1425    atomic_decr_uint32_t(&file->wal->size);
1426    atomic_decr_uint32_t(&file->wal->num_flushable);
1427    if (item->action != WAL_ACT_REMOVE) {
1428        atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
1429                            std::memory_order_relaxed);
1430    }
1431    _wal_free_item(item, file->wal);
1432}
1433
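// Starting from a just-flushed item, scan backwards through the committed
// items of the same key and release every revision that no live snapshot can
// still observe; revisions that are still referenced are kept but re-flagged
// from FLUSH_READY to FLUSHED_OUT.  Once the key's item list becomes empty,
// the wal_item_header itself is removed from the key shard map and freed.
// Returns the next list element to continue from, or NULL when done.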
1434INLINE struct list_elem *_wal_release_items(struct filemgr *file, size_t shard_num,
1435                                     struct wal_item *item) {
1436    fdb_kvs_id_t kv_id;
1437    uint64_t mem_overhead = 0;
1438    struct list_elem *le = &item->list_elem;
1439    struct wal_item_header *header = item->header;
1440
1441    // get KVS ID
1442    if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
1443        buf2kvid(item->header->chunksize, item->header->key, &kv_id);
1444    } else {
1445        kv_id = 0;
1446    }
1447    le = list_prev(le);
1448    if (_wal_can_discard(file->wal, item, NULL)) {
1449        _wal_release_item(file, shard_num, kv_id, item);
1450        mem_overhead += sizeof(struct wal_item);
1451        item = NULL;
1452    } else {
1453        item->flag &= ~WAL_ITEM_FLUSH_READY;
1454        item->flag |= WAL_ITEM_FLUSHED_OUT;
1455    }
1456    // try to clean up items from prior snapshots as well..
1457    while (le) {
1458        struct wal_item *sitem = _get_entry(le, struct wal_item, list_elem);
1459        if (!(sitem->flag & WAL_ITEM_COMMITTED)) { // uncommitted items will
1460            le = NULL; // be flushed in the next wal_flush operation
1461            break;
1462        }
1463        le = list_prev(le);
1464        if (_wal_can_discard(file->wal, sitem, item)) {
1465            _wal_release_item(file, shard_num, kv_id, sitem);
1466            mem_overhead += sizeof(struct wal_item);
1467        } else {
1468            item = sitem; // this is the latest and greatest item
1469            item->flag &= ~WAL_ITEM_FLUSH_READY;
1470            item->flag |= WAL_ITEM_FLUSHED_OUT;
1471        }
1472    }
1473    if (list_begin(&header->items) == NULL) {
1474        // wal_item_header becomes empty
1475        // free header and remove from key map
1476        avl_remove(&file->wal->key_shards[shard_num]._map,
1477                &header->avl_key);
1478        mem_overhead += sizeof(struct wal_item_header) + header->keylen;
1479        free(header->key);
1480        free(header);
1481        le = NULL;
1482    }
1483    atomic_sub_uint64_t(&file->wal->mem_overhead,
1484                        mem_overhead + sizeof(struct wal_item),
1485                        std::memory_order_relaxed);
1486    return le;
1487}
1488
1489// Mark all snapshots as flushed to indicate that all items have been
1490// reflected in the main index and that future snapshots must not access them
1491INLINE void _wal_snap_mark_flushed(struct wal *_wal)
1492{
1493    struct avl_node *a;
1494    spin_lock(&_wal->lock);
1495    for (a = avl_first(&_wal->wal_snapshot_tree);
1496         a; a = avl_next(a)) {
1497        struct snap_handle *shandle = _get_entry(a, struct snap_handle, avl_id);
1498        shandle->is_flushed = true;
1499    }
1500    spin_unlock(&_wal->lock);
1501}
1502
1503#define WAL_SORTED_FLUSH ((void *)1) // stored in aux if avl tree is used
1504
1505INLINE bool _wal_are_items_sorted(union wal_flush_items *flush_items)
1506{
1507    return (flush_items->tree.aux == WAL_SORTED_FLUSH);
1508}
1509
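// Called after a WAL flush has made the items durable in the main index:
// marks every snapshot as flushed, then walks the collected flush set (an
// AVL tree when the flush was offset-sorted, otherwise a plain list) and
// releases each item under its key shard lock.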
1510fdb_status wal_release_flushed_items(struct filemgr *file,
1511                                     union wal_flush_items *flush_items)
1512{
1513    struct wal_item *item;
1514    size_t shard_num;
1515
1516    _wal_snap_mark_flushed(file->wal); // Read-write barrier: items are in trie
1517
1518    if (_wal_are_items_sorted(flush_items)) {
1519        struct avl_tree *tree = &flush_items->tree;
1520        // scan and remove entries in the avl-tree
1521        while (1) {
1522            struct avl_node *a;
1523            if ((a = avl_first(tree)) == NULL) {
1524                break;
1525            }
1526            item = _get_entry(a, struct wal_item, avl_flush);
1527            avl_remove(tree, &item->avl_flush);
1528
1529            // Grab the WAL key shard lock.
1530            shard_num = get_checksum((uint8_t*)item->header->key,
1531                                     item->header->keylen)
1532                                     % file->wal->num_shards;
1533            spin_lock(&file->wal->key_shards[shard_num].lock);
1534
1535            _wal_release_items(file, shard_num, item);
1536
1537            spin_unlock(&file->wal->key_shards[shard_num].lock);
1538        }
1539    } else {
1540        struct list *list_head = &flush_items->list;
1541        // scan and remove entries in the list
1542        while (1) {
1543            struct list_elem *a;
1544            if ((a = list_begin(list_head)) == NULL) {
1545                break;
1546            }
1547            item = _get_entry(a, struct wal_item, list_elem_flush);
1548            list_remove(list_head, &item->list_elem_flush);
1549
1550            // Grab the WAL key shard lock.
1551            shard_num = get_checksum((uint8_t*)item->header->key,
1552                                     item->header->keylen)
1553                                     % file->wal->num_shards;
1554            spin_lock(&file->wal->key_shards[shard_num].lock);
1555            _wal_release_items(file, shard_num, item);
1556            spin_unlock(&file->wal->key_shards[shard_num].lock);
1557        }
1558    }
1559
1560    return FDB_RESULT_SUCCESS;
1561}
1562
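// Flush a single collected item into the main index through 'flush_func',
// but only if it is still flagged flush-ready (i.e. it was not superseded
// and released after being collected).  Stale seqnums and per-KVS stat
// deltas produced by the flush are accumulated into the two AVL trees for
// batched post-processing.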
1563INLINE fdb_status _wal_do_flush(struct wal_item *item,
1564                                wal_flush_func *flush_func,
1565                                void *dbhandle,
1566                                struct avl_tree *stale_seqnum_list,
1567                                struct avl_tree *kvs_delta_stats)
1568{
1569    // check whether this item was updated after insertion into the tree
1570    if (item->flag & WAL_ITEM_FLUSH_READY) {
1571        fdb_status fs = flush_func(dbhandle, item, stale_seqnum_list, kvs_delta_stats);
1572        if (fs != FDB_RESULT_SUCCESS) {
1573            fdb_kvs_handle *handle = (fdb_kvs_handle *) dbhandle;
1574            fdb_log(&handle->log_callback, fs,
1575                    "Failed to flush WAL item (key '%s') into a database file '%s'",
1576                    (const char *) item->header->key, handle->file->filename);
1577            return fs;
1578        }
1579    }
1580    return FDB_RESULT_SUCCESS;
1581}
1582
1583struct fdb_root_info {
1584    bid_t orig_id_root;
1585    bid_t orig_seq_root;
1586    bid_t orig_stale_root;
1587};
1588
1589INLINE void _wal_backup_root_info(void *voidhandle,
1590                                  struct fdb_root_info *root_info)
1591{
1592    fdb_kvs_handle *handle = (fdb_kvs_handle*)voidhandle;
1593
1594    root_info->orig_id_root = handle->trie->root_bid;
1595    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1596        if (handle->kvs) {
1597            root_info->orig_seq_root = handle->seqtrie->root_bid;
1598        } else {
1599            root_info->orig_seq_root = handle->seqtree->root_bid;
1600        }
1601    }
1602    if (handle->staletree) {
1603        root_info->orig_stale_root = handle->staletree->root_bid;
1604    }
1605}
1606
1607INLINE void _wal_restore_root_info(void *voidhandle,
1608                                   struct fdb_root_info *root_info)
1609{
1610    fdb_kvs_handle *handle = (fdb_kvs_handle*)voidhandle;
1611
1612    handle->trie->root_bid = root_info->orig_id_root;
1613    if (handle->config.seqtree_opt == FDB_SEQTREE_USE) {
1614        if (handle->kvs) {
1615            handle->seqtrie->root_bid = root_info->orig_seq_root;
1616        } else {
1617            handle->seqtree->root_bid = root_info->orig_seq_root;
1618        }
1619    }
1620    if (handle->staletree) {
1621        handle->staletree->root_bid = root_info->orig_stale_root;
1622    }
1623}
1624
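// Core WAL flush.  Phase 1: under each key shard lock, walk every key's item
// list from the most recently committed item backwards, mark each eligible
// item WAL_ITEM_FLUSH_READY (freezing it against concurrent updates) and
// collect it; outside compaction only the most recent committed item per key
// is picked, and its old offset in the main index is looked up first.  Items
// are collected into an AVL tree sorted by _wal_flush_cmp when the file is
// not fully cached (keeping flush I/O near-sequential), or into a plain list
// otherwise.  Phase 2: flush the collected items through flush_func,
// restoring the saved index roots if any flush fails, then purge stale seq
// entries and apply the per-KVS stat deltas.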
1625static fdb_status _wal_flush(struct filemgr *file,
1626                             void *dbhandle,
1627                             wal_flush_func *flush_func,
1628                             wal_get_old_offset_func *get_old_offset,
1629                             wal_flush_seq_purge_func *seq_purge_func,
1630                             wal_flush_kvs_delta_stats_func *delta_stats_func,
1631                             union wal_flush_items *flush_items,
1632                             bool by_compactor)
1633{
1634    struct avl_tree *tree = &flush_items->tree;
1635    struct list *list_head = &flush_items->list;
1636    struct list_elem *ee, *ee_prev;
1637    struct avl_node *a, *a_next;
1638    struct wal_item *item;
1639    struct wal_item_header *header;
1640    struct fdb_root_info root_info;
1641    size_t i = 0;
1642    size_t num_shards = file->wal->num_shards;
1643    bool do_sort = !filemgr_is_fully_resident(file);
1644
1645    if (do_sort) {
1646        avl_init(tree, WAL_SORTED_FLUSH);
1647    } else {
1648        list_init(list_head);
1649    }
1650
1651    memset(&root_info, 0xff, sizeof(root_info));
1652    _wal_backup_root_info(dbhandle, &root_info);
1653
1654    for (; i < num_shards; ++i) {
1655        spin_lock(&file->wal->key_shards[i].lock);
1656        a = avl_first(&file->wal->key_shards[i]._map);
1657        while (a) {
1658            a_next = avl_next(a);
1659            header = _get_entry(a, struct wal_item_header, avl_key);
1660            ee = list_end(&header->items);
1661            while (ee) {
1662                ee_prev = list_prev(ee);
1663                item = _get_entry(ee, struct wal_item, list_elem);
1664                // committed but not flushed items
1665                if (!(item->flag & WAL_ITEM_COMMITTED)) {
1666                    break;
1667                }
1668                // Don't re-flush flushed items, try to free them up instead
1669                if (item->flag & WAL_ITEM_FLUSHED_OUT) {
1670                    _wal_release_items(file, i, item);
1671                    break; // most recent item is already reflected in trie
1672                }
1673                if (!(item->flag & WAL_ITEM_FLUSH_READY)) {
1674                    item->flag |= WAL_ITEM_FLUSH_READY;
1675                    // if WAL_ITEM_FLUSH_READY flag is set,
1676                    // this item becomes immutable, so that
1677                    // no other concurrent thread modifies it.
1678                    if (by_compactor) {
1679                        // During the first phase of compaction, we don't need
1680                        // to retrieve the old offsets of WAL items because they
1681                        // are all new insertions into new file's hbtrie index.
1682                        item->old_offset = 0;
1683                        if (do_sort) {
1684                            avl_insert(tree, &item->avl_flush, _wal_flush_cmp);
1685                        } else {
1686                            list_push_back(list_head, &item->list_elem_flush);
1687                        }
1688                    } else {
1689                        spin_unlock(&file->wal->key_shards[i].lock);
1690                        item->old_offset = get_old_offset(dbhandle, item);
1691                        spin_lock(&file->wal->key_shards[i].lock);
1692                        if (item->old_offset == 0 && // doc not in main index
1693                            item->action == WAL_ACT_REMOVE) {// insert & delete
1694                            item->old_offset = BLK_NOT_FOUND;
1695                        }
1696                        if (do_sort) {
1697                            avl_insert(tree, &item->avl_flush, _wal_flush_cmp);
1698                        } else {
1699                            list_push_back(list_head, &item->list_elem_flush);
1700                        }
1701                        break; // only pick one item per key
1702                    }
1703                }
1704                ee = ee_prev;
1705            }
1706            a = a_next;
1707        }
1708        spin_unlock(&file->wal->key_shards[i].lock);
1709    }
1710
1711    filemgr_set_io_inprog(file); // MB-16622: prevent parallel writes by flusher
1712    fdb_status fs = FDB_RESULT_SUCCESS;
1713    struct avl_tree stale_seqnum_list;
1714    struct avl_tree kvs_delta_stats;
1715    avl_init(&stale_seqnum_list, NULL);
1716    avl_init(&kvs_delta_stats, NULL);
1717
1718    // scan and flush entries in the avl-tree or list
1719    if (do_sort) {
1720        struct avl_node *a = avl_first(tree);
1721        while (a) {
1722            item = _get_entry(a, struct wal_item, avl_flush);
1723            a = avl_next(a);
1724            if (item->old_offset == BLK_NOT_FOUND && // doc not in main index
1725                item->action == WAL_ACT_REMOVE) {// insert & immediate delete
1726                continue; // need not flush this item into main index..
1727            } // item exists solely for in-memory snapshots
1728            fs = _wal_do_flush(item, flush_func, dbhandle,
1729                               &stale_seqnum_list, &kvs_delta_stats);
1730            if (fs != FDB_RESULT_SUCCESS) {
1731                _wal_restore_root_info(dbhandle, &root_info);
1732                break;
1733            }
1734        }
1735    } else {
1736        struct list_elem *a = list_begin(list_head);
1737        while (a) {
1738            item = _get_entry(a, struct wal_item, list_elem_flush);
1739            a = list_next(a);
1740            if (item->old_offset == BLK_NOT_FOUND && // doc not in main index
1741                item->action == WAL_ACT_REMOVE) {// insert & immediate delete
1742                continue; // need not flush this item into main index..
1743            } // item exists solely for in-memory snapshots
1744            fs = _wal_do_flush(item, flush_func, dbhandle,
1745                               &stale_seqnum_list, &kvs_delta_stats);
1746            if (fs != FDB_RESULT_SUCCESS) {
1747                _wal_restore_root_info(dbhandle, &root_info);
1748                break;
1749            }
1750        }
1751    }
1752
1753    // Remove all stale seq entries from the seq tree
1754    seq_purge_func(dbhandle, &stale_seqnum_list, &kvs_delta_stats);
1755    // Update each KV store stats after WAL flush
1756    delta_stats_func(file, &kvs_delta_stats);
1757
1758    filemgr_clear_io_inprog(file);
1759    return fs;
1760}
1761
1762fdb_status wal_flush(struct filemgr *file,
1763                     void *dbhandle,
1764                     wal_flush_func *flush_func,
1765                     wal_get_old_offset_func *get_old_offset,
1766                     wal_flush_seq_purge_func *seq_purge_func,
1767                     wal_flush_kvs_delta_stats_func *delta_stats_func,
1768                     union wal_flush_items *flush_items)
1769{
1770    return _wal_flush(file, dbhandle, flush_func, get_old_offset,
1771                      seq_purge_func, delta_stats_func,
1772                      flush_items, false);
1773}
1774
1775fdb_status wal_flush_by_compactor(struct filemgr *file,
1776                                  void *dbhandle,
1777                                  wal_flush_func *flush_func,
1778                                  wal_get_old_offset_func *get_old_offset,
1779                                  wal_flush_seq_purge_func *seq_purge_func,
1780                                  wal_flush_kvs_delta_stats_func *delta_stats_func,
1781                                  union wal_flush_items *flush_items)
1782{
1783    return _wal_flush(file, dbhandle, flush_func, get_old_offset,
1784                      seq_purge_func, delta_stats_func,
1785                      flush_items, true);
1786}
1787
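// Clone a snapshot handle by reference: an in-memory snapshot request, or a
// request for the exact seqnum the handle already covers, simply shares the
// existing handle with a bumped reference count; anything else fails.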
1788fdb_status wal_snapshot_clone(struct snap_handle *shandle_in,
1789                              struct snap_handle **shandle_out,
1790                              fdb_seqnum_t seqnum)
1791{
1792    if (seqnum == FDB_SNAPSHOT_INMEM ||
1793        shandle_in->seqnum == seqnum) {
1794        atomic_incr_uint16_t(&shandle_in->ref_cnt_kvs);
1795        *shandle_out = shandle_in;
1796        return FDB_RESULT_SUCCESS;
1797    }
1798    return FDB_RESULT_INVALID_ARGS;
1799}
1800
1801fdb_status snap_get_stat(struct snap_handle *shandle, struct kvs_stat *stat)
1802{
1803    *stat = shandle->stat;
1804    return FDB_RESULT_SUCCESS;
1805}
1806
1807fdb_status wal_dur_snapshot_open(fdb_seqnum_t seqnum,
1808                                 _fdb_key_cmp_info *key_cmp_info,
1809                                 struct filemgr *file, fdb_txn *txn,
1810                                 struct snap_handle **shandle)
1811{
1812    struct snap_handle *_shandle;
1813    fdb_kvs_id_t kv_id;
1814    fdb_assert(seqnum != FDB_SNAPSHOT_INMEM, seqnum, key_cmp_info->kvs);
1815    if (!key_cmp_info->kvs) {
1816        kv_id = 0;
1817    } else {
1818        kv_id = key_cmp_info->kvs->id;
1819    }
1820    _shandle = _wal_snapshot_create(kv_id, 0, 0);
1821    if (!_shandle) { // LCOV_EXCL_START
1822        return FDB_RESULT_ALLOC_FAIL;
1823    } // LCOV_EXCL_STOP
1824    spin_lock(&file->wal->lock);
1825    _wal_snapshot_init(_shandle, file, txn, seqnum, key_cmp_info);
1826    spin_unlock(&file->wal->lock);
1827    *shandle = _shandle;
1828    return FDB_RESULT_SUCCESS;
1829}
1830
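/**
 * Insert a document into the private key/seq indexes of a snapshot created
 * by wal_dur_snapshot_open().  The snapshot takes ownership of doc->key; an
 * existing entry for the same key is overwritten in place (and re-indexed in
 * the seq tree if its seqnum changed) so the snapshot never holds duplicates.
 */
#if 0 // Illustrative sketch only (not compiled): populating a durable
      // snapshot.  'cmp_info', 'seqnum', 'key', 'keylen' and 'doc_offset'
      // are assumed to be supplied by the caller's KV store handle.
struct snap_handle *shandle;
if (wal_dur_snapshot_open(seqnum, &cmp_info, file, txn, &shandle)
        == FDB_RESULT_SUCCESS) {
    fdb_doc doc;
    memset(&doc, 0, sizeof(doc));
    doc.keylen = keylen;
    doc.key = malloc(keylen); // snapshot takes ownership, freed on close
    memcpy(doc.key, key, keylen);
    doc.seqnum = seqnum;
    wal_snap_insert(shandle, &doc, doc_offset);
}
#endif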
1831fdb_status wal_snap_insert(struct snap_handle *shandle, fdb_doc *doc,
1832                           uint64_t offset)
1833{
1834    struct wal_item query;
1835    struct wal_item_header query_hdr;
1836    struct wal_item *item;
1837    struct avl_node *node;
1838    query_hdr.key = doc->key;
1839    query_hdr.keylen = doc->keylen;
1840    query.header = &query_hdr;
1841    node = avl_search(&shandle->key_tree, &query.avl_keysnap, _snap_cmp_bykey);
1842
1843    if (!node) {
1844        item = (struct wal_item *) calloc(1, sizeof(struct wal_item));
1845        item->header = (struct wal_item_header *) malloc(
1846                                  sizeof(struct wal_item_header));
1847        item->header->key = doc->key;
1848        item->header->keylen = doc->keylen;
1849        item->seqnum = doc->seqnum;
1850        if (doc->deleted) {
1851            if (!offset) { // deleted item can never be at offset 0
1852                item->action = WAL_ACT_REMOVE; // must be a purged item
1853            } else {
1854                item->action = WAL_ACT_LOGICAL_REMOVE;
1855            }
1856        } else {
1857            item->action = WAL_ACT_INSERT;
1858        }
1859        item->offset = offset;
1860        avl_insert(&shandle->key_tree, &item->avl_keysnap, _snap_cmp_bykey);
1861        avl_insert(&shandle->seq_tree, &item->avl_seq, _wal_cmp_byseq);
1862
1863        // Note: same logic in wal_commit
1864        shandle->stat.wal_ndocs++;
1865        if (doc->deleted) {
1866            shandle->stat.wal_ndeletes++;
1867        }
1868        item->shandle = shandle;
1869    } else {
1870        // replace existing node with new values so there are no duplicates
1871        item = _get_entry(node, struct wal_item, avl_keysnap);
1872        free(item->header->key);
1873        item->header->key = doc->key;
1874        item->header->keylen = doc->keylen;
1875        if (item->seqnum != doc->seqnum) { // Re-index duplicate into seqtree
1876            item->seqnum = doc->seqnum;
1877            avl_remove(&shandle->seq_tree, &item->avl_seq);
1878            avl_insert(&shandle->seq_tree, &item->avl_seq, _wal_cmp_byseq);
1879        }
1880
1881        // Note: same logic in wal_commit
1882        if (item->action == WAL_ACT_INSERT &&
1883            doc->deleted) {
1884            shandle->stat.wal_ndeletes++;
1885        } else if (item->action == WAL_ACT_LOGICAL_REMOVE &&
1886                   !doc->deleted) {
1887            shandle->stat.wal_ndeletes--;
1888        }
1889
1890        item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
1891        item->offset = offset;
1892    }
1893    return FDB_RESULT_SUCCESS;
1894}
1895
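// Copy the shared WAL's contents into an independent snapshot: for each key
// in every shard, take the single latest item visible to the snapshot
// (committed, or belonging to the snapshot's own transaction) and hand a
// duplicated key buffer to wal_snap_insert().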
1896fdb_status wal_copyto_snapshot(struct filemgr *file,
1897                               struct snap_handle *shandle,
1898                               bool is_multi_kv)
1899{
1900    struct list_elem *ee;
1901    struct avl_node *a;
1902    struct wal_item *item;
1903    struct wal_item_header *header;
1904    fdb_kvs_id_t kv_id = 0;
1905    fdb_doc doc;
1906    size_t i = 0;
1907    size_t num_shards = file->wal->num_shards;
1908
1909    shandle->stat.wal_ndocs = 0; // WAL copy will populate
1910    shandle->stat.wal_ndeletes = 0; // these 2 stats
1911
1912    // Scan all WAL shards and copy each key's latest visible item
1913    for (; i < num_shards; ++i) {
1914        spin_lock(&file->wal->key_shards[i].lock);
1915        a = avl_first(&file->wal->key_shards[i]._map);
1916        while (a) {
1917            header = _get_entry(a, struct wal_item_header, avl_key);
1918            if (is_multi_kv) {
1919                buf2kvid(header->chunksize, header->key, &kv_id);
1920                if (kv_id != shandle->id) {
1921                    a = avl_next(a);
1922                    continue;
1923                }
1924            }
1925            ee = list_begin(&header->items);
1926            while (ee) {
1927                uint64_t offset;
1928                item = _get_entry(ee, struct wal_item, list_elem);
1929                // Skip any uncommitted item, if not part of either global or
1930                // the current transaction
1931                if (!(item->flag & WAL_ITEM_COMMITTED) &&
1932                        item->txn != &file->global_txn &&
1933                        item->txn != shandle->snap_txn) {
1934                    ee = list_next(ee);
1935                    continue;
1936                }
1937                // Skip the partially committed items too.
1938                if (_wal_item_partially_committed(shandle->global_txn,
1939                                                  &shandle->active_txn_list,
1940                                                  shandle->snap_txn, item)) {
1941                    ee = list_next(ee);
1942                    continue;
1943                }
1944
1945                doc.keylen = item->header->keylen;
1946                doc.key = malloc(doc.keylen); // (freed in fdb_snapshot_close)
1947                memcpy(doc.key, item->header->key, doc.keylen);
1948                doc.seqnum = item->seqnum;
1949                doc.deleted = (item->action == WAL_ACT_LOGICAL_REMOVE ||
1950                               item->action == WAL_ACT_REMOVE);
1951                if (item->action == WAL_ACT_REMOVE) {
1952                    offset = 0;
1953                } else {
1954                    offset = item->offset;
1955                }
1956
1957                wal_snap_insert(shandle, &doc, offset);
1958                break; // We just require a single latest copy in the snapshot
1959            }
1960            a = avl_next(a);
1961        }
1962        spin_unlock(&file->wal->key_shards[i].lock);
1963    }
1964    return FDB_RESULT_SUCCESS;
1965}
1966
1967static
1968fdb_status _wal_snap_find(struct snap_handle *shandle, fdb_doc *doc,
1969                          uint64_t *offset)
1970{
1971    struct wal_item query, *item;
1972    struct avl_node *node;
1973    if (doc->seqnum == SEQNUM_NOT_USED || (doc->key && doc->keylen > 0)) {
1974        if (!shandle->key_tree.root) {
1975            return FDB_RESULT_KEY_NOT_FOUND;
1976        }
1977        struct wal_item_header query_hdr;
1978        query.header = &query_hdr;
1979        // search by key
1980        query_hdr.key = doc->key;
1981        query_hdr.keylen = doc->keylen;
1982        node = avl_search(&shandle->key_tree, &query.avl_keysnap,
1983                          _snap_cmp_bykey);
1984        if (!node) {
1985            return FDB_RESULT_KEY_NOT_FOUND;
1986        } else {
1987            item = _get_entry(node, struct wal_item, avl_keysnap);
1988            *offset = item->offset;
1989            if (item->action == WAL_ACT_INSERT) {
1990                doc->deleted = false;
1991            } else {
1992                doc->deleted = true;
1993                if (item->action == WAL_ACT_REMOVE) {
1994                    *offset = BLK_NOT_FOUND;
1995                }
1996            }
1997            doc->seqnum = item->seqnum;
1998            return FDB_RESULT_SUCCESS;
1999        }
2000    } else if (shandle->seq_tree.root) {
2001        // search by sequence number
2002        query.seqnum = doc->seqnum;
2003        node = avl_search(&shandle->seq_tree, &query.avl_seq, _wal_cmp_byseq);
2004        if (!node) {
2005            return FDB_RESULT_KEY_NOT_FOUND;
2006        } else {
2007            item = _get_entry(node, struct wal_item, avl_seq);
2008            *offset = item->offset;
2009            if (item->action == WAL_ACT_INSERT) {
2010                doc->deleted = false;
2011            } else {
2012                doc->deleted = true;
2013                if (item->action == WAL_ACT_REMOVE) {
2014                    *offset = BLK_NOT_FOUND;
2015                }
2016            }
2017            return FDB_RESULT_SUCCESS;
2018        }
2019    }
2020    return FDB_RESULT_KEY_NOT_FOUND;
2021}
2022
2023fdb_status wal_snapshot_close(struct snap_handle *shandle,
2024                              struct filemgr *file)
2025{
2026    if (!atomic_decr_uint16_t(&shandle->ref_cnt_kvs)) {
2027        struct avl_node *a, *nexta;
2028        if (!shandle->is_persisted_snapshot &&
2029            shandle->snap_tag_idx) { // the KVS did have items in WAL..
2030            return FDB_RESULT_SUCCESS;
2031        }
2032        for (a = avl_first(&shandle->key_tree);
2033             a; a = nexta) {
2034            struct wal_item *item = _get_entry(a, struct wal_item, avl_keysnap);
2035            nexta = avl_next(a);
2036            avl_remove(&shandle->key_tree, &item->avl_keysnap);
2037            free(item->header->key);
2038            free(item->header);
2039            free(item);
2040        }
2041        for (struct list_elem *e = list_begin(&shandle->active_txn_list); e;) {
2042            struct list_elem *e_next = list_next(e);
2043            struct wal_txn_wrapper *active_txn = _get_entry(e,
2044                                                   struct wal_txn_wrapper, le);
2045            free(active_txn);
2046            e = e_next;
2047        }
2048        free(shandle);
2049    }
2050    return FDB_RESULT_SUCCESS;
2051}
2052
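/**
 * Create an iterator over the WAL portion of a snapshot, either in key order
 * (by_key == true) or in seqnum order.  Persisted snapshots are iterated
 * directly over their private AVL trees; live snapshots are iterated through
 * a merge-sort across all WAL shards using one cursor per shard.
 */
#if 0 // Illustrative sketch only (not compiled): a full key-order scan of a
      // snapshot's WAL items; 'file' and 'shandle' are assumed valid, and
      // wal_itr_close() is assumed to be the matching teardown call.
struct wal_iterator *wit;
if (wal_itr_init(file, shandle, true, &wit) == FDB_RESULT_SUCCESS) {
    for (struct wal_item *it = wal_itr_first(wit); it; it = wal_itr_next(wit)) {
        // consume it->header->key / it->seqnum / it->offset here
    }
    wal_itr_close(wit);
}
#endif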
2053fdb_status wal_itr_init(struct filemgr *file,
2054                        struct snap_handle *shandle,
2055                        bool by_key,
2056                        struct wal_iterator **wal_iterator)
2057{
2058    struct wal_iterator *wal_itr = (struct wal_iterator *)
2059                      malloc(sizeof(struct wal_iterator));
2060    if (!wal_itr) { // LCOV_EXCL_START
2061        return FDB_RESULT_ALLOC_FAIL;
2062    } // LCOV_EXCL_STOP
2063
2064    // If by_key is set, the iterator runs over the snapshot's key range
2065    if (by_key) {
2066        wal_itr->map_shards = file->wal->key_shards;
2067        avl_init(&wal_itr->merge_tree, &shandle->cmp_info);
2068        wal_itr->by_key = true;
2069    } else {
2070        // Otherwise WAL iteration is requested over the sequence range
2071        if (file->config->seqtree_opt != FDB_SEQTREE_USE) {
            free(wal_itr); // avoid leaking the iterator on an invalid config
2072            return FDB_RESULT_INVALID_CONFIG;
2073        }
2074        wal_itr->map_shards = file->wal->seq_shards;
2075        avl_init(&wal_itr->merge_tree, NULL);
2076        wal_itr->by_key = false;
2077    }
2078
2079    if (shandle->cmp_info.kvs) {
2080        wal_itr->multi_kvs = true;
2081    } else {
2082        wal_itr->multi_kvs = false;
2083    }
2084    wal_itr->cursor_pos = NULL;
2085    wal_itr->item_prev = NULL;
2086
2087    wal_itr->num_shards = file->wal->num_shards;
2088    if (!shandle->is_persisted_snapshot) {
2089        wal_itr->cursors = (struct wal_cursor *)calloc(wal_itr->num_shards,
2090                           sizeof(struct wal_cursor));
2091    } else {
2092        wal_itr->cursors = NULL;
2093    }
2094    wal_itr->shandle = shandle;
2095    wal_itr->_wal = file->wal;
2096    wal_itr->direction = FDB_ITR_DIR_NONE;
2097    *wal_iterator = wal_itr;
2098    return FDB_RESULT_SUCCESS;
2099}
2100
2101INLINE bool _wal_is_my_kvs(struct wal_item_header *header,
2102                           struct wal_iterator *wal_itr)
2103{
2104    if (wal_itr->multi_kvs) {
2105        fdb_kvs_id_t kv_id;
2106        buf2kvid(header->chunksize, header->key, &kv_id);
2107        if (kv_id != wal_itr->shandle->id) {
2108            return false;
2109        }
2110    }
2111    return true;
2112}
2113
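// The _wal_itr_search_* functions below implement a stateless k-way merge:
// each repositions one cursor per WAL shard (skipping items that are not
// visible to the snapshot) and rebuilds 'merge_tree', an AVL tree of the
// per-shard cursors ordered by key or by seqnum.  The smallest (or largest)
// cursor in that tree then becomes the iterator's current position.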
2114static
2115struct wal_item *_wal_itr_search_greater_bykey(struct wal_iterator *wal_itr,
2116                                               struct wal_item *query)
2117{
2118    struct avl_node *a = NULL;
2119    struct wal_cursor *cursor;
2120
2121    // search is a stateless operation, so re-initialize shard's merge-sort tree
2122    avl_init(&wal_itr->merge_tree, (void*)&wal_itr->shandle->cmp_info);
2123    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
2124        struct wal_item *item = NULL;
2125        spin_lock(&wal_itr->map_shards[i].lock);
2126        if (query) {
2127            avl_set_aux(&wal_itr->map_shards[i]._map,
2128                        (void*)&wal_itr->shandle->cmp_info);
2129            a = avl_search_greater(&wal_itr->map_shards[i]._map,
2130                                   &query->header->avl_key,
2131                                   _wal_cmp_bykey);
2132        } else {
2133            a = avl_first(&wal_itr->map_shards[i]._map);
2134        }
2135        if (a) {
2136            do {
2137                struct wal_item_header *header;
2138                header = _get_entry(a, struct wal_item_header, avl_key);
2139                if (!_wal_is_my_kvs(header, wal_itr)) {
2140                    item = NULL;
2141                    break;
2142                }
2143                item = _wal_get_snap_item(header, wal_itr->shandle);
2144            } while (!item && (a = avl_next(a)));
2145        }
2146        spin_unlock(&wal_itr->map_shards[i].lock);
2147        if (item) {
2148            wal_itr->cursors[i].item = item;
2149            // re-insert into the merge-sorted AVL tree across all shards
2150            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
2151                       _merge_cmp_bykey);
2152        } else {
2153            wal_itr->cursors[i].item = NULL;
2154        }
2155    } // done for all WAL shards
2156
2157    wal_itr->cursor_pos = avl_first(&wal_itr->merge_tree);
2158
2159    if (!wal_itr->cursor_pos) {
2160        wal_itr->item_prev = NULL;
2161        return NULL;
2162    }
2163    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2164    // save the current cursor position for reverse iteration
2165    wal_itr->item_prev = cursor->item;
2166    return cursor->item;
2167}
2168
2169static
2170struct wal_item *_wal_itr_search_greater_byseq(struct wal_iterator *wal_itr,
2171                                               struct wal_item *query)
2172{
2173    struct avl_node *a = NULL;
2174    struct wal_cursor *cursor;
2175
2176    // search is a stateless operation, so re-initialize shard's merge-sort tree
2177    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
2178    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
2179        struct wal_item *item = NULL, *_item;
2180        if (query) {
2181            spin_lock(&wal_itr->map_shards[i].lock);
2182            a = avl_search_greater(&wal_itr->map_shards[i]._map, &query->avl_seq,
2183                                   _wal_cmp_byseq);
2184        } else {
2185            a = avl_first(&wal_itr->map_shards[i]._map);
2186        }
2187        while (a) {
2188            item = _get_entry(a, struct wal_item, avl_seq);
2189            if (!_wal_is_my_kvs(item->header, wal_itr)) {
2190                item = NULL;
2191                break;
2192            }
2193            _item = _wal_get_snap_item(item->header, wal_itr->shandle);
2194            if (item == _item) {
2195                break;
2196            } else {
2197                item = NULL;
2198            }
2199            a = avl_next(a);
2200        }
2201        spin_unlock(&wal_itr->map_shards[i].lock);
2202        if (item) {
2203            wal_itr->cursors[i].item = item;
2204            // re-insert into the merge-sorted AVL tree across all shards
2205            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
2206                       _merge_cmp_byseq);
2207        } else {
2208            wal_itr->cursors[i].item = NULL;
2209        }
2210    } // done for all WAL shards
2211
2212    wal_itr->cursor_pos = avl_first(&wal_itr->merge_tree);
2213    if (!wal_itr->cursor_pos) {
2214        return NULL;
2215    }
2216    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2217    // save the current cursor position for reverse iteration
2218    wal_itr->item_prev = cursor->item;
2219    return cursor->item;
2220}
2221
2222struct wal_item* wal_itr_search_greater(struct wal_iterator *wal_itr,
2223                                        struct wal_item *query)
2224{
2225    if (wal_itr->shandle->is_persisted_snapshot) {
2226        struct avl_node *a;
2227        if (wal_itr->by_key) {
2228            a = avl_search_greater(&wal_itr->shandle->key_tree,
2229                                   &query->avl_keysnap,
2230                                   _snap_cmp_bykey);
2231            wal_itr->cursor_pos = a;
2232            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2233        } else {
2234            a = avl_search_greater(&wal_itr->shandle->seq_tree,
2235                                   &query->avl_seq,
2236                                   _wal_cmp_byseq);
2237            wal_itr->cursor_pos = a;
2238            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2239        }
2240    }
2241    if (wal_itr->shandle->snap_tag_idx) {
2242        wal_itr->direction = FDB_ITR_FORWARD;
2243        if (wal_itr->by_key) {
2244            return _wal_itr_search_greater_bykey(wal_itr, query);
2245        } else {
2246            return _wal_itr_search_greater_byseq(wal_itr, query);
2247        }
2248    } // else no items in WAL in snapshot..
2249    return NULL;
2250}
2251
2252static
2253struct wal_item* _wal_itr_search_smaller_bykey(struct wal_iterator *wal_itr,
2254                                               struct wal_item *query)
2255{
2256    struct avl_node *a = NULL;
2257    struct wal_cursor *cursor;
2258
2259    // search is a stateless operation, so re-initialize shard's merge-sort tree
2260    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
2261    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
2262        struct wal_item *item = NULL;
2263        spin_lock(&wal_itr->map_shards[i].lock);
2264        if (query) {
2265            avl_set_aux(&wal_itr->map_shards[i]._map,
2266                        (void*)&wal_itr->shandle->cmp_info);
2267            a = avl_search_smaller(&wal_itr->map_shards[i]._map,
2268                                   &query->header->avl_key,
2269                                   _wal_cmp_bykey);
2270        } else { // no item implies search to last key
2271            a = avl_last(&wal_itr->map_shards[i]._map);
2272        }
2273        if (a) {
2274            do {
2275                struct wal_item_header *header;
2276                header = _get_entry(a, struct wal_item_header, avl_key);
2277                if (!_wal_is_my_kvs(header, wal_itr)) {
2278                    item = NULL;
2279                    break;
2280                }
2281                item = _wal_get_snap_item(header, wal_itr->shandle);
2282            } while (!item && (a = avl_prev(a)));
2283        }
2284        spin_unlock(&wal_itr->map_shards[i].lock);
2285        if (item) {
2286            wal_itr->cursors[i].item = item;
2287            // re-insert into the merge-sorted AVL tree across all shards
2288            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
2289                       _merge_cmp_bykey);
2290        } else {
2291            wal_itr->cursors[i].item = NULL;
2292        }
2293    } // done for all WAL shards
2294
2295    wal_itr->cursor_pos = avl_last(&wal_itr->merge_tree);
2296    if (!wal_itr->cursor_pos) {
2297        wal_itr->item_prev = NULL;
2298        return NULL;
2299    }
2300
2301    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2302    // save the current cursor position for reverse iteration
2303    wal_itr->item_prev = cursor->item;
2304    return cursor->item;
2305}
2306
2307static
2308struct wal_item *_wal_itr_search_smaller_byseq(struct wal_iterator *wal_itr,
2309                                               struct wal_item *query)
2310{
2311    struct avl_node *a = NULL;
2312    struct wal_cursor *cursor;
2313
2314    // search is a stateless operation, so re-initialize shard's merge-sort tree
2315    avl_init(&wal_itr->merge_tree, &wal_itr->shandle->cmp_info);
2316    for (size_t i = 0; i < wal_itr->num_shards; ++i) {
2317        struct wal_item *item = NULL, *_item;
2318        spin_lock(&wal_itr->map_shards[i].lock);
2319        if (query) {
2320            a = avl_search_smaller(&wal_itr->map_shards[i]._map,
2321                                   &query->avl_seq, _wal_cmp_byseq);
2322        } else {
2323            a = avl_last(&wal_itr->map_shards[i]._map);
2324        }
2325        while (a) {
2326            item = _get_entry(a, struct wal_item, avl_seq);
2327
2328            if (!_wal_is_my_kvs(item->header, wal_itr)) {
2329                item = NULL;
2330                break;
2331            }
2332            _item = _wal_get_snap_item(item->header, wal_itr->shandle);
2333            if (item == _item) {
2334                break;
2335            } else {
2336                item = NULL;
2337            }
2338            a = avl_prev(a);
2339        }
2340        spin_unlock(&wal_itr->map_shards[i].lock);
2341        if (item) {
2342            wal_itr->cursors[i].item = item;
2343            // re-insert into the merge-sorted AVL tree across all shards
2344            avl_insert(&wal_itr->merge_tree, &wal_itr->cursors[i].avl_merge,
2345                       _merge_cmp_byseq);
2346        } else {
2347            wal_itr->cursors[i].item = NULL;
2348        }
2349    } // done for all WAL shards
2350
2351    wal_itr->cursor_pos = avl_last(&wal_itr->merge_tree);
2352    if (!wal_itr->cursor_pos) {
2353        wal_itr->item_prev = NULL;
2354        return NULL;
2355    }
2356    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2357    // save the current cursor position for reverse iteration
2358    wal_itr->item_prev = cursor->item;
2359    return cursor->item;
2360}
2361
2362struct wal_item* wal_itr_search_smaller(struct wal_iterator *wal_itr,
2363                                        struct wal_item *query)
2364{
2365    if (wal_itr->shandle->is_persisted_snapshot) {
2366        struct avl_node *a;
2367        if (wal_itr->by_key) {
2368            a = avl_search_smaller(&wal_itr->shandle->key_tree,
2369                                   &query->avl_keysnap,
2370                                   _snap_cmp_bykey);
2371            wal_itr->cursor_pos = a;
2372            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2373        } else {
2374            a = avl_search_smaller(&wal_itr->shandle->seq_tree,
2375                                   &query->avl_seq,
2376                                   _wal_cmp_byseq);
2377            wal_itr->cursor_pos = a;
2378            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2379        }
2380    }
2381
2382    if (wal_itr->shandle->snap_tag_idx) {
2383        wal_itr->direction = FDB_ITR_REVERSE;
2384        if (!wal_itr->by_key) {
2385            return _wal_itr_search_smaller_byseq(wal_itr, query);
2386        } else {
2387            return _wal_itr_search_smaller_bykey(wal_itr, query);
2388        }
2389    } // else no items in WAL for this snapshot..
2390    return NULL;
2391}
2392
2393// The following iterator movements are stateful: they advance only the
// cursor of the current shard and then re-seek within the merge tree.
2394static
2395struct wal_item *_wal_itr_next_bykey(struct wal_iterator *wal_itr)
2396{
2397    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
2398                                           struct wal_cursor, avl_merge);
2399    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
2400    struct wal_item_header *header = cur_item.item->header;
2401    size_t cur_shard_num = cursor - wal_itr->cursors;
2402    struct wal_item *item = NULL;
2403
2404    wal_itr->item_prev = cursor->item; // save for direction change
2405
2406    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
2407    avl_set_aux(&wal_itr->map_shards[cur_shard_num]._map,
2408            (void*)&wal_itr->shandle->cmp_info);
2409    struct avl_node *a = avl_next(&header->avl_key);
2410    if (a) {
2411        do {
2412            header = _get_entry(a, struct wal_item_header, avl_key);
2413            if (!_wal_is_my_kvs(header, wal_itr)) {
2414                item = NULL;
2415                break;
2416            }
2417            item = _wal_get_snap_item(header, wal_itr->shandle);
2418        } while (!item && (a = avl_next(a)));
2419    }
2420    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
2421    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
2422    if (item) {
2423        // re-insert this merge sorted item back into merge-sort tree..
2424        wal_itr->cursors[cur_shard_num].item = item;
2425        avl_insert(&wal_itr->merge_tree,
2426                   &wal_itr->cursors[cur_shard_num].avl_merge,
2427                   _merge_cmp_bykey);
2428    } else {
2429        wal_itr->cursors[cur_shard_num].item = NULL;
2430    }
2431
2432    wal_itr->cursor_pos = avl_search_greater(&wal_itr->merge_tree,
2433                                             &cur_item.avl_merge,
2434                                             _merge_cmp_bykey);
2435    if (!wal_itr->cursor_pos) {
2436        return NULL;
2437    }
2438    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2439    return cursor->item;
2440}
2441
2442static
2443struct wal_item *_wal_itr_next_byseq(struct wal_iterator *wal_itr)
2444{
2445    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
2446                                           struct wal_cursor, avl_merge);
2447    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
2448    size_t cur_shard_num = cursor - wal_itr->cursors;
2449    struct wal_item *item = NULL, *_item;
2450
2451    wal_itr->item_prev = cursor->item; // save for direction change
2452
2453    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
2454    struct avl_node *a = avl_next(&cur_item.item->avl_seq);
2455    while (a) {
2456        item = _get_entry(a, struct wal_item, avl_seq);
2457        if (!_wal_is_my_kvs(item->header, wal_itr)) {
2458            item = NULL;
2459            break;
2460        }
2461        _item = _wal_get_snap_item(item->header, wal_itr->shandle);
2462        if (item == _item) {
2463            break;
2464        } else {
2465            item = NULL;
2466        }
2467        a = avl_next(a);
2468    }
2469    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
2470    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
2471    if (item) {
2472        wal_itr->cursors[cur_shard_num].item = item;
2473        // re-insert this merge sorted item back into merge-sort tree..
2474        avl_insert(&wal_itr->merge_tree,
2475                   &wal_itr->cursors[cur_shard_num].avl_merge,
2476                   _merge_cmp_byseq);
2477    } else {
2478        wal_itr->cursors[cur_shard_num].item = NULL;
2479    }
2480
2481    wal_itr->cursor_pos = avl_search_greater(&wal_itr->merge_tree,
2482                                             &cur_item.avl_merge,
2483                                             _merge_cmp_byseq);
2484    if (!wal_itr->cursor_pos) {
2485        return NULL;
2486    }
2487    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2488    return cursor->item;
2489}
2490
2491struct wal_item* wal_itr_next(struct wal_iterator *wal_itr)
2492{
2493    struct wal_item *result = NULL;
2494    if (wal_itr->shandle->is_persisted_snapshot) {
2495        wal_itr->cursor_pos = avl_next(wal_itr->cursor_pos);
2496        if (wal_itr->by_key) {
2497            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2498                                                struct wal_item, avl_keysnap) : NULL;
2499        } else {
2500            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2501                                                struct wal_item, avl_seq) : NULL;
2502        }
2503    }
2504
2505    if (!wal_itr->shandle->snap_tag_idx) { // no items in WAL in snapshot..
2506        return NULL;
2507    }
2508    if (wal_itr->direction == FDB_ITR_FORWARD) {
2509        if (!wal_itr->cursor_pos) {
2510            return result;
2511        }
2512        if (wal_itr->by_key) {
2513            result = _wal_itr_next_bykey(wal_itr);
2514        } else {
2515            result = _wal_itr_next_byseq(wal_itr);
2516        }
2517    } else { // change of direction involves searching across all shards..
2518        if (!wal_itr->item_prev) {
2519            return result;
2520        }
2521        if (wal_itr->by_key) {
2522            result = _wal_itr_search_greater_bykey(wal_itr,
2523                                                   wal_itr->item_prev);
2524        } else {
2525            result = _wal_itr_search_greater_byseq(wal_itr,
2526                                                   wal_itr->item_prev);
2527        }
2528    }
2529    wal_itr->direction = FDB_ITR_FORWARD;
2530    return result;
2531}
2532
2533static
2534struct wal_item *_wal_itr_prev_bykey(struct wal_iterator *wal_itr)
2535{
2536
2537    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
2538                                           struct wal_cursor, avl_merge);
2539    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
2540    struct wal_item_header *header = cur_item.item->header;
2541    size_t cur_shard_num = cursor - wal_itr->cursors;
2542    struct wal_item *item = NULL;
2543
2544    wal_itr->item_prev = cursor->item; // save for direction change
2545
2546    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
2547    avl_set_aux(&wal_itr->map_shards[cur_shard_num]._map,
2548                (void*)&wal_itr->shandle->cmp_info);
2549    struct avl_node *a = avl_prev(&header->avl_key);
2550    if (a) {
2551        do {
2552            header = _get_entry(a, struct wal_item_header, avl_key);
2553            if (!_wal_is_my_kvs(header, wal_itr)) {
2554                item = NULL;
2555                break;
2556            }
2557            item = _wal_get_snap_item(header, wal_itr->shandle);
2558        } while (!item && (a = avl_prev(a)));
2559    }
2560    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
2561    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
2562    if (item) {
2563        // re-insert this merge sorted item back into merge-sort tree..
2564        wal_itr->cursors[cur_shard_num].item = item;
2565        avl_insert(&wal_itr->merge_tree,
2566                   &wal_itr->cursors[cur_shard_num].avl_merge,
2567                   _merge_cmp_bykey);
2568    } else {
2569        wal_itr->cursors[cur_shard_num].item = NULL;
2570    }
2571
2572    wal_itr->cursor_pos = avl_search_smaller(&wal_itr->merge_tree,
2573                                             &cur_item.avl_merge,
2574                                             _merge_cmp_bykey);
2575    if (!wal_itr->cursor_pos) {
2576        return NULL;
2577    }
2578    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2579    return cursor->item;
2580}
2581
2582static
2583struct wal_item *_wal_itr_prev_byseq(struct wal_iterator *wal_itr)
2584{
2585    struct wal_cursor *cursor = _get_entry(wal_itr->cursor_pos,
2586                                           struct wal_cursor, avl_merge);
2587    struct wal_cursor cur_item = *cursor; // save cur item for merge sort
2588    size_t cur_shard_num = cursor - wal_itr->cursors;
2589    struct wal_item *item = NULL, *_item;
2590
2591    wal_itr->item_prev = cursor->item; // save for direction change
2592
2593    spin_lock(&wal_itr->map_shards[cur_shard_num].lock);
2594    struct avl_node *a = avl_prev(&cur_item.item->avl_seq);
2595    while (a) {
2596        item = _get_entry(a, struct wal_item, avl_seq);
2597        if (!_wal_is_my_kvs(item->header, wal_itr)) {
2598            item = NULL;
2599            break;
2600        }
2601        _item = _wal_get_snap_item(item->header, wal_itr->shandle);
2602        if (item == _item) {
2603            break;
2604        } else {
2605            item = NULL;
2606        }
2607        a = avl_prev(a);
2608    }
2609    spin_unlock(&wal_itr->map_shards[cur_shard_num].lock);
2610    avl_remove(&wal_itr->merge_tree, &cursor->avl_merge);
2611    if (item) {
2612        wal_itr->cursors[cur_shard_num].item = item;
2613        // re-insert this merge sorted item back into merge-sort tree..
2614        avl_insert(&wal_itr->merge_tree,
2615                &wal_itr->cursors[cur_shard_num].avl_merge,
2616                _merge_cmp_byseq);
2617    } else {
2618        wal_itr->cursors[cur_shard_num].item = NULL;
2619    }
2620
2621    wal_itr->cursor_pos = avl_search_smaller(&wal_itr->merge_tree,
2622            &cur_item.avl_merge,
2623            _merge_cmp_byseq);
2624    if (!wal_itr->cursor_pos) {
2625        return NULL;
2626    }
2627    cursor = _get_entry(wal_itr->cursor_pos, struct wal_cursor, avl_merge);
2628    return cursor->item;
2629}
2630
2631struct wal_item* wal_itr_prev(struct wal_iterator *wal_itr)
2632{
2633    struct wal_item *result = NULL;
2634    if (wal_itr->shandle->is_persisted_snapshot) {
2635        wal_itr->cursor_pos = avl_prev(wal_itr->cursor_pos);
2636        if (wal_itr->by_key) {
2637            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2638                    struct wal_item, avl_keysnap) : NULL;
2639        } else {
2640            return wal_itr->cursor_pos ? _get_entry(wal_itr->cursor_pos,
2641                    struct wal_item, avl_seq) : NULL;
2642        }
2643    }
2644
2645    if (!wal_itr->shandle->snap_tag_idx) { // no items in WAL in snapshot..
2646        return NULL;
2647    }
2648    if (wal_itr->direction == FDB_ITR_REVERSE) {
2649        if (!wal_itr->cursor_pos) {
2650            return result;
2651        }
2652        if (wal_itr->by_key) {
2653            result = _wal_itr_prev_bykey(wal_itr);
2654        } else {
2655            result = _wal_itr_prev_byseq(wal_itr);
2656        }
2657    } else { // change of direction involves searching across all shards..
2658        if (!wal_itr->item_prev) {
2659            return result;
2660        }
2661        if (wal_itr->by_key) {
2662            result = _wal_itr_search_smaller_bykey(wal_itr,
2663                    wal_itr->item_prev);
2664        } else {
2665            result = _wal_itr_search_smaller_byseq(wal_itr,
2666                    wal_itr->item_prev);
2667        }
2668    }
2669    wal_itr->direction = FDB_ITR_REVERSE;
2670    return result;
2671}
2672
2673/**TODO:
2674 * Sequence iteration can currently be O(n^2) when there is a huge number of
2675 * updates. This complexity needs to be addressed via the functions below.
2676 */
2677fdb_status wal_itr_set_first(struct wal_iterator *wal_itr,
2678        struct wal_item *elem)
2679{
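    // placeholder only; intentionally a no-op until the TODO above is resolved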
2680    return FDB_RESULT_SUCCESS;
2681}
2682
2683fdb_status wal_itr_set_last(struct wal_iterator *wal_itr,
2684        struct wal_item *elem)
2685{
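    // placeholder only; intentionally a no-op until the TODO above is resolved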
2686    return FDB_RESULT_SUCCESS;
2687}
2688
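// Position at the first key of this KV store: in multi-KVS mode a dummy key
// holding just the KVS ID serves as an inclusive lower bound for the search.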
2689struct wal_item *_wal_itr_first_bykey(struct wal_iterator *wal_itr)
2690{
2691    struct wal_item_header dummy_key;
2692    struct wal_item dummy_item;
2693    fdb_kvs_id_t kv_id = wal_itr->shandle->id;
2694    dummy_key.key = &kv_id;
2695    dummy_key.keylen = sizeof(fdb_kvs_id_t);
2696    dummy_item.header = &dummy_key;
2697    if (wal_itr->multi_kvs) {
2698        return _wal_itr_search_greater_bykey(wal_itr, &dummy_item);
2699    } // else we are in single kv instance mode
2700    return _wal_itr_search_greater_bykey(wal_itr, NULL);
2701}
2702
2703struct wal_item* _wal_itr_first_byseq(struct wal_iterator *wal_itr)
2704{
2705    return _wal_itr_search_greater_byseq(wal_itr, NULL);
2706}
2707
2708struct wal_item* wal_itr_first(struct wal_iterator *wal_itr) {
2709    if (wal_itr->shandle->is_persisted_snapshot) {
2710        struct avl_node *a;
2711        if (wal_itr->by_key) {
2712            a = avl_first(&wal_itr->shandle->key_tree);
2713            wal_itr->cursor_pos = a;
2714            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2715        } else {
2716            a = avl_first(&wal_itr->shandle->seq_tree);
2717            wal_itr->cursor_pos = a;
2718            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2719        }
2720    }
2721
2722    if (wal_itr->shandle->snap_tag_idx) {
2723        wal_itr->direction = FDB_ITR_FORWARD;
2724        if (wal_itr->by_key) {
2725            return _wal_itr_first_bykey(wal_itr);
2726        } else {
2727            return _wal_itr_first_byseq(wal_itr);
2728        }
2729    } // else no items in WAL for this snapshot
2730    return NULL;
2731}
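/*
 * Example (editor's sketch, not part of the original source): a forward scan
 * starts at wal_itr_first() and advances with wal_itr_next(), assumed to be
 * the forward counterpart of wal_itr_prev() above; a NULL return ends the
 * scan. 'wal_itr' is a hypothetical iterator opened by the caller.
 *
 *   for (struct wal_item *it = wal_itr_first(wal_itr); it;
 *        it = wal_itr_next(wal_itr)) {
 *       // consume it->header->key / it->seqnum here
 *   }
 */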
2732
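// Position at the last key of this KV store: in multi-KVS mode a dummy key
// holding the next-higher KVS ID serves as an exclusive upper bound.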
2733struct wal_item *_wal_itr_last_bykey(struct wal_iterator *wal_itr)
2734{
2735    struct wal_item_header dummy_key;
2736    struct wal_item dummy_item;
2737    fdb_kvs_id_t kv_id = wal_itr->shandle->id + 1; // set to next higher KVS
2738    dummy_key.key = &kv_id;
2739    dummy_key.keylen = sizeof(fdb_kvs_id_t);
2740    dummy_item.header = &dummy_key;
2741    if (wal_itr->multi_kvs) {
2742        return _wal_itr_search_smaller_bykey(wal_itr, &dummy_item);
2743    } // else search goes to the last element in single KV instance mode..
2744    return _wal_itr_search_smaller_bykey(wal_itr, NULL);
2745}
2746
2747struct wal_item *_wal_itr_last_byseq(struct wal_iterator *wal_itr)
2748{
2749    return _wal_itr_search_smaller_byseq(wal_itr, NULL);
2750}
2751
2752struct wal_item* wal_itr_last(struct wal_iterator *wal_itr) {
2753    if (wal_itr->shandle->is_persisted_snapshot) {
2754        struct avl_node *a;
2755        if (wal_itr->by_key) {
2756            a = avl_last(&wal_itr->shandle->key_tree);
2757            wal_itr->cursor_pos = a;
2758            return a ? _get_entry(a, struct wal_item, avl_keysnap) : NULL;
2759        } else {
2760            a = avl_last(&wal_itr->shandle->seq_tree);
2761            wal_itr->cursor_pos = a;
2762            return a ? _get_entry(a, struct wal_item, avl_seq) : NULL;
2763        }
2764    }
2765
2766    if (wal_itr->shandle->snap_tag_idx) {
2767        wal_itr->direction = FDB_ITR_REVERSE;
2768        if (wal_itr->by_key) {
2769            return _wal_itr_last_bykey(wal_itr);
2770        } else {
2771            return _wal_itr_last_byseq(wal_itr);
2772        }
2773    } // else no items in WAL for this snapshot
2774    return NULL;
2775}
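/*
 * Example (editor's sketch, not part of the original source): a reverse scan
 * pairs wal_itr_last() with wal_itr_prev(). The iterator is assumed to have
 * been opened against a live snapshot handle beforehand; error handling is
 * elided.
 *
 *   struct wal_item *it = wal_itr_last(wal_itr);
 *   while (it) {
 *       // it->offset and it->doc_size locate the document body on disk
 *       it = wal_itr_prev(wal_itr);
 *   }
 */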
2776
2777fdb_status wal_itr_close(struct wal_iterator *wal_itr)
2778{
2779    free(wal_itr->cursors);
2780    free(wal_itr);
2781    return FDB_RESULT_SUCCESS;
2782}
2783
2784// discard entries in txn
2785fdb_status wal_discard(struct filemgr *file, fdb_txn *txn)
2786{
2787    struct wal_item *item;
2788    struct list_elem *e;
2789    size_t shard_num, seq_shard_num;
2790    uint64_t mem_overhead = 0;
2791
2792    e = list_begin(txn->items);
2793    while (e) {
2794        item = _get_entry(e, struct wal_item, list_elem_txn);
2795        shard_num = get_checksum((uint8_t*)item->header->key,
2796                                 item->header->keylen) %
2797                                 file->wal->num_shards;
2798        spin_lock(&file->wal->key_shards[shard_num].lock);
2799
2800        if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
2801            // remove from seq map
2802            seq_shard_num = item->seqnum % file->wal->num_shards;
2803            spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
2804            avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
2805                       &item->avl_seq);
2806            spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
2807        }
2808
2809        // remove from header's list
2810        list_remove(&item->header->items, &item->list_elem);
2811        // remove header if empty
2812        if (list_begin(&item->header->items) == NULL) {
2813            //remove from key map
2814            avl_remove(&file->wal->key_shards[shard_num]._map,
2815                       &item->header->avl_key);
2816            mem_overhead += sizeof(struct wal_item_header) + item->header->keylen;
2817            // free key and header
2818            free(item->header->key);
2819            free(item->header);
2820        }
2821        // remove from txn's list
2822        e = list_remove(txn->items, e);
2823        if (item->txn == &file->global_txn ||
2824            item->flag & WAL_ITEM_COMMITTED) {
2825            atomic_decr_uint32_t(&file->wal->num_flushable);
2826        }
2827        if (item->action != WAL_ACT_REMOVE) {
2828            atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
2829                                std::memory_order_relaxed);
2830            // mark as stale if the item is not an immediate remove
2831            filemgr_mark_stale(file, item->offset, item->doc_size);
2832        }
2833
2834        // free
2835        free(item);
2836        atomic_decr_uint32_t(&file->wal->size);
2837        mem_overhead += sizeof(struct wal_item);
2838        spin_unlock(&file->wal->key_shards[shard_num].lock);
2839    }
2840    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
2841                        std::memory_order_relaxed);
2842
2843    return FDB_RESULT_SUCCESS;
2844}
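/*
 * Example (editor's sketch, not part of the original source): rolling back a
 * transaction presumably funnels through wal_discard(), dropping every item
 * the txn staged in the WAL and marking the corresponding disk regions stale.
 * 'file' and 'txn' are hypothetical handles owned by the caller.
 *
 *   fdb_status s = wal_discard(file, txn);
 *   // on success, s == FDB_RESULT_SUCCESS and txn->items is empty
 */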
2845
2846typedef enum wal_discard_type {
2847    WAL_DISCARD_UNCOMMITTED_ONLY,
2848    WAL_DISCARD_ALL,
2849    WAL_DISCARD_KV_INS,
2850} wal_discard_t;
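// How the discard scopes map onto the entry points below:
//   WAL_DISCARD_UNCOMMITTED_ONLY - wal_close(): committed items are kept
//   WAL_DISCARD_ALL              - wal_shutdown(): everything is dropped
//   WAL_DISCARD_KV_INS           - presumably the per-KV-store close path;
//                                  'aux' must point to the target KV ID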
2851
2852// discard WAL entries; the scope is given by the wal_discard_t type
2853static fdb_status _wal_close(struct filemgr *file,
2854                             wal_discard_t type, void *aux,
2855                             err_log_callback *log_callback)
2856{
2857    struct wal_item *item;
2858    struct wal_item_header *header;
2859    struct list_elem *e;
2860    struct avl_node *a, *next_a;
2861    struct snap_handle *shandle;
2862    fdb_kvs_id_t kv_id, kv_id_req;
2863    bool committed;
2864    size_t i = 0, seq_shard_num;
2865    size_t num_shards = wal_get_num_shards(file);
2866    uint64_t mem_overhead = 0;
2867    struct snap_handle query;
2868
2869    if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
2870        if (aux == NULL) { // aux must contain pointer to KV ID
2871            return FDB_RESULT_INVALID_ARGS;
2872        }
2873        kv_id_req = *(fdb_kvs_id_t*)aux;
2874        query.id = kv_id_req;
2875        query.snap_tag_idx = 0;
2876        a = avl_search_greater(&file->wal->wal_snapshot_tree,
2877                               &query.avl_id, _wal_snap_cmp);
2878        if (a) {
2879            shandle = _get_entry(a, struct snap_handle, avl_id);
2880            if (shandle->id != kv_id_req) {
2881                a = NULL;
2882            }
2883        }
2884        // cleanup any snapshot handles not reclaimed by wal_flush
2885        for (next_a = NULL; a; a = next_a) {
2886            shandle = _get_entry(a, struct snap_handle, avl_id);
2887            if (shandle->id != kv_id_req) { // check the ID first so that a
2888                break;  // neighboring KV store's handle is never logged here
2889            }
2890            if (_wal_snap_is_immutable(shandle)) {
2891                fdb_log(log_callback, FDB_RESULT_INVALID_ARGS,
2892                        "WAL closed before snapshot close in kv id %" _F64
2893                        " in file %s", shandle->id, file->filename);
2894            }
2895            next_a = avl_next(a);
2896            avl_remove(&file->wal->wal_snapshot_tree, a);
2897            for (struct list_elem *e = list_begin(&shandle->active_txn_list);
2898                 e;) {
2899                struct list_elem *e_next = list_next(e);
2900                struct wal_txn_wrapper *active_txn = _get_entry(e,
2901                        struct wal_txn_wrapper, le);
2902                free(active_txn);
2903                e = e_next;
2904            }
2905            free(shandle);
2906        }
2907    } else {
2908        // cleanup all snapshot handles not reclaimed by wal_flush
2909        for (a = avl_first(&file->wal->wal_snapshot_tree), next_a = NULL;
2910             a; a = next_a) {
2911            shandle = _get_entry(a, struct snap_handle, avl_id);
2912            if (_wal_snap_is_immutable(shandle)) {
2913                fdb_log(log_callback, FDB_RESULT_INVALID_ARGS,
2914                        "WAL closed before snapshot close in kv id %" _F64
2915                        " with %" _F64 " docs in file %s", shandle->id,
2916                        atomic_get_uint64_t(&shandle->wal_ndocs), file->filename);
2917            }
2918            next_a = avl_next(a);
2919            avl_remove(&file->wal->wal_snapshot_tree, a);
2920            for (struct list_elem *e = list_begin(&shandle->active_txn_list);
2921                 e;) {
2922                struct list_elem *e_next = list_next(e);
2923                struct wal_txn_wrapper *active_txn = _get_entry(e,
2924                        struct wal_txn_wrapper, le);
2925                free(active_txn);
2926                e = e_next;
2927            }
2928            free(shandle);
2929        }
2930    }
2931
2932    for (; i < num_shards; ++i) {
2933        spin_lock(&file->wal->key_shards[i].lock);
2934        a = avl_first(&file->wal->key_shards[i]._map);
2935        while (a) {
2936            header = _get_entry(a, struct wal_item_header, avl_key);
2937            if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
2938                buf2kvid(header->chunksize, header->key, &kv_id);
2939                // begin while loop only on matching KV ID
2940                e = (kv_id == kv_id_req) ? list_begin(&header->items) : NULL;
2941            } else {
2942                kv_id = 0;
2943                e = list_begin(&header->items);
2944            }
2945
2946            committed = false;
2947            while (e) {
2948                item = _get_entry(e, struct wal_item, list_elem);
2949                if (type == WAL_DISCARD_ALL ||
2950                    (type == WAL_DISCARD_UNCOMMITTED_ONLY &&
2951                     !(item->flag & WAL_ITEM_COMMITTED)) ||
2952                    type == WAL_DISCARD_KV_INS) {
2953                    // remove from header's list
2954                    e = list_remove(&header->items, e);
2955                    if (!(item->flag & WAL_ITEM_COMMITTED)) {
2956                        // and also remove from transaction's list
2957                        list_remove(item->txn->items, &item->list_elem_txn);
2958                        if (item->action != WAL_ACT_REMOVE) {
2959                            // mark as stale if item is not committed and not an immediate remove
2960                            filemgr_mark_stale(file, item->offset, item->doc_size);
2961                        }
2962                    } else {
2963                        // committed item exists and will be removed
2964                        committed = true;
2965                    }
2966
2967                    if (file->config->seqtree_opt == FDB_SEQTREE_USE) {
2968                        // remove from seq hash table
2969                        seq_shard_num = item->seqnum % num_shards;
2970                        spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
2971                        avl_remove(&file->wal->seq_shards[seq_shard_num]._map,
2972                                   &item->avl_seq);
2973                        spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
2974                    }
2975
2976                    if (item->action != WAL_ACT_REMOVE) {
2977                        atomic_sub_uint64_t(&file->wal->datasize, item->doc_size,
2978                                            std::memory_order_relaxed);
2979                    }
2980                    if (item->txn == &file->global_txn || committed) {
2981                        if (item->action != WAL_ACT_INSERT) {
2982                            _wal_update_stat(file, kv_id, _WAL_DROP_DELETE);
2983                        } else {
2984                            _wal_update_stat(file, kv_id, _WAL_DROP_SET);
2985                        }
2986                        atomic_decr_uint32_t(&file->wal->num_flushable);
2987                    }
2988                    free(item);
2989                    atomic_decr_uint32_t(&file->wal->size);
2990                    mem_overhead += sizeof(struct wal_item);
2991                } else {
2992                    e = list_next(e);
2993                }
2994            }
2995            a = avl_next(a);
2996
2997            if (list_begin(&header->items) == NULL) {
2998                // wal_item_header becomes empty
2999                // free header and remove from key map
3000                avl_remove(&file->wal->key_shards[i]._map,
3001                           &header->avl_key);
3002                mem_overhead += sizeof(struct wal_item_header) + header->keylen;
3003                free(header->key);
3004                free(header);
3005            }
3006        }
3007        spin_unlock(&file->wal->key_shards[i].lock);
3008    }
3009    atomic_sub_uint64_t(&file->wal->mem_overhead, mem_overhead,
3010                        std::memory_order_relaxed);
3011
3012    return FDB_RESULT_SUCCESS;
3013}
3014
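// discard only uncommitted (transactional) entries on close; committed items
// remain in the WAL so that they can still be flushed later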
3015fdb_status wal_close(struct filemgr *file, err_log_callback *log_callback)
3016{
3017    return _wal_close(file, WAL_DISCARD_UNCOMMITTED_ONLY, NULL, log_callback);
3018}
3019
3020// discard all WAL entries
3021fdb_status wal_shutdown(struct filemgr *file, err_log_callback *log_callback)
3022{
3023    fdb_status wr = _wal_close(file, WAL_DISCARD_ALL, NULL, log_callback);
3024    atomic_store_uint32_t(&file->wal->size, 0);
3025