1import base64 2import json 3import urllib 4import httplib2 5import socket 6import time 7import logger 8import uuid 9from copy import deepcopy 10from threading import Thread 11 12try: 13 from couchbase_helper.document import DesignDocument, View 14except ImportError: 15 from lib.couchbase_helper.document import DesignDocument, View 16 17from memcached.helper.kvstore import KVStore 18from exception import ServerAlreadyJoinedException, ServerUnavailableException, InvalidArgumentException 19from membase.api.exception import BucketCreationException, ServerSelfJoinException, ClusterRemoteException, \ 20 RebalanceFailedException, FailoverFailedException, DesignDocCreationException, QueryViewException, \ 21 ReadDocumentException, GetBucketInfoFailed, CompactViewFailed, SetViewInfoNotFound, AddNodeException, \ 22 BucketFlushFailed, CBRecoveryFailedException, XDCRException, SetRecoveryTypeFailed, BucketCompactionException 23log = logger.Logger.get_logger() 24 25#helper library methods built on top of RestConnection interface 26 27class RestHelper(object): 28 def __init__(self, rest_connection): 29 self.rest = rest_connection 30 31 def is_ns_server_running(self, timeout_in_seconds=360): 32 end_time = time.time() + timeout_in_seconds 33 while time.time() <= end_time: 34 try: 35 status = self.rest.get_nodes_self(5) 36 if status is not None and status.status == 'healthy': 37 return True 38 else: 39 if status is not None: 40 log.warn("server {0}:{1} status is {2}".format(self.rest.ip, self.rest.port, status.status)) 41 else: 42 log.warn("server {0}:{1} status is down".format(self.rest.ip, self.rest.port)) 43 except ServerUnavailableException: 44 log.error("server {0}:{1} is unavailable".format(self.rest.ip, self.rest.port)) 45 time.sleep(1) 46 msg = 'unable to connect to the node {0} even after waiting {1} seconds' 47 log.error(msg.format(self.rest.ip, timeout_in_seconds)) 48 return False 49 50 def is_cluster_healthy(self, timeout=120): 51 #get the nodes and verify that all the nodes.status are healthy 52 nodes = self.rest.node_statuses(timeout) 53 return all(node.status == 'healthy' for node in nodes) 54 55 def rebalance_reached(self, percentage=100): 56 start = time.time() 57 progress = 0 58 previous_progress = 0 59 retry = 0 60 while progress is not -1 and progress < percentage and retry < 40: 61 #-1 is error , -100 means could not retrieve progress 62 progress = self.rest._rebalance_progress() 63 if progress == -100: 64 log.error("unable to retrieve rebalanceProgress.try again in 2 seconds") 65 retry += 1 66 else: 67 if previous_progress == progress: 68 retry += 0.5 69 else: 70 retry = 0 71 previous_progress = progress 72 #sleep for 2 seconds 73 time.sleep(2) 74 if progress <= 0: 75 log.error("rebalance progress code : {0}".format(progress)) 76 return False 77 elif retry >= 40: 78 log.error("rebalance stuck on {0}%".format(progress)) 79 return False 80 else: 81 duration = time.time() - start 82 log.info('rebalance reached >{0}% in {1} seconds '.format(progress, duration)) 83 return True 84 85 #return true if cluster balanced, false if it needs rebalance 86 def is_cluster_rebalanced(self): 87 command = "ns_orchestrator:needs_rebalance()" 88 status, content = self.rest.diag_eval(command) 89 if status: 90 return content.lower() == "false" 91 log.error("can't define if cluster balanced") 92 return None 93 94 95 #this method will rebalance the cluster by passing the remote_node as 96 #ejected node 97 def remove_nodes(self, knownNodes, ejectedNodes, wait_for_rebalance=True): 98 if len(ejectedNodes) == 0: 99 return False 100 self.rest.rebalance(knownNodes, ejectedNodes) 101 if wait_for_rebalance: 102 return self.rest.monitorRebalance() 103 else: 104 return False 105 106 def vbucket_map_ready(self, bucket, timeout_in_seconds=360): 107 end_time = time.time() + timeout_in_seconds 108 while time.time() <= end_time: 109 vBuckets = self.rest.get_vbuckets(bucket) 110 if vBuckets: 111 return True 112 else: 113 time.sleep(0.5) 114 msg = 'vbucket map is not ready for bucket {0} after waiting {1} seconds' 115 log.info(msg.format(bucket, timeout_in_seconds)) 116 return False 117 118 def bucket_exists(self, bucket): 119 try: 120 buckets = self.rest.get_buckets() 121 names = [item.name for item in buckets] 122 log.info("node {1} existing buckets : {0}" \ 123 .format(names, self.rest.ip)) 124 for item in buckets: 125 if item.name == bucket: 126 log.info("node {1} found bucket {0}" \ 127 .format(bucket, self.rest.ip)) 128 return True 129 return False 130 except Exception: 131 return False 132 133 def wait_for_node_status(self, node, expected_status, timeout_in_seconds): 134 status_reached = False 135 end_time = time.time() + timeout_in_seconds 136 while time.time() <= end_time and not status_reached: 137 nodes = self.rest.node_statuses() 138 for n in nodes: 139 if node.id == n.id: 140 log.info('node {0} status : {1}'.format(node.id, n.status)) 141 if n.status.lower() == expected_status.lower(): 142 status_reached = True 143 break 144 if not status_reached: 145 log.info("sleep for 5 seconds before reading the node.status again") 146 time.sleep(5) 147 log.info('node {0} status_reached : {1}'.format(node.id, status_reached)) 148 return status_reached 149 150 def _wait_for_task_pid(self, pid, end_time, ddoc_name): 151 while (time.time() < end_time): 152 new_pid, _ = self.rest._get_indexer_task_pid(ddoc_name) 153 if pid == new_pid: 154 time.sleep(5) 155 continue 156 else: 157 return 158 159 def _wait_for_indexer_ddoc(self, servers, ddoc_name, timeout=300): 160 nodes = self.rest.get_nodes() 161 servers_to_check = [] 162 for node in nodes: 163 for server in servers: 164 if node.ip == server.ip and str(node.port) == str(server.port): 165 servers_to_check.append(server) 166 for server in servers_to_check: 167 try: 168 rest = RestConnection(server) 169 log.info('Check index for ddoc %s , server %s' % (ddoc_name, server.ip)) 170 end_time = time.time() + timeout 171 log.info('Start getting index for ddoc %s , server %s' % (ddoc_name, server.ip)) 172 old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name) 173 if not old_pid: 174 log.info('Index for ddoc %s is not going on, server %s' % (ddoc_name, server.ip)) 175 continue 176 while is_pid_blocked: 177 log.info('Index for ddoc %s is blocked, server %s' % (ddoc_name, server.ip)) 178 self._wait_for_task_pid(old_pid, end_time, ddoc_name) 179 old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name) 180 if time.time() > end_time: 181 log.error("INDEX IS STILL BLOKED node %s ddoc % pid %" % (server, ddoc_name, old_pid)) 182 break 183 if old_pid: 184 log.info('Index for ddoc %s is running, server %s' % (ddoc_name, server.ip)) 185 self._wait_for_task_pid(old_pid, end_time, ddoc_name) 186 except Exception, ex: 187 log.error('unable to check index on server %s because of %s' % (server.ip, str(ex))) 188 189 def _get_vbuckets(self, servers, bucket_name='default'): 190 vbuckets_servers = {} 191 for server in servers: 192 buckets = RestConnection(server).get_buckets() 193 if bucket_name: 194 bucket_to_check = [bucket for bucket in buckets 195 if bucket.name == bucket_name][0] 196 else: 197 bucket_to_check = [bucket for bucket in buckets][0] 198 vbuckets_servers[server] = {} 199 vbs_active = [vb.id for vb in bucket_to_check.vbuckets 200 if vb.master.startswith(str(server.ip))] 201 vbs_replica = [] 202 for replica_num in xrange(0, bucket_to_check.numReplicas): 203 vbs_replica.extend([vb.id for vb in bucket_to_check.vbuckets 204 if vb.replica[replica_num].startswith(str(server.ip))]) 205 vbuckets_servers[server]['active_vb'] = vbs_active 206 vbuckets_servers[server]['replica_vb'] = vbs_replica 207 return vbuckets_servers 208 209class RestConnection(object): 210 211 def __new__(self, serverInfo={}): 212 213 214 # allow port to determine 215 # behavior of restconnection 216 port = None 217 if isinstance(serverInfo, dict): 218 if 'port' in serverInfo: 219 port = serverInfo['port'] 220 else: 221 port = serverInfo.port 222 223 if not port: 224 port = 8091 225 226 if int(port) in xrange(9091, 9100): 227 # return elastic search rest connection 228 from membase.api.esrest_client import EsRestConnection 229 obj = object.__new__(EsRestConnection, serverInfo) 230 else: 231 # default 232 obj = object.__new__(self, serverInfo) 233 return obj 234 235 def __init__(self, serverInfo): 236 #serverInfo can be a json object 237 if isinstance(serverInfo, dict): 238 self.ip = serverInfo["ip"] 239 self.username = serverInfo["username"] 240 self.password = serverInfo["password"] 241 self.port = serverInfo["port"] 242 self.hostname = '' 243 if "hostname" in serverInfo: 244 self.hostname = serverInfo["hostname"] 245 else: 246 self.ip = serverInfo.ip 247 self.username = serverInfo.rest_username 248 self.password = serverInfo.rest_password 249 self.port = serverInfo.port 250 self.hostname = '' 251 if hasattr(serverInfo, 'hostname') and serverInfo.hostname and\ 252 serverInfo.hostname.find(self.ip) == -1: 253 self.hostname = serverInfo.hostname 254 self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port) 255 self.capiBaseUrl = "http://{0}:{1}/".format(self.ip, 8092) 256 if self.hostname: 257 self.baseUrl = "http://{0}:{1}/".format(self.hostname, self.port) 258 self.capiBaseUrl = "http://{0}:{1}/".format(self.hostname, 8092) 259 #for Node is unknown to this cluster error 260 for iteration in xrange(5): 261 http_res, success = self.init_http_request(self.baseUrl + 'nodes/self') 262 if not success and type(http_res) == unicode and\ 263 (http_res.find('Node is unknown to this cluster') > -1 or http_res.find('Unexpected server error, request logged') > -1): 264 log.error("Error {0} was gotten, 5 seconds sleep before retry".format(http_res)) 265 time.sleep(5) 266 if iteration == 2: 267 log.error("node {0}:{1} is in a broken state!".format(self.ip, self.port)) 268 raise ServerUnavailableException(self.ip) 269 continue 270 else: 271 break 272 #determine the real couchApiBase for cluster_run 273 #couchApiBase appeared in version 2.* 274 if not http_res or http_res["version"][0:2] == "1.": 275 self.capiBaseUrl = self.baseUrl + "/couchBase" 276 else: 277 for iteration in xrange(5): 278 if "couchApiBase" not in http_res.keys(): 279 if self.is_cluster_mixed(): 280 self.capiBaseUrl = self.baseUrl + "/couchBase" 281 return 282 time.sleep(0.2) 283 http_res, success = self.init_http_request(self.baseUrl + 'nodes/self') 284 else: 285 self.capiBaseUrl = http_res["couchApiBase"] 286 return 287 raise ServerUnavailableException("couchApiBase doesn't exist in nodes/self: %s " % http_res) 288 289 def sasl_streaming_rq(self, bucket, timeout=120): 290 api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket) 291 if isinstance(bucket, Bucket): 292 api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket.name) 293 try: 294 httplib2.Http(timeout=timeout).request(api, 'GET', '', 295 headers=self._create_capi_headers_with_auth(self.username, self.password)) 296 except Exception, ex: 297 log.warn('Exception while streaming: %s' % str(ex)) 298 299 def open_sasl_streaming_connection(self, bucket, timeout=1000): 300 log.info("Opening sasl streaming connection for bucket %s" % 301 (bucket, bucket.name)[isinstance(bucket, Bucket)]) 302 t = Thread(target=self.sasl_streaming_rq, 303 name="streaming_" + str(uuid.uuid4())[:4], 304 args=(bucket, timeout)) 305 try: 306 t.start() 307 except: 308 log.warn("thread is not started") 309 return None 310 return t 311 312 def is_cluster_mixed(self): 313 http_res, success = self.init_http_request(self.baseUrl + 'pools/default') 314 if http_res == u'unknown pool': 315 return False 316 versions = list(set([node["version"][:1] for node in http_res["nodes"]])) 317 if '1' in versions and '2' in versions: 318 return True 319 return False 320 321 def is_enterprise_edition(self): 322 http_res, success = self.init_http_request(self.baseUrl + 'pools/default') 323 if http_res == u'unknown pool': 324 return False 325 editions = [] 326 community_nodes = [] 327 """ get the last word in node["version"] as in "version": "2.5.1-1073-rel-enterprise" """ 328 for node in http_res["nodes"]: 329 editions.extend(node["version"].split("-")[-1:]) 330 if "community" in node["version"].split("-")[-1:]: 331 community_nodes.extend(node["hostname"].split(":")[:1]) 332 if "community" in editions: 333 log.error("IP(s) for node(s) with community edition {0}".format(community_nodes)) 334 return False 335 return True 336 337 def init_http_request(self, api): 338 try: 339 status, content, header = self._http_request(api, 'GET', headers=self._create_capi_headers_with_auth(self.username, self.password)) 340 json_parsed = json.loads(content) 341 if status: 342 return json_parsed, True 343 else: 344 print("{0} with status {1}: {2}".format(api, status, json_parsed)) 345 return json_parsed, False 346 except ValueError: 347 print("{0}: {1}".format(api, content)) 348 return content, False 349 350 def rename_node(self, hostname, username='Administrator', password='password'): 351 params = urllib.urlencode({'username': username, 352 'password': password, 353 'hostname': hostname}) 354 355 api = "%snode/controller/rename" % (self.baseUrl) 356 status, content, header = self._http_request(api, 'POST', params) 357 return status, content 358 359 def active_tasks(self): 360 api = self.capiBaseUrl + "_active_tasks" 361 try: 362 status, content, header = self._http_request(api, 'GET', headers=self._create_capi_headers()) 363 json_parsed = json.loads(content) 364 except ValueError: 365 return "" 366 return json_parsed 367 368 def ns_server_tasks(self): 369 api = self.baseUrl + 'pools/default/tasks' 370 try: 371 status, content, header = self._http_request(api, 'GET', headers=self._create_headers()) 372 return json.loads(content) 373 except ValueError: 374 return "" 375 376 # DEPRECATED: use create_ddoc() instead. 377 def create_view(self, design_doc_name, bucket_name, views, options=None): 378 return self.create_ddoc(design_doc_name, bucket_name, views, options) 379 380 def create_ddoc(self, design_doc_name, bucket, views, options=None): 381 design_doc = DesignDocument(design_doc_name, views, options=options) 382 if design_doc.name.find('/') != -1: 383 design_doc.name = design_doc.name.replace('/', '%2f') 384 design_doc.id = '_design/{0}'.format(design_doc.name) 385 return self.create_design_document(bucket, design_doc) 386 387 def create_design_document(self, bucket, design_doc): 388 design_doc_name = design_doc.id 389 api = '%s/%s/%s' % (self.capiBaseUrl, bucket, design_doc_name) 390 if isinstance(bucket, Bucket): 391 api = '%s/%s/%s' % (self.capiBaseUrl, bucket.name, design_doc_name) 392 393 if isinstance(bucket, Bucket) and bucket.authType == "sasl": 394 status, content, header = self._http_request(api, 'PUT', str(design_doc), 395 headers=self._create_capi_headers_with_auth( 396 username=bucket.name, password=bucket.saslPassword)) 397 else: 398 status, content, header = self._http_request(api, 'PUT', str(design_doc), 399 headers=self._create_capi_headers()) 400 if not status: 401 raise DesignDocCreationException(design_doc_name, content) 402 return json.loads(content) 403 404 def is_index_triggered(self, ddoc_name, index_type='main'): 405 run, block = self._get_indexer_task_pid(ddoc_name, index_type=index_type) 406 if run or block: 407 return True 408 else: 409 return False 410 411 def _get_indexer_task_pid(self, ddoc_name, index_type='main'): 412 active_tasks = self.active_tasks() 413 if u'error' in active_tasks: 414 return None 415 if active_tasks: 416 for task in active_tasks: 417 if task['type'] == 'indexer' and task['indexer_type'] == index_type: 418 for ddoc in task['design_documents']: 419 if ddoc == ('_design/%s' % ddoc_name): 420 return task['pid'], False 421 if task['type'] == 'blocked_indexer' and task['indexer_type'] == index_type: 422 for ddoc in task['design_documents']: 423 if ddoc == ('_design/%s' % ddoc_name): 424 return task['pid'], True 425 return None, None 426 427 def query_view(self, design_doc_name, view_name, bucket, query, timeout=120, invalid_query=False, type="view"): 428 status, content, header = self._query(design_doc_name, view_name, bucket, type, query, timeout) 429 if not status and not invalid_query: 430 stat = 0 431 if 'status' in header: 432 stat = int(header['status']) 433 raise QueryViewException(view_name, content, status=stat) 434 return json.loads(content) 435 436 def _query(self, design_doc_name, view_name, bucket, view_type, query, timeout): 437 if design_doc_name.find('/') != -1: 438 design_doc_name = design_doc_name.replace('/', '%2f') 439 if view_name.find('/') != -1: 440 view_name = view_name.replace('/', '%2f') 441 api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket, 442 design_doc_name, view_type, 443 view_name, 444 urllib.urlencode(query)) 445 if isinstance(bucket, Bucket): 446 api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket.name, 447 design_doc_name, view_type, 448 view_name, 449 urllib.urlencode(query)) 450 log.info("index query url: {0}".format(api)) 451 if isinstance(bucket, Bucket) and bucket.authType == "sasl": 452 status, content, header = self._http_request(api, headers=self._create_capi_headers_with_auth( 453 username=bucket.name, password=bucket.saslPassword), 454 timeout=timeout) 455 else: 456 status, content, header = self._http_request(api, headers=self._create_capi_headers(), 457 timeout=timeout) 458 return status, content, header 459 460 def view_results(self, bucket, ddoc_name, params, limit=100, timeout=120, 461 view_name=None): 462 status, json = self._index_results(bucket, "view", ddoc_name, params, limit, timeout=timeout, view_name=view_name) 463 if not status: 464 raise Exception("unable to obtain view results") 465 return json 466 467 468 # DEPRECATED: Incorrectly named function kept for backwards compatibility. 469 def get_view(self, bucket, view): 470 log.info("DEPRECATED function get_view(" + view + "). use get_ddoc()") 471 return self.get_ddoc(bucket, view) 472 473 def get_data_path(self): 474 node_info = self.get_nodes_self() 475 data_path = node_info.storage[0].get_data_path() 476 return data_path 477 478 def get_memcached_port(self): 479 node_info = self.get_nodes_self() 480 return node_info.memcached 481 482 def get_ddoc(self, bucket, ddoc_name): 483 status, json, meta = self._get_design_doc(bucket, ddoc_name) 484 if not status: 485 raise ReadDocumentException(ddoc_name, json) 486 return json, meta 487 488 #the same as Preview a Random Document on UI 489 def get_random_key(self, bucket): 490 api = self.baseUrl + 'pools/default/buckets/%s/localRandomKey' % (bucket) 491 status, content, header = self._http_request(api, headers=self._create_capi_headers()) 492 json_parsed = json.loads(content) 493 if not status: 494 raise Exception("unable to get random document/key for bucket %s" % (bucket)) 495 return json_parsed 496 497 def run_view(self, bucket, view, name): 498 api = self.capiBaseUrl + '/%s/_design/%s/_view/%s' % (bucket, view, name) 499 status, content, header = self._http_request(api, headers=self._create_capi_headers()) 500 json_parsed = json.loads(content) 501 if not status: 502 raise Exception("unable to create view") 503 return json_parsed 504 505 def delete_view(self, bucket, view): 506 status, json = self._delete_design_doc(bucket, view) 507 if not status: 508 raise Exception("unable to delete the view") 509 return json 510 511 def spatial_results(self, bucket, spatial, params, limit=100): 512 status, json = self._index_results(bucket, "spatial", spatial, 513 params, limit) 514 if not status: 515 raise Exception("unable to obtain spatial view results") 516 return json 517 518 def create_spatial(self, bucket, spatial, function): 519 status, json = self._create_design_doc(bucket, spatial, function) 520 if status == False: 521 raise Exception("unable to create spatial view") 522 return json 523 524 def get_spatial(self, bucket, spatial): 525 status, json, meta = self._get_design_doc(bucket, spatial) 526 if not status: 527 raise Exception("unable to get the spatial view definition") 528 return json, meta 529 530 def delete_spatial(self, bucket, spatial): 531 status, json = self._delete_design_doc(bucket, spatial) 532 if not status: 533 raise Exception("unable to delete the spatial view") 534 return json 535 536 # type_ is "view" or "spatial" 537 def _index_results(self, bucket, type_, ddoc_name, params, limit, timeout=120, 538 view_name=None): 539 if view_name is None: 540 view_name = ddoc_name 541 query = '/{0}/_design/{1}/_{2}/{3}' 542 api = self.capiBaseUrl + query.format(bucket, ddoc_name, type_, view_name) 543 544 num_params = 0 545 if limit != None: 546 num_params = 1 547 api += "?limit={0}".format(limit) 548 for param in params: 549 if num_params > 0: 550 api += "&" 551 else: 552 api += "?" 553 num_params += 1 554 555 if param in ["key", "startkey", "endkey", "start_range", 556 "end_range"] or isinstance(params[param], bool): 557 api += "{0}={1}".format(param, 558 json.dumps(params[param], 559 separators=(',',':'))) 560 else: 561 api += "{0}={1}".format(param, params[param]) 562 563 log.info("index query url: {0}".format(api)) 564 status, content, header = self._http_request(api, headers=self._create_capi_headers(), timeout=timeout) 565 json_parsed = json.loads(content) 566 return status, json_parsed 567 568 def get_couch_doc(self, doc_id, bucket="default", timeout=120): 569 """ use couchBase uri to retrieve document from a bucket """ 570 api = self.capiBaseUrl + '/%s/%s' % (bucket, doc_id) 571 status, content, header = self._http_request(api, headers=self._create_capi_headers(), 572 timeout=timeout) 573 if not status: 574 raise ReadDocumentException(doc_id, content) 575 return json.loads(content) 576 577 def _create_design_doc(self, bucket, name, function): 578 api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name) 579 status, content, header = self._http_request( 580 api, 'PUT', function, headers=self._create_capi_headers()) 581 json_parsed = json.loads(content) 582 return status, json_parsed 583 584 def _get_design_doc(self, bucket, name): 585 api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name) 586 if isinstance(bucket, Bucket): 587 api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name) 588 589 if isinstance(bucket, Bucket) and bucket.authType == "sasl" and bucket.name != "default": 590 status, content, header = self._http_request(api, headers=self._create_capi_headers_with_auth( 591 username=bucket.name, password=bucket.saslPassword)) 592 else: 593 status, content, header = self._http_request(api, headers=self._create_capi_headers()) 594 json_parsed = json.loads(content) 595 meta_parsed = "" 596 if status: 597 #in dp4 builds meta data is in content, not in header 598 if 'x-couchbase-meta' in header: 599 meta = header['x-couchbase-meta'] 600 meta_parsed = json.loads(meta) 601 else: 602 meta_parsed = {} 603 meta_parsed["_rev"] = json_parsed["_rev"] 604 meta_parsed["_id"] = json_parsed["_id"] 605 return status, json_parsed, meta_parsed 606 607 def _delete_design_doc(self, bucket, name): 608 status, design_doc, meta = self._get_design_doc(bucket, name) 609 if not status: 610 raise Exception("unable to find for deletion design document") 611 api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name) 612 if isinstance(bucket, Bucket): 613 api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name) 614 if isinstance(bucket, Bucket) and bucket.authType == "sasl" and bucket.name != "default": 615 status, content, header = self._http_request(api, 'DELETE', headers=self._create_capi_headers_with_auth( 616 username=bucket.name, password=bucket.saslPassword)) 617 else: 618 status, content, header = self._http_request(api, 'DELETE', headers=self._create_capi_headers()) 619 json_parsed = json.loads(content) 620 return status, json_parsed 621 622 def spatial_compaction(self, bucket, design_name): 623 api = self.capiBaseUrl + '/%s/_design/%s/_spatial/_compact' % (bucket, design_name) 624 if isinstance(bucket, Bucket): 625 api = self.capiBaseUrl + \ 626 '/%s/_design/%s/_spatial/_compact' % (bucket.name, design_name) 627 628 if isinstance(bucket, Bucket) and bucket.authType == "sasl": 629 status, content, header = self._http_request(api, 'POST', headers=self._create_capi_headers_with_auth( 630 username=bucket.name, password=bucket.saslPassword)) 631 else: 632 status, content, header = self._http_request(api, 'POST', headers=self._create_capi_headers()) 633 json_parsed = json.loads(content) 634 return status, json_parsed 635 636 # Make a _design/_info request 637 def set_view_info(self, bucket, design_name): 638 """Get view diagnostic info (node specific)""" 639 api = self.capiBaseUrl 640 if isinstance(bucket, Bucket): 641 api += '/_set_view/{0}/_design/{1}/_info'.format(bucket.name, design_name) 642 else: 643 api += '_set_view/{0}/_design/{1}/_info'.format(bucket, design_name) 644 645 if isinstance(bucket, Bucket) and bucket.authType == "sasl": 646 headers = self._create_capi_headers_with_auth( 647 username=bucket.name, password=bucket.saslPassword) 648 status, content, header = self._http_request(api, 'POST', 649 headers=headers) 650 else: 651 headers = self._create_capi_headers() 652 status, content, header = self._http_request(api, 'GET', 653 headers=headers) 654 if not status: 655 raise SetViewInfoNotFound(design_name, content) 656 json_parsed = json.loads(content) 657 return status, json_parsed 658 659 # Make a _spatial/_info request 660 def spatial_info(self, bucket, design_name): 661 api = self.capiBaseUrl + \ 662 '/%s/_design/%s/_spatial/_info' % (bucket, design_name) 663 status, content, header = self._http_request( 664 api, 'GET', headers=self._create_capi_headers()) 665 json_parsed = json.loads(content) 666 return status, json_parsed 667 668 def _create_capi_headers(self): 669 return {'Content-Type': 'application/json', 670 'Accept': '*/*'} 671 672 def _create_capi_headers_with_auth(self, username, password): 673 authorization = base64.encodestring('%s:%s' % (username, password)) 674 return {'Content-Type': 'application/json', 675 'Authorization': 'Basic %s' % authorization, 676 'Accept': '*/*'} 677 678 #authorization must be a base64 string of username:password 679 def _create_headers(self): 680 authorization = base64.encodestring('%s:%s' % (self.username, self.password)) 681 return {'Content-Type': 'application/x-www-form-urlencoded', 682 'Authorization': 'Basic %s' % authorization, 683 'Accept': '*/*'} 684 685 def _http_request(self, api, method='GET', params='', headers=None, timeout=120): 686 if not headers: 687 headers = self._create_headers() 688 end_time = time.time() + timeout 689 while True: 690 try: 691 response, content = httplib2.Http(timeout=timeout).request(api, method, params, headers) 692 if response['status'] in ['200', '201', '202']: 693 return True, content, response 694 else: 695 try: 696 json_parsed = json.loads(content) 697 except ValueError as e: 698 json_parsed = {} 699 json_parsed["error"] = "status: {0}, content: {1}".format(response['status'], content) 700 reason = "unknown" 701 if "error" in json_parsed: 702 reason = json_parsed["error"] 703 log.error('{0} error {1} reason: {2} {3}'.format(api, response['status'], reason, content.rstrip('\n'))) 704 return False, content, response 705 except socket.error as e: 706 log.error("socket error while connecting to {0} error {1} ".format(api, e)) 707 if time.time() > end_time: 708 raise ServerUnavailableException(ip=self.ip) 709 except httplib2.ServerNotFoundError as e: 710 log.error("ServerNotFoundError error while connecting to {0} error {1} ".format(api, e)) 711 if time.time() > end_time: 712 raise ServerUnavailableException(ip=self.ip) 713 time.sleep(1) 714 715 def init_cluster(self, username='Administrator', password='password', port='8091'): 716 api = self.baseUrl + 'settings/web' 717 params = urllib.urlencode({'port': port, 718 'username': username, 719 'password': password}) 720 log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, params)) 721 status, content, header = self._http_request(api, 'POST', params) 722 return status 723 724 def get_cluster_settings(self): 725 settings = {} 726 api = self.baseUrl + 'settings/web' 727 status, content, header = self._http_request(api, 'GET') 728 if status: 729 settings = json.loads(content) 730 log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, settings)) 731 return settings 732 733 def init_cluster_memoryQuota(self, username='Administrator', 734 password='password', 735 memoryQuota=256): 736 api = self.baseUrl + 'pools/default' 737 params = urllib.urlencode({'memoryQuota': memoryQuota, 738 'username': username, 739 'password': password}) 740 log.info('pools/default params : {0}'.format(params)) 741 status, content, header = self._http_request(api, 'POST', params) 742 return status 743 744 def get_cluster_ceritificate(self): 745 api = self.baseUrl + 'pools/default/certificate' 746 status, content, _ = self._http_request(api, 'GET') 747 if status: 748 return content 749 else: 750 log.error("/poos/default/certificate status:{0},content:{1}".format(status, content)) 751 raise Exception("certificate API failed") 752 753 def regenerate_cluster_certificate(self): 754 api = self.baseUrl + 'controller/regenerateCertificate' 755 status, content, _ = self._http_request(api, 'POST') 756 if status: 757 return content 758 else: 759 log.error("controller/regenerateCertificate status:{0},content:{1}".format(status, content)) 760 raise Exception("regenerateCertificate API failed") 761 762 def __remote_clusters(self, api, op, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''): 763 param_map = {'hostname': "{0}:{1}".format(remoteIp, remotePort), 764 'username': username, 765 'password': password, 766 'name':name} 767 if demandEncryption: 768 param_map ['demandEncryption'] = 'on' 769 param_map['certificate'] = certificate 770 params = urllib.urlencode(param_map) 771 status, content, _ = self._http_request(api, 'POST', params) 772 #sample response : 773 # [{"name":"two","uri":"/pools/default/remoteClusters/two","validateURI":"/pools/default/remoteClusters/two?just_validate=1","hostname":"127.0.0.1:9002","username":"Administrator"}] 774 if status: 775 remoteCluster = json.loads(content) 776 else: 777 log.error("/remoteCluster failed : status:{0},content:{1}".format(status, content)) 778 raise Exception("remoteCluster API '{0} remote cluster' failed".format(op)) 779 return remoteCluster 780 781 def add_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''): 782 #example : password:password username:Administrator hostname:127.0.0.1:9002 name:two 783 msg = "adding remote cluster hostname:{0}:{1} with username:password {2}:{3} name:{4} to source node: {5}:{6}" 784 log.info(msg.format(remoteIp, remotePort, username, password, name, self.ip, self.port)) 785 api = self.baseUrl + 'pools/default/remoteClusters' 786 self.__remote_clusters(api, 'add', remoteIp, remotePort, username, password, name, demandEncryption, certificate) 787 788 def modify_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''): 789 log.info("modifying remote cluster name:{0}".format(name)) 790 api = self.baseUrl + 'pools/default/remoteClusters/' + urllib.quote(name) 791 self.__remote_clusters(api, 'modify', remoteIp, remotePort, username, password, name, demandEncryption, certificate) 792 793 def get_remote_clusters(self): 794 remote_clusters = [] 795 api = self.baseUrl + 'pools/default/remoteClusters/' 796 params = urllib.urlencode({}) 797 status, content, header = self._http_request(api, 'GET', params) 798 if status: 799 remote_clusters = json.loads(content) 800 return remote_clusters 801 802 def remove_all_remote_clusters(self): 803 remote_clusters = self.get_remote_clusters() 804 for remote_cluster in remote_clusters: 805 if remote_cluster["deleted"] == False: 806 self.remove_remote_cluster(remote_cluster["name"]) 807 808 def remove_remote_cluster(self, name): 809 #example : name:two 810 msg = "removing remote cluster name:{0}".format(urllib.quote(name)) 811 log.info(msg) 812 api = self.baseUrl + 'pools/default/remoteClusters/{0}'.format(urllib.quote(name)) 813 params = urllib.urlencode({}) 814 status, content, header = self._http_request(api, 'DELETE', params) 815 #sample response : 816 # [{"name":"two","uri":"/pools/default/remoteClusters/two","validateURI":"/pools/default/remoteClusters/two?just_validate=1","hostname":"127.0.0.1:9002","username":"Administrator"}] 817 if status: 818 json_parsed = json.loads(content) 819 else: 820 log.error("failed to remove remote cluster: status:{0},content:{1}".format(status, content)) 821 raise Exception("remoteCluster API 'remove cluster' failed") 822 return json_parsed 823 824 825 #replicationType:continuous toBucket:default toCluster:two fromBucket:default 826 def start_replication(self, replicationType, fromBucket, toCluster, rep_type="xmem", toBucket=None): 827 toBucket = toBucket or fromBucket 828 829 msg = "starting {0} replication type:{1} from {2} to {3} in the remote cluster {4}" 830 log.info(msg.format(replicationType, rep_type, fromBucket, toBucket, toCluster)) 831 api = self.baseUrl + 'controller/createReplication' 832 param_map = {'replicationType': replicationType, 833 'toBucket': toBucket, 834 'fromBucket': fromBucket, 835 'toCluster': toCluster, 836 'type': rep_type} 837 params = urllib.urlencode(param_map) 838 status, content, _ = self._http_request(api, 'POST', params) 839 #response : {"database":"http://127.0.0.1:9500/_replicator", 840 # "id": "replication_id"} 841 if status: 842 json_parsed = json.loads(content) 843 return (json_parsed['database'], json_parsed['id']) 844 else: 845 log.error("/controller/createReplication failed : status:{0},content:{1}".format(status, content)) 846 raise Exception("create replication failed : status:{0},content:{1}".format(status, content)) 847 848 def get_replications(self): 849 replications = [] 850 content = self.ns_server_tasks() 851 for item in content: 852 if item["type"] == "xdcr": 853 replications.append(item) 854 return replications 855 856 def remove_all_replications(self): 857 replications = self.get_replications() 858 for replication in replications: 859 self.stop_replication(replication["cancelURI"]) 860 861 def stop_replication(self, uri): 862 api = self.baseUrl + uri 863 self._http_request(api, 'DELETE') 864 865 def remove_all_recoveries(self): 866 recoveries = [] 867 content = self.ns_server_tasks() 868 for item in content: 869 if item["type"] == "recovery": 870 recoveries.append(item) 871 for recovery in recoveries: 872 api = self.baseUrl + recovery["stopURI"] 873 status, content, header = self._http_request(api, 'POST') 874 if not status: 875 raise CBRecoveryFailedException("impossible to stop cbrecovery by {0}".format(api)) 876 log.info("recovery stopped by {0}".format(api)) 877 878 879 #params serverIp : the server to add to this cluster 880 #raises exceptions when 881 #unauthorized user 882 #server unreachable 883 #can't add the node to itself ( TODO ) 884 #server already added 885 #returns otpNode 886 def add_node(self, user='', password='', remoteIp='', port='8091', zone_name='', services = None): 887 otpNode = None 888 log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\ 889 .format(remoteIp, port, self.ip, self.port)) 890 if zone_name == '': 891 api = self.baseUrl + 'controller/addNode' 892 else: 893 api = self.baseUrl + 'pools/default/serverGroups' 894 if self.is_zone_exist(zone_name): 895 zones = self.get_zone_names() 896 api = "/".join((api, zones[zone_name], "addNode")) 897 log.info("node {0} will be added to zone {1}".format(remoteIp, zone_name)) 898 else: 899 raise Exception("There is not zone with name: %s in cluster" % zone_name) 900 901 params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port), 902 'user': user, 903 'password': password}) 904 if services != None: 905 services = ','.join(services) 906 params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port), 907 'user': user, 908 'password': password, 909 'services': services}) 910 status, content, header = self._http_request(api, 'POST', params) 911 if status: 912 json_parsed = json.loads(content) 913 otpNodeId = json_parsed['otpNode'] 914 otpNode = OtpNode(otpNodeId) 915 if otpNode.ip == '127.0.0.1': 916 otpNode.ip = self.ip 917 else: 918 self.print_UI_logs() 919 try: 920 #print logs from node that we want to add 921 wanted_node = deepcopy(self) 922 wanted_node.ip = remoteIp 923 wanted_node.print_UI_logs() 924 except Exception, ex: 925 self.log(ex) 926 if content.find('Prepare join failed. Node is already part of cluster') >= 0: 927 raise ServerAlreadyJoinedException(nodeIp=self.ip, 928 remoteIp=remoteIp) 929 elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0: 930 raise ServerSelfJoinException(nodeIp=self.ip, 931 remoteIp=remoteIp) 932 else: 933 log.error('add_node error : {0}'.format(content)) 934 raise AddNodeException(nodeIp=self.ip, 935 remoteIp=remoteIp, 936 reason=content) 937 return otpNode 938 939 #params serverIp : the server to add to this cluster 940 #raises exceptions when 941 #unauthorized user 942 #server unreachable 943 #can't add the node to itself ( TODO ) 944 #server already added 945 #returns otpNode 946 def do_join_cluster(self, user='', password='', remoteIp='', port='8091', zone_name='', services = None): 947 otpNode = None 948 log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\ 949 .format(remoteIp, port, self.ip, self.port)) 950 api = self.baseUrl + '/node/controller/doJoinCluster' 951 params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port), 952 'user': user, 953 'password': password}) 954 if services != None: 955 services = ','.join(services) 956 params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port), 957 'user': user, 958 'password': password, 959 'services': services}) 960 status, content, header = self._http_request(api, 'POST', params) 961 if status: 962 json_parsed = json.loads(content) 963 otpNodeId = json_parsed['otpNode'] 964 otpNode = OtpNode(otpNodeId) 965 if otpNode.ip == '127.0.0.1': 966 otpNode.ip = self.ip 967 else: 968 self.print_UI_logs() 969 try: 970 #print logs from node that we want to add 971 wanted_node = deepcopy(self) 972 wanted_node.ip = remoteIp 973 wanted_node.print_UI_logs() 974 except Exception, ex: 975 self.log(ex) 976 if content.find('Prepare join failed. Node is already part of cluster') >= 0: 977 raise ServerAlreadyJoinedException(nodeIp=self.ip, 978 remoteIp=remoteIp) 979 elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0: 980 raise ServerSelfJoinException(nodeIp=self.ip, 981 remoteIp=remoteIp) 982 else: 983 log.error('add_node error : {0}'.format(content)) 984 raise AddNodeException(nodeIp=self.ip, 985 remoteIp=remoteIp, 986 reason=content) 987 return otpNode 988 989 990 def eject_node(self, user='', password='', otpNode=None): 991 if not otpNode: 992 log.error('otpNode parameter required') 993 return False 994 api = self.baseUrl + 'controller/ejectNode' 995 params = urllib.urlencode({'otpNode': otpNode, 996 'user': user, 997 'password': password}) 998 status, content, header = self._http_request(api, 'POST', params) 999 if status: 1000 log.info('ejectNode successful') 1001 else: 1002 if content.find('Prepare join failed. Node is already part of cluster') >= 0: 1003 raise ServerAlreadyJoinedException(nodeIp=self.ip, 1004 remoteIp=otpNode) 1005 else: 1006 # TODO : raise an exception here 1007 log.error('eject_node error {0}'.format(content)) 1008 return True 1009 1010 def force_eject_node(self): 1011 self.diag_eval("gen_server:cast(ns_cluster, leave).") 1012 self.check_delay_restart_coucbase_server() 1013 1014 """ when we do reset couchbase server by force reject, couchbase server will not 1015 down right away but delay few seconds to be down depend on server spec. 1016 This fx will detect that delay and return true when couchbase server down and 1017 up again after force reject """ 1018 def check_delay_restart_coucbase_server(self): 1019 api = self.baseUrl + 'nodes/self' 1020 headers = self._create_headers() 1021 break_out = 0 1022 count_cbserver_up = 0 1023 while break_out < 60 and count_cbserver_up < 2: 1024 try: 1025 response, content = httplib2.Http(timeout=120).request(api, 'GET', '', headers) 1026 if response['status'] in ['200', '201', '202'] and count_cbserver_up == 0: 1027 log.info("couchbase server is up but down soon.") 1028 time.sleep(1) 1029 break_out += 1 # time needed for couchbase server reload after reset config 1030 elif response['status'] in ['200', '201', '202']: 1031 count_cbserver_up = 2 1032 log.info("couchbase server is up again") 1033 except socket.error as e: 1034 log.info("couchbase server is down. Waiting for couchbase server up") 1035 time.sleep(2) 1036 break_out += 1 1037 count_cbserver_up = 1 1038 pass 1039 if break_out >= 60: 1040 raise Exception("Couchbase server did not start after 120 seconds") 1041 1042 def fail_over(self, otpNode=None, graceful=False): 1043 if otpNode is None: 1044 log.error('otpNode parameter required') 1045 return False 1046 api = self.baseUrl + 'controller/failOver' 1047 if graceful: 1048 api = self.baseUrl + 'controller/startGracefulFailover' 1049 params = urllib.urlencode({'otpNode': otpNode}) 1050 status, content, header = self._http_request(api, 'POST', params) 1051 if status: 1052 log.info('fail_over node {0} successful'.format(otpNode)) 1053 else: 1054 log.error('fail_over node {0} error : {1}'.format(otpNode, content)) 1055 raise FailoverFailedException(content) 1056 return status 1057 1058 def set_recovery_type(self, otpNode=None, recoveryType=None): 1059 log.info("Going to set recoveryType={0} for node :: {1}".format(recoveryType, otpNode)) 1060 if otpNode == None: 1061 log.error('otpNode parameter required') 1062 return False 1063 if recoveryType == None: 1064 log.error('recoveryType is not set') 1065 return False 1066 api = self.baseUrl + 'controller/setRecoveryType' 1067 params = urllib.urlencode({'otpNode': otpNode, 1068 'recoveryType': recoveryType}) 1069 status, content, header = self._http_request(api, 'POST', params) 1070 if status: 1071 log.info('recoveryType for node {0} set successful'.format(otpNode)) 1072 else: 1073 log.error('recoveryType node {0} not set with error : {1}'.format(otpNode, content)) 1074 raise SetRecoveryTypeFailed(content) 1075 return status 1076 1077 def add_back_node(self, otpNode=None): 1078 if otpNode is None: 1079 log.error('otpNode parameter required') 1080 return False 1081 api = self.baseUrl + 'controller/reAddNode' 1082 params = urllib.urlencode({'otpNode': otpNode}) 1083 status, content, header = self._http_request(api, 'POST', params) 1084 if status: 1085 log.info('add_back_node {0} successful'.format(otpNode)) 1086 else: 1087 log.error('add_back_node {0} error : {1}'.format(otpNode, content)) 1088 raise InvalidArgumentException('controller/reAddNode', 1089 parameters=params) 1090 return status 1091 1092 def rebalance(self, otpNodes=[], ejectedNodes=[], deltaRecoveryBuckets=None): 1093 knownNodes = ','.join(otpNodes) 1094 ejectedNodesString = ','.join(ejectedNodes) 1095 if deltaRecoveryBuckets == None: 1096 params = urllib.urlencode({'knownNodes': knownNodes, 1097 'ejectedNodes': ejectedNodesString, 1098 'user': self.username, 1099 'password': self.password}) 1100 else: 1101 deltaRecoveryBuckets = ",".join(deltaRecoveryBuckets) 1102 params = urllib.urlencode({'knownNodes': knownNodes, 1103 'ejectedNodes': ejectedNodesString, 1104 'deltaRecoveryBuckets': deltaRecoveryBuckets, 1105 'user': self.username, 1106 'password': self.password}) 1107 log.info('rebalance params : {0}'.format(params)) 1108 api = self.baseUrl + "controller/rebalance" 1109 status, content, header = self._http_request(api, 'POST', params) 1110 if status: 1111 log.info('rebalance operation started') 1112 else: 1113 log.error('rebalance operation failed: {0}'.format(content)) 1114 #extract the error 1115 raise InvalidArgumentException('controller/rebalance with error message {0}'.format(content), 1116 parameters=params) 1117 return status 1118 1119 def diag_eval(self, code): 1120 api = '{0}{1}'.format(self.baseUrl, 'diag/eval/') 1121 status, content, header = self._http_request(api, "POST", code) 1122 log.info("/diag/eval status on {0}:{1}: {2} content: {3} command: {4}". 1123 format(self.ip, self.port, status, content, code)) 1124 return status, content 1125 1126 def set_chk_max_items(self, max_items): 1127 status, content = self.diag_eval("ns_config:set(chk_max_items, " + str(max_items) + ")") 1128 return status, content 1129 1130 def set_chk_period(self, period): 1131 status, content = self.diag_eval("ns_config:set(chk_period, " + str(period) + ")") 1132 return status, content 1133 1134 def set_enable_flow_control(self, flow=True, bucket='default'): 1135 flow_control = "false" 1136 if flow: 1137 flow_control = "true" 1138 code = "ns_bucket:update_bucket_props(\"" + bucket + "\", [{extra_config_string, \"upr_enable_flow_control=" + flow_control + "\"}])" 1139 status, content = self.diag_eval(code) 1140 return status, content 1141 1142 def diag_master_events(self): 1143 api = '{0}{1}'.format(self.baseUrl, 'diag/masterEvents?o=1') 1144 status, content, header = self._http_request(api, "GET") 1145 log.info("diag/masterEvents?o=1 status: {0} content: {1}".format(status, content)) 1146 return status, content 1147 1148 def monitorRebalance(self, stop_if_loop=True): 1149 start = time.time() 1150 progress = 0 1151 retry = 0 1152 same_progress_count = 0 1153 previous_progress = 0 1154 while progress != -1 and (progress != 100 or self._rebalance_progress_status() == 'running') and retry < 20: 1155 #-1 is error , -100 means could not retrieve progress 1156 progress = self._rebalance_progress() 1157 if progress == -100: 1158 log.error("unable to retrieve rebalanceProgress.try again in 1 second") 1159 retry += 1 1160 else: 1161 retry = 0 1162 if stop_if_loop: 1163 #reset same_progress_count if get a different result, or progress is still O 1164 #(it may take a long time until the results are different from 0) 1165 if previous_progress != progress or progress == 0: 1166 previous_progress = progress 1167 same_progress_count = 0 1168 else: 1169 same_progress_count += 1 1170 if same_progress_count > 50: 1171 log.error("apparently rebalance progress code in infinite loop: {0}".format(progress)) 1172 return False 1173 #sleep for 5 seconds 1174 time.sleep(5) 1175 if progress < 0: 1176 log.error("rebalance progress code : {0}".format(progress)) 1177 return False 1178 else: 1179 duration = time.time() - start 1180 if duration > 10: 1181 sleep = 10 1182 else: 1183 sleep = duration 1184 log.info('rebalance progress took {0} seconds '.format(duration)) 1185 log.info("sleep for {0} seconds after rebalance...".format(sleep)) 1186 time.sleep(sleep) 1187 return True 1188 1189 def _rebalance_progress_status(self): 1190 api = self.baseUrl + "pools/default/rebalanceProgress" 1191 status, content, header = self._http_request(api) 1192 json_parsed = json.loads(content) 1193 if status: 1194 if "status" in json_parsed: 1195 return json_parsed['status'] 1196 else: 1197 return None 1198 1199 def _rebalance_progress(self): 1200 avg_percentage = -1 1201 api = self.baseUrl + "pools/default/rebalanceProgress" 1202 try: 1203 status, content, header = self._http_request(api) 1204 except ServerUnavailableException as e: 1205 log.error(e) 1206 return -100 1207 json_parsed = json.loads(content) 1208 if status: 1209 if "status" in json_parsed: 1210 if "errorMessage" in json_parsed: 1211 msg = '{0} - rebalance failed'.format(json_parsed) 1212 log.error(msg) 1213 self.print_UI_logs() 1214 raise RebalanceFailedException(msg) 1215 elif json_parsed["status"] == "running": 1216 total_percentage = 0 1217 count = 0 1218 for key in json_parsed: 1219 if key.find('@') >= 0: 1220 ns_1_dictionary = json_parsed[key] 1221 percentage = ns_1_dictionary['progress'] * 100 1222 count += 1 1223 total_percentage += percentage 1224 if count: 1225 avg_percentage = (total_percentage / count) 1226 else: 1227 avg_percentage = 0 1228 log.info('rebalance percentage : {0:.02f} %'. 1229 format(round(avg_percentage, 2))) 1230 else: 1231 avg_percentage = 100 1232 else: 1233 avg_percentage = -100 1234 return avg_percentage 1235 1236 1237 def log_client_error(self, post): 1238 api = self.baseUrl + 'logClientError' 1239 status, content, header = self._http_request(api, 'POST', post) 1240 if not status: 1241 log.error('unable to logClientError') 1242 1243 #returns node data for this host 1244 def get_nodes_self(self, timeout=120): 1245 node = None 1246 api = self.baseUrl + 'nodes/self' 1247 status, content, header = self._http_request(api, timeout=timeout) 1248 if status: 1249 json_parsed = json.loads(content) 1250 node = RestParser().parse_get_nodes_response(json_parsed) 1251 return node 1252 1253 def node_statuses(self, timeout=120): 1254 nodes = [] 1255 api = self.baseUrl + 'nodeStatuses' 1256 status, content, header = self._http_request(api, timeout=timeout) 1257 json_parsed = json.loads(content) 1258 if status: 1259 for key in json_parsed: 1260 #each key contain node info 1261 value = json_parsed[key] 1262 #get otp,get status 1263 node = OtpNode(id=value['otpNode'], 1264 status=value['status']) 1265 if node.ip == '127.0.0.1': 1266 node.ip = self.ip 1267 node.port = int(key[key.rfind(":") + 1:]) 1268 node.replication = value['replication'] 1269 if 'gracefulFailoverPossible' in value.keys(): 1270 node.gracefulFailoverPossible = value['gracefulFailoverPossible'] 1271 else: 1272 node.gracefulFailoverPossible = False 1273 nodes.append(node) 1274 return nodes 1275 1276 def cluster_status(self): 1277 parsed = {} 1278 api = self.baseUrl + 'pools/default' 1279 status, content, header = self._http_request(api) 1280 if status: 1281 parsed = json.loads(content) 1282 return parsed 1283 1284 def fetch_vbucket_map(self, bucket="default"): 1285 """Return vbucket map for bucket 1286 Keyword argument: 1287 bucket -- bucket name 1288 """ 1289 api = self.baseUrl + 'pools/default/buckets/' + bucket 1290 status, content, header = self._http_request(api) 1291 _stats = json.loads(content) 1292 return _stats['vBucketServerMap']['vBucketMap'] 1293 1294 def get_vbucket_map_and_server_list(self, bucket="default"): 1295 """ Return server list, replica and vbuckets map 1296 that matches to server list """ 1297 vbucket_map = self.fetch_vbucket_map(bucket) 1298 api = self.baseUrl + 'pools/default/buckets/' + bucket 1299 status, content, header = self._http_request(api) 1300 _stats = json.loads(content) 1301 num_replica = _stats['vBucketServerMap']['numReplicas'] 1302 vbucket_map = _stats['vBucketServerMap']['vBucketMap'] 1303 servers = _stats['vBucketServerMap']['serverList'] 1304 server_list = [] 1305 for node in servers: 1306 node = node.split(":") 1307 server_list.append(node[0]) 1308 return vbucket_map, server_list, num_replica 1309 1310 def get_pools_info(self): 1311 parsed = {} 1312 api = self.baseUrl + 'pools' 1313 status, content, header = self._http_request(api) 1314 json_parsed = json.loads(content) 1315 if status: 1316 parsed = json_parsed 1317 return parsed 1318 1319 def get_pools(self): 1320 version = None 1321 api = self.baseUrl + 'pools' 1322 status, content, header = self._http_request(api) 1323 json_parsed = json.loads(content) 1324 if status: 1325 version = MembaseServerVersion(json_parsed['implementationVersion'], json_parsed['componentsVersion']) 1326 return version 1327 1328 def get_buckets(self): 1329 #get all the buckets 1330 buckets = [] 1331 api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets?basic_stats=true') 1332 status, content, header = self._http_request(api) 1333 json_parsed = json.loads(content) 1334 if status: 1335 for item in json_parsed: 1336 bucketInfo = RestParser().parse_get_bucket_json(item) 1337 buckets.append(bucketInfo) 1338 return buckets 1339 1340 def get_bucket_stats_for_node(self, bucket='default', node=None): 1341 if not node: 1342 log.error('node_ip not specified') 1343 return None 1344 stats = {} 1345 api = "{0}{1}{2}{3}{4}:{5}{6}".format(self.baseUrl, 'pools/default/buckets/', 1346 bucket, "/nodes/", node.ip, node.port, "/stats") 1347 status, content, header = self._http_request(api) 1348 if status: 1349 json_parsed = json.loads(content) 1350 op = json_parsed["op"] 1351 samples = op["samples"] 1352 for stat_name in samples: 1353 if len(samples[stat_name]) == 0: 1354 stats[stat_name] = [] 1355 else: 1356 stats[stat_name] = samples[stat_name][-1] 1357 return stats 1358 1359 def fetch_bucket_stats(self, bucket='default', zoom='minute'): 1360 """Return deserialized buckets stats. 1361 Keyword argument: 1362 bucket -- bucket name 1363 zoom -- stats zoom level (minute | hour | day | week | month | year) 1364 """ 1365 api = self.baseUrl + 'pools/default/buckets/{0}/stats?zoom={1}'.format(bucket, zoom) 1366 status, content, header = self._http_request(api) 1367 return json.loads(content) 1368 1369 def fetch_system_stats(self): 1370 """Return deserialized system stats.""" 1371 api = self.baseUrl + 'pools/default/' 1372 status, content, header = self._http_request(api) 1373 return json.loads(content) 1374 1375 def get_xdc_queue_size(self, bucket): 1376 """Fetch bucket stats and return the latest value of XDC replication 1377 queue size""" 1378 bucket_stats = self.fetch_bucket_stats(bucket) 1379 return bucket_stats['op']['samples']['replication_changes_left'][-1] 1380 1381 def get_nodes(self): 1382 nodes = [] 1383 api = self.baseUrl + 'pools/default' 1384 status, content, header = self._http_request(api) 1385 count = 0 1386 while not content and count < 7: 1387 log.info("sleep 5 seconds and retry") 1388 time.sleep(5) 1389 status, content, header = self._http_request(api) 1390 count += 1 1391 if count == 7: 1392 raise Exception("could not get node info after 30 seconds") 1393 json_parsed = json.loads(content) 1394 if status: 1395 if "nodes" in json_parsed: 1396 for json_node in json_parsed["nodes"]: 1397 node = RestParser().parse_get_nodes_response(json_node) 1398 node.rest_username = self.username 1399 node.rest_password = self.password 1400 if node.ip == "127.0.0.1": 1401 node.ip = self.ip 1402 # Only add nodes which are active on cluster 1403 if node.clusterMembership == 'active': 1404 nodes.append(node) 1405 else: 1406 log.info("Node {0} not part of cluster {1}".format(node.ip, node.clusterMembership)) 1407 return nodes 1408 1409 # this method returns the number of node in cluster 1410 def get_cluster_size(self): 1411 nodes = self.get_nodes() 1412 node_ip = [] 1413 for node in nodes: 1414 node_ip.append(node.ip) 1415 log.info("Number of node(s) in cluster is {0} node(s)".format(len(node_ip))) 1416 return len(node_ip) 1417 1418 # this method returns the versions of nodes in cluster 1419 def get_nodes_versions(self): 1420 nodes = self.get_nodes() 1421 versions = [] 1422 for node in nodes: 1423 versions.append(node.version) 1424 log.info("Node versions in cluster {0}".format(versions)) 1425 return versions 1426 1427 1428 def get_bucket_stats(self, bucket='default'): 1429 stats = {} 1430 status, json_parsed = self.get_bucket_stats_json(bucket) 1431 if status: 1432 op = json_parsed["op"] 1433 samples = op["samples"] 1434 for stat_name in samples: 1435 if samples[stat_name]: 1436 last_sample = len(samples[stat_name]) - 1 1437 if last_sample: 1438 stats[stat_name] = samples[stat_name][last_sample] 1439 return stats 1440 1441 def get_bucket_stats_json(self, bucket='default'): 1442 stats = {} 1443 api = "{0}{1}{2}{3}".format(self.baseUrl, 'pools/default/buckets/', bucket, "/stats") 1444 if isinstance(bucket, Bucket): 1445 api = '{0}{1}{2}{3}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name, "/stats") 1446 status, content, header = self._http_request(api) 1447 json_parsed = json.loads(content) 1448 return status, json_parsed 1449 1450 def get_bucket_json(self, bucket='default'): 1451 api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket) 1452 if isinstance(bucket, Bucket): 1453 api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name) 1454 status, content, header = self._http_request(api) 1455 if not status: 1456 raise GetBucketInfoFailed(bucket, content) 1457 return json.loads(content) 1458 1459 def get_bucket(self, bucket='default', num_attempt=1, timeout=1): 1460 bucketInfo = None 1461 api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket) 1462 if isinstance(bucket, Bucket): 1463 api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket.name) 1464 status, content, header = self._http_request(api) 1465 num = 1 1466 while not status and num_attempt > num: 1467 log.error("try to get {0} again after {1} sec".format(api, timeout)) 1468 time.sleep(timeout) 1469 status, content, header = self._http_request(api) 1470 num += 1 1471 if status: 1472 bucketInfo = RestParser().parse_get_bucket_response(content) 1473 return bucketInfo 1474 1475 def get_vbuckets(self, bucket='default'): 1476 b = self.get_bucket(bucket) 1477 return None if not b else b.vbuckets 1478 1479 def delete_bucket(self, bucket='default'): 1480 api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket) 1481 if isinstance(bucket, Bucket): 1482 api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket.name) 1483 status, content, header = self._http_request(api, 'DELETE') 1484 1485 if int(header['status']) == 500: 1486 # According to http://docs.couchbase.com/couchbase-manual-2.5/cb-rest-api/#deleting-buckets 1487 # the cluster will return with 500 if it failed to nuke 1488 # the bucket on all of the nodes within 30 secs 1489 log.warn("Bucket deletion timed out waiting for all nodes") 1490 1491 return status 1492 1493 # figure out the proxy port 1494 def create_bucket(self, bucket='', 1495 ramQuotaMB=1, 1496 authType='none', 1497 saslPassword='', 1498 replicaNumber=1, 1499 proxyPort=11211, 1500 bucketType='membase', 1501 replica_index=1, 1502 threadsNumber=3, 1503 flushEnabled=1, 1504 evictionPolicy='valueOnly'): 1505 api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets') 1506 params = urllib.urlencode({}) 1507 1508 #this only works for default bucket ? 1509 if bucket == 'default': 1510 params = urllib.urlencode({'name': bucket, 1511 'authType': 'sasl', 1512 'saslPassword': saslPassword, 1513 'ramQuotaMB': ramQuotaMB, 1514 'replicaNumber': replicaNumber, 1515 'proxyPort': proxyPort, 1516 'bucketType': bucketType, 1517 'replicaIndex': replica_index, 1518 'threadsNumber': threadsNumber, 1519 'flushEnabled': flushEnabled, 1520 'evictionPolicy': evictionPolicy}) 1521 elif authType == 'none': 1522 params = urllib.urlencode({'name': bucket, 1523 'ramQuotaMB': ramQuotaMB, 1524 'authType': authType, 1525 'replicaNumber': replicaNumber, 1526 'proxyPort': proxyPort, 1527 'bucketType': bucketType, 1528 'replicaIndex': replica_index, 1529 'threadsNumber': threadsNumber, 1530 'flushEnabled': flushEnabled, 1531 'evictionPolicy': evictionPolicy}) 1532 elif authType == 'sasl': 1533 params = urllib.urlencode({'name': bucket, 1534 'ramQuotaMB': ramQuotaMB, 1535 'authType': authType, 1536 'saslPassword': saslPassword, 1537 'replicaNumber': replicaNumber, 1538 'proxyPort': self.get_nodes_self().moxi, 1539 'bucketType': bucketType, 1540 'replicaIndex': replica_index, 1541 'threadsNumber': threadsNumber, 1542 'flushEnabled': flushEnabled, 1543 'evictionPolicy': evictionPolicy}) 1544 log.info("{0} with param: {1}".format(api, params)) 1545 create_start_time = time.time() 1546 1547 maxwait = 60 1548 for numsleep in range(maxwait): 1549 status, content, header = self._http_request(api, 'POST', params) 1550 if status: 1551 break 1552 elif (int(header['status']) == 503 and 1553 '{"_":"Bucket with given name still exists"}' in content): 1554 log.info("The bucket still exists, sleep 1 sec and retry") 1555 time.sleep(1) 1556 else: 1557 raise BucketCreationException(ip=self.ip, bucket_name=bucket) 1558 1559 if (numsleep + 1) == maxwait: 1560 log.error("Tried to create the bucket for {0} secs.. giving up". 1561 format(maxwait)) 1562 raise BucketCreationException(ip=self.ip, bucket_name=bucket) 1563 1564 create_time = time.time() - create_start_time 1565 log.info("{0:.02f} seconds to create bucket {1}". 1566 format(round(create_time, 2), bucket)) 1567 return status 1568 1569 def change_bucket_props(self, bucket, 1570 ramQuotaMB=None, 1571 authType=None, 1572 saslPassword=None, 1573 replicaNumber=None, 1574 proxyPort=None, 1575 replicaIndex=None, 1576 flushEnabled=None): 1577 api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket) 1578 if isinstance(bucket, Bucket): 1579 api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name) 1580 params = urllib.urlencode({}) 1581 params_dict = {} 1582 existing_bucket = self.get_bucket_json(bucket) 1583 if ramQuotaMB: 1584 params_dict["ramQuotaMB"] = ramQuotaMB 1585 if authType: 1586 params_dict["authType"] = authType 1587 if saslPassword: 1588 params_dict["authType"] = "sasl" 1589 params_dict["saslPassword"] = saslPassword 1590 if replicaNumber: 1591 params_dict["replicaNumber"] = replicaNumber 1592 if proxyPort: 1593 params_dict["proxyPort"] = proxyPort 1594 if replicaIndex: 1595 params_dict["replicaIndex"] = replicaIndex 1596 if flushEnabled: 1597 params_dict["flushEnabled"] = flushEnabled 1598 params = urllib.urlencode(params_dict) 1599 1600 log.info("%s with param: %s" % (api, params)) 1601 status, content, header = self._http_request(api, 'POST', params) 1602 if not status: 1603 raise Exception("Unable to set bucket settings %s for bucket" % (params, bucket)) 1604 log.info("bucket %s updated" % bucket) 1605 return status 1606 1607 #return AutoFailoverSettings 1608 def get_autofailover_settings(self): 1609 settings = None 1610 api = self.baseUrl + 'settings/autoFailover' 1611 status, content, header = self._http_request(api) 1612 json_parsed = json.loads(content) 1613 if status: 1614 settings = AutoFailoverSettings() 1615 settings.enabled = json_parsed["enabled"] 1616 settings.count = json_parsed["count"] 1617 settings.timeout = json_parsed["timeout"] 1618 return settings 1619 1620 def update_autofailover_settings(self, enabled, timeout): 1621 if enabled: 1622 params = urllib.urlencode({'enabled': 'true', 1623 'timeout': timeout}) 1624 else: 1625 params = urllib.urlencode({'enabled': 'false', 1626 'timeout': timeout}) 1627 api = self.baseUrl + 'settings/autoFailover' 1628 log.info('settings/autoFailover params : {0}'.format(params)) 1629 status, content, header = self._http_request(api, 'POST', params) 1630 if not status: 1631 log.error('''failed to change autofailover_settings! 1632 See MB-7282. Workaround: 1633 wget --user=Administrator --password=asdasd --post-data='rpc:call(mb_master:master_node(), erlang, apply ,[fun () -> erlang:exit(erlang:whereis(mb_master), kill) end, []]).' http://localhost:8091/diag/eval''') 1634 return status 1635 1636 def reset_autofailover(self): 1637 api = self.baseUrl + 'settings/autoFailover/resetCount' 1638 status, content, header = self._http_request(api, 'POST', '') 1639 return status 1640 1641 def set_alerts_settings(self, recipients, sender, email_username, email_password, email_host='localhost', email_port=25, email_encrypt='false', alerts='auto_failover_node,auto_failover_maximum_reached'): 1642 api = self.baseUrl + 'settings/alerts' 1643 params = urllib.urlencode({'enabled': 'true', 1644 'recipients': recipients, 1645 'sender': sender, 1646 'emailUser': email_username, 1647 'emailPass': email_password, 1648 'emailHost': email_host, 1649 'emailPrt': email_port, 1650 'emailEncrypt': email_encrypt, 1651 'alerts': alerts}) 1652 log.info('settings/alerts params : {0}'.format(params)) 1653 status, content, header = self._http_request(api, 'POST', params) 1654 return status 1655 1656 def get_alerts_settings(self): 1657 api = self.baseUrl + 'settings/alerts' 1658 status, content, header = self._http_request(api) 1659 json_parsed = json.loads(content) 1660 if not status: 1661 raise Exception("unable to get autofailover alerts settings") 1662 return json_parsed 1663 1664 def disable_alerts(self): 1665 api = self.baseUrl + 'settings/alerts' 1666 params = urllib.urlencode({'enabled': 'false'}) 1667 log.info('settings/alerts params : {0}'.format(params)) 1668 status, content, header = self._http_request(api, 'POST', params) 1669 return status 1670 1671 def stop_rebalance(self, wait_timeout=10): 1672 api = self.baseUrl + '/controller/stopRebalance' 1673 status, content, header = self._http_request(api, 'POST') 1674 if status: 1675 for i in xrange(wait_timeout): 1676 if self._rebalance_progress_status() == 'running': 1677 log.warn("rebalance is not stopped yet after {0} sec".format(i + 1)) 1678 time.sleep(1) 1679 status = False 1680 else: 1681 log.info("rebalance was stopped") 1682 status = True 1683 break 1684 else: 1685 log.error("Rebalance is not stopped due to {0}".format(content)) 1686 return status 1687 1688 def set_data_path(self, data_path=None, index_path=None): 1689 api = self.baseUrl + '/nodes/self/controller/settings' 1690 paths = {} 1691 if data_path: 1692 paths['path'] = data_path 1693 if index_path: 1694 paths['index_path'] = index_path 1695 if paths: 1696 params = urllib.urlencode(paths) 1697 log.info('/nodes/self/controller/settings params : {0}'.format(params)) 1698 status, content, header = self._http_request(api, 'POST', params) 1699 if status: 1700 log.info("Setting data_path: {0}: status {1}".format(data_path, status)) 1701 else: 1702 log.error("Unable to set data_path {0} : {1}".format(data_path, content)) 1703 return status 1704 1705 def get_database_disk_size(self, bucket='default'): 1706 api = self.baseUrl + "pools/{0}/buckets".format(bucket) 1707 status, content, header = self._http_request(api) 1708 json_parsed = json.loads(content) 1709 # disk_size in MB 1710 disk_size = (json_parsed[0]["basicStats"]["diskUsed"]) / (1024 * 1024) 1711 return status, disk_size 1712 1713 def ddoc_compaction(self, design_doc_id, bucket="default"): 1714 api = self.baseUrl + "pools/default/buckets/%s/ddocs/%s/controller/compactView" % \ 1715 (bucket, design_doc_id) 1716 status, content, header = self._http_request(api, 'POST') 1717 if not status: 1718 raise CompactViewFailed(design_doc_id, content) 1719 log.info("compaction for ddoc '%s' was triggered" % design_doc_id) 1720 1721 def check_compaction_status(self, bucket_name): 1722 tasks = self.active_tasks() 1723 if "error" in tasks: 1724 raise Exception(tasks) 1725 for task in tasks: 1726 log.info("Task is {0}".format(task)) 1727 if task["type"] == "bucket_compaction": 1728 if task["bucket"] == bucket_name: 1729 return True, task["progress"] 1730 return False, None 1731 1732 def change_memcached_t_option(self, value): 1733 cmd = '[ns_config:update_key({node, N, memcached}, fun (PList)' + \ 1734 ' -> lists:keystore(verbosity, 1, PList, {verbosity, \'-t ' + str(value) + '\'}) end)' + \ 1735 ' || N <- ns_node_disco:nodes_wanted()].' 1736 return self.diag_eval(cmd) 1737 1738 def set_ensure_full_commit(self, value): 1739 """Dynamic settings changes""" 1740 # the boolean paramter is used to turn on/off ensure_full_commit(). In XDCR, 1741 # issuing checkpoint in this function is expensive and not necessary in some 1742 # test, turning off this function would speed up some test. The default value 1743 # is ON. 1744 cmd = 'ns_config:set(ensure_full_commit_enabled, {0}).'.format(value) 1745 return self.diag_eval(cmd) 1746 1747 def get_internalSettings(self, param): 1748 """allows to get internalSettings values for: 1749 indexAwareRebalanceDisabled, rebalanceIndexWaitingDisabled, 1750 rebalanceIndexPausingDisabled, maxParallelIndexers, 1751 maxParallelReplicaIndexers, maxBucketCount""" 1752 api = self.baseUrl + "internalSettings" 1753 status, content, header = self._http_request(api) 1754 json_parsed = json.loads(content) 1755 param = json_parsed[param] 1756 return param 1757 1758 def set_internalSetting(self, param, value): 1759 "Set any internal setting" 1760 api = self.baseUrl + "internalSettings" 1761 1762 if isinstance(value, bool): 1763 value = str(value).lower() 1764 1765 params = urllib.urlencode({param : value}) 1766 status, content, header = self._http_request(api, "POST", params) 1767 log.info('Update internal setting {0}={1}'.format(param, value)) 1768 return status 1769 1770 def get_replication_for_buckets(self, src_bucket_name, dest_bucket_name): 1771 replications = self.get_replications() 1772 for replication in replications: 1773 if src_bucket_name in replication['source'] and \ 1774 replication['target'].endswith(dest_bucket_name): 1775 return replication 1776 raise XDCRException("Replication with Src bucket: {0} and Target bucket: {1} not found". 1777 format(src_bucket_name, dest_bucket_name)) 1778 1779 """ By default, these are the global replication settings - 1780 { optimisticReplicationThreshold:256, 1781 workerBatchSize:500, 1782 failureRestartInterval:1, 1783 docBatchSizeKb":2048, 1784 checkpointInterval":1800, 1785 maxConcurrentReps":32} 1786 You can override these using set_xdcr_param() 1787 """ 1788 def set_xdcr_param(self, src_bucket_name, 1789 dest_bucket_name, param, value): 1790 replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name) 1791 api = self.baseUrl + replication['settingsURI'] 1792 value = str(value).lower() 1793 params = urllib.urlencode({param: value}) 1794 status, _, _ = self._http_request(api, "POST", params) 1795 if not status: 1796 raise XDCRException("Unable to set replication setting {0}={1} on bucket {2} on node {3}". 1797 format(param, value, src_bucket_name, self.ip)) 1798 log.info("Updated {0}={1} on bucket'{2}' on {3}".format(param, value, src_bucket_name, self.ip)) 1799 1800 # Gets per-replication setting value 1801 def get_xdcr_param(self, src_bucket_name, 1802 dest_bucket_name, param): 1803 replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name) 1804 api = self.baseUrl + replication['settingsURI'] 1805 status, content, _ = self._http_request(api) 1806 if not status: 1807 raise XDCRException("Unable to get replication setting {0} on bucket {1} on node {2}". 1808 format(param, src_bucket_name, self.ip)) 1809 json_parsed = json.loads(content) 1810 # when per-replication settings match global(internal) settings, 1811 # the param is not returned by rest API 1812 # in such cases, return internalSetting value for the param 1813 try: 1814 return json_parsed[param] 1815 except KeyError: 1816 if param == 'pauseRequested': 1817 return False 1818 else: 1819 param = 'xdcr' + param[0].upper() + param[1:] 1820 log.info("Trying to fetch xdcr param:{0} from global settings". 1821 format(param)) 1822 return self.get_internalSettings(param) 1823 1824 # Returns a boolean value on whether replication 1825 def is_replication_paused(self, src_bucket_name, dest_bucket_name): 1826 return self.get_xdcr_param(src_bucket_name, dest_bucket_name, 'pauseRequested') 1827 1828 """ Enable master trace logging for xdcr 1829 wget -O- --post-data='ale:set_loglevel(xdcr_trace, debug).' http://Administrator:asdasd@127.0.0.1:8091/diag/eval""" 1830 def enable_xdcr_trace_logging(self): 1831 self.diag_eval('ale:set_loglevel(xdcr_trace, debug).') 1832 1833 def get_recent_xdcr_vb_ckpt(self, src_bucket_name): 1834 command = 'ns_server_testrunner_api:grab_all_xdcr_checkpoints("%s", 10).' % src_bucket_name 1835 status, content = self.diag_eval(command) 1836 if not status: 1837 raise Exception("Unable to get recent XDCR checkpoint information") 1838 json_parsed = json.loads(content) 1839 # a single decoding will only return checkpoint record as string 1840 # convert string to dict using json 1841 chkpt_doc_string = json_parsed.values()[0].replace('"', '\"') 1842 chkpt_dict = json.loads(chkpt_doc_string) 1843 return chkpt_dict 1844 1845 def set_reb_cons_view(self, disable): 1846 """Enable/disable consistent view for rebalance tasks""" 1847 api = self.baseUrl + "internalSettings" 1848 params = {"indexAwareRebalanceDisabled": str(disable).lower()} 1849 params = urllib.urlencode(params) 1850 status, content, header = self._http_request(api, "POST", params) 1851 log.info('Consistent-views during rebalance was set as indexAwareRebalanceDisabled={0}'\ 1852 .format(str(disable).lower())) 1853 return status 1854 1855 def set_reb_index_waiting(self, disable): 1856 """Enable/disable rebalance index waiting""" 1857 api = self.baseUrl + "internalSettings" 1858 params = {"rebalanceIndexWaitingDisabled": str(disable).lower()} 1859 params = urllib.urlencode(params) 1860 status, content, header = self._http_request(api, "POST", params) 1861 log.info('rebalance index waiting was set as rebalanceIndexWaitingDisabled={0}'\ 1862 .format(str(disable).lower())) 1863 return status 1864 1865 def set_rebalance_index_pausing(self, disable): 1866 """Enable/disable index pausing during rebalance""" 1867 api = self.baseUrl + "internalSettings" 1868 params = {"rebalanceIndexPausingDisabled": str(disable).lower()} 1869 params = urllib.urlencode(params) 1870 status, content, header = self._http_request(api, "POST", params) 1871 log.info('index pausing during rebalance was set as rebalanceIndexPausingDisabled={0}'\ 1872 .format(str(disable).lower())) 1873 return status 1874 1875 def set_max_parallel_indexers(self, count): 1876 """set max parallel indexer threads""" 1877 api = self.baseUrl + "internalSettings" 1878 params = {"maxParallelIndexers": count} 1879 params = urllib.urlencode(params) 1880 status, content, header = self._http_request(api, "POST", params) 1881 log.info('max parallel indexer threads was set as maxParallelIndexers={0}'.\ 1882 format(count)) 1883 return status 1884 1885 def set_max_parallel_replica_indexers(self, count): 1886 """set max parallel replica indexers threads""" 1887 api = self.baseUrl + "internalSettings" 1888 params = {"maxParallelReplicaIndexers": count} 1889 params = urllib.urlencode(params) 1890 status, content, header = self._http_request(api, "POST", params) 1891 log.info('max parallel replica indexers threads was set as maxParallelReplicaIndexers={0}'.\ 1892 format(count)) 1893 return status 1894 1895 def get_internal_replication_type(self): 1896 buckets = self.get_buckets() 1897 cmd = "\'{ok, BC} = ns_bucket:get_bucket(%s), ns_bucket:replication_type(BC).\'" % buckets[0].name 1898 return self.diag_eval(cmd) 1899 1900 def set_mc_threads(self, mc_threads=4): 1901 """ 1902 Change number of memcached threads and restart the cluster 1903 """ 1904 cmd = "[ns_config:update_key({node, N, memcached}, " \ 1905 "fun (PList) -> lists:keystore(verbosity, 1, PList," \ 1906 " {verbosity, \"-t %s\"}) end) " \ 1907 "|| N <- ns_node_disco:nodes_wanted()]." % mc_threads 1908 1909 return self.diag_eval(cmd) 1910 1911 def set_auto_compaction(self, parallelDBAndVC="false", 1912 dbFragmentThreshold=None, 1913 viewFragmntThreshold=None, 1914 dbFragmentThresholdPercentage=None, 1915 viewFragmntThresholdPercentage=None, 1916 allowedTimePeriodFromHour=None, 1917 allowedTimePeriodFromMin=None, 1918 allowedTimePeriodToHour=None, 1919 allowedTimePeriodToMin=None, 1920 allowedTimePeriodAbort=None, 1921 bucket=None): 1922 """Reset compaction values to default, try with old fields (dp4 build) 1923 and then try with newer fields""" 1924 params = {} 1925 api = self.baseUrl 1926 1927 if bucket is None: 1928 # setting is cluster wide 1929 api = api + "controller/setAutoCompaction" 1930 else: 1931 # overriding per/bucket compaction setting 1932 api = api + "pools/default/buckets/" + bucket 1933 params["autoCompactionDefined"] = "true" 1934 # reuse current ram quota in mb per node 1935 num_nodes = len(self.node_statuses()) 1936 bucket_info = self.get_bucket_json(bucket) 1937 quota = self.get_bucket_json(bucket)["quota"]["ram"] / (1048576 * num_nodes) 1938 params["ramQuotaMB"] = quota 1939 if bucket_info["authType"] == "sasl" and bucket_info["name"] != "default": 1940 params["authType"] = self.get_bucket_json(bucket)["authType"] 1941 params["saslPassword"] = self.get_bucket_json(bucket)["saslPassword"] 1942 1943 params["parallelDBAndViewCompaction"] = parallelDBAndVC 1944 # Need to verify None because the value could be = 0 1945 if dbFragmentThreshold is not None: 1946 params["databaseFragmentationThreshold[size]"] = dbFragmentThreshold 1947 if viewFragmntThreshold is not None: 1948 params["viewFragmentationThreshold[size]"] = viewFragmntThreshold 1949 if dbFragmentThresholdPercentage is not None: 1950 params["databaseFragmentationThreshold[percentage]"] = dbFragmentThresholdPercentage 1951 if viewFragmntThresholdPercentage is not None: 1952 params["viewFragmentationThreshold[percentage]"] = viewFragmntThresholdPercentage 1953 if allowedTimePeriodFromHour is not None: 1954 params["allowedTimePeriod[fromHour]"] = allowedTimePeriodFromHour 1955 if allowedTimePeriodFromMin is not None: 1956 params["allowedTimePeriod[fromMinute]"] = allowedTimePeriodFromMin 1957 if allowedTimePeriodToHour is not None: 1958 params["allowedTimePeriod[toHour]"] = allowedTimePeriodToHour 1959 if allowedTimePeriodToMin is not None: 1960 params["allowedTimePeriod[toMinute]"] = allowedTimePeriodToMin 1961 if allowedTimePeriodAbort is not None: 1962 params["allowedTimePeriod[abortOutside]"] = allowedTimePeriodAbort 1963 1964 params = urllib.urlencode(params) 1965 log.info("'%s' bucket's settings will be changed with parameters: %s" % (bucket, params)) 1966 return self._http_request(api, "POST", params) 1967 1968 def set_global_loglevel(self, loglevel='error'): 1969 """Set cluster-wide logging level for core components 1970 1971 Possible loglevel: 1972 -- debug 1973 -- info 1974 -- warn 1975 -- error 1976 """ 1977 1978 api = self.baseUrl + 'diag/eval' 1979 request_body = 'rpc:eval_everywhere(erlang, apply, [fun () -> \ 1980 [ale:set_loglevel(L, {0}) || L <- \ 1981 [ns_server, couchdb, user, menelaus, ns_doctor, stats, \ 1982 rebalance, cluster, views, stderr]] end, []]).'.format(loglevel) 1983 return self._http_request(api=api, method='POST', params=request_body, 1984 headers=self._create_headers()) 1985 1986 def set_couchdb_option(self, section, option, value): 1987 """Dynamic settings changes""" 1988 1989 cmd = 'ns_config:set({{couchdb, {{{0}, {1}}}}}, {2}).'.format(section, 1990 option, 1991 value) 1992 return self.diag_eval(cmd) 1993 1994 def get_alerts(self): 1995 api = self.baseUrl + "pools/default/" 1996 status, content, header = self._http_request(api) 1997 json_parsed = json.loads(content) 1998 if status: 1999 if "alerts" in json_parsed: 2000 return json_parsed['alerts'] 2001 else: 2002 return None 2003 2004 def flush_bucket(self, bucket="default"): 2005 if isinstance(bucket, Bucket): 2006 bucket_name = bucket.name 2007 else: 2008 bucket_name = bucket 2009 api = self.baseUrl + "pools/default/buckets/%s/controller/doFlush" % (bucket_name) 2010 status, content, header = self._http_request(api, 'POST') 2011 if not status: 2012 raise BucketFlushFailed(self.ip, bucket_name) 2013 log.info("Flush for bucket '%s' was triggered" % bucket_name) 2014 2015 def update_notifications(self, enable): 2016 api = self.baseUrl + 'settings/stats' 2017 params = urllib.urlencode({'sendStats' : enable}) 2018 log.info('settings/stats params : {0}'.format(params)) 2019 status, content, header = self._http_request(api, 'POST', params) 2020 return status 2021 2022 def get_notifications(self): 2023 api = self.baseUrl + 'settings/stats' 2024 status, content, header = self._http_request(api) 2025 json_parsed = json.loads(content) 2026 if status: 2027 return json_parsed["sendStats"] 2028 return None 2029 2030 def get_logs(self, last_n=10, contains_text=None): 2031 api = self.baseUrl + 'logs' 2032 status, content, header = self._http_request(api) 2033 json_parsed = json.loads(content) 2034 logs = json_parsed['list'] 2035 logs.reverse() 2036 result = [] 2037 for i in xrange(min(last_n, len(logs))): 2038 result.append(logs[i]) 2039 if contains_text is not None and contains_text in logs[i]["text"]: 2040 break 2041 return result 2042 2043 def print_UI_logs(self, last_n=10, contains_text=None): 2044 logs = self.get_logs(last_n, contains_text) 2045 log.info("Latest logs from UI on {0}:".format(self.ip)) 2046 for lg in logs: log.error(lg) 2047 2048 def delete_ro_user(self): 2049 api = self.baseUrl + 'settings/readOnlyUser' 2050 status, content, header = self._http_request(api, 'DELETE', '') 2051 return status 2052 2053 def create_ro_user(self, username, password): 2054 api = self.baseUrl + 'settings/readOnlyUser' 2055 params = urllib.urlencode({'username' : username, 'password' : password}) 2056 log.info('settings/readOnlyUser params : {0}'.format(params)) 2057 status, content, header = self._http_request(api, 'POST', params) 2058 return status 2059 2060 def query_tool(self, query, port=8093, timeout=650, query_params={}): 2061 params = {'statement' : query} 2062 params.update(query_params) 2063 params = urllib.urlencode(params) 2064 log.info('query params : {0}'.format(params)) 2065 api = "http://%s:%s/query?%s" % (self.ip, port, params) 2066 status, content, header = self._http_request(api, 'POST', timeout=timeout) 2067 try: 2068 return json.loads(content) 2069 except ValueError: 2070 return content 2071 2072 def query_tool_stats(self): 2073 log.info('query n1ql stats') 2074 api = "http://%s:8093/query/stats" % (self.ip) 2075 status, content, header = self._http_request(api, 'GET') 2076 log.info(content) 2077 try: 2078 return json.loads(content) 2079 except ValueError: 2080 return content 2081 2082 # return all rack/zone info 2083 def get_all_zones_info(self, timeout=120): 2084 zones = {} 2085 api = self.baseUrl + 'pools/default/serverGroups' 2086 status, content, header = self._http_request(api, timeout=timeout) 2087 if status: 2088 zones = json.loads(content) 2089 else: 2090 raise Exception("Failed to get all zones info.\n \ 2091 Zone only supports from couchbase server version 2.5 and up.") 2092 return zones 2093 2094 # return group name and unique uuid 2095 def get_zone_names(self): 2096 zone_names = {} 2097 zone_info = self.get_all_zones_info() 2098 if zone_info and len(zone_info["groups"]) >= 1: 2099 for i in range(0, len(zone_info["groups"])): 2100 # pools/default/serverGroups/ = 27 chars 2101 zone_names[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"][28:] 2102 return zone_names 2103 2104 def add_zone(self, zone_name): 2105 api = self.baseUrl + 'pools/default/serverGroups' 2106 request_name = "name={0}".format(zone_name) 2107 status, content, header = self._http_request(api, "POST", \ 2108 params=request_name) 2109 if status: 2110 log.info("zone {0} is added".format(zone_name)) 2111 return True 2112 else: 2113 raise Exception("Failed to add zone with name: %s " % zone_name) 2114 2115 def delete_zone(self, zone_name): 2116 api = self.baseUrl + 'pools/default/serverGroups/' 2117 # check if zone exist 2118 found = False 2119 zones = self.get_zone_names() 2120 for zone in zones: 2121 if zone_name == zone: 2122 api += zones[zone_name] 2123 found = True 2124 break 2125 if not found: 2126 raise Exception("There is not zone with name: %s in cluster" % zone_name) 2127 status, content, header = self._http_request(api, "DELETE") 2128 if status: 2129 log.info("zone {0} is deleted".format(zone_name)) 2130 else: 2131 raise Exception("Failed to delete zone with name: %s " % zone_name) 2132 2133 def rename_zone(self, old_name, new_name): 2134 api = self.baseUrl + 'pools/default/serverGroups/' 2135 # check if zone exist 2136 found = False 2137 zones = self.get_zone_names() 2138 for zone in zones: 2139 if old_name == zone: 2140 api += zones[old_name] 2141 request_name = "name={0}".format(new_name) 2142 found = True 2143 break 2144 if not found: 2145 raise Exception("There is not zone with name: %s in cluster" % old_name) 2146 status, content, header = self._http_request(api, "PUT", params=request_name) 2147 if status: 2148 log.info("zone {0} is renamed to {1}".format(old_name, new_name)) 2149 else: 2150 raise Exception("Failed to rename zone with name: %s " % old_name) 2151 2152 # get all nodes info in one zone/rack/group 2153 def get_nodes_in_zone(self, zone_name): 2154 nodes = {} 2155 tmp = {} 2156 zone_info = self.get_all_zones_info() 2157 if zone_name != "": 2158 found = False 2159 if len(zone_info["groups"]) >= 1: 2160 for i in range(0, len(zone_info["groups"])): 2161 if zone_info["groups"][i]["name"] == zone_name: 2162 tmp = zone_info["groups"][i]["nodes"] 2163 if not tmp: 2164 log.info("zone {0} is existed but no node in it".format(zone_name)) 2165 # remove port 2166 for node in tmp: 2167 node["hostname"] = node["hostname"].split(":") 2168 node["hostname"] = node["hostname"][0] 2169 nodes[node["hostname"]] = node 2170 found = True 2171 break 2172 if not found: 2173 raise Exception("There is not zone with name: %s in cluster" % zone_name) 2174 return nodes 2175 2176 def get_zone_and_nodes(self): 2177 """ only return zones with node in its """ 2178 zones = {} 2179 tmp = {} 2180 zone_info = self.get_all_zones_info() 2181 if len(zone_info["groups"]) >= 1: 2182 for i in range(0, len(zone_info["groups"])): 2183 tmp = zone_info["groups"][i]["nodes"] 2184 if not tmp: 2185 log.info("zone {0} is existed but no node in it".format(tmp)) 2186 # remove port 2187 else: 2188 nodes = [] 2189 for node in tmp: 2190 node["hostname"] = node["hostname"].split(":") 2191 node["hostname"] = node["hostname"][0] 2192 print node["hostname"][0] 2193 nodes.append(node["hostname"]) 2194 zones[zone_info["groups"][i]["name"]] = nodes 2195 return zones 2196 2197 def get_zone_uri(self): 2198 zone_uri = {} 2199 zone_info = self.get_all_zones_info() 2200 if zone_info and len(zone_info["groups"]) >= 1: 2201 for i in range(0, len(zone_info["groups"])): 2202 zone_uri[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"] 2203 return zone_uri 2204 2205 def shuffle_nodes_in_zones(self, moved_nodes, source_zone, target_zone): 2206 # moved_nodes should be a IP list like 2207 # ["192.168.171.144", "192.168.171.145"] 2208 request = "" 2209 for i in range(0, len(moved_nodes)): 2210 moved_nodes[i] = "ns_1@" + moved_nodes[i] 2211 2212 all_zones = self.get_all_zones_info() 2213 api = self.baseUrl + all_zones["uri"][1:] 2214 2215 moved_node_json = [] 2216 for i in range(0, len(all_zones["groups"])): 2217 for node in all_zones["groups"][i]["nodes"]: 2218 if all_zones["groups"][i]["name"] == source_zone: 2219 for n in moved_nodes: 2220 if n == node["otpNode"]: 2221 moved_node_json.append({"otpNode": node["otpNode"]}) 2222 2223 zone_json = {} 2224 group_json = [] 2225 for i in range(0, len(all_zones["groups"])): 2226 node_j = [] 2227 zone_json["uri"] = all_zones["groups"][i]["uri"] 2228 zone_json["name"] = all_zones["groups"][i]["name"] 2229 zone_json["nodes"] = node_j 2230 2231 if not all_zones["groups"][i]["nodes"]: 2232 if all_zones["groups"][i]["name"] == target_zone: 2233 for i in range(0, len(moved_node_json)): 2234 zone_json["nodes"].append(moved_node_json[i]) 2235 else: 2236 zone_json["nodes"] = [] 2237 else: 2238 for node in all_zones["groups"][i]["nodes"]: 2239 if all_zones["groups"][i]["name"] == source_zone and \ 2240 node["otpNode"] in moved_nodes: 2241 pass 2242 else: 2243 node_j.append({"otpNode": node["otpNode"]}) 2244 if all_zones["groups"][i]["name"] == target_zone: 2245 for k in range(0, len(moved_node_json)): 2246 node_j.append(moved_node_json[k]) 2247 zone_json["nodes"] = node_j 2248 group_json.append({"name": zone_json["name"], "uri": zone_json["uri"], "nodes": zone_json["nodes"]}) 2249 request = '{{"groups": {0} }}'.format(json.dumps(group_json)) 2250 status, content, header = self._http_request(api, "PUT", params=request) 2251 # sample request format 2252 # request = ' {"groups":[{"uri":"/pools/default/serverGroups/0","nodes": [] },\ 2253 # {"uri":"/pools/default/serverGroups/c8275b7a88e6745c02815dde4a505e70","nodes": [] },\ 2254 # {"uri":"/pools/default/serverGroups/1acd9810a027068bd14a1ddd43db414f","nodes": \ 2255 # [{"otpNode":"ns_1@192.168.171.144"},{"otpNode":"ns_1@192.168.171.145"}]} ]} ' 2256 return status 2257 2258 def is_zone_exist(self, zone_name): 2259 found = False 2260 zones = self.get_zone_names() 2261 if zones: 2262 for zone in zones: 2263 if zone_name == zone: 2264 found = True 2265 return True 2266 break 2267 if not found: 2268 log.error("There is not zone with name: {0} in cluster.".format(zone_name)) 2269 return False 2270 2271 def start_cluster_logs_collection(self, nodes="*", upload=False, \ 2272 uploadHost=None, customer="", ticket=""): 2273 if not upload: 2274 params = urllib.urlencode({"nodes":nodes}) 2275 else: 2276 params = urllib.urlencode({"nodes":nodes, "uploadHost":uploadHost, \ 2277 "customer":customer, "ticket":ticket}) 2278 api = self.baseUrl + "controller/startLogsCollection" 2279 status, content, header = self._http_request(api, "POST", params) 2280 return status, content 2281 2282 def get_cluster_logs_collection_info(self): 2283 api = self.baseUrl + "pools/default/tasks/" 2284 status, content, header = self._http_request(api, "GET") 2285 if status: 2286 tmp = json.loads(content) 2287 for k in tmp: 2288 if k["type"] == "clusterLogsCollection": 2289 content = k 2290 return content 2291 return None 2292 2293 """ result["progress"]: progress logs collected at cluster level 2294 result["status]: status logs collected at cluster level 2295 result["perNode"]: all information logs collected at each node """ 2296 def get_cluster_logs_collection_status(self): 2297 result = self.get_cluster_logs_collection_info() 2298 if result: 2299 return result["progress"], result["status"], result["perNode"] 2300 return None, None, None 2301 2302 def cancel_cluster_logs_collection(self): 2303 api = self.baseUrl + "controller/cancelLogsCollection" 2304 status, content, header = self._http_request(api, "POST") 2305 return status, content 2306 2307 def get_bucket_CCCP(self, bucket): 2308 log.info("Getting CCCP config ") 2309 api = '%spools/default/b/%s' % (self.baseUrl, bucket) 2310 if isinstance(bucket, Bucket): 2311 api = '%spools/default/b/%s' % (self.baseUrl, bucket.name) 2312 status, content, header = self._http_request(api) 2313 if status: 2314 return json.loads(content) 2315 return None 2316 2317 def get_recovery_task(self): 2318 content = self.ns_server_tasks() 2319 for item in content: 2320 if item["type"] == "recovery": 2321 return item 2322 return None 2323 2324 2325 def get_recovery_progress(self, recoveryStatusURI): 2326 api = '%s%s' % (self.baseUrl, recoveryStatusURI) 2327 status, content, header = self._http_request(api) 2328 if status: 2329 return json.loads(content) 2330 return None 2331 2332 def get_warming_up_tasks(self): 2333 tasks = self.ns_server_tasks() 2334 tasks_warmup = [] 2335 for task in tasks: 2336 if task["type"] == "warming_up": 2337 tasks_warmup.append(task) 2338 return tasks_warmup 2339 2340 def compact_bucket(self, bucket="default"): 2341 api = self.baseUrl + 'pools/default/buckets/{0}/controller/compactBucket'.format(bucket) 2342 status, content, header = self._http_request(api, 'POST') 2343 if status: 2344 log.info('bucket compaction successful') 2345 else: 2346 raise BucketCompactionException(bucket) 2347 2348 return True 2349 2350 def cancel_bucket_compaction(self, bucket="default"): 2351 api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket) 2352 if isinstance(bucket, Bucket): 2353 api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket.name) 2354 status, content, header = self._http_request(api, 'POST') 2355 log.info("Status is {0}".format(status)) 2356 if status: 2357 log.info('Cancel bucket compaction successful') 2358 else: 2359 raise BucketCompactionException(bucket) 2360 return True 2361 2362class MembaseServerVersion: 2363 def __init__(self, implementationVersion='', componentsVersion=''): 2364 self.implementationVersion = implementationVersion 2365 self.componentsVersion = componentsVersion 2366 2367 2368#this class will also contain more node related info 2369class OtpNode(object): 2370 def __init__(self, id='', status=''): 2371 self.id = id 2372 self.ip = '' 2373 self.replication = '' 2374 self.port = 8091 2375 self.gracefulFailoverPossible = 'true' 2376 #extract ns ip from the otpNode string 2377 #its normally ns_1@10.20.30.40 2378 if id.find('@') >= 0: 2379 self.ip = id[id.index('@') + 1:] 2380 self.status = status 2381 2382 2383class NodeInfo(object): 2384 def __init__(self): 2385 self.availableStorage = None # list 2386 self.memoryQuota = None 2387 2388 2389class NodeDataStorage(object): 2390 def __init__(self): 2391 self.type = '' #hdd or ssd 2392 self.path = '' 2393 self.index_path = '' 2394 self.quotaMb = '' 2395 self.state = '' #ok 2396 2397 def __str__(self): 2398 return '{0}'.format({'type': self.type, 2399 'path': self.path, 2400 'index_path' : self.index_path, 2401 'quotaMb': self.quotaMb, 2402 'state': self.state}) 2403 2404 def get_data_path(self): 2405 return self.path 2406 2407 def get_index_path(self): 2408 return self.index_path 2409 2410 2411class NodeDiskStorage(object): 2412 def __init__(self): 2413 self.type = 0 2414 self.path = '' 2415 self.sizeKBytes = 0 2416 self.usagePercent = 0 2417 2418 2419class Bucket(object): 2420 def __init__(self, bucket_size='', name="", authType="sasl", saslPassword="", num_replicas=0, port=11211, master_id=None, 2421 type='', eviction_policy="valueOnly", bucket_priority=None): 2422 self.name = name 2423 self.port = port 2424 self.type = type 2425 self.nodes = None 2426 self.stats = None 2427 self.servers = [] 2428 self.vbuckets = [] 2429 self.forward_map = [] 2430 self.numReplicas = num_replicas 2431 self.saslPassword = saslPassword 2432 self.authType = "" 2433 self.bucket_size = bucket_size 2434 self.kvs = {1:KVStore()} 2435 self.authType = authType 2436 self.master_id = master_id 2437 self.eviction_policy = eviction_policy 2438 self.bucket_priority = bucket_priority 2439 2440 def __str__(self): 2441 return self.name 2442 2443 2444class Node(object): 2445 def __init__(self): 2446 self.uptime = 0 2447 self.memoryTotal = 0 2448 self.memoryFree = 0 2449 self.mcdMemoryReserved = 0 2450 self.mcdMemoryAllocated = 0 2451 self.status = "" 2452 self.hostname = "" 2453 self.clusterCompatibility = "" 2454 self.clusterMembership = "" 2455 self.version = "" 2456 self.os = "" 2457 self.ports = [] 2458 self.availableStorage = [] 2459 self.storage = [] 2460 self.memoryQuota = 0 2461 self.moxi = 11211 2462 self.memcached = 11210 2463 self.id = "" 2464 self.ip = "" 2465 self.rest_username = "" 2466 self.rest_password = "" 2467 self.port = 8091 2468 2469 2470class AutoFailoverSettings(object): 2471 def __init__(self): 2472 self.enabled = True 2473 self.timeout = 0 2474 self.count = 0 2475 2476 2477class NodePort(object): 2478 def __init__(self): 2479 self.proxy = 0 2480 self.direct = 0 2481 2482 2483class BucketStats(object): 2484 def __init__(self): 2485 self.opsPerSec = 0 2486 self.itemCount = 0 2487 self.diskUsed = 0 2488 self.memUsed = 0 2489 self.ram = 0 2490 2491 2492class vBucket(object): 2493 def __init__(self): 2494 self.master = '' 2495 self.replica = [] 2496 self.id = -1 2497 2498 2499class RestParser(object): 2500 def parse_get_nodes_response(self, parsed): 2501 node = Node() 2502 node.uptime = parsed['uptime'] 2503 node.memoryFree = parsed['memoryFree'] 2504 node.memoryTotal = parsed['memoryTotal'] 2505 node.mcdMemoryAllocated = parsed['mcdMemoryAllocated'] 2506 node.mcdMemoryReserved = parsed['mcdMemoryReserved'] 2507 node.status = parsed['status'] 2508 node.hostname = parsed['hostname'] 2509 node.clusterCompatibility = parsed['clusterCompatibility'] 2510 node.clusterMembership = parsed['clusterMembership'] 2511 node.version = parsed['version'] 2512 node.curr_items = 0 2513 if 'interestingStats' in parsed and 'curr_items' in parsed['interestingStats']: 2514 node.curr_items = parsed['interestingStats']['curr_items'] 2515 node.port = parsed["hostname"][parsed["hostname"].find(":") + 1:] 2516 node.os = parsed['os'] 2517 2518 if "otpNode" in parsed: 2519 node.id = parsed["otpNode"] 2520 if parsed["otpNode"].find('@') >= 0: 2521 node.ip = node.id[node.id.index('@') + 1:] 2522 elif "hostname" in parsed: 2523 node.ip = parsed["hostname"].split(":")[0] 2524 2525 # memoryQuota 2526 if 'memoryQuota' in parsed: 2527 node.memoryQuota = parsed['memoryQuota'] 2528 if 'availableStorage' in parsed: 2529 availableStorage = parsed['availableStorage'] 2530 for key in availableStorage: 2531 #let's assume there is only one disk in each noce 2532 dict_parsed = parsed['availableStorage'] 2533 if 'path' in dict_parsed and 'sizeKBytes' in dict_parsed and 'usagePercent' in dict_parsed: 2534 diskStorage = NodeDiskStorage() 2535 diskStorage.path = dict_parsed['path'] 2536 diskStorage.sizeKBytes = dict_parsed['sizeKBytes'] 2537 diskStorage.type = key 2538 diskStorage.usagePercent = dict_parsed['usagePercent'] 2539 node.availableStorage.append(diskStorage) 2540 log.info(diskStorage) 2541 2542 if 'storage' in parsed: 2543 storage = parsed['storage'] 2544 for key in storage: 2545 disk_storage_list = storage[key] 2546 for dict_parsed in disk_storage_list: 2547 if 'path' in dict_parsed and 'state' in dict_parsed and 'quotaMb' in dict_parsed: 2548 dataStorage = NodeDataStorage() 2549 dataStorage.path = dict_parsed['path'] 2550 dataStorage.index_path = dict_parsed.get('index_path', '') 2551 dataStorage.quotaMb = dict_parsed['quotaMb'] 2552 dataStorage.state = dict_parsed['state'] 2553 dataStorage.type = key 2554 node.storage.append(dataStorage) 2555 2556 # ports":{"proxy":11211,"direct":11210} 2557 if "ports" in parsed: 2558 ports = parsed["ports"] 2559 if "proxy" in ports: 2560 node.moxi = ports["proxy"] 2561 if "direct" in ports: 2562 node.memcached = ports["direct"] 2563 return node 2564 2565 def parse_get_bucket_response(self, response): 2566 parsed = json.loads(response) 2567 return self.parse_get_bucket_json(parsed) 2568 2569 def parse_get_bucket_json(self, parsed): 2570 bucket = Bucket() 2571 bucket.name = parsed['name'] 2572 bucket.type = parsed['bucketType'] 2573 bucket.port = parsed['proxyPort'] 2574 bucket.authType = parsed["authType"] 2575 bucket.saslPassword = parsed["saslPassword"] 2576 bucket.nodes = list() 2577 if 'vBucketServerMap' in parsed: 2578 vBucketServerMap = parsed['vBucketServerMap'] 2579 serverList = vBucketServerMap['serverList'] 2580 bucket.servers.extend(serverList) 2581 if "numReplicas" in vBucketServerMap: 2582 bucket.numReplicas = vBucketServerMap["numReplicas"] 2583 #vBucketMapForward 2584 if 'vBucketMapForward' in vBucketServerMap: 2585 #let's gather the forward map 2586 vBucketMapForward = vBucketServerMap['vBucketMapForward'] 2587 counter = 0 2588 for vbucket in vBucketMapForward: 2589 #there will be n number of replicas 2590 vbucketInfo = vBucket() 2591 vbucketInfo.master = serverList[vbucket[0]] 2592 if vbucket: 2593 for i in range(1, len(vbucket)): 2594 if vbucket[i] != -1: 2595 vbucketInfo.replica.append(serverList[vbucket[i]]) 2596 vbucketInfo.id = counter 2597 counter += 1 2598 bucket.forward_map.append(vbucketInfo) 2599 vBucketMap = vBucketServerMap['vBucketMap'] 2600 counter = 0 2601 for vbucket in vBucketMap: 2602 #there will be n number of replicas 2603 vbucketInfo = vBucket() 2604 vbucketInfo.master = serverList[vbucket[0]] 2605 if vbucket: 2606 for i in range(1, len(vbucket)): 2607 if vbucket[i] != -1: 2608 vbucketInfo.replica.append(serverList[vbucket[i]]) 2609 vbucketInfo.id = counter 2610 counter += 1 2611 bucket.vbuckets.append(vbucketInfo) 2612 #now go through each vbucket and populate the info 2613 #who is master , who is replica 2614 # get the 'storageTotals' 2615 log.debug('read {0} vbuckets'.format(len(bucket.vbuckets))) 2616 stats = parsed['basicStats'] 2617 #vBucketServerMap 2618 bucketStats = BucketStats() 2619 log.debug('stats:{0}'.format(stats)) 2620 bucketStats.opsPerSec = stats['opsPerSec'] 2621 bucketStats.itemCount = stats['itemCount'] 2622 if bucket.type != "memcached": 2623 bucketStats.diskUsed = stats['diskUsed'] 2624 bucketStats.memUsed = stats['memUsed'] 2625 quota = parsed['quota'] 2626 bucketStats.ram = quota['ram'] 2627 bucket.stats = bucketStats 2628 nodes = parsed['nodes'] 2629 for nodeDictionary in nodes: 2630 node = Node() 2631 node.uptime = nodeDictionary['uptime'] 2632 node.memoryFree = nodeDictionary['memoryFree'] 2633 node.memoryTotal = nodeDictionary['memoryTotal'] 2634 node.mcdMemoryAllocated = nodeDictionary['mcdMemoryAllocated'] 2635 node.mcdMemoryReserved = nodeDictionary['mcdMemoryReserved'] 2636 node.status = nodeDictionary['status'] 2637 node.hostname = nodeDictionary['hostname'] 2638 if 'clusterCompatibility' in nodeDictionary: 2639 node.clusterCompatibility = nodeDictionary['clusterCompatibility'] 2640 if 'clusterMembership' in nodeDictionary: 2641 node.clusterCompatibility = nodeDictionary['clusterMembership'] 2642 node.version = nodeDictionary['version'] 2643 node.os = nodeDictionary['os'] 2644 if "ports" in nodeDictionary: 2645 ports = nodeDictionary["ports"] 2646 if "proxy" in ports: 2647 node.moxi = ports["proxy"] 2648 if "direct" in ports: 2649 node.memcached = ports["direct"] 2650 if "hostname" in nodeDictionary: 2651 value = str(nodeDictionary["hostname"]) 2652 node.ip = value[:value.rfind(":")] 2653 node.port = int(value[value.rfind(":") + 1:]) 2654 if "otpNode" in nodeDictionary: 2655 node.id = nodeDictionary["otpNode"] 2656 bucket.nodes.append(node) 2657 return bucket 2658 2659 2660