1#!/usr/bin/env python
2
3import sys
4import subprocess
5import json
6import ConfigParser
7import getopt
8
# Example ini file layout (the file passed with -i):
#
#[global]
10#port:8091
11#
12#[10.1.2.99]
13#ip:localhost
14#port:9000
15#
16#[servers]
17#1:10.1.2.99
18#2:10.1.2.100
19#3:10.1.2.101
20#4:10.1.2.102
21#5:10.1.2.103
22#
23#[membase]
24#rest_username:Administrator
25#rest_password:password
26
# Interpreter used to launch couchbase-cli: the same one running this script.
python = sys.executable
# Path of the couchbase-cli script that the Server methods invoke.
cli = "/opt/couchbase/lib/python/couchbase-cli"
# Name of the bucket whose vbucket map Server.vbucket_map() extracts.
bucket = "default"
# When true, _vbucket_diff also lists every moved vbucket individually.
verbose = False
31
def usage(err=None):
    """Print the command-line help and terminate the process.

    err -- optional error (string or exception). When given (and truthy) it
           is printed ahead of the help text and the exit status is 1;
           otherwise the exit status is 0.

    This function never returns: it always ends with sys.exit().
    """
    if err:
        exit_code = 1
        # Single-argument, parenthesised print behaves the same on
        # Python 2 and Python 3.
        print("Error: {0}".format(err))
        print("")
    else:
        exit_code = 0

    help_lines = [
        "./rebalance.py -i <inifile> -m <master:port> +<num_in> -<num_out> --phase-hint=[phase_hint]",
        "",
        " inifile            the standard testrunner ini with all nodes listed (base nodes + number rebalance in + number rebalance out)",
        " num_in             number of nodes to rebalance in (chooses from the end of the server list)",
        " num_out            number of nodes to rebalance out (chooses from the start of the server list)",
        " phase_hint         1: remove nodes from the end of the list, 2: remove nodes from the start of the list",
        "",
        " assuming we start with 24 nodes:",
        "./rebalance.py -i nodes.ini -m master:port +24 -12 --phase-hint=1",
        "./rebalance.py -i nodes.ini -m master:port +12 -12 --phase-hint=2",
        "./rebalance.py -i nodes.ini -m master:port +12",
        "24 nodes -> (add 24 new nodes and remove 12 existing nodes) -> 36 nodes",
        "36 nodes -> (add 12 nodes with upgraded RAM that were removed  and remove 12 existing nodes with lower RAM size) -> 36 nodes",
        "36 nodes -> (add 12 nodes with upgraded RAM that were removed in previous rebalance)  -> 48 nodes",
    ]
    print("\n".join(help_lines))
    sys.exit(exit_code)
