1import argparse
2import json
3from rabbit_helper import RabbitHelper
4import uuid
5import time
6import copy
7
# Root parser of the tool. The name of the selected top-level command is
# stored on the parsed namespace under the attribute "subparsers".
parser = argparse.ArgumentParser(
    description='CB System Test Tool',
)
subparser = parser.add_subparsers(dest="subparsers")
10
def add_modifier_args(parser):
    """Attach the workload-modifier options to ``parser``.

    Registers --cc_queues, --consume_queue, --precondition, --postcondition
    and --wait. --wait takes exactly three integers (hour, min, sec).
    """
    add = parser.add_argument

    add("--cc_queues", nargs='+', help="queues to copy created keys into")
    add("--consume_queue", help="queue with keys to get/update/delete")
    add("--precondition",
        help="required stat or cluster state required before running workload")
    add("--postcondition",
        help="required stat or cluster state required to complete workload")
    add("--wait",
        type=int,
        nargs=3,
        metavar=('HOUR', 'MIN', 'SEC'),
        help="time to wait before starting workload: <hour> <min> <sec>")
# deprecated
18#parser.add_argument("--expires",nargs=3,  help="time to wait before terminating workload: <hour> <min> <sec>", metavar = ('HOUR','MIN','SEC'), type=int)
19
def add_broker_arg(parser):
    """Attach the broker connection options to ``parser``.

    --broker   ip address of the broker.
    --cluster  suffix appended to the broker queue names; defaults to
               "default".
    """
    parser.add_argument("--broker", help="ip address of broker used to consume options")
    # NOTE(review): the default was previously spelled "deafult"; confirm the
    # broker-side consumers listen on the "default" suffix.
    parser.add_argument("--cluster", default="default", help="queue suffix within broker used to consume option")
23
def add_template_parser(parent):
    """Register the "template" sub-command on ``parent``.

    ``parent`` is a sub-parsers action (the object returned by
    ``add_subparsers()``). The sub-command accepts the broker options plus
    the document-template options below, and dispatches to
    ``import_template``.
    """
    parser = parent.add_parser("template")

    add_broker_arg(parser)
    parser.add_argument("--name",     help="template name", required = True)
    # type=int keeps values given on the command line the same type as the
    # integer defaults (argparse would otherwise hand back strings).
    parser.add_argument("--ttl",      default=0, type=int, help="document expires time")
    parser.add_argument("--flags",    default=0, type=int, help="document create flags")
    parser.add_argument("--cc_queues",nargs='+', help="queues to copy created keys into")
    parser.add_argument("--kvpairs",   nargs='+', help="list of kv items i.e=> state:ca,age:28,company:cb")
    parser.add_argument("--type",    help="json/non-json default is json", default="json")
    parser.add_argument("--size", nargs='+',    help="size of documents. padding is used if necessary")

    # TODO: parser.add_argument("--blobs",   nargs='+', help="data strings for non-json docs")
    parser.set_defaults(handler=import_template)
41
42
def add_workload_parser(parent):
    """Register the "workload" sub-command on ``parent``.

    ``parent`` is a sub-parsers action. The sub-command accepts the broker
    options, the operation mix (percentages and ops per second), the
    standalone-loader options and the workload modifier options, and
    dispatches to ``run_workload``.
    """
    parser = parent.add_parser("workload")

    add_broker_arg(parser)
    parser.add_argument("--name",    help="predefind workload", default="default")
    parser.add_argument("--bucket",  help="bucket", default="default")
    parser.add_argument("--password", help="password", default="")
    parser.add_argument("--ops",     help="ops per sec", default=0, type=int)
    parser.add_argument("--create",  help="percentage of creates 0-100", default=0, type=int)
    parser.add_argument("--update",  help="percentage of updates 0-100", default=0, type=int)
    parser.add_argument("--get",     help="percentage of gets 0-100", default=0, type=int)
    parser.add_argument("--miss",    help="percentage of misses 0-100", default=0, type=int)
    parser.add_argument("--expire",  help="percentage of expirations 0-100", default=0, type=int)
    # type=int so a ttl given on the command line has the same type as the
    # integer default instead of arriving as a string.
    parser.add_argument("--ttl",      default=15, type=int, help="document expires time to use when expirations set")
    parser.add_argument("--delete",  help="percentage of deletes 0-100", default=0, type=int)
    parser.add_argument("--template",help="predefined template to use", default="default")
    parser.add_argument("--standalone",help="run without broker",action='store_true')
    parser.add_argument("--hosts",  default=["127.0.0.1"],  nargs='+', help="couchbase hosts for use with standalone")
    parser.add_argument("--padding",  default="", help="you can put a custom string here when using standalone loader")
    add_modifier_args(parser)

    parser.set_defaults(handler=run_workload)
