1import copy
2import time
3import uuid
4import zlib
5import logger
6import mc_bin_client
7import crc32
8import socket
9import ctypes
10from membase.api.rest_client import RestConnection, RestHelper
11import memcacheConstants
12from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
13from mc_bin_client import MemcachedClient
14from threading import Thread
15import Queue
16from collections import defaultdict
17from couchbase_helper.stats_tools import StatsCommon
18from remote.remote_util import RemoteMachineShellConnection
19from subprocess import call
20
class BucketOperationHelper():
    """Static helpers used by tests to create, delete and verify Couchbase
    buckets and the data stored in them.

    All methods are stateless @staticmethods; most take either a server
    info object (anything exposing .ip / .port) or an already-connected
    RestConnection, and a unittest-style ``test`` object whose ``fail()``
    is called on unrecoverable verification errors.
    """

    @staticmethod
    def base_bucket_ratio(servers):
        """Return the fraction of a node's memory quota a bucket may use.

        When every entry in ``servers`` shares one ip (a single-machine
        cluster_run style environment) the usual 2/3 ratio is divided
        evenly between the nodes; otherwise each node gets the full 2/3.
        """
        # check if ip is the same for all servers
        ip = servers[0].ip
        dev_environment = True
        for server in servers:
            if server.ip != ip:
                dev_environment = False
                break
        if dev_environment:
            ratio = 2.0 / 3.0 * 1 / len(servers)
        else:
            ratio = 2.0 / 3.0
        return ratio

    @staticmethod
    def create_multiple_buckets(server, replica, bucket_ram_ratio=(2.0 / 3.0), howmany=3, sasl=True, saslPassword='password'):
        """Create ``howmany`` buckets named bucket-0 .. bucket-N on ``server``.

        Each bucket gets an even share of ``bucket_ram_ratio`` of the node's
        memoryQuota, but never less than 100MB.  Requires a quota of at
        least 450MB.  Returns True only when every bucket was created and
        became visible via REST; stops at the first failure.
        """
        success = True
        log = logger.Logger.get_logger()
        rest = RestConnection(server)
        info = rest.get_nodes_self()
        if info.memoryQuota < 450.0:
            log.error("at least need 450MB memoryQuota")
            success = False
        else:
            available_ram = info.memoryQuota * bucket_ram_ratio
            if available_ram / howmany > 100:
                bucket_ram = int(available_ram / howmany)
            else:
                bucket_ram = 100
            # choose proxy ports that are not taken by this ns server
            port = info.moxi + 1
            for i in range(0, howmany):
                name = "bucket-{0}".format(i)
                if sasl:
                    rest.create_bucket(bucket=name,
                                       ramQuotaMB=bucket_ram,
                                       replicaNumber=replica,
                                       authType="sasl",
                                       saslPassword=saslPassword,
                                       proxyPort=port)
                else:
                    rest.create_bucket(bucket=name,
                                       ramQuotaMB=bucket_ram,
                                       replicaNumber=replica,
                                       proxyPort=port)
                port += 1
                msg = "create_bucket succeeded but bucket \"{0}\" does not exist"
                bucket_created = BucketOperationHelper.wait_for_bucket_creation(name, rest)
                if not bucket_created:
                    log.error(msg.format(name))
                    success = False
                    break
        return success

    @staticmethod
    def create_default_buckets(servers, number_of_replicas=1, assert_on_test=None):
        """Create a 256MB bucket named "default" on every server.

        If a bucket does not become visible and ``assert_on_test`` is
        provided, its ``fail()`` is invoked with a descriptive message.
        """
        log = logger.Logger.get_logger()
        for serverInfo in servers:
            ip_rest = RestConnection(serverInfo)
            ip_rest.create_bucket(bucket='default',
                               ramQuotaMB=256,
                               replicaNumber=number_of_replicas,
                               proxyPort=11220)
            msg = 'create_bucket succeeded but bucket "default" does not exist'
            # renamed from the misleading "removed_all_buckets": this flag
            # reports whether creation completed, not removal
            bucket_created = BucketOperationHelper.wait_for_bucket_creation('default', ip_rest)
            if not bucket_created:
                log.error(msg)
                if assert_on_test:
                    assert_on_test.fail(msg=msg)

    @staticmethod
    def create_bucket(serverInfo, name='default', replica=1, port=11210, test_case=None, bucket_ram=-1, password=None):
        """Create a single bucket and wait for it to appear.

        A negative ``bucket_ram`` means "use 2/3 of the node's quota".
        With ``password is None`` the bucket is created with SASL auth,
        otherwise with authType "none".  Returns True when the bucket
        became visible; on failure optionally fails ``test_case``.
        """
        log = logger.Logger.get_logger()
        rest = RestConnection(serverInfo)
        if bucket_ram < 0:
            info = rest.get_nodes_self()
            # floor division keeps the quota an integer on both py2/py3
            bucket_ram = info.memoryQuota * 2 // 3

        if password is None:
            authType = "sasl"
        else:
            authType = "none"

        rest.create_bucket(bucket=name,
                           ramQuotaMB=bucket_ram,
                           replicaNumber=replica,
                           proxyPort=port,
                           authType=authType,
                           saslPassword=password)
        msg = 'create_bucket succeeded but bucket "{0}" does not exist'
        bucket_created = BucketOperationHelper.wait_for_bucket_creation(name, rest)
        if not bucket_created:
            # BUGFIX: previously logged the unformatted template string
            log.error(msg.format(name))
            if test_case:
                test_case.fail(msg=msg.format(name))
        return bucket_created

    @staticmethod
    def delete_all_buckets_or_assert(servers, test_case):
        """Delete every bucket on every server, retrying transient failures.

        After each deletion waits up to 200s for the bucket to disappear;
        if it does not and ``test_case`` is given, dumps diagnostics and
        fails the test.
        """
        log = logger.Logger.get_logger()
        for serverInfo in servers:
            rest = RestConnection(serverInfo)
            buckets = []
            try:
                buckets = rest.get_buckets()
            except Exception as e:
                log.error(e)
                log.error('15 seconds sleep before calling get_buckets again...')
                time.sleep(15)
                buckets = rest.get_buckets()
            log.info('deleting existing buckets {0} on {1}'.format([b.name for b in buckets], serverInfo.ip))
            for bucket in buckets:
                log.info("remove bucket {0} ...".format(bucket.name))
                try:
                    status = rest.delete_bucket(bucket.name)
                except Exception as e:
                    # BUGFIX: previously caught ServerUnavailableException,
                    # a name never imported here, so the retry path itself
                    # raised NameError instead of retrying
                    log.error(e)
                    log.error('5 seconds sleep before calling delete_bucket again...')
                    time.sleep(5)
                    status = rest.delete_bucket(bucket.name)
                if not status:
                    # best-effort diagnostics; stats may be unavailable mid-delete
                    try:
                        BucketOperationHelper.print_dataStorage_content(servers)
                        log.info(StatsCommon.get_stats([serverInfo], bucket.name, "timings"))
                    except:
                        log.error("Unable to get timings for bucket")
                log.info('deleted bucket : {0} from {1}'.format(bucket.name, serverInfo.ip))
                msg = 'bucket "{0}" was not deleted even after waiting for two minutes'.format(bucket.name)
                if test_case:
                    if not BucketOperationHelper.wait_for_bucket_deletion(bucket.name, rest, 200):
                        try:
                            BucketOperationHelper.print_dataStorage_content(servers)
                            log.info(StatsCommon.get_stats([serverInfo], bucket.name, "timings"))
                        except:
                            log.error("Unable to get timings for bucket")
                        test_case.fail(msg)

    @staticmethod
    def delete_bucket_or_assert(serverInfo, bucket='default', test_case=None):
        """Delete a single bucket if it exists, then (optionally) assert
        that it actually disappeared within 200 seconds."""
        log = logger.Logger.get_logger()
        log.info('deleting existing bucket {0} on {1}'.format(bucket, serverInfo))

        rest = RestConnection(serverInfo)
        if RestHelper(rest).bucket_exists(bucket):
            status = rest.delete_bucket(bucket)
            if not status:
                # best-effort diagnostics; stats may be unavailable mid-delete
                try:
                    BucketOperationHelper.print_dataStorage_content([serverInfo])
                    log.info(StatsCommon.get_stats([serverInfo], bucket, "timings"))
                except:
                    log.error("Unable to get timings for bucket")
            log.info('deleted bucket : {0} from {1}'.format(bucket, serverInfo.ip))
        msg = 'bucket "{0}" was not deleted even after waiting for two minutes'.format(bucket)
        if test_case:
            if not BucketOperationHelper.wait_for_bucket_deletion(bucket, rest, 200):
                try:
                    BucketOperationHelper.print_dataStorage_content([serverInfo])
                    log.info(StatsCommon.get_stats([serverInfo], bucket, "timings"))
                except:
                    log.error("Unable to get timings for bucket")
                test_case.fail(msg)


    @staticmethod
    def print_dataStorage_content(servers):
        """Print the contents of each node's data and index path folders."""
        # Determine whether this is a cluster_run (all nodes on one machine,
        # non-standard ports) or a real multi-machine cluster
        cluster_run = True

        firstIp = servers[0].ip
        if len(servers) == 1 and servers[0].port == '8091':
            cluster_run = False
        else:
            for node in servers:
                if node.ip != firstIp:
                    cluster_run = False
                    break

        for serverInfo in servers:
            node = RestConnection(serverInfo).get_nodes_self()
            # data path and index path may coincide; a set avoids listing twice
            paths = set([node.storage[0].path, node.storage[0].index_path])
            for path in paths:
                if "c:/Program Files" in path:
                    path = path.replace("c:/Program Files", "/cygdrive/c/Program Files")

                if cluster_run:
                    # local listing is enough for a cluster_run
                    call(["ls", "-lR", path])
                else:
                    shell = RemoteMachineShellConnection(serverInfo)
                    o, r = shell.execute_command("ls -LR '{0}'".format(path))
                    shell.log_command_output(o, r)

    #TODO: TRY TO USE MEMCACHED TO VERIFY BUCKET DELETION BECAUSE
    # BUCKET DELETION IS A SYNC CALL W.R.T MEMCACHED
    @staticmethod
    def wait_for_bucket_deletion(bucket,
                                 rest,
                                 timeout_in_seconds=120):
        """Poll every 2s until ``bucket`` no longer exists.

        Returns True when it disappeared within the timeout, else False.
        """
        log = logger.Logger.get_logger()
        log.info('waiting for bucket deletion to complete....')
        start = time.time()
        helper = RestHelper(rest)
        while (time.time() - start) <= timeout_in_seconds:
            if not helper.bucket_exists(bucket):
                return True
            else:
                time.sleep(2)
        return False

    @staticmethod
    def wait_for_bucket_creation(bucket,
                                 rest,
                                 timeout_in_seconds=120):
        """Poll every 2s until ``bucket`` exists.

        Returns True when it appeared within the timeout, else False.
        """
        log = logger.Logger.get_logger()
        log.info('waiting for bucket creation to complete....')
        start = time.time()
        helper = RestHelper(rest)
        while (time.time() - start) <= timeout_in_seconds:
            if helper.bucket_exists(bucket):
                return True
            else:
                time.sleep(2)
        return False

    @staticmethod
    def wait_for_vbuckets_ready_state(node, bucket, timeout_in_seconds=300, log_msg=''):
        """Wait until every vbucket of ``bucket`` reports active/replica state.

        Polls each memcached endpoint serving the bucket's vbuckets and
        records a vbucket as ready once its state payload contains the
        \\x01 (active) or \\x02 (replica) marker.  Returns True when all
        vbuckets became ready before the timeout.
        """
        log = logger.Logger.get_logger()
        start_time = time.time()
        end_time = start_time + timeout_in_seconds
        ready_vbuckets = {}
        rest = RestConnection(node)
        RestHelper(rest).vbucket_map_ready(bucket, 60)
        # single REST call; previously get_vbuckets() was fetched twice
        vbuckets = rest.get_vbuckets(bucket)
        vbucket_count = len(vbuckets)
        obj = VBucketAwareMemcached(rest, bucket)
        memcacheds, vbucket_map, vbucket_map_replica = obj.request_map(rest, bucket)
        # Create dictionary with key "ip:port" and value: a list of vbuckets
        server_dict = defaultdict(list)
        for everyID in range(0, vbucket_count):
            memcached_ip_port = str(vbucket_map[everyID])
            server_dict[memcached_ip_port].append(everyID)
        while time.time() < end_time and len(ready_vbuckets) < vbucket_count:
            for every_ip_port in server_dict:
                # Retrieve memcached ip and port
                ip, port = every_ip_port.split(":")
                client = MemcachedClient(ip, int(port), timeout=30)
                try:
                    client.vbucket_count = vbucket_count
                    bucket_info = rest.get_bucket(bucket)
                    client.sasl_auth_plain(bucket_info.name.encode('ascii'),
                                        bucket_info.saslPassword.encode('ascii'))
                    for i in server_dict[every_ip_port]:
                        try:
                            (a, b, c) = client.get_vbucket_state(i)
                        except mc_bin_client.MemcachedError as e:
                            ex_msg = str(e)
                            # trim huge vBucketMap dumps out of the log message
                            if "Not my vbucket" in log_msg:
                                log_msg = log_msg[:log_msg.find("vBucketMap") + 12] + "..."
                            if "Not my vbucket" in ex_msg:
                                # May receive this while waiting for vbuckets, continue and retry
                                continue
                            log.error("%s: %s" % (log_msg, ex_msg))
                            continue
                        if c.find("\x01") > 0 or c.find("\x02") > 0:
                            ready_vbuckets[i] = True
                        elif i in ready_vbuckets:
                            log.warning("vbucket state changed from active to {0}".format(c))
                            del ready_vbuckets[i]
                finally:
                    # BUGFIX: close even when auth/REST raises, so sockets
                    # are not leaked while polling
                    client.close()
        return len(ready_vbuckets) == vbucket_count


    #try to insert key in all vbuckets before returning from this function
    #bucket { 'name' : 90,'password':,'port':1211'}
    @staticmethod
    def wait_for_memcached(node, bucket, timeout_in_seconds=300, log_msg=''):
        """Wait until memcached for ``bucket`` on ``node`` accepts set ops
        (i.e. all vbuckets report a ready state)."""
        log = logger.Logger.get_logger()
        msg = "waiting for memcached bucket : {0} in {1} to accept set ops"
        log.info(msg.format(bucket, node.ip))
        all_vbuckets_ready = BucketOperationHelper.wait_for_vbuckets_ready_state(node,
                                                                                 bucket, timeout_in_seconds, log_msg)
        return all_vbuckets_ready

    @staticmethod
    def verify_data(server, keys, value_equal_to_key, verify_flags, test, debug=False, bucket="default"):
        """Fetch every key directly from memcached and verify it.

        When ``value_equal_to_key`` each value must equal its key; when
        ``verify_flags`` the stored flag must be the adler32 of the value.
        Assertion failures go through ``test``; missing keys are collected
        and the method returns False if any key could not be verified.
        """
        log = logger.Logger.get_logger()
        log_error_count = 0
        # verify all the keys
        client = MemcachedClientHelper.direct_client(server, bucket)
        vbucket_count = len(RestConnection(server).get_vbuckets(bucket))
        index = 0
        all_verified = True
        keys_failed = []
        for key in keys:
            try:
                index += 1
                # route the get to the vbucket owning this key
                vbucketId = crc32.crc32_hash(key) & (vbucket_count - 1)
                client.vbucketId = vbucketId
                flag, keyx, value = client.get(key=key)
                if value_equal_to_key:
                    test.assertEquals(value, key, msg='values dont match')
                if verify_flags:
                    actual_flag = socket.ntohl(flag)
                    expected_flag = ctypes.c_uint32(zlib.adler32(value)).value
                    test.assertEquals(actual_flag, expected_flag, msg='flags dont match')
                if debug:
                    log.info("verified key #{0} : {1}".format(index, key))
            except mc_bin_client.MemcachedError as error:
                if debug:
                    # cap error spam at 100 log lines
                    log_error_count += 1
                    if log_error_count < 100:
                        log.error(error)
                        # BUGFIX: second placeholder was {0}, so the key was
                        # never logged
                        log.error(
                            "memcachedError : {0} - unable to get a pre-inserted key : {1}".format(error.status, key))
                keys_failed.append(key)
                all_verified = False
        client.close()
        if len(keys_failed) > 0:
            log.error('unable to verify #{0} keys'.format(len(keys_failed)))
        return all_verified

    @staticmethod
    def keys_dont_exist(server, keys, bucket):
        """Return True when none of ``keys`` exist in ``bucket``.

        Stops and returns False at the first key that is still gettable.
        """
        log = logger.Logger.get_logger()
        client = MemcachedClientHelper.direct_client(server, bucket)
        vbucket_count = len(RestConnection(server).get_vbuckets(bucket))
        for key in keys:
            try:
                vbucketId = crc32.crc32_hash(key) & (vbucket_count - 1)
                client.vbucketId = vbucketId
                client.get(key=key)
                # a successful get means the key was NOT deleted
                client.close()
                log.error('key {0} should not exist in the bucket'.format(key))
                return False
            except mc_bin_client.MemcachedError as error:
                # expected: the key is gone
                log.error(error)
                log.error("expected memcachedError : {0} - unable to get a pre-inserted key : {1}".format(error.status, key))
        client.close()
        return True

    @staticmethod
    def chunks(l, n):
        """Split list ``l`` into chunks of at most ``n`` items.

        Returns a dict mapping chunk index (0-based) to the sub-list.
        """
        keys_chunks = {}
        index = 0
        for i in range(0, len(l), n):
            keys_chunks[index] = l[i:i + n]
            index += 1
        return keys_chunks

    @staticmethod
    def keys_exist_or_assert_in_parallel(keys, server, bucket_name, test, concurrency=2):
        """Verify ``keys`` exist using ``concurrency`` parallel threads.

        Each thread verifies one chunk of the key list and reports its
        result via a shared queue; returns False if any chunk failed.
        """
        log = logger.Logger.get_logger()
        verification_threads = []
        queue = Queue.Queue()
        # BUGFIX: the chunk dict is identical for every thread, so compute
        # it once instead of once per thread (// keeps the chunk size an
        # int on both py2 and py3)
        keys_chunk = BucketOperationHelper.chunks(keys, len(keys) // concurrency)
        for i in range(concurrency):
            t = Thread(target=BucketOperationHelper.keys_exist_or_assert,
                       name="verification-thread-{0}".format(i),
                       args=(keys_chunk.get(i), server, bucket_name, test, queue))
            verification_threads.append(t)
        for t in verification_threads:
            t.start()
        for t in verification_threads:
            log.info("thread {0} finished".format(t.name))
            t.join()
        while not queue.empty():
            item = queue.get()
            if item is False:
                return False
        return True

    @staticmethod
    def keys_exist_or_assert(keys, server, bucket_name, test, queue=None):
        """Verify every key in ``keys`` is gettable, retrying up to 5 times.

        On persistent failure fails ``test`` (if given); the overall result
        is returned directly, or put on ``queue`` when one is supplied
        (thread-worker mode).
        """
        log = logger.Logger.get_logger()
        client = MemcachedClientHelper.proxy_client(server, bucket_name)
        retry = 1

        keys_left_to_verify = []
        keys_left_to_verify.extend(copy.deepcopy(keys))
        log_count = 0
        while retry < 6 and len(keys_left_to_verify) > 0:
            msg = "trying to verify {0} keys - attempt #{1} : {2} keys left to verify"
            log.info(msg.format(len(keys), retry, len(keys_left_to_verify)))
            keys_not_verified = []
            for key in keys_left_to_verify:
                try:
                    client.get(key=key)
                except mc_bin_client.MemcachedError as error:
                    keys_not_verified.append(key)
                    # cap error spam at 100 log lines
                    if log_count < 100:
                        log.error("key {0} does not exist because {1}".format(key, error))
                        log_count += 1
            retry += 1
            keys_left_to_verify = keys_not_verified
        if len(keys_left_to_verify) > 0:
            log_count = 0
            for key in keys_left_to_verify:
                log.error("key {0} not found".format(key))
                log_count += 1
                if log_count > 100:
                    break
            msg = "unable to verify {0} keys".format(len(keys_left_to_verify))
            log.error(msg)
            if test:
                test.fail(msg=msg)
            if queue is None:
                return False
            else:
                queue.put(False)
        log.info("verified that {0} keys exist".format(len(keys)))
        if queue is None:
            return True
        else:
            queue.put(True)

    @staticmethod
    def load_some_data(serverInfo,
                   fill_ram_percentage=10.0,
                   bucket_name='default'):
        """Insert enough ~10KB keys to fill ``fill_ram_percentage`` percent
        of the bucket's free RAM.

        Returns the list of keys that were successfully inserted; stops
        (best effort) at the first memcached error.
        """
        log = logger.Logger.get_logger()
        if fill_ram_percentage <= 0.0:
            fill_ram_percentage = 5.0
        client = MemcachedClientHelper.direct_client(serverInfo, bucket_name)
        rest = RestConnection(serverInfo)
        RestHelper(rest).vbucket_map_ready(bucket_name, 60)
        vbucket_count = len(rest.get_vbuckets(bucket_name))
        testuuid = uuid.uuid4()
        info = rest.get_bucket(bucket_name)
        emptySpace = info.stats.ram - info.stats.memUsed
        log.info('emptySpace : {0} fill_ram_percentage : {1}'.format(emptySpace, fill_ram_percentage))
        fill_space = (emptySpace * fill_ram_percentage) / 100.0
        log.info("fill_space {0}".format(fill_space))
        # each packet can be 10 KB
        packetSize = int(10 * 1024)
        # floor division keeps this an int on both py2 and py3
        number_of_buckets = int(fill_space) // packetSize
        log.info('packetSize: {0}'.format(packetSize))
        log.info('memory usage before key insertion : {0}'.format(info.stats.memUsed))
        # BUGFIX: second placeholder was {0}, so the ip was never logged
        log.info('inserting {0} new keys to memcached @ {1}'.format(number_of_buckets, serverInfo.ip))
        keys = ["key_%s_%d" % (testuuid, i) for i in range(number_of_buckets)]
        inserted_keys = []
        for key in keys:
            vbucketId = crc32.crc32_hash(key) & (vbucket_count - 1)
            client.vbucketId = vbucketId
            try:
                client.set(key, 0, 0, key)
                inserted_keys.append(key)
            except mc_bin_client.MemcachedError as error:
                log.error(error)
                client.close()
                log.error("unable to push key : {0} to vbucket : {1}".format(key, client.vbucketId))
                # BUGFIX: previously referenced an undefined name ``test``
                # here, raising NameError; this loader is best-effort, so
                # just stop inserting
                break
        client.close()
        return inserted_keys
491