1from tuq import QueryTests
2
3
4class QueryMiscTests(QueryTests):
5
6    def setUp(self):
7        super(QueryMiscTests, self).setUp()
8        self.log.info("==============  QueriesIndexTests setup has started ==============")
9        self.log.info("==============  QueriesIndexTests setup has completed ==============")
10        self.log_config_info()
11
12    def suite_setUp(self):
13        super(QueryMiscTests, self).suite_setUp()
14        self.log.info("==============  QueriesIndexTests suite_setup has started ==============")
15        self.log.info("==============  QueriesIndexTests suite_setup has completed ==============")
16        self.log_config_info()
17
18    def tearDown(self):
19        self.log_config_info()
20        self.log.info("==============  QueriesIndexTests tearDown has started ==============")
21        self.log.info("==============  QueriesIndexTests tearDown has completed ==============")
22        super(QueryMiscTests, self).tearDown()
23
24    def suite_tearDown(self):
25        self.log_config_info()
26        self.log.info("==============  QueriesIndexTests suite_tearDown has started ==============")
27        self.log.info("==============  QueriesIndexTests suite_tearDown has completed ==============")
28        super(QueryMiscTests, self).suite_tearDown()
29
30    '''MB-30946: Empty array from index scan not working properly when backfill is used'''
31    def test_empty_array_low_scancap(self):
32        createdIndex = False
33        createdBucket = False
34        try:
35            temp_bucket_params = self._create_bucket_params(server=self.master, size=self.bucket_size,
36                                                            replicas=self.num_replicas, bucket_type=self.bucket_type,
37                                                            enable_replica_index=self.enable_replica_index,
38                                                            eviction_policy=self.eviction_policy, lww=self.lww)
39            self.cluster.create_standard_bucket("temp_bucket", 11222, temp_bucket_params)
40            createdBucket = True
41            self.query = 'INSERT INTO temp_bucket VALUES(UUID(),{"severity":"low","deferred":[]}),' \
42                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
43                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
44                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
45                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
46                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
47                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
48                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
49                         'VALUES(UUID(),{"severity":"low","deferred":[]}),' \
50                         'VALUES(UUID(),{"severity":"low","deferred":[]});'
51            res = self.run_cbq_query()
52
53            self.query = 'CREATE INDEX ix1 ON temp_bucket(severity,deferred);'
54            res = self.run_cbq_query()
55
56            self._wait_for_index_online("temp_bucket", "ix1")
57            createdIndex = True
58
59            self.query = 'SELECT META().id, deferred FROM temp_bucket WHERE severity = "low";'
60            expect_res1 = self.run_cbq_query()
61            self.assertEqual(len(expect_res1['results']), 10)
62            for item in expect_res1['results']:
63                self.assertEqual(item["deferred"], [])
64
65            self.query = 'SELECT META().id, deferred FROM temp_bucket WHERE severity = "low" AND EVERY v IN deferred SATISFIES v != "X" END;'
66            expect_res2 = self.run_cbq_query()
67            self.assertEqual(len(expect_res2['results']), 10)
68            for item in expect_res2['results']:
69                self.assertEqual(item["deferred"], [])
70
71            self.query = 'SELECT META().id, deferred FROM temp_bucket WHERE severity = "low";'
72            actual_res1 = self.run_cbq_query(query_params={"scan_cap": 2})
73            self.assertEqual(actual_res1['results'], expect_res1['results'])
74
75            self.query = 'SELECT META().id, deferred FROM temp_bucket WHERE severity = "low" AND EVERY v IN deferred SATISFIES v != "X" END;'
76            actual_res2 = self.run_cbq_query(query_params={"scan_cap": 2})
77            self.assertEqual(actual_res2['results'], expect_res2['results'])
78        finally:
79            if createdIndex:
80                self.query = 'DROP INDEX temp_bucket.ix1'
81                self.run_cbq_query()
82                self.wait_for_index_drop("temp_bucket", "ix1", ["severity", "deferred"], self.gsi_type)
83            if createdBucket:
84                self.cluster.bucket_delete(self.master, "temp_bucket")
85
86    '''MB-28636: query with OrderedIntersect scan returns empty result set intermittently'''
87    def test_orderintersectscan_nonempty_results(self):
88        createdIndexes = {}
89        createdBucket = False
90        try:
91            temp_bucket_params = self._create_bucket_params(server=self.master, size=self.bucket_size,
92                                                            replicas=self.num_replicas, bucket_type=self.bucket_type,
93                                                            enable_replica_index=self.enable_replica_index,
94                                                            eviction_policy=self.eviction_policy, lww=self.lww)
95            self.cluster.create_standard_bucket("temp_bucket", 11222, temp_bucket_params)
96            createdBucket = True
97
98            self.query = 'CREATE primary index ON temp_bucket'
99            self.run_cbq_query()
100            self._wait_for_index_online("temp_bucket", "#primary")
101            createdIndexes["#primary"] = []
102
103            self.query = 'INSERT INTO temp_bucket VALUES(UUID(), {"CUSTOMER_ID":551,"MSISDN":UUID(), "ICCID":UUID()})'
104            self.run_cbq_query()
105
106            self.query = 'INSERT INTO temp_bucket (KEY UUID(), VALUE d) SELECT {"CUSTOMER_ID":551,"MSISDN":UUID(),"ICCID":UUID()} AS d  FROM temp_bucket WHERE CUSTOMER_ID == 551;'
107            for i in range(0, 8):
108                self.run_cbq_query()
109            self.query = 'select * from temp_bucket'
110            res = self.run_cbq_query()
111            self.assertEqual(len(res['results']), 256)
112
113            self.query = 'CREATE INDEX `xi1` ON `temp_bucket`(`CUSTOMER_ID`,`MSISDN`);'
114            self.run_cbq_query()
115            self._wait_for_index_online("temp_bucket", "xi1")
116            createdIndexes["xi1"] = ['CUSTOMER_ID', 'MSISDN']
117
118            self.query = 'CREATE INDEX `ai_SIM` ON `temp_bucket`(distinct pairs({`CUSTOMER_ID`, `ICCID`, `MSISDN` }));'
119            self.run_cbq_query()
120            self._wait_for_index_online("temp_bucket", "ai_SIM")
121            createdIndexes["ai_SIM"] = ['distinct pairs{"CUSTOMER_ID": CUSTOMER_ID, "ICCID": ICCID, "MSISDN": MSISDN}']
122
123            self.query = 'select TIM_ID, MSISDN from temp_bucket WHERE CUSTOMER_ID = 551 ORDER BY MSISDN ASC LIMIT 2 OFFSET 0 '
124            for i in range(0, 100):
125                res = self.run_cbq_query()
126                self.assertEqual(len(res['results']), 2)
127        finally:
128            for index in createdIndexes.keys():
129                if index == "#primary":
130                    self.query = "DROP primary index on temp_bucket"
131                else:
132                    self.query = 'DROP INDEX temp_bucket.'+str(index)
133                self.run_cbq_query()
134                self.wait_for_index_drop("temp_bucket", index, createdIndexes[index], self.gsi_type)
135            if createdBucket:
136                self.cluster.bucket_delete(self.master, "temp_bucket")
137
138    def test_intersectscan_thread_growth(self):
139        createdIndexes = {}
140        createdBucket = False
141        try:
142            temp_bucket_params = self._create_bucket_params(server=self.master, size=self.bucket_size,
143                                                            replicas=self.num_replicas, bucket_type=self.bucket_type,
144                                                            enable_replica_index=self.enable_replica_index,
145                                                            eviction_policy=self.eviction_policy, lww=self.lww)
146            self.cluster.create_standard_bucket("temp_bucket", 11222, temp_bucket_params)
147            createdBucket = True
148
149            self.query = 'CREATE primary index ON temp_bucket'
150            self.run_cbq_query()
151            self._wait_for_index_online("temp_bucket", "#primary")
152            createdIndexes["#primary"] = []
153
154            self.query = 'CREATE INDEX ix1 ON `temp_bucket`(a)'
155            self.run_cbq_query()
156            self._wait_for_index_online("temp_bucket", "ix1")
157            createdIndexes["ix1"] = ['a']
158
159            self.query = 'CREATE INDEX ix2 ON `temp_bucket`(b)'
160            self.run_cbq_query()
161            self._wait_for_index_online("temp_bucket", "ix2")
162            createdIndexes["ix2"] = ['b']
163
164            for i in range(0, 20000):
165                self.query = 'INSERT INTO `temp_bucket` (KEY, VALUE) VALUES ("'+str(i)+'", {"a":'+str(i)+', "b":'+str(i)+'})'
166                self.run_cbq_query()
167            self.query = 'select * from temp_bucket'
168            res = self.run_cbq_query()
169            self.assertEqual(len(res['results']), 20000)
170
171            self.query = 'explain select * from `temp_bucket` where a > 0 and b > 0 limit 1000'
172            res = self.run_cbq_query()
173            self.assertTrue("IntersectScan" in str(res['results'][0]['plan']))
174            self.assertTrue("ix1" in str(res['results'][0]['plan']))
175            self.assertTrue("ix2" in str(res['results'][0]['plan']))
176
177            self.log.info("priming query engine")
178            self.query = 'select * from `temp_bucket` where a > 0 and b > 0 limit 1000'
179            for i in range(0, 10):
180                res = self.run_cbq_query()
181                self.assertEqual(len(res['results']), 1000)
182            port_for_version = "8093"
183            cbversion = self.cb_version.split(".")
184            major_version = cbversion[0]
185            minor_version = cbversion[1]
186            if (int(major_version) < 5) or (int(major_version) == 5 and int(minor_version) < 5):
187                port_for_version = "6060"
188            curl_cmd = "curl -u Administrator:password http://localhost:"+port_for_version+"/debug/pprof/goroutine?debug=2"
189            curl_output = self.shell.execute_command(curl_cmd)
190            pprof_list = curl_output[0]
191            count_intersect = 0
192            for item in pprof_list:
193                if "Intersect" in str(item) or "intersect" in str(item):
194                    count_intersect = count_intersect + 1
195            self.log.info("number of intersect threads after primed: "+str(count_intersect))
196
197            self.sleep(10)
198
199            # run query 1000 times now
200            for i in range(0, 1000):
201                res = self.run_cbq_query()
202                self.assertEqual(len(res['results']), 1000)
203
204            curl_output = self.shell.execute_command(curl_cmd)
205            pprof_list = curl_output[0]
206            count_intersect_a = 0
207            for item in pprof_list:
208                if "Intersect" in str(item) or "intersect" in str(item):
209                    count_intersect_a = count_intersect_a + 1
210
211            self.log.info("number of intersect threads A: "+str(count_intersect_a))
212            self.assertEqual(count_intersect_a, 0)
213
214            self.sleep(60)
215
216            curl_output = self.shell.execute_command(curl_cmd)
217            pprof_list = curl_output[0]
218            count_intersect_b = 0
219            for item in pprof_list:
220                if "Intersect" in str(item) or "intersect" in str(item):
221                    count_intersect_b = count_intersect_b + 1
222            self.log.info("number of intersect threads B: "+str(count_intersect_b))
223            self.assertEqual(count_intersect_b, 0)
224        finally:
225            for index in createdIndexes.keys():
226                if index == "#primary":
227                    self.query = "DROP primary index on temp_bucket"
228                else:
229                    self.query = 'DROP INDEX temp_bucket.'+str(index)
230                self.run_cbq_query()
231                self.wait_for_index_drop("temp_bucket", index, createdIndexes[index], self.gsi_type)
232            if createdBucket:
233                self.cluster.bucket_delete(self.master, "temp_bucket")