65
def add_admin_parser(parent):
    """Register the "admin" sub-command on ``parent``.

    Adds the broker options, five string options that default to '' and the
    boolean --only_failover flag, then dispatches to ``perform_admin_tasks``.
    """
    admin = parent.add_parser("admin")
    add_broker_arg(admin)

    def add_text_option(option):
        # string-valued option whose help text is simply its own name
        admin.add_argument("--" + option, help=option, default='', type=str)

    add_text_option("rebalance_in")
    add_text_option("rebalance_out")
    add_text_option("failover")
    admin.add_argument("--only_failover", help="only_failover",
                       action='store_true', default=False)
    add_text_option("soft_restart")
    add_text_option("hard_restart")

    admin.set_defaults(handler=perform_admin_tasks)
78
def add_xdcr_parser(parent):
    """Register the "xdcr" sub-command on ``parent``.

    Adds the broker options and the destination-cluster options (all
    strings), then dispatches to ``perform_xdcr_tasks``.
    """
    xdcr = parent.add_parser("xdcr")
    add_broker_arg(xdcr)

    # (flag, help text, default value)
    options = (
        ("--dest_cluster_ip", "Dest. cluster ip", ''),
        ("--dest_cluster_username", "Dest. cluster rest username", 'Administrator'),
        ("--dest_cluster_pwd", "Dest. cluster rest pwd", 'password'),
        ("--dest_cluster_name", "Dest. cluster name", ''),
        ("--replication_type", "unidirection or bidirection", 'unidirection'),
    )
    for flag, help_text, default in options:
        xdcr.add_argument(flag, help=help_text, default=default, type=str)

    xdcr.set_defaults(handler=perform_xdcr_tasks)
89
def add_query_parser(parent):
    """Register the "query" sub-command on ``parent``.

    --ddoc and --view are required; everything else has a default or is
    left as None. Dispatches to ``perform_query_tasks``.
    """
    query = parent.add_parser("query")
    add_broker_arg(query)

    add = query.add_argument

    # what to query
    add("--ddoc", type=str, required=True, help="Design Document")
    add("--view", type=str, required=True, help="Name of view")
    add("--bucket", type=str, default="default",
        help="Bucket with documents to query")
    add("--password", type=str, default="", help="Sasl password of bucket")
    add("--queries_per_sec", type=int, default=1, metavar='N',
        help="Queries per second")
    add("--indexed_key", metavar="KEY",
        help="the key from kvpair being indexed in this query")

    # which query parameters are included / excluded
    add("--include_filters", nargs='+', metavar="",
        default=["startkey", "endkey", "limit"],
        help="<startkey_docid, endkey_docid, descending, stale_ok, stale_false>")
    add("--exclude_filters", nargs='+', metavar="", default=[],
        help="<startkey, endkey, limit>")

    # manual overrides for individual query parameters
    add("--startkey", type=str,
        help="manually specify value for startkey <default=auto>")
    add("--endkey", type=str,
        help="manually specify value for endkey <default=auto>")
    add("--startkey_docid", type=str,
        help="manually specify value for startkey_docid <default=auto>")
    add("--endkey_docid", type=str,
        help="manually specify value for endkey_docid <default=auto>")
    add("--limit", type=int, default=10,
        help="number of rows in query results")

    query.set_defaults(handler=perform_query_tasks)
