1import copy 2import json 3import threading 4 5from base_2i import BaseSecondaryIndexingTests 6from membase.api.rest_client import RestConnection, RestHelper 7import random 8from lib import testconstants 9from lib.couchbase_helper.tuq_generators import TuqGenerators 10from lib.memcached.helper.data_helper import MemcachedClientHelper 11from lib.remote.remote_util import RemoteMachineShellConnection 12from threading import Thread 13from pytests.query_tests_helper import QueryHelperTests 14from couchbase_helper.documentgenerator import JsonDocGenerator 15from couchbase_helper.cluster import Cluster 16from gsi_replica_indexes import GSIReplicaIndexesTests 17from lib.membase.helper.cluster_helper import ClusterOperationHelper 18 19 20class GSIIndexPartitioningTests(GSIReplicaIndexesTests): 21 def setUp(self): 22 super(GSIIndexPartitioningTests, self).setUp() 23 self.num_items = self.input.param("items", 5000) 24 self.log.info("No. of items: {0}".format(str(self.num_items))) 25 self.index_servers = self.get_nodes_from_services_map( 26 service_type="index", get_all_nodes=True) 27 self.rest = RestConnection(self.index_servers[0]) 28 self.node_list = [] 29 for server in self.index_servers: 30 self.node_list.append(server.ip + ":" + server.port) 31 32 self.num_queries = self.input.param("num_queries", 100) 33 self.num_index_partitions = self.input.param("num_index_partitions", 8) 34 self.recover_failed_node = self.input.param("recover_failed_node", 35 False) 36 self.op_type = self.input.param("op_type", "create") 37 self.node_operation = self.input.param("node_op", "reboot") 38 39 def tearDown(self): 40 super(GSIIndexPartitioningTests, self).tearDown() 41 42 # Test that generates n number of create index statements with various permutations and combinations 43 # of different clauses used in the create index statement. 44 def test_create_partitioned_indexes(self): 45 self._load_emp_dataset(end=self.num_items) 46 47 create_index_queries = self.generate_random_create_index_statements( 48 bucketname=self.buckets[0].name, idx_node_list=self.node_list, 49 num_statements=self.num_queries) 50 51 failed_index_creation = 0 52 for create_index_query in create_index_queries: 53 54 try: 55 self.n1ql_helper.run_cbq_query( 56 query=create_index_query["index_definition"], 57 server=self.n1ql_node) 58 except Exception, ex: 59 self.log.info(str(ex)) 60 61 self.sleep(10) 62 63 index_metadata = self.rest.get_indexer_metadata() 64 index_map = self.get_index_map() 65 66 if index_metadata: 67 status = self.validate_partitioned_indexes(create_index_query, 68 index_map, 69 index_metadata) 70 71 if not status: 72 failed_index_creation += 1 73 self.log.info( 74 "** Following query failed validation : {0}".format( 75 create_index_query["index_definition"])) 76 else: 77 failed_index_creation += 1 78 self.log.info( 79 "** Following index did not get created : {0}".format( 80 create_index_query["index_definition"])) 81 self.log.info("output from /getIndexStatus") 82 self.log.info(index_metadata) 83 self.log.info("Index Map") 84 self.log.info(index_map) 85 86 drop_index_query = "DROP INDEX default.{0}".format( 87 create_index_query["index_name"]) 88 try: 89 self.n1ql_helper.run_cbq_query( 90 query=drop_index_query, 91 server=self.n1ql_node) 92 except Exception, ex: 93 self.log.info(str(ex)) 94 95 self.log.info( 96 "Total Create Index Statements Run: {0}, Passed : {1}, Failed : {2}".format( 97 self.num_queries, self.num_queries - failed_index_creation, 98 failed_index_creation)) 99 self.assertTrue(failed_index_creation == 0, 100 "Some create index statements failed validations. Pls see the test log above for details.") 101 102 def test_partition_index_with_excluded_nodes(self): 103 self._load_emp_dataset(end=self.num_items) 104 105 # Setting to exclude a node for planner 106 self.rest.set_index_planner_settings("excludeNode=in") 107 # Create partitioned index 108 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)" 109 110 try: 111 self.n1ql_helper.run_cbq_query( 112 query=create_index_statement, 113 server=self.n1ql_node) 114 except Exception, ex: 115 self.log.info(str(ex)) 116 117 # Validate index created and check the hosts on which partitions are hosted. 118 expected_hosts = self.node_list[1:] 119 expected_hosts.sort() 120 validated = False 121 index_metadata = self.rest.get_indexer_metadata() 122 self.log.info("Indexer Metadata :::") 123 self.log.info(index_metadata) 124 125 for index in index_metadata["status"]: 126 if index["name"] == "idx1": 127 self.log.info("Expected Hosts : {0}".format(expected_hosts)) 128 self.log.info("Actual Hosts : {0}".format(index["hosts"])) 129 self.assertEqual(index["hosts"], expected_hosts, 130 "Planner did not ignore excluded node during index creation") 131 validated = True 132 133 if not validated: 134 self.fail("Looks like index was not created.") 135 136 def test_replica_partition_index_with_excluded_nodes(self): 137 self._load_emp_dataset(end=self.num_items) 138 139 # Setting to exclude a node for planner 140 self.rest.set_index_planner_settings("excludeNode=in") 141 # Create partitioned index 142 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}}}".format( 143 self.num_index_replicas) 144 try: 145 self.n1ql_helper.run_cbq_query( 146 query=create_index_statement, 147 server=self.n1ql_node) 148 except Exception, ex: 149 self.log.info(str(ex)) 150 151 index_names = [] 152 index_names.append("idx1") 153 for i in range(1, self.num_index_replicas + 1): 154 index_names.append("idx1 (replica {0})".format(str(i))) 155 156 # Need to see if the indexes get created in the first place 157 158 # Validate index created and check the hosts on which partitions are hosted. 159 expected_hosts = self.node_list[1:] 160 expected_hosts.sort() 161 validated = False 162 index_metadata = self.rest.get_indexer_metadata() 163 self.log.info("Indexer Metadata :::") 164 self.log.info(index_metadata) 165 166 index_validated = 0 167 for index_name in index_names: 168 for index in index_metadata["status"]: 169 if index["name"] == index_name: 170 self.log.info("Expected Hosts : {0}".format(expected_hosts)) 171 self.log.info("Actual Hosts : {0}".format(index["hosts"])) 172 self.assertEqual(index["hosts"], expected_hosts, 173 "Planner did not ignore excluded node during index creation for {0}".format( 174 index_name)) 175 index_validated += 1 176 177 self.assertEqual(index_validated, (self.num_index_replicas + 1), 178 "All index replicas not created") 179 180 def test_partition_index_by_non_indexed_field(self): 181 self._load_emp_dataset(end=self.num_items) 182 183 create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI" 184 try: 185 self.n1ql_helper.run_cbq_query(query=create_index_statement, 186 server=self.n1ql_node) 187 except Exception, ex: 188 self.log.info(str(ex)) 189 self.fail("index creation failed with error : {0}".format(str(ex))) 190 191 self.sleep(10) 192 index_map = self.get_index_map() 193 self.log.info(index_map) 194 195 index_metadata = self.rest.get_indexer_metadata() 196 self.log.info("Indexer Metadata Before Build:") 197 self.log.info(index_metadata) 198 199 index_details = {} 200 index_details["index_name"] = "idx1" 201 index_details["num_partitions"] = self.num_index_partitions 202 index_details["defer_build"] = False 203 204 self.assertTrue( 205 self.validate_partitioned_indexes(index_details, index_map, 206 index_metadata), 207 "Partitioned index created not as expected") 208 209 def test_default_num_partitions(self): 210 self._load_emp_dataset(end=self.num_items) 211 212 self.rest.set_index_settings( 213 {"indexer.numPartitions": 6}) 214 215 create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI" 216 try: 217 self.n1ql_helper.run_cbq_query(query=create_index_statement, 218 server=self.n1ql_node) 219 except Exception, ex: 220 self.log.info(str(ex)) 221 self.fail("index creation failed with error : {0}".format(str(ex))) 222 223 self.sleep(10) 224 index_map = self.get_index_map() 225 self.log.info(index_map) 226 227 index_metadata = self.rest.get_indexer_metadata() 228 self.log.info("Indexer Metadata Before Build:") 229 self.log.info(index_metadata) 230 231 index_details = {} 232 index_details["index_name"] = "idx1" 233 index_details["num_partitions"] = 6 234 index_details["defer_build"] = False 235 236 self.assertTrue( 237 self.validate_partitioned_indexes(index_details, index_map, 238 index_metadata), 239 "Partitioned index created not as expected") 240 241 def test_change_default_num_partitions_after_create_index(self): 242 self._load_emp_dataset(end=self.num_items) 243 244 self.rest.set_index_settings( 245 {"indexer.numPartitions": 16}) 246 247 create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI" 248 try: 249 self.n1ql_helper.run_cbq_query(query=create_index_statement, 250 server=self.n1ql_node) 251 except Exception, ex: 252 self.log.info(str(ex)) 253 self.fail("index creation failed with error : {0}".format(str(ex))) 254 255 self.sleep(10) 256 index_map = self.get_index_map() 257 self.log.info(index_map) 258 259 index_metadata = self.rest.get_indexer_metadata() 260 self.log.info("Indexer Metadata Before Build:") 261 self.log.info(index_metadata) 262 263 index_details = {} 264 index_details["index_name"] = "idx1" 265 index_details["num_partitions"] = 16 266 index_details["defer_build"] = False 267 268 self.assertTrue( 269 self.validate_partitioned_indexes(index_details, index_map, 270 index_metadata), 271 "Partitioned index created not as expected") 272 273 self.rest.set_index_settings( 274 {"indexer.numPartitions": 32}) 275 276 create_index_statement = "CREATE INDEX idx2 on default(namesalary) partition by hash(salary) USING GSI" 277 try: 278 self.n1ql_helper.run_cbq_query(query=create_index_statement, 279 server=self.n1ql_node) 280 except Exception, ex: 281 self.log.info(str(ex)) 282 self.fail("index creation failed with error : {0}".format(str(ex))) 283 284 self.sleep(10) 285 index_map = self.get_index_map() 286 self.log.info(index_map) 287 288 index_metadata = self.rest.get_indexer_metadata() 289 self.log.info("Indexer Metadata Before Build:") 290 self.log.info(index_metadata) 291 292 index_details = {} 293 index_details["index_name"] = "idx2" 294 index_details["num_partitions"] = 32 295 index_details["defer_build"] = False 296 297 self.assertTrue( 298 self.validate_partitioned_indexes(index_details, index_map, 299 index_metadata), 300 "Partitioned index created not as expected") 301 302 # Validate num_partitions for idx1 doesnt change 303 index_details = {} 304 index_details["index_name"] = "idx1" 305 index_details["num_partitions"] = 16 306 index_details["defer_build"] = False 307 308 self.assertTrue( 309 self.validate_partitioned_indexes(index_details, index_map, 310 index_metadata), 311 "Num partitions for existing indexes changed after updating default value") 312 313 def test_default_num_partitions_negative(self): 314 self._load_emp_dataset(end=self.num_items) 315 316 self.rest.set_index_settings( 317 {"indexer.numPartitions": 8}) 318 319 numpartition_values_str = ["abc", "2018-03-04 18:02:37"] 320 numpartition_values_num = [0, -5, 46.6789] 321 322 for value in numpartition_values_str: 323 324 indexname = "index_" + str(random.randint(1, 100)) 325 try: 326 self.rest.set_index_settings( 327 {"indexer.numPartitions": '{0}'.format(value)}) 328 329 create_index_statement = "CREATE INDEX {0} on default(name,dept) partition by hash(salary) USING GSI".format( 330 indexname) 331 332 self.n1ql_helper.run_cbq_query(query=create_index_statement, 333 server=self.n1ql_node) 334 except Exception, ex: 335 self.log.info(str(ex)) 336 337 self.sleep(10) 338 index_map = self.get_index_map() 339 self.log.info(index_map) 340 341 index_metadata = self.rest.get_indexer_metadata() 342 self.log.info("Indexer Metadata Before Build:") 343 self.log.info(index_metadata) 344 345 index_details = {} 346 index_details["index_name"] = indexname 347 if (not isinstance(value, str)) and int(value) > 0: 348 index_details["num_partitions"] = int(value) 349 else: 350 index_details["num_partitions"] = 8 351 index_details["defer_build"] = False 352 353 self.assertTrue( 354 self.validate_partitioned_indexes(index_details, index_map, 355 index_metadata), 356 "Partitioned index created not as expected") 357 358 for value in numpartition_values_num: 359 indexname = "index_" + str(random.randint(101, 200)) 360 try: 361 self.rest.set_index_settings( 362 {"indexer.numPartitions": value}) 363 364 create_index_statement = "CREATE INDEX {0} on default(name,dept) partition by hash(salary) USING GSI".format( 365 indexname) 366 367 self.n1ql_helper.run_cbq_query(query=create_index_statement, 368 server=self.n1ql_node) 369 except Exception, ex: 370 self.log.info(str(ex)) 371 372 self.sleep(10) 373 index_map = self.get_index_map() 374 self.log.info(index_map) 375 376 index_metadata = self.rest.get_indexer_metadata() 377 self.log.info("Indexer Metadata Before Build:") 378 self.log.info(index_metadata) 379 380 index_details = {} 381 index_details["index_name"] = indexname 382 if (not isinstance(value, str)) and int(value) > 0: 383 index_details["num_partitions"] = int(value) 384 else: 385 index_details["num_partitions"] = 8 386 index_details["defer_build"] = False 387 388 self.assertTrue( 389 self.validate_partitioned_indexes(index_details, index_map, 390 index_metadata), 391 "Partitioned index created not as expected") 392 393 def test_numpartitions_negative(self): 394 self._load_emp_dataset(end=self.num_items) 395 396 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':null}}" 397 try: 398 self.n1ql_helper.run_cbq_query( 399 query=create_index_statement, 400 server=self.n1ql_node) 401 except Exception, ex: 402 self.log.info(str(ex)) 403 404 numpartition_values_str = ["abc", "2018-03-04 18:02:37"] 405 for value in numpartition_values_str: 406 # Create partitioned index 407 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':'{0}'}}".format( 408 value) 409 try: 410 self.n1ql_helper.run_cbq_query( 411 query=create_index_statement, 412 server=self.n1ql_node) 413 except Exception, ex: 414 self.log.info(str(ex)) 415 else: 416 self.fail( 417 "Index got created with an invalid num_partition value : {0}".format( 418 value)) 419 420 numpartition_values_num = [0, -5] 421 for value in numpartition_values_num: 422 # Create partitioned index 423 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{0}}}".format( 424 value) 425 try: 426 self.n1ql_helper.run_cbq_query( 427 query=create_index_statement, 428 server=self.n1ql_node) 429 except Exception, ex: 430 self.log.info(str(ex)) 431 else: 432 self.fail( 433 "Index got created with an invalid num_partition value : {0}".format( 434 value)) 435 436 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {'num_partition':47.6789}" 437 try: 438 self.n1ql_helper.run_cbq_query( 439 query=create_index_statement, 440 server=self.n1ql_node) 441 except Exception, ex: 442 self.log.info(str(ex)) 443 self.fail( 444 "Index did not get created with an double value for num_partition value : 47.6789") 445 else: 446 self.log.info("Index got created successfully with num_partition being a double value : 47.6789") 447 448 449 def test_partitioned_index_with_replica(self): 450 self._load_emp_dataset(end=self.num_items) 451 452 # Create partitioned index 453 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 454 self.num_index_replicas, self.num_index_partitions) 455 456 try: 457 self.n1ql_helper.run_cbq_query( 458 query=create_index_statement, 459 server=self.n1ql_node) 460 except Exception, ex: 461 self.log.info(str(ex)) 462 463 index_metadata = self.rest.get_indexer_metadata() 464 self.log.info("Indexer Metadata :::") 465 self.log.info(index_metadata) 466 467 self.assertTrue(self.validate_partition_map(index_metadata, "idx1", 468 self.num_index_replicas, 469 self.num_index_partitions), 470 "Partition map validation failed") 471 472 def test_partitioned_index_with_replica_with_server_groups(self): 473 self._load_emp_dataset(end=self.num_items) 474 self._create_server_groups() 475 476 # Create partitioned index 477 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}}}".format( 478 self.num_index_replicas) 479 480 try: 481 self.n1ql_helper.run_cbq_query( 482 query=create_index_statement, 483 server=self.n1ql_node) 484 except Exception, ex: 485 self.log.info(str(ex)) 486 487 index_metadata = self.rest.get_indexer_metadata() 488 489 index_hosts_list = [] 490 for index in index_metadata["status"]: 491 index_hosts_list.append(index["hosts"]) 492 493 self.log.info("Index Host List : {0}".format(index_hosts_list)) 494 495 # Need to change the validation logic here. Between index and its replicas, they should have a full set of partitions in both the server groups. 496 # idx11 - .101, .102: 3, 4, 5, 10, 11, 15, 16 497 # idx11 - .103, .104: 1, 2, 6, 7, 8, 9, 12, 13, 14 498 499 # idx12 - .101, .102: 1, 2, 6, 7, 8, 9, 12, 13, 14 500 # idx12 - .103, .104: 3, 4, 5, 10, 11, 15, 16 501 502 validation = True 503 for i in range(0, len(index_hosts_list)): 504 for j in range(i + 1, len(index_hosts_list)): 505 if (index_hosts_list[i].sort() != index_hosts_list[j].sort()): 506 continue 507 else: 508 validation &= False 509 510 self.assertTrue(validation, 511 "Partitions of replica indexes do not honour server grouping") 512 513 def test_create_partitioned_index_one_node_already_down(self): 514 self._load_emp_dataset(end=self.num_items) 515 516 node_out = self.servers[self.node_out] 517 failover_task = self.cluster.async_failover( 518 self.servers[:self.nodes_init], 519 [node_out], 520 self.graceful, wait_for_pending=60) 521 522 failover_task.result() 523 524 # Create partitioned index 525 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)" 526 527 try: 528 self.n1ql_helper.run_cbq_query( 529 query=create_index_statement, 530 server=self.n1ql_node) 531 except Exception, ex: 532 self.log.info(str(ex)) 533 self.fail("Failed to create index with one node failed") 534 535 if node_out == self.index_servers[0]: 536 rest = RestConnection(self.index_servers[1]) 537 else: 538 rest = self.rest 539 540 index_metadata = rest.get_indexer_metadata() 541 self.log.info("Indexer Metadata :::") 542 self.log.info(index_metadata) 543 544 hosts = index_metadata["status"][0]["hosts"] 545 self.log.info("Actual nodes : {0}".format(hosts)) 546 node_out_str = node_out.ip + ":" + node_out.port 547 self.assertTrue(node_out_str not in hosts, 548 "Partitioned index not created on expected hosts") 549 550 def test_create_partitioned_index_one_node_network_partitioned(self): 551 self._load_emp_dataset(end=self.num_items) 552 553 node_out = self.servers[self.node_out] 554 self.start_firewall_on_node(node_out) 555 self.sleep(10) 556 557 # Create partitioned index 558 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)" 559 560 try: 561 self.n1ql_helper.run_cbq_query( 562 query=create_index_statement, 563 server=self.n1ql_node) 564 except Exception, ex: 565 self.log.info(str(ex)) 566 self.fail("Failed to create index with one node failed") 567 finally: 568 # Heal network partition and wait for some time to allow indexes 569 # to get built automatically on that node 570 self.stop_firewall_on_node(node_out) 571 self.sleep(120) 572 573 index_metadata = self.rest.get_indexer_metadata() 574 self.log.info("Indexer Metadata :::") 575 self.log.info(index_metadata) 576 577 hosts = index_metadata["status"][0]["hosts"] 578 self.log.info("Actual nodes : {0}".format(hosts)) 579 node_out_str = node_out.ip + ":" + node_out.port 580 self.assertTrue(node_out_str not in hosts, 581 "Partitioned index not created on expected hosts") 582 583 def test_node_fails_during_create_partitioned_index(self): 584 self._load_emp_dataset(end=self.num_items) 585 586 node_out = self.servers[self.node_out] 587 588 # Create partitioned index 589 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)" 590 591 threads = [] 592 threads.append( 593 Thread(target=self.n1ql_helper.run_cbq_query, name="run_query", 594 args=(create_index_statement, 10, self.n1ql_node))) 595 threads.append( 596 Thread(target=self.cluster.failover, name="failover", args=( 597 self.servers[:self.nodes_init], [node_out], self.graceful, 598 False, 60))) 599 600 for thread in threads: 601 thread.start() 602 self.sleep(5) 603 for thread in threads: 604 thread.join() 605 606 self.sleep(30) 607 608 if node_out == self.index_servers[0]: 609 rest = RestConnection(self.index_servers[1]) 610 else: 611 rest = self.rest 612 613 index_metadata = rest.get_indexer_metadata() 614 self.log.info("Indexer Metadata :::") 615 self.log.info(index_metadata) 616 617 def test_node_nw_partitioned_during_create_partitioned_index(self): 618 self._load_emp_dataset(end=self.num_items) 619 620 node_out = self.servers[self.node_out] 621 622 # Create partitioned index 623 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)" 624 625 threads = [] 626 threads.append( 627 Thread(target=self.start_firewall_on_node, 628 name="network_partitioning", args=(node_out,))) 629 threads.append( 630 Thread(target=self.n1ql_helper.run_cbq_query, name="run_query", 631 args=(create_index_statement, 10, self.n1ql_node))) 632 633 for thread in threads: 634 thread.start() 635 self.sleep(5) 636 for thread in threads: 637 thread.join() 638 639 self.sleep(10) 640 641 try: 642 index_metadata = self.rest.get_indexer_metadata() 643 self.log.info("Indexer Metadata :::") 644 self.log.info(index_metadata) 645 if index_metadata != {}: 646 hosts = index_metadata["status"][0]["hosts"] 647 self.log.info("Actual nodes : {0}".format(hosts)) 648 node_out_str = node_out.ip + ":" + node_out.port 649 self.assertTrue(node_out_str not in hosts, 650 "Partitioned index not created on expected hosts") 651 else: 652 self.log.info( 653 "Cannot retrieve index metadata since one node is down") 654 except Exception, ex: 655 self.log.info(str(ex)) 656 finally: 657 self.stop_firewall_on_node(node_out) 658 self.sleep(30) 659 index_metadata = self.rest.get_indexer_metadata() 660 self.log.info("Indexer Metadata :::") 661 self.log.info(index_metadata) 662 663 hosts = index_metadata["status"][0]["hosts"] 664 node_out_str = node_out.ip + ":" + node_out.port 665 self.assertTrue(node_out_str in hosts, 666 "Partitioned index not created on all hosts") 667 668 def test_node_nw_partitioned_during_create_partitioned_index_with_node_list( 669 self): 670 self._load_emp_dataset(end=self.num_items) 671 672 node_out = self.servers[self.node_out] 673 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 674 675 # Create partitioned index 676 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'nodes' : {0}}}".format( 677 node_list_str) 678 679 threads = [] 680 681 threads.append( 682 Thread(target=self.start_firewall_on_node, 683 name="network_partitioning", args=(node_out,))) 684 threads.append( 685 Thread(target=self.n1ql_helper.run_cbq_query, name="run_query", 686 args=(create_index_statement, 10, self.n1ql_node))) 687 688 for thread in threads: 689 thread.start() 690 self.sleep(5) 691 for thread in threads: 692 thread.join() 693 694 self.sleep(10) 695 696 try: 697 index_metadata = self.rest.get_indexer_metadata() 698 self.log.info("Indexer Metadata :::") 699 self.log.info(index_metadata) 700 if index_metadata != {}: 701 hosts = index_metadata["status"][0]["hosts"] 702 self.log.info("Actual nodes : {0}".format(hosts)) 703 node_out_str = node_out.ip + ":" + node_out.port 704 self.assertTrue(node_out_str not in hosts, 705 "Partitioned index not created on expected hosts") 706 else: 707 self.log.info( 708 "Cannot retrieve index metadata since one node is down") 709 except Exception, ex: 710 self.log.info(str(ex)) 711 finally: 712 self.stop_firewall_on_node(node_out) 713 self.sleep(30) 714 index_metadata = self.rest.get_indexer_metadata() 715 self.log.info("Indexer Metadata :::") 716 self.log.info(index_metadata) 717 718 hosts = index_metadata["status"][0]["hosts"] 719 node_out_str = node_out.ip + ":" + node_out.port 720 self.assertTrue(node_out_str in hosts, 721 "Partitioned index not created on all hosts") 722 723 def test_build_partitioned_index(self): 724 self._load_emp_dataset(end=self.num_items) 725 726 index_name_prefix = "random_index_" + str( 727 random.randint(100000, 999999)) 728 if self.num_index_replicas > 0: 729 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'defer_build': true, 'num_replica':{1}}};".format( 730 self.num_index_partitions, self.num_index_replicas) 731 else: 732 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'defer_build': true}};".format( 733 self.num_index_partitions) 734 try: 735 self.n1ql_helper.run_cbq_query(query=create_index_query, 736 server=self.n1ql_node) 737 except Exception, ex: 738 self.log.info(str(ex)) 739 self.fail("index creation failed with error : {0}".format(str(ex))) 740 741 self.sleep(10) 742 index_map = self.get_index_map() 743 self.log.info(index_map) 744 745 index_metadata = self.rest.get_indexer_metadata() 746 self.log.info("Indexer Metadata Before Build:") 747 self.log.info(index_metadata) 748 749 index_details = {} 750 index_details["index_name"] = index_name_prefix 751 index_details["num_partitions"] = self.num_index_partitions 752 index_details["defer_build"] = True 753 754 self.assertTrue( 755 self.validate_partitioned_indexes(index_details, index_map, 756 index_metadata), 757 "Deferred Partitioned index created not as expected") 758 759 # Validation for replica indexes 760 if self.num_index_replicas > 0: 761 for i in range(1, self.num_index_replicas + 1): 762 index_details[ 763 "index_name"] = index_name_prefix + " (replica {0})".format( 764 str(i)) 765 self.assertTrue( 766 self.validate_partitioned_indexes(index_details, index_map, 767 index_metadata), 768 "Deferred Partitioned index created not as expected") 769 770 build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")" 771 772 try: 773 self.n1ql_helper.run_cbq_query(query=build_index_query, 774 server=self.n1ql_node) 775 except Exception, ex: 776 self.log.info(str(ex)) 777 self.fail("index building failed with error : {0}".format(str(ex))) 778 779 self.sleep(30) 780 index_map = self.get_index_map() 781 index_metadata = self.rest.get_indexer_metadata() 782 self.log.info("Indexer Metadata After Build:") 783 self.log.info(index_metadata) 784 785 index_details["index_name"] = index_name_prefix 786 index_details["defer_build"] = False 787 788 self.assertTrue( 789 self.validate_partitioned_indexes(index_details, index_map, 790 index_metadata), 791 "Deferred Partitioned index created not as expected") 792 # Validation for replica indexes 793 if self.num_index_replicas > 0: 794 for i in range(1, self.num_index_replicas + 1): 795 index_details[ 796 "index_name"] = index_name_prefix + " (replica {0})".format( 797 str(i)) 798 self.assertTrue( 799 self.validate_partitioned_indexes(index_details, index_map, 800 index_metadata), 801 "Deferred Partitioned index created not as expected") 802 803 def test_build_partitioned_index_one_failed_node(self): 804 self._load_emp_dataset(end=self.num_items) 805 806 index_name_prefix = "random_index_" + str( 807 random.randint(100000, 999999)) 808 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 809 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format( 810 self.num_index_partitions, node_list_str) 811 try: 812 self.n1ql_helper.run_cbq_query(query=create_index_query, 813 server=self.n1ql_node) 814 except Exception, ex: 815 self.log.info(str(ex)) 816 self.fail("index creation failed with error : {0}".format(str(ex))) 817 818 self.sleep(10) 819 index_map = self.get_index_map() 820 self.log.info(index_map) 821 822 index_metadata = self.rest.get_indexer_metadata() 823 self.log.info("Indexer Metadata Before Build:") 824 self.log.info(index_metadata) 825 826 index_details = {} 827 index_details["index_name"] = index_name_prefix 828 index_details["num_partitions"] = self.num_index_partitions 829 index_details["defer_build"] = True 830 831 self.assertTrue( 832 self.validate_partitioned_indexes(index_details, index_map, 833 index_metadata), 834 "Deferred Partitioned index created not as expected") 835 836 node_out = self.servers[self.node_out] 837 failover_task = self.cluster.async_failover( 838 self.servers[:self.nodes_init], 839 [node_out], 840 self.graceful, wait_for_pending=180) 841 842 failover_task.result() 843 844 build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")" 845 846 try: 847 self.n1ql_helper.run_cbq_query(query=build_index_query, 848 server=self.n1ql_node) 849 except Exception, ex: 850 self.log.info(str(ex)) 851 self.fail("index building failed with error : {0}".format(str(ex))) 852 853 self.sleep(30) 854 index_map = self.get_index_map() 855 if node_out == self.index_servers[0]: 856 rest = RestConnection(self.index_servers[1]) 857 else: 858 rest = self.rest 859 index_metadata = rest.get_indexer_metadata() 860 self.log.info("Indexer Metadata After Build:") 861 self.log.info(index_metadata) 862 863 index_details["defer_build"] = False 864 865 # At this point, since one node is in a failed state, all partitions would not be built. 866 self.assertTrue( 867 self.validate_partitioned_indexes(index_details, index_map, 868 index_metadata, skip_numpartitions_check=True), 869 "Deferred Partitioned index created not as expected") 870 871 # Recover the failed node and check if after recovery, all partitions are built. 872 if self.recover_failed_node: 873 nodes_all = self.rest.node_statuses() 874 for node in nodes_all: 875 if node.ip == node_out.ip: 876 break 877 878 self.rest.set_recovery_type(node.id, self.recovery_type) 879 self.rest.add_back_node(node.id) 880 881 rebalance = self.cluster.async_rebalance( 882 self.servers[:self.nodes_init], 883 [], []) 884 reached = RestHelper(self.rest).rebalance_reached() 885 self.assertTrue(reached, 886 "rebalance failed, stuck or did not complete") 887 rebalance.result() 888 self.sleep(180) 889 890 index_map = self.get_index_map() 891 index_metadata = self.rest.get_indexer_metadata() 892 self.log.info("Indexer Metadata After Build:") 893 self.log.info(index_metadata) 894 895 index_details["defer_build"] = False 896 897 self.assertTrue( 898 self.validate_partitioned_indexes(index_details, index_map, 899 index_metadata), 900 "Deferred Partitioned index created not as expected") 901 902 def test_failover_during_build_partitioned_index(self): 903 self._load_emp_dataset(end=self.num_items) 904 905 index_name_prefix = "random_index_" + str( 906 random.randint(100000, 999999)) 907 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 908 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format( 909 self.num_index_partitions, node_list_str) 910 try: 911 self.n1ql_helper.run_cbq_query(query=create_index_query, 912 server=self.n1ql_node) 913 except Exception, ex: 914 self.log.info(str(ex)) 915 self.fail("index creation failed with error : {0}".format(str(ex))) 916 917 self.sleep(10) 918 index_map = self.get_index_map() 919 self.log.info(index_map) 920 921 index_metadata = self.rest.get_indexer_metadata() 922 self.log.info("Indexer Metadata Before Build:") 923 self.log.info(index_metadata) 924 925 index_details = {} 926 index_details["index_name"] = index_name_prefix 927 index_details["num_partitions"] = self.num_index_partitions 928 index_details["defer_build"] = True 929 930 self.assertTrue( 931 self.validate_partitioned_indexes(index_details, index_map, 932 index_metadata), 933 "Deferred Partitioned index created not as expected") 934 935 node_out = self.servers[self.node_out] 936 build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")" 937 threads = [] 938 threads.append( 939 Thread(target=self.n1ql_helper.run_cbq_query, name="run_query", 940 args=(build_index_query, 10, self.n1ql_node))) 941 threads.append( 942 Thread(target=self.cluster.async_failover, name="failover", args=( 943 self.servers[:self.nodes_init], [node_out], self.graceful))) 944 for thread in threads: 945 thread.start() 946 thread.join() 947 self.sleep(30) 948 949 index_map = self.get_index_map() 950 if node_out == self.index_servers[0]: 951 rest = RestConnection(self.index_servers[1]) 952 else: 953 rest = self.rest 954 index_metadata = rest.get_indexer_metadata() 955 self.log.info("Indexer Metadata After Build:") 956 self.log.info(index_metadata) 957 958 index_details["defer_build"] = False 959 960 # At this point, since one node is in a failed state, all partitions would not be built. 961 self.assertTrue( 962 self.validate_partitioned_indexes(index_details, index_map, 963 index_metadata, skip_numpartitions_check=True), 964 "Deferred Partitioned index created not as expected") 965 966 def test_build_partitioned_index_with_network_partitioning(self): 967 self._load_emp_dataset(end=self.num_items) 968 969 index_name_prefix = "random_index_" + str( 970 random.randint(100000, 999999)) 971 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 972 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format( 973 self.num_index_partitions, node_list_str) 974 try: 975 self.n1ql_helper.run_cbq_query(query=create_index_query, 976 server=self.n1ql_node) 977 except Exception, ex: 978 self.log.info(str(ex)) 979 self.fail("index creation failed with error : {0}".format(str(ex))) 980 981 self.sleep(10) 982 index_map = self.get_index_map() 983 self.log.info(index_map) 984 985 index_metadata = self.rest.get_indexer_metadata() 986 self.log.info("Indexer Metadata Before Build:") 987 self.log.info(index_metadata) 988 989 index_details = {} 990 index_details["index_name"] = index_name_prefix 991 index_details["num_partitions"] = self.num_index_partitions 992 index_details["defer_build"] = True 993 994 self.assertTrue( 995 self.validate_partitioned_indexes(index_details, index_map, 996 index_metadata), 997 "Deferred Partitioned index created not as expected") 998 999 node_out = self.servers[self.node_out] 1000 build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")" 1001 1002 try: 1003 self.start_firewall_on_node(node_out) 1004 self.sleep(10) 1005 self.n1ql_helper.run_cbq_query(query=build_index_query, 1006 server=self.n1ql_node) 1007 except Exception, ex: 1008 self.log.info(str(ex)) 1009 if not "Index build will be retried in background" in str(ex): 1010 self.fail("index building failed with error : {0}".format(str(ex))) 1011 else: 1012 self.log.info("Index build failed with expected error") 1013 1014 finally: 1015 # Heal network partition and wait for some time to allow indexes 1016 # to get built automatically on that node 1017 self.stop_firewall_on_node(node_out) 1018 self.sleep(360) 1019 1020 index_map = self.get_index_map() 1021 index_metadata = self.rest.get_indexer_metadata() 1022 self.log.info("Indexer Metadata After Build:") 1023 self.log.info(index_metadata) 1024 1025 index_details["defer_build"] = False 1026 1027 self.assertTrue( 1028 self.validate_partitioned_indexes(index_details, index_map, 1029 index_metadata), 1030 "Deferred Partitioned index created not as expected") 1031 1032 def test_drop_partitioned_index(self): 1033 self._load_emp_dataset(end=self.num_items) 1034 1035 index_name_prefix = "random_index_" + str( 1036 random.randint(100000, 999999)) 1037 1038 with_clause = "WITH {{'num_partition': {0} ".format( 1039 self.num_index_partitions) 1040 if self.num_index_replicas > 0: 1041 with_clause += ", 'num_replica':{0}".format(self.num_index_replicas) 1042 if self.defer_build: 1043 with_clause += ", 'defer_build':True" 1044 with_clause += " }" 1045 1046 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI {0}".format( 1047 with_clause) 1048 1049 try: 1050 self.n1ql_helper.run_cbq_query(query=create_index_query, 1051 server=self.n1ql_node) 1052 except Exception, ex: 1053 self.log.info(str(ex)) 1054 self.fail( 1055 "index creation failed with error : {0}".format(str(ex))) 1056 1057 self.sleep(10) 1058 index_map = self.get_index_map() 1059 self.log.info(index_map) 1060 1061 index_metadata = self.rest.get_indexer_metadata() 1062 self.log.info("Indexer Metadata Before Build:") 1063 self.log.info(index_metadata) 1064 1065 index_details = {} 1066 index_details["index_name"] = index_name_prefix 1067 index_details["num_partitions"] = self.num_index_partitions 1068 index_details["defer_build"] = self.defer_build 1069 1070 self.assertTrue( 1071 self.validate_partitioned_indexes(index_details, index_map, 1072 index_metadata), 1073 "Deferred Partitioned index created not as expected") 1074 1075 # Validation for replica indexes 1076 if self.num_index_replicas > 0: 1077 for i in range(1, self.num_index_replicas + 1): 1078 index_details[ 1079 "index_name"] = index_name_prefix + " (replica {0})".format( 1080 str(i)) 1081 self.assertTrue( 1082 self.validate_partitioned_indexes(index_details, 1083 index_map, 1084 index_metadata), 1085 "Deferred Partitioned index created not as expected") 1086 1087 drop_index_query = "DROP INDEX `default`." + index_name_prefix 1088 1089 try: 1090 self.n1ql_helper.run_cbq_query(query=drop_index_query, 1091 server=self.n1ql_node) 1092 except Exception, ex: 1093 self.log.info(str(ex)) 1094 self.fail( 1095 "Drop index failed with error : {0}".format(str(ex))) 1096 1097 self.sleep(30) 1098 index_map = self.get_index_map() 1099 self.log.info("Index map after drop index: %s", index_map) 1100 if not index_map == {}: 1101 self.fail("Indexes not dropped correctly") 1102 1103 def test_delete_bucket_cascade_drop_partitioned_index(self): 1104 self._load_emp_dataset(end=self.num_items) 1105 1106 index_name_prefix = "random_index_" + str( 1107 random.randint(100000, 999999)) 1108 1109 with_clause = "WITH {{'num_partition': {0} ".format( 1110 self.num_index_partitions) 1111 if self.num_index_replicas > 0: 1112 with_clause += ", 'num_replica':{0}".format(self.num_index_replicas) 1113 if self.defer_build: 1114 with_clause += ", 'defer_build':True" 1115 with_clause += " }" 1116 1117 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI {0}".format( 1118 with_clause) 1119 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{0}}}".format( 1120 self.num_index_partitions) 1121 1122 try: 1123 self.n1ql_helper.run_cbq_query(query=create_index_query, 1124 server=self.n1ql_node) 1125 self.n1ql_helper.run_cbq_query(query=create_primary_index_statement, 1126 server=self.n1ql_node) 1127 except Exception, ex: 1128 self.log.info(str(ex)) 1129 self.fail( 1130 "index creation failed with error : {0}".format(str(ex))) 1131 1132 self.sleep(10) 1133 index_map = self.get_index_map() 1134 self.log.info(index_map) 1135 1136 index_metadata = self.rest.get_indexer_metadata() 1137 self.log.info("Indexer Metadata Before Build:") 1138 self.log.info(index_metadata) 1139 1140 index_details = {} 1141 index_details["index_name"] = index_name_prefix 1142 index_details["num_partitions"] = self.num_index_partitions 1143 index_details["defer_build"] = self.defer_build 1144 1145 self.assertTrue( 1146 self.validate_partitioned_indexes(index_details, index_map, 1147 index_metadata), 1148 "Deferred Partitioned index created not as expected") 1149 1150 # Validation for replica indexes 1151 if self.num_index_replicas > 0: 1152 for i in range(1, self.num_index_replicas + 1): 1153 index_details[ 1154 "index_name"] = index_name_prefix + " (replica {0})".format( 1155 str(i)) 1156 self.assertTrue( 1157 self.validate_partitioned_indexes(index_details, 1158 index_map, 1159 index_metadata), 1160 "Deferred Partitioned index created not as expected") 1161 1162 self.cluster.bucket_delete(server=self.master, bucket='default') 1163 1164 self.sleep(30) 1165 index_map = self.get_index_map() 1166 self.log.info("Index map after drop index: %s", index_map) 1167 if not index_map == {}: 1168 self.fail("Indexes not dropped correctly") 1169 1170 def test_drop_partitioned_index_one_failed_node(self): 1171 self._load_emp_dataset(end=self.num_items) 1172 1173 index_name_prefix = "random_index_" + str( 1174 random.randint(100000, 999999)) 1175 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 1176 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}}};".format( 1177 self.num_index_partitions, node_list_str) 1178 try: 1179 self.n1ql_helper.run_cbq_query(query=create_index_query, 1180 server=self.n1ql_node) 1181 except Exception, ex: 1182 self.log.info(str(ex)) 1183 self.fail( 1184 "index creation failed with error : {0}".format(str(ex))) 1185 1186 self.sleep(10) 1187 index_map = self.get_index_map() 1188 self.log.info(index_map) 1189 1190 index_metadata = self.rest.get_indexer_metadata() 1191 self.log.info("Indexer Metadata:") 1192 self.log.info(index_metadata) 1193 1194 index_details = {} 1195 index_details["index_name"] = index_name_prefix 1196 index_details["num_partitions"] = self.num_index_partitions 1197 index_details["defer_build"] = False 1198 1199 self.assertTrue( 1200 self.validate_partitioned_indexes(index_details, index_map, 1201 index_metadata), 1202 "Partitioned index created not as expected") 1203 1204 node_out = self.servers[self.node_out] 1205 failover_task = self.cluster.async_failover( 1206 self.servers[:self.nodes_init], 1207 [node_out], 1208 self.graceful, wait_for_pending=180) 1209 1210 failover_task.result() 1211 1212 drop_index_query = "DROP INDEX `default`." + index_name_prefix 1213 1214 try: 1215 self.n1ql_helper.run_cbq_query(query=drop_index_query, 1216 server=self.n1ql_node) 1217 except Exception, ex: 1218 self.log.info(str(ex)) 1219 self.fail( 1220 "Drop index failed with error : {0}".format(str(ex))) 1221 1222 self.sleep(30) 1223 index_map = self.get_index_map() 1224 self.log.info("Index map after drop index: %s", index_map) 1225 if not index_map == {}: 1226 self.fail("Indexes not dropped correctly") 1227 1228 if self.recover_failed_node: 1229 nodes_all = self.rest.node_statuses() 1230 for node in nodes_all: 1231 if node.ip == node_out.ip: 1232 break 1233 1234 self.rest.set_recovery_type(node.id, self.recovery_type) 1235 self.rest.add_back_node(node.id) 1236 1237 rebalance = self.cluster.async_rebalance( 1238 self.servers[:self.nodes_init], 1239 [], []) 1240 reached = RestHelper(self.rest).rebalance_reached() 1241 self.assertTrue(reached, 1242 "rebalance failed, stuck or did not complete") 1243 rebalance.result() 1244 self.sleep(180) 1245 1246 index_map = self.get_index_map() 1247 self.log.info("Index map after drop index: %s", index_map) 1248 if not index_map == {}: 1249 self.fail("Indexes not dropped correctly") 1250 1251 def test_failover_during_drop_partitioned_index(self): 1252 self._load_emp_dataset(end=self.num_items) 1253 1254 index_name_prefix = "random_index_" + str( 1255 random.randint(100000, 999999)) 1256 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 1257 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}}};".format( 1258 self.num_index_partitions, node_list_str) 1259 try: 1260 self.n1ql_helper.run_cbq_query(query=create_index_query, 1261 server=self.n1ql_node) 1262 except Exception, ex: 1263 self.log.info(str(ex)) 1264 self.fail("index creation failed with error : {0}".format( 1265 str(ex))) 1266 1267 self.sleep(10) 1268 index_map = self.get_index_map() 1269 self.log.info(index_map) 1270 1271 index_metadata = self.rest.get_indexer_metadata() 1272 self.log.info("Indexer Metadata:") 1273 self.log.info(index_metadata) 1274 1275 index_details = {} 1276 index_details["index_name"] = index_name_prefix 1277 index_details["num_partitions"] = self.num_index_partitions 1278 index_details["defer_build"] = False 1279 1280 self.assertTrue( 1281 self.validate_partitioned_indexes(index_details, index_map, 1282 index_metadata), 1283 "Partitioned index created not as expected") 1284 1285 node_out = self.servers[self.node_out] 1286 drop_index_query = "DROP INDEX `default`." + index_name_prefix 1287 threads = [] 1288 threads.append( 1289 Thread(target=self.n1ql_helper.run_cbq_query, 1290 name="run_query", 1291 args=(drop_index_query, 10, self.n1ql_node))) 1292 threads.append( 1293 Thread(target=self.cluster.async_failover, name="failover", 1294 args=( 1295 self.servers[:self.nodes_init], [node_out], 1296 self.graceful))) 1297 for thread in threads: 1298 thread.start() 1299 thread.join() 1300 self.sleep(30) 1301 1302 index_map = self.get_index_map() 1303 self.log.info("Index map after drop index: %s", index_map) 1304 if not index_map == {}: 1305 self.fail("Indexes not dropped correctly") 1306 1307 def test_drop_partitioned_index_with_network_partitioning(self): 1308 self._load_emp_dataset(end=self.num_items) 1309 1310 index_name_prefix = "random_index_" + str( 1311 random.randint(100000, 999999)) 1312 node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]" 1313 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI WITH {{'num_partition': {0}, 'nodes': {1}}};".format( 1314 self.num_index_partitions, node_list_str) 1315 try: 1316 self.n1ql_helper.run_cbq_query(query=create_index_query, 1317 server=self.n1ql_node) 1318 except Exception, ex: 1319 self.log.info(str(ex)) 1320 self.fail( 1321 "index creation failed with error : {0}".format(str(ex))) 1322 1323 self.sleep(10) 1324 index_map = self.get_index_map() 1325 self.log.info(index_map) 1326 1327 index_metadata = self.rest.get_indexer_metadata() 1328 self.log.info("Indexer Metadata Before Build:") 1329 self.log.info(index_metadata) 1330 1331 index_details = {} 1332 index_details["index_name"] = index_name_prefix 1333 index_details["num_partitions"] = self.num_index_partitions 1334 index_details["defer_build"] = False 1335 1336 self.assertTrue( 1337 self.validate_partitioned_indexes(index_details, index_map, 1338 index_metadata), 1339 "Partitioned index created not as expected") 1340 1341 node_out = self.servers[self.node_out] 1342 self.start_firewall_on_node(node_out) 1343 1344 drop_index_query = "DROP INDEX `default`." + index_name_prefix 1345 1346 try: 1347 self.start_firewall_on_node(node_out) 1348 self.sleep(10) 1349 self.n1ql_helper.run_cbq_query(query=drop_index_query, 1350 server=self.n1ql_node) 1351 except Exception, ex: 1352 self.log.info(str(ex)) 1353 if not "the operation will automaticaly retry after cluster is back to normal" in str(ex): 1354 self.fail( 1355 "index drop failed with error : {0}".format(str(ex))) 1356 else: 1357 self.log.info("Index drop failed with expected error") 1358 1359 finally: 1360 # Heal network partition and wait for some time to allow indexes 1361 # to get built automatically on that node 1362 self.stop_firewall_on_node(node_out) 1363 self.sleep(360) 1364 1365 index_map = self.get_index_map() 1366 self.log.info("Index map after drop index: %s", index_map) 1367 if not index_map == {}: 1368 self.fail("Indexes not dropped correctly") 1369 1370 def test_partitioned_index_warmup_behaviour(self): 1371 node_out = self.servers[self.node_out] 1372 1373 self._load_emp_dataset(end=self.num_items) 1374 1375 index_name_prefix = "random_index_" + str( 1376 random.randint(100000, 999999)) 1377 1378 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI" 1379 if self.defer_build: 1380 create_index_query += " WITH {'defer_build':true}" 1381 1382 try: 1383 self.n1ql_helper.run_cbq_query(query=create_index_query, 1384 server=self.n1ql_node) 1385 except Exception, ex: 1386 self.log.info(str(ex)) 1387 self.fail( 1388 "index creation failed with error : {0}".format(str(ex))) 1389 1390 self.sleep(10) 1391 index_map = self.get_index_map() 1392 self.log.info(index_map) 1393 1394 index_metadata = self.rest.get_indexer_metadata() 1395 self.log.info("Indexer Metadata Before:") 1396 self.log.info(index_metadata) 1397 1398 index_details = {} 1399 index_details["index_name"] = index_name_prefix 1400 index_details["num_partitions"] = self.num_index_partitions 1401 index_details["defer_build"] = self.defer_build 1402 1403 self.assertTrue( 1404 self.validate_partitioned_indexes(index_details, index_map, 1405 index_metadata), 1406 "Partitioned index created not as expected") 1407 1408 remote_client = RemoteMachineShellConnection(node_out) 1409 if self.node_operation == "kill_indexer": 1410 remote_client.terminate_process(process_name="indexer") 1411 remote_client.disconnect() 1412 else: 1413 self.reboot_node(node_out) 1414 1415 # wait for restart and warmup on all node 1416 self.sleep(self.wait_timeout * 3) 1417 # disable firewall on these nodes 1418 self.stop_firewall_on_node(node_out) 1419 # wait till node is ready after warmup 1420 ClusterOperationHelper.wait_for_ns_servers_or_assert([node_out], self, 1421 wait_if_warmup=True) 1422 1423 index_map = self.get_index_map() 1424 self.log.info(index_map) 1425 1426 index_metadata = self.rest.get_indexer_metadata() 1427 self.log.info("Indexer Metadata After:") 1428 self.log.info(index_metadata) 1429 1430 self.assertTrue( 1431 self.validate_partitioned_indexes(index_details, index_map, 1432 index_metadata), 1433 "Partitioned index warmup behavior not as expected") 1434 1435 def test_mutations_on_partitioned_indexes(self): 1436 self.run_async_index_operations(operation_type="create_index") 1437 self.run_doc_ops() 1438 self.sleep(30) 1439 1440 # Get item counts 1441 bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes() 1442 1443 self.assertEqual(bucket_item_count, total_item_count, 1444 "# Items indexed {0} do not match bucket items {1}".format( 1445 total_item_count, bucket_item_count)) 1446 1447 def test_update_mutations_on_indexed_keys_partitioned_indexes(self): 1448 create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(name) USING GSI;" 1449 try: 1450 self.n1ql_helper.run_cbq_query(query=create_index_query, 1451 server=self.n1ql_node) 1452 except Exception, ex: 1453 self.log.info(str(ex)) 1454 self.fail( 1455 "index creation failed with error : {0}".format(str(ex))) 1456 self.run_doc_ops() 1457 self.sleep(30) 1458 1459 # Get item counts 1460 bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes( 1461 index_name="idx1") 1462 1463 self.assertEqual(bucket_item_count, total_item_count, 1464 "# Items indexed {0} do not match bucket items {1}".format( 1465 total_item_count, bucket_item_count)) 1466 1467 def test_kv_full_rollback_on_partitioned_indexes(self): 1468 self.run_async_index_operations(operation_type="create_index") 1469 self.sleep(30) 1470 1471 self.cluster.bucket_flush(self.master) 1472 self.sleep(60) 1473 1474 # Get item counts 1475 bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes() 1476 1477 self.assertEqual(total_item_count, 0, "Rollback to zero fails") 1478 1479 def test_kv_partial_rollback_on_partitioned_indexes(self): 1480 self.run_async_index_operations(operation_type="create_index") 1481 1482 # Stop Persistence on Node A & Node B 1483 self.log.info("Stopping persistence on NodeA & NodeB") 1484 mem_client = MemcachedClientHelper.direct_client(self.servers[0], 1485 "default") 1486 mem_client.stop_persistence() 1487 mem_client = MemcachedClientHelper.direct_client(self.servers[1], 1488 "default") 1489 mem_client.stop_persistence() 1490 1491 self.run_doc_ops() 1492 1493 self.sleep(10) 1494 1495 # Get count before rollback 1496 bucket_count_before_rollback, item_count_before_rollback, num_docs_processed_before_rollback = self.get_stats_for_partitioned_indexes() 1497 1498 # Kill memcached on Node A so that Node B becomes master 1499 self.log.info("Kill Memcached process on NodeA") 1500 shell = RemoteMachineShellConnection(self.master) 1501 shell.kill_memcached() 1502 1503 # Start persistence on Node B 1504 self.log.info("Starting persistence on NodeB") 1505 mem_client = MemcachedClientHelper.direct_client( 1506 self.input.servers[1], "default") 1507 mem_client.start_persistence() 1508 1509 # Failover Node B 1510 self.log.info("Failing over NodeB") 1511 self.sleep(10) 1512 failover_task = self.cluster.async_failover( 1513 self.servers[:self.nodes_init], [self.servers[1]], self.graceful, 1514 wait_for_pending=120) 1515 1516 failover_task.result() 1517 1518 # Wait for a couple of mins to allow rollback to complete 1519 self.sleep(120) 1520 1521 # Get count after rollback 1522 bucket_count_after_rollback, item_count_after_rollback, num_docs_processed_after_rollback = self.get_stats_for_partitioned_indexes() 1523 1524 self.assertEqual(bucket_count_after_rollback, item_count_after_rollback, 1525 "Partial KV Rollback not processed by Partitioned indexes") 1526 1527 def test_scan_availability(self): 1528 create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(BASE64(meta().id)) USING GSI" 1529 if self.num_index_replicas: 1530 create_index_query += " with {{'num_replica':{0}}};".format( 1531 self.num_index_replicas) 1532 try: 1533 self.n1ql_helper.run_cbq_query(query=create_index_query, 1534 server=self.n1ql_node) 1535 except Exception, ex: 1536 self.log.info(str(ex)) 1537 self.fail( 1538 "index creation failed with error : {0}".format(str(ex))) 1539 1540 node_out = self.servers[self.node_out] 1541 failover_task = self.cluster.async_failover( 1542 self.servers[:self.nodes_init], 1543 [node_out], 1544 self.graceful, wait_for_pending=60) 1545 1546 failover_task.result() 1547 1548 self.sleep(30) 1549 1550 # Run query 1551 scan_query = "select name,mutated from default where name > 'a' and mutated >=0;" 1552 try: 1553 self.n1ql_helper.run_cbq_query(query=scan_query, 1554 server=self.n1ql_node) 1555 except Exception, ex: 1556 self.log.info(str(ex)) 1557 if self.num_index_replicas == 0: 1558 if self.expected_err_msg in str(ex): 1559 pass 1560 else: 1561 self.fail( 1562 "Scan failed with unexpected error message".format( 1563 str(ex))) 1564 else: 1565 self.fail("Scan failed") 1566 1567 def test_scan_availability_with_network_partitioning(self): 1568 create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(BASE64(meta().id)) USING GSI" 1569 if self.num_index_replicas: 1570 create_index_query += " with {{'num_replica':{0}}};".format( 1571 self.num_index_replicas) 1572 try: 1573 self.n1ql_helper.run_cbq_query(query=create_index_query, 1574 server=self.n1ql_node) 1575 except Exception, ex: 1576 self.log.info(str(ex)) 1577 self.fail( 1578 "index creation failed with error : {0}".format(str(ex))) 1579 1580 # Induce network partitioning on one of the nodes 1581 node_out = self.servers[self.node_out] 1582 self.start_firewall_on_node(node_out) 1583 1584 # Run query 1585 scan_query = "select name,mutated from default where name > 'a' and mutated >=0;" 1586 try: 1587 self.n1ql_helper.run_cbq_query(query=scan_query, 1588 server=self.n1ql_node) 1589 except Exception, ex: 1590 self.log.info( 1591 "Scan failed as one indexer node was experiencing network partititioning. Error : %s", 1592 str(ex)) 1593 1594 # Heal Network Partitioning 1595 self.stop_firewall_on_node(node_out) 1596 1597 # Re-run query 1598 scan_query = "select name,mutated from default where name > 'a' and mutated >=0;" 1599 try: 1600 self.n1ql_helper.run_cbq_query(query=scan_query, 1601 server=self.n1ql_node) 1602 except Exception, ex: 1603 self.log.info(str(ex)) 1604 if self.num_index_replicas: 1605 if self.expected_err_msg in str(ex): 1606 pass 1607 else: 1608 self.fail( 1609 "Scan failed with unexpected error message".format( 1610 str(ex))) 1611 else: 1612 self.fail("Scan failed") 1613 1614 def test_index_scans(self): 1615 self._load_emp_dataset(end=self.num_items) 1616 1617 # Create Partitioned and non-partitioned indexes 1618 1619 if self.num_index_partitions > 0: 1620 self.rest.set_index_settings( 1621 {"indexer.numPartitions": self.num_index_partitions}) 1622 1623 create_partitioned_index1_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(name,dept,salary) USING GSI;" 1624 create_index1_query = "CREATE INDEX non_partitioned_idx1 ON default(name,dept,salary) USING GSI;" 1625 create_partitioned_index2_query = "create index partitioned_idx2 on default(name,manages.team_size) partition by hash(manages.team_size) USING GSI;" 1626 create_index2_query = "create index non_partitioned_idx2 on default(name,manages.team_size) USING GSI;" 1627 create_partitioned_index3_query = "create index partitioned_idx3 on default(name,manages.team_size) partition by hash(name,manages.team_size) USING GSI;" 1628 1629 try: 1630 self.n1ql_helper.run_cbq_query( 1631 query=create_partitioned_index1_query, 1632 server=self.n1ql_node) 1633 self.n1ql_helper.run_cbq_query(query=create_index1_query, 1634 server=self.n1ql_node) 1635 self.n1ql_helper.run_cbq_query( 1636 query=create_partitioned_index2_query, 1637 server=self.n1ql_node) 1638 self.n1ql_helper.run_cbq_query(query=create_index2_query, 1639 server=self.n1ql_node) 1640 self.n1ql_helper.run_cbq_query( 1641 query=create_partitioned_index3_query, 1642 server=self.n1ql_node) 1643 except Exception, ex: 1644 self.log.info(str(ex)) 1645 self.fail( 1646 "index creation failed with error : {0}".format(str(ex))) 1647 1648 # Scans 1649 1650 queries = [] 1651 1652 # 1. Small lookup query with equality predicate on the partition key 1653 query_details = {} 1654 query_details[ 1655 "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name='Safiya Palmer'" 1656 query_details["partitioned_idx_name"] = "partitioned_idx1" 1657 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1658 queries.append(query_details) 1659 1660 # 2. Pagination query with equality predicate on the partition key 1661 query_details = {} 1662 query_details[ 1663 "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND dept='HR' offset 0 limit 10" 1664 query_details["partitioned_idx_name"] = "partitioned_idx1" 1665 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1666 queries.append(query_details) 1667 1668 # 3. Large aggregated query 1669 query_details = {} 1670 query_details[ 1671 "query"] = "select count(name), dept from default USE INDEX (indexname USING GSI) where name is not missing group by dept" 1672 query_details["partitioned_idx_name"] = "partitioned_idx1" 1673 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1674 queries.append(query_details) 1675 1676 # 4. Scan with large result sets 1677 query_details = {} 1678 query_details[ 1679 "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 10000" 1680 query_details["partitioned_idx_name"] = "partitioned_idx1" 1681 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1682 queries.append(query_details) 1683 1684 # 5. Scan that does not require sorted data 1685 query_details = {} 1686 query_details[ 1687 "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 100000" 1688 query_details["partitioned_idx_name"] = "partitioned_idx1" 1689 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1690 queries.append(query_details) 1691 1692 # 6. Scan that requires sorted data 1693 query_details = {} 1694 query_details[ 1695 "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 10000 order by dept asc,salary desc" 1696 query_details["partitioned_idx_name"] = "partitioned_idx1" 1697 query_details["non_partitioned_idx_name"] = "non_partitioned_idx1" 1698 queries.append(query_details) 1699 1700 # 7. Scan with predicate on a dataset that has some values for the partition key missing, and present for some 1701 query_details = {} 1702 query_details[ 1703 "query"] = "select name from default USE INDEX (indexname USING GSI) where name is not missing AND manages.team_size > 3" 1704 query_details["partitioned_idx_name"] = "partitioned_idx2" 1705 query_details["non_partitioned_idx_name"] = "non_partitioned_idx2" 1706 queries.append(query_details) 1707 1708 # 8. Index partitioned on multiple keys. Scan with predicate on multiple keys with a dataset that has some values for the partition keys missing, and present for some 1709 query_details = {} 1710 query_details[ 1711 "query"] = "select name from default USE INDEX (indexname USING GSI) where manages.team_size >= 3 and manages.team_size <= 7 and name like 'A%'" 1712 query_details["partitioned_idx_name"] = "partitioned_idx3" 1713 query_details["non_partitioned_idx_name"] = "non_partitioned_idx2" 1714 queries.append(query_details) 1715 1716 # 9. Overlap scans on partition keys 1717 query_details = {} 1718 query_details[ 1719 "query"] = "select name from default USE INDEX (indexname USING GSI) where name is not missing AND (manages.team_size >= 3 or manages.team_size >= 7)" 1720 query_details["partitioned_idx_name"] = "partitioned_idx2" 1721 query_details["non_partitioned_idx_name"] = "non_partitioned_idx2" 1722 queries.append(query_details) 1723 1724 total_scans = 0 1725 failures = 0 1726 for query_details in queries: 1727 total_scans += 1 1728 1729 try: 1730 query_partitioned_index = query_details["query"].replace( 1731 "indexname", query_details["partitioned_idx_name"]) 1732 query_non_partitioned_index = query_details["query"].replace( 1733 "indexname", query_details["non_partitioned_idx_name"]) 1734 1735 result_partitioned_index = \ 1736 self.n1ql_helper.run_cbq_query( 1737 query=query_partitioned_index, 1738 min_output_size=10000000, 1739 server=self.n1ql_node)["results"] 1740 result_non_partitioned_index = self.n1ql_helper.run_cbq_query( 1741 query=query_non_partitioned_index, min_output_size=10000000, 1742 server=self.n1ql_node)["results"] 1743 1744 self.log.info("Partitioned : {0}".format( 1745 str(result_partitioned_index.sort()))) 1746 self.log.info("Non Partitioned : {0}".format( 1747 str(result_non_partitioned_index.sort()))) 1748 1749 if result_partitioned_index.sort() != result_non_partitioned_index.sort(): 1750 failures += 1 1751 self.log.info( 1752 "*** This query does not return same results for partitioned and non-partitioned indexes.") 1753 except Exception, ex: 1754 self.log.info(str(ex)) 1755 1756 self.log.info( 1757 "Total scans : {0}, Matching results : {1}, Non-matching results : {2}".format( 1758 total_scans, total_scans - failures, failures)) 1759 self.assertEqual(failures, 0, 1760 "Some scans did not yield the same results for partitioned index and non-partitioned indexes. Details above.") 1761 1762 def test_load_balancing_amongst_partitioned_index_replicas(self): 1763 index_name_prefix = "random_index_" + str( 1764 random.randint(100000, 999999)) 1765 create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(age) partition by hash (meta().id) USING GSI WITH {{'num_replica': {0},'num_partition':{1}}};".format( 1766 self.num_index_replicas, self.num_index_partitions) 1767 select_query = "SELECT count(age) from default" 1768 try: 1769 self.n1ql_helper.run_cbq_query(query=create_index_query, 1770 server=self.n1ql_node) 1771 except Exception, ex: 1772 self.log.info(str(ex)) 1773 if self.expected_err_msg not in str(ex): 1774 self.fail( 1775 "index creation did not fail with expected error : {0}".format( 1776 str(ex))) 1777 else: 1778 self.log.info("Index creation failed as expected") 1779 self.sleep(30) 1780 index_map = self.get_index_map() 1781 self.log.info(index_map) 1782 1783 index_metadata = self.rest.get_indexer_metadata() 1784 self.log.info("Indexer Metadata Before Build:") 1785 self.log.info(index_metadata) 1786 1787 index_details = {} 1788 index_details["index_name"] = index_name_prefix 1789 index_details["num_partitions"] = self.num_index_partitions 1790 index_details["defer_build"] = False 1791 1792 self.assertTrue( 1793 self.validate_partitioned_indexes(index_details, index_map, 1794 index_metadata), 1795 "Partitioned index created not as expected") 1796 1797 self.assertTrue(self.validate_partition_map(index_metadata, index_name_prefix, 1798 self.num_index_replicas, 1799 self.num_index_partitions), 1800 "Partition map validation failed") 1801 1802 # Run select query 100 times 1803 for i in range(0, 100): 1804 self.n1ql_helper.run_cbq_query(query=select_query, 1805 server=self.n1ql_node) 1806 1807 index_stats = self.get_index_stats(perNode=True) 1808 load_balanced = True 1809 for i in range(0, self.num_index_replicas + 1): 1810 if i == 0: 1811 index_name = index_name_prefix 1812 else: 1813 index_name = index_name_prefix + " (replica {0})".format(str(i)) 1814 1815 hosts, _ = self.n1ql_helper.get_index_details_using_index_name( 1816 index_name, index_map) 1817 for hostname in hosts: 1818 num_request_served = index_stats[hostname]['default'][index_name][ 1819 "num_completed_requests"] 1820 self.log.info("# Requests served by %s on %s = %s" % ( 1821 index_name, hostname, num_request_served)) 1822 if num_request_served == 0: 1823 load_balanced = False 1824 1825 if not load_balanced: 1826 self.fail("Load is not balanced amongst index replicas") 1827 1828 def test_indexer_pushdowns_multiscan(self): 1829 self._load_emp_dataset(end=self.num_items) 1830 1831 # Create Partitioned indexes 1832 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 1833 self.num_index_partitions) 1834 1835 try: 1836 self.n1ql_helper.run_cbq_query( 1837 query=create_partitioned_index_query, 1838 server=self.n1ql_node) 1839 except Exception, ex: 1840 self.log.info(str(ex)) 1841 self.fail( 1842 "index creation failed with error : {0}".format(str(ex))) 1843 1844 explain_query1 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary > 120000 and salary < 150000" 1845 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 1846 server=self.n1ql_node) 1847 1848 self.log.info("Explain plan for query 1 : {0}".format(results)) 1849 1850 span_pushdown, _, _, _, _ = self.validate_query_plan( 1851 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1852 num_spans=3) 1853 1854 self.assertTrue(span_pushdown, "Operators not pushed down to indexer") 1855 1856 explain_query2 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary BETWEEN 120000 and 150000" 1857 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 1858 server=self.n1ql_node) 1859 1860 self.log.info("Explain plan for query 2 : {0}".format(results)) 1861 1862 span_pushdown, _, _, _, _ = self.validate_query_plan( 1863 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1864 num_spans=3) 1865 1866 self.assertTrue(span_pushdown, "Operators not pushed down to indexer") 1867 1868 explain_query3 = "EXPLAIN select name from default where name is not missing and dept='HR' and (salary > 120000 or salary > 180000)" 1869 results = self.n1ql_helper.run_cbq_query(query=explain_query3, 1870 server=self.n1ql_node) 1871 1872 self.log.info("Explain plan for query 3 : {0}".format(results)) 1873 1874 span_pushdown, _, _, _, _ = self.validate_query_plan( 1875 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1876 num_spans=3) 1877 1878 self.assertTrue(span_pushdown, "Operators not pushed down to indexer") 1879 1880 def test_indexer_pushdowns_offset_limit(self): 1881 self._load_emp_dataset(end=self.num_items) 1882 1883 # Create Partitioned indexes 1884 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 1885 self.num_index_partitions) 1886 1887 try: 1888 self.n1ql_helper.run_cbq_query( 1889 query=create_partitioned_index_query, 1890 server=self.n1ql_node) 1891 except Exception, ex: 1892 self.log.info(str(ex)) 1893 self.fail( 1894 "index creation failed with error : {0}".format(str(ex))) 1895 1896 explain_query1 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary > 120000 and salary < 150000 OFFSET 10 LIMIT 10" 1897 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 1898 server=self.n1ql_node) 1899 1900 self.log.info("Explain plan for query 1 : {0}".format(results)) 1901 1902 _, limit_pushdown, offset_pushdown, _, _ = self.validate_query_plan( 1903 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1904 offset=10, limit=10) 1905 1906 self.assertTrue(limit_pushdown & offset_pushdown, 1907 "Operators not pushed down to indexer") 1908 1909 def test_indexer_pushdowns_projection(self): 1910 self._load_emp_dataset(end=self.num_items) 1911 1912 # Create Partitioned indexes 1913 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 1914 self.num_index_partitions) 1915 1916 try: 1917 self.n1ql_helper.run_cbq_query( 1918 query=create_partitioned_index_query, 1919 server=self.n1ql_node) 1920 except Exception, ex: 1921 self.log.info(str(ex)) 1922 self.fail( 1923 "index creation failed with error : {0}".format(str(ex))) 1924 1925 explain_query1 = "EXPLAIN select name from default where name is not missing and lower(dept) > 'accounts'" 1926 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 1927 server=self.n1ql_node) 1928 1929 self.log.info("Explain plan for query 1 : {0}".format(results)) 1930 1931 self.sleep(30) 1932 1933 _, _, _, projection_pushdown, _ = self.validate_query_plan( 1934 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1935 projection_list=[0, 1]) 1936 1937 self.assertTrue(projection_pushdown, 1938 "Operators not pushed down to indexer") 1939 1940 explain_query2 = "EXPLAIN select name,dept,salary from default where name is not missing and lower(dept) > 'accounts'" 1941 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 1942 server=self.n1ql_node) 1943 1944 self.log.info("Explain plan for query 1 : {0}".format(results)) 1945 1946 _, _, _, projection_pushdown, _ = self.validate_query_plan( 1947 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1948 projection_list=[0, 1, 2]) 1949 1950 self.assertTrue(projection_pushdown, 1951 "Operators not pushed down to indexer") 1952 1953 explain_query3 = "EXPLAIN select meta().id from default where name is not missing and lower(dept) > 'accounts'" 1954 results = self.n1ql_helper.run_cbq_query(query=explain_query3, 1955 server=self.n1ql_node) 1956 1957 self.log.info("Explain plan for query 1 : {0}".format(results)) 1958 1959 _, _, _, projection_pushdown, _ = self.validate_query_plan( 1960 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 1961 projection_list=[0, 1]) 1962 1963 self.assertTrue(projection_pushdown, 1964 "Operators not pushed down to indexer") 1965 1966 def test_indexer_pushdowns_sorting(self): 1967 self._load_emp_dataset(end=self.num_items) 1968 1969 # Create Partitioned indexes 1970 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 1971 self.num_index_partitions) 1972 1973 try: 1974 self.n1ql_helper.run_cbq_query( 1975 query=create_partitioned_index_query, 1976 server=self.n1ql_node) 1977 except Exception, ex: 1978 self.log.info(str(ex)) 1979 self.fail( 1980 "index creation failed with error : {0}".format(str(ex))) 1981 1982 explain_query1 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept,salary" 1983 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 1984 server=self.n1ql_node) 1985 1986 self.log.info("Explain plan for query 1 : {0}".format(results)) 1987 1988 _, _, _, _, sorting_pushdown = self.validate_query_plan( 1989 plan=results["results"][0]["plan"], 1990 index_name="partitioned_idx1", 1991 index_order_list=[{'keypos': 0}, {'keypos': 1}, {'keypos': 2}]) 1992 1993 self.assertTrue(sorting_pushdown, 1994 "Operators not pushed down to indexer") 1995 1996 explain_query2 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept" 1997 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 1998 server=self.n1ql_node) 1999 2000 self.log.info("Explain plan for query 2 : {0}".format(results)) 2001 2002 _, _, _, _, sorting_pushdown = self.validate_query_plan( 2003 plan=results["results"][0]["plan"], 2004 index_name="partitioned_idx1", 2005 index_order_list=[{'keypos': 0}, {'keypos': 1}]) 2006 2007 self.assertTrue(sorting_pushdown, 2008 "Operators not pushed down to indexer") 2009 2010 explain_query3 = "EXPLAIN select meta().id from default where name is not missing order by name" 2011 results = self.n1ql_helper.run_cbq_query(query=explain_query3, 2012 server=self.n1ql_node) 2013 2014 self.log.info("Explain plan for query 3 : {0}".format(results)) 2015 2016 _, _, _, _, sorting_pushdown = self.validate_query_plan( 2017 plan=results["results"][0]["plan"], 2018 index_name="partitioned_idx1", 2019 index_order_list=[{'keypos': 0}]) 2020 2021 self.assertTrue(sorting_pushdown, 2022 "Operators not pushed down to indexer") 2023 2024 def test_indexer_pushdowns_sorting_desc(self): 2025 self._load_emp_dataset(end=self.num_items) 2026 2027 # Create Partitioned indexes 2028 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary desc) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 2029 self.num_index_partitions) 2030 2031 try: 2032 self.n1ql_helper.run_cbq_query( 2033 query=create_partitioned_index_query, 2034 server=self.n1ql_node) 2035 except Exception, ex: 2036 self.log.info(str(ex)) 2037 self.fail( 2038 "index creation failed with error : {0}".format(str(ex))) 2039 2040 explain_query1 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept,salary desc" 2041 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2042 server=self.n1ql_node) 2043 2044 self.log.info("Explain plan for query 1 : {0}".format(results)) 2045 2046 _, _, _, _, sorting_pushdown = self.validate_query_plan( 2047 plan=results["results"][0]["plan"], 2048 index_name="partitioned_idx1", 2049 index_order_list=[{'keypos': 0}, {'keypos': 1}, 2050 {"desc": True, 'keypos': 2}]) 2051 2052 self.assertTrue(sorting_pushdown, 2053 "Operators not pushed down to indexer") 2054 2055 def test_multiple_operator_indexer_pushdowns(self): 2056 self._load_emp_dataset(end=self.num_items) 2057 2058 # Create Partitioned indexes 2059 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 2060 self.num_index_partitions) 2061 2062 try: 2063 self.n1ql_helper.run_cbq_query( 2064 query=create_partitioned_index_query, 2065 server=self.n1ql_node) 2066 except Exception, ex: 2067 self.log.info(str(ex)) 2068 self.fail( 2069 "index creation failed with error : {0}".format(str(ex))) 2070 2071 explain_query1 = "EXPLAIN select name from default where name is not missing order by name OFFSET 10 LIMIT 10" 2072 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2073 server=self.n1ql_node) 2074 2075 self.log.info("Explain plan for query 1 : {0}".format(results)) 2076 2077 scan_pushdown, limit_pushdown, offset_pushdown, projection_pushdown, sorting_pushdown = self.validate_query_plan( 2078 plan=results["results"][0]["plan"], index_name="partitioned_idx1", 2079 num_spans=1, offset=10, limit=10, index_order_list=[{'keypos': 0}], 2080 projection_list=[0]) 2081 2082 self.assertTrue( 2083 scan_pushdown & limit_pushdown & offset_pushdown & projection_pushdown & sorting_pushdown, 2084 "Operators not pushed down to indexer") 2085 2086 def test_aggregate_indexer_pushdowns_group_by_leading_keys(self): 2087 self._load_emp_dataset(end=self.num_items) 2088 2089 # Create Partitioned indexes 2090 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format( 2091 self.num_index_partitions) 2092 2093 try: 2094 self.n1ql_helper.run_cbq_query( 2095 query=create_partitioned_index_query, 2096 server=self.n1ql_node) 2097 except Exception, ex: 2098 self.log.info(str(ex)) 2099 self.fail( 2100 "index creation failed with error : {0}".format(str(ex))) 2101 2102 explain_query1 = "EXPLAIN select dept,count(*) from default where dept is not missing GROUP BY dept" 2103 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2104 server=self.n1ql_node) 2105 2106 self.log.info("Explain plan for query 1 : {0}".format(results)) 2107 2108 agg_pushdown = False 2109 if "index_group_aggs" in str(results): 2110 agg_pushdown = True 2111 2112 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2113 2114 explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept" 2115 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 2116 server=self.n1ql_node) 2117 2118 self.log.info("Explain plan for query 2 : {0}".format(results)) 2119 2120 agg_pushdown = False 2121 if "index_group_aggs" in str(results): 2122 agg_pushdown = True 2123 2124 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2125 2126 def test_aggregate_indexer_pushdowns_group_by_partition_keys(self): 2127 self._load_emp_dataset(end=self.num_items) 2128 2129 # Create Partitioned indexes 2130 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(LOWER(name),UPPER(dept)) USING GSI with {{'num_partition':{0}}};".format( 2131 self.num_index_partitions) 2132 2133 try: 2134 self.n1ql_helper.run_cbq_query( 2135 query=create_partitioned_index_query, 2136 server=self.n1ql_node) 2137 except Exception, ex: 2138 self.log.info(str(ex)) 2139 self.fail( 2140 "index creation failed with error : {0}".format(str(ex))) 2141 2142 explain_query1 = "EXPLAIN select name,dept,count(*) from default where dept is not missing GROUP BY name,dept" 2143 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2144 server=self.n1ql_node) 2145 2146 self.log.info("Explain plan for query 1 : {0}".format(results)) 2147 2148 agg_pushdown = False 2149 if "index_group_aggs" in str(results): 2150 agg_pushdown = True 2151 2152 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2153 2154 explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept" 2155 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 2156 server=self.n1ql_node) 2157 2158 self.log.info("Explain plan for query 2 : {0}".format(results)) 2159 2160 agg_pushdown = False 2161 if "index_group_aggs" in str(results): 2162 agg_pushdown = True 2163 2164 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2165 2166 def test_aggregate_indexer_pushdowns_partition_keys_index_keys(self): 2167 self._load_emp_dataset(end=self.num_items) 2168 2169 # Create Partitioned indexes 2170 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(LOWER(dept)) USING GSI with {{'num_partition':{0}}};".format( 2171 self.num_index_partitions) 2172 2173 try: 2174 self.n1ql_helper.run_cbq_query( 2175 query=create_partitioned_index_query, 2176 server=self.n1ql_node) 2177 except Exception, ex: 2178 self.log.info(str(ex)) 2179 self.fail( 2180 "index creation failed with error : {0}".format(str(ex))) 2181 2182 explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary" 2183 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2184 server=self.n1ql_node) 2185 2186 self.log.info("Explain plan for query 1 : {0}".format(results)) 2187 2188 agg_pushdown = False 2189 if "index_group_aggs" in str(results): 2190 agg_pushdown = True 2191 2192 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2193 2194 explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept" 2195 results = self.n1ql_helper.run_cbq_query(query=explain_query2, 2196 server=self.n1ql_node) 2197 2198 self.log.info("Explain plan for query 2 : {0}".format(results)) 2199 2200 agg_pushdown = False 2201 if "index_group_aggs" in str(results): 2202 agg_pushdown = True 2203 2204 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2205 2206 def test_aggregate_indexer_pushdowns_groupby_trailing_keys_partition_keys( 2207 self): 2208 self._load_emp_dataset(end=self.num_items) 2209 2210 # Create Partitioned indexes 2211 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(salary) USING GSI with {{'num_partition':{0}}};".format( 2212 self.num_index_partitions) 2213 2214 try: 2215 self.n1ql_helper.run_cbq_query( 2216 query=create_partitioned_index_query, 2217 server=self.n1ql_node) 2218 except Exception, ex: 2219 self.log.info(str(ex)) 2220 self.fail( 2221 "index creation failed with error : {0}".format(str(ex))) 2222 2223 explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary" 2224 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2225 server=self.n1ql_node) 2226 2227 self.log.info("Explain plan for query 1 : {0}".format(results)) 2228 2229 agg_pushdown = False 2230 if "index_group_aggs" in str(results): 2231 agg_pushdown = True 2232 2233 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2234 2235 def test_aggregate_indexer_pushdowns_groupby_trailing_keys_not_partition_keys( 2236 self): 2237 self._load_emp_dataset(end=self.num_items) 2238 2239 # Create Partitioned indexes 2240 create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(dept) USING GSI with {{'num_partition':{0}}};".format( 2241 self.num_index_partitions) 2242 2243 try: 2244 self.n1ql_helper.run_cbq_query( 2245 query=create_partitioned_index_query, 2246 server=self.n1ql_node) 2247 except Exception, ex: 2248 self.log.info(str(ex)) 2249 self.fail( 2250 "index creation failed with error : {0}".format(str(ex))) 2251 2252 explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary" 2253 results = self.n1ql_helper.run_cbq_query(query=explain_query1, 2254 server=self.n1ql_node) 2255 2256 self.log.info("Explain plan for query 1 : {0}".format(results)) 2257 2258 agg_pushdown = False 2259 if "index_group_aggs" in str(results): 2260 agg_pushdown = True 2261 2262 self.assertTrue(agg_pushdown, "Operators not pushed down to indexer") 2263 2264 def test_rebalance_out_with_partitioned_indexes_with_concurrent_querying( 2265 self): 2266 self._load_emp_dataset(end=self.num_items) 2267 2268 with_statement = "with {{'num_partition':{0}".format(self.num_index_partitions) 2269 if self.num_index_replicas > 0: 2270 with_statement += ", 'num_replica':{0}".format(self.num_index_replicas) 2271 if self.defer_build: 2272 with_statement += ", 'defer_build': true" 2273 with_statement += " }" 2274 2275 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement 2276 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement 2277 2278 try: 2279 self.n1ql_helper.run_cbq_query( 2280 query=create_index_statement, 2281 server=self.n1ql_node) 2282 self.n1ql_helper.run_cbq_query( 2283 query=create_primary_index_statement, 2284 server=self.n1ql_node) 2285 except Exception, ex: 2286 self.log.info(str(ex)) 2287 2288 self.sleep(30) 2289 2290 node_out = self.servers[self.node_out] 2291 node_out_str = node_out.ip + ":" + str(node_out.port) 2292 2293 # Get Index Names 2294 index_names = ["idx1", "pidx1"] 2295 if self.num_index_replicas > 0: 2296 for i in range(1, self.num_index_replicas + 1): 2297 index_names.append("idx1 (replica {0})".format(str(i))) 2298 index_names.append("pidx1 (replica {0})".format(str(i))) 2299 2300 self.log.info(index_names) 2301 2302 # Get Stats and index partition map before rebalance 2303 index_data_before = {} 2304 for index in index_names: 2305 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2306 index_name=index) 2307 index_data_before[index] = {} 2308 index_data_before[index]["item_count"] = total_item_count_before 2309 index_data_before[index][ 2310 "index_metadata"] = self.rest.get_indexer_metadata() 2311 2312 # start querying 2313 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2314 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2315 t1.start() 2316 # rebalance out a indexer node when querying is in progress 2317 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2318 [], [node_out]) 2319 reached = RestHelper(self.rest).rebalance_reached() 2320 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2321 rebalance.result() 2322 t1.join() 2323 2324 self.sleep(30) 2325 2326 # Get Stats and index partition map after rebalance 2327 node_list = copy.deepcopy(self.node_list) 2328 node_list.remove(node_out_str) 2329 self.log.info(node_list) 2330 2331 index_data_after = {} 2332 2333 for index in index_names: 2334 self.index_servers = self.get_nodes_from_services_map( 2335 service_type="index", get_all_nodes=True) 2336 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2337 index_name=index, node_list=node_list) 2338 index_data_after[index] = {} 2339 index_data_after[index]["item_count"] = total_item_count_after 2340 index_data_after[index][ 2341 "index_metadata"] = RestConnection( 2342 self.index_servers[0]).get_indexer_metadata() 2343 2344 for index in index_names: 2345 # Validate index item count before and after 2346 self.assertEqual(index_data_before[index]["item_count"], 2347 index_data_after[index]["item_count"], 2348 "Item count in index do not match after cluster ops.") 2349 2350 # Validate host list, partition count and partition distribution 2351 self.assertTrue( 2352 self.validate_partition_distribution_after_cluster_ops( 2353 index, index_data_before[index]["index_metadata"], 2354 index_data_after[index]["index_metadata"], [], 2355 [node_out]), 2356 "Partition distribution post cluster ops has some issues") 2357 2358 def test_rebalance_out_with_partitioned_indexes_with_concurrent_querying_stop_and_resume( 2359 self): 2360 resume = self.input.param("resume_stopped_rebalance", False) 2361 resume_delay = self.input.param("resume_delay", 0) 2362 2363 self._load_emp_dataset(end=self.num_items) 2364 2365 # Create partitioned index 2366 if self.num_index_replicas > 0: 2367 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2368 self.num_index_replicas, self.num_index_partitions) 2369 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2370 self.num_index_replicas, self.num_index_partitions) 2371 else: 2372 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{0}}}".format( 2373 self.num_index_partitions) 2374 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{0}}}".format( 2375 self.num_index_partitions) 2376 2377 try: 2378 self.n1ql_helper.run_cbq_query( 2379 query=create_index_statement, 2380 server=self.n1ql_node) 2381 self.n1ql_helper.run_cbq_query( 2382 query=create_primary_index_statement, 2383 server=self.n1ql_node) 2384 except Exception, ex: 2385 self.log.info(str(ex)) 2386 2387 self.sleep(30) 2388 2389 node_out = self.servers[self.node_out] 2390 node_out_str = node_out.ip + ":" + str(node_out.port) 2391 2392 # Get Index Names 2393 index_names = ["idx1", "pidx1"] 2394 if self.num_index_replicas > 0: 2395 for i in range(1, self.num_index_replicas + 1): 2396 index_names.append("idx1 (replica {0})".format(str(i))) 2397 index_names.append("pidx1 (replica {0})".format(str(i))) 2398 2399 self.log.info(index_names) 2400 2401 # Get Stats and index partition map before rebalance 2402 index_data_before = {} 2403 for index in index_names: 2404 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2405 index_name=index) 2406 index_data_before[index] = {} 2407 index_data_before[index]["item_count"] = total_item_count_before 2408 index_data_before[index][ 2409 "index_metadata"] = self.rest.get_indexer_metadata() 2410 2411 # start querying 2412 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2413 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2414 t1.start() 2415 # rebalance out a indexer node when querying is in progress 2416 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2417 [], [node_out]) 2418 stopped = RestConnection(self.master).stop_rebalance( 2419 wait_timeout=self.wait_timeout / 3) 2420 self.assertTrue(stopped, msg="unable to stop rebalance") 2421 rebalance.result() 2422 2423 if resume: 2424 if resume_delay > 0: 2425 self.sleep(resume_delay, 2426 "Sleep for some time before resume stopped rebalance") 2427 rebalance = self.cluster.async_rebalance( 2428 self.servers[:self.nodes_init], 2429 [], [node_out]) 2430 2431 reached = RestHelper(self.rest).rebalance_reached() 2432 self.assertTrue(reached, 2433 "rebalance failed, stuck or did not complete") 2434 rebalance.result() 2435 2436 t1.join() 2437 2438 self.sleep(30) 2439 2440 # Get Stats and index partition map after rebalance 2441 node_list = copy.deepcopy(self.node_list) 2442 node_list.remove(node_out_str) 2443 self.log.info(node_list) 2444 2445 index_data_after = {} 2446 for index in index_names: 2447 self.index_servers = self.get_nodes_from_services_map( 2448 service_type="index", get_all_nodes=True) 2449 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2450 index_name=index, node_list=node_list) 2451 index_data_after[index] = {} 2452 index_data_after[index]["item_count"] = total_item_count_after 2453 index_data_after[index][ 2454 "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata() 2455 2456 for index in index_names: 2457 # Validate index item count before and after 2458 self.assertEqual(index_data_before[index]["item_count"], 2459 index_data_after[index]["item_count"], 2460 "Item count in index do not match after cluster ops.") 2461 2462 # Validate host list, partition count and partition distribution 2463 self.assertTrue( 2464 self.validate_partition_distribution_after_cluster_ops( 2465 index, index_data_before[index]["index_metadata"], 2466 index_data_after[index]["index_metadata"], [], 2467 [node_out]), 2468 "Partition distribution post cluster ops has some issues") 2469 2470 def test_rebalance_in_with_partitioned_indexes_with_concurrent_querying( 2471 self): 2472 self._load_emp_dataset(end=self.num_items) 2473 2474 with_statement = "with {{'num_partition':{0}".format( 2475 self.num_index_partitions) 2476 if self.num_index_replicas > 0: 2477 with_statement += ", 'num_replica':{0}".format( 2478 self.num_index_replicas) 2479 if self.defer_build: 2480 with_statement += ", 'defer_build': true" 2481 with_statement += " }" 2482 2483 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement 2484 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement 2485 2486 try: 2487 self.n1ql_helper.run_cbq_query( 2488 query=create_index_statement, 2489 server=self.n1ql_node) 2490 self.n1ql_helper.run_cbq_query( 2491 query=create_primary_index_statement, 2492 server=self.n1ql_node) 2493 except Exception, ex: 2494 self.log.info(str(ex)) 2495 2496 self.sleep(30) 2497 2498 node_in = self.servers[self.nodes_init] 2499 node_in_str = node_in.ip + ":" + str(node_in.port) 2500 services_in = ["index"] 2501 2502 # Get Index Names 2503 index_names = ["idx1", "pidx1"] 2504 if self.num_index_replicas > 0: 2505 for i in range(1, self.num_index_replicas + 1): 2506 index_names.append("idx1 (replica {0})".format(str(i))) 2507 index_names.append("pidx1 (replica {0})".format(str(i))) 2508 2509 self.log.info(index_names) 2510 2511 # Get Stats and index partition map before rebalance 2512 index_data_before = {} 2513 for index in index_names: 2514 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2515 index_name=index) 2516 index_data_before[index] = {} 2517 index_data_before[index]["item_count"] = total_item_count_before 2518 index_data_before[index][ 2519 "index_metadata"] = self.rest.get_indexer_metadata() 2520 2521 # start querying 2522 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2523 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2524 t1.start() 2525 # rebalance out a indexer node when querying is in progress 2526 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2527 [node_in], [], 2528 services=services_in) 2529 reached = RestHelper(self.rest).rebalance_reached() 2530 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2531 rebalance.result() 2532 t1.join() 2533 2534 self.sleep(30) 2535 2536 # Get Stats and index partition map after rebalance 2537 node_list = copy.deepcopy(self.node_list) 2538 node_list.append(node_in_str) 2539 self.log.info(node_list) 2540 2541 index_data_after = {} 2542 for index in index_names: 2543 self.index_servers = self.get_nodes_from_services_map( 2544 service_type="index", get_all_nodes=True) 2545 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2546 index_name=index, node_list=node_list) 2547 index_data_after[index] = {} 2548 index_data_after[index]["item_count"] = total_item_count_after 2549 index_data_after[index][ 2550 "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata() 2551 2552 for index in index_names: 2553 # Validate index item count before and after 2554 self.assertEqual(index_data_before[index]["item_count"], 2555 index_data_after[index]["item_count"], 2556 "Item count in index do not match after cluster ops.") 2557 2558 # Validate host list, partition count and partition distribution 2559 self.assertTrue( 2560 self.validate_partition_distribution_after_cluster_ops( 2561 index, index_data_before[index]["index_metadata"], 2562 index_data_after[index]["index_metadata"], [node_in], 2563 []), 2564 "Partition distribution post cluster ops has some issues") 2565 2566 def test_swap_rebalance_with_partitioned_indexes_with_concurrent_querying( 2567 self): 2568 self._load_emp_dataset(end=self.num_items) 2569 2570 with_statement = "with {{'num_partition':{0}".format( 2571 self.num_index_partitions) 2572 if self.num_index_replicas > 0: 2573 with_statement += ", 'num_replica':{0}".format( 2574 self.num_index_replicas) 2575 if self.defer_build: 2576 with_statement += ", 'defer_build': true" 2577 with_statement += " }" 2578 2579 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement 2580 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement 2581 2582 try: 2583 self.n1ql_helper.run_cbq_query( 2584 query=create_index_statement, 2585 server=self.n1ql_node) 2586 self.n1ql_helper.run_cbq_query( 2587 query=create_primary_index_statement, 2588 server=self.n1ql_node) 2589 except Exception, ex: 2590 self.log.info(str(ex)) 2591 2592 self.sleep(30) 2593 2594 node_out = self.servers[self.node_out] 2595 node_out_str = node_out.ip + ":" + str(node_out.port) 2596 2597 node_in = self.servers[self.nodes_init] 2598 node_in_str = node_in.ip + ":" + str(node_in.port) 2599 services_in = ["index"] 2600 2601 # Get Index Names 2602 index_names = ["idx1", "pidx1"] 2603 if self.num_index_replicas > 0: 2604 for i in range(1, self.num_index_replicas + 1): 2605 index_names.append("idx1 (replica {0})".format(str(i))) 2606 index_names.append("pidx1 (replica {0})".format(str(i))) 2607 2608 self.log.info(index_names) 2609 2610 # Get Stats and index partition map before rebalance 2611 index_data_before = {} 2612 for index in index_names: 2613 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2614 index_name=index) 2615 index_data_before[index] = {} 2616 index_data_before[index]["item_count"] = total_item_count_before 2617 index_data_before[index][ 2618 "index_metadata"] = self.rest.get_indexer_metadata() 2619 2620 try: 2621 # start querying 2622 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2623 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2624 t1.start() 2625 # rebalance out a indexer node when querying is in progress 2626 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2627 [node_in], [node_out], 2628 services=services_in) 2629 reached = RestHelper(self.rest).rebalance_reached() 2630 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2631 rebalance.result() 2632 t1.join() 2633 except Exception, ex: 2634 self.log.info(str(ex)) 2635 2636 self.sleep(30) 2637 2638 # Get Stats and index partition map after rebalance 2639 node_list = copy.deepcopy(self.node_list) 2640 node_list.append(node_in_str) 2641 node_list.remove(node_out_str) 2642 2643 index_data_after = {} 2644 for index in index_names: 2645 self.index_servers = self.get_nodes_from_services_map( 2646 service_type="index", get_all_nodes=True) 2647 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2648 index_name=index, node_list=node_list) 2649 index_data_after[index] = {} 2650 index_data_after[index]["item_count"] = total_item_count_after 2651 index_data_after[index][ 2652 "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata() 2653 2654 for index in index_names: 2655 # Validate index item count before and after 2656 self.assertEqual(index_data_before[index]["item_count"], 2657 index_data_after[index]["item_count"], 2658 "Item count in index do not match after cluster ops.") 2659 2660 # Validate host list, partition count and partition distribution 2661 self.assertTrue( 2662 self.validate_partition_distribution_after_cluster_ops( 2663 index, index_data_before[index]["index_metadata"], 2664 index_data_after[index]["index_metadata"], [node_in], 2665 [node_out]), 2666 "Partition distribution post cluster ops has some issues") 2667 2668 def test_failover_with_partitioned_indexes_with_concurrent_querying( 2669 self): 2670 self._load_emp_dataset(end=self.num_items) 2671 2672 # Create partitioned index 2673 if self.num_index_replicas > 0: 2674 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2675 self.num_index_replicas, self.num_index_partitions) 2676 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2677 self.num_index_replicas, self.num_index_partitions) 2678 else: 2679 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format( 2680 self.num_index_replicas, self.num_index_partitions) 2681 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format( 2682 self.num_index_replicas, self.num_index_partitions) 2683 2684 try: 2685 self.n1ql_helper.run_cbq_query( 2686 query=create_index_statement, 2687 server=self.n1ql_node) 2688 self.n1ql_helper.run_cbq_query( 2689 query=create_primary_index_statement, 2690 server=self.n1ql_node) 2691 except Exception, ex: 2692 self.log.info(str(ex)) 2693 2694 self.sleep(30) 2695 2696 node_out = self.servers[self.node_out] 2697 node_out_str = node_out.ip + ":" + str(node_out.port) 2698 2699 # Get Index Names 2700 index_names = ["idx1", "pidx1"] 2701 if self.num_index_replicas > 0: 2702 for i in range(1, self.num_index_replicas + 1): 2703 index_names.append("idx1 (replica {0})".format(str(i))) 2704 index_names.append("pidx1 (replica {0})".format(str(i))) 2705 2706 self.log.info(index_names) 2707 2708 # Get Stats and index partition map before rebalance 2709 index_data_before = {} 2710 for index in index_names: 2711 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2712 index_name=index) 2713 index_data_before[index] = {} 2714 index_data_before[index]["item_count"] = total_item_count_before 2715 index_data_before[index][ 2716 "index_metadata"] = self.rest.get_indexer_metadata() 2717 2718 # start querying 2719 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2720 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2721 t1.start() 2722 2723 # failover and rebalance out a indexer node when querying is in progress 2724 failover_task = self.cluster.async_failover( 2725 self.servers[:self.nodes_init], 2726 [node_out], 2727 self.graceful, wait_for_pending=180) 2728 2729 failover_task.result() 2730 2731 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2732 [], [node_out]) 2733 reached = RestHelper(self.rest).rebalance_reached() 2734 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2735 rebalance.result() 2736 t1.join() 2737 2738 self.sleep(30) 2739 2740 # Get Stats and index partition map after rebalance 2741 node_list = copy.deepcopy(self.node_list) 2742 node_list.remove(node_out_str) 2743 self.log.info(node_list) 2744 2745 index_data_after = {} 2746 for index in index_names: 2747 self.index_servers = self.get_nodes_from_services_map( 2748 service_type="index", get_all_nodes=True) 2749 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2750 index_name=index, node_list=node_list) 2751 index_data_after[index] = {} 2752 index_data_after[index]["item_count"] = total_item_count_after 2753 index_data_after[index][ 2754 "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata() 2755 2756 for index in index_names: 2757 # Validate index item count before and after 2758 self.assertEqual(index_data_before[index]["item_count"], 2759 index_data_after[index]["item_count"], 2760 "Item count in index do not match after cluster ops.") 2761 2762 # Validate host list, partition count and partition distribution 2763 self.assertTrue( 2764 self.validate_partition_distribution_after_cluster_ops( 2765 index, index_data_before[index]["index_metadata"], 2766 index_data_after[index]["index_metadata"], [], 2767 [node_out]), 2768 "Partition distribution post cluster ops has some issues") 2769 2770 def test_failover_addback_with_partitioned_indexes_with_concurrent_querying( 2771 self): 2772 self._load_emp_dataset(end=self.num_items) 2773 2774 # Create partitioned index 2775 if self.num_index_replicas > 0: 2776 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2777 self.num_index_replicas, self.num_index_partitions) 2778 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2779 self.num_index_replicas, self.num_index_partitions) 2780 else: 2781 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format( 2782 self.num_index_replicas, self.num_index_partitions) 2783 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format( 2784 self.num_index_replicas, self.num_index_partitions) 2785 2786 try: 2787 self.n1ql_helper.run_cbq_query( 2788 query=create_index_statement, 2789 server=self.n1ql_node) 2790 self.n1ql_helper.run_cbq_query( 2791 query=create_primary_index_statement, 2792 server=self.n1ql_node) 2793 except Exception, ex: 2794 self.log.info(str(ex)) 2795 2796 self.sleep(30) 2797 2798 node_out = self.servers[self.node_out] 2799 node_out_str = node_out.ip + ":" + str(node_out.port) 2800 2801 # Get Index Names 2802 index_names = ["idx1", "pidx1"] 2803 if self.num_index_replicas > 0: 2804 for i in range(1, self.num_index_replicas + 1): 2805 index_names.append("idx1 (replica {0})".format(str(i))) 2806 index_names.append("pidx1 (replica {0})".format(str(i))) 2807 2808 self.log.info(index_names) 2809 2810 # Get Stats and index partition map before rebalance 2811 index_data_before = {} 2812 for index in index_names: 2813 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2814 index_name=index) 2815 index_data_before[index] = {} 2816 index_data_before[index]["item_count"] = total_item_count_before 2817 index_data_before[index][ 2818 "index_metadata"] = self.rest.get_indexer_metadata() 2819 2820 # start querying 2821 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2822 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2823 t1.start() 2824 2825 # failover and rebalance out a indexer node when querying is in progress 2826 nodes_all = self.rest.node_statuses() 2827 for node in nodes_all: 2828 if node.ip == node_out.ip: 2829 break 2830 2831 failover_task = self.cluster.async_failover( 2832 self.servers[:self.nodes_init], 2833 [node_out], 2834 self.graceful, wait_for_pending=180) 2835 2836 failover_task.result() 2837 2838 self.rest.set_recovery_type(node.id, self.recovery_type) 2839 self.rest.add_back_node(node.id) 2840 2841 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2842 [], []) 2843 2844 reached = RestHelper(self.rest).rebalance_reached() 2845 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2846 rebalance.result() 2847 t1.join() 2848 2849 self.sleep(30) 2850 2851 # Get Stats and index partition map after rebalance 2852 index_data_after = {} 2853 for index in index_names: 2854 self.index_servers = self.get_nodes_from_services_map( 2855 service_type="index", get_all_nodes=True) 2856 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2857 index_name=index) 2858 index_data_after[index] = {} 2859 index_data_after[index]["item_count"] = total_item_count_after 2860 index_data_after[index][ 2861 "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata() 2862 2863 for index in index_names: 2864 # Validate index item count before and after 2865 self.assertEqual(index_data_before[index]["item_count"], 2866 index_data_after[index]["item_count"], 2867 "Item count in index do not match after cluster ops.") 2868 2869 # Validate host list, partition count and partition distribution 2870 self.assertTrue( 2871 self.validate_partition_distribution_after_cluster_ops( 2872 index, index_data_before[index]["index_metadata"], 2873 index_data_after[index]["index_metadata"], [], 2874 []), 2875 "Partition distribution post cluster ops has some issues") 2876 2877 def test_kv_rebalance_out_with_partitioned_indexes_with_concurrent_querying( 2878 self): 2879 self._load_emp_dataset(end=self.num_items) 2880 2881 # Create partitioned index 2882 if self.num_index_replicas > 0: 2883 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2884 self.num_index_replicas, self.num_index_partitions) 2885 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format( 2886 self.num_index_replicas, self.num_index_partitions) 2887 else: 2888 create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format( 2889 self.num_index_replicas, self.num_index_partitions) 2890 create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format( 2891 self.num_index_replicas, self.num_index_partitions) 2892 2893 try: 2894 self.n1ql_helper.run_cbq_query( 2895 query=create_index_statement, 2896 server=self.n1ql_node) 2897 self.n1ql_helper.run_cbq_query( 2898 query=create_primary_index_statement, 2899 server=self.n1ql_node) 2900 except Exception, ex: 2901 self.log.info(str(ex)) 2902 2903 self.sleep(30) 2904 2905 node_out = self.servers[self.node_out] 2906 node_out_str = node_out.ip + ":" + str(node_out.port) 2907 2908 # Get Index Names 2909 index_names = ["idx1", "pidx1"] 2910 if self.num_index_replicas > 0: 2911 for i in range(1, self.num_index_replicas + 1): 2912 index_names.append("idx1 (replica {0})".format(str(i))) 2913 index_names.append("pidx1 (replica {0})".format(str(i))) 2914 2915 self.log.info(index_names) 2916 2917 # Get Stats and index partition map before rebalance 2918 index_data_before = {} 2919 for index in index_names: 2920 _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes( 2921 index_name=index) 2922 index_data_before[index] = {} 2923 index_data_before[index]["item_count"] = total_item_count_before 2924 index_data_before[index][ 2925 "index_metadata"] = self.rest.get_indexer_metadata() 2926 2927 # start querying 2928 query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;" 2929 t1 = Thread(target=self._run_queries, args=(query, 30,)) 2930 t1.start() 2931 # rebalance out a indexer node when querying is in progress 2932 rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], 2933 [], [node_out]) 2934 reached = RestHelper(self.rest).rebalance_reached() 2935 self.assertTrue(reached, "rebalance failed, stuck or did not complete") 2936 rebalance.result() 2937 t1.join() 2938 2939 self.sleep(30) 2940 2941 # Get Stats and index partition map after rebalance 2942 2943 index_data_after = {} 2944 for index in index_names: 2945 self.index_servers = self.get_nodes_from_services_map( 2946 service_type="index", get_all_nodes=True) 2947 _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes( 2948 index_name=index) 2949 index_data_after[index] = {} 2950 index_data_after[index]["item_count"] = total_item_count_after 2951 index_data_after[index][ 2952 "index_metadata"] = RestConnec