1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdlib.h>
19 #include <string.h>
20 #include <stdint.h>
21 
22 #include "filemgr.h"
23 #include "common.h"
24 #include "hash.h"
25 #include "docio.h"
26 #include "wal.h"
27 #include "hash_functions.h"
28 #include "fdb_internal.h"
29 
30 #include "memleak.h"
31 
32 #ifdef __DEBUG
33 #ifndef __DEBUG_WAL
34     #undef DBG
35     #undef DBGCMD
36     #undef DBGSW
37     #define DBG(...)
38     #define DBGCMD(...)
39     #define DBGSW(n, ...)
40 #endif
41 #endif
42 
_wal_hash_bykey(struct hash *hash, struct hash_elem *e)43 INLINE uint32_t _wal_hash_bykey(struct hash *hash, struct hash_elem *e)
44 {
45     struct wal_item_header *item = _get_entry(e, struct wal_item_header, he_key);
46     return chksum((uint8_t*)item->key, item->keylen) % ((uint64_t)hash->nbuckets);
47 }
48 
_wal_cmp_bykey(struct hash_elem *a, struct hash_elem *b)49 INLINE int _wal_cmp_bykey(struct hash_elem *a, struct hash_elem *b)
50 {
51     struct wal_item_header *aa, *bb;
52     aa = _get_entry(a, struct wal_item_header, he_key);
53     bb = _get_entry(b, struct wal_item_header, he_key);
54 
55     if (aa->keylen == bb->keylen) return memcmp(aa->key, bb->key, aa->keylen);
56     else {
57         size_t len = MIN(aa->keylen , bb->keylen);
58         int cmp = memcmp(aa->key, bb->key, len);
59         if (cmp != 0) return cmp;
60         else {
61             return (int)((int)aa->keylen - (int)bb->keylen);
62         }
63     }
64 }
65 
_wal_hash_byseq(struct hash *hash, struct hash_elem *e)66 INLINE uint32_t _wal_hash_byseq(struct hash *hash, struct hash_elem *e)
67 {
68     struct wal_item *item = _get_entry(e, struct wal_item, he_seq);
69     return (item->seqnum) % ((uint64_t)hash->nbuckets);
70 }
71 
_wal_cmp_byseq(struct hash_elem *a, struct hash_elem *b)72 INLINE int _wal_cmp_byseq(struct hash_elem *a, struct hash_elem *b)
73 {
74     struct wal_item *aa, *bb;
75     aa = _get_entry(a, struct wal_item, he_seq);
76     bb = _get_entry(b, struct wal_item, he_seq);
77 
78     if (aa->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
79         // multi KV instance mode
80         int size_chunk = aa->header->chunksize;
81         fdb_kvs_id_t id_aa, id_bb;
82         // KV ID is stored at the first 8 bytes in the key
83         buf2kvid(size_chunk, aa->header->key, &id_aa);
84         buf2kvid(size_chunk, bb->header->key, &id_bb);
85         if (id_aa < id_bb) {
86             return -1;
87         } else if (id_aa > id_bb) {
88             return 1;
89         } else {
90             return _CMP_U64(aa->seqnum, bb->seqnum);
91         }
92     } else {
93         return _CMP_U64(aa->seqnum, bb->seqnum);
94     }
95 }
96 
wal_init(struct filemgr *file, int nbucket)97 fdb_status wal_init(struct filemgr *file, int nbucket)
98 {
99     size_t i, num_hash_buckets;
100     size_t num_all_shards;
101 
102     file->wal->flag = WAL_FLAG_INITIALIZED;
103     atomic_init_uint32_t(&file->wal->size, 0);
104     atomic_init_uint32_t(&file->wal->num_flushable, 0);
105     atomic_init_uint64_t(&file->wal->datasize, 0);
106     file->wal->wal_dirty = FDB_WAL_CLEAN;
107 
108     list_init(&file->wal->txn_list);
109     spin_init(&file->wal->lock);
110 
111     if (file->config->num_wal_shards) {
112         file->wal->num_shards = file->config->num_wal_shards;
113     } else {
114         file->wal->num_shards = DEFAULT_NUM_WAL_PARTITIONS;
115     }
116 
117     // Create one more WAL shard (num_shards+1)
118     // The additional shard is reserved for compactor
119     num_all_shards = wal_get_num_all_shards(file);
120     file->wal->key_shards = (wal_shard_by_key *)
121         malloc(sizeof(struct wal_shard_by_key) * num_all_shards);
122     file->wal->seq_shards = (wal_shard_by_seq *)
123         malloc(sizeof(struct wal_shard_by_seq) * num_all_shards);
124 
125     num_hash_buckets = nbucket / file->wal->num_shards;
126     for (i = 0; i < num_all_shards; ++i) {
127         if (i == file->wal->num_shards - 1) {
128             num_hash_buckets = nbucket - (num_hash_buckets * i);
129         } else if (i == file->wal->num_shards) {
130             // WAL shard for compactor .. use more buckets
131             num_hash_buckets = nbucket;
132         }
133         hash_init(&file->wal->key_shards[i].hash_bykey, num_hash_buckets,
134                   _wal_hash_bykey, _wal_cmp_bykey);
135         hash_init(&file->wal->seq_shards[i].hash_byseq, num_hash_buckets,
136                   _wal_hash_byseq, _wal_cmp_byseq);
137         list_init(&file->wal->key_shards[i].list);
138         spin_init(&file->wal->key_shards[i].lock);
139         spin_init(&file->wal->seq_shards[i].lock);
140     }
141 
142     DBG("wal item size %" _F64 "\n", sizeof(struct wal_item));
143     return FDB_RESULT_SUCCESS;
144 }
145 
wal_is_initialized(struct filemgr *file)146 int wal_is_initialized(struct filemgr *file)
147 {
148     return file->wal->flag & WAL_FLAG_INITIALIZED;
149 }
150 
wal_insert(fdb_txn *txn, struct filemgr *file, fdb_doc *doc, uint64_t offset, int is_compactor)151 fdb_status wal_insert(fdb_txn *txn,
152                       struct filemgr *file,
153                       fdb_doc *doc,
154                       uint64_t offset,
155                       int is_compactor)
156 {
157     struct wal_item *item;
158     struct wal_item_header query, *header;
159     struct list_elem *le;
160     struct hash_elem *he;
161     void *key = doc->key;
162     size_t keylen = doc->keylen;
163     size_t chk_sum;
164     size_t shard_num;
165     fdb_kvs_id_t kv_id;
166 
167     if (file->kv_header) { // multi KV instance mode
168         buf2kvid(file->config->chunksize, doc->key, &kv_id);
169     } else {
170         kv_id = 0;
171     }
172     query.key = key;
173     query.keylen = keylen;
174 
175     // During the compaction, WAL entry inserted by compactor is always stored
176     // in the special shard (shard_num), while WAL entry inserted by normal
177     // writer is stored in the corresponding normal shards (0 ~ shard_num-1).
178     // Note that wal_find() always searches the normal shards only, thus
179     // documents inserted by compactor but not exist in the normal shards
180     // cannot be retrieved by wal_find(). However, fdb_get() continues to
181     // search in the HB+trie in the old file if wal_find() fails, thus they
182     // can be retrieved eventually.
183     chk_sum = chksum((uint8_t*)key, keylen);
184     if (is_compactor) {
185         // Document inserted by compactor is always stored in
186         // the compactor's shard (i.e., shards[shard_num])
187         shard_num = file->wal->num_shards;
188     } else {
189         // Insertion by normal writer
190         shard_num = chk_sum % file->wal->num_shards;
191     }
192     spin_lock(&file->wal->key_shards[shard_num].lock);
193 
194     he = hash_find_by_hash_val(&file->wal->key_shards[shard_num].hash_bykey,
195                                &query.he_key, (uint32_t) chk_sum);
196 
197     if (he) {
198         // already exist .. retrieve header
199         header = _get_entry(he, struct wal_item_header, he_key);
200 
201         // it cannot happen that
202         // same doc already exists in the compactor's shard
203         assert(!is_compactor);
204 
205         // find uncommitted item belonging to the same txn
206         le = list_begin(&header->items);
207         while (le) {
208             item = _get_entry(le, struct wal_item, list_elem);
209 
210             if (item->txn == txn && !(item->flag & WAL_ITEM_COMMITTED)) {
211                 item->flag &= ~WAL_ITEM_FLUSH_READY;
212 
213                 size_t seq_shard_num = item->seqnum % file->wal->num_shards;
214                 spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
215                 hash_remove(&file->wal->seq_shards[seq_shard_num].hash_byseq,
216                             &item->he_seq);
217                 spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
218                 item->seqnum = doc->seqnum;
219                 seq_shard_num = doc->seqnum % file->wal->num_shards;
220                 spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
221                 hash_insert(&file->wal->seq_shards[seq_shard_num].hash_byseq,
222                             &item->he_seq);
223                 spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
224 
225                 atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk - item->doc_size);
226 
227                 item->doc_size = doc->size_ondisk;
228                 item->offset = offset;
229                 item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
230 
231                 // move the item to the front of the list (header)
232                 list_remove(&header->items, &item->list_elem);
233                 list_push_front(&header->items, &item->list_elem);
234                 break;
235             }
236             le = list_next(le);
237         }
238 
239         if (le == NULL) {
240             // not exist
241             // create new item
242             item = (struct wal_item *)malloc(sizeof(struct wal_item));
243             item->flag = 0x0;
244 
245             if (file->kv_header) { // multi KV instance mode
246                 item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
247             }
248             item->txn = txn;
249             if (txn == &file->global_txn) {
250                 atomic_incr_uint32_t(&file->wal->num_flushable);
251             }
252             item->header = header;
253 
254             item->seqnum = doc->seqnum;
255             item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
256             item->offset = offset;
257             item->doc_size = doc->size_ondisk;
258             atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk);
259 
260             // don't care about compactor's shard here
261             size_t seq_shard_num = doc->seqnum % file->wal->num_shards;
262             spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
263             hash_insert(&file->wal->seq_shards[seq_shard_num].hash_byseq, &item->he_seq);
264             spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
265 
266             // insert into header's list
267             list_push_front(&header->items, &item->list_elem);
268             // also insert into transaction's list
269             list_push_back(txn->items, &item->list_elem_txn);
270 
271             atomic_incr_uint32_t(&file->wal->size);
272         }
273     } else {
274         // not exist .. create new one
275         // create new header and new item
276         header = (struct wal_item_header*)malloc(sizeof(struct wal_item_header));
277         list_init(&header->items);
278         header->chunksize = file->config->chunksize;
279         header->keylen = keylen;
280         header->key = (void *)malloc(header->keylen);
281         memcpy(header->key, key, header->keylen);
282 
283         hash_insert_by_hash_val(&file->wal->key_shards[shard_num].hash_bykey,
284                                 &header->he_key, (uint32_t) chk_sum);
285 
286         item = (struct wal_item *)malloc(sizeof(struct wal_item));
287         // entries inserted by compactor is already committed
288         if (is_compactor) {
289             item->flag = WAL_ITEM_COMMITTED | WAL_ITEM_BY_COMPACTOR;
290         } else {
291             item->flag = 0x0;
292         }
293         if (file->kv_header) { // multi KV instance mode
294             item->flag |= WAL_ITEM_MULTI_KV_INS_MODE;
295         }
296         item->txn = txn;
297         if (txn == &file->global_txn) {
298             atomic_incr_uint32_t(&file->wal->num_flushable);
299         }
300         item->header = header;
301 
302         item->seqnum = doc->seqnum;
303         item->action = doc->deleted ? WAL_ACT_LOGICAL_REMOVE : WAL_ACT_INSERT;
304         item->offset = offset;
305         item->doc_size = doc->size_ondisk;
306         atomic_add_uint64_t(&file->wal->datasize, doc->size_ondisk);
307 
308         size_t seq_shard_num;
309         if (is_compactor) {
310             // Document inserted by compactor is always stored in
311             // the compactor's shard
312             seq_shard_num = file->wal->num_shards;
313         } else {
314             seq_shard_num = doc->seqnum % file->wal->num_shards;
315         }
316         spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
317         hash_insert(&file->wal->seq_shards[seq_shard_num].hash_byseq, &item->he_seq);
318         spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
319 
320         // insert into header's list
321         // (pushing front is ok for compactor because no other item already exists)
322         list_push_front(&header->items, &item->list_elem);
323         if (!is_compactor) {
324             // also insert into transaction's list
325             list_push_back(txn->items, &item->list_elem_txn);
326         } else {
327             // increase num_docs
328             _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
329         }
330 
331         // insert an item header into a WAL shard's list
332         list_push_back(&file->wal->key_shards[shard_num].list, &header->list_elem);
333         atomic_incr_uint32_t(&file->wal->size);
334     }
335 
336     spin_unlock(&file->wal->key_shards[shard_num].lock);
337 
338     return FDB_RESULT_SUCCESS;
339 }
340 
_wal_find(fdb_txn *txn, struct filemgr *file, fdb_kvs_id_t kv_id, fdb_doc *doc, uint64_t *offset)341 static fdb_status _wal_find(fdb_txn *txn,
342                             struct filemgr *file,
343                             fdb_kvs_id_t kv_id,
344                             fdb_doc *doc,
345                             uint64_t *offset)
346 {
347     struct wal_item item_query, *item = NULL;
348     struct wal_item_header query, *header = NULL;
349     struct list_elem *le = NULL;
350     struct hash_elem *he = NULL;
351     void *key = doc->key;
352     size_t keylen = doc->keylen;
353 
354     if (doc->seqnum == SEQNUM_NOT_USED || (key && keylen>0)) {
355         size_t chk_sum = chksum((uint8_t*)key, keylen);
356         // _wal_find() doesn't care compactor's shard
357         size_t shard_num = chk_sum % file->wal->num_shards;
358         spin_lock(&file->wal->key_shards[shard_num].lock);
359         // search by key
360         query.key = key;
361         query.keylen = keylen;
362         he = hash_find_by_hash_val(&file->wal->key_shards[shard_num].hash_bykey,
363                                    &query.he_key, (uint32_t) chk_sum);
364         if (he) {
365             // retrieve header
366             header = _get_entry(he, struct wal_item_header, he_key);
367             le = list_begin(&header->items);
368             while(le) {
369                 item = _get_entry(le, struct wal_item, list_elem);
370                 // only committed items can be seen by the other handles, OR
371                 // items belonging to the same txn can be found, OR
372                 // a transaction's isolation level is read uncommitted.
373                 if ((item->flag & WAL_ITEM_COMMITTED) ||
374                     (item->txn == txn) ||
375                     (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
376                     *offset = item->offset;
377                     if (item->action == WAL_ACT_INSERT) {
378                         doc->deleted = false;
379                     } else {
380                         doc->deleted = true;
381                     }
382                     spin_unlock(&file->wal->key_shards[shard_num].lock);
383                     return FDB_RESULT_SUCCESS;
384                 }
385                 le = list_next(le);
386             }
387         }
388         spin_unlock(&file->wal->key_shards[shard_num].lock);
389     } else {
390         // search by seqnum
391         struct wal_item_header temp_header;
392 
393         if (file->kv_header) { // multi KV instance mode
394             temp_header.key = (void*)alca(uint8_t, file->config->chunksize);
395             kvid2buf(file->config->chunksize, kv_id, temp_header.key);
396             item_query.header = &temp_header;
397         }
398         item_query.seqnum = doc->seqnum;
399 
400         size_t shard_num = doc->seqnum % file->wal->num_shards;
401         spin_lock(&file->wal->seq_shards[shard_num].lock);
402         he = hash_find(&file->wal->seq_shards[shard_num].hash_byseq, &item_query.he_seq);
403         if (he) {
404             item = _get_entry(he, struct wal_item, he_seq);
405             if ((item->flag & WAL_ITEM_COMMITTED) ||
406                 (item->txn == txn) ||
407                 (txn->isolation == FDB_ISOLATION_READ_UNCOMMITTED)) {
408                 *offset = item->offset;
409                 if (item->action == WAL_ACT_INSERT) {
410                     doc->deleted = false;
411                 } else {
412                     doc->deleted = true;
413                 }
414                 spin_unlock(&file->wal->seq_shards[shard_num].lock);
415                 return FDB_RESULT_SUCCESS;
416             }
417         }
418         spin_unlock(&file->wal->seq_shards[shard_num].lock);
419     }
420 
421     return FDB_RESULT_KEY_NOT_FOUND;
422 }
423 
wal_find(fdb_txn *txn, struct filemgr *file, fdb_doc *doc, uint64_t *offset)424 fdb_status wal_find(fdb_txn *txn, struct filemgr *file, fdb_doc *doc, uint64_t *offset)
425 {
426     return _wal_find(txn, file, 0, doc, offset);
427 }
428 
wal_find_kv_id(fdb_txn *txn, struct filemgr *file, fdb_kvs_id_t kv_id, fdb_doc *doc, uint64_t *offset)429 fdb_status wal_find_kv_id(fdb_txn *txn,
430                           struct filemgr *file,
431                           fdb_kvs_id_t kv_id,
432                           fdb_doc *doc,
433                           uint64_t *offset)
434 {
435     return _wal_find(txn, file, kv_id, doc, offset);
436 }
437 
438 // move all uncommitted items into 'new_file'
wal_txn_migration(void *dbhandle, void *new_dhandle, struct filemgr *old_file, struct filemgr *new_file, wal_doc_move_func *move_doc)439 fdb_status wal_txn_migration(void *dbhandle,
440                              void *new_dhandle,
441                              struct filemgr *old_file,
442                              struct filemgr *new_file,
443                              wal_doc_move_func *move_doc)
444 {
445     uint64_t offset;
446     fdb_doc doc;
447     fdb_txn *txn;
448     struct wal_txn_wrapper *txn_wrapper;
449     struct wal_item_header *header;
450     struct wal_item *item;
451     struct list_elem *e1, *e2;
452     size_t i = 0;
453     size_t num_shards = old_file->wal->num_shards;
454 
455     // Note that the caller (i.e., compactor) alreay owns the locks on
456     // both old_file and new_file filemgr instances. Therefore, it is OK to
457     // grab each partition lock individually and move all uncommitted items
458     // to the new_file filemgr instance.
459 
460     for (; i < num_shards; ++i) {
461         spin_lock(&old_file->wal->key_shards[i].lock);
462         e1 = list_begin(&old_file->wal->key_shards[i].list);
463         while(e1) {
464             header = _get_entry(e1, struct wal_item_header, list_elem);
465             e2 = list_end(&header->items);
466             while(e2) {
467                 item = _get_entry(e2, struct wal_item, list_elem);
468                 if (!(item->flag & WAL_ITEM_COMMITTED)) {
469                     // not committed yet
470                     // move doc
471                     offset = move_doc(dbhandle, new_dhandle, item, &doc);
472                     // Note that all items belonging to global_txn should be
473                     // flushed before calling this function
474                     // (migrate transactional items only).
475                     fdb_assert(item->txn != &old_file->global_txn,
476                                (uint64_t)item->txn, 0);
477                     // insert into new_file's WAL
478                     wal_insert(item->txn, new_file, &doc, offset, 0);
479                     // remove from seq hash table
480                     size_t shard_num = item->seqnum % num_shards;
481                     spin_lock(&old_file->wal->seq_shards[shard_num].lock);
482                     hash_remove(&old_file->wal->seq_shards[shard_num].hash_byseq,
483                                 &item->he_seq);
484                     spin_unlock(&old_file->wal->seq_shards[shard_num].lock);
485                     // remove from header's list
486                     e2 = list_remove_reverse(&header->items, e2);
487                     // remove from transaction's list
488                     list_remove(item->txn->items, &item->list_elem_txn);
489                     // decrease num_flushable of old_file if non-transactional update
490                     if (item->txn == &old_file->global_txn) {
491                         atomic_decr_uint32_t(&old_file->wal->num_flushable);
492                     }
493                     if (item->action != WAL_ACT_REMOVE) {
494                         atomic_sub_uint64_t(&old_file->wal->datasize, item->doc_size);
495                     }
496                     // free item
497                     free(item);
498                     // free doc
499                     free(doc.key);
500                     free(doc.meta);
501                     free(doc.body);
502                     atomic_decr_uint32_t(&old_file->wal->size);
503                 } else {
504                     e2= list_prev(e2);
505                 }
506             }
507 
508             if (list_begin(&header->items) == NULL) {
509                 // header's list becomes empty
510                 // remove from key hash table
511                 hash_remove(&old_file->wal->key_shards[i].hash_bykey, &header->he_key);
512                 // remove from wal list
513                 e1 = list_remove(&old_file->wal->key_shards[i].list, &header->list_elem);
514                 // free key & header
515                 free(header->key);
516                 free(header);
517             } else {
518                 e1 = list_next(e1);
519             }
520         }
521         spin_unlock(&old_file->wal->key_shards[i].lock);
522     }
523 
524     spin_lock(&old_file->wal->lock);
525 
526     // migrate all entries in txn list
527     e1 = list_begin(&old_file->wal->txn_list);
528     while(e1) {
529         txn_wrapper = _get_entry(e1, struct wal_txn_wrapper, le);
530         txn = txn_wrapper->txn;
531         // except for global_txn
532         if (txn != &old_file->global_txn) {
533             e1 = list_remove(&old_file->wal->txn_list, &txn_wrapper->le);
534             list_push_front(&new_file->wal->txn_list, &txn_wrapper->le);
535             // remove previous header info
536             txn->prev_hdr_bid = BLK_NOT_FOUND;
537         } else {
538             e1 = list_next(e1);
539         }
540     }
541 
542     spin_unlock(&old_file->wal->lock);
543 
544     return FDB_RESULT_SUCCESS;
545 }
546 
wal_commit(fdb_txn *txn, struct filemgr *file, wal_commit_mark_func *func, err_log_callback *log_callback)547 fdb_status wal_commit(fdb_txn *txn, struct filemgr *file,
548                       wal_commit_mark_func *func, err_log_callback *log_callback)
549 {
550     int prev_commit;
551     wal_item_action prev_action;
552     struct wal_item *item;
553     struct wal_item *_item;
554     struct list_elem *e1, *e2;
555     fdb_kvs_id_t kv_id;
556     fdb_status status;
557     size_t shard_num;
558 
559     e1 = list_begin(txn->items);
560     while(e1) {
561         item = _get_entry(e1, struct wal_item, list_elem_txn);
562         assert(item->txn == txn);
563         // Grab the WAL key shard lock.
564         shard_num = chksum((uint8_t*)item->header->key, item->header->keylen) %
565             file->wal->num_shards;
566         spin_lock(&file->wal->key_shards[shard_num].lock);
567 
568         if (!(item->flag & WAL_ITEM_COMMITTED)) {
569             // get KVS ID
570             if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
571                 buf2kvid(item->header->chunksize, item->header->key, &kv_id);
572             } else {
573                 kv_id = 0;
574             }
575 
576             item->flag |= WAL_ITEM_COMMITTED;
577             // append commit mark if necessary
578             if (func) {
579                 status = func(txn->handle, item->offset);
580                 if (status != FDB_RESULT_SUCCESS) {
581                     fdb_log(log_callback, status,
582                             "Error in appending a commit mark at offset %" _F64 " in "
583                             "a database file '%s'", item->offset, file->filename);
584                     spin_unlock(&file->wal->key_shards[shard_num].lock);
585                     return status;
586                 }
587             }
588             // remove previously committed item
589             prev_commit = 0;
590             // next item on the wal_item_header's items
591             e2 = list_next(&item->list_elem);
592             while(e2) {
593                 _item = _get_entry(e2, struct wal_item, list_elem);
594                 e2 = list_next(e2);
595                 // committed but not flush-ready
596                 // (flush-readied item will be removed by flushing)
597                 if ((_item->flag & WAL_ITEM_COMMITTED) &&
598                     !(_item->flag & WAL_ITEM_FLUSH_READY)) {
599                     list_remove(&item->header->items, &_item->list_elem);
600                     size_t seq_shard_num = _item->seqnum % file->wal->num_shards;
601                     spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
602                     hash_remove(&file->wal->seq_shards[seq_shard_num].hash_byseq,
603                                 &_item->he_seq);
604                     spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
605                     prev_action = _item->action;
606                     prev_commit = 1;
607                     atomic_decr_uint32_t(&file->wal->size);
608                     atomic_decr_uint32_t(&file->wal->num_flushable);
609                     if (item->action != WAL_ACT_REMOVE) {
610                         atomic_sub_uint64_t(&file->wal->datasize, _item->doc_size);
611                     }
612                     free(_item);
613                 }
614             }
615             if (!prev_commit) {
616                 // there was no previous commit .. increase num_docs
617                 _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, 1);
618                 if (item->action == WAL_ACT_LOGICAL_REMOVE) {
619                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
620                 }
621             } else {
622                 if (prev_action == WAL_ACT_INSERT &&
623                     item->action == WAL_ACT_LOGICAL_REMOVE) {
624                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, 1);
625                 } else if (prev_action == WAL_ACT_LOGICAL_REMOVE &&
626                            item->action == WAL_ACT_INSERT) {
627                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
628                 }
629             }
630             // increase num_flushable if it is transactional update
631             if (item->txn != &file->global_txn) {
632                 atomic_incr_uint32_t(&file->wal->num_flushable);
633             }
634             // move the committed item to the end of the wal_item_header's list
635             list_remove(&item->header->items, &item->list_elem);
636             list_push_back(&item->header->items, &item->list_elem);
637         }
638 
639         // remove from transaction's list
640         e1 = list_remove(txn->items, e1);
641         spin_unlock(&file->wal->key_shards[shard_num].lock);
642     }
643 
644     return FDB_RESULT_SUCCESS;
645 }
646 
_wal_flush_cmp(struct avl_node *a, struct avl_node *b, void *aux)647 static int _wal_flush_cmp(struct avl_node *a, struct avl_node *b, void *aux)
648 {
649     struct wal_item *aa, *bb;
650     aa = _get_entry(a, struct wal_item, avl);
651     bb = _get_entry(b, struct wal_item, avl);
652 
653     if (aa->old_offset < bb->old_offset) {
654         return -1;
655     } else if (aa->old_offset > bb->old_offset) {
656         return 1;
657     } else {
658         // old_offset can be 0 if the document was newly inserted
659         if (aa->offset < bb->offset) {
660             return -1;
661         } else if (aa->offset > bb->offset) {
662             return 1;
663         } else {
664             return 0;
665         }
666     }
667 }
668 
wal_release_flushed_items(struct filemgr *file, struct avl_tree *flush_items)669 fdb_status wal_release_flushed_items(struct filemgr *file,
670                                      struct avl_tree *flush_items)
671 {
672     struct avl_tree *tree = flush_items;
673     struct avl_node *a;
674     struct wal_item *item;
675     fdb_kvs_id_t kv_id;
676     size_t shard_num, seq_shard_num;
677 
678     // scan and remove entries in the avl-tree
679     while (1) {
680         if ((a = avl_first(tree)) == NULL) {
681             break;
682         }
683         item = _get_entry(a, struct wal_item, avl);
684         avl_remove(tree, &item->avl);
685 
686         // Grab the WAL key shard lock.
687         if (item->flag & WAL_ITEM_BY_COMPACTOR) {
688             shard_num = file->wal->num_shards;
689         } else {
690             shard_num = chksum((uint8_t*)item->header->key, item->header->keylen) %
691                 file->wal->num_shards;
692         }
693         spin_lock(&file->wal->key_shards[shard_num].lock);
694 
695         // get KVS ID
696         if (item->flag & WAL_ITEM_MULTI_KV_INS_MODE) {
697             buf2kvid(item->header->chunksize, item->header->key, &kv_id);
698         } else {
699             kv_id = 0;
700         }
701 
702         list_remove(&item->header->items, &item->list_elem);
703         if (item->flag & WAL_ITEM_BY_COMPACTOR) {
704             seq_shard_num = file->wal->num_shards;
705         } else {
706             seq_shard_num = item->seqnum % file->wal->num_shards;
707         }
708         spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
709         hash_remove(&file->wal->seq_shards[seq_shard_num].hash_byseq,
710                     &item->he_seq);
711         spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
712         if (list_begin(&item->header->items) == NULL) {
713             // wal_item_header becomes empty
714             // free header and remove from hash table & wal list
715             list_remove(&file->wal->key_shards[shard_num].list, &item->header->list_elem);
716             hash_remove(&file->wal->key_shards[shard_num].hash_bykey, &item->header->he_key);
717             free(item->header->key);
718             free(item->header);
719         }
720 
721         if (item->action == WAL_ACT_LOGICAL_REMOVE ||
722             item->action == WAL_ACT_REMOVE) {
723             _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
724         }
725         _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
726         atomic_decr_uint32_t(&file->wal->size);
727         atomic_decr_uint32_t(&file->wal->num_flushable);
728         if (item->action != WAL_ACT_REMOVE) {
729             atomic_sub_uint64_t(&file->wal->datasize, item->doc_size);
730         }
731         free(item);
732         spin_unlock(&file->wal->key_shards[shard_num].lock);
733     }
734 
735     return FDB_RESULT_SUCCESS;
736 }
737 
_wal_flush(struct filemgr *file, void *dbhandle, wal_flush_func *flush_func, wal_get_old_offset_func *get_old_offset, struct avl_tree *flush_items, bool by_compactor)738 static fdb_status _wal_flush(struct filemgr *file,
739                              void *dbhandle,
740                              wal_flush_func *flush_func,
741                              wal_get_old_offset_func *get_old_offset,
742                              struct avl_tree *flush_items,
743                              bool by_compactor)
744 {
745     struct avl_tree *tree = flush_items;
746     struct avl_node *a;
747     struct list_elem *e, *ee;
748     struct wal_item *item;
749     struct wal_item_header *header;
750     size_t i = 0;
751     size_t num_shards = file->wal->num_shards;
752 
753     if (by_compactor) {
754         // If this flushing is requested by compactor,
755         // flush WAL entries by compactor only.
756         i = file->wal->num_shards;
757         num_shards = i+1;
758     }
759 
760     // sort by old byte-offset of the document (for sequential access)
761     avl_init(tree, NULL);
762     for (; i < num_shards; ++i) {
763         spin_lock(&file->wal->key_shards[i].lock);
764         e = list_begin(&file->wal->key_shards[i].list);
765         while (e) {
766             header = _get_entry(e, struct wal_item_header, list_elem);
767             ee = list_end(&header->items);
768             while (ee) {
769                 item = _get_entry(ee, struct wal_item, list_elem);
770                 // committed but not flushed items
771                 if (!(item->flag & WAL_ITEM_COMMITTED)) {
772                     break;
773                 }
774                 if (by_compactor &&
775                     !(item->flag & WAL_ITEM_BY_COMPACTOR)) {
776                     // during compaction, do not flush normally committed item
777                     break;
778                 }
779                 if (!(item->flag & WAL_ITEM_FLUSH_READY)) {
780                     item->flag |= WAL_ITEM_FLUSH_READY;
781                     // if WAL_ITEM_FLUSH_READY flag is set,
782                     // this item becomes immutable, so that
783                     // no other concurrent thread modifies it.
784                     if (by_compactor) {
785                         // During the first phase of compaction, we don't need to retrieve
786                         // the old offsets of WAL items because they are all new insertions
787                         // into the new file's hbtrie index.
788                         item->old_offset = 0;
789                         avl_insert(tree, &item->avl, _wal_flush_cmp);
790                     } else {
791                         spin_unlock(&file->wal->key_shards[i].lock);
792                         item->old_offset = get_old_offset(dbhandle, item);
793                         avl_insert(tree, &item->avl, _wal_flush_cmp);
794                         spin_lock(&file->wal->key_shards[i].lock);
795                     }
796                 }
797                 ee = list_prev(ee);
798             }
799             e = list_next(e);
800         }
801         spin_unlock(&file->wal->key_shards[i].lock);
802     }
803 
804     // scan and flush entries in the avl-tree
805     a = avl_first(tree);
806     while (a) {
807         item = _get_entry(a, struct wal_item, avl);
808         a = avl_next(a);
809 
810         // check weather this item is updated after insertion into tree
811         if (item->flag & WAL_ITEM_FLUSH_READY) {
812             fdb_status fs = flush_func(dbhandle, item);
813             if (fs != FDB_RESULT_SUCCESS) {
814                 fdb_kvs_handle *handle = (fdb_kvs_handle *) dbhandle;
815                 fdb_log(&handle->log_callback, fs,
816                         "Failed to flush WAL item (key '%s') into a database file '%s'",
817                         (const char *) item->header->key, file->filename);
818 
819                 return fs;
820             }
821         }
822     }
823 
824     return FDB_RESULT_SUCCESS;
825 }
826 
wal_flush(struct filemgr *file, void *dbhandle, wal_flush_func *flush_func, wal_get_old_offset_func *get_old_offset, struct avl_tree *flush_items)827 fdb_status wal_flush(struct filemgr *file,
828                      void *dbhandle,
829                      wal_flush_func *flush_func,
830                      wal_get_old_offset_func *get_old_offset,
831                      struct avl_tree *flush_items)
832 {
833     return _wal_flush(file, dbhandle, flush_func, get_old_offset,
834                       flush_items, false);
835 }
836 
wal_flush_by_compactor(struct filemgr *file, void *dbhandle, wal_flush_func *flush_func, wal_get_old_offset_func *get_old_offset, struct avl_tree *flush_items)837 fdb_status wal_flush_by_compactor(struct filemgr *file,
838                                   void *dbhandle,
839                                   wal_flush_func *flush_func,
840                                   wal_get_old_offset_func *get_old_offset,
841                                   struct avl_tree *flush_items)
842 {
843     return _wal_flush(file, dbhandle, flush_func, get_old_offset,
844                       flush_items, true);
845 }
846 
847 // Used to copy all the WAL items for non-durable snapshots
wal_snapshot(struct filemgr *file, void *dbhandle, fdb_txn *txn, fdb_seqnum_t *upto_seq, wal_snapshot_func *snapshot_func)848 fdb_status wal_snapshot(struct filemgr *file,
849                         void *dbhandle, fdb_txn *txn,
850                         fdb_seqnum_t *upto_seq,
851                         wal_snapshot_func *snapshot_func)
852 {
853     struct list_elem *e, *ee;
854     struct wal_item *item;
855     struct wal_item_header *header;
856     fdb_seqnum_t copy_upto = *upto_seq;
857     fdb_seqnum_t copied_seq = 0;
858     fdb_doc doc;
859     size_t i = 0;
860     size_t num_shards = file->wal->num_shards;
861 
862     for (; i < num_shards; ++i) {
863         spin_lock(&file->wal->key_shards[i].lock);
864         e = list_begin(&file->wal->key_shards[i].list);
865         while (e) {
866             header = _get_entry(e, struct wal_item_header, list_elem);
867             ee = list_begin(&header->items);
868             while (ee) {
869                 item = _get_entry(ee, struct wal_item, list_elem);
870                 if (item->flag & WAL_ITEM_BY_COMPACTOR) { // Always skip
871                     ee = list_next(ee); // items moved by compactor to prevent
872                     continue; // duplication of items in WAL & Main-index
873                 }
874                 if (copy_upto != FDB_SNAPSHOT_INMEM) {
875                     // Take stable snapshot in new_file: Skip all items that are...
876                     if (copy_upto < item->seqnum || // higher than requested seqnum
877                         !(item->flag & WAL_ITEM_COMMITTED)) { // or uncommitted
878                         ee = list_next(ee);
879                         continue;
880                     }
881                 } else { // An in-memory snapshot in current file..
882                     // Skip any uncommitted item, if not part of either global or
883                     // the current transaction
884                     if (!(item->flag & WAL_ITEM_COMMITTED) &&
885                         item->txn != &file->global_txn &&
886                         item->txn != txn) {
887                         ee = list_next(ee);
888                         continue;
889                     }
890                 }
891 
892                 doc.keylen = item->header->keylen;
893                 doc.key = malloc(doc.keylen); // (freed in fdb_snapshot_close)
894                 memcpy(doc.key, item->header->key, doc.keylen);
895                 doc.seqnum = item->seqnum;
896                 doc.deleted = (item->action == WAL_ACT_LOGICAL_REMOVE ||
897                                item->action == WAL_ACT_REMOVE);
898                 snapshot_func(dbhandle, &doc, item->offset);
899                 if (doc.seqnum > copied_seq) {
900                     copied_seq = doc.seqnum;
901                 }
902                 break; // We just require a single latest copy in the snapshot
903             }
904             e = list_next(e);
905         }
906         spin_unlock(&file->wal->key_shards[i].lock);
907     }
908 
909     *upto_seq = copied_seq; // Return to caller the highest copied seqnum
910     return FDB_RESULT_SUCCESS;
911 }
912 
913 // discard entries in txn
wal_discard(struct filemgr *file, fdb_txn *txn)914 fdb_status wal_discard(struct filemgr *file, fdb_txn *txn)
915 {
916     struct wal_item *item;
917     struct list_elem *e;
918     size_t shard_num, seq_shard_num;
919 
920     e = list_begin(txn->items);
921     while(e) {
922         item = _get_entry(e, struct wal_item, list_elem_txn);
923         shard_num = chksum((uint8_t*)item->header->key, item->header->keylen) %
924             file->wal->num_shards;
925         spin_lock(&file->wal->key_shards[shard_num].lock);
926 
927         // remove from seq hash table
928         seq_shard_num = item->seqnum % file->wal->num_shards;
929         spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
930         hash_remove(&file->wal->seq_shards[seq_shard_num].hash_byseq,
931                     &item->he_seq);
932         spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
933 
934         // remove from header's list
935         list_remove(&item->header->items, &item->list_elem);
936         // remove header if empty
937         if (list_begin(&item->header->items) == NULL) {
938             //remove from key hash table
939             hash_remove(&file->wal->key_shards[shard_num].hash_bykey,
940                         &item->header->he_key);
941             // remove from wal list
942             list_remove(&file->wal->key_shards[shard_num].list,
943                         &item->header->list_elem);
944             // free key and header
945             free(item->header->key);
946             free(item->header);
947         }
948         // remove from txn's list
949         e = list_remove(txn->items, e);
950         if (item->txn == &file->global_txn ||
951             item->flag & WAL_ITEM_COMMITTED) {
952             atomic_decr_uint32_t(&file->wal->num_flushable);
953         }
954         if (item->action != WAL_ACT_REMOVE) {
955             atomic_sub_uint64_t(&file->wal->datasize, item->doc_size);
956         }
957         // free
958         free(item);
959         atomic_decr_uint32_t(&file->wal->size);
960         spin_unlock(&file->wal->key_shards[shard_num].lock);
961     }
962 
963     return FDB_RESULT_SUCCESS;
964 }
965 
// Selects which WAL items _wal_close() discards.
typedef enum wal_discard_type {
    // discard only items that are not flagged WAL_ITEM_COMMITTED
    WAL_DISCARD_UNCOMMITTED_ONLY,
    // discard every item, committed or not
    WAL_DISCARD_ALL,
    // discard every item whose key carries the KV store ID passed via 'aux'
    WAL_DISCARD_KV_INS,
} wal_discard_t;
971 
972 // discard all entries
_wal_close(struct filemgr *file, wal_discard_t type, void *aux)973 static fdb_status _wal_close(struct filemgr *file,
974                              wal_discard_t type, void *aux)
975 {
976     struct wal_item *item;
977     struct wal_item_header *header;
978     struct list_elem *e1, *e2;
979     fdb_kvs_id_t kv_id, kv_id_req;
980     bool committed;
981     wal_item_action committed_item_action;
982     size_t i = 0, seq_shard_num;
983     size_t num_all_shards = wal_get_num_all_shards(file);
984 
985     if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
986         if (aux == NULL) { // aux must contain pointer to KV ID
987             return FDB_RESULT_INVALID_ARGS;
988         }
989         kv_id_req = *(fdb_kvs_id_t*)aux;
990     }
991 
992     for (; i < num_all_shards; ++i) {
993         spin_lock(&file->wal->key_shards[i].lock);
994         e1 = list_begin(&file->wal->key_shards[i].list);
995         while (e1) {
996             header = _get_entry(e1, struct wal_item_header, list_elem);
997             if (type == WAL_DISCARD_KV_INS) { // multi KV ins mode
998                 buf2kvid(header->chunksize, header->key, &kv_id);
999                 // begin while loop only on matching KV ID
1000                 e2 = (kv_id == kv_id_req)?(list_begin(&header->items)):(NULL);
1001             } else {
1002                 kv_id = 0;
1003                 e2 = list_begin(&header->items);
1004             }
1005 
1006             committed = false;
1007             while (e2) {
1008                 item = _get_entry(e2, struct wal_item, list_elem);
1009                 if ( type == WAL_DISCARD_ALL ||
1010                      (type == WAL_DISCARD_UNCOMMITTED_ONLY &&
1011                       !(item->flag & WAL_ITEM_COMMITTED)) ||
1012                      type == WAL_DISCARD_KV_INS) {
1013                     // remove from header's list
1014                     e2 = list_remove(&header->items, e2);
1015                     if (!(item->flag & WAL_ITEM_COMMITTED)) {
1016                         // and also remove from transaction's list
1017                         list_remove(item->txn->items, &item->list_elem_txn);
1018                     } else {
1019                         // committed item exists and will be removed
1020                         committed = true;
1021                         committed_item_action = item->action;
1022                     }
1023                     // remove from seq hash table
1024                     if (item->flag & WAL_ITEM_BY_COMPACTOR) {
1025                         seq_shard_num = file->wal->num_shards;
1026                     } else {
1027                         seq_shard_num = item->seqnum % file->wal->num_shards;
1028                     }
1029                     spin_lock(&file->wal->seq_shards[seq_shard_num].lock);
1030                     hash_remove(&file->wal->seq_shards[seq_shard_num].hash_byseq,
1031                                 &item->he_seq);
1032                     spin_unlock(&file->wal->seq_shards[seq_shard_num].lock);
1033 
1034                     if (item->action != WAL_ACT_REMOVE) {
1035                         atomic_sub_uint64_t(&file->wal->datasize, item->doc_size);
1036                     }
1037                     if (item->txn == &file->global_txn) {
1038                         atomic_decr_uint32_t(&file->wal->num_flushable);
1039                     }
1040                     free(item);
1041                     atomic_decr_uint32_t(&file->wal->size);
1042                 } else {
1043                     e2 = list_next(e2);
1044                 }
1045             }
1046             e1 = list_next(e1);
1047 
1048             if (list_begin(&header->items) == NULL) {
1049                 // wal_item_header becomes empty
1050                 // free header and remove from hash table & wal list
1051                 list_remove(&file->wal->key_shards[i].list, &header->list_elem);
1052                 hash_remove(&file->wal->key_shards[i].hash_bykey, &header->he_key);
1053                 free(header->key);
1054                 free(header);
1055 
1056                 if (committed) {
1057                     // this document was committed
1058                     // num_docs and num_deletes should be updated
1059                     if (committed_item_action == WAL_ACT_LOGICAL_REMOVE ||
1060                         committed_item_action == WAL_ACT_REMOVE) {
1061                         _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDELETES, -1);
1062                     }
1063                     _kvs_stat_update_attr(file, kv_id, KVS_STAT_WAL_NDOCS, -1);
1064                 }
1065             }
1066         }
1067         spin_unlock(&file->wal->key_shards[i].lock);
1068     }
1069 
1070     return FDB_RESULT_SUCCESS;
1071 }
1072 
wal_close(struct filemgr *file)1073 fdb_status wal_close(struct filemgr *file)
1074 {
1075     return _wal_close(file, WAL_DISCARD_UNCOMMITTED_ONLY, NULL);
1076 }
1077 
1078 // discard all WAL entries
wal_shutdown(struct filemgr *file)1079 fdb_status wal_shutdown(struct filemgr *file)
1080 {
1081     fdb_status wr = _wal_close(file, WAL_DISCARD_ALL, NULL);
1082     atomic_store_uint32_t(&file->wal->size, 0);
1083     atomic_store_uint32_t(&file->wal->num_flushable, 0);
1084     atomic_store_uint64_t(&file->wal->datasize, 0);
1085     return wr;
1086 }
1087 
1088 // discard all WAL entries belonging to KV_ID
wal_close_kv_ins(struct filemgr *file, fdb_kvs_id_t kv_id)1089 fdb_status wal_close_kv_ins(struct filemgr *file,
1090                             fdb_kvs_id_t kv_id)
1091 {
1092     return _wal_close(file, WAL_DISCARD_KV_INS, &kv_id);
1093 }
1094 
wal_get_size(struct filemgr *file)1095 size_t wal_get_size(struct filemgr *file)
1096 {
1097     return atomic_get_uint32_t(&file->wal->size);
1098 }
1099 
wal_get_num_all_shards(struct filemgr *file)1100 size_t wal_get_num_all_shards(struct filemgr *file)
1101 {
1102     // normal shards (shard[0] ~ shard[num_shard-1]) +
1103     // special shard (shard[num_shard]) for compactor
1104     return file->wal->num_shards + 1;
1105 }
1106 
wal_get_num_flushable(struct filemgr *file)1107 size_t wal_get_num_flushable(struct filemgr *file)
1108 {
1109     return atomic_get_uint32_t(&file->wal->num_flushable);
1110 }
1111 
wal_get_num_docs(struct filemgr *file)1112 size_t wal_get_num_docs(struct filemgr *file) {
1113     return _kvs_stat_get_sum(file, KVS_STAT_WAL_NDOCS);
1114 }
1115 
wal_get_num_deletes(struct filemgr *file)1116 size_t wal_get_num_deletes(struct filemgr *file) {
1117     return _kvs_stat_get_sum(file, KVS_STAT_WAL_NDELETES);
1118 }
1119 
wal_get_datasize(struct filemgr *file)1120 size_t wal_get_datasize(struct filemgr *file)
1121 {
1122     return atomic_get_uint64_t(&file->wal->datasize);
1123 }
1124 
// Store 'status' into the WAL's 'wal_dirty' field while holding the WAL lock.
void wal_set_dirty_status(struct filemgr *file, wal_dirty_t status)
{
    spin_lock(&file->wal->lock);
    file->wal->wal_dirty = status;
    spin_unlock(&file->wal->lock);
}
1131 
wal_get_dirty_status(struct filemgr *file)1132 wal_dirty_t wal_get_dirty_status(struct filemgr *file)
1133 {
1134     wal_dirty_t ret;
1135     spin_lock(&file->wal->lock);
1136     ret = file->wal->wal_dirty;
1137     spin_unlock(&file->wal->lock);
1138     return ret;
1139 }
1140 
// Link the transaction's wrapper at the front of the WAL's transaction list,
// under the WAL lock.
void wal_add_transaction(struct filemgr *file, fdb_txn *txn)
{
    struct list_elem *link;

    spin_lock(&file->wal->lock);
    link = &txn->wrapper->le;
    list_push_front(&file->wal->txn_list, link);
    spin_unlock(&file->wal->lock);
}
1147 
// Unlink the transaction's wrapper from the WAL's transaction list,
// under the WAL lock.
void wal_remove_transaction(struct filemgr *file, fdb_txn *txn)
{
    struct list_elem *link;

    spin_lock(&file->wal->lock);
    link = &txn->wrapper->le;
    list_remove(&file->wal->txn_list, link);
    spin_unlock(&file->wal->lock);
}
1154 
// Among the registered transactions other than 'cur_txn' that currently hold
// at least one item, return the one with the smallest 'prev_hdr_bid'.
// Returns NULL when no such transaction exists. The transaction list is
// walked under the WAL lock.
fdb_txn * wal_earliest_txn(struct filemgr *file, fdb_txn *cur_txn)
{
    fdb_txn *earliest = NULL;
    bid_t lowest_bid = BLK_NOT_FOUND;
    struct list_elem *cursor;

    spin_lock(&file->wal->lock);

    for (cursor = list_begin(&file->wal->txn_list);
         cursor;
         cursor = list_next(cursor)) {
        struct wal_txn_wrapper *wrapper =
            _get_entry(cursor, struct wal_txn_wrapper, le);
        fdb_txn *candidate = wrapper->txn;

        // ignore the caller's own transaction and transactions without items
        if (candidate == cur_txn || !list_begin(candidate->items)) {
            continue;
        }

        // first candidate seen, or one with a smaller header block ID
        if (lowest_bid == BLK_NOT_FOUND ||
            candidate->prev_hdr_bid < lowest_bid) {
            lowest_bid = candidate->prev_hdr_bid;
            earliest = candidate;
        }
    }

    spin_unlock(&file->wal->lock);

    return earliest;
}
1181 
wal_txn_exists(struct filemgr *file)1182 bool wal_txn_exists(struct filemgr *file)
1183 {
1184     struct list_elem *le;
1185     struct wal_txn_wrapper *txn_wrapper;
1186     fdb_txn *txn;
1187 
1188     spin_lock(&file->wal->lock);
1189 
1190     le = list_begin(&file->wal->txn_list);
1191     while(le) {
1192         txn_wrapper = _get_entry(le, struct wal_txn_wrapper, le);
1193         txn = txn_wrapper->txn;
1194         if (txn != &file->global_txn) {
1195             spin_unlock(&file->wal->lock);
1196             return true;
1197         }
1198         le = list_next(le);
1199     }
1200     spin_unlock(&file->wal->lock);
1201 
1202     return false;
1203 }
1204