1from random import shuffle
2import time
3import logger
4from couchbase_helper.cluster import Cluster
5from membase.api.exception import StatsUnavailableException, \
6    ServerAlreadyJoinedException, RebalanceFailedException, \
7    FailoverFailedException, InvalidArgumentException, ServerSelfJoinException, \
8    AddNodeException
9from membase.api.rest_client import RestConnection, RestHelper, Bucket
10from membase.helper.bucket_helper import BucketOperationHelper
11from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
12from mc_bin_client import MemcachedClient, MemcachedError
13
14log = logger.Logger.get_logger()
15
16
17class RebalanceHelper():
    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_mc_stats_all_nodes(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
        """Poll memcached stats on every active cluster node until the
        cluster-wide sum of ``stat_key`` equals ``stat_value``.

        Returns True once the summed value matches; returns False when the
        summed value stops changing for ``timeout_in_seconds``.  Note that the
        timeout is a quiet-period watchdog, not a bound on the total wait: as
        long as the value keeps changing the loop keeps running.
        """
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        time_to_timeout = 0
        previous_stat_value = -1
        curr_stat_value = -1
        verified = False
        # NOTE(review): all_stats persists across iterations of the outer loop,
        # so a node that later stops being 'active' keeps contributing its
        # last-seen stats to the sum — presumably intentional; confirm.
        all_stats = {}
        while not verified:
            rest = RestConnection(master)
            nodes = rest.node_statuses()
            for node in nodes:
                _server = {"ip": node.ip, "port": node.port, "username": master.rest_username,
                           "password": master.rest_password}
                #failed over node is part of node_statuses but since its failed over memcached connections
                #to this node will fail
                node_self = RestConnection(_server).get_nodes_self()
                if node_self.clusterMembership == 'active':
                    mc = MemcachedClientHelper.direct_client(_server, bucket)
                    n_stats = mc.stats("")
                    mc.close()
                    all_stats[node.id] = n_stats
            # sum stat_key over every node that reported it; -1 means no node
            # reported the stat at all
            actual_stat_value = -1
            for k in all_stats:
                if all_stats[k] and stat_key in all_stats[k]:
                    if actual_stat_value == -1:
                        log.info(all_stats[k][stat_key])
                        actual_stat_value = int(all_stats[k][stat_key])
                    else:
                        actual_stat_value += int(all_stats[k][stat_key])
            if actual_stat_value == stat_value:
                log.info("{0} : {1}".format(stat_key, actual_stat_value))
                verified = True
                break
            else:
                if verbose:
                    log.info("{0} : {1}".format(stat_key, actual_stat_value))
                curr_stat_value = actual_stat_value

                # values are changing so clear any timeout
                if curr_stat_value != previous_stat_value:
                    time_to_timeout = 0
                else:
                    # value stalled: arm the deadline once, then give up when it passes
                    if time_to_timeout == 0:
                        time_to_timeout = time.time() + timeout_in_seconds
                    if time_to_timeout < time.time():
                        log.info("no change in {0} stat after {1} seconds (value = {2})".format(stat_key, timeout_in_seconds, curr_stat_value))
                        break

                previous_stat_value = curr_stat_value

                if not verbose:
                    time.sleep(0.1)
                else:
                    time.sleep(2)
        return verified
76
77    @staticmethod
78    def wait_for_replication(servers, cluster_helper=None, timeout=600):
79        if cluster_helper is None:
80            cluster = Cluster()
81        else:
82            cluster = cluster_helper
83        tasks = []
84        rest = RestConnection(servers[0])
85        buckets = rest.get_buckets()
86        for server in servers:
87            for bucket in buckets:
88                for server_repl in list(set(servers) - set([server])):
89                    tasks.append(cluster.async_wait_for_stats([server], bucket, 'tap',
90                                   'eq_tapq:replication_ns_1@' + server_repl.ip + ':idle', '==', 'true'))
91                    tasks.append(cluster.async_wait_for_stats([server], bucket, 'tap',
92                                   'eq_tapq:replication_ns_1@' + server_repl.ip + ':backfill_completed', '==', 'true'))
93        try:
94            for task in tasks:
95                task.result(timeout)
96        finally:
97            if cluster_helper is None:
98                # stop all newly created task manager threads
99                cluster.shutdown()
100            return True
101
102    @staticmethod
103    #bucket is a json object that contains name,port,password
104    def wait_for_stats(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
105        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
106                                                                                stat_value, master.ip))
107        time_to_timeout = 0
108        previous_stat_value = -1
109        curr_stat_value = -1
110        verified = False
111        while not verified:
112            rest = RestConnection(master)
113            try:
114                stats = rest.get_bucket_stats(bucket)
115                if stats and stat_key in stats and stats[stat_key] == stat_value:
116                    log.info("{0} : {1}".format(stat_key, stats[stat_key]))
117                    verified = True
118                    break
119                else:
120                    if stats and stat_key in stats:
121                        if verbose:
122                            log.info("{0} : {1}".format(stat_key, stats[stat_key]))
123                        curr_stat_value = stats[stat_key]
124
125                    # values are changing so clear any timeout
126                    if curr_stat_value != previous_stat_value:
127                        time_to_timeout = 0
128                    else:
129                        if time_to_timeout == 0:
130                            time_to_timeout = time.time() + timeout_in_seconds
131                        if time_to_timeout < time.time():
132                            log.info("no change in {0} stat after {1} seconds (value = {2})".format(stat_key, timeout_in_seconds, curr_stat_value))
133                            break
134
135                    previous_stat_value = curr_stat_value
136
137                    if not verbose:
138                        time.sleep(0.1)
139                    else:
140                        time.sleep(2)
141            except:
142                log.info("unable to collect stats from server {0}".format(master))
143                verified = True  #TODO: throw ex and assume caller catches
144                break
145            # wait for 5 seconds for the next check
146            time.sleep(5)
147
148        return verified
149
150    @staticmethod
151    def wait_for_stats_no_timeout(master, bucket, stat_key, stat_value, timeout_in_seconds=-1, verbose=True):
152        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
153                                                                                stat_value, master.ip))
154        rest = RestConnection(master)
155        stats = rest.get_bucket_stats(bucket)
156
157        while stats.get(stat_key, -1) != stat_value:
158            stats = rest.get_bucket_stats(bucket)
159            if verbose:
160                log.info("{0} : {1}".format(stat_key, stats.get(stat_key, -1)))
161            time.sleep(5)
162        return True
163
164    @staticmethod
165    #bucket is a json object that contains name,port,password
166    def wait_for_mc_stats(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
167        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
168                                                                                stat_value, master.ip))
169        start = time.time()
170        verified = False
171        while (time.time() - start) <= timeout_in_seconds:
172            c = MemcachedClient(master.ip, 11210)
173            stats = c.stats()
174            c.close()
175            if stats and stat_key in stats and str(stats[stat_key]) == str(stat_value):
176                log.info("{0} : {1}".format(stat_key, stats[stat_key]))
177                verified = True
178                break
179            else:
180                if stats and stat_key in stats:
181                    if verbose:
182                        log.info("{0} : {1}".format(stat_key, stats[stat_key]))
183                if not verbose:
184                    time.sleep(0.1)
185                else:
186                    time.sleep(2)
187        return verified
188
189    @staticmethod
190    def wait_for_mc_stats_no_timeout(master, bucket, stat_key, stat_value, timeout_in_seconds=-1, verbose=True):
191        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
192                                                                                stat_value, master.ip))
193        # keep retrying until reaches the server
194        stats = {}
195        while not stats:
196            try:
197                c = MemcachedClient(master.ip, 11210)
198                c.sasl_auth_plain(bucket, '')
199                stats = c.stats()
200            except Exception as e:
201                log.info("Exception: {0}, retry in 2 seconds ...".format(str(e)))
202                stats = {}
203                time.sleep(2)
204            finally:
205                c.close()
206
207        while str(stats[stat_key]) != str(stat_value):
208            c = MemcachedClient(master.ip, 11210)
209            c.sasl_auth_plain(bucket, '')
210            stats = c.stats()
211            c.close()
212            if verbose:
213                log.info("{0} : {1}".format(stat_key, stats[stat_key]))
214            time.sleep(5)
215        return True
216
217    @staticmethod
218    #bucket is a json object that contains name,port,password
219    def wait_for_stats_int_value(master, bucket, stat_key, stat_value, option="==", timeout_in_seconds=120, verbose=True):
220        log.info("waiting for bucket {0} stat : {1} to {2} {3} on {4}".format(bucket, stat_key, option, \
221                                                                                stat_value, master.ip))
222        start = time.time()
223        verified = False
224        while (time.time() - start) <= timeout_in_seconds:
225            rest = RestConnection(master)
226            stats = rest.get_bucket_stats(bucket)
227            #some stats are in memcached
228            if stats and stat_key in stats:
229                actual = int(stats[stat_key])
230                if option == "==":
231                    verified = stat_value == actual
232                elif option == ">":
233                    verified = stat_value > actual
234                elif option == "<":
235                    verified = stat_value < actual
236                elif option == ">=":
237                    verified = stat_value >= actual
238                elif option == "<=":
239                    verified = stat_value <= actual
240                if verified:
241                    log.info("verified {0} : {1}".format(stat_key, actual))
242                    break
243                if verbose:
244                    log.info("{0} : {1} isn't {2} {3}".format(stat_key, stat_value, option, actual))
245            time.sleep(2)
246        return verified
247
248    @staticmethod
249    #bucket is a json object that contains name,port,password
250    def wait_for_stats_on_all(master, bucket, stat_key, stat_value, timeout_in_seconds=120,
251                              fn=None):
252        fn = fn or RebalanceHelper.wait_for_stats
253        rest = RestConnection(master)
254        servers = rest.get_nodes()
255        verified = False
256        start_time = time.time()
257        for server in servers:
258            verified = fn(server, bucket, stat_key, stat_value, \
259                          timeout_in_seconds=timeout_in_seconds)
260            if not verified:
261                log.info("bucket {0}: stat_key {1} for server {2} timed out in {3}".format(bucket, stat_key, \
262                                                                                           server.ip, time.time() - start_time))
263                break
264
265        return verified
266
267    @staticmethod
268    def wait_till_total_numbers_match(master,
269                                      bucket,
270                                      timeout_in_seconds=120):
271
272        log.info('waiting for sum_of_curr_items == total_items....')
273        start = time.time()
274        verified = False
275        while (time.time() - start) <= timeout_in_seconds:
276            try:
277                if RebalanceHelper.verify_items_count(master, bucket):
278                    verified = True
279                    break
280                else:
281                    time.sleep(2)
282            except StatsUnavailableException:
283                log.error("unable to retrieve stats for any node! Print taps for all nodes:")
284                break
285        if not verified:
286            rest = RestConnection(master)
287            RebalanceHelper.print_taps_from_all_nodes(rest, bucket)
288        return verified
289
290    @staticmethod
291    def wait_for_persistence(master, bucket, timeout=120):
292        verified = True
293        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
294            master, bucket, "ep_queue_size", 0,
295            timeout_in_seconds=timeout)
296        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
297            master, bucket, "ep_flusher_todo", 0,
298            timeout_in_seconds=timeout)
299        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
300            master, bucket, "ep_uncommitted_items", 0,
301            timeout_in_seconds=timeout)
302        return verified
303
304    @staticmethod
305    #TODO: add password and port
306    def print_taps_from_all_nodes(rest, bucket='default'):
307        #get the port number from rest ?
308
309        log = logger.Logger.get_logger()
310        nodes_for_stats = rest.get_nodes()
311        for node_for_stat in nodes_for_stats:
312            try:
313                client = MemcachedClientHelper.direct_client(node_for_stat, bucket)
314                log.info("getting tap stats... for {0}".format(node_for_stat.ip))
315                tap_stats = client.stats('tap')
316                if tap_stats:
317                    RebalanceHelper.log_interesting_taps(node_for_stat, tap_stats, log)
318                client.close()
319            except Exception as ex:
320                log.error("error {0} while getting stats...".format(ex))
321
322    @staticmethod
323    def log_interesting_taps(node, tap_stats, logger):
324        interesting_stats = ['ack_log_size', 'ack_seqno', 'ack_window_full', 'has_item', 'has_queued_item',
325                             'idle', 'paused', 'backfill_completed', 'pending_backfill', 'pending_disk_backfill', 'recv_ack_seqno',
326                             'ep_num_new_']
327        for name in tap_stats:
328            for interesting_stat in interesting_stats:
329                if name.find(interesting_stat) != -1:
330                    logger.info("TAP {0} :{1}   {2}".format(node.id, name, tap_stats[name]))
331                    break
332
333    @staticmethod
334    def verify_items_count(master, bucket, num_attempt=3, timeout=2):
335        #get the #of buckets from rest
336        rest = RestConnection(master)
337        if isinstance(bucket, Bucket):
338            bucket = bucket.name
339        bucket_info = rest.get_bucket(bucket, num_attempt, timeout)
340        replica_factor = bucket_info.numReplicas
341        vbucket_active_sum = 0
342        vbucket_replica_sum = 0
343        vbucket_pending_sum = 0
344        all_server_stats = []
345        stats_received = True
346        nodes = rest.get_nodes()
347        for server in nodes:
348            #get the stats
349            server_stats = rest.get_bucket_stats_for_node(bucket, server)
350            if not server_stats:
351                log.info("unable to get stats from {0}:{1}".format(server.ip, server.port))
352                stats_received = False
353            all_server_stats.append((server, server_stats))
354        if not stats_received:
355            raise StatsUnavailableException()
356        sum = 0
357        for server, single_stats in all_server_stats:
358            if not single_stats or "curr_items" not in single_stats:
359                continue
360            sum += single_stats["curr_items"]
361            log.info("curr_items from {0}:{1} : {2}".format(server.ip, server.port, \
362                single_stats["curr_items"]))
363            if 'vb_pending_num' in single_stats:
364                vbucket_pending_sum += single_stats['vb_pending_num']
365                log.info(
366                    "vb_pending_num from {0}:{1} : {2}".format(server.ip, server.port, \
367                        single_stats["vb_pending_num"]))
368            if 'vb_active_num' in single_stats:
369                vbucket_active_sum += single_stats['vb_active_num']
370                log.info(
371                    "vb_active_num from {0}:{1} : {2}".format(server.ip, server.port, \
372                        single_stats["vb_active_num"]))
373            if 'vb_replica_num' in single_stats:
374                vbucket_replica_sum += single_stats['vb_replica_num']
375                log.info(
376                    "vb_replica_num from {0}:{1} : {2}".format(server.ip, server.port, \
377                        single_stats["vb_replica_num"]))
378
379        msg = "summation of vb_active_num : {0} vb_pending_num : {1} vb_replica_num : {2}"
380        log.info(msg.format(vbucket_active_sum, vbucket_pending_sum, vbucket_replica_sum))
381        msg = 'sum : {0} and sum * (replica_factor + 1) ({1}) : {2}'
382        log.info(msg.format(sum, replica_factor + 1, (sum * (replica_factor + 1))))
383        master_stats = rest.get_bucket_stats(bucket)
384        if "curr_items_tot" in master_stats:
385            log.info('curr_items_tot from master: {0}'.format(master_stats["curr_items_tot"]))
386        else:
387           raise Exception("bucket {O} stats doesnt contain 'curr_items_tot':".format(bucket))
388        if replica_factor >= len(nodes):
389            log.warn("the number of nodes is less than replica requires")
390            delta = sum * (len(nodes)) - master_stats["curr_items_tot"]
391        else:
392            delta = sum * (replica_factor + 1) - master_stats["curr_items_tot"]
393        delta = abs(delta)
394
395        if delta > 0:
396            if sum == 0:
397                missing_percentage = 0
398            else:
399                missing_percentage = delta * 1.0 / (sum * (replica_factor + 1))
400            log.info("Nodes stats are: {0}".format([node.ip for node in nodes]))
401        else:
402            missing_percentage = 1
403        log.info("delta : {0} missing_percentage : {1} replica_factor : {2}".format(delta, \
404            missing_percentage, replica_factor))
405        # If no items missing then, return True
406        if not delta:
407            return True
408        return False
409
410    @staticmethod
411    def verify_maps(vbucket_map_before, vbucket_map_after):
412        #for each bucket check the replicas
413        for i in range(0, len(vbucket_map_before)):
414            if not vbucket_map_before[i].master == vbucket_map_after[i].master:
415                log.error(
416                    'vbucket[{0}].master mismatch {1} vs {2}'.format(i, vbucket_map_before[i].master,
417                                                                     vbucket_map_after[i].master))
418                return False
419            for j in range(0, len(vbucket_map_before[i].replica)):
420                if not (vbucket_map_before[i].replica[j]) == (vbucket_map_after[i].replica[j]):
421                    log.error('vbucket[{0}].replica[{1} mismatch {2} vs {3}'.format(i, j,
422                                                                                    vbucket_map_before[i].replica[j],
423                                                                                    vbucket_map_after[i].replica[j]))
424                    return False
425        return True
426
    # read the current nodes
    # if the node_ip is already added then just silently return
    # if it's not added then try to add it and then rebalance
    # we should also try to get the bucket information from the
    # rest api instead of passing it to the functions