108
def add_test_parser(parent):
    """Register the "test" sub-command on ``parent``.

    Adds the broker options plus --name, --fromfile and --filesuffix, and
    dispatches to ``run_systemtest``.
    """
    test = parent.add_parser("test")
    add_broker_arg(test)

    add = test.add_argument
    add("--name",
        help="name of remote test or runlist to start, found in <pysystests>/tests/<name>.js directory")
    add("--fromfile", help="launch a test from local file configuration")
    add("--filesuffix", metavar='js', default="js",
        help="suffix appened to file when using 'name' arg")

    test.set_defaults(handler=run_systemtest)
119
def setup_run_parser():
    """Create the top-level 'run' command and all of its sub-commands."""
    run_commands = subparser.add_parser('run').add_subparsers()

    # registration order is the order the sub-commands were declared in
    registrars = (
        add_workload_parser,
        add_admin_parser,
        add_xdcr_parser,
        add_query_parser,
        add_test_parser,
    )
    for register in registrars:
        register(run_commands)
128
def setup_import_parser():
    """Create the top-level 'import' command with its 'template' sub-command."""
    import_commands = subparser.add_parser('import').add_subparsers()
    add_template_parser(import_commands)
133
134
def setup_list_parser():
    """Create the top-level 'list' command.

    Declares three positional arguments: workloads, templates and tests.
    """
    listing = subparser.add_parser('list')

    # (positional name, help text)
    positionals = (
        ('workloads', 'list pre-defined workloads'),
        ('templates', 'list pre-defined document templates'),
        ('tests', 'list pre-defined tests'),
    )
    for name, help_text in positionals:
        listing.add_argument(name, help=help_text)
140
def conv_to_secs(list_):
    """Convert an ``[hours, minutes, seconds]`` sequence to total seconds.

    Only the first three items of ``list_`` are read.
    """
    hours, minutes, seconds = list_[0], list_[1], list_[2]
    return hours*60*60 + minutes*60 + seconds
143
def getResponseQueue(handler):
    """Declare a uniquely named reply queue on ``handler`` and return its name.

    The name is "rc_" followed by the first seven characters of a random
    UUID. ``handler`` must provide ``declare(queue_name)``.
    """
    suffix = str(uuid.uuid4())[:7]
    queue_name = "rc_" + suffix
    handler.declare(queue_name)
    return queue_name
148
def receiveResponse(handle, rc_queue, tries = 5):
    """Poll ``rc_queue`` for a reply, print it, and delete the queue.

    ``handle`` must provide ``getMsg(queue)`` (returning None when no message
    is available) and ``delete(queue)``. The queue is polled up to ``tries``
    times with a one second pause after each empty poll. If no reply
    arrives, a notice is printed instead.

    The queue is deleted in every case, including when polling raises, so a
    failing broker call does not leave the temporary queue behind.
    """
    try:
        while tries > 0:
            rc = handle.getMsg(rc_queue)
            if rc is not None:
                print(rc)
                return
            tries = tries - 1
            time.sleep(1)
        print("no response received from broker")
    finally:
        handle.delete(rc_queue)
