1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdint.h>
22
23#include <atomic>
24#include <chrono>
25#include <string>
26#include <vector>
27
28#include "libforestdb/forestdb.h"
29#include "test.h"
30#include "timing.h"
31
32#include "stat_aggregator.h"
33
// NUM_STATS to always be the last entry in the following
// enum class to keep a count of the number of stats tracked
// in the use case tests.
enum _op_ {
    SET,        // latency of fdb_set_kv
    COMMIT,     // latency of fdb_end_transaction (commit)
    GET,        // latency of fdb_get_kv (on a snapshot handle)
    INMEMSNAP,  // latency of opening an in-memory snapshot
    ITR_INIT,   // latency of fdb_iterator_init
    ITR_GET,    // latency of fdb_iterator_get
    ITR_CLOSE,  // latency of fdb_iterator_close
    NUM_STATS   // sentinel: total number of tracked stats (keep last)
};
47
48/**
49 * Each entry in the vector maintained by the file handle pool.
50 */
51struct PoolEntry {
52    PoolEntry(int _index,
53              bool _avail,
54              fdb_file_handle *_dbfile,
55              fdb_kvs_handle *_db) {
56        index = _index;
57        available.store(_avail);
58        dbfile = _dbfile;
59        db = _db;
60    }
61
62    int index;
63    std::atomic<bool> available;
64    fdb_file_handle *dbfile;
65    fdb_kvs_handle *db;
66};
67
68/**
69 * This class maintains a pool of file and kvs handles.
70 */
71class FileHandlePool {
72public:
73    FileHandlePool(const char *filename, int count) {
74        fdb_status status;
75        fdb_config fconfig = fdb_get_default_config();
76        fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
77        for (int i = 0; i < count; ++i) {
78            fdb_file_handle *dbfile;
79            fdb_kvs_handle *db;
80            status = fdb_open(&dbfile, filename, &fconfig);
81            fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
82            status = fdb_kvs_open_default(dbfile, &db, &kvs_config);
83            fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
84            PoolEntry *pe = new PoolEntry(i, true, dbfile, db);
85            pool_vector.push_back(pe);
86        }
87
88        // Set up StatAggregator
89        sa = new StatAggregator(NUM_STATS, 1);
90
91        sa->t_stats[SET][0].name = "set";
92        sa->t_stats[COMMIT][0].name = "commit";
93        sa->t_stats[GET][0].name = "get";
94        sa->t_stats[INMEMSNAP][0].name = "in-mem snapshot";
95
96        sa->t_stats[ITR_INIT][0].name = "iterator-init";
97        sa->t_stats[ITR_GET][0].name = "iterator-get";
98        sa->t_stats[ITR_CLOSE][0].name = "iterator-close";
99
100        samples = 0;
101        mutex_init(&statlock);
102    }
103
104    ~FileHandlePool() {
105        fdb_status status;
106        for (size_t i = 0; i < pool_vector.size(); ++i) {
107            PoolEntry *pe = pool_vector.at(i);
108            if (pe) {
109                status = fdb_kvs_close(pe->db);
110                fdb_assert(status == FDB_RESULT_SUCCESS,
111                           status, FDB_RESULT_SUCCESS);
112                status = fdb_close(pe->dbfile);
113                fdb_assert(status == FDB_RESULT_SUCCESS,
114                           status, FDB_RESULT_SUCCESS);
115                delete pe;
116            }
117        }
118        pool_vector.clear();
119
120        // Delete StatAggregator
121        delete sa;
122        mutex_destroy(&statlock);
123    }
124
125    /**
126     * Acquire a handle set and its index that is currently available,
127     * in the process, the handle set will be marked as unavailable for
128     * any other user.
129     */
130    int getAvailableResource(fdb_file_handle **dbfile, fdb_kvs_handle **db) {
131        while (true) {
132            int index = rand() % pool_vector.size();
133            bool inverse = true;
134            PoolEntry *pe = pool_vector.at(index);
135            if (pe && pe->available.compare_exchange_strong(inverse, false)) {
136                *dbfile = pe->dbfile;
137                *db = pe->db;
138                return pe->index;
139            }
140        }
141    }
142
143    /**
144     * Set the handle set at an index to available, indicating the current
145     * user will not be using the handles anymore.
146     */
147    void returnResourceToPool(int index) {
148        PoolEntry *pe = pool_vector.at(index);
149        if (!pe) {
150            fprintf(stderr, "Invalid entry!\n");
151            return;
152        }
153        bool inverse = false;
154        if (!pe->available.compare_exchange_strong(inverse, true)) {
155            fprintf(stderr, "Handles were likely used by another thread!");
156            assert(false);
157        }
158    }
159
160    /**
161     * Collects stats - invoked by concurrent threads, hence the mutex.
162     */
163    void collectStat(int index, uint64_t diff) {
164        mutex_lock(&statlock);
165        if (sa && index < NUM_STATS) {
166            sa->t_stats[index][0].latencies.push_back(diff);
167            ++samples;
168        }
169        mutex_unlock(&statlock);
170    }
171
172    /**
173     * Displays median, percentiles and histogram of the
174     * collected stats.
175     */
176    void displayCollection(const char* title) {
177        if (sa) {
178            sa->aggregateAndPrintStats(title, samples, "ms");
179        }
180    }
181
182    /**
183     * Print availabity status of the handle sets at every index in the
184     * pool.
185     */
186    void printPoolVector() {
187        fprintf(stderr, "---------------------\n");
188        for (size_t i = 0; i < pool_vector.size(); ++i) {
189            fprintf(stderr, "Index: %d Available: %d\n",
190                    (pool_vector.at(i))->index,
191                    (pool_vector.at(i))->available.load() ? 1 : 0);
192        }
193        fprintf(stderr, "---------------------\n");
194    }
195
196    /**
197     * Print FDB_LATENCY_STATS, for all file handles.
198     */
199    void printHandleStats() {
200        fdb_status status;
201        fdb_latency_stat stat;
202        int nstats = FDB_LATENCY_NUM_STATS;
203
204        StatAggregator *s = new StatAggregator(nstats, 1);
205
206        for (int i = 0; i < nstats; ++i) {
207            const char* name = fdb_latency_stat_name(i);
208            s->t_stats[i][0].name = name;
209
210            for (size_t j = 0; j < pool_vector.size(); ++j) {
211                memset(&stat, 0, sizeof(fdb_latency_stat));
212                status = fdb_get_latency_stats((pool_vector.at(j))->dbfile,
213                                               &stat, i);
214                fdb_assert(status == FDB_RESULT_SUCCESS,
215                           status, FDB_RESULT_SUCCESS);
216
217                if (stat.lat_count > 0) {
218                    s->t_stats[i][0].latencies.push_back(stat.lat_avg);
219                }
220            }
221        }
222        (void)status;
223
224        s->aggregateAndPrintStats("FDB_STATS", pool_vector.size(), "ms");
225        delete s;
226    }
227
228private:
229    std::vector<PoolEntry *> pool_vector;
230    int samples;
231    StatAggregator *sa;
232    mutex_t statlock;
233};
234
// Arguments shared by every worker thread spawned in a test run.
struct ops_args {
    FileHandlePool *fhp; // shared pool of file/KVS handles (not owned)
    const float time;    // test duration in seconds
};
239
240static void *invoke_writer_ops(void *args) {
241    struct ops_args *oa = static_cast<ops_args *>(args);
242    int i = 0;
243    fdb_status status;
244    std::chrono::time_point<std::chrono::system_clock> start, end;
245    start = std::chrono::system_clock::now();
246
247    while (true) {
248        /* Acquire handles from pool */
249        fdb_file_handle *dbfile = nullptr;
250        fdb_kvs_handle *db = nullptr;
251        const int index = oa->fhp->getAvailableResource(&dbfile, &db);
252
253        char keybuf[256], bodybuf[256];
254        sprintf(keybuf, "key%d", i);
255        sprintf(bodybuf, "body%d", i);
256
257        // Start transaction
258        status = fdb_begin_transaction(dbfile, FDB_ISOLATION_READ_COMMITTED);
259        fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
260
261        // Issue a set
262        ts_nsec beginSet = get_monotonic_ts();
263        status = fdb_set_kv(db,
264                (void*)keybuf, strlen(keybuf) + 1,
265                (void*)bodybuf, strlen(bodybuf) + 1);
266        ts_nsec endSet = get_monotonic_ts();
267        fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
268
269        // End transaction (Commit)
270        ts_nsec beginCommit = get_monotonic_ts();
271        status = fdb_end_transaction(dbfile, FDB_COMMIT_NORMAL);
272        ts_nsec endCommit = get_monotonic_ts();
273        fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
274
275
276        oa->fhp->collectStat(SET, ts_diff(beginSet, endSet));
277        oa->fhp->collectStat(COMMIT, ts_diff(beginCommit, endCommit));
278
279        /* Return resource to pool */
280        oa->fhp->returnResourceToPool(index);
281
282        ++i;
283        end = std::chrono::system_clock::now();
284        std::chrono::duration<double> elapsed_seconds = end - start;
285        if (elapsed_seconds.count() > oa->time) {
286#ifdef __DEBUG_USECASE
287            fprintf(stderr, "Writer: Ends after %fs\n", elapsed_seconds.count());
288#endif
289            break;
290        }
291    }
292    thread_exit(0);
293    return nullptr;
294}
295
296static void *invoke_reader_ops(void *args) {
297    struct ops_args *oa = static_cast<ops_args *>(args);
298    fdb_status status;
299    std::chrono::time_point<std::chrono::system_clock> start, end;
300    start = std::chrono::system_clock::now();
301
302    int tracker = 0;
303    while (true) {
304        /* Acquire handles from pool */
305        fdb_file_handle *dbfile = nullptr;
306        fdb_kvs_handle *db = nullptr;
307        fdb_kvs_handle *snap_handle = nullptr;
308        const int index = oa->fhp->getAvailableResource(&dbfile, &db);
309
310        // Create an in-memory snapshot
311        ts_nsec beginSnap = get_monotonic_ts();
312        status = fdb_snapshot_open(db, &snap_handle, FDB_SNAPSHOT_INMEM);
313        ts_nsec endSnap = get_monotonic_ts();
314        fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
315
316        oa->fhp->collectStat(INMEMSNAP, ts_diff(beginSnap, endSnap));
317
318        // Iterator ops using the snapshot handle once every 100 times
319        if (++tracker % 100 == 0) {
320            fdb_iterator *iterator = nullptr;
321            fdb_doc *rdoc = nullptr;
322
323            // Initialize iterator
324            ts_nsec beginInit = get_monotonic_ts();
325            status = fdb_iterator_init(snap_handle, &iterator, NULL, 0, NULL, 0,
326                                       FDB_ITR_NONE);
327            ts_nsec endInit = get_monotonic_ts();
328            fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
329
330            oa->fhp->collectStat(ITR_INIT,ts_diff(beginInit, endInit));
331
332            // Get using iterator
333            ts_nsec beginGet = get_monotonic_ts();
334            status = fdb_iterator_get(iterator, &rdoc);
335            ts_nsec endGet = get_monotonic_ts();
336            if (status == FDB_RESULT_SUCCESS) {
337                fdb_doc_free(rdoc);
338                oa->fhp->collectStat(ITR_GET,ts_diff(beginGet, endGet));
339            } else {
340                fdb_assert(status == FDB_RESULT_ITERATOR_FAIL,
341                           status, FDB_RESULT_ITERATOR_FAIL);
342            }
343
344            // Close iterator
345            ts_nsec beginClose = get_monotonic_ts();
346            status = fdb_iterator_close(iterator);
347            ts_nsec endClose = get_monotonic_ts();
348            fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
349
350            oa->fhp->collectStat(ITR_CLOSE,ts_diff(beginClose, endClose));
351        } else {
352            // Try fetching a random doc, using the snapshot handle
353            void *value = nullptr;
354            size_t valuelen;
355            char keybuf[256], bodybuf[256];
356            int i = rand() % 100;
357            sprintf(keybuf, "key%d", i);
358            sprintf(bodybuf, "body%d", i);
359            ts_nsec beginGet = get_monotonic_ts();
360            status = fdb_get_kv(snap_handle,
361                                (void*)keybuf, strlen(keybuf) + 1,
362                                &value, &valuelen);
363            ts_nsec endGet = get_monotonic_ts();
364            if (status == FDB_RESULT_SUCCESS) {
365                assert(memcmp(value, bodybuf, valuelen) == 0);
366                fdb_free_block(value);
367            } else {
368                fdb_assert(status == FDB_RESULT_KEY_NOT_FOUND,
369                           status, FDB_RESULT_KEY_NOT_FOUND);
370            }
371
372            oa->fhp->collectStat(GET, ts_diff(beginGet, endGet));
373        }
374
375        // Close snapshot handle
376        status = fdb_kvs_close(snap_handle);
377        fdb_assert(status == FDB_RESULT_SUCCESS, status, FDB_RESULT_SUCCESS);
378
379        /* Return resource to pool */
380        oa->fhp->returnResourceToPool(index);
381
382        end = std::chrono::system_clock::now();
383        std::chrono::duration<double> elapsed_seconds = end - start;
384        if (elapsed_seconds.count() > oa->time) {
385#ifdef __DEBUG_USECASE
386            fprintf(stderr, "Reader: Ends after %fs\n", elapsed_seconds.count());
387#endif
388            break;
389        }
390    }
391    thread_exit(0);
392    return nullptr;
393}
394
395void reader_writer_shared_pool_test(int nhandles,
396                                    int writers,
397                                    int readers,
398                                    int time,
399                                    const char *title) {
400    TEST_INIT();
401    memleak_start();
402
403    int r;
404
405    r = system(SHELL_DEL" usecase_test* > errorlog.txt");
406    (void)r;
407
408
409    /* prepare handle pool */
410    const char *filename = "./usecase_test1";
411
412    FileHandlePool *fhp = new FileHandlePool(filename, nhandles);
413
414    assert(writers + readers > 1);
415    thread_t *threads = new thread_t[writers + readers];
416
417    struct ops_args oa{fhp, (float)time};
418
419    int threadid = 0;
420    // Spawn writer thread(s)
421    for (int i = 0; i < writers; ++i) {
422        thread_create(&threads[threadid++], invoke_writer_ops, &oa);
423    }
424
425    // Spawn reader thread(s)
426    for (int i = 0; i < readers; ++i) {
427        thread_create(&threads[threadid++], invoke_reader_ops, &oa);
428    }
429
430    assert(threadid == readers + writers);
431
432    // Wait for child threads
433    for (int j = 0; j < (readers + writers); ++j) {
434        int r = thread_join(threads[j], nullptr);
435        assert(r == 0);
436    }
437    delete[] threads;
438
439    /* Print Collected Stats */
440    fhp->displayCollection(title);
441
442#ifdef __DEBUG_USECASE
443    fhp->printHandleStats();
444#endif
445
446    /* cleanup */
447    delete fhp;
448    fdb_shutdown();
449
450    r = system(SHELL_DEL" usecase_test* > errorlog.txt");
451    (void)r;
452
453    memleak_end();
454    TEST_RESULT(title);
455}
456
457int main() {
458
459    /* Test single writer with multiple readers sharing a common
460       pool of file handles, for 30 seconds */
461    reader_writer_shared_pool_test(10 /*number of handles*/,
462                                   1        /*writer count*/,
463                                   4        /*reader count*/,
464                                   30       /*test time in seconds*/,
465                                   "1 WRITER - 4 READERS TEST");
466
467    /* Test multiple writers with multiple readers sharing a common
468       pool of file handles, for 30 seconds */
469    reader_writer_shared_pool_test(10       /*number of handles*/,
470                                   4        /*writer count*/,
471                                   4        /*reader count*/,
472                                   30       /*test time in seconds*/,
473                                   "4 WRITERS - 4 READERS TEST");
474
475    return 0;
476}
477