1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22#include <time.h>
23#if !defined(WIN32) && !defined(_WIN32)
24#include <signal.h>
25#include <unistd.h>
26#endif
27
28#include "test.h"
29#include "filemgr.h"
30#include "staleblock.h"
31#include "internal_types.h"
32#include "libforestdb/forestdb.h"
33#include "functional_util.h"
34
35/*
36 * Verify that blocks can be reclaimed when
37 * default block reuse threshold is used.
38 *
39 * When threshold is 0 or 100 blocks should
40 * not be reusable
41 */
42void verify_staleblock_reuse_param_test() {
43    TEST_INIT();
44    memleak_start();
45
46    uint64_t i;
47    int r;
48    const int kv = 512;
49    char keybuf[kv];
50    char bodybuf[kv];
51    fdb_status status;
52    fdb_file_handle *dbfile;
53    fdb_kvs_handle *db;
54    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
55    sb_decision_t sb_decision;
56    fdb_file_info file_info;
57
58    // set block reuse threshold = 65
59    fdb_config fconfig = fdb_get_default_config();
60    fconfig.compaction_threshold = 0;
61    fconfig.block_reusing_threshold = 65;
62
63start_data_loading:
64    // remove previous staleblktest files
65    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
66    (void)r;
67
68    fdb_open(&dbfile, "./staleblktest1", &fconfig);
69    fdb_kvs_open_default(dbfile, &db, &kvs_config);
70
71    // create num_keeping_headers+1
72    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
73        sprintf(keybuf, "key");
74        status = fdb_set_kv(db, keybuf, kv, NULL, 0);
75        TEST_STATUS(status);
76        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
77        TEST_STATUS(status);
78    }
79
80    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
81    i = 0;
82    do {
83        sprintf(keybuf, "key");
84        sprintf(bodybuf, "body%d", static_cast<int>(i));
85        status = fdb_set_kv(db, keybuf, kv, bodybuf, kv);
86        TEST_STATUS(status);
87        i++;
88        status = fdb_get_file_info(dbfile, &file_info);
89        TEST_STATUS(status);
90    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
91
92    // expect block reclaim only for valid threshold value
93    sb_decision = sb_check_block_reusing(db);
94    if (fconfig.block_reusing_threshold == 65) {
95        TEST_CHK(sb_decision == SBD_RECLAIM);
96    } else {
97        TEST_CHK(sb_decision == SBD_NONE);
98    }
99
100    // intermediate cleanup
101    status = fdb_close(dbfile);
102    TEST_STATUS(status);
103    fdb_shutdown();
104
105    // run again with different config
106    if (fconfig.block_reusing_threshold == 65) {
107        // disable with 0
108        fconfig.block_reusing_threshold = 0;
109        goto start_data_loading;
110    } else if (fconfig.block_reusing_threshold == 0) {
111        // disable with 100
112        fconfig.block_reusing_threshold = 100;
113        goto start_data_loading;
114    }
115
116    memleak_end();
117    TEST_RESULT("verify staleblock reuse param test");
118}
119
/*
 * Fill 'str' with (n - 1) copies of 'c' followed by a terminating NUL,
 * so that exactly 'n' bytes of the buffer are written.
 *
 * str: destination buffer; must have room for at least n bytes.
 * c:   fill character.
 * n:   total number of bytes to write, including the terminator.
 *      When n <= 0 there is no room even for the terminator, so the
 *      buffer is left untouched.
 */
