1from TestInput import TestInputSingleton
2import logger
3import time
4import unittest
5from membase.api.rest_client import RestConnection
6from membase.helper.bucket_helper import BucketOperationHelper
7from membase.helper.cluster_helper import ClusterOperationHelper
8from membase.helper.rebalance_helper import RebalanceHelper
9from memcached.helper.data_helper import MemcachedClientHelper
10from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
11
12
13
# Module-level logger shared by the static helpers and test methods below.
log = logger.Logger.get_logger()
15
class AutoFailoverBaseTest(unittest.TestCase):
    """Shared cluster setup/teardown and failover-polling helpers.

    Every helper is a ``@staticmethod`` that receives the running test case so
    it can log through it and fail it.
    """
    # start from 1..n
    # then from no failover x node and rebalance and
    # verify we did not lose items

    # maximum time we allow ns_server to take to detect a failed node
    # timeout as workaround for MB-7863
    MAX_FAIL_DETECT_TIME = 120

    @staticmethod
    def common_setup(input, testcase):
        """Reset all servers to a clean, bucket-less, un-clustered state.

        :param input: test input object; only ``input.servers`` is used.
        :param testcase: running test case; its ``case_number`` and
            ``_testMethodName`` are logged and it is passed to the helpers
            that may fail the test.
        """
        log.info("==============  common_setup was started for test #{0} {1}==============".format(
            testcase.case_number, testcase._testMethodName))
        servers = input.servers
        RemoteUtilHelper.common_basic_setup(servers)
        BucketOperationHelper.delete_all_buckets_or_assert(servers, testcase)
        ClusterOperationHelper.cleanup_cluster(servers)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(servers, testcase)
        log.info("==============  common_setup was finished for test #{0} {1} ==============".format(
            testcase.case_number, testcase._testMethodName))

    @staticmethod
    def common_tearDown(servers, testcase):
        """Re-run the basic remote setup, wait for ns_server, then drop all buckets.

        The wait for ns_server allows up to ``MAX_FAIL_DETECT_TIME * 10``
        seconds (including bucket warmup) because the test may have left nodes
        firewalled, stopped or paused.

        :param servers: servers to restore; ``servers[0]`` is used for REST.
        :param testcase: running test case, passed to the asserting helpers.
        """
        log.info("==============  common_tearDown was started for test #{0} {1} ==============".format(
            testcase.case_number, testcase._testMethodName))
        RemoteUtilHelper.common_basic_setup(servers)

        log.info("10 seconds delay to wait for couchbase-server to start")
        time.sleep(10)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(
            servers, testcase,
            wait_time=AutoFailoverBaseTest.MAX_FAIL_DETECT_TIME * 10, wait_if_warmup=True)
        try:
            rest = RestConnection(servers[0])
            for bucket in rest.get_buckets():
                MemcachedClientHelper.flush_bucket(servers[0], bucket.name)
        except Exception as ex:
            # Flushing is best-effort (all buckets are deleted right below),
            # but record the reason instead of silently hiding it.
            log.warning("failed to flush buckets before teardown: {0}".format(ex))
        BucketOperationHelper.delete_all_buckets_or_assert(servers, testcase)
        ClusterOperationHelper.cleanup_cluster(servers)
        log.info("==============  common_tearDown was finished for test #{0} {1} ==============".format(
            testcase.case_number, testcase._testMethodName))

    @staticmethod
    def wait_for_failover_or_assert(master, autofailover_count, timeout, testcase):
        """Poll until exactly ``autofailover_count`` nodes are failed over.

        :param master: node whose REST API is polled every 2 seconds.
        :param autofailover_count: expected number of 'inactiveFailed' nodes.
        :param timeout: seconds to keep polling; callers pass the configured
            auto-failover timeout plus ``MAX_FAIL_DETECT_TIME``.
        :param testcase: test case failed (after dumping UI logs and cluster
            status) if the count is not reached in time.
        """
        time_start = time.time()
        time_max_end = time_start + timeout
        failover_count = 0
        while time.time() < time_max_end:
            failover_count = AutoFailoverBaseTest.get_failover_count(master)
            if failover_count == autofailover_count:
                testcase.log.info("{0} nodes failed over as expected".format(failover_count))
                # timeout - MAX_FAIL_DETECT_TIME is the configured failover timeout
                testcase.log.info("expected failover in {0} seconds, actual time {1} seconds".format(
                    timeout - AutoFailoverBaseTest.MAX_FAIL_DETECT_TIME, time.time() - time_start))
                return
            time.sleep(2)

        rest = RestConnection(master)
        rest.print_UI_logs()
        testcase.log.warn("pools/default from {0} : {1}".format(master.ip, rest.cluster_status()))
        testcase.fail("{0} nodes failed over, expected {1} in {2} seconds".format(
            failover_count, autofailover_count, time.time() - time_start))

    @staticmethod
    def wait_for_no_failover_or_assert(master, autofailover_count, timeout, testcase):
        """Assert that the failover count does NOT reach ``autofailover_count``.

        Polls ``master`` every 2 seconds for up to ``timeout`` seconds and
        fails ``testcase`` as soon as the count equals ``autofailover_count``.
        """
        time_start = time.time()
        time_max_end = time_start + timeout
        failover_count = 0

        while time.time() < time_max_end:
            failover_count = AutoFailoverBaseTest.get_failover_count(master)
            if failover_count == autofailover_count:
                break
            time.sleep(2)

        # Measure once so the failure message and the log report the same duration.
        elapsed = time.time() - time_start
        testcase.assertFalse(failover_count == autofailover_count,
                             "{0} nodes failed over, didn't expect {1} in {2} seconds".format(
                                 failover_count, autofailover_count, elapsed))
        log.info("{0} nodes failed over as expected in {1} seconds".format(failover_count, elapsed))

    @staticmethod
    def get_failover_count(master):
        """Return how many nodes ``master`` reports with membership 'inactiveFailed'."""
        rest = RestConnection(master)
        cluster_status = rest.cluster_status()

        failover_count = 0
        # check for inactiveFailed
        for node in cluster_status['nodes']:
            log.info("'clusterMembership' for node {0} is {1}".format(node["otpNode"], node['clusterMembership']))
            if node['clusterMembership'] == "inactiveFailed":
                failover_count += 1

        return failover_count
