xref: /6.0.3/forestdb/tests/e2e/e2etest.cc (revision 506df2a9)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#include <errno.h>
24#if !defined(WIN32) && !defined(_WIN32)
25#include <unistd.h>
26#endif
27
28#include "filemgr_ops.h"
29
30#include "libforestdb/forestdb.h"
31#include "test.h"
32#include "e2espec.h"
33
34void load_persons(storage_t *st) {
35    int i, n=100;
36    person_t p;
37
38    // store and index person docs
39    for (i = 0; i < n; ++i) {
40        gen_person(&p);
41        e2e_fdb_set_person(st, &p);
42    }
43
44#ifdef __DEBUG_E2E
45    printf("[%s] load persons: %03d docs created\n",st->keyspace, n);
46#endif
47}
48
49void delete_persons(storage_t *st) {
50    TEST_INIT();
51
52    fdb_doc *rdoc = NULL;
53    fdb_status status;
54    fdb_iterator *it;
55    person_t *p;
56    int i, n = 0;
57
58    // delete every 5th doc
59    status = fdb_iterator_sequence_init(st->all_docs, &it, 0, 0,
60                                        FDB_ITR_NO_DELETES);
61    TEST_CHK(status == FDB_RESULT_SUCCESS);
62
63    i = 0;
64    do {
65        status = fdb_iterator_get(it, &rdoc);
66        TEST_CHK (status == FDB_RESULT_SUCCESS);
67        if ((i % 5) == 0) {
68
69            p = (person_t *)rdoc->body;
70            // ensure the requester has created this key
71            if (strcmp(p->keyspace, st->keyspace) == 0) {
72                e2e_fdb_del_person(st, p);
73                n++;
74            }
75
76        }
77        fdb_doc_free(rdoc);
78        rdoc=NULL;
79        i++;
80    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
81
82    fdb_iterator_close(it);
83
84#ifdef __DEBUG_E2E
85    sprintf(rbuf, "[%s] delete persons: %03d docs deleted\n",st->keyspace, n);
86    TEST_RESULT(rbuf);
87#endif
88}
89
90/*
91 * reset params used to index storage
92 * delete old docs that are part of new index
93 * so that they are not included at verification time
94 */
95void update_index(storage_t *st, bool checkpointing) {
96
97    TEST_INIT();
98
99    fdb_iterator *it;
100    person_t *p = NULL;
101    fdb_doc *rdoc = NULL;
102    fdb_status status;
103    int n = 0;
104    size_t vallen;
105    char *mink = st->index_params->min;
106    char *maxk = st->index_params->max;
107    char rbuf[256];
108
109    // change storage index range
110    reset_storage_index(st);
111
112    if (checkpointing) {
113        start_checkpoint(st);
114    }
115    status = fdb_iterator_init(st->index1, &it, mink, 12,
116                               maxk, 12, FDB_ITR_NO_DELETES);
117    if (status != FDB_RESULT_SUCCESS) {
118        // no items within min max range
119        TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
120    }
121
122
123
124    do {
125        status = fdb_iterator_get(it, &rdoc);
126        if (status == FDB_RESULT_SUCCESS) {
127            status = fdb_get_kv(st->all_docs,
128                                rdoc->body, rdoc->bodylen,
129                                (void **)&p, &vallen);
130            if (status == FDB_RESULT_SUCCESS) {
131                if (strcmp(p->keyspace, st->keyspace) == 0) {
132                    e2e_fdb_del_person(st, p);
133                    n++;
134                }
135                free(p);
136                p=NULL;
137            }
138            fdb_doc_free(rdoc);
139            rdoc=NULL;
140        }
141    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
142
143    if (checkpointing) {
144        end_checkpoint(st);
145    }
146
147    fdb_iterator_close(it);
148
149    // reset verification chkpoint
150    st->v_chk->num_indexed = 0;
151    st->v_chk->sum_age_indexed = 0;
152
153    sprintf(rbuf, "update index: %03d docs deleted", n);
154#ifdef __DEBUG_E2E
155    TEST_RESULT(rbuf);
156#endif
157}
158
159// --- verify ----
160// 1. check that num of keys within index are correct
161// 2. check that age index total is correct for specified range
162// 3. check that doc count is as expected
163void verify_db(storage_t *st) {
164
165    TEST_INIT();
166
167    checkpoint_t *db_checkpoint = create_checkpoint(st, END_CHECKPOINT);
168    int db_ndocs = db_checkpoint->ndocs;
169    int exp_ndocs = st->v_chk->ndocs;
170    int exp_nidx = st->v_chk->num_indexed;
171    int db_nidx = db_checkpoint->num_indexed;
172    int db_suma = db_checkpoint->sum_age_indexed;
173    int exp_suma = st->v_chk->sum_age_indexed;
174    fdb_kvs_info info;
175    char rbuf[256];
176
177    e2e_fdb_commit(st->main, st->walflush);
178
179    fdb_get_kvs_info(st->index1, &info);
180
181#ifdef __DEBUG_E2E
182    int val1, val2;
183    fdb_iterator *it;
184    fdb_doc *rdoc = NULL;
185    if (db_ndocs != exp_ndocs) {
186        // for debugging: currently inaccurate for concurrency patterns
187        fdb_get_kvs_info(st->all_docs, &info);
188        val1 = info.doc_count;
189        (void)val1;
190        val2 = 0;
191        fdb_iterator_init(st->index1, &it, NULL, 0,
192                          NULL, 0, FDB_ITR_NONE);
193        do {
194            fdb_iterator_get(it, &rdoc);
195            if (!rdoc->deleted) {
196                val2++;
197            }
198            fdb_doc_free(rdoc);
199            rdoc=NULL;
200        } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
201        printf("ndocs_debug: kvs_info(%d) == exp_ndocs(%d) ?\n", val1,exp_ndocs);
202        printf("ndocs_debug: kvs_info(%d) == itr_count(%d) ?\n", val1, val2);
203        fdb_iterator_close(it);
204    }
205    printf("[%s] db_ndix(%d) == exp_nidx(%d)\n", st->keyspace, db_nidx, exp_nidx);
206#endif
207
208    free(db_checkpoint);
209    db_checkpoint=NULL;
210    //TEST_CHK(db_nidx==exp_nidx);
211    //TEST_CHK(db_suma==exp_suma);
212
213    sprintf(rbuf, "[%s] verifydb: ndocs(%d=%d), nidx(%d=%d), sumage(%d=%d)\n",
214            st->keyspace,
215            db_ndocs, exp_ndocs,
216            db_nidx, exp_nidx,
217            db_suma, exp_suma);
218#ifdef __DEBUG_E2E
219    TEST_RESULT(rbuf);
220#endif
221}
222
223
224/*
225 * compares a db where src is typically from
226 * a rollback state of live data and replay is a
227 * db to use for comparison  of expected data
228 */
229void db_compare(fdb_kvs_handle *src, fdb_kvs_handle *replay) {
230
231    TEST_INIT();
232
233    int ndoc1, ndoc2;
234    fdb_kvs_info info;
235    fdb_iterator *it;
236    fdb_doc *rdoc = NULL;
237    fdb_doc *vdoc = NULL;
238    fdb_status status;
239    char rbuf[256];
240
241    fdb_get_kvs_info(src, &info);
242    ndoc1 = info.doc_count;
243    fdb_get_kvs_info(replay, &info);
244    ndoc2 = info.doc_count;
245
246    TEST_CHK(ndoc1 == ndoc2);
247
248    // all docs in replay db must be in source db with same status
249    status = fdb_iterator_sequence_init(replay, &it, 0, 0, FDB_ITR_NONE);
250    TEST_CHK(status == FDB_RESULT_SUCCESS);
251    do {
252        status = fdb_iterator_get_metaonly(it, &rdoc);
253        TEST_CHK(status == FDB_RESULT_SUCCESS);
254
255        fdb_doc_create(&vdoc, rdoc->key, rdoc->keylen,
256                              rdoc->meta, rdoc->metalen,
257                              rdoc->body, rdoc->bodylen);
258        // lookup by key
259        status = fdb_get(src, vdoc);
260
261        if (rdoc->deleted) {
262            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
263        } else {
264            TEST_CHK(status == FDB_RESULT_SUCCESS);
265        }
266
267        fdb_doc_free(rdoc);
268        fdb_doc_free(vdoc);
269        rdoc=NULL;
270        vdoc=NULL;
271
272    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
273    fdb_iterator_close(it);
274
275    sprintf(rbuf, "db compare: src(%d) == replay(%d)", ndoc1, ndoc2);
276#ifdef __DEBUG_E2E
277    TEST_RESULT(rbuf);
278#endif
279}
280
281/*
282 * populate replay db up to specified seqnum
283 */
284void load_replay_kvs(storage_t *st, fdb_kvs_handle *replay_kvs, fdb_seqnum_t seqnum) {
285
286    TEST_INIT();
287
288    fdb_iterator *it;
289    fdb_doc *rdoc = NULL;
290    fdb_status status;
291    transaction_t *tx;
292
293
294    // iterator end at seqnum
295    status = fdb_iterator_sequence_init(st->rtx, &it, 0, seqnum,
296                                        FDB_ITR_NONE);
297    TEST_CHK(status == FDB_RESULT_SUCCESS);
298
299    do {
300        status = fdb_iterator_get(it, &rdoc);
301        TEST_CHK(status == FDB_RESULT_SUCCESS);
302
303        tx = (transaction_t *)rdoc->body;
304        if (tx->type == SET_PERSON) {
305            status = fdb_set_kv(replay_kvs,
306                                tx->refkey,
307                                tx->refkey_len,
308                                NULL,0);
309            TEST_CHK(status == FDB_RESULT_SUCCESS);
310        }
311        if (tx->type == DEL_PERSON) {
312            status = fdb_del_kv(replay_kvs,
313                                tx->refkey,
314                                tx->refkey_len);
315            TEST_CHK(status == FDB_RESULT_SUCCESS);
316        }
317        fdb_doc_free(rdoc);
318        rdoc=NULL;
319
320    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
321    fdb_iterator_close(it);
322
323}
324
325/*
326 * replay records db up to a checkpoint into another db
327 * rollback all_docs to that checkpoint
328 * compare all_docs at that state to new doc
329 */
330void replay(storage_t *st) {
331    TEST_INIT();
332
333    int i;
334    size_t v;
335    char kvsbuf[10];
336    fdb_file_handle *dbfile;
337    fdb_kvs_handle *replay_kvs;
338    fdb_config fconfig = fdb_get_default_config();
339    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
340    fconfig.wal_threshold = 1024;
341    fconfig.flags = FDB_OPEN_FLAG_CREATE;
342    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
343    fconfig.compaction_threshold = 10;
344    fdb_iterator *it;
345    fdb_status status;
346    fdb_doc *rdoc = NULL;
347    fdb_kvs_info info;
348    transaction_t *tx;
349    checkpoint_t *chk;
350    fdb_seqnum_t rollback_seqnum;
351
352    // create replay kvs
353    status = fdb_open(&dbfile, E2EDB_RECORDS, &fconfig);
354    TEST_CHK(status == FDB_RESULT_SUCCESS);
355
356    e2e_fdb_commit(st->main, st->walflush);
357    status = fdb_get_kvs_info(st->all_docs, &info);
358    TEST_CHK(status == FDB_RESULT_SUCCESS);
359
360
361    // iterate over records kv and replay transactions
362    status = fdb_iterator_sequence_init(st->rtx, &it, 0, 0, FDB_ITR_NONE);
363    TEST_CHK(status == FDB_RESULT_SUCCESS);
364
365    // seek to end so we can reverse iterate
366    // seq iterators cannot seek
367    do {
368        ;
369    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
370
371
372    // reverse iterate from highest to lowest checkpoint
373    i=0;
374    while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
375        status = fdb_iterator_get(it, &rdoc);
376        TEST_CHK(status == FDB_RESULT_SUCCESS);
377        tx = (transaction_t *)rdoc->body;
378        if (tx->type == END_CHECKPOINT) {
379
380            sprintf(kvsbuf, "rkvs%d", i);
381            status = fdb_kvs_open(dbfile, &replay_kvs, kvsbuf,  &kvs_config);
382            TEST_CHK(status == FDB_RESULT_SUCCESS);
383
384            // load replay db up to this seqnum
385            load_replay_kvs(st, replay_kvs, rdoc->seqnum);
386
387            // get checkpoint doc for rollback
388            status = fdb_get_kv(st->chk, tx->refkey, tx->refkey_len,
389                                (void **)&chk, &v);
390
391            TEST_CHK(status == FDB_RESULT_SUCCESS);
392            rollback_seqnum = chk->seqnum_all;
393;
394#ifdef __DEBUG_E2E
395            printf("rollback to %llu\n", chk->seqnum_all);
396#endif
397            status = fdb_rollback(&st->all_docs, rollback_seqnum);
398            TEST_CHK(status == FDB_RESULT_SUCCESS);
399            free(chk);
400            chk=NULL;
401            // after rollback, WAL entries should be flushed for
402            // accurate # docs count comparison with 'replay_kvs'.
403            e2e_fdb_commit(st->main, true);
404
405            status = fdb_get_kvs_info(st->rtx, &info);
406            TEST_CHK(status == FDB_RESULT_SUCCESS);
407
408            // compare rollback and replay db
409            e2e_fdb_commit(dbfile, st->walflush);
410            db_compare(st->all_docs, replay_kvs);
411
412            // drop replay kvs
413            status = fdb_kvs_close(replay_kvs);
414            TEST_CHK(status == FDB_RESULT_SUCCESS);
415            status = fdb_kvs_remove(dbfile, kvsbuf);
416            TEST_CHK(status == FDB_RESULT_SUCCESS);
417            i++;
418        }
419        fdb_doc_free(rdoc);
420        rdoc=NULL;
421    }
422
423    fdb_iterator_close(it);
424    fdb_close(dbfile);
425
426}
427
428/* do forward previous and seek operations on main kv */
429void *iterate_thread(void *args) {
430
431    TEST_INIT();
432
433    int i, j;
434    fdb_config *fconfig = (fdb_config *)args;
435    fdb_file_handle *dbfile;
436    fdb_iterator *it;
437    fdb_doc *rdoc = NULL;
438    fdb_open(&dbfile, E2EDB_MAIN, fconfig);
439    fdb_kvs_handle *all_docs, *snap_db;
440    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
441    fdb_status status;
442    person_t p;
443
444    for (i = 0; i < 50; i++) {
445        fdb_kvs_open(dbfile, &all_docs, E2EKV_ALLDOCS,  &kvs_config);
446
447        if ((i % 2) == 0) { //snapshot
448            status = fdb_snapshot_open(all_docs, &snap_db, FDB_SNAPSHOT_INMEM);
449            TEST_CHK(status == FDB_RESULT_SUCCESS);
450            status = fdb_iterator_init(snap_db, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
451            TEST_CHK(status == FDB_RESULT_SUCCESS);
452        } else {
453            status = fdb_iterator_init(all_docs, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
454            TEST_CHK(status == FDB_RESULT_SUCCESS);
455        }
456        j = 0;
457        // forward
458        do {
459            status = fdb_iterator_get(it, &rdoc);
460            if (j) {
461                TEST_CHK(status == FDB_RESULT_SUCCESS);
462            }
463            fdb_doc_free(rdoc);
464            rdoc = NULL;
465            j++;
466        } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
467        // reverse and seek ahead
468        for (j = 0; j < 10; j++) {
469            while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
470                status = fdb_iterator_get(it, &rdoc);
471                TEST_CHK(status == FDB_RESULT_SUCCESS);
472                fdb_doc_free(rdoc);
473                rdoc = NULL;
474            }
475            gen_person(&p);
476
477            // seek using random person key
478            if (j % 2 == 0) { // seek higher
479                fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_HIGHER);
480            } else {
481                fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_LOWER);
482            }
483        }
484        fdb_iterator_close(it);
485        if ((i % 2) == 0) {
486            fdb_kvs_close(snap_db);
487        }
488        fdb_kvs_close(all_docs);
489    }
490
491    fdb_doc_free(rdoc);
492    rdoc = NULL;
493    fdb_close(dbfile);
494    return NULL;
495}
496
497void *compact_thread(void *args) {
498    int i;
499    fdb_config *fconfig = (fdb_config *)args;
500    fdb_file_handle *dbfile;
501    fdb_open(&dbfile, E2EDB_MAIN, fconfig);
502    for (i = 0; i < 3; ++i) {
503        sleep(2);
504#ifdef __DEBUG_E2E
505        printf("compact: %d\n", i);
506#endif
507        fdb_compact(dbfile, NULL);
508    }
509    fdb_close(dbfile);
510    return NULL;
511}
512
513void *compact_upto_thread(void *args) {
514    TEST_INIT();
515
516    int i;
517    uint64_t num_markers;
518    fdb_snapshot_info_t *markers;
519    fdb_status status;
520
521    fdb_config *fconfig = (fdb_config *)args;
522    fdb_file_handle *dbfile;
523    fdb_open(&dbfile, E2EDB_MAIN, fconfig);
524    for (i = 0; i < 10; ++i) {
525        sleep(1);
526        status = fdb_get_all_snap_markers(dbfile, &markers,
527                                          &num_markers);
528        TEST_CHK(status == FDB_RESULT_SUCCESS);
529        if (num_markers == 0) {
530            // No need of compaction as file is empty
531            break;
532        }
533        status = fdb_compact_upto(dbfile, NULL, markers[0].marker);
534        TEST_CHK(status == FDB_RESULT_SUCCESS);
535        fdb_free_snap_markers(markers, num_markers);
536    }
537    fdb_close(dbfile);
538    return NULL;
539}
540
541void e2e_async_compact_pattern(int n_checkpoints, fdb_config fconfig,
542                               bool deletes, bool walflush) {
543    TEST_INIT();
544    int n, i;
545    storage_t *st;
546    checkpoint_t verification_checkpoint;
547    idx_prams_t index_params;
548    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
549    fdb_kvs_info info;
550    fdb_kvs_handle *snap_db;
551    fdb_status status;
552    thread_t tid;
553    void *thread_ret;
554    n_checkpoints = n_checkpoints * LOAD_FACTOR;
555
556    memleak_start();
557
558    // init
559    rm_storage_fs();
560    gen_index_params(&index_params);
561    memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
562
563    // setup
564    st = init_storage(&fconfig, &fconfig, &kvs_config,
565            &index_params, &verification_checkpoint, walflush);
566
567    // create compaction thread
568    thread_create(&tid, compact_thread, (void*)&fconfig);
569
570    // test
571    for ( n = 0; n < n_checkpoints; ++n) {
572
573#ifdef __DEBUG_E2E
574        printf("checkpoint: %d\n", n);
575#endif
576        load_persons(st);
577        for (i=0;i<100;++i) {
578#ifdef __DEBUG_E2E
579            printf("\n\n----%d----\n", i);
580#endif
581            load_persons(st);
582            if (deletes) {
583                delete_persons(st);
584            }
585            e2e_fdb_commit(st->main, walflush);
586            status = fdb_get_kvs_info(st->all_docs, &info);
587            TEST_CHK(status == FDB_RESULT_SUCCESS);
588            status = fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
589            TEST_CHK(status == FDB_RESULT_SUCCESS);
590            update_index(st, true);
591            verify_db(st);
592            fdb_kvs_close(snap_db);
593        }
594    }
595
596    thread_join(tid, &thread_ret);
597    // teardown
598    e2e_fdb_shutdown(st);
599
600    memleak_end();
601}
602
603void e2e_kvs_index_pattern(int n_checkpoints, fdb_config fconfig,
604                           bool deletes, bool walflush) {
605
606    int n, i;
607    storage_t *st;
608    checkpoint_t verification_checkpoint;
609    idx_prams_t index_params;
610    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
611    n_checkpoints = n_checkpoints * LOAD_FACTOR;
612
613    memleak_start();
614
615    // init
616    rm_storage_fs();
617    gen_index_params(&index_params);
618    memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
619
620    // setup
621    st = init_storage(&fconfig, &fconfig, &kvs_config,
622                      &index_params, &verification_checkpoint, walflush);
623
624    // test
625    for (n = 0; n < n_checkpoints; ++n) {
626
627        // checkpoint
628        start_checkpoint(st);
629
630        for (i = 0; i < 100; ++i) {
631#ifdef __DEBUG_E2E
632            printf("\n\n----%d----\n", i);
633#endif
634            load_persons(st);
635            if (deletes) {
636                delete_persons(st);
637            }
638            verify_db(st);
639        }
640
641        // end checkpoint
642        end_checkpoint(st);
643
644        // change index range
645        update_index(st, true);
646        verify_db(st);
647
648    }
649
650    if (fconfig.compaction_mode != FDB_COMPACTION_AUTO) {
651        /* replay involves rollbacks but
652         * cannot rollback pre compact due to BUG: MB-13130
653         */
654        replay(st);
655    }
656
657    // teardown
658    e2e_fdb_shutdown(st);
659
660    memleak_end();
661}
662
663void *scan_thread(void *args) {
664    int i = 0;
665    fdb_kvs_handle *scan_kv = (fdb_kvs_handle *)args;
666
667    for (i = 0; i < 20; ++i) {
668        scan(NULL, scan_kv);
669    }
670    return NULL;
671}
672
673void *disk_read_thread(void *args) {
674    TEST_INIT();
675    int n;
676    uint64_t i;
677    int seqno;
678    uint64_t num_markers;
679    fdb_snapshot_info_t *markers;
680    fdb_kvs_handle *snap_db;
681    fdb_status status;
682    fdb_doc *doc;
683    person_t p;
684    fdb_doc_create(&doc, NULL, 0, NULL, 0, NULL, 0);
685
686    storage_t *st = (storage_t *)args;
687
688    for (n = 0; n < 10; ++n) {
689        status = fdb_get_all_snap_markers(st->main, &markers,
690                                          &num_markers);
691        TEST_CHK(status == FDB_RESULT_SUCCESS);
692        for (i = 0; i < num_markers; i++) {
693            // open disk snapshot to each marker
694            seqno = markers[i].kvs_markers[0].seqnum;
695            if (!seqno) {
696                continue;
697            }
698            status = fdb_snapshot_open(st->all_docs, &snap_db, seqno);
699            if (status == FDB_RESULT_SUCCESS) {
700                doc->seqnum = seqno;
701                // can get doc
702                status = fdb_get_byseq(snap_db, doc);
703                TEST_CHK(status == FDB_RESULT_SUCCESS);
704
705                // verify
706                memcpy(&p, (person_t *)doc->body, sizeof(person_t));
707                TEST_CHK(p.age <= seqno)
708            }
709        }
710        fdb_free_snap_markers(markers, num_markers);
711    }
712
713    fdb_doc_free(doc);
714    return NULL;
715}
716
717
718void *writer_thread(void *args) {
719
720    storage_t *st = (storage_t *)args;
721    for (int i = 0; i < 5; ++i) {
722        load_persons(st);
723        if (i == 5) {
724            delete_persons(st);
725        }
726    }
727    return NULL;
728}
729
730void *seq_writer_thread(void *args) {
731
732    int j, i = 0, n=10000;
733    person_t p;
734    storage_t *st = (storage_t *)args;
735    for (j = 0; j < 10; ++j) {
736        for (i = 0; i < n; ++i) {
737            gen_person(&p);
738            p.age = i;
739            sprintf(p.key, "person%d", i);
740            e2e_fdb_set_person(st, &p);
741        }
742        e2e_fdb_commit(st->main, true);
743    }
744    return NULL;
745}
746
747void *update_thread(void *args) {
748    TEST_INIT();
749
750    int i = 0, n=100000;
751    person_t p;
752    fdb_kvs_handle *kv = (fdb_kvs_handle *)args;
753
754    // only generate 1 person type
755    gen_person(&p);
756    for (i = 0; i < n; ++i) {
757        fdb_set_kv(kv, p.key, strlen(p.key),
758                   p.name, strlen(p.name));
759    }
760    return NULL;
761}
762
763
764/*
765 * perform many fdb features against concurrent handlers
766 * to verify robustness of db.  this pattern is qualified by
767 * completion without faults data corrections and has relaxed error handling.
768 */
769void e2e_robust_pattern(fdb_config fconfig) {
770
771    int i;
772    int n_writers = 10;
773    int n_scanners = 10;
774    storage_t **st = alca(storage_t *, n_writers);
775    storage_t **st2 = alca(storage_t *, n_writers);
776    checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
777    idx_prams_t *index_params = alca(idx_prams_t, n_writers);
778    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
779    thread_t *tid_sc = alca(thread_t, n_scanners);
780    void **thread_ret_sc = alca(void *, n_scanners);
781    thread_t *tid_wr = alca(thread_t, n_writers);
782    void **thread_ret_wr = alca(void *, n_writers);
783    fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
784    thread_t c_tid, i_tid;
785    void *c_ret, *i_ret;
786
787    memleak_start();
788
789    // init
790    rm_storage_fs();
791
792    // init storage handles
793    // the nth writer is commit handler
794    for (i = 0;i < n_writers; ++i) {
795        gen_index_params(&index_params[i]);
796        memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
797        st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
798                &index_params[i], &verification_checkpoint[i], true);
799        st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
800                &index_params[i], &verification_checkpoint[i], true);
801        memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
802    }
803
804    // load init data
805    for (i = 0; i < 100; ++i) {
806        load_persons(st[0]);
807    }
808
809    thread_create(&c_tid, compact_thread, (void*)&fconfig);
810    thread_create(&i_tid, iterate_thread, (void*)&fconfig);
811
812    for (int n = 0; n < 10; ++n) {
813        // start writer threads
814        for (i = 0; i < n_writers - 1; ++i) {
815            st[i]->verify_set = false;
816            thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
817        }
818
819
820        // start scanner threads
821        for (i = 0; i < n_scanners; ++i) {
822            scan_kv[i] = scan(st2[i], NULL);
823            thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
824        }
825
826        e2e_fdb_commit(st[n_writers-1]->main, true);
827
828        // join scan threads
829        for (i = 0; i < n_scanners; ++i) {
830            thread_join(tid_sc[i], &thread_ret_sc[i]);
831            fdb_kvs_close(scan_kv[i]);
832        }
833
834        // join writer threads
835        for (i = 0; i < n_writers - 1; ++i) {
836            thread_join(tid_wr[i], &thread_ret_wr[i]);
837            update_index(st[i], false);
838        }
839        e2e_fdb_commit(st[n_writers - 1]->main, false);
840    }
841
842    thread_join(c_tid, &c_ret);
843    thread_join(i_tid, &i_ret);
844
845    for (i = 0; i < n_writers; ++i) {
846        // teardown
847        e2e_fdb_close(st2[i]);
848        e2e_fdb_close(st[i]);
849    }
850    fdb_shutdown();
851
852    memleak_end();
853}
854
855/*
856 * concurrent scan pattern:
857 *   start n_scanners and n_writers
858 *   scanners share in-mem snapshots and writers use copies of storage_t
859 */
860void e2e_concurrent_scan_pattern(int n_checkpoints, int n_scanners, int n_writers,
861                                 fdb_config fconfig, bool walflush) {
862
863    int n, i;
864    storage_t **st = alca(storage_t *, n_writers);
865    storage_t **st2 = alca(storage_t *, n_writers);
866    checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
867    idx_prams_t *index_params = alca(idx_prams_t, n_writers);
868    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
869    thread_t *tid_sc = alca(thread_t, n_scanners);
870    void **thread_ret_sc = alca(void *, n_scanners);
871    thread_t *tid_wr = alca(thread_t, n_writers);
872    void **thread_ret_wr = alca(void *, n_writers);
873    fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
874    n_checkpoints = n_checkpoints * LOAD_FACTOR;
875
876    memleak_start();
877
878    // init
879    rm_storage_fs();
880
881
882    // init storage handles
883    for (i = 0; i < n_writers; ++i) {
884        gen_index_params(&index_params[i]);
885        memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
886        st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
887                &index_params[i], &verification_checkpoint[i], walflush);
888        st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
889                &index_params[i], &verification_checkpoint[i], walflush);
890        memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
891    }
892
893    // load init data
894    start_checkpoint(st[0]);
895    for (i = 0; i < 100; ++i) {
896        load_persons(st[0]);
897    }
898    end_checkpoint(st[0]);
899    verify_db(st[0]);
900
901    for (n = 0; n < n_checkpoints; ++n) {
902
903        // start writer threads
904        for (i = 0; i < n_writers; ++i) {
905            st[i]->verify_set = false;
906            start_checkpoint(st[i]);
907            thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
908        }
909
910        // start scanner threads
911        for (i = 0; i < n_scanners; ++i) {
912            scan_kv[i] = scan(st2[i], NULL);
913            thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
914        }
915
916        // join scan threads
917        for (i = 0; i < n_scanners; ++i) {
918            thread_join(tid_sc[i], &thread_ret_sc[i]);
919            fdb_kvs_close(scan_kv[i]);
920        }
921
922        // join writer threads
923        for (i = 0; i < n_writers; ++i) {
924            thread_join(tid_wr[i], &thread_ret_wr[i]);
925            end_checkpoint(st[i]);
926            verify_db(st[i]);
927            update_index(st[i], true);
928        }
929
930    }
931
932    for (i = 0; i < n_writers; ++i) {
933        // teardown
934        e2e_fdb_close(st2[i]);
935        e2e_fdb_close(st[i]);
936    }
937    fdb_shutdown();
938
939    memleak_end();
940}
941
942
943void e2e_index_basic_test() {
944
945    TEST_INIT();
946    memleak_start();
947
948    randomize();
949    // configure
950    fdb_config fconfig = fdb_get_default_config();
951    fconfig.wal_threshold = 1024;
952    fconfig.flags = FDB_OPEN_FLAG_CREATE;
953    fconfig.durability_opt = FDB_DRB_ASYNC;
954    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
955
956    // test
957    e2e_kvs_index_pattern(1, fconfig, true, false); // normal commit
958    e2e_kvs_index_pattern(1, fconfig, true, true);  // wal commit
959
960    memleak_end();
961    TEST_RESULT("TEST: e2e index basic test");
962}
963
964void e2e_index_walflush_test_no_deletes_auto_compact() {
965
966    TEST_INIT();
967    memleak_start();
968
969    randomize();
970    // configure
971    fdb_config fconfig = fdb_get_default_config();
972    fconfig.wal_threshold = 1024;
973    fconfig.flags = FDB_OPEN_FLAG_CREATE;
974    fconfig.durability_opt = FDB_DRB_ASYNC;
975    fconfig.compaction_mode=FDB_COMPACTION_AUTO;
976    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
977
978    // test
979    e2e_kvs_index_pattern(10, fconfig, false, true);
980
981    memleak_end();
982    TEST_RESULT("TEST: e2e index walflush test no deletes auto compact");
983}
984
985void e2e_index_walflush_autocompact_test() {
986
987    TEST_INIT();
988    memleak_start();
989
990    randomize();
991    // opts
992    fdb_config fconfig = fdb_get_default_config();
993    fconfig.wal_threshold = 1024;
994    fconfig.flags = FDB_OPEN_FLAG_CREATE;
995    fconfig.durability_opt = FDB_DRB_ASYNC;
996    fconfig.compaction_mode=FDB_COMPACTION_AUTO;
997    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
998
999    // test
1000    e2e_kvs_index_pattern(2, fconfig, true, true);
1001
1002    memleak_end();
1003    TEST_RESULT("TEST: e2e index walflush autocompact test");
1004
1005}
1006
1007void e2e_index_normal_commit_autocompact_test() {
1008
1009    TEST_INIT();
1010    memleak_start();
1011
1012    randomize();
1013    // opts
1014    fdb_config fconfig = fdb_get_default_config();
1015    fconfig.wal_threshold = 1024;
1016    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1017    fconfig.durability_opt = FDB_DRB_NONE;
1018    fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1019    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1020
1021    // test
1022    e2e_kvs_index_pattern(2, fconfig, true, false);
1023
1024    memleak_end();
1025    TEST_RESULT("TEST: e2e index normal commit autocompact test");
1026}
1027
1028void e2e_async_manual_compact_test() {
1029    TEST_INIT();
1030    memleak_start();
1031
1032    randomize();
1033    // opts
1034    fdb_config fconfig = fdb_get_default_config();
1035    fconfig.wal_threshold = 1024;
1036    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1037    fconfig.durability_opt = FDB_DRB_ASYNC;
1038
1039    // test
1040    e2e_async_compact_pattern(10, fconfig, false, true);
1041    memleak_end();
1042    TEST_RESULT("TEST: e2e async manual compact test");
1043}
1044
1045
1046void e2e_concurrent_scan_test() {
1047    TEST_INIT();
1048    memleak_start();
1049
1050    randomize();
1051    fdb_config fconfig = fdb_get_default_config();
1052    fconfig.wal_threshold = 1024;
1053    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1054    fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1055    fconfig.durability_opt = FDB_DRB_ASYNC;
1056    fconfig.purging_interval = 30; // retain deleted docs for iteration
1057    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1058
1059    // test
1060    e2e_concurrent_scan_pattern(3, 5, 5, fconfig, true);
1061    // normal_commit
1062    e2e_concurrent_scan_pattern(3, 5, 5, fconfig, false);
1063
1064    memleak_end();
1065    TEST_RESULT("TEST: e2e concurrent scan");
1066}
1067
1068void e2e_robust_test() {
1069    TEST_INIT();
1070
1071    randomize();
1072    fdb_config fconfig = fdb_get_default_config();
1073    fconfig.wal_threshold = 1024;
1074    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1075    fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1076    fconfig.durability_opt = FDB_DRB_ASYNC;
1077    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1078    // to allow iterators to validate docs across async compaction
1079    // specify purging_interval so deleted docs are not dropped by
1080    // compactor immediately..
1081    fconfig.purging_interval = 80;
1082
1083    // test
1084    e2e_robust_pattern(fconfig);
1085
1086    TEST_RESULT("TEST: e2e robust test");
1087}
1088
1089void e2e_scan_compact_upto_test() {
1090    TEST_INIT();
1091    memleak_start();
1092
1093    randomize();
1094    int n, i;
1095    void *thread_ret;
1096    bool walflush = true;
1097    int n_checkpoints =  LOAD_FACTOR;
1098
1099    fdb_config fconfig = fdb_get_default_config();
1100    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1101    fconfig.compaction_threshold = 0;
1102    fconfig.durability_opt = FDB_DRB_ASYNC;
1103    fconfig.purging_interval = 30; // retain deleted docs for iteration
1104
1105    storage_t *st;
1106    checkpoint_t verification_checkpoint;
1107    idx_prams_t index_params;
1108    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1109    fdb_kvs_info info;
1110    fdb_kvs_handle *snap_db;
1111    fdb_status status;
1112    thread_t tid;
1113
1114    // init
1115    rm_storage_fs();
1116    gen_index_params(&index_params);
1117    memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1118
1119    // setup
1120    st = init_storage(&fconfig, &fconfig, &kvs_config,
1121                      &index_params, &verification_checkpoint, walflush);
1122
1123    // create compaction thread
1124    thread_create(&tid, compact_upto_thread, (void*)&fconfig);
1125
1126    // test
1127    for (n = 0; n < n_checkpoints; ++n) {
1128        load_persons(st);
1129        for (i = 0; i < 100; ++i) {
1130            load_persons(st);
1131            e2e_fdb_commit(st->main, walflush);
1132            status = fdb_get_kvs_info(st->all_docs, &info);
1133            TEST_CHK(status == FDB_RESULT_SUCCESS);
1134            fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1135            TEST_CHK(status == FDB_RESULT_SUCCESS);
1136            update_index(st, true);
1137            verify_db(st);
1138            fdb_kvs_close(snap_db);
1139        }
1140    }
1141
1142    thread_join(tid, &thread_ret);
1143
1144    // teardown
1145    e2e_fdb_shutdown(st);
1146    memleak_end();
1147    TEST_RESULT("TEST: e2e concurrent compact upto");
1148}
1149
1150void *kv_thread(void *args) {
1151    // This thread stores and indexes person docs
1152
1153    TEST_INIT();
1154    uint64_t i;
1155    uint64_t num_markers;
1156    fdb_snapshot_info_t *markers;
1157    fdb_status status;
1158
1159    storage_t *st = (storage_t *)args;
1160    status = fdb_get_all_snap_markers(st->main, &markers,
1161                                      &num_markers);
1162    TEST_CHK(status == FDB_RESULT_SUCCESS);
1163
1164    for (i = 0; i < num_markers; i += 10) {
1165        load_persons(st);
1166    }
1167    fdb_free_snap_markers(markers, num_markers);
1168    return NULL;
1169}
1170
1171// This function identifies the latest superblock
1172// add adds garbage to it.
1173void corrupt_latest_superblock(const char* filename) {
1174    /*
1175     * Note that each block is 4096 bytes.
1176     * - There are 4 superblocks which constitute the
1177     *   first 4 blocks of the file.
1178     * - The 8 bytes following the first 8 bytes of a
1179     *   superblock contains the block revision num.
1180     */
1181    struct filemgr_ops *ops = get_filemgr_ops();
1182    int64_t offset = 8;
1183    int latest_sb = 0;
1184    int fd = ops->open(filename, O_RDWR, 0644);
1185    uint64_t buf, highest_rev = 0;
1186    for (int i = 0; i < 4; ++i) {    // num of superblocks: 4
1187        if (ops->pread(fd, &buf, sizeof(uint64_t), offset) == sizeof(uint64_t)) {
1188            buf = _endian_decode(buf);
1189            assert(buf != highest_rev);
1190            if (buf > highest_rev) {
1191                highest_rev = buf;
1192                latest_sb = i;
1193            }
1194            offset += 4096;
1195        } else {
1196            fprintf(stderr, "Warning: Could not find the latest superblock!\n");
1197            ops->close(fd);
1198            return;
1199        }
1200    }
1201    // Write garbage at a random offset that would fall within
1202    // the latest super block
1203    uint64_t garbage = rand();
1204    offset = latest_sb * 4096 + (rand() % (4095 - sizeof("garbage")));
1205    if (ops->pwrite(fd, &garbage, sizeof(garbage), offset) != sizeof(garbage)) {
1206        fprintf(stderr,
1207                "\nWarning: Could not write garbage into the superblock!");
1208    }
1209    ops->close(fd);
1210}
1211
1212void e2e_crash_recover_test(bool do_rollback) {
1213
1214    TEST_INIT();
1215    memleak_start();
1216
1217    randomize();
1218
1219    bool walflush = true;
1220    int i, n;
1221    int n_checkpoints =  2;
1222
1223    rm_storage_fs();
1224    storage_t *st;
1225    checkpoint_t verification_checkpoint;
1226    idx_prams_t index_params;
1227    fdb_kvs_handle *snap_db;
1228    fdb_kvs_info info;
1229    fdb_status status;
1230
1231    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1232    fdb_config fconfig = fdb_get_default_config();
1233    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1234    fconfig.compaction_threshold = 0;
1235    fconfig.durability_opt = FDB_DRB_ASYNC;
1236    fconfig.purging_interval = 30; // retain deleted docs for iteration
1237
1238    gen_index_params(&index_params);
1239    memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1240    st = init_storage(&fconfig, &fconfig, &kvs_config,
1241                      &index_params, &verification_checkpoint, walflush);
1242
1243    for (i = 0; i < 100; ++i) {
1244        load_persons(st);
1245        e2e_fdb_commit(st->main, true);
1246    }
1247
1248    fdb_seqnum_t seqno;
1249    uint64_t k, num_markers;
1250    fdb_snapshot_info_t *markers;
1251
1252    if (do_rollback) {
1253        status = fdb_get_all_snap_markers(st->main, &markers,
1254                                          &num_markers);
1255        TEST_CHK(status == FDB_RESULT_SUCCESS);
1256
1257        for (k = 0; k < num_markers; k += 10) {
1258            // rollback to each marker
1259            seqno = markers[k].kvs_markers[0].seqnum;
1260            status = fdb_rollback(&st->all_docs, seqno);
1261            TEST_CHK(status == FDB_RESULT_SUCCESS);
1262        }
1263        fdb_free_snap_markers(markers, num_markers);
1264    } else {
1265        status = fdb_get_all_snap_markers(st->main, &markers,
1266                                          &num_markers);
1267        TEST_CHK(status == FDB_RESULT_SUCCESS);
1268
1269        for (k = 0; k < num_markers; k += 10) {
1270            load_persons(st);
1271        }
1272        fdb_free_snap_markers(markers, num_markers);
1273    }
1274
1275    // close storage
1276    e2e_fdb_close(st);
1277
1278    corrupt_latest_superblock(E2EDB_MAIN);
1279
1280    // reopen storage
1281    gen_index_params(&index_params);
1282    memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1283    st = init_storage(&fconfig, &fconfig, &kvs_config,
1284                      &index_params, &verification_checkpoint, walflush);
1285
1286    // run verifiable workload
1287    for (n = 0; n < n_checkpoints; ++n) {
1288        load_persons(st);
1289        for (i = 0; i < 100; ++i) {
1290            load_persons(st);
1291            e2e_fdb_commit(st->main, walflush);
1292            status = fdb_get_kvs_info(st->all_docs, &info);
1293            TEST_CHK(status == FDB_RESULT_SUCCESS);
1294            fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1295            TEST_CHK(status == FDB_RESULT_SUCCESS);
1296            update_index(st, true);
1297            verify_db(st);
1298            fdb_kvs_close(snap_db);
1299        }
1300    }
1301
1302    // teardown
1303    e2e_fdb_shutdown(st);
1304    memleak_end();
1305    if (do_rollback) {
1306        TEST_RESULT("TEST: e2e crash recover test with rollback");
1307    } else {
1308        TEST_RESULT("TEST: e2e crash recover test");
1309    }
1310}
1311
1312void e2e_concurrent_reader_writer(bool do_compaction) {
1313    TEST_INIT();
1314    memleak_start();
1315
1316    randomize();
1317    fdb_config fconfig = fdb_get_default_config();
1318    // to allow iterators to validate docs across async compaction
1319    // specify purging_interval so deleted docs are not dropped by
1320    // compactor immediately..
1321    fconfig.purging_interval = 80;
1322    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1323
1324    // init
1325    rm_storage_fs();
1326
1327    // test
1328    int i;
1329    int n_writers = 2;
1330    int n_scanners = 2;
1331    storage_t **st = alca(storage_t *, n_writers);
1332    storage_t **st2 = alca(storage_t *, n_writers);
1333    checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
1334    idx_prams_t *index_params = alca(idx_prams_t, n_writers);
1335    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1336    thread_t *tid_sc = alca(thread_t, n_scanners);
1337    void **thread_ret_sc = alca(void *, n_scanners);
1338    thread_t *tid_wr = alca(thread_t, n_writers);
1339    void **thread_ret_wr = alca(void *, n_writers);
1340    fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
1341    thread_t c_tid;
1342    void *c_ret;
1343
1344    // init storage handles
1345    // the nth writer is commit handler
1346    for (i = 0; i < n_writers; ++i) {
1347        gen_index_params(&index_params[i]);
1348        memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
1349        st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1350                             &index_params[i], &verification_checkpoint[i],
1351                             true);
1352        st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1353                              &index_params[i], &verification_checkpoint[i],
1354                              true);
1355        memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
1356    }
1357
1358    if (do_compaction) {
1359        thread_create(&c_tid, compact_thread, (void*)&fconfig);
1360    }
1361
1362    // start new item thread
1363    thread_create(&tid_wr[0], seq_writer_thread, (void*)st[0]);
1364    // start update thread
1365    thread_create(&tid_wr[1], update_thread, (void*)st[1]->index1);
1366
1367    // start disk-snapshot thread
1368    scan_kv[0] = scan(st2[0], NULL);
1369    thread_create(&tid_sc[0], disk_read_thread, (void*)st[1]);
1370    // start mem-snapshot thread
1371    scan_kv[1] = scan(st2[1], NULL);
1372    thread_create(&tid_sc[1], scan_thread, (void*)scan_kv[1]);
1373
1374    // join threads
1375    thread_join(tid_wr[0], &thread_ret_wr[0]);
1376    thread_join(tid_wr[1], &thread_ret_wr[1]);
1377    thread_join(tid_sc[1], &thread_ret_sc[1]);
1378    thread_join(tid_sc[0], &thread_ret_sc[0]);
1379    if (do_compaction) {
1380        thread_join(c_tid, &c_ret);
1381    }
1382
1383    // commit
1384    e2e_fdb_commit(st[0]->main, false);
1385
1386    for (i = 0; i < n_writers; ++i) {
1387        // teardown
1388        e2e_fdb_close(st2[i]);
1389        e2e_fdb_close(st[i]);
1390    }
1391    fdb_shutdown();
1392
1393    memleak_end();
1394    if (do_compaction) {
1395        TEST_RESULT("TEST: e2e concurrent reader writer test with compaction");
1396    } else {
1397        TEST_RESULT("TEST: e2e concurrent reader writer test");
1398    }
1399}
1400
1401void e2e_multi_dbfile_concurrent_wr() {
1402    TEST_INIT();
1403    memleak_start();
1404    int i, r;
1405    int nf = 16;
1406    char buf[64];
1407    thread_t *tid_wr = alca(thread_t, nf);
1408    void **thread_ret_wr = alca(void *, nf);
1409    fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1410    fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1411    fdb_status status;
1412    randomize();
1413    rm_storage_fs();
1414    r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1415    (void)r;
1416
1417    // create dbfiles
1418    fdb_config fconfig = fdb_get_default_config();
1419    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1420    fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1421
1422    for (i = 0; i < nf; i++) {
1423        sprintf(buf, "e2edb_ex%d", i);
1424        status = fdb_open(&dbfiles[i], buf, &fconfig);
1425        TEST_CHK(status == FDB_RESULT_SUCCESS);
1426        status = fdb_kvs_open_default(dbfiles[i], &kvs[i], &kvs_config);
1427        TEST_CHK(status == FDB_RESULT_SUCCESS);
1428
1429        // run update thread on each file to enter reuse
1430        thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1431    }
1432
1433    // join and commit
1434    for (i = 0; i < nf; i++) {
1435        thread_join(tid_wr[i], &thread_ret_wr[i]);
1436        fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1437        fdb_close(dbfiles[i]);
1438    }
1439
1440    fdb_shutdown();
1441    memleak_end();
1442    TEST_RESULT("TEST: e2e multi dbfile concurrent wr");
1443}
1444
1445void e2e_multi_kvs_concurrent_wr() {
1446    TEST_INIT();
1447    memleak_start();
1448    int i, j, r;
1449    int nf = 8;
1450    char buf[64];
1451    char body[64];
1452
1453    thread_t *tid_wr = alca(thread_t, nf);
1454    void **thread_ret_wr = alca(void *, nf);
1455    fdb_file_handle *dbfile;
1456    fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1457    fdb_doc *rdoc = NULL;
1458    fdb_status status;
1459    randomize();
1460    rm_storage_fs();
1461    r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1462    fdb_iterator *it;
1463    (void)r;
1464
1465    // create dbfile
1466    fdb_config fconfig = fdb_get_default_config();
1467    fconfig.num_keeping_headers = 10;
1468    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1469    fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1470    fdb_open(&dbfile, "e2edb_main", &fconfig);
1471
1472    // open dbfiles
1473    for (i = 0; i < nf; i++) {
1474        sprintf(buf, "e2edb_kv%d", i);
1475        status = fdb_kvs_open(dbfile, &kvs[i], buf, &kvs_config);
1476        TEST_CHK(status == FDB_RESULT_SUCCESS);
1477    }
1478
1479    // initial commits
1480    for (j = 0; j < 10; j++) {
1481        for (i = 0; i < nf; i++) {
1482            sprintf(buf, "%dkey%d", j, i);
1483            sprintf(body, "commit%d", j);
1484            status = fdb_set_kv(kvs[i], buf, strlen(buf), body, strlen(body));
1485            TEST_CHK(status == FDB_RESULT_SUCCESS);
1486        }
1487        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1488        TEST_CHK(status == FDB_RESULT_SUCCESS);
1489    }
1490
1491    // run update thread on each file to enter reuse
1492    for (i = 0; i < nf; i++) {
1493        thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1494    }
1495
1496    // join and rollback
1497    for (i = 0; i < nf; i++) {
1498        thread_join(tid_wr[i], &thread_ret_wr[i]);
1499        status = fdb_rollback(&kvs[i], 7);
1500        TEST_CHK(status == FDB_RESULT_SUCCESS);
1501    }
1502
1503    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1504    TEST_CHK(status == FDB_RESULT_SUCCESS);
1505    for (i = 0; i < nf; i++) {
1506        // verify rollback kvs
1507        status = fdb_iterator_init(kvs[i], &it, NULL, 0,
1508                                   NULL, 0, FDB_ITR_NONE);
1509        TEST_CHK(status == FDB_RESULT_SUCCESS);
1510        j = 0;
1511        do {
1512            status = fdb_iterator_get(it, &rdoc);
1513            TEST_CHK (status == FDB_RESULT_SUCCESS);
1514            fdb_doc_free(rdoc);
1515            rdoc=NULL;
1516            j++;
1517        } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
1518        TEST_CHK(j==7);
1519        fdb_iterator_close(it);
1520    }
1521
1522    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1523    fdb_close(dbfile);
1524    fdb_shutdown();
1525
1526    memleak_end();
1527
1528    r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1529    (void)r;
1530
1531    TEST_RESULT("TEST: e2e multi kvs concurrent wr test");
1532}
1533
1534void e2e_multi_kvs_concurrent_wr_compact() {
1535    TEST_INIT();
1536    memleak_start();
1537    int i, j, r;
1538    int nf = 4;
1539    char buf[64];
1540
1541    thread_t *tid_wr = alca(thread_t, nf*nf);
1542    void **thread_ret_wr = alca(void *, nf*nf);
1543    fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1544    fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf*nf);
1545    fdb_status status;
1546    randomize();
1547    rm_storage_fs();
1548    r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1549    (void)r;
1550
1551    // create dbfile
1552    fdb_config fconfig = fdb_get_default_config();
1553    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1554    fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1555
1556    // open dbfiles
1557    for (i = 0; i < nf; ++i) {
1558        sprintf(buf, "e2edb_ex%d", i);
1559        status = fdb_open(&dbfiles[i], buf, &fconfig);
1560        TEST_CHK(status == FDB_RESULT_SUCCESS);
1561
1562        for (j = i*nf; j < (i * nf + nf); ++j) {
1563            sprintf(buf, "e2edb_kv%d", j);
1564            status = fdb_kvs_open(dbfiles[i], &kvs[j], buf, &kvs_config);
1565            TEST_CHK(status == FDB_RESULT_SUCCESS);
1566        }
1567    }
1568
1569    // initial commits
1570    for (i = 0; i < nf; i++) {
1571        status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1572        TEST_CHK(status == FDB_RESULT_SUCCESS);
1573    }
1574
1575    // run update thread on each file to enter reuse
1576    for (i = 0; i < (nf * nf); i++) {
1577        thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1578    }
1579
1580    // join update threads
1581    for (i = 0; i < (nf * nf); i++) {
1582      thread_join(tid_wr[i], &thread_ret_wr[i]);
1583      TEST_CHK(status == FDB_RESULT_SUCCESS);
1584    }
1585
1586    // more commits
1587    for (i = 0; i < nf; i++) {
1588        status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1589        TEST_CHK(status == FDB_RESULT_SUCCESS);
1590    }
1591
1592    for (i = 0; i < nf; i++) {
1593        fdb_close(dbfiles[i]);
1594    }
1595    fdb_shutdown();
1596    memleak_end();
1597
1598    TEST_RESULT("TEST: e2e multi kvs concurrent wr test with compaction");
1599}
1600
1601int main() {
1602
1603    // Note: following tests are temporarily disabled due to
1604    // the keeping header violation issue by rollback/snapshot API.
1605    //
1606    //   - e2e_multi_kvs_concurrent_wr();
1607    //   - e2e_index_basic_test();
1608
1609    /* Multiple kvstores under reuse with rollback */
1610    // e2e_multi_kvs_concurrent_wr();
1611
1612    /* Multiple kvstores under reuse with rollback and compaction */
1613    e2e_multi_kvs_concurrent_wr_compact();
1614
1615    /* Multiple dbfiles after stale block reuse */
1616    e2e_multi_dbfile_concurrent_wr();
1617
1618    /* Concurrent readers and writers after stale block reuse, without
1619       and with compaction */
1620    e2e_concurrent_reader_writer(false);
1621    e2e_concurrent_reader_writer(true);     // w.compaction
1622
1623    e2e_robust_test();
1624    e2e_concurrent_scan_test();
1625    e2e_async_manual_compact_test();
1626    // e2e_index_basic_test();
1627    e2e_index_walflush_test_no_deletes_auto_compact();
1628    e2e_index_walflush_autocompact_test();
1629    e2e_index_normal_commit_autocompact_test();
1630
1631    /* Data loading with concurrent compaction */
1632    e2e_scan_compact_upto_test();
1633
1634    /* Crash recovery with e2e workload, without and with a rollback */
1635    e2e_crash_recover_test(false);
1636    e2e_crash_recover_test(true);           // w.rollback
1637
1638    return 0;
1639}
1640