from newupgradebasetest import NewUpgradeBaseTest
import Queue
import copy
from membase.helper.cluster_helper import ClusterOperationHelper
import threading
from random import randrange, randint
from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
from couchbase_helper.tuq_helper import N1QLHelper
from eventing.eventing_base import EventingBaseTest
from pytests.eventing.eventing_constants import HANDLER_CODE
from lib.testconstants import STANDARD_BUCKET_PORT
from membase.api.rest_client import RestConnection, RestHelper
from couchbase_helper.documentgenerator import BlobGenerator
from membase.helper.bucket_helper import BucketOperationHelper


class UpgradeTests(NewUpgradeBaseTest, EventingBaseTest):

    def setUp(self):
        super(UpgradeTests, self).setUp()
        self.queue = Queue.Queue()
        self.graceful = self.input.param("graceful", False)
        self.after_upgrade_nodes_in = self.input.param("after_upgrade_nodes_in", 1)
        self.after_upgrade_nodes_out = self.input.param("after_upgrade_nodes_out", 1)
        self.verify_vbucket_info = self.input.param("verify_vbucket_info", True)
        self.initialize_events = self.input.param("initialize_events", "").split(":")
        self.upgrade_services_in = self.input.param("upgrade_services_in", None)
        self.after_upgrade_services_in = \
            self.input.param("after_upgrade_services_in", None)
        self.after_upgrade_services_out_dist = \
            self.input.param("after_upgrade_services_out_dist", None)
        self.in_between_events = self.input.param("in_between_events", "").split(":")
        self.after_events = self.input.param("after_events", "").split(":")
        self.before_events = self.input.param("before_events", "").split(":")
        self.upgrade_type = self.input.param("upgrade_type", "online")
        self.sherlock_upgrade = self.input.param("sherlock", False)
        self.max_verify = self.input.param("max_verify", None)
        self.verify_after_events = self.input.param("verify_after_events", True)
        self.online_upgrade_type = self.input.param("online_upgrade_type", "swap")
        self.src_bucket_name = self.input.param('src_bucket_name', 'src_bucket')
        self.eventing_log_level = self.input.param('eventing_log_level', 'INFO')
        self.dst_bucket_name = self.input.param('dst_bucket_name', 'dst_bucket')
        self.dst_bucket_name1 = self.input.param('dst_bucket_name1', 'dst_bucket1')
        self.metadata_bucket_name = self.input.param('metadata_bucket_name', 'metadata')
        self.create_functions_buckets = self.input.param('create_functions_buckets', True)
        self.use_memory_manager = self.input.param('use_memory_manager', True)
        self.final_events = []
        self.n1ql_helper = None
        self.total_buckets = 1
        self.in_servers_pool = self._convert_server_map(self.servers[:self.nodes_init])
        """ Init nodes are not upgraded yet """
        for key in self.in_servers_pool.keys():
            self.in_servers_pool[key].upgraded = False
        self.out_servers_pool = self._convert_server_map(self.servers[self.nodes_init:])
        self.gen_initial_create = BlobGenerator('upgrade', 'upgrade',
                                                self.value_size, end=self.num_items)
        self.gen_create = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                        start=self.num_items + 1, end=self.num_items * 1.5)
        self.gen_update = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                        start=self.num_items / 2, end=self.num_items)
        self.gen_delete = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                        start=self.num_items / 4, end=self.num_items / 2 - 1)
        self.after_gen_create = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                              start=self.num_items * 1.6, end=self.num_items * 2)
        self.after_gen_update = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                              start=1, end=self.num_items / 4)
        self.after_gen_delete = BlobGenerator('upgrade', 'upgrade', self.value_size,
                                              start=self.num_items * .5, end=self.num_items * 0.75)
        initial_services_setting = self.input.param("initial-services-setting", None)
        self._install(self.servers[:self.nodes_init])
        if not self.init_nodes and initial_services_setting is not None:
            self.initialize_nodes(self.servers[:self.nodes_init],
                                  services=initial_services_setting)
        self._log_start(self)
        if len(self.servers[:self.nodes_init]) > 1:
            if initial_services_setting is None:
                self.cluster.rebalance(self.servers[:1], self.servers[1:self.nodes_init],
                                       [], use_hostnames=self.use_hostnames)
            else:
                set_services = self.initial_services(initial_services_setting)
                for i in range(1, len(set_services)):
                    self.cluster.rebalance([self.servers[0]], [self.servers[i]], [],
                                           use_hostnames=self.use_hostnames,
                                           services=[set_services[i]])
                    self.sleep(10)
        else:
            self.cluster.rebalance([self.servers[0]], self.servers[1:], [],
                                   use_hostnames=self.use_hostnames)
            self.sleep(10)
        """ Sometimes, when an upgrade fails and a node does not have Couchbase
            Server installed yet, we cannot set the quota at the beginning of
            the test. We have to wait until the new Couchbase Server is
            installed to set it properly here. """
        servers_available = copy.deepcopy(self.servers)
        if len(self.servers) > int(self.nodes_init):
            servers_available = servers_available[:self.nodes_init]
        self.quota = self._initialize_nodes(self.cluster, servers_available,
                                            self.disabled_consistent_view,
                                            self.rebalanceIndexWaitingDisabled,
                                            self.rebalanceIndexPausingDisabled,
                                            self.maxParallelIndexers,
                                            self.maxParallelReplicaIndexers,
                                            self.port)
        self.add_built_in_server_user(node=self.master)
        self.bucket_size = self._get_bucket_size(self.quota, self.total_buckets)
        self.create_buckets()
        self.n1ql_server = None
        self.success_run = True
        self.failed_thread = None
        self.generate_map_nodes_out_dist_upgrade(self.after_upgrade_services_out_dist)
        if self.upgrade_services_in != "same":
            self.upgrade_services_in = self.get_services(self.in_servers_pool.values(),
                                                         self.upgrade_services_in, start_node=0)
        self.after_upgrade_services_in = self.get_services(self.out_servers_pool.values(),
                                                           self.after_upgrade_services_in, start_node=0)
        self.fts_obj = None
        self.index_name_prefix = None

    def tearDown(self):
        super(UpgradeTests, self).tearDown()

    def test_upgrade(self):
        self.event_threads = []
        self.after_event_threads = []
        try:
            self.log.info("\n*** Start init operations before upgrade begins ***")
            if self.initialize_events:
                initialize_events = self.run_event(self.initialize_events)
                self.finish_events(initialize_events)
            if not self.success_run and self.failed_thread is not None:
                raise Exception("*** Failed to {0} ***".format(self.failed_thread))
            self.cluster_stats(self.servers[:self.nodes_init])
            if self.before_events:
                self.event_threads += self.run_event(self.before_events)
            self.log.info("\n*** Start upgrade cluster ***")
            self.event_threads += self.upgrade_event()
            if self.upgrade_type == "online":
                self.monitor_dcp_rebalance()
            self.finish_events(self.event_threads)
            self.log.info("\nWill install upgrade version to any free nodes")
            out_nodes = self._get_free_nodes()
            self.log.info("Here are the free nodes {0}".format(out_nodes))
            """ Only install the out nodes when there is a cluster operation """
            cluster_ops = ["rebalance_in", "rebalance_out", "rebalance_in_out"]
            for event in self.after_events[0].split("-"):
                if event in cluster_ops:
                    self.log.info("\n\nThere are cluster ops after upgrade. Need to "
                                  "install free nodes with the upgrade version")
                    self._install(out_nodes)
                    break
            self.generate_map_nodes_out_dist_upgrade(
                self.after_upgrade_services_out_dist)
            self.log.info("\n\n*** Start operations after upgrade is done ***")
            self.add_built_in_server_user()
            if self.after_events:
                self.after_event_threads = self.run_event(self.after_events)
            self.finish_events(self.after_event_threads)
            if not self.success_run and self.failed_thread is not None:
                raise Exception("*** Failed to {0} ***".format(self.failed_thread))
            """ Data is always verified by default """
            if self.after_events[0]:
                self.log.info("*** Start after events ***")
                for event in self.after_events[0].split("-"):
                    if "delete_buckets" in event:
                        self.log.info("After events include a delete buckets event. "
                                      "No items verification needed")
                        self.verify_after_events = False
                        break
            if self.verify_after_events:
                self.log.info("*** Start data verification ***")
                self.cluster_stats(self.in_servers_pool.values())
                self._verify_data_active_replica()
        except Exception, ex:
            self.log.info(ex)
            print "*** Stop all events to stop the test ***"
            self.stop_all_events(self.event_threads)
            self.stop_all_events(self.after_event_threads)
            raise
        finally:
            self.log.info("Clean up any events that registered cleanup actions")
            self.cleanup_events()

    def _record_vbuckets(self, master, servers):
        map = {}
        for bucket in self.buckets:
            self.log.info(" record vbucket for the bucket {0}".format(bucket.name))
            map[bucket.name] = RestHelper(RestConnection(master))\
                ._get_vbuckets(servers, bucket_name=bucket.name)
        return map

    def _find_master(self):
        self.master = self.in_servers_pool.values()[0]

    def _verify_data_active_replica(self):
        """ data_analysis defaults to False """
        self.data_analysis = self.input.param("data_analysis", False)
        self.total_vbuckets = self.initial_vbuckets
        if self.data_analysis:
            disk_replica_dataset, disk_active_dataset = \
                self.get_and_compare_active_replica_data_set_all(
                    self.in_servers_pool.values(),
                    self.buckets,
                    path=None)
            self.data_analysis_active_replica_all(disk_active_dataset,
                                                  disk_replica_dataset,
                                                  self.in_servers_pool.values(),
                                                  self.buckets, path=None)
            """ check vbucket distribution analysis after rebalance """
            self.vb_distribution_analysis(servers=self.in_servers_pool.values(),
                                          buckets=self.buckets,
                                          std=1.0,
                                          total_vbuckets=self.total_vbuckets)

    def _verify_vbuckets(self, old_vbucket_map, new_vbucket_map):
        for bucket in self.buckets:
            self._verify_vbucket_nums_for_swap(old_vbucket_map[bucket.name],
                                               new_vbucket_map[bucket.name])

    def stop_all_events(self, thread_list):
        for t in thread_list:
            try:
                if t.isAlive():
                    t.stop()
            except Exception, ex:
                self.log.info(ex)

    def cleanup_events(self):
        thread_list = []
        for event in self.final_events:
            t = threading.Thread(target=self.find_function(event), args=())
            t.daemon = True
            t.start()
            thread_list.append(t)
        for t in thread_list:
            t.join()

    def run_event_in_sequence(self, events):
        q = self.queue
        self.log.info("run_event_in_sequence")
        for event in events.split("-"):
            t = threading.Thread(target=self.find_function(event), args=(q,))
            t.daemon = True
            t.start()
            t.join()
            self.success_run = True
            while not self.queue.empty():
                self.success_run &= self.queue.get()
            if not self.success_run:
                self.failed_thread = event
                break

    def run_event(self, events):
        thread_list = []
        for event in events:
            if "-" in event:
                t = threading.Thread(target=self.run_event_in_sequence, args=(event,))
                t.start()
                t.join()
            elif event != '':
                t = threading.Thread(target=self.find_function(event), args=())
                t.daemon = True
                t.start()
                thread_list.append(t)
        return thread_list

    def find_function(self, event):
        return getattr(self, event)

    def finish_events(self, thread_list):
        for t in thread_list:
            t.join()

    def upgrade_event(self):
        self.log.info("upgrade_event")
        thread_list = []
        if self.upgrade_type == "online":
            t = threading.Thread(target=self.online_upgrade, args=())
        elif self.upgrade_type == "offline":
            t = threading.Thread(target=self.offline_upgrade, args=())
        t.daemon = True
        t.start()
        thread_list.append(t)
        return thread_list

    def server_crash(self):
        try:
            self.log.info("server_crash")
            self.targetProcess = self.input.param("targetProcess", 'memcached')
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.terminate_process(process_name=self.targetProcess)
        except Exception, ex:
            self.log.info(ex)
            raise

    def server_stop(self):
        try:
            self.log.info("server_stop")
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.stop_server()
            self.final_events.append("start_server")
        except Exception, ex:
            self.log.info(ex)
            raise

    def start_server(self):
        try:
            self.log.info("start_server")
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.start_server()
        except Exception, ex:
            self.log.info(ex)
            raise

    def failover(self, queue=None):
        failover_node = False
        try:
            self.log.info("VVVVVV failover a node ")
            print "failover node ", self.nodes_out_list
            nodes = self.get_nodes_in_cluster_after_upgrade()
            failover_task = self.cluster.async_failover([self.master],
                                failover_nodes=self.nodes_out_list,
                                graceful=self.graceful)
            failover_task.result()
            if self.graceful:
                """ Check if rebalance is still running """
                msg = "graceful failover failed for nodes"
                self.assertTrue(RestConnection(self.master).monitorRebalance(
                                stop_if_loop=True), msg=msg)
                rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                         [], self.nodes_out_list)
                rebalance.result()
                failover_node = True
            else:
                msg = "Failed to failover a node"
                self.assertTrue(RestConnection(self.master).monitorRebalance(
                                stop_if_loop=True), msg=msg)
                rebalance = self.cluster.async_rebalance(nodes, [],
                                                         self.nodes_out_list)
                rebalance.result()
                failover_node = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if failover_node and queue is not None:
            queue.put(True)

    def autofailover(self):
        try:
            self.log.info("autofailover")
            autofailover_timeout = 30
            status = RestConnection(self.master).update_autofailover_settings(
                True, autofailover_timeout)
            self.assertTrue(status, 'failed to change autofailover_settings!')
            servr_out = self.nodes_out_list
            remote = RemoteMachineShellConnection(self.nodes_out_list[0])
            remote.stop_server()
            self.sleep(autofailover_timeout + 10, "Wait for autofailover")
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                     [], [self.nodes_out_list[0]])
            rebalance.result()
        except Exception, ex:
            self.log.info(ex)
            raise

    def network_partitioning(self):
        try:
            self.log.info("network_partitioning")
            for node in self.nodes_out_list:
                self.start_firewall_on_node(node)
            self.final_events.append("undo_network_partitioning")
        except Exception, ex:
            self.log.info(ex)
            raise

    def undo_network_partitioning(self):
        try:
            self.log.info("remove_network_partitioning")
            for node in self.nodes_out_list:
                self.stop_firewall_on_node(node)
        except Exception, ex:
            self.log.info(ex)
            raise

    def bucket_compaction(self):
        try:
            self.log.info("couchbase_bucket_compaction")
            compact_tasks = []
            for bucket in self.buckets:
                compact_tasks.append(self.cluster.async_compact_bucket(self.master, bucket))
        except Exception, ex:
            self.log.info(ex)
            raise

    def warmup(self, queue=None):
        node_warmuped = False
        try:
            self.log.info("Start warmup operation")
            nodes = self.get_nodes_in_cluster_after_upgrade()
            for server in nodes:
                remote = RemoteMachineShellConnection(server)
                remote.stop_server()
                remote.start_server()
                remote.disconnect()
            ClusterOperationHelper.wait_for_ns_servers_or_assert(nodes, self)
            node_warmuped = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if node_warmuped and queue is not None:
            queue.put(True)

    def create_lww_bucket(self):
        self.time_synchronization = 'enabledWithOutDrift'
        bucket = 'default'
        print 'time_sync {0}'.format(self.time_synchronization)

        helper = RestHelper(self.rest)
        if not helper.bucket_exists(bucket):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(
                self.servers)
            info = self.rest.get_nodes_self()
            self.rest.create_bucket(bucket=bucket,
                                    ramQuotaMB=512, authType='sasl',
                                    timeSynchronization=self.time_synchronization)
            try:
                ready = BucketOperationHelper.wait_for_memcached(self.master,
                                                                 bucket)
                self.assertTrue(ready, msg='[ERROR] Expect lww bucket creation to work.')
            finally:
                self.log.info("Success, created lww bucket")

    def bucket_flush(self, queue=None):
        bucket_flushed = False
        try:
            self.log.info("bucket_flush ops")
            self.rest = RestConnection(self.master)
            for bucket in self.buckets:
                self.rest.flush_bucket(bucket.name)
            bucket_flushed = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_flushed and queue is not None:
            queue.put(True)

    def delete_buckets(self, queue=None):
        bucket_deleted = False
        try:
            self.log.info("delete_buckets")
            self.rest = RestConnection(self.master)
            for bucket in self.buckets:
                self.log.info("delete bucket {0}".format(bucket.name))
                self.rest.delete_bucket(bucket.name)
            bucket_deleted = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_deleted and queue is not None:
            queue.put(True)
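
    # Queue-reporting convention for event methods: run_event_in_sequence()
    # passes self.queue to each event, and the event puts False on failure or
    # True on success so the runner can record the failing step in
    # self.failed_thread (e.g. after_events=create_buckets-rebalance_in runs
    # create_buckets, checks the queue, then runs rebalance_in).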

    def create_buckets(self, queue=None):
        bucket_created = False
        try:
            self.log.info("create_buckets")
            if self.dgm_run:
                self.bucket_size = 256
            self.default_bucket = False
            self.sasl_buckets = 1
            self.sasl_bucket_name = self.sasl_bucket_name + "_" \
                                    + str(self.total_buckets)
            self.rest = RestConnection(self.master)
            self._bucket_creation()
            self.sleep(5, "sleep after create bucket")
            self.total_buckets += 1
            bucket_created = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_created and queue is not None:
            queue.put(True)

    def change_bucket_properties(self):
        try:
            self.rest = RestConnection(self.master)
            # Change bucket properties
            for bucket in self.buckets:
                self.rest.change_bucket_props(bucket, ramQuotaMB=None,
                                              authType=None, saslPassword=None,
                                              replicaNumber=0, proxyPort=None,
                                              replicaIndex=None, flushEnabled=False)
        except Exception, ex:
            self.log.info(ex)
            raise

    def rebalance_in(self, queue=None):
        rebalance_in = False
        service_in = copy.deepcopy(self.after_upgrade_services_in)
        if service_in is None:
            service_in = ["kv"]
        free_nodes = self._convert_server_map(self._get_free_nodes())
        if not free_nodes.values():
            raise Exception("No free node available to rebalance in")
        try:
            self.nodes_in_list = self.out_servers_pool.values()[:self.nodes_in]
            if int(self.nodes_in) == 1:
                if len(free_nodes.keys()) > 1:
                    free_node_in = [free_nodes.values()[0]]
                if len(self.after_upgrade_services_in) > 1:
                    service_in = [self.after_upgrade_services_in[0]]
            else:
                free_node_in = free_nodes.values()
            self.log.info("<<<=== rebalance_in node {0} with services {1}"
                          .format(free_node_in, service_in[0]))
            rebalance = \
                self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                             free_node_in,
                                             [], services=service_in)
            rebalance.result()
            self.in_servers_pool.update(free_nodes)
            rebalance_in = True
            if any("index" in services for services in service_in):
                self.log.info("Set storageMode to forestdb after add "
                              "index node {0} to cluster".format(free_nodes.keys()))
                RestConnection(free_nodes.values()[0]).set_indexer_storage_mode()
            if self.after_upgrade_services_in and \
                    len(self.after_upgrade_services_in) > 1:
                self.log.info("remove service '{0}' from service list after "
                              "rebalance done ".format(self.after_upgrade_services_in[0]))
                self.after_upgrade_services_in.pop(0)
            self.sleep(10, "wait 10 seconds after rebalance")
            if free_node_in and free_node_in[0] not in self.servers:
                self.servers.append(free_node_in[0])
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_in and queue is not None:
            queue.put(True)

    def rebalance_out(self, queue=None):
        rebalance_out = False
        try:
            self.log.info("=====>>>> rebalance_out node {0}"
                          .format(self.nodes_out_list))
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                     [], self.nodes_out_list)
            rebalance.result()
            rebalance_out = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_out and queue is not None:
            queue.put(True)

    def rebalance_in_out(self, queue=None):
        rebalance_in_out = False
        try:
            self.nodes_in_list = self.out_servers_pool.values()[:self.nodes_in]
            self.log.info("<<<<<===== rebalance_in node {0}"
                          .format(self.nodes_in_list))
            self.log.info("=====>>>>> rebalance_out node {0}"
                          .format(self.nodes_out_list))
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                     self.nodes_in_list, self.nodes_out_list,
                                                     services=self.after_upgrade_services_in)
            rebalance.result()
            rebalance_in_out = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_in_out and queue is not None:
            queue.put(True)

    def incremental_backup(self):
        self.log.info("incremental_backup")

    def full_backup(self):
        self.log.info("full_backup")

    def cb_collect_info(self):
        try:
            self.log.info("cb_collect_info")
            log_file_name = "/tmp/sample.zip"
            output, error = self.shell.execute_cbcollect_info("%s" % (log_file_name))
        except Exception, ex:
            self.log.info(ex)
            raise

    def create_index(self, queue=None):
        self.log.info("create_index")
        self.index_list = {}
        create_index = False
        self._initialize_n1ql_helper()
        try:
            self.n1ql_helper.create_primary_index(using_gsi=True,
                                                  server=self.n1ql_server)
            #self.n1ql_helper.create_primary_index(using_gsi=False,
            #                                      server=self.n1ql_server)
            self.log.info("done create_index")
            create_index = True
        except Exception, e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)
        if create_index and queue is not None:
            queue.put(True)

    def create_index_with_replica_and_query(self, queue=None):
        """ ,groups=simple,reset_services=True
        """
        self.log.info("Create index with replica and query")
        self.n1ql_node = self.get_nodes_from_services_map(service_type="n1ql")
        self._initialize_n1ql_helper()
        self.index_name_prefix = "random_index_" + str(randint(100000, 999999))
        create_index_query = "CREATE INDEX " + self.index_name_prefix + \
                             " ON default(age) USING GSI WITH {{'num_replica': {0}}};"\
                             .format(self.num_index_replicas)
        try:
            self.create_index()
            self.n1ql_helper.run_cbq_query(query=create_index_query,
                                           server=self.n1ql_node)
        except Exception, e:
            self.log.info(e)
        self.sleep(30)
        index_map = self.get_index_map()
        self.log.info(index_map)
        if not self.expected_err_msg:
            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
                                                    index_map,
                                                    self.num_index_replicas)

    def verify_index_with_replica_and_query(self, queue=None):
        index_map = self.get_index_map()
        try:
            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
                                                    index_map,
                                                    self.num_index_replicas)
        except Exception, e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)

    def create_views(self, queue=None):
        self.log.info("*** create_views ***")
        """ default is 1 ddoc. Change number of ddocs by param ddocs_num=new_number.
            default is 2 views. Change number of views by param
            view_per_ddoc=new_view_per_doc """
        try:
            self.create_ddocs_and_views(queue)
        except Exception, e:
            self.log.info(e)

    def query_views(self, queue=None):
        self.log.info("*** query_views ***")
        try:
            self.verify_all_queries(queue)
        except Exception, e:
            self.log.info(e)

    def drop_views(self):
        self.log.info("drop_views")

    def drop_index(self):
        self.log.info("drop_index")
        for bucket_name in self.index_list.keys():
            query = "drop index {0} on {1} using gsi"\
                    .format(self.index_list[bucket_name], bucket_name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)

    def query_explain(self):
        self.log.info("query_explain")
        for bucket in self.buckets:
            query = "select count(*) from {0}".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "explain select count(*) from {0}".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "select count(*) from {0} where field_1 = 1".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "explain select count(*) from {0} where field_1 = 1".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)

    def change_settings(self):
        try:
            status = True
            if "update_notifications" in self.input.test_params:
                status &= self.rest.update_notifications(
                    str(self.input.param("update_notifications", 'true')).lower())
            if "autofailover_timeout" in self.input.test_params:
                status &= self.rest.update_autofailover_settings(
                    True, self.input.param("autofailover_timeout", None))
            if "autofailover_alerts" in self.input.test_params:
                status &= self.rest.set_alerts_settings('couchbase@localhost',
                                                        'root@localhost',
                                                        'user', 'pwd')
            if "autocompaction" in self.input.test_params:
                tmp, _, _ = self.rest.set_auto_compaction(
                    viewFragmntThresholdPercentage=self.input.param("autocompaction", 50))
                status &= tmp
            if not status:
                self.fail("some settings were not set correctly!")
        except Exception, ex:
            self.log.info(ex)
            raise

    def create_eventing_services(self, queue=None):
        """ Only works after the cluster is completely upgraded to 5.5.0 """
        try:
            rest = RestConnection(self.master)
            cb_version = rest.get_nodes_version()
            if 5.5 > float(cb_version[:3]):
                self.log.info("This eventing test is only for cb version 5.5 and later.")
                return

            bucket_params = self._create_bucket_params(server=self.master, size=128,
                                                       replicas=self.num_replicas)
            self.cluster.create_standard_bucket(name=self.src_bucket_name,
                                                port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.buckets = RestConnection(self.master).get_buckets()
            self.src_bucket = RestConnection(self.master).get_buckets()
            self.cluster.create_standard_bucket(name=self.dst_bucket_name,
                                                port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.cluster.create_standard_bucket(name=self.metadata_bucket_name,
                                                port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.buckets = RestConnection(self.master).get_buckets()
            self.gens_load = self.generate_docs(self.docs_per_day)
            self.expiry = 3

            self.restServer = self.get_nodes_from_services_map(service_type="eventing")
            """ must be self.rest to pass in deploy_function """
            self.rest = RestConnection(self.restServer)
            self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag,
                      verify_data=False, batch_size=self.batch_size)
            function_name = "Function_{0}_{1}".format(randint(1, 1000000000),
                                                      self._testMethodName)
            self.function_name = function_name[0:90]
            body = self.create_save_function_body(self.function_name,
                                                  HANDLER_CODE.BUCKET_OPS_ON_UPDATE,
                                                  worker_count=3)
            bk_events_created = False
            rs_events_created = False
            try:
                self.deploy_function(body)
                bk_events_created = True
                self.verify_eventing_results(self.function_name, 0,
                                             skip_stats_validation=True)
            except Exception as e:
                self.log.error(e)
            finally:
                self.undeploy_and_delete_function(body)
        except Exception, e:
            self.log.info(e)

    def create_cbas_services(self, queue=None):
        """
        This test needs at most 4 servers to run and only upgrades to vulcan and later.
        Command to run:
        upgrade.upgrade_tests.UpgradeTests.test_upgrade,items=5000,initial_version=4.6.4-xxxx,
        nodes_init=3,initialize_events=kv_ops_initialize,upgrade_services_in='kv:index',
        after_events=rebalance_in-create_cbas_services,after_upgrade_services_in=cbas,
        dgm_run=true,upgrade_test=True,skip_init_check_cbserver=true,released_upgrade_version=5.5.0-xxx
        """
        try:
            self.validate_error = False
            rest = RestConnection(self.master)
            cb_version = rest.get_nodes_version()
            if 5.5 > float(cb_version[:3]):
                self.log.info("This analytics test is only for cb version 5.5 and later.")
                return
            self.log.info("Get cbas nodes in cluster")
            cbas_node = self.get_nodes_from_services_map(service_type="cbas")
            cbas_rest = RestConnection(self.servers[self.nodes_init])
            self.get_services_map()

            self.log.info("Get kv nodes in cluster")
            kv_maps = [x.replace(":8091", "") for x in self.services_map["kv"]]
            kv_nodes = [node for node in copy.deepcopy(self.servers)
                        if node.ip in kv_maps]
            self.cbas_node = cbas_node
            self.load_sample_buckets(servers=kv_nodes, bucketName="travel-sample",
                                     total_items=31591, rest=cbas_rest)
            self.test_create_dataset_on_bucket()
        except Exception as e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def online_upgrade(self):
        try:
            self.log.info("online_upgrade")
            self.initial_version = self.upgrade_versions[0]
            self.sleep(self.sleep_time, "Pre-setup of old version is done. "
                                        "Wait for online upgrade to {0} version"
                                        .format(self.initial_version))
            self.product = 'couchbase-server'
            if self.online_upgrade_type == "swap":
                self.online_upgrade_swap_rebalance()
            else:
                self.online_upgrade_incremental()
        except Exception, ex:
            self.log.info(ex)
            raise

    def online_upgrade_swap_rebalance(self):
        self.log.info("online_upgrade_swap_rebalance")
        self.swap_num_servers = self.input.param('swap_num_servers', 1)
        servers = self._convert_server_map(self.servers[:self.nodes_init])
        out_servers = self._convert_server_map(self.servers[self.nodes_init:])
        self.swap_num_servers = min(self.swap_num_servers, len(out_servers))
        start_services_num = 0
        for i in range(self.nodes_init / self.swap_num_servers):
            servers_in = {}
            new_servers = copy.deepcopy(servers)
            servicesNodeOut = ""
            for key in out_servers.keys():
                servers_in[key] = out_servers[key]
                out_servers[key].upgraded = True
                out_servers.pop(key)
                if len(servers_in) == self.swap_num_servers:
                    break
            servers_out = {}
            node_out = None
            new_servers.update(servers_in)
            for key in servers.keys():
                if len(servers_out) == self.swap_num_servers:
                    break
                elif not servers[key].upgraded:
                    servers_out[key] = servers[key]
                    new_servers.pop(key)
            out_servers.update(servers_out)
            rest = RestConnection(servers.values()[0])
            self.log.info("****************************************")
            self.log.info("cluster nodes = {0}".format(servers.values()))
            self.log.info("cluster service map = {0}".format(rest.get_nodes_services()))
            self.log.info("cluster version map = {0}".format(rest.get_nodes_version()))
            self.log.info("to include in cluster = {0}".format(servers_in.values()))
            self.log.info("to exclude from cluster = {0}".format(servers_out.values()))
            self.log.info("****************************************")
            rest = RestConnection(servers_out.values()[0])
            servicesNodeOut = rest.get_nodes_services()
            servicesNodeOut = ",".join(servicesNodeOut[servers_out.keys()[0]])
            self._install(servers_in.values())
            self.sleep(10, "Wait for ns server to be ready")
            old_vbucket_map = self._record_vbuckets(self.master, servers.values())
            try:
                if self.upgrade_services_in == "same":
                    self.cluster.rebalance(servers.values(), servers_in.values(),
                                           servers_out.values(),
                                           services=[servicesNodeOut])
                elif self.upgrade_services_in != None and len(self.upgrade_services_in) > 0:
                    self.cluster.rebalance(servers.values(),
                                           servers_in.values(),
                                           servers_out.values(),
                                           services=self.upgrade_services_in[
                                               start_services_num:start_services_num +
                                               len(servers_in.values())])
                    start_services_num += len(servers_in.values())
                else:
                    self.cluster.rebalance(servers.values(), servers_in.values(),
                                           servers_out.values())
            except Exception, ex:
                self.log.info(ex)
                raise
            self.out_servers_pool = servers_out
            self.in_servers_pool = new_servers
            servers = new_servers
            self.servers = servers.values()
            self.master = self.servers[0]
            if self.verify_vbucket_info:
                new_vbucket_map = self._record_vbuckets(self.master, self.servers)
                self._verify_vbuckets(old_vbucket_map, new_vbucket_map)
            # in the middle of online upgrade events
            if self.in_between_events:
                self.event_threads = []
                self.event_threads += self.run_event(self.in_between_events)
                self.finish_events(self.event_threads)
                self.in_between_events = None

    def online_upgrade_incremental(self):
        self.log.info("online_upgrade_incremental")
        try:
            for server in self.servers[1:]:
                self.cluster.rebalance(self.servers, [], [server])
                self.initial_version = self.upgrade_versions[0]
                self.sleep(self.sleep_time,
                           "Pre-setup of old version is done. Wait for online upgrade "
                           "to {0} version".format(self.initial_version))
                self.product = 'couchbase-server'
                self._install([server])
                self.sleep(self.sleep_time,
                           "Installation of new version is done. Wait for rebalance")
                self.cluster.rebalance(self.servers, [server], [])
                self.log.info("Rebalanced in upgraded nodes")
                self.sleep(self.sleep_time)
            self._new_master(self.servers[1])
            self.cluster.rebalance(self.servers, [], [self.servers[0]])
            self.log.info("Rebalanced out all old version nodes")
        except Exception, ex:
            self.log.info(ex)
            raise

    def offline_upgrade(self):
        self._offline_upgrade()

    def failover_add_back(self):
        try:
            rest = RestConnection(self.master)
            recoveryType = self.input.param("recoveryType", "full")
            servr_out = self.nodes_out_list
            failover_task = self.cluster.async_failover([self.master],
                                failover_nodes=servr_out, graceful=self.graceful)
            failover_task.result()
            nodes_all = rest.node_statuses()
            nodes = []
            if servr_out[0].ip == "127.0.0.1":
                for failover_node in servr_out:
                    nodes.extend([node for node in nodes_all
                                  if (str(node.port) == failover_node.port)])
            else:
                for failover_node in servr_out:
                    nodes.extend([node for node in nodes_all
                                  if node.ip == failover_node.ip])
            for node in nodes:
                self.log.info(node)
                rest.add_back_node(node.id)
                rest.set_recovery_type(otpNode=node.id, recoveryType=recoveryType)
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                     [], [])
            rebalance.result()
        except Exception, ex:
            raise

    def kv_ops_initialize(self, queue=None):
        try:
            self.log.info("kv_ops_initialize")
            self._load_all_buckets(self.master, self.gen_initial_create, "create",
                                   self.expire_time, flag=self.item_flag)
            self.log.info("done kv_ops_initialize")
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
            raise
        if queue is not None:
            queue.put(True)

    def kv_after_ops_create(self, queue=None):
        try:
            self.log.info("kv_after_ops_create")
            self._load_all_buckets(self.master, self.after_gen_create, "create",
                                   self.expire_time, flag=self.item_flag)
            for bucket in self.buckets:
                self.log.info(" record vbucket for the bucket {0}"
                              .format(bucket.name))
                curr_items = \
                    RestConnection(self.master).get_active_key_count(bucket.name)
                self.log.info("{0} curr_items in bucket {1} "
                              .format(curr_items, bucket.name))
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def kv_after_ops_update(self):
        try:
            self.log.info("kv_after_ops_update")
            self._load_all_buckets(self.master, self.after_gen_update, "update",
                                   self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_after_ops_delete(self):
        try:
            self.log.info("kv_after_ops_delete")
            self._load_all_buckets(self.master, self.after_gen_delete, "delete",
                                   self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise
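
    # The kv_ops_* and kv_after_ops_* events reuse the BlobGenerators built in
    # setUp(): the gen_* ranges drive load before/during the upgrade and the
    # after_gen_* ranges drive load once the upgrade has finished.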

    def doc_ops_initialize(self, queue=None):
        try:
            self.log.info("load doc to all buckets")
            self._load_doc_data_all_buckets(data_op="create", batch_size=1000,
                                            gen_load=None)
            self.log.info("done initialize load doc to all buckets")
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def kv_ops_create(self):
        try:
            self.log.info("kv_ops_create")
            self._load_all_buckets(self.master, self.gen_create, "create",
                                   self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_ops_update(self):
        try:
            self.log.info("kv_ops_update")
            self._load_all_buckets(self.master, self.gen_update, "update",
                                   self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_ops_delete(self):
        try:
            self.log.info("kv_ops_delete")
            self._load_all_buckets(self.master, self.gen_delete, "delete",
                                   self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def add_sub_doc(self):
        try:
            self.log.info("add sub doc")
            """add sub doc code here"""
        except Exception, ex:
            self.log.info(ex)
            raise

    def create_fts_index(self, queue=None):
        try:
            self.log.info("Checking if index already exists ...")
            name = "default"
            """ test on one bucket """
            for bucket in self.buckets:
                name = bucket.name
                break
            SOURCE_CB_PARAMS = {
                "authUser": "default",
                "authPassword": "",
                "authSaslUser": "",
                "authSaslPassword": "",
                "clusterManagerBackoffFactor": 0,
                "clusterManagerSleepInitMS": 0,
                "clusterManagerSleepMaxMS": 20000,
                "dataManagerBackoffFactor": 0,
                "dataManagerSleepInitMS": 0,
                "dataManagerSleepMaxMS": 20000,
                "feedBufferSizeBytes": 0,
                "feedBufferAckThreshold": 0
            }
            self.index_type = 'fulltext-index'
            self.index_definition = {
                "type": "fulltext-index",
                "name": "",
                "uuid": "",
                "params": {},
                "sourceType": "couchbase",
                "sourceName": "",
                "sourceUUID": "",
                "sourceParams": SOURCE_CB_PARAMS,
                "planParams": {}
            }
            self.name = self.index_definition['name'] = \
                self.index_definition['sourceName'] = name
            fts_node = self.get_nodes_from_services_map("fts",
                           servers=self.get_nodes_in_cluster_after_upgrade())
            if fts_node:
                rest = RestConnection(fts_node)
                status, _ = rest.get_fts_index_definition(self.name)
                if status != 400:
                    rest.delete_fts_index(self.name)
                self.log.info("Creating {0} {1} on {2}".format(self.index_type,
                                                               self.name, rest.ip))
                rest.create_fts_index(self.name, self.index_definition)
            else:
                raise Exception("No FTS node in cluster")
            self.ops_dist_map = self.calculate_data_change_distribution(
                create_per=self.create_ops_per, update_per=self.update_ops_per,
                delete_per=self.delete_ops_per, expiry_per=self.expiry_ops_per,
                start=0, end=self.docs_per_day)
            self.log.info(self.ops_dist_map)
            self.dataset = "default"
            self.docs_gen_map = self.generate_ops_docs(self.docs_per_day, 0)
            self.async_ops_all_buckets(self.docs_gen_map, batch_size=100)
        except Exception, ex:
            self.log.info(ex)

    def create_fts_index_query(self, queue=None):
        try:
            self.fts_obj = self.create_fts_index_query_compare()
            return self.fts_obj
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def cluster_stats(self, servers):
        self._wait_for_stats_all_buckets(servers)

    def _initialize_n1ql_helper(self):
        if self.n1ql_helper == None:
            self.n1ql_server = self.get_nodes_from_services_map(service_type="n1ql",
                                                                servers=self.input.servers)
            self.n1ql_helper = N1QLHelper(version="sherlock", shell=None,
                                          use_rest=True, max_verify=self.max_verify,
                                          buckets=self.buckets, item_flag=None,
                                          n1ql_port=self.n1ql_server.n1ql_port,
                                          full_docs_list=[], log=self.log,
                                          input=self.input, master=self.master)

    def _get_free_nodes(self):
        self.log.info("Get free nodes in pool not in cluster yet")
        nodes = self.get_nodes_in_cluster_after_upgrade()
        free_nodes = copy.deepcopy(self.input.servers)
        found = False
        for node in nodes:
            for server in free_nodes:
                if str(server.ip).strip() == str(node.ip).strip():
                    self.log.info("this node {0} is in cluster".format(server))
                    free_nodes.remove(server)
                    found = True
        if not free_nodes:
            self.log.info("no free node")
            return free_nodes
        else:
            self.log.info("here is the list of free nodes {0}"
                          .format(free_nodes))
            return free_nodes

    def get_nodes_in_cluster_after_upgrade(self, master_node=None):
        rest = None
        if master_node == None:
            rest = RestConnection(self.master)
        else:
            rest = RestConnection(master_node)
        nodes = rest.node_statuses()
        server_set = []
        for node in nodes:
            for server in self.input.servers:
                if server.ip == node.ip:
                    server_set.append(server)
        return server_set