1import os
2import json
3import uuid
4import copy
5import pprint
6import re
7import logging
8import testconstants
9import time
10import traceback
11import collections
12from subprocess import Popen, PIPE
13from remote.remote_util import RemoteMachineShellConnection
14from couchbase_helper.tuq_generators import JsonGenerator
15from basetestcase import BaseTestCase
16from membase.api.exception import CBQError, ReadDocumentException
17from couchbase_helper.documentgenerator import DocumentGenerator
18from membase.api.rest_client import RestConnection
19from security.rbac_base import RbacBase
20# from sdk_client import SDKClient
21from couchbase_helper.tuq_generators import TuqGenerators
22#from xdcr.upgradeXDCR import UpgradeTests
23from couchbase_helper.documentgenerator import JSONNonDocGenerator
24from couchbase.cluster import Cluster
25from couchbase.cluster import PasswordAuthenticator
26import couchbase.subdocument as SD
27from couchbase.n1ql import N1QLQuery, STATEMENT_PLUS,CONSISTENCY_REQUEST, MutationState
28import ast
29
30
31JOIN_INNER = "INNER"
32JOIN_LEFT = "LEFT"
33JOIN_RIGHT = "RIGHT"
34
35
36class QueryTests(BaseTestCase):
    def setUp(self):
        """Per-test setup for N1QL query tests.

        Reads a large set of test-framework parameters (self.input.param),
        opens a shell to the query client or master node, loads/reloads
        document generators, optionally creates a primary index and sets up
        the analytics (CBAS) service.
        """
        # Upgrade / recovery suites need the base class's full bucket
        # handling; every other test skips it for speed.
        if not self._testMethodName == 'suite_setUp' \
                and str(self.__class__).find('upgrade_n1qlrbac') == -1 \
                and str(self.__class__).find('n1ql_upgrade') == -1 \
                and str(self.__class__).find('AggregatePushdownRecoveryClass') == -1:
            self.skip_buckets_handle = True
        else:
            self.skip_buckets_handle = False
        super(QueryTests, self).setUp()
        # force_clean re-runs base setUp with bucket handling enabled
        if self.input.param("force_clean", False):
            self.skip_buckets_handle = False
            super(QueryTests, self).setUp()
        self.log.info("==============  QueryTests setup has started ==============")
        self.version = self.input.param("cbq_version", "sherlock")
        self.flat_json = self.input.param("flat_json", False)
        self.directory_flat_json = self.input.param("directory_flat_json", "/tmp/")
        # run cbq against a dedicated client machine when one is configured,
        # otherwise against the master node
        if self.input.tuq_client and "client" in self.input.tuq_client:
            self.shell = RemoteMachineShellConnection(self.input.tuq_client["client"])
        else:
            self.shell = RemoteMachineShellConnection(self.master)
        # pre-sherlock builds need the cbq engine started by hand
        if self.input.param("start_cmd", True) and self.input.param("cbq_version", "sherlock") != 'sherlock':
            self._start_command_line_query(self.master, user=self.master.rest_username, password=self.master.rest_password)
        self.use_rest = self.input.param("use_rest", True)
        self.hint_index = self.input.param("hint", None)
        self.max_verify = self.input.param("max_verify", None)
        self.buckets = RestConnection(self.master).get_buckets()
        self.docs_per_day = self.input.param("doc-per-day", 49)
        self.item_flag = self.input.param("item_flag", 4042322160)
        self.ipv6 = self.input.param("ipv6", False)
        self.n1ql_port = self.input.param("n1ql_port", 8093)
        self.analytics = self.input.param("analytics", False)
        self.dataset = getattr(self, 'dataset',  self.input.param("dataset", "default"))
        self.primary_indx_type = self.input.param("primary_indx_type", 'GSI')
        self.index_type = self.input.param("index_type", 'GSI')
        self.skip_primary_index = self.input.param("skip_primary_index", False)
        self.primary_indx_drop = self.input.param("primary_indx_drop", False)
        self.monitoring = self.input.param("monitoring", False)
        self.value_size = self.input.param("value_size", 0)
        self.isprepared = False
        self.named_prepare = self.input.param("named_prepare", None)
        self.encoded_prepare = self.input.param("encoded_prepare", False)
        self.scan_consistency = self.input.param("scan_consistency", 'REQUEST_PLUS')
        # detect the server OS to pick the right couchbase bin path below
        # (NOTE: 'type' shadows the builtin; kept as-is for byte compatibility)
        shell = RemoteMachineShellConnection(self.master)
        type = shell.extract_remote_info().distribution_type
        shell.disconnect()
        self.path = testconstants.LINUX_COUCHBASE_BIN_PATH
        self.array_indexing = self.input.param("array_indexing", False)
        self.gens_load = self.gen_docs(self.docs_per_day)
        self.skip_load = self.input.param("skip_load", False)
        self.skip_index = self.input.param("skip_index", False)
        self.plasma_dgm = self.input.param("plasma_dgm", False)
        self.DGM = self.input.param("DGM", False)
        self.covering_index = self.input.param("covering_index", False)
        self.cluster_ops = self.input.param("cluster_ops",False)
        self.server = self.master
        self.rest = RestConnection(self.server)
        self.username = self.rest.username
        self.password = self.rest.password
        self.cover = self.input.param("cover", False)
        self.curl_path = "curl"
        self.n1ql_certs_path = "/opt/couchbase/var/lib/couchbase/n1qlcerts"
        if type.lower() == 'windows':
            self.path = testconstants.WIN_COUCHBASE_BIN_PATH
            self.curl_path = "%scurl" % self.path
            self.n1ql_certs_path = "/cygdrive/c/Program\ Files/Couchbase/server/var/lib/couchbase/n1qlcerts"
        elif type.lower() == "mac":
            self.path = testconstants.MAC_COUCHBASE_BIN_PATH
        if self.primary_indx_type.lower() == "gsi":
            self.gsi_type = self.input.param("gsi_type", 'plasma')
        # reload_data: flush every bucket and re-load fresh generators;
        # the cbas node is temporarily rebalanced out so flush can proceed
        if self.input.param("reload_data", False):
            if self.analytics:
                self.cluster.rebalance([self.master, self.cbas_node], [], [self.cbas_node], services=['cbas'])
            for bucket in self.buckets:
                self.cluster.bucket_flush(self.master, bucket=bucket, timeout=180000)
            self.gens_load = self.gen_docs(self.docs_per_day)
            self.load(self.gens_load, batch_size=1000, flag=self.item_flag)
            if self.analytics:
                self.cluster.rebalance([self.master, self.cbas_node], [self.cbas_node], [], services=['cbas'])
        if not (hasattr(self, 'skip_generation') and self.skip_generation):
            self.full_list = self.generate_full_docs_list(self.gens_load)
        if self.input.param("gomaxprocs", None):
            self.configure_gomaxprocs()
        self.gen_results = TuqGenerators(self.log, self.generate_full_docs_list(self.gens_load))
        if str(self.__class__).find('QueriesUpgradeTests') == -1 and self.primary_index_created == False:
            if self.analytics == False:
                self.create_primary_index_for_3_0_and_greater()
        self.log.info('-'*100)
        self.log.info('Temp fix for MB-16888')
        # restart cbq-engine/indexer to work around MB-16888, unless this
        # run is exercising cluster operations
        if self.cluster_ops == False:
           self.shell.execute_command("killall -9 cbq-engine")
           self.shell.execute_command("killall -9 indexer")
           self.sleep(30, 'wait for indexer')
        self.log.info('-'*100)
        if self.analytics:
            self.setup_analytics()
            self.sleep(30, 'wait for analytics setup')
        if self.monitoring:
            self.run_cbq_query('delete from system:prepareds')
            self.run_cbq_query('delete from system:completed_requests')
        self.log.info("==============  QueryTests setup has completed ==============")
137
138    def suite_setUp(self):
139        self.log.info("==============  QueryTests suite_setup has started ==============")
140        try:
141            os = self.shell.extract_remote_info().type.lower()
142            if os != 'windows':
143                self.sleep(10, 'sleep before load')
144            if not self.skip_load:
145                if self.flat_json:
146                    self.load_directory(self.gens_load)
147                else:
148                    self.load(self.gens_load, batch_size=1000, flag=self.item_flag)
149            if not self.input.param("skip_build_tuq", True):
150                self._build_tuq(self.master)
151            self.skip_buckets_handle = True
152            if self.analytics:
153                self.cluster.rebalance([self.master, self.cbas_node], [self.cbas_node], [], services=['cbas'])
154                self.setup_analytics()
155                self.sleep(30, 'wait for analytics setup')
156        except Exception, ex:
157            self.log.error('SUITE SETUP FAILED')
158            self.log.info(ex)
159            traceback.print_exc()
160            self.tearDown()
161        self.log.info("==============  QueryTests suite_setup has completed ==============")
162
163    def tearDown(self):
164        self.log.info("==============  QueryTests tearDown has started ==============")
165        if self._testMethodName == 'suite_tearDown':
166            self.skip_buckets_handle = False
167        if self.analytics == True:
168            bucket_username = "cbadminbucket"
169            bucket_password = "password"
170            data = 'use Default ;'
171            for bucket in self.buckets:
172                data += 'disconnect bucket {0} if connected;'.format(bucket.name)
173                data += 'drop dataset {0} if exists;'.format(bucket.name+ "_shadow")
174                data += 'drop bucket {0} if exists;'.format(bucket.name)
175            self.write_file("file.txt", data)
176            url = 'http://{0}:8095/analytics/service'.format(self.cbas_node.ip)
177            cmd = 'curl -s --data pretty=true --data-urlencode "statement@file.txt" ' + url + " -u " + bucket_username + ":" + bucket_password
178            os.system(cmd)
179            os.remove(filename)
180        self.log.info("==============  QueryTests tearDown has completed ==============")
181        super(QueryTests, self).tearDown()
182
183    def suite_tearDown(self):
184        self.log.info("==============  QueryTests suite_tearDown has started ==============")
185        if not self.input.param("skip_build_tuq", True):
186            if hasattr(self, 'shell'):
187                self._kill_all_processes_cbq()
188        self.log.info("==============  QueryTests suite_tearDown has completed ==============")
189
190##############################################################################################
191#
192#  Setup Helpers
193##############################################################################################
194
195    def log_config_info(self):
196        try:
197            current_indexes = []
198            query_response = self.run_cbq_query("SELECT * FROM system:indexes")
199            current_indexes = [(i['indexes']['name'],
200                                i['indexes']['keyspace_id'],
201                                frozenset([key.replace('`', '').replace('(', '').replace(')', '')
202                                           for key in i['indexes']['index_key']]),
203                                i['indexes']['state'],
204                                i['indexes']['using']) for i in query_response['results']]
205            # get all buckets
206            query_response = self.run_cbq_query("SELECT * FROM system:keyspaces")
207            buckets = [i['keyspaces']['name'] for i in query_response['results']]
208            self.log.info("==============  System Config: ==============\n")
209            for bucket in buckets:
210                query_response = self.run_cbq_query("SELECT COUNT(*) FROM `" + bucket + "`")
211                docs = query_response['results'][0]['$1']
212                bucket_indexes = []
213                for index in current_indexes:
214                    if index[1] == bucket:
215                        bucket_indexes.append(index[0])
216                self.log.info("Bucket: " + bucket)
217                self.log.info("Indexes: " + str(bucket_indexes))
218                self.log.info("Docs: " + str(docs) + "\n")
219            self.log.info("=============================================")
220        except Exception as e:
221            pass
222
223    def fail_if_no_buckets(self):
224        buckets = False
225        for a_bucket in self.buckets:
226            buckets = True
227        if not buckets:
228            self.fail('FAIL: This test requires buckets')
229
230    def write_file(self, filename, data):
231        f = open(filename,'w')
232        f.write(data)
233        f.close()
234
235    def setup_analytics(self):
236        data = 'use Default;'
237        self.log.info("No. of buckets : %s", len(self.buckets))
238        bucket_username = "cbadminbucket"
239        bucket_password = "password"
240        for bucket in self.buckets:
241            data += 'create bucket {0} with {{"bucket":"{0}","nodes":"{1}"}} ;'.format(bucket.name,self.cbas_node.ip)
242            data += 'create shadow dataset {1} on {0}; '.format(bucket.name,bucket.name+"_shadow")
243            data += 'connect bucket {0} with {{"username":"{1}","password":"{2}"}};'.format(bucket.name, bucket_username, bucket_password)
244        self.write_file("file.txt", data)
245        url = 'http://{0}:8095/analytics/service'.format(self.cbas_node.ip)
246        cmd = 'curl -s --data pretty=true --data-urlencode "statement@file.txt" ' + url + " -u " + bucket_username + ":" + bucket_password
247        os.system(cmd)
248        os.remove(filename)
249
250    def get_index_storage_stats(self, timeout=120, index_map=None):
251        api = self.index_baseUrl + 'stats/storage'
252        status, content, header = self._http_request(api, timeout=timeout)
253        if not status:
254            raise Exception(content)
255        json_parsed = json.loads(content)
256        index_storage_stats = {}
257        for index_stats in json_parsed:
258            bucket = index_stats["Index"].split(":")[0]
259            index_name = index_stats["Index"].split(":")[1]
260            if not bucket in index_storage_stats.keys():
261                index_storage_stats[bucket] = {}
262            index_storage_stats[bucket][index_name] = index_stats["Stats"]
263        return index_storage_stats
264
    def get_dgm_for_plasma(self, indexer_nodes=None, memory_quota=256):
        """
        Internal Method to create OOM scenario

        Shrinks the indexer memory quota and repeatedly mutates documents
        until every plasma index's MainStore resident_ratio drops below 1.0
        (i.e. the indexes are in DGM / partially on disk).

        :param indexer_nodes: nodes to check; defaults to all index-service
            nodes discovered from the services map
        :param memory_quota: indexer quota (MB) to force the OOM condition
        :return: True once DGM is reached, False after 100 attempts, or None
            when gsi_type is not plasma / plasma_dgm is not requested
        """
        def validate_disk_writes(indexer_nodes=None):
            # DGM is reached only when NO index still has a resident ratio
            # of 1.0 (fully in memory)
            if not indexer_nodes:
                indexer_nodes = self.get_nodes_from_services_map(service_type="index", get_all_nodes=True)
            for node in indexer_nodes:
                indexer_rest = RestConnection(node)
                content = self.get_index_storage_stats()
                for index in content.values():
                    for stats in index.values():
                        if stats["MainStore"]["resident_ratio"] >= 1.00:
                            return False
            return True

        def kv_mutations(self, docs=1):
            # push more documents into the buckets to grow the indexes
            if not docs:
                docs = self.docs_per_day
            gens_load = self.gen_docs(docs)
            self.full_docs_list = self.generate_full_docs_list(gens_load)
            self.gen_results = TuqGenerators(self.log, self.full_docs_list)
            self.load(gens_load, buckets=self.buckets, flag=self.item_flag,
                  verify_data=False, batch_size=1000)
        # only meaningful for plasma-backed GSI and when DGM was requested
        if self.gsi_type != "plasma":
            return
        if not self.plasma_dgm:
            return
        self.log.info("Trying to get all indexes in DGM...")
        self.log.info("Setting indexer memory quota to {0} MB...".format(memory_quota))
        node = self.get_nodes_from_services_map(service_type="index")
        rest = RestConnection(node)
        rest.set_service_memoryQuota(service='indexMemoryQuota', memoryQuota=memory_quota)
        cnt = 0
        docs = 50 + self.docs_per_day
        # mutate, wait, and re-check up to 100 times with a growing doc count
        while cnt < 100:
            if validate_disk_writes(indexer_nodes):
                self.log.info("========== DGM is achieved ==========")
                return True
            kv_mutations(self, docs)
            self.sleep(30)
            cnt += 1
            docs += 20
        return False
