1from random import Random, shuffle
2import sys
3from threading import Thread
4sys.path.append('.')
5sys.path.append('lib')
6
7from load_runner import LoadRunner
8
9from multiprocessing import Queue
10from multiprocessing.process import Process
11from TestInput import TestInputParser
12import logger
13import time
# This process runs all the time unless it is shut down by the control center.
15
16#get the node list from nodes_statuses()
17# run the mbbackup maybe every 60 minutes and then after getting the backup just delete those files
18
19#request = {'queue':queue,'servers':servers,'interval':600}
20from membase.api.rest_client import RestConnection, RestHelper
21from remote.remote_util import RemoteMachineShellConnection
22
23
def start_load(argv):
    """Start the load thread, wait 24 hours, then put "stop" on its queue.

    argv is passed to TestInputParser.get_test_input; the first parsed
    server becomes the load target and the full server list is handed to
    the loadrunner thread.
    """
    one_day = 24 * 60 * 60
    stop_queue = Queue(10)
    parsed = TestInputParser.get_test_input(argv)

    server_info = [parsed.servers[0]]
    memcached_info = {
        'bucket_name': "default",
        'bucket_port': "11210",
        'bucket_password': "",
    }
    operation_info = {
        'operation_distribution': {'set': 10},
        'valuesize_distribution': {20: 30, 30: 5, 25: 5},
        'create_percent': 25,
        'threads': 6,
    }
    limit_info = {
        'max_items': 0,
        'operation_count': 0,
        # absolute timestamp one day from now
        'time': time.time() + one_day,
        'max_size': 0,
    }
    load_info = {
        'server_info': server_info,
        'memcached_info': memcached_info,
        'operation_info': operation_info,
        'limit_info': limit_info,
    }

    worker_args = (stop_queue, parsed.servers, load_info)
    Thread(target=loadrunner, args=worker_args).start()
    time.sleep(one_day)
    stop_queue.put("stop")
51
52
def loadrunner(queue, servers, load_info):
    """Thread body: print the run parameters, run one load, then sleep a day.

    The parameters (queue, servers, a two-hour interval and load_info) are
    bundled into a dict and given to LoadRunnerProcess.
    """
    two_hours = 2 * 60 * 60
    params = dict(
        queue=queue,
        servers=servers,
        interval=two_hours,
        load_info=load_info,
    )
    print(params)
    LoadRunnerProcess(params=params).load()
    time.sleep(24 * 60 * 60)
60
61
class LoadRunnerProcess(object):
    """Runs a LoadRunner built from ``params["load_info"]`` for a fixed time."""

    def __init__(self, params):
        """Store the run parameters.

        params is a dict with the keys "queue", "servers", "interval" and
        "load_info". Only the first entry of "servers" is kept.
        """
        self.queue = params["queue"]
        first_server = params["servers"][0]
        self.servers = [first_server]
        self.interval = params["interval"]
        self.log = logger.Logger.get_logger()
        self.load_info = params["load_info"]

    def load(self):
        """Start the loader, let it run for 6000 seconds, then stop and wait."""
        run_seconds = 6000
        load_runner = LoadRunner(self.load_info)
        load_runner.start()
        time.sleep(run_seconds)
        load_runner.stop()
        load_runner.wait()
76
77
78################################################################################
79
def start_backup(argv):
    """Start the backup thread, wait 24 hours, then put "stop" on its queue.

    argv is passed to TestInputParser.get_test_input; the parsed server
    list is handed to the backup thread.
    """
    stop_queue = Queue(10)
    servers = TestInputParser.get_test_input(argv).servers
    Thread(target=backup, args=(stop_queue, servers)).start()
    time.sleep(24 * 60 * 60)
    stop_queue.put("stop")
87
88
def backup(queue, servers):
    """Thread body: run a BackupProcess with a two-hour interval."""
    two_hours = 2 * 60 * 60
    settings = dict(queue=queue, servers=servers, interval=two_hours)
    BackupProcess(params=settings).backup()
