from random import Random, shuffle
import sys
from threading import Thread

sys.path.append('.')
sys.path.append('lib')

from load_runner import LoadRunner

from multiprocessing import Queue
from multiprocessing.process import Process
from TestInput import TestInputParser
import logger
import time

#this process will run all the time unless it is shut down by the control center
#it spawns three workers: a memcached load generator, a periodic mbbackup run
#and a rebalance in/out loop
#
#get the node list from node_statuses()
#run mbbackup maybe every 60 minutes and then, after collecting the backup, just delete those files
#
#request = {'queue': queue, 'servers': servers, 'interval': 600}
from membase.api.rest_client import RestConnection, RestHelper
from remote.remote_util import RemoteMachineShellConnection


def start_load(argv):
    #start the load generator in a thread, let it run for 24 hours,
    #then tell it to stop through the queue
    queue = Queue(10)
    test_input = TestInputParser.get_test_input(argv)
    load_info = {
        'server_info': [test_input.servers[0]],
        'memcached_info': {
            'bucket_name': "default",
            'bucket_port': "11210",
            'bucket_password': "",
        },
        'operation_info': {
            'operation_distribution': {'set': 10},
            'valuesize_distribution': {20: 30, 30: 5, 25: 5},
            'create_percent': 25,
            'threads': 6,
        },
        'limit_info': {
            'max_items': 0,
            'operation_count': 0,
            'time': time.time() + 24 * 60 * 60,
            'max_size': 0,
        },
    }
    thread = Thread(target=loadrunner, args=(queue, test_input.servers, load_info))
    thread.start()
    time.sleep(24 * 60 * 60)
    queue.put("stop")


def loadrunner(queue, servers, load_info):
    interval = 2 * 60 * 60
    params = {"queue": queue, "servers": servers, "interval": interval, "load_info": load_info}
    print(params)
    runner = LoadRunnerProcess(params=params)
    runner.load()
    time.sleep(24 * 60 * 60)


class LoadRunnerProcess(object):
    def __init__(self, params):
        self.queue = params["queue"]
        self.servers = [params["servers"][0]]
        self.interval = params["interval"]
        self.log = logger.Logger.get_logger()
        self.load_info = params["load_info"]

    def load(self):
        loader = LoadRunner(self.load_info)
        loader.start()
        time.sleep(6000)
        loader.stop()
        loader.wait()


################################################################################

def start_backup(argv):
    #start the backup loop in a thread, let it run for 24 hours,
    #then tell it to stop through the queue
    queue = Queue(10)
    test_input = TestInputParser.get_test_input(argv)
    thread = Thread(target=backup, args=(queue, test_input.servers))
    thread.start()
    time.sleep(24 * 60 * 60)
    queue.put("stop")


def backup(queue, servers):
    interval = 2 * 60 * 60
    params = {"queue": queue, "servers": servers, "interval": interval}
    backup_process = BackupProcess(params=params)
    backup_process.backup()


class BackupProcess(object):
    def __init__(self, params):
        self.queue = params["queue"]
        self.servers = params["servers"]
        self.interval = params["interval"]
        self.log = logger.Logger.get_logger()

    def backup(self):
        while True:
            try:
                #anything on the queue means the control center asked us to stop
                x = self.queue.get_nowait()
                self.log.info("get_nowait : {0}".format(x))
                break
            except Exception:
                #the queue is empty, things are normal : run another backup,
                #then wait for self.interval
                master = self.servers[0]
                rest = RestConnection(master)
                nodes = rest.node_statuses()
                node_map = self.node_server_map(nodes, self.servers)
                self.log.info("cluster has {0} nodes".format(len(nodes)))
                for node in nodes:
                    try:
                        #reseed the PyCrypto RNG after fork so ssh connections work
                        from Crypto.Random import atfork
                        atfork()
                        BackupHelper(node_map[node]).backup('default', "/tmp")
                        BackupHelper(node_map[node]).backup('default', "/tmp")
                    except Exception as ex:
                        print(ex)
                self.log.info("backed up the data into /tmp")
                time.sleep(self.interval)

    def node_server_map(self, nodes, servers):
        node_map = {}
        for node in nodes:
            for server in servers:
                if node.ip == server.ip:
                    self.log.info("node.ip : {0} , server.ip : {1}".format(node.ip, server.ip))
                    node_map[node] = server
                    break
        return node_map

    def monitor(self):
        #a thread function which monitors vital system stats during mbbackup
        #to see if the ops per second drop significantly
        pass


class BackupHelper(object):
    def __init__(self, serverInfo):
        self.server = serverInfo
        self.log = logger.Logger.get_logger()
        self.shell = RemoteMachineShellConnection(self.server)

    #data_file = default-data/default
    def backup(self, bucket, backup_location):
        node = RestConnection(self.server).get_nodes_self()
        mbbackup_path = "{0}/{1}".format(self.server.cli_path, "mbbackup")
        data_directory = "{0}/{1}-{2}/{3}".format(node.storage[0].path, bucket, "data", bucket)
        command = "{0} {1} {2}".format(mbbackup_path,
                                       data_directory,
                                       backup_location)
        output, error = self.shell.execute_command(command)
        self.shell.log_command_output(output, error)


def start_combo(argv):
    #start the rebalance in/out loop in a thread, let it run for 24 hours,
    #then tell it to stop through the queue
    queue = Queue(10)
    test_input = TestInputParser.get_test_input(argv)
    thread = Thread(target=combo, args=(queue, test_input))
    thread.start()
    time.sleep(24 * 60 * 60)
    queue.put("stop")


def combo(queue, input):
    combo_test = ComboBaseTests(input)
    combo_test.loop()


class ComboBaseTests(object):
    # start with 1..n nodes, then rebalance nodes in and out
    # and verify we did not lose items
    @staticmethod
    def choose_nodes(master, nodes, howmany):
        selected = []
        for node in nodes:
            if not ComboBaseTests.contains(node.ip, master.ip) and \
               not ComboBaseTests.contains(node.ip, '127.0.0.1'):
                selected.append(node)
                if len(selected) == howmany:
                    break
        return selected

    @staticmethod
    def contains(string1, string2):
        if string1 and string2:
            return string1.find(string2) != -1
        return False

    def __init__(self, input):
        self._input = input
        self._servers = self._input.servers
        self.log = logger.Logger.get_logger()

    def loop(self):
        duration = 2400
        replica = 1
        load_ratio = 5
        if 'duration' in self._input.test_params:
            duration = int(self._input.test_params['duration'])
        if 'replica' in self._input.test_params:
            replica = int(self._input.test_params['replica'])
        self.common_test_body(replica, load_ratio, duration)

    def common_test_body(self, replica, load_ratio, timeout=10):
        log = logger.Logger.get_logger()
        start_time = time.time()
        log.info("replica : {0}".format(replica))
        log.info("load_ratio : {0}".format(load_ratio))
        master = self._servers[0]
        log.info('picking server : {0} as the master'.format(master))
        rest = RestConnection(master)
        while time.time() < (start_time + 60 * timeout):
            #rebalance a few nodes out at each step
            #let's add some items ?
            nodes = rest.node_statuses()
            delta = len(self._servers) - len(nodes)
            if delta > 0:
                if delta > 1:
                    how_many_add = Random().randint(1, delta)
                else:
                    how_many_add = 1
                self.log.info("going to add {0} nodes".format(how_many_add))
                self.rebalance_in(how_many=how_many_add)
            else:
                self.log.info("all nodes already joined the cluster")
                time.sleep(30 * 60)
            #don't rebalance out unless the cluster holds at least 3/4 of the servers
            if len(nodes) >= (3.0 / 4.0 * len(self._servers)):
                nodes = rest.node_statuses()
                how_many_out = Random().randint(1, len(nodes) - 1)
                self.log.info("going to remove {0} nodes".format(how_many_out))
                self.rebalance_out(how_many=how_many_out)

    def rebalance_out(self, how_many):
        msg = "choosing {0} node(s) and rebalancing them out of the cluster"
        self.log.info(msg.format(how_many))
        rest = RestConnection(self._servers[0])
        nodes = rest.node_statuses()
        nodeIps = [node.ip for node in nodes]
        self.log.info("current nodes : {0}".format(nodeIps))
        toBeEjected = []
        toBeEjectedServers = []
        selection = self._servers[1:]
        shuffle(selection)
        for server in selection:
            for node in nodes:
                if server.ip == node.ip:
                    toBeEjected.append(node.id)
                    toBeEjectedServers.append(server)
                    break
            if len(toBeEjected) == how_many:
                break
        if len(toBeEjected) > 0:
            self.log.info("selected {0} for rebalance out from the cluster".format(toBeEjected))
            otpNodes = [node.id for node in nodes]
            started = rest.rebalance(otpNodes, toBeEjected)
            msg = "rebalance operation started ? {0}"
            self.log.info(msg.format(started))
            if started:
                result = rest.monitorRebalance()
                msg = "successfully rebalanced out selected nodes from the cluster ? {0}"
                self.log.info(msg.format(result))
                #restart membase on the ejected nodes so they can be added back later
                for server in toBeEjectedServers:
                    shell = RemoteMachineShellConnection(server)
                    try:
                        shell.stop_membase()
                    except:
                        pass
                    try:
                        shell.start_membase()
                    except:
                        pass
                    shell.disconnect()
                    RestHelper(RestConnection(server)).is_ns_server_running()
                return result
        return True

    def rebalance_in(self, how_many):
        rest = RestConnection(self._servers[0])
        nodes = rest.node_statuses()
        #choose how_many servers from self._servers which are not yet part of the cluster
        nodeIps = [node.ip for node in nodes]
        self.log.info("current nodes : {0}".format(nodeIps))
        toBeAdded = []
        selection = self._servers[1:]
        shuffle(selection)
        for server in selection:
            if server.ip not in nodeIps:
                toBeAdded.append(server)
            if len(toBeAdded) == how_many:
                break

        for server in toBeAdded:
            rest.add_node('Administrator', 'password', server.ip)
            #check if it was added ?
        nodes = rest.node_statuses()
        otpNodes = [node.id for node in nodes]
        started = rest.rebalance(otpNodes, [])
        msg = "rebalance operation started ? {0}"
        self.log.info(msg.format(started))
        if started:
            result = rest.monitorRebalance()
            msg = "successfully rebalanced in selected nodes ? {0}"
            self.log.info(msg.format(result))
            return result
        return False


if __name__ == "__main__":
    #run the load generator, the rebalance combo and the backup loop
    #in three separate processes
    process1 = Process(target=start_load, args=(sys.argv,))
    process1.start()
    process2 = Process(target=start_combo, args=(sys.argv,))
    process2.start()
    process3 = Process(target=start_backup, args=(sys.argv,))
    process3.start()
    process1.join()
    process2.join()
    process3.join()
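
# Usage sketch (an assumption, not part of this module): the script is expected
# to be launched with the usual testrunner-style TestInput arguments, where "-i"
# names an ini file describing the cluster servers and "-p" passes
# comma-separated test params. Only the 'duration' and 'replica' params are read
# by ComboBaseTests.loop(); the script and ini file names below are hypothetical.
#
#   python combo_load_backup.py -i cluster.ini -p duration=2400,replica=1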