1import copy
2import hashlib
3import json
4import random
5import socket
6import threading
7import time
8import uuid
9import zlib
10from multiprocessing.process import BaseProcess as Process
11from multiprocessing.queues import Queue
12from queue import Queue
13from random import Random
14
15import crc32
16import logger
17import memcacheConstants
18from mc_ascii_client import MemcachedAsciiClient
19from mc_bin_client import MemcachedClient, MemcachedError
20from membase.api.rest_client import RestConnection, RestHelper, Bucket, vBucket
21from memcacheConstants import ERR_NOT_MY_VBUCKET, ERR_ETMPFAIL, ERR_EINVAL, ERR_2BIG
22from memcached.helper.old_kvstore import ClientKeyValueStore
23from perf_engines import mcsoda
24
25from TestInput import TestInputServer
26from TestInput import TestInputSingleton
27from lib.Cb_constants.CBServer import CbServer
28
# Module-level logger shared by the helpers below.
log = logger.Logger.get_logger()
try:
    # Optional dependency: when concurrent.futures is unavailable, callers
    # fall back to loading/retrieving data for each server sequentially.
    import concurrent.futures
except ImportError:
    log.warning("{0} {1}".format("Can not import concurrent module.",
                                 "Data for each server will be loaded/retrieved sequentially"))
36
class MemcachedClientHelperExcetion(Exception):
    """Raised by MemcachedClientHelper when it is given invalid arguments.

    Carries a short ``errorcode`` string alongside the human-readable
    message.  (The misspelled class name is kept for backward
    compatibility with existing callers.)
    """

    def __init__(self, errorcode, message):
        super(MemcachedClientHelperExcetion, self).__init__(errorcode, message)
        self.errorcode = errorcode
        self._message = message
        self._args = (errorcode, message)
