1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2010 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <limits.h>
24 #if !defined(WIN32) && !defined(_WIN32)
25 #include <unistd.h>
26 #include <errno.h>
27 #endif
28
29 #include "libforestdb/forestdb.h"
30 #include "test.h"
31 #include "filemgr_anomalous_ops.h"
32 #include "filemgr.h"
33 #include "internal_types.h"
34
/**
 * Log callback registered through fdb_set_log_callback(): writes one line
 * to stderr holding the caller-supplied tag, the error code and the message.
 *
 * @param err_code Error code handed to the callback.
 * @param err_msg  Message text accompanying the error code.
 * @param pCtxData Context pointer given at registration time. It is printed
 *                 with %s, so it must point to a NUL-terminated string.
 */
void logCallbackFunc(int err_code,
                     const char *err_msg,
                     void *pCtxData)
{
    const char *tag = (const char *) pCtxData;

    fprintf(stderr, "%s - error code: %d, error message: %s\n",
            tag, err_code, err_msg);
}
41
// Database file used by this test (deleted before the test opens it).
#define TEST_FILENAME "disksim_testfile"

// Bits tested against storage_t::fflag by the file-operation callbacks.
// When a callback finds its bit set it calls usleep(rand() % <OP>_MAX_SLEEP)
// before forwarding the operation. Each mask must be a distinct single bit
// so that every operation can be delayed independently of the others.
#define PWRITE_SLEEP_MASK 0x01
#define PWRITE_MAX_SLEEP 10

#define PREAD_SLEEP_MASK 0x02
#define PREAD_MAX_SLEEP 10

// Previously 0x03, i.e. PWRITE_SLEEP_MASK | PREAD_SLEEP_MASK, which made the
// close delay fire whenever the pwrite or pread delay was enabled. 0x08 is
// the next free bit (0x04 is taken by FSYNC_SLEEP_MASK).
#define CLOSE_SLEEP_MASK 0x08
#define CLOSE_MAX_SLEEP 10

#define FSYNC_SLEEP_MASK 0x04
#define FSYNC_MAX_SLEEP 100
54
// Test parameters. All of them start at zero and are assigned by
// indexer_pattern_test() before any thread is created.

// Workload size.
int NUM_DOCS = 0;               // number of document ids; also the key-map length
int NUM_WRITER_ITERATIONS = 0;  // random-mutation phase runs this many * NUM_DOCS times

// Periodic actions, expressed in mutation numbers.
int COMMIT_FREQ = 0;            // commit when mutno is a non-zero multiple of this
int SNAPSHOT_FREQ = 0;          // snapshot when mutno is a non-zero multiple of this
int MAX_NUM_SNAPSHOTS = 0;      // number of slots in the snapshot ring

