1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#include <sys/types.h>
26#include <sys/stat.h>
27#endif
28
29#include "libforestdb/forestdb.h"
30#include "test.h"
31
32#include "internal_types.h"
33#include "wal.h"
34#include "functional_util.h"
35
36#undef THREAD_SANITIZER
37#if __clang__
38#   if defined(__has_feature) && __has_feature(thread_sanitizer)
39#define THREAD_SANITIZER
40#   endif
41#endif
42
43struct cb_args {
44    int n_moved_docs;
45    int n_batch_move;
46    bool begin;
47    bool end;
48    bool wal_flush;
49    fdb_kvs_handle *handle;
50};
51
52static fdb_compact_decision compaction_cb(fdb_file_handle *fhandle,
53                            fdb_compaction_status status, const char *kv_name,
54                            fdb_doc *doc, uint64_t old_offset,
55                            uint64_t new_offset,
56                            void *ctx)
57{
58    TEST_INIT();
59    fdb_doc *rdoc;
60    fdb_status s;
61    fdb_compact_decision ret = FDB_CS_KEEP_DOC;
62    struct cb_args *args = (struct cb_args *)ctx;
63
64    (void) doc;
65    (void) new_offset;
66
67    if (status == FDB_CS_BEGIN) {
68        args->begin = true;
69    } else if (status == FDB_CS_END) {
70        args->end = true;
71    } else if (status == FDB_CS_FLUSH_WAL) {
72        args->wal_flush = true;
73    } else if (status == FDB_CS_MOVE_DOC) {
74        if (doc->deleted) {
75            ret = FDB_CS_DROP_DOC;
76        }
77
78        args->n_moved_docs++;
79        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
80        rdoc->offset = old_offset;
81        s = fdb_get_byoffset(args->handle, rdoc);
82        TEST_CHK(s == FDB_RESULT_SUCCESS);
83        fdb_doc_free(rdoc);
84
85        if (fhandle->root->config.multi_kv_instances) {
86            TEST_CMP(kv_name, "db", 2);
87        } else {
88            TEST_CMP(kv_name, "default", 7);
89        }
90    } else { // FDB_CS_BATCH_MOVE
91        args->n_batch_move++;
92        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
93        rdoc->offset = old_offset;
94        s = fdb_get_byoffset(args->handle, rdoc);
95        TEST_CHK (s == FDB_RESULT_SUCCESS);
96        fdb_doc_free(rdoc);
97    }
98    return ret;
99}
100
101void compaction_callback_test(bool multi_kv)
102{
103    TEST_INIT();
104    memleak_start();
105
106    int i, r;
107    int n = 1000;
108    char keybuf[256], bodybuf[256];
109    fdb_file_handle *dbfile;
110    fdb_kvs_handle *db;
111    fdb_status s;
112    fdb_config fconfig = fdb_get_default_config();
113    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
114    struct cb_args cb_args;
115
116    memset(&cb_args, 0x0, sizeof(struct cb_args));
117    fconfig.wal_threshold = 1024;
118    fconfig.flags = FDB_OPEN_FLAG_CREATE;
119    fconfig.compaction_cb = compaction_cb;
120    fconfig.compaction_cb_ctx = &cb_args;
121    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
122                                 FDB_CS_MOVE_DOC |
123                                 FDB_CS_FLUSH_WAL |
124                                 FDB_CS_END;
125    fconfig.multi_kv_instances = multi_kv;
126
127    // remove all previous compact_test files
128    r = system(SHELL_DEL" compact_test* > errorlog.txt");
129    (void)r;
130
131    // open db
132    fdb_open(&dbfile, "./compact_test1", &fconfig);
133    if (multi_kv) {
134        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
135    } else {
136        fdb_kvs_open_default(dbfile, &db, &kvs_config);
137    }
138    cb_args.handle = db;
139
140    // write docs
141    for (i=0;i<n;++i){
142        sprintf(keybuf, "key%04d", i);
143        sprintf(bodybuf, "body%04d", i);
144        s = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
145        TEST_CHK(s == FDB_RESULT_SUCCESS);
146    }
147    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
148    TEST_CHK(s == FDB_RESULT_SUCCESS);
149    s = fdb_compact(dbfile, "./compact_test2");
150    TEST_CHK(s == FDB_RESULT_SUCCESS);
151
152    TEST_CHK(cb_args.n_moved_docs == n);
153    TEST_CHK(cb_args.begin);
154    TEST_CHK(cb_args.end);
155    TEST_CHK(cb_args.wal_flush);
156    fdb_close(dbfile);
157
158    // open db without move doc
159    memset(&cb_args, 0x0, sizeof(struct cb_args));
160    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
161                                 FDB_CS_FLUSH_WAL |
162                                 FDB_CS_END;
163    fdb_open(&dbfile, "./compact_test2", &fconfig);
164
165    if (multi_kv) {
166        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
167    } else {
168        fdb_kvs_open_default(dbfile, &db, &kvs_config);
169    }
170    cb_args.handle = db;
171    s = fdb_compact(dbfile, "./compact_test3");
172    TEST_CHK(s == FDB_RESULT_SUCCESS);
173    TEST_CHK(cb_args.n_moved_docs == 0);
174    TEST_CHK(cb_args.begin);
175    TEST_CHK(cb_args.end);
176    TEST_CHK(cb_args.wal_flush);
177    fdb_close(dbfile);
178
179    // open db without wal_flush
180    memset(&cb_args, 0x0, sizeof(struct cb_args));
181    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
182                                 FDB_CS_MOVE_DOC |
183                                 FDB_CS_END;
184    fdb_open(&dbfile, "./compact_test3", &fconfig);
185    if (multi_kv) {
186        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
187    } else {
188        fdb_kvs_open_default(dbfile, &db, &kvs_config);
189    }
190    cb_args.handle = db;
191    s = fdb_compact(dbfile, "./compact_test4");
192    TEST_CHK(s == FDB_RESULT_SUCCESS);
193    TEST_CHK(cb_args.n_moved_docs == n);
194    TEST_CHK(cb_args.begin);
195    TEST_CHK(cb_args.end);
196    TEST_CHK(!cb_args.wal_flush);
197    fdb_close(dbfile);
198
199    // open db without begin/end
200    memset(&cb_args, 0x0, sizeof(struct cb_args));
201    fconfig.compaction_cb_mask = FDB_CS_MOVE_DOC |
202                                 FDB_CS_FLUSH_WAL;
203    fdb_open(&dbfile, "./compact_test4", &fconfig);
204    if (multi_kv) {
205        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
206    } else {
207        fdb_kvs_open_default(dbfile, &db, &kvs_config);
208    }
209    cb_args.handle = db;
210    s = fdb_compact(dbfile, "./compact_test5");
211    TEST_CHK(s == FDB_RESULT_SUCCESS);
212    TEST_CHK(cb_args.n_moved_docs == n);
213    TEST_CHK(!cb_args.begin);
214    TEST_CHK(!cb_args.end);
215    TEST_CHK(cb_args.wal_flush);
216    fdb_close(dbfile);
217
218    // open db with batch move
219    memset(&cb_args, 0x0, sizeof(struct cb_args));
220    fconfig.compaction_cb_mask = FDB_CS_BATCH_MOVE;
221    fdb_open(&dbfile, "./compact_test5", &fconfig);
222    if (multi_kv) {
223        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
224    } else {
225        fdb_kvs_open_default(dbfile, &db, &kvs_config);
226    }
227    cb_args.handle = db;
228    s = fdb_compact(dbfile, "./compact_test6");
229    TEST_CHK(s == FDB_RESULT_SUCCESS);
230    TEST_CHK(cb_args.n_moved_docs == 0);
231    TEST_CHK(cb_args.n_batch_move && cb_args.n_batch_move <= n);
232    TEST_CHK(!cb_args.begin);
233    TEST_CHK(!cb_args.end);
234    TEST_CHK(!cb_args.wal_flush);
235    fdb_close(dbfile);
236
237    fdb_shutdown();
238
239    memleak_end();
240    if (multi_kv) {
241        TEST_RESULT("compaction callback function multi kv mode test");
242    } else {
243        TEST_RESULT("compaction callback function single kv mode test");
244    }
245}
246
247void compact_wo_reopen_test()
248{
249    TEST_INIT();
250
251    memleak_start();
252
253    int i, r;
254    int n = 3;
255    fdb_file_handle *dbfile, *dbfile_new;
256    fdb_kvs_handle *db;
257    fdb_kvs_handle *db_new;
258    fdb_doc **doc = alca(fdb_doc*, n);
259    fdb_doc *rdoc;
260    fdb_status status;
261
262    char keybuf[256], metabuf[256], bodybuf[256];
263
264    // remove previous compact_test files
265    r = system(SHELL_DEL" compact_test* > errorlog.txt");
266    (void)r;
267
268    fdb_config fconfig = fdb_get_default_config();
269    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
270    fconfig.buffercache_size = 16777216;
271    fconfig.wal_threshold = 1024;
272    fconfig.flags = FDB_OPEN_FLAG_CREATE;
273    fconfig.compaction_threshold = 0;
274
275    // open db
276    fdb_open(&dbfile, "./compact_test1", &fconfig);
277    fdb_kvs_open_default(dbfile, &db, &kvs_config);
278    status = fdb_set_log_callback(db, logCallbackFunc,
279                                  (void *) "compact_wo_reopen_test");
280    TEST_CHK(status == FDB_RESULT_SUCCESS);
281    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
282    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
283    status = fdb_set_log_callback(db_new, logCallbackFunc,
284                                  (void *) "compact_wo_reopen_test");
285    TEST_CHK(status == FDB_RESULT_SUCCESS);
286
287    // insert documents
288    for (i=0;i<n;++i){
289        sprintf(keybuf, "key%d", i);
290        sprintf(metabuf, "meta%d", i);
291        sprintf(bodybuf, "body%d", i);
292        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
293            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
294        fdb_set(db, doc[i]);
295    }
296
297    // remove doc
298    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
299    rdoc->deleted = true;
300    fdb_set(db, rdoc);
301    fdb_doc_free(rdoc);
302
303    // commit
304    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
305
306    // perform compaction using one handle
307    fdb_compact(dbfile, (char *) "./compact_test2");
308
309    // retrieve documents using the other handle without close/re-open
310    for (i=0;i<n;++i){
311        // search by key
312        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
313        status = fdb_get(db_new, rdoc);
314
315        if (i != 1) {
316            TEST_CHK(status == FDB_RESULT_SUCCESS);
317            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
318            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
319        }else{
320            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
321        }
322
323        // free result document
324        fdb_doc_free(rdoc);
325    }
326    // check the other handle's filename
327    fdb_file_info info;
328    fdb_get_file_info(dbfile_new, &info);
329    TEST_CHK(!strcmp("./compact_test2", info.filename));
330
331    // free all documents
332    for (i=0;i<n;++i){
333        fdb_doc_free(doc[i]);
334    }
335
336    // close db file
337    fdb_kvs_close(db);
338    fdb_kvs_close(db_new);
339    fdb_close(dbfile);
340    fdb_close(dbfile_new);
341
342    // free all resources
343    fdb_shutdown();
344
345    memleak_end();
346
347    TEST_RESULT("compaction without reopen test");
348}
349
350void compact_with_reopen_test()
351{
352    TEST_INIT();
353
354    memleak_start();
355
356    int i, r;
357    int n = 100;
358    fdb_file_handle *dbfile;
359    fdb_kvs_handle *db;
360    fdb_doc **doc = alca(fdb_doc*, n);
361    fdb_doc *rdoc;
362    fdb_status status;
363
364    char keybuf[256], metabuf[256], bodybuf[256], temp[256];
365
366    // remove previous compact_test files
367    r = system(SHELL_DEL" compact_test* > errorlog.txt");
368    (void)r;
369
370    fdb_config fconfig = fdb_get_default_config();
371    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
372    fconfig.buffercache_size = 16777216;
373    fconfig.wal_threshold = 1024;
374    fconfig.flags = FDB_OPEN_FLAG_CREATE;
375    fconfig.compaction_threshold = 0;
376
377    // open db
378    fdb_open(&dbfile, "./compact_test1", &fconfig);
379    fdb_kvs_open_default(dbfile, &db, &kvs_config);
380    status = fdb_set_log_callback(db, logCallbackFunc,
381                                  (void *) "compact_with_reopen_test");
382    TEST_CHK(status == FDB_RESULT_SUCCESS);
383
384    // insert documents
385    for (i=0;i<n;++i){
386        sprintf(keybuf, "key%d", i);
387        sprintf(metabuf, "meta%d", i);
388        sprintf(bodybuf, "body%d", i);
389        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
390            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
391        fdb_set(db, doc[i]);
392    }
393
394    // remove doc
395    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
396    rdoc->deleted = true;
397    fdb_set(db, rdoc);
398    fdb_doc_free(rdoc);
399
400    // commit
401    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
402
403    // perform compaction using one handle
404    fdb_compact(dbfile, (char *) "./compact_test2");
405
406    // close db file
407    fdb_kvs_close(db);
408    fdb_close(dbfile);
409
410    r = system(SHELL_MOVE " compact_test2 compact_test1 > errorlog.txt");
411    (void)r;
412    fdb_open(&dbfile, "./compact_test1", &fconfig);
413    fdb_kvs_open_default(dbfile, &db, &kvs_config);
414    status = fdb_set_log_callback(db, logCallbackFunc,
415                                  (void *) "compact_with_reopen_test");
416    TEST_CHK(status == FDB_RESULT_SUCCESS);
417
418    // retrieve documents using the other handle without close/re-open
419    for (i=0;i<n;++i){
420        // search by key
421        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
422        status = fdb_get(db, rdoc);
423
424        if (i != 1) {
425            TEST_CHK(status == FDB_RESULT_SUCCESS);
426            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
427            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
428        }else{
429            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
430        }
431
432        // free result document
433        fdb_doc_free(rdoc);
434    }
435    // check the other handle's filename
436    fdb_file_info info;
437    fdb_get_file_info(dbfile, &info);
438    TEST_CHK(!strcmp("./compact_test1", info.filename));
439
440    // update documents
441    for (i=0;i<n;++i){
442        sprintf(metabuf, "newmeta%d", i);
443        sprintf(bodybuf, "newbody%d_%s", i, temp);
444        fdb_doc_update(&doc[i], (void *)metabuf, strlen(metabuf),
445                       (void *)bodybuf, strlen(bodybuf));
446        fdb_set(db, doc[i]);
447    }
448
449    // Open the database with another handle.
450    fdb_file_handle *second_dbfile;
451    fdb_kvs_handle *second_dbh;
452    fdb_open(&second_dbfile, "./compact_test1", &fconfig);
453    fdb_kvs_open_default(second_dbfile, &second_dbh, &kvs_config);
454    status = fdb_set_log_callback(second_dbh, logCallbackFunc,
455                                  (void *) "compact_with_reopen_test");
456    TEST_CHK(status == FDB_RESULT_SUCCESS);
457    // In-place compactions with a handle still open on the first old file
458    status = fdb_compact(dbfile, NULL);
459    TEST_CHK(status == FDB_RESULT_SUCCESS);
460
461    status = fdb_compact(dbfile, NULL);
462    TEST_CHK(status == FDB_RESULT_SUCCESS);
463
464    // MB-12977: retest compaction again..
465    status = fdb_compact(dbfile, NULL);
466    TEST_CHK(status == FDB_RESULT_SUCCESS);
467
468    fdb_kvs_close(db);
469    fdb_close(dbfile);
470
471    for (i=0;i<n;++i){
472        // search by key
473        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
474        status = fdb_get(second_dbh, rdoc);
475        TEST_CHK(status == FDB_RESULT_SUCCESS);
476        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
477        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
478        // free result document
479        fdb_doc_free(rdoc);
480    }
481
482    // Open database with an original name.
483    status = fdb_open(&dbfile, "./compact_test1", &fconfig);
484    TEST_CHK(status == FDB_RESULT_SUCCESS);
485    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
486    TEST_CHK(status == FDB_RESULT_SUCCESS);
487    status = fdb_set_log_callback(db, logCallbackFunc,
488                                  (void *) "compact_with_reopen_test");
489    TEST_CHK(status == FDB_RESULT_SUCCESS);
490    fdb_get_file_info(dbfile, &info);
491    // The actual file name should be a compacted one.
492    TEST_CHK(!strcmp("./compact_test1.3", info.filename));
493
494    fdb_kvs_close(second_dbh);
495    fdb_close(second_dbfile);
496
497    for (i=0;i<n;++i){
498        // search by key
499        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
500        status = fdb_get(db, rdoc);
501        TEST_CHK(status == FDB_RESULT_SUCCESS);
502        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
503        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
504        // free result document
505        fdb_doc_free(rdoc);
506    }
507
508    fdb_compact(dbfile, NULL);
509    fdb_kvs_close(db);
510    fdb_close(dbfile);
511
512    r = system(SHELL_MOVE " compact_test1 compact_test.fdb > errorlog.txt");
513    (void)r;
514    fdb_open(&dbfile, "./compact_test.fdb", &fconfig);
515    fdb_kvs_open_default(dbfile, &db, &kvs_config);
516    status = fdb_set_log_callback(db, logCallbackFunc,
517                                  (void *) "compact_with_reopen_test");
518    TEST_CHK(status == FDB_RESULT_SUCCESS);
519    // In-place compaction
520    fdb_compact(dbfile, NULL);
521    fdb_kvs_close(db);
522    fdb_close(dbfile);
523    // Open database with an original name.
524    status = fdb_open(&dbfile, "./compact_test.fdb", &fconfig);
525    TEST_CHK(status == FDB_RESULT_SUCCESS);
526    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
527    TEST_CHK(status == FDB_RESULT_SUCCESS);
528    status = fdb_set_log_callback(db, logCallbackFunc,
529                                  (void *) "compact_with_reopen_test");
530    TEST_CHK(status == FDB_RESULT_SUCCESS);
531    fdb_get_file_info(dbfile, &info);
532    TEST_CHK(!strcmp("./compact_test.fdb", info.filename));
533    TEST_CHK(info.doc_count == 100);
534
535    // free all documents
536    for (i=0;i<n;++i){
537        fdb_doc_free(doc[i]);
538    }
539
540    fdb_kvs_close(db);
541    fdb_close(dbfile);
542
543    // free all resources
544    fdb_shutdown();
545
546    memleak_end();
547
548    TEST_RESULT("compaction with reopen test");
549}
550
551void compact_reopen_named_kvs()
552{
553    TEST_INIT();
554
555    memleak_start();
556
557    int i, r;
558    int n = 100;
559    int nkvdocs;
560    fdb_file_handle *dbfile;
561    fdb_kvs_handle *db;
562    fdb_doc **doc = alca(fdb_doc*, n);
563    fdb_status status;
564    fdb_kvs_info kvs_info;
565
566    char keybuf[256], metabuf[256], bodybuf[256];
567
568    // remove previous compact_test files
569    r = system(SHELL_DEL" compact_test* > errorlog.txt");
570    (void)r;
571
572    fdb_config fconfig = fdb_get_default_config();
573    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
574    fconfig.wal_threshold = 1024;
575    fconfig.flags = FDB_OPEN_FLAG_CREATE;
576    fconfig.compaction_threshold = 0;
577
578    // open db
579    fdb_open(&dbfile, "./compact_test1", &fconfig);
580    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
581
582    status = fdb_set_log_callback(db, logCallbackFunc,
583                                  (void *) "compact_reopen_named_kvs");
584    TEST_CHK(status == FDB_RESULT_SUCCESS);
585
586    for (i=0;i<n;++i){
587        sprintf(keybuf, "key%d", i);
588        sprintf(metabuf, "meta%d", i);
589        sprintf(bodybuf, "body%d", i);
590        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
591            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
592        fdb_set(db, doc[i]);
593    }
594
595    // commit
596    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
597
598    // compact
599    fdb_compact(dbfile, NULL);
600
601    // save ndocs
602    fdb_get_kvs_info(db, &kvs_info);
603    nkvdocs = kvs_info.doc_count;
604
605    // close db file
606    fdb_kvs_close(db);
607    fdb_close(dbfile);
608
609    // reopen
610    fdb_open(&dbfile, "./compact_test1", &fconfig);
611    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
612
613    // verify kvs stats
614    fdb_get_kvs_info(db, &kvs_info);
615    TEST_CHK((uint64_t)nkvdocs == kvs_info.doc_count);
616
617    fdb_kvs_close(db);
618    fdb_close(dbfile);
619
620    // free all documents
621    for (i=0;i<n;++i){
622        fdb_doc_free(doc[i]);
623    }
624    fdb_shutdown();
625
626    memleak_end();
627
628    TEST_RESULT("compact reopen named kvs");
629}
630
631void compact_reopen_with_iterator()
632{
633    TEST_INIT();
634
635    memleak_start();
636
637    int i, r;
638    int n = 9;
639    int count = 0;
640    int nkvdocs;
641    fdb_file_handle *dbfile, *compact_file;
642    fdb_kvs_handle *db;
643    fdb_doc **doc = alca(fdb_doc*, n);
644    fdb_doc *rdoc = NULL;
645    fdb_status status;
646    fdb_kvs_info kvs_info;
647    fdb_iterator *iterator;
648
649    char keybuf[256], metabuf[256], bodybuf[256];
650
651    // remove previous compact_test files
652    r = system(SHELL_DEL" compact_test* > errorlog.txt");
653    (void)r;
654
655    fdb_config fconfig = fdb_get_default_config();
656    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
657    fconfig.wal_threshold = 1024;
658    fconfig.flags = FDB_OPEN_FLAG_CREATE;
659    fconfig.compaction_threshold = 0;
660
661    // open db
662    fdb_open(&dbfile, "./compact_test1", &fconfig);
663    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
664
665    status = fdb_set_log_callback(db, logCallbackFunc,
666                                  (void *) "compact_reopen_with_iterator");
667    TEST_CHK(status == FDB_RESULT_SUCCESS);
668
669    for (i=0;i<n;++i){
670        sprintf(keybuf, "key%d", i);
671        sprintf(metabuf, "meta%d", i);
672        sprintf(bodybuf, "body%d", i);
673        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
674            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
675        fdb_set(db, doc[i]);
676    }
677
678    // commit
679    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
680
681    // MB-13859: compact the file using a separate handle..
682    fdb_open(&compact_file, "./compact_test1", &fconfig);
683    // compact
684    status = fdb_compact(compact_file, "./compact_test2");
685    TEST_CHK(status == FDB_RESULT_SUCCESS);
686    // close file after compaction to make the new_file's ref count 0
687    fdb_close(compact_file);
688
689    i = 0;
690    status = fdb_iterator_init(db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
691    TEST_CHK(status == FDB_RESULT_SUCCESS);
692    do {
693        status = fdb_iterator_get(iterator, &rdoc);
694        TEST_CHK(status == FDB_RESULT_SUCCESS);
695
696        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
697        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
698        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
699
700        fdb_doc_free(rdoc);
701        rdoc = NULL;
702        i++;
703        count++;
704    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
705
706    TEST_CHK(count==n);
707
708
709    status = fdb_iterator_close(iterator);
710    TEST_CHK(status == FDB_RESULT_SUCCESS);
711
712    // save ndocs
713    fdb_get_kvs_info(db, &kvs_info);
714    nkvdocs = kvs_info.doc_count;
715
716    // close db file
717    fdb_kvs_close(db);
718    fdb_close(dbfile);
719
720    // reopen
721    fdb_open(&dbfile, "./compact_test2", &fconfig);
722    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
723
724    // verify kvs stats
725    fdb_get_kvs_info(db, &kvs_info);
726    TEST_CHK((uint64_t)nkvdocs == kvs_info.doc_count);
727
728    fdb_kvs_close(db);
729    fdb_close(dbfile);
730
731    // free all documents
732    for (i=0;i<n;++i){
733        fdb_doc_free(doc[i]);
734    }
735    fdb_shutdown();
736
737    memleak_end();
738
739    TEST_RESULT("compact reopen with iterator");
740}
741
742void estimate_space_upto_test(bool multi_kv)
743{
744    TEST_INIT();
745
746    memleak_start();
747
748    int i, j, r;
749    int n = 300;
750    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
751    fdb_file_handle *dbfile;
752    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
753    fdb_doc **doc = alca(fdb_doc*, n);
754    fdb_status status;
755    fdb_snapshot_info_t *markers;
756    uint64_t num_markers;
757    size_t space_used;
758
759    char keybuf[256], metabuf[256], bodybuf[256];
760    char kv_name[8];
761
762    fdb_config fconfig = fdb_get_default_config();
763    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
764    fconfig.buffercache_size = 0;
765    fconfig.wal_threshold = 50;
766    fconfig.flags = FDB_OPEN_FLAG_CREATE;
767    fconfig.compaction_threshold = 0;
768    fconfig.multi_kv_instances = multi_kv;
769    fconfig.block_reusing_threshold = 0;
770
771    // remove previous compact_test files
772    r = system(SHELL_DEL" compact_test* > errorlog.txt");
773    (void)r;
774
775    // open db
776    fdb_open(&dbfile, "./compact_test1", &fconfig);
777    if (multi_kv) {
778        for (r = 0; r < num_kvs; ++r) {
779            sprintf(kv_name, "kv%d", r);
780            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
781        }
782    } else {
783        num_kvs = 1;
784        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
785    }
786
787   // ------- Setup test ----------------------------------
788   // insert first quarter of documents
789    for (i=0; i<n/4; i++){
790        sprintf(keybuf, "key%d", i);
791        sprintf(metabuf, "meta%d", i);
792        sprintf(bodybuf, "body%d", i);
793        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
794            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
795        for (r = 0; r < num_kvs; ++r) {
796            fdb_set(db[r], doc[i]);
797        }
798    }
799
800    // commit with a manual WAL flush (these docs go into HB-trie)
801    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
802
803    // insert second quarter of documents
804    for (; i < n/2; i++){
805        sprintf(keybuf, "key%d", i);
806        sprintf(metabuf, "meta%d", i);
807        sprintf(bodybuf, "body%d", i);
808        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
809            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
810        for (r = 0; r < num_kvs; ++r) {
811            fdb_set(db[r], doc[i]);
812        }
813    }
814    // Update first quarter of documents again..
815    for (j = 0; j < n/4; j++){
816        for (r = 0; r < num_kvs; ++r) {
817            fdb_set(db[r], doc[j]);
818        }
819    }
820    // Update first quarter of documents again overwriting previous update..
821    for (j = 0; j < n/4; j++){
822        for (r = 0; r < num_kvs; ++r) {
823            fdb_set(db[r], doc[j]);
824        }
825    }
826
827    // commit again without a WAL flush (some of these docs remain in WAL)
828    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
829
830    // insert third quarter of documents
831    for (; i < (n/2 + n/4); i++){
832        sprintf(keybuf, "key%d", i);
833        sprintf(metabuf, "meta%d", i);
834        sprintf(bodybuf, "body%d", i);
835        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
836            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
837        for (r = 0; r < num_kvs; ++r) {
838            fdb_set(db[r], doc[i]);
839        }
840    }
841    // manually flush WAL & commit (these docs go into HB-trie)
842    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
843
844    // insert fourth quarter of documents
845    for (; i < n; i++){
846        sprintf(keybuf, "key%d", i);
847        sprintf(metabuf, "meta%d", i);
848        sprintf(bodybuf, "body%d", i);
849        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
850            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
851        for (r = 0; r < num_kvs; ++r) {
852            fdb_set(db[r], doc[i]);
853        }
854    }
855    // commit without a WAL flush (some of these docs remain in the WAL)
856    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
857
858    for (r = 0; r < num_kvs; ++r) {
859        status = fdb_set_log_callback(db[r], logCallbackFunc,
860                                      (void *) "estimate_space_upto_test");
861        TEST_CHK(status == FDB_RESULT_SUCCESS);
862    }
863
864    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
865    TEST_CHK(status == FDB_RESULT_SUCCESS);
866
867    if (!multi_kv) {
868        size_t space_used2;
869        TEST_CHK(num_markers == 4);
870        space_used = fdb_estimate_space_used_from(dbfile, markers[1].marker);
871        space_used2 = fdb_estimate_space_used_from(dbfile, markers[2].marker);
872        TEST_CHK(space_used2 > space_used); // greater than space used by just 1
873    } else {
874        size_t space_used2;
875        TEST_CHK(num_markers == 8);
876        space_used = fdb_estimate_space_used_from(dbfile, markers[1].marker);
877        space_used2 = fdb_estimate_space_used_from(dbfile, markers[2].marker);
878        TEST_CHK(space_used2 > space_used); // greater than space used by just 1
879    }
880
881    status = fdb_free_snap_markers(markers, num_markers);
882    TEST_CHK(status == FDB_RESULT_SUCCESS);
883
884    // close db file
885    fdb_close(dbfile);
886
887    // free all documents
888    for (i=0;i<n;++i){
889        fdb_doc_free(doc[i]);
890    }
891
892    // free all resources
893    fdb_shutdown();
894
895    memleak_end();
896
897    sprintf(bodybuf, "estimate space upto marker in file test %s", multi_kv ?
898                                                           "multiple kv mode:"
899                                                         : "single kv mode:");
900    TEST_RESULT(bodybuf);
901}
902
903void compact_upto_test(bool multi_kv)
904{
905    TEST_INIT();
906
907    memleak_start();
908
909    int i, r;
910    int n = 20;
911    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
912    fdb_file_handle *dbfile;
913    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
914    fdb_doc **doc = alca(fdb_doc*, n);
915    fdb_status status;
916    fdb_snapshot_info_t *markers;
917    fdb_kvs_handle *snapshot;
918    uint64_t num_markers;
919
920    char keybuf[256], metabuf[256], bodybuf[256];
921    char kv_name[8];
922    char compact_filename[32];
923
924    fdb_config fconfig = fdb_get_default_config();
925    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
926    fconfig.buffercache_size = 0;
927    fconfig.wal_threshold = 1024;
928    fconfig.flags = FDB_OPEN_FLAG_CREATE;
929    fconfig.compaction_threshold = 0;
930    fconfig.multi_kv_instances = multi_kv;
931    // since this test requires static number of markers,
932    // disable block reusing
933    fconfig.block_reusing_threshold = 0;
934
935    // remove previous compact_test files
936    r = system(SHELL_DEL" compact_test* > errorlog.txt");
937    (void)r;
938
939    // open db
940    fdb_open(&dbfile, "./compact_test1", &fconfig);
941    if (multi_kv) {
942        for (r = 0; r < num_kvs; ++r) {
943            sprintf(kv_name, "kv%d", r);
944            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
945        }
946    } else {
947        num_kvs = 1;
948        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
949    }
950
951   // ------- Setup test ----------------------------------
952   // insert documents of 0-4
953    for (i=0; i<n/4; i++){
954        sprintf(keybuf, "key%d", i);
955        sprintf(metabuf, "meta%d", i);
956        sprintf(bodybuf, "body%d", i);
957        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
958            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
959        for (r = 0; r < num_kvs; ++r) {
960            fdb_set(db[r], doc[i]);
961        }
962    }
963
964    // commit with a manual WAL flush (these docs go into HB-trie)
965    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
966
967    // insert documents from 5 - 9
968    for (; i < n/2; i++){
969        sprintf(keybuf, "key%d", i);
970        sprintf(metabuf, "meta%d", i);
971        sprintf(bodybuf, "body%d", i);
972        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
973            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
974        for (r = 0; r < num_kvs; ++r) {
975            fdb_set(db[r], doc[i]);
976        }
977    }
978
979    // commit again without a WAL flush
980    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
981
982    // insert documents from 10-14 into HB-trie
983    for (; i < (n/2 + n/4); i++){
984        sprintf(keybuf, "key%d", i);
985        sprintf(metabuf, "meta%d", i);
986        sprintf(bodybuf, "body%d", i);
987        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
988            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
989        for (r = 0; r < num_kvs; ++r) {
990            fdb_set(db[r], doc[i]);
991        }
992    }
993    // manually flush WAL & commit
994    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
995
996    // insert documents from 15 - 19 on file into the WAL
997    for (; i < n; i++){
998        sprintf(keybuf, "key%d", i);
999        sprintf(metabuf, "meta%d", i);
1000        sprintf(bodybuf, "body%d", i);
1001        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1002            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1003        for (r = 0; r < num_kvs; ++r) {
1004            fdb_set(db[r], doc[i]);
1005        }
1006    }
1007    // commit without a WAL flush
1008    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1009
1010    for (r = 0; r < num_kvs; ++r) {
1011        status = fdb_set_log_callback(db[r], logCallbackFunc,
1012                                      (void *) "compact_upto_test");
1013        TEST_CHK(status == FDB_RESULT_SUCCESS);
1014    }
1015
1016    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1017    TEST_CHK(status == FDB_RESULT_SUCCESS);
1018
1019    if (!multi_kv) {
1020        TEST_CHK(num_markers == 4);
1021        for (r = 0; (uint64_t)r < num_markers; ++r) {
1022            TEST_CHK(markers[r].num_kvs_markers == 1);
1023            TEST_CHK(markers[r].kvs_markers[0].seqnum ==
1024                     (fdb_seqnum_t)(n - r*5));
1025        }
1026        r = 1; // Test compacting upto sequence number 15
1027        sprintf(compact_filename, "compact_test_compact%d", r);
1028        status = fdb_compact_upto(dbfile, compact_filename,
1029                                  markers[r].marker);
1030        TEST_CHK(status == FDB_RESULT_SUCCESS);
1031        // create a snapshot
1032        status = fdb_snapshot_open(db[0], &snapshot,
1033                                   markers[r].kvs_markers[0].seqnum);
1034        TEST_CHK(status == FDB_RESULT_SUCCESS);
1035        // close snapshot
1036        fdb_kvs_close(snapshot);
1037    } else {
1038        TEST_CHK(num_markers == 8);
1039        for (r = 0; r < num_kvs; ++r) {
1040            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
1041            for (i = 0; i < num_kvs; ++i) {
1042                TEST_CHK(markers[r].kvs_markers[i].seqnum
1043                         == (fdb_seqnum_t)(n - r*5));
1044                sprintf(kv_name, "kv%d", i);
1045                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
1046            }
1047        }
1048        i = r = 1;
1049        sprintf(compact_filename, "compact_test_compact%d", i);
1050        status = fdb_compact_upto(dbfile, compact_filename,
1051                markers[i].marker);
1052        TEST_CHK(status == FDB_RESULT_SUCCESS);
1053        // create a snapshot
1054        status = fdb_snapshot_open(db[r], &snapshot,
1055                markers[i].kvs_markers[r].seqnum);
1056        TEST_CHK(status == FDB_RESULT_SUCCESS);
1057        // close snapshot
1058        fdb_kvs_close(snapshot);
1059    }
1060
1061    status = fdb_free_snap_markers(markers, num_markers);
1062    TEST_CHK(status == FDB_RESULT_SUCCESS);
1063
1064    // close db file
1065    fdb_close(dbfile);
1066
1067    // free all documents
1068    for (i=0;i<n;++i){
1069        fdb_doc_free(doc[i]);
1070    }
1071
1072    // free all resources
1073    fdb_shutdown();
1074
1075    memleak_end();
1076
1077    sprintf(bodybuf, "compact upto marker in file test %s", multi_kv ?
1078                                                           "multiple kv mode:"
1079                                                         : "single kv mode:");
1080    TEST_RESULT(bodybuf);
1081}
1082
1083void auto_recover_compact_ok_test()
1084{
1085    TEST_INIT();
1086
1087    memleak_start();
1088
1089    int i, r;
1090    int n = 3;
1091    fdb_file_handle *dbfile, *dbfile_new;
1092    fdb_kvs_handle *db;
1093    fdb_kvs_handle *db_new;
1094    fdb_doc **doc = alca(fdb_doc *, n);
1095    fdb_doc *rdoc;
1096    fdb_status status;
1097
1098    char keybuf[256], metabuf[256], bodybuf[256];
1099
1100    // remove previous compact_test files
1101    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1102    (void)r;
1103
1104    fdb_config fconfig = fdb_get_default_config();
1105    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1106    fconfig.buffercache_size = 16777216;
1107    fconfig.wal_threshold = 1024;
1108    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1109    fconfig.compaction_threshold = 0;
1110
1111    // open db
1112    fdb_open(&dbfile, "./compact_test1", &fconfig);
1113    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1114    status = fdb_set_log_callback(db, logCallbackFunc,
1115                                  (void *) "auto_recover_compact_ok_test");
1116    TEST_CHK(status == FDB_RESULT_SUCCESS);
1117    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
1118    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
1119    status = fdb_set_log_callback(db_new, logCallbackFunc,
1120                                  (void *) "auto_recover_compact_ok_test");
1121    TEST_CHK(status == FDB_RESULT_SUCCESS);
1122
1123    // insert first two documents
1124    for (i=0;i<2;++i){
1125        sprintf(keybuf, "key%d", i);
1126        sprintf(metabuf, "meta%d", i);
1127        sprintf(bodybuf, "body%d", i);
1128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1130        fdb_set(db, doc[i]);
1131    }
1132
1133    // remove second doc
1134    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
1135    rdoc->deleted = true;
1136    fdb_set(db, rdoc);
1137    fdb_doc_free(rdoc);
1138
1139    // commit
1140    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1141
1142    // perform compaction using one handle
1143    fdb_compact(dbfile, (char *) "./compact_test2");
1144
1145    // save the old file after compaction is done ..
1146    r = system(SHELL_COPY " compact_test1 compact_test11 > errorlog.txt");
1147    (void)r;
1148
1149    // now insert third doc: it should go to the newly compacted file.
1150    sprintf(keybuf, "key%d", i);
1151    sprintf(metabuf, "meta%d", i);
1152    sprintf(bodybuf, "body%d", i);
1153    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1154        (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1155    fdb_set(db, doc[i]);
1156
1157    // commit
1158    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1159
1160    // close both the db files ...
1161    fdb_kvs_close(db);
1162    fdb_kvs_close(db_new);
1163    fdb_close(dbfile);
1164    fdb_close(dbfile_new);
1165
1166    // restore the old file after close is done ..
1167    r = system(SHELL_MOVE " compact_test11 compact_test1 > errorlog.txt");
1168    (void)r;
1169
1170    // now open the old saved compacted file, it should automatically recover
1171    // and use the new file since compaction was done successfully
1172    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
1173    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
1174    status = fdb_set_log_callback(db_new, logCallbackFunc,
1175                                  (void *) "auto_recover_compact_ok_test");
1176    TEST_CHK(status == FDB_RESULT_SUCCESS);
1177
1178    // retrieve documents using the old handle and expect all 3 docs
1179    for (i=0;i<n;++i){
1180        // search by key
1181        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1182        status = fdb_get(db_new, rdoc);
1183
1184        if (i != 1) {
1185            TEST_CHK(status == FDB_RESULT_SUCCESS);
1186            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1187            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1188        }else{
1189            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1190        }
1191
1192        // free result document
1193        fdb_doc_free(rdoc);
1194    }
1195    // check this handle's filename it should point to newly compacted file
1196    fdb_file_info info;
1197    fdb_get_file_info(dbfile_new, &info);
1198    TEST_CHK(!strcmp("./compact_test2", info.filename));
1199
1200    // close the file
1201    fdb_kvs_close(db_new);
1202    fdb_close(dbfile_new);
1203
1204    // free all documents
1205    for (i=0;i<n;++i){
1206        fdb_doc_free(doc[i]);
1207    }
1208
1209    // free all resources
1210    fdb_shutdown();
1211
1212    memleak_end();
1213
1214    TEST_RESULT("auto recovery after compaction test");
1215}
1216
/*
 * Returns true when `filename` can be queried on the file system
 * (GetFileAttributes() on Windows, stat() elsewhere), false otherwise.
 */
#if defined(WIN32) || defined(_WIN32)
static bool does_file_exist(const char *filename) {
    if (GetFileAttributes(filename) == INVALID_FILE_ATTRIBUTES) {
        return false;
    }
    return true;
}
#else
static bool does_file_exist(const char *filename) {
    struct stat info;
    return stat(filename, &info) == 0;
}
#endif
1228
1229void unlink_after_compaction_test()
1230{
1231    TEST_INIT();
1232    memleak_start();
1233
1234    int i, r;
1235    int n = 10;
1236    fdb_file_handle *dbfile;
1237    fdb_kvs_handle *db, *snap;
1238    fdb_doc **doc = alca(fdb_doc *, n);
1239    fdb_doc *rdoc;
1240    fdb_status s;
1241
1242    char keybuf[256], metabuf[256], bodybuf[256];
1243
1244    // remove previous compact_test files
1245    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1246    (void)r;
1247
1248    fdb_config fconfig = fdb_get_default_config();
1249    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1250    fconfig.buffercache_size = 16777216;
1251    fconfig.wal_threshold = 1024;
1252    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1253    fconfig.compaction_threshold = 0;
1254
1255    // open db
1256    fdb_open(&dbfile, "./compact_test1", &fconfig);
1257    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1258    s = fdb_set_log_callback(db, logCallbackFunc,
1259                             (void *) "unlink_after_compaction_test");
1260    TEST_CHK(s == FDB_RESULT_SUCCESS);
1261
1262    // set docs
1263    for (i=0;i<n;++i){
1264        sprintf(keybuf, "key%d", i);
1265        sprintf(metabuf, "meta%d", i);
1266        sprintf(bodybuf, "body%d", i);
1267        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1268            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1269        fdb_set(db, doc[i]);
1270    }
1271
1272    // commit
1273    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1274
1275    // create a snapshot
1276    s = fdb_snapshot_open(db, &snap, n);
1277    TEST_CHK(s == FDB_RESULT_SUCCESS);
1278
1279    // compaction
1280    s = fdb_compact(dbfile, "./compact_test2");
1281    TEST_CHK(s == FDB_RESULT_SUCCESS);
1282
1283    // retrieve check on the new file
1284    for (i=0;i<n;++i){
1285        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1286        s = fdb_get(db, rdoc);
1287        TEST_CHK(s == FDB_RESULT_SUCCESS);
1288        fdb_doc_free(rdoc);
1289    }
1290
1291    // the old file should not be seen by user level application
1292#if !defined(WIN32) && !defined(_WIN32)
1293#ifndef _MSC_VER
1294    TEST_CHK(!does_file_exist("./compact_test1"));
1295#endif
1296#endif
1297
1298    // retrieve check on the old file (snapshot)
1299    for (i=0;i<n;++i){
1300        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1301        s = fdb_get(snap, rdoc);
1302        TEST_CHK(s == FDB_RESULT_SUCCESS);
1303        fdb_doc_free(rdoc);
1304    }
1305
1306    fdb_close(dbfile);
1307
1308    fdb_shutdown();
1309    for (i=0; i<n; ++i){
1310        fdb_doc_free(doc[i]);
1311    }
1312
1313    memleak_end();
1314    TEST_RESULT("unlink after compaction test");
1315}
1316
1317void db_compact_overwrite()
1318{
1319    TEST_INIT();
1320    memleak_start();
1321
1322    int i, r;
1323    int n = 30;
1324    fdb_file_handle *dbfile, *dbfile2;
1325    fdb_kvs_handle *db, *db2;
1326    fdb_doc **doc = alca(fdb_doc *, n);
1327    fdb_doc **doc2 = alca(fdb_doc *, 2*n);
1328    fdb_doc *rdoc;
1329    fdb_status status;
1330    fdb_config fconfig;
1331    fdb_kvs_info kvs_info;
1332    fdb_kvs_config kvs_config;
1333
1334    char keybuf[256], metabuf[256], bodybuf[256];
1335
1336    // remove previous compact_test files
1337    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1338    (void)r;
1339
1340    fconfig = fdb_get_default_config();
1341    kvs_config = fdb_get_default_kvs_config();
1342    fconfig.buffercache_size = 16777216;
1343    fconfig.wal_threshold = 1024;
1344    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1345    fconfig.compaction_threshold = 0;
1346
1347    // write to db1
1348    fdb_open(&dbfile, "./compact_test1", &fconfig);
1349    fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
1350    status = fdb_set_log_callback(db, logCallbackFunc,
1351                                  (void *) "db_destroy_test");
1352    TEST_CHK(status == FDB_RESULT_SUCCESS);
1353    for (i=0;i<n;++i){
1354        sprintf(keybuf, "key%d", i);
1355        sprintf(metabuf, "meta%d", i);
1356        sprintf(bodybuf, "body%d", i);
1357        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1358            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1359        fdb_set(db, doc[i]);
1360    }
1361    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1362
1363    // Open the empty db with future compact name
1364    fdb_open(&dbfile2, "./compact_test1.1", &fconfig);
1365    fdb_kvs_open(dbfile2, &db2, NULL, &kvs_config);
1366    status = fdb_set_log_callback(db2, logCallbackFunc,
1367                                  (void *) "db_destroy_test");
1368    TEST_CHK(status == FDB_RESULT_SUCCESS);
1369    // write to db2
1370    for (i=0;i < 2*n;++i){
1371        sprintf(keybuf, "k2ey%d", i);
1372        sprintf(metabuf, "m2eta%d", i);
1373        sprintf(bodybuf, "b2ody%d", i);
1374        fdb_doc_create(&doc2[i], (void*)keybuf, strlen(keybuf),
1375            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1376        fdb_set(db2, doc2[i]);
1377    }
1378    fdb_commit(dbfile2, FDB_COMMIT_NORMAL);
1379
1380
1381    // verify db2 seqnum and close
1382    fdb_get_kvs_info(db2, &kvs_info);
1383    TEST_CHK(kvs_info.last_seqnum = 2*n);
1384    fdb_kvs_close(db2);
1385    fdb_close(dbfile2);
1386
1387    // compact db1
1388    status = fdb_compact(dbfile, NULL);
1389    TEST_CHK(status == FDB_RESULT_SUCCESS);
1390
1391    // close db1
1392    fdb_kvs_close(db);
1393    fdb_close(dbfile);
1394
1395    // reopen db1
1396    fdb_open(&dbfile, "./compact_test1", &fconfig);
1397    fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
1398    status = fdb_set_log_callback(db, logCallbackFunc,
1399                                  (void *) "db_destroy_test");
1400    TEST_CHK(status == FDB_RESULT_SUCCESS);
1401    // read db1
1402    for (i=0;i<n;++i){
1403        // search by key
1404        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1405        status = fdb_get(db, rdoc);
1406        TEST_CHK(status == FDB_RESULT_SUCCESS);
1407        TEST_CHK(!memcmp(rdoc->meta, doc[i]->meta, rdoc->metalen));
1408        TEST_CHK(!memcmp(rdoc->body, doc[i]->body, rdoc->bodylen));
1409        // free result document
1410        fdb_doc_free(rdoc);
1411    }
1412
1413    // reopen db2
1414    fdb_open(&dbfile2, "./compact_test1.1", &fconfig);
1415    fdb_kvs_open(dbfile2, &db2, NULL, &kvs_config);
1416    status = fdb_set_log_callback(db2, logCallbackFunc,
1417                                  (void *) "db_destroy_test");
1418    TEST_CHK(status == FDB_RESULT_SUCCESS);
1419
1420    fdb_get_kvs_info(db2, &kvs_info);
1421    TEST_CHK(kvs_info.last_seqnum = 2*n);
1422
1423    // read db2
1424    for (i=0;i<2*n;++i){
1425        // search by key
1426        fdb_doc_create(&rdoc, doc2[i]->key, doc2[i]->keylen, NULL, 0, NULL, 0);
1427        status = fdb_get(db2, rdoc);
1428        TEST_CHK(status == FDB_RESULT_SUCCESS);
1429        TEST_CHK(!memcmp(rdoc->meta, doc2[i]->meta, rdoc->metalen));
1430        TEST_CHK(!memcmp(rdoc->body, doc2[i]->body, rdoc->bodylen));
1431        // free result document
1432        fdb_doc_free(rdoc);
1433    }
1434
1435    // free all documents
1436    for (i=0;i<n;++i){
1437        fdb_doc_free(doc[i]);
1438        fdb_doc_free(doc2[i]);
1439    }
1440    for (i=n;i<2*n;++i){
1441        fdb_doc_free(doc2[i]);
1442    }
1443
1444    fdb_kvs_close(db);
1445    fdb_close(dbfile);
1446    fdb_kvs_close(db2);
1447    fdb_close(dbfile2);
1448
1449
1450    fdb_shutdown();
1451    memleak_end();
1452    TEST_RESULT("compact overwrite");
1453}
1454
1455void *db_compact_during_doc_delete(void *args)
1456{
1457
1458    TEST_INIT();
1459    memleak_start();
1460
1461    int i, r;
1462    int n = 100;
1463    fdb_file_handle *dbfile;
1464    fdb_kvs_handle *db;
1465    fdb_status status;
1466    fdb_config fconfig;
1467    fdb_kvs_config kvs_config;
1468    fdb_kvs_info kvs_info;
1469    thread_t tid;
1470    void *thread_ret;
1471    fdb_doc **doc = alca(fdb_doc*, n);
1472    char keybuf[256], metabuf[256], bodybuf[256];
1473
1474    // init dbfile
1475    kvs_config = fdb_get_default_kvs_config();
1476    fconfig = fdb_get_default_config();
1477    fconfig.buffercache_size = 0;
1478    fconfig.wal_threshold = 1024;
1479    fconfig.compaction_threshold = 0;
1480
1481    if (args == NULL)
1482    { // parent
1483
1484        r = system(SHELL_DEL" compact_test* > errorlog.txt");
1485        (void)r;
1486
1487        status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1488        TEST_CHK(status == FDB_RESULT_SUCCESS);
1489        status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1490        TEST_CHK(status == FDB_RESULT_SUCCESS);
1491
1492        // insert documents
1493        for (i=0;i<n;++i){
1494            sprintf(keybuf, "key%d", i);
1495            sprintf(metabuf, "meta%d", i);
1496            sprintf(bodybuf, "body%d", i);
1497            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1498                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1499            fdb_set(db, doc[i]);
1500        }
1501
1502        fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1503        // verify no docs remaining
1504        fdb_get_kvs_info(db, &kvs_info);
1505        TEST_CHK(kvs_info.doc_count == (uint64_t)n);
1506
1507        // start deleting docs
1508        for (i=0;i<n;++i){
1509            fdb_del(db, doc[i]);
1510            if (i == n/2){
1511                // compact half-way
1512                thread_create(&tid, db_compact_during_doc_delete, (void *)dbfile);
1513            }
1514        }
1515
1516        // join compactor
1517        thread_join(tid, &thread_ret);
1518
1519        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1520
1521        // verify no docs remaining
1522        fdb_get_kvs_info(db, &kvs_info);
1523        TEST_CHK(kvs_info.doc_count == 0);
1524
1525        // reopen
1526        fdb_kvs_close(db);
1527        fdb_close(dbfile);
1528        status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1529        TEST_CHK(status == FDB_RESULT_SUCCESS);
1530        status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1531        TEST_CHK(status == FDB_RESULT_SUCCESS);
1532
1533        fdb_get_kvs_info(db, &kvs_info);
1534        TEST_CHK(kvs_info.doc_count == 0);
1535
1536        // cleanup
1537        for (i=0;i<n;++i){
1538            fdb_doc_free(doc[i]);
1539        }
1540        fdb_kvs_close(db);
1541        fdb_close(dbfile);
1542        fdb_shutdown();
1543
1544        memleak_end();
1545        TEST_RESULT("multi thread client shutdown");
1546        return NULL;
1547    }
1548
1549    // compaction thread enters here //
1550    status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1551    TEST_CHK(status == FDB_RESULT_SUCCESS);
1552    status = fdb_compact(dbfile, NULL);
1553    TEST_CHK(status == FDB_RESULT_SUCCESS);
1554    fdb_close(dbfile);
1555
1556    // shutdown
1557    thread_exit(0);
1558    return NULL;
1559}
1560
1561void compaction_daemon_test(size_t time_sec)
1562{
1563    TEST_INIT();
1564
1565    memleak_start();
1566
1567    int i, r;
1568    int n = 10000;
1569    int compaction_threshold = 30;
1570    int escape = 0;
1571    fdb_file_handle *dbfile, *dbfile_less, *dbfile_non, *dbfile_manual, *dbfile_new;
1572    fdb_kvs_handle *db, *db_less, *db_non, *db_manual;
1573    fdb_kvs_handle *snapshot;
1574    fdb_doc **doc = alca(fdb_doc*, n);
1575    fdb_doc *rdoc;
1576    fdb_file_info info;
1577    fdb_status status;
1578    struct timeval ts_begin, ts_cur, ts_gap;
1579
1580    char keybuf[256], metabuf[256], bodybuf[256];
1581
1582    // remove previous compact_test files
1583    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1584    (void)r;
1585
1586    fdb_config fconfig = fdb_get_default_config();
1587    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1588    fconfig.buffercache_size = 0;
1589    fconfig.wal_threshold = 1024;
1590    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1591    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1592    fconfig.compaction_threshold = compaction_threshold;
1593    fconfig.compactor_sleep_duration = 1; // for quick test
1594
1595    fconfig.num_compactor_threads = 0;
1596    status = fdb_open(&dbfile, "compact_test", &fconfig);
1597    TEST_CHK(status == FDB_RESULT_INVALID_CONFIG);
1598
1599    fconfig.num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
1600    // open db
1601    fdb_open(&dbfile, "compact_test", &fconfig);
1602    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1603    status = fdb_set_log_callback(db, logCallbackFunc,
1604                                  (void *) "compaction_daemon_test");
1605    TEST_CHK(status == FDB_RESULT_SUCCESS);
1606    // insert documents
1607    printf("Initialize..\n");
1608    for (i=0;i<n;++i){
1609        sprintf(keybuf, "key%04d", i);
1610        sprintf(metabuf, "meta%04d", i);
1611        sprintf(bodybuf, "body%04d", i);
1612        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf)+1,
1613            (void*)metabuf, strlen(metabuf)+1, (void*)bodybuf, strlen(bodybuf)+1);
1614        fdb_set(db, doc[i]);
1615    }
1616    // commit
1617    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1618    // close db file
1619    fdb_close(dbfile);
1620
1621    // ---- basic retrieve test ------------------------
1622    // reopen db file
1623    status = fdb_open(&dbfile, "compact_test", &fconfig);
1624    TEST_CHK(status == FDB_RESULT_SUCCESS);
1625    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1626    TEST_CHK(status == FDB_RESULT_SUCCESS);
1627    status = fdb_set_log_callback(db, logCallbackFunc,
1628                                  (void *) "compaction_daemon_test");
1629    TEST_CHK(status == FDB_RESULT_SUCCESS);
1630    // check db filename
1631    fdb_get_file_info(dbfile, &info);
1632    TEST_CHK(!strcmp(info.filename, "compact_test"));
1633
1634    // retrieve documents
1635    for (i=0;i<n;++i){
1636        sprintf(keybuf, "key%04d", i);
1637        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1638            NULL, 0, NULL, 0);
1639        status = fdb_get(db, rdoc);
1640        //printf("%s %s\n", rdoc->key, rdoc->body);
1641        TEST_CHK(status == FDB_RESULT_SUCCESS);
1642        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1643        fdb_doc_free(rdoc);
1644    }
1645
1646    // create a snapshot
1647    status = fdb_snapshot_open(db, &snapshot, n);
1648    TEST_CHK(status == FDB_RESULT_SUCCESS);
1649    // close snapshot
1650    fdb_kvs_close(snapshot);
1651
1652    // close db file
1653    fdb_close(dbfile);
1654
1655    // ---- handling when metafile is removed ------------
1656    // remove meta file
1657    r = system(SHELL_DEL" compact_test.meta > errorlog.txt");
1658    (void)r;
1659    // reopen db file
1660    status = fdb_open(&dbfile, "compact_test", &fconfig);
1661    TEST_CHK(status == FDB_RESULT_SUCCESS);
1662    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1663    TEST_CHK(status == FDB_RESULT_SUCCESS);
1664    status = fdb_set_log_callback(db, logCallbackFunc,
1665                                  (void *) "compaction_daemon_test");
1666    TEST_CHK(status == FDB_RESULT_SUCCESS);
1667    // retrieve documents
1668    for (i=0;i<n;++i){
1669        sprintf(keybuf, "key%04d", i);
1670        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1671            NULL, 0, NULL, 0);
1672        status = fdb_get(db, rdoc);
1673        //printf("%s %s\n", rdoc->key, rdoc->body);
1674        TEST_CHK(status == FDB_RESULT_SUCCESS);
1675        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1676        fdb_doc_free(rdoc);
1677    }
1678    // close db file
1679    fdb_close(dbfile);
1680
1681    // ---- handling when metafile points to non-exist file ------------
1682    // remove meta file
1683    r = system(SHELL_MOVE" compact_test.0 compact_test.23 > errorlog.txt");
1684    (void)r;
1685    // reopen db file
1686    status = fdb_open(&dbfile, "compact_test", &fconfig);
1687    TEST_CHK(status == FDB_RESULT_SUCCESS);
1688    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1689    TEST_CHK(status == FDB_RESULT_SUCCESS);
1690    status = fdb_set_log_callback(db, logCallbackFunc,
1691                                  (void *) "compaction_daemon_test");
1692    TEST_CHK(status == FDB_RESULT_SUCCESS);
1693    // retrieve documents
1694    for (i=0;i<n;++i){
1695        sprintf(keybuf, "key%04d", i);
1696        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1697            NULL, 0, NULL, 0);
1698        status = fdb_get(db, rdoc);
1699        //printf("%s %s\n", rdoc->key, rdoc->body);
1700        TEST_CHK(status == FDB_RESULT_SUCCESS);
1701        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1702        fdb_doc_free(rdoc);
1703    }
1704    // close db file
1705    fdb_close(dbfile);
1706
1707    // ---- compaction daemon test -------------------
1708    // db: DB instance to be compacted
1709    // db_less: DB instance to be compacted but with much lower update throughput
1710    // db_non: DB instance not to be compacted (auto compaction with threshold = 0)
1711    // db_manual: DB instance not to be compacted (manual compaction)
1712
1713    // open & create db_less, db_non and db_manual
1714    status = fdb_open(&dbfile_less, "compact_test_less", &fconfig);
1715    TEST_CHK(status == FDB_RESULT_SUCCESS);
1716    status = fdb_kvs_open_default(dbfile_less, &db_less, &kvs_config);
1717    TEST_CHK(status == FDB_RESULT_SUCCESS);
1718
1719    fconfig.compaction_threshold = 0;
1720    status = fdb_open(&dbfile_non, "compact_test_non", &fconfig);
1721    TEST_CHK(status == FDB_RESULT_SUCCESS);
1722    status = fdb_kvs_open_default(dbfile_non, &db_non, &kvs_config);
1723    TEST_CHK(status == FDB_RESULT_SUCCESS);
1724
1725    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1726    status = fdb_open(&dbfile_manual, "compact_test_manual", &fconfig);
1727    TEST_CHK(status == FDB_RESULT_SUCCESS);
1728    status = fdb_kvs_open_default(dbfile_manual, &db_manual, &kvs_config);
1729    TEST_CHK(status == FDB_RESULT_SUCCESS);
1730
1731    // reopen db file
1732    fconfig.compaction_threshold = 30;
1733    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1734    status = fdb_open(&dbfile, "compact_test", &fconfig);
1735    TEST_CHK(status == FDB_RESULT_SUCCESS);
1736    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1737    TEST_CHK(status == FDB_RESULT_SUCCESS);
1738    status = fdb_set_log_callback(db, logCallbackFunc,
1739                                  (void *) "compaction_daemon_test");
1740    TEST_CHK(status == FDB_RESULT_SUCCESS);
1741    // continuously update documents
1742    printf("wait for %d seconds..\n", (int)time_sec);
1743    gettimeofday(&ts_begin, NULL);
1744    while (!escape) {
1745        for (i=0;i<n;++i){
1746            // update db
1747            fdb_set(db, doc[i]);
1748            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1749
1750            // update db_less (1/100 throughput)
1751            if (i%100 == 0){
1752                fdb_set(db_less, doc[i]);
1753                fdb_commit(dbfile_less, FDB_COMMIT_NORMAL);
1754            }
1755
1756            // update db_non
1757            fdb_set(db_non, doc[i]);
1758            fdb_commit(dbfile_non, FDB_COMMIT_NORMAL);
1759
1760            // update db_manual
1761            fdb_set(db_manual, doc[i]);
1762            fdb_commit(dbfile_manual, FDB_COMMIT_NORMAL);
1763
1764            gettimeofday(&ts_cur, NULL);
1765            ts_gap = _utime_gap(ts_begin, ts_cur);
1766            if ((size_t)ts_gap.tv_sec >= time_sec) {
1767                escape = 1;
1768                break;
1769            }
1770        }
1771    }
1772
1773    // Change the compaction interval to 60 secs.
1774    status = fdb_set_daemon_compaction_interval(dbfile, 60);
1775    TEST_CHK(status == FDB_RESULT_SUCCESS);
1776    // Change the compaction interval back to 1 sec.
1777    status = fdb_set_daemon_compaction_interval(dbfile, 1);
1778    TEST_CHK(status == FDB_RESULT_SUCCESS);
1779
1780    // perform manual compaction of auto-compact file
1781    status = fdb_compact(dbfile_non, NULL);
1782    TEST_CHK(status == FDB_RESULT_SUCCESS);
1783
1784    // perform manual compaction of manual-compact file
1785    status = fdb_compact(dbfile_manual, "compact_test_manual_compacted");
1786    TEST_CHK(status == FDB_RESULT_SUCCESS);
1787
1788    // open compact_test_manual_compacted using new db handle
1789    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1790    status = fdb_open(&dbfile_new, "compact_test_manual_compacted", &fconfig);
1791    TEST_CHK(status == FDB_RESULT_SUCCESS);
1792
1793    // try to switch compaction mode
1794    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 30);
1795    TEST_CHK(status == FDB_RESULT_FILE_IS_BUSY);
1796
1797    // close db_new
1798    status = fdb_close(dbfile_new);
1799    TEST_CHK(status == FDB_RESULT_SUCCESS);
1800
1801    // switch compaction mode of 'db_manual' from MANUAL to AUTO
1802    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 10);
1803    TEST_CHK(status == FDB_RESULT_SUCCESS);
1804
1805    // change compaction value
1806    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 30);
1807    TEST_CHK(status == FDB_RESULT_SUCCESS);
1808
1809    // close and open with auto-compact option
1810    status = fdb_close(dbfile_manual);
1811    TEST_CHK(status == FDB_RESULT_SUCCESS);
1812    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1813    status = fdb_open(&dbfile_manual, "compact_test_manual_compacted", &fconfig);
1814    TEST_CHK(status == FDB_RESULT_SUCCESS);
1815
1816    // switch compaction mode of 'db_non' from AUTO to MANUAL
1817    status = fdb_switch_compaction_mode(dbfile_non, FDB_COMPACTION_MANUAL, 0);
1818    TEST_CHK(status == FDB_RESULT_SUCCESS);
1819
1820    // close and open with manual-compact option
1821    status = fdb_close(dbfile_non);
1822    TEST_CHK(status == FDB_RESULT_SUCCESS);
1823    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1824    status = fdb_open(&dbfile_non, "compact_test_non", &fconfig);
1825    TEST_CHK(status == FDB_RESULT_SUCCESS);
1826
1827    // Now perform one manual compaction on compact_test_non
1828    fdb_compact(dbfile_non, "compact_test_non.manual");
1829
1830    // close all db files except compact_test_non
1831    fdb_close(dbfile);
1832    fdb_close(dbfile_less);
1833    fdb_close(dbfile_manual);
1834
1835    // open manual compact file (compact_test_non) using auto compact mode
1836    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1837    status = fdb_open(&dbfile, "compact_test_non.manual", &fconfig);
1838    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
1839
1840    // Attempt to destroy manual compact file using auto compact mode
1841    status = fdb_destroy("compact_test_non.manual", &fconfig);
1842    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
1843
    // open auto compact file (compact_test_manual_compacted) using manual compact mode