43
44
class MemcachedClientHelper(object):
    """Static helpers for creating memcached clients and bucket-loader threads.

    ``value_size_distribution`` arguments map a value size in bytes to the
    probability of that size being used, e.g. {10: 0.1, 20: 0.2, 40: 0.8}.
    """

    @staticmethod
    def create_threads(servers=None,
                       name='default',
                       ram_load_ratio=-1,
                       number_of_items=-1,
                       value_size_distribution=None,
                       number_of_threads=50,
                       override_vBucketId=-1,
                       write_only=False,
                       async_write=False,
                       delete_ratio=0,
                       expiry_ratio=0,
                       scope=None,
                       collection=None):
        """Build (but do not start) WorkerThreads that load bucket ``name``.

        Either ``ram_load_ratio`` (percentage of the bucket's free RAM to
        fill) or ``number_of_items`` must be specified.  Each worker targets
        a randomly picked server from ``servers``.

        Returns a list of un-started WorkerThread instances.
        Raises MemcachedClientHelperExcetion on invalid arguments.
        """
        log = logger.Logger.get_logger()
        if not servers:
            raise MemcachedClientHelperExcetion(errorcode='invalid_argument',
                                                message="servers is not set")
        if ram_load_ratio < 0 and number_of_items < 0:
            raise MemcachedClientHelperExcetion(errorcode='invalid_argument',
                                                message="ram_load_ratio or number_of_items must be specified")
        if not value_size_distribution:
            value_size_distribution = {16: 0.25, 128: 0.25, 512: 0.25, 1024: 0.25}

        # NOTE: this local used to be named ``list``, shadowing the builtin
        # and making the ``list(...)`` conversion below raise TypeError in
        # the ram_load_ratio branch.
        payload_specs = []

        if ram_load_ratio >= 0:
            info = RestConnection(servers[0]).get_bucket(name)
            emptySpace = info.stats.ram - info.stats.memUsed
            space_to_fill = (int((emptySpace * ram_load_ratio) / 100.0))
            log.info('space_to_fill : {0}, emptySpace : {1}'.format(space_to_fill, emptySpace))
            for size, probability in value_size_distribution.items():
                # assume roughly 250 bytes of per-key overhead when sizing
                how_many = int(space_to_fill / (size + 250) * probability)
                payload_generator = DocumentGenerator.make_docs(number_of_items,
                                                                {"name": "user-${prefix}",
                                                                 "payload": "memcached-json-${prefix}-${padding}",
                                                                 "size": size, "seed": str(uuid.uuid4())})
                payload_specs.append({'size': size, 'value': payload_generator, 'how_many': how_many})
        else:
            for size, probability in value_size_distribution.items():
                # NOTE(review): how_many is divided by number_of_threads here
                # and floor-divided by it again below, unlike
                # create_threads_for_load_bucket which divides only once —
                # confirm which behavior is intended.
                how_many = ((number_of_items // number_of_threads) * probability)
                payload_generator = DocumentGenerator.make_docs(number_of_items,
                                                                {"name": "user-${prefix}",
                                                                 "payload": "memcached-json-${prefix}-${padding}",
                                                                 "size": size, "seed": str(uuid.uuid4())})
                payload_specs.append({'size': size, 'value': payload_generator, 'how_many': how_many})

        for item in payload_specs:
            item['how_many'] //= int(number_of_threads)
            # at least one element for each value size
            if item['how_many'] < 1:
                item['how_many'] = 1
            msg = "each thread will send {0} items with value of size : {1}"
            log.info(msg.format(item['how_many'], item['size']))

        threads = []
        for i in range(0, int(number_of_threads)):
            # choose one of the servers at random for every worker
            thread = WorkerThread(serverInfo=MemcachedClientHelper.random_pick(servers),
                                  name=name,
                                  values_list=payload_specs,
                                  override_vBucketId=override_vBucketId,
                                  write_only=write_only,
                                  async_write=async_write,
                                  delete_ratio=delete_ratio,
                                  expiry_ratio=expiry_ratio,
                                  scope=scope,
                                  collection=collection)
            threads.append(thread)

        return threads

    @staticmethod
    def create_threads_for_load_bucket(serverInfo=None,
                                       name='default',
                                       ram_load_ratio=-1,
                                       number_of_items=-1,
                                       value_size_distribution=None,
                                       number_of_threads=50,
                                       override_vBucketId=-1,
                                       write_only=False,
                                       delete_ratio=0,
                                       expiry_ratio=0,
                                       scope=None,
                                       collection=None):
        """Build (but do not start) WorkerThreads that all target ``serverInfo``.

        Either ``ram_load_ratio`` or ``number_of_items`` must be specified.
        Returns a list of un-started WorkerThread instances.
        Raises MemcachedClientHelperExcetion on invalid arguments.
        """
        log = logger.Logger.get_logger()
        if not serverInfo:
            raise MemcachedClientHelperExcetion(errorcode='invalid_argument',
                                                message="serverInfo is not set")
        if ram_load_ratio < 0 and number_of_items < 0:
            raise MemcachedClientHelperExcetion(errorcode='invalid_argument',
                                                message="ram_load_ratio or number_of_items must be specified")
        if not value_size_distribution:
            value_size_distribution = {16: 0.33, 128: 0.33, 1024: 0.33}

        # NOTE: renamed from ``list`` — the old name shadowed the builtin and
        # made the ``list(...)`` calls below raise TypeError.
        payload_specs = []

        if ram_load_ratio >= 0:
            info = RestConnection(serverInfo).get_bucket(name)
            emptySpace = info.stats.ram - info.stats.memUsed
            space_to_fill = (int((emptySpace * ram_load_ratio) / 100.0))
            log.info('space_to_fill : {0}, emptySpace : {1}'.format(space_to_fill, emptySpace))
            for size, probability in value_size_distribution.items():
                # let's assume overhead per key is 250 bytes
                how_many = int(space_to_fill / (size + 250) * probability)
                payload = MemcachedClientHelper.create_value('*', size)
                payload_specs.append({'size': size, 'value': payload, 'how_many': how_many})
        else:
            for size, probability in value_size_distribution.items():
                how_many = (number_of_items * probability)
                payload = MemcachedClientHelper.create_value('*', size)
                payload_specs.append({'size': size, 'value': payload, 'how_many': how_many})

        for item in payload_specs:
            item['how_many'] //= int(number_of_threads)
            # at least one element for each value size
            if item['how_many'] < 1:
                item['how_many'] = 1
            msg = "each thread will send {0} items with value of size : {1}"
            log.info(msg.format(item['how_many'], item['size']))

        threads = []
        for i in range(0, int(number_of_threads)):
            thread = WorkerThread(serverInfo=serverInfo,
                                  name=name,
                                  values_list=payload_specs,
                                  override_vBucketId=override_vBucketId,
                                  write_only=write_only,
                                  delete_ratio=delete_ratio,
                                  expiry_ratio=expiry_ratio,
                                  scope=scope,
                                  collection=collection)
            threads.append(thread)

        return threads

    @staticmethod
    def load_bucket_and_return_the_keys(servers=None,
                                        name='default',
                                        ram_load_ratio=-1,
                                        number_of_items=-1,
                                        value_size_distribution=None,
                                        number_of_threads=50,
                                        override_vBucketId=-1,
                                        write_only=False,
                                        delete_ratio=0,
                                        expiry_ratio=0,
                                        scope=None,
                                        collection=None):
        """Load the bucket with worker threads and wait for them to finish.

        Returns ``(inserted_keys, rejected_keys)`` aggregated across all
        workers.
        """
        inserted_keys = []
        rejected_keys = []
        log = logger.Logger.get_logger()
        threads = MemcachedClientHelper.create_threads(servers,
                                                       name,
                                                       ram_load_ratio,
                                                       number_of_items,
                                                       value_size_distribution,
                                                       number_of_threads,
                                                       override_vBucketId,
                                                       write_only=write_only,
                                                       delete_ratio=delete_ratio,
                                                       expiry_ratio=expiry_ratio,
                                                       scope=scope,
                                                       collection=collection)

        # we can start them!
        for thread in threads:
            thread.start()
        log.info("waiting for all worker thread to finish their work...")
        for thread in threads:
            thread.join()
        log.info("worker threads are done...")

        inserted_count = 0
        rejected_count = 0
        deleted_count = 0
        expired_count = 0
        for thread in threads:
            t_inserted, t_rejected = thread.keys_set()
            inserted_count += thread.inserted_keys_count()
            rejected_count += thread.rejected_keys_count()
            deleted_count += thread._delete_count
            expired_count += thread._expiry_count
            inserted_keys.extend(t_inserted)
            rejected_keys.extend(t_rejected)
        msg = "inserted keys count : {0} , rejected keys count : {1}"
        log.info(msg.format(inserted_count, rejected_count))
        msg = "deleted keys count : {0} , expired keys count : {1}"
        log.info(msg.format(deleted_count, expired_count))
        return inserted_keys, rejected_keys

    @staticmethod
    def load_bucket(servers,
                    name='default',
                    ram_load_ratio=-1,
                    number_of_items=-1,
                    value_size_distribution=None,
                    number_of_threads=50,
                    override_vBucketId=-1,
                    write_only=False,
                    scope=None,
                    collection=None):
        """Load the bucket and return only the aggregate counters.

        Returns ``(inserted_keys_count, rejected_keys_count)``.
        """
        inserted_keys_count = 0
        rejected_keys_count = 0
        log = logger.Logger.get_logger()
        threads = MemcachedClientHelper.create_threads(servers,
                                                       name,
                                                       ram_load_ratio,
                                                       number_of_items,
                                                       value_size_distribution,
                                                       number_of_threads,
                                                       override_vBucketId,
                                                       write_only,
                                                       scope=scope,
                                                       collection=collection)
        # we can start them!
        for thread in threads:
            thread.start()
        log.info("waiting for all worker thread to finish their work...")
        for thread in threads:
            thread.join()
        log.info("worker threads are done...")
        for thread in threads:
            inserted_keys_count += thread.inserted_keys_count()
            rejected_keys_count += thread.rejected_keys_count()
        msg = "inserted keys count : {0} , rejected keys count : {1}"
        log.info(msg.format(inserted_keys_count, rejected_keys_count))
        return inserted_keys_count, rejected_keys_count

    @staticmethod
    def create_value(pattern, size):
        """Return a string of exactly ``size`` chars built by repeating
        ``pattern`` (truncating the final repetition as needed)."""
        return (pattern * (size // len(pattern))) + pattern[0:(size % len(pattern))]

    @staticmethod
    def random_pick(list):
        """Return a random element of ``list``, or None when it is empty.

        The parameter name shadows the builtin but is kept for backward
        compatibility with any keyword-argument callers.
        """
        if list:
            if len(list) > 1:
                return list[Random().randint(0, len(list) - 1)]
            return list[0]
        return None

    @staticmethod
    def direct_client(server, bucket, timeout=30, admin_user=None, admin_pass=None):
        """Create a MemcachedClient connected straight to ``bucket``'s
        memcached port on ``server``.

        ``admin_user``/``admin_pass`` default to the CbServer REST
        credentials.  Raises MemcachedClientHelperExcetion when the node
        cannot be located.
        """
        if admin_user is None:
            admin_user = CbServer.rest_username
        if admin_pass is None:
            admin_pass = CbServer.rest_password
        log = logger.Logger.get_logger()
        rest = RestConnection(server)
        node = None
        try:
            node = rest.get_nodes_self()
        except ValueError:
            log.info("could not connect to server {0}, will try scanning all nodes".format(server))
        if not node:
            nodes = rest.get_nodes()
            for n in nodes:
                if n.ip == server.ip and n.port == server.port:
                    node = n
        if node is None:
            # fail fast with a clear error instead of an AttributeError on
            # node.memcached below
            raise MemcachedClientHelperExcetion(errorcode='node_not_found',
                                                message="unable to find node for {0}".format(server))
        if CbServer.use_https:
            node.memcached = CbServer.ssl_memcached_port
        if isinstance(server, dict):
            log.info("dict:{0}".format(server))
            log.info("creating direct client {0}:{1} {2}".format(server["ip"], node.memcached, bucket))
        else:
            log.info("creating direct client {0}:{1} {2}".format(server.ip, node.memcached, bucket))
        RestHelper(rest).vbucket_map_ready(bucket, 60)
        vBuckets = RestConnection(server).get_vbuckets(bucket)
        if isinstance(server, dict):
            client = MemcachedClient(server["ip"], node.memcached, timeout=timeout)
        else:
            client = MemcachedClient(server.ip, node.memcached, timeout=timeout)
        # get_vbuckets may return None when the map is not ready yet
        client.vbucket_count = len(vBuckets) if vBuckets is not None else 0
        bucket_info = rest.get_bucket(bucket)
        # todo raise exception for not bucket_info

        cluster_compatibility = rest.check_cluster_compatibility("5.0")
        # None means compatibility could not be determined - assume pre-spock
        if cluster_compatibility is None:
            pre_spock = True
        else:
            pre_spock = not cluster_compatibility
        if pre_spock:
            log.info("Atleast 1 of the server is on pre-spock "
                     "version. Using the old ssl auth to connect to "
                     "bucket.")
            client.sasl_auth_plain(bucket_info.name.encode('ascii'),
                                   bucket_info.saslPassword.encode('ascii'))
        else:
            if isinstance(bucket, Bucket):
                bucket = bucket.name
            client.sasl_auth_plain(admin_user, admin_pass)
            client.bucket_select(bucket)

        return client

    @staticmethod
    def proxy_client(server, bucket, timeout=30, force_ascii=False):
        """Create a client for ``bucket`` through the node's proxy.

        Uses the ascii protocol when the test params request it (or when
        ``force_ascii`` is True), binary otherwise.  Raises Exception when
        the bucket reports no nodes.
        """
        rest = RestConnection(server)
        log = logger.Logger.get_logger()
        bucket_info = rest.get_bucket(bucket)
        nodes = bucket_info.nodes

        if (TestInputSingleton.input and "ascii" in TestInputSingleton.input.test_params \
            and TestInputSingleton.input.test_params["ascii"].lower() == "true") \
                or force_ascii:
            ascii = True
        else:
            ascii = False
        for node in nodes:
            RestHelper(rest).vbucket_map_ready(bucket, 60)
            vBuckets = rest.get_vbuckets(bucket)
            if ascii:
                # fixed: format index {2} was out of range for two arguments
                log.info("creating ascii client {0} {1}".format(server.ip, bucket))
                client = MemcachedAsciiClient(server.ip, timeout=timeout)
            else:
                if isinstance(server, dict):
                    log.info("creating proxy client {0} {1}".format(server["ip"], bucket))
                    client = MemcachedClient(server["ip"], timeout=timeout)
                else:
                    log.info("creating proxy client {0} {1}".format(server.ip, bucket))
                    client = MemcachedClient(server.ip, timeout=timeout)
                client.vbucket_count = len(vBuckets)
            # only the first node is ever used
            return client
        if isinstance(server, dict):
            raise Exception("unable to find {0} in get_nodes()".format(server["ip"]))
        else:
            raise Exception("unable to find {0} in get_nodes()".format(server.ip))

    @staticmethod
    def flush_bucket(server, bucket, admin_user=None, admin_pass=None):
        """Flush ``bucket``, retrying up to 5 times when memcached reports
        an error (it may be temporarily out of memory)."""
        log = logger.Logger.get_logger()
        retry_attempt = 5
        while retry_attempt > 0:
            client = MemcachedClientHelper.direct_client(server, bucket, admin_user=admin_user, admin_pass=admin_pass)
            try:
                client.flush()
                log.info('flushed bucket {0}...'.format(bucket))
                break
            except MemcachedError:
                retry_attempt -= 1
                log.info('flush raised memcached error trying again in 5 seconds...')
                time.sleep(5)
            finally:
                # always release the connection, flushed or not
                client.close()
        return
401
402
class MutationThread(threading.Thread):
    """Thread that applies an operation (currently only "set") to a list of
    existing keys, tracking mutated vs. rejected counts."""

    def run(self, scope=None, collection=None):
        """Mutate every key in ``self.keys``.

        ``scope``/``collection`` default to the values passed to the
        constructor — Thread.start() invokes run() with no arguments, so
        previously the constructor values were silently ignored.
        """
        if scope is None:
            scope = self.scope
        if collection is None:
            collection = self.collection
        values = DocumentGenerator.make_docs(len(self.keys),
                                             {"name": "user-${prefix}",
                                              "payload": "memcached-json-${prefix}-${padding}",
                                              "size": 1024, "seed": self.seed})
        client = MemcachedClientHelper.proxy_client(self.serverInfo, self.name)
        counter = 0
        for value in values:
            try:
                if self.op == "set":
                    client.set(self.keys[counter], 0, 0, value, scope=scope, collection=collection)
                    self._mutated_count += 1
            except MemcachedError:
                self._rejected_count += 1
                self._rejected_keys.append({"key": self.keys[counter], "value": value})
            except Exception as e:
                # unexpected failure: record it and reconnect before moving on
                self.log.info("unable to mutate {0} due to {1}".format(self.keys[counter], e))
                self._rejected_count += 1
                self._rejected_keys.append({"key": self.keys[counter], "value": value})
                client.close()
                client = MemcachedClientHelper.proxy_client(self.serverInfo, self.name)
            counter = counter + 1
        self.log.info("mutation failed {0} times".format(self._rejected_count))
        client.close()

    def __init__(self, serverInfo,
                 keys,
                 op,
                 seed,
                 name='default',
                 scope=None,
                 collection=None):
        threading.Thread.__init__(self)
        self.log = logger.Logger.get_logger()
        self.serverInfo = serverInfo
        self.name = name
        # fixed: a stray trailing comma previously turned self.scope into a
        # 1-tuple ``(scope,)``
        self.scope = scope
        self.collection = collection
        self.keys = keys
        self.op = op
        self.seed = seed
        # counters exposed to the test that started this thread
        self._mutated_count = 0
        self._rejected_count = 0
        self._rejected_keys = []
449
class ReaderThread(object):
    """Issues GET requests for keys that a WorkerThread is concurrently
    loading, until the queue signals completion or the keyset is drained."""

    def __init__(self, info, keyset, queue, scope=None, collection=None):
        self.log = logger.Logger.get_logger()
        # connection coordinates and key formula pieces come in via ``info``
        self.info = info
        self.keyset = keyset
        self.queue = queue
        self.scope = scope
        self.collection = collection
        self.error_seen = 0
        self.aborted = False

    def abort(self):
        # cooperative stop flag
        self.aborted = True

    def _saw_error(self, key):
        # only count failures; logging each one would be far too noisy
        self.error_seen += 1

    def start(self):
        reader = MemcachedClientHelper.direct_client(self.info["server"], self.info['name'])
        time.sleep(5)
        while self.queue.empty() and self.keyset:
            picked = MemcachedClientHelper.random_pick(self.keyset)
            picked['how_many'] -= 1
            if picked['how_many'] < 1:
                self.keyset.remove(picked)
            key = "{0}-{1}-{2}".format(self.info['baseuuid'],
                                       picked['size'],
                                       int(picked['how_many']))
            try:
                reader.send_get(key, self.scope, self.collection)
            except Exception:
                # the writer may not have stored this key yet
                self._saw_error(key)
        reader.close()
488
489
# Mutation: let's do two cycles — first run the load, then try to mutate all
# of those items and return.
492class WorkerThread(threading.Thread):
    # Two flags: stop after seeing x errors;
    # slow down after every y errors seen.
    # values_list is a list of document generators.
496    def __init__(self,
497                 serverInfo,
498                 name,
499                 values_list,
500                 ignore_how_many_errors=5000,
501                 override_vBucketId=-1,
502                 terminate_in_minutes=120,
503                 write_only=False,
504                 async_write=False,
505                 delete_ratio=0,
506                 expiry_ratio=0,
507                 scope=None,
508                 collection=None):
509        threading.Thread.__init__(self)
510        self.log = logger.Logger.get_logger()
511        self.serverInfo = serverInfo
512        self.name = name
513        self.scope = scope
514        self.collection = collection
515        self.values_list = []
516        self.values_list.extend(copy.deepcopy(values_list))
517        self._value_list_copy = []
518        self._value_list_copy.extend(copy.deepcopy(values_list))
519        self._inserted_keys_count = 0
520        self._rejected_keys = []
521        self._rejected_keys_count = 0
522        self._delete_ratio = delete_ratio
523        self._expiry_ratio = expiry_ratio
524        self._delete_count = 0
525        self._expiry_count = 0
526        self._delete = []
527        self.ignore_how_many_errors = ignore_how_many_errors
528        self.override_vBucketId = override_vBucketId
529        self.terminate_in_minutes = terminate_in_minutes
530        self._base_uuid = uuid.uuid4()
531        self.queue = Queue()
532        # let's create a read_thread
533        self.info = {'server': serverInfo,
534                     'name': self.name,
535                     'baseuuid': self._base_uuid,
536                     'scope': self.scope,
537                     'collection': self.collection}
538        self.write_only = write_only
539        self.aborted = False
540        self.async_write = async_write
541
    def inserted_keys_count(self):
        """Return the number of keys this worker stored successfully."""
        return self._inserted_keys_count
544
    def rejected_keys_count(self):
        """Return the number of keys that failed to be stored."""
        return self._rejected_keys_count
547
    # Helper that reconstructs the set of inserted keys.
    # We should really expose an iterator instead, which
    # generates the key/value pairs on the fly.
552    def keys_set(self):
553        # let's construct the inserted keys set
554        # TODO: hard limit , let's only populated up to 1 million keys
555        inserted_keys = []
556        for item in self._value_list_copy:
557            for i in range(0, (int(item['how_many']))):
558                key = "{0}-{1}-{2}".format(self._base_uuid, item['size'], i)
559                if key not in self._rejected_keys:
560                    inserted_keys.append(key)
561                if len(inserted_keys) > 2 * 1024 * 1024:
562                    break
563        return inserted_keys, self._rejected_keys
564
565    def run(self):
566        msg = "starting a thread to set keys mixed set-get ? {0} and using async_set ? {1}"
567        msg = msg.format(self.write_only, self.async_write)
568        self.log.info(msg)
569        try:
570            awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
571            client = None
572        except Exception as ex:
573            self.log.info("unable to create memcached client due to {0}. stop thread...".format(ex))
574            import traceback
575            traceback.print_exc()
576            return
577        # keeping keys in the memory is not such a good idea because
578        # we run out of memory so best is to just keep a counter ?
579        # if someone asks for the keys we can give them the formula which is
580        # baseuuid-{0}-{1} , size and counter , which is between n-0 except those
581        # keys which were rejected
582        # let's print out some status every 5 minutes..
583
584        if not self.write_only:
585            self.reader = Process(target=start_reader_process, args=(self.info, self._value_list_copy, self.queue))
586            self.reader.start()
587        start_time = time.time()
588        last_reported = start_time
589        backoff_count = 0
590        while len(self.values_list) > 0 and not self.aborted:
591            selected = MemcachedClientHelper.random_pick(self.values_list)
592            selected['how_many'] -= 1
593            if selected['how_many'] < 1:
594                self.values_list.remove(selected)
595            if (time.time() - start_time) > self.terminate_in_minutes * 60:
596                self.log.info("its been more than {0} minutes loading data. stopping the process..".format(
597                    self.terminate_in_minutes))
598                break
599            else:
600                # every two minutes print the status
601                if time.time() - last_reported > 2 * 60:
602                    awareness.done()
603                    try:
604                        awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
605                    except Exception:
606                        # vbucket map is changing . sleep 5 seconds
607                        time.sleep(5)
608                        awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
609                    self.log.info("now connected to {0} memcacheds".format(len(awareness.memcacheds)))
610                    last_reported = time.time()
611                    for item in self.values_list:
612                        self.log.info(
613                            '{0} keys (each {1} bytes) more to send...'.format(item['how_many'], item['size']))
614
615            key = "{0}-{1}-{2}".format(self._base_uuid,
616                                       selected['size'],
617                                       int(selected['how_many']))
618            client = awareness.memcached(key)
619            if not client:
620                self.log.error("client should not be null")
621            value = "*"
622            try:
623                value = next(selected["value"])
624            except StopIteration:
625                pass
626            try:
627                if self.override_vBucketId >= 0:
628                    client.vbucketId = self.override_vBucketId
629                if self.async_write:
630                    client.send_set(key, 0, 0, value, self.scope, self.collection)
631                else:
632                    client.set(key, 0, 0, value, self.scope, self.collection)
633                self._inserted_keys_count += 1
634                backoff_count = 0
635                # do expiry sets, 30 second expiry time
636                if Random().random() < self._expiry_ratio:
637                    client.set(key + "-exp", 30, 0, value, self.scope, self.collection)
638                    self._expiry_count += 1
639                    # do deletes if we have 100 pending
640                # at the end delete the remaining
641                if len(self._delete) >= 100:
642                    #                    self.log.info("deleting {0} keys".format(len(self._delete)))
643                    for key_del in self._delete:
644                        client.delete(key_del, self.scope, self.collection)
645                    self._delete = []
646                    # do delete sets
647                if Random().random() < self._delete_ratio:
648                    client.set(key + "-del", 0, 0, value, self.scope, self.collection)
649                    self._delete.append(key + "-del")
650                    self._delete_count += 1
651            except MemcachedError as error:
652                awareness.done()
653                try:
654                    awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
655                except Exception:
656                    # vbucket map is changing . sleep 5 seconds
657                    time.sleep(5)
658                    awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
659                self.log.info("now connected to {0} memcacheds".format(len(awareness.memcacheds)))
660                if isinstance(self.serverInfo, dict):
661                    self.log.error(
662                        "memcached error {0} {1} from {2}".format(error.status, error.msg, self.serverInfo["ip"]))
663                else:
664                    self.log.error(
665                        "memcached error {0} {1} from {2}".format(error.status, error.msg, self.serverInfo.ip))
666                if error.status == 134:
667                    backoff_count += 1
668                    if backoff_count < 5:
669                        backoff_seconds = 15 * backoff_count
670                    else:
671                        backoff_seconds = 2 * backoff_count
672                    self.log.info("received error # 134. backing off for {0} sec".format(backoff_seconds))
673                    time.sleep(backoff_seconds)
674
675                self._rejected_keys_count += 1
676                self._rejected_keys.append({"key": key, "value": value})
677                if len(self._rejected_keys) > self.ignore_how_many_errors:
678                    break
679            except Exception as ex:
680                awareness.done()
681                try:
682                    awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
683                except Exception:
684                    awareness = VBucketAwareMemcached(RestConnection(self.serverInfo), self.name)
685                self.log.info("now connected to {0} memcacheds".format(len(awareness.memcacheds)))
686                if isinstance(self.serverInfo, dict):
687                    self.log.error("error {0} from {1}".format(ex, self.serverInfo["ip"]))
688                    import traceback
689                    traceback.print_exc()
690                else:
691                    self.log.error("error {0} from {1}".format(ex, self.serverInfo.ip))
692                self._rejected_keys_count += 1
693                self._rejected_keys.append({"key": key, "value": value})
694                if len(self._rejected_keys) > self.ignore_how_many_errors:
695                    break
696
697                    # before closing the session let's try sending those items again
698        retry = 3
699        while retry > 0 and self._rejected_keys_count > 0:
700            rejected_after_retry = []
701            self._rejected_keys_count = 0
702            for item in self._rejected_keys:
703                try:
704                    if self.override_vBucketId >= 0:
705                        client.vbucketId = self.override_vBucketId
706                    if self.async_write:
707                        client.send_set(item["key"], 0, 0, item["value"], self.scope, self.collection)
708                    else:
709                        client.set(item["key"], 0, 0, item["value"], self.scope, self.collection)
710                    self._inserted_keys_count += 1
711                except MemcachedError:
712                    self._rejected_keys_count += 1
713                    rejected_after_retry.append({"key": item["key"], "value": item["value"]})
714                    if len(rejected_after_retry) > self.ignore_how_many_errors:
715                        break
716            self._rejected_keys = rejected_after_retry
717            retry = -1
718            # clean up the rest of the deleted keys
719            if len(self._delete) > 0:
720                #                self.log.info("deleting {0} keys".format(len(self._delete)))
721                for key_del in self._delete:
722                    client.delete(key_del, self.scope, self.collection)
723                self._delete = []
724
725            self.log.info("deleted {0} keys".format(self._delete_count))
726            self.log.info("expiry {0} keys".format(self._expiry_count))
727            #        client.close()
728        awareness.done()
729        if not self.write_only:
730            self.queue.put_nowait("stop")
731            self.reader.join()
732
    def _initialize_memcached(self):
        """Placeholder hook for per-thread memcached setup; intentionally a no-op."""
        pass
735
    def _set(self):
        """Placeholder hook for a single set operation; intentionally a no-op."""
        pass
738
    def _handle_error(self):
        """Placeholder error hook; intentionally a no-op."""
        pass
        # if error is memcached error oom related let's do a sleep
742
743    def _time_to_stop(self):
744        return self.aborted or len(self._rejected_keys) > self.ignore_how_many_errors
745
746
747class VBucketAwareMemcached(object):
748    def __init__(self, rest, bucket, info=None, scope=None, collection=None):
749        self.log = logger.Logger.get_logger()
750        self.info = info
751        self.bucket = bucket
752        if isinstance(bucket, Bucket):
753            self.bucket = bucket.name
754        self.memcacheds = {}
755        self.vBucketMap = {}
756        self.vBucketMapReplica = {}
757        self.rest = rest
758        self.reset(rest)
759        self.scope = scope
760        self.collections = collection
761
762    def reset(self, rest=None):
763        if not rest:
764            self.rest = RestConnection(self.info)
765        m, v, r = self.request_map(self.rest, self.bucket)
766        self.memcacheds = m
767        self.vBucketMap = v
768        self.vBucketMapReplica = r
769
    def reset_vbuckets(self, rest, vbucketids_set, forward_map=None, admin_user=None, admin_pass=None):
        """Refresh master/replica ownership for the vbuckets in `vbucketids_set`.

        Typically called after a NOT_MY_VBUCKET error during rebalance. The
        forward map names the new owners; clients are opened for new masters,
        clients that no vbucket references anymore are closed, and replica
        connections are (re-)registered.

        :param rest: RestConnection to the cluster.
        :param vbucketids_set: set of vbucket ids to refresh.
        :param forward_map: pre-parsed forward map (e.g. from the error body);
            fetched via REST when not supplied.
        :return: True (failures raise instead).
        """
        if not forward_map:
            forward_map = rest.get_bucket(self.bucket, num_attempt=2).forward_map
            if not forward_map:
                # no forward map published: fall back to a full map refresh
                self.reset(rest)
                forward_map = rest.get_vbuckets(self.bucket)
        nodes = rest.get_nodes()
        for vBucket in forward_map:
            if vBucket.id in vbucketids_set:
                self.vBucketMap[vBucket.id] = vBucket.master
                # vBucket.master is "ip:memcached_port"
                masterIp = vBucket.master.rsplit(":", 1)[0]
                masterPort = int(vBucket.master.rsplit(":", 1)[1])
                if self.vBucketMap[vBucket.id] not in self.memcacheds:
                    server = TestInputServer()
                    server.rest_username = rest.username
                    server.rest_password = rest.password
                    # translate the memcached port into the node's admin port
                    for node in nodes:
                        if node.ip == masterIp and node.memcached == masterPort:
                            server.port = node.port
                    server.ip = masterIp
                    if CbServer.use_https:
                        server.port = CbServer.ssl_port_map.get(str(server.port), str(server.port))
                    self.log.info("Received forward map, reset vbucket map, new direct_client")
                    self.memcacheds[vBucket.master] = MemcachedClientHelper.direct_client(server, self.bucket,
                                                                                          admin_user=admin_user,
                                                                                          admin_pass=admin_pass)
                # if no one is using that memcached connection anymore just close the connection
                # NOTE(review): this cleanup runs once per matched vbucket;
                # hoisting it after the loop would be cheaper — left as-is.
                used_nodes = {self.vBucketMap[vb_name] for vb_name in self.vBucketMap}
                rm_clients = []
                for memcache_con in self.memcacheds:
                    if memcache_con not in used_nodes:
                        rm_clients.append(memcache_con)
                for rm_cl in rm_clients:
                    self.memcacheds[rm_cl].close()
                    del self.memcacheds[rm_cl]
                self.vBucketMapReplica[vBucket.id] = vBucket.replica
                for replica in vBucket.replica:
                    self.add_memcached(replica, self.memcacheds, self.rest, self.bucket, admin_user, admin_pass)
        return True
809
810    def request_map(self, rest, bucket):
811        memcacheds = {}
812        vBucketMap = {}
813        vBucketMapReplica = {}
814        vb_ready = RestHelper(rest).vbucket_map_ready(bucket, 60)
815        if not vb_ready:
816            raise Exception("vbucket map is not ready for bucket {0}".format(bucket))
817        vBuckets = rest.get_vbuckets(bucket)
818        for vBucket in vBuckets:
819            vBucketMap[vBucket.id] = vBucket.master
820            self.add_memcached(vBucket.master, memcacheds, rest, bucket)
821
822            vBucketMapReplica[vBucket.id] = vBucket.replica
823            for replica in vBucket.replica:
824                self.add_memcached(replica, memcacheds, rest, bucket)
825        return memcacheds, vBucketMap, vBucketMapReplica
826
827    def add_memcached(self, server_str, memcacheds, rest, bucket, admin_user=None, admin_pass=None):
828        if not server_str in memcacheds:
829            serverIp = server_str.rsplit(":", 1)[0]
830            serverPort = int(server_str.rsplit(":", 1)[1])
831            nodes = rest.get_nodes()
832            server = TestInputServer()
833            server.ip = serverIp
834            if TestInputSingleton.input.param("alt_addr", False):
835                server.ip = rest.get_ip_from_ini_file()
836            server.port = rest.port
837            server.rest_username = rest.username
838            server.rest_password = rest.password
839            try:
840                for node in nodes:
841                    if node.ip == serverIp and (node.memcached == serverPort or
842                                                node.memcached == CbServer.ssl_memcached_port):
843                        if server_str not in memcacheds:
844                            server.port = node.port
845                            if CbServer.use_https:
846                                server.port = CbServer.ssl_port_map.get(str(server.port), str(server.port))
847                            memcacheds[server_str] = \
848                                MemcachedClientHelper.direct_client(server, bucket, admin_user, admin_pass)
849                            # self.enable_collection(memcacheds[server_str])
850                        break
851            except Exception as ex:
852                msg = "unable to establish connection to {0}. cleanup open connections"
853                self.log.warning(msg.format(serverIp))
854                self.done()
855                raise ex
856
857    def memcached(self, key, replica_index=None, scope=None, collection=None):
858        vBucketId = self._get_vBucket_id(key)
859        if replica_index is None:
860            return self.memcached_for_vbucket(vBucketId)
861        else:
862            return self.memcached_for_replica_vbucket(vBucketId, replica_index)
863
864    def memcached_for_vbucket(self, vBucketId):
865        if vBucketId not in self.vBucketMap:
866            msg = "vbucket map does not have an entry for vb : {0}"
867            raise Exception(msg.format(vBucketId))
868        if self.vBucketMap[vBucketId] not in self.memcacheds:
869            msg = "does not have a mc connection for server : {0}"
870            raise Exception(msg.format(self.vBucketMap[vBucketId]))
871        return self.memcacheds[self.vBucketMap[vBucketId]]
872
873    def memcached_for_replica_vbucket(self, vBucketId, replica_index=0, log_on=False):
874        if vBucketId not in self.vBucketMapReplica:
875            msg = "replica vbucket map does not have an entry for vb : {0}"
876            raise Exception(msg.format(vBucketId))
877        if log_on:
878            self.log.info("replica vbucket: vBucketId {0}, server{1}".format(vBucketId,
879                                                                             self.vBucketMapReplica[vBucketId][
880                                                                                 replica_index]))
881        if self.vBucketMapReplica[vBucketId][replica_index] not in self.memcacheds:
882            msg = "does not have a mc connection for server : {0}"
883            raise Exception(msg.format(self.vBucketMapReplica[vBucketId][replica_index]))
884        return self.memcacheds[self.vBucketMapReplica[vBucketId][replica_index]]
885
886    def not_my_vbucket_memcached(self, key, scope=None, collection=None):
887        vBucketId = self._get_vBucket_id(key)
888        which_mc = self.vBucketMap[vBucketId]
889        for server in self.memcacheds:
890            if server != which_mc:
891                return self.memcacheds[server]
892
893    # DECORATOR
894    def aware_call(func):
895        def new_func(self, key, *args, **keyargs):
896            vb_error = 0
897            while True:
898                try:
899                    return func(self, key, *args, **keyargs)
900                except MemcachedError as error:
901                    if error.status == ERR_NOT_MY_VBUCKET and vb_error < 5:
902                        self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)},
903                                            forward_map=self._parse_not_my_vbucket_error(error))
904                        vb_error += 1
905                    else:
906                        raise error
907                except (EOFError, socket.error) as error:
908                    if "Got empty data (remote died?)" in str(error) or \
909                            "Timeout waiting for socket" in str(error) or \
910                            "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
911                            and vb_error < 5:
912                        self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
913                        vb_error += 1
914                    else:
915                        raise error
916                except BaseException as error:
917                    if vb_error < 5:
918                        self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
919                        vb_error += 1
920                    else:
921                        raise error
922
923        return new_func
924
925    # SUBDOCS
926
927    @aware_call
928    def counter_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
929        return self._send_op(self.memcached(key).counter_sd, key, path, value, expiry=expiry, opaque=opaque, cas=cas,
930                             create=create, scope=scope, collection=collection)
931
932    @aware_call
933    def array_add_insert_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
934        return self._send_op(self.memcached(key).array_add_insert_sd, key, path, value, expiry=expiry, opaque=opaque,
935                             cas=cas, create=create, scope=scope, collection=collection)
936
937    @aware_call
938    def array_add_unique_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
939        return self._send_op(self.memcached(key).array_add_unique_sd, key, path, value, expiry=expiry, opaque=opaque,
940                             cas=cas, create=create, scope=scope, collection=collection)
941
942    @aware_call
943    def array_push_first_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
944        return self._send_op(self.memcached(key).array_push_first_sd, key, path, value, expiry=expiry, opaque=opaque,
945                             cas=cas, create=create, scope=scope, collection=collection)
946
947    @aware_call
948    def array_push_last_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
949        return self._send_op(self.memcached(key).array_push_last_sd, key, path, value, expiry=expiry, opaque=opaque,
950                             cas=cas, create=create, scope=scope, collection=collection)
951
952    @aware_call
953    def replace_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
954        return self._send_op(self.memcached(key).replace_sd, key, path, value, expiry=expiry, opaque=opaque, cas=cas,
955                             create=create, scope=scope, collection=collection)
956
957    @aware_call
958    def delete_sd(self, key, path, opaque=0, cas=0, scope=None, collection=None):
959        return self._send_op(self.memcached(key).delete_sd, key, path, opaque=opaque, cas=cas, scope=scope, collection=collection)
960
961    @aware_call
962    def dict_upsert_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
963        return self._send_op(self.memcached(key).dict_upsert_sd, key, path, value, expiry=expiry, opaque=opaque,
964                             cas=cas, create=create, scope=scope, collection=collection)
965
966    @aware_call
967    def dict_add_sd(self, key, path, value, expiry=0, opaque=0, cas=0, create=False, scope=None, collection=None):
968        return self._send_op(self.memcached(key).dict_add_sd, key, path, value, expiry=expiry, opaque=opaque, cas=cas,
969                             create=create, scope=scope, collection=collection)
970
971    @aware_call
972    def exists_sd(self, key, path, cas=0, scope=None, collection=None):
973        return self._send_op(self.memcached(key).exists_sd, key, path, cas=cas, scope=scope, collection=collection)
974
975    @aware_call
976    def get_sd(self, key, path, cas=0, scope=None, collection=None):
977        return self._send_op(self.memcached(key).get_sd, key, path, cas=cas, scope=scope, collection=collection)
978
979    @aware_call
980    def set(self, key, exp, flags, value, scope=None, collection=None):
981        return self._send_op(self.memcached(key).set, key, exp, flags, value, scope=None, collection=collection)
982
983    @aware_call
984    def append(self, key, value, scope=None, collection=None):
985        return self._send_op(self.memcached(key).append, key, value, scope=scope, collection=collection)
986
987    @aware_call
988    def observe(self, key, scope=None, collection=None):
989        return self._send_op(self.memcached(key).observe, key, scope=scope, collection=collection)
990
991    @aware_call
992    def observe_seqno(self, key, vbucket_uuid, scope=None, collection=None):
993        return self._send_op(self.memcached(key).observe_seqno, key, vbucket_uuid, scope=scope, collection=collection)
994
995    # This saves a lot of repeated code - the func is the mc bin client function
996
997    def generic_request(self, func, *args):
998        key = args[0]
999        vb_error = 0
1000        while True:
1001            try:
1002                return self._send_op(func, *args)
1003            except MemcachedError as error:
1004                if error.status == ERR_NOT_MY_VBUCKET and vb_error < 5:
1005                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)},
1006                                        forward_map=self._parse_not_my_vbucket_error(error))
1007                    vb_error += 1
1008                else:
1009                    raise error
1010            except (EOFError, socket.error) as error:
1011                if "Got empty data (remote died?)" in str(error) or \
1012                        "Timeout waiting for socket" in str(error) or \
1013                        "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1014                        and vb_error < 5:
1015                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1016                    vb_error += 1
1017                    if vb_error >= 5:
1018                        raise error
1019                else:
1020                    raise error
1021            except BaseException as error:
1022                if vb_error < 5:
1023                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1024                    self.log.info("***************resetting vbucket id***********")
1025                    vb_error += 1
1026                else:
1027                    raise error
1028
1029    def get(self, key, scope=None, collection=None):
1030        vb_error = 0
1031        while True:
1032            try:
1033                return self._send_op(self.memcached(key).get, key, scope=scope, collection=collection)
1034            except MemcachedError as error:
1035                if error.status == ERR_NOT_MY_VBUCKET and vb_error < 5:
1036                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)},
1037                                        forward_map=self._parse_not_my_vbucket_error(error))
1038                    vb_error += 1
1039                else:
1040                    raise error
1041            except (EOFError, socket.error) as error:
1042                if "Got empty data (remote died?)" in str(error) or \
1043                        "Timeout waiting for socket" in str(error) or \
1044                        "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1045                        and vb_error < 5:
1046                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1047                    vb_error += 1
1048                else:
1049                    raise error
1050            except BaseException as error:
1051                if vb_error < 5:
1052                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1053                    vb_error += 1
1054                else:
1055                    raise error
1056
1057    def getr(self, key, replica_index=0, scope=None, collection=None):
1058        vb_error = 0
1059        while True:
1060            try:
1061                vBucketId = self._get_vBucket_id(key)
1062                return self._send_op(self.memcached(key, replica_index=replica_index).getr, key, scope=scope, collection=collection)
1063            except MemcachedError as error:
1064                if error.status == ERR_NOT_MY_VBUCKET and vb_error < 5:
1065                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)},
1066                                        forward_map=self._parse_not_my_vbucket_error(error))
1067                    vb_error += 1
1068                else:
1069                    raise error
1070            except (EOFError, socket.error) as error:
1071                if "Got empty data (remote died?)" in str(error) or \
1072                        "Timeout waiting for socket" in str(error) or \
1073                        "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1074                        and vb_error < 5:
1075                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1076                    vb_error += 1
1077                else:
1078                    raise error
1079            except BaseException as error:
1080                if vb_error < 5:
1081                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1082                    vb_error += 1
1083                else:
1084                    raise error
1085
1086    def setMulti(self, exp, flags, key_val_dic, pause_sec=1, timeout_sec=5, parallel=False, scope=None, collection=None):
1087
1088        if parallel:
1089            try:
1090                import concurrent.futures
1091                self._setMulti_parallel(exp, flags, key_val_dic, pause_sec, timeout_sec, scope=scope, collection=collection)
1092            except ImportError:
1093                self._setMulti_seq(exp, flags, key_val_dic, pause_sec, timeout_sec, scope=scope, collection=collection)
1094        else:
1095            self._setMulti_seq(exp, flags, key_val_dic, pause_sec, timeout_sec, scope=scope, collection=collection)
1096
1097    def _setMulti_seq(self, exp, flags, key_val_dic, pause_sec=1, timeout_sec=5, scope=None, collection=None):
1098        # set keys in their respective vbuckets and identify the server for each vBucketId
1099
1100        server_keyval = self._get_server_keyval_dic(key_val_dic)
1101
1102        # get memcached client against each server and multi set
1103        for server_str, keyval in list(server_keyval.items()):
1104            # if the server has been removed after server_keyval has been gotten
1105            if server_str not in self.memcacheds:
1106                self._setMulti_seq(exp, flags, key_val_dic, pause_sec, timeout_sec, scope=scope, collection=collection)
1107            else:
1108
1109                mc = self.memcacheds[server_str]
1110
1111                errors = self._setMulti_rec(mc, exp, flags, keyval, pause_sec,
1112                                            timeout_sec, self._setMulti_seq, scope=scope, collection=collection)
1113                if errors:
1114                    self.log.error(list(set(str(error) for error in errors)), exc_info=1)
1115                    raise errors[0]
1116
1117    def _setMulti_parallel(self, exp, flags, key_val_dic, pause_sec=1, timeout_sec=5, scope=None, collection=None):
1118        # set keys in their respective vbuckets and identify the server for each vBucketId
1119        server_keyval = self._get_server_keyval_dic(key_val_dic)
1120        # get memcached client against each server and multi set
1121        tasks = []
1122        import concurrent.futures
1123        with concurrent.futures.ThreadPoolExecutor(max_workers=len(server_keyval)) as executor:
1124            for server_str, keyval in list(server_keyval.items()):
1125                mc = self.memcacheds[server_str]
1126                tasks.append(
1127                    executor.submit(self._setMulti_rec, mc, exp, flags, keyval, pause_sec, timeout_sec,
1128                                    self._setMulti_parallel, scope, collection))
1129            errors = []
1130            now = time.time()
1131            for future in concurrent.futures.as_completed(tasks, timeout_sec):
1132                if future.exception() is not None:
1133                    self.log.error("exception in {0} sec".format(time.time() - now))
1134                    raise future.exception()
1135                errors.extend(future.result())
1136
1137            if errors:
1138                self.log.error(list(set(str(error) for error in errors)), exc_info=1)
1139                raise errors[0]
1140
    def enable_collection(self, memcached_client, bucket="default"):
        """Prepare `memcached_client` for collection-aware operations:
        select the bucket, enable collections, advertise the collections
        HELLO feature, then fetch the collections manifest.
        NOTE(review): the call order mirrors the client handshake —
        confirm before reordering."""
        memcached_client.bucket_select(bucket)
        memcached_client.enable_collections()
        memcached_client.hello(memcacheConstants.FEATURE_COLLECTIONS)
        memcached_client.get_collections(True)
