from random import shuffle
import time
import logger
from couchbase_helper.cluster import Cluster
from membase.api.exception import StatsUnavailableException, \
    ServerAlreadyJoinedException, RebalanceFailedException, \
    FailoverFailedException, InvalidArgumentException, ServerSelfJoinException, \
    AddNodeException
from membase.api.rest_client import RestConnection, RestHelper, Bucket
from membase.helper.bucket_helper import BucketOperationHelper
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
from mc_bin_client import MemcachedClient, MemcachedError

log = logger.Logger.get_logger()


class RebalanceHelper():
    """Static helpers for driving and verifying cluster rebalance operations:
    stat polling, node addition/ejection, vbucket map verification."""

    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_mc_stats_all_nodes(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
        """Poll memcached stats on every active node and wait until the SUM of
        stat_key across nodes equals stat_value.

        Times out only when the summed value stops changing for
        timeout_in_seconds. Returns True if the target value was reached.
        """
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        time_to_timeout = 0
        previous_stat_value = -1
        curr_stat_value = -1
        verified = False
        all_stats = {}
        while not verified:
            rest = RestConnection(master)
            nodes = rest.node_statuses()
            for node in nodes:
                _server = {"ip": node.ip, "port": node.port, "username": master.rest_username,
                           "password": master.rest_password}
                # failed over node is part of node_statuses but since its failed over
                # memcached connections to this node will fail
                node_self = RestConnection(_server).get_nodes_self()
                if node_self.clusterMembership == 'active':
                    mc = MemcachedClientHelper.direct_client(_server, bucket)
                    n_stats = mc.stats("")
                    mc.close()
                    all_stats[node.id] = n_stats
            # sum stat_key over every node that reported it; -1 means "none seen yet"
            actual_stat_value = -1
            for k in all_stats:
                if all_stats[k] and stat_key in all_stats[k]:
                    if actual_stat_value == -1:
                        log.info(all_stats[k][stat_key])
                        actual_stat_value = int(all_stats[k][stat_key])
                    else:
                        actual_stat_value += int(all_stats[k][stat_key])
            if actual_stat_value == stat_value:
                log.info("{0} : {1}".format(stat_key, actual_stat_value))
                verified = True
                break
            else:
                if verbose:
                    log.info("{0} : {1}".format(stat_key, actual_stat_value))
                curr_stat_value = actual_stat_value

                # values are changing so clear any timeout
                if curr_stat_value != previous_stat_value:
                    time_to_timeout = 0
                else:
                    if time_to_timeout == 0:
                        time_to_timeout = time.time() + timeout_in_seconds
                    if time_to_timeout < time.time():
                        log.info("no change in {0} stat after {1} seconds (value = {2})".format(stat_key, timeout_in_seconds, curr_stat_value))
                        break

                previous_stat_value = curr_stat_value

                if not verbose:
                    time.sleep(0.1)
                else:
                    time.sleep(2)
        return verified

    @staticmethod
    def wait_for_replication(servers, cluster_helper=None, timeout=600):
        """Wait until every tap replication queue between every pair of servers
        is idle with backfill completed, for every bucket. Returns True.

        If cluster_helper is None a temporary Cluster is created and shut down.
        """
        if cluster_helper is None:
            cluster = Cluster()
        else:
            cluster = cluster_helper
        tasks = []
        rest = RestConnection(servers[0])
        buckets = rest.get_buckets()
        for server in servers:
            for bucket in buckets:
                for server_repl in list(set(servers) - set([server])):
                    tasks.append(cluster.async_wait_for_stats([server], bucket, 'tap',
                                 'eq_tapq:replication_ns_1@' + server_repl.ip + ':idle', '==', 'true'))
                    tasks.append(cluster.async_wait_for_stats([server], bucket, 'tap',
                                 'eq_tapq:replication_ns_1@' + server_repl.ip + ':backfill_completed', '==', 'true'))
        try:
            for task in tasks:
                task.result(timeout)
        finally:
            if cluster_helper is None:
                # stop all newly created task manager threads
                cluster.shutdown()
            return True

    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_stats(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
        """Poll the bucket stats via REST until stats[stat_key] == stat_value.

        Times out when the value stops changing for timeout_in_seconds.
        Returns True when matched (or, historically, when stats collection
        fails -- see TODO below).
        """
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        time_to_timeout = 0
        previous_stat_value = -1
        curr_stat_value = -1
        verified = False
        while not verified:
            rest = RestConnection(master)
            try:
                stats = rest.get_bucket_stats(bucket)
                if stats and stat_key in stats and stats[stat_key] == stat_value:
                    log.info("{0} : {1}".format(stat_key, stats[stat_key]))
                    verified = True
                    break
                else:
                    if stats and stat_key in stats:
                        if verbose:
                            log.info("{0} : {1}".format(stat_key, stats[stat_key]))
                        curr_stat_value = stats[stat_key]

                    # values are changing so clear any timeout
                    if curr_stat_value != previous_stat_value:
                        time_to_timeout = 0
                    else:
                        if time_to_timeout == 0:
                            time_to_timeout = time.time() + timeout_in_seconds
                        if time_to_timeout < time.time():
                            log.info("no change in {0} stat after {1} seconds (value = {2})".format(stat_key, timeout_in_seconds, curr_stat_value))
                            break

                    previous_stat_value = curr_stat_value

                    if not verbose:
                        time.sleep(0.1)
                    else:
                        time.sleep(2)
            except Exception:
                log.info("unable to collect stats from server {0}".format(master))
                verified = True  #TODO: throw ex and assume caller catches
                break
            # wait for 5 seconds for the next check
            time.sleep(5)

        return verified

    @staticmethod
    def wait_for_stats_no_timeout(master, bucket, stat_key, stat_value, timeout_in_seconds=-1, verbose=True):
        """Poll bucket stats forever (no timeout) until stat_key == stat_value.
        Returns True."""
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        rest = RestConnection(master)
        stats = rest.get_bucket_stats(bucket)

        while stats.get(stat_key, -1) != stat_value:
            stats = rest.get_bucket_stats(bucket)
            if verbose:
                log.info("{0} : {1}".format(stat_key, stats.get(stat_key, -1)))
            time.sleep(5)
        return True

    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_mc_stats(master, bucket, stat_key, stat_value, timeout_in_seconds=120, verbose=True):
        """Poll memcached (port 11210) on the master only until
        str(stats[stat_key]) == str(stat_value) or timeout_in_seconds elapses.
        Returns True on match."""
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        start = time.time()
        verified = False
        while (time.time() - start) <= timeout_in_seconds:
            c = MemcachedClient(master.ip, 11210)
            stats = c.stats()
            c.close()
            if stats and stat_key in stats and str(stats[stat_key]) == str(stat_value):
                log.info("{0} : {1}".format(stat_key, stats[stat_key]))
                verified = True
                break
            else:
                if stats and stat_key in stats:
                    if verbose:
                        log.info("{0} : {1}".format(stat_key, stats[stat_key]))
                if not verbose:
                    time.sleep(0.1)
                else:
                    time.sleep(2)
        return verified

    @staticmethod
    def wait_for_mc_stats_no_timeout(master, bucket, stat_key, stat_value, timeout_in_seconds=-1, verbose=True):
        """Poll memcached on the master forever until
        str(stats[stat_key]) == str(stat_value). Returns True."""
        log.info("waiting for bucket {0} stat : {1} to match {2} on {3}".format(bucket, stat_key, \
                                                                                stat_value, master.ip))
        # keep retrying until reaches the server
        stats = {}
        while not stats:
            c = None
            try:
                c = MemcachedClient(master.ip, 11210)
                c.sasl_auth_plain(bucket, '')
                stats = c.stats()
            except Exception as e:
                log.info("Exception: {0}, retry in 2 seconds ...".format(str(e)))
                stats = {}
                time.sleep(2)
            finally:
                # guard: the constructor itself may have raised before c was bound
                if c is not None:
                    c.close()

        while str(stats[stat_key]) != str(stat_value):
            c = MemcachedClient(master.ip, 11210)
            c.sasl_auth_plain(bucket, '')
            stats = c.stats()
            c.close()
            if verbose:
                log.info("{0} : {1}".format(stat_key, stats[stat_key]))
            time.sleep(5)
        return True

    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_stats_int_value(master, bucket, stat_key, stat_value, option="==", timeout_in_seconds=120, verbose=True):
        """Poll bucket stats until int(stats[stat_key]) compares to stat_value
        with the given option ("==", ">", "<", ">=", "<=").

        NOTE(review): for the inequality options the comparison is written as
        `stat_value <op> actual`, i.e. the expected value is on the LEFT --
        confirm callers rely on this orientation before changing it.
        Returns True when the comparison holds within the timeout.
        """
        log.info("waiting for bucket {0} stat : {1} to {2} {3} on {4}".format(bucket, stat_key, option, \
                                                                              stat_value, master.ip))
        start = time.time()
        verified = False
        while (time.time() - start) <= timeout_in_seconds:
            rest = RestConnection(master)
            stats = rest.get_bucket_stats(bucket)
            #some stats are in memcached
            if stats and stat_key in stats:
                actual = int(stats[stat_key])
                if option == "==":
                    verified = stat_value == actual
                elif option == ">":
                    verified = stat_value > actual
                elif option == "<":
                    verified = stat_value < actual
                elif option == ">=":
                    verified = stat_value >= actual
                elif option == "<=":
                    verified = stat_value <= actual
                if verified:
                    log.info("verified {0} : {1}".format(stat_key, actual))
                    break
                if verbose:
                    log.info("{0} : {1} isn't {2} {3}".format(stat_key, stat_value, option, actual))
            time.sleep(2)
        return verified

    @staticmethod
    #bucket is a json object that contains name,port,password
    def wait_for_stats_on_all(master, bucket, stat_key, stat_value, timeout_in_seconds=120,
                              fn=None):
        """Run fn (default: wait_for_stats) against every node in the cluster;
        stop and return False on the first node that times out."""
        fn = fn or RebalanceHelper.wait_for_stats
        rest = RestConnection(master)
        servers = rest.get_nodes()
        verified = False
        start_time = time.time()
        for server in servers:
            verified = fn(server, bucket, stat_key, stat_value, \
                          timeout_in_seconds=timeout_in_seconds)
            if not verified:
                log.info("bucket {0}: stat_key {1} for server {2} timed out in {3}".format(bucket, stat_key, \
                                                                                           server.ip, time.time() - start_time))
                break

        return verified

    @staticmethod
    def wait_till_total_numbers_match(master,
                                      bucket,
                                      timeout_in_seconds=120):
        """Wait until verify_items_count succeeds (sum of per-node curr_items
        consistent with curr_items_tot). On failure, dump tap stats from all
        nodes for debugging. Returns True on success."""
        log.info('waiting for sum_of_curr_items == total_items....')
        start = time.time()
        verified = False
        while (time.time() - start) <= timeout_in_seconds:
            try:
                if RebalanceHelper.verify_items_count(master, bucket):
                    verified = True
                    break
                else:
                    time.sleep(2)
            except StatsUnavailableException:
                log.error("unable to retrieve stats for any node! Print taps for all nodes:")
                break
        if not verified:
            rest = RestConnection(master)
            RebalanceHelper.print_taps_from_all_nodes(rest, bucket)
        return verified

    @staticmethod
    def wait_for_persistence(master, bucket, timeout=120):
        """Wait until the persistence queues (ep_queue_size, ep_flusher_todo,
        ep_uncommitted_items) drain to 0 on all nodes. Returns True if all
        three drained within the timeout."""
        verified = True
        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
            master, bucket, "ep_queue_size", 0,
            timeout_in_seconds=timeout)
        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
            master, bucket, "ep_flusher_todo", 0,
            timeout_in_seconds=timeout)
        verified &= RebalanceHelper.wait_for_mc_stats_all_nodes(
            master, bucket, "ep_uncommitted_items", 0,
            timeout_in_seconds=timeout)
        return verified

    @staticmethod
    #TODO: add password and port
    def print_taps_from_all_nodes(rest, bucket='default'):
        """Log the interesting tap stats (see log_interesting_taps) from every
        node in the cluster; errors per node are logged and skipped."""
        #get the port number from rest ?

        log = logger.Logger.get_logger()
        nodes_for_stats = rest.get_nodes()
        for node_for_stat in nodes_for_stats:
            try:
                client = MemcachedClientHelper.direct_client(node_for_stat, bucket)
                log.info("getting tap stats... for {0}".format(node_for_stat.ip))
                tap_stats = client.stats('tap')
                if tap_stats:
                    RebalanceHelper.log_interesting_taps(node_for_stat, tap_stats, log)
                client.close()
            except Exception as ex:
                log.error("error {0} while getting stats...".format(ex))

    @staticmethod
    def log_interesting_taps(node, tap_stats, logger):
        """Log only the tap stats whose name contains one of the interesting
        substrings below (queue/ack/backfill state)."""
        interesting_stats = ['ack_log_size', 'ack_seqno', 'ack_window_full', 'has_item', 'has_queued_item',
                             'idle', 'paused', 'backfill_completed', 'pending_backfill', 'pending_disk_backfill', 'recv_ack_seqno',
                             'ep_num_new_']
        for name in tap_stats:
            for interesting_stat in interesting_stats:
                if name.find(interesting_stat) != -1:
                    logger.info("TAP {0} :{1} {2}".format(node.id, name, tap_stats[name]))
                    break

    @staticmethod
    def verify_items_count(master, bucket, num_attempt=3, timeout=2):
        """Verify that the sum of curr_items over all nodes, multiplied by
        (replica_factor + 1), matches the bucket's curr_items_tot.

        Raises StatsUnavailableException if any node's stats are unavailable.
        Returns True only when there is no item-count delta.
        """
        #get the #of buckets from rest
        rest = RestConnection(master)
        if isinstance(bucket, Bucket):
            bucket = bucket.name
        bucket_info = rest.get_bucket(bucket, num_attempt, timeout)
        replica_factor = bucket_info.numReplicas
        vbucket_active_sum = 0
        vbucket_replica_sum = 0
        vbucket_pending_sum = 0
        all_server_stats = []
        stats_received = True
        nodes = rest.get_nodes()
        for server in nodes:
            #get the stats
            server_stats = rest.get_bucket_stats_for_node(bucket, server)
            if not server_stats:
                log.info("unable to get stats from {0}:{1}".format(server.ip, server.port))
                stats_received = False
            all_server_stats.append((server, server_stats))
        if not stats_received:
            raise StatsUnavailableException()
        # items_sum: total curr_items over all nodes (renamed from `sum` to
        # avoid shadowing the builtin)
        items_sum = 0
        for server, single_stats in all_server_stats:
            if not single_stats or "curr_items" not in single_stats:
                continue
            items_sum += single_stats["curr_items"]
            log.info("curr_items from {0}:{1} : {2}".format(server.ip, server.port, \
                                                            single_stats["curr_items"]))
            if 'vb_pending_num' in single_stats:
                vbucket_pending_sum += single_stats['vb_pending_num']
                log.info(
                    "vb_pending_num from {0}:{1} : {2}".format(server.ip, server.port, \
                                                               single_stats["vb_pending_num"]))
            if 'vb_active_num' in single_stats:
                vbucket_active_sum += single_stats['vb_active_num']
                log.info(
                    "vb_active_num from {0}:{1} : {2}".format(server.ip, server.port, \
                                                              single_stats["vb_active_num"]))
            if 'vb_replica_num' in single_stats:
                vbucket_replica_sum += single_stats['vb_replica_num']
                log.info(
                    "vb_replica_num from {0}:{1} : {2}".format(server.ip, server.port, \
                                                               single_stats["vb_replica_num"]))

        msg = "summation of vb_active_num : {0} vb_pending_num : {1} vb_replica_num : {2}"
        log.info(msg.format(vbucket_active_sum, vbucket_pending_sum, vbucket_replica_sum))
        msg = 'sum : {0} and sum * (replica_factor + 1) ({1}) : {2}'
        log.info(msg.format(items_sum, replica_factor + 1, (items_sum * (replica_factor + 1))))
        master_stats = rest.get_bucket_stats(bucket)
        if "curr_items_tot" in master_stats:
            log.info('curr_items_tot from master: {0}'.format(master_stats["curr_items_tot"]))
        else:
            # BUGFIX: placeholder was "{O}" (letter O) which raised KeyError
            raise Exception("bucket {0} stats doesnt contain 'curr_items_tot':".format(bucket))
        if replica_factor >= len(nodes):
            log.warn("the number of nodes is less than replica requires")
            delta = items_sum * (len(nodes)) - master_stats["curr_items_tot"]
        else:
            delta = items_sum * (replica_factor + 1) - master_stats["curr_items_tot"]
        delta = abs(delta)

        if delta > 0:
            if items_sum == 0:
                missing_percentage = 0
            else:
                missing_percentage = delta * 1.0 / (items_sum * (replica_factor + 1))
            log.info("Nodes stats are: {0}".format([node.ip for node in nodes]))
        else:
            missing_percentage = 1
        log.info("delta : {0} missing_percentage : {1} replica_factor : {2}".format(delta, \
                                                                                    missing_percentage, replica_factor))
        # If no items missing then, return True
        if not delta:
            return True
        return False

    @staticmethod
    def verify_maps(vbucket_map_before, vbucket_map_after):
        """Return True iff the master and every replica of every vbucket are
        identical between the two maps; mismatches are logged."""
        #for each bucket check the replicas
        for i in range(0, len(vbucket_map_before)):
            if not vbucket_map_before[i].master == vbucket_map_after[i].master:
                log.error(
                    'vbucket[{0}].master mismatch {1} vs {2}'.format(i, vbucket_map_before[i].master,
                                                                     vbucket_map_after[i].master))
                return False
            for j in range(0, len(vbucket_map_before[i].replica)):
                if not (vbucket_map_before[i].replica[j]) == (vbucket_map_after[i].replica[j]):
                    log.error('vbucket[{0}].replica[{1} mismatch {2} vs {3}'.format(i, j,
                                                                                    vbucket_map_before[i].replica[j],
                                                                                    vbucket_map_after[i].replica[j]))
                    return False
        return True

    #read the current nodes
    # if the node_ip already added then just
    #silently return
    #if its not added then let try to add this and then rebalance
    #we should alo try to get the bucket information from
    #rest api instead of passing it to the fucntions

    @staticmethod
    def rebalance_in(servers, how_many, do_shuffle=True, monitor=True, do_check=True):
        """Add how_many not-yet-joined servers to the cluster and rebalance.

        Returns (success, servers_rebalanced). If monitor is False, returns
        (True, servers_rebalanced) as soon as the rebalance has started.
        Raises if do_check is set and fewer than how_many candidates exist.
        """
        servers_rebalanced = []
        log = logger.Logger.get_logger()
        rest = RestConnection(servers[0])
        nodes = rest.node_statuses()
        #are all ips the same
        nodes_on_same_ip = True
        firstIp = nodes[0].ip
        if len(nodes) == 1:
            nodes_on_same_ip = False
        else:
            for node in nodes:
                if node.ip != firstIp:
                    nodes_on_same_ip = False
                    break
        nodeIps = ["{0}:{1}".format(node.ip, node.port) for node in nodes]
        log.info("current nodes : {0}".format(nodeIps))
        toBeAdded = []
        master = servers[0]
        selection = servers[1:]
        if do_shuffle:
            shuffle(selection)
        for server in selection:
            if nodes_on_same_ip:
                # single-machine cluster: nodes differ only by port
                if not "{0}:{1}".format(firstIp, server.port) in nodeIps:
                    toBeAdded.append(server)
                    servers_rebalanced.append(server)
                    log.info("choosing {0}:{1}".format(server.ip, server.port))
            elif not "{0}:{1}".format(server.ip, server.port) in nodeIps:
                toBeAdded.append(server)
                servers_rebalanced.append(server)
                log.info("choosing {0}:{1}".format(server.ip, server.port))
            if len(toBeAdded) == int(how_many):
                break

        if do_check and len(toBeAdded) < how_many:
            raise Exception("unable to find {0} nodes to rebalance_in".format(how_many))

        for server in toBeAdded:
            otpNode = rest.add_node(master.rest_username, master.rest_password,
                                    server.ip, server.port)
        otpNodes = [node.id for node in rest.node_statuses()]
        started = rest.rebalance(otpNodes, [])
        msg = "rebalance operation started ? {0}"
        log.info(msg.format(started))
        if monitor is not True:
            return True, servers_rebalanced
        if started:
            try:
                result = rest.monitorRebalance()
            except RebalanceFailedException as e:
                log.error("rebalance failed: {0}".format(e))
                return False, servers_rebalanced
            msg = "successfully rebalanced in selected nodes from the cluster ? {0}"
            log.info(msg.format(result))
            return result, servers_rebalanced
        return False, servers_rebalanced

    @staticmethod
    def rebalance_out(servers, how_many, monitor=True):
        """Rebalance how_many servers out of the cluster (never servers[0]).

        Returns (success, ejected_servers).
        """
        rest = RestConnection(servers[0])
        cur_ips = [node.ip for node in rest.node_statuses()]
        servers = [server for server in servers if server.ip in cur_ips] or servers
        if len(cur_ips) <= how_many or how_many < 1:
            log.error("failed to rebalance %s servers out: not enough servers"
                      % how_many)
            return False, []

        ejections = servers[1:how_many + 1]

        log.info("rebalancing out %s" % ejections)
        RebalanceHelper.begin_rebalance_out(servers[0], ejections)

        if not monitor:
            return True, ejections
        try:
            return rest.monitorRebalance(), ejections
        except RebalanceFailedException as e:
            log.error("failed to rebalance %s servers out: %s" % (how_many, e))
            return False, ejections

    @staticmethod
    def rebalance_swap(servers, how_many, monitor=True):
        """Swap-rebalance: add how_many free servers and eject how_many current
        nodes in a single rebalance. Returns (success, ejections + additions).
        """
        if how_many < 1:
            log.error("failed to swap rebalance %s servers - invalid count"
                      % how_many)
            return False, []

        rest = RestConnection(servers[0])
        cur_nodes = rest.node_statuses()
        cur_ips = [node.ip for node in cur_nodes]
        cur_ids = [node.id for node in cur_nodes]
        free_servers = [server for server in servers if server.ip not in cur_ips]

        if len(cur_ids) <= how_many or len(free_servers) < how_many:
            log.error("failed to swap rebalance %s servers - not enough servers"
                      % how_many)
            return False, []

        ejections = cur_ids[-how_many:]
        additions = free_servers[:how_many]

        log.info("swap rebalance: cur: %s, eject: %s, add: %s"
                 % (cur_ids, ejections, additions))

        try:
            # BUGFIX: was map() for side effects, which is lazy on Python 3
            # and never actually added the nodes
            for server in additions:
                rest.add_node(servers[0].rest_username,
                              servers[0].rest_password,
                              server.ip, server.port)
        except (ServerAlreadyJoinedException,
                ServerSelfJoinException, AddNodeException) as e:
            log.error("failed to swap rebalance - addition failed %s: %s"
                      % (additions, e))
            return False, []

        cur_ids = [node.id for node in rest.node_statuses()]
        try:
            rest.rebalance(otpNodes=cur_ids, ejectedNodes=ejections)
        except InvalidArgumentException as e:
            log.error("failed to swap rebalance - rebalance failed :%s" % e)
            return False, []

        if not monitor:
            return True, ejections + additions

        try:
            return rest.monitorRebalance(), ejections + additions
        except RebalanceFailedException as e:
            log.error("failed to swap rebalance %s servers: %s" % (how_many, e))
            return False, ejections + additions

    @staticmethod
    def begin_rebalance_in(master, servers, timeout=5):
        """Add the given servers to master's cluster and start (but do not
        monitor) a rebalance. Already-joined servers are skipped."""
        log = logger.Logger.get_logger()
        rest = RestConnection(master)
        otpNode = None

        for server in servers:
            if server == master:
                continue
            log.info("adding node {0}:{1} to cluster".format(server.ip, server.port))
            try:
                otpNode = rest.add_node(master.rest_username, master.rest_password, server.ip, server.port)
                msg = "unable to add node {0}:{1} to the cluster"
                assert otpNode, msg.format(server.ip, server.port)
            except ServerAlreadyJoinedException:
                log.info("server {0} already joined".format(server))
        log.info("beginning rebalance in")
        try:
            rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()], ejectedNodes=[])
        except Exception:
            log.error("rebalance failed, trying again after {0} seconds".format(timeout))

    @staticmethod
    def begin_rebalance_out(master, servers, timeout=5):
        """Start (but do not monitor) a rebalance that ejects the given
        servers; the master node itself is never ejected."""
        log = logger.Logger.get_logger()
        rest = RestConnection(master)

        master_node = rest.get_nodes_self()

        allNodes = []
        ejectedNodes = []
        nodes = rest.node_statuses()
        for server in servers:
            server_node = RestConnection(server).get_nodes_self()
            if server_node == master_node:
                continue
            log.info("removing node {0}:{1} from cluster".format(server_node.ip, server_node.port))
            for node in nodes:
                if "{0}:{1}".format(node.ip, node.port) == "{0}:{1}".format(server_node.ip, server_node.port):
                    ejectedNodes.append(node.id)
        log.info("beginning rebalance out")
        try:
            rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
        except Exception:
            log.error("rebalance failed, trying again after {0} seconds".format(timeout))

    @staticmethod
    def end_rebalance(master):
        """Block until the in-flight rebalance completes; assert on failure."""
        log = logger.Logger.get_logger()
        rest = RestConnection(master)
        result = False
        try:
            result = rest.monitorRebalance()
        except RebalanceFailedException as e:
            log.error("rebalance failed: {0}".format(e))
        assert result, "rebalance operation failed after adding nodes"
        log.info("rebalance finished")

    @staticmethod
    def getOtpNodeIds(master):
        """Return the otp node ids of every node in master's cluster."""
        rest = RestConnection(master)
        nodes = rest.node_statuses()
        otpNodeIds = [node.id for node in nodes]
        return otpNodeIds

    @staticmethod
    def verify_vBuckets_info(master, bucket="default"):
        '''
        verify vBuckets' state and items count(for active/replica) in them related to vBucketMap for all nodes in cluster
        '''
        awareness = VBucketAwareMemcached(RestConnection(master), bucket)
        vb_map = awareness.vBucketMap
        vb_mapReplica = awareness.vBucketMapReplica
        replica_num = len(vb_mapReplica[0])

        #get state and count items for all vbuckets for each node
        node_stats = RebalanceHelper.get_vBuckets_info(master)
        state = True
        #iterate throught all vbuckets by their numbers
        for num in vb_map:
            #verify that active vbucket in memcached is also active in stats("hash)
            if(node_stats[vb_map[num]]["vb_" + str(num)][0] != "active"):
                log.info("vBucket {0} in {1} node has wrong state {3}".format("vb_" + str(num), vb_map[num], node_stats[vb_map[num]]["vb_" + str(num)]));
                state = False
            #number of active items for num vBucket
            vb = node_stats[vb_map[num]]["vb_" + str(num)][1]
            active_vb = vb_map[num]
            #list of nodes for wich num vBucket is replica
            # BUGFIX: was vb_mapReplica[key] -- `key` was undefined (NameError)
            replica_vbs = vb_mapReplica[num]
            sum_items_replica = 0
            #sum of replica items for all nodes for num vBucket
            for i in range(replica_num):
                if(node_stats[vb_mapReplica[num][i]]["vb_" + str(num)][0] != "replica"):
                    log.info("vBucket {0} in {1} node has wrong state {3}".format("vb_" + str(num), vb_mapReplica[num], node_stats[vb_mapReplica[num]]["vb_" + str(num)]));
                    state = False
                sum_items_replica += int(node_stats[replica_vbs[i]]["vb_" + str(num)][1])
            #print information about the discrepancy of the number of replica and active items for num vBucket
            if (int(vb) * len(vb_mapReplica[num]) != sum_items_replica):
                log.info("sum of active items doesn't correspond to replica's vBucets in {0} vBucket:".format("vb_" + str(num)))
                log.info("items in active vBucket {0}:{1}".format(vb_map[num], node_stats[vb_map[num]]["vb_" + str(num)]))
                # BUGFIX: was range(replica) -- `replica` was undefined (NameError)
                for j in range(replica_num):
                    log.info("items in replica vBucket {0}: {1}".format(vb_mapReplica[num][j], node_stats[vb_mapReplica[num][j]]["vb_" + str(num)]))
                log.info(node_stats[vb_mapReplica[num][0]])
                state = False

        if not state:
            log.error("Something is wrong, see log above. See details:")
            log.error("vBucetMap: {0}".format(vb_map))
            log.error("vBucetReplicaMap: {0}".format(vb_mapReplica))
            log.error("node_stats: {0}".format(node_stats))
        return state

    @staticmethod
    def get_vBuckets_info(master):
        """
        return state and count items for all vbuckets for each node
        format: dict: {u'1node_ip1': {'vb_79': ['replica', '0'], 'vb_78': ['active', '0']..}, u'1node_ip1':....}
        """
        rest = RestConnection(master)
        port = rest.get_nodes_self().memcached
        nodes = rest.node_statuses()
        _nodes_stats = {}
        for node in nodes:
            stat = {}
            buckets = []
            _server = {"ip": node.ip, "port": node.port, "username": master.rest_username,
                       "password": master.rest_password}
            try:
                buckets = rest.get_buckets()
                mc = MemcachedClient(node.ip, port)
                stat_hash = mc.stats("hash")
            except Exception:
                if not buckets:
                    log.error("There are not any buckets in {0}:{1} node".format(node.ip, node.port))
                else:
                    log.error("Impossible to get vBucket's information for {0}:{1} node".format(node.ip, node.port))
                # BUGFIX: was a bare subscript (raised KeyError); record an
                # empty entry for the unreachable node instead
                _nodes_stats[node.ip + ":" + str(node.port)] = {}
                continue
            mc.close()
            vb_names = [key[:key.index(":")] for key in stat_hash.keys()]

            for name in vb_names:
                stat[name] = [stat_hash[name + ":state"], stat_hash[name + ":counted"]]
            _nodes_stats[node.ip + ":" + str(port)] = stat
        log.info(_nodes_stats)
        return _nodes_stats


    @staticmethod
    def pick_node(master):
        """Pick one node from master's cluster, preferring a node that is not
        the master (by ip, or by port when all nodes share one ip)."""
        rest = RestConnection(master)
        nodes = rest.node_statuses()
        node_picked = None
        nodes_on_same_ip = True

        firstIp = nodes[0].ip
        for node in nodes:
            if node.ip != firstIp:
                nodes_on_same_ip = False
                break

        for node in nodes:
            node_picked = node
            if not nodes_on_same_ip:
                if node_picked.ip != master.ip:
                    log.info(
                        "Picked node ... {0}:{1}".format(node_picked.ip, node_picked.port))
                    break
            else:
                # temp fix - port numbers of master(machine ip and localhost: 9000 match
                if int(node_picked.port) == int(
                        master.port):
                    log.info("Not picking the master node {0}:{1}.. try again...".format(node_picked.ip,
                                                                                         node_picked.port))
                else:
                    log.info(
                        "Picked node {0}:{1}".format(node_picked.ip, node_picked.port))
                    break
        return node_picked

    @staticmethod
    def pick_nodes(master, howmany=1, target_node=None):
        """Pick up to howmany non-master nodes; if target_node is given,
        return immediately with just the matching node."""
        rest = RestConnection(master)
        nodes = rest.node_statuses()
        picked = []
        for node_for_stat in nodes:
            if node_for_stat.ip != master.ip or str(node_for_stat.port) != master.port:
                if target_node == None:
                    picked.append(node_for_stat)
                elif target_node.ip == node_for_stat.ip:
                    picked.append(node_for_stat)
                    return picked
                if len(picked) == howmany:
                    break
        return picked