310
311    #This method is not being used
312    def print_list_of_dicts(self, list_to_print, num_elements=10):
313        print('\n\n')
314        print('Printing a list...')
315        for item in list_to_print:
316            self.print_dict(item)
317            num_elements = num_elements-1
318            if num_elements == 0:
319                break
320
321    #This method is only used by the function right above it, which is not being used
322    def print_dict(self, dict_to_print):
323        for k, v in dict_to_print.iteritems():
324            print(k, v)
325        print('\n')
326
327        def get_user_list(self):
328            """
329            :return:  a list of {'id': 'userid', 'name': 'some_name ,
330            'password': 'passw0rd'}
331            """
332        user_list = []
333        for user in self.inp_users:
334            user_list.append({att: user[att] for att in ('id',
335                                                         'name',
336                                                         'password')})
337        return user_list
338
339    def get_user_role_list(self):
340        """
341        :return:  a list of {'id': 'userid', 'name': 'some_name ,
342         'roles': 'admin:fts_admin[default]'}
343        """
344        user_role_list = []
345        for user in self.inp_users:
346            user_role_list.append({att: user[att] for att in ('id',
347                                                              'name',
348                                                              'roles')})
349        return user_role_list
350
351    def create_users(self, users=None):
352        """
353        :param user: takes a list of {'id': 'xxx', 'name': 'some_name ,
354                                        'password': 'passw0rd'}
355        :return: Nothing
356        """
357        if not users:
358            users = self.users
359        RbacBase().create_user_source(users,'builtin',self.master)
360        self.log.info("SUCCESS: User(s) %s created"
361                      % ','.join([user['name'] for user in users]))
362
363    def assign_role(self, rest=None, roles=None):
364        if not rest:
365            rest = RestConnection(self.master)
366        #Assign roles to users
367        if not roles:
368            roles = self.roles
369        RbacBase().add_user_role(roles, rest,'builtin')
370        for user_role in roles:
371            self.log.info("SUCCESS: Role(s) %s assigned to %s"
372                          %(user_role['roles'], user_role['id']))
373
374##############################################################################################
375#
376#   Query Runner
377##############################################################################################
378    def query_runner(self, test_dict):
379        test_results = dict()
380        restore_indexes = self.get_parsed_indexes()
381        res_dict = dict()
382        res_dict['errors'] = []
383        for test_name in sorted(test_dict.keys()):
384            try:
385                index_list = test_dict[test_name]['indexes']
386                pre_queries = test_dict[test_name]['pre_queries']
387                queries = test_dict[test_name]['queries']
388                post_queries = test_dict[test_name]['post_queries']
389                asserts = test_dict[test_name]['asserts']
390                cleanups = test_dict[test_name]['cleanups']
391
392                # INDEX STAGE
393                current_indexes = self.get_parsed_indexes()
394                desired_indexes = self.parse_desired_indexes(index_list)
395                desired_index_set = self.make_hashable_index_set(desired_indexes)
396                current_index_set = self.make_hashable_index_set(current_indexes)
397
398                # drop all undesired indexes
399                self.drop_undesired_indexes(desired_index_set, current_index_set, current_indexes)
400
401                # create desired indexes
402                current_indexes = self.get_parsed_indexes()
403                current_index_set = self.make_hashable_index_set(current_indexes)
404                self.create_desired_indexes(desired_index_set, current_index_set, desired_indexes)
405
406                res_dict['pre_q_res'] = []
407                res_dict['q_res'] = []
408                res_dict['post_q_res'] = []
409                res_dict['errors'] = []
410                res_dict['cleanup_res'] = []
411
412                # PRE_QUERIES STAGE
413                self.log.info('Running Pre-query Stage')
414                for func in pre_queries:
415                    res = func(res_dict)
416                    res_dict['pre_q_res'].append(res)
417                # QUERIES STAGE
418                self.log.info('Running Query Stage')
419                for query in queries:
420                    res = self.run_cbq_query(query)
421                    res_dict['q_res'].append(res)
422                # POST_QUERIES STAGE
423                self.log.info('Running Post-query Stage')
424                for func in post_queries:
425                    res = func(res_dict)
426                    res_dict['post_q_res'].append(res)
427                # ASSERT STAGE
428                self.log.info('Running Assert Stage')
429                for func in asserts:
430                    res = func(res_dict)
431                    self.log.info('Pass: ' + test_name)
432                # CLEANUP STAGE
433                self.log.info('Running Cleanup Stage')
434                for func in cleanups:
435                    res = func(res_dict)
436                    res_dict['cleanup_res'].append(res)
437            except Exception as e:
438                self.log.info('Fail: ' + test_name)
439                res_dict['errors'].append((test_name, e, traceback.format_exc(), res_dict))
440
441            test_results[test_name] = res_dict
442
443        ## reset indexes
444        self.log.info('Queries completed, restoring previous indexes')
445        current_indexes = self.get_parsed_indexes()
446        restore_index_set = self.make_hashable_index_set(restore_indexes)
447        current_index_set = self.make_hashable_index_set(current_indexes)
448        self.drop_undesired_indexes(restore_index_set, current_index_set, current_indexes)
449        current_indexes = self.get_parsed_indexes()
450        current_index_set = self.make_hashable_index_set(current_indexes)
451        self.create_desired_indexes(restore_index_set, current_index_set, restore_indexes)
452
453        ## print errors
454        errors = [error for key in test_results.keys() for error in test_results[key]['errors']]
455        has_errors = False
456        if errors != []:
457            has_errors = True
458            error_string = '\n ************************ There are %s errors: ************************ \n \n' % (len(errors))
459            for error in errors:
460                error_string += '************************ Error in query: ' + str(error[0]) + ' ************************ \n'
461                error_string += str(error[2]) + '\n'
462            error_string += '************************ End of Errors ************************ \n'
463            self.log.error(error_string)
464
465        # trigger failure
466        self.assertEqual(has_errors, False)
467
468    def is_index_present(self, bucket_name, index_name, fields_set, using):
469        desired_index = (index_name, bucket_name,
470                         frozenset([field.split()[0].replace('`', '').replace('(', '').replace(')', '') for field in fields_set]),
471                         "online", using)
472        query_response = self.run_cbq_query("SELECT * FROM system:indexes")
473        current_indexes = [(i['indexes']['name'],
474                            i['indexes']['keyspace_id'],
475                            frozenset([key.replace('`', '').replace('(', '').replace(')', '')
476                                       for key in i['indexes']['index_key']]),
477                            i['indexes']['state'],
478                            i['indexes']['using']) for i in query_response['results']]
479        if desired_index in current_indexes:
480            return True
481        else:
482            return False
483
484    def wait_for_all_indexes_online(self):
485        cur_indexes = self.get_parsed_indexes()
486        for index in cur_indexes:
487            self._wait_for_index_online(index['bucket'], index['name'])
488
489    def wait_for_index_present(self, bucket_name, index_name, fields_set, using):
490        self.with_retry(lambda: self.is_index_present(bucket_name, index_name, fields_set, using), eval=True, delay=1, tries=30)
491
492    def wait_for_index_drop(self, bucket_name, index_name, fields_set, using):
493        self.with_retry(lambda: self.is_index_present(bucket_name, index_name, fields_set, using), eval=False, delay=1, tries=30)
494
495    def get_parsed_indexes(self):
496        query_response = self.run_cbq_query("SELECT * FROM system:indexes")
497        current_indexes = [{'name': i['indexes']['name'],
498                            'bucket': i['indexes']['keyspace_id'],
499                            'fields': frozenset([key.replace('`', '').replace('(', '').replace(')', '')
500                                                 for key in i['indexes']['index_key']]),
501                            'state': i['indexes']['state'],
502                            'using': i['indexes']['using'],
503                            'where': i['indexes'].get('condition', ''),
504                            'is_primary': i['indexes'].get('is_primary', False)} for i in query_response['results']]
505        return current_indexes
506
507    def parse_desired_indexes(self, index_list):
508        desired_indexes = [{'name': index['name'],
509                            'bucket': index['bucket'],
510                            'fields': frozenset([field.split()[0] for field in index['fields']]),
511                            'state': index['state'],
512                            'using': index['using'],
513                            'where': index.get('where', ''),
514                            'is_primary': index.get('is_primary', False)} for index in index_list]
515        return desired_indexes
516
517    def make_hashable_index_set(self, parsed_indexes):
518        return frozenset([frozenset(index_dict.items()) for index_dict in parsed_indexes])
519
520    def get_index_vars(self, index):
521        name = index['name']
522        keyspace = index['bucket']
523        fields = index['fields']
524        joined_fields = ', '.join(fields)
525        using = index['using']
526        is_primary = index['is_primary']
527        where = index['where']
528        return name, keyspace, fields, joined_fields, using, is_primary, where
529
530    def drop_undesired_indexes(self, desired_index_set, current_index_set, current_indexes):
531        if desired_index_set != current_index_set:
532            for current_index in current_indexes:
533                if frozenset(current_index.items()) not in desired_index_set:
534                    # drop index
535                    name, keyspace, fields, joined_fields, using, is_primary, where = self.get_index_vars(current_index)
536                    self.log.info("dropping index: %s %s %s" % (keyspace, name, using))
537                    if is_primary:
538                        self.run_cbq_query("DROP PRIMARY INDEX on %s USING %s" % (keyspace, using))
539                    else:
540                        self.run_cbq_query("DROP INDEX %s.%s USING %s" % (keyspace, name, using))
541                    self.wait_for_index_drop(keyspace, name, fields, using)
542
543    def create_desired_indexes(self, desired_index_set, current_index_set, desired_indexes):
544        if desired_index_set != current_index_set:
545            for desired_index in desired_indexes:
546                if frozenset(desired_index.items()) not in current_index_set:
547                    name, keyspace, fields, joined_fields, using, is_primary, where = self.get_index_vars(desired_index)
548                    self.log.info("creating index: %s %s %s" % (keyspace, name, using))
549                    if is_primary:
550                        self.run_cbq_query("CREATE PRIMARY INDEX ON `%s` USING %s" % (keyspace, using))
551                    else:
552                        if where != '':
553                            self.run_cbq_query("CREATE INDEX %s ON %s(%s) WHERE %s  USING %s" % (name, keyspace, joined_fields, where, using))
554                        else:
555                            self.run_cbq_query("CREATE INDEX %s ON %s(%s) USING %s" % (name, keyspace, joined_fields, using))
556                    self.wait_for_index_present(keyspace, name, fields, using)
557
558##############################################################################################
559#
560#   COMMON FUNCTIONS
561##############################################################################################
562    def ExplainPlanHelper(self, res):
563        try:
564            rv = res["results"][0]["plan"]
565        except:
566            rv = res["results"][0]
567        return rv
568
569    def PreparePlanHelper(self, res):
570        try:
571            rv = res["results"][0]["plan"]
572        except:
573            rv = res["results"][0]["operator"]
574        return rv
575
576    def gen_docs(self, docs_per_day=1, type='default', values_type=None, name='tuq', start=0, end=0):
577        json_generator = JsonGenerator()
578        generators = []
579        self.log.info('Generating %s:%s data...' % (type, self.dataset))
580        if type == 'default':
581            if self.array_indexing:
582                generators = json_generator.generate_docs_employee_array(docs_per_day, start)
583            elif self.dataset == 'default':
584                #not working
585                generators = json_generator.generate_docs_employee(docs_per_day, start)
586            elif self.dataset == 'sabre':
587                #works
588                generators = json_generator.generate_docs_sabre(docs_per_day, start)
589            elif self.dataset == 'employee':
590                #not working
591                generators = json_generator.generate_docs_employee_data(docs_per_day=docs_per_day, start=start)
592            elif self.dataset == 'simple':
593                #not working
594                generators = json_generator.generate_docs_employee_simple_data(docs_per_day=docs_per_day, start=start)
595            elif self.dataset == 'sales':
596                #not working
597                generators = json_generator.generate_docs_employee_sales_data(docs_per_day=docs_per_day, start=start)
598            elif self.dataset == 'bigdata':
599                #not working
600                generators = json_generator.generate_docs_bigdata(end=(1000*docs_per_day), start=start, value_size=self.value_size)
601            elif self.dataset == 'array':
602                generators = json_generator.generate_all_type_documents_for_gsi(docs_per_day=docs_per_day, start=start)
603            elif self.dataset == 'aggr':
604                generators = json_generator.generate_doc_for_aggregate_pushdown(docs_per_day=docs_per_day, start=start)
605            elif self.dataset == 'join':
606                types = ['Engineer', 'Sales', 'Support']
607                join_yr = [2010, 2011]
608                join_mo = xrange(1, 12 + 1)
609                join_day = xrange(1, 28 + 1)
610                template = '{{ "name":"{0}", "join_yr":{1}, "join_mo":{2}, "join_day":{3},'
611                template += ' "job_title":"{4}", "tasks_ids":{5}}}'
612                for info in types:
613                    for year in join_yr:
614                        for month in join_mo:
615                            for day in join_day:
616                                name = ["employee-%s" % (str(day))]
617                                tasks_ids = ["test_task-%s" % day, "test_task-%s" % (day + 1)]
618                                generators.append(DocumentGenerator("query-test-%s-%s-%s-%s" % (info, year, month, day),
619                                                                    template, name, [year], [month], [day], [info], [tasks_ids],
620                                                                    start=start, end=docs_per_day))
621            else:
622                self.fail("There is no dataset %s, please enter a valid one" % self.dataset)
623        elif type == 'base64':
624            if end == 0:
625                end = self.num_items
626            values = ["Engineer", "Sales", "Support"]
627            generators = [JSONNonDocGenerator(name, values, start=start, end=end)]
628
629        elif type == 'tasks':
630            start, end = 0, (28 + 1)
631            template = '{{ "task_name":"{0}", "project": "{1}"}}'
632            generators.append(DocumentGenerator("test_task", template, ["test_task-%s" % i for i in xrange(0,10)],
633                                                ["CB"], start=start, end=10))
634            generators.append(DocumentGenerator("test_task", template, ["test_task-%s" % i for i in xrange(10,20)],
635                                                ["MB"], start=10, end=20))
636            generators.append(DocumentGenerator("test_task", template, ["test_task-%s" % i for i in xrange(20,end)],
637                                                ["IT"], start=20, end=end))
638
639        elif type == 'json_non_docs':
640            if end==0:
641                end = self.num_items
642            if values_type == 'string':
643                values = ['Engineer', 'Sales', 'Support']
644            elif values_type == 'int':
645                values = [100, 200, 300, 400, 500]
646            elif values_type == 'array':
647                values = [[10, 20], [20, 30], [30, 40]]
648            else:
649                return []
650            generators = [JSONNonDocGenerator(name, values, start=start, end=end)]
651
652        elif type == 'nulls':
653            if not end:
654                end = self.num_items
655            generators = []
656            index = end/3
657            template = '{{ "feature_name":"{0}", "coverage_tests" : {{"P0":{1}, "P1":{2}, "P2":{3}}},'
658            template += '"story_point" : {4},"jira_tickets": {5}}}'
659            names = [str(i) for i in xrange(0, index)]
660            rates = xrange(0, index)
661            points = [[1, 2, 3], ]
662            jira_tickets = ['[{"Number": 1, "project": "cb", "description": "test"},' + \
663                            '{"Number": 2, "project": "mb", "description": "test"}]',]
664            generators.append(DocumentGenerator(name, template, names, rates, rates, rates, points, jira_tickets,
665                                                start=start, end=index))
666            template = '{{ "feature_name":"{0}", "coverage_tests" : {{"P0": null, "P1":null, "P2":null}},'
667            template += '"story_point" : [1,2,null],"jira_tickets": {1}}}'
668            jira_tickets = ['[{"Number": 1, "project": "cb", "description": "test"},' + \
669                            '{"Number": 2, "project": "mb", "description": null}]',]
670            names = [str(i) for i in xrange(index, index + index)]
671            generators.append(DocumentGenerator(name, template, names, jira_tickets, start=index, end=index + index))
672            template = '{{ "feature_name":"{0}", "coverage_tests" : {{"P4": 2}},'
673            template += '"story_point" : [null,null],"jira_tickets": {1}}}'
674            names = [str(i) for i in xrange(index + index, end)]
675            jira_tickets = ['[{"Number": 1, "project": "cb", "description": "test"},' + \
676                            '{"Number": 2, "project": "mb"}]',]
677            generators.append(DocumentGenerator(name, template, names, jira_tickets, start=index + index, end=end))
678        return generators
679
680    def buckets_docs_ready(self, bucket_docs_map):
681        ready = True
682        rest_conn = RestConnection(self.master)
683        bucket_docs_rest = rest_conn.get_buckets_itemCount()
684        for bucket in bucket_docs_map.keys():
685            query_response = self.run_cbq_query("SELECT COUNT(*) FROM `"+bucket+"`")
686            docs = query_response['results'][0]['$1']
687            if docs != bucket_docs_map[bucket] or bucket_docs_rest[bucket] != bucket_docs_map[bucket]:
688                self.log.info("Bucket Docs Not Ready For Bucket: " + str(bucket) + "... \n Expected: " + str(bucket_docs_map[bucket]) + "\n Query: " + str(docs) + "\n Rest: " + str(bucket_docs_rest[bucket]))
689                ready = False
690        return ready
691
692    def buckets_status_ready(self, bucket_status_map):
693        ready = True
694        rest_conn = RestConnection(self.master)
695        for bucket in bucket_status_map.keys():
696            status = rest_conn.get_bucket_status(bucket)
697            if status != bucket_status_map[bucket]:
698                self.log.info("still waiting for bucket: " + bucket + " with status: " + str(status) + " to have " + str(bucket_status_map[bucket]) + " status")
699                ready = False
700        return ready
701
702    def wait_for_buckets_status(self, bucket_status_map, delay, retries):
703        self.with_retry(lambda: self.buckets_status_ready(bucket_status_map), delay=delay, tries=retries)
704
705    def wait_for_bucket_docs(self, bucket_doc_map, delay, retries):
706        self.with_retry(lambda: self.buckets_docs_ready(bucket_doc_map), delay=delay, tries=retries)
707
708    def with_retry(self, func, eval=True, delay=5, tries=10, func_params=None):
709        attempts = 0
710        while attempts < tries:
711            attempts = attempts + 1
712            res = func()
713            if res == eval:
714                return res
715            else:
716                self.sleep(delay, 'incorrect results, sleeping for %s' % delay)
717        raise Exception('timeout, invalid results: %s' % res)
718
719    def negative_common_body(self, queries_errors={}):
720        if not queries_errors:
721            self.fail("No queries to run!")
722        check_code = False
723        self.fail_if_no_buckets()
724        for bucket in self.buckets:
725            for query_template, error_arg in queries_errors.iteritems():
726                if isinstance(error_arg,str):
727                    error = error_arg
728                else:
729                    error, code = error_arg
730                    check_code = True
731                try:
732                    query = self.gen_results.generate_query(query_template)
733                    actual_result = self.run_cbq_query(query.format(bucket.name))
734                except CBQError as ex:
735                    self.log.error(ex)
736                    self.log.error(error)
737                    self.assertTrue(str(ex).find(error) != -1,
738                                    "Error is incorrect.Actual %s.\n Expected: %s.\n" %(
739                                                                str(ex).split(':')[-1], error))
740                    if check_code:
741                        self.assertTrue(str(ex).find(str(code)) != -1,
742                                        "Error code is incorrect.Actual %s.\n Expected: %s.\n" % (str(ex), code))
743                else:
744                    self.fail("There were no errors. Error expected: %s" % error)
745
746    def prepared_common_body(self, server=None):
747        self.isprepared = True
748        result_no_prepare = self.run_cbq_query(server=server)['results']
749        if self.named_prepare:
750            if 'concurrent' not in self.named_prepare:
751                self.named_prepare = self.named_prepare + "_" +str(uuid.uuid4())[:4]
752            query = "PREPARE %s from %s" % (self.named_prepare, self.query)
753        else:
754            query = "PREPARE %s" % self.query
755        prepared = self.run_cbq_query(query=query, server=server)['results'][0]
756        if self.encoded_prepare and len(self.servers) > 1:
757            encoded_plan=prepared['encoded_plan']
758            result_with_prepare = self.run_cbq_query(query=prepared, is_prepared=True, encoded_plan=encoded_plan, server=server)['results']
759        else:
760            result_with_prepare = self.run_cbq_query(query=prepared, is_prepared=True, server=server)['results']
761        if self.cover:
762            self.assertTrue("IndexScan in %s" % result_with_prepare)
763            self.assertTrue("covers in %s" % result_with_prepare)
764            self.assertTrue("filter_covers in %s" % result_with_prepare)
765            self.assertFalse('ERROR' in (str(word).upper() for word in result_with_prepare))
766        msg = "Query result with prepare and without doesn't match.\nNo prepare: %s ... %s\nWith prepare: %s ... %s" \
767              % (result_no_prepare[:100], result_no_prepare[-100:], result_with_prepare[:100], result_with_prepare[-100:])
768        self.assertTrue(sorted(result_no_prepare) == sorted(result_with_prepare), msg)
769
770    def run_cbq_query(self, query=None, min_output_size=10, server=None, query_params={}, is_prepared=False, encoded_plan=None):
771        if query is None:
772            query = self.query
773        if server is None:
774            server = self.master
775            if self.input.tuq_client and "client" in self.input.tuq_client:
776                server = self.tuq_client
777        cred_params = {'creds': []}
778        rest = RestConnection(server)
779        username = rest.username
780        password = rest.password
781        cred_params['creds'].append({'user': username, 'pass': password})
782        for bucket in self.buckets:
783            if bucket.saslPassword:
784                cred_params['creds'].append({'user': 'local:%s' % bucket.name, 'pass': bucket.saslPassword})
785        query_params.update(cred_params)
786        if self.testrunner_client == 'python_sdk' and not is_prepared:
787            sdk_cluster = Cluster('couchbase://' + str(server.ip))
788            authenticator = PasswordAuthenticator(username, password)
789            sdk_cluster.authenticate(authenticator)
790            for bucket in self.buckets:
791                cb = sdk_cluster.open_bucket(bucket.name)
792
793            sdk_query = N1QLQuery(query)
794
795            # if is_prepared:
796            #     sdk_query.adhoc = False
797
798            if 'scan_consistency' in query_params:
799                if query_params['scan_consistency'] == 'REQUEST_PLUS':
800                    sdk_query.consistency = CONSISTENCY_REQUEST  # request_plus is currently mapped to the CONSISTENT_REQUEST constant in the Python SDK
801                elif query_params['scan_consistency'] == 'STATEMENT_PLUS':
802                    sdk_query.consistency = STATEMENT_PLUS
803                else:
804                    raise ValueError('Unknown consistency')
805            # Python SDK returns results row by row, so we need to iterate through all the results
806            row_iterator = cb.n1ql_query(sdk_query)
807            content = []
808            try:
809                for row in row_iterator:
810                    content.append(row)
811                row_iterator.meta['results'] = content
812                result = row_iterator.meta
813            except Exception, e:
814                #This will parse the resulting HTTP error and return only the dictionary containing the query results
815                result = ast.literal_eval(str(e).split("value=")[1].split(", http_status")[0])
816
817        elif self.use_rest:
818            query_params.update({'scan_consistency': self.scan_consistency})
819            if hasattr(self, 'query_params') and self.query_params:
820                query_params = self.query_params
821            if self.hint_index and (query.lower().find('select') != -1):
822                from_clause = re.sub(r'let.*', '',
823                                     re.sub(r'.*from', '', re.sub(r'where.*', '', query)))
824                from_clause = re.sub(r'LET.*', '',
825                                     re.sub(r'.*FROM', '', re.sub(r'WHERE.*', '', from_clause)))
826                from_clause = re.sub(r'select.*', '', re.sub(r'order by.*', '',
827                                                             re.sub(r'group by.*', '',
828                                                                    from_clause)))
829                from_clause = re.sub(r'SELECT.*', '', re.sub(r'ORDER BY.*', '',
830                                                             re.sub(r'GROUP BY.*', '',
831                                                                    from_clause)))
832                hint = ' USE INDEX (%s using %s) ' % (self.hint_index, self.index_type)
833                query = query.replace(from_clause, from_clause + hint)
834
835            if not is_prepared:
836                self.log.info('RUN QUERY %s' % query)
837
838            if self.analytics:
839                query = query + ";"
840                for bucket in self.buckets:
841                    query = query.replace(bucket.name, bucket.name + "_shadow")
842                result = RestConnection(self.cbas_node).execute_statement_on_cbas(query,
843                                                                                  "immediate")
844                result = json.loads(result)
845            else:
846                result = rest.query_tool(query, self.n1ql_port, query_params=query_params,
847                                         is_prepared=is_prepared, named_prepare=self.named_prepare,
848                                         encoded_plan=encoded_plan, servers=self.servers)
849        else:
850            if self.version == "git_repo":
851                output = self.shell.execute_commands_inside(
852                    "$GOPATH/src/github.com/couchbase/query/" + \
853                    "shell/cbq/cbq ", "", "", "", "", "", "")
854            else:
855                if not (self.isprepared):
856                    query = query.replace('"', '\\"')
857                    query = query.replace('`', '\\`')
858                    if self.ipv6:
859                        cmd = "%scbq  -engine=http://%s:%s/ -q -u %s -p %s" % (
860                        self.path, server.ip, self.n1ql_port, username, password)
861                    else:
862                        cmd = "%scbq  -engine=http://%s:%s/ -q -u %s -p %s" % (
863                        self.path, server.ip, server.port, username, password)
864
865                    output = self.shell.execute_commands_inside(cmd, query, "", "", "", "", "")
866                    if not (output[0] == '{'):
867                        output1 = '{%s' % output
868                    else:
869                        output1 = output
870                    result = json.loads(output1)
871        if isinstance(result, str) or 'errors' in result:
872            raise CBQError(result, server.ip)
873        if 'metrics' in result:
874            self.log.info("TOTAL ELAPSED TIME: %s" % result["metrics"]["elapsedTime"])
875        return result
876
877    def build_url(self, version):
878        info = self.shell.extract_remote_info()
879        type = info.distribution_type.lower()
880        if type in ["ubuntu", "centos", "red hat"]:
881            url = "https://s3.amazonaws.com/packages.couchbase.com/releases/couchbase-query/dp1/"
882            url += "couchbase-query_%s_%s_linux.tar.gz" %(version, info.architecture_type)
883        #TODO for windows
884        return url
885
886    def _build_tuq(self, server):
887        if self.version == "git_repo":
888            os = self.shell.extract_remote_info().type.lower()
889            if os != 'windows':
890                goroot = testconstants.LINUX_GOROOT
891                gopath = testconstants.LINUX_GOPATH
892            else:
893                goroot = testconstants.WINDOWS_GOROOT
894                gopath = testconstants.WINDOWS_GOPATH
895            if self.input.tuq_client and "gopath" in self.input.tuq_client:
896                gopath = self.input.tuq_client["gopath"]
897            if self.input.tuq_client and "goroot" in self.input.tuq_client:
898                goroot = self.input.tuq_client["goroot"]
899            cmd = "rm -rf {0}/src/github.com".format(gopath)
900            self.shell.execute_command(cmd)
901            cmd= 'export GOROOT={0} && export GOPATH={1} &&'.format(goroot, gopath) +\
902                ' export PATH=$PATH:$GOROOT/bin && go get github.com/couchbaselabs/tuqtng;' +\
903                'cd $GOPATH/src/github.com/couchbaselabs/tuqtng; go get -d -v ./...; cd .'
904            self.shell.execute_command(cmd)
905            cmd = 'export GOROOT={0} && export GOPATH={1} &&'.format(goroot, gopath) +\
906                ' export PATH=$PATH:$GOROOT/bin && cd $GOPATH/src/github.com/couchbaselabs/tuqtng; go build; cd .'
907            self.shell.execute_command(cmd)
908            cmd = 'export GOROOT={0} && export GOPATH={1} &&'.format(goroot, gopath) +\
909                ' export PATH=$PATH:$GOROOT/bin && cd $GOPATH/src/github.com/couchbaselabs/tuqtng/tuq_client; go build; cd .'
910            self.shell.execute_command(cmd)
911        else:
912            cbq_url = self.build_url(self.version)
913            #TODO for windows
914            cmd = "cd /tmp; mkdir tuq;cd tuq; wget {0} -O tuq.tar.gz;".format(cbq_url)
915            cmd += "tar -xvf tuq.tar.gz;rm -rf tuq.tar.gz"
916            self.shell.execute_command(cmd)
917
918    def _start_command_line_query(self, server, options='', user=None, password=None):
919        out = ''
920        if user and password:
921            auth_row = '%s:%s@' % (user, password)
922        os = self.shell.extract_remote_info().type.lower()
923        if self.flat_json:
924            if os == 'windows':
925                gopath = testconstants.WINDOWS_GOPATH
926                cmd = "cd %s/src/github.com/couchbase/query/server/cbq-engine; " % (gopath) +\
927                "./cbq-engine.exe -datastore=dir:%sdata >/dev/null 2>&1 &" % (self.directory_flat_json)
928            else:
929                gopath = testconstants.LINUX_GOPATH
930                cmd = "cd %s/src/github.com/couchbase/query/server/cbq-engine; " % (gopath) +\
931                "./cbq-engine -datastore=dir:%s/data >n1ql.log 2>&1 &" %(self.directory_flat_json)
932            out = self.shell.execute_command(cmd)
933            self.log.info(out)
934        elif self.version == "git_repo":
935            if os != 'windows':
936                gopath = testconstants.LINUX_GOPATH
937            else:
938                gopath = testconstants.WINDOWS_GOPATH
939            if self.input.tuq_client and "gopath" in self.input.tuq_client:
940                gopath = self.input.tuq_client["gopath"]
941            if os == 'windows':
942                cmd = "cd %s/src/github.com/couchbase/query/server/cbq-engine; " % (gopath) +\
943                "./cbq-engine.exe -datastore http://%s%s:%s/ %s >/dev/null 2>&1 &" %(('', auth_row)[auth_row is not None], server.ip, server.port, options)
944            else:
945                cmd = "cd %s/src/github.com/couchbase/query/server/cbq-engine; " % (gopath) +\
946                "./cbq-engine -datastore http://%s%s:%s/ %s >n1ql.log 2>&1 &" % (('', auth_row)[auth_row is not None], server.ip, server.port, options)
947            out = self.shell.execute_command(cmd)
948        elif self.version == "sherlock":
949            if self.services_init and self.services_init.find('n1ql') != -1:
950                return
951            if self.master.services and self.master.services.find('n1ql') !=-1:
952                return
953            if os == 'windows':
954                couchbase_path = testconstants.WIN_COUCHBASE_BIN_PATH
955                cmd = "cd %s; " % (couchbase_path) +\
956                "./cbq-engine.exe -datastore http://%s%s:%s/ %s >/dev/null 2>&1 &" %(('', auth_row)[auth_row is not None], server.ip, server.port, options)
957            else:
958                couchbase_path = testconstants.LINUX_COUCHBASE_BIN_PATH
959                cmd = "cd %s; " % (couchbase_path) +\
960                "./cbq-engine -datastore http://%s%s:%s/ %s >/dev/null 2>&1 &" %(('', auth_row)[auth_row is not None], server.ip, server.port, options)
961            out = self.shell.execute_command(cmd)
962            self.log.info(out)
963        else:
964            if os != 'windows':
965                cmd = "cd /tmp/tuq;./cbq-engine -couchbase http://%s:%s/ >/dev/null 2>&1 &" %(server.ip, server.port)
966            else:
967                cmd = "cd /cygdrive/c/tuq;./cbq-engine.exe -couchbase http://%s:%s/ >/dev/null 2>&1 &" %(server.ip, server.port)
968            self.shell.execute_command(cmd)
969        return out
970
971    #This method has no usages anywhere
972    def _set_env_variable(self, server):
973        self.shell.execute_command("export NS_SERVER_CBAUTH_URL=\"http://{0}:{1}/_cbauth\"".format(server.ip,server.port))
974        self.shell.execute_command("export NS_SERVER_CBAUTH_USER=\"{0}\"".format(server.rest_username))
975        self.shell.execute_command("export NS_SERVER_CBAUTH_PWD=\"{0}\"".format(server.rest_password))
976        self.shell.execute_command("export NS_SERVER_CBAUTH_RPC_URL=\"http://{0}:{1}/cbauth-demo\"".format(server.ip,server.port))
977        self.shell.execute_command("export CBAUTH_REVRPC_URL=\"http://{0}:{1}@{2}:{3}/query\"".format(server.rest_username,server.rest_password,server.ip,server.port))
978
979    #This method has no usages anywhere
980    def _parse_query_output(self, output):
981        if output.find("cbq>") == 0:
982            output = output[output.find("cbq>") + 4:].strip()
983        if output.find("tuq_client>") == 0:
984            output = output[output.find("tuq_client>") + 11:].strip()
985        if output.find("cbq>") != -1:
986            output = output[:output.find("cbq>")].strip()
987        if output.find("tuq_client>") != -1:
988            output = output[:output.find("tuq_client>")].strip()
989        return json.loads(output)
990
991    def _verify_results(self, actual_result, expected_result):
992        if self.max_verify is not None:
993            actual_result = actual_result[:self.max_verify]
994            expected_result = expected_result[:self.max_verify]
995            self.assertTrue(actual_result == expected_result, "Results are incorrect")
996            return
997        if len(actual_result) != len(expected_result):
998            missing, extra = self.check_missing_and_extra(actual_result, expected_result)
999            self.log.error("Missing items: %s.\n Extra items: %s" % (missing[:100], extra[:100]))
1000            self.fail("Results are incorrect.Actual num %s. Expected num: %s.\n" % (len(actual_result), len(expected_result)))
1001        msg = "Results are incorrect.\n Actual first and last 100:  %s.\n ... \n %s Expected first and last 100: %s.\n  ... \n %s" \
1002              % (actual_result[:100], actual_result[-100:], expected_result[:100], expected_result[-100:])
1003        self.assertTrue(actual_result == expected_result, msg)
1004
    def _verify_aggregate_query_results(self, result, query, bucket):
        """Cross-check an aggregate (pushdown) result against a primary-index run.

        Args:
            result: response dict of the query under test (with 'results').
            query: statement template with two %s slots: bucket and index name.
            bucket: bucket name substituted into the template.

        Returns:
            True when the flattened value lists of both runs match, else False.
        """
        def _gen_dict(res):
            # Flatten a list of result rows into the list of their values.
            result_set = []
            if res is not None and len(res) > 0:
                for val in res:
                    for key in val.keys():
                        result_set.append(val[key])
            return result_set

        self.restServer = self.get_nodes_from_services_map(service_type="n1ql")
        self.rest = RestConnection(self.restServer)
        # Force index API mode 1 so the reference run goes through #primary
        # without pushdown, then restore mode 3 afterwards.
        self.rest.set_query_index_api_mode(1)
        primary_query = query % (bucket, "#primary")
        primary_result = self.run_cbq_query(primary_query)
        self.rest.set_query_index_api_mode(3)
        self.log.info(" Analyzing Actual Result")

        actual_result = _gen_dict(sorted(primary_result["results"]))
        self.log.info(" Analyzing Expected Result")
        expected_result = _gen_dict(sorted(result["results"]))
        if len(actual_result) != len(expected_result):
            return False
        if actual_result != expected_result:
            return False
        return True