1146
1147    def _setMulti_rec(self, memcached_client, exp, flags, keyval, pause, timeout, rec_caller_fn,
1148                      scope=None, collection=None):
1149        try:
1150            if collection:
1151                self.enable_collection(memcached_client)
1152            errors = memcached_client.setMulti(exp, flags, keyval, scope=scope, collection=collection)
1153
1154            if not errors:
1155                return []
1156            elif timeout <= 0:
1157                return errors
1158            else:
1159                time.sleep(pause)
1160                self.reset_vbuckets(self.rest, self._get_vBucket_ids(list(keyval.keys())))
1161                try:
1162                    rec_caller_fn(exp, flags, keyval, pause, timeout - pause,
1163                                  scope=scope, collection=collection)  # Start all over again for these key vals.
1164                except MemcachedError as error:
1165                    if error.status == ERR_2BIG:
1166                        self.log.info("<MemcachedError #%d ``%s''>" % (error.status, error.msg))
1167                        return []
1168                    else:
1169                        return [error]
1170                return []  # Note: If used for async,too many recursive threads could get spawn here.
1171        except (EOFError, socket.error) as error:
1172            try:
1173                if "Got empty data (remote died?)" in str(error) or \
1174                        "Timeout waiting for socket" in str(error) or \
1175                        "Broken pipe" in str(error) or \
1176                        "Connection reset by peer" in str(error) \
1177                        and timeout > 0:
1178                    time.sleep(pause)
1179                    self.reset_vbuckets(self.rest, self._get_vBucket_ids(list(keyval.keys())))
1180                    rec_caller_fn(exp, flags, keyval, pause, timeout - pause)
1181                    return []
1182                else:
1183                    return [error]
1184            except AttributeError:
1185                # noinspection PyPackageRequirements
1186                if "Got empty data (remote died?)" in str(error) or \
1187                        "Timeout waiting for socket" in str(error) or \
1188                        "Broken pipe" in str(error) or \
1189                        "Connection reset by peer" in str(error) \
1190                        and timeout > 0:
1191                    time.sleep(pause)
1192                    self.reset_vbuckets(self.rest, self._get_vBucket_ids(list(keyval.keys())))
1193                    rec_caller_fn(exp, flags, keyval, pause, timeout - pause)
1194                    return []
1195                else:
1196                    return [error]
1197
1198        except BaseException as error:
1199            if timeout <= 0:
1200                return [error]
1201            else:
1202                time.sleep(pause)
1203                self.reset_vbuckets(self.rest, self._get_vBucket_ids(list(keyval.keys())))
1204                rec_caller_fn(exp, flags, keyval, pause, timeout - pause,
1205                              scope=scope, collection=collection)  # Please refer above for comments.
1206                return []
1207
1208    def _get_server_keyval_dic(self, key_val_dic):
1209        server_keyval = {}
1210        for key, val in list(key_val_dic.items()):
1211            vBucketId = self._get_vBucket_id(key)
1212            server_str = self.vBucketMap[vBucketId]
1213            if server_str not in server_keyval:
1214                server_keyval[server_str] = {}
1215            server_keyval[server_str][key] = val
1216        return server_keyval
1217
1218    def getMulti(self, keys_lst, pause_sec=1, timeout_sec=5, parallel=True, scope=None, collection=None):
1219        if parallel:
1220            try:
1221
1222                import concurrent.futures
1223                return self._getMulti_parallel(keys_lst, pause_sec, timeout_sec, scope=scope, collection=collection)
1224            except ImportError:
1225                return self._getMulti_seq(keys_lst, pause_sec, timeout_sec, scope=scope, collection=collection)
1226        else:
1227            return self._getMulti_seq(keys_lst, pause_sec, timeout_sec, scope=scope, collection=collection)
1228
1229    def _getMulti_seq(self, keys_lst, pause_sec=1, timeout_sec=5, scope=None, collection=None):
1230        server_keys = self._get_server_keys_dic(
1231            keys_lst)  # set keys in their respective vbuckets and identify the server for each vBucketId
1232        keys_vals = {}
1233        for server_str, keys in list(server_keys.items()):  # get memcached client against each server and multi get
1234            mc = self.memcacheds[server_str]
1235            keys_vals.update(
1236                self._getMulti_from_mc(mc, keys, pause_sec, timeout_sec, self._getMulti_seq,
1237                                       scope=scope, collection=collection))
1238        if len(keys_lst) != len(keys_vals):
1239            raise ValueError("Not able to get values for following keys - {0}".format(
1240                set(keys_lst).difference(list(keys_vals.keys()))))
1241        return keys_vals
1242
1243    def _getMulti_parallel(self, keys_lst, pause_sec=1, timeout_sec=5, scope=None, collection=None):
1244        server_keys = self._get_server_keys_dic(keys_lst)
1245        tasks = []
1246        import concurrent.futures
1247        with concurrent.futures.ThreadPoolExecutor(max_workers=len(server_keys)) as executor:
1248            for server_str, keys in list(server_keys.items()):
1249                mc = self.memcacheds[server_str]
1250                tasks.append(
1251                    executor.submit(self._getMulti_from_mc, mc, keys, pause_sec, timeout_sec, self._getMulti_parallel,
1252                                    scope=scope, collection=collection))
1253            keys_vals = self._reduce_getMulti_values(tasks, pause_sec, timeout_sec)
1254            if len(set(keys_lst)) != len(keys_vals):
1255                raise ValueError("Not able to get values for following keys - {0}".format(
1256                    set(keys_lst).difference(list(keys_vals[collection].keys()))))
1257
1258            return keys_vals
1259
1260    def _getMulti_from_mc(self, memcached_client, keys, pause, timeout, rec_caller_fn, scope=None, collection=None):
1261        try:
1262            if collection:
1263                self.enable_collection(memcached_client)
1264            return memcached_client.getMulti(keys, scope=scope, collection=collection)
1265
1266        except (EOFError, socket.error) as error:
1267            if "Got empty data (remote died?)" in str(error) or \
1268                    "Timeout waiting for socket" in str(error) or \
1269                    "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1270                    and timeout > 0:
1271                time.sleep(pause)
1272                self.reset_vbuckets(self.rest, self._get_vBucket_ids(keys))
1273                return rec_caller_fn(keys, pause, timeout - pause, scope=scope, collection=collection)
1274            else:
1275                raise error
1276        except BaseException as error:
1277            if timeout <= 0:
1278                raise error
1279            time.sleep(pause)
1280            self.reset_vbuckets(self.rest, self._get_vBucket_ids(keys))
1281            return rec_caller_fn(keys, pause, timeout - pause)
1282
1283    def _reduce_getMulti_values(self, tasks, pause, timeout):
1284        keys_vals = {}
1285        import concurrent.futures
1286        now = time.time()
1287        for future in concurrent.futures.as_completed(tasks, timeout):
1288            if future.exception() is not None:
1289                self.log.error("exception in {0} sec".format(time.time() - now))
1290                raise future.exception()
1291            keys_vals.update(future.result())
1292        return keys_vals
1293
1294    def _get_server_keys_dic(self, keys):
1295        server_keys = {}
1296        for key in keys:
1297            vBucketId = self._get_vBucket_id(key)
1298            server_str = self.vBucketMap[vBucketId]
1299            if server_str not in server_keys:
1300                server_keys[server_str] = []
1301            server_keys[server_str].append(key)
1302        return server_keys
1303
1304    def _get_vBucket_ids(self, keys, scope=None, collection=None):
1305        return {self._get_vBucket_id(key) for key in keys}
1306
1307    def _get_vBucket_id(self, key, scope=None, collection=None):
1308        return (zlib.crc32(key.encode()) >> 16) & (len(self.vBucketMap) - 1)
1309
1310    def delete(self, key, scope=None, collection=None):
1311        vb_error = 0
1312        while True:
1313            try:
1314                return self._send_op(self.memcached(key).delete, key, scope=scope, collection=collection)
1315            except MemcachedError as error:
1316                if error.status in [ERR_NOT_MY_VBUCKET, ERR_EINVAL] and vb_error < 5:
1317                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1318                    vb_error += 1
1319                else:
1320                    raise error
1321            except (EOFError, socket.error) as error:
1322                if "Got empty data (remote died?)" in str(error) or \
1323                        "Timeout waiting for socket" in str(error) or \
1324                        "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1325                        and vb_error < 5:
1326                    self.reset_vbuckets(self.rest, set([key], scope=scope, collection=collection))
1327                    vb_error += 1
1328                else:
1329                    raise error
1330            except BaseException as error:
1331                if vb_error < 5:
1332                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1333                    vb_error += 1
1334                else:
1335                    raise error
1336
1337    def _send_op(self, func, *args, **kargs):
1338        backoff = .001
1339        while True:
1340            try:
1341                return func(*args, **kargs)
1342            except MemcachedError as error:
1343                if error.status == ERR_ETMPFAIL and backoff < .5:
1344                    time.sleep(backoff)
1345                    backoff *= 2
1346                else:
1347                    raise error
1348            except (EOFError, IOError, socket.error) as error:
1349                raise MemcachedError(ERR_NOT_MY_VBUCKET, "Connection reset with error: {0}".format(error))
1350
1351    def done(self):
1352        [self.memcacheds[ip].close() for ip in self.memcacheds]
1353
1354        # This saves a lot of repeated code - the func is the mc bin client function
1355
1356    def generic_request(self, func, *args):
1357        key = args[0]
1358        vb_error = 0
1359        while True:
1360            try:
1361                return self._send_op(func, *args)
1362            except MemcachedError as error:
1363                if error.status == ERR_NOT_MY_VBUCKET and vb_error < 5:
1364                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)},
1365                                        forward_map=self._parse_not_my_vbucket_error(error))
1366                    vb_error += 1
1367                else:
1368                    raise error
1369            except (EOFError, socket.error) as error:
1370                if "Got empty data (remote died?)" in str(error) or \
1371                        "Timeout waiting for socket" in str(error) or \
1372                        "Broken pipe" in str(error) or "Connection reset by peer" in str(error) \
1373                        and vb_error < 5:
1374                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1375                    vb_error += 1
1376                    if vb_error >= 5:
1377                        raise error
1378                else:
1379                    raise error
1380            except BaseException as error:
1381                if vb_error < 5:
1382                    self.reset_vbuckets(self.rest, {self._get_vBucket_id(key)})
1383                    self.log.info("***************resetting vbucket id***********")
1384                    vb_error += 1
1385                else:
1386                    raise error
1387
1388    def _parse_not_my_vbucket_error(self, error):
1389        error_msg = error.msg
1390        if "Connection reset with error:" in error_msg:
1391            self.log.error("{0} while _send_op, server is alive?".format(error_msg))
1392            return None
1393        vbuckets = []
1394        try:
1395            error_json = json.loads(error_msg[error_msg.find('{'):error_msg.rfind('}') + 1])
1396        except:
1397            self.log.error("Error while getting CCCP from not_my_vbucket...\n %s" % error_msg)
1398            return None
1399        if 'vBucketMapForward' in error_json['vBucketServerMap']:
1400            vBucketMap = error_json['vBucketServerMap']['vBucketMapForward']
1401        else:
1402            vBucketMap = error_json['vBucketServerMap']['vBucketMap']
1403        serverList = error_json['vBucketServerMap']['serverList']
1404        if not self.rest:
1405            self.rest = RestConnection(self.info)
1406        serverList = [server.replace("$HOST", str(self.rest.ip))
1407                      if server.find("$HOST") != -1 else server for server in serverList]
1408        counter = 0
1409        for vbucket in vBucketMap:
1410            vbucketInfo = vBucket()
1411            vbucketInfo.master = serverList[vbucket[0]]
1412            if vbucket:
1413                for i in range(1, len(vbucket)):
1414                    if vbucket[i] != -1:
1415                        vbucketInfo.replica.append(serverList[vbucket[i]])
1416            vbucketInfo.id = counter
1417            counter += 1
1418            vbuckets.append(vbucketInfo)
1419        return vbuckets
1420
1421    def sendHellos(self, feature_flag):
1422        for m in self.memcacheds:
1423            self.memcacheds[m].hello(feature_flag)
1424
1425
class KVStoreAwareSmartClient(VBucketAwareMemcached):
    """VBucket-aware memcached client that mirrors every mutation into a
    local ClientKeyValueStore so tests can later verify memcached contents
    against the recorded expectations.
    """

    def __init__(self, rest, bucket, kv_store=None, info=None, store_enabled=True, scope=None, collection=None):
        VBucketAwareMemcached.__init__(self, rest, bucket, info, scope=scope, collection=collection)
        self.kv_store = kv_store or ClientKeyValueStore()
        # When False, mutations go to memcached only; the local store is untouched.
        self.store_enabled = store_enabled
        self._rlock = threading.Lock()

    def set(self, key, value, ttl=-1, flag=0, scope=None, collection=None):
        """Set `key` in memcached and record an md5 digest of `value` locally.

        A negative ttl means "no expiry".  Raises MemcachedError for server
        errors, re-raises AssertionError, and wraps anything else in a
        generic Exception.
        """
        self._rlock.acquire()
        try:
            if ttl >= 0:
                self.memcached(key).set(key, ttl, 0, value, scope=scope, collection=collection)
            else:
                self.memcached(key).set(key, 0, 0, value, scope=scope, collection=collection)

            if self.store_enabled:
                # Only a digest is kept; enough for later comparison (see mc_get).
                self.kv_store.write(key, hashlib.md5(value.encode()).digest(), ttl)

        except MemcachedError as e:
            raise MemcachedError(e.status, e.msg)
        except AssertionError:
            # BUG FIX: re-raise the original assertion; the old code raised
            # the bare AssertionError class, losing message and traceback.
            raise
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer converted into a generic Exception.
            raise Exception("General Exception from KVStoreAwareSmartClient.set()")
        finally:
            # Single release point replaces the per-branch releases.
            self._rlock.release()

    def get_doc_metadata(self, num_vbuckets, key, scope=None, collection=None):
        """Retrieve on-disk metadata of a document via the `vkey` stats call.

        Returns the stats dict, or None when the key is unknown to memcached.
        """
        vid = crc32.crc32_hash(key) & (num_vbuckets - 1)

        mc = self.memcached(key, scope=scope, collection=collection)
        metadatastats = None

        try:
            metadatastats = mc.stats("vkey {0} {1}".format(key, vid))
        except MemcachedError:
            msg = "key {0} doesn't exist in memcached".format(key)
            self.log.info(msg)

        return metadatastats

    def delete(self, key, scope=None, collection=None):
        """Delete `key` from memcached and from the local store.

        Raises MemcachedError(7, ...) when the delete fails (cas == 0) or
        any underlying error occurs.
        """
        with self._rlock:
            try:
                opaque, cas, data = self.memcached(key).delete(key, scope=scope, collection=collection)
                if self.store_enabled:
                    self.kv_store.delete(key, scope=scope, collection=collection)
            except Exception as e:
                raise MemcachedError(7, str(e))
        # BUG FIX: raise outside the lock/try.  The old code raised this
        # inside the try after already releasing the lock, so the except
        # handler released the lock a second time -- a RuntimeError on a
        # non-reentrant threading.Lock.
        if cas == 0:
            raise MemcachedError(7, "Invalid cas value")

    def get_valid_key(self, key, scope=None, collection=None):
        """Return the kv_store item for `key` if its status is 'valid'."""
        return self.get_key_check_status(key, "valid", scope=scope, collection=collection)

    def get_deleted_key(self, key, scope=None, collection=None):
        """Return the kv_store item for `key` if its status is 'deleted'."""
        return self.get_key_check_status(key, "deleted", scope=scope, collection=collection)

    def get_expired_key(self, key, scope=None, collection=None):
        """Return the kv_store item for `key` if its status is 'expired'."""
        return self.get_key_check_status(key, "expired", scope=scope, collection=collection)

    def get_all_keys(self, scope=None, collection=None):
        """All keys currently tracked by the local store."""
        return self.kv_store.keys(scope=scope, collection=collection)

    def get_all_valid_items(self, scope=None, collection=None):
        return self.kv_store.valid_items(scope=scope, collection=collection)

    def get_all_deleted_items(self, scope=None, collection=None):
        return self.kv_store.deleted_items(scope=scope, collection=collection)

    def get_all_expired_items(self, scope=None, collection=None):
        return self.kv_store.expired_items(scope=scope, collection=collection)

    def get_key_check_status(self, key, status, scope=None, collection=None):
        """Return the kv_store item for `key` only when it carries `status`,
        else log and return None.
        """
        item = self.kv_get(key, scope=scope, collection=collection)
        if item is not None and item["status"] == status:
            return item
        else:
            msg = "key {0} is not valid".format(key)
            self.log.info(msg)
            return None

    # safe kvstore retrieval
    # return dict of {key,status,value,ttl}
    # or None if not found
    def kv_get(self, key, scope=None, collection=None):
        item = None
        try:
            item = self.kv_store.read(key, scope=scope, collection=collection)
        except KeyError:
            # Key unknown to the local store; treated as "not found".
            pass

        return item

    # safe memcached retrieval
    # return dict of {key, flags, seq, value}
    # or None if not found
    def mc_get(self, key, scope=None, collection=None):
        item = self.mc_get_full(key, scope=scope, collection=collection)
        if item is not None:
            # Digest the value so it is comparable with kv_store entries.
            item["value"] = hashlib.md5(item["value"]).digest()
        return item

    def mc_get_full(self, key, scope=None, collection=None):
        """Return {key, flags, seq, value} with the raw (unhashed) value,
        or None when the key is missing from memcached.
        """
        try:
            flags, seq, value = self.memcached(key).get(key, scope=scope, collection=collection)
        except MemcachedError:
            # Key not present in memcached.
            return None
        return {"key": key, "flags": flags, "seq": seq, "value": value}

    def kv_mc_sync_get(self, key, status, scope=None, collection=None):
        """Atomically read the kv_store item (filtered by `status`) and the
        hashed memcached item for `key`.
        """
        with self._rlock:
            kv_item = self.get_key_check_status(key, status, scope=scope, collection=collection)
            mc_item = self.mc_get(key, scope=scope, collection=collection)
        return kv_item, mc_item
