1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "test.h"
29
30#include "internal_types.h"
31#include "functional_util.h"
32
33void rollback_secondary_kvs()
34{
35    TEST_INIT();
36    memleak_start();
37
38    int r;
39    void *value_out;
40    size_t valuelen_out;
41
42    fdb_status status;
43    fdb_config fconfig = fdb_get_default_config();
44    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
45    fdb_file_handle *dbfile;
46    fdb_kvs_handle *kv1, *kv2;
47
48    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
49    (void)r;
50
51    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
52    fdb_kvs_open_default(dbfile, &kv1, &kvs_config);
53    fdb_kvs_open_default(dbfile, &kv2, &kvs_config);
54
55    // seq:2
56    status = fdb_set_kv(kv1, (void *) "a", 1, (void *)"val-a", 5);
57    TEST_CHK(status == FDB_RESULT_SUCCESS);
58    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-b", 5);
59    TEST_CHK(status == FDB_RESULT_SUCCESS);
60    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
61
62    // seq:3
63    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-v", 5);
64    TEST_CHK(status == FDB_RESULT_SUCCESS);
65    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
66
67    // seq:4
68    status = fdb_del_kv(kv1, (void *)"a", 1);
69    TEST_CHK(status == FDB_RESULT_SUCCESS);
70    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
71
72    // get 'a' via kv2
73    status = fdb_get_kv(kv2, (void *)"b", 1, &value_out, &valuelen_out);
74    free(value_out);
75    TEST_CHK(status == FDB_RESULT_SUCCESS);
76
77    // rollback seq:3 via kv2
78    status = fdb_rollback(&kv2, 3);
79    TEST_CHK(status == FDB_RESULT_SUCCESS);
80
81    fdb_close(dbfile);
82    fdb_shutdown();
83
84    memleak_end();
85    TEST_RESULT("rollback secondary kv");
86}
87
88
89void multi_version_test()
90{
91    TEST_INIT();
92
93    memleak_start();
94
95    int i, r;
96    int n = 2;
97    fdb_file_handle *dbfile, *dbfile_new;
98    fdb_kvs_handle *db;
99    fdb_kvs_handle *db_new;
100    fdb_doc **doc = alca(fdb_doc*, n);
101    fdb_doc *rdoc;
102    fdb_status status;
103
104    char keybuf[256], metabuf[256], bodybuf[256];
105
106    // remove all previous mvcc_test files
107    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
108    (void)r;
109
110    fdb_config fconfig = fdb_get_default_config();
111    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
112    fconfig.buffercache_size = 1048576;
113    fconfig.wal_threshold = 1024;
114    fconfig.flags = FDB_OPEN_FLAG_CREATE;
115    fconfig.compaction_threshold = 0;
116
117    // open db
118    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
119    fdb_kvs_open_default(dbfile, &db, &kvs_config);
120    status = fdb_set_log_callback(db, logCallbackFunc, (void *) "multi_version_test");
121    TEST_CHK(status == FDB_RESULT_SUCCESS);
122
123    // insert documents
124    for (i=0;i<n;++i){
125        sprintf(keybuf, "key%d", i);
126        sprintf(metabuf, "meta%d", i);
127        sprintf(bodybuf, "body%d", i);
128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
130        fdb_set(db, doc[i]);
131    }
132
133    // commit
134    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
135
136    // open same db file using a new handle
137    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
138    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
139    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
140    TEST_CHK(status == FDB_RESULT_SUCCESS);
141
142    // update documents using the old handle
143    for (i=0;i<n;++i){
144        sprintf(metabuf, "meta2%d", i);
145        sprintf(bodybuf, "body2%d", i);
146        fdb_doc_update(&doc[i], (void*)metabuf, strlen(metabuf),
147            (void*)bodybuf, strlen(bodybuf));
148        fdb_set(db, doc[i]);
149    }
150
151    // manually flush WAL and commit using the old handle
152    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
153
154    // retrieve documents using the old handle
155    for (i=0;i<n;++i){
156        // search by key
157        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
158        status = fdb_get(db, rdoc);
159
160        TEST_CHK(status == FDB_RESULT_SUCCESS);
161        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
162        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
163
164        // free result document
165        fdb_doc_free(rdoc);
166    }
167
168    // retrieve documents using the new handle
169    for (i=0;i<n;++i){
170        // search by key
171        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
172        status = fdb_get(db_new, rdoc);
173
174        TEST_CHK(status == FDB_RESULT_SUCCESS);
175        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
176        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
177
178        // free result document
179        fdb_doc_free(rdoc);
180    }
181
182    // close and re-open the new handle
183    fdb_kvs_close(db_new);
184    fdb_close(dbfile_new);
185    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
186    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
187    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
188    TEST_CHK(status == FDB_RESULT_SUCCESS);
189
190    // retrieve documents using the new handle
191    for (i=0;i<n;++i){
192        // search by key
193        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
194        status = fdb_get(db_new, rdoc);
195
196        TEST_CHK(status == FDB_RESULT_SUCCESS);
197        // the new version of data should be read
198        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
199        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
200
201        // free result document
202        fdb_doc_free(rdoc);
203    }
204
205
206    // free all documents
207    for (i=0;i<n;++i){
208        fdb_doc_free(doc[i]);
209    }
210
211    // close db file
212    fdb_kvs_close(db);
213    fdb_kvs_close(db_new);
214    fdb_close(dbfile);
215    fdb_close(dbfile_new);
216
217    // free all resources
218    fdb_shutdown();
219
220    memleak_end();
221
222    TEST_RESULT("multi version test");
223}
224
225void crash_recovery_test(bool walflush)
226{
227    TEST_INIT();
228
229    memleak_start();
230
231    int i, r;
232    int n = 10;
233    fdb_file_handle *dbfile;
234    fdb_kvs_handle *db;
235    fdb_doc **doc = alca(fdb_doc*, n);
236    fdb_doc *rdoc;
237    fdb_file_info file_info;
238    uint64_t bid;
239    const char *test_file = "./mvcc_test2";
240    fdb_status status;
241
242    char keybuf[256], metabuf[256], bodybuf[256];
243
244    // remove previous mvcc_test files
245    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
246    (void)r;
247
248    fdb_config fconfig = fdb_get_default_config();
249    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
250    fconfig.buffercache_size = 0;
251    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
252    fconfig.wal_threshold = 1024;
253    fconfig.flags = FDB_OPEN_FLAG_CREATE;
254    fconfig.compaction_threshold = 0;
255
256    // reopen db
257    fdb_open(&dbfile, test_file, &fconfig);
258    fdb_kvs_open_default(dbfile, &db, &kvs_config);
259    status = fdb_set_log_callback(db, logCallbackFunc,
260                                  (void *) "crash_recovery_test");
261    TEST_CHK(status == FDB_RESULT_SUCCESS);
262
263    // insert documents
264    for (i=0;i<n;++i){
265        sprintf(keybuf, "key%d", i);
266        sprintf(metabuf, "meta%d", i);
267        sprintf(bodybuf, "body%d", i);
268        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
269            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
270        fdb_set(db, doc[i]);
271    }
272
273    // commit
274    if(walflush){
275        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
276        TEST_CHK(status == FDB_RESULT_SUCCESS);
277    } else {
278        status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
279        TEST_CHK(status == FDB_RESULT_SUCCESS);
280    }
281
282    status = fdb_get_file_info(dbfile, &file_info);
283    TEST_CHK(status == FDB_RESULT_SUCCESS);
284    bid = file_info.file_size / fconfig.blocksize;
285
286    // close the db
287    fdb_kvs_close(db);
288    fdb_close(dbfile);
289
290    // Shutdown forest db in the middle of the test to simulate crash
291    fdb_shutdown();
292
293    // append 9K of non-block aligned garbage at end of file
294    r = _disk_dump(test_file, bid * fconfig.blocksize,
295                   (2 * fconfig.blocksize) + (fconfig.blocksize / 4));
296    TEST_CHK(r >= 0);
297
298    // reopen the same file
299    status = fdb_open(&dbfile, test_file, &fconfig);
300    TEST_CHK(status == FDB_RESULT_SUCCESS);
301    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
302    TEST_CHK(status == FDB_RESULT_SUCCESS);
303    status = fdb_set_log_callback(db, logCallbackFunc,
304                                  (void *) "crash_recovery_test");
305    TEST_CHK(status == FDB_RESULT_SUCCESS);
306
307    // retrieve documents
308    for (i=0;i<n;++i){
309        // search by key
310        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
311        status = fdb_get(db, rdoc);
312
313        TEST_CHK(status == FDB_RESULT_SUCCESS);
314        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
315        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
316
317        // free result document
318        fdb_doc_free(rdoc);
319    }
320
321    // retrieve documents by sequence number
322    for (i=0;i<n;++i){
323        // search by seq
324        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
325        rdoc->seqnum = i + 1;
326        status = fdb_get_byseq(db, rdoc);
327
328        TEST_CHK(status == FDB_RESULT_SUCCESS);
329
330        // free result document
331        fdb_doc_free(rdoc);
332    }
333
334    // free all documents
335    for (i=0;i<n;++i){
336        fdb_doc_free(doc[i]);
337    }
338
339    // close db file
340    fdb_kvs_close(db);
341    fdb_close(dbfile);
342
343    // free all resources
344    fdb_shutdown();
345
346    memleak_end();
347
348    if(walflush){
349        TEST_RESULT("crash recovery test - walflush");
350    } else {
351        TEST_RESULT("crash recovery test - normal flush");
352    }
353}
354
355void snapshot_test()
356{
357    TEST_INIT();
358
359    memleak_start();
360
361    int i, r;
362    int n = 20;
363    int count;
364    fdb_file_handle *dbfile;
365    fdb_kvs_handle *db;
366    fdb_kvs_handle *snap_db;
367    fdb_seqnum_t snap_seq;
368    fdb_doc **doc = alca(fdb_doc*, n);
369    fdb_doc *rdoc = NULL;
370    fdb_kvs_info kvs_info;
371    fdb_status status;
372    fdb_iterator *iterator;
373
374    char keybuf[256], metabuf[256], bodybuf[256];
375
376    // remove previous mvcc_test files
377    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
378    (void)r;
379
380    fdb_config fconfig = fdb_get_default_config();
381    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
382    fconfig.buffercache_size = 0;
383    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
384    fconfig.wal_threshold = 1024;
385    fconfig.flags = FDB_OPEN_FLAG_CREATE;
386    fconfig.compaction_threshold = 0;
387
388    // remove previous mvcc_test files
389    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
390    (void)r;
391
392    // open db
393    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
394    fdb_kvs_open_default(dbfile, &db, &kvs_config);
395    TEST_CHK(status == FDB_RESULT_SUCCESS);
396
397    // Create a snapshot from an empty database file
398    status = fdb_snapshot_open(db, &snap_db, 0);
399    TEST_CHK(status == FDB_RESULT_SUCCESS);
400    // check if snapshot's sequence number is zero.
401    fdb_get_kvs_info(snap_db, &kvs_info);
402    TEST_CHK(kvs_info.last_seqnum == 0);
403    // create an iterator on the snapshot for full range
404    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
405    // Iterator should not return any items.
406    status = fdb_iterator_get(iterator, &rdoc);
407    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
408    fdb_iterator_close(iterator);
409    fdb_kvs_close(snap_db);
410
411    // ------- Setup test ----------------------------------
412    // insert documents of 0-4
413    for (i=0; i<n/4; i++){
414        sprintf(keybuf, "key%d", i);
415        sprintf(metabuf, "meta%d", i);
416        sprintf(bodybuf, "body%d", i);
417        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
418            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
419        fdb_set(db, doc[i]);
420    }
421
422    // commit with a manual WAL flush (these docs go into HB-trie)
423    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
424
425    // insert documents from 4 - 8
426    for (; i < n/2 - 1; i++){
427        sprintf(keybuf, "key%d", i);
428        sprintf(metabuf, "meta%d", i);
429        sprintf(bodybuf, "body%d", i);
430        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
431            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
432        fdb_set(db, doc[i]);
433    }
434
435    // We want to create:
436    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
437    // Insert doc 9 with a different value to test duplicate elimination..
438    sprintf(keybuf, "key%d", i);
439    sprintf(metabuf, "meta%d", i);
440    sprintf(bodybuf, "Body%d", i);
441    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
442            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
443    fdb_set(db, doc[i]);
444
445    // commit again without a WAL flush (last doc goes into the AVL tree)
446    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
447
448    // Insert doc 9 now again with expected value..
449    *(char *)doc[i]->body = 'b';
450    fdb_set(db, doc[i]);
451    // commit again without a WAL flush (these documents go into the AVL trees)
452    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
453
454    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
455    snap_seq = doc[i]->seqnum;
456
457    // Now re-insert doc 9 as another duplicate (only newer sequence number)
458    fdb_set(db, doc[i]);
459    // commit again without a WAL flush (last doc goes into the AVL tree)
460    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
461
462    // insert documents from 10-14 into HB-trie
463    for (++i; i < (n/2 + n/4); i++){
464        sprintf(keybuf, "key%d", i);
465        sprintf(metabuf, "meta%d", i);
466        sprintf(bodybuf, "body%d", i);
467        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
468            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
469        fdb_set(db, doc[i]);
470    }
471    // manually flush WAL & commit
472    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
473
474    status = fdb_set_log_callback(db, logCallbackFunc,
475                                  (void *) "snapshot_test");
476    TEST_CHK(status == FDB_RESULT_SUCCESS);
477    // ---------- Snapshot tests begin -----------------------
478    // Attempt to take snapshot with out-of-range marker..
479    status = fdb_snapshot_open(db, &snap_db, 999999);
480    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
481
482    // Init Snapshot of open file with saved document seqnum as marker
483    status = fdb_snapshot_open(db, &snap_db, snap_seq);
484    TEST_CHK(status == FDB_RESULT_SUCCESS);
485
486    // check snapshot's sequence number
487    fdb_get_kvs_info(snap_db, &kvs_info);
488    TEST_CHK(kvs_info.last_seqnum == snap_seq);
489
490    // insert documents from 15 - 19 on file into the WAL
491    for (; i < n; i++){
492        sprintf(keybuf, "key%d", i);
493        sprintf(metabuf, "meta%d", i);
494        sprintf(bodybuf, "body%d", i);
495        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
496            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
497        fdb_set(db, doc[i]);
498    }
499    // commit without a WAL flush (This WAL must not affect snapshot)
500    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
501
502    // create an iterator on the snapshot for full range
503    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
504
505    // repeat until fail
506    i=0;
507    count=0;
508    do {
509        status = fdb_iterator_get(iterator, &rdoc);
510        TEST_CHK(status == FDB_RESULT_SUCCESS);
511
512        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
513        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
514        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
515
516        fdb_doc_free(rdoc);
517        rdoc = NULL;
518        i ++;
519        count++;
520    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
521
522    TEST_CHK(count==n/2); // Only unique items from the first half
523
524    fdb_iterator_close(iterator);
525
526    // create a sequence iterator on the snapshot for full range
527    fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
528
529    // repeat until fail
530    i=0;
531    count=0;
532    do {
533        status = fdb_iterator_get(iterator, &rdoc);
534        TEST_CHK(status == FDB_RESULT_SUCCESS);
535
536        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
537        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
538        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
539
540        fdb_doc_free(rdoc);
541        rdoc = NULL;
542        i ++;
543        count++;
544    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
545
546    TEST_CHK(count==n/2); // Only unique items from the first half
547
548    fdb_iterator_close(iterator);
549
550    // close db handle
551    fdb_kvs_close(db);
552    // close snapshot handle
553    fdb_kvs_close(snap_db);
554    // close db file
555    fdb_close(dbfile);
556
557    // free all documents
558    for (i=0;i<n;++i){
559        fdb_doc_free(doc[i]);
560    }
561
562    // 1. Open Database File
563    status = fdb_open(&dbfile, "./mvcc_test2", &fconfig);
564    TEST_CHK(status == FDB_RESULT_SUCCESS);
565
566    // 2. Open KV Store
567    status = fdb_kvs_open(dbfile, &db, "kv2", &kvs_config);
568    TEST_CHK(status == FDB_RESULT_SUCCESS);
569
570    // 2. Create Key 'a' and Commit
571    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
572    TEST_CHK(status == FDB_RESULT_SUCCESS);
573    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
574    TEST_CHK(status == FDB_RESULT_SUCCESS);
575
576    // 3. Create Key 'b' and Commit
577    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
578    TEST_CHK(status == FDB_RESULT_SUCCESS);
579    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
580    TEST_CHK(status == FDB_RESULT_SUCCESS);
581
582    // 4. Create Key 'c' and Commit
583    status = fdb_set_kv(db, (void *)"c", 1, (void *)"val-c", 5);
584    TEST_CHK(status == FDB_RESULT_SUCCESS);
585    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
586    TEST_CHK(status == FDB_RESULT_SUCCESS);
587
588    // 5. Remember seqnum for opening snapshot
589    status = fdb_get_kvs_info(db, &kvs_info);
590    TEST_CHK(status == FDB_RESULT_SUCCESS);
591    snap_seq = kvs_info.last_seqnum;
592
593    // 6.  Create an iterator
594    status = fdb_iterator_init(db, &iterator, (void *)"a", 1,
595                               (void *)"c", 1, FDB_ITR_NONE);
596    TEST_CHK(status == FDB_RESULT_SUCCESS);
597
598    // 7. Do a seek
599    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
600    TEST_CHK(status == FDB_RESULT_SUCCESS);
601
602    // 8. Close the iterator
603    status = fdb_iterator_close(iterator);
604    TEST_CHK(status == FDB_RESULT_SUCCESS);
605
606    // 9. Open a snapshot at the same point
607    status = fdb_snapshot_open(db, &snap_db, snap_seq);
608    TEST_CHK(status == FDB_RESULT_SUCCESS);
609
610    // That works, now attempt to do exact same sequence on a snapshot
611    // The snapshot is opened at the exact same place that the original
612    // database handle should be at (last seq 3)
613
614    // 10.  Create an iterator on snapshot
615    status = fdb_iterator_init(snap_db, &iterator, (void *)"a", 1,
616                               (void *)"c", 1, FDB_ITR_NONE);
617    TEST_CHK(status == FDB_RESULT_SUCCESS);
618
619    // 11. Do a seek
620    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
621    TEST_CHK(status == FDB_RESULT_SUCCESS);
622
623    // 12. Close the iterator
624    status = fdb_iterator_close(iterator);
625    TEST_CHK(status == FDB_RESULT_SUCCESS);
626
627    // close db handle
628    fdb_kvs_close(db);
629    // close snapshot handle
630    fdb_kvs_close(snap_db);
631    // close db file
632    fdb_close(dbfile);
633
634    // free all resources
635    fdb_shutdown();
636
637    memleak_end();
638
639    TEST_RESULT("snapshot test");
640}
641
642void snapshot_stats_test()
643{
644    TEST_INIT();
645
646    memleak_start();
647
648    int i, r;
649    int n = 20;
650    fdb_file_handle *dbfile;
651    fdb_kvs_handle *db;
652    fdb_kvs_handle *snap_db;
653    fdb_seqnum_t snap_seq;
654    fdb_doc **doc = alca(fdb_doc*, n);
655    fdb_kvs_info kvs_info;
656    fdb_status status;
657
658    char keybuf[256], metabuf[256], bodybuf[256];
659
660    // remove previous mvcc_test files
661    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
662    (void)r;
663
664    fdb_config fconfig = fdb_get_default_config();
665    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
666    fconfig.buffercache_size = 0;
667    fconfig.wal_threshold = 1024;
668    fconfig.flags = FDB_OPEN_FLAG_CREATE;
669    fconfig.compaction_threshold = 0;
670
671    // remove previous mvcc_test files
672    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
673    (void)r;
674
675    // open db
676    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
677    fdb_kvs_open_default(dbfile, &db, &kvs_config);
678    TEST_CHK(status == FDB_RESULT_SUCCESS);
679
680    // ------- Setup test ----------------------------------
681    // insert documents
682    for (i=0; i<n; i++){
683        sprintf(keybuf, "key%d", i);
684        sprintf(metabuf, "meta%d", i);
685        sprintf(bodybuf, "body%d", i);
686        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
687            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
688        fdb_set(db, doc[i]);
689        fdb_doc_free(doc[i]);
690    }
691
692    // commit with a manual WAL flush (these docs go into HB-trie)
693    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
694
695    // retrieve the sequence number for a snapshot open
696    status = fdb_get_kvs_info(db, &kvs_info);
697    TEST_CHK(status == FDB_RESULT_SUCCESS);
698    snap_seq = kvs_info.last_seqnum;
699
700    status = fdb_set_log_callback(db, logCallbackFunc,
701                                  (void *) "snapshot_stats_test");
702    TEST_CHK(status == FDB_RESULT_SUCCESS);
703
704    // Init Snapshot of open file with saved document seqnum as marker
705    status = fdb_snapshot_open(db, &snap_db, snap_seq);
706    TEST_CHK(status == FDB_RESULT_SUCCESS);
707
708    // check snapshot's sequence number
709    fdb_get_kvs_info(snap_db, &kvs_info);
710    TEST_CHK(kvs_info.last_seqnum == snap_seq);
711
712    TEST_CHK(kvs_info.doc_count == (size_t)n);
713    TEST_CHK(kvs_info.file == dbfile);
714
715    // close snapshot handle
716    fdb_kvs_close(snap_db);
717
718    // Test stats by creating an in-memory snapshot
719    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
720    TEST_CHK(status == FDB_RESULT_SUCCESS);
721
722    // check snapshot's sequence number
723    fdb_get_kvs_info(snap_db, &kvs_info);
724    TEST_CHK(kvs_info.last_seqnum == snap_seq);
725
726    TEST_CHK(kvs_info.doc_count == (size_t)n);
727    TEST_CHK(kvs_info.file == dbfile);
728
729    // close snapshot handle
730    fdb_kvs_close(snap_db);
731
732    // close db handle
733    fdb_kvs_close(db);
734    // close db file
735    fdb_close(dbfile);
736
737    // free all resources
738    fdb_shutdown();
739
740    memleak_end();
741
742    TEST_RESULT("snapshot stats test");
743}
744
745void snapshot_with_uncomitted_data_test()
746{
747    TEST_INIT();
748
749    int n = 10, value_len=32;
750    int i, r, idx;
751    char cmd[256];
752    char key[256], *value;
753    char keystr[] = "key%06d";
754    char valuestr[] = "value%d";
755    fdb_file_handle *db_file;
756    fdb_kvs_handle *db0, *db1, *db2, *snap;
757    fdb_config config;
758    fdb_kvs_config kvs_config;
759    fdb_kvs_info info;
760    fdb_seqnum_t seqnum;
761    fdb_status s; (void)s;
762
763    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
764    r = system(cmd);
765    (void)r;
766
767    memleak_start();
768
769    value = (char*)malloc(value_len);
770
771    srand(1234);
772    config = fdb_get_default_config();
773    config.seqtree_opt = FDB_SEQTREE_USE;
774    config.wal_flush_before_commit = true;
775    config.multi_kv_instances = true;
776    config.buffercache_size = 0*1024*1024;
777
778    kvs_config = fdb_get_default_kvs_config();
779
780    s = fdb_open(&db_file, "./mvcc_test9", &config);
781    TEST_CHK(s == FDB_RESULT_SUCCESS);
782    s = fdb_kvs_open(db_file, &db0, NULL, &kvs_config);
783    TEST_CHK(s == FDB_RESULT_SUCCESS);
784    s = fdb_kvs_open(db_file, &db1, "db1", &kvs_config);
785    TEST_CHK(s == FDB_RESULT_SUCCESS);
786    s = fdb_kvs_open(db_file, &db2, "db2", &kvs_config);
787    TEST_CHK(s == FDB_RESULT_SUCCESS);
788
789    // insert docs in all KV stores
790    for (i=0;i<n;++i){
791        idx = i;
792        sprintf(key, keystr, idx);
793        memset(value, 'x', value_len);
794        memcpy(value + value_len - 6, "<end>", 6);
795        sprintf(value, valuestr, idx);
796        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
797        TEST_CHK(s == FDB_RESULT_SUCCESS);
798        s = fdb_set_kv(db1, key, strlen(key)+1, value, value_len);
799        TEST_CHK(s == FDB_RESULT_SUCCESS);
800        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
801        TEST_CHK(s == FDB_RESULT_SUCCESS);
802    }
803    // try to open snapshot before commit
804    s = fdb_get_kvs_info(db0, &info);
805    TEST_CHK(s == FDB_RESULT_SUCCESS);
806    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
807    TEST_CHK(s != FDB_RESULT_SUCCESS);
808
809    s = fdb_get_kvs_info(db1, &info);
810    TEST_CHK(s == FDB_RESULT_SUCCESS);
811    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
812    TEST_CHK(s != FDB_RESULT_SUCCESS);
813
814    s = fdb_get_kvs_info(db2, &info);
815    TEST_CHK(s == FDB_RESULT_SUCCESS);
816    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
817    TEST_CHK(s != FDB_RESULT_SUCCESS);
818
819    s = fdb_commit(db_file, FDB_COMMIT_NORMAL);
820    TEST_CHK(s == FDB_RESULT_SUCCESS);
821
822    s = fdb_get_kvs_info(db1, &info);
823    TEST_CHK(s == FDB_RESULT_SUCCESS);
824    seqnum = info.last_seqnum;
825
826    s = fdb_get_kvs_info(db2, &info);
827    TEST_CHK(s == FDB_RESULT_SUCCESS);
828    TEST_CHK(seqnum == info.last_seqnum);
829
830    s = fdb_get_kvs_info(db0, &info);
831    TEST_CHK(s == FDB_RESULT_SUCCESS);
832    TEST_CHK(seqnum == info.last_seqnum);
833
834    // now insert docs into default and db2 only, without commit
835    for (i=0;i<n;++i){
836        idx = i;
837        sprintf(key, keystr, idx);
838        memset(value, 'x', value_len);
839        memcpy(value + value_len - 6, "<end>", 6);
840        sprintf(value, valuestr, idx);
841        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
842        TEST_CHK(s == FDB_RESULT_SUCCESS);
843        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
844        TEST_CHK(s == FDB_RESULT_SUCCESS);
845    }
846
847    // open snapshot on db1
848    s = fdb_get_kvs_info(db1, &info);
849    TEST_CHK(s == FDB_RESULT_SUCCESS);
850    // latest (comitted) seqnum
851    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
852    TEST_CHK(s == FDB_RESULT_SUCCESS);
853    s = fdb_kvs_close(snap);
854    TEST_CHK(s == FDB_RESULT_SUCCESS);
855
856    // open snapshot on db2
857    s = fdb_get_kvs_info(db2, &info);
858    TEST_CHK(s == FDB_RESULT_SUCCESS);
859
860    // latest (uncomitted) seqnum
861    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
862    TEST_CHK(s != FDB_RESULT_SUCCESS);
863    // committed seqnum
864    s = fdb_snapshot_open(db2, &snap, seqnum);
865    TEST_CHK(s == FDB_RESULT_SUCCESS);
866    s = fdb_kvs_close(snap);
867    TEST_CHK(s == FDB_RESULT_SUCCESS);
868
869    // open snapshot on default KVS
870    s = fdb_get_kvs_info(db0, &info);
871    TEST_CHK(s == FDB_RESULT_SUCCESS);
872
873    // latest (uncomitted) seqnum
874    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
875    TEST_CHK(s != FDB_RESULT_SUCCESS);
876    // committed seqnum
877    s = fdb_snapshot_open(db0, &snap, seqnum);
878    TEST_CHK(s == FDB_RESULT_SUCCESS);
879    s = fdb_kvs_close(snap);
880    TEST_CHK(s == FDB_RESULT_SUCCESS);
881
882    s = fdb_close(db_file);
883    TEST_CHK(s == FDB_RESULT_SUCCESS);
884    s = fdb_shutdown();
885    TEST_CHK(s == FDB_RESULT_SUCCESS);
886    free(value);
887    memleak_end();
888
889    TEST_RESULT("snapshot with uncomitted data in other KVS test");
890}
891
892void in_memory_snapshot_test()
893{
894    TEST_INIT();
895
896    memleak_start();
897
898    int i, r;
899    int n = 20;
900    int count;
901    fdb_file_handle *dbfile;
902    fdb_kvs_handle *db;
903    fdb_kvs_handle *snap_db;
904    fdb_seqnum_t snap_seq;
905    fdb_doc **doc = alca(fdb_doc*, n);
906    fdb_doc *rdoc;
907    fdb_kvs_info kvs_info;
908    fdb_status status;
909    fdb_iterator *iterator;
910
911    char keybuf[256], metabuf[256], bodybuf[256];
912
913    // remove previous mvcc_test files
914    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
915    (void)r;
916
917    fdb_config fconfig = fdb_get_default_config();
918    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
919    fconfig.buffercache_size = 0;
920    fconfig.wal_threshold = 1024;
921    fconfig.flags = FDB_OPEN_FLAG_CREATE;
922    fconfig.compaction_threshold = 0;
923
924    // remove previous mvcc_test files
925    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
926    (void)r;
927
928    // open db
929    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
930    TEST_CHK(status == FDB_RESULT_SUCCESS);
931    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
932    TEST_CHK(status == FDB_RESULT_SUCCESS);
933
934    // Create a snapshot from an empty database file
935    status = fdb_snapshot_open(db, &snap_db, 0);
936    TEST_CHK(status == FDB_RESULT_SUCCESS);
937    // check if snapshot's sequence number is zero.
938    fdb_get_kvs_info(snap_db, &kvs_info);
939    TEST_CHK(kvs_info.last_seqnum == 0);
940    // create an iterator on the snapshot for full range
941    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
942    // Iterator should not return any items.
943    status = fdb_iterator_get(iterator, &rdoc);
944    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
945    fdb_iterator_close(iterator);
946    fdb_kvs_close(snap_db);
947
948    // ------- Setup test ----------------------------------
949    // insert documents of 0-4
950    for (i=0; i<n/4; i++){
951        sprintf(keybuf, "key%d", i);
952        sprintf(metabuf, "meta%d", i);
953        sprintf(bodybuf, "body%d", i);
954        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
955            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
956        fdb_set(db, doc[i]);
957    }
958
959    // commit with a manual WAL flush (these docs go into HB-trie)
960    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
961
962    // insert documents from 5 - 8
963    for (; i < n/2 - 1; i++){
964        sprintf(keybuf, "key%d", i);
965        sprintf(metabuf, "meta%d", i);
966        sprintf(bodybuf, "body%d", i);
967        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
968            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
969        fdb_set(db, doc[i]);
970    }
971
972    // We want to create:
973    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
974    // Insert doc 9 with a different value to test duplicate elimination..
975    sprintf(keybuf, "key%d", i);
976    sprintf(metabuf, "meta%d", i);
977    sprintf(bodybuf, "Body%d", i);
978    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
979            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
980    fdb_set(db, doc[i]);
981
982    // commit again without a WAL flush (last doc goes into the AVL tree)
983    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
984
985    // Insert doc 9 now again with expected value..
986    *(char *)doc[i]->body = 'b';
987    fdb_set(db, doc[i]);
988
989    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
990    snap_seq = doc[i]->seqnum;
991
992    // Creation of a snapshot with a sequence number that was taken WITHOUT a
993    // commit must fail even if it from the latest document that was set...
994    fdb_get_kvs_info(db, &kvs_info);
995    status = fdb_snapshot_open(db, &snap_db, kvs_info.last_seqnum);
996    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
997
998    // ---------- Snapshot tests begin -----------------------
999    // Initialize an in-memory snapshot Without a Commit...
1000    // WAL items are not flushed...
1001    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
1002    TEST_CHK(status == FDB_RESULT_SUCCESS);
1003
1004    // commit again without a WAL flush (these documents go into the AVL trees)
1005    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1006
1007    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1008    fdb_set(db, doc[i]);
1009
1010    // commit again without a WAL flush (last doc goes into the AVL tree)
1011    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1012
1013    // insert documents from 10-14 into HB-trie
1014    for (++i; i < (n/2 + n/4); i++){
1015        sprintf(keybuf, "key%d", i);
1016        sprintf(metabuf, "meta%d", i);
1017        sprintf(bodybuf, "body%d", i);
1018        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1019                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1020        fdb_set(db, doc[i]);
1021    }
1022
1023    status = fdb_set_log_callback(db, logCallbackFunc,
1024                                  (void *) "in_memory_snapshot_test");
1025    TEST_CHK(status == FDB_RESULT_SUCCESS);
1026
1027    // check snapshot's sequence number
1028    fdb_get_kvs_info(snap_db, &kvs_info);
1029    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1030
1031    // insert documents from 15 - 19 on file into the WAL
1032    for (; i < n; i++){
1033        sprintf(keybuf, "key%d", i);
1034        sprintf(metabuf, "meta%d", i);
1035        sprintf(bodybuf, "body%d", i);
1036        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1037            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1038        fdb_set(db, doc[i]);
1039    }
1040    // commit without a WAL flush (This WAL must not affect snapshot)
1041    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1042
1043    // create an iterator on the snapshot for full range
1044    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1045
1046    // repeat until fail
1047    i=0;
1048    count=0;
1049    rdoc = NULL;
1050    do {
1051        status = fdb_iterator_get(iterator, &rdoc);
1052        TEST_CHK(status == FDB_RESULT_SUCCESS);
1053
1054        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1055        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1056        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1057
1058        fdb_doc_free(rdoc);
1059        rdoc = NULL;
1060        i++;
1061        count++;
1062    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1063
1064    TEST_CHK(count==n/2); // Only unique items from the first half
1065
1066    fdb_iterator_close(iterator);
1067
1068    // close db handle
1069    fdb_kvs_close(db);
1070    // close snapshot handle
1071    fdb_kvs_close(snap_db);
1072
1073    // close db file
1074    fdb_close(dbfile);
1075
1076    // free all documents
1077    for (i=0;i<n;++i){
1078        fdb_doc_free(doc[i]);
1079    }
1080
1081    // free all resources
1082    fdb_shutdown();
1083
1084    memleak_end();
1085
1086    TEST_RESULT("in-memory snapshot test");
1087}
1088
1089void in_memory_snapshot_cleanup_test()
1090{
1091    TEST_INIT();
1092
1093    memleak_start();
1094
1095    int i, r;
1096    int n = 20;
1097    fdb_file_handle *dbfile;
1098    fdb_kvs_handle *db;
1099    fdb_kvs_handle *snap_db1, *snap_db2, *snap_db3;
1100    fdb_kvs_handle *psnap_db1, *psnap_db2, *psnap_db3;
1101    fdb_doc **doc = alca(fdb_doc*, n);
1102    fdb_doc *rdoc;
1103    fdb_status status;
1104
1105    char keybuf[2560], metabuf[2560], bodybuf[2560];
1106
1107    // remove previous mvcc_test files
1108    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1109    (void)r;
1110
1111    fdb_config fconfig = fdb_get_default_config();
1112    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1113    fconfig.wal_threshold = 1024;
1114    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1115    fconfig.compaction_threshold = 0;
1116    fconfig.block_reusing_threshold = 0;
1117
1118    // remove previous mvcc_test files
1119    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1120    (void)r;
1121
1122    // open db
1123    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1124    TEST_CHK(status == FDB_RESULT_SUCCESS);
1125    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1126    TEST_CHK(status == FDB_RESULT_SUCCESS);
1127
1128    status = fdb_set_log_callback(db, logCallbackFunc,
1129                                  (void *) "in_memory_snapshot_cleanup_test");
1130    TEST_CHK(status == FDB_RESULT_SUCCESS);
1131
1132    i = 0;
1133    sprintf(keybuf, "key%d", i);
1134    sprintf(metabuf, "meta%d", i);
1135    sprintf(bodybuf, "Body%d", i);
1136    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1137            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1138    fdb_set(db, doc[i]);
1139
1140    // commit without a WAL flush
1141    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1142
1143    status = fdb_snapshot_open(db, &snap_db1, FDB_SNAPSHOT_INMEM);
1144    TEST_CHK(status == FDB_RESULT_SUCCESS);
1145
1146    status = fdb_set(db, doc[i]);
1147    TEST_CHK(status == FDB_RESULT_SUCCESS);
1148
1149    // commit without a WAL flush
1150    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1151    TEST_CHK(status == FDB_RESULT_SUCCESS);
1152
1153    status = fdb_snapshot_open(db, &snap_db2, FDB_SNAPSHOT_INMEM);
1154    TEST_CHK(status == FDB_RESULT_SUCCESS);
1155
1156    status = fdb_set(db, doc[i]);
1157    TEST_CHK(status == FDB_RESULT_SUCCESS);
1158
1159    // commit without a WAL flush
1160    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1161    TEST_CHK(status == FDB_RESULT_SUCCESS);
1162
1163    status = fdb_set(db, doc[i]);
1164    TEST_CHK(status == FDB_RESULT_SUCCESS);
1165
1166    // commit without a WAL flush
1167    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1168    TEST_CHK(status == FDB_RESULT_SUCCESS);
1169
1170    sprintf((char*)doc[i]->key, "Key%d", i);
1171    fdb_set(db, doc[i]);
1172
1173    status = fdb_snapshot_open(db, &snap_db3, FDB_SNAPSHOT_INMEM);
1174    TEST_CHK(status == FDB_RESULT_SUCCESS);
1175
1176    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
1177
1178    status = fdb_get(snap_db1, rdoc);
1179    TEST_CHK(status == FDB_RESULT_SUCCESS);
1180
1181    status = fdb_get(snap_db2, rdoc);
1182    TEST_CHK(status == FDB_RESULT_SUCCESS);
1183
1184    status = fdb_get(snap_db3, rdoc);
1185    TEST_CHK(status == FDB_RESULT_SUCCESS);
1186
1187    fdb_doc_free(rdoc);
1188
1189    status = fdb_snapshot_open(db, &psnap_db1, 1);
1190    TEST_CHK(status == FDB_RESULT_SUCCESS);
1191
1192    status = fdb_snapshot_open(db, &psnap_db2, 2);
1193    TEST_CHK(status == FDB_RESULT_SUCCESS);
1194
1195    status = fdb_snapshot_open(db, &psnap_db3, 3);
1196    TEST_CHK(status == FDB_RESULT_SUCCESS);
1197
1198    // close snapshot handle
1199    fdb_kvs_close(snap_db1);
1200    fdb_kvs_close(snap_db2);
1201    fdb_kvs_close(snap_db3);
1202
1203    // commit without a WAL flush
1204    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1205    TEST_CHK(status == FDB_RESULT_SUCCESS);
1206
1207    fdb_kvs_close(psnap_db1);
1208    fdb_kvs_close(psnap_db2);
1209    fdb_kvs_close(psnap_db3);
1210
1211    // close db handle
1212    fdb_kvs_close(db);
1213
1214    // close db file
1215    fdb_close(dbfile);
1216
1217    fdb_doc_free(doc[i]);
1218
1219    // free all resources
1220    fdb_shutdown();
1221
1222    memleak_end();
1223
1224    TEST_RESULT("in-memory snapshot cleanup test");
1225}
1226
1227void in_memory_snapshot_on_dirty_hbtrie_test()
1228{
1229    TEST_INIT();
1230
1231    int n = 300, value_len=32;
1232    int i, r, idx, c;
1233    char cmd[256];
1234    char key[256], *value;
1235    char keystr[] = "k%05d";
1236    char keystr2[] = "k%06d";
1237    char valuestr[] = "value%08d";
1238    fdb_file_handle *db_file;
1239    fdb_kvs_handle *db, *snap, *snap_clone;
1240    fdb_config config;
1241    fdb_kvs_config kvs_config;
1242    fdb_status s;
1243    fdb_iterator *fit, *fit_normal, *fit_clone;
1244    fdb_doc *doc;
1245
1246    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1247    r = system(cmd);
1248    (void)r;
1249
1250    memleak_start();
1251
1252    value = (char*)malloc(value_len);
1253
1254    config = fdb_get_default_config();
1255    config.durability_opt = FDB_DRB_ASYNC;
1256    config.seqtree_opt = FDB_SEQTREE_USE;
1257    config.wal_flush_before_commit = true;
1258    config.wal_threshold = n/5;
1259    config.multi_kv_instances = true;
1260    config.buffercache_size = 0;
1261
1262    kvs_config = fdb_get_default_kvs_config();
1263
1264    s = fdb_open(&db_file, "./mvcc_test", &config);
1265    TEST_CHK(s == FDB_RESULT_SUCCESS);
1266    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1267    TEST_CHK(s == FDB_RESULT_SUCCESS);
1268
1269    // write a few documents and commit & wal flush
1270    for (i=0;i<n/10;++i){
1271        idx = i;
1272        sprintf(key, keystr, idx);
1273        memset(value, 'x', value_len);
1274        memcpy(value + value_len - 6, "<end>", 6);
1275        sprintf(value, valuestr, idx);
1276        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1277        TEST_CHK(s == FDB_RESULT_SUCCESS);
1278    }
1279    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1280    TEST_CHK(s == FDB_RESULT_SUCCESS);
1281
1282    // create an in-memory snapshot and its clone
1283    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1284    TEST_CHK(s == FDB_RESULT_SUCCESS);
1285    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1286    TEST_CHK(s == FDB_RESULT_SUCCESS);
1287
1288    // write a number of documents in order to make WAL be flushed before commit
1289    for (i=n/10;i<n;++i){
1290        idx = i;
1291        sprintf(key, keystr, idx);
1292        memset(value, 'x', value_len);
1293        memcpy(value + value_len - 6, "<end>", 6);
1294        sprintf(value, valuestr, idx);
1295        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1296        TEST_CHK(s == FDB_RESULT_SUCCESS);
1297    }
1298
1299    void *v_out;
1300    size_t vlen_out;
1301    idx = n/10;
1302    sprintf(key, keystr, idx);
1303    s = fdb_get_kv(snap, key, strlen(key)+1, &v_out, &vlen_out);
1304    // should not be able to retrieve
1305    TEST_CHK(s != FDB_RESULT_SUCCESS);
1306
1307    s = fdb_get_kv(snap_clone, key, strlen(key)+1, &v_out, &vlen_out);
1308    // should not be able to retrieve in also clone
1309    TEST_CHK(s != FDB_RESULT_SUCCESS);
1310
1311    // close snapshot and its clone
1312    s = fdb_kvs_close(snap);
1313    TEST_CHK(s == FDB_RESULT_SUCCESS);
1314    s = fdb_kvs_close(snap_clone);
1315    TEST_CHK(s == FDB_RESULT_SUCCESS);
1316
1317    // create an in-memory snapshot
1318    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1319    TEST_CHK(s == FDB_RESULT_SUCCESS);
1320
1321    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1322    TEST_CHK(s == FDB_RESULT_SUCCESS);
1323    c = 0;
1324    do {
1325        doc = NULL;
1326        s = fdb_iterator_get(fit, &doc);
1327        if (s != FDB_RESULT_SUCCESS) break;
1328
1329        idx = c;
1330        sprintf(key, keystr, idx);
1331        memset(value, 'x', value_len);
1332        memcpy(value + value_len - 6, "<end>", 6);
1333        sprintf(value, valuestr, idx);
1334        TEST_CMP(doc->key, key, doc->keylen);
1335        TEST_CMP(doc->body, value, doc->bodylen);
1336        c++;
1337        fdb_doc_free(doc);
1338    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1339    TEST_CHK(c == n);
1340
1341    s = fdb_iterator_close(fit);
1342    TEST_CHK(s == FDB_RESULT_SUCCESS);
1343
1344    // create a clone
1345    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1346    TEST_CHK(s == FDB_RESULT_SUCCESS);
1347
1348    // create iterators on snapshot and its clone
1349    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1350    TEST_CHK(s == FDB_RESULT_SUCCESS);
1351    s = fdb_iterator_init(snap_clone, &fit_clone, NULL, 0, NULL, 0, 0x0);
1352    TEST_CHK(s == FDB_RESULT_SUCCESS);
1353
1354    // also create an iterator on a normal handle
1355    s = fdb_iterator_init(db, &fit_normal, NULL, 0, NULL, 0, 0x0);
1356    TEST_CHK(s == FDB_RESULT_SUCCESS);
1357
1358    c = 0;
1359    do {
1360        doc = NULL;
1361        s = fdb_iterator_get(fit, &doc);
1362        if (s != FDB_RESULT_SUCCESS) break;
1363
1364        idx = c;
1365        sprintf(key, keystr, idx);
1366        memset(value, 'x', value_len);
1367        memcpy(value + value_len - 6, "<end>", 6);
1368        sprintf(value, valuestr, idx);
1369        TEST_CMP(doc->key, key, doc->keylen);
1370        TEST_CMP(doc->body, value, doc->bodylen);
1371        c++;
1372        fdb_doc_free(doc);
1373
1374        doc = NULL;
1375        s = fdb_iterator_get(fit, &doc);
1376        TEST_CHK(s == FDB_RESULT_SUCCESS);
1377        TEST_CMP(doc->key, key, doc->keylen);
1378        TEST_CMP(doc->body, value, doc->bodylen);
1379        fdb_doc_free(doc);
1380
1381        if (c == n/5) {
1382            // insert new docs in the middle of iteration
1383            for (i=0; i<n*10; ++i){
1384                idx = i;
1385                sprintf(key, keystr2, idx);
1386                memset(value, 'x', value_len);
1387                memcpy(value + value_len - 6, "<end>", 6);
1388                sprintf(value, valuestr, idx);
1389                s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1390                TEST_CHK(s == FDB_RESULT_SUCCESS);
1391            }
1392        }
1393
1394        s = fdb_iterator_next(fit);
1395        // result should be same to clone
1396        if (s != FDB_RESULT_SUCCESS) {
1397            s = fdb_iterator_next(fit_clone);
1398            TEST_CHK(s != FDB_RESULT_SUCCESS);
1399        } else {
1400            s = fdb_iterator_next(fit_clone);
1401            TEST_CHK(s == FDB_RESULT_SUCCESS);
1402        }
1403    } while(s == FDB_RESULT_SUCCESS);
1404    TEST_CHK(c == n);
1405
1406    // close iterators
1407    s = fdb_iterator_close(fit);
1408    TEST_CHK(s == FDB_RESULT_SUCCESS);
1409    s = fdb_iterator_close(fit_clone);
1410    TEST_CHK(s == FDB_RESULT_SUCCESS);
1411
1412    // the results should be same in the normal iterator
1413    c = 0;
1414    do {
1415        doc = NULL;
1416        s = fdb_iterator_get(fit_normal, &doc);
1417        if (s != FDB_RESULT_SUCCESS) break;
1418
1419        idx = c;
1420        sprintf(key, keystr, idx);
1421        memset(value, 'x', value_len);
1422        memcpy(value + value_len - 6, "<end>", 6);
1423        sprintf(value, valuestr, idx);
1424        TEST_CMP(doc->key, key, doc->keylen);
1425        TEST_CMP(doc->body, value, doc->bodylen);
1426        c++;
1427        fdb_doc_free(doc);
1428    } while(fdb_iterator_next(fit_normal) == FDB_RESULT_SUCCESS);
1429    TEST_CHK(c == n);
1430
1431    s = fdb_iterator_close(fit_normal);
1432    TEST_CHK(s == FDB_RESULT_SUCCESS);
1433
1434    s = fdb_kvs_close(snap);
1435    TEST_CHK(s == FDB_RESULT_SUCCESS);
1436
1437    s = fdb_kvs_close(snap_clone);
1438    TEST_CHK(s == FDB_RESULT_SUCCESS);
1439
1440    s = fdb_close(db_file);
1441    TEST_CHK(s == FDB_RESULT_SUCCESS);
1442    s = fdb_shutdown();
1443    TEST_CHK(s == FDB_RESULT_SUCCESS);
1444    free(value);
1445
1446    memleak_end();
1447
1448    TEST_RESULT("in-memory snapshot on dirty HB+trie nodes test");
1449}
1450
1451
1452struct cb_inmem_snap_args {
1453    fdb_kvs_handle *handle;
1454    int n;
1455    int move_count;
1456    int value_len;
1457    char *keystr;
1458    char *valuestr;
1459};
1460
1461static fdb_compact_decision cb_inmem_snap(fdb_file_handle *fhandle,
1462                            fdb_compaction_status status,
1463                            const char *kv_name,
1464                            fdb_doc *doc_in, uint64_t old_offset,
1465                            uint64_t new_offset,
1466                            void *ctx)
1467{
1468    TEST_INIT();
1469    int c, idx;
1470    char key[256], value[256];
1471    void *value_out;
1472    size_t valuelen;
1473    fdb_kvs_handle *snap;
1474    fdb_kvs_info info;
1475    fdb_iterator *fit;
1476    fdb_doc *doc;
1477    fdb_status s;
1478    fdb_compact_decision ret = FDB_CS_KEEP_DOC;
1479    struct cb_inmem_snap_args *args = (struct cb_inmem_snap_args *)ctx;
1480    (void)args;
1481
1482    if (status == FDB_CS_MOVE_DOC) {
1483        TEST_CHK(kv_name);
1484        if (doc_in->deleted) {
1485            ret = FDB_CS_DROP_DOC;
1486        }
1487        args->move_count++;
1488        if (args->move_count == 2) {
1489            // open in-memory snapshot
1490            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1491            TEST_CHK(s == FDB_RESULT_SUCCESS);
1492
1493            s = fdb_get_kvs_info(snap, &info);
1494            TEST_CHK(s == FDB_RESULT_SUCCESS);
1495            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n);
1496
1497            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1498            TEST_CHK(s == FDB_RESULT_SUCCESS);
1499
1500            c = 0;
1501            do {
1502                doc = NULL;
1503                s = fdb_iterator_get(fit, &doc);
1504                if (s != FDB_RESULT_SUCCESS) {
1505                    break;
1506                }
1507                idx = c;
1508                sprintf(key, args->keystr, idx);
1509                memset(value, 'x', args->value_len);
1510                memcpy(value + args->value_len - 6, "<end>", 6);
1511                sprintf(value, args->valuestr, idx);
1512                TEST_CMP(doc->key, key, doc->keylen);
1513                TEST_CMP(doc->body, value, doc->bodylen);
1514
1515                c++;
1516                fdb_doc_free(doc);
1517            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1518            TEST_CHK(c == args->n);
1519
1520            s = fdb_iterator_close(fit);
1521            TEST_CHK(s == FDB_RESULT_SUCCESS);
1522
1523            fdb_kvs_close(snap);
1524
1525        } else if (args->move_count == 5) {
1526            // insert new doc
1527            sprintf(key, "new_key");
1528            sprintf(value, "new_value");
1529            s = fdb_set_kv(args->handle, (void*)key, strlen(key)+1, (void*)value, strlen(value)+1);
1530            TEST_CHK(s == FDB_RESULT_SUCCESS);
1531
1532            // open in-memory snapshot
1533            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1534            TEST_CHK(s == FDB_RESULT_SUCCESS);
1535
1536            s = fdb_get_kvs_info(snap, &info);
1537            TEST_CHK(s == FDB_RESULT_SUCCESS);
1538            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n+1);
1539
1540            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1541            TEST_CHK(s == FDB_RESULT_SUCCESS);
1542
1543            c = 0;
1544            do {
1545                doc = NULL;
1546                s = fdb_iterator_get(fit, &doc);
1547                if (s != FDB_RESULT_SUCCESS) {
1548                    break;
1549                }
1550                if (c < args->n) {
1551                    idx = c;
1552                    sprintf(key, args->keystr, idx);
1553                    memset(value, 'x', args->value_len);
1554                    memcpy(value + args->value_len - 6, "<end>", 6);
1555                    sprintf(value, args->valuestr, idx);
1556                    TEST_CMP(doc->key, key, doc->keylen);
1557                    TEST_CMP(doc->body, value, doc->bodylen);
1558                } else {
1559                    // new document
1560                    sprintf(key, "new_key");
1561                    sprintf(value, "new_value");
1562                    TEST_CMP(doc->key, key, doc->keylen);
1563                    TEST_CMP(doc->body, value, doc->bodylen);
1564                }
1565
1566                c++;
1567                fdb_doc_free(doc);
1568            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1569            TEST_CHK(c == args->n + 1);
1570
1571            s = fdb_iterator_close(fit);
1572            TEST_CHK(s == FDB_RESULT_SUCCESS);
1573
1574            s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1575            TEST_CHK(s == FDB_RESULT_SUCCESS);
1576            TEST_CMP(value_out, value, valuelen);
1577            fdb_free_block(value_out);
1578
1579            fdb_kvs_close(snap);
1580        }
1581    }
1582    return ret;
1583}
1584
1585void in_memory_snapshot_compaction_test()
1586{
1587    TEST_INIT();
1588
1589    int n = 10, value_len=32;
1590    int i, r, c, idx;
1591    char cmd[256];
1592    char key[256], *value;
1593    char keystr[] = "k%05d";
1594    char valuestr[] = "value%08d";
1595    void *value_out;
1596    size_t valuelen;
1597    fdb_file_handle *db_file;
1598    fdb_kvs_handle *db, *db2, *snap;
1599    fdb_config config;
1600    fdb_kvs_config kvs_config;
1601    fdb_kvs_info info;
1602    fdb_iterator *fit;
1603    fdb_doc *doc;
1604    fdb_status s;
1605    struct cb_inmem_snap_args cargs;
1606
1607    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1608    r = system(cmd);
1609    (void)r;
1610
1611    memleak_start();
1612
1613    value = (char*)malloc(value_len);
1614
1615    config = fdb_get_default_config();
1616    config.durability_opt = FDB_DRB_ASYNC;
1617    config.seqtree_opt = FDB_SEQTREE_USE;
1618    config.wal_flush_before_commit = true;
1619    config.wal_threshold = n/5;
1620    config.multi_kv_instances = true;
1621    config.buffercache_size = 0;
1622    config.compaction_cb = cb_inmem_snap;
1623    config.compaction_cb_mask = FDB_CS_MOVE_DOC;
1624    config.compaction_cb_ctx = &cargs;
1625
1626    kvs_config = fdb_get_default_kvs_config();
1627
1628    s = fdb_open(&db_file, "./mvcc_test", &config);
1629    TEST_CHK(s == FDB_RESULT_SUCCESS);
1630    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1631    TEST_CHK(s == FDB_RESULT_SUCCESS);
1632
1633    s = fdb_kvs_open(db_file, &db2, "db", &kvs_config);
1634    TEST_CHK(s == FDB_RESULT_SUCCESS);
1635
1636    cargs.handle = db2;
1637    cargs.move_count = 0;
1638    cargs.n = n;
1639    cargs.keystr = keystr;
1640    cargs.valuestr = valuestr;
1641    cargs.value_len = value_len;
1642
1643    // write
1644    for (i=0;i<n;++i){
1645        idx = i;
1646        sprintf(key, keystr, idx);
1647        memset(value, 'x', value_len);
1648        memcpy(value + value_len - 6, "<end>", 6);
1649        sprintf(value, valuestr, idx);
1650        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1651        TEST_CHK(s == FDB_RESULT_SUCCESS);
1652    }
1653    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1654    TEST_CHK(s == FDB_RESULT_SUCCESS);
1655
1656    s = fdb_compact(db_file, "./mvcc_test2");
1657    TEST_CHK(s == FDB_RESULT_SUCCESS);
1658
1659    // open in-memory snapshot
1660    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1661    TEST_CHK(s == FDB_RESULT_SUCCESS);
1662
1663    s = fdb_get_kvs_info(snap, &info);
1664    TEST_CHK(s == FDB_RESULT_SUCCESS);
1665    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)n+1);
1666
1667    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1668    TEST_CHK(s == FDB_RESULT_SUCCESS);
1669
1670    c = 0;
1671    do {
1672        doc = NULL;
1673        s = fdb_iterator_get(fit, &doc);
1674        if (s != FDB_RESULT_SUCCESS) {
1675            break;
1676        }
1677        if (c < n) {
1678            idx = c;
1679            sprintf(key, keystr, idx);
1680            memset(value, 'x', value_len);
1681            memcpy(value + value_len - 6, "<end>", 6);
1682            sprintf(value, valuestr, idx);
1683            TEST_CMP(doc->key, key, doc->keylen);
1684            TEST_CMP(doc->body, value, doc->bodylen);
1685        } else {
1686            // new document
1687            sprintf(key, "new_key");
1688            sprintf(value, "new_value");
1689            TEST_CMP(doc->key, key, doc->keylen);
1690            TEST_CMP(doc->body, value, doc->bodylen);
1691        }
1692
1693        c++;
1694        fdb_doc_free(doc);
1695    } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1696    TEST_CHK(c == n + 1);
1697
1698    s = fdb_iterator_close(fit);
1699    TEST_CHK(s == FDB_RESULT_SUCCESS);
1700
1701    s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1702    TEST_CHK(s == FDB_RESULT_SUCCESS);
1703    TEST_CMP(value_out, value, valuelen);
1704    fdb_free_block(value_out);
1705
1706    fdb_kvs_close(snap);
1707
1708    s = fdb_close(db_file);
1709    TEST_CHK(s == FDB_RESULT_SUCCESS);
1710    s = fdb_shutdown();
1711    TEST_CHK(s == FDB_RESULT_SUCCESS);
1712    free(value);
1713
1714    memleak_end();
1715
1716    TEST_RESULT("in-memory snapshot with concurrent compaction test");
1717}
1718
1719void snapshot_clone_test()
1720{
1721    TEST_INIT();
1722
1723    memleak_start();
1724
1725    int i, r;
1726    int n = 20;
1727    int count;
1728    fdb_file_handle *dbfile;
1729    fdb_kvs_handle *db;
1730    fdb_kvs_handle *snap_db, *snap_inmem;
1731    fdb_kvs_handle *snap_db2, *snap_inmem2; // clones from stable & inmemory
1732    fdb_seqnum_t snap_seq;
1733    fdb_doc **doc = alca(fdb_doc*, n);
1734    fdb_doc *rdoc;
1735    fdb_kvs_info kvs_info;
1736    fdb_status status;
1737    fdb_iterator *iterator;
1738
1739    char keybuf[256], metabuf[256], bodybuf[256];
1740
1741    // remove previous mvcc_test files
1742    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1743    (void)r;
1744
1745    fdb_config fconfig = fdb_get_default_config();
1746    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1747    fconfig.buffercache_size = 0;
1748    fconfig.wal_threshold = 1024;
1749    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
1750    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1751    fconfig.compaction_threshold = 0;
1752
1753    // remove previous mvcc_test files
1754    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1755    (void)r;
1756
1757    // open db
1758    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1759    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1760    TEST_CHK(status == FDB_RESULT_SUCCESS);
1761
1762    // Create a snapshot from an empty database file
1763    status = fdb_snapshot_open(db, &snap_db, 0);
1764    TEST_CHK(status == FDB_RESULT_SUCCESS);
1765    // check if snapshot's sequence number is zero.
1766    fdb_get_kvs_info(snap_db, &kvs_info);
1767    TEST_CHK(kvs_info.last_seqnum == 0);
1768    // create an iterator on the snapshot for full range
1769    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1770    // Iterator should not return any items.
1771    status = fdb_iterator_next(iterator);
1772    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
1773    fdb_iterator_close(iterator);
1774    fdb_kvs_close(snap_db);
1775
1776    // ------- Setup test ----------------------------------
1777    // insert documents of 0-4
1778    for (i=0; i<n/4; i++){
1779        sprintf(keybuf, "key%d", i);
1780        sprintf(metabuf, "meta%d", i);
1781        sprintf(bodybuf, "body%d", i);
1782        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1783            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1784        fdb_set(db, doc[i]);
1785    }
1786
1787    // commit with a manual WAL flush (these docs go into HB-trie)
1788    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1789
1790    // insert documents from 4 - 8
1791    for (; i < n/2 - 1; i++){
1792        sprintf(keybuf, "key%d", i);
1793        sprintf(metabuf, "meta%d", i);
1794        sprintf(bodybuf, "body%d", i);
1795        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1796            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1797        fdb_set(db, doc[i]);
1798    }
1799
1800    // We want to create:
1801    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
1802    // Insert doc 9 with a different value to test duplicate elimination..
1803    sprintf(keybuf, "key%d", i);
1804    sprintf(metabuf, "meta%d", i);
1805    sprintf(bodybuf, "Body%d", i);
1806    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1807            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1808    fdb_set(db, doc[i]);
1809
1810    // commit again without a WAL flush (last doc goes into the AVL tree)
1811    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1812
1813    // Insert doc 9 now again with expected value..
1814    *(char *)doc[i]->body = 'b';
1815    fdb_set(db, doc[i]);
1816    // commit again without a WAL flush (these documents go into the AVL trees)
1817    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1818
1819    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
1820    snap_seq = doc[i]->seqnum;
1821
1822    // Initialize an in-memory snapshot Without a Commit...
1823    // WAL items are not flushed...
1824    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1825    TEST_CHK(status == FDB_RESULT_SUCCESS);
1826
1827    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1828    fdb_set(db, doc[i]);
1829    // commit again without a WAL flush (last doc goes into the AVL tree)
1830    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1831
1832    // insert documents from 10-14 into HB-trie
1833    for (++i; i < (n/2 + n/4); i++){
1834        sprintf(keybuf, "key%d", i);
1835        sprintf(metabuf, "meta%d", i);
1836        sprintf(bodybuf, "body%d", i);
1837        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1838            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1839        fdb_set(db, doc[i]);
1840    }
1841    // manually flush WAL & commit
1842    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1843
1844    status = fdb_set_log_callback(db, logCallbackFunc,
1845                                  (void *) "snapshot_clone_test");
1846    TEST_CHK(status == FDB_RESULT_SUCCESS);
1847    // ---------- Snapshot tests begin -----------------------
1848    // Attempt to take snapshot with out-of-range marker..
1849    status = fdb_snapshot_open(db, &snap_db, 999999);
1850    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
1851
1852    // Init Snapshot of open file with saved document seqnum as marker
1853    status = fdb_snapshot_open(db, &snap_db, snap_seq);
1854    TEST_CHK(status == FDB_RESULT_SUCCESS);
1855
1856    // Clone a snapshot into another snapshot...
1857    status = fdb_snapshot_open(snap_db, &snap_db2, snap_seq);
1858    TEST_CHK(status == FDB_RESULT_SUCCESS);
1859
1860    // close snapshot handle
1861    status = fdb_kvs_close(snap_db);
1862    TEST_CHK(status == FDB_RESULT_SUCCESS);
1863
1864    // check snapshot's sequence number
1865    fdb_get_kvs_info(snap_db2, &kvs_info);
1866    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1867
1868    // insert documents from 15 - 19 on file into the WAL
1869    for (; i < n; i++){
1870        sprintf(keybuf, "key%d", i);
1871        sprintf(metabuf, "meta%d", i);
1872        sprintf(bodybuf, "body%d", i);
1873        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1874            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1875        fdb_set(db, doc[i]);
1876    }
1877    // commit without a WAL flush (This WAL must not affect snapshot)
1878    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1879
1880    // Clone the in-memory snapshot into another snapshot
1881    status = fdb_snapshot_open(snap_inmem, &snap_inmem2, FDB_SNAPSHOT_INMEM);
1882    TEST_CHK(status == FDB_RESULT_SUCCESS);
1883
1884    // Stable Snapshot Scan Tests..........
1885    // Retrieve metaonly by key from snapshot
1886    i = snap_seq + 1;
1887    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1888    status = fdb_get_metaonly(snap_db2, rdoc);
1889    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1890    fdb_doc_free(rdoc);
1891
1892    i = 5;
1893    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1894    status = fdb_get_metaonly(snap_db2, rdoc);
1895    TEST_CHK(status == FDB_RESULT_SUCCESS);
1896    TEST_CMP(rdoc->key, doc[i]->key, doc[i]->keylen);
1897    TEST_CMP(rdoc->meta, doc[i]->meta, doc[i]->metalen);
1898    fdb_doc_free(rdoc);
1899
1900    // Retrieve by seq from snapshot
1901    fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
1902    rdoc->seqnum = 6;
1903    status = fdb_get_byseq(snap_db2, rdoc);
1904    TEST_CHK(status == FDB_RESULT_SUCCESS);
1905    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1906    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1907    fdb_doc_free(rdoc);
1908    rdoc = NULL;
1909
1910    // create an iterator on the snapshot for full range
1911    fdb_iterator_init(snap_db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1912
1913    // repeat until fail
1914    i=0;
1915    count=0;
1916    do {
1917        status = fdb_iterator_get(iterator, &rdoc);
1918        TEST_CHK(status == FDB_RESULT_SUCCESS);
1919        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1920        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1921        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1922
1923        fdb_doc_free(rdoc);
1924        rdoc = NULL;
1925        i ++;
1926        count++;
1927    } while (fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1928
1929    TEST_CHK(count==n/2); // Only unique items from the first half
1930
1931    fdb_iterator_close(iterator);
1932
1933    // create an iterator on the in-memory snapshot clone for full range
1934    fdb_iterator_init(snap_inmem2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1935
1936    // repeat until fail
1937    i=0;
1938    count=0;
1939    do {
1940        status = fdb_iterator_get(iterator, &rdoc);
1941        TEST_CHK(status == FDB_RESULT_SUCCESS);
1942        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1943        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1944        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1945
1946        fdb_doc_free(rdoc);
1947        rdoc = NULL;
1948        i ++;
1949        count++;
1950    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1951
1952    TEST_CHK(count==n/2); // Only unique items from the first half
1953
1954    fdb_iterator_close(iterator);
1955
1956    // close db handle
1957    fdb_kvs_close(db);
1958    // close the in-memory snapshot handle
1959    fdb_kvs_close(snap_inmem);
1960    // close the in-memory clone snapshot handle
1961    fdb_kvs_close(snap_inmem2);
1962    // close the clone snapshot handle
1963    fdb_kvs_close(snap_db2);
1964    // close db file
1965    fdb_close(dbfile);
1966
1967    // free all documents
1968    for (i=0;i<n;++i){
1969        fdb_doc_free(doc[i]);
1970    }
1971
1972    // free all resources
1973    fdb_shutdown();
1974
1975    memleak_end();
1976
1977    TEST_RESULT("snapshot clone test");
1978}
1979
1980struct parallel_clone_t {
1981    fdb_doc **doc;
1982    fdb_kvs_handle *snap_db;
1983    int num_docs;
1984};
1985
1986void *snap_clone_thread(void *args)
1987{
1988    struct parallel_clone_t *t = (struct parallel_clone_t *)args;
1989    fdb_kvs_handle *clone_db;
1990    fdb_iterator *iterator;
1991    fdb_doc *rdoc = NULL;
1992    fdb_doc **doc = t->doc;
1993    int i;
1994    int n = t->num_docs;
1995    fdb_status status;
1996    int count;
1997    TEST_INIT();
1998
1999    status = fdb_snapshot_open(t->snap_db, &clone_db, FDB_SNAPSHOT_INMEM);
2000    TEST_CHK(status == FDB_RESULT_SUCCESS);
2001
2002    // create an iterator on the in-memory snapshot clone for full range
2003    fdb_iterator_init(clone_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2004
2005    // repeat until fail
2006    i=0;
2007    count=0;
2008    do {
2009        status = fdb_iterator_get(iterator, &rdoc);
2010        TEST_CHK(status == FDB_RESULT_SUCCESS);
2011        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2012        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2013
2014        fdb_doc_free(rdoc);
2015        rdoc = NULL;
2016        i ++;
2017        count++;
2018    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
2019
2020    TEST_CHK(count==n/2); // Only unique items from the first half
2021
2022    fdb_iterator_close(iterator);
2023
2024    fdb_kvs_close(clone_db);
2025    thread_exit(0);
2026    return NULL;
2027}
2028
2029void snapshot_parallel_clone_test()
2030{
2031    TEST_INIT();
2032
2033    memleak_start();
2034
2035    int i, r;
2036    int num_cloners = 30; // parallel snapshot clone operations
2037    int n = 20480; // 10 dirty wal flushes at least
2038    fdb_file_handle *dbfile;
2039    fdb_kvs_handle *db;
2040    fdb_kvs_handle *snap_inmem;
2041    fdb_doc **doc = alca(fdb_doc*, n);
2042    thread_t *tid = alca(thread_t, num_cloners);
2043    void *thread_ret;
2044    fdb_status status;
2045    struct parallel_clone_t clone_data;
2046
2047    char keybuf[256], bodybuf[256];
2048
2049    // remove previous mvcc_test files
2050    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2051    (void)r;
2052
2053    fdb_config fconfig = fdb_get_default_config();
2054    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2055    fconfig.buffercache_size = 0;
2056    fconfig.wal_threshold = 1024;
2057    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2058    fconfig.compaction_threshold = 0;
2059
2060    // remove previous mvcc_test files
2061    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2062    (void)r;
2063
2064    // open db
2065    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2066    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2067    TEST_CHK(status == FDB_RESULT_SUCCESS);
2068
2069    status = fdb_set_log_callback(db, logCallbackFunc,
2070                                  (void *) "snapshot_parallel_clone_test");
2071    TEST_CHK(status == FDB_RESULT_SUCCESS);
2072    // ------- Setup test ----------------------------------
2073    for (i=0; i<n/2; i++){
2074        sprintf(keybuf, "key%5d", i);
2075        sprintf(bodybuf, "body%d", i);
2076        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2077            NULL, 0, (void*)bodybuf, strlen(bodybuf));
2078        fdb_set(db, doc[i]);
2079    }
2080
2081    // Initialize an in-memory snapshot Without a Commit...
2082    // WAL items are not flushed...
2083    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
2084    TEST_CHK(status == FDB_RESULT_SUCCESS);
2085
2086    clone_data.doc = doc;
2087    clone_data.num_docs = n;
2088    clone_data.snap_db = snap_inmem;
2089
2090    for (int j = num_cloners - 1; j>=0; --j) {
2091        thread_create(&tid[j], snap_clone_thread, &clone_data);
2092    }
2093
2094    for (; i < n; i++){
2095        sprintf(keybuf, "key%5d", i);
2096        sprintf(bodybuf, "BODY%d", i);
2097        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2098            NULL, 0, (void*)bodybuf, strlen(bodybuf));
2099        fdb_set(db, doc[i]);
2100    }
2101    // commit without a WAL flush (This WAL must not affect snapshot)
2102    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2103
2104    for (int j = num_cloners - 1; j>=0; --j) {
2105        thread_join(tid[j], &thread_ret);
2106    }
2107
2108    // close db handle
2109    fdb_kvs_close(db);
2110    // close the in-memory snapshot handle
2111    fdb_kvs_close(snap_inmem);
2112    // close db file
2113    fdb_close(dbfile);
2114
2115    // free all documents
2116    for (i=0;i<n;++i){
2117        fdb_doc_free(doc[i]);
2118    }
2119
2120    // free all resources
2121    fdb_shutdown();
2122
2123    memleak_end();
2124
2125    TEST_RESULT("snapshot parallel clone test");
2126}
2127
2128void snapshot_markers_in_file_test(bool multi_kv)
2129{
2130    TEST_INIT();
2131
2132    memleak_start();
2133
2134    int i, r;
2135    int n = 20;
2136    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
2137    fdb_file_handle *dbfile;
2138    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
2139    fdb_doc **doc = alca(fdb_doc*, n);
2140    fdb_status status;
2141    fdb_snapshot_info_t *markers;
2142    uint64_t num_markers;
2143
2144    char keybuf[256], metabuf[256], bodybuf[256];
2145    char kv_name[8];
2146
2147    fdb_config fconfig = fdb_get_default_config();
2148    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2149    fconfig.buffercache_size = 0;
2150    fconfig.wal_threshold = 1024;
2151    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2152    fconfig.compaction_threshold = 0;
2153    fconfig.multi_kv_instances = multi_kv;
2154    // for creating the strict number of snapshots, disable block reusing
2155    fconfig.block_reusing_threshold = 0;
2156
2157    // remove previous mvcc_test files
2158    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2159    (void)r;
2160
2161    // open db
2162    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2163    if (multi_kv) {
2164        for (r = 0; r < num_kvs; ++r) {
2165            sprintf(kv_name, "kv%d", r);
2166            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2167        }
2168    } else {
2169        num_kvs = 1;
2170        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2171    }
2172
2173   // ------- Setup test ----------------------------------
2174   // insert documents of 0-4
2175    for (i=0; i<n/4; i++){
2176        sprintf(keybuf, "key%d", i);
2177        sprintf(metabuf, "meta%d", i);
2178        sprintf(bodybuf, "body%d", i);
2179        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2180            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2181        for (r = 0; r < num_kvs; ++r) {
2182            fdb_set(db[r], doc[i]);
2183        }
2184    }
2185
2186    // commit with a manual WAL flush (these docs go into HB-trie)
2187    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2188
2189    // insert documents from 5 - 9
2190    for (; i < n/2; i++){
2191        sprintf(keybuf, "key%d", i);
2192        sprintf(metabuf, "meta%d", i);
2193        sprintf(bodybuf, "body%d", i);
2194        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2195            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2196        for (r = 0; r < num_kvs; ++r) {
2197            fdb_set(db[r], doc[i]);
2198        }
2199    }
2200
2201    // commit again without a WAL flush
2202    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2203
2204    // insert documents from 10-14 into HB-trie
2205    for (; i < (n/2 + n/4); i++){
2206        sprintf(keybuf, "key%d", i);
2207        sprintf(metabuf, "meta%d", i);
2208        sprintf(bodybuf, "body%d", i);
2209        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2210            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2211        for (r = 0; r < num_kvs; ++r) {
2212            fdb_set(db[r], doc[i]);
2213        }
2214    }
2215    // manually flush WAL & commit
2216    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2217
2218    // insert documents from 15 - 19 on file into the WAL
2219    for (; i < n; i++){
2220        sprintf(keybuf, "key%d", i);
2221        sprintf(metabuf, "meta%d", i);
2222        sprintf(bodybuf, "body%d", i);
2223        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2224            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2225        for (r = 0; r < num_kvs; ++r) {
2226            fdb_set(db[r], doc[i]);
2227        }
2228    }
2229    // commit without a WAL flush
2230    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2231
2232    for (r = 0; r < num_kvs; ++r) {
2233        status = fdb_set_log_callback(db[r], logCallbackFunc,
2234                                      (void *) "snapshot_markers_in_file");
2235        TEST_CHK(status == FDB_RESULT_SUCCESS);
2236    }
2237
2238    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2239    TEST_CHK(status == FDB_RESULT_SUCCESS);
2240
2241    if (!multi_kv) {
2242        TEST_CHK(num_markers == 4);
2243        for (r = 0; r < num_kvs; ++r) {
2244            TEST_CHK(markers[r].num_kvs_markers == 1);
2245            TEST_CHK(markers[r].kvs_markers[0].seqnum
2246                     == (fdb_seqnum_t)(n - r*5));
2247        }
2248    } else {
2249        TEST_CHK(num_markers == 8);
2250        for (r = 0; r < num_kvs; ++r) {
2251            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2252            for (i = 0; i < num_kvs; ++i) {
2253                TEST_CHK(markers[r].kvs_markers[i].seqnum
2254                         == (fdb_seqnum_t)(n - r*5));
2255                sprintf(kv_name, "kv%d", i);
2256                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2257            }
2258        }
2259    }
2260
2261    status = fdb_free_snap_markers(markers, num_markers);
2262    TEST_CHK(status == FDB_RESULT_SUCCESS);
2263
2264    // close db file
2265    fdb_close(dbfile);
2266
2267    // free all documents
2268    for (i=0;i<n;++i){
2269        fdb_doc_free(doc[i]);
2270    }
2271
2272    // free all resources
2273    fdb_shutdown();
2274
2275    memleak_end();
2276
2277    sprintf(bodybuf, "snapshot markers in file test %s", multi_kv ?
2278                                                        "multiple kv mode:"
2279                                                      : "single kv mode:");
2280    TEST_RESULT(bodybuf);
2281}
2282
2283void snapshot_without_seqtree(bool multi_kv)
2284{
2285    TEST_INIT();
2286
2287    memleak_start();
2288
2289    int i, j, r;
2290    int n = 10;
2291    int num_commits = 3, num_kvs = 3;
2292    fdb_file_handle *dbfile;
2293    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
2294    fdb_doc **doc = alca(fdb_doc*, n);
2295    fdb_status status;
2296    fdb_snapshot_info_t *markers;
2297    uint64_t num_markers;
2298
2299    char keybuf[256], metabuf[256], bodybuf[256];
2300    char kv_name[8];
2301
2302    fdb_config fconfig = fdb_get_default_config();
2303    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2304    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2305    fconfig.multi_kv_instances = multi_kv;
2306    // for creating the strict number of snapshots, disable block reusing
2307    fconfig.block_reusing_threshold = 0;
2308    fconfig.seqtree_opt = FDB_SEQTREE_NOT_USE;
2309
2310    // remove previous mvcc_test files
2311    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2312    (void)r;
2313
2314    // open db
2315    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2316    if (multi_kv) {
2317        for (r = 0; r < num_kvs; ++r) {
2318            sprintf(kv_name, "kv%d", r);
2319            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2320        }
2321    } else {
2322        num_kvs = 1;
2323        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2324    }
2325
2326    // Load documents mulitple times
2327    for (j=0; j < num_commits; ++j) {
2328        for (i=0; i<n; i++){
2329            sprintf(keybuf, "key%d", i);
2330            sprintf(metabuf, "meta-%d-%d", j, i);
2331            sprintf(bodybuf, "body-%d-%d", j, i);
2332            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2333                           (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2334            for (r = 0; r < num_kvs; ++r) {
2335                fdb_set(db[r], doc[i]);
2336            }
2337            fdb_doc_free(doc[i]);
2338        }
2339        if (j % 2 == 0) {
2340            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2341        } else {
2342            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2343        }
2344    }
2345
2346    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2347    TEST_CHK(status == FDB_RESULT_SUCCESS);
2348
2349    if (!multi_kv) {
2350        TEST_CHK(num_markers == 3);
2351        for (r = 0; r < num_kvs; ++r) {
2352            TEST_CHK(markers[r].num_kvs_markers == 1);
2353            TEST_CHK(markers[r].kvs_markers[0].seqnum
2354                     == (fdb_seqnum_t) (n * (num_commits - r)));
2355        }
2356    } else {
2357        TEST_CHK(num_markers == 6);
2358        for (r = 0; r < num_kvs; ++r) {
2359            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2360            for (i = 0; i < num_kvs; ++i) {
2361                TEST_CHK(markers[r].kvs_markers[i].seqnum
2362                         == (fdb_seqnum_t) (n * (num_commits - r)));
2363                sprintf(kv_name, "kv%d", i);
2364                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2365            }
2366        }
2367    }
2368
2369    fdb_kvs_handle *snap_db;
2370    fdb_iterator *iterator;
2371    fdb_doc *rdoc = NULL;
2372    // Open the snapshot with the snapshot markers from the second commit
2373    for (r = 0; r < num_kvs; ++r) {
2374        status = fdb_snapshot_open(db[r], &snap_db, markers[1].kvs_markers[r].seqnum);
2375        TEST_CHK(status == FDB_RESULT_SUCCESS);
2376
2377        // Verify all the keys in the snapshot
2378        status = fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2379        TEST_CHK(status == FDB_RESULT_SUCCESS);
2380        i=0;
2381        do {
2382            status = fdb_iterator_get(iterator, &rdoc);
2383            TEST_CHK(status == FDB_RESULT_SUCCESS);
2384            sprintf(keybuf, "key%d", i);
2385            sprintf(metabuf, "meta-1-%d", i);
2386            sprintf(bodybuf, "body-1-%d", i);
2387            TEST_CMP(rdoc->key, keybuf, rdoc->keylen);
2388            TEST_CMP(rdoc->meta, metabuf, rdoc->metalen);
2389            TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2390            fdb_doc_free(rdoc);
2391            rdoc = NULL;
2392            i++;
2393        } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2394        TEST_CHK(i==n);
2395        status = fdb_iterator_close(iterator);
2396        TEST_CHK(status == FDB_RESULT_SUCCESS);
2397
2398        // Verify that a sequence-based iteration is not allowed.
2399        status = fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
2400        TEST_CHK(status != FDB_RESULT_SUCCESS);
2401
2402        status = fdb_kvs_close(snap_db);
2403        TEST_CHK(status == FDB_RESULT_SUCCESS);
2404    }
2405
2406    status = fdb_free_snap_markers(markers, num_markers);
2407    TEST_CHK(status == FDB_RESULT_SUCCESS);
2408
2409    // close db file
2410    fdb_close(dbfile);
2411
2412    // free all resources
2413    fdb_shutdown();
2414
2415    memleak_end();
2416
2417    sprintf(bodybuf, "snapshot without seqtree in %s",
2418            multi_kv ? "multiple kv mode:" : "single kv mode:");
2419    TEST_RESULT(bodybuf);
2420}
2421
2422void snapshot_with_deletes_test()
2423{
2424    TEST_INIT();
2425
2426    memleak_start();
2427
2428    int i, r;
2429    int n = 10;
2430    fdb_file_handle *dbfile;
2431    fdb_kvs_handle *db;
2432    fdb_kvs_handle *snap_db;
2433    fdb_doc **doc = alca(fdb_doc*, n);
2434    fdb_doc *rdoc = NULL;
2435    fdb_kvs_info kvs_info;
2436    fdb_status status;
2437    fdb_iterator *iterator;
2438
2439    char keybuf[256], metabuf[256], bodybuf[256];
2440
2441    // remove previous mvcc_test files
2442    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2443    (void)r;
2444
2445    fdb_config fconfig = fdb_get_default_config();
2446    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2447    fconfig.buffercache_size = 0;
2448
2449    // remove previous mvcc_test files
2450    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2451    (void)r;
2452
2453    // open db
2454    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2455    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2456    TEST_CHK(status == FDB_RESULT_SUCCESS);
2457
2458    status = fdb_set_log_callback(db, logCallbackFunc,
2459                                  (void *) "snapshot_with_deletes_test");
2460    TEST_CHK(status == FDB_RESULT_SUCCESS);
2461
2462    // ------- Setup test ----------------------------------
2463    // insert even documents into main index
2464    for (i=0; i<n; i = i + 2){
2465        sprintf(keybuf, "key%d", i);
2466        sprintf(metabuf, "meta%d", i);
2467        sprintf(bodybuf, "body%d", i);
2468        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2469            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2470        fdb_set(db, doc[i]);
2471    }
2472
2473    i = 7;
2474    sprintf(keybuf, "key%d", i);
2475    fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2476
2477    // commit with a manual WAL flush (these docs go into HB-trie)
2478    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2479
2480    // let odd documents be in WAL section
2481    for (i = 1; i < n; i = i + 2){
2482        sprintf(keybuf, "key%d", i);
2483        sprintf(metabuf, "meta%d", i);
2484        sprintf(bodybuf, "body%d", i);
2485        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2486            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2487        fdb_set(db, doc[i]);
2488    }
2489
2490    // Delete WAL doc 7
2491    i = 7;
2492    fdb_del(db, doc[i]);
2493
2494    // Create an iterator on the live DB over full range
2495    fdb_iterator_init(db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2496
2497    // attempt to seek to the deleted key7
2498    status = fdb_iterator_seek(iterator, doc[i]->key, doc[i]->keylen,
2499                               FDB_ITR_SEEK_HIGHER);
2500    TEST_CHK(status == FDB_RESULT_SUCCESS);
2501    status = fdb_iterator_get(iterator, &rdoc);
2502    TEST_CHK(status == FDB_RESULT_SUCCESS);
2503
2504    i = 8; // The next higher document must be returned
2505    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2506    TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2507    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2508
2509    fdb_doc_free(rdoc);
2510    rdoc = NULL;
2511
2512    fdb_iterator_close(iterator);
2513
2514    // Open an in-memory snapshot over live DB
2515    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
2516    TEST_CHK(status == FDB_RESULT_SUCCESS);
2517
2518    // check snapshot's sequence number
2519    fdb_get_kvs_info(snap_db, &kvs_info);
2520    TEST_CHK(kvs_info.last_seqnum == (fdb_seqnum_t)n+2);
2521    TEST_CHK(kvs_info.deleted_count == 1);
2522
2523    // re-create an iterator on the snapshot for full range
2524    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2525
2526    i = 7;
2527    // attempt to seek to the deleted key7
2528    status = fdb_iterator_seek(iterator, doc[i]->key, doc[i]->keylen,
2529                               FDB_ITR_SEEK_HIGHER);
2530    TEST_CHK(status == FDB_RESULT_SUCCESS);
2531    status = fdb_iterator_get(iterator, &rdoc);
2532    TEST_CHK(status == FDB_RESULT_SUCCESS);
2533
2534    i = 8; // The next higher document must be returned
2535    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2536    TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2537    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2538
2539    fdb_doc_free(rdoc);
2540    rdoc = NULL;
2541
2542    fdb_iterator_close(iterator);
2543
2544    // close db handle
2545    fdb_kvs_close(db);
2546    // close snapshot handle
2547    fdb_kvs_close(snap_db);
2548    // close db file
2549    fdb_close(dbfile);
2550
2551    // free all documents
2552    for (i=0;i<n;++i){
2553        fdb_doc_free(doc[i]);
2554    }
2555
2556    // free all resources
2557    fdb_shutdown();
2558
2559    memleak_end();
2560
2561    TEST_RESULT("snapshot with deletes test");
2562}
2563
2564void rollback_forward_seqnum()
2565{
2566
2567    TEST_INIT();
2568    memleak_start();
2569
2570    int r;
2571    int i, n=100;
2572    int rb1_seqnum, rb2_seqnum;
2573    char keybuf[256];
2574    char setop[3];
2575    fdb_file_handle *dbfile;
2576    fdb_iterator *it;
2577    fdb_kvs_handle *kv1, *mirror_kv1;
2578    fdb_kvs_info info;
2579    fdb_config fconfig = fdb_get_default_config();
2580    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2581    fdb_doc **doc = alca(fdb_doc*, n+1);
2582    fdb_doc *rdoc = NULL;
2583    fdb_status status;
2584    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2585    (void)r;
2586
2587    fconfig.wal_threshold = n;
2588    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
2589    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2590    fconfig.compaction_threshold = 0;
2591    fconfig.purging_interval = 5;
2592    fconfig.block_reusing_threshold = 0;
2593
2594    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2595    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2596    fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
2597
2598
2599    // set n docs within both dbs
2600    for(i=0;i<=n;++i){
2601        sprintf(keybuf, "key%d", i);
2602        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2603            NULL, 0, NULL, 0);
2604        fdb_set(kv1, doc[i]);
2605        fdb_set_kv(mirror_kv1, keybuf, strlen(keybuf), setop, 3);
2606    } // last set should have caused a wal flush
2607
2608    fdb_del(kv1, doc[n]);
2609
2610    // commit and save seqnum1
2611    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2612    fdb_get_kvs_info(kv1, &info);
2613    rb1_seqnum = info.last_seqnum;
2614
2615    // delete all docs in kv1
2616    for(i=0;i<n;++i){
2617        fdb_del(kv1, doc[i]);
2618    }
2619
2620    // commit and save seqnum2
2621    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2622    fdb_get_kvs_info(kv1, &info);
2623    rb2_seqnum = info.last_seqnum;
2624
2625    // sets again
2626    for(i=0;i<n;++i){
2627        doc[i]->deleted = false;
2628        fdb_set(kv1, doc[i]);
2629    }
2630
2631    // commit
2632    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2633
2634    // rollback to first seqnum
2635    status = fdb_rollback(&kv1, rb1_seqnum);
2636    TEST_CHK(status == FDB_RESULT_SUCCESS);
2637
2638    fdb_get_kvs_info(kv1, &info);
2639    TEST_CHK(info.deleted_count == 1);
2640
2641    // rollback to second seqnum
2642    status = fdb_rollback(&kv1, rb2_seqnum);
2643    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2644
2645    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
2646    TEST_CHK(status == FDB_RESULT_SUCCESS);
2647
2648    do {
2649        status = fdb_iterator_get(it, &rdoc);
2650        TEST_CHK(status == FDB_RESULT_SUCCESS);
2651        if (rdoc->seqnum != (uint64_t)n+1) {
2652            status = fdb_get_metaonly_byseq(kv1, rdoc);
2653            TEST_CHK(status == FDB_RESULT_SUCCESS);
2654            TEST_CHK(rdoc->deleted == false);
2655        } else {
2656            status = fdb_get_metaonly(kv1, rdoc);
2657            TEST_CHK(status == FDB_RESULT_SUCCESS);
2658            TEST_CHK(rdoc->deleted == true);
2659        }
2660        fdb_doc_free(rdoc);
2661        rdoc = NULL;
2662    } while(fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
2663
2664    for (i=0;i<=n;++i){
2665        fdb_doc_free(doc[i]);
2666    }
2667    fdb_iterator_close(it);
2668    fdb_kvs_close(kv1);
2669    fdb_close(dbfile);
2670    fdb_shutdown();
2671    memleak_end();
2672
2673    TEST_RESULT("rollback forward seqnum");
2674}
2675
2676void rollback_test(bool multi_kv)
2677{
2678    TEST_INIT();
2679
2680    memleak_start();
2681
2682    int i, r;
2683    int n = 20;
2684    int count;
2685    fdb_file_handle *dbfile, *dbfile_txn;
2686    fdb_kvs_handle *db, *db_txn;
2687    fdb_seqnum_t rollback_seq;
2688    fdb_doc **doc = alca(fdb_doc*, n);
2689    fdb_doc *rdoc = NULL;
2690    fdb_kvs_info kvs_info;
2691    fdb_status status;
2692    fdb_iterator *iterator;
2693
2694    char keybuf[256], metabuf[256], bodybuf[256];
2695
2696    // remove previous mvcc_test files
2697    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2698    (void)r;
2699
2700    fdb_config fconfig = fdb_get_default_config();
2701    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2702    fconfig.buffercache_size = 0;
2703    fconfig.seqtree_opt = FDB_SEQTREE_USE; // enable seqtree since get_byseq
2704    fconfig.wal_threshold = 1024;
2705    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2706    fconfig.compaction_threshold = 0;
2707    fconfig.multi_kv_instances = multi_kv;
2708
2709    // remove previous mvcc_test files
2710    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2711    (void)r;
2712
2713    // open db
2714    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2715    if (multi_kv) {
2716        fdb_kvs_open_default(dbfile, &db, &kvs_config);
2717    } else {
2718        fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2719    }
2720
2721   // ------- Setup test ----------------------------------
2722   // insert documents of 0-4
2723    for (i=0; i<n/4; i++){
2724        sprintf(keybuf, "key%d", i);
2725        sprintf(metabuf, "meta%d", i);
2726        sprintf(bodybuf, "body%d", i);
2727        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2728            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2729        fdb_set(db, doc[i]);
2730    }
2731
2732    // commit with a manual WAL flush (these docs go into HB-trie)
2733    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2734
2735    // insert documents from 4 - 9
2736    for (; i < n/2; i++){
2737        sprintf(keybuf, "key%d", i);
2738        sprintf(metabuf, "meta%d", i);
2739        sprintf(bodybuf, "body%d", i);
2740        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2741            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2742        fdb_set(db, doc[i]);
2743    }
2744
2745    // commit again without a WAL flush
2746    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2747
2748    // ROLLBACK POINT: pick up sequence number of a commit without a WAL flush
2749    rollback_seq = doc[i-1]->seqnum;
2750
2751    // insert documents from 10-14 into HB-trie
2752    for (; i < (n/2 + n/4); i++){
2753        sprintf(keybuf, "key%d", i);
2754        sprintf(metabuf, "meta%d", i);
2755        sprintf(bodybuf, "body%d", i);
2756        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2757            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2758        fdb_set(db, doc[i]);
2759    }
2760    // manually flush WAL & commit
2761    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2762
2763    // insert documents from 15 - 19 on file into the WAL
2764    for (; i < n; i++){
2765        sprintf(keybuf, "key%d", i);
2766        sprintf(metabuf, "meta%d", i);
2767        sprintf(bodybuf, "body%d", i);
2768        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2769            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2770        fdb_set(db, doc[i]);
2771    }
2772    // commit without a WAL flush
2773    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2774
2775    status = fdb_set_log_callback(db, logCallbackFunc,
2776                                  (void *) "rollback_test");
2777    TEST_CHK(status == FDB_RESULT_SUCCESS);
2778
2779    // ---------- Rollback tests begin -----------------------
2780    // We have DB file with 5 HB-trie docs, 5 unflushed WAL docs occuring twice
2781    // Attempt to rollback to out-of-range marker..
2782    status = fdb_rollback(&db, 999999);
2783    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2784
2785    // Open another handle & begin transaction
2786    fdb_open(&dbfile_txn, "./mvcc_test1", &fconfig);
2787    if (multi_kv) {
2788        fdb_kvs_open_default(dbfile_txn, &db_txn, &kvs_config);
2789    } else {
2790        fdb_kvs_open(dbfile_txn, &db_txn, NULL, &kvs_config);
2791    }
2792    fdb_begin_transaction(dbfile_txn, FDB_ISOLATION_READ_COMMITTED);
2793    // Attempt to rollback while the transaction is active
2794    status =  fdb_rollback(&db, rollback_seq);
2795    // Must fail
2796    TEST_CHK(status == FDB_RESULT_FAIL_BY_TRANSACTION);
2797    fdb_abort_transaction(dbfile_txn);
2798    fdb_kvs_close(db_txn);
2799    fdb_close(dbfile_txn);
2800
2801    // Rollback to saved marker from above
2802    status = fdb_rollback(&db, rollback_seq);
2803    TEST_CHK(status == FDB_RESULT_SUCCESS);
2804
2805    // check handle's sequence number
2806    fdb_get_kvs_info(db, &kvs_info);
2807    TEST_CHK(kvs_info.last_seqnum == rollback_seq);
2808
2809    // Modify an item and update into the rollbacked file..
2810    i = n/2;
2811    *(char *)doc[i]->body = 'B';
2812    fdb_set(db, doc[i]);
2813    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2814
2815    // create an iterator on the rollback for full range
2816    fdb_iterator_sequence_init(db, &iterator, 0, 0, FDB_ITR_NONE);
2817
2818    // repeat until fail
2819    i=0;
2820    count=0;
2821    do {
2822        status = fdb_iterator_get(iterator, &rdoc);
2823        TEST_CHK(status == FDB_RESULT_SUCCESS);
2824
2825        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2826        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2827        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2828
2829        fdb_doc_free(rdoc);
2830        rdoc = NULL;
2831        i ++;
2832        count++;
2833    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2834
2835    TEST_CHK(count==n/2 + 1); // Items from first half and newly set item
2836
2837    fdb_iterator_close(iterator);
2838
2839    // close db file
2840    fdb_kvs_close(db);
2841    fdb_close(dbfile);
2842
2843    // free all documents
2844    for (i=0;i<n;++i){
2845        fdb_doc_free(doc[i]);
2846    }
2847
2848    // free all resources
2849    fdb_shutdown();
2850
2851    memleak_end();
2852
2853    sprintf(bodybuf, "rollback test %s", multi_kv ? "multiple kv mode:"
2854                                                  : "single kv mode:");
2855    TEST_RESULT(bodybuf);
2856}
2857
2858
2859
2860void rollback_and_snapshot_test()
2861{
2862    TEST_INIT();
2863
2864    memleak_start();
2865
2866    fdb_seqnum_t seqnum, rollback_seqnum;
2867    fdb_kvs_info kvs_info;
2868    fdb_status status;
2869    fdb_config config;
2870    fdb_kvs_config kvs_config;
2871    fdb_file_handle *dbfile;
2872    fdb_kvs_handle *db,  *snapshot;
2873    int r;
2874
2875    // remove previous mvcc_test files
2876    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2877    (void)r;
2878
2879    // MB-12530 open db
2880    config = fdb_get_default_config();
2881    kvs_config = fdb_get_default_kvs_config();
2882    status = fdb_open(&dbfile, "mvcc_test", &config);
2883    TEST_CHK(status == FDB_RESULT_SUCCESS);
2884    status = fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2885    TEST_CHK(status == FDB_RESULT_SUCCESS);
2886
2887    // 2. Create Key 'a' and Commit
2888    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
2889    TEST_CHK(status == FDB_RESULT_SUCCESS);
2890    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2891    TEST_CHK(status == FDB_RESULT_SUCCESS);
2892
2893    status = fdb_get_kvs_info(db, &kvs_info);
2894    TEST_CHK(status == FDB_RESULT_SUCCESS);
2895
2896    // 3. Create Key 'b' and Commit
2897    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
2898    TEST_CHK(status == FDB_RESULT_SUCCESS);
2899    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2900    TEST_CHK(status == FDB_RESULT_SUCCESS);
2901
2902    status = fdb_get_kvs_info(db, &kvs_info);
2903    TEST_CHK(status == FDB_RESULT_SUCCESS);
2904    seqnum = kvs_info.last_seqnum;
2905
2906    // 4.  Remember this as our rollback point
2907    rollback_seqnum = seqnum;
2908
2909    // 5. Create Key 'c' and Commit
2910    status = fdb_set_kv(db, (void *)"c", 1,(void *) "val-c", 5);
2911    TEST_CHK(status == FDB_RESULT_SUCCESS);
2912    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2913    TEST_CHK(status == FDB_RESULT_SUCCESS);
2914
2915    status = fdb_get_kvs_info(db, &kvs_info);
2916    TEST_CHK(status == FDB_RESULT_SUCCESS);
2917
2918    // 6. Rollback to rollback point (seq 2)
2919    status = fdb_rollback(&db, rollback_seqnum);
2920    TEST_CHK(status == FDB_RESULT_SUCCESS);
2921
2922    status = fdb_get_kvs_info(db, &kvs_info);
2923    TEST_CHK(status == FDB_RESULT_SUCCESS);
2924    seqnum = kvs_info.last_seqnum;
2925
2926    // 7. Verify that Key 'c' is not found
2927    void *val;
2928    size_t vallen;
2929    status = fdb_get_kv(db, (void *)"c", 1, &val, &vallen);
2930    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2931
2932    // 8. Open a snapshot at the same point
2933    status = fdb_snapshot_open(db, &snapshot, seqnum);
2934    TEST_CHK(status == FDB_RESULT_SUCCESS);
2935
2936    // 9. Verify that Key 'c' is not found
2937    status = fdb_get_kv(snapshot, (void *)"c", 1, &val, &vallen);
2938    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2939
2940    // close the snapshot db
2941    fdb_kvs_close(snapshot);
2942
2943    // close db file
2944    fdb_close(dbfile);
2945
2946    // free all resources
2947    fdb_shutdown();
2948
2949    memleak_end();
2950
2951    TEST_RESULT("rollback and snapshot test");
2952}
2953
2954void rollback_ncommits()
2955{
2956
2957    TEST_INIT();
2958    memleak_start();
2959
2960    int r;
2961    int i, j, n=100;
2962    int ncommits=10;
2963    char keybuf[256];
2964    fdb_file_handle *dbfile;
2965    fdb_kvs_handle *kv1, *kv2;
2966    fdb_kvs_info info;
2967    fdb_config fconfig = fdb_get_default_config();
2968    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2969    fdb_status status;
2970    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2971    (void)r;
2972
2973    fconfig.wal_threshold = 1024;
2974    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2975    fconfig.compaction_threshold = 0;
2976    fconfig.block_reusing_threshold = 0;
2977
2978    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2979    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2980    fdb_kvs_open(dbfile, &kv2, NULL, &kvs_config);
2981
2982
2983    for(j=0;j<ncommits;++j){
2984
2985        // set n docs per commit
2986        for(i=0;i<n;++i){
2987            sprintf(keybuf, "key%02d%03d", j, i);
2988            fdb_set_kv(kv1, keybuf, strlen(keybuf), NULL, 0);
2989            fdb_set_kv(kv2, keybuf, strlen(keybuf), NULL, 0);
2990        }
2991        // alternate commit pattern
2992        if((j % 2) == 0){
2993            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2994        } else {
2995            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2996        }
2997
2998        // doc_count should match seqnum since they are unique
2999        fdb_get_kvs_info(kv1, &info);
3000        TEST_CHK(info.doc_count == info.last_seqnum);
3001    }
3002
3003    // iteratively rollback 5 commits
3004     for(j=ncommits;j>0;--j){
3005        status = fdb_rollback(&kv1, j*n);
3006        TEST_CHK(status == FDB_RESULT_SUCCESS);
3007
3008        // check rollback doc_count
3009        fdb_get_kvs_info(kv1, &info);
3010        TEST_CHK(info.doc_count == info.last_seqnum);
3011    }
3012
3013    fdb_kvs_close(kv1);
3014    fdb_close(dbfile);
3015    fdb_shutdown();
3016    memleak_end();
3017
3018    TEST_RESULT("rollback n commits");
3019}
3020
3021void transaction_test()
3022{
3023    TEST_INIT();
3024
3025    memleak_start();
3026
3027    int i, r;
3028    int n = 10;
3029    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2, *dbfile_txn3;
3030    fdb_kvs_handle *db, *db_txn1, *db_txn2, *db_txn3;
3031    fdb_doc **doc = alca(fdb_doc*, n);
3032    fdb_doc *rdoc;
3033    fdb_status status;
3034
3035    char keybuf[256], metabuf[256], bodybuf[256];
3036
3037    // remove previous mvcc_test files
3038    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3039    (void)r;
3040
3041    fdb_config fconfig = fdb_get_default_config();
3042    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3043    fconfig.buffercache_size = 0;
3044    fconfig.wal_threshold = 1024;
3045    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3046    fconfig.purging_interval = 0;
3047    fconfig.compaction_threshold = 0;
3048
3049    // open db
3050    fdb_open(&dbfile, "mvcc_test1", &fconfig);
3051    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3052    status = fdb_set_log_callback(db, logCallbackFunc,
3053                                  (void *) "transaction_test");
3054    TEST_CHK(status == FDB_RESULT_SUCCESS);
3055
3056    // open db and begin transactions
3057    fdb_open(&dbfile_txn1, "mvcc_test1", &fconfig);
3058    fdb_open(&dbfile_txn2, "mvcc_test1", &fconfig);
3059    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
3060    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
3061    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3062    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3063
3064    // insert half docs into txn1
3065    for (i=0;i<n/2;++i){
3066        sprintf(keybuf, "key%d", i);
3067        sprintf(metabuf, "meta%d", i);
3068        sprintf(bodybuf, "body%d", i);
3069        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3070                                (void*)metabuf, strlen(metabuf),
3071                                (void*)bodybuf, strlen(bodybuf));
3072        fdb_set(db_txn1, doc[i]);
3073    }
3074
3075    // insert other half docs into txn2
3076    for (i=n/2;i<n;++i){
3077        sprintf(keybuf, "key%d", i);
3078        sprintf(metabuf, "meta%d", i);
3079        sprintf(bodybuf, "body%d", i);
3080        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3081                                (void*)metabuf, strlen(metabuf),
3082                                (void*)bodybuf, strlen(bodybuf));
3083        fdb_set(db_txn2, doc[i]);
3084    }
3085
3086    // uncommitted docs should not be read by the other transaction that
3087    // doesn't allow uncommitted reads.
3088    for (i=0;i<n;++i){
3089        sprintf(keybuf, "key%d", i);
3090        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3091        status = fdb_get(db_txn1, rdoc);
3092        if (i<n/2) {
3093            TEST_CHK(status == FDB_RESULT_SUCCESS);
3094        } else {
3095            TEST_CHK(status != FDB_RESULT_SUCCESS);
3096        }
3097        fdb_doc_free(rdoc);
3098    }
3099
3100    // uncommitted docs can be read by the transaction that allows uncommitted reads.
3101    fdb_open(&dbfile_txn3, "mvcc_test1", &fconfig);
3102    fdb_kvs_open_default(dbfile_txn3, &db_txn3, &kvs_config);
3103    fdb_begin_transaction(dbfile_txn3, FDB_ISOLATION_READ_UNCOMMITTED);
3104    for (i=0;i<n;++i){
3105        sprintf(keybuf, "key%d", i);
3106        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3107        status = fdb_get(db_txn3, rdoc);
3108        TEST_CHK(status == FDB_RESULT_SUCCESS);
3109        fdb_doc_free(rdoc);
3110    }
3111    fdb_end_transaction(dbfile_txn3, FDB_COMMIT_NORMAL);
3112    fdb_close(dbfile_txn3);
3113
3114    // commit and end txn1
3115    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3116
3117    // uncommitted docs should not be read generally
3118    for (i=0;i<n;++i){
3119        sprintf(keybuf, "key%d", i);
3120        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3121        status = fdb_get(db, rdoc);
3122        if (i<n/2) {
3123            TEST_CHK(status == FDB_RESULT_SUCCESS);
3124        } else {
3125            TEST_CHK(status != FDB_RESULT_SUCCESS);
3126        }
3127        fdb_doc_free(rdoc);
3128    }
3129
3130    // read uncommitted docs using the same transaction
3131    for (i=0;i<n;++i){
3132        sprintf(keybuf, "key%d", i);
3133        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3134        status = fdb_get(db_txn2, rdoc);
3135        TEST_CHK(status == FDB_RESULT_SUCCESS);
3136        fdb_doc_free(rdoc);
3137    }
3138
3139    // abort txn2
3140    fdb_abort_transaction(dbfile_txn2);
3141
3142    // close & re-open db file
3143    fdb_close(dbfile);
3144    fdb_open(&dbfile, "mvcc_test1", &fconfig);
3145    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3146    status = fdb_set_log_callback(db, logCallbackFunc,
3147                                  (void *) "transaction_test");
3148    TEST_CHK(status == FDB_RESULT_SUCCESS);
3149
3150    // uncommitted docs should not be read after reopening the file either
3151    for (i=0;i<n;++i){
3152        sprintf(keybuf, "key%d", i);
3153        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3154        status = fdb_get(db, rdoc);
3155        if (i<n/2) {
3156            TEST_CHK(status == FDB_RESULT_SUCCESS);
3157        } else {
3158            TEST_CHK(status != FDB_RESULT_SUCCESS);
3159        }
3160        fdb_doc_free(rdoc);
3161    }
3162
3163    // insert other half docs & commit
3164    for (i=n/2;i<n;++i){
3165        fdb_set(db, doc[i]);
3166    }
3167    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3168
3169    // now all docs can be read generally
3170    for (i=0;i<n;++i){
3171        sprintf(keybuf, "key%d", i);
3172        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3173        status = fdb_get(db, rdoc);
3174        TEST_CHK(status == FDB_RESULT_SUCCESS);
3175        fdb_doc_free(rdoc);
3176    }
3177
3178    // begin transactions
3179    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3180    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3181
3182    // concurrently update docs
3183    for (i=0;i<n;++i){
3184        sprintf(keybuf, "key%d", i);
3185        sprintf(metabuf, "meta%d", i);
3186        sprintf(bodybuf, "body%d_txn1", i);
3187        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3188                                (void*)metabuf, strlen(metabuf),
3189                                (void*)bodybuf, strlen(bodybuf));
3190        fdb_set(db_txn1, rdoc);
3191        fdb_doc_free(rdoc);
3192
3193        sprintf(bodybuf, "body%d_txn2", i);
3194        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3195                                (void*)metabuf, strlen(metabuf),
3196                                (void*)bodybuf, strlen(bodybuf));
3197        fdb_set(db_txn2, rdoc);
3198        fdb_doc_free(rdoc);
3199    }
3200
3201    // retrieve check
3202    for (i=0;i<n;++i){
3203        // general retrieval
3204        sprintf(keybuf, "key%d", i);
3205        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3206        status = fdb_get(db, rdoc);
3207        TEST_CHK(status == FDB_RESULT_SUCCESS);
3208        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
3209        fdb_doc_free(rdoc);
3210
3211        // get from txn1
3212        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3213        status = fdb_get(db_txn1, rdoc);
3214        TEST_CHK(status == FDB_RESULT_SUCCESS);
3215        sprintf(bodybuf, "body%d_txn1", i);
3216        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3217        fdb_doc_free(rdoc);
3218
3219        // get from txn2
3220        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3221        status = fdb_get(db_txn2, rdoc);
3222        TEST_CHK(status == FDB_RESULT_SUCCESS);
3223        sprintf(bodybuf, "body%d_txn2", i);
3224        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3225        fdb_doc_free(rdoc);
3226    }
3227
3228    // commit txn2 & retrieve check
3229    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
3230    for (i=0;i<n;++i){
3231        // general retrieval
3232        sprintf(keybuf, "key%d", i);
3233        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3234        status = fdb_get(db, rdoc);
3235        TEST_CHK(status == FDB_RESULT_SUCCESS);
3236        sprintf(bodybuf, "body%d_txn2", i);
3237        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3238        fdb_doc_free(rdoc);
3239
3240        // get from txn1
3241        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3242        status = fdb_get(db_txn1, rdoc);
3243        TEST_CHK(status == FDB_RESULT_SUCCESS);
3244        sprintf(bodybuf, "body%d_txn1", i);
3245        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3246        fdb_doc_free(rdoc);
3247    }
3248
3249    // commit txn1 & retrieve check
3250    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3251    for (i=0;i<n;++i){
3252        // general retrieval
3253        sprintf(keybuf, "key%d", i);
3254        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3255        status = fdb_get(db, rdoc);
3256        TEST_CHK(status == FDB_RESULT_SUCCESS);
3257        sprintf(bodybuf, "body%d_txn1", i);
3258        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3259        fdb_doc_free(rdoc);
3260    }
3261
3262    // begin new transaction
3263    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3264    // update doc#5
3265    i = 5;
3266    sprintf(keybuf, "key%d", i);
3267    sprintf(metabuf, "meta%d", i);
3268    sprintf(bodybuf, "body%d_before_compaction", i);
3269    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
3270                            (void*)metabuf, strlen(metabuf),
3271                            (void*)bodybuf, strlen(bodybuf));
3272    fdb_set(db_txn1, rdoc);
3273    fdb_doc_free(rdoc);
3274
3275    // do compaction
3276    fdb_compact(dbfile, "mvcc_test2");
3277
3278    // retrieve doc#5
3279    // using txn1
3280    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3281    status = fdb_get(db_txn1, rdoc);
3282    TEST_CHK(status == FDB_RESULT_SUCCESS);
3283    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3284    fdb_doc_free(rdoc);
3285
3286    // general retrieval
3287    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3288    status = fdb_get(db, rdoc);
3289    TEST_CHK(status == FDB_RESULT_SUCCESS);
3290    sprintf(bodybuf, "body%d_txn1", i);
3291    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3292    fdb_doc_free(rdoc);
3293
3294    // commit transaction
3295    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3296    // retrieve check
3297    for (i=0;i<n;++i){
3298        // general get
3299        sprintf(keybuf, "key%d", i);
3300        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3301        status = fdb_get(db, rdoc);
3302        TEST_CHK(status == FDB_RESULT_SUCCESS);
3303        if (i != 5) {
3304            sprintf(bodybuf, "body%d_txn1", i);
3305        } else {
3306            sprintf(bodybuf, "body%d_before_compaction", i);
3307        }
3308        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3309        fdb_doc_free(rdoc);
3310    }
3311
3312    // close & re-open db file
3313    fdb_close(dbfile);
3314    fdb_open(&dbfile, "mvcc_test2", &fconfig);
3315    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3316    status = fdb_set_log_callback(db, logCallbackFunc,
3317                                  (void *) "transaction_test");
3318    TEST_CHK(status == FDB_RESULT_SUCCESS);
3319    // retrieve check again
3320    for (i=0;i<n;++i){
3321        // general get
3322        sprintf(keybuf, "key%d", i);
3323        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
3324        status = fdb_get(db, rdoc);
3325        TEST_CHK(status == FDB_RESULT_SUCCESS);
3326        if (i != 5) {
3327            sprintf(bodybuf, "body%d_txn1", i);
3328        } else {
3329            sprintf(bodybuf, "body%d_before_compaction", i);
3330        }
3331        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
3332        fdb_doc_free(rdoc);
3333    }
3334
3335    // close db file
3336    fdb_close(dbfile);
3337    fdb_close(dbfile_txn1);
3338    fdb_close(dbfile_txn2);
3339
3340    // free all documents
3341    for (i=0;i<n;++i){
3342        fdb_doc_free(doc[i]);
3343    }
3344
3345    // free all resources
3346    fdb_shutdown();
3347
3348    memleak_end();
3349
3350    TEST_RESULT("transaction test");
3351}
3352
3353void transaction_simple_api_test()
3354{
3355    TEST_INIT();
3356
3357    memleak_start();
3358
3359    int i, r;
3360    int n = 10;
3361    size_t valuelen;
3362    void *value;
3363    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2;
3364    fdb_kvs_handle *db, *db_txn1, *db_txn2;
3365    fdb_status status;
3366
3367    char keybuf[256], bodybuf[256];
3368
3369    // remove previous mvcc_test files
3370    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3371    (void)r;
3372
3373    fdb_config fconfig = fdb_get_default_config();
3374    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3375    fconfig.buffercache_size = 0;
3376    fconfig.wal_threshold = 1024;
3377    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3378    fconfig.purging_interval = 0;
3379    fconfig.compaction_threshold = 0;
3380
3381    // open db
3382    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3383    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3384    status = fdb_set_log_callback(db, logCallbackFunc,
3385                                  (void *) "transaction_simple_api_test");
3386    TEST_CHK(status == FDB_RESULT_SUCCESS);
3387
3388    // insert key-value pairs
3389    for (i=0;i<n;++i){
3390        sprintf(keybuf, "key%d", i);
3391        sprintf(bodybuf, "body%d", i);
3392        status = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3393        TEST_CHK(status == FDB_RESULT_SUCCESS);
3394    }
3395
3396    // commit
3397    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3398
3399    // open db and begin transactions
3400    fdb_open(&dbfile_txn1, "./mvcc_test1", &fconfig);
3401    fdb_open(&dbfile_txn2, "./mvcc_test1", &fconfig);
3402    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
3403    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
3404    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
3405    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
3406
3407    // concurrently update docs
3408    for (i=0;i<n;++i){
3409        sprintf(keybuf, "key%d", i);
3410        sprintf(bodybuf, "body%d_txn1", i);
3411        fdb_set_kv(db_txn1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3412
3413        sprintf(keybuf, "key%d", i);
3414        sprintf(bodybuf, "body%d_txn2", i);
3415        fdb_set_kv(db_txn2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3416    }
3417
3418    // retrieve key-value pairs
3419    for (i=0;i<n;++i){
3420        // general retrieval
3421        sprintf(keybuf, "key%d", i);
3422        sprintf(bodybuf, "body%d", i);
3423        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3424        TEST_CHK(status == FDB_RESULT_SUCCESS);
3425        TEST_CMP(value, bodybuf, valuelen);
3426        fdb_free_block(value);
3427
3428        // txn1
3429        sprintf(keybuf, "key%d", i);
3430        sprintf(bodybuf, "body%d_txn1", i);
3431        status = fdb_get_kv(db_txn1, keybuf, strlen(keybuf), &value, &valuelen);
3432        TEST_CHK(status == FDB_RESULT_SUCCESS);
3433        TEST_CMP(value, bodybuf, valuelen);
3434