1030
1031
1032    def check_missing_and_extra(self, actual, expected):
1033        missing, extra = [], []
1034        for item in actual:
1035            if not (item in expected):
1036                 extra.append(item)
1037        for item in expected:
1038            if not (item in actual):
1039                missing.append(item)
1040        return missing, extra
1041
1042    def sort_nested_list(self, result, key=None):
1043        actual_result = []
1044        for item in result:
1045            curr_item = {}
1046            for key, value in item.iteritems():
1047                if isinstance(value, list) or isinstance(value, set):
1048                    if not isinstance(value, set) and key and isinstance(value[0], dict) and key in value:
1049                        curr_item[key] = sorted(value, key=lambda doc: (doc['task_name']))
1050                    else:
1051                        curr_item[key] = sorted(value)
1052                else:
1053                    curr_item[key] = value
1054            actual_result.append(curr_item)
1055        return actual_result
1056
1057    def configure_gomaxprocs(self):
1058        max_proc = self.input.param("gomaxprocs", None)
1059        cmd = "export GOMAXPROCS=%s" % max_proc
1060        for server in self.servers:
1061            shell_connection = RemoteMachineShellConnection(self.master)
1062            shell_connection.execute_command(cmd)
1063
    def load_directory(self, generators_load):
        """Write generated documents as JSON files into the flat-json datastore directory.

        For every bucket, wipes %sdata/default/* on the master, recreates the
        bucket's subdirectory, and echoes each generated document into
        <dir>data/default/<bucket>/<key>.json over the remote shell.

        Args:
            generators_load: document generators; deep-copied so iteration
                does not exhaust the caller's generators.
        """
        gens_load = []
        for generator_load in generators_load:
            gens_load.append(copy.deepcopy(generator_load))
        items = 0
        for gen_load in gens_load:
            items += (gen_load.end - gen_load.start)

        self.fail_if_no_buckets()
        for bucket in self.buckets:
            try:
                shell = RemoteMachineShellConnection(self.master)
                self.log.info("Delete directory's content %sdata/default/%s ..." % (self.directory_flat_json, bucket.name))
                o = shell.execute_command('rm -rf %sdata/default/*' % self.directory_flat_json)
                self.log.info("Create directory %sdata/default/%s..." % (self.directory_flat_json, bucket.name))
                o = shell.execute_command('mkdir -p %sdata/default/%s' % (self.directory_flat_json, bucket.name))
                self.log.info("Load %s documents to %sdata/default/%s..." % (items, self.directory_flat_json, bucket.name))
                for gen_load in gens_load:
                    gen_load.reset()
                    for i in xrange(gen_load.end):
                        key, value = gen_load.next()
                        out = shell.execute_command("echo '%s' > %sdata/default/%s/%s.json" % (value, self.directory_flat_json,
                                                                                                bucket.name, key))
                self.log.info("LOAD IS FINISHED")
            except Exception, ex:
                # Best-effort load: log the failure and move to the next bucket.
                self.log.info(ex)
                traceback.print_exc()
            finally:
                shell.disconnect()
