1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <limits.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <unistd.h>
26 #include <errno.h>
27 #endif
28 
29 #include "libforestdb/forestdb.h"
30 #include "test.h"
31 #include "filemgr_anomalous_ops.h"
32 #include "filemgr.h"
33 #include "internal_types.h"
34 
/*
 * Log callback registered on the KV store handles.
 *
 * err_code : numeric error code to report.
 * err_msg  : message text to report.
 * pCtxData : opaque context; the callers in this file pass a string literal
 *            naming the KV store, and it is printed as a C string.
 */
void logCallbackFunc(int err_code,
                     const char *err_msg,
                     void *pCtxData) {
    const char *origin = (const char *) pCtxData;

    fprintf(stderr, "%s - error code: %d, error message: %s\n",
            origin, err_code, err_msg);
}
41 
// Name of the database file created (and removed beforehand) by this test.
#define TEST_FILENAME "disksim_testfile"

// Each *_SLEEP_MASK is a distinct single bit tested against storage_t::fflag
// by the matching filemgr callback; each *_MAX_SLEEP is the exclusive upper
// bound of the random delay passed to usleep().
// The masks must not share bits: CLOSE used to be 0x03, which overlapped the
// pwrite and pread bits, so enabling either of those also delayed close().
#define PWRITE_SLEEP_MASK 0x01
#define PWRITE_MAX_SLEEP  10

#define PREAD_SLEEP_MASK  0x02
#define PREAD_MAX_SLEEP   10

#define CLOSE_SLEEP_MASK  0x04
#define CLOSE_MAX_SLEEP   10

