xref: /5.5.2/testrunner/scripts/thanosied.py (revision db0fc172)
import argparse
import multiprocessing
import time
from copy import deepcopy
from multiprocessing.dummy import Pool

from couchbase.bucket import Bucket, LOCKMODE_WAIT, CouchbaseError, \
    ArgumentError, NotFoundError, TimeoutError
from couchbase.cluster import Cluster, PasswordAuthenticator
from decorator import decorator

12description = """
13Upsert some documents into a bucket using the couchbase python client.
14The tool can create documents and update them based on the count parameter.
15For pure creates, give passes=1. For both create and updates, give passes=(num of mutations required + 1).
16For pure updates, include update_counter = (value of update in document + 1).
17For deletes, include delete and num_delete parameters.
18The tool can also validate the data loaded. Include validation parameter for validation.
19For no validation, give validation=0
20For validation at the end of the load generation, give validation=1
21For validation after each pass of mutation, give validation=2
22For only validation and no mutations, give validation=3
23With validation=3, pass -update_counter=value of update in a document (and --deleted and --deleted_items=docs_deleted)
24The rate of mutations can be limited using rate_limit (to the best of ability)
25Expiry of documents can be set using ttl parameter
26Validation of expiry of documents can be done by using --validate_expired
27"""

@decorator
def with_sleep(method, *args):
    self = args[0]
    start_time = time.time()
    return_value = method(self, *args[1:])
    end_time = time.time()
    exec_time = end_time - start_time
    if self.rate_limited and exec_time < self.thread_min_time:
        time.sleep(self.thread_min_time - exec_time)
    return return_value
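# How the limiter behaves, as a worked example (the numbers are illustrative, not defaults):
# each @with_sleep-decorated call handles one batch, so a thread is padded to at most
# batch_size operations per thread_min_time seconds. With rate_limit=20000 ops/s,
# workers=5, threads=5 and batch_size=5000, thread_min_time = (5 * 5 * 5000) / 20000
# = 6.25s; 25 threads each doing 5000 ops per 6.25s gives roughly the requested
# 20000 ops/s across all workers.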

UPSERT = "upsert"
DELETE = "delete"
VALIDATE = "validate"

def parseArguments():
    parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--spec', '-U', default="couchbase://localhost", help='Cluster connection string. ['
                                                                              'Default=couchbase://localhost]')
    parser.add_argument('--bucket', '-b', default="default", help='Bucket to connect to')
    parser.add_argument('--password', '-p', default="password", help='User password')
    parser.add_argument('--user', '-u', default="Administrator", help='Username')
    parser.add_argument('--batch_size', '-B', default=5000, help="Batch size of each insert")
    parser.add_argument('--prefix', '-k', default="Key_", help='Key prefix')
    parser.add_argument('--timeout', '-t', default=5, type=int, help='KV operation timeout')
    parser.add_argument('--count', '-c', default=1000, type=int, help='Number of documents in the bucket already')
    parser.add_argument('--start_document', default=0, type=int, help="Starting document count to start updating from")
    parser.add_argument('--passes', '-P', default=0, type=int, help="Number of mutation cycles to perform per "
                                                                    "document (including create)")
    parser.add_argument('--update_counter', default=0, type=int, help="Starting update counter to start updating from")
    parser.add_argument('--cb_version', default='5.0', help="Current version of the couchbase cluster")
    parser.add_argument('--size', default=100, type=int, help="Size of the document to be inserted, in bytes")
    parser.add_argument('--validation', '-v', default=0, type=int, help="Validate the documents. 0=No validation, "
                                                                        "1=Validate at end, 2=Validate after each "
                                                                        "pass, 3=Perform only validation and no "
                                                                        "mutation. [Default=0]")
    parser.add_argument('--replicate_to', default=0, type=int, help="Perform durability checking on this many "
                                                                    "replicas for presence in memory")
    parser.add_argument('--ttl', default=0, type=int, help="Set expiry timer for documents")
    parser.add_argument('--delete', default=False, action='store_true', help="Delete documents from bucket")
    parser.add_argument('--num_delete', default=0, type=int, help='Number of documents to delete')
    parser.add_argument('--deleted', default=False, action='store_true', help="Was a delete of documents run before "
                                                                              "validation")
    parser.add_argument('--deleted_items', default=0, type=int, help="Number of documents that were deleted")
    parser.add_argument('--validate_expired', default=False, action='store_true', help="Validate that documents have "
                                                                                       "expired")
    parser.add_argument('--rate_limit', '-r', default=0, type=int, help="Set operations per second limit")
    parser.add_argument('--threads', '-T', default=5, type=int, help="Number of threads per worker")
    parser.add_argument('--max_worker', default=25, type=int, help="Maximum workers to create. Warning: can cause "
                                                                   "performance degradation with a high number")
    parser.add_argument('--workers', default=5, type=int, help="Number of workers to create")
    parser.add_argument('--remove_limit', default=False, action='store_true', help="Remove the rate limiter to run "
                                                                                   "at max capacity")
    return parser.parse_args()

class Document:
    def __init__(self, value, size):
        body_length = size - len(str(value)) - len("val") - len("update") - len("body")
        body = "".rjust(body_length, 'a')
        self.val = int(value)
        self.update = 0
        self.body = body
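# A generated document is stored via Document.__dict__, padded with 'a's so that the
# field names plus field values add up to roughly `size` bytes. For example,
# Document(7, 30).__dict__ == {'val': 7, 'update': 0, 'body': 'a' * 16}.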

class Batch:
    def __init__(self):
        self.start = 0
        self.end = 0
        self.operation = ""

class DocumentGenerator:
    def __init__(self, args):
        self.spec = args.spec
        self.bucket_name = args.bucket
        self.user = args.user
        self.password = args.password
        self.cb_version = args.cb_version
        self.num_items = args.count
        self.mutations = args.passes
        self.batch_size = int(args.batch_size)
        self.key_prefix = args.prefix
        self.start_document = int(args.start_document)
        self.previous_mutation_count = int(args.update_counter)
        self.validation = int(args.validation)
        self.replicate_to = int(args.replicate_to)
        self.size = int(args.size)
        self.timeout = int(args.timeout)
        self.ttl = int(args.ttl)
        self.delete = args.delete
        self.num_delete = int(args.num_delete)
        self.deleted = args.deleted
        self.deleted_items = int(args.deleted_items)
        self.validate_expired = args.validate_expired
        self.threads = args.threads
        self.rate_limit = args.rate_limit
        self.rate_limited = not args.remove_limit
        self.workers = args.workers
        self.connections = []
        self.current_update_counter = 0
        self.batches = []
        self.retry_batches = []
        self.num_completed = 0
        self.key_exists_error = 0
        self.wrong_keys = []
        self.missing_key_val = []
        self.wrong_keys_replica = []
        self.missing_key_val_replica = []
        if self.rate_limit > 0:
            self.thread_min_time = float(self.threads * self.workers * self.batch_size) / float(self.rate_limit)
        else:
            # rate_limit=0 (the default) means unthrottled; guard against a
            # ZeroDivisionError and disable the limiter.
            self.rate_limited = False
            self.thread_min_time = 0.0
        if "couchbase://" not in self.spec:
            self.spec = "couchbase://{}".format(self.spec)
        self.create_connections()

    def create_connections(self):
        """
        Create bucket connections. One bucket connection is created per thread.
        :return: Nothing
        """
        for i in range(0, self.threads):
            if self.cb_version > '5':
                cluster = Cluster(self.spec)
                auth = PasswordAuthenticator(self.user, self.password)
                cluster.authenticate(auth)
                bucket = cluster.open_bucket(self.bucket_name, lockmode=LOCKMODE_WAIT)
                bucket.timeout = self.timeout
                self.connections.append(bucket)
            else:
                bucket = Bucket('{0}/{1}'.format(self.spec, self.bucket_name), lockmode=LOCKMODE_WAIT)
                bucket.timeout = self.timeout
                self.connections.append(bucket)

    def create_upsert_batches(self):
        """
        Create the upsert batches for this instance. Each batch contains a start and end counter.
        :return: Nothing
        """
        for i in range(self.start_document, self.start_document + self.num_items, self.batch_size):
            batch = Batch()
            batch.start = i
            batch.operation = UPSERT
            batch.end = min(i + self.batch_size, self.start_document + self.num_items)
            self.batches.append(batch)
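    # Worked example: with start_document=0, count=12000 and batch_size=5000, the
    # loop above produces three upsert batches covering [0, 5000), [5000, 10000)
    # and [10000, 12000).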

    def create_delete_batches(self):
        """
        Create the delete batches for this instance. Each batch contains a start and end counter.
        :return: Nothing
        """
        if self.num_items < self.num_delete:
            self.num_delete = self.num_items
        for i in range(self.start_document, self.start_document + self.num_delete, self.batch_size):
            batch = Batch()
            batch.start = i
            batch.operation = DELETE
            batch.end = min(i + self.batch_size, self.start_document + self.num_delete)
            self.batches.append(batch)

    def get_upsert_items(self, start, end):
        """
        Get the upsert items.
        :param start: Starting document
        :param end: End document
        :return: (dict) Key-value pairs of all documents from start to end - 1
        """
        items = {}
        for x in range(start, end):
            key = "{}{}".format(self.key_prefix, x)
            document = Document(x, self.size)
            document.update = self.current_update_counter
            items[key] = document.__dict__
        return items

    def get_retry_upsert_items(self):
        """
        Get the items that must be retried for upsert.
        :return: (dict) Key-value pairs of all documents that should be retried
        """
        items = {}
        for item in self.retry_batches:
            # Keys are assumed to look like '<prefix>_<number>', e.g. 'Key_123'.
            doc_num = int(item.split('_')[1])
            document = Document(doc_num, self.size)
            document.update = self.current_update_counter
            items[item] = document.__dict__
        return items

    def get_delete_retry_keys(self):
        """
        Get the items that must be retried for deletes.
        :return: (list) Keys of all documents that should be retried for delete
        """
        return self.retry_batches

    def get_keys(self, start, end):
        """
        Get the keys for get or delete.
        :param start: Starting document
        :param end: End document
        :return: (list) Keys of documents to be retrieved or deleted
        """
        return ['{}{}'.format(self.key_prefix, i) for i in range(start, end)]

    def get_items(self, connection, keys, replica=False):
        """
        Get items from the couchbase bucket.
        :param connection: Bucket object for connection
        :param keys: (list) List of keys to be retrieved
        :param replica: (bool) Specify if the replica has to be retrieved instead of the active copy.
        :return: (dict) Successful result of get_multi
        """
        try:
            result = connection.get_multi(keys, replica=replica)
            return result
        except TimeoutError as e:
            ok, fail = e.split_results()
            failed_keys = list(fail)
            result = self.get_items(connection, failed_keys, replica)
            result.update(ok)
            return result
        except CouchbaseError as e:
            ok, fail = e.split_results()
            return ok
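    # Note on partial failures: get_multi raises when any key fails. Keys that
    # timed out are retried recursively until they succeed, while for any other
    # CouchbaseError only the successful subset is returned to the caller.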

    @with_sleep
    def upsert_items(self, connection, items):
        """
        Upsert items into the couchbase bucket.
        :param connection: Bucket object for connection
        :param items: (dict) Key-value pairs of documents to be inserted
        :return: Number of items successfully upserted.
        """
        try:
            result = connection.upsert_multi(items, ttl=self.ttl, replicate_to=self.replicate_to)
            return len(result)
        except ArgumentError:
            # replicate_to was rejected (e.g. more replicas requested than the
            # bucket has configured); retry without durability checking.
            self.replicate_to = 0
            return self.upsert_items(connection, items)
        except CouchbaseError as e:
            ok, fail = e.split_results()
            num_completed = len(ok)
            for key in fail:
                self.retry_batches.append(key)
            self.key_exists_error += 1
            return num_completed

    @with_sleep
    def delete_items(self, connection, keys):
        """
        Delete items from the couchbase bucket.
        :param connection: Bucket object for connection
        :param keys: (list) List of keys to be deleted
        :return: Number of items successfully deleted.
        """
        try:
            result = connection.remove_multi(keys)
            return len(result)
        except NotFoundError as e:
            ok, fail = e.split_results()
            return len(ok)
        except CouchbaseError as e:
            ok, fail = e.split_results()
            for key in fail:
                self.retry_batches.append(key)
            self.key_exists_error += 1
            return len(ok)

    def upsert_thread(self, connection, batch):
        items = self.get_upsert_items(batch.start, batch.end)
        completed = self.upsert_items(connection, items)
        return completed

    def upsert_thread_pool(self, args):
        # Pool.map passes a single argument; unpack the (connection, batch) tuple.
        return self.upsert_thread(*args)

    def validate_items(self, connection, start, end, replica=False):
        """
        Validate items in the bucket.
        :param connection: Bucket object for connection
        :param start: Start document number
        :param end: End document number
        :param replica: (bool) Specify if the replica has to be validated instead of the active copy.
        :return: Nothing
        """
        keys = self.get_keys(start, end)
        result = self.get_items(connection, keys, replica=replica)
        if self.validate_expired:
            # The documents are expected to have expired, so any key that is
            # still retrievable is reported as a validation failure.
            if result:
                for key in result.keys():
                    if replica:
                        self.missing_key_val_replica.append(key)
                    else:
                        self.missing_key_val.append(key)
            return
        for i in range(start, end):
            key = "{}{}".format(self.key_prefix, i)
            document = Document(i, self.size)
            document.update = self.current_update_counter
            value = document.__dict__
            if key in result:
                if self.deleted and (self.start_document + self.deleted_items) > int(key.split("_")[1]):
                    # The key should have been deleted but is still present.
                    if replica:
                        self.missing_key_val_replica.append(key)
                    else:
                        self.missing_key_val.append(key)
                    continue
                val = result[key].value
                for k in value.keys():
                    if k not in val or val[k] != value[k]:
                        if replica:
                            self.wrong_keys_replica.append(key)
                        else:
                            self.wrong_keys.append(key)
            else:
                if self.deleted and (self.start_document + self.deleted_items) > int(key.split("_")[1]):
                    # The key was deleted as expected.
                    continue
                if replica:
                    self.missing_key_val_replica.append(key)
                else:
                    self.missing_key_val.append(key)

    def validate_thread(self, connection, batch):
        self.validate_items(connection, batch.start, batch.end)
        if self.replicate_to:
            self.validate_items(connection, batch.start, batch.end, replica=True)

    def validate_thread_pool(self, args):
        return self.validate_thread(*args)

    def delete_thread(self, connection, batch):
        keys = self.get_keys(batch.start, batch.end)
        completed = self.delete_items(connection, keys)
        return completed

    def delete_thread_pool(self, args):
        return self.delete_thread(*args)

    def single_upsert_pass(self):
        """
        Upsert round. Upsert the required documents and retry any failures. The method retries until all
        documents are successfully upserted without any error.
        :return: Nothing
        """
        self.batches = []
        self.create_upsert_batches()
        args = []
        num_of_connections_available = len(self.connections)
        for i in range(0, len(self.batches)):
            connection = self.connections[i % num_of_connections_available]
            args.append((connection, self.batches[i]))
        thread_pool = Pool(self.threads)
        result = thread_pool.map(self.upsert_thread_pool, args)
        thread_pool.close()
        thread_pool.join()
        for res in result:
            self.num_completed += res
        while self.retry_batches:
            items = self.get_retry_upsert_items()
            self.retry_batches = []
            completed = self.upsert_items(self.connections[0], items)
            self.num_completed += completed

    def single_validate_pass(self):
        """
        Validation round. Validate the documents in the bucket.
        :return: Nothing
        """
        self.batches = []
        self.create_upsert_batches()
        args = []
        num_of_connections_available = len(self.connections)
        for i in range(0, len(self.batches)):
            connection = self.connections[i % num_of_connections_available]
            args.append((connection, self.batches[i]))
        thread_pool = Pool(self.threads)
        thread_pool.map(self.validate_thread_pool, args)
        thread_pool.close()
        thread_pool.join()

    def single_delete_pass(self):
        """
        Deletion round. Delete documents in the bucket and retry any failures. The method retries until all
        required documents are successfully deleted without any error.
        :return: Nothing
        """
        self.batches = []
        self.create_delete_batches()
        args = []
        num_of_connections_available = len(self.connections)
        for i in range(0, len(self.batches)):
            connection = self.connections[i % num_of_connections_available]
            args.append((connection, self.batches[i]))
        thread_pool = Pool(self.threads)
        result = thread_pool.map(self.delete_thread_pool, args)
        thread_pool.close()
        thread_pool.join()
        for res in result:
            self.num_completed += res
        while self.retry_batches:
            keys = self.get_delete_retry_keys()
            self.retry_batches = []
            completed = self.delete_items(self.connections[0], keys)
            self.num_completed += completed
        self.deleted = True
        self.deleted_items = self.num_delete

    def print_validation_stats(self):
        if self.missing_key_val:
            print "Missing keys count: {}".format(len(self.missing_key_val))
            print "Missing keys: {}".format(self.missing_key_val)
        if self.wrong_keys:
            print "Mismatch keys count: {}".format(len(self.wrong_keys))
            print "Mismatch keys: {}".format(self.wrong_keys)
        if self.replicate_to > 0 and self.missing_key_val_replica:
            print "Missing keys count from replicas: {}".format(len(self.missing_key_val_replica))
            print "Missing keys from replicas: {}".format(self.missing_key_val_replica)
        if self.replicate_to > 0 and self.wrong_keys_replica:
            print "Mismatch keys count from replicas: {}".format(len(self.wrong_keys_replica))
            print "Mismatch keys from replicas: {}".format(self.wrong_keys_replica)
        if not self.missing_key_val and not self.wrong_keys:
            print "Validated documents: {}".format(self.num_items)

    def print_upsert_stats(self):
        print "Upserted documents: {}".format(self.num_completed)

    def print_delete_stats(self):
        print "Deleted documents: {}".format(self.num_completed)

    def generate(self):
        """
        Load generation phase. Based on input, perform the various load generation and validation passes.
        :return: Nothing
        """
        if self.validation == 3:
            self.current_update_counter = self.previous_mutation_count
            self.single_validate_pass()
            self.print_validation_stats()
            return
        for i in range(0, self.mutations):
            self.current_update_counter = i + self.previous_mutation_count
            self.num_completed = 0
            self.single_upsert_pass()
            self.print_upsert_stats()
            if self.validation == 2:
                self.single_validate_pass()
                self.print_validation_stats()
        if self.delete:
            self.num_completed = 0
            self.single_delete_pass()
            self.print_delete_stats()
            if self.validation == 2:
                self.single_validate_pass()
                self.print_validation_stats()
        if self.validation == 1:
            self.single_validate_pass()
            self.print_validation_stats()

if __name__ == "__main__":
    args = parseArguments()
    rate = int(args.rate_limit)
    #  Each thread in a worker takes at least 1 second per call and loads batch_size documents each time, so the
    #  number of workers required for a given rate limit and batch size can be estimated; it is taken from --workers.
    num_workers = int(args.workers)
    workers = []
    arguments = []
    #  Split the items to be loaded among the workers.
    for i in range(0, num_workers):
        #  Calculate the number of items in each worker and the start document for each worker.
        arg = deepcopy(args)
        items = int(arg.count) / num_workers
        start = items * i + int(arg.start_document)
        if i == num_workers - 1:
            #  The last worker picks up any remainder from the integer division.
            items = int(arg.count) - (items * i)
        arg.count = items
        arg.start_document = start
        arguments.append(arg)
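    #  Worked example: count=10000 and workers=3 gives items=10000/3=3333 per worker
    #  with start documents 0, 3333 and 6666; the last worker picks up the remainder
    #  and loads 10000 - 3333*2 = 3334 documents.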
    if args.delete:
        #    If there are deletes, divide the documents to be deleted among the workers.
        #    Calculate the workers that should perform the deletes and the workers that should not.
        #    ex: If num_delete=2500 and count=10000 and num_workers=10, only the first 3 workers will have deletes,
        #    i.e. the first 2 workers will have 1000 deletes each and the 3rd will have 500 deletes.
        items_per_worker = int(args.count) / num_workers
        args_counter = 0
        num_delete = int(args.num_delete)
        for i in range(0, num_delete, items_per_worker):
            arg = arguments[args_counter]
            if num_delete < items_per_worker:
                arg.num_delete = num_delete
            elif i + items_per_worker > num_delete:
                #  The last deleting worker gets only the remaining deletes.
                arg.num_delete = num_delete - i
            else:
                arg.num_delete = items_per_worker
            args_counter += 1
        for arg in arguments[args_counter:]:
            arg.delete = False
            arg.num_delete = 0
    for i in range(0, num_workers):
        arg = arguments[i]
        doc_loader = DocumentGenerator(arg)
        worker = multiprocessing.Process(target=doc_loader.generate, name="Worker {}".format(i))
        worker.daemon = True
        workers.append(worker)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()