1561
1562
class KVStoreSmartClientHelper(object):
    """Static verification helpers that compare a client's local kv_store
    snapshot against what memcached actually holds.
    """

    @staticmethod
    def do_verification(client, scope=None, collection=None):
        """Verify every key known to the kv_store.

        Returns a dict mapping each failing key to its failure message.
        """
        failures = {}
        for key in client.get_all_keys(scope=scope, collection=collection):
            message, ok = KVStoreSmartClientHelper.verify_key(client, key, scope=scope, collection=collection)
            if not ok:
                failures[key] = message
        return failures

    @staticmethod
    def verify_key(client, key, scope=None, collection=None):
        """Dispatch to the matching verify_* check based on the kv_store status.

        Returns (message, ok); ("", False) when the key or status is unknown.
        """
        checks = {
            "deleted": KVStoreSmartClientHelper.verify_delete,
            "expired": KVStoreSmartClientHelper.verify_expired,
            "valid": KVStoreSmartClientHelper.verify_set,
        }
        item = client.kv_get(key, scope=scope, collection=collection)
        if item is None:
            return "", False
        check = checks.get(item["status"])
        if check is None:
            return "", False
        return check(client, key, scope=scope, collection=collection)

    # verify kvstore contains key with valid status
    # and that key also exists in memcached with
    # expected value
    @staticmethod
    def verify_set(client, key, scope=None, collection=None):
        kv_item = client.get_valid_key(key, scope=scope, collection=collection)
        mc_item = client.mc_get(key, scope=scope, collection=collection)
        if kv_item is None:
            return "valid status not set in kv_store", False
        if mc_item is None:
            return "key missing from memcached", False
        if kv_item["value"] != mc_item["value"]:
            return "kvstore and memcached values mismatch", False
        return "", True

    # verify kvstore contains key with deleted status
    # and that it does not exist in memcached
    @staticmethod
    def verify_delete(client, key, scope=None, collection=None):
        deleted_kv_item = client.get_deleted_key(key, scope=scope, collection=collection)
        mc_item = client.mc_get(key, scope=scope, collection=collection)
        if deleted_kv_item is None:
            return "delete status not set in kv_store", False
        if mc_item is not None:
            return "key still exists in memcached", False
        return "", True

    # verify kvstore contains key with expired status
    # and that key has also expired in memcached
    @staticmethod
    def verify_expired(client, key, scope=None, collection=None):
        expired_kv_item = client.get_expired_key(key, scope=scope, collection=collection)
        mc_item = client.mc_get(key, scope=scope, collection=collection)
        if expired_kv_item is None:
            return 'exp. status not set in kv_store', False
        if mc_item is not None:
            return "key still exists in memcached", False
        return "", True
