1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18
19#include <stdio.h>
20#include <assert.h>
21
22#include <algorithm>
23#include <cmath>
24#include <iterator>
25#include <numeric>
26#include <string>
27#include <vector>
28
29#include "config.h"
30#include "timing.h"
31#include "test.h"
32
33#include <libforestdb/forestdb.h>
34
35bool track_stat(stat_history_t *stat, uint64_t lat) {
36
37    if (lat == (uint64_t)ERR_NS) {
38      return false;
39    }
40
41    if (stat) {
42        stat->latencies.push_back(lat);
43        return true;
44    } else {
45        return false;
46    }
47}
48
49void print_db_stats(fdb_file_handle **dbfiles, int nfiles) {
50
51    int i, j;
52    fdb_status status;
53    fdb_latency_stat stat;
54    int nstats = FDB_LATENCY_NUM_STATS;
55
56    StatAggregator *sa = new StatAggregator(nstats, 1);
57
58    for (i = 0; i < nstats; i++) {
59        const char* name = fdb_latency_stat_name(i);
60        sa->t_stats[i][0].name = name;
61
62        for (j = 0; j < nfiles; j++) {
63            memset(&stat, 0, sizeof(fdb_latency_stat));
64            status = fdb_get_latency_stats(dbfiles[j], &stat, i);
65            assert(status == FDB_RESULT_SUCCESS);
66
67            if (stat.lat_count > 0) {
68                sa->t_stats[i][0].latencies.push_back(stat.lat_avg);
69            }
70        }
71    }
72    (void)status;
73
74    sa->aggregateAndPrintStats("FDB_STATS", nfiles, "ms");
75    delete sa;
76}
77
/*
 * Fills `s` with a deterministic, NUL-terminated alphanumeric string.
 *
 * s:   destination buffer with room for at least `len` bytes.
 * len: total number of bytes to write, terminator included. Values below 1
 *      leave `s` untouched.
 *
 * The first len-1 bytes cycle through digits, upper-case and lower-case
 * letters in that fixed order, so every call yields the same text for the
 * same length; the last byte is always '\0'.
 */
void str_gen(char *s, const int len) {

    static const char charset[] =
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz";
    // number of usable characters, without the literal's own terminator
    const size_t charset_len = sizeof(charset) - 1;

    if (len <= 0) {
        return;
    }

    // the final slot is reserved for the terminator
    for (int pos = 0; pos < len - 1; ++pos) {
        s[pos] = charset[pos % charset_len];
    }
    s[len - 1] = '\0';
}
98
/*
 * Exchanges the two characters that x and y point to.
 * Passing the same address for both leaves the character unchanged.
 */
