1import uuid
2import copy
3from tuqquery.tuq import QueryTests
4from couchbase_helper.documentgenerator import DocumentGenerator
5
6
# Join type keywords. The selected value is interpolated into the query text
# by the tests below and compared (upper-cased) by the expected-result builders.
JOIN_INNER = "INNER"
JOIN_LEFT = "LEFT"
JOIN_RIGHT = "RIGHT"
10
11class JoinTests(QueryTests):
12    def setUp(self):
13        try:
14            super(JoinTests, self).setUp()
15            self.gens_tasks = self.generate_docs_tasks()
16            self.type_join = self.input.param("type_join", JOIN_INNER)
17        except Exception, ex:
18            self.log.error("ERROR SETUP FAILED: %s" % str(ex))
19            raise ex
20
    def suite_setUp(self):
        """Run the base-class suite setup, then load the task documents."""
        super(JoinTests, self).suite_setUp()
        # gens_tasks is assigned in setUp(); presumably setUp() has already run
        # when this hook is invoked -- confirm against the test runner.
        self.load(self.gens_tasks, start_items=self.num_items)
24
    def tearDown(self):
        """Delegate per-test cleanup to the base class; nothing extra is done here."""
        super(JoinTests, self).tearDown()
27
    def suite_tearDown(self):
        """Delegate suite-level cleanup to the base class; nothing extra is done here."""
        super(JoinTests, self).suite_tearDown()
