1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#endif
26
27#include "libforestdb/forestdb.h"
28#include "test.h"
29
30#include "internal_types.h"
31#include "functional_util.h"
32
33void rollback_secondary_kvs()
34{
35    TEST_INIT();
36    memleak_start();
37
38    int r;
39    void *value_out;
40    size_t valuelen_out;
41
42    fdb_status status;
43    fdb_config fconfig = fdb_get_default_config();
44    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
45    fdb_file_handle *dbfile;
46    fdb_kvs_handle *kv1, *kv2;
47
48    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
49    (void)r;
50
51    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
52    fdb_kvs_open_default(dbfile, &kv1, &kvs_config);
53    fdb_kvs_open_default(dbfile, &kv2, &kvs_config);
54
55    // seq:2
56    status = fdb_set_kv(kv1, (void *) "a", 1, (void *)"val-a", 5);
57    TEST_CHK(status == FDB_RESULT_SUCCESS);
58    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-b", 5);
59    TEST_CHK(status == FDB_RESULT_SUCCESS);
60    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
61
62    // seq:3
63    status = fdb_set_kv(kv1, (void *) "b", 1, (void *)"val-v", 5);
64    TEST_CHK(status == FDB_RESULT_SUCCESS);
65    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
66
67    // seq:4
68    status = fdb_del_kv(kv1, (void *)"a", 1);
69    TEST_CHK(status == FDB_RESULT_SUCCESS);
70    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
71
72    // get 'a' via kv2
73    status = fdb_get_kv(kv2, (void *)"b", 1, &value_out, &valuelen_out);
74    free(value_out);
75    TEST_CHK(status == FDB_RESULT_SUCCESS);
76
77    // rollback seq:3 via kv2
78    status = fdb_rollback(&kv2, 3);
79    TEST_CHK(status == FDB_RESULT_SUCCESS);
80
81    fdb_close(dbfile);
82    fdb_shutdown();
83
84    memleak_end();
85    TEST_RESULT("rollback secondary kv");
86}
87
88
89void multi_version_test()
90{
91    TEST_INIT();
92
93    memleak_start();
94
95    int i, r;
96    int n = 2;
97    fdb_file_handle *dbfile, *dbfile_new;
98    fdb_kvs_handle *db;
99    fdb_kvs_handle *db_new;
100    fdb_doc **doc = alca(fdb_doc*, n);
101    fdb_doc *rdoc;
102    fdb_status status;
103
104    char keybuf[256], metabuf[256], bodybuf[256];
105
106    // remove all previous mvcc_test files
107    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
108    (void)r;
109
110    fdb_config fconfig = fdb_get_default_config();
111    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
112    fconfig.buffercache_size = 1048576;
113    fconfig.wal_threshold = 1024;
114    fconfig.flags = FDB_OPEN_FLAG_CREATE;
115    fconfig.compaction_threshold = 0;
116
117    // open db
118    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
119    fdb_kvs_open_default(dbfile, &db, &kvs_config);
120    status = fdb_set_log_callback(db, logCallbackFunc, (void *) "multi_version_test");
121    TEST_CHK(status == FDB_RESULT_SUCCESS);
122
123    // insert documents
124    for (i=0;i<n;++i){
125        sprintf(keybuf, "key%d", i);
126        sprintf(metabuf, "meta%d", i);
127        sprintf(bodybuf, "body%d", i);
128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
130        fdb_set(db, doc[i]);
131    }
132
133    // commit
134    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
135
136    // open same db file using a new handle
137    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
138    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
139    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
140    TEST_CHK(status == FDB_RESULT_SUCCESS);
141
142    // update documents using the old handle
143    for (i=0;i<n;++i){
144        sprintf(metabuf, "meta2%d", i);
145        sprintf(bodybuf, "body2%d", i);
146        fdb_doc_update(&doc[i], (void*)metabuf, strlen(metabuf),
147            (void*)bodybuf, strlen(bodybuf));
148        fdb_set(db, doc[i]);
149    }
150
151    // manually flush WAL and commit using the old handle
152    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
153
154    // retrieve documents using the old handle
155    for (i=0;i<n;++i){
156        // search by key
157        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
158        status = fdb_get(db, rdoc);
159
160        TEST_CHK(status == FDB_RESULT_SUCCESS);
161        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
162        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
163
164        // free result document
165        fdb_doc_free(rdoc);
166    }
167
168    // retrieve documents using the new handle
169    for (i=0;i<n;++i){
170        // search by key
171        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
172        status = fdb_get(db_new, rdoc);
173
174        TEST_CHK(status == FDB_RESULT_SUCCESS);
175        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
176        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
177
178        // free result document
179        fdb_doc_free(rdoc);
180    }
181
182    // close and re-open the new handle
183    fdb_kvs_close(db_new);
184    fdb_close(dbfile_new);
185    fdb_open(&dbfile_new, "./mvcc_test1", &fconfig);
186    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
187    status = fdb_set_log_callback(db_new, logCallbackFunc, (void *) "multi_version_test");
188    TEST_CHK(status == FDB_RESULT_SUCCESS);
189
190    // retrieve documents using the new handle
191    for (i=0;i<n;++i){
192        // search by key
193        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
194        status = fdb_get(db_new, rdoc);
195
196        TEST_CHK(status == FDB_RESULT_SUCCESS);
197        // the new version of data should be read
198        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
199        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
200
201        // free result document
202        fdb_doc_free(rdoc);
203    }
204
205
206    // free all documents
207    for (i=0;i<n;++i){
208        fdb_doc_free(doc[i]);
209    }
210
211    // close db file
212    fdb_kvs_close(db);
213    fdb_kvs_close(db_new);
214    fdb_close(dbfile);
215    fdb_close(dbfile_new);
216
217    // free all resources
218    fdb_shutdown();
219
220    memleak_end();
221
222    TEST_RESULT("multi version test");
223}
224
225void crash_recovery_test(bool walflush)
226{
227    TEST_INIT();
228
229    memleak_start();
230
231    int i, r;
232    int n = 10;
233    fdb_file_handle *dbfile;
234    fdb_kvs_handle *db;
235    fdb_doc **doc = alca(fdb_doc*, n);
236    fdb_doc *rdoc;
237    fdb_file_info file_info;
238    uint64_t bid;
239    const char *test_file = "./mvcc_test2";
240    fdb_status status;
241
242    char keybuf[256], metabuf[256], bodybuf[256];
243
244    // remove previous mvcc_test files
245    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
246    (void)r;
247
248    fdb_config fconfig = fdb_get_default_config();
249    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
250    fconfig.buffercache_size = 0;
251    fconfig.wal_threshold = 1024;
252    fconfig.flags = FDB_OPEN_FLAG_CREATE;
253    fconfig.compaction_threshold = 0;
254
255    // reopen db
256    fdb_open(&dbfile, test_file, &fconfig);
257    fdb_kvs_open_default(dbfile, &db, &kvs_config);
258    status = fdb_set_log_callback(db, logCallbackFunc,
259                                  (void *) "crash_recovery_test");
260    TEST_CHK(status == FDB_RESULT_SUCCESS);
261
262    // insert documents
263    for (i=0;i<n;++i){
264        sprintf(keybuf, "key%d", i);
265        sprintf(metabuf, "meta%d", i);
266        sprintf(bodybuf, "body%d", i);
267        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
268            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
269        fdb_set(db, doc[i]);
270    }
271
272    // commit
273    if(walflush){
274        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
275        TEST_CHK(status == FDB_RESULT_SUCCESS);
276    } else {
277        status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
278        TEST_CHK(status == FDB_RESULT_SUCCESS);
279    }
280
281    status = fdb_get_file_info(dbfile, &file_info);
282    TEST_CHK(status == FDB_RESULT_SUCCESS);
283    bid = file_info.file_size / fconfig.blocksize;
284
285    // close the db
286    fdb_kvs_close(db);
287    fdb_close(dbfile);
288
289    // Shutdown forest db in the middle of the test to simulate crash
290    fdb_shutdown();
291
292    sprintf(bodybuf,
293            "dd if=/dev/zero bs=%d of=%s oseek=%d count=2 >> errorlog.txt",
294            (int)fconfig.blocksize, test_file, (int)bid);
295    // Now append garbage at the end of the file for a few blocks
296    r = system(bodybuf);
297    (void)r;
298    // Write 1024 bytes of non-block aligned garbage to end of file
299    sprintf(bodybuf,
300            "dd if=/dev/zero bs=%d of=%s oseek=%d count=1 >> errorlog.txt",
301            (int)fconfig.blocksize/4, test_file, (int)(bid + 2)*4);
302    r = system(bodybuf);
303    (void)r;
304
305
306    // reopen the same file
307    status = fdb_open(&dbfile, test_file, &fconfig);
308    TEST_CHK(status == FDB_RESULT_SUCCESS);
309    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
310    TEST_CHK(status == FDB_RESULT_SUCCESS);
311    status = fdb_set_log_callback(db, logCallbackFunc,
312                                  (void *) "crash_recovery_test");
313    TEST_CHK(status == FDB_RESULT_SUCCESS);
314
315    // retrieve documents
316    for (i=0;i<n;++i){
317        // search by key
318        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
319        status = fdb_get(db, rdoc);
320
321        TEST_CHK(status == FDB_RESULT_SUCCESS);
322        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
323        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
324
325        // free result document
326        fdb_doc_free(rdoc);
327    }
328
329    // retrieve documents by sequence number
330    for (i=0;i<n;++i){
331        // search by seq
332        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
333        rdoc->seqnum = i + 1;
334        status = fdb_get_byseq(db, rdoc);
335
336        TEST_CHK(status == FDB_RESULT_SUCCESS);
337
338        // free result document
339        fdb_doc_free(rdoc);
340    }
341
342    // free all documents
343    for (i=0;i<n;++i){
344        fdb_doc_free(doc[i]);
345    }
346
347    // close db file
348    fdb_kvs_close(db);
349    fdb_close(dbfile);
350
351    // free all resources
352    fdb_shutdown();
353
354    memleak_end();
355
356    if(walflush){
357        TEST_RESULT("crash recovery test - walflush");
358    } else {
359        TEST_RESULT("crash recovery test - normal flush");
360    }
361}
362
363void snapshot_test()
364{
365    TEST_INIT();
366
367    memleak_start();
368
369    int i, r;
370    int n = 20;
371    int count;
372    fdb_file_handle *dbfile;
373    fdb_kvs_handle *db;
374    fdb_kvs_handle *snap_db;
375    fdb_seqnum_t snap_seq;
376    fdb_doc **doc = alca(fdb_doc*, n);
377    fdb_doc *rdoc = NULL;
378    fdb_kvs_info kvs_info;
379    fdb_status status;
380    fdb_iterator *iterator;
381
382    char keybuf[256], metabuf[256], bodybuf[256];
383
384    // remove previous mvcc_test files
385    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
386    (void)r;
387
388    fdb_config fconfig = fdb_get_default_config();
389    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
390    fconfig.buffercache_size = 0;
391    fconfig.wal_threshold = 1024;
392    fconfig.flags = FDB_OPEN_FLAG_CREATE;
393    fconfig.compaction_threshold = 0;
394
395    // remove previous mvcc_test files
396    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
397    (void)r;
398
399    // open db
400    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
401    fdb_kvs_open_default(dbfile, &db, &kvs_config);
402    TEST_CHK(status == FDB_RESULT_SUCCESS);
403
404    // Create a snapshot from an empty database file
405    status = fdb_snapshot_open(db, &snap_db, 0);
406    TEST_CHK(status == FDB_RESULT_SUCCESS);
407    // check if snapshot's sequence number is zero.
408    fdb_get_kvs_info(snap_db, &kvs_info);
409    TEST_CHK(kvs_info.last_seqnum == 0);
410    // create an iterator on the snapshot for full range
411    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
412    // Iterator should not return any items.
413    status = fdb_iterator_get(iterator, &rdoc);
414    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
415    fdb_iterator_close(iterator);
416    fdb_kvs_close(snap_db);
417
418    // ------- Setup test ----------------------------------
419    // insert documents of 0-4
420    for (i=0; i<n/4; i++){
421        sprintf(keybuf, "key%d", i);
422        sprintf(metabuf, "meta%d", i);
423        sprintf(bodybuf, "body%d", i);
424        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
425            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
426        fdb_set(db, doc[i]);
427    }
428
429    // commit with a manual WAL flush (these docs go into HB-trie)
430    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
431
432    // insert documents from 4 - 8
433    for (; i < n/2 - 1; i++){
434        sprintf(keybuf, "key%d", i);
435        sprintf(metabuf, "meta%d", i);
436        sprintf(bodybuf, "body%d", i);
437        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
438            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
439        fdb_set(db, doc[i]);
440    }
441
442    // We want to create:
443    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
444    // Insert doc 9 with a different value to test duplicate elimination..
445    sprintf(keybuf, "key%d", i);
446    sprintf(metabuf, "meta%d", i);
447    sprintf(bodybuf, "Body%d", i);
448    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
449            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
450    fdb_set(db, doc[i]);
451
452    // commit again without a WAL flush (last doc goes into the AVL tree)
453    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
454
455    // Insert doc 9 now again with expected value..
456    *(char *)doc[i]->body = 'b';
457    fdb_set(db, doc[i]);
458    // commit again without a WAL flush (these documents go into the AVL trees)
459    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
460
461    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
462    snap_seq = doc[i]->seqnum;
463
464    // Now re-insert doc 9 as another duplicate (only newer sequence number)
465    fdb_set(db, doc[i]);
466    // commit again without a WAL flush (last doc goes into the AVL tree)
467    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
468
469    // insert documents from 10-14 into HB-trie
470    for (++i; i < (n/2 + n/4); i++){
471        sprintf(keybuf, "key%d", i);
472        sprintf(metabuf, "meta%d", i);
473        sprintf(bodybuf, "body%d", i);
474        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
475            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
476        fdb_set(db, doc[i]);
477    }
478    // manually flush WAL & commit
479    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
480
481    status = fdb_set_log_callback(db, logCallbackFunc,
482                                  (void *) "snapshot_test");
483    TEST_CHK(status == FDB_RESULT_SUCCESS);
484    // ---------- Snapshot tests begin -----------------------
485    // Attempt to take snapshot with out-of-range marker..
486    status = fdb_snapshot_open(db, &snap_db, 999999);
487    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
488
489    // Init Snapshot of open file with saved document seqnum as marker
490    status = fdb_snapshot_open(db, &snap_db, snap_seq);
491    TEST_CHK(status == FDB_RESULT_SUCCESS);
492
493    // check snapshot's sequence number
494    fdb_get_kvs_info(snap_db, &kvs_info);
495    TEST_CHK(kvs_info.last_seqnum == snap_seq);
496
497    // insert documents from 15 - 19 on file into the WAL
498    for (; i < n; i++){
499        sprintf(keybuf, "key%d", i);
500        sprintf(metabuf, "meta%d", i);
501        sprintf(bodybuf, "body%d", i);
502        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
503            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
504        fdb_set(db, doc[i]);
505    }
506    // commit without a WAL flush (This WAL must not affect snapshot)
507    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
508
509    // create an iterator on the snapshot for full range
510    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
511
512    // repeat until fail
513    i=0;
514    count=0;
515    do {
516        status = fdb_iterator_get(iterator, &rdoc);
517        TEST_CHK(status == FDB_RESULT_SUCCESS);
518
519        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
520        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
521        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
522
523        fdb_doc_free(rdoc);
524        rdoc = NULL;
525        i ++;
526        count++;
527    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
528
529    TEST_CHK(count==n/2); // Only unique items from the first half
530
531    fdb_iterator_close(iterator);
532
533    // create a sequence iterator on the snapshot for full range
534    fdb_iterator_sequence_init(snap_db, &iterator, 0, 0, FDB_ITR_NONE);
535
536    // repeat until fail
537    i=0;
538    count=0;
539    do {
540        status = fdb_iterator_get(iterator, &rdoc);
541        TEST_CHK(status == FDB_RESULT_SUCCESS);
542
543        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
544        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
545        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
546
547        fdb_doc_free(rdoc);
548        rdoc = NULL;
549        i ++;
550        count++;
551    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
552
553    TEST_CHK(count==n/2); // Only unique items from the first half
554
555    fdb_iterator_close(iterator);
556
557    // close db handle
558    fdb_kvs_close(db);
559    // close snapshot handle
560    fdb_kvs_close(snap_db);
561    // close db file
562    fdb_close(dbfile);
563
564    // free all documents
565    for (i=0;i<n;++i){
566        fdb_doc_free(doc[i]);
567    }
568
569    // 1. Open Database File
570    status = fdb_open(&dbfile, "./mvcc_test2", &fconfig);
571    TEST_CHK(status == FDB_RESULT_SUCCESS);
572
573    // 2. Open KV Store
574    status = fdb_kvs_open(dbfile, &db, "kv2", &kvs_config);
575    TEST_CHK(status == FDB_RESULT_SUCCESS);
576
577    // 2. Create Key 'a' and Commit
578    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
579    TEST_CHK(status == FDB_RESULT_SUCCESS);
580    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
581    TEST_CHK(status == FDB_RESULT_SUCCESS);
582
583    // 3. Create Key 'b' and Commit
584    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
585    TEST_CHK(status == FDB_RESULT_SUCCESS);
586    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
587    TEST_CHK(status == FDB_RESULT_SUCCESS);
588
589    // 4. Create Key 'c' and Commit
590    status = fdb_set_kv(db, (void *)"c", 1, (void *)"val-c", 5);
591    TEST_CHK(status == FDB_RESULT_SUCCESS);
592    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
593    TEST_CHK(status == FDB_RESULT_SUCCESS);
594
595    // 5. Remember seqnum for opening snapshot
596    status = fdb_get_kvs_info(db, &kvs_info);
597    TEST_CHK(status == FDB_RESULT_SUCCESS);
598    snap_seq = kvs_info.last_seqnum;
599
600    // 6.  Create an iterator
601    status = fdb_iterator_init(db, &iterator, (void *)"a", 1,
602                               (void *)"c", 1, FDB_ITR_NONE);
603    TEST_CHK(status == FDB_RESULT_SUCCESS);
604
605    // 7. Do a seek
606    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
607    TEST_CHK(status == FDB_RESULT_SUCCESS);
608
609    // 8. Close the iterator
610    status = fdb_iterator_close(iterator);
611    TEST_CHK(status == FDB_RESULT_SUCCESS);
612
613    // 9. Open a snapshot at the same point
614    status = fdb_snapshot_open(db, &snap_db, snap_seq);
615    TEST_CHK(status == FDB_RESULT_SUCCESS);
616
617    // That works, now attempt to do exact same sequence on a snapshot
618    // The snapshot is opened at the exact same place that the original
619    // database handle should be at (last seq 3)
620
621    // 10.  Create an iterator on snapshot
622    status = fdb_iterator_init(snap_db, &iterator, (void *)"a", 1,
623                               (void *)"c", 1, FDB_ITR_NONE);
624    TEST_CHK(status == FDB_RESULT_SUCCESS);
625
626    // 11. Do a seek
627    status = fdb_iterator_seek(iterator, (void *)"b", 1, FDB_ITR_SEEK_HIGHER);
628    TEST_CHK(status == FDB_RESULT_SUCCESS);
629
630    // 12. Close the iterator
631    status = fdb_iterator_close(iterator);
632    TEST_CHK(status == FDB_RESULT_SUCCESS);
633
634    // close db handle
635    fdb_kvs_close(db);
636    // close snapshot handle
637    fdb_kvs_close(snap_db);
638    // close db file
639    fdb_close(dbfile);
640
641    // free all resources
642    fdb_shutdown();
643
644    memleak_end();
645
646    TEST_RESULT("snapshot test");
647}
648
649void snapshot_stats_test()
650{
651    TEST_INIT();
652
653    memleak_start();
654
655    int i, r;
656    int n = 20;
657    fdb_file_handle *dbfile;
658    fdb_kvs_handle *db;
659    fdb_kvs_handle *snap_db;
660    fdb_seqnum_t snap_seq;
661    fdb_doc **doc = alca(fdb_doc*, n);
662    fdb_kvs_info kvs_info;
663    fdb_status status;
664
665    char keybuf[256], metabuf[256], bodybuf[256];
666
667    // remove previous mvcc_test files
668    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
669    (void)r;
670
671    fdb_config fconfig = fdb_get_default_config();
672    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
673    fconfig.buffercache_size = 0;
674    fconfig.wal_threshold = 1024;
675    fconfig.flags = FDB_OPEN_FLAG_CREATE;
676    fconfig.compaction_threshold = 0;
677
678    // remove previous mvcc_test files
679    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
680    (void)r;
681
682    // open db
683    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
684    fdb_kvs_open_default(dbfile, &db, &kvs_config);
685    TEST_CHK(status == FDB_RESULT_SUCCESS);
686
687    // ------- Setup test ----------------------------------
688    // insert documents
689    for (i=0; i<n; i++){
690        sprintf(keybuf, "key%d", i);
691        sprintf(metabuf, "meta%d", i);
692        sprintf(bodybuf, "body%d", i);
693        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
694            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
695        fdb_set(db, doc[i]);
696        fdb_doc_free(doc[i]);
697    }
698
699    // commit with a manual WAL flush (these docs go into HB-trie)
700    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
701
702    // retrieve the sequence number for a snapshot open
703    status = fdb_get_kvs_info(db, &kvs_info);
704    TEST_CHK(status == FDB_RESULT_SUCCESS);
705    snap_seq = kvs_info.last_seqnum;
706
707    status = fdb_set_log_callback(db, logCallbackFunc,
708                                  (void *) "snapshot_stats_test");
709    TEST_CHK(status == FDB_RESULT_SUCCESS);
710
711    // Init Snapshot of open file with saved document seqnum as marker
712    status = fdb_snapshot_open(db, &snap_db, snap_seq);
713    TEST_CHK(status == FDB_RESULT_SUCCESS);
714
715    // check snapshot's sequence number
716    fdb_get_kvs_info(snap_db, &kvs_info);
717    TEST_CHK(kvs_info.last_seqnum == snap_seq);
718
719    TEST_CHK(kvs_info.doc_count == (size_t)n);
720    TEST_CHK(kvs_info.file == dbfile);
721
722    // close snapshot handle
723    fdb_kvs_close(snap_db);
724
725    // Test stats by creating an in-memory snapshot
726    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
727    TEST_CHK(status == FDB_RESULT_SUCCESS);
728
729    // check snapshot's sequence number
730    fdb_get_kvs_info(snap_db, &kvs_info);
731    TEST_CHK(kvs_info.last_seqnum == snap_seq);
732
733    TEST_CHK(kvs_info.doc_count == (size_t)n);
734    TEST_CHK(kvs_info.file == dbfile);
735
736    // close snapshot handle
737    fdb_kvs_close(snap_db);
738
739    // close db handle
740    fdb_kvs_close(db);
741    // close db file
742    fdb_close(dbfile);
743
744    // free all resources
745    fdb_shutdown();
746
747    memleak_end();
748
749    TEST_RESULT("snapshot stats test");
750}
751
752void snapshot_with_uncomitted_data_test()
753{
754    TEST_INIT();
755
756    int n = 10, value_len=32;
757    int i, r, idx;
758    char cmd[256];
759    char key[256], *value;
760    char keystr[] = "key%06d";
761    char valuestr[] = "value%d";
762    fdb_file_handle *db_file;
763    fdb_kvs_handle *db0, *db1, *db2, *snap;
764    fdb_config config;
765    fdb_kvs_config kvs_config;
766    fdb_kvs_info info;
767    fdb_seqnum_t seqnum;
768    fdb_status s; (void)s;
769
770    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
771    r = system(cmd);
772    (void)r;
773
774    memleak_start();
775
776    value = (char*)malloc(value_len);
777
778    srand(1234);
779    config = fdb_get_default_config();
780    config.seqtree_opt = FDB_SEQTREE_USE;
781    config.wal_flush_before_commit = true;
782    config.multi_kv_instances = true;
783    config.buffercache_size = 0*1024*1024;
784
785    kvs_config = fdb_get_default_kvs_config();
786
787    s = fdb_open(&db_file, "./mvcc_test9", &config);
788    TEST_CHK(s == FDB_RESULT_SUCCESS);
789    s = fdb_kvs_open(db_file, &db0, NULL, &kvs_config);
790    TEST_CHK(s == FDB_RESULT_SUCCESS);
791    s = fdb_kvs_open(db_file, &db1, "db1", &kvs_config);
792    TEST_CHK(s == FDB_RESULT_SUCCESS);
793    s = fdb_kvs_open(db_file, &db2, "db2", &kvs_config);
794    TEST_CHK(s == FDB_RESULT_SUCCESS);
795
796    // insert docs in all KV stores
797    for (i=0;i<n;++i){
798        idx = i;
799        sprintf(key, keystr, idx);
800        memset(value, 'x', value_len);
801        memcpy(value + value_len - 6, "<end>", 6);
802        sprintf(value, valuestr, idx);
803        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
804        TEST_CHK(s == FDB_RESULT_SUCCESS);
805        s = fdb_set_kv(db1, key, strlen(key)+1, value, value_len);
806        TEST_CHK(s == FDB_RESULT_SUCCESS);
807        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
808        TEST_CHK(s == FDB_RESULT_SUCCESS);
809    }
810    // try to open snapshot before commit
811    s = fdb_get_kvs_info(db0, &info);
812    TEST_CHK(s == FDB_RESULT_SUCCESS);
813    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
814    TEST_CHK(s != FDB_RESULT_SUCCESS);
815
816    s = fdb_get_kvs_info(db1, &info);
817    TEST_CHK(s == FDB_RESULT_SUCCESS);
818    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
819    TEST_CHK(s != FDB_RESULT_SUCCESS);
820
821    s = fdb_get_kvs_info(db2, &info);
822    TEST_CHK(s == FDB_RESULT_SUCCESS);
823    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
824    TEST_CHK(s != FDB_RESULT_SUCCESS);
825
826    s = fdb_commit(db_file, FDB_COMMIT_NORMAL);
827    TEST_CHK(s == FDB_RESULT_SUCCESS);
828
829    s = fdb_get_kvs_info(db1, &info);
830    TEST_CHK(s == FDB_RESULT_SUCCESS);
831    seqnum = info.last_seqnum;
832
833    s = fdb_get_kvs_info(db2, &info);
834    TEST_CHK(s == FDB_RESULT_SUCCESS);
835    TEST_CHK(seqnum == info.last_seqnum);
836
837    s = fdb_get_kvs_info(db0, &info);
838    TEST_CHK(s == FDB_RESULT_SUCCESS);
839    TEST_CHK(seqnum == info.last_seqnum);
840
841    // now insert docs into default and db2 only, without commit
842    for (i=0;i<n;++i){
843        idx = i;
844        sprintf(key, keystr, idx);
845        memset(value, 'x', value_len);
846        memcpy(value + value_len - 6, "<end>", 6);
847        sprintf(value, valuestr, idx);
848        s = fdb_set_kv(db0, key, strlen(key)+1, value, value_len);
849        TEST_CHK(s == FDB_RESULT_SUCCESS);
850        s = fdb_set_kv(db2, key, strlen(key)+1, value, value_len);
851        TEST_CHK(s == FDB_RESULT_SUCCESS);
852    }
853
854    // open snapshot on db1
855    s = fdb_get_kvs_info(db1, &info);
856    TEST_CHK(s == FDB_RESULT_SUCCESS);
857    // latest (comitted) seqnum
858    s = fdb_snapshot_open(db1, &snap, info.last_seqnum);
859    TEST_CHK(s == FDB_RESULT_SUCCESS);
860    s = fdb_kvs_close(snap);
861    TEST_CHK(s == FDB_RESULT_SUCCESS);
862
863    // open snapshot on db2
864    s = fdb_get_kvs_info(db2, &info);
865    TEST_CHK(s == FDB_RESULT_SUCCESS);
866
867    // latest (uncomitted) seqnum
868    s = fdb_snapshot_open(db2, &snap, info.last_seqnum);
869    TEST_CHK(s != FDB_RESULT_SUCCESS);
870    // committed seqnum
871    s = fdb_snapshot_open(db2, &snap, seqnum);
872    TEST_CHK(s == FDB_RESULT_SUCCESS);
873    s = fdb_kvs_close(snap);
874    TEST_CHK(s == FDB_RESULT_SUCCESS);
875
876    // open snapshot on default KVS
877    s = fdb_get_kvs_info(db0, &info);
878    TEST_CHK(s == FDB_RESULT_SUCCESS);
879
880    // latest (uncomitted) seqnum
881    s = fdb_snapshot_open(db0, &snap, info.last_seqnum);
882    TEST_CHK(s != FDB_RESULT_SUCCESS);
883    // committed seqnum
884    s = fdb_snapshot_open(db0, &snap, seqnum);
885    TEST_CHK(s == FDB_RESULT_SUCCESS);
886    s = fdb_kvs_close(snap);
887    TEST_CHK(s == FDB_RESULT_SUCCESS);
888
889    s = fdb_close(db_file);
890    TEST_CHK(s == FDB_RESULT_SUCCESS);
891    s = fdb_shutdown();
892    TEST_CHK(s == FDB_RESULT_SUCCESS);
893    free(value);
894    memleak_end();
895
896    TEST_RESULT("snapshot with uncomitted data in other KVS test");
897}
898
899void in_memory_snapshot_test()
900{
901    TEST_INIT();
902
903    memleak_start();
904
905    int i, r;
906    int n = 20;
907    int count;
908    fdb_file_handle *dbfile;
909    fdb_kvs_handle *db;
910    fdb_kvs_handle *snap_db;
911    fdb_seqnum_t snap_seq;
912    fdb_doc **doc = alca(fdb_doc*, n);
913    fdb_doc *rdoc;
914    fdb_kvs_info kvs_info;
915    fdb_status status;
916    fdb_iterator *iterator;
917
918    char keybuf[256], metabuf[256], bodybuf[256];
919
920    // remove previous mvcc_test files
921    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
922    (void)r;
923
924    fdb_config fconfig = fdb_get_default_config();
925    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
926    fconfig.buffercache_size = 0;
927    fconfig.wal_threshold = 1024;
928    fconfig.flags = FDB_OPEN_FLAG_CREATE;
929    fconfig.compaction_threshold = 0;
930
931    // remove previous mvcc_test files
932    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
933    (void)r;
934
935    // open db
936    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
937    TEST_CHK(status == FDB_RESULT_SUCCESS);
938    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
939    TEST_CHK(status == FDB_RESULT_SUCCESS);
940
941    // Create a snapshot from an empty database file
942    status = fdb_snapshot_open(db, &snap_db, 0);
943    TEST_CHK(status == FDB_RESULT_SUCCESS);
944    // check if snapshot's sequence number is zero.
945    fdb_get_kvs_info(snap_db, &kvs_info);
946    TEST_CHK(kvs_info.last_seqnum == 0);
947    // create an iterator on the snapshot for full range
948    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
949    // Iterator should not return any items.
950    status = fdb_iterator_get(iterator, &rdoc);
951    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
952    fdb_iterator_close(iterator);
953    fdb_kvs_close(snap_db);
954
955    // ------- Setup test ----------------------------------
956    // insert documents of 0-4
957    for (i=0; i<n/4; i++){
958        sprintf(keybuf, "key%d", i);
959        sprintf(metabuf, "meta%d", i);
960        sprintf(bodybuf, "body%d", i);
961        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
962            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
963        fdb_set(db, doc[i]);
964    }
965
966    // commit with a manual WAL flush (these docs go into HB-trie)
967    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
968
969    // insert documents from 5 - 8
970    for (; i < n/2 - 1; i++){
971        sprintf(keybuf, "key%d", i);
972        sprintf(metabuf, "meta%d", i);
973        sprintf(bodybuf, "body%d", i);
974        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
975            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
976        fdb_set(db, doc[i]);
977    }
978
979    // We want to create:
980    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
981    // Insert doc 9 with a different value to test duplicate elimination..
982    sprintf(keybuf, "key%d", i);
983    sprintf(metabuf, "meta%d", i);
984    sprintf(bodybuf, "Body%d", i);
985    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
986            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
987    fdb_set(db, doc[i]);
988
989    // commit again without a WAL flush (last doc goes into the AVL tree)
990    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
991
992    // Insert doc 9 now again with expected value..
993    *(char *)doc[i]->body = 'b';
994    fdb_set(db, doc[i]);
995
996    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
997    snap_seq = doc[i]->seqnum;
998
999    // Creation of a snapshot with a sequence number that was taken WITHOUT a
1000    // commit must fail even if it from the latest document that was set...
1001    fdb_get_kvs_info(db, &kvs_info);
1002    status = fdb_snapshot_open(db, &snap_db, kvs_info.last_seqnum);
1003    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
1004
1005    // ---------- Snapshot tests begin -----------------------
1006    // Initialize an in-memory snapshot Without a Commit...
1007    // WAL items are not flushed...
1008    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
1009    TEST_CHK(status == FDB_RESULT_SUCCESS);
1010
1011    // commit again without a WAL flush (these documents go into the AVL trees)
1012    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1013
1014    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1015    fdb_set(db, doc[i]);
1016
1017    // commit again without a WAL flush (last doc goes into the AVL tree)
1018    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1019
1020    // insert documents from 10-14 into HB-trie
1021    for (++i; i < (n/2 + n/4); i++){
1022        sprintf(keybuf, "key%d", i);
1023        sprintf(metabuf, "meta%d", i);
1024        sprintf(bodybuf, "body%d", i);
1025        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1026                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1027        fdb_set(db, doc[i]);
1028    }
1029
1030    status = fdb_set_log_callback(db, logCallbackFunc,
1031                                  (void *) "in_memory_snapshot_test");
1032    TEST_CHK(status == FDB_RESULT_SUCCESS);
1033
1034    // check snapshot's sequence number
1035    fdb_get_kvs_info(snap_db, &kvs_info);
1036    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1037
1038    // insert documents from 15 - 19 on file into the WAL
1039    for (; i < n; i++){
1040        sprintf(keybuf, "key%d", i);
1041        sprintf(metabuf, "meta%d", i);
1042        sprintf(bodybuf, "body%d", i);
1043        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1044            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1045        fdb_set(db, doc[i]);
1046    }
1047    // commit without a WAL flush (This WAL must not affect snapshot)
1048    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1049
1050    // create an iterator on the snapshot for full range
1051    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1052
1053    // repeat until fail
1054    i=0;
1055    count=0;
1056    rdoc = NULL;
1057    do {
1058        status = fdb_iterator_get(iterator, &rdoc);
1059        TEST_CHK(status == FDB_RESULT_SUCCESS);
1060
1061        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1062        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1063        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1064
1065        fdb_doc_free(rdoc);
1066        rdoc = NULL;
1067        i++;
1068        count++;
1069    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1070
1071    TEST_CHK(count==n/2); // Only unique items from the first half
1072
1073    fdb_iterator_close(iterator);
1074
1075    // close db handle
1076    fdb_kvs_close(db);
1077    // close snapshot handle
1078    fdb_kvs_close(snap_db);
1079
1080    // close db file
1081    fdb_close(dbfile);
1082
1083    // free all documents
1084    for (i=0;i<n;++i){
1085        fdb_doc_free(doc[i]);
1086    }
1087
1088    // free all resources
1089    fdb_shutdown();
1090
1091    memleak_end();
1092
1093    TEST_RESULT("in-memory snapshot test");
1094}
1095
1096
1097void in_memory_snapshot_on_dirty_hbtrie_test()
1098{
1099    TEST_INIT();
1100
1101    int n = 300, value_len=32;
1102    int i, r, idx, c;
1103    char cmd[256];
1104    char key[256], *value;
1105    char keystr[] = "k%05d";
1106    char keystr2[] = "k%06d";
1107    char valuestr[] = "value%08d";
1108    fdb_file_handle *db_file;
1109    fdb_kvs_handle *db, *snap, *snap_clone;
1110    fdb_config config;
1111    fdb_kvs_config kvs_config;
1112    fdb_status s;
1113    fdb_iterator *fit, *fit_normal, *fit_clone;
1114    fdb_doc *doc;
1115
1116    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1117    r = system(cmd);
1118    (void)r;
1119
1120    memleak_start();
1121
1122    value = (char*)malloc(value_len);
1123
1124    config = fdb_get_default_config();
1125    config.durability_opt = FDB_DRB_ASYNC;
1126    config.seqtree_opt = FDB_SEQTREE_USE;
1127    config.wal_flush_before_commit = true;
1128    config.wal_threshold = n/5;
1129    config.multi_kv_instances = true;
1130    config.buffercache_size = 0;
1131
1132    kvs_config = fdb_get_default_kvs_config();
1133
1134    s = fdb_open(&db_file, "./mvcc_test", &config);
1135    TEST_CHK(s == FDB_RESULT_SUCCESS);
1136    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1137    TEST_CHK(s == FDB_RESULT_SUCCESS);
1138
1139    // write a few documents and commit & wal flush
1140    for (i=0;i<n/10;++i){
1141        idx = i;
1142        sprintf(key, keystr, idx);
1143        memset(value, 'x', value_len);
1144        memcpy(value + value_len - 6, "<end>", 6);
1145        sprintf(value, valuestr, idx);
1146        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1147        TEST_CHK(s == FDB_RESULT_SUCCESS);
1148    }
1149    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1150    TEST_CHK(s == FDB_RESULT_SUCCESS);
1151
1152    // create an in-memory snapshot and its clone
1153    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1154    TEST_CHK(s == FDB_RESULT_SUCCESS);
1155    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1156    TEST_CHK(s == FDB_RESULT_SUCCESS);
1157
1158    // write a number of documents in order to make WAL be flushed before commit
1159    for (i=n/10;i<n;++i){
1160        idx = i;
1161        sprintf(key, keystr, idx);
1162        memset(value, 'x', value_len);
1163        memcpy(value + value_len - 6, "<end>", 6);
1164        sprintf(value, valuestr, idx);
1165        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1166        TEST_CHK(s == FDB_RESULT_SUCCESS);
1167    }
1168
1169    void *v_out;
1170    size_t vlen_out;
1171    idx = n/10;
1172    sprintf(key, keystr, idx);
1173    s = fdb_get_kv(snap, key, strlen(key)+1, &v_out, &vlen_out);
1174    // should not be able to retrieve
1175    TEST_CHK(s != FDB_RESULT_SUCCESS);
1176
1177    s = fdb_get_kv(snap_clone, key, strlen(key)+1, &v_out, &vlen_out);
1178    // should not be able to retrieve in also clone
1179    TEST_CHK(s != FDB_RESULT_SUCCESS);
1180
1181    // close snapshot and its clone
1182    s = fdb_kvs_close(snap);
1183    TEST_CHK(s == FDB_RESULT_SUCCESS);
1184    s = fdb_kvs_close(snap_clone);
1185    TEST_CHK(s == FDB_RESULT_SUCCESS);
1186
1187    // create an in-memory snapshot
1188    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1189    TEST_CHK(s == FDB_RESULT_SUCCESS);
1190
1191    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1192    TEST_CHK(s == FDB_RESULT_SUCCESS);
1193    c = 0;
1194    do {
1195        doc = NULL;
1196        s = fdb_iterator_get(fit, &doc);
1197        if (s != FDB_RESULT_SUCCESS) break;
1198
1199        idx = c;
1200        sprintf(key, keystr, idx);
1201        memset(value, 'x', value_len);
1202        memcpy(value + value_len - 6, "<end>", 6);
1203        sprintf(value, valuestr, idx);
1204        TEST_CMP(doc->key, key, doc->keylen);
1205        TEST_CMP(doc->body, value, doc->bodylen);
1206        c++;
1207        fdb_doc_free(doc);
1208    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1209    TEST_CHK(c == n);
1210
1211    s = fdb_iterator_close(fit);
1212    TEST_CHK(s == FDB_RESULT_SUCCESS);
1213
1214    // create a clone
1215    s = fdb_snapshot_open(snap, &snap_clone, FDB_SNAPSHOT_INMEM);
1216    TEST_CHK(s == FDB_RESULT_SUCCESS);
1217
1218    // create iterators on snapshot and its clone
1219    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1220    TEST_CHK(s == FDB_RESULT_SUCCESS);
1221    s = fdb_iterator_init(snap_clone, &fit_clone, NULL, 0, NULL, 0, 0x0);
1222    TEST_CHK(s == FDB_RESULT_SUCCESS);
1223
1224    // also create an iterator on a normal handle
1225    s = fdb_iterator_init(db, &fit_normal, NULL, 0, NULL, 0, 0x0);
1226    TEST_CHK(s == FDB_RESULT_SUCCESS);
1227
1228    c = 0;
1229    do {
1230        doc = NULL;
1231        s = fdb_iterator_get(fit, &doc);
1232        if (s != FDB_RESULT_SUCCESS) break;
1233
1234        idx = c;
1235        sprintf(key, keystr, idx);
1236        memset(value, 'x', value_len);
1237        memcpy(value + value_len - 6, "<end>", 6);
1238        sprintf(value, valuestr, idx);
1239        TEST_CMP(doc->key, key, doc->keylen);
1240        TEST_CMP(doc->body, value, doc->bodylen);
1241        c++;
1242        fdb_doc_free(doc);
1243
1244        doc = NULL;
1245        s = fdb_iterator_get(fit, &doc);
1246        TEST_CHK(s == FDB_RESULT_SUCCESS);
1247        TEST_CMP(doc->key, key, doc->keylen);
1248        TEST_CMP(doc->body, value, doc->bodylen);
1249        fdb_doc_free(doc);
1250
1251        if (c == n/5) {
1252            // insert new docs in the middle of iteration
1253            for (i=0; i<n*10; ++i){
1254                idx = i;
1255                sprintf(key, keystr2, idx);
1256                memset(value, 'x', value_len);
1257                memcpy(value + value_len - 6, "<end>", 6);
1258                sprintf(value, valuestr, idx);
1259                s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1260                TEST_CHK(s == FDB_RESULT_SUCCESS);
1261            }
1262        }
1263
1264        s = fdb_iterator_next(fit);
1265        // result should be same to clone
1266        if (s != FDB_RESULT_SUCCESS) {
1267            s = fdb_iterator_next(fit_clone);
1268            TEST_CHK(s != FDB_RESULT_SUCCESS);
1269        } else {
1270            s = fdb_iterator_next(fit_clone);
1271            TEST_CHK(s == FDB_RESULT_SUCCESS);
1272        }
1273    } while(s == FDB_RESULT_SUCCESS);
1274    TEST_CHK(c == n);
1275
1276    // close iterators
1277    s = fdb_iterator_close(fit);
1278    TEST_CHK(s == FDB_RESULT_SUCCESS);
1279    s = fdb_iterator_close(fit_clone);
1280    TEST_CHK(s == FDB_RESULT_SUCCESS);
1281
1282    // the results should be same in the normal iterator
1283    c = 0;
1284    do {
1285        doc = NULL;
1286        s = fdb_iterator_get(fit_normal, &doc);
1287        if (s != FDB_RESULT_SUCCESS) break;
1288
1289        idx = c;
1290        sprintf(key, keystr, idx);
1291        memset(value, 'x', value_len);
1292        memcpy(value + value_len - 6, "<end>", 6);
1293        sprintf(value, valuestr, idx);
1294        TEST_CMP(doc->key, key, doc->keylen);
1295        TEST_CMP(doc->body, value, doc->bodylen);
1296        c++;
1297        fdb_doc_free(doc);
1298    } while(fdb_iterator_next(fit_normal) == FDB_RESULT_SUCCESS);
1299    TEST_CHK(c == n);
1300
1301    s = fdb_iterator_close(fit_normal);
1302    TEST_CHK(s == FDB_RESULT_SUCCESS);
1303
1304    s = fdb_kvs_close(snap);
1305    TEST_CHK(s == FDB_RESULT_SUCCESS);
1306
1307    s = fdb_kvs_close(snap_clone);
1308    TEST_CHK(s == FDB_RESULT_SUCCESS);
1309
1310    s = fdb_close(db_file);
1311    TEST_CHK(s == FDB_RESULT_SUCCESS);
1312    s = fdb_shutdown();
1313    TEST_CHK(s == FDB_RESULT_SUCCESS);
1314    free(value);
1315
1316    memleak_end();
1317
1318    TEST_RESULT("in-memory snapshot on dirty HB+trie nodes test");
1319}
1320
1321
1322struct cb_inmem_snap_args {
1323    fdb_kvs_handle *handle;
1324    int n;
1325    int move_count;
1326    int value_len;
1327    char *keystr;
1328    char *valuestr;
1329};
1330
1331static int cb_inmem_snap(fdb_file_handle *fhandle,
1332                       fdb_compaction_status status,
1333                       fdb_doc *doc_in, uint64_t old_offset, uint64_t new_offset,
1334                       void *ctx)
1335{
1336    TEST_INIT();
1337    int c, idx;
1338    char key[256], value[256];
1339    void *value_out;
1340    size_t valuelen;
1341    fdb_kvs_handle *snap;
1342    fdb_kvs_info info;
1343    fdb_iterator *fit;
1344    fdb_doc *doc;
1345    fdb_status s;
1346    struct cb_inmem_snap_args *args = (struct cb_inmem_snap_args *)ctx;
1347    (void)args;
1348
1349    if (status == FDB_CS_MOVE_DOC) {
1350        args->move_count++;
1351        if (args->move_count == 2) {
1352            // open in-memory snapshot
1353            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1354            TEST_CHK(s == FDB_RESULT_SUCCESS);
1355
1356            s = fdb_get_kvs_info(snap, &info);
1357            TEST_CHK(s == FDB_RESULT_SUCCESS);
1358            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n);
1359
1360            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1361            TEST_CHK(s == FDB_RESULT_SUCCESS);
1362
1363            c = 0;
1364            do {
1365                doc = NULL;
1366                s = fdb_iterator_get(fit, &doc);
1367                if (s != FDB_RESULT_SUCCESS) {
1368                    break;
1369                }
1370                idx = c;
1371                sprintf(key, args->keystr, idx);
1372                memset(value, 'x', args->value_len);
1373                memcpy(value + args->value_len - 6, "<end>", 6);
1374                sprintf(value, args->valuestr, idx);
1375                TEST_CMP(doc->key, key, doc->keylen);
1376                TEST_CMP(doc->body, value, doc->bodylen);
1377
1378                c++;
1379                fdb_doc_free(doc);
1380            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1381            TEST_CHK(c == args->n);
1382
1383            s = fdb_iterator_close(fit);
1384            TEST_CHK(s == FDB_RESULT_SUCCESS);
1385
1386            fdb_kvs_close(snap);
1387
1388        } else if (args->move_count == 5) {
1389            // insert new doc
1390            sprintf(key, "new_key");
1391            sprintf(value, "new_value");
1392            s = fdb_set_kv(args->handle, (void*)key, strlen(key)+1, (void*)value, strlen(value)+1);
1393            TEST_CHK(s == FDB_RESULT_SUCCESS);
1394
1395            // open in-memory snapshot
1396            s = fdb_snapshot_open(args->handle, &snap, FDB_SNAPSHOT_INMEM);
1397            TEST_CHK(s == FDB_RESULT_SUCCESS);
1398
1399            s = fdb_get_kvs_info(snap, &info);
1400            TEST_CHK(s == FDB_RESULT_SUCCESS);
1401            TEST_CHK(info.last_seqnum == (fdb_seqnum_t)args->n+1);
1402
1403            s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1404            TEST_CHK(s == FDB_RESULT_SUCCESS);
1405
1406            c = 0;
1407            do {
1408                doc = NULL;
1409                s = fdb_iterator_get(fit, &doc);
1410                if (s != FDB_RESULT_SUCCESS) {
1411                    break;
1412                }
1413                if (c < args->n) {
1414                    idx = c;
1415                    sprintf(key, args->keystr, idx);
1416                    memset(value, 'x', args->value_len);
1417                    memcpy(value + args->value_len - 6, "<end>", 6);
1418                    sprintf(value, args->valuestr, idx);
1419                    TEST_CMP(doc->key, key, doc->keylen);
1420                    TEST_CMP(doc->body, value, doc->bodylen);
1421                } else {
1422                    // new document
1423                    sprintf(key, "new_key");
1424                    sprintf(value, "new_value");
1425                    TEST_CMP(doc->key, key, doc->keylen);
1426                    TEST_CMP(doc->body, value, doc->bodylen);
1427                }
1428
1429                c++;
1430                fdb_doc_free(doc);
1431            } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1432            TEST_CHK(c == args->n + 1);
1433
1434            s = fdb_iterator_close(fit);
1435            TEST_CHK(s == FDB_RESULT_SUCCESS);
1436
1437            s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1438            TEST_CHK(s == FDB_RESULT_SUCCESS);
1439            TEST_CMP(value_out, value, valuelen);
1440            fdb_free_block(value_out);
1441
1442            fdb_kvs_close(snap);
1443        }
1444    }
1445    return 0;
1446}
1447
1448void in_memory_snapshot_compaction_test()
1449{
1450    TEST_INIT();
1451
1452    int n = 10, value_len=32;
1453    int i, r, c, idx;
1454    char cmd[256];
1455    char key[256], *value;
1456    char keystr[] = "k%05d";
1457    char valuestr[] = "value%08d";
1458    void *value_out;
1459    size_t valuelen;
1460    fdb_file_handle *db_file;
1461    fdb_kvs_handle *db, *db2, *snap;
1462    fdb_config config;
1463    fdb_kvs_config kvs_config;
1464    fdb_kvs_info info;
1465    fdb_iterator *fit;
1466    fdb_doc *doc;
1467    fdb_status s;
1468    struct cb_inmem_snap_args cargs;
1469
1470    sprintf(cmd, SHELL_DEL " mvcc_test* > errorlog.txt");
1471    r = system(cmd);
1472    (void)r;
1473
1474    memleak_start();
1475
1476    value = (char*)malloc(value_len);
1477
1478    config = fdb_get_default_config();
1479    config.durability_opt = FDB_DRB_ASYNC;
1480    config.seqtree_opt = FDB_SEQTREE_USE;
1481    config.wal_flush_before_commit = true;
1482    config.wal_threshold = n/5;
1483    config.multi_kv_instances = true;
1484    config.buffercache_size = 0;
1485    config.compaction_cb = cb_inmem_snap;
1486    config.compaction_cb_mask = FDB_CS_MOVE_DOC;
1487    config.compaction_cb_ctx = &cargs;
1488
1489    kvs_config = fdb_get_default_kvs_config();
1490
1491    s = fdb_open(&db_file, "./mvcc_test", &config);
1492    TEST_CHK(s == FDB_RESULT_SUCCESS);
1493    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
1494    TEST_CHK(s == FDB_RESULT_SUCCESS);
1495
1496    s = fdb_kvs_open(db_file, &db2, "db", &kvs_config);
1497    TEST_CHK(s == FDB_RESULT_SUCCESS);
1498
1499    cargs.handle = db2;
1500    cargs.move_count = 0;
1501    cargs.n = n;
1502    cargs.keystr = keystr;
1503    cargs.valuestr = valuestr;
1504    cargs.value_len = value_len;
1505
1506    // write
1507    for (i=0;i<n;++i){
1508        idx = i;
1509        sprintf(key, keystr, idx);
1510        memset(value, 'x', value_len);
1511        memcpy(value + value_len - 6, "<end>", 6);
1512        sprintf(value, valuestr, idx);
1513        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
1514        TEST_CHK(s == FDB_RESULT_SUCCESS);
1515    }
1516    s = fdb_commit(db_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
1517    TEST_CHK(s == FDB_RESULT_SUCCESS);
1518
1519    s = fdb_compact(db_file, "./mvcc_test2");
1520    TEST_CHK(s == FDB_RESULT_SUCCESS);
1521
1522    // open in-memory snapshot
1523    s = fdb_snapshot_open(db, &snap, FDB_SNAPSHOT_INMEM);
1524    TEST_CHK(s == FDB_RESULT_SUCCESS);
1525
1526    s = fdb_get_kvs_info(snap, &info);
1527    TEST_CHK(s == FDB_RESULT_SUCCESS);
1528    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)n+1);
1529
1530    s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0x0);
1531    TEST_CHK(s == FDB_RESULT_SUCCESS);
1532
1533    c = 0;
1534    do {
1535        doc = NULL;
1536        s = fdb_iterator_get(fit, &doc);
1537        if (s != FDB_RESULT_SUCCESS) {
1538            break;
1539        }
1540        if (c < n) {
1541            idx = c;
1542            sprintf(key, keystr, idx);
1543            memset(value, 'x', value_len);
1544            memcpy(value + value_len - 6, "<end>", 6);
1545            sprintf(value, valuestr, idx);
1546            TEST_CMP(doc->key, key, doc->keylen);
1547            TEST_CMP(doc->body, value, doc->bodylen);
1548        } else {
1549            // new document
1550            sprintf(key, "new_key");
1551            sprintf(value, "new_value");
1552            TEST_CMP(doc->key, key, doc->keylen);
1553            TEST_CMP(doc->body, value, doc->bodylen);
1554        }
1555
1556        c++;
1557        fdb_doc_free(doc);
1558    } while (fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
1559    TEST_CHK(c == n + 1);
1560
1561    s = fdb_iterator_close(fit);
1562    TEST_CHK(s == FDB_RESULT_SUCCESS);
1563
1564    s = fdb_get_kv(snap, (void*)key, strlen(key)+1, &value_out, &valuelen);
1565    TEST_CHK(s == FDB_RESULT_SUCCESS);
1566    TEST_CMP(value_out, value, valuelen);
1567    fdb_free_block(value_out);
1568
1569    fdb_kvs_close(snap);
1570
1571    s = fdb_close(db_file);
1572    TEST_CHK(s == FDB_RESULT_SUCCESS);
1573    s = fdb_shutdown();
1574    TEST_CHK(s == FDB_RESULT_SUCCESS);
1575    free(value);
1576
1577    memleak_end();
1578
1579    TEST_RESULT("in-memory snapshot with concurrent compaction test");
1580}
1581
1582void snapshot_clone_test()
1583{
1584    TEST_INIT();
1585
1586    memleak_start();
1587
1588    int i, r;
1589    int n = 20;
1590    int count;
1591    fdb_file_handle *dbfile;
1592    fdb_kvs_handle *db;
1593    fdb_kvs_handle *snap_db, *snap_inmem;
1594    fdb_kvs_handle *snap_db2, *snap_inmem2; // clones from stable & inmemory
1595    fdb_seqnum_t snap_seq;
1596    fdb_doc **doc = alca(fdb_doc*, n);
1597    fdb_doc *rdoc;
1598    fdb_kvs_info kvs_info;
1599    fdb_status status;
1600    fdb_iterator *iterator;
1601
1602    char keybuf[256], metabuf[256], bodybuf[256];
1603
1604    // remove previous mvcc_test files
1605    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1606    (void)r;
1607
1608    fdb_config fconfig = fdb_get_default_config();
1609    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1610    fconfig.buffercache_size = 0;
1611    fconfig.wal_threshold = 1024;
1612    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1613    fconfig.compaction_threshold = 0;
1614
1615    // remove previous mvcc_test files
1616    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1617    (void)r;
1618
1619    // open db
1620    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1621    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1622    TEST_CHK(status == FDB_RESULT_SUCCESS);
1623
1624    // Create a snapshot from an empty database file
1625    status = fdb_snapshot_open(db, &snap_db, 0);
1626    TEST_CHK(status == FDB_RESULT_SUCCESS);
1627    // check if snapshot's sequence number is zero.
1628    fdb_get_kvs_info(snap_db, &kvs_info);
1629    TEST_CHK(kvs_info.last_seqnum == 0);
1630    // create an iterator on the snapshot for full range
1631    fdb_iterator_init(snap_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1632    // Iterator should not return any items.
1633    status = fdb_iterator_next(iterator);
1634    TEST_CHK(status == FDB_RESULT_ITERATOR_FAIL);
1635    fdb_iterator_close(iterator);
1636    fdb_kvs_close(snap_db);
1637
1638    // ------- Setup test ----------------------------------
1639    // insert documents of 0-4
1640    for (i=0; i<n/4; i++){
1641        sprintf(keybuf, "key%d", i);
1642        sprintf(metabuf, "meta%d", i);
1643        sprintf(bodybuf, "body%d", i);
1644        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1645            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1646        fdb_set(db, doc[i]);
1647    }
1648
1649    // commit with a manual WAL flush (these docs go into HB-trie)
1650    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1651
1652    // insert documents from 4 - 8
1653    for (; i < n/2 - 1; i++){
1654        sprintf(keybuf, "key%d", i);
1655        sprintf(metabuf, "meta%d", i);
1656        sprintf(bodybuf, "body%d", i);
1657        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1658            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1659        fdb_set(db, doc[i]);
1660    }
1661
1662    // We want to create:
1663    // |WALFlushHDR|Key-Value1|HDR|Key-Value2|SnapshotHDR|Key-Value1|HDR|
1664    // Insert doc 9 with a different value to test duplicate elimination..
1665    sprintf(keybuf, "key%d", i);
1666    sprintf(metabuf, "meta%d", i);
1667    sprintf(bodybuf, "Body%d", i);
1668    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1669            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1670    fdb_set(db, doc[i]);
1671
1672    // commit again without a WAL flush (last doc goes into the AVL tree)
1673    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1674
1675    // Insert doc 9 now again with expected value..
1676    *(char *)doc[i]->body = 'b';
1677    fdb_set(db, doc[i]);
1678    // commit again without a WAL flush (these documents go into the AVL trees)
1679    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1680
1681    // TAKE SNAPSHOT: pick up sequence number of a commit without a WAL flush
1682    snap_seq = doc[i]->seqnum;
1683
1684    // Initialize an in-memory snapshot Without a Commit...
1685    // WAL items are not flushed...
1686    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1687    TEST_CHK(status == FDB_RESULT_SUCCESS);
1688
1689    // Now re-insert doc 9 as another duplicate (only newer sequence number)
1690    fdb_set(db, doc[i]);
1691    // commit again without a WAL flush (last doc goes into the AVL tree)
1692    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1693
1694    // insert documents from 10-14 into HB-trie
1695    for (++i; i < (n/2 + n/4); i++){
1696        sprintf(keybuf, "key%d", i);
1697        sprintf(metabuf, "meta%d", i);
1698        sprintf(bodybuf, "body%d", i);
1699        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1700            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1701        fdb_set(db, doc[i]);
1702    }
1703    // manually flush WAL & commit
1704    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1705
1706    status = fdb_set_log_callback(db, logCallbackFunc,
1707                                  (void *) "snapshot_clone_test");
1708    TEST_CHK(status == FDB_RESULT_SUCCESS);
1709    // ---------- Snapshot tests begin -----------------------
1710    // Attempt to take snapshot with out-of-range marker..
1711    status = fdb_snapshot_open(db, &snap_db, 999999);
1712    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
1713
1714    // Init Snapshot of open file with saved document seqnum as marker
1715    status = fdb_snapshot_open(db, &snap_db, snap_seq);
1716    TEST_CHK(status == FDB_RESULT_SUCCESS);
1717
1718    // Clone a snapshot into another snapshot...
1719    status = fdb_snapshot_open(snap_db, &snap_db2, snap_seq);
1720    TEST_CHK(status == FDB_RESULT_SUCCESS);
1721
1722    // close snapshot handle
1723    status = fdb_kvs_close(snap_db);
1724    TEST_CHK(status == FDB_RESULT_SUCCESS);
1725
1726    // check snapshot's sequence number
1727    fdb_get_kvs_info(snap_db2, &kvs_info);
1728    TEST_CHK(kvs_info.last_seqnum == snap_seq);
1729
1730    // insert documents from 15 - 19 on file into the WAL
1731    for (; i < n; i++){
1732        sprintf(keybuf, "key%d", i);
1733        sprintf(metabuf, "meta%d", i);
1734        sprintf(bodybuf, "body%d", i);
1735        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1736            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1737        fdb_set(db, doc[i]);
1738    }
1739    // commit without a WAL flush (This WAL must not affect snapshot)
1740    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1741
1742    // Clone the in-memory snapshot into another snapshot
1743    status = fdb_snapshot_open(snap_inmem, &snap_inmem2, FDB_SNAPSHOT_INMEM);
1744    TEST_CHK(status == FDB_RESULT_SUCCESS);
1745
1746    // Stable Snapshot Scan Tests..........
1747    // Retrieve metaonly by key from snapshot
1748    i = snap_seq + 1;
1749    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1750    status = fdb_get_metaonly(snap_db2, rdoc);
1751    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1752    fdb_doc_free(rdoc);
1753
1754    i = 5;
1755    fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1756    status = fdb_get_metaonly(snap_db2, rdoc);
1757    TEST_CHK(status == FDB_RESULT_SUCCESS);
1758    TEST_CMP(rdoc->key, doc[i]->key, doc[i]->keylen);
1759    TEST_CMP(rdoc->meta, doc[i]->meta, doc[i]->metalen);
1760    fdb_doc_free(rdoc);
1761
1762    // Retrieve by seq from snapshot
1763    fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
1764    rdoc->seqnum = 6;
1765    status = fdb_get_byseq(snap_db2, rdoc);
1766    TEST_CHK(status == FDB_RESULT_SUCCESS);
1767    TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1768    TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1769    fdb_doc_free(rdoc);
1770    rdoc = NULL;
1771
1772    // create an iterator on the snapshot for full range
1773    fdb_iterator_init(snap_db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1774
1775    // repeat until fail
1776    i=0;
1777    count=0;
1778    do {
1779        status = fdb_iterator_get(iterator, &rdoc);
1780        TEST_CHK(status == FDB_RESULT_SUCCESS);
1781        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1782        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1783        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1784
1785        fdb_doc_free(rdoc);
1786        rdoc = NULL;
1787        i ++;
1788        count++;
1789    } while (fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1790
1791    TEST_CHK(count==n/2); // Only unique items from the first half
1792
1793    fdb_iterator_close(iterator);
1794
1795    // create an iterator on the in-memory snapshot clone for full range
1796    fdb_iterator_init(snap_inmem2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1797
1798    // repeat until fail
1799    i=0;
1800    count=0;
1801    do {
1802        status = fdb_iterator_get(iterator, &rdoc);
1803        TEST_CHK(status == FDB_RESULT_SUCCESS);
1804        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1805        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1806        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1807
1808        fdb_doc_free(rdoc);
1809        rdoc = NULL;
1810        i ++;
1811        count++;
1812    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1813
1814    TEST_CHK(count==n/2); // Only unique items from the first half
1815
1816    fdb_iterator_close(iterator);
1817
1818    // close db handle
1819    fdb_kvs_close(db);
1820    // close the in-memory snapshot handle
1821    fdb_kvs_close(snap_inmem);
1822    // close the in-memory clone snapshot handle
1823    fdb_kvs_close(snap_inmem2);
1824    // close the clone snapshot handle
1825    fdb_kvs_close(snap_db2);
1826    // close db file
1827    fdb_close(dbfile);
1828
1829    // free all documents
1830    for (i=0;i<n;++i){
1831        fdb_doc_free(doc[i]);
1832    }
1833
1834    // free all resources
1835    fdb_shutdown();
1836
1837    memleak_end();
1838
1839    TEST_RESULT("snapshot clone test");
1840}
1841
1842struct parallel_clone_t {
1843    fdb_doc **doc;
1844    fdb_kvs_handle *snap_db;
1845    int num_docs;
1846};
1847
1848void *snap_clone_thread(void *args)
1849{
1850    struct parallel_clone_t *t = (struct parallel_clone_t *)args;
1851    fdb_kvs_handle *clone_db;
1852    fdb_iterator *iterator;
1853    fdb_doc *rdoc = NULL;
1854    fdb_doc **doc = t->doc;
1855    int i;
1856    int n = t->num_docs;
1857    fdb_status status;
1858    int count;
1859    TEST_INIT();
1860
1861    status = fdb_snapshot_open(t->snap_db, &clone_db, FDB_SNAPSHOT_INMEM);
1862    TEST_CHK(status == FDB_RESULT_SUCCESS);
1863
1864    // create an iterator on the in-memory snapshot clone for full range
1865    fdb_iterator_init(clone_db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1866
1867    // repeat until fail
1868    i=0;
1869    count=0;
1870    do {
1871        status = fdb_iterator_get(iterator, &rdoc);
1872        TEST_CHK(status == FDB_RESULT_SUCCESS);
1873        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
1874        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1875
1876        fdb_doc_free(rdoc);
1877        rdoc = NULL;
1878        i ++;
1879        count++;
1880    } while(fdb_iterator_next(iterator) == FDB_RESULT_SUCCESS);
1881
1882    TEST_CHK(count==n/2); // Only unique items from the first half
1883
1884    fdb_iterator_close(iterator);
1885
1886    fdb_kvs_close(clone_db);
1887    thread_exit(0);
1888    return NULL;
1889}
1890
1891void snapshot_parallel_clone_test()
1892{
1893    TEST_INIT();
1894
1895    memleak_start();
1896
1897    int i, r;
1898    int num_cloners = 30; // parallel snapshot clone operations
1899    int n = 20480; // 10 dirty wal flushes at least
1900    fdb_file_handle *dbfile;
1901    fdb_kvs_handle *db;
1902    fdb_kvs_handle *snap_inmem;
1903    fdb_doc **doc = alca(fdb_doc*, n);
1904    thread_t *tid = alca(thread_t, num_cloners);
1905    void *thread_ret;
1906    fdb_status status;
1907    struct parallel_clone_t clone_data;
1908
1909    char keybuf[256], bodybuf[256];
1910
1911    // remove previous mvcc_test files
1912    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1913    (void)r;
1914
1915    fdb_config fconfig = fdb_get_default_config();
1916    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1917    fconfig.buffercache_size = 0;
1918    fconfig.wal_threshold = 1024;
1919    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1920    fconfig.compaction_threshold = 0;
1921
1922    // remove previous mvcc_test files
1923    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
1924    (void)r;
1925
1926    // open db
1927    status = fdb_open(&dbfile, "./mvcc_test1", &fconfig);
1928    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1929    TEST_CHK(status == FDB_RESULT_SUCCESS);
1930
1931    status = fdb_set_log_callback(db, logCallbackFunc,
1932                                  (void *) "snapshot_parallel_clone_test");
1933    TEST_CHK(status == FDB_RESULT_SUCCESS);
1934    // ------- Setup test ----------------------------------
1935    for (i=0; i<n/2; i++){
1936        sprintf(keybuf, "key%5d", i);
1937        sprintf(bodybuf, "body%d", i);
1938        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1939            NULL, 0, (void*)bodybuf, strlen(bodybuf));
1940        fdb_set(db, doc[i]);
1941    }
1942
1943    // Initialize an in-memory snapshot Without a Commit...
1944    // WAL items are not flushed...
1945    status = fdb_snapshot_open(db, &snap_inmem, FDB_SNAPSHOT_INMEM);
1946    TEST_CHK(status == FDB_RESULT_SUCCESS);
1947
1948    clone_data.doc = doc;
1949    clone_data.num_docs = n;
1950    clone_data.snap_db = snap_inmem;
1951
1952    for (int j = num_cloners - 1; j>=0; --j) {
1953        thread_create(&tid[j], snap_clone_thread, &clone_data);
1954    }
1955
1956    for (; i < n; i++){
1957        sprintf(keybuf, "key%5d", i);
1958        sprintf(bodybuf, "BODY%d", i);
1959        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1960            NULL, 0, (void*)bodybuf, strlen(bodybuf));
1961        fdb_set(db, doc[i]);
1962    }
1963    // commit without a WAL flush (This WAL must not affect snapshot)
1964    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1965
1966    for (int j = num_cloners - 1; j>=0; --j) {
1967        thread_join(tid[j], &thread_ret);
1968    }
1969
1970    // close db handle
1971    fdb_kvs_close(db);
1972    // close the in-memory snapshot handle
1973    fdb_kvs_close(snap_inmem);
1974    // close db file
1975    fdb_close(dbfile);
1976
1977    // free all documents
1978    for (i=0;i<n;++i){
1979        fdb_doc_free(doc[i]);
1980    }
1981
1982    // free all resources
1983    fdb_shutdown();
1984
1985    memleak_end();
1986
1987    TEST_RESULT("snapshot parallel clone test");
1988}
1989
1990void snapshot_markers_in_file_test(bool multi_kv)
1991{
1992    TEST_INIT();
1993
1994    memleak_start();
1995
1996    int i, r;
1997    int n = 20;
1998    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
1999    fdb_file_handle *dbfile;
2000    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
2001    fdb_doc **doc = alca(fdb_doc*, n);
2002    fdb_status status;
2003    fdb_snapshot_info_t *markers;
2004    uint64_t num_markers;
2005
2006    char keybuf[256], metabuf[256], bodybuf[256];
2007    char kv_name[8];
2008
2009    fdb_config fconfig = fdb_get_default_config();
2010    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2011    fconfig.buffercache_size = 0;
2012    fconfig.wal_threshold = 1024;
2013    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2014    fconfig.compaction_threshold = 0;
2015    fconfig.multi_kv_instances = multi_kv;
2016
2017    // remove previous mvcc_test files
2018    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2019    (void)r;
2020
2021    // open db
2022    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2023    if (multi_kv) {
2024        for (r = 0; r < num_kvs; ++r) {
2025            sprintf(kv_name, "kv%d", r);
2026            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
2027        }
2028    } else {
2029        num_kvs = 1;
2030        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
2031    }
2032
2033   // ------- Setup test ----------------------------------
2034   // insert documents of 0-4
2035    for (i=0; i<n/4; i++){
2036        sprintf(keybuf, "key%d", i);
2037        sprintf(metabuf, "meta%d", i);
2038        sprintf(bodybuf, "body%d", i);
2039        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2040            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2041        for (r = 0; r < num_kvs; ++r) {
2042            fdb_set(db[r], doc[i]);
2043        }
2044    }
2045
2046    // commit with a manual WAL flush (these docs go into HB-trie)
2047    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2048
2049    // insert documents from 5 - 9
2050    for (; i < n/2; i++){
2051        sprintf(keybuf, "key%d", i);
2052        sprintf(metabuf, "meta%d", i);
2053        sprintf(bodybuf, "body%d", i);
2054        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2055            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2056        for (r = 0; r < num_kvs; ++r) {
2057            fdb_set(db[r], doc[i]);
2058        }
2059    }
2060
2061    // commit again without a WAL flush
2062    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2063
2064    // insert documents from 10-14 into HB-trie
2065    for (; i < (n/2 + n/4); i++){
2066        sprintf(keybuf, "key%d", i);
2067        sprintf(metabuf, "meta%d", i);
2068        sprintf(bodybuf, "body%d", i);
2069        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2070            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2071        for (r = 0; r < num_kvs; ++r) {
2072            fdb_set(db[r], doc[i]);
2073        }
2074    }
2075    // manually flush WAL & commit
2076    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2077
2078    // insert documents from 15 - 19 on file into the WAL
2079    for (; i < n; i++){
2080        sprintf(keybuf, "key%d", i);
2081        sprintf(metabuf, "meta%d", i);
2082        sprintf(bodybuf, "body%d", i);
2083        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2084            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2085        for (r = 0; r < num_kvs; ++r) {
2086            fdb_set(db[r], doc[i]);
2087        }
2088    }
2089    // commit without a WAL flush
2090    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2091
2092    for (r = 0; r < num_kvs; ++r) {
2093        status = fdb_set_log_callback(db[r], logCallbackFunc,
2094                                      (void *) "snapshot_markers_in_file");
2095        TEST_CHK(status == FDB_RESULT_SUCCESS);
2096    }
2097
2098    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
2099    TEST_CHK(status == FDB_RESULT_SUCCESS);
2100
2101    if (!multi_kv) {
2102        TEST_CHK(num_markers == 4);
2103        for (r = 0; r < num_kvs; ++r) {
2104            TEST_CHK(markers[r].num_kvs_markers == 1);
2105            TEST_CHK(markers[r].kvs_markers[0].seqnum
2106                     == (fdb_seqnum_t)(n - r*5));
2107        }
2108    } else {
2109        TEST_CHK(num_markers == 8);
2110        for (r = 0; r < num_kvs; ++r) {
2111            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
2112            for (i = 0; i < num_kvs; ++i) {
2113                TEST_CHK(markers[r].kvs_markers[i].seqnum
2114                         == (fdb_seqnum_t)(n - r*5));
2115                sprintf(kv_name, "kv%d", i);
2116                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
2117            }
2118        }
2119    }
2120
2121    status = fdb_free_snap_markers(markers, num_markers);
2122    TEST_CHK(status == FDB_RESULT_SUCCESS);
2123
2124    // close db file
2125    fdb_close(dbfile);
2126
2127    // free all documents
2128    for (i=0;i<n;++i){
2129        fdb_doc_free(doc[i]);
2130    }
2131
2132    // free all resources
2133    fdb_shutdown();
2134
2135    memleak_end();
2136
2137    sprintf(bodybuf, "snapshot markers in file test %s", multi_kv ?
2138                                                        "multiple kv mode:"
2139                                                      : "single kv mode:");
2140    TEST_RESULT(bodybuf);
2141}
2142
2143void rollback_forward_seqnum()
2144{
2145
2146    TEST_INIT();
2147    memleak_start();
2148
2149    int r;
2150    int i, n=100;
2151    int rb1_seqnum, rb2_seqnum;
2152    char keybuf[256];
2153    char setop[3];
2154    fdb_file_handle *dbfile;
2155    fdb_iterator *it;
2156    fdb_kvs_handle *kv1, *mirror_kv1;
2157    fdb_kvs_info info;
2158    fdb_config fconfig = fdb_get_default_config();
2159    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2160    fdb_doc **doc = alca(fdb_doc*, n);
2161    fdb_doc *rdoc = NULL;
2162    fdb_status status;
2163    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2164    (void)r;
2165
2166    fconfig.wal_threshold = 1024;
2167    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2168    fconfig.compaction_threshold = 0;
2169
2170    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2171    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2172    fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
2173
2174
2175    // set n docs within both dbs
2176    for(i=0;i<n;++i){
2177        sprintf(keybuf, "key%d", i);
2178        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2179            NULL, 0, NULL, 0);
2180        fdb_set(kv1, doc[i]);
2181        fdb_set_kv(mirror_kv1, keybuf, strlen(keybuf), setop, 3);
2182    }
2183
2184    // commit and save seqnum1
2185    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2186    fdb_get_kvs_info(kv1, &info);
2187    rb1_seqnum = info.last_seqnum;
2188
2189    // delete all docs in kv1
2190    for(i=0;i<n;++i){
2191        fdb_del(kv1, doc[i]);
2192    }
2193
2194    // commit and save seqnum2
2195    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2196    fdb_get_kvs_info(kv1, &info);
2197    rb2_seqnum = info.last_seqnum;
2198
2199
2200    // sets again
2201    for(i=0;i<n;++i){
2202        doc[i]->deleted = false;
2203        fdb_set(kv1, doc[i]);
2204    }
2205
2206    // commit
2207    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2208
2209    // rollback to first seqnum
2210    status = fdb_rollback(&kv1, rb1_seqnum);
2211    TEST_CHK(status == FDB_RESULT_SUCCESS);
2212
2213    // rollback to second seqnum
2214    status = fdb_rollback(&kv1, rb2_seqnum);
2215    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2216
2217    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
2218    TEST_CHK(status == FDB_RESULT_SUCCESS);
2219
2220    do {
2221        status = fdb_iterator_get(it, &rdoc);
2222        TEST_CHK(status == FDB_RESULT_SUCCESS);
2223        status = fdb_get_metaonly_byseq(kv1, rdoc);
2224        TEST_CHK(status == FDB_RESULT_SUCCESS);
2225        TEST_CHK(rdoc->deleted == false);
2226        fdb_doc_free(rdoc);
2227        rdoc = NULL;
2228    } while(fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
2229
2230    for (i=0;i<n;++i){
2231        fdb_doc_free(doc[i]);
2232    }
2233    fdb_iterator_close(it);
2234    fdb_kvs_close(kv1);
2235    fdb_close(dbfile);
2236    fdb_shutdown();
2237    memleak_end();
2238
2239    TEST_RESULT("rollback forward seqnum");
2240}
2241
2242void rollback_test(bool multi_kv)
2243{
2244    TEST_INIT();
2245
2246    memleak_start();
2247
2248    int i, r;
2249    int n = 20;
2250    int count;
2251    fdb_file_handle *dbfile, *dbfile_txn;
2252    fdb_kvs_handle *db, *db_txn;
2253    fdb_seqnum_t rollback_seq;
2254    fdb_doc **doc = alca(fdb_doc*, n);
2255    fdb_doc *rdoc = NULL;
2256    fdb_kvs_info kvs_info;
2257    fdb_status status;
2258    fdb_iterator *iterator;
2259
2260    char keybuf[256], metabuf[256], bodybuf[256];
2261
2262    // remove previous mvcc_test files
2263    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2264    (void)r;
2265
2266    fdb_config fconfig = fdb_get_default_config();
2267    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2268    fconfig.buffercache_size = 0;
2269    fconfig.wal_threshold = 1024;
2270    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2271    fconfig.compaction_threshold = 0;
2272    fconfig.multi_kv_instances = multi_kv;
2273
2274    // remove previous mvcc_test files
2275    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2276    (void)r;
2277
2278    // open db
2279    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2280    if (multi_kv) {
2281        fdb_kvs_open_default(dbfile, &db, &kvs_config);
2282    } else {
2283        fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2284    }
2285
2286   // ------- Setup test ----------------------------------
2287   // insert documents of 0-4
2288    for (i=0; i<n/4; i++){
2289        sprintf(keybuf, "key%d", i);
2290        sprintf(metabuf, "meta%d", i);
2291        sprintf(bodybuf, "body%d", i);
2292        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2293            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2294        fdb_set(db, doc[i]);
2295    }
2296
2297    // commit with a manual WAL flush (these docs go into HB-trie)
2298    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2299
2300    // insert documents from 4 - 9
2301    for (; i < n/2; i++){
2302        sprintf(keybuf, "key%d", i);
2303        sprintf(metabuf, "meta%d", i);
2304        sprintf(bodybuf, "body%d", i);
2305        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2306            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2307        fdb_set(db, doc[i]);
2308    }
2309
2310    // commit again without a WAL flush
2311    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2312
2313    // ROLLBACK POINT: pick up sequence number of a commit without a WAL flush
2314    rollback_seq = doc[i-1]->seqnum;
2315
2316    // insert documents from 10-14 into HB-trie
2317    for (; i < (n/2 + n/4); i++){
2318        sprintf(keybuf, "key%d", i);
2319        sprintf(metabuf, "meta%d", i);
2320        sprintf(bodybuf, "body%d", i);
2321        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2322            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2323        fdb_set(db, doc[i]);
2324    }
2325    // manually flush WAL & commit
2326    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2327
2328    // insert documents from 15 - 19 on file into the WAL
2329    for (; i < n; i++){
2330        sprintf(keybuf, "key%d", i);
2331        sprintf(metabuf, "meta%d", i);
2332        sprintf(bodybuf, "body%d", i);
2333        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2334            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
2335        fdb_set(db, doc[i]);
2336    }
2337    // commit without a WAL flush
2338    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2339
2340    status = fdb_set_log_callback(db, logCallbackFunc,
2341                                  (void *) "rollback_test");
2342    TEST_CHK(status == FDB_RESULT_SUCCESS);
2343
2344    // ---------- Rollback tests begin -----------------------
2345    // We have DB file with 5 HB-trie docs, 5 unflushed WAL docs occuring twice
2346    // Attempt to rollback to out-of-range marker..
2347    status = fdb_rollback(&db, 999999);
2348    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
2349
2350    // Open another handle & begin transaction
2351    fdb_open(&dbfile_txn, "./mvcc_test1", &fconfig);
2352    if (multi_kv) {
2353        fdb_kvs_open_default(dbfile_txn, &db_txn, &kvs_config);
2354    } else {
2355        fdb_kvs_open(dbfile_txn, &db_txn, NULL, &kvs_config);
2356    }
2357    fdb_begin_transaction(dbfile_txn, FDB_ISOLATION_READ_COMMITTED);
2358    // Attempt to rollback while the transaction is active
2359    status =  fdb_rollback(&db, rollback_seq);
2360    // Must fail
2361    TEST_CHK(status == FDB_RESULT_FAIL_BY_TRANSACTION);
2362    fdb_abort_transaction(dbfile_txn);
2363    fdb_kvs_close(db_txn);
2364    fdb_close(dbfile_txn);
2365
2366    // Rollback to saved marker from above
2367    status = fdb_rollback(&db, rollback_seq);
2368    TEST_CHK(status == FDB_RESULT_SUCCESS);
2369
2370    // check handle's sequence number
2371    fdb_get_kvs_info(db, &kvs_info);
2372    TEST_CHK(kvs_info.last_seqnum == rollback_seq);
2373
2374    // Modify an item and update into the rollbacked file..
2375    i = n/2;
2376    *(char *)doc[i]->body = 'B';
2377    fdb_set(db, doc[i]);
2378    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2379
2380    // create an iterator on the rollback for full range
2381    fdb_iterator_sequence_init(db, &iterator, 0, 0, FDB_ITR_NONE);
2382
2383    // repeat until fail
2384    i=0;
2385    count=0;
2386    do {
2387        status = fdb_iterator_get(iterator, &rdoc);
2388        TEST_CHK(status == FDB_RESULT_SUCCESS);
2389
2390        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
2391        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
2392        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2393
2394        fdb_doc_free(rdoc);
2395        rdoc = NULL;
2396        i ++;
2397        count++;
2398    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2399
2400    TEST_CHK(count==n/2 + 1); // Items from first half and newly set item
2401
2402    fdb_iterator_close(iterator);
2403
2404    // close db file
2405    fdb_kvs_close(db);
2406    fdb_close(dbfile);
2407
2408    // free all documents
2409    for (i=0;i<n;++i){
2410        fdb_doc_free(doc[i]);
2411    }
2412
2413    // free all resources
2414    fdb_shutdown();
2415
2416    memleak_end();
2417
2418    sprintf(bodybuf, "rollback test %s", multi_kv ? "multiple kv mode:"
2419                                                  : "single kv mode:");
2420    TEST_RESULT(bodybuf);
2421}
2422
2423
2424
2425void rollback_and_snapshot_test()
2426{
2427    TEST_INIT();
2428
2429    memleak_start();
2430
2431    fdb_seqnum_t seqnum, rollback_seqnum;
2432    fdb_kvs_info kvs_info;
2433    fdb_status status;
2434    fdb_config config;
2435    fdb_kvs_config kvs_config;
2436    fdb_file_handle *dbfile;
2437    fdb_kvs_handle *db,  *snapshot;
2438    int r;
2439
2440    // remove previous mvcc_test files
2441    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2442    (void)r;
2443
2444    // MB-12530 open db
2445    config = fdb_get_default_config();
2446    kvs_config = fdb_get_default_kvs_config();
2447    status = fdb_open(&dbfile, "mvcc_test", &config);
2448    TEST_CHK(status == FDB_RESULT_SUCCESS);
2449    status = fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
2450    TEST_CHK(status == FDB_RESULT_SUCCESS);
2451
2452    // 2. Create Key 'a' and Commit
2453    status = fdb_set_kv(db, (void *) "a", 1, (void *)"val-a", 5);
2454    TEST_CHK(status == FDB_RESULT_SUCCESS);
2455    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2456    TEST_CHK(status == FDB_RESULT_SUCCESS);
2457
2458    status = fdb_get_kvs_info(db, &kvs_info);
2459    TEST_CHK(status == FDB_RESULT_SUCCESS);
2460
2461    // 3. Create Key 'b' and Commit
2462    status = fdb_set_kv(db, (void *)"b", 1, (void *)"val-b", 5);
2463    TEST_CHK(status == FDB_RESULT_SUCCESS);
2464    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2465    TEST_CHK(status == FDB_RESULT_SUCCESS);
2466
2467    status = fdb_get_kvs_info(db, &kvs_info);
2468    TEST_CHK(status == FDB_RESULT_SUCCESS);
2469    seqnum = kvs_info.last_seqnum;
2470
2471    // 4.  Remember this as our rollback point
2472    rollback_seqnum = seqnum;
2473
2474    // 5. Create Key 'c' and Commit
2475    status = fdb_set_kv(db, (void *)"c", 1,(void *) "val-c", 5);
2476    TEST_CHK(status == FDB_RESULT_SUCCESS);
2477    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2478    TEST_CHK(status == FDB_RESULT_SUCCESS);
2479
2480    status = fdb_get_kvs_info(db, &kvs_info);
2481    TEST_CHK(status == FDB_RESULT_SUCCESS);
2482
2483    // 6. Rollback to rollback point (seq 2)
2484    status = fdb_rollback(&db, rollback_seqnum);
2485    TEST_CHK(status == FDB_RESULT_SUCCESS);
2486
2487    status = fdb_get_kvs_info(db, &kvs_info);
2488    TEST_CHK(status == FDB_RESULT_SUCCESS);
2489    seqnum = kvs_info.last_seqnum;
2490
2491    // 7. Verify that Key 'c' is not found
2492    void *val;
2493    size_t vallen;
2494    status = fdb_get_kv(db, (void *)"c", 1, &val, &vallen);
2495    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2496
2497    // 8. Open a snapshot at the same point
2498    status = fdb_snapshot_open(db, &snapshot, seqnum);
2499    TEST_CHK(status == FDB_RESULT_SUCCESS);
2500
2501    // 9. Verify that Key 'c' is not found
2502    status = fdb_get_kv(snapshot, (void *)"c", 1, &val, &vallen);
2503    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2504
2505    // close the snapshot db
2506    fdb_kvs_close(snapshot);
2507
2508    // close db file
2509    fdb_close(dbfile);
2510
2511    // free all resources
2512    fdb_shutdown();
2513
2514    memleak_end();
2515
2516    TEST_RESULT("rollback and snapshot test");
2517}
2518
2519void rollback_ncommits()
2520{
2521
2522    TEST_INIT();
2523    memleak_start();
2524
2525    int r;
2526    int i, j, n=100;
2527    int ncommits=10;
2528    char keybuf[256];
2529    fdb_file_handle *dbfile;
2530    fdb_kvs_handle *kv1, *kv2;
2531    fdb_kvs_info info;
2532    fdb_config fconfig = fdb_get_default_config();
2533    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2534    fdb_status status;
2535    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2536    (void)r;
2537
2538    fconfig.wal_threshold = 1024;
2539    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2540    fconfig.compaction_threshold = 0;
2541
2542    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2543    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
2544    fdb_kvs_open(dbfile, &kv2, NULL, &kvs_config);
2545
2546
2547    for(j=0;j<ncommits;++j){
2548
2549        // set n docs per commit
2550        for(i=0;i<n;++i){
2551            sprintf(keybuf, "key%02d%03d", j, i);
2552            fdb_set_kv(kv1, keybuf, strlen(keybuf), NULL, 0);
2553            fdb_set_kv(kv2, keybuf, strlen(keybuf), NULL, 0);
2554        }
2555        // alternate commit pattern
2556        if((j % 2) == 0){
2557            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2558        } else {
2559            fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2560        }
2561
2562        // doc_count should match seqnum since they are unique
2563        fdb_get_kvs_info(kv1, &info);
2564        TEST_CHK(info.doc_count == info.last_seqnum);
2565    }
2566
2567    // iteratively rollback 5 commits
2568     for(j=ncommits;j>0;--j){
2569        status = fdb_rollback(&kv1, j*n);
2570        TEST_CHK(status == FDB_RESULT_SUCCESS);
2571
2572        // check rollback doc_count
2573        fdb_get_kvs_info(kv1, &info);
2574        TEST_CHK(info.doc_count == info.last_seqnum);
2575    }
2576
2577    fdb_kvs_close(kv1);
2578    fdb_close(dbfile);
2579    fdb_shutdown();
2580    memleak_end();
2581
2582    TEST_RESULT("rollback n commits");
2583}
2584
2585void transaction_test()
2586{
2587    TEST_INIT();
2588
2589    memleak_start();
2590
2591    int i, r;
2592    int n = 10;
2593    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2, *dbfile_txn3;
2594    fdb_kvs_handle *db, *db_txn1, *db_txn2, *db_txn3;
2595    fdb_doc **doc = alca(fdb_doc*, n);
2596    fdb_doc *rdoc;
2597    fdb_status status;
2598
2599    char keybuf[256], metabuf[256], bodybuf[256];
2600
2601    // remove previous mvcc_test files
2602    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2603    (void)r;
2604
2605    fdb_config fconfig = fdb_get_default_config();
2606    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2607    fconfig.buffercache_size = 0;
2608    fconfig.wal_threshold = 1024;
2609    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2610    fconfig.purging_interval = 0;
2611    fconfig.compaction_threshold = 0;
2612
2613    // open db
2614    fdb_open(&dbfile, "mvcc_test1", &fconfig);
2615    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2616    status = fdb_set_log_callback(db, logCallbackFunc,
2617                                  (void *) "transaction_test");
2618    TEST_CHK(status == FDB_RESULT_SUCCESS);
2619
2620    // open db and begin transactions
2621    fdb_open(&dbfile_txn1, "mvcc_test1", &fconfig);
2622    fdb_open(&dbfile_txn2, "mvcc_test1", &fconfig);
2623    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
2624    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
2625    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2626    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2627
2628    // insert half docs into txn1
2629    for (i=0;i<n/2;++i){
2630        sprintf(keybuf, "key%d", i);
2631        sprintf(metabuf, "meta%d", i);
2632        sprintf(bodybuf, "body%d", i);
2633        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2634                                (void*)metabuf, strlen(metabuf),
2635                                (void*)bodybuf, strlen(bodybuf));
2636        fdb_set(db_txn1, doc[i]);
2637    }
2638
2639    // insert other half docs into txn2
2640    for (i=n/2;i<n;++i){
2641        sprintf(keybuf, "key%d", i);
2642        sprintf(metabuf, "meta%d", i);
2643        sprintf(bodybuf, "body%d", i);
2644        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
2645                                (void*)metabuf, strlen(metabuf),
2646                                (void*)bodybuf, strlen(bodybuf));
2647        fdb_set(db_txn2, doc[i]);
2648    }
2649
2650    // uncommitted docs should not be read by the other transaction that
2651    // doesn't allow uncommitted reads.
2652    for (i=0;i<n;++i){
2653        sprintf(keybuf, "key%d", i);
2654        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2655        status = fdb_get(db_txn1, rdoc);
2656        if (i<n/2) {
2657            TEST_CHK(status == FDB_RESULT_SUCCESS);
2658        } else {
2659            TEST_CHK(status != FDB_RESULT_SUCCESS);
2660        }
2661        fdb_doc_free(rdoc);
2662    }
2663
2664    // uncommitted docs can be read by the transaction that allows uncommitted reads.
2665    fdb_open(&dbfile_txn3, "mvcc_test1", &fconfig);
2666    fdb_kvs_open_default(dbfile_txn3, &db_txn3, &kvs_config);
2667    fdb_begin_transaction(dbfile_txn3, FDB_ISOLATION_READ_UNCOMMITTED);
2668    for (i=0;i<n;++i){
2669        sprintf(keybuf, "key%d", i);
2670        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2671        status = fdb_get(db_txn3, rdoc);
2672        TEST_CHK(status == FDB_RESULT_SUCCESS);
2673        fdb_doc_free(rdoc);
2674    }
2675    fdb_end_transaction(dbfile_txn3, FDB_COMMIT_NORMAL);
2676    fdb_close(dbfile_txn3);
2677
2678    // commit and end txn1
2679    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2680
2681    // uncommitted docs should not be read generally
2682    for (i=0;i<n;++i){
2683        sprintf(keybuf, "key%d", i);
2684        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2685        status = fdb_get(db, rdoc);
2686        if (i<n/2) {
2687            TEST_CHK(status == FDB_RESULT_SUCCESS);
2688        } else {
2689            TEST_CHK(status != FDB_RESULT_SUCCESS);
2690        }
2691        fdb_doc_free(rdoc);
2692    }
2693
2694    // read uncommitted docs using the same transaction
2695    for (i=0;i<n;++i){
2696        sprintf(keybuf, "key%d", i);
2697        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2698        status = fdb_get(db_txn2, rdoc);
2699        TEST_CHK(status == FDB_RESULT_SUCCESS);
2700        fdb_doc_free(rdoc);
2701    }
2702
2703    // abort txn2
2704    fdb_abort_transaction(dbfile_txn2);
2705
2706    // close & re-open db file
2707    fdb_close(dbfile);
2708    fdb_open(&dbfile, "mvcc_test1", &fconfig);
2709    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2710    status = fdb_set_log_callback(db, logCallbackFunc,
2711                                  (void *) "transaction_test");
2712    TEST_CHK(status == FDB_RESULT_SUCCESS);
2713
2714    // uncommitted docs should not be read after reopening the file either
2715    for (i=0;i<n;++i){
2716        sprintf(keybuf, "key%d", i);
2717        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2718        status = fdb_get(db, rdoc);
2719        if (i<n/2) {
2720            TEST_CHK(status == FDB_RESULT_SUCCESS);
2721        } else {
2722            TEST_CHK(status != FDB_RESULT_SUCCESS);
2723        }
2724        fdb_doc_free(rdoc);
2725    }
2726
2727    // insert other half docs & commit
2728    for (i=n/2;i<n;++i){
2729        fdb_set(db, doc[i]);
2730    }
2731    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2732
2733    // now all docs can be read generally
2734    for (i=0;i<n;++i){
2735        sprintf(keybuf, "key%d", i);
2736        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2737        status = fdb_get(db, rdoc);
2738        TEST_CHK(status == FDB_RESULT_SUCCESS);
2739        fdb_doc_free(rdoc);
2740    }
2741
2742    // begin transactions
2743    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2744    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2745
2746    // concurrently update docs
2747    for (i=0;i<n;++i){
2748        sprintf(keybuf, "key%d", i);
2749        sprintf(metabuf, "meta%d", i);
2750        sprintf(bodybuf, "body%d_txn1", i);
2751        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2752                                (void*)metabuf, strlen(metabuf),
2753                                (void*)bodybuf, strlen(bodybuf));
2754        fdb_set(db_txn1, rdoc);
2755        fdb_doc_free(rdoc);
2756
2757        sprintf(bodybuf, "body%d_txn2", i);
2758        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2759                                (void*)metabuf, strlen(metabuf),
2760                                (void*)bodybuf, strlen(bodybuf));
2761        fdb_set(db_txn2, rdoc);
2762        fdb_doc_free(rdoc);
2763    }
2764
2765    // retrieve check
2766    for (i=0;i<n;++i){
2767        // general retrieval
2768        sprintf(keybuf, "key%d", i);
2769        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2770        status = fdb_get(db, rdoc);
2771        TEST_CHK(status == FDB_RESULT_SUCCESS);
2772        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
2773        fdb_doc_free(rdoc);
2774
2775        // get from txn1
2776        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2777        status = fdb_get(db_txn1, rdoc);
2778        TEST_CHK(status == FDB_RESULT_SUCCESS);
2779        sprintf(bodybuf, "body%d_txn1", i);
2780        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2781        fdb_doc_free(rdoc);
2782
2783        // get from txn2
2784        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2785        status = fdb_get(db_txn2, rdoc);
2786        TEST_CHK(status == FDB_RESULT_SUCCESS);
2787        sprintf(bodybuf, "body%d_txn2", i);
2788        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2789        fdb_doc_free(rdoc);
2790    }
2791
2792    // commit txn2 & retrieve check
2793    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
2794    for (i=0;i<n;++i){
2795        // general retrieval
2796        sprintf(keybuf, "key%d", i);
2797        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2798        status = fdb_get(db, rdoc);
2799        TEST_CHK(status == FDB_RESULT_SUCCESS);
2800        sprintf(bodybuf, "body%d_txn2", i);
2801        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2802        fdb_doc_free(rdoc);
2803
2804        // get from txn1
2805        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2806        status = fdb_get(db_txn1, rdoc);
2807        TEST_CHK(status == FDB_RESULT_SUCCESS);
2808        sprintf(bodybuf, "body%d_txn1", i);
2809        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2810        fdb_doc_free(rdoc);
2811    }
2812
2813    // commit txn1 & retrieve check
2814    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2815    for (i=0;i<n;++i){
2816        // general retrieval
2817        sprintf(keybuf, "key%d", i);
2818        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2819        status = fdb_get(db, rdoc);
2820        TEST_CHK(status == FDB_RESULT_SUCCESS);
2821        sprintf(bodybuf, "body%d_txn1", i);
2822        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2823        fdb_doc_free(rdoc);
2824    }
2825
2826    // begin new transaction
2827    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2828    // update doc#5
2829    i = 5;
2830    sprintf(keybuf, "key%d", i);
2831    sprintf(metabuf, "meta%d", i);
2832    sprintf(bodybuf, "body%d_before_compaction", i);
2833    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf),
2834                            (void*)metabuf, strlen(metabuf),
2835                            (void*)bodybuf, strlen(bodybuf));
2836    fdb_set(db_txn1, rdoc);
2837    fdb_doc_free(rdoc);
2838
2839    // do compaction
2840    fdb_compact(dbfile, "mvcc_test2");
2841
2842    // retrieve doc#5
2843    // using txn1
2844    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2845    status = fdb_get(db_txn1, rdoc);
2846    TEST_CHK(status == FDB_RESULT_SUCCESS);
2847    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2848    fdb_doc_free(rdoc);
2849
2850    // general retrieval
2851    fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2852    status = fdb_get(db, rdoc);
2853    TEST_CHK(status == FDB_RESULT_SUCCESS);
2854    sprintf(bodybuf, "body%d_txn1", i);
2855    TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2856    fdb_doc_free(rdoc);
2857
2858    // commit transaction
2859    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
2860    // retrieve check
2861    for (i=0;i<n;++i){
2862        // general get
2863        sprintf(keybuf, "key%d", i);
2864        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2865        status = fdb_get(db, rdoc);
2866        TEST_CHK(status == FDB_RESULT_SUCCESS);
2867        if (i != 5) {
2868            sprintf(bodybuf, "body%d_txn1", i);
2869        } else {
2870            sprintf(bodybuf, "body%d_before_compaction", i);
2871        }
2872        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2873        fdb_doc_free(rdoc);
2874    }
2875
2876    // close & re-open db file
2877    fdb_close(dbfile);
2878    fdb_open(&dbfile, "mvcc_test2", &fconfig);
2879    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2880    status = fdb_set_log_callback(db, logCallbackFunc,
2881                                  (void *) "transaction_test");
2882    TEST_CHK(status == FDB_RESULT_SUCCESS);
2883    // retrieve check again
2884    for (i=0;i<n;++i){
2885        // general get
2886        sprintf(keybuf, "key%d", i);
2887        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2888        status = fdb_get(db, rdoc);
2889        TEST_CHK(status == FDB_RESULT_SUCCESS);
2890        if (i != 5) {
2891            sprintf(bodybuf, "body%d_txn1", i);
2892        } else {
2893            sprintf(bodybuf, "body%d_before_compaction", i);
2894        }
2895        TEST_CMP(rdoc->body, bodybuf, rdoc->bodylen);
2896        fdb_doc_free(rdoc);
2897    }
2898
2899    // close db file
2900    fdb_close(dbfile);
2901    fdb_close(dbfile_txn1);
2902    fdb_close(dbfile_txn2);
2903
2904    // free all documents
2905    for (i=0;i<n;++i){
2906        fdb_doc_free(doc[i]);
2907    }
2908
2909    // free all resources
2910    fdb_shutdown();
2911
2912    memleak_end();
2913
2914    TEST_RESULT("transaction test");
2915}
2916
2917void transaction_simple_api_test()
2918{
2919    TEST_INIT();
2920
2921    memleak_start();
2922
2923    int i, r;
2924    int n = 10;
2925    size_t valuelen;
2926    void *value;
2927    fdb_file_handle *dbfile, *dbfile_txn1, *dbfile_txn2;
2928    fdb_kvs_handle *db, *db_txn1, *db_txn2;
2929    fdb_status status;
2930
2931    char keybuf[256], bodybuf[256];
2932
2933    // remove previous mvcc_test files
2934    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
2935    (void)r;
2936
2937    fdb_config fconfig = fdb_get_default_config();
2938    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2939    fconfig.buffercache_size = 0;
2940    fconfig.wal_threshold = 1024;
2941    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2942    fconfig.purging_interval = 0;
2943    fconfig.compaction_threshold = 0;
2944
2945    // open db
2946    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
2947    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2948    status = fdb_set_log_callback(db, logCallbackFunc,
2949                                  (void *) "transaction_simple_api_test");
2950    TEST_CHK(status == FDB_RESULT_SUCCESS);
2951
2952    // insert key-value pairs
2953    for (i=0;i<n;++i){
2954        sprintf(keybuf, "key%d", i);
2955        sprintf(bodybuf, "body%d", i);
2956        status = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2957        TEST_CHK(status == FDB_RESULT_SUCCESS);
2958    }
2959
2960    // commit
2961    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2962
2963    // open db and begin transactions
2964    fdb_open(&dbfile_txn1, "./mvcc_test1", &fconfig);
2965    fdb_open(&dbfile_txn2, "./mvcc_test1", &fconfig);
2966    fdb_kvs_open_default(dbfile_txn1, &db_txn1, &kvs_config);
2967    fdb_kvs_open_default(dbfile_txn2, &db_txn2, &kvs_config);
2968    fdb_begin_transaction(dbfile_txn1, FDB_ISOLATION_READ_COMMITTED);
2969    fdb_begin_transaction(dbfile_txn2, FDB_ISOLATION_READ_COMMITTED);
2970
2971    // concurrently update docs
2972    for (i=0;i<n;++i){
2973        sprintf(keybuf, "key%d", i);
2974        sprintf(bodybuf, "body%d_txn1", i);
2975        fdb_set_kv(db_txn1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2976
2977        sprintf(keybuf, "key%d", i);
2978        sprintf(bodybuf, "body%d_txn2", i);
2979        fdb_set_kv(db_txn2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
2980    }
2981
2982    // retrieve key-value pairs
2983    for (i=0;i<n;++i){
2984        // general retrieval
2985        sprintf(keybuf, "key%d", i);
2986        sprintf(bodybuf, "body%d", i);
2987        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2988        TEST_CHK(status == FDB_RESULT_SUCCESS);
2989        TEST_CMP(value, bodybuf, valuelen);
2990        fdb_free_block(value);
2991
2992        // txn1
2993        sprintf(keybuf, "key%d", i);
2994        sprintf(bodybuf, "body%d_txn1", i);
2995        status = fdb_get_kv(db_txn1, keybuf, strlen(keybuf), &value, &valuelen);
2996        TEST_CHK(status == FDB_RESULT_SUCCESS);
2997        TEST_CMP(value, bodybuf, valuelen);
2998        fdb_free_block(value);
2999
3000        // txn2
3001        sprintf(keybuf, "key%d", i);
3002        sprintf(bodybuf, "body%d_txn2", i);
3003        status = fdb_get_kv(db_txn2, keybuf, strlen(keybuf), &value, &valuelen);
3004        TEST_CHK(status == FDB_RESULT_SUCCESS);
3005        TEST_CMP(value, bodybuf, valuelen);
3006        fdb_free_block(value);
3007    }
3008
3009    // commit txn1
3010    fdb_end_transaction(dbfile_txn1, FDB_COMMIT_NORMAL);
3011    for (i=0;i<n;++i){
3012        // general retrieval
3013        sprintf(keybuf, "key%d", i);
3014        sprintf(bodybuf, "body%d_txn1", i);
3015        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3016        TEST_CHK(status == FDB_RESULT_SUCCESS);
3017        TEST_CMP(value, bodybuf, valuelen);
3018        status = fdb_free_block(value);
3019        TEST_CHK(status == FDB_RESULT_SUCCESS);
3020
3021        // txn2
3022        sprintf(keybuf, "key%d", i);
3023        sprintf(bodybuf, "body%d_txn2", i);
3024        status = fdb_get_kv(db_txn2, keybuf, strlen(keybuf), &value, &valuelen);
3025        TEST_CHK(status == FDB_RESULT_SUCCESS);
3026        TEST_CMP(value, bodybuf, valuelen);
3027        fdb_free_block(value);
3028    }
3029
3030    // commit txn2
3031    fdb_end_transaction(dbfile_txn2, FDB_COMMIT_NORMAL);
3032    for (i=0;i<n;++i){
3033        // general retrieval
3034        sprintf(keybuf, "key%d", i);
3035        sprintf(bodybuf, "body%d_txn2", i);
3036        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
3037        TEST_CHK(status == FDB_RESULT_SUCCESS);
3038        TEST_CMP(value, bodybuf, valuelen);
3039        fdb_free_block(value);
3040    }
3041
3042    // close db file
3043    fdb_close(dbfile);
3044    fdb_close(dbfile_txn1);
3045    fdb_close(dbfile_txn2);
3046
3047    // free all resources
3048    fdb_shutdown();
3049
3050    memleak_end();
3051
3052    TEST_RESULT("transaction simple API test");
3053}
3054
3055
3056void rollback_prior_to_ops(bool walflush)
3057{
3058
3059    TEST_INIT();
3060    memleak_start();
3061
3062    int r;
3063    int i, j, n=100;
3064    int expected_doc_count = 0;
3065    int rb_seqnum;
3066    char keybuf[256], bodybuf[256];
3067    fdb_file_handle *dbfile;
3068    fdb_iterator *it;
3069    fdb_kvs_handle *kv1, *mirror_kv1;
3070    fdb_kvs_info info;
3071    fdb_config fconfig = fdb_get_default_config();
3072    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3073    fdb_doc **doc = alca(fdb_doc*, n);
3074    fdb_doc *rdoc = NULL, *vdoc;
3075    fdb_status status;
3076
3077    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3078    (void)r;
3079
3080    fconfig.wal_threshold = 1024;
3081    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3082    fconfig.compaction_threshold = 10;
3083
3084    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3085    fdb_kvs_open(dbfile, &kv1, "kv1", &kvs_config);
3086
3087    if(walflush){
3088        fdb_kvs_open(dbfile, &mirror_kv1, NULL, &kvs_config);
3089    } else {
3090        fdb_kvs_open(dbfile, &mirror_kv1, "mirror", &kvs_config);
3091    }
3092
3093    // set n docs
3094    for(i=0;i<n;++i){
3095        sprintf(keybuf, "key%d", i);
3096        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3097            NULL, 0, NULL, 0);
3098        fdb_set(kv1, doc[i]);
3099        fdb_set(mirror_kv1, doc[i]);
3100        expected_doc_count++;
3101    }
3102
3103    // commit
3104    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3105
3106    // delete subset of recently loaded docs
3107    for(i=0;i<n/2;++i){
3108        fdb_del(kv1, doc[i]);
3109        fdb_del(mirror_kv1, doc[i]);
3110        expected_doc_count--;
3111        fdb_doc_free(doc[i]);
3112    }
3113
3114    for(i=n/2;i<n;++i){
3115        fdb_doc_free(doc[i]);
3116    }
3117
3118    // commit and save seqnum
3119    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3120    fdb_get_kvs_info(kv1, &info);
3121    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3122    rb_seqnum = info.last_seqnum;
3123
3124    for (j=0;j<100;++j){
3125        // load some more docs but stop mirroring
3126        for(i=0;i<n;++i){
3127            sprintf(keybuf, "key%d", n+i);
3128            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3129                NULL, 0, NULL, 0);
3130            fdb_set(kv1, doc[i]);
3131            if( n%2 == 0){
3132                fdb_del(kv1, doc[i]);
3133            }
3134            fdb_doc_free(doc[i]);
3135        }
3136    }
3137
3138    // commit wal or normal
3139    if(walflush){
3140        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3141    } else {
3142        fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3143    }
3144
3145    // rollback
3146    status = fdb_rollback(&kv1, rb_seqnum);
3147    TEST_CHK(status == FDB_RESULT_SUCCESS);
3148
3149    fdb_get_kvs_info(kv1, &info);
3150    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3151    fdb_get_kvs_info(mirror_kv1, &info);
3152    TEST_CHK(info.doc_count == (size_t)expected_doc_count);
3153
3154    status = fdb_iterator_sequence_init(mirror_kv1, &it, 0, 0, FDB_ITR_NONE);
3155    TEST_CHK(status == FDB_RESULT_SUCCESS);
3156    do {
3157        status = fdb_iterator_get_metaonly(it, &rdoc);
3158        TEST_CHK(status == FDB_RESULT_SUCCESS);
3159
3160        fdb_doc_create(&vdoc, rdoc->key, rdoc->keylen,
3161                              rdoc->meta, rdoc->metalen,
3162                              rdoc->body, rdoc->bodylen);
3163        status = fdb_get(kv1, vdoc);
3164        if (rdoc->deleted){
3165            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
3166        } else {
3167            TEST_CHK(status == FDB_RESULT_SUCCESS);
3168        }
3169        fdb_doc_free(rdoc);
3170        rdoc = NULL;
3171        fdb_doc_free(vdoc);
3172    } while (fdb_iterator_next(it) != FDB_RESULT_ITERATOR_FAIL);
3173
3174    fdb_iterator_close(it);
3175    fdb_kvs_close(mirror_kv1);
3176    fdb_kvs_close(kv1);
3177    fdb_close(dbfile);
3178    fdb_shutdown();
3179    memleak_end();
3180
3181    sprintf(bodybuf, "rollback prior to ops test %s",
3182            walflush ? "commit with wal flush:" : "normal commit:");
3183    TEST_RESULT(bodybuf);
3184}
3185
3186struct cb_snapshot_args {
3187    fdb_kvs_handle *handle;
3188    int ndocs;
3189    int nupdates;
3190};
3191
3192static void _snapshot_check(fdb_kvs_handle *handle, int ndocs, int nupdates)
3193{
3194    TEST_INIT();
3195    int i, j, update_no;
3196    int commit_term = ndocs/2;
3197    char keybuf[256], bodybuf[256];
3198    char *value;
3199    size_t valuelen;
3200    fdb_kvs_handle *snap;
3201    fdb_status s;
3202    fdb_kvs_info info;
3203
3204    // check last seqnum
3205    fdb_get_kvs_info(handle, &info);
3206    TEST_CHK(info.last_seqnum == (fdb_seqnum_t)commit_term * nupdates);
3207
3208    // open snapshot for every 'commit_term' seq numbers
3209    for (i=0; i<nupdates; i++) {
3210        s = fdb_snapshot_open(handle, &snap, (i+1)*commit_term);
3211        TEST_CHK(s == FDB_RESULT_SUCCESS);
3212
3213        for (j=0;j<ndocs;++j) {
3214            if (j < ndocs/2) {
3215                update_no = (i/2) * 2;
3216            } else {
3217                if (i == 0) {
3218                    break;
3219                }
3220                update_no = 1 + ((i-1)/2) * 2;
3221            }
3222            sprintf(keybuf, "key%04d", j);
3223            sprintf(bodybuf, "body%04d_update%d", j, update_no);
3224            s = fdb_get_kv(snap, keybuf, strlen(keybuf)+1,
3225                           (void**)&value, &valuelen);
3226            TEST_CHK(s == FDB_RESULT_SUCCESS);
3227            TEST_CMP(value, bodybuf, valuelen);
3228            fdb_free_block(value);
3229        }
3230
3231        s = fdb_kvs_close(snap);
3232        TEST_CHK(s == FDB_RESULT_SUCCESS);
3233    }
3234}
3235
3236static void _snapshot_update_docs(fdb_file_handle *fhandle, struct cb_snapshot_args *args)
3237{
3238    int i;
3239    char keybuf[256], bodybuf[256];
3240    fdb_status s;
3241    TEST_INIT();
3242
3243    // update (half) docs
3244    if (args->nupdates % 2 == 0) {
3245        // former half
3246        for (i=0; i<args->ndocs/2; ++i) {
3247            sprintf(keybuf, "key%04d", i);
3248            sprintf(bodybuf, "body%04d_update%d", i, args->nupdates);
3249            s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
3250                           bodybuf, strlen(bodybuf)+1);
3251            TEST_CHK(s == FDB_RESULT_SUCCESS);
3252        }
3253    } else {
3254        // latter half
3255        for (i=args->ndocs/2 ; i<args->ndocs; ++i) {
3256            sprintf(keybuf, "key%04d", i);
3257            sprintf(bodybuf, "body%04d_update%d", i, args->nupdates);
3258            fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
3259                           bodybuf, strlen(bodybuf)+1);
3260        }
3261    }
3262    s = fdb_commit(fhandle, FDB_COMMIT_NORMAL);
3263    TEST_CHK(s == FDB_RESULT_SUCCESS);
3264    args->nupdates++;
3265}
3266
3267static int cb_snapshot(fdb_file_handle *fhandle,
3268                       fdb_compaction_status status,
3269                       fdb_doc *doc, uint64_t old_offset, uint64_t new_offset,
3270                       void *ctx)
3271{
3272    struct cb_snapshot_args *args = (struct cb_snapshot_args *)ctx;
3273
3274    if (status == FDB_CS_BEGIN) {
3275        // first verification
3276        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3277        // update half docs
3278        _snapshot_update_docs(fhandle, args);
3279        // second verification
3280        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3281    } else { // if (status == FDB_CS_END)
3282        // first verification
3283        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3284        // update half docs
3285        _snapshot_update_docs(fhandle, args);
3286        // second verification
3287        _snapshot_check(args->handle, args->ndocs, args->nupdates);
3288    }
3289    return 0;
3290}
3291
3292void snapshot_concurrent_compaction_test()
3293{
3294    TEST_INIT();
3295    memleak_start();
3296
3297    int i, j, idx, r;
3298    int n = 100;
3299    int commit_term = n/2;
3300    char keybuf[256], bodybuf[256];
3301    fdb_file_handle *dbfile;
3302    fdb_kvs_handle *db;
3303    fdb_status s;
3304    fdb_config fconfig = fdb_get_default_config();
3305    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3306    struct cb_snapshot_args cb_args;
3307
3308    memset(&cb_args, 0x0, sizeof(struct cb_snapshot_args));
3309    fconfig.wal_threshold = 128;
3310    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3311    fconfig.compaction_cb = cb_snapshot;
3312    fconfig.compaction_cb_ctx = &cb_args;
3313    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3314                                 FDB_CS_END;
3315
3316    // remove previous mvcc_test files
3317    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3318    (void)r;
3319
3320    // open db
3321    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3322    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
3323    cb_args.handle = db;
3324
3325    // write docs & commit for each n/2 doc updates
3326    for (i=0;i<n;++i){
3327        idx = i;
3328        j = i/commit_term;
3329        sprintf(keybuf, "key%04d", idx);
3330        sprintf(bodybuf, "body%04d_update%d", idx, j);
3331        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
3332                           bodybuf, strlen(bodybuf)+1);
3333        TEST_CHK(s == FDB_RESULT_SUCCESS);
3334        if ((i+1)%commit_term == 0) {
3335            s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3336	    TEST_CHK(s == FDB_RESULT_SUCCESS);
3337        }
3338    }
3339    cb_args.ndocs = n;
3340    cb_args.nupdates = 2;
3341
3342    _snapshot_check(db, cb_args.ndocs, cb_args.nupdates);
3343
3344    s = fdb_compact(dbfile, "./mvcc_test2");
3345    TEST_CHK(s == FDB_RESULT_SUCCESS);
3346
3347    fdb_close(dbfile);
3348    fdb_shutdown();
3349    memleak_end();
3350    TEST_RESULT("snapshot with concurrent compaction test");
3351}
3352
3353void rollback_to_zero_test(bool multi_kv)
3354{
3355    TEST_INIT();
3356
3357    memleak_start();
3358
3359    int i, r;
3360    int n = 20;
3361    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
3362    fdb_file_handle *dbfile;
3363    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
3364    fdb_doc **doc = alca(fdb_doc*, n);
3365    fdb_kvs_info info;
3366    fdb_status status;
3367
3368    char keybuf[256], metabuf[256], bodybuf[256];
3369    char kv_name[8];
3370
3371    // remove previous mvcc_test files
3372    r = system(SHELL_DEL" mvcc_test* > errorlog.txt");
3373    (void)r;
3374
3375    fdb_config fconfig = fdb_get_default_config();
3376    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3377    fconfig.buffercache_size = 0;
3378    fconfig.wal_threshold = 1024;
3379    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3380    fconfig.compaction_threshold = 0;
3381    fconfig.multi_kv_instances = multi_kv;
3382
3383    // open db
3384    fdb_open(&dbfile, "./mvcc_test1", &fconfig);
3385
3386    if (multi_kv) {
3387        for (r = 0; r < num_kvs; ++r) {
3388            sprintf(kv_name, "kv%d", r);
3389            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
3390        }
3391    } else {
3392        num_kvs = 1;
3393        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
3394    }
3395
3396    for (r = 0; r < num_kvs; ++r) {
3397        status = fdb_set_log_callback(db[r], logCallbackFunc,
3398                                      (void *) "rollback_to_zero_test");
3399        TEST_CHK(status == FDB_RESULT_SUCCESS);
3400    }
3401
3402   // ------- Setup test ----------------------------------
3403   // insert documents of 0-4
3404    for (i=0; i<n/4; i++){
3405        sprintf(keybuf, "key%d", i);
3406        sprintf(metabuf, "meta%d", i);
3407        sprintf(bodybuf, "body%d", i);
3408        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3409            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3410        for (r = 0; r < num_kvs; ++r) {
3411            fdb_set(db[r], doc[i]);
3412        }
3413    }
3414
3415    // commit with a manual WAL flush (these docs go into HB-trie)
3416    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3417
3418    // insert documents from 5 - 9
3419    for (; i < n/2; i++){
3420        sprintf(keybuf, "key%d", i);
3421        sprintf(metabuf, "meta%d", i);
3422        sprintf(bodybuf, "body%d", i);
3423        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3424            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3425        for (r = 0; r < num_kvs; ++r) {
3426            fdb_set(db[r], doc[i]);
3427        }
3428    }
3429
3430    // commit again without a WAL flush
3431    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3432
3433    // insert documents from 10-14 into HB-trie
3434    for (; i < (n/2 + n/4); i++){
3435        sprintf(keybuf, "key%d", i);
3436        sprintf(metabuf, "meta%d", i);
3437        sprintf(bodybuf, "body%d", i);
3438        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3439            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3440        for (r = 0; r < num_kvs; ++r) {
3441            fdb_set(db[r], doc[i]);
3442        }
3443    }
3444    // manually flush WAL & commit
3445    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3446
3447    // insert documents from 15 - 19 on file into the WAL
3448    for (; i < n; i++){
3449        sprintf(keybuf, "key%d", i);
3450        sprintf(metabuf, "meta%d", i);
3451        sprintf(bodybuf, "body%d", i);
3452        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
3453            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
3454        for (r = 0; r < num_kvs; ++r) {
3455            fdb_set(db[r], doc[i]);
3456        }
3457    }
3458    // commit without a WAL flush
3459