1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "test.h"
29
30#include "internal_types.h"
31#include "functional_util.h"
32
33void rollback_secondary_kvs()
34{
35    TEST_INIT();
36    memleak_start();
37
38    int r;
39    void *value_out;
40    size_t valuelen_out;
41
42    fdb_status status;
43    fdb_config fconfig = fdb_get_default_config();
44    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
45    fdb_file_handle *dbfile;
46    fdb_kvs_handle *kv1, *kv2;
47
48    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
49    (void)r;
50
51    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
52    fdb_kvs_open_default(dbfile, &kv1, &kvs_config);
53    fdb_kvs_open_default(dbfile, &kv2, &kvs_config);
54
55    // seq:2
56    status = fdb_set_kv(kv1, (void *) "a", 1, (void *)"val-a", 5);
57    TEST_CHK(status == FDB_RESULT_SUCCESS);
58    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-b", 5);
59    TEST_CHK(status == FDB_RESULT_SUCCESS);
60    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
61
62    // seq:3
63    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-v", 5);
64    TEST_CHK(status == FDB_RESULT_SUCCESS);
65    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
66
67    // seq:4
68    status = fdb_del_kv(kv1, (void *)"a", 1);
69    TEST_CHK(status == FDB_RESULT_SUCCESS);
70    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
71
72    // get 'a' via kv2
73    status = fdb_get_kv(kv2, (void *)"b", 1, &value_out, &valuelen_out);
74    free(value_out);
75    TEST_CHK(status == FDB_RESULT_SUCCESS);
76
77    // rollback seq:3 via kv2
78    status = fdb_rollback(&kv2, 3);
79    TEST_CHK(status == FDB_RESULT_SUCCESS);
80
81    fdb_close(dbfile);
82    fdb_shutdown();
83
84    memleak_end();
85    TEST_RESULT("rollback secondary kv");
86}
87
88
89void multi_version_test()
90{
91    TEST_INIT();
92
93    memleak_start();
94
95    int i, r;
96    int n = 2;
97    fdb_file_handle *dbfile, *dbfile_new;
98    fdb_kvs_handle *db;
99    fdb_kvs_handle *db_new;
100    fdb_doc **doc = alca(fdb_doc*, n);
101    fdb_doc *rdoc;
102    fdb_status status;
103
104    char keybuf[256], metabuf[256], bodybuf[256];
105
106    // remove all previous mvcc_test files
107    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
108    (void)r;
109
110    fdb_config fconfig = fdb_get_default_config();
111    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
112    fconfig.buffercache_size = 1048576;
113    fconfig.wal_threshold = 1024;
114    fconfig.flags = FDB_OPEN_FLAG_CREATE;
115    fconfig.compaction_threshold = 0;
116
117    // open db
118    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
119    fdb_kvs_open_default(dbfile, &db, &kvs_config);
120    status = fdb_set_log_callback(db, logCallbackFunc, (void *) "multi_version_test");
121    TEST_CHK(status == FDB_RESULT_SUCCESS);
122
123    // insert documents
124    for (i=0;i<n;++i){
125        sprintf(keybuf, "key%d", i);
126        sprintf(metabuf, "meta%d", i);
127        sprintf(bodybuf, "body%d", i);
128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
130        fdb_set(db, doc[i]);
131    }
132
133    // commit
134    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
135
136    // open same db file using a new handle
137    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
138    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
139    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
140    TEST_CHK(status == FDB_RESULT_SUCCESS);
141
142    // update documents using the old handle
143    for (i=0;i<n;++i){
144        sprintf(metabuf, "meta2%d", i);
145        sprintf(bodybuf, "body2%d", i);
146        fdb_doc_update(&doc[i], (void*)metabuf, strlen(metabuf),
147            (void*)bodybuf, strlen(bodybuf));
148        fdb_set(db, doc[i]);
149    }
150
151    // manually flush WAL and commit using the old handle
152    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
153
154    // retrieve documents using the old handle
155    for (i=0;i<n;++i){
156        // search by key
157        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
158        status = fdb_get(db, rdoc);
159
160        TEST_CHK(status == FDB_RESULT_SUCCESS);
161        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
162        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
163
164        // free result document
165        fdb_doc_free(rdoc);
166    }
167
168    // retrieve documents using the new handle
169    for (i=0;i<n;++i){
170        // search by key
171        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
172        status = fdb_get(db_new, rdoc);
173
174        TEST_CHK(status == FDB_RESULT_SUCCESS);
175        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
176        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
177
178        // free result document
179        fdb_doc_free(rdoc);
180    }
181
182    // close and re-open the new handle
183    fdb_kvs_close(db_new);
184    fdb_close(dbfile_new);
185    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
186    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
187    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
188    TEST_CHK(status == FDB_RESULT_SUCCESS);
189
190    // retrieve documents using the new handle
191    for (i=0;i<n;++i){
192        // search by key
193        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
194        status = fdb_get(db_new, rdoc);
195
196        TEST_CHK(status == FDB_RESULT_SUCCESS);
197        // the new version of data should be read
198        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
199        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
200
201        // free result document
202        fdb_doc_free(rdoc);
203    }
204
205
206    // free all documents
207    for (i=0;i<n;++i){
208        fdb_doc_free(doc[i]);
209    }
210
211    // close db file
212    fdb_kvs_close(db);
213    fdb_kvs_close(db_new);
214    fdb_close(dbfile);
215    fdb_close(dbfile_new);
216
217    // free all resources
218    fdb_shutdown();
219
220    memleak_end();
221
222    TEST_RESULT("multi version test");
223}
224
225void crash_recovery_test(bool walflush)
226{
227    TEST_INIT();
228
229    memleak_start();
230
231    int i, r;
232    int n = 10;
233    fdb_file_handle *dbfile;
234    fdb_kvs_handle *db;
235    fdb_doc **doc = alca(fdb_doc*, n);
236    fdb_doc *rdoc;
237    fdb_file_info file_info;
238    uint64_t bid;
239    const char *test_file = "./mvcc_test2";
240    fdb_status status;
241
242    char keybuf[256], metabuf[256], bodybuf[256];
243
244    // remove previous mvcc_test files
245    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
246    (void)r;
247
248    fdb_config fconfig = fdb_get_default_config();
249    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
250    fconfig.buffercache_size = 0;
251    fconfig.wal_threshold = 1024;
252    fconfig.flags = FDB_OPEN_FLAG_CREATE;
253    fconfig.compaction_threshold = 0;
254
255    // reopen db
256    fdb_open(&dbfile, test_file, &fconfig);
257    fdb_kvs_open_default(dbfile, &db, &kvs_config);
258    status = fdb_set_log_callback(db, logCallbackFunc,
259                                  (void *) "crash_recovery_test");
260    TEST_CHK(status == FDB_RESULT_SUCCESS);
261
262    // insert documents
263    for (i=0;i<n;++i){
264        sprintf(keybuf, "key%d", i);
265        sprintf(metabuf, "meta%d", i);
266        sprintf(bodybuf, "body%d", i);
267        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
268            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
269        fdb_set(db, doc[i]);
270    }
271
272    // commit
273    if(walflush){
274        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
275        TEST_CHK(status == FDB_RESULT_SUCCESS);
276    } else {
277        status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
278        TEST_CHK(status == FDB_RESULT_SUCCESS);
279    }
280
281    status = fdb_get_file_info(dbfile, &file_info);
282    TEST_CHK(status == FDB_RESULT_SUCCESS);
283    bid = file_info.file_size / fconfig.blocksize;
284
285    // close the db
286    fdb_kvs_close(db);
287    fdb_close(dbfile);
288
289    // Shutdown forest db in the middle of the test to simulate crash
290    fdb_shutdown();
291
292    sprintf(bodybuf,
293            "dd if=/dev/zero bs=%d of=%s oseek=%d count=2 >> errorlog.txt",
294            (int)fconfig.blocksize, test_file, (int)bid);
295    // Now append garbage at the end of the file for a few blocks
296    r = system(bodybuf);
297    (void)r;
298    // Write 1024 bytes of non-block aligned garbage to end of file
299    sprintf(bodybuf,
300            "dd if=/dev/zero bs=%d of=%s oseek=%d count=1 >> errorlog.txt",
301            (int)fconfig.blocksize/4, test_file, (int)(bid + 2)*4);
302    r = system(bodybuf);
303    (void)r;
304
305
306    // reopen the same file
307    status = fdb_open(&dbfile, test_file, &fconfig);
308    TEST_CHK(status == FDB_RESULT_SUCCESS);
309    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
310    TEST_CHK(status == FDB_RESULT_SUCCESS);
311    status = fdb_set_log_callback(db, logCallbackFunc,
312                                  (void *) "crash_recovery_test");
313    TEST_CHK(status == FDB_RESULT_SUCCESS);
314
315    // retrieve documents
316    for (i=0;i<n;++i){
317        // search by key
318        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
319        status = fdb_get(db, rdoc);
320
321        TEST_CHK(status == FDB_RESULT_SUCCESS);
322        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
323        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
324
325        // free result document
326        fdb_doc_free(rdoc);
327    }
328
329    // retrieve documents by sequence number
330    for (i=0;i<n;++i){
331        // search by seq
332        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
333        rdoc->seqnum = i + 1;
334        status = fdb_get_byseq(db, rdoc);
335
336        TEST_CHK(status == FDB_RESULT_SUCCESS);
337
338        // free result document
339        fdb_doc_free(rdoc);
340    }
341
342    // free all documents
343    for (i=0;i<n;++i){
344        fdb_doc_free(doc[i]);
345    }
346
347    // close db file
348    fdb_kvs_close(db);
349    fdb_close(dbfile);
350
351    // free all resources
352    fdb_shutdown();
353
354    memleak_end();
355
356    if(walflush){
357        TEST_RESULT("crash recovery test - walflush");
358    } else {
359        TEST_RESULT("crash recovery test - normal flush");
360    }
361}
362
363void snapshot_test()
364{
365    TEST_INIT();
366
367    memleak_start();
368
369    int i, r;
370    int n = 20;
371    int count;
372    fdb_file_handle *dbfile;
373    fdb_kvs_handle *db;
374    fdb_kvs_handle *snap_db;
375    fdb_seqnum_t snap_seq;
376    fdb_doc **doc = alca(fdb_doc*, n);
377    fdb_doc *rdoc = NULL;
378    fdb_kvs_info kvs_info;
379    fdb_status status;
380    fdb_iterator *iterator;
381
382    char keybuf[256], metabuf[256], bodybuf[256];
383
384    // remove previous mvcc_test files
385    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
386    (void)r;
387
388    fdb_config fconfig = fdb_get_default_config();
389    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
390    fconfig.buffercache_size = 0;
391    fconfig.wal_threshold = 1024;
392    fconfig.flags = FDB_OPEN_FLAG_CREATE;
393    fconfig.compaction_threshold = 0;
394
395    // remove previous mvcc_test files
396    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
397    (void)r;
398
399    // open db
400    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
401    fdb_kvs_open_default(dbfile, &db, &kvs_config);
402    TEST_CHK(status == FDB_RESULT_SUCCESS);
403
404    // Create a snapshot from an empty database file
405    status = fdb_snapshot_open(db, &snap_db, 0);
406    TEST_CHK(status == FDB_RESULT_SUCCESS);
407    // check if snapshot's sequence number is zero.
408    fdb_get_kvs_info(snap_db, &kvs_info);
409    TEST_CHK(kvs_info.last_seqnum == 0);
410    // create an iterator on the snapshot for full range
411    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
412    // Iterator should not return any items.
413    status = fdb_iterator_get(iterator, &rdoc);
414    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
415    fdb_iterator_close(iterator);
416    fdb_kvs_close(snap_db);
417
418    // ------- Setup test ----------------------------------
419    // insert documents of 0-4
420    for (i=0; i<n/4; i++){
421        sprintf(keybuf, "key%d", i);
422        sprintf(metabuf, "meta%d", i);
423        sprintf(bodybuf, "body%d", i);
424        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
425            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
426        fdb_set(db, doc[i]);
427    }
428
429    // commit with a manual WAL flush (these docs go into HB-trie)
430    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
431
432    // insert documents from 4 - 8
433    for (; i < n/2 - 1; i++){
434        sprintf(keybuf, "key%d", i);
435        sprintf(metabuf, "meta%d", i);
436        sprintf(bodybuf, "body%d", i);
437        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
438            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
439        fdb_set(db, doc[i]);
440    }
441
442    // We want to create:
443    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
444    // Insert doc 9 with a different value to test duplicate elimination..
445    sprintf(keybuf, "key%d", i);
446    sprintf(metabuf, "meta%d", i);
447    sprintf(bodybuf, "Body%d", i);
448    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
449            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
450    fdb_set(db, doc[i]);
451
452    // commit again without a WAL flush (last doc goes into the AVL tree)
453    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
454
455    // Insert doc 9 now again with expected value..
456    *(char *)doc[i]->body = 'b';
457    fdb_set(db, doc[i]);
458    // commit again without a WAL flush (these documents go into the AVL trees)
459    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
460
461    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
462    snap_seq = doc[i]->seqnum;
463
464    // Now re-insert doc 9 as another duplicate (only newer sequence number)
465    fdb_set(db, doc[i]);
466    // commit again without a WAL flush (last doc goes into the AVL tree)
467    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
468
469    // insert documents from 10-14 into HB-trie
470    for (++i; i < (n/2 + n/4); i++){
471        sprintf(keybuf, "key%d", i);
472        sprintf(metabuf, "meta%d", i);
473        sprintf(bodybuf, "body%d", i);
474        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
475            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
476        fdb_set(db, doc[i]);
477    }
478    // manually flush WAL & commit
479    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
480
481    status = fdb_set_log_callback(db, logCallbackFunc,
482                                  (void *) "snapshot_test");
483    TEST_CHK(status == FDB_RESULT_SUCCESS);
484    // ---------- Snapshot tests begin -----------------------
485    // Attempt to take snapshot with out-of-range marker..
486    status = fdb_snapshot_open(db, &snap_db, 999999);
487    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
488
489    // Init Snapshot of open file with saved document seqnum as marker
490    status = fdb_snapshot_open(db, &snap_db, snap_seq);
491    TEST_CHK(status == FDB_RESULT_SUCCESS);
492
493    // check snapshot's sequence number
494    fdb_get_kvs_info(snap_db, &kvs_info);
495    TEST_CHK(kvs_info.last_seqnum == snap_seq);
496
497    // insert documents from 15 - 19 on file into the WAL
498    for (; i < n; i++){
499        sprintf(keybuf, "key%d", i);
500        sprintf(metabuf, "meta%d", i);
501        sprintf(bodybuf, "body%d", i);
502        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
503            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
504        fdb_set(db, doc[i]);
505    }
506    // commit without a WAL flush (This WAL must not affect snapshot)
507    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
508
509    // create an iterator on the snapshot for full range
510    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
511
512    // repeat until fail
513    i=0;
514    count=0;
515    do {
516        status = fdb_iterator_get(iterator, &rdoc);
517        TEST_CHK(status == FDB_RESULT_SUCCESS);
518
519        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
520        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
521        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
522
523        fdb_doc_free(rdoc);
524        rdoc = NULL;
525        i ++;
526        count++;
527    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
528
529    TEST_CHK(count==n/2); // Only unique items from the first half
530
531    fdb_iterator_close(iterator);
532
533    // create a sequence iterator on the snapshot for full range
534    fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
535
536    // repeat until fail
537    i=0;
538    count=0;
539    do {
540        status = fdb_iterator_get(iterator, &rdoc);
541        TEST_CHK(status == FDB_RESULT_SUCCESS);
542
543        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
544        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
545        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
546
547        fdb_doc_free(rdoc);
548        rdoc = NULL;
549        i ++;
550        count++;
551    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
552
553    TEST_CHK(count==n/2); // Only unique items from the first half
554
555    fdb_iterator_close(iterator);
556
557    // close db handle
558    fdb_kvs_close(db);
559    // close snapshot handle
560    fdb_kvs_close(snap_db);
561    // close db file
562    fdb_close(dbfile);
563
564    // free all documents
565    for (i=0;i<n;++i){
566        fdb_doc_free(doc[i]);
567    }
568
569    // 1. Open Database File
570    status = fdb_open(&dbfile, "./mvcc_test2", &fconfig);
571    TEST_CHK(status == FDB_RESULT_SUCCESS);
572
573    // 2. Open KV Store
574    status = fdb_kvs_open(dbfile, &db, "kv2", &kvs_config);
575    TEST_CHK(status == FDB_RESULT_SUCCESS);
576
577    // 2. Create Key 'a' and Commit
578    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
579    TEST_CHK(status == FDB_RESULT_SUCCESS);
580    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
581    TEST_CHK(status == FDB_RESULT_SUCCESS);
582
583    // 3. Create Key 'b' and Commit
584    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
585    TEST_CHK(status == FDB_RESULT_SUCCESS);
586    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
587    TEST_CHK(status == FDB_RESULT_SUCCESS);
588
589    // 4. Create Key 'c' and Commit
590    status = fdb_set_kv(db, (void *)"c", 1, (void *)"val-c", 5);
591    TEST_CHK(status == FDB_RESULT_SUCCESS);
592    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
593    TEST_CHK(status == FDB_RESULT_SUCCESS);
594
595    // 5. Remember seqnum for opening snapshot
596    status = fdb_get_kvs_info(db, &kvs_info);
597    TEST_CHK(status == FDB_RESULT_SUCCESS);
598    snap_seq = kvs_info.last_seqnum;
599
600    // 6.  Create an iterator
601    status = fdb_iterator_init(db, &iterator, (void *)"a", 1,
602                               (void *)"c", 1, FDB_ITR_NONE);
603    TEST_CHK(status == FDB_RESULT_SUCCESS);
604
605    // 7. Do a seek
606    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
607    TEST_CHK(status == FDB_RESULT_SUCCESS);
608
609    // 8. Close the iterator
610    status = fdb_iterator_close(iterator);
611    TEST_CHK(status == FDB_RESULT_SUCCESS);
612
613    // 9. Open a snapshot at the same point
614    status = fdb_snapshot_open(db, &snap_db, snap_seq);
615    TEST_CHK(status == FDB_RESULT_SUCCESS);
616
617    // That works, now attempt to do exact same sequence on a snapshot
618    // The snapshot is opened at the exact same place that the original
619    // database handle should be at (last seq 3)
620
621    // 10.  Create an iterator on snapshot
622    status = fdb_iterator_init(snap_db, &iterator, (void *)"a", 1,
623                               (void *)"c", 1, FDB_ITR_NONE);
624    TEST_CHK(status == FDB_RESULT_SUCCESS);
625
626    // 11. Do a seek
627    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
628    TEST_CHK(status == FDB_RESULT_SUCCESS);
629
630    // 12. Close the iterator
631    status = fdb_iterator_close(iterator);
632    TEST_CHK(status == FDB_RESULT_SUCCESS);
633
634    // close db handle
635    fdb_kvs_close(db);
636    // close snapshot handle
637    fdb_kvs_close(snap_db);
638    // close db file
639    fdb_close(dbfile);
640
641    // free all resources
642    fdb_shutdown();
643
644    memleak_end();
645
646    TEST_RESULT("snapshot test");
647}
648
649void snapshot_stats_test()
650{
651    TEST_INIT();
652
653    memleak_start();
654
655    int i, r;
656    int n = 20;
657    fdb_file_handle *dbfile;
658    fdb_kvs_handle *db;
659    fdb_kvs_handle *snap_db;
660    fdb_seqnum_t snap_seq;
661    fdb_doc **doc = alca(fdb_doc*, n);
662    fdb_kvs_info kvs_info;
663    fdb_status status;
664
665    char keybuf[256], metabuf[256], bodybuf[256];
666
667    // remove previous mvcc_test files
668    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
669    (void)r;
670
671    fdb_config fconfig = fdb_get_default_config();
672    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
673    fconfig.buffercache_size = 0;
674    fconfig.wal_threshold = 1024;
675    fconfig.flags = FDB_OPEN_FLAG_CREATE;
676    fconfig.compaction_threshold = 0;
677
678    // remove previous mvcc_test files
679    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
680    (void)r;
681
682    // open db
683    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
684    fdb_kvs_open_default(dbfile, &db, &kvs_config);
685    TEST_CHK(status == FDB_RESULT_SUCCESS);
686
687    // ------- Setup test ----------------------------------
688    // insert documents
689    for (i=0; i<n; i++){
690        sprintf(keybuf, "key%d", i);
691        sprintf(metabuf, "meta%d", i);
692        sprintf(bodybuf, "body%d", i);
693        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
694            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
695        fdb_set(db, doc[i]);
696        fdb_doc_free(doc[i]);
697    }
698
699    // commit with a manual WAL flush (these docs go into HB-trie)
700    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
701
702    // retrieve the sequence number for a snapshot open
703    status = fdb_get_kvs_info(db, &kvs_info);
704    TEST_CHK(status == FDB_RESULT_SUCCESS);
705    snap_seq = kvs_info.last_seqnum;
706
707    status = fdb_set_log_callback(db, logCallbackFunc,
708                                  (void *) "snapshot_stats_test");
709    TEST_CHK(status == FDB_RESULT_SUCCESS);
710
711    // Init Snapshot of open file with saved document seqnum as marker
712    status = fdb_snapshot_open(db, &snap_db, snap_seq);
713    TEST_CHK(status == FDB_RESULT_SUCCESS);
714
715    // check snapshot's sequence number
716    fdb_get_kvs_info(snap_db, &kvs_info);
717    TEST_CHK(kvs_info.last_seqnum == snap_seq);
718
719    TEST_CHK(kvs_info.doc_count == (size_t)n);
720    TEST_CHK(kvs_info.file == dbfile);
721
722    // close snapshot handle
723    fdb_kvs_close(snap_db);
724
725    // Test stats by creating an in-memory snapshot
726    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
727    TEST_CHK(status == FDB_RESULT_SUCCESS);
728
729    // check snapshot's sequence number
730    fdb_get_kvs_info(snap_db, &kvs_info);
731    TEST_CHK(kvs_info.last_seqnum == snap_seq);
732
733    TEST_CHK(kvs_info.doc_count == (size_t)n);
734    TEST_CHK(kvs_info.file == dbfile);
735
736    // close snapshot handle
737    fdb_kvs_close(snap_db);
738
739    // close db handle
740    fdb_kvs_close(db);
741    // close db file
742    fdb_close(dbfile);
743
744    // free all resources
745    fdb_shutdown();
746
747    memleak_end();
748
749    TEST_RESULT("snapshot stats test");
750}
751
752void snapshot_with_uncomitted_data_test()
753{
754    TEST_INIT();
755
756    int n = 10, value_len=32;
757    int i, r, idx;
758    char cmd[256];
759    char key[256], *value;
760    char keystr[] = "key%06d";
761    char valuestr[] = "value%d";
762    fdb_file_handle *db_file;
763    fdb_kvs_handle *db0, *db1, *db2, *snap;
764    fdb_config config;
765    fdb_kvs_config kvs_config;
766    fdb_kvs_info info;
767    fdb_seqnum_t seqnum;
768    fdb_status s; (void)s;
769
770    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
771    r = system(cmd);
772    (void)r;
773
774    memleak_start();
775
776    value = (char*)malloc(value_len);
777
778    srand(1234);
779    config = fdb_get_default_config();
780    config.seqtree_opt = FDB_SEQTREE_USE;
781    config.wal_flush_before_commit = true;
782    config.multi_kv_instances = true;
783    config.buffercache_size = 0*1024*1024;
784
785    kvs_config = fdb_get_default_kvs_config();
786
787    s = fdb_open(&db_file, "./mvcc_test9", &config);
788    TEST_CHK(s == FDB_RESULT_SUCCESS);
789    s = fdb_kvs_open(db_file, &db0, NULL, &kvs_config);
790    TEST_CHK(s == FDB_RESULT_SUCCESS);
791    s = fdb_kvs_open(db_file, &db1, "db1", &kvs_config);
792    TEST_CHK(s == FDB_RESULT_SUCCESS);
793    s = fdb_kvs_open(db_file, &db2, "db2", &kvs_config);
794    TEST_CHK(s == FDB_RESULT_SUCCESS);
795
796    // insert docs in all KV stores
797    for (i=0;i<n;++i){
798        idx = i;
799        sprintf(key, keystr, idx);
800        memset(value, 'x', value_len);
801        memcpy(value + value_len - 6, "<end>", 6);
802        sprintf(value, valuestr, idx);
803        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
804        TEST_CHK(s == FDB_RESULT_SUCCESS);
805        s = fdb_set_kv(db1, key, strlen(key)+1, value, value_len);
806        TEST_CHK(s == FDB_RESULT_SUCCESS);
807        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
808        TEST_CHK(s == FDB_RESULT_SUCCESS);
809    }
810    // try to open snapshot before commit
811    s = fdb_get_kvs_info(db0, &info);
812    TEST_CHK(s == FDB_RESULT_SUCCESS);
813    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
814    TEST_CHK(s != FDB_RESULT_SUCCESS);
815
816    s = fdb_get_kvs_info(db1, &info);
817    TEST_CHK(s == FDB_RESULT_SUCCESS);
818    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
819    TEST_CHK(s != FDB_RESULT_SUCCESS);
820
821    s = fdb_get_kvs_info(db2, &info);
822    TEST_CHK(s == FDB_RESULT_SUCCESS);
823    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
824    TEST_CHK(s != FDB_RESULT_SUCCESS);
825
826    s = fdb_commit(db_file, FDB_COMMIT_NORMAL);
827    TEST_CHK(s == FDB_RESULT_SUCCESS);
828
829    s = fdb_get_kvs_info(db1, &info);
830    TEST_CHK(s == FDB_RESULT_SUCCESS);
831    seqnum = info.last_seqnum;
832
833    s = fdb_get_kvs_info(db2, &info);
834    TEST_CHK(s == FDB_RESULT_SUCCESS);
835    TEST_CHK(seqnum == info.last_seqnum);
836
837    s = fdb_get_kvs_info(db0, &info);
838    TEST_CHK(s == FDB_RESULT_SUCCESS);
839    TEST_CHK(seqnum == info.last_seqnum);
840
841    // now insert docs into default and db2 only, without commit
842    for (i=0;i<n;++i){
843        idx = i;
844        sprintf(key, keystr, idx);
845        memset(value, 'x', value_len);
846        memcpy(value + value_len - 6, "<end>", 6);
847        sprintf(value, valuestr, idx);
848        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
849        TEST_CHK(s == FDB_RESULT_SUCCESS);
850        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
851        TEST_CHK(s == FDB_RESULT_SUCCESS);
852    }
853
854    // open snapshot on db1
855    s = fdb_get_kvs_info(db1, &info);
856    TEST_CHK(s == FDB_RESULT_SUCCESS);
857    // latest (comitted) seqnum
858    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
859    TEST_CHK(s == FDB_RESULT_SUCCESS);
860    s = fdb_kvs_close(snap);
861
862    // open snapshot on db2
863    s = fdb_get_kvs_info(db2, &info);
864    TEST_CHK(s == FDB_RESULT_SUCCESS);
865
866    // latest (uncomitted) seqnum
867    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
868    TEST_CHK(s != FDB_RESULT_SUCCESS);
869    // committed seqnum
870    s = fdb_snapshot_open(db2, &snap, seqnum);
871    TEST_CHK(s == FDB_RESULT_SUCCESS);
872    s = fdb_kvs_close(snap);
873
874    // open snapshot on default KVS
875    s = fdb_get_kvs_info(db0, &info);
876    TEST_CHK(s == FDB_RESULT_SUCCESS);
877
878    // latest (uncomitted) seqnum
879    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
880    TEST_CHK(s != FDB_RESULT_SUCCESS);
881    // committed seqnum
882    s = fdb_snapshot_open(db0, &snap, seqnum);
883    TEST_CHK(s == FDB_RESULT_SUCCESS);
884    s = fdb_kvs_close(snap);
885
886    s = fdb_close(db_file);
887    s = fdb_shutdown();
888    free(value);
889    memleak_end();
890
891    TEST_RESULT("snapshot with uncomitted data in other KVS test");
892}
893
894void in_memory_snapshot_test()
895{
896    TEST_INIT();
897
898    memleak_start();
899
900    int i, r;
901    int n = 20;
902    int count;
903    fdb_file_handle *dbfile;
904    fdb_kvs_handle *db;
905    fdb_kvs_handle *snap_db;
906    fdb_seqnum_t snap_seq;
907    fdb_doc **doc = alca(fdb_doc*, n);
908    fdb_doc *rdoc;
909    fdb_kvs_info kvs_info;
910    fdb_status status;
911    fdb_iterator *iterator;
912
913    char keybuf[256], metabuf[256], bodybuf[256];
914
915    // remove previous mvcc_test files
916    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
917    (void)r;
918
919    fdb_config fconfig = fdb_get_default_config();
920    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
921    fconfig.buffercache_size = 0;
922    fconfig.wal_threshold = 1024;
923    fconfig.flags = FDB_OPEN_FLAG_CREATE;
924    fconfig.compaction_threshold = 0;
925
926    // remove previous mvcc_test files
927    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
928    (void)r;
929
930    // open db
931    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
932    TEST_CHK(status == FDB_RESULT_SUCCESS);
933    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
934    TEST_CHK(status == FDB_RESULT_SUCCESS);
935
936    // Create a snapshot from an empty database file
937    status = fdb_snapshot_open(db, &snap_db, 0);
938    TEST_CHK(status == FDB_RESULT_SUCCESS);
939    // check if snapshot's sequence number is zero.
940    fdb_get_kvs_info(snap_db, &kvs_info);
941    TEST_CHK(kvs_info.last_seqnum == 0);
942    // create an iterator on the snapshot for full range
943    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
944    // Iterator should not return any items.
945    status = fdb_iterator_get(iterator, &rdoc);
946    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
947    fdb_iterator_close(iterator);
948    fdb_kvs_close(snap_db);
949
950    // ------- Setup test ----------------------------------
951    // insert documents of 0-4
952    for (i=0; i<n/4; i++){
953        sprintf(keybuf, "key%d", i);
954        sprintf(metabuf, "meta%d", i);
955        sprintf(bodybuf, "body%d", i);
956        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
957            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
958        fdb_set(db, doc[i]);
959    }
960
961    // commit with a manual WAL flush (these docs go into HB-trie)
962    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
963
964    // insert documents from 5 - 8
965    for (; i < n/2 - 1; i++){
966        sprintf(keybuf, "key%d", i);
967        sprintf(metabuf, "meta%d", i);
968        sprintf(bodybuf, "body%d", i);
969        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
970            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
971        fdb_set(db, doc[i]);
972    }
973
974    // We want to create:
975    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
976    // Insert doc 9 with a different value to test duplicate elimination..
977    sprintf(keybuf, "key%d", i);
978    sprintf(metabuf, "meta%d", i);
979    sprintf(bodybuf, "Body%d", i);
980    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
981            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
982    fdb_set(db, doc[i]);
983
984    // commit again without a WAL flush (last doc goes into the AVL tree)
985    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
986
987    // Insert doc 9 now again with expected value..
988    *(char *)doc[i]->body = 'b';
989    fdb_set(db, doc[i]);
990
991    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
992    snap_seq = doc[i]->seqnum;
993
994    // Creation of a snapshot with a sequence number that was taken WITHOUT a
995    // commit must fail even if it from the latest document that was set...
996    fdb_get_kvs_info(db, &kvs_info);
997    status = fdb_snapshot_open(db, &snap_db, kvs_info.last_seqnum);
998    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
999
1000    // ---------- Snapshot tests begin -----------------------
1001    // Initialize an in-memory snapshot Without a Commit...
1002    // WAL items are not flushed...
1003    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
1004    TEST_CHK(status == FDB_RESULT_SUCCESS);
1005
1006    // commit again without a WAL flush (these documents go into the AVL trees)
1007    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1008
1009    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1010    fdb_set(db, doc[i]);
1011
1012    // commit again without a WAL flush (last doc goes into the AVL tree)
1013    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1014
1015    // insert documents from 10-14 into HB-trie
1016    for (++i; i < (n/2 + n/4); i++){
1017        sprintf(keybuf, "key%d", i);
1018        sprintf(metabuf, "meta%d", i);
1019        sprintf(bodybuf, "body%d", i);
1020        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1021                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1022        fdb_set(db, doc[i]);
1023    }
1024
1025    status = fdb_set_log_callback(db, logCallbackFunc,
1026                                  (void *) "in_memory_snapshot_test");
1027    TEST_CHK(status == FDB_RESULT_SUCCESS);
1028
1029    // check snapshot's sequence number
1030    fdb_get_kvs_info(snap_db, &kvs_info);
1031    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1032
1033    // insert documents from 15 - 19 on file into the WAL
1034    for (; i < n; i++){
1035        sprintf(keybuf, "key%d", i);
1036        sprintf(metabuf, "meta%d", i);
1037        sprintf(bodybuf, "body%d", i);
1038        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1039            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1040        fdb_set(db, doc[i]);
1041    }
1042    // commit without a WAL flush (This WAL must not affect snapshot)
1043    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1044
1045    // create an iterator on the snapshot for full range
1046    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1047
1048    // repeat until fail
1049    i=0;
1050    count=0;
1051    rdoc = NULL;
1052    do {
1053        status = fdb_iterator_get(iterator, &rdoc);
1054        TEST_CHK(status == FDB_RESULT_SUCCESS);
1055
1056        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1057        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1058        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1059
1060        fdb_doc_free(rdoc);
1061        rdoc = NULL;
1062        i++;
1063        count++;
1064    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1065
1066    TEST_CHK(count==n/2); // Only unique items from the first half
1067
1068    fdb_iterator_close(iterator);
1069
1070    // close db handle
1071    fdb_kvs_close(db);
1072    // close snapshot handle
1073    fdb_kvs_close(snap_db);
1074
1075    // close db file
1076    fdb_close(dbfile);
1077
1078    // free all documents
1079    for (i=0;i<n;++i){
1080        fdb_doc_free(doc[i]);
1081    }
1082
1083    // free all resources
1084    fdb_shutdown();
1085
1086    memleak_end();
1087
1088    TEST_RESULT("in-memory snapshot test");
1089}
1090
1091
1092void in_memory_snapshot_on_dirty_hbtrie_test()
1093{
1094    TEST_INIT();
1095
1096    int n = 300, value_len=32;
1097    int i, r, idx, c;
1098    char cmd[256];
1099    char key[256], *value;
1100    char keystr[] = "k%05d";
1101    char keystr2[] = "k%06d";
1102    char valuestr[] = "value%08d";
1103    fdb_file_handle *db_file;
1104    fdb_kvs_handle *db, *snap, *snap_clone;
1105    fdb_config config;
1106    fdb_kvs_config kvs_config;
1107    fdb_status s;
1108    fdb_iterator *fit, *fit_normal, *fit_clone;
1109    fdb_doc *doc;
1110
1111    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1112    r = system(cmd);
1113    (void)r;
1114
1115    memleak_start();
1116
1117    value = (char*)malloc(value_len);
1118
1119    config = fdb_get_default_config();
1120    config.durability_opt = FDB_DRB_ASYNC;
1121    config.seqtree_opt = FDB_SEQTREE_USE;
1122    config.wal_flush_before_commit = true;
1123    config.wal_threshold = n/5;
1124    config.multi_kv_instances = true;
1125    config.buffercache_size = 0;
1126
1127    kvs_config = fdb_get_default_kvs_config();
1128
1129    s = fdb_open(&db_file, "./mvcc_test", &config);
1130    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1131    TEST_CHK(s == FDB_RESULT_SUCCESS);
1132
1133    // write a few documents and commit & wal flush
1134    for (i=0;i<n/10;++i){
1135        idx = i;
1136        sprintf(key, keystr, idx);
1137        memset(value, 'x', value_len);
1138        memcpy(value + value_len - 6, "<end>", 6);
1139        sprintf(value, valuestr, idx);
1140        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1141        TEST_CHK(s == FDB_RESULT_SUCCESS);
1142    }
1143    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1144    TEST_CHK(s == FDB_RESULT_SUCCESS);
1145
1146    // create an in-memory snapshot and its clone
1147    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1148    TEST_CHK(s == FDB_RESULT_SUCCESS);
1149    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1150    TEST_CHK(s == FDB_RESULT_SUCCESS);
1151
1152    // write a number of documents in order to make WAL be flushed before commit
1153    for (i=n/10;i<n;++i){
1154        idx = i;
1155        sprintf(key, keystr, idx);
1156        memset(value, 'x', value_len);
1157        memcpy(value + value_len - 6, "<end>", 6);
1158        sprintf(value, valuestr, idx);
1159        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1160        TEST_CHK(s == FDB_RESULT_SUCCESS);
1161    }
1162
1163    void *v_out;
1164    size_t vlen_out;
1165    idx = n/10;
1166    sprintf(key, keystr, idx);
1167    s = fdb_get_kv(snap, key, strlen(key)+1, &v_out, &vlen_out);
1168    // should not be able to retrieve
1169    TEST_CHK(s != FDB_RESULT_SUCCESS);
1170
1171    s = fdb_get_kv(snap_clone, key, strlen(key)+1, &v_out, &vlen_out);
1172    // should not be able to retrieve in also clone
1173    TEST_CHK(s != FDB_RESULT_SUCCESS);
1174
1175    // close snapshot and its clone
1176    s = fdb_kvs_close(snap);
1177    TEST_CHK(s == FDB_RESULT_SUCCESS);
1178    s = fdb_kvs_close(snap_clone);
1179    TEST_CHK(s == FDB_RESULT_SUCCESS);
1180
1181    // create an in-memory snapshot
1182    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1183    TEST_CHK(s == FDB_RESULT_SUCCESS);
1184
1185    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1186    TEST_CHK(s == FDB_RESULT_SUCCESS);
1187    c = 0;
1188    do {
1189        doc = NULL;
1190        s = fdb_iterator_get(fit, &doc);
1191        if (s != FDB_RESULT_SUCCESS) break;
1192
1193        idx = c;
1194        sprintf(key, keystr, idx);
1195        memset(value, 'x', value_len);
1196        memcpy(value + value_len - 6, "<end>", 6);
1197        sprintf(value, valuestr, idx);
1198        TEST_CMP(doc->key, key, doc->keylen);
1199        TEST_CMP(doc->body, value, doc->bodylen);
1200        c++;
1201        s = fdb_doc_free(doc);
1202    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1203    TEST_CHK(c == n);
1204
1205    s = fdb_iterator_close(fit);
1206    TEST_CHK(s == FDB_RESULT_SUCCESS);
1207
1208    // create a clone
1209    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1210    TEST_CHK(s == FDB_RESULT_SUCCESS);
1211
1212    // create iterators on snapshot and its clone
1213    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1214    TEST_CHK(s == FDB_RESULT_SUCCESS);
1215    s = fdb_iterator_init(snap_clone, &fit_clone, NULL, 0, NULL, 0, 0x0);
1216    TEST_CHK(s == FDB_RESULT_SUCCESS);
1217
1218    // also create an iterator on a normal handle
1219    s = fdb_iterator_init(db, &fit_normal, NULL, 0, NULL, 0, 0x0);
1220    TEST_CHK(s == FDB_RESULT_SUCCESS);
1221
1222    c = 0;
1223    do {
1224        doc = NULL;
1225        s = fdb_iterator_get(fit, &doc);
1226        if (s != FDB_RESULT_SUCCESS) break;
1227
1228        idx = c;
1229        sprintf(key, keystr, idx);
1230        memset(value, 'x', value_len);
1231        memcpy(value + value_len - 6, "<end>", 6);
1232        sprintf(value, valuestr, idx);
1233        TEST_CMP(doc->key, key, doc->keylen);
1234        TEST_CMP(doc->body, value, doc->bodylen);
1235        c++;
1236        s = fdb_doc_free(doc);
1237
1238        doc = NULL;
1239        s = fdb_iterator_get(fit, &doc);
1240        TEST_CHK(s == FDB_RESULT_SUCCESS);
1241        TEST_CMP(doc->key, key, doc->keylen);
1242        TEST_CMP(doc->body, value, doc->bodylen);
1243        s = fdb_doc_free(doc);
1244
1245        if (c == n/5) {
1246            // insert new docs in the middle of iteration
1247            for (i=0; i<n*10; ++i){
1248                idx = i;
1249                sprintf(key, keystr2, idx);
1250                memset(value, 'x', value_len);
1251                memcpy(value + value_len - 6, "<end>", 6);
1252                sprintf(value, valuestr, idx);
1253                s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1254                TEST_CHK(s == FDB_RESULT_SUCCESS);
1255            }
1256        }
1257
1258        s = fdb_iterator_next(fit);
1259        // result should be same to clone
1260        if (s != FDB_RESULT_SUCCESS) {
1261            s = fdb_iterator_next(fit_clone);
1262            TEST_CHK(s != FDB_RESULT_SUCCESS);
1263        } else {
1264            s = fdb_iterator_next(fit_clone);
1265            TEST_CHK(s == FDB_RESULT_SUCCESS);
1266        }
1267    } while(s == FDB_RESULT_SUCCESS);
1268    TEST_CHK(c == n);
1269
1270    // close iterators
1271    s = fdb_iterator_close(fit);
1272    TEST_CHK(s == FDB_RESULT_SUCCESS);
1273    s = fdb_iterator_close(fit_clone);
1274    TEST_CHK(s == FDB_RESULT_SUCCESS);
1275
1276    // the results should be same in the normal iterator
1277    c = 0;
1278    do {
1279        doc = NULL;
1280        s = fdb_iterator_get(fit_normal, &doc);
1281        if (s != FDB_RESULT_SUCCESS) break;
1282
1283        idx = c;
1284        sprintf(key, keystr, idx);
1285        memset(value, 'x', value_len);
1286        memcpy(value + value_len - 6, "<end>", 6);
1287        sprintf(value, valuestr, idx);
1288        TEST_CMP(doc->key, key, doc->keylen);
1289        TEST_CMP(doc->body, value, doc->bodylen);
1290        c++;
1291        s = fdb_doc_free(doc);
1292    } while(fdb_iterator_next(fit_normal) == FDB_RESULT_SUCCESS);
1293    TEST_CHK(c == n);
1294
1295    s = fdb_iterator_close(fit_normal);
1296    TEST_CHK(s == FDB_RESULT_SUCCESS);
1297
1298    s = fdb_kvs_close(snap);
1299    TEST_CHK(s == FDB_RESULT_SUCCESS);
1300
1301    s = fdb_kvs_close(snap_clone);
1302    TEST_CHK(s == FDB_RESULT_SUCCESS);
1303
1304    s = fdb_close(db_file);
1305    s = fdb_shutdown();
1306    free(value);
1307
1308    memleak_end();
1309
1310    TEST_RESULT("in-memory snapshot on dirty HB+trie nodes test");
1311}
1312
1313
1314struct cb_inmem_snap_args {
1315    fdb_kvs_handle *handle;
1316    int n;
1317    int move_count;
1318    int value_len;
1319    char *keystr;
1320    char *valuestr;
1321};
1322
1323static int cb_inmem_snap(fdb_file_handle *fhandle,
1324                       fdb_compaction_status status,
1325                       fdb_doc *doc_in, uint64_t old_offset, uint64_t new_offset,
1326                       void *ctx)
1327{
1328    TEST_INIT();
1329    int c, idx;
1330    char key[256], value[256];
1331    void *value_out;
1332    size_t valuelen;
1333    fdb_kvs_handle *snap;
1334    fdb_kvs_info info;
1335    fdb_iterator *fit;
1336    fdb_doc *doc;
1337    fdb_status s;
1338    struct cb_inmem_snap_args *args = (struct cb_inmem_snap_args *)ctx;
1339    (void)args;
1340
1341    if (status == FDB_CS_MOVE_DOC) {
1342        args->move_count++;
1343        if (args->move_count == 2) {
1344            // open in-memory snapshot
1345            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1346            TEST_CHK(s == FDB_RESULT_SUCCESS);
1347
1348            s = fdb_get_kvs_info(snap, &info);
1349            TEST_CHK(s == FDB_RESULT_SUCCESS);
1350            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n);
1351
1352            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1353            TEST_CHK(s == FDB_RESULT_SUCCESS);
1354
1355            c = 0;
1356            do {
1357                doc = NULL;
1358                s = fdb_iterator_get(fit, &doc);
1359                if (s != FDB_RESULT_SUCCESS) {
1360                    break;
1361                }
1362                idx = c;
1363                sprintf(key, args->keystr, idx);
1364                memset(value, 'x', args->value_len);
1365                memcpy(value + args->value_len - 6, "<end>", 6);
1366                sprintf(value, args->valuestr, idx);
1367                TEST_CMP(doc->key, key, doc->keylen);
1368                TEST_CMP(doc->body, value, doc->bodylen);
1369
1370                c++;
1371                fdb_doc_free(doc);
1372            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1373            TEST_CHK(c == args->n);
1374
1375            s = fdb_iterator_close(fit);
1376            TEST_CHK(s == FDB_RESULT_SUCCESS);
1377
1378            fdb_kvs_close(snap);
1379
1380        } else if (args->move_count == 5) {
1381            // insert new doc
1382            sprintf(key, "new_key");
1383            sprintf(value, "new_value");
1384            s = fdb_set_kv(args->handle, (void*)key, strlen(key)+1, (void*)value, strlen(value)+1);
1385            TEST_CHK(s == FDB_RESULT_SUCCESS);
1386
1387            // open in-memory snapshot
1388            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1389            TEST_CHK(s == FDB_RESULT_SUCCESS);
1390
1391            s = fdb_get_kvs_info(snap, &info);
1392            TEST_CHK(s == FDB_RESULT_SUCCESS);
1393            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n+1);
1394
1395            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1396            TEST_CHK(s == FDB_RESULT_SUCCESS);
1397
1398            c = 0;
1399            do {
1400                doc = NULL;
1401                s = fdb_iterator_get(fit, &doc);
1402                if (s != FDB_RESULT_SUCCESS) {
1403                    break;
1404                }
1405                if (c < args->n) {
1406                    idx = c;
1407                    sprintf(key, args->keystr, idx);
1408                    memset(value, 'x', args->value_len);
1409                    memcpy(value + args->value_len - 6, "<end>", 6);
1410                    sprintf(value, args->valuestr, idx);
1411                    TEST_CMP(doc->key, key, doc->keylen);
1412                    TEST_CMP(doc->body, value, doc->bodylen);
1413                } else {
1414                    // new document
1415                    sprintf(key, "new_key");
1416                    sprintf(value, "new_value");
1417                    TEST_CMP(doc->key, key, doc->keylen);
1418                    TEST_CMP(doc->body, value, doc->bodylen);
1419                }
1420
1421                c++;
1422                fdb_doc_free(doc);
1423            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1424            TEST_CHK(c == args->n + 1);
1425
1426            s = fdb_iterator_close(fit);
1427            TEST_CHK(s == FDB_RESULT_SUCCESS);
1428
1429            s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1430            TEST_CHK(s == FDB_RESULT_SUCCESS);
1431            TEST_CMP(value_out, value, valuelen);
1432            fdb_free_block(value_out);
1433
1434            fdb_kvs_close(snap);
1435        }
1436    }
1437    return 0;
1438}
1439
1440void in_memory_snapshot_compaction_test()
1441{
1442    TEST_INIT();
1443
1444    int n = 10, value_len=32;
1445    int i, r, c, idx;
1446    char cmd[256];
1447    char key[256], *value;
1448    char keystr[] = "k%05d";
1449    char valuestr[] = "value%08d";
1450    void *value_out;
1451    size_t valuelen;
1452    fdb_file_handle *db_file;
1453    fdb_kvs_handle *db, *db2, *snap;
1454    fdb_config config;
1455    fdb_kvs_config kvs_config;
1456    fdb_kvs_info info;
1457    fdb_iterator *fit;
1458    fdb_doc *doc;
1459    fdb_status s;
1460    struct cb_inmem_snap_args cargs;
1461
1462    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1463    r = system(cmd);
1464    (void)r;
1465
1466    memleak_start();
1467
1468    value = (char*)malloc(value_len);
1469
1470    config = fdb_get_default_config();
1471    config.durability_opt = FDB_DRB_ASYNC;
1472    config.seqtree_opt = FDB_SEQTREE_USE;
1473    config.wal_flush_before_commit = true;
1474    config.wal_threshold = n/5;
1475    config.multi_kv_instances = true;
1476    config.buffercache_size = 0;
1477    config.compaction_cb = cb_inmem_snap;
1478    config.compaction_cb_mask = FDB_CS_MOVE_DOC;
1479    config.compaction_cb_ctx = &cargs;
1480
1481    kvs_config = fdb_get_default_kvs_config();
1482
1483    s = fdb_open(&db_file, "./mvcc_test", &config);
1484    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1485    TEST_CHK(s == FDB_RESULT_SUCCESS);
1486
1487    s = fdb_kvs_open(db_file, &db2, "db", &kvs_config);
1488    TEST_CHK(s == FDB_RESULT_SUCCESS);
1489
1490    cargs.handle = db2;
1491    cargs.move_count = 0;
1492    cargs.n = n;
1493    cargs.keystr = keystr;
1494    cargs.valuestr = valuestr;
1495    cargs.value_len = value_len;
1496
1497    // write
1498    for (i=0;i<n;++i){
1499        idx = i;
1500        sprintf(key, keystr, idx);
1501        memset(value, 'x', value_len);
1502        memcpy(value + value_len - 6, "<end>", 6);
1503        sprintf(value, valuestr, idx);
1504        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1505        TEST_CHK(s == FDB_RESULT_SUCCESS);
1506    }
1507    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1508    TEST_CHK(s == FDB_RESULT_SUCCESS);
1509
1510    s = fdb_compact(db_file, "./mvcc_test2");
1511
1512    // open in-memory snapshot
1513    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1514    TEST_CHK(s == FDB_RESULT_SUCCESS);
1515
1516    s = fdb_get_kvs_info(snap, &info);
1517    TEST_CHK(s == FDB_RESULT_SUCCESS);
1518    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)n+1);
1519
1520    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1521    TEST_CHK(s == FDB_RESULT_SUCCESS);
1522
1523    c = 0;
1524    do {
1525        doc = NULL;
1526        s = fdb_iterator_get(fit, &doc);
1527        if (s != FDB_RESULT_SUCCESS) {
1528            break;
1529        }
1530        if (c < n) {
1531            idx = c;
1532            sprintf(key, keystr, idx);
1533            memset(value, 'x', value_len);
1534            memcpy(value + value_len - 6, "<end>", 6);
1535            sprintf(value, valuestr, idx);
1536            TEST_CMP(doc->key, key, doc->keylen);
1537            TEST_CMP(doc->body, value, doc->bodylen);
1538        } else {
1539            // new document
1540            sprintf(key, "new_key");
1541            sprintf(value, "new_value");
1542            TEST_CMP(doc->key, key, doc->keylen);
1543            TEST_CMP(doc->body, value, doc->bodylen);
1544        }
1545
1546        c++;
1547        fdb_doc_free(doc);
1548    } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1549    TEST_CHK(c == n + 1);
1550
1551    s = fdb_iterator_close(fit);
1552    TEST_CHK(s == FDB_RESULT_SUCCESS);
1553
1554    s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1555    TEST_CHK(s == FDB_RESULT_SUCCESS);
1556    TEST_CMP(value_out, value, valuelen);
1557    fdb_free_block(value_out);
1558
1559    fdb_kvs_close(snap);
1560
1561    s = fdb_close(db_file);
1562    s = fdb_shutdown();
1563    free(value);
1564
1565    memleak_end();
1566
1567    TEST_RESULT("in-memory snapshot with concurrent compaction test");
1568}
1569
1570void snapshot_clone_test()
1571{
1572    TEST_INIT();
1573
1574    memleak_start();
1575
1576    int i, r;
1577    int n = 20;
1578    int count;
1579    fdb_file_handle *dbfile;
1580    fdb_kvs_handle *db;
1581    fdb_kvs_handle *snap_db, *snap_inmem;
1582    fdb_kvs_handle *snap_db2, *snap_inmem2; // clones from stable & inmemory
1583    fdb_seqnum_t snap_seq;
1584    fdb_doc **doc = alca(fdb_doc*, n);
1585    fdb_doc *rdoc;
1586    fdb_kvs_info kvs_info;
1587    fdb_status status;
1588    fdb_iterator *iterator;
1589
1590    char keybuf[256], metabuf[256], bodybuf[256];
1591
1592    // remove previous mvcc_test files
1593    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1594    (void)r;
1595
1596    fdb_config fconfig = fdb_get_default_config();
1597    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1598    fconfig.buffercache_size = 0;
1599    fconfig.wal_threshold = 1024;
1600    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1601    fconfig.compaction_threshold = 0;
1602
1603    // remove previous mvcc_test files
1604    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1605    (void)r;
1606
1607    // open db
1608    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1609    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1610    TEST_CHK(status == FDB_RESULT_SUCCESS);
1611
1612    // Create a snapshot from an empty database file
1613    status = fdb_snapshot_open(db, &snap_db, 0);
1614    TEST_CHK(status == FDB_RESULT_SUCCESS);
1615    // check if snapshot's sequence number is zero.
1616    fdb_get_kvs_info(snap_db, &kvs_info);
1617    TEST_CHK(kvs_info.last_seqnum == 0);
1618    // create an iterator on the snapshot for full range
1619    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1620    // Iterator should not return any items.
1621    status = fdb_iterator_next(iterator);
1622    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
1623    fdb_iterator_close(iterator);
1624    fdb_kvs_close(snap_db);
1625
1626    // ------- Setup test ----------------------------------
1627    // insert documents of 0-4
1628    for (i=0; i<n/4; i++){
1629        sprintf(keybuf, "key%d", i);
1630        sprintf(metabuf, "meta%d", i);
1631        sprintf(bodybuf, "body%d", i);
1632        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1633            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1634        fdb_set(db, doc[i]);
1635    }
1636
1637    // commit with a manual WAL flush (these docs go into HB-trie)
1638    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1639
1640    // insert documents from 4 - 8
1641    for (; i < n/2 - 1; i++){
1642        sprintf(keybuf, "key%d", i);
1643        sprintf(metabuf, "meta%d", i);
1644        sprintf(bodybuf, "body%d", i);
1645        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1646            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1647        fdb_set(db, doc[i]);
1648    }
1649
1650    // We want to create:
1651    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
1652    // Insert doc 9 with a different value to test duplicate elimination..
1653    sprintf(keybuf, "key%d", i);
1654    sprintf(metabuf, "meta%d", i);
1655    sprintf(bodybuf, "Body%d", i);
1656    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1657            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1658    fdb_set(db, doc[i]);
1659
1660    // commit again without a WAL flush (last doc goes into the AVL tree)
1661    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1662
1663    // Insert doc 9 now again with expected value..
1664    *(char *)doc[i]->body = 'b';
1665    fdb_set(db, doc[i]);
1666    // commit again without a WAL flush (these documents go into the AVL trees)
1667    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1668
1669    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
1670    snap_seq = doc[i]->seqnum;
1671
1672    // Initialize an in-memory snapshot Without a Commit...
1673    // WAL items are not flushed...
1674    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1675    TEST_CHK(status == FDB_RESULT_SUCCESS);
1676
1677    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1678    fdb_set(db, doc[i]);
1679    // commit again without a WAL flush (last doc goes into the AVL tree)
1680    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1681
1682    // insert documents from 10-14 into HB-trie
1683    for (++i; i < (n/2 + n/4); i++){
1684        sprintf(keybuf, "key%d", i);
1685        sprintf(metabuf, "meta%d", i);
1686        sprintf(bodybuf, "body%d", i);
1687        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1688            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1689        fdb_set(db, doc[i]);
1690    }
1691    // manually flush WAL & commit
1692    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1693
1694    status = fdb_set_log_callback(db, logCallbackFunc,
1695                                  (void *) "snapshot_clone_test");
1696    TEST_CHK(status == FDB_RESULT_SUCCESS);
1697    // ---------- Snapshot tests begin -----------------------
1698    // Attempt to take snapshot with out-of-range marker..
1699    status = fdb_snapshot_open(db, &snap_db, 999999);
1700    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
1701
1702    // Init Snapshot of open file with saved document seqnum as marker
1703    status = fdb_snapshot_open(db, &snap_db, snap_seq);
1704    TEST_CHK(status == FDB_RESULT_SUCCESS);
1705
1706    // Clone a snapshot into another snapshot...
1707    status = fdb_snapshot_open(snap_db, &snap_db2, snap_seq);
1708    TEST_CHK(status == FDB_RESULT_SUCCESS);
1709
1710    // close snapshot handle
1711    status = fdb_kvs_close(snap_db);
1712    TEST_CHK(status == FDB_RESULT_SUCCESS);
1713
1714    // check snapshot's sequence number
1715    fdb_get_kvs_info(snap_db2, &kvs_info);
1716    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1717
1718    // insert documents from 15 - 19 on file into the WAL
1719    for (; i < n; i++){
1720        sprintf(keybuf, "key%d", i);
1721        sprintf(metabuf, "meta%d", i);
1722        sprintf(bodybuf, "body%d", i);
1723        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1724            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1725        fdb_set(db, doc[i]);
1726    }
1727    // commit without a WAL flush (This WAL must not affect snapshot)
1728    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1729
1730    // Clone the in-memory snapshot into another snapshot
1731    status = fdb_snapshot_open(snap_inmem, &snap_inmem2, FDB_SNAPSHOT_INMEM);
1732    TEST_CHK(status == FDB_RESULT_SUCCESS);
1733
1734    // Stable Snapshot Scan Tests..........
1735    // Retrieve metaonly by key from snapshot
1736    i = snap_seq + 1;
1737    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1738    status = fdb_get_metaonly(snap_db2, rdoc);
1739    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1740    fdb_doc_free(rdoc);
1741
1742    i = 5;
1743    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1744    status = fdb_get_metaonly(snap_db2, rdoc);
1745    TEST_CHK(status == FDB_RESULT_SUCCESS);
1746    TEST_CMP(rdoc->key, doc[i]->key, doc[i]->keylen);
1747    TEST_CMP(rdoc->meta, doc[i]->meta, doc[i]->metalen);
1748    fdb_doc_free(rdoc);
1749
1750    // Retrieve by seq from snapshot
1751    fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
1752    rdoc->seqnum = 6;
1753    status = fdb_get_byseq(snap_db2, rdoc);
1754    TEST_CHK(status == FDB_RESULT_SUCCESS);
1755    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1756    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1757    fdb_doc_free(rdoc);
1758    rdoc = NULL;
1759
1760    // create an iterator on the snapshot for full range
1761    fdb_iterator_init(snap_db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1762
1763    // repeat until fail
1764    i=0;
1765    count=0;
1766    do {
1767        status = fdb_iterator_get(iterator, &rdoc);
1768        TEST_CHK(status == FDB_RESULT_SUCCESS);
1769        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1770        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1771        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1772
1773        fdb_doc_free(rdoc);
1774        rdoc = NULL;
1775        i ++;
1776        count++;
1777    } while (fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1778
1779    TEST_CHK(count==n/2); // Only unique items from the first half
1780
1781    fdb_iterator_close(iterator);
1782
1783    // create an iterator on the in-memory snapshot clone for full range
1784    fdb_iterator_init(snap_inmem2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1785
1786    // repeat until fail
1787    i=0;
1788    count=0;
1789    do {
1790        status = fdb_iterator_get(iterator, &rdoc);
1791        TEST_CHK(status == FDB_RESULT_SUCCESS);
1792        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1793        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1794        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1795
1796        fdb_doc_free(rdoc);
1797        rdoc = NULL;
1798        i ++;
1799        count++;
1800    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1801
1802    TEST_CHK(count==n/2); // Only unique items from the first half
1803
1804    fdb_iterator_close(iterator);
1805
1806    // close db handle
1807    fdb_kvs_close(db);
1808    // close the in-memory snapshot handle
1809    fdb_kvs_close(snap_inmem);
1810    // close the in-memory clone snapshot handle
1811    fdb_kvs_close(snap_inmem2);
1812    // close the clone snapshot handle
1813    fdb_kvs_close(snap_db2);
1814    // close db file
1815    fdb_close(dbfile);
1816
1817    // free all documents
1818    for (i=0;i<n;++i){
1819        fdb_doc_free(doc[i]);
1820    }
1821
1822    // free all resources
1823    fdb_shutdown();
1824
1825    memleak_end();
1826
1827    TEST_RESULT("snapshot clone test");
1828}
1829
// Argument block shared by all snap_clone_thread() workers.
struct parallel_clone_t {
    fdb_doc **doc;            // documents the workers compare iterator results against
    fdb_kvs_handle *snap_db;  // snapshot handle each worker clones with fdb_snapshot_open()
    int num_docs;             // total document count; workers expect num_docs/2 items
};
1835
1836void *snap_clone_thread(void *args)
1837{
1838    struct parallel_clone_t *t = (struct parallel_clone_t *)args;
1839    fdb_kvs_handle *clone_db;
1840    fdb_iterator *iterator;
1841    fdb_doc *rdoc = NULL;
1842    fdb_doc **doc = t->doc;
1843    int i;
1844    int n = t->num_docs;
1845    fdb_status status;
1846    int count;
1847    TEST_INIT();
1848
1849    status = fdb_snapshot_open(t->snap_db, &clone_db, FDB_SNAPSHOT_INMEM);
1850    TEST_CHK(status == FDB_RESULT_SUCCESS);
1851
1852    // create an iterator on the in-memory snapshot clone for full range
1853    fdb_iterator_init(clone_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1854
1855    // repeat until fail
1856    i=0;
1857    count=0;
1858    do {
1859        status = fdb_iterator_get(iterator, &rdoc);
1860        TEST_CHK(status == FDB_RESULT_SUCCESS);
1861        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1862        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1863
1864        fdb_doc_free(rdoc);
1865        rdoc = NULL;
1866        i ++;
1867        count++;
1868    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1869
1870    TEST_CHK(count==n/2); // Only unique items from the first half
1871
1872    fdb_iterator_close(iterator);
1873
1874    fdb_kvs_close(clone_db);
1875    thread_exit(0);
1876    return NULL;
1877}
1878
1879void snapshot_parallel_clone_test()
1880{
1881    TEST_INIT();
1882
1883    memleak_start();
1884
1885    int i, r;
1886    int num_cloners = 30; // parallel snapshot clone operations
1887    int n = 20480; // 10 dirty wal flushes at least
1888    fdb_file_handle *dbfile;
1889    fdb_kvs_handle *db;
1890    fdb_kvs_handle *snap_inmem;
1891    fdb_doc **doc = alca(fdb_doc*, n);
1892    thread_t *tid = alca(thread_t, num_cloners);
1893    void *thread_ret;
1894    fdb_status status;
1895    struct parallel_clone_t clone_data;
1896
1897    char keybuf[256], bodybuf[256];
1898
1899    // remove previous mvcc_test files
1900    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1901    (void)r;
1902
1903    fdb_config fconfig = fdb_get_default_config();
1904    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1905    fconfig.buffercache_size = 0;
1906    fconfig.wal_threshold = 1024;
1907    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1908    fconfig.compaction_threshold = 0;
1909
1910    // remove previous mvcc_test files
1911    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1912    (void)r;
1913
1914    // open db
1915    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1916    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1917    TEST_CHK(status == FDB_RESULT_SUCCESS);
1918
1919    status = fdb_set_log_callback(db, logCallbackFunc,
1920                                  (void *) "snapshot_parallel_clone_test");
1921    TEST_CHK(status == FDB_RESULT_SUCCESS);
1922    // ------- Setup test ----------------------------------
1923    for (i=0; i<n/2; i++){
1924        sprintf(keybuf, "key%5d", i);
1925        sprintf(bodybuf, "body%d", i);
1926        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1927            NULL, 0, (void*)bodybuf, strlen(bodybuf));
1928        fdb_set(db, doc[i]);
1929    }
1930
1931    // Initialize an in-memory snapshot Without a Commit...
1932    // WAL items are not flushed...
1933    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1934    TEST_CHK(status == FDB_RESULT_SUCCESS);
1935
1936    clone_data.doc = doc;
1937    clone_data.num_docs = n;
1938    clone_data.snap_db = snap_inmem;
1939
1940    for (int j = num_cloners - 1; j>=0; --j) {
1941        thread_create(&tid[j], snap_clone_thread, &clone_data);
1942    }
1943
1944    for (; i < n; i++){
1945        sprintf(keybuf, "key%5d", i);
1946        sprintf(bodybuf, "BODY%d", i);
1947        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1948            NULL, 0, (void*)bodybuf, strlen(bodybuf));
1949        fdb_set(db, doc[i]);
1950    }
1951    // commit without a WAL flush (This WAL must not affect snapshot)
1952    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1953
1954    for (int j = num_cloners - 1; j>=0; --j) {
1955        thread_join(tid[j], &thread_ret);
1956    }
1957
1958    // close db handle
1959    fdb_kvs_close(db);
1960    // close the in-memory snapshot handle
1961    fdb_kvs_close(snap_inmem);
1962    // close db file
1963    fdb_close(dbfile);
1964
1965    // free all documents
1966    for (i=0;i<n;++i){
1967        fdb_doc_free(doc[i]);
1968    }
1969
1970    // free all resources
1971    fdb_shutdown();
1972
1973    memleak_end();
1974
1975    TEST_RESULT("snapshot parallel clone test");
1976}
1977
1978void snapshot_markers_in_file_test(bool multi_kv)
1979{
1980    TEST_INIT();
1981
1982    memleak_start();
1983
1984    int i, r;
1985    int n = 20;
1986    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
1987    fdb_file_handle *dbfile;
1988    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
1989    fdb_doc **doc = alca(fdb_doc*, n);
1990    fdb_status status;
1991    fdb_snapshot_info_t *markers;
1992    uint64_t num_markers;
1993
1994    char keybuf[256], metabuf[256], bodybuf[256];
1995    char kv_name[8];
1996
1997    fdb_config fconfig = fdb_get_default_config();
1998    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1999    fconfig.buffercache_size = 0;
2000    fconfig.wal_threshold = 1024;
2001    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2002    fconfig.compaction_threshold = 0;
2003    fconfig.multi_kv_instances = multi_kv;
2004
2005    // remove previous mvcc_test files
2006    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2007    (void)r;
2008
2009    // open db
2010    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2011    if (multi_kv) {
2012        for (r = 0; r < num_kvs; ++r) {
2013            sprintf(kv_name, "kv%d", r);
2014            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2015        }
2016    } else {
2017        num_kvs = 1;
2018        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2019    }
2020
2021   // ------- Setup test ----------------------------------
2022   // insert documents of 0-4
2023    for (i=0; i<n/4; i++){
2024        sprintf(keybuf, "key%d", i);
2025        sprintf(metabuf, "meta%d", i);
2026        sprintf(bodybuf, "body%d", i);
2027        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2028            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2029        for (r = 0; r < num_kvs; ++r) {
2030            fdb_set(db[r], doc[i]);
2031        }
2032    }
2033
2034    // commit with a manual WAL flush (these docs go into HB-trie)
2035    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2036
2037    // insert documents from 5 - 9
2038    for (; i < n/2; i++){
2039        sprintf(keybuf, "key%d", i);
2040        sprintf(metabuf, "meta%d", i);
2041        sprintf(bodybuf, "body%d", i);
2042        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2043            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2044        for (r = 0; r < num_kvs; ++r) {
2045            fdb_set(db[r], doc[i]);
2046        }
2047    }
2048
2049    // commit again without a WAL flush
2050    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2051
2052    // insert documents from 10-14 into HB-trie
2053    for (; i < (n/2 + n/4); i++){
2054        sprintf(keybuf, "key%d", i);
2055        sprintf(metabuf, "meta%d", i);
2056        sprintf(bodybuf, "body%d", i);
2057        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2058            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2059        for (r = 0; r < num_kvs; ++r) {
2060            fdb_set(db[r], doc[i]);
2061        }
2062    }
2063    // manually flush WAL & commit
2064    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2065
2066    // insert documents from 15 - 19 on file into the WAL
2067    for (; i < n; i++){
2068        sprintf(keybuf, "key%d", i);
2069        sprintf(metabuf, "meta%d", i);
2070        sprintf(bodybuf, "body%d", i);
2071        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2072            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2073        for (r = 0; r < num_kvs; ++r) {
2074            fdb_set(db[r], doc[i]);
2075        }
2076    }
2077    // commit without a WAL flush
2078    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2079
2080    for (r = 0; r < num_kvs; ++r) {
2081        status = fdb_set_log_callback(db[r], logCallbackFunc,
2082                                      (void *) "snapshot_markers_in_file");
2083        TEST_CHK(status == FDB_RESULT_SUCCESS);
2084    }
2085
2086    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2087    TEST_CHK(status == FDB_RESULT_SUCCESS);
2088
2089    if (!multi_kv) {
2090        TEST_CHK(num_markers == 4);
2091        for (r = 0; r < num_kvs; ++r) {
2092            TEST_CHK(markers[r].num_kvs_markers == 1);
2093            TEST_CHK(markers[r].kvs_markers[0].seqnum
2094                     == (fdb_seqnum_t)(n - r*5));
2095        }
2096    } else {
2097        TEST_CHK(num_markers == 8);
2098        for (r = 0; r < num_kvs; ++r) {
2099            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2100            for (i = 0; i < num_kvs; ++i) {
2101                TEST_CHK(markers[r].kvs_markers[i].seqnum
2102                         == (fdb_seqnum_t)(n - r*5));
2103                sprintf(kv_name, "kv%d", i);
2104                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2105            }
2106        }
2107    }
2108
2109    status = fdb_free_snap_markers(markers, num_markers);
2110    TEST_CHK(status == FDB_RESULT_SUCCESS);
2111
2112    // close db file
2113    fdb_close(dbfile);
2114
2115    // free all documents
2116    for (i=0;i<n;++i){
2117        fdb_doc_free(doc[i]);
2118    }
2119
2120    // free all resources
2121    fdb_shutdown();
2122
2123    memleak_end();
2124
2125    sprintf(bodybuf, "snapshot markers in file test %s", multi_kv ?
2126                                                        "multiple kv mode:"
2127                                                      : "single kv mode:");
2128    TEST_RESULT(bodybuf);
2129}
2130
2131void rollback_forward_seqnum()
2132{
2133
2134    TEST_INIT();
2135    memleak_start();
2136
2137    int r;
2138    int i, n=100;
2139    int rb1_seqnum, rb2_seqnum;
2140    char keybuf[256];
2141    char setop[3];
2142    fdb_file_handle *dbfile;
2143    fdb_iterator *it;
2144    fdb_kvs_handle *kv1, *mirror_kv1;
2145    fdb_kvs_info info;
2146    fdb_config fconfig = fdb_get_default_config();
2147    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2148    fdb_doc **doc = alca(fdb_doc*, n);
2149    fdb_doc *rdoc = NULL;
2150    fdb_status status;
2151    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2152    (void)r;
2153
2154    fconfig.wal_threshold = 1024;
2155    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2156    fconfig.compaction_threshold = 0;
2157
2158    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2159    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2160    fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
2161
2162
2163    // set n docs within both dbs
2164    for(i=0;i<n;++i){
2165        sprintf(keybuf, "key%d", i);
2166        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2167            NULL, 0, NULL, 0);
2168        fdb_set(kv1, doc[i]);
2169        fdb_set_kv(mirror_kv1, keybuf, strlen(keybuf), setop, 3);
2170    }
2171
2172    // commit and save seqnum1
2173    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2174    fdb_get_kvs_info(kv1, &info);
2175    rb1_seqnum = info.last_seqnum;
2176
2177    // delete all docs in kv1
2178    for(i=0;i<n;++i){
2179        fdb_del(kv1, doc[i]);
2180    }
2181
2182    // commit and save seqnum2
2183    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2184    fdb_get_kvs_info(kv1, &info);
2185    rb2_seqnum = info.last_seqnum;
2186
2187
2188    // sets again
2189    for(i=0;i<n;++i){
2190        doc[i]->deleted = false;
2191        fdb_set(kv1, doc[i]);
2192    }
2193
2194    // commit
2195    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2196
2197    // rollback to first seqnum
2198    status = fdb_rollback(&kv1, rb1_seqnum);
2199    TEST_CHK(status == FDB_RESULT_SUCCESS);
2200
2201    // rollback to second seqnum
2202    status = fdb_rollback(&kv1, rb2_seqnum);
2203    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2204
2205    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
2206    TEST_CHK(status == FDB_RESULT_SUCCESS);
2207
2208    do {
2209        status = fdb_iterator_get(it, &rdoc);
2210        TEST_CHK(status == FDB_RESULT_SUCCESS);
2211        status = fdb_get_metaonly_byseq(kv1, rdoc);
2212        TEST_CHK(status == FDB_RESULT_SUCCESS);
2213        TEST_CHK(rdoc->deleted == false);
2214        fdb_doc_free(rdoc);
2215        rdoc = NULL;
2216    } while(fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
2217
2218    for (i=0;i<n;++i){
2219        fdb_doc_free(doc[i]);
2220    }
2221    fdb_iterator_close(it);
2222    fdb_kvs_close(kv1);
2223    fdb_close(dbfile);
2224    fdb_shutdown();
2225    memleak_end();
2226
2227    TEST_RESULT("rollback forward seqnum");
2228}
2229
2230void rollback_test(bool multi_kv)
2231{
2232    TEST_INIT();
2233
2234    memleak_start();
2235
2236    int i, r;
2237    int n = 20;
2238    int count;
2239    fdb_file_handle *dbfile, *dbfile_txn;
2240    fdb_kvs_handle *db, *db_txn;
2241    fdb_seqnum_t rollback_seq;
2242    fdb_doc **doc = alca(fdb_doc*, n);
2243    fdb_doc *rdoc = NULL;
2244    fdb_kvs_info kvs_info;
2245    fdb_status status;
2246    fdb_iterator *iterator;
2247
2248    char keybuf[256], metabuf[256], bodybuf[256];
2249
2250    // remove previous mvcc_test files
2251    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2252    (void)r;
2253
2254    fdb_config fconfig = fdb_get_default_config();
2255    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2256    fconfig.buffercache_size = 0;
2257    fconfig.wal_threshold = 1024;
2258    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2259    fconfig.compaction_threshold = 0;
2260    fconfig.multi_kv_instances = multi_kv;
2261
2262    // remove previous mvcc_test files
2263    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2264    (void)r;
2265
2266    // open db
2267    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2268    if (multi_kv) {
2269        fdb_kvs_open_default(dbfile, &db, &kvs_config);
2270    } else {
2271        fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2272    }
2273
2274   // ------- Setup test ----------------------------------
2275   // insert documents of 0-4
2276    for (i=0; i<n/4; i++){
2277        sprintf(keybuf, "key%d", i);
2278        sprintf(metabuf, "meta%d", i);
2279        sprintf(bodybuf, "body%d", i);
2280        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2281            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2282        fdb_set(db, doc[i]);
2283    }
2284
2285    // commit with a manual WAL flush (these docs go into HB-trie)
2286    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2287
2288    // insert documents from 4 - 9
2289    for (; i < n/2; i++){
2290        sprintf(keybuf, "key%d", i);
2291        sprintf(metabuf, "meta%d", i);
2292        sprintf(bodybuf, "body%d", i);
2293        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2294            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2295        fdb_set(db, doc[i]);
2296    }
2297
2298    // commit again without a WAL flush
2299    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2300
2301    // ROLLBACK POINT: pick up sequence number of a commit without a WAL flush
2302    rollback_seq = doc[i-1]->seqnum;
2303
2304    // insert documents from 10-14 into HB-trie
2305    for (; i < (n/2 + n/4); i++){
2306        sprintf(keybuf, "key%d", i);
2307        sprintf(metabuf, "meta%d", i);
2308        sprintf(bodybuf, "body%d", i);
2309        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2310            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2311        fdb_set(db, doc[i]);
2312    }
2313    // manually flush WAL & commit
2314    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2315
2316    // insert documents from 15 - 19 on file into the WAL
2317    for (; i < n; i++){
2318        sprintf(keybuf, "key%d", i);
2319        sprintf(metabuf, "meta%d", i);
2320        sprintf(bodybuf, "body%d", i);
2321        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2322            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2323        fdb_set(db, doc[i]);
2324    }
2325    // commit without a WAL flush
2326    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2327
2328    status = fdb_set_log_callback(db, logCallbackFunc,
2329                                  (void *) "rollback_test");
2330    TEST_CHK(status == FDB_RESULT_SUCCESS);
2331
2332    // ---------- Rollback tests begin -----------------------
2333    // We have DB file with 5 HB-trie docs, 5 unflushed WAL docs occuring twice
2334    // Attempt to rollback to out-of-range marker..
2335    status = fdb_rollback(&db, 999999);
2336    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2337
2338    // Open another handle & begin transaction
2339    fdb_open(&dbfile_txn, "./mvcc_test1", &fconfig);
2340    if (multi_kv) {
2341        fdb_kvs_open_default(dbfile_txn, &db_txn, &kvs_config);
2342    } else {
2343        fdb_kvs_open(dbfile_txn, &db_txn, NULL, &kvs_config);
2344    }
2345    fdb_begin_transaction(dbfile_txn, FDB_ISOLATION_READ_COMMITTED);
2346    // Attempt to rollback while the transaction is active
2347    status =  fdb_rollback(&db, rollback_seq);
2348    // Must fail
2349    TEST_CHK(status == FDB_RESULT_FAIL_BY_TRANSACTION);
2350    fdb_abort_transaction(dbfile_txn);
2351    fdb_kvs_close(db_txn);
2352    fdb_close(dbfile_txn);
2353
2354    // Rollback to saved marker from above
2355    status = fdb_rollback(&db, rollback_seq);
2356    TEST_CHK(status == FDB_RESULT_SUCCESS);
2357
2358    // check handle's sequence number
2359    fdb_get_kvs_info(db, &kvs_info);
2360    TEST_CHK(kvs_info.last_seqnum == rollback_seq);
2361
2362    // Modify an item and update into the rollbacked file..
2363    i = n/2;
2364    *(char *)doc[i]->body = 'B';
2365    fdb_set(db, doc[i]);
2366    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2367
2368    // create an iterator on the rollback for full range
2369    fdb_iterator_sequence_init(db, &iterator, 0, 0, FDB_ITR_NONE);
2370
2371    // repeat until fail
2372    i=0;
2373    count=0;
2374    do {
2375        status = fdb_iterator_get(iterator, &rdoc);
2376        TEST_CHK(status == FDB_RESULT_SUCCESS);
2377
2378        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2379        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2380        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2381
2382        fdb_doc_free(rdoc);
2383        rdoc = NULL;
2384        i ++;
2385        count++;
2386    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2387
2388    TEST_CHK(count==n/2 + 1); // Items from first half and newly set item
2389
2390    fdb_iterator_close(iterator);
2391
2392    // close db file
2393    fdb_kvs_close(db);
2394    fdb_close(dbfile);
2395
2396    // free all documents
2397    for (i=0;i<n;++i){
2398        fdb_doc_free(doc[i]);
2399    }
2400
2401    // free all resources
2402    fdb_shutdown();
2403
2404    memleak_end();
2405
2406    sprintf(bodybuf, "rollback test %s", multi_kv ? "multiple kv mode:"
2407                                                  : "single kv mode:");
2408    TEST_RESULT(bodybuf);
2409}
2410
2411
2412
2413void rollback_and_snapshot_test()
2414{
2415    TEST_INIT();
2416
2417    memleak_start();
2418
2419    fdb_seqnum_t seqnum, rollback_seqnum;
2420    fdb_kvs_info kvs_info;
2421    fdb_status status;
2422    fdb_config config;
2423    fdb_kvs_config kvs_config;
2424    fdb_file_handle *dbfile;
2425    fdb_kvs_handle *db,  *snapshot;
2426    int r;
2427
2428    // remove previous mvcc_test files
2429    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2430    (void)r;
2431
2432    // MB-12530 open db
2433    config = fdb_get_default_config();
2434    kvs_config = fdb_get_default_kvs_config();
2435    status = fdb_open(&dbfile, "mvcc_test", &config);
2436    TEST_CHK(status == FDB_RESULT_SUCCESS);
2437    status = fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2438    TEST_CHK(status == FDB_RESULT_SUCCESS);
2439
2440    // 2. Create Key 'a' and Commit
2441    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
2442    TEST_CHK(status == FDB_RESULT_SUCCESS);
2443    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2444    TEST_CHK(status == FDB_RESULT_SUCCESS);
2445
2446    status = fdb_get_kvs_info(db, &kvs_info);
2447    TEST_CHK(status == FDB_RESULT_SUCCESS);
2448
2449    // 3. Create Key 'b' and Commit
2450    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
2451    TEST_CHK(status == FDB_RESULT_SUCCESS);
2452    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2453    TEST_CHK(status == FDB_RESULT_SUCCESS);
2454
2455    status = fdb_get_kvs_info(db, &kvs_info);
2456    TEST_CHK(status == FDB_RESULT_SUCCESS);
2457    seqnum = kvs_info.last_seqnum;
2458
2459    // 4.  Remember this as our rollback point
2460    rollback_seqnum = seqnum;
2461
2462    // 5. Create Key 'c' and Commit
2463    status = fdb_set_kv(db, (void *)"c", 1,(void *) "val-c", 5);
2464    TEST_CHK(status == FDB_RESULT_SUCCESS);
2465    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2466    TEST_CHK(status == FDB_RESULT_SUCCESS);
2467
2468    status = fdb_get_kvs_info(db, &kvs_info);
2469    TEST_CHK(status == FDB_RESULT_SUCCESS);
2470
2471    // 6. Rollback to rollback point (seq 2)
2472    status = fdb_rollback(&db, rollback_seqnum);
2473    TEST_CHK(status == FDB_RESULT_SUCCESS);
2474
2475    status = fdb_get_kvs_info(db, &kvs_info);
2476    TEST_CHK(status == FDB_RESULT_SUCCESS);
2477    seqnum = kvs_info.last_seqnum;
2478
2479    // 7. Verify that Key 'c' is not found
2480    void *val;
2481    size_t vallen;
2482    status = fdb_get_kv(db, (void *)"c", 1, &val, &vallen);
2483    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2484
2485    // 8. Open a snapshot at the same point
2486    status = fdb_snapshot_open(db, &snapshot, seqnum);
2487    TEST_CHK(status == FDB_RESULT_SUCCESS);
2488
2489    // 9. Verify that Key 'c' is not found
2490    status = fdb_get_kv(snapshot, (void *)"c", 1, &val, &vallen);
2491    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2492
2493    // close the snapshot db
2494    fdb_kvs_close(snapshot);
2495
2496    // close db file
2497    fdb_close(dbfile);
2498
2499    // free all resources
2500    fdb_shutdown();
2501
2502    memleak_end();
2503
2504    TEST_RESULT("rollback and snapshot test");
2505}
2506
2507void rollback_ncommits()
2508{
2509
2510    TEST_INIT();
2511    memleak_start();
2512
2513    int r;
2514    int i, j, n=100;
2515    int ncommits=10;
2516    char keybuf[256];
2517    fdb_file_handle *dbfile;
2518    fdb_kvs_handle *kv1, *kv2;
2519    fdb_kvs_info info;
2520    fdb_config fconfig = fdb_get_default_config();
2521    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2522    fdb_status status;
2523    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2524    (void)r;
2525
2526    fconfig.wal_threshold = 1024;
2527    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2528    fconfig.compaction_threshold = 0;
2529
2530    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2531    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2532    fdb_kvs_open(dbfile, &kv2, NULL, &kvs_config);
2533
2534
2535    for(j=0;j<ncommits;++j){
2536
2537        // set n docs per commit
2538        for(i=0;i<n;++i){
2539            sprintf(keybuf, "key%02d%03d", j, i);
2540            fdb_set_kv(kv1, keybuf, strlen(keybuf), NULL, 0);
2541            fdb_set_kv(kv2, keybuf, strlen(keybuf), NULL, 0);
2542        }
2543        // alternate commit pattern
2544        if((j % 2) == 0){
2545            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2546        } else {
2547            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2548        }
2549
2550        // doc_count should match seqnum since they are unique
2551        fdb_get_kvs_info(kv1, &info);
2552        TEST_CHK(info.doc_count == info.last_seqnum);
2553    }
2554
2555    // iteratively rollback 5 commits
2556     for(j=ncommits;j>0;--j){
2557        status = fdb_rollback(&kv1, j*n);
2558        TEST_CHK(status == FDB_RESULT_SUCCESS);
2559
2560        // check rollback doc_count
2561        fdb_get_kvs_info(kv1, &info);
2562        TEST_CHK(info.doc_count == info.last_seqnum);
2563    }
2564
2565    fdb_kvs_close(kv1);
2566    fdb_close(dbfile);
2567    fdb_shutdown();
2568    memleak_end();
2569
2570    TEST_RESULT("rollback n commits");
2571}
2572
2573void transaction_test()
2574{
2575    TEST_INIT();
2576
2577    memleak_start();
2578
2579    int i, r;
2580    int n = 10;
2581    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2, *dbfile_txn3;
2582    fdb_kvs_handle *db, *db_txn1, *db_txn2, *db_txn3;
2583    fdb_doc **doc = alca(fdb_doc*, n);
2584    fdb_doc *rdoc;
2585    fdb_status status;
2586
2587    char keybuf[256], metabuf[256], bodybuf[256];
2588
2589    // remove previous mvcc_test files
2590    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2591    (void)r;
2592
2593    fdb_config fconfig = fdb_get_default_config();
2594    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2595    fconfig.buffercache_size = 0;
2596    fconfig.wal_threshold = 1024;
2597    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2598    fconfig.purging_interval = 0;
2599    fconfig.compaction_threshold = 0;
2600
2601    // open db
2602    fdb_open(&dbfile, "mvcc_test1", &fconfig);
2603    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2604    status = fdb_set_log_callback(db, logCallbackFunc,
2605                                  (void *) "transaction_test");
2606    TEST_CHK(status == FDB_RESULT_SUCCESS);
2607
2608    // open db and begin transactions
2609    fdb_open(&dbfile_txn1, "mvcc_test1", &fconfig);
2610    fdb_open(&dbfile_txn2, "mvcc_test1", &fconfig);
2611    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
2612    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
2613    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2614    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2615
2616    // insert half docs into txn1
2617    for (i=0;i<n/2;++i){
2618        sprintf(keybuf, "key%d", i);
2619        sprintf(metabuf, "meta%d", i);
2620        sprintf(bodybuf, "body%d", i);
2621        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2622                                (void*)metabuf, strlen(metabuf),
2623                                (void*)bodybuf, strlen(bodybuf));
2624        fdb_set(db_txn1, doc[i]);
2625    }
2626
2627    // insert other half docs into txn2
2628    for (i=n/2;i<n;++i){
2629        sprintf(keybuf, "key%d", i);
2630        sprintf(metabuf, "meta%d", i);
2631        sprintf(bodybuf, "body%d", i);
2632        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2633                                (void*)metabuf, strlen(metabuf),
2634                                (void*)bodybuf, strlen(bodybuf));
2635        fdb_set(db_txn2, doc[i]);
2636    }
2637
2638    // uncommitted docs should not be read by the other transaction that
2639    // doesn't allow uncommitted reads.
2640    for (i=0;i<n;++i){
2641        sprintf(keybuf, "key%d", i);
2642        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2643        status = fdb_get(db_txn1, rdoc);
2644        if (i<n/2) {
2645            TEST_CHK(status == FDB_RESULT_SUCCESS);
2646        } else {
2647            TEST_CHK(status != FDB_RESULT_SUCCESS);
2648        }
2649        fdb_doc_free(rdoc);
2650    }
2651
2652    // uncommitted docs can be read by the transaction that allows uncommitted reads.
2653    fdb_open(&dbfile_txn3, "mvcc_test1", &fconfig);
2654    fdb_kvs_open_default(dbfile_txn3, &db_txn3, &kvs_config);
2655    fdb_begin_transaction(dbfile_txn3, FDB_ISOLATION_READ_UNCOMMITTED);
2656    for (i=0;i<n;++i){
2657        sprintf(keybuf, "key%d", i);
2658        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2659        status = fdb_get(db_txn3, rdoc);
2660        TEST_CHK(status == FDB_RESULT_SUCCESS);
2661        fdb_doc_free(rdoc);
2662    }
2663    fdb_end_transaction(dbfile_txn3, FDB_COMMIT_NORMAL);
2664    fdb_close(dbfile_txn3);
2665
2666    // commit and end txn1
2667    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2668
2669    // uncommitted docs should not be read generally
2670    for (i=0;i<n;++i){
2671        sprintf(keybuf, "key%d", i);
2672        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2673        status = fdb_get(db, rdoc);
2674        if (i<n/2) {
2675            TEST_CHK(status == FDB_RESULT_SUCCESS);
2676        } else {
2677            TEST_CHK(status != FDB_RESULT_SUCCESS);
2678        }
2679        fdb_doc_free(rdoc);
2680    }
2681
2682    // read uncommitted docs using the same transaction
2683    for (i=0;i<n;++i){
2684        sprintf(keybuf, "key%d", i);
2685        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2686        status = fdb_get(db_txn2, rdoc);
2687        TEST_CHK(status == FDB_RESULT_SUCCESS);
2688        fdb_doc_free(rdoc);
2689    }
2690
2691    // abort txn2
2692    fdb_abort_transaction(dbfile_txn2);
2693
2694    // close & re-open db file
2695    fdb_close(dbfile);
2696    fdb_open(&dbfile, "mvcc_test1", &fconfig);
2697    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2698    status = fdb_set_log_callback(db, logCallbackFunc,
2699                                  (void *) "transaction_test");
2700    TEST_CHK(status == FDB_RESULT_SUCCESS);
2701
2702    // uncommitted docs should not be read after reopening the file either
2703    for (i=0;i<n;++i){
2704        sprintf(keybuf, "key%d", i);
2705        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2706        status = fdb_get(db, rdoc);
2707        if (i<n/2) {
2708            TEST_CHK(status == FDB_RESULT_SUCCESS);
2709        } else {
2710            TEST_CHK(status != FDB_RESULT_SUCCESS);
2711        }
2712        fdb_doc_free(rdoc);
2713    }
2714
2715    // insert other half docs & commit
2716    for (i=n/2;i<n;++i){
2717        fdb_set(db, doc[i]);
2718    }
2719    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2720
2721    // now all docs can be read generally
2722    for (i=0;i<n;++i){
2723        sprintf(keybuf, "key%d", i);
2724        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2725        status = fdb_get(db, rdoc);
2726        TEST_CHK(status == FDB_RESULT_SUCCESS);
2727        fdb_doc_free(rdoc);
2728    }
2729
2730    // begin transactions
2731    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2732    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2733
2734    // concurrently update docs
2735    for (i=0;i<n;++i){
2736        sprintf(keybuf, "key%d", i);
2737        sprintf(metabuf, "meta%d", i);
2738        sprintf(bodybuf, "body%d_txn1", i);
2739        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2740                                (void*)metabuf, strlen(metabuf),
2741                                (void*)bodybuf, strlen(bodybuf));
2742        fdb_set(db_txn1, rdoc);
2743        fdb_doc_free(rdoc);
2744
2745        sprintf(bodybuf, "body%d_txn2", i);
2746        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2747                                (void*)metabuf, strlen(metabuf),
2748                                (void*)bodybuf, strlen(bodybuf));
2749        fdb_set(db_txn2, rdoc);
2750        fdb_doc_free(rdoc);
2751    }
2752
2753    // retrieve check
2754    for (i=0;i<n;++i){
2755        // general retrieval
2756        sprintf(keybuf, "key%d", i);
2757        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2758        status = fdb_get(db, rdoc);
2759        TEST_CHK(status == FDB_RESULT_SUCCESS);
2760        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2761        fdb_doc_free(rdoc);
2762
2763        // get from txn1
2764        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2765        status = fdb_get(db_txn1, rdoc);
2766        TEST_CHK(status == FDB_RESULT_SUCCESS);
2767        sprintf(bodybuf, "body%d_txn1", i);
2768        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2769        fdb_doc_free(rdoc);
2770
2771        // get from txn2
2772        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2773        status = fdb_get(db_txn2, rdoc);
2774        TEST_CHK(status == FDB_RESULT_SUCCESS);
2775        sprintf(bodybuf, "body%d_txn2", i);
2776        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2777        fdb_doc_free(rdoc);
2778    }
2779
2780    // commit txn2 & retrieve check
2781    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
2782    for (i=0;i<n;++i){
2783        // general retrieval
2784        sprintf(keybuf, "key%d", i);
2785        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2786        status = fdb_get(db, rdoc);
2787        TEST_CHK(status == FDB_RESULT_SUCCESS);
2788        sprintf(bodybuf, "body%d_txn2", i);
2789        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2790        fdb_doc_free(rdoc);
2791
2792        // get from txn1
2793        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2794        status = fdb_get(db_txn1, rdoc);
2795        TEST_CHK(status == FDB_RESULT_SUCCESS);
2796        sprintf(bodybuf, "body%d_txn1", i);
2797        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2798        fdb_doc_free(rdoc);
2799    }
2800
2801    // commit txn1 & retrieve check
2802    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2803    for (i=0;i<n;++i){
2804        // general retrieval
2805        sprintf(keybuf, "key%d", i);
2806        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2807        status = fdb_get(db, rdoc);
2808        TEST_CHK(status == FDB_RESULT_SUCCESS);
2809        sprintf(bodybuf, "body%d_txn1", i);
2810        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2811        fdb_doc_free(rdoc);
2812    }
2813
2814    // begin new transaction
2815    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2816    // update doc#5
2817    i = 5;
2818    sprintf(keybuf, "key%d", i);
2819    sprintf(metabuf, "meta%d", i);
2820    sprintf(bodybuf, "body%d_before_compaction", i);
2821    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2822                            (void*)metabuf, strlen(metabuf),
2823                            (void*)bodybuf, strlen(bodybuf));
2824    fdb_set(db_txn1, rdoc);
2825    fdb_doc_free(rdoc);
2826
2827    // do compaction
2828    fdb_compact(dbfile, "mvcc_test2");
2829
2830    // retrieve doc#5
2831    // using txn1
2832    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2833    status = fdb_get(db_txn1, rdoc);
2834    TEST_CHK(status == FDB_RESULT_SUCCESS);
2835    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2836    fdb_doc_free(rdoc);
2837
2838    // general retrieval
2839    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2840    status = fdb_get(db, rdoc);
2841    TEST_CHK(status == FDB_RESULT_SUCCESS);
2842    sprintf(bodybuf, "body%d_txn1", i);
2843    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2844    fdb_doc_free(rdoc);
2845
2846    // commit transaction
2847    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2848    // retrieve check
2849    for (i=0;i<n;++i){
2850        // general get
2851        sprintf(keybuf, "key%d", i);
2852        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2853        status = fdb_get(db, rdoc);
2854        TEST_CHK(status == FDB_RESULT_SUCCESS);
2855        if (i != 5) {
2856            sprintf(bodybuf, "body%d_txn1", i);
2857        } else {
2858            sprintf(bodybuf, "body%d_before_compaction", i);
2859        }
2860        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2861        fdb_doc_free(rdoc);
2862    }
2863
2864    // close & re-open db file
2865    fdb_close(dbfile);
2866    fdb_open(&dbfile, "mvcc_test2", &fconfig);
2867    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2868    status = fdb_set_log_callback(db, logCallbackFunc,
2869                                  (void *) "transaction_test");
2870    TEST_CHK(status == FDB_RESULT_SUCCESS);
2871    // retrieve check again
2872    for (i=0;i<n;++i){
2873        // general get
2874        sprintf(keybuf, "key%d", i);
2875        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2876        status = fdb_get(db, rdoc);
2877        TEST_CHK(status == FDB_RESULT_SUCCESS);
2878        if (i != 5) {
2879            sprintf(bodybuf, "body%d_txn1", i);
2880        } else {
2881            sprintf(bodybuf, "body%d_before_compaction", i);
2882        }
2883        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2884        fdb_doc_free(rdoc);
2885    }
2886
2887    // close db file
2888    fdb_close(dbfile);
2889    fdb_close(dbfile_txn1);
2890    fdb_close(dbfile_txn2);
2891
2892    // free all documents
2893    for (i=0;i<n;++i){
2894        fdb_doc_free(doc[i]);
2895    }
2896
2897    // free all resources
2898    fdb_shutdown();
2899
2900    memleak_end();
2901
2902    TEST_RESULT("transaction test");
2903}
2904
2905void transaction_simple_api_test()
2906{
2907    TEST_INIT();
2908
2909    memleak_start();
2910
2911    int i, r;
2912    int n = 10;
2913    size_t valuelen;
2914    void *value;
2915    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2;
2916    fdb_kvs_handle *db, *db_txn1, *db_txn2;
2917    fdb_status status;
2918
2919    char keybuf[256], bodybuf[256];
2920
2921    // remove previous mvcc_test files
2922    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2923    (void)r;
2924
2925    fdb_config fconfig = fdb_get_default_config();
2926    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2927    fconfig.buffercache_size = 0;
2928    fconfig.wal_threshold = 1024;
2929    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2930    fconfig.purging_interval = 0;
2931    fconfig.compaction_threshold = 0;
2932
2933    // open db
2934    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2935    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2936    status = fdb_set_log_callback(db, logCallbackFunc,
2937                                  (void *) "transaction_simple_api_test");
2938    TEST_CHK(status == FDB_RESULT_SUCCESS);
2939
2940    // insert key-value pairs
2941    for (i=0;i<n;++i){
2942        sprintf(keybuf, "key%d", i);
2943        sprintf(bodybuf, "body%d", i);
2944        status = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2945        TEST_CHK(status == FDB_RESULT_SUCCESS);
2946    }
2947
2948    // commit
2949    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2950
2951    // open db and begin transactions
2952    fdb_open(&dbfile_txn1, "./mvcc_test1", &fconfig);
2953    fdb_open(&dbfile_txn2, "./mvcc_test1", &fconfig);
2954    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
2955    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
2956    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2957    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2958
2959    // concurrently update docs
2960    for (i=0;i<n;++i){
2961        sprintf(keybuf, "key%d", i);
2962        sprintf(bodybuf, "body%d_txn1", i);
2963        fdb_set_kv(db_txn1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2964
2965        sprintf(keybuf, "key%d", i);
2966        sprintf(bodybuf, "body%d_txn2", i);
2967        fdb_set_kv(db_txn2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2968    }
2969
2970    // retrieve key-value pairs
2971    for (i=0;i<n;++i){
2972        // general retrieval
2973        sprintf(keybuf, "key%d", i);
2974        sprintf(bodybuf, "body%d", i);
2975        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2976        TEST_CHK(status == FDB_RESULT_SUCCESS);
2977        TEST_CMP(value, bodybuf, valuelen);
2978        status = fdb_free_block(value);
2979
2980        // txn1
2981        sprintf(keybuf, "key%d", i);
2982        sprintf(bodybuf, "body%d_txn1", i);
2983        status = fdb_get_kv(db_txn1, keybuf, strlen(keybuf), &value, &valuelen);
2984        TEST_CHK(status == FDB_RESULT_SUCCESS);
2985        TEST_CMP(value, bodybuf, valuelen);
2986        status = fdb_free_block(value);
2987
2988        // txn2
2989        sprintf(keybuf, "key%d", i);
2990        sprintf(bodybuf, "body%d_txn2", i);
2991        status = fdb_get_kv(db_txn2, keybuf, strlen(keybuf), &value, &valuelen);
2992        TEST_CHK(status == FDB_RESULT_SUCCESS);
2993        TEST_CMP(value, bodybuf, valuelen);
2994        status = fdb_free_block(value);
2995    }
2996
2997    // commit txn1
2998    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2999    for (i=0;i<n;++i){
3000        // general retrieval
3001        sprintf(keybuf, "key%d", i);
3002        sprintf(bodybuf, "body%d_txn1", i);
3003        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3004        TEST_CHK(status == FDB_RESULT_SUCCESS);
3005        TEST_CMP(value, bodybuf, valuelen);
3006        status = fdb_free_block(value);
3007
3008        // txn2
3009        sprintf(keybuf, "key%d", i);
3010        sprintf(bodybuf, "body%d_txn2", i);
3011        status = fdb_get_kv(db_txn2, keybuf, strlen(keybuf), &value, &valuelen);
3012        TEST_CHK(status == FDB_RESULT_SUCCESS);
3013        TEST_CMP(value, bodybuf, valuelen);
3014        status = fdb_free_block(value);
3015    }
3016
3017    // commit txn2
3018    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
3019    for (i=0;i<n;++i){
3020        // general retrieval
3021        sprintf(keybuf, "key%d", i);
3022        sprintf(bodybuf, "body%d_txn2", i);
3023        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3024        TEST_CHK(status == FDB_RESULT_SUCCESS);
3025        TEST_CMP(value, bodybuf, valuelen);
3026        status = fdb_free_block(value);
3027    }
3028
3029    // close db file
3030    fdb_close(dbfile);
3031    fdb_close(dbfile_txn1);
3032    fdb_close(dbfile_txn2);
3033
3034    // free all resources
3035    fdb_shutdown();
3036
3037    memleak_end();
3038
3039    TEST_RESULT("transaction simple API test");
3040}
3041
3042
3043void rollback_prior_to_ops(bool walflush)
3044{
3045
3046    TEST_INIT();
3047    memleak_start();
3048
3049    int r;
3050    int i, j, n=100;
3051    int expected_doc_count = 0;
3052    int rb_seqnum;
3053    char keybuf[256], bodybuf[256];
3054    fdb_file_handle *dbfile;
3055    fdb_iterator *it;
3056    fdb_kvs_handle *kv1, *mirror_kv1;
3057    fdb_kvs_info info;
3058    fdb_config fconfig = fdb_get_default_config();
3059    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3060    fdb_doc **doc = alca(fdb_doc*, n);
3061    fdb_doc *rdoc = NULL, *vdoc;
3062    fdb_status status;
3063
3064    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3065    (void)r;
3066
3067    fconfig.wal_threshold = 1024;
3068    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3069    fconfig.compaction_threshold = 10;
3070
3071    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3072    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
3073
3074    if(walflush){
3075        fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
3076    } else {
3077        fdb_kvs_open(dbfile, &mirror_kv1, "mirror", &kvs_config);
3078    }
3079
3080    // set n docs
3081    for(i=0;i<n;++i){
3082        sprintf(keybuf, "key%d", i);
3083        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3084            NULL, 0, NULL, 0);
3085        fdb_set(kv1, doc[i]);
3086        fdb_set(mirror_kv1, doc[i]);
3087        expected_doc_count++;
3088    }
3089
3090    // commit
3091    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3092
3093    // delete subset of recently loaded docs
3094    for(i=0;i<n/2;++i){
3095        fdb_del(kv1, doc[i]);
3096        fdb_del(mirror_kv1, doc[i]);
3097        expected_doc_count--;
3098        fdb_doc_free(doc[i]);
3099    }
3100
3101    for(i=n/2;i<n;++i){
3102        fdb_doc_free(doc[i]);
3103    }
3104
3105    // commit and save seqnum
3106    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3107    fdb_get_kvs_info(kv1, &info);
3108    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3109    rb_seqnum = info.last_seqnum;
3110
3111    for (j=0;j<100;++j){
3112        // load some more docs but stop mirroring
3113        for(i=0;i<n;++i){
3114            sprintf(keybuf, "key%d", n+i);
3115            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3116                NULL, 0, NULL, 0);
3117            fdb_set(kv1, doc[i]);
3118            if( n%2 == 0){
3119                fdb_del(kv1, doc[i]);
3120            }
3121            fdb_doc_free(doc[i]);
3122        }
3123    }
3124
3125    // commit wal or normal
3126    if(walflush){
3127        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3128    } else {
3129        fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3130    }
3131
3132    // rollback
3133    status = fdb_rollback(&kv1, rb_seqnum);
3134    TEST_CHK(status == FDB_RESULT_SUCCESS);
3135
3136    fdb_get_kvs_info(kv1, &info);
3137    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3138    fdb_get_kvs_info(mirror_kv1, &info);
3139    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3140
3141    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
3142    TEST_CHK(status == FDB_RESULT_SUCCESS);
3143    do {
3144        status = fdb_iterator_get_metaonly(it, &rdoc);
3145        TEST_CHK(status == FDB_RESULT_SUCCESS);
3146
3147        fdb_doc_create(&vdoc, rdoc->key, rdoc->keylen,
3148                              rdoc->meta, rdoc->metalen,
3149                              rdoc->body, rdoc->bodylen);
3150        status = fdb_get(kv1, vdoc);
3151        if (rdoc->deleted){
3152            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
3153        } else {
3154            TEST_CHK(status == FDB_RESULT_SUCCESS);
3155        }
3156        fdb_doc_free(rdoc);
3157        rdoc = NULL;
3158        fdb_doc_free(vdoc);
3159    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
3160
3161    fdb_iterator_close(it);
3162    fdb_kvs_close(mirror_kv1);
3163    fdb_kvs_close(kv1);
3164    fdb_close(dbfile);
3165    fdb_shutdown();
3166    memleak_end();
3167
3168    sprintf(bodybuf, "rollback prior to ops test %s",
3169            walflush ? "commit with wal flush:" : "normal commit:");
3170    TEST_RESULT(bodybuf);
3171}
3172
3173struct cb_snapshot_args {
3174    fdb_kvs_handle *handle;
3175    int ndocs;
3176    int nupdates;
3177};
3178
3179static void _snapshot_check(fdb_kvs_handle *handle, int ndocs, int nupdates)
3180{
3181    TEST_INIT();
3182    int i, j, update_no;
3183    int commit_term = ndocs/2;
3184    char keybuf[256], bodybuf[256];
3185    char *value;
3186    size_t valuelen;
3187    fdb_kvs_handle *snap;
3188    fdb_status s;
3189    fdb_kvs_info info;
3190
3191    // check last seqnum
3192    fdb_get_kvs_info(handle, &info);
3193    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)commit_term * nupdates);
3194
3195    // open snapshot for every 'commit_term' seq numbers
3196    for (i=0; i<nupdates; i++) {
3197        s = fdb_snapshot_open(handle, &snap, (i+1)*commit_term);
3198        TEST_CHK(s == FDB_RESULT_SUCCESS);
3199
3200        for (j=0;j<ndocs;++j) {
3201            if (j < ndocs/2) {
3202                update_no = (i/2) * 2;
3203            } else {
3204                if (i == 0) {
3205                    break;
3206                }
3207                update_no = 1 + ((i-1)/2) * 2;
3208            }
3209            sprintf(keybuf, "key%04d", j);
3210            sprintf(bodybuf, "body%04d_update%d", j, update_no);
3211            s = fdb_get_kv(snap, keybuf, strlen(keybuf)+1,
3212                           (void**)&value, &valuelen);
3213            TEST_CHK(s == FDB_RESULT_SUCCESS);
3214            TEST_CMP(value, bodybuf, valuelen);
3215            s = fdb_free_block(value);
3216        }
3217
3218        s = fdb_kvs_close(snap);
3219        TEST_CHK(s == FDB_RESULT_SUCCESS);
3220    }
3221}
3222
3223static void _snapshot_update_docs(fdb_file_handle *fhandle, struct cb_snapshot_args *args)
3224{
3225    int i;
3226    char keybuf[256], bodybuf[256];
3227    fdb_status s;
3228
3229    // update (half) docs
3230    if (args->nupdates % 2 == 0) {
3231        // former half
3232        for (i=0; i<args->ndocs/2; ++i) {
3233            sprintf(keybuf, "key%04d", i);
3234            sprintf(bodybuf, "body%04d_update%d", i, args->nupdates);
3235            s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
3236                           bodybuf, strlen(bodybuf)+1);
3237        }
3238    } else {
3239        // latter half
3240        for (i=args->ndocs/2 ; i<args->ndocs; ++i) {
3241            sprintf(keybuf, "key%04d", i);
3242            sprintf(bodybuf, "body%04d_update%d", i, args->nupdates);
3243            s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
3244                           bodybuf, strlen(bodybuf)+1);
3245        }
3246    }
3247    s = fdb_commit(fhandle, FDB_COMMIT_NORMAL);
3248    (void) s;
3249    args->nupdates++;
3250}
3251
3252static int cb_snapshot(fdb_file_handle *fhandle,
3253                       fdb_compaction_status status,
3254                       fdb_doc *doc, uint64_t old_offset, uint64_t new_offset,
3255                       void *ctx)
3256{
3257    struct cb_snapshot_args *args = (struct cb_snapshot_args *)ctx;
3258
3259    if (status == FDB_CS_BEGIN) {
3260        // first verification
3261        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3262        // update half docs
3263        _snapshot_update_docs(fhandle, args);
3264        // second verification
3265        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3266    } else { // if (status == FDB_CS_END)
3267        // first verification
3268        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3269        // update half docs
3270        _snapshot_update_docs(fhandle, args);
3271        // second verification
3272        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3273    }
3274    return 0;
3275}
3276
3277void snapshot_concurrent_compaction_test()
3278{
3279    TEST_INIT();
3280    memleak_start();
3281
3282    int i, j, idx, r;
3283    int n = 100;
3284    int commit_term = n/2;
3285    char keybuf[256], bodybuf[256];
3286    fdb_file_handle *dbfile;
3287    fdb_kvs_handle *db;
3288    fdb_status s;
3289    fdb_config fconfig = fdb_get_default_config();
3290    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3291    struct cb_snapshot_args cb_args;
3292
3293    memset(&cb_args, 0x0, sizeof(struct cb_snapshot_args));
3294    fconfig.wal_threshold = 128;
3295    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3296    fconfig.compaction_cb = cb_snapshot;
3297    fconfig.compaction_cb_ctx = &cb_args;
3298    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3299                                 FDB_CS_END;
3300
3301    // remove previous mvcc_test files
3302    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3303    (void)r;
3304
3305    // open db
3306    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3307    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
3308    cb_args.handle = db;
3309
3310    // write docs & commit for each n/2 doc updates
3311    for (i=0;i<n;++i){
3312        idx = i;
3313        j = i/commit_term;
3314        sprintf(keybuf, "key%04d", idx);
3315        sprintf(bodybuf, "body%04d_update%d", idx, j);
3316        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
3317                           bodybuf, strlen(bodybuf)+1);
3318        TEST_CHK(s == FDB_RESULT_SUCCESS);
3319        if ((i+1)%commit_term == 0) {
3320            s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3321	    TEST_CHK(s == FDB_RESULT_SUCCESS);
3322        }
3323    }
3324    cb_args.ndocs = n;
3325    cb_args.nupdates = 2;
3326
3327    _snapshot_check(db, cb_args.ndocs, cb_args.nupdates);
3328
3329    s = fdb_compact(dbfile, "./mvcc_test2");
3330    TEST_CHK(s == FDB_RESULT_SUCCESS);
3331
3332    fdb_close(dbfile);
3333    fdb_shutdown();
3334    memleak_end();
3335    TEST_RESULT("snapshot with concurrent compaction test");
3336}
3337
3338void rollback_to_zero_test(bool multi_kv)
3339{
3340    TEST_INIT();
3341
3342    memleak_start();
3343
3344    int i, r;
3345    int n = 20;
3346    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
3347    fdb_file_handle *dbfile;
3348    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
3349    fdb_doc **doc = alca(fdb_doc*, n);
3350    fdb_kvs_info info;
3351    fdb_status status;
3352
3353    char keybuf[256], metabuf[256], bodybuf[256];
3354    char kv_name[8];
3355
3356    // remove previous mvcc_test files
3357    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3358    (void)r;
3359
3360    fdb_config fconfig = fdb_get_default_config();
3361    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3362    fconfig.buffercache_size = 0;
3363    fconfig.wal_threshold = 1024;
3364    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3365    fconfig.compaction_threshold = 0;
3366    fconfig.multi_kv_instances = multi_kv;
3367
3368    // open db
3369    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3370
3371    if (multi_kv) {
3372        for (r = 0; r < num_kvs; ++r) {
3373            sprintf(kv_name, "kv%d", r);
3374            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
3375        }
3376    } else {
3377        num_kvs = 1;
3378        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
3379    }
3380
3381    for (r = 0; r < num_kvs; ++r) {
3382        status = fdb_set_log_callback(db[r], logCallbackFunc,
3383                                      (void *) "rollback_to_zero_test");
3384        TEST_CHK(status == FDB_RESULT_SUCCESS);
3385    }
3386
3387   // ------- Setup test ----------------------------------
3388   // insert documents of 0-4
3389    for (i=0; i<n/4; i++){
3390        sprintf(keybuf, "key%d", i);
3391        sprintf(metabuf, "meta%d", i);
3392        sprintf(bodybuf, "body%d", i);
3393        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3394            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3395        for (r = 0; r < num_kvs; ++r) {
3396            fdb_set(db[r], doc[i]);
3397        }
3398    }
3399
3400    // commit with a manual WAL flush (these docs go into HB-trie)
3401    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3402
3403    // insert documents from 5 - 9
3404    for (; i < n/2; i++){
3405        sprintf(keybuf, "key%d", i);
3406        sprintf(metabuf, "meta%d", i);
3407        sprintf(bodybuf, "body%d", i);
3408        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3409            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3410        for (r = 0; r < num_kvs; ++r) {
3411            fdb_set(db[r], doc[i]);
3412        }
3413    }
3414
3415    // commit again without a WAL flush
3416    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3417
3418    // insert documents from 10-14 into HB-trie
3419    for (; i < (n/2 + n/4); i++){
3420        sprintf(keybuf, "key%d", i);
3421        sprintf(metabuf, "meta%d", i);
3422        sprintf(bodybuf, "body%d", i);
3423        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3424            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3425        for (r = 0; r < num_kvs; ++r) {
3426            fdb_set(db[r], doc[i]);
3427        }
3428    }
3429    // manually flush WAL & commit
3430    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3431
3432    // insert documents from 15 - 19 on file into the WAL
3433    for (; i < n; i++){
3434        sprintf(keybuf, "key%d", i);
3435        sprintf(metabuf, "meta%d", i);
3436        sprintf(bodybuf, "body%d", i);
3437        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3438            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3439        for (r = 0; r < num_kvs; ++r) {
3440            fdb_set(db[r], doc[i]);
3441        }
3442    }
3443    // commit without a WAL flush
3444    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3445
3446    if (!multi_kv) {
3447        status = fdb_rollback(&db[0], 0);
3448        TEST_CHK(status == FDB_RESULT_SUCCESS);
3449        status = fdb_get_kvs_info(db[0], &info);
3450        TEST_CHK(status == FDB_RESULT_SUCCESS);
3451        TEST_CHK(info.last_seqnum == 0);
3452        TEST_CHK(info.doc_count == 0);
3453        TEST_CHK(info.space_used == 0);
3454        status = fdb_get(db[0], doc[0]);
3455        TEST_CHK(