94
95
class BackupProcess(object):
    """Periodically run a backup of one bucket on every node of the cluster.

    ``params`` is a dict with the keys ``"queue"`` (polled for a stop
    message), ``"servers"`` (server descriptors; the first one is used as
    the REST master) and ``"interval"`` (seconds slept between rounds).
    """

    # Bucket and destination handed to BackupHelper.backup for every node.
    BUCKET = 'default'
    BACKUP_LOCATION = "/tmp"

    def __init__(self, params):
        self.queue = params["queue"]
        self.servers = params["servers"]
        self.interval = params["interval"]
        self.log = logger.Logger.get_logger()

    def backup(self):
        """Run backup rounds until any message arrives on the queue.

        Each pass polls the queue without blocking. If a message is there it
        is logged and the loop ends; otherwise one backup round is run and
        the method sleeps for ``self.interval`` seconds before polling again.
        """
        while True:
            try:
                # Only the poll is guarded, so that a failure while logging
                # the message cannot be mistaken for "queue is empty".
                message = self.queue.get_nowait()
            except Exception:
                # Nothing was queued: things are normal, so do another
                # backup round and then wait for self.interval.
                self._backup_all_nodes()
                time.sleep(self.interval)
            else:
                self.log.info("get_nowait : {0}".format(message))
                break

    def _backup_all_nodes(self):
        """Back up the bucket on each node the master currently reports."""
        master = self.servers[0]
        nodes = RestConnection(master).node_statuses()
        server_by_node = self.node_server_map(nodes, self.servers)
        self.log.info("cluster has {0} nodes".format(len(nodes)))
        for node in nodes:
            try:
                # Kept inside the try so that a missing Crypto package only
                # fails this node's backup instead of ending the loop.
                # NOTE(review): atfork presumably re-seeds PyCrypto's RNG for
                # this process - confirm it is still required here.
                from Crypto.Random import atfork
                atfork()
                # A node without a matching server raises KeyError, which is
                # reported below like any other per-node failure.
                self._backup_node(server_by_node[node])
            except Exception as ex:
                self.log.info("backup of node {0} failed : {1!r}".format(node.ip, ex))
            else:
                # Report success only when the backup actually went through.
                self.log.info("backed up the data into {0}".format(self.BACKUP_LOCATION))

    def _backup_node(self, server):
        """Run the backup on one server and always release its remote shell."""
        helper = BackupHelper(server)
        try:
            # NOTE(review): the original issued the identical backup call
            # twice per node; the repeat is kept in case it is deliberate
            # stress - confirm, and drop to a single call if it is not.
            for _ in range(2):
                helper.backup(self.BUCKET, self.BACKUP_LOCATION)
        finally:
            # BackupHelper opens a remote shell in its constructor; close it
            # so connections do not pile up across rounds.
            helper.shell.disconnect()

    def node_server_map(self, nodes, servers):
        """Return a dict mapping each node to the first server with the same ip.

        Nodes that match no server are left out of the result.
        """
        mapping = {}
        for node in nodes:
            for server in servers:
                if node.ip == server.ip:
                    self.log.info("node.ip : {0} , server.ip : {1}".format(node.ip, server.ip))
                    mapping[node] = server
                    break
        return mapping

    def monitor(self):
        """Placeholder; currently does nothing."""
        # a thread function which monitors vital system stats during mbbackup
        # to see if the ops per second drops significantly
        pass
144
145
class BackupHelper(object):
    """Runs the mbbackup tool on one server through a remote shell."""

    def __init__(self, serverInfo):
        """Keep the server descriptor and open a remote shell to it."""
        self.server = serverInfo
        self.log = logger.Logger.get_logger()
        self.shell = RemoteMachineShellConnection(self.server)

    def backup(self, bucket, backup_location):
        """Run mbbackup for ``bucket`` and write the result to ``backup_location``.

        The data file is ``<storage path>/<bucket>-data/<bucket>`` (for the
        default bucket: default-data/default), where the storage path is the
        first storage entry the node reports about itself. The command output
        is passed to the shell's log_command_output.
        """
        self_node = RestConnection(self.server).get_nodes_self()
        tool = "{0}/mbbackup".format(self.server.cli_path)
        storage_root = self_node.storage[0].path
        data_file = "{0}/{1}-data/{1}".format(storage_root, bucket)
        command = "{tool} {src} {dst}".format(tool=tool,
                                              src=data_file,
                                              dst=backup_location)
        output, error = self.shell.execute_command(command)
        self.shell.log_command_output(output, error)
