import time
import datetime
import unittest
from TestInput import TestInputSingleton
import logger
from couchbase_helper.cluster import Cluster
from membase.api.rest_client import RestConnection, RestHelper
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper
from membase.helper.rebalance_helper import RebalanceHelper
from memcached.helper.data_helper import LoadWithMcsoda
from threading import Thread
from remote.remote_util import RemoteMachineShellConnection
from memcached.helper.data_helper import MemcachedClientHelper
from membase.api.exception import RebalanceFailedException
from basetestcase import BaseTestCase

class SwapRebalanceBase(unittest.TestCase):
    """Static helpers shared by the swap rebalance test classes below."""

    @staticmethod
    def common_setup(self):
        self.log = logger.Logger.get_logger()
        self.cluster_run = False
        self.input = TestInputSingleton.input
        self.servers = self.input.servers
        serverInfo = self.servers[0]
        rest = RestConnection(serverInfo)
        if len(set([server.ip for server in self.servers])) == 1:
            ip = rest.get_nodes_self().ip
            for server in self.servers:
                server.ip = ip
            self.cluster_run = True
        self.case_number = self.input.param("case_number", 0)
        self.replica = self.input.param("replica", 1)
        self.keys_count = self.input.param("keys-count", 1000)
        self.load_ratio = self.input.param("load-ratio", 1)
        self.ratio_expiry = self.input.param("ratio-expiry", 0.03)
        self.ratio_deletes = self.input.param("ratio-deletes", 0.13)
        self.num_buckets = self.input.param("num-buckets", 1)
        self.failover_factor = self.num_swap = self.input.param("num-swap", 1)
        self.num_initial_servers = self.input.param("num-initial-servers", 3)
        self.fail_orchestrator = self.swap_orchestrator = self.input.param("swap-orchestrator", False)
        self.do_access = self.input.param("do-access", True)
        self.load_started = False
        self.loaders = []
        try:
            # Clear the state from a previous invalid run
            if rest._rebalance_progress_status() == 'running':
                self.log.warning("rebalancing is still running, previous test should be verified")
                stopped = rest.stop_rebalance()
                self.assertTrue(stopped, msg="unable to stop rebalance")
            self.log.info("============== SwapRebalanceBase setup was started for test #{0} {1} =============="
                          .format(self.case_number, self._testMethodName))
            SwapRebalanceBase.reset(self)
            self.cluster_helper = Cluster()

            # Make sure the test is set up correctly
            min_servers = int(self.num_initial_servers) + int(self.num_swap)
            msg = "minimum {0} nodes required for running swap rebalance"
            self.assertTrue(len(self.servers) >= min_servers, msg=msg.format(min_servers))

            self.log.info('picking server : {0} as the master'.format(serverInfo))
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.servers)
            info = rest.get_nodes_self()
            rest.init_cluster(username=serverInfo.rest_username, password=serverInfo.rest_password)
            rest.init_cluster_memoryQuota(memoryQuota=int(info.mcdMemoryReserved * node_ram_ratio))
            if self.num_buckets > 10:
                BaseTestCase.change_max_buckets(self, self.num_buckets)
            self.log.info("============== SwapRebalanceBase setup was finished for test #{0} {1} =============="
                          .format(self.case_number, self._testMethodName))
            SwapRebalanceBase._log_start(self)
        except Exception as e:
            self.cluster_helper.shutdown()
            self.fail(e)

    @staticmethod
    def common_tearDown(self):
        self.cluster_helper.shutdown()
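        # Determine whether the test failed, checking both the newer
        # _resultForDoCleanups and the older _exc_info() unittest attributes,
        # so cleanup can be skipped when stop-on-failure is requested.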
        test_failed = (hasattr(self, '_resultForDoCleanups') and len(self._resultForDoCleanups.failures or self._resultForDoCleanups.errors)) \
                      or (hasattr(self, '_exc_info') and self._exc_info()[1] is not None)
        if (test_failed and TestInputSingleton.input.param("stop-on-failure", False)) \
                or self.input.param("skip_cleanup", False):
            self.log.warn("CLEANUP WAS SKIPPED")
        else:
            SwapRebalanceBase.reset(self)
            SwapRebalanceBase._log_finish(self)

    @staticmethod
    def reset(self):
        self.log.info("============== SwapRebalanceBase cleanup was started for test #{0} {1} =============="
                      .format(self.case_number, self._testMethodName))
        self.log.info("Stopping load in Teardown")
        SwapRebalanceBase.stop_load(self.loaders)
        for server in self.servers:
            rest = RestConnection(server)
            if rest._rebalance_progress_status() == 'running':
                self.log.warning("rebalancing is still running, test should be verified")
                stopped = rest.stop_rebalance()
                self.assertTrue(stopped, msg="unable to stop rebalance")
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        for server in self.servers:
            ClusterOperationHelper.cleanup_cluster([server])
            if server.data_path:
                rest = RestConnection(server)
                rest.set_data_path(data_path=server.data_path)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
        self.log.info("============== SwapRebalanceBase cleanup was finished for test #{0} {1} =============="
                      .format(self.case_number, self._testMethodName))

    @staticmethod
    def _log_start(self):
        try:
            msg = "{0} : {1} started ".format(datetime.datetime.now(), self._testMethodName)
            RestConnection(self.servers[0]).log_client_error(msg)
        except:
            pass

    @staticmethod
    def _log_finish(self):
        try:
            msg = "{0} : {1} finished ".format(datetime.datetime.now(), self._testMethodName)
            RestConnection(self.servers[0]).log_client_error(msg)
        except:
            pass

    @staticmethod
    def sleep(self, timeout=1, message=""):
        self.log.info("sleep for {0} secs. {1} ...".format(timeout, message))
        time.sleep(timeout)

    @staticmethod
    def _create_default_bucket(self, replica=1):
        name = "default"
        master = self.servers[0]
        rest = RestConnection(master)
        helper = RestHelper(RestConnection(master))
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram), replicaNumber=replica)
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            self.assertTrue(ready, msg="wait_for_memcached failed")
        self.assertTrue(helper.bucket_exists(name),
                        msg="unable to create {0} bucket".format(name))

    @staticmethod
    def _create_multiple_buckets(self, replica=1):
        master = self.servers[0]
        created = BucketOperationHelper.create_multiple_buckets(master, replica, howmany=self.num_buckets)
        self.assertTrue(created, "unable to create multiple buckets")

        rest = RestConnection(master)
        buckets = rest.get_buckets()
        for bucket in buckets:
            ready = BucketOperationHelper.wait_for_memcached(master, bucket.name)
            self.assertTrue(ready, msg="wait_for_memcached failed")

    # Used for items verification: active vs. replica
    @staticmethod
    def items_verification(test, master):
        rest = RestConnection(master)
        # Verify items count across all nodes
        timeout = 600
        for bucket in rest.get_buckets():
            verified = RebalanceHelper.wait_till_total_numbers_match(master, bucket.name, timeout_in_seconds=timeout)
            test.assertTrue(verified, "Lost items!!.. failing test in {0} secs".format(timeout))

    @staticmethod
    def start_load_phase(self, master):
        loaders = []
        rest = RestConnection(master)
        for bucket in rest.get_buckets():
            loader = dict()
            loader["mcsoda"] = LoadWithMcsoda(master, self.keys_count, bucket=bucket.name,
                                              rest_password=master.rest_password, prefix=str(bucket.name), port=8091)
            loader["mcsoda"].cfg["exit-after-creates"] = 1
            loader["mcsoda"].cfg["json"] = 0
            loader["thread"] = Thread(target=loader["mcsoda"].load_data, name='mcloader_' + bucket.name)
            loader["thread"].daemon = True
            loaders.append(loader)
        for loader in loaders:
            loader["thread"].start()
        return loaders

    @staticmethod
    def start_access_phase(self, master):
        loaders = []
        rest = RestConnection(master)
        for bucket in rest.get_buckets():
            loader = dict()
            loader["mcsoda"] = LoadWithMcsoda(master, self.keys_count / 2, bucket=bucket.name,
                                              rest_password=master.rest_password, prefix=str(bucket.name), port=8091)
            loader["mcsoda"].cfg["ratio-sets"] = 0.8
            loader["mcsoda"].cfg["ratio-hot"] = 0.2
            loader["mcsoda"].cfg["ratio-creates"] = 0.5
            loader["mcsoda"].cfg["ratio-deletes"] = self.ratio_deletes
            loader["mcsoda"].cfg["ratio-expirations"] = self.ratio_expiry
            loader["mcsoda"].cfg["json"] = 0
            loader["thread"] = Thread(target=loader["mcsoda"].load_data, name='mcloader_' + bucket.name)
            loader["thread"].daemon = True
            loaders.append(loader)
        for loader in loaders:
            loader["thread"].start()
        return loaders

    @staticmethod
    def stop_load(loaders, do_stop=True):
        if do_stop:
            for loader in loaders:
                loader["mcsoda"].load_stop()
        for loader in loaders:
            if do_stop:
                loader["thread"].join(300)
            else:
                loader["thread"].join()

    @staticmethod
    def create_buckets(self):
        if self.num_buckets == 1:
            SwapRebalanceBase._create_default_bucket(self, replica=self.replica)
        else:
            SwapRebalanceBase._create_multiple_buckets(self, replica=self.replica)

    @staticmethod
    def verification_phase(test, master):
        # Stop loaders
        SwapRebalanceBase.stop_load(test.loaders)
        test.log.info("DONE DATA ACCESS PHASE")

        test.log.info("VERIFICATION PHASE")
        rest = RestConnection(master)
        servers_in_cluster = []
        nodes = rest.get_nodes()
        for server in test.servers:
            for node in nodes:
                if node.ip == server.ip and node.port == server.port:
                    servers_in_cluster.append(server)
        RebalanceHelper.wait_for_replication(servers_in_cluster, test.cluster_helper)
        SwapRebalanceBase.items_verification(test, master)

    @staticmethod
    def _common_test_body_swap_rebalance(self, do_stop_start=False):
        master = self.servers[0]
        rest = RestConnection(master)
        num_initial_servers = self.num_initial_servers
        creds = self.input.membase_settings
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.num_swap)
        optNodesIds = [node.id for node in toBeEjectedNodes]

        if self.swap_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}"
                            .format(status, content))
            # When swapping all the nodes
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content

        for node in optNodesIds:
            self.log.info("removing node {0} and rebalance afterwards".format(node))

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.num_swap]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.swap_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        if self.do_access:
            self.log.info("DATA ACCESS PHASE")
            self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        self.log.info("SWAP REBALANCE PHASE")
        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                       ejectedNodes=optNodesIds)

        if do_stop_start:
            # Rebalance is stopped at 20%, 40% and 60% completion
            retry = 0
            for expected_progress in (20, 40, 60):
                self.log.info("STOP/START SWAP REBALANCE PHASE WITH PROGRESS {0}%"
                              .format(expected_progress))
                while True:
                    progress = rest._rebalance_progress()
                    if progress < 0:
                        self.log.error("rebalance progress code : {0}".format(progress))
                        break
                    elif progress == 100:
                        self.log.warn("Rebalance has already reached 100%")
                        break
                    elif progress >= expected_progress:
                        self.log.info("Rebalance will be stopped with {0}%".format(progress))
                        stopped = rest.stop_rebalance()
                        self.assertTrue(stopped, msg="unable to stop rebalance")
                        SwapRebalanceBase.sleep(self, 20)
                        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                                       ejectedNodes=optNodesIds)
                        break
                    elif retry > 100:
                        break
                    else:
                        retry += 1
                        SwapRebalanceBase.sleep(self, 1)
        self.assertTrue(rest.monitorRebalance(),
                        msg="rebalance operation failed after adding node {0}".format(optNodesIds))
        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _common_test_body_failed_swap_rebalance(self):
        master = self.servers[0]
        rest = RestConnection(master)
        num_initial_servers = self.num_initial_servers
        creds = self.input.membase_settings
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.num_swap)
        optNodesIds = [node.id for node in toBeEjectedNodes]
        if self.swap_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}"
                            .format(status, content))
            # When swapping all the nodes
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content

        for node in optNodesIds:
            self.log.info("removing node {0} and rebalance afterwards".format(node))

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.num_swap]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.swap_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        self.log.info("SWAP REBALANCE PHASE")
        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                       ejectedNodes=optNodesIds)
        SwapRebalanceBase.sleep(self, 10, "Rebalance should start")
        self.log.info("FAIL SWAP REBALANCE PHASE @ {0}".format(self.percentage_progress))
        reached = RestHelper(rest).rebalance_reached(self.percentage_progress)
        if reached and RestHelper(rest).is_cluster_rebalanced():
            # Handle the situation when the rebalance failed at the very beginning
            self.log.error('seems rebalance failed!')
            rest.print_UI_logs()
            self.fail("rebalance failed even before killing memcached")
        bucket = rest.get_buckets()[0].name
        pid = None
        if self.swap_orchestrator and not self.cluster_run:
            # Get the PID via a remote connection if the master is a new node
            shell = RemoteMachineShellConnection(master)
            o, _ = shell.execute_command("ps -eo comm,pid | awk '$1 == \"memcached\" { print $2 }'")
            pid = o[0]
            shell.disconnect()
        else:
            times = 2
            if self.cluster_run:
                times = 20
            for i in xrange(times):
                try:
                    _mc = MemcachedClientHelper.direct_client(master, bucket)
                    pid = _mc.stats()["pid"]
                    break
                except EOFError as e:
                    self.log.error("{0}. Retry in 2 sec".format(e))
                    SwapRebalanceBase.sleep(self, 2)
        if pid is None:
            self.fail("unable to get the memcached PID")
        command = "os:cmd(\"kill -9 {0} \")".format(pid)
        self.log.info(command)
        killed = rest.diag_eval(command)
        self.log.info("killed {0}:{1}?? {2} ".format(master.ip, master.port, killed))
        self.log.info("sleep for 10 sec after kill memcached")
        SwapRebalanceBase.sleep(self, 10)
        # We can't get stats for the new node when the rebalance fails
        if not self.swap_orchestrator:
            ClusterOperationHelper._wait_warmup_completed(self, [master], bucket, wait_time=600)
        # We expect the rebalance to fail
        try:
            rest.monitorRebalance()
        except RebalanceFailedException:
            # Retry the rebalance if it failed
            self.log.warn("Rebalance failed but it's expected")
            SwapRebalanceBase.sleep(self, 30)
            self.assertFalse(RestHelper(rest).is_cluster_rebalanced(), msg="cluster needs rebalance")
            knownNodes = rest.node_statuses()
            self.log.info("nodes are still in cluster: {0}".format([(node.ip, node.port) for node in knownNodes]))
            ejectedNodes = list(set(optNodesIds) & set([node.id for node in knownNodes]))
            rest.rebalance(otpNodes=[node.id for node in knownNodes], ejectedNodes=ejectedNodes)
            self.assertTrue(rest.monitorRebalance(),
                            msg="rebalance operation failed after adding node {0}".format(toBeEjectedNodes))
        else:
            self.log.info("rebalance completed successfully")
        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _add_back_failed_node(self, do_node_cleanup=False):
        master = self.servers[0]
        rest = RestConnection(master)
        creds = self.input.membase_settings

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster all servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(self.servers, len(self.servers) - 1)
        self.assertTrue(status, msg="Rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.failover_factor)
        optNodesIds = [node.id for node in toBeEjectedNodes]

        # List of servers that will not be failed over
        not_failed_over = []
        for server in self.servers:
            if self.cluster_run:
                if server.port not in [node.port for node in toBeEjectedNodes]:
                    not_failed_over.append(server)
                    self.log.info("Node {0}:{1} not failed over".format(server.ip, server.port))
            else:
                if server.ip not in [node.ip for node in toBeEjectedNodes]:
                    not_failed_over.append(server)
                    self.log.info("Node {0}:{1} not failed over".format(server.ip, server.port))

        if self.fail_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}"
                            .format(status, content))
            # When swapping all the nodes
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content
            master = not_failed_over[-1]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        # Failover the selected nodes
        for node in optNodesIds:
            self.log.info("failover node {0} and rebalance afterwards".format(node))
            rest.fail_over(node)

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                       ejectedNodes=optNodesIds)

        self.assertTrue(rest.monitorRebalance(),
                        msg="rebalance operation failed after adding node {0}".format(optNodesIds))

        # Add back the same failed over nodes

        # Cleanup the node, somehow
        # TODO: cluster_run?
        if do_node_cleanup:
            pass

        # Make a REST connection with a node that is part of the cluster
        rest = RestConnection(master)

        # Given the optNode, find its ip
        add_back_servers = []
        nodes = rest.get_nodes()
        for server in nodes:
            if isinstance(server.ip, unicode):
                add_back_servers.append(server)
        final_add_back_servers = []
        for server in self.servers:
            if self.cluster_run:
                if server.port not in [serv.port for serv in add_back_servers]:
                    final_add_back_servers.append(server)
            else:
                if server.ip not in [serv.ip for serv in add_back_servers]:
                    final_add_back_servers.append(server)
        for server in final_add_back_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()], ejectedNodes=[])

        self.assertTrue(rest.monitorRebalance(),
                        msg="rebalance operation failed after adding node {0}".format(add_back_servers))

        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _failover_swap_rebalance(self):
        master = self.servers[0]
        rest = RestConnection(master)
        creds = self.input.membase_settings
        num_initial_servers = self.num_initial_servers
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        self.log.info("current nodes : {0}".format(RebalanceHelper.getOtpNodeIds(master)))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.failover_factor)
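        # Collect the OTP node ids of the nodes picked for failover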
        optNodesIds = [node.id for node in toBeEjectedNodes]
        if self.fail_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}"
                            .format(status, content))
            optNodesIds[0] = content

        self.log.info("FAILOVER PHASE")
        # Failover the selected nodes
        for node in optNodesIds:
            self.log.info("failover node {0} and rebalance afterwards".format(node))
            rest.fail_over(node)

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.failover_factor]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.fail_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                       ejectedNodes=optNodesIds)

        self.assertTrue(rest.monitorRebalance(),
                        msg="rebalance operation failed after adding node {0}".format(new_swap_servers))

        SwapRebalanceBase.verification_phase(self, master)

class SwapRebalanceBasicTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def do_test(self):
        SwapRebalanceBase._common_test_body_swap_rebalance(self, do_stop_start=False)

class SwapRebalanceStartStopTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def do_test(self):
        SwapRebalanceBase._common_test_body_swap_rebalance(self, do_stop_start=True)

class SwapRebalanceFailedTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def test_failed_swap_rebalance(self):
        self.percentage_progress = self.input.param("percentage_progress", 50)
        SwapRebalanceBase._common_test_body_failed_swap_rebalance(self)

    # Not cluster_run friendly, yet
    def test_add_back_failed_node(self):
        SwapRebalanceBase._add_back_failed_node(self, do_node_cleanup=False)

    def test_failover_swap_rebalance(self):
        SwapRebalanceBase._failover_swap_rebalance(self)