1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <signal.h>
25#include <unistd.h>
26#endif
27
28#include "test.h"
29#include "filemgr.h"
30#include "staleblock.h"
31#include "internal_types.h"
32#include "libforestdb/forestdb.h"
33#include "functional_util.h"
34
35/*
36 * Verify that blocks can be reclaimed when
37 * default block reuse threshold is used.
38 *
39 * When threshold is 0 or 100 blocks should
40 * not be reusable
41 */
42void verify_staleblock_reuse_param_test() {
43    TEST_INIT();
44    memleak_start();
45
46    uint64_t i;
47    int r;
48    const int kv = 512;
49    char keybuf[kv];
50    char bodybuf[kv];
51    fdb_status status;
52    fdb_file_handle *dbfile;
53    fdb_kvs_handle *db;
54    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
55    sb_decision_t sb_decision;
56    fdb_file_info file_info;
57
58    // set block reuse threshold = 65
59    fdb_config fconfig = fdb_get_default_config();
60    fconfig.compaction_threshold = 0;
61    fconfig.block_reusing_threshold = 65;
62
63start_data_loading:
64    // remove previous staleblktest files
65    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
66    (void)r;
67
68    fdb_open(&dbfile, "./staleblktest1", &fconfig);
69    fdb_kvs_open_default(dbfile, &db, &kvs_config);
70
71    // create num_keeping_headers+1
72    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
73        sprintf(keybuf, "key");
74        status = fdb_set_kv(db, keybuf, kv, NULL, 0);
75        TEST_STATUS(status);
76        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
77        TEST_STATUS(status);
78    }
79
80    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
81    i = 0;
82    do {
83        sprintf(keybuf, "key");
84        sprintf(bodybuf, "body%d", static_cast<int>(i));
85        status = fdb_set_kv(db, keybuf, kv, bodybuf, kv);
86        TEST_STATUS(status);
87        i++;
88        status = fdb_get_file_info(dbfile, &file_info);
89        TEST_STATUS(status);
90    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
91
92    // expect block reclaim only for valid threshold value
93    sb_decision = sb_check_block_reusing(db);
94    if (fconfig.block_reusing_threshold == 65) {
95        TEST_CHK(sb_decision == SBD_RECLAIM);
96    } else {
97        TEST_CHK(sb_decision == SBD_NONE);
98    }
99
100    // intermediate cleanup
101    status = fdb_close(dbfile);
102    TEST_STATUS(status);
103    fdb_shutdown();
104
105    // run again with different config
106    if (fconfig.block_reusing_threshold == 65) {
107        // disable with 0
108        fconfig.block_reusing_threshold = 0;
109        goto start_data_loading;
110    } else if (fconfig.block_reusing_threshold == 0) {
111        // disable with 100
112        fconfig.block_reusing_threshold = 100;
113        goto start_data_loading;
114    }
115
116    memleak_end();
117    TEST_RESULT("verify staleblock reuse param test");
118}
119
/*
 * Fill str with n - 1 copies of c followed by a terminating '\0', so that
 * str ends up holding a C string of length n - 1.
 *
 * str must have room for at least n bytes. When n is not positive there is
 * no room even for the terminator, so str is left untouched.
 */