53
54
def run_cmd(cmd):
    """Run a command line through the shell and return its standard output.

    cmd -- the complete command line as one string (executed with
           shell=True, so the caller is responsible for any quoting).

    Returns everything the command wrote to stdout, as text.  The pipes are
    opened in text mode (universal_newlines=True) so the result is a str on
    both Python 2 and Python 3.

    When the command exits with a non-zero status, whatever it wrote to
    stderr is forwarded to this process's stderr instead of being dropped,
    so a failing cli call is visible to the operator.
    """
    proc = subprocess.Popen(cmd,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdoutdata, stderrdata = proc.communicate()
    if proc.returncode != 0 and stderrdata:
        sys.stderr.write(stderrdata)
    return stdoutdata
61
def servers_diff(servers1, servers2):
    """Return the entries that appear in exactly one of the two lists.

    servers1, servers2 -- sequences of objects comparable with ``==``.

    The result lists the entries unique to servers1 first (in their original
    order), followed by the entries unique to servers2.  Duplicates are
    removed using equality only, so the items need not be hashable and the
    result order is deterministic.
    """
    diff = []
    for candidates, others in ((servers1, servers2), (servers2, servers1)):
        for server in candidates:
            if server not in others and server not in diff:
                diff.append(server)
    return diff
71
72def _vbucket_diff(original_vbuckets, new_vbuckets, index):
73    rtn = ""
74    numvbuckets = len(original_vbuckets)
75    moved_vbuckets = 0
76    created_vbuckets = 0
77    deleted_vbuckets = 0
78
79    for i in range(numvbuckets):
80        if original_vbuckets[i][index] == "" and new_vbuckets[i][index] != "":
81            created_vbuckets += 1
82        elif original_vbuckets[i][index] != "" and new_vbuckets[i][index] == "":
83            deleted_vbuckets += 1
84        elif original_vbuckets[i][index] != new_vbuckets[i][index]:
85            if verbose:
86                rtn += `i` + " " + original_vbuckets[i][index] + " -> " + new_vbuckets[i][index] + "\n"
87            moved_vbuckets += 1
88
89    if index == 0:
90        rtn += "moved   " + `moved_vbuckets` + " vbuckets"
91    else:
92        rtn += "moved   " + `moved_vbuckets` + " replica vbuckets" + "\n"
93        rtn += "created " + `created_vbuckets` + " replica vbuckets" + "\n"
94        rtn += "deleted " + `deleted_vbuckets` + " replica vbuckets"
95
96    return rtn
97
def vbucket_active_diff(original_vbuckets, new_vbuckets):
    """Report the changes in the first (index 0) column of the vbucket map."""
    active_column = 0
    return _vbucket_diff(original_vbuckets, new_vbuckets, active_column)
100
def vbucket_replica_diff(original_vbuckets, new_vbuckets):
    """Report the changes in the second (index 1) column of the vbucket map.

    The report produced by _vbucket_diff is returned with " replicas"
    appended to its end.
    """
    replica_column = 1
    report = _vbucket_diff(original_vbuckets, new_vbuckets, replica_column)
    # NOTE(review): the suffix lands after the final report line, which
    # already ends in "replica vbuckets" -- confirm it is intentional.
    return report + " replicas"
103
104
class Server(object):
    """One cluster node plus the REST credentials used to talk to it.

    Two servers are equal (and hash alike) when their hostname and REST port
    match.  The port is compared as text because it can arrive either as an
    int or as a string depending on where the value came from.
    """

    def __init__(self, hostname, rest_port, rest_username, rest_password):
        self.hostname = hostname
        self.rest_port = rest_port
        self.rest_username = rest_username
        self.rest_password = rest_password

    def __str__(self):
        return "{0}:{1}".format(self.hostname, self.rest_port)

    def __repr__(self):
        return self.__str__()

    def _key(self):
        """Return the (hostname, port-as-text) pair that identifies the node."""
        return (self.hostname, str(self.rest_port))

    def __eq__(self, other):
        if not isinstance(other, Server):
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out.
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        # Kept consistent with __eq__ so instances work in sets and dicts.
        return hash(self._key())

    def _cli_command(self, subcommand):
        """Return the couchbase-cli command line for `subcommand` on this node.

        NOTE(review): host, port and credentials are interpolated unquoted
        into a command string that run_cmd hands to a shell; values with
        shell metacharacters would be mangled.
        """
        return "{0} {1} {2} -c {3}:{4} -u {5} -p {6}".format(python,
                                                             cli,
                                                             subcommand,
                                                             self.hostname,
                                                             self.rest_port,
                                                             self.rest_username,
                                                             self.rest_password)

    def _parse_server_list(self, servers_str):
        """Turn `server-list` output into Server objects.

        Every non-blank line must have "host:port" as its second
        whitespace-separated field.  The new servers reuse this server's
        credentials.

        Raises ValueError (quoting the offending line) when a line does not
        have that shape, e.g. when the cli printed an error instead.
        """
        servers = []
        for line in servers_str.splitlines():
            fields = line.split()
            if not fields:
                continue
            address = fields[1].split(":") if len(fields) > 1 else []
            if len(address) < 2:
                raise ValueError("unexpected server-list output: {0!r}".format(line))
            servers.append(Server(address[0], address[1],
                                  self.rest_username, self.rest_password))
        return servers

    @staticmethod
    def _build_vbucket_map(bucket_info, bucket_name):
        """Extract the vbucket -> server-name map of one bucket.

        bucket_info -- decoded JSON from `bucket-list -o json` (a list of
                       bucket descriptions).
        bucket_name -- value of the "name" key to look for.

        Returns one list per vbucket holding the server name for each copy;
        a negative server index in the map becomes "".

        Raises ValueError when no bucket with that name is present.
        """
        for item in bucket_info:
            if "name" in item and item["name"] == bucket_name:
                server_map = item["vBucketServerMap"]
                serverlist = server_map["serverList"]
                return [[serverlist[index] if index >= 0 else "" for index in vbucket]
                        for vbucket in server_map["vBucketMap"]]
        raise ValueError("bucket {0!r} not found in bucket-list output".format(bucket_name))

    def server_list(self):
        """Ask this node which servers are in its cluster.

        Returns a list of Server objects built from the `server-list` output.
        """
        return self._parse_server_list(run_cmd(self._cli_command("server-list")))

    def rebalance(self, servers_add, servers_remove):
        """Run one `rebalance` that adds and removes the given servers.

        servers_add    -- Server objects to add; each one's own credentials
                          are passed as the --server-add-username/password.
        servers_remove -- Server objects to remove.

        The command's output is not returned.
        """
        parts = [self._cli_command("rebalance")]
        for server in servers_add:
            parts.append(" --server-add={0}:{1} --server-add-username={2} --server-add-password={3}".format(
                server.hostname,
                server.rest_port,
                server.rest_username,
                server.rest_password))
        for server in servers_remove:
            parts.append(" --server-remove={0}:{1}".format(server.hostname,
                                                           server.rest_port))
        run_cmd("".join(parts))

    def vbucket_map(self):
        """Return the vbucket map of the module-level `bucket` as seen by this node.

        The result has one list per vbucket containing server names ("" for
        an unassigned copy).  Raises ValueError when the cli output is not
        valid JSON or does not contain the bucket.
        """
        bucket_json = run_cmd(self._cli_command("bucket-list") + " -o json")
        return self._build_vbucket_map(json.loads(bucket_json), bucket)
185
186
class Config(object):
    """Command-line options merged with the node list from the ini file.

    Attributes set by the constructor:
      servers    -- Server objects, in the order the [servers] section
                    lists them
      master     -- the entry of `servers` matching the -m/--master argument
      num_in     -- value of the +<n> argument (0 when absent)
      num_out    -- value of the -<n> argument (0 when absent)
      phase_hint -- value of -p/--phase-hint (0 when absent)
    """

    def __init__(self, argv):
        """Parse `argv` (including the program name) and load the ini file.

        Any invalid or missing input ends the process through usage().
        """
        # +<n> / -<n> are not getopt syntax, so peel them off first.
        try:
            num_in, num_out, remaining = self._split_counts(argv[1:])
        except ValueError as err:
            usage(err)

        try:
            opts, _ = getopt.getopt(remaining, 'hi:m:p:',
                                    ['help', 'ini=', 'master=', 'phase-hint='])
        except IndexError:
            usage()
        except getopt.GetoptError as err:
            usage(err)

        phase_hint = 0
        master = None
        inifile = None
        for opt, value in opts:
            if opt in ("-h", "--help"):
                usage()
            elif opt in ("-i", "--ini"):
                inifile = value
            elif opt in ("-m", "--master"):
                master = value
            elif opt in ("-p", "--phase-hint"):
                try:
                    phase_hint = int(value)
                except ValueError:
                    usage("phase hint must be an integer: {0}".format(value))

        if not inifile:
            usage("missing ini file")
        if not master:
            usage("missing master")
        if not num_in and not num_out:
            usage("missing num_in or num_out")

        self.servers = []
        self.master = None
        self.num_in = num_in
        self.num_out = num_out
        self.phase_hint = phase_hint

        config = ConfigParser.ConfigParser()
        # read() returns the list of files it could parse; empty == unreadable.
        if not config.read(inifile):
            usage("cannot read ini file: {0}".format(inifile))
        if not config.has_section('servers'):
            usage("ini file has no [servers] section")

        # Ports are kept as text throughout: the ini file and the -m argument
        # both supply strings, so an int default would never compare equal.
        default_rest_port = "8091"
        rest_username = ""
        rest_password = ""

        if config.has_option('global', 'port'):
            default_rest_port = config.get('global', 'port')
        if config.has_option('membase', 'rest_username'):
            rest_username = config.get('membase', 'rest_username')
        if config.has_option('membase', 'rest_password'):
            rest_password = config.get('membase', 'rest_password')

        master_parts = master.split(":")
        master_ip = master_parts[0]
        if len(master_parts) > 1 and master_parts[1]:
            master_port = master_parts[1]
        else:
            # "-m host" without a port falls back to the default REST port.
            master_port = str(default_rest_port)

        for option in config.options('servers'):
            server_name = config.get('servers', option)
            server_ip = server_name
            server_port = default_rest_port
            # An optional section named after the server overrides ip/port.
            if config.has_section(server_name):
                if config.has_option(server_name, 'ip'):
                    server_ip = config.get(server_name, 'ip')
                if config.has_option(server_name, 'port'):
                    server_port = config.get(server_name, 'port')

            server_info = Server(server_ip, server_port, rest_username, rest_password)
            # Appending here keeps the servers in ini-file order.
            self.servers.append(server_info)
            if server_ip == master_ip and str(server_port) == master_port:
                self.master = server_info

        if self.master is None:
            usage("master {0} is not listed in the ini file".format(master))

    @staticmethod
    def _split_counts(args):
        """Separate the +<n> / -<n> node counts from the other arguments.

        Returns (num_in, num_out, remaining) where `remaining` holds every
        argument that is not a count, in the original order.  When a count
        is given more than once the last one wins.

        Raises ValueError when an argument starts with "+" but is not
        followed by an integer.
        """
        num_in = 0
        num_out = 0
        remaining = []
        for arg in args:
            if arg.startswith("+"):
                try:
                    num_in = int(arg[1:])
                except ValueError:
                    raise ValueError(
                        "invalid number of nodes to rebalance in: {0}".format(arg))
            elif arg.startswith("-") and arg[1:].isdigit():
                num_out = int(arg[1:])
            else:
                remaining.append(arg)
        return num_in, num_out, remaining

    def __str__(self):
        """Return a multi-line summary; the master is marked with '*'."""
        lines = ["servers:"]
        for s in self.servers:
            marker = "*" if s == self.master else " "
            lines.append(marker + repr(s))
        lines.append("num_in: " + repr(self.num_in))
        lines.append("num_out: " + repr(self.num_out))
        lines.append("phase_hint: " + repr(self.phase_hint))
        return "\n".join(lines) + "\n"
291
292
def main(argv):
    """Rebalance the cluster as requested on the command line and report
    how the vbucket map changed.

    argv -- full argument vector, program name first (see usage()).
    """
    config = Config(argv)

    # get current vbucket state
    original_vbuckets = config.master.vbucket_map()

    # get current servers, and the configured servers not yet in the cluster
    current_servers = config.master.server_list()
    # Only nodes from the ini file that are not clustered can be added; a
    # symmetric difference would also offer clustered nodes missing from
    # the ini file.
    extra_servers = [server for server in config.servers
                     if server not in current_servers]

    # determine which servers to add and which to remove
    servers_add = extra_servers[:config.num_in]
    if config.phase_hint == 1:
        print("phase 1")
        servers_remove = config.servers[config.num_out:config.num_out * 2]
    elif config.phase_hint == 2:
        print("phase 2")
        servers_remove = config.servers[:config.num_out]
    else:
        servers_remove = current_servers[:config.num_out]

    print("adding {0} nodes".format(config.num_in))
    for server in servers_add:
        print(" " + repr(server))
    print("removing {0} nodes".format(config.num_out))
    for server in servers_remove:
        print(" " + repr(server))

    # rebalance in/out
    config.master.rebalance(servers_add, servers_remove)

    # get new master if needed
    if config.master in servers_remove:
        if servers_add:
            config.master = servers_add[0]
        else:
            # Pick a node that was clustered before and was not removed.
            survivors = [server for server in current_servers
                         if server not in servers_remove]
            if not survivors:
                sys.exit("Error: every clustered node was removed; "
                         "no node left to query")
            config.master = survivors[0]
        print("new master: {0}".format(config.master))

    # get new vbucket state
    new_vbuckets = config.master.vbucket_map()

    print(vbucket_active_diff(original_vbuckets, new_vbuckets))
    print(vbucket_replica_diff(original_vbuckets, new_vbuckets))


if __name__ == "__main__":
    main(sys.argv)