void fillstr(char *str, char c, int n) {
    if (n <= 0) {
        return;
    }
    memset(str, c, (size_t)(n - 1));
    str[n - 1] = '\0';
}
127
128/*
129 * verify_staleblock_reuse_param_test:
130 *     create 2*num_keeping_headers and open snapshot
131 *     at halfway point. enter block reuse state
132 *     verify snapshot data remains unaffected
133 */
134void reuse_with_snapshot_test() {
135    TEST_INIT();
136    memleak_start();
137
138    int i, n, r;
139    const int kv = 512;
140    char keybuf[kv];
141    char bodybuf[kv];
142    void *rvalue;
143    size_t rvalue_len;
144    fdb_status status;
145    fdb_file_handle *dbfile;
146    fdb_kvs_handle *db, *snap_db;
147    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
148    sb_decision_t sb_decision;
149    fdb_file_info file_info;
150
151    fdb_config fconfig = fdb_get_default_config();
152    fconfig.compaction_threshold = 0;
153    fconfig.block_reusing_threshold = 65;
154    fconfig.num_keeping_headers = 10;
155
156    // remove previous staleblktest files
157    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
158    (void)r;
159
160    fdb_open(&dbfile, "./staleblktest1", &fconfig);
161    fdb_kvs_open_default(dbfile, &db, &kvs_config);
162
163    // create 2*num_keeping_headers
164    n = 2 * fconfig.num_keeping_headers;
165    for (i = 0; i < n; i++) {
166        fillstr(keybuf, 'k', kv);
167        sprintf(bodybuf, "orig_body%d", i+1);
168        status = fdb_set_kv(db, keybuf, strlen(keybuf),
169                            bodybuf, strlen(bodybuf) + 1);
170        TEST_STATUS(status);
171        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
172        TEST_STATUS(status);
173    }
174
175    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
176    i = 0;
177    do {
178        fillstr(keybuf, 'r', kv);
179        fillstr(bodybuf, 'y', kv);
180        status = fdb_set_kv(db, keybuf, strlen(keybuf),
181                            bodybuf, strlen(bodybuf) + 1);
182        TEST_STATUS(status);
183        i++;
184        status = fdb_get_file_info(dbfile, &file_info);
185        TEST_STATUS(status);
186    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
187
188    sb_decision = sb_check_block_reusing(db);
189    TEST_CHK(sb_decision == SBD_RECLAIM);
190
191    // open snapshot
192    status = fdb_snapshot_open(db, &snap_db, 8);
193    TEST_STATUS(status);
194
195    // commit
196    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
197    TEST_STATUS(status);
198
199    // write until blocks are no longer being reused
200    i = 0;
201    do {
202        sprintf(keybuf, "key%d", i);
203        fillstr(bodybuf, 'z', kv);
204        status = fdb_set_kv(db, keybuf, strlen(keybuf),
205                            bodybuf, strlen(bodybuf));
206        TEST_STATUS(status);
207        sb_decision = sb_check_block_reusing(db);
208        i++;
209    } while (sb_decision != SBD_NONE);
210
211    // delete original keys
212    n = 2 * fconfig.num_keeping_headers;
213    for (i = 0; i < n; i++) {
214        fillstr(keybuf, 'k', kv);
215        status = fdb_del_kv(db, keybuf, strlen(keybuf));
216        TEST_STATUS(status);
217    }
218
219    // commit
220    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
221    TEST_STATUS(status);
222
223    fillstr(keybuf, 'k', kv);
224    sprintf(bodybuf, "orig_body%d", 8);
225
226    // check key does not exist in main kv
227    status = fdb_get_kv(db, keybuf, strlen(keybuf),
228                        &rvalue, &rvalue_len);
229    TEST_CHK(status == FDB_RESULT_KEY_NOT_FOUND);
230
231    // check still exists in snapshot data
232    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf),
233                        &rvalue, &rvalue_len);
234    TEST_STATUS(status);
235    TEST_CMP(rvalue, bodybuf, rvalue_len);
236
237    status = fdb_free_block(rvalue);
238    TEST_STATUS(status);
239    status = fdb_kvs_close(snap_db);
240    TEST_STATUS(status);
241    status = fdb_kvs_close(db);
242    TEST_STATUS(status);
243    status = fdb_close(dbfile);
244    TEST_STATUS(status);
245    fdb_shutdown();
246
247    memleak_end();
248    TEST_RESULT("reuse with snapshot test");
249}
250
251void verify_minimum_num_keeping_headers_param_test() {
252    memleak_start();
253    TEST_INIT();
254
255    int i, r;
256    fdb_file_handle* dbfile;
257    fdb_kvs_handle* db;
258    fdb_kvs_handle* snap_db;
259    fdb_status status;
260    fdb_config fconfig = fdb_get_default_config();
261    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
262    fdb_file_info file_info;
263
264    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
265    (void)r;
266
267    // init
268    fconfig.compaction_threshold = 0;
269
270    // open with zero num_keeping_headers parameter
271    fconfig.num_keeping_headers = 0;
272    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
273    // should fail
274    TEST_CHK(status != FDB_RESULT_SUCCESS);
275
276    // open with single keeping header
277    fconfig.num_keeping_headers = 1;
278    status = fdb_open(&dbfile, (char *)"./staleblktest1", &fconfig);
279    TEST_STATUS(status);
280    status = fdb_kvs_open(dbfile, &db, (char *)"num_keep", &kvs_config);
281    TEST_STATUS(status);
282
283    const char *key = "key";
284    const char *val = "val";
285    // 10 commits
286    for (i = 0; i < 10; ++i) {
287        status = fdb_set_kv(db, key, strlen(key) + 1,
288                            val, strlen(val) + 1);
289        TEST_STATUS(status);
290        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
291        TEST_STATUS(status);
292    }
293
294    // open snapshot on 9th commit
295    status = fdb_snapshot_open(db, &snap_db, 9);
296    TEST_STATUS(status);
297
298    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
299    i = 0;
300    do {
301        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
302        TEST_STATUS(status);
303        i++;
304        status = fdb_get_file_info(dbfile, &file_info);
305        TEST_STATUS(status);
306    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
307
308    // close snapshot (reader)
309    status = fdb_kvs_close(snap_db);
310    TEST_STATUS(status);
311
312    // 11th commit causes reclaim and loss of 9th header
313    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
314    TEST_STATUS(status);
315
316    // attempt to open new snapshot on 9th commit
317    status = fdb_snapshot_open(db, &snap_db, 9);
318    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
319
320    // open new snapshot on 10th commit
321    status = fdb_snapshot_open(db, &snap_db, 10);
322    TEST_STATUS(status);
323
324    // cleanup
325    status = fdb_close(dbfile);
326    TEST_STATUS(status);
327    status = fdb_shutdown();
328    TEST_STATUS(status);
329
330    memleak_end();
331    TEST_RESULT("verify minimum num keeping headers param test");
332}
333
334void verify_high_num_keeping_headers_param_test() {
335    memleak_start();
336    TEST_INIT();
337
338    int i, r;
339    int low_seq = 0;
340    int nheaders=100;
341    char keybuf[16];
342    void *rvalue;
343    size_t rvalue_len;
344
345    fdb_file_handle* dbfile;
346    fdb_kvs_handle* db;
347    fdb_kvs_handle* snap_db;
348    fdb_status status;
349    fdb_config fconfig = fdb_get_default_config();
350    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
351    fdb_file_info file_info;
352
353    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
354    (void)r;
355
356    // init
357    fconfig.compaction_threshold = 0;
358    fconfig.num_keeping_headers = nheaders;
359    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
360    TEST_STATUS(status);
361    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
362    TEST_STATUS(status);
363
364    const char *key = "key";
365    const char *val = "val";
366
367    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
368    i = 0;
369    do {
370        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
371        TEST_STATUS(status);
372        i++;
373        status = fdb_get_file_info(dbfile, &file_info);
374        TEST_STATUS(status);
375    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
376
377    // create lowest commit
378    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
379    TEST_STATUS(status);
380    low_seq = i;
381
382    // create 100 headers
383    for (i = 0; i < nheaders; ++i) {
384        sprintf(keybuf, "key%d", i);
385        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
386        TEST_STATUS(status);
387        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
388        TEST_STATUS(status);
389    }
390
391    // make sure all blocks up to kept headers are reused
392    i = low_seq;
393    while (--i) {
394        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
395        TEST_STATUS(status);
396    }
397
398    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
399    TEST_STATUS(status);
400
401    // -101 commit fail
402    status = fdb_snapshot_open(db, &snap_db, low_seq);
403    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
404
405    // -100 commit pass
406    low_seq++;
407    status = fdb_snapshot_open(db, &snap_db, low_seq);
408    TEST_STATUS(status);
409
410    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
411    TEST_STATUS(status);
412
413    // cleanup
414    status = fdb_free_block(rvalue);
415    TEST_STATUS(status);
416    status = fdb_kvs_close(snap_db);
417    TEST_STATUS(status);
418    status = fdb_close(dbfile);
419    TEST_STATUS(status);
420    status = fdb_shutdown();
421    TEST_STATUS(status);
422
423    memleak_end();
424    TEST_RESULT("verify high num keeping headers param test");
425}
426
427void snapshot_before_block_reuse_test(bool inmem) {
428    memleak_start();
429    TEST_INIT();
430
431    int i, j, n, r;
432    void *rvalue;
433    size_t rvalue_len;
434    char bodybuf[256];
435    fdb_file_handle* dbfile;
436    fdb_kvs_handle *db, *snap_db;
437    fdb_status status;
438    fdb_config fconfig = fdb_get_default_config();
439    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
440    sb_decision_t sb_decision;
441    fdb_file_info file_info;
442
443    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
444    (void)r;
445
446    fconfig.num_keeping_headers = 1;
447    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
448    TEST_STATUS(status);
449    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
450    TEST_STATUS(status);
451
452    const char *key = "key";
453    const char *val = "snp";
454
455    // set key
456    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
457    TEST_STATUS(status);
458
459    // commit
460    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
461    TEST_STATUS(status);
462
463    // open snapshot
464    if (inmem) {
465        status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
466        TEST_STATUS(status);
467    } else {
468        status = fdb_snapshot_open(db, &snap_db, 1);
469        TEST_STATUS(status);
470    }
471
472    val = "val";
473
474    // initial load to reuse
475    i = 0;
476    do {
477        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
478        TEST_STATUS(status);
479        i++;
480        status = fdb_get_file_info(dbfile, &file_info);
481        TEST_STATUS(status);
482    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
483
484    // 5 update cycles of created items
485    for (n = 0; n < 5; n++) {
486        for(j = 0; j < i; j++) {
487            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
488            TEST_STATUS(status);
489        }
490        // commit
491        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
492        TEST_STATUS(status);
493    }
494
495    // verify in reclaim mode
496    sb_decision = sb_check_block_reusing(db);
497    TEST_CHK(sb_decision == SBD_RECLAIM);
498
499    // expect pre-reuse data retained
500    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
501    TEST_STATUS(status);
502    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
503
504    // close snapshot
505    status = fdb_kvs_close(snap_db);
506    TEST_STATUS(status);
507
508    // do another update cycle
509    for (j = 0; j < i; j++) {
510        // commit
511        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
512        TEST_STATUS(status);
513    }
514    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
515    TEST_STATUS(status);
516
517    // attempt to open again will fail
518    status = fdb_snapshot_open(db, &snap_db, 1);
519    if (status == FDB_RESULT_SUCCESS) {
520        // close incase kv was unexpectedly opened
521        status = fdb_kvs_close(snap_db);
522        TEST_STATUS(status);
523    }
524    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
525
526    status = fdb_free_block(rvalue);
527    TEST_STATUS(status);
528    status = fdb_kvs_close(db);
529    TEST_STATUS(status);
530    status = fdb_close(dbfile);
531    TEST_STATUS(status);
532    fdb_shutdown();
533
534    memleak_end();
535
536    sprintf(bodybuf,"snapshot before block reuse test %s", inmem ?
537            "in-mem snapshot" : "disk snapshot");
538    TEST_RESULT(bodybuf);
539}
540
541void snapshot_after_block_reuse_test() {
542    memleak_start();
543    TEST_INIT();
544
545    int i, r;
546    int low_seq = 0;
547    int nheaders=5;
548    char keybuf[16];
549
550    fdb_file_handle* dbfile;
551    fdb_kvs_handle* db;
552    fdb_kvs_handle* snap_db;
553    fdb_status status;
554    fdb_config fconfig = fdb_get_default_config();
555    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
556    fdb_file_info file_info;
557
558    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
559    (void)r;
560
561    // init
562    fconfig.compaction_threshold = 0;
563    fconfig.num_keeping_headers = nheaders;
564    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
565    TEST_STATUS(status);
566    status = fdb_kvs_open(dbfile, &db, "num_keep", &kvs_config);
567    TEST_STATUS(status);
568
569    const char *key = "key";
570    const char *val = "val";
571
572    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
573    i = 0;
574    do {
575        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
576        TEST_STATUS(status);
577        i++;
578        status = fdb_get_file_info(dbfile, &file_info);
579        TEST_STATUS(status);
580    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
581
582    // create lowest commit
583    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
584    TEST_STATUS(status);
585    low_seq = i;
586
587    // create nheaders
588    for (i = 0; i < nheaders + 5; ++i) {
589        sprintf(keybuf, "key%d", i);
590        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
591        TEST_STATUS(status);
592        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
593        TEST_STATUS(status);
594    }
595
596    // make sure all blocks up to kept headers are reused
597    i = low_seq;
598    while (--i) {
599        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
600        TEST_STATUS(status);
601    }
602    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
603    TEST_STATUS(status);
604
605    // open snapshot on lowest seqno available
606    low_seq += 6;
607    status = fdb_snapshot_open(db, &snap_db, low_seq);
608    TEST_STATUS(status);
609    status = fdb_kvs_close(snap_db);
610    TEST_STATUS(status);
611
612    // open snapshot using seqno already reclaimed
613    status = fdb_snapshot_open(db, &snap_db, low_seq-1);
614    TEST_CHK(status != FDB_RESULT_SUCCESS);
615
616    status = fdb_kvs_close(db);
617    TEST_STATUS(status);
618    status = fdb_close(dbfile);
619    TEST_STATUS(status);
620    fdb_shutdown();
621
622    memleak_end();
623    TEST_RESULT("snapshot after block reuse test");
624}
625
626void snapshot_inmem_before_block_reuse_test() {
627    memleak_start();
628    TEST_INIT();
629
630    int i, j, n, r;
631    void *rvalue;
632    size_t rvalue_len;
633    fdb_file_handle* dbfile;
634    fdb_kvs_handle *db, *snap_db;
635    fdb_status status;
636    fdb_config fconfig = fdb_get_default_config();
637    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
638    sb_decision_t sb_decision;
639    fdb_file_info file_info;
640
641    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
642    (void)r;
643
644    fconfig.num_keeping_headers = 1;
645    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
646    TEST_STATUS(status);
647    status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
648    TEST_STATUS(status);
649
650    const char* key = "key";
651    const char* val = "snp";
652
653    // set key
654    status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
655    TEST_STATUS(status);
656
657    // commit
658    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
659    TEST_STATUS(status);
660
661    // open snapshot
662    status = fdb_snapshot_open(db, &snap_db, FDB_SNAPSHOT_INMEM);
663    TEST_STATUS(status);
664
665    val = "val";
666
667    // initial load to reuse
668    i = 0;
669    do {
670        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
671        TEST_STATUS(status);
672        i++;
673        status = fdb_get_file_info(dbfile, &file_info);
674        TEST_STATUS(status);
675    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
676
677    // 5 update cycles of created items
678    for (n = 0; n < 5; n++) {
679        for (j = 0; j < i; j++) {
680            status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
681            TEST_STATUS(status);
682        }
683        // commit
684        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
685        TEST_STATUS(status);
686    }
687
688    // verify in reclaim mode
689    sb_decision = sb_check_block_reusing(db);
690    TEST_CHK(sb_decision == SBD_RECLAIM);
691
692    // expect pre-reuse data retained
693    status = fdb_get_kv(snap_db, key, strlen(key) + 1, &rvalue, &rvalue_len);
694    TEST_STATUS(status);
695    TEST_CMP(rvalue, (char *)"snp", rvalue_len);
696
697    // close snapshot
698    status = fdb_kvs_close(snap_db);
699    TEST_STATUS(status);
700
701    // do another update cycle
702    for (j = 0; j < i; j++) {
703        // commit
704        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
705        TEST_STATUS(status);
706    }
707    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
708    TEST_STATUS(status);
709
710    // attempt to open again will fail
711    status = fdb_snapshot_open(db, &snap_db, 1);
712    if (status == FDB_RESULT_SUCCESS) {
713        // close incase kv was unexpectedly opened
714        status = fdb_kvs_close(snap_db);
715        TEST_STATUS(status);
716    }
717    TEST_CHK(status == FDB_RESULT_NO_DB_INSTANCE);
718
719    status = fdb_free_block(rvalue);
720    TEST_STATUS(status);
721    status = fdb_kvs_close(db);
722    TEST_STATUS(status);
723    status = fdb_close(dbfile);
724    TEST_STATUS(status);
725    fdb_shutdown();
726
727    memleak_end();
728    TEST_RESULT("snapshot inmem before block reuse test");
729}
730
731void variable_value_size_test() {
732    TEST_INIT();
733    memleak_start();
734
735    uint64_t i;
736    int j, n, r;
737    const int ndocs = 3;
738    int blen;
739    char keybuf[256];
740    char *bodybuf = new char[1024 * 5120];
741    void *rvalue;
742    size_t rvalue_len;
743    fdb_status status;
744    fdb_file_handle *dbfile;
745    fdb_kvs_handle *db;
746    fdb_kvs_handle *snap_db, *snap_db2;
747    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
748    sb_decision_t sb_decision;
749    fdb_file_info file_info;
750
751    fdb_config fconfig = fdb_get_default_config();
752    fconfig.compaction_threshold = 0;
753    fconfig.block_reusing_threshold = 65;
754
755    // remove previous staleblktest files
756    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
757    (void)r;
758
759    fdb_open(&dbfile, "./staleblktest1", &fconfig);
760    fdb_kvs_open_default(dbfile, &db, &kvs_config);
761
762    // load with doc sizes 512, 1024, 1536
763    n = 0;
764    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
765        for (j = 1; j <= ndocs; j++) {
766            blen = j * 512;
767            sprintf(keybuf, "%d_key", blen);
768            fillstr(bodybuf, 'a', blen);    // Max blen allowed: 1024*5120
769            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
770                                bodybuf, strlen(bodybuf) + 1);
771            TEST_STATUS(status);
772            n++;
773        }
774        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
775        TEST_STATUS(status);
776    }
777
778    // open disk snapshot
779    status = fdb_snapshot_open(db, &snap_db, n);
780    TEST_STATUS(status);
781
782    // update docs with sizes 1024, 2048, 3072
783    i = 0;
784    do {
785        for (j = 1;j <= ndocs; j++) {
786            blen = j * 1024;
787            sprintf(keybuf, "%d_key", blen);
788            fillstr(bodybuf, 'b', blen);
789            status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
790                                bodybuf, strlen(bodybuf) + 1);
791            TEST_STATUS(status);
792        }
793        i++;
794        status = fdb_get_file_info(dbfile, &file_info);
795        TEST_STATUS(status);
796    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
797
798    sb_decision = sb_check_block_reusing(db);
799    TEST_CHK(sb_decision == SBD_RECLAIM);
800
801    status = fdb_commit(dbfile, FDB_COMMIT_NORMAL);
802    TEST_STATUS(status);
803
804    // create in mem snapshot
805    status = fdb_snapshot_open(db, &snap_db2, FDB_SNAPSHOT_INMEM);
806    TEST_STATUS(status);
807
808    // update cycle with small values
809    while (--i) {
810        blen = i * 8;
811        sprintf(keybuf, "%d_key", blen);
812        fillstr(bodybuf, 'c', blen);
813        status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
814                            bodybuf, strlen(bodybuf) + 1);
815        TEST_STATUS(status);
816        if ((i % 10) == 0) {
817            status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
818            TEST_STATUS(status);
819        }
820    }
821    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
822    TEST_STATUS(status);
823
824    // write very large value
825    sprintf(keybuf, "%d_key", 512);
826    fillstr(bodybuf, 'a', 1024 * 5120);
827    status = fdb_set_kv(db, keybuf, strlen(keybuf) + 1,
828                        bodybuf, strlen(bodybuf) + 1);
829    TEST_STATUS(status);
830    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
831    TEST_STATUS(status);
832
833    // verify disk snapshot
834    blen = 512;
835    sprintf(keybuf, "%d_key", blen);
836    fillstr(bodybuf, 'a', blen);
837    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf) + 1,
838                        &rvalue, &rvalue_len);
839    TEST_STATUS(status);
840    TEST_CMP(bodybuf, rvalue, rvalue_len);
841    status = fdb_free_block(rvalue);
842    TEST_STATUS(status);
843
844    // verify in mem snapshot
845    blen = 2048;
846    sprintf(keybuf, "%d_key", blen);
847    fillstr(bodybuf, 'b', blen);
848    status = fdb_get_kv(snap_db2, keybuf, strlen(keybuf) + 1,
849                        &rvalue, &rvalue_len);
850    TEST_STATUS(status);
851    TEST_CMP(bodybuf, rvalue, rvalue_len);
852    status = fdb_free_block(rvalue);
853    TEST_STATUS(status);
854
855    delete[] bodybuf;
856
857    // cleanup
858    status = fdb_kvs_close(db);
859    TEST_STATUS(status);
860    status = fdb_kvs_close(snap_db);
861    TEST_STATUS(status);
862    status = fdb_kvs_close(snap_db2);
863    TEST_STATUS(status);
864    status = fdb_close(dbfile);
865    TEST_STATUS(status);
866    fdb_shutdown();
867
868    memleak_end();
869    TEST_RESULT("variable value size test");
870}
871
872void rollback_with_num_keeping_headers() {
873    TEST_INIT();
874    memleak_start();
875
876    int i, n, r;
877    char keybuf[256];
878    char bodybuf[1024];
879    fdb_status status;
880    fdb_file_handle *dbfile;
881    fdb_kvs_handle *db;
882    fdb_kvs_info kvs_info;
883    sb_decision_t sb_decision;
884    fdb_file_info file_info;
885
886    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
887    fdb_config fconfig = fdb_get_default_config();
888    fconfig.compaction_threshold = 0;
889    fconfig.block_reusing_threshold = 65;
890    fconfig.num_keeping_headers = 1;
891
892    // remove previous staleblktest files
893    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
894    (void)r;
895
896    fdb_open(&dbfile, "./staleblktest1", &fconfig);
897    fdb_kvs_open_default(dbfile, &db, &kvs_config);
898
899    // create 10 headers
900    for (i = 0; i < 10; i++) {
901        sprintf(keybuf, "%dkey",i);
902        fillstr(bodybuf, 'b', 512);
903        status = fdb_set_kv(db, keybuf, strlen(keybuf),
904                            bodybuf, strlen(bodybuf));
905        TEST_STATUS(status);
906        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
907        TEST_STATUS(status);
908    }
909
910    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
911    i = 0;
912    do {
913        sprintf(keybuf, "0key");
914        fillstr(bodybuf, 'c', 128);
915        status = fdb_set_kv(db, keybuf, strlen(keybuf),
916                            bodybuf, strlen(bodybuf));
917        TEST_STATUS(status);
918        i++;
919        status = fdb_get_file_info(dbfile, &file_info);
920        TEST_STATUS(status);
921    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
922    n = i;
923
924    // expect block reclaim
925    sb_decision = sb_check_block_reusing(db);
926    TEST_CHK(sb_decision == SBD_RECLAIM);
927
928    // reclaim old header via 11th commit
929    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
930    TEST_STATUS(status);
931
932    // load unique keys to reuse old blocks
933    for (i = 0; i < n; i++) {
934        sprintf(keybuf, "%dkey",i);
935        fillstr(bodybuf, 'd', 512);
936        status = fdb_set_kv(db, keybuf, strlen(keybuf),
937                            bodybuf, strlen(bodybuf));
938        TEST_STATUS(status);
939    }
940
941    // do rollback to 10th headers
942    status = fdb_rollback(&db, 10);
943    TEST_STATUS(status);
944
945    // expect only 10 docs
946    status = fdb_get_kvs_info(db, &kvs_info);
947    TEST_STATUS(status);
948    TEST_CHK(kvs_info.doc_count == 10);
949
950    // cleanup
951    status = fdb_kvs_close(db);
952    TEST_STATUS(status);
953    status = fdb_close(dbfile);
954    TEST_STATUS(status);
955    fdb_shutdown();
956
957    memleak_end();
958    TEST_RESULT("rollback with num keeping headers");
959}
960
961void crash_and_recover_with_num_keeping_test() {
962    TEST_INIT();
963    memleak_start();
964
965    int i, r, ndocs;
966    int nheaders = 10;
967    size_t last_seqno;
968    char keybuf[256];
969    char bodybuf[512];
970
971    fdb_status status;
972    fdb_file_handle *dbfile;
973    fdb_kvs_handle *db, *snap_db;
974    fdb_file_info file_info;
975    fdb_kvs_info kvs_info;
976    sb_decision_t sb_decision;
977
978    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
979    fdb_config fconfig = fdb_get_default_config();
980    fconfig.compaction_threshold = 0;
981    fconfig.block_reusing_threshold = 65;
982    fconfig.num_keeping_headers = nheaders;
983
984    // remove previous staleblktest files
985    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
986    (void)r;
987
988    fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
989    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
990
991    // create num_keeping_headers+1
992    for (i = 0; i < nheaders + 1; i++) {
993        sprintf(keybuf, "%dkey",i);
994        fillstr(bodybuf, 'b', 512);
995        status = fdb_set_kv(db, keybuf, strlen(keybuf),
996                            bodybuf, strlen(bodybuf));
997        TEST_STATUS(status);
998        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
999        TEST_STATUS(status);
1000    }
1001
1002    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1003    i = 0;
1004    do {
1005        sprintf(keybuf, "0key");
1006        fillstr(bodybuf, 'c', 128);
1007        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1008                            bodybuf, strlen(bodybuf));
1009        TEST_STATUS(status);
1010        i++;
1011        status = fdb_get_file_info(dbfile, &file_info);
1012        TEST_STATUS(status);
1013    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1014    ndocs = i;
1015
1016    // expect block reclaim
1017    sb_decision = sb_check_block_reusing(db);
1018    TEST_CHK(sb_decision == SBD_RECLAIM);
1019
1020    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1021    TEST_STATUS(status);
1022    status = fdb_get_kvs_info(db, &kvs_info);
1023    TEST_STATUS(status);
1024    last_seqno = kvs_info.last_seqnum;
1025
1026    // create num_keeping_headers
1027    for (i = 0; i < nheaders; i++) {
1028        sprintf(keybuf, "%dkey",i);
1029        fillstr(bodybuf, 'd', 64);
1030        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1031                            bodybuf, strlen(bodybuf));
1032        TEST_STATUS(status);
1033        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1034        TEST_STATUS(status);
1035    }
1036
1037    // preemptive shutdown
1038    status = fdb_kvs_close(db);
1039    TEST_STATUS(status);
1040    status = fdb_close(dbfile);
1041    TEST_STATUS(status);
1042    fdb_shutdown();
1043
1044    // reopen
1045    status = fdb_open(&dbfile, "./staleblktest1file.1", &fconfig);
1046    TEST_STATUS(status);
1047    fdb_kvs_open(dbfile, &db, "./staleblktest1", &kvs_config);
1048    TEST_STATUS(status);
1049
1050    status = fdb_get_file_info(dbfile, &file_info);
1051    TEST_STATUS(status);
1052    r = _disk_dump("./staleblktest1file.1", file_info.file_size,
1053                   (2 * fconfig.blocksize) + (fconfig.blocksize / 4));
1054    TEST_CHK(r >= 0);
1055
1056    // snapshot to last keeping header
1057    status = fdb_snapshot_open(db, &snap_db, last_seqno);
1058    TEST_STATUS(status);
1059
1060    // rollback to last keepheader
1061    status = fdb_rollback(&db, last_seqno);
1062    TEST_STATUS(status);
1063
1064    // manual commit
1065    status = fdb_compact(dbfile, "./staleblktest1file.3");
1066    TEST_STATUS(status);
1067
1068    // delete items
1069    for (i = 0; i < nheaders + 1; i++) {
1070        sprintf(keybuf, "%dkey",i);
1071        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1072        TEST_STATUS(status);
1073        // commit
1074        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1075        TEST_STATUS(status);
1076    }
1077
1078    // not reusing blocks
1079    sb_decision = sb_check_block_reusing(db);
1080    TEST_CHK(sb_decision == SBD_NONE);
1081
1082    // append until reuse
1083    for (i = 0; i < ndocs; i++) {
1084        sprintf(keybuf, "0key");
1085        fillstr(bodybuf, 'e', 512);
1086        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1087                            bodybuf, strlen(bodybuf));
1088        TEST_STATUS(status);
1089    }
1090    sb_decision = sb_check_block_reusing(db);
1091    TEST_CHK(sb_decision == SBD_RECLAIM);
1092
1093    status = fdb_kvs_close(snap_db);
1094    TEST_STATUS(status);
1095    status = fdb_kvs_close(db);
1096    TEST_STATUS(status);
1097    status = fdb_close(dbfile);
1098    TEST_STATUS(status);
1099    fdb_shutdown();
1100
1101    memleak_end();
1102    TEST_RESULT("crash and recover with num keeping test");
1103}
1104
1105void reuse_on_delete_test() {
1106    TEST_INIT();
1107    memleak_start();
1108
1109    int i, r, ndocs;
1110    int nheaders = 10;
1111    char keybuf[256];
1112    char bodybuf[512];
1113
1114    fdb_status status;
1115    fdb_file_handle *dbfile;
1116    fdb_kvs_handle *db;
1117    sb_decision_t sb_decision;
1118    fdb_file_info file_info;
1119
1120    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1121    fdb_config fconfig = fdb_get_default_config();
1122    fconfig.compaction_threshold = 0;
1123    fconfig.block_reusing_threshold = 65;
1124    fconfig.num_keeping_headers = nheaders;
1125
1126    // remove previous staleblktest files
1127    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1128    (void)r;
1129
1130    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1131    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1132
1133    // create num_keeping_headers+1
1134    for (i = 0; i < nheaders + 1; i++) {
1135        sprintf(keybuf, "%dkey",i);
1136        fillstr(bodybuf, 'b', 512);
1137        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1138                            bodybuf, strlen(bodybuf));
1139        TEST_STATUS(status);
1140        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1141        TEST_STATUS(status);
1142    }
1143
1144    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1145    i = 0;
1146    do {
1147        sprintf(keybuf, "%dkey",i);
1148        fillstr(bodybuf, 'c', 128);
1149        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1150                            bodybuf, strlen(bodybuf));
1151        TEST_STATUS(status);
1152        i++;
1153        status = fdb_get_file_info(dbfile, &file_info);
1154        TEST_STATUS(status);
1155    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1156    ndocs = i;
1157
1158    // expect NO REUSE
1159    sb_decision = sb_check_block_reusing(db);
1160    TEST_CHK(sb_decision == SBD_NONE);
1161    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1162    TEST_STATUS(status);
1163
1164    // delete so that file becomes stale
1165    for (i = 0; i < ndocs; ++i) {
1166        sprintf(keybuf, "%dkey",i);
1167        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1168        TEST_STATUS(status);
1169    }
1170
1171    sb_decision = sb_check_block_reusing(db);
1172    TEST_CHK(sb_decision == SBD_RECLAIM);
1173
1174    // reload 1/4 all keys again and expect no file size growth
1175    // since all docs being reused
1176    for (i = 0; i < ndocs / 4; ++i) {
1177        sprintf(keybuf, "%dkey",i);
1178        fillstr(bodybuf, 'd', 128);
1179        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1180                            bodybuf, strlen(bodybuf));
1181        TEST_STATUS(status);
1182        i++;
1183    }
1184
1185    // expect to still be in reuse mode
1186    sb_decision = sb_check_block_reusing(db);
1187    TEST_CHK(sb_decision == SBD_RECLAIM);
1188
1189    status = fdb_close(dbfile);
1190    TEST_STATUS(status);
1191    status = fdb_shutdown();
1192    TEST_STATUS(status);
1193
1194    memleak_end();
1195    TEST_RESULT("reuse on delete test");
1196}
1197
1198void fragmented_reuse_test() {
1199    TEST_INIT();
1200    memleak_start();
1201
1202    int i, r, ndocs;
1203    int nheaders = 10;
1204    char keybuf[256];
1205    char bodybuf[512];
1206    size_t fpos;
1207
1208    fdb_status status;
1209    fdb_file_handle *dbfile;
1210    fdb_kvs_handle *db;
1211    sb_decision_t sb_decision;
1212    fdb_file_info file_info;
1213
1214    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1215    fdb_config fconfig = fdb_get_default_config();
1216    fconfig.compaction_threshold = 0;
1217    fconfig.block_reusing_threshold = 35;
1218    fconfig.num_keeping_headers = nheaders;
1219
1220    // remove previous staleblktest files
1221    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1222    (void)r;
1223
1224    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1225    fdb_kvs_open_default(dbfile, &db, &kvs_config);
1226
1227    // satisfy reuse constraints
1228    for (i = 0; i < nheaders + 1; i++) {
1229        sprintf(keybuf, "%dkey",i);
1230        fillstr(bodybuf, 'a', 512);
1231        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1232                            bodybuf, strlen(bodybuf));
1233        TEST_STATUS(status);
1234        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1235        TEST_STATUS(status);
1236    }
1237    i = 0;
1238    do {
1239        sprintf(keybuf, "%dkey",i);
1240        fillstr(bodybuf, 'b', 128);
1241        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1242                            bodybuf, strlen(bodybuf));
1243        TEST_STATUS(status);
1244        i++;
1245        status = fdb_get_file_info(dbfile, &file_info);
1246        TEST_STATUS(status);
1247    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1248    ndocs = i;
1249
1250    // make 25% of file stale
1251    for (i = 0; i < ndocs / 4; i++) {
1252        sprintf(keybuf, "%dkey",i);
1253        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1254        TEST_STATUS(status);
1255    }
1256
1257    // verify blocks not being reused
1258    sb_decision = sb_check_block_reusing(db);
1259    TEST_CHK(sb_decision == SBD_NONE);
1260
1261    // manual compaction
1262    fpos = filemgr_get_pos(db->file);
1263    status = fdb_compact(dbfile, "staleblktest_compact");
1264    TEST_STATUS(status);
1265    TEST_CHK(fpos > filemgr_get_pos(db->file));
1266
1267    // making additional 25% of file stale should NOT go into
1268    // block reuse.  without compaction we would be 50% stale
1269    for (i = ndocs / 4; i < ndocs / 2; i++) {
1270        sprintf(keybuf, "%dkey",i);
1271        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1272        TEST_STATUS(status);
1273    }
1274    sb_decision = sb_check_block_reusing(db);
1275    TEST_CHK(sb_decision == SBD_NONE);
1276
1277    // delete rest of docs
1278    for (i = ndocs / 2; i < ndocs; i++) {
1279        sprintf(keybuf, "%dkey",i);
1280        status = fdb_del_kv(db, keybuf, strlen(keybuf));
1281        TEST_STATUS(status);
1282    }
1283
1284    // restore nheaders again
1285    for (i = 0; i < nheaders + 1; i++) {
1286        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1287        TEST_STATUS(status);
1288    }
1289
1290    sb_decision = sb_check_block_reusing(db);
1291    TEST_CHK(sb_decision != SBD_NONE);
1292
1293    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1294    TEST_STATUS(status);
1295    status = fdb_close(dbfile);
1296    TEST_STATUS(status);
1297    status = fdb_shutdown();
1298    TEST_STATUS(status);
1299
1300    memleak_end();
1301    TEST_RESULT("fragmented reuse test");
1302}
1303
1304void enter_reuse_via_separate_kvs_test() {
1305    TEST_INIT();
1306    memleak_start();
1307
1308    int i, r, ndocs;
1309    int nheaders = 10;
1310    char keybuf[256];
1311    char bodybuf[512];
1312
1313    fdb_status status;
1314    fdb_file_handle *dbfile;
1315    fdb_kvs_handle *db, *db2;
1316    fdb_doc *rdoc = NULL;
1317    fdb_iterator *iterator;
1318    sb_decision_t sb_decision;
1319    fdb_file_info file_info;
1320
1321    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1322    fdb_config fconfig = fdb_get_default_config();
1323    fconfig.compaction_threshold = 0;
1324    fconfig.block_reusing_threshold = 35;
1325    fconfig.num_keeping_headers = nheaders;
1326
1327    // remove previous staleblktest files
1328    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1329    (void)r;
1330
1331    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1332    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1333    fdb_kvs_open(dbfile, &db2, "db2", &kvs_config);
1334
1335    // load docs into db and db2
1336    for (i = 0; i < nheaders; i++) {
1337        sprintf(keybuf, "%dkey",i);
1338        fillstr(bodybuf, 'a', 12);
1339        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1340                            bodybuf, strlen(bodybuf));
1341        TEST_STATUS(status);
1342        status = fdb_set_kv(db2, keybuf, strlen(keybuf),
1343                            bodybuf, strlen(bodybuf));
1344        TEST_STATUS(status);
1345        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1346        TEST_STATUS(status);
1347    }
1348
1349    // enter reuse via db
1350    i = 0;
1351    do {
1352        sprintf(keybuf, "0key");
1353        fillstr(bodybuf, 'b', 128);
1354        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1355                            bodybuf, strlen(bodybuf));
1356        TEST_STATUS(status);
1357        i++;
1358        status = fdb_get_file_info(dbfile, &file_info);
1359        TEST_STATUS(status);
1360    } while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE);
1361    ndocs = i;
1362
1363    sb_decision = sb_check_block_reusing(db);
1364    TEST_CHK(sb_decision == SBD_RECLAIM);
1365
1366    // commit
1367    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1368    TEST_STATUS(status);
1369
1370    // reuse blocks
1371    for (i = 0; i < ndocs; i++) {
1372        sprintf(keybuf, "key%d", i);
1373        fillstr(bodybuf, 'c', 128);
1374        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1375                            bodybuf, strlen(bodybuf));
1376        TEST_STATUS(status);
1377    }
1378    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1379    TEST_STATUS(status);
1380
1381    // check that docs from db2 not lost
1382    fdb_iterator_init(db2, &iterator, NULL, 0, NULL, 0, FDB_ITR_NONE);
1383    i = 0;
1384    do {
1385        status = fdb_iterator_get(iterator, &rdoc);
1386        TEST_CHK(status == FDB_RESULT_SUCCESS);
1387        fdb_doc_free(rdoc);
1388        rdoc = NULL;
1389        i++;
1390    } while (fdb_iterator_next(iterator) != FDB_RESULT_ITERATOR_FAIL);
1391    TEST_CHK(i==10);
1392    fdb_iterator_close(iterator);
1393
1394    status = fdb_kvs_close(db);
1395    TEST_STATUS(status);
1396    status = fdb_kvs_close(db2);
1397    TEST_STATUS(status);
1398    status = fdb_close(dbfile);
1399    TEST_STATUS(status);
1400    fdb_shutdown();
1401
1402    memleak_end();
1403    TEST_RESULT("enter reuse via separate kvs test");
1404}
1405
1406void child_function() {
1407    int i;
1408    char keybuf[256];
1409    char bodybuf[512];
1410    fdb_file_handle *dbfile;
1411    fdb_kvs_handle *db;
1412    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1413    fdb_config fconfig = fdb_get_default_config();
1414    fconfig.block_reusing_threshold = 65;
1415    fconfig.num_keeping_headers = 5;
1416
1417    fdb_open(&dbfile, "./staleblktest1", &fconfig);
1418    fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1419
1420    // load docs into db and db2
1421    i = 0;
1422    while (true) {
1423        sprintf(keybuf, "key%d",i);
1424        sprintf(bodybuf, "seqno%d",i);
1425        fdb_set_kv(db, keybuf, strlen(keybuf), bodybuf, strlen(bodybuf));
1426        // create seqno every 100 updates
1427        if ((i % 100) == 0) {
1428            if ((i % 500) == 0) { // wal flush every 500 updates
1429                fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1430            } else {
1431                fdb_commit(dbfile, FDB_COMMIT_NORMAL);
1432            }
1433        }
1434        i++;
1435    }
1436}
1437
1438#if !defined(WIN32) && !defined(_WIN32)
1439void superblock_recovery_test() {
1440    TEST_INIT();
1441    memleak_start();
1442
1443    int r;
1444    uint64_t i, num_markers;
1445    void *rvalue;
1446    size_t rvalue_len;
1447    char keybuf[256];
1448    char bodybuf[256];
1449    pid_t child_id;
1450
1451    fdb_seqnum_t seqno;
1452    fdb_file_handle *dbfile;
1453    fdb_kvs_handle *db, *snap_db;
1454    fdb_snapshot_info_t *markers;
1455    fdb_status status;
1456    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1457    fdb_config fconfig = fdb_get_default_config();
1458    fdb_file_info file_info;
1459
1460    // remove previous staleblktest files
1461    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1462    (void)r;
1463
1464    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1465    TEST_STATUS(status);
1466    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1467    TEST_STATUS(status);
1468
1469    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1470    for (i = 0; i < fconfig.num_keeping_headers + 1; i++) {
1471        sprintf(keybuf, "key");
1472        status = fdb_set_kv(db, keybuf, strlen(keybuf), NULL, 0);
1473        TEST_STATUS(status);
1474        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1475        TEST_STATUS(status);
1476    }
1477
1478    status = fdb_get_file_info(dbfile, &file_info);
1479    TEST_STATUS(status);
1480
1481    while (file_info.file_size < SB_MIN_BLOCK_REUSING_FILESIZE) {
1482        sprintf(keybuf, "key");
1483        sprintf(bodybuf, "body%d", static_cast<int>(i));
1484        status = fdb_set_kv(db, keybuf, strlen(keybuf),
1485                            bodybuf, strlen(bodybuf));
1486        TEST_STATUS(status);
1487        status = fdb_get_file_info(dbfile, &file_info);
1488        TEST_STATUS(status);
1489    };
1490
1491    TEST_CHK(sb_check_block_reusing(db) == SBD_RECLAIM);
1492
1493    // close previous handle
1494    status = fdb_kvs_close(db);
1495    TEST_STATUS(status);
1496    status = fdb_close(dbfile);
1497    TEST_STATUS(status);
1498
1499    // fork child
1500    child_id = fork();
1501
1502    if (child_id == 0) {
1503        child_function();
1504    }
1505
1506    // wait 3s
1507    sleep(3);
1508
1509    // kill child
1510    kill(child_id, SIGUSR1);
1511    // reopen and recover
1512
1513    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1514    TEST_STATUS(status);
1515    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1516    TEST_STATUS(status);
1517
1518    // get known key
1519    sprintf(keybuf, "key0");
1520    status = fdb_get_kv(db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1521    TEST_STATUS(status);
1522    status = fdb_free_block(rvalue);
1523    TEST_STATUS(status);
1524
1525    // compact upto marker
1526    status = fdb_get_all_snap_markers(dbfile, &markers, &num_markers);
1527    TEST_CHK(status == FDB_RESULT_SUCCESS);
1528    TEST_CHK(num_markers == fconfig.num_keeping_headers);
1529    status = fdb_compact_upto(dbfile, NULL, markers[4].marker);
1530    TEST_CHK(status == FDB_RESULT_SUCCESS);
1531
1532    // open snapshot on marker
1533    seqno = markers[4].kvs_markers->seqnum;
1534    status = fdb_snapshot_open(db, &snap_db, seqno);
1535    TEST_STATUS(status);
1536
1537    status = fdb_free_snap_markers(markers, num_markers);
1538    TEST_CHK(status == FDB_RESULT_SUCCESS);
1539
1540    // get known key
1541    sprintf(keybuf, "key0");
1542    status = fdb_get_kv(snap_db, keybuf, strlen(keybuf), &rvalue, &rvalue_len);
1543    TEST_STATUS(status);
1544
1545    // check value
1546    TEST_CMP(rvalue, "seqno0", rvalue_len);
1547    status = fdb_free_block(rvalue);
1548    TEST_STATUS(status);
1549
1550    status = fdb_kvs_close(snap_db);
1551    TEST_STATUS(status);
1552    status = fdb_kvs_close(db);
1553    TEST_STATUS(status);
1554    status = fdb_close(dbfile);
1555    TEST_STATUS(status);
1556    fdb_shutdown();
1557
1558    memleak_end();
1559    TEST_RESULT("superblock recovery test");
1560}
1561#endif
1562
1563void reclaim_rollback_point_test() {
1564    memleak_start();
1565    TEST_INIT();
1566
1567    int i, r;
1568    int low_seq = 0;
1569    int nheaders=5;
1570    int ndocs=30000;
1571    char keybuf[16];
1572
1573    fdb_file_handle* dbfile;
1574    fdb_kvs_handle* db;
1575    fdb_status status;
1576    fdb_config fconfig = fdb_get_default_config();
1577    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
1578    fdb_file_info file_info;
1579
1580    void *value_out;
1581    size_t valuelen_out;
1582
1583    r = system(SHELL_DEL" staleblktest* > errorlog.txt");
1584    (void)r;
1585
1586    // init
1587    fconfig.compaction_threshold = 0;
1588    fconfig.num_keeping_headers = nheaders;
1589    status = fdb_open(&dbfile, "./staleblktest1", &fconfig);
1590    TEST_STATUS(status);
1591    status = fdb_kvs_open(dbfile, &db, "db", &kvs_config);
1592    TEST_STATUS(status);
1593
1594    const char *key = "key";
1595    const char *val = "val";
1596
1597    // load n docs
1598    for (i=0; i<ndocs; ++i) {
1599        sprintf(keybuf, "key%d", i);
1600        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1601        TEST_STATUS(status);
1602    }
1603    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1604    TEST_STATUS(status);
1605    low_seq = i;
1606
1607    // load until exceeding SB_MIN_BLOCK_REUSING_FILESIZE
1608    i = 0;
1609    do {
1610        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1611        TEST_STATUS(status);
1612        i++;
1613        status = fdb_get_file_info(dbfile, &file_info);
1614        TEST_STATUS(status);
1615    } while (file_info.file_size <= SB_MIN_BLOCK_REUSING_FILESIZE);
1616
1617    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1618    TEST_STATUS(status);
1619
1620    // overwrite n docs
1621    for (i=0; i<ndocs; ++i) {
1622        sprintf(keybuf, "key%d", i);
1623        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu2", 5);
1624        TEST_STATUS(status);
1625    }
1626    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1627    TEST_STATUS(status);
1628
1629    // rollback to the first commit
1630    status = fdb_rollback(&db, low_seq);
1631    TEST_STATUS(status);
1632
1633    // retrieve docs
1634    for (i=0; i<ndocs; ++i) {
1635        sprintf(keybuf, "key%d", i);
1636        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1637        TEST_STATUS(status);
1638        free(value_out);
1639    }
1640
1641    // create nheaders
1642    for (i = 0; i < nheaders; ++i) {
1643        sprintf(keybuf, "key%d", i);
1644        status = fdb_set_kv(db, keybuf, strlen(keybuf), (char *)"reu", 4);
1645        TEST_STATUS(status);
1646        status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1647        TEST_STATUS(status);
1648    }
1649
1650    // append some data & commit
1651    // now old blocks will be reclaimed
1652    for (i=0; i<ndocs; ++i) {
1653        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1654        TEST_STATUS(status);
1655    }
1656    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1657    TEST_STATUS(status);
1658
1659    // append more data .. now reusable blocks are overwritten
1660    for (i=0; i<ndocs; ++i) {
1661        status = fdb_set_kv(db, key, strlen(key) + 1, val, strlen(val) + 1);
1662        TEST_STATUS(status);
1663    }
1664    status = fdb_commit(dbfile, FDB_COMMIT_MANUAL_WAL_FLUSH);
1665    TEST_STATUS(status);
1666
1667    // retrieve docs
1668    for (i=0; i<ndocs; ++i) {
1669        sprintf(keybuf, "key%d", i);
1670        status = fdb_get_kv(db, keybuf, strlen(keybuf), &value_out, &valuelen_out);
1671        TEST_STATUS(status);
1672        free(value_out);
1673    }
1674
1675    status = fdb_kvs_close(db);
1676    TEST_STATUS(status);
1677    status = fdb_close(dbfile);
1678    TEST_STATUS(status);
1679    fdb_shutdown();
1680
1681    memleak_end();
1682    TEST_RESULT("reclaim rollback point test");
1683}
1684
1685int main() {
1686
1687    /* Test resuse of stale blocks with block_reusing_threshold
1688       set at 0, 65, 100 */
1689    verify_staleblock_reuse_param_test();
1690    reuse_with_snapshot_test();
1691
1692    /* Test reclaiming of stale blocks while varying
1693       num_keeping_headers */
1694    verify_minimum_num_keeping_headers_param_test();
1695    verify_high_num_keeping_headers_param_test();
1696
1697    /* Test to verify in-memory and disk snapshots before
1698       block reuse */
1699    snapshot_before_block_reuse_test(false);
1700    snapshot_before_block_reuse_test(true);
1701
1702    /* Test to verify snapshot after block reuse */
1703    snapshot_after_block_reuse_test();
1704
1705    /* Test block reusage with keys having variable value sizes */
1706    variable_value_size_test();
1707
1708    /* Test rollback with block reusage */
1709    rollback_with_num_keeping_headers();
1710
1711    /* Test block resuage with deletes */
1712    reuse_on_delete_test();
1713
1714    /* Test block reusage with manual compaction */
1715    fragmented_reuse_test();
1716
1717    /* Test to verify reuse mode with one kvstore does not affect others */
1718    enter_reuse_via_separate_kvs_test();
1719
1720#if !defined(WIN32) && !defined(_WIN32)
1721    /* Test recovery from superblock corruption */
1722    superblock_recovery_test();
1723#endif
1724
1725    /* Test rollback, verify snapshot, manual compaction upon recovery
1726       after crash */
1727    crash_and_recover_with_num_keeping_test();
1728
1729    reclaim_rollback_point_test();
1730
1731    return 0;
1732}
1733