xref: /6.0.3/couchbase-cli/cluster_manager.py (revision 2ddc1a08)
1"""Management API's for Couchbase Cluster"""
2
3
4import csv
5import json
6import requests
7import StringIO
8import time
9import urllib
10import urlparse
11
12N1QL_SERVICE = 'n1ql'
13INDEX_SERVICE = 'index'
14MGMT_SERVICE = 'mgmt'
15FTS_SERVICE = 'fts'
16EVENT_SERVICE = 'eventing'
17
18ERR_AUTH = 'unable to access the REST API - please check your username (-u) and password (-p)'
19ERR_INTERNAL = 'Internal server error, please retry your request'
20
21DEFAULT_REQUEST_TIMEOUT = 60
22
23# Remove this once we can verify SSL certificates
24requests.packages.urllib3.disable_warnings()
25
26def request(f):
27    def g(*args, **kwargs):
28        cm = args[0]
29        url = args[1]
30        try:
31            return f(*args, **kwargs)
32        except requests.exceptions.ConnectionError, e:
33            if '[SSL: CERTIFICATE_VERIFY_FAILED]' in str(e):
34                return None, ['Certificate verification failed.\n' +
35                'If you are using self-signed certificates you can re-run this command with\n' +
36                'the --no-ssl-verify flag. Note however that disabling ssl verification\n' +
37                'means that couchbase-cli will be vulnerable to man-in-the-middle attacks.\n\n' +
38                'For the most secure access to Couchbase make sure that you have X.509\n' +
39                'certificates set up in your cluster and use the --cacert flag to specify\n' +
40                'your client certificate.']
41            elif str(e).startswith('[SSL]'):
42                return None, ['Unable to connect with the given CA certificate: ', str(e)]
43            return None, ['Unable to connect to host at %s: ' % cm.hostname, str(e)]
44        except requests.exceptions.ReadTimeout, e:
45            return None, ['Request to host `%s` timed out after %d seconds' % (url, cm.timeout)]
46    return g
47
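# Illustrative usage sketch (not part of the original module; hostname and
# credentials below are assumed example values): every REST helper wrapped by
# @request resolves to a (result, errors) pair instead of raising requests
# exceptions, so call sites follow a single pattern:
#
#   cm = ClusterManager("http://127.0.0.1:8091", "Administrator", "password")
#   pools, errors = cm.pools()
#   if errors:
#       # e.g. ["Unable to connect to host at http://127.0.0.1:8091: ..."]
#       print errors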
48
49class ServiceNotAvailableException(Exception):
50    """An exception raised when a service does not exist in the target cluster"""
51
52    def __init__(self, service):
53        Exception.__init__(self, "Service %s not available in target cluster" % service)
54
55
56class ClusterManager(object):
57    """A set of REST API's for managing a Couchbase cluster"""
58
59    def __init__(self, hostname, username, password, sslFlag=False, verifyCert=True,
60                 caCert=True, debug=False, timeout=DEFAULT_REQUEST_TIMEOUT, cert=None):
61        hostname = hostname.replace("couchbase://", "http://", 1)
62        hostname = hostname.replace("couchbases://", "https://", 1)
63
64        self.hostname = hostname
65        # The verify argument on the requests functions can take a boolean or
66        # a path to a CA bundle. If a path is not provided but the certificate
67        # still needs to be verified, it should use the system-provided CAs.
68        self.verifyCert = verifyCert
69        self.caCert = caCert
70        if not verifyCert:
71            self.caCert = False
72        # This is for client side certs which is currently not used.
73        self.cert = cert
74
75        parsed = urlparse.urlparse(hostname)
76        if sslFlag:
77            # urlparse gives the scheme without '://' and the port as an int (or None)
78            if parsed.scheme == 'http':
79                if parsed.port == 8091 or parsed.port is None:
80                    self.hostname = "https://" + parsed.hostname + ":18091"
81                else:
82                    self.hostname = "https://" + parsed.hostname + ":" + str(parsed.port)
83
84            # Certificates and verification are not used when the ssl flag is
85            # specified.
86            self.verifyCert = False
87            self.caCert = False
88
89        self.username = username
90        self.password = password
91        self.timeout = timeout
92        self.ssl = self.hostname.startswith("https://")
93        self.debug = debug
94
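    # Illustrative sketch (assumed example values): the constructor rewrites
    # couchbase:// style schemes and, with sslFlag=True, maps the default 8091
    # management port to its TLS counterpart:
    #
    #   cm = ClusterManager("couchbase://10.0.0.1:8091", "Administrator",
    #                       "password", sslFlag=True)
    #   # cm.hostname == "https://10.0.0.1:18091", cm.ssl == True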
95    def restore_index_metadata(self, bucket, index_defs):
96        hosts, errors = self.get_hostnames_for_service(INDEX_SERVICE)
97        if errors:
98            return None, errors
99
100        if not hosts:
101            raise ServiceNotAvailableException(INDEX_SERVICE)
102
103        url = hosts[0] + '/restoreIndexMetadata?bucket=%s' % bucket
104        return self._post_json(url, index_defs)
105
106    def get_index_metadata(self, bucket):
107        hosts, errors = self.get_hostnames_for_service(INDEX_SERVICE)
108        if errors:
109            return None, errors
110
111        if not hosts:
112            raise ServiceNotAvailableException(INDEX_SERVICE)
113
114        url = hosts[0] + '/getIndexMetadata?bucket=%s' % bucket
115        return self._get(url)
116
117    def restore_fts_index_metadata(self, index_defs):
118        hosts, errors = self.get_hostnames_for_service(FTS_SERVICE)
119        if errors:
120            return None, errors
121
122        if not hosts:
123            raise ServiceNotAvailableException(FTS_SERVICE)
124
125        for index_def in index_defs:
126            url = hosts[0] + '/api/index/%s?prevIndexUUID=*' % index_def["name"]
127            if "sourceUUID" in index_def:
128                del index_def["sourceUUID"]
129            result, errors = self._put_json(url, index_def)
130            if errors:
131                return None, errors
132
133        return None, None
134
135    def get_fts_index_metadata(self, bucket):
136        hosts, errors = self.get_hostnames_for_service(FTS_SERVICE)
137        if errors:
138            return None, errors
139
140        if not hosts:
141            raise ServiceNotAvailableException(FTS_SERVICE)
142
143        url = hosts[0] + '/api/index'
144        result, errors = self._get(url)
145        if errors:
146            return None, errors
147
148
149        bucket_index_defs = []
150        if "indexDefs" in result and result["indexDefs"] is not None:
151            for _, index_def in result["indexDefs"]["indexDefs"].iteritems():
152                if index_def["type"] == "fulltext-index" and index_def["sourceName"] == bucket:
153                    bucket_index_defs.append(index_def)
154        return bucket_index_defs, None
155
156    def n1ql_query(self, stmt, args=None):
157        """Sends a N1QL query
158
159        Sends a N1QL query and returns the result of the query. Raises a
160        ServiceNotAvailableException if the target cluster is not running the n1ql
161        service."""
162
163        hosts, errors = self.get_hostnames_for_service(N1QL_SERVICE)
164        if errors:
165            return None, errors
166
167        if not hosts:
168            raise ServiceNotAvailableException(N1QL_SERVICE)
169
170        url = hosts[0] + '/query/service'
171        body = {'statement': str(stmt)}
172
173        if args:
174            body['args'] = str(args)
175
176        result, errors = self._post_form_encoded(url, body)
177        if errors:
178            return None, errors
179
180        return result, None
181
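    # Illustrative sketch (the statement is an assumed example; cm is a
    # ClusterManager instance): the query is posted to the first node running
    # the query service and the decoded JSON response is returned:
    #
    #   result, errors = cm.n1ql_query('SELECT name FROM system:keyspaces')
    #   if not errors:
    #       for row in result.get('results', []):
    #           print row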
182    def is_cluster_initialized(self):
183        data, errors = self.pools()
184        if (errors and len(errors) == 1 and errors[0] == ERR_AUTH) or \
185            (data and data['pools'] and len(data['pools']) > 0):
186            return True, None
187        return False, errors
188
189    def is_enterprise(self):
190        data, errors = self.pools()
191        if errors:
192            return None, errors
193        return data["isEnterprise"], None
194
195    def get_hostnames_for_service(self, service_name):
196        """ Gets all hostnames that run a service
197
198        Gets all hostnames for the specified service and returns a list of strings
199        in the form "http://hostname:port". If the ClusterManager is configured
200        to use SSL/TLS then "https://" is prefixed to each name instead of
201        "http://"."""
202        url = self.hostname + '/pools/default/nodeServices'
203        data, errors = self._get(url)
204        if errors:
205            return None, errors
206
207        hosts = []
208        for node in data['nodesExt']:
209            node_host = '127.0.0.1'
210            if 'hostname' in node:
211                node_host = node['hostname']
212
213            # Check for Raw IPv6 address
214            if ':' in node_host:
215                node_host = '[' + node_host + ']'
216
217            http_prefix = 'http://'
218            fts_port_name = 'fts'
219            n1ql_port_name = 'n1ql'
220            mgmt_port_name = 'mgmt'
221            index_port_name = 'indexHttp'
222            event_port_name = 'eventingAdminPort'
223
224            if self.ssl:
225                http_prefix = 'https://'
226                n1ql_port_name = 'n1qlSSL'
227                mgmt_port_name = 'mgmtSSL'
228                event_port_name = 'eventingSSL'
229
230                # There is no SSL port for the index or fts services
231
232            if service_name == MGMT_SERVICE and mgmt_port_name in node['services']:
233                hosts.append(http_prefix + node_host + ':' + str(node['services'][mgmt_port_name]))
234
235            if service_name == N1QL_SERVICE and n1ql_port_name in node['services']:
236                hosts.append(http_prefix + node_host + ':' + str(node['services'][n1ql_port_name]))
237
238            if service_name == INDEX_SERVICE and index_port_name in node['services']:
239                hosts.append(http_prefix + node_host + ':' + str(node['services'][index_port_name]))
240
241            if service_name == FTS_SERVICE and fts_port_name in node['services']:
242                hosts.append(http_prefix + node_host + ':' + str(node['services'][fts_port_name]))
243
244            if service_name == EVENT_SERVICE and event_port_name in node['services']:
245                hosts.append(http_prefix + node_host + ':' + str(node['services'][event_port_name]))
246
247        return hosts, None
248
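    # Illustrative sketch: the returned hostnames are full "scheme://host:port"
    # strings, so they can be used directly as the base URL for follow-up
    # requests (the service name constants are defined at the top of this
    # module):
    #
    #   hosts, errors = cm.get_hostnames_for_service(N1QL_SERVICE)
    #   # e.g. hosts == ['http://10.0.0.1:8093'] on a cluster without SSL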
249    def pools(self, pool=None):
250        """ Retrieves information about Couchbase management pools
251
252        Returns Couchbase pools data"""
253        url = self.hostname + '/pools'
254        if pool:
255            url += '/' + pool
256        return self._get(url)
257
258    def set_admin_password(self, password):
259        url = self.hostname + '/controller/resetAdminPassword'
260        params = { "password": password }
261
262        return self._post_form_encoded(url, params)
263
264    def regenerate_admin_password(self):
265        url = self.hostname + '/controller/resetAdminPassword?generate=1'
266
267        return self._post_form_encoded(url, None)
268
269    def rotate_master_pwd(self):
270        url = self.hostname + '/node/controller/rotateDataKey'
271        return self._post_form_encoded(url, None)
272
273    def set_master_pwd(self, password):
274        url = self.hostname + '/node/controller/changeMasterPassword'
275        params = { "newPassword": password }
276        return self._post_form_encoded(url, params)
277
278    def set_pools_default(self, data_ramsize, index_ramsize, fts_ramsize, cbas_ramsize, eventing_ramsize, cluster_name):
279        """ Sets Couchbase RAM Quotas for various services
280
281        Options:
282        data_ramsize - An integer denoting the size in MB, None skips the parameter
283        index_ramsize - An integer denoting the size in MB, None skips the parameter
284        fts_ramsize - An integer denoting the size in MB, None skips the parameter
285        cbas_ramsize - An integer denoting the size in MB, None skips the parameter
286        eventing_ramsize - An integer denoting the size in MB, None skips the parameter
287        cluster_name - Sets a name for the cluster, None skips the parameter
288        """
289        url = self.hostname + '/pools/default'
290        params = {}
291        if data_ramsize:
292            params["memoryQuota"] = data_ramsize
293        if index_ramsize:
294            params["indexMemoryQuota"] = index_ramsize
295        if fts_ramsize:
296            params["ftsMemoryQuota"] = fts_ramsize
297        if cbas_ramsize:
298            params["cbasMemoryQuota"] = cbas_ramsize
299        if eventing_ramsize:
300            params["eventingMemoryQuota"] = eventing_ramsize
301        if cluster_name:
302            params["clusterName"] = cluster_name
303
304        return self._post_form_encoded(url, params)
305
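    # Illustrative sketch (sizes are assumed example values): quotas are given
    # in MB and any argument passed as None is simply left out of the request:
    #
    #   _, errors = cm.set_pools_default(1024, 256, None, None, None, "dev-cluster")
    #   # sends memoryQuota=1024, indexMemoryQuota=256 and clusterName=dev-cluster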
306    def setup_services(self, services):
307        """ Sets the services on a node
308
309        Options:
310        services - A string containing a comma separated list of services
311        """
312        url = self.hostname + '/node/controller/setupServices'
313        params = { "services": services }
314
315        return self._post_form_encoded(url, params)
316
317    def set_admin_credentials(self, username, password, port):
318        """Sets the admin credentials and port for a cluster
319
320        Options:
321        username - The username for the cluster
322        password - The password for the cluster
323        port - The port number for the admin console to listen on. If set to
324               None then the port is kept the same as it currently is.
325        """
326        url = self.hostname + '/settings/web'
327        params = {}
328
329        if username:
330            params["username"] = username
331        if password:
332            params["password"] = password
333        if port:
334            params["port"] = port
335        else:
336            params["port"] = "SAME"
337
338        return self._post_form_encoded(url, params)
339
340    def enable_notifications(self, enable):
341        url = self.hostname + '/settings/stats'
342        params = { "sendStats": "false"}
343
344        if enable:
345            params["sendStats"] = "true"
346
347        return self._post_form_encoded(url, params)
348
349    def get_server_groups(self):
350        url = self.hostname + '/pools/default/serverGroups'
351        return self._get(url)
352
353    def get_server_group(self, groupName):
354        groups, errors = self.get_server_groups()
355        if errors:
356            return None, errors
357
358        if not groups or not groups["groups"]:
359            return None, ["No server groups found"]
360
361        if groupName:
362            for group in groups["groups"]:
363                if group["name"] == groupName:
364                    return group, None
365            return None, ["Group `%s` not found" % groupName]
366        else:
367            return groups["groups"][0], None
368
369    def add_server(self, add_server, groupName, username, password, services):
370        group, errors = self.get_server_group(groupName)
371        if errors:
372            return None, errors
373
374        url = self.hostname + group["addNodeURI"]
375        params = { "hostname": add_server,
376                   "user": username,
377                   "password": password,
378                   "services": services }
379
380        return self._post_form_encoded(url, params)
381
382    def readd_server(self, server):
383        _, _, _, readd, _, errors = self._get_otps_names(readd_nodes=[server])
384        if errors:
385            return None, errors
386
387        if len(readd) != 1:
388            return None, ["Server not found %s" % server]
389
390        url = self.hostname + '/controller/reAddNode'
391        params = { "otpNode": readd[0] }
392
393        return self._post_form_encoded(url, params)
394
395    def get_tasks(self):
396        url = self.hostname + '/pools/default/tasks'
397        return self._get(url)
398
399    def collect_logs_start(self, servers, redaction_level, salt, log_dir, tmp_dir, upload, upload_host, upload_proxy,
400                           upload_customer, upload_ticket):
401        url = self.hostname + '/controller/startLogsCollection'
402        params = dict()
403
404        if servers == "*":
405            params["nodes"] = servers
406        else:
407            nodes = servers.split(",")
408            known, _, _, readd, _, errors = self._get_otps_names(readd_nodes=nodes)
409            if errors:
410                return None, errors
411
412            if len(nodes) != len(readd):
413                return None, ["Servers list contains invalid servers"]
414
415            params["nodes"] = ",".join(readd)
416
417        if redaction_level:
418            params["logRedactionLevel"] = redaction_level
419        if log_dir:
420            params["logDir"] = log_dir
421        if tmp_dir:
422            params["tmpDir"] = tmp_dir
423        if salt:
424            params["logRedactionSalt"] = salt
425
426        if upload:
427            if upload_host:
428                params["uploadHost"] = upload_host
429            if upload_proxy:
430                params["uploadProxy"] = upload_proxy
431            if upload_customer:
432                params["customer"] = upload_customer
433            if upload_ticket:
434                params["ticket"] = upload_ticket
435
436        return self._post_form_encoded(url, params)
437
438    def collect_logs_stop(self):
439        url = self.hostname + '/controller/cancelLogsCollection'
440        return self._post_form_encoded(url, dict())
441
442    def failover(self, servers_to_failover, force):
443        _, _, failover, _, _, errors = self._get_otps_names(failover_nodes=servers_to_failover)
444        if errors:
445            return None, errors
446
447
448        if len(failover) != len(servers_to_failover):
449            if len(servers_to_failover) == 1:
450                return None, ["Server can't be failed over because it's not part of the cluster"]
451            return None, ["Some nodes specified to be failed over are not part of the cluster"]
452
453        params = {"otpNode": [server for server, _ in failover]}
454
455        if force:
456            params["allowUnsafe"] = "true"
457            url = self.hostname + '/controller/failOver'
458            return self._post_form_encoded(url, params)
459        else:
460            for server, server_status in failover:
461                if server_status != 'healthy':
462                    return None, ["% can't be gracefully failed over because it is not healthy", server]
463            url = self.hostname + '/controller/startGracefulFailover'
464            return self._post_form_encoded(url, params)
465
466    def recovery(self, server, recovery_type):
467        _, _, _, readd, _, errors = self._get_otps_names(readd_nodes=[server])
468        if errors:
469            return None, errors
470
471        if len(readd) != 1:
472            return None, ["Server not found %s" % server]
473
474        url = self.hostname + '/controller/setRecoveryType'
475        params = { "otpNode": readd[0],
476                   "recoveryType": recovery_type }
477
478        return self._post_form_encoded(url, params)
479
480    def rebalance(self, remove_nodes):
481        url = self.hostname + '/controller/rebalance'
482        all, eject, _, _, _, errors = self._get_otps_names(eject_nodes=remove_nodes)
483        if errors:
484            return None, errors
485
486        if len(eject) != len(remove_nodes):
487            return None, ["Some nodes specified to be removed are not part of the cluster"]
488
489        params = { "knownNodes": ','.join(all),
490                   "ejectedNodes": ','.join(eject) }
491
492        return self._post_form_encoded(url, params)
493
494    def rebalance_status(self):
495        data, errors = self.get_tasks()
496        if errors:
497            return (None, None), errors
498
499        rv = {
500            "status": "unknown",
501            "msg": "unknown state",
502            "details": {}
503        }
504
505        for task in data:
506            if task["type"] != "rebalance":
507                continue
508
509            err_msg = None
510            if "errorMessage" in task:
511                rv["status"] = "errored"
512                rv["msg"] = task['errorMessage']
513                break
514            elif task["status"] == "running":
515                rv["status"] = task["status"]
516                rv["msg"] = "Rebalance is running"
517                rv["details"]["progress"] = task["progress"]
518                rv["details"]["refresh"] = task["recommendedRefreshPeriod"]
519                rv["details"]["totalBuckets"] = 0
520                rv["details"]["curBucket"] = 0
521                rv["details"]["curBucketName"] = ""
522
523                if "bucketsCount" in task["detailedProgress"]:
524                    rv["details"]["totalBuckets"] = task["detailedProgress"]["bucketsCount"]
525
526                if "bucketNumber" in task["detailedProgress"]:
527                    rv["details"]["curBucket"] = task["detailedProgress"]["bucketNumber"]
528
529                if "bucket" in task["detailedProgress"]:
530                    rv["details"]["curBucketName"] = task["detailedProgress"]["bucket"]
531
532                acc = 0
533                if "perNode" in task["detailedProgress"]:
534
535                    for _, node in task["detailedProgress"]["perNode"].iteritems():
536                        acc += node["ingoing"]["docsTotal"] - node["ingoing"]["docsTransferred"]
537                        acc += node["outgoing"]["docsTotal"] - node["outgoing"]["docsTransferred"]
538
539                rv["details"]["docsRemaining"] = acc
540            elif task["status"] == "notRunning":
541                rv["status"] = task["status"]
542                rv["msg"] = "Rebalance is not running"
543                if "statusIsStale" in task:
544                    if task["statusIsStale"] or task["statusIsStale"] == "true":
545                        rv["status"] = "stale"
546                        rv["msg"] = "Current status us stale, please retry"
547
548            break
549
550        return rv, None
551
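    # Illustrative polling sketch (the sleep interval is an assumed value): the
    # dictionary returned above can be polled until the rebalance leaves the
    # "running" state:
    #
    #   status, errors = cm.rebalance_status()
    #   while not errors and status["status"] == "running":
    #       time.sleep(1)
    #       status, errors = cm.rebalance_status()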
552    def _get_otps_names(self, eject_nodes=[], failover_nodes=[], readd_nodes=[]):
553        result, errors = self.pools('default')
554        if errors:
555            return None, None, None, None, None, errors
556
557        all = list()
558        eject = list()
559        failover = list()
560        readd = list()
561        hostnames = list()
562        for node in result["nodes"]:
563            if "otpNode" not in node:
564                return [], [], [], [], [], ["Unable to get otp names"]
565            all.append(node['otpNode'])
566            hostnames.append(node['hostname'])
567            if node['hostname'] in eject_nodes:
568                eject.append(node['otpNode'])
569            if node['hostname'] in failover_nodes:
570                if node['clusterMembership'] != 'active':
571                    return [], [], [], [], [], ["Can't failover a node that isn't in the cluster"]
572                else:
573                    failover.append((node['otpNode'], node['status']))
574            _, host = node['otpNode'].split('@')
575            hostport = "%s:%d" % (host, 8091)
576            if node['hostname'] in readd_nodes or hostport in readd_nodes:
577                readd.append(node['otpNode'])
578
579        return all, eject, failover, readd, hostnames, None
580
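    # Illustrative sketch (the address is an assumed example value): callers
    # unpack the five lists positionally; otpNode names have the form
    # "ns_1@<host>":
    #
    #   all_nodes, eject, failover, readd, hostnames, errors = \
    #       cm._get_otps_names(eject_nodes=["10.0.0.2:8091"])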
581    def create_bucket(self, name, bucket_type, memory_quota,
582                      eviction_policy, replicas, replica_indexes,
583                      threads_number, conflict_resolution, flush_enabled,
584                      max_ttl, compression_mode, sync, timeout=60):
585        url = self.hostname + '/pools/default/buckets'
586
587        if name is None:
588            return None, ["The bucket name is required when creating a bucket"]
589        if bucket_type is None:
590            return None, ["The bucket type is required when creating a bucket"]
591        if memory_quota is None:
592            return None, ["The bucket memory quota is required when creating a bucket"]
593
594        params = { "name": name,
595                   "bucketType": bucket_type,
596                   "ramQuotaMB": memory_quota }
597
598        if eviction_policy is not None:
599            params["evictionPolicy"] = eviction_policy
600        if replicas is not None:
601            params["replicaNumber"] = replicas
602        if replica_indexes is not None:
603            params["replicaIndex"] = replica_indexes
604        if threads_number is not None:
605            params["threadsNumber"] = threads_number
606        if conflict_resolution is not None:
607            params["conflictResolutionType"] = conflict_resolution
608        if flush_enabled is not None:
609            params["flushEnabled"] = flush_enabled
610        if max_ttl is not None:
611            params["maxTTL"] = max_ttl
612        if compression_mode is not None:
613            params["compressionMode"] = compression_mode
614
615        result, errors = self._post_form_encoded(url, params)
616        if errors:
617            return None, errors
618
619        if sync:
620            all_node_ready = False
621            start = time.time()
622            while (time.time() - start) <= timeout and not all_node_ready:
623                buckets, errors = self.list_buckets()
                if errors:
                    return None, errors
624                if name not in buckets:
625                    time.sleep(1)
626                    continue
627
628                url = self.hostname + '/pools/default/buckets/' + name
629                content, errors = self._get(url)
630                if errors:
631                    return None, errors
632
633                all_node_ready = True
634                for node in content["nodes"]:
635                    if node["status"] != "healthy":
636                        all_node_ready = False
637                        break
638                if not all_node_ready:
639                    time.sleep(1)
640
641            if not all_node_ready:
642                return None, ["Bucket created, but not ready after %d seconds" % timeout]
643
644        return result, None
645
646
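    # Illustrative sketch (bucket name and sizes are assumed example values):
    # with sync=True the call blocks until every node reports the new bucket as
    # healthy, or until the timeout is reached:
    #
    #   _, errors = cm.create_bucket("example-bucket", "couchbase", 256,
    #                                None, 1, None, None, None, None,
    #                                None, None, sync=True)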
647    def edit_bucket(self, name, memory_quota, eviction_policy,
648                    replicas, threads_number, flush_enabled, max_ttl,
649                    compression_mode, remove_port):
650        url = self.hostname + '/pools/default/buckets/' + name
651
652        if name is None:
653            return None ["The bucket name is required when editing a bucket"]
654
655        params = {}
656        if memory_quota is not None:
657            params["ramQuotaMB"] = memory_quota
658        if eviction_policy is not None:
659            params["evictionPolicy"] = eviction_policy
660        if replicas is not None:
661            params["replicaNumber"] = replicas
662        if threads_number is not None:
663            params["threadsNumber"] = threads_number
664        if flush_enabled is not None:
665            params["flushEnabled"] = flush_enabled
666        if max_ttl is not None:
667            params["maxTTL"] = max_ttl
668        if compression_mode is not None:
669            params["compressionMode"] = compression_mode
670        if remove_port:
671            params["proxyPort"] = "none"
672
673        return self._post_form_encoded(url, params)
674
675    def delete_bucket(self, name):
676        url = self.hostname + '/pools/default/buckets/' + name
677        return self._delete(url, None)
678
679    def flush_bucket(self, name):
680        if name is None:
681            return None ["The bucket name is required when flushing a bucket"]
682
683        url = self.hostname + '/pools/default/buckets/' + name + '/controller/doFlush'
684        return self._post_form_encoded(url, None)
685
686    def compact_bucket(self, name, view_only, data_only):
687        if data_only and not view_only:
688            url = self.hostname + '/pools/default/buckets/' + name + \
689                '/controller/compactDatabases'
690            return self._post_form_encoded(url, None)
691        elif view_only and not data_only:
692            url = self.hostname + '/pools/default/buckets/' + name + '/ddocs'
693            ddocs, errors = self._get(url)
694            if errors:
695                return None, errors
696
697            for row in ddocs["rows"]:
698                url = self.hostname + row["controllers"]["compact"]
699                _, errors = self._post_form_encoded(url, None)
700                if errors:
701                    return None, errors
702            return None, None
703        elif not data_only and not view_only:
704            url = self.hostname + '/pools/default/buckets/' + name + \
705                '/controller/compactBucket'
706            return self._post_form_encoded(url, None)
707        else:
708            return None, ["Cannot compact data only and view only, pick one or neither"]
709
710    def list_buckets(self, extended=False):
711        url = self.hostname + '/pools/default/buckets'
712        result, errors = self._get(url)
713        if errors:
714            return None, errors
715
716        if extended:
717            return result, errors
718
719        names = list()
720        for bucket in result:
721            names.append(bucket["name"])
722
723        return names, None
724
725    def get_bucket(self, name):
726        url = self.hostname + '/pools/default/buckets'
727        result, errors = self._get(url)
728        if errors:
729            return None, errors
730
731        for bucket in result:
732            if bucket["name"] == name:
733                return bucket, None
734
735        return None, ["Bucket not found"]
736
737    def set_data_paths(self, data_path, index_path, cbas_path, java_home):
738        url = self.hostname + '/nodes/self/controller/settings'
739        params = dict()
740
741        if data_path is not None:
742            params["path"] = data_path
743
744        if index_path is not None:
745            params["index_path"] = index_path
746
747        if cbas_path is not None:
748            params["cbas_path"] = cbas_path
749
750        if java_home is not None:
751            params["java_home"] = java_home
752
753        return self._post_form_encoded(url, params)
754
755    def node_info(self):
756        url = self.hostname + '/nodes/self'
757        return self._get(url)
758
759    def get_babysitter_cookie(self):
760        url = self.hostname + '/diag/eval'
761        return self._post_form_encoded(url, 'ns_server:get_babysitter_cookie().')
762
763    def set_hostname(self, hostname):
764        url = self.hostname + '/node/controller/rename'
765        params = { "hostname": hostname }
766        return self._post_form_encoded(url, params)
767
768    def stop_rebalance(self):
769        params = {"allowUnsafe": "true"}
770        url = self.hostname + '/controller/stopRebalance'
771        return self._post_form_encoded(url, params)
772
773    def create_server_group(self, name):
774        url = self.hostname + '/pools/default/serverGroups'
775        params = { "name": name }
776        return self._post_form_encoded(url, params)
777
778    def delete_server_group(self, name):
779        uri, errors = self._get_server_group_uri(name)
780        if errors:
781            return None, errors
782
783        url = self.hostname + uri
784        params = { "name": name }
785        return self._delete(url, params)
786
787    def rename_server_group(self, name, new_name):
788        uri, errors = self._get_server_group_uri(name)
789        if errors:
790            return None, errors
791
792        url = self.hostname + uri
793        params = { "name": new_name }
794        return self._put(url, params)
795
796    def move_servers_between_groups(self, servers, from_group, to_group):
797        groups, errors = self.get_server_groups()
798        if errors:
799            return None, errors
800
801        # Find the groups to move servers between
802        move_from_group = None
803        move_to_group = None
804        for group in groups["groups"]:
805            if from_group == group['name']:
806                move_from_group = group
807            if to_group == group['name']:
808                move_to_group = group
809
810        if move_from_group is None:
811            return None, ["Group to move servers from `%s` not found" % from_group]
812        if move_to_group is None:
813            return None, ["Group to move servers to `%s` not found" % from_group]
814
815        # Find the servers to move in the from group
816        nodes_to_move = []
817        for server in servers:
818            found = False
819            for node in move_from_group["nodes"]:
820                if server == node["hostname"]:
821                    nodes_to_move.append(node)
822                    move_from_group["nodes"].remove(node)
823                    found = True
                    # Stop scanning; the node has been removed from the list being iterated
                    break
824
825            if not found:
826                return None, ["Can't move %s because it doesn't exist in '%s'" % (server, from_group)]
827
828        # Move the servers to the to group
829        for node in nodes_to_move:
830            move_to_group["nodes"].append(node)
831
832        url = self.hostname + groups["uri"]
833        return self._put_json(url, groups)
834
839    def _get_server_group_uri(self, name):
840        groups, errors = self.get_server_groups()
841        if errors:
842            return None, errors
843
844        for group in groups["groups"]:
845            if name == group["name"]:
846                return group["uri"], None
847        return None, ["Group `%s` not found" % name]
848
849    def delete_rbac_user(self, username, auth_domain):
850        url = self.hostname + '/settings/rbac/users/%s/%s' % (auth_domain, username)
851        return self._delete(url, None)
852
853    def list_rbac_users(self):
854        url = self.hostname + '/settings/rbac/users'
855        return self._get(url)
856
857    def my_roles(self):
858        url = self.hostname + '/whoami'
859        return self._get(url)
860
861    def set_rbac_user(self, username, password, name, roles, auth_domain):
862        if auth_domain is None:
863            return None, ["The authentication type is required"]
864
865        if username is None:
866            return None, ["The username is required"]
867
868        url = self.hostname + '/settings/rbac/users/%s/%s' % (auth_domain, username)
869
870        params = {}
871        if name is not None:
872            params["name"] = name
873        if password is not None:
874            params["password"] = password
875        if roles is not None:
876            params["roles"] = roles
877        return self._put(url, params)
878
879    def get_password_policy(self):
880        url = self.hostname + '/settings/passwordPolicy'
881        return self._get(url)
882
883    def set_security_settings(self, disable_http_ui):
884        url = self.hostname + '/settings/security'
885
886        params = {
887            "disableUIOverHttp":  "true" if disable_http_ui else "false"
888        }
889
890        return self._post_form_encoded(url, params)
891
892    def set_password_policy(self, min_length, upper_case, lower_case, digit,
893                            special_char):
894        url = self.hostname + '/settings/passwordPolicy'
895
896        params = {
897            "minLength": min_length,
898            "enforceUppercase": "true" if upper_case else "false",
899            "enforceLowercase": "true" if lower_case else "false",
900            "enforceDigits": "true" if digit else "false",
901            "enforceSpecialChars": "true" if special_char else "false"
902        }
903
904        return self._post_form_encoded(url, params)
905
906    def set_audit_settings(self, enabled, log_path, rotate_interval, rotate_size):
907        url = self.hostname + '/settings/audit'
908
909        params = dict()
910        if enabled:
911            params["auditdEnabled"] = enabled
912        if log_path:
913            params["logPath"] = log_path
914        if rotate_interval:
915            params["rotateInterval"] = rotate_interval
916        if rotate_size:
917            params["rotateSize"] = rotate_size
918
919        if "logPath" not in params and "auditdEnabled" in params and params["auditdEnabled"] == "true":
920            return None, ["The audit log path must be specified when auditing is first set up"]
921
922        return self._post_form_encoded(url, params)
923
924    def set_autofailover_settings(self, enabled, timeout, failoverOfServerGroups, maxCount,
925                                  failoverOnDataDiskIssuesEnabled, failoverOnDataDiskIssuesTimePeriod):
926        url = self.hostname + '/settings/autoFailover'
927
928        params = dict()
929        if enabled:
930            params["enabled"] = enabled
931        if timeout:
932            params["timeout"] = timeout
933        if failoverOfServerGroups:
934            params["failoverServerGroup"] = failoverOfServerGroups
935        if failoverOnDataDiskIssuesEnabled:
936            params["failoverOnDataDiskIssues[enabled]"] = failoverOnDataDiskIssuesEnabled
937        if maxCount:
938            params["maxCount"] = maxCount
939        if failoverOnDataDiskIssuesTimePeriod:
940            params["failoverOnDataDiskIssues[timePeriod]"] = failoverOnDataDiskIssuesTimePeriod
941
942        return self._post_form_encoded(url, params)
943
944    def set_autoreprovision_settings(self, enabled, max_nodes):
945        url = self.hostname + '/settings/autoReprovision'
946
947        params = dict()
948        if enabled:
949            params["enabled"] = enabled
950        if max_nodes:
951            params["maxNodes"] = max_nodes
952
953        return self._post_form_encoded(url, params)
954
955    def set_compaction_settings(self, dbFragPerc, dbFragSize, viewFragPerc, viewFragSize,
956                                fromHour, fromMin, toHour, toMin, abortOutside,
957                                parallelDBAndViewCompact, purgeInterval, gsiMode, gsiPerc,
958                                gsiInterval, gsiFromHour, gsiFromMin, gsiToHour, gsiToMin,
959                                enableGsiAbort):
960        url = self.hostname + '/controller/setAutoCompaction'
961        params = dict()
962
963        if dbFragPerc is not None:
964            params["databaseFragmentationThreshold[percentage]"] = dbFragPerc
965        if dbFragSize is not None:
966            params["databaseFragmentationThreshold[size]"] = dbFragSize
967        if viewFragPerc is not None:
968            params["viewFragmentationThreshold[percentage]"] = viewFragPerc
969        if viewFragSize is not None:
970            params["viewFragmentationThreshold[size]"] = viewFragSize
971        if fromHour is not None:
972            params["allowedTimePeriod[fromHour]"] = fromHour
973        if fromMin is not None:
974            params["allowedTimePeriod[fromMinute]"] = fromMin
975        if toHour is not None:
976            params["allowedTimePeriod[toHour]"] = toHour
977        if toMin is not None:
978            params["allowedTimePeriod[toMinute]"] = toMin
979        if abortOutside is not None:
980            params["allowedTimePeriod[abortOutside]"] = abortOutside
981        if parallelDBAndViewCompact is not None:
982            params["parallelDBAndViewCompaction"] = parallelDBAndViewCompact
983        if purgeInterval is not None:
984            params["purgeInterval"] = purgeInterval
985        if gsiMode is not None:
986            params["indexCompactionMode"] = gsiMode
987        if gsiPerc is not None:
988            params["indexFragmentationThreshold[percentage]"] = gsiPerc
989        if gsiInterval is not None:
990            params["indexCircularCompaction[daysOfWeek]"] = gsiInterval
991        if gsiFromHour is not None:
992            params["indexCircularCompaction[interval][fromHour]"] = gsiFromHour
993        if gsiFromMin is not None:
994            params["indexCircularCompaction[interval][fromMinute]"] = gsiFromMin
995        if gsiToHour is not None:
996            params["indexCircularCompaction[interval][toHour]"] = gsiToHour
997        if gsiToMin is not None:
998            params["indexCircularCompaction[interval][toMinute]"] = gsiToMin
999        if enableGsiAbort is not None:
1000            params["indexCircularCompaction[interval][abortOutside]"] = enableGsiAbort
1001
1002        return self._post_form_encoded(url, params)
1003
1004    def set_index_settings(self, storageMode, maxRollbackPoints, stableSnapInterval,
1005                           memSnapInterval, threads, logLevel):
1006        """ Sets global index settings"""
1007        params = dict()
1008        if storageMode is not None:
1009            params["storageMode"] = storageMode
1010        if maxRollbackPoints is not None:
1011            params["maxRollbackPoints"] = maxRollbackPoints
1012        if stableSnapInterval is not None:
1013            params["stableSnapshotInterval"] = stableSnapInterval
1014        if memSnapInterval is not None:
1015            params["memorySnapshotInterval"] = memSnapInterval
1016        if threads is not None:
1017            params["indexerThreads"] = threads
1018        if logLevel is not None:
1019            params["logLevel"] = logLevel
1020
1021        url = self.hostname + '/settings/indexes'
1022        return self._post_form_encoded(url, params)
1023
1024    def set_alert_settings(self, enabled_email_alerts, email_recipients,
1025                           email_sender, email_user, email_pass, email_host,
1026                           email_port, email_encrypted, alerts):
1027        url = self.hostname + '/settings/alerts'
1028        params = dict()
1029
1030        if enabled_email_alerts:
1031            params["enabled"] = enabled_email_alerts
1032        if email_recipients:
1033            params["recipients"] = email_recipients
1034        if email_sender:
1035            params["sender"] = email_sender
1036        if email_user:
1037            params["emailUser"] = email_user
1038        if email_pass:
1039            params["emailPass"] = email_pass
1040        if email_host:
1041            params["emailHost"] = email_host
1042        if email_port:
1043            params["emailPort"] = email_port
1044        if email_encrypted:
1045            params["emailEncrypt"] = email_encrypted
1046        if alerts is not None:
1047            params["alerts"] = alerts
1048
1049        return self._post_form_encoded(url, params)
1050
1051    def index_settings(self):
1052        """ Retrieves the index settings
1053
1054            Returns a map of all global index settings"""
1055        url = self.hostname + '/settings/indexes'
1056        return self._get(url)
1057
1058    def ldap_settings(self, enabled, read_only_admins, admins):
1059        """ Sets LDAP Settings
1060
1061        enabled - The string "true" or "false"
1062        admins - A new line separated list or the string "asterisk"
1063        read_only_admins - A new line separated list or the string "asterisk"
1064        """
1065
1066        url = self.hostname + '/settings/saslauthdAuth'
1067        params = { "enabled": enabled }
1068
1069        if read_only_admins is not None:
1070            params["roAdmins"] = read_only_admins
1071        if admins is not None:
1072            params["admins"] = admins
1073
1074        return self._post_form_encoded(url, params)
1075
1076    def setRoles(self, userList, roleList, userNameList):
1077        # we take a comma-delimited list of roles that needs to go into a dictionary
1078        paramDict = {"roles" : roleList}
1079        userIds = []
1080        userNames = []
1081        userF = StringIO.StringIO(userList)
1082        for idList in csv.reader(userF, delimiter=','):
1083            userIds.extend(idList)
1084
1085        # did they specify user names?
1086        if userNameList != None:
1087            userNameF = StringIO.StringIO(userNameList)
1088            for nameList in csv.reader(userNameF, delimiter=','):
1089                userNames.extend(nameList)
1090            if len(userNames) != len(userIds):
1091                return None, ["Error: specified %d user ids and %d user names, must have the same number of each." %  (len(userIds),len(userNames))]
1092
1094        # we need a separate REST call for each user in the comma-delimited user list
1095        for index in range(len(userIds)):
1096            user = userIds[index]
1097            paramDict["id"] = user
1098            if len(userNames) > 0:
1099                paramDict["name"] = userNames[index]
1100            url = self.hostname + '/settings/rbac/users/' + user
1101            data, errors = self._put(url,paramDict)
1102            if errors:
1103                return data, errors
1104
1105        return data, errors
1106
1107    def deleteRoles(self, userList):
1108        # need a separate REST call for each user in the comma-delimited user list
1109        userF = StringIO.StringIO(userList)
1110        reader = csv.reader(userF, delimiter=',')
1111        for users in reader:
1112            for user in users:
1113                url = self.hostname + '/settings/rbac/users/' + user
1114                data, errors = self._delete(url, None)
1115                if errors:
1116                    return data, errors
1117
1118        return data, errors
1124
1125    def retrieve_cluster_certificate(self, extended=False):
1126        """ Retrieves the current cluster certificate
1127
1128        Gets the current cluster certificate. If extended is set to True then
1129        we return the extended certificate which contains the certificate type,
1130        certificate key, expiration, subject, and warnings."""
1131        url = self.hostname + '/pools/default/certificate'
1132        if extended:
1133            url += '?extended=true'
1134        return self._get(url)
1135
1136    def regenerate_cluster_certificate(self):
1137        """ Regenerates the cluster certificate
1138
1139        Regenerates the cluster certificate and returns the new certificate."""
1140        url = self.hostname + '/controller/regenerateCertificate'
1141        return self._post_form_encoded(url, None)
1142
1143    def upload_cluster_certificate(self, certificate):
1144        """ Uploads a new cluster certificate"""
1145        url = self.hostname + '/controller/uploadClusterCA'
1146        return self._post_form_encoded(url, certificate)
1147
1148    def retrieve_node_certificate(self, node):
1149        """ Retrieves the current node certificate
1150
1151        Returns the current node certificate"""
1152        url = self.hostname + '/pools/default/certificate/node/' + node
1153        return self._get(url)
1154
1155    def set_node_certificate(self):
1156        """Activates the current node certificate
1157
1158        Grabs chain.pem and pkey.pem from the <data folder>/inbox/ directory and
1159        applies them to the node. chain.pem contains the chain encoded certificates
1160        starting from the node certificate and ending with the last intermediate
1161        certificate before cluster CA. pkey.pem contains the pem encoded private
1162        key for node certificates. Both files should exist on the server before
1163        this API is called."""
1164        url = self.hostname + '/node/controller/reloadCertificate'
1165        return self._post_form_encoded(url, None)
1166
1167    def set_client_cert_auth(self, config):
1168        """Enable/disable the client cert auth"""
1169        url = self.hostname + '/settings/clientCertAuth'
1170        return self._post_json(url, config)
1171
1172    def retrieve_client_cert_auth(self):
1173        url = self.hostname + '/settings/clientCertAuth'
1174        return self._get(url)
1175
1176    def create_xdcr_reference(self, name, hostname, username, password, encrypted,
1177                              encryptionType, certificate, clientCertificate, clientKey):
1178        return self._set_xdcr_reference(False, name, hostname, username,
1179                                        password, encrypted, encryptionType,
1180                                        certificate, clientCertificate, clientKey)
1181
1182    def edit_xdcr_reference(self, name, hostname, username, password, encrypted,
1183                            encryptionType, certificate, clientCertificate, clientKey):
1184        return self._set_xdcr_reference(True, name, hostname, username,
1185                                        password, encrypted, encryptionType,
1186                                        certificate, clientCertificate, clientKey)
1187
1188    def _set_xdcr_reference(self, edit, name, hostname, username, password,
1189                            encrypted, encryptionType, certificate, clientCertificate, clientKey):
1190        url = self.hostname + '/pools/default/remoteClusters'
1191        params = {}
1192
1193        if edit:
1194            url += '/' + urllib.quote(name)
1195
1196        if name is not None:
1197            params["name"] = name
1198        if hostname is not None:
1199            params["hostname"] = hostname
1200        if username is not None:
1201            params["username"] = username
1202        if password is not None:
1203            params["password"] = password
1204        if encrypted is not None:
1205            params["demandEncryption"] = encrypted
1206        if encryptionType is not None:
1207            params["encryptionType"] = encryptionType
1208        if certificate is not None:
1209            params["certificate"] = certificate
1210        if clientCertificate:
1211            params['clientCertificate'] = clientCertificate
1212        if clientKey:
1213            params['clientKey'] = clientKey
1214
1215        return self._post_form_encoded(url, params)
1216
1217    def delete_xdcr_reference(self, name):
1218        url = self.hostname + '/pools/default/remoteClusters/' + urllib.quote(name)
1219        return self._delete(url, None)
1220
1221    def list_xdcr_references(self):
1222        url = self.hostname + '/pools/default/remoteClusters/'
1223        return self._get(url)
1224
1225    def xdcr_replicator_settings(self, chk_interval, worker_batch_size,
1226                                 doc_batch_size, fail_interval, replication_thresh,
1227                                 src_nozzles, dst_nozzles, usage_limit, compression,
1228                                 log_level, stats_interval, replicator_id):
1229        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1230        params = self._get_xdcr_params(chk_interval, worker_batch_size, doc_batch_size,
1231                                       fail_interval, replication_thresh, src_nozzles,
1232                                       dst_nozzles, usage_limit, compression, log_level,
1233                                       stats_interval)
1234        return self._post_form_encoded(url, params)
1235
1236    def xdcr_global_settings(self, chk_interval, worker_batch_size, doc_batch_size,
1237                             fail_interval, replication_threshold, src_nozzles,
1238                             dst_nozzles, usage_limit, compression, log_level, stats_interval):
1239        url = self.hostname + '/settings/replications'
1240        params = self._get_xdcr_params(chk_interval, worker_batch_size, doc_batch_size,
1241                                       fail_interval, replication_threshold, src_nozzles,
1242                                       dst_nozzles, usage_limit, compression, log_level, stats_interval)
1243        return self._post_form_encoded(url, params)
1244
1245    def _get_xdcr_params(self, chk_interval, worker_batch_size, doc_batch_size,
1246                         fail_interval, replication_threshold, src_nozzles,
1247                         dst_nozzles, usage_limit, compression, log_level, stats_interval):
1248        params = {}
1249        if chk_interval is not None:
1250            params["checkpointInterval"] = chk_interval
1251        if worker_batch_size is not None:
1252            params["workerBatchSize"] = worker_batch_size
1253        if doc_batch_size is not None:
1254            params["docBatchSizeKb"] = doc_batch_size
1255        if fail_interval is not None:
1256            params["failureRestartInterval"] = fail_interval
1257        if replication_threshold is not None:
1258            params["optimisticReplicationThreshold"] = replication_threshold
1259        if src_nozzles is not None:
1260            params["sourceNozzlePerNode"] = src_nozzles
1261        if dst_nozzles is not None:
1262            params["targetNozzlePerNode"] = dst_nozzles
1263        if usage_limit is not None:
1264            params["bandwidthLimit"] = usage_limit
1265        if compression is not None:
1266            params["compressionType"] = compression
1267        if log_level is not None:
1268            params["logLevel"] = log_level
1269        if stats_interval is not None:
1270            params["statsInterval"] = stats_interval
1271        return params
1272
1273    def create_xdcr_replication(self, name, to_bucket, from_bucket, filter, rep_mode, compression):
1274        url = self.hostname + '/controller/createReplication'
1275        params = { "replicationType": "continuous" }
1276
1277        if to_bucket is not None:
1278            params["toBucket"] = to_bucket
1279        if name is not None:
1280            params["toCluster"] = name
1281        if from_bucket is not None:
1282            params["fromBucket"] = from_bucket
1283        if rep_mode is not None:
1284            params["type"] = rep_mode
1285        if filter is not None:
1286            params["filterExpression"] = filter
1287        if compression is not None:
1288            params["compressionType"] = compression
1289
1290        return self._post_form_encoded(url, params)
1291
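    # Illustrative sketch (cluster reference and bucket names are assumed
    # example values): a replication is created against a remote cluster that
    # was previously registered with create_xdcr_reference:
    #
    #   _, errors = cm.create_xdcr_replication("remote-cluster", "target-bucket",
    #                                          "source-bucket", None, None, None)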
1292    def delete_xdcr_replicator(self, replicator_id):
1293        url = self.hostname + '/controller/cancelXCDR/' + urllib.quote_plus(replicator_id)
1294        return self._delete(url, None)
1295
1296    def pause_xdcr_replication(self, replicator_id):
1297        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1298        params = { "pauseRequested": "true" }
1299        return self._post_form_encoded(url, params)
1300
1301    def resume_xdcr_replication(self, replicator_id):
1302        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1303        params = { "pauseRequested": "false" }
1304        return self._post_form_encoded(url, params)
1305
1306    def list_functions(self):
1307        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1308        if errors:
1309            return None, errors
1310
1311        if not hosts:
1312            raise ServiceNotAvailableException(EVENT_SERVICE)
1313        url = hosts[0] + '/api/v1/functions'
1314        return self._get(url)
1315
1316    def export_functions(self):
1317        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1318        if errors:
1319            return None, errors
1320
1321        if not hosts:
1322            raise ServiceNotAvailableException(EVENT_SERVICE)
1323        url = hosts[0] + '/api/v1/export'
1324        return self._get(url)
1325
1326    def import_functions(self, parms):
1327        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1328        if errors:
1329            return None, errors
1330        if not hosts:
1331            raise ServiceNotAvailableException(EVENT_SERVICE)
1332        url = hosts[0] + '/api/v1/import'
1333        return self._post_json(url, parms)
1334
1335    def delete_function(self, function):
1336        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1337        if errors:
1338            return None, errors
1339
1340        if not hosts:
1341            raise ServiceNotAvailableException(EVENT_SERVICE)
1342        url = hosts[0] + '/api/v1/functions/' + urllib.quote_plus(function)
1343        return self._delete(url, None)
1344
1345    def deploy_function(self, function, deploy):
1346        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1347        if errors:
1348            return None, errors
1349
1350        if not hosts:
1351            raise ServiceNotAvailableException(EVENT_SERVICE)
1352
1353        parms = {}
1354        if deploy:
1355            parms["deployment_status"] = True
1356            parms["processing_status"] = True
1357        else:
1358            parms["deployment_status"] = False
1359            parms["processing_status"] = False
1360
1361        url = hosts[0] + '/api/v1/functions/' + urllib.quote_plus(function) + '/settings'
1362        return self._post_json(url, parms)
1363
1364    # Low level methods for basic HTTP operations
1365
1366    @request
1367    def _get(self, url):
1368        if self.debug:
1369            print "GET %s" % url
1370        response = requests.get(url, auth=(self.username, self.password), verify=self.caCert,
1371                                cert=self.cert, timeout=self.timeout)
1372        return _handle_response(response, self.debug)
1373
1374    @request
1375    def _post_form_encoded(self, url, params):
1376        if self.debug:
1377            if params is None:
1378                params = {}
1379            print "POST %s %s" % (url, urllib.urlencode(params))
1380        response = requests.post(url, auth=(self.username, self.password), data=params,
1381                                 cert=self.cert, verify=self.caCert, timeout=self.timeout)
1382        return _handle_response(response, self.debug)
1383
1384    @request
1385    def _post_json(self, url, params):
1386        if self.debug:
1387            if params is None:
1388                params = {}
1389            print "POST %s %s" % (url, json.dumps(params))
1390        response = requests.post(url, auth=(self.username, self.password), json=params,
1391                                 cert=self.cert, verify=self.caCert, timeout=self.timeout)
1392        return _handle_response(response, self.debug)
1393
1394    @request
1395    def _put(self, url, params):
1396        if self.debug:
1397            if params is None:
1398                params = {}
1399            print "PUT %s %s" % (url, urllib.urlencode(params))
1400        response = requests.put(url, params, auth=(self.username, self.password),
1401                                cert=None, verify=self.caCert, timeout=self.timeout)
1402        return _handle_response(response, self.debug)
1403
1404    @request
1405    def _put_json(self, url, params):
1406        if self.debug:
1407            if params is None:
1408                params = {}
1409            print "PUT %s %s" % (url, json.dumps(params))
1410        response = requests.put(url, auth=(self.username, self.password), json=params,
1411                                cert=None, verify=self.caCert, timeout=self.timeout)
1412        return _handle_response(response, self.debug)
1413
1414    @request
1415    def _delete(self, url, params):
1416        if self.debug:
1417            if params is None:
1418                params = {}
1419            print "DELETE %s %s" % (url, urllib.urlencode(params))
1420        response = requests.delete(url, auth=(self.username, self.password), data=params,
1421                                   cert=None, verify=self.caCert, timeout=self.timeout)
1422        return _handle_response(response, self.debug)
1423
1424
1425def _handle_response(response, debug):
1426    if debug:
1427        output = str(response.status_code)
1428        if response.headers:
1429            output += ', {0}'.format(response.headers)
1430        if response.content:
1431            response.encoding = 'utf-8'
1432            output += ', {0}'.format(response.content)
1433        print output
1434    if response.status_code in [200, 202]:
1435        if 'Content-Type' not in response.headers:
1436            return "", None
1437        if not response.content:
1438            return "", None
1439        if 'application/json' in response.headers['Content-Type']:
1440            return response.json(), None
1441        else:
1442            response.encoding = 'utf-8'
1443            return response.text, None
1444    elif response.status_code in [400, 404]:
1445        if 'application/json' in response.headers['Content-Type']:
1446            errors = response.json()
1447            if isinstance(errors, list):
1448                return None, errors
1449            if "errors" in errors and isinstance(errors["errors"], list):
1450                return None, errors["errors"]
1451            if isinstance(errors, dict):
1452                if "errors" in errors and isinstance(errors["errors"], dict):
1453                    errors = errors["errors"]
1454                rv = list()
1455                for key, value in errors.iteritems():
1456                    rv.append(key + " - " + str(value))
1457                return None, rv
1458        return None, [response.text]
1459    elif response.status_code == 401:
1460        return None, [ERR_AUTH]
1461    elif response.status_code == 403:
1462        errors = response.json()
1463        return None, [errors["message"] + ": " + ", ".join(errors["permissions"])]
1464    # Error codes from Eventing service
1465    elif response.status_code in [406, 422, 423]:
1466        errors = response.json()
1467        if "description" in errors:
1468            return None, [errors["description"]]
1469        return None, ['Received unexpected status %d' % response.status_code]
1470    # Error code from Eventing Service
1471    elif response.status_code == 207:
1472        errors = response.json()
1473        if isinstance(errors, list):
1474            rv = list()
1475            for error in errors:
1476                if error['code'] == 20:
1477                    rv.append(error['info'])
1478            return None, rv
1479        else:
1480            return None, ['Received unexpected status %d' % response.status_code]
1481    elif response.status_code == 500:
1482        return None, [ERR_INTERNAL]
1483    else:
1484        return None, ['Received unexpected status %d' % response.status_code]
1485
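# Illustrative note (not part of the original module): _handle_response keeps
# the module-wide (result, errors) contract -- JSON bodies are decoded on
# 200/202, while auth failures and server errors are folded into readable
# error lists, for example:
#
#   result, errors = cm.node_info()
#   if errors:
#       # e.g. [ERR_AUTH] after a 401, or [ERR_INTERNAL] after a 500
#       print errors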