1845    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1846    status = fdb_open(&dbfile, "compact_test_manual_compacted", &fconfig);
1847    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
1848
    // Attempt to destroy auto compact file using manual compact mode
1850    status = fdb_destroy("compact_test_manual_compacted", &fconfig);
1851    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
1852
    // DESTROY auto compact file with correct mode
1854    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1855    status = fdb_destroy("compact_test_manual_compacted", &fconfig);
1856    TEST_CHK(status == FDB_RESULT_SUCCESS);
1857
1858    // DESTROY manual compacted file with past version open!
1859    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
1860    status = fdb_destroy("compact_test_non.manual", &fconfig);
1861    TEST_CHK(status == FDB_RESULT_FILE_IS_BUSY);
1862    fdb_close(dbfile_non);
1863
1864    // Simulate a database crash by doing a premature shutdown
1865    // Note that db_non was never closed properly
1866    status = fdb_shutdown();
1867    TEST_CHK(status == FDB_RESULT_SUCCESS);
1868
1869    status = fdb_destroy("compact_test_non.manual", &fconfig);
1870    TEST_CHK(status == FDB_RESULT_SUCCESS);
1871
1872    // Attempt to read-only auto compacted and destroyed file
1873    fconfig.flags = FDB_OPEN_FLAG_RDONLY;
1874    status = fdb_open(&dbfile, "./compact_test_manual_compacted", &fconfig);
1875    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
1876
1877    status = fdb_open(&dbfile, "./compact_test_manual_compacted.meta", &fconfig);
1878    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
1879
1880    // Attempt to read-only past version of manually compacted destroyed file
1881    status = fdb_open(&dbfile, "compact_test_non", &fconfig);
1882    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
1883
1884    // Attempt to read-only current version of manually compacted destroyed file
1885    status = fdb_open(&dbfile, "compact_test_non.manual", &fconfig);
1886    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
1887
1888    // free all documents
1889    for (i=0;i<n;++i){
1890        fdb_doc_free(doc[i]);
1891    }
1892
1893    // free all resources
1894    fdb_shutdown();
1895
1896    memleak_end();
1897
1898    TEST_RESULT("compaction daemon test");
1899}
1900
1901// MB-13117
1902void auto_compaction_with_concurrent_insert_test(size_t t_limit)
1903{
1904    TEST_INIT();
1905
1906    memleak_start();
1907
1908    int i, r;
1909    fdb_file_handle *file;
1910    fdb_kvs_handle *kvs;
1911    fdb_status status;
1912    fdb_config config;
1913    fdb_kvs_config kvs_config;
1914    struct timeval ts_begin, ts_cur, ts_gap;
1915
1916    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1917    (void)r;
1918
1919    // Open Database File
1920    config = fdb_get_default_config();
1921    config.compaction_mode=FDB_COMPACTION_AUTO;
1922    config.compactor_sleep_duration = 1;
1923    status = fdb_open(&file, "compact_test", &config);
1924    TEST_CHK(status == FDB_RESULT_SUCCESS);
1925
1926    // Open KV Store
1927    kvs_config = fdb_get_default_kvs_config();
1928    status = fdb_kvs_open_default(file, &kvs, &kvs_config);
1929    TEST_CHK(status == FDB_RESULT_SUCCESS);
1930
1931    gettimeofday(&ts_begin, NULL);
1932    printf("wait for %d seconds..\n", (int)t_limit);
1933
1934    // Several kv pairs
1935    for(i=0;i<100000;i++) {
1936        char str[15];
1937        sprintf(str, "%d", i);
1938        status = fdb_set_kv(kvs, str, strlen(str), (void*)"value", 5);
1939        TEST_CHK(status == FDB_RESULT_SUCCESS);
1940
1941        // Commit
1942        status = fdb_commit(file, FDB_COMMIT_NORMAL);
1943        TEST_CHK(status == FDB_RESULT_SUCCESS);
1944
1945        gettimeofday(&ts_cur, NULL);
1946        ts_gap = _utime_gap(ts_begin, ts_cur);
1947        if ((size_t)ts_gap.tv_sec >= t_limit) {
1948            break;
1949        }
1950    }
1951
1952    status = fdb_close(file);
1953    TEST_CHK(status == FDB_RESULT_SUCCESS);
1954    status = fdb_shutdown();
1955    TEST_CHK(status == FDB_RESULT_SUCCESS);
1956
1957    memleak_end();
1958
1959    TEST_RESULT("auto compaction with concurrent insert test");
1960}
1961
// lexicographically compares two variable-length binary streams
//
// Returns a negative value, zero, or a positive value when key1 sorts
// before, equal to, or after key2. The keys are compared byte-wise over
// their common length; when one key is a prefix of the other, the shorter
// key sorts first.
#define MIN(a,b) (((a)<(b))?(a):(b))
static int _compact_test_keycmp(void *key1, size_t keylen1,
                                void *key2, size_t keylen2)
{
    size_t len = MIN(keylen1, keylen2);
    int cmp = memcmp(key1, key2, len);
    if (cmp != 0) {
        return cmp;
    }
    // The common prefix is identical, so the lengths decide. They are
    // compared directly rather than subtracted after a cast to int: the
    // cast truncates size_t, so a large length difference could come back
    // with the wrong sign or as zero.
    if (keylen1 == keylen2) {
        return 0;
    }
    return (keylen1 < keylen2) ? -1 : 1;
}
1978
1979void auto_compaction_with_custom_cmp_function()
1980{
1981    TEST_INIT();
1982
1983    memleak_start();
1984
1985    int i, r, n=10000;
1986    char keybuf[256], bodybuf[256];
1987    uint64_t max_filesize = 0;
1988    fdb_file_handle *file;
1989    fdb_kvs_handle *db1, *db2, *db3;
1990    fdb_status status;
1991    fdb_config config;
1992    fdb_kvs_config kvs_config;
1993    fdb_file_info file_info;
1994    char *kvs_names[] = {NULL};
1995    fdb_custom_cmp_variable functions[] = {_compact_test_keycmp};
1996
1997    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1998    (void)r;
1999
2000    // Open Database File
2001    config = fdb_get_default_config();
2002    config.wal_threshold = 4096; // reset WAL threshold for correct file size estimation
2003    config.compaction_mode=FDB_COMPACTION_AUTO;
2004    config.compactor_sleep_duration = 1;
2005    config.compaction_threshold = 10;
2006    status = fdb_open_custom_cmp(&file, "compact_test", &config,
2007                                 1, kvs_names, functions);
2008    TEST_CHK(status == FDB_RESULT_SUCCESS);
2009
2010    // Open 2 KV Stores
2011    kvs_config = fdb_get_default_kvs_config();
2012    status = fdb_kvs_open_default(file, &db1, &kvs_config);
2013    TEST_CHK(status == FDB_RESULT_SUCCESS);
2014    status = fdb_kvs_open(file, &db2, "db", &kvs_config);
2015    TEST_CHK(status == FDB_RESULT_SUCCESS);
2016
2017    // initial load
2018    for(i=0;i<n;i++) {
2019        sprintf(keybuf, "key%06d", i);
2020        sprintf(bodybuf, "body%06d", i);
2021        status = fdb_set_kv(db1, keybuf, strlen(keybuf),
2022                                 bodybuf, strlen(bodybuf));
2023        TEST_CHK(status == FDB_RESULT_SUCCESS);
2024        status = fdb_set_kv(db2, keybuf, strlen(keybuf),
2025                                 bodybuf, strlen(bodybuf));
2026        TEST_CHK(status == FDB_RESULT_SUCCESS);
2027    }
2028
2029    // create one more KVS using custom cmp
2030    // it doesn't exist on the initial cmp_func list
2031    kvs_config.custom_cmp = _compact_test_keycmp;
2032    status = fdb_kvs_open(file, &db3, "db_custom", &kvs_config);
2033    TEST_CHK(status == FDB_RESULT_SUCCESS);
2034    i = 0;
2035    sprintf(keybuf, "key%06d", i);
2036    sprintf(bodybuf, "body%06d", i);
2037    status = fdb_set_kv(db3, keybuf, strlen(keybuf),
2038                             bodybuf, strlen(bodybuf));
2039    TEST_CHK(status == FDB_RESULT_SUCCESS);
2040
2041    // Commit
2042    status = fdb_commit(file, FDB_COMMIT_NORMAL);
2043    TEST_CHK(status == FDB_RESULT_SUCCESS);
2044
2045    // update to trigger compaction
2046    for(i=0;i<n;i++) {
2047        sprintf(keybuf, "key%06d", i);
2048        sprintf(bodybuf, "body%06d", i);
2049        status = fdb_set_kv(db1, keybuf, strlen(keybuf),
2050                                 bodybuf, strlen(bodybuf));
2051        TEST_CHK(status == FDB_RESULT_SUCCESS);
2052
2053        status = fdb_get_file_info(file, &file_info);
2054        TEST_CHK(status == FDB_RESULT_SUCCESS);
2055        if (file_info.file_size > max_filesize) {
2056            max_filesize = file_info.file_size;
2057        }
2058    }
2059    // Commit
2060    status = fdb_commit(file, FDB_COMMIT_NORMAL);
2061    TEST_CHK(status == FDB_RESULT_SUCCESS);
2062
2063    status = fdb_get_file_info(file, &file_info);
2064    TEST_CHK(status == FDB_RESULT_SUCCESS);
2065    if (file_info.file_size > max_filesize) {
2066        max_filesize = file_info.file_size;
2067    }
2068
2069    printf("wait for daemon compaction completion... (max file size: %" _F64 ")\n", max_filesize);
2070    while (true) {
2071        sleep(1);
2072
2073        status = fdb_get_file_info(file, &file_info);
2074        TEST_CHK(status == FDB_RESULT_SUCCESS);
2075        if (file_info.file_size < max_filesize) {
2076            break;
2077        }
2078    }
2079    // should be compacted
2080    TEST_CHK(file_info.file_size < max_filesize);
2081
2082    status = fdb_close(file);
2083    TEST_CHK(status == FDB_RESULT_SUCCESS);
2084    status = fdb_shutdown();
2085    TEST_CHK(status == FDB_RESULT_SUCCESS);
2086
2087    memleak_end();
2088
2089    TEST_RESULT("auto compaction with custom comparison function");
2090}
2091
2092struct cb_txn_args {
2093    fdb_file_handle *file;
2094    fdb_kvs_handle *handle;
2095    int ndocs;
2096    int nupdates;
2097    int done;
2098};
2099
2100static int cb_txn(fdb_file_handle *fhandle,
2101                  fdb_compaction_status status, const char *kv_name,
2102                  fdb_doc *doc, uint64_t old_offset, uint64_t new_offset,
2103                  void *ctx)
2104{
2105    struct cb_txn_args *args = (struct cb_txn_args *)ctx;
2106    (void) fhandle;
2107    (void) doc;
2108    (void) old_offset;
2109    (void) new_offset;
2110
2111    if (status == FDB_CS_END && !args->done) {
2112        int i;
2113        int n = 10;
2114        char keybuf[256], bodybuf[256];
2115        fdb_status s;
2116
2117        // begin transaction
2118        fdb_begin_transaction(args->file, FDB_ISOLATION_READ_COMMITTED);
2119        // insert new docs, but do not end transaction
2120        for (i=0;i<n/2;++i){
2121            sprintf(keybuf, "txn%04d", i);
2122            sprintf(bodybuf, "txn_body%04d", i);
2123            s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2124                                         bodybuf, strlen(bodybuf)+1);
2125            (void)s;
2126        }
2127        args->done = 1;
2128    }
2129
2130    return 0;
2131}
2132
2133void compaction_with_concurrent_transaction_test()
2134{
2135    TEST_INIT();
2136    memleak_start();
2137
2138    int i, r;
2139    int n = 10;
2140    size_t valuelen;
2141    char keybuf[256], bodybuf[256];
2142    fdb_file_handle *dbfile, *txn_file;
2143    fdb_kvs_handle *db, *txn;
2144    fdb_status s;
2145    fdb_config fconfig = fdb_get_default_config();
2146    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2147    void *value;
2148    struct cb_txn_args cb_args;
2149
2150    memset(&cb_args, 0x0, sizeof(struct cb_txn_args));
2151    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2152    fconfig.compaction_cb = cb_txn;
2153    fconfig.compaction_cb_ctx = &cb_args;
2154    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
2155                                 FDB_CS_END;
2156
2157    // remove previous compact_test files
2158    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2159    (void)r;
2160
2161    // open db
2162    fdb_open(&dbfile, "./compact_test1", &fconfig);
2163    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
2164
2165    fdb_open(&txn_file, "./compact_test1", &fconfig);
2166    fdb_kvs_open(txn_file, &txn, "db", &kvs_config);
2167    cb_args.file = txn_file;
2168    cb_args.handle = txn;
2169
2170    // write docs & commit
2171    for (i=0;i<n;++i){
2172        sprintf(keybuf, "key%04d", i);
2173        sprintf(bodybuf, "body%04d", i);
2174        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
2175                           bodybuf, strlen(bodybuf)+1);
2176        TEST_CHK(s == FDB_RESULT_SUCCESS);
2177    }
2178    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2179    TEST_CHK(s == FDB_RESULT_SUCCESS);
2180    cb_args.ndocs = n;
2181    cb_args.nupdates = 2;
2182
2183    s = fdb_compact(dbfile, "./compact_test2");
2184    TEST_CHK(s == FDB_RESULT_SUCCESS);
2185
2186    // insert new docs through transaction
2187    for (i=n/2;i<n;++i){
2188        sprintf(keybuf, "txn%04d", i);
2189        sprintf(bodybuf, "txn_body%04d", i);
2190        s = fdb_set_kv(txn, keybuf, strlen(keybuf)+1,
2191                            bodybuf, strlen(bodybuf)+1);
2192        TEST_CHK(s == FDB_RESULT_SUCCESS);
2193    }
2194    // all txn docs should be retrieved
2195    for (i=0;i<n;++i){
2196        sprintf(keybuf, "txn%04d", i);
2197        sprintf(bodybuf, "txn_body%04d", i);
2198        s = fdb_get_kv(txn, keybuf, strlen(keybuf)+1,
2199                            &value, &valuelen);
2200        TEST_CHK(s == FDB_RESULT_SUCCESS);
2201        TEST_CMP(value, bodybuf, valuelen);
2202        fdb_free_block(value);
2203    }
2204    s = fdb_end_transaction(txn_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
2205    TEST_CHK(s == FDB_RESULT_SUCCESS);
2206    fdb_close(txn_file);
2207
2208    s = fdb_compact(dbfile, "./compact_test3");
2209    TEST_CHK(s == FDB_RESULT_SUCCESS);
2210
2211    fdb_close(dbfile);
2212    fdb_shutdown();
2213    memleak_end();
2214    TEST_RESULT("compaction with concurrent transaction test");
2215}
2216
2217struct cb_upt_args {
2218    fdb_file_handle *file;
2219    fdb_kvs_handle *handle;
2220    fdb_kvs_handle *handle2;
2221    int ndocs;
2222    int nupdates;
2223    int done;
2224    int nmoves;
2225};
2226
2227static fdb_compact_decision cb_upt(fdb_file_handle *fhandle,
2228                                   fdb_compaction_status status,
2229                                   const char *kv_name,
2230                                   fdb_doc *doc, uint64_t old_offset,
2231                                   uint64_t new_offset,
2232                                   void *ctx)
2233{
2234    TEST_INIT();
2235    struct cb_upt_args *args = (struct cb_upt_args *)ctx;
2236    (void) fhandle;
2237    (void) doc;
2238    (void) old_offset;
2239    (void) new_offset;
2240
2241    if (status == FDB_CS_MOVE_DOC) {
2242        int i, j;
2243        int n = 10;
2244        char keybuf[256], bodybuf[256];
2245        char keystr[] = "new%04d";
2246        char valuestr[] = "new_body%04d";
2247        fdb_status s;
2248        args->nmoves++;
2249        if (args->nmoves > n/2 && !args->done) {
2250            // verify if the key is stripped off its prefix
2251            fdb_doc tdoc;
2252            memset(&tdoc, 0, sizeof(fdb_doc));
2253            tdoc.key = doc->key;
2254            tdoc.keylen = doc->keylen;
2255            s = fdb_get(args->handle, &tdoc);
2256            TEST_CHK(s == FDB_RESULT_SUCCESS);
2257            free(tdoc.meta);
2258            free(tdoc.body);
2259            // insert new docs
2260            for (i=0;i<n/2;++i){
2261                sprintf(keybuf, keystr, i);
2262                sprintf(bodybuf, valuestr, i);
2263                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2264                                             bodybuf, strlen(bodybuf)+1);
2265                TEST_CHK(s == FDB_RESULT_SUCCESS);
2266                s = fdb_commit(args->file, FDB_COMMIT_NORMAL);
2267                TEST_CHK(s == FDB_RESULT_SUCCESS);
2268
2269                for (j=0; j<=i; ++j){
2270                    void *v_out;
2271                    size_t vlen_out;
2272                    sprintf(keybuf, keystr, i);
2273                    sprintf(bodybuf, valuestr, i);
2274                    s = fdb_get_kv(args->handle2, keybuf, strlen(keybuf)+1,
2275                        &v_out, &vlen_out);
2276                    TEST_CHK(s == FDB_RESULT_SUCCESS);
2277                    fdb_free_block(v_out);
2278                }
2279            }
2280            args->done = 1;
2281        }
2282        if (args->nmoves > n && args->done == 1) {
2283            // the first phase is done. insert new docs.
2284            for (i=0;i<2;++i){
2285                sprintf(keybuf, "xxx%d", i);
2286                sprintf(bodybuf, "xxxvalue%d", i);
2287                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2288                                             bodybuf, strlen(bodybuf)+1);
2289                TEST_CHK(s == FDB_RESULT_SUCCESS);
2290                s = fdb_commit(args->file, FDB_COMMIT_NORMAL);
2291                TEST_CHK(s == FDB_RESULT_SUCCESS);
2292            }
2293            args->done = 2;
2294        }
2295        if (args->nmoves == (n*3/2 + 2) && args->done == 2) {
2296            // during the second-second phase,
2297            // insert new docs, and do not commit.
2298            for (i=0;i<2;++i){
2299                sprintf(keybuf, "zzz%d", i);
2300                sprintf(bodybuf, "zzzvalue%d", i);
2301                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2302                                             bodybuf, strlen(bodybuf)+1);
2303                TEST_CHK(s == FDB_RESULT_SUCCESS);
2304            }
2305            args->done = 3;
2306        }
2307        TEST_CMP(kv_name, "db", 2);
2308    }
2309
2310    return 0;
2311}
2312
2313void compaction_with_concurrent_update_test()
2314{
2315    TEST_INIT();
2316    memleak_start();
2317
2318    int i, r;
2319    int n = 10;
2320    char keybuf[256], bodybuf[256];
2321    fdb_file_handle *dbfile, *dbfile2, *dbfile3;
2322    fdb_kvs_handle *db, *db2, *db3;
2323    fdb_status s;
2324    fdb_config fconfig = fdb_get_default_config();
2325    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2326    fdb_kvs_info info;
2327    fdb_seqnum_t seqnum;
2328    struct cb_upt_args cb_args;
2329    void *value;
2330    size_t valuelen;
2331
2332    memset(&cb_args, 0x0, sizeof(struct cb_upt_args));
2333    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2334    fconfig.compaction_cb = cb_upt;
2335    fconfig.compaction_cb_ctx = &cb_args;
2336    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
2337                                 FDB_CS_END | FDB_CS_MOVE_DOC;
2338
2339    // remove previous compact_test files
2340    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2341    (void)r;
2342
2343    // open db
2344    fdb_open(&dbfile, "./compact_test1", &fconfig);
2345    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
2346
2347    fdb_open(&dbfile2, "./compact_test1", &fconfig);
2348    fdb_kvs_open(dbfile2, &db2, "db", &kvs_config);
2349    fdb_open(&dbfile3, "./compact_test1", &fconfig);
2350    fdb_kvs_open(dbfile3, &db3, "db", &kvs_config);
2351
2352    cb_args.file = dbfile2;
2353    cb_args.handle = db2;
2354    cb_args.handle2 = db3;
2355
2356    // write docs & commit
2357    for (i=0;i<n;++i){
2358        sprintf(keybuf, "key%04d", i);
2359        sprintf(bodybuf, "body%04d", i);
2360        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
2361                           bodybuf, strlen(bodybuf)+1);
2362        TEST_CHK(s == FDB_RESULT_SUCCESS);
2363    }
2364    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2365    TEST_CHK(s == FDB_RESULT_SUCCESS);
2366    cb_args.ndocs = n;
2367    cb_args.nupdates = 2;
2368    cb_args.nmoves = 0;
2369
2370    s = fdb_compact(dbfile, "./compact_test2");
2371    TEST_CHK(s == FDB_RESULT_SUCCESS);
2372
2373    // insert new docs after compaction
2374    for (i=n/2;i<n;++i){
2375        sprintf(keybuf, "new%04d", i);
2376        sprintf(bodybuf, "new_body%04d", i);
2377        s = fdb_set_kv(db2, keybuf, strlen(keybuf)+1,
2378                            bodybuf, strlen(bodybuf)+1);
2379        TEST_CHK(s == FDB_RESULT_SUCCESS);
2380    }
2381    // all interleaved docs should be retrieved
2382    // 1. inserted during the first phase of the compaction
2383    for (i=0;i<n;++i){
2384        sprintf(keybuf, "new%04d", i);
2385        sprintf(bodybuf, "new_body%04d", i);
2386        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2387                            &value, &valuelen);
2388        TEST_CHK(s == FDB_RESULT_SUCCESS);
2389        TEST_CMP(value, bodybuf, valuelen);
2390        fdb_free_block(value);
2391    }
2392    // 2. inserted during the second phase of the compaction
2393    for (i=0; i<2; ++i) {
2394        sprintf(keybuf, "xxx%d", i);
2395        sprintf(bodybuf, "xxxvalue%d", i);
2396        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2397                            &value, &valuelen);
2398        TEST_CHK(s == FDB_RESULT_SUCCESS);
2399        TEST_CMP(value, bodybuf, valuelen);
2400        fdb_free_block(value);
2401
2402        sprintf(keybuf, "zzz%d", i);
2403        sprintf(bodybuf, "zzzvalue%d", i);
2404        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2405                            &value, &valuelen);
2406        TEST_CHK(s == FDB_RESULT_SUCCESS);
2407        TEST_CMP(value, bodybuf, valuelen);
2408        fdb_free_block(value);
2409    }
2410
2411    fdb_close(dbfile2);
2412    fdb_close(dbfile3);
2413
2414    s = fdb_get_kvs_info(db, &info);
2415    TEST_CHK(s == FDB_RESULT_SUCCESS);
2416    seqnum = info.last_seqnum;
2417
2418    cb_args.nmoves = 0;
2419    s = fdb_compact(dbfile, "./compact_test3");
2420    TEST_CHK(s == FDB_RESULT_SUCCESS);
2421
2422    s = fdb_get_kvs_info(db, &info);
2423    TEST_CHK(s == FDB_RESULT_SUCCESS);
2424    // No docs are inserted during the compaction.
2425    // Seqnum should be same.
2426    TEST_CHK(info.last_seqnum == seqnum);
2427
2428    // insert the last document
2429    sprintf(keybuf, "last");
2430    sprintf(bodybuf, "last_value");
2431    s = fdb_set_kv(db, keybuf, strlen(keybuf)+1, bodybuf, strlen(bodybuf)+1);
2432    TEST_CHK(s == FDB_RESULT_SUCCESS);
2433    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2434    TEST_CHK(s == FDB_RESULT_SUCCESS);
2435
2436    fdb_close(dbfile);
2437    fdb_shutdown();
2438    memleak_end();
2439    TEST_RESULT("compaction with concurrent update test");
2440}
2441
2442static int compaction_del_cb(fdb_file_handle *fhandle,
2443                             fdb_compaction_status status,
2444                             const char *kv_name,
2445                             fdb_doc *doc, uint64_t old_offset,
2446                             uint64_t new_offset,
2447                             void *ctx)
2448{
2449    TEST_INIT();
2450    fdb_status s;
2451    fdb_file_handle *dbfile;
2452    fdb_kvs_handle *db = *(fdb_kvs_handle **)ctx;
2453    int i;
2454    int n = 10; // if changing this change caller function too
2455    char keybuf[256];
2456    (void) doc;
2457    (void) new_offset;
2458    (void) old_offset;
2459
2460    if (status == FDB_CS_BEGIN) {
2461        TEST_CHK(false);
2462    } else if (status == FDB_CS_END) {
2463        TEST_CHK(!kv_name);
2464        if (db) { // At end of first phase, mutate more docs...
2465            s = fdb_open(&dbfile, "./compact_test1", &fhandle->root->config);
2466            TEST_CHK(s == FDB_RESULT_SUCCESS);
2467            fdb_kvs_open_default(dbfile, &db, &db->kvs_config);
2468            for (i = 0; i < n; ++i){
2469                sprintf(keybuf, "key%d", i);
2470                s = fdb_del_kv(db, keybuf, strlen(keybuf));
2471                TEST_CHK(s == FDB_RESULT_SUCCESS);
2472            }
2473            // Now insert and delete a bunch of keys (all in WAL)
2474            for (i = 0; i < n; ++i){
2475                sprintf(keybuf, "KEY%d", i);
2476                s = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2477                TEST_CHK(s == FDB_RESULT_SUCCESS);
2478                s = fdb_del_kv(db, keybuf, strlen(keybuf));
2479                TEST_CHK(s == FDB_RESULT_SUCCESS);
2480            }
2481            sprintf(keybuf, "key%d", i);
2482            s = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2483            TEST_CHK(s == FDB_RESULT_SUCCESS);
2484            s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2485            TEST_CHK(s == FDB_RESULT_SUCCESS);
2486            fdb_close(dbfile);
2487            *(fdb_kvs_handle **)ctx = NULL; // Don't run in 2nd, 3rd phases
2488        }
2489    } else if (status == FDB_CS_FLUSH_WAL) {
2490        TEST_CHK(false);
2491    } else if (status == FDB_CS_MOVE_DOC) {
2492        TEST_CHK(false);
2493    } else { // FDB_CS_BATCH_MOVE
2494        TEST_CHK(false);
2495    }
2496    return 0;
2497}
2498
2499void compact_deleted_doc_test()
2500{
2501    TEST_INIT();
2502    memleak_start();
2503    int i, r;
2504    int n = 10; // if changing this change the callback too
2505    fdb_file_handle *dbfile;
2506    fdb_kvs_handle *db, *cb_db;
2507    fdb_doc *rdoc;
2508    fdb_status s;
2509    char keybuf[256];
2510    void *value;
2511    size_t valuelen;
2512
2513    // remove previous compact_test files
2514    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2515    (void)r;
2516
2517    fdb_config fconfig = fdb_get_default_config();
2518    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2519    fconfig.multi_kv_instances = false;
2520    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2521
2522    fconfig.compaction_cb = compaction_del_cb;
2523    fconfig.compaction_cb_ctx = &cb_db;
2524    fconfig.compaction_cb_mask = FDB_CS_END;
2525
2526    // open db
2527    fdb_open(&dbfile, "./compact_test1", &fconfig);
2528    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2529    cb_db = db;
2530
2531    for (i=0;i<n;++i){
2532        sprintf(keybuf, "key%d", i);
2533        s = fdb_set_kv(db, keybuf, strlen(keybuf),
2534                            (void*)"value", 5);
2535        TEST_CHK(s == FDB_RESULT_SUCCESS);
2536    }
2537    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2538
2539    // At end of phase 1, all documents get deleted
2540    s = fdb_compact(dbfile, "compact_test2");
2541    TEST_CHK(s == FDB_RESULT_SUCCESS);
2542    TEST_CHK(wal_get_num_flushable(dbfile->root->file) == 0);
2543
2544    for (i=0;i<n;++i){
2545        sprintf(keybuf, "key%d", i);
2546        fdb_doc_create(&rdoc, keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2547        s = fdb_get_metaonly(db, rdoc);
2548        TEST_CHK(s == FDB_RESULT_KEY_NOT_FOUND);
2549        fdb_doc_free(rdoc);
2550    }
2551
2552    // documents inserted in the middle of phase 1 should be present
2553    sprintf(keybuf, "key%d", i);
2554    s = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2555    TEST_CHK(s == FDB_RESULT_SUCCESS);
2556    fdb_free_block(value);
2557
2558    fdb_kvs_close(db);
2559    fdb_close(dbfile);
2560    fdb_shutdown();
2561    memleak_end();
2562    TEST_RESULT("compact deleted doc test");
2563}
2564
2565void compact_upto_twice_test()
2566{
2567    TEST_INIT();
2568    memleak_start();
2569    int i, r;
2570    fdb_file_handle *dbfile;
2571    fdb_kvs_handle *db;
2572    fdb_snapshot_info_t *markers;
2573    fdb_status status;
2574    uint64_t num_markers;
2575    char keybuf[256];
2576
2577    // remove previous compact_test files
2578    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2579    (void)r;
2580
2581    fdb_config fconfig = fdb_get_default_config();
2582    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2583    fconfig.wal_threshold = 1024;
2584    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2585    fconfig.block_reusing_threshold = 0;
2586
2587    // open db
2588    fdb_open(&dbfile, "./compact_test1", &fconfig);
2589    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2590
2591    for (i=0;i<10;++i){
2592        sprintf(keybuf, "key%d", i);
2593        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2594                            (void*)"value", 5);
2595        TEST_CHK(status == FDB_RESULT_SUCCESS);
2596        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2597    }
2598
2599    // compact upto twice with incrementing seqnums
2600    status = fdb_get_all_snap_markers(dbfile, &markers,
2601                                      &num_markers);
2602    TEST_CHK(status == FDB_RESULT_SUCCESS);
2603    status = fdb_compact_upto(dbfile, NULL, markers[num_markers-1].marker);
2604    TEST_CHK(status == FDB_RESULT_SUCCESS);
2605    status = fdb_free_snap_markers(markers, num_markers);
2606    TEST_CHK(status == FDB_RESULT_SUCCESS);
2607
2608    // compact upto
2609    status = fdb_get_all_snap_markers(dbfile, &markers,
2610                                      &num_markers);
2611    TEST_CHK(status == FDB_RESULT_SUCCESS);
2612    status = fdb_compact_upto(dbfile, NULL, markers[num_markers-2].marker);
2613    TEST_CHK(status == FDB_RESULT_SUCCESS);
2614    status = fdb_free_snap_markers(markers, num_markers);
2615    TEST_CHK(status == FDB_RESULT_SUCCESS);
2616
2617    fdb_close(dbfile);
2618    fdb_shutdown();
2619    memleak_end();
2620    TEST_RESULT("compact upto twice");
2621}
2622
2623void wal_delete_compact_upto_test()
2624{
2625    TEST_INIT();
2626    memleak_start();
2627    int i, r;
2628    fdb_file_handle *dbfile;
2629    fdb_kvs_handle *db;
2630    fdb_snapshot_info_t *markers;
2631    fdb_status status;
2632    uint64_t num_markers;
2633    fdb_kvs_info kvs_info;
2634    char keybuf[256];
2635    void *value;
2636    size_t valuelen;
2637
2638    // remove previous compact_test files
2639    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2640    (void)r;
2641
2642    fdb_config fconfig = fdb_get_default_config();
2643    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2644    fconfig.wal_threshold = 19;
2645    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2646
2647    // open db
2648    fdb_open(&dbfile, "./compact_test5", &fconfig);
2649    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2650
2651    // insert a few keys...
2652    for (i=0;i<10;++i){
2653        sprintf(keybuf, "key%d", i);
2654        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2655                            (void*)"value", 5);
2656        TEST_CHK(status == FDB_RESULT_SUCCESS);
2657    }
2658
2659    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2660    TEST_CHK(status == FDB_RESULT_SUCCESS);
2661
2662    // insert keys such that last insert causes wal flush...
2663    for (; i<20;++i){
2664        sprintf(keybuf, "key%d", i);
2665        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2666                            (void*)"value", 5);
2667        TEST_CHK(status == FDB_RESULT_SUCCESS);
2668    }
2669    // Now delete half of the newly inserted keys
2670    for (i = 15; i < 20; ++i){
2671        sprintf(keybuf, "key%d", i);
2672        status = fdb_del_kv(db, keybuf, strlen(keybuf));
2673        TEST_CHK(status == FDB_RESULT_SUCCESS);
2674    }
2675    // Now deleted items are in the unflushed wal..
2676    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2677    TEST_CHK(status == FDB_RESULT_SUCCESS);
2678
2679    // Create another header for compact_upto()
2680    sprintf(keybuf, "key%d", i);
2681    status = fdb_set_kv(db, keybuf, strlen(keybuf),
2682            (void*)"value", 5);
2683    TEST_CHK(status == FDB_RESULT_SUCCESS);
2684    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2685    TEST_CHK(status == FDB_RESULT_SUCCESS);
2686
2687    // Get all snap markers..
2688    status = fdb_get_all_snap_markers(dbfile, &markers,
2689                                      &num_markers);
2690    TEST_CHK(status == FDB_RESULT_SUCCESS);
2691
2692    // compact upto the delete wal's marker...
2693    status = fdb_compact_upto(dbfile, NULL, markers[1].marker);
2694    TEST_CHK(status == FDB_RESULT_SUCCESS);
2695    status = fdb_free_snap_markers(markers, num_markers);
2696    TEST_CHK(status == FDB_RESULT_SUCCESS);
2697
2698    status = fdb_get_kvs_info(db, &kvs_info);
2699    TEST_CHK(status == FDB_RESULT_SUCCESS);
2700
2701    // Deleted items should not be found..
2702    for (i = 15; i < 20; ++i){
2703        sprintf(keybuf, "key%d", i);
2704        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2705        TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2706    }
2707
2708    fdb_kvs_close(db);
2709    fdb_close(dbfile);
2710    fdb_shutdown();
2711    memleak_end();
2712    TEST_RESULT("compact upto with wal deletes test");
2713}
2714
2715void compact_upto_post_snapshot_test()
2716{
2717    TEST_INIT();
2718    memleak_start();
2719    int i, r;
2720    fdb_file_handle *dbfile;
2721    fdb_kvs_handle *db, *snap_db;
2722    fdb_snapshot_info_t *markers;
2723    fdb_status status;
2724    fdb_iterator *iterator;
2725    fdb_doc *rdoc = NULL;
2726    fdb_kvs_info kvs_info;
2727    uint64_t num_markers;
2728    char keybuf[256];
2729
2730    // remove previous compact_test files
2731    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2732    (void)r;
2733
2734    fdb_config fconfig = fdb_get_default_config();
2735    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2736    fconfig.wal_threshold = 1024;
2737    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2738    // prevent block reusing to keep snapshots
2739    fconfig.block_reusing_threshold = 0;
2740
2741    // open db
2742    fdb_open(&dbfile, "./compact_test1", &fconfig);
2743    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2744
2745    for (i=0;i<10;++i){
2746        sprintf(keybuf, "key%d", i);
2747        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2748                            (void*)"value", 5);
2749        TEST_CHK(status == FDB_RESULT_SUCCESS);
2750        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2751    }
2752
2753    // check  db info
2754    status = fdb_get_kvs_info(db, &kvs_info);
2755    TEST_CHK(status == FDB_RESULT_SUCCESS);
2756    TEST_CHK(kvs_info.last_seqnum == 10);
2757
2758    // open snapshot at seqnum 5
2759    status = fdb_snapshot_open(db, &snap_db, 5);
2760    TEST_CHK(status == FDB_RESULT_SUCCESS);
2761
2762    // check snap info
2763    status = fdb_get_kvs_info(snap_db, &kvs_info);
2764    TEST_CHK(status == FDB_RESULT_SUCCESS);
2765    TEST_CHK(kvs_info.last_seqnum == 5);
2766
2767
2768    // compact upto
2769    status = fdb_get_all_snap_markers(dbfile, &markers,
2770                                      &num_markers);
2771    TEST_CHK(status == FDB_RESULT_SUCCESS);
2772    status = fdb_compact_upto(dbfile, NULL, markers[5].marker);
2773    TEST_CHK(status == FDB_RESULT_SUCCESS);
2774    status = fdb_free_snap_markers(markers, num_markers);
2775    TEST_CHK(status == FDB_RESULT_SUCCESS);
2776
2777    // check db and snap info
2778    status = fdb_get_kvs_info(db, &kvs_info);
2779    TEST_CHK(status == FDB_RESULT_SUCCESS);
2780    TEST_CHK(kvs_info.last_seqnum == 10);
2781    status = fdb_get_kvs_info(snap_db, &kvs_info);
2782    TEST_CHK(status == FDB_RESULT_SUCCESS);
2783    TEST_CHK(kvs_info.last_seqnum == 5);
2784
2785
2786    // iterate over snapshot
2787    status = fdb_iterator_init(snap_db,
2788                               &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
2789    TEST_CHK(status == FDB_RESULT_SUCCESS);
2790    i = 0;
2791    do {
2792        status = fdb_iterator_get(iterator, &rdoc);
2793        TEST_CHK(status == FDB_RESULT_SUCCESS);
2794        fdb_doc_free(rdoc);
2795        rdoc = NULL;
2796        i++;
2797
2798        // check db and snap info
2799        status = fdb_get_kvs_info(db, &kvs_info);
2800        TEST_CHK(status == FDB_RESULT_SUCCESS);
2801        TEST_CHK(kvs_info.last_seqnum == 10);
2802        status = fdb_get_kvs_info(snap_db, &kvs_info);
2803        TEST_CHK(status == FDB_RESULT_SUCCESS);
2804        TEST_CHK(kvs_info.last_seqnum == 5);
2805    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
2806
2807    TEST_CHK(i == 5);
2808
2809    fdb_iterator_close(iterator);
2810    fdb_kvs_close(snap_db);
2811    fdb_close(dbfile);
2812    fdb_shutdown();
2813    memleak_end();
2814    TEST_RESULT("compact upto post snapshot test");
2815}
2816
2817void compact_upto_overwrite_test(int opt)
2818{
2819    TEST_INIT();
2820
2821    int n = 10, value_len=32;
2822    int i, r, idx, c;
2823    char cmd[256];
2824    char key[256], *value;
2825    char keystr[] = "key%06d";
2826    char valuestr[] = "value%08d";
2827    char valuestr2[] = "updated_value%08d";
2828    fdb_file_handle *db_file;
2829    fdb_kvs_handle *db, *snap;
2830    fdb_config config;
2831    fdb_kvs_config kvs_config;
2832    fdb_status s;
2833    fdb_kvs_info kvs_info;
2834    fdb_iterator *fit;
2835    fdb_doc *doc;
2836    fdb_snapshot_info_t *markers;
2837    fdb_seqnum_t seqnum;
2838    fdb_commit_opt_t commit_opt;
2839    uint64_t n_markers;
2840
2841    sprintf(cmd, SHELL_DEL " compact_test* > errorlog.txt");
2842    r = system(cmd);
2843    (void)r;
2844
2845    memleak_start();
2846
2847    value = (char*)malloc(value_len);
2848
2849    config = fdb_get_default_config();
2850    config.durability_opt = FDB_DRB_ASYNC;
2851    config.seqtree_opt = FDB_SEQTREE_USE;
2852    config.wal_flush_before_commit = true;
2853    config.multi_kv_instances = true;
2854    config.buffercache_size = 0;
2855    config.block_reusing_threshold = 0;
2856
2857    commit_opt = (opt)?FDB_COMMIT_NORMAL:FDB_COMMIT_MANUAL_WAL_FLUSH;
2858
2859    kvs_config = fdb_get_default_kvs_config();
2860
2861    s = fdb_open(&db_file, "./compact_test", &config);
2862    TEST_CHK(s == FDB_RESULT_SUCCESS);
2863    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
2864    TEST_CHK(s == FDB_RESULT_SUCCESS);
2865
2866    // === write ===
2867    for (i=0;i<n;++i){
2868        idx = i % (n/2);
2869        sprintf(key, keystr, idx);
2870        memset(value, 'x', value_len);
2871        memcpy(value + value_len - 6, "<end>", 6);
2872        if (i < (n/2)) {
2873            sprintf(value, valuestr, idx);
2874        } else {
2875            sprintf(value, valuestr2, idx);
2876        }
2877        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
2878        TEST_CHK(s == FDB_RESULT_SUCCESS);
2879
2880        if (opt == 2) {
2881            // HB+trie, WAL, HB+trie, WAL...
2882            commit_opt = (i%2)?FDB_COMMIT_NORMAL:FDB_COMMIT_MANUAL_WAL_FLUSH;
2883        } else if (opt == 3) {
2884            // WAL, HB+trie, WAL, HB+trie...
2885            commit_opt = (i%2)?FDB_COMMIT_MANUAL_WAL_FLUSH:FDB_COMMIT_NORMAL;
2886        }
2887
2888        s = fdb_commit(db_file, commit_opt);
2889        TEST_CHK(s == FDB_RESULT_SUCCESS);
2890        s = fdb_get_kvs_info(db, &kvs_info);
2891        TEST_CHK(s == FDB_RESULT_SUCCESS);
2892    }
2893
2894    s = fdb_get_all_snap_markers(db_file, &markers, &n_markers);
2895    TEST_CHK(s == FDB_RESULT_SUCCESS);
2896
2897    int upto = n_markers/2;
2898    s = fdb_compact_upto(db_file, "./compact_test2", markers[upto].marker);
2899    TEST_CHK(s == FDB_RESULT_SUCCESS);
2900
2901    // iterating using snapshots with various seqnums
2902    for (i=n_markers-1; i>=0; --i) {
2903        seqnum = markers[i].kvs_markers->seqnum;
2904
2905        s = fdb_snapshot_open(db, &snap, seqnum);
2906        if (i<=upto) {
2907            TEST_CHK(s == FDB_RESULT_SUCCESS);
2908        } else {
2909            // seqnum < (n/2) must fail
2910            TEST_CHK(s != FDB_RESULT_SUCCESS);
2911            continue;
2912        }
2913
2914        s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0);
2915        TEST_CHK(s == FDB_RESULT_SUCCESS);
2916        c = 0;
2917        do {
2918            doc = NULL;
2919            s = fdb_iterator_get(fit, &doc);
2920            if (s != FDB_RESULT_SUCCESS) break;
2921            idx = c;
2922            sprintf(key, keystr, idx);
2923            memset(value, 'x', value_len);
2924            memcpy(value + value_len - 6, "<end>", 6);
2925            if ((fdb_seqnum_t)c >= seqnum-(n/2)) {
2926                sprintf(value, valuestr, idx);
2927            } else {
2928                sprintf(value, valuestr2, idx);
2929            }
2930            TEST_CMP(doc->key, key, doc->keylen);
2931            TEST_CMP(doc->body, value, doc->bodylen);
2932            c++;
2933            fdb_doc_free(doc);
2934        } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
2935        s = fdb_iterator_close(fit);
2936        TEST_CHK(s == FDB_RESULT_SUCCESS);
2937        TEST_CHK(c == (n/2));
2938
2939        s = fdb_kvs_close(snap);
2940        TEST_CHK(s == FDB_RESULT_SUCCESS);
2941    }
2942
2943    // iterating using the original handle
2944    s = fdb_iterator_init(db, &fit, NULL, 0, NULL, 0, 0);
2945    TEST_CHK(s == FDB_RESULT_SUCCESS);
2946    c = 0;
2947    do {
2948        doc = NULL;
2949        s = fdb_iterator_get(fit, &doc);
2950        if (s != FDB_RESULT_SUCCESS) break;
2951        idx = c;
2952        sprintf(key, keystr, idx);
2953        memset(value, 'x', value_len);
2954        memcpy(value + value_len - 6, "<end>", 6);
2955        sprintf(value, valuestr2, idx);
2956        TEST_CMP(doc->key, key, doc->keylen);
2957        TEST_CMP(doc->body, value, doc->bodylen);
2958        c++;
2959        fdb_doc_free(doc);
2960    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
2961    s = fdb_iterator_close(fit);
2962    TEST_CHK(s == FDB_RESULT_SUCCESS);
2963    TEST_CHK(c == (n/2));
2964
2965    s = fdb_free_snap_markers(markers, n_markers);
2966    TEST_CHK(s == FDB_RESULT_SUCCESS);
2967    free(value);
2968    s = fdb_close(db_file);
2969    TEST_CHK(s == FDB_RESULT_SUCCESS);
2970    s = fdb_shutdown();
2971    TEST_CHK(s == FDB_RESULT_SUCCESS);
2972
2973    memleak_end();
2974
2975    sprintf(cmd, "compact upto overwrite test");
2976    if (opt == 0) {
2977        strcat(cmd, " (HB+trie)");
2978    } else if (opt == 1) {
2979        strcat(cmd, " (WAL)");
2980    } else if (opt == 2) {
2981        strcat(cmd, " (mixed, HB+trie/WAL)");
2982    } else if (opt == 3) {
2983        strcat(cmd, " (mixed, WAL/HB+trie)");
2984    }
2985    TEST_RESULT(cmd);
2986}
2987static int compaction_cb_get(fdb_file_handle *fhandle,
2988                         fdb_compaction_status status, const char *kv_name,
2989                         fdb_doc *doc, uint64_t old_offset,
2990                         uint64_t new_offset, void *ctx)
2991{
2992    TEST_INIT();
2993    fdb_status s;
2994    struct cb_args *args = (struct cb_args *)ctx;
2995    fdb_kvs_handle *snap_db;
2996    fdb_snapshot_info_t *markers;
2997    uint64_t num_markers;
2998    fdb_seqnum_t seqno;
2999
3000    (void) doc;
3001    (void) old_offset;
3002    (void) new_offset;
3003
3004    if (status == FDB_CS_MOVE_DOC) {
3005        TEST_CHK(kv_name);
3006    } else {
3007        TEST_CHK(!kv_name);
3008    }
3009
3010    // use snap markers
3011    s = fdb_get_all_snap_markers(fhandle, &markers, &num_markers);
3012    TEST_CHK(s == FDB_RESULT_SUCCESS);
3013    seqno = markers[0].kvs_markers[0].seqnum;
3014
3015    // snapshot open
3016    s = fdb_snapshot_open(args->handle, &snap_db, seqno);
3017    TEST_CHK(s==FDB_RESULT_SUCCESS);
3018    s = fdb_kvs_close(snap_db);
3019    TEST_CHK(s==FDB_RESULT_SUCCESS);
3020    return 0;
3021}
3022
3023void compact_with_snapshot_open_test()
3024{
3025  TEST_INIT();
3026  memleak_start();
3027
3028  int i, r;
3029  int n = 100000;
3030  char keybuf[256], bodybuf[256];
3031  fdb_file_handle *dbfile;
3032  fdb_kvs_handle *db, *db2, *snap_db;
3033  fdb_status s;
3034  fdb_config fconfig = fdb_get_default_config();
3035  fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3036  struct cb_args cb_args;
3037
3038  memset(&cb_args, 0x0, sizeof(struct cb_args));
3039  fconfig.wal_threshold = 1024;
3040  fconfig.flags = FDB_OPEN_FLAG_CREATE;
3041  fconfig.compaction_cb = compaction_cb_get;
3042  fconfig.compaction_cb_ctx = &cb_args;
3043  fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3044                               FDB_CS_MOVE_DOC |
3045                               FDB_CS_FLUSH_WAL |
3046                               FDB_CS_END;
3047  // remove previous compact_test files
3048  r = system(SHELL_DEL" compact_test* > errorlog.txt");
3049  (void)r;
3050
3051  // open two handles for kvs
3052  fdb_open(&dbfile, "./compact_test1", &fconfig);
3053  fdb_kvs_open(dbfile, &db, "db", &kvs_config);
3054  fdb_kvs_open(dbfile, &db2, "db", &kvs_config);
3055  for (i=0;i<n;++i){
3056      sprintf(keybuf, "key%04d", i);
3057      sprintf(bodybuf, "body%04d", i);
3058      s = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3059      TEST_CHK(s == FDB_RESULT_SUCCESS);
3060  }
3061
3062
3063  // commit
3064  s = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3065  TEST_CHK(s == FDB_RESULT_SUCCESS);
3066
3067  // point compact callback handle to db2
3068  cb_args.handle = db2;
3069
3070  // compact
3071  s = fdb_compact(dbfile, NULL);
3072  TEST_CHK(s == FDB_RESULT_SUCCESS);
3073
3074  // open compaction end
3075  s = fdb_snapshot_open(db2, &snap_db, n);
3076  TEST_CHK(s == FDB_RESULT_SUCCESS);
3077
3078  fdb_kvs_close(snap_db);
3079  s = fdb_close(dbfile);
3080  TEST_CHK(s == FDB_RESULT_SUCCESS);
3081  s = fdb_shutdown();
3082  TEST_CHK(s == FDB_RESULT_SUCCESS);
3083  TEST_RESULT("compact with snapshot_open test");
3084}
3085
3086static int compaction_cb_markers(fdb_file_handle *fhandle,
3087                         fdb_compaction_status status, const char *kv_name,
3088                         fdb_doc *doc, uint64_t old_offset,
3089                         uint64_t new_offset, void *ctx)
3090{
3091    TEST_INIT();
3092    uint64_t i, j;
3093    fdb_status s;
3094    fdb_kvs_handle *db, *snap_db;
3095    fdb_snapshot_info_t *markers;
3096    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3097    fdb_seqnum_t seqno;
3098    uint64_t num_markers;
3099    struct cb_args *args = (struct cb_args *)ctx;
3100    uint64_t n = (uint64_t) args->n_moved_docs;
3101
3102    (void) status;
3103    (void) doc;
3104    (void) old_offset;
3105    (void) new_offset;
3106
3107    // get snap markers
3108    s = fdb_get_all_snap_markers(fhandle, &markers, &num_markers);
3109    TEST_CHK(s == FDB_RESULT_SUCCESS);
3110    TEST_CHK(markers[0].num_kvs_markers == 5);
3111    seqno = markers[0].kvs_markers[0].seqnum;
3112    TEST_CHK(seqno == n);
3113
3114    // snapshot open for each kvs for each marker
3115    for(j=0;j<num_markers;++j){
3116        for (i=0;i<(uint64_t)markers[j].num_kvs_markers;++i){
3117            // open kv for this marker
3118            fdb_kvs_open(fhandle, &db,
3119                         markers[j].kvs_markers[i].kv_store_name, &kvs_config);
3120            // snapshot the kv
3121            s = fdb_snapshot_open(db, &snap_db, seqno);
3122            TEST_CHK(s==FDB_RESULT_SUCCESS);
3123
3124            s = fdb_kvs_close(snap_db);
3125            TEST_CHK(s==FDB_RESULT_SUCCESS);
3126            s = fdb_kvs_close(db);
3127            TEST_CHK(s==FDB_RESULT_SUCCESS);
3128        }
3129    }
3130    return 0;
3131}
3132
3133void compact_with_snapshot_open_multi_kvs_test()
3134{
3135    TEST_INIT();
3136    memleak_start();
3137
3138    int i, j, r;
3139    int n = 1000;
3140    char keybuf[256], bodybuf[256];
3141    fdb_file_handle *dbfile;
3142    fdb_kvs_handle *db1, *db2, *db3, *db4, *db5;
3143    fdb_status s;
3144    fdb_config fconfig = fdb_get_default_config();
3145    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3146    struct cb_args cb_args;
3147
3148    memset(&cb_args, 0x0, sizeof(struct cb_args));
3149
3150    fconfig.wal_threshold = 1024;
3151    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3152    fconfig.compaction_cb = compaction_cb_markers;
3153    fconfig.compaction_cb_ctx = &cb_args;
3154    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3155                                 FDB_CS_MOVE_DOC |
3156                                 FDB_CS_FLUSH_WAL |
3157                                 FDB_CS_END;
3158
3159    // remove previous compact_test files
3160    r = system(SHELL_DEL" compact_test* > errorlog.txt");
3161    (void)r;
3162
3163    // open two handles for kvs
3164    fdb_open(&dbfile, "./compact_test1", &fconfig);
3165    fdb_kvs_open(dbfile, &db1, "db1", &kvs_config);
3166    fdb_kvs_open(dbfile, &db2, "db2", &kvs_config);
3167    fdb_kvs_open(dbfile, &db3, "db3", &kvs_config);
3168    fdb_kvs_open(dbfile, &db4, "db4", &kvs_config);
3169    fdb_kvs_open(dbfile, &db5, "db5", &kvs_config);
3170    for (j=0;j<5;++j){
3171        for (i=0;i<n;++i){
3172            sprintf(keybuf, "key%04d", i);
3173            sprintf(bodybuf, "body%04d", i);
3174            fdb_set_kv(db1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3175            fdb_set_kv(db2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3176            fdb_set_kv(db3, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3177            fdb_set_kv(db4, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3178            fdb_set_kv(db5, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3179        }
3180
3181        // commit
3182        s = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3183        TEST_CHK(s == FDB_RESULT_SUCCESS);
3184
3185        // compact
3186        cb_args.n_moved_docs = n*(j+1);
3187        s = fdb_compact(dbfile, NULL);
3188        TEST_CHK(s == FDB_RESULT_SUCCESS);
3189    }
3190
3191    s = fdb_close(dbfile);
3192    TEST_CHK(s == FDB_RESULT_SUCCESS);
3193    s = fdb_shutdown();
3194    TEST_CHK(s == FDB_RESULT_SUCCESS);
3195    TEST_RESULT("compact with snapshot_open multi kvs test");
3196}
3197
3198
// Flags shared between the compaction callback and the test thread.
// cb_cancel_test() sets them to 1 when it is invoked with FDB_CS_BEGIN and
// FDB_CS_END respectively; compaction_cancellation_test() polls them to wait
// for those points. Both start at 0.
static atomic_uint8_t cancel_test_signal_begin(0);
static atomic_uint8_t cancel_test_signal_end(0);
3201
3202static int cb_cancel_test(fdb_file_handle *fhandle,
3203                          fdb_compaction_status status, const char *kv_name,
3204                          fdb_doc *doc, uint64_t old_offset, uint64_t new_offset,
3205                          void *ctx)
3206{
3207    (void) fhandle;
3208    (void) kv_name;
3209    (void) doc;
3210    (void) old_offset;
3211    (void) new_offset;
3212    (void) ctx;
3213
3214    if (status == FDB_CS_BEGIN) {
3215        cancel_test_signal_begin = 1;
3216    } else if (status == FDB_CS_END) {
3217        cancel_test_signal_end = 1;
3218    }
3219
3220    return 0;
3221}
3222
3223void *db_compact_during_compaction_cancellation(void *args)
3224{
3225
3226    TEST_INIT();
3227
3228    fdb_file_handle *dbfile;
3229    fdb_status status;
3230    fdb_config config;
3231
3232    // Open Database File
3233    config = fdb_get_default_config();
3234    config.compaction_cb = cb_cancel_test;
3235    config.compaction_cb_ctx = NULL;
3236    config.compaction_cb_mask = FDB_CS_BEGIN | FDB_CS_END;
3237
3238    status = fdb_open(&dbfile, "compact_test", &config);
3239    TEST_CHK(status == FDB_RESULT_SUCCESS);
3240    // compaction thread enters here
3241    status = fdb_compact(dbfile, NULL);
3242    TEST_CHK(status == FDB_RESULT_SUCCESS ||
3243             status == FDB_RESULT_COMPACTION_CANCELLATION ||
3244             status == FDB_RESULT_FAIL_BY_ROLLBACK);
3245    fdb_close(dbfile);
3246
3247    // shutdown
3248    thread_exit(0);
3249    return NULL;
3250}
3251
// Selects which scenario compaction_cancellation_test() runs while a
// compaction is in progress on another thread.
typedef enum {
    // the test calls fdb_cancel_compaction() on the file
    COMPACTION_CANCEL_MODE = 0,
    // the test appends more mutations and then calls fdb_rollback()
    COMPACTION_ROLLBACK_MODE = 1
} compaction_test_mode;
3256
3257void compaction_cancellation_test(compaction_test_mode mode)
3258{
3259    TEST_INIT();
3260
3261    memleak_start();
3262
3263    int i, r, n=100000;
3264    fdb_file_handle *file;
3265    fdb_kvs_handle *kvs;
3266    fdb_status status;
3267    fdb_config config;
3268    fdb_kvs_config kvs_config;
3269
3270    r = system(SHELL_DEL" compact_test* > errorlog.txt");
3271    (void)r;
3272
3273    // Open Database File
3274    config = fdb_get_default_config();
3275    status = fdb_open(&file, "compact_test", &config);
3276    TEST_CHK(status == FDB_RESULT_SUCCESS);
3277
3278    // Open KV Store
3279    kvs_config = fdb_get_default_kvs_config();
3280    status = fdb_kvs_open_default(file, &kvs, &kvs_config);
3281    TEST_CHK(status == FDB_RESULT_SUCCESS);
3282
3283    // Load kv pairs
3284    for(i=0;i<n;i++) {
3285        char str[15];
3286        sprintf(str, "%d", i);
3287        status = fdb_set_kv(kvs, str, strlen(str), (void*)"value", 5);
3288        TEST_CHK(status == FDB_RESULT_SUCCESS);
3289        // Commit every 100 SETs
3290        if (i % 100 == 0) {
3291            status = fdb_commit(file, FDB_COMMIT_NORMAL);
3292            TEST_CHK(status == FDB_RESULT_SUCCESS);
3293        }
3294    }
3295
3296    thread_t tid;
3297    void *thread_ret;
3298    bool rollback_failed = false;
3299
3300    thread_create(&tid, db_compact_during_compaction_cancellation, NULL);
3301
3302    // wait until compaction begins
3303    while (!cancel_test_signal_begin) {
3304        usleep(1000); // Sleep for 1ms
3305    }
3306
3307    if (mode == COMPACTION_ROLLBACK_MODE) {
3308        // append more mutations
3309        for(i=0;i<n/10;i++) {
3310            char str[15];
3311            sprintf(str, "%d", i);
3312            status = fdb_set_kv(kvs, str, strlen(str), (void*)"value", 5);
3313            TEST_CHK(status == FDB_RESULT_SUCCESS);
3314            // Commit every 100 SETs
3315            if (i % 100 == 0) {
3316                status = fdb_commit(file, FDB_COMMIT_NORMAL);
3317                TEST_CHK(status == FDB_RESULT_SUCCESS);
3318            }
3319        }
3320    }
3321
3322    if (mode == COMPACTION_CANCEL_MODE) {
3323        // Cancel the compaction task
3324        status = fdb_cancel_compaction(file);
3325        TEST_CHK(status == FDB_RESULT_SUCCESS);
3326    } else if (mode == COMPACTION_ROLLBACK_MODE) {
3327        // Rollback
3328
3329        // wait until the 1st phase is done
3330        while (!cancel_test_signal_end) {
3331            usleep(1000); // Sleep for 1ms
3332        }
3333
3334        status = fdb_rollback(&kvs, n/2 + 1);
3335        if (status != FDB_RESULT_SUCCESS) {
3336            // if compactor thread is done before reaching this line,
3337            // rollback may fail.
3338            rollback_failed = true;
3339        }
3340    }
3341    // join compactor
3342    thread_join(tid, &thread_ret);
3343
3344    // if rollback failed, we don't need to compact the file again.
3345    if (!rollback_failed) {
3346        // Compact the database
3347        status = fdb_compact(file, NULL);
3348        TEST_CHK(status == FDB_RESULT_SUCCESS);
3349    }
3350
3351    char key[15];
3352    void *value;
3353    size_t val_size;
3354
3355    for(i=0;i<n;i++) {
3356        sprintf(key, "%d", i);
3357        status = fdb_get_kv(kvs, key, strlen(key), &value, &val_size);
3358        if (mode == COMPACTION_CANCEL_MODE) {
3359            // compaction cancel mode: all docs should be retrieved
3360            TEST_CHK(status == FDB_RESULT_SUCCESS);
3361            fdb_free_block(value);
3362        } else if (mode == COMPACTION_ROLLBACK_MODE) {
3363            // rollback mode: only the first half docs should be retrieved
3364            if (i<=n/2 || rollback_failed) {
3365                // if rollback failed, all doc should be retrieved
3366                TEST_CHK(status == FDB_RESULT_SUCCESS);
3367                fdb_free_block(value);
3368            } else {
3369                TEST_CHK(status != FDB_RESULT_SUCCESS);
3370            }
3371        }
3372    }
3373
3374    status = fdb_close(file);
3375    TEST_CHK(status == FDB_RESULT_SUCCESS);
3376    status = fdb_shutdown();
3377    TEST_CHK(status == FDB_RESULT_SUCCESS);
3378
3379    memleak_end();
3380
3381    if (mode == COMPACTION_CANCEL_MODE) {
3382        TEST_RESULT("compaction cancellation test");
3383    } else if (mode == COMPACTION_ROLLBACK_MODE) {
3384        TEST_RESULT("rollback during the 2nd phase of compaction test");
3385    }
3386}
3387
3388void compact_upto_with_circular_reuse_test()
3389{
3390    TEST_INIT();
3391    int batch=100, n_batch=64, n_dbs=3, i, j, r, k, idx;
3392    int n_repeat = 4;
3393    fdb_file_handle *dbfile;
3394    fdb_kvs_handle *db[3];
3395    fdb_config config;
3396    fdb_kvs_config kvs_config;
3397    fdb_doc *doc;
3398    fdb_status s; (void)s;
3399    char keybuf[256], valuebuf[512];
3400
3401    memleak_start();
3402
3403    // remove previous dummy files
3404    r = system(SHELL_DEL" compact_test* > errorlog.txt");
3405    (void)r;
3406
3407    config = fdb_get_default_config();
3408    config.buffercache_size = 0;
3409    config.num_keeping_headers = 10;
3410    kvs_config = fdb_get_default_kvs_config();
3411
3412    // create a file
3413    s = fdb_open(&dbfile, "compact_test", &config);
3414    TEST_CHK(s == FDB_RESULT_SUCCESS);
3415
3416    s = fdb_kvs_open(dbfile, &db[0], NULL, &kvs_config);
3417    TEST_CHK(s == FDB_RESULT_SUCCESS);
3418
3419    s = fdb_kvs_open(dbfile, &db[1], "db1", &kvs_config);
3420    TEST_CHK(s == FDB_RESULT_SUCCESS);
3421
3422    s = fdb_kvs_open(dbfile, &db[2], "db2", &kvs_config);
3423    TEST_CHK(s == FDB_RESULT_SUCCESS);
3424
3425    memset(valuebuf, 'x', 256);
3426    valuebuf[256] = 0;
3427    for (k=0; k<n_repeat; ++k) {
3428        for (r=0; r<n_batch; ++r) {
3429            for (j=1; j<n_dbs; ++j) {
3430                for (i=0; i<batch; ++i) {
3431                    idx = r*batch + i;
3432                    sprintf(keybuf, "k%06d", idx);
3433                    sprintf(valuebuf, "v%d_%04d_%d", j, idx, k);
3434                    s = fdb_doc_create(&doc, keybuf, 8, NULL, 0, valuebuf, 257);
3435                    TEST_CHK(s == FDB_RESULT_SUCCESS);
3436                    s = fdb_set(db[j], doc);
3437                    TEST_CHK(s == FDB_RESULT_SUCCESS);
3438                    s = fdb_doc_free(doc);
3439                    TEST_CHK(s == FDB_RESULT_SUCCESS);
3440                }
3441            }
3442            s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3443            TEST_CHK(s == FDB_RESULT_SUCCESS);
3444        }
3445    }
3446
3447    fdb_snapshot_info_t *markers_out;
3448    uint64_t num_markers;
3449
3450    s = fdb_get_all_snap_markers(dbfile, &markers_out, &num_markers);
3451    TEST_CHK(s == FDB_RESULT_SUCCESS);
3452
3453    // num_markers should be equal to or smaller than num_keeping_headers
3454    TEST_CHK(num_markers <= config.num_keeping_headers);
3455
3456    s = fdb_compact_upto(dbfile, "compact_test_compact", markers_out[5].marker);
3457    TEST_CHK(s == FDB_RESULT_SUCCESS);
3458
3459    s = fdb_free_snap_markers(markers_out, num_markers);
3460    TEST_CHK(s == FDB_RESULT_SUCCESS);
3461
3462    // all docs should be retrieved correclty.
3463    k = n_repeat - 1;
3464    for (r=0; r<n_batch; ++r) {
3465        for (j=1; j<n_dbs; ++j) {
3466            for (i=0; i<batch; ++i) {
3467                idx = r*batch + i;
3468                sprintf(keybuf, "k%06d", idx);
3469                sprintf(valuebuf, "v%d_%04d_%d", j, idx, k);
3470                s = fdb_doc_create(&doc, keybuf, 8, NULL, 0, NULL, 0);
3471                TEST_CHK(s == FDB_RESULT_SUCCESS);
3472                s = fdb_get(db[j], doc);
3473                TEST_CHK(s == FDB_RESULT_SUCCESS);
3474                TEST_CMP(doc->body, valuebuf, doc->bodylen);
3475                s = fdb_doc_free(doc);
3476                TEST_CHK(s == FDB_RESULT_SUCCESS);
3477            }
3478        }
3479        s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
3480        TEST_CHK(s == FDB_RESULT_SUCCESS);
3481    }
3482
3483    s =