1654
1655
def start_reader_process(info, keyset, queue):
    """Spawn a ReaderThread over `keyset`, reporting results through `queue`."""
    reader = ReaderThread(info, keyset, queue)
    reader.start()
1658
1659
class GeneratedDocuments(object):
    """Lazy iterator producing `items` JSON documents from `kv_template`.

    The placeholders ${prefix}, ${padding} and ${seed} in template string
    values are substituted per document.  Iteration is single-pass; call
    reset() to rewind.
    """

    def __init__(self, items, kv_template, options=None):
        # BUG/IDIOM FIX: `options` previously defaulted to a shared mutable
        # dict (dict(size=1024)); an equivalent fresh dict is built per call.
        if options is None:
            options = dict(size=1024)
        self._items = items
        self._kv_template = kv_template
        self._options = options
        self._pointer = 0
        if "padding" in options:
            pad = options["padding"]
        else:
            pad = DocumentGenerator._random_string(options["size"])
        # BUG FIX: decode only when the padding is bytes.  A caller-supplied
        # str padding used to crash with AttributeError on .decode().
        self._pad = pad.decode() if isinstance(pad, bytes) else pad

    # Required for the for-in syntax
    def __iter__(self):
        return self

    def __len__(self):
        return self._items

    def reset(self):
        """Rewind so the documents can be produced again."""
        self._pointer = 0

    def has_next(self):
        """True while more documents remain to be generated."""
        return self._pointer != self._items

    # Returns the next value of the iterator
    def __next__(self):
        if self._pointer == self._items:
            raise StopIteration
        i = self._pointer
        doc = {"meta": {"id": "{0}-{1}".format(i, self._options["seed"])}, "json": {}}
        for field in self._kv_template:
            value = self._kv_template[field]
            if isinstance(value, str) and value.find("${prefix}") != -1:
                value = value.replace("${prefix}", "{0}".format(i))
                # how about the value size
            if isinstance(value, str) and value.find("${padding}") != -1:
                value = value.replace("${padding}", self._pad)
            if isinstance(value, str) and value.find("${seed}") != -1:
                value = value.replace("${seed}", "{0}".format(self._options["seed"]))
            doc["json"][field] = value
        self._pointer += 1
        return json.dumps(doc)