// Threads.
int NUM_WRITERS = 0;            // writer threads to start
int NUM_ITERATORS = 0;          // iterator threads to start
int ITERATOR_BATCH_SIZE = 0;    // keys an iterator tries to verify per scan
63
64 typedef struct snapshot_t {
65 fdb_kvs_handle *snap;
66 int8_t *_key_map;
67 } snapshot_t;
68
69 typedef struct storage_t {
70 fdb_file_handle *fhandle;
71 fdb_config fconfig;
72 fdb_kvs_config kvs_config;
73 fdb_kvs_handle *main;
74 fdb_kvs_handle *back;
75 fdb_kvs_handle *def;
76 int8_t *keymap;
77 int8_t fflag;
78 snapshot_t *snaps;
79 int latest_snap_idx;
80 bool shutdown;
81 spin_t lock;
82 } storage_t;
83
pwrite_cb(void *ctx, struct filemgr_ops *normal_ops, int fd, void *buf, size_t count, cs_off_t offset)84 ssize_t pwrite_cb(void *ctx, struct filemgr_ops *normal_ops,
85 int fd, void *buf, size_t count, cs_off_t offset)
86 {
87 storage_t *wctx = (storage_t *)ctx;
88 if (wctx->fflag & PWRITE_SLEEP_MASK) {
89 usleep(rand() % PWRITE_MAX_SLEEP);
90 }
91 return normal_ops->pwrite(fd, buf, count, offset);
92 }
93
pread_cb(void *ctx, struct filemgr_ops *normal_ops, int fd, void *buf, size_t count, cs_off_t offset)94 ssize_t pread_cb(void *ctx, struct filemgr_ops *normal_ops,
95 int fd, void *buf, size_t count, cs_off_t offset)
96 {
97 storage_t *wctx = (storage_t *)ctx;
98 if (wctx->fflag & PREAD_SLEEP_MASK) {
99 usleep(rand() % PREAD_MAX_SLEEP);
100 }
101 return normal_ops->pread(fd, buf, count, offset);
102 }
103
close_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)104 int close_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)
105 {
106 storage_t *wctx = (storage_t *)ctx;
107 if (wctx->fflag & CLOSE_SLEEP_MASK) {
108 usleep(rand() % CLOSE_MAX_SLEEP);
109 }
110 return normal_ops->close(fd);
111 }
112
fsync_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)113 int fsync_cb(void *ctx, struct filemgr_ops *normal_ops, int fd)
114 {
115 storage_t *wctx = (storage_t *)ctx;
116 if (wctx->fflag & FSYNC_SLEEP_MASK) {
117 usleep(rand() % FSYNC_MAX_SLEEP);
118 }
119 return normal_ops->fsync(fd);
120 }
121
/**
 * Formats a main-index key: the document id zero-padded to eight digits,
 * one space, then the version in decimal (for example "00000042 3").
 *
 * @param buf     Destination buffer. sprintf() is unbounded, so the caller
 *                must provide room for the text plus the terminating NUL.
 * @param i       Document id.
 * @param key_ver Key version; printed as a signed decimal number.
 */
