1import base64
2import json
3import urllib
4import httplib2
5import socket
6import time
7import logger
8import uuid
9from copy import deepcopy
10from threading import Thread
11
12try:
13    from couchbase_helper.document import DesignDocument, View
14except ImportError:
15    from lib.couchbase_helper.document import DesignDocument, View
16
17from memcached.helper.kvstore import KVStore
18from exception import ServerAlreadyJoinedException, ServerUnavailableException, InvalidArgumentException
19from membase.api.exception import BucketCreationException, ServerSelfJoinException, ClusterRemoteException, \
20    RebalanceFailedException, FailoverFailedException, DesignDocCreationException, QueryViewException, \
21    ReadDocumentException, GetBucketInfoFailed, CompactViewFailed, SetViewInfoNotFound, AddNodeException, \
22    BucketFlushFailed, CBRecoveryFailedException, XDCRException, SetRecoveryTypeFailed, BucketCompactionException
23log = logger.Logger.get_logger()
24
25#helper library methods built on top of RestConnection interface
26
27class RestHelper(object):
28    def __init__(self, rest_connection):
29        self.rest = rest_connection
30
31    def is_ns_server_running(self, timeout_in_seconds=360):
32        end_time = time.time() + timeout_in_seconds
33        while time.time() <= end_time:
34            try:
35                status = self.rest.get_nodes_self(5)
36                if status is not None and status.status == 'healthy':
37                    return True
38                else:
39                    if status is not None:
40                        log.warn("server {0}:{1} status is {2}".format(self.rest.ip, self.rest.port, status.status))
41                    else:
42                        log.warn("server {0}:{1} status is down".format(self.rest.ip, self.rest.port))
43            except ServerUnavailableException:
44                log.error("server {0}:{1} is unavailable".format(self.rest.ip, self.rest.port))
45            time.sleep(1)
46        msg = 'unable to connect to the node {0} even after waiting {1} seconds'
47        log.error(msg.format(self.rest.ip, timeout_in_seconds))
48        return False
49
50    def is_cluster_healthy(self, timeout=120):
51        #get the nodes and verify that all the nodes.status are healthy
52        nodes = self.rest.node_statuses(timeout)
53        return all(node.status == 'healthy' for node in nodes)
54
55    def rebalance_reached(self, percentage=100):
56        start = time.time()
57        progress = 0
58        previous_progress = 0
59        retry = 0
60        while progress is not -1 and progress < percentage and retry < 40:
61            #-1 is error , -100 means could not retrieve progress
62            progress = self.rest._rebalance_progress()
63            if progress == -100:
64                log.error("unable to retrieve rebalanceProgress.try again in 2 seconds")
65                retry += 1
66            else:
67                if previous_progress == progress:
68                    retry += 0.5
69                else:
70                    retry = 0
71                    previous_progress = progress
72            #sleep for 2 seconds
73            time.sleep(2)
74        if progress <= 0:
75            log.error("rebalance progress code : {0}".format(progress))
76            return False
77        elif retry >= 40:
78            log.error("rebalance stuck on {0}%".format(progress))
79            return False
80        else:
81            duration = time.time() - start
82            log.info('rebalance reached >{0}% in {1} seconds '.format(progress, duration))
83            return True
84
85    #return true if cluster balanced, false if it needs rebalance
86    def is_cluster_rebalanced(self):
87        command = "ns_orchestrator:needs_rebalance()"
88        status, content = self.rest.diag_eval(command)
89        if status:
90            return content.lower() == "false"
91        log.error("can't define if cluster balanced")
92        return None
93
94
95    #this method will rebalance the cluster by passing the remote_node as
96    #ejected node
97    def remove_nodes(self, knownNodes, ejectedNodes, wait_for_rebalance=True):
98        if len(ejectedNodes) == 0:
99            return False
100        self.rest.rebalance(knownNodes, ejectedNodes)
101        if wait_for_rebalance:
102            return self.rest.monitorRebalance()
103        else:
104            return False
105
106    def vbucket_map_ready(self, bucket, timeout_in_seconds=360):
107        end_time = time.time() + timeout_in_seconds
108        while time.time() <= end_time:
109            vBuckets = self.rest.get_vbuckets(bucket)
110            if vBuckets:
111                return True
112            else:
113                time.sleep(0.5)
114        msg = 'vbucket map is not ready for bucket {0} after waiting {1} seconds'
115        log.info(msg.format(bucket, timeout_in_seconds))
116        return False
117
118    def bucket_exists(self, bucket):
119        try:
120            buckets = self.rest.get_buckets()
121            names = [item.name for item in buckets]
122            log.info("node {1} existing buckets : {0}" \
123                              .format(names, self.rest.ip))
124            for item in buckets:
125                if item.name == bucket:
126                    log.info("node {1} found bucket {0}" \
127                             .format(bucket, self.rest.ip))
128                    return True
129            return False
130        except Exception:
131            return False
132
133    def wait_for_node_status(self, node, expected_status, timeout_in_seconds):
134        status_reached = False
135        end_time = time.time() + timeout_in_seconds
136        while time.time() <= end_time and not status_reached:
137            nodes = self.rest.node_statuses()
138            for n in nodes:
139                if node.id == n.id:
140                    log.info('node {0} status : {1}'.format(node.id, n.status))
141                    if n.status.lower() == expected_status.lower():
142                        status_reached = True
143                    break
144            if not status_reached:
145                log.info("sleep for 5 seconds before reading the node.status again")
146                time.sleep(5)
147        log.info('node {0} status_reached : {1}'.format(node.id, status_reached))
148        return status_reached
149
150    def _wait_for_task_pid(self, pid, end_time, ddoc_name):
151        while (time.time() < end_time):
152            new_pid, _ = self.rest._get_indexer_task_pid(ddoc_name)
153            if pid == new_pid:
154                time.sleep(5)
155                continue
156            else:
157                return
158
159    def _wait_for_indexer_ddoc(self, servers, ddoc_name, timeout=300):
160        nodes = self.rest.get_nodes()
161        servers_to_check = []
162        for node in nodes:
163            for server in servers:
164                if node.ip == server.ip and str(node.port) == str(server.port):
165                    servers_to_check.append(server)
166        for server in servers_to_check:
167            try:
168                rest = RestConnection(server)
169                log.info('Check index for ddoc %s , server %s' % (ddoc_name, server.ip))
170                end_time = time.time() + timeout
171                log.info('Start getting index for ddoc %s , server %s' % (ddoc_name, server.ip))
172                old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name)
173                if not old_pid:
174                    log.info('Index for ddoc %s is not going on, server %s' % (ddoc_name, server.ip))
175                    continue
176                while is_pid_blocked:
177                    log.info('Index for ddoc %s is blocked, server %s' % (ddoc_name, server.ip))
178                    self._wait_for_task_pid(old_pid, end_time, ddoc_name)
179                    old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name)
180                    if time.time() > end_time:
181                        log.error("INDEX IS STILL BLOKED node %s ddoc % pid %" % (server, ddoc_name, old_pid))
182                        break
183                if old_pid:
184                    log.info('Index for ddoc %s is running, server %s' % (ddoc_name, server.ip))
185                    self._wait_for_task_pid(old_pid, end_time, ddoc_name)
186            except Exception, ex:
187                log.error('unable to check index on server %s because of %s' % (server.ip, str(ex)))
188
189    def _get_vbuckets(self, servers, bucket_name='default'):
190        vbuckets_servers = {}
191        for server in servers:
192            buckets = RestConnection(server).get_buckets()
193            if bucket_name:
194                bucket_to_check = [bucket for bucket in buckets
195                               if bucket.name == bucket_name][0]
196            else:
197                bucket_to_check = [bucket for bucket in buckets][0]
198            vbuckets_servers[server] = {}
199            vbs_active = [vb.id for vb in bucket_to_check.vbuckets
200                           if vb.master.startswith(str(server.ip))]
201            vbs_replica = []
202            for replica_num in xrange(0, bucket_to_check.numReplicas):
203                vbs_replica.extend([vb.id for vb in bucket_to_check.vbuckets
204                                    if vb.replica[replica_num].startswith(str(server.ip))])
205            vbuckets_servers[server]['active_vb'] = vbs_active
206            vbuckets_servers[server]['replica_vb'] = vbs_replica
207        return vbuckets_servers
208
209class RestConnection(object):
210
211    def __new__(self, serverInfo={}):
212
213
214        # allow port to determine
215        # behavior of restconnection
216        port = None
217        if isinstance(serverInfo, dict):
218            if 'port' in serverInfo:
219                port = serverInfo['port']
220        else:
221            port = serverInfo.port
222
223        if not port:
224            port = 8091
225
226        if int(port) in xrange(9091, 9100):
227            # return elastic search rest connection
228            from membase.api.esrest_client import EsRestConnection
229            obj = object.__new__(EsRestConnection, serverInfo)
230        else:
231            # default
232            obj = object.__new__(self, serverInfo)
233        return obj
234
235    def __init__(self, serverInfo):
236        #serverInfo can be a json object
237        if isinstance(serverInfo, dict):
238            self.ip = serverInfo["ip"]
239            self.username = serverInfo["username"]
240            self.password = serverInfo["password"]
241            self.port = serverInfo["port"]
242            self.hostname = ''
243            if "hostname" in serverInfo:
244                self.hostname = serverInfo["hostname"]
245        else:
246            self.ip = serverInfo.ip
247            self.username = serverInfo.rest_username
248            self.password = serverInfo.rest_password
249            self.port = serverInfo.port
250            self.hostname = ''
251            if hasattr(serverInfo, 'hostname') and serverInfo.hostname and\
252               serverInfo.hostname.find(self.ip) == -1:
253                self.hostname = serverInfo.hostname
254        self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port)
255        self.capiBaseUrl = "http://{0}:{1}/".format(self.ip, 8092)
256        if self.hostname:
257            self.baseUrl = "http://{0}:{1}/".format(self.hostname, self.port)
258            self.capiBaseUrl = "http://{0}:{1}/".format(self.hostname, 8092)
259        #for Node is unknown to this cluster error
260        for iteration in xrange(5):
261            http_res, success = self.init_http_request(self.baseUrl + 'nodes/self')
262            if not success and type(http_res) == unicode and\
263               (http_res.find('Node is unknown to this cluster') > -1 or http_res.find('Unexpected server error, request logged') > -1):
264                log.error("Error {0} was gotten, 5 seconds sleep before retry".format(http_res))
265                time.sleep(5)
266                if iteration == 2:
267                    log.error("node {0}:{1} is in a broken state!".format(self.ip, self.port))
268                    raise ServerUnavailableException(self.ip)
269                continue
270            else:
271                break
272        #determine the real couchApiBase for cluster_run
273        #couchApiBase appeared in version 2.*
274        if not http_res or http_res["version"][0:2] == "1.":
275            self.capiBaseUrl = self.baseUrl + "/couchBase"
276        else:
277            for iteration in xrange(5):
278                if "couchApiBase" not in http_res.keys():
279                    if self.is_cluster_mixed():
280                        self.capiBaseUrl = self.baseUrl + "/couchBase"
281                        return
282                    time.sleep(0.2)
283                    http_res, success = self.init_http_request(self.baseUrl + 'nodes/self')
284                else:
285                    self.capiBaseUrl = http_res["couchApiBase"]
286                    return
287            raise ServerUnavailableException("couchApiBase doesn't exist in nodes/self: %s " % http_res)
288
289    def sasl_streaming_rq(self, bucket, timeout=120):
290        api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket)
291        if isinstance(bucket, Bucket):
292            api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket.name)
293        try:
294            httplib2.Http(timeout=timeout).request(api, 'GET', '',
295                                                   headers=self._create_capi_headers_with_auth(self.username, self.password))
296        except Exception, ex:
297            log.warn('Exception while streaming: %s' % str(ex))
298
299    def open_sasl_streaming_connection(self, bucket, timeout=1000):
300        log.info("Opening sasl streaming connection for bucket %s" %
301                 (bucket, bucket.name)[isinstance(bucket, Bucket)])
302        t = Thread(target=self.sasl_streaming_rq,
303                          name="streaming_" + str(uuid.uuid4())[:4],
304                          args=(bucket, timeout))
305        try:
306            t.start()
307        except:
308            log.warn("thread is not started")
309            return None
310        return t
311
312    def is_cluster_mixed(self):
313            http_res, success = self.init_http_request(self.baseUrl + 'pools/default')
314            if http_res == u'unknown pool':
315                return False
316            versions = list(set([node["version"][:1] for node in http_res["nodes"]]))
317            if '1' in versions and '2' in versions:
318                 return True
319            return False
320
321    def is_enterprise_edition(self):
322        http_res, success = self.init_http_request(self.baseUrl + 'pools/default')
323        if http_res == u'unknown pool':
324            return False
325        editions = []
326        community_nodes = []
327        """ get the last word in node["version"] as in "version": "2.5.1-1073-rel-enterprise" """
328        for node in http_res["nodes"]:
329            editions.extend(node["version"].split("-")[-1:])
330            if "community" in node["version"].split("-")[-1:]:
331                community_nodes.extend(node["hostname"].split(":")[:1])
332        if "community" in editions:
333            log.error("IP(s) for node(s) with community edition {0}".format(community_nodes))
334            return False
335        return True
336
337    def init_http_request(self, api):
338        try:
339            status, content, header = self._http_request(api, 'GET', headers=self._create_capi_headers_with_auth(self.username, self.password))
340            json_parsed = json.loads(content)
341            if status:
342                return json_parsed, True
343            else:
344                print("{0} with status {1}: {2}".format(api, status, json_parsed))
345                return json_parsed, False
346        except ValueError:
347            print("{0}: {1}".format(api, content))
348            return content, False
349
350    def rename_node(self, hostname, username='Administrator', password='password'):
351        params = urllib.urlencode({'username': username,
352                                   'password': password,
353                                   'hostname': hostname})
354
355        api = "%snode/controller/rename" % (self.baseUrl)
356        status, content, header = self._http_request(api, 'POST', params)
357        return status, content
358
359    def active_tasks(self):
360        api = self.capiBaseUrl + "_active_tasks"
361        try:
362            status, content, header = self._http_request(api, 'GET', headers=self._create_capi_headers())
363            json_parsed = json.loads(content)
364        except ValueError:
365            return ""
366        return json_parsed
367
368    def ns_server_tasks(self):
369        api = self.baseUrl + 'pools/default/tasks'
370        try:
371            status, content, header = self._http_request(api, 'GET', headers=self._create_headers())
372            return json.loads(content)
373        except ValueError:
374            return ""
375
376    # DEPRECATED: use create_ddoc() instead.
377    def create_view(self, design_doc_name, bucket_name, views, options=None):
378        return self.create_ddoc(design_doc_name, bucket_name, views, options)
379
380    def create_ddoc(self, design_doc_name, bucket, views, options=None):
381        design_doc = DesignDocument(design_doc_name, views, options=options)
382        if design_doc.name.find('/') != -1:
383            design_doc.name = design_doc.name.replace('/', '%2f')
384            design_doc.id = '_design/{0}'.format(design_doc.name)
385        return self.create_design_document(bucket, design_doc)
386
387    def create_design_document(self, bucket, design_doc):
388        design_doc_name = design_doc.id
389        api = '%s/%s/%s' % (self.capiBaseUrl, bucket, design_doc_name)
390        if isinstance(bucket, Bucket):
391            api = '%s/%s/%s' % (self.capiBaseUrl, bucket.name, design_doc_name)
392
393        if isinstance(bucket, Bucket) and bucket.authType == "sasl":
394            status, content, header = self._http_request(api, 'PUT', str(design_doc),
395                                                headers=self._create_capi_headers_with_auth(
396                                                username=bucket.name, password=bucket.saslPassword))
397        else:
398            status, content, header = self._http_request(api, 'PUT', str(design_doc),
399                                                 headers=self._create_capi_headers())
400        if not status:
401            raise DesignDocCreationException(design_doc_name, content)
402        return json.loads(content)
403
404    def is_index_triggered(self, ddoc_name, index_type='main'):
405        run, block = self._get_indexer_task_pid(ddoc_name, index_type=index_type)
406        if run or block:
407            return True
408        else:
409            return False
410
411    def _get_indexer_task_pid(self, ddoc_name, index_type='main'):
412        active_tasks = self.active_tasks()
413        if u'error' in active_tasks:
414            return None
415        if active_tasks:
416            for task in active_tasks:
417                if task['type'] == 'indexer' and task['indexer_type'] == index_type:
418                    for ddoc in task['design_documents']:
419                        if ddoc == ('_design/%s' % ddoc_name):
420                            return task['pid'], False
421                if task['type'] == 'blocked_indexer' and task['indexer_type'] == index_type:
422                    for ddoc in task['design_documents']:
423                        if ddoc == ('_design/%s' % ddoc_name):
424                            return task['pid'], True
425        return None, None
426
427    def query_view(self, design_doc_name, view_name, bucket, query, timeout=120, invalid_query=False, type="view"):
428        status, content, header = self._query(design_doc_name, view_name, bucket, type, query, timeout)
429        if not status and not invalid_query:
430            stat = 0
431            if 'status' in header:
432                stat = int(header['status'])
433            raise QueryViewException(view_name, content, status=stat)
434        return json.loads(content)
435
436    def _query(self, design_doc_name, view_name, bucket, view_type, query, timeout):
437        if design_doc_name.find('/') != -1:
438            design_doc_name = design_doc_name.replace('/', '%2f')
439        if view_name.find('/') != -1:
440            view_name = view_name.replace('/', '%2f')
441        api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket,
442                                               design_doc_name, view_type,
443                                               view_name,
444                                               urllib.urlencode(query))
445        if isinstance(bucket, Bucket):
446            api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket.name,
447                                                  design_doc_name, view_type,
448                                                  view_name,
449                                                  urllib.urlencode(query))
450        log.info("index query url: {0}".format(api))
451        if isinstance(bucket, Bucket) and bucket.authType == "sasl":
452            status, content, header = self._http_request(api, headers=self._create_capi_headers_with_auth(
453                                                username=bucket.name, password=bucket.saslPassword),
454                                                timeout=timeout)
455        else:
456            status, content, header = self._http_request(api, headers=self._create_capi_headers(),
457                                             timeout=timeout)
458        return status, content, header
459
460    def view_results(self, bucket, ddoc_name, params, limit=100, timeout=120,
461                     view_name=None):
462        status, json = self._index_results(bucket, "view", ddoc_name, params, limit, timeout=timeout, view_name=view_name)
463        if not status:
464            raise Exception("unable to obtain view results")
465        return json
466
467
468    # DEPRECATED: Incorrectly named function kept for backwards compatibility.
469    def get_view(self, bucket, view):
470        log.info("DEPRECATED function get_view(" + view + "). use get_ddoc()")
471        return self.get_ddoc(bucket, view)
472
473    def get_data_path(self):
474        node_info = self.get_nodes_self()
475        data_path = node_info.storage[0].get_data_path()
476        return data_path
477
478    def get_memcached_port(self):
479        node_info = self.get_nodes_self()
480        return node_info.memcached
481
482    def get_ddoc(self, bucket, ddoc_name):
483        status, json, meta = self._get_design_doc(bucket, ddoc_name)
484        if not status:
485            raise ReadDocumentException(ddoc_name, json)
486        return json, meta
487
488    #the same as Preview a Random Document on UI
489    def get_random_key(self, bucket):
490        api = self.baseUrl + 'pools/default/buckets/%s/localRandomKey' % (bucket)
491        status, content, header = self._http_request(api, headers=self._create_capi_headers())
492        json_parsed = json.loads(content)
493        if not status:
494            raise Exception("unable to get random document/key for bucket %s" % (bucket))
495        return json_parsed
496
497    def run_view(self, bucket, view, name):
498        api = self.capiBaseUrl + '/%s/_design/%s/_view/%s' % (bucket, view, name)
499        status, content, header = self._http_request(api, headers=self._create_capi_headers())
500        json_parsed = json.loads(content)
501        if not status:
502            raise Exception("unable to create view")
503        return json_parsed
504
505    def delete_view(self, bucket, view):
506        status, json = self._delete_design_doc(bucket, view)
507        if not status:
508            raise Exception("unable to delete the view")
509        return json
510
511    def spatial_results(self, bucket, spatial, params, limit=100):
512        status, json = self._index_results(bucket, "spatial", spatial,
513                                           params, limit)
514        if not status:
515            raise Exception("unable to obtain spatial view results")
516        return json
517
518    def create_spatial(self, bucket, spatial, function):
519        status, json = self._create_design_doc(bucket, spatial, function)
520        if status == False:
521            raise Exception("unable to create spatial view")
522        return json
523
524    def get_spatial(self, bucket, spatial):
525        status, json, meta = self._get_design_doc(bucket, spatial)
526        if not status:
527            raise Exception("unable to get the spatial view definition")
528        return json, meta
529
530    def delete_spatial(self, bucket, spatial):
531        status, json = self._delete_design_doc(bucket, spatial)
532        if not status:
533            raise Exception("unable to delete the spatial view")
534        return json
535
536    # type_ is "view" or "spatial"
537    def _index_results(self, bucket, type_, ddoc_name, params, limit, timeout=120,
538                       view_name=None):
539        if view_name is None:
540            view_name = ddoc_name
541        query = '/{0}/_design/{1}/_{2}/{3}'
542        api = self.capiBaseUrl + query.format(bucket, ddoc_name, type_, view_name)
543
544        num_params = 0
545        if limit != None:
546            num_params = 1
547            api += "?limit={0}".format(limit)
548        for param in params:
549            if num_params > 0:
550                api += "&"
551            else:
552                api += "?"
553            num_params += 1
554
555            if param in ["key", "startkey", "endkey", "start_range",
556                         "end_range"] or isinstance(params[param], bool):
557                api += "{0}={1}".format(param,
558                                        json.dumps(params[param],
559                                                   separators=(',',':')))
560            else:
561                api += "{0}={1}".format(param, params[param])
562
563        log.info("index query url: {0}".format(api))
564        status, content, header = self._http_request(api, headers=self._create_capi_headers(), timeout=timeout)
565        json_parsed = json.loads(content)
566        return status, json_parsed
567
568    def get_couch_doc(self, doc_id, bucket="default", timeout=120):
569        """ use couchBase uri to retrieve document from a bucket """
570        api = self.capiBaseUrl + '/%s/%s' % (bucket, doc_id)
571        status, content, header = self._http_request(api, headers=self._create_capi_headers(),
572                                             timeout=timeout)
573        if not status:
574            raise ReadDocumentException(doc_id, content)
575        return  json.loads(content)
576
577    def _create_design_doc(self, bucket, name, function):
578        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
579        status, content, header = self._http_request(
580            api, 'PUT', function, headers=self._create_capi_headers())
581        json_parsed = json.loads(content)
582        return status, json_parsed
583
584    def _get_design_doc(self, bucket, name):
585        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
586        if isinstance(bucket, Bucket):
587            api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name)
588
589        if isinstance(bucket, Bucket) and bucket.authType == "sasl" and bucket.name != "default":
590            status, content, header = self._http_request(api, headers=self._create_capi_headers_with_auth(
591                                                username=bucket.name, password=bucket.saslPassword))
592        else:
593            status, content, header = self._http_request(api, headers=self._create_capi_headers())
594        json_parsed = json.loads(content)
595        meta_parsed = ""
596        if status:
597            #in dp4 builds meta data is in content, not in header
598            if 'x-couchbase-meta' in header:
599                meta = header['x-couchbase-meta']
600                meta_parsed = json.loads(meta)
601            else:
602                meta_parsed = {}
603                meta_parsed["_rev"] = json_parsed["_rev"]
604                meta_parsed["_id"] = json_parsed["_id"]
605        return status, json_parsed, meta_parsed
606
607    def _delete_design_doc(self, bucket, name):
608        status, design_doc, meta = self._get_design_doc(bucket, name)
609        if not status:
610            raise Exception("unable to find for deletion design document")
611        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
612        if isinstance(bucket, Bucket):
613            api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name)
614        if isinstance(bucket, Bucket) and bucket.authType == "sasl" and bucket.name != "default":
615            status, content, header = self._http_request(api, 'DELETE', headers=self._create_capi_headers_with_auth(
616                                                username=bucket.name, password=bucket.saslPassword))
617        else:
618            status, content, header = self._http_request(api, 'DELETE', headers=self._create_capi_headers())
619        json_parsed = json.loads(content)
620        return status, json_parsed
621
622    def spatial_compaction(self, bucket, design_name):
623        api = self.capiBaseUrl + '/%s/_design/%s/_spatial/_compact' % (bucket, design_name)
624        if isinstance(bucket, Bucket):
625            api = self.capiBaseUrl + \
626            '/%s/_design/%s/_spatial/_compact' % (bucket.name, design_name)
627
628        if isinstance(bucket, Bucket) and bucket.authType == "sasl":
629            status, content, header = self._http_request(api, 'POST', headers=self._create_capi_headers_with_auth(
630                                                username=bucket.name, password=bucket.saslPassword))
631        else:
632            status, content, header = self._http_request(api, 'POST', headers=self._create_capi_headers())
633        json_parsed = json.loads(content)
634        return status, json_parsed
635
636    # Make a _design/_info request
637    def set_view_info(self, bucket, design_name):
638        """Get view diagnostic info (node specific)"""
639        api = self.capiBaseUrl
640        if isinstance(bucket, Bucket):
641            api += '/_set_view/{0}/_design/{1}/_info'.format(bucket.name, design_name)
642        else:
643            api += '_set_view/{0}/_design/{1}/_info'.format(bucket, design_name)
644
645        if isinstance(bucket, Bucket) and bucket.authType == "sasl":
646            headers = self._create_capi_headers_with_auth(
647                username=bucket.name, password=bucket.saslPassword)
648            status, content, header = self._http_request(api, 'POST',
649                                                         headers=headers)
650        else:
651            headers = self._create_capi_headers()
652            status, content, header = self._http_request(api, 'GET',
653                                                         headers=headers)
654        if not status:
655            raise SetViewInfoNotFound(design_name, content)
656        json_parsed = json.loads(content)
657        return status, json_parsed
658
659    # Make a _spatial/_info request
660    def spatial_info(self, bucket, design_name):
661        api = self.capiBaseUrl + \
662            '/%s/_design/%s/_spatial/_info' % (bucket, design_name)
663        status, content, header = self._http_request(
664            api, 'GET', headers=self._create_capi_headers())
665        json_parsed = json.loads(content)
666        return status, json_parsed
667
668    def _create_capi_headers(self):
669        return {'Content-Type': 'application/json',
670                'Accept': '*/*'}
671
672    def _create_capi_headers_with_auth(self, username, password):
673        authorization = base64.encodestring('%s:%s' % (username, password))
674        return {'Content-Type': 'application/json',
675                'Authorization': 'Basic %s' % authorization,
676                'Accept': '*/*'}
677
678    #authorization must be a base64 string of username:password
679    def _create_headers(self):
680        authorization = base64.encodestring('%s:%s' % (self.username, self.password))
681        return {'Content-Type': 'application/x-www-form-urlencoded',
682                'Authorization': 'Basic %s' % authorization,
683                'Accept': '*/*'}
684
685    def _http_request(self, api, method='GET', params='', headers=None, timeout=120):
686        if not headers:
687            headers = self._create_headers()
688        end_time = time.time() + timeout
689        while True:
690            try:
691                response, content = httplib2.Http(timeout=timeout).request(api, method, params, headers)
692                if response['status'] in ['200', '201', '202']:
693                    return True, content, response
694                else:
695                    try:
696                        json_parsed = json.loads(content)
697                    except ValueError as e:
698                        json_parsed = {}
699                        json_parsed["error"] = "status: {0}, content: {1}".format(response['status'], content)
700                    reason = "unknown"
701                    if "error" in json_parsed:
702                        reason = json_parsed["error"]
703                    log.error('{0} error {1} reason: {2} {3}'.format(api, response['status'], reason, content.rstrip('\n')))
704                    return False, content, response
705            except socket.error as e:
706                log.error("socket error while connecting to {0} error {1} ".format(api, e))
707                if time.time() > end_time:
708                    raise ServerUnavailableException(ip=self.ip)
709            except httplib2.ServerNotFoundError as e:
710                log.error("ServerNotFoundError error while connecting to {0} error {1} ".format(api, e))
711                if time.time() > end_time:
712                    raise ServerUnavailableException(ip=self.ip)
713            time.sleep(1)
714
715    def init_cluster(self, username='Administrator', password='password', port='8091'):
716        api = self.baseUrl + 'settings/web'
717        params = urllib.urlencode({'port': port,
718                                   'username': username,
719                                   'password': password})
720        log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, params))
721        status, content, header = self._http_request(api, 'POST', params)
722        return status
723
724    def get_cluster_settings(self):
725        settings = {}
726        api = self.baseUrl + 'settings/web'
727        status, content, header = self._http_request(api, 'GET')
728        if status:
729            settings = json.loads(content)
730        log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, settings))
731        return settings
732
733    def init_cluster_memoryQuota(self, username='Administrator',
734                                 password='password',
735                                 memoryQuota=256):
736        api = self.baseUrl + 'pools/default'
737        params = urllib.urlencode({'memoryQuota': memoryQuota,
738                                   'username': username,
739                                   'password': password})
740        log.info('pools/default params : {0}'.format(params))
741        status, content, header = self._http_request(api, 'POST', params)
742        return status
743
744    def get_cluster_ceritificate(self):
745        api = self.baseUrl + 'pools/default/certificate'
746        status, content, _ = self._http_request(api, 'GET')
747        if status:
748            return content
749        else:
750            log.error("/poos/default/certificate status:{0},content:{1}".format(status, content))
751            raise Exception("certificate API failed")
752
753    def regenerate_cluster_certificate(self):
754        api = self.baseUrl + 'controller/regenerateCertificate'
755        status, content, _ = self._http_request(api, 'POST')
756        if status:
757            return content
758        else:
759            log.error("controller/regenerateCertificate status:{0},content:{1}".format(status, content))
760            raise Exception("regenerateCertificate API failed")
761
762    def __remote_clusters(self, api, op, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''):
763        param_map = {'hostname': "{0}:{1}".format(remoteIp, remotePort),
764                        'username': username,
765                        'password': password,
766                        'name':name}
767        if demandEncryption:
768            param_map ['demandEncryption'] = 'on'
769            param_map['certificate'] = certificate
770        params = urllib.urlencode(param_map)
771        status, content, _ = self._http_request(api, 'POST', params)
772        #sample response :
773        # [{"name":"two","uri":"/pools/default/remoteClusters/two","validateURI":"/pools/default/remoteClusters/two?just_validate=1","hostname":"127.0.0.1:9002","username":"Administrator"}]
774        if status:
775            remoteCluster = json.loads(content)
776        else:
777            log.error("/remoteCluster failed : status:{0},content:{1}".format(status, content))
778            raise Exception("remoteCluster API '{0} remote cluster' failed".format(op))
779        return remoteCluster
780
781    def add_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''):
782        #example : password:password username:Administrator hostname:127.0.0.1:9002 name:two
783        msg = "adding remote cluster hostname:{0}:{1} with username:password {2}:{3} name:{4} to source node: {5}:{6}"
784        log.info(msg.format(remoteIp, remotePort, username, password, name, self.ip, self.port))
785        api = self.baseUrl + 'pools/default/remoteClusters'
786        self.__remote_clusters(api, 'add', remoteIp, remotePort, username, password, name, demandEncryption, certificate)
787
788    def modify_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''):
789        log.info("modifying remote cluster name:{0}".format(name))
790        api = self.baseUrl + 'pools/default/remoteClusters/' + urllib.quote(name)
791        self.__remote_clusters(api, 'modify', remoteIp, remotePort, username, password, name, demandEncryption, certificate)
792
793    def get_remote_clusters(self):
794        remote_clusters = []
795        api = self.baseUrl + 'pools/default/remoteClusters/'
796        params = urllib.urlencode({})
797        status, content, header = self._http_request(api, 'GET', params)
798        if status:
799            remote_clusters = json.loads(content)
800        return remote_clusters
801
802    def remove_all_remote_clusters(self):
803        remote_clusters = self.get_remote_clusters()
804        for remote_cluster in remote_clusters:
805            if remote_cluster["deleted"] == False:
806                self.remove_remote_cluster(remote_cluster["name"])
807
808    def remove_remote_cluster(self, name):
809        #example : name:two
810        msg = "removing remote cluster name:{0}".format(urllib.quote(name))
811        log.info(msg)
812        api = self.baseUrl + 'pools/default/remoteClusters/{0}'.format(urllib.quote(name))
813        params = urllib.urlencode({})
814        status, content, header = self._http_request(api, 'DELETE', params)
815        #sample response :
816        # [{"name":"two","uri":"/pools/default/remoteClusters/two","validateURI":"/pools/default/remoteClusters/two?just_validate=1","hostname":"127.0.0.1:9002","username":"Administrator"}]
817        if status:
818            json_parsed = json.loads(content)
819        else:
820            log.error("failed to remove remote cluster: status:{0},content:{1}".format(status, content))
821            raise Exception("remoteCluster API 'remove cluster' failed")
822        return json_parsed
823
824
825    #replicationType:continuous toBucket:default toCluster:two fromBucket:default
826    def start_replication(self, replicationType, fromBucket, toCluster, rep_type="xmem", toBucket=None):
827        toBucket = toBucket or fromBucket
828
829        msg = "starting {0} replication type:{1} from {2} to {3} in the remote cluster {4}"
830        log.info(msg.format(replicationType, rep_type, fromBucket, toBucket, toCluster))
831        api = self.baseUrl + 'controller/createReplication'
832        param_map = {'replicationType': replicationType,
833                     'toBucket': toBucket,
834                     'fromBucket': fromBucket,
835                     'toCluster': toCluster,
836                     'type': rep_type}
837        params = urllib.urlencode(param_map)
838        status, content, _ = self._http_request(api, 'POST', params)
839        #response : {"database":"http://127.0.0.1:9500/_replicator",
840        # "id": "replication_id"}
841        if status:
842            json_parsed = json.loads(content)
843            return (json_parsed['database'], json_parsed['id'])
844        else:
845            log.error("/controller/createReplication failed : status:{0},content:{1}".format(status, content))
846            raise Exception("create replication failed : status:{0},content:{1}".format(status, content))
847
848    def get_replications(self):
849        replications = []
850        content = self.ns_server_tasks()
851        for item in content:
852            if item["type"] == "xdcr":
853                replications.append(item)
854        return replications
855
856    def remove_all_replications(self):
857        replications = self.get_replications()
858        for replication in replications:
859            self.stop_replication(replication["cancelURI"])
860
861    def stop_replication(self, uri):
862        api = self.baseUrl + uri
863        self._http_request(api, 'DELETE')
864
865    def remove_all_recoveries(self):
866        recoveries = []
867        content = self.ns_server_tasks()
868        for item in content:
869            if item["type"] == "recovery":
870                recoveries.append(item)
871        for recovery in recoveries:
872            api = self.baseUrl + recovery["stopURI"]
873            status, content, header = self._http_request(api, 'POST')
874            if not status:
875                raise CBRecoveryFailedException("impossible to stop cbrecovery by {0}".format(api))
876            log.info("recovery stopped by {0}".format(api))
877
878
879    #params serverIp : the server to add to this cluster
880    #raises exceptions when
881    #unauthorized user
882    #server unreachable
883    #can't add the node to itself ( TODO )
884    #server already added
885    #returns otpNode
886    def add_node(self, user='', password='', remoteIp='', port='8091', zone_name='', services = None):
887        otpNode = None
888        log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\
889                          .format(remoteIp, port, self.ip, self.port))
890        if zone_name == '':
891            api = self.baseUrl + 'controller/addNode'
892        else:
893            api = self.baseUrl + 'pools/default/serverGroups'
894            if self.is_zone_exist(zone_name):
895                zones = self.get_zone_names()
896                api = "/".join((api, zones[zone_name], "addNode"))
897                log.info("node {0} will be added to zone {1}".format(remoteIp, zone_name))
898            else:
899                raise Exception("There is not zone with name: %s in cluster" % zone_name)
900
901        params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
902                                   'user': user,
903                                   'password': password})
904        if services != None:
905            services = ','.join(services)
906            params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
907                                   'user': user,
908                                   'password': password,
909                                   'services': services})
910        status, content, header = self._http_request(api, 'POST', params)
911        if status:
912            json_parsed = json.loads(content)
913            otpNodeId = json_parsed['otpNode']
914            otpNode = OtpNode(otpNodeId)
915            if otpNode.ip == '127.0.0.1':
916                otpNode.ip = self.ip
917        else:
918            self.print_UI_logs()
919            try:
920                #print logs from node that we want to add
921                wanted_node = deepcopy(self)
922                wanted_node.ip = remoteIp
923                wanted_node.print_UI_logs()
924            except Exception, ex:
925                self.log(ex)
926            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
927                raise ServerAlreadyJoinedException(nodeIp=self.ip,
928                                                   remoteIp=remoteIp)
929            elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0:
930                raise ServerSelfJoinException(nodeIp=self.ip,
931                                          remoteIp=remoteIp)
932            else:
933                log.error('add_node error : {0}'.format(content))
934                raise AddNodeException(nodeIp=self.ip,
935                                          remoteIp=remoteIp,
936                                          reason=content)
937        return otpNode
938
939        #params serverIp : the server to add to this cluster
940    #raises exceptions when
941    #unauthorized user
942    #server unreachable
943    #can't add the node to itself ( TODO )
944    #server already added
945    #returns otpNode
946    def do_join_cluster(self, user='', password='', remoteIp='', port='8091', zone_name='', services = None):
947        otpNode = None
948        log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\
949                          .format(remoteIp, port, self.ip, self.port))
950        api = self.baseUrl + '/node/controller/doJoinCluster'
951        params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
952                                   'user': user,
953                                   'password': password})
954        if services != None:
955            services = ','.join(services)
956            params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
957                                   'user': user,
958                                   'password': password,
959                                   'services': services})
960        status, content, header = self._http_request(api, 'POST', params)
961        if status:
962            json_parsed = json.loads(content)
963            otpNodeId = json_parsed['otpNode']
964            otpNode = OtpNode(otpNodeId)
965            if otpNode.ip == '127.0.0.1':
966                otpNode.ip = self.ip
967        else:
968            self.print_UI_logs()
969            try:
970                #print logs from node that we want to add
971                wanted_node = deepcopy(self)
972                wanted_node.ip = remoteIp
973                wanted_node.print_UI_logs()
974            except Exception, ex:
975                self.log(ex)
976            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
977                raise ServerAlreadyJoinedException(nodeIp=self.ip,
978                                                   remoteIp=remoteIp)
979            elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0:
980                raise ServerSelfJoinException(nodeIp=self.ip,
981                                          remoteIp=remoteIp)
982            else:
983                log.error('add_node error : {0}'.format(content))
984                raise AddNodeException(nodeIp=self.ip,
985                                          remoteIp=remoteIp,
986                                          reason=content)
987        return otpNode
988
989
990    def eject_node(self, user='', password='', otpNode=None):
991        if not otpNode:
992            log.error('otpNode parameter required')
993            return False
994        api = self.baseUrl + 'controller/ejectNode'
995        params = urllib.urlencode({'otpNode': otpNode,
996                                   'user': user,
997                                   'password': password})
998        status, content, header = self._http_request(api, 'POST', params)
999        if status:
1000            log.info('ejectNode successful')
1001        else:
1002            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
1003                raise ServerAlreadyJoinedException(nodeIp=self.ip,
1004                                                   remoteIp=otpNode)
1005            else:
1006                # TODO : raise an exception here
1007                log.error('eject_node error {0}'.format(content))
1008        return True
1009
1010    def force_eject_node(self):
1011        self.diag_eval("gen_server:cast(ns_cluster, leave).")
1012        self.check_delay_restart_coucbase_server()
1013
1014    """ when we do reset couchbase server by force reject, couchbase server will not
1015        down right away but delay few seconds to be down depend on server spec.
1016        This fx will detect that delay and return true when couchbase server down and
1017        up again after force reject """
1018    def check_delay_restart_coucbase_server(self):
1019        api = self.baseUrl + 'nodes/self'
1020        headers = self._create_headers()
1021        break_out = 0
1022        count_cbserver_up = 0
1023        while break_out < 60 and count_cbserver_up < 2:
1024            try:
1025                response, content = httplib2.Http(timeout=120).request(api, 'GET', '', headers)
1026                if response['status'] in ['200', '201', '202'] and count_cbserver_up == 0:
1027                    log.info("couchbase server is up but down soon.")
1028                    time.sleep(1)
1029                    break_out += 1  # time needed for couchbase server reload after reset config
1030                elif response['status'] in ['200', '201', '202']:
1031                    count_cbserver_up = 2
1032                    log.info("couchbase server is up again")
1033            except socket.error as e:
1034                log.info("couchbase server is down.  Waiting for couchbase server up")
1035                time.sleep(2)
1036                break_out += 1
1037                count_cbserver_up = 1
1038                pass
1039        if break_out >= 60:
1040            raise Exception("Couchbase server did not start after 120 seconds")
1041
1042    def fail_over(self, otpNode=None, graceful=False):
1043        if otpNode is None:
1044            log.error('otpNode parameter required')
1045            return False
1046        api = self.baseUrl + 'controller/failOver'
1047        if graceful:
1048            api = self.baseUrl + 'controller/startGracefulFailover'
1049        params = urllib.urlencode({'otpNode': otpNode})
1050        status, content, header = self._http_request(api, 'POST', params)
1051        if status:
1052            log.info('fail_over node {0} successful'.format(otpNode))
1053        else:
1054            log.error('fail_over node {0} error : {1}'.format(otpNode, content))
1055            raise FailoverFailedException(content)
1056        return status
1057
1058    def set_recovery_type(self, otpNode=None, recoveryType=None):
1059        log.info("Going to set recoveryType={0} for node :: {1}".format(recoveryType, otpNode))
1060        if otpNode == None:
1061            log.error('otpNode parameter required')
1062            return False
1063        if recoveryType == None:
1064            log.error('recoveryType is not set')
1065            return False
1066        api = self.baseUrl + 'controller/setRecoveryType'
1067        params = urllib.urlencode({'otpNode': otpNode,
1068                                   'recoveryType': recoveryType})
1069        status, content, header = self._http_request(api, 'POST', params)
1070        if status:
1071            log.info('recoveryType for node {0} set successful'.format(otpNode))
1072        else:
1073            log.error('recoveryType node {0} not set with error : {1}'.format(otpNode, content))
1074            raise SetRecoveryTypeFailed(content)
1075        return status
1076
1077    def add_back_node(self, otpNode=None):
1078        if otpNode is None:
1079            log.error('otpNode parameter required')
1080            return False
1081        api = self.baseUrl + 'controller/reAddNode'
1082        params = urllib.urlencode({'otpNode': otpNode})
1083        status, content, header = self._http_request(api, 'POST', params)
1084        if status:
1085            log.info('add_back_node {0} successful'.format(otpNode))
1086        else:
1087            log.error('add_back_node {0} error : {1}'.format(otpNode, content))
1088            raise InvalidArgumentException('controller/reAddNode',
1089                                           parameters=params)
1090        return status
1091
1092    def rebalance(self, otpNodes=[], ejectedNodes=[], deltaRecoveryBuckets=None):
1093        knownNodes = ','.join(otpNodes)
1094        ejectedNodesString = ','.join(ejectedNodes)
1095        if deltaRecoveryBuckets == None:
1096            params = urllib.urlencode({'knownNodes': knownNodes,
1097                                    'ejectedNodes': ejectedNodesString,
1098                                    'user': self.username,
1099                                    'password': self.password})
1100        else:
1101            deltaRecoveryBuckets = ",".join(deltaRecoveryBuckets)
1102            params = urllib.urlencode({'knownNodes': knownNodes,
1103                                    'ejectedNodes': ejectedNodesString,
1104                                    'deltaRecoveryBuckets': deltaRecoveryBuckets,
1105                                    'user': self.username,
1106                                    'password': self.password})
1107        log.info('rebalance params : {0}'.format(params))
1108        api = self.baseUrl + "controller/rebalance"
1109        status, content, header = self._http_request(api, 'POST', params)
1110        if status:
1111            log.info('rebalance operation started')
1112        else:
1113            log.error('rebalance operation failed: {0}'.format(content))
1114            #extract the error
1115            raise InvalidArgumentException('controller/rebalance with error message {0}'.format(content),
1116                                           parameters=params)
1117        return status
1118
1119    def diag_eval(self, code):
1120        api = '{0}{1}'.format(self.baseUrl, 'diag/eval/')
1121        status, content, header = self._http_request(api, "POST", code)
1122        log.info("/diag/eval status on {0}:{1}: {2} content: {3} command: {4}".
1123                 format(self.ip, self.port, status, content, code))
1124        return status, content
1125
1126    def set_chk_max_items(self, max_items):
1127        status, content = self.diag_eval("ns_config:set(chk_max_items, " + str(max_items) + ")")
1128        return status, content
1129
1130    def set_chk_period(self, period):
1131        status, content = self.diag_eval("ns_config:set(chk_period, " + str(period) + ")")
1132        return status, content
1133
1134    def set_enable_flow_control(self, flow=True, bucket='default'):
1135        flow_control = "false"
1136        if flow:
1137           flow_control = "true"
1138        code = "ns_bucket:update_bucket_props(\"" + bucket + "\", [{extra_config_string, \"upr_enable_flow_control=" + flow_control + "\"}])"
1139        status, content = self.diag_eval(code)
1140        return status, content
1141
1142    def diag_master_events(self):
1143        api = '{0}{1}'.format(self.baseUrl, 'diag/masterEvents?o=1')
1144        status, content, header = self._http_request(api, "GET")
1145        log.info("diag/masterEvents?o=1 status: {0} content: {1}".format(status, content))
1146        return status, content
1147
1148    def monitorRebalance(self, stop_if_loop=True):
1149        start = time.time()
1150        progress = 0
1151        retry = 0
1152        same_progress_count = 0
1153        previous_progress = 0
1154        while progress != -1 and (progress != 100 or self._rebalance_progress_status() == 'running') and retry < 20:
1155            #-1 is error , -100 means could not retrieve progress
1156            progress = self._rebalance_progress()
1157            if progress == -100:
1158                log.error("unable to retrieve rebalanceProgress.try again in 1 second")
1159                retry += 1
1160            else:
1161                retry = 0
1162            if stop_if_loop:
1163                #reset same_progress_count if get a different result, or progress is still O
1164                #(it may take a long time until the results are different from 0)
1165                if previous_progress != progress or progress == 0:
1166                    previous_progress = progress
1167                    same_progress_count = 0
1168                else:
1169                    same_progress_count += 1
1170                if same_progress_count > 50:
1171                    log.error("apparently rebalance progress code in infinite loop: {0}".format(progress))
1172                    return False
1173            #sleep for 5 seconds
1174            time.sleep(5)
1175        if progress < 0:
1176            log.error("rebalance progress code : {0}".format(progress))
1177            return False
1178        else:
1179            duration = time.time() - start
1180            if duration > 10:
1181                sleep = 10
1182            else:
1183                sleep = duration
1184            log.info('rebalance progress took {0} seconds '.format(duration))
1185            log.info("sleep for {0} seconds after rebalance...".format(sleep))
1186            time.sleep(sleep)
1187            return True
1188
1189    def _rebalance_progress_status(self):
1190        api = self.baseUrl + "pools/default/rebalanceProgress"
1191        status, content, header = self._http_request(api)
1192        json_parsed = json.loads(content)
1193        if status:
1194            if "status" in json_parsed:
1195                return json_parsed['status']
1196        else:
1197            return None
1198
1199    def _rebalance_progress(self):
1200        avg_percentage = -1
1201        api = self.baseUrl + "pools/default/rebalanceProgress"
1202        try:
1203            status, content, header = self._http_request(api)
1204        except ServerUnavailableException as e:
1205            log.error(e)
1206            return -100
1207        json_parsed = json.loads(content)
1208        if status:
1209            if "status" in json_parsed:
1210                if "errorMessage" in json_parsed:
1211                    msg = '{0} - rebalance failed'.format(json_parsed)
1212                    log.error(msg)
1213                    self.print_UI_logs()
1214                    raise RebalanceFailedException(msg)
1215                elif json_parsed["status"] == "running":
1216                    total_percentage = 0
1217                    count = 0
1218                    for key in json_parsed:
1219                        if key.find('@') >= 0:
1220                            ns_1_dictionary = json_parsed[key]
1221                            percentage = ns_1_dictionary['progress'] * 100
1222                            count += 1
1223                            total_percentage += percentage
1224                    if count:
1225                        avg_percentage = (total_percentage / count)
1226                    else:
1227                        avg_percentage = 0
1228                    log.info('rebalance percentage : {0:.02f} %'.
1229                             format(round(avg_percentage, 2)))
1230                else:
1231                    avg_percentage = 100
1232        else:
1233            avg_percentage = -100
1234        return avg_percentage
1235
1236
1237    def log_client_error(self, post):
1238        api = self.baseUrl + 'logClientError'
1239        status, content, header = self._http_request(api, 'POST', post)
1240        if not status:
1241            log.error('unable to logClientError')
1242
1243    #returns node data for this host
1244    def get_nodes_self(self, timeout=120):
1245        node = None
1246        api = self.baseUrl + 'nodes/self'
1247        status, content, header = self._http_request(api, timeout=timeout)
1248        if status:
1249            json_parsed = json.loads(content)
1250            node = RestParser().parse_get_nodes_response(json_parsed)
1251        return node
1252
1253    def node_statuses(self, timeout=120):
1254        nodes = []
1255        api = self.baseUrl + 'nodeStatuses'
1256        status, content, header = self._http_request(api, timeout=timeout)
1257        json_parsed = json.loads(content)
1258        if status:
1259            for key in json_parsed:
1260                #each key contain node info
1261                value = json_parsed[key]
1262                #get otp,get status
1263                node = OtpNode(id=value['otpNode'],
1264                               status=value['status'])
1265                if node.ip == '127.0.0.1':
1266                    node.ip = self.ip
1267                node.port = int(key[key.rfind(":") + 1:])
1268                node.replication = value['replication']
1269                if 'gracefulFailoverPossible' in value.keys():
1270                    node.gracefulFailoverPossible = value['gracefulFailoverPossible']
1271                else:
1272                    node.gracefulFailoverPossible = False
1273                nodes.append(node)
1274        return nodes
1275
1276    def cluster_status(self):
1277        parsed = {}
1278        api = self.baseUrl + 'pools/default'
1279        status, content, header = self._http_request(api)
1280        if status:
1281            parsed = json.loads(content)
1282        return parsed
1283
1284    def fetch_vbucket_map(self, bucket="default"):
1285        """Return vbucket map for bucket
1286        Keyword argument:
1287        bucket -- bucket name
1288        """
1289        api = self.baseUrl + 'pools/default/buckets/' + bucket
1290        status, content, header = self._http_request(api)
1291        _stats = json.loads(content)
1292        return _stats['vBucketServerMap']['vBucketMap']
1293
1294    def get_vbucket_map_and_server_list(self, bucket="default"):
1295        """ Return server list, replica and vbuckets map
1296        that matches to server list """
1297        vbucket_map = self.fetch_vbucket_map(bucket)
1298        api = self.baseUrl + 'pools/default/buckets/' + bucket
1299        status, content, header = self._http_request(api)
1300        _stats = json.loads(content)
1301        num_replica = _stats['vBucketServerMap']['numReplicas']
1302        vbucket_map = _stats['vBucketServerMap']['vBucketMap']
1303        servers = _stats['vBucketServerMap']['serverList']
1304        server_list = []
1305        for node in servers:
1306            node = node.split(":")
1307            server_list.append(node[0])
1308        return vbucket_map, server_list, num_replica
1309
1310    def get_pools_info(self):
1311        parsed = {}
1312        api = self.baseUrl + 'pools'
1313        status, content, header = self._http_request(api)
1314        json_parsed = json.loads(content)
1315        if status:
1316            parsed = json_parsed
1317        return parsed
1318
1319    def get_pools(self):
1320        version = None
1321        api = self.baseUrl + 'pools'
1322        status, content, header = self._http_request(api)
1323        json_parsed = json.loads(content)
1324        if status:
1325            version = MembaseServerVersion(json_parsed['implementationVersion'], json_parsed['componentsVersion'])
1326        return version
1327
1328    def get_buckets(self):
1329        #get all the buckets
1330        buckets = []
1331        api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets?basic_stats=true')
1332        status, content, header = self._http_request(api)
1333        json_parsed = json.loads(content)
1334        if status:
1335            for item in json_parsed:
1336                bucketInfo = RestParser().parse_get_bucket_json(item)
1337                buckets.append(bucketInfo)
1338        return buckets
1339
1340    def get_bucket_stats_for_node(self, bucket='default', node=None):
1341        if not node:
1342            log.error('node_ip not specified')
1343            return None
1344        stats = {}
1345        api = "{0}{1}{2}{3}{4}:{5}{6}".format(self.baseUrl, 'pools/default/buckets/',
1346                                     bucket, "/nodes/", node.ip, node.port, "/stats")
1347        status, content, header = self._http_request(api)
1348        if status:
1349            json_parsed = json.loads(content)
1350            op = json_parsed["op"]
1351            samples = op["samples"]
1352            for stat_name in samples:
1353                if len(samples[stat_name]) == 0:
1354                    stats[stat_name] = []
1355                else:
1356                    stats[stat_name] = samples[stat_name][-1]
1357        return stats
1358
1359    def fetch_bucket_stats(self, bucket='default', zoom='minute'):
1360        """Return deserialized buckets stats.
1361        Keyword argument:
1362        bucket -- bucket name
1363        zoom -- stats zoom level (minute | hour | day | week | month | year)
1364        """
1365        api = self.baseUrl + 'pools/default/buckets/{0}/stats?zoom={1}'.format(bucket, zoom)
1366        status, content, header = self._http_request(api)
1367        return json.loads(content)
1368
1369    def fetch_system_stats(self):
1370        """Return deserialized system stats."""
1371        api = self.baseUrl + 'pools/default/'
1372        status, content, header = self._http_request(api)
1373        return json.loads(content)
1374
1375    def get_xdc_queue_size(self, bucket):
1376        """Fetch bucket stats and return the latest value of XDC replication
1377        queue size"""
1378        bucket_stats = self.fetch_bucket_stats(bucket)
1379        return bucket_stats['op']['samples']['replication_changes_left'][-1]
1380
1381    def get_nodes(self):
1382        nodes = []
1383        api = self.baseUrl + 'pools/default'
1384        status, content, header = self._http_request(api)
1385        count = 0
1386        while not content and count < 7:
1387            log.info("sleep 5 seconds and retry")
1388            time.sleep(5)
1389            status, content, header = self._http_request(api)
1390            count += 1
1391        if count == 7:
1392            raise Exception("could not get node info after 30 seconds")
1393        json_parsed = json.loads(content)
1394        if status:
1395            if "nodes" in json_parsed:
1396                for json_node in json_parsed["nodes"]:
1397                    node = RestParser().parse_get_nodes_response(json_node)
1398                    node.rest_username = self.username
1399                    node.rest_password = self.password
1400                    if node.ip == "127.0.0.1":
1401                        node.ip = self.ip
1402                    # Only add nodes which are active on cluster
1403                    if node.clusterMembership == 'active':
1404                        nodes.append(node)
1405                    else:
1406                        log.info("Node {0} not part of cluster {1}".format(node.ip, node.clusterMembership))
1407        return nodes
1408
1409    # this method returns the number of node in cluster
1410    def get_cluster_size(self):
1411        nodes = self.get_nodes()
1412        node_ip = []
1413        for node in nodes:
1414            node_ip.append(node.ip)
1415        log.info("Number of node(s) in cluster is {0} node(s)".format(len(node_ip)))
1416        return len(node_ip)
1417
1418    # this method returns the versions of nodes in cluster
1419    def get_nodes_versions(self):
1420        nodes = self.get_nodes()
1421        versions = []
1422        for node in nodes:
1423            versions.append(node.version)
1424        log.info("Node versions in cluster {0}".format(versions))
1425        return versions
1426
1427
1428    def get_bucket_stats(self, bucket='default'):
1429        stats = {}
1430        status, json_parsed = self.get_bucket_stats_json(bucket)
1431        if status:
1432            op = json_parsed["op"]
1433            samples = op["samples"]
1434            for stat_name in samples:
1435                if samples[stat_name]:
1436                    last_sample = len(samples[stat_name]) - 1
1437                    if last_sample:
1438                        stats[stat_name] = samples[stat_name][last_sample]
1439        return stats
1440
1441    def get_bucket_stats_json(self, bucket='default'):
1442        stats = {}
1443        api = "{0}{1}{2}{3}".format(self.baseUrl, 'pools/default/buckets/', bucket, "/stats")
1444        if isinstance(bucket, Bucket):
1445            api = '{0}{1}{2}{3}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name, "/stats")
1446        status, content, header = self._http_request(api)
1447        json_parsed = json.loads(content)
1448        return status, json_parsed
1449
1450    def get_bucket_json(self, bucket='default'):
1451        api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket)
1452        if isinstance(bucket, Bucket):
1453            api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name)
1454        status, content, header = self._http_request(api)
1455        if not status:
1456            raise GetBucketInfoFailed(bucket, content)
1457        return json.loads(content)
1458
1459    def get_bucket(self, bucket='default', num_attempt=1, timeout=1):
1460        bucketInfo = None
1461        api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket)
1462        if isinstance(bucket, Bucket):
1463            api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket.name)
1464        status, content, header = self._http_request(api)
1465        num = 1
1466        while not status and num_attempt > num:
1467            log.error("try to get {0} again after {1} sec".format(api, timeout))
1468            time.sleep(timeout)
1469            status, content, header = self._http_request(api)
1470            num += 1
1471        if status:
1472            bucketInfo = RestParser().parse_get_bucket_response(content)
1473        return bucketInfo
1474
1475    def get_vbuckets(self, bucket='default'):
1476        b = self.get_bucket(bucket)
1477        return None if not b else b.vbuckets
1478
1479    def delete_bucket(self, bucket='default'):
1480        api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket)
1481        if isinstance(bucket, Bucket):
1482            api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket.name)
1483        status, content, header = self._http_request(api, 'DELETE')
1484
1485        if int(header['status']) == 500:
1486            # According to http://docs.couchbase.com/couchbase-manual-2.5/cb-rest-api/#deleting-buckets
1487            # the cluster will return with 500 if it failed to nuke
1488            # the bucket on all of the nodes within 30 secs
1489            log.warn("Bucket deletion timed out waiting for all nodes")
1490
1491        return status
1492
1493    # figure out the proxy port
1494    def create_bucket(self, bucket='',
1495                      ramQuotaMB=1,
1496                      authType='none',
1497                      saslPassword='',
1498                      replicaNumber=1,
1499                      proxyPort=11211,
1500                      bucketType='membase',
1501                      replica_index=1,
1502                      threadsNumber=3,
1503                      flushEnabled=1,
1504                      evictionPolicy='valueOnly'):
1505        api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets')
1506        params = urllib.urlencode({})
1507
1508        #this only works for default bucket ?
1509        if bucket == 'default':
1510            params = urllib.urlencode({'name': bucket,
1511                                       'authType': 'sasl',
1512                                       'saslPassword': saslPassword,
1513                                       'ramQuotaMB': ramQuotaMB,
1514                                       'replicaNumber': replicaNumber,
1515                                       'proxyPort': proxyPort,
1516                                       'bucketType': bucketType,
1517                                       'replicaIndex': replica_index,
1518                                       'threadsNumber': threadsNumber,
1519                                       'flushEnabled': flushEnabled,
1520                                        'evictionPolicy': evictionPolicy})
1521        elif authType == 'none':
1522            params = urllib.urlencode({'name': bucket,
1523                                       'ramQuotaMB': ramQuotaMB,
1524                                       'authType': authType,
1525                                       'replicaNumber': replicaNumber,
1526                                       'proxyPort': proxyPort,
1527                                       'bucketType': bucketType,
1528                                       'replicaIndex': replica_index,
1529                                       'threadsNumber': threadsNumber,
1530                                       'flushEnabled': flushEnabled,
1531                                       'evictionPolicy': evictionPolicy})
1532        elif authType == 'sasl':
1533            params = urllib.urlencode({'name': bucket,
1534                                       'ramQuotaMB': ramQuotaMB,
1535                                       'authType': authType,
1536                                       'saslPassword': saslPassword,
1537                                       'replicaNumber': replicaNumber,
1538                                       'proxyPort': self.get_nodes_self().moxi,
1539                                       'bucketType': bucketType,
1540                                       'replicaIndex': replica_index,
1541                                       'threadsNumber': threadsNumber,
1542                                       'flushEnabled': flushEnabled,
1543                                       'evictionPolicy': evictionPolicy})
1544        log.info("{0} with param: {1}".format(api, params))
1545        create_start_time = time.time()
1546
1547        maxwait = 60
1548        for numsleep in range(maxwait):
1549            status, content, header = self._http_request(api, 'POST', params)
1550            if status:
1551                break
1552            elif (int(header['status']) == 503 and
1553                    '{"_":"Bucket with given name still exists"}' in content):
1554                log.info("The bucket still exists, sleep 1 sec and retry")
1555                time.sleep(1)
1556            else:
1557                raise BucketCreationException(ip=self.ip, bucket_name=bucket)
1558
1559        if (numsleep + 1) == maxwait:
1560            log.error("Tried to create the bucket for {0} secs.. giving up".
1561                      format(maxwait))
1562            raise BucketCreationException(ip=self.ip, bucket_name=bucket)
1563
1564        create_time = time.time() - create_start_time
1565        log.info("{0:.02f} seconds to create bucket {1}".
1566                 format(round(create_time, 2), bucket))
1567        return status
1568
1569    def change_bucket_props(self, bucket,
1570                      ramQuotaMB=None,
1571                      authType=None,
1572                      saslPassword=None,
1573                      replicaNumber=None,
1574                      proxyPort=None,
1575                      replicaIndex=None,
1576                      flushEnabled=None):
1577        api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket)
1578        if isinstance(bucket, Bucket):
1579            api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name)
1580        params = urllib.urlencode({})
1581        params_dict = {}
1582        existing_bucket = self.get_bucket_json(bucket)
1583        if ramQuotaMB:
1584            params_dict["ramQuotaMB"] = ramQuotaMB
1585        if authType:
1586            params_dict["authType"] = authType
1587        if saslPassword:
1588            params_dict["authType"] = "sasl"
1589            params_dict["saslPassword"] = saslPassword
1590        if replicaNumber:
1591            params_dict["replicaNumber"] = replicaNumber
1592        if proxyPort:
1593            params_dict["proxyPort"] = proxyPort
1594        if replicaIndex:
1595            params_dict["replicaIndex"] = replicaIndex
1596        if flushEnabled:
1597            params_dict["flushEnabled"] = flushEnabled
1598        params = urllib.urlencode(params_dict)
1599
1600        log.info("%s with param: %s" % (api, params))
1601        status, content, header = self._http_request(api, 'POST', params)
1602        if not status:
1603            raise Exception("Unable to set bucket settings %s for bucket" % (params, bucket))
1604        log.info("bucket %s updated" % bucket)
1605        return status
1606
1607    #return AutoFailoverSettings
1608    def get_autofailover_settings(self):
1609        settings = None
1610        api = self.baseUrl + 'settings/autoFailover'
1611        status, content, header = self._http_request(api)
1612        json_parsed = json.loads(content)
1613        if status:
1614            settings = AutoFailoverSettings()
1615            settings.enabled = json_parsed["enabled"]
1616            settings.count = json_parsed["count"]
1617            settings.timeout = json_parsed["timeout"]
1618        return settings
1619
1620    def update_autofailover_settings(self, enabled, timeout):
1621        if enabled:
1622            params = urllib.urlencode({'enabled': 'true',
1623                                       'timeout': timeout})
1624        else:
1625            params = urllib.urlencode({'enabled': 'false',
1626                                       'timeout': timeout})
1627        api = self.baseUrl + 'settings/autoFailover'
1628        log.info('settings/autoFailover params : {0}'.format(params))
1629        status, content, header = self._http_request(api, 'POST', params)
1630        if not status:
1631            log.error('''failed to change autofailover_settings!
1632                         See MB-7282. Workaround:
1633                         wget --user=Administrator --password=asdasd --post-data='rpc:call(mb_master:master_node(), erlang, apply ,[fun () -> erlang:exit(erlang:whereis(mb_master), kill) end, []]).' http://localhost:8091/diag/eval''')
1634        return status
1635
1636    def reset_autofailover(self):
1637        api = self.baseUrl + 'settings/autoFailover/resetCount'
1638        status, content, header = self._http_request(api, 'POST', '')
1639        return status
1640
1641    def set_alerts_settings(self, recipients, sender, email_username, email_password, email_host='localhost', email_port=25, email_encrypt='false', alerts='auto_failover_node,auto_failover_maximum_reached'):
1642        api = self.baseUrl + 'settings/alerts'
1643        params = urllib.urlencode({'enabled': 'true',
1644                                   'recipients': recipients,
1645                                   'sender': sender,
1646                                   'emailUser': email_username,
1647                                   'emailPass': email_password,
1648                                   'emailHost': email_host,
1649                                   'emailPrt': email_port,
1650                                   'emailEncrypt': email_encrypt,
1651                                   'alerts': alerts})
1652        log.info('settings/alerts params : {0}'.format(params))
1653        status, content, header = self._http_request(api, 'POST', params)
1654        return status
1655
1656    def get_alerts_settings(self):
1657        api = self.baseUrl + 'settings/alerts'
1658        status, content, header = self._http_request(api)
1659        json_parsed = json.loads(content)
1660        if not status:
1661            raise Exception("unable to get autofailover alerts settings")
1662        return json_parsed
1663
1664    def disable_alerts(self):
1665        api = self.baseUrl + 'settings/alerts'
1666        params = urllib.urlencode({'enabled': 'false'})
1667        log.info('settings/alerts params : {0}'.format(params))
1668        status, content, header = self._http_request(api, 'POST', params)
1669        return status
1670
1671    def stop_rebalance(self, wait_timeout=10):
1672        api = self.baseUrl + '/controller/stopRebalance'
1673        status, content, header = self._http_request(api, 'POST')
1674        if status:
1675            for i in xrange(wait_timeout):
1676                if self._rebalance_progress_status() == 'running':
1677                    log.warn("rebalance is not stopped yet after {0} sec".format(i + 1))
1678                    time.sleep(1)
1679                    status = False
1680                else:
1681                    log.info("rebalance was stopped")
1682                    status = True
1683                    break
1684        else:
1685            log.error("Rebalance is not stopped due to {0}".format(content))
1686        return status
1687
1688    def set_data_path(self, data_path=None, index_path=None):
1689        api = self.baseUrl + '/nodes/self/controller/settings'
1690        paths = {}
1691        if data_path:
1692            paths['path'] = data_path
1693        if index_path:
1694            paths['index_path'] = index_path
1695        if paths:
1696            params = urllib.urlencode(paths)
1697            log.info('/nodes/self/controller/settings params : {0}'.format(params))
1698            status, content, header = self._http_request(api, 'POST', params)
1699            if status:
1700                log.info("Setting data_path: {0}: status {1}".format(data_path, status))
1701            else:
1702                log.error("Unable to set data_path {0} : {1}".format(data_path, content))
1703            return status
1704
1705    def get_database_disk_size(self, bucket='default'):
1706        api = self.baseUrl + "pools/{0}/buckets".format(bucket)
1707        status, content, header = self._http_request(api)
1708        json_parsed = json.loads(content)
1709        # disk_size in MB
1710        disk_size = (json_parsed[0]["basicStats"]["diskUsed"]) / (1024 * 1024)
1711        return status, disk_size
1712
1713    def ddoc_compaction(self, design_doc_id, bucket="default"):
1714        api = self.baseUrl + "pools/default/buckets/%s/ddocs/%s/controller/compactView" % \
1715            (bucket, design_doc_id)
1716        status, content, header = self._http_request(api, 'POST')
1717        if not status:
1718            raise CompactViewFailed(design_doc_id, content)
1719        log.info("compaction for ddoc '%s' was triggered" % design_doc_id)
1720
1721    def check_compaction_status(self, bucket_name):
1722        tasks = self.active_tasks()
1723        if "error" in tasks:
1724            raise Exception(tasks)
1725        for task in tasks:
1726            log.info("Task is {0}".format(task))
1727            if task["type"] == "bucket_compaction":
1728                if task["bucket"] == bucket_name:
1729                    return True, task["progress"]
1730        return False, None
1731
1732    def change_memcached_t_option(self, value):
1733        cmd = '[ns_config:update_key({node, N, memcached}, fun (PList)' + \
1734              ' -> lists:keystore(verbosity, 1, PList, {verbosity, \'-t ' + str(value) + '\'}) end)' + \
1735              ' || N <- ns_node_disco:nodes_wanted()].'
1736        return self.diag_eval(cmd)
1737
1738    def set_ensure_full_commit(self, value):
1739        """Dynamic settings changes"""
1740        # the boolean paramter is used to turn on/off ensure_full_commit(). In XDCR,
1741        # issuing checkpoint in this function is expensive and not necessary in some
1742        # test, turning off this function would speed up some test. The default value
1743        # is ON.
1744        cmd = 'ns_config:set(ensure_full_commit_enabled, {0}).'.format(value)
1745        return self.diag_eval(cmd)
1746
1747    def get_internalSettings(self, param):
1748            """allows to get internalSettings values for:
1749            indexAwareRebalanceDisabled, rebalanceIndexWaitingDisabled,
1750            rebalanceIndexPausingDisabled, maxParallelIndexers,
1751            maxParallelReplicaIndexers, maxBucketCount"""
1752            api = self.baseUrl + "internalSettings"
1753            status, content, header = self._http_request(api)
1754            json_parsed = json.loads(content)
1755            param = json_parsed[param]
1756            return param
1757
1758    def set_internalSetting(self, param, value):
1759        "Set any internal setting"
1760        api = self.baseUrl + "internalSettings"
1761
1762        if isinstance(value, bool):
1763            value = str(value).lower()
1764
1765        params = urllib.urlencode({param : value})
1766        status, content, header = self._http_request(api, "POST", params)
1767        log.info('Update internal setting {0}={1}'.format(param, value))
1768        return status
1769
1770    def get_replication_for_buckets(self, src_bucket_name, dest_bucket_name):
1771        replications = self.get_replications()
1772        for replication in replications:
1773            if src_bucket_name in replication['source'] and \
1774                replication['target'].endswith(dest_bucket_name):
1775                return replication
1776        raise XDCRException("Replication with Src bucket: {0} and Target bucket: {1} not found".
1777                        format(src_bucket_name, dest_bucket_name))
1778
1779    """ By default, these are the global replication settings -
1780        { optimisticReplicationThreshold:256,
1781        workerBatchSize:500,
1782        failureRestartInterval:1,
1783        docBatchSizeKb":2048,
1784        checkpointInterval":1800,
1785        maxConcurrentReps":32}
1786        You can override these using set_xdcr_param()
1787    """
1788    def set_xdcr_param(self, src_bucket_name,
1789                                         dest_bucket_name, param, value):
1790        replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name)
1791        api = self.baseUrl + replication['settingsURI']
1792        value = str(value).lower()
1793        params = urllib.urlencode({param: value})
1794        status, _, _ = self._http_request(api, "POST", params)
1795        if not status:
1796            raise XDCRException("Unable to set replication setting {0}={1} on bucket {2} on node {3}".
1797                            format(param, value, src_bucket_name, self.ip))
1798        log.info("Updated {0}={1} on bucket'{2}' on {3}".format(param, value, src_bucket_name, self.ip))
1799
1800    # Gets per-replication setting value
1801    def get_xdcr_param(self, src_bucket_name,
1802                                    dest_bucket_name, param):
1803        replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name)
1804        api = self.baseUrl + replication['settingsURI']
1805        status, content, _ = self._http_request(api)
1806        if not status:
1807            raise XDCRException("Unable to get replication setting {0} on bucket {1} on node {2}".
1808                      format(param, src_bucket_name, self.ip))
1809        json_parsed = json.loads(content)
1810        # when per-replication settings match global(internal) settings,
1811        # the param is not returned by rest API
1812        # in such cases, return internalSetting value for the param
1813        try:
1814            return json_parsed[param]
1815        except KeyError:
1816            if param == 'pauseRequested':
1817                return False
1818            else:
1819                param = 'xdcr' + param[0].upper() + param[1:]
1820                log.info("Trying to fetch xdcr param:{0} from global settings".
1821                         format(param))
1822                return self.get_internalSettings(param)
1823
1824    # Returns a boolean value on whether replication
1825    def is_replication_paused(self, src_bucket_name, dest_bucket_name):
1826        return self.get_xdcr_param(src_bucket_name, dest_bucket_name, 'pauseRequested')
1827
1828    """ Enable master trace logging for xdcr
1829    wget -O- --post-data='ale:set_loglevel(xdcr_trace, debug).' http://Administrator:asdasd@127.0.0.1:8091/diag/eval"""
1830    def enable_xdcr_trace_logging(self):
1831        self.diag_eval('ale:set_loglevel(xdcr_trace, debug).')
1832
1833    def get_recent_xdcr_vb_ckpt(self, src_bucket_name):
1834        command = 'ns_server_testrunner_api:grab_all_xdcr_checkpoints("%s", 10).' % src_bucket_name
1835        status, content = self.diag_eval(command)
1836        if not status:
1837            raise Exception("Unable to get recent XDCR checkpoint information")
1838        json_parsed = json.loads(content)
1839        # a single decoding will only return checkpoint record as string
1840        # convert string to dict using json
1841        chkpt_doc_string = json_parsed.values()[0].replace('"', '\"')
1842        chkpt_dict = json.loads(chkpt_doc_string)
1843        return chkpt_dict
1844
1845    def set_reb_cons_view(self, disable):
1846        """Enable/disable consistent view for rebalance tasks"""
1847        api = self.baseUrl + "internalSettings"
1848        params = {"indexAwareRebalanceDisabled": str(disable).lower()}
1849        params = urllib.urlencode(params)
1850        status, content, header = self._http_request(api, "POST", params)
1851        log.info('Consistent-views during rebalance was set as indexAwareRebalanceDisabled={0}'\
1852                 .format(str(disable).lower()))
1853        return status
1854
1855    def set_reb_index_waiting(self, disable):
1856        """Enable/disable rebalance index waiting"""
1857        api = self.baseUrl + "internalSettings"
1858        params = {"rebalanceIndexWaitingDisabled": str(disable).lower()}
1859        params = urllib.urlencode(params)
1860        status, content, header = self._http_request(api, "POST", params)
1861        log.info('rebalance index waiting was set as rebalanceIndexWaitingDisabled={0}'\
1862                 .format(str(disable).lower()))
1863        return status
1864
1865    def set_rebalance_index_pausing(self, disable):
1866        """Enable/disable index pausing during rebalance"""
1867        api = self.baseUrl + "internalSettings"
1868        params = {"rebalanceIndexPausingDisabled": str(disable).lower()}
1869        params = urllib.urlencode(params)
1870        status, content, header = self._http_request(api, "POST", params)
1871        log.info('index pausing during rebalance was set as rebalanceIndexPausingDisabled={0}'\
1872                 .format(str(disable).lower()))
1873        return status
1874
1875    def set_max_parallel_indexers(self, count):
1876        """set max parallel indexer threads"""
1877        api = self.baseUrl + "internalSettings"
1878        params = {"maxParallelIndexers": count}
1879        params = urllib.urlencode(params)
1880        status, content, header = self._http_request(api, "POST", params)
1881        log.info('max parallel indexer threads was set as maxParallelIndexers={0}'.\
1882                 format(count))
1883        return status
1884
1885    def set_max_parallel_replica_indexers(self, count):
1886        """set max parallel replica indexers threads"""
1887        api = self.baseUrl + "internalSettings"
1888        params = {"maxParallelReplicaIndexers": count}
1889        params = urllib.urlencode(params)
1890        status, content, header = self._http_request(api, "POST", params)
1891        log.info('max parallel replica indexers threads was set as maxParallelReplicaIndexers={0}'.\
1892                 format(count))
1893        return status
1894
1895    def get_internal_replication_type(self):
1896        buckets = self.get_buckets()
1897        cmd = "\'{ok, BC} = ns_bucket:get_bucket(%s), ns_bucket:replication_type(BC).\'" % buckets[0].name
1898        return self.diag_eval(cmd)
1899
1900    def set_mc_threads(self, mc_threads=4):
1901        """
1902        Change number of memcached threads and restart the cluster
1903        """
1904        cmd = "[ns_config:update_key({node, N, memcached}, " \
1905              "fun (PList) -> lists:keystore(verbosity, 1, PList," \
1906              " {verbosity, \"-t %s\"}) end) " \
1907              "|| N <- ns_node_disco:nodes_wanted()]." % mc_threads
1908
1909        return self.diag_eval(cmd)
1910
1911    def set_auto_compaction(self, parallelDBAndVC="false",
1912                            dbFragmentThreshold=None,
1913                            viewFragmntThreshold=None,
1914                            dbFragmentThresholdPercentage=None,
1915                            viewFragmntThresholdPercentage=None,
1916                            allowedTimePeriodFromHour=None,
1917                            allowedTimePeriodFromMin=None,
1918                            allowedTimePeriodToHour=None,
1919                            allowedTimePeriodToMin=None,
1920                            allowedTimePeriodAbort=None,
1921                            bucket=None):
1922        """Reset compaction values to default, try with old fields (dp4 build)
1923        and then try with newer fields"""
1924        params = {}
1925        api = self.baseUrl
1926
1927        if bucket is None:
1928            # setting is cluster wide
1929            api = api + "controller/setAutoCompaction"
1930        else:
1931            # overriding per/bucket compaction setting
1932            api = api + "pools/default/buckets/" + bucket
1933            params["autoCompactionDefined"] = "true"
1934            # reuse current ram quota in mb per node
1935            num_nodes = len(self.node_statuses())
1936            bucket_info = self.get_bucket_json(bucket)
1937            quota = self.get_bucket_json(bucket)["quota"]["ram"] / (1048576 * num_nodes)
1938            params["ramQuotaMB"] = quota
1939            if bucket_info["authType"] == "sasl" and bucket_info["name"] != "default":
1940                params["authType"] = self.get_bucket_json(bucket)["authType"]
1941                params["saslPassword"] = self.get_bucket_json(bucket)["saslPassword"]
1942
1943        params["parallelDBAndViewCompaction"] = parallelDBAndVC
1944        # Need to verify None because the value could be = 0
1945        if dbFragmentThreshold is not None:
1946            params["databaseFragmentationThreshold[size]"] = dbFragmentThreshold
1947        if viewFragmntThreshold is not None:
1948            params["viewFragmentationThreshold[size]"] = viewFragmntThreshold
1949        if dbFragmentThresholdPercentage is not None:
1950            params["databaseFragmentationThreshold[percentage]"] = dbFragmentThresholdPercentage
1951        if viewFragmntThresholdPercentage is not None:
1952            params["viewFragmentationThreshold[percentage]"] = viewFragmntThresholdPercentage
1953        if allowedTimePeriodFromHour is not None:
1954            params["allowedTimePeriod[fromHour]"] = allowedTimePeriodFromHour
1955        if allowedTimePeriodFromMin is not None:
1956            params["allowedTimePeriod[fromMinute]"] = allowedTimePeriodFromMin
1957        if allowedTimePeriodToHour is not None:
1958            params["allowedTimePeriod[toHour]"] = allowedTimePeriodToHour
1959        if allowedTimePeriodToMin is not None:
1960            params["allowedTimePeriod[toMinute]"] = allowedTimePeriodToMin
1961        if allowedTimePeriodAbort is not None:
1962            params["allowedTimePeriod[abortOutside]"] = allowedTimePeriodAbort
1963
1964        params = urllib.urlencode(params)
1965        log.info("'%s' bucket's settings will be changed with parameters: %s" % (bucket, params))
1966        return self._http_request(api, "POST", params)
1967
1968    def set_global_loglevel(self, loglevel='error'):
1969        """Set cluster-wide logging level for core components
1970
1971        Possible loglevel:
1972            -- debug
1973            -- info
1974            -- warn
1975            -- error
1976        """
1977
1978        api = self.baseUrl + 'diag/eval'
1979        request_body = 'rpc:eval_everywhere(erlang, apply, [fun () -> \
1980                        [ale:set_loglevel(L, {0}) || L <- \
1981                        [ns_server, couchdb, user, menelaus, ns_doctor, stats, \
1982                        rebalance, cluster, views, stderr]] end, []]).'.format(loglevel)
1983        return self._http_request(api=api, method='POST', params=request_body,
1984                                  headers=self._create_headers())
1985
1986    def set_couchdb_option(self, section, option, value):
1987        """Dynamic settings changes"""
1988
1989        cmd = 'ns_config:set({{couchdb, {{{0}, {1}}}}}, {2}).'.format(section,
1990                                                                      option,
1991                                                                      value)
1992        return self.diag_eval(cmd)
1993
1994    def get_alerts(self):
1995        api = self.baseUrl + "pools/default/"
1996        status, content, header = self._http_request(api)
1997        json_parsed = json.loads(content)
1998        if status:
1999            if "alerts" in json_parsed:
2000                return json_parsed['alerts']
2001        else:
2002            return None
2003
2004    def flush_bucket(self, bucket="default"):
2005        if isinstance(bucket, Bucket):
2006            bucket_name = bucket.name
2007        else:
2008            bucket_name = bucket
2009        api = self.baseUrl + "pools/default/buckets/%s/controller/doFlush" % (bucket_name)
2010        status, content, header = self._http_request(api, 'POST')
2011        if not status:
2012            raise BucketFlushFailed(self.ip, bucket_name)
2013        log.info("Flush for bucket '%s' was triggered" % bucket_name)
2014
2015    def update_notifications(self, enable):
2016        api = self.baseUrl + 'settings/stats'
2017        params = urllib.urlencode({'sendStats' : enable})
2018        log.info('settings/stats params : {0}'.format(params))
2019        status, content, header = self._http_request(api, 'POST', params)
2020        return status
2021
2022    def get_notifications(self):
2023        api = self.baseUrl + 'settings/stats'
2024        status, content, header = self._http_request(api)
2025        json_parsed = json.loads(content)
2026        if status:
2027            return json_parsed["sendStats"]
2028        return None
2029
2030    def get_logs(self, last_n=10, contains_text=None):
2031        api = self.baseUrl + 'logs'
2032        status, content, header = self._http_request(api)
2033        json_parsed = json.loads(content)
2034        logs = json_parsed['list']
2035        logs.reverse()
2036        result = []
2037        for i in xrange(min(last_n, len(logs))):
2038            result.append(logs[i])
2039            if contains_text is not None and contains_text in logs[i]["text"]:
2040                break
2041        return result
2042
2043    def print_UI_logs(self, last_n=10, contains_text=None):
2044        logs = self.get_logs(last_n, contains_text)
2045        log.info("Latest logs from UI on {0}:".format(self.ip))
2046        for lg in logs: log.error(lg)
2047
2048    def delete_ro_user(self):
2049        api = self.baseUrl + 'settings/readOnlyUser'
2050        status, content, header = self._http_request(api, 'DELETE', '')
2051        return status
2052
2053    def create_ro_user(self, username, password):
2054        api = self.baseUrl + 'settings/readOnlyUser'
2055        params = urllib.urlencode({'username' : username, 'password' : password})
2056        log.info('settings/readOnlyUser params : {0}'.format(params))
2057        status, content, header = self._http_request(api, 'POST', params)
2058        return status
2059
2060    def query_tool(self, query, port=8093, timeout=650, query_params={}):
2061        params = {'statement' : query}
2062        params.update(query_params)
2063        params = urllib.urlencode(params)
2064        log.info('query params : {0}'.format(params))
2065        api = "http://%s:%s/query?%s" % (self.ip, port, params)
2066        status, content, header = self._http_request(api, 'POST', timeout=timeout)
2067        try:
2068            return json.loads(content)
2069        except ValueError:
2070            return content
2071
2072    def query_tool_stats(self):
2073        log.info('query n1ql stats')
2074        api = "http://%s:8093/query/stats" % (self.ip)
2075        status, content, header = self._http_request(api, 'GET')
2076        log.info(content)
2077        try:
2078            return json.loads(content)
2079        except ValueError:
2080            return content
2081
2082    # return all rack/zone info
2083    def get_all_zones_info(self, timeout=120):
2084        zones = {}
2085        api = self.baseUrl + 'pools/default/serverGroups'
2086        status, content, header = self._http_request(api, timeout=timeout)
2087        if status:
2088            zones = json.loads(content)
2089        else:
2090            raise Exception("Failed to get all zones info.\n \
2091                  Zone only supports from couchbase server version 2.5 and up.")
2092        return zones
2093
2094    # return group name and unique uuid
2095    def get_zone_names(self):
2096        zone_names = {}
2097        zone_info = self.get_all_zones_info()
2098        if zone_info and len(zone_info["groups"]) >= 1:
2099            for i in range(0, len(zone_info["groups"])):
2100                # pools/default/serverGroups/ = 27 chars
2101                zone_names[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"][28:]
2102        return zone_names
2103
2104    def add_zone(self, zone_name):
2105        api = self.baseUrl + 'pools/default/serverGroups'
2106        request_name = "name={0}".format(zone_name)
2107        status, content, header = self._http_request(api, "POST", \
2108                                        params=request_name)
2109        if status:
2110            log.info("zone {0} is added".format(zone_name))
2111            return True
2112        else:
2113            raise Exception("Failed to add zone with name: %s " % zone_name)
2114
2115    def delete_zone(self, zone_name):
2116        api = self.baseUrl + 'pools/default/serverGroups/'
2117        # check if zone exist
2118        found = False
2119        zones = self.get_zone_names()
2120        for zone in zones:
2121            if zone_name == zone:
2122                api += zones[zone_name]
2123                found = True
2124                break
2125        if not found:
2126            raise Exception("There is not zone with name: %s in cluster" % zone_name)
2127        status, content, header = self._http_request(api, "DELETE")
2128        if status:
2129            log.info("zone {0} is deleted".format(zone_name))
2130        else:
2131            raise Exception("Failed to delete zone with name: %s " % zone_name)
2132
2133    def rename_zone(self, old_name, new_name):
2134        api = self.baseUrl + 'pools/default/serverGroups/'
2135        # check if zone exist
2136        found = False
2137        zones = self.get_zone_names()
2138        for zone in zones:
2139            if old_name == zone:
2140                api += zones[old_name]
2141                request_name = "name={0}".format(new_name)
2142                found = True
2143                break
2144        if not found:
2145            raise Exception("There is not zone with name: %s in cluster" % old_name)
2146        status, content, header = self._http_request(api, "PUT", params=request_name)
2147        if status:
2148            log.info("zone {0} is renamed to {1}".format(old_name, new_name))
2149        else:
2150            raise Exception("Failed to rename zone with name: %s " % old_name)
2151
2152    # get all nodes info in one zone/rack/group
2153    def get_nodes_in_zone(self, zone_name):
2154        nodes = {}
2155        tmp = {}
2156        zone_info = self.get_all_zones_info()
2157        if zone_name != "":
2158            found = False
2159            if len(zone_info["groups"]) >= 1:
2160                for i in range(0, len(zone_info["groups"])):
2161                    if zone_info["groups"][i]["name"] == zone_name:
2162                        tmp = zone_info["groups"][i]["nodes"]
2163                        if not tmp:
2164                            log.info("zone {0} is existed but no node in it".format(zone_name))
2165                        # remove port
2166                        for node in tmp:
2167                            node["hostname"] = node["hostname"].split(":")
2168                            node["hostname"] = node["hostname"][0]
2169                            nodes[node["hostname"]] = node
2170                        found = True
2171                        break
2172            if not found:
2173                raise Exception("There is not zone with name: %s in cluster" % zone_name)
2174        return nodes
2175
2176    def get_zone_and_nodes(self):
2177        """ only return zones with node in its """
2178        zones = {}
2179        tmp = {}
2180        zone_info = self.get_all_zones_info()
2181        if len(zone_info["groups"]) >= 1:
2182            for i in range(0, len(zone_info["groups"])):
2183                tmp = zone_info["groups"][i]["nodes"]
2184                if not tmp:
2185                    log.info("zone {0} is existed but no node in it".format(tmp))
2186                # remove port
2187                else:
2188                    nodes = []
2189                    for node in tmp:
2190                        node["hostname"] = node["hostname"].split(":")
2191                        node["hostname"] = node["hostname"][0]
2192                        print node["hostname"][0]
2193                        nodes.append(node["hostname"])
2194                    zones[zone_info["groups"][i]["name"]] = nodes
2195        return zones
2196
2197    def get_zone_uri(self):
2198        zone_uri = {}
2199        zone_info = self.get_all_zones_info()
2200        if zone_info and len(zone_info["groups"]) >= 1:
2201            for i in range(0, len(zone_info["groups"])):
2202                zone_uri[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"]
2203        return zone_uri
2204
2205    def shuffle_nodes_in_zones(self, moved_nodes, source_zone, target_zone):
2206        # moved_nodes should be a IP list like
2207        # ["192.168.171.144", "192.168.171.145"]
2208        request = ""
2209        for i in range(0, len(moved_nodes)):
2210            moved_nodes[i] = "ns_1@" + moved_nodes[i]
2211
2212        all_zones = self.get_all_zones_info()
2213        api = self.baseUrl + all_zones["uri"][1:]
2214
2215        moved_node_json = []
2216        for i in range(0, len(all_zones["groups"])):
2217            for node in all_zones["groups"][i]["nodes"]:
2218                if all_zones["groups"][i]["name"] == source_zone:
2219                    for n in moved_nodes:
2220                        if n == node["otpNode"]:
2221                            moved_node_json.append({"otpNode": node["otpNode"]})
2222
2223        zone_json = {}
2224        group_json = []
2225        for i in range(0, len(all_zones["groups"])):
2226            node_j = []
2227            zone_json["uri"] = all_zones["groups"][i]["uri"]
2228            zone_json["name"] = all_zones["groups"][i]["name"]
2229            zone_json["nodes"] = node_j
2230
2231            if not all_zones["groups"][i]["nodes"]:
2232                if all_zones["groups"][i]["name"] == target_zone:
2233                    for i in range(0, len(moved_node_json)):
2234                        zone_json["nodes"].append(moved_node_json[i])
2235                else:
2236                    zone_json["nodes"] = []
2237            else:
2238                for node in all_zones["groups"][i]["nodes"]:
2239                    if all_zones["groups"][i]["name"] == source_zone and \
2240                                           node["otpNode"] in moved_nodes:
2241                        pass
2242                    else:
2243                        node_j.append({"otpNode": node["otpNode"]})
2244                if all_zones["groups"][i]["name"] == target_zone:
2245                    for k in range(0, len(moved_node_json)):
2246                        node_j.append(moved_node_json[k])
2247                    zone_json["nodes"] = node_j
2248            group_json.append({"name": zone_json["name"], "uri": zone_json["uri"], "nodes": zone_json["nodes"]})
2249        request = '{{"groups": {0} }}'.format(json.dumps(group_json))
2250        status, content, header = self._http_request(api, "PUT", params=request)
2251        # sample request format
2252        # request = ' {"groups":[{"uri":"/pools/default/serverGroups/0","nodes": [] },\
2253        #                       {"uri":"/pools/default/serverGroups/c8275b7a88e6745c02815dde4a505e70","nodes": [] },\
2254        #                        {"uri":"/pools/default/serverGroups/1acd9810a027068bd14a1ddd43db414f","nodes": \
2255        #                               [{"otpNode":"ns_1@192.168.171.144"},{"otpNode":"ns_1@192.168.171.145"}]} ]} '
2256        return status
2257
2258    def is_zone_exist(self, zone_name):
2259        found = False
2260        zones = self.get_zone_names()
2261        if zones:
2262            for zone in zones:
2263                if zone_name == zone:
2264                    found = True
2265                    return True
2266                    break
2267        if not found:
2268            log.error("There is not zone with name: {0} in cluster.".format(zone_name))
2269            return False
2270
2271    def start_cluster_logs_collection(self, nodes="*", upload=False, \
2272                                      uploadHost=None, customer="", ticket=""):
2273        if not upload:
2274            params = urllib.urlencode({"nodes":nodes})
2275        else:
2276            params = urllib.urlencode({"nodes":nodes, "uploadHost":uploadHost, \
2277                                       "customer":customer, "ticket":ticket})
2278        api = self.baseUrl + "controller/startLogsCollection"
2279        status, content, header = self._http_request(api, "POST", params)
2280        return status, content
2281
2282    def get_cluster_logs_collection_info(self):
2283        api = self.baseUrl + "pools/default/tasks/"
2284        status, content, header = self._http_request(api, "GET")
2285        if status:
2286            tmp = json.loads(content)
2287            for k in tmp:
2288                if k["type"] == "clusterLogsCollection":
2289                    content = k
2290                    return content
2291        return None
2292
2293    """ result["progress"]: progress logs collected at cluster level
2294        result["status]: status logs collected at cluster level
2295        result["perNode"]: all information logs collected at each node """
2296    def get_cluster_logs_collection_status(self):
2297        result = self.get_cluster_logs_collection_info()
2298        if result:
2299            return result["progress"], result["status"], result["perNode"]
2300        return None, None, None
2301
2302    def cancel_cluster_logs_collection(self):
2303        api = self.baseUrl + "controller/cancelLogsCollection"
2304        status, content, header = self._http_request(api, "POST")
2305        return status, content
2306
2307    def get_bucket_CCCP(self, bucket):
2308        log.info("Getting CCCP config ")
2309        api = '%spools/default/b/%s' % (self.baseUrl, bucket)
2310        if isinstance(bucket, Bucket):
2311            api = '%spools/default/b/%s' % (self.baseUrl, bucket.name)
2312        status, content, header = self._http_request(api)
2313        if status:
2314            return json.loads(content)
2315        return None
2316
2317    def get_recovery_task(self):
2318        content = self.ns_server_tasks()
2319        for item in content:
2320            if item["type"] == "recovery":
2321                return item
2322        return None
2323
2324
2325    def get_recovery_progress(self, recoveryStatusURI):
2326        api = '%s%s' % (self.baseUrl, recoveryStatusURI)
2327        status, content, header = self._http_request(api)
2328        if status:
2329            return json.loads(content)
2330        return None
2331
2332    def get_warming_up_tasks(self):
2333        tasks = self.ns_server_tasks()
2334        tasks_warmup = []
2335        for task in tasks:
2336            if task["type"] == "warming_up":
2337                tasks_warmup.append(task)
2338        return tasks_warmup
2339
2340    def compact_bucket(self, bucket="default"):
2341        api = self.baseUrl + 'pools/default/buckets/{0}/controller/compactBucket'.format(bucket)
2342        status, content, header = self._http_request(api, 'POST')
2343        if status:
2344            log.info('bucket compaction successful')
2345        else:
2346            raise BucketCompactionException(bucket)
2347
2348        return True
2349
2350    def cancel_bucket_compaction(self, bucket="default"):
2351        api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket)
2352        if isinstance(bucket, Bucket):
2353            api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket.name)
2354        status, content, header = self._http_request(api, 'POST')
2355        log.info("Status is {0}".format(status))
2356        if status:
2357            log.info('Cancel bucket compaction successful')
2358        else:
2359            raise BucketCompactionException(bucket)
2360        return True
2361
2362class MembaseServerVersion:
2363    def __init__(self, implementationVersion='', componentsVersion=''):
2364        self.implementationVersion = implementationVersion
2365        self.componentsVersion = componentsVersion
2366
2367
2368#this class will also contain more node related info
2369class OtpNode(object):
2370    def __init__(self, id='', status=''):
2371        self.id = id
2372        self.ip = ''
2373        self.replication = ''
2374        self.port = 8091
2375        self.gracefulFailoverPossible = 'true'
2376        #extract ns ip from the otpNode string
2377        #its normally ns_1@10.20.30.40
2378        if id.find('@') >= 0:
2379            self.ip = id[id.index('@') + 1:]
2380        self.status = status
2381
2382
2383class NodeInfo(object):
2384    def __init__(self):
2385        self.availableStorage = None  # list
2386        self.memoryQuota = None
2387
2388
2389class NodeDataStorage(object):
2390    def __init__(self):
2391        self.type = ''  #hdd or ssd
2392        self.path = ''
2393        self.index_path = ''
2394        self.quotaMb = ''
2395        self.state = ''  #ok
2396
2397    def __str__(self):
2398        return '{0}'.format({'type': self.type,
2399                             'path': self.path,
2400                             'index_path' : self.index_path,
2401                             'quotaMb': self.quotaMb,
2402                             'state': self.state})
2403
2404    def get_data_path(self):
2405        return self.path
2406
2407    def get_index_path(self):
2408        return self.index_path
2409
2410
2411class NodeDiskStorage(object):
2412    def __init__(self):
2413        self.type = 0
2414        self.path = ''
2415        self.sizeKBytes = 0
2416        self.usagePercent = 0
2417
2418
2419class Bucket(object):
2420    def __init__(self, bucket_size='', name="", authType="sasl", saslPassword="", num_replicas=0, port=11211, master_id=None,
2421                 type='', eviction_policy="valueOnly", bucket_priority=None):
2422        self.name = name
2423        self.port = port
2424        self.type = type
2425        self.nodes = None
2426        self.stats = None
2427        self.servers = []
2428        self.vbuckets = []
2429        self.forward_map = []
2430        self.numReplicas = num_replicas
2431        self.saslPassword = saslPassword
2432        self.authType = ""
2433        self.bucket_size = bucket_size
2434        self.kvs = {1:KVStore()}
2435        self.authType = authType
2436        self.master_id = master_id
2437        self.eviction_policy = eviction_policy
2438        self.bucket_priority = bucket_priority
2439
2440    def __str__(self):
2441        return self.name
2442
2443
2444class Node(object):
2445    def __init__(self):
2446        self.uptime = 0
2447        self.memoryTotal = 0
2448        self.memoryFree = 0
2449        self.mcdMemoryReserved = 0
2450        self.mcdMemoryAllocated = 0
2451        self.status = ""
2452        self.hostname = ""
2453        self.clusterCompatibility = ""
2454        self.clusterMembership = ""
2455        self.version = ""
2456        self.os = ""
2457        self.ports = []
2458        self.availableStorage = []
2459        self.storage = []
2460        self.memoryQuota = 0
2461        self.moxi = 11211
2462        self.memcached = 11210
2463        self.id = ""
2464        self.ip = ""
2465        self.rest_username = ""
2466        self.rest_password = ""
2467        self.port = 8091
2468
2469
2470class AutoFailoverSettings(object):
2471    def __init__(self):
2472        self.enabled = True
2473        self.timeout = 0
2474        self.count = 0
2475
2476
2477class NodePort(object):
2478    def __init__(self):
2479        self.proxy = 0
2480        self.direct = 0
2481
2482
2483class BucketStats(object):
2484    def __init__(self):
2485        self.opsPerSec = 0
2486        self.itemCount = 0
2487        self.diskUsed = 0
2488        self.memUsed = 0
2489        self.ram = 0
2490
2491
2492class vBucket(object):
2493    def __init__(self):
2494        self.master = ''
2495        self.replica = []
2496        self.id = -1
2497
2498
2499class RestParser(object):
2500    def parse_get_nodes_response(self, parsed):
2501        node = Node()
2502        node.uptime = parsed['uptime']
2503        node.memoryFree = parsed['memoryFree']
2504        node.memoryTotal = parsed['memoryTotal']
2505        node.mcdMemoryAllocated = parsed['mcdMemoryAllocated']
2506        node.mcdMemoryReserved = parsed['mcdMemoryReserved']
2507        node.status = parsed['status']
2508        node.hostname = parsed['hostname']
2509        node.clusterCompatibility = parsed['clusterCompatibility']
2510        node.clusterMembership = parsed['clusterMembership']
2511        node.version = parsed['version']
2512        node.curr_items = 0
2513        if 'interestingStats' in parsed and 'curr_items' in parsed['interestingStats']:
2514            node.curr_items = parsed['interestingStats']['curr_items']
2515        node.port = parsed["hostname"][parsed["hostname"].find(":") + 1:]
2516        node.os = parsed['os']
2517
2518        if "otpNode" in parsed:
2519            node.id = parsed["otpNode"]
2520            if parsed["otpNode"].find('@') >= 0:
2521                node.ip = node.id[node.id.index('@') + 1:]
2522        elif "hostname" in parsed:
2523            node.ip = parsed["hostname"].split(":")[0]
2524
2525        # memoryQuota
2526        if 'memoryQuota' in parsed:
2527            node.memoryQuota = parsed['memoryQuota']
2528        if 'availableStorage' in parsed:
2529            availableStorage = parsed['availableStorage']
2530            for key in availableStorage:
2531                #let's assume there is only one disk in each noce
2532                dict_parsed = parsed['availableStorage']
2533                if 'path' in dict_parsed and 'sizeKBytes' in dict_parsed and 'usagePercent' in dict_parsed:
2534                    diskStorage = NodeDiskStorage()
2535                    diskStorage.path = dict_parsed['path']
2536                    diskStorage.sizeKBytes = dict_parsed['sizeKBytes']
2537                    diskStorage.type = key
2538                    diskStorage.usagePercent = dict_parsed['usagePercent']
2539                    node.availableStorage.append(diskStorage)
2540                    log.info(diskStorage)
2541
2542        if 'storage' in parsed:
2543            storage = parsed['storage']
2544            for key in storage:
2545                disk_storage_list = storage[key]
2546                for dict_parsed in disk_storage_list:
2547                    if 'path' in dict_parsed and 'state' in dict_parsed and 'quotaMb' in dict_parsed:
2548                        dataStorage = NodeDataStorage()
2549                        dataStorage.path = dict_parsed['path']
2550                        dataStorage.index_path = dict_parsed.get('index_path', '')
2551                        dataStorage.quotaMb = dict_parsed['quotaMb']
2552                        dataStorage.state = dict_parsed['state']
2553                        dataStorage.type = key
2554                        node.storage.append(dataStorage)
2555
2556        # ports":{"proxy":11211,"direct":11210}
2557        if "ports" in parsed:
2558            ports = parsed["ports"]
2559            if "proxy" in ports:
2560                node.moxi = ports["proxy"]
2561            if "direct" in ports:
2562                node.memcached = ports["direct"]
2563        return node
2564
2565    def parse_get_bucket_response(self, response):
2566        parsed = json.loads(response)
2567        return self.parse_get_bucket_json(parsed)
2568
2569    def parse_get_bucket_json(self, parsed):
2570        bucket = Bucket()
2571        bucket.name = parsed['name']
2572        bucket.type = parsed['bucketType']
2573        bucket.port = parsed['proxyPort']
2574        bucket.authType = parsed["authType"]
2575        bucket.saslPassword = parsed["saslPassword"]
2576        bucket.nodes = list()
2577        if 'vBucketServerMap' in parsed:
2578            vBucketServerMap = parsed['vBucketServerMap']
2579            serverList = vBucketServerMap['serverList']
2580            bucket.servers.extend(serverList)
2581            if "numReplicas" in vBucketServerMap:
2582                bucket.numReplicas = vBucketServerMap["numReplicas"]
2583            #vBucketMapForward
2584            if 'vBucketMapForward' in vBucketServerMap:
2585                #let's gather the forward map
2586                vBucketMapForward = vBucketServerMap['vBucketMapForward']
2587                counter = 0
2588                for vbucket in vBucketMapForward:
2589                    #there will be n number of replicas
2590                    vbucketInfo = vBucket()
2591                    vbucketInfo.master = serverList[vbucket[0]]
2592                    if vbucket:
2593                        for i in range(1, len(vbucket)):
2594                            if vbucket[i] != -1:
2595                                vbucketInfo.replica.append(serverList[vbucket[i]])
2596                    vbucketInfo.id = counter
2597                    counter += 1
2598                    bucket.forward_map.append(vbucketInfo)
2599            vBucketMap = vBucketServerMap['vBucketMap']
2600            counter = 0
2601            for vbucket in vBucketMap:
2602                #there will be n number of replicas
2603                vbucketInfo = vBucket()
2604                vbucketInfo.master = serverList[vbucket[0]]
2605                if vbucket:
2606                    for i in range(1, len(vbucket)):
2607                        if vbucket[i] != -1:
2608                            vbucketInfo.replica.append(serverList[vbucket[i]])
2609                vbucketInfo.id = counter
2610                counter += 1
2611                bucket.vbuckets.append(vbucketInfo)
2612                #now go through each vbucket and populate the info
2613            #who is master , who is replica
2614        # get the 'storageTotals'
2615        log.debug('read {0} vbuckets'.format(len(bucket.vbuckets)))
2616        stats = parsed['basicStats']
2617        #vBucketServerMap
2618        bucketStats = BucketStats()
2619        log.debug('stats:{0}'.format(stats))
2620        bucketStats.opsPerSec = stats['opsPerSec']
2621        bucketStats.itemCount = stats['itemCount']
2622        if bucket.type != "memcached":
2623            bucketStats.diskUsed = stats['diskUsed']
2624        bucketStats.memUsed = stats['memUsed']
2625        quota = parsed['quota']
2626        bucketStats.ram = quota['ram']
2627        bucket.stats = bucketStats
2628        nodes = parsed['nodes']
2629        for nodeDictionary in nodes:
2630            node = Node()
2631            node.uptime = nodeDictionary['uptime']
2632            node.memoryFree = nodeDictionary['memoryFree']
2633            node.memoryTotal = nodeDictionary['memoryTotal']
2634            node.mcdMemoryAllocated = nodeDictionary['mcdMemoryAllocated']
2635            node.mcdMemoryReserved = nodeDictionary['mcdMemoryReserved']
2636            node.status = nodeDictionary['status']
2637            node.hostname = nodeDictionary['hostname']
2638            if 'clusterCompatibility' in nodeDictionary:
2639                node.clusterCompatibility = nodeDictionary['clusterCompatibility']
2640            if 'clusterMembership' in nodeDictionary:
2641                node.clusterCompatibility = nodeDictionary['clusterMembership']
2642            node.version = nodeDictionary['version']
2643            node.os = nodeDictionary['os']
2644            if "ports" in nodeDictionary:
2645                ports = nodeDictionary["ports"]
2646                if "proxy" in ports:
2647                    node.moxi = ports["proxy"]
2648                if "direct" in ports:
2649                    node.memcached = ports["direct"]
2650            if "hostname" in nodeDictionary:
2651                value = str(nodeDictionary["hostname"])
2652                node.ip = value[:value.rfind(":")]
2653                node.port = int(value[value.rfind(":") + 1:])
2654            if "otpNode" in nodeDictionary:
2655                node.id = nodeDictionary["otpNode"]
2656            bucket.nodes.append(node)
2657        return bucket
2658
2659
2660