1093
    '''Two separate flags control whether or not a primary index is created: one for tuq (skip_index)
       and one for newtuq (skip_primary_index). We should go back, merge these flags, and fix the conf files.'''
1096    def create_primary_index_for_3_0_and_greater(self):
1097        if self.skip_index or self.skip_primary_index:
1098            self.log.info("Not creating index")
1099            return
1100        if self.flat_json:
1101            return
1102        self.sleep(30, 'Sleep for some time prior to index creation')
1103        rest = RestConnection(self.master)
1104        versions = rest.get_nodes_versions()
1105        if int(versions[0].split('.')[0]) > 2:
1106            for bucket in self.buckets:
1107                if self.primary_indx_drop:
1108                    self.log.info("Dropping primary index for %s ..." % bucket.name)
1109                    self.query = "DROP PRIMARY INDEX ON %s using %s" % (bucket.name, self.primary_indx_type)
1110                    self.sleep(6, 'Sleep for some time after index drop')
1111                self.query = "select * from system:indexes where name='#primary' and keyspace_id = %s" % bucket.name
1112                res = self.run_cbq_query(self.query)
1113                if res['metrics']['resultCount'] == 0:
1114                    self.query = "CREATE PRIMARY INDEX ON %s USING %s" % (bucket.name, self.primary_indx_type)
1115                    self.log.info("Creating primary index for %s ..." % bucket.name)
1116                    try:
1117                        self.run_cbq_query()
1118                        self.primary_index_created = True
1119                        if self.primary_indx_type.lower() == 'gsi':
1120                            self._wait_for_index_online(bucket.name, '#primary')
1121                    except Exception, ex:
1122                        self.log.info(str(ex))
1123
1124    def ensure_primary_indexes_exist(self):
1125        query_response = self.run_cbq_query("SELECT * FROM system:keyspaces")
1126        buckets = [i['keyspaces']['name'] for i in query_response['results']]
1127        current_indexes = self.get_parsed_indexes()
1128        index_list = [{'name': '#primary',
1129                       'bucket': bucket,
1130                       'fields': [],
1131                       'state': 'online',
1132                       'using': self.index_type.lower(),
1133                       'is_primary': True} for bucket in buckets]
1134        desired_indexes = self.parse_desired_indexes(index_list)
1135        desired_index_set = self.make_hashable_index_set(desired_indexes)
1136        current_index_set = self.make_hashable_index_set(current_indexes)
1137        self.create_desired_indexes(desired_index_set, current_index_set, desired_indexes)
1138
    def _wait_for_index_online(self, bucket, index_name, timeout=12000):
        """Poll system:indexes until *index_name* on *bucket* is online.

        Args:
            bucket: bucket object or bucket-name string (py2 str/unicode).
            index_name: index to wait for (e.g. '#primary').
            timeout: seconds to keep polling (time.time() based) before raising.

        Raises:
            Exception: when the index is not online within *timeout*.
        """
        end_time = time.time() + timeout
        while time.time() < end_time:
            query = "SELECT * FROM system:indexes where name='%s'" % index_name
            res = self.run_cbq_query(query)
            for item in res['results']:
                # Rows without keyspace_id cannot be matched to a bucket.
                if 'keyspace_id' not in item['indexes']:
                    self.log.error(item)
                    continue
                bucket_name = ""
                # Accept either the bucket name itself or a bucket object.
                if isinstance(bucket, str) or isinstance(bucket, unicode):
                    bucket_name = bucket
                else:
                    bucket_name = bucket.name
                if item['indexes']['keyspace_id'] == bucket_name:
                    if item['indexes']['state'] == "online":
                        return
            self.sleep(5, 'index is pending or not in the list. sleeping... (%s)' % [item['indexes'] for item in res['results']])
        raise Exception('index %s is not online. last response is %s' % (index_name, res))
1158
1159
1160##############################################################################################
1161#
1162#   newtuq COMMON FUNCTIONS
1163##############################################################################################
1164
1165    def run_query_from_template(self, query_template):
1166        self.query = self.gen_results.generate_query(query_template)
1167        expected_result = self.gen_results.generate_expected_result()
1168        actual_result = self.run_cbq_query()
1169        return actual_result, expected_result
1170
    def run_query_with_subquery_select_from_template(self, query_template):
        """Run a query template whose SELECT list contains a `$subquery(...)`
        placeholder and return (actual_result, expected_result).

        The subquery is extracted, evaluated against the docs selected by its
        'USE KEYS $<n>' clause, and its first expected row is injected under
        the subquery's alias into every expected outer-query document before
        the outer template itself is evaluated.
        """
        # Strip everything up to and including '$subquery(' plus the final ')'.
        subquery_template = re.sub(r'.*\$subquery\(', '', query_template)
        subquery_template = subquery_template[:subquery_template.rfind(')')]
        # The template encodes the number of keys to use as 'USE KEYS $<n>'.
        keys_num = int(re.sub(r'.*KEYS \$', '', subquery_template).replace('KEYS $', ''))
        subquery_full_list = self.generate_full_docs_list(gens_load=self.gens_load,keys=self._get_keys(keys_num))
        # Drop the USE KEYS clause before generating the expected subquery result.
        subquery_template = re.sub(r'USE KEYS.*', '', subquery_template)
        sub_results = TuqGenerators(self.log, subquery_full_list)
        self.query = sub_results.generate_query(subquery_template)
        expected_sub = sub_results.generate_expected_result()
        # Recover the alias given to the subquery expression; fall back to the
        # positional name '$1' when no 'as <alias>' is present.
        alias = re.sub(r',.*', '', re.sub(r'.*\$subquery\(.*\)', '', query_template))
        alias = re.sub(r'.*as','', re.sub(r'FROM.*', '', alias)).strip()
        if not alias:
            alias = '$1'
        # Inject the subquery's first expected row into every expected document.
        for item in self.gen_results.full_set:
            item[alias] = expected_sub[0]
        # Replace the subquery expression with a plain reference to the alias.
        query_template = re.sub(r',.*\$subquery\(.*\).*%s' % alias, ',%s' % alias, query_template)
        self.query = self.gen_results.generate_query(query_template)
        expected_result = self.gen_results.generate_expected_result()
        actual_result = self.run_cbq_query()
        return actual_result, expected_result
1191
    def run_query_with_subquery_from_template(self, query_template):
        """Run a query template whose FROM clause contains a `$subquery(...)`
        placeholder and return (actual_result, expected_result).

        The subquery is evaluated first over the full document set; its
        expected rows then become the document set against which the outer
        template is evaluated.
        """
        # Strip everything up to and including '$subquery(' plus the final ')'.
        subquery_template = re.sub(r'.*\$subquery\(', '', query_template)
        subquery_template = subquery_template[:subquery_template.rfind(')')]
        subquery_full_list = self.generate_full_docs_list(gens_load=self.gens_load)
        sub_results = TuqGenerators(self.log, subquery_full_list)
        self.query = sub_results.generate_query(subquery_template)
        expected_sub = sub_results.generate_expected_result()
        # Recover the alias assigned to the subquery in the FROM clause.
        alias = re.sub(r',.*', '', re.sub(r'.*\$subquery\(.*\)', '', query_template))
        alias = re.sub(r'.*as ', '', alias).strip()
        # The subquery's expected rows become the outer query's document set.
        self.gen_results = TuqGenerators(self.log, expected_sub)
        # Replace the subquery expression with a plain reference to the alias.
        query_template = re.sub(r'\$subquery\(.*\).*%s' % alias, ' %s' % alias, query_template)
        self.query = self.gen_results.generate_query(query_template)
        expected_result = self.gen_results.generate_expected_result()
        actual_result = self.run_cbq_query()
        return actual_result, expected_result
