1import json
2import time
3import unittest
4import testconstants
5from TestInput import TestInputSingleton
6
7from rackzone.rackzone_base import RackzoneBaseTest
8from memcached.helper.data_helper import  MemcachedClientHelper
9from membase.api.rest_client import RestConnection, Bucket
10from membase.helper.rebalance_helper import RebalanceHelper
11from membase.api.exception import RebalanceFailedException
12from couchbase_helper.documentgenerator import BlobGenerator
13from remote.remote_util import RemoteMachineShellConnection
14from membase.helper.cluster_helper import ClusterOperationHelper
15from scripts.install import InstallerJob
16from testconstants import COUCHBASE_VERSION_2
17from testconstants import COUCHBASE_VERSION_3
18
19
20
class RackzoneTests(RackzoneBaseTest):
    """Tests for Couchbase rack/zone awareness (server groups).

    Covers zone creation with assorted names, deletion of the default zone,
    replica placement across zones with whole-zone failover, and zone
    availability while a community cluster is swap-upgraded to enterprise.
    """

    # name of the server group every node starts in
    _DEFAULT_ZONE = "Group 1"

    def setUp(self):
        super(RackzoneTests, self).setUp()
        self.command = self.input.param("command", "")
        # number of zones ("Group 1" .. "Group <zone>") the replica test uses
        self.zone = self.input.param("zone", 1)
        self.replica = self.input.param("replica", 1)
        self.command_options = self.input.param("command_options", '')
        self.set_get_ratio = self.input.param("set_get_ratio", 0.9)
        self.item_size = self.input.param("item_size", 128)
        # number of zones (starting at "Group 2") whose nodes get killed and failed over
        self.shutdown_zone = self.input.param("shutdown_zone", 1)
        self.do_verify = self.input.param("do-verify", True)
        self.num_node = self.input.param("num_node", 4)
        self.timeout = 6000

    def tearDown(self):
        super(RackzoneTests, self).tearDown()

    def test_check_default_zone_create_by_default(self):
        self._verify_zone(self._DEFAULT_ZONE)

    def test_create_second_default_zone(self):
        """Creating another zone named like the default zone must not succeed."""
        zone_name = self._DEFAULT_ZONE
        rest = RestConnection(self.servers[0])
        self.log.info("create additional default zone")
        try:
            created = rest.add_zone(zone_name)
        except Exception as e:
            self.log.info("duplicate default zone rejected as expected: {0}".format(e))
            return
        # add_zone returns a truthy value when the zone was created
        if created:
            self.fail("zone '{0}' could be created a second time".format(zone_name))

    def test_create_zone_with_upper_case_name(self):
        zone_name = "ALLWITHUPTERCASE"
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_create_zone_with_lower_case_name(self):
        zone_name = "allwithlowercaseeeeeee"
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_create_zone_with_all_number_name(self):
        zone_name = "3223345557666760"
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_create_zone_with_upper_lower_number_name(self):
        zone_name = "AAABBBCCCaakkkkmmm345672"
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_create_zone_with_upper_lower_number_and_space_name(self):
        zone_name = " AAAB BBCCC aakkk kmmm3 456 72 "
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_create_zone_with_none_ascii_name(self):
        # zone name is limited to 64 bytes; this name is exactly 64 ASCII
        # characters, including the punctuation characters '_-.%'
        zone_name = "abcdGHIJKLMNOPQRSTUVWXYZ0123456789efghijklmnopqrstuvwyABCDEF_-.%"
        self._create_zone(zone_name)
        self._verify_zone(zone_name)

    def test_delete_empty_defautl_zone(self):
        """Move servers[0] out of the default zone, delete it, then restore the name.

        NOTE(review): assumes servers[0] is the only node in "Group 1" so the
        default zone is empty after the move — confirm against the test conf.
        """
        zone_name = "test1"
        default_zone = self._DEFAULT_ZONE
        serverInfo = self.servers[0]
        moved_node = [serverInfo.ip]
        rest = RestConnection(serverInfo)
        self.log.info("create zone {0}".format(zone_name))
        rest.add_zone(zone_name)
        if not rest.is_zone_exist(zone_name):
            self.fail("Failed to create zone {0}".format(zone_name))
        self.log.info("Move node {0} from zone {1} to zone {2}"
                      .format(moved_node, default_zone, zone_name))
        if not rest.shuffle_nodes_in_zones(moved_node, default_zone, zone_name):
            self.fail("Failed to move node {0} from zone {1} to zone {2}"
                      .format(moved_node, default_zone, zone_name))
        try:
            rest.delete_zone(default_zone)
            if rest.is_zone_exist(default_zone):
                self.fail("Failed to delete default zone")
            self.log.info("successful delete default zone")
        finally:
            # Give the node's zone the default name back so later tests still
            # find "Group 1"; skipped when the default zone was not deleted.
            if not rest.is_zone_exist(default_zone):
                rest.rename_zone(zone_name, default_zone)

    def test_replica_distribution_in_zone(self):
        """Spread nodes evenly over zones, check replica placement, then fail over zones.

        Test params:
            -p items=100000,zone=2,shutdown_zone=1,replicas=1

        All nodes of "Group 2" .. "Group <shutdown_zone + 1>" are killed and
        failed over, the cluster is rebalanced, and the item count of every
        bucket is compared with the number of loaded items.
        """
        zone_count = int(self.zone)
        shutdown_count = int(self.shutdown_zone)
        if len(self.servers) < int(self.num_node):
            msg = ("This test needs minimum {1} servers to run.\n  Currently in ini file "
                   "has only {0} servers").format(len(self.servers), self.num_node)
            self.log.error("{0}".format(msg))
            raise Exception(msg)
        if shutdown_count >= zone_count:
            raise Exception("shutdown zone should smaller than zone")
        # checked before any zone is created so a bad conf leaves the cluster untouched
        if len(self.servers) % zone_count != 0:
            raise Exception("unbalance zone.  Recaculate to make balance ratio node/zone")

        serverInfo = self.servers[0]
        rest = RestConnection(serverInfo)
        self.user = serverInfo.rest_username
        self.password = serverInfo.rest_password

        # Create zones "Group 2" .. "Group <zone>"; "Group 1" already exists.
        zones = ["Group {0}".format(i) for i in range(1, zone_count + 1)]
        for zone in zones[1:]:
            rest.add_zone(zone)

        # Add nodes so every zone ends up with the same number of nodes.
        # servers[0] already sits in the default zone and counts towards it.
        nodes_per_zone = len(self.servers) // zone_count
        nodes_in_zone = {}
        next_server = 1
        for zone in zones:
            members = [serverInfo.ip] if zone == self._DEFAULT_ZONE else []
            while len(members) < nodes_per_zone:
                server = self.servers[next_server]
                next_server += 1
                rest.add_node(user=self.user, password=self.password,
                              remoteIp=server.ip, port='8091', zone_name=zone)
                members.append(server.ip)
            nodes_in_zone[zone] = members

        self._rebalance_and_monitor(rest, "successfully rebalanced cluster {0}")
        # replicas of a node must not live in the node's own zone
        self._verify_replica_distribution_in_zones(nodes_in_zone, "tap")

        # Simulate whole zones going down by killing the erlang process.
        if shutdown_count >= 1 and zone_count >= 2:
            self.log.info("Start to shutdown nodes in zone to failover")
            servers_by_ip = dict((s.ip, s) for s in self.servers)
            for zone_index in range(2, shutdown_count + 2):
                for ip in nodes_in_zone["Group {0}".format(zone_index)]:
                    self._kill_and_failover(rest, servers_by_ip[ip])

        self.log.info("start rebalance after failover.")
        self._rebalance_and_monitor(
            rest, "successfully rebalanced in selected nodes from the cluster ? {0}")
        # compare current keys in buckets with the initially loaded key count
        self._verify_total_keys(self.servers[0], self.num_items)

    def test_zone_enable_after_upgrade_from_ce_to_ee(self):
        """Zones must stay unavailable until every CE node is swapped for an EE node.

        To run this test, use these params:
            nodes_init=3,version=2.5.1-xx,type=community
        """
        nodes_init = int(self.nodes_init)
        params = {}
        params['product'] = self.product
        params['version'] = self.version
        params['vbuckets'] = [self.vbuckets]
        params['type'] = self.type
        # install couchbase server community edition on the initial cluster nodes
        InstallerJob().parallel_install(self.servers[:nodes_init], params)

        params["type"] = "enterprise"
        zone_name = "AAABBBCCCaakkkkmmm345672"
        serverInfo = self.servers[0]
        ini_servers = self.servers[:nodes_init]
        self.user = serverInfo.rest_username
        self.password = serverInfo.rest_password
        if len(ini_servers) > 1:
            self.cluster.rebalance([ini_servers[0]], ini_servers[1:], [])
        rest = RestConnection(self.master)
        self._bucket_creation()

        # verify all nodes in cluster are in CE
        if rest.is_enterprise_edition():
            raise Exception("This test needs couchbase server community edition to run")

        self._load_all_buckets(self.servers[0], self.gen_load, "create", 0)
        self.log.info("create zone {0}".format(zone_name))
        self._verify_zone_creation_rejected(rest, zone_name)

        # Swap-rebalance the CE nodes out one at a time for EE nodes.
        for i in range(1, nodes_init + 1):
            if i == 1:
                # install EE on the spare node(s) and swap one in for the last CE node
                InstallerJob().parallel_install(self.servers[nodes_init:], params)
                self.cluster.rebalance([ini_servers[0]], [self.servers[nodes_init]],
                                       [self.servers[nodes_init - 1]])
                self.log.info("sleep  5 seconds")
                time.sleep(5)
            else:
                # re-install the node swapped out last round with EE and swap it back in
                upgraded = self.servers[nodes_init - (i - 1)]
                InstallerJob().parallel_install([upgraded], params)
                self.cluster.rebalance([ini_servers[0]], [upgraded],
                                       [self.servers[nodes_init - i]])
                self.sleep(12, "wait 12 seconds after rebalance")
            # after the last swap the cluster is fully EE, so there is nothing to reject
            if i < nodes_init:
                self.log.info("try to create zone {0} when cluster is not completely EE".format(zone_name))
                self._verify_zone_creation_rejected(rest, zone_name)

        serverInfo = self.servers[1]
        rest = RestConnection(serverInfo)
        self.user = serverInfo.rest_username
        self.password = serverInfo.rest_password
        if not rest.is_enterprise_edition():
            raise Exception("Test failed to upgrade cluster from CE to EE")
        self.log.info("try to create zone {0} when cluster {1} is completely EE".format(zone_name, serverInfo.ip))
        result = rest.add_zone(zone_name)
        self.log.info("sleep  5 seconds")
        time.sleep(5)
        if result:
            self.log.info("Zone feature is available in this cluster")
        else:
            raise Exception("Could not create zone with name: %s in cluster.  It's a bug" % zone_name)
        if rest.is_zone_exist(zone_name.strip()):
            self.log.info("verified! zone '{0}' is existed".format(zone_name.strip()))
        else:
            raise Exception("There is not zone with name: %s in cluster.  It's a bug" % zone_name)

        # re-install enterprise edition for next test if there is any
        InstallerJob().parallel_install([self.servers[0]], params)

        # servers[0] has been swapped out; make servers[1] the master for teardown
        self.log.info("Start to clean up cluster")
        self.master = self.servers[1]
        self.servers = self.servers[1:]

    def _create_zone(self, zone_name):
        """Create ``zone_name`` through servers[0], logging (not raising) REST errors.

        Callers check the outcome afterwards with ``_verify_zone``.
        """
        rest = RestConnection(self.servers[0])
        try:
            self.log.info("create zone {0}".format(zone_name))
            rest.add_zone(zone_name)
        except Exception as e:
            self.log.error("failed to create zone {0}: {1}".format(zone_name, e))

    def _verify_zone_creation_rejected(self, rest, zone_name):
        """Raise if ``zone_name`` can be created, i.e. the zone feature is available.

        An exception from ``add_zone`` counts as the expected rejection.
        Only a truthy result from ``add_zone`` fails the check.
        """
        try:
            result = rest.add_zone(zone_name)
        except Exception as e:
            self.log.info("zone creation rejected as expected: {0}".format(e))
            return
        if result:
            raise Exception("Zone feature should not be available in CE version")

    def _rebalance_and_monitor(self, rest, success_msg):
        """Rebalance all nodes known to ``rest`` and fail the test if it fails.

        :param success_msg: format string with one ``{0}`` slot that receives
            the ``monitorRebalance`` result.
        """
        otpNodes = [node.id for node in rest.node_statuses()]
        if not rest.rebalance(otpNodes, []):
            self.fail("rebalance did not start for nodes {0}".format(otpNodes))
        try:
            result = rest.monitorRebalance()
        except RebalanceFailedException as e:
            self.log.error("rebalance failed: {0}".format(e))
            self.fail("rebalance failed: {0}".format(e))
        self.log.info(success_msg.format(result))

    def _kill_and_failover(self, rest, server):
        """Kill erlang on ``server``, then fail it over, retrying once after 75s."""
        shell = RemoteMachineShellConnection(server)
        try:
            os_info = shell.extract_remote_info()
            shell.kill_erlang(os_info)
        finally:
            shell.disconnect()
        failed_over = rest.fail_over("ns_1@" + server.ip)
        if not failed_over:
            self.log.info("unable to failover the node the first time. "
                          "try again in 75 seconds..")
            time.sleep(75)
            failed_over = rest.fail_over("ns_1@" + server.ip)
        self.assertTrue(failed_over, "unable to failover node after erlang killed")

    def _verify_zone(self, name):
        """Raise unless a zone named ``name`` (surrounding spaces stripped) exists."""
        rest = RestConnection(self.servers[0])
        stripped = name.strip()
        if not rest.is_zone_exist(stripped):
            raise Exception("There is not zone with name: %s in cluster" % name)
        self.log.info("verified! zone '{0}' is existed".format(stripped))

    def _replica_stats_command(self, cbstat_command, windows, version, node, saslPassword):
        """Build the cbstats pipeline that prints the nodes ``node`` replicates to.

        Uses TAP stats for versions listed in COUCHBASE_VERSION_2 and DCP stats
        for COUCHBASE_VERSION_3. Returns None for any other version.
        """
        if version in COUCHBASE_VERSION_2:
            if windows:
                # standalone gawk.exe should be copied to ../ICW/bin for this
                # command to work. Ask IT to do this if you don't know how.
                template = ("%s %s:11210 %s -b %s -p \"%s\" | grep.exe :vb_filter: | gawk.exe '{print $1}'"
                            " | sed.exe 's/eq_tapq:replication_ns_1@//g' | sed.exe 's/:vb_filter://g'")
            else:
                template = ("%s %s:11210 %s -b %s -p \"%s\" | grep :vb_filter: | awk '{print $1}'"
                            " | xargs | sed 's/eq_tapq:replication_ns_1@//g' | sed 's/:vb_filter://g'")
            return template % (cbstat_command, node, "tap", "default", saslPassword)
        if version in COUCHBASE_VERSION_3:
            if windows:
                template = ("%s %s:11210 %s -b %s -p \"%s\" | grep.exe :replication:ns_1@%s | grep vb_uuid"
                            " | gawk.exe '{print $1}' | sed.exe 's/eq_dcpq:replication:ns_1@%s->ns_1@//g'"
                            " | sed.exe 's/:.*//g'")
            else:
                template = ("%s %s:11210 %s -b %s -p \"%s\" | grep :replication:ns_1@%s | grep vb_uuid"
                            " | awk '{print $1}' | sed 's/eq_dcpq:replication:ns_1@%s->ns_1@//g'"
                            " | sed 's/:.*//g' | sort -u | xargs")
            return template % (cbstat_command, node, "dcp", "default", saslPassword, node, node)
        return None

    def _verify_replica_distribution_in_zones(self, nodes, commmand, saslPassword = ""):
        """Raise if any node replicates the "default" bucket to a node of its own zone.

        :param nodes: dict mapping zone name -> list of node IPs in that zone.
        :param commmand: unused; "tap" or "dcp" is chosen from the cluster
            version. Kept only for backward compatibility with callers.
        :param saslPassword: SASL password passed to cbstats for the bucket.
        :raises Exception: on an unsupported OS or server version, or when a
            replica of a node is found on a node in the same zone.
        """
        shell = RemoteMachineShellConnection(self.servers[0])
        try:
            info = shell.extract_remote_info()
            os_type = info.type.lower()
            if os_type == 'linux':
                cbstat_command = "%scbstats" % (testconstants.LINUX_COUCHBASE_BIN_PATH)
            elif os_type == 'windows':
                cbstat_command = "%scbstats.exe" % (testconstants.WIN_COUCHBASE_BIN_PATH)
            elif os_type == 'mac':
                cbstat_command = "%scbstats" % (testconstants.MAC_COUCHBASE_BIN_PATH)
            else:
                raise Exception("Not support OS")
            versions = RestConnection(self.master).get_nodes_versions()
            version = versions[0][:5]
            for group in nodes:
                for node in nodes[group]:
                    commands = self._replica_stats_command(
                        cbstat_command, os_type == 'windows', version, node, saslPassword)
                    if commands is None:
                        raise Exception("Not supported server version {0}".format(versions[0]))
                    output, error = shell.execute_command(commands)
                    shell.log_command_output(output, error)
                    # Linux output is one space-separated line (xargs); Windows
                    # output is one node per line — normalise both to a list.
                    replica_nodes = sorted(set(" ".join(output).split()))
                    if not replica_nodes:
                        self.log.warn("no replication streams found for node {0}".format(node))
                    same_zone = [ip for ip in replica_nodes if ip in nodes[group]]
                    if same_zone:
                        raise Exception("replica of node {0} are on its own zone {1}".format(node, group))
                    self.log.info("{0}".format(nodes))
                    self.log.info("replicas of node {0} are in nodes {1}".format(node, replica_nodes))
                    self.log.info("replicas of node {0} are not in its zone {1}".format(node, group))
        finally:
            shell.disconnect()

    def _verify_total_keys(self, server, loaded_keys):
        """Raise unless every bucket on ``server`` reports ``loaded_keys`` curr_items."""
        rest = RestConnection(server)
        for bucket in rest.get_buckets():
            self.log.info("start to verify bucket: {0}".format(bucket))
            stats = rest.get_bucket_stats(bucket)
            if stats["curr_items"] != loaded_keys:
                raise Exception("%s keys in bucket %s does not match with loaded %s keys"
                                % (stats["curr_items"], bucket, loaded_keys))
            self.log.info("{0} keys in bucket {1} match with pre-loaded keys: {2}"
                          .format(stats["curr_items"], bucket, loaded_keys))
405
406