#!/usr/bin/env python
import re
import sys
import math
import time
import socket
import string
import struct
import random
import threading
import multiprocessing
import Queue
import logging
import logging.config
from collections import deque
from hashlib import md5
import json
import inspect

sys.path.extend(('.', 'lib'))

from lib import crc32
from lib import mc_bin_client
from lib.membase.api.rest_client import RestConnection
from lib.membase.api.exception import QueryViewException, \
    ServerUnavailableException
from lib.memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE, \
    ERR_NOT_MY_VBUCKET, ERR_ENOMEM, ERR_EBUSY, ERR_ETMPFAIL, REQ_PKT_FMT, \
    RES_PKT_FMT, MIN_RECV_PACKET, SET_PKT_FMT, CMD_GET, CMD_SET, CMD_DELETE, \
    CMD_ADD, CMD_REPLACE, CMD_PREPEND, CMD_APPEND  # "ARPA"
from lib.perf_engines.libobserve.obs_mcsoda import McsodaObserver
from lib.perf_engines.libobserve.obs import Observable
from lib.perf_engines.libobserve.obs_helper import UnblockingJoinableQueue

logging.config.fileConfig("mcsoda.logging.conf")
log = logging.getLogger()

LARGE_PRIME = 9576890767
OPAQUE_MAX = 4294967295
INT_TYPE = type(123)
FLOAT_TYPE = type(0.1)
DICT_TYPE = type({})
RETRIES = 5


class Stack(object):
    """
    Not a traditional stack:

    If the stack is full, append() evicts an item from the bottom.

    If the rotate flag is on,
    pop() rotates the queue rather than removing the item from the top.
    """
    def __init__(self, size, rotate=False):
        self.size = size
        self.rotate = rotate
        self.deq = deque()

    def __repr__(self):
        return "Stack(size=%r, rotate=%r, deq=%r)" \
            % (self.size, self.rotate, self.deq)

    def pop(self):
        if self.size <= 0:
            log.error("unable to pop item from Stack: invalid size %s"
                      % self.size)
            return None
        try:
            if self.rotate:
                ret = self.deq[-1]
                self.deq.rotate(1)
                return ret
            else:
                return self.deq.pop()
        except IndexError:
            return None

    def append(self, val):
        if self.size <= 0:
            log.error("unable to append item to Stack: invalid size %s"
                      % self.size)
            return
        while len(self.deq) >= self.size:
            self.deq.popleft()
        self.deq.append(val)

    def clear(self):
        num_cleared = len(self.deq)
        self.deq.clear()
        log.info("cleared %d items from hot stack" % num_cleared)
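
# A minimal usage sketch (illustrative only): Stack(3, rotate=True) keeps
# only the 3 newest items, and pop() returns the newest while rotating it
# to the bottom:
#
#   s = Stack(3, rotate=True)
#   for v in (1, 2, 3, 4):
#       s.append(v)   # deq is now deque([2, 3, 4]); 1 was evicted
#   s.pop()           # returns 4; deq becomes deque([4, 2, 3])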


def dict_to_s(d, level="", res=None, suffix=", ", ljust=None):
    res = res or []
    return ''.join(dict_to_s_inner(d, level, res, suffix, ljust))


def dict_to_s_inner(d, level, res, suffix, ljust):
    dtype = DICT_TYPE
    scalars = []
    nested = []  # renamed from "complex" to avoid shadowing the builtin

    for key in d.keys():
        if type(d[key]) == dtype:
            nested.append(key)
        else:
            scalars.append(key)
    scalars.sort()
    nested.sort()

    # Special case for histogram output.
    histo_max = 0
    histo_sum = 0
    if scalars and not nested and \
            type(scalars[0]) == FLOAT_TYPE and type(d[scalars[0]]) == INT_TYPE:
        for key in scalars:
            v = d[key]
            histo_max = max(v, histo_max)
            histo_sum = histo_sum + v

    histo_cur = 0  # Running total for histogram output.
    for key in scalars:
        if type(key) == FLOAT_TYPE:
            k = re.sub("0*$", "", "%.7f" % key)
        else:
            k = str(key)
        if ljust:
            k = k.ljust(ljust)
        x = d[key]
        if histo_max:
            histo_cur = histo_cur + x
        v = str(x)
        if histo_max:
            v = v.rjust(8) + " " + \
                "{0:.1%}".format(histo_cur / float(histo_sum)).rjust(8) + " " + \
                ("*" * int(math.ceil(50.0 * d[key] / histo_max)))

        res.append(level + k + ": " + v + suffix)

    # Recurse for nested, dictionary values.
    if nested:
        res.append("\n")
    for key in nested:
        res.append(level + str(key) + ":\n")
        dict_to_s_inner(d[key], level + "  ", res, "\n", 9)

    return res
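
# For histogram dicts (float keys -> int counts), dict_to_s renders each
# entry as an aligned count, cumulative percentage, and a bar, joined by
# the suffix (", " by default). Illustrative sketch for {0.001: 5, 0.002: 35}:
#   0.001:        5    12.5% ********
#   0.002:       35   100.0% <50 stars>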

# The histo dict is returned by add_timing_sample().
# The percentiles must be sorted, ascending, like [0.90, 0.99].


def histo_percentile(histo, percentiles):
    percentiles = list(percentiles)  # work on a copy; pop() below would
                                     # otherwise mutate the caller's list
    v_sum = 0
    bins = sorted(histo.keys())
    for bin in bins:
        v_sum += histo[bin]
    v_sum = float(v_sum)
    v_cur = 0  # Running total.
    rv = []
    for bin in bins:
        if not percentiles:
            return rv
        v_cur += histo[bin]
        while percentiles and (v_cur / v_sum) >= percentiles[0]:
            rv.append((percentiles[0], bin))
            percentiles.pop(0)
    return rv
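
# Worked example (illustrative): for histo = {1: 5, 2: 4, 3: 1} and
# percentiles [0.90, 0.99], v_sum is 10.0; the running total crosses
# 0.90 at bin 2 (9/10) and 0.99 at bin 3 (10/10), so the result is
# [(0.90, 2), (0.99, 3)].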

# --------------------------------------------------------

MIN_VALUE_SIZE = [10]


def obs_cb(store):
    """
    Callback for the observe thread.
    """
    if not store:
        log.error("obs_cb is broken")
        return

    log.info("obs_cb: clear obs_key_cas %s" % store.obs_key_cas)
    store.obs_key_cas.clear()


def woq_worker(req_queue, stats_queue, ctl, cfg, store):
    """
    Measure latencies of standard write/observe/query patterns.
    """
    bucket = "default"
    ddoc = "A"
    view = "city1"  # TODO: pass from eperf
    query_params = {"limit": 10,
                    "stale": "false"}

    log.info("woq_worker started")
    woq_observer = McsodaObserver(ctl, cfg, store, None)

    while True:

        key, cas = req_queue.get(block=True)
        start_time = time.time()  # latency includes observe and query time

        # observe
        if not woq_observer.block_for_persistence(key, cas):
            # put an invalid object to indicate error
            stats_queue.put([key, cas, 0, 0, 0, 0], block=True)
            req_queue.task_done()
            continue

        obs_latency = time.time() - start_time
        if cfg.get("woq-verbose", 0):
            log.info("woq_worker obs latency: %s, key = %s, cas = %s "
                     % (obs_latency, key, cas))

        query_start = time.time()

        try:
            result = store.rest.query_view(ddoc, view, bucket, query_params)
        except QueryViewException as e:
            log.error("woq_worker QueryViewException: %s" % e)
            stats_queue.put([key, cas, 0, 0, 0, 0], block=True)
            req_queue.task_done()
            continue

        query_latency = time.time() - query_start
        if cfg.get("woq-verbose", 0):
            log.info("woq_worker query latency: %s, key = %s, cas = %s "
                     % (query_latency, key, cas))
            log.info("woq_worker query result: %s" % result)

        latency = time.time() - start_time
        stats_queue.put([key, cas, start_time, obs_latency, query_latency, latency],
                        block=True)
        req_queue.task_done()
    log.info("woq_worker stopped working")


def cor_worker(stats_queue, ctl, cfg, cur, store):
    """
    Sequentially measure latencies of create/observe_replication patterns.

    Creates brand-new items instead of reusing the foreground mcsoda load.
    """
    OP_WIN = 1     # ensure the foreground load dominates the traffic
    backoff = 0
    key_num = OPAQUE_MAX - cur.get('cur-gets', 0)
    store.cfg["cor"] = 1
    store.cfg["batch"] = 1
    persist = (int(cfg.get('cor-persist', 0)) == 1)

    if isinstance(store, StoreMembaseBinary):
        store.awareness.reset()
    else:
        log.error("cannot start cor_worker: invalid store %s" % store)
        return

    log.info("cor_worker started")

    observer = McsodaObserver(ctl, cfg, store, None)

    while True:

        if backoff:
            log.info("cor_worker sleep for %s seconds" % backoff)
            time.sleep(backoff)
            backoff = 0

        start_time = time.time()

        key_num -= 1
        key_str = prepare_key(key_num, cfg.get('prefix', ''))

        data = store.gen_doc(
            key_num, key_str,
            choose_entry(cfg.get('min-value-size', MIN_VALUE_SIZE), key_num)
        )

        grp = store.inflight_start()
        store.cmd_append("set", key_num, key_str, data, 0, grp)
        msg = store.inflight_complete(grp)

        store.inflight_send(msg)
        store.inflight_recv(1, grp, expectBuffer=False)

        cas = store.cor_key_cas[key_num]
        store.cor_key_cas.clear()

        status = \
            observer.block_for_replication(key_str, cas, persist=persist)
        latency = time.time() - start_time

        if status:
            stats_queue.put([key_str, cas, start_time, latency], block=True)
        else:
            # put an invalid object to indicate error
            stats_queue.put([key_str, cas, 0, 0], block=True)

        if latency < OP_WIN:
            backoff = OP_WIN - latency

    log.info("cor_worker stopped")


def run_worker(ctl, cfg, cur, store, prefix, heartbeat=0, why=""):
    i = 0
    t_last_flush = time.time()
    t_last_cycle = time.time()
    o_last_flush = store.num_ops(cur)
    t_last = time.time()
    o_last = store.num_ops(cur)
    xfer_sent_last = 0
    xfer_recv_last = 0
    store.why = why
    store.stats_ops = cfg.get("stats_ops", 10000)

    report = cfg.get('report', 0)
    hot_shift = cfg.get('hot-shift', 0)
    max_ops_per_sec = float(cfg.get('max-ops-per-sec', 0))

    if cfg.get('max-ops-per-sec', 0) > 0 and 'batch' not in cur:
        cur['batch'] = 10

    log.debug("%s starts cfg: %s" % (why, cfg))
    log.debug("%s starts cur: %s" % (why, cur))
    log.debug("%s starts store: %s" % (why, store))
    log.debug("%s starts prefix: %s" % (why, prefix))
    log.debug("%s starts running." % why)

    heartbeat_last = t_last

    if cfg.get('woq-pattern', 0):
        woq_req_queue = UnblockingJoinableQueue(1)    # pattern: write/observe/query
        woq_stats_queue = multiprocessing.Queue(1)
        woq_process = multiprocessing.Process(target=woq_worker,
                                              args=(woq_req_queue, woq_stats_queue,
                                                    ctl, cfg, store))
        woq_process.daemon = True
        woq_process.start()

    if cfg.get('observe', 0):
        observer = McsodaObserver(ctl, cfg, store, obs_cb)
        observer.start()

    if cfg.get('cor-pattern', 0):
        cor_stats_queue = multiprocessing.Queue()
        cor_process = multiprocessing.Process(
            target=cor_worker, args=(cor_stats_queue, ctl, cfg, cur, store))
        cor_process.daemon = True
        cor_process.start()

    while ctl.get('run_ok', True):
        num_ops = cur.get('cur-gets', 0) + cur.get('cur-sets', 0)

        if cfg.get('max-ops', 0) and num_ops >= cfg.get('max-ops', 0):
            log.info("exiting because of max ops")
            break
        if cfg.get('exit-after-creates', 0) and cfg.get('max-creates', 0) and \
                cur.get('cur-creates', 0) >= cfg.get('max-creates', 0):
            log.info("exiting because of max creates")
            break
        if cfg.get('exit-after-gets', 0) and cfg.get('max-gets', 0) and \
                cur.get('cur-gets', 0) >= cfg.get('max-gets', 0):
            log.info("exiting because of max gets")
            break
        if ctl.get('shutdown_event') is not None and \
                ctl['shutdown_event'].is_set():
            log.info("exiting because of shutdown event")
            break

        heartbeat_duration = time.time() - heartbeat_last
        if heartbeat != 0 and heartbeat_duration > heartbeat:
            heartbeat_last += heartbeat_duration
            if cfg.get('max-ops', 0):
                progress = 100.0 * num_ops / cfg['max-ops']
                log.info("%s num ops = %s out of %s (%.2f %%)",
                         why, num_ops, cfg['max-ops'], progress)
            else:
                log.info("%s num ops = %s", why, num_ops)

        command = next_cmd(cfg, cur, store)
        flushed = store.command(command)

        if flushed and cfg.get('woq-pattern', 0):

            # record stats
            if not woq_stats_queue.empty():
                try:
                    key, cas, start_time, obs_latency, query_latency, latency \
                        = woq_stats_queue.get(block=False)
                    if not start_time and not latency:
                        store.woq_key_cas.clear()   # error
                    else:
                        store.add_timing_sample("woq-obs", obs_latency)
                        store.add_timing_sample("woq-query", query_latency)
                        store.add_timing_sample("woq", latency)
                        store.save_stats(start_time)
                        store.woq_key_cas.clear()   # simply clear all, no key/cas sanity check
                        log.info("woq_stats: key: %s, cas: %s, "
                                 "obs_latency: %f, query_latency: %f, latency: %f"
                                 % (key, cas, obs_latency, query_latency, latency))
                except Queue.Empty:
                    pass

            # produce request
            if woq_req_queue.all_finished():
                for key_num, cas in store.woq_key_cas.iteritems():
                    key = prepare_key(key_num, cfg.get('prefix', ''))
                    try:
                        woq_req_queue.put([key, cas], block=False)
                    except Queue.Full:
                        break

        if flushed and cfg.get('observe', 0):
            if store.obs_key_cas and not observer.num_observables():
                observables = []
                for key_num, cas in store.obs_key_cas.iteritems():
                    obs = Observable(key=prepare_key(key_num, cfg.get('prefix', '')),
                                     cas=cas,
                                     persist_count=cfg.get('obs-persist-count', 1),
                                     repl_count=cfg.get('obs-repl-count', 1))
                    observables.append(obs)
                observer.load_observables(observables)

        if flushed and cfg.get('cor-pattern', 0):
            # record stats
            if not cor_stats_queue.empty():
                try:
                    key, cas, start_time, latency = \
                        cor_stats_queue.get(block=False)
                    if latency:
                        store.add_timing_sample("cor", latency)
                        store.save_stats(start_time)
                        log.info("cor_stats: key: %s, cas: %s, latency: %f"
                                 % (key, cas, latency))
                except Queue.Empty:
                    pass

        i += 1

        if report > 0 and i % report == 0:
            t_curr = time.time()
            o_curr = store.num_ops(cur)
            xfer_sent_curr = store.xfer_sent
            xfer_recv_curr = store.xfer_recv

            t_delta = t_curr - t_last
            o_delta = o_curr - o_last
            xfer_sent_delta = xfer_sent_curr - xfer_sent_last
            xfer_recv_delta = xfer_recv_curr - xfer_recv_last

            try:
                ops_per_sec = o_delta / t_delta
                xfer_sent_per_sec = xfer_sent_delta / t_delta
                xfer_recv_per_sec = xfer_recv_delta / t_delta
            except ZeroDivisionError:
                ops_per_sec = o_delta
                xfer_sent_per_sec = xfer_sent_delta
                xfer_recv_per_sec = xfer_recv_delta

            log.debug(prefix + dict_to_s(cur))
            log.info("%s ops: %s secs: %s ops/sec: %s tx-bytes/sec: %s rx-bytes/sec: %s" %
                     (prefix, o_delta, t_delta, int(ops_per_sec),
                      int(xfer_sent_per_sec) or "unknown",
                      int(xfer_recv_per_sec) or "unknown"))

            t_last = t_curr
            o_last = o_curr
            xfer_sent_last = xfer_sent_curr
            xfer_recv_last = xfer_recv_curr

        if flushed:
            # Code below is responsible for speed limitation.
            # Stream looks like ^_^_^_^_^_^_^
            #
            # delta1 = flush time + previous sleep time (^_)
            # delta2 = flush time (^)
            #
            # TODO: dynamic correction factor.
            # We have to measure the actual average throughput - say, every
            # minute - so we can adjust the request rate. For now the factor
            # is empirical, because we always oversleep.
            CORRECTION_FACTOR = 0.975

            delta1 = time.time() - t_last_cycle
            delta2 = time.time() - t_last_flush
            t_last_cycle += delta1

            ops_done = float(store.num_ops(cur) - o_last_flush)
            o_last_flush += ops_done

            if max_ops_per_sec:
                # Take the global throughput into account.
                if cfg.get('active_fg_workers') is not None:
                    concurrent_workers = cfg.get('active_fg_workers').value
                else:
                    concurrent_workers = 1
                local_max_ops_per_sec = max_ops_per_sec / concurrent_workers
                # Actual throughput
                ops_per_sec = ops_done / delta2
                # Sleep if we are running too fast.
                if ops_per_sec > local_max_ops_per_sec:
                    sleep_time = CORRECTION_FACTOR * ops_done / local_max_ops_per_sec - delta2
                    time.sleep(max(sleep_time, 0))
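                # Worked example (illustrative): with max-ops-per-sec=1000
                # and 4 active fg workers, the local cap is 250 ops/sec.
                # If a batch of 100 ops flushed in 0.2s (500 ops/sec), we
                # sleep 0.975 * 100 / 250 - 0.2 = 0.19s before the next flush.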

            if hot_shift > 0:
                cur['cur-base'] = cur.get('cur-base', 0) + (hot_shift * delta1)

            t_last_flush = time.time()

    store.flush()


def next_cmd(cfg, cur, store):
    do_delete = False
    itm_val = None
    num_ops = cur.get('cur-ops', 0)

    do_set = cfg.get('ratio-sets', 0) > float(cur.get('cur-sets', 0)) / positive(num_ops)
    if do_set:
        itm_gen = True

        cmd = 'set'
        cur_sets = cur.get('cur-sets', 0) + 1
        cur['cur-sets'] = cur_sets
        cur['cur-ops'] = cur.get('cur-ops', 0) + 1

        do_set_create = (
            (cfg.get('max-items', 0) <= 0 or
             cfg.get('max-items', 0) > cur.get('cur-items', 0)) and
            cfg.get('max-creates', 0) > cur.get('cur-creates', 0) and
            cfg.get('ratio-creates', 0) >= float(cur.get('cur-creates', 0)) / positive(cur.get('cur-sets', 0))
        )
        if do_set_create:
            # Create...
            key_num = cur.get('cur-items', 0)

            cur['cur-items'] = cur.get('cur-items', 0) + 1
            cur['cur-creates'] = cur.get('cur-creates', 0) + 1
        else:
            # Update...
            num_updates = cur['cur-sets'] - cur.get('cur-creates', 0)

            do_delete = cfg.get('ratio-deletes', 0) > \
                float(cur.get('cur-deletes', 0)) / positive(num_updates)
            if do_delete:
                itm_gen = False
                cmd = 'delete'
                cur['cur-deletes'] = cur.get('cur-deletes', 0) + 1
            else:
                num_mutates = num_updates - cur.get('cur-deletes', 0)

                do_arpa = cfg.get('ratio-arpas', 0) > \
                    float(cur.get('cur-arpas', 0)) / positive(num_mutates)
                if do_arpa:
                    cmd = 'arpa'
                    cur['cur-arpas'] = cur.get('cur-arpas', 0) + 1

            key_num = choose_key_num(num_updates,
                                     cfg.get('ratio-hot', 0),
                                     cfg.get('ratio-hot-sets', 0),
                                     cur.get('cur-sets', 0),
                                     cur.get('cur-base', 0),
                                     cfg.get('random', 0),
                                     cur)

        if not do_delete and cfg.get('hot-stack', 0):
            stack = cur.get('hot-stack', None)
            if not stack:
                rotate = (cfg.get('hot-stack-rotate', 0) == 1)
                stack = Stack(cfg.get('hot-stack-size', 10), rotate)
                cur['hot-stack'] = stack
            stack.append(key_num)

        expiration = 0
        if cmd[0] == 's' and cfg.get('ratio-expirations', 0.0) * 100 > cur_sets % 100:
            expiration = cfg.get('expiration', 0)

        key_str = prepare_key(key_num, cfg.get('prefix', ''))
        if itm_gen:
            itm_val = store.gen_doc(key_num, key_str,
                                    choose_entry(cfg.get('min-value-size', MIN_VALUE_SIZE),
                                                 num_ops))

        return cmd, key_num, key_str, itm_val, expiration
    else:
        cmd = 'get'
        cur['cur-gets'] = cur.get('cur-gets', 0) + 1
        cur['cur-ops'] = cur.get('cur-ops', 0) + 1

        do_query = cfg.get('ratio-queries', 0) > \
            float(cur.get('cur-queries', 0)) / cur.get('cur-gets', 0)
        if do_query:
            cmd = 'query'
            cur['cur-queries'] = cur.get('cur-queries', 0) + 1

        do_get_hit = (cfg.get('ratio-misses', 0) * 100) <= (cur.get('cur-gets', 0) % 100)
        if do_get_hit:
            key_num = None
            do_hot = (cfg.get('ratio-hot-gets', 0) * 100)\
                > (cur.get('cur-gets', 0) % 100)
            stack = cur.get('hot-stack', None)

            if do_hot and stack:
                key_num = stack.pop()

            if cfg.get('exit-after-gets', 0):
                key_num = cur['cur-gets']

            if not key_num:
                key_num = choose_key_num(cur.get('cur-items', 0),
                                         cfg.get('ratio-hot', 0),
                                         cfg.get('ratio-hot-gets', 0),
                                         cur.get('cur-gets', 0),
                                         cur.get('cur-base', 0),
                                         cfg.get('random', 0),
                                         cur)
            key_str = prepare_key(key_num, cfg.get('prefix', ''))

            return cmd, key_num, key_str, itm_val, 0
        else:
            cur['cur-misses'] = cur.get('cur-misses', 0) + 1
            return cmd, -1, prepare_key(-1, cfg.get('prefix', '')), None, 0
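
# Illustrative sketch of how the ratios above play out: with
# cfg = {'ratio-sets': 0.2, 'ratio-creates': 0.5, 'ratio-deletes': 0.1},
# roughly 20% of ops become sets; half of those sets create new items,
# and 10% of the non-create sets become deletes. Each do_* test compares
# the configured ratio against the observed running ratio in cur, so the
# mix converges to the target proportions as the run progresses.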


def choose_key_num(num_items, ratio_hot, ratio_hot_choice,
                   num_ops, base, random_key, cur):
    """
    Choose a random or deterministic number used to generate the MD5 hash.

    The deterministic algorithm always favors new items,
    i.e. if many items have been created (num_creates > num_hot_items),
    hot items are chosen from the newest ones.
    """
    num_creates = cur.get('cur-creates', 0)
    if num_items < 0 or ratio_hot < 0 or ratio_hot > 1:
        log.error("num_items: {0}, num_creates:{1}, ratio_hot: {2}"
                  .format(num_items, num_creates, ratio_hot))
        return 1

    # get a random or deterministic key
    if random_key == 1:
        x = int(random.random() * num_items)
    else:
        pos = cur.get('pos', 0)
        pos = (pos + LARGE_PRIME) % positive(num_items)
        cur['pos'] = pos
        x = pos

    hit_hot_range = (ratio_hot_choice * 100) > (num_ops % 100)
    num_hot_items = positive(math.floor(ratio_hot * num_items))
    num_cold_items = positive(num_items - num_hot_items)
    num_init_items = positive(num_items - num_creates)
    base %= num_init_items

    # calculate offset and apply it to the base
    if hit_hot_range:
        offset = x % num_hot_items
        if offset > num_creates:                          # choose from the left hot set
            retval = (base + offset - num_creates) % num_init_items
        else:
            retval = num_items - offset                   # choose from the right hot set
    else:
        offset = x % num_cold_items
        if num_creates > num_hot_items:
            retval = offset
        elif base > num_cold_items:                         # no split-up on the cold set
            retval = (base + num_hot_items - num_creates + offset) % num_init_items
        elif offset < base:                                 # choose from the left cold set
            retval = offset
        else:
            retval = offset + num_hot_items - num_creates   # choose from the right cold set

    return int(retval) % num_items
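
# Worked example (illustrative): with num_items=100, ratio_hot=0.2, base=0
# and no creates yet, num_hot_items is 20, so a "hot" op maps x into
# offsets 0..19 and always lands on the same 20-key subset, while "cold"
# ops spread over the remaining 80 keys.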


def positive(x):
    if x > 0:
        return x
    return 1


def prepare_key(key_num, prefix=None):
    key_hash = md5(str(key_num)).hexdigest()[0:16]
    if prefix and len(prefix) > 0:
        return prefix + "-" + key_hash
    return key_hash
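
# Example (illustrative): prepare_key(0) returns "cfcd208495d565ef", the
# first 16 hex digits of md5("0"); prepare_key(0, "pfx") returns
# "pfx-cfcd208495d565ef". Keys are thus stable across runs for the same
# key_num and prefix.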


def choose_entry(arr, n):
    return arr[n % len(arr)]


class Store(object):

    def __init__(self):
        self.errors = dict()

    def connect(self, target, user, pswd, cfg, cur, bucket="default", backups=None):
        self.target = target
        self.cfg = cfg
        self.cur = cur
        self.xfer_sent = 0
        self.xfer_recv = 0

    def err_msg(self, error):
        """Generate an error message with class.method names as prefix"""
        cname = self.__class__.__name__
        fname = inspect.stack()[2][3]  # err_msg <- save_error <- caller
        return "[{0}.{1}] {2}".format(cname, fname, error)

    def save_error(self, error):
        """Update the error-count dictionary"""
        err_msg = self.err_msg(error)
        self.errors[err_msg] = self.errors.get(err_msg, 0) + 1

    def show_some_keys(self):
        log.debug("first 5 keys...")
        for i in range(5):
            log.debug(("echo get %s | nc %s %s" %
                      (self.cmd_line_get(i, prepare_key(i, self.cfg.get('prefix', ''))),
                       self.target.rsplit(':', 1)[0],
                       self.target.rsplit(':', 1)[1])))

    def stats_collector(self, sc):
        self.sc = sc

    def command(self, c):
        cmd, key_num, key_str, data, expiration = c
        if cmd[0] == 'g' or cmd[0] == 'q':
            log.debug(cmd + ' ' + key_str + '\r')
            return False
        if cmd[0] == 'd':
            log.debug('delete ' + key_str + '\r')
            return False

        c = 'set'
        if cmd[0] == 'a':
            c = self.arpa[self.cur.get('cur-sets', 0) % len(self.arpa)]

        log.debug("%s %s 0 %s %s\r\n%s\r" % (c, key_str, expiration,
                                             len(data), data))
        return False

    def flush(self):
        pass

    def num_ops(self, cur):
        return cur.get('cur-gets', 0) + cur.get('cur-sets', 0)

    def gen_doc(self, key_num, key_str, min_value_size, json=None, cache=None):
        if json is None:
            json = self.cfg.get('json', 1) > 0
        if cache is None:
            cache = self.cfg.get('doc-cache', 0)

        return gen_doc_string(key_num, key_str, min_value_size,
                              self.cfg['suffix'][min_value_size],
                              json, cache=cache)

    def cmd_line_get(self, key_num, key_str):
        return key_str

    def readbytes(self, skt, nbytes, buf):
        while len(buf) < nbytes:
            data = None
            try:
                data = skt.recv(max(nbytes - len(buf), 4096))
            except Exception as error:
                self.save_error(error)
                log.error(error)
            if not data:
                self.save_error("no data")
                log.error("no data")
                return None, ""
            buf += data
        return buf[:nbytes], buf[nbytes:]

    def add_timing_sample(self, cmd, delta, prefix="latency-"):
        base = prefix + cmd
        for suffix in self.cfg.get("timing-suffixes", ["", "-recent"]):
            key = base + suffix
            histo = self.cur.get(key, None)
            if histo is None:
                histo = {}
                self.cur[key] = histo
            try:
                bucket = round(self.histo_bucket(delta), 6)
                histo[bucket] = histo.get(bucket, 0) + 1
            except TypeError as error:
                self.save_error(error)
                log.error(error)

    def histo_bucket(self, samp):
        hp = self.cfg.get("histo-precision", 2)
        if samp > 0:
            p = 10 ** (math.floor(math.log10(samp)) - (hp - 1))
            r = round(samp / p)
            return r * p
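
    # Worked example (illustrative): with the default histo-precision of 2,
    # a 0.00123s sample gives p = 10**(-3 - 1) = 1e-4 and
    # round(0.00123 / 1e-4) = 12.0, so the sample lands in bucket 0.0012,
    # i.e. every bucket keeps two significant digits.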

    def drange(self, start, stop, step):
        r = start
        while r < stop:
            yield round(float(r), 6)
            r += float(step)


class StoreMemcachedBinary(Store):

    def connect(self, target, user, pswd, cfg, cur, bucket="default", backups=None):
        self.cfg = cfg
        self.cur = cur
        self.target = target
        # "host" or "host:port"; appending ":11211" supplies the default port
        self.host_port = (target + ":11211").split(':')[0:2]
        self.host_port[1] = int(self.host_port[1])
        self.connect_host_port(self.host_port[0], self.host_port[1], user, pswd, bucket=bucket)
        self.inflight_reinit()
        self.queue = []
        self.cmds = 0
        self.ops = 0
        self.previous_ops = 0
        self.buf = ''
        self.arpa = [(CMD_ADD,     True),
                     (CMD_REPLACE, True),
                     (CMD_APPEND,  False),
                     (CMD_PREPEND, False)]
        self.xfer_sent = 0
        self.xfer_recv = 0
        self.obs_key_cas = {}  # {key_num: cas} pair
        self.woq_key_cas = {}  # {key_num: cas} pair
        self.cor_key_cas = {}  # {key_num: cas} pair
        self.retries = 0
        self.backups = backups
        self.bucket = bucket
        self.user = user
        self.pswd = pswd

    def reconnect(self):
        if self.backups:
            self.target = self.backups[0]
            self.backups.pop(0)

        log.info("StoreMemcachedBinary: reconnect to %s" % self.target)
        self.host_port = (self.target + ":11211").split(':')[0:2]
        self.host_port[1] = int(self.host_port[1])
        self.connect_host_port(self.host_port[0], self.host_port[1],
                               self.user, self.pswd, bucket=self.bucket)

    def connect_host_port(self, host, port, user, pswd, bucket="default"):
        self.conn = mc_bin_client.MemcachedClient(host, port)
        if user and bucket != "default":
            self.conn.sasl_auth_plain(user, pswd)

    def inflight_reinit(self, inflight=0):
        self.inflight = inflight
        self.inflight_num_gets = 0
        self.inflight_num_sets = 0
        self.inflight_num_deletes = 0
        self.inflight_num_arpas = 0
        self.inflight_num_queries = 0
        self.inflight_start_time = 0
        self.inflight_end_time = 0
        self.inflight_grp = None

    def inflight_start(self):
        return []

    def inflight_complete(self, inflight_arr):
        return ''.join(inflight_arr)

    def inflight_send(self, inflight_msg):
        try:
            self.conn.s.sendall(inflight_msg)
        except socket.error as error:
            self.retries += 1
            self.save_error(error)
            log.error("%s, retries = %s", error, self.retries)
            if self.retries == RETRIES:
                e = ServerUnavailableException(self.host_port)
                self.reconnect()
                self.retries = 0
                raise e
            time.sleep(0.2)
            return 0
        return len(inflight_msg)

    def inflight_recv(self, inflight, inflight_arr, expectBuffer=None):
        received = 0
        for i in range(inflight):
            try:
                cmd, keylen, extralen, errcode, datalen, opaque, val, buf = \
                    self.recvMsg()
            except Exception as error:
                self.retries += 1
                self.save_error(error)
                log.error("%s, retries = %s", error, self.retries)
                if self.retries == RETRIES:
                    e = ServerUnavailableException(self.host_port)
                    self.reconnect()
                    self.retries = 0
                    raise e
                time.sleep(0.2)
                return received
            received += datalen + MIN_RECV_PACKET
        return received

    def inflight_append_buffer(self, grp, vbucketId, opcode, opaque):
        return grp

    def command(self, c):
        self.queue.append(c)
        if len(self.queue) <= self.flush_level():
            return False

        try:
            self.flush()
            return True
        except ServerUnavailableException as error:
            self.save_error(error)
            log.error(error)
            self.queue = list()
            self.inflight_reinit()
            return False

    def flush_level(self):
        return self.cur.get('batch') or self.cfg.get('batch', 100)

    def get_vbucketId(self, key):
        vbuckets = self.cfg.get("vbuckets", 0)
        if vbuckets > 0:
            return crc32.crc32_hash(key) & (vbuckets - 1)
        return 0
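
    # Note (assumption): the bitmask above equals "mod vbuckets" only when
    # vbuckets is a power of two (e.g. 1024 buckets -> key_hash & 1023),
    # which matches the usual Couchbase vbucket counts.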

    def header(self, op, key, val, opaque=0, extra='', cas=0,
               dtype=0,
               fmt=REQ_PKT_FMT,
               magic=REQ_MAGIC_BYTE):
        vbucketId = self.get_vbucketId(key)
        return struct.pack(fmt, magic, op,
                           len(key), len(extra), dtype, vbucketId,
                           len(key) + len(extra) + len(val), opaque, cas), vbucketId
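
    # A sketch of the request framing (assuming REQ_PKT_FMT is the standard
    # 24-byte memcached binary request header): magic, opcode, key length,
    # extras length, data type, vbucket id, total body length
    # (key + extras + value), opaque, and CAS. The opaque field is echoed
    # back by the server, which is how recvMsg() below maps SET responses
    # to key_nums.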

    def create_seed(self):
        """Return a seed (hashable tuple or int value) based on current stats.
        This seed ensures reproducible randomness for the same test
        configurations.

        """

        if self.why == 'loop-fg':
            return self.cur.get('cur-queries', 0)
        else:
            return (self.cur.get('cur-gets', 0),
                    self.cur.get('cur-sets', 0),
                    self.cur.get('cur-deletes', 0),
                    self.cur.get('cur-creates', 0),
                    self.cur.get('cur-arpas', 0))

    def flush(self):
        next_inflight = 0
        next_inflight_num_gets = 0
        next_inflight_num_sets = 0
        next_inflight_num_deletes = 0
        next_inflight_num_arpas = 0
        next_inflight_num_queries = 0

        next_grp = self.inflight_start()

        # Permutation of requests
        random.seed(self.create_seed())
        random.shuffle(self.queue)

        # Start at 1, not 0, due to the single latency measurement request.
        for i in range(1, len(self.queue)):
            cmd, key_num, key_str, data, expiration = self.queue[i]
            delta_gets, delta_sets, delta_deletes, delta_arpas, delta_queries = \
                self.cmd_append(cmd, key_num, key_str, data, expiration, next_grp)
            next_inflight += 1
            next_inflight_num_gets += delta_gets
            next_inflight_num_sets += delta_sets
            next_inflight_num_deletes += delta_deletes
            next_inflight_num_arpas += delta_arpas
            next_inflight_num_queries += delta_queries

        next_msg = self.inflight_complete(next_grp)

        latency_cmd = None
        latency_start = 0
        latency_end = 0

        if self.inflight > 0:
            # Receive replies from the previous batch of inflight requests.
            self.xfer_recv += self.inflight_recv(self.inflight, self.inflight_grp)
            self.inflight_end_time = time.time()
            self.ops += self.inflight
            if self.sc:
                self.sc.ops_stats({'tot-gets':    self.inflight_num_gets,
                                   'tot-sets':    self.inflight_num_sets,
                                   'tot-deletes': self.inflight_num_deletes,
                                   'tot-arpas':   self.inflight_num_arpas,
                                   'tot-queries': self.inflight_num_queries,
                                   'start-time':  self.inflight_start_time,
                                   'end-time':    self.inflight_end_time})

        if len(self.queue) > 0:
            # Use the first request in the batch to measure single
            # request latency.
            grp = self.inflight_start()
            latency_cmd, key_num, key_str, data, expiration = self.queue[0]
            self.cmd_append(latency_cmd, key_num, key_str, data, expiration, grp)
            msg = self.inflight_complete(grp)

            latency_start = time.time()
            self.xfer_sent += self.inflight_send(msg)
            self.xfer_recv += self.inflight_recv(1, grp, expectBuffer=False)
            latency_end = time.time()

            self.ops += 1

        self.queue = []

        self.inflight_reinit()
        if next_inflight > 0:
            self.inflight = next_inflight
            self.inflight_num_gets = next_inflight_num_gets
            self.inflight_num_sets = next_inflight_num_sets
            self.inflight_num_deletes = next_inflight_num_deletes
            self.inflight_num_arpas = next_inflight_num_arpas
            self.inflight_num_queries = next_inflight_num_queries
            self.inflight_start_time = time.time()
            self.inflight_grp = next_grp
            self.xfer_sent += self.inflight_send(next_msg)

        if latency_cmd:
            delta = latency_end - latency_start
            self.add_timing_sample(latency_cmd, delta)

        if self.sc:
            if self.ops - self.previous_ops > self.stats_ops:
                self.previous_ops = self.ops
                self.save_stats()
                log.debug("%s save_stats : %s" % (self.why, latency_cmd))

    def save_stats(self, cur_time=0):
        for key in self.cur:
            if key.startswith('latency-'):
                histo = self.cur.get(key, None)
                if histo:
                    self.sc.latency_stats(key, histo, cur_time)
                    if key.endswith('-recent'):
                        self.cur[key] = {}
        self.sc.sample(self.cur)

    def cmd_append(self, cmd, key_num, key_str, data, expiration, grp):
        self.cmds += 1
        if cmd[0] == 'g' or cmd[0] == 'q':
            hdr, vbucketId = self.header(CMD_GET, key_str, '', opaque=self.cmds)
            m = self.inflight_append_buffer(grp, vbucketId, CMD_GET, self.cmds)
            m.append(hdr)
            m.append(key_str)
            return 1, 0, 0, 0, 0
        elif cmd[0] == 'd':
            hdr, vbucketId = self.header(CMD_DELETE, key_str, '', opaque=self.cmds)
            m = self.inflight_append_buffer(grp, vbucketId, CMD_DELETE, self.cmds)
            m.append(hdr)
            m.append(key_str)
            return 0, 0, 1, 0, 0

        rv = (0, 1, 0, 0, 0)
        curr_cmd = CMD_SET
        curr_extra = struct.pack(SET_PKT_FMT, 0, expiration)

        if cmd[0] == 'a':
            rv = (0, 0, 0, 1, 0)
            curr_cmd, have_extra = self.arpa[self.cur.get('cur-sets', 0) % len(self.arpa)]
            if not have_extra:
                curr_extra = ''

        hdr, vbucketId = self.header(curr_cmd, key_str, data,
                                     extra=curr_extra, opaque=key_num)
        m = self.inflight_append_buffer(grp, vbucketId, curr_cmd, self.cmds)
        m.append(hdr)
        if curr_extra:
            m.append(curr_extra)
        m.append(key_str)
        m.append(data)
        return rv

    def num_ops(self, cur):
        return self.ops

    def recvMsg(self):
        sock = self.conn.s
        buf = self.buf
        pkt, buf = self.readbytes(sock, MIN_RECV_PACKET, buf)
        magic, cmd, keylen, extralen, dtype, errcode, datalen, opaque, cas = \
            struct.unpack(RES_PKT_FMT, pkt)
        if magic != RES_MAGIC_BYTE:
            raise Exception("Unexpected recvMsg magic: " + str(magic))
        val, buf = self.readbytes(sock, datalen, buf)
        self.buf = buf
        if not self.obs_key_cas and cmd == CMD_SET:
            self.obs_key_cas[opaque] = cas  # opaque is the key_num
        if not self.woq_key_cas and cmd == CMD_SET:
            self.woq_key_cas[opaque] = cas
        if "cor" in self.cfg and cmd == CMD_SET:
            self.cor_key_cas[opaque] = cas
        return cmd, keylen, extralen, errcode, datalen, opaque, val, buf


class StoreMembaseBinary(StoreMemcachedBinary):

    def connect_host_port(self, host, port, user, pswd, bucket="default"):
        """
        Connect to the server host using the REST API.
        The username and password should be rest_username and rest_password;
        generally they are different from ssh identities.
        """
        from lib.memcached.helper.data_helper import VBucketAwareMemcached

        info = {"ip": host, "port": port,
                'username': user or self.cfg.get("rest_username", "Administrator"),
                'password': pswd or self.cfg.get("rest_password", "password")}

        self.rest = RestConnection(info)
        self.awareness = VBucketAwareMemcached(self.rest, bucket, info)
        self.backoff = 0
        self.xfer_sent = 0
        self.xfer_recv = 0

    def flush_level(self):
        f = StoreMemcachedBinary.flush_level(self)
        return f * len(self.awareness.memcacheds)

    def inflight_start(self):
        return {
            's_bufs': {},  # Key is server str, value is [] of buffer.
            's_cmds': {}   # Key is server str, value is int (number of cmds).
        }

    def inflight_complete(self, inflight_grp):
        rv = []  # Array of tuples (server, buffer).
        s_bufs = inflight_grp['s_bufs']
        for server in s_bufs.keys():
            buffers = s_bufs[server]
            rv.append((server, ''.join(buffers)))
        return rv

    def inflight_send(self, inflight_msg):
        """
        If the timeout value is 0,
        blocks until everything has been sent out
        or the connection breaks.
        """
        timeout_sec = self.cfg.get("socket-timeout", 0)

        sent_total = 0
        for server, buf in inflight_msg:

            length = len(buf)
            if length == 0:
                continue

            sent_tuple = 0   # bytes sent out per tuple in inflight_msg
            while sent_tuple < length:
                try:
                    conn = self.awareness.memcacheds[server]
                    if timeout_sec > 0:
                        conn.s.settimeout(timeout_sec)
                    sent = conn.s.send(buf[sent_tuple:])  # resume after a partial send
                    if sent == 0:
                        self.save_error("socket.send returned 0")
                        log.error("socket.send returned 0")
                        break
                    sent_tuple += sent
                except socket.timeout:
                    self.save_error("socket timeout")
                    log.error("socket timeout")
                    break
                except Exception as error:
                    self.save_error(error)
                    log.error(error)
                    break

            sent_total += sent_tuple
        return sent_total

    def inflight_recv(self, inflight, inflight_grp, expectBuffer=None):
        received = 0
        s_cmds = inflight_grp['s_cmds']
        reset_my_awareness = False
        backoff = False

        for server in s_cmds.keys():
            try:
                conn = self.awareness.memcacheds[server]
                try:
                    recvBuf = conn.recvBuf
                except AttributeError:
                    recvBuf = ''
                if expectBuffer is False and recvBuf != '':
                    raise Exception("Was expecting empty buffer, but have (" +
                                    str(len(recvBuf)) + "): " + recvBuf)
                cmds = s_cmds[server]
                for i in range(cmds):
                    try:
                        rcmd, keylen, extralen, errcode, datalen, ropaque, val, recvBuf = \
                            self.recvMsgSockBuf(conn.s, recvBuf)
                        received += datalen + MIN_RECV_PACKET
                        if errcode == ERR_NOT_MY_VBUCKET:
                            reset_my_awareness = True
                        elif errcode == ERR_ENOMEM or \
                                errcode == ERR_EBUSY or \
                                errcode == ERR_ETMPFAIL:
                            backoff = True
                            self.save_error("errorcode = %s" % errcode)
                            if errcode == ERR_ENOMEM:
                                log.error("errorcode = ENOMEM")
                            # Don't log backoffs due to ETMPFAIL/EBUSY
                    except Exception as error:
                        self.save_error(error)
                        log.error(error)
                        reset_my_awareness = True
                        backoff = True
                conn.recvBuf = recvBuf
            except Exception as error:
                self.save_error(error)
                log.error(error)
                reset_my_awareness = True
                backoff = True

        if backoff:
            self.backoff = max(self.backoff, 0.1) * \
                self.cfg.get('backoff-factor', 2.0)
            if self.backoff > 0:
                self.cur['cur-backoffs'] = self.cur.get('cur-backoffs', 0) + 1
                log.info("inflight recv backoff = %s" % self.backoff)
                time.sleep(self.backoff)
        else:
            self.backoff = 0

        if reset_my_awareness:
            try:
                self.awareness.reset()
            except Exception as error:
                self.save_error("awareness.reset: {0}".format(error))
                log.error("awareness.reset: %s", error)

        return received

    def recvMsgSockBuf(self, sock, buf):
        pkt, buf = self.readbytes(sock, MIN_RECV_PACKET, buf)
        magic, cmd, keylen, extralen, dtype, errcode, datalen, opaque, cas = \
            struct.unpack(RES_PKT_FMT, pkt)
        if magic != RES_MAGIC_BYTE:
            raise Exception("Unexpected recvMsg magic: " + str(magic))
        if not self.obs_key_cas and cmd == CMD_SET:
            self.obs_key_cas[opaque] = cas  # opaque is the key_num
        if not self.woq_key_cas and cmd == CMD_SET:
            self.woq_key_cas[opaque] = cas  # opaque is the key_num
        if "cor" in self.cfg and cmd == CMD_SET:
            self.cor_key_cas[opaque] = cas  # opaque is the key_num
        val, buf = self.readbytes(sock, datalen, buf)
        return cmd, keylen, extralen, errcode, datalen, opaque, val, buf

    def inflight_append_buffer(self, grp, vbucketId, opcode, opaque):
        s_bufs = grp['s_bufs']
        s_cmds = grp['s_cmds']
        s = self.awareness.vBucketMap[vbucketId]
        m = s_bufs.get(s, None)
        if m is None:
            m = []
            s_bufs[s] = m
            s_cmds[s] = 0
        s_cmds[s] += 1
        return m


class StoreMemcachedAscii(Store):

    def connect(self, target, user, pswd, cfg, cur, bucket="default", backups=None):
        self.cfg = cfg
        self.cur = cur
        self.target = target
        self.host_port = (target + ":11211").split(':')[0:2]
        self.host_port[1] = int(self.host_port[1])
        self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.skt.connect(tuple(self.host_port))
        self.queue = []
        self.ops = 0
        self.previous_ops = 0
        self.buf = ''
        self.arpa = ['add', 'replace', 'append', 'prepend']
        self.xfer_sent = 0
        self.xfer_recv = 0

    def command(self, c):
        self.queue.append(c)
        if len(self.queue) > (self.cur.get('batch') or
                              self.cfg.get('batch', 100)):
            self.flush()
            return True
        return False

    def command_send(self, cmd, key_num, key_str, data, expiration):
        if cmd[0] == 'g' or cmd[0] == 'q':
            return cmd + ' ' + key_str + '\r\n'
        if cmd[0] == 'd':
            return 'delete ' + key_str + '\r\n'

        c = 'set'
        if cmd[0] == 'a':
            c = self.arpa[self.cur.get('cur-sets', 0) % len(self.arpa)]
        return "%s %s 0 %s %s\r\n%s\r\n" % (c, key_str, expiration,
                                            len(data), data)
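
    # Illustrative wire format produced by command_send (standard memcached
    # ASCII protocol): a get becomes "get <key>\r\n", a delete becomes
    # "delete <key>\r\n", and a set with a 10-byte value and no expiration
    # becomes "set <key> 0 0 10\r\n<10 bytes>\r\n" (flags are always 0).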

    def command_recv(self, cmd, key_num, key_str, data, expiration):
        buf = self.buf
        if cmd[0] == 'g' or cmd[0] == 'q':
            # GET...
            line, buf = self.readline(self.skt, buf)
            while line and line != 'END':
                # line == "VALUE k flags len"
                rvalue, rkey, rflags, rlen = line.split()
                data, buf = self.readbytes(self.skt, int(rlen) + 2, buf)
                line, buf = self.readline(self.skt, buf)
        elif cmd[0] == 'd':
            # DELETE...
            line, buf = self.readline(self.skt, buf)  # line == "DELETED"
        else:
            # SET...
            line, buf = self.readline(self.skt, buf)  # line == "STORED"
        self.buf = buf

    def flush(self):
        m = []
        for c in self.queue:
            cmd, key_num, key_str, data, expiration = c
            m.append(self.command_send(cmd, key_num, key_str, data, expiration))

        self.skt.sendall(''.join(m))  # sendall: send() may write only part of the batch

        for c in self.queue:
            cmd, key_num, key_str, data, expiration = c
            self.command_recv(cmd, key_num, key_str, data, expiration)

        self.ops += len(self.queue)
        self.queue = []

    def num_ops(self, cur):
        return self.ops

    def readline(self, skt, buf):
        while True:
            index = buf.find('\r\n')
            if index >= 0:
                break
            data = skt.recv(4096)
            if not data:
                return '', ''
            buf += data
        return buf[:index], buf[index + 2:]

# --------------------------------------------------------

# A key is a 16 char hex string.
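#
# The key_to_* helpers below derive every synthetic document field from
# fixed hex-digit slices of that string (positions counted from the end):
#   name     chars [-16:-12] + [-4:-1]
#   email    chars [-16:-12] @ [-13:-11] .com
#   city     chars [-12:-9]
#   country  chars [-9:-7]
#   realm    chars [-7:-5]
#   coins    chars [-16:-12] parsed as hex / 100.0
#   category char  [-12] parsed as hex, mod 3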


def key_to_name(key_str):
    return "%s %s" % (key_str[-16:-12], key_str[-4:-1])


def key_to_email(key_str):
    return "%s@%s.com" % (key_str[-16:-12], key_str[-13:-11])


def key_to_city(key_str):
    return key_str[-12:-9]


def key_to_country(key_str):
    return key_str[-9:-7]


def key_to_realm(key_str):
    return key_str[-7:-5]


def key_to_coins(key_str):
    sub_key = key_str[-16:]
    return max(0.0, int(sub_key[0:4], 16) / 100.0)


def key_to_category(key_str):
    return int(key_str[-12], 16) % 3


def key_to_achievements(key_str):
    next = 300
    achievements = []
    sub_key = key_str[-16:]
    for i in range(len(sub_key)):
        next = (next + int(sub_key[i], 16) * i) % 500
        if next < 256:
            achievements.append(next)
    return achievements

doc_cache = {}


def gen_doc_string(key_num, key_str, min_value_size, suffix, json,
                   cache=None, key_name="key", suffix_ex="", whitespace=True):
    global doc_cache

    c = "{"
    if not json:
        c = "*"

    d = None
    if cache:
        d = doc_cache.get(key_num, None)

    if d is None:
        d = """"%s":"%s",
 "key_num":%s,
 "name":"%s",
 "email":"%s",
 "city":"%s",
 "country":"%s",
 "realm":"%s",
 "coins":%s,
 "category":%s,
 "achievements":%s,""" % (key_name, key_str,
                          key_num,
                          key_to_name(key_str),
                          key_to_email(key_str),
                          key_to_city(key_str),
                          key_to_country(key_str),
                          key_to_realm(key_str),
                          key_to_coins(key_str),
                          key_to_category(key_str),
                          key_to_achievements(key_str))
        if not whitespace:
            d = d.replace("\n ", "")
        if cache:
            doc_cache[key_num] = d

    return "%s%s%s%s" % (c, d, suffix_ex, suffix)
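
# Illustrative sketch of the output shape (json=True, key_num 0, whose key
# is "cfcd208495d565ef"):
#   {"key":"cfcd208495d565ef", "key_num":0, "name":"cfcd 65e", ...,
#    "achievements":[...],"body":"x..."}
# The trailing "body" field comes from the per-size suffix built in run()
# below, which pads each doc up to (at least) its min-value-size.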

# --------------------------------------------------------

PROTOCOL_STORE = {'memcached-ascii': StoreMemcachedAscii,
                  'memcached-binary': StoreMemcachedBinary,
                  'membase-binary': StoreMembaseBinary,
                  'none-binary': Store,
                  'none': Store}
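
# A minimal invocation sketch (hypothetical values; real runs normally
# construct a much richer cfg from the mcsoda command line / eperf):
#
#   cfg = {'min-value-size': 128, 'max-items': 10000, 'max-creates': 10000,
#          'ratio-sets': 0.2, 'batch': 100, 'time': 60, 'threads': 4}
#   run(cfg, {}, 'memcached-binary', '127.0.0.1:11211', '', '')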
1464
1465
1466def final_report(cur, store, total_time):
1467    """Report final stats"""
1468    log.info(dict_to_s(cur))
1469    if cur.get('cur-queries', 0):
1470        total_cmds = cur.get('cur-queries', 0)
1471    else:
1472        total_cmds = cur.get('cur-gets', 0) + cur.get('cur-sets', 0)
1473    log.info("ops/sec: %s" % (total_cmds / float(total_time)))
1474    if store.errors:
1475        log.warn("errors:\n%s", json.dumps(store.errors, indent=4))


def run(cfg, cur, protocol, host_port, user, pswd, stats_collector=None,
        stores=None, ctl=None, heartbeat=0, why="", bucket="default",
        backups=None, collection=None):
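    """Connect one Store per worker thread and generate load until
    ctl['run_ok'] goes false or the workers finish."""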
    if isinstance(cfg['min-value-size'], str):
        cfg['min-value-size'] = cfg['min-value-size'].split(",")
    if not isinstance(cfg['min-value-size'], list):
        cfg['min-value-size'] = [cfg['min-value-size']]

    cfg['body'] = {}
    cfg['suffix'] = {}

    for i in range(len(cfg['min-value-size'])):
        mvs = int(cfg['min-value-size'][i])
        cfg['min-value-size'][i] = mvs
        # Pre-build a deterministic filler body of at least mvs bytes by
        # chaining md5 hexdigests, plus the JSON suffix that closes a doc.
        cfg['body'][mvs] = 'x'
        while len(cfg['body'][mvs]) < mvs:
            cfg['body'][mvs] = cfg['body'][mvs] + \
                md5(str(len(cfg['body'][mvs]))).hexdigest()
        cfg['suffix'][mvs] = "\"body\":\"" + cfg['body'][mvs] + "\"}"

    ctl = ctl or {'run_ok': True}
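    # Workers poll ctl['run_ok']; stop_after and Ctrl-C flip it to False.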

    threads = []

    for i in range(cfg.get('threads', 1)):
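        # Reuse a caller-provided store when available; otherwise create
        # one new Store (and connection) per worker thread.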
        store = None
        if stores and i < len(stores):
            store = stores[i]

        if store is None:
            store = PROTOCOL_STORE[protocol]()

        log.debug("store: %s - %s" % (i, store.__class__))

        store.connect(host_port, user, pswd, cfg, cur, bucket=bucket,
                      backups=backups)
        store.stats_collector(stats_collector)

        threads.append(threading.Thread(target=run_worker,
                                        args=(ctl, cfg, cur, store,
                                              "thread-" + str(i) + ": ")))

    store.show_some_keys()

    if cfg.get("doc-cache", 0) > 0 and cfg.get("doc-gen", 0) > 0:
        min_value_size = cfg['min-value-size'][0]
        json = cfg.get('json', 1) > 0  # note: shadows the json module locally
        cache = cfg.get('doc-cache', 0)
        log.debug("doc-gen...")
        gen_start = time.time()
        num_docs = cfg.get("max-items", 0)
        for key_num in range(num_docs):
            key_str = prepare_key(key_num, cfg.get('prefix', ''))
            store.gen_doc(key_num, key_str, min_value_size, json, cache)
        gen_end = time.time()
        elapsed = gen_end - gen_start
        log.debug("doc-gen...done (elapsed: %s, docs/sec: %s)" %
                  (elapsed, num_docs / max(elapsed, 0.000001)))

    def stop_after(secs):
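        # Background timer: flips the shared stop flag after the
        # configured number of seconds.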
        time.sleep(secs)
        log.info("exiting because of stop_after time")
        ctl['run_ok'] = False

    if cfg.get('time', 0) > 0:
        t = threading.Thread(target=stop_after, args=(cfg.get('time', 0),))
        t.daemon = True
        t.start()

    t_start = time.time()

    try:
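        # A single worker runs inline on the main thread; multiple
        # workers run as daemon threads and are joined below.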
        if len(threads) <= 1:
            run_worker(ctl, cfg, cur, store, "", heartbeat, why)
        else:
            for thread in threads:
                thread.daemon = True
                thread.start()

            while threads:
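                # Join with a 1-second timeout so KeyboardInterrupt can
                # still reach the main thread.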
                threads[0].join(1)
                threads = [t for t in threads if t.isAlive()]
    except KeyboardInterrupt:
        log.warn("exiting because of KeyboardInterrupt")
        ctl['run_ok'] = False

    t_end = time.time()

    final_report(cur, store, total_time=t_end - t_start)

    # Wait for any remaining worker threads, logging a liveness message
    # roughly once a minute (join timeout is 1 second per tick).
    threads = [t for t in threads if t.isAlive()]
    ticks = 0
    while threads:
        threads[0].join(1)
        ticks += 1
        if ticks >= 60:
            ticks = 0
            log.info("mcsoda is running with %s threads" % len(threads))
        threads = [t for t in threads if t.isAlive()]

    ctl['run_ok'] = False
    if ctl.get('shutdown_event') is not None:
        ctl['shutdown_event'].set()

    log.info("%s stopped running." % why)
    return cur, t_start, t_end

# --------------------------------------------------------


def main(argv, cfg_defaults=None, cur_defaults=None, protocol=None, stores=None,
         extra_examples=None):
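    """Parse command-line options into cfg/cur dicts, then call run()."""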
    cfg_defaults = cfg_defaults or {
        "prefix":             ("",    "Prefix for every item key."),
        "max-ops":            (0,     "Max # of ops before exiting. 0 means keep going."),
        "max-items":          (-1,    "Max # of items; default 100000."),
        "max-creates":        (-1,    "Max # of creates; defaults to max-items."),
        "min-value-size":     ("10",  "Min value size (bytes) for SET's; comma-separated."),
        "ratio-sets":         (0.1,   "Fraction of requests that should be SET's."),
        "ratio-creates":      (0.1,   "Fraction of SET's that should create new items."),
        "ratio-misses":       (0.05,  "Fraction of GET's that should miss."),
        "ratio-hot":          (0.2,   "Fraction of items to have as a hot item subset."),
        "ratio-hot-sets":     (0.95,  "Fraction of SET's that hit the hot item subset."),
        "ratio-hot-gets":     (0.95,  "Fraction of GET's that hit the hot item subset."),
        "ratio-deletes":      (0.0,   "Fraction of SET updates that should be DELETE's."),
        "ratio-arpas":        (0.0,   "Fraction of SET non-DELETE'S to be 'a-r-p-a' cmds."),
        "ratio-expirations":  (0.0,   "Fraction of SET's that use the provided expiration."),
        "ratio-queries":      (0.0,   "Fraction of GET hits that should be queries."),
        "expiration":         (0,     "Expiration time parameter for SET's"),
        "exit-after-creates": (0,     "Exit after max-creates is reached."),
        "threads":            (1,     "Number of client worker threads to use."),
        "batch":              (100,   "Batch/pipeline up this # of commands per server."),
        "json":               (1,     "Use JSON documents. 0 to generate binary documents."),
        "time":               (0,     "Stop after this many seconds if > 0."),
        "max-ops-per-sec":    (0,     "When >0, max ops/second target performance."),
        "report":             (40000, "Emit performance output after this many requests."),
        "histo-precision":    (1,     "Precision of histogram bins."),
        "vbuckets":           (0,     "When >0, vbucket hash in memcached-binary protocol."),
        "doc-cache":          (1,     "When 1, cache docs; faster, but uses O(N) memory."),
        "doc-gen":            (1,     "When 1 and doc-cache, pre-generate docs at start."),
        "backoff-factor":     (2.0,   "Exponential backoff factor on ETMPFAIL errors."),
        "hot-shift":          (0,     "# of keys/sec that hot item subset should shift."),
        "random":             (0,     "When 1, use random keys for gets and updates."),
        "queries":            ("",    "Query templates; semicolon-separated."),
        "socket-timeout":     (0,     "Used for socket.settimeout(), in seconds.")}

    cur_defaults = cur_defaults or {
        "cur-items":    (0, "Number of items known to already exist."),
        "cur-sets":     (0, "Number of sets already done."),
        "cur-creates":  (0, "Number of sets that were creates."),
        "cur-gets":     (0, "Number of gets already done."),
        "cur-deletes":  (0, "Number of deletes already done."),
        "cur-arpas":    (0, "# of add/replace/prepend/append's (a-r-p-a) cmds."),
        "cur-queries":  (0, "Number of gets that were view/index queries."),
        "cur-base":     (0, "Base of numeric key range. 0 by default.")}

    if len(argv) < 2 or "-h" in argv or "--help" in argv:
        print("usage: %s [memcached[-binary|-ascii]://][user[:pswd]@]host[:port] [key=val]*\n" %
              (argv[0]))
        print("  default protocol = memcached-binary://")
        print("  default port     = 11211\n")
        examples = ["examples:",
                    "  %s membase://127.0.0.1:8091 max-items=1000000 json=1",
                    "  %s memcached://127.0.0.1:11210 vbuckets=1024",
                    "  %s memcached://127.0.0.1:11211",
                    "  %s memcached-ascii://127.0.0.1:11211",
                    "  %s memcached-binary://127.0.0.1:11211",
                    "  %s 127.0.0.1:11211",
                    "  %s 127.0.0.1",
                    "  %s my-test-bucket@127.0.0.1",
                    "  %s my-test-bucket:MyPassword@127.0.0.1",
                    "  %s none://"]
        if extra_examples:
            examples = examples + extra_examples
        for s in examples:
            if s.find("%s") >= 0:
                print(s % (argv[0]))
            else:
                print(s)
        print("")
        print("optional key=val's and their defaults:")
        for d in [cfg_defaults, cur_defaults]:
            for k in sorted(d.iterkeys()):
                print("  %s = %s %s" %
                      (k.ljust(18), str(d[k][0]).ljust(5), d[k][1]))
        print("")
        print("  TIP: min-value-size can be comma-separated values: min-value-size=10,256,1024")
        print("")
        sys.exit(-1)

    cfg = {}
    cur = {}
    err = {}

    for (o, d) in [(cfg, cfg_defaults), (cur, cur_defaults)]:
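        # Seed defaults, then overlay the key=val args. Each arg is tried
        # against both dicts; one that applies to neither accumulates an
        # err count of 2 and is reported as a parse error below.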
        for (dk, dv) in d.iteritems():
            o[dk] = dv[0]
        for kv in argv[2:]:
            s = (kv + '=').split('=')[0:-1]
            k = s[0]
            v = '='.join(s[1:])
            if k and v and k in o:
                if not isinstance(o[k], str):
                    try:
                        # Accept y/n shorthand, then coerce numeric strings.
                        v = ({'y': '1', 'n': '0'}).get(v, v)
                        for parse in [float, int]:
                            if str(parse(v)) == v:
                                v = parse(v)
                    except ValueError:
                        err[kv] = err.get(kv, 0) + 1
                o[k] = v
            else:
                err[kv] = err.get(kv, 0) + 1

    bad_opts = [kv for kv in err if err[kv] > 1]
    for kv in bad_opts:
        log.error("problem parsing key=val option: " + kv)
    if bad_opts:
        sys.exit(-1)

    # -1 means "unset": if both are unset, default max-items to 100000;
    # then each unset value defaults to the other.
    if cfg.get('max-items', 0) < 0 and cfg.get('max-creates', 0) < 0:
        cfg['max-items'] = 100000
    if cfg.get('max-items', 0) < 0:
        cfg['max-items'] = cfg.get('max-creates', 0)
    if cfg.get('max-creates', 0) < 0:
        cfg['max-creates'] = cfg.get('max-items', 0)

    # Echo the effective configuration.
    for o in [cfg, cur]:
        for k in sorted(o.iterkeys()):
            log.info("    %s = %s" % (k.ljust(20), o[k]))

    # Pad argv[1] with '@' and a trailing ':11211' so the splits below
    # always yield a protocol, user, password, host, and default port.
    protocol = protocol or '-'.join(((["memcached"] +
                                    argv[1].split("://"))[-2] + "-binary").split('-')[0:2])
    host_port = ('@' + argv[1].split("://")[-1]).split('@')[-1] + ":11211"
    user, pswd = (('@' + argv[1].split("://")[-1]).split('@')[-2] + ":").split(':')[0:2]

    cfg["timing-suffixes"] = [""]

    run(cfg, cur, protocol, host_port, user, pswd, stores=stores)

if __name__ == "__main__":
    main(sys.argv)