void fillstr(char *str, char c, int n) {
    if (n <= 0) {
        return;
    }
    memset(str, c, (size_t)(n - 1));
    str[n - 1] = '\0';
}
127
128/*
129 * verify_staleblock_reuse_param_test:
130 *     create 2*num_keeping_headers and open snapshot
131 *     at halfway point. enter block reuse state
132 *     verify snapshot data remains unaffected
133 */
134void reuse_with_snapshot_test() {
135    TEST_INIT();
136    memleak_start();
137
138    int i, n, r;
139    const int kv = 512;
140    char keybuf[kv];
141    char bodybuf[kv];
142    void *rvalue;
143    size_t rvalue_len;
144    fdb_status status;
145    fdb_file_handle *dbfile;
146    fdb_kvs_handle *db, *snap_db;
147    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
148    sb_decision_t sb_decision;
149    fdb_file_info file_info;
150
151    fdb_config fconfig = fdb_get_default_config();
152    fconfig.compaction_threshold = 0;
153    fconfig.block_reusing_threshold = 65;
154    fconfig.num_keeping_headers = 10;
155
156    // remove previous staleblktest files
157    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
158    (void)r;
159
160    fdb_open(&dbfile, "./staleblktest1", &fconfig);
161    fdb_kvs_open_default(dbfile, &db, &kvs_config);
162
163    // create 2*num_keeping_headers
164    n = 2 * fconfig.num_keeping_headers;
165    for (i = 0; i < n; i++) {
166        fillstr(keybuf, 'k', kv);
167        sprintf(bodybuf, "orig_body%d", i+1);
168        status = fdb_set_kv(db, keybuf, strlen(keybuf),
169                            bodybuf, strlen(bodybuf) + 1);
170        TEST_STATUS(status);
171        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
172        TEST_STATUS(status);
173    }
174
175    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
176    i = 0;
177    do {
178        fillstr(keybuf, 'r', kv);
179        fillstr(bodybuf, 'y', kv);
180        status = fdb_set_kv(db, keybuf, strlen(keybuf),
181                            bodybuf, strlen(bodybuf) + 1);
182        TEST_STATUS(status);
183        i++;
184        status = fdb_get_file_info(dbfile, &file_info);
185        TEST_STATUS(status);
186    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
187
188    sb_decision = sb_check_block_reusing(db);
189    TEST_CHK(sb_decision == SBD_RECLAIM);
190
191    // open snapshot
192    status = fdb_snapshot_open(db, &snap_db, 8);
193    TEST_STATUS(status);
194
195    // commit
196    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
197    TEST_STATUS(status);
198
199    // write until blocks are no longer being reused
200    i = 0;
201    do {
202        sprintf(keybuf, "key%d", i);
203        fillstr(bodybuf, 'z', kv);
204        status = fdb_set_kv(db, keybuf, strlen(keybuf),
205                            bodybuf, strlen(bodybuf));
206        TEST_STATUS(status);
207        sb_decision = sb_check_block_reusing(db);
208        i++;
209    } while (sb_decision != SBD_NONE);
210
211    // delete original keys
212    n = 2 * fconfig.num_keeping_headers;
213    for (i = 0; i < n; i++) {
214        fillstr(keybuf, 'k', kv);
215        status = fdb_del_kv(db, keybuf, strlen(keybuf));
216        TEST_STATUS(status);
217    }
218
219    // commit
220    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
221    TEST_STATUS(status);
222
223    fillstr(keybuf, 'k', kv);
224    sprintf(bodybuf, "orig_body%d", 8);
225
226    // check key does not exist in main kv
227    status = fdb_get_kv(db, keybuf, strlen(keybuf),
228                        &rvalue, &rvalue_len);
229    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
230
231    // check still exists in snapshot data
232    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf),
233                        &rvalue, &rvalue_len);
234    TEST_STATUS(status);
235    TEST_CMP(rvalue, bodybuf, rvalue_len);
236
237    status = fdb_free_block(rvalue);
238    TEST_STATUS(status);
239    status = fdb_kvs_close(snap_db);
240    TEST_STATUS(status);
241    status = fdb_kvs_close(db);
242    TEST_STATUS(status);
243    status = fdb_close(dbfile);
244    TEST_STATUS(status);
245    fdb_shutdown();
246
247    memleak_end();
248    TEST_RESULT("reuse with snapshot test");
249}
250
251void verify_minimum_num_keeping_headers_param_test() {
252    memleak_start();
253    TEST_INIT();
254
255    int i, r;
256    fdb_file_handle* dbfile;
257    fdb_kvs_handle* db;
258    fdb_kvs_handle* snap_db;
259    fdb_status status;
260    fdb_config fconfig = fdb_get_default_config();
261    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
262    fdb_file_info file_info;
263
264    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
265    (void)r;
266
267    // init
268    fconfig.compaction_threshold = 0;
269
270    // open with zero num_keeping_headers parameter
271    fconfig.num_keeping_headers = 0;
272    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
273    // should fail
274    TEST_CHK(status != FDB_RESULT_SUCCESS);
275
276    // open with single keeping header
277    fconfig.num_keeping_headers = 1;
278    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
279    TEST_STATUS(status);
280    status = fdb_kvs_open(dbfile, &db, (char *)"num_keep", &kvs_config);
281    TEST_STATUS(status);
282
283    const char *key = "key";
284    const char *val = "val";
285    // 10 commits
286    for (i = 0; i < 10; ++i) {
287        status = fdb_set_kv(db, key, strlen(key) + 1,
288                            val, strlen(val) + 1);
289        TEST_STATUS(status);
290        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
291        TEST_STATUS(status);
292    }
293
294    // open snapshot on 9th commit
295    status = fdb_snapshot_open(db, &snap_db, 9);
296    TEST_STATUS(status);
297
298    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
299    i = 0;
300    do {
301        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
302        TEST_STATUS(status);
303        i++;
304        status = fdb_get_file_info(dbfile, &file_info);
305        TEST_STATUS(status);
306    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
307
308    // close snapshot (reader)
309    status = fdb_kvs_close(snap_db);
310    TEST_STATUS(status);
311
312    // 11th commit causes reclaim and loss of 9th header
313    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
314    TEST_STATUS(status);
315
316    // attempt to open new snapshot on 9th commit
317    status = fdb_snapshot_open(db, &snap_db, 9);
318    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
319
320    // open new snapshot on 10th commit
321    status = fdb_snapshot_open(db, &snap_db, 10);
322    TEST_STATUS(status);
323
324    // cleanup
325    status = fdb_close(dbfile);
326    TEST_STATUS(status);
327    status = fdb_shutdown();
328    TEST_STATUS(status);
329
330    memleak_end();
331    TEST_RESULT("verify minimum num keeping headers param test");
332}
333
334void verify_high_num_keeping_headers_param_test() {
335    memleak_start();
336    TEST_INIT();
337
338    int i, r;
339    int low_seq = 0;
340    int nheaders=100;
341    char keybuf[16];
342    void *rvalue;
343    size_t rvalue_len;
344
345    fdb_file_handle* dbfile;
346    fdb_kvs_handle* db;
347    fdb_kvs_handle* snap_db;
348    fdb_status status;
349    fdb_config fconfig = fdb_get_default_config();
350    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
351    fdb_file_info file_info;
352
353    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
354    (void)r;
355
356    // init
357    fconfig.compaction_threshold = 0;
358    fconfig.num_keeping_headers = nheaders;
359    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
360    TEST_STATUS(status);
361    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
362    TEST_STATUS(status);
363
364    const char *key = "key";
365    const char *val = "val";
366
367    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
368    i = 0;
369    do {
370        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
371        TEST_STATUS(status);
372        i++;
373        status = fdb_get_file_info(dbfile, &file_info);
374        TEST_STATUS(status);
375    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
376
377    // create lowest commit
378    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
379    TEST_STATUS(status);
380    low_seq = i;
381
382    // create 100 headers
383    for (i = 0; i < nheaders; ++i) {
384        sprintf(keybuf, "key%d", i);
385        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
386        TEST_STATUS(status);
387        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
388        TEST_STATUS(status);
389    }
390
391    // make sure all blocks up to kept headers are reused
392    i = low_seq;
393    while (--i) {
394        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
395        TEST_STATUS(status);
396    }
397
398    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
399    TEST_STATUS(status);
400
401    // -101 commit fail
402    status = fdb_snapshot_open(db, &snap_db, low_seq);
403    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
404
405    // -100 commit pass
406    low_seq++;
407    status = fdb_snapshot_open(db, &snap_db, low_seq);
408    TEST_STATUS(status);
409
410    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
411    TEST_STATUS(status);
412
413    // cleanup
414    status = fdb_free_block(rvalue);
415    TEST_STATUS(status);
416    status = fdb_kvs_close(snap_db);
417    TEST_STATUS(status);
418    status = fdb_close(dbfile);
419    TEST_STATUS(status);
420    status = fdb_shutdown();
421    TEST_STATUS(status);
422
423    memleak_end();
424    TEST_RESULT("verify high num keeping headers param test");
425}
426
427void snapshot_before_block_reuse_test(bool inmem) {
428    memleak_start();
429    TEST_INIT();
430
431    int i, j, n, r;
432    void *rvalue;
433    size_t rvalue_len;
434    char bodybuf[256];
435    fdb_file_handle* dbfile;
436    fdb_kvs_handle *db, *snap_db;
437    fdb_status status;
438    fdb_config fconfig = fdb_get_default_config();
439    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
440    sb_decision_t sb_decision;
441    fdb_file_info file_info;
442
443    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
444    (void)r;
445
446    fconfig.num_keeping_headers = 1;
447    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
448    TEST_STATUS(status);
449    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
450    TEST_STATUS(status);
451
452    const char *key = "key";
453    const char *val = "snp";
454
455    // set key
456    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
457    TEST_STATUS(status);
458
459    // commit
460    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
461    TEST_STATUS(status);
462
463    // open snapshot
464    if (inmem) {
465        status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
466        TEST_STATUS(status);
467    } else {
468        status = fdb_snapshot_open(db, &snap_db, 1);
469        TEST_STATUS(status);
470    }
471
472    val = "val";
473
474    // initial load to reuse
475    i = 0;
476    do {
477        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
478        TEST_STATUS(status);
479        i++;
480        status = fdb_get_file_info(dbfile, &file_info);
481        TEST_STATUS(status);
482    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
483
484    // 5 update cycles of created items
485    for (n = 0; n < 5; n++) {
486        for(j = 0; j < i; j++) {
487            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
488            TEST_STATUS(status);
489        }
490        // commit
491        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
492        TEST_STATUS(status);
493    }
494
495    // verify in reclaim mode
496    sb_decision = sb_check_block_reusing(db);
497    TEST_CHK(sb_decision == SBD_RECLAIM);
498
499    // expect pre-reuse data retained
500    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
501    TEST_STATUS(status);
502    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
503
504    // close snapshot
505    status = fdb_kvs_close(snap_db);
506    TEST_STATUS(status);
507
508    // do another update cycle
509    for (j = 0; j < i; j++) {
510        // commit
511        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
512        TEST_STATUS(status);
513    }
514    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
515    TEST_STATUS(status);
516
517    // attempt to open again will fail
518    status = fdb_snapshot_open(db, &snap_db, 1);
519    if (status == FDB_RESULT_SUCCESS) {
520        // close incase kv was unexpectedly opened
521        status = fdb_kvs_close(snap_db);
522        TEST_STATUS(status);
523    }
524    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
525
526    status = fdb_free_block(rvalue);
527    TEST_STATUS(status);
528    status = fdb_kvs_close(db);
529    TEST_STATUS(status);
530    status = fdb_close(dbfile);
531    TEST_STATUS(status);
532    fdb_shutdown();
533
534    memleak_end();
535
536    sprintf(bodybuf,"snapshot before block reuse test %s", inmem ?
537            "in-mem snapshot" : "disk snapshot");
538    TEST_RESULT(bodybuf);
539}
540
541void snapshot_after_block_reuse_test() {
542    memleak_start();
543    TEST_INIT();
544
545    int i, r;
546    int low_seq = 0;
547    int nheaders=5;
548    char keybuf[16];
549
550    fdb_file_handle* dbfile;
551    fdb_kvs_handle* db;
552    fdb_kvs_handle* snap_db;
553    fdb_status status;
554    fdb_config fconfig = fdb_get_default_config();
555    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
556    fdb_file_info file_info;
557
558    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
559    (void)r;
560
561    // init
562    fconfig.compaction_threshold = 0;
563    fconfig.num_keeping_headers = nheaders;
564    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
565    TEST_STATUS(status);
566    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
567    TEST_STATUS(status);
568
569    const char *key = "key";
570    const char *val = "val";
571
572    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
573    i = 0;
574    do {
575        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
576        TEST_STATUS(status);
577        i++;
578        status = fdb_get_file_info(dbfile, &file_info);
579        TEST_STATUS(status);
580    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
581
582    // create lowest commit
583    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
584    TEST_STATUS(status);
585    low_seq = i;
586
587    // create nheaders
588    for (i = 0; i < nheaders + 5; ++i) {
589        sprintf(keybuf, "key%d", i);
590        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
591        TEST_STATUS(status);
592        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
593        TEST_STATUS(status);
594    }
595
596    // make sure all blocks up to kept headers are reused
597    i = low_seq;
598    while (--i) {
599        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
600        TEST_STATUS(status);
601    }
602    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
603    TEST_STATUS(status);
604
605    // open snapshot on lowest seqno available
606    low_seq += 6;
607    status = fdb_snapshot_open(db, &snap_db, low_seq);
608    TEST_STATUS(status);
609    status = fdb_kvs_close(snap_db);
610    TEST_STATUS(status);
611
612    // open snapshot using seqno already reclaimed
613    status = fdb_snapshot_open(db, &snap_db, low_seq-1);
614    TEST_CHK(status != FDB_RESULT_SUCCESS);
615
616    status = fdb_kvs_close(db);
617    TEST_STATUS(status);
618    status = fdb_close(dbfile);
619    TEST_STATUS(status);
620    fdb_shutdown();
621
622    memleak_end();
623    TEST_RESULT("snapshot after block reuse test");
624}
625
626void snapshot_inmem_before_block_reuse_test() {
627    memleak_start();
628    TEST_INIT();
629
630    int i, j, n, r;
631    void *rvalue;
632    size_t rvalue_len;
633    fdb_file_handle* dbfile;
634    fdb_kvs_handle *db, *snap_db;
635    fdb_status status;
636    fdb_config fconfig = fdb_get_default_config();
637    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
638    sb_decision_t sb_decision;
639    fdb_file_info file_info;
640
641    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
642    (void)r;
643
644    fconfig.num_keeping_headers = 1;
645    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
646    TEST_STATUS(status);
647    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
648    TEST_STATUS(status);
649
650    const char* key = "key";
651    const char* val = "snp";
652
653    // set key
654    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
655    TEST_STATUS(status);
656
657    // commit
658    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
659    TEST_STATUS(status);
660
661    // open snapshot
662    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
663    TEST_STATUS(status);
664
665    val = "val";
666
667    // initial load to reuse
668    i = 0;
669    do {
670        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
671        TEST_STATUS(status);
672        i++;
673        status = fdb_get_file_info(dbfile, &file_info);
674        TEST_STATUS(status);
675    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
676
677    // 5 update cycles of created items
678    for (n = 0; n < 5; n++) {
679        for (j = 0; j < i; j++) {
680            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
681            TEST_STATUS(status);
682        }
683        // commit
684        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
685        TEST_STATUS(status);
686    }
687
688    // verify in reclaim mode
689    sb_decision = sb_check_block_reusing(db);
690    TEST_CHK(sb_decision == SBD_RECLAIM);
691
692    // expect pre-reuse data retained
693    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
694    TEST_STATUS(status);
695    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
696
697    // close snapshot
698    status = fdb_kvs_close(snap_db);
699    TEST_STATUS(status);
700
701    // do another update cycle
702    for (j = 0; j < i; j++) {
703        // commit
704        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
705        TEST_STATUS(status);
706    }
707    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
708    TEST_STATUS(status);
709
710    // attempt to open again will fail
711    status = fdb_snapshot_open(db, &snap_db, 1);
712    if (status == FDB_RESULT_SUCCESS) {
713        // close incase kv was unexpectedly opened
714        status = fdb_kvs_close(snap_db);
715        TEST_STATUS(status);
716    }
717    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
718
719    status = fdb_free_block(rvalue);
720    TEST_STATUS(status);
721    status = fdb_kvs_close(db);
722    TEST_STATUS(status);
723    status = fdb_close(dbfile);
724    TEST_STATUS(status);
725    fdb_shutdown();
726
727    memleak_end();
728    TEST_RESULT("snapshot inmem before block reuse test");
729}
730
731void variable_value_size_test() {
732    TEST_INIT();
733    memleak_start();
734
735    uint64_t i;
736    int j, n, r;
737    const int ndocs = 3;
738    int blen;
739    char keybuf[256];
740    char *bodybuf = new char[1024 * 5120];
741    void *rvalue;
742    size_t rvalue_len;
743    fdb_status status;
744    fdb_file_handle *dbfile;
745    fdb_kvs_handle *db;
746    fdb_kvs_handle *snap_db, *snap_db2;
747    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
748    sb_decision_t sb_decision;
749    fdb_file_info file_info;
750
751    fdb_config fconfig = fdb_get_default_config();
752    fconfig.compaction_threshold = 0;
753    fconfig.block_reusing_threshold = 65;
754
755    // remove previous staleblktest files
756    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
757    (void)r;
758
759    fdb_open(&dbfile, "./staleblktest1", &fconfig);
760    fdb_kvs_open_default(dbfile, &db, &kvs_config);
761
762    // load with doc sizes 512, 1024, 1536
763    n = 0;
764    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
765        for (j = 1; j <= ndocs; j++) {
766            blen = j * 512;
767            sprintf(keybuf, "%d_key", blen);
768            fillstr(bodybuf, 'a', blen);    // Max blen allowed: 1024*5120
769            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
770                                bodybuf, strlen(bodybuf) + 1);
771            TEST_STATUS(status);
772            n++;
773        }
774        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
775        TEST_STATUS(status);
776    }
777
778    // open disk snapshot
779    status = fdb_snapshot_open(db, &snap_db, n);
780    TEST_STATUS(status);
781
782    // update docs with sizes 1024, 2048, 3072
783    i = 0;
784    do {
785        for (j = 1;j <= ndocs; j++) {
786            blen = j * 1024;
787            sprintf(keybuf, "%d_key", blen);
788            fillstr(bodybuf, 'b', blen);
789            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
790                                bodybuf, strlen(bodybuf) + 1);
791            TEST_STATUS(status);
792        }
793        i++;
794        status = fdb_get_file_info(dbfile, &file_info);
795        TEST_STATUS(status);
796    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
797
798    sb_decision = sb_check_block_reusing(db);
799    TEST_CHK(sb_decision == SBD_RECLAIM);
800
801    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
802    TEST_STATUS(status);
803
804    // create in mem snapshot
805    status = fdb_snapshot_open(db, &snap_db2, FDB_SNAPSHOT_INMEM);
806    TEST_STATUS(status);
807
808    // update cycle with small values
809    while (--i) {
810        blen = i * 8;
811        sprintf(keybuf, "%d_key", blen);
812        fillstr(bodybuf, 'c', blen);
813        status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
814                            bodybuf, strlen(bodybuf) + 1);
815        TEST_STATUS(status);
816        if ((i % 10) == 0) {
817            status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
818            TEST_STATUS(status);
819        }
820    }
821    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
822    TEST_STATUS(status);
823
824    // write very large value
825    sprintf(keybuf, "%d_key", 512);
826    fillstr(bodybuf, 'a', 1024 * 5120);
827    status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
828                        bodybuf, strlen(bodybuf) + 1);
829    TEST_STATUS(status);
830    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
831    TEST_STATUS(status);
832
833    // verify disk snapshot
834    blen = 512;
835    sprintf(keybuf, "%d_key", blen);
836    fillstr(bodybuf, 'a', blen);
837    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf) + 1,
838                        &rvalue, &rvalue_len);
839    TEST_STATUS(status);
840    TEST_CMP(bodybuf, rvalue, rvalue_len);
841    status = fdb_free_block(rvalue);
842    TEST_STATUS(status);
843
844    // verify in mem snapshot
845    blen = 2048;
846    sprintf(keybuf, "%d_key", blen);
847    fillstr(bodybuf, 'b', blen);
848    status = fdb_get_kv(snap_db2, keybuf, strlen(keybuf) + 1,
849                        &rvalue, &rvalue_len);
850    TEST_STATUS(status);
851    TEST_CMP(bodybuf, rvalue, rvalue_len);
852    status = fdb_free_block(rvalue);
853    TEST_STATUS(status);
854
855    delete[] bodybuf;
856
857    // cleanup
858    status = fdb_kvs_close(db);
859    TEST_STATUS(status);
860    status = fdb_kvs_close(snap_db);
861    TEST_STATUS(status);
862    status = fdb_kvs_close(snap_db2);
863    TEST_STATUS(status);
864    status = fdb_close(dbfile);
865    TEST_STATUS(status);
866    fdb_shutdown();
867
868    memleak_end();
869    TEST_RESULT("variable value size test");
870}
871
872void rollback_with_num_keeping_headers() {
873    TEST_INIT();
874    memleak_start();
875
876    int i, n, r;
877    char keybuf[256];
878    char bodybuf[1024];
879    fdb_status status;
880    fdb_file_handle *dbfile;
881    fdb_kvs_handle *db;
882    fdb_kvs_info kvs_info;
883    sb_decision_t sb_decision;
884    fdb_file_info file_info;
885
886    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
887    fdb_config fconfig = fdb_get_default_config();
888    fconfig.compaction_threshold = 0;
889    fconfig.block_reusing_threshold = 65;
890    fconfig.num_keeping_headers = 1;
891
892    // remove previous staleblktest files
893    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
894    (void)r;
895
896    fdb_open(&dbfile, "./staleblktest1", &fconfig);
897    fdb_kvs_open_default(dbfile, &db, &kvs_config);
898
899    // create 10 headers
900    for (i = 0; i < 10; i++) {
901        sprintf(keybuf, "%dkey",i);
902        fillstr(bodybuf, 'b', 512);
903        status = fdb_set_kv(db, keybuf, strlen(keybuf),
904                            bodybuf, strlen(bodybuf));
905        TEST_STATUS(status);
906        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
907        TEST_STATUS(status);
908    }
909
910    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
911    i = 0;
912    do {
913        sprintf(keybuf, "0key");
914        fillstr(bodybuf, 'c', 128);
915        status = fdb_set_kv(db, keybuf, strlen(keybuf),
916                            bodybuf, strlen(bodybuf));
917        TEST_STATUS(status);
918        i++;
919        status = fdb_get_file_info(dbfile, &file_info);
920        TEST_STATUS(status);
921    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
922    n = i;
923
924    // expect block reclaim
925    sb_decision = sb_check_block_reusing(db);
926    TEST_CHK(sb_decision == SBD_RECLAIM);
927
928    // reclaim old header via 11th commit
929    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
930    TEST_STATUS(status);
931
932    // load unique keys to reuse old blocks
933    for (i = 0; i < n; i++) {
934        sprintf(keybuf, "%dkey",i);
935        fillstr(bodybuf, 'd', 512);
936        status = fdb_set_kv(db, keybuf, strlen(keybuf),
937                            bodybuf, strlen(bodybuf));
938        TEST_STATUS(status);
939    }
940
941    // do rollback to 10th headers
942    status = fdb_rollback(&db, 10);
943    TEST_STATUS(status);
944
945    // expect only 10 docs
946    status = fdb_get_kvs_info(db, &kvs_info);
947    TEST_STATUS(status);
948    TEST_CHK(kvs_info.doc_count == 10);
949
950    // cleanup
951    status = fdb_kvs_close(db);
952    TEST_STATUS(status);
953    status = fdb_close(dbfile);
954    TEST_STATUS(status);
955    fdb_shutdown();
956
957    memleak_end();
958    TEST_RESULT("rollback with num keeping headers");
959}
960
961void crash_and_recover_with_num_keeping_test() {
962    TEST_INIT();
963    memleak_start();
964
965    int i, r, ndocs;
966    int nheaders = 10;
967    size_t last_seqno;
968    char keybuf[256];
969    char bodybuf[512];
970
971    fdb_status status;
972    fdb_file_handle *dbfile;
973    fdb_kvs_handle *db, *snap_db;
974    fdb_file_info file_info;
975    fdb_kvs_info kvs_info;
976    sb_decision_t sb_decision;
977
978    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
979    fdb_config fconfig = fdb_get_default_config();
980    fconfig.compaction_threshold = 0;
981    fconfig.block_reusing_threshold = 65;
982    fconfig.num_keeping_headers = nheaders;
983
984    // remove previous staleblktest files
985    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
986    (void)r;
987
988    fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
989    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
990
991    // create num_keeping_headers+1
992    for (i = 0; i < nheaders + 1; i++) {
993        sprintf(keybuf, "%dkey",i);
994        fillstr(bodybuf, 'b', 512);
995        status = fdb_set_kv(db, keybuf, strlen(keybuf),
996                            bodybuf, strlen(bodybuf));
997        TEST_STATUS(status);
998        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
999        TEST_STATUS(status);
1000    }
1001
1002    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1003    i = 0;
1004    do {
1005        sprintf(keybuf, "0key");
1006        fillstr(bodybuf, 'c', 128);
1007        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1008                            bodybuf, strlen(bodybuf));
1009        TEST_STATUS(status);
1010        i++;
1011        status = fdb_get_file_info(dbfile, &file_info);
1012        TEST_STATUS(status);
1013    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1014    ndocs = i;
1015
1016    // expect block reclaim
1017    sb_decision = sb_check_block_reusing(db);
1018    TEST_CHK(sb_decision == SBD_RECLAIM);
1019
1020    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1021    TEST_STATUS(status);
1022    status = fdb_get_kvs_info(db, &kvs_info);
1023    TEST_STATUS(status);
1024    last_seqno = kvs_info.last_seqnum;
1025
1026    // create num_keeping_headers
1027    for (i = 0; i < nheaders; i++) {
1028        sprintf(keybuf, "%dkey",i);
1029        fillstr(bodybuf, 'd', 64);
1030        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1031                            bodybuf, strlen(bodybuf));
1032        TEST_STATUS(status);
1033        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1034        TEST_STATUS(status);
1035    }
1036
1037    // preemptive shutdown
1038    status = fdb_kvs_close(db);
1039    TEST_STATUS(status);
1040    status = fdb_close(dbfile);
1041    TEST_STATUS(status);
1042    fdb_shutdown();
1043
1044    // reopen
1045    status = fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
1046    TEST_STATUS(status);
1047    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
1048    TEST_STATUS(status);
1049
1050    status = fdb_get_file_info(dbfile, &file_info);
1051    TEST_STATUS(status);
1052    r = _disk_dump("./staleblktest1file.1", file_info.file_size,
1053                   (2 * fconfig.blocksize) + (fconfig.blocksize / 4));
1054    TEST_CHK(r >= 0);
1055
1056    // snapshot to last keeping header
1057    status = fdb_snapshot_open(db, &snap_db, last_seqno);
1058    TEST_STATUS(status);
1059
1060    // rollback to last keepheader
1061    status = fdb_rollback(&db, last_seqno);
1062    TEST_STATUS(status);
1063
1064    // manual commit
1065    status = fdb_compact(dbfile, "./staleblktest1file.3");
1066    TEST_STATUS(status);
1067
1068    // delete items
1069    for (i = 0; i < nheaders + 1; i++) {
1070        sprintf(keybuf, "%dkey",i);
1071        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1072        TEST_STATUS(status);
1073        // commit
1074        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1075        TEST_STATUS(status);
1076    }
1077
1078    // not reusing blocks
1079    sb_decision = sb_check_block_reusing(db);
1080    TEST_CHK(sb_decision == SBD_NONE);
1081
1082    // append until reuse
1083    for (i = 0; i < ndocs; i++) {
1084        sprintf(keybuf, "0key");
1085        fillstr(bodybuf, 'e', 512);
1086        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1087                            bodybuf, strlen(bodybuf));
1088        TEST_STATUS(status);
1089    }
1090    sb_decision = sb_check_block_reusing(db);
1091    TEST_CHK(sb_decision == SBD_RECLAIM);
1092
1093    status = fdb_kvs_close(snap_db);
1094    TEST_STATUS(status);
1095    status = fdb_kvs_close(db);
1096    TEST_STATUS(status);
1097    status = fdb_close(dbfile);
1098    TEST_STATUS(status);
1099    fdb_shutdown();
1100
1101    memleak_end();
1102    TEST_RESULT("crash and recover with num keeping test");
1103}
1104
1105void reuse_on_delete_test() {
1106    TEST_INIT();
1107    memleak_start();
1108
1109    int i, r, ndocs;
1110    int nheaders = 10;
1111    char keybuf[256];
1112    char bodybuf[512];
1113
1114    fdb_status status;
1115    fdb_file_handle *dbfile;
1116    fdb_kvs_handle *db;
1117    sb_decision_t sb_decision;
1118    fdb_file_info file_info;
1119
1120    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1121    fdb_config fconfig = fdb_get_default_config();
1122    fconfig.compaction_threshold = 0;
1123    fconfig.block_reusing_threshold = 65;
1124    fconfig.num_keeping_headers = nheaders;
1125
1126    // remove previous staleblktest files
1127    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1128    (void)r;
1129
1130    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1131    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1132
1133    // create num_keeping_headers+1
1134    for (i = 0; i < nheaders + 1; i++) {
1135        sprintf(keybuf, "%dkey",i);
1136        fillstr(bodybuf, 'b', 512);
1137        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1138                            bodybuf, strlen(bodybuf));
1139        TEST_STATUS(status);
1140        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1141        TEST_STATUS(status);
1142    }
1143
1144    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1145    i = 0;
1146    do {
1147        sprintf(keybuf, "%dkey",i);
1148        fillstr(bodybuf, 'c', 128);
1149        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1150                            bodybuf, strlen(bodybuf));
1151        TEST_STATUS(status);
1152        i++;
1153        status = fdb_get_file_info(dbfile, &file_info);
1154        TEST_STATUS(status);
1155    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1156    ndocs = i;
1157
1158    // expect NO REUSE
1159    sb_decision = sb_check_block_reusing(db);
1160    TEST_CHK(sb_decision == SBD_NONE);
1161    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1162    TEST_STATUS(status);
1163
1164    // delete so that file becomes stale
1165    for (i = 0; i < ndocs; ++i) {
1166        sprintf(keybuf, "%dkey",i);
1167        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1168        TEST_STATUS(status);
1169    }
1170
1171    sb_decision = sb_check_block_reusing(db);
1172    TEST_CHK(sb_decision == SBD_RECLAIM);
1173
1174    // reload 1/4 all keys again and expect no file size growth
1175    // since all docs being reused
1176    for (i = 0; i < ndocs / 4; ++i) {
1177        sprintf(keybuf, "%dkey",i);
1178        fillstr(bodybuf, 'd', 128);
1179        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1180                            bodybuf, strlen(bodybuf));
1181        TEST_STATUS(status);
1182        i++;
1183    }
1184
1185    // expect to still be in reuse mode
1186    sb_decision = sb_check_block_reusing(db);
1187    TEST_CHK(sb_decision == SBD_RECLAIM);
1188
1189    status = fdb_close(dbfile);
1190    TEST_STATUS(status);
1191    status = fdb_shutdown();
1192    TEST_STATUS(status);
1193
1194    memleak_end();
1195    TEST_RESULT("reuse on delete test");
1196}
1197
1198void fragmented_reuse_test() {
1199    TEST_INIT();
1200    memleak_start();
1201
1202    int i, r, ndocs;
1203    int nheaders = 10;
1204    char keybuf[256];
1205    char bodybuf[512];
1206    size_t fpos;
1207
1208    fdb_status status;
1209    fdb_file_handle *dbfile;
1210    fdb_kvs_handle *db;
1211    sb_decision_t sb_decision;
1212    fdb_file_info file_info;
1213    fdb_kvs_info kvs_info;
1214
1215    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1216    fdb_config fconfig = fdb_get_default_config();
1217    fconfig.compaction_threshold = 0;
1218    fconfig.block_reusing_threshold = 35;
1219    fconfig.num_keeping_headers = nheaders;
1220
1221    // remove previous staleblktest files
1222    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1223    (void)r;
1224
1225    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1226    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1227
1228    // satisfy reuse constraints
1229    for (i = 0; i < nheaders + 1; i++) {
1230        sprintf(keybuf, "%dkey",i);
1231        fillstr(bodybuf, 'a', 512);
1232        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1233                            bodybuf, strlen(bodybuf));
1234        TEST_STATUS(status);
1235        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1236        TEST_STATUS(status);
1237    }
1238    i = 0;
1239    do {
1240        sprintf(keybuf, "%dkey",i);
1241        fillstr(bodybuf, 'b', 128);
1242        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1243                            bodybuf, strlen(bodybuf));
1244        TEST_STATUS(status);
1245        i++;
1246        status = fdb_get_file_info(dbfile, &file_info);
1247        TEST_STATUS(status);
1248    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1249    ndocs = i;
1250
1251    // make 25% of file stale
1252    for (i = 0; i < ndocs / 4; i++) {
1253        sprintf(keybuf, "%dkey",i);
1254        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1255        TEST_STATUS(status);
1256    }
1257
1258    // verify blocks not being reused
1259    sb_decision = sb_check_block_reusing(db);
1260    TEST_CHK(sb_decision == SBD_NONE);
1261
1262    // manual compaction
1263    fpos = filemgr_get_pos(db->file);
1264    status = fdb_compact(dbfile, "staleblktest_compact");
1265    TEST_STATUS(status);
1266    // MB-20091 : refresh db over to new file since compact won't do it
1267    status = fdb_get_kvs_info(db, &kvs_info);
1268    TEST_STATUS(status);
1269    TEST_CHK(kvs_info.doc_count > 0);
1270    TEST_CHK(fpos > filemgr_get_pos(db->file));
1271
1272    // making additional 25% of file stale should NOT go into
1273    // block reuse.  without compaction we would be 50% stale
1274    for (i = ndocs / 4; i < ndocs / 2; i++) {
1275        sprintf(keybuf, "%dkey",i);
1276        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1277        TEST_STATUS(status);
1278    }
1279    sb_decision = sb_check_block_reusing(db);
1280    TEST_CHK(sb_decision == SBD_NONE);
1281
1282    // delete rest of docs
1283    for (i = ndocs / 2; i < ndocs; i++) {
1284        sprintf(keybuf, "%dkey",i);
1285        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1286        TEST_STATUS(status);
1287    }
1288
1289    // restore nheaders again
1290    for (i = 0; i < nheaders + 1; i++) {
1291        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1292        TEST_STATUS(status);
1293    }
1294
1295    // MB-20091: commit is done with internal root handle, sync up 'db' handle
1296    status = fdb_get_kvs_info(db, &kvs_info);
1297    TEST_STATUS(status);
1298    sb_decision = sb_check_block_reusing(db);
1299    TEST_CHK(sb_decision != SBD_NONE);
1300
1301    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1302    TEST_STATUS(status);
1303    status = fdb_close(dbfile);
1304    TEST_STATUS(status);
1305    status = fdb_shutdown();
1306    TEST_STATUS(status);
1307
1308    memleak_end();
1309    TEST_RESULT("fragmented reuse test");
1310}
1311
1312void enter_reuse_via_separate_kvs_test() {
1313    TEST_INIT();
1314    memleak_start();
1315
1316    int i, r, ndocs;
1317    int nheaders = 10;
1318    char keybuf[256];
1319    char bodybuf[512];
1320
1321    fdb_status status;
1322    fdb_file_handle *dbfile;
1323    fdb_kvs_handle *db, *db2;
1324    fdb_doc *rdoc = NULL;
1325    fdb_iterator *iterator;
1326    sb_decision_t sb_decision;
1327    fdb_file_info file_info;
1328
1329    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1330    fdb_config fconfig = fdb_get_default_config();
1331    fconfig.compaction_threshold = 0;
1332    fconfig.block_reusing_threshold = 35;
1333    fconfig.num_keeping_headers = nheaders;
1334
1335    // remove previous staleblktest files
1336    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1337    (void)r;
1338
1339    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1340    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1341    fdb_kvs_open(dbfile, &db2, "db2", &kvs_config);
1342
1343    // load docs into db and db2
1344    for (i = 0; i < nheaders; i++) {
1345        sprintf(keybuf, "%dkey",i);
1346        fillstr(bodybuf, 'a', 12);
1347        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1348                            bodybuf, strlen(bodybuf));
1349        TEST_STATUS(status);
1350        status = fdb_set_kv(db2, keybuf, strlen(keybuf),
1351                            bodybuf, strlen(bodybuf));
1352        TEST_STATUS(status);
1353        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1354        TEST_STATUS(status);
1355    }
1356
1357    // enter reuse via db
1358    i = 0;
1359    do {
1360        sprintf(keybuf, "0key");
1361        fillstr(bodybuf, 'b', 128);
1362        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1363                            bodybuf, strlen(bodybuf));
1364        TEST_STATUS(status);
1365        i++;
1366        status = fdb_get_file_info(dbfile, &file_info);
1367        TEST_STATUS(status);
1368    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1369    ndocs = i;
1370
1371    sb_decision = sb_check_block_reusing(db);
1372    TEST_CHK(sb_decision == SBD_RECLAIM);
1373
1374    // commit
1375    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1376    TEST_STATUS(status);
1377
1378    // reuse blocks
1379    for (i = 0; i < ndocs; i++) {
1380        sprintf(keybuf, "key%d", i);
1381        fillstr(bodybuf, 'c', 128);
1382        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1383                            bodybuf, strlen(bodybuf));
1384        TEST_STATUS(status);
1385    }
1386    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1387    TEST_STATUS(status);
1388
1389    // check that docs from db2 not lost
1390    fdb_iterator_init(db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1391    i = 0;
1392    do {
1393        status = fdb_iterator_get(iterator, &rdoc);
1394        TEST_CHK(status == FDB_RESULT_SUCCESS);
1395        fdb_doc_free(rdoc);
1396        rdoc = NULL;
1397        i++;
1398    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1399    TEST_CHK(i==10);
1400    fdb_iterator_close(iterator);
1401
1402    status = fdb_kvs_close(db);
1403    TEST_STATUS(status);
1404    status = fdb_kvs_close(db2);
1405    TEST_STATUS(status);
1406    status = fdb_close(dbfile);
1407    TEST_STATUS(status);
1408    fdb_shutdown();
1409
1410    memleak_end();
1411    TEST_RESULT("enter reuse via separate kvs test");
1412}
1413
1414void child_function() {
1415    int i;
1416    char keybuf[256];
1417    char bodybuf[512];
1418    fdb_file_handle *dbfile;
1419    fdb_kvs_handle *db;
1420    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1421    fdb_config fconfig = fdb_get_default_config();
1422    fconfig.block_reusing_threshold = 65;
1423    fconfig.num_keeping_headers = 5;
1424
1425    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1426    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1427
1428    // load docs into db and db2
1429    i = 0;
1430    while (true) {
1431        sprintf(keybuf, "key%d",i);
1432        sprintf(bodybuf, "seqno%d",i);
1433        fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
1434        // create seqno every 100 updates
1435        if ((i % 100) == 0) {
1436            if ((i % 500) == 0) { // wal flush every 500 updates
1437                fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1438            } else {
1439                fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1440            }
1441        }
1442        i++;
1443    }
1444}
1445
1446#if !defined(WIN32) && !defined(_WIN32)
1447void superblock_recovery_test() {
1448    TEST_INIT();
1449    memleak_start();
1450
1451    int r;
1452    uint64_t i, num_markers;
1453    void *rvalue;
1454    size_t rvalue_len;
1455    char keybuf[256];
1456    char bodybuf[256];
1457    pid_t child_id;
1458
1459    fdb_seqnum_t seqno;
1460    fdb_file_handle *dbfile;
1461    fdb_kvs_handle *db, *snap_db;
1462    fdb_snapshot_info_t *markers;
1463    fdb_status status;
1464    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1465    fdb_config fconfig = fdb_get_default_config();
1466    fdb_file_info file_info;
1467
1468    // remove previous staleblktest files
1469    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1470    (void)r;
1471
1472    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1473    TEST_STATUS(status);
1474    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1475    TEST_STATUS(status);
1476
1477    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1478    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
1479        sprintf(keybuf, "key");
1480        status = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
1481        TEST_STATUS(status);
1482        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1483        TEST_STATUS(status);
1484    }
1485
1486    status = fdb_get_file_info(dbfile, &file_info);
1487    TEST_STATUS(status);
1488
1489    while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE) {
1490        sprintf(keybuf, "key");
1491        sprintf(bodybuf, "body%d", static_cast<int>(i));
1492        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1493                            bodybuf, strlen(bodybuf));
1494        TEST_STATUS(status);
1495        status = fdb_get_file_info(dbfile, &file_info);
1496        TEST_STATUS(status);
1497    };
1498
1499    TEST_CHK(sb_check_block_reusing(db) == SBD_RECLAIM);
1500
1501    // close previous handle
1502    status = fdb_kvs_close(db);
1503    TEST_STATUS(status);
1504    status = fdb_close(dbfile);
1505    TEST_STATUS(status);
1506
1507    // fork child
1508    child_id = fork();
1509
1510    if (child_id == 0) {
1511        child_function();
1512    }
1513
1514    // wait 3s
1515    sleep(3);
1516
1517    // kill child
1518    kill(child_id, SIGUSR1);
1519    // reopen and recover
1520
1521    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1522    TEST_STATUS(status);
1523    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1524    TEST_STATUS(status);
1525
1526    // get known key
1527    sprintf(keybuf, "key0");
1528    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1529    TEST_STATUS(status);
1530    status = fdb_free_block(rvalue);
1531    TEST_STATUS(status);
1532
1533    // compact upto marker
1534    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1535    TEST_CHK(status == FDB_RESULT_SUCCESS);
1536    TEST_CHK(num_markers == fconfig.num_keeping_headers);
1537    status = fdb_compact_upto(dbfile, NULL, markers[4].marker);
1538    TEST_CHK(status == FDB_RESULT_SUCCESS);
1539
1540    // open snapshot on marker
1541    seqno = markers[4].kvs_markers->seqnum;
1542    status = fdb_snapshot_open(db, &snap_db, seqno);
1543    TEST_STATUS(status);
1544
1545    status = fdb_free_snap_markers(markers, num_markers);
1546    TEST_CHK(status == FDB_RESULT_SUCCESS);
1547
1548    // get known key
1549    sprintf(keybuf, "key0");
1550    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1551    TEST_STATUS(status);
1552
1553    // check value
1554    TEST_CMP(rvalue, "seqno0", rvalue_len);
1555    status = fdb_free_block(rvalue);
1556    TEST_STATUS(status);
1557
1558    status = fdb_kvs_close(snap_db);
1559    TEST_STATUS(status);
1560    status = fdb_kvs_close(db);
1561    TEST_STATUS(status);
1562    status = fdb_close(dbfile);
1563    TEST_STATUS(status);
1564    fdb_shutdown();
1565
1566    memleak_end();
1567    TEST_RESULT("superblock recovery test");
1568}
1569#endif
1570
1571void reclaim_rollback_point_test() {
1572    memleak_start();
1573    TEST_INIT();
1574
1575    int i, r;
1576    int low_seq = 0;
1577    int nheaders=5;
1578    int ndocs=30000;
1579    char keybuf[16];
1580
1581    fdb_file_handle* dbfile;
1582    fdb_kvs_handle* db;
1583    fdb_status status;
1584    fdb_config fconfig = fdb_get_default_config();
1585    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1586    fdb_file_info file_info;
1587
1588    void *value_out;
1589    size_t valuelen_out;
1590
1591    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1592    (void)r;
1593
1594    // init
1595    fconfig.compaction_threshold = 0;
1596    fconfig.num_keeping_headers = nheaders;
1597    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1598    TEST_STATUS(status);
1599    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1600    TEST_STATUS(status);
1601
1602    const char *key = "key";
1603    const char *val = "val";
1604
1605    // load n docs
1606    for (i=0; i<ndocs; ++i) {
1607        sprintf(keybuf, "key%d", i);
1608        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1609        TEST_STATUS(status);
1610    }
1611    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1612    TEST_STATUS(status);
1613    low_seq = i;
1614
1615    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1616    i = 0;
1617    do {
1618        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1619        TEST_STATUS(status);
1620        i++;
1621        status = fdb_get_file_info(dbfile, &file_info);
1622        TEST_STATUS(status);
1623    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
1624
1625    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1626    TEST_STATUS(status);
1627
1628    // overwrite n docs
1629    for (i=0; i<ndocs; ++i) {
1630        sprintf(keybuf, "key%d", i);
1631        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu2", 5);
1632        TEST_STATUS(status);
1633    }
1634    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1635    TEST_STATUS(status);
1636
1637    // rollback to the first commit
1638    status = fdb_rollback(&db, low_seq);
1639    TEST_STATUS(status);
1640
1641    // retrieve docs
1642    for (i=0; i<ndocs; ++i) {
1643        sprintf(keybuf, "key%d", i);
1644        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1645        TEST_STATUS(status);
1646        free(value_out);
1647    }
1648
1649    // create nheaders
1650    for (i = 0; i < nheaders; ++i) {
1651        sprintf(keybuf, "key%d", i);
1652        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1653        TEST_STATUS(status);
1654        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1655        TEST_STATUS(status);
1656    }
1657
1658    // append some data & commit
1659    // now old blocks will be reclaimed
1660    for (i=0; i<ndocs; ++i) {
1661        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1662        TEST_STATUS(status);
1663    }
1664    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1665    TEST_STATUS(status);
1666
1667    // append more data .. now reusable blocks are overwritten
1668    for (i=0; i<ndocs; ++i) {
1669        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1670        TEST_STATUS(status);
1671    }
1672    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1673    TEST_STATUS(status);
1674
1675    // retrieve docs
1676    for (i=0; i<ndocs; ++i) {
1677        sprintf(keybuf, "key%d", i);
1678        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1679        TEST_STATUS(status);
1680        free(value_out);
1681    }
1682
1683    status = fdb_kvs_close(db);
1684    TEST_STATUS(status);
1685    status = fdb_close(dbfile);
1686    TEST_STATUS(status);
1687    fdb_shutdown();
1688
1689    memleak_end();
1690    TEST_RESULT("reclaim rollback point test");
1691}
1692
/* Entry point: runs every stale-block test in sequence and returns 0. */
int main() {

    /* Test reuse of stale blocks with block_reusing_threshold
       set at 0, 65, 100 */
    verify_staleblock_reuse_param_test();
    reuse_with_snapshot_test();

    /* Test reclaiming of stale blocks while varying
       num_keeping_headers */
    verify_minimum_num_keeping_headers_param_test();
    verify_high_num_keeping_headers_param_test();

    /* Test to verify in-memory and disk snapshots before
       block reuse */
    snapshot_before_block_reuse_test(false);
    snapshot_before_block_reuse_test(true);

    /* Test to verify snapshot after block reuse */
    snapshot_after_block_reuse_test();

    /* Test block reuse with keys having variable value sizes */
    variable_value_size_test();

    /* Test rollback with block reuse */
    rollback_with_num_keeping_headers();

    /* Test block reuse with deletes */
    reuse_on_delete_test();

    /* Test block reuse with manual compaction */
    fragmented_reuse_test();

    /* Test to verify reuse mode with one kvstore does not affect others */
    enter_reuse_via_separate_kvs_test();

#if !defined(WIN32) && !defined(_WIN32)
    /* Test recovery from superblock corruption */
    superblock_recovery_test();
#endif

    /* Test rollback, verify snapshot, manual compaction upon recovery
       after crash */
    crash_and_recover_with_num_keeping_test();

    /* Test that docs restored by a rollback stay readable after further
       commits that are expected to reclaim blocks */
    reclaim_rollback_point_test();

    return 0;
}
1741