import re
import sys
import datetime
import time
import json
import gevent
import random
import argparse


# couchbase
from couchbase.experimental import enable as enable_experimental
from couchbase.exceptions import NotFoundError, TemporaryFailError, TimeoutError, NetworkError
enable_experimental()
from gcouchbase.bucket import Bucket
import testcfg as cfg

# rabbit
from librabbitmq import Connection
from rabbit_helper import RabbitHelper


# para
from gevent import Greenlet, queue
import threading
import multiprocessing
from multiprocessing import Process, Event, queues

from gevent import monkey
monkey.patch_all()

# logging
import logging
logging.basicConfig(filename='consumer.log', level=logging.DEBUG)

# setup parser
parser = argparse.ArgumentParser(description='CB System Test KV Consumer')
parser.add_argument("--cluster", default=cfg.CB_CLUSTER_TAG,
                    help="cluster tag to consume tasks for (defaults to CB_CLUSTER_TAG from testcfg)")

# some global state
CB_CLUSTER_TAG = cfg.CB_CLUSTER_TAG
CLIENTSPERPROCESS = 4
PROCSPERTASK = 4
MAXPROCESSES = 16
PROCESSES = {}


class SDKClient(threading.Thread):

    def __init__(self, name, task, e):
        threading.Thread.__init__(self)
        self.name = name
        self.i = 0

        # each workload is fanned out over PROCSPERTASK processes with
        # CLIENTSPERPROCESS client threads apiece, so every client
        # performs 1/op_factor of the requested operations
        self.op_factor = CLIENTSPERPROCESS * PROCSPERTASK
        self.ops_sec = task['ops_sec']
        self.bucket = task['bucket']
        self.password = task['password']
        self.template = task['template']
        self.default_tsizes = [128, 256]
        self.create_count = task['create_count'] / self.op_factor
        self.update_count = task['update_count'] / self.op_factor
        self.get_count = task['get_count'] / self.op_factor
        self.del_count = task['del_count'] / self.op_factor
        self.exp_count = task['exp_count'] / self.op_factor
        self.ttl = task['ttl']
        self.miss_perc = task['miss_perc']
        self.batch_size = 5000
        self.memq = queue.Queue()
        self.consume_queue = task['consume_queue']
        self.standalone = task['standalone']
        self.ccq = None
        self.hotkey_batches = []

        if self.consume_queue is not None:
            RabbitHelper().declare(self.consume_queue)

        if task['template']['cc_queues']:
            self.ccq = str(task['template']['cc_queues'][0])  # only supporting 1 now
            RabbitHelper().declare(self.ccq)

        if self.batch_size > self.create_count:
            self.batch_size = self.create_count

        self.active_hosts = task['active_hosts']
        if not self.active_hosts:
            self.active_hosts = [cfg.COUCHBASE_IP]

        # pick a random host; index self.active_hosts (not the raw task value)
        # so the cfg.COUCHBASE_IP fallback above is honored
        addr = self.active_hosts[random.randint(0, len(self.active_hosts) - 1)].split(':')
        host = addr[0]
        port = 8091
        if len(addr) > 1:
            port = addr[1]

        self.e = e
        self.cb = None
        self.isterminal = False
        self.done = False

        endpoint = "%s:%s/%s" % (host, port, self.bucket)
        try:
            self.cb = Bucket(endpoint, password=self.password)
        except Exception as ex:
            logging.error("[Thread %s] cannot reach %s" % (self.name, endpoint))
            logging.error(ex)
            self.isterminal = True

        logging.info("[Thread %s] started for workload: %s" % (self.name, task['id']))

    def run(self):

        cycle = ops_total = 0
        self.e.set()

        while self.e.is_set():

            start = datetime.datetime.now()

            # do an op cycle
            self.do_cycle()

            if self.isterminal:
                # some error occurred during the workload
                self.flushq(True)
                exit(-1)

            # wait till the next one-second cycle boundary
            end = datetime.datetime.now()
            wait = 1 - (end - start).microseconds / float(1000000)
            if wait > 0:
                time.sleep(wait)
            # otherwise we are probably overcommitted, which is ok

            ops_total = ops_total + self.ops_sec
            cycle = cycle + 1

            if (cycle % 120) == 0:  # 2 mins
                logging.info("[Thread %s] total ops: %s" % (self.name, ops_total))
                self.flushq()

        self.flushq()
        logging.info("[Thread %s] done!" % self.name)
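    # Note: entries on self.memq are key-space descriptors rather than key
    # lists, e.g. {'start': 'a3f2-00_1', 'end': 'a3f2-00_5000'} (values
    # hypothetical); keyMapToKeys() below expands a descriptor back into
    # individual keys, which keeps queue messages small at any batch size.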
    def flushq(self, flush_hotkeys=False):

        if self.standalone:
            return

        mq = RabbitHelper()

        if self.ccq is not None:

            logging.info("[Thread %s] flushing %s items to %s" %
                         (self.name, self.memq.qsize(), self.ccq))

            # declare queue
            mq.declare(self.ccq)

            # empty the in-memory queue
            while not self.memq.empty():
                try:
                    msg = self.memq.get_nowait()
                    msg = json.dumps(msg)
                    mq.putMsg(self.ccq, msg)
                except queue.Empty:
                    pass

        # hot keys
        if flush_hotkeys and len(self.hotkey_batches) > 0:

            # try to put onto the remote queue; a distinct local name is used
            # here so the gevent.queue module referenced above is not shadowed
            remote_queue = self.consume_queue or self.ccq

            if remote_queue is not None:
                key_map = {'start': self.hotkey_batches[0][0],
                           'end': self.hotkey_batches[-1][-1]}
                msg = json.dumps(key_map)
                mq.putMsg(remote_queue, msg)
                self.hotkey_batches = []

    def do_cycle(self):

        sizes = self.template.get('size') or self.default_tsizes
        t_size = sizes[random.randint(0, len(sizes) - 1)]
        self.template['t_size'] = t_size

        if self.create_count > 0:

            count = self.create_count
            docs_to_expire = self.exp_count

            # check if we need to expire some docs
            if docs_to_expire > 0:
                # create an expire batch
                self.mset(self.template, docs_to_expire, ttl=self.ttl)
                count = count - docs_to_expire

            self.mset(self.template, count)

        if self.update_count > 0:
            self.mset_update(self.template, self.update_count)

        if self.get_count > 0:
            self.mget(self.get_count)

        if self.del_count > 0:
            self.mdelete(self.del_count)

    def mset(self, template, count, ttl=0):
        msg = {}
        keys = []
        cursor = 0

        template = resolveTemplate(template)
        for j in xrange(count):
            self.i = self.i + 1
            msg[self.name + str(self.i)] = template
            keys.append(self.name + str(self.i))

            if ((j + 1) % self.batch_size) == 0:
                batch = keys[cursor:j + 1]
                self._mset(msg, ttl)
                self.memq.put_nowait({'start': batch[0],
                                      'end': batch[-1]})
                msg = {}
                cursor = j + 1  # advance past the batch just flushed
            elif j == (count - 1):
                batch = keys[cursor:]
                self._mset(msg, ttl)
                self.memq.put_nowait({'start': batch[0],
                                      'end': batch[-1]})

    def _mset(self, msg, ttl=0):

        try:
            self.cb.set_multi(msg, ttl=ttl)
        except TemporaryFailError:
            logging.warn("temp failure during mset - cluster may be unstable")
        except TimeoutError:
            logging.warn("cluster timed out trying to handle mset")
        except NetworkError as nx:
            logging.error("network error")
            logging.error(nx)
        except Exception as ex:
            logging.error(ex)
            self.isterminal = True

    def mset_update(self, template, count):

        batches = self.getKeys(count)
        template = resolveTemplate(template)

        for batch in batches:
            msg = {}  # rebuild per batch so earlier batches are not resent
            try:
                for key in batch:
                    msg[key] = template
                self.cb.set_multi(msg)
            except NotFoundError as nf:
                logging.error("update key not found: %s" % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mset - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except TemporaryFailError:
                logging.warn("temp failure during mset - cluster may be unstable")
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True
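    # Read mix sketch: with miss_perc = 20 and get_count = 1000, mget() asks
    # getCacheMissKeys() for int(0.20 * 1000) = 200 keys from the stale remote
    # queue (likely cache misses) and fills the remaining 800 reads from the
    # locally held hot-key batches.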
    def mget(self, count):

        if self.miss_perc > 0:
            batches = self.getCacheMissKeys(count)
        else:
            batches = self.getKeys(count)

        for batch in batches:
            try:
                self.cb.get_multi(batch)
            except NotFoundError as nf:
                logging.warn("get key not found: %s" % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mget - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True

    def mdelete(self, count):

        # delete keys without requeueing them for reuse
        batches = self.getKeys(count, requeue=False)
        if len(batches) > 0:
            self._mdelete(batches)

    def _mdelete(self, batches):
        keys_deleted = 0
        for batch in batches:
            try:
                if len(batch) > 0:
                    keys_deleted = keys_deleted + len(batch)
                    self.cb.delete_multi(batch)
            except NotFoundError as nf:
                logging.warn("delete key not found: %s" % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mdelete - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True

        return keys_deleted

    def getCacheMissKeys(self, count):

        # returns batches of keys where the leading batches contain
        # stale keys expected to miss the cache
        num_to_miss = int((self.miss_perc / float(100)) * count)
        miss_batches = self.getKeys(num_to_miss, force_stale=True)

        if len(self.hotkey_batches) == 0:
            # hotkeys are taken off the queue and cannot be reused
            # until the workload is flushed
            need = count - num_to_miss
            self.hotkey_batches = self.getKeys(need, requeue=False)

        return miss_batches + self.hotkey_batches

    def getKeys(self, count, requeue=True, force_stale=False):

        keys_retrieved = 0
        batches = []

        while keys_retrieved < count:

            # get keys
            keys = self.getKeysFromQueue(requeue, force_stale=force_stale)

            if len(keys) == 0:
                break

            # in case we got too many keys, slice the batch
            need = count - keys_retrieved
            if len(keys) > need:
                keys = keys[:need]

            keys_retrieved = keys_retrieved + len(keys)

            # add to batch
            batches.append(keys)

        return batches

    def getKeysFromQueue(self, requeue=True, force_stale=False):

        # get a key mapping and convert it to keys
        keys = []
        key_map = None

        # priority to the stale (remote) queue
        if force_stale:
            key_map = self.getKeyMapFromRemoteQueue(requeue)

        # fall back to the local queue
        if key_map is None:
            key_map = self.getKeyMapFromLocalQueue(requeue)

        if key_map:
            keys = self.keyMapToKeys(key_map)

        return keys
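    # Example expansion (hypothetical key names): a key_map of
    #   {'start': 'ab12-00_7', 'end': 'ab12-00_9'}
    # yields ['ab12-00_7', 'ab12-00_8', 'ab12-00_9'] via keyMapToKeys().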
    def keyMapToKeys(self, key_map):

        keys = []
        # reconstruct the key-space from its start/end markers
        prefix, start_idx = key_map['start'].split('_')
        prefix, end_idx = key_map['end'].split('_')

        for i in range(int(start_idx), int(end_idx) + 1):
            keys.append(prefix + "_" + str(i))

        return keys

    def fillq(self):

        if (self.consume_queue is None) and (self.ccq is None):
            return

        # put about 20 items into the queue
        for i in xrange(20):
            key_map = self.getKeyMapFromRemoteQueue()
            if key_map:
                self.memq.put_nowait(key_map)

        logging.info("[Thread %s] filled %s items from %s" %
                     (self.name, self.memq.qsize(), self.consume_queue or self.ccq))

    def getKeyMapFromLocalQueue(self, requeue=True):

        key_map = None

        try:
            key_map = self.memq.get_nowait()
            if requeue:
                self.memq.put_nowait(key_map)
        except queue.Empty:
            # no more items; try to refill from the remote queue
            self.fillq()

        return key_map

    def getKeyMapFromRemoteQueue(self, requeue=True):

        key_map = None
        mq = RabbitHelper()

        # try to fetch from the consume queue and
        # fall back to the cc queue
        remote_queue = self.consume_queue

        if remote_queue is None or mq.qsize(remote_queue) == 0:
            remote_queue = self.ccq

        if remote_queue is not None and mq.qsize(remote_queue) > 0:
            try:
                key_map = mq.getJsonMsg(remote_queue, requeue=requeue)
            except Exception:
                pass

        return key_map


class SDKProcess(Process):

    def __init__(self, id, task):

        super(SDKProcess, self).__init__()

        self.task = task
        self.id = id
        self.clients = []
        p_id = self.id
        self.client_events = [Event() for e in xrange(CLIENTSPERPROCESS)]

        for i in xrange(CLIENTSPERPROCESS):
            name = _random_string(4) + "-" + str(p_id) + str(i) + "_"

            # create client (started in run())
            client = SDKClient(name, self.task, self.client_events[i])
            self.clients.append(client)

            p_id = p_id + 1

    def run(self):

        logging.info("[Process %s] started workload: %s" % (self.id, self.task['id']))

        # start process clients
        for client in self.clients:
            client.start()

        # monitor running threads and restart any that die
        while True:

            dead_idx = None

            for idx, client in enumerate(self.clients):
                if not client.is_alive():

                    # remember which client died so it can be dropped below
                    dead_idx = idx

                    if client.e.is_set():
                        logging.info("[Thread %s] died" % client.name)
                        new_client = SDKClient(client.name, self.task, client.e)
                        new_client.start()
                        self.clients.append(new_client)
                        logging.info("[Thread %s] restarting..." % new_client.name)
                    else:
                        logging.info("[Thread %s] stopped by parent" % client.name)

                    break

            if dead_idx is not None:
                del self.clients[dead_idx]

            time.sleep(5)

    def terminate(self):
        for e in self.client_events:
            e.clear()

        super(SDKProcess, self).terminate()
        logging.info("[Process %s] terminated workload: %s" % (self.id, self.task['id']))
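# Sample helper outputs (values are random; shown for illustration only):
#   _random_string(4) -> '3FA9B2C1'   # 4 random bytes, hex-encoded to 8 chars
#   _random_int(5)    -> 48213        # uniform in [10000, 99999]
#   _random_float(4)  -> 51.32        # _random_int(4) scaled down by 10**2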
def _random_string(length):
    # 'length' random bytes, hex-encoded to a 2*length character string
    return (("%%0%dX" % (length * 2)) % random.getrandbits(length * 8)).encode("ascii")


def _random_int(length):
    return random.randint(10 ** (length - 1), (10 ** length) - 1)


def _random_float(length):
    return _random_int(length) / (10.0 ** (length / 2))


def kill_nprocs(id_, kill_num=None):

    if id_ in PROCESSES:
        procs = PROCESSES[id_]
        del PROCESSES[id_]

        if kill_num is None:
            kill_num = len(procs)

        for i in range(kill_num):
            procs[i].terminate()


def start_client_processes(task, standalone=False):

    task['standalone'] = standalone
    workload_id = task['id']
    PROCESSES[workload_id] = []

    for i in range(PROCSPERTASK):

        # set process id and provide queue
        p_id = i * CLIENTSPERPROCESS
        p = SDKProcess(p_id, task)

        # start
        p.start()

        # archive
        PROCESSES[workload_id].append(p)


def init(message):
    body = message.body

    if str(body) == 'init':
        return

    try:
        task = json.loads(str(body))
    except Exception:
        print "Unable to parse workload task"
        print body
        return

    if task['active'] == False:
        # stop processes running this workload
        kill_nprocs(task['id'])
    else:
        try:
            start_client_processes(task)
        except Exception as ex:
            print "Unable to start workload processes"
            print ex


def resolveTemplate(template):

    conversionFuncMap = {
        'str': lambda n: _random_string(n),
        'int': lambda n: _random_int(n),
        'flo': lambda n: _random_float(n),
        'boo': lambda n: (True, False)[random.randint(0, 1)],
    }

    def convToType(val):
        mObj = re.search(r"(.*)(\$)([A-Za-z]+)(\d+)?(.*)", val)
        if mObj:
            prefix, magic, type_, len_, suffix = mObj.groups()
            len_ = len_ or 5
            if type_ in conversionFuncMap:
                val = conversionFuncMap[type_](int(len_))
                val = "{0}{1}{2}".format(prefix, val, suffix)

        return val

    def resolveList(li):
        rc = []
        for item in li:
            val = item
            if isinstance(item, list):
                val = resolveList(item)
            elif item and isinstance(item, basestring) and '$' in item:
                # basestring covers both str and unicode (JSON-decoded) values
                val = convToType(item)
            rc.append(val)
        return rc

    def resolveDict(di):
        rc = {}
        for k, v in di.iteritems():
            val = v
            if isinstance(v, dict):
                val = resolveDict(v)
            elif isinstance(v, list):
                val = resolveList(v)
            elif v and isinstance(v, basestring) and '$' in v:
                val = convToType(v)
            rc[k] = val
        return rc

    t_size = template['t_size']
    kv_template = resolveDict(template['kv'])
    kv_size = sys.getsizeof(kv_template) / 8

    # pad the document out to the requested size
    if kv_size < t_size:
        padding = _random_string(t_size - kv_size)
        kv_template.update({"padding": padding})

    return kv_template
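# Template resolution sketch (field names hypothetical): given
#   {'kv': {'name': 'user-$str2', 'age': '$int2', 'admin': '$boo'}, 't_size': 128}
# resolveTemplate() might return
#   {'name': 'user-90AF', 'age': '73', 'admin': 'True', 'padding': '...'}
# note that matched '$' tokens resolve to strings, because convToType()
# re-joins prefix, generated value, and suffix with str.format().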
def main():
    # update the module-level tag (the original local assignment was lost)
    global CB_CLUSTER_TAG

    args = parser.parse_args()
    CB_CLUSTER_TAG = args.cluster
    exchange = CB_CLUSTER_TAG + "consumers"

    # setup to consume messages from the worker
    mq = RabbitHelper()
    mq.exchange_declare(exchange, "fanout")
    queue_name = mq.declare()[0]

    # bind to exchange
    mq.bind(exchange, queue_name)
    mq.putMsg('', 'init', exchange)

    # consume messages
    channel, conn = mq.channel()
    channel.basic_consume(callback=init, queue=queue_name, no_ack=True)

    while True:
        conn.drain_events()


if __name__ == "__main__":
    main()
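# Example invocation, assuming this module is saved as consumer.py and a
# RabbitMQ broker plus testcfg.py are available:
#   python consumer.py --cluster default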