xref: /6.0.3/couchbase-cli/cluster_manager.py (revision e794237c)
1"""Management API's for Couchbase Cluster"""
2
3
4import csv
5import json
6import requests
7import StringIO
8import time
9import urllib
10import urlparse
11
12N1QL_SERVICE = 'n1ql'
13INDEX_SERVICE = 'index'
14MGMT_SERVICE = 'mgmt'
15FTS_SERVICE = 'fts'
16EVENT_SERVICE = 'eventing'
17
18ERR_AUTH = 'unable to access the REST API - please check your username (-u) and password (-p)'
19ERR_INTERNAL = 'Internal server error, please retry your request'
20
21DEFAULT_REQUEST_TIMEOUT = 60
22
23# Remove this once we can verify SSL certificates
24requests.packages.urllib3.disable_warnings()
25
26def request(f):
27    def g(*args, **kwargs):
28        cm = args[0]
29        url = args[1]
30        try:
31            return f(*args, **kwargs)
32        except requests.exceptions.ConnectionError, e:
33            if '[SSL: CERTIFICATE_VERIFY_FAILED]' in str(e):
34                return None, ['Certificate verification failed.\n' +
35                'If you are using self-signed certificates you can re-run this command with\n' +
36                'the --no-ssl-verify flag. Note however that disabling ssl verification\n' +
37                'means that couchbase-cli will be vulnerable to man-in-the-middle attacks.\n\n' +
38                'For the most secure access to Couchbase make sure that you have X.509\n' +
39                'certificates set up in your cluster and use the --cacert flag to specify\n' +
40                'your client certificate.']
41            elif str(e).startswith('[SSL]'):
42                return None, ['Unable to connect with the given CA certificate: ', str(e)]
43            return None, ['Unable to connect to host at %s: ' % cm.hostname, str(e)]
44        except requests.exceptions.ReadTimeout, e:
45            return None, ['Request to host `%s` timed out after %d seconds' % (url, cm.timeout)]
46    return g
47
48
49class ServiceNotAvailableException(Exception):
50    """An exception raised when a service does not exist in the target cluster"""
51
52    def __init__(self, service):
53        Exception.__init__(self, "Service %s not available in target cluster" % service)
54
55
56class ClusterManager(object):
57    """A set of REST API's for managing a Couchbase cluster"""
58
59    def __init__(self, hostname, username, password, sslFlag=False, verifyCert=True,
60                 caCert=True, debug=False, timeout=DEFAULT_REQUEST_TIMEOUT, cert=None):
61        hostname = hostname.replace("couchbase://", "http://", 1)
62        hostname = hostname.replace("couchbases://", "https://", 1)
63
64        self.hostname = hostname
65        # verify argument on Request functions can take boolean or a path to a CA if
66        # a path is not provide but the cert still needs to be verified it should use
67        # the system provided CAs
68        self.verifyCert = verifyCert
69        self.caCert = caCert
70        if not verifyCert:
71            self.caCert = False
72        # This is for client side certs which is currently not used.
73        self.cert = cert
74
75        parsed = urlparse.urlparse(hostname)
76        if sslFlag:
77            hostport = parsed.hostname.split(':')
78            if parsed.scheme == 'http://':
79                if parsed.port == 8091:
80                    self.hostname = "https://" + parsed.hostname + ":18091"
81                else:
82                    self.hostname = "https://" + parsed.hostname + ":" + parsed.port
83
84            # Certificates and verification are not used when the ssl flag is
85            # specified.
86            self.verifyCert = False
87            self.caCert = False
88
89        self.username = username
90        self.password = password
91        self.timeout = timeout
92        self.ssl = self.hostname.startswith("https://")
93        self.debug = debug
94
95    def restore_index_metadata(self, bucket, index_defs):
96        hosts, errors = self.get_hostnames_for_service(INDEX_SERVICE)
97        if errors:
98            return None, errors
99
100        if not hosts:
101            raise ServiceNotAvailableException(INDEX_SERVICE)
102
103        url = hosts[0] + '/restoreIndexMetadata?bucket=%s' % bucket
104        return self._post_json(url, index_defs)
105
106    def get_index_metadata(self, bucket):
107        hosts, errors = self.get_hostnames_for_service(INDEX_SERVICE)
108        if errors:
109            return None, errors
110
111        if not hosts:
112            raise ServiceNotAvailableException(INDEX_SERVICE)
113
114        url = hosts[0] + '/getIndexMetadata?bucket=%s' % bucket
115        return self._get(url)
116
117    def restore_fts_index_metadata(self, index_defs):
118        hosts, errors = self.get_hostnames_for_service(FTS_SERVICE)
119        if errors:
120            return None, errors
121
122        if not hosts:
123            raise ServiceNotAvailableException(FTS_SERVICE)
124
125        for index_def in index_defs:
126            url = hosts[0] + '/api/index/%s?prevIndexUUID=*' % index_def["name"]
127            if "sourceUUID" in index_def:
128                del index_def["sourceUUID"]
129            result, errors = self._put_json(url, index_def)
130            if errors:
131                return None, errors
132
133        return None, None
134
135    def get_fts_index_metadata(self, bucket):
136        hosts, errors = self.get_hostnames_for_service(FTS_SERVICE)
137        if errors:
138            return None, errors
139
140        if not hosts:
141            raise ServiceNotAvailableException(FTS_SERVICE)
142
143        url = hosts[0] + '/api/index'
144        result, errors = self._get(url)
145        if errors:
146            return None, errors
147
148
149        bucket_index_defs = []
150        if "indexDefs" in result and result["indexDefs"] is not None:
151            for _, index_def in result["indexDefs"]["indexDefs"].iteritems():
152                if index_def["type"] == "fulltext-index" and index_def["sourceName"] == bucket:
153                    bucket_index_defs.append(index_def)
154        return bucket_index_defs, None
155
156    def n1ql_query(self, stmt, args=None):
157        """Sends a N1QL query
158
159        Sends a N1QL query and returns the result of the query. Raises a
160        ServiceNotAvailable exception if the target cluster is no running the n1ql
161        service."""
162
163        hosts, errors = self.get_hostnames_for_service(N1QL_SERVICE)
164        if errors:
165            return None, errors
166
167        if not hosts:
168            raise ServiceNotAvailableException(N1QL_SERVICE)
169
170        url = hosts[0] + '/query/service'
171        body = {'statement': str(stmt)}
172
173        if args:
174            body['args'] = str(args)
175
176        result, errors = self._post_form_encoded(url, body)
177        if errors:
178            return None, errors
179
180        return result, None
181
182    def is_cluster_initialized(self):
183        data, errors = self.pools()
184        if (errors and len(errors) == 1 and errors[0] == ERR_AUTH) or \
185            (data and data['pools'] and len(data['pools']) > 0):
186            return True, None
187        return False, errors
188
189    def is_enterprise(self):
190        data, errors = self.pools()
191        if errors:
192            return None, errors
193        return data["isEnterprise"], None
194
195    def get_hostnames_for_service(self, service_name):
196        """ Gets all hostnames that run a service
197
198        Gets all hostnames for specified service and returns a list of strings
199        in the form "http://hostname:port". If the ClusterManager is configured
200        to use SSL/TLS then "https://" is prefixed to each name instead of
201        "http://"."""
202        url = self.hostname + '/pools/default/nodeServices'
203        data, errors = self._get(url)
204        if errors:
205            return None, errors
206
207        hosts = []
208        for node in data['nodesExt']:
209            node_host = '127.0.0.1'
210            if 'hostname' in node:
211                node_host = node['hostname']
212
213            # Check for Raw IPv6 address
214            if ':' in node_host:
215                node_host = '[' + node_host + ']'
216
217            http_prefix = 'http://'
218            fts_port_name = 'fts'
219            n1ql_port_name = 'n1ql'
220            mgmt_port_name = 'mgmt'
221            index_port_name = 'indexHttp'
222            event_port_name = 'eventingAdminPort'
223
224            if self.ssl:
225                http_prefix = 'https://'
226                n1ql_port_name = 'n1qlSSL'
227                mgmt_port_name = 'mgmtSSL'
228                event_port_name = 'eventingSSL'
229
230                # The is no ssl port for the index or fts services
231
232            if service_name == MGMT_SERVICE and mgmt_port_name in node['services']:
233                hosts.append(http_prefix + node_host + ':' + str(node['services'][mgmt_port_name]))
234
235            if service_name == N1QL_SERVICE and n1ql_port_name in node['services']:
236                hosts.append(http_prefix + node_host + ':' + str(node['services'][n1ql_port_name]))
237
238            if service_name == INDEX_SERVICE and index_port_name in node['services']:
239                hosts.append(http_prefix + node_host + ':' + str(node['services'][index_port_name]))
240
241            if service_name == FTS_SERVICE and fts_port_name in node['services']:
242                hosts.append(http_prefix + node_host + ':' + str(node['services'][fts_port_name]))
243
244            if service_name == EVENT_SERVICE and event_port_name in node['services']:
245                hosts.append(http_prefix + node_host + ':' + str(node['services'][event_port_name]))
246
247        return hosts, None
248
249    def pools(self, pool=None):
250        """ Retrieves information about Couchbase management pools
251
252        Returns Couchbase pools data"""
253        url = self.hostname + '/pools'
254        if pool:
255            url += '/' + pool
256        return self._get(url)
257
258    def set_admin_password(self, password):
259        url = self.hostname + '/controller/resetAdminPassword'
260        params = { "password": password }
261
262        return self._post_form_encoded(url, params)
263
264    def regenerate_admin_password(self):
265        url = self.hostname + '/controller/resetAdminPassword?generate=1'
266
267        return self._post_form_encoded(url, None)
268
269    def rotate_master_pwd(self):
270        url = self.hostname + '/node/controller/rotateDataKey'
271        return self._post_form_encoded(url, None)
272
273    def set_master_pwd(self, password):
274        url = self.hostname + '/node/controller/changeMasterPassword'
275        params = { "newPassword": password }
276        return self._post_form_encoded(url, params)
277
278    def set_pools_default(self, data_ramsize, index_ramsize, fts_ramsize, cbas_ramsize, eventing_ramsize, cluster_name):
279        """ Sets Couchbase RAM Quotas for various services
280
281        Options:
282        data_ramsize - An integer denoting the size in MB, None skips the parameter
283        index_ramsize - An integer denoting the size in MB, None skips the parameter
284        fts_ramsize - An integer denoting the size in MB, None skips the parameter
285        cbas_ramsize - An integer denoting the size in MB, None skips the parameter
286        eventing_ramsize - An integer denoting the size in MB, None skips the parameter
287        cluster_name - Sets a name for the cluster, None skips the parameter
288        """
289        url = self.hostname + '/pools/default'
290        params = {}
291        if data_ramsize:
292            params["memoryQuota"] = data_ramsize
293        if index_ramsize:
294            params["indexMemoryQuota"] = index_ramsize
295        if fts_ramsize:
296            params["ftsMemoryQuota"] = fts_ramsize
297        if cbas_ramsize:
298            params["cbasMemoryQuota"] = cbas_ramsize
299        if eventing_ramsize:
300            params["eventingMemoryQuota"] = eventing_ramsize
301        if cluster_name:
302            params["clusterName"] = cluster_name
303
304        return self._post_form_encoded(url, params)
305
306    def setup_services(self, services):
307        """ Sets the services on a node
308
309        Options:
310        services - A string containing a comma separated list of services
311        """
312        url = self.hostname + '/node/controller/setupServices'
313        params = { "services": services }
314
315        return self._post_form_encoded(url, params)
316
317    def set_admin_credentials(self, username, password, port):
318        """Sets the admin credentials and port for a cluster
319
320        Options:
321        username - The username for the cluster
322        password - The password for the cluster
323        port - The port number for the admin console to listen on. If set to
324               None then the port is kept the same as it currently is.
325        """
326        url = self.hostname + '/settings/web'
327        params = {}
328
329        if username:
330            params["username"] = username
331        if password:
332            params["password"] = password
333        if port:
334            params["port"] = port
335        else:
336            params["port"] = "SAME"
337
338        return self._post_form_encoded(url, params)
339
340    def enable_notifications(self, enable):
341        url = self.hostname + '/settings/stats'
342        params = { "sendStats": "false"}
343
344        if enable:
345            params["sendStats"] = "true"
346
347        return self._post_form_encoded(url, params)
348
349    def get_server_groups(self):
350        url = self.hostname + '/pools/default/serverGroups'
351        return self._get(url)
352
353    def get_server_group(self, groupName):
354        groups, errors = self.get_server_groups()
355        if errors:
356            return None, error
357
358        if not groups or not groups["groups"] or groups["groups"] == 0:
359            return None, ["No server groups found"]
360
361        if groupName:
362            for group in groups["groups"]:
363                if group["name"] == groupName:
364                    return group, None
365            return None, ["Group `%s` not found" % groupName]
366        else:
367            return groups["groups"][0], None
368
369    def add_server(self, add_server, groupName, username, password, services):
370        group, errors = self.get_server_group(groupName)
371        if errors:
372            return None, errors
373
374        url = self.hostname + group["addNodeURI"]
375        params = { "hostname": add_server,
376                   "user": username,
377                   "password": password,
378                   "services": services }
379
380        return self._post_form_encoded(url, params)
381
382    def readd_server(self, server):
383        _, _, _, readd, _, errors = self._get_otps_names(readd_nodes=[server])
384        if errors:
385            return None, errors
386
387        if len(readd) != 1:
388            return None, ["Server not found %s" % server]
389
390        url = self.hostname + '/controller/reAddNode'
391        params = { "otpNode": readd[0] }
392
393        return self._post_form_encoded(url, params)
394
395    def get_tasks(self):
396        url = self.hostname + '/pools/default/tasks'
397        return self._get(url)
398
399    def collect_logs_start(self, servers, redaction_level, salt, log_dir, tmp_dir, upload, upload_host, upload_proxy,
400                           upload_customer, upload_ticket):
401        url = self.hostname + '/controller/startLogsCollection'
402        params = dict()
403
404        if servers == "*":
405            params["nodes"] = servers
406        else:
407            nodes = servers.split(",")
408            known, _, _, readd, _, errors = self._get_otps_names(readd_nodes=nodes)
409            if errors:
410                return None, errors
411
412            if len(nodes) != len(readd):
413                return None, ["Servers list contains invalid servers"]
414
415            params["nodes"] = ",".join(readd)
416
417        if redaction_level:
418            params["logRedactionLevel"] = redaction_level
419        if log_dir:
420            params["logDir"] = log_dir
421        if tmp_dir:
422            params["tmpDir"] = tmp_dir
423        if salt:
424            params["logRedactionSalt"] = salt
425
426        if upload:
427            if upload_host:
428                params["uploadHost"] = upload_host
429            if upload_proxy:
430                params["uploadProxy"] = upload_proxy
431            if upload_customer:
432                params["customer"] = upload_customer
433            if upload_ticket:
434                params["ticket"] = upload_ticket
435
436        return self._post_form_encoded(url, params)
437
438    def collect_logs_stop(self):
439        url = self.hostname + '/controller/cancelLogsCollection'
440        return self._post_form_encoded(url, dict())
441
442    def failover(self, servers_to_failover, force):
443        _, _, failover, _, _, errors = self._get_otps_names(failover_nodes=servers_to_failover)
444        if errors:
445            return None, errors
446
447
448        if len(failover) != len(servers_to_failover):
449            if len(servers_to_failover) == 1:
450                return None, ["Server can't be failed over because it's not part of the cluster"]
451            return None, ["Some nodes specified to be failed over are not part of the cluster"]
452
453        params = {"otpNode": [server for server, _ in failover]}
454
455        if force:
456            params["allowUnsafe"] = "true"
457            url = self.hostname + '/controller/failOver'
458            return self._post_form_encoded(url, params)
459        else:
460            for server, server_status in failover:
461                if server_status != 'healthy':
462                    return None, ["% can't be gracefully failed over because it is not healthy", server]
463            url = self.hostname + '/controller/startGracefulFailover'
464            return self._post_form_encoded(url, params)
465
466    def recovery(self, server, recovery_type):
467        _, _, _, readd, _, errors = self._get_otps_names(readd_nodes=[server])
468        if errors:
469            return None, errors
470
471        if len(readd) != 1:
472            return None, ["Server not found %s" % server]
473
474        url = self.hostname + '/controller/setRecoveryType'
475        params = { "otpNode": readd[0],
476                   "recoveryType": recovery_type }
477
478        return self._post_form_encoded(url, params)
479
480    def rebalance(self, remove_nodes):
481        url = self.hostname + '/controller/rebalance'
482        all, eject, _, _, _, errors = self._get_otps_names(eject_nodes=remove_nodes)
483        if errors:
484            return None, errors
485
486        if len(eject) != len(remove_nodes):
487            return None, ["Some nodes specified to be removed are not part of the cluster"]
488
489        params = { "knownNodes": ','.join(all),
490                   "ejectedNodes": ','.join(eject) }
491
492        return self._post_form_encoded(url, params)
493
494    def rebalance_status(self):
495        data, errors = self.get_tasks()
496        if errors:
497            return (None, None), errors
498
499        rv = {
500            "status": "unknown",
501            "msg": "unknown state",
502            "details": {}
503        }
504
505        for task in data:
506            if task["type"] != "rebalance":
507                continue
508
509            err_msg = None
510            if "errorMessage" in task:
511                rv["status"] = "errored"
512                rv["msg"] = task['errorMessage']
513                break
514            elif task["status"] == "running":
515                rv["status"] = task["status"]
516                rv["msg"] = "Rebalance is running"
517                rv["details"]["progress"] = task["progress"]
518                rv["details"]["refresh"] = task["recommendedRefreshPeriod"]
519                rv["details"]["totalBuckets"] = 0
520                rv["details"]["curBucket"] = 0
521                rv["details"]["curBucketName"] = ""
522
523                if "bucketsCount" in task["detailedProgress"]:
524                    rv["details"]["totalBuckets"] = task["detailedProgress"]["bucketsCount"]
525
526                if "bucketNumber" in task["detailedProgress"]:
527                    rv["details"]["curBucket"] = task["detailedProgress"]["bucketNumber"]
528
529                if "bucket" in task["detailedProgress"]:
530                    rv["details"]["curBucketName"] = task["detailedProgress"]["bucket"]
531
532                acc = 0
533                if "perNode" in task["detailedProgress"]:
534
535                    for _, node in task["detailedProgress"]["perNode"].iteritems():
536                        acc += node["ingoing"]["docsTotal"] - node["ingoing"]["docsTransferred"]
537                        acc += node["outgoing"]["docsTotal"] - node["outgoing"]["docsTransferred"]
538
539                rv["details"]["docsRemaining"] = acc
540            elif task["status"] == "notRunning":
541                rv["status"] = task["status"]
542                rv["msg"] = "Rebalance is not running"
543                if "statusIsStale" in task:
544                    if task["statusIsStale"] or task["statusIsStale"] == "true":
545                        rv["status"] = "stale"
546                        rv["msg"] = "Current status us stale, please retry"
547
548            break
549
550        return rv, None
551
552    def _get_otps_names(self, eject_nodes=[], failover_nodes=[], readd_nodes=[]):
553        result, errors = self.pools('default')
554        if errors:
555            return None, None, None, None, None, errors
556
557        all = list()
558        eject = list()
559        failover = list()
560        readd = list()
561        hostnames = list()
562        for node in result["nodes"]:
563            if "otpNode" not in node:
564                return [], [], [], [], [], ["Unable to get otp names"]
565            all.append(node['otpNode'])
566            hostnames.append(node['hostname'])
567            if node['hostname'] in eject_nodes:
568                eject.append(node['otpNode'])
569            if node['hostname'] in failover_nodes:
570                if node['clusterMembership'] != 'active':
571                    return [], [], [], [], [], ["Can't failover a node that isn't in the cluster"]
572                else:
573                    failover.append((node['otpNode'], node['status']))
574            _, host = node['otpNode'].split('@')
575            hostport = "%s:%d" % (host, 8091)
576            if node['hostname'] in readd_nodes or hostport in readd_nodes:
577                readd.append(node['otpNode'])
578
579        return all, eject, failover, readd, hostnames, None
580
581    def create_bucket(self, name, bucket_type, memory_quota,
582                      eviction_policy, replicas, replica_indexes,
583                      threads_number, conflict_resolution, flush_enabled,
584                      max_ttl, compression_mode, sync, timeout=60):
585        url = self.hostname + '/pools/default/buckets'
586
587        if name is None:
588            return None ["The bucket name is required when creating a bucket"]
589        if bucket_type is None:
590            return None ["The bucket type is required when creating a bucket"]
591        if memory_quota is None:
592            return None ["The bucket memory quota is required when creating a bucket"]
593
594        params = { "name": name,
595                   "bucketType": bucket_type,
596                   "ramQuotaMB": memory_quota }
597
598        if eviction_policy is not None:
599            params["evictionPolicy"] = eviction_policy
600        if replicas is not None:
601            params["replicaNumber"] = replicas
602        if replica_indexes is not None:
603            params["replicaIndex"] = replica_indexes
604        if threads_number is not None:
605            params["threadsNumber"] = threads_number
606        if conflict_resolution is not None:
607            params["conflictResolutionType"] = conflict_resolution
608        if flush_enabled is not None:
609            params["flushEnabled"] = flush_enabled
610        if max_ttl is not None:
611            params["maxTTL"] = max_ttl
612        if compression_mode is not None:
613            params["compressionMode"] = compression_mode
614
615        result, errors = self._post_form_encoded(url, params)
616        if errors:
617            return None, errors
618
619        if sync:
620            all_node_ready = False
621            start = time.time()
622            while (time.time() - start) <= timeout and not all_node_ready:
623                buckets, errors = self.list_buckets()
624                if name not in buckets:
625                    time.sleep(1)
626                    continue
627
628                url = self.hostname + '/pools/default/buckets/' + name
629                content, errors = self._get(url)
630                if errors:
631                    return None, errors
632
633                all_node_ready = True
634                for node in content["nodes"]:
635                    if node["status"] != "healthy":
636                        all_node_ready = False
637                        break
638                if not all_node_ready:
639                    time.sleep(1)
640
641            if not all_node_ready:
642                return None, ["Bucket created, but not ready after %d seconds" % timeout]
643
644        return result, None
645
646
647    def edit_bucket(self, name, memory_quota, eviction_policy,
648                    replicas, threads_number, flush_enabled, max_ttl,
649                    compression_mode, remove_port):
650        url = self.hostname + '/pools/default/buckets/' + name
651
652        if name is None:
653            return None ["The bucket name is required when editing a bucket"]
654
655        params = {}
656        if memory_quota is not None:
657            params["ramQuotaMB"] = memory_quota
658        if eviction_policy is not None:
659            params["evictionPolicy"] = eviction_policy
660        if replicas is not None:
661            params["replicaNumber"] = replicas
662        if threads_number is not None:
663            params["threadsNumber"] = threads_number
664        if flush_enabled is not None:
665            params["flushEnabled"] = flush_enabled
666        if max_ttl is not None:
667            params["maxTTL"] = max_ttl
668        if compression_mode is not None:
669            params["compressionMode"] = compression_mode
670        if remove_port:
671            params["proxyPort"] = "none"
672
673        return self._post_form_encoded(url, params)
674
675    def delete_bucket(self, name):
676        url = self.hostname + '/pools/default/buckets/' + name
677        return self._delete(url, None)
678
679    def flush_bucket(self, name):
680        if name is None:
681            return None ["The bucket name is required when flushing a bucket"]
682
683        url = self.hostname + '/pools/default/buckets/' + name + '/controller/doFlush'
684        return self._post_form_encoded(url, None)
685
686    def compact_bucket(self, name, view_only, data_only):
687        if data_only and not view_only:
688            url = self.hostname + '/pools/default/buckets/' + name + \
689                '/controller/compactDatabases'
690            return self._post_form_encoded(url, None)
691        elif view_only and not data_only:
692            url = self.hostname + '/pools/default/buckets/' + name + '/ddocs'
693            ddocs, errors = self._get(url)
694            if errors:
695                return None, errors
696
697            for row in ddocs["rows"]:
698                url = self.hostname + row["controllers"]["compact"]
699                _, errors = self._post_form_encoded(url, None)
700                if errors:
701                    return None, errors
702            return None, None
703        elif not data_only and not view_only:
704            url = self.hostname + '/pools/default/buckets/' + name + \
705                '/controller/compactBucket'
706            return self._post_form_encoded(url, None)
707        else:
708            return None, ["Cannot compact data only and view only, pick one or neither"]
709
710    def list_buckets(self, extended=False):
711        url = self.hostname + '/pools/default/buckets'
712        result, errors = self._get(url)
713        if errors:
714            return None, errors
715
716        if extended:
717            return result, errors
718
719        names = list()
720        for bucket in result:
721            names.append(bucket["name"])
722
723        return names, None
724
725    def get_bucket(self, name):
726        url = self.hostname + '/pools/default/buckets'
727        result, errors = self._get(url)
728        if errors:
729            return None, errors
730
731        for bucket in result:
732            if bucket["name"] == name:
733                return bucket, None
734
735        return None, ["Bucket not found"]
736
737    def set_data_paths(self, data_path, index_path, cbas_path):
738        url = self.hostname + '/nodes/self/controller/settings'
739        params = dict()
740
741        if data_path is not None:
742            params["path"] = data_path
743
744        if index_path is not None:
745            params["index_path"] = index_path
746
747        if cbas_path is not None:
748            params["cbas_path"] = cbas_path
749
750        return self._post_form_encoded(url, params)
751
752    def node_info(self):
753        url = self.hostname + '/nodes/self'
754        return self._get(url)
755
756    def get_babysitter_cookie(self):
757        url = self.hostname + '/diag/eval'
758        return self._post_form_encoded(url, 'ns_server:get_babysitter_cookie().')
759
760    def set_hostname(self, hostname):
761        url = self.hostname + '/node/controller/rename'
762        params = { "hostname": hostname }
763        return self._post_form_encoded(url, params)
764
765    def stop_rebalance(self):
766        params = {"allowUnsafe": "true"}
767        url = self.hostname + '/controller/stopRebalance'
768        return self._post_form_encoded(url, params)
769
770    def create_server_group(self, name):
771        url = self.hostname + '/pools/default/serverGroups'
772        params = { "name": name }
773        return self._post_form_encoded(url, params)
774
775    def delete_server_group(self, name):
776        uri, errors = self._get_server_group_uri(name)
777        if errors:
778            return None, errors
779
780        url = self.hostname + uri
781        params = { "name": name }
782        return self._delete(url, params)
783
784    def rename_server_group(self, name, new_name):
785        uri, errors = self._get_server_group_uri(name)
786        if errors:
787            return None, errors
788
789        url = self.hostname + uri
790        params = { "name": new_name }
791        return self._put(url, params)
792
793    def move_servers_between_groups(self, servers, from_group, to_group):
794        groups, errors = self.get_server_groups()
795        if errors:
796            return None, errors
797
798        # Find the groups to move servers between
799        move_from_group = None
800        move_to_group = None
801        for group in groups["groups"]:
802            if from_group == group['name']:
803                move_from_group = group
804            if to_group == group['name']:
805                move_to_group = group
806
807        if move_from_group is None:
808            return None, ["Group to move servers from `%s` not found" % from_group]
809        if move_to_group is None:
810            return None, ["Group to move servers to `%s` not found" % from_group]
811
812        # Find the servers to move in the from group
813        nodes_to_move = []
814        for server in servers:
815            found = False
816            for node in move_from_group["nodes"]:
817                if server == node["hostname"]:
818                    nodes_to_move.append(node)
819                    move_from_group["nodes"].remove(node)
820                    found = True
821
822            if not found:
823                return None, ["Can't move %s because it doesn't exist in '%s'" % (server, from_group)]
824
825        # Move the servers to the to group
826        for node in nodes_to_move:
827            move_to_group["nodes"].append(node)
828
829        url = self.hostname + groups["uri"]
830        return self._put_json(url, groups)
831
832    def get_server_groups(self):
833        url = self.hostname + '/pools/default/serverGroups'
834        return self._get(url)
835
836    def _get_server_group_uri(self, name):
837        groups, errors = self.get_server_groups()
838        if errors:
839            return errors
840
841        for group in groups["groups"]:
842            if name == group["name"]:
843                return group["uri"], None
844        return None, ["Group `%s` not found" % name]
845
846    def delete_rbac_user(self, username, auth_domain):
847        url = self.hostname + '/settings/rbac/users/%s/%s' % (auth_domain, username)
848        return self._delete(url, None)
849
850    def list_rbac_users(self):
851        url = self.hostname + '/settings/rbac/users'
852        return self._get(url)
853
854    def my_roles(self):
855        url = self.hostname + '/whoami'
856        return self._get(url)
857
858    def set_rbac_user(self, username, password, name, roles, auth_domain):
859        if auth_domain is None:
860            return None, ["The authentication type is required"]
861
862        if username is None:
863            return None, ["The username is required"]
864
865        url = self.hostname + '/settings/rbac/users/%s/%s' % (auth_domain, username)
866
867        params = {}
868        if name is not None:
869            params["name"] = name
870        if password is not None:
871            params["password"] = password
872        if roles is not None:
873            params["roles"] = roles
874        return self._put(url, params)
875
876    def get_password_policy(self):
877        url = self.hostname + '/settings/passwordPolicy'
878        return self._get(url)
879
880    def set_security_settings(self, disable_http_ui):
881        url = self.hostname + '/settings/security'
882
883        params = {
884            "disableUIOverHttp":  "true" if disable_http_ui else "false"
885        }
886
887        return self._post_form_encoded(url, params)
888
889    def set_password_policy(self, min_length, upper_case, lower_case, digit,
890                            special_char):
891        url = self.hostname + '/settings/passwordPolicy'
892
893        params = {
894            "minLength": min_length,
895            "enforceUppercase": "true" if upper_case else "false",
896            "enforceLowercase": "true" if lower_case else "false",
897            "enforceDigits": "true" if digit else "false",
898            "enforceSpecialChars": "true" if special_char else "false"
899        }
900
901        return self._post_form_encoded(url, params)
902
903    def set_audit_settings(self, enabled, log_path, rotate_interval, rotate_size):
904        url = self.hostname + '/settings/audit'
905
906        params = dict()
907        if enabled:
908            params["auditdEnabled"] = enabled
909        if log_path:
910            params["logPath"] = log_path
911        if rotate_interval:
912            params["rotateInterval"] = rotate_interval
913        if rotate_size:
914            params["rotateSize"] = rotate_size
915
916        if "logPath" not in params and "auditdEnabled" in params and params["auditdEnabled"] == "true":
917            return None, ["The audit log path must be specified when auditing is first set up"]
918
919        return self._post_form_encoded(url, params)
920
921    def set_autofailover_settings(self, enabled, timeout, failoverOfServerGroups, maxCount,
922                                  failoverOnDataDiskIssuesEnabled, failoverOnDataDiskIssuesTimePeriod):
923        url = self.hostname + '/settings/autoFailover'
924
925        params = dict()
926        if enabled:
927            params["enabled"] = enabled
928        if timeout:
929            params["timeout"] = timeout
930        if failoverOfServerGroups:
931            params["failoverServerGroup"] = failoverOfServerGroups
932        if failoverOnDataDiskIssuesEnabled:
933            params["failoverOnDataDiskIssues[enabled]"] = failoverOnDataDiskIssuesEnabled
934        if maxCount:
935            params["maxCount"] = maxCount
936        if failoverOnDataDiskIssuesTimePeriod:
937            params["failoverOnDataDiskIssues[timePeriod]"] = failoverOnDataDiskIssuesTimePeriod
938
939        return self._post_form_encoded(url, params)
940
941    def set_autoreprovision_settings(self, enabled, max_nodes):
942        url = self.hostname + '/settings/autoReprovision'
943
944        params = dict()
945        if enabled:
946            params["enabled"] = enabled
947        if max_nodes:
948            params["maxNodes"] = max_nodes
949
950        return self._post_form_encoded(url, params)
951
952    def set_compaction_settings(self, dbFragPerc, dbFragSize, viewFragPerc, viewFragSize,
953                                fromHour, fromMin, toHour, toMin, abortOutside,
954                                parallelDBAndViewCompact, purgeInterval, gsiMode, gsiPerc,
955                                gsiInterval, gsiFromHour, gsiFromMin, gsiToHour, gsiToMin,
956                                enableGsiAbort):
957        url = self.hostname + '/controller/setAutoCompaction'
958        params = dict()
959
960        if dbFragPerc is not None:
961            params["databaseFragmentationThreshold[percentage]"] = dbFragPerc
962        if dbFragSize is not None:
963            params["databaseFragmentationThreshold[size]"] = dbFragSize
964        if viewFragPerc is not None:
965            params["viewFragmentationThreshold[percentage]"] = viewFragPerc
966        if viewFragSize is not None:
967            params["viewFragmentationThreshold[size]"] = viewFragSize
968        if fromHour is not None:
969            params["allowedTimePeriod[fromHour]"] = fromHour
970        if fromMin is not None:
971            params["allowedTimePeriod[fromMinute]"] = fromMin
972        if toHour is not None:
973            params["allowedTimePeriod[toHour]"] = toHour
974        if toMin is not None:
975            params["allowedTimePeriod[toMinute]"] = toMin
976        if abortOutside is not None:
977            params["allowedTimePeriod[abortOutside]"] = abortOutside
978        if parallelDBAndViewCompact is not None:
979            params["parallelDBAndViewCompaction"] = parallelDBAndViewCompact
980        if purgeInterval is not None:
981            params["purgeInterval"] = purgeInterval
982        if gsiMode is not None:
983            params["indexCompactionMode"] = gsiMode
984        if gsiPerc is not None:
985            params["indexFragmentationThreshold[percentage]"] = gsiPerc
986        if gsiInterval is not None:
987            params["indexCircularCompaction[daysOfWeek]"] = gsiInterval
988        if gsiFromHour is not None:
989            params["indexCircularCompaction[interval][fromHour]"] = gsiFromHour
990        if gsiFromMin is not None:
991            params["indexCircularCompaction[interval][fromMinute]"] = gsiFromMin
992        if gsiToHour is not None:
993            params["indexCircularCompaction[interval][toHour]"] = gsiToHour
994        if gsiToMin is not None:
995            params["indexCircularCompaction[interval][toMinute]"] = gsiToMin
996        if enableGsiAbort is not None:
997            params["indexCircularCompaction[interval][abortOutside]"] = enableGsiAbort
998
999        return self._post_form_encoded(url, params)
1000
1001    def set_index_settings(self, storageMode, maxRollbackPoints, stableSnapInterval,
1002                           memSnapInterval, threads, logLevel):
1003        """ Sets global index settings"""
1004        params = dict()
1005        if storageMode is not None:
1006            params["storageMode"] = storageMode
1007        if maxRollbackPoints is not None:
1008            params["maxRollbackPoints"] = maxRollbackPoints
1009        if stableSnapInterval is not None:
1010            params["stableSnapshotInterval"] = stableSnapInterval
1011        if memSnapInterval is not None:
1012            params["memorySnapshotInterval"] = memSnapInterval
1013        if threads is not None:
1014            params["indexerThreads"] = threads
1015        if logLevel is not None:
1016            params["logLevel"] = logLevel
1017
1018        url = self.hostname + '/settings/indexes'
1019        return self._post_form_encoded(url, params)
1020
1021    def set_alert_settings(self, enabled_email_alerts, email_recipients,
1022                           email_sender, email_user, email_pass, email_host,
1023                           email_port, email_encrypted, alerts):
1024        url = self.hostname + '/settings/alerts'
1025        params = dict()
1026
1027        if enabled_email_alerts:
1028            params["enabled"] = enabled_email_alerts
1029        if email_recipients:
1030            params["recipients"] = email_recipients
1031        if email_sender:
1032            params["sender"] = email_sender
1033        if email_user:
1034            params["emailUser"] = email_user
1035        if email_pass:
1036            params["emailPass"] = email_pass
1037        if email_host:
1038            params["emailHost"] = email_host
1039        if email_port:
1040            params["emailPort"] = email_port
1041        if email_encrypted:
1042            params["emailEncrypt"] = email_encrypted
1043        if alerts is not None:
1044            params["alerts"] = alerts
1045
1046        return self._post_form_encoded(url, params)
1047
1048    def index_settings(self):
1049        """ Retrieves the index settings
1050
1051            Returns a map of all global index settings"""
1052        url = self.hostname + '/settings/indexes'
1053        return self._get(url)
1054
1055    def ldap_settings(self, enabled, read_only_admins, admins):
1056        """ Sets LDAP Settings
1057
1058        enabled - The string "true" or "false"
1059        admins - A new line separated list or the string "asterisk"
1060        read_only_admins - A new line separated list or the string "asterisk"
1061        """
1062
1063        url = self.hostname + '/settings/saslauthdAuth'
1064        params = { "enabled": enabled }
1065
1066        if read_only_admins is not None:
1067            params["roAdmins"] = read_only_admins
1068        if admins is not None:
1069            params["admins"] = admins
1070
1071        return self._post_form_encoded(url, params)
1072
1073    def setRoles(self,userList,roleList,userNameList):
1074        # we take a comma-delimited list of roles that needs to go into a dictionary
1075        paramDict = {"roles" : roleList}
1076        userIds = []
1077        userNames = []
1078        userF = StringIO.StringIO(userList)
1079        for idList in csv.reader(userF, delimiter=','):
1080            userIds.extend(idList)
1081
1082        # did they specify user names?
1083        if userNameList != None:
1084            userNameF = StringIO.StringIO(userNameList)
1085            for nameList in csv.reader(userNameF, delimiter=','):
1086                userNames.extend(nameList)
1087            if len(userNames) != len(userIds):
1088                return None, ["Error: specified %d user ids and %d user names, must have the same number of each." %  (len(userIds),len(userNames))]
1089
1090        # did they specify user names?
1091        # but we need a separate REST call for each user in the comma-delimited user list
1092        for index in range(len(userIds)):
1093            user = userIds[index]
1094            paramDict["id"] = user
1095            if len(userNames) > 0:
1096                paramDict["name"] = userNames[index]
1097            url = self.hostname + '/settings/rbac/users/' + user
1098            data, errors = self._put(url,paramDict)
1099            if errors:
1100                return data, errors
1101
1102        return data, errors
1103
1104    def deleteRoles(self,userList):
1105        # need a separate REST call for each user in the comma-delimited user list
1106        userF = StringIO.StringIO(userList)
1107        reader = csv.reader(userF, delimiter=',')
1108        for users in reader:
1109            for user in users:
1110                url = self.hostname + '/settings/rbac/users/' + user
1111                data, errors = self._delete(url, None)
1112                if errors:
1113                    return data, errors
1114
1115        return data, errors
1116
1117        url = self.hostname + '/settings/rbac/users'
1118        data, errors = self._get(url)
1119
1120        return data, errors
1121
1122    def retrieve_cluster_certificate(self, extended=False):
1123        """ Retrieves the current cluster certificate
1124
1125        Gets the current cluster certificate. If extended is set tot True then
1126        we return the extended certificate which contains the certificate type,
1127        certicicate key, expiration, subject, and warnings."""
1128        url = self.hostname + '/pools/default/certificate'
1129        if extended:
1130            url += '?extended=true'
1131        return self._get(url)
1132
1133    def regenerate_cluster_certificate(self):
1134        """ Regenerates the cluster certificate
1135
1136        Regenerates the cluster certificate and returns the new certificate."""
1137        url = self.hostname + '/controller/regenerateCertificate'
1138        return self._post_form_encoded(url, None)
1139
1140    def upload_cluster_certificate(self, certificate):
1141        """ Uploads a new cluster certificate"""
1142        url = self.hostname + '/controller/uploadClusterCA'
1143        return self._post_form_encoded(url, certificate)
1144
1145    def retrieve_node_certificate(self, node):
1146        """ Retrieves the current node certificate
1147
1148        Returns the current node certificate"""
1149        url = self.hostname + '/pools/default/certificate/node/' + node
1150        return self._get(url)
1151
1152    def set_node_certificate(self):
1153        """Activates the current node certificate
1154
1155        Grabs chain.pem and pkey.pem from the <data folder>/inbox/ directory and
1156        applies them to the node. chain.pem contains the chain encoded certificates
1157        starting from the node certificat and ending with the last intermediate
1158        certificate before cluster CA. pkey.pem contains the pem encoded private
1159        key for node certifiactes. Both files should exist on the server before
1160        this API is called."""
1161        url = self.hostname + '/node/controller/reloadCertificate'
1162        return self._post_form_encoded(url, None)
1163
1164    def set_client_cert_auth(self, config):
1165        """Enable/disable the client cert auth"""
1166        url = self.hostname + '/settings/clientCertAuth'
1167        return self._post_json(url, config)
1168
1169    def retrieve_client_cert_auth(self):
1170        url = self.hostname + '/settings/clientCertAuth'
1171        return self._get(url)
1172
1173    def create_xdcr_reference(self, name, hostname, username, password, encrypted,
1174                              encryptionType, certificate, clientCertificate, clientKey):
1175        return self._set_xdcr_reference(False, name, hostname, username,
1176                                        password, encrypted, encryptionType,
1177                                        certificate, clientCertificate, clientKey)
1178
1179    def edit_xdcr_reference(self, name, hostname, username, password, encrypted,
1180                            encryptionType, certificate, clientCertificate, clientKey):
1181        return self._set_xdcr_reference(True, name, hostname, username,
1182                                        password, encrypted, encryptionType,
1183                                        certificate, clientCertificate, clientKey)
1184
1185    def _set_xdcr_reference(self, edit, name, hostname, username, password,
1186                            encrypted, encryptionType, certificate, clientCertificate, clientKey):
1187        url = self.hostname + '/pools/default/remoteClusters'
1188        params = {}
1189
1190        if edit:
1191            url += '/' + urllib.quote(name)
1192
1193        if name is not None:
1194            params["name"] = name
1195        if hostname is not None:
1196            params["hostname"] = hostname
1197        if username is not None:
1198            params["username"] = username
1199        if password is not None:
1200            params["password"] = password
1201        if encrypted is not None:
1202            params["demandEncryption"] = encrypted
1203        if encryptionType is not None:
1204            params["encryptionType"] = encryptionType
1205        if certificate is not None:
1206            params["certificate"] = certificate
1207        if clientCertificate:
1208            params['clientCertificate'] = clientCertificate
1209        if clientKey:
1210            params['clientKey'] = clientKey
1211
1212        return self._post_form_encoded(url, params)
1213
1214    def delete_xdcr_reference(self, name):
1215        url = self.hostname + '/pools/default/remoteClusters/' + urllib.quote(name)
1216        return self._delete(url, None)
1217
1218    def list_xdcr_references(self):
1219        url = self.hostname + '/pools/default/remoteClusters/'
1220        return self._get(url)
1221
1222    def xdcr_replicator_settings(self, chk_interval, worker_batch_size,
1223                                 doc_batch_size, fail_interval, replication_thresh,
1224                                 src_nozzles, dst_nozzles, usage_limit, compression,
1225                                 log_level, stats_interval, replicator_id):
1226        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1227        params = self._get_xdcr_params(chk_interval, worker_batch_size, doc_batch_size,
1228                                       fail_interval, replication_thresh, src_nozzles,
1229                                       dst_nozzles, usage_limit, compression, log_level,
1230                                       stats_interval)
1231        return self._post_form_encoded(url, params)
1232
1233    def xdcr_global_settings(self, chk_interval, worker_batch_size, doc_batch_size,
1234                             fail_interval, replication_threshold, src_nozzles,
1235                             dst_nozzles, usage_limit, compression, log_level, stats_interval):
1236        url = self.hostname + '/settings/replications'
1237        params = self._get_xdcr_params(chk_interval, worker_batch_size, doc_batch_size,
1238                                       fail_interval, replication_threshold, src_nozzles,
1239                                       dst_nozzles, usage_limit, compression, log_level, stats_interval)
1240        return self._post_form_encoded(url, params)
1241
1242    def _get_xdcr_params(self, chk_interval, worker_batch_size, doc_batch_size,
1243                         fail_interval, replication_threshold, src_nozzles,
1244                         dst_nozzles, usage_limit, compression, log_level, stats_interval):
1245        params = {}
1246        if chk_interval is not None:
1247            params["checkpointInterval"] = chk_interval
1248        if worker_batch_size is not None:
1249            params["workerBatchSize"] = worker_batch_size
1250        if doc_batch_size is not None:
1251            params["docBatchSizeKb"] = doc_batch_size
1252        if fail_interval is not None:
1253            params["failureRestartInterval"] = fail_interval
1254        if replication_threshold is not None:
1255            params["optimisticReplicationThreshold"] = replication_threshold
1256        if src_nozzles is not None:
1257            params["sourceNozzlePerNode"] = src_nozzles
1258        if dst_nozzles is not None:
1259            params["targetNozzlePerNode"] = dst_nozzles
1260        if usage_limit is not None:
1261            params["bandwidthLimit"] = usage_limit
1262        if compression is not None:
1263            params["compressionType"] = compression
1264        if log_level is not None:
1265            params["logLevel"] = log_level
1266        if stats_interval is not None:
1267            params["statsInterval"] = stats_interval
1268        return params
1269
1270    def create_xdcr_replication(self, name, to_bucket, from_bucket, filter, rep_mode, compression):
1271        url = self.hostname + '/controller/createReplication'
1272        params = { "replicationType": "continuous" }
1273
1274        if to_bucket is not None:
1275            params["toBucket"] = to_bucket
1276        if name is not None:
1277            params["toCluster"] = name
1278        if from_bucket is not None:
1279            params["fromBucket"] = from_bucket
1280        if rep_mode is not None:
1281            params["type"] = rep_mode
1282        if filter is not None:
1283            params["filterExpression"] = filter
1284        if compression is not None:
1285            params["compressionType"] = compression
1286
1287        return self._post_form_encoded(url, params)
1288
1289    def delete_xdcr_replicator(self, replicator_id):
1290        url = self.hostname + '/controller/cancelXCDR/' + urllib.quote_plus(replicator_id)
1291        return self._delete(url, None)
1292
1293    def pause_xdcr_replication(self, replicator_id):
1294        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1295        params = { "pauseRequested": "true" }
1296        return self._post_form_encoded(url, params)
1297
1298    def resume_xdcr_replication(self, replicator_id):
1299        url = self.hostname + '/settings/replications/' + urllib.quote_plus(replicator_id)
1300        params = { "pauseRequested": "false" }
1301        return self._post_form_encoded(url, params)
1302
1303    def list_functions(self):
1304        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1305        if errors:
1306            return None, errors
1307
1308        if not hosts:
1309            raise ServiceNotAvailableException(EVENT_SERVICE)
1310        url = hosts[0] + '/api/v1/functions'
1311        return self._get(url)
1312
1313    def export_functions(self):
1314        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1315        if errors:
1316            return None, errors
1317
1318        if not hosts:
1319            raise ServiceNotAvailableException(EVENT_SERVICE)
1320        url = hosts[0] + '/api/v1/export'
1321        return self._get(url)
1322
1323    def import_functions(self, parms):
1324        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1325        if errors:
1326            return None, errors
1327        if not hosts:
1328            raise ServiceNotAvailableException(EVENT_SERVICE)
1329        url = hosts[0] + '/api/v1/import'
1330        return self._post_json(url, parms)
1331
1332    def delete_function(self, function):
1333        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1334        if errors:
1335            return None, errors
1336
1337        if not hosts:
1338            raise ServiceNotAvailableException(EVENT_SERVICE)
1339        url = hosts[0] + '/api/v1/functions/' + urllib.quote_plus(function)
1340        return self._delete(url, None)
1341
1342    def deploy_function(self, function, deploy):
1343        hosts, errors = self.get_hostnames_for_service(EVENT_SERVICE)
1344        if errors:
1345            return None, errors
1346
1347        if not hosts:
1348            raise ServiceNotAvailableException(EVENT_SERVICE)
1349
1350        parms = {}
1351        if deploy:
1352            parms["deployment_status"] = True
1353            parms["processing_status"] = True
1354        else:
1355            parms["deployment_status"] = False
1356            parms["processing_status"] = False
1357
1358        url = hosts[0] + '/api/v1/functions/' + urllib.quote_plus(function) + '/settings'
1359        return self._post_json(url, parms)
1360
1361    # Low level methods for basic HTML operations
1362
1363    @request
1364    def _get(self, url):
1365        if self.debug:
1366            print "GET %s" % url
1367        response = requests.get(url, auth=(self.username, self.password), verify=self.caCert,
1368                                cert=self.cert, timeout=self.timeout)
1369        return _handle_response(response, self.debug)
1370
1371    @request
1372    def _post_form_encoded(self, url, params):
1373        if self.debug:
1374            if params is None:
1375                params = {}
1376            print "POST %s %s" % (url, urllib.urlencode(params))
1377        response = requests.post(url, auth=(self.username, self.password), data=params,
1378                                 cert=self.cert, verify=self.caCert, timeout=self.timeout)
1379        return _handle_response(response, self.debug)
1380
1381    @request
1382    def _post_json(self, url, params):
1383        if self.debug:
1384            if params is None:
1385                params = {}
1386            print "POST %s %s" % (url, json.dumps(params))
1387        response = requests.post(url, auth=(self.username, self.password), json=params,
1388                                 cert=self.cert, verify=self.caCert, timeout=self.timeout)
1389        return _handle_response(response, self.debug)
1390
1391    @request
1392    def _put(self, url, params):
1393        if self.debug:
1394            if params is None:
1395                params = {}
1396            print "PUT %s %s" % (url, urllib.urlencode(params))
1397        response = requests.put(url, params, auth=(self.username, self.password),
1398                                cert=None, verify=self.caCert, timeout=self.timeout)
1399        return _handle_response(response, self.debug)
1400
1401    @request
1402    def _put_json(self, url, params):
1403        if self.debug:
1404            if params is None:
1405                params = {}
1406            print "PUT %s %s" % (url, json.dumps(params))
1407        response = requests.put(url, auth=(self.username, self.password), json=params,
1408                                cert=None, verify=self.caCert, timeout=self.timeout)
1409        return _handle_response(response, self.debug)
1410
1411    @request
1412    def _delete(self, url, params):
1413        if self.debug:
1414            if params is None:
1415                params = {}
1416            print "DELETE %s %s" % (url, urllib.urlencode(params))
1417        response = requests.delete(url, auth=(self.username, self.password), data=params,
1418                                   cert=None, verify=self.caCert, timeout=self.timeout)
1419        return _handle_response(response, self.debug)
1420
1421
1422def _handle_response(response, debug):
1423    if debug:
1424        print response.status_code, response.text
1425    if response.status_code in [200, 202]:
1426        if 'Content-Type' not in response.headers:
1427            return "", None
1428        if not response.text:
1429            return "", None
1430        if 'application/json' in response.headers['Content-Type']:
1431            return response.json(), None
1432        else:
1433            return response.text, None
1434    elif response.status_code in [400, 404]:
1435        if 'application/json' in response.headers['Content-Type']:
1436            errors = response.json()
1437            if isinstance(errors, list):
1438                return None, errors
1439            if "errors" in errors and isinstance(errors["errors"], list):
1440                return None, errors["errors"]
1441            if isinstance(errors, dict):
1442                if "errors" in errors and isinstance(errors["errors"], dict):
1443                    errors = errors["errors"]
1444                rv = list()
1445                for key, value in errors.iteritems():
1446                    rv.append(key + " - " + str(value))
1447                return None, rv
1448        return None, [response.text]
1449    elif response.status_code == 401:
1450        return None, [ERR_AUTH]
1451    elif response.status_code == 403:
1452        errors = response.json()
1453        return None, [errors["message"] + ": " + ", ".join(errors["permissions"])]
1454    # Error codes from Eventing service
1455    elif response.status_code  in [406, 422, 423]:
1456        errors = response.json()
1457        if "description" in errors:
1458            return None, [errors["description"]]
1459        return None, ['Received unexpected status %d' % response.status_code]
1460    # Error code from Eventing Service
1461    elif response.status_code == 207:
1462        errors = response.json()
1463        if isinstance(errors, list):
1464            rv = list()
1465            for error in errors:
1466                if error['code'] == 20:
1467                    rv.append(error['info'])
1468            return None, rv
1469        else:
1470            return None, ['Received unexpected status %d' % response.status_code]
1471    elif response.status_code == 500:
1472        return None, [ERR_INTERNAL]
1473    else:
1474        return None, ['Received unexpected status %d' % response.status_code]
1475