import paramiko
import logger
import json
import uuid
import math
import re
import time
import testconstants
import datetime
from datetime import date
from couchbase_helper.tuq_generators import TuqGenerators
from remote.remote_util import RemoteMachineShellConnection
from membase.api.exception import CBQError, ReadDocumentException
from membase.api.rest_client import RestConnection
import copy


class N1QLHelper():
    """Helper for running N1QL queries and verifying query results and indexes in Couchbase tests."""

    def __init__(self, version=None, master=None, shell=None, max_verify=0, buckets=[], item_flag=0,
                 n1ql_port=8093, full_docs_list=[], log=None, input=None, database=None, use_rest=None):
        self.version = version
        self.shell = shell
        self.max_verify = max_verify
        self.buckets = buckets
        self.item_flag = item_flag
        self.n1ql_port = n1ql_port
        self.input = input
        self.log = log
        self.use_rest = use_rest
        self.full_docs_list = full_docs_list
        self.master = master
        self.database = database
        if self.full_docs_list and len(self.full_docs_list) > 0:
            self.gen_results = TuqGenerators(self.log, self.full_docs_list)

    def killall_tuq_process(self):
        self.shell.execute_command("killall cbq-engine")
        self.shell.execute_command("killall tuqtng")
        self.shell.execute_command("killall indexer")

    def run_query_from_template(self, query_template):
        self.query = self.gen_results.generate_query(query_template)
        expected_result = self.gen_results.generate_expected_result()
        actual_result = self.run_cbq_query()
        return actual_result, expected_result

    def run_cbq_query(self, query=None, min_output_size=10, server=None, query_params=None, is_prepared=False,
                      scan_consistency=None, scan_vector=None, verbose=True):
        # A mutable default dict would be shared (and mutated) across calls,
        # so build it per call instead.
        if query_params is None:
            query_params = {}
        if query is None:
            query = self.query
        if server is None:
            server = self.master
            if server.ip == "127.0.0.1":
                self.n1ql_port = server.n1ql_port
        else:
            if server.ip == "127.0.0.1":
                self.n1ql_port = server.n1ql_port
            if self.input.tuq_client and "client" in self.input.tuq_client:
                server = self.tuq_client
        if self.n1ql_port is None or self.n1ql_port == '':
            self.n1ql_port = self.input.param("n1ql_port", 8093)
            if not self.n1ql_port:
                self.log.info(" n1ql_port is not defined, processing will not proceed further")
                raise Exception("n1ql_port is not defined, processing will not proceed further")
        cred_params = {'creds': []}
        for bucket in self.buckets:
            if bucket.saslPassword:
                cred_params['creds'].append({'user': 'local:%s' % bucket.name, 'pass': bucket.saslPassword})
        query_params.update(cred_params)
        if self.use_rest:
            if scan_consistency:
                query_params['scan_consistency'] = scan_consistency
            if scan_vector:
                query_params['scan_vector'] = str(scan_vector).replace("'", '"')
            if verbose:
                self.log.info('RUN QUERY %s' % query)
            result = RestConnection(server).query_tool(query, self.n1ql_port, query_params=query_params,
                                                       is_prepared=is_prepared, verbose=verbose)
        else:
            shell = RemoteMachineShellConnection(server)
            url = "'http://%s:8093/query/service'" % server.ip
            cmd = "%s/cbq -engine=http://%s:8093/" % (testconstants.LINUX_COUCHBASE_BIN_PATH, server.ip)
            query = query.replace('"', '\\"')
            if "#primary" in query:
                query = query.replace("'#primary'", '\\"#primary\\"')
            query = "select curl('POST', " + url + ", {'data' : 'statement=%s'})" % query
            print query
            output = shell.execute_commands_inside(cmd, query, "", "", "", "", "")
            print "-" * 128
            print output
            new_curl = json.dumps(output[47:])
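            # The [47:] slice above is assumed to skip the fixed-width cbq
            # shell banner preceding the JSON payload; the dumps/loads
            # round-trip below normalises the remaining raw shell output
            # into a JSON string before parsing the actual query result.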
            string_curl = json.loads(new_curl)
            result = json.loads(string_curl)
            print result
        if isinstance(result, str) or 'errors' in result:
            error_result = str(result)
            length_display = len(error_result)
            if length_display > 500:
                error_result = error_result[:500]
            raise CBQError(error_result, server.ip)
        self.log.info("TOTAL ELAPSED TIME: %s" % result["metrics"]["elapsedTime"])
        return result

    def _verify_results(self, actual_result, expected_result, missing_count=1, extra_count=1):
        self.log.info(" Analyzing Actual Result")
        actual_result = self._gen_dict(actual_result)
        self.log.info(" Analyzing Expected Result")
        expected_result = self._gen_dict(expected_result)
        if len(actual_result) != len(expected_result):
            raise Exception("Results are incorrect. Actual num %s. Expected num: %s.\n"
                            % (len(actual_result), len(expected_result)))
        msg = "The number of rows match but the results mismatch, please check"
        if actual_result != expected_result:
            raise Exception(msg)

    def _verify_results_rqg(self, subquery, aggregate=False, n1ql_result=[], sql_result=[], hints=["a1"],
                            aggregate_pushdown=False):
        new_n1ql_result = []
        for result in n1ql_result:
            if result != {}:
                new_n1ql_result.append(result)
        n1ql_result = new_n1ql_result

        if self._is_function_in_result(hints):
            return self._verify_results_rqg_for_function(n1ql_result, sql_result, aggregate_pushdown=aggregate_pushdown)

        check = self._check_sample(n1ql_result, hints)
        actual_result = n1ql_result

        if actual_result == [{}]:
            actual_result = []
        if check:
            actual_result = self._gen_dict(n1ql_result)

        actual_result = sorted(actual_result)
        expected_result = sorted(sql_result)

        if len(actual_result) != len(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception("Results are incorrect. Actual num %s. Expected num: %s. :: %s\n"
                            % (len(actual_result), len(expected_result), extra_msg))

        msg = "The number of rows match but the results mismatch, please check"
        if subquery:
            for x, y in zip(actual_result, expected_result):
                if aggregate:
                    productId = x['ABC'][0]['$1']
                else:
                    productId = x['ABC'][0]['productId']
                if productId != y['ABC'] or \
                        x['datetime_field1'] != y['datetime_field1'] or \
                        x['primary_key_id'] != y['primary_key_id'] or \
                        x['varchar_field1'] != y['varchar_field1'] or \
                        x['decimal_field1'] != y['decimal_field1'] or \
                        x['char_field1'] != y['char_field1'] or \
                        x['int_field1'] != y['int_field1'] or \
                        x['bool_field1'] != y['bool_field1']:
                    print "actual_result is %s" % actual_result
                    print "expected result is %s" % expected_result
                    extra_msg = self._get_failure_message(expected_result, actual_result)
                    raise Exception(msg + "\n " + extra_msg)
        else:
            if self._sort_data(actual_result) != self._sort_data(expected_result):
                extra_msg = self._get_failure_message(expected_result, actual_result)
                raise Exception(msg + "\n " + extra_msg)

    def _sort_data(self, result):
        new_data = []
        for data in result:
            new_data.append(sorted(data))
        return new_data

    def _verify_results_crud_rqg(self, n1ql_result=[], sql_result=[], hints=["primary_key_id"]):
        new_n1ql_result = []
        for result in n1ql_result:
            if result != {}:
                new_n1ql_result.append(result)
        n1ql_result = new_n1ql_result
        if self._is_function_in_result(hints):
            return self._verify_results_rqg_for_function(n1ql_result, sql_result)
        check = self._check_sample(n1ql_result, hints)
        actual_result = n1ql_result
        if actual_result == [{}]:
            actual_result = []
        if check:
            actual_result = self._gen_dict(n1ql_result)
        actual_result = sorted(actual_result)
        expected_result = sorted(sql_result)

        if len(actual_result) != len(expected_result):
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception("Results are incorrect. Actual num %s. Expected num: %s. :: %s\n"
                            % (len(actual_result), len(expected_result), extra_msg))
        if not self._result_comparison_analysis(actual_result, expected_result):
            msg = "The number of rows match but the results mismatch, please check"
            extra_msg = self._get_failure_message(expected_result, actual_result)
            raise Exception(msg + "\n " + extra_msg)

    def _get_failure_message(self, expected_result, actual_result):
        if expected_result is None:
            expected_result = []
        if actual_result is None:
            actual_result = []
        # Show at most the first five entries of each side.
        len_expected_result = min(5, len(expected_result))
        len_actual_result = min(5, len(actual_result))
        extra_msg = "mismatch in results :: expected :: {0}, actual :: {1} ".format(
            expected_result[0:len_expected_result], actual_result[0:len_actual_result])
        return extra_msg

    def _result_comparison_analysis(self, expected_result, actual_result):
        expected_map = {}
        actual_map = {}
        for data in expected_result:
            primary = None
            for key in data.keys():
                if key.encode('ascii') == "primary_key_id":
                    primary = key
            expected_map[data[primary]] = data
        for data in actual_result:
            primary = None
            for key in data.keys():
                if key.encode('ascii') == "primary_key_id":
                    primary = key
            actual_map[data[primary]] = data
        check = True
        for key in expected_map.keys():
            if sorted(actual_map[key]) != sorted(expected_map[key]):
                check = False
        return check

    def _analyze_for_special_case_using_func(self, expected_result, actual_result):
        if expected_result is None:
            expected_result = []
        if actual_result is None:
            actual_result = []
        if len(expected_result) == 1:
            value = expected_result[0].values()[0]
            if value is None or value == 0:
                expected_result = []
        if len(actual_result) == 1:
            value = actual_result[0].values()[0]
            if value is None or value == 0:
                actual_result = []
        return expected_result, actual_result

    def _is_function_in_result(self, result):
        if result == "FUN":
            return True
        return False

    def _verify_results_rqg_for_function(self, n1ql_result=[], sql_result=[], hints=["a1"], aggregate_pushdown=False):
        if not aggregate_pushdown:
            sql_result, n1ql_result = self._analyze_for_special_case_using_func(sql_result, n1ql_result)
        if len(sql_result) != len(n1ql_result):
            msg = "the number of results do not match :: sql = {0}, n1ql = {1}".format(len(sql_result), len(n1ql_result))
            extra_msg = self._get_failure_message(sql_result, n1ql_result)
            raise Exception(msg + "\n" + extra_msg)
        n1ql_result = self._gen_dict_n1ql_func_result(n1ql_result)
        n1ql_result = sorted(n1ql_result)
        sql_result = self._gen_dict_n1ql_func_result(sql_result)
        sql_result = sorted(sql_result)
        if len(sql_result) == 0 and len(n1ql_result) == 0:
            return
        if sql_result != n1ql_result:
            i = 0
            for sql_value, n1ql_value in zip(sql_result, n1ql_result):
                if sql_value != n1ql_value:
                    break
                i = i + 1
            num_results = len(sql_result)
            last_idx = min(i + 5, num_results)
            msg = "mismatch in results :: result length :: {3}, first mismatch position :: {0}, " \
                  "sql value :: {1}, n1ql value :: {2} ".format(i, sql_result[i:last_idx],
                                                                n1ql_result[i:last_idx], num_results)
            raise Exception(msg)

    def _convert_to_number(self, val):
        if not isinstance(val, str):
            return val
        if val == '':
            return 0
        value = -1
        try:
            value = int(val.split("(")[1].split(")")[0])
        except Exception, ex:
            self.log.info(ex)
        return value

    def analyze_failure(self, actual, expected):
        missing_keys = []
        different_values = []
        for key in expected.keys():
            if key not in actual.keys():
                missing_keys.append(key)
            elif expected[key] != actual[key]:
                different_values.append("for key {0}, expected {1} \n actual {2}".format(
                    key, expected[key], actual[key]))
        self.log.info(missing_keys)
        if len(different_values) > 0:
            self.log.info(" number of such cases {0}".format(len(different_values)))
            self.log.info(" example key {0}".format(different_values[0]))

    def check_missing_and_extra(self, actual, expected):
        missing = []
        extra = []
        for item in actual:
            if item not in expected:
                extra.append(item)
        for item in expected:
            if item not in actual:
                missing.append(item)
        return missing, extra

    def build_url(self, version):
        info = self.shell.extract_remote_info()
        url = None
        distribution = info.distribution_type.lower()
        if distribution in ["ubuntu", "centos", "red hat"]:
            url = "https://s3.amazonaws.com/packages.couchbase.com/releases/couchbase-query/dp1/"
            url += "couchbase-query_%s_%s_linux.tar.gz" % (version, info.architecture_type)
        # TODO for windows
        return url

    def _restart_indexer(self):
        couchbase_path = "/opt/couchbase/var/lib/couchbase"
        cmd = "rm -f {0}/meta;rm -f /tmp/log_upr_client.sock".format(couchbase_path)
        self.shell.execute_command(cmd)

    def _start_command_line_query(self, server):
        self.shell = RemoteMachineShellConnection(server)
        self._set_env_variable(server)
        if self.version == "git_repo":
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                gopath = testconstants.LINUX_GOPATH
            else:
                gopath = testconstants.WINDOWS_GOPATH
            if self.input.tuq_client and "gopath" in self.input.tuq_client:
                gopath = self.input.tuq_client["gopath"]
            if os == 'windows':
                cmd = "cd %s/src/github.com/couchbase/query/server/main; " % (gopath) + \
                      "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd %s/src/github.com/couchbase/query/server/main; " % (gopath) + \
                      "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" % (server.ip, server.port)
            self.shell.execute_command(cmd)
        elif self.version == "sherlock":
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                couchbase_path = testconstants.LINUX_COUCHBASE_BIN_PATH
            else:
                couchbase_path = testconstants.WIN_COUCHBASE_BIN_PATH
            if self.input.tuq_client and "sherlock_path" in self.input.tuq_client:
                couchbase_path = "%s/bin" % self.input.tuq_client["sherlock_path"]
                print "PATH TO SHERLOCK: %s" % couchbase_path
            if os == 'windows':
                cmd = "cd %s; " % (couchbase_path) + \
                      "./cbq-engine.exe -datastore http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd %s; " % (couchbase_path) + \
                      "./cbq-engine -datastore http://%s:%s/ >n1ql.log 2>&1 &" % (server.ip, server.port)
                n1ql_port = self.input.param("n1ql_port", None)
                if server.ip == "127.0.0.1" and server.n1ql_port:
                    n1ql_port = server.n1ql_port
                if n1ql_port:
                    cmd = "cd %s; " % (couchbase_path) + \
                          './cbq-engine -datastore http://%s:%s/ -http=":%s">n1ql.log 2>&1 &' % (server.ip, server.port, n1ql_port)
            self.shell.execute_command(cmd)
        else:
            os = self.shell.extract_remote_info().type.lower()
            if os != 'windows':
                cmd = "cd /tmp/tuq;./cbq-engine -couchbase http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            else:
                cmd = "cd /cygdrive/c/tuq;./cbq-engine.exe -couchbase http://%s:%s/ >/dev/null 2>&1 &" % (server.ip, server.port)
            self.shell.execute_command(cmd)

    def _parse_query_output(self, output):
        if output.find("cbq>") == 0:
            output = output[output.find("cbq>") + 4:].strip()
        if output.find("tuq_client>") == 0:
            output = output[output.find("tuq_client>") + 11:].strip()
        if output.find("cbq>") != -1:
            output = output[:output.find("cbq>")].strip()
        if output.find("tuq_client>") != -1:
            output = output[:output.find("tuq_client>")].strip()
        return json.loads(output)

    def sort_nested_list(self, result):
        actual_result = []
        for item in result:
            curr_item = {}
            for key, value in item.iteritems():
                if isinstance(value, (list, set)):
                    curr_item[key] = sorted(value)
                else:
                    curr_item[key] = value
            actual_result.append(curr_item)
        return actual_result

    def configure_gomaxprocs(self):
        max_proc = self.input.param("gomaxprocs", None)
        cmd = "export GOMAXPROCS=%s" % max_proc
        for server in self.input.servers:
            shell_connection = RemoteMachineShellConnection(server)
            shell_connection.execute_command(cmd)

    def drop_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        self.log.info("CHECK FOR PRIMARY INDEXES")
        for bucket in self.buckets:
            self.query = "DROP PRIMARY INDEX ON {0}".format(bucket.name)
            if using_gsi:
                self.query += " USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            self.log.info(self.query)
            try:
                check = self._is_index_in_list(bucket.name, "#primary", server=server)
                if check:
                    self.run_cbq_query(server=server)
            except Exception, ex:
                self.log.error('ERROR during primary index drop %s' % str(ex))

    def create_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        for bucket in self.buckets:
            self.query = "CREATE PRIMARY INDEX ON %s " % bucket.name
            if using_gsi:
                self.query += " USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            if self.use_rest:
                try:
                    check = self._is_index_in_list(bucket.name, "#primary", server=server)
                    if not check:
                        self.run_cbq_query(server=server, query_params={'timeout': '900s'})
                        check = self.is_index_online_and_in_list(bucket.name, "#primary", server=server)
                        if not check:
                            raise Exception(" Timed-out Exception while building primary index for bucket {0} !!!".format(bucket.name))
                    else:
                        raise Exception(" Primary Index Already present, This looks like a bug !!!")
                except Exception, ex:
                    self.log.error('ERROR during index creation %s' % str(ex))
                    raise ex

    def create_partitioned_primary_index(self, using_gsi=True, server=None):
        if server is None:
            server = self.master
        for bucket in self.buckets:
            self.query = "CREATE PRIMARY INDEX ON %s " % bucket.name
            if using_gsi:
                self.query += " PARTITION BY HASH(meta().id) USING GSI"
            if not using_gsi:
                self.query += " USING VIEW "
            if self.use_rest:
                try:
                    check = self._is_index_in_list(bucket.name, "#primary", server=server)
                    if not check:
                        self.run_cbq_query(server=server, query_params={'timeout': '900s'})
                        check = self.is_index_online_and_in_list(bucket.name, "#primary", server=server)
                        if not check:
                            raise Exception(" Timed-out Exception while building primary index for bucket {0} !!!".format(bucket.name))
                    else:
                        raise Exception(" Primary Index Already present, This looks like a bug !!!")
                except Exception, ex:
                    self.log.error('ERROR during index creation %s' % str(ex))
                    raise ex

    def verify_index_with_explain(self, actual_result, index_name, check_covering_index=False):
        check = True
        if check_covering_index:
            check = "covering" in str(actual_result)
        if index_name in str(actual_result):
            return check
        return False

    def run_query_and_verify_result(self, server=None, query=None, timeout=120.0, max_try=1, expected_result=None,
                                    scan_consistency=None, scan_vector=None, verify_results=True):
        check = False
        init_time = time.time()
        try_count = 0
        while not check:
            next_time = time.time()
            try:
                actual_result = self.run_cbq_query(query=query, server=server, scan_consistency=scan_consistency,
                                                   scan_vector=scan_vector)
                if verify_results:
                    self._verify_results(sorted(actual_result['results']), sorted(expected_result))
                else:
                    return "ran query with success and validated results", True
                check = True
            except Exception, ex:
                if next_time - init_time > timeout or try_count >= max_try:
                    return ex, False
            finally:
                try_count += 1
        return "ran query with success and validated results", check

    def get_index_names(self, server=None):
        query = "select distinct(name) from system:indexes where `using`='gsi'"
        index_names = []
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        for item in res['results']:
            index_names.append(item['name'])
        return index_names

    def is_index_online_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
        check = self._is_index_in_list(bucket, index_name, server=server)
        init_time = time.time()
        while not check:
            time.sleep(1)
            check = self._is_index_in_list(bucket, index_name, server=server)
            next_time = time.time()
            if check or (next_time - init_time > timeout):
                return check
        return check

    def is_index_ready_and_in_list(self, bucket, index_name, server=None, timeout=600.0):
        query = "SELECT * FROM system:indexes where name = '{0}'".format(index_name)
        if server is None:
            server = self.master
        init_time = time.time()
        check = False
        while not check:
            res = self.run_cbq_query(query=query, server=server)
            for item in res['results']:
                if 'keyspace_id' not in item['indexes']:
                    check = False
                elif item['indexes']['keyspace_id'] == str(bucket) \
                        and item['indexes']['name'] == index_name \
                        and item['indexes']['state'] == "online":
                    check = True
            time.sleep(1)
            next_time = time.time()
            if next_time - init_time > timeout:
                # stop polling once the timeout elapses; check holds the
                # last observed state
                break
        return check

    def is_index_online_and_in_list_bulk(self, bucket, index_names=[], server=None, index_state="online", timeout=600.0):
        check, index_names = self._is_index_in_list_bulk(bucket, index_names, server=server, index_state=index_state)
        init_time = time.time()
        while not check:
            check, index_names = self._is_index_in_list_bulk(bucket, index_names, server=server, index_state=index_state)
            next_time = time.time()
            if check or (next_time - init_time > timeout):
                return check
        return check

    def gen_build_index_query(self, bucket="default", index_list=[]):
        return "BUILD INDEX on {0}({1}) USING GSI".format(bucket, ",".join(index_list))

    def gen_query_parameter(self, scan_vector=None, scan_consistency=None):
        query_params = {}
        if scan_vector:
            query_params.update({"scan_vector": scan_vector})
        if scan_consistency:
            query_params.update({"scan_consistency": scan_consistency})
        return query_params

    def _is_index_in_list(self, bucket, index_name, server=None, index_state=["pending", "building", "deferred"]):
        query = "SELECT * FROM system:indexes where name = '{0}'".format(index_name)
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        for item in res['results']:
            if 'keyspace_id' not in item['indexes']:
                continue
            if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name \
                    and item['indexes']['state'] not in index_state:
                return True
        return False

    def _is_index_in_list_bulk(self, bucket, index_names=[], server=None, index_state=["pending", "building"]):
        query = "SELECT * FROM system:indexes"
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        found_index_list = []
        for item in res['results']:
            if 'keyspace_id' not in item['indexes']:
                continue
            for index_name in index_names:
                if item['indexes']['keyspace_id'] == str(bucket) and item['indexes']['name'] == index_name \
                        and item['indexes']['state'] not in index_state:
                    found_index_list.append(index_name)
        if len(found_index_list) == len(index_names):
            return True, []
        return False, list(set(index_names) - set(found_index_list))

    def gen_index_map(self, server=None):
        query = "SELECT * FROM system:indexes"
        if server is None:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        index_map = {}
        for item in res['results']:
            bucket_name = item['indexes']['keyspace_id'].encode('ascii', 'ignore')
            if bucket_name not in index_map.keys():
                index_map[bucket_name] = {}
            index_name = str(item['indexes']['name'])
            index_map[bucket_name][index_name] = {}
            index_map[bucket_name][index_name]['state'] = item['indexes']['state']
        return index_map

    def get_index_count_using_primary_index(self, buckets, server=None):
        query = "SELECT COUNT(*) FROM {0}"
        map = {}
        if server is None:
            server = self.master
        for bucket in buckets:
            res = self.run_cbq_query(query=query.format(bucket.name), server=server)
            map[bucket.name] = int(res["results"][0]["$1"])
        return map

    def get_index_count_using_index(self, bucket, index_name, server=None):
        query = 'SELECT COUNT(*) FROM {0} USE INDEX ({1})'.format(bucket.name, index_name)
        if not server:
            server = self.master
        res = self.run_cbq_query(query=query, server=server)
        return int(res['results'][0]['$1'])

    def _gen_dict(self, result):
        result_set = []
        if result is not None and len(result) > 0:
            for val in result:
                for key in val.keys():
                    result_set.append(val[key])
        return result_set

    def _gen_dict_n1ql_func_result(self, result):
        result_set = [val[key] for val in result for key in val.keys()]
        new_result_set = []
        if len(result_set) > 0:
            for value in result_set:
                if isinstance(value, float):
                    new_result_set.append(round(value, 0))
                elif value == 'None':
                    new_result_set.append(None)
                else:
                    new_result_set.append(value)
        else:
            new_result_set = result_set
        return new_result_set

    def _check_sample(self, result, expected_in_key=None):
        if expected_in_key == "FUN":
            return False
        if expected_in_key is None or len(expected_in_key) == 0:
            return False
        if result is not None and len(result) > 0:
            sample = result[0]
            for key in sample.keys():
                for hint in expected_in_key:
                    if key in hint:
                        return True
        return False

    def old_gen_dict(self, result):
        result_set = []
        map = {}
        duplicate_keys = []
        try:
            if result is not None and len(result) > 0:
                for val in result:
                    for key in val.keys():
                        result_set.append(val[key])
            for val in result_set:
                if val["_id"] in map.keys():
                    duplicate_keys.append(val["_id"])
                map[val["_id"]] = val
            keys = map.keys()
            keys.sort()
        except Exception, ex:
            self.log.info(ex)
            raise
        if len(duplicate_keys) > 0:
            raise Exception(" duplicate_keys {0}".format(duplicate_keys))
        return map

    def _set_env_variable(self, server):
        self.shell.execute_command("export NS_SERVER_CBAUTH_URL=\"http://{0}:{1}/_cbauth\"".format(server.ip, server.port))
        self.shell.execute_command("export NS_SERVER_CBAUTH_USER=\"{0}\"".format(server.rest_username))
        self.shell.execute_command("export NS_SERVER_CBAUTH_PWD=\"{0}\"".format(server.rest_password))
        self.shell.execute_command("export NS_SERVER_CBAUTH_RPC_URL=\"http://{0}:{1}/cbauth-demo\"".format(server.ip, server.port))
        self.shell.execute_command("export CBAUTH_REVRPC_URL=\"http://{0}:{1}@{2}:{3}/query\"".format(server.rest_username, server.rest_password, server.ip, server.port))

    def verify_indexes_redistributed(self, map_before_rebalance, map_after_rebalance, stats_map_before_rebalance,
                                     stats_map_after_rebalance, nodes_in, nodes_out, swap_rebalance=False):
        # verify that the number of indexes before and after rebalance is the same
        no_of_indexes_before_rebalance = 0
        no_of_indexes_after_rebalance = 0
        for bucket in map_before_rebalance:
            no_of_indexes_before_rebalance += len(map_before_rebalance[bucket])
        for bucket in map_after_rebalance:
            no_of_indexes_after_rebalance += len(map_after_rebalance[bucket])
        self.log.info("Number of indexes before rebalance : {0}".format(no_of_indexes_before_rebalance))
        self.log.info("Number of indexes after rebalance : {0}".format(no_of_indexes_after_rebalance))
        if no_of_indexes_before_rebalance != no_of_indexes_after_rebalance:
            self.log.info("some indexes are missing after rebalance")
            raise Exception("some indexes are missing after rebalance")

        # verify that index names before and after rebalance are the same
        index_names_before_rebalance = []
        index_names_after_rebalance = []
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                index_names_before_rebalance.append(index)
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                index_names_after_rebalance.append(index)
        self.log.info("Index names before rebalance : {0}".format(sorted(index_names_before_rebalance)))
        self.log.info("Index names after rebalance : {0}".format(sorted(index_names_after_rebalance)))
        if sorted(index_names_before_rebalance) != sorted(index_names_after_rebalance):
            self.log.info("number of indexes are same but index names don't match")
            raise Exception("number of indexes are same but index names don't match")

        # verify that rebalanced-out nodes are no longer hosting indexes
        host_names_before_rebalance = []
        host_names_after_rebalance = []
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                host_names_before_rebalance.append(map_before_rebalance[bucket][index]['hosts'])
        indexer_nodes_before_rebalance = sorted(set(host_names_before_rebalance))
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                host_names_after_rebalance.append(map_after_rebalance[bucket][index]['hosts'])
        indexer_nodes_after_rebalance = sorted(set(host_names_after_rebalance))
        self.log.info("Host names of indexer nodes before rebalance : {0}".format(indexer_nodes_before_rebalance))
        self.log.info("Host names of indexer nodes after rebalance : {0}".format(indexer_nodes_after_rebalance))
        # indexes need to be redistributed in case of rebalance out, not necessarily in case of rebalance in
        if nodes_out and indexer_nodes_before_rebalance == indexer_nodes_after_rebalance:
            self.log.info("Even after rebalance some of rebalanced out nodes still have indexes")
            raise Exception("Even after rebalance some of rebalanced out nodes still have indexes")
        for node_out in nodes_out:
            if node_out in indexer_nodes_after_rebalance:
                self.log.info("rebalanced out node still present after rebalance {0} : {1}".format(
                    node_out, indexer_nodes_after_rebalance))
                raise Exception("rebalanced out node still present after rebalance")
        if swap_rebalance:
            for node_in in nodes_in:
                # strip unnecessary data for comparison
                ip_address = str(node_in).replace("ip:", "").replace(" port", "").replace(
                    " ssh_username:root", "").replace(" ssh_username:Administrator", "")
                if ip_address not in indexer_nodes_after_rebalance:
                    self.log.info("swap rebalanced in node is not distributed any indexes")
                    raise Exception("swap rebalanced in node is not distributed any indexes")

        # verify that items_count before and after rebalance is the same
        items_count_before_rebalance = {}
        items_count_after_rebalance = {}
        for bucket in stats_map_before_rebalance:
            for index in stats_map_before_rebalance[bucket]:
                items_count_before_rebalance[index] = stats_map_before_rebalance[bucket][index]["items_count"]
        for bucket in stats_map_after_rebalance:
            for index in stats_map_after_rebalance[bucket]:
                items_count_after_rebalance[index] = stats_map_after_rebalance[bucket][index]["items_count"]
        self.log.info("item_count of indexes before rebalance {0}".format(items_count_before_rebalance))
        self.log.info("item_count of indexes after rebalance {0}".format(items_count_after_rebalance))
        if cmp(items_count_before_rebalance, items_count_after_rebalance) != 0:
            self.log.info("items_count mismatch")
            raise Exception("items_count mismatch")

        # verify that index status before and after rebalance is the same
        index_state_before_rebalance = {}
        index_state_after_rebalance = {}
        for bucket in map_before_rebalance:
            for index in map_before_rebalance[bucket]:
                index_state_before_rebalance[index] = map_before_rebalance[bucket][index]["status"]
        for bucket in map_after_rebalance:
            for index in map_after_rebalance[bucket]:
                index_state_after_rebalance[index] = map_after_rebalance[bucket][index]["status"]
        self.log.info("index status of indexes before rebalance {0}".format(index_state_before_rebalance))
        self.log.info("index status of indexes after rebalance {0}".format(index_state_after_rebalance))
        if cmp(index_state_before_rebalance, index_state_after_rebalance) != 0:
            self.log.info("index status mismatch")
            raise Exception("index status mismatch")

        # Rebalance is not guaranteed to achieve a balanced cluster.
        # The indexes will be distributed in a manner that satisfies the resource requirements of each index.
        # Hence the index distribution is printed just for logging/debugging purposes.
        index_distribution_map_before_rebalance = {}
        index_distribution_map_after_rebalance = {}
        for node in host_names_before_rebalance:
            index_distribution_map_before_rebalance[node] = index_distribution_map_before_rebalance.get(node, 0) + 1
        for node in host_names_after_rebalance:
            index_distribution_map_after_rebalance[node] = index_distribution_map_after_rebalance.get(node, 0) + 1
        self.log.info("Distribution of indexes before rebalance")
        for k, v in index_distribution_map_before_rebalance.iteritems():
            print k, v
        self.log.info("Distribution of indexes after rebalance")
        for k, v in index_distribution_map_after_rebalance.iteritems():
            print k, v

    def verify_replica_indexes(self, index_names, index_map, num_replicas, expected_nodes=None):
        # 1. Validate the count of indexes
        # 2. Validate index names
        # 3. Validate that index replicas have the same id
        # 4. Validate that index replicas are on different hosts
        nodes = []
        for index_name in index_names:
            index_host_name, index_id = self.get_index_details_using_index_name(index_name, index_map)
            nodes.append(index_host_name)

            for i in range(0, num_replicas):
                index_replica_name = index_name + " (replica {0})".format(str(i + 1))
                try:
                    index_replica_hostname, index_replica_id = self.get_index_details_using_index_name(
                        index_replica_name, index_map)
                except Exception, ex:
                    self.log.info(str(ex))
                    raise Exception(str(ex))

                self.log.info("Hostnames : %s , %s" % (index_host_name, index_replica_hostname))
                self.log.info("Index IDs : %s, %s" % (index_id, index_replica_id))

                nodes.append(index_replica_hostname)

                if index_id != index_replica_id:
                    self.log.info("Index ID for main index and replica indexes not same")
                    raise Exception("index id different for replicas")

                if index_host_name == index_replica_hostname:
                    self.log.info("Index hostname for main index and replica indexes are same")
                    raise Exception("index hostname same for replicas")

        if expected_nodes:
            expected_nodes = sorted(expected_nodes)
            nodes = sorted(nodes)
            if expected_nodes != nodes:
                raise Exception("Replicas not created on expected hosts")

    def verify_replica_indexes_build_status(self, index_map, num_replicas, defer_build=False):
        index_names = self.get_index_names()
        for index_name in index_names:
            index_status, index_build_progress = self.get_index_status_using_index_name(index_name, index_map)
            if not defer_build and index_status != "Ready":
                self.log.info("Expected %s status to be Ready, but it is %s" % (index_name, index_status))
                raise Exception("Index status incorrect")
            elif defer_build and index_status != "Created":
                self.log.info("Expected %s status to be Created, but it is %s" % (index_name, index_status))
                raise Exception("Index status incorrect")
            else:
                self.log.info("index_name = %s, defer_build = %s, index_status = %s"
                              % (index_name, defer_build, index_status))

            for i in range(1, num_replicas + 1):
                index_replica_name = index_name + " (replica {0})".format(str(i))
                try:
                    index_replica_status, index_replica_progress = self.get_index_status_using_index_name(
                        index_replica_name, index_map)
                except Exception, ex:
                    self.log.info(str(ex))
                    raise Exception(str(ex))

                if not defer_build and index_replica_status != "Ready":
                    self.log.info("Expected %s status to be Ready, but it is %s" % (index_replica_name, index_replica_status))
                    raise Exception("Index status incorrect")
                elif defer_build and index_replica_status != "Created":
                    self.log.info("Expected %s status to be Created, but it is %s" % (index_replica_name, index_replica_status))
                    raise Exception("Index status incorrect")
                else:
                    self.log.info("index_name = %s, defer_build = %s, index_replica_status = %s"
                                  % (index_replica_name, defer_build, index_replica_status))

    def get_index_details_using_index_name(self, index_name, index_map):
        for key in index_map.iterkeys():
            if index_name in index_map[key].keys():
                return index_map[key][index_name]['hosts'], index_map[key][index_name]['id']
        raise Exception("Index does not exist - {0}".format(index_name))

    def get_index_status_using_index_name(self, index_name, index_map):
        for key in index_map.iterkeys():
            if index_name in index_map[key].keys():
                return index_map[key][index_name]['status'], \
                       index_map[key][index_name]['progress']
        raise Exception("Index does not exist - {0}".format(index_name))