import paramiko
import logger
import json
import uuid
import math
import re
import time
import testconstants
import datetime
from datetime import date
from couchbase_helper.tuq_generators import TuqGenerators
from remote.remote_util import RemoteMachineShellConnection
from membase.api.exception import CBQError, ReadDocumentException
from membase.api.rest_client import RestConnection
import copy

class N1QLHelper():
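    """Helper for running and verifying N1QL queries in tests.

    Queries run either through the REST API (use_rest) or through the cbq
    command-line shell over SSH. The verification helpers compare N1QL
    results against expected results from TuqGenerators or from an SQL (RQG)
    reference run, and the index helpers create, drop, and poll GSI/view
    indexes."""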
    def __init__(self, version=None, master=None, shell=None, max_verify=0, buckets=[], item_flag=0,
                 n1ql_port=8093, full_docs_list=[], log=None, input=None, database=None, use_rest=None):
        self.version = version
        self.shell = shell
        self.max_verify = max_verify
        self.buckets = buckets
        self.item_flag = item_flag
        self.n1ql_port = n1ql_port
        self.input = input
        self.log = log
        self.use_rest = use_rest
        self.full_docs_list = full_docs_list
        self.master = master
        self.database = database
        if self.full_docs_list and len(self.full_docs_list) > 0:
            self.gen_results = TuqGenerators(self.log, self.full_docs_list)

    def killall_tuq_process(self):
        self.shell.execute_command("killall cbq-engine")
        self.shell.execute_command("killall tuqtng")
        self.shell.execute_command("killall indexer")

    def run_query_from_template(self, query_template):
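        """Generate a concrete query from the template, run it, and return
        (actual_result, expected_result) for the caller to compare."""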
        self.query = self.gen_results.generate_query(query_template)
        expected_result = self.gen_results.generate_expected_result()
        actual_result = self.run_cbq_query()
        return actual_result, expected_result

    def run_cbq_query(self, query=None, min_output_size=10, server=None, query_params=None, is_prepared=False,
                      scan_consistency=None, scan_vector=None, verbose=True):
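        """Run a N1QL query and return the parsed JSON response.

        With use_rest the query goes through RestConnection.query_tool();
        note that in that path query_params is rebuilt, so only
        scan_consistency and scan_vector are forwarded. Otherwise the query
        is wrapped in a curl() call and piped through the cbq shell on the
        server. Raises CBQError when the response contains errors."""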
        # avoid a shared mutable default: cred_params is merged into query_params below
        if query_params is None:
            query_params = {}
        if query is None:
            query = self.query
        if server is None:
            server = self.master
            if server.ip == "127.0.0.1":
                self.n1ql_port = server.n1ql_port
        else:
            if server.ip == "127.0.0.1":
                self.n1ql_port = server.n1ql_port
            if self.input.tuq_client and "client" in self.input.tuq_client:
                server = self.tuq_client
        if self.n1ql_port is None or self.n1ql_port == '':
            self.n1ql_port = self.input.param("n1ql_port", 8093)
            if not self.n1ql_port:
                self.log.info("n1ql_port is not defined, processing will not proceed further")
                raise Exception("n1ql_port is not defined, processing will not proceed further")
        cred_params = {'creds': []}
        for bucket in self.buckets:
            if bucket.saslPassword:
                cred_params['creds'].append({'user': 'local:%s' % bucket.name, 'pass': bucket.saslPassword})
        query_params.update(cred_params)
        if self.use_rest:
            query_params = {}
            if scan_consistency:
                query_params['scan_consistency'] = scan_consistency
            if scan_vector:
                query_params['scan_vector'] = str(scan_vector).replace("'", '"')
            if verbose:
                self.log.info('RUN QUERY %s' % query)
            result = RestConnection(server).query_tool(query, self.n1ql_port, query_params=query_params,
                                                       is_prepared=is_prepared, verbose=verbose)
        else:
            shell = RemoteMachineShellConnection(server)
            url = "'http://%s:8093/query/service'" % server.ip
            cmd = "%s/cbq -engine=http://%s:8093/" % (testconstants.LINUX_COUCHBASE_BIN_PATH, server.ip)
            query = query.replace('"', '\\"')
            if "#primary" in query:
                query = query.replace("'#primary'", '\\"#primary\\"')
            query = "select curl('POST', " + url + ", {'data' : 'statement=%s'})" % query
            print query
            output = shell.execute_commands_inside(cmd, query, "", "", "", "", "")
            print "-" * 128
            print output
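            # strip the leading characters of the shell output before parsing
            # JSON; the fixed offset of 47 is assumed to cover the cbq
            # banner/prompt text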
            new_curl = json.dumps(output[47:])
            string_curl = json.loads(new_curl)
            result = json.loads(string_curl)
            print result
        if isinstance(result, str) or 'errors' in result:
            error_result = str(result)
            length_display = len(error_result)
            if length_display > 500:
                error_result = error_result[:500]
            raise CBQError(error_result, server.ip)
        self.log.info("TOTAL ELAPSED TIME: %s" % result["metrics"]["elapsedTime"])
        return result

    def _verify_results(self, actual_result, expected_result, missing_count=1, extra_count=1):
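        """Flatten both result sets with _gen_dict and raise if their
        lengths or contents differ. missing_count and extra_count are kept
        for API compatibility but are unused."""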
        self.log.info(" Analyzing Actual Result")
        actual_result = self._gen_dict(actual_result)
        self.log.info(" Analyzing Expected Result")
        expected_result = self._gen_dict(expected_result)
        if len(actual_result) != len(expected_result):
            raise Exception("Results are incorrect. Actual num: %s. Expected num: %s.\n" % (len(actual_result), len(expected_result)))
        msg = "The number of rows match but the results mismatch, please check"
        if actual_result != expected_result:
            raise Exception(msg)

    def _verify_results_rqg(self, subquery, aggregate=False, n1ql_result=[], sql_result=[], hints=["a1"], aggregate_pushdown=False):
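        """Compare N1QL results against SQL (RQG) reference results.

        Empty rows are dropped first; function-style results (hints ==
        "FUN") are delegated to _verify_results_rqg_for_function. Subquery
        results are compared field by field, everything else by sorted
        equality."""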
        new_n1ql_result = []
        for result in n1ql_result:
            if result != {}:
                new_n1ql_result.append(result)

        n1ql_result = new_n1ql_result

        if self._is_function_in_result(hints):
            return self._verify_results_rqg_for_function(n1ql_result, sql_result, aggregate_pushdown=aggregate_pushdown)

        check = self._check_sample(n1ql_result, hints)
        actual_result = n1ql_result

        if actual_result == [{}]:
            actual_result = []
        if check:
            actual_result = self._gen_dict(n1ql_result)

        actual_result = sorted(actual_result)
        expected_result = sorted(sql_result)

        if len(actual_result) != len(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception("Results are incorrect. Actual num %s. Expected num: %s. :: %s \n" % (len(actual_result), len(expected_result), extra_msg))

        msg = "The number of rows match but the results mismatch, please check"
        if subquery:
            for x, y in zip(actual_result, expected_result):
                if aggregate:
                    productId = x['ABC'][0]['$1']
                else:
                    productId = x['ABC'][0]['productId']
                if (productId != y['ABC']) or \
                        x['datetime_field1'] != y['datetime_field1'] or \
                        x['primary_key_id'] != y['primary_key_id'] or \
                        x['varchar_field1'] != y['varchar_field1'] or \
                        x['decimal_field1'] != y['decimal_field1'] or \
                        x['char_field1'] != y['char_field1'] or \
                        x['int_field1'] != y['int_field1'] or \
                        x['bool_field1'] != y['bool_field1']:
                    print "actual_result is %s" % actual_result
                    print "expected result is %s" % expected_result
                    extra_msg = self._get_failure_message(expected_result, actual_result)
                    raise Exception(msg + "\n " + extra_msg)
        else:
            if self._sort_data(actual_result) != self._sort_data(expected_result):
                extra_msg = self._get_failure_message(expected_result, actual_result)
                raise Exception(msg + "\n " + extra_msg)
    def _sort_data(self, result):
        new_data = []
        for data in result:
            new_data.append(sorted(data))
        return new_data

    def _verify_results_crud_rqg(self, n1ql_result=[], sql_result=[], hints=["primary_key_id"]):
        new_n1ql_result = []
        for result in n1ql_result:
            if result != {}:
                new_n1ql_result.append(result)
        n1ql_result = new_n1ql_result
        if self._is_function_in_result(hints):
            return self._verify_results_rqg_for_function(n1ql_result, sql_result)
        check = self._check_sample(n1ql_result, hints)
        actual_result = n1ql_result
        if actual_result == [{}]:
            actual_result = []
        if check:
            actual_result = self._gen_dict(n1ql_result)
        actual_result = sorted(actual_result)
        expected_result = sorted(sql_result)

        if len(actual_result) != len(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception("Results are incorrect. Actual num %s. Expected num: %s. :: %s \n" % (len(actual_result), len(expected_result), extra_msg))
        if not self._result_comparison_analysis(actual_result, expected_result):
            msg = "The number of rows match but the results mismatch, please check"
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception(msg + "\n " + extra_msg)

    def _get_failure_message(self, expected_result, actual_result):
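        """Build a short failure message showing at most the first five
        expected and actual rows."""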
        if expected_result is None:
            expected_result = []
        if actual_result is None:
            actual_result = []
        len_expected_result = min(5, len(expected_result))
        len_actual_result = min(5, len(actual_result))
        extra_msg = "mismatch in results :: expected :: {0}, actual :: {1} ".format(
            expected_result[0:len_expected_result], actual_result[0:len_actual_result])
        return extra_msg

    def _result_comparison_analysis(self, expected_result, actual_result):
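        """Compare two result sets keyed on primary_key_id; returns False on
        any mismatch. Note that the call site passes (actual, expected), so
        the parameter names here are effectively reversed."""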
        expected_map = {}
        actual_map = {}
        for data in expected_result:
            primary = None
            for key in data.keys():
                if key.encode('ascii') == "primary_key_id":
                    primary = key
            expected_map[data[primary]] = data
        for data in actual_result:
            primary = None
            for key in data.keys():
                if key.encode('ascii') == "primary_key_id":
                    primary = key
            actual_map[data[primary]] = data
        check = True
        for key in expected_map.keys():
            # a key missing from actual_map counts as a mismatch
            if key not in actual_map or sorted(actual_map[key]) != sorted(expected_map[key]):
                check = False
        return check

    def _analyze_for_special_case_using_func(self, expected_result, actual_result):
        if expected_result is None:
            expected_result = []
        if actual_result is None:
            actual_result = []
        if len(expected_result) == 1:
            value = expected_result[0].values()[0]
            if value is None or value == 0:
                expected_result = []
        if len(actual_result) == 1:
            value = actual_result[0].values()[0]
            if value is None or value == 0:
                actual_result = []
        return expected_result, actual_result

    def _is_function_in_result(self, result):
        if result == "FUN":
            return True
        return False

    def _verify_results_rqg_for_function(self, n1ql_result=[], sql_result=[], hints=["a1"], aggregate_pushdown=False):
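        """Compare function/aggregate results from SQL and N1QL after
        normalizing them: floats are rounded and the string 'None' is mapped
        to None before comparison."""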
        if not aggregate_pushdown:
            sql_result, n1ql_result = self._analyze_for_special_case_using_func(sql_result, n1ql_result)
        if len(sql_result) != len(n1ql_result):
            msg = "the number of results do not match :: sql = {0}, n1ql = {1}".format(len(sql_result), len(n1ql_result))
            extra_msg = self._get_failure_message(sql_result, n1ql_result)
            raise Exception(msg + "\n" + extra_msg)
        n1ql_result = self._gen_dict_n1ql_func_result(n1ql_result)
        n1ql_result = sorted(n1ql_result)
        sql_result = self._gen_dict_n1ql_func_result(sql_result)
        sql_result = sorted(sql_result)
        if len(sql_result) == 0 and len(n1ql_result) == 0:
            return
        if sql_result != n1ql_result:
            i = 0
            for sql_value, n1ql_value in zip(sql_result, n1ql_result):
                if sql_value != n1ql_value:
                    break
                i = i + 1
            num_results = len(sql_result)
            last_idx = min(i + 5, num_results)
            msg = "mismatch in results :: result length :: {3}, first mismatch position :: {0}, sql value :: {1}, n1ql value :: {2} ".format(
                i, sql_result[i:last_idx], n1ql_result[i:last_idx], num_results)
            raise Exception(msg)

    def _convert_to_number(self, val):
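        """Extract the integer inside the first '(...)' of a string value,
        e.g. 'foo(10)' -> 10. Non-strings are returned unchanged, an empty
        string maps to 0, and -1 is returned when parsing fails."""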
        if not isinstance(val, str):
            return val
        if val == '':
            return 0
        value = -1
        try:
            value = int(val.split("(")[1].split(")")[0])
        except Exception, ex:
            self.log.info(ex)
        finally:
            return value

    def analyze_failure(self, actual, expected):
        missing_keys = []
        different_values = []
        for key in expected.keys():
            if key not in actual.keys():
                missing_keys.append(key)
            elif expected[key] != actual[key]:
                different_values.append("for key {0}, expected {1} \n actual {2}".format(key, expected[key], actual[key]))
        self.log.info(missing_keys)
        if len(different_values) > 0:
            self.log.info(" number of such cases {0}".format(len(different_values)))
            self.log.info(" example key {0}".format(different_values[0]))

    def check_missing_and_extra(self, actual, expected):
        missing = []
        extra = []
        for item in actual:
            if item not in expected:
                extra.append(item)
        for item in expected:
            if item not in actual:
                missing.append(item)
        return missing, extra

    def build_url(self, version):
        info = self.shell.extract_remote_info()
        dist_type = info.distribution_type.lower()
        url = None
        if dist_type in ["ubuntu", "centos", "red hat"]:
            url = "https://s3.amazonaws.com/packages.couchbase.com/releases/couchbase-query/dp1/"
            url += "couchbase-query_%s_%s_linux.tar.gz" % (version, info.architecture_type)
        #TODO for windows
        return url

    def _restart_indexer(self):
        couchbase_path = "/opt/couchbase/var/lib/couchbase"
        cmd = "rm -f {0}/meta;rm -f /tmp/log_upr_client.sock".format(couchbase_path)
        self.shell.execute_command(cmd)

    def _start_command_line_query(self, server):
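        """Start a cbq-engine on the given server over SSH.

        The binary location depends on self.version: a git checkout under
        GOPATH ("git_repo"), a sherlock-style install, or /tmp/tuq for older
        builds."""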
        self.shell = RemoteMachineShellConnection(server)
        self._set_env_variable(server)
        if self.version == "git_repo":
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                gopath = testconstants.LINUX_GOPATH
            else:
                gopath = testconstants.WINDOWS_GOPATH
            if self.input.tuq_client and "gopath" in self.input.tuq_client:
                gopath = self.input.tuq_client["gopath"]
            if os == 'windows':
                cmd = "cd %s/src/github.com/couchbase/query/server/main; " % (gopath) +\
                      "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd %s/src/github.com/couchbase/query/server/main; " % (gopath) +\
                      "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" % (server.ip, server.port)
            self.shell.execute_command(cmd)
        elif self.version == "sherlock":
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                couchbase_path = testconstants.LINUX_COUCHBASE_BIN_PATH
            else:
                couchbase_path = testconstants.WIN_COUCHBASE_BIN_PATH
            if self.input.tuq_client and "sherlock_path" in self.input.tuq_client:
                couchbase_path = "%s/bin" % self.input.tuq_client["sherlock_path"]
                print "PATH TO SHERLOCK: %s" % couchbase_path
            if os == 'windows':
                cmd = "cd %s; " % (couchbase_path) +\
                      "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd %s; " % (couchbase_path) +\
                      "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" % (server.ip, server.port)
                n1ql_port = self.input.param("n1ql_port", None)
                if server.ip == "127.0.0.1" and server.n1ql_port:
                    n1ql_port = server.n1ql_port
                if n1ql_port:
                    cmd = "cd %s; " % (couchbase_path) +\
                          './cbq-engine -datastore http://%s:%s/ -http=":%s" >n1ql.log 2>&1 &' % (server.ip, server.port, n1ql_port)
            self.shell.execute_command(cmd)
        else:
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                cmd = "cd /tmp/tuq;./cbq-engine -couchbase http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd /cygdrive/c/tuq;./cbq-engine.exe -couchbase http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            self.shell.execute_command(cmd)

    def _parse_query_output(self, output):
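        """Strip surrounding cbq/tuq_client prompt text from shell output
        and parse the remainder as JSON."""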
        if output.find("cbq>") == 0:
            output = output[output.find("cbq>") + 4:].strip()
        if output.find("tuq_client>") == 0:
            output = output[output.find("tuq_client>") + 11:].strip()
        if output.find("cbq>") != -1:
            output = output[:output.find("cbq>")].strip()
        if output.find("tuq_client>") != -1:
            output = output[:output.find("tuq_client>")].strip()
        return json.loads(output)

    def sort_nested_list(self, result):
        actual_result = []
        for item in result:
            curr_item = {}
            for key, value in item.iteritems():
                if isinstance(value, list) or isinstance(value, set):
                    curr_item[key] = sorted(value)
                else:
                    curr_item[key] = value
            actual_result.append(curr_item)
        return actual_result

    def configure_gomaxprocs(self):
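        """Export GOMAXPROCS on the master node, once per entry in
        self.servers (expected to be set by the surrounding test).

        Note: the export only affects that shell session; an engine started
        in a later session will not inherit it."""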
        max_proc = self.input.param("gomaxprocs", None)
        cmd = "export GOMAXPROCS=%s" % max_proc
        for _ in self.servers:
            shell_connection = RemoteMachineShellConnection(self.master)
            shell_connection.execute_command(cmd)

    def drop_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        self.log.info("CHECK FOR PRIMARY INDEXES")
        for bucket in self.buckets:
            self.query = "DROP PRIMARY INDEX ON {0}".format(bucket.name)
            if using_gsi:
                self.query += " USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            self.log.info(self.query)
            try:
                check = self._is_index_in_list(bucket.name, "#primary", server=server)
                if check:
                    self.run_cbq_query(server=server)
            except Exception, ex:
                self.log.error('ERROR during index drop %s' % str(ex))

    def create_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        for bucket in self.buckets:
            self.query = "CREATE PRIMARY INDEX ON %s " % bucket.name
            if using_gsi:
                self.query += " USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            if self.use_rest:
                try:
                    check = self._is_index_in_list(bucket.name, "#primary", server=server)
                    if not check:
                        self.run_cbq_query(server=server, query_params={'timeout': '900s'})
                        check = self.is_index_online_and_in_list(bucket.name, "#primary", server=server)
                        if not check:
                            raise Exception("Timed-out while building primary index for bucket {0} !!!".format(bucket.name))
                    else:
                        raise Exception("Primary index already present, this looks like a bug !!!")
                except Exception, ex:
                    self.log.error('ERROR during index creation %s' % str(ex))
                    raise ex

    def create_partitioned_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        for bucket in self.buckets:
            self.query = "CREATE PRIMARY INDEX ON %s " % bucket.name
            if using_gsi:
                self.query += " PARTITION BY HASH(meta().id) USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            if self.use_rest:
                try:
                    check = self._is_index_in_list(bucket.name, "#primary", server=server)
                    if not check:
                        self.run_cbq_query(server=server, query_params={'timeout': '900s'})
                        check = self.is_index_online_and_in_list(bucket.name, "#primary", server=server)
                        if not check:
                            raise Exception("Timed-out while building primary index for bucket {0} !!!".format(bucket.name))
                    else:
                        raise Exception("Primary index already present, this looks like a bug !!!")
                except Exception, ex:
                    self.log.error('ERROR during index creation %s' % str(ex))
                    raise ex

    def verify_index_with_explain(self, actual_result, index_name, check_covering_index=False):
        check = True
        if check_covering_index:
            check = "covering" in str(actual_result)
        return index_name in str(actual_result) and check

    def run_query_and_verify_result(self, server=None, query=None, timeout=120.0, max_try=1, expected_result=None,
                                    scan_consistency=None, scan_vector=None, verify_results=True):
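        """Run a query and retry until it succeeds (and, when verify_results
        is set, until the results match expected_result) or until timeout /
        max_try is exceeded. Returns (message, True) on success and
        (exception, False) on failure."""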
        check = False
        init_time = time.time()
        try_count = 0
        while not check:
            next_time = time.time()
            try:
                actual_result = self.run_cbq_query(query=query, server=server, scan_consistency=scan_consistency,
                                                   scan_vector=scan_vector)
                if verify_results:
                    self._verify_results(sorted(actual_result['results']), sorted(expected_result))
                else:
                    # results are not validated in this branch
                    return "ran query with success", True
                check = True
            except Exception, ex:
                if next_time - init_time > timeout or try_count >= max_try:
                    return ex, False
            finally:
                try_count += 1
        return "ran query with success and validated results", check

    def get_index_names(self, server=None):
        query = "select distinct(name) from system:indexes where `using`='gsi'"
        index_names = []
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        for item in res['results']:
            index_names.append(item['name'])
        return index_names

    def is_index_online_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
        check = self._is_index_in_list(bucket, index_name, server=server)
        init_time = time.time()
        while not check:
            time.sleep(1)
            check = self._is_index_in_list(bucket, index_name, server=server)
            next_time = time.time()
            if check or (next_time - init_time > timeout):
                return check
        return check

    def is_index_ready_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
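        """Poll system:indexes until the named index on the bucket reports
        state "online", or until timeout (in which case False is returned)."""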
        query = "SELECT * FROM system:indexes where name = '{0}'".format(index_name)
        if server is None:
            server = self.master
        init_time = time.time()
        check = False
        while not check:
            res = self.run_cbq_query(query=query, server=server)
            for item in res['results']:
                if 'keyspace_id' not in item['indexes']:
                    check = False
                elif item['indexes']['keyspace_id'] == str(bucket) \
                        and item['indexes']['name'] == index_name \
                        and item['indexes']['state'] == "online":
                    check = True
            time.sleep(1)
            next_time = time.time()
            if next_time - init_time > timeout:
                # give up without reporting the index as ready
                break
        return check

    def is_index_online_and_in_list_bulk(self, bucket, index_names=[], server=None, index_state="online", timeout=600.0):
        check, index_names = self._is_index_in_list_bulk(bucket, index_names, server=server, index_state=index_state)
        init_time = time.time()
        while not check:
            check, index_names = self._is_index_in_list_bulk(bucket, index_names, server=server, index_state=index_state)
            next_time = time.time()
            if check or (next_time - init_time > timeout):
                return check
        return check

    def gen_build_index_query(self, bucket="default", index_list=[]):
        return "BUILD INDEX on {0}({1}) USING GSI".format(bucket, ",".join(index_list))

    def gen_query_parameter(self, scan_vector=None, scan_consistency=None):
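        """Build a query_params dict from the optional scan_vector and
        scan_consistency values."""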
        query_params = {}
        if scan_vector:
            query_params['scan_vector'] = scan_vector
        if scan_consistency:
            query_params['scan_consistency'] = scan_consistency
        return query_params

    def _is_index_in_list(self, bucket, index_name, server=None, index_state=["pending", "building", "deferred"]):
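        """Return True if the named index exists on the bucket and its state
        is not one of the given transitional states."""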
        query = "SELECT * FROM system:indexes where name = '{0}'".format(index_name)
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        for item in res['results']:
            if 'keyspace_id' not in item['indexes']:
                return False
            if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name and item['indexes']['state'] not in index_state:
                return True
        return False

    def _is_index_in_list_bulk(self, bucket, index_names=[], server=None, index_state=["pending", "building"]):
        query = "SELECT * FROM system:indexes"
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        found_index_list = []
        for item in res['results']:
            if 'keyspace_id' not in item['indexes']:
                # callers unpack a (check, remaining_names) tuple
                return False, index_names
            for index_name in index_names:
                if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name and item['indexes']['state'] not in index_state:
                    found_index_list.append(index_name)
        if len(found_index_list) == len(index_names):
            return True, []
        return False, list(set(index_names) - set(found_index_list))

    def gen_index_map(self, server=None):
        query = "SELECT * FROM system:indexes"
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        index_map = {}
        for item in res['results']:
            bucket_name = item['indexes']['keyspace_id'].encode('ascii', 'ignore')
            if bucket_name not in index_map.keys():
                index_map[bucket_name] = {}
            index_name = str(item['indexes']['name'])
            index_map[bucket_name][index_name] = {}
            index_map[bucket_name][index_name]['state'] = item['indexes']['state']
        return index_map

    def get_index_count_using_primary_index(self, buckets, server=None):
        query = "SELECT COUNT(*) FROM {0}"
        map = {}
        if server is None:
            server = self.master
        for bucket in buckets:
            res = self.run_cbq_query(query=query.format(bucket.name), server=server)
            map[bucket.name] = int(res["results"][0]["$1"])
        return map

    def get_index_count_using_index(self, bucket, index_name, server=None):
        query = 'SELECT COUNT(*) FROM {0} USE INDEX ({1})'.format(bucket.name, index_name)
        if not server:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        return int(res['results'][0]['$1'])

    def _gen_dict(self, result):
        result_set = []
        if result is not None and len(result) > 0:
            for val in result:
                for key in val.keys():
                    result_set.append(val[key])
        return result_set

    def _gen_dict_n1ql_func_result(self, result):
        result_set = [val[key] for val in result for key in val.keys()]
        new_result_set = []
        if len(result_set) > 0:
            for value in result_set:
                if isinstance(value, float):
                    new_result_set.append(round(value, 0))
                elif value == 'None':
                    new_result_set.append(None)
                else:
                    new_result_set.append(value)
        else:
            new_result_set = result_set
        return new_result_set

    def _check_sample(self, result, expected_in_key=None):
        if expected_in_key == "FUN":
            return False
        if expected_in_key is None or len(expected_in_key) == 0:
            return False
        if result is not None and len(result) > 0:
            sample = result[0]
            for key in sample.keys():
                for hint in expected_in_key:
                    if key in hint:
                        return True
        return False

    def old_gen_dict(self, result):
        result_set = []
        map = {}
        duplicate_keys = []
        try:
            if result is not None and len(result) > 0:
                for val in result:
                    for key in val.keys():
                        result_set.append(val[key])
            for val in result_set:
                if val["_id"] in map.keys():
                    duplicate_keys.append(val["_id"])
                map[val["_id"]] = val
            keys = map.keys()
            keys.sort()
        except Exception, ex:
            self.log.info(ex)
            raise
        if len(duplicate_keys) > 0:
            raise Exception(" duplicate_keys {0}".format(duplicate_keys))
        return map

    def _set_env_variable(self, server):
        self.shell.execute_command("export NS_SERVER_CBAUTH_URL=\"http://{0}:{1}/_cbauth\"".format(server.ip, server.port))
        self.shell.execute_command("export NS_SERVER_CBAUTH_USER=\"{0}\"".format(server.rest_username))
        self.shell.execute_command("export NS_SERVER_CBAUTH_PWD=\"{0}\"".format(server.rest_password))
        self.shell.execute_command("export NS_SERVER_CBAUTH_RPC_URL=\"http://{0}:{1}/cbauth-demo\"".format(server.ip, server.port))
        self.shell.execute_command("export CBAUTH_REVRPC_URL=\"http://{0}:{1}@{2}:{3}/query\"".format(server.rest_username, server.rest_password, server.ip, server.port))

    def verify_indexes_redistributed(self, map_before_rebalance, map_after_rebalance, stats_map_before_rebalance,
                                     stats_map_after_rebalance, nodes_in, nodes_out, swap_rebalance=False):
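        """Validate index placement after a rebalance: the index count and
        index names must be unchanged, rebalanced-out nodes must no longer
        host indexes, swap-rebalanced-in nodes must host at least one index,
        and items_count and index status must be unchanged."""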
        # verify that the number of indexes before and after rebalance is the same
        no_of_indexes_before_rebalance = 0
        no_of_indexes_after_rebalance = 0
        for bucket in map_before_rebalance:
            no_of_indexes_before_rebalance += len(map_before_rebalance[bucket])
        for bucket in map_after_rebalance:
            no_of_indexes_after_rebalance += len(map_after_rebalance[bucket])
        self.log.info("Number of indexes before rebalance : {0}".format(no_of_indexes_before_rebalance))
        self.log.info("Number of indexes after rebalance  : {0}".format(no_of_indexes_after_rebalance))
        if no_of_indexes_before_rebalance != no_of_indexes_after_rebalance:
            self.log.info("some indexes are missing after rebalance")
            raise Exception("some indexes are missing after rebalance")

        # verify that index names before and after rebalance are the same
        index_names_before_rebalance = []
        index_names_after_rebalance = []
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                index_names_before_rebalance.append(index)
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                index_names_after_rebalance.append(index)
        self.log.info("Index names before rebalance : {0}".format(sorted(index_names_before_rebalance)))
        self.log.info("Index names after rebalance  : {0}".format(sorted(index_names_after_rebalance)))
        if sorted(index_names_before_rebalance) != sorted(index_names_after_rebalance):
            self.log.info("the number of indexes is the same but the index names don't match")
            raise Exception("the number of indexes is the same but the index names don't match")

        # verify that rebalanced-out nodes are no longer present
        host_names_before_rebalance = []
        host_names_after_rebalance = []
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                host_names_before_rebalance.append(map_before_rebalance[bucket][index]['hosts'])
        indexer_nodes_before_rebalance = sorted(set(host_names_before_rebalance))
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                host_names_after_rebalance.append(map_after_rebalance[bucket][index]['hosts'])
        indexer_nodes_after_rebalance = sorted(set(host_names_after_rebalance))
        self.log.info("Host names of indexer nodes before rebalance : {0}".format(indexer_nodes_before_rebalance))
        self.log.info("Host names of indexer nodes after rebalance  : {0}".format(indexer_nodes_after_rebalance))
        # indexes need to be redistributed in case of rebalance out, but not necessarily in case of rebalance in
        if nodes_out and indexer_nodes_before_rebalance == indexer_nodes_after_rebalance:
            self.log.info("Even after rebalance some of the rebalanced-out nodes still have indexes")
            raise Exception("Even after rebalance some of the rebalanced-out nodes still have indexes")
        for node_out in nodes_out:
            if node_out in indexer_nodes_after_rebalance:
                self.log.info("rebalanced-out node still present after rebalance {0} : {1}".format(node_out,
                                                                                                   indexer_nodes_after_rebalance))
                raise Exception("rebalanced-out node still present after rebalance")
        if swap_rebalance:
            for node_in in nodes_in:
                # strip unnecessary data for comparison
                ip_address = str(node_in).replace("ip:", "").replace(" port", "").replace(" ssh_username:root", "").replace(" ssh_username:Administrator", "")
                if ip_address not in indexer_nodes_after_rebalance:
                    self.log.info("swap-rebalanced-in node was not assigned any indexes")
                    raise Exception("swap-rebalanced-in node was not assigned any indexes")

        # verify that items_count before and after rebalance is the same
        items_count_before_rebalance = {}
        items_count_after_rebalance = {}
        for bucket in stats_map_before_rebalance:
            for index in stats_map_before_rebalance[bucket]:
                items_count_before_rebalance[index] = stats_map_before_rebalance[bucket][index]["items_count"]
        for bucket in stats_map_after_rebalance:
            for index in stats_map_after_rebalance[bucket]:
                items_count_after_rebalance[index] = stats_map_after_rebalance[bucket][index]["items_count"]
        self.log.info("item_count of indexes before rebalance {0}".format(items_count_before_rebalance))
        self.log.info("item_count of indexes after rebalance {0}".format(items_count_after_rebalance))
        if items_count_before_rebalance != items_count_after_rebalance:
            self.log.info("items_count mismatch")
            raise Exception("items_count mismatch")

        # verify that index status before and after rebalance is the same
        index_state_before_rebalance = {}
        index_state_after_rebalance = {}
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                index_state_before_rebalance[index] = map_before_rebalance[bucket][index]["status"]
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                index_state_after_rebalance[index] = map_after_rebalance[bucket][index]["status"]
        self.log.info("index status of indexes before rebalance {0}".format(index_state_before_rebalance))
        self.log.info("index status of indexes after rebalance {0}".format(index_state_after_rebalance))
        if index_state_before_rebalance != index_state_after_rebalance:
            self.log.info("index status mismatch")
            raise Exception("index status mismatch")

        # Rebalance is not guaranteed to achieve a balanced cluster.
        # The indexes will be distributed in a manner that satisfies the resource requirements of each index.
        # Hence the index distribution is printed just for logging/debugging purposes.
        index_distribution_map_before_rebalance = {}
        index_distribution_map_after_rebalance = {}
        for node in host_names_before_rebalance:
            index_distribution_map_before_rebalance[node] = index_distribution_map_before_rebalance.get(node, 0) + 1
        for node in host_names_after_rebalance:
            index_distribution_map_after_rebalance[node] = index_distribution_map_after_rebalance.get(node, 0) + 1
        self.log.info("Distribution of indexes before rebalance")
        for k, v in index_distribution_map_before_rebalance.iteritems():
            print k, v
        self.log.info("Distribution of indexes after rebalance")
        for k, v in index_distribution_map_after_rebalance.iteritems():
            print k, v

    def verify_replica_indexes(self, index_names, index_map, num_replicas, expected_nodes=None):
        # 1. Validate count of no_of_indexes
        # 2. Validate index names
        # 3. Validate index replicas have the same id
        # 4. Validate index replicas are on different hosts

        nodes = []
        for index_name in index_names:
            index_host_name, index_id = self.get_index_details_using_index_name(index_name, index_map)
            nodes.append(index_host_name)

            for i in range(0, num_replicas):
                index_replica_name = index_name + " (replica {0})".format(str(i + 1))

                try:
                    index_replica_hostname, index_replica_id = self.get_index_details_using_index_name(
                        index_replica_name, index_map)
                except Exception, ex:
                    self.log.info(str(ex))
                    raise Exception(str(ex))

                self.log.info("Hostnames : %s , %s" % (index_host_name, index_replica_hostname))
                self.log.info("Index IDs : %s, %s" % (index_id, index_replica_id))

                nodes.append(index_replica_hostname)

                if index_id != index_replica_id:
                    self.log.info("Index ID for the main index and its replica are not the same")
                    raise Exception("index id different for replicas")

                if index_host_name == index_replica_hostname:
                    self.log.info("Index hostname for the main index and its replica are the same")
                    raise Exception("index hostname same for replicas")

        if expected_nodes:
            # list.sort() sorts in place and returns None, so compare sorted
            # copies; raise like the other checks (this helper has no fail())
            if sorted(nodes) != sorted(expected_nodes):
                raise Exception("Replicas not created on expected hosts")

    def verify_replica_indexes_build_status(self, index_map, num_replicas, defer_build=False):
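        """Check that every index and each of its replicas reports status
        "Ready" (or "Created" when defer_build is set)."""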
        index_names = self.get_index_names()

        for index_name in index_names:
            index_status, index_build_progress = self.get_index_status_using_index_name(index_name, index_map)
            if not defer_build and index_status != "Ready":
                self.log.info("Expected %s status to be Ready, but it is %s" % (index_name, index_status))
                raise Exception("Index status incorrect")
            elif defer_build and index_status != "Created":
                self.log.info("Expected %s status to be Created, but it is %s" % (index_name, index_status))
                raise Exception("Index status incorrect")
            else:
                self.log.info("index_name = %s, defer_build = %s, index_status = %s" % (index_name, defer_build, index_status))

            for i in range(1, num_replicas + 1):
                index_replica_name = index_name + " (replica {0})".format(str(i))
                try:
                    index_replica_status, index_replica_progress = self.get_index_status_using_index_name(index_replica_name, index_map)
                except Exception, ex:
                    self.log.info(str(ex))
                    raise Exception(str(ex))

                if not defer_build and index_replica_status != "Ready":
                    self.log.info("Expected %s status to be Ready, but it is %s" % (index_replica_name, index_replica_status))
                    raise Exception("Index status incorrect")
                elif defer_build and index_replica_status != "Created":
                    self.log.info("Expected %s status to be Created, but it is %s" % (index_replica_name, index_replica_status))
                    raise Exception("Index status incorrect")
                else:
                    self.log.info("index_name = %s, defer_build = %s, index_replica_status = %s" % (index_replica_name, defer_build, index_replica_status))

    def get_index_details_using_index_name(self, index_name, index_map):
        for key in index_map.iterkeys():
            if index_name in index_map[key].keys():
                return index_map[key][index_name]['hosts'], index_map[key][index_name]['id']
        # raise only after every bucket has been searched
        raise Exception("Index does not exist - {0}".format(index_name))

    def get_index_status_using_index_name(self, index_name, index_map):
        for key in index_map.iterkeys():
            if index_name in index_map[key].keys():
                return index_map[key][index_name]['status'], \
                       index_map[key][index_name]['progress']
        # raise only after every bucket has been searched
        raise Exception("Index does not exist - {0}".format(index_name))