1import copy
2import json
3import threading
4
5from base_2i import BaseSecondaryIndexingTests
6from membase.api.rest_client import RestConnection, RestHelper
7import random
8from lib import testconstants
9from lib.couchbase_helper.tuq_generators import TuqGenerators
10from lib.memcached.helper.data_helper import MemcachedClientHelper
11from lib.remote.remote_util import RemoteMachineShellConnection
12from threading import Thread
13from pytests.query_tests_helper import QueryHelperTests
14from couchbase_helper.documentgenerator import JsonDocGenerator
15from couchbase_helper.cluster import Cluster
16from gsi_replica_indexes import GSIReplicaIndexesTests
17from lib.membase.helper.cluster_helper import ClusterOperationHelper
18
19
20class GSIIndexPartitioningTests(GSIReplicaIndexesTests):
21    def setUp(self):
22        super(GSIIndexPartitioningTests, self).setUp()
23        self.num_items = self.input.param("items", 5000)
24        self.log.info("No. of items: {0}".format(str(self.num_items)))
25        self.index_servers = self.get_nodes_from_services_map(
26            service_type="index", get_all_nodes=True)
27        self.rest = RestConnection(self.index_servers[0])
28        self.node_list = []
29        for server in self.index_servers:
30            self.node_list.append(server.ip + ":" + server.port)
31
32        self.num_queries = self.input.param("num_queries", 100)
33        self.num_index_partitions = self.input.param("num_index_partitions", 8)
34        self.recover_failed_node = self.input.param("recover_failed_node",
35                                                    False)
36        self.op_type = self.input.param("op_type", "create")
37        self.node_operation = self.input.param("node_op", "reboot")
38
39    def tearDown(self):
40        super(GSIIndexPartitioningTests, self).tearDown()
41
42    # Test that generates n number of create index statements with various permutations and combinations
43    # of different clauses used in the create index statement.
44    def test_create_partitioned_indexes(self):
45        self._load_emp_dataset(end=self.num_items)
46
47        create_index_queries = self.generate_random_create_index_statements(
48            bucketname=self.buckets[0].name, idx_node_list=self.node_list,
49            num_statements=self.num_queries)
50
51        failed_index_creation = 0
52        for create_index_query in create_index_queries:
53
54            try:
55                self.n1ql_helper.run_cbq_query(
56                    query=create_index_query["index_definition"],
57                    server=self.n1ql_node)
58            except Exception, ex:
59                self.log.info(str(ex))
60
61            self.sleep(10)
62
63            index_metadata = self.rest.get_indexer_metadata()
64            index_map = self.get_index_map()
65
66            if index_metadata:
67                status = self.validate_partitioned_indexes(create_index_query,
68                                                           index_map,
69                                                           index_metadata)
70
71                if not status:
72                    failed_index_creation += 1
73                    self.log.info(
74                        "** Following query failed validation : {0}".format(
75                            create_index_query["index_definition"]))
76            else:
77                failed_index_creation += 1
78                self.log.info(
79                    "** Following index did not get created : {0}".format(
80                        create_index_query["index_definition"]))
81                self.log.info("output from /getIndexStatus")
82                self.log.info(index_metadata)
83                self.log.info("Index Map")
84                self.log.info(index_map)
85
86            drop_index_query = "DROP INDEX default.{0}".format(
87                create_index_query["index_name"])
88            try:
89                self.n1ql_helper.run_cbq_query(
90                    query=drop_index_query,
91                    server=self.n1ql_node)
92            except Exception, ex:
93                self.log.info(str(ex))
94
95        self.log.info(
96            "Total Create Index Statements Run: {0}, Passed : {1}, Failed : {2}".format(
97                self.num_queries, self.num_queries - failed_index_creation,
98                failed_index_creation))
99        self.assertTrue(failed_index_creation == 0,
100                        "Some create index statements failed validations. Pls see the test log above for details.")
101
102    def test_partition_index_with_excluded_nodes(self):
103        self._load_emp_dataset(end=self.num_items)
104
105        # Setting to exclude a node for planner
106        self.rest.set_index_planner_settings("excludeNode=in")
107        # Create partitioned index
108        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)"
109
110        try:
111            self.n1ql_helper.run_cbq_query(
112                query=create_index_statement,
113                server=self.n1ql_node)
114        except Exception, ex:
115            self.log.info(str(ex))
116
117        # Validate index created and check the hosts on which partitions are hosted.
118        expected_hosts = self.node_list[1:]
119        expected_hosts.sort()
120        validated = False
121        index_metadata = self.rest.get_indexer_metadata()
122        self.log.info("Indexer Metadata :::")
123        self.log.info(index_metadata)
124
125        for index in index_metadata["status"]:
126            if index["name"] == "idx1":
127                self.log.info("Expected Hosts : {0}".format(expected_hosts))
128                self.log.info("Actual Hosts   : {0}".format(index["hosts"]))
129                self.assertEqual(index["hosts"], expected_hosts,
130                                 "Planner did not ignore excluded node during index creation")
131                validated = True
132
133        if not validated:
134            self.fail("Looks like index was not created.")
135
136    def test_replica_partition_index_with_excluded_nodes(self):
137        self._load_emp_dataset(end=self.num_items)
138
139        # Setting to exclude a node for planner
140        self.rest.set_index_planner_settings("excludeNode=in")
141        # Create partitioned index
142        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}}}".format(
143            self.num_index_replicas)
144        try:
145            self.n1ql_helper.run_cbq_query(
146                query=create_index_statement,
147                server=self.n1ql_node)
148        except Exception, ex:
149            self.log.info(str(ex))
150
151        index_names = []
152        index_names.append("idx1")
153        for i in range(1, self.num_index_replicas + 1):
154            index_names.append("idx1 (replica {0})".format(str(i)))
155
156        # Need to see if the indexes get created in the first place
157
158        # Validate index created and check the hosts on which partitions are hosted.
159        expected_hosts = self.node_list[1:]
160        expected_hosts.sort()
161        validated = False
162        index_metadata = self.rest.get_indexer_metadata()
163        self.log.info("Indexer Metadata :::")
164        self.log.info(index_metadata)
165
166        index_validated = 0
167        for index_name in index_names:
168            for index in index_metadata["status"]:
169                if index["name"] == index_name:
170                    self.log.info("Expected Hosts : {0}".format(expected_hosts))
171                    self.log.info("Actual Hosts   : {0}".format(index["hosts"]))
172                    self.assertEqual(index["hosts"], expected_hosts,
173                                     "Planner did not ignore excluded node during index creation for {0}".format(
174                                         index_name))
175                    index_validated += 1
176
177        self.assertEqual(index_validated, (self.num_index_replicas + 1),
178                         "All index replicas not created")
179
180    def test_partition_index_by_non_indexed_field(self):
181        self._load_emp_dataset(end=self.num_items)
182
183        create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI"
184        try:
185            self.n1ql_helper.run_cbq_query(query=create_index_statement,
186                                           server=self.n1ql_node)
187        except Exception, ex:
188            self.log.info(str(ex))
189            self.fail("index creation failed with error : {0}".format(str(ex)))
190
191        self.sleep(10)
192        index_map = self.get_index_map()
193        self.log.info(index_map)
194
195        index_metadata = self.rest.get_indexer_metadata()
196        self.log.info("Indexer Metadata Before Build:")
197        self.log.info(index_metadata)
198
199        index_details = {}
200        index_details["index_name"] = "idx1"
201        index_details["num_partitions"] = self.num_index_partitions
202        index_details["defer_build"] = False
203
204        self.assertTrue(
205            self.validate_partitioned_indexes(index_details, index_map,
206                                              index_metadata),
207            "Partitioned index created not as expected")
208
209    def test_default_num_partitions(self):
210        self._load_emp_dataset(end=self.num_items)
211
212        self.rest.set_index_settings(
213            {"indexer.numPartitions": 6})
214
215        create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI"
216        try:
217            self.n1ql_helper.run_cbq_query(query=create_index_statement,
218                                           server=self.n1ql_node)
219        except Exception, ex:
220            self.log.info(str(ex))
221            self.fail("index creation failed with error : {0}".format(str(ex)))
222
223        self.sleep(10)
224        index_map = self.get_index_map()
225        self.log.info(index_map)
226
227        index_metadata = self.rest.get_indexer_metadata()
228        self.log.info("Indexer Metadata Before Build:")
229        self.log.info(index_metadata)
230
231        index_details = {}
232        index_details["index_name"] = "idx1"
233        index_details["num_partitions"] = 6
234        index_details["defer_build"] = False
235
236        self.assertTrue(
237            self.validate_partitioned_indexes(index_details, index_map,
238                                              index_metadata),
239            "Partitioned index created not as expected")
240
241    def test_change_default_num_partitions_after_create_index(self):
242        self._load_emp_dataset(end=self.num_items)
243
244        self.rest.set_index_settings(
245            {"indexer.numPartitions": 16})
246
247        create_index_statement = "CREATE INDEX idx1 on default(name,dept) partition by hash(salary) USING GSI"
248        try:
249            self.n1ql_helper.run_cbq_query(query=create_index_statement,
250                                           server=self.n1ql_node)
251        except Exception, ex:
252            self.log.info(str(ex))
253            self.fail("index creation failed with error : {0}".format(str(ex)))
254
255        self.sleep(10)
256        index_map = self.get_index_map()
257        self.log.info(index_map)
258
259        index_metadata = self.rest.get_indexer_metadata()
260        self.log.info("Indexer Metadata Before Build:")
261        self.log.info(index_metadata)
262
263        index_details = {}
264        index_details["index_name"] = "idx1"
265        index_details["num_partitions"] = 16
266        index_details["defer_build"] = False
267
268        self.assertTrue(
269            self.validate_partitioned_indexes(index_details, index_map,
270                                              index_metadata),
271            "Partitioned index created not as expected")
272
273        self.rest.set_index_settings(
274            {"indexer.numPartitions": 32})
275
276        create_index_statement = "CREATE INDEX idx2 on default(namesalary) partition by hash(salary) USING GSI"
277        try:
278            self.n1ql_helper.run_cbq_query(query=create_index_statement,
279                                           server=self.n1ql_node)
280        except Exception, ex:
281            self.log.info(str(ex))
282            self.fail("index creation failed with error : {0}".format(str(ex)))
283
284        self.sleep(10)
285        index_map = self.get_index_map()
286        self.log.info(index_map)
287
288        index_metadata = self.rest.get_indexer_metadata()
289        self.log.info("Indexer Metadata Before Build:")
290        self.log.info(index_metadata)
291
292        index_details = {}
293        index_details["index_name"] = "idx2"
294        index_details["num_partitions"] = 32
295        index_details["defer_build"] = False
296
297        self.assertTrue(
298            self.validate_partitioned_indexes(index_details, index_map,
299                                              index_metadata),
300            "Partitioned index created not as expected")
301
302        # Validate num_partitions for idx1 doesnt change
303        index_details = {}
304        index_details["index_name"] = "idx1"
305        index_details["num_partitions"] = 16
306        index_details["defer_build"] = False
307
308        self.assertTrue(
309            self.validate_partitioned_indexes(index_details, index_map,
310                                              index_metadata),
311            "Num partitions for existing indexes changed after updating default value")
312
313    def test_default_num_partitions_negative(self):
314        self._load_emp_dataset(end=self.num_items)
315
316        self.rest.set_index_settings(
317            {"indexer.numPartitions": 8})
318
319        numpartition_values_str = ["abc", "2018-03-04 18:02:37"]
320        numpartition_values_num = [0, -5, 46.6789]
321
322        for value in numpartition_values_str:
323
324            indexname = "index_" + str(random.randint(1, 100))
325            try:
326                self.rest.set_index_settings(
327                    {"indexer.numPartitions": '{0}'.format(value)})
328
329                create_index_statement = "CREATE INDEX {0} on default(name,dept) partition by hash(salary) USING GSI".format(
330                    indexname)
331
332                self.n1ql_helper.run_cbq_query(query=create_index_statement,
333                                               server=self.n1ql_node)
334            except Exception, ex:
335                self.log.info(str(ex))
336
337            self.sleep(10)
338            index_map = self.get_index_map()
339            self.log.info(index_map)
340
341            index_metadata = self.rest.get_indexer_metadata()
342            self.log.info("Indexer Metadata Before Build:")
343            self.log.info(index_metadata)
344
345            index_details = {}
346            index_details["index_name"] = indexname
347            if (not isinstance(value, str)) and int(value) > 0:
348                index_details["num_partitions"] = int(value)
349            else:
350                index_details["num_partitions"] = 8
351            index_details["defer_build"] = False
352
353            self.assertTrue(
354                self.validate_partitioned_indexes(index_details, index_map,
355                                                  index_metadata),
356                "Partitioned index created not as expected")
357
358        for value in numpartition_values_num:
359            indexname = "index_" + str(random.randint(101, 200))
360            try:
361                self.rest.set_index_settings(
362                    {"indexer.numPartitions": value})
363
364                create_index_statement = "CREATE INDEX {0} on default(name,dept) partition by hash(salary) USING GSI".format(
365                    indexname)
366
367                self.n1ql_helper.run_cbq_query(query=create_index_statement,
368                                               server=self.n1ql_node)
369            except Exception, ex:
370                self.log.info(str(ex))
371
372            self.sleep(10)
373            index_map = self.get_index_map()
374            self.log.info(index_map)
375
376            index_metadata = self.rest.get_indexer_metadata()
377            self.log.info("Indexer Metadata Before Build:")
378            self.log.info(index_metadata)
379
380            index_details = {}
381            index_details["index_name"] = indexname
382            if (not isinstance(value, str)) and int(value) > 0:
383                index_details["num_partitions"] = int(value)
384            else:
385                index_details["num_partitions"] = 8
386            index_details["defer_build"] = False
387
388            self.assertTrue(
389                self.validate_partitioned_indexes(index_details, index_map,
390                                                  index_metadata),
391                "Partitioned index created not as expected")
392
393    def test_numpartitions_negative(self):
394        self._load_emp_dataset(end=self.num_items)
395
396        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':null}}"
397        try:
398            self.n1ql_helper.run_cbq_query(
399                query=create_index_statement,
400                server=self.n1ql_node)
401        except Exception, ex:
402            self.log.info(str(ex))
403
404        numpartition_values_str = ["abc", "2018-03-04 18:02:37"]
405        for value in numpartition_values_str:
406            # Create partitioned index
407            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':'{0}'}}".format(
408                value)
409            try:
410                self.n1ql_helper.run_cbq_query(
411                    query=create_index_statement,
412                    server=self.n1ql_node)
413            except Exception, ex:
414                self.log.info(str(ex))
415            else:
416                self.fail(
417                    "Index got created with an invalid num_partition value : {0}".format(
418                        value))
419
420        numpartition_values_num = [0, -5]
421        for value in numpartition_values_num:
422            # Create partitioned index
423            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{0}}}".format(
424                value)
425            try:
426                self.n1ql_helper.run_cbq_query(
427                    query=create_index_statement,
428                    server=self.n1ql_node)
429            except Exception, ex:
430                self.log.info(str(ex))
431            else:
432                self.fail(
433                    "Index got created with an invalid num_partition value : {0}".format(
434                        value))
435
436        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {'num_partition':47.6789}"
437        try:
438            self.n1ql_helper.run_cbq_query(
439                query=create_index_statement,
440                server=self.n1ql_node)
441        except Exception, ex:
442            self.log.info(str(ex))
443            self.fail(
444                "Index did not get created with an double value for num_partition value : 47.6789")
445        else:
446            self.log.info("Index got created successfully with num_partition being a double value : 47.6789")
447
448
449    def test_partitioned_index_with_replica(self):
450        self._load_emp_dataset(end=self.num_items)
451
452        # Create partitioned index
453        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
454            self.num_index_replicas, self.num_index_partitions)
455
456        try:
457            self.n1ql_helper.run_cbq_query(
458                query=create_index_statement,
459                server=self.n1ql_node)
460        except Exception, ex:
461            self.log.info(str(ex))
462
463        index_metadata = self.rest.get_indexer_metadata()
464        self.log.info("Indexer Metadata :::")
465        self.log.info(index_metadata)
466
467        self.assertTrue(self.validate_partition_map(index_metadata, "idx1",
468                                                    self.num_index_replicas,
469                                                    self.num_index_partitions),
470                        "Partition map validation failed")
471
472    def test_partitioned_index_with_replica_with_server_groups(self):
473        self._load_emp_dataset(end=self.num_items)
474        self._create_server_groups()
475
476        # Create partitioned index
477        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}}}".format(
478            self.num_index_replicas)
479
480        try:
481            self.n1ql_helper.run_cbq_query(
482                query=create_index_statement,
483                server=self.n1ql_node)
484        except Exception, ex:
485            self.log.info(str(ex))
486
487        index_metadata = self.rest.get_indexer_metadata()
488
489        index_hosts_list = []
490        for index in index_metadata["status"]:
491            index_hosts_list.append(index["hosts"])
492
493        self.log.info("Index Host List : {0}".format(index_hosts_list))
494
495        # Need to change the validation logic here. Between index and its replicas, they should have a full set of partitions in both the server groups.
496        # idx11 - .101, .102: 3, 4, 5, 10, 11, 15, 16
497        # idx11 - .103, .104: 1, 2, 6, 7, 8, 9, 12, 13, 14
498
499        # idx12 - .101, .102: 1, 2, 6, 7, 8, 9, 12, 13, 14
500        # idx12 - .103, .104: 3, 4, 5, 10, 11, 15, 16
501
502        validation = True
503        for i in range(0, len(index_hosts_list)):
504            for j in range(i + 1, len(index_hosts_list)):
505                if (index_hosts_list[i].sort() != index_hosts_list[j].sort()):
506                    continue
507                else:
508                    validation &= False
509
510        self.assertTrue(validation,
511                        "Partitions of replica indexes do not honour server grouping")
512
513    def test_create_partitioned_index_one_node_already_down(self):
514        self._load_emp_dataset(end=self.num_items)
515
516        node_out = self.servers[self.node_out]
517        failover_task = self.cluster.async_failover(
518            self.servers[:self.nodes_init],
519            [node_out],
520            self.graceful, wait_for_pending=60)
521
522        failover_task.result()
523
524        # Create partitioned index
525        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)"
526
527        try:
528            self.n1ql_helper.run_cbq_query(
529                query=create_index_statement,
530                server=self.n1ql_node)
531        except Exception, ex:
532            self.log.info(str(ex))
533            self.fail("Failed to create index with one node failed")
534
535        if node_out == self.index_servers[0]:
536            rest = RestConnection(self.index_servers[1])
537        else:
538            rest = self.rest
539
540        index_metadata = rest.get_indexer_metadata()
541        self.log.info("Indexer Metadata :::")
542        self.log.info(index_metadata)
543
544        hosts = index_metadata["status"][0]["hosts"]
545        self.log.info("Actual nodes : {0}".format(hosts))
546        node_out_str = node_out.ip + ":" + node_out.port
547        self.assertTrue(node_out_str not in hosts,
548                        "Partitioned index not created on expected hosts")
549
550    def test_create_partitioned_index_one_node_network_partitioned(self):
551        self._load_emp_dataset(end=self.num_items)
552
553        node_out = self.servers[self.node_out]
554        self.start_firewall_on_node(node_out)
555        self.sleep(10)
556
557        # Create partitioned index
558        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)"
559
560        try:
561            self.n1ql_helper.run_cbq_query(
562                query=create_index_statement,
563                server=self.n1ql_node)
564        except Exception, ex:
565            self.log.info(str(ex))
566            self.fail("Failed to create index with one node failed")
567        finally:
568            # Heal network partition and wait for some time to allow indexes
569            # to get built automatically on that node
570            self.stop_firewall_on_node(node_out)
571            self.sleep(120)
572
573        index_metadata = self.rest.get_indexer_metadata()
574        self.log.info("Indexer Metadata :::")
575        self.log.info(index_metadata)
576
577        hosts = index_metadata["status"][0]["hosts"]
578        self.log.info("Actual nodes : {0}".format(hosts))
579        node_out_str = node_out.ip + ":" + node_out.port
580        self.assertTrue(node_out_str not in hosts,
581                        "Partitioned index not created on expected hosts")
582
583    def test_node_fails_during_create_partitioned_index(self):
584        self._load_emp_dataset(end=self.num_items)
585
586        node_out = self.servers[self.node_out]
587
588        # Create partitioned index
589        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)"
590
591        threads = []
592        threads.append(
593            Thread(target=self.n1ql_helper.run_cbq_query, name="run_query",
594                   args=(create_index_statement, 10, self.n1ql_node)))
595        threads.append(
596            Thread(target=self.cluster.failover, name="failover", args=(
597                self.servers[:self.nodes_init], [node_out], self.graceful,
598                False, 60)))
599
600        for thread in threads:
601            thread.start()
602        self.sleep(5)
603        for thread in threads:
604            thread.join()
605
606        self.sleep(30)
607
608        if node_out == self.index_servers[0]:
609            rest = RestConnection(self.index_servers[1])
610        else:
611            rest = self.rest
612
613        index_metadata = rest.get_indexer_metadata()
614        self.log.info("Indexer Metadata :::")
615        self.log.info(index_metadata)
616
617    def test_node_nw_partitioned_during_create_partitioned_index(self):
618        self._load_emp_dataset(end=self.num_items)
619
620        node_out = self.servers[self.node_out]
621
622        # Create partitioned index
623        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name)"
624
625        threads = []
626        threads.append(
627            Thread(target=self.start_firewall_on_node,
628                   name="network_partitioning", args=(node_out,)))
629        threads.append(
630            Thread(target=self.n1ql_helper.run_cbq_query, name="run_query",
631                   args=(create_index_statement, 10, self.n1ql_node)))
632
633        for thread in threads:
634            thread.start()
635        self.sleep(5)
636        for thread in threads:
637            thread.join()
638
639        self.sleep(10)
640
641        try:
642            index_metadata = self.rest.get_indexer_metadata()
643            self.log.info("Indexer Metadata :::")
644            self.log.info(index_metadata)
645            if index_metadata != {}:
646                hosts = index_metadata["status"][0]["hosts"]
647                self.log.info("Actual nodes : {0}".format(hosts))
648                node_out_str = node_out.ip + ":" + node_out.port
649                self.assertTrue(node_out_str not in hosts,
650                                "Partitioned index not created on expected hosts")
651            else:
652                self.log.info(
653                    "Cannot retrieve index metadata since one node is down")
654        except Exception, ex:
655            self.log.info(str(ex))
656        finally:
657            self.stop_firewall_on_node(node_out)
658            self.sleep(30)
659            index_metadata = self.rest.get_indexer_metadata()
660            self.log.info("Indexer Metadata :::")
661            self.log.info(index_metadata)
662
663            hosts = index_metadata["status"][0]["hosts"]
664            node_out_str = node_out.ip + ":" + node_out.port
665            self.assertTrue(node_out_str in hosts,
666                            "Partitioned index not created on all hosts")
667
668    def test_node_nw_partitioned_during_create_partitioned_index_with_node_list(
669            self):
670        self._load_emp_dataset(end=self.num_items)
671
672        node_out = self.servers[self.node_out]
673        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
674
675        # Create partitioned index
676        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'nodes' : {0}}}".format(
677            node_list_str)
678
679        threads = []
680
681        threads.append(
682            Thread(target=self.start_firewall_on_node,
683                   name="network_partitioning", args=(node_out,)))
684        threads.append(
685            Thread(target=self.n1ql_helper.run_cbq_query, name="run_query",
686                   args=(create_index_statement, 10, self.n1ql_node)))
687
688        for thread in threads:
689            thread.start()
690        self.sleep(5)
691        for thread in threads:
692            thread.join()
693
694        self.sleep(10)
695
696        try:
697            index_metadata = self.rest.get_indexer_metadata()
698            self.log.info("Indexer Metadata :::")
699            self.log.info(index_metadata)
700            if index_metadata != {}:
701                hosts = index_metadata["status"][0]["hosts"]
702                self.log.info("Actual nodes : {0}".format(hosts))
703                node_out_str = node_out.ip + ":" + node_out.port
704                self.assertTrue(node_out_str not in hosts,
705                                "Partitioned index not created on expected hosts")
706            else:
707                self.log.info(
708                    "Cannot retrieve index metadata since one node is down")
709        except Exception, ex:
710            self.log.info(str(ex))
711        finally:
712            self.stop_firewall_on_node(node_out)
713            self.sleep(30)
714            index_metadata = self.rest.get_indexer_metadata()
715            self.log.info("Indexer Metadata :::")
716            self.log.info(index_metadata)
717
718            hosts = index_metadata["status"][0]["hosts"]
719            node_out_str = node_out.ip + ":" + node_out.port
720            self.assertTrue(node_out_str in hosts,
721                            "Partitioned index not created on all hosts")
722
723    def test_build_partitioned_index(self):
724        self._load_emp_dataset(end=self.num_items)
725
726        index_name_prefix = "random_index_" + str(
727            random.randint(100000, 999999))
728        if self.num_index_replicas > 0:
729            create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'defer_build': true, 'num_replica':{1}}};".format(
730                self.num_index_partitions, self.num_index_replicas)
731        else:
732            create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'defer_build': true}};".format(
733                self.num_index_partitions)
734        try:
735            self.n1ql_helper.run_cbq_query(query=create_index_query,
736                                           server=self.n1ql_node)
737        except Exception, ex:
738            self.log.info(str(ex))
739            self.fail("index creation failed with error : {0}".format(str(ex)))
740
741        self.sleep(10)
742        index_map = self.get_index_map()
743        self.log.info(index_map)
744
745        index_metadata = self.rest.get_indexer_metadata()
746        self.log.info("Indexer Metadata Before Build:")
747        self.log.info(index_metadata)
748
749        index_details = {}
750        index_details["index_name"] = index_name_prefix
751        index_details["num_partitions"] = self.num_index_partitions
752        index_details["defer_build"] = True
753
754        self.assertTrue(
755            self.validate_partitioned_indexes(index_details, index_map,
756                                              index_metadata),
757            "Deferred Partitioned index created not as expected")
758
759        # Validation for replica indexes
760        if self.num_index_replicas > 0:
761            for i in range(1, self.num_index_replicas + 1):
762                index_details[
763                    "index_name"] = index_name_prefix + " (replica {0})".format(
764                    str(i))
765                self.assertTrue(
766                    self.validate_partitioned_indexes(index_details, index_map,
767                                                      index_metadata),
768                    "Deferred Partitioned index created not as expected")
769
770        build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")"
771
772        try:
773            self.n1ql_helper.run_cbq_query(query=build_index_query,
774                                           server=self.n1ql_node)
775        except Exception, ex:
776            self.log.info(str(ex))
777            self.fail("index building failed with error : {0}".format(str(ex)))
778
779        self.sleep(30)
780        index_map = self.get_index_map()
781        index_metadata = self.rest.get_indexer_metadata()
782        self.log.info("Indexer Metadata After Build:")
783        self.log.info(index_metadata)
784
785        index_details["index_name"] = index_name_prefix
786        index_details["defer_build"] = False
787
788        self.assertTrue(
789            self.validate_partitioned_indexes(index_details, index_map,
790                                              index_metadata),
791            "Deferred Partitioned index created not as expected")
792        # Validation for replica indexes
793        if self.num_index_replicas > 0:
794            for i in range(1, self.num_index_replicas + 1):
795                index_details[
796                    "index_name"] = index_name_prefix + " (replica {0})".format(
797                    str(i))
798                self.assertTrue(
799                    self.validate_partitioned_indexes(index_details, index_map,
800                                                      index_metadata),
801                    "Deferred Partitioned index created not as expected")
802
803    def test_build_partitioned_index_one_failed_node(self):
804        self._load_emp_dataset(end=self.num_items)
805
806        index_name_prefix = "random_index_" + str(
807            random.randint(100000, 999999))
808        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
809        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format(
810            self.num_index_partitions, node_list_str)
811        try:
812            self.n1ql_helper.run_cbq_query(query=create_index_query,
813                                           server=self.n1ql_node)
814        except Exception, ex:
815            self.log.info(str(ex))
816            self.fail("index creation failed with error : {0}".format(str(ex)))
817
818        self.sleep(10)
819        index_map = self.get_index_map()
820        self.log.info(index_map)
821
822        index_metadata = self.rest.get_indexer_metadata()
823        self.log.info("Indexer Metadata Before Build:")
824        self.log.info(index_metadata)
825
826        index_details = {}
827        index_details["index_name"] = index_name_prefix
828        index_details["num_partitions"] = self.num_index_partitions
829        index_details["defer_build"] = True
830
831        self.assertTrue(
832            self.validate_partitioned_indexes(index_details, index_map,
833                                              index_metadata),
834            "Deferred Partitioned index created not as expected")
835
836        node_out = self.servers[self.node_out]
837        failover_task = self.cluster.async_failover(
838            self.servers[:self.nodes_init],
839            [node_out],
840            self.graceful, wait_for_pending=180)
841
842        failover_task.result()
843
844        build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")"
845
846        try:
847            self.n1ql_helper.run_cbq_query(query=build_index_query,
848                                           server=self.n1ql_node)
849        except Exception, ex:
850            self.log.info(str(ex))
851            self.fail("index building failed with error : {0}".format(str(ex)))
852
853        self.sleep(30)
854        index_map = self.get_index_map()
855        if node_out == self.index_servers[0]:
856            rest = RestConnection(self.index_servers[1])
857        else:
858            rest = self.rest
859        index_metadata = rest.get_indexer_metadata()
860        self.log.info("Indexer Metadata After Build:")
861        self.log.info(index_metadata)
862
863        index_details["defer_build"] = False
864
865        # At this point, since one node is in a failed state, all partitions would not be built.
866        self.assertTrue(
867            self.validate_partitioned_indexes(index_details, index_map,
868                                              index_metadata, skip_numpartitions_check=True),
869            "Deferred Partitioned index created not as expected")
870
871        # Recover the failed node and check if after recovery, all partitions are built.
872        if self.recover_failed_node:
873            nodes_all = self.rest.node_statuses()
874            for node in nodes_all:
875                if node.ip == node_out.ip:
876                    break
877
878            self.rest.set_recovery_type(node.id, self.recovery_type)
879            self.rest.add_back_node(node.id)
880
881            rebalance = self.cluster.async_rebalance(
882                self.servers[:self.nodes_init],
883                [], [])
884            reached = RestHelper(self.rest).rebalance_reached()
885            self.assertTrue(reached,
886                            "rebalance failed, stuck or did not complete")
887            rebalance.result()
888            self.sleep(180)
889
890            index_map = self.get_index_map()
891            index_metadata = self.rest.get_indexer_metadata()
892            self.log.info("Indexer Metadata After Build:")
893            self.log.info(index_metadata)
894
895            index_details["defer_build"] = False
896
897            self.assertTrue(
898                self.validate_partitioned_indexes(index_details, index_map,
899                                                  index_metadata),
900                "Deferred Partitioned index created not as expected")
901
902    def test_failover_during_build_partitioned_index(self):
903        self._load_emp_dataset(end=self.num_items)
904
905        index_name_prefix = "random_index_" + str(
906            random.randint(100000, 999999))
907        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
908        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format(
909            self.num_index_partitions, node_list_str)
910        try:
911            self.n1ql_helper.run_cbq_query(query=create_index_query,
912                                           server=self.n1ql_node)
913        except Exception, ex:
914            self.log.info(str(ex))
915            self.fail("index creation failed with error : {0}".format(str(ex)))
916
917        self.sleep(10)
918        index_map = self.get_index_map()
919        self.log.info(index_map)
920
921        index_metadata = self.rest.get_indexer_metadata()
922        self.log.info("Indexer Metadata Before Build:")
923        self.log.info(index_metadata)
924
925        index_details = {}
926        index_details["index_name"] = index_name_prefix
927        index_details["num_partitions"] = self.num_index_partitions
928        index_details["defer_build"] = True
929
930        self.assertTrue(
931            self.validate_partitioned_indexes(index_details, index_map,
932                                              index_metadata),
933            "Deferred Partitioned index created not as expected")
934
935        node_out = self.servers[self.node_out]
936        build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")"
937        threads = []
938        threads.append(
939            Thread(target=self.n1ql_helper.run_cbq_query, name="run_query",
940                   args=(build_index_query, 10, self.n1ql_node)))
941        threads.append(
942            Thread(target=self.cluster.async_failover, name="failover", args=(
943                self.servers[:self.nodes_init], [node_out], self.graceful)))
944        for thread in threads:
945            thread.start()
946            thread.join()
947        self.sleep(30)
948
949        index_map = self.get_index_map()
950        if node_out == self.index_servers[0]:
951            rest = RestConnection(self.index_servers[1])
952        else:
953            rest = self.rest
954        index_metadata = rest.get_indexer_metadata()
955        self.log.info("Indexer Metadata After Build:")
956        self.log.info(index_metadata)
957
958        index_details["defer_build"] = False
959
960        # At this point, since one node is in a failed state, all partitions would not be built.
961        self.assertTrue(
962            self.validate_partitioned_indexes(index_details, index_map,
963                                              index_metadata, skip_numpartitions_check=True),
964            "Deferred Partitioned index created not as expected")
965
966    def test_build_partitioned_index_with_network_partitioning(self):
967        self._load_emp_dataset(end=self.num_items)
968
969        index_name_prefix = "random_index_" + str(
970            random.randint(100000, 999999))
971        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
972        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}, 'defer_build': true}};".format(
973            self.num_index_partitions, node_list_str)
974        try:
975            self.n1ql_helper.run_cbq_query(query=create_index_query,
976                                           server=self.n1ql_node)
977        except Exception, ex:
978            self.log.info(str(ex))
979            self.fail("index creation failed with error : {0}".format(str(ex)))
980
981        self.sleep(10)
982        index_map = self.get_index_map()
983        self.log.info(index_map)
984
985        index_metadata = self.rest.get_indexer_metadata()
986        self.log.info("Indexer Metadata Before Build:")
987        self.log.info(index_metadata)
988
989        index_details = {}
990        index_details["index_name"] = index_name_prefix
991        index_details["num_partitions"] = self.num_index_partitions
992        index_details["defer_build"] = True
993
994        self.assertTrue(
995            self.validate_partitioned_indexes(index_details, index_map,
996                                              index_metadata),
997            "Deferred Partitioned index created not as expected")
998
999        node_out = self.servers[self.node_out]
1000        build_index_query = "BUILD INDEX on `default`(" + index_name_prefix + ")"
1001
1002        try:
1003            self.start_firewall_on_node(node_out)
1004            self.sleep(10)
1005            self.n1ql_helper.run_cbq_query(query=build_index_query,
1006                                           server=self.n1ql_node)
1007        except Exception, ex:
1008            self.log.info(str(ex))
1009            if not "Index build will be retried in background" in str(ex):
1010                self.fail("index building failed with error : {0}".format(str(ex)))
1011            else:
1012                self.log.info("Index build failed with expected error")
1013
1014        finally:
1015            # Heal network partition and wait for some time to allow indexes
1016            # to get built automatically on that node
1017            self.stop_firewall_on_node(node_out)
1018            self.sleep(360)
1019
1020            index_map = self.get_index_map()
1021            index_metadata = self.rest.get_indexer_metadata()
1022            self.log.info("Indexer Metadata After Build:")
1023            self.log.info(index_metadata)
1024
1025            index_details["defer_build"] = False
1026
1027            self.assertTrue(
1028                self.validate_partitioned_indexes(index_details, index_map,
1029                                                  index_metadata),
1030                "Deferred Partitioned index created not as expected")
1031
1032    def test_drop_partitioned_index(self):
1033        self._load_emp_dataset(end=self.num_items)
1034
1035        index_name_prefix = "random_index_" + str(
1036            random.randint(100000, 999999))
1037
1038        with_clause = "WITH {{'num_partition': {0} ".format(
1039            self.num_index_partitions)
1040        if self.num_index_replicas > 0:
1041            with_clause += ", 'num_replica':{0}".format(self.num_index_replicas)
1042        if self.defer_build:
1043            with_clause += ", 'defer_build':True"
1044        with_clause += " }"
1045
1046        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  {0}".format(
1047            with_clause)
1048
1049        try:
1050            self.n1ql_helper.run_cbq_query(query=create_index_query,
1051                                           server=self.n1ql_node)
1052        except Exception, ex:
1053            self.log.info(str(ex))
1054            self.fail(
1055                "index creation failed with error : {0}".format(str(ex)))
1056
1057        self.sleep(10)
1058        index_map = self.get_index_map()
1059        self.log.info(index_map)
1060
1061        index_metadata = self.rest.get_indexer_metadata()
1062        self.log.info("Indexer Metadata Before Build:")
1063        self.log.info(index_metadata)
1064
1065        index_details = {}
1066        index_details["index_name"] = index_name_prefix
1067        index_details["num_partitions"] = self.num_index_partitions
1068        index_details["defer_build"] = self.defer_build
1069
1070        self.assertTrue(
1071            self.validate_partitioned_indexes(index_details, index_map,
1072                                              index_metadata),
1073            "Deferred Partitioned index created not as expected")
1074
1075        # Validation for replica indexes
1076        if self.num_index_replicas > 0:
1077            for i in range(1, self.num_index_replicas + 1):
1078                index_details[
1079                    "index_name"] = index_name_prefix + " (replica {0})".format(
1080                    str(i))
1081                self.assertTrue(
1082                    self.validate_partitioned_indexes(index_details,
1083                                                      index_map,
1084                                                      index_metadata),
1085                    "Deferred Partitioned index created not as expected")
1086
1087        drop_index_query = "DROP INDEX `default`." + index_name_prefix
1088
1089        try:
1090            self.n1ql_helper.run_cbq_query(query=drop_index_query,
1091                                           server=self.n1ql_node)
1092        except Exception, ex:
1093            self.log.info(str(ex))
1094            self.fail(
1095                "Drop index failed with error : {0}".format(str(ex)))
1096
1097        self.sleep(30)
1098        index_map = self.get_index_map()
1099        self.log.info("Index map after drop index: %s", index_map)
1100        if not index_map == {}:
1101            self.fail("Indexes not dropped correctly")
1102
1103    def test_delete_bucket_cascade_drop_partitioned_index(self):
1104        self._load_emp_dataset(end=self.num_items)
1105
1106        index_name_prefix = "random_index_" + str(
1107            random.randint(100000, 999999))
1108
1109        with_clause = "WITH {{'num_partition': {0} ".format(
1110            self.num_index_partitions)
1111        if self.num_index_replicas > 0:
1112            with_clause += ", 'num_replica':{0}".format(self.num_index_replicas)
1113        if self.defer_build:
1114            with_clause += ", 'defer_build':True"
1115        with_clause += " }"
1116
1117        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  {0}".format(
1118            with_clause)
1119        create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{0}}}".format(
1120                self.num_index_partitions)
1121
1122        try:
1123            self.n1ql_helper.run_cbq_query(query=create_index_query,
1124                                           server=self.n1ql_node)
1125            self.n1ql_helper.run_cbq_query(query=create_primary_index_statement,
1126                                           server=self.n1ql_node)
1127        except Exception, ex:
1128            self.log.info(str(ex))
1129            self.fail(
1130                "index creation failed with error : {0}".format(str(ex)))
1131
1132        self.sleep(10)
1133        index_map = self.get_index_map()
1134        self.log.info(index_map)
1135
1136        index_metadata = self.rest.get_indexer_metadata()
1137        self.log.info("Indexer Metadata Before Build:")
1138        self.log.info(index_metadata)
1139
1140        index_details = {}
1141        index_details["index_name"] = index_name_prefix
1142        index_details["num_partitions"] = self.num_index_partitions
1143        index_details["defer_build"] = self.defer_build
1144
1145        self.assertTrue(
1146            self.validate_partitioned_indexes(index_details, index_map,
1147                                              index_metadata),
1148            "Deferred Partitioned index created not as expected")
1149
1150        # Validation for replica indexes
1151        if self.num_index_replicas > 0:
1152            for i in range(1, self.num_index_replicas + 1):
1153                index_details[
1154                    "index_name"] = index_name_prefix + " (replica {0})".format(
1155                    str(i))
1156                self.assertTrue(
1157                    self.validate_partitioned_indexes(index_details,
1158                                                      index_map,
1159                                                      index_metadata),
1160                    "Deferred Partitioned index created not as expected")
1161
1162        self.cluster.bucket_delete(server=self.master, bucket='default')
1163
1164        self.sleep(30)
1165        index_map = self.get_index_map()
1166        self.log.info("Index map after drop index: %s", index_map)
1167        if not index_map == {}:
1168            self.fail("Indexes not dropped correctly")
1169
1170    def test_drop_partitioned_index_one_failed_node(self):
1171        self._load_emp_dataset(end=self.num_items)
1172
1173        index_name_prefix = "random_index_" + str(
1174            random.randint(100000, 999999))
1175        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
1176        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}}};".format(
1177            self.num_index_partitions, node_list_str)
1178        try:
1179            self.n1ql_helper.run_cbq_query(query=create_index_query,
1180                                           server=self.n1ql_node)
1181        except Exception, ex:
1182            self.log.info(str(ex))
1183            self.fail(
1184                "index creation failed with error : {0}".format(str(ex)))
1185
1186        self.sleep(10)
1187        index_map = self.get_index_map()
1188        self.log.info(index_map)
1189
1190        index_metadata = self.rest.get_indexer_metadata()
1191        self.log.info("Indexer Metadata:")
1192        self.log.info(index_metadata)
1193
1194        index_details = {}
1195        index_details["index_name"] = index_name_prefix
1196        index_details["num_partitions"] = self.num_index_partitions
1197        index_details["defer_build"] = False
1198
1199        self.assertTrue(
1200            self.validate_partitioned_indexes(index_details, index_map,
1201                                              index_metadata),
1202            "Partitioned index created not as expected")
1203
1204        node_out = self.servers[self.node_out]
1205        failover_task = self.cluster.async_failover(
1206            self.servers[:self.nodes_init],
1207            [node_out],
1208            self.graceful, wait_for_pending=180)
1209
1210        failover_task.result()
1211
1212        drop_index_query = "DROP INDEX `default`." + index_name_prefix
1213
1214        try:
1215            self.n1ql_helper.run_cbq_query(query=drop_index_query,
1216                                           server=self.n1ql_node)
1217        except Exception, ex:
1218            self.log.info(str(ex))
1219            self.fail(
1220                "Drop index failed with error : {0}".format(str(ex)))
1221
1222        self.sleep(30)
1223        index_map = self.get_index_map()
1224        self.log.info("Index map after drop index: %s", index_map)
1225        if not index_map == {}:
1226            self.fail("Indexes not dropped correctly")
1227
1228        if self.recover_failed_node:
1229            nodes_all = self.rest.node_statuses()
1230            for node in nodes_all:
1231                if node.ip == node_out.ip:
1232                    break
1233
1234            self.rest.set_recovery_type(node.id, self.recovery_type)
1235            self.rest.add_back_node(node.id)
1236
1237            rebalance = self.cluster.async_rebalance(
1238                self.servers[:self.nodes_init],
1239                [], [])
1240            reached = RestHelper(self.rest).rebalance_reached()
1241            self.assertTrue(reached,
1242                            "rebalance failed, stuck or did not complete")
1243            rebalance.result()
1244            self.sleep(180)
1245
1246            index_map = self.get_index_map()
1247            self.log.info("Index map after drop index: %s", index_map)
1248            if not index_map == {}:
1249                self.fail("Indexes not dropped correctly")
1250
1251    def test_failover_during_drop_partitioned_index(self):
1252        self._load_emp_dataset(end=self.num_items)
1253
1254        index_name_prefix = "random_index_" + str(
1255            random.randint(100000, 999999))
1256        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
1257        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}}};".format(
1258            self.num_index_partitions, node_list_str)
1259        try:
1260            self.n1ql_helper.run_cbq_query(query=create_index_query,
1261                                           server=self.n1ql_node)
1262        except Exception, ex:
1263            self.log.info(str(ex))
1264            self.fail("index creation failed with error : {0}".format(
1265                str(ex)))
1266
1267        self.sleep(10)
1268        index_map = self.get_index_map()
1269        self.log.info(index_map)
1270
1271        index_metadata = self.rest.get_indexer_metadata()
1272        self.log.info("Indexer Metadata:")
1273        self.log.info(index_metadata)
1274
1275        index_details = {}
1276        index_details["index_name"] = index_name_prefix
1277        index_details["num_partitions"] = self.num_index_partitions
1278        index_details["defer_build"] = False
1279
1280        self.assertTrue(
1281            self.validate_partitioned_indexes(index_details, index_map,
1282                                              index_metadata),
1283            "Partitioned index created not as expected")
1284
1285        node_out = self.servers[self.node_out]
1286        drop_index_query = "DROP INDEX `default`." + index_name_prefix
1287        threads = []
1288        threads.append(
1289            Thread(target=self.n1ql_helper.run_cbq_query,
1290                   name="run_query",
1291                   args=(drop_index_query, 10, self.n1ql_node)))
1292        threads.append(
1293            Thread(target=self.cluster.async_failover, name="failover",
1294                   args=(
1295                       self.servers[:self.nodes_init], [node_out],
1296                       self.graceful)))
1297        for thread in threads:
1298            thread.start()
1299            thread.join()
1300        self.sleep(30)
1301
1302        index_map = self.get_index_map()
1303        self.log.info("Index map after drop index: %s", index_map)
1304        if not index_map == {}:
1305            self.fail("Indexes not dropped correctly")
1306
1307    def test_drop_partitioned_index_with_network_partitioning(self):
1308        self._load_emp_dataset(end=self.num_items)
1309
1310        index_name_prefix = "random_index_" + str(
1311            random.randint(100000, 999999))
1312        node_list_str = "[\"" + "\",\"".join(self.node_list) + "\"]"
1313        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI  WITH {{'num_partition': {0}, 'nodes': {1}}};".format(
1314            self.num_index_partitions, node_list_str)
1315        try:
1316            self.n1ql_helper.run_cbq_query(query=create_index_query,
1317                                           server=self.n1ql_node)
1318        except Exception, ex:
1319            self.log.info(str(ex))
1320            self.fail(
1321                "index creation failed with error : {0}".format(str(ex)))
1322
1323        self.sleep(10)
1324        index_map = self.get_index_map()
1325        self.log.info(index_map)
1326
1327        index_metadata = self.rest.get_indexer_metadata()
1328        self.log.info("Indexer Metadata Before Build:")
1329        self.log.info(index_metadata)
1330
1331        index_details = {}
1332        index_details["index_name"] = index_name_prefix
1333        index_details["num_partitions"] = self.num_index_partitions
1334        index_details["defer_build"] = False
1335
1336        self.assertTrue(
1337            self.validate_partitioned_indexes(index_details, index_map,
1338                                              index_metadata),
1339            "Partitioned index created not as expected")
1340
1341        node_out = self.servers[self.node_out]
1342        self.start_firewall_on_node(node_out)
1343
1344        drop_index_query = "DROP INDEX `default`." + index_name_prefix
1345
1346        try:
1347            self.start_firewall_on_node(node_out)
1348            self.sleep(10)
1349            self.n1ql_helper.run_cbq_query(query=drop_index_query,
1350                                           server=self.n1ql_node)
1351        except Exception, ex:
1352            self.log.info(str(ex))
1353            if not "the operation will automaticaly retry after cluster is back to normal" in str(ex):
1354                self.fail(
1355                    "index drop failed with error : {0}".format(str(ex)))
1356            else:
1357                self.log.info("Index drop failed with expected error")
1358
1359        finally:
1360            # Heal network partition and wait for some time to allow indexes
1361            # to get built automatically on that node
1362            self.stop_firewall_on_node(node_out)
1363            self.sleep(360)
1364
1365            index_map = self.get_index_map()
1366            self.log.info("Index map after drop index: %s", index_map)
1367            if not index_map == {}:
1368                self.fail("Indexes not dropped correctly")
1369
1370    def test_partitioned_index_warmup_behaviour(self):
1371        node_out = self.servers[self.node_out]
1372
1373        self._load_emp_dataset(end=self.num_items)
1374
1375        index_name_prefix = "random_index_" + str(
1376            random.randint(100000, 999999))
1377
1378        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(name,dept,salary) partition by hash(name) USING GSI"
1379        if self.defer_build:
1380            create_index_query += " WITH {'defer_build':true}"
1381
1382        try:
1383            self.n1ql_helper.run_cbq_query(query=create_index_query,
1384                                           server=self.n1ql_node)
1385        except Exception, ex:
1386            self.log.info(str(ex))
1387            self.fail(
1388                "index creation failed with error : {0}".format(str(ex)))
1389
1390        self.sleep(10)
1391        index_map = self.get_index_map()
1392        self.log.info(index_map)
1393
1394        index_metadata = self.rest.get_indexer_metadata()
1395        self.log.info("Indexer Metadata Before:")
1396        self.log.info(index_metadata)
1397
1398        index_details = {}
1399        index_details["index_name"] = index_name_prefix
1400        index_details["num_partitions"] = self.num_index_partitions
1401        index_details["defer_build"] = self.defer_build
1402
1403        self.assertTrue(
1404            self.validate_partitioned_indexes(index_details, index_map,
1405                                              index_metadata),
1406            "Partitioned index created not as expected")
1407
1408        remote_client = RemoteMachineShellConnection(node_out)
1409        if self.node_operation == "kill_indexer":
1410            remote_client.terminate_process(process_name="indexer")
1411            remote_client.disconnect()
1412        else:
1413            self.reboot_node(node_out)
1414
1415        # wait for restart and warmup on all node
1416        self.sleep(self.wait_timeout * 3)
1417        # disable firewall on these nodes
1418        self.stop_firewall_on_node(node_out)
1419        # wait till node is ready after warmup
1420        ClusterOperationHelper.wait_for_ns_servers_or_assert([node_out], self,
1421                                                             wait_if_warmup=True)
1422
1423        index_map = self.get_index_map()
1424        self.log.info(index_map)
1425
1426        index_metadata = self.rest.get_indexer_metadata()
1427        self.log.info("Indexer Metadata After:")
1428        self.log.info(index_metadata)
1429
1430        self.assertTrue(
1431            self.validate_partitioned_indexes(index_details, index_map,
1432                                              index_metadata),
1433            "Partitioned index warmup behavior not as expected")
1434
1435    def test_mutations_on_partitioned_indexes(self):
1436        self.run_async_index_operations(operation_type="create_index")
1437        self.run_doc_ops()
1438        self.sleep(30)
1439
1440        # Get item counts
1441        bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes()
1442
1443        self.assertEqual(bucket_item_count, total_item_count,
1444                         "# Items indexed {0} do not match bucket items {1}".format(
1445                             total_item_count, bucket_item_count))
1446
1447    def test_update_mutations_on_indexed_keys_partitioned_indexes(self):
1448        create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(name) USING GSI;"
1449        try:
1450            self.n1ql_helper.run_cbq_query(query=create_index_query,
1451                                           server=self.n1ql_node)
1452        except Exception, ex:
1453            self.log.info(str(ex))
1454            self.fail(
1455                "index creation failed with error : {0}".format(str(ex)))
1456        self.run_doc_ops()
1457        self.sleep(30)
1458
1459        # Get item counts
1460        bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes(
1461            index_name="idx1")
1462
1463        self.assertEqual(bucket_item_count, total_item_count,
1464                         "# Items indexed {0} do not match bucket items {1}".format(
1465                             total_item_count, bucket_item_count))
1466
1467    def test_kv_full_rollback_on_partitioned_indexes(self):
1468        self.run_async_index_operations(operation_type="create_index")
1469        self.sleep(30)
1470
1471        self.cluster.bucket_flush(self.master)
1472        self.sleep(60)
1473
1474        # Get item counts
1475        bucket_item_count, total_item_count, total_num_docs_processed = self.get_stats_for_partitioned_indexes()
1476
1477        self.assertEqual(total_item_count, 0, "Rollback to zero fails")
1478
1479    def test_kv_partial_rollback_on_partitioned_indexes(self):
1480        self.run_async_index_operations(operation_type="create_index")
1481
1482        # Stop Persistence on Node A & Node B
1483        self.log.info("Stopping persistence on NodeA & NodeB")
1484        mem_client = MemcachedClientHelper.direct_client(self.servers[0],
1485                                                         "default")
1486        mem_client.stop_persistence()
1487        mem_client = MemcachedClientHelper.direct_client(self.servers[1],
1488                                                         "default")
1489        mem_client.stop_persistence()
1490
1491        self.run_doc_ops()
1492
1493        self.sleep(10)
1494
1495        # Get count before rollback
1496        bucket_count_before_rollback, item_count_before_rollback, num_docs_processed_before_rollback = self.get_stats_for_partitioned_indexes()
1497
1498        # Kill memcached on Node A so that Node B becomes master
1499        self.log.info("Kill Memcached process on NodeA")
1500        shell = RemoteMachineShellConnection(self.master)
1501        shell.kill_memcached()
1502
1503        # Start persistence on Node B
1504        self.log.info("Starting persistence on NodeB")
1505        mem_client = MemcachedClientHelper.direct_client(
1506            self.input.servers[1], "default")
1507        mem_client.start_persistence()
1508
1509        # Failover Node B
1510        self.log.info("Failing over NodeB")
1511        self.sleep(10)
1512        failover_task = self.cluster.async_failover(
1513            self.servers[:self.nodes_init], [self.servers[1]], self.graceful,
1514            wait_for_pending=120)
1515
1516        failover_task.result()
1517
1518        # Wait for a couple of mins to allow rollback to complete
1519        self.sleep(120)
1520
1521        # Get count after rollback
1522        bucket_count_after_rollback, item_count_after_rollback, num_docs_processed_after_rollback = self.get_stats_for_partitioned_indexes()
1523
1524        self.assertEqual(bucket_count_after_rollback, item_count_after_rollback,
1525                         "Partial KV Rollback not processed by Partitioned indexes")
1526
1527    def test_scan_availability(self):
1528        create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(BASE64(meta().id)) USING GSI"
1529        if self.num_index_replicas:
1530            create_index_query += " with {{'num_replica':{0}}};".format(
1531                self.num_index_replicas)
1532        try:
1533            self.n1ql_helper.run_cbq_query(query=create_index_query,
1534                                           server=self.n1ql_node)
1535        except Exception, ex:
1536            self.log.info(str(ex))
1537            self.fail(
1538                "index creation failed with error : {0}".format(str(ex)))
1539
1540        node_out = self.servers[self.node_out]
1541        failover_task = self.cluster.async_failover(
1542            self.servers[:self.nodes_init],
1543            [node_out],
1544            self.graceful, wait_for_pending=60)
1545
1546        failover_task.result()
1547
1548        self.sleep(30)
1549
1550        # Run query
1551        scan_query = "select name,mutated from default where name > 'a' and mutated >=0;"
1552        try:
1553            self.n1ql_helper.run_cbq_query(query=scan_query,
1554                                           server=self.n1ql_node)
1555        except Exception, ex:
1556            self.log.info(str(ex))
1557            if self.num_index_replicas == 0:
1558                if self.expected_err_msg in str(ex):
1559                    pass
1560                else:
1561                    self.fail(
1562                        "Scan failed with unexpected error message".format(
1563                            str(ex)))
1564            else:
1565                self.fail("Scan failed")
1566
1567    def test_scan_availability_with_network_partitioning(self):
1568        create_index_query = "CREATE INDEX idx1 ON default(name,mutated) partition by hash(BASE64(meta().id)) USING GSI"
1569        if self.num_index_replicas:
1570            create_index_query += " with {{'num_replica':{0}}};".format(
1571                self.num_index_replicas)
1572        try:
1573            self.n1ql_helper.run_cbq_query(query=create_index_query,
1574                                           server=self.n1ql_node)
1575        except Exception, ex:
1576            self.log.info(str(ex))
1577            self.fail(
1578                "index creation failed with error : {0}".format(str(ex)))
1579
1580        # Induce network partitioning on one of the nodes
1581        node_out = self.servers[self.node_out]
1582        self.start_firewall_on_node(node_out)
1583
1584        # Run query
1585        scan_query = "select name,mutated from default where name > 'a' and mutated >=0;"
1586        try:
1587            self.n1ql_helper.run_cbq_query(query=scan_query,
1588                                           server=self.n1ql_node)
1589        except Exception, ex:
1590            self.log.info(
1591                "Scan failed as one indexer node was experiencing network partititioning. Error : %s",
1592                str(ex))
1593
1594        # Heal Network Partitioning
1595        self.stop_firewall_on_node(node_out)
1596
1597        # Re-run query
1598        scan_query = "select name,mutated from default where name > 'a' and mutated >=0;"
1599        try:
1600            self.n1ql_helper.run_cbq_query(query=scan_query,
1601                                           server=self.n1ql_node)
1602        except Exception, ex:
1603            self.log.info(str(ex))
1604            if self.num_index_replicas:
1605                if self.expected_err_msg in str(ex):
1606                    pass
1607                else:
1608                    self.fail(
1609                        "Scan failed with unexpected error message".format(
1610                            str(ex)))
1611            else:
1612                self.fail("Scan failed")
1613
1614    def test_index_scans(self):
1615        self._load_emp_dataset(end=self.num_items)
1616
1617        # Create Partitioned and non-partitioned indexes
1618
1619        if self.num_index_partitions > 0:
1620            self.rest.set_index_settings(
1621                {"indexer.numPartitions": self.num_index_partitions})
1622
1623        create_partitioned_index1_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(name,dept,salary) USING GSI;"
1624        create_index1_query = "CREATE INDEX non_partitioned_idx1 ON default(name,dept,salary) USING GSI;"
1625        create_partitioned_index2_query = "create index partitioned_idx2 on default(name,manages.team_size) partition by hash(manages.team_size) USING GSI;"
1626        create_index2_query = "create index non_partitioned_idx2 on default(name,manages.team_size) USING GSI;"
1627        create_partitioned_index3_query = "create index partitioned_idx3 on default(name,manages.team_size) partition by hash(name,manages.team_size) USING GSI;"
1628
1629        try:
1630            self.n1ql_helper.run_cbq_query(
1631                query=create_partitioned_index1_query,
1632                server=self.n1ql_node)
1633            self.n1ql_helper.run_cbq_query(query=create_index1_query,
1634                                           server=self.n1ql_node)
1635            self.n1ql_helper.run_cbq_query(
1636                query=create_partitioned_index2_query,
1637                server=self.n1ql_node)
1638            self.n1ql_helper.run_cbq_query(query=create_index2_query,
1639                                           server=self.n1ql_node)
1640            self.n1ql_helper.run_cbq_query(
1641                query=create_partitioned_index3_query,
1642                server=self.n1ql_node)
1643        except Exception, ex:
1644            self.log.info(str(ex))
1645            self.fail(
1646                "index creation failed with error : {0}".format(str(ex)))
1647
1648        # Scans
1649
1650        queries = []
1651
1652        # 1. Small lookup query with equality predicate on the partition key
1653        query_details = {}
1654        query_details[
1655            "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name='Safiya Palmer'"
1656        query_details["partitioned_idx_name"] = "partitioned_idx1"
1657        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1658        queries.append(query_details)
1659
1660        # 2. Pagination query with equality predicate on the partition key
1661        query_details = {}
1662        query_details[
1663            "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND dept='HR' offset 0 limit 10"
1664        query_details["partitioned_idx_name"] = "partitioned_idx1"
1665        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1666        queries.append(query_details)
1667
1668        # 3. Large aggregated query
1669        query_details = {}
1670        query_details[
1671            "query"] = "select count(name), dept from default USE INDEX (indexname USING GSI) where name is not missing group by dept"
1672        query_details["partitioned_idx_name"] = "partitioned_idx1"
1673        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1674        queries.append(query_details)
1675
1676        # 4. Scan with large result sets
1677        query_details = {}
1678        query_details[
1679            "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 10000"
1680        query_details["partitioned_idx_name"] = "partitioned_idx1"
1681        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1682        queries.append(query_details)
1683
1684        # 5. Scan that does not require sorted data
1685        query_details = {}
1686        query_details[
1687            "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 100000"
1688        query_details["partitioned_idx_name"] = "partitioned_idx1"
1689        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1690        queries.append(query_details)
1691
1692        # 6. Scan that requires sorted data
1693        query_details = {}
1694        query_details[
1695            "query"] = "select name,dept,salary from default USE INDEX (indexname USING GSI) where name is not missing AND salary > 10000 order by dept asc,salary desc"
1696        query_details["partitioned_idx_name"] = "partitioned_idx1"
1697        query_details["non_partitioned_idx_name"] = "non_partitioned_idx1"
1698        queries.append(query_details)
1699
1700        # 7. Scan with predicate on a dataset that has some values for the partition key missing, and present for some
1701        query_details = {}
1702        query_details[
1703            "query"] = "select name from default USE INDEX (indexname USING GSI) where name is not missing AND manages.team_size > 3"
1704        query_details["partitioned_idx_name"] = "partitioned_idx2"
1705        query_details["non_partitioned_idx_name"] = "non_partitioned_idx2"
1706        queries.append(query_details)
1707
1708        # 8. Index partitioned on multiple keys. Scan with predicate on multiple keys with a dataset that has some values for the partition keys missing, and present for some
1709        query_details = {}
1710        query_details[
1711            "query"] = "select name from default USE INDEX (indexname USING GSI) where manages.team_size >= 3 and manages.team_size <= 7 and name like 'A%'"
1712        query_details["partitioned_idx_name"] = "partitioned_idx3"
1713        query_details["non_partitioned_idx_name"] = "non_partitioned_idx2"
1714        queries.append(query_details)
1715
1716        # 9. Overlap scans on partition keys
1717        query_details = {}
1718        query_details[
1719            "query"] = "select name from default USE INDEX (indexname USING GSI) where name is not missing AND (manages.team_size >= 3 or manages.team_size >= 7)"
1720        query_details["partitioned_idx_name"] = "partitioned_idx2"
1721        query_details["non_partitioned_idx_name"] = "non_partitioned_idx2"
1722        queries.append(query_details)
1723
1724        total_scans = 0
1725        failures = 0
1726        for query_details in queries:
1727            total_scans += 1
1728
1729            try:
1730                query_partitioned_index = query_details["query"].replace(
1731                    "indexname", query_details["partitioned_idx_name"])
1732                query_non_partitioned_index = query_details["query"].replace(
1733                    "indexname", query_details["non_partitioned_idx_name"])
1734
1735                result_partitioned_index = \
1736                    self.n1ql_helper.run_cbq_query(
1737                        query=query_partitioned_index,
1738                        min_output_size=10000000,
1739                        server=self.n1ql_node)["results"]
1740                result_non_partitioned_index = self.n1ql_helper.run_cbq_query(
1741                    query=query_non_partitioned_index, min_output_size=10000000,
1742                    server=self.n1ql_node)["results"]
1743
1744                self.log.info("Partitioned : {0}".format(
1745                    str(result_partitioned_index.sort())))
1746                self.log.info("Non Partitioned : {0}".format(
1747                    str(result_non_partitioned_index.sort())))
1748
1749                if result_partitioned_index.sort() != result_non_partitioned_index.sort():
1750                    failures += 1
1751                    self.log.info(
1752                        "*** This query does not return same results for partitioned and non-partitioned indexes.")
1753            except Exception, ex:
1754                self.log.info(str(ex))
1755
1756        self.log.info(
1757            "Total scans : {0}, Matching results : {1}, Non-matching results : {2}".format(
1758                total_scans, total_scans - failures, failures))
1759        self.assertEqual(failures, 0,
1760                         "Some scans did not yield the same results for partitioned index and non-partitioned indexes. Details above.")
1761
1762    def test_load_balancing_amongst_partitioned_index_replicas(self):
1763        index_name_prefix = "random_index_" + str(
1764            random.randint(100000, 999999))
1765        create_index_query = "CREATE INDEX " + index_name_prefix + " ON default(age) partition by hash (meta().id) USING GSI  WITH {{'num_replica': {0},'num_partition':{1}}};".format(
1766            self.num_index_replicas, self.num_index_partitions)
1767        select_query = "SELECT count(age) from default"
1768        try:
1769            self.n1ql_helper.run_cbq_query(query=create_index_query,
1770                                           server=self.n1ql_node)
1771        except Exception, ex:
1772            self.log.info(str(ex))
1773            if self.expected_err_msg not in str(ex):
1774                self.fail(
1775                    "index creation did not fail with expected error : {0}".format(
1776                        str(ex)))
1777            else:
1778                self.log.info("Index creation failed as expected")
1779        self.sleep(30)
1780        index_map = self.get_index_map()
1781        self.log.info(index_map)
1782
1783        index_metadata = self.rest.get_indexer_metadata()
1784        self.log.info("Indexer Metadata Before Build:")
1785        self.log.info(index_metadata)
1786
1787        index_details = {}
1788        index_details["index_name"] = index_name_prefix
1789        index_details["num_partitions"] = self.num_index_partitions
1790        index_details["defer_build"] = False
1791
1792        self.assertTrue(
1793            self.validate_partitioned_indexes(index_details, index_map,
1794                                              index_metadata),
1795            "Partitioned index created not as expected")
1796
1797        self.assertTrue(self.validate_partition_map(index_metadata, index_name_prefix,
1798                                                    self.num_index_replicas,
1799                                                    self.num_index_partitions),
1800                        "Partition map validation failed")
1801
1802        # Run select query 100 times
1803        for i in range(0, 100):
1804            self.n1ql_helper.run_cbq_query(query=select_query,
1805                                           server=self.n1ql_node)
1806
1807        index_stats = self.get_index_stats(perNode=True)
1808        load_balanced = True
1809        for i in range(0, self.num_index_replicas + 1):
1810            if i == 0:
1811                index_name = index_name_prefix
1812            else:
1813                index_name = index_name_prefix + " (replica {0})".format(str(i))
1814
1815            hosts, _ = self.n1ql_helper.get_index_details_using_index_name(
1816                index_name, index_map)
1817            for hostname in hosts:
1818                num_request_served = index_stats[hostname]['default'][index_name][
1819                    "num_completed_requests"]
1820                self.log.info("# Requests served by %s on %s = %s" % (
1821                    index_name, hostname, num_request_served))
1822                if num_request_served == 0:
1823                    load_balanced = False
1824
1825        if not load_balanced:
1826            self.fail("Load is not balanced amongst index replicas")
1827
1828    def test_indexer_pushdowns_multiscan(self):
1829        self._load_emp_dataset(end=self.num_items)
1830
1831        # Create Partitioned indexes
1832        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
1833            self.num_index_partitions)
1834
1835        try:
1836            self.n1ql_helper.run_cbq_query(
1837                query=create_partitioned_index_query,
1838                server=self.n1ql_node)
1839        except Exception, ex:
1840            self.log.info(str(ex))
1841            self.fail(
1842                "index creation failed with error : {0}".format(str(ex)))
1843
1844        explain_query1 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary > 120000 and salary < 150000"
1845        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
1846                                                 server=self.n1ql_node)
1847
1848        self.log.info("Explain plan for query 1 : {0}".format(results))
1849
1850        span_pushdown, _, _, _, _ = self.validate_query_plan(
1851            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1852            num_spans=3)
1853
1854        self.assertTrue(span_pushdown, "Operators not pushed down to indexer")
1855
1856        explain_query2 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary BETWEEN 120000 and 150000"
1857        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
1858                                                 server=self.n1ql_node)
1859
1860        self.log.info("Explain plan for query 2 : {0}".format(results))
1861
1862        span_pushdown, _, _, _, _ = self.validate_query_plan(
1863            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1864            num_spans=3)
1865
1866        self.assertTrue(span_pushdown, "Operators not pushed down to indexer")
1867
1868        explain_query3 = "EXPLAIN select name from default where name is not missing and dept='HR' and (salary > 120000 or salary > 180000)"
1869        results = self.n1ql_helper.run_cbq_query(query=explain_query3,
1870                                                 server=self.n1ql_node)
1871
1872        self.log.info("Explain plan for query 3 : {0}".format(results))
1873
1874        span_pushdown, _, _, _, _ = self.validate_query_plan(
1875            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1876            num_spans=3)
1877
1878        self.assertTrue(span_pushdown, "Operators not pushed down to indexer")
1879
1880    def test_indexer_pushdowns_offset_limit(self):
1881        self._load_emp_dataset(end=self.num_items)
1882
1883        # Create Partitioned indexes
1884        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
1885            self.num_index_partitions)
1886
1887        try:
1888            self.n1ql_helper.run_cbq_query(
1889                query=create_partitioned_index_query,
1890                server=self.n1ql_node)
1891        except Exception, ex:
1892            self.log.info(str(ex))
1893            self.fail(
1894                "index creation failed with error : {0}".format(str(ex)))
1895
1896        explain_query1 = "EXPLAIN select name from default where name is not missing and dept='HR' and salary > 120000 and salary < 150000 OFFSET 10 LIMIT 10"
1897        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
1898                                                 server=self.n1ql_node)
1899
1900        self.log.info("Explain plan for query 1 : {0}".format(results))
1901
1902        _, limit_pushdown, offset_pushdown, _, _ = self.validate_query_plan(
1903            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1904            offset=10, limit=10)
1905
1906        self.assertTrue(limit_pushdown & offset_pushdown,
1907                        "Operators not pushed down to indexer")
1908
1909    def test_indexer_pushdowns_projection(self):
1910        self._load_emp_dataset(end=self.num_items)
1911
1912        # Create Partitioned indexes
1913        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
1914            self.num_index_partitions)
1915
1916        try:
1917            self.n1ql_helper.run_cbq_query(
1918                query=create_partitioned_index_query,
1919                server=self.n1ql_node)
1920        except Exception, ex:
1921            self.log.info(str(ex))
1922            self.fail(
1923                "index creation failed with error : {0}".format(str(ex)))
1924
1925        explain_query1 = "EXPLAIN select name from default where name is not missing and lower(dept) > 'accounts'"
1926        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
1927                                                 server=self.n1ql_node)
1928
1929        self.log.info("Explain plan for query 1 : {0}".format(results))
1930
1931        self.sleep(30)
1932
1933        _, _, _, projection_pushdown, _ = self.validate_query_plan(
1934            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1935            projection_list=[0, 1])
1936
1937        self.assertTrue(projection_pushdown,
1938                        "Operators not pushed down to indexer")
1939
1940        explain_query2 = "EXPLAIN select name,dept,salary from default where name is not missing and lower(dept) > 'accounts'"
1941        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
1942                                                 server=self.n1ql_node)
1943
1944        self.log.info("Explain plan for query 1 : {0}".format(results))
1945
1946        _, _, _, projection_pushdown, _ = self.validate_query_plan(
1947            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1948            projection_list=[0, 1, 2])
1949
1950        self.assertTrue(projection_pushdown,
1951                        "Operators not pushed down to indexer")
1952
1953        explain_query3 = "EXPLAIN select meta().id from default where name is not missing and lower(dept) > 'accounts'"
1954        results = self.n1ql_helper.run_cbq_query(query=explain_query3,
1955                                                 server=self.n1ql_node)
1956
1957        self.log.info("Explain plan for query 1 : {0}".format(results))
1958
1959        _, _, _, projection_pushdown, _ = self.validate_query_plan(
1960            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
1961            projection_list=[0, 1])
1962
1963        self.assertTrue(projection_pushdown,
1964                        "Operators not pushed down to indexer")
1965
1966    def test_indexer_pushdowns_sorting(self):
1967        self._load_emp_dataset(end=self.num_items)
1968
1969        # Create Partitioned indexes
1970        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
1971            self.num_index_partitions)
1972
1973        try:
1974            self.n1ql_helper.run_cbq_query(
1975                query=create_partitioned_index_query,
1976                server=self.n1ql_node)
1977        except Exception, ex:
1978            self.log.info(str(ex))
1979            self.fail(
1980                "index creation failed with error : {0}".format(str(ex)))
1981
1982        explain_query1 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept,salary"
1983        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
1984                                                 server=self.n1ql_node)
1985
1986        self.log.info("Explain plan for query 1 : {0}".format(results))
1987
1988        _, _, _, _, sorting_pushdown = self.validate_query_plan(
1989            plan=results["results"][0]["plan"],
1990            index_name="partitioned_idx1",
1991            index_order_list=[{'keypos': 0}, {'keypos': 1}, {'keypos': 2}])
1992
1993        self.assertTrue(sorting_pushdown,
1994                        "Operators not pushed down to indexer")
1995
1996        explain_query2 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept"
1997        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
1998                                                 server=self.n1ql_node)
1999
2000        self.log.info("Explain plan for query 2 : {0}".format(results))
2001
2002        _, _, _, _, sorting_pushdown = self.validate_query_plan(
2003            plan=results["results"][0]["plan"],
2004            index_name="partitioned_idx1",
2005            index_order_list=[{'keypos': 0}, {'keypos': 1}])
2006
2007        self.assertTrue(sorting_pushdown,
2008                        "Operators not pushed down to indexer")
2009
2010        explain_query3 = "EXPLAIN select meta().id from default where name is not missing order by name"
2011        results = self.n1ql_helper.run_cbq_query(query=explain_query3,
2012                                                 server=self.n1ql_node)
2013
2014        self.log.info("Explain plan for query 3 : {0}".format(results))
2015
2016        _, _, _, _, sorting_pushdown = self.validate_query_plan(
2017            plan=results["results"][0]["plan"],
2018            index_name="partitioned_idx1",
2019            index_order_list=[{'keypos': 0}])
2020
2021        self.assertTrue(sorting_pushdown,
2022                        "Operators not pushed down to indexer")
2023
2024    def test_indexer_pushdowns_sorting_desc(self):
2025        self._load_emp_dataset(end=self.num_items)
2026
2027        # Create Partitioned indexes
2028        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary desc) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
2029            self.num_index_partitions)
2030
2031        try:
2032            self.n1ql_helper.run_cbq_query(
2033                query=create_partitioned_index_query,
2034                server=self.n1ql_node)
2035        except Exception, ex:
2036            self.log.info(str(ex))
2037            self.fail(
2038                "index creation failed with error : {0}".format(str(ex)))
2039
2040        explain_query1 = "EXPLAIN select name,dept,salary from default where name is not missing order by name,dept,salary desc"
2041        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2042                                                 server=self.n1ql_node)
2043
2044        self.log.info("Explain plan for query 1 : {0}".format(results))
2045
2046        _, _, _, _, sorting_pushdown = self.validate_query_plan(
2047            plan=results["results"][0]["plan"],
2048            index_name="partitioned_idx1",
2049            index_order_list=[{'keypos': 0}, {'keypos': 1},
2050                              {"desc": True, 'keypos': 2}])
2051
2052        self.assertTrue(sorting_pushdown,
2053                        "Operators not pushed down to indexer")
2054
2055    def test_multiple_operator_indexer_pushdowns(self):
2056        self._load_emp_dataset(end=self.num_items)
2057
2058        # Create Partitioned indexes
2059        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(name,dept,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
2060            self.num_index_partitions)
2061
2062        try:
2063            self.n1ql_helper.run_cbq_query(
2064                query=create_partitioned_index_query,
2065                server=self.n1ql_node)
2066        except Exception, ex:
2067            self.log.info(str(ex))
2068            self.fail(
2069                "index creation failed with error : {0}".format(str(ex)))
2070
2071        explain_query1 = "EXPLAIN select name from default where name is not missing order by name OFFSET 10 LIMIT 10"
2072        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2073                                                 server=self.n1ql_node)
2074
2075        self.log.info("Explain plan for query 1 : {0}".format(results))
2076
2077        scan_pushdown, limit_pushdown, offset_pushdown, projection_pushdown, sorting_pushdown = self.validate_query_plan(
2078            plan=results["results"][0]["plan"], index_name="partitioned_idx1",
2079            num_spans=1, offset=10, limit=10, index_order_list=[{'keypos': 0}],
2080            projection_list=[0])
2081
2082        self.assertTrue(
2083            scan_pushdown & limit_pushdown & offset_pushdown & projection_pushdown & sorting_pushdown,
2084            "Operators not pushed down to indexer")
2085
2086    def test_aggregate_indexer_pushdowns_group_by_leading_keys(self):
2087        self._load_emp_dataset(end=self.num_items)
2088
2089        # Create Partitioned indexes
2090        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(meta().id) USING GSI with {{'num_partition':{0}}};".format(
2091            self.num_index_partitions)
2092
2093        try:
2094            self.n1ql_helper.run_cbq_query(
2095                query=create_partitioned_index_query,
2096                server=self.n1ql_node)
2097        except Exception, ex:
2098            self.log.info(str(ex))
2099            self.fail(
2100                "index creation failed with error : {0}".format(str(ex)))
2101
2102        explain_query1 = "EXPLAIN select dept,count(*) from default where dept is not missing GROUP BY dept"
2103        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2104                                                 server=self.n1ql_node)
2105
2106        self.log.info("Explain plan for query 1 : {0}".format(results))
2107
2108        agg_pushdown = False
2109        if "index_group_aggs" in str(results):
2110            agg_pushdown = True
2111
2112        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2113
2114        explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept"
2115        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
2116                                                 server=self.n1ql_node)
2117
2118        self.log.info("Explain plan for query 2 : {0}".format(results))
2119
2120        agg_pushdown = False
2121        if "index_group_aggs" in str(results):
2122            agg_pushdown = True
2123
2124        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2125
2126    def test_aggregate_indexer_pushdowns_group_by_partition_keys(self):
2127        self._load_emp_dataset(end=self.num_items)
2128
2129        # Create Partitioned indexes
2130        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(LOWER(name),UPPER(dept)) USING GSI with {{'num_partition':{0}}};".format(
2131            self.num_index_partitions)
2132
2133        try:
2134            self.n1ql_helper.run_cbq_query(
2135                query=create_partitioned_index_query,
2136                server=self.n1ql_node)
2137        except Exception, ex:
2138            self.log.info(str(ex))
2139            self.fail(
2140                "index creation failed with error : {0}".format(str(ex)))
2141
2142        explain_query1 = "EXPLAIN select name,dept,count(*) from default where dept is not missing GROUP BY name,dept"
2143        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2144                                                 server=self.n1ql_node)
2145
2146        self.log.info("Explain plan for query 1 : {0}".format(results))
2147
2148        agg_pushdown = False
2149        if "index_group_aggs" in str(results):
2150            agg_pushdown = True
2151
2152        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2153
2154        explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept"
2155        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
2156                                                 server=self.n1ql_node)
2157
2158        self.log.info("Explain plan for query 2 : {0}".format(results))
2159
2160        agg_pushdown = False
2161        if "index_group_aggs" in str(results):
2162            agg_pushdown = True
2163
2164        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2165
2166    def test_aggregate_indexer_pushdowns_partition_keys_index_keys(self):
2167        self._load_emp_dataset(end=self.num_items)
2168
2169        # Create Partitioned indexes
2170        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(LOWER(dept)) USING GSI with {{'num_partition':{0}}};".format(
2171            self.num_index_partitions)
2172
2173        try:
2174            self.n1ql_helper.run_cbq_query(
2175                query=create_partitioned_index_query,
2176                server=self.n1ql_node)
2177        except Exception, ex:
2178            self.log.info(str(ex))
2179            self.fail(
2180                "index creation failed with error : {0}".format(str(ex)))
2181
2182        explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary"
2183        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2184                                                 server=self.n1ql_node)
2185
2186        self.log.info("Explain plan for query 1 : {0}".format(results))
2187
2188        agg_pushdown = False
2189        if "index_group_aggs" in str(results):
2190            agg_pushdown = True
2191
2192        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2193
2194        explain_query2 = "EXPLAIN select dept,sum(salary), min(salary), max(salary), avg(salary) from default where dept is not missing GROUP BY dept"
2195        results = self.n1ql_helper.run_cbq_query(query=explain_query2,
2196                                                 server=self.n1ql_node)
2197
2198        self.log.info("Explain plan for query 2 : {0}".format(results))
2199
2200        agg_pushdown = False
2201        if "index_group_aggs" in str(results):
2202            agg_pushdown = True
2203
2204        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2205
2206    def test_aggregate_indexer_pushdowns_groupby_trailing_keys_partition_keys(
2207            self):
2208        self._load_emp_dataset(end=self.num_items)
2209
2210        # Create Partitioned indexes
2211        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(salary) USING GSI with {{'num_partition':{0}}};".format(
2212            self.num_index_partitions)
2213
2214        try:
2215            self.n1ql_helper.run_cbq_query(
2216                query=create_partitioned_index_query,
2217                server=self.n1ql_node)
2218        except Exception, ex:
2219            self.log.info(str(ex))
2220            self.fail(
2221                "index creation failed with error : {0}".format(str(ex)))
2222
2223        explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary"
2224        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2225                                                 server=self.n1ql_node)
2226
2227        self.log.info("Explain plan for query 1 : {0}".format(results))
2228
2229        agg_pushdown = False
2230        if "index_group_aggs" in str(results):
2231            agg_pushdown = True
2232
2233        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2234
2235    def test_aggregate_indexer_pushdowns_groupby_trailing_keys_not_partition_keys(
2236            self):
2237        self._load_emp_dataset(end=self.num_items)
2238
2239        # Create Partitioned indexes
2240        create_partitioned_index_query = "CREATE INDEX partitioned_idx1 ON default(dept,name,salary) partition by hash(dept) USING GSI with {{'num_partition':{0}}};".format(
2241            self.num_index_partitions)
2242
2243        try:
2244            self.n1ql_helper.run_cbq_query(
2245                query=create_partitioned_index_query,
2246                server=self.n1ql_node)
2247        except Exception, ex:
2248            self.log.info(str(ex))
2249            self.fail(
2250                "index creation failed with error : {0}".format(str(ex)))
2251
2252        explain_query1 = "EXPLAIN select salary,count(*) from default where dept is not missing GROUP BY salary"
2253        results = self.n1ql_helper.run_cbq_query(query=explain_query1,
2254                                                 server=self.n1ql_node)
2255
2256        self.log.info("Explain plan for query 1 : {0}".format(results))
2257
2258        agg_pushdown = False
2259        if "index_group_aggs" in str(results):
2260            agg_pushdown = True
2261
2262        self.assertTrue(agg_pushdown, "Operators not pushed down to indexer")
2263
2264    def test_rebalance_out_with_partitioned_indexes_with_concurrent_querying(
2265            self):
2266        self._load_emp_dataset(end=self.num_items)
2267
2268        with_statement = "with {{'num_partition':{0}".format(self.num_index_partitions)
2269        if self.num_index_replicas > 0:
2270            with_statement += ", 'num_replica':{0}".format(self.num_index_replicas)
2271        if self.defer_build:
2272            with_statement += ", 'defer_build': true"
2273        with_statement += " }"
2274
2275        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement
2276        create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement
2277
2278        try:
2279            self.n1ql_helper.run_cbq_query(
2280                query=create_index_statement,
2281                server=self.n1ql_node)
2282            self.n1ql_helper.run_cbq_query(
2283                query=create_primary_index_statement,
2284                server=self.n1ql_node)
2285        except Exception, ex:
2286            self.log.info(str(ex))
2287
2288        self.sleep(30)
2289
2290        node_out = self.servers[self.node_out]
2291        node_out_str = node_out.ip + ":" + str(node_out.port)
2292
2293        # Get Index Names
2294        index_names = ["idx1", "pidx1"]
2295        if self.num_index_replicas > 0:
2296            for i in range(1, self.num_index_replicas + 1):
2297                index_names.append("idx1 (replica {0})".format(str(i)))
2298                index_names.append("pidx1 (replica {0})".format(str(i)))
2299
2300        self.log.info(index_names)
2301
2302        # Get Stats and index partition map before rebalance
2303        index_data_before = {}
2304        for index in index_names:
2305            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2306                index_name=index)
2307            index_data_before[index] = {}
2308            index_data_before[index]["item_count"] = total_item_count_before
2309            index_data_before[index][
2310                "index_metadata"] = self.rest.get_indexer_metadata()
2311
2312        # start querying
2313        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2314        t1 = Thread(target=self._run_queries, args=(query, 30,))
2315        t1.start()
2316        # rebalance out a indexer node when querying is in progress
2317        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2318                                                 [], [node_out])
2319        reached = RestHelper(self.rest).rebalance_reached()
2320        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2321        rebalance.result()
2322        t1.join()
2323
2324        self.sleep(30)
2325
2326        # Get Stats and index partition map after rebalance
2327        node_list = copy.deepcopy(self.node_list)
2328        node_list.remove(node_out_str)
2329        self.log.info(node_list)
2330
2331        index_data_after = {}
2332
2333        for index in index_names:
2334            self.index_servers = self.get_nodes_from_services_map(
2335                service_type="index", get_all_nodes=True)
2336            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2337                index_name=index, node_list=node_list)
2338            index_data_after[index] = {}
2339            index_data_after[index]["item_count"] = total_item_count_after
2340            index_data_after[index][
2341                "index_metadata"] = RestConnection(
2342                self.index_servers[0]).get_indexer_metadata()
2343
2344        for index in index_names:
2345            # Validate index item count before and after
2346            self.assertEqual(index_data_before[index]["item_count"],
2347                             index_data_after[index]["item_count"],
2348                             "Item count in index do not match after cluster ops.")
2349
2350            # Validate host list, partition count and partition distribution
2351            self.assertTrue(
2352                self.validate_partition_distribution_after_cluster_ops(
2353                    index, index_data_before[index]["index_metadata"],
2354                    index_data_after[index]["index_metadata"], [],
2355                    [node_out]),
2356                "Partition distribution post cluster ops has some issues")
2357
2358    def test_rebalance_out_with_partitioned_indexes_with_concurrent_querying_stop_and_resume(
2359            self):
2360        resume = self.input.param("resume_stopped_rebalance", False)
2361        resume_delay = self.input.param("resume_delay", 0)
2362
2363        self._load_emp_dataset(end=self.num_items)
2364
2365        # Create partitioned index
2366        if self.num_index_replicas > 0:
2367            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2368                self.num_index_replicas, self.num_index_partitions)
2369            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2370                self.num_index_replicas, self.num_index_partitions)
2371        else:
2372            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{0}}}".format(
2373                self.num_index_partitions)
2374            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{0}}}".format(
2375                self.num_index_partitions)
2376
2377        try:
2378            self.n1ql_helper.run_cbq_query(
2379                query=create_index_statement,
2380                server=self.n1ql_node)
2381            self.n1ql_helper.run_cbq_query(
2382                query=create_primary_index_statement,
2383                server=self.n1ql_node)
2384        except Exception, ex:
2385            self.log.info(str(ex))
2386
2387        self.sleep(30)
2388
2389        node_out = self.servers[self.node_out]
2390        node_out_str = node_out.ip + ":" + str(node_out.port)
2391
2392        # Get Index Names
2393        index_names = ["idx1", "pidx1"]
2394        if self.num_index_replicas > 0:
2395            for i in range(1, self.num_index_replicas + 1):
2396                index_names.append("idx1 (replica {0})".format(str(i)))
2397                index_names.append("pidx1 (replica {0})".format(str(i)))
2398
2399        self.log.info(index_names)
2400
2401        # Get Stats and index partition map before rebalance
2402        index_data_before = {}
2403        for index in index_names:
2404            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2405                index_name=index)
2406            index_data_before[index] = {}
2407            index_data_before[index]["item_count"] = total_item_count_before
2408            index_data_before[index][
2409                "index_metadata"] = self.rest.get_indexer_metadata()
2410
2411        # start querying
2412        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2413        t1 = Thread(target=self._run_queries, args=(query, 30,))
2414        t1.start()
2415        # rebalance out a indexer node when querying is in progress
2416        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2417                                                 [], [node_out])
2418        stopped = RestConnection(self.master).stop_rebalance(
2419            wait_timeout=self.wait_timeout / 3)
2420        self.assertTrue(stopped, msg="unable to stop rebalance")
2421        rebalance.result()
2422
2423        if resume:
2424            if resume_delay > 0:
2425                self.sleep(resume_delay,
2426                           "Sleep for some time before resume stopped rebalance")
2427            rebalance = self.cluster.async_rebalance(
2428                self.servers[:self.nodes_init],
2429                [], [node_out])
2430
2431            reached = RestHelper(self.rest).rebalance_reached()
2432            self.assertTrue(reached,
2433                            "rebalance failed, stuck or did not complete")
2434            rebalance.result()
2435
2436        t1.join()
2437
2438        self.sleep(30)
2439
2440        # Get Stats and index partition map after rebalance
2441        node_list = copy.deepcopy(self.node_list)
2442        node_list.remove(node_out_str)
2443        self.log.info(node_list)
2444
2445        index_data_after = {}
2446        for index in index_names:
2447            self.index_servers = self.get_nodes_from_services_map(
2448                service_type="index", get_all_nodes=True)
2449            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2450                index_name=index, node_list=node_list)
2451            index_data_after[index] = {}
2452            index_data_after[index]["item_count"] = total_item_count_after
2453            index_data_after[index][
2454                "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata()
2455
2456        for index in index_names:
2457            # Validate index item count before and after
2458            self.assertEqual(index_data_before[index]["item_count"],
2459                             index_data_after[index]["item_count"],
2460                             "Item count in index do not match after cluster ops.")
2461
2462            # Validate host list, partition count and partition distribution
2463            self.assertTrue(
2464                self.validate_partition_distribution_after_cluster_ops(
2465                    index, index_data_before[index]["index_metadata"],
2466                    index_data_after[index]["index_metadata"], [],
2467                    [node_out]),
2468                "Partition distribution post cluster ops has some issues")
2469
2470    def test_rebalance_in_with_partitioned_indexes_with_concurrent_querying(
2471            self):
2472        self._load_emp_dataset(end=self.num_items)
2473
2474        with_statement = "with {{'num_partition':{0}".format(
2475            self.num_index_partitions)
2476        if self.num_index_replicas > 0:
2477            with_statement += ", 'num_replica':{0}".format(
2478                self.num_index_replicas)
2479        if self.defer_build:
2480            with_statement += ", 'defer_build': true"
2481        with_statement += " }"
2482
2483        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement
2484        create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement
2485
2486        try:
2487            self.n1ql_helper.run_cbq_query(
2488                query=create_index_statement,
2489                server=self.n1ql_node)
2490            self.n1ql_helper.run_cbq_query(
2491                query=create_primary_index_statement,
2492                server=self.n1ql_node)
2493        except Exception, ex:
2494            self.log.info(str(ex))
2495
2496        self.sleep(30)
2497
2498        node_in = self.servers[self.nodes_init]
2499        node_in_str = node_in.ip + ":" + str(node_in.port)
2500        services_in = ["index"]
2501
2502        # Get Index Names
2503        index_names = ["idx1", "pidx1"]
2504        if self.num_index_replicas > 0:
2505            for i in range(1, self.num_index_replicas + 1):
2506                index_names.append("idx1 (replica {0})".format(str(i)))
2507                index_names.append("pidx1 (replica {0})".format(str(i)))
2508
2509        self.log.info(index_names)
2510
2511        # Get Stats and index partition map before rebalance
2512        index_data_before = {}
2513        for index in index_names:
2514            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2515                index_name=index)
2516            index_data_before[index] = {}
2517            index_data_before[index]["item_count"] = total_item_count_before
2518            index_data_before[index][
2519                "index_metadata"] = self.rest.get_indexer_metadata()
2520
2521        # start querying
2522        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2523        t1 = Thread(target=self._run_queries, args=(query, 30,))
2524        t1.start()
2525        # rebalance out a indexer node when querying is in progress
2526        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2527                                                 [node_in], [],
2528                                                 services=services_in)
2529        reached = RestHelper(self.rest).rebalance_reached()
2530        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2531        rebalance.result()
2532        t1.join()
2533
2534        self.sleep(30)
2535
2536        # Get Stats and index partition map after rebalance
2537        node_list = copy.deepcopy(self.node_list)
2538        node_list.append(node_in_str)
2539        self.log.info(node_list)
2540
2541        index_data_after = {}
2542        for index in index_names:
2543            self.index_servers = self.get_nodes_from_services_map(
2544                service_type="index", get_all_nodes=True)
2545            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2546                index_name=index, node_list=node_list)
2547            index_data_after[index] = {}
2548            index_data_after[index]["item_count"] = total_item_count_after
2549            index_data_after[index][
2550                "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata()
2551
2552        for index in index_names:
2553            # Validate index item count before and after
2554            self.assertEqual(index_data_before[index]["item_count"],
2555                             index_data_after[index]["item_count"],
2556                             "Item count in index do not match after cluster ops.")
2557
2558            # Validate host list, partition count and partition distribution
2559            self.assertTrue(
2560                self.validate_partition_distribution_after_cluster_ops(
2561                    index, index_data_before[index]["index_metadata"],
2562                    index_data_after[index]["index_metadata"], [node_in],
2563                    []),
2564                "Partition distribution post cluster ops has some issues")
2565
2566    def test_swap_rebalance_with_partitioned_indexes_with_concurrent_querying(
2567            self):
2568        self._load_emp_dataset(end=self.num_items)
2569
2570        with_statement = "with {{'num_partition':{0}".format(
2571            self.num_index_partitions)
2572        if self.num_index_replicas > 0:
2573            with_statement += ", 'num_replica':{0}".format(
2574                self.num_index_replicas)
2575        if self.defer_build:
2576            with_statement += ", 'defer_build': true"
2577        with_statement += " }"
2578
2579        create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) " + with_statement
2580        create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) " + with_statement
2581
2582        try:
2583            self.n1ql_helper.run_cbq_query(
2584                query=create_index_statement,
2585                server=self.n1ql_node)
2586            self.n1ql_helper.run_cbq_query(
2587                query=create_primary_index_statement,
2588                server=self.n1ql_node)
2589        except Exception, ex:
2590            self.log.info(str(ex))
2591
2592        self.sleep(30)
2593
2594        node_out = self.servers[self.node_out]
2595        node_out_str = node_out.ip + ":" + str(node_out.port)
2596
2597        node_in = self.servers[self.nodes_init]
2598        node_in_str = node_in.ip + ":" + str(node_in.port)
2599        services_in = ["index"]
2600
2601        # Get Index Names
2602        index_names = ["idx1", "pidx1"]
2603        if self.num_index_replicas > 0:
2604            for i in range(1, self.num_index_replicas + 1):
2605                index_names.append("idx1 (replica {0})".format(str(i)))
2606                index_names.append("pidx1 (replica {0})".format(str(i)))
2607
2608        self.log.info(index_names)
2609
2610        # Get Stats and index partition map before rebalance
2611        index_data_before = {}
2612        for index in index_names:
2613            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2614                index_name=index)
2615            index_data_before[index] = {}
2616            index_data_before[index]["item_count"] = total_item_count_before
2617            index_data_before[index][
2618                "index_metadata"] = self.rest.get_indexer_metadata()
2619
2620        try:
2621            # start querying
2622            query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2623            t1 = Thread(target=self._run_queries, args=(query, 30,))
2624            t1.start()
2625            # rebalance out a indexer node when querying is in progress
2626            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2627                                                     [node_in], [node_out],
2628                                                     services=services_in)
2629            reached = RestHelper(self.rest).rebalance_reached()
2630            self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2631            rebalance.result()
2632            t1.join()
2633        except Exception, ex:
2634            self.log.info(str(ex))
2635
2636        self.sleep(30)
2637
2638        # Get Stats and index partition map after rebalance
2639        node_list = copy.deepcopy(self.node_list)
2640        node_list.append(node_in_str)
2641        node_list.remove(node_out_str)
2642
2643        index_data_after = {}
2644        for index in index_names:
2645            self.index_servers = self.get_nodes_from_services_map(
2646                service_type="index", get_all_nodes=True)
2647            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2648                index_name=index, node_list=node_list)
2649            index_data_after[index] = {}
2650            index_data_after[index]["item_count"] = total_item_count_after
2651            index_data_after[index][
2652                "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata()
2653
2654        for index in index_names:
2655            # Validate index item count before and after
2656            self.assertEqual(index_data_before[index]["item_count"],
2657                             index_data_after[index]["item_count"],
2658                             "Item count in index do not match after cluster ops.")
2659
2660            # Validate host list, partition count and partition distribution
2661            self.assertTrue(
2662                self.validate_partition_distribution_after_cluster_ops(
2663                    index, index_data_before[index]["index_metadata"],
2664                    index_data_after[index]["index_metadata"], [node_in],
2665                    [node_out]),
2666                "Partition distribution post cluster ops has some issues")
2667
2668    def test_failover_with_partitioned_indexes_with_concurrent_querying(
2669            self):
2670        self._load_emp_dataset(end=self.num_items)
2671
2672        # Create partitioned index
2673        if self.num_index_replicas > 0:
2674            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2675                self.num_index_replicas, self.num_index_partitions)
2676            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2677                self.num_index_replicas, self.num_index_partitions)
2678        else:
2679            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format(
2680                self.num_index_replicas, self.num_index_partitions)
2681            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format(
2682                self.num_index_replicas, self.num_index_partitions)
2683
2684        try:
2685            self.n1ql_helper.run_cbq_query(
2686                query=create_index_statement,
2687                server=self.n1ql_node)
2688            self.n1ql_helper.run_cbq_query(
2689                query=create_primary_index_statement,
2690                server=self.n1ql_node)
2691        except Exception, ex:
2692            self.log.info(str(ex))
2693
2694        self.sleep(30)
2695
2696        node_out = self.servers[self.node_out]
2697        node_out_str = node_out.ip + ":" + str(node_out.port)
2698
2699        # Get Index Names
2700        index_names = ["idx1", "pidx1"]
2701        if self.num_index_replicas > 0:
2702            for i in range(1, self.num_index_replicas + 1):
2703                index_names.append("idx1 (replica {0})".format(str(i)))
2704                index_names.append("pidx1 (replica {0})".format(str(i)))
2705
2706        self.log.info(index_names)
2707
2708        # Get Stats and index partition map before rebalance
2709        index_data_before = {}
2710        for index in index_names:
2711            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2712                index_name=index)
2713            index_data_before[index] = {}
2714            index_data_before[index]["item_count"] = total_item_count_before
2715            index_data_before[index][
2716                "index_metadata"] = self.rest.get_indexer_metadata()
2717
2718        # start querying
2719        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2720        t1 = Thread(target=self._run_queries, args=(query, 30,))
2721        t1.start()
2722
2723        # failover and rebalance out a indexer node when querying is in progress
2724        failover_task = self.cluster.async_failover(
2725            self.servers[:self.nodes_init],
2726            [node_out],
2727            self.graceful, wait_for_pending=180)
2728
2729        failover_task.result()
2730
2731        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2732                                                 [], [node_out])
2733        reached = RestHelper(self.rest).rebalance_reached()
2734        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2735        rebalance.result()
2736        t1.join()
2737
2738        self.sleep(30)
2739
2740        # Get Stats and index partition map after rebalance
2741        node_list = copy.deepcopy(self.node_list)
2742        node_list.remove(node_out_str)
2743        self.log.info(node_list)
2744
2745        index_data_after = {}
2746        for index in index_names:
2747            self.index_servers = self.get_nodes_from_services_map(
2748                service_type="index", get_all_nodes=True)
2749            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2750                index_name=index, node_list=node_list)
2751            index_data_after[index] = {}
2752            index_data_after[index]["item_count"] = total_item_count_after
2753            index_data_after[index][
2754                "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata()
2755
2756        for index in index_names:
2757            # Validate index item count before and after
2758            self.assertEqual(index_data_before[index]["item_count"],
2759                             index_data_after[index]["item_count"],
2760                             "Item count in index do not match after cluster ops.")
2761
2762            # Validate host list, partition count and partition distribution
2763            self.assertTrue(
2764                self.validate_partition_distribution_after_cluster_ops(
2765                    index, index_data_before[index]["index_metadata"],
2766                    index_data_after[index]["index_metadata"], [],
2767                    [node_out]),
2768                "Partition distribution post cluster ops has some issues")
2769
2770    def test_failover_addback_with_partitioned_indexes_with_concurrent_querying(
2771            self):
2772        self._load_emp_dataset(end=self.num_items)
2773
2774        # Create partitioned index
2775        if self.num_index_replicas > 0:
2776            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2777                self.num_index_replicas, self.num_index_partitions)
2778            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2779                self.num_index_replicas, self.num_index_partitions)
2780        else:
2781            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format(
2782                self.num_index_replicas, self.num_index_partitions)
2783            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format(
2784                self.num_index_replicas, self.num_index_partitions)
2785
2786        try:
2787            self.n1ql_helper.run_cbq_query(
2788                query=create_index_statement,
2789                server=self.n1ql_node)
2790            self.n1ql_helper.run_cbq_query(
2791                query=create_primary_index_statement,
2792                server=self.n1ql_node)
2793        except Exception, ex:
2794            self.log.info(str(ex))
2795
2796        self.sleep(30)
2797
2798        node_out = self.servers[self.node_out]
2799        node_out_str = node_out.ip + ":" + str(node_out.port)
2800
2801        # Get Index Names
2802        index_names = ["idx1", "pidx1"]
2803        if self.num_index_replicas > 0:
2804            for i in range(1, self.num_index_replicas + 1):
2805                index_names.append("idx1 (replica {0})".format(str(i)))
2806                index_names.append("pidx1 (replica {0})".format(str(i)))
2807
2808        self.log.info(index_names)
2809
2810        # Get Stats and index partition map before rebalance
2811        index_data_before = {}
2812        for index in index_names:
2813            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2814                index_name=index)
2815            index_data_before[index] = {}
2816            index_data_before[index]["item_count"] = total_item_count_before
2817            index_data_before[index][
2818                "index_metadata"] = self.rest.get_indexer_metadata()
2819
2820        # start querying
2821        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2822        t1 = Thread(target=self._run_queries, args=(query, 30,))
2823        t1.start()
2824
2825        # failover and rebalance out a indexer node when querying is in progress
2826        nodes_all = self.rest.node_statuses()
2827        for node in nodes_all:
2828            if node.ip == node_out.ip:
2829                break
2830
2831        failover_task = self.cluster.async_failover(
2832            self.servers[:self.nodes_init],
2833            [node_out],
2834            self.graceful, wait_for_pending=180)
2835
2836        failover_task.result()
2837
2838        self.rest.set_recovery_type(node.id, self.recovery_type)
2839        self.rest.add_back_node(node.id)
2840
2841        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2842                                                 [], [])
2843
2844        reached = RestHelper(self.rest).rebalance_reached()
2845        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2846        rebalance.result()
2847        t1.join()
2848
2849        self.sleep(30)
2850
2851        # Get Stats and index partition map after rebalance
2852        index_data_after = {}
2853        for index in index_names:
2854            self.index_servers = self.get_nodes_from_services_map(
2855                service_type="index", get_all_nodes=True)
2856            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2857                index_name=index)
2858            index_data_after[index] = {}
2859            index_data_after[index]["item_count"] = total_item_count_after
2860            index_data_after[index][
2861                "index_metadata"] = RestConnection(self.index_servers[0]).get_indexer_metadata()
2862
2863        for index in index_names:
2864            # Validate index item count before and after
2865            self.assertEqual(index_data_before[index]["item_count"],
2866                             index_data_after[index]["item_count"],
2867                             "Item count in index do not match after cluster ops.")
2868
2869            # Validate host list, partition count and partition distribution
2870            self.assertTrue(
2871                self.validate_partition_distribution_after_cluster_ops(
2872                    index, index_data_before[index]["index_metadata"],
2873                    index_data_after[index]["index_metadata"], [],
2874                    []),
2875                "Partition distribution post cluster ops has some issues")
2876
2877    def test_kv_rebalance_out_with_partitioned_indexes_with_concurrent_querying(
2878            self):
2879        self._load_emp_dataset(end=self.num_items)
2880
2881        # Create partitioned index
2882        if self.num_index_replicas > 0:
2883            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2884                self.num_index_replicas, self.num_index_partitions)
2885            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_replica':{0}, 'num_partition':{1}}}".format(
2886                self.num_index_replicas, self.num_index_partitions)
2887        else:
2888            create_index_statement = "CREATE INDEX idx1 on default(name,dept,salary) partition by hash(name) with {{'num_partition':{1}}}".format(
2889                self.num_index_replicas, self.num_index_partitions)
2890            create_primary_index_statement = "CREATE PRIMARY INDEX pidx1 on default partition by hash(meta().id) with {{'num_partition':{1}}}".format(
2891                self.num_index_replicas, self.num_index_partitions)
2892
2893        try:
2894            self.n1ql_helper.run_cbq_query(
2895                query=create_index_statement,
2896                server=self.n1ql_node)
2897            self.n1ql_helper.run_cbq_query(
2898                query=create_primary_index_statement,
2899                server=self.n1ql_node)
2900        except Exception, ex:
2901            self.log.info(str(ex))
2902
2903        self.sleep(30)
2904
2905        node_out = self.servers[self.node_out]
2906        node_out_str = node_out.ip + ":" + str(node_out.port)
2907
2908        # Get Index Names
2909        index_names = ["idx1", "pidx1"]
2910        if self.num_index_replicas > 0:
2911            for i in range(1, self.num_index_replicas + 1):
2912                index_names.append("idx1 (replica {0})".format(str(i)))
2913                index_names.append("pidx1 (replica {0})".format(str(i)))
2914
2915        self.log.info(index_names)
2916
2917        # Get Stats and index partition map before rebalance
2918        index_data_before = {}
2919        for index in index_names:
2920            _, total_item_count_before, _ = self.get_stats_for_partitioned_indexes(
2921                index_name=index)
2922            index_data_before[index] = {}
2923            index_data_before[index]["item_count"] = total_item_count_before
2924            index_data_before[index][
2925                "index_metadata"] = self.rest.get_indexer_metadata()
2926
2927        # start querying
2928        query = "select name,dept,salary from default where name is not missing and dept='HR' and salary > 120000;"
2929        t1 = Thread(target=self._run_queries, args=(query, 30,))
2930        t1.start()
2931        # rebalance out a indexer node when querying is in progress
2932        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
2933                                                 [], [node_out])
2934        reached = RestHelper(self.rest).rebalance_reached()
2935        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
2936        rebalance.result()
2937        t1.join()
2938
2939        self.sleep(30)
2940
2941        # Get Stats and index partition map after rebalance
2942
2943        index_data_after = {}
2944        for index in index_names:
2945            self.index_servers = self.get_nodes_from_services_map(
2946                service_type="index", get_all_nodes=True)
2947            _, total_item_count_after, _ = self.get_stats_for_partitioned_indexes(
2948                index_name=index)
2949            index_data_after[index] = {}
2950            index_data_after[index]["item_count"] = total_item_count_after
2951            index_data_after[index][
2952                "index_metadata"] = RestConnec