1705
1706
class DocumentGenerator(object):
    """Factories for GeneratedDocuments iterators used by the load helpers."""

    # will loop over all values in props and replace ${prefix} with ${i}
    @staticmethod
    def make_docs(items, kv_template, options=None):
        """Build a GeneratedDocuments iterator over `items` documents.

        BUG FIX: the old default `options=dict(size=1024, seed=str(uuid.uuid4()))`
        was evaluated once at import time, so every default call shared the
        same seed (and the same mutable dict).  A fresh dict with a fresh
        seed is now built per call.
        """
        if options is None:
            options = dict(size=1024, seed=str(uuid.uuid4()))
        return GeneratedDocuments(items, kv_template, options)

    @staticmethod
    def _random_string(length):
        """Return `length * 2` random hex characters as ascii bytes
        (i.e. `length` random bytes rendered in hex).
        """
        return (("%%0%dX" % (length * 2)) % random.getrandbits(length * 8)).encode("ascii")

    @staticmethod
    def create_value(pattern, size):
        """Repeat/trim `pattern` to a string of exactly `size` characters."""
        return (pattern * (size // len(pattern))) + pattern[0:(size % len(pattern))]

    @staticmethod
    def get_doc_generators(count, kv_template=None, seed=None, sizes=None):
        """Return one GeneratedDocuments iterator per entry in `sizes`,
        splitting `count` evenly between them.
        """
        seed = seed or str(uuid.uuid4())[0:7]
        sizes = sizes or [128]

        doc_gen_iterators = []

        if kv_template is None:
            kv_template = {"name": "doc-${prefix}-${seed}",
                           "sequence": "${seed}",
                           "email": "${prefix}@couchbase.com"}
        for size in sizes:
            options = {"size": size, "seed": seed}
            docs = DocumentGenerator.make_docs(count // len(sizes),
                                               kv_template, options)
            doc_gen_iterators.append(docs)

        return doc_gen_iterators

    @staticmethod
    def get_doc_generators_by_load_ratio(rest,
                                         bucket='default',
                                         ram_load_ratio=1,
                                         value_size_distribution=None,
                                         seed=None):
        """Size document generators to fill `ram_load_ratio` percent of the
        bucket's free RAM, split across the given value-size distribution.

        Returns a list of {'size', 'value', 'how_many', 'seed'} dicts.
        Raises MemcachedClientHelperExcetion for a negative ratio.
        """
        log = logger.Logger.get_logger()

        if ram_load_ratio < 0:
            raise MemcachedClientHelperExcetion(errorcode='invalid_argument',
                                                message="ram_load_ratio")
        if not value_size_distribution:
            value_size_distribution = {16: 0.25, 128: 0.25, 512: 0.25, 1024: 0.25}

        # BUG FIX: this accumulator was named `list`, shadowing the builtin
        # that the loop below calls -- list(value_size_distribution.items())
        # raised "TypeError: 'list' object is not callable".
        generators = []

        info = rest.get_bucket(bucket)
        emptySpace = info.stats.ram - info.stats.memUsed
        space_to_fill = (int((emptySpace * ram_load_ratio) / 100.0))
        log.info('space_to_fill : {0}, emptySpace : {1}'.format(space_to_fill, emptySpace))
        for size, probability in list(value_size_distribution.items()):
            # +250 approximates per-document metadata overhead.
            how_many = int(space_to_fill / (size + 250) * probability)
            doc_seed = seed or str(uuid.uuid4())
            kv_template = {"name": "user-${prefix}", "payload": "memcached-json-${prefix}-${padding}",
                           "size": size, "seed": doc_seed}
            options = {"size": size, "seed": doc_seed}
            payload_generator = DocumentGenerator.make_docs(how_many, kv_template, options)
            generators.append({'size': size, 'value': payload_generator, 'how_many': how_many, 'seed': doc_seed})

        return generators
1772
1773
1774#        docs = DocumentGenerator.make_docs(number_of_items,
1775#                {"name": "user-${prefix}", "payload": "payload-${prefix}-${padding}"},
1776#                {"size": 1024, "seed": str(uuid.uuid4())})
1777
1778# Format of the json documents that mcsoda uses.
1779# JSON BODY
1780# {
1781# "key":"%s",
1782# "key_num":%s,
1783# "name":"%s",
1784# "email":"%s",
1785# "city":"%s",
1786# "country":"%s",
1787# "realm":"%s",
1788# "coins":%s,
1789# "achievements":%s
1790# }
1791
class LoadWithMcsoda(object):
    """Thin wrapper around perf_engines.mcsoda that loads `num_docs` JSON
    documents into a bucket.  The mcsoda config dict is built up front;
    load_data() runs the workload and load_stop() flips the run flag so a
    concurrent run aborts.
    """

    def __init__(self, master, num_docs, prefix='', bucket='default', rest_user='Administrator',
                 rest_password="password", protocol='membase-binary', port=11211):

        # vBucket count is fetched so mcsoda hashes keys to the right vbuckets.
        rest = RestConnection(master)
        self.bucket = bucket
        vBuckets = rest.get_vbuckets(self.bucket)
        self.vbucket_count = len(vBuckets)

        # mcsoda workload: pure creates (exit-after-creates=1, ratio-sets=1),
        # JSON documents, a single worker thread, batches of 10 ops.
        self.cfg = {
            'max-items': num_docs,
            'max-creates': num_docs,
            'min-value-size': 128,
            'exit-after-creates': 1,
            'ratio-sets': 1,
            'ratio-misses': 0,
            'ratio-creates': 1,
            'ratio-deletes': 0,
            'ratio-hot': 0,
            'ratio-hot-sets': 1,
            'ratio-hot-gets': 0,
            'ratio-expirations': 0,
            'expiration': 0,
            'threads': 1,
            'json': 1,
            'batch': 10,
            'vbuckets': self.vbucket_count,
            'doc-cache': 0,
            'doc-gen': 0,
            'prefix': prefix,
            'socket-timeout': 60,
        }

        self.protocol = protocol
        self.rest_user = rest_user
        self.rest_password = rest_password

        if protocol == 'membase-binary':
            self.host_port = "{0}:{1}:{2}".format(master.ip, master.port, port)

        elif protocol == 'memcached-binary':
            # NOTE(review): "{0}:{1}:{1}" repeats `port` (giving ip:port:port)
            # and ignores master.port -- confirm against mcsoda's host_port
            # parsing whether this repetition is intentional for
            # direct-memcached runs.
            self.host_port = "{0}:{1}:{1}".format(master.ip, port)

        # NOTE(review): any other protocol value leaves self.host_port unset,
        # so load_data() would fail with AttributeError.
        self.ctl = {'run_ok': True}

    def protocol_parse(self, protocol_in):
        """Split a URL-style string like 'membase://user:pswd@host:port' into
        (protocol, host_port, user, pswd).

        NOTE(review): when `protocol_in` contains no '://', none of the
        locals are assigned and the return raises UnboundLocalError --
        callers appear to always pass a URL-style string; confirm.
        """
        if protocol_in.find('://') >= 0:
            # Normalize to "<scheme>-binary" and keep at most two dash parts.
            protocol = \
                '-'.join(((["membase"] + \
                           protocol_in.split("://"))[-2] + "-binary").split('-')[0:2])
            # Host portion (after any credentials), with the REST port appended.
            host_port = ('@' + protocol_in.split("://")[-1]).split('@')[-1] + ":8091"
            # Credentials portion; empty strings when absent.
            user, pswd = (('@' + protocol_in.split("://")[-1]).split('@')[-2] + ":").split(':')[0:2]

        return protocol, host_port, user, pswd

    def get_cfg(self):
        """Return the mcsoda config dict (mutable; callers may tweak it)."""
        return self.cfg

    def load_data(self, scope=None, collection=None):
        """Run the configured mcsoda workload; returns mcsoda's cursor dict."""
        cur, start_time, end_time = mcsoda.run(self.cfg, {}, self.protocol, self.host_port, self.rest_user,
                                               self.rest_password, ctl=self.ctl, bucket=self.bucket)
        return cur

    def load_stop(self):
        """Signal a running mcsoda workload to stop at the next check."""
        self.ctl['run_ok'] = False
1858