xref: /6.0.3/couchbase-cli/pump_gen.py (revision 6b08a59c)
1#!/usr/bin/env python
2
3import couchbaseConstants
4import pump
5import random
6import string
7
class GenSource(pump.Source):
    """Generates simple SET/GET workload, useful for basic testing.

       The workload is configured through a spec of the form
       gen:[key=value[,key=value]]; see parse_spec() for the accepted keys.

       Examples:
         ./cbtransfer gen: http://10.3.121.192:8091
         ./cbtransfer gen:max-items=50000,min-value-size=10,exit-after-creates=1,\
                          prefix=steve1-,ratio-sets=1.0 \
                      http://10.3.121.192:8091 -B my-other-bucket --threads=10

       Methods follow the (rv, value) return convention used throughout
       this class: rv is 0 on success or an error string on failure.
    """

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        """Initialise the generator state from the parsed configuration.

           All arguments are forwarded unchanged to pump.Source.__init__
           (which presumably stores opts and source_map on self, since
           provide_batch() reads self.opts and self.source_map -- confirm
           against pump.Source). The running counters start from the
           'cur-*' values found in source_map['cfg'].
        """
        super(GenSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False   # set once exit-after-creates has been satisfied
        self.body = None    # value (or JSON template) built lazily, then reused
        cfg = source_map['cfg']
        self.cur_ops = cfg['cur-ops']
        self.cur_gets = cfg['cur-gets']
        self.cur_sets = cfg['cur-sets']
        self.cur_items = cfg['cur-items']

    @staticmethod
    def can_handle(opts, spec):
        """The gen spec follows gen:[key=value[,key=value]] format."""
        return spec.startswith("gen:")

    @staticmethod
    def check(opts, spec):
        """Validate the spec and describe the synthetic source.

           Returns (0, source_map) on success, where source_map holds the
           parsed 'cfg', the original 'spec' and a single bucket named
           'default' with one placeholder node per opts.threads.
           Returns (error_string, None) if the spec cannot be parsed.
        """
        rv, cfg = GenSource.parse_spec(opts, spec)
        if rv != 0:
            return rv, None
        return 0, {'cfg': cfg,
                   'spec': spec,
                   'buckets': [{'name': 'default',
                                'nodes': [{'hostname': 'N/A-' + str(i)}
                                          for i in range(opts.threads)]}]}

    @staticmethod
    def parse_spec(opts, spec):
        """Parse the comma-separated key=value configuration from the gen spec.
           Names and semantics were inspired from subset of mcsoda parameters.

           Each value is converted to the type of its default below. Boolean
           options treat '', '0', 'false', 'no' and 'off' (case-insensitive)
           as False and anything else as True.

           Returns (0, cfg) on success, or (error_string, None) for an unknown
           key, an item without '=', or a value that cannot be converted.
        """
        cfg = {'cur-ops': 0,
               'cur-gets': 0,
               'cur-sets': 0,
               'cur-items': 0,
               'exit-after-creates': 0,
               'max-items': 10000,
               'min-value-size': 10,
               'prefix': "",
               'ratio-sets': 0.05,
               'json': 0,
               'low-compression': False}
        false_words = ('', '0', 'false', 'no', 'off')
        for kv in spec[len("gen:"):].split(','):
            if not kv:
                continue
            # partition() splits on the first '=' only, so a value may itself
            # contain '=' and an item without '=' no longer raises IndexError.
            k, sep, v = kv.partition('=')
            k = k.strip()
            v = v.strip()
            if k not in cfg:
                return "error: unknown workload gen parameter: %s" % (k), None
            if not sep:
                return "error: could not parse value from: %s" % (kv), None
            default = cfg[k]
            try:
                if isinstance(default, bool):
                    # bool("0") and bool("false") are both True, so booleans
                    # need explicit parsing rather than a type() conversion.
                    cfg[k] = v.lower() not in false_words
                else:
                    cfg[k] = type(default)(v)
            except ValueError:
                return "error: could not parse value from: %s" % (kv), None
        return 0, cfg

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        """No design from a GenSource."""
        return 0, None

    def provide_batch(self):
        """Provides a batch of messages, with GET/SET ratios and keys
           controlled by a mcsoda-inspired approach, but simpler.

           Returns (0, batch) with the next pump.Batch, or (0, None) when the
           source is done or no message could be generated.
        """
        if self.done:
            return 0, None

        cfg = self.source_map['cfg']
        prefix = cfg['prefix']
        max_items = cfg['max-items']
        ratio_sets = cfg['ratio-sets']
        exit_after_creates = cfg['exit-after-creates']
        as_json = cfg['json']

        collections = self.opts.collection
        itr = iter(collections) if collections else None

        if self.body is None:
            size = cfg['min-value-size']
            if cfg['low-compression']:
                # Generate a document which snappy will struggle to compress.
                # Useful if you're creating data-sets which utilise disk.
                # A private generator with a fixed seed always yields the same
                # document pattern without reseeding the process-wide RNG.
                rng = random.Random(0)
                document = ''.join(rng.choice(string.ascii_uppercase)
                                   for _ in range(size))
            else:
                # else a string of 0 is fine, but will compress very well.
                document = "0" * size

            if as_json:
                # Only the "body" field is filled in now; the remaining four
                # placeholders are filled per message in the loop below.
                self.body = ('{"name": "%s%s", "age": %s, "index": %s,' +
                             (' "body": "%s"}' % document))
            else:
                self.body = document

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        vbucket_id = 0x0000ffff
        cas, exp, flg = 0, 0, 0

        while (batch.size() < batch_max_size and
               batch.bytes < batch_max_bytes):
            if self.cur_items == 0 and max_items <= 0:
                # Empty key space: no item exists and none may be created, so
                # every key computation below would divide by zero.
                self.done = True
                break

            # A GET needs at least one existing item, so force a SET while
            # there are none; otherwise steer towards the requested ratio.
            is_set = (self.cur_items == 0 or
                      ratio_sets >= float(self.cur_sets) / float(self.cur_ops or 1))
            if is_set:
                self.cur_sets += 1
                cmd = couchbaseConstants.CMD_TAP_MUTATION
                if self.cur_items < max_items:
                    # Create a brand new item.
                    key = self.cur_items
                    self.cur_items += 1
                else:
                    # Key space is full: overwrite an existing item.
                    key = self.cur_sets % self.cur_items
            else:
                self.cur_gets += 1
                cmd = couchbaseConstants.CMD_GET
                key = self.cur_gets % self.cur_items
            self.cur_ops += 1

            if as_json:
                value = self.body % (prefix, key, key % 101, key)
            else:
                value = self.body

            # generate a collection key, cycling through the collections
            if itr:
                try:
                    c = next(itr)
                except StopIteration:
                    itr = iter(collections)
                    c = next(itr)
                docKey = c + self.opts.separator + prefix + str(key)
            else:
                docKey = prefix + str(key)

            msg = (cmd, vbucket_id, docKey, flg, exp, cas, '', value, 0, 0, 0, 0)
            batch.append(msg, len(value))

            if exit_after_creates and self.cur_items >= max_items:
                self.done = True
                return 0, batch

        if batch.size() <= 0:
            return 0, None
        return 0, batch

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        """Returns the total number of operations (SETs and GETs) needed to
           perform max-items SETs, but only if exit-after-creates was
           specified with a positive ratio-sets.
           Else, total msgs is unknown as GenSource does not stop generating.
        """
        cfg = source_map['cfg']
        ratio = cfg['ratio-sets']
        if not (cfg['exit-after-creates'] and ratio > 0):
            return 0, None

        max_items = cfg['max-items']
        ops = 0
        sets = 0
        # Replays the SET/GET decision made in provide_batch(). Using '<'
        # rather than '!=' guarantees termination for a negative max-items.
        while sets < max_items:
            if ratio >= float(sets) / float(ops or 1):
                sets += 1
            ops += 1
        return 0, ops
176