1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <signal.h>
25#include <unistd.h>
26#endif
27
28#include "test.h"
29#include "filemgr.h"
30#include "staleblock.h"
31#include "internal_types.h"
32#include "libforestdb/forestdb.h"
33#include "functional_util.h"
34
35/** Basic verification that stale blocks are being reused and file size
36 * does not explode with heavy updates
37 */
38void verify_stale_block_reuse_test() {
39    TEST_INIT();
40
41    int i, r;
42    fdb_file_handle* dbfile;
43    fdb_kvs_handle* db, *lazy;
44    fdb_status status;
45    fdb_config fconfig = fdb_get_default_config();
46    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
47    size_t fileSize1, fileSize2;
48
49    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
50    (void)r;
51
52    // init
53    fconfig.compaction_threshold = 0;
54
55    fconfig.block_reusing_threshold = 20;
56    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
57    TEST_STATUS(status);
58    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
59    TEST_STATUS(status);
60
61    size_t valuesize = 10240; // 10K buffer
62    char *val = new char[valuesize]();
63    const char *key = "key";
64    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
65    for (i = 0; i < 3000; ++i) { // 3000 * 10K
66        status = fdb_set_kv(db, key, strlen(key) + 1, val, valuesize);
67        TEST_STATUS(status);
68        i++;
69        if (!(i % 100)) { // commit 30 times
70            status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
71            TEST_STATUS(status);
72        }
73    }
74
75    fdb_file_info file_info;
76    status = fdb_get_file_info(dbfile, &file_info);
77    TEST_STATUS(status);
78    fileSize1 = file_info.file_size; // record file size first
79
80    // open a 'lazy' handle that does not issue any forestdb calls..
81    status = fdb_kvs_open(dbfile, &lazy, "lazy", &kvs_config);
82    TEST_STATUS(status);
83
84    // 100 commits which should all reuse stale blocks
85    for (i = 0; i < 1000; ++i) {
86        status = fdb_set_kv(db, key, strlen(key) + 1,
87                            val, 10);
88        TEST_STATUS(status);
89        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
90        TEST_STATUS(status);
91    }
92
93    status = fdb_get_file_info(dbfile, &file_info);
94    TEST_STATUS(status);
95    fileSize2 = file_info.file_size; // record file size first
96    // ensure file size has not exploded and is within 30% of original filesize
97    TEST_CHK(double(fileSize2) < double(fileSize1) * 1.3);
98
99    // cleanup
100    delete [] val;
101    status = fdb_close(dbfile);
102    TEST_STATUS(status);
103    status = fdb_shutdown();
104    TEST_STATUS(status);
105
106    TEST_RESULT("basic stale block reuse test");
107}
108
109/*
110 * Verify that blocks can be reclaimed when
111 * default block reuse threshold is used.
112 *
113 * When threshold is 0 or 100 blocks should
114 * not be reusable
115 */
116void verify_staleblock_reuse_param_test() {
117    TEST_INIT();
118    memleak_start();
119
120    uint64_t i;
121    int r;
122    const int kv = 512;
123    char keybuf[kv];
124    char bodybuf[kv];
125    fdb_status status;
126    fdb_file_handle *dbfile;
127    fdb_kvs_handle *db;
128    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
129    sb_decision_t sb_decision;
130    fdb_file_info file_info;
131
132    // set block reuse threshold = 65
133    fdb_config fconfig = fdb_get_default_config();
134    fconfig.compaction_threshold = 0;
135    fconfig.block_reusing_threshold = 65;
136
137start_data_loading:
138    // remove previous staleblktest files
139    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
140    (void)r;
141
142    fdb_open(&dbfile, "./staleblktest1", &fconfig);
143    fdb_kvs_open_default(dbfile, &db, &kvs_config);
144
145    // create num_keeping_headers+1
146    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
147        sprintf(keybuf, "key");
148        status = fdb_set_kv(db, keybuf, kv, NULL, 0);
149        TEST_STATUS(status);
150        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
151        TEST_STATUS(status);
152    }
153
154    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
155    i = 0;
156    do {
157        sprintf(keybuf, "key");
158        sprintf(bodybuf, "body%d", static_cast<int>(i));
159        status = fdb_set_kv(db, keybuf, kv, bodybuf, kv);
160        TEST_STATUS(status);
161        i++;
162        status = fdb_get_file_info(dbfile, &file_info);
163        TEST_STATUS(status);
164    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
165
166    // expect block reclaim only for valid threshold value
167    sb_decision = sb_check_block_reusing(db);
168    if (fconfig.block_reusing_threshold == 65) {
169        TEST_CHK(sb_decision == SBD_RECLAIM);
170    } else {
171        TEST_CHK(sb_decision == SBD_NONE);
172    }
173
174    // intermediate cleanup
175    status = fdb_close(dbfile);
176    TEST_STATUS(status);
177    fdb_shutdown();
178
179    // run again with different config
180    if (fconfig.block_reusing_threshold == 65) {
181        // disable with 0
182        fconfig.block_reusing_threshold = 0;
183        goto start_data_loading;
184    } else if (fconfig.block_reusing_threshold == 0) {
185        // disable with 100
186        fconfig.block_reusing_threshold = 100;
187        goto start_data_loading;
188    }
189
190    memleak_end();
191    TEST_RESULT("verify staleblock reuse param test");
192}
193
/**
 * Fill a buffer with (n - 1) copies of a character and NUL-terminate it.
 *
 * @param str Destination buffer; must have room for at least n bytes.
 * @param c   Character to fill with.
 * @param n   Total number of bytes to write, including the terminating NUL.
 *            Nothing is written when n <= 0 or when str is NULL.
 */
