1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <errno.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <unistd.h>
26 #endif
27 
28 #include "filemgr_ops.h"
29 
30 #include "libforestdb/forestdb.h"
31 #include "test.h"
32 #include "e2espec.h"
33 
load_persons(storage_t *st)34 void load_persons(storage_t *st) {
35     int i, n=100;
36     person_t p;
37 
38     // store and index person docs
39     for (i = 0; i < n; ++i) {
40         gen_person(&p);
41         e2e_fdb_set_person(st, &p);
42     }
43 
44 #ifdef __DEBUG_E2E
45     printf("[%s] load persons: %03d docs created\n",st->keyspace, n);
46 #endif
47 }
48 
delete_persons(storage_t *st)49 void delete_persons(storage_t *st) {
50     TEST_INIT();
51 
52     fdb_doc *rdoc = NULL;
53     fdb_status status;
54     fdb_iterator *it;
55     person_t *p;
56     int i, n = 0;
57 
58     // delete every 5th doc
59     status = fdb_iterator_sequence_init(st->all_docs, &it, 0, 0,
60                                         FDB_ITR_NO_DELETES);
61     TEST_CHK(status == FDB_RESULT_SUCCESS);
62 
63     i = 0;
64     do {
65         status = fdb_iterator_get(it, &rdoc);
66         TEST_CHK (status == FDB_RESULT_SUCCESS);
67         if ((i % 5) == 0) {
68 
69             p = (person_t *)rdoc->body;
70             // ensure the requester has created this key
71             if (strcmp(p->keyspace, st->keyspace) == 0) {
72                 e2e_fdb_del_person(st, p);
73                 n++;
74             }
75 
76         }
77         fdb_doc_free(rdoc);
78         rdoc=NULL;
79         i++;
80     } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
81 
82     fdb_iterator_close(it);
83 
84 #ifdef __DEBUG_E2E
85     sprintf(rbuf, "[%s] delete persons: %03d docs deleted\n",st->keyspace, n);
86     TEST_RESULT(rbuf);
87 #endif
88 }
89 
90 /*
91  * reset params used to index storage
92  * delete old docs that are part of new index
93  * so that they are not included at verification time
94  */
update_index(storage_t *st, bool checkpointing)95 void update_index(storage_t *st, bool checkpointing) {
96 
97     TEST_INIT();
98 
99     fdb_iterator *it;
100     person_t *p = NULL;
101     fdb_doc *rdoc = NULL;
102     fdb_status status;
103     int n = 0;
104     size_t vallen;
105     char *mink = st->index_params->min;
106     char *maxk = st->index_params->max;
107     char rbuf[256];
108 
109     // change storage index range
110     reset_storage_index(st);
111 
112     if (checkpointing) {
113         start_checkpoint(st);
114     }
115     status = fdb_iterator_init(st->index1, &it, mink, 12,
116                                maxk, 12, FDB_ITR_NO_DELETES);
117     if (status != FDB_RESULT_SUCCESS) {
118         // no items within min max range
119         TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
120     }
121 
122 
123 
124     do {
125         status = fdb_iterator_get(it, &rdoc);
126         if (status == FDB_RESULT_SUCCESS) {
127             status = fdb_get_kv(st->all_docs,
128                                 rdoc->body, rdoc->bodylen,
129                                 (void **)&p, &vallen);
130             if (status == FDB_RESULT_SUCCESS) {
131                 if (strcmp(p->keyspace, st->keyspace) == 0) {
132                     e2e_fdb_del_person(st, p);
133                     n++;
134                 }
135                 free(p);
136                 p=NULL;
137             }
138             fdb_doc_free(rdoc);
139             rdoc=NULL;
140         }
141     } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
142 
143     if (checkpointing) {
144         end_checkpoint(st);
145     }
146 
147     fdb_iterator_close(it);
148 
149     // reset verification chkpoint
150     st->v_chk->num_indexed = 0;
151     st->v_chk->sum_age_indexed = 0;
152 
153     sprintf(rbuf, "update index: %03d docs deleted", n);
154 #ifdef __DEBUG_E2E
155     TEST_RESULT(rbuf);
156 #endif
157 }
158 
159 // --- verify ----
160 // 1. check that num of keys within index are correct
161 // 2. check that age index total is correct for specified range
162 // 3. check that doc count is as expected
verify_db(storage_t *st)163 void verify_db(storage_t *st) {
164 
165     TEST_INIT();
166 
167     checkpoint_t *db_checkpoint = create_checkpoint(st, END_CHECKPOINT);
168     int db_ndocs = db_checkpoint->ndocs;
169     int exp_ndocs = st->v_chk->ndocs;
170     int exp_nidx = st->v_chk->num_indexed;
171     int db_nidx = db_checkpoint->num_indexed;
172     int db_suma = db_checkpoint->sum_age_indexed;
173     int exp_suma = st->v_chk->sum_age_indexed;
174     fdb_kvs_info info;
175     char rbuf[256];
176 
177     e2e_fdb_commit(st->main, st->walflush);
178 
179     fdb_get_kvs_info(st->index1, &info);
180 
181 #ifdef __DEBUG_E2E
182     int val1, val2;
183     fdb_iterator *it;
184     fdb_doc *rdoc = NULL;
185     if (db_ndocs != exp_ndocs) {
186         // for debugging: currently inaccurate for concurrency patterns
187         fdb_get_kvs_info(st->all_docs, &info);
188         val1 = info.doc_count;
189         (void)val1;
190         val2 = 0;
191         fdb_iterator_init(st->index1, &it, NULL, 0,
192                           NULL, 0, FDB_ITR_NONE);
193         do {
194             fdb_iterator_get(it, &rdoc);
195             if (!rdoc->deleted) {
196                 val2++;
197             }
198             fdb_doc_free(rdoc);
199             rdoc=NULL;
200         } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
201         printf("ndocs_debug: kvs_info(%d) == exp_ndocs(%d) ?\n", val1,exp_ndocs);
202         printf("ndocs_debug: kvs_info(%d) == itr_count(%d) ?\n", val1, val2);
203         fdb_iterator_close(it);
204     }
205     printf("[%s] db_ndix(%d) == exp_nidx(%d)\n", st->keyspace, db_nidx, exp_nidx);
206 #endif
207 
208     free(db_checkpoint);
209     db_checkpoint=NULL;
210     //TEST_CHK(db_nidx==exp_nidx);
211     //TEST_CHK(db_suma==exp_suma);
212 
213     sprintf(rbuf, "[%s] verifydb: ndocs(%d=%d), nidx(%d=%d), sumage(%d=%d)\n",
214             st->keyspace,
215             db_ndocs, exp_ndocs,
216             db_nidx, exp_nidx,
217             db_suma, exp_suma);
218 #ifdef __DEBUG_E2E
219     TEST_RESULT(rbuf);
220 #endif
221 }
222 
223 
224 /*
225  * compares a db where src is typically from
226  * a rollback state of live data and replay is a
227  * db to use for comparison  of expected data
228  */
db_compare(fdb_kvs_handle *src, fdb_kvs_handle *replay)229 void db_compare(fdb_kvs_handle *src, fdb_kvs_handle *replay) {
230 
231     TEST_INIT();
232 
233     int ndoc1, ndoc2;
234     fdb_kvs_info info;
235     fdb_iterator *it;
236     fdb_doc *rdoc = NULL;
237     fdb_doc *vdoc = NULL;
238     fdb_status status;
239     char rbuf[256];
240 
241     fdb_get_kvs_info(src, &info);
242     ndoc1 = info.doc_count;
243     fdb_get_kvs_info(replay, &info);
244     ndoc2 = info.doc_count;
245 
246     TEST_CHK(ndoc1 == ndoc2);
247 
248     // all docs in replay db must be in source db with same status
249     status = fdb_iterator_sequence_init(replay, &it, 0, 0, FDB_ITR_NONE);
250     TEST_CHK(status == FDB_RESULT_SUCCESS);
251     do {
252         status = fdb_iterator_get_metaonly(it, &rdoc);
253         TEST_CHK(status == FDB_RESULT_SUCCESS);
254 
255         fdb_doc_create(&vdoc, rdoc->key, rdoc->keylen,
256                               rdoc->meta, rdoc->metalen,
257                               rdoc->body, rdoc->bodylen);
258         // lookup by key
259         status = fdb_get(src, vdoc);
260 
261         if (rdoc->deleted) {
262             TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
263         } else {
264             TEST_CHK(status == FDB_RESULT_SUCCESS);
265         }
266 
267         fdb_doc_free(rdoc);
268         fdb_doc_free(vdoc);
269         rdoc=NULL;
270         vdoc=NULL;
271 
272     } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
273     fdb_iterator_close(it);
274 
275     sprintf(rbuf, "db compare: src(%d) == replay(%d)", ndoc1, ndoc2);
276 #ifdef __DEBUG_E2E
277     TEST_RESULT(rbuf);
278 #endif
279 }
280 
281 /*
282  * populate replay db up to specified seqnum
283  */
load_replay_kvs(storage_t *st, fdb_kvs_handle *replay_kvs, fdb_seqnum_t seqnum)284 void load_replay_kvs(storage_t *st, fdb_kvs_handle *replay_kvs, fdb_seqnum_t seqnum) {
285 
286     TEST_INIT();
287 
288     fdb_iterator *it;
289     fdb_doc *rdoc = NULL;
290     fdb_status status;
291     transaction_t *tx;
292 
293 
294     // iterator end at seqnum
295     status = fdb_iterator_sequence_init(st->rtx, &it, 0, seqnum,
296                                         FDB_ITR_NONE);
297     TEST_CHK(status == FDB_RESULT_SUCCESS);
298 
299     do {
300         status = fdb_iterator_get(it, &rdoc);
301         TEST_CHK(status == FDB_RESULT_SUCCESS);
302 
303         tx = (transaction_t *)rdoc->body;
304         if (tx->type == SET_PERSON) {
305             status = fdb_set_kv(replay_kvs,
306                                 tx->refkey,
307                                 tx->refkey_len,
308                                 NULL,0);
309             TEST_CHK(status == FDB_RESULT_SUCCESS);
310         }
311         if (tx->type == DEL_PERSON) {
312             status = fdb_del_kv(replay_kvs,
313                                 tx->refkey,
314                                 tx->refkey_len);
315             TEST_CHK(status == FDB_RESULT_SUCCESS);
316         }
317         fdb_doc_free(rdoc);
318         rdoc=NULL;
319 
320     } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
321     fdb_iterator_close(it);
322 
323 }
324 
325 /*
326  * replay records db up to a checkpoint into another db
327  * rollback all_docs to that checkpoint
328  * compare all_docs at that state to new doc
329  */
replay(storage_t *st)330 void replay(storage_t *st) {
331     TEST_INIT();
332 
333     int i;
334     size_t v;
335     char kvsbuf[10];
336     fdb_file_handle *dbfile;
337     fdb_kvs_handle *replay_kvs;
338     fdb_config fconfig = fdb_get_default_config();
339     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
340     fconfig.wal_threshold = 1024;
341     fconfig.flags = FDB_OPEN_FLAG_CREATE;
342     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
343     fconfig.compaction_threshold = 10;
344     fdb_iterator *it;
345     fdb_status status;
346     fdb_doc *rdoc = NULL;
347     fdb_kvs_info info;
348     transaction_t *tx;
349     checkpoint_t *chk;
350     fdb_seqnum_t rollback_seqnum;
351 
352     // create replay kvs
353     status = fdb_open(&dbfile, E2EDB_RECORDS, &fconfig);
354     TEST_CHK(status == FDB_RESULT_SUCCESS);
355 
356     e2e_fdb_commit(st->main, st->walflush);
357     status = fdb_get_kvs_info(st->all_docs, &info);
358     TEST_CHK(status == FDB_RESULT_SUCCESS);
359 
360 
361     // iterate over records kv and replay transactions
362     status = fdb_iterator_sequence_init(st->rtx, &it, 0, 0, FDB_ITR_NONE);
363     TEST_CHK(status == FDB_RESULT_SUCCESS);
364 
365     // seek to end so we can reverse iterate
366     // seq iterators cannot seek
367     do {
368         ;
369     } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
370 
371 
372     // reverse iterate from highest to lowest checkpoint
373     i=0;
374     while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
375         status = fdb_iterator_get(it, &rdoc);
376         TEST_CHK(status == FDB_RESULT_SUCCESS);
377         tx = (transaction_t *)rdoc->body;
378         if (tx->type == END_CHECKPOINT) {
379 
380             sprintf(kvsbuf, "rkvs%d", i);
381             status = fdb_kvs_open(dbfile, &replay_kvs, kvsbuf,  &kvs_config);
382             TEST_CHK(status == FDB_RESULT_SUCCESS);
383 
384             // load replay db up to this seqnum
385             load_replay_kvs(st, replay_kvs, rdoc->seqnum);
386 
387             // get checkpoint doc for rollback
388             status = fdb_get_kv(st->chk, tx->refkey, tx->refkey_len,
389                                 (void **)&chk, &v);
390 
391             TEST_CHK(status == FDB_RESULT_SUCCESS);
392             rollback_seqnum = chk->seqnum_all;
393 ;
394 #ifdef __DEBUG_E2E
395             printf("rollback to %llu\n", chk->seqnum_all);
396 #endif
397             status = fdb_rollback(&st->all_docs, rollback_seqnum);
398             if (status == FDB_RESULT_NO_DB_INSTANCE) {
399                 free(chk);
400                 // drop replay kvs
401                 status = fdb_kvs_close(replay_kvs);
402                 TEST_CHK(status == FDB_RESULT_SUCCESS);
403                 status = fdb_kvs_remove(dbfile, kvsbuf);
404                 TEST_CHK(status == FDB_RESULT_SUCCESS);
405                 break;
406             }
407             TEST_CHK(status == FDB_RESULT_SUCCESS);
408             free(chk);
409             chk=NULL;
410             // after rollback, WAL entries should be flushed for
411             // accurate # docs count comparison with 'replay_kvs'.
412             e2e_fdb_commit(st->main, true);
413 
414             status = fdb_get_kvs_info(st->rtx, &info);
415             TEST_CHK(status == FDB_RESULT_SUCCESS);
416 
417             // compare rollback and replay db
418             e2e_fdb_commit(dbfile, st->walflush);
419             db_compare(st->all_docs, replay_kvs);
420 
421             // drop replay kvs
422             status = fdb_kvs_close(replay_kvs);
423             TEST_CHK(status == FDB_RESULT_SUCCESS);
424             status = fdb_kvs_remove(dbfile, kvsbuf);
425             TEST_CHK(status == FDB_RESULT_SUCCESS);
426             i++;
427         }
428         fdb_doc_free(rdoc);
429         rdoc=NULL;
430     }
431 
432     fdb_iterator_close(it);
433     fdb_close(dbfile);
434 
435 }
436 
437 /* do forward previous and seek operations on main kv */
iterate_thread(void *args)438 void *iterate_thread(void *args) {
439 
440     TEST_INIT();
441 
442     int i, j;
443     fdb_config *fconfig = (fdb_config *)args;
444     fdb_file_handle *dbfile;
445     fdb_iterator *it;
446     fdb_doc *rdoc = NULL;
447     fdb_open(&dbfile, E2EDB_MAIN, fconfig);
448     fdb_kvs_handle *all_docs, *snap_db;
449     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
450     fdb_status status;
451     person_t p;
452 
453     for (i = 0; i < 50; i++) {
454         fdb_kvs_open(dbfile, &all_docs, E2EKV_ALLDOCS,  &kvs_config);
455 
456         if ((i % 2) == 0) { //snapshot
457             status = fdb_snapshot_open(all_docs, &snap_db, FDB_SNAPSHOT_INMEM);
458             TEST_CHK(status == FDB_RESULT_SUCCESS);
459             status = fdb_iterator_init(snap_db, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
460             TEST_CHK(status == FDB_RESULT_SUCCESS);
461         } else {
462             status = fdb_iterator_init(all_docs, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
463             TEST_CHK(status == FDB_RESULT_SUCCESS);
464         }
465         j = 0;
466         // forward
467         do {
468             status = fdb_iterator_get(it, &rdoc);
469             if (j) {
470                 TEST_CHK(status == FDB_RESULT_SUCCESS);
471             }
472             fdb_doc_free(rdoc);
473             rdoc = NULL;
474             j++;
475         } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
476         // reverse and seek ahead
477         for (j = 0; j < 10; j++) {
478             while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
479                 status = fdb_iterator_get(it, &rdoc);
480                 TEST_CHK(status == FDB_RESULT_SUCCESS);
481                 fdb_doc_free(rdoc);
482                 rdoc = NULL;
483             }
484             gen_person(&p);
485 
486             // seek using random person key
487             if (j % 2 == 0) { // seek higher
488                 fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_HIGHER);
489             } else {
490                 fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_LOWER);
491             }
492         }
493         fdb_iterator_close(it);
494         if ((i % 2) == 0) {
495             fdb_kvs_close(snap_db);
496         }
497         fdb_kvs_close(all_docs);
498     }
499 
500     fdb_doc_free(rdoc);
501     rdoc = NULL;
502     fdb_close(dbfile);
503     return NULL;
504 }
505 
compact_thread(void *args)506 void *compact_thread(void *args) {
507     int i;
508     fdb_config *fconfig = (fdb_config *)args;
509     fdb_file_handle *dbfile;
510     fdb_open(&dbfile, E2EDB_MAIN, fconfig);
511     for (i = 0; i < 3; ++i) {
512         sleep(2);
513 #ifdef __DEBUG_E2E
514         printf("compact: %d\n", i);
515 #endif
516         fdb_compact(dbfile, NULL);
517     }
518     fdb_close(dbfile);
519     return NULL;
520 }
521 
compact_upto_thread(void *args)522 void *compact_upto_thread(void *args) {
523     TEST_INIT();
524 
525     int i;
526     uint64_t num_markers;
527     fdb_snapshot_info_t *markers;
528     fdb_status status;
529 
530     fdb_config *fconfig = (fdb_config *)args;
531     fdb_file_handle *dbfile;
532     fdb_open(&dbfile, E2EDB_MAIN, fconfig);
533     for (i = 0; i < 10; ++i) {
534         sleep(1);
535         status = fdb_get_all_snap_markers(dbfile, &markers,
536                                           &num_markers);
537         TEST_CHK(status == FDB_RESULT_SUCCESS);
538         if (num_markers == 0) {
539             // No need of compaction as file is empty
540             break;
541         }
542         status = fdb_compact_upto(dbfile, NULL, markers[0].marker);
543         TEST_CHK(status == FDB_RESULT_SUCCESS);
544         fdb_free_snap_markers(markers, num_markers);
545     }
546     fdb_close(dbfile);
547     return NULL;
548 }
549 
e2e_async_compact_pattern(int n_checkpoints, fdb_config fconfig, bool deletes, bool walflush)550 void e2e_async_compact_pattern(int n_checkpoints, fdb_config fconfig,
551                                bool deletes, bool walflush) {
552     TEST_INIT();
553     int n, i;
554     storage_t *st;
555     checkpoint_t verification_checkpoint;
556     idx_prams_t index_params;
557     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
558     fdb_kvs_info info;
559     fdb_kvs_handle *snap_db;
560     fdb_status status;
561     thread_t tid;
562     void *thread_ret;
563     n_checkpoints = n_checkpoints * LOAD_FACTOR;
564 
565     memleak_start();
566 
567     // init
568     rm_storage_fs();
569     gen_index_params(&index_params);
570     memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
571 
572     // setup
573     st = init_storage(&fconfig, &fconfig, &kvs_config,
574             &index_params, &verification_checkpoint, walflush);
575 
576     // create compaction thread
577     thread_create(&tid, compact_thread, (void*)&fconfig);
578 
579     // test
580     for ( n = 0; n < n_checkpoints; ++n) {
581 
582 #ifdef __DEBUG_E2E
583         printf("checkpoint: %d\n", n);
584 #endif
585         load_persons(st);
586         for (i=0;i<100;++i) {
587 #ifdef __DEBUG_E2E
588             printf("\n\n----%d----\n", i);
589 #endif
590             load_persons(st);
591             if (deletes) {
592                 delete_persons(st);
593             }
594             e2e_fdb_commit(st->main, walflush);
595             status = fdb_get_kvs_info(st->all_docs, &info);
596             TEST_CHK(status == FDB_RESULT_SUCCESS);
597             status = fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
598             TEST_CHK(status == FDB_RESULT_SUCCESS);
599             update_index(st, true);
600             verify_db(st);
601             fdb_kvs_close(snap_db);
602         }
603     }
604 
605     thread_join(tid, &thread_ret);
606     // teardown
607     e2e_fdb_shutdown(st);
608 
609     memleak_end();
610 }
611 
e2e_kvs_index_pattern(int n_checkpoints, fdb_config fconfig, bool deletes, bool walflush)612 void e2e_kvs_index_pattern(int n_checkpoints, fdb_config fconfig,
613                            bool deletes, bool walflush) {
614 
615     int n, i;
616     storage_t *st;
617     checkpoint_t verification_checkpoint;
618     idx_prams_t index_params;
619     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
620     n_checkpoints = n_checkpoints * LOAD_FACTOR;
621 
622     memleak_start();
623 
624     // init
625     rm_storage_fs();
626     gen_index_params(&index_params);
627     memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
628 
629     // setup
630     st = init_storage(&fconfig, &fconfig, &kvs_config,
631                       &index_params, &verification_checkpoint, walflush);
632 
633     // test
634     for (n = 0; n < n_checkpoints; ++n) {
635 
636         // checkpoint
637         start_checkpoint(st);
638 
639         for (i = 0; i < 100; ++i) {
640 #ifdef __DEBUG_E2E
641             printf("\n\n----%d----\n", i);
642 #endif
643             load_persons(st);
644             if (deletes) {
645                 delete_persons(st);
646             }
647             verify_db(st);
648         }
649 
650         // end checkpoint
651         end_checkpoint(st);
652 
653         // change index range
654         update_index(st, true);
655         verify_db(st);
656 
657     }
658 
659     if (fconfig.compaction_mode != FDB_COMPACTION_AUTO) {
660         /* replay involves rollbacks but
661          * cannot rollback pre compact due to BUG: MB-13130
662          */
663         replay(st);
664     }
665 
666     // teardown
667     e2e_fdb_shutdown(st);
668 
669     memleak_end();
670 }
671 
scan_thread(void *args)672 void *scan_thread(void *args) {
673     int i = 0;
674     fdb_kvs_handle *scan_kv = (fdb_kvs_handle *)args;
675 
676     for (i = 0; i < 20; ++i) {
677         scan(NULL, scan_kv);
678     }
679     return NULL;
680 }
681 
disk_read_thread(void *args)682 void *disk_read_thread(void *args) {
683     TEST_INIT();
684     int n;
685     uint64_t i;
686     int seqno;
687     uint64_t num_markers;
688     fdb_snapshot_info_t *markers;
689     fdb_kvs_handle *snap_db;
690     fdb_status status;
691     fdb_doc *doc;
692     person_t p;
693     fdb_doc_create(&doc, NULL, 0, NULL, 0, NULL, 0);
694 
695     storage_t *st = (storage_t *)args;
696 
697     for (n = 0; n < 10; ++n) {
698         status = fdb_get_all_snap_markers(st->main, &markers,
699                                           &num_markers);
700         TEST_CHK(status == FDB_RESULT_SUCCESS);
701         for (i = 0; i < num_markers; i++) {
702             // open disk snapshot to each marker
703             seqno = markers[i].kvs_markers[0].seqnum;
704             if (!seqno) {
705                 continue;
706             }
707             status = fdb_snapshot_open(st->all_docs, &snap_db, seqno);
708             if (status == FDB_RESULT_SUCCESS) {
709                 doc->seqnum = seqno;
710                 // can get doc
711                 status = fdb_get_byseq(snap_db, doc);
712                 TEST_CHK(status == FDB_RESULT_SUCCESS);
713 
714                 // verify
715                 memcpy(&p, (person_t *)doc->body, sizeof(person_t));
716                 TEST_CHK(p.age <= seqno)
717             }
718         }
719         fdb_free_snap_markers(markers, num_markers);
720     }
721 
722     fdb_doc_free(doc);
723     return NULL;
724 }
725 
726 
writer_thread(void *args)727 void *writer_thread(void *args) {
728 
729     storage_t *st = (storage_t *)args;
730     for (int i = 0; i < 5; ++i) {
731         load_persons(st);
732         if (i == 5) {
733             delete_persons(st);
734         }
735     }
736     return NULL;
737 }
738 
seq_writer_thread(void *args)739 void *seq_writer_thread(void *args) {
740 
741     int j, i = 0, n=10000;
742     person_t p;
743     storage_t *st = (storage_t *)args;
744     for (j = 0; j < 10; ++j) {
745         for (i = 0; i < n; ++i) {
746             gen_person(&p);
747             p.age = i;
748             sprintf(p.key, "person%d", i);
749             e2e_fdb_set_person(st, &p);
750         }
751         e2e_fdb_commit(st->main, true);
752     }
753     return NULL;
754 }
755 
update_thread(void *args)756 void *update_thread(void *args) {
757     TEST_INIT();
758 
759     int i = 0, n=100000;
760     person_t p;
761     fdb_kvs_handle *kv = (fdb_kvs_handle *)args;
762 
763     // only generate 1 person type
764     gen_person(&p);
765     for (i = 0; i < n; ++i) {
766         fdb_set_kv(kv, p.key, strlen(p.key),
767                    p.name, strlen(p.name));
768     }
769     return NULL;
770 }
771 
772 
773 /*
774  * perform many fdb features against concurrent handlers
775  * to verify robustness of db.  this pattern is qualified by
776  * completion without faults data corrections and has relaxed error handling.
777  */
e2e_robust_pattern(fdb_config fconfig)778 void e2e_robust_pattern(fdb_config fconfig) {
779 
780     int i;
781     int n_writers = 10;
782     int n_scanners = 10;
783     storage_t **st = alca(storage_t *, n_writers);
784     storage_t **st2 = alca(storage_t *, n_writers);
785     checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
786     idx_prams_t *index_params = alca(idx_prams_t, n_writers);
787     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
788     thread_t *tid_sc = alca(thread_t, n_scanners);
789     void **thread_ret_sc = alca(void *, n_scanners);
790     thread_t *tid_wr = alca(thread_t, n_writers);
791     void **thread_ret_wr = alca(void *, n_writers);
792     fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
793     thread_t c_tid, i_tid;
794     void *c_ret, *i_ret;
795 
796     memleak_start();
797 
798     // init
799     rm_storage_fs();
800 
801     // init storage handles
802     // the nth writer is commit handler
803     for (i = 0;i < n_writers; ++i) {
804         gen_index_params(&index_params[i]);
805         memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
806         st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
807                 &index_params[i], &verification_checkpoint[i], true);
808         st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
809                 &index_params[i], &verification_checkpoint[i], true);
810         memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
811     }
812 
813     // load init data
814     for (i = 0; i < 100; ++i) {
815         load_persons(st[0]);
816     }
817 
818     thread_create(&c_tid, compact_thread, (void*)&fconfig);
819     thread_create(&i_tid, iterate_thread, (void*)&fconfig);
820 
821     for (int n = 0; n < 10; ++n) {
822         // start writer threads
823         for (i = 0; i < n_writers - 1; ++i) {
824             st[i]->verify_set = false;
825             thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
826         }
827 
828 
829         // start scanner threads
830         for (i = 0; i < n_scanners; ++i) {
831             scan_kv[i] = scan(st2[i], NULL);
832             thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
833         }
834 
835         e2e_fdb_commit(st[n_writers-1]->main, true);
836 
837         // join scan threads
838         for (i = 0; i < n_scanners; ++i) {
839             thread_join(tid_sc[i], &thread_ret_sc[i]);
840             fdb_kvs_close(scan_kv[i]);
841         }
842 
843         // join writer threads
844         for (i = 0; i < n_writers - 1; ++i) {
845             thread_join(tid_wr[i], &thread_ret_wr[i]);
846             update_index(st[i], false);
847         }
848         e2e_fdb_commit(st[n_writers - 1]->main, false);
849     }
850 
851     thread_join(c_tid, &c_ret);
852     thread_join(i_tid, &i_ret);
853 
854     for (i = 0; i < n_writers; ++i) {
855         // teardown
856         e2e_fdb_close(st2[i]);
857         e2e_fdb_close(st[i]);
858     }
859     fdb_shutdown();
860 
861     memleak_end();
862 }
863 
864 /*
865  * concurrent scan pattern:
866  *   start n_scanners and n_writers
867  *   scanners share in-mem snapshots and writers use copies of storage_t
868  */
e2e_concurrent_scan_pattern(int n_checkpoints, int n_scanners, int n_writers, fdb_config fconfig, bool walflush)869 void e2e_concurrent_scan_pattern(int n_checkpoints, int n_scanners, int n_writers,
870                                  fdb_config fconfig, bool walflush) {
871 
872     int n, i;
873     storage_t **st = alca(storage_t *, n_writers);
874     storage_t **st2 = alca(storage_t *, n_writers);
875     checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
876     idx_prams_t *index_params = alca(idx_prams_t, n_writers);
877     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
878     thread_t *tid_sc = alca(thread_t, n_scanners);
879     void **thread_ret_sc = alca(void *, n_scanners);
880     thread_t *tid_wr = alca(thread_t, n_writers);
881     void **thread_ret_wr = alca(void *, n_writers);
882     fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
883     n_checkpoints = n_checkpoints * LOAD_FACTOR;
884 
885     memleak_start();
886 
887     // init
888     rm_storage_fs();
889 
890 
891     // init storage handles
892     for (i = 0; i < n_writers; ++i) {
893         gen_index_params(&index_params[i]);
894         memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
895         st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
896                 &index_params[i], &verification_checkpoint[i], walflush);
897         st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
898                 &index_params[i], &verification_checkpoint[i], walflush);
899         memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
900     }
901 
902     // load init data
903     start_checkpoint(st[0]);
904     for (i = 0; i < 100; ++i) {
905         load_persons(st[0]);
906     }
907     end_checkpoint(st[0]);
908     verify_db(st[0]);
909 
910     for (n = 0; n < n_checkpoints; ++n) {
911 
912         // start writer threads
913         for (i = 0; i < n_writers; ++i) {
914             st[i]->verify_set = false;
915             start_checkpoint(st[i]);
916             thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
917         }
918 
919         // start scanner threads
920         for (i = 0; i < n_scanners; ++i) {
921             scan_kv[i] = scan(st2[i], NULL);
922             thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
923         }
924 
925         // join scan threads
926         for (i = 0; i < n_scanners; ++i) {
927             thread_join(tid_sc[i], &thread_ret_sc[i]);
928             fdb_kvs_close(scan_kv[i]);
929         }
930 
931         // join writer threads
932         for (i = 0; i < n_writers; ++i) {
933             thread_join(tid_wr[i], &thread_ret_wr[i]);
934             end_checkpoint(st[i]);
935             verify_db(st[i]);
936             update_index(st[i], true);
937         }
938 
939     }
940 
941     for (i = 0; i < n_writers; ++i) {
942         // teardown
943         e2e_fdb_close(st2[i]);
944         e2e_fdb_close(st[i]);
945     }
946     fdb_shutdown();
947 
948     memleak_end();
949 }
950 
951 
e2e_index_basic_test()952 void e2e_index_basic_test() {
953 
954     TEST_INIT();
955     memleak_start();
956 
957     randomize();
958     // configure
959     fdb_config fconfig = fdb_get_default_config();
960     fconfig.wal_threshold = 1024;
961     fconfig.flags = FDB_OPEN_FLAG_CREATE;
962     fconfig.durability_opt = FDB_DRB_ASYNC;
963     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
964 
965     // test
966     e2e_kvs_index_pattern(1, fconfig, true, false); // normal commit
967     e2e_kvs_index_pattern(1, fconfig, true, true);  // wal commit
968 
969     memleak_end();
970     TEST_RESULT("TEST: e2e index basic test");
971 }
972 
e2e_index_walflush_test_no_deletes_auto_compact()973 void e2e_index_walflush_test_no_deletes_auto_compact() {
974 
975     TEST_INIT();
976     memleak_start();
977 
978     randomize();
979     // configure
980     fdb_config fconfig = fdb_get_default_config();
981     fconfig.wal_threshold = 1024;
982     fconfig.flags = FDB_OPEN_FLAG_CREATE;
983     fconfig.durability_opt = FDB_DRB_ASYNC;
984     fconfig.compaction_mode=FDB_COMPACTION_AUTO;
985     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
986 
987     // test
988     e2e_kvs_index_pattern(10, fconfig, false, true);
989 
990     memleak_end();
991     TEST_RESULT("TEST: e2e index walflush test no deletes auto compact");
992 }
993 
e2e_index_walflush_autocompact_test()994 void e2e_index_walflush_autocompact_test() {
995 
996     TEST_INIT();
997     memleak_start();
998 
999     randomize();
1000     // opts
1001     fdb_config fconfig = fdb_get_default_config();
1002     fconfig.wal_threshold = 1024;
1003     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1004     fconfig.durability_opt = FDB_DRB_ASYNC;
1005     fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1006     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1007 
1008     // test
1009     e2e_kvs_index_pattern(2, fconfig, true, true);
1010 
1011     memleak_end();
1012     TEST_RESULT("TEST: e2e index walflush autocompact test");
1013 
1014 }
1015 
e2e_index_normal_commit_autocompact_test()1016 void e2e_index_normal_commit_autocompact_test() {
1017 
1018     TEST_INIT();
1019     memleak_start();
1020 
1021     randomize();
1022     // opts
1023     fdb_config fconfig = fdb_get_default_config();
1024     fconfig.wal_threshold = 1024;
1025     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1026     fconfig.durability_opt = FDB_DRB_NONE;
1027     fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1028     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1029 
1030     // test
1031     e2e_kvs_index_pattern(2, fconfig, true, false);
1032 
1033     memleak_end();
1034     TEST_RESULT("TEST: e2e index normal commit autocompact test");
1035 }
1036 
e2e_async_manual_compact_test()1037 void e2e_async_manual_compact_test() {
1038     TEST_INIT();
1039     memleak_start();
1040 
1041     randomize();
1042     // opts
1043     fdb_config fconfig = fdb_get_default_config();
1044     fconfig.wal_threshold = 1024;
1045     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1046     fconfig.durability_opt = FDB_DRB_ASYNC;
1047 
1048     // test
1049     e2e_async_compact_pattern(10, fconfig, false, true);
1050     memleak_end();
1051     TEST_RESULT("TEST: e2e async manual compact test");
1052 }
1053 
1054 
e2e_concurrent_scan_test()1055 void e2e_concurrent_scan_test() {
1056     TEST_INIT();
1057     memleak_start();
1058 
1059     randomize();
1060     fdb_config fconfig = fdb_get_default_config();
1061     fconfig.wal_threshold = 1024;
1062     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1063     fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1064     fconfig.durability_opt = FDB_DRB_ASYNC;
1065     fconfig.purging_interval = 30; // retain deleted docs for iteration
1066     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1067 
1068     // test
1069     e2e_concurrent_scan_pattern(3, 5, 5, fconfig, true);
1070     // normal_commit
1071     e2e_concurrent_scan_pattern(3, 5, 5, fconfig, false);
1072 
1073     memleak_end();
1074     TEST_RESULT("TEST: e2e concurrent scan");
1075 }
1076 
e2e_robust_test()1077 void e2e_robust_test() {
1078     TEST_INIT();
1079 
1080     randomize();
1081     fdb_config fconfig = fdb_get_default_config();
1082     fconfig.wal_threshold = 1024;
1083     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1084     fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1085     fconfig.durability_opt = FDB_DRB_ASYNC;
1086     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1087     // to allow iterators to validate docs across async compaction
1088     // specify purging_interval so deleted docs are not dropped by
1089     // compactor immediately..
1090     fconfig.purging_interval = 80;
1091 
1092     // test
1093     e2e_robust_pattern(fconfig);
1094 
1095     TEST_RESULT("TEST: e2e robust test");
1096 }
1097 
e2e_scan_compact_upto_test()1098 void e2e_scan_compact_upto_test() {
1099     TEST_INIT();
1100     memleak_start();
1101 
1102     randomize();
1103     int n, i;
1104     void *thread_ret;
1105     bool walflush = true;
1106     int n_checkpoints =  LOAD_FACTOR;
1107 
1108     fdb_config fconfig = fdb_get_default_config();
1109     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1110     fconfig.compaction_threshold = 0;
1111     fconfig.durability_opt = FDB_DRB_ASYNC;
1112     fconfig.purging_interval = 30; // retain deleted docs for iteration
1113 
1114     storage_t *st;
1115     checkpoint_t verification_checkpoint;
1116     idx_prams_t index_params;
1117     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1118     fdb_kvs_info info;
1119     fdb_kvs_handle *snap_db;
1120     fdb_status status;
1121     thread_t tid;
1122 
1123     // init
1124     rm_storage_fs();
1125     gen_index_params(&index_params);
1126     memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1127 
1128     // setup
1129     st = init_storage(&fconfig, &fconfig, &kvs_config,
1130                       &index_params, &verification_checkpoint, walflush);
1131 
1132     // create compaction thread
1133     thread_create(&tid, compact_upto_thread, (void*)&fconfig);
1134 
1135     // test
1136     for (n = 0; n < n_checkpoints; ++n) {
1137         load_persons(st);
1138         for (i = 0; i < 100; ++i) {
1139             load_persons(st);
1140             e2e_fdb_commit(st->main, walflush);
1141             status = fdb_get_kvs_info(st->all_docs, &info);
1142             TEST_CHK(status == FDB_RESULT_SUCCESS);
1143             fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1144             TEST_CHK(status == FDB_RESULT_SUCCESS);
1145             update_index(st, true);
1146             verify_db(st);
1147             fdb_kvs_close(snap_db);
1148         }
1149     }
1150 
1151     thread_join(tid, &thread_ret);
1152 
1153     // teardown
1154     e2e_fdb_shutdown(st);
1155     memleak_end();
1156     TEST_RESULT("TEST: e2e concurrent compact upto");
1157 }
1158 
kv_thread(void *args)1159 void *kv_thread(void *args) {
1160     // This thread stores and indexes person docs
1161 
1162     TEST_INIT();
1163     uint64_t i;
1164     uint64_t num_markers;
1165     fdb_snapshot_info_t *markers;
1166     fdb_status status;
1167 
1168     storage_t *st = (storage_t *)args;
1169     status = fdb_get_all_snap_markers(st->main, &markers,
1170                                       &num_markers);
1171     TEST_CHK(status == FDB_RESULT_SUCCESS);
1172 
1173     for (i = 0; i < num_markers; i += 10) {
1174         load_persons(st);
1175     }
1176     fdb_free_snap_markers(markers, num_markers);
1177     return NULL;
1178 }
1179 
1180 // This function identifies the latest superblock
1181 // add adds garbage to it.
corrupt_latest_superblock(const char* filename)1182 void corrupt_latest_superblock(const char* filename) {
1183     /*
1184      * Note that each block is 4096 bytes.
1185      * - There are 4 superblocks which constitute the
1186      *   first 4 blocks of the file.
1187      * - The 8 bytes following the first 8 bytes of a
1188      *   superblock contains the block revision num.
1189      */
1190     struct filemgr_ops *ops = get_filemgr_ops();
1191     int64_t offset = 8;
1192     int latest_sb = 0;
1193     int fd = ops->open(filename, O_RDWR, 0644);
1194     uint64_t buf, highest_rev = 0;
1195     for (int i = 0; i < 4; ++i) {    // num of superblocks: 4
1196         if (ops->pread(fd, &buf, sizeof(uint64_t), offset) == sizeof(uint64_t)) {
1197             buf = _endian_decode(buf);
1198             assert(buf != highest_rev);
1199             if (buf > highest_rev) {
1200                 highest_rev = buf;
1201                 latest_sb = i;
1202             }
1203             offset += 4096;
1204         } else {
1205             fprintf(stderr, "Warning: Could not find the latest superblock!\n");
1206             ops->close(fd);
1207             return;
1208         }
1209     }
1210     // Write garbage at a random offset that would fall within
1211     // the latest super block
1212     uint64_t garbage = rand();
1213     offset = latest_sb * 4096 + (rand() % (4095 - sizeof("garbage")));
1214     if (ops->pwrite(fd, &garbage, sizeof(garbage), offset) != sizeof(garbage)) {
1215         fprintf(stderr,
1216                 "\nWarning: Could not write garbage into the superblock!");
1217     }
1218     ops->close(fd);
1219 }
1220 
e2e_crash_recover_test(bool do_rollback)1221 void e2e_crash_recover_test(bool do_rollback) {
1222 
1223     TEST_INIT();
1224     memleak_start();
1225 
1226     randomize();
1227 
1228     bool walflush = true;
1229     int i, n;
1230     int n_checkpoints =  2;
1231 
1232     rm_storage_fs();
1233     storage_t *st;
1234     checkpoint_t verification_checkpoint;
1235     idx_prams_t index_params;
1236     fdb_kvs_handle *snap_db;
1237     fdb_kvs_info info;
1238     fdb_status status;
1239 
1240     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1241     fdb_config fconfig = fdb_get_default_config();
1242     fconfig.flags = FDB_OPEN_FLAG_CREATE;
1243     fconfig.compaction_threshold = 0;
1244     fconfig.durability_opt = FDB_DRB_ASYNC;
1245     fconfig.purging_interval = 30; // retain deleted docs for iteration
1246 
1247     gen_index_params(&index_params);
1248     memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1249     st = init_storage(&fconfig, &fconfig, &kvs_config,
1250                       &index_params, &verification_checkpoint, walflush);
1251 
1252     for (i = 0; i < 100; ++i) {
1253         load_persons(st);
1254         e2e_fdb_commit(st->main, true);
1255     }
1256 
1257     fdb_seqnum_t seqno;
1258     uint64_t k, num_markers;
1259     fdb_snapshot_info_t *markers;
1260 
1261     if (do_rollback) {
1262         status = fdb_get_all_snap_markers(st->main, &markers,
1263                                           &num_markers);
1264         TEST_CHK(status == FDB_RESULT_SUCCESS);
1265 
1266         for (k = 0; k < num_markers; k += 10) {
1267             // rollback to each marker
1268             seqno = markers[k].kvs_markers[0].seqnum;
1269             status = fdb_rollback(&st->all_docs, seqno);
1270             TEST_CHK(status == FDB_RESULT_SUCCESS);
1271         }
1272         fdb_free_snap_markers(markers, num_markers);
1273     } else {
1274         status = fdb_get_all_snap_markers(st->main, &markers,
1275                                           &num_markers);
1276         TEST_CHK(status == FDB_RESULT_SUCCESS);
1277 
1278         for (k = 0; k < num_markers; k += 10) {
1279             load_persons(st);
1280         }
1281         fdb_free_snap_markers(markers, num_markers);
1282     }
1283 
1284     // close storage
1285     e2e_fdb_close(st);
1286 
1287     corrupt_latest_superblock(E2EDB_MAIN);
1288 
1289     // reopen storage
1290     gen_index_params(&index_params);
1291     memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1292     st = init_storage(&fconfig, &fconfig, &kvs_config,
1293                       &index_params, &verification_checkpoint, walflush);
1294 
1295     // run verifiable workload
1296     for (n = 0; n < n_checkpoints; ++n) {
1297         load_persons(st);
1298         for (i = 0; i < 100; ++i) {
1299             load_persons(st);
1300             e2e_fdb_commit(st->main, walflush);
1301             status = fdb_get_kvs_info(st->all_docs, &info);
1302             TEST_CHK(status == FDB_RESULT_SUCCESS);
1303             fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1304             TEST_CHK(status == FDB_RESULT_SUCCESS);
1305             update_index(st, true);
1306             verify_db(st);
1307             fdb_kvs_close(snap_db);
1308         }
1309     }
1310 
1311     // teardown
1312     e2e_fdb_shutdown(st);
1313     memleak_end();
1314     if (do_rollback) {
1315         TEST_RESULT("TEST: e2e crash recover test with rollback");
1316     } else {
1317         TEST_RESULT("TEST: e2e crash recover test");
1318     }
1319 }
1320 
e2e_concurrent_reader_writer(bool do_compaction)1321 void e2e_concurrent_reader_writer(bool do_compaction) {
1322     TEST_INIT();
1323     memleak_start();
1324 
1325     randomize();
1326     fdb_config fconfig = fdb_get_default_config();
1327     // to allow iterators to validate docs across async compaction
1328     // specify purging_interval so deleted docs are not dropped by
1329     // compactor immediately..
1330     fconfig.purging_interval = 80;
1331     fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1332 
1333     // init
1334     rm_storage_fs();
1335 
1336     // test
1337     int i;
1338     int n_writers = 2;
1339     int n_scanners = 2;
1340     storage_t **st = alca(storage_t *, n_writers);
1341     storage_t **st2 = alca(storage_t *, n_writers);
1342     checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
1343     idx_prams_t *index_params = alca(idx_prams_t, n_writers);
1344     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1345     thread_t *tid_sc = alca(thread_t, n_scanners);
1346     void **thread_ret_sc = alca(void *, n_scanners);
1347     thread_t *tid_wr = alca(thread_t, n_writers);
1348     void **thread_ret_wr = alca(void *, n_writers);
1349     fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
1350     thread_t c_tid;
1351     void *c_ret;
1352 
1353     // init storage handles
1354     // the nth writer is commit handler
1355     for (i = 0; i < n_writers; ++i) {
1356         gen_index_params(&index_params[i]);
1357         memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
1358         st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1359                              &index_params[i], &verification_checkpoint[i],
1360                              true);
1361         st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1362                               &index_params[i], &verification_checkpoint[i],
1363                               true);
1364         memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
1365     }
1366 
1367     if (do_compaction) {
1368         thread_create(&c_tid, compact_thread, (void*)&fconfig);
1369     }
1370 
1371     // start new item thread
1372     thread_create(&tid_wr[0], seq_writer_thread, (void*)st[0]);
1373     // start update thread
1374     thread_create(&tid_wr[1], update_thread, (void*)st[1]->index1);
1375 
1376     // start disk-snapshot thread
1377     scan_kv[0] = scan(st2[0], NULL);
1378     thread_create(&tid_sc[0], disk_read_thread, (void*)st[1]);
1379     // start mem-snapshot thread
1380     scan_kv[1] = scan(st2[1], NULL);
1381     thread_create(&tid_sc[1], scan_thread, (void*)scan_kv[1]);
1382 
1383     // join threads
1384     thread_join(tid_wr[0], &thread_ret_wr[0]);
1385     thread_join(tid_wr[1], &thread_ret_wr[1]);
1386     thread_join(tid_sc[1], &thread_ret_sc[1]);
1387     thread_join(tid_sc[0], &thread_ret_sc[0]);
1388     if (do_compaction) {
1389         thread_join(c_tid, &c_ret);
1390     }
1391 
1392     // commit
1393     e2e_fdb_commit(st[0]->main, false);
1394 
1395     for (i = 0; i < n_writers; ++i) {
1396         // teardown
1397         e2e_fdb_close(st2[i]);
1398         e2e_fdb_close(st[i]);
1399     }
1400     fdb_shutdown();
1401 
1402     memleak_end();
1403     if (do_compaction) {
1404         TEST_RESULT("TEST: e2e concurrent reader writer test with compaction");
1405     } else {
1406         TEST_RESULT("TEST: e2e concurrent reader writer test");
1407     }
1408 }
1409 
e2e_multi_dbfile_concurrent_wr()1410 void e2e_multi_dbfile_concurrent_wr() {
1411     TEST_INIT();
1412     memleak_start();
1413     int i, r;
1414     int nf = 16;
1415     char buf[64];
1416     thread_t *tid_wr = alca(thread_t, nf);
1417     void **thread_ret_wr = alca(void *, nf);
1418     fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1419     fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1420     fdb_status status;
1421     randomize();
1422     rm_storage_fs();
1423     r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1424     (void)r;
1425 
1426     // create dbfiles
1427     fdb_config fconfig = fdb_get_default_config();
1428     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1429     fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1430 
1431     for (i = 0; i < nf; i++) {
1432         sprintf(buf, "e2edb_ex%d", i);
1433         status = fdb_open(&dbfiles[i], buf, &fconfig);
1434         TEST_CHK(status == FDB_RESULT_SUCCESS);
1435         status = fdb_kvs_open_default(dbfiles[i], &kvs[i], &kvs_config);
1436         TEST_CHK(status == FDB_RESULT_SUCCESS);
1437 
1438         // run update thread on each file to enter reuse
1439         thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1440     }
1441 
1442     // join and commit
1443     for (i = 0; i < nf; i++) {
1444         thread_join(tid_wr[i], &thread_ret_wr[i]);
1445         fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1446         fdb_close(dbfiles[i]);
1447     }
1448 
1449     fdb_shutdown();
1450     memleak_end();
1451     TEST_RESULT("TEST: e2e multi dbfile concurrent wr");
1452 }
1453 
e2e_multi_kvs_concurrent_wr()1454 void e2e_multi_kvs_concurrent_wr() {
1455     TEST_INIT();
1456     memleak_start();
1457     int i, j, r;
1458     int nf = 8;
1459     char buf[64];
1460     char body[64];
1461 
1462     thread_t *tid_wr = alca(thread_t, nf);
1463     void **thread_ret_wr = alca(void *, nf);
1464     fdb_file_handle *dbfile;
1465     fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1466     fdb_doc *rdoc = NULL;
1467     fdb_status status;
1468     randomize();
1469     rm_storage_fs();
1470     r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1471     fdb_iterator *it;
1472     (void)r;
1473 
1474     // create dbfile
1475     fdb_config fconfig = fdb_get_default_config();
1476     fconfig.num_keeping_headers = 10;
1477     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1478     fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1479     fdb_open(&dbfile, "e2edb_main", &fconfig);
1480 
1481     // open dbfiles
1482     for (i = 0; i < nf; i++) {
1483         sprintf(buf, "e2edb_kv%d", i);
1484         status = fdb_kvs_open(dbfile, &kvs[i], buf, &kvs_config);
1485         TEST_CHK(status == FDB_RESULT_SUCCESS);
1486     }
1487 
1488     // initial commits
1489     for (j = 0; j < 10; j++) {
1490         for (i = 0; i < nf; i++) {
1491             sprintf(buf, "%dkey%d", j, i);
1492             sprintf(body, "commit%d", j);
1493             status = fdb_set_kv(kvs[i], buf, strlen(buf), body, strlen(body));
1494             TEST_CHK(status == FDB_RESULT_SUCCESS);
1495         }
1496         status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1497         TEST_CHK(status == FDB_RESULT_SUCCESS);
1498     }
1499 
1500     // run update thread on each file to enter reuse
1501     for (i = 0; i < nf; i++) {
1502         thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1503     }
1504 
1505     // join and rollback
1506     for (i = 0; i < nf; i++) {
1507         thread_join(tid_wr[i], &thread_ret_wr[i]);
1508         status = fdb_rollback(&kvs[i], 7);
1509         TEST_CHK(status == FDB_RESULT_SUCCESS);
1510     }
1511 
1512     status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1513     TEST_CHK(status == FDB_RESULT_SUCCESS);
1514     for (i = 0; i < nf; i++) {
1515         // verify rollback kvs
1516         status = fdb_iterator_init(kvs[i], &it, NULL, 0,
1517                                    NULL, 0, FDB_ITR_NONE);
1518         TEST_CHK(status == FDB_RESULT_SUCCESS);
1519         j = 0;
1520         do {
1521             status = fdb_iterator_get(it, &rdoc);
1522             TEST_CHK (status == FDB_RESULT_SUCCESS);
1523             fdb_doc_free(rdoc);
1524             rdoc=NULL;
1525             j++;
1526         } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
1527         TEST_CHK(j==7);
1528         fdb_iterator_close(it);
1529     }
1530 
1531     fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1532     fdb_close(dbfile);
1533     fdb_shutdown();
1534 
1535     memleak_end();
1536 
1537     r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1538     (void)r;
1539 
1540     TEST_RESULT("TEST: e2e multi kvs concurrent wr test");
1541 }
1542 
e2e_multi_kvs_concurrent_wr_compact()1543 void e2e_multi_kvs_concurrent_wr_compact() {
1544     TEST_INIT();
1545     memleak_start();
1546     int i, j, r;
1547     int nf = 4;
1548     char buf[64];
1549 
1550     thread_t *tid_wr = alca(thread_t, nf*nf);
1551     void **thread_ret_wr = alca(void *, nf*nf);
1552     fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1553     fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf*nf);
1554     fdb_status status;
1555     randomize();
1556     rm_storage_fs();
1557     r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1558     (void)r;
1559 
1560     // create dbfile
1561     fdb_config fconfig = fdb_get_default_config();
1562     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1563     fconfig.compaction_mode  = FDB_COMPACTION_MANUAL;
1564 
1565     // open dbfiles
1566     for (i = 0; i < nf; ++i) {
1567         sprintf(buf, "e2edb_ex%d", i);
1568         status = fdb_open(&dbfiles[i], buf, &fconfig);
1569         TEST_CHK(status == FDB_RESULT_SUCCESS);
1570 
1571         for (j = i*nf; j < (i * nf + nf); ++j) {
1572             sprintf(buf, "e2edb_kv%d", j);
1573             status = fdb_kvs_open(dbfiles[i], &kvs[j], buf, &kvs_config);
1574             TEST_CHK(status == FDB_RESULT_SUCCESS);
1575         }
1576     }
1577 
1578     // initial commits
1579     for (i = 0; i < nf; i++) {
1580         status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1581         TEST_CHK(status == FDB_RESULT_SUCCESS);
1582     }
1583 
1584     // run update thread on each file to enter reuse
1585     for (i = 0; i < (nf * nf); i++) {
1586         thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1587     }
1588 
1589     // join update threads
1590     for (i = 0; i < (nf * nf); i++) {
1591       thread_join(tid_wr[i], &thread_ret_wr[i]);
1592       TEST_CHK(status == FDB_RESULT_SUCCESS);
1593     }
1594 
1595     // more commits
1596     for (i = 0; i < nf; i++) {
1597         status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1598         TEST_CHK(status == FDB_RESULT_SUCCESS);
1599     }
1600 
1601     for (i = 0; i < nf; i++) {
1602         fdb_close(dbfiles[i]);
1603     }
1604     fdb_shutdown();
1605     memleak_end();
1606 
1607     TEST_RESULT("TEST: e2e multi kvs concurrent wr test with compaction");
1608 }
1609 
main()1610 int main() {
1611 
1612     // Note: following tests are temporarily disabled due to
1613     // the keeping header violation issue by rollback/snapshot API.
1614     //
1615     //   - e2e_multi_kvs_concurrent_wr();
1616 
1617     /* Multiple kvstores under reuse with rollback */
1618     // e2e_multi_kvs_concurrent_wr();
1619 
1620     /* Multiple kvstores under reuse with rollback and compaction */
1621     e2e_multi_kvs_concurrent_wr_compact();
1622 
1623     /* Multiple dbfiles after stale block reuse */
1624     e2e_multi_dbfile_concurrent_wr();
1625 
1626     /* Concurrent readers and writers after stale block reuse, without
1627        and with compaction */
1628     e2e_concurrent_reader_writer(false);
1629     e2e_concurrent_reader_writer(true);     // w.compaction
1630 
1631     e2e_robust_test();
1632     e2e_concurrent_scan_test();
1633     e2e_async_manual_compact_test();
1634     e2e_index_basic_test();
1635     e2e_index_walflush_test_no_deletes_auto_compact();
1636     e2e_index_walflush_autocompact_test();
1637     e2e_index_normal_commit_autocompact_test();
1638 
1639     /* Data loading with concurrent compaction */
1640     e2e_scan_compact_upto_test();
1641 
1642     /* Crash recovery with e2e workload, without and with a rollback */
1643     e2e_crash_recover_test(false);
1644     e2e_crash_recover_test(true);           // w.rollback
1645 
1646     return 0;
1647 }
1648