void swap(char *x, char *y) {

    const char held = *x;

    *x = *y;
    *y = held;
}
106
107void permute(fdb_kvs_handle *kv, char *a, int l, int r) {
108
109    int i;
110    char keybuf[256], metabuf[256], bodybuf[1024];
111    fdb_doc *doc = NULL;
112    str_gen(bodybuf, 1024);
113
114    if (l == r) {
115        sprintf(keybuf, a, l);
116        sprintf(metabuf, "meta%d", r);
117        fdb_doc_create(&doc, (void*)keybuf, strlen(keybuf),
118                       (void*)metabuf, strlen(metabuf),
119                       (void*)bodybuf, strlen(bodybuf));
120        fdb_set(kv, doc);
121        fdb_doc_free(doc);
122    } else {
123        for (i = l; i <= r; i++) {
124            swap((a+l), (a+i));
125            permute(kv, a, l+1, r);
126            swap((a+l), (a+i)); //backtrack
127        }
128    }
129}
130
131void sequential(fdb_kvs_handle *kv, int pos) {
132
133    int i;
134    char keybuf[256], metabuf[256], bodybuf[512];
135    fdb_doc *doc = NULL;
136    str_gen(bodybuf, 512);
137
138    // load flat keys
139    for (i = 0; i < 1000; i++){
140        sprintf(keybuf, "%d_%dseqkey", pos, i);
141        sprintf(metabuf, "meta%d", i);
142        fdb_doc_create(&doc, (void*)keybuf, strlen(keybuf),
143                       (void*)metabuf, strlen(metabuf),
144                       (void*)bodybuf, strlen(bodybuf));
145        fdb_set(kv, doc);
146        fdb_doc_free(doc);
147    }
148}
149
150void writer(fdb_kvs_handle *db, int pos) {
151
152    char keybuf[KEY_SIZE];
153
154    str_gen(keybuf, KEY_SIZE);
155    permute(db, keybuf, 0, PERMUTED_BYTES);
156    sequential(db, pos);
157}
158
159void reader(reader_context *ctx) {
160
161    bool is_err;
162    fdb_kvs_handle *db = ctx->handle;
163    fdb_iterator *iterator;
164    fdb_doc *doc = NULL, *rdoc = NULL;
165    fdb_status status;
166
167    track_stat(ctx->stat_itr_init,
168               timed_fdb_iterator_init(db, &iterator));
169
170    // repeat until fail
171    do {
172        // sum time of all gets
173        track_stat(ctx->stat_itr_get, timed_fdb_iterator_get(iterator, &rdoc));
174
175        // get from kv
176        fdb_doc_create(&doc, rdoc->key, rdoc->keylen, NULL, 0, NULL, 0);
177        status = fdb_get(db, doc);
178        assert(status == FDB_RESULT_SUCCESS);
179
180        fdb_doc_free(doc);
181        doc = NULL;
182
183        // kv get
184        fdb_doc_free(rdoc);
185        rdoc = NULL;
186
187        is_err = track_stat(ctx->stat_itr_next,
188                            timed_fdb_iterator_next(iterator));
189    } while (!is_err);
190    track_stat(ctx->stat_itr_close, timed_fdb_iterator_close(iterator));
191    (void)status;
192}
193
194void deletes(fdb_kvs_handle *db, int pos) {
195
196    int i;
197    char keybuf[256];
198    fdb_doc *doc = NULL;
199
200    // deletes sequential docs
201    for (i = 0; i < 1000; i++){
202        sprintf(keybuf, "%d_%dseqkey", pos, i);
203        fdb_doc_create(&doc, (void*)keybuf, strlen(keybuf), NULL, 0, NULL, 0);
204        fdb_del(db, doc);
205        fdb_doc_free(doc);
206    }
207}
208
209void do_bench() {
210
211    TEST_INIT();
212    int i, j, r;
213    int n_loops = 5;
214    int n_kvs = 16;
215
216    char cmd[64], fname[64], dbname[64];
217    int n2_kvs = n_kvs * n_kvs;
218
219    // file handlers
220    fdb_status status;
221    fdb_file_handle **dbfile = alca(fdb_file_handle*, n_kvs);
222    fdb_kvs_handle **db = alca(fdb_kvs_handle*, n2_kvs);
223    fdb_kvs_handle **snap_db = alca(fdb_kvs_handle*, n2_kvs);
224    fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
225    fdb_config fconfig = fdb_get_default_config();
226
227    // reader stats
228    reader_context *ctx = alca(reader_context, n2_kvs);
229
230    StatAggregator *sa = new StatAggregator(4, n2_kvs);
231
232    for (i = 0; i < n2_kvs; ++i) {
233        sa->t_stats[0][i].name.assign(ST_ITR_INIT);
234        sa->t_stats[1][i].name.assign(ST_ITR_NEXT);
235        sa->t_stats[2][i].name.assign(ST_ITR_GET);
236        sa->t_stats[3][i].name.assign(ST_ITR_CLOSE);
237        ctx[i].stat_itr_init = &sa->t_stats[0][i];
238        ctx[i].stat_itr_next = &sa->t_stats[1][i];
239        ctx[i].stat_itr_get = &sa->t_stats[2][i];
240        ctx[i].stat_itr_close = &sa->t_stats[3][i];
241    }
242
243    sprintf(cmd, "rm bench* > errorlog.txt");
244    r = system(cmd);
245    (void)r;
246
247    // setup
248    fconfig.compaction_mode = FDB_COMPACTION_MANUAL;
249    fconfig.auto_commit = false;
250    fconfig.compactor_sleep_duration = 600;
251    fconfig.prefetch_duration = 0;
252    fconfig.num_compactor_threads = 1;
253    fconfig.num_bgflusher_threads = 0;
254
255    // open 16 dbfiles each with 16 kvs
256    for (i = 0; i < n_kvs; ++i){
257        sprintf(fname, "bench%d",i);
258        status = fdb_open(&dbfile[i], fname, &fconfig);
259        assert(status == FDB_RESULT_SUCCESS);
260
261        for (j = i*n_kvs; j < (i*n_kvs + n_kvs); ++j){
262            sprintf(dbname, "db%d",j);
263            status = fdb_kvs_open(dbfile[i], &db[j],
264                                  dbname, &kvs_config);
265            assert(status == FDB_RESULT_SUCCESS);
266        }
267    }
268
269    for (i = 0; i < 10; ++i){
270        // generate initial commit headers
271        for (i = 0; i < n_kvs; i++){
272            status = fdb_commit(dbfile[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
273            assert(status == FDB_RESULT_SUCCESS);
274        }
275    }
276
277    for (j = 0; j < n_loops; j++){
278
279        // write to single file 1 kvs
280        writer(db[0], 0);
281
282        // reads from single file 1 kvs
283        ctx[0].handle = db[0];
284        reader(&ctx[0]);
285
286        // snap iterator read
287        status = fdb_snapshot_open(db[0], &snap_db[0],
288                                   FDB_SNAPSHOT_INMEM);
289        TEST_CHK(status == FDB_RESULT_SUCCESS);
290        ctx[0].handle = snap_db[0];
291        reader(&ctx[0]);
292
293       // write/read/snap to single file 16 kvs
294        for (i = 0;i < n_kvs; ++i){
295            writer(db[i], i);
296        }
297        for (i = 0; i < n_kvs; ++i){
298            deletes(db[i], i);
299        }
300        for (i = 0; i < n_kvs; ++i){
301            ctx[i].handle = db[i];
302            reader(&ctx[i]);
303        }
304        for (i = 0; i < n_kvs; ++i){
305            status = fdb_snapshot_open(db[i], &snap_db[i],
306                                       FDB_SNAPSHOT_INMEM);
307            TEST_CHK(status == FDB_RESULT_SUCCESS);
308            ctx[i].handle = snap_db[i];
309            reader(&ctx[i]);
310        }
311
312        // commit single file
313        status = fdb_commit(dbfile[0], FDB_COMMIT_MANUAL_WAL_FLUSH);
314        assert(status == FDB_RESULT_SUCCESS);
315
316        // write/write/snap to 16 files 1 kvs
317        for (i = 0; i < n2_kvs; i += n_kvs){ // every 16 kvs is new file
318            writer(db[i], i);
319        }
320        for (i = 0; i < n2_kvs; i += n_kvs){
321            deletes(db[i], i);
322        }
323        for (i = 0; i < n2_kvs; i += n_kvs){ // every 16 kvs is new file
324            ctx[i].handle = db[i];
325            reader(&ctx[i]);
326        }
327        for (i = 0; i < n2_kvs; i += n_kvs){
328            status = fdb_snapshot_open(db[i], &snap_db[i],
329                                       FDB_SNAPSHOT_INMEM);
330            assert(status == FDB_RESULT_SUCCESS);
331            ctx[i].handle = snap_db[i];
332            reader(&ctx[i]);
333        }
334
335        // write to 16 files 16 kvs each
336        for (i = 0; i < n2_kvs; i++){
337            writer(db[i], i);
338        }
339        for (i = 0; i < n2_kvs; ++i){
340            deletes(db[i], i);
341        }
342        for (i = 0; i < n2_kvs; i++){
343            ctx[i].handle = db[i];
344            reader(&ctx[i]);
345        }
346        for (i = 0; i < n2_kvs; i++){
347            status = fdb_snapshot_open(db[i], &snap_db[i],
348                                       FDB_SNAPSHOT_INMEM);
349            assert(status == FDB_RESULT_SUCCESS);
350            ctx[i].handle = snap_db[i];
351            reader(&ctx[i]);
352        }
353
354        // commit all
355        for (i = 0;i < n_kvs; i++){
356            status = fdb_commit(dbfile[i], FDB_COMMIT_MANUAL_WAL_FLUSH);
357            assert(status == FDB_RESULT_SUCCESS);
358        }
359    }
360    // compact all
361    for (i = 0; i < n_kvs; i++){
362        status = fdb_compact(dbfile[i], NULL);
363        assert(status == FDB_RESULT_SUCCESS);
364    }
365
366    // print aggregated reader stats
367    sa->aggregateAndPrintStats("ITERATOR_TEST_STATS", n_kvs * n_kvs, "µs");
368    delete sa;
369
370    // print aggregated dbfile stats
371    print_db_stats(dbfile, n_kvs);
372
373    // cleanup
374    for(i = 0; i < n2_kvs; i++){
375        fdb_kvs_close(db[i]);
376        fdb_kvs_close(snap_db[i]);
377    }
378    for(i = 0; i < n_kvs; i++){
379        fdb_close(dbfile[i]);
380    }
381
382    fdb_shutdown();
383
384    (void)status;
385    sprintf(cmd, "rm bench* > errorlog.txt");
386    r = system(cmd);
387    (void)r;
388
389    TEST_RESULT("Benchmark done");
390}
391
392/*
393 *  ===================
394 *  FDB BENCH MARK TEST
395 *  ===================
396 *  Performs unit benchmarking with 16 dbfiles each with max 16 kvs
397 */
398int main(int argc, char* args[]) {
399
400    do_bench();
401}
402