1207
1208    def _get_keys(self, key_num):
1209        keys = []
1210        for gen in self.gens_load:
1211            gen_copy = copy.deepcopy(gen)
1212            for i in xrange(gen_copy.end):
1213                key, _ = gen_copy.next()
1214                keys.append(key)
1215                if len(keys) == key_num:
1216                    return keys
1217        return keys
1218
    def run_active_requests(self, e, t):
        """Worker loop for active/completed request bookkeeping checks.

        :param e: threading-style event polled as the stop flag; the loop
                  exits once it is set before a wait begins.
        :param t: timeout in seconds passed to e.wait().

        Each time the event is observed set, asserts exactly one active
        request exists, deletes it by requestID, verifies the deletion,
        then does the same for the corresponding completed_requests entry.
        """
        while not e.isSet():
            logging.debug('wait_for_event_timeout starting')
            event_is_set = e.wait(t)
            logging.debug('event set: %s', event_is_set)
            if event_is_set:
                result = self.run_cbq_query("select * from system:active_requests")
                self.assertTrue(result['metrics']['resultCount'] == 1)
                requestId = result['requestID']
                result = self.run_cbq_query(
                    'delete from system:active_requests where requestId  =  "%s"' % requestId)
                # Give the server time to process the delete before verifying.
                time.sleep(20)
                result = self.run_cbq_query(
                    'select * from system:active_requests  where requestId  =  "%s"' % requestId)
                self.assertTrue(result['metrics']['resultCount'] == 0)
                result = self.run_cbq_query("select * from system:completed_requests")
                requestId = result['requestID']
                result = self.run_cbq_query(
                    'delete from system:completed_requests where requestId  =  "%s"' % requestId)
                # Shorter settle time: completed_requests deletion is lighter.
                time.sleep(10)
                result = self.run_cbq_query(
                    'select * from system:completed_requests where requestId  =  "%s"' % requestId)
                self.assertTrue(result['metrics']['resultCount'] == 0)
1242
1243
1244##############################################################################################
1245#
1246#   tuq_sanity.py helpers
1247##############################################################################################
1248
1249    def expected_substr(self, a_string, start, index):
1250        if start is 0:
1251            substring = a_string[index:]
1252            if index >= len(a_string):
1253                return None
1254            elif index < -len(a_string):
1255                return None
1256            else:
1257                return substring
1258        if start is 1:
1259            substring = a_string[index - start:] if index > 0 else a_string[index:]
1260            if index >= len(a_string):
1261                return None
1262            elif index < -len(a_string):
1263                return None
1264            else:
1265                return substring
1266
1267    def run_regex_query(self, word, substring, regex_type=''):
1268        self.query = "select REGEXP_POSITION%s('%s', '%s')" % (regex_type, word, substring)
1269        results = self.run_cbq_query()
1270        return results['results'][0]['$1']
1271
1272    def run_position_query(self, word, substring, position_type = ''):
1273        self.query = "select POSITION%s('%s', '%s')" % (position_type, word, substring)
1274        results = self.run_cbq_query()
1275        return results['results'][0]['$1']
1276
1277    def check_explain_covering_index(self,index):
1278        for bucket in self.buckets:
1279            res = self.run_cbq_query()
1280            s = pprint.pformat( res, indent=4 )
1281            if index in s:
1282                self.log.info("correct index used in json result ")
1283            else:
1284                self.log.error("correct index not used in json result ")
1285                self.fail("correct index not used in json result ")
1286            if 'covers' in s:
1287                self.log.info("covers key present in json result ")
1288            else:
1289                self.log.error("covers key missing from json result ")
1290                self.fail("covers key missing from json result ")
1291            if 'cover' in s:
1292                self.log.info("cover keyword present in json children ")
1293            else:
1294                self.log.error("cover keyword missing from json children ")
1295                self.fail("cover keyword missing from json children ")
1296            if 'IntersectScan' in s:
1297                self.log.error("This is a covered query, Intersect scan should not be used")
1298##############################################################################################
1299#
1300#   upgrade_n1qlrbac.py helpers
1301##############################################################################################
1302    def query_select_insert_update_delete_helper(self):
1303        self.create_users(users=[{'id': 'john_insert', 'name': 'johnInsert', 'password':'password'}])
1304        self.create_users(users=[{'id': 'john_update', 'name': 'johnUpdate', 'password':'password'}])
1305        self.create_users(users=[{'id': 'john_delete', 'name': 'johnDelete', 'password':'password'}])
1306        self.create_users(users=[{'id': 'john_select', 'name': 'johnSelect', 'password':'password'}])
1307        self.create_users(users=[{'id': 'john_select2', 'name': 'johnSelect2', 'password':'password'}])
1308        self.create_users(users=[{'id': 'john_rep', 'name': 'johnRep', 'password':'password'}])
1309        self.create_users(users=[{'id': 'john_bucket_admin', 'name': 'johnBucketAdmin', 'password':'password'}])
1310        items = [("query_insert",'john_insert'), ("query_update",'john_update'), ("query_delete",'john_delete'),
1311                         ("query_select",'john_select'), ("bucket_admin",'john_bucket_admin'), ("query_select",'john_select2')]
1312        for bucket in self.buckets:
1313            for item in items:
1314                self.query = "GRANT {0} on {2} to {1}".format(item[0],item[1],bucket.name)
1315                self.n1ql_helper.run_cbq_query(query = self.query, server = self.n1ql_node)
1316
1317            self.query = "GRANT {0} to {1}".format("replication_admin",'john_rep')
1318            self.n1ql_helper.run_cbq_query(query = self.query, server = self.n1ql_node)
1319
1320    def query_select_insert_update_delete_helper_default(self):
1321        self.create_users(users=[{'id': 'john_insert', 'name': 'johnInsert', 'password':'password'}])
1322        self.create_users(users=[{'id': 'john_update', 'name': 'johnUpdate', 'password':'password'}])
1323        self.create_users(users=[{'id': 'john_delete', 'name': 'johnDelete', 'password':'password'}])
1324        self.create_users(users=[{'id': 'john_select', 'name': 'johnSelect', 'password':'password'}])
1325        self.create_users(users=[{'id': 'john_select2', 'name': 'johnSelect2', 'password':'password'}])
1326        self.create_users(users=[{'id': 'john_rep', 'name': 'johnRep', 'password':'password'}])
1327        self.create_users(users=[{'id': 'john_bucket_admin', 'name': 'johnBucketAdmin', 'password':'password'}])
1328        self.query = "GRANT {0} to {1}".format("replication_admin",'john_rep')
1329        self.n1ql_helper.run_cbq_query(query = self.query, server = self.n1ql_node)
1330
1331    def change_and_update_permission(self, query_type, permission, user, bucket, cmd, error_msg):
1332        if query_type == 'with_bucket':
1333            self.query = "GRANT {0} on {1} to {2}".format(permission, bucket, user)
1334        if query_type == 'without_bucket':
1335            self.query = "GRANT {0} to {1}".format(permission, user)
1336        if query_type in ['with_bucket', 'without_bucket']:
1337            self.n1ql_helper.run_cbq_query(query=self.query, server=self.n1ql_node)
1338        output, error = self.shell.execute_command(cmd)
1339        self.shell.log_command_output(output, error)
1340        self.assertTrue(any("success" in line for line in output), error_msg.format(bucket, user))
1341        self.log.info("Query executed successfully")
1342
    def check_permissions_helper(self):
        """Verify the per-role users created earlier can each run their
        granted statement (INSERT/UPDATE/DELETE/SELECT) on every bucket
        via curl against the query service.

        NOTE(review): the user labels passed for the error message
        ('johnInsert', 'johnUpdate') differ from the credentials actually
        used in the curl commands ('john_insert', 'john_update') — the
        label only feeds the failure message, but confirm this is intended.
        """
        for bucket in self.buckets:
            # INSERT as john_insert.
            cmd = "%s -u %s:%s http://%s:8093/query/service -d " \
                  "'statement=INSERT INTO %s (KEY, VALUE) VALUES(\"test\", { \"value1\": \"one1\" })'"% \
                  (self.curl_path,'john_insert', 'password', self.master.ip, bucket.name)
            self.change_and_update_permission(None, None, 'johnInsert', bucket.name, cmd, "Unable to insert into {0} as user {1}")

            # UPDATE as john_update.
            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d " \
                  "'statement=UPDATE {3} a set name = '{4}' where name = '{5}' limit 1'". \
                format('john_update', 'password', self.master.ip, bucket.name, new_name, old_name,self.curl_path)
            self.change_and_update_permission(None, None, 'johnUpdate', bucket.name, cmd, "Unable to update into {0} as user {1}")

            # DELETE as john_delete.
            del_name = "employee-14"
            cmd = "{5} -u {0}:{1} http://{2}:8093/query/service -d " \
                  "'statement=DELETE FROM {3} a WHERE name = '{4}''". \
                format('john_delete', 'password', self.master.ip, bucket.name, del_name,self.curl_path)
            self.change_and_update_permission(None, None, 'john_delete', bucket.name, cmd, "Unable to delete from {0} as user {1}")

            # SELECT as john_select2.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} LIMIT 10'". \
                format('john_select2', 'password', self.master.ip,bucket.name,self.curl_path)
            self.change_and_update_permission(None, None, 'john_select2', bucket.name, cmd, "Unable to select from {0} as user {1}")
1366
1367    def create_and_verify_system_catalog_users_helper(self):
1368        self.create_users(users=[{'id': 'john_system', 'name': 'john', 'password':'password'}])
1369        self.query = "GRANT {0} to {1}".format("query_system_catalog",'john_system')
1370        self.n1ql_helper.run_cbq_query(query = self.query, server = self.n1ql_node)
1371        for bucket in self.buckets:
1372            cmds = ["{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:keyspaces'". \
1373                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1374                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:namespaces'". \
1375                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1376                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:datastores'". \
1377                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1378                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:indexes'". \
1379                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1380                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:completed_requests'". \
1381                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1382                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:active_requests'". \
1383                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1384                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:prepareds'". \
1385                        format('john_system','password', self.master.ip, bucket.name,self.curl_path),
1386                    "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:my_user_info'". \
1387                        format('john_system','password', self.master.ip, bucket.name,self.curl_path)]
1388            for cmd in cmds:
1389                self.change_and_update_permission(None, None, 'john_system', bucket.name, cmd, "Unable to select from {0} as user {1}")
1390
1391    def check_system_catalog_helper(self):
1392        """
1393        These test might fail for now as system catalog tables are not
1394        fully implemented based on query PM's doc.
1395        :return:
1396        """
1397        self.system_catalog_helper_delete_for_upgrade()
1398        self.system_catalog_helper_select_for_upgrade()
1399
1400    def query_assert_success(self, query):
1401        self.query = query
1402        res = self.run_cbq_query(query=self.query)
1403        self.assertEqual(res['status'], 'success')
1404
1405    def system_catalog_helper_select_for_upgrade(self):
1406        for query in ['select * from system:datastores', 'select * from system:namespaces',
1407                      'select * from system:keyspaces']:
1408            self.query_assert_success(query)
1409        self.query = 'create index idx1 on {0}(name)'.format(self.buckets[0].name)
1410        res = self.run_cbq_query(query=query)
1411        self.sleep(10)
1412        for query in ['select * from system:indexes', 'select * from system:dual',
1413                      "prepare st1 from select * from {0} union select * from {0} union select * from {0}".format(self.buckets[0].name),
1414                      'execute st1']:
1415            self.query_assert_success(query)
1416
1417    def system_catalog_helper_delete_for_upgrade(self):
1418        self.queries = ['delete from system:datastores', 'delete from system:namespaces', 'delete from system:keyspaces',
1419                        'delete from system:indexes', 'delete from system:user_info', 'delete from system:nodes',
1420                        'delete from system:applicable_roles']
1421        for query in self.queries:
1422            try:
1423                self.run_cbq_query(query=query)
1424            except Exception, ex:
1425                self.log.error(ex)
1426                self.assertNotEqual(str(ex).find("'code': 11003"), -1)
1427        try:
1428            query = 'delete from system:dual'
1429            self.run_cbq_query(query=query)
1430        except Exception,ex:
1431            self.log.error(ex)
1432            self.assertNotEqual(str(ex).find("'code': 11000"), -1)
1433
1434        queries = ['delete from system:completed_requests', 'delete from system:active_requests where state!="running"',
1435                   'delete from system:prepareds']
1436        for query in queries:
1437            res = self.run_cbq_query(query=query)
1438            self.assertEqual(res['status'], 'success')
1439
1440        queries = ['select * from system:completed_requests', 'select * from system:active_requests',
1441                   'select * from system:prepareds']
1442        for query in queries:
1443            res = self.run_cbq_query(query=query)
1444            self.assertEqual(res['status'], 'success')
1445
    def change_and_verify_pre_upgrade_ldap_users_permissions(self):
        """Grant each pre-upgrade LDAP user the query role it needs
        (select/insert/update/delete/system_catalog) and verify the
        corresponding statement succeeds via curl."""
        for bucket in self.buckets:
            # change permission of john_bucketadmin1 and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} limit 1'". \
                format('bucket0', 'password', self.master.ip,bucket.name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_select", 'bucket0', bucket.name, cmd,
                                              "Unable to select from {0} as user {1}")

            # change permission of john_bucketadminAll and verify its able to execute the correct query.
            cmd = "%s -u %s:%s http://%s:8093/query/service -d 'statement=INSERT INTO %s (KEY, VALUE) VALUES(\"1\", { \"value1\": \"one1\" })'" \
                  % (self.curl_path,'bucket0', 'password',self.master.ip,bucket.name)
            self.change_and_update_permission('with_bucket', "query_insert", 'bucket0', bucket.name, cmd,
                                              "Unable to insert into {0} as user {1}")

            # change permission of cluster_user and verify its able to execute the correct query.
            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d 'statement=UPDATE {3} a set name = '{4}' where " \
                  "name = '{5}' limit 1'".format('bucket0', 'password',self.master.ip,bucket.name,new_name,
                                                 old_name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_update", 'bucket0', bucket.name, cmd,
                                              "Unable to update  {0} as user {1}")

            #change permission of bucket0 and verify its able to execute the correct query.
            del_name = "employee-14"
            cmd = "{5} -u {0}:{1} http://{2}:8093/query/service -d 'statement=DELETE FROM {3} a WHERE name = '{4}''". \
                format('bucket0', 'password', self.master.ip, bucket.name, del_name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_delete", 'bucket0', bucket.name, cmd,
                                              "Unable to delete from {0} as user {1}")

            # change permission of cbadminbucket user and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:keyspaces'". \
                format('cbadminbucket','password', self.master.ip, bucket.name,self.curl_path)
            self.change_and_update_permission('without_bucket', "query_system_catalog", 'cbadminbucket',
                                              'cbadminbucket', cmd, "Unable to select from system:keyspaces as user {0}")