INLINE void make_key(char *buf, int i, int8_t key_ver)
{
    const int version = key_ver;  // same value default promotion would pass

    sprintf(buf, "%08d %d", i, version);
}
125
indexer_set(storage_t *st, int docid, int mutno, bool isdel)126 fdb_status indexer_set(storage_t *st, int docid, int mutno, bool isdel)
127 {
128 TEST_INIT();
129 char mainkey[16], backkey[16];
130 fdb_status s;
131 int8_t key_ver;
132 fdb_doc *rdoc;
133 sprintf(backkey, "%08d", docid);
134 fdb_doc_create(&rdoc, (void *)backkey, strlen(backkey)+1, NULL,0, NULL,0);
135 s = fdb_get(st->back, rdoc);
136 key_ver = st->keymap[docid];
137 bool was_deleted = key_ver < 0 ? true : false;
138
139 if (s == FDB_RESULT_SUCCESS) { // key was already inserted
140 TEST_CHK(!was_deleted); // must be positive for non-deleted keys
141 make_key(mainkey, docid, key_ver);
142 TEST_CMP(rdoc->body, mainkey, rdoc->bodylen);
143 // Use body from back index to delete key from main index
144 s = fdb_del_kv(st->main, rdoc->body, rdoc->bodylen);
145 TEST_CHK(s == FDB_RESULT_SUCCESS);
146 } else { // key does not exist or deleted
147 TEST_CHK(s == FDB_RESULT_KEY_NOT_FOUND);
148 TEST_CHK(key_ver <=0 ); // negative for deleted keys, 0 if new key
149 }
150
151 if (was_deleted) {
152 key_ver = -key_ver; // flip the sign
153 }
154 if (key_ver == CHAR_MAX) {
155 key_ver = 1;
156 } else {
157 key_ver++;
158 }
159
160 if (isdel) { // delete indexed key from database
161 s = fdb_del_kv(st->back, (void*)backkey, strlen(backkey)+1);
162 TEST_CHK(s == FDB_RESULT_SUCCESS);
163 key_ver = -key_ver;
164 } else { // update existing key in database, back index first, then main
165 make_key(mainkey, docid, key_ver);
166 s = fdb_set_kv(st->back, (void*)backkey, strlen(backkey)+1,
167 mainkey, strlen(mainkey)+1);
168 TEST_CHK(s == FDB_RESULT_SUCCESS);
169 s = fdb_set_kv(st->main, (void *) mainkey, strlen(mainkey)+1,
170 NULL, 0);
171 TEST_CHK(s == FDB_RESULT_SUCCESS);
172 }
173
174 if (mutno && (mutno % COMMIT_FREQ) == 0) {
175 s = fdb_commit(st->fhandle, FDB_COMMIT_NORMAL);
176 TEST_CHK(s == FDB_RESULT_SUCCESS);
177 }
178
179 if (mutno && (mutno % SNAPSHOT_FREQ) == 0) {
180 // Take a snapshot of main kv store as well as the key_map for reference
181 spin_lock(&st->lock);
182 st->latest_snap_idx = (st->latest_snap_idx + 1) % MAX_NUM_SNAPSHOTS;
183 st->keymap[docid] = key_ver;
184 if (st->snaps[st->latest_snap_idx].snap) {
185 fdb_kvs_close(st->snaps[st->latest_snap_idx].snap);
186 }
187
188 s = fdb_snapshot_open(st->main, &st->snaps[st->latest_snap_idx].snap,
189 FDB_SNAPSHOT_INMEM);
190 TEST_CHK(s == FDB_RESULT_SUCCESS);
191 memcpy(st->snaps[st->latest_snap_idx]._key_map, st->keymap, NUM_DOCS);
192
193 spin_unlock(&st->lock);
194 } else { // just update the reference map
195 spin_lock(&st->lock);
196 st->keymap[docid] = key_ver;
197 spin_unlock(&st->lock);
198 }
199
200 fdb_doc_free(rdoc);
201
202 return FDB_RESULT_SUCCESS;
203 }
204
205 static void *_writer_thread(void *voidargs)
206 {
207 TEST_INIT();
208 storage_t *db = (storage_t *)voidargs;
209 fdb_status s;
210
211 // open the file and the 3 kv stores - main, back and default
212 s = fdb_kvs_open_default(db->fhandle, &db->def, &db->kvs_config);
213 TEST_CHK(s == FDB_RESULT_SUCCESS);
214 s = fdb_kvs_open(db->fhandle, &db->main, "main", &db->kvs_config);
215 TEST_CHK(s == FDB_RESULT_SUCCESS);
216 s = fdb_kvs_open(db->fhandle, &db->back, "back", &db->kvs_config);
217 TEST_CHK(s == FDB_RESULT_SUCCESS);
218
219 s = fdb_set_log_callback(db->main, logCallbackFunc,
220 (void *) "indexer_patter_main");
221 TEST_CHK(s == FDB_RESULT_SUCCESS);
222 s = fdb_set_log_callback(db->back, logCallbackFunc,
223 (void *) "indexer_patter_back");
224 TEST_CHK(s == FDB_RESULT_SUCCESS);
225 s = fdb_set_log_callback(db->def, logCallbackFunc,
226 (void *) "indexer_patter_default");
227 TEST_CHK(s == FDB_RESULT_SUCCESS);
228
229 for (int i = 0; i < NUM_DOCS; ++i) {
230 indexer_set(db, i, i, false);
231 }
232 printf("--------LOADING COMPLETE------\n");
233 for (int j = 0; j < NUM_WRITER_ITERATIONS * NUM_DOCS; ++j) {
234 int i = rand() % NUM_DOCS;
235 if (rand() % 100 > 80) {
236 indexer_set(db, i, j, true); // delete key
237 } else {
238 indexer_set(db, i, j, false); // update key
239 }
240 }
241
242 thread_exit(0);
243 return NULL;
244 }
245
246 static void *_iterator_thread(void *voidargs)
247 {
248 TEST_INIT();
249 fdb_file_handle *fhandle;
250 fdb_kvs_handle *snapdb;
251 fdb_iterator *it;
252 int start_key;
253 char buf[32];
254 int num_keys;
255 int end_key;
256 fdb_doc *rdoc = NULL;
257 int j = 0;
258 int total_docs_scanned = 0;
259 int snap_idx;
260 fdb_status s;
261
262 storage_t *st = (storage_t *)voidargs;
263 int8_t *key_snap = alca(int8_t, NUM_DOCS);
264
265 s = fdb_open(&fhandle, TEST_FILENAME, &st->fconfig);
266 if (s != FDB_RESULT_SUCCESS) {
267 printf("Iterator failed to open file %s %d\n", TEST_FILENAME, s);
268 TEST_CHK(s == FDB_RESULT_SUCCESS);
269 }
270
271 while (++j) {
272 spin_lock(&st->lock);
273 if (st->shutdown) {
274 spin_unlock(&st->lock);
275 printf("Iterator thread shutting down after %d scans..\n", j);
276 break;
277 }
278 if (!st->snaps[st->latest_snap_idx].snap) {
279 spin_unlock(&st->lock);
280 usleep(10);
281 --j;
282 continue;
283 }
284 // CLONE the latest snapshot from the writer's context...
285 snap_idx = st->latest_snap_idx;
286 memcpy(key_snap, st->snaps[snap_idx]._key_map, NUM_DOCS);
287 s = fdb_snapshot_open(st->snaps[snap_idx].snap, &snapdb,
288 FDB_SNAPSHOT_INMEM);
289 TEST_CHK(s == FDB_RESULT_SUCCESS);
290 spin_unlock(&st->lock);
291
292 start_key = rand() % NUM_DOCS;
293 num_keys = ITERATOR_BATCH_SIZE;
294 end_key = start_key + num_keys;
295 end_key = end_key >= NUM_DOCS ? NUM_DOCS - 1 : end_key;
296
297 make_key(buf, start_key, 0);
298 s = fdb_iterator_init(snapdb, &it, buf, strlen(buf) + 1, NULL, 0,
299 FDB_ITR_NO_DELETES);
300 for (int i = start_key; i < end_key; ++i) {
301 if (key_snap[i] > 0) { // if the key was non-deleted in snapshot
302 TEST_CHK(s == FDB_RESULT_SUCCESS); //next should be SUCCESS
303 s = fdb_iterator_get(it, &rdoc);
304 if (s == FDB_RESULT_ITERATOR_FAIL) {
305 break; // break!
306 }
307 TEST_CHK(s == FDB_RESULT_SUCCESS);
308 make_key(buf, i, key_snap[i]);
309 TEST_CMP(rdoc->key, buf, rdoc->keylen); // validate it
310 fdb_doc_free(rdoc);
311 rdoc = NULL;
312 s = fdb_iterator_next(it);
313 total_docs_scanned++;
314 } else { // this key is not expected to be returned..
315 continue;
316 }
317 }
318 s = fdb_iterator_close(it);
319 TEST_CHK(s == FDB_RESULT_SUCCESS);
320 fdb_kvs_close(snapdb);
321 }
322 fdb_close(fhandle);
323 printf("Total docs scanned = %d\n", total_docs_scanned);
324
325 thread_exit(0);
326 return NULL;
327 }
328
329 void indexer_pattern_test()
330 {
331 TEST_INIT();
332
333 memleak_start();
334 int r;
335 fdb_status s;
336 fdb_config fconfig = fdb_get_default_config();
337 fdb_kvs_config kvs_config = fdb_get_default_kvs_config();
338 char temp[32];
339
340 // SETUP Configurations...
341 NUM_DOCS = 10000;
342 NUM_WRITER_ITERATIONS = 5;
343 COMMIT_FREQ = NUM_DOCS/10;
344 SNAPSHOT_FREQ = 5;
345 ITERATOR_BATCH_SIZE = 10;
346 NUM_ITERATORS = 7;
347 MAX_NUM_SNAPSHOTS = 5;
348 NUM_WRITERS = 1; // Do not bump this up not safe
349
350 fconfig.buffercache_size = 8*1024*1024;
351 fconfig.flags = FDB_OPEN_FLAG_CREATE;
352 fconfig.purging_interval = 0;
353 fconfig.wal_threshold = 4096;
354 fconfig.num_compactor_threads = 1;
355 //fconfig.block_reusing_threshold = 0;
356 //fconfig.num_wal_partitions = 3;
357
358 thread_t *wtid = alca(thread_t, NUM_WRITERS);
359 thread_t *tid = alca(thread_t, NUM_ITERATORS);
360 void **thread_ret = alca(void *, NUM_ITERATORS);
361
362 // Get the default callbacks which result in normal operation for other ops
363 struct anomalous_callbacks *disk_sim_cb = get_default_anon_cbs();
364 storage_t db;
365 memset(&db, 0, sizeof(storage_t));
366 db.keymap = (int8_t *)calloc(NUM_DOCS, sizeof(int8_t));
367 spin_init(&db.lock);
368 db.snaps = (snapshot_t *)calloc(MAX_NUM_SNAPSHOTS, sizeof(snapshot_t));
369 for (int i = 0; i < MAX_NUM_SNAPSHOTS; ++i) {
370 db.snaps[i]._key_map = (int8_t *)calloc(NUM_DOCS, sizeof(int8_t));
371 }
372
373 // Modify the pwrite callback to redirect to test-specific function
374 disk_sim_cb->pwrite_cb = &pwrite_cb;
375 disk_sim_cb->pread_cb = &pread_cb;
376 disk_sim_cb->close_cb = &close_cb;
377 disk_sim_cb->fsync_cb = &fsync_cb;
378
379 // remove previous anomaly_test files
380 r = system(SHELL_DEL" " TEST_FILENAME " > errorlog.txt");
381 (void)r;
382
383 // Reset anomalous behavior stats..
384 filemgr_ops_anomalous_init(disk_sim_cb, &db);
385
386 db.fconfig = fconfig;
387 db.kvs_config = kvs_config;
388
389 s = fdb_open(&db.fhandle, TEST_FILENAME, &db.fconfig);
390 TEST_CHK(s == FDB_RESULT_SUCCESS);
391
392 printf("Num docs %d Num iterations %d Commit freq %d Snapshot freq %d "
393 "Iterator batch %d\n",
394 NUM_DOCS, NUM_WRITER_ITERATIONS, COMMIT_FREQ, SNAPSHOT_FREQ,
395 ITERATOR_BATCH_SIZE);
396 printf("Wal size %" _F64 " Buffercache size %" _F64 "MB\n",
397 fconfig.wal_threshold, fconfig.buffercache_size/1024/1024);
398 for (r = 0; r < NUM_WRITERS; ++r) {
399 thread_create(&wtid[r], _writer_thread, &db);
400 }
401
402 usleep(1);
403
404 for (int i = 0; i < NUM_ITERATORS; ++i) {
405 thread_create(&tid[i], _iterator_thread, &db);
406 }
407
408 for (int i = 0; i < NUM_WRITERS; ++i) {
409 thread_join(wtid[i], &thread_ret[i]);
410 }
411
412 spin_lock(&db.lock);
413 db.shutdown = true;
414 spin_unlock(&db.lock);
415
416 for (int i = 0; i < NUM_ITERATORS; ++i) {
417 thread_join(tid[i], &thread_ret[i]);
418 }
419
420 printf("Done with iterations.. Stats..\n");
421 // also get latency stats..
422 for (int i = 0; i < FDB_LATENCY_NUM_STATS; ++i) {
423 fdb_latency_stat stat;
424 memset(&stat, 0, sizeof(fdb_latency_stat));
425 s = fdb_get_latency_stats(db.fhandle, &stat, i);
426 TEST_CHK(s == FDB_RESULT_SUCCESS);
427 fprintf(stderr, "%s:\t%u\t%u\t%u\t%" _F64 "\n",
428 fdb_latency_stat_name(i),
429 stat.lat_min, stat.lat_avg, stat.lat_max, stat.lat_count);
430 }
431
432 // free all resources
433 fdb_close(db.fhandle);
434 spin_destroy(&db.lock);
435 free(db.keymap);
436 for (int i = 0; i < MAX_NUM_SNAPSHOTS; ++i) {
437 free(db.snaps[i]._key_map);
438 }
439 free(db.snaps);
440 fdb_shutdown();
441
442 memleak_end();
443
444 sprintf(temp, "indexer pattern test:");
445
446 TEST_RESULT(temp);
447 }
448
449 int main(){
450 indexer_pattern_test();
451
452 return 0;
453 }
454