163
164
def start_combo(argv):
    """Start the combo thread, wait 24 hours, then put "stop" on its queue.

    argv is passed to TestInputParser.get_test_input; the whole parsed
    input object is handed to the combo thread.
    """
    stop_queue = Queue(10)
    parsed = TestInputParser.get_test_input(argv)
    Thread(target=combo, args=(stop_queue, parsed)).start()
    time.sleep(24 * 60 * 60)
    stop_queue.put("stop")
172
173
def combo(queue, input):
    """Thread body: build ComboBaseTests from ``input`` and run its loop.

    ``queue`` is accepted but not used here.
    """
    ComboBaseTests(input).loop()
177
178
class ComboBaseTests(object):
    """Alternately rebalance nodes into and out of the cluster for a set time.

    The first configured server is treated as the master and is never
    selected for adding or removal.
    """
    # Author's original outline of the scenario:
    #   start from 1..n
    #   then from no failover x node and rebalance and
    #   verify we did not lose items
    # NOTE(review): no item verification is performed by this class.

    @staticmethod
    def choose_nodes(master, nodes, howmany):
        """Return up to ``howmany`` nodes that are neither master nor localhost.

        A node is skipped when its ip contains the master's ip or
        '127.0.0.1' as a substring.
        """
        selected = []
        for node in nodes:
            if ComboBaseTests.contains(node.ip, master.ip):
                continue
            if ComboBaseTests.contains(node.ip, '127.0.0.1'):
                continue
            selected.append(node)
            if len(selected) == howmany:
                break
        return selected

    @staticmethod
    def contains(string1, string2):
        """Return True when both strings are non-empty and string2 is in string1."""
        if string1 and string2:
            return string1.find(string2) != -1
        return False

    @staticmethod
    def _removal_count(node_count):
        """Pick how many nodes to rebalance out, always leaving at least one.

        Returns 0 when fewer than two nodes are present (nothing can be
        removed); otherwise a random integer in [1, node_count - 1].
        """
        if node_count < 2:
            # randint(1, 0) would raise ValueError on a one-node cluster.
            return 0
        return Random().randint(1, node_count - 1)

    def __init__(self, input):
        self._input = input
        self._servers = self._input.servers
        self.log = logger.Logger.get_logger()

    def loop(self):
        """Run the test body, taking 'duration' and 'replica' from test_params.

        Defaults are a duration of 2400 (minutes), replica 1 and load ratio 5.
        """
        duration = 2400
        replica = 1
        load_ratio = 5
        if 'duration' in self._input.test_params:
            duration = int(self._input.test_params['duration'])
        if 'replica' in self._input.test_params:
            replica = int(self._input.test_params['replica'])
        self.common_test_body(replica, load_ratio, duration)

    def common_test_body(self, replica, load_ratio, timeout=10):
        """Add and remove nodes in turn until ``timeout`` minutes have elapsed.

        ``replica`` and ``load_ratio`` are only logged. Each cycle adds a
        random number of the missing nodes (if any), sleeps 30 minutes, and
        then - if the cluster held at least 3/4 of the configured servers at
        the start of the cycle - removes a random number of nodes.
        """
        start_time = time.time()
        self.log.info("replica : {0}".format(replica))
        self.log.info("load_ratio : {0}".format(load_ratio))
        master = self._servers[0]
        self.log.info('picking server : {0} as the master'.format(master))
        rest = RestConnection(master)
        deadline = start_time + 60 * timeout
        while time.time() < deadline:
            nodes = rest.node_statuses()
            delta = len(self._servers) - len(nodes)
            if delta > 0:
                # randint(1, 1) is 1, so this also covers a single missing node.
                how_many_add = Random().randint(1, delta)
                self.log.info("going to add {0} nodes".format(how_many_add))
                self.rebalance_in(how_many=how_many_add)
            else:
                self.log.info("all nodes already joined the cluster")
            time.sleep(30 * 60)
            # dont rebalance out if there are not too many nodes
            if len(nodes) >= (3.0 / 4.0 * len(self._servers)):
                nodes = rest.node_statuses()
                how_many_out = self._removal_count(len(nodes))
                if how_many_out > 0:
                    self.log.info("going to remove {0} nodes".format(how_many_out))
                    self.rebalance_out(how_many=how_many_out)
                else:
                    self.log.info("not enough nodes in the cluster to remove any")

    def rebalance_out(self, how_many):
        """Rebalance up to ``how_many`` non-master nodes out of the cluster.

        After a rebalance that started, membase is restarted on every ejected
        server. Returns True when there was nothing to eject, False when the
        rebalance did not start, otherwise the result of monitorRebalance.
        """
        msg = "choosing {0} nodes and rebalance them out from the cluster"
        self.log.info(msg.format(how_many))
        rest = RestConnection(self._servers[0])
        nodes = rest.node_statuses()
        self.log.info("current nodes : {0}".format([node.ip for node in nodes]))
        ejected_ids = []
        ejected_servers = []
        candidates = self._servers[1:]
        shuffle(candidates)
        for server in candidates:
            match = next((node for node in nodes if node.ip == server.ip), None)
            if match is not None:
                ejected_ids.append(match.id)
                ejected_servers.append(server)
            if len(ejected_ids) == how_many:
                break
        if not ejected_ids:
            return True
        self.log.info("selected {0} for rebalance out from the cluster".format(ejected_ids))
        otp_nodes = [node.id for node in nodes]
        started = rest.rebalance(otp_nodes, ejected_ids)
        self.log.info("rebalance operation started ? {0}".format(started))
        if not started:
            # Report the failure, as rebalance_in does.
            return False
        result = rest.monitorRebalance()
        msg = "successfully rebalanced out selected nodes from the cluster ? {0}"
        self.log.info(msg.format(result))
        # let's restart membase on those nodes
        for server in ejected_servers:
            self._restart_membase(server)
        return result

    def _restart_membase(self, server):
        """Stop and start membase on ``server``; failures are logged, not raised."""
        shell = RemoteMachineShellConnection(server)
        try:
            for action in (shell.stop_membase, shell.start_membase):
                try:
                    action()
                except Exception as ex:
                    # Best effort: keep going, but leave a trace of what failed.
                    self.log.info("{0} failed on {1} : {2!r}".format(
                        action.__name__, server.ip, ex))
        finally:
            shell.disconnect()
        # NOTE(review): presumably waits for ns_server to come back; the
        # returned value is not inspected - confirm that is intended.
        RestHelper(RestConnection(server)).is_ns_server_running()

    def rebalance_in(self, how_many):
        """Add up to ``how_many`` servers that are not yet nodes, then rebalance.

        Returns the result of monitorRebalance, or False when the rebalance
        did not start.
        """
        rest = RestConnection(self._servers[0])
        nodes = rest.node_statuses()
        # choose how_many servers from self._servers which are not part of nodes
        node_ips = [node.ip for node in nodes]
        self.log.info("current nodes : {0}".format(node_ips))
        to_be_added = []
        candidates = self._servers[1:]
        shuffle(candidates)
        for server in candidates:
            if server.ip not in node_ips:
                to_be_added.append(server)
            if len(to_be_added) == how_many:
                break

        for server in to_be_added:
            # NOTE(review): credentials are hard-coded; whether the add
            # succeeded is not checked.
            rest.add_node('Administrator', 'password', server.ip)
        nodes = rest.node_statuses()
        otp_nodes = [node.id for node in nodes]
        started = rest.rebalance(otp_nodes, [])
        self.log.info("rebalance operation started ? {0}".format(started))
        if not started:
            return False
        result = rest.monitorRebalance()
        msg = "successfully rebalanced in selected nodes to the cluster ? {0}"
        self.log.info(msg.format(result))
        return result
320
321
if __name__ == "__main__":
    # Launch the load, combo and backup drivers, each in its own process
    # and each given the full command line, then wait for all of them.
    workers = []
    for entry_point in (start_load, start_combo, start_backup):
        worker = Process(target=entry_point, args=(sys.argv,))
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
332