import json
import time
import unittest

import testconstants
from TestInput import TestInputSingleton

from rackzone.rackzone_base import RackzoneBaseTest
from memcached.helper.data_helper import MemcachedClientHelper
from membase.api.rest_client import RestConnection, Bucket
from membase.helper.rebalance_helper import RebalanceHelper
from membase.api.exception import RebalanceFailedException
from couchbase_helper.documentgenerator import BlobGenerator
from remote.remote_util import RemoteMachineShellConnection
from membase.helper.cluster_helper import ClusterOperationHelper
from scripts.install import InstallerJob
from testconstants import COUCHBASE_VERSION_2
from testconstants import COUCHBASE_VERSION_3


class RackzoneTests(RackzoneBaseTest):
    """Tests for server groups ("zones"): creation, naming, deletion,
    replica placement across zones, and availability after a CE -> EE upgrade.
    """

    # Name of the zone that the tests expect to exist on a fresh cluster.
    DEFAULT_ZONE = "Group 1"

    def setUp(self):
        """Read the test parameters from the test input."""
        super(RackzoneTests, self).setUp()
        self.command = self.input.param("command", "")
        self.zone = self.input.param("zone", 1)
        self.replica = self.input.param("replica", 1)
        self.command_options = self.input.param("command_options", '')
        self.set_get_ratio = self.input.param("set_get_ratio", 0.9)
        self.item_size = self.input.param("item_size", 128)
        self.shutdown_zone = self.input.param("shutdown_zone", 1)
        self.do_verify = self.input.param("do-verify", True)
        self.num_node = self.input.param("num_node", 4)
        self.timeout = 6000

    def tearDown(self):
        super(RackzoneTests, self).tearDown()

    # ------------------------------------------------------------------
    # zone creation / naming tests
    # ------------------------------------------------------------------

    def test_check_default_zone_create_by_default(self):
        """The default zone must exist without any explicit creation."""
        self._verify_zone(self.DEFAULT_ZONE)

    def test_create_second_default_zone(self):
        """Try to create a zone that has the same name as the default zone.

        Any error raised by the REST call is only logged; the test makes no
        assertion on the outcome.
        """
        rest = RestConnection(self.servers[0])
        try:
            self.log.info("create additional default zone")
            rest.add_zone(self.DEFAULT_ZONE)
        except Exception as e:
            # NOTE(review): presumably a rejection is the expected outcome
            # here -- confirm, and assert on it if so.
            self.log.info("creating a second default zone raised: {0}".format(e))

    def test_create_zone_with_upper_case_name(self):
        self._create_and_verify_zone("ALLWITHUPTERCASE")

    def test_create_zone_with_lower_case_name(self):
        self._create_and_verify_zone("allwithlowercaseeeeeee")

    def test_create_zone_with_all_number_name(self):
        self._create_and_verify_zone("3223345557666760")

    def test_create_zone_with_upper_lower_number_name(self):
        self._create_and_verify_zone("AAABBBCCCaakkkkmmm345672")

    def test_create_zone_with_upper_lower_number_and_space_name(self):
        # _verify_zone() strips the name before looking it up.
        self._create_and_verify_zone(" AAAB BBCCC aakkk kmmm3 456 72 ")

    def test_create_zone_with_none_ascii_name(self):
        # zone name is limited to 64 bytes
        self._create_and_verify_zone(
            "abcdGHIJKLMNOPQRSTUVWXYZ0123456789efghijklmnopqrstuvwyABCDEF_-.%")

    def test_delete_empty_defautl_zone(self):
        """Empty the default zone, delete it, then rename the new zone back.

        Failures propagate to the test runner instead of being printed and
        swallowed, so a failed move or delete actually fails the test.
        """
        zone_name = "test1"
        default_zone = self.DEFAULT_ZONE
        server_info = self.servers[0]
        moved_node = [server_info.ip]
        rest = RestConnection(server_info)

        self.log.info("create zone {0}".format(zone_name))
        rest.add_zone(zone_name)
        if not rest.is_zone_exist(zone_name):
            self.fail("zone {0} was not created".format(zone_name))

        self.log.info("Move node {0} from zone {1} to zone {2}"
                      .format(moved_node, default_zone, zone_name))
        if not rest.shuffle_nodes_in_zones(moved_node, default_zone, zone_name):
            self.fail("Failed to move node {0} from zone {1} to zone {2}"
                      .format(moved_node, default_zone, zone_name))

        rest.delete_zone(default_zone)
        if rest.is_zone_exist(default_zone):
            raise Exception("Failed to delete default zone")
        self.log.info("successful delete default zone")

        # Restore the default zone name for whatever runs next.
        rest.rename_zone(zone_name, default_zone)

    # ------------------------------------------------------------------
    # replica distribution test
    # ------------------------------------------------------------------

    def test_replica_distribution_in_zone(self):
        """Spread the servers evenly over zones and check replica placement.

        test params:
            -p shutdown_zone=1,items=100000,shutdown_zone=1,zone=2,replicas=1

        Steps: create the zones, add the remaining servers to them, rebalance,
        verify that replicas are not placed in the zone of their active node,
        optionally kill and fail over every node of ``shutdown_zone`` zones,
        rebalance again and compare the item count with ``self.num_items``.
        """
        zone_count = int(self.zone)
        shutdown_count = int(self.shutdown_zone)

        if len(self.servers) < int(self.num_node):
            msg = ("This test needs minimum {1} servers to run.\n"
                   " Currently in ini file has only {0} servers"
                   .format(len(self.servers), self.num_node))
            self.log.error(msg)
            raise Exception(msg)
        if shutdown_count >= zone_count:
            raise Exception("shutdown zone should smaller than zone")
        # Checked before any zone is created so that a bad configuration
        # leaves the cluster untouched.
        if len(self.servers) % zone_count != 0:
            raise Exception("unbalance zone. Recaculate to make balance "
                            "ratio node/zone")

        master = self.servers[0]
        rest = RestConnection(master)
        self.user = master.rest_username
        self.password = master.rest_password

        # Create the zones requested by the "zone" param.
        zones = [self.DEFAULT_ZONE]
        nodes_in_zone = {self.DEFAULT_ZONE: [master.ip]}
        for index in range(2, zone_count + 1):
            zone_name = "Group {0}".format(index)
            zones.append(zone_name)
            nodes_in_zone[zone_name] = []
            rest.add_zone(zone_name)

        # Add nodes to each zone. Floor division keeps the count an int on
        # both Python 2 and Python 3.
        nodes_per_zone = len(self.servers) // zone_count
        next_server = 1
        for zone_name in zones:
            # The master node already occupies one slot of the default zone.
            # Exact comparison: a substring test would also match "Group 10".
            if zone_name == self.DEFAULT_ZONE:
                wanted = nodes_per_zone - 1
            else:
                wanted = nodes_per_zone
            for _ in range(wanted):
                server = self.servers[next_server]
                nodes_in_zone[zone_name].append(server.ip)
                rest.add_node(user=self.user, password=self.password,
                              remoteIp=server.ip, port='8091',
                              zone_name=zone_name)
                next_server += 1

        self._rebalance_and_wait(rest, "successfully rebalanced cluster {0}")

        # Verify replica of one node should not be in the same zone as active.
        self._verify_replica_distribution_in_zones(nodes_in_zone, "tap")

        # Simulate entire zone(s) going down by killing the erlang process.
        if shutdown_count >= 1 and zone_count >= 2:
            self.log.info("Start to shutdown nodes in zone to failover")
            servers_by_ip = dict((srv.ip, srv) for srv in self.servers)
            # Zones are shut down starting from "Group 2"; the default zone
            # holds the node the REST connection talks to.
            for zone_index in range(2, shutdown_count + 2):
                down_zone = "Group {0}".format(zone_index)
                for ip in nodes_in_zone[down_zone]:
                    self._kill_and_failover(rest, servers_by_ip[ip])
            self.log.info("start rebalance after failover.")
            self._rebalance_and_wait(
                rest,
                "successfully rebalanced in selected nodes from the cluster ? {0}")

        # Compare current keys in bucket with initial loaded keys count.
        self._verify_total_keys(self.servers[0], self.num_items)

    def _rebalance_and_wait(self, rest, success_msg):
        """Rebalance over all known nodes and fail the test if it fails.

        ``success_msg`` is a format string that receives the result of
        ``rest.monitorRebalance()``.
        """
        otp_nodes = [node.id for node in rest.node_statuses()]
        started = rest.rebalance(otp_nodes, [])
        if not started:
            # Kept non-fatal: the checks that follow decide the outcome.
            self.log.error("rebalance was not started")
            return
        try:
            result = rest.monitorRebalance()
        except RebalanceFailedException as e:
            self.log.error("rebalance failed: {0}".format(e))
            # Returning a value from a test method would report success.
            self.fail("rebalance failed: {0}".format(e))
        self.log.info(success_msg.format(result))

    def _kill_and_failover(self, rest, server):
        """Kill erlang on ``server`` and fail the node over (one retry)."""
        shell = RemoteMachineShellConnection(server)
        try:
            os_info = shell.extract_remote_info()
            shell.kill_erlang(os_info)
        finally:
            shell.disconnect()

        otp_node = "ns_1@" + server.ip
        failed_over = rest.fail_over(otp_node)
        if not failed_over:
            self.log.info("unable to failover the node the first time. "
                          "try again in 75 seconds..")
            time.sleep(75)
            failed_over = rest.fail_over(otp_node)
        self.assertTrue(failed_over,
                        "unable to failover node after erlang killed")

    # ------------------------------------------------------------------
    # CE -> EE upgrade test
    # ------------------------------------------------------------------

    def test_zone_enable_after_upgrade_from_ce_to_ee(self):
        """Zones must be rejected until every node runs enterprise edition.

        to run this test, use these params:
            nodes_init=3,version=2.5.1-xx,type=community
        """
        params = {}
        params['product'] = self.product
        params['version'] = self.version
        params['vbuckets'] = [self.vbuckets]
        params['type'] = self.type
        # install couchbase server community edition to run the test
        InstallerJob().parallel_install(self.servers[:3], params)

        params["type"] = "enterprise"
        zone_name = "AAABBBCCCaakkkkmmm345672"
        nodes_init = int(self.nodes_init)
        server_info = self.servers[0]
        ini_servers = self.servers[:self.nodes_init]
        self.user = server_info.rest_username
        self.password = server_info.rest_password
        if len(ini_servers) > 1:
            self.cluster.rebalance([ini_servers[0]], ini_servers[1:], [])
        rest = RestConnection(self.master)
        self._bucket_creation()

        # verify all nodes in cluster are CE
        if rest.is_enterprise_edition():
            raise Exception("This test needs couchbase server community "
                            "edition to run")

        self._load_all_buckets(self.servers[0], self.gen_load, "create", 0)
        self.log.info("create zone {0}".format(zone_name))
        self._assert_zone_creation_rejected(rest, zone_name)

        # Swap-rebalance one CE node out for an EE node per iteration.
        for i in range(1, nodes_init + 1):
            if i == 1:
                # install EE on the spare node(s) to do swap rebalance
                InstallerJob().parallel_install(self.servers[3:], params)
                self.cluster.rebalance([ini_servers[0]],
                                       [self.servers[nodes_init]],
                                       [self.servers[nodes_init - i]])
                self.log.info("sleep 5 seconds")
                time.sleep(5)
                self.log.info("try to create zone {0} when cluster is not "
                              "completely EE".format(zone_name))
                self._assert_zone_creation_rejected(rest, zone_name)
            else:
                InstallerJob().parallel_install(
                    [self.servers[nodes_init - (i - 1)]], params)
                self.cluster.rebalance([ini_servers[0]],
                                       [self.servers[nodes_init - (i - 1)]],
                                       [self.servers[nodes_init - i]])
                self.sleep(12, "wait 12 seconds after rebalance")
                if i < nodes_init:
                    self.log.info("try to create zone {0} when cluster is not "
                                  "completely EE".format(zone_name))
                    self._assert_zone_creation_rejected(rest, zone_name)

        # servers[0] was swapped out above, so talk to servers[1] from now on.
        server_info = self.servers[1]
        rest = RestConnection(server_info)
        self.user = server_info.rest_username
        self.password = server_info.rest_password
        if not rest.is_enterprise_edition():
            raise Exception("Test failed to upgrade cluster from CE to EE")
        self.log.info("try to create zone {0} when cluster {1} is completely EE"
                      .format(zone_name, server_info.ip))
        result = rest.add_zone(zone_name)
        self.log.info("sleep 5 seconds")
        time.sleep(5)
        if result:
            self.log.info("Zone feature is available in this cluster")
        else:
            raise Exception("Could not create zone with name: %s in cluster. "
                            "It's a bug" % zone_name)
        if rest.is_zone_exist(zone_name.strip()):
            self.log.info("verified! zone '{0}' is existed"
                          .format(zone_name.strip()))
        else:
            raise Exception("There is not zone with name: %s in cluster. "
                            "It's a bug" % zone_name)

        # re-install enterprise edition for next test if there is any
        InstallerJob().parallel_install([self.servers[0]], params)

        # reset master node to new node to teardown cluster
        self.log.info("Start to clean up cluster")
        self.master = self.servers[1]
        self.servers = self.servers[1:]

    def _assert_zone_creation_rejected(self, rest, zone_name):
        """Fail the test if ``rest.add_zone(zone_name)`` succeeds.

        The success check lives outside the ``try`` so the failure it raises
        cannot be swallowed by the handler that absorbs the expected
        rejection.
        """
        try:
            created = rest.add_zone(zone_name)
        except Exception as e:
            # NOTE(review): presumably add_zone raises with a "Failed ..."
            # message when the server rejects the request -- confirm. Any
            # exception is treated as a rejection, as before.
            self.log.info("zone creation rejected: {0}".format(e))
            return
        if created:
            self.fail("Zone feature should not be available in CE version")

    # ------------------------------------------------------------------
    # verification helpers
    # ------------------------------------------------------------------

    def _create_and_verify_zone(self, zone_name):
        """Create ``zone_name`` through servers[0] and check that it exists.

        An error from the create call is logged rather than raised; the
        existence check that follows decides whether the test passes.
        """
        rest = RestConnection(self.servers[0])
        try:
            self.log.info("create zone {0}".format(zone_name))
            rest.add_zone(zone_name)
        except Exception as e:
            self.log.error("add zone '{0}' raised: {1}".format(zone_name, e))
        self._verify_zone(zone_name)

    def _verify_zone(self, name):
        """Raise if no zone called ``name`` (stripped) exists on servers[0]."""
        rest = RestConnection(self.servers[0])
        if rest.is_zone_exist(name.strip()):
            self.log.info("verified! zone '{0}' is existed".format(name.strip()))
        else:
            raise Exception("There is not zone with name: %s in cluster" % name)

    def _verify_replica_distribution_in_zones(self, nodes, commmand, saslPassword = ""):
        """Check that no node replicates to a node of its own zone.

        :param nodes: dict mapping zone name -> list of node IPs in that zone.
        :param commmand: unused; the stats group ("tap" or "dcp") is chosen
            from the cluster version. Kept for caller compatibility.
        :param saslPassword: bucket password passed to cbstats via ``-p``.
        :raises Exception: on an unsupported OS or server version, when
            cbstats returns no replication info, or when a replica
            destination lies in the source node's own zone.
        """
        shell = RemoteMachineShellConnection(self.servers[0])
        try:
            info = shell.extract_remote_info()
            os_type = info.type.lower()
            if os_type == 'linux':
                cbstat_command = "%scbstats" % (testconstants.LINUX_COUCHBASE_BIN_PATH)
            elif os_type == 'windows':
                cbstat_command = "%scbstats.exe" % (testconstants.WIN_COUCHBASE_BIN_PATH)
            elif os_type == 'mac':
                cbstat_command = "%scbstats" % (testconstants.MAC_COUCHBASE_BIN_PATH)
            else:
                raise Exception("Not support OS")
            is_windows = os_type == 'windows'

            # The version is constant for the whole run; decide once.
            version = RestConnection(self.master).get_nodes_versions()[0][:5]
            if version in COUCHBASE_VERSION_2:
                protocol = "tap"
            elif version in COUCHBASE_VERSION_3:
                protocol = "dcp"
            else:
                raise Exception("Not support couchbase version {0}".format(version))

            for group in nodes:
                for node in nodes[group]:
                    commands = self._replica_stats_command(
                        cbstat_command, node, protocol, is_windows, saslPassword)
                    output, error = shell.execute_command(commands)
                    shell.log_command_output(output, error)

                    # Accept one space-separated line (xargs on unix) as well
                    # as one destination per line (windows pipelines).
                    replica_nodes = set()
                    for line in output:
                        replica_nodes.update(line.split())
                    if not replica_nodes:
                        raise Exception("no replication info found for node {0}"
                                        .format(node))

                    # Any destination inside the node's own zone (the node
                    # itself included) violates zone awareness.
                    if replica_nodes.intersection(nodes[group]):
                        raise Exception("replica of node {0} are on its own zone {1}"
                                        .format(node, group))
                    self.log.info("{0}".format(nodes))
                    self.log.info("replicas of node {0} are in nodes {1}"
                                  .format(node, sorted(replica_nodes)))
                    self.log.info("replicas of node {0} are not in its zone {1}"
                                  .format(node, group))
        finally:
            shell.disconnect()

    def _replica_stats_command(self, cbstat_command, node, protocol, is_windows,
                               sasl_password):
        """Build the cbstats pipeline that lists replica destinations of ``node``."""
        base = "%s %s:11210 %s -b %s -p \"%s\"" % (
            cbstat_command, node, protocol, "default", sasl_password)
        if protocol == "tap":
            if is_windows:
                # standalone gawk.exe should be copied to ../ICW/bin for the
                # command below to work. Ask IT to do this if you don't know how
                return (base + " | grep.exe :vb_filter: | gawk.exe '{print $1}'"
                        " | sed.exe 's/eq_tapq:replication_ns_1@//g'"
                        " | sed.exe 's/:vb_filter://g'")
            return (base + " |grep :vb_filter: | awk '{print $1}'"
                    " | xargs | sed 's/eq_tapq:replication_ns_1@//g'"
                    " | sed 's/:vb_filter://g'")
        if is_windows:
            return (base + " | grep.exe :replication:ns_1@%s | grep vb_uuid |"
                    " gawk.exe '{print $1}' |"
                    " sed.exe 's/eq_dcpq:replication:ns_1@%s->ns_1@//g' |"
                    " sed.exe 's/:.*//g'" % (node, node))
        return (base + " | grep :replication:ns_1@%s | grep vb_uuid |"
                " awk '{print $1}' |"
                " sed 's/eq_dcpq:replication:ns_1@%s->ns_1@//g' |"
                " sed 's/:.*//g' | sort -u | xargs" % (node, node))

    def _verify_total_keys(self, server, loaded_keys):
        """Raise unless every bucket's ``curr_items`` equals ``loaded_keys``."""
        rest = RestConnection(server)
        for bucket in rest.get_buckets():
            self.log.info("start to verify bucket: {0}".format(bucket))
            stats = rest.get_bucket_stats(bucket)
            if stats["curr_items"] == loaded_keys:
                self.log.info("{0} keys in bucket {2} match with pre-loaded keys: {1}"
                              .format(stats["curr_items"], loaded_keys, bucket))
            else:
                raise Exception("%s keys in bucket %s does not match with loaded %s keys"
                                % (stats["curr_items"], bucket, loaded_keys))