#define FSYNC_SLEEP_MASK  0x08
#define FSYNC_MAX_SLEEP   100
54 
// Test tunables. All start at zero and are assigned their real values at the
// top of indexer_pattern_test() before any thread is started.
int MAX_NUM_SNAPSHOTS = 0;     // number of slots in the storage_t::snaps ring
int NUM_DOCS = 0;              // number of document ids; also the key-map length
int COMMIT_FREQ = 0;           // commit whenever mutno is a non-zero multiple of this
int SNAPSHOT_FREQ = 0;         // snapshot whenever mutno is a non-zero multiple of this
int NUM_ITERATORS = 0;         // number of iterator threads to spawn
int NUM_WRITERS = 0;           // number of writer threads to spawn
int NUM_WRITER_ITERATIONS = 0; // random-mutation phase runs this many times NUM_DOCS
int ITERATOR_BATCH_SIZE = 0;   // width of the docid range checked per scan
63 
64 typedef struct snapshot_t {
65     fdb_kvs_handle *snap;
66     int8_t *_key_map;
67 } snapshot_t;
68 
69 typedef struct storage_t {
70     fdb_file_handle *fhandle;
71     fdb_config fconfig;
72     fdb_kvs_config kvs_config;
73     fdb_kvs_handle *main;
74     fdb_kvs_handle *back;
75     fdb_kvs_handle *def;
76     int8_t *keymap;
77     int8_t fflag;
78     snapshot_t *snaps;
79     int latest_snap_idx;
80     bool shutdown;
81     spin_t lock;
82 } storage_t;
83 
pwrite_cb(void *ctx, struct filemgr_ops *normal_ops, int fd, void *buf, size_t count, cs_off_t offset)84 ssize_t pwrite_cb(void *ctx, struct filemgr_ops *normal_ops,
85                   int fd, void *buf, size_t count, cs_off_t offset)
86 {
87     storage_t *wctx = (storage_t *)ctx;
88     if (wctx->fflag & PWRITE_SLEEP_MASK) {
89         usleep(rand() % PWRITE_MAX_SLEEP);
90     }
91     return normal_ops->pwrite(fd, buf, count, offset);
92 }
93 
pread_cb(void *ctx, struct filemgr_ops *normal_ops, int fd, void *buf, size_t count, cs_off_t offset)94 ssize_t pread_cb(void *ctx, struct filemgr_ops *normal_ops,
95                  int fd, void *buf, size_t count, cs_off_t offset)
96 {
97     storage_t *wctx = (storage_t *)ctx;
98     if (wctx->fflag & PREAD_SLEEP_MASK) {
99         usleep(rand() % PREAD_MAX_SLEEP);
100     }
101     return normal_ops->pread(fd, buf, count, offset);
102 }
103 
close_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)104 int close_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)
105 {
106     storage_t *wctx = (storage_t *)ctx;
107     if (wctx->fflag & CLOSE_SLEEP_MASK) {
108         usleep(rand() % CLOSE_MAX_SLEEP);
109     }
110     return normal_ops->close(fd);
111 }
112 
fsync_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)113 int fsync_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)
114 {
115     storage_t *wctx = (storage_t *)ctx;
116     if (wctx->fflag & FSYNC_SLEEP_MASK) {
117         usleep(rand() % FSYNC_MAX_SLEEP);
118     }
119     return normal_ops->fsync(fd);
120 }
121 
make_key(char *buf, int i, int8_t key_ver)122 INLINE void make_key(char *buf, int i, int8_t key_ver) {
123     sprintf(buf, "%08d %d", i, key_ver);
124 }
125 
indexer_set(storage_t *st, int docid, int mutno, bool isdel)126 fdb_status indexer_set(storage_t *st, int docid, int mutno, bool isdel)
127 {
128     TEST_INIT();
129     char mainkey[16], backkey[16];
130     fdb_status s;
131     int8_t key_ver;
132     fdb_doc *rdoc;
133     sprintf(backkey, "%08d", docid);
134     fdb_doc_create(&rdoc, (void *)backkey, strlen(backkey)+1, NULL,0, NULL,0);
135     s = fdb_get(st->back, rdoc);
136     key_ver = st->keymap[docid];
137     bool was_deleted = key_ver < 0 ? true : false;
138 
139     if (s == FDB_RESULT_SUCCESS) { // key was already inserted
140         TEST_CHK(!was_deleted); // must be positive for non-deleted keys
141         make_key(mainkey, docid, key_ver);
142         TEST_CMP(rdoc->body, mainkey, rdoc->bodylen);
143         // Use body from back index to delete key from main index
144         s = fdb_del_kv(st->main, rdoc->body, rdoc->bodylen);
145         TEST_CHK(s == FDB_RESULT_SUCCESS);
146     } else { // key does not exist or deleted
147         TEST_CHK(s == FDB_RESULT_KEY_NOT_FOUND);
148         TEST_CHK(key_ver <=0 ); // negative for deleted keys, 0 if new key
149     }
150 
151     if (was_deleted) {
152         key_ver = -key_ver; // flip the sign
153     }
154     if (key_ver == CHAR_MAX) {
155         key_ver = 1;
156     } else {
157         key_ver++;
158     }
159 
160     if (isdel) { // delete indexed key from database
161         s = fdb_del_kv(st->back, (void*)backkey, strlen(backkey)+1);
162         TEST_CHK(s == FDB_RESULT_SUCCESS);
163         key_ver = -key_ver;
164     } else { // update existing key in database, back index first, then main
165         make_key(mainkey, docid, key_ver);
166         s = fdb_set_kv(st->back, (void*)backkey, strlen(backkey)+1,
167                 mainkey, strlen(mainkey)+1);
168         TEST_CHK(s == FDB_RESULT_SUCCESS);
169         s = fdb_set_kv(st->main, (void *) mainkey, strlen(mainkey)+1,
170                        NULL, 0);
171         TEST_CHK(s == FDB_RESULT_SUCCESS);
172     }
173 
174     if (mutno && (mutno % COMMIT_FREQ) == 0) {
175         s = fdb_commit(st->fhandle, FDB_COMMIT_NORMAL);
176         TEST_CHK(s == FDB_RESULT_SUCCESS);
177     }
178 
179     if (mutno && (mutno % SNAPSHOT_FREQ) == 0) {
180         // Take a snapshot of main kv store as well as the key_map for reference
181         spin_lock(&st->lock);
182         st->latest_snap_idx = (st->latest_snap_idx + 1) % MAX_NUM_SNAPSHOTS;
183         st->keymap[docid] = key_ver;
184         if (st->snaps[st->latest_snap_idx].snap) {
185             fdb_kvs_close(st->snaps[st->latest_snap_idx].snap);
186         }
187 
188         s = fdb_snapshot_open(st->main, &st->snaps[st->latest_snap_idx].snap,
189                               FDB_SNAPSHOT_INMEM);
190         TEST_CHK(s == FDB_RESULT_SUCCESS);
191         memcpy(st->snaps[st->latest_snap_idx]._key_map, st->keymap, NUM_DOCS);
192 
193         spin_unlock(&st->lock);
194     } else { // just update the reference map
195         spin_lock(&st->lock);
196         st->keymap[docid] = key_ver;
197         spin_unlock(&st->lock);
198     }
199 
200     fdb_doc_free(rdoc);
201 
202     return FDB_RESULT_SUCCESS;
203 }
204 
205 static void *_writer_thread(void *voidargs)
206 {
207     TEST_INIT();
208     storage_t *db = (storage_t *)voidargs;
209     fdb_status s;
210 
211     // open the file and the 3 kv stores - main, back and default
212     s = fdb_kvs_open_default(db->fhandle, &db->def, &db->kvs_config);
213     TEST_CHK(s == FDB_RESULT_SUCCESS);
214     s = fdb_kvs_open(db->fhandle, &db->main, "main", &db->kvs_config);
215     TEST_CHK(s == FDB_RESULT_SUCCESS);
216     s = fdb_kvs_open(db->fhandle, &db->back, "back", &db->kvs_config);
217     TEST_CHK(s == FDB_RESULT_SUCCESS);
218 
219     s = fdb_set_log_callback(db->main, logCallbackFunc,
220                                   (void *) "indexer_patter_main");
221     TEST_CHK(s == FDB_RESULT_SUCCESS);
222     s = fdb_set_log_callback(db->back, logCallbackFunc,
223                                   (void *) "indexer_patter_back");
224     TEST_CHK(s == FDB_RESULT_SUCCESS);
225     s = fdb_set_log_callback(db->def, logCallbackFunc,
226                                   (void *) "indexer_patter_default");
227     TEST_CHK(s == FDB_RESULT_SUCCESS);
228 
229     for (int i = 0; i < NUM_DOCS; ++i) {
230         indexer_set(db, i, i, false);
231     }
232     printf("--------LOADING COMPLETE------\n");
233     for (int j = 0; j < NUM_WRITER_ITERATIONS * NUM_DOCS; ++j) {
234         int i = rand() % NUM_DOCS;
235         if (rand() % 100 > 80) {
236             indexer_set(db, i, j, true); // delete key
237         } else {
238             indexer_set(db, i, j, false); // update key
239         }
240     }
241 
242     thread_exit(0);
243     return NULL;
244 }
245 
246 static void *_iterator_thread(void *voidargs)
247 {
248     TEST_INIT();
249     fdb_file_handle *fhandle;
250     fdb_kvs_handle *snapdb;
251     fdb_iterator *it;
252     int start_key;
253     char buf[32];
254     int num_keys;
255     int end_key;
256     fdb_doc *rdoc = NULL;
257     int j = 0;
258     int total_docs_scanned = 0;
259     int snap_idx;
260     fdb_status s;
261 
262     storage_t *st = (storage_t *)voidargs;
263     int8_t *key_snap = alca(int8_t, NUM_DOCS);
264 
265     s = fdb_open(&fhandle, TEST_FILENAME, &st->fconfig);
266     if (s != FDB_RESULT_SUCCESS) {
267         printf("Iterator failed to open file %s %d\n", TEST_FILENAME, s);
268         TEST_CHK(s == FDB_RESULT_SUCCESS);
269     }
270 
271     while (++j) {
272         spin_lock(&st->lock);
273         if (st->shutdown) {
274             spin_unlock(&st->lock);
275             printf("Iterator thread shutting down after %d scans..\n", j);
276             break;
277         }
278         if (!st->snaps[st->latest_snap_idx].snap) {
279             spin_unlock(&st->lock);
280             usleep(10);
281             --j;
282             continue;
283         }
284         // CLONE the latest snapshot from the writer's context...
285         snap_idx = st->latest_snap_idx;
286         memcpy(key_snap, st->snaps[snap_idx]._key_map, NUM_DOCS);
287         s = fdb_snapshot_open(st->snaps[snap_idx].snap, &snapdb,
288                               FDB_SNAPSHOT_INMEM);
289         TEST_CHK(s == FDB_RESULT_SUCCESS);
290         spin_unlock(&st->lock);
291 
292         start_key = rand() % NUM_DOCS;
293         num_keys = ITERATOR_BATCH_SIZE;
294         end_key = start_key + num_keys;
295         end_key = end_key >= NUM_DOCS ? NUM_DOCS - 1 : end_key;
296 
297         make_key(buf, start_key, 0);
298         s = fdb_iterator_init(snapdb, &it, buf, strlen(buf) + 1, NULL, 0,
299                               FDB_ITR_NO_DELETES);
300         for (int i = start_key; i < end_key; ++i) {
301             if (key_snap[i] > 0) { // if the key was non-deleted in snapshot
302                 TEST_CHK(s == FDB_RESULT_SUCCESS); //next should be SUCCESS
303                 s = fdb_iterator_get(it, &rdoc);
304                 if (s == FDB_RESULT_ITERATOR_FAIL) {
305                     break; // break!
306                 }
307                 TEST_CHK(s == FDB_RESULT_SUCCESS);
308                 make_key(buf, i, key_snap[i]);
309                 TEST_CMP(rdoc->key, buf, rdoc->keylen); // validate it
310                 fdb_doc_free(rdoc);
311                 rdoc = NULL;
312                 s = fdb_iterator_next(it);
313                 total_docs_scanned++;
314             } else { // this key is not expected to be returned..
315                 continue;
316             }
317         }
318         s = fdb_iterator_close(it);
319         TEST_CHK(s == FDB_RESULT_SUCCESS);
320         fdb_kvs_close(snapdb);
321     }
322     fdb_close(fhandle);
323     printf("Total docs scanned = %d\n", total_docs_scanned);
324 
325     thread_exit(0);
326     return NULL;
327 }
328 
329 void indexer_pattern_test()
330 {
331     TEST_INIT();
332 
333     memleak_start();
334     int r;
335     fdb_status s;
336     fdb_config fconfig = fdb_get_default_config();
337     fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
338     char temp[32];
339 
340     // SETUP Configurations...
341     NUM_DOCS = 10000;
342     NUM_WRITER_ITERATIONS = 5;
343     COMMIT_FREQ = NUM_DOCS/10;
344     SNAPSHOT_FREQ = 5;
345     ITERATOR_BATCH_SIZE = 10;
346     NUM_ITERATORS = 7;
347     MAX_NUM_SNAPSHOTS = 5;
348     NUM_WRITERS = 1; // Do not bump this up not safe
349 
350     fconfig.buffercache_size = 8*1024*1024;
351     fconfig.flags = FDB_OPEN_FLAG_CREATE;
352     fconfig.purging_interval = 0;
353     fconfig.wal_threshold = 4096;
354     fconfig.num_compactor_threads = 1;
355     //fconfig.block_reusing_threshold = 0;
356     //fconfig.num_wal_partitions = 3;
357 
358     thread_t *wtid = alca(thread_t, NUM_WRITERS);
359     thread_t *tid = alca(thread_t, NUM_ITERATORS);
360     void **thread_ret = alca(void *, NUM_ITERATORS);
361 
362     // Get the default callbacks which result in normal operation for other ops
363     struct anomalous_callbacks *disk_sim_cb = get_default_anon_cbs();
364     storage_t db;
365     memset(&db, 0, sizeof(storage_t));
366     db.keymap = (int8_t *)calloc(NUM_DOCS, sizeof(int8_t));
367     spin_init(&db.lock);
368     db.snaps = (snapshot_t *)calloc(MAX_NUM_SNAPSHOTS, sizeof(snapshot_t));
369     for (int i = 0; i < MAX_NUM_SNAPSHOTS; ++i) {
370         db.snaps[i]._key_map = (int8_t *)calloc(NUM_DOCS, sizeof(int8_t));
371     }
372 
373     // Modify the pwrite callback to redirect to test-specific function
374     disk_sim_cb->pwrite_cb = &pwrite_cb;
375     disk_sim_cb->pread_cb = &pread_cb;
376     disk_sim_cb->close_cb = &close_cb;
377     disk_sim_cb->fsync_cb = &fsync_cb;
378 
379     // remove previous anomaly_test files
380     r = system(SHELL_DEL" " TEST_FILENAME " > errorlog.txt");
381     (void)r;
382 
383     // Reset anomalous behavior stats..
384     filemgr_ops_anomalous_init(disk_sim_cb, &db);
385 
386     db.fconfig = fconfig;
387     db.kvs_config = kvs_config;
388 
389     s = fdb_open(&db.fhandle, TEST_FILENAME, &db.fconfig);
390     TEST_CHK(s == FDB_RESULT_SUCCESS);
391 
392     printf("Num docs %d Num iterations %d Commit freq %d Snapshot freq %d "
393            "Iterator batch %d\n",
394            NUM_DOCS, NUM_WRITER_ITERATIONS, COMMIT_FREQ, SNAPSHOT_FREQ,
395            ITERATOR_BATCH_SIZE);
396     printf("Wal size %" _F64 " Buffercache size %" _F64 "MB\n",
397            fconfig.wal_threshold, fconfig.buffercache_size/1024/1024);
398     for (r = 0; r < NUM_WRITERS; ++r) {
399         thread_create(&wtid[r], _writer_thread, &db);
400     }
401 
402     usleep(1);
403 
404     for (int i = 0; i < NUM_ITERATORS; ++i) {
405         thread_create(&tid[i], _iterator_thread, &db);
406     }
407 
408     for (int i = 0; i < NUM_WRITERS; ++i) {
409         thread_join(wtid[i], &thread_ret[i]);
410     }
411 
412     spin_lock(&db.lock);
413     db.shutdown = true;
414     spin_unlock(&db.lock);
415 
416     for (int i = 0; i < NUM_ITERATORS; ++i) {
417         thread_join(tid[i], &thread_ret[i]);
418     }
419 
420     printf("Done with iterations.. Stats..\n");
421     // also get latency stats..
422     for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
423         fdb_latency_stat stat;
424         memset(&stat, 0, sizeof(fdb_latency_stat));
425         s = fdb_get_latency_stats(db.fhandle, &stat, i);
426         TEST_CHK(s == FDB_RESULT_SUCCESS);
427         fprintf(stderr, "%s:\t%u\t%u\t%u\t%" _F64 "\n",
428                 fdb_latency_stat_name(i),
429                 stat.lat_min, stat.lat_avg, stat.lat_max, stat.lat_count);
430     }
431 
432     // free all resources
433     fdb_close(db.fhandle);
434     spin_destroy(&db.lock);
435     free(db.keymap);
436     for (int i = 0; i < MAX_NUM_SNAPSHOTS; ++i) {
437         free(db.snaps[i]._key_map);
438     }
439     free(db.snaps);
440     fdb_shutdown();
441 
442     memleak_end();
443 
444     sprintf(temp, "indexer pattern test:");
445 
446     TEST_RESULT(temp);
447 }
448 
449 int main(){
450     indexer_pattern_test();
451 
452     return 0;
453 }
454