1import re
2import sys
3import datetime
4import time
5import json
6import gevent
7import random
8import argparse
9
10
11# couchbase
12from couchbase.experimental import enable as enable_experimental
13from couchbase.exceptions import NotFoundError, TemporaryFailError, TimeoutError, NetworkError
14enable_experimental()
15from gcouchbase.bucket import Bucket
16import testcfg as cfg
17
18# rabbit
19from librabbitmq import Connection
20from rabbit_helper import RabbitHelper
21
22
23# para
24from gevent import Greenlet, queue
25import threading
26import multiprocessing
27from multiprocessing import Process, Event,  queues
28
29from gevent import monkey
30monkey.patch_all()
31
32#logging
33import logging
34logging.basicConfig(filename='consumer.log',level=logging.DEBUG)
35
# setup parser
parser = argparse.ArgumentParser(description='CB System Test KV Consumer')
parser.add_argument("--cluster", default = cfg.CB_CLUSTER_TAG, help="the CB_CLUSTER_TAG from testcfg will be used by default <default>")

# some global state
CB_CLUSTER_TAG = cfg.CB_CLUSTER_TAG
CLIENTSPERPROCESS = 4  # SDKClient threads spawned inside each SDKProcess
PROCSPERTASK = 4       # SDKProcess workers spawned per workload task
MAXPROCESSES = 16      # upper bound on concurrent worker processes
PROCSSES = {}          # workload id -> list of running SDKProcess workers
46
47
class SDKClient(threading.Thread):
    """Worker thread driving a rate-limited, mixed KV workload (creates,
    updates, gets, deletes and expirations) against one Couchbase bucket.

    Batches of created keys are remembered as {'start', 'end'} key-maps in
    an in-memory queue and periodically flushed to RabbitMQ (the template's
    cc_queue) so the key-space can be shared with other consumers and
    survive client restarts.
    """

    def __init__(self, name, task, e):
        """
        name -- unique key prefix for docs written by this client
        task -- workload spec dict (op counts, bucket, template, queues, ...)
        e    -- shared Event; set while running, cleared by parent to stop us
        """
        threading.Thread.__init__(self)
        self.name = name
        self.i = 0

        # task op counts are totals for the whole workload: divide across
        # every client thread of every process working this task
        self.op_factor = CLIENTSPERPROCESS * PROCSPERTASK
        self.ops_sec = task['ops_sec']
        self.bucket = task['bucket']
        self.password  = task['password']
        self.template = task['template']
        self.default_tsizes = [128, 256]
        self.create_count = task['create_count']/self.op_factor
        self.update_count = task['update_count']/self.op_factor
        self.get_count = task['get_count']/self.op_factor
        self.del_count = task['del_count']/self.op_factor
        self.exp_count = task['exp_count']/self.op_factor
        self.ttl = task['ttl']
        self.miss_perc = task['miss_perc']
        self.active_hosts = task['active_hosts']
        self.batch_size = 5000
        self.memq = queue.Queue()
        self.consume_queue = task['consume_queue']
        self.standalone = task['standalone']
        self.ccq = None
        self.hotkey_batches = []

        if self.consume_queue is not None:
            RabbitHelper().declare(self.consume_queue)

        if task['template']['cc_queues']:
            self.ccq = str(task['template']['cc_queues'][0])  #only supporting 1 now
            RabbitHelper().declare(self.ccq)

        # never batch more keys than we will actually create per cycle
        if self.batch_size > self.create_count:
            self.batch_size = self.create_count

        self.active_hosts = task['active_hosts']
        if not self.active_hosts:
            self.active_hosts = [cfg.COUCHBASE_IP]

        # connect to a random active host; default REST port if none given
        addr = task['active_hosts'][random.randint(0,len(self.active_hosts) - 1)].split(':')
        host = addr[0]
        port = 8091
        if len(addr) > 1:
            port = addr[1]

        self.e = e
        self.cb = None
        self.isterminal = False  # set on unrecoverable errors; run() aborts
        self.done = False

        # build endpoint outside the try so the error log can reference it
        endpoint = "%s:%s/%s" % (host, port, self.bucket)
        try:
            self.cb = Bucket(endpoint, password = self.password)
        except Exception as ex:
            logging.error("[Thread %s] cannot reach %s" %
                          (self.name, endpoint))
            logging.error(ex)
            self.isterminal = True

        logging.info("[Thread %s] started for workload: %s" % (self.name, task['id']))

    def run(self):
        """Main loop: one op cycle per second until the parent clears our
        event, then flush remaining key batches to RabbitMQ."""

        cycle = ops_total = 0
        self.e.set()

        while self.e.is_set():

            start = datetime.datetime.now()

            # do an op cycle
            self.do_cycle()

            if self.isterminal:
                # some error occured during workload; flush (including
                # hotkeys) so the key-space is not lost, then abort
                self.flushq(True)
                sys.exit(-1)

            # sleep out the remainder of the 1-second cycle.
            # NOTE: use total_seconds() - the original read only
            # .microseconds, which ignores whole elapsed seconds
            end = datetime.datetime.now()
            wait = 1 - (end - start).total_seconds()
            if wait > 0:
                time.sleep(wait)
            # else: probably we are overcommitted, but it's ok

            ops_total = ops_total + self.ops_sec
            cycle = cycle + 1

            if (cycle % 120) == 0: # 2 mins
                logging.info("[Thread %s] total ops: %s" % (self.name, ops_total))
                self.flushq()

        self.flushq()
        logging.info("[Thread %s] done!" % (self.name))

    def flushq(self, flush_hotkeys = False):
        """Drain the in-memory key-batch queue to the RabbitMQ cc_queue.

        flush_hotkeys -- also publish the reserved hotkey range so other
                         consumers can reuse those keys
        """

        if self.standalone:
            return

        mq = RabbitHelper()

        if self.ccq is not None:

            logging.info("[Thread %s] flushing %s items to %s" %
                         (self.name, self.memq.qsize(), self.ccq))

            # declare queue
            mq.declare(self.ccq)

            # empty the in memory queue
            while not self.memq.empty():
                try:
                    msg = self.memq.get_nowait()
                    msg = json.dumps(msg)
                    mq.putMsg(self.ccq, msg)
                except queue.Empty:
                    pass

        # hot keys
        if flush_hotkeys and (len(self.hotkey_batches) > 0):

            # try to put onto remote queue.
            # NOTE: named 'target' (not 'queue') - assigning 'queue' here
            # made it function-local and broke 'except queue.Empty' above
            target = self.consume_queue or self.ccq

            if target is not None:
                key_map = {'start' : self.hotkey_batches[0][0],
                           'end' : self.hotkey_batches[-1][-1]}
                msg = json.dumps(key_map)
                mq.putMsg(target, msg)
                self.hotkey_batches = []

    def do_cycle(self):
        """Perform one second's worth of workload operations."""

        # pick a random target doc size for this cycle
        sizes = self.template.get('size') or self.default_tsizes
        t_size = sizes[random.randint(0,len(sizes)-1)]
        self.template['t_size'] = t_size

        if self.create_count > 0:

            count = self.create_count
            docs_to_expire = self.exp_count

            # check if we need to expire some docs
            if docs_to_expire > 0:
                # create an expire batch
                self.mset(self.template, docs_to_expire, ttl = self.ttl)
                count = count - docs_to_expire

            self.mset(self.template, count)

        if self.update_count > 0:
            self.mset_update(self.template, self.update_count)

        if self.get_count > 0:
            self.mget(self.get_count)

        if self.del_count > 0:
            self.mdelete(self.del_count)

    def mset(self, template, count, ttl = 0):
        """Create `count` docs from `template` in batches of batch_size,
        recording each batch's inclusive key range in the memory queue."""
        msg = {}
        keys = []
        cursor = 0

        template = resolveTemplate(template)
        for j in xrange(count):
            self.i = self.i+1
            key = self.name+str(self.i)
            msg[key] = template
            keys.append(key)

            if ((j+1) % self.batch_size) == 0:
                batch = keys[cursor:j+1]
                self._mset(msg, ttl)
                self.memq.put_nowait({'start' : batch[0],
                                      'end'  : batch[-1]})
                msg = {}
                # advance past this batch; the original set cursor = j,
                # which made every following batch overlap by one key
                cursor = j + 1
            elif j == (count -1):
                batch = keys[cursor:]
                self._mset(msg, ttl)
                self.memq.put_nowait({'start' : batch[0],
                                      'end'  : batch[-1]})

    def _mset(self, msg, ttl = 0):
        """Multi-set a dict of key->doc; transient cluster errors are only
        logged, anything unexpected marks the client terminal."""

        try:
            self.cb.set_multi(msg, ttl=ttl)
        except TemporaryFailError:
            logging.warn("temp failure during mset - cluster may be unstable")
        except TimeoutError:
            logging.warn("cluster timed trying to handle mset")
        except NetworkError as nx:
            logging.error("network error")
            logging.error(nx)
        except Exception as ex:
            logging.error(ex)
            self.isterminal = True

    def mset_update(self, template, count):
        """Re-set (update) up to `count` existing docs with a freshly
        resolved template."""

        batches = self.getKeys(count)
        template = resolveTemplate(template)

        for batch in batches:
            # build a fresh payload per batch; the original accumulated
            # keys across batches, re-writing earlier batches every time
            msg = dict((key, template) for key in batch)
            try:
                self.cb.set_multi(msg)
            except NotFoundError as nf:
                logging.error("update key not found!  %s: " % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mset - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except TemporaryFailError:
                logging.warn("temp failure during mset - cluster may be unstable")
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True

    def mget(self, count):
        """Read up to `count` docs; when miss_perc is set, part of the
        reads target stale keys to force cache misses."""

        if self.miss_perc > 0:
            batches = self.getCacheMissKeys(count)
        else:
            batches = self.getKeys(count)

        for batch in batches:
            try:
                self.cb.get_multi(batch)
            except NotFoundError as nf:
                logging.warn("get key not found!  %s: " % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mget - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True

    def mdelete(self, count):
        """Delete up to `count` docs; their key batches are taken off the
        queue without requeueing since the keys are gone for good."""
        batches = self.getKeys(count, requeue = False)

        # delete from buffer
        if len(batches) > 0:
            self._mdelete(batches)

    def _mdelete(self, batches):
        """Multi-delete each batch; returns the number of keys attempted."""
        keys_deleted = 0
        for batch in batches:
            try:
                if len(batch) > 0:
                    keys_deleted = len(batch) + keys_deleted
                    self.cb.delete_multi(batch)
            except NotFoundError as nf:
                logging.warn("get key not found!  %s: " % nf.key)
            except TimeoutError:
                logging.warn("cluster timed out trying to handle mdelete - cluster may be unstable")
            except NetworkError as nx:
                logging.error("network error")
                logging.error(nx)
            except Exception as ex:
                logging.error(ex)
                self.isterminal = True

        return keys_deleted

    def getCacheMissKeys(self, count):
        """Return key batches where the leading batches hold stale keys
        expected to miss the cache (miss_perc percent of `count`)."""

        num_to_miss = int( ((self.miss_perc/float(100)) * count))
        miss_batches = self.getKeys(num_to_miss, force_stale = True)

        if len(self.hotkey_batches) == 0:
            # hotkeys are taken off queue and cannot be reused
            # until workload is flushed
            need = count - num_to_miss
            self.hotkey_batches = self.getKeys(need, requeue = False)

        return miss_batches + self.hotkey_batches

    def getKeys(self, count, requeue = True, force_stale = False):
        """Collect batches of previously-created keys totalling up to
        `count`; stops early when no more key batches are available."""

        keys_retrieved = 0
        batches = []

        while keys_retrieved < count:

            # get keys
            keys = self.getKeysFromQueue(requeue, force_stale = force_stale)

            if len(keys) == 0:
                break

            # in case we got too many keys slice the batch
            need = count - keys_retrieved
            if len(keys) > need:
                keys = keys[:need]

            keys_retrieved = keys_retrieved + len(keys)

            # add to batch
            batches.append(keys)

        return batches

    def getKeysFromQueue(self, requeue = True, force_stale = False):
        """Pop one key_map (remote queue first when force_stale) and expand
        it into its list of keys. Returns [] when nothing is available."""

        keys = []
        key_map = None

        # priority to stale queue
        if force_stale:
            key_map = self.getKeyMapFromRemoteQueue(requeue)

        # fall back to local queue
        if key_map is None:
            key_map = self.getKeyMapFromLocalQueue(requeue)

        if key_map:
            keys = self.keyMapToKeys(key_map)

        return keys

    def keyMapToKeys(self, key_map):
        """Expand {'start': '<prefix>_<i>', 'end': '<prefix>_<j>'} into the
        full inclusive list of keys '<prefix>_<i>' ... '<prefix>_<j>'."""

        keys = []
        # reconstruct key-space
        prefix, start_idx = key_map['start'].split('_')
        prefix, end_idx = key_map['end'].split('_')

        for i in range(int(start_idx), int(end_idx) + 1):
            keys.append(prefix+"_"+str(i))

        return keys

    def fillq(self):
        """Prime the in-memory queue with up to 20 key_maps pulled from
        the remote queue(s); no-op for purely local clients."""

        if (self.consume_queue is None) and (self.ccq is None):
            return

        # put about 20 items into the queue
        for i in xrange(20):
            key_map = self.getKeyMapFromRemoteQueue()
            if key_map:
                self.memq.put_nowait(key_map)

        logging.info("[Thread %s] filled %s items from  %s" %
                     (self.name, self.memq.qsize(), self.consume_queue or self.ccq))

    def getKeyMapFromLocalQueue(self, requeue = True):
        """Pop a key_map from the in-memory queue (optionally putting it
        straight back); triggers a remote refill when the queue is empty."""

        key_map = None

        try:
            key_map = self.memq.get_nowait()
            if requeue:
                self.memq.put_nowait(key_map)
        except queue.Empty:
            # no more items - repopulate from the remote queue
            self.fillq()

        return key_map

    def getKeyMapFromRemoteQueue(self, requeue = True):
        """Fetch one key_map from RabbitMQ, preferring the consume_queue
        and falling back to the cc_queue. Returns None when unavailable."""

        key_map = None
        mq = RabbitHelper()

        # try to fetch from consume queue and
        # fall back to ccqueue
        target = self.consume_queue

        if target is None or mq.qsize(target) == 0:
            target = self.ccq

        # guard: both queues may be unset (standalone-style tasks)
        if target is not None and mq.qsize(target) > 0:
            try:
                key_map = mq.getJsonMsg(target, requeue = requeue )
            except Exception:
                pass

        return key_map
465
466
class SDKProcess(Process):
    """Worker process that owns CLIENTSPERPROCESS SDKClient threads for a
    single workload task, restarting any thread that dies unexpectedly."""

    def __init__(self, id, task):
        """
        id   -- base client id for this process (offset by client index)
        task -- workload spec dict passed through to each SDKClient
        """
        super(SDKProcess, self).__init__()

        self.task = task
        self.id = id
        self.clients = []
        p_id = self.id
        # one stop-event per client; terminate() clears them all
        self.client_events = [Event() for _ in xrange(CLIENTSPERPROCESS)]
        for i in xrange(CLIENTSPERPROCESS):
            name = _random_string(4)+"-"+str(p_id)+str(i)+"_"

            # start client
            client = SDKClient(name, self.task, self.client_events[i])
            self.clients.append(client)

            p_id = p_id + 1

    def run(self):
        """Start all client threads, then act as a watchdog: replace any
        thread that died while its event was still set."""

        logging.info("[Process %s] started workload: %s" % (self.id, self.task['id']))

        # start process clients
        for client in self.clients:
            client.start()

        # monitor running threads and restart if any die
        while True:

            dead_idx = None

            # if we find a dead client - restart it
            for idx, client in enumerate(self.clients):
                if not client.is_alive():

                    # remember WHICH client died; the original deleted
                    # self.clients[0] regardless, dropping a healthy thread
                    dead_idx = idx

                    if client.e.is_set():
                        logging.info("[Thread %s] died" % (client.name))
                        new_client = SDKClient(client.name, self.task, client.e)
                        new_client.start()
                        self.clients.append(new_client)
                        logging.info("[Thread %s] restarting..." % (new_client.name))
                    else:
                        logging.info("[Thread %s] stopped by parent" % (client.name))

                    break

            if dead_idx is not None:
                del self.clients[dead_idx]

            time.sleep(5)

    def terminate(self):
        """Signal every client thread to stop, then kill the process."""
        for e in self.client_events:
            e.clear()

        super(SDKProcess, self).terminate()
        logging.info("[Process %s] terminated workload: %s" % (self.id, self.task['id']))
530
531
532def _random_string(length):
533    return (("%%0%dX" % (length * 2)) % random.getrandbits(length * 8)).encode("ascii")
534
535def _random_int(length):
536    return random.randint(10**(length-1), (10**length) -1)
537
538def _random_float(length):
539    return _random_int(length)/(10.0**(length/2))
540
def kill_nprocs(id_, kill_num = None):
    """Terminate processes attached to workload `id_`.

    kill_num -- number of processes to terminate (default: all of them).
    The workload entry is always removed from PROCSSES, even when only a
    subset of its processes is killed.
    """

    if id_ in PROCSSES:
        procs = PROCSSES[id_]
        del PROCSSES[id_]

        if kill_num is None:  # 'is None', not '== None'
            kill_num = len(procs)

        for proc in procs[:kill_num]:
            proc.terminate()
552
553
def start_client_processes(task, standalone = False):
    """Spawn PROCSPERTASK SDKProcess workers for a workload task and
    archive them in PROCSSES under the workload id."""

    task['standalone'] = standalone
    workload_id = task['id']
    PROCSSES[workload_id] = []

    for proc_num in range(PROCSPERTASK):
        # each process owns a contiguous range of client ids
        worker = SDKProcess(proc_num * CLIENTSPERPROCESS, task)
        worker.start()
        PROCSSES[workload_id].append(worker)
572
573
def init(message):
    """RabbitMQ consumer callback: parse a workload task message and
    start or stop its worker processes.

    The 'init' handshake message published at startup is ignored.
    Errors are reported via logging for consistency with the rest of
    the module (the original used py2 print statements here).
    """
    body = message.body
    task = None

    if str(body) == 'init':
        return

    try:
        task = json.loads(str(body))
    except Exception:
        logging.error("Unable to parse workload task")
        logging.error(body)
        return

    if task['active'] == False:
        # stop processes running a workload
        workload_id = task['id']
        kill_nprocs(workload_id)
    else:
        try:
            start_client_processes(task)
        except Exception as ex:
            logging.error("Unable to start workload processes")
            logging.error(ex)
603
604
605
606
def resolveTemplate(template):
    """Materialize a document template into a concrete KV document.

    String values containing '$str<N>', '$int<N>', '$flo<N>' or '$boo'
    markers are replaced with random values of the requested type and
    length (length defaults to 5). Lists and nested dicts are resolved
    recursively. The result is padded with a random string when its
    rough size estimate falls below template['t_size'].
    """

    conversionFuncMap = {
        'str' : lambda n : _random_string(n),
        'int' : lambda n : _random_int(n),
        'flo' : lambda n : _random_float(n),
        'boo' : lambda n : (True, False)[random.randint(0,1)],
    }

    def convToType(val):
        # e.g. "id-$int8-x" -> "id-12345678-x"
        mObj = re.search(r"(.*)(\$)([A-Za-z]+)(\d+)?(.*)", val)
        if mObj:
            prefix, magic, type_, len_, suffix = mObj.groups()
            len_ = len_ or 5
            if type_ in conversionFuncMap:  # direct membership, not .keys()
                val = conversionFuncMap[type_](int(len_))
                val = "{}{}{}".format(prefix, val, suffix)

        return val

    def resolveList(li):
        # resolve each element; recurse into nested lists
        rc = []
        for item in li:
            val = item
            if isinstance(item, list):
                val = resolveList(item)
            elif item and isinstance(item, str) and '$' in item:
                val = convToType(item)
            rc.append(val)

        return rc

    def resolveDict(di):
        # resolve each value; recurse into nested dicts/lists.
        # items() behaves the same as iteritems() here and stays portable
        rc = {}
        for k, v in di.items():
            val = v
            if isinstance(v, dict):
                val = resolveDict(v)
            elif isinstance(v, list):
                val = resolveList(v)
            elif v and isinstance(v, str) and '$' in v:
                val = convToType(v)
            rc[k] = val
        return rc

    t_size = template['t_size']
    kv_template = resolveDict(template['kv'])
    # crude size estimate (getsizeof is shallow); floor division keeps an
    # int so the padding length below stays integral
    kv_size = sys.getsizeof(kv_template)//8
    if kv_size < t_size:
        padding = _random_string(t_size - kv_size)
        kv_template.update({"padding" : padding})

    return kv_template
659
def main():
    """Entry point: bind an exclusive queue to the cluster's fanout
    consumer exchange and dispatch workload messages to init() forever."""

    args = parser.parse_args()
    cluster_tag = args.cluster
    exchange = cluster_tag + "consumers"

    # setup to consume messages from worker
    mq = RabbitHelper()
    mq.exchange_declare(exchange, "fanout")
    declared = mq.declare()
    queue_name = declared[0]

    # bind to exchange and announce ourselves
    mq.bind(exchange, queue_name)
    mq.putMsg('', 'init', exchange)

    # consume messages
    channel, conn = mq.channel()
    channel.basic_consume(callback = init, queue = queue_name, no_ack = True)

    while True:
        conn.drain_events()
681
682
# run the consumer loop when executed as a script
if __name__ == "__main__":
    main()
685
686