1481
1482    def create_ldap_auth_helper(self):
1483        """
1484        Helper function for creating ldap users pre-upgrade
1485        :return:
1486        """
1487        # not able to create bucket admin on passwordless bucket pre upgrade
1488        users = [
1489            {'id': 'john_bucketadminAll', 'name': 'john_bucketadminAll', 'password': 'password'},
1490            {'id': 'cluster_user','name':'cluster_user','password':'password'},
1491            {'id': 'read_user','name':'read_user','password':'password'},
1492            {'id': 'cadmin','name':'cadmin','password':'password'},]
1493        RbacBase().create_user_source(users, 'ldap', self.master)
1494        rolelist = [{'id': 'john_bucketadminAll', 'name': 'john_bucketadminAll','roles': 'bucket_admin[*]'},
1495                    {'id': 'cluster_user', 'name': 'cluster_user','roles': 'cluster_admin'},
1496                    {'id': 'read_user', 'name': 'read_user','roles': 'ro_admin'},
1497                    {'id': 'cadmin', 'name': 'cadmin','roles': 'admin'}]
1498        RbacBase().add_user_role(rolelist, RestConnection(self.master), 'ldap')
1499
    def verify_pre_upgrade_users_permissions_helper(self,test = ''):
        """Verify users created before the upgrade still hold their
        permissions afterwards: SELECT as bucket0 and cbadminbucket,
        system:keyspaces access, and INSERT/UPDATE/DELETE on every bucket.

        :param test: pass 'online_upgrade' to run the cbadminbucket SELECT
                     against the 'default' bucket instead of 'bucket0'.
        """
        # SELECT on bucket0 with the bucket's own credentials.
        cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} LIMIT 10'". \
            format('bucket0', 'password', self.master.ip,'bucket0',self.curl_path)
        self.change_and_update_permission(None, None, 'bucket0', 'bucket0', cmd, "Unable to select from {0} as user {1}")

        if test == 'online_upgrade':
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} LIMIT 10'". \
                format('cbadminbucket', 'password', self.master.ip,'default',self.curl_path)
        else:
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} LIMIT 10'". \
                format('cbadminbucket', 'password', self.master.ip,'bucket0',self.curl_path)

        self.change_and_update_permission(None, None, 'cbadminbucket', 'bucket0', cmd, "Unable to select from {0} as user {1}")

        # System catalog access as cbadminbucket.
        cmd = "{3} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from system:keyspaces'". \
            format('cbadminbucket', 'password', self.master.ip,self.curl_path)
        self.change_and_update_permission(None, None, 'cbadminbucket', 'system:keyspaces', cmd, "Unable to select from {0} as user {1}")

        for bucket in self.buckets:
            # INSERT, UPDATE and DELETE as bucket0 on every bucket.
            cmd = "%s -u %s:%s http://%s:8093/query/service -d " \
                  "'statement=INSERT INTO %s (KEY, VALUE) VALUES(\"5\", { \"value1\": \"one1\" })'"% \
                  (self.curl_path,'bucket0', 'password', self.master.ip, bucket.name)

            self.change_and_update_permission(None, None, 'bucket0', bucket.name, cmd, "Unable to insert into {0} as user {1}")

            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d " \
                  "'statement=UPDATE {3} a set name = '{4}' where name = '{5}' limit 1'". \
                format('bucket0', 'password', self.master.ip, bucket.name, new_name, old_name,self.curl_path)
            self.change_and_update_permission(None, None, 'bucket0', bucket.name, cmd, "Unable to update into {0} as user {1}")

            del_name = "employee-14"
            cmd = "{5} -u {0}:{1} http://{2}:8093/query/service -d 'statement=DELETE FROM {3} a WHERE name = '{4}''". \
                format('bucket0', 'password', self.master.ip, bucket.name, del_name,self.curl_path)
            self.change_and_update_permission(None, None, 'bucket0', bucket.name, cmd, "Unable to delete from {0} as user {1}")
1537
    def use_pre_upgrade_users_post_upgrade(self):
        """After the upgrade, verify pre-upgrade credentials still work:
        INSERT and UPDATE as cbadminbucket and SELECT with each bucket's
        own name as the user.

        NOTE(review): the user labels passed to change_and_update_permission
        ('johnInsert', 'johnUpdate') differ from the credentials in the curl
        commands ('cbadminbucket'); the label only feeds the failure message,
        but confirm this is intended.
        """
        for bucket in self.buckets:
            cmd = "%s -u %s:%s http://%s:8093/query/service -d " \
                  "'statement=INSERT INTO %s (KEY, VALUE) VALUES(\"test2\", { \"value1\": \"one1\" })'"% \
                  (self.curl_path,'cbadminbucket', 'password', self.master.ip, bucket.name)
            self.change_and_update_permission(None, None, 'johnInsert', bucket.name, cmd, "Unable to insert into {0} as user {1}")

            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d " \
                  "'statement=UPDATE {3} a set name = '{4}' where name = '{5}' limit 1'". \
                format('cbadminbucket', 'password', self.master.ip, bucket.name, new_name, old_name,self.curl_path)
            self.change_and_update_permission(None, None, 'johnUpdate', bucket.name, cmd, "Unable to update into {0} as user {1}")

            # SELECT authenticated as the bucket name itself.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} LIMIT 10'". \
                format(bucket.name, 'password', self.master.ip,bucket.name,self.curl_path)
            self.change_and_update_permission(None, None, bucket.name, bucket.name, cmd, "Unable to select from {0} as user {1}")
1555
    def change_permissions_and_verify_pre_upgrade_users(self):
        """Re-grant query roles (select/update/delete/system_catalog) to
        pre-upgrade users and verify each can execute its statement."""
        for bucket in self.buckets:
            # change permission of john_cluster and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} limit 1'". \
                format(bucket.name, 'password', self.master.ip, bucket.name, self.curl_path)
            self.change_and_update_permission('with_bucket', "query_select", bucket.name,
                                              bucket.name, cmd, "Unable to select from {0} as user {1}")

            # change permission of ro_non_ldap and verify its able to execute the correct query.
            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d 'statement=UPDATE {3} a set name = '{4}' where " \
                  "name = '{5}' limit 1'".format('cbadminbucket', 'readonlypassword',self.master.ip,bucket.name,new_name, old_name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_update", 'cbadminbucket',
                                              bucket.name, cmd, "Unable to update  {0} as user {1}")

            # change permission of john_admin and verify its able to execute the correct query.
            del_name = "employee-14"
            cmd = "{5} -u {0}:{1} http://{2}:8093/query/service -d " \
                  "'statement=DELETE FROM {3} a WHERE name = '{4}''". \
                format('cbadminbucket', 'password', self.master.ip, bucket.name, del_name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_delete", 'cbadminbucket',
                                              bucket.name, cmd, "Unable to update  {0} as user {1}")

            # change permission of bob user and verify its able to execute the correct query.
            # NOTE(review): this call reuses the DELETE `cmd` from above rather
            # than a system:keyspaces SELECT, despite the failure message —
            # looks like a copy/paste oversight; confirm intended behavior.
            self.change_and_update_permission('without_bucket', "query_system_catalog", 'cbadminbucket',
                                              bucket.name, cmd, "Unable to select from system:keyspaces as user {1}")
1584
    def change_permissions_and_verify_new_users(self):
        """Grant the newly-created users roles that differ from their names
        (e.g. bucket_admin to john_insert, query_insert to john_update) and
        verify each can execute the corresponding statement via curl."""
        for bucket in self.buckets:
            # change permission of john_insert and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} limit 1'". \
                format('john_insert', 'password', self.master.ip, bucket.name, self.curl_path)
            self.change_and_update_permission('with_bucket', "bucket_admin", 'john_insert',
                                              bucket.name, cmd, "Unable to select from {0} as user {1}")

            # change permission of john_update and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=INSERT INTO {3} values(\"k055\", 123  )' " \
                .format('john_update', 'password',self.master.ip,bucket.name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_insert", 'john_update',
                                              bucket.name, cmd, "Unable to insert into {0} as user {1}")

            # change permission of john_select and verify its able to execute the correct query.
            old_name = "employee-14"
            new_name = "employee-14-2"
            cmd = "{6} -u {0}:{1} http://{2}:8093/query/service -d 'statement=UPDATE {3} a set name = '{4}' where " \
                  "name = '{5}' limit 1'".format('john_select', 'password', self.master.ip, bucket.name, new_name,
                                                 old_name, self.curl_path)
            self.change_and_update_permission('without_bucket', "cluster_admin", 'john_select',
                                              bucket.name, cmd, "Unable to update  {0} as user {1}")
            # SELECT as john_select is verified directly (no further grant).
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} limit 1'". \
                format('john_select', 'password', self.master.ip,bucket.name,self.curl_path)
            output, error = self.shell.execute_command(cmd)
            self.assertTrue(any("success" in line for line in output), "Unable to select from {0} as user {1}".
                            format(bucket.name, 'john_select'))

            # change permission of john_select2 and verify its able to execute the correct query.
            del_name = "employee-14"
            cmd = "{5} -u {0}:{1} http://{2}:8093/query/service -d 'statement=DELETE FROM {3} a WHERE name = '{4}''". \
                format('john_select2', 'password', self.master.ip, bucket.name, del_name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_delete", 'john_select2',
                                              bucket.name, cmd, "Unable to delete from {0} as user {1}")

            # change permission of john_delete and verify its able to execute the correct query.
            cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement=SELECT * from {3} limit 1'". \
                format('john_delete', 'password', self.master.ip,bucket.name,self.curl_path)
            self.change_and_update_permission('with_bucket', "query_select", 'john_delete',
                                              bucket.name, cmd, "Unable to select from {0} as user {1}")
1625
    def create_users(self, users=None):
        """
        Create the given users as local ('builtin') accounts on the master node.

        NOTE(review): an identical `create_users` is defined again further down
        in this class (in the n1ql_rbac_2 helpers section); that later
        definition shadows this one at class-creation time, so this copy is
        effectively dead — confirm before removing either.

        :param users: takes a list of {'id': 'xxx', 'name': 'some_name ,
                                        'password': 'passw0rd'}; defaults to self.users
        :return: Nothing
        """
        if not users:
            users = self.users
        RbacBase().create_user_source(users, 'builtin', self.master)
        self.log.info("SUCCESS: User(s) %s created" % ','.join([user['name'] for user in users]))
1636
1637    def create_users_before_upgrade_non_ldap(self):
1638        """
1639        password needs to be added statically for these users
1640        on the specific machine where ldap is enabled.
1641        """
1642        cli_cmd = "{0}couchbase-cli -c {1}:8091 -u Administrator -p password".format(self.path, self.master.ip)
1643        cmds = [("create a read only user account", cli_cmd+" user-manage --set --ro-username=ro_non_ldap --ro-password=readonlypassword"),
1644                ("create a bucket admin on bucket0 user account", cli_cmd+" admin-role-manage --set-users=bob --set-names=Bob --roles=bucket_admin[bucket0]"),
1645                ("create a bucket admin on all buckets user account", cli_cmd+" admin-role-manage --set-users=mary --set-names=Mary --roles=bucket_admin[*]"),
1646                ("create a cluster admin user account", cli_cmd+"admin-role-manage --set-users=john_cluster --set-names=john_cluster --roles=cluster_admin"),
1647                ("create a admin user account", cli_cmd+" admin-role-manage --set-users=john_admin --set-names=john_admin --roles=admin")]
1648        for cmd in cmds:
1649            self.log.info(cmd[0])
1650            self.shell.execute_command(cmd[1])
1651        users = [{'id': 'Bob', 'name': 'Bob', 'password': 'password', 'roles': 'admin'},
1652                 {'id': 'mary', 'name': 'Mary', 'password': 'password', 'roles': 'cluster_admin'},
1653                 {'id': 'john_cluster','name':'john_cluster','password':'password', 'roles': 'cluster_admin'},
1654                 {'id': 'ro_non_ldap','name':'ro_non_ldap','password':'readonlypassword', 'roles': 'ro_admin'},
1655                 {'id': 'john_admin','name':'john_admin','password':'password', 'roles': 'admin'}]
1656
1657        RbacBase().create_user_source(users, 'ldap', self.master)
1658        RbacBase().add_user_role(users, RestConnection(self.master), 'ldap')
1659
1660    def _perform_offline_upgrade(self):
1661        for server in self.servers:
1662            remote = RemoteMachineShellConnection(server)
1663            remote.stop_server()
1664            remote.disconnect()
1665            self.upgrade_servers.append(server)
1666        upgrade_threads = self._async_update(self.upgrade_to, self.servers)
1667        for upgrade_thread in upgrade_threads:
1668            upgrade_thread.join()
1669        self.sleep(20)
1670        self.add_built_in_server_user()
1671        self.sleep(20)
1672        self.upgrade_servers = self.servers
1673
    def _perform_online_upgrade_with_rebalance(self):
        """Online-upgrade the cluster one node at a time: for each node of each
        service named in the `nodes_upgrade_path` test parameter (dash-separated,
        e.g. "kv-index-n1ql"), rebalance the node out, upgrade it, and rebalance
        it back in with its original service list.
        """
        self.nodes_upgrade_path = self.input.param("nodes_upgrade_path", "").split("-")
        for service in self.nodes_upgrade_path:
            nodes = self.get_nodes_from_services_map(service_type=service, get_all_nodes=True)

            self.log.info("----- Upgrading all {0} nodes -----".format(service))
            for node in nodes:
                node_rest = RestConnection(node)
                node_info = "{0}:{1}".format(node.ip, node.port)
                node_services_list = node_rest.get_nodes_services()[node_info]
                # Single comma-joined string inside a list — the shape
                # async_rebalance(services=...) expects below.
                node_services = [",".join(node_services_list)]

                if "n1ql" in node_services_list:
                    # The node going out may be the query endpoint; repoint
                    # self.n1ql_node at another n1ql node if one exists.
                    n1ql_nodes = self.get_nodes_from_services_map(service_type="n1ql", get_all_nodes=True)
                    if len(n1ql_nodes) > 1:
                        for n1ql_node in n1ql_nodes:
                            if node.ip != n1ql_node.ip:
                                self.n1ql_node = n1ql_node
                                break

                self.log.info("Rebalancing the node out...")
                rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],[], [node])
                rebalance.result()
                active_nodes = []
                for active_node in self.servers:
                    if active_node.ip != node.ip:
                        active_nodes.append(active_node)
                self.log.info("Upgrading the node...")
                upgrade_th = self._async_update(self.upgrade_to, [node])
                for th in upgrade_th:
                    th.join()

                self.log.info("==== Upgrade Complete ====")
                self.log.info("Adding node back to cluster...")
                rebalance = self.cluster.async_rebalance(active_nodes, [node], [], services=node_services)
                rebalance.result()
                self.sleep(60)
                node_version = RestConnection(node).get_nodes_versions()
                self.log.info("{0} node {1} Upgraded to: {2}".format(service, node.ip, node_version))
