xref: /3.0.3-GA/couchbase-cli/node.py (revision 3e8dfdab)
1"""
2  Implementation for rebalance, add, remove, stop rebalance.
3"""
4
5import time
6import os
7import sys
8import util_cli as util
9import socket
10import simplejson as json
11import re
12import urlparse
13
14from usage import usage
15from restclient import *
16from listservers import *
17
# Maximum length of the admin password; clusterInit() refuses longer ones
MAX_LEN_PASSWORD = 24
19
# REST endpoint (URI path) that each node/cluster command is sent to

rest_cmds = {
    # rebalance and cluster membership
    'rebalance': '/controller/rebalance',
    'rebalance-stop': '/controller/stopRebalance',
    'rebalance-status': '/pools/default/rebalanceProgress',
    'server-add': '/controller/addNode',
    'server-readd': '/controller/reAddNode',
    'failover': '/controller/failOver',
    'recovery': '/controller/setRecoveryType',
    # cluster / node initialisation
    'cluster-init': '/settings/web',
    'cluster-edit': '/settings/web',
    'node-init': '/nodes/self/controller/settings',
    # settings
    'setting-compaction': '/controller/setAutoCompaction',
    'setting-notification': '/settings/stats',
    'setting-autofailover': '/settings/autoFailover',
    'setting-alert': '/settings/alerts',
    # users, server groups, certificates
    'user-manage': '/settings/readOnlyUser',
    'group-manage': '/pools/default/serverGroups',
    'ssl-manage': '/pools/default/certificate',
    # log collection
    'collect-logs-start': '/controller/startLogsCollection',
    'collect-logs-stop': '/controller/cancelLogsCollection',
    'collect-logs-status': '/pools/default/tasks',
}
44
# Commands for which processOpts() rejects -r/--server-remove
server_no_remove = [
    'rebalance-stop', 'rebalance-status',
    'server-add', 'server-readd',
    'failover', 'recovery',
]
# Commands for which processOpts() rejects -a/--server-add and the
# --server-add-username/--server-add-password options
server_no_add = [
    'rebalance-stop', 'rebalance-status',
    'failover', 'recovery',
]
59
# HTTP method used for each command when talking to the REST interface

methods = {
    # rebalance and cluster membership
    'rebalance': 'POST',
    'rebalance-stop': 'POST',
    'rebalance-status': 'GET',
    'eject-server': 'POST',
    'server-add': 'POST',
    'server-readd': 'POST',
    'failover': 'POST',
    'recovery': 'POST',
    # cluster / node initialisation
    'cluster-init': 'POST',
    'cluster-edit': 'POST',
    'node-init': 'POST',
    # settings
    'setting-compaction': 'POST',
    'setting-notification': 'POST',
    'setting-autofailover': 'POST',
    'setting-alert': 'POST',
    # users, server groups, certificates
    'user-manage': 'POST',
    'group-manage': 'POST',
    'ssl-manage': 'GET',
    # log collection
    'collect-logs-start': 'POST',
    'collect-logs-stop': 'POST',
    'collect-logs-status': 'GET',
}
85
def bool_to_str(value):
    """Render a 0/1 style flag as the lowercase string 'true' or 'false'.

    The value goes through int() first, so it may be an integer or a
    numeric string; any non-zero number gives 'true'.  Values int() cannot
    convert raise the exception int() raises.
    """
    if int(value):
        return 'true'
    return 'false'
87
# Command handler: parses the CLI options for a node/cluster command and
# issues the matching REST request(s) for the operations listed above
90
class Node:
    """Runs node- and cluster-level CLI commands against the REST interface.

    runCmd() is the entry point: it stores the connection details, feeds the
    command-line options to processOpts() and then calls the method that
    implements the requested command.
    """
    # separator between the entries of a multi-server option value
    # (see normalize_servers)
    SEP = ";"
