1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "test.h"
29
30#include "internal_types.h"
31#include "functional_util.h"
32
33void rollback_secondary_kvs()
34{
35    TEST_INIT();
36    memleak_start();
37
38    int r;
39    void *value_out;
40    size_t valuelen_out;
41
42    fdb_status status;
43    fdb_config fconfig = fdb_get_default_config();
44    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
45    fdb_file_handle *dbfile;
46    fdb_kvs_handle *kv1, *kv2;
47
48    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
49    (void)r;
50
51    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
52    fdb_kvs_open_default(dbfile, &kv1, &kvs_config);
53    fdb_kvs_open_default(dbfile, &kv2, &kvs_config);
54
55    // seq:2
56    status = fdb_set_kv(kv1, (void *) "a", 1, (void *)"val-a", 5);
57    TEST_CHK(status == FDB_RESULT_SUCCESS);
58    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-b", 5);
59    TEST_CHK(status == FDB_RESULT_SUCCESS);
60    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
61
62    // seq:3
63    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-v", 5);
64    TEST_CHK(status == FDB_RESULT_SUCCESS);
65    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
66
67    // seq:4
68    status = fdb_del_kv(kv1, (void *)"a", 1);
69    TEST_CHK(status == FDB_RESULT_SUCCESS);
70    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
71
72    // get 'a' via kv2
73    status = fdb_get_kv(kv2, (void *)"b", 1, &value_out, &valuelen_out);
74    free(value_out);
75    TEST_CHK(status == FDB_RESULT_SUCCESS);
76
77    // rollback seq:3 via kv2
78    status = fdb_rollback(&kv2, 3);
79    TEST_CHK(status == FDB_RESULT_SUCCESS);
80
81    fdb_close(dbfile);
82    fdb_shutdown();
83
84    memleak_end();
85    TEST_RESULT("rollback secondary kv");
86}
87
88
89void multi_version_test()
90{
91    TEST_INIT();
92
93    memleak_start();
94
95    int i, r;
96    int n = 2;
97    fdb_file_handle *dbfile, *dbfile_new;
98    fdb_kvs_handle *db;
99    fdb_kvs_handle *db_new;
100    fdb_doc **doc = alca(fdb_doc*, n);
101    fdb_doc *rdoc;
102    fdb_status status;
103
104    char keybuf[256], metabuf[256], bodybuf[256];
105
106    // remove all previous mvcc_test files
107    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
108    (void)r;
109
110    fdb_config fconfig = fdb_get_default_config();
111    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
112    fconfig.buffercache_size = 1048576;
113    fconfig.wal_threshold = 1024;
114    fconfig.flags = FDB_OPEN_FLAG_CREATE;
115    fconfig.compaction_threshold = 0;
116
117    // open db
118    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
119    fdb_kvs_open_default(dbfile, &db, &kvs_config);
120    status = fdb_set_log_callback(db, logCallbackFunc, (void *) "multi_version_test");
121    TEST_CHK(status == FDB_RESULT_SUCCESS);
122
123    // insert documents
124    for (i=0;i<n;++i){
125        sprintf(keybuf, "key%d", i);
126        sprintf(metabuf, "meta%d", i);
127        sprintf(bodybuf, "body%d", i);
128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
130        fdb_set(db, doc[i]);
131    }
132
133    // commit
134    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
135
136    // open same db file using a new handle
137    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
138    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
139    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
140    TEST_CHK(status == FDB_RESULT_SUCCESS);
141
142    // update documents using the old handle
143    for (i=0;i<n;++i){
144        sprintf(metabuf, "meta2%d", i);
145        sprintf(bodybuf, "body2%d", i);
146        fdb_doc_update(&doc[i], (void*)metabuf, strlen(metabuf),
147            (void*)bodybuf, strlen(bodybuf));
148        fdb_set(db, doc[i]);
149    }
150
151    // manually flush WAL and commit using the old handle
152    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
153
154    // retrieve documents using the old handle
155    for (i=0;i<n;++i){
156        // search by key
157        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
158        status = fdb_get(db, rdoc);
159
160        TEST_CHK(status == FDB_RESULT_SUCCESS);
161        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
162        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
163
164        // free result document
165        fdb_doc_free(rdoc);
166    }
167
168    // retrieve documents using the new handle
169    for (i=0;i<n;++i){
170        // search by key
171        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
172        status = fdb_get(db_new, rdoc);
173
174        TEST_CHK(status == FDB_RESULT_SUCCESS);
175        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
176        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
177
178        // free result document
179        fdb_doc_free(rdoc);
180    }
181
182    // close and re-open the new handle
183    fdb_kvs_close(db_new);
184    fdb_close(dbfile_new);
185    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
186    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
187    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
188    TEST_CHK(status == FDB_RESULT_SUCCESS);
189
190    // retrieve documents using the new handle
191    for (i=0;i<n;++i){
192        // search by key
193        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
194        status = fdb_get(db_new, rdoc);
195
196        TEST_CHK(status == FDB_RESULT_SUCCESS);
197        // the new version of data should be read
198        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
199        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
200
201        // free result document
202        fdb_doc_free(rdoc);
203    }
204
205
206    // free all documents
207    for (i=0;i<n;++i){
208        fdb_doc_free(doc[i]);
209    }
210
211    // close db file
212    fdb_kvs_close(db);
213    fdb_kvs_close(db_new);
214    fdb_close(dbfile);
215    fdb_close(dbfile_new);
216
217    // free all resources
218    fdb_shutdown();
219
220    memleak_end();
221
222    TEST_RESULT("multi version test");
223}
224
225void crash_recovery_test(bool walflush)
226{
227    TEST_INIT();
228
229    memleak_start();
230
231    int i, r;
232    int n = 10;
233    fdb_file_handle *dbfile;
234    fdb_kvs_handle *db;
235    fdb_doc **doc = alca(fdb_doc*, n);
236    fdb_doc *rdoc;
237    fdb_file_info file_info;
238    uint64_t bid;
239    const char *test_file = "./mvcc_test2";
240    fdb_status status;
241
242    char keybuf[256], metabuf[256], bodybuf[256];
243
244    // remove previous mvcc_test files
245    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
246    (void)r;
247
248    fdb_config fconfig = fdb_get_default_config();
249    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
250    fconfig.buffercache_size = 0;
251    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
252    fconfig.wal_threshold = 1024;
253    fconfig.flags = FDB_OPEN_FLAG_CREATE;
254    fconfig.compaction_threshold = 0;
255
256    // reopen db
257    fdb_open(&dbfile, test_file, &fconfig);
258    fdb_kvs_open_default(dbfile, &db, &kvs_config);
259    status = fdb_set_log_callback(db, logCallbackFunc,
260                                  (void *) "crash_recovery_test");
261    TEST_CHK(status == FDB_RESULT_SUCCESS);
262
263    // insert documents
264    for (i=0;i<n;++i){
265        sprintf(keybuf, "key%d", i);
266        sprintf(metabuf, "meta%d", i);
267        sprintf(bodybuf, "body%d", i);
268        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
269            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
270        fdb_set(db, doc[i]);
271    }
272
273    // commit
274    if(walflush){
275        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
276        TEST_CHK(status == FDB_RESULT_SUCCESS);
277    } else {
278        status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
279        TEST_CHK(status == FDB_RESULT_SUCCESS);
280    }
281
282    status = fdb_get_file_info(dbfile, &file_info);
283    TEST_CHK(status == FDB_RESULT_SUCCESS);
284    bid = file_info.file_size / fconfig.blocksize;
285
286    // close the db
287    fdb_kvs_close(db);
288    fdb_close(dbfile);
289
290    // Shutdown forest db in the middle of the test to simulate crash
291    fdb_shutdown();
292
293    // append 9K of non-block aligned garbage at end of file
294    r = _disk_dump(test_file, bid * fconfig.blocksize,
295                   (2 * fconfig.blocksize) + (fconfig.blocksize / 4));
296    TEST_CHK(r >= 0);
297
298    // reopen the same file
299    status = fdb_open(&dbfile, test_file, &fconfig);
300    TEST_CHK(status == FDB_RESULT_SUCCESS);
301    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
302    TEST_CHK(status == FDB_RESULT_SUCCESS);
303    status = fdb_set_log_callback(db, logCallbackFunc,
304                                  (void *) "crash_recovery_test");
305    TEST_CHK(status == FDB_RESULT_SUCCESS);
306
307    // retrieve documents
308    for (i=0;i<n;++i){
309        // search by key
310        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
311        status = fdb_get(db, rdoc);
312
313        TEST_CHK(status == FDB_RESULT_SUCCESS);
314        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
315        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
316
317        // free result document
318        fdb_doc_free(rdoc);
319    }
320
321    // retrieve documents by sequence number
322    for (i=0;i<n;++i){
323        // search by seq
324        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
325        rdoc->seqnum = i + 1;
326        status = fdb_get_byseq(db, rdoc);
327
328        TEST_CHK(status == FDB_RESULT_SUCCESS);
329
330        // free result document
331        fdb_doc_free(rdoc);
332    }
333
334    // free all documents
335    for (i=0;i<n;++i){
336        fdb_doc_free(doc[i]);
337    }
338
339    // close db file
340    fdb_kvs_close(db);
341    fdb_close(dbfile);
342
343    // free all resources
344    fdb_shutdown();
345
346    memleak_end();
347
348    if(walflush){
349        TEST_RESULT("crash recovery test - walflush");
350    } else {
351        TEST_RESULT("crash recovery test - normal flush");
352    }
353}
354
355void snapshot_test()
356{
357    TEST_INIT();
358
359    memleak_start();
360
361    int i, r;
362    int n = 20;
363    int count;
364    fdb_file_handle *dbfile;
365    fdb_kvs_handle *db;
366    fdb_kvs_handle *snap_db;
367    fdb_seqnum_t snap_seq;
368    fdb_doc **doc = alca(fdb_doc*, n);
369    fdb_doc *rdoc = NULL;
370    fdb_kvs_info kvs_info;
371    fdb_status status;
372    fdb_iterator *iterator;
373
374    char keybuf[256], metabuf[256], bodybuf[256];
375
376    // remove previous mvcc_test files
377    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
378    (void)r;
379
380    fdb_config fconfig = fdb_get_default_config();
381    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
382    fconfig.buffercache_size = 0;
383    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
384    fconfig.wal_threshold = 1024;
385    fconfig.flags = FDB_OPEN_FLAG_CREATE;
386    fconfig.compaction_threshold = 0;
387
388    // remove previous mvcc_test files
389    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
390    (void)r;
391
392    // open db
393    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
394    fdb_kvs_open_default(dbfile, &db, &kvs_config);
395    TEST_CHK(status == FDB_RESULT_SUCCESS);
396
397    // Create a snapshot from an empty database file
398    status = fdb_snapshot_open(db, &snap_db, 0);
399    TEST_CHK(status == FDB_RESULT_SUCCESS);
400    // check if snapshot's sequence number is zero.
401    fdb_get_kvs_info(snap_db, &kvs_info);
402    TEST_CHK(kvs_info.last_seqnum == 0);
403    // create an iterator on the snapshot for full range
404    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
405    // Iterator should not return any items.
406    status = fdb_iterator_get(iterator, &rdoc);
407    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
408    fdb_iterator_close(iterator);
409    fdb_kvs_close(snap_db);
410
411    // ------- Setup test ----------------------------------
412    // insert documents of 0-4
413    for (i=0; i<n/4; i++){
414        sprintf(keybuf, "key%d", i);
415        sprintf(metabuf, "meta%d", i);
416        sprintf(bodybuf, "body%d", i);
417        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
418            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
419        fdb_set(db, doc[i]);
420    }
421
422    // commit with a manual WAL flush (these docs go into HB-trie)
423    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
424
425    // insert documents from 4 - 8
426    for (; i < n/2 - 1; i++){
427        sprintf(keybuf, "key%d", i);
428        sprintf(metabuf, "meta%d", i);
429        sprintf(bodybuf, "body%d", i);
430        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
431            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
432        fdb_set(db, doc[i]);
433    }
434
435    // We want to create:
436    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
437    // Insert doc 9 with a different value to test duplicate elimination..
438    sprintf(keybuf, "key%d", i);
439    sprintf(metabuf, "meta%d", i);
440    sprintf(bodybuf, "Body%d", i);
441    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
442            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
443    fdb_set(db, doc[i]);
444
445    // commit again without a WAL flush (last doc goes into the AVL tree)
446    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
447
448    // Insert doc 9 now again with expected value..
449    *(char *)doc[i]->body = 'b';
450    fdb_set(db, doc[i]);
451    // commit again without a WAL flush (these documents go into the AVL trees)
452    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
453
454    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
455    snap_seq = doc[i]->seqnum;
456
457    // Now re-insert doc 9 as another duplicate (only newer sequence number)
458    fdb_set(db, doc[i]);
459    // commit again without a WAL flush (last doc goes into the AVL tree)
460    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
461
462    // insert documents from 10-14 into HB-trie
463    for (++i; i < (n/2 + n/4); i++){
464        sprintf(keybuf, "key%d", i);
465        sprintf(metabuf, "meta%d", i);
466        sprintf(bodybuf, "body%d", i);
467        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
468            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
469        fdb_set(db, doc[i]);
470    }
471    // manually flush WAL & commit
472    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
473
474    status = fdb_set_log_callback(db, logCallbackFunc,
475                                  (void *) "snapshot_test");
476    TEST_CHK(status == FDB_RESULT_SUCCESS);
477    // ---------- Snapshot tests begin -----------------------
478    // Attempt to take snapshot with out-of-range marker..
479    status = fdb_snapshot_open(db, &snap_db, 999999);
480    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
481
482    // Init Snapshot of open file with saved document seqnum as marker
483    status = fdb_snapshot_open(db, &snap_db, snap_seq);
484    TEST_CHK(status == FDB_RESULT_SUCCESS);
485
486    // check snapshot's sequence number
487    fdb_get_kvs_info(snap_db, &kvs_info);
488    TEST_CHK(kvs_info.last_seqnum == snap_seq);
489
490    // insert documents from 15 - 19 on file into the WAL
491    for (; i < n; i++){
492        sprintf(keybuf, "key%d", i);
493        sprintf(metabuf, "meta%d", i);
494        sprintf(bodybuf, "body%d", i);
495        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
496            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
497        fdb_set(db, doc[i]);
498    }
499    // commit without a WAL flush (This WAL must not affect snapshot)
500    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
501
502    // create an iterator on the snapshot for full range
503    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
504
505    // repeat until fail
506    i=0;
507    count=0;
508    do {
509        status = fdb_iterator_get(iterator, &rdoc);
510        TEST_CHK(status == FDB_RESULT_SUCCESS);
511
512        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
513        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
514        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
515
516        fdb_doc_free(rdoc);
517        rdoc = NULL;
518        i ++;
519        count++;
520    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
521
522    TEST_CHK(count==n/2); // Only unique items from the first half
523
524    fdb_iterator_close(iterator);
525
526    // create a sequence iterator on the snapshot for full range
527    fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
528
529    // repeat until fail
530    i=0;
531    count=0;
532    do {
533        status = fdb_iterator_get(iterator, &rdoc);
534        TEST_CHK(status == FDB_RESULT_SUCCESS);
535
536        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
537        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
538        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
539
540        fdb_doc_free(rdoc);
541        rdoc = NULL;
542        i ++;
543        count++;
544    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
545
546    TEST_CHK(count==n/2); // Only unique items from the first half
547
548    fdb_iterator_close(iterator);
549
550    // close db handle
551    fdb_kvs_close(db);
552    // close snapshot handle
553    fdb_kvs_close(snap_db);
554    // close db file
555    fdb_close(dbfile);
556
557    // free all documents
558    for (i=0;i<n;++i){
559        fdb_doc_free(doc[i]);
560    }
561
562    // 1. Open Database File
563    status = fdb_open(&dbfile, "./mvcc_test2", &fconfig);
564    TEST_CHK(status == FDB_RESULT_SUCCESS);
565
566    // 2. Open KV Store
567    status = fdb_kvs_open(dbfile, &db, "kv2", &kvs_config);
568    TEST_CHK(status == FDB_RESULT_SUCCESS);
569
570    // 2. Create Key 'a' and Commit
571    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
572    TEST_CHK(status == FDB_RESULT_SUCCESS);
573    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
574    TEST_CHK(status == FDB_RESULT_SUCCESS);
575
576    // 3. Create Key 'b' and Commit
577    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
578    TEST_CHK(status == FDB_RESULT_SUCCESS);
579    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
580    TEST_CHK(status == FDB_RESULT_SUCCESS);
581
582    // 4. Create Key 'c' and Commit
583    status = fdb_set_kv(db, (void *)"c", 1, (void *)"val-c", 5);
584    TEST_CHK(status == FDB_RESULT_SUCCESS);
585    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
586    TEST_CHK(status == FDB_RESULT_SUCCESS);
587
588    // 5. Remember seqnum for opening snapshot
589    status = fdb_get_kvs_info(db, &kvs_info);
590    TEST_CHK(status == FDB_RESULT_SUCCESS);
591    snap_seq = kvs_info.last_seqnum;
592
593    // 6.  Create an iterator
594    status = fdb_iterator_init(db, &iterator, (void *)"a", 1,
595                               (void *)"c", 1, FDB_ITR_NONE);
596    TEST_CHK(status == FDB_RESULT_SUCCESS);
597
598    // 7. Do a seek
599    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
600    TEST_CHK(status == FDB_RESULT_SUCCESS);
601
602    // 8. Close the iterator
603    status = fdb_iterator_close(iterator);
604    TEST_CHK(status == FDB_RESULT_SUCCESS);
605
606    // 9. Open a snapshot at the same point
607    status = fdb_snapshot_open(db, &snap_db, snap_seq);
608    TEST_CHK(status == FDB_RESULT_SUCCESS);
609
610    // That works, now attempt to do exact same sequence on a snapshot
611    // The snapshot is opened at the exact same place that the original
612    // database handle should be at (last seq 3)
613
614    // 10.  Create an iterator on snapshot
615    status = fdb_iterator_init(snap_db, &iterator, (void *)"a", 1,
616                               (void *)"c", 1, FDB_ITR_NONE);
617    TEST_CHK(status == FDB_RESULT_SUCCESS);
618
619    // 11. Do a seek
620    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
621    TEST_CHK(status == FDB_RESULT_SUCCESS);
622
623    // 12. Close the iterator
624    status = fdb_iterator_close(iterator);
625    TEST_CHK(status == FDB_RESULT_SUCCESS);
626
627    // close db handle
628    fdb_kvs_close(db);
629    // close snapshot handle
630    fdb_kvs_close(snap_db);
631    // close db file
632    fdb_close(dbfile);
633
634    // free all resources
635    fdb_shutdown();
636
637    memleak_end();
638
639    TEST_RESULT("snapshot test");
640}
641
642void snapshot_stats_test()
643{
644    TEST_INIT();
645
646    memleak_start();
647
648    int i, r;
649    int n = 20;
650    fdb_file_handle *dbfile;
651    fdb_kvs_handle *db;
652    fdb_kvs_handle *snap_db;
653    fdb_seqnum_t snap_seq;
654    fdb_doc **doc = alca(fdb_doc*, n);
655    fdb_kvs_info kvs_info;
656    fdb_status status;
657
658    char keybuf[256], metabuf[256], bodybuf[256];
659
660    // remove previous mvcc_test files
661    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
662    (void)r;
663
664    fdb_config fconfig = fdb_get_default_config();
665    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
666    fconfig.buffercache_size = 0;
667    fconfig.wal_threshold = 1024;
668    fconfig.flags = FDB_OPEN_FLAG_CREATE;
669    fconfig.compaction_threshold = 0;
670
671    // remove previous mvcc_test files
672    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
673    (void)r;
674
675    // open db
676    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
677    fdb_kvs_open_default(dbfile, &db, &kvs_config);
678    TEST_CHK(status == FDB_RESULT_SUCCESS);
679
680    // ------- Setup test ----------------------------------
681    // insert documents
682    for (i=0; i<n; i++){
683        sprintf(keybuf, "key%d", i);
684        sprintf(metabuf, "meta%d", i);
685        sprintf(bodybuf, "body%d", i);
686        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
687            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
688        fdb_set(db, doc[i]);
689        fdb_doc_free(doc[i]);
690    }
691
692    // commit with a manual WAL flush (these docs go into HB-trie)
693    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
694
695    // retrieve the sequence number for a snapshot open
696    status = fdb_get_kvs_info(db, &kvs_info);
697    TEST_CHK(status == FDB_RESULT_SUCCESS);
698    snap_seq = kvs_info.last_seqnum;
699
700    status = fdb_set_log_callback(db, logCallbackFunc,
701                                  (void *) "snapshot_stats_test");
702    TEST_CHK(status == FDB_RESULT_SUCCESS);
703
704    // Init Snapshot of open file with saved document seqnum as marker
705    status = fdb_snapshot_open(db, &snap_db, snap_seq);
706    TEST_CHK(status == FDB_RESULT_SUCCESS);
707
708    // check snapshot's sequence number
709    fdb_get_kvs_info(snap_db, &kvs_info);
710    TEST_CHK(kvs_info.last_seqnum == snap_seq);
711
712    TEST_CHK(kvs_info.doc_count == (size_t)n);
713    TEST_CHK(kvs_info.file == dbfile);
714
715    // close snapshot handle
716    fdb_kvs_close(snap_db);
717
718    // Test stats by creating an in-memory snapshot
719    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
720    TEST_CHK(status == FDB_RESULT_SUCCESS);
721
722    // check snapshot's sequence number
723    fdb_get_kvs_info(snap_db, &kvs_info);
724    TEST_CHK(kvs_info.last_seqnum == snap_seq);
725
726    TEST_CHK(kvs_info.doc_count == (size_t)n);
727    TEST_CHK(kvs_info.file == dbfile);
728
729    // close snapshot handle
730    fdb_kvs_close(snap_db);
731
732    // close db handle
733    fdb_kvs_close(db);
734    // close db file
735    fdb_close(dbfile);
736
737    // free all resources
738    fdb_shutdown();
739
740    memleak_end();
741
742    TEST_RESULT("snapshot stats test");
743}
744
745void snapshot_with_uncomitted_data_test()
746{
747    TEST_INIT();
748
749    int n = 10, value_len=32;
750    int i, r, idx;
751    char cmd[256];
752    char key[256], *value;
753    char keystr[] = "key%06d";
754    char valuestr[] = "value%d";
755    fdb_file_handle *db_file;
756    fdb_kvs_handle *db0, *db1, *db2, *snap;
757    fdb_config config;
758    fdb_kvs_config kvs_config;
759    fdb_kvs_info info;
760    fdb_seqnum_t seqnum;
761    fdb_status s; (void)s;
762
763    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
764    r = system(cmd);
765    (void)r;
766
767    memleak_start();
768
769    value = (char*)malloc(value_len);
770
771    srand(1234);
772    config = fdb_get_default_config();
773    config.seqtree_opt = FDB_SEQTREE_USE;
774    config.wal_flush_before_commit = true;
775    config.multi_kv_instances = true;
776    config.buffercache_size = 0*1024*1024;
777
778    kvs_config = fdb_get_default_kvs_config();
779
780    s = fdb_open(&db_file, "./mvcc_test9", &config);
781    TEST_CHK(s == FDB_RESULT_SUCCESS);
782    s = fdb_kvs_open(db_file, &db0, NULL, &kvs_config);
783    TEST_CHK(s == FDB_RESULT_SUCCESS);
784    s = fdb_kvs_open(db_file, &db1, "db1", &kvs_config);
785    TEST_CHK(s == FDB_RESULT_SUCCESS);
786    s = fdb_kvs_open(db_file, &db2, "db2", &kvs_config);
787    TEST_CHK(s == FDB_RESULT_SUCCESS);
788
789    // insert docs in all KV stores
790    for (i=0;i<n;++i){
791        idx = i;
792        sprintf(key, keystr, idx);
793        memset(value, 'x', value_len);
794        memcpy(value + value_len - 6, "<end>", 6);
795        sprintf(value, valuestr, idx);
796        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
797        TEST_CHK(s == FDB_RESULT_SUCCESS);
798        s = fdb_set_kv(db1, key, strlen(key)+1, value, value_len);
799        TEST_CHK(s == FDB_RESULT_SUCCESS);
800        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
801        TEST_CHK(s == FDB_RESULT_SUCCESS);
802    }
803    // try to open snapshot before commit
804    s = fdb_get_kvs_info(db0, &info);
805    TEST_CHK(s == FDB_RESULT_SUCCESS);
806    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
807    TEST_CHK(s != FDB_RESULT_SUCCESS);
808
809    s = fdb_get_kvs_info(db1, &info);
810    TEST_CHK(s == FDB_RESULT_SUCCESS);
811    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
812    TEST_CHK(s != FDB_RESULT_SUCCESS);
813
814    s = fdb_get_kvs_info(db2, &info);
815    TEST_CHK(s == FDB_RESULT_SUCCESS);
816    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
817    TEST_CHK(s != FDB_RESULT_SUCCESS);
818
819    s = fdb_commit(db_file, FDB_COMMIT_NORMAL);
820    TEST_CHK(s == FDB_RESULT_SUCCESS);
821
822    s = fdb_get_kvs_info(db1, &info);
823    TEST_CHK(s == FDB_RESULT_SUCCESS);
824    seqnum = info.last_seqnum;
825
826    s = fdb_get_kvs_info(db2, &info);
827    TEST_CHK(s == FDB_RESULT_SUCCESS);
828    TEST_CHK(seqnum == info.last_seqnum);
829
830    s = fdb_get_kvs_info(db0, &info);
831    TEST_CHK(s == FDB_RESULT_SUCCESS);
832    TEST_CHK(seqnum == info.last_seqnum);
833
834    // now insert docs into default and db2 only, without commit
835    for (i=0;i<n;++i){
836        idx = i;
837        sprintf(key, keystr, idx);
838        memset(value, 'x', value_len);
839        memcpy(value + value_len - 6, "<end>", 6);
840        sprintf(value, valuestr, idx);
841        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
842        TEST_CHK(s == FDB_RESULT_SUCCESS);
843        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
844        TEST_CHK(s == FDB_RESULT_SUCCESS);
845    }
846
847    // open snapshot on db1
848    s = fdb_get_kvs_info(db1, &info);
849    TEST_CHK(s == FDB_RESULT_SUCCESS);
850    // latest (comitted) seqnum
851    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
852    TEST_CHK(s == FDB_RESULT_SUCCESS);
853    s = fdb_kvs_close(snap);
854    TEST_CHK(s == FDB_RESULT_SUCCESS);
855
856    // open snapshot on db2
857    s = fdb_get_kvs_info(db2, &info);
858    TEST_CHK(s == FDB_RESULT_SUCCESS);
859
860    // latest (uncomitted) seqnum
861    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
862    TEST_CHK(s != FDB_RESULT_SUCCESS);
863    // committed seqnum
864    s = fdb_snapshot_open(db2, &snap, seqnum);
865    TEST_CHK(s == FDB_RESULT_SUCCESS);
866    s = fdb_kvs_close(snap);
867    TEST_CHK(s == FDB_RESULT_SUCCESS);
868
869    // open snapshot on default KVS
870    s = fdb_get_kvs_info(db0, &info);
871    TEST_CHK(s == FDB_RESULT_SUCCESS);
872
873    // latest (uncomitted) seqnum
874    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
875    TEST_CHK(s != FDB_RESULT_SUCCESS);
876    // committed seqnum
877    s = fdb_snapshot_open(db0, &snap, seqnum);
878    TEST_CHK(s == FDB_RESULT_SUCCESS);
879    s = fdb_kvs_close(snap);
880    TEST_CHK(s == FDB_RESULT_SUCCESS);
881
882    s = fdb_close(db_file);
883    TEST_CHK(s == FDB_RESULT_SUCCESS);
884    s = fdb_shutdown();
885    TEST_CHK(s == FDB_RESULT_SUCCESS);
886    free(value);
887    memleak_end();
888
889    TEST_RESULT("snapshot with uncomitted data in other KVS test");
890}
891
892void in_memory_snapshot_test()
893{
894    TEST_INIT();
895
896    memleak_start();
897
898    int i, r;
899    int n = 20;
900    int count;
901    fdb_file_handle *dbfile;
902    fdb_kvs_handle *db;
903    fdb_kvs_handle *snap_db;
904    fdb_seqnum_t snap_seq;
905    fdb_doc **doc = alca(fdb_doc*, n);
906    fdb_doc *rdoc;
907    fdb_kvs_info kvs_info;
908    fdb_status status;
909    fdb_iterator *iterator;
910
911    char keybuf[256], metabuf[256], bodybuf[256];
912
913    // remove previous mvcc_test files
914    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
915    (void)r;
916
917    fdb_config fconfig = fdb_get_default_config();
918    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
919    fconfig.buffercache_size = 0;
920    fconfig.wal_threshold = 1024;
921    fconfig.flags = FDB_OPEN_FLAG_CREATE;
922    fconfig.compaction_threshold = 0;
923
924    // remove previous mvcc_test files
925    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
926    (void)r;
927
928    // open db
929    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
930    TEST_CHK(status == FDB_RESULT_SUCCESS);
931    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
932    TEST_CHK(status == FDB_RESULT_SUCCESS);
933
934    // Create a snapshot from an empty database file
935    status = fdb_snapshot_open(db, &snap_db, 0);
936    TEST_CHK(status == FDB_RESULT_SUCCESS);
937    // check if snapshot's sequence number is zero.
938    fdb_get_kvs_info(snap_db, &kvs_info);
939    TEST_CHK(kvs_info.last_seqnum == 0);
940    // create an iterator on the snapshot for full range
941    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
942    // Iterator should not return any items.
943    status = fdb_iterator_get(iterator, &rdoc);
944    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
945    fdb_iterator_close(iterator);
946    fdb_kvs_close(snap_db);
947
948    // ------- Setup test ----------------------------------
949    // insert documents of 0-4
950    for (i=0; i<n/4; i++){
951        sprintf(keybuf, "key%d", i);
952        sprintf(metabuf, "meta%d", i);
953        sprintf(bodybuf, "body%d", i);
954        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
955            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
956        fdb_set(db, doc[i]);
957    }
958
959    // commit with a manual WAL flush (these docs go into HB-trie)
960    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
961
962    // insert documents from 5 - 8
963    for (; i < n/2 - 1; i++){
964        sprintf(keybuf, "key%d", i);
965        sprintf(metabuf, "meta%d", i);
966        sprintf(bodybuf, "body%d", i);
967        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
968            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
969        fdb_set(db, doc[i]);
970    }
971
972    // We want to create:
973    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
974    // Insert doc 9 with a different value to test duplicate elimination..
975    sprintf(keybuf, "key%d", i);
976    sprintf(metabuf, "meta%d", i);
977    sprintf(bodybuf, "Body%d", i);
978    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
979            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
980    fdb_set(db, doc[i]);
981
982    // commit again without a WAL flush (last doc goes into the AVL tree)
983    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
984
985    // Insert doc 9 now again with expected value..
986    *(char *)doc[i]->body = 'b';
987    fdb_set(db, doc[i]);
988
989    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
990    snap_seq = doc[i]->seqnum;
991
992    // Creation of a snapshot with a sequence number that was taken WITHOUT a
993    // commit must fail even if it from the latest document that was set...
994    fdb_get_kvs_info(db, &kvs_info);
995    status = fdb_snapshot_open(db, &snap_db, kvs_info.last_seqnum);
996    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
997
998    // ---------- Snapshot tests begin -----------------------
999    // Initialize an in-memory snapshot Without a Commit...
1000    // WAL items are not flushed...
1001    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
1002    TEST_CHK(status == FDB_RESULT_SUCCESS);
1003
1004    // commit again without a WAL flush (these documents go into the AVL trees)
1005    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1006
1007    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1008    fdb_set(db, doc[i]);
1009
1010    // commit again without a WAL flush (last doc goes into the AVL tree)
1011    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1012
1013    // insert documents from 10-14 into HB-trie
1014    for (++i; i < (n/2 + n/4); i++){
1015        sprintf(keybuf, "key%d", i);
1016        sprintf(metabuf, "meta%d", i);
1017        sprintf(bodybuf, "body%d", i);
1018        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1019                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1020        fdb_set(db, doc[i]);
1021    }
1022
1023    status = fdb_set_log_callback(db, logCallbackFunc,
1024                                  (void *) "in_memory_snapshot_test");
1025    TEST_CHK(status == FDB_RESULT_SUCCESS);
1026
1027    // check snapshot's sequence number
1028    fdb_get_kvs_info(snap_db, &kvs_info);
1029    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1030
1031    // insert documents from 15 - 19 on file into the WAL
1032    for (; i < n; i++){
1033        sprintf(keybuf, "key%d", i);
1034        sprintf(metabuf, "meta%d", i);
1035        sprintf(bodybuf, "body%d", i);
1036        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1037            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1038        fdb_set(db, doc[i]);
1039    }
1040    // commit without a WAL flush (This WAL must not affect snapshot)
1041    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1042
1043    // create an iterator on the snapshot for full range
1044    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1045
1046    // repeat until fail
1047    i=0;
1048    count=0;
1049    rdoc = NULL;
1050    do {
1051        status = fdb_iterator_get(iterator, &rdoc);
1052        TEST_CHK(status == FDB_RESULT_SUCCESS);
1053
1054        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1055        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1056        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1057
1058        fdb_doc_free(rdoc);
1059        rdoc = NULL;
1060        i++;
1061        count++;
1062    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1063
1064    TEST_CHK(count==n/2); // Only unique items from the first half
1065
1066    fdb_iterator_close(iterator);
1067
1068    // close db handle
1069    fdb_kvs_close(db);
1070    // close snapshot handle
1071    fdb_kvs_close(snap_db);
1072
1073    // close db file
1074    fdb_close(dbfile);
1075
1076    // free all documents
1077    for (i=0;i<n;++i){
1078        fdb_doc_free(doc[i]);
1079    }
1080
1081    // free all resources
1082    fdb_shutdown();
1083
1084    memleak_end();
1085
1086    TEST_RESULT("in-memory snapshot test");
1087}
1088
1089void in_memory_snapshot_cleanup_test()
1090{
1091    TEST_INIT();
1092
1093    memleak_start();
1094
1095    int r;
1096    fdb_file_handle *dbfile;
1097    fdb_kvs_handle *db;
1098    fdb_kvs_handle *snap_db1, *snap_db2, *snap_db3;
1099    fdb_kvs_handle *psnap_db1, *psnap_db2, *psnap_db3;
1100    fdb_doc *doc = NULL;
1101    fdb_doc *rdoc;
1102    fdb_status status;
1103
1104    char keybuf[32], metabuf[32], bodybuf[32];
1105
1106    // remove previous mvcc_test files
1107    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1108    (void)r;
1109
1110    fdb_config fconfig = fdb_get_default_config();
1111    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1112    fconfig.wal_threshold = 1024;
1113    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1114    fconfig.compaction_threshold = 0;
1115    fconfig.block_reusing_threshold = 0;
1116
1117    // remove previous mvcc_test files
1118    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1119    (void)r;
1120
1121    // open db
1122    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1123    TEST_CHK(status == FDB_RESULT_SUCCESS);
1124    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1125    TEST_CHK(status == FDB_RESULT_SUCCESS);
1126
1127    status = fdb_set_log_callback(db, logCallbackFunc,
1128                                  (void *) "in_memory_snapshot_cleanup_test");
1129    TEST_CHK(status == FDB_RESULT_SUCCESS);
1130
1131    sprintf(keybuf, "key");
1132    sprintf(metabuf, "meta");
1133    sprintf(bodybuf, "body");
1134    fdb_doc_create(&doc, (void*)keybuf, strlen(keybuf),
1135            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1136    fdb_set(db, doc);
1137
1138    // commit without a WAL flush
1139    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1140
1141    status = fdb_snapshot_open(db, &snap_db1, FDB_SNAPSHOT_INMEM);
1142    TEST_CHK(status == FDB_RESULT_SUCCESS);
1143
1144    status = fdb_set(db, doc);
1145    TEST_CHK(status == FDB_RESULT_SUCCESS);
1146
1147    // commit without a WAL flush
1148    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1149    TEST_CHK(status == FDB_RESULT_SUCCESS);
1150
1151    status = fdb_snapshot_open(db, &snap_db2, FDB_SNAPSHOT_INMEM);
1152    TEST_CHK(status == FDB_RESULT_SUCCESS);
1153
1154    status = fdb_set(db, doc);
1155    TEST_CHK(status == FDB_RESULT_SUCCESS);
1156
1157    // commit without a WAL flush
1158    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1159    TEST_CHK(status == FDB_RESULT_SUCCESS);
1160
1161    status = fdb_set(db, doc);
1162    TEST_CHK(status == FDB_RESULT_SUCCESS);
1163
1164    // commit without a WAL flush
1165    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1166    TEST_CHK(status == FDB_RESULT_SUCCESS);
1167
1168    fdb_doc_free(doc);
1169
1170    // Update doc with new Key
1171    char newKey[32];
1172    sprintf(newKey, "Key");
1173    fdb_doc_create(&doc, (void*)newKey, strlen(newKey),
1174            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1175    fdb_set(db, doc);
1176
1177    status = fdb_snapshot_open(db, &snap_db3, FDB_SNAPSHOT_INMEM);
1178    TEST_CHK(status == FDB_RESULT_SUCCESS);
1179
1180    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
1181
1182    status = fdb_get(snap_db1, rdoc);
1183    TEST_CHK(status == FDB_RESULT_SUCCESS);
1184
1185    status = fdb_get(snap_db2, rdoc);
1186    TEST_CHK(status == FDB_RESULT_SUCCESS);
1187
1188    status = fdb_get(snap_db3, rdoc);
1189    TEST_CHK(status == FDB_RESULT_SUCCESS);
1190
1191    fdb_doc_free(rdoc);
1192
1193    status = fdb_snapshot_open(db, &psnap_db1, 1);
1194    TEST_CHK(status == FDB_RESULT_SUCCESS);
1195
1196    status = fdb_snapshot_open(db, &psnap_db2, 2);
1197    TEST_CHK(status == FDB_RESULT_SUCCESS);
1198
1199    status = fdb_snapshot_open(db, &psnap_db3, 3);
1200    TEST_CHK(status == FDB_RESULT_SUCCESS);
1201
1202    // close snapshot handle
1203    fdb_kvs_close(snap_db1);
1204    fdb_kvs_close(snap_db2);
1205    fdb_kvs_close(snap_db3);
1206
1207    // commit without a WAL flush
1208    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1209    TEST_CHK(status == FDB_RESULT_SUCCESS);
1210
1211    fdb_kvs_close(psnap_db1);
1212    fdb_kvs_close(psnap_db2);
1213    fdb_kvs_close(psnap_db3);
1214
1215    // close db handle
1216    fdb_kvs_close(db);
1217
1218    // close db file
1219    fdb_close(dbfile);
1220
1221    fdb_doc_free(doc);
1222
1223    // free all resources
1224    fdb_shutdown();
1225
1226    memleak_end();
1227
1228    TEST_RESULT("in-memory snapshot cleanup test");
1229}
1230
1231void in_memory_snapshot_on_dirty_hbtrie_test()
1232{
1233    TEST_INIT();
1234
1235    int n = 300, value_len=32;
1236    int i, r, idx, c;
1237    char cmd[256];
1238    char key[256], *value;
1239    char keystr[] = "k%05d";
1240    char keystr2[] = "k%06d";
1241    char valuestr[] = "value%08d";
1242    fdb_file_handle *db_file;
1243    fdb_kvs_handle *db, *snap, *snap_clone;
1244    fdb_config config;
1245    fdb_kvs_config kvs_config;
1246    fdb_status s;
1247    fdb_iterator *fit, *fit_normal, *fit_clone;
1248    fdb_doc *doc;
1249
1250    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1251    r = system(cmd);
1252    (void)r;
1253
1254    memleak_start();
1255
1256    value = (char*)malloc(value_len);
1257
1258    config = fdb_get_default_config();
1259    config.durability_opt = FDB_DRB_ASYNC;
1260    config.seqtree_opt = FDB_SEQTREE_USE;
1261    config.wal_flush_before_commit = true;
1262    config.wal_threshold = n/5;
1263    config.multi_kv_instances = true;
1264    config.buffercache_size = 0;
1265
1266    kvs_config = fdb_get_default_kvs_config();
1267
1268    s = fdb_open(&db_file, "./mvcc_test", &config);
1269    TEST_CHK(s == FDB_RESULT_SUCCESS);
1270    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1271    TEST_CHK(s == FDB_RESULT_SUCCESS);
1272
1273    // write a few documents and commit & wal flush
1274    for (i=0;i<n/10;++i){
1275        idx = i;
1276        sprintf(key, keystr, idx);
1277        memset(value, 'x', value_len);
1278        memcpy(value + value_len - 6, "<end>", 6);
1279        sprintf(value, valuestr, idx);
1280        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1281        TEST_CHK(s == FDB_RESULT_SUCCESS);
1282    }
1283    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1284    TEST_CHK(s == FDB_RESULT_SUCCESS);
1285
1286    // create an in-memory snapshot and its clone
1287    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1288    TEST_CHK(s == FDB_RESULT_SUCCESS);
1289    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1290    TEST_CHK(s == FDB_RESULT_SUCCESS);
1291
1292    // write a number of documents in order to make WAL be flushed before commit
1293    for (i=n/10;i<n;++i){
1294        idx = i;
1295        sprintf(key, keystr, idx);
1296        memset(value, 'x', value_len);
1297        memcpy(value + value_len - 6, "<end>", 6);
1298        sprintf(value, valuestr, idx);
1299        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1300        TEST_CHK(s == FDB_RESULT_SUCCESS);
1301    }
1302
1303    void *v_out;
1304    size_t vlen_out;
1305    idx = n/10;
1306    sprintf(key, keystr, idx);
1307    s = fdb_get_kv(snap, key, strlen(key)+1, &v_out, &vlen_out);
1308    // should not be able to retrieve
1309    TEST_CHK(s != FDB_RESULT_SUCCESS);
1310
1311    s = fdb_get_kv(snap_clone, key, strlen(key)+1, &v_out, &vlen_out);
1312    // should not be able to retrieve in also clone
1313    TEST_CHK(s != FDB_RESULT_SUCCESS);
1314
1315    // close snapshot and its clone
1316    s = fdb_kvs_close(snap);
1317    TEST_CHK(s == FDB_RESULT_SUCCESS);
1318    s = fdb_kvs_close(snap_clone);
1319    TEST_CHK(s == FDB_RESULT_SUCCESS);
1320
1321    // create an in-memory snapshot
1322    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1323    TEST_CHK(s == FDB_RESULT_SUCCESS);
1324
1325    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1326    TEST_CHK(s == FDB_RESULT_SUCCESS);
1327    c = 0;
1328    do {
1329        doc = NULL;
1330        s = fdb_iterator_get(fit, &doc);
1331        if (s != FDB_RESULT_SUCCESS) break;
1332
1333        idx = c;
1334        sprintf(key, keystr, idx);
1335        memset(value, 'x', value_len);
1336        memcpy(value + value_len - 6, "<end>", 6);
1337        sprintf(value, valuestr, idx);
1338        TEST_CMP(doc->key, key, doc->keylen);
1339        TEST_CMP(doc->body, value, doc->bodylen);
1340        c++;
1341        fdb_doc_free(doc);
1342    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1343    TEST_CHK(c == n);
1344
1345    s = fdb_iterator_close(fit);
1346    TEST_CHK(s == FDB_RESULT_SUCCESS);
1347
1348    // create a clone
1349    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1350    TEST_CHK(s == FDB_RESULT_SUCCESS);
1351
1352    // create iterators on snapshot and its clone
1353    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1354    TEST_CHK(s == FDB_RESULT_SUCCESS);
1355    s = fdb_iterator_init(snap_clone, &fit_clone, NULL, 0, NULL, 0, 0x0);
1356    TEST_CHK(s == FDB_RESULT_SUCCESS);
1357
1358    // also create an iterator on a normal handle
1359    s = fdb_iterator_init(db, &fit_normal, NULL, 0, NULL, 0, 0x0);
1360    TEST_CHK(s == FDB_RESULT_SUCCESS);
1361
1362    c = 0;
1363    do {
1364        doc = NULL;
1365        s = fdb_iterator_get(fit, &doc);
1366        if (s != FDB_RESULT_SUCCESS) break;
1367
1368        idx = c;
1369        sprintf(key, keystr, idx);
1370        memset(value, 'x', value_len);
1371        memcpy(value + value_len - 6, "<end>", 6);
1372        sprintf(value, valuestr, idx);
1373        TEST_CMP(doc->key, key, doc->keylen);
1374        TEST_CMP(doc->body, value, doc->bodylen);
1375        c++;
1376        fdb_doc_free(doc);
1377
1378        doc = NULL;
1379        s = fdb_iterator_get(fit, &doc);
1380        TEST_CHK(s == FDB_RESULT_SUCCESS);
1381        TEST_CMP(doc->key, key, doc->keylen);
1382        TEST_CMP(doc->body, value, doc->bodylen);
1383        fdb_doc_free(doc);
1384
1385        if (c == n/5) {
1386            // insert new docs in the middle of iteration
1387            for (i=0; i<n*10; ++i){
1388                idx = i;
1389                sprintf(key, keystr2, idx);
1390                memset(value, 'x', value_len);
1391                memcpy(value + value_len - 6, "<end>", 6);
1392                sprintf(value, valuestr, idx);
1393                s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1394                TEST_CHK(s == FDB_RESULT_SUCCESS);
1395            }
1396        }
1397
1398        s = fdb_iterator_next(fit);
1399        // result should be same to clone
1400        if (s != FDB_RESULT_SUCCESS) {
1401            s = fdb_iterator_next(fit_clone);
1402            TEST_CHK(s != FDB_RESULT_SUCCESS);
1403        } else {
1404            s = fdb_iterator_next(fit_clone);
1405            TEST_CHK(s == FDB_RESULT_SUCCESS);
1406        }
1407    } while(s == FDB_RESULT_SUCCESS);
1408    TEST_CHK(c == n);
1409
1410    // close iterators
1411    s = fdb_iterator_close(fit);
1412    TEST_CHK(s == FDB_RESULT_SUCCESS);
1413    s = fdb_iterator_close(fit_clone);
1414    TEST_CHK(s == FDB_RESULT_SUCCESS);
1415
1416    // the results should be same in the normal iterator
1417    c = 0;
1418    do {
1419        doc = NULL;
1420        s = fdb_iterator_get(fit_normal, &doc);
1421        if (s != FDB_RESULT_SUCCESS) break;
1422
1423        idx = c;
1424        sprintf(key, keystr, idx);
1425        memset(value, 'x', value_len);
1426        memcpy(value + value_len - 6, "<end>", 6);
1427        sprintf(value, valuestr, idx);
1428        TEST_CMP(doc->key, key, doc->keylen);
1429        TEST_CMP(doc->body, value, doc->bodylen);
1430        c++;
1431        fdb_doc_free(doc);
1432    } while(fdb_iterator_next(fit_normal) == FDB_RESULT_SUCCESS);
1433    TEST_CHK(c == n);
1434
1435    s = fdb_iterator_close(fit_normal);
1436    TEST_CHK(s == FDB_RESULT_SUCCESS);
1437
1438    s = fdb_kvs_close(snap);
1439    TEST_CHK(s == FDB_RESULT_SUCCESS);
1440
1441    s = fdb_kvs_close(snap_clone);
1442    TEST_CHK(s == FDB_RESULT_SUCCESS);
1443
1444    s = fdb_close(db_file);
1445    TEST_CHK(s == FDB_RESULT_SUCCESS);
1446    s = fdb_shutdown();
1447    TEST_CHK(s == FDB_RESULT_SUCCESS);
1448    free(value);
1449
1450    memleak_end();
1451
1452    TEST_RESULT("in-memory snapshot on dirty HB+trie nodes test");
1453}
1454
1455
/*
 * Context passed to the cb_inmem_snap() compaction callback through
 * fdb_config.compaction_cb_ctx.
 */
struct cb_inmem_snap_args {
    // KVS handle the callback opens its in-memory snapshots on and through
    // which it writes its extra "new_key" document
    fdb_kvs_handle *handle;
    // number of documents the callback expects to find in a snapshot; also
    // compared against the snapshot's last sequence number
    int n;
    // incremented by the callback on every FDB_CS_MOVE_DOC notification
    int move_count;
    // byte length used by the callback when rebuilding the expected value
    int value_len;
    // printf-style format that turns a document index into the expected key
    char *keystr;
    // printf-style format that turns a document index into the expected
    // leading part of the value
    char *valuestr;
};
1464
1465static fdb_compact_decision cb_inmem_snap(fdb_file_handle *fhandle,
1466                            fdb_compaction_status status,
1467                            const char *kv_name,
1468                            fdb_doc *doc_in, uint64_t old_offset,
1469                            uint64_t new_offset,
1470                            void *ctx)
1471{
1472    TEST_INIT();
1473    int c, idx;
1474    char key[256], value[256];
1475    void *value_out;
1476    size_t valuelen;
1477    fdb_kvs_handle *snap;
1478    fdb_kvs_info info;
1479    fdb_iterator *fit;
1480    fdb_doc *doc;
1481    fdb_status s;
1482    fdb_compact_decision ret = FDB_CS_KEEP_DOC;
1483    struct cb_inmem_snap_args *args = (struct cb_inmem_snap_args *)ctx;
1484    (void)args;
1485
1486    if (status == FDB_CS_MOVE_DOC) {
1487        TEST_CHK(kv_name);
1488        if (doc_in->deleted) {
1489            ret = FDB_CS_DROP_DOC;
1490        }
1491        args->move_count++;
1492        if (args->move_count == 2) {
1493            // open in-memory snapshot
1494            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1495            TEST_CHK(s == FDB_RESULT_SUCCESS);
1496
1497            s = fdb_get_kvs_info(snap, &info);
1498            TEST_CHK(s == FDB_RESULT_SUCCESS);
1499            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n);
1500
1501            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1502            TEST_CHK(s == FDB_RESULT_SUCCESS);
1503
1504            c = 0;
1505            do {
1506                doc = NULL;
1507                s = fdb_iterator_get(fit, &doc);
1508                if (s != FDB_RESULT_SUCCESS) {
1509                    break;
1510                }
1511                idx = c;
1512                sprintf(key, args->keystr, idx);
1513                memset(value, 'x', args->value_len);
1514                memcpy(value + args->value_len - 6, "<end>", 6);
1515                sprintf(value, args->valuestr, idx);
1516                TEST_CMP(doc->key, key, doc->keylen);
1517                TEST_CMP(doc->body, value, doc->bodylen);
1518
1519                c++;
1520                fdb_doc_free(doc);
1521            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1522            TEST_CHK(c == args->n);
1523
1524            s = fdb_iterator_close(fit);
1525            TEST_CHK(s == FDB_RESULT_SUCCESS);
1526
1527            fdb_kvs_close(snap);
1528
1529        } else if (args->move_count == 5) {
1530            // insert new doc
1531            sprintf(key, "new_key");
1532            sprintf(value, "new_value");
1533            s = fdb_set_kv(args->handle, (void*)key, strlen(key)+1, (void*)value, strlen(value)+1);
1534            TEST_CHK(s == FDB_RESULT_SUCCESS);
1535
1536            // open in-memory snapshot
1537            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1538            TEST_CHK(s == FDB_RESULT_SUCCESS);
1539
1540            s = fdb_get_kvs_info(snap, &info);
1541            TEST_CHK(s == FDB_RESULT_SUCCESS);
1542            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n+1);
1543
1544            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1545            TEST_CHK(s == FDB_RESULT_SUCCESS);
1546
1547            c = 0;
1548            do {
1549                doc = NULL;
1550                s = fdb_iterator_get(fit, &doc);
1551                if (s != FDB_RESULT_SUCCESS) {
1552                    break;
1553                }
1554                if (c < args->n) {
1555                    idx = c;
1556                    sprintf(key, args->keystr, idx);
1557                    memset(value, 'x', args->value_len);
1558                    memcpy(value + args->value_len - 6, "<end>", 6);
1559                    sprintf(value, args->valuestr, idx);
1560                    TEST_CMP(doc->key, key, doc->keylen);
1561                    TEST_CMP(doc->body, value, doc->bodylen);
1562                } else {
1563                    // new document
1564                    sprintf(key, "new_key");
1565                    sprintf(value, "new_value");
1566                    TEST_CMP(doc->key, key, doc->keylen);
1567                    TEST_CMP(doc->body, value, doc->bodylen);
1568                }
1569
1570                c++;
1571                fdb_doc_free(doc);
1572            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1573            TEST_CHK(c == args->n + 1);
1574
1575            s = fdb_iterator_close(fit);
1576            TEST_CHK(s == FDB_RESULT_SUCCESS);
1577
1578            s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1579            TEST_CHK(s == FDB_RESULT_SUCCESS);
1580            TEST_CMP(value_out, value, valuelen);
1581            fdb_free_block(value_out);
1582
1583            fdb_kvs_close(snap);
1584        }
1585    }
1586    return ret;
1587}
1588
1589void in_memory_snapshot_compaction_test()
1590{
1591    TEST_INIT();
1592
1593    int n = 10, value_len=32;
1594    int i, r, c, idx;
1595    char cmd[256];
1596    char key[256], *value;
1597    char keystr[] = "k%05d";
1598    char valuestr[] = "value%08d";
1599    void *value_out;
1600    size_t valuelen;
1601    fdb_file_handle *db_file;
1602    fdb_kvs_handle *db, *db2, *snap;
1603    fdb_config config;
1604    fdb_kvs_config kvs_config;
1605    fdb_kvs_info info;
1606    fdb_iterator *fit;
1607    fdb_doc *doc;
1608    fdb_status s;
1609    struct cb_inmem_snap_args cargs;
1610
1611    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1612    r = system(cmd);
1613    (void)r;
1614
1615    memleak_start();
1616
1617    value = (char*)malloc(value_len);
1618
1619    config = fdb_get_default_config();
1620    config.durability_opt = FDB_DRB_ASYNC;
1621    config.seqtree_opt = FDB_SEQTREE_USE;
1622    config.wal_flush_before_commit = true;
1623    config.wal_threshold = n/5;
1624    config.multi_kv_instances = true;
1625    config.buffercache_size = 0;
1626    config.compaction_cb = cb_inmem_snap;
1627    config.compaction_cb_mask = FDB_CS_MOVE_DOC;
1628    config.compaction_cb_ctx = &cargs;
1629
1630    kvs_config = fdb_get_default_kvs_config();
1631
1632    s = fdb_open(&db_file, "./mvcc_test", &config);
1633    TEST_CHK(s == FDB_RESULT_SUCCESS);
1634    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1635    TEST_CHK(s == FDB_RESULT_SUCCESS);
1636
1637    s = fdb_kvs_open(db_file, &db2, "db", &kvs_config);
1638    TEST_CHK(s == FDB_RESULT_SUCCESS);
1639
1640    cargs.handle = db2;
1641    cargs.move_count = 0;
1642    cargs.n = n;
1643    cargs.keystr = keystr;
1644    cargs.valuestr = valuestr;
1645    cargs.value_len = value_len;
1646
1647    // write
1648    for (i=0;i<n;++i){
1649        idx = i;
1650        sprintf(key, keystr, idx);
1651        memset(value, 'x', value_len);
1652        memcpy(value + value_len - 6, "<end>", 6);
1653        sprintf(value, valuestr, idx);
1654        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1655        TEST_CHK(s == FDB_RESULT_SUCCESS);
1656    }
1657    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1658    TEST_CHK(s == FDB_RESULT_SUCCESS);
1659
1660    s = fdb_compact(db_file, "./mvcc_test2");
1661    TEST_CHK(s == FDB_RESULT_SUCCESS);
1662
1663    // open in-memory snapshot
1664    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1665    TEST_CHK(s == FDB_RESULT_SUCCESS);
1666
1667    s = fdb_get_kvs_info(snap, &info);
1668    TEST_CHK(s == FDB_RESULT_SUCCESS);
1669    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)n+1);
1670
1671    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1672    TEST_CHK(s == FDB_RESULT_SUCCESS);
1673
1674    c = 0;
1675    do {
1676        doc = NULL;
1677        s = fdb_iterator_get(fit, &doc);
1678        if (s != FDB_RESULT_SUCCESS) {
1679            break;
1680        }
1681        if (c < n) {
1682            idx = c;
1683            sprintf(key, keystr, idx);
1684            memset(value, 'x', value_len);
1685            memcpy(value + value_len - 6, "<end>", 6);
1686            sprintf(value, valuestr, idx);
1687            TEST_CMP(doc->key, key, doc->keylen);
1688            TEST_CMP(doc->body, value, doc->bodylen);
1689        } else {
1690            // new document
1691            sprintf(key, "new_key");
1692            sprintf(value, "new_value");
1693            TEST_CMP(doc->key, key, doc->keylen);
1694            TEST_CMP(doc->body, value, doc->bodylen);
1695        }
1696
1697        c++;
1698        fdb_doc_free(doc);
1699    } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1700    TEST_CHK(c == n + 1);
1701
1702    s = fdb_iterator_close(fit);
1703    TEST_CHK(s == FDB_RESULT_SUCCESS);
1704
1705    s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1706    TEST_CHK(s == FDB_RESULT_SUCCESS);
1707    TEST_CMP(value_out, value, valuelen);
1708    fdb_free_block(value_out);
1709
1710    fdb_kvs_close(snap);
1711
1712    s = fdb_close(db_file);
1713    TEST_CHK(s == FDB_RESULT_SUCCESS);
1714    s = fdb_shutdown();
1715    TEST_CHK(s == FDB_RESULT_SUCCESS);
1716    free(value);
1717
1718    memleak_end();
1719
1720    TEST_RESULT("in-memory snapshot with concurrent compaction test");
1721}
1722
1723void snapshot_clone_test()
1724{
1725    TEST_INIT();
1726
1727    memleak_start();
1728
1729    int i, r;
1730    int n = 20;
1731    int count;
1732    fdb_file_handle *dbfile;
1733    fdb_kvs_handle *db;
1734    fdb_kvs_handle *snap_db, *snap_inmem;
1735    fdb_kvs_handle *snap_db2, *snap_inmem2; // clones from stable & inmemory
1736    fdb_seqnum_t snap_seq;
1737    fdb_doc **doc = alca(fdb_doc*, n);
1738    fdb_doc *rdoc;
1739    fdb_kvs_info kvs_info;
1740    fdb_status status;
1741    fdb_iterator *iterator;
1742
1743    char keybuf[256], metabuf[256], bodybuf[256];
1744
1745    // remove previous mvcc_test files
1746    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1747    (void)r;
1748
1749    fdb_config fconfig = fdb_get_default_config();
1750    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1751    fconfig.buffercache_size = 0;
1752    fconfig.wal_threshold = 1024;
1753    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1754    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1755    fconfig.compaction_threshold = 0;
1756
1757    // remove previous mvcc_test files
1758    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1759    (void)r;
1760
1761    // open db
1762    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1763    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1764    TEST_CHK(status == FDB_RESULT_SUCCESS);
1765
1766    // Create a snapshot from an empty database file
1767    status = fdb_snapshot_open(db, &snap_db, 0);
1768    TEST_CHK(status == FDB_RESULT_SUCCESS);
1769    // check if snapshot's sequence number is zero.
1770    fdb_get_kvs_info(snap_db, &kvs_info);
1771    TEST_CHK(kvs_info.last_seqnum == 0);
1772    // create an iterator on the snapshot for full range
1773    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1774    // Iterator should not return any items.
1775    status = fdb_iterator_next(iterator);
1776    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
1777    fdb_iterator_close(iterator);
1778    fdb_kvs_close(snap_db);
1779
1780    // ------- Setup test ----------------------------------
1781    // insert documents of 0-4
1782    for (i=0; i<n/4; i++){
1783        sprintf(keybuf, "key%d", i);
1784        sprintf(metabuf, "meta%d", i);
1785        sprintf(bodybuf, "body%d", i);
1786        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1787            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1788        fdb_set(db, doc[i]);
1789    }
1790
1791    // commit with a manual WAL flush (these docs go into HB-trie)
1792    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1793
    // insert documents from 5 - 8
1795    for (; i < n/2 - 1; i++){
1796        sprintf(keybuf, "key%d", i);
1797        sprintf(metabuf, "meta%d", i);
1798        sprintf(bodybuf, "body%d", i);
1799        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1800            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1801        fdb_set(db, doc[i]);
1802    }
1803
1804    // We want to create:
1805    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
1806    // Insert doc 9 with a different value to test duplicate elimination..
1807    sprintf(keybuf, "key%d", i);
1808    sprintf(metabuf, "meta%d", i);
1809    sprintf(bodybuf, "Body%d", i);
1810    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1811            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1812    fdb_set(db, doc[i]);
1813
1814    // commit again without a WAL flush (last doc goes into the AVL tree)
1815    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1816
1817    // Insert doc 9 now again with expected value..
1818    *(char *)doc[i]->body = 'b';
1819    fdb_set(db, doc[i]);
1820    // commit again without a WAL flush (these documents go into the AVL trees)
1821    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1822
1823    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
1824    snap_seq = doc[i]->seqnum;
1825
1826    // Initialize an in-memory snapshot Without a Commit...
1827    // WAL items are not flushed...
1828    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1829    TEST_CHK(status == FDB_RESULT_SUCCESS);
1830
1831    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1832    fdb_set(db, doc[i]);
1833    // commit again without a WAL flush (last doc goes into the AVL tree)
1834    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1835
1836    // insert documents from 10-14 into HB-trie
1837    for (++i; i < (n/2 + n/4); i++){
1838        sprintf(keybuf, "key%d", i);
1839        sprintf(metabuf, "meta%d", i);
1840        sprintf(bodybuf, "body%d", i);
1841        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1842            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1843        fdb_set(db, doc[i]);
1844    }
1845    // manually flush WAL & commit
1846    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1847
1848    status = fdb_set_log_callback(db, logCallbackFunc,
1849                                  (void *) "snapshot_clone_test");
1850    TEST_CHK(status == FDB_RESULT_SUCCESS);
1851    // ---------- Snapshot tests begin -----------------------
1852    // Attempt to take snapshot with out-of-range marker..
1853    status = fdb_snapshot_open(db, &snap_db, 999999);
1854    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
1855
1856    // Init Snapshot of open file with saved document seqnum as marker
1857    status = fdb_snapshot_open(db, &snap_db, snap_seq);
1858    TEST_CHK(status == FDB_RESULT_SUCCESS);
1859
1860    // Clone a snapshot into another snapshot...
1861    status = fdb_snapshot_open(snap_db, &snap_db2, snap_seq);
1862    TEST_CHK(status == FDB_RESULT_SUCCESS);
1863
1864    // close snapshot handle
1865    status = fdb_kvs_close(snap_db);
1866    TEST_CHK(status == FDB_RESULT_SUCCESS);
1867
1868    // check snapshot's sequence number
1869    fdb_get_kvs_info(snap_db2, &kvs_info);
1870    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1871
1872    // insert documents from 15 - 19 on file into the WAL
1873    for (; i < n; i++){
1874        sprintf(keybuf, "key%d", i);
1875        sprintf(metabuf, "meta%d", i);
1876        sprintf(bodybuf, "body%d", i);
1877        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1878            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1879        fdb_set(db, doc[i]);
1880    }
1881    // commit without a WAL flush (This WAL must not affect snapshot)
1882    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1883
1884    // Clone the in-memory snapshot into another snapshot
1885    status = fdb_snapshot_open(snap_inmem, &snap_inmem2, FDB_SNAPSHOT_INMEM);
1886    TEST_CHK(status == FDB_RESULT_SUCCESS);
1887
1888    // Stable Snapshot Scan Tests..........
1889    // Retrieve metaonly by key from snapshot
1890    i = snap_seq + 1;
1891    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1892    status = fdb_get_metaonly(snap_db2, rdoc);
1893    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1894    fdb_doc_free(rdoc);
1895
1896    i = 5;
1897    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1898    status = fdb_get_metaonly(snap_db2, rdoc);
1899    TEST_CHK(status == FDB_RESULT_SUCCESS);
1900    TEST_CMP(rdoc->key, doc[i]->key, doc[i]->keylen);
1901    TEST_CMP(rdoc->meta, doc[i]->meta, doc[i]->metalen);
1902    fdb_doc_free(rdoc);
1903
1904    // Retrieve by seq from snapshot
1905    fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
1906    rdoc->seqnum = 6;
1907    status = fdb_get_byseq(snap_db2, rdoc);
1908    TEST_CHK(status == FDB_RESULT_SUCCESS);
1909    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1910    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1911    fdb_doc_free(rdoc);
1912    rdoc = NULL;
1913
1914    // create an iterator on the snapshot for full range
1915    fdb_iterator_init(snap_db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1916
1917    // repeat until fail
1918    i=0;
1919    count=0;
1920    do {
1921        status = fdb_iterator_get(iterator, &rdoc);
1922        TEST_CHK(status == FDB_RESULT_SUCCESS);
1923        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1924        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1925        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1926
1927        fdb_doc_free(rdoc);
1928        rdoc = NULL;
1929        i ++;
1930        count++;
1931    } while (fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1932
1933    TEST_CHK(count==n/2); // Only unique items from the first half
1934
1935    fdb_iterator_close(iterator);
1936
1937    // create an iterator on the in-memory snapshot clone for full range
1938    fdb_iterator_init(snap_inmem2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1939
1940    // repeat until fail
1941    i=0;
1942    count=0;
1943    do {
1944        status = fdb_iterator_get(iterator, &rdoc);
1945        TEST_CHK(status == FDB_RESULT_SUCCESS);
1946        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1947        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1948        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1949
1950        fdb_doc_free(rdoc);
1951        rdoc = NULL;
1952        i ++;
1953        count++;
1954    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1955
1956    TEST_CHK(count==n/2); // Only unique items from the first half
1957
1958    fdb_iterator_close(iterator);
1959
1960    // close db handle
1961    fdb_kvs_close(db);
1962    // close the in-memory snapshot handle
1963    fdb_kvs_close(snap_inmem);
1964    // close the in-memory clone snapshot handle
1965    fdb_kvs_close(snap_inmem2);
1966    // close the clone snapshot handle
1967    fdb_kvs_close(snap_db2);
1968    // close db file
1969    fdb_close(dbfile);
1970
1971    // free all documents
1972    for (i=0;i<n;++i){
1973        fdb_doc_free(doc[i]);
1974    }
1975
1976    // free all resources
1977    fdb_shutdown();
1978
1979    memleak_end();
1980
1981    TEST_RESULT("snapshot clone test");
1982}
1983
// Arguments handed to every snap_clone_thread worker. One instance is shared
// (read-only) by all workers started from snapshot_parallel_clone_test.
struct parallel_clone_t {
    fdb_doc **doc;           // reference documents the cloned snapshot is compared against
    fdb_kvs_handle *snap_db; // snapshot handle that each worker clones via fdb_snapshot_open
    int num_docs;            // total number of documents; workers expect num_docs/2 in the clone
};
1989
1990void *snap_clone_thread(void *args)
1991{
1992    struct parallel_clone_t *t = (struct parallel_clone_t *)args;
1993    fdb_kvs_handle *clone_db;
1994    fdb_iterator *iterator;
1995    fdb_doc *rdoc = NULL;
1996    fdb_doc **doc = t->doc;
1997    int i;
1998    int n = t->num_docs;
1999    fdb_status status;
2000    int count;
2001    TEST_INIT();
2002
2003    status = fdb_snapshot_open(t->snap_db, &clone_db, FDB_SNAPSHOT_INMEM);
2004    TEST_CHK(status == FDB_RESULT_SUCCESS);
2005
2006    // create an iterator on the in-memory snapshot clone for full range
2007    fdb_iterator_init(clone_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2008
2009    // repeat until fail
2010    i=0;
2011    count=0;
2012    do {
2013        status = fdb_iterator_get(iterator, &rdoc);
2014        TEST_CHK(status == FDB_RESULT_SUCCESS);
2015        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2016        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2017
2018        fdb_doc_free(rdoc);
2019        rdoc = NULL;
2020        i ++;
2021        count++;
2022    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
2023
2024    TEST_CHK(count==n/2); // Only unique items from the first half
2025
2026    fdb_iterator_close(iterator);
2027
2028    fdb_kvs_close(clone_db);
2029    thread_exit(0);
2030    return NULL;
2031}
2032
2033void snapshot_parallel_clone_test()
2034{
2035    TEST_INIT();
2036
2037    memleak_start();
2038
2039    int i, r;
2040    int num_cloners = 30; // parallel snapshot clone operations
2041    int n = 20480; // 10 dirty wal flushes at least
2042    fdb_file_handle *dbfile;
2043    fdb_kvs_handle *db;
2044    fdb_kvs_handle *snap_inmem;
2045    fdb_doc **doc = alca(fdb_doc*, n);
2046    thread_t *tid = alca(thread_t, num_cloners);
2047    void *thread_ret;
2048    fdb_status status;
2049    struct parallel_clone_t clone_data;
2050
2051    char keybuf[256], bodybuf[256];
2052
2053    // remove previous mvcc_test files
2054    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2055    (void)r;
2056
2057    fdb_config fconfig = fdb_get_default_config();
2058    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2059    fconfig.buffercache_size = 0;
2060    fconfig.wal_threshold = 1024;
2061    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2062    fconfig.compaction_threshold = 0;
2063
2064    // remove previous mvcc_test files
2065    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2066    (void)r;
2067
2068    // open db
2069    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2070    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2071    TEST_CHK(status == FDB_RESULT_SUCCESS);
2072
2073    status = fdb_set_log_callback(db, logCallbackFunc,
2074                                  (void *) "snapshot_parallel_clone_test");
2075    TEST_CHK(status == FDB_RESULT_SUCCESS);
2076    // ------- Setup test ----------------------------------
2077    for (i=0; i<n/2; i++){
2078        sprintf(keybuf, "key%5d", i);
2079        sprintf(bodybuf, "body%d", i);
2080        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2081            NULL, 0, (void*)bodybuf, strlen(bodybuf));
2082        fdb_set(db, doc[i]);
2083    }
2084
2085    // Initialize an in-memory snapshot Without a Commit...
2086    // WAL items are not flushed...
2087    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
2088    TEST_CHK(status == FDB_RESULT_SUCCESS);
2089
2090    clone_data.doc = doc;
2091    clone_data.num_docs = n;
2092    clone_data.snap_db = snap_inmem;
2093
2094    for (int j = num_cloners - 1; j>=0; --j) {
2095        thread_create(&tid[j], snap_clone_thread, &clone_data);
2096    }
2097
2098    for (; i < n; i++){
2099        sprintf(keybuf, "key%5d", i);
2100        sprintf(bodybuf, "BODY%d", i);
2101        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2102            NULL, 0, (void*)bodybuf, strlen(bodybuf));
2103        fdb_set(db, doc[i]);
2104    }
2105    // commit without a WAL flush (This WAL must not affect snapshot)
2106    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2107
2108    for (int j = num_cloners - 1; j>=0; --j) {
2109        thread_join(tid[j], &thread_ret);
2110    }
2111
2112    // close db handle
2113    fdb_kvs_close(db);
2114    // close the in-memory snapshot handle
2115    fdb_kvs_close(snap_inmem);
2116    // close db file
2117    fdb_close(dbfile);
2118
2119    // free all documents
2120    for (i=0;i<n;++i){
2121        fdb_doc_free(doc[i]);
2122    }
2123
2124    // free all resources
2125    fdb_shutdown();
2126
2127    memleak_end();
2128
2129    TEST_RESULT("snapshot parallel clone test");
2130}
2131
2132void snapshot_markers_in_file_test(bool multi_kv)
2133{
2134    TEST_INIT();
2135
2136    memleak_start();
2137
2138    int i, r;
2139    int n = 20;
2140    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
2141    fdb_file_handle *dbfile;
2142    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
2143    fdb_doc **doc = alca(fdb_doc*, n);
2144    fdb_status status;
2145    fdb_snapshot_info_t *markers;
2146    uint64_t num_markers;
2147
2148    char keybuf[256], metabuf[256], bodybuf[256];
2149    char kv_name[8];
2150
2151    fdb_config fconfig = fdb_get_default_config();
2152    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2153    fconfig.buffercache_size = 0;
2154    fconfig.wal_threshold = 1024;
2155    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2156    fconfig.compaction_threshold = 0;
2157    fconfig.multi_kv_instances = multi_kv;
2158    // for creating the strict number of snapshots, disable block reusing
2159    fconfig.block_reusing_threshold = 0;
2160
2161    // remove previous mvcc_test files
2162    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2163    (void)r;
2164
2165    // open db
2166    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2167    if (multi_kv) {
2168        for (r = 0; r < num_kvs; ++r) {
2169            sprintf(kv_name, "kv%d", r);
2170            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2171        }
2172    } else {
2173        num_kvs = 1;
2174        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2175    }
2176
2177   // ------- Setup test ----------------------------------
2178   // insert documents of 0-4
2179    for (i=0; i<n/4; i++){
2180        sprintf(keybuf, "key%d", i);
2181        sprintf(metabuf, "meta%d", i);
2182        sprintf(bodybuf, "body%d", i);
2183        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2184            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2185        for (r = 0; r < num_kvs; ++r) {
2186            fdb_set(db[r], doc[i]);
2187        }
2188    }
2189
2190    // commit with a manual WAL flush (these docs go into HB-trie)
2191    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2192
2193    // insert documents from 5 - 9
2194    for (; i < n/2; i++){
2195        sprintf(keybuf, "key%d", i);
2196        sprintf(metabuf, "meta%d", i);
2197        sprintf(bodybuf, "body%d", i);
2198        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2199            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2200        for (r = 0; r < num_kvs; ++r) {
2201            fdb_set(db[r], doc[i]);
2202        }
2203    }
2204
2205    // commit again without a WAL flush
2206    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2207
2208    // insert documents from 10-14 into HB-trie
2209    for (; i < (n/2 + n/4); i++){
2210        sprintf(keybuf, "key%d", i);
2211        sprintf(metabuf, "meta%d", i);
2212        sprintf(bodybuf, "body%d", i);
2213        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2214            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2215        for (r = 0; r < num_kvs; ++r) {
2216            fdb_set(db[r], doc[i]);
2217        }
2218    }
2219    // manually flush WAL & commit
2220    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2221
2222    // insert documents from 15 - 19 on file into the WAL
2223    for (; i < n; i++){
2224        sprintf(keybuf, "key%d", i);
2225        sprintf(metabuf, "meta%d", i);
2226        sprintf(bodybuf, "body%d", i);
2227        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2228            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2229        for (r = 0; r < num_kvs; ++r) {
2230            fdb_set(db[r], doc[i]);
2231        }
2232    }
2233    // commit without a WAL flush
2234    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2235
2236    for (r = 0; r < num_kvs; ++r) {
2237        status = fdb_set_log_callback(db[r], logCallbackFunc,
2238                                      (void *) "snapshot_markers_in_file");
2239        TEST_CHK(status == FDB_RESULT_SUCCESS);
2240    }
2241
2242    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2243    TEST_CHK(status == FDB_RESULT_SUCCESS);
2244
2245    if (!multi_kv) {
2246        TEST_CHK(num_markers == 4);
2247        for (r = 0; r < num_kvs; ++r) {
2248            TEST_CHK(markers[r].num_kvs_markers == 1);
2249            TEST_CHK(markers[r].kvs_markers[0].seqnum
2250                     == (fdb_seqnum_t)(n - r*5));
2251        }
2252    } else {
2253        TEST_CHK(num_markers == 8);
2254        for (r = 0; r < num_kvs; ++r) {
2255            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2256            for (i = 0; i < num_kvs; ++i) {
2257                TEST_CHK(markers[r].kvs_markers[i].seqnum
2258                         == (fdb_seqnum_t)(n - r*5));
2259                sprintf(kv_name, "kv%d", i);
2260                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2261            }
2262        }
2263    }
2264
2265    status = fdb_free_snap_markers(markers, num_markers);
2266    TEST_CHK(status == FDB_RESULT_SUCCESS);
2267
2268    // close db file
2269    fdb_close(dbfile);
2270
2271    // free all documents
2272    for (i=0;i<n;++i){
2273        fdb_doc_free(doc[i]);
2274    }
2275
2276    // free all resources
2277    fdb_shutdown();
2278
2279    memleak_end();
2280
2281    sprintf(bodybuf, "snapshot markers in file test %s", multi_kv ?
2282                                                        "multiple kv mode:"
2283                                                      : "single kv mode:");
2284    TEST_RESULT(bodybuf);
2285}
2286
2287void snapshot_without_seqtree(bool multi_kv)
2288{
2289    TEST_INIT();
2290
2291    memleak_start();
2292
2293    int i, j, r;
2294    int n = 10;
2295    int num_commits = 3, num_kvs = 3;
2296    fdb_file_handle *dbfile;
2297    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
2298    fdb_doc **doc = alca(fdb_doc*, n);
2299    fdb_status status;
2300    fdb_snapshot_info_t *markers;
2301    uint64_t num_markers;
2302
2303    char keybuf[256], metabuf[256], bodybuf[256];
2304    char kv_name[8];
2305
2306    fdb_config fconfig = fdb_get_default_config();
2307    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2308    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2309    fconfig.multi_kv_instances = multi_kv;
2310    // for creating the strict number of snapshots, disable block reusing
2311    fconfig.block_reusing_threshold = 0;
2312    fconfig.seqtree_opt = FDB_SEQTREE_NOT_USE;
2313
2314    // remove previous mvcc_test files
2315    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2316    (void)r;
2317
2318    // open db
2319    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2320    if (multi_kv) {
2321        for (r = 0; r < num_kvs; ++r) {
2322            sprintf(kv_name, "kv%d", r);
2323            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2324        }
2325    } else {
2326        num_kvs = 1;
2327        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2328    }
2329
2330    // Load documents mulitple times
2331    for (j=0; j < num_commits; ++j) {
2332        for (i=0; i<n; i++){
2333            sprintf(keybuf, "key%d", i);
2334            sprintf(metabuf, "meta-%d-%d", j, i);
2335            sprintf(bodybuf, "body-%d-%d", j, i);
2336            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2337                           (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2338            for (r = 0; r < num_kvs; ++r) {
2339                fdb_set(db[r], doc[i]);
2340            }
2341            fdb_doc_free(doc[i]);
2342        }
2343        if (j % 2 == 0) {
2344            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2345        } else {
2346            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2347        }
2348    }
2349
2350    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2351    TEST_CHK(status == FDB_RESULT_SUCCESS);
2352
2353    if (!multi_kv) {
2354        TEST_CHK(num_markers == 3);
2355        for (r = 0; r < num_kvs; ++r) {
2356            TEST_CHK(markers[r].num_kvs_markers == 1);
2357            TEST_CHK(markers[r].kvs_markers[0].seqnum
2358                     == (fdb_seqnum_t) (n * (num_commits - r)));
2359        }
2360    } else {
2361        TEST_CHK(num_markers == 6);
2362        for (r = 0; r < num_kvs; ++r) {
2363            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2364            for (i = 0; i < num_kvs; ++i) {
2365                TEST_CHK(markers[r].kvs_markers[i].seqnum
2366                         == (fdb_seqnum_t) (n * (num_commits - r)));
2367                sprintf(kv_name, "kv%d", i);
2368                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2369            }
2370        }
2371    }
2372
2373    fdb_kvs_handle *snap_db;
2374    fdb_iterator *iterator;
2375    fdb_doc *rdoc = NULL;
2376    // Open the snapshot with the snapshot markers from the second commit
2377    for (r = 0; r < num_kvs; ++r) {
2378        status = fdb_snapshot_open(db[r], &snap_db, markers[1].kvs_markers[r].seqnum);
2379        TEST_CHK(status == FDB_RESULT_SUCCESS);
2380
2381        // Verify all the keys in the snapshot
2382        status = fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2383        TEST_CHK(status == FDB_RESULT_SUCCESS);
2384        i=0;
2385        do {
2386            status = fdb_iterator_get(iterator, &rdoc);
2387            TEST_CHK(status == FDB_RESULT_SUCCESS);
2388            sprintf(keybuf, "key%d", i);
2389            sprintf(metabuf, "meta-1-%d", i);
2390            sprintf(bodybuf, "body-1-%d", i);
2391            TEST_CMP(rdoc->key, keybuf, rdoc->keylen);
2392            TEST_CMP(rdoc->meta, metabuf, rdoc->metalen);
2393            TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2394            fdb_doc_free(rdoc);
2395            rdoc = NULL;
2396            i++;
2397        } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2398        TEST_CHK(i==n);
2399        status = fdb_iterator_close(iterator);
2400        TEST_CHK(status == FDB_RESULT_SUCCESS);
2401
2402        // Verify that a sequence-based iteration is not allowed.
2403        status = fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
2404        TEST_CHK(status != FDB_RESULT_SUCCESS);
2405
2406        status = fdb_kvs_close(snap_db);
2407        TEST_CHK(status == FDB_RESULT_SUCCESS);
2408    }
2409
2410    status = fdb_free_snap_markers(markers, num_markers);
2411    TEST_CHK(status == FDB_RESULT_SUCCESS);
2412
2413    // close db file
2414    fdb_close(dbfile);
2415
2416    // free all resources
2417    fdb_shutdown();
2418
2419    memleak_end();
2420
2421    sprintf(bodybuf, "snapshot without seqtree in %s",
2422            multi_kv ? "multiple kv mode:" : "single kv mode:");
2423    TEST_RESULT(bodybuf);
2424}
2425
2426void snapshot_with_deletes_test()
2427{
2428    TEST_INIT();
2429
2430    memleak_start();
2431
2432    int i, r;
2433    int n = 10;
2434    fdb_file_handle *dbfile;
2435    fdb_kvs_handle *db;
2436    fdb_kvs_handle *snap_db;
2437    fdb_doc **doc = alca(fdb_doc*, n);
2438    fdb_doc *rdoc = NULL;
2439    fdb_kvs_info kvs_info;
2440    fdb_status status;
2441    fdb_iterator *iterator;
2442
2443    char keybuf[256], metabuf[256], bodybuf[256];
2444
2445    // remove previous mvcc_test files
2446    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2447    (void)r;
2448
2449    fdb_config fconfig = fdb_get_default_config();
2450    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2451    fconfig.buffercache_size = 0;
2452
2453    // remove previous mvcc_test files
2454    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2455    (void)r;
2456
2457    // open db
2458    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2459    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2460    TEST_CHK(status == FDB_RESULT_SUCCESS);
2461
2462    status = fdb_set_log_callback(db, logCallbackFunc,
2463                                  (void *) "snapshot_with_deletes_test");
2464    TEST_CHK(status == FDB_RESULT_SUCCESS);
2465
2466    // ------- Setup test ----------------------------------
2467    // insert even documents into main index
2468    for (i=0; i<n; i = i + 2){
2469        sprintf(keybuf, "key%d", i);
2470        sprintf(metabuf, "meta%d", i);
2471        sprintf(bodybuf, "body%d", i);
2472        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2473            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2474        fdb_set(db, doc[i]);
2475    }
2476
2477    i = 7;
2478    sprintf(keybuf, "key%d", i);
2479    fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2480
2481    // commit with a manual WAL flush (these docs go into HB-trie)
2482    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2483
2484    // let odd documents be in WAL section
2485    for (i = 1; i < n; i = i + 2){
2486        sprintf(keybuf, "key%d", i);
2487        sprintf(metabuf, "meta%d", i);
2488        sprintf(bodybuf, "body%d", i);
2489        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2490            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2491        fdb_set(db, doc[i]);
2492    }
2493
2494    // Delete WAL doc 7
2495    i = 7;
2496    fdb_del(db, doc[i]);
2497
2498    // Create an iterator on the live DB over full range
2499    fdb_iterator_init(db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2500
2501    // attempt to seek to the deleted key7
2502    status = fdb_iterator_seek(iterator, doc[i]->key, doc[i]->keylen,
2503                               FDB_ITR_SEEK_HIGHER);
2504    TEST_CHK(status == FDB_RESULT_SUCCESS);
2505    status = fdb_iterator_get(iterator, &rdoc);
2506    TEST_CHK(status == FDB_RESULT_SUCCESS);
2507
2508    i = 8; // The next higher document must be returned
2509    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2510    TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2511    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2512
2513    fdb_doc_free(rdoc);
2514    rdoc = NULL;
2515
2516    fdb_iterator_close(iterator);
2517
2518    // Open an in-memory snapshot over live DB
2519    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
2520    TEST_CHK(status == FDB_RESULT_SUCCESS);
2521
2522    // check snapshot's sequence number
2523    fdb_get_kvs_info(snap_db, &kvs_info);
2524    TEST_CHK(kvs_info.last_seqnum == (fdb_seqnum_t)n+2);
2525    TEST_CHK(kvs_info.deleted_count == 1);
2526
2527    // re-create an iterator on the snapshot for full range
2528    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2529
2530    i = 7;
2531    // attempt to seek to the deleted key7
2532    status = fdb_iterator_seek(iterator, doc[i]->key, doc[i]->keylen,
2533                               FDB_ITR_SEEK_HIGHER);
2534    TEST_CHK(status == FDB_RESULT_SUCCESS);
2535    status = fdb_iterator_get(iterator, &rdoc);
2536    TEST_CHK(status == FDB_RESULT_SUCCESS);
2537
2538    i = 8; // The next higher document must be returned
2539    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2540    TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2541    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2542
2543    fdb_doc_free(rdoc);
2544    rdoc = NULL;
2545
2546    fdb_iterator_close(iterator);
2547
2548    // close db handle
2549    fdb_kvs_close(db);
2550    // close snapshot handle
2551    fdb_kvs_close(snap_db);
2552    // close db file
2553    fdb_close(dbfile);
2554
2555    // free all documents
2556    for (i=0;i<n;++i){
2557        fdb_doc_free(doc[i]);
2558    }
2559
2560    // free all resources
2561    fdb_shutdown();
2562
2563    memleak_end();
2564
2565    TEST_RESULT("snapshot with deletes test");
2566}
2567
2568void rollback_forward_seqnum()
2569{
2570
2571    TEST_INIT();
2572    memleak_start();
2573
2574    int r;
2575    int i, n=100;
2576    int rb1_seqnum, rb2_seqnum;
2577    char keybuf[256];
2578    char setop[3];
2579    fdb_file_handle *dbfile;
2580    fdb_iterator *it;
2581    fdb_kvs_handle *kv1, *mirror_kv1;
2582    fdb_kvs_info info;
2583    fdb_config fconfig = fdb_get_default_config();
2584    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2585    fdb_doc **doc = alca(fdb_doc*, n+1);
2586    fdb_doc *rdoc = NULL;
2587    fdb_status status;
2588    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2589    (void)r;
2590
2591    fconfig.wal_threshold = n;
2592    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
2593    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2594    fconfig.compaction_threshold = 0;
2595    fconfig.purging_interval = 5;
2596    fconfig.block_reusing_threshold = 0;
2597
2598    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2599    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2600    fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
2601
2602
2603    // set n docs within both dbs
2604    for(i=0;i<=n;++i){
2605        sprintf(keybuf, "key%d", i);
2606        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2607            NULL, 0, NULL, 0);
2608        fdb_set(kv1, doc[i]);
2609        fdb_set_kv(mirror_kv1, keybuf, strlen(keybuf), setop, 3);
2610    } // last set should have caused a wal flush
2611
2612    fdb_del(kv1, doc[n]);
2613
2614    // commit and save seqnum1
2615    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2616    fdb_get_kvs_info(kv1, &info);
2617    rb1_seqnum = info.last_seqnum;
2618
2619    // delete all docs in kv1
2620    for(i=0;i<n;++i){
2621        fdb_del(kv1, doc[i]);
2622    }
2623
2624    // commit and save seqnum2
2625    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2626    fdb_get_kvs_info(kv1, &info);
2627    rb2_seqnum = info.last_seqnum;
2628
2629    // sets again
2630    for(i=0;i<n;++i){
2631        doc[i]->deleted = false;
2632        fdb_set(kv1, doc[i]);
2633    }
2634
2635    // commit
2636    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2637
2638    // rollback to first seqnum
2639    status = fdb_rollback(&kv1, rb1_seqnum);
2640    TEST_CHK(status == FDB_RESULT_SUCCESS);
2641
2642    fdb_get_kvs_info(kv1, &info);
2643    TEST_CHK(info.deleted_count == 1);
2644
2645    // rollback to second seqnum
2646    status = fdb_rollback(&kv1, rb2_seqnum);
2647    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2648
2649    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
2650    TEST_CHK(status == FDB_RESULT_SUCCESS);
2651
2652    do {
2653        status = fdb_iterator_get(it, &rdoc);
2654        TEST_CHK(status == FDB_RESULT_SUCCESS);
2655        if (rdoc->seqnum != (uint64_t)n+1) {
2656            status = fdb_get_metaonly_byseq(kv1, rdoc);
2657            TEST_CHK(status == FDB_RESULT_SUCCESS);
2658            TEST_CHK(rdoc->deleted == false);
2659        } else {
2660            status = fdb_get_metaonly(kv1, rdoc);
2661            TEST_CHK(status == FDB_RESULT_SUCCESS);
2662            TEST_CHK(rdoc->deleted == true);
2663        }
2664        fdb_doc_free(rdoc);
2665        rdoc = NULL;
2666    } while(fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
2667
2668    for (i=0;i<=n;++i){
2669        fdb_doc_free(doc[i]);
2670    }
2671    fdb_iterator_close(it);
2672    fdb_kvs_close(kv1);
2673    fdb_close(dbfile);
2674    fdb_shutdown();
2675    memleak_end();
2676
2677    TEST_RESULT("rollback forward seqnum");
2678}
2679
2680void rollback_test(bool multi_kv)
2681{
2682    TEST_INIT();
2683
2684    memleak_start();
2685
2686    int i, r;
2687    int n = 20;
2688    int count;
2689    fdb_file_handle *dbfile, *dbfile_txn;
2690    fdb_kvs_handle *db, *db_txn;
2691    fdb_seqnum_t rollback_seq;
2692    fdb_doc **doc = alca(fdb_doc*, n);
2693    fdb_doc *rdoc = NULL;
2694    fdb_kvs_info kvs_info;
2695    fdb_status status;
2696    fdb_iterator *iterator;
2697
2698    char keybuf[256], metabuf[256], bodybuf[256];
2699
2700    // remove previous mvcc_test files
2701    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2702    (void)r;
2703
2704    fdb_config fconfig = fdb_get_default_config();
2705    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2706    fconfig.buffercache_size = 0;
2707    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
2708    fconfig.wal_threshold = 1024;
2709    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2710    fconfig.compaction_threshold = 0;
2711    fconfig.multi_kv_instances = multi_kv;
2712
2713    // remove previous mvcc_test files
2714    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2715    (void)r;
2716
2717    // open db
2718    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2719    if (multi_kv) {
2720        fdb_kvs_open_default(dbfile, &db, &kvs_config);
2721    } else {
2722        fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2723    }
2724
2725   // ------- Setup test ----------------------------------
2726   // insert documents of 0-4
2727    for (i=0; i<n/4; i++){
2728        sprintf(keybuf, "key%d", i);
2729        sprintf(metabuf, "meta%d", i);
2730        sprintf(bodybuf, "body%d", i);
2731        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2732            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2733        fdb_set(db, doc[i]);
2734    }
2735
2736    // commit with a manual WAL flush (these docs go into HB-trie)
2737    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2738
2739    // insert documents from 4 - 9
2740    for (; i < n/2; i++){
2741        sprintf(keybuf, "key%d", i);
2742        sprintf(metabuf, "meta%d", i);
2743        sprintf(bodybuf, "body%d", i);
2744        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2745            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2746        fdb_set(db, doc[i]);
2747    }
2748
2749    // commit again without a WAL flush
2750    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2751
2752    // ROLLBACK POINT: pick up sequence number of a commit without a WAL flush
2753    rollback_seq = doc[i-1]->seqnum;
2754
2755    // insert documents from 10-14 into HB-trie
2756    for (; i < (n/2 + n/4); i++){
2757        sprintf(keybuf, "key%d", i);
2758        sprintf(metabuf, "meta%d", i);
2759        sprintf(bodybuf, "body%d", i);
2760        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2761            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2762        fdb_set(db, doc[i]);
2763    }
2764    // manually flush WAL & commit
2765    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2766
2767    // insert documents from 15 - 19 on file into the WAL
2768    for (; i < n; i++){
2769        sprintf(keybuf, "key%d", i);
2770        sprintf(metabuf, "meta%d", i);
2771        sprintf(bodybuf, "body%d", i);
2772        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2773            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2774        fdb_set(db, doc[i]);
2775    }
2776    // commit without a WAL flush
2777    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2778
2779    status = fdb_set_log_callback(db, logCallbackFunc,
2780                                  (void *) "rollback_test");
2781    TEST_CHK(status == FDB_RESULT_SUCCESS);
2782
2783    // ---------- Rollback tests begin -----------------------
2784    // We have DB file with 5 HB-trie docs, 5 unflushed WAL docs occuring twice
2785    // Attempt to rollback to out-of-range marker..
2786    status = fdb_rollback(&db, 999999);
2787    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2788
2789    // Open another handle & begin transaction
2790    fdb_open(&dbfile_txn, "./mvcc_test1", &fconfig);
2791    if (multi_kv) {
2792        fdb_kvs_open_default(dbfile_txn, &db_txn, &kvs_config);
2793    } else {
2794        fdb_kvs_open(dbfile_txn, &db_txn, NULL, &kvs_config);
2795    }
2796    fdb_begin_transaction(dbfile_txn, FDB_ISOLATION_READ_COMMITTED);
2797    // Attempt to rollback while the transaction is active
2798    status =  fdb_rollback(&db, rollback_seq);
2799    // Must fail
2800    TEST_CHK(status == FDB_RESULT_FAIL_BY_TRANSACTION);
2801    fdb_abort_transaction(dbfile_txn);
2802    fdb_kvs_close(db_txn);
2803    fdb_close(dbfile_txn);
2804
2805    // Rollback to saved marker from above
2806    status = fdb_rollback(&db, rollback_seq);
2807    TEST_CHK(status == FDB_RESULT_SUCCESS);
2808
2809    // check handle's sequence number
2810    fdb_get_kvs_info(db, &kvs_info);
2811    TEST_CHK(kvs_info.last_seqnum == rollback_seq);
2812
2813    // Modify an item and update into the rollbacked file..
2814    i = n/2;
2815    *(char *)doc[i]->body = 'B';
2816    fdb_set(db, doc[i]);
2817    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2818
2819    // create an iterator on the rollback for full range
2820    fdb_iterator_sequence_init(db, &iterator, 0, 0, FDB_ITR_NONE);
2821
2822    // repeat until fail
2823    i=0;
2824    count=0;
2825    do {
2826        status = fdb_iterator_get(iterator, &rdoc);
2827        TEST_CHK(status == FDB_RESULT_SUCCESS);
2828
2829        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2830        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2831        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2832
2833        fdb_doc_free(rdoc);
2834        rdoc = NULL;
2835        i ++;
2836        count++;
2837    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2838
2839    TEST_CHK(count==n/2 + 1); // Items from first half and newly set item
2840
2841    fdb_iterator_close(iterator);
2842
2843    // close db file
2844    fdb_kvs_close(db);
2845    fdb_close(dbfile);
2846
2847    // free all documents
2848    for (i=0;i<n;++i){
2849        fdb_doc_free(doc[i]);
2850    }
2851
2852    // free all resources
2853    fdb_shutdown();
2854
2855    memleak_end();
2856
2857    sprintf(bodybuf, "rollback test %s", multi_kv ? "multiple kv mode:"
2858                                                  : "single kv mode:");
2859    TEST_RESULT(bodybuf);
2860}
2861
2862
2863
2864void rollback_and_snapshot_test()
2865{
2866    TEST_INIT();
2867
2868    memleak_start();
2869
2870    fdb_seqnum_t seqnum, rollback_seqnum;
2871    fdb_kvs_info kvs_info;
2872    fdb_status status;
2873    fdb_config config;
2874    fdb_kvs_config kvs_config;
2875    fdb_file_handle *dbfile;
2876    fdb_kvs_handle *db,  *snapshot;
2877    int r;
2878
2879    // remove previous mvcc_test files
2880    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2881    (void)r;
2882
2883    // MB-12530 open db
2884    config = fdb_get_default_config();
2885    kvs_config = fdb_get_default_kvs_config();
2886    status = fdb_open(&dbfile, "mvcc_test", &config);
2887    TEST_CHK(status == FDB_RESULT_SUCCESS);
2888    status = fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2889    TEST_CHK(status == FDB_RESULT_SUCCESS);
2890
2891    // 2. Create Key 'a' and Commit
2892    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
2893    TEST_CHK(status == FDB_RESULT_SUCCESS);
2894    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2895    TEST_CHK(status == FDB_RESULT_SUCCESS);
2896
2897    status = fdb_get_kvs_info(db, &kvs_info);
2898    TEST_CHK(status == FDB_RESULT_SUCCESS);
2899
2900    // 3. Create Key 'b' and Commit
2901    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
2902    TEST_CHK(status == FDB_RESULT_SUCCESS);
2903    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2904    TEST_CHK(status == FDB_RESULT_SUCCESS);
2905
2906    status = fdb_get_kvs_info(db, &kvs_info);
2907    TEST_CHK(status == FDB_RESULT_SUCCESS);
2908    seqnum = kvs_info.last_seqnum;
2909
2910    // 4.  Remember this as our rollback point
2911    rollback_seqnum = seqnum;
2912
2913    // 5. Create Key 'c' and Commit
2914    status = fdb_set_kv(db, (void *)"c", 1,(void *) "val-c", 5);
2915    TEST_CHK(status == FDB_RESULT_SUCCESS);
2916    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2917    TEST_CHK(status == FDB_RESULT_SUCCESS);
2918
2919    status = fdb_get_kvs_info(db, &kvs_info);
2920    TEST_CHK(status == FDB_RESULT_SUCCESS);
2921
2922    // 6. Rollback to rollback point (seq 2)
2923    status = fdb_rollback(&db, rollback_seqnum);
2924    TEST_CHK(status == FDB_RESULT_SUCCESS);
2925
2926    status = fdb_get_kvs_info(db, &kvs_info);
2927    TEST_CHK(status == FDB_RESULT_SUCCESS);
2928    seqnum = kvs_info.last_seqnum;
2929
2930    // 7. Verify that Key 'c' is not found
2931    void *val;
2932    size_t vallen;
2933    status = fdb_get_kv(db, (void *)"c", 1, &val, &vallen);
2934    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2935
2936    // 8. Open a snapshot at the same point
2937    status = fdb_snapshot_open(db, &snapshot, seqnum);
2938    TEST_CHK(status == FDB_RESULT_SUCCESS);
2939
2940    // 9. Verify that Key 'c' is not found
2941    status = fdb_get_kv(snapshot, (void *)"c", 1, &val, &vallen);
2942    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2943
2944    // close the snapshot db
2945    fdb_kvs_close(snapshot);
2946
2947    // close db file
2948    fdb_close(dbfile);
2949
2950    // free all resources
2951    fdb_shutdown();
2952
2953    memleak_end();
2954
2955    TEST_RESULT("rollback and snapshot test");
2956}
2957
2958void rollback_ncommits()
2959{
2960
2961    TEST_INIT();
2962    memleak_start();
2963
2964    int r;
2965    int i, j, n=100;
2966    int ncommits=10;
2967    char keybuf[256];
2968    fdb_file_handle *dbfile;
2969    fdb_kvs_handle *kv1, *kv2;
2970    fdb_kvs_info info;
2971    fdb_config fconfig = fdb_get_default_config();
2972    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2973    fdb_status status;
2974    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2975    (void)r;
2976
2977    fconfig.wal_threshold = 1024;
2978    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2979    fconfig.compaction_threshold = 0;
2980    fconfig.block_reusing_threshold = 0;
2981
2982    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2983    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2984    fdb_kvs_open(dbfile, &kv2, NULL, &kvs_config);
2985
2986
2987    for(j=0;j<ncommits;++j){
2988
2989        // set n docs per commit
2990        for(i=0;i<n;++i){
2991            sprintf(keybuf, "key%02d%03d", j, i);
2992            fdb_set_kv(kv1, keybuf, strlen(keybuf), NULL, 0);
2993            fdb_set_kv(kv2, keybuf, strlen(keybuf), NULL, 0);
2994        }
2995        // alternate commit pattern
2996        if((j % 2) == 0){
2997            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2998        } else {
2999            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3000        }
3001
3002        // doc_count should match seqnum since they are unique
3003        fdb_get_kvs_info(kv1, &info);
3004        TEST_CHK(info.doc_count == info.last_seqnum);
3005    }
3006
3007    // iteratively rollback 5 commits
3008     for(j=ncommits;j>0;--j){
3009        status = fdb_rollback(&kv1, j*n);
3010        TEST_CHK(status == FDB_RESULT_SUCCESS);
3011
3012        // check rollback doc_count
3013        fdb_get_kvs_info(kv1, &info);
3014        TEST_CHK(info.doc_count == info.last_seqnum);
3015    }
3016
3017    fdb_kvs_close(kv1);
3018    fdb_close(dbfile);
3019    fdb_shutdown();
3020    memleak_end();
3021
3022    TEST_RESULT("rollback n commits");
3023}
3024
3025void transaction_test()
3026{
3027    TEST_INIT();
3028
3029    memleak_start();
3030
3031    int i, r;
3032    int n = 10;
3033    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2, *dbfile_txn3;
3034    fdb_kvs_handle *db, *db_txn1, *db_txn2, *db_txn3;
3035    fdb_doc **doc = alca(fdb_doc*, n);
3036    fdb_doc *rdoc;
3037    fdb_status status;
3038
3039    char keybuf[256], metabuf[256], bodybuf[256];
3040
3041    // remove previous mvcc_test files
3042    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3043    (void)r;
3044
3045    fdb_config fconfig = fdb_get_default_config();
3046    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3047    fconfig.buffercache_size = 0;
3048    fconfig.wal_threshold = 1024;
3049    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3050    fconfig.purging_interval = 0;
3051    fconfig.compaction_threshold = 0;
3052
3053    // open db
3054    fdb_open(&dbfile, "mvcc_test1", &fconfig);
3055    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3056    status = fdb_set_log_callback(db, logCallbackFunc,
3057                                  (void *) "transaction_test");
3058    TEST_CHK(status == FDB_RESULT_SUCCESS);
3059
3060    // open db and begin transactions
3061    fdb_open(&dbfile_txn1, "mvcc_test1", &fconfig);
3062    fdb_open(&dbfile_txn2, "mvcc_test1", &fconfig);
3063    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
3064    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
3065    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3066    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3067
3068    // insert half docs into txn1
3069    for (i=0;i<n/2;++i){
3070        sprintf(keybuf, "key%d", i);
3071        sprintf(metabuf, "meta%d", i);
3072        sprintf(bodybuf, "body%d", i);
3073        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3074                                (void*)metabuf, strlen(metabuf),
3075                                (void*)bodybuf, strlen(bodybuf));
3076        fdb_set(db_txn1, doc[i]);
3077    }
3078
3079    // insert other half docs into txn2
3080    for (i=n/2;i<n;++i){
3081        sprintf(keybuf, "key%d", i);
3082        sprintf(metabuf, "meta%d", i);
3083        sprintf(bodybuf, "body%d", i);
3084        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3085                                (void*)metabuf, strlen(metabuf),
3086                                (void*)bodybuf, strlen(bodybuf));
3087        fdb_set(db_txn2, doc[i]);
3088    }
3089
3090    // uncommitted docs should not be read by the other transaction that
3091    // doesn't allow uncommitted reads.
3092    for (i=0;i<n;++i){
3093        sprintf(keybuf, "key%d", i);
3094        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3095        status = fdb_get(db_txn1, rdoc);
3096        if (i<n/2) {
3097            TEST_CHK(status == FDB_RESULT_SUCCESS);
3098        } else {
3099            TEST_CHK(status != FDB_RESULT_SUCCESS);
3100        }
3101        fdb_doc_free(rdoc);
3102    }
3103
3104    // uncommitted docs can be read by the transaction that allows uncommitted reads.
3105    fdb_open(&dbfile_txn3, "mvcc_test1", &fconfig);
3106    fdb_kvs_open_default(dbfile_txn3, &db_txn3, &kvs_config);
3107    fdb_begin_transaction(dbfile_txn3, FDB_ISOLATION_READ_UNCOMMITTED);
3108    for (i=0;i<n;++i){
3109        sprintf(keybuf, "key%d", i);
3110        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3111        status = fdb_get(db_txn3, rdoc);
3112        TEST_CHK(status == FDB_RESULT_SUCCESS);
3113        fdb_doc_free(rdoc);
3114    }
3115    fdb_end_transaction(dbfile_txn3, FDB_COMMIT_NORMAL);
3116    fdb_close(dbfile_txn3);
3117
3118    // commit and end txn1
3119    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3120
3121    // uncommitted docs should not be read generally
3122    for (i=0;i<n;++i){
3123        sprintf(keybuf, "key%d", i);
3124        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3125        status = fdb_get(db, rdoc);
3126        if (i<n/2) {
3127            TEST_CHK(status == FDB_RESULT_SUCCESS);
3128        } else {
3129            TEST_CHK(status != FDB_RESULT_SUCCESS);
3130        }
3131        fdb_doc_free(rdoc);
3132    }
3133
3134    // read uncommitted docs using the same transaction
3135    for (i=0;i<n;++i){
3136        sprintf(keybuf, "key%d", i);
3137        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3138        status = fdb_get(db_txn2, rdoc);
3139        TEST_CHK(status == FDB_RESULT_SUCCESS);
3140        fdb_doc_free(rdoc);
3141    }
3142
3143    // abort txn2
3144    fdb_abort_transaction(dbfile_txn2);
3145
3146    // close & re-open db file
3147    fdb_close(dbfile);
3148    fdb_open(&dbfile, "mvcc_test1", &fconfig);
3149    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3150    status = fdb_set_log_callback(db, logCallbackFunc,
3151                                  (void *) "transaction_test");
3152    TEST_CHK(status == FDB_RESULT_SUCCESS);
3153
3154    // uncommitted docs should not be read after reopening the file either
3155    for (i=0;i<n;++i){
3156        sprintf(keybuf, "key%d", i);
3157        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3158        status = fdb_get(db, rdoc);
3159        if (i<n/2) {
3160            TEST_CHK(status == FDB_RESULT_SUCCESS);
3161        } else {
3162            TEST_CHK(status != FDB_RESULT_SUCCESS);
3163        }
3164        fdb_doc_free(rdoc);
3165    }
3166
3167    // insert other half docs & commit
3168    for (i=n/2;i<n;++i){
3169        fdb_set(db, doc[i]);
3170    }
3171    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3172
3173    // now all docs can be read generally
3174    for (i=0;i<n;++i){
3175        sprintf(keybuf, "key%d", i);
3176        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3177        status = fdb_get(db, rdoc);
3178        TEST_CHK(status == FDB_RESULT_SUCCESS);
3179        fdb_doc_free(rdoc);
3180    }
3181
3182    // begin transactions
3183    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3184    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3185
3186    // concurrently update docs
3187    for (i=0;i<n;++i){
3188        sprintf(keybuf, "key%d", i);
3189        sprintf(metabuf, "meta%d", i);
3190        sprintf(bodybuf, "body%d_txn1", i);
3191        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3192                                (void*)metabuf, strlen(metabuf),
3193                                (void*)bodybuf, strlen(bodybuf));
3194        fdb_set(db_txn1, rdoc);
3195        fdb_doc_free(rdoc);
3196
3197        sprintf(bodybuf, "body%d_txn2", i);
3198        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3199                                (void*)metabuf, strlen(metabuf),
3200                                (void*)bodybuf, strlen(bodybuf));
3201        fdb_set(db_txn2, rdoc);
3202        fdb_doc_free(rdoc);
3203    }
3204
3205    // retrieve check
3206    for (i=0;i<n;++i){
3207        // general retrieval
3208        sprintf(keybuf, "key%d", i);
3209        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3210        status = fdb_get(db, rdoc);
3211        TEST_CHK(status == FDB_RESULT_SUCCESS);
3212        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
3213        fdb_doc_free(rdoc);
3214
3215        // get from txn1
3216        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3217        status = fdb_get(db_txn1, rdoc);
3218        TEST_CHK(status == FDB_RESULT_SUCCESS);
3219        sprintf(bodybuf, "body%d_txn1", i);
3220        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3221        fdb_doc_free(rdoc);
3222
3223        // get from txn2
3224        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3225        status = fdb_get(db_txn2, rdoc);
3226        TEST_CHK(status == FDB_RESULT_SUCCESS);
3227        sprintf(bodybuf, "body%d_txn2", i);
3228        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3229        fdb_doc_free(rdoc);
3230    }
3231
3232    // commit txn2 & retrieve check
3233    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
3234    for (i=0;i<n;++i){
3235        // general retrieval
3236        sprintf(keybuf, "key%d", i);
3237        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3238        status = fdb_get(db, rdoc);
3239        TEST_CHK(status == FDB_RESULT_SUCCESS);
3240        sprintf(bodybuf, "body%d_txn2", i);
3241        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3242        fdb_doc_free(rdoc);
3243
3244        // get from txn1
3245        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3246        status = fdb_get(db_txn1, rdoc);
3247        TEST_CHK(status == FDB_RESULT_SUCCESS);
3248        sprintf(bodybuf, "body%d_txn1", i);
3249        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3250        fdb_doc_free(rdoc);
3251    }
3252
3253    // commit txn1 & retrieve check
3254    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3255    for (i=0;i<n;++i){
3256        // general retrieval
3257        sprintf(keybuf, "key%d", i);
3258        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3259        status = fdb_get(db, rdoc);
3260        TEST_CHK(status == FDB_RESULT_SUCCESS);
3261        sprintf(bodybuf, "body%d_txn1", i);
3262        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3263        fdb_doc_free(rdoc);
3264    }
3265
3266    // begin new transaction
3267    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3268    // update doc#5
3269    i = 5;
3270    sprintf(keybuf, "key%d", i);
3271    sprintf(metabuf, "meta%d", i);
3272    sprintf(bodybuf, "body%d_before_compaction", i);
3273    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3274                            (void*)metabuf, strlen(metabuf),
3275                            (void*)bodybuf, strlen(bodybuf));
3276    fdb_set(db_txn1, rdoc);
3277    fdb_doc_free(rdoc);
3278
3279    // do compaction
3280    fdb_compact(dbfile, "mvcc_test2");
3281
3282    // retrieve doc#5
3283    // using txn1
3284    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3285    status = fdb_get(db_txn1, rdoc);
3286    TEST_CHK(status == FDB_RESULT_SUCCESS);
3287    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3288    fdb_doc_free(rdoc);
3289
3290    // general retrieval
3291    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3292    status = fdb_get(db, rdoc);
3293    TEST_CHK(status == FDB_RESULT_SUCCESS);
3294    sprintf(bodybuf, "body%d_txn1", i);
3295    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3296    fdb_doc_free(rdoc);
3297
3298    // commit transaction
3299    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3300    // retrieve check
3301    for (i=0;i<n;++i){
3302        // general get
3303        sprintf(keybuf, "key%d", i);
3304        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3305        status = fdb_get(db, rdoc);
3306        TEST_CHK(status == FDB_RESULT_SUCCESS);
3307        if (i != 5) {
3308            sprintf(bodybuf, "body%d_txn1", i);
3309        } else {
3310            sprintf(bodybuf, "body%d_before_compaction", i);
3311        }
3312        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3313        fdb_doc_free(rdoc);
3314    }
3315
3316    // close & re-open db file
3317    fdb_close(dbfile);
3318    fdb_open(&dbfile, "mvcc_test2", &fconfig);
3319    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3320    status = fdb_set_log_callback(db, logCallbackFunc,
3321                                  (void *) "transaction_test");
3322    TEST_CHK(status == FDB_RESULT_SUCCESS);
3323    // retrieve check again
3324    for (i=0;i<n;++i){
3325        // general get
3326        sprintf(keybuf, "key%d", i);
3327        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3328        status = fdb_get(db, rdoc);
3329        TEST_CHK(status == FDB_RESULT_SUCCESS);
3330        if (i != 5) {
3331            sprintf(bodybuf, "body%d_txn1", i);
3332        } else {
3333            sprintf(bodybuf, "body%d_before_compaction", i);
3334        }
3335        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3336        fdb_doc_free(rdoc);
3337    }
3338
3339    // close db file
3340    fdb_close(dbfile);
3341    fdb_close(dbfile_txn1);
3342    fdb_close(dbfile_txn2);
3343
3344    // free all documents
3345    for (i=0;i<n;++i){
3346        fdb_doc_free(doc[i]);
3347    }
3348
3349    // free all resources
3350    fdb_shutdown();
3351
3352    memleak_end();
3353
3354    TEST_RESULT("transaction test");
3355}
3356
3357void transaction_simple_api_test()
3358{
3359    TEST_INIT();
3360
3361    memleak_start();
3362
3363    int i, r;
3364    int n = 10;
3365    size_t valuelen;
3366    void *value;
3367    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2;
3368    fdb_kvs_handle *db, *db_txn1, *db_txn2;
3369    fdb_status status;
3370
3371    char keybuf[256], bodybuf[256];
3372
3373    // remove previous mvcc_test files
3374    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3375    (void)r;
3376
3377    fdb_config fconfig = fdb_get_default_config();
3378    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3379    fconfig.buffercache_size = 0;
3380    fconfig.wal_threshold = 1024;
3381    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3382    fconfig.purging_interval = 0;
3383    fconfig.compaction_threshold = 0;
3384
3385    // open db
3386    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3387    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3388    status = fdb_set_log_callback(db, logCallbackFunc,
3389                                  (void *) "transaction_simple_api_test");
3390    TEST_CHK(status == FDB_RESULT_SUCCESS);
3391
3392    // insert key-value pairs
3393    for (i=0;i<n;++i){
3394        sprintf(keybuf, "key%d", i);
3395        sprintf(bodybuf, "body%d", i);
3396        status = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3397        TEST_CHK(status == FDB_RESULT_SUCCESS);
3398    }
3399
3400    // commit
3401    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3402
3403    // open db and begin transactions
3404    fdb_open(&dbfile_txn1, "./mvcc_test1", &fconfig);
3405    fdb_open(&dbfile_txn2, "./mvcc_test1", &fconfig);
3406    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
3407    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
3408    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3409    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3410
3411    // concurrently update docs
3412    for (i=0;i<n;++i){
3413        sprintf(keybuf, "key%d", i);
3414        sprintf(bodybuf, "body%d_txn1", i);
3415        fdb_set_kv(db_txn1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3416
3417        sprintf(keybuf, "key%d", i);
3418        sprintf(bodybuf, "body%d_txn2", i);
3419        fdb_set_kv(db_txn2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3420    }
3421
3422    // retrieve key-value pairs
3423    for (i=0;i<n;++i){
3424        // general retrieval
3425        sprintf(keybuf, "key%d", i);
3426        sprintf(bodybuf, "body%d", i);
3427        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3428        TEST_CHK(status == FDB_RESULT_SUCCESS);
3429        TEST_CMP(value, bodybuf, valuelen);
3430        fdb_free_block(value);
3431
3432        // txn1
3433        sprintf(keybuf, "key%d", i);
3434        sprintf(bodybuf, "body%d_txn1", i);
3435        status = fdb_get_kv(db_txn1,