import time
import datetime
import unittest
from threading import Thread

from TestInput import TestInputSingleton
import logger
from couchbase_helper.cluster import Cluster
from membase.api.rest_client import RestConnection, RestHelper
from membase.api.exception import RebalanceFailedException
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper
from membase.helper.rebalance_helper import RebalanceHelper
from memcached.helper.data_helper import LoadWithMcsoda, MemcachedClientHelper
from remote.remote_util import RemoteMachineShellConnection
from basetestcase import BaseTestCase


class SwapRebalanceBase(unittest.TestCase):
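    """Shared helpers for the swap rebalance tests.

    All methods are @staticmethod and take the running test instance as
    `self`, so the concrete TestCase classes below reuse the setup, load
    and verification logic by delegation rather than inheritance.
    """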

    @staticmethod
    def common_setup(self):
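        """Shared setUp: read the test parameters from TestInputSingleton,
        stop any rebalance left over from a previous run, reset the cluster
        and initialize the master node's memory quota."""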
        self.log = logger.Logger.get_logger()
        self.cluster_run = False
        self.input = TestInputSingleton.input
        self.servers = self.input.servers
        serverInfo = self.servers[0]
        rest = RestConnection(serverInfo)
        if len(set([server.ip for server in self.servers])) == 1:
            ip = rest.get_nodes_self().ip
            for server in self.servers:
                server.ip = ip
            self.cluster_run = True
        self.case_number = self.input.param("case_number", 0)
        self.replica = self.input.param("replica", 1)
        self.keys_count = self.input.param("keys-count", 1000)
        self.load_ratio = self.input.param("load-ratio", 1)
        self.ratio_expiry = self.input.param("ratio-expiry", 0.03)
        self.ratio_deletes = self.input.param("ratio-deletes", 0.13)
        self.num_buckets = self.input.param("num-buckets", 1)
        self.failover_factor = self.num_swap = self.input.param("num-swap", 1)
        self.num_initial_servers = self.input.param("num-initial-servers", 3)
        self.fail_orchestrator = self.swap_orchestrator = self.input.param("swap-orchestrator", False)
        self.do_access = self.input.param("do-access", True)
        self.load_started = False
        self.loaders = []
        try:
            # Clear state left over from a previous failed run
            if rest._rebalance_progress_status() == 'running':
                self.log.warning("rebalancing is still running, previous test should be verified")
                stopped = rest.stop_rebalance()
                self.assertTrue(stopped, msg="unable to stop rebalance")
            self.log.info("==============  SwapRebalanceBase setup was started for test #{0} {1} =============="\
                      .format(self.case_number, self._testMethodName))
            SwapRebalanceBase.reset(self)
            self.cluster_helper = Cluster()

            # Make sure the test is set up correctly
            min_servers = int(self.num_initial_servers) + int(self.num_swap)
            msg = "minimum {0} nodes required for running swap rebalance"
            self.assertTrue(len(self.servers) >= min_servers, msg=msg.format(min_servers))

            self.log.info('picking server : {0} as the master'.format(serverInfo))
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.servers)
            info = rest.get_nodes_self()
            rest.init_cluster(username=serverInfo.rest_username, password=serverInfo.rest_password)
            rest.init_cluster_memoryQuota(memoryQuota=int(info.mcdMemoryReserved * node_ram_ratio))
            if self.num_buckets > 10:
                BaseTestCase.change_max_buckets(self, self.num_buckets)
            self.log.info("==============  SwapRebalanceBase setup was finished for test #{0} {1} =============="
                      .format(self.case_number, self._testMethodName))
            SwapRebalanceBase._log_start(self)
        except Exception as e:
            self.cluster_helper.shutdown()
            self.fail(e)

    @staticmethod
    def common_tearDown(self):
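        """Shared tearDown: skip cleanup when skip_cleanup is set, or when
        the test failed and stop-on-failure is set; otherwise reset the
        cluster and log the test finish marker."""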
        self.cluster_helper.shutdown()
        test_failed = (hasattr(self, '_resultForDoCleanups') and
                       len(self._resultForDoCleanups.failures or self._resultForDoCleanups.errors)) or \
                      (hasattr(self, '_exc_info') and self._exc_info()[1] is not None)
        if (test_failed and TestInputSingleton.input.param("stop-on-failure", False)) or \
                self.input.param("skip_cleanup", False):
            self.log.warn("CLEANUP WAS SKIPPED")
        else:
            SwapRebalanceBase.reset(self)
            SwapRebalanceBase._log_finish(self)

    @staticmethod
    def reset(self):
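        """Return the cluster to a clean state: stop the loaders and any
        running rebalance, delete all buckets and wait for ns_server on
        every node."""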
        self.log.info("==============  SwapRebalanceBase cleanup was started for test #{0} {1} =============="\
                          .format(self.case_number, self._testMethodName))
        self.log.info("Stopping load in teardown")
        SwapRebalanceBase.stop_load(self.loaders)
        for server in self.servers:
            rest = RestConnection(server)
            if rest._rebalance_progress_status() == 'running':
                self.log.warning("rebalancing is still running, test should be verified")
                stopped = rest.stop_rebalance()
                self.assertTrue(stopped, msg="unable to stop rebalance")
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        for server in self.servers:
            ClusterOperationHelper.cleanup_cluster([server])
            if server.data_path:
                rest = RestConnection(server)
                rest.set_data_path(data_path=server.data_path)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
        self.log.info("==============  SwapRebalanceBase cleanup was finished for test #{0} {1} =============="\
                          .format(self.case_number, self._testMethodName))

    @staticmethod
    def _log_start(self):
        # Best effort: record the test start in the cluster's UI logs
        try:
            msg = "{0} : {1} started ".format(datetime.datetime.now(), self._testMethodName)
            RestConnection(self.servers[0]).log_client_error(msg)
        except Exception:
            pass

    @staticmethod
    def _log_finish(self):
        # Best effort: record the test finish in the cluster's UI logs
        try:
            msg = "{0} : {1} finished ".format(datetime.datetime.now(), self._testMethodName)
            RestConnection(self.servers[0]).log_client_error(msg)
        except Exception:
            pass

    @staticmethod
    def sleep(self, timeout=1, message=""):
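        """Log a message, then sleep for `timeout` seconds."""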
        self.log.info("sleep for {0} secs. {1} ...".format(timeout, message))
        time.sleep(timeout)

    @staticmethod
    def _create_default_bucket(self, replica=1):
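        """Create the "default" bucket sized to the available RAM quota,
        unless it already exists, and wait for memcached to be ready."""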
        name = "default"
        master = self.servers[0]
        rest = RestConnection(master)
        helper = RestHelper(rest)
        if not helper.bucket_exists(name):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio
            rest.create_bucket(bucket=name, ramQuotaMB=int(available_ram), replicaNumber=replica)
            ready = BucketOperationHelper.wait_for_memcached(master, name)
            self.assertTrue(ready, msg="wait_for_memcached failed")
        self.assertTrue(helper.bucket_exists(name),
            msg="unable to create {0} bucket".format(name))

    @staticmethod
    def _create_multiple_buckets(self, replica=1):
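        """Create self.num_buckets buckets and wait for memcached on each."""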
        master = self.servers[0]
        created = BucketOperationHelper.create_multiple_buckets(master, replica, howmany=self.num_buckets)
        self.assertTrue(created, "unable to create multiple buckets")

        rest = RestConnection(master)
        buckets = rest.get_buckets()
        for bucket in buckets:
            ready = BucketOperationHelper.wait_for_memcached(master, bucket.name)
            self.assertTrue(ready, msg="wait_for_memcached failed")

    # Verify item counts, active vs. replica
    @staticmethod
    def items_verification(test, master):
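        """Wait until the active and replica item counts match for every
        bucket, failing the test after the timeout."""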
        rest = RestConnection(master)
        # Verify item counts across all nodes
        timeout = 600
        for bucket in rest.get_buckets():
            verified = RebalanceHelper.wait_till_total_numbers_match(master, bucket.name, timeout_in_seconds=timeout)
            test.assertTrue(verified, "Lost items! Failing test after {0} secs".format(timeout))

    @staticmethod
    def start_load_phase(self, master):
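        """Start one daemon mcsoda loader thread per bucket that only
        creates items (exit-after-creates), and return the loaders."""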
        loaders = []
        rest = RestConnection(master)
        for bucket in rest.get_buckets():
            loader = dict()
            loader["mcsoda"] = LoadWithMcsoda(master, self.keys_count, bucket=bucket.name,
                                rest_password=master.rest_password, prefix=str(bucket.name), port=8091)
            loader["mcsoda"].cfg["exit-after-creates"] = 1
            loader["mcsoda"].cfg["json"] = 0
            loader["thread"] = Thread(target=loader["mcsoda"].load_data, name='mcloader_' + bucket.name)
            loader["thread"].daemon = True
            loaders.append(loader)
        for loader in loaders:
            loader["thread"].start()
        return loaders

    @staticmethod
    def start_access_phase(self, master):
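        """Start one daemon mcsoda loader thread per bucket with a mixed
        workload (sets, creates, deletes, expirations) over half of the
        key count, and return the loaders."""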
        loaders = []
        rest = RestConnection(master)
        for bucket in rest.get_buckets():
            loader = dict()
            loader["mcsoda"] = LoadWithMcsoda(master, self.keys_count // 2, bucket=bucket.name,
                    rest_password=master.rest_password, prefix=str(bucket.name), port=8091)
            loader["mcsoda"].cfg["ratio-sets"] = 0.8
            loader["mcsoda"].cfg["ratio-hot"] = 0.2
            loader["mcsoda"].cfg["ratio-creates"] = 0.5
            loader["mcsoda"].cfg["ratio-deletes"] = self.ratio_deletes
            loader["mcsoda"].cfg["ratio-expirations"] = self.ratio_expiry
            loader["mcsoda"].cfg["json"] = 0
            loader["thread"] = Thread(target=loader["mcsoda"].load_data, name='mcloader_' + bucket.name)
            loader["thread"].daemon = True
            loaders.append(loader)
        for loader in loaders:
            loader["thread"].start()
        return loaders

    @staticmethod
    def stop_load(loaders, do_stop=True):
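        """Join all loader threads. When do_stop is True, ask mcsoda to stop
        first and bound each join with a 300 second timeout; otherwise wait
        for the loaders to finish on their own."""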
        if do_stop:
            for loader in loaders:
                loader["mcsoda"].load_stop()
        for loader in loaders:
            if do_stop:
                loader["thread"].join(300)
            else:
                loader["thread"].join()

    @staticmethod
    def create_buckets(self):
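        """Create either the default bucket or multiple buckets, depending
        on the num-buckets test parameter."""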
        if self.num_buckets == 1:
            SwapRebalanceBase._create_default_bucket(self, replica=self.replica)
        else:
            SwapRebalanceBase._create_multiple_buckets(self, replica=self.replica)

    @staticmethod
    def verification_phase(test, master):
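        """Stop the loaders, wait for intra-cluster replication to settle,
        then verify item counts on all nodes still in the cluster."""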
        # Stop loaders
        SwapRebalanceBase.stop_load(test.loaders)
        test.log.info("DONE DATA ACCESS PHASE")

        test.log.info("VERIFICATION PHASE")
        rest = RestConnection(master)
        servers_in_cluster = []
        nodes = rest.get_nodes()
        for server in test.servers:
            for node in nodes:
                if node.ip == server.ip and node.port == server.port:
                    servers_in_cluster.append(server)
        RebalanceHelper.wait_for_replication(servers_in_cluster, test.cluster_helper)
        SwapRebalanceBase.items_verification(test, master)

    @staticmethod
    def _common_test_body_swap_rebalance(self, do_stop_start=False):
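        """Rebalance in the initial nodes, load data, then swap num-swap
        nodes (optionally including the orchestrator) in a single rebalance.
        When do_stop_start is True, the rebalance is stopped and restarted
        at roughly 20%, 40% and 60% progress.
        """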
        master = self.servers[0]
        rest = RestConnection(master)
        num_initial_servers = self.num_initial_servers
        creds = self.input.membase_settings
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Initial rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.num_swap)
        optNodesIds = [node.id for node in toBeEjectedNodes]

        if self.swap_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}".\
                format(status, content))
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content

        for node in optNodesIds:
            self.log.info("removing node {0} and rebalance afterwards".format(node))

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.num_swap]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.swap_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        if self.do_access:
            self.log.info("DATA ACCESS PHASE")
            self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        self.log.info("SWAP REBALANCE PHASE")
        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
            ejectedNodes=optNodesIds)

        if do_stop_start:
            # Rebalance is stopped at 20%, 40% and 60% completion
            retry = 0
            for expected_progress in (20, 40, 60):
                self.log.info("STOP/START SWAP REBALANCE PHASE WITH PROGRESS {0}%".
                              format(expected_progress))
                while True:
                    progress = rest._rebalance_progress()
                    if progress < 0:
                        self.log.error("rebalance progress code : {0}".format(progress))
                        break
                    elif progress == 100:
                        self.log.warn("Rebalance has already reached 100%")
                        break
                    elif progress >= expected_progress:
                        self.log.info("Rebalance will be stopped at {0}%".format(progress))
                        stopped = rest.stop_rebalance()
                        self.assertTrue(stopped, msg="unable to stop rebalance")
                        SwapRebalanceBase.sleep(self, 20)
                        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
                                       ejectedNodes=optNodesIds)
                        break
                    elif retry > 100:
                        break
                    else:
                        retry += 1
                        SwapRebalanceBase.sleep(self, 1)
        self.assertTrue(rest.monitorRebalance(),
            msg="rebalance operation failed while ejecting nodes {0}".format(optNodesIds))
        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _common_test_body_failed_swap_rebalance(self):
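        """Start a swap rebalance, kill memcached on the master once the
        rebalance reaches percentage_progress so that the rebalance fails,
        then retry the rebalance and verify the data.
        """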
        master = self.servers[0]
        rest = RestConnection(master)
        num_initial_servers = self.num_initial_servers
        creds = self.input.membase_settings
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Initial rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.num_swap)
        optNodesIds = [node.id for node in toBeEjectedNodes]
        if self.swap_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}".\
                format(status, content))
            # When swapping all the nodes
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content

        for node in optNodesIds:
            self.log.info("removing node {0} and rebalance afterwards".format(node))

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.num_swap]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.swap_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        self.log.info("SWAP REBALANCE PHASE")
        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
            ejectedNodes=optNodesIds)
        SwapRebalanceBase.sleep(self, 10, "Rebalance should start")
        self.log.info("FAIL SWAP REBALANCE PHASE @ {0}".format(self.percentage_progress))
        reached = RestHelper(rest).rebalance_reached(self.percentage_progress)
        if reached and RestHelper(rest).is_cluster_rebalanced():
            # handle the case where the rebalance failed at the very beginning
            self.log.error('seems rebalance failed!')
            rest.print_UI_logs()
            self.fail("rebalance failed even before killing memcached")
        bucket = rest.get_buckets()[0].name
        pid = None
        if self.swap_orchestrator and not self.cluster_run:
            # get the PID via a remote connection if the master is a new node
            shell = RemoteMachineShellConnection(master)
            o, _ = shell.execute_command("ps -eo comm,pid | awk '$1 == \"memcached\" { print $2 }'")
            pid = o[0]
            shell.disconnect()
        else:
            times = 2
            if self.cluster_run:
                times = 20
            for i in xrange(times):
                try:
                    _mc = MemcachedClientHelper.direct_client(master, bucket)
                    pid = _mc.stats()["pid"]
                    break
                except EOFError as e:
                    self.log.error("{0}. Retry in 2 sec".format(e))
                    SwapRebalanceBase.sleep(self, 2)
        if pid is None:
            self.fail("unable to get the memcached PID")
        command = "os:cmd(\"kill -9 {0} \")".format(pid)
        self.log.info(command)
        killed = rest.diag_eval(command)
        self.log.info("killed memcached on {0}:{1}?  {2}".format(master.ip, master.port, killed))
        SwapRebalanceBase.sleep(self, 10, "wait after killing memcached")
        # we can't get stats for the new node when the rebalance fails
        if not self.swap_orchestrator:
            ClusterOperationHelper._wait_warmup_completed(self, [master], bucket, wait_time=600)
        # we expect the rebalance to fail
        try:
            rest.monitorRebalance()
        except RebalanceFailedException:
            # retry the rebalance if it failed
            self.log.warn("Rebalance failed, but this is expected")
            SwapRebalanceBase.sleep(self, 30)
            self.assertFalse(RestHelper(rest).is_cluster_rebalanced(), msg="cluster needs rebalance")
            knownNodes = rest.node_statuses()
            self.log.info("nodes are still in cluster: {0}".format([(node.ip, node.port) for node in knownNodes]))
            ejectedNodes = list(set(optNodesIds) & set([node.id for node in knownNodes]))
            rest.rebalance(otpNodes=[node.id for node in knownNodes], ejectedNodes=ejectedNodes)
            self.assertTrue(rest.monitorRebalance(),
                            msg="retried rebalance failed while ejecting nodes {0}".format(toBeEjectedNodes))
        else:
            self.log.info("rebalance completed successfully")
        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _add_back_failed_node(self, do_node_cleanup=False):
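        """Fail over and rebalance out the selected nodes, then add the
        same nodes back, rebalance again and verify the data."""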
        master = self.servers[0]
        rest = RestConnection(master)
        creds = self.input.membase_settings

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster all servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(self.servers, len(self.servers) - 1)
        self.assertTrue(status, msg="Initial rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the swap rebalance
        current_nodes = RebalanceHelper.getOtpNodeIds(master)
        self.log.info("current nodes : {0}".format(current_nodes))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.failover_factor)
        optNodesIds = [node.id for node in toBeEjectedNodes]

        # List of servers that will not be failed over
        not_failed_over = []
        for server in self.servers:
            if self.cluster_run:
                if server.port not in [node.port for node in toBeEjectedNodes]:
                    not_failed_over.append(server)
                    self.log.info("Node {0}:{1} not failed over".format(server.ip, server.port))
            else:
                if server.ip not in [node.ip for node in toBeEjectedNodes]:
                    not_failed_over.append(server)
                    self.log.info("Node {0}:{1} not failed over".format(server.ip, server.port))

        if self.fail_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}".\
                format(status, content))
            # When failing over all the nodes
            if self.num_swap == len(current_nodes):
                optNodesIds.append(content)
            else:
                optNodesIds[0] = content
            master = not_failed_over[-1]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        # Failover the selected nodes
        for node in optNodesIds:
            self.log.info("failover node {0} and rebalance afterwards".format(node))
            rest.fail_over(node)

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
            ejectedNodes=optNodesIds)

        self.assertTrue(rest.monitorRebalance(),
            msg="rebalance operation failed while ejecting nodes {0}".format(optNodesIds))

        # Add back the same failed-over nodes

        # Cleanup the node, somehow
        # TODO: cluster_run?
        if do_node_cleanup:
            pass

        # Make a REST connection with a node that is part of the cluster
        rest = RestConnection(master)

        # Given the optNode, find the ip
        add_back_servers = []
        nodes = rest.get_nodes()
        for server in nodes:
            if isinstance(server.ip, unicode):
                add_back_servers.append(server)
        final_add_back_servers = []
        for server in self.servers:
            if self.cluster_run:
                if server.port not in [serv.port for serv in add_back_servers]:
                    final_add_back_servers.append(server)
            else:
                if server.ip not in [serv.ip for serv in add_back_servers]:
                    final_add_back_servers.append(server)
        for server in final_add_back_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()], ejectedNodes=[])

        self.assertTrue(rest.monitorRebalance(),
            msg="rebalance operation failed after adding back nodes {0}".format(add_back_servers))

        SwapRebalanceBase.verification_phase(self, master)

    @staticmethod
    def _failover_swap_rebalance(self):
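        """Fail over failover-factor nodes and swap in the same number of
        new nodes in a single rebalance, then verify the data."""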
        master = self.servers[0]
        rest = RestConnection(master)
        creds = self.input.membase_settings
        num_initial_servers = self.num_initial_servers
        initial_servers = self.servers[:num_initial_servers]

        self.log.info("CREATE BUCKET PHASE")
        SwapRebalanceBase.create_buckets(self)

        # Cluster the starting set of servers
        self.log.info("INITIAL REBALANCE PHASE")
        status, servers_rebalanced = RebalanceHelper.rebalance_in(initial_servers, len(initial_servers) - 1)
        self.assertTrue(status, msg="Initial rebalance failed")

        self.log.info("DATA LOAD PHASE")
        self.loaders = SwapRebalanceBase.start_load_phase(self, master)

        # Wait till the load phase is over
        SwapRebalanceBase.stop_load(self.loaders, do_stop=False)
        self.log.info("DONE LOAD PHASE")

        # Start the failover plus swap rebalance
        self.log.info("current nodes : {0}".format(RebalanceHelper.getOtpNodeIds(master)))
        toBeEjectedNodes = RebalanceHelper.pick_nodes(master, howmany=self.failover_factor)
        optNodesIds = [node.id for node in toBeEjectedNodes]
        if self.fail_orchestrator:
            status, content = ClusterOperationHelper.find_orchestrator(master)
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}".\
                format(status, content))
            optNodesIds[0] = content

        self.log.info("FAILOVER PHASE")
        # Failover the selected nodes
        for node in optNodesIds:
            self.log.info("failover node {0} and rebalance afterwards".format(node))
            rest.fail_over(node)

        new_swap_servers = self.servers[num_initial_servers:num_initial_servers + self.failover_factor]
        for server in new_swap_servers:
            otpNode = rest.add_node(creds.rest_username, creds.rest_password, server.ip, server.port)
            msg = "unable to add node {0} to the cluster"
            self.assertTrue(otpNode, msg.format(server.ip))

        if self.fail_orchestrator:
            rest = RestConnection(new_swap_servers[0])
            master = new_swap_servers[0]

        self.log.info("DATA ACCESS PHASE")
        self.loaders = SwapRebalanceBase.start_access_phase(self, master)

        rest.rebalance(otpNodes=[node.id for node in rest.node_statuses()],
            ejectedNodes=optNodesIds)

        self.assertTrue(rest.monitorRebalance(),
            msg="rebalance operation failed after adding nodes {0}".format(new_swap_servers))

        SwapRebalanceBase.verification_phase(self, master)


class SwapRebalanceBasicTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def do_test(self):
        SwapRebalanceBase._common_test_body_swap_rebalance(self, do_stop_start=False)


class SwapRebalanceStartStopTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def do_test(self):
        SwapRebalanceBase._common_test_body_swap_rebalance(self, do_stop_start=True)


class SwapRebalanceFailedTests(unittest.TestCase):

    def setUp(self):
        SwapRebalanceBase.common_setup(self)

    def tearDown(self):
        SwapRebalanceBase.common_tearDown(self)

    def test_failed_swap_rebalance(self):
        self.percentage_progress = self.input.param("percentage_progress", 50)
        SwapRebalanceBase._common_test_body_failed_swap_rebalance(self)

    # Not cluster_run friendly, yet
    def test_add_back_failed_node(self):
        SwapRebalanceBase._add_back_failed_node(self, do_node_cleanup=False)

    def test_failover_swap_rebalance(self):
        SwapRebalanceBase._failover_swap_rebalance(self)