1import paramiko
2import logger
3import json
4import uuid
5import math
6import re
7import time
8import testconstants
9import datetime
10import time
11from datetime import date
12from couchbase_helper.tuq_generators import TuqGenerators
13from remote.remote_util import RemoteMachineShellConnection
14from membase.api.exception import CBQError, ReadDocumentException
15from membase.api.rest_client import RestConnection
16
17class AnalyticsHelper():
18    def __init__(self, version = None, master = None, shell = None, use_rest = None, max_verify = 0, buckets = [],
19        item_flag = 0,  analytics_port = 8095, n1ql_port = 8093, full_docs_list = [], log = None, input = None,database = None):
20        self.version = version
21        self.shell = shell
22        self.n1ql_port = n1ql_port
23        self.max_verify = max_verify
24        self.buckets = buckets
25        self.item_flag = item_flag
26        self.analytics_port = analytics_port
27        self.input = input
28        self.log = log
29        self.use_rest = True
30        self.full_docs_list = full_docs_list
31        self.master = master
32        self.database = database
33        if self.full_docs_list and len(self.full_docs_list) > 0:
34            self.gen_results = TuqGenerators(self.log, self.full_docs_list)
35
36
37    def killall_tuq_process(self):
38        self.shell.execute_command("killall cbq-engine")
39        self.shell.execute_command("killall tuqtng")
40        self.shell.execute_command("killall indexer")
41
42    def run_query_from_template(self, query_template):
43        self.query = self.gen_results.generate_query(query_template)
44        expected_result = self.gen_results.generate_expected_result()
45        actual_result = self.run_analytics_query()
46        return actual_result, expected_result
47
48    def run_analytics_query(self, query=None, min_output_size=10, server=None, query_params = {}, is_prepared=False, scan_consistency = None, scan_vector = None, verbose= False):
49        if query is None:
50            query = self.query
51        if server is None:
52           server = self.master
53           if server.ip == "127.0.0.1":
54            self.analytics_port = server.analytics_port
55        else:
56            if server.ip == "127.0.0.1":
57                self.analytics_port = server.analytics_port
58            if self.input.tuq_client and "client" in self.input.tuq_client:
59                server = self.tuq_client
60        if self.analytics_port == None or self.analytics_port == '':
61            self.analytics_port = self.input.param("analytics_port", 8095)
62            if not self.analytics_port:
63                self.log.info(" analytics_port is not defined, processing will not proceed further")
64                raise Exception("analytics_port is not defined, processing will not proceed further")
65        cred_params = {'creds': []}
66        for bucket in self.buckets:
67            if bucket.saslPassword:
68                cred_params['creds'].append({'user': 'local:%s' % bucket.name, 'pass': bucket.saslPassword})
69        query_params.update(cred_params)
70        if self.use_rest:
71            query_params = {}
72            if scan_consistency:
73                query_params['scan_consistency']=  scan_consistency
74            if scan_vector:
75                query_params['scan_vector']=  str(scan_vector).replace("'", '"')
76            if verbose:
77                self.log.info('RUN QUERY %s' % query)
78            query = query + ";"
79            if "USE INDEX" in query:
80                query = query.replace("USE INDEX(`#primary` USING GSI)"," ")
81            for bucket in self.buckets:
82                query = query.replace(bucket.name+" ",bucket.name+"_shadow ")
83
84
85            self.log.info(" CBAS QUERY :: {0}".format(query))
86            result = RestConnection(server).analytics_tool(query, self.analytics_port, query_params=query_params, verbose = verbose)
87
88        if isinstance(result, str) or 'errors' in result:
89            error_result = str(result)
90            length_display = len(error_result)
91            if length_display > 500:
92                error_result = error_result[:500]
93            raise CBQError(error_result, server.ip)
94        self.log.info("TOTAL ELAPSED TIME: %s" % result["metrics"]["elapsedTime"])
95        return result
96
97
98
99    def _verify_results(self, actual_result, expected_result, missing_count = 1, extra_count = 1):
100        self.log.info(" Analyzing Actual Result")
101        actual_result = self._gen_dict(actual_result)
102        self.log.info(" Analyzing Expected Result")
103        expected_result = self._gen_dict(expected_result)
104        if len(actual_result) != len(expected_result):
105            raise Exception("Results are incorrect.Actual num %s. Expected num: %s.\n" % (
106                                            len(actual_result), len(expected_result)))
107        msg = "The number of rows match but the results mismatch, please check"
108        if actual_result != expected_result:
109            raise Exception(msg)
110
    def _verify_results_rqg_new(self, n1ql_result = [], sql_result = [], hints = ["a1"]):
        """Compare analytics rows against SQL reference rows (RQG flow, shadow-aware).

        Rows containing a ``*_shadow`` key are unwrapped to that key's value;
        any other non-empty row is kept whole. Raises on a length mismatch or
        when the member-sorted rows differ; returns None when they agree.

        NOTE(review): the unwrap loop iterates result.keys() and appends the
        whole row + break on the first non-shadow key, so the outcome depends
        on dict key iteration order when shadow and non-shadow keys coexist —
        confirm inputs only ever carry one kind of key.
        """
        new_n1ql_result = []
        for result in n1ql_result:
            if result != {}:
                for key in result.keys():
                    if key.find('_shadow') != -1:
                        # unwrap the shadow-dataset value (no break: every
                        # shadow key in the row contributes one entry)
                        new_n1ql_result.append(result[key])
                    else:
                        new_n1ql_result.append(result)
                        break
        n1ql_result = new_n1ql_result
        if self._is_function_in_result(hints):
            # aggregate-function results use their own comparison path
            return self._verify_results_rqg_for_function(n1ql_result, sql_result)
        check = self._check_sample(n1ql_result, hints)
        actual_result = n1ql_result
        if actual_result == [{}]:
            actual_result = []
        if check:
            # hints matched the sample row's keys: flatten rows to values
            actual_result = self._gen_dict(n1ql_result)
        actual_result = sorted(actual_result)
        expected_result = sorted(sql_result)
        if len(actual_result) != len(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception("Results are incorrect.Actual num %s. Expected num: %s.:: %s \n" % (
                                            len(actual_result), len(expected_result), extra_msg))
        msg = "The number of rows match but the results mismatch, please check"
        if self._sort_data(actual_result) != self._sort_data(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception(msg+"\n "+extra_msg)
140
141    def _verify_results_rqg(self, n1ql_result = [], sql_result = [], hints = ["a1"]):
142        new_n1ql_result = []
143        for result in n1ql_result:
144            if result != {}:
145                new_n1ql_result.append(result)
146        n1ql_result = new_n1ql_result
147        if self._is_function_in_result(hints):
148            return self._verify_results_rqg_for_function(n1ql_result, sql_result)
149        check = self._check_sample(n1ql_result, hints)
150        actual_result = n1ql_result
151        if actual_result == [{}]:
152            actual_result = []
153        if check:
154            actual_result = self._gen_dict(n1ql_result)
155        actual_result = sorted(actual_result)
156        expected_result = sorted(sql_result)
157        if len(actual_result) != len(expected_result):
158            extra_msg = self._get_failure_message(expected_result, actual_result)
159            raise Exception("Results are incorrect.Actual num %s. Expected num: %s.:: %s \n" % (
160                                            len(actual_result), len(expected_result), extra_msg))
161        msg = "The number of rows match but the results mismatch, please check"
162        if self._sort_data(actual_result) != self._sort_data(expected_result):
163            extra_msg = self._get_failure_message(expected_result, actual_result)
164            raise Exception(msg+"\n "+extra_msg)
165
166    def _sort_data(self, result):
167        new_data =[]
168        for data in result:
169            new_data.append(sorted(data))
170        return new_data
171
172    def _verify_results_crud_rqg(self, n1ql_result = [], sql_result = [], hints = ["primary_key_id"]):
173        new_n1ql_result = []
174        for result in n1ql_result:
175            if result != {}:
176                new_n1ql_result.append(result)
177        n1ql_result = new_n1ql_result
178        if self._is_function_in_result(hints):
179            return self._verify_results_rqg_for_function(n1ql_result, sql_result)
180        check = self._check_sample(n1ql_result, hints)
181        actual_result = n1ql_result
182        if actual_result == [{}]:
183            actual_result = []
184        if check:
185            actual_result = self._gen_dict(n1ql_result)
186        actual_result = sorted(actual_result)
187        expected_result = sorted(sql_result)
188        if len(actual_result) != len(expected_result):
189            extra_msg = self._get_failure_message(expected_result, actual_result)
190            raise Exception("Results are incorrect.Actual num %s. Expected num: %s.:: %s \n" % (
191                                            len(actual_result), len(expected_result), extra_msg))
192        if not self._result_comparison_analysis(actual_result,expected_result) :
193            msg = "The number of rows match but the results mismatch, please check"
194            extra_msg = self._get_failure_message(expected_result, actual_result)
195            raise Exception(msg+"\n "+extra_msg)
196
197    def _get_failure_message(self, expected_result, actual_result):
198        if expected_result == None:
199            expected_result = []
200        if actual_result == None:
201            actual_result = []
202        len_expected_result = len(expected_result)
203        len_actual_result = len(actual_result)
204        len_expected_result = min(5,len_expected_result)
205        len_actual_result = min(5,len_actual_result)
206        extra_msg = "mismatch in results :: expected :: {0}, actual :: {1} ".format(expected_result[0:len_expected_result], actual_result[0:len_actual_result])
207        return extra_msg
208
209    def _result_comparison_analysis(self, expected_result, actual_result):
210        expected_map ={}
211        actual_map ={}
212        for data in expected_result:
213            primary=None
214            for key in data.keys():
215                keys = key
216                if keys.encode('ascii') == "primary_key_id":
217                    primary = keys
218            expected_map[data[primary]] = data
219        for data in actual_result:
220            primary=None
221            for key in data.keys():
222                keys = key
223                if keys.encode('ascii') == "primary_key_id":
224                    primary = keys
225            actual_map[data[primary]] = data
226        check = True
227        for key in expected_map.keys():
228            if sorted(actual_map[key]) != sorted(expected_map[key]):
229                check= False
230        return check
231
232    def _analyze_for_special_case_using_func(self, expected_result, actual_result):
233        if expected_result == None:
234            expected_result = []
235        if actual_result == None:
236            actual_result = []
237        if len(expected_result) == 1:
238            value = expected_result[0].values()[0]
239            if value == None or value == 0:
240                expected_result = []
241        if len(actual_result) == 1:
242            value = actual_result[0].values()[0]
243            if value == None or value == 0:
244                actual_result = []
245        return expected_result, actual_result
246
247    def _is_function_in_result(self, result):
248        if result == "FUN":
249            return True
250        return False
251
252    def _verify_results_rqg_for_function(self, n1ql_result = [], sql_result = [], hints = ["a1"]):
253        actual_count = -1
254        expected_count = -1
255        actual_result = n1ql_result
256        sql_result, actual_result= self._analyze_for_special_case_using_func(sql_result, actual_result)
257        if len(sql_result) != len(actual_result):
258            msg = "the number of results do not match :: expected = {0}, actual = {1}".format(len(n1ql_result), len(sql_result))
259            extra_msg = self._get_failure_message(sql_result, actual_result)
260            raise Exception(msg+"\n"+extra_msg)
261        n1ql_result = self._gen_dict_n1ql_func_result(n1ql_result)
262        n1ql_result = sorted(n1ql_result)
263        sql_result = self._gen_dict_n1ql_func_result(sql_result)
264        sql_result = sorted(sql_result)
265        if  len(sql_result) == 0 and len(actual_result) == 0:
266            return
267        if sql_result != n1ql_result:
268            max = 2
269            if len(sql_result) < 5:
270                max = len(sql_result)
271            msg = "mismatch in results :: expected [0:{0}]:: {1}, actual [0:{0}]:: {2} ".format(max, sql_result[0:max], n1ql_result[0:max])
272            raise Exception(msg)
273
274    def _convert_to_number(self, val):
275        if not isinstance(val, str):
276            return val
277        value = -1
278        try:
279            if value == '':
280                return 0
281            value = int(val.split("(")[1].split(")")[0])
282        except Exception, ex:
283            self.log.info(ex)
284        finally:
285            return value
286
287    def analyze_failure(self, actual, expected):
288        missing_keys =[]
289        different_values = []
290        for key in expected.keys():
291            if key not in actual.keys():
292                missing_keys.append(key)
293            if expected[key] != actual[key]:
294                different_values.append("for key {0}, expected {1} \n actual {2}".
295                    format(key, expected[key], actual[key]))
296        self.log.info(missing_keys)
297        if(len(different_values) > 0):
298            self.log.info(" number of such cases {0}".format(len(different_values)))
299            self.log.info(" example key {0}".format(different_values[0]))
300
301    def check_missing_and_extra(self, actual, expected):
302        missing = []
303        extra = []
304        for item in actual:
305            if not (item in expected):
306                 extra.append(item)
307        for item in expected:
308            if not (item in actual):
309                missing.append(item)
310        return missing, extra
311
312    def build_url(self, version):
313        info = self.shell.extract_remote_info()
314        type = info.distribution_type.lower()
315        if type in ["ubuntu", "centos", "red hat"]:
316            url = "https://s3.amazonaws.com/packages.couchbase.com/releases/couchbase-query/dp1/"
317            url += "couchbase-query_%s_%s_linux.tar.gz" %(
318                                version, info.architecture_type)
319        #TODO for windows
320        return url
321
322    def _restart_indexer(self):
323        couchbase_path = "/opt/couchbase/var/lib/couchbase"
324        cmd = "rm -f {0}/meta;rm -f /tmp/log_upr_client.sock".format(couchbase_path)
325        self.shell.execute_command(cmd)
326
    def _start_command_line_query(self, server):
        """Launch cbq-engine on *server* over SSH.

        Three layouts are supported, chosen by self.version:
        "git_repo" (build under GOPATH), "sherlock" (installed bin dir,
        optionally overridden by tuq_client["sherlock_path"]), and anything
        else (a /tmp/tuq or /cygdrive/c/tuq drop). The engine is started in
        the background; this method does not wait for it.
        """
        self.shell = RemoteMachineShellConnection(server)
        self._set_env_variable(server)
        if self.version == "git_repo":
            # `os` here is the remote OS string, not the os module
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                gopath = testconstants.LINUX_GOPATH
            else:
                gopath = testconstants.WINDOWS_GOPATH
            if self.input.tuq_client and "gopath" in self.input.tuq_client:
                gopath = self.input.tuq_client["gopath"]
            if os == 'windows':
                cmd = "cd %s/src/github.com/couchbase/query/server/main; " % (gopath) +\
                "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" %(
                                                                server.ip, server.port)
            else:
                # NOTE(review): double slash in "query//server" kept as-is
                cmd = "cd %s/src/github.com/couchbase/query//server/main; " % (gopath) +\
                "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" %(
                                                                server.ip, server.port)
            self.shell.execute_command(cmd)
        elif self.version == "sherlock":
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                couchbase_path = testconstants.LINUX_COUCHBASE_BIN_PATH
            else:
                couchbase_path = testconstants.WIN_COUCHBASE_BIN_PATH
            if self.input.tuq_client and "sherlock_path" in self.input.tuq_client:
                couchbase_path = "%s/bin" % self.input.tuq_client["sherlock_path"]
                print "PATH TO SHERLOCK: %s" % couchbase_path
            if os == 'windows':
                cmd = "cd %s; " % (couchbase_path) +\
                "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" %(
                                                                server.ip, server.port)
            else:
                cmd = "cd %s; " % (couchbase_path) +\
                "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" %(
                                                                server.ip, server.port)
                # non-windows only: honor an explicit n1ql_port override
                n1ql_port = self.input.param("n1ql_port", None)
                if server.ip == "127.0.0.1" and server.n1ql_port:
                    n1ql_port = server.n1ql_port
                if n1ql_port:
                    cmd = "cd %s; " % (couchbase_path) +\
                './cbq-engine -datastore http://%s:%s/ -http=":%s">n1ql.log 2>&1 &' %(
                                                                server.ip, server.port, n1ql_port)
            self.shell.execute_command(cmd)
        else:
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                cmd = "cd /tmp/tuq;./cbq-engine -couchbase http://%s:%s/ >/dev/null 2>&1 &" %(
                                                                server.ip, server.port)
            else:
                cmd = "cd /cygdrive/c/tuq;./cbq-engine.exe -couchbase http://%s:%s/ >/dev/null 2>&1 &" %(
                                                                server.ip, server.port)
            self.shell.execute_command(cmd)
381    def _parse_query_output(self, output):
382        if output.find("cbq>") == 0:
383            output = output[output.find("cbq>") + 4:].strip()
384        if output.find("tuq_client>") == 0:
385            output = output[output.find("tuq_client>") + 11:].strip()
386        if output.find("cbq>") != -1:
387            output = output[:output.find("cbq>")].strip()
388        if output.find("tuq_client>") != -1:
389            output = output[:output.find("tuq_client>")].strip()
390        return json.loads(output)
391
392    def sort_nested_list(self, result):
393        actual_result = []
394        for item in result:
395            curr_item = {}
396            for key, value in item.iteritems():
397                if isinstance(value, list) or isinstance(value, set):
398                    curr_item[key] = sorted(value)
399                else:
400                    curr_item[key] = value
401            actual_result.append(curr_item)
402        return actual_result
403
404    def configure_gomaxprocs(self):
405        max_proc = self.input.param("gomaxprocs", None)
406        cmd = "export GOMAXPROCS=%s" % max_proc
407        for server in self.servers:
408            shell_connection = RemoteMachineShellConnection(self.master)
409            shell_connection.execute_command(cmd)
410
411    def drop_primary_index(self, using_gsi = True, server = None):
412        if server == None:
413            server = self.master
414        self.log.info("CHECK FOR PRIMARY INDEXES")
415        for bucket in self.buckets:
416            self.query = "DROP PRIMARY INDEX ON {0}".format(bucket.name)
417            if using_gsi:
418                self.query += " USING GSI"
419            if not using_gsi:
420                self.query += " USING VIEW "
421            self.log.info(self.query)
422            try:
423                check = self._is_index_in_list(bucket.name, "#primary", server = server)
424                if check:
425                    self.run_analytics_query(server = server)
426            except Exception, ex:
427                self.log.error('ERROR during index creation %s' % str(ex))
428
429    def create_primary_index(self, using_gsi = True, server = None):
430        if server == None:
431            server = self.master
432        for bucket in self.buckets:
433            self.query = "CREATE PRIMARY INDEX ON %s " % (bucket.name)
434            if using_gsi:
435                self.query += " USING GSI"
436                # if gsi_type == "memdb":
437                #     self.query += " WITH {'index_type': 'memdb'}"
438            if not using_gsi:
439                self.query += " USING VIEW "
440            try:
441                check = self._is_index_in_list(bucket.name, "#primary", server = server)
442                if not check:
443                    self.run_analytics_query(server = server)
444                    check = self.is_index_online_and_in_list(bucket.name, "#primary", server = server)
445                    if not check:
446                        raise Exception(" Timed-out Exception while building primary index for bucket {0} !!!".format(bucket.name))
447                else:
448                    raise Exception(" Primary Index Already present, This looks like a bug !!!")
449            except Exception, ex:
450                self.log.error('ERROR during index creation %s' % str(ex))
451                raise ex
452
453    def verify_index_with_explain(self, actual_result, index_name, check_covering_index= False):
454        check = True
455        if check_covering_index:
456            if "covering" in str(actual_result):
457                check = True
458            else:
459                check = False
460        if index_name in str(actual_result):
461            return True and check
462        return False
463
464    def run_query_and_verify_result(self, server = None, query = None, timeout = 120.0, max_try = 1,
465     expected_result = None, scan_consistency = None, scan_vector = None, verify_results = True):
466        check = False
467        init_time = time.time()
468        try_count = 0
469        while not check:
470            next_time = time.time()
471            try:
472                actual_result = self.run_analytics_query(query = query, server = server,
473                 scan_consistency = scan_consistency, scan_vector = scan_vector)
474                if verify_results:
475                    self._verify_results(sorted(actual_result['results']), sorted(expected_result))
476                else:
477                    return "ran query with success and validated results" , True
478                check = True
479            except Exception, ex:
480                if (next_time - init_time > timeout or try_count >= max_try):
481                    return ex, False
482            finally:
483                try_count += 1
484        return "ran query with success and validated results" , check
485
486
    def run_cbq_query(self, query=None, min_output_size=10, server=None, query_params = {}, is_prepared=False, scan_consistency = None, scan_vector = None, verbose= True):
        """Run an N1QL query, over REST (self.use_rest) or via the remote cbq CLI.

        Falls back to self.query / self.master when query / server are
        omitted. Returns the parsed result dict; raises CBQError when the
        response is an error payload.

        NOTE(review): query_params has a shared mutable {} default and, on the
        REST path, the credentials merged into it are discarded when it is
        rebuilt below — both preserved as-is.
        """
        if query is None:
            query = self.query
        if server is None:
           server = self.master
           if server.ip == "127.0.0.1":
            self.n1ql_port = server.n1ql_port
        else:
            if server.ip == "127.0.0.1":
                self.n1ql_port = server.n1ql_port
            if self.input.tuq_client and "client" in self.input.tuq_client:
                server = self.tuq_client
        if self.n1ql_port == None or self.n1ql_port == '':
            self.n1ql_port = self.input.param("n1ql_port", 90)
            if not self.n1ql_port:
                self.log.info(" n1ql_port is not defined, processing will not proceed further")
                raise Exception("n1ql_port is not defined, processing will not proceed further")
        # collect per-bucket SASL credentials
        cred_params = {'creds': []}
        for bucket in self.buckets:
            if bucket.saslPassword:
                cred_params['creds'].append({'user': 'local:%s' % bucket.name, 'pass': bucket.saslPassword})
        query_params.update(cred_params)
        if self.use_rest:
            query_params = {}
            if scan_consistency:
                query_params['scan_consistency']=  scan_consistency
            if scan_vector:
                query_params['scan_vector']=  str(scan_vector).replace("'", '"')
            if verbose:
                self.log.info('RUN QUERY %s' % query)
            result = RestConnection(server).query_tool(query, self.n1ql_port, query_params=query_params, is_prepared = is_prepared, verbose = verbose)
        else:
            # if self.version == "git_repo":
            #     output = self.shell.execute_commands_inside("$GOPATH/src/github.com/couchbaselabs/tuqtng/" +\
            #                                                 "tuq_client/tuq_client " +\
            #                                                 "-engine=http://%s:8093/" % server.ip,
            #                                            subcommands=[query,],
            #                                            min_output_size=20,
            #                                            end_msg='tuq_client>')
            # else:
            #os = self.shell.extract_remote_info().type.lower()
            shell = RemoteMachineShellConnection(server)
            #query = query.replace('"', '\\"')
            #query = query.replace('`', '\\`')
            #if os == "linux":
            cmd = "%s/cbq  -engine=http://%s:8093/" % (testconstants.LINUX_COUCHBASE_BIN_PATH,server.ip)
            output = shell.execute_commands_inside(cmd,query,"","","","","")
            print "--------------------------------------------------------------------------------------------------------------------------------"
            print output
            # NOTE(review): json.loads(output) here is immediately overwritten
            # by _parse_query_output and crashes on prompt-prefixed output —
            # preserved as-is.
            result = json.loads(output)
            print result
            result = self._parse_query_output(output)
        if isinstance(result, str) or 'errors' in result:
            error_result = str(result)
            length_display = len(error_result)
            if length_display > 500:
                error_result = error_result[:500]
            raise CBQError(error_result, server.ip)
        self.log.info("TOTAL ELAPSED TIME: %s" % result["metrics"]["elapsedTime"])
        return result
547
548    # def is_index_online_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
549    #     check = self._is_index_in_list(bucket, index_name, server = server)
550    #     init_time = time.time()
551    #     while not check:
552    #         time.sleep(1)
553    #         check = self._is_index_in_list(bucket, index_name, server = server)
554    #         next_time = time.time()
555    #         if check or (next_time - init_time > timeout):
556    #             return check
557    #     return check
558    #
559    # def is_index_ready_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
560    #     query = "SELECT * FROM system:indexes where name = \'{0}\'".format(index_name)
561    #     if server == None:
562    #         server = self.master
563    #     init_time = time.time()
564    #     check = False
565    #     while not check:
566    #         res = self.run_analytics_query(query=query, server=server)
567    #         for item in res['results']:
568    #             if 'keyspace_id' not in item['indexes']:
569    #                 check = False
570    #             elif item['indexes']['keyspace_id'] == str(bucket) \
571    #                     and item['indexes']['name'] == index_name \
572    #                     and item['indexes']['state'] == "online":
573    #                 check = True
574    #         time.sleep(1)
575    #         next_time = time.time()
576    #         check = check or (next_time - init_time > timeout)
577    #     return check
578
579    # def is_index_online_and_in_list_bulk(self, bucket, index_names = [], server = None, index_state = "online", timeout = 600.0):
580    #     check, index_names = self._is_index_in_list_bulk(bucket, index_names, server = server, index_state = index_state)
581    #     init_time = time.time()
582    #     while not check:
583    #         check, index_names = self._is_index_in_list_bulk(bucket, index_names, server = server, index_state = index_state)
584    #         next_time = time.time()
585    #         if check or (next_time - init_time > timeout):
586    #             return check
587    #     return check
588    #
589    # def gen_build_index_query(self, bucket = "default", index_list = []):
590    #     return "BUILD INDEX on {0}({1}) USING GSI".format(bucket,",".join(index_list))
591    #
592    # def gen_query_parameter(self, scan_vector = None, scan_consistency = None):
593    #     query_params = {}
594    #     if scan_vector:
595    #         query_params.update("scan_vector", scan_vector)
596    #     if scan_consistency:
597    #         query_params.update("scan_consistency", scan_consistency)
598    #     return query_params
599
600    # def _is_index_in_list(self, bucket, index_name, server = None, index_state = ["pending", "building", "deferred"]):
601    #     query = "SELECT * FROM system:indexes where name = \'{0}\'".format(index_name)
602    #     if server == None:
603    #         server = self.master
604    #     res = self.run_cbq_query(query = query, server = server)
605    #     for item in res['results']:
606    #         if 'keyspace_id' not in item['indexes']:
607    #             return False
608    #         if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name and item['indexes']['state'] not in index_state:
609    #             return True
610    #     return False
611    #
612    # def _is_index_in_list_bulk(self, bucket, index_names = [], server = None, index_state = ["pending","building"]):
613    #     query = "SELECT * FROM system:indexes"
614    #     if server == None:
615    #         server = self.master
616    #     res = self.run_cbq_query(query = query, server = server)
617    #     index_count=0
618    #     found_index_list = []
619    #     for item in res['results']:
620    #         if 'keyspace_id' not in item['indexes']:
621    #             return False
622    #         for index_name in index_names:
623    #             if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name and item['indexes']['state'] not in index_state:
624    #                 found_index_list.append(index_name)
625    #     if len(found_index_list) == len(index_names):
626    #         return True, []
627    #     return False, list(set(index_names) - set(found_index_list))
628    #
629    # def gen_index_map(self, server = None):
630    #     query = "SELECT * FROM system:indexes"
631    #     if server == None:
632    #         server = self.master
633    #     res = self.run_cbq_query(query = query, server = server)
634    #     index_map = {}
635    #     for item in res['results']:
636    #         bucket_name = item['indexes']['keyspace_id'].encode('ascii','ignore')
637    #         if bucket_name not in index_map.keys():
638    #             index_map[bucket_name] = {}
639    #         index_name = str(item['indexes']['name'])
640    #         index_map[bucket_name][index_name] = {}
641    #         index_map[bucket_name][index_name]['state'] = item['indexes']['state']
642    #     return index_map
643    #
644    # def get_index_count_using_primary_index(self, buckets, server = None):
645    #     query = "SELECT COUNT(*) FROM {0}"
646    #     map= {}
647    #     if server == None:
648    #         server = self.master
649    #     for bucket in buckets:
650    #         res = self.run_cbq_query(query = query.format(bucket.name), server = server)
651    #         map[bucket.name] = int(res["results"][0]["$1"])
652    #     return map
653    #
654    # def get_index_count_using_index(self, bucket, index_name,server=None):
655    #     query = 'SELECT COUNT(*) FROM {0} USE INDEX ({1})'.format(bucket.name, index_name)
656    #     if not server:
657    #         server = self.master
658    #     res = self.run_cbq_query(query=query, server=server)
659    #     return int(res['results'][0]['$1'])
660
661    def _gen_dict(self, result):
662        result_set = []
663        if result != None and len(result) > 0:
664                for val in result:
665                    for key in val.keys():
666                        result_set.append(val[key])
667        return result_set
668
669    def _gen_dict_n1ql_func_result(self, result):
670        result_set = [val[key] for val in result for key in val.keys()]
671        new_result_set = []
672        if len(result_set) > 0:
673            for value in result_set:
674                if isinstance(value, float):
675                    new_result_set.append(round(value, 0))
676                else:
677                    new_result_set.append(value)
678        else:
679            new_result_set = result_set
680        return new_result_set
681
682    def _check_sample(self, result, expected_in_key = None):
683        if expected_in_key == "FUN":
684            return False
685        if expected_in_key == None or len(expected_in_key) == 0:
686            return False
687        if result != None and len(result) > 0:
688            sample=result[0]
689            for key in sample.keys():
690                for sample in expected_in_key:
691                    if key in sample:
692                        return True
693        return False
694
695    def old_gen_dict(self, result):
696        result_set = []
697        map = {}
698        duplicate_keys = []
699        try:
700            if result != None and len(result) > 0:
701                for val in result:
702                    for key in val.keys():
703                        result_set.append(val[key])
704            for val in result_set:
705                if val["_id"] in map.keys():
706                    duplicate_keys.append(val["_id"])
707                map[val["_id"]] = val
708            keys = map.keys()
709            keys.sort()
710        except Exception, ex:
711            self.log.info(ex)
712            raise
713        if len(duplicate_keys) > 0:
714            raise Exception(" duplicate_keys {0}".format(duplicate_keys))
715        return map
716
717