1713
1714    def _perform_online_upgrade_with_failover(self):
1715        self.nodes_upgrade_path = self.input.param("nodes_upgrade_path", "").split("-")
1716        for service in self.nodes_upgrade_path:
1717            nodes = self.get_nodes_from_services_map(service_type=service, get_all_nodes=True)
1718
1719            self.log.info("----- Upgrading all {0} nodes -----".format(service))
1720            for node in nodes:
1721                node_rest = RestConnection(node)
1722                node_info = "{0}:{1}".format(node.ip, node.port)
1723                node_services_list = node_rest.get_nodes_services()[node_info]
1724                node_services = [",".join(node_services_list)]
1725
1726                self.log.info("Rebalancing the node out...")
1727                failover_task = self.cluster.async_failover([self.master], failover_nodes=[node], graceful=False)
1728                failover_task.result()
1729                active_nodes = []
1730                for active_node in self.servers:
1731                    if active_node.ip != node.ip:
1732                        active_nodes.append(active_node)
1733                self.log.info("Upgrading the node...")
1734                upgrade_th = self._async_update(self.upgrade_to, [node])
1735                for th in upgrade_th:
1736                    th.join()
1737
1738                self.log.info("==== Upgrade Complete ====")
1739                self.sleep(30)
1740
1741                self.log.info("Adding node back to cluster...")
1742                rest = RestConnection(self.master)
1743                nodes_all = rest.node_statuses()
1744                for cluster_node in nodes_all:
1745                    if cluster_node.ip == node.ip:
1746                        self.log.info("Adding Back: {0}".format(node))
1747                        rest.add_back_node(cluster_node.id)
1748                        rest.set_recovery_type(otpNode=cluster_node.id, recoveryType="full")
1749
1750                self.log.info("Adding node back to cluster...")
1751                rebalance = self.cluster.async_rebalance(active_nodes, [], [])
1752                rebalance.result()
1753                self.sleep(60)
1754                node_version = RestConnection(node).get_nodes_versions()
1755                self.log.info("{0} node {1} Upgraded to: {2}".format(service, node.ip, node_version))
1756
1757##############################################################################################
1758#
1759# n1ql_rbac_2.py helpers
1760# Again very specific, some things are generalizable, perhaps rbac should have its own query base test
1761#
1762##############################################################################################
1763    def create_users(self, users=None):
1764        """
1765        :param user: takes a list of {'id': 'xxx', 'name': 'some_name ,
1766                                        'password': 'passw0rd'}
1767        :return: Nothing
1768        """
1769        if not users:
1770            users = self.users
1771        RbacBase().create_user_source(users,'builtin',self.master)
1772        self.log.info("SUCCESS: User(s) %s created" % ','.join([user['name'] for user in users]))
1773
1774    def assign_role(self, rest=None, roles=None):
1775        if not rest:
1776            rest = RestConnection(self.master)
1777        #Assign roles to users
1778        if not roles:
1779            roles = self.roles
1780        RbacBase().add_user_role(roles, rest,'builtin')
1781        for user_role in roles:
1782            self.log.info("SUCCESS: Role(s) %s assigned to %s"
1783                          %(user_role['roles'], user_role['id']))
1784
1785    def delete_role(self, rest=None, user_ids=None):
1786        if not rest:
1787            rest = RestConnection(self.master)
1788        if not user_ids:
1789            user_ids = [user['id'] for user in self.roles]
1790        RbacBase().remove_user_role(user_ids, rest)
1791        self.sleep(20, "wait for user to get deleted...")
1792        self.log.info("user roles revoked for %s" % ", ".join(user_ids))
1793
1794    def get_user_list(self):
1795        """
1796        :return:  a list of {'id': 'userid', 'name': 'some_name ,
1797        'password': 'passw0rd'}
1798        """
1799        user_list = []
1800        for user in self.inp_users:
1801            user_list.append({att: user[att] for att in ('id', 'name', 'password')})
1802        return user_list
1803
1804    def get_user_role_list(self):
1805        """
1806        :return:  a list of {'id': 'userid', 'name': 'some_name ,
1807         'roles': 'admin:fts_admin[default]'}
1808        """
1809        user_role_list = []
1810        for user in self.inp_users:
1811            user_role_list.append({att: user[att] for att in ('id', 'name', 'roles')})
1812        return user_role_list
1813
1814    def retrieve_roles(self):
1815        return self.retrieve_rbac('roles')
1816
1817    def retrieve_users(self):
1818        return self.retrieve_rbac('users')
1819
1820    def retrieve_rbac(self, type):
1821        if type == 'users':
1822            url = "/settings/rbac/users"
1823            prepend = " Retrieve User Roles"
1824        if type == 'roles':
1825            url = "/settings/rbac/roles"
1826            prepend = " Retrieve all User roles"
1827        rest = RestConnection(self.master)
1828        api = rest.baseUrl + url
1829        status, content, header = rest._http_request(api, 'GET')
1830        self.log.info("{3} - Status - {0} -- Content - {1} -- Header - {2}".format(status, content, header, prepend))
1831        return status, content, header
1832
1833    def grant_role(self, role=None):
1834        if not role:
1835            role = self.roles[0]['roles']
1836        if self.all_buckets:
1837            list = []
1838            for bucket in self.buckets:
1839                list.append(bucket.name)
1840            names = ','.join(list)
1841            self.query = "GRANT {0} on {1} to {2}".format(role,names, self.users[0]['id'])
1842            actual_result = self.run_cbq_query()
1843        elif "," in role:
1844            roles = role.split(",")
1845            for role in roles:
1846                role1 = role.split("(")[0]
1847                name = role.split("(")[1][:-1]
1848                self.query = "GRANT {0} on {1} to {2}".format(role1,name, self.users[0]['id'])
1849                actual_result =self.run_cbq_query()
1850        elif "(" in role:
1851            role1 = role.split("(")[0]
1852            name = role.split("(")[1][:-1]
1853            self.query = "GRANT {0} on {1} to {2}".format(role1,name, self.users[0]['id'])
1854            actual_result = self.run_cbq_query()
1855        else:
1856            self.query = "GRANT {0} to {1}".format(role, self.users[0]['id'])
1857            actual_result = self.run_cbq_query()
1858        msg = "Unable to grant role {0} to {1}".format(role, self.users[0]['id'])
1859        self.assertTrue(actual_result['status'] == 'success', msg)
1860
1861    def revoke_role(self, role=None):
1862        if not role:
1863            role = self.roles[0]['roles']
1864            if self.all_buckets:
1865                role += "(`*`)"
1866        self.query = "REVOKE {0} FROM {1}".format(role, self.users[0]['id'])
1867        actual_result = self.run_cbq_query()
1868        msg = "Unable to revoke role {0} from {1}".format(role, self.users[0]['id'])
1869        self.assertTrue(actual_result['status'] == 'success', msg)
1870
1871    def curl_with_roles(self, query):
1872        shell = RemoteMachineShellConnection(self.master)
1873        cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement={3}'". \
1874            format(self.users[0]['id'], self.users[0]['password'], self.master.ip, query, self.curl_path)
1875        output, error = shell.execute_command(cmd)
1876        shell.log_command_output(output, error)
1877        new_list = [string.strip() for string in output]
1878        concat_string = ''.join(new_list)
1879        json_output = json.loads(concat_string)
1880        try:
1881            return json_output
1882        except ValueError:
1883            return error
1884
    def system_catalog_helper_select(self, test, role=""):
        """Exercise SELECT queries (plus index create/drop DDL) against the
        system catalog as the current test user, asserting the expected
        outcome for each RBAC `role`.

        :param test: name of the calling test; unused here, kept for
            signature parity with the other system_catalog_helper_* methods.
        :param role: role string under test, e.g. "query_select(default)";
            selects which assertion branch applies.

        NOTE(review): the substring checks on "'code': 13014" (insufficient
        permission) and "'code': 4040" (unknown prepared statement) match the
        stringified JSON error response — confirm codes against the server
        version under test.
        """
        # system:datastores and system:namespaces are visible to everyone.
        res = self.curl_with_roles('select * from system:datastores')
        self.assertTrue(res['metrics']['resultCount'] == 1)
        res = self.curl_with_roles('select * from system:namespaces')
        self.assertTrue(res['metrics']['resultCount'] == 1)
        res = self.curl_with_roles('select * from system:keyspaces')

        # Visible keyspace count depends on which buckets the role can see.
        if role in ["query_update(default)", "query_delete(default)", "query_insert(default)"]:
            self.assertTrue(res['status'] == 'success')
        elif role.startswith("query_") or role.startswith("select") or role in ["bucket_full_access(default)", "query_delete(default)"]:
            self.assertTrue(res['metrics']['resultCount'] == 1)
        else:
            self.assertTrue(res['metrics']['resultCount'] == 2)

        # Index DDL: failures here are expected for some roles, hence the
        # log-and-continue handling.
        self.query = 'create primary index on {0}'.format(self.buckets[0].name)
        try:
            self.curl_with_roles(self.query)
        except Exception, ex:
            self.log.error(ex)

        if role not in ["query_insert(default)", "query_update(default)", "query_delete(default)"]:
            self.query = 'create primary index on {0}'.format(self.buckets[1].name)
            try:
                self.curl_with_roles(self.query)
            except Exception, ex:
                self.log.error(ex)

        if role not in ["views_admin(standard_bucket0)", "views_admin(default)", "query_insert(default)",
                        "query_update(default)", "query_delete(default)"]:
            self.query = 'create index idx1 on {0}(name)'.format(self.buckets[0].name)
            res = self.curl_with_roles(self.query)
            self.sleep(10)
            self.query = 'create index idx2 on {0}(name)'.format(self.buckets[1].name)
            self.curl_with_roles(self.query)
            self.sleep(10)
            self.query = 'select * from system:indexes'
            res = self.curl_with_roles(self.query)

        # NOTE(review): if the branch above was skipped, `res` still holds the
        # system:keyspaces response here — verify that is intended.
        if role in ["admin", "cluster_admin", "bucket_admin"]:
            self.assertTrue(res['metrics']['resultCount'] == 4)
        elif role in ["bucket_admin(default)", "bucket_admin(standard_bucket0)", "query_system_catalog", "ro_admin", "replication_admin"]:
            self.assertTrue(res['status'] == 'success')

        self.query = 'select * from system:dual'
        res = self.curl_with_roles(self.query)
        self.assertTrue(res['metrics']['resultCount'] == 1)
        self.query = 'select * from system:user_info'
        res = self.curl_with_roles(self.query)

        if role == "admin":
            self.assertTrue(res['status'] == 'success')
        elif role == "cluster_admin":
            self.assertTrue(str(res).find("'code': 13014") != -1)

        self.query = 'select * from system:nodes'
        res = self.curl_with_roles(self.query)

        if role == "bucket_full_access(default)":
            self.assertTrue(res['status'] == 'stopped')
        elif role in ["select(default)", "query_select(default)", "select(standard_bucket0)", "query_select(standard_bucket0)"]:
            self.assertTrue(str(res).find("'code': 13014") != -1)
        elif role in ["insert(default)", "query_insert(default)", "query_update(default)", "query_delete(default)"]:
            self.assertTrue(res['status'] == 'stopped')
        else:
            self.assertTrue(res['status'] == 'success')

        self.query = 'select * from system:applicable_roles'
        res = self.curl_with_roles(self.query)

        if role == "admin":
            self.assertTrue(res['status'] == 'success')
        elif role == "ro_admin":
            self.assertTrue(res['status'] == 'success')
        elif role == "cluster_admin" or role == "bucket_admin(default)":
            self.assertTrue(str(res).find("'code': 13014") != -1)

        # if (role == "query_insert(default)" or role == "query_delete(default)" or role
        # == "query_update(default)"):
        #     self.assertTrue(res['status']=='stopped')
        # elif(role == "bucket_admin(standard_bucket0)" or role == "views_admin(
        # standard_bucket0)" or role == "views_admin(default)" or role == "views_admin"
        # or role == "replication_admin" or role == "query_system_catalog" or role ==
        # "ro_admin"):
        #     self.assertTrue(str(res).find("'code': 13014")!=-1)
        # else:
        #     self.assertTrue(res['metrics']['resultCount']> 0)
        # Prepared-statement checks (roles that cannot SELECT are excluded).
        if role not in ["ro_admin", "replication_admin", "query_insert(default)", "query_delete(default)",
                        "query_update(default)", "bucket_full_access(default)", "query_system_catalog", "views_admin(default)"]:
            self.query = "prepare st1 from select * from default union select * from default union select * from default"
            res = self.curl_with_roles(self.query)
            self.query = 'execute st1'
            res = self.curl_with_roles(self.query)
            if role in ["bucket_admin(standard_bucket0)", "views_admin(standard_bucket0)", "replication_admin"]:
                self.assertTrue(str(res).find("'code': 4040") != -1)
            elif role == "select(default)" or role == "query_select(default)":
                self.assertTrue(res['metrics']['resultCount'] == 0)
            else:
                self.assertTrue(res['status'] == 'success')

            if role not in ["query_insert(default)", "query_delete(default)", "query_update(default)"]:
                # st2 spans two buckets, so single-bucket roles must fail.
                self.query = "prepare st2 from select * from default union select * from " \
                             "standard_bucket0 union select * from default"
                res = self.curl_with_roles(self.query)

                if role in ["bucket_admin(standard_bucket0)", "views_admin(standard_bucket0)",
                            "views_admin(default)", "views_admin", "bucket_admin(default)", "replication_admin",
                            "query_system_catalog", "select(default)", "query_select(default)"]:
                    self.assertTrue(str(res).find("'code': 13014") != -1)
                else:
                    self.assertTrue(res['metrics']['resultCount'] > 0)

                self.query = 'execute st2'
                res = self.curl_with_roles(self.query)
                if role in ["bucket_admin(standard_bucket0)", "views_admin(standard_bucket0)", "views_admin(default)",
                            "views_admin", "bucket_admin(default)", "replication_admin", "query_system_catalog",
                            "select(default)", "query_select(default)"]:
                    self.assertTrue(str(res).find("'code': 4040") != -1)
                else:
                    self.assertTrue(res['status'] == 'success')

                self.query = 'select * from system:completed_requests'
                res = self.curl_with_roles(self.query)

                if role == "select(default)" or role == "query_select(default)":
                    self.assertTrue(str(res).find("'code': 13014") != -1)
                elif role == "bucket_admin(standard_bucket0)":
                    self.assertTrue(res['metrics']['resultCount'] > 0)
                else:
                    self.assertTrue(res['status'] == 'success')

        # Monitoring keyspaces, then index cleanup.
        if role not in ["query_insert(default)", "query_delete(default)", "query_update(default)",
                        "bucket_full_access(default)", "ro_admin"]:
            self.query = 'select * from system:prepareds'
            res = self.curl_with_roles(self.query)

            if role == "select(default)" or role == "query_select(default)":
                self.assertTrue(str(res).find("'code': 13014") != -1)
            else:
                self.assertTrue(res['status'] == 'success')

            self.query = 'select * from system:active_requests'
            res = self.curl_with_roles(self.query)

            if role == "select(default)" or role == "query_select(default)":
                self.assertTrue(str(res).find("'code': 13014") != -1)
            else:
                self.assertTrue(res['metrics']['resultCount'] > 0)

            self.query = 'drop index {0}.idx1'.format(self.buckets[0].name)
            res = self.curl_with_roles(self.query)
            self.query = 'drop index {0}.idx2'.format(self.buckets[1].name)
            res = self.curl_with_roles(self.query)
            self.query = 'select * from system:indexes'
            res = self.curl_with_roles(self.query)

        if role == "views_admin(default)":
            self.assertTrue(res['status'] == 'success')
        elif role in ["bucket_admin(standard_bucket0)", "bucket_admin(default)", "select(default)", "query_select(default)"]:
            self.assertTrue(res['metrics']['resultCount'] == 1)
        elif role in ["query_insert(default)", "query_delete(default)", "query_update(default)"]:
            self.assertTrue(res['metrics']['resultCount'] == 0)
            # elif (role == "ro_admin"):
            #     self.assertTrue(res['metrics']['resultCount']==2)