433
434    @staticmethod
435    def rebalance_in(servers, how_many, do_shuffle=True, monitor=True, do_check=True):
436        servers_rebalanced = []
437        log = logger.Logger.get_logger()
438        rest = RestConnection(servers[0])
439        nodes = rest.node_statuses()
440        #are all ips the same
441        nodes_on_same_ip = True
442        firstIp = nodes[0].ip
443        if len(nodes) == 1:
444            nodes_on_same_ip = False
445        else:
446            for node in nodes:
447                if node.ip != firstIp:
448                    nodes_on_same_ip = False
449                    break
450        nodeIps = ["{0}:{1}".format(node.ip, node.port) for node in nodes]
451        log.info("current nodes : {0}".format(nodeIps))
452        toBeAdded = []
453        master = servers[0]
454        selection = servers[1:]
455        if do_shuffle:
456            shuffle(selection)
457        for server in selection:
458            if nodes_on_same_ip:
459                if not "{0}:{1}".format(firstIp, server.port) in nodeIps:
460                    toBeAdded.append(server)
461                    servers_rebalanced.append(server)
462                    log.info("choosing {0}:{1}".format(server.ip, server.port))
463            elif not "{0}:{1}".format(server.ip, server.port) in nodeIps:
464                toBeAdded.append(server)
465                servers_rebalanced.append(server)
466                log.info("choosing {0}:{1}".format(server.ip, server.port))
467            if len(toBeAdded) == int(how_many):
468                break
469
470        if do_check and len(toBeAdded) < how_many:
471            raise Exception("unable to find {0} nodes to rebalance_in".format(how_many))
472
473        for server in toBeAdded:
474            otpNode = rest.add_node(master.rest_username, master.rest_password,
475                                    server.ip, server.port)
476        otpNodes = [node.id for node in rest.node_statuses()]
477        started = rest.rebalance(otpNodes, [])
478        msg = "rebalance operation started ? {0}"
479        log.info(msg.format(started))
480        if monitor is not True:
481            return True, servers_rebalanced
482        if started:
483            try:
484                result = rest.monitorRebalance()
485            except RebalanceFailedException as e:
486                log.error("rebalance failed: {0}".format(e))
487                return False, servers_rebalanced
488            msg = "successfully rebalanced in selected nodes from the cluster ? {0}"
489            log.info(msg.format(result))
490            return result, servers_rebalanced
491        return False, servers_rebalanced
492
493    @staticmethod
494    def rebalance_out(servers, how_many, monitor=True):
495        rest = RestConnection(servers[0])
496        cur_ips = map(lambda node: node.ip, rest.node_statuses())
497        servers = filter(lambda server: server.ip in cur_ips, servers) or servers
498        if len(cur_ips) <= how_many or how_many < 1:
499            log.error("failed to rebalance %s servers out: not enough servers"
500                      % how_many)
501            return False, []
502
503        ejections = servers[1:how_many + 1]
504
505        log.info("rebalancing out %s" % ejections)
506        RebalanceHelper.begin_rebalance_out(servers[0], ejections)
507
508        if not monitor:
509            return True, ejections
510        try:
511            return rest.monitorRebalance(), ejections
512        except RebalanceFailedException, e:
513            log.error("failed to rebalance %s servers out: %s" % (how_many, e))
514            return False, ejections
515
516    @staticmethod
517    def rebalance_swap(servers, how_many, monitor=True):
518        if how_many < 1:
519            log.error("failed to swap rebalance %s servers - invalid count"
520                      % how_many)
521            return False, []
522
523        rest = RestConnection(servers[0])
524        cur_nodes = rest.node_statuses()
525        cur_ips = map(lambda node: node.ip, cur_nodes)
526        cur_ids = map(lambda node: node.id, cur_nodes)
527        free_servers = filter(lambda server: server.ip not in cur_ips, servers)
528
529        if len(cur_ids) <= how_many or len(free_servers) < how_many:
530            log.error("failed to swap rebalance %s servers - not enough servers"
531                      % how_many)
532            return False, []
533
534        ejections = cur_ids[-how_many:]
535        additions = free_servers[:how_many]
536
537        log.info("swap rebalance: cur: %s, eject: %s, add: %s"
538                 % (cur_ids, ejections, additions))
539
540        try:
541            map(lambda server: rest.add_node(servers[0].rest_username,
542                                             servers[0].rest_password,
543                                             server.ip, server.port), additions)
544        except (ServerAlreadyJoinedException,
545                ServerSelfJoinException, AddNodeException), e:
546            log.error("failed to swap rebalance - addition failed %s: %s"
547                      % (additions, e))
548            return False, []
549
550        cur_ids = map(lambda node: node.id, rest.node_statuses())
551        try:
552            rest.rebalance(otpNodes=cur_ids, ejectedNodes=ejections)
553        except InvalidArgumentException, e:
554            log.error("failed to swap rebalance - rebalance failed :%s" % e)
555            return False, []
556
557        if not monitor:
558            return True, ejections + additions
559
560        try:
561            return rest.monitorRebalance(), ejections + additions
562        except RebalanceFailedException, e:
563            log.error("failed to swap rebalance %s servers: %s" % (how_many, e))
564            return False, ejections + additions
565
566    @staticmethod
567    def begin_rebalance_in(master, servers, timeout=5):
568        log = logger.Logger.get_logger()
569        rest = RestConnection(master)
570        otpNode = None
571
572        for server in servers:
573            if server == master:
574                continue
575            log.info("adding node {0}:{1} to cluster".format(server.ip, server.port))
576            try:
577                otpNode = rest.add_node(master.rest_username, master.rest_password, server.ip, server.port)
578                msg = "unable to add node {0}:{1} to the cluster"
579                assert otpNode, msg.format(server.ip, server.port)
580            except ServerAlreadyJoinedException:
581                log.info("server {0} already joined".format(server))
582        log.info("beginning rebalance in")
583        try:
584            rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()], ejectedNodes=[])
585        except:
586            log.error("rebalance failed, trying again after {0} seconds".format(timeout))
587
588    @staticmethod
589    def begin_rebalance_out(master, servers, timeout=5):
590        log = logger.Logger.get_logger()
591        rest = RestConnection(master)
592
593        master_node = rest.get_nodes_self()
594
595        allNodes = []
596        ejectedNodes = []
597        nodes = rest.node_statuses()
598        for server in servers:
599            server_node = RestConnection(server).get_nodes_self()
600            if server_node == master_node:
601                continue
602            log.info("removing node {0}:{1} from cluster".format(server_node.ip, server_node.port))
603            for node in nodes:
604                if "{0}:{1}".format(node.ip, node.port) == "{0}:{1}".format(server_node.ip, server_node.port):
605                    ejectedNodes.append(node.id)
606        log.info("beginning rebalance out")
607        try:
608            rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
609        except:
610            log.error("rebalance failed, trying again after {0} seconds".format(timeout))
611
612    @staticmethod
613    def end_rebalance(master):
614        log = logger.Logger.get_logger()
615        rest = RestConnection(master)
616        result = False
617        try:
618            result = rest.monitorRebalance()
619        except RebalanceFailedException as e:
620            log.error("rebalance failed: {0}".format(e))
621        assert result, "rebalance operation failed after adding nodes"
622        log.info("rebalance finished")
623
624    @staticmethod
625    def getOtpNodeIds(master):
626        rest = RestConnection(master)
627        nodes = rest.node_statuses()
628        otpNodeIds = [node.id for node in nodes]
629        return otpNodeIds
630
631    @staticmethod
632    def verify_vBuckets_info(master, bucket="default"):
633        '''
634        verify vBuckets' state and items count(for active/replica) in them related to vBucketMap for all nodes in cluster
635        '''
636        awareness = VBucketAwareMemcached(RestConnection(master), bucket)
637        vb_map = awareness.vBucketMap
638        vb_mapReplica = awareness.vBucketMapReplica
639        replica_num = len(vb_mapReplica[0])
640
641        #get state and count items for all vbuckets for each node
642        node_stats = RebalanceHelper.get_vBuckets_info(master)
643        state = True
644        #iterate throught all vbuckets by their numbers
645        for num in vb_map:
646            #verify that active vbucket in memcached  is also active in stats("hash)
647            if(node_stats[vb_map[num]]["vb_" + str(num)][0] != "active"):
648                log.info("vBucket {0} in {1} node has wrong state {3}".format("vb_" + str(num), vb_map[num], node_stats[vb_map[num]]["vb_" + str(num)]));
649                state = False
650            #number of active items for num vBucket
651            vb = node_stats[vb_map[num]]["vb_" + str(num)][1]
652            active_vb = vb_map[num]
653            #list of nodes for wich num vBucket is replica
654            replica_vbs = vb_mapReplica[key]
655            sum_items_replica = 0
656            #sum of replica items for all nodes for num vBucket
657            for i in range(replica_num):
658                if(node_stats[vb_mapReplica[num][i]]["vb_" + str(num)][0] != "replica"):
659                    log.info("vBucket {0} in {1} node has wrong state {3}".format("vb_" + str(num), vb_mapReplica[num], node_stats[vb_mapReplica[num]]["vb_" + str(num)]));
660                    state = False
661                sum_items_replica += int(node_stats[replica_vbs[i]]["vb_" + str(num)][1])
662            #print information about the discrepancy of the number of replica and active items for num vBucket
663            if (int(vb) * len(vb_mapReplica[num]) != sum_items_replica):
664                log.info("sum of active items doesn't correspond to replica's vBucets in {0} vBucket:".format("vb_" + str(num)))
665                log.info("items in active vBucket {0}:{1}".format(vb_map[num], node_stats[vb_map[num]]["vb_" + str(num)]))
666                for j in range(replica):
667                    log.info("items in replica vBucket {0}: {1}".format(vb_mapReplica[num][j], node_stats[vb_mapReplica[num][j]]["vb_" + str(num)]))
668                    log.info(node_stats[vb_mapReplica[num][0]])
669                state = False
670
671        if not state:
672             log.error("Something is wrong, see log above. See details:")
673             log.error("vBucetMap: {0}".format(vb_map))
674             log.error("vBucetReplicaMap: {0}".format(vb_mapReplica))
675             log.error("node_stats: {0}".format(node_stats))
676        return state
677
678    @staticmethod
679    def get_vBuckets_info(master):
680        """
681        return state and count items for all vbuckets for each node
682        format: dict: {u'1node_ip1': {'vb_79': ['replica', '0'], 'vb_78': ['active', '0']..}, u'1node_ip1':....}
683        """
684        rest = RestConnection(master)
685        port = rest.get_nodes_self().memcached
686        nodes = rest.node_statuses()
687        _nodes_stats = {}
688        for node in nodes:
689            stat = {}
690            buckets = []
691            _server = {"ip": node.ip, "port": node.port, "username": master.rest_username,
692                           "password": master.rest_password}
693            try:
694                buckets = rest.get_buckets()
695                mc = MemcachedClient(node.ip, port)
696                stat_hash = mc.stats("hash")
697            except Exception:
698                if not buckets:
699                    log.error("There are not any buckets in {0}:{1} node".format(node.ip, node.port))
700                else:
701                    log.error("Impossible to get vBucket's information for {0}:{1} node".format(node.ip, node.port))
702                    _nodes_stats[node.ip + ":" + str(node.port)]
703                continue
704            mc.close()
705            vb_names = [key[:key.index(":")] for key in stat_hash.keys()]
706
707            for name in vb_names:
708                stat[name] = [stat_hash[name + ":state"], stat_hash[name + ":counted"]]
709            _nodes_stats[node.ip + ":" + str(port)] = stat
710        log.info(_nodes_stats)
711        return _nodes_stats
712
713
714    @staticmethod
715    def pick_node(master):
716        rest = RestConnection(master)
717        nodes = rest.node_statuses()
718        node_picked = None
719        nodes_on_same_ip = True
720
721        firstIp = nodes[0].ip
722        for node in nodes:
723            if node.ip != firstIp:
724                nodes_on_same_ip = False
725                break
726
727        for node in nodes:
728            node_picked = node
729            if not nodes_on_same_ip:
730                if node_picked.ip != master.ip:
731                    log.info(
732                        "Picked node ... {0}:{1}".format(node_picked.ip, node_picked.port))
733                    break
734            else:
735                # temp fix - port numbers of master(machine ip and localhost: 9000 match
736                if int(node_picked.port) == int(
737                    master.port):
738                    log.info("Not picking the master node {0}:{1}.. try again...".format(node_picked.ip,
739                        node_picked.port))
740                else:
741                    log.info(
742                        "Picked  node {0}:{1}".format(node_picked.ip, node_picked.port))
743                    break
744        return node_picked
745
746    @staticmethod
747    def pick_nodes(master, howmany=1, target_node = None):
748        rest = RestConnection(master)
749        nodes = rest.node_statuses()
750        picked = []
751        for node_for_stat in nodes:
752            if node_for_stat.ip != master.ip or  str(node_for_stat.port) != master.port :
753                if target_node ==  None:
754                    picked.append(node_for_stat)
755                elif target_node.ip == node_for_stat.ip:
756                    picked.append(node_for_stat)
757                    return picked
758                if len(picked) == howmany:
759                    break
760        return picked
761