30
31    def test_simple_join_keys(self):
32        for bucket in self.buckets:
33            self.query = "SELECT employee.name, employee.tasks_ids, new_project " +\
34            "FROM %s as employee %s JOIN default.project as new_project " % (bucket.name, self.type_join) +\
35            "ON KEYS employee.tasks_ids"
36            actual_result = self.run_cbq_query()
37            actual_result = sorted(actual_result['results'])
38            full_list = self._generate_full_joined_docs_list(join_type=self.type_join)
39            expected_result = [doc for doc in full_list if not doc]
40            expected_result.extend([{"name" : doc['name'], "tasks_ids" : doc['tasks_ids'],
41                                     "new_project" : doc['project']}
42                                    for doc in full_list if doc and 'project' in doc])
43            expected_result.extend([{"name" : doc['name'], "tasks_ids" : doc['tasks_ids']}
44                                    for doc in full_list if doc and not 'project' in doc])
45            expected_result = sorted(expected_result)
46            self._verify_results(actual_result, expected_result)
47
48    def test_join_several_keys(self):
49        for bucket in self.buckets:
50            self.query = "SELECT employee.name, employee.tasks_ids, new_task.project, new_task.task_name " +\
51            "FROM %s as employee %s JOIN default as new_task " % (bucket.name, self.type_join) +\
52            "ON KEYS employee.tasks_ids"
53            actual_result = self.run_cbq_query()
54            actual_result = sorted(actual_result['results'])
55            full_list = self._generate_full_joined_docs_list(join_type=self.type_join)
56            expected_result = [doc for doc in full_list if not doc]
57            expected_result.extend([{"name" : doc['name'], "tasks_ids" : doc['tasks_ids'],
58                                     "project" : doc['project'], "task_name" : doc['task_name']}
59                                    for doc in full_list if doc and 'project' in doc])
60            expected_result.extend([{"name" : doc['name'], "tasks_ids" : doc['tasks_ids']}
61                                    for doc in full_list if doc and not 'project' in doc])
62            expected_result = sorted(expected_result)
63            self._verify_results(actual_result, expected_result)
64
65    def test_where_join_keys(self):
66        for bucket in self.buckets:
67            self.query = "SELECT employee.name, employee.tasks_ids, new_project_full.project new_project " +\
68            "FROM %s as employee %s JOIN default as new_project_full " % (bucket.name, self.type_join) +\
69            "ON KEYS employee.tasks_ids WHERE new_project_full.project == 'IT'"
70            actual_result = self.run_cbq_query()
71            actual_result = sorted(actual_result['results'])
72            expected_result = self._generate_full_joined_docs_list(join_type=self.type_join)
73            expected_result = [{"name" : doc['name'], "tasks_ids" : doc['tasks_ids'],
74                                "new_project" : doc['project']}
75                               for doc in expected_result if doc and 'project' in doc and\
76                               doc['project'] == 'IT']
77            expected_result = sorted(expected_result)
78            self._verify_results(actual_result, expected_result)
79
80    def test_join_unnest_alias(self):
81        for bucket in self.buckets:
82            self.query = "SELECT task2 FROM %s emp1 JOIN %s" % (bucket.name, bucket.name) +\
83            " task ON KEYS emp1.tasks_ids UNNEST emp1.tasks_ids as task2"
84            actual_result = self.run_cbq_query()
85            actual_result = sorted(actual_result['results'], key=lambda doc:(
86                                                               doc['task2']))
87            expected_result = self._generate_full_joined_docs_list()
88            expected_result = [{"task2" : task} for doc in expected_result
89                               for task in doc['tasks_ids']]
90            expected_result = sorted(expected_result, key=lambda doc:(
91                                                          doc['task2']))
92            self._verify_results(actual_result, expected_result)
93
94    def test_unnest(self):
95        for bucket in self.buckets:
96            self.query = "SELECT emp.name, task FROM %s emp %s UNNEST emp.tasks_ids task" % (bucket.name,self.type_join)
97            actual_result = self.run_cbq_query()
98            actual_result = sorted(actual_result['results'])
99            expected_result = self._generate_full_docs_list(self.gens_load)
100            expected_result = [{"task" : task, "name" : doc["name"]}
101                               for doc in expected_result for task in doc['tasks_ids']]
102            if self.type_join.upper() == JOIN_LEFT:
103                expected_result.extend([{}] * self.gens_tasks[-1].end)
104            expected_result = sorted(expected_result)
105            self._verify_results(actual_result, expected_result)
106
107    def test_subquery_select(self):
108        for bucket in self.buckets:
109            self.query = "select task_name, (select count(task_name) cn from %s d use keys %s) as names from %s" % (bucket.name, str(['test_task-%s' % i for i in xrange(0, 29)]),
110                                                                                                                    bucket.name)
111            self.run_cbq_query()
112            actual_result = self.run_cbq_query()
113            actual_result = sorted(actual_result['results'])
114            expected_result_subquery = {"cn" : 29}
115            expected_result = [{'names' : [expected_result_subquery]}] * len(self._generate_full_docs_list(self.gens_load))
116            expected_result.extend([{'task_name': doc['task_name'],
117                                     'names' : [expected_result_subquery]}
118                                    for doc in self._generate_full_docs_list(self.gens_tasks)])
119            expected_result = sorted(expected_result)
120            self._verify_results(actual_result, expected_result)
121
122    def test_subquery_where_aggr(self):
123        for bucket in self.buckets:
124            self.query = "select name, join_day from %s where join_day =" % (bucket.name) +\
125            " (select AVG(join_day) as average from %s d use keys %s)[0].average" % (bucket.name,
126                                                                               str(['query-test-Sales-%s' % i
127                                                                                    for i in xrange(0, self.docs_per_day)]))
128            all_docs_list = self._generate_full_docs_list(self.gens_load)
129            actual_result = self.run_cbq_query()
130            actual_result = sorted(actual_result['results'])
131            expected_result = [{'name' : doc['name'],
132                                'join_day' : doc['join_day']}
133                               for doc in all_docs_list
134                               if doc['job_title'] == 'Sales' and doc['join_day'] == 1]
135            expected_result = sorted(expected_result)
136            self._verify_results(actual_result, expected_result)
137
138    def test_subquery_where_in(self):
139        for bucket in self.buckets:
140            self.query = "select name, join_day from %s where join_day IN " % (bucket.name) +\
141            " (select ARRAY_AGG(join_day) as average from %s d use keys %s)[0].average" % (bucket.name,
142                                                                               str(['query-test-Sales-%s' % i
143                                                                                    for i in xrange(0, self.docs_per_day)]))
144            all_docs_list = self._generate_full_docs_list(self.gens_load)
145            actual_result = self.run_cbq_query()
146            actual_result = sorted(actual_result['results'])
147            expected_result = [{'name' : doc['name'],
148                                'join_day' : doc['join_day']}
149                               for doc in all_docs_list
150                               if doc['job_title'] == 'Sales' and doc['join_day'] == 1]
151            expected_result = sorted(expected_result)
152            self._verify_results(actual_result, expected_result)
153
154    def test_where_in_subquery(self):
155        for bucket in self.buckets:
156            self.query = "select name, tasks_ids from %s where tasks_ids[0] IN" % bucket.name +\
157            " (select ARRAY_AGG(DISTINCT task_name) as names from %s d " % bucket.name +\
158            "use keys %s where project='MB')[0].names" % ('["test_task-1", "test_task-2"]')
159            all_docs_list = self._generate_full_docs_list(self.gens_load)
160            actual_result = self.run_cbq_query()
161            actual_result = sorted(actual_result['results'])
162            expected_result = [{'name' : doc['name'],
163                                'tasks_ids' : doc['tasks_ids']}
164                               for doc in all_docs_list
165                               if doc['tasks_ids'] in ['test_task-1', 'test_task-2']]
166            expected_result = sorted(expected_result)
167            self._verify_results(actual_result, expected_result)
168##############################################################################################
169#
170#   KEY
171##############################################################################################
172
173    def test_keys(self):
174        for bucket in self.buckets:
175            keys_select = []
176            generator = copy.deepcopy(self.gens_tasks[0])
177            for i in xrange(5):
178                key, _ = generator.next()
179                keys_select.append(key)
180            self.query = 'select task_name FROM %s USE KEYS %s' % (bucket.name, keys_select)
181            actual_result = self.run_cbq_query()
182            actual_result = sorted(actual_result['results'], key=lambda doc: (
183                                                                       doc['task_name']))
184            full_list = self._generate_full_docs_list(self.gens_tasks, keys=keys_select)
185            expected_result = [{"task_name" : doc['task_name']} for doc in full_list]
186            expected_result = sorted(expected_result, key=lambda doc: (doc['task_name']))
187            self._verify_results(actual_result, expected_result)
188
189            keys_select.extend(["wrong"])
190            self.query = 'select task_name FROM %s USE KEYS %s' % (bucket.name, keys_select)
191            actual_result = self.run_cbq_query()
192            actual_result = sorted(actual_result['results'])
193            self._verify_results(actual_result, expected_result)
194
195            self.query = 'select task_name FROM %s USE KEYS ["wrong_one","wrong_second"]' % (bucket.name)
196            actual_result = self.run_cbq_query()
197            self.assertFalse(actual_result['results'], "Having a wrong key query returned some result")
198
199    def test_key_array(self):
200        for bucket in self.buckets:
201            gen_select = copy.deepcopy(self.gens_tasks[0])
202            key_select, value_select = gen_select.next()
203            self.query = 'SELECT * FROM %s USE KEYS ARRAY emp._id FOR emp IN [%s] END' % (bucket.name, value_select)
204            actual_result = self.run_cbq_query()
205            actual_result = sorted(actual_result['results'])
206            expected_result = self._generate_full_docs_list(self.gens_tasks, keys=[key_select])
207            expected_result = [{bucket.name : doc} for doc in expected_result]
208            expected_result = sorted(expected_result)
209            self._verify_results(actual_result, expected_result)
210
211            key2_select, value2_select = gen_select.next()
212            self.query = 'SELECT * FROM %s USE KEYS ARRAY emp._id FOR emp IN [%s,%s] END' % (bucket.name,
213                                                                                      value_select,
214                                                                                      value2_select)
215            actual_result = self.run_cbq_query()
216            actual_result = sorted(actual_result['results'])
217            expected_result = self._generate_full_docs_list(self.gens_tasks, keys=[key_select, key2_select])
218            expected_result = [{bucket.name : doc} for doc in expected_result]
219            expected_result = sorted(expected_result)
220            self._verify_results(actual_result, expected_result)
221
222##############################################################################################
223#
224#   NEST
225##############################################################################################
226
227
228    def test_simple_nest_keys(self):
229        for bucket in self.buckets:
230            self.query = "SELECT * FROM %s emp %s NEST %s tasks " % (
231                                                bucket.name, self.type_join, bucket.name) +\
232                         "ON KEYS emp.tasks_ids"
233            actual_result = self.run_cbq_query()
234            actual_result = self.sort_nested_list(actual_result['results'])
235            actual_result = sorted(actual_result, key=lambda doc:
236                                   self._get_for_sort(doc))
237            self._delete_ids(actual_result)
238            full_list = self._generate_full_nested_docs_list(join_type=self.type_join)
239            expected_result = [{"emp" : doc['item'], "tasks" : doc['items_nested']}
240                               for doc in full_list if doc and 'items_nested' in doc]
241            expected_result.extend([{"emp" : doc['item']}
242                                    for doc in full_list if not 'items_nested' in doc])
243            expected_result = sorted(expected_result, key=lambda doc:
244                                   self._get_for_sort(doc))
245            self._delete_ids(expected_result)
246            self._verify_results(actual_result, expected_result)
247
248    def test_simple_nest_key(self):
249        for bucket in self.buckets:
250            self.query = "SELECT * FROM %s emp %s NEST %s tasks " % (
251                                                bucket.name, self.type_join, bucket.name) +\
252                         "KEY emp.tasks_ids[0]"
253            actual_result = self.run_cbq_query()
254            actual_result = sorted(actual_result['results'], key=lambda doc:
255                                                            self._get_for_sort(doc))
256            self._delete_ids(actual_result)
257            full_list = self._generate_full_nested_docs_list(particular_key=0,
258                                                             join_type=self.type_join)
259            expected_result = [{"emp" : doc['item'], "tasks" : doc['items_nested']}
260                               for doc in full_list if doc and 'items_nested' in doc]
261            expected_result.extend([{"emp" : doc['item']}
262                                    for doc in full_list if not 'items_nested' in doc])
263            expected_result = sorted(expected_result, key=lambda doc:
264                                                            self._get_for_sort(doc))
265            self._delete_ids(expected_result)
266            self._verify_results(actual_result, expected_result)
267
268    def test_nest_keys_with_array(self):
269        for bucket in self.buckets:
270            self.query = "select emp.name, ARRAY item.project FOR item in items end projects " +\
271                         "FROM %s emp %s NEST %s items " % (
272                                                    bucket.name, self.type_join, bucket.name) +\
273                         "ON KEYS emp.tasks_ids"
274            actual_result = self.run_cbq_query()
275            actual_result = self.sort_nested_list(actual_result['results'])
276            actual_result = sorted(actual_result)
277            full_list = self._generate_full_nested_docs_list(join_type=self.type_join)
278            expected_result = [{"name" : doc['item']['name'],
279                                "projects" : [nested_doc['project'] for nested_doc in doc['items_nested']]}
280                               for doc in full_list if doc and 'items_nested' in doc]
281            expected_result.extend([{} for doc in full_list if not 'items_nested' in doc])
282            expected_result = sorted(expected_result)
283            self._verify_results(actual_result, expected_result)
284
285    def test_nest_keys_where(self):
286        for bucket in self.buckets:
287            self.query = "select emp.name, ARRAY item.project FOR item in items end projects " +\
288                         "FROM %s emp %s NEST %s items " % (
289                                                    bucket.name, self.type_join, bucket.name) +\
290                         "ON KEYS emp.tasks_ids where ANY item IN items SATISFIES item.project == 'CB' end"
291            actual_result = self.run_cbq_query()
292            actual_result = sorted(actual_result['results'], key=lambda doc: (doc['name'], doc['projects']))
293            full_list = self._generate_full_nested_docs_list(join_type=self.type_join)
294            expected_result = [{"name" : doc['item']['name'],
295                                "projects" : [nested_doc['project'] for nested_doc in doc['items_nested']]}
296                               for doc in full_list if doc and 'items_nested' in doc and\
297                               len([nested_doc for nested_doc in doc['items_nested']
298                                    if nested_doc['project'] == 'CB']) > 0]
299            expected_result = sorted(expected_result, key=lambda doc: (doc['name'], doc['projects']))
300            self._verify_results(actual_result, expected_result)
301
302    def _get_for_sort(self, doc):
303        if not 'emp' in doc:
304            return ''
305        if 'name' in doc['emp']:
306            return doc['emp']['name'], doc['emp']['join_yr'],\
307                   doc['emp']['join_mo'], doc['emp']['job_title']
308        else:
309            return doc['emp']['task_name']
310
311    def _delete_ids(self, result):
312        for item in result:
313            if 'emp' in item:
314                del item['emp']['_id']
315            if 'tasks' in item:
316                for task in item['tasks']:
317                    if task and '_id' in task:
318                        del task['_id']
319
320    def generate_docs(self, docs_per_day, start=0):
321        generators = []
322        types = ['Engineer', 'Sales', 'Support']
323        join_yr = [2010, 2011]
324        join_mo = xrange(1, 12 + 1)
325        join_day = xrange(1, 28 + 1)
326        template = '{{ "name":"{0}", "join_yr":{1}, "join_mo":{2}, "join_day":{3},'
327        template += ' "job_title":"{4}", "tasks_ids":{5}}}'
328        for info in types:
329            for year in join_yr:
330                for month in join_mo:
331                    for day in join_day:
332                        name = ["employee-%s" % (str(day))]
333                        tasks_ids = ["test_task-%s" % day, "test_task-%s" % (day + 1)]
334                        generators.append(DocumentGenerator("query-test-%s-%s-%s-%s" % (info, year, month, day),
335                                               template,
336                                               name, [year], [month], [day],
337                                               [info], [tasks_ids],
338                                               start=start, end=docs_per_day))
339        return generators
340
341    def generate_docs_tasks(self):
342        generators = []
343        start, end = 0, (28 + 1)
344        template = '{{ "task_name":"{0}", "project": "{1}"}}'
345        generators.append(DocumentGenerator("test_task", template,
346                                            ["test_task-%s" % i for i in xrange(0,10)],
347                                            ["CB"],
348                                            start=start, end=10))
349        generators.append(DocumentGenerator("test_task", template,
350                                            ["test_task-%s" % i for i in xrange(10,20)],
351                                            ["MB"],
352                                            start=10, end=20))
353        generators.append(DocumentGenerator("test_task", template,
354                                            ["test_task-%s" % i for i in xrange(20,end)],
355                                            ["IT"],
356                                            start=20, end=end))
357        return generators
358
359    def _generate_full_joined_docs_list(self, join_type=JOIN_INNER,
360                                        particular_key=None):
361        joined_list = []
362        all_docs_list = self._generate_full_docs_list(self.gens_load)
363        if join_type.upper() == JOIN_INNER:
364            for item in all_docs_list:
365                keys = item["tasks_ids"]
366                if particular_key is not None:
367                    keys=[item["tasks_ids"][particular_key]]
368                tasks_items = self._generate_full_docs_list(self.gens_tasks, keys=keys)
369                for tasks_item in tasks_items:
370                    item_to_add = copy.deepcopy(item)
371                    item_to_add.update(tasks_item)
372                    joined_list.append(item_to_add)
373        elif join_type.upper() == JOIN_LEFT:
374            for item in all_docs_list:
375                keys = item["tasks_ids"]
376                if particular_key is not None:
377                    keys=[item["tasks_ids"][particular_key]]
378                tasks_items = self._generate_full_docs_list(self.gens_tasks, keys=keys)
379                for key in keys:
380                    item_to_add = copy.deepcopy(item)
381                    if key in [doc["_id"] for doc in tasks_items]:
382                        item_to_add.update([doc for doc in tasks_items if key == doc['_id']][0])
383                    joined_list.append(item_to_add)
384            joined_list.extend([{}] * self.gens_tasks[-1].end)
385        elif join_type.upper() == JOIN_RIGHT:
386            raise Exception("RIGHT JOIN doen't exists in corrunt implementation")
387        else:
388            raise Exception("Unknown type of join")
389        return joined_list
390
391    def _generate_full_nested_docs_list(self, join_type=JOIN_INNER,
392                                        particular_key=None):
393        nested_list = []
394        all_docs_list = self._generate_full_docs_list(self.gens_load)
395        if join_type.upper() == JOIN_INNER:
396            for item in all_docs_list:
397                keys = item["tasks_ids"]
398                if particular_key is not None:
399                    keys=[item["tasks_ids"][particular_key]]
400                tasks_items = self._generate_full_docs_list(self.gens_tasks, keys=keys)
401                if tasks_items:
402                    nested_list.append({"items_nested" : tasks_items,
403                                        "item" : item})
404        elif join_type.upper() == JOIN_LEFT:
405            for item in all_docs_list:
406                keys = item["tasks_ids"]
407                if particular_key is not None:
408                    keys=[item["tasks_ids"][particular_key]]
409                tasks_items = self._generate_full_docs_list(self.gens_tasks, keys=keys)
410                if tasks_items:
411                    nested_list.append({"items_nested" : tasks_items,
412                                        "item" : item})
413            tasks_doc_list = self._generate_full_docs_list(self.gens_tasks)
414            for item in tasks_doc_list:
415                nested_list.append({"item" : item})
416        elif join_type.upper() == JOIN_RIGHT:
417            raise Exception("RIGHT JOIN doen't exists in corrunt implementation")
418        else:
419            raise Exception("Unknown type of join")
420        return nested_list
421