160
def run_workload(args):
    """Start a workload, either locally or through the broker.

    With ``args.standalone`` set, the args are converted to a task with
    ``argsToTask`` and handed to ``consumer.start_client_processes``.
    Otherwise a workload message is published to the "workload_<cluster>"
    queue and the broker's reply is printed.

    ``args.wait`` (``[hour, min, sec]`` or None) is converted to seconds for
    the message; ``args`` itself is left unmodified so the same namespace
    can be passed in again.
    """
    # TODO: read in workload params from saved store when args.name is set
    # workload.update(cached_workload)

    cluster = args.cluster

    if args.standalone:
        # imported lazily: only the standalone loader needs the consumer module
        from consumer import start_client_processes
        task = argsToTask(args)
        start_client_processes(task, standalone = True)
        return

    wait = None
    if args.wait is not None:
        wait = conv_to_secs(args.wait)

    workload = { "bucket"      : args.bucket,
                 "password"    : args.password,
                 "ops_per_sec" : args.ops,
                 "create_perc" : args.create,
                 "update_perc" : args.update,
                 "get_perc"    : args.get,
                 "del_perc"    : args.delete,
                 "exp_perc"    : args.expire,
                 "miss_perc"   : args.miss,
                 "ttl"         : args.ttl,
                 "cc_queues"   : args.cc_queues,
                 "consume_queue" : args.consume_queue,
                 "postconditions" : args.postcondition,
                 "preconditions" : args.precondition,
                 "wait"  : wait,
                 "template"  : args.template}

    rabbitHelper = RabbitHelper(args.broker, cluster)
    # the broker answers on a private, temporary reply queue
    workload['rcq'] = getResponseQueue(rabbitHelper)
    rabbitHelper.putMsg("workload_"+cluster, json.dumps(workload))
    receiveResponse(rabbitHelper, workload['rcq'])
