import logging

from base_2i import BaseSecondaryIndexingTests
from couchbase_helper.query_definitions import QueryDefinition
from membase.api.rest_client import RestConnection
from membase.helper.bucket_helper import BucketOperationHelper
from remote.remote_util import RemoteMachineShellConnection

log = logging.getLogger(__name__)


class SecondaryIndexingCreateDropTests(BaseSecondaryIndexingTests):

    def setUp(self):
        super(SecondaryIndexingCreateDropTests, self).setUp()

    def tearDown(self):
        super(SecondaryIndexingCreateDropTests, self).tearDown()

    def test_multi_create_drop_index(self):
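        """Create and then drop indexes on every bucket, concurrently when run_async is set."""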
        if self.run_async:
            tasks = self.async_run_multi_operations(buckets=self.buckets, query_definitions=self.query_definitions,
                                                    create_index=True, drop_index=False)
            for task in tasks:
                task.result()
            tasks = self.async_run_multi_operations(buckets=self.buckets, query_definitions=self.query_definitions,
                                                    create_index=False, drop_index=True)
            for task in tasks:
                task.result()
        else:
            self.run_multi_operations(buckets=self.buckets, query_definitions=self.query_definitions,
                                      create_index=True, drop_index=True)

    def test_create_index_on_empty_bucket(self):
        """
        Regression test for MB-15329: create indexes on empty (flushed) buckets
        and verify that the indexed item count matches the bucket item count.
        """
        rest = RestConnection(self.master)
        for bucket in self.buckets:
            log.info("Flushing bucket {0}...".format(bucket.name))
            rest.flush_bucket(bucket)
        self.sleep(30)
        self.multi_create_index(buckets=self.buckets, query_definitions=self.query_definitions)
        self._verify_bucket_count_with_index_count()

    def test_deployment_plan_with_defer_build_plan_create_drop_index(self):
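        """Exercise the async create/drop path of test_multi_create_drop_index."""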
        self.run_async = True
        self.test_multi_create_drop_index()

    def test_deployment_plan_with_nodes_only_plan_create_drop_index_for_secondary_index(self):
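        """Pin one secondary index per index node per bucket via the deployment plan,
        then verify placement against per-node index stats."""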
        query_definitions = []
        tasks = []
        verification_map = {}
        query_definition_map = {}
        servers = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
        try:
            servers.reverse()
            for bucket in self.buckets:
                query_definition_map[bucket.name] = []
                for server in servers:
                    index_name = "index_name_ip_{0}_port_{1}_{2}".format(server.ip.replace(".", "_"),
                                                                         server.port, bucket.name)
                    query_definition = QueryDefinition(index_name=index_name, index_fields=["join_yr"],
                                                       query_template="", groups=[])
                    query_definition_map[bucket.name].append(query_definition)
                    query_definitions.append(query_definition)
                    node_key = "{0}:{1}".format(server.ip, server.port)
                    deploy_node_info = [node_key]
                    if node_key not in verification_map:
                        verification_map[node_key] = {}
                    verification_map[node_key][bucket.name] = index_name
                    tasks.append(self.async_create_index(bucket.name, query_definition,
                                                         deploy_node_info=deploy_node_info))
                for task in tasks:
                    task.result()
            index_map = self.get_index_stats(perNode=True)
            self.log.info(index_map)
            for bucket in self.buckets:
                for node in index_map:
                    self.log.info(" verifying node {0}".format(node))
                    self.assertTrue(verification_map[node][bucket.name] in index_map[node][bucket.name],
                                    "for bucket {0} and node {1}, could not find key {2} in {3}".format(
                                        bucket.name, node, verification_map[node][bucket.name], index_map))
        except Exception as ex:
            self.log.info(ex)
            raise
        finally:
            for bucket in self.buckets:
                self.log.info("<<<<<<<<<<<< drop index {0} >>>>>>>>>>>".format(bucket.name))
                self.run_multi_operations(buckets=[bucket],
                                          query_definitions=query_definition_map[bucket.name],
                                          drop_index=True)

    def test_fail_deployment_plan_defer_build_same_name_index(self):
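        """Create identically named deferred indexes on every index node; the duplicates
        must be rejected with an 'already exist' error."""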
        query_definitions = []
        tasks = []
        index_name = "test_deployment_plan_defer_build_same_name_index"
        servers = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
        try:
            servers.reverse()
            for server in servers:
                self.defer_build = True
                query_definition = QueryDefinition(index_name=index_name, index_fields=["join_yr"],
                                                   query_template="", groups=[])
                query_definitions.append(query_definition)
                deploy_node_info = ["{0}:{1}".format(server.ip, server.port)]
                tasks.append(self.async_create_index(self.buckets[0], query_definition,
                                                     deploy_node_info=deploy_node_info))
            for task in tasks:
                task.result()
        except Exception as ex:
            msg = "index test_deployment_plan_defer_build_same_name_index already exist"
            self.assertTrue(msg in str(ex), ex)

    def test_concurrent_deployment_plan_defer_build_different_name_index(self):
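        """Concurrently create uniquely named deferred indexes on every index node, then
        build and monitor them; no 'already exist' error is expected."""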
        query_definitions = []
        tasks = []
        self.defer_build = True
        index_name = "test_concurrent_deployment_plan_defer_build_different_name_index"
        servers = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
        try:
            servers.reverse()
            for server in servers:
                for index in range(0, 10):
                    query_definition = QueryDefinition(index_name=index_name + "_" + str(index),
                                                       index_fields=["join_yr"],
                                                       query_template="", groups=[])
                    query_definitions.append(query_definition)
                    deploy_node_info = ["{0}:{1}".format(server.ip, server.port)]
                    tasks.append(self.async_create_index(self.buckets[0], query_definition,
                                                         deploy_node_info=deploy_node_info))
            for task in tasks:
                task.result()
            index_list = [query_definition.index_name for query_definition in query_definitions]
            task = self.async_build_index(bucket="default", index_list=index_list)
            task.result()
            tasks = []
            for idx_name in index_list:
                tasks.append(self.async_monitor_index(bucket="default", index_name=idx_name))
            for task in tasks:
                task.result()
        except Exception as ex:
            # The index names are unique, so no create should fail with a duplicate error.
            msg = "already exist"
            self.assertTrue(msg in str(ex), ex)

    def test_failure_concurrent_create_index(self):
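        """Fire concurrent non-deferred index creates on one bucket; overlapping builds
        must be rejected with 'Build Already In Progress'."""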
        try:
            self.run_async = True
            tasks = []
            for query_definition in self.query_definitions:
                tasks.append(self.async_create_index(self.buckets[0].name, query_definition))
            for task in tasks:
                task.result()
            self.assertTrue(False, " Created indexes concurrently, should have failed! ")
        except Exception as ex:
            msg = "Build Already In Progress"
            self.assertTrue(msg in str(ex), ex)

    def test_deployment_plan_with_nodes_only_plan_create_drop_index_for_primary_index(self):
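        """Create a primary GSI index with an explicit deployment plan (nodes and,
        optionally, defer_build), verify it is online, then drop it."""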
        server = self.get_nodes_from_services_map(service_type="n1ql")
        self.query = "DROP PRIMARY INDEX ON {0} USING GSI".format(self.buckets[0].name)
        try:
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.log.info(ex)
        servers = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
        deploy_node_info = ["{0}:{1}".format(servers[0].ip, servers[0].port)]
        self.query = "CREATE PRIMARY INDEX ON {0} USING GSI".format(self.buckets[0].name)
        deployment_plan = {}
        if deploy_node_info is not None:
            deployment_plan["nodes"] = deploy_node_info
        if self.defer_build is not None:
            deployment_plan["defer_build"] = self.defer_build
        if deployment_plan:
            self.query += " WITH " + str(deployment_plan)
        try:
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
            if self.defer_build:
                build_index_task = self.async_build_index(self.buckets[0], ["`#primary`"])
                build_index_task.result()
            check = self.n1ql_helper.is_index_online_and_in_list(self.buckets[0], "#primary", server=server)
            self.assertTrue(check, "index #primary failed to be created")

            self.query = "DROP PRIMARY INDEX ON {0} USING GSI".format(self.buckets[0].name)
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.log.info(ex)
            raise

    def test_create_primary_using_views_with_existing_primary_index_gsi(self):
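        """A VIEW primary index should be creatable while a GSI primary index exists."""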
        self.query = "CREATE PRIMARY INDEX ON {0} USING VIEW".format(self.buckets[0].name)
        try:
            # create a view-based primary index alongside the existing GSI primary index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.log.info(ex)
            raise

    def test_create_primary_using_gsi_with_existing_primary_index_views(self):
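        """A GSI primary index should be creatable while a VIEW primary index exists."""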
        self.query = "CREATE PRIMARY INDEX ON {0} USING GSI".format(self.buckets[0].name)
        try:
            # create a GSI primary index alongside the existing view-based primary index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.log.info(ex)
            raise

    def test_create_gsi_index_existing_view_index(self):
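        """Create a view-based secondary index, then a same-named GSI index; both creates
        should succeed because the index types differ."""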
        self.indexes = self.input.param("indexes", "").split(":")
        query_definition = QueryDefinition(index_name="test_failure_create_index_existing_index",
                                           index_fields=self.indexes,
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_create_query(bucket=self.buckets[0].name,
                                                                  use_gsi_for_secondary=False,
                                                                  gsi_type=self.gsi_type)
        try:
            # create the index as a view-based index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
            # create the same index again, this time using GSI
            self.query = query_definition.generate_index_create_query(bucket=self.buckets[0].name,
                                                                      use_gsi_for_secondary=True,
                                                                      gsi_type=self.gsi_type)
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.log.info(ex)
            raise
        finally:
            self.query = query_definition.generate_index_drop_query(bucket=self.buckets[0].name)
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)

    def test_failure_create_index_big_fields(self):
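        """An index on an absurdly long field name must fail with 'Expression not indexable'."""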
        # build one absurdly long field name: "1_2_3_..._99"
        field_name = "_".join(str(a) for a in range(1, 100))
        query_definition = QueryDefinition(index_name="test_failure_create_index_big_fields",
                                           index_fields=field_name,
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_create_query(bucket=self.buckets[0], gsi_type=self.gsi_type)
        try:
            # create index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            msg = "Expression not indexable"
            self.assertTrue(msg in str(ex),
                            " 5000 error not received as expected {0}".format(ex))

    def test_create_gsi_index_without_primary_index(self):
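        """Create a secondary GSI index on a bucket without a primary index; any failure
        is checked against the expected keyspace error."""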
        self.indexes = self.input.param("indexes", "").split(":")
        query_definition = QueryDefinition(index_name="test_create_gsi_index_without_primary_index",
                                           index_fields=self.indexes,
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_create_query(bucket=self.buckets[0].name, gsi_type=self.gsi_type)
        try:
            # create index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            msg = "Keyspace not_present_bucket name not found - cause: Bucket not_present_bucket not found"
            self.assertTrue(msg in str(ex),
                            " 5000 error not received as expected {0}".format(ex))

    def test_failure_create_index_non_existing_bucket(self):
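        """Creating an index on a non-existent bucket must fail with 'Keyspace not found'."""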
        self.indexes = self.input.param("indexes", "").split(":")
        query_definition = QueryDefinition(index_name="test_failure_create_index_non_existing_bucket",
                                           index_fields=self.indexes,
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_create_query(bucket="not_present_bucket", gsi_type=self.gsi_type)
        try:
            # create index on a bucket that does not exist
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            msg = "Keyspace not found keyspace not_present_bucket - cause: No bucket named not_present_bucket"
            self.assertTrue(msg in str(ex),
                            " 5000 error not received as expected {0}".format(ex))

    def test_failure_drop_index_non_existing_bucket(self):
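        """Dropping an index on a non-existent bucket must fail with 'Keyspace not found'."""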
        query_definition = QueryDefinition(index_name="test_failure_drop_index_non_existing_bucket",
                                           index_fields="crap",
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_drop_query(bucket="not_present_bucket")
        try:
            # drop an index on a bucket that does not exist
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            msg = "Keyspace not found keyspace not_present_bucket - cause: No bucket named not_present_bucket"
            self.assertTrue(msg in str(ex),
                            " 5000 error not received as expected {0}".format(ex))

    def test_failure_drop_index_non_existing_index(self):
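        """Dropping an index that was never created must fail with 'GSI index ... not found'."""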
        query_definition = QueryDefinition(index_name="test_failure_drop_index_non_existing_index",
                                           index_fields="crap",
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_drop_query(bucket=self.buckets[0].name)
        try:
            # drop an index that was never created
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            msg = "GSI index test_failure_drop_index_non_existing_index not found"
            self.assertTrue(msg in str(ex),
                            " 5000 error not received as expected {0}".format(ex))

    def test_failure_create_index_existing_index(self):
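        """Creating the same index twice must fail with 'already exist'; the index is
        dropped in the finally block."""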
        self.indexes = self.input.param("indexes", "").split(":")
        query_definition = QueryDefinition(index_name="test_failure_create_index_existing_index",
                                           index_fields=self.indexes,
                                           query_template="", groups=[])
        self.query = query_definition.generate_index_create_query(bucket=self.buckets[0].name, gsi_type=self.gsi_type)
        try:
            # create index
            server = self.get_nodes_from_services_map(service_type="n1ql")
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
            # create the same index again
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)
        except Exception as ex:
            self.assertTrue("index test_failure_create_index_existing_index already exist" in str(ex),
                            " 5000 error not received as expected {0}".format(ex))
        finally:
            self.query = query_definition.generate_index_drop_query(bucket=self.buckets[0].name)
            self.n1ql_helper.run_cbq_query(query=self.query, server=server)

    def test_fail_create_kv_node_down(self):
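        """Stop a non-master KV node and create an index; if creation fails it must be the
        documented transient error. The node is restarted afterwards."""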
        servers_out = self.get_nodes_from_services_map(service_type="kv", get_all_nodes=True)
        node_out = servers_out[1]
        if servers_out[1] == self.servers[0]:
            node_out = servers_out[0]
        remote = RemoteMachineShellConnection(node_out)
        remote.stop_server()
        self.sleep(10)
        self.query = self.query_definitions[0].generate_index_create_query(bucket=self.buckets[0].name,
                                                                           gsi_type=self.gsi_type)
        try:
            res = self.n1ql_helper.run_cbq_query(query=self.query, server=self.n1ql_node)
            self.log.info(res)
        except Exception as ex:
            msg = "cause: Encountered transient error.  Index creation will be retried in background."
            self.log.info(ex)
            self.assertTrue(msg in str(ex), ex)
        finally:
            remote = RemoteMachineShellConnection(node_out)
            remote.start_server()

    def test_fail_drop_index_node_down(self):
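        """Create indexes, fail over the index nodes, then drop an index; the drop must
        fail with 'GSI index ... not found'."""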
        try:
            self.run_multi_operations(buckets=self.buckets,
                                      query_definitions=self.query_definitions,
                                      create_index=True, drop_index=False)
            servers_out = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
            failover_task = self.cluster.async_failover([self.master],
                                                        failover_nodes=servers_out, graceful=self.graceful)
            failover_task.result()
            self.sleep(10)
            self.query = self.query_definitions[0].generate_index_drop_query(
                bucket=self.buckets[0].name,
                use_gsi_for_secondary=self.use_gsi_for_secondary,
                use_gsi_for_primary=self.use_gsi_for_primary)
            self.n1ql_helper.run_cbq_query(query=self.query, server=self.n1ql_node)
            self.log.info(" non-existent indexes cannot be dropped ")
        except Exception as ex:
            self.log.info(ex)
            msg = "GSI index {0} not found".format(self.query_definitions[0].index_name)
            self.assertTrue(msg in str(ex), ex)

    def test_delete_bucket_while_index_build(self):
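        """Delete a bucket while a deferred index build is in flight; the build must fail
        with 'Keyspace not found'."""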
        create_index_tasks = []
        index_list = []
        self.defer_build = True
        for bucket in self.buckets:
            for query_definition in self.query_definitions:
                create_index_tasks.append(self.async_create_index(bucket.name, query_definition))
                index_list.append(query_definition.index_name)
        for task in create_index_tasks:
            task.result()
        try:
            for bucket in self.buckets:
                build_task = self.async_build_index(bucket, index_list)
                log.info("Deleting bucket {0}".format(bucket.name))
                BucketOperationHelper.delete_bucket_or_assert(serverInfo=self.master, bucket=bucket.name)
                build_task.result()
        except Exception as ex:
            msg = "Keyspace not found keyspace"
            self.assertIn(msg, str(ex), str(ex))
            log.info("Expected error while building index...")

    def test_ambiguity_in_gsi_indexes_due_to_node_down(self):
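        """Pin an index to one node, stop that node, and verify that a same-named create
        is still rejected with 'already exist'."""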
        server_out = self.get_nodes_from_services_map(service_type="index")
        try:
            query_definition = QueryDefinition(index_name="test_ambiguity_in_gsi_indexes_due_to_node_down",
                                               index_fields=["join_yr"],
                                               query_template="SELECT * from %s WHERE join_yr > 1999", groups=[])
            deploy_node_info = ["{0}:{1}".format(server_out.ip, server_out.port)]
            task = self.async_create_index(self.buckets[0].name, query_definition,
                                           deploy_node_info=deploy_node_info)
            task.result()
            remote = RemoteMachineShellConnection(server_out)
            remote.stop_server()
            self.sleep(10)
            task = self.async_create_index(self.buckets[0].name, query_definition)
            task.result()
            self.assertTrue(False, "Duplicate index should not be allowed when index node is down")
        except Exception as ex:
            self.log.info(ex)
            remote = RemoteMachineShellConnection(server_out)
            remote.start_server()
            self.assertTrue("Index test_ambiguity_in_gsi_indexes_due_to_node_down already exist" in str(ex), ex)