1import unittest
2import sys
3import time
4import types
5import logger
6import json
7from collections import defaultdict
8
9from TestInput import TestInputSingleton
10from couchbase_helper.document import View
11from membase.api.rest_client import RestConnection, Bucket
12from membase.helper.cluster_helper import ClusterOperationHelper
13from mc_bin_client import MemcachedClient
14from memcached.helper.data_helper import MemcachedClientHelper
15from basetestcase import BaseTestCase
16from membase.api.exception import QueryViewException
17from membase.helper.rebalance_helper import RebalanceHelper
18
19
class ViewMergingTests(BaseTestCase):
    """View-merging test suite: view queries whose results span several
    nodes/vbuckets.

    The whole suite shares a single bucket. The special test params
    'first_case' and 'last_case' stage the expensive cluster work so the
    cluster and design docs are built once before the first case and torn
    down only after the last one; intermediate cases reuse them.
    """

    def setUp(self):
        """Set up the shared cluster/bucket (full work on the first case only)."""
        try:
            if 'first_case' not in TestInputSingleton.input.test_params:
                # Not the first case of the run: skip bucket creation and
                # cleanup so the cluster built by the first case is reused.
                TestInputSingleton.input.test_params['default_bucket'] = False
                TestInputSingleton.input.test_params['skip_cleanup'] = True
            self.default_bucket_name = 'default'
            super(ViewMergingTests, self).setUp()
            if 'first_case' in TestInputSingleton.input.test_params:
                self.cluster.rebalance(self.servers[:], self.servers[1:], [])
            # We use only one bucket in this test suite
            self.rest = RestConnection(self.master)
            self.bucket = self.rest.get_bucket(Bucket(name=self.default_bucket_name))
            # num_docs must be a multiple of the number of vbuckets
            self.num_docs = self.input.param("num_docs_per_vbucket", 1) * \
                            len(self.bucket.vbuckets)
            self.is_dev_view = self.input.param("is_dev_view", False)
            self.map_view_name = 'mapview1'
            self.red_view_name = 'redview1'
            self.red_view_stats_name = 'redview_stats'
            self.clients = self.init_clients()
            if 'first_case' in TestInputSingleton.input.test_params:
                self.create_ddocs(False)
                self.create_ddocs(True)
        except Exception as ex:
            # A broken setup invalidates every remaining case in the suite.
            self.input.test_params["stop-on-failure"] = True
            self.log.error("SETUP WAS FAILED. ALL TESTS WILL BE SKIPPED")
            self.fail(ex)

    def tearDown(self):
        # Clean up will only be performed on the last run.
        if 'last_case' in TestInputSingleton.input.test_params:
            TestInputSingleton.input.test_params['skip_cleanup'] = False
            super(ViewMergingTests, self).tearDown()
        else:
            self.cluster.shutdown(force=True)
            # NOTE(review): passing an explicit `self` on top of the bound
            # receiver looks odd -- confirm BaseTestCase._log_finish really
            # expects an argument before changing it.
            self._log_finish(self)

    def test_empty_vbuckets(self):
        """A map view over an empty bucket returns zero rows."""
        results = self.merged_query(self.map_view_name)
        self.assertEquals(results.get(u'total_rows', None), 0)
        self.assertEquals(len(results.get(u'rows', None)), 0)

    def test_nonexisting_views(self):
        """Querying undefined views must raise QueryViewException."""
        view_names = ['mapview2', 'mapview3', 'mapview4', 'mapview5']
        for view_name in view_names:
            try:
                self.merged_query(view_name)
            except QueryViewException:
                self.log.info("QueryViewException is raised as expected")
            except Exception:
                # BUGFIX: unittest.TestCase has no assertFail(); the old code
                # raised AttributeError instead of failing with this message.
                self.fail("QueryViewException is expected, but not raised")
            else:
                self.fail("QueryViewException is expected, but not raised")

    def test_non_empty_view(self):
        """After loading num_docs documents the map view returns them all."""
        num_vbuckets = len(self.rest.get_vbuckets(self.bucket))
        docs = ViewMergingTests.make_docs(1, self.num_docs + 1)
        self.populate_sequenced(num_vbuckets, docs)
        results = self.merged_query(self.map_view_name)
        self.assertEquals(results.get(u'total_rows', 0), self.num_docs)
        self.assertEquals(len(results.get(u'rows', [])), self.num_docs)

    def test_queries(self):
        """Exercise the map view with whatever query params the run supplies."""
        all_query_params = ['skip', 'limit', 'startkey', 'endkey', 'startkey_docid',
                        'endkey_docid', 'inclusive_end', 'descending', 'key']
        current_params = {}
        for key in self.input.test_params:
            if key in all_query_params:
                current_params[key] = str(self.input.test_params[key])
        results = self.merged_query(self.map_view_name, current_params)
        self.verify_results(results, current_params)

    def test_keys(self):
        """The ?keys=[...] parameter returns exactly the requested keys."""
        keys = [5, 3, 10, 39, 666666, 21]
        results = self.merged_query(self.map_view_name,
                                    {"keys": keys})
        self.verify_results(results, {"keys": keys})

    def test_include_docs(self):
        """include_docs=true attaches the full document to every row."""
        results = self.merged_query(self.map_view_name,
                                    {"include_docs": "true"})
        self.verify_results(results, {"include_docs": "true"})
        num = 1
        for row in results.get(u'rows', []):
            self.assertEquals(row['doc']['json']['integer'], num)
            self.assertEquals(row['doc']['json']['string'], str(num))
            self.assertEquals(row['doc']['meta']['id'], str(num))
            num += 1

    def test_queries_reduce(self):
        """Exercise the reduce view with the query params the run supplies."""
        all_query_params = ['skip', 'limit', 'startkey', 'endkey', 'startkey_docid',
                        'endkey_docid', 'inclusive_end', 'descending', 'reduce',
                        'group', 'group_level']
        current_params = {}
        for key in self.input.test_params:
            if key in all_query_params:
                current_params[key] = str(self.input.test_params[key])
        results = self.merged_query(self.red_view_name, current_params)
        self.verify_results_reduce(results, current_params)

    def test_stale_ok_alternated_docs(self):
        """stale=ok must serve the old index, ignoring freshly added docs."""
        num_vbuckets = len(self.rest.get_vbuckets(self.bucket))
        docs = ViewMergingTests.make_docs(self.num_docs + 1, self.num_docs + 3)
        self.populate_alternated(num_vbuckets, docs)
        results = self.merged_query(self.map_view_name, {'stale': 'ok'})
        self.assertEquals(results.get(u'total_rows', None), self.num_docs)
        self.assertEquals(len(results.get(u'rows', None)), self.num_docs)
        self.verify_keys_are_sorted(results)

    def test_stale_update_after_alternated_docs(self):
        """stale=update_after serves stale data, then refreshes the index."""
        results = self.merged_query(self.map_view_name, {'stale': 'update_after'})
        self.assertEquals(results.get(u'total_rows', None), self.num_docs)
        self.assertEquals(len(results.get(u'rows', None)), self.num_docs)
        # Give the background index update kicked off above time to finish.
        time.sleep(1)
        results = self.merged_query(self.map_view_name, {'stale': 'ok'})
        self.assertEquals(results.get(u'total_rows', None), self.num_docs + 2)
        self.assertEquals(len(results.get(u'rows', None)), self.num_docs + 2)
        self.verify_keys_are_sorted(results)

    def test_stats_error(self):
        """_stats over non-numeric values errors out (or reports per-node errors)."""
        nodes = self.input.param("num_nodes", 0)
        params = {'stale': 'false', 'on_error': 'stop'}
        if nodes == 1:
            try:
                self.merged_query(self.red_view_stats_name, params=params, ddoc='test2')
                self.assertTrue(False, "Expected exception when querying _stats view")
            except QueryViewException as ex:
                # Couchbase version < 3.0
                # BUGFIX: the substring check must run against the exception
                # message; `"..." in ex` tested membership in ex.args and
                # never matched a substring of the message.
                if "Builtin _stats" in str(ex):
                    expectedStr = 'Error occured querying view ' + self.red_view_stats_name + \
                        ': {"error":"error","reason":"Builtin _stats function requires map' + \
                        ' values to be numbers"}'
                else:
                    expectedStr = 'Error occured querying view ' + self.red_view_stats_name + \
                            ': {"error":"error","reason":"Reducer: Error building index for view `' + \
                            self.red_view_stats_name + '`, reason: Value is not a number (key \\"1\\")"}'

                self.assertEquals(str(ex).strip("\n"), expectedStr)
        else:
            self.assertTrue(nodes > 1)
            results = self.merged_query(self.red_view_stats_name, params=params, ddoc='test2')
            self.assertEquals(len(results.get(u'rows', None)), 0)
            self.assertEquals(len(results.get(u'errors', None)), 1)

    def test_dev_view(self):
        # A lot of additional documents are needed in order to trigger the
        # dev views. If the number is too low, production views will be used
        docs = ViewMergingTests.make_docs(0, self.num_docs)
        num_vbuckets = len(self.rest.get_vbuckets(self.bucket))
        self.populate_alternated(num_vbuckets, docs)
        results = self.merged_query(self.map_view_name, {})
        self.verify_results_dev(results)

    def test_view_startkey_endkey_validation(self):
        """Regression test for MB-6591

        This tests makes sure that the validation of startkey/endkey works
        with view, which uses Unicode collation. Return results
        when startkey is smaller than endkey. When endkey is smaller than
        the startkey and exception should be raised. With Unicode collation
        "foo" < "Foo", with raw collation "Foo" < "foo".
        """
        startkey = '"foo"'
        endkey = '"Foo"'
        params = {"startkey": startkey, "endkey": endkey}
        results = self.merged_query(self.map_view_name, params)
        self.assertTrue('rows' in results, "Results were returned")

        # Flip startkey and endkey
        params = {"startkey": endkey, "endkey": startkey}
        self.assertRaises(Exception, self.merged_query, self.map_view_name,
                          params)

    def calculate_matching_keys(self, params):
        """Compute the keys a query with `params` is expected to return.

        Mirrors the view engine's handling of skip/limit/startkey/endkey/
        key/keys/descending/inclusive_end on the sequential key space
        1..num_docs, and returns the expected key list in result order.
        """
        keys = range(1, self.num_docs + 1)
        if 'descending' in params:
            keys.reverse()
        if 'startkey' in params:
            # NOTE: eval() is applied to test-supplied params only (trusted
            # input); it parses JSON-array startkeys like "[1, \"a\"]".
            if params['startkey'].find('[') > -1:
                key = eval(params['startkey'])[0]
            else:
                key = int(params['startkey'])
            keys = keys[keys.index(key):]
            if 'startkey_docid' in params:
                if params['startkey_docid'] > params['startkey']:
                    keys = keys[1:]
        if 'endkey' in params:
            if params['endkey'].find('[') > -1:
                key = eval(params['endkey'])[0]
            else:
                key = int(params['endkey'])
            keys = keys[:keys.index(key) + 1]
            if 'endkey_docid' in params:
                if params['endkey_docid'] < params['endkey']:
                    keys = keys[:-1]
        if 'inclusive_end' in params and params['inclusive_end'] == 'false':
            keys = keys[:-1]
        if 'skip' in params:
            keys = keys[(int(params['skip'])):]
        if 'limit' in params:
            keys = keys[:(int(params['limit']))]
        if 'key' in params:
            if int(params['key']) <= self.num_docs:
                keys = [int(params['key']), ]
            else:
                keys = []
        if 'keys' in params:
            keys = []
            for k in params['keys']:
                if int(k) <= self.num_docs:
                    keys.append(int(k))
            # When ?keys=[...] parameter is sent, rows are not guaranteed to come
            # sorted by key.
            keys.sort()
        return keys

    def verify_results(self, results, params):
        """Check a map-view result set against calculate_matching_keys()."""
        expected = self.calculate_matching_keys(params)
        self.assertEquals(results.get(u'total_rows', 0), self.num_docs,
                          "total_rows parameter is wrong, expected %d, actual %d"
                          % (self.num_docs, results.get(u'total_rows', 0)))
        self.assertEquals(len(results.get(u'rows', [])), len(expected),
                          "Rows number is wrong, expected %d, actual %d"
                          % (len(expected), len(results.get(u'rows', []))))
        if expected:
            if 'keys' not in params:
                # first
                self.assertEquals(results.get(u'rows', [])[0]['key'], expected[0],
                                  "First row key is wrong, expected %d, actual %d"
                                  % (expected[0], results.get(u'rows', [])[0]['key']))
                # last
                self.assertEquals(results.get(u'rows', [])[-1]['key'], expected[-1],
                                  "Last row key is wrong, expected %d, actual %d"
                                  % (expected[-1], results.get(u'rows', [])[-1]['key']))
                desc = 'descending' in params and params['descending'] == 'true'
                self.verify_keys_are_sorted(results, desc=desc)
            else:
                # ?keys=[...] results are not sort-guaranteed; compare sorted.
                actual = sorted([row[u'key'] for row in results.get(u'rows', [])])
                self.assertEquals(actual, expected,
                                  "Results are wrong, expected %s, actual %s"
                                  % (expected, results.get(u'rows', [])))

    def verify_results_dev(self, results):
        # A development view is always a subset of the production view,
        # hence only check for that (and not the exact items)
        expected = self.calculate_matching_keys({})
        self.assertTrue(len(results.get(u'rows', [])) < len(expected) and
                        len(results.get(u'rows', [])) > 0,
                          ("Rows number is wrong, expected to be lower than "
                           "%d and >0, but it was %d"
                          % (len(expected), len(results.get(u'rows', [])))))
        self.assertTrue(
            results.get(u'rows', [])[0]['key'] != expected[0] or
            results.get(u'rows', [])[-1]['key'] != expected[-1],
            "Dev view should be a subset, but returned the same as "
            "the production view")
        self.verify_keys_are_sorted(results)

    def verify_results_reduce(self, results, params):
        """Check a reduce-view result set against calculate_matching_keys()."""
        # Parentheses added for readability only; precedence ("and" binds
        # tighter than "or") already grouped the condition exactly this way.
        if ('reduce' in params and params['reduce'] == 'false') or \
          'group_level' in params or ('group' in params and params['group'] == 'true'):
            expected = self.calculate_matching_keys(params)
            self.assertEquals(len(results.get(u'rows', [])), len(expected),
                          "Rows number is wrong, expected %d, actual %d"
                          % (len(expected), len(results.get(u'rows', []))))
            if expected:
                # first
                self.assertEquals(results.get(u'rows', [])[0]['key'][0], expected[0],
                                  "First element is wrong, expected %d, actual %d"
                                  % (expected[0], results.get(u'rows', [])[0]['key'][0]))
                # last
                self.assertEquals(results.get(u'rows', [])[-1]['key'][0], expected[-1],
                                  "Last element is wrong, expected %d, actual %d"
                                  % (expected[-1], results.get(u'rows', [])[-1]['key'][0]))
        else:
            # Fully-reduced query: a single _count row equal to the number
            # of matching keys.
            expected = self.calculate_matching_keys(params)
            self.assertEquals(results.get(u'rows', [])[0][u'value'], len(expected),
                              "Value for reduce is incorrect. Expected %s, actual %s"
                              % (len(expected), results.get(u'rows', [])[0][u'value']))

    def create_ddocs(self, is_dev_view):
        """Create the map, _count and _stats views used by the suite."""
        mapview = View(self.map_view_name, '''function(doc) {
             emit(doc.integer, doc.string);
          }''', dev_view=is_dev_view)
        self.cluster.create_view(self.master, 'test', mapview)
        redview = View(self.red_view_name, '''function(doc) {
             emit([doc.integer, doc.string], doc.integer);
          }''', '''_count''', dev_view=is_dev_view)
        self.cluster.create_view(self.master, 'test', redview)
        redview_stats = View(self.red_view_stats_name, '''function(doc) {
             emit(doc.string, doc.string);
          }''', '''_stats''', dev_view=is_dev_view)
        self.cluster.create_view(self.master, 'test2', redview_stats)
        RebalanceHelper.wait_for_persistence(self.master, self.bucket, 0)

    def init_clients(self):
        """Initialise clients for all servers there are vBuckets on

        It returns a dict with 'ip:port' as key (this information is also
        stored this way in every vBucket in the `master` property) and
        the MemcachedClient as the value
        """
        clients = {}

        for vbucket in self.bucket.vbuckets:
            if vbucket.master not in clients:
                ip, port = vbucket.master.split(':')
                clients[vbucket.master] = MemcachedClient(ip, int(port))
        return clients

    def populate_alternated(self, num_vbuckets, docs):
        """Every vBucket gets a doc first

        Populating the vBuckets alternated means that every vBucket gets
        a document first, before it receives the second one and so on.

        For example if we have 6 documents named doc-1 ... doc-6 and 3
        vBuckets the result will be:

            vbucket-1: doc-1, doc-4
            vbucket-2: doc-2, doc-5
            vbucket-3: doc-3, doc-6
        """
        for i, doc in enumerate(docs):
            self.insert_into_vbucket(i % num_vbuckets, doc)
        RebalanceHelper.wait_for_persistence(self.master, self.bucket, 0)

    def populate_sequenced(self, num_vbuckets, docs):
        """vBuckets get filled up one by one

        Populating the vBuckets sequenced means that the vBucket gets
        a certain number of documents, before the next one gets some.

        For example if we have 6 documents named doc-1 ... doc-6 and 3
        vBuckets the result will be:

            vbucket-1: doc-1, doc-2
            vbucket-2: doc-3, doc-4
            vbucket-3: doc-5, doc-6
        """
        # Explicit floor division: identical for Python 2 ints, and keeps
        # the intent obvious (num_docs is a multiple of num_vbuckets).
        docs_per_vbucket = len(docs) // num_vbuckets

        for vbucket in range(num_vbuckets):
            start = vbucket * docs_per_vbucket
            end = start + docs_per_vbucket
            for doc in docs[start:end]:
                    self.insert_into_vbucket(vbucket, doc)
        RebalanceHelper.wait_for_persistence(self.master, self.bucket, 0)

    def insert_into_vbucket(self, vbucket_id, doc):
        """Insert a document into a certain vBucket

        The memcached clients must already been initialised in the
        self.clients property.
        """
        vbucket = self.bucket.vbuckets[vbucket_id]
        client = self.clients[vbucket.master]

        client.set(doc['json']['key'], 0, 0, json.dumps(doc['json']['body']).encode("ascii", "ignore"), vbucket_id)

    @staticmethod
    def make_docs(start, end):
        """Create documents

        `key` will be used as a key and won't end up in the final
        document body.
        `body` will be used as the document body
        """
        docs = []
        for i in range(start, end):
            doc = {
                'key': str(i),
                'body': { 'integer': i, 'string': str(i)}}

            docs.append({"meta":{"id": str(i)}, "json": doc })
        return docs

    def merged_query(self, view_name, params=None, ddoc='test'):
        """Query `view_name` through the REST API and return the merged result.

        BUGFIX: the default used to be `params={}`, a mutable default that
        this method then mutated (`params['stale'] = 'false'`), leaking the
        'stale' key into every subsequent defaulted call. `None` sentinel
        preserves the behaviour without the shared-state bug.
        """
        bucket = self.default_bucket_name
        if params is None:
            params = {}
        if 'stale' not in params:
            params['stale'] = 'false'
        # Dev design docs are addressed with a "dev_" prefix.
        ddoc = ("", "dev_")[self.is_dev_view] + ddoc
        return self.rest.query_view(ddoc, view_name, bucket, params)

    def verify_keys_are_sorted(self, results, desc=False):
        """Assert the result rows come ordered by key (desc=True for reversed)."""
        current_keys = [row['key'] for row in results['rows']]
        self.assertTrue(ViewMergingTests._verify_list_is_sorted(current_keys, desc=desc), 'keys are not sorted')
        self.log.info('rows are sorted by key')

    @staticmethod
    def _verify_list_is_sorted(keys, key=lambda x: x, desc=False):
        """Return True if `keys` is sorted (non-strictly) under `key`/`desc`."""
        if desc:
            return all([key(keys[i]) >= key(keys[i + 1]) for i in xrange(len(keys) - 1)])
        else:
            return all([key(keys[i]) <= key(keys[i + 1]) for i in xrange(len(keys) - 1)])