void fillstr(char *str, char c, int n) {
    // An empty (or negative-sized) buffer has no room for the terminator.
    if (str == NULL || n <= 0) {
        return;
    }
    memset(str, c, (size_t)(n - 1));
    str[n - 1] = '\0';
}
201
202/*
203 * verify_staleblock_reuse_param_test:
204 *     create 2*num_keeping_headers and open snapshot
205 *     at halfway point. enter block reuse state
206 *     verify snapshot data remains unaffected
207 */
208void reuse_with_snapshot_test() {
209    TEST_INIT();
210    memleak_start();
211
212    int i, n, r;
213    const int kv = 512;
214    char keybuf[kv];
215    char bodybuf[kv];
216    void *rvalue;
217    size_t rvalue_len;
218    fdb_status status;
219    fdb_file_handle *dbfile;
220    fdb_kvs_handle *db, *snap_db;
221    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
222    sb_decision_t sb_decision;
223    fdb_file_info file_info;
224
225    fdb_config fconfig = fdb_get_default_config();
226    fconfig.compaction_threshold = 0;
227    fconfig.block_reusing_threshold = 65;
228    fconfig.num_keeping_headers = 10;
229
230    // remove previous staleblktest files
231    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
232    (void)r;
233
234    fdb_open(&dbfile, "./staleblktest1", &fconfig);
235    fdb_kvs_open_default(dbfile, &db, &kvs_config);
236
237    // create 2*num_keeping_headers
238    n = 2 * fconfig.num_keeping_headers;
239    for (i = 0; i < n; i++) {
240        fillstr(keybuf, 'k', kv);
241        sprintf(bodybuf, "orig_body%d", i+1);
242        status = fdb_set_kv(db, keybuf, strlen(keybuf),
243                            bodybuf, strlen(bodybuf) + 1);
244        TEST_STATUS(status);
245        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
246        TEST_STATUS(status);
247    }
248
249    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
250    i = 0;
251    do {
252        fillstr(keybuf, 'r', kv);
253        fillstr(bodybuf, 'y', kv);
254        status = fdb_set_kv(db, keybuf, strlen(keybuf),
255                            bodybuf, strlen(bodybuf) + 1);
256        TEST_STATUS(status);
257        i++;
258        status = fdb_get_file_info(dbfile, &file_info);
259        TEST_STATUS(status);
260    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
261
262    sb_decision = sb_check_block_reusing(db);
263    TEST_CHK(sb_decision == SBD_RECLAIM);
264
265    // open snapshot
266    status = fdb_snapshot_open(db, &snap_db, 8);
267    TEST_STATUS(status);
268
269    // commit
270    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
271    TEST_STATUS(status);
272
273    // write until blocks are no longer being reused
274    i = 0;
275    do {
276        sprintf(keybuf, "key%d", i);
277        fillstr(bodybuf, 'z', kv);
278        status = fdb_set_kv(db, keybuf, strlen(keybuf),
279                            bodybuf, strlen(bodybuf));
280        TEST_STATUS(status);
281        sb_decision = sb_check_block_reusing(db);
282        i++;
283    } while (sb_decision != SBD_NONE);
284
285    // delete original keys
286    n = 2 * fconfig.num_keeping_headers;
287    for (i = 0; i < n; i++) {
288        fillstr(keybuf, 'k', kv);
289        status = fdb_del_kv(db, keybuf, strlen(keybuf));
290        TEST_STATUS(status);
291    }
292
293    // commit
294    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
295    TEST_STATUS(status);
296
297    fillstr(keybuf, 'k', kv);
298    sprintf(bodybuf, "orig_body%d", 8);
299
300    // check key does not exist in main kv
301    status = fdb_get_kv(db, keybuf, strlen(keybuf),
302                        &rvalue, &rvalue_len);
303    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
304
305    // check still exists in snapshot data
306    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf),
307                        &rvalue, &rvalue_len);
308    TEST_STATUS(status);
309    TEST_CMP(rvalue, bodybuf, rvalue_len);
310
311    status = fdb_free_block(rvalue);
312    TEST_STATUS(status);
313    status = fdb_kvs_close(snap_db);
314    TEST_STATUS(status);
315    status = fdb_kvs_close(db);
316    TEST_STATUS(status);
317    status = fdb_close(dbfile);
318    TEST_STATUS(status);
319    fdb_shutdown();
320
321    memleak_end();
322    TEST_RESULT("reuse with snapshot test");
323}
324
325void verify_minimum_num_keeping_headers_param_test() {
326    memleak_start();
327    TEST_INIT();
328
329    int i, r;
330    fdb_file_handle* dbfile;
331    fdb_kvs_handle* db;
332    fdb_kvs_handle* snap_db;
333    fdb_status status;
334    fdb_config fconfig = fdb_get_default_config();
335    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
336    fdb_file_info file_info;
337
338    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
339    (void)r;
340
341    // init
342    fconfig.compaction_threshold = 0;
343
344    // open with zero num_keeping_headers parameter
345    fconfig.num_keeping_headers = 0;
346    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
347    // should fail
348    TEST_CHK(status != FDB_RESULT_SUCCESS);
349
350    // open with single keeping header
351    fconfig.num_keeping_headers = 1;
352    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
353    TEST_STATUS(status);
354    status = fdb_kvs_open(dbfile, &db, (char *)"num_keep", &kvs_config);
355    TEST_STATUS(status);
356
357    const char *key = "key";
358    const char *val = "val";
359    // 10 commits
360    for (i = 0; i < 10; ++i) {
361        status = fdb_set_kv(db, key, strlen(key) + 1,
362                            val, strlen(val) + 1);
363        TEST_STATUS(status);
364        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
365        TEST_STATUS(status);
366    }
367
368    // open snapshot on 9th commit
369    status = fdb_snapshot_open(db, &snap_db, 9);
370    TEST_STATUS(status);
371
372    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
373    i = 0;
374    do {
375        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
376        TEST_STATUS(status);
377        i++;
378        status = fdb_get_file_info(dbfile, &file_info);
379        TEST_STATUS(status);
380    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
381
382    // close snapshot (reader)
383    status = fdb_kvs_close(snap_db);
384    TEST_STATUS(status);
385
386    // 11th commit causes reclaim and loss of 9th header
387    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
388    TEST_STATUS(status);
389
390    // attempt to open new snapshot on 9th commit
391    status = fdb_snapshot_open(db, &snap_db, 9);
392    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
393
394    // open new snapshot on 10th commit
395    status = fdb_snapshot_open(db, &snap_db, 10);
396    TEST_STATUS(status);
397
398    // cleanup
399    status = fdb_close(dbfile);
400    TEST_STATUS(status);
401    status = fdb_shutdown();
402    TEST_STATUS(status);
403
404    memleak_end();
405    TEST_RESULT("verify minimum num keeping headers param test");
406}
407
408void verify_high_num_keeping_headers_param_test() {
409    memleak_start();
410    TEST_INIT();
411
412    int i, r;
413    int low_seq = 0;
414    int nheaders=100;
415    char keybuf[16];
416    void *rvalue;
417    size_t rvalue_len;
418
419    fdb_file_handle* dbfile;
420    fdb_kvs_handle* db;
421    fdb_kvs_handle* snap_db;
422    fdb_status status;
423    fdb_config fconfig = fdb_get_default_config();
424    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
425    fdb_file_info file_info;
426
427    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
428    (void)r;
429
430    // init
431    fconfig.compaction_threshold = 0;
432    fconfig.num_keeping_headers = nheaders;
433    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
434    TEST_STATUS(status);
435    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
436    TEST_STATUS(status);
437
438    const char *key = "key";
439    const char *val = "val";
440
441    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
442    i = 0;
443    do {
444        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
445        TEST_STATUS(status);
446        i++;
447        status = fdb_get_file_info(dbfile, &file_info);
448        TEST_STATUS(status);
449    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
450
451    // create lowest commit
452    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
453    TEST_STATUS(status);
454    low_seq = i;
455
456    // create 100 headers
457    for (i = 0; i < nheaders; ++i) {
458        sprintf(keybuf, "key%d", i);
459        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
460        TEST_STATUS(status);
461        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
462        TEST_STATUS(status);
463    }
464
465    // make sure all blocks up to kept headers are reused
466    i = low_seq;
467    while (--i) {
468        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
469        TEST_STATUS(status);
470    }
471
472    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
473    TEST_STATUS(status);
474
475    // -101 commit fail
476    status = fdb_snapshot_open(db, &snap_db, low_seq);
477    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
478
479    // -100 commit pass
480    low_seq++;
481    status = fdb_snapshot_open(db, &snap_db, low_seq);
482    TEST_STATUS(status);
483
484    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
485    TEST_STATUS(status);
486
487    // cleanup
488    status = fdb_free_block(rvalue);
489    TEST_STATUS(status);
490    status = fdb_kvs_close(snap_db);
491    TEST_STATUS(status);
492    status = fdb_close(dbfile);
493    TEST_STATUS(status);
494    status = fdb_shutdown();
495    TEST_STATUS(status);
496
497    memleak_end();
498    TEST_RESULT("verify high num keeping headers param test");
499}
500
501void snapshot_before_block_reuse_test(bool inmem) {
502    memleak_start();
503    TEST_INIT();
504
505    int i, j, n, r;
506    void *rvalue;
507    size_t rvalue_len;
508    char bodybuf[256];
509    fdb_file_handle* dbfile;
510    fdb_kvs_handle *db, *snap_db;
511    fdb_status status;
512    fdb_config fconfig = fdb_get_default_config();
513    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
514    sb_decision_t sb_decision;
515    fdb_file_info file_info;
516
517    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
518    (void)r;
519
520    fconfig.num_keeping_headers = 1;
521    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
522    TEST_STATUS(status);
523    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
524    TEST_STATUS(status);
525
526    const char *key = "key";
527    const char *val = "snp";
528
529    // set key
530    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
531    TEST_STATUS(status);
532
533    // commit
534    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
535    TEST_STATUS(status);
536
537    // open snapshot
538    if (inmem) {
539        status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
540        TEST_STATUS(status);
541    } else {
542        status = fdb_snapshot_open(db, &snap_db, 1);
543        TEST_STATUS(status);
544    }
545
546    val = "val";
547
548    // initial load to reuse
549    i = 0;
550    do {
551        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
552        TEST_STATUS(status);
553        i++;
554        status = fdb_get_file_info(dbfile, &file_info);
555        TEST_STATUS(status);
556    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
557
558    // 5 update cycles of created items
559    for (n = 0; n < 5; n++) {
560        for(j = 0; j < i; j++) {
561            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
562            TEST_STATUS(status);
563        }
564        // commit
565        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
566        TEST_STATUS(status);
567    }
568
569    // verify in reclaim mode
570    sb_decision = sb_check_block_reusing(db);
571    TEST_CHK(sb_decision == SBD_RECLAIM);
572
573    // expect pre-reuse data retained
574    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
575    TEST_STATUS(status);
576    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
577
578    // close snapshot
579    status = fdb_kvs_close(snap_db);
580    TEST_STATUS(status);
581
582    // do another update cycle
583    for (j = 0; j < i; j++) {
584        // commit
585        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
586        TEST_STATUS(status);
587    }
588    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
589    TEST_STATUS(status);
590
591    // attempt to open again will fail
592    status = fdb_snapshot_open(db, &snap_db, 1);
593    if (status == FDB_RESULT_SUCCESS) {
594        // close incase kv was unexpectedly opened
595        status = fdb_kvs_close(snap_db);
596        TEST_STATUS(status);
597    }
598    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
599
600    status = fdb_free_block(rvalue);
601    TEST_STATUS(status);
602    status = fdb_kvs_close(db);
603    TEST_STATUS(status);
604    status = fdb_close(dbfile);
605    TEST_STATUS(status);
606    fdb_shutdown();
607
608    memleak_end();
609
610    sprintf(bodybuf,"snapshot before block reuse test %s", inmem ?
611            "in-mem snapshot" : "disk snapshot");
612    TEST_RESULT(bodybuf);
613}
614
615void snapshot_after_block_reuse_test() {
616    memleak_start();
617    TEST_INIT();
618
619    int i, r;
620    int low_seq = 0;
621    int nheaders=5;
622    char keybuf[16];
623
624    fdb_file_handle* dbfile;
625    fdb_kvs_handle* db;
626    fdb_kvs_handle* snap_db;
627    fdb_status status;
628    fdb_config fconfig = fdb_get_default_config();
629    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
630    fdb_file_info file_info;
631
632    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
633    (void)r;
634
635    // init
636    fconfig.compaction_threshold = 0;
637    fconfig.num_keeping_headers = nheaders;
638    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
639    TEST_STATUS(status);
640    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
641    TEST_STATUS(status);
642
643    const char *key = "key";
644    const char *val = "val";
645
646    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
647    i = 0;
648    do {
649        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
650        TEST_STATUS(status);
651        i++;
652        status = fdb_get_file_info(dbfile, &file_info);
653        TEST_STATUS(status);
654    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
655
656    // create lowest commit
657    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
658    TEST_STATUS(status);
659    low_seq = i;
660
661    // create nheaders
662    for (i = 0; i < nheaders + 5; ++i) {
663        sprintf(keybuf, "key%d", i);
664        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
665        TEST_STATUS(status);
666        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
667        TEST_STATUS(status);
668    }
669
670    // make sure all blocks up to kept headers are reused
671    i = low_seq;
672    while (--i) {
673        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
674        TEST_STATUS(status);
675    }
676    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
677    TEST_STATUS(status);
678
679    // open snapshot on lowest seqno available
680    low_seq += 6;
681    status = fdb_snapshot_open(db, &snap_db, low_seq);
682    TEST_STATUS(status);
683    status = fdb_kvs_close(snap_db);
684    TEST_STATUS(status);
685
686    // open snapshot using seqno already reclaimed
687    status = fdb_snapshot_open(db, &snap_db, low_seq-1);
688    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
689
690    status = fdb_kvs_close(db);
691    TEST_STATUS(status);
692    status = fdb_close(dbfile);
693    TEST_STATUS(status);
694    fdb_shutdown();
695
696    memleak_end();
697    TEST_RESULT("snapshot after block reuse test");
698}
699
700void snapshot_inmem_before_block_reuse_test() {
701    memleak_start();
702    TEST_INIT();
703
704    int i, j, n, r;
705    void *rvalue;
706    size_t rvalue_len;
707    fdb_file_handle* dbfile;
708    fdb_kvs_handle *db, *snap_db;
709    fdb_status status;
710    fdb_config fconfig = fdb_get_default_config();
711    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
712    sb_decision_t sb_decision;
713    fdb_file_info file_info;
714
715    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
716    (void)r;
717
718    fconfig.num_keeping_headers = 1;
719    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
720    TEST_STATUS(status);
721    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
722    TEST_STATUS(status);
723
724    const char* key = "key";
725    const char* val = "snp";
726
727    // set key
728    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
729    TEST_STATUS(status);
730
731    // commit
732    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
733    TEST_STATUS(status);
734
735    // open snapshot
736    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
737    TEST_STATUS(status);
738
739    val = "val";
740
741    // initial load to reuse
742    i = 0;
743    do {
744        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
745        TEST_STATUS(status);
746        i++;
747        status = fdb_get_file_info(dbfile, &file_info);
748        TEST_STATUS(status);
749    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
750
751    // 5 update cycles of created items
752    for (n = 0; n < 5; n++) {
753        for (j = 0; j < i; j++) {
754            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
755            TEST_STATUS(status);
756        }
757        // commit
758        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
759        TEST_STATUS(status);
760    }
761
762    // verify in reclaim mode
763    sb_decision = sb_check_block_reusing(db);
764    TEST_CHK(sb_decision == SBD_RECLAIM);
765
766    // expect pre-reuse data retained
767    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
768    TEST_STATUS(status);
769    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
770
771    // close snapshot
772    status = fdb_kvs_close(snap_db);
773    TEST_STATUS(status);
774
775    // do another update cycle
776    for (j = 0; j < i; j++) {
777        // commit
778        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
779        TEST_STATUS(status);
780    }
781    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
782    TEST_STATUS(status);
783
784    // attempt to open again will fail
785    status = fdb_snapshot_open(db, &snap_db, 1);
786    if (status == FDB_RESULT_SUCCESS) {
787        // close incase kv was unexpectedly opened
788        status = fdb_kvs_close(snap_db);
789        TEST_STATUS(status);
790    }
791    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
792
793    status = fdb_free_block(rvalue);
794    TEST_STATUS(status);
795    status = fdb_kvs_close(db);
796    TEST_STATUS(status);
797    status = fdb_close(dbfile);
798    TEST_STATUS(status);
799    fdb_shutdown();
800
801    memleak_end();
802    TEST_RESULT("snapshot inmem before block reuse test");
803}
804
805void variable_value_size_test() {
806    TEST_INIT();
807    memleak_start();
808
809    uint64_t i;
810    int j, n, r;
811    const int ndocs = 3;
812    int blen;
813    char keybuf[256];
814    char *bodybuf = new char[1024 * 5120];
815    void *rvalue;
816    size_t rvalue_len;
817    fdb_status status;
818    fdb_file_handle *dbfile;
819    fdb_kvs_handle *db;
820    fdb_kvs_handle *snap_db, *snap_db2;
821    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
822    sb_decision_t sb_decision;
823    fdb_file_info file_info;
824
825    fdb_config fconfig = fdb_get_default_config();
826    fconfig.compaction_threshold = 0;
827    fconfig.block_reusing_threshold = 65;
828
829    // remove previous staleblktest files
830    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
831    (void)r;
832
833    fdb_open(&dbfile, "./staleblktest1", &fconfig);
834    fdb_kvs_open_default(dbfile, &db, &kvs_config);
835
836    // load with doc sizes 512, 1024, 1536
837    n = 0;
838    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
839        for (j = 1; j <= ndocs; j++) {
840            blen = j * 512;
841            sprintf(keybuf, "%d_key", blen);
842            fillstr(bodybuf, 'a', blen);    // Max blen allowed: 1024*5120
843            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
844                                bodybuf, strlen(bodybuf) + 1);
845            TEST_STATUS(status);
846            n++;
847        }
848        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
849        TEST_STATUS(status);
850    }
851
852    // open disk snapshot
853    status = fdb_snapshot_open(db, &snap_db, n);
854    TEST_STATUS(status);
855
856    // update docs with sizes 1024, 2048, 3072
857    i = 0;
858    do {
859        for (j = 1;j <= ndocs; j++) {
860            blen = j * 1024;
861            sprintf(keybuf, "%d_key", blen);
862            fillstr(bodybuf, 'b', blen);
863            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
864                                bodybuf, strlen(bodybuf) + 1);
865            TEST_STATUS(status);
866        }
867        i++;
868        status = fdb_get_file_info(dbfile, &file_info);
869        TEST_STATUS(status);
870    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
871
872    sb_decision = sb_check_block_reusing(db);
873    TEST_CHK(sb_decision == SBD_RECLAIM);
874
875    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
876    TEST_STATUS(status);
877
878    // create in mem snapshot
879    status = fdb_snapshot_open(db, &snap_db2, FDB_SNAPSHOT_INMEM);
880    TEST_STATUS(status);
881
882    // update cycle with small values
883    while (--i) {
884        blen = i * 8;
885        sprintf(keybuf, "%d_key", blen);
886        fillstr(bodybuf, 'c', blen);
887        status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
888                            bodybuf, strlen(bodybuf) + 1);
889        TEST_STATUS(status);
890        if ((i % 10) == 0) {
891            status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
892            TEST_STATUS(status);
893        }
894    }
895    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
896    TEST_STATUS(status);
897
898    // write very large value
899    sprintf(keybuf, "%d_key", 512);
900    fillstr(bodybuf, 'a', 1024 * 5120);
901    status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
902                        bodybuf, strlen(bodybuf) + 1);
903    TEST_STATUS(status);
904    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
905    TEST_STATUS(status);
906
907    // verify disk snapshot
908    blen = 512;
909    sprintf(keybuf, "%d_key", blen);
910    fillstr(bodybuf, 'a', blen);
911    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf) + 1,
912                        &rvalue, &rvalue_len);
913    TEST_STATUS(status);
914    TEST_CMP(bodybuf, rvalue, rvalue_len);
915    status = fdb_free_block(rvalue);
916    TEST_STATUS(status);
917
918    // verify in mem snapshot
919    blen = 2048;
920    sprintf(keybuf, "%d_key", blen);
921    fillstr(bodybuf, 'b', blen);
922    status = fdb_get_kv(snap_db2, keybuf, strlen(keybuf) + 1,
923                        &rvalue, &rvalue_len);
924    TEST_STATUS(status);
925    TEST_CMP(bodybuf, rvalue, rvalue_len);
926    status = fdb_free_block(rvalue);
927    TEST_STATUS(status);
928
929    delete[] bodybuf;
930
931    // cleanup
932    status = fdb_kvs_close(db);
933    TEST_STATUS(status);
934    status = fdb_kvs_close(snap_db);
935    TEST_STATUS(status);
936    status = fdb_kvs_close(snap_db2);
937    TEST_STATUS(status);
938    status = fdb_close(dbfile);
939    TEST_STATUS(status);
940    fdb_shutdown();
941
942    memleak_end();
943    TEST_RESULT("variable value size test");
944}
945
946void rollback_with_num_keeping_headers() {
947    TEST_INIT();
948    memleak_start();
949
950    int i, n, r;
951    char keybuf[256];
952    char bodybuf[1024];
953    fdb_status status;
954    fdb_file_handle *dbfile;
955    fdb_kvs_handle *db;
956    fdb_kvs_info kvs_info;
957    sb_decision_t sb_decision;
958    fdb_file_info file_info;
959
960    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
961    fdb_config fconfig = fdb_get_default_config();
962    fconfig.compaction_threshold = 0;
963    fconfig.block_reusing_threshold = 65;
964    fconfig.num_keeping_headers = 1;
965
966    // remove previous staleblktest files
967    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
968    (void)r;
969
970    fdb_open(&dbfile, "./staleblktest1", &fconfig);
971    fdb_kvs_open_default(dbfile, &db, &kvs_config);
972
973    // create 10 headers
974    for (i = 0; i < 10; i++) {
975        sprintf(keybuf, "%dkey",i);
976        fillstr(bodybuf, 'b', 512);
977        status = fdb_set_kv(db, keybuf, strlen(keybuf),
978                            bodybuf, strlen(bodybuf));
979        TEST_STATUS(status);
980        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
981        TEST_STATUS(status);
982    }
983
984    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
985    i = 0;
986    do {
987        sprintf(keybuf, "0key");
988        fillstr(bodybuf, 'c', 128);
989        status = fdb_set_kv(db, keybuf, strlen(keybuf),
990                            bodybuf, strlen(bodybuf));
991        TEST_STATUS(status);
992        i++;
993        status = fdb_get_file_info(dbfile, &file_info);
994        TEST_STATUS(status);
995    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
996    n = i;
997
998    // expect block reclaim
999    sb_decision = sb_check_block_reusing(db);
1000    TEST_CHK(sb_decision == SBD_RECLAIM);
1001
1002    // reclaim old header via 11th commit
1003    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1004    TEST_STATUS(status);
1005
1006    // load unique keys to reuse old blocks
1007    for (i = 0; i < n; i++) {
1008        sprintf(keybuf, "%dkey",i);
1009        fillstr(bodybuf, 'd', 512);
1010        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1011                            bodybuf, strlen(bodybuf));
1012        TEST_STATUS(status);
1013    }
1014
1015    // do rollback to 10th headers
1016    status = fdb_rollback(&db, 10);
1017    TEST_STATUS(status);
1018
1019    // expect only 10 docs
1020    status = fdb_get_kvs_info(db, &kvs_info);
1021    TEST_STATUS(status);
1022    TEST_CHK(kvs_info.doc_count == 10);
1023
1024    // cleanup
1025    status = fdb_kvs_close(db);
1026    TEST_STATUS(status);
1027    status = fdb_close(dbfile);
1028    TEST_STATUS(status);
1029    fdb_shutdown();
1030
1031    memleak_end();
1032    TEST_RESULT("rollback with num keeping headers");
1033}
1034
1035void crash_and_recover_with_num_keeping_test() {
1036    TEST_INIT();
1037    memleak_start();
1038
1039    int i, r, ndocs;
1040    int nheaders = 10;
1041    size_t last_seqno;
1042    char keybuf[256];
1043    char bodybuf[512];
1044
1045    fdb_status status;
1046    fdb_file_handle *dbfile;
1047    fdb_kvs_handle *db, *snap_db;
1048    fdb_file_info file_info;
1049    fdb_kvs_info kvs_info;
1050    sb_decision_t sb_decision;
1051
1052    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1053    fdb_config fconfig = fdb_get_default_config();
1054    fconfig.compaction_threshold = 0;
1055    fconfig.block_reusing_threshold = 65;
1056    fconfig.num_keeping_headers = nheaders;
1057
1058    // remove previous staleblktest files
1059    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1060    (void)r;
1061
1062    fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
1063    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
1064
1065    // create num_keeping_headers+1
1066    for (i = 0; i < nheaders + 1; i++) {
1067        sprintf(keybuf, "%dkey",i);
1068        fillstr(bodybuf, 'b', 512);
1069        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1070                            bodybuf, strlen(bodybuf));
1071        TEST_STATUS(status);
1072        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1073        TEST_STATUS(status);
1074    }
1075
1076    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1077    i = 0;
1078    do {
1079        sprintf(keybuf, "0key");
1080        fillstr(bodybuf, 'c', 128);
1081        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1082                            bodybuf, strlen(bodybuf));
1083        TEST_STATUS(status);
1084        i++;
1085        status = fdb_get_file_info(dbfile, &file_info);
1086        TEST_STATUS(status);
1087    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1088    ndocs = i;
1089
1090    // expect block reclaim
1091    sb_decision = sb_check_block_reusing(db);
1092    TEST_CHK(sb_decision == SBD_RECLAIM);
1093
1094    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1095    TEST_STATUS(status);
1096    status = fdb_get_kvs_info(db, &kvs_info);
1097    TEST_STATUS(status);
1098    last_seqno = kvs_info.last_seqnum;
1099
1100    // create num_keeping_headers
1101    for (i = 0; i < nheaders; i++) {
1102        sprintf(keybuf, "%dkey",i);
1103        fillstr(bodybuf, 'd', 64);
1104        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1105                            bodybuf, strlen(bodybuf));
1106        TEST_STATUS(status);
1107        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1108        TEST_STATUS(status);
1109    }
1110
1111    // preemptive shutdown
1112    status = fdb_kvs_close(db);
1113    TEST_STATUS(status);
1114    status = fdb_close(dbfile);
1115    TEST_STATUS(status);
1116    fdb_shutdown();
1117
1118    // reopen
1119    status = fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
1120    TEST_STATUS(status);
1121    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
1122    TEST_STATUS(status);
1123
1124    status = fdb_get_file_info(dbfile, &file_info);
1125    TEST_STATUS(status);
1126    r = _disk_dump("./staleblktest1file.1", file_info.file_size,
1127                   (2 * fconfig.blocksize) + (fconfig.blocksize / 4));
1128    TEST_CHK(r >= 0);
1129
1130    // snapshot to last keeping header
1131    status = fdb_snapshot_open(db, &snap_db, last_seqno);
1132    TEST_STATUS(status);
1133
1134    // rollback to last keepheader
1135    status = fdb_rollback(&db, last_seqno);
1136    TEST_STATUS(status);
1137
1138    // manual commit
1139    status = fdb_compact(dbfile, "./staleblktest1file.3");
1140    TEST_STATUS(status);
1141
1142    // delete items
1143    for (i = 0; i < nheaders + 1; i++) {
1144        sprintf(keybuf, "%dkey",i);
1145        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1146        TEST_STATUS(status);
1147        // commit
1148        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1149        TEST_STATUS(status);
1150    }
1151
1152    // not reusing blocks
1153    sb_decision = sb_check_block_reusing(db);
1154    TEST_CHK(sb_decision == SBD_NONE);
1155
1156    // append until reuse
1157    for (i = 0; i < ndocs; i++) {
1158        sprintf(keybuf, "0key");
1159        fillstr(bodybuf, 'e', 512);
1160        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1161                            bodybuf, strlen(bodybuf));
1162        TEST_STATUS(status);
1163    }
1164    sb_decision = sb_check_block_reusing(db);
1165    TEST_CHK(sb_decision == SBD_RECLAIM);
1166
1167    status = fdb_kvs_close(snap_db);
1168    TEST_STATUS(status);
1169    status = fdb_kvs_close(db);
1170    TEST_STATUS(status);
1171    status = fdb_close(dbfile);
1172    TEST_STATUS(status);
1173    fdb_shutdown();
1174
1175    memleak_end();
1176    TEST_RESULT("crash and recover with num keeping test");
1177}
1178
1179void reuse_on_delete_test() {
1180    TEST_INIT();
1181    memleak_start();
1182
1183    int i, r, ndocs;
1184    int nheaders = 10;
1185    char keybuf[256];
1186    char bodybuf[512];
1187
1188    fdb_status status;
1189    fdb_file_handle *dbfile;
1190    fdb_kvs_handle *db;
1191    sb_decision_t sb_decision;
1192    fdb_file_info file_info;
1193
1194    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1195    fdb_config fconfig = fdb_get_default_config();
1196    fconfig.compaction_threshold = 0;
1197    fconfig.block_reusing_threshold = 65;
1198    fconfig.num_keeping_headers = nheaders;
1199
1200    // remove previous staleblktest files
1201    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1202    (void)r;
1203
1204    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1205    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1206
1207    // create num_keeping_headers+1
1208    for (i = 0; i < nheaders + 1; i++) {
1209        sprintf(keybuf, "%dkey",i);
1210        fillstr(bodybuf, 'b', 512);
1211        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1212                            bodybuf, strlen(bodybuf));
1213        TEST_STATUS(status);
1214        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1215        TEST_STATUS(status);
1216    }
1217
1218    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1219    i = 0;
1220    do {
1221        sprintf(keybuf, "%dkey",i);
1222        fillstr(bodybuf, 'c', 128);
1223        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1224                            bodybuf, strlen(bodybuf));
1225        TEST_STATUS(status);
1226        i++;
1227        status = fdb_get_file_info(dbfile, &file_info);
1228        TEST_STATUS(status);
1229    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1230    ndocs = i;
1231
1232    // expect NO REUSE
1233    sb_decision = sb_check_block_reusing(db);
1234    TEST_CHK(sb_decision == SBD_NONE);
1235    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1236    TEST_STATUS(status);
1237
1238    // delete so that file becomes stale
1239    for (i = 0; i < ndocs; ++i) {
1240        sprintf(keybuf, "%dkey",i);
1241        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1242        TEST_STATUS(status);
1243    }
1244
1245    sb_decision = sb_check_block_reusing(db);
1246    TEST_CHK(sb_decision == SBD_RECLAIM);
1247
1248    // reload 1/4 all keys again and expect no file size growth
1249    // since all docs being reused
1250    for (i = 0; i < ndocs / 4; ++i) {
1251        sprintf(keybuf, "%dkey",i);
1252        fillstr(bodybuf, 'd', 128);
1253        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1254                            bodybuf, strlen(bodybuf));
1255        TEST_STATUS(status);
1256        i++;
1257    }
1258
1259    // expect to still be in reuse mode
1260    sb_decision = sb_check_block_reusing(db);
1261    TEST_CHK(sb_decision == SBD_RECLAIM);
1262
1263    status = fdb_close(dbfile);
1264    TEST_STATUS(status);
1265    status = fdb_shutdown();
1266    TEST_STATUS(status);
1267
1268    memleak_end();
1269    TEST_RESULT("reuse on delete test");
1270}
1271
1272void fragmented_reuse_test() {
1273    TEST_INIT();
1274    memleak_start();
1275
1276    int i, r, ndocs;
1277    int nheaders = 10;
1278    char keybuf[256];
1279    char bodybuf[512];
1280    size_t fpos;
1281
1282    fdb_status status;
1283    fdb_file_handle *dbfile;
1284    fdb_kvs_handle *db;
1285    sb_decision_t sb_decision;
1286    fdb_file_info file_info;
1287    fdb_kvs_info kvs_info;
1288
1289    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1290    fdb_config fconfig = fdb_get_default_config();
1291    fconfig.compaction_threshold = 0;
1292    fconfig.block_reusing_threshold = 35;
1293    fconfig.num_keeping_headers = nheaders;
1294
1295    // remove previous staleblktest files
1296    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1297    (void)r;
1298
1299    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1300    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1301
1302    // satisfy reuse constraints
1303    for (i = 0; i < nheaders + 1; i++) {
1304        sprintf(keybuf, "%dkey",i);
1305        fillstr(bodybuf, 'a', 512);
1306        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1307                            bodybuf, strlen(bodybuf));
1308        TEST_STATUS(status);
1309        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1310        TEST_STATUS(status);
1311    }
1312    i = 0;
1313    do {
1314        sprintf(keybuf, "%dkey",i);
1315        fillstr(bodybuf, 'b', 128);
1316        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1317                            bodybuf, strlen(bodybuf));
1318        TEST_STATUS(status);
1319        i++;
1320        status = fdb_get_file_info(dbfile, &file_info);
1321        TEST_STATUS(status);
1322    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1323    ndocs = i;
1324
1325    // make 25% of file stale
1326    for (i = 0; i < ndocs / 4; i++) {
1327        sprintf(keybuf, "%dkey",i);
1328        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1329        TEST_STATUS(status);
1330    }
1331
1332    // verify blocks not being reused
1333    sb_decision = sb_check_block_reusing(db);
1334    TEST_CHK(sb_decision == SBD_NONE);
1335
1336    // manual compaction
1337    fpos = filemgr_get_pos(db->file);
1338    status = fdb_compact(dbfile, "staleblktest_compact");
1339    TEST_STATUS(status);
1340    // MB-20091 : refresh db over to new file since compact won't do it
1341    status = fdb_get_kvs_info(db, &kvs_info);
1342    TEST_STATUS(status);
1343    TEST_CHK(kvs_info.doc_count > 0);
1344    TEST_CHK(fpos > filemgr_get_pos(db->file));
1345
1346    // making additional 25% of file stale should NOT go into
1347    // block reuse.  without compaction we would be 50% stale
1348    for (i = ndocs / 4; i < ndocs / 2; i++) {
1349        sprintf(keybuf, "%dkey",i);
1350        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1351        TEST_STATUS(status);
1352    }
1353    sb_decision = sb_check_block_reusing(db);
1354    TEST_CHK(sb_decision == SBD_NONE);
1355
1356    // delete rest of docs
1357    for (i = ndocs / 2; i < ndocs; i++) {
1358        sprintf(keybuf, "%dkey",i);
1359        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1360        TEST_STATUS(status);
1361    }
1362
1363    // restore nheaders again
1364    for (i = 0; i < nheaders + 1; i++) {
1365        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1366        TEST_STATUS(status);
1367    }
1368
1369    // MB-20091: commit is done with internal root handle, sync up 'db' handle
1370    status = fdb_get_kvs_info(db, &kvs_info);
1371    TEST_STATUS(status);
1372    sb_decision = sb_check_block_reusing(db);
1373    TEST_CHK(sb_decision != SBD_NONE);
1374
1375    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1376    TEST_STATUS(status);
1377    status = fdb_close(dbfile);
1378    TEST_STATUS(status);
1379    status = fdb_shutdown();
1380    TEST_STATUS(status);
1381
1382    memleak_end();
1383    TEST_RESULT("fragmented reuse test");
1384}
1385
1386void enter_reuse_via_separate_kvs_test() {
1387    TEST_INIT();
1388    memleak_start();
1389
1390    int i, r, ndocs;
1391    int nheaders = 10;
1392    char keybuf[256];
1393    char bodybuf[512];
1394
1395    fdb_status status;
1396    fdb_file_handle *dbfile;
1397    fdb_kvs_handle *db, *db2;
1398    fdb_doc *rdoc = NULL;
1399    fdb_iterator *iterator;
1400    sb_decision_t sb_decision;
1401    fdb_file_info file_info;
1402
1403    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1404    fdb_config fconfig = fdb_get_default_config();
1405    fconfig.compaction_threshold = 0;
1406    fconfig.block_reusing_threshold = 35;
1407    fconfig.num_keeping_headers = nheaders;
1408
1409    // remove previous staleblktest files
1410    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1411    (void)r;
1412
1413    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1414    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1415    fdb_kvs_open(dbfile, &db2, "db2", &kvs_config);
1416
1417    // load docs into db and db2
1418    for (i = 0; i < nheaders; i++) {
1419        sprintf(keybuf, "%dkey",i);
1420        fillstr(bodybuf, 'a', 12);
1421        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1422                            bodybuf, strlen(bodybuf));
1423        TEST_STATUS(status);
1424        status = fdb_set_kv(db2, keybuf, strlen(keybuf),
1425                            bodybuf, strlen(bodybuf));
1426        TEST_STATUS(status);
1427        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1428        TEST_STATUS(status);
1429    }
1430
1431    // enter reuse via db
1432    i = 0;
1433    do {
1434        sprintf(keybuf, "0key");
1435        fillstr(bodybuf, 'b', 128);
1436        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1437                            bodybuf, strlen(bodybuf));
1438        TEST_STATUS(status);
1439        i++;
1440        status = fdb_get_file_info(dbfile, &file_info);
1441        TEST_STATUS(status);
1442    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1443    ndocs = i;
1444
1445    sb_decision = sb_check_block_reusing(db);
1446    TEST_CHK(sb_decision == SBD_RECLAIM);
1447
1448    // commit
1449    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1450    TEST_STATUS(status);
1451
1452    // reuse blocks
1453    for (i = 0; i < ndocs; i++) {
1454        sprintf(keybuf, "key%d", i);
1455        fillstr(bodybuf, 'c', 128);
1456        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1457                            bodybuf, strlen(bodybuf));
1458        TEST_STATUS(status);
1459    }
1460    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1461    TEST_STATUS(status);
1462
1463    // check that docs from db2 not lost
1464    fdb_iterator_init(db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1465    i = 0;
1466    do {
1467        status = fdb_iterator_get(iterator, &rdoc);
1468        TEST_CHK(status == FDB_RESULT_SUCCESS);
1469        fdb_doc_free(rdoc);
1470        rdoc = NULL;
1471        i++;
1472    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1473    TEST_CHK(i==10);
1474    fdb_iterator_close(iterator);
1475
1476    status = fdb_kvs_close(db);
1477    TEST_STATUS(status);
1478    status = fdb_kvs_close(db2);
1479    TEST_STATUS(status);
1480    status = fdb_close(dbfile);
1481    TEST_STATUS(status);
1482    fdb_shutdown();
1483
1484    memleak_end();
1485    TEST_RESULT("enter reuse via separate kvs test");
1486}
1487
1488void child_function() {
1489    int i;
1490    char keybuf[256];
1491    char bodybuf[512];
1492    fdb_file_handle *dbfile;
1493    fdb_kvs_handle *db;
1494    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1495    fdb_config fconfig = fdb_get_default_config();
1496    fconfig.block_reusing_threshold = 65;
1497    fconfig.num_keeping_headers = 5;
1498
1499    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1500    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1501
1502    // load docs into db and db2
1503    i = 0;
1504    while (true) {
1505        sprintf(keybuf, "key%d",i);
1506        sprintf(bodybuf, "seqno%d",i);
1507        fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
1508        // create seqno every 100 updates
1509        if ((i % 100) == 0) {
1510            if ((i % 500) == 0) { // wal flush every 500 updates
1511                fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1512            } else {
1513                fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1514            }
1515        }
1516        i++;
1517    }
1518}
1519
1520#if !defined(WIN32) && !defined(_WIN32)
1521void superblock_recovery_test() {
1522    TEST_INIT();
1523    memleak_start();
1524
1525    int r;
1526    uint64_t i, num_markers;
1527    void *rvalue;
1528    size_t rvalue_len;
1529    char keybuf[256];
1530    char bodybuf[256];
1531    pid_t child_id;
1532
1533    fdb_seqnum_t seqno;
1534    fdb_file_handle *dbfile;
1535    fdb_kvs_handle *db, *snap_db;
1536    fdb_snapshot_info_t *markers;
1537    fdb_status status;
1538    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1539    fdb_config fconfig = fdb_get_default_config();
1540    fdb_file_info file_info;
1541
1542    // remove previous staleblktest files
1543    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1544    (void)r;
1545
1546    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1547    TEST_STATUS(status);
1548    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1549    TEST_STATUS(status);
1550
1551    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1552    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
1553        sprintf(keybuf, "key");
1554        status = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
1555        TEST_STATUS(status);
1556        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1557        TEST_STATUS(status);
1558    }
1559
1560    status = fdb_get_file_info(dbfile, &file_info);
1561    TEST_STATUS(status);
1562
1563    while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE) {
1564        sprintf(keybuf, "key");
1565        sprintf(bodybuf, "body%d", static_cast<int>(i));
1566        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1567                            bodybuf, strlen(bodybuf));
1568        TEST_STATUS(status);
1569        status = fdb_get_file_info(dbfile, &file_info);
1570        TEST_STATUS(status);
1571    };
1572
1573    TEST_CHK(sb_check_block_reusing(db) == SBD_RECLAIM);
1574
1575    // close previous handle
1576    status = fdb_kvs_close(db);
1577    TEST_STATUS(status);
1578    status = fdb_close(dbfile);
1579    TEST_STATUS(status);
1580
1581    // fork child
1582    child_id = fork();
1583
1584    if (child_id == 0) {
1585        child_function();
1586    }
1587
1588    // wait 3s
1589    sleep(3);
1590
1591    // kill child
1592    kill(child_id, SIGUSR1);
1593    // reopen and recover
1594
1595    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1596    TEST_STATUS(status);
1597    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1598    TEST_STATUS(status);
1599
1600    // get known key
1601    sprintf(keybuf, "key0");
1602    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1603    TEST_STATUS(status);
1604    status = fdb_free_block(rvalue);
1605    TEST_STATUS(status);
1606
1607    // compact upto marker
1608    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1609    TEST_CHK(status == FDB_RESULT_SUCCESS);
1610    TEST_CHK(num_markers == fconfig.num_keeping_headers);
1611    status = fdb_compact_upto(dbfile, NULL, markers[4].marker);
1612    TEST_CHK(status == FDB_RESULT_SUCCESS);
1613
1614    // open snapshot on marker
1615    seqno = markers[4].kvs_markers->seqnum;
1616    status = fdb_snapshot_open(db, &snap_db, seqno);
1617    TEST_STATUS(status);
1618
1619    status = fdb_free_snap_markers(markers, num_markers);
1620    TEST_CHK(status == FDB_RESULT_SUCCESS);
1621
1622    // get known key
1623    sprintf(keybuf, "key0");
1624    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1625    TEST_STATUS(status);
1626
1627    // check value
1628    TEST_CMP(rvalue, "seqno0", rvalue_len);
1629    status = fdb_free_block(rvalue);
1630    TEST_STATUS(status);
1631
1632    status = fdb_kvs_close(snap_db);
1633    TEST_STATUS(status);
1634    status = fdb_kvs_close(db);
1635    TEST_STATUS(status);
1636    status = fdb_close(dbfile);
1637    TEST_STATUS(status);
1638    fdb_shutdown();
1639
1640    memleak_end();
1641    TEST_RESULT("superblock recovery test");
1642}
1643#endif
1644
1645void reclaim_rollback_point_test() {
1646    memleak_start();
1647    TEST_INIT();
1648
1649    int i, r;
1650    int low_seq = 0;
1651    int nheaders=5;
1652    int ndocs=30000;
1653    char keybuf[16];
1654
1655    fdb_file_handle* dbfile;
1656    fdb_kvs_handle* db;
1657    fdb_status status;
1658    fdb_config fconfig = fdb_get_default_config();
1659    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1660    fdb_file_info file_info;
1661
1662    void *value_out;
1663    size_t valuelen_out;
1664
1665    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1666    (void)r;
1667
1668    // init
1669    fconfig.compaction_threshold = 0;
1670    fconfig.num_keeping_headers = nheaders;
1671    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1672    TEST_STATUS(status);
1673    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1674    TEST_STATUS(status);
1675
1676    const char *key = "key";
1677    const char *val = "val";
1678
1679    // load n docs
1680    for (i=0; i<ndocs; ++i) {
1681        sprintf(keybuf, "key%d", i);
1682        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1683        TEST_STATUS(status);
1684    }
1685    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1686    TEST_STATUS(status);
1687    low_seq = i;
1688
1689    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1690    i = 0;
1691    do {
1692        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1693        TEST_STATUS(status);
1694        i++;
1695        status = fdb_get_file_info(dbfile, &file_info);
1696        TEST_STATUS(status);
1697    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
1698
1699    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1700    TEST_STATUS(status);
1701
1702    // overwrite n docs
1703    for (i=0; i<ndocs; ++i) {
1704        sprintf(keybuf, "key%d", i);
1705        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu2", 5);
1706        TEST_STATUS(status);
1707    }
1708    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1709    TEST_STATUS(status);
1710
1711    // rollback to the first commit
1712    status = fdb_rollback(&db, low_seq);
1713    TEST_STATUS(status);
1714
1715    // retrieve docs
1716    for (i=0; i<ndocs; ++i) {
1717        sprintf(keybuf, "key%d", i);
1718        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1719        TEST_STATUS(status);
1720        free(value_out);
1721    }
1722
1723    // create nheaders
1724    for (i = 0; i < nheaders; ++i) {
1725        sprintf(keybuf, "key%d", i);
1726        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1727        TEST_STATUS(status);
1728        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1729        TEST_STATUS(status);
1730    }
1731
1732    // append some data & commit
1733    // now old blocks will be reclaimed
1734    for (i=0; i<ndocs; ++i) {
1735        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1736        TEST_STATUS(status);
1737    }
1738    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1739    TEST_STATUS(status);
1740
1741    // append more data .. now reusable blocks are overwritten
1742    for (i=0; i<ndocs; ++i) {
1743        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1744        TEST_STATUS(status);
1745    }
1746    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1747    TEST_STATUS(status);
1748
1749    // retrieve docs
1750    for (i=0; i<ndocs; ++i) {
1751        sprintf(keybuf, "key%d", i);
1752        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1753        TEST_STATUS(status);
1754        free(value_out);
1755    }
1756
1757    status = fdb_kvs_close(db);
1758    TEST_STATUS(status);
1759    status = fdb_close(dbfile);
1760    TEST_STATUS(status);
1761    fdb_shutdown();
1762
1763    memleak_end();
1764    TEST_RESULT("reclaim rollback point test");
1765}
1766
/**
 * Test driver: runs every stale-block test in sequence and returns 0.
 * Each test removes the staleblktest* files it finds before starting, so
 * the tests share on-disk file names and are run one after another.
 */
int main() {
    /* Test if basic stale block re-use is functional */
    verify_stale_block_reuse_test();

    /* Test reuse of stale blocks with block_reusing_threshold
       set at 0, 65, 100 */
    verify_staleblock_reuse_param_test();
    reuse_with_snapshot_test();

    /* Test reclaiming of stale blocks while varying
       num_keeping_headers */
    verify_minimum_num_keeping_headers_param_test();
    verify_high_num_keeping_headers_param_test();

    /* Test to verify in-memory and disk snapshots before
       block reuse */
    snapshot_before_block_reuse_test(false);
    snapshot_before_block_reuse_test(true);

    /* Test to verify snapshot after block reuse */
    snapshot_after_block_reuse_test();

    /* Test block reusage with keys having variable value sizes */
    variable_value_size_test();

    /* Test rollback with block reusage */
    rollback_with_num_keeping_headers();

    /* Test block reusage with deletes */
    reuse_on_delete_test();

    /* Test block reusage with manual compaction */
    fragmented_reuse_test();

    /* Test to verify reuse mode with one kvstore does not affect others */
    enter_reuse_via_separate_kvs_test();

#if !defined(WIN32) && !defined(_WIN32)
    /* Test recovery from superblock corruption
       (relies on fork(), hence excluded from Windows builds) */
    superblock_recovery_test();
#endif

    /* Test rollback, verify snapshot, manual compaction upon recovery
       after crash */
    crash_and_recover_with_num_keeping_test();

    /* Test that documents restored by a rollback remain readable after
       stale blocks are reclaimed and overwritten */
    reclaim_rollback_point_test();

    return 0;
}
1817