110
111
class AutoFailoverTests(unittest.TestCase):
    """Functional tests for ns_server auto-failover.

    Tests configure auto-failover over REST, make non-master nodes unavailable
    (firewall, stopped couchbase service, or paused beam/memcached processes)
    and check how many nodes the master reports as failed over.
    """

    def setUp(self):
        self.input = TestInputSingleton.input
        self.case_number = self.input.param("case_number", 0)
        self.servers = self.input.servers
        self.log = logger.Logger().get_logger()
        self.master = self.servers[0]
        self.rest = RestConnection(self.master)
        # Base auto-failover timeout in seconds; the "30s" tests use half of it.
        self.timeout = 60
        AutoFailoverBaseTest.common_setup(self.input, self)
        self._cluster_setup()

    def tearDown(self):
        AutoFailoverBaseTest.common_tearDown(self.servers, self)

    def sleep(self, timeout=1, message=""):
        """Log the reason and block for ``timeout`` seconds."""
        self.log.info("sleep for {0} secs. {1} ...".format(timeout, message))
        time.sleep(timeout)

    def _update_autofailover_or_fail(self, enabled, timeout):
        """Apply auto-failover settings over REST, failing the test if rejected."""
        if not self.rest.update_autofailover_settings(enabled, timeout):
            self.fail('failed to change autofailover_settings! See MB-7282')

    def _wait_for_failover(self, count, timeout):
        """Assert ``count`` failed-over nodes within ``timeout`` + detection slack."""
        AutoFailoverBaseTest.wait_for_failover_or_assert(
            self.master, count, timeout + AutoFailoverBaseTest.MAX_FAIL_DETECT_TIME, self)

    def _expect_single_failover(self, timeout, break_node):
        """Enable auto-failover, break ``servers[1]`` via ``break_node`` and expect one failover."""
        server_fail = self.servers[1]
        self._update_autofailover_or_fail(True, timeout)
        self.sleep(5)
        break_node(server_fail)
        self._wait_for_failover(1, timeout)

    def _is_windows(self, server):
        """Return True when ``server`` reports a Windows distribution type."""
        shell = RemoteMachineShellConnection(server)
        try:
            os_type = shell.extract_remote_info().distribution_type
        finally:
            shell.disconnect()
        return os_type.lower() == 'windows'

    def test_enable(self):
        self._update_autofailover_or_fail(True, self.timeout // 2)
        # read settings and verify
        settings = self.rest.get_autofailover_settings()
        self.assertEqual(settings.enabled, True)

    def test_disable(self):
        self._update_autofailover_or_fail(False, self.timeout)
        # read settings and verify
        settings = self.rest.get_autofailover_settings()
        self.assertEqual(settings.enabled, False)

    def test_valid_timeouts(self):
        for timeout in [30, 31, 300, 3600]:
            self._update_autofailover_or_fail(True, timeout)
            # read settings and verify
            settings = self.rest.get_autofailover_settings()
            self.assertEqual(settings.timeout, timeout)

    def test_30s_timeout_firewall(self):
        self._expect_single_failover(self.timeout // 2, RemoteUtilHelper.enable_firewall)

    def test_60s_timeout_firewall(self):
        self._expect_single_failover(self.timeout, RemoteUtilHelper.enable_firewall)

    def test_30s_timeout_stop(self):
        # Use half the base timeout, like the other "30s" tests (was 60s).
        self._expect_single_failover(self.timeout // 2, self._stop_couchbase)

    def test_60s_timeout_stop(self):
        self._expect_single_failover(self.timeout, self._stop_couchbase)

    def test_reset_count(self):
        timeout = self.timeout // 2
        server_fail1 = self.servers[1]
        server_fail2 = self.servers[2]
        self._update_autofailover_or_fail(True, timeout)
        self.sleep(5)
        self.log.info("stopping the first server")
        self._stop_couchbase(server_fail1)
        self._wait_for_failover(1, timeout)

        self.log.info("resetting the autofailover count")
        if not self.rest.reset_autofailover():
            self.fail('failed to reset autofailover count!')

        self.log.info("stopping the second server")
        self._stop_couchbase(server_fail2)
        # The first node stays 'inactiveFailed', so a successful second
        # auto-failover (enabled by the reset) brings the count to 2.
        self._wait_for_failover(2, timeout)

        self.log.info("resetting the autofailover count")
        if not self.rest.reset_autofailover():
            self.fail('failed to reset autofailover count!')

    def test_30s_timeout_pause(self):
        if self._is_windows(self.servers[1]):
            self.log.info("test will be skipped because the signals SIGSTOP and SIGCONT do not exist for Windows")
            return
        self._expect_single_failover(self.timeout // 2, self._pause_couchbase)

    def test_60s_timeout_pause(self):
        if self._is_windows(self.servers[1]):
            self.log.info("test will be skipped because the signals SIGSTOP and SIGCONT do not exist for Windows")
            return
        self._expect_single_failover(self.timeout, self._pause_couchbase)

    def test_invalid_timeouts(self):
        for timeout in [-360, -60, 0, 15, 29, 300000]:
            if self.rest.update_autofailover_settings(True, timeout):
                self.fail('autofailover_settings have been changed incorrectly!')
            # read settings and verify
            settings = self.rest.get_autofailover_settings()
            self.assertTrue(settings.timeout >= 30)

    def test_two_failed_nodes(self):
        timeout = self.timeout // 2
        server_fail1 = self.servers[1]
        server_fail2 = self.servers[2]
        self._update_autofailover_or_fail(True, timeout)
        self.sleep(5)
        self.log.info("stopping the first server")
        self._stop_couchbase(server_fail1)
        self._wait_for_failover(1, timeout)

        self.log.info("stopping the second server")
        self._stop_couchbase(server_fail2)
        # Without a reset, a second automatic failover must not happen.
        AutoFailoverBaseTest.wait_for_no_failover_or_assert(
            self.master, 2, timeout + AutoFailoverBaseTest.MAX_FAIL_DETECT_TIME, self)

    def _stop_couchbase(self, server):
        """Stop the couchbase service on ``server`` through a remote shell."""
        shell = RemoteMachineShellConnection(server)
        try:
            shell.stop_couchbase()
        finally:
            shell.disconnect()
        log.info("stopped couchbase server on {0}".format(server))

    # the signals SIGSTOP and SIGCONT do not exist for Windows
    def _pause_couchbase(self, server):
        """Pause both the beam and memcached processes on ``server``."""
        self._pause_beam(server)
        self._pause_memcached(server)

    # the signals SIGSTOP and SIGCONT do not exist for Windows
    def _pause_memcached(self, server):
        """Pause the memcached process on ``server`` through a remote shell."""
        shell = RemoteMachineShellConnection(server)
        try:
            shell.pause_memcached()
        finally:
            shell.disconnect()
        log.info("paused memcached on {0}".format(server))

    # the signals SIGSTOP and SIGCONT do not exist for Windows
    def _pause_beam(self, server):
        """Pause the beam (ns_server) process on ``server`` through a remote shell."""
        shell = RemoteMachineShellConnection(server)
        try:
            shell.pause_beam()
        finally:
            shell.disconnect()
        log.info("paused beam on {0}".format(server))

    def load_data(self, master, bucket, keys_count):
        """Load items into ``bucket`` through ``master`` and wait for persistence.

        Loading is repeated until at least ``keys_count`` keys are reported
        inserted, giving up after 5 consecutive rounds that insert nothing.

        :return: total number of keys reported as inserted.
        """
        max_empty_rounds = 5
        inserted_keys_cnt = 0
        empty_rounds = 0
        while inserted_keys_cnt < keys_count and empty_rounds < max_empty_rounds:
            keys_cnt, _rejected_keys_cnt = MemcachedClientHelper.load_bucket(
                servers=[master],
                name=bucket,
                number_of_items=keys_count,
                number_of_threads=5,
                write_only=True)
            inserted_keys_cnt += keys_cnt
            empty_rounds = empty_rounds + 1 if keys_cnt == 0 else 0
        if empty_rounds == max_empty_rounds:
            # log.exception is only meaningful inside an except block
            log.error("impossible to load data")
        log.info("wait until data is completely persisted on the disk")
        RebalanceHelper.wait_for_persistence(master, bucket)
        return inserted_keys_cnt

    def _cluster_setup(self):
        """Initialise the cluster, create the bucket(s) and pre-load data.

        Test params: ``replicas`` (default 1), ``keys-count`` (default 0) and
        ``num-buckets`` (default 1).
        """
        replicas = self.input.param("replicas", 1)
        keys_count = self.input.param("keys-count", 0)
        num_buckets = self.input.param("num-buckets", 1)

        bucket_name = "default"
        rest = RestConnection(self.master)
        info = rest.get_nodes_self()
        rest.init_cluster(username=self.master.rest_username,
                          password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=info.mcdMemoryReserved)
        rest.reset_autofailover()
        ClusterOperationHelper.add_and_rebalance(self.servers, True)

        if num_buckets == 1:
            # two thirds of the memory quota read from get_nodes_self() above;
            # integer division keeps the quota an int on Python 3 as well
            bucket_ram = info.memoryQuota * 2 // 3
            rest.create_bucket(bucket=bucket_name,
                               ramQuotaMB=bucket_ram,
                               replicaNumber=replicas,
                               proxyPort=info.moxi)
        else:
            created = BucketOperationHelper.create_multiple_buckets(self.master, replicas, howmany=num_buckets)
            self.assertTrue(created, "unable to create multiple buckets")

        buckets = rest.get_buckets()
        for bucket in buckets:
            ready = BucketOperationHelper.wait_for_memcached(self.master, bucket.name)
            self.assertTrue(ready, msg="wait_for_memcached failed")

        for bucket in buckets:
            inserted_keys_cnt = self.load_data(self.master, bucket.name, keys_count)
            log.info('inserted {0} keys'.format(inserted_keys_cnt))