1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <unistd.h>
25#include <sys/types.h>
26#include <sys/stat.h>
27#endif
28
29#include "libforestdb/forestdb.h"
30#include "test.h"
31
32#include "internal_types.h"
33#include "wal.h"
34#include "functional_util.h"
35#include "filemgr.h"
36
37#undef THREAD_SANITIZER
38#if __clang__
39#   if defined(__has_feature) && __has_feature(thread_sanitizer)
40#define THREAD_SANITIZER
41#   endif
42#endif
43
44struct cb_args {
45    int n_moved_docs;
46    int n_batch_move;
47    bool begin;
48    bool end;
49    bool wal_flush;
50    fdb_kvs_handle *handle;
51};
52
53static fdb_compact_decision compaction_cb(fdb_file_handle *fhandle,
54                            fdb_compaction_status status, const char *kv_name,
55                            fdb_doc *doc, uint64_t old_offset,
56                            uint64_t new_offset,
57                            void *ctx)
58{
59    TEST_INIT();
60    fdb_doc *rdoc;
61    fdb_status s;
62    fdb_compact_decision ret = FDB_CS_KEEP_DOC;
63    struct cb_args *args = (struct cb_args *)ctx;
64
65    (void) doc;
66    (void) new_offset;
67
68    if (status == FDB_CS_BEGIN) {
69        args->begin = true;
70    } else if (status == FDB_CS_END) {
71        args->end = true;
72    } else if (status == FDB_CS_FLUSH_WAL) {
73        args->wal_flush = true;
74    } else if (status == FDB_CS_MOVE_DOC) {
75        if (doc->deleted) {
76            ret = FDB_CS_DROP_DOC;
77        }
78
79        args->n_moved_docs++;
80        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
81        rdoc->offset = old_offset;
82        s = fdb_get_byoffset(args->handle, rdoc);
83        TEST_CHK(s == FDB_RESULT_SUCCESS);
84        fdb_doc_free(rdoc);
85
86        if (fhandle->root->config.multi_kv_instances) {
87            TEST_CMP(kv_name, "db", 2);
88        } else {
89            TEST_CMP(kv_name, "default", 7);
90        }
91    } else { // FDB_CS_BATCH_MOVE
92        args->n_batch_move++;
93        fdb_doc_create(&rdoc, NULL, 0, NULL, 0, NULL, 0);
94        rdoc->offset = old_offset;
95        s = fdb_get_byoffset(args->handle, rdoc);
96        TEST_CHK (s == FDB_RESULT_SUCCESS);
97        fdb_doc_free(rdoc);
98    }
99    return ret;
100}
101
102void compaction_callback_test(bool multi_kv)
103{
104    TEST_INIT();
105    memleak_start();
106
107    int i, r;
108    int n = 1000;
109    char keybuf[256], bodybuf[256];
110    fdb_file_handle *dbfile;
111    fdb_kvs_handle *db;
112    fdb_status s;
113    fdb_config fconfig = fdb_get_default_config();
114    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
115    struct cb_args cb_args;
116
117    memset(&cb_args, 0x0, sizeof(struct cb_args));
118    fconfig.wal_threshold = 1024;
119    fconfig.flags = FDB_OPEN_FLAG_CREATE;
120    fconfig.compaction_cb = compaction_cb;
121    fconfig.compaction_cb_ctx = &cb_args;
122    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
123                                 FDB_CS_MOVE_DOC |
124                                 FDB_CS_FLUSH_WAL |
125                                 FDB_CS_END;
126    fconfig.multi_kv_instances = multi_kv;
127
128    // remove all previous compact_test files
129    r = system(SHELL_DEL" compact_test* > errorlog.txt");
130    (void)r;
131
132    // open db
133    fdb_open(&dbfile, "./compact_test1", &fconfig);
134    if (multi_kv) {
135        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
136    } else {
137        fdb_kvs_open_default(dbfile, &db, &kvs_config);
138    }
139    cb_args.handle = db;
140
141    // write docs
142    for (i=0;i<n;++i){
143        sprintf(keybuf, "key%04d", i);
144        sprintf(bodybuf, "body%04d", i);
145        s = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
146        TEST_CHK(s == FDB_RESULT_SUCCESS);
147    }
148    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
149    TEST_CHK(s == FDB_RESULT_SUCCESS);
150    s = fdb_compact(dbfile, "./compact_test2");
151    TEST_CHK(s == FDB_RESULT_SUCCESS);
152
153    TEST_CHK(cb_args.n_moved_docs == n);
154    TEST_CHK(cb_args.begin);
155    TEST_CHK(cb_args.end);
156    TEST_CHK(cb_args.wal_flush);
157    fdb_close(dbfile);
158
159    // open db without move doc
160    memset(&cb_args, 0x0, sizeof(struct cb_args));
161    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
162                                 FDB_CS_FLUSH_WAL |
163                                 FDB_CS_END;
164    fdb_open(&dbfile, "./compact_test2", &fconfig);
165
166    if (multi_kv) {
167        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
168    } else {
169        fdb_kvs_open_default(dbfile, &db, &kvs_config);
170    }
171    cb_args.handle = db;
172    s = fdb_compact(dbfile, "./compact_test3");
173    TEST_CHK(s == FDB_RESULT_SUCCESS);
174    TEST_CHK(cb_args.n_moved_docs == 0);
175    TEST_CHK(cb_args.begin);
176    TEST_CHK(cb_args.end);
177    TEST_CHK(cb_args.wal_flush);
178    fdb_close(dbfile);
179
180    // open db without wal_flush
181    memset(&cb_args, 0x0, sizeof(struct cb_args));
182    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
183                                 FDB_CS_MOVE_DOC |
184                                 FDB_CS_END;
185    fdb_open(&dbfile, "./compact_test3", &fconfig);
186    if (multi_kv) {
187        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
188    } else {
189        fdb_kvs_open_default(dbfile, &db, &kvs_config);
190    }
191    cb_args.handle = db;
192    s = fdb_compact(dbfile, "./compact_test4");
193    TEST_CHK(s == FDB_RESULT_SUCCESS);
194    TEST_CHK(cb_args.n_moved_docs == n);
195    TEST_CHK(cb_args.begin);
196    TEST_CHK(cb_args.end);
197    TEST_CHK(!cb_args.wal_flush);
198    fdb_close(dbfile);
199
200    // open db without begin/end
201    memset(&cb_args, 0x0, sizeof(struct cb_args));
202    fconfig.compaction_cb_mask = FDB_CS_MOVE_DOC |
203                                 FDB_CS_FLUSH_WAL;
204    fdb_open(&dbfile, "./compact_test4", &fconfig);
205    if (multi_kv) {
206        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
207    } else {
208        fdb_kvs_open_default(dbfile, &db, &kvs_config);
209    }
210    cb_args.handle = db;
211    s = fdb_compact(dbfile, "./compact_test5");
212    TEST_CHK(s == FDB_RESULT_SUCCESS);
213    TEST_CHK(cb_args.n_moved_docs == n);
214    TEST_CHK(!cb_args.begin);
215    TEST_CHK(!cb_args.end);
216    TEST_CHK(cb_args.wal_flush);
217    fdb_close(dbfile);
218
219    // open db with batch move
220    memset(&cb_args, 0x0, sizeof(struct cb_args));
221    fconfig.compaction_cb_mask = FDB_CS_BATCH_MOVE;
222    fdb_open(&dbfile, "./compact_test5", &fconfig);
223    if (multi_kv) {
224        fdb_kvs_open(dbfile, &db, "db", &kvs_config);
225    } else {
226        fdb_kvs_open_default(dbfile, &db, &kvs_config);
227    }
228    cb_args.handle = db;
229    s = fdb_compact(dbfile, "./compact_test6");
230    TEST_CHK(s == FDB_RESULT_SUCCESS);
231    TEST_CHK(cb_args.n_moved_docs == 0);
232    TEST_CHK(cb_args.n_batch_move && cb_args.n_batch_move <= n);
233    TEST_CHK(!cb_args.begin);
234    TEST_CHK(!cb_args.end);
235    TEST_CHK(!cb_args.wal_flush);
236    fdb_close(dbfile);
237
238    fdb_shutdown();
239
240    memleak_end();
241    if (multi_kv) {
242        TEST_RESULT("compaction callback function multi kv mode test");
243    } else {
244        TEST_RESULT("compaction callback function single kv mode test");
245    }
246}
247
248
249void compaction_delete_old_test()
250{
251    TEST_INIT();
252
253    memleak_start();
254
255    int i, r;
256    int n = 3;
257    fdb_file_handle *dbfile;
258    fdb_kvs_handle *db;
259    fdb_doc **doc = alca(fdb_doc*, n);
260    fdb_doc *rdoc;
261    fdb_status status;
262
263    char keybuf[256], metabuf[256], bodybuf[256];
264
265    // create a directory
266    r = system(SHELL_MKDIR" original_dir > errorlog.txt");
267    (void)r;
268
269    // create a backup directory
270    r = system(SHELL_MKDIR" backup_dir > errorlog.txt");
271    (void)r;
272
273    // remove previous compact_test files
274    r = system(SHELL_DEL" ./original_dir/compact_test* > errorlog.txt");
275    (void)r;
276
277    // remove previous compact_test files
278    r = system(SHELL_DEL" ./backup_dir/compact_test* > errorlog.txt");
279    (void)r;
280
281
282    fdb_config fconfig = fdb_get_default_config();
283    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
284    fconfig.buffercache_size = 16777216;
285    fconfig.wal_threshold = 1024;
286    fconfig.flags = FDB_OPEN_FLAG_CREATE;
287    fconfig.compaction_threshold = 0;
288
289    // open db
290    fdb_open(&dbfile, "./original_dir/compact_test1", &fconfig);
291    fdb_kvs_open_default(dbfile, &db, &kvs_config);
292    status = fdb_set_log_callback(db, logCallbackFunc,
293                                  (void *) "compaction_delete_old_test");
294    TEST_CHK(status == FDB_RESULT_SUCCESS);
295
296    // insert documents
297    for (i=0;i<n;++i){
298        sprintf(keybuf, "key%d", i);
299        sprintf(metabuf, "meta%d", i);
300        sprintf(bodybuf, "body%d", i);
301        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
302            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
303        fdb_set(db, doc[i]);
304    }
305
306    // remove doc
307    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
308    rdoc->deleted = true;
309    fdb_set(db, rdoc);
310    fdb_doc_free(rdoc);
311
312    // commit
313    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
314
315    // perform compaction
316    fdb_compact(dbfile, (char *) "./original_dir/compact_test2");
317
318    // close db file
319    fdb_kvs_close(db);
320    status = fdb_close(dbfile);
321
322    // rename the compacted file back to original file
323    r = system(SHELL_MOVE " ./original_dir/compact_test2 ./original_dir/compact_test1 > errorlog.txt");
324    (void)r;
325
326    // backup the original file
327    r = system(SHELL_COPY " ./original_dir/compact_test1 ./backup_dir/compact_test1 > errorlog.txt");
328    (void)r;
329
330    fconfig.flags = 0;
331
332    // Now open the backup file which has the original file as the previous file.
333    fdb_open(&dbfile, "./backup_dir/compact_test1", &fconfig);
334    fdb_kvs_open_default(dbfile, &db, &kvs_config);
335    status = fdb_set_log_callback(db, logCallbackFunc,
336                                  (void *) "compaction_delete_old_test");
337    TEST_CHK(status == FDB_RESULT_SUCCESS);
338
339    // close db file
340    fdb_kvs_close(db);
341    fdb_close(dbfile);
342
343    // Now open the original file.  This file should not have been removed as an old
344    // compacted file when back up file was opened.
345    status = fdb_open(&dbfile, "./original_dir/compact_test1", &fconfig);
346    TEST_CHK(status == FDB_RESULT_SUCCESS);
347    fdb_kvs_open_default(dbfile, &db, &kvs_config);
348    status = fdb_set_log_callback(db, logCallbackFunc,
349                                  (void *) "compaction_delete_old_test");
350    TEST_CHK(status == FDB_RESULT_SUCCESS);
351
352    // free all resources
353    fdb_shutdown();
354
355    memleak_end();
356
357    TEST_RESULT("compaction delete old file test");
358}
359
360void compact_to_new_directory()
361{
362    TEST_INIT();
363
364    memleak_start();
365
366    int i, r;
367    int n = 3;
368    fdb_file_handle *dbfile;
369    fdb_kvs_handle *db;
370    fdb_doc **doc = alca(fdb_doc*, n);
371    fdb_doc *rdoc;
372    fdb_status status;
373
374    char keybuf[256], metabuf[256], bodybuf[256];
375    size_t fpos;
376
377    // create a directory
378    r = system(SHELL_MKDIR" original_dir > errorlog.txt");
379    (void)r;
380
381    // create a backup directory
382    r = system(SHELL_MKDIR" backup_dir > errorlog.txt");
383    (void)r;
384
385    // remove previous compact_test files
386    r = system(SHELL_DEL" ./original_dir/compact_test* > errorlog.txt");
387    (void)r;
388
389    // remove previous compact_test files
390    r = system(SHELL_DEL" ./backup_dir/compact_test* > errorlog.txt");
391    (void)r;
392
393
394    fdb_config fconfig = fdb_get_default_config();
395    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
396    fconfig.buffercache_size = 16777216;
397    fconfig.wal_threshold = 1024;
398    fconfig.flags = FDB_OPEN_FLAG_CREATE;
399    fconfig.compaction_threshold = 0;
400
401    // open db
402    fdb_open(&dbfile, "./original_dir/compact_test1", &fconfig);
403    fdb_kvs_open_default(dbfile, &db, &kvs_config);
404    status = fdb_set_log_callback(db, logCallbackFunc,
405                                  (void *) "compaction_delete_old_test");
406    TEST_CHK(status == FDB_RESULT_SUCCESS);
407
408    // insert documents
409    for (i=0;i<n;++i){
410        sprintf(keybuf, "key%d", i);
411        sprintf(metabuf, "meta%d", i);
412        sprintf(bodybuf, "body%d", i);
413        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
414            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
415        fdb_set(db, doc[i]);
416    }
417
418    // remove doc
419    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
420    rdoc->deleted = true;
421    fdb_set(db, rdoc);
422    fdb_doc_free(rdoc);
423
424    // commit
425    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
426
427    fpos = filemgr_get_pos(db->file);
428    // perform compaction
429    fdb_compact(dbfile, (char *) "./backup_dir/compact_test2");
430    // close db file
431    fdb_kvs_close(db);
432    status = fdb_close(dbfile);
433
434    // open new compacted db
435    fdb_open(&dbfile, "./backup_dir/compact_test1", &fconfig);
436    fdb_kvs_open_default(dbfile, &db, &kvs_config);
437    status = fdb_set_log_callback(db, logCallbackFunc,
438                 (void *) "compaction_delete_old_test");
439    TEST_CHK(status == FDB_RESULT_SUCCESS);
440    TEST_CHK(fpos > filemgr_get_pos(db->file));
441
442    // close db file
443    fdb_kvs_close(db);
444    status = fdb_close(dbfile);
445
446    // free all resources
447    fdb_shutdown();
448
449    memleak_end();
450
451    TEST_RESULT("compaction delete old file test");
452}
453
454void compact_rename_to_original_test()
455{
456    TEST_INIT();
457
458    memleak_start();
459
460    int i, r;
461    int n = 3;
462    fdb_file_handle *dbfile;
463    fdb_kvs_handle *db;
464    fdb_doc **doc = alca(fdb_doc*, n);
465    fdb_doc *rdoc;
466    fdb_status status;
467
468    char keybuf[256], metabuf[256], bodybuf[256];
469
470    // create a directory
471    r = system(SHELL_MKDIR" original_dir > errorlog.txt");
472    (void)r;
473
474    // remove previous compact_test files
475    r = system(SHELL_DEL" ./original_dir/compact_test* > errorlog.txt");
476    (void)r;
477
478    fdb_config fconfig = fdb_get_default_config();
479    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
480    fconfig.buffercache_size = 16777216;
481    fconfig.wal_threshold = 1024;
482    fconfig.flags = FDB_OPEN_FLAG_CREATE;
483    fconfig.compaction_threshold = 0;
484
485    // open db using a directory in pwd
486    fdb_open(&dbfile, "./original_dir/compact_test1", &fconfig);
487    fdb_kvs_open_default(dbfile, &db, &kvs_config);
488    status = fdb_set_log_callback(db, logCallbackFunc,
489                                  (void *) "compact_rename_to_original_test");
490    TEST_CHK(status == FDB_RESULT_SUCCESS);
491
492    // insert documents
493    for (i=0;i<n;++i){
494        sprintf(keybuf, "key%d", i);
495        sprintf(metabuf, "meta%d", i);
496        sprintf(bodybuf, "body%d", i);
497        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
498            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
499        fdb_set(db, doc[i]);
500    }
501
502    // remove doc
503    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
504    rdoc->deleted = true;
505    fdb_set(db, rdoc);
506    fdb_doc_free(rdoc);
507
508    // commit
509    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
510
511    // perform compaction
512    fdb_compact(dbfile, (char *) "./original_dir/compact_test.compact");
513
514    // close db file
515    fdb_kvs_close(db);
516    fdb_close(dbfile);
517
518    // rename the compacted file back to original file
519    r = system(SHELL_MOVE " ./original_dir/compact_test.compact ./original_dir/compact_test1 > errorlog.txt");
520    (void)r;
521
522    fconfig.flags = 0;
523
524    // Now open the original file.
525    status = fdb_open(&dbfile, "./original_dir/compact_test1", &fconfig);
526    TEST_CHK(status == FDB_RESULT_SUCCESS);
527    fdb_kvs_open_default(dbfile, &db, &kvs_config);
528    status = fdb_set_log_callback(db, logCallbackFunc,
529                                  (void *) "compact_rename_to_original_test");
530    TEST_CHK(status == FDB_RESULT_SUCCESS);
531
532    // free all resources
533    fdb_shutdown();
534
535    memleak_end();
536
537    TEST_RESULT("compaction rename to old file test");
538}
539
540void compact_wo_reopen_test()
541{
542    TEST_INIT();
543
544    memleak_start();
545
546    int i, r;
547    int n = 3;
548    fdb_file_handle *dbfile, *dbfile_new;
549    fdb_kvs_handle *db;
550    fdb_kvs_handle *db_new;
551    fdb_doc **doc = alca(fdb_doc*, n);
552    fdb_doc *rdoc;
553    fdb_status status;
554
555    char keybuf[256], metabuf[256], bodybuf[256];
556
557    // remove previous compact_test files
558    r = system(SHELL_DEL" compact_test* > errorlog.txt");
559    (void)r;
560
561    fdb_config fconfig = fdb_get_default_config();
562    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
563    fconfig.buffercache_size = 16777216;
564    fconfig.wal_threshold = 1024;
565    fconfig.flags = FDB_OPEN_FLAG_CREATE;
566    fconfig.compaction_threshold = 0;
567
568    // open db
569    fdb_open(&dbfile, "./compact_test1", &fconfig);
570    fdb_kvs_open_default(dbfile, &db, &kvs_config);
571    status = fdb_set_log_callback(db, logCallbackFunc,
572                                  (void *) "compact_wo_reopen_test");
573    TEST_CHK(status == FDB_RESULT_SUCCESS);
574    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
575    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
576    status = fdb_set_log_callback(db_new, logCallbackFunc,
577                                  (void *) "compact_wo_reopen_test");
578    TEST_CHK(status == FDB_RESULT_SUCCESS);
579
580    // insert documents
581    for (i=0;i<n;++i){
582        sprintf(keybuf, "key%d", i);
583        sprintf(metabuf, "meta%d", i);
584        sprintf(bodybuf, "body%d", i);
585        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
586            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
587        fdb_set(db, doc[i]);
588    }
589
590    // remove doc
591    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
592    rdoc->deleted = true;
593    fdb_set(db, rdoc);
594    fdb_doc_free(rdoc);
595
596    // commit
597    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
598
599    // perform compaction using one handle
600    fdb_compact(dbfile, (char *) "./compact_test2");
601
602    // retrieve documents using the other handle without close/re-open
603    for (i=0;i<n;++i){
604        // search by key
605        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
606        status = fdb_get(db_new, rdoc);
607
608        if (i != 1) {
609            TEST_CHK(status == FDB_RESULT_SUCCESS);
610            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
611            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
612        }else{
613            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
614        }
615
616        // free result document
617        fdb_doc_free(rdoc);
618    }
619    // check the other handle's filename
620    fdb_file_info info;
621    fdb_get_file_info(dbfile_new, &info);
622    TEST_CHK(!strcmp("./compact_test2", info.filename));
623
624    // free all documents
625    for (i=0;i<n;++i){
626        fdb_doc_free(doc[i]);
627    }
628
629    // close db file
630    fdb_kvs_close(db);
631    fdb_kvs_close(db_new);
632    fdb_close(dbfile);
633    fdb_close(dbfile_new);
634
635    // free all resources
636    fdb_shutdown();
637
638    memleak_end();
639
640    TEST_RESULT("compaction without reopen test");
641}
642
643void compact_with_reopen_test()
644{
645    TEST_INIT();
646
647    memleak_start();
648
649    int i, r;
650    int n = 100;
651    fdb_file_handle *dbfile;
652    fdb_kvs_handle *db;
653    fdb_doc **doc = alca(fdb_doc*, n);
654    fdb_doc *rdoc;
655    fdb_status status;
656
657    char keybuf[256], metabuf[256], bodybuf[256], temp[256];
658
659    // remove previous compact_test files
660    r = system(SHELL_DEL" compact_test* > errorlog.txt");
661    (void)r;
662
663    fdb_config fconfig = fdb_get_default_config();
664    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
665    fconfig.buffercache_size = 16777216;
666    fconfig.wal_threshold = 1024;
667    fconfig.flags = FDB_OPEN_FLAG_CREATE;
668    fconfig.compaction_threshold = 0;
669
670    // open db
671    fdb_open(&dbfile, "./compact_test1", &fconfig);
672    fdb_kvs_open_default(dbfile, &db, &kvs_config);
673    status = fdb_set_log_callback(db, logCallbackFunc,
674                                  (void *) "compact_with_reopen_test");
675    TEST_CHK(status == FDB_RESULT_SUCCESS);
676
677    // insert documents
678    for (i=0;i<n;++i){
679        sprintf(keybuf, "key%d", i);
680        sprintf(metabuf, "meta%d", i);
681        sprintf(bodybuf, "body%d", i);
682        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
683            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
684        fdb_set(db, doc[i]);
685    }
686
687    // remove doc
688    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
689    rdoc->deleted = true;
690    fdb_set(db, rdoc);
691    fdb_doc_free(rdoc);
692
693    // commit
694    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
695
696    // perform compaction using one handle
697    fdb_compact(dbfile, (char *) "./compact_test2");
698
699    // close db file
700    fdb_kvs_close(db);
701    fdb_close(dbfile);
702
703    r = system(SHELL_MOVE " compact_test2 compact_test1 > errorlog.txt");
704    (void)r;
705    fdb_open(&dbfile, "./compact_test1", &fconfig);
706    fdb_kvs_open_default(dbfile, &db, &kvs_config);
707    status = fdb_set_log_callback(db, logCallbackFunc,
708                                  (void *) "compact_with_reopen_test");
709    TEST_CHK(status == FDB_RESULT_SUCCESS);
710
711    // retrieve documents using the other handle without close/re-open
712    for (i=0;i<n;++i){
713        // search by key
714        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
715        status = fdb_get(db, rdoc);
716
717        if (i != 1) {
718            TEST_CHK(status == FDB_RESULT_SUCCESS);
719            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
720            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
721        }else{
722            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
723        }
724
725        // free result document
726        fdb_doc_free(rdoc);
727    }
728    // check the other handle's filename
729    fdb_file_info info;
730    fdb_get_file_info(dbfile, &info);
731    TEST_CHK(!strcmp("./compact_test1", info.filename));
732
733    // update documents
734    for (i=0;i<n;++i){
735        sprintf(metabuf, "newmeta%d", i);
736        sprintf(bodybuf, "newbody%d_%s", i, temp);
737        fdb_doc_update(&doc[i], (void *)metabuf, strlen(metabuf),
738                       (void *)bodybuf, strlen(bodybuf));
739        fdb_set(db, doc[i]);
740    }
741
742    // Open the database with another handle.
743    fdb_file_handle *second_dbfile;
744    fdb_kvs_handle *second_dbh;
745    fdb_open(&second_dbfile, "./compact_test1", &fconfig);
746    fdb_kvs_open_default(second_dbfile, &second_dbh, &kvs_config);
747    status = fdb_set_log_callback(second_dbh, logCallbackFunc,
748                                  (void *) "compact_with_reopen_test");
749    TEST_CHK(status == FDB_RESULT_SUCCESS);
750    // In-place compactions with a handle still open on the first old file
751    status = fdb_compact(dbfile, NULL);
752    TEST_CHK(status == FDB_RESULT_SUCCESS);
753
754    status = fdb_compact(dbfile, NULL);
755    TEST_CHK(status == FDB_RESULT_SUCCESS);
756
757    // MB-12977: retest compaction again..
758    status = fdb_compact(dbfile, NULL);
759    TEST_CHK(status == FDB_RESULT_SUCCESS);
760
761    fdb_kvs_close(db);
762    fdb_close(dbfile);
763
764    for (i=0;i<n;++i){
765        // search by key
766        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
767        status = fdb_get(second_dbh, rdoc);
768        TEST_CHK(status == FDB_RESULT_SUCCESS);
769        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
770        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
771        // free result document
772        fdb_doc_free(rdoc);
773    }
774
775    // Open database with an original name.
776    status = fdb_open(&dbfile, "./compact_test1", &fconfig);
777    TEST_CHK(status == FDB_RESULT_SUCCESS);
778    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
779    TEST_CHK(status == FDB_RESULT_SUCCESS);
780    status = fdb_set_log_callback(db, logCallbackFunc,
781                                  (void *) "compact_with_reopen_test");
782    TEST_CHK(status == FDB_RESULT_SUCCESS);
783    fdb_get_file_info(dbfile, &info);
784    // The actual file name should be a compacted one.
785    TEST_CHK(!strcmp("./compact_test1.3", info.filename));
786
787    fdb_kvs_close(second_dbh);
788    fdb_close(second_dbfile);
789
790    for (i=0;i<n;++i){
791        // search by key
792        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
793        status = fdb_get(db, rdoc);
794        TEST_CHK(status == FDB_RESULT_SUCCESS);
795        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
796        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
797        // free result document
798        fdb_doc_free(rdoc);
799    }
800
801    fdb_compact(dbfile, NULL);
802    fdb_kvs_close(db);
803    fdb_close(dbfile);
804
805    r = system(SHELL_MOVE " compact_test1 compact_test.fdb > errorlog.txt");
806    (void)r;
807    fdb_open(&dbfile, "./compact_test.fdb", &fconfig);
808    fdb_kvs_open_default(dbfile, &db, &kvs_config);
809    status = fdb_set_log_callback(db, logCallbackFunc,
810                                  (void *) "compact_with_reopen_test");
811    TEST_CHK(status == FDB_RESULT_SUCCESS);
812    // In-place compaction
813    fdb_compact(dbfile, NULL);
814    fdb_kvs_close(db);
815    fdb_close(dbfile);
816    // Open database with an original name.
817    status = fdb_open(&dbfile, "./compact_test.fdb", &fconfig);
818    TEST_CHK(status == FDB_RESULT_SUCCESS);
819    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
820    TEST_CHK(status == FDB_RESULT_SUCCESS);
821    status = fdb_set_log_callback(db, logCallbackFunc,
822                                  (void *) "compact_with_reopen_test");
823    TEST_CHK(status == FDB_RESULT_SUCCESS);
824    fdb_get_file_info(dbfile, &info);
825    TEST_CHK(!strcmp("./compact_test.fdb", info.filename));
826    TEST_CHK(info.doc_count == 100);
827
828    // free all documents
829    for (i=0;i<n;++i){
830        fdb_doc_free(doc[i]);
831    }
832
833    fdb_kvs_close(db);
834    fdb_close(dbfile);
835
836    // free all resources
837    fdb_shutdown();
838
839    memleak_end();
840
841    TEST_RESULT("compaction with reopen test");
842}
843
844void compact_reopen_named_kvs()
845{
846    TEST_INIT();
847
848    memleak_start();
849
850    int i, r;
851    int n = 100;
852    int nkvdocs;
853    fdb_file_handle *dbfile;
854    fdb_kvs_handle *db;
855    fdb_doc **doc = alca(fdb_doc*, n);
856    fdb_status status;
857    fdb_kvs_info kvs_info;
858
859    char keybuf[256], metabuf[256], bodybuf[256];
860
861    // remove previous compact_test files
862    r = system(SHELL_DEL" compact_test* > errorlog.txt");
863    (void)r;
864
865    fdb_config fconfig = fdb_get_default_config();
866    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
867    fconfig.wal_threshold = 1024;
868    fconfig.flags = FDB_OPEN_FLAG_CREATE;
869    fconfig.compaction_threshold = 0;
870
871    // open db
872    fdb_open(&dbfile, "./compact_test1", &fconfig);
873    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
874
875    status = fdb_set_log_callback(db, logCallbackFunc,
876                                  (void *) "compact_reopen_named_kvs");
877    TEST_CHK(status == FDB_RESULT_SUCCESS);
878
879    for (i=0;i<n;++i){
880        sprintf(keybuf, "key%d", i);
881        sprintf(metabuf, "meta%d", i);
882        sprintf(bodybuf, "body%d", i);
883        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
884            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
885        fdb_set(db, doc[i]);
886    }
887
888    // commit
889    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
890
891    // compact
892    fdb_compact(dbfile, NULL);
893
894    // save ndocs
895    fdb_get_kvs_info(db, &kvs_info);
896    nkvdocs = kvs_info.doc_count;
897
898    // close db file
899    fdb_kvs_close(db);
900    fdb_close(dbfile);
901
902    // reopen
903    fdb_open(&dbfile, "./compact_test1", &fconfig);
904    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
905
906    // verify kvs stats
907    fdb_get_kvs_info(db, &kvs_info);
908    TEST_CHK((uint64_t)nkvdocs == kvs_info.doc_count);
909
910    fdb_kvs_close(db);
911    fdb_close(dbfile);
912
913    // free all documents
914    for (i=0;i<n;++i){
915        fdb_doc_free(doc[i]);
916    }
917    fdb_shutdown();
918
919    memleak_end();
920
921    TEST_RESULT("compact reopen named kvs");
922}
923
924void compact_reopen_with_iterator()
925{
926    TEST_INIT();
927
928    memleak_start();
929
930    int i, r;
931    int n = 9;
932    int count = 0;
933    int nkvdocs;
934    fdb_file_handle *dbfile, *compact_file;
935    fdb_kvs_handle *db;
936    fdb_doc **doc = alca(fdb_doc*, n);
937    fdb_doc *rdoc = NULL;
938    fdb_status status;
939    fdb_kvs_info kvs_info;
940    fdb_iterator *iterator;
941
942    char keybuf[256], metabuf[256], bodybuf[256];
943
944    // remove previous compact_test files
945    r = system(SHELL_DEL" compact_test* > errorlog.txt");
946    (void)r;
947
948    fdb_config fconfig = fdb_get_default_config();
949    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
950    fconfig.wal_threshold = 1024;
951    fconfig.flags = FDB_OPEN_FLAG_CREATE;
952    fconfig.compaction_threshold = 0;
953
954    // open db
955    fdb_open(&dbfile, "./compact_test1", &fconfig);
956    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
957
958    status = fdb_set_log_callback(db, logCallbackFunc,
959                                  (void *) "compact_reopen_with_iterator");
960    TEST_CHK(status == FDB_RESULT_SUCCESS);
961
962    for (i=0;i<n;++i){
963        sprintf(keybuf, "key%d", i);
964        sprintf(metabuf, "meta%d", i);
965        sprintf(bodybuf, "body%d", i);
966        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
967            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
968        fdb_set(db, doc[i]);
969    }
970
971    // commit
972    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
973
974    // MB-13859: compact the file using a separate handle..
975    fdb_open(&compact_file, "./compact_test1", &fconfig);
976    // compact
977    status = fdb_compact(compact_file, "./compact_test2");
978    TEST_CHK(status == FDB_RESULT_SUCCESS);
979    // close file after compaction to make the new_file's ref count 0
980    fdb_close(compact_file);
981
982    i = 0;
983    status = fdb_iterator_init(db, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
984    TEST_CHK(status == FDB_RESULT_SUCCESS);
985    do {
986        status = fdb_iterator_get(iterator, &rdoc);
987        TEST_CHK(status == FDB_RESULT_SUCCESS);
988
989        TEST_CMP(rdoc->key, doc[i]->key, rdoc->keylen);
990        TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
991        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
992
993        fdb_doc_free(rdoc);
994        rdoc = NULL;
995        i++;
996        count++;
997    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
998
999    TEST_CHK(count==n);
1000
1001
1002    status = fdb_iterator_close(iterator);
1003    TEST_CHK(status == FDB_RESULT_SUCCESS);
1004
1005    // save ndocs
1006    fdb_get_kvs_info(db, &kvs_info);
1007    nkvdocs = kvs_info.doc_count;
1008
1009    // close db file
1010    fdb_kvs_close(db);
1011    fdb_close(dbfile);
1012
1013    // reopen
1014    fdb_open(&dbfile, "./compact_test2", &fconfig);
1015    fdb_kvs_open(dbfile, &db, "db",  &kvs_config);
1016
1017    // verify kvs stats
1018    fdb_get_kvs_info(db, &kvs_info);
1019    TEST_CHK((uint64_t)nkvdocs == kvs_info.doc_count);
1020
1021    fdb_kvs_close(db);
1022    fdb_close(dbfile);
1023
1024    // free all documents
1025    for (i=0;i<n;++i){
1026        fdb_doc_free(doc[i]);
1027    }
1028    fdb_shutdown();
1029
1030    memleak_end();
1031
1032    TEST_RESULT("compact reopen with iterator");
1033}
1034
1035void estimate_space_upto_test(bool multi_kv)
1036{
1037    TEST_INIT();
1038
1039    memleak_start();
1040
1041    int i, j, r;
1042    int n = 300;
1043    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
1044    fdb_file_handle *dbfile;
1045    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
1046    fdb_doc **doc = alca(fdb_doc*, n);
1047    fdb_status status;
1048    fdb_snapshot_info_t *markers;
1049    uint64_t num_markers;
1050    size_t space_used;
1051
1052    char keybuf[256], metabuf[256], bodybuf[256];
1053    char kv_name[8];
1054
1055    fdb_config fconfig = fdb_get_default_config();
1056    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1057    fconfig.buffercache_size = 0;
1058    fconfig.wal_threshold = 50;
1059    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1060    fconfig.compaction_threshold = 0;
1061    fconfig.multi_kv_instances = multi_kv;
1062    fconfig.block_reusing_threshold = 0;
1063
1064    // remove previous compact_test files
1065    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1066    (void)r;
1067
1068    // open db
1069    fdb_open(&dbfile, "./compact_test1", &fconfig);
1070    if (multi_kv) {
1071        for (r = 0; r < num_kvs; ++r) {
1072            sprintf(kv_name, "kv%d", r);
1073            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
1074        }
1075    } else {
1076        num_kvs = 1;
1077        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
1078    }
1079
1080   // ------- Setup test ----------------------------------
1081   // insert first quarter of documents
1082    for (i=0; i<n/4; i++){
1083        sprintf(keybuf, "key%d", i);
1084        sprintf(metabuf, "meta%d", i);
1085        sprintf(bodybuf, "body%d", i);
1086        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1087            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1088        for (r = 0; r < num_kvs; ++r) {
1089            fdb_set(db[r], doc[i]);
1090        }
1091    }
1092
1093    // commit with a manual WAL flush (these docs go into HB-trie)
1094    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1095
1096    // insert second quarter of documents
1097    for (; i < n/2; i++){
1098        sprintf(keybuf, "key%d", i);
1099        sprintf(metabuf, "meta%d", i);
1100        sprintf(bodybuf, "body%d", i);
1101        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1102            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1103        for (r = 0; r < num_kvs; ++r) {
1104            fdb_set(db[r], doc[i]);
1105        }
1106    }
1107    // Update first quarter of documents again..
1108    for (j = 0; j < n/4; j++){
1109        for (r = 0; r < num_kvs; ++r) {
1110            fdb_set(db[r], doc[j]);
1111        }
1112    }
1113    // Update first quarter of documents again overwriting previous update..
1114    for (j = 0; j < n/4; j++){
1115        for (r = 0; r < num_kvs; ++r) {
1116            fdb_set(db[r], doc[j]);
1117        }
1118    }
1119
1120    // commit again without a WAL flush (some of these docs remain in WAL)
1121    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1122
1123    // insert third quarter of documents
1124    for (; i < (n/2 + n/4); i++){
1125        sprintf(keybuf, "key%d", i);
1126        sprintf(metabuf, "meta%d", i);
1127        sprintf(bodybuf, "body%d", i);
1128        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1129            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1130        for (r = 0; r < num_kvs; ++r) {
1131            fdb_set(db[r], doc[i]);
1132        }
1133    }
1134    // manually flush WAL & commit (these docs go into HB-trie)
1135    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1136
1137    // insert fourth quarter of documents
1138    for (; i < n; i++){
1139        sprintf(keybuf, "key%d", i);
1140        sprintf(metabuf, "meta%d", i);
1141        sprintf(bodybuf, "body%d", i);
1142        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1143            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1144        for (r = 0; r < num_kvs; ++r) {
1145            fdb_set(db[r], doc[i]);
1146        }
1147    }
1148    // commit without a WAL flush (some of these docs remain in the WAL)
1149    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1150
1151    for (r = 0; r < num_kvs; ++r) {
1152        status = fdb_set_log_callback(db[r], logCallbackFunc,
1153                                      (void *) "estimate_space_upto_test");
1154        TEST_CHK(status == FDB_RESULT_SUCCESS);
1155    }
1156
1157    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1158    TEST_CHK(status == FDB_RESULT_SUCCESS);
1159
1160    if (!multi_kv) {
1161        size_t space_used2;
1162        TEST_CHK(num_markers == 4);
1163        space_used = fdb_estimate_space_used_from(dbfile, markers[1].marker);
1164        space_used2 = fdb_estimate_space_used_from(dbfile, markers[2].marker);
1165        TEST_CHK(space_used2 > space_used); // greater than space used by just 1
1166    } else {
1167        size_t space_used2;
1168        TEST_CHK(num_markers == 8);
1169        space_used = fdb_estimate_space_used_from(dbfile, markers[1].marker);
1170        space_used2 = fdb_estimate_space_used_from(dbfile, markers[2].marker);
1171        TEST_CHK(space_used2 > space_used); // greater than space used by just 1
1172    }
1173
1174    status = fdb_free_snap_markers(markers, num_markers);
1175    TEST_CHK(status == FDB_RESULT_SUCCESS);
1176
1177    // close db file
1178    fdb_close(dbfile);
1179
1180    // free all documents
1181    for (i=0;i<n;++i){
1182        fdb_doc_free(doc[i]);
1183    }
1184
1185    // free all resources
1186    fdb_shutdown();
1187
1188    memleak_end();
1189
1190    sprintf(bodybuf, "estimate space upto marker in file test %s", multi_kv ?
1191                                                           "multiple kv mode:"
1192                                                         : "single kv mode:");
1193    TEST_RESULT(bodybuf);
1194}
1195
1196void compact_upto_test(bool multi_kv)
1197{
1198    TEST_INIT();
1199
1200    memleak_start();
1201
1202    int i, r;
1203    int n = 20;
1204    int num_kvs = 4; // keep this the same as number of fdb_commit() calls
1205    fdb_file_handle *dbfile;
1206    fdb_kvs_handle **db = alca(fdb_kvs_handle *, num_kvs);
1207    fdb_doc **doc = alca(fdb_doc*, n);
1208    fdb_status status;
1209    fdb_snapshot_info_t *markers;
1210    fdb_kvs_handle *snapshot;
1211    uint64_t num_markers;
1212
1213    char keybuf[256], metabuf[256], bodybuf[256];
1214    char kv_name[8];
1215    char compact_filename[32];
1216
1217    fdb_config fconfig = fdb_get_default_config();
1218    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1219    fconfig.buffercache_size = 0;
1220    fconfig.wal_threshold = 1024;
1221    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1222    fconfig.compaction_threshold = 0;
1223    fconfig.multi_kv_instances = multi_kv;
1224    // since this test requires static number of markers,
1225    // disable block reusing
1226    fconfig.block_reusing_threshold = 0;
1227
1228    // remove previous compact_test files
1229    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1230    (void)r;
1231
1232    // open db
1233    fdb_open(&dbfile, "./compact_test1", &fconfig);
1234    if (multi_kv) {
1235        for (r = 0; r < num_kvs; ++r) {
1236            sprintf(kv_name, "kv%d", r);
1237            fdb_kvs_open(dbfile, &db[r], kv_name, &kvs_config);
1238        }
1239    } else {
1240        num_kvs = 1;
1241        fdb_kvs_open_default(dbfile, &db[0], &kvs_config);
1242    }
1243
1244   // ------- Setup test ----------------------------------
1245   // insert documents of 0-4
1246    for (i=0; i<n/4; i++){
1247        sprintf(keybuf, "key%d", i);
1248        sprintf(metabuf, "meta%d", i);
1249        sprintf(bodybuf, "body%d", i);
1250        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1251            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1252        for (r = 0; r < num_kvs; ++r) {
1253            fdb_set(db[r], doc[i]);
1254        }
1255    }
1256
1257    // commit with a manual WAL flush (these docs go into HB-trie)
1258    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1259
1260    // insert documents from 5 - 9
1261    for (; i < n/2; i++){
1262        sprintf(keybuf, "key%d", i);
1263        sprintf(metabuf, "meta%d", i);
1264        sprintf(bodybuf, "body%d", i);
1265        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1266            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1267        for (r = 0; r < num_kvs; ++r) {
1268            fdb_set(db[r], doc[i]);
1269        }
1270    }
1271
1272    // commit again without a WAL flush
1273    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1274
1275    // insert documents from 10-14 into HB-trie
1276    for (; i < (n/2 + n/4); i++){
1277        sprintf(keybuf, "key%d", i);
1278        sprintf(metabuf, "meta%d", i);
1279        sprintf(bodybuf, "body%d", i);
1280        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1281            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1282        for (r = 0; r < num_kvs; ++r) {
1283            fdb_set(db[r], doc[i]);
1284        }
1285    }
1286    // manually flush WAL & commit
1287    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1288
1289    // insert documents from 15 - 19 on file into the WAL
1290    for (; i < n; i++){
1291        sprintf(keybuf, "key%d", i);
1292        sprintf(metabuf, "meta%d", i);
1293        sprintf(bodybuf, "body%d", i);
1294        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1295            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1296        for (r = 0; r < num_kvs; ++r) {
1297            fdb_set(db[r], doc[i]);
1298        }
1299    }
1300    // commit without a WAL flush
1301    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1302
1303    for (r = 0; r < num_kvs; ++r) {
1304        status = fdb_set_log_callback(db[r], logCallbackFunc,
1305                                      (void *) "compact_upto_test");
1306        TEST_CHK(status == FDB_RESULT_SUCCESS);
1307    }
1308
1309    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1310    TEST_CHK(status == FDB_RESULT_SUCCESS);
1311
1312    if (!multi_kv) {
1313        TEST_CHK(num_markers == 4);
1314        for (r = 0; (uint64_t)r < num_markers; ++r) {
1315            TEST_CHK(markers[r].num_kvs_markers == 1);
1316            TEST_CHK(markers[r].kvs_markers[0].seqnum ==
1317                     (fdb_seqnum_t)(n - r*5));
1318        }
1319        r = 1; // Test compacting upto sequence number 15
1320        sprintf(compact_filename, "compact_test_compact%d", r);
1321        status = fdb_compact_upto(dbfile, compact_filename,
1322                                  markers[r].marker);
1323        TEST_CHK(status == FDB_RESULT_SUCCESS);
1324        // create a snapshot
1325        status = fdb_snapshot_open(db[0], &snapshot,
1326                                   markers[r].kvs_markers[0].seqnum);
1327        TEST_CHK(status == FDB_RESULT_SUCCESS);
1328        // close snapshot
1329        fdb_kvs_close(snapshot);
1330    } else {
1331        TEST_CHK(num_markers == 8);
1332        for (r = 0; r < num_kvs; ++r) {
1333            TEST_CHK(markers[r].num_kvs_markers == num_kvs);
1334            for (i = 0; i < num_kvs; ++i) {
1335                TEST_CHK(markers[r].kvs_markers[i].seqnum
1336                         == (fdb_seqnum_t)(n - r*5));
1337                sprintf(kv_name, "kv%d", i);
1338                TEST_CMP(markers[r].kvs_markers[i].kv_store_name, kv_name, 3);
1339            }
1340        }
1341        i = r = 1;
1342        sprintf(compact_filename, "compact_test_compact%d", i);
1343        status = fdb_compact_upto(dbfile, compact_filename,
1344                markers[i].marker);
1345        TEST_CHK(status == FDB_RESULT_SUCCESS);
1346        // create a snapshot
1347        status = fdb_snapshot_open(db[r], &snapshot,
1348                markers[i].kvs_markers[r].seqnum);
1349        TEST_CHK(status == FDB_RESULT_SUCCESS);
1350        // close snapshot
1351        fdb_kvs_close(snapshot);
1352    }
1353
1354    status = fdb_free_snap_markers(markers, num_markers);
1355    TEST_CHK(status == FDB_RESULT_SUCCESS);
1356
1357    // close db file
1358    fdb_close(dbfile);
1359
1360    // free all documents
1361    for (i=0;i<n;++i){
1362        fdb_doc_free(doc[i]);
1363    }
1364
1365    // free all resources
1366    fdb_shutdown();
1367
1368    memleak_end();
1369
1370    sprintf(bodybuf, "compact upto marker in file test %s", multi_kv ?
1371                                                           "multiple kv mode:"
1372                                                         : "single kv mode:");
1373    TEST_RESULT(bodybuf);
1374}
1375
1376void auto_recover_compact_ok_test()
1377{
1378    TEST_INIT();
1379
1380    memleak_start();
1381
1382    int i, r;
1383    int n = 3;
1384    fdb_file_handle *dbfile, *dbfile_new;
1385    fdb_kvs_handle *db;
1386    fdb_kvs_handle *db_new;
1387    fdb_doc **doc = alca(fdb_doc *, n);
1388    fdb_doc *rdoc;
1389    fdb_status status;
1390
1391    char keybuf[256], metabuf[256], bodybuf[256];
1392
1393    // remove previous compact_test files
1394    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1395    (void)r;
1396
1397    fdb_config fconfig = fdb_get_default_config();
1398    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1399    fconfig.buffercache_size = 16777216;
1400    fconfig.wal_threshold = 1024;
1401    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1402    fconfig.compaction_threshold = 0;
1403
1404    // open db
1405    fdb_open(&dbfile, "./compact_test1", &fconfig);
1406    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1407    status = fdb_set_log_callback(db, logCallbackFunc,
1408                                  (void *) "auto_recover_compact_ok_test");
1409    TEST_CHK(status == FDB_RESULT_SUCCESS);
1410    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
1411    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
1412    status = fdb_set_log_callback(db_new, logCallbackFunc,
1413                                  (void *) "auto_recover_compact_ok_test");
1414    TEST_CHK(status == FDB_RESULT_SUCCESS);
1415
1416    // insert first two documents
1417    for (i=0;i<2;++i){
1418        sprintf(keybuf, "key%d", i);
1419        sprintf(metabuf, "meta%d", i);
1420        sprintf(bodybuf, "body%d", i);
1421        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1422            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1423        fdb_set(db, doc[i]);
1424    }
1425
1426    // remove second doc
1427    fdb_doc_create(&rdoc, doc[1]->key, doc[1]->keylen, doc[1]->meta, doc[1]->metalen, NULL, 0);
1428    rdoc->deleted = true;
1429    fdb_set(db, rdoc);
1430    fdb_doc_free(rdoc);
1431
1432    // commit
1433    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1434
1435    // perform compaction using one handle
1436    fdb_compact(dbfile, (char *) "./compact_test2");
1437
1438    // save the old file after compaction is done ..
1439    r = system(SHELL_COPY " compact_test1 compact_test11 > errorlog.txt");
1440    (void)r;
1441
1442    // now insert third doc: it should go to the newly compacted file.
1443    sprintf(keybuf, "key%d", i);
1444    sprintf(metabuf, "meta%d", i);
1445    sprintf(bodybuf, "body%d", i);
1446    fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1447        (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1448    fdb_set(db, doc[i]);
1449
1450    // commit
1451    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1452
1453    // close both the db files ...
1454    fdb_kvs_close(db);
1455    fdb_kvs_close(db_new);
1456    fdb_close(dbfile);
1457    fdb_close(dbfile_new);
1458
1459    // restore the old file after close is done ..
1460    r = system(SHELL_MOVE " compact_test11 compact_test1 > errorlog.txt");
1461    (void)r;
1462
1463    // now open the old saved compacted file, it should automatically recover
1464    // and use the new file since compaction was done successfully
1465    fdb_open(&dbfile_new, "./compact_test1", &fconfig);
1466    fdb_kvs_open_default(dbfile_new, &db_new, &kvs_config);
1467    status = fdb_set_log_callback(db_new, logCallbackFunc,
1468                                  (void *) "auto_recover_compact_ok_test");
1469    TEST_CHK(status == FDB_RESULT_SUCCESS);
1470
1471    // retrieve documents using the old handle and expect all 3 docs
1472    for (i=0;i<n;++i){
1473        // search by key
1474        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1475        status = fdb_get(db_new, rdoc);
1476
1477        if (i != 1) {
1478            TEST_CHK(status == FDB_RESULT_SUCCESS);
1479            TEST_CMP(rdoc->meta, doc[i]->meta, rdoc->metalen);
1480            TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1481        }else{
1482            TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
1483        }
1484
1485        // free result document
1486        fdb_doc_free(rdoc);
1487    }
1488    // check this handle's filename it should point to newly compacted file
1489    fdb_file_info info;
1490    fdb_get_file_info(dbfile_new, &info);
1491    TEST_CHK(!strcmp("./compact_test2", info.filename));
1492
1493    // close the file
1494    fdb_kvs_close(db_new);
1495    fdb_close(dbfile_new);
1496
1497    // free all documents
1498    for (i=0;i<n;++i){
1499        fdb_doc_free(doc[i]);
1500    }
1501
1502    // free all resources
1503    fdb_shutdown();
1504
1505    memleak_end();
1506
1507    TEST_RESULT("auto recovery after compaction test");
1508}
1509
1510#if !defined(WIN32) && !defined(_WIN32)
1511static bool does_file_exist(const char *filename) {
1512    struct stat st;
1513    int result = stat(filename, &st);
1514    return result == 0;
1515}
1516#else
1517static bool does_file_exist(const char *filename) {
1518    return GetFileAttributes(filename) != INVALID_FILE_ATTRIBUTES;
1519}
1520#endif
1521
1522void unlink_after_compaction_test()
1523{
1524    TEST_INIT();
1525    memleak_start();
1526
1527    int i, r;
1528    int n = 10;
1529    fdb_file_handle *dbfile;
1530    fdb_kvs_handle *db, *snap;
1531    fdb_doc **doc = alca(fdb_doc *, n);
1532    fdb_doc *rdoc;
1533    fdb_status s;
1534
1535    char keybuf[256], metabuf[256], bodybuf[256];
1536
1537    // remove previous compact_test files
1538    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1539    (void)r;
1540
1541    fdb_config fconfig = fdb_get_default_config();
1542    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1543    fconfig.buffercache_size = 16777216;
1544    fconfig.wal_threshold = 1024;
1545    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1546    fconfig.compaction_threshold = 0;
1547
1548    // open db
1549    fdb_open(&dbfile, "./compact_test1", &fconfig);
1550    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1551    s = fdb_set_log_callback(db, logCallbackFunc,
1552                             (void *) "unlink_after_compaction_test");
1553    TEST_CHK(s == FDB_RESULT_SUCCESS);
1554
1555    // set docs
1556    for (i=0;i<n;++i){
1557        sprintf(keybuf, "key%d", i);
1558        sprintf(metabuf, "meta%d", i);
1559        sprintf(bodybuf, "body%d", i);
1560        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1561            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1562        fdb_set(db, doc[i]);
1563    }
1564
1565    // commit
1566    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1567
1568    // create a snapshot
1569    s = fdb_snapshot_open(db, &snap, n);
1570    TEST_CHK(s == FDB_RESULT_SUCCESS);
1571
1572    // compaction
1573    s = fdb_compact(dbfile, "./compact_test2");
1574    TEST_CHK(s == FDB_RESULT_SUCCESS);
1575
1576    // retrieve check on the new file
1577    for (i=0;i<n;++i){
1578        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1579        s = fdb_get(db, rdoc);
1580        TEST_CHK(s == FDB_RESULT_SUCCESS);
1581        fdb_doc_free(rdoc);
1582    }
1583
1584    // the old file should not be seen by user level application
1585#if !defined(WIN32) && !defined(_WIN32)
1586#ifndef _MSC_VER
1587    TEST_CHK(!does_file_exist("./compact_test1"));
1588#endif
1589#endif
1590
1591    // retrieve check on the old file (snapshot)
1592    for (i=0;i<n;++i){
1593        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1594        s = fdb_get(snap, rdoc);
1595        TEST_CHK(s == FDB_RESULT_SUCCESS);
1596        fdb_doc_free(rdoc);
1597    }
1598
1599    fdb_close(dbfile);
1600
1601    fdb_shutdown();
1602    for (i=0; i<n; ++i){
1603        fdb_doc_free(doc[i]);
1604    }
1605
1606    memleak_end();
1607    TEST_RESULT("unlink after compaction test");
1608}
1609
1610void db_compact_overwrite()
1611{
1612    TEST_INIT();
1613    memleak_start();
1614
1615    int i, r;
1616    int n = 30;
1617    fdb_file_handle *dbfile, *dbfile2;
1618    fdb_kvs_handle *db, *db2;
1619    fdb_doc **doc = alca(fdb_doc *, n);
1620    fdb_doc **doc2 = alca(fdb_doc *, 2*n);
1621    fdb_doc *rdoc;
1622    fdb_status status;
1623    fdb_config fconfig;
1624    fdb_kvs_info kvs_info;
1625    fdb_kvs_config kvs_config;
1626
1627    char keybuf[256], metabuf[256], bodybuf[256];
1628
1629    // remove previous compact_test files
1630    r = system(SHELL_DEL " compact_test* > errorlog.txt");
1631    (void)r;
1632
1633    fconfig = fdb_get_default_config();
1634    kvs_config = fdb_get_default_kvs_config();
1635    fconfig.buffercache_size = 16777216;
1636    fconfig.wal_threshold = 1024;
1637    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1638    fconfig.compaction_threshold = 0;
1639
1640    // write to db1
1641    fdb_open(&dbfile, "./compact_test1", &fconfig);
1642    fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
1643    status = fdb_set_log_callback(db, logCallbackFunc,
1644                                  (void *) "db_destroy_test");
1645    TEST_CHK(status == FDB_RESULT_SUCCESS);
1646    for (i=0;i<n;++i){
1647        sprintf(keybuf, "key%d", i);
1648        sprintf(metabuf, "meta%d", i);
1649        sprintf(bodybuf, "body%d", i);
1650        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1651            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1652        fdb_set(db, doc[i]);
1653    }
1654    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1655
1656    // Open the empty db with future compact name
1657    fdb_open(&dbfile2, "./compact_test1.1", &fconfig);
1658    fdb_kvs_open(dbfile2, &db2, NULL, &kvs_config);
1659    status = fdb_set_log_callback(db2, logCallbackFunc,
1660                                  (void *) "db_destroy_test");
1661    TEST_CHK(status == FDB_RESULT_SUCCESS);
1662    // write to db2
1663    for (i=0;i < 2*n;++i){
1664        sprintf(keybuf, "k2ey%d", i);
1665        sprintf(metabuf, "m2eta%d", i);
1666        sprintf(bodybuf, "b2ody%d", i);
1667        fdb_doc_create(&doc2[i], (void*)keybuf, strlen(keybuf),
1668            (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1669        fdb_set(db2, doc2[i]);
1670    }
1671    fdb_commit(dbfile2, FDB_COMMIT_NORMAL);
1672
1673
1674    // verify db2 seqnum and close
1675    fdb_get_kvs_info(db2, &kvs_info);
1676    TEST_CHK(kvs_info.last_seqnum = 2*n);
1677    fdb_kvs_close(db2);
1678    fdb_close(dbfile2);
1679
1680    // compact db1
1681    status = fdb_compact(dbfile, NULL);
1682    TEST_CHK(status == FDB_RESULT_SUCCESS);
1683
1684    // close db1
1685    fdb_kvs_close(db);
1686    fdb_close(dbfile);
1687
1688    // reopen db1
1689    fdb_open(&dbfile, "./compact_test1", &fconfig);
1690    fdb_kvs_open(dbfile, &db, NULL, &kvs_config);
1691    status = fdb_set_log_callback(db, logCallbackFunc,
1692                                  (void *) "db_destroy_test");
1693    TEST_CHK(status == FDB_RESULT_SUCCESS);
1694    // read db1
1695    for (i=0;i<n;++i){
1696        // search by key
1697        fdb_doc_create(&rdoc, doc[i]->key, doc[i]->keylen, NULL, 0, NULL, 0);
1698        status = fdb_get(db, rdoc);
1699        TEST_CHK(status == FDB_RESULT_SUCCESS);
1700        TEST_CHK(!memcmp(rdoc->meta, doc[i]->meta, rdoc->metalen));
1701        TEST_CHK(!memcmp(rdoc->body, doc[i]->body, rdoc->bodylen));
1702        // free result document
1703        fdb_doc_free(rdoc);
1704    }
1705
1706    // reopen db2
1707    fdb_open(&dbfile2, "./compact_test1.1", &fconfig);
1708    fdb_kvs_open(dbfile2, &db2, NULL, &kvs_config);
1709    status = fdb_set_log_callback(db2, logCallbackFunc,
1710                                  (void *) "db_destroy_test");
1711    TEST_CHK(status == FDB_RESULT_SUCCESS);
1712
1713    fdb_get_kvs_info(db2, &kvs_info);
1714    TEST_CHK(kvs_info.last_seqnum = 2*n);
1715
1716    // read db2
1717    for (i=0;i<2*n;++i){
1718        // search by key
1719        fdb_doc_create(&rdoc, doc2[i]->key, doc2[i]->keylen, NULL, 0, NULL, 0);
1720        status = fdb_get(db2, rdoc);
1721        TEST_CHK(status == FDB_RESULT_SUCCESS);
1722        TEST_CHK(!memcmp(rdoc->meta, doc2[i]->meta, rdoc->metalen));
1723        TEST_CHK(!memcmp(rdoc->body, doc2[i]->body, rdoc->bodylen));
1724        // free result document
1725        fdb_doc_free(rdoc);
1726    }
1727
1728    // free all documents
1729    for (i=0;i<n;++i){
1730        fdb_doc_free(doc[i]);
1731        fdb_doc_free(doc2[i]);
1732    }
1733    for (i=n;i<2*n;++i){
1734        fdb_doc_free(doc2[i]);
1735    }
1736
1737    fdb_kvs_close(db);
1738    fdb_close(dbfile);
1739    fdb_kvs_close(db2);
1740    fdb_close(dbfile2);
1741
1742
1743    fdb_shutdown();
1744    memleak_end();
1745    TEST_RESULT("compact overwrite");
1746}
1747
1748void *db_compact_during_doc_delete(void *args)
1749{
1750
1751    TEST_INIT();
1752    memleak_start();
1753
1754    int i, r;
1755    int n = 100;
1756    fdb_file_handle *dbfile;
1757    fdb_kvs_handle *db;
1758    fdb_status status;
1759    fdb_config fconfig;
1760    fdb_kvs_config kvs_config;
1761    fdb_kvs_info kvs_info;
1762    thread_t tid;
1763    void *thread_ret;
1764    fdb_doc **doc = alca(fdb_doc*, n);
1765    char keybuf[256], metabuf[256], bodybuf[256];
1766
1767    // init dbfile
1768    kvs_config = fdb_get_default_kvs_config();
1769    fconfig = fdb_get_default_config();
1770    fconfig.buffercache_size = 0;
1771    fconfig.wal_threshold = 1024;
1772    fconfig.compaction_threshold = 0;
1773
1774    if (args == NULL)
1775    { // parent
1776
1777        r = system(SHELL_DEL" compact_test* > errorlog.txt");
1778        (void)r;
1779
1780        status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1781        TEST_CHK(status == FDB_RESULT_SUCCESS);
1782        status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1783        TEST_CHK(status == FDB_RESULT_SUCCESS);
1784
1785        // insert documents
1786        for (i=0;i<n;++i){
1787            sprintf(keybuf, "key%d", i);
1788            sprintf(metabuf, "meta%d", i);
1789            sprintf(bodybuf, "body%d", i);
1790            fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf),
1791                (void*)metabuf, strlen(metabuf), (void*)bodybuf, strlen(bodybuf));
1792            fdb_set(db, doc[i]);
1793        }
1794
1795        fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1796        // verify no docs remaining
1797        fdb_get_kvs_info(db, &kvs_info);
1798        TEST_CHK(kvs_info.doc_count == (uint64_t)n);
1799
1800        // start deleting docs
1801        for (i=0;i<n;++i){
1802            fdb_del(db, doc[i]);
1803            if (i == n/2){
1804                // compact half-way
1805                thread_create(&tid, db_compact_during_doc_delete, (void *)dbfile);
1806            }
1807        }
1808
1809        // join compactor
1810        thread_join(tid, &thread_ret);
1811
1812        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1813
1814        // verify no docs remaining
1815        fdb_get_kvs_info(db, &kvs_info);
1816        TEST_CHK(kvs_info.doc_count == 0);
1817
1818        // reopen
1819        fdb_kvs_close(db);
1820        fdb_close(dbfile);
1821        status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1822        TEST_CHK(status == FDB_RESULT_SUCCESS);
1823        status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1824        TEST_CHK(status == FDB_RESULT_SUCCESS);
1825
1826        fdb_get_kvs_info(db, &kvs_info);
1827        TEST_CHK(kvs_info.doc_count == 0);
1828
1829        // cleanup
1830        for (i=0;i<n;++i){
1831            fdb_doc_free(doc[i]);
1832        }
1833        fdb_kvs_close(db);
1834        fdb_close(dbfile);
1835        fdb_shutdown();
1836
1837        memleak_end();
1838        TEST_RESULT("multi thread client shutdown");
1839        return NULL;
1840    }
1841
1842    // compaction thread enters here //
1843    status = fdb_open(&dbfile, "./compact_test1", &fconfig);
1844    TEST_CHK(status == FDB_RESULT_SUCCESS);
1845    status = fdb_compact(dbfile, NULL);
1846    TEST_CHK(status == FDB_RESULT_SUCCESS);
1847    fdb_close(dbfile);
1848
1849    // shutdown
1850    thread_exit(0);
1851    return NULL;
1852}
1853
1854void compaction_daemon_test(size_t time_sec)
1855{
1856    TEST_INIT();
1857
1858    memleak_start();
1859
1860    int i, r;
1861    int n = 10000;
1862    int compaction_threshold = 30;
1863    int escape = 0;
1864    fdb_file_handle *dbfile, *dbfile_less, *dbfile_non, *dbfile_manual, *dbfile_new;
1865    fdb_kvs_handle *db, *db_less, *db_non, *db_manual;
1866    fdb_kvs_handle *snapshot;
1867    fdb_doc **doc = alca(fdb_doc*, n);
1868    fdb_doc *rdoc;
1869    fdb_file_info info;
1870    fdb_status status;
1871    struct timeval ts_begin, ts_cur, ts_gap;
1872
1873    char keybuf[256], metabuf[256], bodybuf[256];
1874
1875    // remove previous compact_test files
1876    r = system(SHELL_DEL" compact_test* > errorlog.txt");
1877    (void)r;
1878
1879    fdb_config fconfig = fdb_get_default_config();
1880    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1881    fconfig.buffercache_size = 0;
1882    fconfig.wal_threshold = 1024;
1883    fconfig.flags = FDB_OPEN_FLAG_CREATE;
1884    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
1885    fconfig.compaction_threshold = compaction_threshold;
1886    fconfig.compactor_sleep_duration = 1; // for quick test
1887
1888    fconfig.num_compactor_threads = 0;
1889    status = fdb_open(&dbfile, "compact_test", &fconfig);
1890    TEST_CHK(status == FDB_RESULT_INVALID_CONFIG);
1891
1892    fconfig.num_compactor_threads = DEFAULT_NUM_COMPACTOR_THREADS;
1893    // open db
1894    fdb_open(&dbfile, "compact_test", &fconfig);
1895    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1896    status = fdb_set_log_callback(db, logCallbackFunc,
1897                                  (void *) "compaction_daemon_test");
1898    TEST_CHK(status == FDB_RESULT_SUCCESS);
1899    // insert documents
1900    printf("Initialize..\n");
1901    for (i=0;i<n;++i){
1902        sprintf(keybuf, "key%04d", i);
1903        sprintf(metabuf, "meta%04d", i);
1904        sprintf(bodybuf, "body%04d", i);
1905        fdb_doc_create(&doc[i], (void*)keybuf, strlen(keybuf)+1,
1906            (void*)metabuf, strlen(metabuf)+1, (void*)bodybuf, strlen(bodybuf)+1);
1907        fdb_set(db, doc[i]);
1908    }
1909    // commit
1910    fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1911    // close db file
1912    fdb_close(dbfile);
1913
1914    // ---- basic retrieve test ------------------------
1915    // reopen db file
1916    status = fdb_open(&dbfile, "compact_test", &fconfig);
1917    TEST_CHK(status == FDB_RESULT_SUCCESS);
1918    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1919    TEST_CHK(status == FDB_RESULT_SUCCESS);
1920    status = fdb_set_log_callback(db, logCallbackFunc,
1921                                  (void *) "compaction_daemon_test");
1922    TEST_CHK(status == FDB_RESULT_SUCCESS);
1923    // check db filename
1924    fdb_get_file_info(dbfile, &info);
1925    TEST_CHK(!strcmp(info.filename, "compact_test"));
1926
1927    // retrieve documents
1928    for (i=0;i<n;++i){
1929        sprintf(keybuf, "key%04d", i);
1930        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1931            NULL, 0, NULL, 0);
1932        status = fdb_get(db, rdoc);
1933        //printf("%s %s\n", rdoc->key, rdoc->body);
1934        TEST_CHK(status == FDB_RESULT_SUCCESS);
1935        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1936        fdb_doc_free(rdoc);
1937    }
1938
1939    // create a snapshot
1940    status = fdb_snapshot_open(db, &snapshot, n);
1941    TEST_CHK(status == FDB_RESULT_SUCCESS);
1942    // close snapshot
1943    fdb_kvs_close(snapshot);
1944
1945    // close db file
1946    fdb_close(dbfile);
1947
1948    // ---- handling when metafile is removed ------------
1949    // remove meta file
1950    r = system(SHELL_DEL" compact_test.meta > errorlog.txt");
1951    (void)r;
1952    // reopen db file
1953    status = fdb_open(&dbfile, "compact_test", &fconfig);
1954    TEST_CHK(status == FDB_RESULT_SUCCESS);
1955    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1956    TEST_CHK(status == FDB_RESULT_SUCCESS);
1957    status = fdb_set_log_callback(db, logCallbackFunc,
1958                                  (void *) "compaction_daemon_test");
1959    TEST_CHK(status == FDB_RESULT_SUCCESS);
1960    // retrieve documents
1961    for (i=0;i<n;++i){
1962        sprintf(keybuf, "key%04d", i);
1963        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1964            NULL, 0, NULL, 0);
1965        status = fdb_get(db, rdoc);
1966        //printf("%s %s\n", rdoc->key, rdoc->body);
1967        TEST_CHK(status == FDB_RESULT_SUCCESS);
1968        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1969        fdb_doc_free(rdoc);
1970    }
1971    // close db file
1972    fdb_close(dbfile);
1973
1974    // ---- handling when metafile points to non-exist file ------------
1975    // remove meta file
1976    r = system(SHELL_MOVE" compact_test.0 compact_test.23 > errorlog.txt");
1977    (void)r;
1978    // reopen db file
1979    status = fdb_open(&dbfile, "compact_test", &fconfig);
1980    TEST_CHK(status == FDB_RESULT_SUCCESS);
1981    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
1982    TEST_CHK(status == FDB_RESULT_SUCCESS);
1983    status = fdb_set_log_callback(db, logCallbackFunc,
1984                                  (void *) "compaction_daemon_test");
1985    TEST_CHK(status == FDB_RESULT_SUCCESS);
1986    // retrieve documents
1987    for (i=0;i<n;++i){
1988        sprintf(keybuf, "key%04d", i);
1989        fdb_doc_create(&rdoc, (void*)keybuf, strlen(keybuf)+1,
1990            NULL, 0, NULL, 0);
1991        status = fdb_get(db, rdoc);
1992        //printf("%s %s\n", rdoc->key, rdoc->body);
1993        TEST_CHK(status == FDB_RESULT_SUCCESS);
1994        TEST_CMP(rdoc->body, doc[i]->body, rdoc->bodylen);
1995        fdb_doc_free(rdoc);
1996    }
1997    // close db file
1998    fdb_close(dbfile);
1999
2000    // ---- compaction daemon test -------------------
2001    // db: DB instance to be compacted
2002    // db_less: DB instance to be compacted but with much lower update throughput
2003    // db_non: DB instance not to be compacted (auto compaction with threshold = 0)
2004    // db_manual: DB instance not to be compacted (manual compaction)
2005
2006    // open & create db_less, db_non and db_manual
2007    status = fdb_open(&dbfile_less, "compact_test_less", &fconfig);
2008    TEST_CHK(status == FDB_RESULT_SUCCESS);
2009    status = fdb_kvs_open_default(dbfile_less, &db_less, &kvs_config);
2010    TEST_CHK(status == FDB_RESULT_SUCCESS);
2011
2012    fconfig.compaction_threshold = 0;
2013    status = fdb_open(&dbfile_non, "compact_test_non", &fconfig);
2014    TEST_CHK(status == FDB_RESULT_SUCCESS);
2015    status = fdb_kvs_open_default(dbfile_non, &db_non, &kvs_config);
2016    TEST_CHK(status == FDB_RESULT_SUCCESS);
2017
2018    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
2019    status = fdb_open(&dbfile_manual, "compact_test_manual", &fconfig);
2020    TEST_CHK(status == FDB_RESULT_SUCCESS);
2021    status = fdb_kvs_open_default(dbfile_manual, &db_manual, &kvs_config);
2022    TEST_CHK(status == FDB_RESULT_SUCCESS);
2023
2024    // reopen db file
2025    fconfig.compaction_threshold = 30;
2026    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
2027    status = fdb_open(&dbfile, "compact_test", &fconfig);
2028    TEST_CHK(status == FDB_RESULT_SUCCESS);
2029    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
2030    TEST_CHK(status == FDB_RESULT_SUCCESS);
2031    status = fdb_set_log_callback(db, logCallbackFunc,
2032                                  (void *) "compaction_daemon_test");
2033    TEST_CHK(status == FDB_RESULT_SUCCESS);
2034    // continuously update documents
2035    printf("wait for %d seconds..\n", (int)time_sec);
2036    gettimeofday(&ts_begin, NULL);
2037    while (!escape) {
2038        for (i=0;i<n;++i){
2039            // update db
2040            fdb_set(db, doc[i]);
2041            fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2042
2043            // update db_less (1/100 throughput)
2044            if (i%100 == 0){
2045                fdb_set(db_less, doc[i]);
2046                fdb_commit(dbfile_less, FDB_COMMIT_NORMAL);
2047            }
2048
2049            // update db_non
2050            fdb_set(db_non, doc[i]);
2051            fdb_commit(dbfile_non, FDB_COMMIT_NORMAL);
2052
2053            // update db_manual
2054            fdb_set(db_manual, doc[i]);
2055            fdb_commit(dbfile_manual, FDB_COMMIT_NORMAL);
2056
2057            gettimeofday(&ts_cur, NULL);
2058            ts_gap = _utime_gap(ts_begin, ts_cur);
2059            if ((size_t)ts_gap.tv_sec >= time_sec) {
2060                escape = 1;
2061                break;
2062            }
2063        }
2064    }
2065
2066    // Change the compaction interval to 60 secs.
2067    status = fdb_set_daemon_compaction_interval(dbfile, 60);
2068    TEST_CHK(status == FDB_RESULT_SUCCESS);
2069    // Change the compaction interval back to 1 sec.
2070    status = fdb_set_daemon_compaction_interval(dbfile, 1);
2071    TEST_CHK(status == FDB_RESULT_SUCCESS);
2072
2073    // perform manual compaction of auto-compact file
2074    status = fdb_compact(dbfile_non, NULL);
2075    TEST_CHK(status == FDB_RESULT_SUCCESS);
2076
2077    // perform manual compaction of manual-compact file
2078    status = fdb_compact(dbfile_manual, "compact_test_manual_compacted");
2079    TEST_CHK(status == FDB_RESULT_SUCCESS);
2080
2081    // open compact_test_manual_compacted using new db handle
2082    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
2083    status = fdb_open(&dbfile_new, "compact_test_manual_compacted", &fconfig);
2084    TEST_CHK(status == FDB_RESULT_SUCCESS);
2085
2086    // try to switch compaction mode
2087    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 30);
2088    TEST_CHK(status == FDB_RESULT_FILE_IS_BUSY);
2089
2090    // close db_new
2091    status = fdb_close(dbfile_new);
2092    TEST_CHK(status == FDB_RESULT_SUCCESS);
2093
2094    // switch compaction mode of 'db_manual' from MANUAL to AUTO
2095    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 10);
2096    TEST_CHK(status == FDB_RESULT_SUCCESS);
2097
2098    // change compaction value
2099    status = fdb_switch_compaction_mode(dbfile_manual, FDB_COMPACTION_AUTO, 30);
2100    TEST_CHK(status == FDB_RESULT_SUCCESS);
2101
2102    // close and open with auto-compact option
2103    status = fdb_close(dbfile_manual);
2104    TEST_CHK(status == FDB_RESULT_SUCCESS);
2105    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
2106    status = fdb_open(&dbfile_manual, "compact_test_manual_compacted", &fconfig);
2107    TEST_CHK(status == FDB_RESULT_SUCCESS);
2108
2109    // switch compaction mode of 'db_non' from AUTO to MANUAL
2110    status = fdb_switch_compaction_mode(dbfile_non, FDB_COMPACTION_MANUAL, 0);
2111    TEST_CHK(status == FDB_RESULT_SUCCESS);
2112
2113    // close and open with manual-compact option
2114    status = fdb_close(dbfile_non);
2115    TEST_CHK(status == FDB_RESULT_SUCCESS);
2116    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
2117    status = fdb_open(&dbfile_non, "compact_test_non", &fconfig);
2118    TEST_CHK(status == FDB_RESULT_SUCCESS);
2119
2120    // Now perform one manual compaction on compact_test_non
2121    fdb_compact(dbfile_non, "compact_test_non.manual");
2122
2123    // close all db files except compact_test_non
2124    fdb_close(dbfile);
2125    fdb_close(dbfile_less);
2126    fdb_close(dbfile_manual);
2127
2128    // open manual compact file (compact_test_non) using auto compact mode
2129    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
2130    status = fdb_open(&dbfile, "compact_test_non.manual", &fconfig);
2131    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
2132
2133    // Attempt to destroy manual compact file using auto compact mode
2134    status = fdb_destroy("compact_test_non.manual", &fconfig);
2135    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
2136
2137    // open auto copmact file (compact_test_manual_compacted) using manual compact mode
2138    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
2139    status = fdb_open(&dbfile, "compact_test_manual_compacted", &fconfig);
2140    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
2141
2142    // Attempt to destroy auto copmact file using manual compact mode
2143    status = fdb_destroy("compact_test_manual_compacted", &fconfig);
2144    TEST_CHK(status == FDB_RESULT_INVALID_COMPACTION_MODE);
2145
2146    // DESTROY auto copmact file with correct mode
2147    fconfig.compaction_mode = FDB_COMPACTION_AUTO;
2148    status = fdb_destroy("compact_test_manual_compacted", &fconfig);
2149    TEST_CHK(status == FDB_RESULT_SUCCESS);
2150
2151    // DESTROY manual compacted file with past version open!
2152    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
2153    status = fdb_destroy("compact_test_non.manual", &fconfig);
2154    TEST_CHK(status == FDB_RESULT_FILE_IS_BUSY);
2155    fdb_close(dbfile_non);
2156
2157    // Simulate a database crash by doing a premature shutdown
2158    // Note that db_non was never closed properly
2159    status = fdb_shutdown();
2160    TEST_CHK(status == FDB_RESULT_SUCCESS);
2161
2162    status = fdb_destroy("compact_test_non.manual", &fconfig);
2163    TEST_CHK(status == FDB_RESULT_SUCCESS);
2164
2165    // Attempt to read-only auto compacted and destroyed file
2166    fconfig.flags = FDB_OPEN_FLAG_RDONLY;
2167    status = fdb_open(&dbfile, "./compact_test_manual_compacted", &fconfig);
2168    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
2169
2170    status = fdb_open(&dbfile, "./compact_test_manual_compacted.meta", &fconfig);
2171    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
2172
2173    // Attempt to read-only past version of manually compacted destroyed file
2174    status = fdb_open(&dbfile, "compact_test_non", &fconfig);
2175    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
2176
2177    // Attempt to read-only current version of manually compacted destroyed file
2178    status = fdb_open(&dbfile, "compact_test_non.manual", &fconfig);
2179    TEST_CHK(status == FDB_RESULT_NO_SUCH_FILE);
2180
2181    // free all documents
2182    for (i=0;i<n;++i){
2183        fdb_doc_free(doc[i]);
2184    }
2185
2186    // free all resources
2187    fdb_shutdown();
2188
2189    memleak_end();
2190
2191    TEST_RESULT("compaction daemon test");
2192}
2193
2194// MB-13117
2195void auto_compaction_with_concurrent_insert_test(size_t t_limit)
2196{
2197    TEST_INIT();
2198
2199    memleak_start();
2200
2201    int i, r;
2202    fdb_file_handle *file;
2203    fdb_kvs_handle *kvs;
2204    fdb_status status;
2205    fdb_config config;
2206    fdb_kvs_config kvs_config;
2207    struct timeval ts_begin, ts_cur, ts_gap;
2208
2209    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2210    (void)r;
2211
2212    // Open Database File
2213    config = fdb_get_default_config();
2214    config.compaction_mode=FDB_COMPACTION_AUTO;
2215    config.compactor_sleep_duration = 1;
2216    status = fdb_open(&file, "compact_test", &config);
2217    TEST_CHK(status == FDB_RESULT_SUCCESS);
2218
2219    // Open KV Store
2220    kvs_config = fdb_get_default_kvs_config();
2221    status = fdb_kvs_open_default(file, &kvs, &kvs_config);
2222    TEST_CHK(status == FDB_RESULT_SUCCESS);
2223
2224    gettimeofday(&ts_begin, NULL);
2225    printf("wait for %d seconds..\n", (int)t_limit);
2226
2227    // Several kv pairs
2228    for(i=0;i<100000;i++) {
2229        char str[15];
2230        sprintf(str, "%d", i);
2231        status = fdb_set_kv(kvs, str, strlen(str), (void*)"value", 5);
2232        TEST_CHK(status == FDB_RESULT_SUCCESS);
2233
2234        // Commit
2235        status = fdb_commit(file, FDB_COMMIT_NORMAL);
2236        TEST_CHK(status == FDB_RESULT_SUCCESS);
2237
2238        gettimeofday(&ts_cur, NULL);
2239        ts_gap = _utime_gap(ts_begin, ts_cur);
2240        if ((size_t)ts_gap.tv_sec >= t_limit) {
2241            break;
2242        }
2243    }
2244
2245    status = fdb_close(file);
2246    TEST_CHK(status == FDB_RESULT_SUCCESS);
2247    status = fdb_shutdown();
2248    TEST_CHK(status == FDB_RESULT_SUCCESS);
2249
2250    memleak_end();
2251
2252    TEST_RESULT("auto compaction with concurrent insert test");
2253}
2254
2255// lexicographically compares two variable-length binary streams
2256#define MIN(a,b) (((a)<(b))?(a):(b))
2257static int _compact_test_keycmp(void *key1, size_t keylen1,
2258                                void *key2, size_t keylen2)
2259{
2260    if (keylen1 == keylen2) {
2261        return memcmp(key1, key2, keylen1);
2262    }else {
2263        size_t len = MIN(keylen1, keylen2);
2264        int cmp = memcmp(key1, key2, len);
2265        if (cmp != 0) return cmp;
2266        else {
2267            return (int)((int)keylen1 - (int)keylen2);
2268        }
2269    }
2270}
2271
2272void auto_compaction_with_custom_cmp_function()
2273{
2274    TEST_INIT();
2275
2276    memleak_start();
2277
2278    int i, r, n=10000;
2279    char keybuf[256], bodybuf[256];
2280    uint64_t max_filesize = 0;
2281    fdb_file_handle *file;
2282    fdb_kvs_handle *db1, *db2, *db3;
2283    fdb_status status;
2284    fdb_config config;
2285    fdb_kvs_config kvs_config;
2286    fdb_file_info file_info;
2287    char *kvs_names[] = {NULL};
2288    fdb_custom_cmp_variable functions[] = {_compact_test_keycmp};
2289
2290    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2291    (void)r;
2292
2293    // Open Database File
2294    config = fdb_get_default_config();
2295    config.wal_threshold = 4096; // reset WAL threshold for correct file size estimation
2296    config.compaction_mode=FDB_COMPACTION_AUTO;
2297    config.compactor_sleep_duration = 1;
2298    config.compaction_threshold = 10;
2299    status = fdb_open_custom_cmp(&file, "compact_test", &config,
2300                                 1, kvs_names, functions);
2301    TEST_CHK(status == FDB_RESULT_SUCCESS);
2302
2303    // Open 2 KV Stores
2304    kvs_config = fdb_get_default_kvs_config();
2305    status = fdb_kvs_open_default(file, &db1, &kvs_config);
2306    TEST_CHK(status == FDB_RESULT_SUCCESS);
2307    status = fdb_kvs_open(file, &db2, "db", &kvs_config);
2308    TEST_CHK(status == FDB_RESULT_SUCCESS);
2309
2310    // initial load
2311    for(i=0;i<n;i++) {
2312        sprintf(keybuf, "key%06d", i);
2313        sprintf(bodybuf, "body%06d", i);
2314        status = fdb_set_kv(db1, keybuf, strlen(keybuf),
2315                                 bodybuf, strlen(bodybuf));
2316        TEST_CHK(status == FDB_RESULT_SUCCESS);
2317        status = fdb_set_kv(db2, keybuf, strlen(keybuf),
2318                                 bodybuf, strlen(bodybuf));
2319        TEST_CHK(status == FDB_RESULT_SUCCESS);
2320    }
2321
2322    // create one more KVS using custom cmp
2323    // it doesn't exist on the initial cmp_func list
2324    kvs_config.custom_cmp = _compact_test_keycmp;
2325    status = fdb_kvs_open(file, &db3, "db_custom", &kvs_config);
2326    TEST_CHK(status == FDB_RESULT_SUCCESS);
2327    i = 0;
2328    sprintf(keybuf, "key%06d", i);
2329    sprintf(bodybuf, "body%06d", i);
2330    status = fdb_set_kv(db3, keybuf, strlen(keybuf),
2331                             bodybuf, strlen(bodybuf));
2332    TEST_CHK(status == FDB_RESULT_SUCCESS);
2333
2334    // Commit
2335    status = fdb_commit(file, FDB_COMMIT_NORMAL);
2336    TEST_CHK(status == FDB_RESULT_SUCCESS);
2337
2338    // update to trigger compaction
2339    for(i=0;i<n;i++) {
2340        sprintf(keybuf, "key%06d", i);
2341        sprintf(bodybuf, "body%06d", i);
2342        status = fdb_set_kv(db1, keybuf, strlen(keybuf),
2343                                 bodybuf, strlen(bodybuf));
2344        TEST_CHK(status == FDB_RESULT_SUCCESS);
2345
2346        status = fdb_get_file_info(file, &file_info);
2347        TEST_CHK(status == FDB_RESULT_SUCCESS);
2348        if (file_info.file_size > max_filesize) {
2349            max_filesize = file_info.file_size;
2350        }
2351    }
2352    // Commit
2353    status = fdb_commit(file, FDB_COMMIT_NORMAL);
2354    TEST_CHK(status == FDB_RESULT_SUCCESS);
2355
2356    status = fdb_get_file_info(file, &file_info);
2357    TEST_CHK(status == FDB_RESULT_SUCCESS);
2358    if (file_info.file_size > max_filesize) {
2359        max_filesize = file_info.file_size;
2360    }
2361
2362    printf("wait for daemon compaction completion... (max file size: %" _F64 ")\n", max_filesize);
2363    while (true) {
2364        sleep(1);
2365
2366        status = fdb_get_file_info(file, &file_info);
2367        TEST_CHK(status == FDB_RESULT_SUCCESS);
2368        if (file_info.file_size < max_filesize) {
2369            break;
2370        }
2371    }
2372    // should be compacted
2373    TEST_CHK(file_info.file_size < max_filesize);
2374
2375    status = fdb_close(file);
2376    TEST_CHK(status == FDB_RESULT_SUCCESS);
2377    status = fdb_shutdown();
2378    TEST_CHK(status == FDB_RESULT_SUCCESS);
2379
2380    memleak_end();
2381
2382    TEST_RESULT("auto compaction with custom comparison function");
2383}
2384
2385struct cb_txn_args {
2386    fdb_file_handle *file;
2387    fdb_kvs_handle *handle;
2388    int ndocs;
2389    int nupdates;
2390    int done;
2391};
2392
2393static int cb_txn(fdb_file_handle *fhandle,
2394                  fdb_compaction_status status, const char *kv_name,
2395                  fdb_doc *doc, uint64_t old_offset, uint64_t new_offset,
2396                  void *ctx)
2397{
2398    struct cb_txn_args *args = (struct cb_txn_args *)ctx;
2399    (void) fhandle;
2400    (void) doc;
2401    (void) old_offset;
2402    (void) new_offset;
2403
2404    if (status == FDB_CS_END && !args->done) {
2405        int i;
2406        int n = 10;
2407        char keybuf[256], bodybuf[256];
2408        fdb_status s;
2409
2410        // begin transaction
2411        fdb_begin_transaction(args->file, FDB_ISOLATION_READ_COMMITTED);
2412        // insert new docs, but do not end transaction
2413        for (i=0;i<n/2;++i){
2414            sprintf(keybuf, "txn%04d", i);
2415            sprintf(bodybuf, "txn_body%04d", i);
2416            s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2417                                         bodybuf, strlen(bodybuf)+1);
2418            (void)s;
2419        }
2420        args->done = 1;
2421    }
2422
2423    return 0;
2424}
2425
2426void compaction_with_concurrent_transaction_test()
2427{
2428    TEST_INIT();
2429    memleak_start();
2430
2431    int i, r;
2432    int n = 10;
2433    size_t valuelen;
2434    char keybuf[256], bodybuf[256];
2435    fdb_file_handle *dbfile, *txn_file;
2436    fdb_kvs_handle *db, *txn;
2437    fdb_status s;
2438    fdb_config fconfig = fdb_get_default_config();
2439    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2440    void *value;
2441    struct cb_txn_args cb_args;
2442
2443    memset(&cb_args, 0x0, sizeof(struct cb_txn_args));
2444    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2445    fconfig.compaction_cb = cb_txn;
2446    fconfig.compaction_cb_ctx = &cb_args;
2447    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
2448                                 FDB_CS_END;
2449
2450    // remove previous compact_test files
2451    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2452    (void)r;
2453
2454    // open db
2455    fdb_open(&dbfile, "./compact_test1", &fconfig);
2456    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
2457
2458    fdb_open(&txn_file, "./compact_test1", &fconfig);
2459    fdb_kvs_open(txn_file, &txn, "db", &kvs_config);
2460    cb_args.file = txn_file;
2461    cb_args.handle = txn;
2462
2463    // write docs & commit
2464    for (i=0;i<n;++i){
2465        sprintf(keybuf, "key%04d", i);
2466        sprintf(bodybuf, "body%04d", i);
2467        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
2468                           bodybuf, strlen(bodybuf)+1);
2469        TEST_CHK(s == FDB_RESULT_SUCCESS);
2470    }
2471    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2472    TEST_CHK(s == FDB_RESULT_SUCCESS);
2473    cb_args.ndocs = n;
2474    cb_args.nupdates = 2;
2475
2476    s = fdb_compact(dbfile, "./compact_test2");
2477    TEST_CHK(s == FDB_RESULT_SUCCESS);
2478
2479    // insert new docs through transaction
2480    for (i=n/2;i<n;++i){
2481        sprintf(keybuf, "txn%04d", i);
2482        sprintf(bodybuf, "txn_body%04d", i);
2483        s = fdb_set_kv(txn, keybuf, strlen(keybuf)+1,
2484                            bodybuf, strlen(bodybuf)+1);
2485        TEST_CHK(s == FDB_RESULT_SUCCESS);
2486    }
2487    // all txn docs should be retrieved
2488    for (i=0;i<n;++i){
2489        sprintf(keybuf, "txn%04d", i);
2490        sprintf(bodybuf, "txn_body%04d", i);
2491        s = fdb_get_kv(txn, keybuf, strlen(keybuf)+1,
2492                            &value, &valuelen);
2493        TEST_CHK(s == FDB_RESULT_SUCCESS);
2494        TEST_CMP(value, bodybuf, valuelen);
2495        fdb_free_block(value);
2496    }
2497    s = fdb_end_transaction(txn_file, FDB_COMMIT_MANUAL_WAL_FLUSH);
2498    TEST_CHK(s == FDB_RESULT_SUCCESS);
2499    fdb_close(txn_file);
2500
2501    s = fdb_compact(dbfile, "./compact_test3");
2502    TEST_CHK(s == FDB_RESULT_SUCCESS);
2503
2504    fdb_close(dbfile);
2505    fdb_shutdown();
2506    memleak_end();
2507    TEST_RESULT("compaction with concurrent transaction test");
2508}
2509
2510struct cb_upt_args {
2511    fdb_file_handle *file;
2512    fdb_kvs_handle *handle;
2513    fdb_kvs_handle *handle2;
2514    int ndocs;
2515    int nupdates;
2516    int done;
2517    int nmoves;
2518};
2519
2520static fdb_compact_decision cb_upt(fdb_file_handle *fhandle,
2521                                   fdb_compaction_status status,
2522                                   const char *kv_name,
2523                                   fdb_doc *doc, uint64_t old_offset,
2524                                   uint64_t new_offset,
2525                                   void *ctx)
2526{
2527    TEST_INIT();
2528    struct cb_upt_args *args = (struct cb_upt_args *)ctx;
2529    (void) fhandle;
2530    (void) doc;
2531    (void) old_offset;
2532    (void) new_offset;
2533
2534    if (status == FDB_CS_MOVE_DOC) {
2535        int i, j;
2536        int n = 10;
2537        char keybuf[256], bodybuf[256];
2538        char keystr[] = "new%04d";
2539        char valuestr[] = "new_body%04d";
2540        fdb_status s;
2541        args->nmoves++;
2542        if (args->nmoves > n/2 && !args->done) {
2543            // verify if the key is stripped off its prefix
2544            fdb_doc tdoc;
2545            memset(&tdoc, 0, sizeof(fdb_doc));
2546            tdoc.key = doc->key;
2547            tdoc.keylen = doc->keylen;
2548            s = fdb_get(args->handle, &tdoc);
2549            TEST_CHK(s == FDB_RESULT_SUCCESS);
2550            free(tdoc.meta);
2551            free(tdoc.body);
2552            // insert new docs
2553            for (i=0;i<n/2;++i){
2554                sprintf(keybuf, keystr, i);
2555                sprintf(bodybuf, valuestr, i);
2556                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2557                                             bodybuf, strlen(bodybuf)+1);
2558                TEST_CHK(s == FDB_RESULT_SUCCESS);
2559                s = fdb_commit(args->file, FDB_COMMIT_NORMAL);
2560                TEST_CHK(s == FDB_RESULT_SUCCESS);
2561
2562                for (j=0; j<=i; ++j){
2563                    void *v_out;
2564                    size_t vlen_out;
2565                    sprintf(keybuf, keystr, i);
2566                    sprintf(bodybuf, valuestr, i);
2567                    s = fdb_get_kv(args->handle2, keybuf, strlen(keybuf)+1,
2568                        &v_out, &vlen_out);
2569                    TEST_CHK(s == FDB_RESULT_SUCCESS);
2570                    fdb_free_block(v_out);
2571                }
2572            }
2573            args->done = 1;
2574        }
2575        if (args->nmoves > n && args->done == 1) {
2576            // the first phase is done. insert new docs.
2577            for (i=0;i<2;++i){
2578                sprintf(keybuf, "xxx%d", i);
2579                sprintf(bodybuf, "xxxvalue%d", i);
2580                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2581                                             bodybuf, strlen(bodybuf)+1);
2582                TEST_CHK(s == FDB_RESULT_SUCCESS);
2583                s = fdb_commit(args->file, FDB_COMMIT_NORMAL);
2584                TEST_CHK(s == FDB_RESULT_SUCCESS);
2585            }
2586            args->done = 2;
2587        }
2588        if (args->nmoves == (n*3/2 + 2) && args->done == 2) {
2589            // during the second-second phase,
2590            // insert new docs, and do not commit.
2591            for (i=0;i<2;++i){
2592                sprintf(keybuf, "zzz%d", i);
2593                sprintf(bodybuf, "zzzvalue%d", i);
2594                s = fdb_set_kv(args->handle, keybuf, strlen(keybuf)+1,
2595                                             bodybuf, strlen(bodybuf)+1);
2596                TEST_CHK(s == FDB_RESULT_SUCCESS);
2597            }
2598            args->done = 3;
2599        }
2600        TEST_CMP(kv_name, "db", 2);
2601    }
2602
2603    return 0;
2604}
2605
2606void compaction_with_concurrent_update_test()
2607{
2608    TEST_INIT();
2609    memleak_start();
2610
2611    int i, r;
2612    int n = 10;
2613    char keybuf[256], bodybuf[256];
2614    fdb_file_handle *dbfile, *dbfile2, *dbfile3;
2615    fdb_kvs_handle *db, *db2, *db3;
2616    fdb_status s;
2617    fdb_config fconfig = fdb_get_default_config();
2618    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2619    fdb_kvs_info info;
2620    fdb_seqnum_t seqnum;
2621    struct cb_upt_args cb_args;
2622    void *value;
2623    size_t valuelen;
2624
2625    memset(&cb_args, 0x0, sizeof(struct cb_upt_args));
2626    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2627    fconfig.compaction_cb = cb_upt;
2628    fconfig.compaction_cb_ctx = &cb_args;
2629    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
2630                                 FDB_CS_END | FDB_CS_MOVE_DOC;
2631
2632    // remove previous compact_test files
2633    r = system(SHELL_DEL" compact_test* > errorlog.txt");
2634    (void)r;
2635
2636    // open db
2637    fdb_open(&dbfile, "./compact_test1", &fconfig);
2638    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
2639
2640    fdb_open(&dbfile2, "./compact_test1", &fconfig);
2641    fdb_kvs_open(dbfile2, &db2, "db", &kvs_config);
2642    fdb_open(&dbfile3, "./compact_test1", &fconfig);
2643    fdb_kvs_open(dbfile3, &db3, "db", &kvs_config);
2644
2645    cb_args.file = dbfile2;
2646    cb_args.handle = db2;
2647    cb_args.handle2 = db3;
2648
2649    // write docs & commit
2650    for (i=0;i<n;++i){
2651        sprintf(keybuf, "key%04d", i);
2652        sprintf(bodybuf, "body%04d", i);
2653        s = fdb_set_kv(db, keybuf, strlen(keybuf)+1,
2654                           bodybuf, strlen(bodybuf)+1);
2655        TEST_CHK(s == FDB_RESULT_SUCCESS);
2656    }
2657    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2658    TEST_CHK(s == FDB_RESULT_SUCCESS);
2659    cb_args.ndocs = n;
2660    cb_args.nupdates = 2;
2661    cb_args.nmoves = 0;
2662
2663    s = fdb_compact(dbfile, "./compact_test2");
2664    TEST_CHK(s == FDB_RESULT_SUCCESS);
2665
2666    // insert new docs after compaction
2667    for (i=n/2;i<n;++i){
2668        sprintf(keybuf, "new%04d", i);
2669        sprintf(bodybuf, "new_body%04d", i);
2670        s = fdb_set_kv(db2, keybuf, strlen(keybuf)+1,
2671                            bodybuf, strlen(bodybuf)+1);
2672        TEST_CHK(s == FDB_RESULT_SUCCESS);
2673    }
2674    // all interleaved docs should be retrieved
2675    // 1. inserted during the first phase of the compaction
2676    for (i=0;i<n;++i){
2677        sprintf(keybuf, "new%04d", i);
2678        sprintf(bodybuf, "new_body%04d", i);
2679        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2680                            &value, &valuelen);
2681        TEST_CHK(s == FDB_RESULT_SUCCESS);
2682        TEST_CMP(value, bodybuf, valuelen);
2683        fdb_free_block(value);
2684    }
2685    // 2. inserted during the second phase of the compaction
2686    for (i=0; i<2; ++i) {
2687        sprintf(keybuf, "xxx%d", i);
2688        sprintf(bodybuf, "xxxvalue%d", i);
2689        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2690                            &value, &valuelen);
2691        TEST_CHK(s == FDB_RESULT_SUCCESS);
2692        TEST_CMP(value, bodybuf, valuelen);
2693        fdb_free_block(value);
2694
2695        sprintf(keybuf, "zzz%d", i);
2696        sprintf(bodybuf, "zzzvalue%d", i);
2697        s = fdb_get_kv(db2, keybuf, strlen(keybuf)+1,
2698                            &value, &valuelen);
2699        TEST_CHK(s == FDB_RESULT_SUCCESS);
2700        TEST_CMP(value, bodybuf, valuelen);
2701        fdb_free_block(value);
2702    }
2703
2704    fdb_close(dbfile2);
2705    fdb_close(dbfile3);
2706
2707    s = fdb_get_kvs_info(db, &info);
2708    TEST_CHK(s == FDB_RESULT_SUCCESS);
2709    seqnum = info.last_seqnum;
2710
2711    cb_args.nmoves = 0;
2712    s = fdb_compact(dbfile, "./compact_test3");
2713    TEST_CHK(s == FDB_RESULT_SUCCESS);
2714
2715    s = fdb_get_kvs_info(db, &info);
2716    TEST_CHK(s == FDB_RESULT_SUCCESS);
2717    // No docs are inserted during the compaction.
2718    // Seqnum should be same.
2719    TEST_CHK(info.last_seqnum == seqnum);
2720
2721    // insert the last document
2722    sprintf(keybuf, "last");
2723    sprintf(bodybuf, "last_value");
2724    s = fdb_set_kv(db, keybuf, strlen(keybuf)+1, bodybuf, strlen(bodybuf)+1);
2725    TEST_CHK(s == FDB_RESULT_SUCCESS);
2726    s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2727    TEST_CHK(s == FDB_RESULT_SUCCESS);
2728
2729    fdb_close(dbfile);
2730    fdb_shutdown();
2731    memleak_end();
2732    TEST_RESULT("compaction with concurrent update test");
2733}
2734
2735static int compaction_del_cb(fdb_file_handle *fhandle,
2736                             fdb_compaction_status status,
2737                             const char *kv_name,
2738                             fdb_doc *doc, uint64_t old_offset,
2739                             uint64_t new_offset,
2740                             void *ctx)
2741{
2742    TEST_INIT();
2743    fdb_status s;
2744    fdb_file_handle *dbfile;
2745    fdb_kvs_handle *db = *(fdb_kvs_handle **)ctx;
2746    int i;
2747    int n = 10; // if changing this change caller function too
2748    char keybuf[256];
2749    (void) doc;
2750    (void) new_offset;
2751    (void) old_offset;
2752
2753    if (status == FDB_CS_BEGIN) {
2754        TEST_CHK(false);
2755    } else if (status == FDB_CS_END) {
2756        TEST_CHK(!kv_name);
2757        if (db) { // At end of first phase, mutate more docs...
2758            s = fdb_open(&dbfile, "./compact_test1", &fhandle->root->config);
2759            TEST_CHK(s == FDB_RESULT_SUCCESS);
2760            fdb_kvs_open_default(dbfile, &db, &db->kvs_config);
2761            for (i = 0; i < n; ++i){
2762                sprintf(keybuf, "key%d", i);
2763                s = fdb_del_kv(db, keybuf, strlen(keybuf));
2764                TEST_CHK(s == FDB_RESULT_SUCCESS);
2765            }
2766            // Now insert and delete a bunch of keys (all in WAL)
2767            for (i = 0; i < n; ++i){
2768                sprintf(keybuf, "KEY%d", i);
2769                s = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2770                TEST_CHK(s == FDB_RESULT_SUCCESS);
2771                s = fdb_del_kv(db, keybuf, strlen(keybuf));
2772                TEST_CHK(s == FDB_RESULT_SUCCESS);
2773            }
2774            sprintf(keybuf, "key%d", i);
2775            s = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
2776            TEST_CHK(s == FDB_RESULT_SUCCESS);
2777            s = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2778            TEST_CHK(s == FDB_RESULT_SUCCESS);
2779            fdb_close(dbfile);
2780            *(fdb_kvs_handle **)ctx = NULL; // Don't run in 2nd, 3rd phases
2781        }
2782    } else if (status == FDB_CS_FLUSH_WAL) {
2783        TEST_CHK(false);
2784    } else if (status == FDB_CS_MOVE_DOC) {
2785        TEST_CHK(false);
2786    } else { // FDB_CS_BATCH_MOVE
2787        TEST_CHK(false);
2788    }
2789    return 0;
2790}
2791
2792void compact_deleted_doc_test()
2793{
2794    TEST_INIT();
2795    memleak_start();
2796    int i, r;
2797    int n = 10; // if changing this change the callback too
2798    fdb_file_handle *dbfile;
2799    fdb_kvs_handle *db, *cb_db;
2800    fdb_doc *rdoc;
2801    fdb_status s;
2802    char keybuf[256];
2803    void *value;
2804    size_t valuelen;
2805
2806    // remove previous compact_test files
2807    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2808    (void)r;
2809
2810    fdb_config fconfig = fdb_get_default_config();
2811    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2812    fconfig.multi_kv_instances = false;
2813    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2814
2815    fconfig.compaction_cb = compaction_del_cb;
2816    fconfig.compaction_cb_ctx = &cb_db;
2817    fconfig.compaction_cb_mask = FDB_CS_END;
2818
2819    // open db
2820    fdb_open(&dbfile, "./compact_test1", &fconfig);
2821    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2822    cb_db = db;
2823
2824    for (i=0;i<n;++i){
2825        sprintf(keybuf, "key%d", i);
2826        s = fdb_set_kv(db, keybuf, strlen(keybuf),
2827                            (void*)"value", 5);
2828        TEST_CHK(s == FDB_RESULT_SUCCESS);
2829    }
2830    fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2831
2832    // At end of phase 1, all documents get deleted
2833    s = fdb_compact(dbfile, "compact_test2");
2834    TEST_CHK(s == FDB_RESULT_SUCCESS);
2835    TEST_CHK(wal_get_num_flushable(dbfile->root->file) == 0);
2836
2837    for (i=0;i<n;++i){
2838        sprintf(keybuf, "key%d", i);
2839        fdb_doc_create(&rdoc, keybuf, strlen(keybuf), NULL, 0, NULL, 0);
2840        s = fdb_get_metaonly(db, rdoc);
2841        TEST_CHK(s == FDB_RESULT_KEY_NOT_FOUND);
2842        fdb_doc_free(rdoc);
2843    }
2844
2845    // documents inserted in the middle of phase 1 should be present
2846    sprintf(keybuf, "key%d", i);
2847    s = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2848    TEST_CHK(s == FDB_RESULT_SUCCESS);
2849    fdb_free_block(value);
2850
2851    fdb_kvs_close(db);
2852    fdb_close(dbfile);
2853    fdb_shutdown();
2854    memleak_end();
2855    TEST_RESULT("compact deleted doc test");
2856}
2857
2858void compact_upto_twice_test()
2859{
2860    TEST_INIT();
2861    memleak_start();
2862    int i, r;
2863    fdb_file_handle *dbfile;
2864    fdb_kvs_handle *db;
2865    fdb_snapshot_info_t *markers;
2866    fdb_status status;
2867    uint64_t num_markers;
2868    char keybuf[256];
2869
2870    // remove previous compact_test files
2871    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2872    (void)r;
2873
2874    fdb_config fconfig = fdb_get_default_config();
2875    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2876    fconfig.wal_threshold = 1024;
2877    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2878    fconfig.block_reusing_threshold = 0;
2879
2880    // open db
2881    fdb_open(&dbfile, "./compact_test1", &fconfig);
2882    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2883
2884    for (i=0;i<10;++i){
2885        sprintf(keybuf, "key%d", i);
2886        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2887                            (void*)"value", 5);
2888        TEST_CHK(status == FDB_RESULT_SUCCESS);
2889        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2890    }
2891
2892    // compact upto twice with incrementing seqnums
2893    status = fdb_get_all_snap_markers(dbfile, &markers,
2894                                      &num_markers);
2895    TEST_CHK(status == FDB_RESULT_SUCCESS);
2896    status = fdb_compact_upto(dbfile, NULL, markers[num_markers-1].marker);
2897    TEST_CHK(status == FDB_RESULT_SUCCESS);
2898    status = fdb_free_snap_markers(markers, num_markers);
2899    TEST_CHK(status == FDB_RESULT_SUCCESS);
2900
2901    // compact upto
2902    status = fdb_get_all_snap_markers(dbfile, &markers,
2903                                      &num_markers);
2904    TEST_CHK(status == FDB_RESULT_SUCCESS);
2905    status = fdb_compact_upto(dbfile, NULL, markers[num_markers-2].marker);
2906    TEST_CHK(status == FDB_RESULT_SUCCESS);
2907    status = fdb_free_snap_markers(markers, num_markers);
2908    TEST_CHK(status == FDB_RESULT_SUCCESS);
2909
2910    fdb_close(dbfile);
2911    fdb_shutdown();
2912    memleak_end();
2913    TEST_RESULT("compact upto twice");
2914}
2915
2916void wal_delete_compact_upto_test()
2917{
2918    TEST_INIT();
2919    memleak_start();
2920    int i, r;
2921    fdb_file_handle *dbfile;
2922    fdb_kvs_handle *db;
2923    fdb_snapshot_info_t *markers;
2924    fdb_status status;
2925    uint64_t num_markers;
2926    fdb_kvs_info kvs_info;
2927    char keybuf[256];
2928    void *value;
2929    size_t valuelen;
2930
2931    // remove previous compact_test files
2932    r = system(SHELL_DEL " compact_test* > errorlog.txt");
2933    (void)r;
2934
2935    fdb_config fconfig = fdb_get_default_config();
2936    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
2937    fconfig.wal_threshold = 19;
2938    fconfig.flags = FDB_OPEN_FLAG_CREATE;
2939
2940    // open db
2941    fdb_open(&dbfile, "./compact_test5", &fconfig);
2942    fdb_kvs_open_default(dbfile, &db, &kvs_config);
2943
2944    // insert a few keys...
2945    for (i=0;i<10;++i){
2946        sprintf(keybuf, "key%d", i);
2947        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2948                            (void*)"value", 5);
2949        TEST_CHK(status == FDB_RESULT_SUCCESS);
2950    }
2951
2952    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
2953    TEST_CHK(status == FDB_RESULT_SUCCESS);
2954
2955    // insert keys such that last insert causes wal flush...
2956    for (; i<20;++i){
2957        sprintf(keybuf, "key%d", i);
2958        status = fdb_set_kv(db, keybuf, strlen(keybuf),
2959                            (void*)"value", 5);
2960        TEST_CHK(status == FDB_RESULT_SUCCESS);
2961    }
2962    // Now delete half of the newly inserted keys
2963    for (i = 15; i < 20; ++i){
2964        sprintf(keybuf, "key%d", i);
2965        status = fdb_del_kv(db, keybuf, strlen(keybuf));
2966        TEST_CHK(status == FDB_RESULT_SUCCESS);
2967    }
2968    // Now deleted items are in the unflushed wal..
2969    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2970    TEST_CHK(status == FDB_RESULT_SUCCESS);
2971
2972    // Create another header for compact_upto()
2973    sprintf(keybuf, "key%d", i);
2974    status = fdb_set_kv(db, keybuf, strlen(keybuf),
2975            (void*)"value", 5);
2976    TEST_CHK(status == FDB_RESULT_SUCCESS);
2977    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
2978    TEST_CHK(status == FDB_RESULT_SUCCESS);
2979
2980    // Get all snap markers..
2981    status = fdb_get_all_snap_markers(dbfile, &markers,
2982                                      &num_markers);
2983    TEST_CHK(status == FDB_RESULT_SUCCESS);
2984
2985    // compact upto the delete wal's marker...
2986    status = fdb_compact_upto(dbfile, NULL, markers[1].marker);
2987    TEST_CHK(status == FDB_RESULT_SUCCESS);
2988    status = fdb_free_snap_markers(markers, num_markers);
2989    TEST_CHK(status == FDB_RESULT_SUCCESS);
2990
2991    status = fdb_get_kvs_info(db, &kvs_info);
2992    TEST_CHK(status == FDB_RESULT_SUCCESS);
2993
2994    // Deleted items should not be found..
2995    for (i = 15; i < 20; ++i){
2996        sprintf(keybuf, "key%d", i);
2997        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value, &valuelen);
2998        TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
2999    }
3000
3001    fdb_kvs_close(db);
3002    fdb_close(dbfile);
3003    fdb_shutdown();
3004    memleak_end();
3005    TEST_RESULT("compact upto with wal deletes test");
3006}
3007
3008void compact_upto_post_snapshot_test()
3009{
3010    TEST_INIT();
3011    memleak_start();
3012    int i, r;
3013    fdb_file_handle *dbfile;
3014    fdb_kvs_handle *db, *snap_db;
3015    fdb_snapshot_info_t *markers;
3016    fdb_status status;
3017    fdb_iterator *iterator;
3018    fdb_doc *rdoc = NULL;
3019    fdb_kvs_info kvs_info;
3020    uint64_t num_markers;
3021    char keybuf[256];
3022
3023    // remove previous compact_test files
3024    r = system(SHELL_DEL " compact_test* > errorlog.txt");
3025    (void)r;
3026
3027    fdb_config fconfig = fdb_get_default_config();
3028    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3029    fconfig.wal_threshold = 1024;
3030    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3031    // prevent block reusing to keep snapshots
3032    fconfig.block_reusing_threshold = 0;
3033
3034    // open db
3035    fdb_open(&dbfile, "./compact_test1", &fconfig);
3036    fdb_kvs_open_default(dbfile, &db, &kvs_config);
3037
3038    for (i=0;i<10;++i){
3039        sprintf(keybuf, "key%d", i);
3040        status = fdb_set_kv(db, keybuf, strlen(keybuf),
3041                            (void*)"value", 5);
3042        TEST_CHK(status == FDB_RESULT_SUCCESS);
3043        fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3044    }
3045
3046    // check  db info
3047    status = fdb_get_kvs_info(db, &kvs_info);
3048    TEST_CHK(status == FDB_RESULT_SUCCESS);
3049    TEST_CHK(kvs_info.last_seqnum == 10);
3050
3051    // open snapshot at seqnum 5
3052    status = fdb_snapshot_open(db, &snap_db, 5);
3053    TEST_CHK(status == FDB_RESULT_SUCCESS);
3054
3055    // check snap info
3056    status = fdb_get_kvs_info(snap_db, &kvs_info);
3057    TEST_CHK(status == FDB_RESULT_SUCCESS);
3058    TEST_CHK(kvs_info.last_seqnum == 5);
3059
3060
3061    // compact upto
3062    status = fdb_get_all_snap_markers(dbfile, &markers,
3063                                      &num_markers);
3064    TEST_CHK(status == FDB_RESULT_SUCCESS);
3065    status = fdb_compact_upto(dbfile, NULL, markers[5].marker);
3066    TEST_CHK(status == FDB_RESULT_SUCCESS);
3067    status = fdb_free_snap_markers(markers, num_markers);
3068    TEST_CHK(status == FDB_RESULT_SUCCESS);
3069
3070    // check db and snap info
3071    status = fdb_get_kvs_info(db, &kvs_info);
3072    TEST_CHK(status == FDB_RESULT_SUCCESS);
3073    TEST_CHK(kvs_info.last_seqnum == 10);
3074    status = fdb_get_kvs_info(snap_db, &kvs_info);
3075    TEST_CHK(status == FDB_RESULT_SUCCESS);
3076    TEST_CHK(kvs_info.last_seqnum == 5);
3077
3078
3079    // iterate over snapshot
3080    status = fdb_iterator_init(snap_db,
3081                               &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
3082    TEST_CHK(status == FDB_RESULT_SUCCESS);
3083    i = 0;
3084    do {
3085        status = fdb_iterator_get(iterator, &rdoc);
3086        TEST_CHK(status == FDB_RESULT_SUCCESS);
3087        fdb_doc_free(rdoc);
3088        rdoc = NULL;
3089        i++;
3090
3091        // check db and snap info
3092        status = fdb_get_kvs_info(db, &kvs_info);
3093        TEST_CHK(status == FDB_RESULT_SUCCESS);
3094        TEST_CHK(kvs_info.last_seqnum == 10);
3095        status = fdb_get_kvs_info(snap_db, &kvs_info);
3096        TEST_CHK(status == FDB_RESULT_SUCCESS);
3097        TEST_CHK(kvs_info.last_seqnum == 5);
3098    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
3099
3100    TEST_CHK(i == 5);
3101
3102    fdb_iterator_close(iterator);
3103    fdb_kvs_close(snap_db);
3104    fdb_close(dbfile);
3105    fdb_shutdown();
3106    memleak_end();
3107    TEST_RESULT("compact upto post snapshot test");
3108}
3109
3110void compact_upto_overwrite_test(int opt)
3111{
3112    TEST_INIT();
3113
3114    int n = 10, value_len=32;
3115    int i, r, idx, c;
3116    char cmd[256];
3117    char key[256], *value;
3118    char keystr[] = "key%06d";
3119    char valuestr[] = "value%08d";
3120    char valuestr2[] = "updated_value%08d";
3121    fdb_file_handle *db_file;
3122    fdb_kvs_handle *db, *snap;
3123    fdb_config config;
3124    fdb_kvs_config kvs_config;
3125    fdb_status s;
3126    fdb_kvs_info kvs_info;
3127    fdb_iterator *fit;
3128    fdb_doc *doc;
3129    fdb_snapshot_info_t *markers;
3130    fdb_seqnum_t seqnum;
3131    fdb_commit_opt_t commit_opt;
3132    uint64_t n_markers;
3133
3134    sprintf(cmd, SHELL_DEL " compact_test* > errorlog.txt");
3135    r = system(cmd);
3136    (void)r;
3137
3138    memleak_start();
3139
3140    value = (char*)malloc(value_len);
3141
3142    config = fdb_get_default_config();
3143    config.durability_opt = FDB_DRB_ASYNC;
3144    config.seqtree_opt = FDB_SEQTREE_USE;
3145    config.wal_flush_before_commit = true;
3146    config.multi_kv_instances = true;
3147    config.buffercache_size = 0;
3148    config.block_reusing_threshold = 0;
3149
3150    commit_opt = (opt)?FDB_COMMIT_NORMAL:FDB_COMMIT_MANUAL_WAL_FLUSH;
3151
3152    kvs_config = fdb_get_default_kvs_config();
3153
3154    s = fdb_open(&db_file, "./compact_test", &config);
3155    TEST_CHK(s == FDB_RESULT_SUCCESS);
3156    s = fdb_kvs_open(db_file, &db, "db", &kvs_config);
3157    TEST_CHK(s == FDB_RESULT_SUCCESS);
3158
3159    // === write ===
3160    for (i=0;i<n;++i){
3161        idx = i % (n/2);
3162        sprintf(key, keystr, idx);
3163        memset(value, 'x', value_len);
3164        memcpy(value + value_len - 6, "<end>", 6);
3165        if (i < (n/2)) {
3166            sprintf(value, valuestr, idx);
3167        } else {
3168            sprintf(value, valuestr2, idx);
3169        }
3170        s = fdb_set_kv(db, key, strlen(key)+1, value, value_len);
3171        TEST_CHK(s == FDB_RESULT_SUCCESS);
3172
3173        if (opt == 2) {
3174            // HB+trie, WAL, HB+trie, WAL...
3175            commit_opt = (i%2)?FDB_COMMIT_NORMAL:FDB_COMMIT_MANUAL_WAL_FLUSH;
3176        } else if (opt == 3) {
3177            // WAL, HB+trie, WAL, HB+trie...
3178            commit_opt = (i%2)?FDB_COMMIT_MANUAL_WAL_FLUSH:FDB_COMMIT_NORMAL;
3179        }
3180
3181        s = fdb_commit(db_file, commit_opt);
3182        TEST_CHK(s == FDB_RESULT_SUCCESS);
3183        s = fdb_get_kvs_info(db, &kvs_info);
3184        TEST_CHK(s == FDB_RESULT_SUCCESS);
3185    }
3186
3187    s = fdb_get_all_snap_markers(db_file, &markers, &n_markers);
3188    TEST_CHK(s == FDB_RESULT_SUCCESS);
3189
3190    int upto = n_markers/2;
3191    s = fdb_compact_upto(db_file, "./compact_test2", markers[upto].marker);
3192    TEST_CHK(s == FDB_RESULT_SUCCESS);
3193
3194    // iterating using snapshots with various seqnums
3195    for (i=n_markers-1; i>=0; --i) {
3196        seqnum = markers[i].kvs_markers->seqnum;
3197
3198        s = fdb_snapshot_open(db, &snap, seqnum);
3199        if (i<=upto) {
3200            TEST_CHK(s == FDB_RESULT_SUCCESS);
3201        } else {
3202            // seqnum < (n/2) must fail
3203            TEST_CHK(s != FDB_RESULT_SUCCESS);
3204            continue;
3205        }
3206
3207        s = fdb_iterator_init(snap, &fit, NULL, 0, NULL, 0, 0);
3208        TEST_CHK(s == FDB_RESULT_SUCCESS);
3209        c = 0;
3210        do {
3211            doc = NULL;
3212            s = fdb_iterator_get(fit, &doc);
3213            if (s != FDB_RESULT_SUCCESS) break;
3214            idx = c;
3215            sprintf(key, keystr, idx);
3216            memset(value, 'x', value_len);
3217            memcpy(value + value_len - 6, "<end>", 6);
3218            if ((fdb_seqnum_t)c >= seqnum-(n/2)) {
3219                sprintf(value, valuestr, idx);
3220            } else {
3221                sprintf(value, valuestr2, idx);
3222            }
3223            TEST_CMP(doc->key, key, doc->keylen);
3224            TEST_CMP(doc->body, value, doc->bodylen);
3225            c++;
3226            fdb_doc_free(doc);
3227        } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
3228        s = fdb_iterator_close(fit);
3229        TEST_CHK(s == FDB_RESULT_SUCCESS);
3230        TEST_CHK(c == (n/2));
3231
3232        s = fdb_kvs_close(snap);
3233        TEST_CHK(s == FDB_RESULT_SUCCESS);
3234    }
3235
3236    // iterating using the original handle
3237    s = fdb_iterator_init(db, &fit, NULL, 0, NULL, 0, 0);
3238    TEST_CHK(s == FDB_RESULT_SUCCESS);
3239    c = 0;
3240    do {
3241        doc = NULL;
3242        s = fdb_iterator_get(fit, &doc);
3243        if (s != FDB_RESULT_SUCCESS) break;
3244        idx = c;
3245        sprintf(key, keystr, idx);
3246        memset(value, 'x', value_len);
3247        memcpy(value + value_len - 6, "<end>", 6);
3248        sprintf(value, valuestr2, idx);
3249        TEST_CMP(doc->key, key, doc->keylen);
3250        TEST_CMP(doc->body, value, doc->bodylen);
3251        c++;
3252        fdb_doc_free(doc);
3253    } while(fdb_iterator_next(fit) == FDB_RESULT_SUCCESS);
3254    s = fdb_iterator_close(fit);
3255    TEST_CHK(s == FDB_RESULT_SUCCESS);
3256    TEST_CHK(c == (n/2));
3257
3258    s = fdb_free_snap_markers(markers, n_markers);
3259    TEST_CHK(s == FDB_RESULT_SUCCESS);
3260    free(value);
3261    s = fdb_close(db_file);
3262    TEST_CHK(s == FDB_RESULT_SUCCESS);
3263    s = fdb_shutdown();
3264    TEST_CHK(s == FDB_RESULT_SUCCESS);
3265
3266    memleak_end();
3267
3268    sprintf(cmd, "compact upto overwrite test");
3269    if (opt == 0) {
3270        strcat(cmd, " (HB+trie)");
3271    } else if (opt == 1) {
3272        strcat(cmd, " (WAL)");
3273    } else if (opt == 2) {
3274        strcat(cmd, " (mixed, HB+trie/WAL)");
3275    } else if (opt == 3) {
3276        strcat(cmd, " (mixed, WAL/HB+trie)");
3277    }
3278    TEST_RESULT(cmd);
3279}
3280static int compaction_cb_get(fdb_file_handle *fhandle,
3281                         fdb_compaction_status status, const char *kv_name,
3282                         fdb_doc *doc, uint64_t old_offset,
3283                         uint64_t new_offset, void *ctx)
3284{
3285    TEST_INIT();
3286    fdb_status s;
3287    struct cb_args *args = (struct cb_args *)ctx;
3288    fdb_kvs_handle *snap_db;
3289    fdb_snapshot_info_t *markers;
3290    uint64_t num_markers;
3291    fdb_seqnum_t seqno;
3292
3293    (void) doc;
3294    (void) old_offset;
3295    (void) new_offset;
3296
3297    if (status == FDB_CS_MOVE_DOC) {
3298        TEST_CHK(kv_name);
3299    } else {
3300        TEST_CHK(!kv_name);
3301    }
3302
3303    // use snap markers
3304    s = fdb_get_all_snap_markers(fhandle, &markers, &num_markers);
3305    TEST_CHK(s == FDB_RESULT_SUCCESS);
3306    seqno = markers[0].kvs_markers[0].seqnum;
3307
3308    // snapshot open
3309    s = fdb_snapshot_open(args->handle, &snap_db, seqno);
3310    TEST_CHK(s==FDB_RESULT_SUCCESS);
3311    s = fdb_kvs_close(snap_db);
3312    TEST_CHK(s==FDB_RESULT_SUCCESS);
3313    return 0;
3314}
3315
3316void compact_with_snapshot_open_test()
3317{
3318  TEST_INIT();
3319  memleak_start();
3320
3321  int i, r;
3322  int n = 100000;
3323  char keybuf[256], bodybuf[256];
3324  fdb_file_handle *dbfile;
3325  fdb_kvs_handle *db, *db2, *snap_db;
3326  fdb_status s;
3327  fdb_config fconfig = fdb_get_default_config();
3328  fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3329  struct cb_args cb_args;
3330
3331  memset(&cb_args, 0x0, sizeof(struct cb_args));
3332  fconfig.wal_threshold = 1024;
3333  fconfig.flags = FDB_OPEN_FLAG_CREATE;
3334  fconfig.compaction_cb = compaction_cb_get;
3335  fconfig.compaction_cb_ctx = &cb_args;
3336  fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3337                               FDB_CS_MOVE_DOC |
3338                               FDB_CS_FLUSH_WAL |
3339                               FDB_CS_END;
3340  // remove previous compact_test files
3341  r = system(SHELL_DEL" compact_test* > errorlog.txt");
3342  (void)r;
3343
3344  // open two handles for kvs
3345  fdb_open(&dbfile, "./compact_test1", &fconfig);
3346  fdb_kvs_open(dbfile, &db, "db", &kvs_config);
3347  fdb_kvs_open(dbfile, &db2, "db", &kvs_config);
3348  for (i=0;i<n;++i){
3349      sprintf(keybuf, "key%04d", i);
3350      sprintf(bodybuf, "body%04d", i);
3351      s = fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3352      TEST_CHK(s == FDB_RESULT_SUCCESS);
3353  }
3354
3355
3356  // commit
3357  s = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
3358  TEST_CHK(s == FDB_RESULT_SUCCESS);
3359
3360  // point compact callback handle to db2
3361  cb_args.handle = db2;
3362
3363  // compact
3364  s = fdb_compact(dbfile, NULL);
3365  TEST_CHK(s == FDB_RESULT_SUCCESS);
3366
3367  // open compaction end
3368  s = fdb_snapshot_open(db2, &snap_db, n);
3369  TEST_CHK(s == FDB_RESULT_SUCCESS);
3370
3371  fdb_kvs_close(snap_db);
3372  s = fdb_close(dbfile);
3373  TEST_CHK(s == FDB_RESULT_SUCCESS);
3374  s = fdb_shutdown();
3375  TEST_CHK(s == FDB_RESULT_SUCCESS);
3376  TEST_RESULT("compact with snapshot_open test");
3377}
3378
3379static int compaction_cb_markers(fdb_file_handle *fhandle,
3380                         fdb_compaction_status status, const char *kv_name,
3381                         fdb_doc *doc, uint64_t old_offset,
3382                         uint64_t new_offset, void *ctx)
3383{
3384    TEST_INIT();
3385    uint64_t i, j;
3386    fdb_status s;
3387    fdb_kvs_handle *db, *snap_db;
3388    fdb_snapshot_info_t *markers;
3389    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3390    fdb_seqnum_t seqno;
3391    uint64_t num_markers;
3392    struct cb_args *args = (struct cb_args *)ctx;
3393    uint64_t n = (uint64_t) args->n_moved_docs;
3394
3395    (void) status;
3396    (void) doc;
3397    (void) old_offset;
3398    (void) new_offset;
3399
3400    // get snap markers
3401    s = fdb_get_all_snap_markers(fhandle, &markers, &num_markers);
3402    TEST_CHK(s == FDB_RESULT_SUCCESS);
3403    TEST_CHK(markers[0].num_kvs_markers == 5);
3404    seqno = markers[0].kvs_markers[0].seqnum;
3405    TEST_CHK(seqno == n);
3406
3407    // snapshot open for each kvs for each marker
3408    for(j=0;j<num_markers;++j){
3409        for (i=0;i<(uint64_t)markers[j].num_kvs_markers;++i){
3410            // open kv for this marker
3411            fdb_kvs_open(fhandle, &db,
3412                         markers[j].kvs_markers[i].kv_store_name, &kvs_config);
3413            // snapshot the kv
3414            s = fdb_snapshot_open(db, &snap_db, seqno);
3415            TEST_CHK(s==FDB_RESULT_SUCCESS);
3416
3417            s = fdb_kvs_close(snap_db);
3418            TEST_CHK(s==FDB_RESULT_SUCCESS);
3419            s = fdb_kvs_close(db);
3420            TEST_CHK(s==FDB_RESULT_SUCCESS);
3421        }
3422    }
3423    return 0;
3424}
3425
3426void compact_with_snapshot_open_multi_kvs_test()
3427{
3428    TEST_INIT();
3429    memleak_start();
3430
3431    int i, j, r;
3432    int n = 1000;
3433    char keybuf[256], bodybuf[256];
3434    fdb_file_handle *dbfile;
3435    fdb_kvs_handle *db1, *db2, *db3, *db4, *db5;
3436    fdb_status s;
3437    fdb_config fconfig = fdb_get_default_config();
3438    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
3439    struct cb_args cb_args;
3440
3441    memset(&cb_args, 0x0, sizeof(struct cb_args));
3442
3443    fconfig.wal_threshold = 1024;
3444    fconfig.flags = FDB_OPEN_FLAG_CREATE;
3445    fconfig.compaction_cb = compaction_cb_markers;
3446    fconfig.compaction_cb_ctx = &cb_args;
3447    fconfig.compaction_cb_mask = FDB_CS_BEGIN |
3448                                 FDB_CS_MOVE_DOC |
3449                                 FDB_CS_FLUSH_WAL |
3450                                 FDB_CS_END;
3451
3452    // remove previous compact_test files
3453    r = system(SHELL_DEL" compact_test* > errorlog.txt");
3454    (void)r;
3455
3456    // open two handles for kvs
3457    fdb_open(&dbfile, "./compact_test1", &fconfig);
3458    fdb_kvs_open(dbfile, &db1, "db1", &kvs_config);
3459    fdb_kvs_open(dbfile, &db2, "db2", &kvs_config);
3460    fdb_kvs_open(dbfile, &db3, "db3", &kvs_config);
3461    fdb_kvs_open(dbfile, &db4, "db4", &kvs_config);
3462    fdb_kvs_open(dbfile, &db5, "db5", &kvs_config);
3463    for (j=0;j<5;++j){
3464        for (i=0;i<n;++i){
3465            sprintf(keybuf, "key%04d", i);
3466            sprintf(bodybuf, "body%04d", i);
3467            fdb_set_kv(db1, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3468            fdb_set_kv(db2, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3469            fdb_set_kv(db3, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3470            fdb_set_kv(db4, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
3471            fdb_set_kv(db5, keybuf,