2048
2049    def try_query_assert(self, query, find_string):
2050        try:
2051            self.curl_with_roles(query)
2052        except Exception as ex:
2053            self.log.error(ex)
2054            self.assertTrue(str(ex).find(find_string) != -1)
2055
2056    def system_catalog_helper_insert(self, test, role=""):
2057        self.try_query_assert('insert into system:datastores values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2058        self.try_query_assert('insert into system:namespaces values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2059        self.try_query_assert('insert into system:keyspaces values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2060        self.try_query_assert('insert into system:indexes values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2061        self.try_query_assert('insert into system:dual values("k051", { "id":123  } )', "System datastore error Mutations not allowed on system:dual.")
2062        self.try_query_assert('insert into system:user_info values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2063        self.try_query_assert('insert into system:nodes values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2064        self.try_query_assert('insert into system:applicable_roles values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2065        self.try_query_assert('insert into system:prepareds values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2066        self.try_query_assert('insert into system:completed_requests values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2067        self.try_query_assert('insert into system:active_requests values("k051", { "id":123  } )', "System datastore :  Not implemented ")
2068
2069    def system_catalog_helper_update(self, test, role=""):
2070        self.try_query_assert('update system:datastores use keys "%s" set name="%s"' % ("id", "test"), "'code': 11000")
2071        self.try_query_assert('update system:namespaces use keys "%s" set name="%s"' % ("id", "test"), "'code': 11003")
2072        self.try_query_assert('update system:keyspaces use keys "%s" set name="%s"' % ("id", "test"), "'code': 11003")
2073        self.try_query_assert('update system:indexes use keys "%s" set name="%s"' % ("id", "test"), "'code': 11003")
2074        self.try_query_assert('update system:dual use keys "%s" set name="%s"' % ("id", "test"), "'code': 11003")
2075        self.try_query_assert('update system:user_info use keys "%s" set name="%s"' % ("id", "test"), "'code': 5200")
2076        self.try_query_assert('update system:nodes use keys "%s" set name="%s"' % ("id", "test"), "'code': 11003}")
2077        # panic seen here as of now,hence commenting it out for now.
2078        self.try_query_assert('update system:applicable_roles use keys "%s" set name="%s"' % ("id", "test"), "'code': 11000")
2079        self.try_query_assert('update system:active_requests use keys "%s" set name="%s"' % ("id", "test"), "'code': 11000")
2080        self.try_query_assert('update system:completed_requests use keys "%s" set name="%s"' % ("id", "test"), "'code': 11000")
2081        self.try_query_assert('update system:prepareds use keys "%s" set name="%s"' % ("id", "test"), "'code': 11000")
2082
2083    # Query does not support drop these tables or buckets yet.We can add the test once it
2084    #  is supported.
2085    # Right now we cannot compare results in assert.
2086    # def system_catalog_helper_drop(self,query_params_with_roles,test = ""):
2087    #     self.query = 'drop system:datastores'
2088    #     res = self.run_cbq_query()
2089    #     print res
2090    #     self.query = 'drop system:namespaces'
2091    #     res = self.run_cbq_query()
2092    #     print res
2093    #     self.query = 'drop system:keyspaces'
2094    #     res = self.run_cbq_query()
2095    #     print res
2096    #     self.query = 'drop system:indexes'
2097    #     res = self.run_cbq_query()
2098    #     print res
2099    #     self.query = 'drop system:dual'
2100    #     res = self.run_cbq_query()
2101    #     print res
2102    #     self.query = 'drop system:user_info'
2103    #     res = self.run_cbq_query()
2104    #     print res
2105    #     self.query = 'drop system:nodes'
2106    #     res = self.run_cbq_query()
2107    #     print res
2108    #     self.query = 'drop system:applicable_roles'
2109    #     res = self.run_cbq_query()
2110    #     print res
2111    #     self.query = 'drop system:prepareds'
2112    #     res = self.run_cbq_query()
2113    #     print res
2114    #     self.query = 'drop system:completed_requests'
2115    #     res = self.run_cbq_query()
2116    #     print res
2117    #     self.query = 'drop system:active_requests'
2118    #     res = self.run_cbq_query()
2119    #     print res
2120
2121    def query_with_roles(self, query, find_string):
2122        self.query = query
2123        res = self.curl_with_roles(self.query)
2124        self.assertTrue(str(res).find(find_string) != -1)
2125
2126    def system_catalog_helper_delete(self, test, role="admin"):
2127        self.query_with_roles('delete from system:datastores', "'code': 11003")
2128        self.query_with_roles('delete from system:namespaces', "'code': 11003")
2129        # To be fixed in next version
2130        # self.query_with_roles('delete from system:keyspaces', "'code': 11003")
2131        self.query_with_roles('delete from system:indexes', "'code': 11003")
2132        self.query_with_roles('delete from system:dual', "'code': 11000")
2133
2134        self.query_with_roles('delete from system:user_info', "'code': 11003")
2135        self.query_with_roles('delete from system:nodes', "'code': 11003")
2136        self.query_with_roles('delete from system:applicable_roles', "'code': 11003")
2137        self.query = 'delete from system:completed_requests'
2138        res = self.curl_with_roles(self.query)
2139        role_list = ["query_delete(default)", "query_delete(standard_bucket0)", "delete(default)", "bucket_full_access(default)"]
2140        self.assertNotEquals(res['status'], 'success') if role in role_list else self.assertTrue(res['status'] == 'success')
2141        self.query = 'delete from system:active_requests'
2142        res = self.curl_with_roles(self.query)
2143        self.assertTrue(res['status'] == 'stopped')
2144        if role not in role_list:
2145            self.query = 'delete from system:prepareds'
2146            res = self.curl_with_roles(self.query)
2147            self.assertTrue(res['status'] == 'success')
2148
2149    def select_my_user_info(self):
2150        self.query = 'select * from system:my_user_info'
2151        res = self.curl_with_roles(self.query)
2152        self.assertTrue(res['status'] == 'success')
2153
2154##############################################################################################
2155#
2156#  tuq_curl.py and tuq_curl_whitelist.py helpers
2157#
2158##############################################################################################
2159
2160    '''Convert output of remote_util.execute_commands_inside to json'''
2161    def convert_to_json(self, output_curl):
2162        new_curl = "{" + output_curl
2163        json_curl = json.loads(new_curl)
2164        return json_curl
2165
2166    '''Convert output of remote_util.execute_command to json
2167       (stripping all white space to match execute_command_inside output)'''
2168    def convert_list_to_json(self, output_of_curl):
2169        new_list = [string.replace(" ", "") for string in output_of_curl]
2170        concat_string = ''.join(new_list)
2171        json_output = json.loads(concat_string)
2172        return json_output
2173
2174    '''Convert output of remote_util.execute_command to json to match the output of run_cbq_query'''
2175    def convert_list_to_json_with_spacing(self,output_of_curl):
2176        new_list = [string.strip() for string in output_of_curl]
2177        concat_string = ''.join(new_list)
2178        json_output = json.loads(concat_string)
2179        return json_output
2180
2181##############################################################################################
2182#
2183#  tuq_ascdesc.py helper
2184#
2185##############################################################################################
2186    def compare(self, test, query, expected_result_list):
2187        actual_result_list = []
2188        actual_result = self.run_cbq_query(query)
2189        for i in xrange(0, 5):
2190            if test in ["test_asc_desc_composite_index", "test_meta", "test_asc_desc_array_index"]:
2191                actual_result_list.append(actual_result['results'][i]['default']['_id'])
2192            elif test in ["test_desc_isReverse_ascOrder"]:
2193                actual_result_list.append(actual_result['results'][i]['id'])
2194        self.assertEqual(actual_result_list, expected_result_list)
2195        query = query.replace("from default", "from default use index(`#primary`)")
2196        expected_result = self.run_cbq_query(query)
2197        self.assertEqual(actual_result['results'], expected_result['results'])
2198
2199##############################################################################################
2200#
2201#  tuq_advancedcbqshell.py helpers
2202#
2203##############################################################################################
2204    def execute_commands_inside(self, main_command, query, queries, bucket1, password, bucket2, source,
2205                                subcommands=[], min_output_size=0,
2206                                end_msg='', timeout=250):
2207        shell = RemoteMachineShellConnection(self.master)
2208        shell.extract_remote_info()
2209        filename = "/tmp/test2"
2210        iswin = False
2211
2212        if shell.info.type.lower() == 'windows':
2213            iswin = True
2214            filename = "/cygdrive/c/tmp/test.txt"
2215
2216        filedata = ""
2217        if not (query == ""):
2218            main_command = main_command + " -s=\"" + query + '"'
2219        elif (shell.remote and not (queries == "")):
2220            sftp = shell._ssh_client.open_sftp()
2221            filein = sftp.open(filename, 'w')
2222            for query in queries:
2223                filein.write(query)
2224                filein.write('\n')
2225            fileout = sftp.open(filename, 'r')
2226            filedata = fileout.read()
2227            fileout.close()
2228        elif not (queries == ""):
2229            f = open(filename, 'w')
2230            for query in queries:
2231                f.write(query)
2232                f.write('\n')
2233            f.close()
2234            fileout = open(filename, 'r')
2235            filedata = fileout.read()
2236            fileout.close()
2237
2238        newdata = filedata.replace("bucketname", bucket2)
2239        newdata = newdata.replace("user", bucket1)
2240        newdata = newdata.replace("pass", password)
2241        newdata = newdata.replace("bucket1", bucket1)
2242
2243        newdata = newdata.replace("user1", bucket1)
2244        newdata = newdata.replace("pass1", password)
2245        newdata = newdata.replace("bucket2", bucket2)
2246        newdata = newdata.replace("user2", bucket2)
2247        newdata = newdata.replace("pass2", password)
2248
2249        if (shell.remote and not (queries == "")):
2250            f = sftp.open(filename, 'w')
2251            f.write(newdata)
2252            f.close()
2253        elif not (queries == ""):
2254            f = open(filename, 'w')
2255            f.write(newdata)
2256            f.close()
2257        if not (queries == ""):
2258            if (source):
2259                if iswin:
2260                    main_command = main_command + "  -s=\"\SOURCE " + 'c:\\\\tmp\\\\test.txt'
2261                else:
2262                    main_command = main_command + "  -s=\"\SOURCE " + filename + '"'
2263            else:
2264                if iswin:
2265                    main_command = main_command + " -f=" + 'c:\\\\tmp\\\\test.txt'
2266                else:
2267                    main_command = main_command + " -f=" + filename
2268
2269        self.log.info("running command on {0}: {1}".format(self.master.ip, main_command))
2270        output = ""
2271        if shell.remote:
2272            stdin, stdout, stderro = shell._ssh_client.exec_command(main_command)
2273            time.sleep(20)
2274            count = 0
2275            for line in stdout.readlines():
2276                if (count >= 0):
2277                    output += line.strip()
2278                    output = output.strip()
2279                    if "Inputwasnotastatement" in output:
2280                        output = "status:FAIL"
2281                        break
2282                    if "timeout" in output:
2283                        output = "status:timeout"
2284                else:
2285                    count += 1
2286            stdin.close()
2287            stdout.close()
2288            stderro.close()
2289        else:
2290            p = Popen(main_command, shell=True, stdout=PIPE, stderr=PIPE)
2291            stdout, stderro = p.communicate()
2292            output = stdout
2293            print output
2294            time.sleep(1)
2295        if (shell.remote and not (queries == "")):
2296            sftp.remove(filename)
2297            sftp.close()
2298        elif not (queries == ""):
2299            os.remove(filename)
2300
2301        return (output)
2302
2303##############################################################################################
2304#
2305#  date_time_functions.py helpers
#   These helpers are specific to the date_time_functions.py tests and should probably move back into that module.
2307##############################################################################################
2308
2309    def _generate_date_part_millis_query(self, expression, part, timezone=None):
2310        if not timezone:
2311            query = 'SELECT DATE_PART_MILLIS({0}, "{1}")'.format(expression, part)
2312        else:
2313            query = 'SELECT DATE_PART_MILLIS({0}, "{1}", "{2}")'.format(expression, part, timezone)
2314        return query
2315
2316    def _generate_date_format_str_query(self, expression, format):
2317        query = 'SELECT DATE_FORMAT_STR("{0}", "{1}")'.format(expression, format)
2318        return query
2319
2320    def _generate_date_range_str_query(self, initial_date, final_date, part, increment=None):
2321        if increment is None:
2322            query = 'SELECT DATE_RANGE_STR("{0}", "{1}", "{2}")'.format(initial_date, final_date, part)
2323        else:
2324            query = 'SELECT DATE_RANGE_STR("{0}", "{1}", "{2}", {3})'.format(initial_date, final_date, part, increment)
2325        return query
2326
2327    def _generate_date_range_millis_query(self, initial_millis, final_millis, part, increment=None):
2328        if increment is None:
2329            query = 'SELECT DATE_RANGE_MILLIS({0}, {1}, "{2}")'.format(initial_millis, final_millis, part)
2330        else:
2331            query = 'SELECT DATE_RANGE_MILLIS({0}, {1}, "{2}", {3})'.format(initial_millis, final_millis, part, increment)
2332        return query
2333
2334    def _convert_to_millis(self, expression):
2335        query = 'SELECT STR_TO_MILLIS("{0}")'.format(expression)
2336        results = self.run_cbq_query(query)
2337        return results["results"][0]["$1"]
2338
2339    def _is_date_part_present(self, expression):
2340        return (len(expression.split("-")) > 1)
2341
2342    def _is_time_part_present(self, expression):
2343        return (len(expression.split(":")) > 1)
2344
2345##############################################################################################
2346#
2347#  n1ql_options.py helpers
2348#
2349##############################################################################################
2350    def curl_helper(self, statement):
2351        cmd = "{4} -u {0}:{1} http://{2}:8093/query/service -d 'statement={3}'". \
2352            format('Administrator', 'password', self.master.ip, statement, self.curl_path)
2353        return self.run_helper_cmd(cmd)
2354
2355    def prepare_helper(self, statement):
2356        cmd = '{4} -u {0}:{1} http://{2}:8093/query/service -d \'prepared="{3}"&$type="Engineer"&$name="employee-4"\''. \
2357            format('Administrator', 'password', self.master.ip, statement, self.curl_path)
2358        return self.run_helper_cmd(cmd)
2359
2360    def prepare_helper2(self, statement):
2361        cmd = '{4} -u {0}:{1} http://{2}:8093/query/service -d \'prepared="{3}"&args=["Engineer","employee-4"]\''. \
2362            format('Administrator', 'password', self.master.ip, statement, self.curl_path)
2363        return self.run_helper_cmd(cmd)
2364
2365    def run_helper_cmd(self, cmd):
2366        shell = RemoteMachineShellConnection(self.master)
2367        output, error = shell.execute_command(cmd)
2368        new_list = [string.strip() for string in output]
2369        concat_string = ''.join(new_list)
2370        json_output = json.loads(concat_string)
2371        return json_output
2372
2373##############################################################################################
2374#
2375#  n1ql_ro_user.py helpers
2376#
2377##############################################################################################
2378    def _kill_all_processes_cbq(self):
2379        if hasattr(self, 'shell'):
2380            o = self.shell.execute_command("ps -aef| grep cbq-engine")
2381            if len(o):
2382                for cbq_engine in o[0]:
2383                    if cbq_engine.find('grep') == -1:
2384                        pid = [item for item in cbq_engine.split(' ') if item][1]
2385                        self.shell.execute_command("kill -9 %s" % pid)
2386
2387##############################################################################################
2388#
2389#   tuq_views_ops.py helpers
2390##############################################################################################
2391
2392    def _compare_view_and_tool_result(self,