93    def __init__(self):
94        self.rest_cmd = rest_cmds['rebalance-status']
95        self.method = 'GET'
96        self.debug = False
97        self.server = ''
98        self.port = ''
99        self.user = ''
100        self.password = ''
101        self.ro_username = ''
102        self.ro_password = ''
103        self.params = {}
104        self.output = 'standard'
105        self.password_new = None
106        self.username_new = None
107        self.sa_username = None
108        self.sa_password = None
109        self.port_new = None
110        self.per_node_quota = None
111        self.data_path = None
112        self.index_path = None
113        self.hostname = None
114        self.enable_auto_failover = None
115        self.enable_notification = None
116        self.autofailover_timeout = None
117        self.enable_email_alert = None
118
119        #compaction related settings
120        self.compaction_db_percentage = None
121        self.compaction_db_size = None
122        self.compaction_view_percentage = None
123        self.compaction_view_size = None
124        self.compaction_period_from = None
125        self.compaction_period_to = None
126        self.enable_compaction_abort = None
127        self.enable_compaction_parallel = None
128        self.purge_interval = None
129
130        #alert settings
131        self.email_recipient = None
132        self.email_sender = None
133        self.email_user = None
134        self.email_password = None
135        self.email_host = None
136        self.email_port = None
137        self.email_enable_encrypt = None
138        self.autofailover_node = None
139        self.autofailover_max_reached = None
140        self.autofailover_node_down = None
141        self.autofailover_cluster_small = None
142        self.alert_ip_changed = None
143        self.alert_disk_space = None
144        self.alert_meta_overhead = None
145        self.alert_meta_oom = None
146        self.alert_write_failed = None
147
148        #group management
149        self.group_name = None
150        self.server_list = []
151        self.from_group = None
152        self.to_group = None
153        self.group_rename = None
154
155        #SSL certificate management
156        self.certificate_file = None
157        self.cmd = None
158
159        self.hard_failover = None
160        self.recovery_type = None
161        self.recovery_buckets = None
162
163        # Collect logs
164        self.nodes = None
165        self.all_nodes = None
166        self.upload = False
167        self.upload_host = None
168        self.customer = None
169        self.ticket = ""
170
171    def runCmd(self, cmd, server, port,
172               user, password, opts):
173        self.rest_cmd = rest_cmds[cmd]
174        self.method = methods[cmd]
175        self.server = server
176        self.port = int(port)
177        self.user = user
178        self.password = password
179        servers = self.processOpts(cmd, opts)
180        if self.debug:
181            print "INFO: servers %s" % servers
182
183        if cmd == 'server-add' and not servers['add']:
184            usage("please list one or more --server-add=HOST[:PORT];"
185                  " or use -h for more help.")
186
187        if cmd == 'server-readd' and not servers['add']:
188            usage("please list one or more --server-add=HOST[:PORT];"
189                  " or use -h for more help.")
190
191        if cmd in ('server-add', 'rebalance'):
192            if len(servers['add']) > 0:
193                if not self.group_name:
194                    self.addServers(servers['add'])
195                else:
196                    self.groupAddServers()
197            if cmd == 'rebalance':
198                self.rebalance(servers)
199
200        elif cmd == 'server-readd':
201            self.reAddServers(servers)
202
203        elif cmd == 'rebalance-status':
204            output_result = self.rebalanceStatus()
205            print output_result
206
207        elif cmd == 'rebalance-stop':
208            output_result = self.rebalanceStop()
209            print output_result
210
211        elif cmd == 'failover':
212            if len(servers['failover']) <= 0:
213                usage("please list one or more --server-failover=HOST[:PORT];"
214                      " or use -h for more help.")
215
216            self.failover(servers)
217
218        elif cmd == 'recovery':
219            if len(servers['recovery']) <= 0:
220                usage("please list one or more --server-recovery=HOST[:PORT];"
221                      " or use -h for more help.")
222            self.recovery(servers)
223
224        elif cmd in ('cluster-init', 'cluster-edit'):
225            self.clusterInit(cmd)
226
227        elif cmd == 'node-init':
228            self.nodeInit()
229
230        elif cmd == 'setting-compaction':
231            self.compaction()
232
233        elif cmd == 'setting-notification':
234            self.notification()
235
236        elif cmd == 'setting-alert':
237            self.alert()
238
239        elif cmd == 'setting-autofailover':
240            self.autofailover()
241
242        elif cmd == 'user-manage':
243            self.userManage()
244
245        elif cmd == 'group-manage':
246            self.groupManage()
247
248        elif cmd == 'ssl-manage':
249            self.retrieveCert()
250
251        elif cmd == 'collect-logs-start':
252            self.collectLogsStart(servers)
253
254        elif cmd == 'collect-logs-stop':
255            self.collectLogsStop()
256
257        elif cmd == 'collect-logs-status':
258            self.collectLogsStatus()
259
260    def clusterInit(self, cmd):
261        rest = restclient.RestClient(self.server,
262                                     self.port,
263                                     {'debug':self.debug})
264        if self.port_new:
265            rest.setParam('port', self.port_new)
266        else:
267            rest.setParam('port', 'SAME')
268        rest.setParam('initStatus', 'done')
269        if self.username_new:
270            rest.setParam('username', self.username_new)
271        else:
272            rest.setParam('username', self.user)
273        if self.password_new:
274            rest.setParam('password', self.password_new)
275        else:
276            rest.setParam('password', self.password)
277
278        if not (rest.getParam('username') and rest.getParam('password')):
279            print "ERROR: Both username and password are required."
280            return
281
282        if len(rest.getParam('password')) > MAX_LEN_PASSWORD:
283            print "ERROR: Password length %s exceeds maximum number of characters allowed, which is %s" \
284                  % (len(rest.getParam('password')), MAX_LEN_PASSWORD)
285            return
286
287        opts = {
288            "error_msg": "unable to init/modify %s" % self.server,
289            "success_msg": "init %s" % self.server
290        }
291
292        output_result = rest.restCmd(self.method,
293                                     self.rest_cmd,
294                                     self.user,
295                                     self.password,
296                                     opts)
297        # per node quota unfortunately runs against a different location
298        if cmd == "cluster-init" and not self.per_node_quota:
299            print "ERROR: option cluster-init-ramsize is not specified"
300            return
301        if self.port_new:
302            self.port = int(self.port_new)
303        if self.username_new:
304            self.user = self.username_new
305        if self.password_new:
306            self.password = self.password_new
307
308        rest = restclient.RestClient(self.server,
309                                     self.port,
310                                     {'debug':self.debug})
311        if self.per_node_quota:
312            rest.setParam('memoryQuota', self.per_node_quota)
313
314        output_result = rest.restCmd(self.method,
315                                     '/pools/default',
316                                     self.user,
317                                     self.password,
318                                     opts)
319        print output_result
320
321
322    def nodeInit(self):
323        rest = restclient.RestClient(self.server,
324                                     self.port,
325                                     {'debug':self.debug})
326        if self.data_path:
327            rest.setParam('path', self.data_path)
328
329        if self.index_path:
330            rest.setParam('index_path', self.index_path)
331
332        opts = {
333            "error_msg": "unable to init %s" % self.server,
334            "success_msg": "init %s" % self.server
335        }
336
337        output_result = rest.restCmd(self.method,
338                                     self.rest_cmd,
339                                     self.user,
340                                     self.password,
341                                     opts)
342        print output_result
343        if self.hostname:
344            rest = restclient.RestClient(self.server,
345                                         self.port,
346                                         {'debug':self.debug})
347            if self.hostname:
348                rest.setParam('hostname', self.hostname)
349
350            opts = {
351                "error_msg": "unable to set hostname for %s" % self.server,
352                "success_msg": "set hostname for %s" % self.server
353            }
354
355            output_result = rest.restCmd('POST',
356                                         '/node/controller/rename',
357                                         self.user,
358                                         self.password,
359                                         opts)
360            print output_result
361
362    def compaction(self):
363        rest = restclient.RestClient(self.server,
364                                     self.port,
365                                     {'debug':self.debug})
366
367        if self.compaction_db_percentage:
368            rest.setParam('databaseFragmentationThreshold[percentage]', self.compaction_db_percentage)
369        if self.compaction_db_size:
370            self.compaction_db_size = int(self.compaction_db_size) * 1024**2
371            rest.setParam('databaseFragmentationThreshold[size]', self.compaction_db_size)
372        if self.compaction_view_percentage:
373            rest.setParam('viewFragmentationThreshold[percentage]', self.compaction_view_percentage)
374        if self.compaction_view_size:
375            self.compaction_view_size = int(self.compaction_view_size) * 1024**2
376            rest.setParam('viewFragmentationThreshold[size]', self.compaction_view_size)
377        if self.compaction_period_from:
378            hour, minute = self.compaction_period_from.split(':')
379            if (int(hour) not in range(24)) or (int(minute) not in range(60)):
380                print "ERROR: invalid hour or minute value for compaction period"
381                return
382            else:
383                rest.setParam('allowedTimePeriod[fromHour]', int(hour))
384                rest.setParam('allowedTimePeriod[fromMinute]', int(minute))
385        if self.compaction_period_to:
386            hour, minute = self.compaction_period_to.split(':')
387            if (int(hour) not in range(24)) or (int(minute) not in range(60)):
388                print "ERROR: invalid hour or minute value for compaction"
389                return
390            else:
391                rest.setParam('allowedTimePeriod[toHour]', hour)
392                rest.setParam('allowedTimePeriod[toMinute]', minute)
393        if self.enable_compaction_abort:
394            rest.setParam('allowedTimePeriod[abortOutside]', self.enable_compaction_abort)
395        if self.enable_compaction_parallel:
396            rest.setParam('parallelDBAndViewCompaction', self.enable_compaction_parallel)
397        else:
398            self.enable_compaction_parallel = bool_to_str(0)
399            rest.setParam('parallelDBAndViewCompaction', self.enable_compaction_parallel)
400
401        if self.compaction_period_from or self.compaction_period_to or self.enable_compaction_abort:
402            if not (self.compaction_period_from and self.compaction_period_to and \
403                    self.enable_compaction_abort):
404                print "ERROR: compaction-period-from, compaction-period-to and enable-compaction-abort have to be specified at the same time"
405                return
406        if self.purge_interval:
407            rest.setParam('purgeInterval', self.purge_interval)
408
409        opts = {
410            "error_msg": "unable to set compaction settings",
411            "success_msg": "set compaction settings"
412        }
413        output_result = rest.restCmd(self.method,
414                                     self.rest_cmd,
415                                     self.user,
416                                     self.password,
417                                     opts)
418        print output_result
419
420    def notification(self):
421        rest = restclient.RestClient(self.server,
422                                     self.port,
423                                     {'debug':self.debug})
424        if self.enable_notification:
425            rest.setParam('sendStats', self.enable_notification)
426
427        opts = {
428            "error_msg": "unable to set notification settings",
429            "success_msg": "set notification settings"
430        }
431        output_result = rest.restCmd(self.method,
432                                     self.rest_cmd,
433                                     self.user,
434                                     self.password,
435                                     opts)
436        print output_result
437
438    def alert(self):
439        rest = restclient.RestClient(self.server,
440                                     self.port,
441                                     {'debug':self.debug})
442        alert_opts = ''
443        if self.enable_email_alert:
444            rest.setParam('enabled', self.enable_email_alert)
445        if self.email_recipient:
446            rest.setParam('recipients', self.email_recipient)
447        if self.email_sender:
448            rest.setParam('sender', self.email_sender)
449        if self.email_user:
450            rest.setParam('emailUser', self.email_user)
451        if self.email_password:
452            rest.setParam('emailPass', self.email_password)
453        if self.email_host:
454            rest.setParam('emailHost', self.email_host)
455        if self.email_port:
456            rest.setParam('emailPort', self.email_port)
457        if self.email_enable_encrypt:
458            rest.setParam('emailEncrypt', self.email_enable_encrypt)
459        if self.autofailover_node:
460            alert_opts = alert_opts + 'auto_failover_node,'
461        if self.autofailover_max_reached:
462            alert_opts = alert_opts + 'auto_failover_maximum_reached,'
463        if self.autofailover_node_down:
464            alert_opts = alert_opts + 'auto_failover_other_nodes_down,'
465        if self.autofailover_cluster_small:
466            alert_opts = alert_opts + 'auto_failover_cluster_too_small,'
467        if self.alert_ip_changed:
468            alert_opts = alert_opts + 'ip,'
469        if self.alert_disk_space:
470            alert_opts = alert_opts + 'disk,'
471        if self.alert_meta_overhead:
472            alert_opts = alert_opts + 'overhead,'
473        if self.alert_meta_oom:
474            alert_opts = alert_opts + 'ep_oom_errors,'
475        if self.alert_write_failed:
476             alert_opts = alert_opts + 'ep_item_commit_failed,'
477
478        if alert_opts:
479            # remove last separator
480            alert_opts = alert_opts[:-1]
481            rest.setParam('alerts', alert_opts)
482
483        opts = {
484            "error_msg": "unable to set alert settings",
485            "success_msg": "set alert settings"
486        }
487        output_result = rest.restCmd(self.method,
488                                     self.rest_cmd,
489                                     self.user,
490                                     self.password,
491                                     opts)
492        print output_result
493
494    def autofailover(self):
495        rest = restclient.RestClient(self.server,
496                                     self.port,
497                                     {'debug':self.debug})
498        if self.autofailover_timeout:
499            if int(self.autofailover_timeout) < 30:
500                print "ERROR: Timeout value must be larger than 30 second."
501                return
502            else:
503                rest.setParam('timeout', self.autofailover_timeout)
504
505        if self.enable_auto_failover:
506            rest.setParam('enabled', self.enable_auto_failover)
507
508        opts = {
509            "error_msg": "unable to set auto failover settings",
510            "success_msg": "set auto failover settings"
511        }
512        output_result = rest.restCmd(self.method,
513                                     self.rest_cmd,
514                                     self.user,
515                                     self.password,
516                                     opts)
517        print output_result
518
519    def processOpts(self, cmd, opts):
520        """ Set standard opts.
521            note: use of a server key keeps optional
522            args aligned with server.
523            """
524        servers = {
525            'add': {},
526            'remove': {},
527            'failover': {},
528            'recovery': {},
529            'log': {},
530        }
531
532        # don't allow options that don't correspond to given commands
533
534        for o, a in opts:
535            usage_msg = "option '%s' is not used with command '%s'" % (o, cmd)
536
537            if o in ( "-r", "--server-remove"):
538                if cmd in server_no_remove:
539                    usage(usage_msg)
540            elif o in ( "-a", "--server-add",
541                        "--server-add-username",
542                        "--server-add-password"):
543                if cmd in server_no_add:
544                    usage(usage_msg)
545
546        server = None
547        for o, a in opts:
548            if o in ("-a", "--server-add"):
549                if a == "self":
550                    a = socket.gethostbyname(socket.getfqdn())
551                server = "%s:%d" % util.hostport(a)
552                servers['add'][server] = { 'user':'', 'password':''}
553                self.server_list.append(server)
554            elif o == "--server-add-username":
555                if server:
556                    servers['add'][server]['user'] = a
557                self.sa_username = a
558            elif o == "--server-add-password":
559                if server:
560                    servers['add'][server]['password'] = a
561                self.sa_password = a
562            elif o in ( "-r", "--server-remove"):
563                server = "%s:%d" % util.hostport(a)
564                servers['remove'][server] = True
565                server = None
566            elif o in ( "--server-failover"):
567                server = "%s:%d" % util.hostport(a)
568                servers['failover'][server] = True
569                server = None
570            elif o in ( "--server-recovery"):
571                server = "%s:%d" % util.hostport(a)
572                servers['recovery'][server] = True
573                server = None
574            elif o == "--nodes":
575                for server in self.normalize_servers(a):
576                    servers['log'][server] = True
577            elif o in ('-o', '--output'):
578                if a == 'json':
579                    self.output = a
580                server = None
581            elif o in ('-d', '--debug'):
582                self.debug = True
583                server = None
584            elif o in ('--cluster-init-password', '--cluster-password'):
585                self.password_new = a
586            elif o in ('--cluster-init-username', '--cluster-username'):
587                self.username_new = a
588            elif o in ('--cluster-init-port', '--cluster-port'):
589                self.port_new = a
590            elif o in ('--cluster-init-ramsize', '--cluster-ramsize'):
591                self.per_node_quota = a
592            elif o == '--enable-auto-failover':
593                self.enable_auto_failover = bool_to_str(a)
594            elif o == '--enable-notification':
595                self.enable_notification = bool_to_str(a)
596            elif o == '--auto-failover-timeout':
597                self.autofailover_timeout = a
598            elif o == '--compaction-db-percentage':
599                self.compaction_db_percentage = a
600            elif o == '--compaction-db-size':
601                self.compaction_db_size = a
602            elif o == '--compaction-view-percentage':
603                self.compaction_view_percentage = a
604            elif o == '--compaction-view-size':
605                self.compaction_view_size = a
606            elif o == '--compaction-period-from':
607                self.compaction_period_from = a
608            elif o == '--compaction-period-to':
609                self.compaction_period_to = a
610            elif o == '--enable-compaction-abort':
611                self.enable_compaction_abort = bool_to_str(a)
612            elif o == '--enable-compaction-parallel':
613                self.enable_compaction_parallel = bool_to_str(a)
614            elif o == '--enable-email-alert':
615                self.enable_email_alert = bool_to_str(a)
616            elif o == '--node-init-data-path':
617                self.data_path = a
618            elif o == '--node-init-index-path':
619                self.index_path = a
620            elif o == '--node-init-hostname':
621                self.hostname = a
622            elif o == '--email-recipients':
623                self.email_recipient = a
624            elif o == '--email-sender':
625                self.email_sender = a
626            elif o == '--email-user':
627                self.email_user = a
628            elif o == '--email-password':
629                self.email_password = a
630            elif o == '--email-host':
631                self.email_host = a
632            elif o == 'email-port':
633                self.email_port = a
634            elif o == '--enable-email-encrypt':
635                self.email_enable_encrypt = bool_to_str(a)
636            elif o == '--alert-auto-failover-node':
637                self.autofailover_node = True
638            elif o == '--alert-auto-failover-max-reached':
639                self.autofailover_max_reached = True
640            elif o == '--alert-auto-failover-node-down':
641                self.autofailover_node_down = True
642            elif o == '--alert-auto-failover-cluster-small':
643                self.autofailover_cluster_small = True
644            elif o == '--alert-ip-changed':
645                self.alert_ip_changed = True
646            elif o == '--alert-disk-space':
647                self.alert_disk_space = True
648            elif o == '--alert-meta-overhead':
649                self.alert_meta_overhead = True
650            elif o == '--alert-meta-oom':
651                self.alert_meta_oom = True
652            elif o == '--alert-write-failed':
653                self.alert_write_failed = True
654            elif o == '--create':
655                self.cmd = 'create'
656            elif o == '--list':
657                self.cmd = 'list'
658            elif o == '--delete':
659                self.cmd = 'delete'
660            elif o == '--set':
661                self.cmd = 'set'
662            elif o == '--ro-username':
663                self.ro_username = a
664            elif o == '--ro-password':
665                self.ro_password = a
666            elif o == '--metadata-purge-interval':
667                self.purge_interval = a
668            elif o == '--group-name':
669                self.group_name = a
670            elif o == '--add-servers':
671                self.server_list = self.normalize_servers(a)
672                self.cmd = 'add-servers'
673            elif o == '--remove-servers':
674                self.server_list = self.normalize_servers(a)
675                self.cmd = 'remove-servers'
676            elif o == '--move-servers':
677                self.server_list = self.normalize_servers(a)
678                self.cmd = 'move-servers'
679            elif o == '--from-group':
680                self.from_group = a
681            elif o == '--to-group':
682                self.to_group = a
683            elif o == '--rename':
684                self.group_rename = a
685                self.cmd = 'rename'
686            elif o == '--retrieve-cert':
687                self.cmd = 'retrieve'
688                self.certificate_file = a
689            elif o == '--regenerate-cert':
690                self.cmd = 'regenerate'
691                self.certificate_file = a
692            elif o == '--force':
693                self.hard_failover = True
694            elif o == '--recovery-type':
695                self.recovery_type = a
696            elif o == '--recovery-buckets':
697                self.recovery_buckets = a
698            elif o == '--nodes':
699                self.nodes = a
700            elif o == '--all-nodes':
701                self.all_nodes = True
702            elif o == '--upload':
703                self.upload = True
704            elif o == '--upload-host':
705                self.upload_host = a
706            elif o == '--customer':
707                self.customer = a
708            elif o == '--ticket':
709                self.ticket = a
710
711        return servers
712
713    def normalize_servers(self, server_list):
714        slist = []
715        for server in server_list.split(Node.SEP):
716            hostport = "%s:%d" % util.hostport(server)
717            slist.append(hostport)
718        return slist
719
720    def addServers(self, servers):
721        for server in servers:
722            user = servers[server]['user']
723            password = servers[server]['password']
724            output_result = self.serverAdd(server,
725                                           user,
726                                           password)
727            print output_result
728
729    def serverAdd(self, add_server, add_with_user, add_with_password):
730        rest = restclient.RestClient(self.server,
731                                     self.port,
732                                     {'debug':self.debug})
733        rest.setParam('hostname', add_server)
734        if add_with_user and add_with_password:
735            rest.setParam('user', add_with_user)
736            rest.setParam('password', add_with_password)
737
738        opts = {
739            'error_msg': "unable to server-add %s" % add_server,
740            'success_msg': "server-add %s" % add_server
741        }
742        output_result = rest.restCmd('POST',
743                                     rest_cmds['server-add'],
744                                     self.user,
745                                     self.password,
746                                     opts)
747        return output_result
748
749    def reAddServers(self, servers):
750        known_otps, eject_otps, failover_otps, readd_otps, _ = \
751            self.getNodeOtps(to_readd=servers['add'])
752
753        for readd_otp in readd_otps:
754            rest = restclient.RestClient(self.server,
755                                         self.port,
756                                         {'debug':self.debug})
757            rest.setParam('otpNode', readd_otp)
758
759            opts = {
760                'error_msg': "unable to re-add %s" % readd_otp,
761                'success_msg': "re-add %s" % readd_otp
762            }
763            output_result = rest.restCmd('POST',
764                                         rest_cmds['server-readd'],
765                                         self.user,
766                                         self.password,
767                                         opts)
768            print output_result
769
770    def getNodeOtps(self, to_eject=[], to_failover=[], to_readd=[]):
771        """ Convert known nodes into otp node id's.
772            """
773        listservers = ListServers()
774        known_nodes_list = listservers.getNodes(
775                                listservers.getData(self.server,
776                                                    self.port,
777                                                    self.user,
778                                                    self.password))
779        known_otps = []
780        eject_otps = []
781        failover_otps = []
782        readd_otps = []
783        hostnames = []
784
785        for node in known_nodes_list:
786            if node.get('otpNode') is None:
787                raise Exception("could not access node")
788            known_otps.append(node['otpNode'])
789            hostnames.append(node['hostname'])
790            if node['hostname'] in to_eject:
791                eject_otps.append(node['otpNode'])
792            if node['hostname'] in to_failover:
793                if node['clusterMembership'] != 'active':
794                    raise Exception('node %s is not active' % node['hostname'])
795                else:
796                    failover_otps.append((node['otpNode'], node['status']))
797            _, host = node['otpNode'].split('@')
798            hostport = "%s:%d" % util.hostport(host)
799            if node['hostname'] in to_readd or hostport in to_readd:
800                readd_otps.append(node['otpNode'])
801
802        return (known_otps, eject_otps, failover_otps, readd_otps, hostnames)
803
804    def recovery(self, servers):
805        known_otps, eject_otps, failover_otps, readd_otps, _ = \
806            self.getNodeOtps(to_readd=servers['recovery'])
807        for readd_otp in readd_otps:
808            rest = restclient.RestClient(self.server,
809                                         self.port,
810                                         {'debug':self.debug})
811            opts = {
812                'error_msg': "unable to setRecoveryType for node %s" % readd_otp,
813                'success_msg': "setRecoveryType for node %s" % readd_otp
814            }
815            rest.setParam('otpNode', readd_otp)
816            if self.recovery_type:
817                rest.setParam('recoveryType', self.recovery_type)
818            else:
819                rest.setParam('recoveryType', 'delta')
820            output_result = rest.restCmd('POST',
821                                         '/controller/setRecoveryType',
822                                         self.user,
823                                         self.password,
824                                         opts)
825            print output_result
826
827    def rebalance(self, servers):
828        known_otps, eject_otps, failover_otps, readd_otps, _ = \
829            self.getNodeOtps(to_eject=servers['remove'])
830        rest = restclient.RestClient(self.server,
831                                     self.port,
832                                     {'debug':self.debug})
833        rest.setParam('knownNodes', ','.join(known_otps))
834        rest.setParam('ejectedNodes', ','.join(eject_otps))
835        if self.recovery_buckets:
836            rest.setParam('requireDeltaRecoveryBuckets', self.recovery_buckets)
837        opts = {
838            'success_msg': 'rebalanced cluster',
839            'error_msg': 'unable to rebalance cluster'
840        }
841        output_result = rest.restCmd('POST',
842                                     rest_cmds['rebalance'],
843                                     self.user,
844                                     self.password,
845                                     opts)
846        if self.debug:
847            print "INFO: rebalance started: %s" % output_result
848
849        sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
850
851        print "INFO: rebalancing",
852
853        status, error = self.rebalanceStatus(prefix='\n')
854        while status in['running', 'unknown']:
855            print ".",
856            time.sleep(0.5)
857            try:
858                status, error = self.rebalanceStatus(prefix='\n')
859            except socket.error:
860                time.sleep(2)
861                status, error = self.rebalanceStatus(prefix='\n')
862
863        if error:
864            print '\n' + error
865            sys.exit(1)
866        else:
867            print '\n' + output_result
868
869    def rebalanceStatus(self, prefix=''):
870        rest = restclient.RestClient(self.server,
871                                     self.port,
872                                     {'debug':self.debug})
873
874        opts = {
875            'error_msg': "unable to obtain rebalance status",
876            'success_msg': "retrieve replication status successfully"
877        }
878        output_result = rest.restCmd('GET',
879                                     '/pools/default/tasks',
880                                     self.user,
881                                     self.password,
882                                     opts)
883        tasks = rest.getJson(output_result)
884        if 'errorMessage' in tasks:
885            error_message = tasks['errorMessage']
886        else:
887            error_message = None
888        for task in tasks:
889            if task["type"] == "rebalance":
890                if task["status"] == "running":
891                    return task["status"], error_message
892                if task["status"] == "notRunning":
893                    if task.has_key("statusIsStale"):
894                        if task["statusIsStale"] or task["statusIsStale"] == "true":
895                            return "unknown", error_message
896
897                return task["status"], error_message
898
899        return "unknown", error_message
900
901    def rebalanceStop(self):
902        rest = restclient.RestClient(self.server,
903                                     self.port,
904                                     {'debug':self.debug})
905
906        opts = {
907            'success_msg': 'rebalance cluster stopped',
908            'error_msg': 'unable to stop rebalance'
909        }
910        output_result = rest.restCmd('POST',
911                                     rest_cmds['rebalance-stop'],
912                                     self.user,
913                                     self.password,
914                                     opts)
915        return output_result
916
917
918    def failover(self, servers):
919        known_otps, eject_otps, failover_otps, readd_otps, _ = \
920            self.getNodeOtps(to_failover=servers['failover'])
921
922        if len(failover_otps) <= 0:
923            usage("specified servers are not part of the cluster: %s" %
924                  servers['failover'].keys())
925
926        for failover_otp, node_status in failover_otps:
927            rest = restclient.RestClient(self.server,
928                                         self.port,
929                                         {'debug':self.debug})
930            opts = {
931                'error_msg': "unable to failover %s" % failover_otp,
932                'success_msg': "failover %s" % failover_otp
933            }
934            rest.setParam('otpNode', failover_otp)
935            if self.hard_failover or node_status != 'healthy':
936                output_result = rest.restCmd('POST',
937                                             rest_cmds['failover'],
938                                             self.user,
939                                             self.password,
940                                             opts)
941                print output_result
942            else:
943                output_result = rest.restCmd('POST',
944                                             '/controller/startGracefulFailover',
945                                             self.user,
946                                             self.password,
947                                             opts)
948                if self.debug:
949                    print "INFO: rebalance started: %s" % output_result
950
951                sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
952
953                print "INFO: graceful failover",
954
955                status, error = self.rebalanceStatus(prefix='\n')
956                while status == 'running':
957                    print ".",
958                    time.sleep(0.5)
959                    try:
960                        status, error = self.rebalanceStatus(prefix='\n')
961                    except socket.error:
962                        time.sleep(2)
963                        status, error = self.rebalanceStatus(prefix='\n')
964
965                if error:
966                    print '\n' + error
967                else:
968                    print '\n' + output_result
969
970    def userManage(self):
971        if self.cmd == 'list':
972            self.roUserList()
973        elif self.cmd == 'delete':
974            self.roUserDelete()
975        elif self.cmd == 'set':
976            self.roUserSet()
977
978    def roUserList(self):
979        rest = restclient.RestClient(self.server,
980                                     self.port,
981                                     {'debug':self.debug})
982        opts = { 'error_msg':'not any read only user defined'}
983        try:
984            output_result = rest.restCmd('GET',
985                                         '/settings/readOnlyAdminName',
986                                         self.user,
987                                         self.password,
988                                         opts)
989            json = rest.getJson(output_result)
990            print json
991        except:
992            pass
993
994    def roUserDelete(self):
995        rest = restclient.RestClient(self.server,
996                                     self.port,
997                                     {'debug':self.debug})
998
999        opts = {
1000            'success_msg': 'readOnly user deleted',
1001            'error_msg': 'unable to delete readOnly user'
1002        }
1003        output_result = rest.restCmd('DELETE',
1004                                     "/settings/readOnlyUser",
1005                                     self.user,
1006                                     self.password,
1007                                     opts)
1008        print output_result
1009
1010    def roUserSet(self):
1011        rest = restclient.RestClient(self.server,
1012                                     self.port,
1013                                     {'debug':self.debug})
1014        try:
1015            output_result = rest.restCmd('GET',
1016                                         '/settings/readOnlyAdminName',
1017                                         self.user,
1018                                         self.password)
1019            json = rest.getJson(output_result)
1020            print "ERROR: readonly user %s exist already. Delete it before creating a new one" % json
1021            return
1022        except:
1023            pass
1024
1025        rest = restclient.RestClient(self.server,
1026                                     self.port,
1027                                     {'debug':self.debug})
1028        if self.ro_username:
1029            rest.setParam('username', self.ro_username)
1030        if self.ro_password:
1031            rest.setParam('password', self.ro_password)
1032        opts = {
1033            'success_msg': 'readOnly user created',
1034            'error_msg': 'fail to create readOnly user'
1035        }
1036        output_result = rest.restCmd('POST',
1037                                     "/settings/readOnlyUser",
1038                                     self.user,
1039                                     self.password,
1040                                     opts)
1041        print output_result
1042
1043    def groupManage(self):
1044        if self.cmd == 'move-servers':
1045            self.groupMoveServer()
1046        elif self.cmd == 'list':
1047             self.groupList()
1048        else:
1049            if self.group_name is None:
1050                usage("please specify --group-name for the operation")
1051            elif self.cmd == 'delete':
1052                self.groupDelete()
1053            elif self.cmd == 'create':
1054                self.groupCreate()
1055            elif self.cmd == 'add-servers':
1056                self.groupAddServers()
1057            elif self.cmd == 'rename':
1058                self.groupRename()
1059            else:
1060                print "Unknown group command:%s" % self.cmd
1061
1062    def getGroupUri(self, groupName):
1063        rest = restclient.RestClient(self.server,
1064                                     self.port,
1065                                     {'debug':self.debug})
1066        output_result = rest.restCmd('GET',
1067                                     '/pools/default/serverGroups',
1068                                     self.user,
1069                                     self.password)
1070        groups = rest.getJson(output_result)
1071        for group in groups["groups"]:
1072            if groupName == group["name"]:
1073                return group["uri"]
1074        return None
1075
1076    def getServerGroups(self):
1077        rest = restclient.RestClient(self.server,
1078                                     self.port,
1079                                     {'debug':self.debug})
1080        output_result = rest.restCmd('GET',
1081                                     '/pools/default/serverGroups',
1082                                     self.user,
1083                                     self.password)
1084        return rest.getJson(output_result)
1085
1086    def groupList(self):
1087        rest = restclient.RestClient(self.server,
1088                                     self.port,
1089                                     {'debug':self.debug})
1090        output_result = rest.restCmd('GET',
1091                                     '/pools/default/serverGroups',
1092                                     self.user,
1093                                     self.password)
1094        groups = rest.getJson(output_result)
1095        found = False
1096        for group in groups["groups"]:
1097            if self.group_name is None or self.group_name == group['name']:
1098                found = True
1099                print '%s' % group['name']
1100                for node in group['nodes']:
1101                    print ' server: %s' % node["hostname"]
1102        if not found and self.group_name:
1103            print "Invalid group name: %s" % self.group_name
1104
1105    def groupCreate(self):
1106        rest = restclient.RestClient(self.server,
1107                                     self.port,
1108                                     {'debug':self.debug})
1109        rest.setParam('name', self.group_name)
1110        opts = {
1111            'error_msg': "unable to create group %s" % self.group_name,
1112            'success_msg': "group created %s" % self.group_name
1113        }
1114        output_result = rest.restCmd('POST',
1115                                     '/pools/default/serverGroups',
1116                                     self.user,
1117                                     self.password,
1118                                     opts)
1119        print output_result
1120
1121    def groupRename(self):
1122        uri = self.getGroupUri(self.group_name)
1123        if uri is None:
1124            usage("invalid group name:%s" % self.group_name)
1125        if self.group_rename is None:
1126            usage("invalid group name:%s" % self.group_name)
1127
1128        rest = restclient.RestClient(self.server,
1129                                     self.port,
1130                                     {'debug':self.debug})
1131        rest.setParam('name', self.group_rename)
1132        opts = {
1133            'error_msg': "unable to rename group %s" % self.group_name,
1134            'success_msg': "group renamed %s" % self.group_name
1135        }
1136        output_result = rest.restCmd('PUT',
1137                                     uri,
1138                                     self.user,
1139                                     self.password,
1140                                     opts)
1141        print output_result
1142
1143    def groupDelete(self):
1144        uri = self.getGroupUri(self.group_name)
1145        if uri is None:
1146            usage("invalid group name:%s" % self.group_name)
1147
1148        rest = restclient.RestClient(self.server,
1149                                     self.port,
1150                                     {'debug':self.debug})
1151        rest.setParam('name', self.group_name)
1152        opts = {
1153            'error_msg': "unable to delete group %s" % self.group_name,
1154            'success_msg': "group deleted %s" % self.group_name
1155        }
1156        output_result = rest.restCmd('DELETE',
1157                                     uri,
1158                                     self.user,
1159                                     self.password,
1160                                     opts)
1161        print output_result
1162
1163    def groupAddServers(self):
1164
1165        uri = self.getGroupUri(self.group_name)
1166        if uri is None:
1167            usage("invalid group name:%s" % self.group_name)
1168        uri = "%s/addNode" % uri
1169        groups = self.getServerGroups()
1170        for server in self.server_list:
1171            rest = restclient.RestClient(self.server,
1172                                     self.port,
1173                                     {'debug':self.debug})
1174            rest.setParam('hostname', server)
1175            if self.sa_username:
1176                rest.setParam('user', self.sa_username)
1177            if self.sa_password:
1178                rest.setParam('password', self.sa_password)
1179
1180            opts = {
1181                'error_msg': "unable to add server '%s' to group '%s'" % (server, self.group_name),
1182                'success_msg': "add server '%s' to group '%s'" % (server, self.group_name)
1183            }
1184            output_result = rest.restCmd('POST',
1185                                     uri,
1186                                     self.user,
1187                                     self.password,
1188                                     opts)
1189            print output_result
1190
1191    def groupMoveServer(self):
1192        groups = self.getServerGroups()
1193        node_info = {}
1194        for group in groups["groups"]:
1195            if self.from_group == group['name']:
1196                for server in self.server_list:
1197                    for node in group["nodes"]:
1198                        if server == node["hostname"]:
1199                            node_info[server] = node
1200                            group["nodes"].remove(node)
1201        if not node_info:
1202            print "No servers removed from group '%s'" % self.from_group
1203            return
1204
1205        for group in groups["groups"]:
1206            if self.to_group == group['name']:
1207                for server in self.server_list:
1208                    found = False
1209                    for node in group["nodes"]:
1210                        if server == node["hostname"]:
1211                            found = True
1212                            break
1213                    if not found:
1214                        group["nodes"].append(node_info[server])
1215
1216        payload = json.dumps(groups)
1217        rest = restclient.RestClient(self.server,
1218                                     self.port,
1219                                     {'debug':self.debug})
1220        rest.setPayload(payload)
1221
1222        opts = {
1223            'error_msg': "unable to move servers from group '%s' to group '%s'" % (self.from_group, self.to_group),
1224            'success_msg': "move servers from group '%s' to group '%s'" % (self.from_group, self.to_group)
1225        }
1226        output_result = rest.restCmd('PUT',
1227                                     groups["uri"],
1228                                     self.user,
1229                                     self.password,
1230                                     opts)
1231        print output_result
1232
1233    def retrieveCert(self):
1234        if self.certificate_file is None:
1235            usage("please specify certificate file name for the operation")
1236
1237        rest = restclient.RestClient(self.server,
1238                                     self.port,
1239                                     {'debug':self.debug})
1240        output_result = ''
1241        if self.cmd == 'retrieve':
1242            opts = {
1243                'error_msg': "unable to %s certificate" % self.cmd,
1244                'success_msg': "Successfully %s certificate" % self.cmd
1245            }
1246            output_result = rest.restCmd('GET',
1247                                         '/pools/default/certificate',
1248                                        self.user,
1249                                        self.password,
1250                                        opts)
1251        elif self.cmd  == 'regenerate':
1252            opts = {
1253                'error_msg': "unable to %s certificate" % self.cmd,
1254                'success_msg': None
1255            }
1256            output_result = rest.restCmd('POST',
1257                                         '/controller/regenerateCertificate',
1258                                        self.user,
1259                                        self.password,
1260                                        opts)
1261        else:
1262            print "ERROR: unknown request:", self.cmd
1263            return
1264
1265        try:
1266            fp = open(self.certificate_file, 'w')
1267            fp.write(output_result)
1268            fp.close()
1269            print "SUCCESS: %s certificate to '%s'" % (self.cmd, self.certificate_file)
1270        except IOError, error:
1271            print "ERROR:", error
1272
1273    def collectLogsStart(self, servers):
1274        """Starts a cluster-wide log collection task"""
1275        if (servers['log'] is None) and (self.all_nodes is not True):
1276            usage("please specify a list of nodes to collect logs from, " +
1277                  " or 'all-nodes'")
1278
1279        rest = restclient.RestClient(self.server, self.port,
1280                                     {'debug': self.debug})
1281        if self.all_nodes:
1282            rest.setParam("nodes", "*")
1283        else:
1284            known_otps, eject_otps, failover_otps, readd_otps, hostnames = \
1285                self.getNodeOtps(to_readd=servers['log'])
1286            if not len(readd_otps):
1287                msg = ",".join(hostnames)
1288                usage("invalid node name specified for collecting logs, available nodes are:\n"+msg)
1289
1290            nodelist = ",".join(readd_otps)
1291            rest.setParam("nodes", nodelist)
1292            print "NODES:", nodelist
1293
1294        if self.upload:
1295            if self.upload_host is None:
1296                usage("please specify an upload-host when using --upload")
1297
1298            rest.setParam("uploadHost", self.upload_host)
1299
1300            if not self.customer:
1301                usage("please specify a value for --customer when using" +
1302                      " --upload")
1303
1304            rest.setParam("customer", self.customer)
1305            rest.setParam("ticket", self.ticket)
1306
1307        opts = {
1308            'error_msg': "unable to start log collection:",
1309            'success_msg': "Log collection started"
1310        }
1311
1312        output_result = rest.restCmd(self.method, self.rest_cmd, self.user,
1313                                     self.password, opts)
1314        print output_result
1315
1316    def collectLogsStop(self):
1317        """Stops a cluster-wide log collection task"""
1318        rest = restclient.RestClient(self.server, self.port,
1319                                     {'debug': self.debug})
1320
1321        opts = {
1322            'success_msg': 'collect logs successfully stopped',
1323            'error_msg': 'unable to stop collect logs'
1324        }
1325        output_result = rest.restCmd(self.method, self.rest_cmd, self.user,
1326                                     self.password, opts)
1327        print output_result
1328
1329    def collectLogsStatus(self):
1330        """Shows the current status of log collection task"""
1331        rest = restclient.RestClient(self.server, self.port,
1332                                     {'debug': self.debug})
1333
1334        opts = {
1335            'error_msg': 'unable to obtain collect logs status'
1336        }
1337        output_result = rest.restCmd(self.method, self.rest_cmd, self.user,
1338                                     self.password, opts)
1339
1340        output_json = rest.getJson(output_result)
1341
1342        for e in output_json:
1343            if ((type(e) == type(dict()) and ('type' in e) and
1344                (e['type'] == 'clusterLogsCollection'))):
1345                print "Status:   %s" % e['status']
1346                if 'perNode' in e:
1347                    print "Details:"
1348                    for node, ns in e["perNode"].iteritems():
1349                        print '\tNode:', node
1350                        print '\tStatus:', ns['status']
1351                        for f in ["path", "statusCode", "url", "uploadStatusCode", "uploadOutput"]:
1352                            if f in ns:
1353                                print '\t', f, ":", ns[f]
1354                        print
1355                return
1356