1import base64
2import json
3import urllib
4import httplib2
5import logger
6import traceback
7import socket
8import time
9import re
10import uuid
11from copy import deepcopy
12from threading import Thread
13from TestInput import TestInputSingleton
14from testconstants import MIN_KV_QUOTA, INDEX_QUOTA, FTS_QUOTA, CBAS_QUOTA
15from testconstants import COUCHBASE_FROM_VERSION_4, IS_CONTAINER
16
17
18try:
19    from couchbase_helper.document import DesignDocument, View
20except ImportError:
21    from lib.couchbase_helper.document import DesignDocument, View
22
23from memcached.helper.kvstore import KVStore
24from exception import ServerAlreadyJoinedException, ServerUnavailableException, InvalidArgumentException
25from membase.api.exception import BucketCreationException, ServerSelfJoinException, ClusterRemoteException, \
26    RebalanceFailedException, FailoverFailedException, DesignDocCreationException, QueryViewException, \
27    ReadDocumentException, GetBucketInfoFailed, CompactViewFailed, SetViewInfoNotFound, AddNodeException, \
28    BucketFlushFailed, CBRecoveryFailedException, XDCRException, SetRecoveryTypeFailed, BucketCompactionException
29log = logger.Logger.get_logger()
30
31# helper library methods built on top of RestConnection interface
32
33class RestHelper(object):
34    def __init__(self, rest_connection):
35        self.rest = rest_connection
36
37    def is_ns_server_running(self, timeout_in_seconds=360):
38        end_time = time.time() + timeout_in_seconds
39        while time.time() <= end_time:
40            try:
41                status = self.rest.get_nodes_self(5)
42                if status is not None and status.status == 'healthy':
43                    return True
44                else:
45                    if status is not None:
46                        log.warn("server {0}:{1} status is {2}"\
47                            .format(self.rest.ip, self.rest.port, status.status))
48                    else:
49                        log.warn("server {0}:{1} status is down"\
50                                           .format(self.rest.ip, self.rest.port))
51            except ServerUnavailableException:
52                log.error("server {0}:{1} is unavailable"\
53                                           .format(self.rest.ip, self.rest.port))
54            time.sleep(5)
55        msg = 'unable to connect to the node {0} even after waiting {1} seconds'
56        log.error(msg.format(self.rest.ip, timeout_in_seconds))
57        return False
58
59    def is_cluster_healthy(self, timeout=120):
60        # get the nodes and verify that all the nodes.status are healthy
61        nodes = self.rest.node_statuses(timeout)
62        return all(node.status == 'healthy' for node in nodes)
63
64    def rebalance_reached(self, percentage=100):
65        start = time.time()
66        progress = 0
67        previous_progress = 0
68        retry = 0
69        while progress is not -1 and progress < percentage and retry < 40:
70            # -1 is error , -100 means could not retrieve progress
71            progress = self.rest._rebalance_progress()
72            if progress == -100:
73                log.error("unable to retrieve rebalanceProgress.try again in 2 seconds")
74                retry += 1
75            else:
76                if previous_progress == progress:
77                    retry += 0.5
78                else:
79                    retry = 0
80                    previous_progress = progress
81            # sleep for 2 seconds
82            time.sleep(3)
83        if progress <= 0:
84            log.error("rebalance progress code : {0}".format(progress))
85
86            return False
87        elif retry >= 40:
88            log.error("rebalance stuck on {0}%".format(progress))
89            return False
90        else:
91            duration = time.time() - start
92            log.info('rebalance reached >{0}% in {1} seconds '.format(progress, duration))
93            return True
94
95    # return true if cluster balanced, false if it needs rebalance
96    def is_cluster_rebalanced(self):
97        command = "ns_orchestrator:needs_rebalance()"
98        status, content = self.rest.diag_eval(command)
99        if status:
100            return content.lower() == "false"
101        log.error("can't define if cluster balanced")
102        return None
103
104
105    # this method will rebalance the cluster by passing the remote_node as
106    # ejected node
107    def remove_nodes(self, knownNodes, ejectedNodes, wait_for_rebalance=True):
108        if len(ejectedNodes) == 0:
109            return False
110        self.rest.rebalance(knownNodes, ejectedNodes)
111        if wait_for_rebalance:
112            return self.rest.monitorRebalance()
113        else:
114            return False
115
116    def vbucket_map_ready(self, bucket, timeout_in_seconds=360):
117        end_time = time.time() + timeout_in_seconds
118        while time.time() <= end_time:
119            vBuckets = self.rest.get_vbuckets(bucket)
120            if vBuckets:
121                return True
122            else:
123                time.sleep(0.5)
124        msg = 'vbucket map is not ready for bucket {0} after waiting {1} seconds'
125        log.info(msg.format(bucket, timeout_in_seconds))
126        return False
127
128    def bucket_exists(self, bucket):
129        try:
130            buckets = self.rest.get_buckets()
131            names = [item.name for item in buckets]
132            log.info("node {1} existing buckets : {0}" \
133                              .format(names, self.rest.ip))
134            for item in buckets:
135                if item.name == bucket:
136                    log.info("node {1} found bucket {0}" \
137                             .format(bucket, self.rest.ip))
138                    return True
139            return False
140        except Exception:
141            return False
142
143    def wait_for_node_status(self, node, expected_status, timeout_in_seconds):
144        status_reached = False
145        end_time = time.time() + timeout_in_seconds
146        while time.time() <= end_time and not status_reached:
147            nodes = self.rest.node_statuses()
148            for n in nodes:
149                if node.id == n.id:
150                    log.info('node {0} status : {1}'.format(node.id, n.status))
151                    if n.status.lower() == expected_status.lower():
152                        status_reached = True
153                    break
154            if not status_reached:
155                log.info("sleep for 5 seconds before reading the node.status again")
156                time.sleep(5)
157        log.info('node {0} status_reached : {1}'.format(node.id, status_reached))
158        return status_reached
159
160    def _wait_for_task_pid(self, pid, end_time, ddoc_name):
161        while (time.time() < end_time):
162            new_pid, _ = self.rest._get_indexer_task_pid(ddoc_name)
163            if pid == new_pid:
164                time.sleep(5)
165                continue
166            else:
167                return
168
169    def _wait_for_indexer_ddoc(self, servers, ddoc_name, timeout=300):
170        nodes = self.rest.get_nodes()
171        servers_to_check = []
172        for node in nodes:
173            for server in servers:
174                if node.ip == server.ip and str(node.port) == str(server.port):
175                    servers_to_check.append(server)
176        for server in servers_to_check:
177            try:
178                rest = RestConnection(server)
179                log.info('Check index for ddoc %s , server %s' % (ddoc_name, server.ip))
180                end_time = time.time() + timeout
181                log.info('Start getting index for ddoc %s , server %s' % (ddoc_name, server.ip))
182                old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name)
183                if not old_pid:
184                    log.info('Index for ddoc %s is not going on, server %s' % (ddoc_name, server.ip))
185                    continue
186                while is_pid_blocked:
187                    log.info('Index for ddoc %s is blocked, server %s' % (ddoc_name, server.ip))
188                    self._wait_for_task_pid(old_pid, end_time, ddoc_name)
189                    old_pid, is_pid_blocked = rest._get_indexer_task_pid(ddoc_name)
190                    if time.time() > end_time:
191                        log.error("INDEX IS STILL BLOKED node %s ddoc % pid %" % (server, ddoc_name, old_pid))
192                        break
193                if old_pid:
194                    log.info('Index for ddoc %s is running, server %s' % (ddoc_name, server.ip))
195                    self._wait_for_task_pid(old_pid, end_time, ddoc_name)
196            except Exception, ex:
197                log.error('unable to check index on server %s because of %s' % (server.ip, str(ex)))
198
199    def _get_vbuckets(self, servers, bucket_name='default'):
200        vbuckets_servers = {}
201        for server in servers:
202            buckets = RestConnection(server).get_buckets()
203            if not buckets:
204                return vbuckets_servers
205            if bucket_name:
206                bucket_to_check = [bucket for bucket in buckets
207                               if bucket.name == bucket_name][0]
208            else:
209                bucket_to_check = [bucket for bucket in buckets][0]
210            vbuckets_servers[server] = {}
211            vbs_active = [vb.id for vb in bucket_to_check.vbuckets
212                           if vb.master.startswith(str(server.ip))]
213            vbs_replica = []
214            for replica_num in xrange(0, bucket_to_check.numReplicas):
215                vbs_replica.extend([vb.id for vb in bucket_to_check.vbuckets
216                                    if vb.replica[replica_num].startswith(str(server.ip))])
217            vbuckets_servers[server]['active_vb'] = vbs_active
218            vbuckets_servers[server]['replica_vb'] = vbs_replica
219        return vbuckets_servers
220
221class RestConnection(object):
222
223    def __new__(self, serverInfo={}):
224
225
226        # allow port to determine
227        # behavior of restconnection
228        port = None
229        if isinstance(serverInfo, dict):
230            if 'port' in serverInfo:
231                port = serverInfo['port']
232        else:
233            port = serverInfo.port
234
235        if not port:
236            port = 8091
237
238        if int(port) in xrange(9091, 9100):
239            # return elastic search rest connection
240            from membase.api.esrest_client import EsRestConnection
241            obj = object.__new__(EsRestConnection, serverInfo)
242        else:
243            # default
244            obj = object.__new__(self, serverInfo)
245        return obj
246
247    def __init__(self, serverInfo):
248        # serverInfo can be a json object/dictionary
249        if isinstance(serverInfo, dict):
250            self.ip = serverInfo["ip"]
251            self.username = serverInfo["username"]
252            self.password = serverInfo["password"]
253            self.port = serverInfo["port"]
254            self.index_port = 9102
255            self.fts_port = 8094
256            self.query_port = 8093
257            self.eventing_port = 8096
258            if "index_port" in serverInfo.keys():
259                self.index_port = serverInfo["index_port"]
260            if "fts_port" in serverInfo.keys():
261                if serverInfo['fts_port']:
262                    self.fts_port = serverInfo["fts_port"]
263            if "eventing_port" in serverInfo.keys():
264                if serverInfo['eventing_port']:
265                    self.eventing_port = serverInfo["eventing_port"]
266            self.hostname = ''
267            self.services = ''
268            if "hostname" in serverInfo:
269                self.hostname = serverInfo["hostname"]
270            if "services" in serverInfo:
271                self.services = serverInfo["services"]
272        else:
273            self.ip = serverInfo.ip
274            self.username = serverInfo.rest_username
275            self.password = serverInfo.rest_password
276            self.port = serverInfo.port
277            self.hostname = ''
278            self.index_port = 9102
279            self.fts_port = 8094
280            self.query_port = 8093
281            self.eventing_port = 8096
282            self.services = "kv"
283            self.debug_logs = False
284            if hasattr(serverInfo, "services"):
285                self.services = serverInfo.services
286            if hasattr(serverInfo, 'index_port'):
287                self.index_port = serverInfo.index_port
288            if hasattr(serverInfo, 'query_port'):
289                self.query_port = serverInfo.query_port
290            if hasattr(serverInfo, 'fts_port'):
291                if serverInfo.fts_port:
292                    self.fts_port = serverInfo.fts_port
293            if hasattr(serverInfo, 'eventing_port'):
294                if serverInfo.eventing_port:
295                    self.eventing_port = serverInfo.eventing_port
296            if hasattr(serverInfo, 'hostname') and serverInfo.hostname and\
297               serverInfo.hostname.find(self.ip) == -1:
298                self.hostname = serverInfo.hostname
299            if hasattr(serverInfo, 'services'):
300                self.services = serverInfo.services
301        self.input = TestInputSingleton.input
302        if self.input is not None:
303            """ from watson, services param order and format:
304                new_services=fts-kv-index-n1ql """
305            self.services_node_init = self.input.param("new_services", None)
306            self.debug_logs = self.input.param("debug-logs", False)
307        self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port)
308        self.fts_baseUrl = "http://{0}:{1}/".format(self.ip, self.fts_port)
309        self.index_baseUrl = "http://{0}:{1}/".format(self.ip, self.index_port)
310        self.query_baseUrl = "http://{0}:{1}/".format(self.ip, self.query_port)
311        self.capiBaseUrl = "http://{0}:{1}/".format(self.ip, 8092)
312        self.eventing_baseUrl = "http://{0}:{1}/".format(self.ip, self.eventing_port)
313        if self.hostname:
314            self.baseUrl = "http://{0}:{1}/".format(self.hostname, self.port)
315            self.capiBaseUrl = "http://{0}:{1}/".format(self.hostname, 8092)
316            self.query_baseUrl = "http://{0}:{1}/".format(self.hostname, 8093)
317            self.eventing_baseUrl = "http://{0}:{1}/".format(self.hostname, self.eventing_port)
318
319        # Initialization of CBAS related params
320        self.cbas_base_url = "http://{0}:{1}".format(self.ip, 8095)
321        if hasattr(self.input, 'cbas'):
322            if self.input.cbas:
323                self.cbas_node = self.input.cbas
324                self.cbas_port = 8095
325                if hasattr(self.cbas_node, 'port'):
326                    self.cbas_port = self.cbas_node.port
327                self.cbas_base_url = "http://{0}:{1}".format(
328                    self.cbas_node.ip,
329                    self.cbas_port)
330            elif "cbas" in self.services:
331                self.cbas_base_url = "http://{0}:{1}".format(self.ip, 8095)
332
333        # for Node is unknown to this cluster error
334        for iteration in xrange(5):
335            http_res, success = self.init_http_request(self.baseUrl + 'nodes/self')
336            if not success and type(http_res) == unicode and\
337               (http_res.find('Node is unknown to this cluster') > -1 or \
338                http_res.find('Unexpected server error, request logged') > -1):
339                log.error("Error {0} was gotten, 5 seconds sleep before retry"\
340                                                             .format(http_res))
341                time.sleep(5)
342                if iteration == 2:
343                    log.error("node {0}:{1} is in a broken state!"\
344                                        .format(self.ip, self.port))
345                    raise ServerUnavailableException(self.ip)
346                continue
347            else:
348                break
349        # determine the real couchApiBase for cluster_run
350        # couchApiBase appeared in version 2.*
351        if not http_res or http_res["version"][0:2] == "1.":
352            self.capiBaseUrl = self.baseUrl + "/couchBase"
353        else:
354            for iteration in xrange(5):
355                if "couchApiBase" not in http_res.keys():
356                    if self.is_cluster_mixed():
357                        self.capiBaseUrl = self.baseUrl + "/couchBase"
358                        return
359                    time.sleep(0.2)
360                    http_res, success = self.init_http_request(self.baseUrl + 'nodes/self')
361                else:
362                    self.capiBaseUrl = http_res["couchApiBase"]
363                    return
364            raise ServerUnavailableException("couchApiBase doesn't exist in nodes/self: %s " % http_res)
365
366    def sasl_streaming_rq(self, bucket, timeout=120):
367        api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket)
368        if isinstance(bucket, Bucket):
369            api = self.baseUrl + 'pools/default/bucketsStreaming/{0}'.format(bucket.name)
370        try:
371            httplib2.Http(timeout=timeout).request(api, 'GET', '',
372                                                   headers=self._create_capi_headers())
373        except Exception, ex:
374            log.warn('Exception while streaming: %s' % str(ex))
375
376    def open_sasl_streaming_connection(self, bucket, timeout=1000):
377        if self.debug_logs:
378            log.info("Opening sasl streaming connection for bucket {0}"\
379                 .format((bucket, bucket.name)[isinstance(bucket, Bucket)]))
380        t = Thread(target=self.sasl_streaming_rq,
381                          name="streaming_" + str(uuid.uuid4())[:4],
382                          args=(bucket, timeout))
383        try:
384            t.start()
385        except:
386            log.warn("thread is not started")
387            return None
388        return t
389
390    def is_cluster_mixed(self):
391            http_res, success = self.init_http_request(self.baseUrl + 'pools/default')
392            if http_res == u'unknown pool':
393                return False
394            try:
395                versions = list(set([node["version"][:1] for node in http_res["nodes"]]))
396            except:
397                log.error('Error while processing cluster info {0}'.format(http_res))
398                # not really clear what to return but False see to be a good start until we figure what is happening
399                return False
400
401
402            if '1' in versions and '2' in versions:
403                 return True
404            return False
405
406    def is_cluster_compat_mode_greater_than(self, version):
407        """
408        curl -v -X POST -u Administrator:welcome http://10.3.4.186:8091/diag/eval
409        -d 'cluster_compat_mode:get_compat_version().'
410        Returns : [3,2] if version = 3.2.0
411        """
412        status, content = self.diag_eval('cluster_compat_mode:get_compat_version().')
413        if status:
414            json_parsed = json.loads(content)
415            cluster_ver = float("%s.%s" % (json_parsed[0], json_parsed[1]))
416            if cluster_ver > version:
417                return True
418        return False
419
420    def is_enterprise_edition(self):
421        http_res, success = self.init_http_request(self.baseUrl + 'pools/default')
422        if http_res == u'unknown pool':
423            return False
424        editions = []
425        community_nodes = []
426        """ get the last word in node["version"] as in "version": "2.5.1-1073-rel-enterprise" """
427        for node in http_res["nodes"]:
428            editions.extend(node["version"].split("-")[-1:])
429            if "community" in node["version"].split("-")[-1:]:
430                community_nodes.extend(node["hostname"].split(":")[:1])
431        if "community" in editions:
432            log.error("IP(s) for node(s) with community edition {0}".format(community_nodes))
433            return False
434        return True
435
436    def init_http_request(self, api):
437        content = None
438        try:
439            status, content, header = self._http_request(api, 'GET',
440                                                         headers=self._create_capi_headers())
441            json_parsed = json.loads(content)
442            if status:
443                return json_parsed, True
444            else:
445                print("{0} with status {1}: {2}".format(api, status, json_parsed))
446                return json_parsed, False
447        except ValueError as e:
448            if content is not None:
449                print("{0}: {1}".format(api, content))
450            else:
451                print e
452            return content, False
453
454    def rename_node(self, hostname, username='Administrator', password='password'):
455        params = urllib.urlencode({'username': username,
456                                   'password': password,
457                                   'hostname': hostname})
458
459        api = "%snode/controller/rename" % (self.baseUrl)
460        status, content, header = self._http_request(api, 'POST', params)
461        return status, content
462
463    def active_tasks(self):
464        api = 'http://{0}:{1}/pools/default/tasks'.format(self.ip, self.port)
465        try:
466            status, content, header = self._http_request(api, 'GET',
467                                                         headers=self._create_capi_headers())
468            json_parsed = json.loads(content)
469        except ValueError as e:
470            print(e)
471            return ""
472        return json_parsed
473
474    def ns_server_tasks(self):
475        api = self.baseUrl + 'pools/default/tasks'
476        try:
477            status, content, header = self._http_request(api, 'GET', headers=self._create_headers())
478            return json.loads(content)
479        except ValueError:
480            return ""
481
482    # DEPRECATED: use create_ddoc() instead.
483    def create_view(self, design_doc_name, bucket_name, views, options=None):
484        return self.create_ddoc(design_doc_name, bucket_name, views, options)
485
486    def create_ddoc(self, design_doc_name, bucket, views, options=None):
487        design_doc = DesignDocument(design_doc_name, views, options=options)
488        if design_doc.name.find('/') != -1:
489            design_doc.name = design_doc.name.replace('/', '%2f')
490            design_doc.id = '_design/{0}'.format(design_doc.name)
491        return self.create_design_document(bucket, design_doc)
492
493    def create_design_document(self, bucket, design_doc):
494        design_doc_name = design_doc.id
495        api = '%s/%s/%s' % (self.capiBaseUrl, bucket, design_doc_name)
496        if isinstance(bucket, Bucket):
497            api = '%s/%s/%s' % (self.capiBaseUrl, bucket.name, design_doc_name)
498
499        status, content, header = self._http_request(api, 'PUT', str(design_doc),
500                                                     headers=self._create_capi_headers())
501        if not status:
502            raise DesignDocCreationException(design_doc_name, content)
503        return json.loads(content)
504
505    def is_index_triggered(self, ddoc_name, index_type='main'):
506        run, block = self._get_indexer_task_pid(ddoc_name, index_type=index_type)
507        if run or block:
508            return True
509        else:
510            return False
511
512    def _get_indexer_task_pid(self, ddoc_name, index_type='main'):
513        active_tasks = self.active_tasks()
514        if u'error' in active_tasks:
515            return None
516        if active_tasks:
517            for task in active_tasks:
518                if task['type'] == 'indexer' and task['indexer_type'] == index_type:
519                    for ddoc in task['design_documents']:
520                        if ddoc == ('_design/%s' % ddoc_name):
521                            return task['pid'], False
522                if task['type'] == 'blocked_indexer' and task['indexer_type'] == index_type:
523                    for ddoc in task['design_documents']:
524                        if ddoc == ('_design/%s' % ddoc_name):
525                            return task['pid'], True
526        return None, None
527
528    def query_view(self, design_doc_name, view_name, bucket, query, timeout=120, invalid_query=False, type="view"):
529        status, content, header = self._query(design_doc_name, view_name, bucket, type, query, timeout)
530        if not status and not invalid_query:
531            stat = 0
532            if 'status' in header:
533                stat = int(header['status'])
534            raise QueryViewException(view_name, content, status=stat)
535        return json.loads(content)
536
537    def _query(self, design_doc_name, view_name, bucket, view_type, query, timeout):
538        if design_doc_name.find('/') != -1:
539            design_doc_name = design_doc_name.replace('/', '%2f')
540        if view_name.find('/') != -1:
541            view_name = view_name.replace('/', '%2f')
542        api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket,
543                                               design_doc_name, view_type,
544                                               view_name,
545                                               urllib.urlencode(query))
546        if isinstance(bucket, Bucket):
547            api = self.capiBaseUrl + '%s/_design/%s/_%s/%s?%s' % (bucket.name,
548                                                  design_doc_name, view_type,
549                                                  view_name,
550                                                  urllib.urlencode(query))
551        log.info("index query url: {0}".format(api))
552        status, content, header = self._http_request(api, headers=self._create_capi_headers(),
553                                                     timeout=timeout)
554        return status, content, header
555
556    def view_results(self, bucket, ddoc_name, params, limit=100, timeout=120,
557                     view_name=None):
558        status, json = self._index_results(bucket, "view", ddoc_name, params, limit, timeout=timeout, view_name=view_name)
559        if not status:
560            raise Exception("unable to obtain view results")
561        return json
562
563
564    # DEPRECATED: Incorrectly named function kept for backwards compatibility.
565    def get_view(self, bucket, view):
566        log.info("DEPRECATED function get_view(" + view + "). use get_ddoc()")
567        return self.get_ddoc(bucket, view)
568
569    def get_data_path(self):
570        node_info = self.get_nodes_self()
571        data_path = node_info.storage[0].get_data_path()
572        return data_path
573
574    def get_memcached_port(self):
575        node_info = self.get_nodes_self()
576        return node_info.memcached
577
578    def get_ddoc(self, bucket, ddoc_name):
579        status, json, meta = self._get_design_doc(bucket, ddoc_name)
580        if not status:
581            raise ReadDocumentException(ddoc_name, json)
582        return json, meta
583
584    # the same as Preview a Random Document on UI
585    def get_random_key(self, bucket):
586        api = self.baseUrl + 'pools/default/buckets/%s/localRandomKey' % (bucket)
587        status, content, header = self._http_request(api, headers=self._create_capi_headers())
588        json_parsed = json.loads(content)
589        if not status:
590            raise Exception("unable to get random document/key for bucket %s" % (bucket))
591        return json_parsed
592
593    def run_view(self, bucket, view, name):
594        api = self.capiBaseUrl + '/%s/_design/%s/_view/%s' % (bucket, view, name)
595        status, content, header = self._http_request(api, headers=self._create_capi_headers())
596        json_parsed = json.loads(content)
597        if not status:
598            raise Exception("unable to create view")
599        return json_parsed
600
601    def delete_view(self, bucket, view):
602        status, json = self._delete_design_doc(bucket, view)
603        if not status:
604            raise Exception("unable to delete the view")
605        return json
606
607    def spatial_results(self, bucket, spatial, params, limit=100):
608        status, json = self._index_results(bucket, "spatial", spatial,
609                                           params, limit)
610        if not status:
611            raise Exception("unable to obtain spatial view results")
612        return json
613
614    def create_spatial(self, bucket, spatial, function):
615        status, json = self._create_design_doc(bucket, spatial, function)
616        if status == False:
617            raise Exception("unable to create spatial view")
618        return json
619
620    def get_spatial(self, bucket, spatial):
621        status, json, meta = self._get_design_doc(bucket, spatial)
622        if not status:
623            raise Exception("unable to get the spatial view definition")
624        return json, meta
625
626    def delete_spatial(self, bucket, spatial):
627        status, json = self._delete_design_doc(bucket, spatial)
628        if not status:
629            raise Exception("unable to delete the spatial view")
630        return json
631
632    # type_ is "view" or "spatial"
633    def _index_results(self, bucket, type_, ddoc_name, params, limit, timeout=120,
634                       view_name=None):
635        if view_name is None:
636            view_name = ddoc_name
637        query = '/{0}/_design/{1}/_{2}/{3}'
638        api = self.capiBaseUrl + query.format(bucket, ddoc_name, type_, view_name)
639
640        num_params = 0
641        if limit != None:
642            num_params = 1
643            api += "?limit={0}".format(limit)
644        for param in params:
645            if num_params > 0:
646                api += "&"
647            else:
648                api += "?"
649            num_params += 1
650
651            if param in ["key", "startkey", "endkey", "start_range",
652                         "end_range"] or isinstance(params[param], bool):
653                api += "{0}={1}".format(param,
654                                        json.dumps(params[param],
655                                                   separators=(',', ':')))
656            else:
657                api += "{0}={1}".format(param, params[param])
658
659        log.info("index query url: {0}".format(api))
660        status, content, header = self._http_request(api, headers=self._create_capi_headers(), timeout=timeout)
661        json_parsed = json.loads(content)
662        return status, json_parsed
663
664    def get_couch_doc(self, doc_id, bucket="default", timeout=120):
665        """ use couchBase uri to retrieve document from a bucket """
666        api = self.capiBaseUrl + '/%s/%s' % (bucket, doc_id)
667        status, content, header = self._http_request(api, headers=self._create_capi_headers(),
668                                             timeout=timeout)
669        if not status:
670            raise ReadDocumentException(doc_id, content)
671        return  json.loads(content)
672
673    def _create_design_doc(self, bucket, name, function):
674        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
675        status, content, header = self._http_request(
676            api, 'PUT', function, headers=self._create_capi_headers())
677        json_parsed = json.loads(content)
678        return status, json_parsed
679
680    def _get_design_doc(self, bucket, name):
681        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
682        if isinstance(bucket, Bucket):
683            api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name)
684
685        status, content, header = self._http_request(api, headers=self._create_capi_headers())
686        json_parsed = json.loads(content)
687        meta_parsed = ""
688        if status:
689            # in dp4 builds meta data is in content, not in header
690            if 'x-couchbase-meta' in header:
691                meta = header['x-couchbase-meta']
692                meta_parsed = json.loads(meta)
693            else:
694                meta_parsed = {}
695                meta_parsed["_rev"] = json_parsed["_rev"]
696                meta_parsed["_id"] = json_parsed["_id"]
697        return status, json_parsed, meta_parsed
698
699    def _delete_design_doc(self, bucket, name):
700        status, design_doc, meta = self._get_design_doc(bucket, name)
701        if not status:
702            raise Exception("unable to find for deletion design document")
703        api = self.capiBaseUrl + '/%s/_design/%s' % (bucket, name)
704        if isinstance(bucket, Bucket):
705            api = self.capiBaseUrl + '/%s/_design/%s' % (bucket.name, name)
706        status, content, header = self._http_request(api, 'DELETE',
707                                                     headers=self._create_capi_headers())
708        json_parsed = json.loads(content)
709        return status, json_parsed
710
711    def spatial_compaction(self, bucket, design_name):
712        api = self.capiBaseUrl + '/%s/_design/%s/_spatial/_compact' % (bucket, design_name)
713        if isinstance(bucket, Bucket):
714            api = self.capiBaseUrl + \
715            '/%s/_design/%s/_spatial/_compact' % (bucket.name, design_name)
716
717        status, content, header = self._http_request(api, 'POST',
718                                                     headers=self._create_capi_headers())
719        json_parsed = json.loads(content)
720        return status, json_parsed
721
722    # Make a _design/_info request
723    def set_view_info(self, bucket, design_name):
724        """Get view diagnostic info (node specific)"""
725        api = self.capiBaseUrl
726        if isinstance(bucket, Bucket):
727            api += '/_set_view/{0}/_design/{1}/_info'.format(bucket.name, design_name)
728        else:
729            api += '_set_view/{0}/_design/{1}/_info'.format(bucket, design_name)
730
731        status, content, header = self._http_request(api, 'GET',
732                                                     headers=self._create_capi_headers())
733        if not status:
734            raise SetViewInfoNotFound(design_name, content)
735        json_parsed = json.loads(content)
736        return status, json_parsed
737
738    # Make a _spatial/_info request
739    def spatial_info(self, bucket, design_name):
740        api = self.capiBaseUrl + \
741            '/%s/_design/%s/_spatial/_info' % (bucket, design_name)
742        status, content, header = self._http_request(
743            api, 'GET', headers=self._create_capi_headers())
744        json_parsed = json.loads(content)
745        return status, json_parsed
746
747    def _create_capi_headers(self):
748        authorization = base64.encodestring('%s:%s' % (self.username, self.password))
749        return {'Content-Type': 'application/json',
750                'Authorization': 'Basic %s' % authorization,
751                'Accept': '*/*'}
752
753    def _create_capi_headers_with_auth(self, username, password):
754        authorization = base64.encodestring(
755            '%s:%s' % (username, password))
756        return {'Content-Type': 'application/json',
757                'Authorization': 'Basic %s' % authorization,
758                'Accept': '*/*'}
759
760    def _create_headers_with_auth(self, username, password):
761        authorization = base64.encodestring('%s:%s' % (username, password))
762        return {'Authorization': 'Basic %s' % authorization}
763
764    # authorization must be a base64 string of username:password
765    def _create_headers(self):
766        authorization = base64.encodestring('%s:%s' % (self.username, self.password))
767        return {'Content-Type': 'application/x-www-form-urlencoded',
768                'Authorization': 'Basic %s' % authorization,
769                'Accept': '*/*'}
770
771    # authorization must be a base64 string of username:password
772    def _create_headers_encoded_prepared(self):
773        authorization = base64.encodestring('%s:%s' % (self.username, self.password))
774        return {'Content-Type': 'application/json',
775                'Authorization': 'Basic %s' % authorization}
776
777    def _get_auth(self, headers):
778        key = 'Authorization'
779        if key in headers:
780            val = headers[key]
781            if val.startswith("Basic "):
782                return "auth: " + base64.decodestring(val[6:])
783        return ""
784
785    def _http_request(self, api, method='GET', params='', headers=None, timeout=120):
786        if not headers:
787            headers = self._create_headers()
788        end_time = time.time() + timeout
789        log.debug("Executing {0} request for following api {1} with Params: {2}  and Headers: {3}"\
790                                                                .format(method,api,params,headers))
791        count = 1
792        while True:
793            try:
794                response, content = httplib2.Http(timeout=timeout).request(api, method,
795                                                                           params, headers)
796                if response['status'] in ['200', '201', '202']:
797                    return True, content, response
798                else:
799                    try:
800                        json_parsed = json.loads(content)
801                    except ValueError as e:
802                        json_parsed = {}
803                        json_parsed["error"] = "status: {0}, content: {1}"\
804                                                           .format(response['status'], content)
805                    reason = "unknown"
806                    if "error" in json_parsed:
807                        reason = json_parsed["error"]
808                    message = '{0} {1} body: {2} headers: {3} error: {4} reason: {5} {6} {7}'.\
809                              format(method, api, params, headers, response['status'], reason,
810                                     content.rstrip('\n'), self._get_auth(headers))
811                    log.error(message)
812                    log.debug(''.join(traceback.format_stack()))
813                    return False, content, response
814            except socket.error as e:
815                if count < 4:
816                    log.error("socket error while connecting to {0} error {1} ".format(api, e))
817                if time.time() > end_time:
818                    log.error("Tried ta connect {0} times".format(count))
819                    raise ServerUnavailableException(ip=self.ip)
820            except httplib2.ServerNotFoundError as e:
821                if count < 4:
822                    log.error("ServerNotFoundError error while connecting to {0} error {1} "\
823                                                                              .format(api, e))
824                if time.time() > end_time:
825                    log.error("Tried ta connect {0} times".format(count))
826                    raise ServerUnavailableException(ip=self.ip)
827            time.sleep(3)
828            count += 1
829
830    def init_cluster(self, username='Administrator', password='password', port='8091'):
831        api = self.baseUrl + 'settings/web'
832        params = urllib.urlencode({'port': port,
833                                   'username': username,
834                                   'password': password})
835        log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, params))
836        status, content, header = self._http_request(api, 'POST', params)
837        return status
838
839    def init_node(self):
840        """ need a standalone method to initialize a node that could call
841            anywhere with quota from testconstant """
842        self.node_services = []
843        if self.services_node_init is None and self.services == "":
844            self.node_services = ["kv"]
845        elif self.services_node_init is None and self.services != "":
846            self.node_services = self.services.split(",")
847        elif self.services_node_init is not None:
848            self.node_services = self.services_node_init.split("-")
849        kv_quota = 0
850        while kv_quota == 0:
851            time.sleep(1)
852            kv_quota = int(self.get_nodes_self().mcdMemoryReserved)
853        info = self.get_nodes_self()
854        kv_quota = int(info.mcdMemoryReserved * 2 / 3)
855
856        cb_version = info.version[:5]
857        if cb_version in COUCHBASE_FROM_VERSION_4:
858            if "index" in self.node_services:
859                log.info("quota for index service will be %s MB" % (INDEX_QUOTA))
860                kv_quota -= INDEX_QUOTA
861                log.info("set index quota to node %s " % self.ip)
862                self.set_service_memoryQuota(service='indexMemoryQuota', memoryQuota=INDEX_QUOTA)
863            if "fts" in self.node_services:
864                log.info("quota for fts service will be %s MB" % (FTS_QUOTA))
865                kv_quota -= FTS_QUOTA
866                log.info("set both index and fts quota at node %s "% self.ip)
867                self.set_service_memoryQuota(service='ftsMemoryQuota', memoryQuota=FTS_QUOTA)
868            if "cbas" in self.node_services:
869                log.info("quota for cbas service will be %s MB" % (CBAS_QUOTA))
870                kv_quota -= CBAS_QUOTA
871                self.set_service_memoryQuota(service = "cbasMemoryQuota", memoryQuota=CBAS_QUOTA)
872            kv_quota -= 1
873            if kv_quota < MIN_KV_QUOTA:
874                    raise Exception("KV RAM needs to be more than %s MB"
875                            " at node  %s"  % (MIN_KV_QUOTA, self.ip))
876
877        log.info("quota for kv: %s MB" % kv_quota)
878        self.init_cluster_memoryQuota(self.username, self.password, kv_quota)
879        if cb_version in COUCHBASE_FROM_VERSION_4:
880            self.init_node_services(username=self.username, password=self.password,
881                                                       services=self.node_services)
882        self.init_cluster(username=self.username, password=self.password)
883
884    def init_node_services(self, username='Administrator', password='password', hostname='127.0.0.1', port='8091', services=None):
885        api = self.baseUrl + '/node/controller/setupServices'
886        if services == None:
887            log.info(" services are marked as None, will not work")
888            return False
889        if hostname == "127.0.0.1":
890            hostname = "{0}:{1}".format(hostname, port)
891        params = urllib.urlencode({ 'hostname': hostname,
892                                    'user': username,
893                                    'password': password,
894                                    'services': ",".join(services)})
895        log.info('/node/controller/setupServices params on {0}: {1}:{2}'.format(self.ip, self.port, params))
896        status, content, header = self._http_request(api, 'POST', params)
897        error_message = "cannot change node services after cluster is provisioned"
898        if not status and error_message in content:
899            status = True
900            log.info("This node is already provisioned with services, we do not consider this as failure for test case")
901        return status
902
903    def get_cluster_settings(self):
904        settings = {}
905        api = self.baseUrl + 'settings/web'
906        status, content, header = self._http_request(api, 'GET')
907        if status:
908            settings = json.loads(content)
909        log.info('settings/web params on {0}:{1}:{2}'.format(self.ip, self.port, settings))
910        return settings
911
912    def init_cluster_memoryQuota(self, username='Administrator',
913                                 password='password',
914                                 memoryQuota=256):
915        api = self.baseUrl + 'pools/default'
916        params = urllib.urlencode({'memoryQuota': memoryQuota})
917        log.info('pools/default params : {0}'.format(params))
918        status, content, header = self._http_request(api, 'POST', params)
919        return status
920
921    def set_service_memoryQuota(self, service, username='Administrator',
922                                 password='password',
923                                 memoryQuota=256):
924        ''' cbasMemoryQuota for cbas service.
925            ftsMemoryQuota for fts service.
926            indexMemoryQuota for index service.'''
927        api = self.baseUrl + 'pools/default'
928        params = urllib.urlencode({service: memoryQuota})
929        log.info('pools/default params : {0}'.format(params))
930        status, content, header = self._http_request(api, 'POST', params)
931        return status
932
933    def set_cluster_name(self, name):
934        api = self.baseUrl + 'pools/default'
935        if name is None:
936            name = ""
937        params = urllib.urlencode({'clusterName': name})
938        log.info('pools/default params : {0}'.format(params))
939        status, content, header = self._http_request(api, 'POST', params)
940        return status
941
942    def set_indexer_storage_mode(self, username='Administrator',
943                                 password='password',
944                                 storageMode='plasma'):
945        """
946           StorageMode could be plasma or memopt
947           From spock, we replace forestdb with plasma
948        """
949        api = self.baseUrl + 'settings/indexes'
950        params = urllib.urlencode({'storageMode': storageMode})
951        error_message = "storageMode must be one of plasma, memory_optimized"
952        log.info('settings/indexes params : {0}'.format(params))
953        status, content, header = self._http_request(api, 'POST', params)
954        if not status and error_message in content:
955            #TODO: Currently it just acknowledges if there is an error.
956            #And proceeds with further initialization.
957            log.info(content)
958        return status
959
960    def set_indexer_num_replica(self,
961                                 num_replica=0):
962        api = self.index_baseUrl + 'settings'
963        params = {'indexer.settings.num_replica': num_replica}
964        params = json.dumps(params)
965        status, content, header = self._http_request(api, 'POST',
966                                                     params=params,
967                                                     timeout=60)
968        error_message = ""
969        log.info('settings params : {0}'.format(params))
970        status, content, header = self._http_request(api, 'POST', params)
971        if not status and error_message in content:
972            # TODO: Currently it just acknowledges if there is an error.
973            # And proceeds with further initialization.
974            log.info(content)
975        return status
976
977    def cleanup_indexer_rebalance(self, server):
978        if server:
979            api = "http://{0}:{1}/".format(server.ip, self.index_port) + 'cleanupRebalance'
980        else:
981            api = self.baseUrl + 'cleanupRebalance'
982        status, content, _ = self._http_request(api, 'GET')
983        if status:
984            return content
985        else:
986            log.error("cleanupRebalance:{0},content:{1}".format(status, content))
987            raise Exception("indexer rebalance cleanup failed")
988
989    def list_indexer_rebalance_tokens(self, server):
990        if server:
991            api = "http://{0}:{1}/".format(server.ip, self.index_port) + 'listRebalanceTokens'
992        else:
993            api = self.baseUrl + 'listRebalanceTokens'
994        print api
995        status, content, _ = self._http_request(api, 'GET')
996        if status:
997            return content
998        else:
999            log.error("listRebalanceTokens:{0},content:{1}".format(status, content))
1000            raise Exception("list rebalance tokens failed")
1001
1002    def execute_statement_on_cbas(self, statement, mode, pretty=True,
1003                                  timeout=70, client_context_id=None,
1004                                  username=None, password=None):
1005        if not username:
1006            username = self.username
1007        if not password:
1008            password = self.password
1009        api = self.cbas_base_url + "/analytics/service"
1010        headers = self._create_capi_headers_with_auth(username, password)
1011
1012        params = {'statement': statement, 'mode': mode, 'pretty': pretty,
1013                  'client_context_id': client_context_id}
1014        params = json.dumps(params)
1015        status, content, header = self._http_request(api, 'POST',
1016                                                     headers=headers,
1017                                                     params=params,
1018                                                     timeout=timeout)
1019        if status:
1020            return content
1021        elif str(header['status']) == '503':
1022            log.info("Request Rejected")
1023            raise Exception("Request Rejected")
1024        elif str(header['status']) in ['500','400']:
1025            json_content = json.loads(content)
1026            msg = json_content['errors'][0]['msg']
1027            if "Job requirement" in  msg and "exceeds capacity" in msg:
1028                raise Exception("Capacity cannot meet job requirement")
1029            else:
1030                return content
1031        else:
1032            log.error("/analytics/service status:{0},content:{1}".format(
1033                status, content))
1034            raise Exception("Analytics Service API failed")
1035
1036    def delete_active_request_on_cbas(self, client_context_id, username=None, password=None):
1037        if not username:
1038            username = self.username
1039        if not password:
1040            password = self.password
1041
1042        api = self.cbas_base_url + "/analytics/admin/active_requests?client_context_id={0}".format(
1043            client_context_id)
1044        headers = self._create_capi_headers_with_auth(username, password)
1045
1046        status, content, header = self._http_request(api, 'DELETE',
1047                                                     headers=headers,
1048                                                     timeout=60)
1049        if status:
1050            return header['status']
1051        elif str(header['status']) == '404':
1052            log.info("Request Not Found")
1053            return header['status']
1054        else:
1055            log.error(
1056                "/analytics/admin/active_requests status:{0},content:{1}".format(
1057                    status, content))
1058            raise Exception("Analytics Admin API failed")
1059
1060    def get_cluster_ceritificate(self):
1061        api = self.baseUrl + 'pools/default/certificate'
1062        status, content, _ = self._http_request(api, 'GET')
1063        if status:
1064            return content
1065        else:
1066            log.error("/poos/default/certificate status:{0},content:{1}".format(status, content))
1067            raise Exception("certificate API failed")
1068
1069    def regenerate_cluster_certificate(self):
1070        api = self.baseUrl + 'controller/regenerateCertificate'
1071        status, content, _ = self._http_request(api, 'POST')
1072        if status:
1073            return content
1074        else:
1075            log.error("controller/regenerateCertificate status:{0},content:{1}".format(status, content))
1076            raise Exception("regenerateCertificate API failed")
1077
1078    def __remote_clusters(self, api, op, remoteIp, remotePort, username, password, name, demandEncryption=0,
1079                          certificate='', encryptionType="half"):
1080        param_map = {'hostname': "{0}:{1}".format(remoteIp, remotePort),
1081                        'username': username,
1082                        'password': password,
1083                        'name':name}
1084        from TestInput import TestInputServer
1085        remote = TestInputServer()
1086        remote.ip = remoteIp
1087        remote.rest_username = username
1088        remote.rest_password = password
1089        remote.port = remotePort
1090        if demandEncryption:
1091            param_map ['demandEncryption'] = 'on'
1092            if certificate != '':
1093                param_map['certificate'] = certificate
1094            if self.check_node_versions("5.5") and RestConnection(remote).check_node_versions("5.5"):
1095                # 5.5.0 and above
1096                param_map['secureType'] = encryptionType
1097            elif self.check_node_versions("5.0") and RestConnection(remote).check_node_versions("5.0"):
1098                param_map['encryptionType'] = encryptionType
1099        params = urllib.urlencode(param_map)
1100        status, content, _ = self._http_request(api, 'POST', params)
1101        # sample response :
1102        # [{"name":"two","uri":"/pools/default/remoteClusters/two","validateURI":"/pools/default/remoteClusters/two?just_validate=1","hostname":"127.0.0.1:9002","username":"Administrator"}]
1103        if status:
1104            remoteCluster = json.loads(content)
1105        else:
1106            log.error("/remoteCluster failed : status:{0},content:{1}".format(status, content))
1107            raise Exception("remoteCluster API '{0} remote cluster' failed".format(op))
1108        return remoteCluster
1109
1110    def add_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate='',
1111                           encryptionType="full"):
1112        # example : password:password username:Administrator hostname:127.0.0.1:9002 name:two
1113        msg = "adding remote cluster hostname:{0}:{1} with username:password {2}:{3} name:{4} to source node: {5}:{6}"
1114        log.info(msg.format(remoteIp, remotePort, username, password, name, self.ip, self.port))
1115        api = self.baseUrl + 'pools/default/remoteClusters'
1116        return self.__remote_clusters(api, 'add', remoteIp, remotePort, username, password, name, demandEncryption, certificate, encryptionType)
1117
1118    def add_remote_cluster_new(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate=''):
1119        # example : password:password username:Administrator hostname:127.0.0.1:9002 name:two
1120        msg = "adding remote cluster hostname:{0}:{1} with username:password {2}:{3} name:{4} to source node: {5}:{6}"
1121        log.info(msg.format(remoteIp, remotePort, username, password, name, self.ip, self.port))
1122        api = self.baseUrl + 'pools/default/remoteClusters'
1123        return self.__remote_clusters(api, 'add', remoteIp, remotePort, username, password, name, demandEncryption, certificate)
1124
1125    def modify_remote_cluster(self, remoteIp, remotePort, username, password, name, demandEncryption=0, certificate='', encryptionType="half"):
1126        log.info("modifying remote cluster name:{0}".format(name))
1127        api = self.baseUrl + 'pools/default/remoteClusters/' + urllib.quote(name)
1128        return self.__remote_clusters(api, 'modify', remoteIp, remotePort, username, password, name, demandEncryption, certificate, encryptionType)
1129
1130    def get_remote_clusters(self):
1131        remote_clusters = []
1132        api = self.baseUrl + 'pools/default/remoteClusters/'
1133        params = urllib.urlencode({})
1134        status, content, header = self._http_request(api, 'GET', params)
1135        if status:
1136            remote_clusters = json.loads(content)
1137        return remote_clusters
1138
1139    def remove_all_remote_clusters(self):
1140        remote_clusters = self.get_remote_clusters()
1141        for remote_cluster in remote_clusters:
1142            try:
1143                if remote_cluster["deleted"] == False:
1144                    self.remove_remote_cluster(remote_cluster["name"])
1145            except KeyError:
1146                # goxdcr cluster references will not contain "deleted" field
1147                self.remove_remote_cluster(remote_cluster["name"])
1148
1149    def remove_remote_cluster(self, name):
1150        # example : name:two
1151        msg = "removing remote cluster name:{0}".format(urllib.quote(name))
1152        log.info(msg)
1153        api = self.baseUrl + 'pools/default/remoteClusters/{0}?'.format(urllib.quote(name))
1154        params = urllib.urlencode({})
1155        status, content, header = self._http_request(api, 'DELETE', params)
1156        #sample response : "ok"
1157        if not status:
1158            log.error("failed to remove remote cluster: status:{0},content:{1}".format(status, content))
1159            raise Exception("remoteCluster API 'remove cluster' failed")
1160
1161    # replicationType:continuous toBucket:default toCluster:two fromBucket:default
1162    # defaults at https://github.com/couchbase/goxdcr/metadata/replication_settings.go#L20-L33
1163    def start_replication(self, replicationType, fromBucket, toCluster, rep_type="xmem", toBucket=None, xdcr_params={}):
1164        toBucket = toBucket or fromBucket
1165        msg = "starting {0} replication type:{1} from {2} to {3} in the remote" \
1166              " cluster {4} with settings {5}"
1167        log.info(msg.format(replicationType, rep_type, fromBucket, toBucket,
1168                            toCluster, xdcr_params))
1169        api = self.baseUrl + 'controller/createReplication'
1170        param_map = {'replicationType': replicationType,
1171                     'toBucket': toBucket,
1172                     'fromBucket': fromBucket,
1173                     'toCluster': toCluster,
1174                     'type': rep_type}
1175        param_map.update(xdcr_params)
1176        params = urllib.urlencode(param_map)
1177        status, content, _ = self._http_request(api, 'POST', params)
1178        # response : {"id": "replication_id"}
1179        if status:
1180            json_parsed = json.loads(content)
1181            log.info("Replication created with id: {0}".format(json_parsed['id']))
1182            return json_parsed['id']
1183        else:
1184            log.error("/controller/createReplication failed : status:{0},content:{1}".format(status, content))
1185            raise Exception("create replication failed : status:{0},content:{1}".format(status, content))
1186
1187    def get_replications(self):
1188        replications = []
1189        content = self.ns_server_tasks()
1190        for item in content:
1191            if not isinstance(item, dict):
1192                log.error("Unexpected error while retrieving pools/default/tasks : {0}".format(content))
1193                raise Exception("Unexpected error while retrieving pools/default/tasks : {0}".format(content))
1194            if item["type"] == "xdcr":
1195                replications.append(item)
1196        return replications
1197
1198    def remove_all_replications(self):
1199        replications = self.get_replications()
1200        for replication in replications:
1201            self.stop_replication(replication["cancelURI"])
1202
1203    def stop_replication(self, uri):
1204        log.info("Deleting replication {0}".format(uri))
1205        api = self.baseUrl[:-1] + uri
1206        status, content, _ = self._http_request(api, 'DELETE')
1207        if status:
1208            log.info("Replication deleted successfully")
1209        else:
1210            log.error("/controller/cancelXDCR failed: status:{0}, content:{1}".format(status, content))
1211            raise Exception("delete replication failed : status:{0}, content:{1}".format(status, content))
1212
1213    def remove_all_recoveries(self):
1214        recoveries = []
1215        content = self.ns_server_tasks()
1216        for item in content:
1217            if item["type"] == "recovery":
1218                recoveries.append(item)
1219        for recovery in recoveries:
1220            api = self.baseUrl + recovery["stopURI"]
1221            status, content, header = self._http_request(api, 'POST')
1222            if not status:
1223                raise CBRecoveryFailedException("impossible to stop cbrecovery by {0}".format(api))
1224            log.info("recovery stopped by {0}".format(api))
1225
1226    # params serverIp : the server to add to this cluster
1227    # raises exceptions when
1228    # unauthorized user
1229    # server unreachable
1230    # can't add the node to itself ( TODO )
1231    # server already added
1232    # returns otpNode
1233    def add_node(self, user='', password='', remoteIp='', port='8091', zone_name='', services=None):
1234        otpNode = None
1235
1236        # if ip format is ipv6 and enclosing brackets are not found,
1237        # enclose self.ip and remoteIp
1238        if self.ip.count(':') and self.ip[0] != '[':
1239            self.ip = '[' + self.ip + ']'
1240        if remoteIp.count(':') and remoteIp[0] != '[':
1241            remoteIp = '[' + remoteIp + ']'
1242
1243        log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\
1244                          .format(remoteIp, port, self.ip, self.port))
1245        if zone_name == '':
1246            api = self.baseUrl + 'controller/addNode'
1247        else:
1248            api = self.baseUrl + 'pools/default/serverGroups'
1249            if self.is_zone_exist(zone_name):
1250                zones = self.get_zone_names()
1251                api = "/".join((api, zones[zone_name], "addNode"))
1252                log.info("node {0} will be added to zone {1}".format(remoteIp, zone_name))
1253            else:
1254                raise Exception("There is not zone with name: %s in cluster" % zone_name)
1255
1256        params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
1257                                   'user': user,
1258                                   'password': password})
1259        if services != None:
1260            services = ','.join(services)
1261            params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
1262                                   'user': user,
1263                                   'password': password,
1264                                   'services': services})
1265        status, content, header = self._http_request(api, 'POST', params)
1266        if status:
1267            json_parsed = json.loads(content)
1268            otpNodeId = json_parsed['otpNode']
1269            otpNode = OtpNode(otpNodeId)
1270            if otpNode.ip == '127.0.0.1':
1271                otpNode.ip = self.ip
1272        else:
1273            self.print_UI_logs()
1274            try:
1275                # print logs from node that we want to add
1276                wanted_node = deepcopy(self)
1277                wanted_node.ip = remoteIp
1278                wanted_node.print_UI_logs()
1279            except Exception, ex:
1280                self.log(ex)
1281            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
1282                raise ServerAlreadyJoinedException(nodeIp=self.ip,
1283                                                   remoteIp=remoteIp)
1284            elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0:
1285                raise ServerSelfJoinException(nodeIp=self.ip,
1286                                          remoteIp=remoteIp)
1287            else:
1288                log.error('add_node error : {0}'.format(content))
1289                raise AddNodeException(nodeIp=self.ip,
1290                                          remoteIp=remoteIp,
1291                                          reason=content)
1292        return otpNode
1293
1294        # params serverIp : the server to add to this cluster
1295    # raises exceptions when
1296    # unauthorized user
1297    # server unreachable
1298    # can't add the node to itself ( TODO )
1299    # server already added
1300    # returns otpNode
1301    def do_join_cluster(self, user='', password='', remoteIp='', port='8091', zone_name='', services=None):
1302        otpNode = None
1303        log.info('adding remote node @{0}:{1} to this cluster @{2}:{3}'\
1304                          .format(remoteIp, port, self.ip, self.port))
1305        api = self.baseUrl + '/node/controller/doJoinCluster'
1306        params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
1307                                   'user': user,
1308                                   'password': password})
1309        if services != None:
1310            services = ','.join(services)
1311            params = urllib.urlencode({'hostname': "{0}:{1}".format(remoteIp, port),
1312                                   'user': user,
1313                                   'password': password,
1314                                   'services': services})
1315        status, content, header = self._http_request(api, 'POST', params)
1316        if status:
1317            json_parsed = json.loads(content)
1318            otpNodeId = json_parsed['otpNode']
1319            otpNode = OtpNode(otpNodeId)
1320            if otpNode.ip == '127.0.0.1':
1321                otpNode.ip = self.ip
1322        else:
1323            self.print_UI_logs()
1324            try:
1325                # print logs from node that we want to add
1326                wanted_node = deepcopy(self)
1327                wanted_node.ip = remoteIp
1328                wanted_node.print_UI_logs()
1329            except Exception, ex:
1330                self.log(ex)
1331            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
1332                raise ServerAlreadyJoinedException(nodeIp=self.ip,
1333                                                   remoteIp=remoteIp)
1334            elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0:
1335                raise ServerSelfJoinException(nodeIp=self.ip,
1336                                          remoteIp=remoteIp)
1337            else:
1338                log.error('add_node error : {0}'.format(content))
1339                raise AddNodeException(nodeIp=self.ip,
1340                                          remoteIp=remoteIp,
1341                                          reason=content)
1342        return otpNode
1343
1344
1345    def eject_node(self, user='', password='', otpNode=None):
1346        if not otpNode:
1347            log.error('otpNode parameter required')
1348            return False
1349        api = self.baseUrl + 'controller/ejectNode'
1350        params = urllib.urlencode({'otpNode': otpNode,
1351                                   'user': user,
1352                                   'password': password})
1353        status, content, header = self._http_request(api, 'POST', params)
1354        if status:
1355            log.info('ejectNode successful')
1356        else:
1357            if content.find('Prepare join failed. Node is already part of cluster') >= 0:
1358                raise ServerAlreadyJoinedException(nodeIp=self.ip,
1359                                                   remoteIp=otpNode)
1360            else:
1361                # TODO : raise an exception here
1362                log.error('eject_node error {0}'.format(content))
1363        return True
1364
1365    def force_eject_node(self):
1366        self.diag_eval("gen_server:cast(ns_cluster, leave).")
1367        self.check_delay_restart_coucbase_server()
1368
1369    """ when we do reset couchbase server by force reject, couchbase server will not
1370        down right away but delay few seconds to be down depend on server spec.
1371        This fx will detect that delay and return true when couchbase server down and
1372        up again after force reject """
1373    def check_delay_restart_coucbase_server(self):
1374        api = self.baseUrl + 'nodes/self'
1375        headers = self._create_headers()
1376        break_out = 0
1377        count_cbserver_up = 0
1378        while break_out < 60 and count_cbserver_up < 2:
1379            try:
1380                response, content = httplib2.Http(timeout=120).request(api, 'GET', '', headers)
1381                if response['status'] in ['200', '201', '202'] and count_cbserver_up == 0:
1382                    log.info("couchbase server is up but down soon.")
1383                    time.sleep(1)
1384                    break_out += 1  # time needed for couchbase server reload after reset config
1385                elif response['status'] in ['200', '201', '202']:
1386                    count_cbserver_up = 2
1387                    log.info("couchbase server is up again")
1388            except socket.error as e:
1389                log.info("couchbase server is down.  Waiting for couchbase server up")
1390                time.sleep(2)
1391                break_out += 1
1392                count_cbserver_up = 1
1393                pass
1394        if break_out >= 60:
1395            raise Exception("Couchbase server did not start after 60 seconds")
1396
1397    def fail_over(self, otpNode=None, graceful=False):
1398        if otpNode is None:
1399            log.error('otpNode parameter required')
1400            return False
1401        api = self.baseUrl + 'controller/failOver'
1402        if graceful:
1403            api = self.baseUrl + 'controller/startGracefulFailover'
1404        params = urllib.urlencode({'otpNode': otpNode})
1405        status, content, header = self._http_request(api, 'POST', params)
1406        if status:
1407            log.info('fail_over node {0} successful'.format(otpNode))
1408        else:
1409            log.error('fail_over node {0} error : {1}'.format(otpNode, content))
1410            raise FailoverFailedException(content)
1411        return status
1412
1413    def set_recovery_type(self, otpNode=None, recoveryType=None):
1414        log.info("Going to set recoveryType={0} for node :: {1}".format(recoveryType, otpNode))
1415        if otpNode == None:
1416            log.error('otpNode parameter required')
1417            return False
1418        if recoveryType == None:
1419            log.error('recoveryType is not set')
1420            return False
1421        api = self.baseUrl + 'controller/setRecoveryType'
1422        params = urllib.urlencode({'otpNode': otpNode,
1423                                   'recoveryType': recoveryType})
1424        status, content, header = self._http_request(api, 'POST', params)
1425        if status:
1426            log.info('recoveryType for node {0} set successful'.format(otpNode))
1427        else:
1428            log.error('recoveryType node {0} not set with error : {1}'.format(otpNode, content))
1429            raise SetRecoveryTypeFailed(content)
1430        return status
1431
1432    def add_back_node(self, otpNode=None):
1433        if otpNode is None:
1434            log.error('otpNode parameter required')
1435            return False
1436        api = self.baseUrl + 'controller/reAddNode'
1437        params = urllib.urlencode({'otpNode': otpNode})
1438        status, content, header = self._http_request(api, 'POST', params)
1439        if status:
1440            log.info('add_back_node {0} successful'.format(otpNode))
1441        else:
1442            log.error('add_back_node {0} error : {1}'.format(otpNode, content))
1443            raise InvalidArgumentException('controller/reAddNode',
1444                                           parameters=params)
1445        return status
1446
1447    def rebalance(self, otpNodes=[], ejectedNodes=[], deltaRecoveryBuckets=None):
1448        knownNodes = ','.join(otpNodes)
1449        ejectedNodesString = ','.join(ejectedNodes)
1450        if deltaRecoveryBuckets == None:
1451            params = {'knownNodes': knownNodes,
1452                                    'ejectedNodes': ejectedNodesString,
1453                                    'user': self.username,
1454                                    'password': self.password}
1455        else:
1456            deltaRecoveryBuckets = ",".join(deltaRecoveryBuckets)
1457            params = {'knownNodes': knownNodes,
1458                      'ejectedNodes': ejectedNodesString,
1459                      'deltaRecoveryBuckets': deltaRecoveryBuckets,
1460                      'user': self.username,
1461                      'password': self.password}
1462        log.info('rebalance params : {0}'.format(params))
1463        params = urllib.urlencode(params)
1464        api = self.baseUrl + "controller/rebalance"
1465        status, content, header = self._http_request(api, 'POST', params)
1466        if status:
1467            log.info('rebalance operation started')
1468        else:
1469            log.error('rebalance operation failed: {0}'.format(content))
1470            # extract the error
1471            raise InvalidArgumentException('controller/rebalance with error message {0}'.format(content),
1472                                           parameters=params)
1473        return status
1474
1475    def diag_eval(self, code, print_log=True):
1476        api = '{0}{1}'.format(self.baseUrl, 'diag/eval/')
1477        status, content, header = self._http_request(api, "POST", code)
1478        if print_log:
1479            log.info("/diag/eval status on {0}:{1}: {2} content: {3} command: {4}".
1480                     format(self.ip, self.port, status, content, code))
1481        return status, content
1482
1483    def set_chk_max_items(self, max_items):
1484        status, content = self.diag_eval("ns_config:set(chk_max_items, " + str(max_items) + ")")
1485        return status, content
1486
1487    def set_chk_period(self, period):
1488        status, content = self.diag_eval("ns_config:set(chk_period, " + str(period) + ")")
1489        return status, content
1490
1491    def set_enable_flow_control(self, flow=True, bucket='default'):
1492        flow_control = "false"
1493        if flow:
1494           flow_control = "true"
1495        code = "ns_bucket:update_bucket_props(\"" + bucket + "\", [{extra_config_string, \"upr_enable_flow_control=" + flow_control + "\"}])"
1496        status, content = self.diag_eval(code)
1497        return status, content
1498
1499    def diag_master_events(self):
1500        api = '{0}{1}'.format(self.baseUrl, 'diag/masterEvents?o=1')
1501        status, content, header = self._http_request(api, "GET")
1502        log.info("diag/masterEvents?o=1 status: {0} content: {1}".format(status, content))
1503        return status, content
1504
1505
1506    def get_admin_credentials(self):
1507
1508        code = 'ns_config:search_node_prop(node(), ns_config:latest(), memcached, admin_user)'
1509        status, id = self.diag_eval(code)
1510
1511        code = 'ns_config:search_node_prop(node(), ns_config:latest(), memcached, admin_pass)'
1512        status, password = self.diag_eval(code)
1513        return id.strip('"'), password.strip('"')
1514
1515    def monitorRebalance(self, stop_if_loop=True):
1516        start = time.time()
1517        progress = 0
1518        retry = 0
1519        same_progress_count = 0
1520        previous_progress = 0
1521        while progress != -1 and (progress != 100 or \
1522                    self._rebalance_progress_status() == 'running') and retry < 20:
1523            # -1 is error , -100 means could not retrieve progress
1524            progress = self._rebalance_progress()
1525            if progress == -100:
1526                log.error("unable to retrieve rebalanceProgress.try again in 1 second")
1527                retry += 1
1528            else:
1529                retry = 0
1530            if stop_if_loop:
1531                # reset same_progress_count if get a different result,
1532                # or progress is still O
1533                # (it may take a long time until the results are different from 0)
1534                if previous_progress != progress or progress == 0:
1535                    previous_progress = progress
1536                    same_progress_count = 0
1537                else:
1538                    same_progress_count += 1
1539                if same_progress_count > 50:
1540                    log.error("apparently rebalance progress code in infinite loop:"
1541                                                             " {0}".format(progress))
1542                    return False
1543            # sleep 10 seconds to printout less log
1544            time.sleep(10)
1545        if progress < 0:
1546            log.error("rebalance progress code : {0}".format(progress))
1547            return False
1548        else:
1549            duration = time.time() - start
1550            if duration > 10:
1551                sleep = 10
1552            else:
1553                sleep = duration
1554            log.info('rebalance progress took {:.02f} seconds '.format(duration))
1555            log.info("sleep for {0} seconds after rebalance...".format(sleep))
1556            time.sleep(sleep)
1557            return True
1558
1559    def _rebalance_progress_status(self):
1560        api = self.baseUrl + "pools/default/rebalanceProgress"
1561        status, content, header = self._http_request(api)
1562        json_parsed = json.loads(content)
1563        if status:
1564            if "status" in json_parsed:
1565                return json_parsed['status']
1566        else:
1567            return None
1568
1569    def _rebalance_status_and_progress(self):
1570        """
1571        Returns a 2-tuple capturing the rebalance status and progress, as follows:
1572            ('running', progress) - if rebalance is running
1573            ('none', 100)         - if rebalance is not running (i.e. assumed done)
1574            (None, -100)          - if there's an error getting the rebalance progress
1575                                    from the server
1576            (None, -1)            - if the server responds but there's no information on
1577                                    what the status of rebalance is
1578
1579        The progress is computed as a average of the progress of each node
1580        rounded to 2 decimal places.
1581
1582        Throws RebalanceFailedException if rebalance progress returns an error message
1583        """
1584        avg_percentage = -1
1585        rebalance_status = None
1586        api = self.baseUrl + "pools/default/rebalanceProgress"
1587        try:
1588            status, content, header = self._http_request(api)
1589        except ServerUnavailableException as e:
1590            log.error(e)
1591            return None, -100
1592        json_parsed = json.loads(content)
1593        if status:
1594            if "status" in json_parsed:
1595                rebalance_status = json_parsed["status"]
1596                if "errorMessage" in json_parsed:
1597                    msg = '{0} - rebalance failed'.format(json_parsed)
1598                    log.error(msg)
1599                    self.print_UI_logs()
1600                    raise RebalanceFailedException(msg)
1601                elif rebalance_status == "running":
1602                    total_percentage = 0
1603                    count = 0
1604                    for key in json_parsed:
1605                        if key.find('@') >= 0:
1606                            ns_1_dictionary = json_parsed[key]
1607                            percentage = ns_1_dictionary['progress'] * 100
1608                            count += 1
1609                            total_percentage += percentage
1610                    if count:
1611                        avg_percentage = (total_percentage / count)
1612                    else:
1613                        avg_percentage = 0
1614                    log.info('rebalance percentage : {0:.02f} %'.
1615                             format(round(avg_percentage, 2)))
1616                else:
1617                    avg_percentage = 100
1618        else:
1619            avg_percentage = -100
1620        return rebalance_status, avg_percentage
1621
1622    def _rebalance_progress(self):
1623        return self._rebalance_status_and_progress()[1]
1624
1625    def log_client_error(self, post):
1626        api = self.baseUrl + 'logClientError'
1627        status, content, header = self._http_request(api, 'POST', post)
1628        if not status:
1629            log.error('unable to logClientError')
1630        return status,content,header
1631
1632    def trigger_index_compaction(self, timeout=120):
1633        node = None
1634        api = self.index_baseUrl + 'triggerCompaction'
1635        status, content, header = self._http_request(api, timeout=timeout)
1636        if not status:
1637            raise Exception(content)
1638
1639    def set_index_settings(self, setting_json, timeout=120):
1640        api = self.index_baseUrl + 'settings'
1641        status, content, header = self._http_request(api, 'POST', json.dumps(setting_json))
1642        if not status:
1643            raise Exception(content)
1644        log.info("{0} set".format(setting_json))
1645
1646    def set_index_settings_internal(self, setting_json, timeout=120):
1647        api = self.index_baseUrl + 'internal/settings'
1648        status, content, header = self._http_request(api, 'POST',
1649                                                     json.dumps(setting_json))
1650        if not status:
1651	    if header['status']=='404':
1652                log.info("This endpoint is introduced only in 5.5.0, hence not found. Redirecting the request to the old endpoint")
1653                self.set_index_settings(setting_json,timeout)
1654            else:
1655                raise Exception(content)
1656        log.info("{0} set".format(setting_json))
1657
1658    def get_index_settings(self, timeout=120):
1659        node = None
1660        api = self.index_baseUrl + 'settings'
1661        status, content, header = self._http_request(api, timeout=timeout)
1662        if not status:
1663            raise Exception(content)
1664        return json.loads(content)
1665
1666    def set_index_planner_settings(self, setting, timeout=120):
1667        api = self.index_baseUrl + 'settings/planner?{0}'.format(setting)
1668        status, content, header = self._http_request(api, timeout=timeout)
1669        if not status:
1670            raise Exception(content)
1671        return json.loads(content)
1672
1673    def get_index_stats(self, timeout=120, index_map=None):
1674        api = self.index_baseUrl + 'stats'
1675        status, content, header = self._http_request(api, timeout=timeout)
1676        if status:
1677            json_parsed = json.loads(content)
1678            index_map = RestParser().parse_index_stats_response(json_parsed, index_map=index_map)
1679        return index_map
1680
1681    def get_index_storage_stats(self, timeout=120, index_map=None):
1682        api = self.index_baseUrl + 'stats/storage'
1683        status, content, header = self._http_request(api, timeout=timeout)
1684        if not status:
1685            raise Exception(content)
1686        json_parsed = json.loads(content)
1687        index_storage_stats = {}
1688        for index_stats in json_parsed:
1689            bucket = index_stats["Index"].split(":")[0]
1690            index_name = index_stats["Index"].split(":")[1]
1691            if not bucket in index_storage_stats.keys():
1692                index_storage_stats[bucket] = {}
1693            index_storage_stats[bucket][index_name] = index_stats["Stats"]
1694        return index_storage_stats
1695
1696    def get_indexer_stats(self, timeout=120, index_map=None):
1697        api = self.index_baseUrl + 'stats'
1698        index_map = {}
1699        status, content, header = self._http_request(api, timeout=timeout)
1700        if status:
1701            json_parsed = json.loads(content)
1702            for key in json_parsed.keys():
1703                tokens = key.split(":")
1704                val = json_parsed[key]
1705                if len(tokens) == 1:
1706                    field = tokens[0]
1707                    index_map[field] = val
1708        return index_map
1709
1710    def get_indexer_metadata(self, timeout=120, index_map=None):
1711        api = self.index_baseUrl + 'getIndexStatus'
1712        index_map = {}
1713        status, content, header = self._http_request(api, timeout=timeout)
1714        if status:
1715            json_parsed = json.loads(content)
1716            for key in json_parsed.keys():
1717                tokens = key.split(":")
1718                val = json_parsed[key]
1719                if len(tokens) == 1:
1720                    field = tokens[0]
1721                    index_map[field] = val
1722        return index_map
1723
1724    def get_indexer_internal_stats(self, timeout=120, index_map=None):
1725        api = self.index_baseUrl + 'settings?internal=ok'
1726        index_map = {}
1727        status, content, header = self._http_request(api, timeout=timeout)
1728        if status:
1729            json_parsed = json.loads(content)
1730            for key in json_parsed.keys():
1731                tokens = key.split(":")
1732                val = json_parsed[key]
1733                if len(tokens) == 1:
1734                    field = tokens[0]
1735                    index_map[field] = val
1736        return index_map
1737
1738    def get_index_status(self, timeout=120, index_map=None):
1739        api = self.baseUrl + 'indexStatus'
1740        index_map = {}
1741        status, content, header = self._http_request(api, timeout=timeout)
1742        if status:
1743            json_parsed = json.loads(content)
1744            index_map = RestParser().parse_index_status_response(json_parsed)
1745        return index_map
1746
1747    def get_index_id_map(self, timeout=120):
1748        api = self.baseUrl + 'indexStatus'
1749        index_map = {}
1750        status, content, header = self._http_request(api, timeout=timeout)
1751        if status:
1752            json_parsed = json.loads(content)
1753            for map in json_parsed["indexes"]:
1754                bucket_name = map['bucket'].encode('ascii', 'ignore')
1755                if bucket_name not in index_map.keys():
1756                    index_map[bucket_name] = {}
1757                index_name = map['index'].encode('ascii', 'ignore')
1758                index_map[bucket_name][index_name] = {}
1759                index_map[bucket_name][index_name]['id'] = map['id']
1760        return index_map
1761    # returns node data for this host
1762    def get_nodes_self(self, timeout=120):
1763        node = None
1764        api = self.baseUrl + 'nodes/self'
1765        status, content, header = self._http_request(api, timeout=timeout)
1766        if status:
1767            json_parsed = json.loads(content)
1768            node = RestParser().parse_get_nodes_response(json_parsed)
1769        return node
1770
1771    def node_statuses(self, timeout=120):
1772        nodes = []
1773        api = self.baseUrl + 'nodeStatuses'
1774        status, content, header = self._http_request(api, timeout=timeout)
1775        json_parsed = json.loads(content)
1776        if status:
1777            for key in json_parsed:
1778                # each key contain node info
1779                value = json_parsed[key]
1780                # get otp,get status
1781                node = OtpNode(id=value['otpNode'],
1782                               status=value['status'])
1783                if node.ip == '127.0.0.1':
1784                    node.ip = self.ip
1785                node.port = int(key[key.rfind(":") + 1:])
1786                node.replication = value['replication']
1787                if 'gracefulFailoverPossible' in value.keys():
1788                    node.gracefulFailoverPossible = value['gracefulFailoverPossible']
1789                else:
1790                    node.gracefulFailoverPossible = False
1791                nodes.append(node)
1792        return nodes
1793
1794    def cluster_status(self):
1795        parsed = {}
1796        api = self.baseUrl + 'pools/default'
1797        status, content, header = self._http_request(api)
1798        if status:
1799            parsed = json.loads(content)
1800        return parsed
1801
1802    def fetch_vbucket_map(self, bucket="default"):
1803        """Return vbucket map for bucket
1804        Keyword argument:
1805        bucket -- bucket name
1806        """
1807        api = self.baseUrl + 'pools/default/buckets/' + bucket
1808        status, content, header = self._http_request(api)
1809        _stats = json.loads(content)
1810        return _stats['vBucketServerMap']['vBucketMap']
1811
1812    def get_vbucket_map_and_server_list(self, bucket="default"):
1813        """ Return server list, replica and vbuckets map
1814        that matches to server list """
1815        vbucket_map = self.fetch_vbucket_map(bucket)
1816        api = self.baseUrl + 'pools/default/buckets/' + bucket
1817        status, content, header = self._http_request(api)
1818        _stats = json.loads(content)
1819        num_replica = _stats['vBucketServerMap']['numReplicas']
1820        vbucket_map = _stats['vBucketServerMap']['vBucketMap']
1821        servers = _stats['vBucketServerMap']['serverList']
1822        server_list = []
1823        for node in servers:
1824            node = node.split(":")
1825            server_list.append(node[0])
1826        return vbucket_map, server_list, num_replica
1827
1828    def get_pools_info(self):
1829        parsed = {}
1830        api = self.baseUrl + 'pools'
1831        status, content, header = self._http_request(api)
1832        json_parsed = json.loads(content)
1833        if status:
1834            parsed = json_parsed
1835        return parsed
1836
1837    def get_pools_default(self, query='', timeout=30):
1838        parsed = {}
1839        api = self.baseUrl + 'pools/default'
1840        if query:
1841            api += "?" + query
1842
1843        status, content, header = self._http_request(api, timeout=timeout)
1844        json_parsed = json.loads(content)
1845        if status:
1846            parsed = json_parsed
1847        return parsed
1848
1849    def get_pools(self):
1850        version = None
1851        api = self.baseUrl + 'pools'
1852        status, content, header = self._http_request(api)
1853        json_parsed = json.loads(content)
1854        if status:
1855            version = MembaseServerVersion(json_parsed['implementationVersion'], json_parsed['componentsVersion'])
1856        return version
1857
1858    def get_buckets(self):
1859        # get all the buckets
1860        buckets = []
1861        api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets?basic_stats=true')
1862        status, content, header = self._http_request(api)
1863        json_parsed = json.loads(content)
1864        if status:
1865            for item in json_parsed:
1866                bucketInfo = RestParser().parse_get_bucket_json(item)
1867                buckets.append(bucketInfo)
1868        return buckets
1869
1870    def get_buckets_itemCount(self):
1871        # get all the buckets
1872        bucket_map = {}
1873        api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets?basic_stats=true')
1874        status, content, header = self._http_request(api)
1875        json_parsed = json.loads(content)
1876        if status:
1877            for item in json_parsed:
1878                bucketInfo = RestParser().parse_get_bucket_json(item)
1879                bucket_map[bucketInfo.name] = bucketInfo.stats.itemCount
1880        log.info(bucket_map)
1881        return bucket_map
1882
1883    def get_bucket_stats_for_node(self, bucket='default', node=None):
1884        if not node:
1885            log.error('node_ip not specified')
1886            return None
1887        stats = {}
1888        api = "{0}{1}{2}{3}{4}:{5}{6}".format(self.baseUrl, 'pools/default/buckets/',
1889                                     bucket, "/nodes/", node.ip, node.port, "/stats")
1890        status, content, header = self._http_request(api)
1891        if status:
1892            json_parsed = json.loads(content)
1893            op = json_parsed["op"]
1894            samples = op["samples"]
1895            for stat_name in samples:
1896                if stat_name not in stats:
1897                    if len(samples[stat_name]) == 0:
1898                        stats[stat_name] = []
1899                    else:
1900                        stats[stat_name] = samples[stat_name][-1]
1901                else:
1902                    raise Exception("Duplicate entry in the stats command {0}".format(stat_name))
1903        return stats
1904
1905    def get_bucket_status(self, bucket):
1906        if not bucket:
1907            log.error("Bucket Name not Specified")
1908            return None
1909        api = self.baseUrl + 'pools/default/buckets'
1910        status, content, header = self._http_request(api)
1911        if status:
1912            json_parsed = json.loads(content)
1913            for item in json_parsed:
1914                if item["name"] == bucket:
1915                    return item["nodes"][0]["status"]
1916            log.error("Bucket {} doesn't exist".format(bucket))
1917            return None
1918
1919    def get_cluster_stats(self):
1920        """
1921        Reads cluster nodes statistics using `pools/default` rest GET method
1922        :return stat_dict - Dictionary of CPU & Memory status each cluster node:
1923        """
1924        stat_dict = dict()
1925        json_output = self.get_pools_default()
1926        if 'nodes' in json_output:
1927            for node_stat in json_output['nodes']:
1928                stat_dict[node_stat['hostname']] = dict()
1929                stat_dict[node_stat['hostname']]['services'] = node_stat['services']
1930                stat_dict[node_stat['hostname']]['cpu_utilization'] = node_stat['systemStats']['cpu_utilization_rate']
1931                stat_dict[node_stat['hostname']]['mem_free'] = node_stat['systemStats']['mem_free']
1932                stat_dict[node_stat['hostname']]['mem_total'] = node_stat['systemStats']['mem_total']
1933                stat_dict[node_stat['hostname']]['swap_mem_used'] = node_stat['systemStats']['swap_used']
1934                stat_dict[node_stat['hostname']]['swap_mem_total'] = node_stat['systemStats']['swap_total']
1935        return stat_dict
1936
1937    def fetch_bucket_stats(self, bucket='default', zoom='minute'):
1938        """Return deserialized buckets stats.
1939        Keyword argument:
1940        bucket -- bucket name
1941        zoom -- stats zoom level (minute | hour | day | week | month | year)
1942        """
1943        api = self.baseUrl + 'pools/default/buckets/{0}/stats?zoom={1}'.format(bucket, zoom)
1944        log.info(api)
1945        status, content, header = self._http_request(api)
1946        return json.loads(content)
1947
1948    def set_query_index_api_mode(self, index_api_mode=3):
1949        api = self.query_baseUrl + 'admin/settings'
1950        query_api_setting = {"max-index-api": index_api_mode}
1951        status, content, header = self._http_request(api, 'POST', json.dumps(query_api_setting))
1952        if not status:
1953            raise Exception(content)
1954        log.info("{0} set".format(query_api_setting))
1955
1956    def fetch_bucket_xdcr_stats(self, bucket='default', zoom='minute'):
1957        """Return deserialized bucket xdcr stats.
1958        Keyword argument:
1959        bucket -- bucket name
1960        zoom -- stats zoom level (minute | hour | day | week | month | year)
1961        """
1962        api = self.baseUrl + 'pools/default/buckets/@xdcr-{0}/stats?zoom={1}'.format(bucket, zoom)
1963        status, content, header = self._http_request(api)
1964        return json.loads(content)
1965
1966    def fetch_system_stats(self):
1967        """Return deserialized system stats."""
1968        api = self.baseUrl + 'pools/default/'
1969        status, content, header = self._http_request(api)
1970        return json.loads(content)
1971
1972    def get_xdc_queue_size(self, bucket):
1973        """Fetch bucket stats and return the latest value of XDC replication
1974        queue size"""
1975        bucket_stats = self.fetch_bucket_xdcr_stats(bucket)
1976        return bucket_stats['op']['samples']['replication_changes_left'][-1]
1977
1978    def get_dcp_queue_size(self, bucket):
1979        """Fetch bucket stats and return the latest value of DCP
1980        queue size"""
1981        bucket_stats = self.fetch_bucket_stats(bucket)
1982        return bucket_stats['op']['samples']['ep_dcp_xdcr_items_remaining'][-1]
1983
1984    def get_active_key_count(self, bucket):
1985        """Fetch bucket stats and return the bucket's curr_items count"""
1986        bucket_stats = self.fetch_bucket_stats(bucket)
1987        return bucket_stats['op']['samples']['curr_items'][-1]
1988
1989    def get_replica_key_count(self, bucket):
1990        """Fetch bucket stats and return the bucket's replica count"""
1991        bucket_stats = self.fetch_bucket_stats(bucket)
1992        return bucket_stats['op']['samples']['vb_replica_curr_items'][-1]
1993
1994    def get_nodes(self):
1995        nodes = []
1996        api = self.baseUrl + 'pools/default'
1997        status, content, header = self._http_request(api)
1998        count = 0
1999        while not content and count < 7:
2000            log.info("sleep 5 seconds and retry")
2001            time.sleep(5)
2002            status, content, header = self._http_request(api)
2003            count += 1
2004        if count == 7:
2005            raise Exception("could not get node info after 30 seconds")
2006        json_parsed = json.loads(content)
2007        if status:
2008            if "nodes" in json_parsed:
2009                for json_node in json_parsed["nodes"]:
2010                    node = RestParser().parse_get_nodes_response(json_node)
2011                    node.rest_username = self.username
2012                    node.rest_password = self.password
2013                    if node.ip == "127.0.0.1":
2014                        node.ip = self.ip
2015                    # Only add nodes which are active on cluster
2016                    if node.clusterMembership == 'active':
2017                        nodes.append(node)
2018                    else:
2019                        log.info("Node {0} not part of cluster {1}".format(node.ip, node.clusterMembership))
2020        return nodes
2021
2022    # this method returns the number of node in cluster
2023    def get_cluster_size(self):
2024        nodes = self.get_nodes()
2025        node_ip = []
2026        for node in nodes:
2027            node_ip.append(node.ip)
2028        log.info("Number of node(s) in cluster is {0} node(s)".format(len(node_ip)))
2029        return len(node_ip)
2030
2031    """ this medthod return version on node that is not initialized yet """
2032    def get_nodes_version(self):
2033        node = self.get_nodes_self()
2034        version = node.version
2035        log.info("Node version in cluster {0}".format(version))
2036        return version
2037
2038    # this method returns the versions of nodes in cluster
2039    def get_nodes_versions(self, logging=True):
2040        nodes = self.get_nodes()
2041        versions = []
2042        for node in nodes:
2043            versions.append(node.version)
2044        if logging:
2045            log.info("Node versions in cluster {0}".format(versions))
2046        return versions
2047
2048    def check_cluster_compatibility(self, version):
2049        """
2050        Check if all nodes in cluster are of versions equal or above the version required.
2051        :param version: Version to check the cluster compatibility for. Should be of format major_ver.minor_ver.
2052                        For example: 5.0, 4.5, 5.1
2053        :return: True if cluster is compatible with the version specified, False otherwise. Return None if cluster is
2054        uninitialized.
2055        """
2056        nodes = self.get_nodes()
2057        if not nodes:
2058            # If nodes returned is None, it means that the cluster is not initialized yet and hence cluster
2059            # compatibility cannot be found. Return None
2060            return None
2061        major_ver, minor_ver = version.split(".")
2062        compatibility = int(major_ver) * 65536 + int(minor_ver)
2063        is_compatible = True
2064        for node in nodes:
2065            clusterCompatibility = int(node.clusterCompatibility)
2066            if clusterCompatibility < compatibility:
2067                is_compatible = False
2068        return is_compatible
2069
2070
2071    # this method returns the services of nodes in cluster - implemented for Sherlock
2072    def get_nodes_services(self):
2073        nodes = self.get_nodes()
2074        map = {}
2075        for node in nodes:
2076            key = "{0}:{1}".format(node.ip, node.port)
2077            map[key] = node.services
2078        return map
2079
2080    # Check node version
2081    def check_node_versions(self, check_version="4.0"):
2082        versions = self.get_nodes_versions()
2083        if versions[0] < check_version:
2084            return False
2085        return True
2086
2087    def get_bucket_stats(self, bucket='default'):
2088        stats = {}
2089        status, json_parsed = self.get_bucket_stats_json(bucket)
2090        if status:
2091            op = json_parsed["op"]
2092            samples = op["samples"]
2093            for stat_name in samples:
2094                if samples[stat_name]:
2095                    last_sample = len(samples[stat_name]) - 1
2096                    if last_sample:
2097                        stats[stat_name] = samples[stat_name][last_sample]
2098        return stats
2099
2100    def get_fts_stats(self, index_name, bucket_name, stat_name):
2101        """
2102        List of fts stats available as of 03/16/2017 -
2103        default:default_idx3:avg_queries_latency: 0,
2104        default:default_idx3:batch_merge_count: 0,
2105        default:default_idx3:doc_count: 0,
2106        default:default_idx3:iterator_next_count: 0,
2107        default:default_idx3:iterator_seek_count: 0,
2108        default:default_idx3:num_bytes_live_data: 0,
2109        default:default_idx3:num_bytes_used_disk: 0,
2110        default:default_idx3:num_mutations_to_index: 0,
2111        default:default_idx3:num_pindexes: 0,
2112        default:default_idx3:num_pindexes_actual: 0,
2113        default:default_idx3:num_pindexes_target: 0,
2114        default:default_idx3:num_recs_to_persist: 0,
2115        default:default_idx3:reader_get_count: 0,
2116        default:default_idx3:reader_multi_get_count: 0,
2117        default:default_idx3:reader_prefix_iterator_count: 0,
2118        default:default_idx3:reader_range_iterator_count: 0,
2119        default:default_idx3:timer_batch_store_count: 0,
2120        default:default_idx3:timer_data_delete_count: 0,
2121        default:default_idx3:timer_data_update_count: 0,
2122        default:default_idx3:timer_opaque_get_count: 0,
2123        default:default_idx3:timer_opaque_set_count: 0,
2124        default:default_idx3:timer_rollback_count: 0,
2125        default:default_idx3:timer_snapshot_start_count: 0,
2126        default:default_idx3:total_bytes_indexed: 0,
2127        default:default_idx3:total_bytes_query_results: 0,
2128        default:default_idx3:total_compactions: 0,
2129        default:default_idx3:total_queries: 0,
2130        default:default_idx3:total_queries_error: 0,
2131        default:default_idx3:total_queries_slow: 0,
2132        default:default_idx3:total_queries_timeout: 0,
2133        default:default_idx3:total_request_time: 0,
2134        default:default_idx3:total_term_searchers: 0,
2135        default:default_idx3:writer_execute_batch_count: 0,
2136        :param index_name: name of the index
2137        :param bucket_name: source bucket
2138        :param stat_name: any of the above
2139        :return:
2140        """
2141        api = "{0}{1}".format(self.fts_baseUrl, 'api/nsstats')
2142        attempts = 0
2143        while attempts < 5:
2144            status, content, header = self._http_request(api)
2145            json_parsed = json.loads(content)
2146            key = bucket_name+':'+index_name+':'+stat_name
2147            if key in json_parsed:
2148                return status, json_parsed[key]
2149            attempts += 1
2150            log.info("Stat {0} not available yet".format(stat_name))
2151            time.sleep(1)
2152        log.error("ERROR: Stat {0} error on {1} on bucket {2}: {3}".
2153                  format(stat_name, index_name, bucket_name, e))
2154
2155    def get_bucket_status(self, bucket):
2156        if not bucket:
2157            log.error("Bucket Name not Specified")
2158            return None
2159        api = self.baseUrl + 'pools/default/buckets'
2160        status, content, header = self._http_request(api)
2161        if status:
2162            json_parsed = json.loads(content)
2163            for item in json_parsed:
2164                if item["name"] == bucket:
2165                    return item["nodes"][0]["status"]
2166            log.error("Bucket {0} doesn't exist".format(bucket))
2167            return None
2168
2169    def get_bucket_stats_json(self, bucket='default'):
2170        stats = {}
2171        api = "{0}{1}{2}{3}".format(self.baseUrl, 'pools/default/buckets/', bucket, "/stats")
2172        if isinstance(bucket, Bucket):
2173            api = '{0}{1}{2}{3}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name, "/stats")
2174        status, content, header = self._http_request(api)
2175        json_parsed = json.loads(content)
2176        return status, json_parsed
2177
2178    def get_bucket_json(self, bucket='default'):
2179        api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket)
2180        if isinstance(bucket, Bucket):
2181            api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name)
2182        status, content, header = self._http_request(api)
2183        if not status:
2184            raise GetBucketInfoFailed(bucket, content)
2185        return json.loads(content)
2186
2187    def get_bucket_maxTTL(self, bucket='default'):
2188        bucket_info = self.get_bucket_json(bucket=bucket)
2189        return bucket_info['maxTTL']
2190
2191    def get_bucket_compressionMode(self, bucket='default'):
2192        bucket_info = self.get_bucket_json(bucket=bucket)
2193        info = self.get_nodes_self()
2194        if 5.5 > float(info.version[:3]):
2195            bucket_info['compressionMode'] = "off"
2196        return bucket_info['compressionMode']
2197
2198    def is_lww_enabled(self, bucket='default'):
2199        bucket_info = self.get_bucket_json(bucket=bucket)
2200        try:
2201            if bucket_info['conflictResolutionType'] == 'lww':
2202                return True
2203        except KeyError:
2204            return False
2205
2206    def get_bucket(self, bucket='default', num_attempt=1, timeout=1):
2207        bucketInfo = None
2208        api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket)
2209        if isinstance(bucket, Bucket):
2210            api = '%s%s%s?basic_stats=true' % (self.baseUrl, 'pools/default/buckets/', bucket.name)
2211        status, content, header = self._http_request(api)
2212        num = 1
2213        while not status and num_attempt > num:
2214            log.error("try to get {0} again after {1} sec".format(api, timeout))
2215            time.sleep(timeout)
2216            status, content, header = self._http_request(api)
2217            num += 1
2218        if status:
2219            bucketInfo = RestParser().parse_get_bucket_response(content)
2220        return bucketInfo
2221
2222    def get_vbuckets(self, bucket='default'):
2223        b = self.get_bucket(bucket)
2224        return None if not b else b.vbuckets
2225
2226    def delete_bucket(self, bucket='default'):
2227        api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket)
2228        if isinstance(bucket, Bucket):
2229            api = '%s%s%s' % (self.baseUrl, 'pools/default/buckets/', bucket.name)
2230        status, content, header = self._http_request(api, 'DELETE')
2231
2232        if int(header['status']) == 500:
2233            # According to http://docs.couchbase.com/couchbase-manual-2.5/cb-rest-api/#deleting-buckets
2234            # the cluster will return with 500 if it failed to nuke
2235            # the bucket on all of the nodes within 30 secs
2236            log.warn("Bucket deletion timed out waiting for all nodes")
2237
2238        return status
2239
2240    '''Load any of the three sample buckets'''
2241    def load_sample(self,sample_name):
2242        api = '{0}{1}'.format(self.baseUrl, "sampleBuckets/install")
2243        data = '["{0}"]'.format(sample_name)
2244        status, content, header = self._http_request(api, 'POST', data)
2245        # Sleep to allow the sample bucket to be loaded
2246        time.sleep(10)
2247        return status
2248
2249    # figure out the proxy port
2250    def create_bucket(self, bucket='',
2251                      ramQuotaMB=1,
2252                      authType='none',
2253                      saslPassword='',
2254                      replicaNumber=1,
2255                      proxyPort=11211,
2256                      bucketType='membase',
2257                      replica_index=1,
2258                      threadsNumber=3,
2259                      flushEnabled=1,
2260                      evictionPolicy='valueOnly',
2261                      lww=False,
2262                      maxTTL=None,
2263                      compressionMode='passive'):
2264
2265
2266        api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets')
2267        params = urllib.urlencode({})
2268
2269
2270
2271        # this only works for default bucket ?
2272        if bucket == 'default':
2273            init_params = {'name': bucket,
2274                           'authType': 'sasl',
2275                           'saslPassword': saslPassword,
2276                           'ramQuotaMB': ramQuotaMB,
2277                           'replicaNumber': replicaNumber,
2278                           #'proxyPort': proxyPort,
2279                           'bucketType': bucketType,
2280                           'replicaIndex': replica_index,
2281                           'threadsNumber': threadsNumber,
2282                           'flushEnabled': flushEnabled,
2283                           'evictionPolicy': evictionPolicy}
2284
2285        elif authType == 'none':
2286            init_params = {'name': bucket,
2287                           'ramQuotaMB': ramQuotaMB,
2288                           'authType': authType,
2289                           'replicaNumber': replicaNumber,
2290                           #'proxyPort': proxyPort,
2291                           'bucketType': bucketType,
2292                           'replicaIndex': replica_index,
2293                           'threadsNumber': threadsNumber,
2294                           'flushEnabled': flushEnabled,
2295                           'evictionPolicy': evictionPolicy}
2296        elif authType == 'sasl':
2297            init_params = {'name': bucket,
2298                           'ramQuotaMB': ramQuotaMB,
2299                           'authType': authType,
2300                           'saslPassword': saslPassword,
2301                           'replicaNumber': replicaNumber,
2302                           #'proxyPort': self.get_nodes_self().moxi,
2303                           'bucketType': bucketType,
2304                           'replicaIndex': replica_index,
2305                           'threadsNumber': threadsNumber,
2306                           'flushEnabled': flushEnabled,
2307                           'evictionPolicy': evictionPolicy}
2308        if lww:
2309            init_params['conflictResolutionType'] = 'lww'
2310
2311        if maxTTL:
2312            init_params['maxTTL'] = maxTTL
2313
2314        if compressionMode and self.is_enterprise_edition():
2315            init_params['compressionMode'] = compressionMode
2316
2317        if bucketType == 'ephemeral':
2318            del init_params['replicaIndex']     # does not apply to ephemeral buckets, and is even rejected
2319
2320        pre_spock = not self.check_cluster_compatibility("5.0")
2321        if pre_spock:
2322            init_params['proxyPort'] = proxyPort
2323
2324        params = urllib.urlencode(init_params)
2325
2326        log.info("{0} with param: {1}".format(api, params))
2327        create_start_time = time.time()
2328
2329        maxwait = 60
2330        for numsleep in range(maxwait):
2331            status, content, header = self._http_request(api, 'POST', params)
2332            if status:
2333                break
2334            elif (int(header['status']) == 503 and
2335                    '{"_":"Bucket with given name still exists"}' in content):
2336                log.info("The bucket still exists, sleep 1 sec and retry")
2337                time.sleep(1)
2338            else:
2339                raise BucketCreationException(ip=self.ip, bucket_name=bucket)
2340
2341        if (numsleep + 1) == maxwait:
2342            log.error("Tried to create the bucket for {0} secs.. giving up".
2343                      format(maxwait))
2344            raise BucketCreationException(ip=self.ip, bucket_name=bucket)
2345
2346
2347
2348
2349        create_time = time.time() - create_start_time
2350        log.info("{0:.02f} seconds to create bucket {1}".
2351                 format(round(create_time, 2), bucket))
2352        return status
2353
2354    def change_bucket_props(self, bucket,
2355                      ramQuotaMB=None,
2356                      authType=None,
2357                      saslPassword=None,
2358                      replicaNumber=None,
2359                      proxyPort=None,
2360                      replicaIndex=None,
2361                      flushEnabled=None,
2362                      timeSynchronization=None,
2363                      maxTTL=None,
2364                      compressionMode=None):
2365        api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket)
2366        if isinstance(bucket, Bucket):
2367            api = '{0}{1}{2}'.format(self.baseUrl, 'pools/default/buckets/', bucket.name)
2368        params = urllib.urlencode({})
2369        params_dict = {}
2370        existing_bucket = self.get_bucket_json(bucket)
2371        if ramQuotaMB:
2372            params_dict["ramQuotaMB"] = ramQuotaMB
2373        if authType:
2374            params_dict["authType"] = authType
2375        if saslPassword:
2376            params_dict["authType"] = "sasl"
2377            params_dict["saslPassword"] = saslPassword
2378        if replicaNumber:
2379            params_dict["replicaNumber"] = replicaNumber
2380        #if proxyPort:
2381        #    params_dict["proxyPort"] = proxyPort
2382        if replicaIndex:
2383            params_dict["replicaIndex"] = replicaIndex
2384        if flushEnabled:
2385            params_dict["flushEnabled"] = flushEnabled
2386        if timeSynchronization:
2387            params_dict["timeSynchronization"] = timeSynchronization
2388        if maxTTL:
2389            params_dict["maxTTL"] = maxTTL
2390        if compressionMode and self.is_enterprise_edition():
2391            params_dict["compressionMode"] = compressionMode
2392
2393        params = urllib.urlencode(params_dict)
2394
2395        log.info("%s with param: %s" % (api, params))
2396        status, content, header = self._http_request(api, 'POST', params)
2397        if timeSynchronization:
2398            if status:
2399                raise Exception("Erroneously able to set bucket settings %s for bucket on time-sync" % (params, bucket))
2400            return status, content
2401        if not status:
2402            raise Exception("Unable to set bucket settings %s for bucket" % (params, bucket))
2403        log.info("bucket %s updated" % bucket)
2404        return status
2405
2406    # return AutoFailoverSettings
2407    def get_autofailover_settings(self):
2408        settings = None
2409        api = self.baseUrl + 'settings/autoFailover'
2410        status, content, header = self._http_request(api)
2411        json_parsed = json.loads(content)
2412        if status:
2413            settings = AutoFailoverSettings()
2414            settings.enabled = json_parsed["enabled"]
2415            settings.count = json_parsed["count"]
2416            settings.timeout = json_parsed["timeout"]
2417        return settings
2418
2419    def update_autofailover_settings(self, enabled, timeout):
2420        if enabled:
2421            params = urllib.urlencode({'enabled': 'true',
2422                                       'timeout': timeout})
2423        else:
2424            params = urllib.urlencode({'enabled': 'false',
2425                                       'timeout': timeout})
2426        api = self.baseUrl + 'settings/autoFailover'
2427        log.info('settings/autoFailover params : {0}'.format(params))
2428        status, content, header = self._http_request(api, 'POST', params)
2429        if not status:
2430            log.error('''failed to change autofailover_settings!
2431                         See MB-7282. Workaround:
2432                         wget --user=Administrator --password=asdasd --post-data='rpc:call(mb_master:master_node(), erlang, apply ,[fun () -> erlang:exit(erlang:whereis(mb_master), kill) end, []]).' http://localhost:8091/diag/eval''')
2433        return status
2434
2435    # return AutoReprovisionSettings
2436    def get_autoreprovision_settings(self):
2437        settings = None
2438        api = self.baseUrl + 'settings/autoReprovision'
2439        status, content, header = self._http_request(api)
2440        json_parsed = json.loads(content)
2441        if status:
2442            settings = AutoReprovisionSettings()
2443            settings.enabled = json_parsed["enabled"]
2444            settings.count = json_parsed["count"]
2445            settings.max_nodes = json_parsed["max_nodes"]
2446        return settings
2447
2448    def update_autoreprovision_settings(self, enabled, maxNodes=1):
2449        if enabled:
2450            params = urllib.urlencode({'enabled': 'true',
2451                                       'maxNodes': maxNodes})
2452        else:
2453            params = urllib.urlencode({'enabled': 'false',
2454                                       'maxNodes': maxNodes})
2455        api = self.baseUrl + 'settings/autoReprovision'
2456        log.info('settings/autoReprovision params : {0}'.format(params))
2457        status, content, header = self._http_request(api, 'POST', params)
2458        if not status:
2459            log.error('failed to change autoReprovision_settings!')
2460        return status
2461
2462    def reset_autofailover(self):
2463        api = self.baseUrl + 'settings/autoFailover/resetCount'
2464        status, content, header = self._http_request(api, 'POST', '')
2465        return status
2466
2467    def reset_autoreprovision(self):
2468        api = self.baseUrl + 'settings/autoReprovision/resetCount'
2469        status, content, header = self._http_request(api, 'POST', '')
2470        return status
2471
2472    def set_alerts_settings(self, recipients, sender, email_username, email_password, email_host='localhost', email_port=25, email_encrypt='false', alerts='auto_failover_node,auto_failover_maximum_reached'):
2473        api = self.baseUrl + 'settings/alerts'
2474        params = urllib.urlencode({'enabled': 'true',
2475                                   'recipients': recipients,
2476                                   'sender': sender,
2477                                   'emailUser': email_username,
2478                                   'emailPass': email_password,
2479                                   'emailHost': email_host,
2480                                   'emailPort': email_port,
2481                                   'emailEncrypt': email_encrypt,
2482                                   'alerts': alerts})
2483        log.info('settings/alerts params : {0}'.format(params))
2484        status, content, header = self._http_request(api, 'POST', params)
2485        return status
2486
2487    def get_alerts_settings(self):
2488        api = self.baseUrl + 'settings/alerts'
2489        status, content, header = self._http_request(api)
2490        json_parsed = json.loads(content)
2491        if not status:
2492            raise Exception("unable to get autofailover alerts settings")
2493        return json_parsed
2494
2495    def disable_alerts(self):
2496        api = self.baseUrl + 'settings/alerts'
2497        params = urllib.urlencode({'enabled': 'false'})
2498        log.info('settings/alerts params : {0}'.format(params))
2499        status, content, header = self._http_request(api, 'POST', params)
2500        return status
2501
2502    def set_cas_drift_threshold(self, bucket, ahead_threshold_in_millisecond, behind_threshold_in_millisecond):
2503
2504        api = self.baseUrl + 'pools/default/buckets/{0}'. format( bucket )
2505        params_dict ={'driftAheadThresholdMs': ahead_threshold_in_millisecond,
2506                      'driftBehindThresholdMs': behind_threshold_in_millisecond}
2507        params = urllib.urlencode(params_dict)
2508        log.info("%s with param: %s" % (api, params))
2509        status, content, header = self._http_request(api, 'POST', params)
2510        return status
2511
2512    def stop_rebalance(self, wait_timeout=10):
2513        api = self.baseUrl + '/controller/stopRebalance'
2514        status, content, header = self._http_request(api, 'POST')
2515        if status:
2516            for i in xrange(wait_timeout):
2517                if self._rebalance_progress_status() == 'running':
2518                    log.warn("rebalance is not stopped yet after {0} sec".format(i + 1))
2519                    time.sleep(1)
2520                    status = False
2521                else:
2522                    log.info("rebalance was stopped")
2523                    status = True
2524                    break
2525        else:
2526            log.error("Rebalance is not stopped due to {0}".format(content))
2527        return status
2528
2529    def set_data_path(self, data_path=None, index_path=None):
2530        api = self.baseUrl + '/nodes/self/controller/settings'
2531        paths = {}
2532        if data_path:
2533            paths['path'] = data_path
2534        if index_path:
2535            paths['index_path'] = index_path
2536        if paths:
2537            params = urllib.urlencode(paths)
2538            log.info('/nodes/self/controller/settings params : {0}'.format(params))
2539            status, content, header = self._http_request(api, 'POST', params)
2540            if status:
2541                log.info("Setting data_path: {0}: status {1}".format(data_path, status))
2542            else:
2543                log.error("Unable to set data_path {0} : {1}".format(data_path, content))
2544            return status
2545
2546    def get_database_disk_size(self, bucket='default'):
2547        api = self.baseUrl + "pools/{0}/buckets".format(bucket)
2548        status, content, header = self._http_request(api)
2549        json_parsed = json.loads(content)
2550        # disk_size in MB
2551        disk_size = (json_parsed[0]["basicStats"]["diskUsed"]) / (1024 * 1024)
2552        return status, disk_size
2553
2554    def ddoc_compaction(self, design_doc_id, bucket="default"):
2555        api = self.baseUrl + "pools/default/buckets/%s/ddocs/%s/controller/compactView" % \
2556            (bucket, design_doc_id)
2557        status, content, header = self._http_request(api, 'POST')
2558        if not status:
2559            raise CompactViewFailed(design_doc_id, content)
2560        log.info("compaction for ddoc '%s' was triggered" % design_doc_id)
2561
2562    def check_compaction_status(self, bucket_name):
2563        tasks = self.active_tasks()
2564        if "error" in tasks:
2565            raise Exception(tasks)
2566        for task in tasks:
2567            log.info("Task is {0}".format(task))
2568            if task["type"] == "bucket_compaction":
2569                if task["bucket"] == bucket_name:
2570                    return True, task["progress"]
2571        return False, None
2572
2573    def change_memcached_t_option(self, value):
2574        cmd = '[ns_config:update_key({node, N, memcached}, fun (PList)' + \
2575              ' -> lists:keystore(verbosity, 1, PList, {verbosity, \'-t ' + str(value) + '\'}) end)' + \
2576              ' || N <- ns_node_disco:nodes_wanted()].'
2577        return self.diag_eval(cmd)
2578
2579    def set_ensure_full_commit(self, value):
2580        """Dynamic settings changes"""
2581        # the boolean paramter is used to turn on/off ensure_full_commit(). In XDCR,
2582        # issuing checkpoint in this function is expensive and not necessary in some
2583        # test, turning off this function would speed up some test. The default value
2584        # is ON.
2585        cmd = 'ns_config:set(ensure_full_commit_enabled, {0}).'.format(value)
2586        return self.diag_eval(cmd)
2587
2588    def get_internalSettings(self, param):
2589            """allows to get internalSettings values for:
2590            indexAwareRebalanceDisabled, rebalanceIndexWaitingDisabled,
2591            rebalanceIndexPausingDisabled, maxParallelIndexers,
2592            maxParallelReplicaIndexers, maxBucketCount"""
2593            api = self.baseUrl + "internalSettings"
2594            status, content, header = self._http_request(api)
2595            json_parsed = json.loads(content)
2596            param = json_parsed[param]
2597            return param
2598
2599    def set_internalSetting(self, param, value):
2600        "Set any internal setting"
2601        api = self.baseUrl + "internalSettings"
2602
2603        if isinstance(value, bool):
2604            value = str(value).lower()
2605
2606        params = urllib.urlencode({param : value})
2607        status, content, header = self._http_request(api, "POST", params)
2608        log.info('Update internal setting {0}={1}'.format(param, value))
2609        return status
2610
2611    def get_replication_for_buckets(self, src_bucket_name, dest_bucket_name):
2612        replications = self.get_replications()
2613        for replication in replications:
2614            if src_bucket_name in replication['source'] and \
2615                replication['target'].endswith(dest_bucket_name):
2616                return replication
2617        raise XDCRException("Replication with Src bucket: {0} and Target bucket: {1} not found".
2618                        format(src_bucket_name, dest_bucket_name))
2619
2620    """ By default, these are the global replication settings -
2621        { optimisticReplicationThreshold:256,
2622        workerBatchSize:500,
2623        failureRestartInterval:1,
2624        docBatchSizeKb":2048,
2625        checkpointInterval":1800,
2626        maxConcurrentReps":32}
2627        You can override these using set_xdcr_param()
2628    """
2629    def set_xdcr_param(self, src_bucket_name,
2630                                         dest_bucket_name, param, value):
2631        replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name)
2632        api = self.baseUrl[:-1] + replication['settingsURI']
2633        value = str(value).lower()
2634        params = urllib.urlencode({param: value})
2635        status, _, _ = self._http_request(api, "POST", params)
2636        if not status:
2637            raise XDCRException("Unable to set replication setting {0}={1} on bucket {2} on node {3}".
2638                            format(param, value, src_bucket_name, self.ip))
2639        log.info("Updated {0}={1} on bucket'{2}' on {3}".format(param, value, src_bucket_name, self.ip))
2640
2641    # Gets per-replication setting value
2642    def get_xdcr_param(self, src_bucket_name,
2643                                    dest_bucket_name, param):
2644        replication = self.get_replication_for_buckets(src_bucket_name, dest_bucket_name)
2645        api = self.baseUrl[:-1] + replication['settingsURI']
2646        status, content, _ = self._http_request(api)
2647        if not status:
2648            raise XDCRException("Unable to get replication setting {0} on bucket {1} on node {2}".
2649                      format(param, src_bucket_name, self.ip))
2650        json_parsed = json.loads(content)
2651        # when per-replication settings match global(internal) settings,
2652        # the param is not returned by rest API
2653        # in such cases, return internalSetting value for the param
2654        try:
2655            return json_parsed[param]
2656        except KeyError:
2657            if param == 'pauseRequested':
2658                return False
2659            else:
2660                param = 'xdcr' + param[0].upper() + param[1:]
2661                log.info("Trying to fetch xdcr param:{0} from global settings".
2662                         format(param))
2663                return self.get_internalSettings(param)
2664
2665    # Returns a boolean value on whether replication
2666    def is_replication_paused(self, src_bucket_name, dest_bucket_name):
2667        return self.get_xdcr_param(src_bucket_name, dest_bucket_name, 'pauseRequested')
2668
2669    def is_replication_paused_by_id(self, repl_id):
2670        repl_id = repl_id.replace('/','%2F')
2671        api = self.baseUrl + 'settings/replications/' + repl_id
2672        status, content, header = self._http_request(api)
2673        if not status:
2674            raise XDCRException("Unable to retrieve pause resume status for replication {0}".
2675                                format(repl_id))
2676        repl_stats = json.loads(content)
2677        return repl_stats['pauseRequested']
2678
2679    def pause_resume_repl_by_id(self, repl_id, param, value):
2680        repl_id = repl_id.replace('/','%2F')
2681        api = self.baseUrl + 'settings/replications/' + repl_id
2682        params = urllib.urlencode({param: value})
2683        status, _, _ = self._http_request(api, "POST", params)
2684        if not status:
2685            raise XDCRException("Unable to update {0}={1} setting for replication {2}".
2686                            format(param, value, repl_id))
2687        log.info("Updated {0}={1} on {2}".format(param, value, repl_id))
2688
2689    def get_recent_xdcr_vb_ckpt(self, repl_id):
2690        command = 'ns_server_testrunner_api:grab_all_goxdcr_checkpoints().'
2691        status, content = self.diag_eval(command)
2692        if not status:
2693            raise Exception("Unable to get recent XDCR checkpoint information")
2694        repl_ckpt_list = json.loads(content)
2695        # a single decoding will only return checkpoint record as string
2696        # convert string to dict using json
2697        chkpt_doc_string = repl_ckpt_list['/ckpt/%s/0' % repl_id].replace('"', '\"')
2698        chkpt_dict = json.loads(chkpt_doc_string)
2699        return chkpt_dict['checkpoints'][0]
2700
2701    """ Start of FTS rest apis"""
2702
2703    def set_fts_ram_quota(self, value):
2704        """set fts ram quota"""
2705        api = self.baseUrl + "pools/default"
2706        params = urllib.urlencode({"ftsMemoryQuota": value})
2707        status, content, _ = self._http_request(api, "POST", params)
2708        if status:
2709            log.info("SUCCESS: FTS RAM quota set to {0}mb".format(value))
2710        else:
2711            raise Exception("Error setting fts ram quota: {0}".format(content))
2712        return status
2713
2714
2715    def create_fts_index(self, index_name, params):
2716        """create or edit fts index , returns {"status":"ok"} on success"""
2717        api = self.fts_baseUrl + "api/index/{0}".format(index_name)
2718        log.info(json.dumps(params))
2719        status, content, header = self._http_request(api,
2720                                    'PUT',
2721                                    json.dumps(params, ensure_ascii=False),
2722                                    headers=self._create_capi_headers(),
2723                                    timeout=30)
2724        if status:
2725            log.info("Index {0} created".format(index_name))
2726        else:
2727            raise Exception("Error creating index: {0}".format(content))
2728        return status
2729
2730    def update_fts_index(self, index_name, index_def):
2731        api = self.fts_baseUrl + "api/index/{0}".format(index_name)
2732        log.info(json.dumps(index_def, indent=3))
2733        status, content, header = self._http_request(api,
2734                                    'PUT',
2735                                    json.dumps(index_def, ensure_ascii=False),
2736                                    headers=self._create_capi_headers(),
2737                                    timeout=30)
2738        if status:
2739            log.info("Index/alias {0} updated".format(index_name))
2740        else:
2741            raise Exception("Error updating index: {0}".format(content))
2742        return status
2743
2744    def get_fts_index_definition(self, name, timeout=30):
2745        """ get fts index/alias definition """
2746        json_parsed = {}
2747        api = self.fts_baseUrl + "api/index/{0}".format(name)
2748        status, content, header = self._http_request(
2749            api,
2750            headers=self._create_capi_headers(),
2751            timeout=timeout)
2752        if status:
2753            json_parsed = json.loads(content)
2754        return status, json_parsed
2755
2756    def get_fts_index_doc_count(self, name, timeout=30):
2757        """ get number of docs indexed"""
2758        json_parsed = {}
2759        api = self.fts_baseUrl + "api/index/{0}/count".format(name)
2760        status, content, header = self._http_request(
2761            api,
2762            headers=self._create_capi_headers(),
2763            timeout=timeout)
2764        if status:
2765            json_parsed = json.loads(content)
2766        return json_parsed['count']
2767
2768    def get_fts_index_uuid(self, name, timeout=30):
2769        """ Returns uuid of index/alias """
2770        json_parsed = {}
2771        api = self.fts_baseUrl + "api/index/{0}/".format(name)
2772        status, content, header = self._http_request(
2773            api,
2774            headers=self._create_capi_headers(),
2775            timeout=timeout)
2776        if status:
2777            json_parsed = json.loads(content)
2778        return json_parsed['indexDef']['uuid']
2779
2780    def delete_fts_index(self, name):
2781        """ delete fts index/alias """
2782        api = self.fts_baseUrl + "api/index/{0}".format(name)
2783        status, content, header = self._http_request(
2784            api,
2785            'DELETE',
2786            headers=self._create_capi_headers())
2787        return status
2788
2789    def stop_fts_index_update(self, name):
2790        """ method to stop fts index from updating"""
2791        api = self.fts_baseUrl + "api/index/{0}/ingestControl/pause".format(name)
2792        status, content, header = self._http_request(
2793            api,
2794            'POST',
2795            '',
2796            headers=self._create_capi_headers())
2797        return status
2798
2799    def freeze_fts_index_partitions(self, name):
2800        """ method to freeze index partitions asignment"""
2801        api = self.fts_baseUrl+ "api/index/{0}/planFreezeControl".format(name)
2802        status, content, header = self._http_request(
2803            api,
2804            'POST',
2805            '',
2806            headers=self._create_capi_headers())
2807        return status
2808
2809    def disable_querying_on_fts_index(self, name):
2810        """ method to disable querying on index"""
2811        api = self.fts_baseUrl + "api/index/{0}/queryControl/disallow".format(name)
2812        status, content, header = self._http_request(
2813            api,
2814            'POST',
2815            '',
2816            headers=self._create_capi_headers())
2817        return status
2818
2819    def enable_querying_on_fts_index(self, name):
2820        """ method to enable querying on index"""
2821        api = self.fts_baseUrl + "api/index/{0}/queryControl/allow".format(name)
2822        status, content, header = self._http_request(
2823            api,
2824            'POST',
2825            '',
2826            headers=self._create_capi_headers())
2827        return status
2828
2829    def run_fts_query(self, index_name, query_json, timeout=70):
2830        """Method run an FTS query through rest api"""
2831        api = self.fts_baseUrl + "api/index/{0}/query".format(index_name)
2832        headers = self._create_capi_headers()
2833        status, content, header = self._http_request(
2834            api,
2835            "POST",
2836            json.dumps(query_json, ensure_ascii=False).encode('utf8'),
2837            headers,
2838            timeout=timeout)
2839
2840        if status:
2841            content = json.loads(content)
2842            return content['total_hits'], content['hits'], content['took'], \
2843                   content['status']
2844
2845    def run_fts_query_with_facets(self, index_name, query_json):
2846        """Method run an FTS query through rest api"""
2847        api = self.fts_baseUrl + "api/index/{0}/query".format(index_name)
2848        headers = self._create_capi_headers()
2849        status, content, header = self._http_request(
2850            api,
2851            "POST",
2852            json.dumps(query_json, ensure_ascii=False).encode('utf8'),
2853            headers,
2854            timeout=70)
2855
2856        if status:
2857            content = json.loads(content)
2858            return content['total_hits'], content['hits'], content['took'], \
2859                   content['status'], content['facets']
2860
2861
2862    """ End of FTS rest APIs """
2863
2864    def set_reb_cons_view(self, disable):
2865        """Enable/disable consistent view for rebalance tasks"""
2866        api = self.baseUrl + "internalSettings"
2867        params = {"indexAwareRebalanceDisabled": str(disable).lower()}
2868        params = urllib.urlencode(params)
2869        status, content, header = self._http_request(api, "POST", params)
2870        log.info('Consistent-views during rebalance was set as indexAwareRebalanceDisabled={0}'\
2871                 .format(str(disable).lower()))
2872        return status
2873
2874    def set_reb_index_waiting(self, disable):
2875        """Enable/disable rebalance index waiting"""
2876        api = self.baseUrl + "internalSettings"
2877        params = {"rebalanceIndexWaitingDisabled": str(disable).lower()}
2878        params = urllib.urlencode(params)
2879        status, content, header = self._http_request(api, "POST", params)
2880        log.info('rebalance index waiting was set as rebalanceIndexWaitingDisabled={0}'\
2881                 .format(str(disable).lower()))
2882        return status
2883
2884    def set_rebalance_index_pausing(self, disable):
2885        """Enable/disable index pausing during rebalance"""
2886        api = self.baseUrl + "internalSettings"
2887        params = {"rebalanceIndexPausingDisabled": str(disable).lower()}
2888        params = urllib.urlencode(params)
2889        status, content, header = self._http_request(api, "POST", params)
2890        log.info('index pausing during rebalance was set as rebalanceIndexPausingDisabled={0}'\
2891                 .format(str(disable).lower()))
2892        return status
2893
2894    def set_max_parallel_indexers(self, count):
2895        """set max parallel indexer threads"""
2896        api = self.baseUrl + "internalSettings"
2897        params = {"maxParallelIndexers": count}
2898        params = urllib.urlencode(params)
2899        status, content, header = self._http_request(api, "POST", params)
2900        log.info('max parallel indexer threads was set as maxParallelIndexers={0}'.\
2901                 format(count))
2902        return status
2903
2904    def set_max_parallel_replica_indexers(self, count):
2905        """set max parallel replica indexers threads"""
2906        api = self.baseUrl + "internalSettings"
2907        params = {"maxParallelReplicaIndexers": count}
2908        params = urllib.urlencode(params)
2909        status, content, header = self._http_request(api, "POST", params)
2910        log.info('max parallel replica indexers threads was set as maxParallelReplicaIndexers={0}'.\
2911                 format(count))
2912        return status
2913
2914    def get_internal_replication_type(self):
2915        buckets = self.get_buckets()
2916        cmd = "\'{ok, BC} = ns_bucket:get_bucket(%s), ns_bucket:replication_type(BC).\'" % buckets[0].name
2917        return self.diag_eval(cmd)
2918
2919    def set_mc_threads(self, mc_threads=4):
2920        """
2921        Change number of memcached threads and restart the cluster
2922        """
2923        cmd = "[ns_config:update_key({node, N, memcached}, " \
2924              "fun (PList) -> lists:keystore(verbosity, 1, PList," \
2925              " {verbosity, \"-t %s\"}) end) " \
2926              "|| N <- ns_node_disco:nodes_wanted()]." % mc_threads
2927
2928        return self.diag_eval(cmd)
2929
2930    def get_auto_compaction_settings(self):
2931        api = self.baseUrl + "settings/autoCompaction"
2932        status, content, header = self._http_request(api)
2933        return json.loads(content)
2934
2935    def set_auto_compaction(self, parallelDBAndVC="false",
2936                            dbFragmentThreshold=None,
2937                            viewFragmntThreshold=None,
2938                            dbFragmentThresholdPercentage=None,
2939                            viewFragmntThresholdPercentage=None,
2940                            allowedTimePeriodFromHour=None,
2941                            allowedTimePeriodFromMin=None,
2942                            allowedTimePeriodToHour=None,
2943                            allowedTimePeriodToMin=None,
2944                            allowedTimePeriodAbort=None,
2945                            bucket=None):
2946        """Reset compaction values to default, try with old fields (dp4 build)
2947        and then try with newer fields"""
2948        params = {}
2949        api = self.baseUrl
2950
2951        if bucket is None:
2952            # setting is cluster wide
2953            api = api + "controller/setAutoCompaction"
2954        else:
2955            # overriding per/bucket compaction setting
2956            api = api + "pools/default/buckets/" + bucket
2957            params["autoCompactionDefined"] = "true"
2958            # reuse current ram quota in mb per node
2959            num_nodes = len(self.node_statuses())
2960            bucket_info = self.get_bucket_json(bucket)
2961            quota = self.get_bucket_json(bucket)["quota"]["ram"] / (1048576 * num_nodes)
2962            params["ramQuotaMB"] = quota
2963            if bucket_info["authType"] == "sasl" and bucket_info["name"] != "default":
2964                params["authType"] = self.get_bucket_json(bucket)["authType"]
2965                params["saslPassword"] = self.get_bucket_json(bucket)["saslPassword"]
2966
2967        params["parallelDBAndViewCompaction"] = parallelDBAndVC
2968        # Need to verify None because the value could be = 0
2969        if dbFragmentThreshold is not None:
2970            params["databaseFragmentationThreshold[size]"] = dbFragmentThreshold
2971        if viewFragmntThreshold is not None:
2972            params["viewFragmentationThreshold[size]"] = viewFragmntThreshold
2973        if dbFragmentThresholdPercentage is not None:
2974            params["databaseFragmentationThreshold[percentage]"] = dbFragmentThresholdPercentage
2975        if viewFragmntThresholdPercentage is not None:
2976            params["viewFragmentationThreshold[percentage]"] = viewFragmntThresholdPercentage
2977        if allowedTimePeriodFromHour is not None:
2978            params["allowedTimePeriod[fromHour]"] = allowedTimePeriodFromHour
2979        if allowedTimePeriodFromMin is not None:
2980            params["allowedTimePeriod[fromMinute]"] = allowedTimePeriodFromMin
2981        if allowedTimePeriodToHour is not None:
2982            params["allowedTimePeriod[toHour]"] = allowedTimePeriodToHour
2983        if allowedTimePeriodToMin is not None:
2984            params["allowedTimePeriod[toMinute]"] = allowedTimePeriodToMin
2985        if allowedTimePeriodAbort is not None:
2986            params["allowedTimePeriod[abortOutside]"] = allowedTimePeriodAbort
2987
2988        params = urllib.urlencode(params)
2989        log.info("'%s' bucket's settings will be changed with parameters: %s" % (bucket, params))
2990        return self._http_request(api, "POST", params)
2991
2992    def disable_auto_compaction(self):
2993        """
2994           Cluster-wide Setting
2995              Disable autocompaction on doc and view
2996        """
2997        api = self.baseUrl + "controller/setAutoCompaction"
2998        log.info("Disable autocompaction in cluster-wide setting")
2999        status, content, header = self._http_request(api, "POST",
3000                                  "parallelDBAndViewCompaction=false")
3001        return status
3002
3003    def set_purge_interval_and_parallel_compaction(self, interval=3, parallel="false"):
3004        """
3005           Cluster-wide setting.
3006           Set purge interval
3007           Set parallel db and view compaction
3008           Return: status
3009        """
3010        api = self.baseUrl + "controller/setAutoCompaction"
3011        log.info("Set purgeInterval to %s and parallel DB and view compaction to %s"\
3012                                                              % (interval, parallel))
3013        params = {}
3014        params["purgeInterval"] = interval
3015        params["parallelDBAndViewCompaction"] = parallel
3016        params = urllib.urlencode(params)
3017        status, content, header = self._http_request(api, "POST", params)
3018        return status, content
3019
3020    def set_indexer_compaction(self, mode="circular", indexDayOfWeek=None, indexFromHour=0,
3021                                indexFromMinute=0, abortOutside=False,
3022                                indexToHour=0, indexToMinute=0, fragmentation=30):
3023        """Reset compaction values to default, try with old fields (dp4 build)
3024        and then try with newer fields"""
3025        params = {}
3026        api = self.baseUrl + "controller/setAutoCompaction"
3027        params["indexCompactionMode"] = mode
3028        params["indexCircularCompaction[interval][fromHour]"] = indexFromHour
3029        params["indexCircularCompaction[interval][fromMinute]"] = indexFromMinute
3030        params["indexCircularCompaction[interval][toHour]"] = indexToHour
3031        params["indexCircularCompaction[interval][toMinute]"] = indexToMinute
3032        if indexDayOfWeek:
3033            params["indexCircularCompaction[daysOfWeek]"] = indexDayOfWeek
3034        params["indexCircularCompaction[interval][abortOutside]"] = str(abortOutside).lower()
3035        params["parallelDBAndViewCompaction"] = "false"
3036        if mode == "full":
3037            params["indexFragmentationThreshold[percentage]"] = fragmentation
3038        log.info("Indexer Compaction Settings: %s" % (params))
3039        params = urllib.urlencode(params)
3040        return self._http_request(api, "POST", params)
3041
3042    def set_global_loglevel(self, loglevel='error'):
3043        """Set cluster-wide logging level for core components
3044
3045        Possible loglevel:
3046            -- debug
3047            -- info
3048            -- warn
3049            -- error
3050        """
3051
3052        api = self.baseUrl + 'diag/eval'
3053        request_body = 'rpc:eval_everywhere(erlang, apply, [fun () -> \
3054                        [ale:set_loglevel(L, {0}) || L <- \
3055                        [ns_server, couchdb, user, menelaus, ns_doctor, stats, \
3056                        rebalance, cluster, views, stderr]] end, []]).'.format(loglevel)
3057        return self._http_request(api=api, method='POST', params=request_body,
3058                                  headers=self._create_headers())
3059
3060    def set_indexer_params(self, parameter, val):
3061        """
3062        :Possible  parameters:
3063            -- indexerThreads
3064            -- memorySnapshotInterval
3065            -- stableSnapshotInterval
3066            -- maxRollbackPoints
3067            -- logLevel
3068        """
3069        params = {}
3070        api = self.baseUrl + 'settings/indexes'
3071        params[parameter] = val
3072        params = urllib.urlencode(params)
3073        status, content, header = self._http_request(api, "POST", params)
3074        log.info('Indexer {0} set to {1}'.format(parameter, val))
3075        return status
3076
3077    def get_global_index_settings(self):
3078        api = self.baseUrl + "settings/indexes"
3079        status, content, header = self._http_request(api)
3080        if status:
3081            return json.loads(content)
3082        return None
3083
3084    def set_couchdb_option(self, section, option, value):
3085        """Dynamic settings changes"""
3086
3087        cmd = 'ns_config:set({{couchdb, {{{0}, {1}}}}}, {2}).'.format(section,
3088                                                                      option,
3089                                                                      value)
3090        return self.diag_eval(cmd)
3091
3092    def get_alerts(self):
3093        api = self.baseUrl + "pools/default/"
3094        status, content, header = self._http_request(api)
3095        json_parsed = json.loads(content)
3096        if status:
3097            if "alerts" in json_parsed:
3098                return json_parsed['alerts']
3099        else:
3100            return None
3101
3102    def get_nodes_data_from_cluster(self, param="nodes"):
3103        api = self.baseUrl + "pools/default/"
3104        status, content, header = self._http_request(api)
3105        json_parsed = json.loads(content)
3106        if status:
3107            if param in json_parsed:
3108                return json_parsed[param]
3109        else:
3110            return None
3111
3112    def flush_bucket(self, bucket="default"):
3113        if isinstance(bucket, Bucket):
3114            bucket_name = bucket.name
3115        else:
3116            bucket_name = bucket
3117        api = self.baseUrl + "pools/default/buckets/%s/controller/doFlush" % (bucket_name)
3118        status, content, header = self._http_request(api, 'POST')
3119        if not status:
3120            raise BucketFlushFailed(self.ip, bucket_name)
3121        log.info("Flush for bucket '%s' was triggered" % bucket_name)
3122
3123    def update_notifications(self, enable):
3124        api = self.baseUrl + 'settings/stats'
3125        params = urllib.urlencode({'sendStats' : enable})
3126        log.info('settings/stats params : {0}'.format(params))
3127        status, content, header = self._http_request(api, 'POST', params)
3128        return status
3129
3130    def get_notifications(self):
3131        api = self.baseUrl + 'settings/stats'
3132        status, content, header = self._http_request(api)
3133        json_parsed = json.loads(content)
3134        if status:
3135            return json_parsed["sendStats"]
3136        return None
3137
3138    def get_logs(self, last_n=10, contains_text=None):
3139        api = self.baseUrl + 'logs'
3140        status, content, header = self._http_request(api)
3141        json_parsed = json.loads(content)
3142        logs = json_parsed['list']
3143        logs.reverse()
3144        result = []
3145        for i in xrange(min(last_n, len(logs))):
3146            result.append(logs[i])
3147            if contains_text is not None and contains_text in logs[i]["text"]:
3148                break
3149        return result
3150
3151    def print_UI_logs(self, last_n=10, contains_text=None):
3152        logs = self.get_logs(last_n, contains_text)
3153        log.info("Latest logs from UI on {0}:".format(self.ip))
3154        for lg in logs: log.error(lg)
3155
3156    def get_ro_user(self):
3157        api = self.baseUrl + 'settings/readOnlyAdminName'
3158        status, content, header = self._http_request(api, 'GET', '')
3159        return content, status
3160
3161    def delete_ro_user(self):
3162        api = self.baseUrl + 'settings/readOnlyUser'
3163        status, content, header = self._http_request(api, 'DELETE', '')
3164        return status
3165
3166    def create_ro_user(self, username, password):
3167        api = self.baseUrl + 'settings/readOnlyUser'
3168        params = urllib.urlencode({'username' : username, 'password' : password})
3169        log.info('settings/readOnlyUser params : {0}'.format(params))
3170        status, content, header = self._http_request(api, 'POST', params)
3171        return status
3172
3173    # Change password for readonly user
3174    def changePass_ro_user(self, username, password):
3175        api = self.baseUrl + 'settings/readOnlyUser'
3176        params = urllib.urlencode({'username' : username, 'password' : password})
3177        log.info('settings/readOnlyUser params : {0}'.format(params))
3178        status, content, header = self._http_request(api, 'PUT', params)
3179        return status
3180
3181    '''Start Monitoring/Profiling Rest Calls'''
3182    def set_completed_requests_collection_duration(self, server, min_time):
3183        http = httplib2.Http()
3184        n1ql_port = 8093
3185        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/settings"
3186        body = {"completed-threshold": min_time}
3187        headers = self._create_headers_with_auth('Administrator','password')
3188        response,content = http.request(api, "POST", headers=headers, body=json.dumps(body))
3189        return response,content
3190
3191    def set_completed_requests_max_entries(self, server, no_entries):
3192        http = httplib2.Http()
3193        n1ql_port = 8093
3194        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/settings"
3195        body = {"completed-limit": no_entries}
3196        headers = self._create_headers_with_auth('Administrator','password')
3197        response,content = http.request(api, "POST", headers=headers, body=json.dumps(body))
3198        return response,content
3199
3200    def set_profiling(self, server, setting):
3201        http = httplib2.Http()
3202        n1ql_port = 8093
3203        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/settings"
3204        body = {"profile": setting}
3205        headers = self._create_headers_with_auth('Administrator','password')
3206        response,content = http.request(api, "POST", headers=headers, body=json.dumps(body))
3207        return response,content
3208
3209    def set_profiling_controls(self, server, setting):
3210        http = httplib2.Http()
3211        n1ql_port = 8093
3212        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/settings"
3213        body = {"controls": setting}
3214        headers = self._create_headers_with_auth('Administrator','password')
3215        response,content = http.request(api, "POST", headers=headers, body=json.dumps(body))
3216        return response,content
3217
3218    def get_query_admin_settings(self, server):
3219        http = httplib2.Http()
3220        n1ql_port = 8093
3221        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/settings"
3222        headers = self._create_headers_with_auth('Administrator', 'password')
3223        response, content = http.request(api, "GET", headers=headers)
3224        result = json.loads(content)
3225        return result
3226
3227    def get_query_vitals(self,server):
3228        http = httplib2.Http()
3229        n1ql_port = 8093
3230        api = "http://%s:%s/" % (server.ip, n1ql_port) + "admin/vitals"
3231        headers = self._create_headers_with_auth('Administrator', 'password')
3232        response, content = http.request(api, "GET", headers=headers)
3233        return response, content
3234    '''End Monitoring/Profiling Rest Calls'''
3235
3236    def create_whitelist(self, server, whitelist):
3237        http = httplib2.Http()
3238        api = "http://%s:%s/" % (server.ip, server.port) + "settings/querySettings/curlWhitelist"
3239        headers = self._create_headers_with_auth('Administrator', 'password')
3240        response,content = http.request(api, "POST", headers=headers, body=json.dumps(whitelist))
3241        return response,content
3242
3243    def query_tool(self, query, port=8093, timeout=1300, query_params={}, is_prepared=False, named_prepare=None,
3244                   verbose = True, encoded_plan=None, servers=None):
3245        key = 'prepared' if is_prepared else 'statement'
3246        headers = None
3247        content=""
3248        prepared = json.dumps(query)
3249        if is_prepared:
3250            if named_prepare and encoded_plan:
3251                http = httplib2.Http()
3252                if len(servers)>1:
3253                    url = "http://%s:%s/query/service" % (servers[1].ip, port)
3254                else:
3255                    url = "http://%s:%s/query/service" % (self.ip, port)
3256
3257                headers = self._create_headers_encoded_prepared()
3258                body = {'prepared': named_prepare, 'encoded_plan':encoded_plan}
3259
3260                response, content = http.request(url, 'POST', headers=headers, body=json.dumps(body))
3261
3262                return eval(content)
3263
3264            elif named_prepare and not encoded_plan:
3265                params = 'prepared=' + urllib.quote(prepared, '~()')
3266                params = 'prepared="%s"'% named_prepare
3267            else:
3268                prepared = json.dumps(query)
3269                prepared = str(prepared.encode('utf-8'))
3270                params = 'prepared=' + urllib.quote(prepared, '~()')
3271            if 'creds' in query_params and query_params['creds']:
3272                headers = self._create_headers_with_auth(query_params['creds'][0]['user'].encode('utf-8'),
3273                                                         query_params['creds'][0]['pass'].encode('utf-8'))
3274            api = "http://%s:%s/query/service?%s" % (self.ip, port, params)
3275            log.info("%s"%api)
3276        else:
3277            params = {key : query}
3278            if 'creds' in query_params and query_params['creds']:
3279                headers = self._create_headers_with_auth(query_params['creds'][0]['user'].encode('utf-8'),
3280                                                         query_params['creds'][0]['pass'].encode('utf-8'))
3281                del query_params['creds']
3282            params.update(query_params)
3283            params = urllib.urlencode(params)
3284            if verbose:
3285                log.info('query params : {0}'.format(params))
3286            api = "http://%s:%s/query?%s" % (self.ip, port, params)
3287
3288
3289        status, content, header = self._http_request(api, 'POST', timeout=timeout, headers=headers)
3290        try:
3291            return json.loads(content)
3292        except ValueError:
3293            return content
3294
3295    def analytics_tool(self, query, port=8095, timeout=650, query_params={}, is_prepared=False, named_prepare=None,
3296                   verbose = True, encoded_plan=None, servers=None):
3297        key = 'prepared' if is_prepared else 'statement'
3298        headers = None
3299        content=""
3300        prepared = json.dumps(query)
3301        if is_prepared:
3302            if named_prepare and encoded_plan:
3303                http = httplib2.Http()
3304                if len(servers)>1:
3305                    url = "http://%s:%s/query/service" % (servers[1].ip, port)
3306                else:
3307                    url = "http://%s:%s/query/service" % (self.ip, port)
3308
3309                headers = {'Content-type': 'application/json'}
3310                body = {'prepared': named_prepare, 'encoded_plan':encoded_plan}
3311
3312                response, content = http.request(url, 'POST', headers=headers, body=json.dumps(body))
3313
3314                return eval(content)
3315
3316            elif named_prepare and not encoded_plan:
3317                params = 'prepared=' + urllib.quote(prepared, '~()')
3318                params = 'prepared="%s"'% named_prepare
3319            else:
3320                prepared = json.dumps(query)
3321                prepared = str(prepared.encode('utf-8'))
3322                params = 'prepared=' + urllib.quote(prepared, '~()')
3323            if 'creds' in query_params and query_params['creds']:
3324                headers = self._create_headers_with_auth(query_params['creds'][0]['user'].encode('utf-8'),
3325                                                         query_params['creds'][0]['pass'].encode('utf-8'))
3326            api = "%s/analytics/service?%s" % (self.cbas_base_url, params)
3327            log.info("%s"%api)
3328        else:
3329            params = {key : query}
3330            if 'creds' in query_params and query_params['creds']:
3331                headers = self._create_headers_with_auth(query_params['creds'][0]['user'].encode('utf-8'),
3332                                                         query_params['creds'][0]['pass'].encode('utf-8'))
3333                del query_params['creds']
3334            params.update(query_params)
3335            params = urllib.urlencode(params)
3336            if verbose:
3337                log.info('query params : {0}'.format(params))
3338            api = "%s/analytics/service?%s" % (self.cbas_base_url, params)
3339        status, content, header = self._http_request(api, 'POST', timeout=timeout, headers=headers)
3340        try:
3341            return json.loads(content)
3342        except ValueError:
3343            return content
3344
3345    def query_tool_stats(self):
3346        log.info('query n1ql stats')
3347        api = "http://%s:8093/admin/stats" % (self.ip)
3348        status, content, header = self._http_request(api, 'GET')
3349        log.info(content)
3350        try:
3351            return json.loads(content)
3352        except ValueError:
3353            return content
3354
3355    def index_tool_stats(self):
3356        log.info('index n1ql stats')
3357        api = "http://%s:8091/indexStatus" % (self.ip)
3358        params = ""
3359        status, content, header = self._http_request(api, 'GET', params)
3360        log.info(content)
3361        try:
3362            return json.loads(content)
3363        except ValueError:
3364            return content
3365
3366    # return all rack/zone info
3367    def get_all_zones_info(self, timeout=120):
3368        zones = {}
3369        api = self.baseUrl + 'pools/default/serverGroups'
3370        status, content, header = self._http_request(api, timeout=timeout)
3371        if status:
3372            zones = json.loads(content)
3373        else:
3374            raise Exception("Failed to get all zones info.\n \
3375                  Zone only supports from couchbase server version 2.5 and up.")
3376        return zones
3377
3378    # return group name and unique uuid
3379    def get_zone_names(self):
3380        zone_names = {}
3381        zone_info = self.get_all_zones_info()
3382        if zone_info and len(zone_info["groups"]) >= 1:
3383            for i in range(0, len(zone_info["groups"])):
3384                # pools/default/serverGroups/ = 27 chars
3385                zone_names[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"][28:]
3386        return zone_names
3387
3388    def add_zone(self, zone_name):
3389        api = self.baseUrl + 'pools/default/serverGroups'
3390        request_name = "name={0}".format(zone_name)
3391        status, content, header = self._http_request(api, "POST", \
3392                                        params=request_name)
3393        if status:
3394            log.info("zone {0} is added".format(zone_name))
3395            return True
3396        else:
3397            raise Exception("Failed to add zone with name: %s " % zone_name)
3398
3399    def delete_zone(self, zone_name):
3400        api = self.baseUrl + 'pools/default/serverGroups/'
3401        # check if zone exist
3402        found = False
3403        zones = self.get_zone_names()
3404        for zone in zones:
3405            if zone_name == zone:
3406                api += zones[zone_name]
3407                found = True
3408                break
3409        if not found:
3410            raise Exception("There is not zone with name: %s in cluster" % zone_name)
3411        status, content, header = self._http_request(api, "DELETE")
3412        if status:
3413            log.info("zone {0} is deleted".format(zone_name))
3414        else:
3415            raise Exception("Failed to delete zone with name: %s " % zone_name)
3416
3417    def rename_zone(self, old_name, new_name):
3418        api = self.baseUrl + 'pools/default/serverGroups/'
3419        # check if zone exist
3420        found = False
3421        zones = self.get_zone_names()
3422        for zone in zones:
3423            if old_name == zone:
3424                api += zones[old_name]
3425                request_name = "name={0}".format(new_name)
3426                found = True
3427                break
3428        if not found:
3429            raise Exception("There is not zone with name: %s in cluster" % old_name)
3430        status, content, header = self._http_request(api, "PUT", params=request_name)
3431        if status:
3432            log.info("zone {0} is renamed to {1}".format(old_name, new_name))
3433        else:
3434            raise Exception("Failed to rename zone with name: %s " % old_name)
3435
3436    # get all nodes info in one zone/rack/group
3437    def get_nodes_in_zone(self, zone_name):
3438        nodes = {}
3439        tmp = {}
3440        zone_info = self.get_all_zones_info()
3441        if zone_name != "":
3442            found = False
3443            if len(zone_info["groups"]) >= 1:
3444                for i in range(0, len(zone_info["groups"])):
3445                    if zone_info["groups"][i]["name"] == zone_name:
3446                        tmp = zone_info["groups"][i]["nodes"]
3447                        if not tmp:
3448                            log.info("zone {0} is existed but no node in it".format(zone_name))
3449                        # remove port
3450                        for node in tmp:
3451                            node["hostname"] = node["hostname"].split(":")
3452                            node["hostname"] = node["hostname"][0]
3453                            nodes[node["hostname"]] = node
3454                        found = True
3455                        break
3456            if not found:
3457                raise Exception("There is not zone with name: %s in cluster" % zone_name)
3458        return nodes
3459
3460    def get_zone_and_nodes(self):
3461        """ only return zones with node in its """
3462        zones = {}
3463        tmp = {}
3464        zone_info = self.get_all_zones_info()
3465        if len(zone_info["groups"]) >= 1:
3466            for i in range(0, len(zone_info["groups"])):
3467                tmp = zone_info["groups"][i]["nodes"]
3468                if not tmp:
3469                    log.info("zone {0} is existed but no node in it".format(tmp))
3470                # remove port
3471                else:
3472                    nodes = []
3473                    for node in tmp:
3474                        node["hostname"] = node["hostname"].split(":")
3475                        node["hostname"] = node["hostname"][0]
3476                        print node["hostname"][0]
3477                        nodes.append(node["hostname"])
3478                    zones[zone_info["groups"][i]["name"]] = nodes
3479        return zones
3480
3481    def get_zone_uri(self):
3482        zone_uri = {}
3483        zone_info = self.get_all_zones_info()
3484        if zone_info and len(zone_info["groups"]) >= 1:
3485            for i in range(0, len(zone_info["groups"])):
3486                zone_uri[zone_info["groups"][i]["name"]] = zone_info["groups"][i]["uri"]
3487        return zone_uri
3488
3489    def shuffle_nodes_in_zones(self, moved_nodes, source_zone, target_zone):
3490        # moved_nodes should be a IP list like
3491        # ["192.168.171.144", "192.168.171.145"]
3492        request = ""
3493        for i in range(0, len(moved_nodes)):
3494            moved_nodes[i] = "ns_1@" + moved_nodes[i]
3495
3496        all_zones = self.get_all_zones_info()
3497        api = self.baseUrl + all_zones["uri"][1:]
3498
3499        moved_node_json = []
3500        for i in range(0, len(all_zones["groups"])):
3501            for node in all_zones["groups"][i]["nodes"]:
3502                if all_zones["groups"][i]["name"] == source_zone:
3503                    for n in moved_nodes:
3504                        if n == node["otpNode"]:
3505                            moved_node_json.append({"otpNode": node["otpNode"]})
3506
3507        zone_json = {}
3508        group_json = []
3509        for i in range(0, len(all_zones["groups"])):
3510            node_j = []
3511            zone_json["uri"] = all_zones["groups"][i]["uri"]
3512            zone_json["name"] = all_zones["groups"][i]["name"]
3513            zone_json["nodes"] = node_j
3514
3515            if not all_zones["groups"][i]["nodes"]:
3516                if all_zones["groups"][i]["name"] == target_zone:
3517                    for i in range(0, len(moved_node_json)):
3518                        zone_json["nodes"].append(moved_node_json[i])
3519                else:
3520                    zone_json["nodes"] = []
3521            else:
3522                for node in all_zones["groups"][i]["nodes"]:
3523                    if all_zones["groups"][i]["name"] == source_zone and \
3524                                           node["otpNode"] in moved_nodes:
3525                        pass
3526                    else:
3527                        node_j.append({"otpNode": node["otpNode"]})
3528                if all_zones["groups"][i]["name"] == target_zone:
3529                    for k in range(0, len(moved_node_json)):
3530                        node_j.append(moved_node_json[k])
3531                    zone_json["nodes"] = node_j
3532            group_json.append({"name": zone_json["name"], "uri": zone_json["uri"], "nodes": zone_json["nodes"]})
3533        request = '{{"groups": {0} }}'.format(json.dumps(group_json))
3534        status, content, header = self._http_request(api, "PUT", params=request)
3535        # sample request format
3536        # request = ' {"groups":[{"uri":"/pools/default/serverGroups/0","nodes": [] },\
3537        #                       {"uri":"/pools/default/serverGroups/c8275b7a88e6745c02815dde4a505e70","nodes": [] },\
3538        #                        {"uri":"/pools/default/serverGroups/1acd9810a027068bd14a1ddd43db414f","nodes": \
3539        #                               [{"otpNode":"ns_1@192.168.171.144"},{"otpNode":"ns_1@192.168.171.145"}]} ]} '
3540        return status
3541
3542    def is_zone_exist(self, zone_name):
3543        found = False
3544        zones = self.get_zone_names()
3545        if zones:
3546            for zone in zones:
3547                if zone_name == zone:
3548                    found = True
3549                    return True
3550                    break
3551        if not found:
3552            log.error("There is not zone with name: {0} in cluster.".format(zone_name))
3553            return False
3554
3555    def get_items_info(self, keys, bucket='default'):
3556        items_info = {}
3557        for key in keys:
3558            api = '{0}{1}{2}/docs/{3}'.format(self.baseUrl, 'pools/default/buckets/', bucket, key)
3559            status, content, header = self._http_request(api)
3560            if status:
3561                items_info[key] = json.loads(content)
3562        return items_info
3563
3564    def start_cluster_logs_collection(self, nodes="*", upload=False, \
3565                                      uploadHost=None, customer="", ticket=""):
3566        if not upload:
3567            params = urllib.urlencode({"nodes":nodes})
3568        else:
3569            params = urllib.urlencode({"nodes":nodes, "uploadHost":uploadHost, \
3570                                       "customer":customer, "ticket":ticket})
3571        api = self.baseUrl + "controller/startLogsCollection"
3572        status, content, header = self._http_request(api, "POST", params)
3573        return status, content
3574
3575    def get_cluster_logs_collection_info(self):
3576        api = self.baseUrl + "pools/default/tasks/"
3577        status, content, header = self._http_request(api, "GET")
3578        if status:
3579            tmp = json.loads(content)
3580            for k in tmp:
3581                if k["type"] == "clusterLogsCollection":
3582                    content = k
3583                    return content
3584        return None
3585
3586    """ result["progress"]: progress logs collected at cluster level
3587        result["status]: status logs collected at cluster level
3588        result["perNode"]: all information logs collected at each node """
3589    def get_cluster_logs_collection_status(self):
3590        result = self.get_cluster_logs_collection_info()
3591        if result:
3592            return result["progress"], result["status"], result["perNode"]
3593        return None, None, None
3594
3595    def cancel_cluster_logs_collection(self):
3596        api = self.baseUrl + "controller/cancelLogsCollection"
3597        status, content, header = self._http_request(api, "POST")
3598        return status, content
3599
3600    def set_log_redaction_level(self, redaction_level="none"):
3601        api = self.baseUrl + "settings/logRedaction"
3602        params = urllib.urlencode({"logRedactionLevel":redaction_level})
3603        status, content, header = self._http_request(api, "POST", params)
3604        if status:
3605            result = json.loads(content)
3606            if result["logRedactionLevel"] == redaction_level:
3607                return True
3608            else:
3609                return False
3610        return False
3611
3612    def get_bucket_CCCP(self, bucket):
3613        log.info("Getting CCCP config ")
3614        api = '%spools/default/b/%s' % (self.baseUrl, bucket)
3615        if isinstance(bucket, Bucket):
3616            api = '%spools/default/b/%s' % (self.baseUrl, bucket.name)
3617        status, content, header = self._http_request(api)
3618        if status:
3619            return json.loads(content)
3620        return None
3621
3622    def get_recovery_task(self):
3623        content = self.ns_server_tasks()
3624        for item in content:
3625            if item["type"] == "recovery":
3626                return item
3627        return None
3628
3629
3630    def get_recovery_progress(self, recoveryStatusURI):
3631        api = '%s%s' % (self.baseUrl, recoveryStatusURI)
3632        status, content, header = self._http_request(api)
3633        if status:
3634            return json.loads(content)
3635        return None
3636
3637    def get_warming_up_tasks(self):
3638        tasks = self.ns_server_tasks()
3639        tasks_warmup = []
3640        for task in tasks:
3641            if task["type"] == "warming_up":
3642                tasks_warmup.append(task)
3643        return tasks_warmup
3644
3645    def compact_bucket(self, bucket="default"):
3646        api = self.baseUrl + 'pools/default/buckets/{0}/controller/compactBucket'.format(bucket)
3647        status, content, header = self._http_request(api, 'POST')
3648        if status:
3649            log.info('bucket compaction successful')
3650        else:
3651            raise BucketCompactionException(bucket)
3652
3653        return True
3654
3655    def cancel_bucket_compaction(self, bucket="default"):
3656        api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket)
3657        if isinstance(bucket, Bucket):
3658            api = self.baseUrl + 'pools/default/buckets/{0}/controller/cancelBucketCompaction'.format(bucket.name)
3659        status, content, header = self._http_request(api, 'POST')
3660        log.info("Status is {0}".format(status))
3661        if status:
3662            log.info('Cancel bucket compaction successful')
3663        else:
3664            raise BucketCompactionException(bucket)
3665        return True
3666
3667
3668    '''LDAP Rest API '''
3669    '''
3670    clearLDAPSettings - Function to clear LDAP settings
3671    Parameter - None
3672    Returns -
3673    status of LDAPAuth clear command
3674    '''
3675    def clearLDAPSettings(self):
3676        api = self.baseUrl + 'settings/saslauthdAuth'
3677        params = urllib.urlencode({'enabled':'false'})
3678        status, content, header = self._http_request(api, 'POST', params)
3679        return status, content, header
3680
3681    '''
3682    ldapUserRestOperation - Execute LDAP REST API
3683    Input Parameter -
3684        authOperation - this is for auth need to be enabled or disabled - True or 0
3685        currAdmmins - a list of username to add to full admin matching with ldap
3686        currROAdmins - a list of username to add to RO Admin
3687    Returns - status, content and header for the command executed
3688    '''
3689    def ldapUserRestOperation(self, authOperation, adminUser='', ROadminUser=''):
3690        authOperation = authOperation
3691        currAdmins = ''
3692        currROAdmins = ''
3693
3694        if (adminUser != ''):
3695            for user in adminUser:
3696                currAdmins = user[0] + "\n\r" + currAdmins
3697
3698        if (ROadminUser != ''):
3699            for user in ROadminUser:
3700                currROAdmins = user[0] + "\n\r" + currROAdmins
3701        content = self.executeLDAPCommand(authOperation, currAdmins, currROAdmins)
3702
3703    '''LDAP Rest API '''
3704    '''
3705    clearLDAPSettings - Function to clear LDAP settings
3706    Parameter - None
3707    Returns -
3708    status of LDAPAuth clear command
3709    '''
3710    def clearLDAPSettings (self):
3711        api = self.baseUrl + 'settings/saslauthdAuth'
3712        params = urllib.urlencode({'enabled':'false'})
3713        status, content, header = self._http_request(api, 'POST', params)
3714        return status, content, header
3715
3716    '''
3717    ldapUserRestOperation - Execute LDAP REST API
3718    Input Parameter -
3719        authOperation - this is for auth need to be enabled or disabled - True or 0
3720        currAdmmins - a list of username to add to full admin matching with ldap
3721        currROAdmins - a list of username to add to RO Admin
3722    Returns - status, content and header for the command executed
3723    '''
3724    def ldapUserRestOperation(self, authOperation, adminUser='', ROadminUser='', exclude=None):
3725        if (authOperation):
3726            authOperation = 'true'
3727        else:
3728            authOperation = 'false'
3729
3730        currAdmins = ''
3731        currROAdmins = ''
3732
3733        if (adminUser != ''):
3734            for user in adminUser:
3735                currAdmins = user[0] + "\n\r" + currAdmins
3736
3737        if (ROadminUser != ''):
3738            for user in ROadminUser:
3739                currROAdmins = user[0] + "\n\r" + currROAdmins
3740        content = self.executeLDAPCommand(authOperation, currAdmins, currROAdmins, exclude)
3741
3742    '''
3743    executeLDAPCommand - Execute LDAP REST API
3744    Input Parameter -
3745        authOperation - this is for auth need to be enabled or disabled - True or 0
3746        currAdmmins - a list of username to add to full admin matching with ldap
3747        currROAdmins - a list of username to add to RO Admin
3748    Returns - status, content and header for the command executed
3749    '''
3750    def executeLDAPCommand(self, authOperation, currAdmins, currROAdmins, exclude=None):
3751        api = self.baseUrl + "settings/saslauthdAuth"
3752
3753        if (exclude is None):
3754            log.info ("into exclude is None")
3755            params = urllib.urlencode({
3756                                            'enabled': authOperation,
3757                                            'admins': '{0}'.format(currAdmins),
3758                                            'roAdmins':'{0}'.format(currROAdmins),
3759                                            })
3760        else:
3761            log.info ("Into exclude for value of fullAdmin {0}".format(exclude))
3762            if (exclude == 'fullAdmin'):
3763                params = urllib.urlencode({
3764                                            'enabled': authOperation,
3765                                            'roAdmins':'{0}'.format(currROAdmins),
3766                                            })
3767            else:
3768                log.info ("Into exclude for value of fullAdmin {0}".format(exclude))
3769                params = urllib.urlencode({
3770                                            'enabled': authOperation,
3771                                            'admins': '{0}'.format(currAdmins),
3772                                            })
3773
3774
3775        status, content, header = self._http_request(api, 'POST', params)
3776        return content
3777    '''
3778    validateLogin - Validate if user can login using a REST API
3779    Input Parameter - user and password to check for login. Also take a boolean to
3780    decide if the status should be 200 or 400 and everything else should be
3781    false
3782    Returns - True of false based if user should login or login fail
3783    '''
3784    def validateLogin(self, user, password, login, getContent=False):
3785        api = self.baseUrl + "uilogin"
3786        header = {'Content-type': 'application/x-www-form-urlencoded'}
3787        params = urllib.urlencode({'user':'{0}'.format(user), 'password':'{0}'.format(password)})
3788        log.info ("value of param is {0}".format(params))
3789        http = httplib2.Http()
3790        status, content = http.request(api, 'POST', headers=header, body=params)
3791        log.info ("Status of login command - {0}".format(status))
3792        if (getContent):
3793            return status, content
3794        if ((status['status'] == "200" and login == True) or (status ['status'] == "400" and login == False)):
3795            return True
3796        else:
3797            return False
3798
3799    '''
3800    ldapRestOperationGet - Get setting of LDAPAuth - Settings
3801    Returns - list of Admins, ROAdmins and is LDAPAuth enabled or not
3802    '''
3803    def ldapRestOperationGetResponse(self):
3804        log.info ("GET command for LDAP Auth")
3805        api = self.baseUrl + "settings/saslauthdAuth"
3806        status, content, header = self._http_request(api, 'GET')
3807        return json.loads(content)
3808
3809    '''
3810    executeValidateCredentials - API to check credentials of users
3811    Input - user and password that needs validation
3812    Returns -
3813        [role]:<currentrole>
3814        [source]:<saslauthd,builtin>
3815    '''
3816    def executeValidateCredentials(self, user, password):
3817        api = self.baseUrl + "validateCredentials"
3818        params = urllib.urlencode({
3819                                   'user':'{0}'.format(user),
3820                                   'password':'{0}'.format(password)
3821                                   })
3822        status, content, header = self._http_request(api, 'POST', params)
3823        log.info ("Status of executeValidateCredentials command - {0}".format(status))
3824        return status, json.loads(content)
3825
3826    '''
3827    Audit Commands
3828    '''
3829    '''
3830    getAuditSettings - API returns audit settings for Audit
3831    Input - None
3832    Returns -
3833        [archive_path]:<path for archieve>
3834        [auditd_enabled]:<enabled disabled status for auditd>
3835        [log_path]:<path for logs>
3836        [rotate_interval]:<log rotate interval>
3837    '''
3838    def getAuditSettings(self):
3839        api = self.baseUrl + "settings/audit"
3840        status, content, header = self._http_request(api, 'GET')
3841        return json.loads(content)
3842
3843    '''
3844    getAuditSettings - API returns audit settings for Audit
3845    Input -
3846        [archive_path]:<path for archieve>
3847        [auditd_enabled]:<enabled disabled status for auditd>
3848        [rotate_interval]:<log rotate interval in seconds>
3849    '''
3850    def setAuditSettings(self, enabled='true', rotateInterval=86400, logPath='/opt/couchbase/var/lib/couchbase/logs'):
3851        api = self.baseUrl + "settings/audit"
3852        params = urllib.urlencode({
3853                                    'rotateInterval':'{0}'.format(rotateInterval),
3854                                    'auditdEnabled':'{0}'.format(enabled),
3855                                    'logPath':'{0}'.format(logPath)
3856                                    })
3857        status, content, header = self._http_request(api, 'POST', params)
3858        log.info ("Value os status is {0}".format(status))
3859        log.info ("Value of content is {0}".format(content))
3860        if (status):
3861            return status
3862        else:
3863            return status, json.loads(content)
3864
3865    def set_downgrade_storage_mode_with_rest(self, downgrade=True, username="Administrator",
3866                                                                   password="password"):
3867        authorization = base64.encodestring('%s:%s' % (username, password))
3868        if downgrade:
3869            api =