from xdcr.xdcrbasetests import XDCRReplicationBaseTest
from remote.remote_util import RemoteMachineShellConnection
from membase.api.rest_client import RestConnection, RestHelper
from membase.api.exception import CBRecoveryFailedException, InvalidArgumentException, BucketCreationException
from testconstants import STANDARD_BUCKET_PORT
import time


class CBRbaseclass(XDCRReplicationBaseTest):
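    """Shared helpers (failover, cbrecovery, rebalance and vbucket-map checks) used by the cbrecovery tests below."""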
    def _autofail_enable(self, _rest_):
        status = _rest_.update_autofailover_settings(True, self.wait_timeout / 2)
        if not status:
            self.log.info('failed to change autofailover_settings!')
            return
        # read settings and verify
        settings = _rest_.get_autofailover_settings()
        self.assertEquals(settings.enabled, True)

    def _autofail_disable(self, _rest_):
        status = _rest_.update_autofailover_settings(False, self.wait_timeout / 2)
        if not status:
            self.log.info('failed to change autofailover_settings!')
            return
        # read settings and verify
        settings = _rest_.get_autofailover_settings()
        self.assertEquals(settings.enabled, False)

    def wait_for_failover_or_assert(self, master, autofailover_count, timeout):
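        # Poll the cluster roughly every 30s within a fixed 300s window (the
        # 'timeout' argument is not used as the polling deadline) until the
        # expected number of nodes is reported as failed over; otherwise fail.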
        time_start = time.time()
        time_max_end = time_start + 300
        failover_count = 0
        while time.time() < time_max_end:
            failover_count = self.get_failover_count(master)
            if failover_count == autofailover_count:
                break
            self.sleep(30)

        if failover_count != autofailover_count:
            rest = RestConnection(master)
            self.log.warn("pools/default from {0} : {1}".format(master.ip, rest.cluster_status()))
            self.fail("{0} node(s) failed over, expected {1} in {2} seconds".
                      format(failover_count, autofailover_count, time.time() - time_start))
        else:
            self.log.info("{0} node(s) failed over as expected".format(failover_count))

    def get_failover_count(self, master):
        rest = RestConnection(master)
        cluster_status = rest.cluster_status()

        failover_count = 0
        # check for inactiveFailed
        for node in cluster_status['nodes']:
            self.log.info("'clusterMembership' for node {0} is {1}".format(node["otpNode"], node['clusterMembership']))
            if node['clusterMembership'] == "inactiveFailed":
                failover_count += 1

        return failover_count

    def wait_for_catchup(self, _healthy_, _compromised_, bucket):
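        # Poll curr_items for the bucket on both clusters for up to ~60s and
        # return True once the compromised cluster's item count matches the
        # healthy one.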
        start = time.time()
        _flag = False
        rest1 = RestConnection(_healthy_)
        rest2 = RestConnection(_compromised_)
        while time.time() - start < 60:
            _count1 = rest1.fetch_bucket_stats(bucket=bucket)["op"]["samples"]["curr_items"][-1]
            _count2 = rest2.fetch_bucket_stats(bucket=bucket)["op"]["samples"]["curr_items"][-1]
            if _count1 == _count2:
                self.log.info("Cbrecovery caught up bucket {0}... {1} == {2}".format(bucket, _count1, _count2))
                _flag = True
                break
            self.log.warn("Waiting for cbrecovery to catch up bucket {0}... {1} != {2}".format(bucket, _count1, _count2))
            self.sleep(self.wait_timeout)
        return _flag

    def cbr_routine(self, _healthy_, _compromised_, wait_completed=True):
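        # Start cbrecovery from the healthy cluster into the compromised one for
        # every bucket; if wait_completed is True, also verify afterwards that
        # the item counts on both clusters match.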
        tasks = []
        for bucket in self._get_cluster_buckets(_compromised_):
            tasks.append(self.cluster.async_cbrecovery(_healthy_, _compromised_, bucket_src=bucket, bucket_dest=bucket, username=_healthy_.rest_username, password=_healthy_.rest_password,
                 username_dest=_compromised_.rest_username, password_dest=_compromised_.rest_password, verbose=False, wait_completed=wait_completed))
        for task in tasks:
            task.result()

        if not wait_completed:
            return

        _check = True

        for bucket in self._get_cluster_buckets(_compromised_):
            if not self.wait_for_catchup(_healthy_, _compromised_, bucket.name):
                _check = False
        if not _check:
            raise CBRecoveryFailedException("not all items were recovered, see logs above")

    def trigger_rebalance(self, rest, rebalance_reached=100):
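        # Rebalance the whole cluster (nothing ejected) and assert that the
        # given progress percentage is reached.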
        _nodes_ = rest.node_statuses()
        rest.rebalance(otpNodes=[node.id for node in _nodes_], ejectedNodes=[])
        reached = RestHelper(rest).rebalance_reached(rebalance_reached)
        self.assertTrue(reached, "rebalance failed, got stuck or did not reach the expected progress {0}".format(rebalance_reached))

    def auto_fail_over(self, master):
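        # Simulate node failures (stop couchbase-server or firewall the node off,
        # depending on failover_reason) so that autofailover kicks in. Once the
        # number of failed nodes exceeds the replica count, autofailover would
        # refuse to act, so the remaining nodes are failed over via REST instead.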
        _count_ = 1
        rest = RestConnection(master)
        if "stop_server" in self.failover_reason:
            for node in self.failed_nodes:
                """
                Autofailover will not fail over nodes if that could result
                in data loss, so force the failover instead
                """
                if _count_ > self._num_replicas:
                    self.sleep(10)
                    for item in rest.node_statuses():
                        if node.ip == item.ip:
                            rest.fail_over(item.id)
                            break
                    _count_ += 1
                    continue
                shell = RemoteMachineShellConnection(node)
                shell.stop_couchbase()
                shell.disconnect()
                self.wait_for_failover_or_assert(master, _count_, self.wait_timeout)
                rest.reset_autofailover()
                _count_ += 1

        elif "firewall_block" in self.failover_reason:
            for node in self.failed_nodes:
                """
                Autofailover will not fail over nodes if that could result
                in data loss, so force the failover instead
                """
                if _count_ > self._num_replicas:
                    self.sleep(10)
                    for item in rest.node_statuses():
                        if node.ip == item.ip:
                            rest.fail_over(item.id)
                            break
                    _count_ += 1
                    continue
                shell = RemoteMachineShellConnection(node)
                o, r = shell.execute_command("/sbin/iptables -A INPUT -p tcp -i eth0 --dport 1000:65535 -j REJECT")
                shell.disconnect()
                self.wait_for_failover_or_assert(master, _count_, self.wait_timeout)
                rest.reset_autofailover()
                _count_ += 1

    def vbucket_map_checker(self, map_before, map_after, initial_set, final_set):
        """
        map_before: initial vbucket_map
        map_after: changed vbucket_map
        initial_set: no. of nodes in the cluster when the initial vbucket_map was taken
        final_set: no. of nodes in the cluster when the changed vbucket_map was taken
        """
        """
        pre_dict and post_dict:
        dictionaries holding the number of active vbuckets
        on each node of the cluster
        """
        pre_dict = dict((i, 0) for i in xrange(initial_set))
        post_dict = dict((i, 0) for i in xrange(final_set))

        for i in map_before:
            for j in range(initial_set):
                if i[0] == j:
                    pre_dict[j] += 1

        for i in map_after:
            for j in range(final_set):
                if i[0] == j:
                    post_dict[j] += 1

        if len(pre_dict) != len(post_dict):
            return False

        for i in pre_dict:
            for j in post_dict:
                if i == j:
                    if pre_dict[i] != post_dict[j]:
                        return False
        return True

# Assumes at least 4 nodes in every cluster
class cbrecovery(CBRbaseclass, XDCRReplicationBaseTest):
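    """
    cbrecovery test scenarios: nodes are failed over (manually or via
    autofailover) on the source or destination cluster, spare nodes are swapped
    in or the failed-over nodes are added back, cbrecovery restores the data
    from the remote cluster, and the result is verified after a rebalance.
    """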
    def setUp(self):
        super(cbrecovery, self).setUp()
        self._failover_count = self._input.param("fail_count", 0)
        self._add_count = self._input.param("add_count", 0)
        self.failover_reason = self._input.param("failover_reason", "stop_server")  # or firewall_block
        self.flag_val = self._input.param("setflag", 0)
        self.failed_nodes = []
        self._ifautofail = False
        for server in self._servers:
            rest = RestConnection(server)
            rest.reset_autofailover()
            shell = RemoteMachineShellConnection(server)
            o, r = shell.execute_command("iptables -F")
            shell.log_command_output(o, r)
            shell.disconnect()

    def tearDown(self):
        if self._ifautofail:
            if "stop_server" in self.failover_reason:
                for node in self.failed_nodes:
                    shell = RemoteMachineShellConnection(node)
                    shell.start_couchbase()
                    shell.disconnect()
            elif "firewall_block" in self.failover_reason:
                for node in self.failed_nodes:
                    shell = RemoteMachineShellConnection(node)
                    o, r = shell.execute_command("iptables -F")
                    shell.log_command_output(o, r)
                    o, r = shell.execute_command("/sbin/iptables -A INPUT -p tcp -i eth0 --dport 1000:65535 -j ACCEPT")
                    shell.log_command_output(o, r)
                    shell.disconnect()
            self.sleep(20)
        super(cbrecovery, self).tearDown()

    def common_preSetup(self):
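        # Load documents into the source cluster, run any requested update/delete
        # ops, wait for XDCR to catch up, and reset the vbucket-map tracking
        # fields used by the verification step.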
        self._load_all_buckets(self.src_master, self.gen_create, "create", 0, flag=self.flag_val)
        tasks = []
        if self._doc_ops is not None:
            if "update" in self._doc_ops:
                tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_update, "update", self._expires))
            if "delete" in self._doc_ops:
                tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_delete, "delete", 0))
        for task in tasks:
            task.result()

        self._wait_for_replication_to_catchup()
        """
        Tracking vbucket movement just on the default bucket for now
        """
        self.vbucket_map_before = []
        self.initial_node_count = 0
        self.vbucket_map_after = []
        self.final_node_count = 0

    def common_tearDown_verification(self):
        if self._failover is not None:
            # TOVERIFY: check whether the vbucket map is unchanged after a swap rebalance
            if self._default_bucket:
                if self._failover_count == self._add_count:
                    _flag_ = self.vbucket_map_checker(self.vbucket_map_before, self.vbucket_map_after, self.initial_node_count, self.final_node_count)
                    if _flag_:
                        self.log.info("vbucket_map same as earlier")
                    else:
                        self.log.info("vbucket_map differs from earlier")

        self.sleep(self.wait_timeout / 2)
        self.merge_buckets(self.src_master, self.dest_master, bidirection=False)
        self.verify_results()

    def restart_cbrecover_multiple_failover_swapout_reb_routine(self):
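        # Fail over nodes on one cluster, add spare nodes, start cbrecovery
        # without waiting for it to finish, and then interleave it with bucket
        # creation or rebalance according to the 'when_step' parameter.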
        self.common_preSetup()
        when_step = self._input.param("when_step", "recovery_when_rebalance")
        if self._failover is not None:
            if "source" in self._failover:
                rest = RestConnection(self.src_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.src_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.src_nodes):
                    raise Exception("Won't failover: count exceeds available servers on source. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")
                self.log.info("Failing over {0} nodes on source ...".format(self._failover_count))
                self.failed_nodes = self.src_nodes[(len(self.src_nodes) - self._failover_count):len(self.src_nodes)]
                self.cluster.failover(self.src_nodes, self.failed_nodes)
                for node in self.failed_nodes:
                    self.src_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.src_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE WITHOUT WAITING FOR COMPLETION
                self.cbr_routine(self.dest_master, self.src_master, False)

                if "create_bucket_when_recovery" in when_step:
                    name = 'standard_bucket'
                    try:
                        standard_params = self._create_bucket_params(server=self.src_master, size=100, replicas=1)
                        self._create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT + 10,
                                                     bucket_params=standard_params)
                    except BucketCreationException, e:
                        self.log.info("bucket creation failed during cbrecovery as expected")
                    # but we are still able to create a bucket on the destination
                    standard_params = self._create_bucket_params(server=self.dest_master, size=100, replicas=1)
                    self.cluster.create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT + 10,
                                                        bucket_params=standard_params)
                    # re-call cbrecovery (it seems to be supported even while it is still running);
                    # if recovery is fast (already completed) we may get "No recovery needed"
                    self.cbr_routine(self.dest_master, self.src_master)
                elif "recovery_when_rebalance" in when_step:
                    rest.remove_all_recoveries()
                    self.trigger_rebalance(rest, 15)
                    try:
                        self.cbr_routine(self.dest_master, self.src_master)
                        self.log.exception("cbrecovery should have failed while rebalance is in progress")
                    except CBRecoveryFailedException, e:
                        self.log.info("cbrecovery failed as expected when there are no failed-over nodes")
                    reached = RestHelper(rest).rebalance_reached()
                    self.assertTrue(reached, "rebalance failed or did not complete")
                    if self._replication_direction_str == "unidirection":
                        self.log.warn("we expect data loss on the source cluster with unidirectional replication")
                        self.log.warn("data verification will be skipped")
                        return
                elif "recovery_when_rebalance_stopped" in when_step:
                    rest.remove_all_recoveries()
                    self.trigger_rebalance(rest, 15)
                    rest.stop_rebalance()
                    try:
                        self.cbr_routine(self.dest_master, self.src_master)
                        self.log.exception("cbrecovery should have failed after rebalance was stopped")
                    except CBRecoveryFailedException, e:
                        self.log.info("cbrecovery failed as expected when there are no failed-over nodes")
                elif "rebalance_when_recovering" in when_step:
                    # try to call rebalance during cbrecovery
                    try:
                        self.trigger_rebalance(rest)
                        self.log.exception("rebalance should not be permitted during cbrecovery")
                    except InvalidArgumentException, e:
                        self.log.info("can't call rebalance during cbrecovery as expected")
                    # re-call cbrecovery (it seems to be supported even while it is still running)
                    self.cbr_routine(self.dest_master, self.src_master)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.src_nodes)

            elif "destination" in self._failover:
                rest = RestConnection(self.dest_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.dest_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.dest_nodes):
                    raise Exception("Won't failover: count exceeds available servers on sink. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")
                self.log.info("Failing over {0} nodes on destination ...".format(self._failover_count))
                self.failed_nodes = self.dest_nodes[(len(self.dest_nodes) - self._failover_count):len(self.dest_nodes)]
                self.cluster.failover(self.dest_nodes, self.failed_nodes)
                for node in self.failed_nodes:
                    self.dest_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.dest_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE WITHOUT WAITING FOR COMPLETION
                self.cbr_routine(self.src_master, self.dest_master, False)

                if "create_bucket_when_recovery" in when_step:
                    name = 'standard_bucket'
                    try:
                        standard_params = self._create_bucket_params(server=self.dest_master, size=100, replicas=1)
                        self.cluster.create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT + 10,
                                                            bucket_params=standard_params)
                    except BucketCreationException, e:
                        self.log.info("bucket creation failed during cbrecovery as expected")
                    standard_params = self._create_bucket_params(server=self.src_master, size=100, replicas=1)
                    self.cluster.create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT + 10,
                                                        bucket_params=standard_params)
                    self.cbr_routine(self.src_master, self.dest_master)
                elif "recovery_when_rebalance" in when_step:
                    rest.remove_all_recoveries()
                    self.trigger_rebalance(rest, 15)
                    try:
                        self.cbr_routine(self.src_master, self.dest_master)
                        self.log.exception("cbrecovery should have failed while rebalance is in progress")
                    except CBRecoveryFailedException, e:
                        self.log.info("cbrecovery failed as expected when there are no failed-over nodes")
                    reached = RestHelper(rest).rebalance_reached()
                    self.assertTrue(reached, "rebalance failed or did not complete")
                elif "recovery_when_rebalance_stopped" in when_step:
                    rest.remove_all_recoveries()
                    self.trigger_rebalance(rest, 15)
                    rest.stop_rebalance()
                    try:
                        self.cbr_routine(self.src_master, self.dest_master)
                        self.log.exception("cbrecovery should have failed after rebalance was stopped")
                    except CBRecoveryFailedException, e:
                        self.log.info("cbrecovery failed as expected when there are no failed-over nodes")
                elif "rebalance_when_recovering" in when_step:
                    # try to call rebalance during cbrecovery
                    try:
                        self.trigger_rebalance(rest)
                        self.log.exception("rebalance should not be permitted during cbrecovery")
                    except InvalidArgumentException, e:
                        self.log.info("can't call rebalance during cbrecovery as expected")
                    # re-call cbrecovery (it seems to be supported even while it is still running)
                    self.cbr_routine(self.src_master, self.dest_master)

                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.dest_nodes)

        self.trigger_rebalance(rest)
        self.common_tearDown_verification()

    def cbrecover_multiple_failover_swapout_reb_routine(self):
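        # Fail over nodes on the source or destination cluster, swap in spare
        # nodes, run cbrecovery from the other cluster and rebalance, then
        # verify the data.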
        self.common_preSetup()
        if self._failover is not None:
            if "source" in self._failover:
                rest = RestConnection(self.src_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.src_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.src_nodes):
                    raise Exception("Won't failover: count exceeds available servers on source. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")
                self.log.info("Failing over {0} nodes on source ...".format(self._failover_count))
                self.failed_nodes = self.src_nodes[(len(self.src_nodes) - self._failover_count):len(self.src_nodes)]
                self.cluster.failover(self.src_nodes, self.failed_nodes)
                for node in self.failed_nodes:
                    self.src_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.src_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.dest_master, self.src_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.src_nodes)

            elif "destination" in self._failover:
                rest = RestConnection(self.dest_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.dest_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.dest_nodes):
                    raise Exception("Won't failover: count exceeds available servers on sink. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")
                self.log.info("Failing over {0} nodes on destination ...".format(self._failover_count))
                self.failed_nodes = self.dest_nodes[(len(self.dest_nodes) - self._failover_count):len(self.dest_nodes)]
                self.cluster.failover(self.dest_nodes, self.failed_nodes)
                for node in self.failed_nodes:
                    self.dest_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.dest_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.src_master, self.dest_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.dest_nodes)

        self.common_tearDown_verification()

    def cbrecover_multiple_autofailover_swapout_reb_routine(self):
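        # Same as the swap-out routine above, but nodes are failed by stopping
        # couchbase-server or firewalling them off so that autofailover (or a
        # forced failover once replicas are exhausted) removes them.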
        self.common_preSetup()
        if self._failover is not None:
            self._ifautofail = True
            if "source" in self._failover:
                rest = RestConnection(self.src_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.src_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.src_nodes):
                    raise Exception("Won't failover: count exceeds available servers on source. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")

                self._autofail_enable(rest)
                self.log.info("Triggering {0} over {1} nodes on source ...".format(self.failover_reason, self._failover_count))
                self.failed_nodes = self.src_nodes[(len(self.src_nodes) - self._failover_count):len(self.src_nodes)]
                self.auto_fail_over(self.src_master)
                for node in self.failed_nodes:
                    self.src_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.src_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.dest_master, self.src_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.src_nodes)

                self._autofail_disable(rest)

            elif "destination" in self._failover:
                rest = RestConnection(self.dest_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.dest_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.dest_nodes):
                    raise Exception("Won't failover: count exceeds available servers on sink. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")

                self._autofail_enable(rest)
                self.log.info("Triggering {0} over {1} nodes on destination ...".format(self.failover_reason, self._failover_count))
                self.failed_nodes = self.dest_nodes[(len(self.dest_nodes) - self._failover_count):len(self.dest_nodes)]
                self.auto_fail_over(self.dest_master)
                for node in self.failed_nodes:
                    self.dest_nodes.remove(node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.dest_nodes.extend(add_nodes)
                self.sleep(self.wait_timeout / 4)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.src_master, self.dest_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.dest_nodes)

                self._autofail_disable(rest)

        self.common_tearDown_verification()

    def cbrecover_multiple_failover_addback_routine(self):
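        # Fail over nodes on the source or destination cluster, add the same
        # nodes back plus any spare nodes, run cbrecovery from the other
        # cluster and rebalance, then verify the data.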
        self.common_preSetup()
        if self._failover is not None:
            if "source" in self._failover:
                rest = RestConnection(self.src_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.src_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.src_nodes):
                    raise Exception("Won't failover: count exceeds available servers on source. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")
                self.log.info("Failing over {0} nodes on source ...".format(self._failover_count))
                self.failed_nodes = self.src_nodes[(len(self.src_nodes) - self._failover_count):len(self.src_nodes)]
                self.cluster.failover(self.src_nodes, self.failed_nodes)
                self.sleep(self.wait_timeout / 4)
                self.log.info("Adding back the {0} nodes that were failed over ...".format(self._failover_count))
                for node in self.failed_nodes:
                    self.adding_back_a_node(self.src_master, node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                self.sleep(self.wait_timeout / 4)
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.src_nodes.extend(add_nodes)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.dest_master, self.src_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.src_nodes)

            elif "destination" in self._failover:
                rest = RestConnection(self.dest_master)
                if self._default_bucket:
                    self.initial_node_count = len(self.dest_nodes)
                    self.vbucket_map_before = rest.fetch_vbucket_map()  # JUST FOR DEFAULT BUCKET AS OF NOW
                if self._failover_count >= len(self.dest_nodes):
                    raise Exception("Won't failover: count exceeds available servers on sink. SKIPPING TEST")
                if len(self._floating_servers_set) < self._add_count:
                    raise Exception("Not enough spare nodes available to match the failover count. SKIPPING TEST")

                self.log.info("Failing over {0} nodes on destination ...".format(self._failover_count))
                self.failed_nodes = self.dest_nodes[(len(self.dest_nodes) - self._failover_count):len(self.dest_nodes)]
                self.cluster.failover(self.dest_nodes, self.failed_nodes)
                self.sleep(self.wait_timeout / 4)
                self.log.info("Adding back the {0} nodes that were failed over ...".format(self._failover_count))
                for node in self.failed_nodes:
                    self.adding_back_a_node(self.dest_master, node)
                add_nodes = self._floating_servers_set[0:self._add_count]
                self.sleep(self.wait_timeout / 4)
                for node in add_nodes:
                    rest.add_node(user=node.rest_username, password=node.rest_password, remoteIp=node.ip, port=node.port)
                self.dest_nodes.extend(add_nodes)
                # CALL THE CBRECOVERY ROUTINE
                self.cbr_routine(self.src_master, self.dest_master)

                self.trigger_rebalance(rest)
                if self._default_bucket:
                    self.vbucket_map_after = rest.fetch_vbucket_map()
                    self.final_node_count = len(self.dest_nodes)

        self.common_tearDown_verification()