1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2010 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <errno.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <unistd.h>
26 #endif
27
28 #include "filemgr_ops.h"
29
30 #include "libforestdb/forestdb.h"
31 #include "test.h"
32 #include "e2espec.h"
33
load_persons(storage_t *st)34 void load_persons(storage_t *st) {
35 int i, n=100;
36 person_t p;
37
38 // store and index person docs
39 for (i = 0; i < n; ++i) {
40 gen_person(&p);
41 e2e_fdb_set_person(st, &p);
42 }
43
44 #ifdef __DEBUG_E2E
45 printf("[%s] load persons: %03d docs created\n",st->keyspace, n);
46 #endif
47 }
48
delete_persons(storage_t *st)49 void delete_persons(storage_t *st) {
50 TEST_INIT();
51
52 fdb_doc *rdoc = NULL;
53 fdb_status status;
54 fdb_iterator *it;
55 person_t *p;
56 int i, n = 0;
57
58 // delete every 5th doc
59 status = fdb_iterator_sequence_init(st->all_docs, &it, 0, 0,
60 FDB_ITR_NO_DELETES);
61 TEST_CHK(status == FDB_RESULT_SUCCESS);
62
63 i = 0;
64 do {
65 status = fdb_iterator_get(it, &rdoc);
66 TEST_CHK (status == FDB_RESULT_SUCCESS);
67 if ((i % 5) == 0) {
68
69 p = (person_t *)rdoc->body;
70 // ensure the requester has created this key
71 if (strcmp(p->keyspace, st->keyspace) == 0) {
72 e2e_fdb_del_person(st, p);
73 n++;
74 }
75
76 }
77 fdb_doc_free(rdoc);
78 rdoc=NULL;
79 i++;
80 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
81
82 fdb_iterator_close(it);
83
84 #ifdef __DEBUG_E2E
85 sprintf(rbuf, "[%s] delete persons: %03d docs deleted\n",st->keyspace, n);
86 TEST_RESULT(rbuf);
87 #endif
88 }
89
90 /*
91 * reset params used to index storage
92 * delete old docs that are part of new index
93 * so that they are not included at verification time
94 */
update_index(storage_t *st, bool checkpointing)95 void update_index(storage_t *st, bool checkpointing) {
96
97 TEST_INIT();
98
99 fdb_iterator *it;
100 person_t *p = NULL;
101 fdb_doc *rdoc = NULL;
102 fdb_status status;
103 int n = 0;
104 size_t vallen;
105 char *mink = st->index_params->min;
106 char *maxk = st->index_params->max;
107 char rbuf[256];
108
109 // change storage index range
110 reset_storage_index(st);
111
112 if (checkpointing) {
113 start_checkpoint(st);
114 }
115 status = fdb_iterator_init(st->index1, &it, mink, 12,
116 maxk, 12, FDB_ITR_NO_DELETES);
117 if (status != FDB_RESULT_SUCCESS) {
118 // no items within min max range
119 TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
120 }
121
122
123
124 do {
125 status = fdb_iterator_get(it, &rdoc);
126 if (status == FDB_RESULT_SUCCESS) {
127 status = fdb_get_kv(st->all_docs,
128 rdoc->body, rdoc->bodylen,
129 (void **)&p, &vallen);
130 if (status == FDB_RESULT_SUCCESS) {
131 if (strcmp(p->keyspace, st->keyspace) == 0) {
132 e2e_fdb_del_person(st, p);
133 n++;
134 }
135 free(p);
136 p=NULL;
137 }
138 fdb_doc_free(rdoc);
139 rdoc=NULL;
140 }
141 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
142
143 if (checkpointing) {
144 end_checkpoint(st);
145 }
146
147 fdb_iterator_close(it);
148
149 // reset verification chkpoint
150 st->v_chk->num_indexed = 0;
151 st->v_chk->sum_age_indexed = 0;
152
153 sprintf(rbuf, "update index: %03d docs deleted", n);
154 #ifdef __DEBUG_E2E
155 TEST_RESULT(rbuf);
156 #endif
157 }
158
159 // --- verify ----
160 // 1. check that num of keys within index are correct
161 // 2. check that age index total is correct for specified range
162 // 3. check that doc count is as expected
verify_db(storage_t *st)163 void verify_db(storage_t *st) {
164
165 TEST_INIT();
166
167 checkpoint_t *db_checkpoint = create_checkpoint(st, END_CHECKPOINT);
168 int db_ndocs = db_checkpoint->ndocs;
169 int exp_ndocs = st->v_chk->ndocs;
170 int exp_nidx = st->v_chk->num_indexed;
171 int db_nidx = db_checkpoint->num_indexed;
172 int db_suma = db_checkpoint->sum_age_indexed;
173 int exp_suma = st->v_chk->sum_age_indexed;
174 fdb_kvs_info info;
175 char rbuf[256];
176
177 e2e_fdb_commit(st->main, st->walflush);
178
179 fdb_get_kvs_info(st->index1, &info);
180
181 #ifdef __DEBUG_E2E
182 int val1, val2;
183 fdb_iterator *it;
184 fdb_doc *rdoc = NULL;
185 if (db_ndocs != exp_ndocs) {
186 // for debugging: currently inaccurate for concurrency patterns
187 fdb_get_kvs_info(st->all_docs, &info);
188 val1 = info.doc_count;
189 (void)val1;
190 val2 = 0;
191 fdb_iterator_init(st->index1, &it, NULL, 0,
192 NULL, 0, FDB_ITR_NONE);
193 do {
194 fdb_iterator_get(it, &rdoc);
195 if (!rdoc->deleted) {
196 val2++;
197 }
198 fdb_doc_free(rdoc);
199 rdoc=NULL;
200 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
201 printf("ndocs_debug: kvs_info(%d) == exp_ndocs(%d) ?\n", val1,exp_ndocs);
202 printf("ndocs_debug: kvs_info(%d) == itr_count(%d) ?\n", val1, val2);
203 fdb_iterator_close(it);
204 }
205 printf("[%s] db_ndix(%d) == exp_nidx(%d)\n", st->keyspace, db_nidx, exp_nidx);
206 #endif
207
208 free(db_checkpoint);
209 db_checkpoint=NULL;
210 //TEST_CHK(db_nidx==exp_nidx);
211 //TEST_CHK(db_suma==exp_suma);
212
213 sprintf(rbuf, "[%s] verifydb: ndocs(%d=%d), nidx(%d=%d), sumage(%d=%d)\n",
214 st->keyspace,
215 db_ndocs, exp_ndocs,
216 db_nidx, exp_nidx,
217 db_suma, exp_suma);
218 #ifdef __DEBUG_E2E
219 TEST_RESULT(rbuf);
220 #endif
221 }
222
223
224 /*
225 * compares a db where src is typically from
226 * a rollback state of live data and replay is a
227 * db to use for comparison of expected data
228 */
db_compare(fdb_kvs_handle *src, fdb_kvs_handle *replay)229 void db_compare(fdb_kvs_handle *src, fdb_kvs_handle *replay) {
230
231 TEST_INIT();
232
233 int ndoc1, ndoc2;
234 fdb_kvs_info info;
235 fdb_iterator *it;
236 fdb_doc *rdoc = NULL;
237 fdb_doc *vdoc = NULL;
238 fdb_status status;
239 char rbuf[256];
240
241 fdb_get_kvs_info(src, &info);
242 ndoc1 = info.doc_count;
243 fdb_get_kvs_info(replay, &info);
244 ndoc2 = info.doc_count;
245
246 TEST_CHK(ndoc1 == ndoc2);
247
248 // all docs in replay db must be in source db with same status
249 status = fdb_iterator_sequence_init(replay, &it, 0, 0, FDB_ITR_NONE);
250 TEST_CHK(status == FDB_RESULT_SUCCESS);
251 do {
252 status = fdb_iterator_get_metaonly(it, &rdoc);
253 TEST_CHK(status == FDB_RESULT_SUCCESS);
254
255 fdb_doc_create(&vdoc, rdoc->key, rdoc->keylen,
256 rdoc->meta, rdoc->metalen,
257 rdoc->body, rdoc->bodylen);
258 // lookup by key
259 status = fdb_get(src, vdoc);
260
261 if (rdoc->deleted) {
262 TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
263 } else {
264 TEST_CHK(status == FDB_RESULT_SUCCESS);
265 }
266
267 fdb_doc_free(rdoc);
268 fdb_doc_free(vdoc);
269 rdoc=NULL;
270 vdoc=NULL;
271
272 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
273 fdb_iterator_close(it);
274
275 sprintf(rbuf, "db compare: src(%d) == replay(%d)", ndoc1, ndoc2);
276 #ifdef __DEBUG_E2E
277 TEST_RESULT(rbuf);
278 #endif
279 }
280
281 /*
282 * populate replay db up to specified seqnum
283 */
load_replay_kvs(storage_t *st, fdb_kvs_handle *replay_kvs, fdb_seqnum_t seqnum)284 void load_replay_kvs(storage_t *st, fdb_kvs_handle *replay_kvs, fdb_seqnum_t seqnum) {
285
286 TEST_INIT();
287
288 fdb_iterator *it;
289 fdb_doc *rdoc = NULL;
290 fdb_status status;
291 transaction_t *tx;
292
293
294 // iterator end at seqnum
295 status = fdb_iterator_sequence_init(st->rtx, &it, 0, seqnum,
296 FDB_ITR_NONE);
297 TEST_CHK(status == FDB_RESULT_SUCCESS);
298
299 do {
300 status = fdb_iterator_get(it, &rdoc);
301 TEST_CHK(status == FDB_RESULT_SUCCESS);
302
303 tx = (transaction_t *)rdoc->body;
304 if (tx->type == SET_PERSON) {
305 status = fdb_set_kv(replay_kvs,
306 tx->refkey,
307 tx->refkey_len,
308 NULL,0);
309 TEST_CHK(status == FDB_RESULT_SUCCESS);
310 }
311 if (tx->type == DEL_PERSON) {
312 status = fdb_del_kv(replay_kvs,
313 tx->refkey,
314 tx->refkey_len);
315 TEST_CHK(status == FDB_RESULT_SUCCESS);
316 }
317 fdb_doc_free(rdoc);
318 rdoc=NULL;
319
320 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
321 fdb_iterator_close(it);
322
323 }
324
325 /*
326 * replay records db up to a checkpoint into another db
327 * rollback all_docs to that checkpoint
328 * compare all_docs at that state to new doc
329 */
replay(storage_t *st)330 void replay(storage_t *st) {
331 TEST_INIT();
332
333 int i;
334 size_t v;
335 char kvsbuf[10];
336 fdb_file_handle *dbfile;
337 fdb_kvs_handle *replay_kvs;
338 fdb_config fconfig = fdb_get_default_config();
339 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
340 fconfig.wal_threshold = 1024;
341 fconfig.flags = FDB_OPEN_FLAG_CREATE;
342 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
343 fconfig.compaction_threshold = 10;
344 fdb_iterator *it;
345 fdb_status status;
346 fdb_doc *rdoc = NULL;
347 fdb_kvs_info info;
348 transaction_t *tx;
349 checkpoint_t *chk;
350 fdb_seqnum_t rollback_seqnum;
351
352 // create replay kvs
353 status = fdb_open(&dbfile, E2EDB_RECORDS, &fconfig);
354 TEST_CHK(status == FDB_RESULT_SUCCESS);
355
356 e2e_fdb_commit(st->main, st->walflush);
357 status = fdb_get_kvs_info(st->all_docs, &info);
358 TEST_CHK(status == FDB_RESULT_SUCCESS);
359
360
361 // iterate over records kv and replay transactions
362 status = fdb_iterator_sequence_init(st->rtx, &it, 0, 0, FDB_ITR_NONE);
363 TEST_CHK(status == FDB_RESULT_SUCCESS);
364
365 // seek to end so we can reverse iterate
366 // seq iterators cannot seek
367 do {
368 ;
369 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
370
371
372 // reverse iterate from highest to lowest checkpoint
373 i=0;
374 while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
375 status = fdb_iterator_get(it, &rdoc);
376 TEST_CHK(status == FDB_RESULT_SUCCESS);
377 tx = (transaction_t *)rdoc->body;
378 if (tx->type == END_CHECKPOINT) {
379
380 sprintf(kvsbuf, "rkvs%d", i);
381 status = fdb_kvs_open(dbfile, &replay_kvs, kvsbuf, &kvs_config);
382 TEST_CHK(status == FDB_RESULT_SUCCESS);
383
384 // load replay db up to this seqnum
385 load_replay_kvs(st, replay_kvs, rdoc->seqnum);
386
387 // get checkpoint doc for rollback
388 status = fdb_get_kv(st->chk, tx->refkey, tx->refkey_len,
389 (void **)&chk, &v);
390
391 TEST_CHK(status == FDB_RESULT_SUCCESS);
392 rollback_seqnum = chk->seqnum_all;
393 ;
394 #ifdef __DEBUG_E2E
395 printf("rollback to %llu\n", chk->seqnum_all);
396 #endif
397 status = fdb_rollback(&st->all_docs, rollback_seqnum);
398 if (status == FDB_RESULT_NO_DB_INSTANCE) {
399 free(chk);
400 // drop replay kvs
401 status = fdb_kvs_close(replay_kvs);
402 TEST_CHK(status == FDB_RESULT_SUCCESS);
403 status = fdb_kvs_remove(dbfile, kvsbuf);
404 TEST_CHK(status == FDB_RESULT_SUCCESS);
405 break;
406 }
407 TEST_CHK(status == FDB_RESULT_SUCCESS);
408 free(chk);
409 chk=NULL;
410 // after rollback, WAL entries should be flushed for
411 // accurate # docs count comparison with 'replay_kvs'.
412 e2e_fdb_commit(st->main, true);
413
414 status = fdb_get_kvs_info(st->rtx, &info);
415 TEST_CHK(status == FDB_RESULT_SUCCESS);
416
417 // compare rollback and replay db
418 e2e_fdb_commit(dbfile, st->walflush);
419 db_compare(st->all_docs, replay_kvs);
420
421 // drop replay kvs
422 status = fdb_kvs_close(replay_kvs);
423 TEST_CHK(status == FDB_RESULT_SUCCESS);
424 status = fdb_kvs_remove(dbfile, kvsbuf);
425 TEST_CHK(status == FDB_RESULT_SUCCESS);
426 i++;
427 }
428 fdb_doc_free(rdoc);
429 rdoc=NULL;
430 }
431
432 fdb_iterator_close(it);
433 fdb_close(dbfile);
434
435 }
436
437 /* do forward previous and seek operations on main kv */
iterate_thread(void *args)438 void *iterate_thread(void *args) {
439
440 TEST_INIT();
441
442 int i, j;
443 fdb_config *fconfig = (fdb_config *)args;
444 fdb_file_handle *dbfile;
445 fdb_iterator *it;
446 fdb_doc *rdoc = NULL;
447 fdb_open(&dbfile, E2EDB_MAIN, fconfig);
448 fdb_kvs_handle *all_docs, *snap_db;
449 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
450 fdb_status status;
451 person_t p;
452
453 for (i = 0; i < 50; i++) {
454 fdb_kvs_open(dbfile, &all_docs, E2EKV_ALLDOCS, &kvs_config);
455
456 if ((i % 2) == 0) { //snapshot
457 status = fdb_snapshot_open(all_docs, &snap_db, FDB_SNAPSHOT_INMEM);
458 TEST_CHK(status == FDB_RESULT_SUCCESS);
459 status = fdb_iterator_init(snap_db, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
460 TEST_CHK(status == FDB_RESULT_SUCCESS);
461 } else {
462 status = fdb_iterator_init(all_docs, &it, NULL, 0, NULL, 0, FDB_ITR_NONE);
463 TEST_CHK(status == FDB_RESULT_SUCCESS);
464 }
465 j = 0;
466 // forward
467 do {
468 status = fdb_iterator_get(it, &rdoc);
469 if (j) {
470 TEST_CHK(status == FDB_RESULT_SUCCESS);
471 }
472 fdb_doc_free(rdoc);
473 rdoc = NULL;
474 j++;
475 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
476 // reverse and seek ahead
477 for (j = 0; j < 10; j++) {
478 while (fdb_iterator_prev(it) != FDB_RESULT_ITERATOR_FAIL) {
479 status = fdb_iterator_get(it, &rdoc);
480 TEST_CHK(status == FDB_RESULT_SUCCESS);
481 fdb_doc_free(rdoc);
482 rdoc = NULL;
483 }
484 gen_person(&p);
485
486 // seek using random person key
487 if (j % 2 == 0) { // seek higher
488 fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_HIGHER);
489 } else {
490 fdb_iterator_seek(it, p.key, strlen(p.key), FDB_ITR_SEEK_LOWER);
491 }
492 }
493 fdb_iterator_close(it);
494 if ((i % 2) == 0) {
495 fdb_kvs_close(snap_db);
496 }
497 fdb_kvs_close(all_docs);
498 }
499
500 fdb_doc_free(rdoc);
501 rdoc = NULL;
502 fdb_close(dbfile);
503 return NULL;
504 }
505
compact_thread(void *args)506 void *compact_thread(void *args) {
507 int i;
508 fdb_config *fconfig = (fdb_config *)args;
509 fdb_file_handle *dbfile;
510 fdb_open(&dbfile, E2EDB_MAIN, fconfig);
511 for (i = 0; i < 3; ++i) {
512 sleep(2);
513 #ifdef __DEBUG_E2E
514 printf("compact: %d\n", i);
515 #endif
516 fdb_compact(dbfile, NULL);
517 }
518 fdb_close(dbfile);
519 return NULL;
520 }
521
compact_upto_thread(void *args)522 void *compact_upto_thread(void *args) {
523 TEST_INIT();
524
525 int i;
526 uint64_t num_markers;
527 fdb_snapshot_info_t *markers;
528 fdb_status status;
529
530 fdb_config *fconfig = (fdb_config *)args;
531 fdb_file_handle *dbfile;
532 fdb_open(&dbfile, E2EDB_MAIN, fconfig);
533 for (i = 0; i < 10; ++i) {
534 sleep(1);
535 status = fdb_get_all_snap_markers(dbfile, &markers,
536 &num_markers);
537 TEST_CHK(status == FDB_RESULT_SUCCESS);
538 if (num_markers == 0) {
539 // No need of compaction as file is empty
540 break;
541 }
542 status = fdb_compact_upto(dbfile, NULL, markers[0].marker);
543 TEST_CHK(status == FDB_RESULT_SUCCESS);
544 fdb_free_snap_markers(markers, num_markers);
545 }
546 fdb_close(dbfile);
547 return NULL;
548 }
549
e2e_async_compact_pattern(int n_checkpoints, fdb_config fconfig, bool deletes, bool walflush)550 void e2e_async_compact_pattern(int n_checkpoints, fdb_config fconfig,
551 bool deletes, bool walflush) {
552 TEST_INIT();
553 int n, i;
554 storage_t *st;
555 checkpoint_t verification_checkpoint;
556 idx_prams_t index_params;
557 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
558 fdb_kvs_info info;
559 fdb_kvs_handle *snap_db;
560 fdb_status status;
561 thread_t tid;
562 void *thread_ret;
563 n_checkpoints = n_checkpoints * LOAD_FACTOR;
564
565 memleak_start();
566
567 // init
568 rm_storage_fs();
569 gen_index_params(&index_params);
570 memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
571
572 // setup
573 st = init_storage(&fconfig, &fconfig, &kvs_config,
574 &index_params, &verification_checkpoint, walflush);
575
576 // create compaction thread
577 thread_create(&tid, compact_thread, (void*)&fconfig);
578
579 // test
580 for ( n = 0; n < n_checkpoints; ++n) {
581
582 #ifdef __DEBUG_E2E
583 printf("checkpoint: %d\n", n);
584 #endif
585 load_persons(st);
586 for (i=0;i<100;++i) {
587 #ifdef __DEBUG_E2E
588 printf("\n\n----%d----\n", i);
589 #endif
590 load_persons(st);
591 if (deletes) {
592 delete_persons(st);
593 }
594 e2e_fdb_commit(st->main, walflush);
595 status = fdb_get_kvs_info(st->all_docs, &info);
596 TEST_CHK(status == FDB_RESULT_SUCCESS);
597 status = fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
598 TEST_CHK(status == FDB_RESULT_SUCCESS);
599 update_index(st, true);
600 verify_db(st);
601 fdb_kvs_close(snap_db);
602 }
603 }
604
605 thread_join(tid, &thread_ret);
606 // teardown
607 e2e_fdb_shutdown(st);
608
609 memleak_end();
610 }
611
e2e_kvs_index_pattern(int n_checkpoints, fdb_config fconfig, bool deletes, bool walflush)612 void e2e_kvs_index_pattern(int n_checkpoints, fdb_config fconfig,
613 bool deletes, bool walflush) {
614
615 int n, i;
616 storage_t *st;
617 checkpoint_t verification_checkpoint;
618 idx_prams_t index_params;
619 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
620 n_checkpoints = n_checkpoints * LOAD_FACTOR;
621
622 memleak_start();
623
624 // init
625 rm_storage_fs();
626 gen_index_params(&index_params);
627 memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
628
629 // setup
630 st = init_storage(&fconfig, &fconfig, &kvs_config,
631 &index_params, &verification_checkpoint, walflush);
632
633 // test
634 for (n = 0; n < n_checkpoints; ++n) {
635
636 // checkpoint
637 start_checkpoint(st);
638
639 for (i = 0; i < 100; ++i) {
640 #ifdef __DEBUG_E2E
641 printf("\n\n----%d----\n", i);
642 #endif
643 load_persons(st);
644 if (deletes) {
645 delete_persons(st);
646 }
647 verify_db(st);
648 }
649
650 // end checkpoint
651 end_checkpoint(st);
652
653 // change index range
654 update_index(st, true);
655 verify_db(st);
656
657 }
658
659 if (fconfig.compaction_mode != FDB_COMPACTION_AUTO) {
660 /* replay involves rollbacks but
661 * cannot rollback pre compact due to BUG: MB-13130
662 */
663 replay(st);
664 }
665
666 // teardown
667 e2e_fdb_shutdown(st);
668
669 memleak_end();
670 }
671
scan_thread(void *args)672 void *scan_thread(void *args) {
673 int i = 0;
674 fdb_kvs_handle *scan_kv = (fdb_kvs_handle *)args;
675
676 for (i = 0; i < 20; ++i) {
677 scan(NULL, scan_kv);
678 }
679 return NULL;
680 }
681
disk_read_thread(void *args)682 void *disk_read_thread(void *args) {
683 TEST_INIT();
684 int n;
685 uint64_t i;
686 int seqno;
687 uint64_t num_markers;
688 fdb_snapshot_info_t *markers;
689 fdb_kvs_handle *snap_db;
690 fdb_status status;
691 fdb_doc *doc;
692 person_t p;
693 fdb_doc_create(&doc, NULL, 0, NULL, 0, NULL, 0);
694
695 storage_t *st = (storage_t *)args;
696
697 for (n = 0; n < 10; ++n) {
698 status = fdb_get_all_snap_markers(st->main, &markers,
699 &num_markers);
700 TEST_CHK(status == FDB_RESULT_SUCCESS);
701 for (i = 0; i < num_markers; i++) {
702 // open disk snapshot to each marker
703 seqno = markers[i].kvs_markers[0].seqnum;
704 if (!seqno) {
705 continue;
706 }
707 status = fdb_snapshot_open(st->all_docs, &snap_db, seqno);
708 if (status == FDB_RESULT_SUCCESS) {
709 doc->seqnum = seqno;
710 // can get doc
711 status = fdb_get_byseq(snap_db, doc);
712 TEST_CHK(status == FDB_RESULT_SUCCESS);
713
714 // verify
715 memcpy(&p, (person_t *)doc->body, sizeof(person_t));
716 TEST_CHK(p.age <= seqno)
717 }
718 }
719 fdb_free_snap_markers(markers, num_markers);
720 }
721
722 fdb_doc_free(doc);
723 return NULL;
724 }
725
726
writer_thread(void *args)727 void *writer_thread(void *args) {
728
729 storage_t *st = (storage_t *)args;
730 for (int i = 0; i < 5; ++i) {
731 load_persons(st);
732 if (i == 5) {
733 delete_persons(st);
734 }
735 }
736 return NULL;
737 }
738
seq_writer_thread(void *args)739 void *seq_writer_thread(void *args) {
740
741 int j, i = 0, n=10000;
742 person_t p;
743 storage_t *st = (storage_t *)args;
744 for (j = 0; j < 10; ++j) {
745 for (i = 0; i < n; ++i) {
746 gen_person(&p);
747 p.age = i;
748 sprintf(p.key, "person%d", i);
749 e2e_fdb_set_person(st, &p);
750 }
751 e2e_fdb_commit(st->main, true);
752 }
753 return NULL;
754 }
755
update_thread(void *args)756 void *update_thread(void *args) {
757 TEST_INIT();
758
759 int i = 0, n=100000;
760 person_t p;
761 fdb_kvs_handle *kv = (fdb_kvs_handle *)args;
762
763 // only generate 1 person type
764 gen_person(&p);
765 for (i = 0; i < n; ++i) {
766 fdb_set_kv(kv, p.key, strlen(p.key),
767 p.name, strlen(p.name));
768 }
769 return NULL;
770 }
771
772
773 /*
774 * perform many fdb features against concurrent handlers
775 * to verify robustness of db. this pattern is qualified by
776 * completion without faults data corrections and has relaxed error handling.
777 */
e2e_robust_pattern(fdb_config fconfig)778 void e2e_robust_pattern(fdb_config fconfig) {
779
780 int i;
781 int n_writers = 10;
782 int n_scanners = 10;
783 storage_t **st = alca(storage_t *, n_writers);
784 storage_t **st2 = alca(storage_t *, n_writers);
785 checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
786 idx_prams_t *index_params = alca(idx_prams_t, n_writers);
787 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
788 thread_t *tid_sc = alca(thread_t, n_scanners);
789 void **thread_ret_sc = alca(void *, n_scanners);
790 thread_t *tid_wr = alca(thread_t, n_writers);
791 void **thread_ret_wr = alca(void *, n_writers);
792 fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
793 thread_t c_tid, i_tid;
794 void *c_ret, *i_ret;
795
796 memleak_start();
797
798 // init
799 rm_storage_fs();
800
801 // init storage handles
802 // the nth writer is commit handler
803 for (i = 0;i < n_writers; ++i) {
804 gen_index_params(&index_params[i]);
805 memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
806 st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
807 &index_params[i], &verification_checkpoint[i], true);
808 st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
809 &index_params[i], &verification_checkpoint[i], true);
810 memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
811 }
812
813 // load init data
814 for (i = 0; i < 100; ++i) {
815 load_persons(st[0]);
816 }
817
818 thread_create(&c_tid, compact_thread, (void*)&fconfig);
819 thread_create(&i_tid, iterate_thread, (void*)&fconfig);
820
821 for (int n = 0; n < 10; ++n) {
822 // start writer threads
823 for (i = 0; i < n_writers - 1; ++i) {
824 st[i]->verify_set = false;
825 thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
826 }
827
828
829 // start scanner threads
830 for (i = 0; i < n_scanners; ++i) {
831 scan_kv[i] = scan(st2[i], NULL);
832 thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
833 }
834
835 e2e_fdb_commit(st[n_writers-1]->main, true);
836
837 // join scan threads
838 for (i = 0; i < n_scanners; ++i) {
839 thread_join(tid_sc[i], &thread_ret_sc[i]);
840 fdb_kvs_close(scan_kv[i]);
841 }
842
843 // join writer threads
844 for (i = 0; i < n_writers - 1; ++i) {
845 thread_join(tid_wr[i], &thread_ret_wr[i]);
846 update_index(st[i], false);
847 }
848 e2e_fdb_commit(st[n_writers - 1]->main, false);
849 }
850
851 thread_join(c_tid, &c_ret);
852 thread_join(i_tid, &i_ret);
853
854 for (i = 0; i < n_writers; ++i) {
855 // teardown
856 e2e_fdb_close(st2[i]);
857 e2e_fdb_close(st[i]);
858 }
859 fdb_shutdown();
860
861 memleak_end();
862 }
863
864 /*
865 * concurrent scan pattern:
866 * start n_scanners and n_writers
867 * scanners share in-mem snapshots and writers use copies of storage_t
868 */
e2e_concurrent_scan_pattern(int n_checkpoints, int n_scanners, int n_writers, fdb_config fconfig, bool walflush)869 void e2e_concurrent_scan_pattern(int n_checkpoints, int n_scanners, int n_writers,
870 fdb_config fconfig, bool walflush) {
871
872 int n, i;
873 storage_t **st = alca(storage_t *, n_writers);
874 storage_t **st2 = alca(storage_t *, n_writers);
875 checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
876 idx_prams_t *index_params = alca(idx_prams_t, n_writers);
877 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
878 thread_t *tid_sc = alca(thread_t, n_scanners);
879 void **thread_ret_sc = alca(void *, n_scanners);
880 thread_t *tid_wr = alca(thread_t, n_writers);
881 void **thread_ret_wr = alca(void *, n_writers);
882 fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
883 n_checkpoints = n_checkpoints * LOAD_FACTOR;
884
885 memleak_start();
886
887 // init
888 rm_storage_fs();
889
890
891 // init storage handles
892 for (i = 0; i < n_writers; ++i) {
893 gen_index_params(&index_params[i]);
894 memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
895 st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
896 &index_params[i], &verification_checkpoint[i], walflush);
897 st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
898 &index_params[i], &verification_checkpoint[i], walflush);
899 memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
900 }
901
902 // load init data
903 start_checkpoint(st[0]);
904 for (i = 0; i < 100; ++i) {
905 load_persons(st[0]);
906 }
907 end_checkpoint(st[0]);
908 verify_db(st[0]);
909
910 for (n = 0; n < n_checkpoints; ++n) {
911
912 // start writer threads
913 for (i = 0; i < n_writers; ++i) {
914 st[i]->verify_set = false;
915 start_checkpoint(st[i]);
916 thread_create(&tid_wr[i], writer_thread, (void*)st[i]);
917 }
918
919 // start scanner threads
920 for (i = 0; i < n_scanners; ++i) {
921 scan_kv[i] = scan(st2[i], NULL);
922 thread_create(&tid_sc[i], scan_thread, (void*)scan_kv[i]);
923 }
924
925 // join scan threads
926 for (i = 0; i < n_scanners; ++i) {
927 thread_join(tid_sc[i], &thread_ret_sc[i]);
928 fdb_kvs_close(scan_kv[i]);
929 }
930
931 // join writer threads
932 for (i = 0; i < n_writers; ++i) {
933 thread_join(tid_wr[i], &thread_ret_wr[i]);
934 end_checkpoint(st[i]);
935 verify_db(st[i]);
936 update_index(st[i], true);
937 }
938
939 }
940
941 for (i = 0; i < n_writers; ++i) {
942 // teardown
943 e2e_fdb_close(st2[i]);
944 e2e_fdb_close(st[i]);
945 }
946 fdb_shutdown();
947
948 memleak_end();
949 }
950
951
e2e_index_basic_test()952 void e2e_index_basic_test() {
953
954 TEST_INIT();
955 memleak_start();
956
957 randomize();
958 // configure
959 fdb_config fconfig = fdb_get_default_config();
960 fconfig.wal_threshold = 1024;
961 fconfig.flags = FDB_OPEN_FLAG_CREATE;
962 fconfig.durability_opt = FDB_DRB_ASYNC;
963 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
964
965 // test
966 e2e_kvs_index_pattern(1, fconfig, true, false); // normal commit
967 e2e_kvs_index_pattern(1, fconfig, true, true); // wal commit
968
969 memleak_end();
970 TEST_RESULT("TEST: e2e index basic test");
971 }
972
e2e_index_walflush_test_no_deletes_auto_compact()973 void e2e_index_walflush_test_no_deletes_auto_compact() {
974
975 TEST_INIT();
976 memleak_start();
977
978 randomize();
979 // configure
980 fdb_config fconfig = fdb_get_default_config();
981 fconfig.wal_threshold = 1024;
982 fconfig.flags = FDB_OPEN_FLAG_CREATE;
983 fconfig.durability_opt = FDB_DRB_ASYNC;
984 fconfig.compaction_mode=FDB_COMPACTION_AUTO;
985 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
986
987 // test
988 e2e_kvs_index_pattern(10, fconfig, false, true);
989
990 memleak_end();
991 TEST_RESULT("TEST: e2e index walflush test no deletes auto compact");
992 }
993
e2e_index_walflush_autocompact_test()994 void e2e_index_walflush_autocompact_test() {
995
996 TEST_INIT();
997 memleak_start();
998
999 randomize();
1000 // opts
1001 fdb_config fconfig = fdb_get_default_config();
1002 fconfig.wal_threshold = 1024;
1003 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1004 fconfig.durability_opt = FDB_DRB_ASYNC;
1005 fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1006 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1007
1008 // test
1009 e2e_kvs_index_pattern(2, fconfig, true, true);
1010
1011 memleak_end();
1012 TEST_RESULT("TEST: e2e index walflush autocompact test");
1013
1014 }
1015
e2e_index_normal_commit_autocompact_test()1016 void e2e_index_normal_commit_autocompact_test() {
1017
1018 TEST_INIT();
1019 memleak_start();
1020
1021 randomize();
1022 // opts
1023 fdb_config fconfig = fdb_get_default_config();
1024 fconfig.wal_threshold = 1024;
1025 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1026 fconfig.durability_opt = FDB_DRB_NONE;
1027 fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1028 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1029
1030 // test
1031 e2e_kvs_index_pattern(2, fconfig, true, false);
1032
1033 memleak_end();
1034 TEST_RESULT("TEST: e2e index normal commit autocompact test");
1035 }
1036
e2e_async_manual_compact_test()1037 void e2e_async_manual_compact_test() {
1038 TEST_INIT();
1039 memleak_start();
1040
1041 randomize();
1042 // opts
1043 fdb_config fconfig = fdb_get_default_config();
1044 fconfig.wal_threshold = 1024;
1045 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1046 fconfig.durability_opt = FDB_DRB_ASYNC;
1047
1048 // test
1049 e2e_async_compact_pattern(10, fconfig, false, true);
1050 memleak_end();
1051 TEST_RESULT("TEST: e2e async manual compact test");
1052 }
1053
1054
e2e_concurrent_scan_test()1055 void e2e_concurrent_scan_test() {
1056 TEST_INIT();
1057 memleak_start();
1058
1059 randomize();
1060 fdb_config fconfig = fdb_get_default_config();
1061 fconfig.wal_threshold = 1024;
1062 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1063 fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1064 fconfig.durability_opt = FDB_DRB_ASYNC;
1065 fconfig.purging_interval = 30; // retain deleted docs for iteration
1066 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1067
1068 // test
1069 e2e_concurrent_scan_pattern(3, 5, 5, fconfig, true);
1070 // normal_commit
1071 e2e_concurrent_scan_pattern(3, 5, 5, fconfig, false);
1072
1073 memleak_end();
1074 TEST_RESULT("TEST: e2e concurrent scan");
1075 }
1076
e2e_robust_test()1077 void e2e_robust_test() {
1078 TEST_INIT();
1079
1080 randomize();
1081 fdb_config fconfig = fdb_get_default_config();
1082 fconfig.wal_threshold = 1024;
1083 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1084 fconfig.compaction_mode=FDB_COMPACTION_AUTO;
1085 fconfig.durability_opt = FDB_DRB_ASYNC;
1086 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1087 // to allow iterators to validate docs across async compaction
1088 // specify purging_interval so deleted docs are not dropped by
1089 // compactor immediately..
1090 fconfig.purging_interval = 80;
1091
1092 // test
1093 e2e_robust_pattern(fconfig);
1094
1095 TEST_RESULT("TEST: e2e robust test");
1096 }
1097
e2e_scan_compact_upto_test()1098 void e2e_scan_compact_upto_test() {
1099 TEST_INIT();
1100 memleak_start();
1101
1102 randomize();
1103 int n, i;
1104 void *thread_ret;
1105 bool walflush = true;
1106 int n_checkpoints = LOAD_FACTOR;
1107
1108 fdb_config fconfig = fdb_get_default_config();
1109 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1110 fconfig.compaction_threshold = 0;
1111 fconfig.durability_opt = FDB_DRB_ASYNC;
1112 fconfig.purging_interval = 30; // retain deleted docs for iteration
1113
1114 storage_t *st;
1115 checkpoint_t verification_checkpoint;
1116 idx_prams_t index_params;
1117 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1118 fdb_kvs_info info;
1119 fdb_kvs_handle *snap_db;
1120 fdb_status status;
1121 thread_t tid;
1122
1123 // init
1124 rm_storage_fs();
1125 gen_index_params(&index_params);
1126 memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1127
1128 // setup
1129 st = init_storage(&fconfig, &fconfig, &kvs_config,
1130 &index_params, &verification_checkpoint, walflush);
1131
1132 // create compaction thread
1133 thread_create(&tid, compact_upto_thread, (void*)&fconfig);
1134
1135 // test
1136 for (n = 0; n < n_checkpoints; ++n) {
1137 load_persons(st);
1138 for (i = 0; i < 100; ++i) {
1139 load_persons(st);
1140 e2e_fdb_commit(st->main, walflush);
1141 status = fdb_get_kvs_info(st->all_docs, &info);
1142 TEST_CHK(status == FDB_RESULT_SUCCESS);
1143 fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1144 TEST_CHK(status == FDB_RESULT_SUCCESS);
1145 update_index(st, true);
1146 verify_db(st);
1147 fdb_kvs_close(snap_db);
1148 }
1149 }
1150
1151 thread_join(tid, &thread_ret);
1152
1153 // teardown
1154 e2e_fdb_shutdown(st);
1155 memleak_end();
1156 TEST_RESULT("TEST: e2e concurrent compact upto");
1157 }
1158
kv_thread(void *args)1159 void *kv_thread(void *args) {
1160 // This thread stores and indexes person docs
1161
1162 TEST_INIT();
1163 uint64_t i;
1164 uint64_t num_markers;
1165 fdb_snapshot_info_t *markers;
1166 fdb_status status;
1167
1168 storage_t *st = (storage_t *)args;
1169 status = fdb_get_all_snap_markers(st->main, &markers,
1170 &num_markers);
1171 TEST_CHK(status == FDB_RESULT_SUCCESS);
1172
1173 for (i = 0; i < num_markers; i += 10) {
1174 load_persons(st);
1175 }
1176 fdb_free_snap_markers(markers, num_markers);
1177 return NULL;
1178 }
1179
1180 // This function identifies the latest superblock
1181 // add adds garbage to it.
corrupt_latest_superblock(const char* filename)1182 void corrupt_latest_superblock(const char* filename) {
1183 /*
1184 * Note that each block is 4096 bytes.
1185 * - There are 4 superblocks which constitute the
1186 * first 4 blocks of the file.
1187 * - The 8 bytes following the first 8 bytes of a
1188 * superblock contains the block revision num.
1189 */
1190 struct filemgr_ops *ops = get_filemgr_ops();
1191 int64_t offset = 8;
1192 int latest_sb = 0;
1193 int fd = ops->open(filename, O_RDWR, 0644);
1194 uint64_t buf, highest_rev = 0;
1195 for (int i = 0; i < 4; ++i) { // num of superblocks: 4
1196 if (ops->pread(fd, &buf, sizeof(uint64_t), offset) == sizeof(uint64_t)) {
1197 buf = _endian_decode(buf);
1198 assert(buf != highest_rev);
1199 if (buf > highest_rev) {
1200 highest_rev = buf;
1201 latest_sb = i;
1202 }
1203 offset += 4096;
1204 } else {
1205 fprintf(stderr, "Warning: Could not find the latest superblock!\n");
1206 ops->close(fd);
1207 return;
1208 }
1209 }
1210 // Write garbage at a random offset that would fall within
1211 // the latest super block
1212 uint64_t garbage = rand();
1213 offset = latest_sb * 4096 + (rand() % (4095 - sizeof("garbage")));
1214 if (ops->pwrite(fd, &garbage, sizeof(garbage), offset) != sizeof(garbage)) {
1215 fprintf(stderr,
1216 "\nWarning: Could not write garbage into the superblock!");
1217 }
1218 ops->close(fd);
1219 }
1220
e2e_crash_recover_test(bool do_rollback)1221 void e2e_crash_recover_test(bool do_rollback) {
1222
1223 TEST_INIT();
1224 memleak_start();
1225
1226 randomize();
1227
1228 bool walflush = true;
1229 int i, n;
1230 int n_checkpoints = 2;
1231
1232 rm_storage_fs();
1233 storage_t *st;
1234 checkpoint_t verification_checkpoint;
1235 idx_prams_t index_params;
1236 fdb_kvs_handle *snap_db;
1237 fdb_kvs_info info;
1238 fdb_status status;
1239
1240 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1241 fdb_config fconfig = fdb_get_default_config();
1242 fconfig.flags = FDB_OPEN_FLAG_CREATE;
1243 fconfig.compaction_threshold = 0;
1244 fconfig.durability_opt = FDB_DRB_ASYNC;
1245 fconfig.purging_interval = 30; // retain deleted docs for iteration
1246
1247 gen_index_params(&index_params);
1248 memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1249 st = init_storage(&fconfig, &fconfig, &kvs_config,
1250 &index_params, &verification_checkpoint, walflush);
1251
1252 for (i = 0; i < 100; ++i) {
1253 load_persons(st);
1254 e2e_fdb_commit(st->main, true);
1255 }
1256
1257 fdb_seqnum_t seqno;
1258 uint64_t k, num_markers;
1259 fdb_snapshot_info_t *markers;
1260
1261 if (do_rollback) {
1262 status = fdb_get_all_snap_markers(st->main, &markers,
1263 &num_markers);
1264 TEST_CHK(status == FDB_RESULT_SUCCESS);
1265
1266 for (k = 0; k < num_markers; k += 10) {
1267 // rollback to each marker
1268 seqno = markers[k].kvs_markers[0].seqnum;
1269 status = fdb_rollback(&st->all_docs, seqno);
1270 TEST_CHK(status == FDB_RESULT_SUCCESS);
1271 }
1272 fdb_free_snap_markers(markers, num_markers);
1273 } else {
1274 status = fdb_get_all_snap_markers(st->main, &markers,
1275 &num_markers);
1276 TEST_CHK(status == FDB_RESULT_SUCCESS);
1277
1278 for (k = 0; k < num_markers; k += 10) {
1279 load_persons(st);
1280 }
1281 fdb_free_snap_markers(markers, num_markers);
1282 }
1283
1284 // close storage
1285 e2e_fdb_close(st);
1286
1287 corrupt_latest_superblock(E2EDB_MAIN);
1288
1289 // reopen storage
1290 gen_index_params(&index_params);
1291 memset(&verification_checkpoint, 0, sizeof(checkpoint_t));
1292 st = init_storage(&fconfig, &fconfig, &kvs_config,
1293 &index_params, &verification_checkpoint, walflush);
1294
1295 // run verifiable workload
1296 for (n = 0; n < n_checkpoints; ++n) {
1297 load_persons(st);
1298 for (i = 0; i < 100; ++i) {
1299 load_persons(st);
1300 e2e_fdb_commit(st->main, walflush);
1301 status = fdb_get_kvs_info(st->all_docs, &info);
1302 TEST_CHK(status == FDB_RESULT_SUCCESS);
1303 fdb_snapshot_open(st->all_docs, &snap_db, info.last_seqnum);
1304 TEST_CHK(status == FDB_RESULT_SUCCESS);
1305 update_index(st, true);
1306 verify_db(st);
1307 fdb_kvs_close(snap_db);
1308 }
1309 }
1310
1311 // teardown
1312 e2e_fdb_shutdown(st);
1313 memleak_end();
1314 if (do_rollback) {
1315 TEST_RESULT("TEST: e2e crash recover test with rollback");
1316 } else {
1317 TEST_RESULT("TEST: e2e crash recover test");
1318 }
1319 }
1320
e2e_concurrent_reader_writer(bool do_compaction)1321 void e2e_concurrent_reader_writer(bool do_compaction) {
1322 TEST_INIT();
1323 memleak_start();
1324
1325 randomize();
1326 fdb_config fconfig = fdb_get_default_config();
1327 // to allow iterators to validate docs across async compaction
1328 // specify purging_interval so deleted docs are not dropped by
1329 // compactor immediately..
1330 fconfig.purging_interval = 80;
1331 fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1332
1333 // init
1334 rm_storage_fs();
1335
1336 // test
1337 int i;
1338 int n_writers = 2;
1339 int n_scanners = 2;
1340 storage_t **st = alca(storage_t *, n_writers);
1341 storage_t **st2 = alca(storage_t *, n_writers);
1342 checkpoint_t *verification_checkpoint = alca(checkpoint_t, n_writers);
1343 idx_prams_t *index_params = alca(idx_prams_t, n_writers);
1344 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1345 thread_t *tid_sc = alca(thread_t, n_scanners);
1346 void **thread_ret_sc = alca(void *, n_scanners);
1347 thread_t *tid_wr = alca(thread_t, n_writers);
1348 void **thread_ret_wr = alca(void *, n_writers);
1349 fdb_kvs_handle **scan_kv = alca(fdb_kvs_handle *, n_scanners);
1350 thread_t c_tid;
1351 void *c_ret;
1352
1353 // init storage handles
1354 // the nth writer is commit handler
1355 for (i = 0; i < n_writers; ++i) {
1356 gen_index_params(&index_params[i]);
1357 memset(&verification_checkpoint[i], 0, sizeof(checkpoint_t));
1358 st[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1359 &index_params[i], &verification_checkpoint[i],
1360 true);
1361 st2[i] = init_storage(&fconfig, &fconfig, &kvs_config,
1362 &index_params[i], &verification_checkpoint[i],
1363 true);
1364 memcpy(st2[i]->keyspace, st[i]->keyspace, KEYSPACE_LEN);
1365 }
1366
1367 if (do_compaction) {
1368 thread_create(&c_tid, compact_thread, (void*)&fconfig);
1369 }
1370
1371 // start new item thread
1372 thread_create(&tid_wr[0], seq_writer_thread, (void*)st[0]);
1373 // start update thread
1374 thread_create(&tid_wr[1], update_thread, (void*)st[1]->index1);
1375
1376 // start disk-snapshot thread
1377 scan_kv[0] = scan(st2[0], NULL);
1378 thread_create(&tid_sc[0], disk_read_thread, (void*)st[1]);
1379 // start mem-snapshot thread
1380 scan_kv[1] = scan(st2[1], NULL);
1381 thread_create(&tid_sc[1], scan_thread, (void*)scan_kv[1]);
1382
1383 // join threads
1384 thread_join(tid_wr[0], &thread_ret_wr[0]);
1385 thread_join(tid_wr[1], &thread_ret_wr[1]);
1386 thread_join(tid_sc[1], &thread_ret_sc[1]);
1387 thread_join(tid_sc[0], &thread_ret_sc[0]);
1388 if (do_compaction) {
1389 thread_join(c_tid, &c_ret);
1390 }
1391
1392 // commit
1393 e2e_fdb_commit(st[0]->main, false);
1394
1395 for (i = 0; i < n_writers; ++i) {
1396 // teardown
1397 e2e_fdb_close(st2[i]);
1398 e2e_fdb_close(st[i]);
1399 }
1400 fdb_shutdown();
1401
1402 memleak_end();
1403 if (do_compaction) {
1404 TEST_RESULT("TEST: e2e concurrent reader writer test with compaction");
1405 } else {
1406 TEST_RESULT("TEST: e2e concurrent reader writer test");
1407 }
1408 }
1409
e2e_multi_dbfile_concurrent_wr()1410 void e2e_multi_dbfile_concurrent_wr() {
1411 TEST_INIT();
1412 memleak_start();
1413 int i, r;
1414 int nf = 16;
1415 char buf[64];
1416 thread_t *tid_wr = alca(thread_t, nf);
1417 void **thread_ret_wr = alca(void *, nf);
1418 fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1419 fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1420 fdb_status status;
1421 randomize();
1422 rm_storage_fs();
1423 r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1424 (void)r;
1425
1426 // create dbfiles
1427 fdb_config fconfig = fdb_get_default_config();
1428 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1429 fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1430
1431 for (i = 0; i < nf; i++) {
1432 sprintf(buf, "e2edb_ex%d", i);
1433 status = fdb_open(&dbfiles[i], buf, &fconfig);
1434 TEST_CHK(status == FDB_RESULT_SUCCESS);
1435 status = fdb_kvs_open_default(dbfiles[i], &kvs[i], &kvs_config);
1436 TEST_CHK(status == FDB_RESULT_SUCCESS);
1437
1438 // run update thread on each file to enter reuse
1439 thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1440 }
1441
1442 // join and commit
1443 for (i = 0; i < nf; i++) {
1444 thread_join(tid_wr[i], &thread_ret_wr[i]);
1445 fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1446 fdb_close(dbfiles[i]);
1447 }
1448
1449 fdb_shutdown();
1450 memleak_end();
1451 TEST_RESULT("TEST: e2e multi dbfile concurrent wr");
1452 }
1453
e2e_multi_kvs_concurrent_wr()1454 void e2e_multi_kvs_concurrent_wr() {
1455 TEST_INIT();
1456 memleak_start();
1457 int i, j, r;
1458 int nf = 8;
1459 char buf[64];
1460 char body[64];
1461
1462 thread_t *tid_wr = alca(thread_t, nf);
1463 void **thread_ret_wr = alca(void *, nf);
1464 fdb_file_handle *dbfile;
1465 fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf);
1466 fdb_doc *rdoc = NULL;
1467 fdb_status status;
1468 randomize();
1469 rm_storage_fs();
1470 r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1471 fdb_iterator *it;
1472 (void)r;
1473
1474 // create dbfile
1475 fdb_config fconfig = fdb_get_default_config();
1476 fconfig.num_keeping_headers = 10;
1477 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1478 fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1479 fdb_open(&dbfile, "e2edb_main", &fconfig);
1480
1481 // open dbfiles
1482 for (i = 0; i < nf; i++) {
1483 sprintf(buf, "e2edb_kv%d", i);
1484 status = fdb_kvs_open(dbfile, &kvs[i], buf, &kvs_config);
1485 TEST_CHK(status == FDB_RESULT_SUCCESS);
1486 }
1487
1488 // initial commits
1489 for (j = 0; j < 10; j++) {
1490 for (i = 0; i < nf; i++) {
1491 sprintf(buf, "%dkey%d", j, i);
1492 sprintf(body, "commit%d", j);
1493 status = fdb_set_kv(kvs[i], buf, strlen(buf), body, strlen(body));
1494 TEST_CHK(status == FDB_RESULT_SUCCESS);
1495 }
1496 status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1497 TEST_CHK(status == FDB_RESULT_SUCCESS);
1498 }
1499
1500 // run update thread on each file to enter reuse
1501 for (i = 0; i < nf; i++) {
1502 thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1503 }
1504
1505 // join and rollback
1506 for (i = 0; i < nf; i++) {
1507 thread_join(tid_wr[i], &thread_ret_wr[i]);
1508 status = fdb_rollback(&kvs[i], 7);
1509 TEST_CHK(status == FDB_RESULT_SUCCESS);
1510 }
1511
1512 status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1513 TEST_CHK(status == FDB_RESULT_SUCCESS);
1514 for (i = 0; i < nf; i++) {
1515 // verify rollback kvs
1516 status = fdb_iterator_init(kvs[i], &it, NULL, 0,
1517 NULL, 0, FDB_ITR_NONE);
1518 TEST_CHK(status == FDB_RESULT_SUCCESS);
1519 j = 0;
1520 do {
1521 status = fdb_iterator_get(it, &rdoc);
1522 TEST_CHK (status == FDB_RESULT_SUCCESS);
1523 fdb_doc_free(rdoc);
1524 rdoc=NULL;
1525 j++;
1526 } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
1527 TEST_CHK(j==7);
1528 fdb_iterator_close(it);
1529 }
1530
1531 fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1532 fdb_close(dbfile);
1533 fdb_shutdown();
1534
1535 memleak_end();
1536
1537 r = system(SHELL_DEL" e2edb_main > errorlog.txt");
1538 (void)r;
1539
1540 TEST_RESULT("TEST: e2e multi kvs concurrent wr test");
1541 }
1542
e2e_multi_kvs_concurrent_wr_compact()1543 void e2e_multi_kvs_concurrent_wr_compact() {
1544 TEST_INIT();
1545 memleak_start();
1546 int i, j, r;
1547 int nf = 4;
1548 char buf[64];
1549
1550 thread_t *tid_wr = alca(thread_t, nf*nf);
1551 void **thread_ret_wr = alca(void *, nf*nf);
1552 fdb_file_handle **dbfiles = alca(fdb_file_handle*, nf);
1553 fdb_kvs_handle **kvs = alca(fdb_kvs_handle*, nf*nf);
1554 fdb_status status;
1555 randomize();
1556 rm_storage_fs();
1557 r = system(SHELL_DEL" e2edb_ex* > errorlog.txt");
1558 (void)r;
1559
1560 // create dbfile
1561 fdb_config fconfig = fdb_get_default_config();
1562 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1563 fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1564
1565 // open dbfiles
1566 for (i = 0; i < nf; ++i) {
1567 sprintf(buf, "e2edb_ex%d", i);
1568 status = fdb_open(&dbfiles[i], buf, &fconfig);
1569 TEST_CHK(status == FDB_RESULT_SUCCESS);
1570
1571 for (j = i*nf; j < (i * nf + nf); ++j) {
1572 sprintf(buf, "e2edb_kv%d", j);
1573 status = fdb_kvs_open(dbfiles[i], &kvs[j], buf, &kvs_config);
1574 TEST_CHK(status == FDB_RESULT_SUCCESS);
1575 }
1576 }
1577
1578 // initial commits
1579 for (i = 0; i < nf; i++) {
1580 status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1581 TEST_CHK(status == FDB_RESULT_SUCCESS);
1582 }
1583
1584 // run update thread on each file to enter reuse
1585 for (i = 0; i < (nf * nf); i++) {
1586 thread_create(&tid_wr[i], update_thread, (void*)kvs[i]);
1587 }
1588
1589 // join update threads
1590 for (i = 0; i < (nf * nf); i++) {
1591 thread_join(tid_wr[i], &thread_ret_wr[i]);
1592 TEST_CHK(status == FDB_RESULT_SUCCESS);
1593 }
1594
1595 // more commits
1596 for (i = 0; i < nf; i++) {
1597 status = fdb_commit(dbfiles[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
1598 TEST_CHK(status == FDB_RESULT_SUCCESS);
1599 }
1600
1601 for (i = 0; i < nf; i++) {
1602 fdb_close(dbfiles[i]);
1603 }
1604 fdb_shutdown();
1605 memleak_end();
1606
1607 TEST_RESULT("TEST: e2e multi kvs concurrent wr test with compaction");
1608 }
1609
/*
 * Test driver: runs the enabled e2e tests one after another in a fixed
 * order and returns 0.
 */
int main() {

    // Note: the following test is temporarily disabled due to
    // the keeping header violation issue by rollback/snapshot API:
    //
    //  - e2e_multi_kvs_concurrent_wr()
    //    (multiple kvstores under reuse with rollback)

    /* Multiple kvstores under reuse with rollback and compaction */
    e2e_multi_kvs_concurrent_wr_compact();

    /* Multiple dbfiles after stale block reuse */
    e2e_multi_dbfile_concurrent_wr();

    /* Concurrent readers and writers after stale block reuse, without
       and with compaction */
    e2e_concurrent_reader_writer(false);
    e2e_concurrent_reader_writer(true); // w.compaction

    /* Robustness, scan, compaction and index tests */
    e2e_robust_test();
    e2e_concurrent_scan_test();
    e2e_async_manual_compact_test();
    e2e_index_basic_test();
    e2e_index_walflush_test_no_deletes_auto_compact();
    e2e_index_walflush_autocompact_test();
    e2e_index_normal_commit_autocompact_test();

    /* Data loading with concurrent compaction */
    e2e_scan_compact_upto_test();

    /* Crash recovery with e2e workload, without and with a rollback */
    e2e_crash_recover_test(false);
    e2e_crash_recover_test(true); // w.rollback

    return 0;
}
1648