200
def argsToTask(args):
    """Build the task message used by the standalone loader from ``args``.

    Per-operation counts are the given percentage of the ops-per-second
    rate, using integer (floor) division so the counts stay integers on any
    interpreter version.

    Returns a deep copy of the message with an added 'template' entry whose
    'kv' value is the message itself (the message doubles as the document
    template).
    """
    num_consumers = 1
    ops_sec = int(args.ops) // num_consumers

    def count_for(percent):
        # share of the per-second rate given to one operation type
        return int(ops_sec * percent // 100)

    # NOTE(review): args.padding is accepted on the command line but is not
    # used when building this task; confirm whether it should be.

    # broadcast to sdk_consumers
    msg = {'bucket' : args.bucket,
           'id' : args.bucket,
           'password' : args.password,
           'ops_sec' : ops_sec,
           'create_count' : count_for(args.create),
           'update_count' : count_for(args.update),
           'get_count' : count_for(args.get),
           'del_count' : count_for(args.delete),
           'exp_count' : count_for(args.expire),
           'cc_queues' : None,
           'consume_queue' : None,
           'ttl' : args.ttl,
           'miss_perc' : args.miss,
           'active' : True,
           'active_hosts' : args.hosts}

    # set doc-template to this message
    task = copy.deepcopy(msg)
    task['template'] = {'cc_queues' : None,
                        'kv' : msg}

    return task
243
244
def import_template(args):
    """Publish a document template to the broker.

    For ``args.type == "json"`` each entry of ``args.kvpairs`` is wrapped in
    braces and parsed as a JSON object; the objects are merged into the
    template's "kv" value. If an entry is not valid JSON an error is printed
    and nothing is sent. A missing --kvpairs is treated as an empty list.
    For any other type "kv" is None.

    The template is sent to the "workload_template_<cluster>" queue and the
    broker's reply is printed.
    """
    val = None

    if args.type == "json":
        val = {}
        # --kvpairs is optional, so args.kvpairs may be None
        for kv in args.kvpairs or []:
            try:
                val.update(json.loads('{%s}' % kv))
            except ValueError:
                print("ERROR: Unable to encode as valid json: %s " % kv)
                print("make sure strings surrounded by double quotes")
                return

    #TODO binary blobs

    template = {"name" : args.name,
                "ttl" : args.ttl,
                "flags" : args.flags,
                "cc_queues" : args.cc_queues,
                "size" : args.size,
                "kv" : val}
    cluster = args.cluster

    rabbitHelper = RabbitHelper(args.broker, cluster)
    template['rcq'] = getResponseQueue(rabbitHelper)
    rabbitHelper.putMsg("workload_template_"+cluster, json.dumps(template))
    receiveResponse(rabbitHelper, template['rcq'])
276
def perform_admin_tasks(args):
    """Send the requested admin actions to the "admin_<cluster>" queue.

    The message carries every admin option from ``args`` plus the name of a
    temporary reply queue; the broker's reply is printed.
    """
    cluster = args.cluster

    option_names = ('rebalance_in',
                    'rebalance_out',
                    'failover',
                    'soft_restart',
                    'hard_restart',
                    'only_failover')
    actions = dict((name, getattr(args, name)) for name in option_names)

    #TODO: Validate the user inputs, before passing to rabbit
    rabbitHelper = RabbitHelper(args.broker, cluster)
    reply_queue = getResponseQueue(rabbitHelper)
    actions['rcq'] = reply_queue
    rabbitHelper.putMsg("admin_"+cluster, json.dumps(actions))
    receiveResponse(rabbitHelper, reply_queue)
293
def perform_xdcr_tasks(args):
    """Send an XDCR request to the "xdcr_<cluster>" queue.

    The message (destination cluster details and replication type) is
    printed before the reply-queue name is added, then published; the
    broker's reply is printed.
    """
    cluster = args.cluster
    xdcrMsg = {'dest_cluster_ip': args.dest_cluster_ip,
               'dest_cluster_rest_username': args.dest_cluster_username,
               'dest_cluster_rest_pwd':  args.dest_cluster_pwd,
               'dest_cluster_name': args.dest_cluster_name,
               'replication_type': args.replication_type,
    }

    #TODO: Validate the user inputs, before passing to rabbit
    print(xdcrMsg)

    rabbitHelper = RabbitHelper(args.broker, cluster)
    reply_queue = getResponseQueue(rabbitHelper)
    xdcrMsg['rcq'] = reply_queue
    rabbitHelper.putMsg("xdcr_"+cluster, json.dumps(xdcrMsg))
    receiveResponse(rabbitHelper, reply_queue)
310
def perform_query_tasks(args):
    """Send a view-query request to the "query_<cluster>" queue.

    Every query option from ``args`` is copied into the message under its
    own name, a temporary reply queue is attached, and the broker's reply
    is printed.
    """
    cluster = args.cluster

    # message keys are identical to the attribute names on args
    fields = ('queries_per_sec',
              'ddoc',
              'view',
              'bucket',
              'password',
              'include_filters',
              'exclude_filters',
              'startkey',
              'endkey',
              'startkey_docid',
              'endkey_docid',
              'limit',
              'indexed_key')
    queryMsg = dict((field, getattr(args, field)) for field in fields)

    rabbitHelper = RabbitHelper(args.broker, cluster)
    reply_queue = getResponseQueue(rabbitHelper)
    queryMsg['rcq'] = reply_queue
    rabbitHelper.putMsg('query_'+cluster, json.dumps(queryMsg))
    receiveResponse(rabbitHelper, reply_queue)
332
def run_systemtest(args):
    """Launch a system test through the "systest_manager_<cluster>" queue.

    The test description comes from the JSON file ``args.fromfile`` when it
    is given, otherwise from ``args.name`` (sent as "localtestname"). If
    neither is given an error is printed and nothing is sent. The file
    suffix from ``args.filesuffix`` is included under "suffix" unless the
    loaded description overrides that key.

    Input is resolved before connecting to the broker, so a bad file path or
    missing option does not open a connection.
    """
    if args.fromfile is not None:
        # load json config; the file is closed as soon as it is parsed
        with open(args.fromfile) as json_data:
            msg = json.load(json_data)
    elif args.name is not None:
        msg = { "localtestname" : args.name }
    else:
        print("ERROR: one of --name or --fromfile is required")
        return

    test = {'suffix' : args.filesuffix}
    test.update(msg)

    cluster = args.cluster
    rabbitHelper = RabbitHelper(args.broker, cluster)
    test['rcq'] = getResponseQueue(rabbitHelper)
    rabbitHelper.putMsg('systest_manager_'+cluster, json.dumps(test))
    receiveResponse(rabbitHelper, test['rcq'])
353
354
if __name__ == "__main__":

    # register the top-level commands and their sub-commands
    setup_run_parser()
    setup_list_parser()
    setup_import_parser()

    args = parser.parse_args()

    # only the 'run' and 'import' commands are dispatched to a handler
    if args.subparsers in ("run", "import"):
        args.handler(args)
369
370