import time
import copy
import json
import uuid
from threading import Thread

import logger
from exception import TimeoutException
from membase.api.rest_client import RestConnection
from membase.api.exception import BucketCreationException
from membase.helper.bucket_helper import BucketOperationHelper
from memcached.helper.data_helper import KVStoreAwareSmartClient, KVStoreSmartClientHelper
from memcached.helper.data_helper import MemcachedClientHelper

#TODO: Setup stacktracer
#TODO: Needs "easy_install pygments"
#import stacktracer
#stacktracer.trace_start("trace.html",interval=30,auto=True) # Set auto flag to always update file!


class Task():
    """Base class for asynchronous test tasks driven by an external task manager.

    A task publishes its outcome in ``self.res`` as a dict with keys
    ``status`` ('success', 'error', 'cancelled' or 'timed_out') and ``value``.
    ``result()`` blocks until that outcome is available.
    """

    def __init__(self, name):
        self.log = logger.Logger.get_logger()
        self.name = name
        self.cancelled = False
        self.retries = 0
        self.res = None

    def cancel(self):
        """Request cancellation; honoured on the task's next step()."""
        self.cancelled = True

    def is_timed_out(self):
        """Return True once the task has been marked as timed out."""
        return self.res is not None and self.res['status'] == 'timed_out'

    def set_result(self, result):
        """Publish the final outcome dict for result() to pick up."""
        self.res = result

    def result(self, retries=0):
        """Block until the task finishes, then return its value.

        retries -- maximum number of task-side retries to wait for; 0 means
                   wait forever.
        Raises Exception on an 'error' outcome and TimeoutException on a
        'timed_out' outcome.
        """
        while self.res is None:
            if retries == 0 or self.retries < retries:
                time.sleep(1)
            else:
                self.res = {"status": "timed_out", "value": None}
        if self.res['status'] == 'error':
            raise Exception(self.res['value'])
        if self.res['status'] == 'timed_out':
            raise TimeoutException("task {0} timed out, tried {1} times".format(self.name,
                                                                                self.retries))
        return self.res['value']


class NodeInitializeTask(Task):
    """Initialize a single node: set REST credentials, then set its memory
    quota to two thirds of the memory memcached has reserved."""

    def __init__(self, server):
        Task.__init__(self, "node_init_task")
        self.state = "initializing"
        self.server = server

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: was "self.result = self.set_result(...)", which
            # clobbered the inherited result() method with None.
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "node init"
            task_manager.schedule(self)
        elif self.state == "node init":
            self.init_node()
            self.state = "cluster init"
            task_manager.schedule(self)
        elif self.state == "cluster init":
            self.init_node_memory()
        else:
            raise Exception("Bad State in NodeInitializationTask")

    def init_node(self):
        rest = RestConnection(self.server)
        rest.init_cluster(self.server.rest_username, self.server.rest_password)

    def init_node_memory(self):
        rest = RestConnection(self.server)
        info = rest.get_nodes_self()
        # use 2/3 of the memcached-reserved memory as the cluster quota
        quota = int(info.mcdMemoryReserved * 2 / 3)
        rest.init_cluster_memoryQuota(self.server.rest_username, self.server.rest_password, quota)
        self.state = "finished"
        self.set_result({"status": "success", "value": quota})


class BucketCreateTask(Task):
    """Create a bucket via REST, then poll until its vbucket map is ready."""

    def __init__(self, server, bucket='default', replicas=1, port=11210, size=0, password=None):
        Task.__init__(self, "bucket_create_task")
        self.server = server
        self.bucket = bucket
        self.replicas = replicas
        self.port = port
        self.size = size        # ramQuotaMB; <= 0 means "derive from node quota"
        self.password = password
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: do not assign set_result()'s None over result()
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "creating"
            task_manager.schedule(self)
        elif self.state == "creating":
            self.create_bucket(task_manager)
        elif self.state == "checking":
            self.check_bucket_ready(task_manager)
        else:
            raise Exception("Bad State in BucketCreateTask")

    def create_bucket(self, task_manager):
        rest = RestConnection(self.server)
        if self.size <= 0:
            info = rest.get_nodes_self()
            # default to 2/3 of the node's memory quota
            self.size = info.memoryQuota * 2 / 3

        authType = 'none' if self.password is None else 'sasl'

        try:
            rest.create_bucket(bucket=self.bucket,
                               ramQuotaMB=self.size,
                               replicaNumber=self.replicas,
                               proxyPort=self.port,
                               authType=authType,
                               saslPassword=self.password)
            self.state = "checking"
            task_manager.schedule(self)
        except BucketCreationException:
            self.state = "finished"
            self.set_result({"status": "error",
                             "value": "Failed to create bucket {0}".format(self.bucket)})

    def check_bucket_ready(self, task_manager):
        try:
            if BucketOperationHelper.wait_for_memcached(self.server, self.bucket):
                self.set_result({"status": "success", "value": None})
                # BUGFIX: was "self.state == 'finished'" (a no-op comparison),
                # so the task never left the "checking" state.
                self.state = "finished"
                return
            else:
                self.log.info("vbucket map not ready after try {0}".format(self.retries))
        except Exception:
            self.log.info("vbucket map not ready after try {0}".format(self.retries))
        self.retries = self.retries + 1
        task_manager.schedule(self)


class BucketDeleteTask(Task):
    """Delete a bucket via REST and wait for the deletion to complete.
    Succeeds immediately if the bucket does not exist."""

    def __init__(self, server, bucket="default"):
        Task.__init__(self, "bucket_delete_task")
        self.server = server
        self.bucket = bucket
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: do not assign set_result()'s None over result()
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "creating"
            task_manager.schedule(self)
        elif self.state == "creating":
            self.delete_bucket(task_manager)
        elif self.state == "checking":
            self.check_bucket_deleted()
        else:
            raise Exception("Bad State in BucketDeleteTask")

    def delete_bucket(self, task_manager):
        rest = RestConnection(self.server)
        if self.bucket in [bucket.name for bucket in rest.get_buckets()]:
            if rest.delete_bucket(self.bucket):
                self.state = "checking"
                task_manager.schedule(self)
            else:
                self.state = "finished"
                self.set_result({"status": "error",
                                 "value": "Failed to delete bucket {0}".format(self.bucket)})
        else:
            # bucket already deleted
            self.state = "finished"
            self.set_result({"status": "success", "value": None})

    def check_bucket_deleted(self):
        rest = RestConnection(self.server)
        if BucketOperationHelper.wait_for_bucket_deletion(self.bucket, rest, 200):
            self.set_result({"status": "success", "value": None})
        else:
            self.set_result({"status": "error",
                             "value": "{0} bucket took too long to delete".format(self.bucket)})
        self.state = "finished"


class RebalanceTask(Task):
    """Add/remove nodes and drive a rebalance, optionally stopping and
    restarting it once a progress threshold is reached."""

    def __init__(self, servers, to_add=None, to_remove=None, do_stop=False, progress=30):
        Task.__init__(self, "rebalance_task")
        self.servers = servers
        # BUGFIX: avoid mutable default arguments shared between instances
        self.to_add = to_add if to_add is not None else []
        self.to_remove = to_remove if to_remove is not None else []
        self.do_stop = do_stop
        self.progress = progress   # % threshold at which to stop, if do_stop
        self.state = "initializing"
        self.log.info("tcmalloc fragmentation stats before Rebalance ")
        self.getStats(self.servers[0])

    def getStats(self, servers):
        # NOTE(review): the 'servers' parameter is unused; stats are always
        # gathered through self.servers[0] — kept for caller compatibility.
        rest = RestConnection(self.servers[0])
        nodes = rest.node_statuses()
        buckets = rest.get_buckets()

        for node in nodes:
            for bucket in buckets:
                bucket = bucket.name
                _node = {"ip": node.ip, "port": node.port, "username": self.servers[0].rest_username,
                         "password": self.servers[0].rest_password}
                try:
                    mc = MemcachedClientHelper.direct_client(_node, bucket)
                    self.log.info("Bucket :{0} ip {1} : Stats {2} \n".format(bucket, node.ip, mc.stats("memory")))
                except Exception:
                    self.log.info("Server {0} not yet part of the cluster".format(node.ip))

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: do not assign set_result()'s None over result()
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "add_nodes"
            task_manager.schedule(self)
        elif self.state == "add_nodes":
            self.add_nodes(task_manager)
        elif self.state == "start_rebalance":
            self.start_rebalance(task_manager)
        elif self.state == "stop_rebalance":
            self.stop_rebalance(task_manager)
        elif self.state == "rebalancing":
            self.rebalancing(task_manager)
        else:
            raise Exception("Bad State in RebalanceTask: {0}".format(self.state))

    def add_nodes(self, task_manager):
        master = self.servers[0]
        rest = RestConnection(master)
        try:
            for node in self.to_add:
                self.log.info("adding node {0}:{1} to cluster".format(node.ip, node.port))
                rest.add_node(master.rest_username, master.rest_password,
                              node.ip, node.port)
            self.state = "start_rebalance"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})

    def start_rebalance(self, task_manager):
        rest = RestConnection(self.servers[0])
        nodes = rest.node_statuses()
        ejectedNodes = []
        for node in self.to_remove:
            ejectedNodes.append(node.id)
        try:
            rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
            self.state = "rebalancing"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finishing"
            self.set_result({"status": "error", "value": e})

    def stop_rebalance(self, task_manager):
        rest = RestConnection(self.servers[0])
        try:
            rest.stop_rebalance()
            # We don't want to start rebalance immediately
            self.log.info("Rebalance Stopped, sleep for 20 secs")
            time.sleep(20)
            self.do_stop = False
            self.state = "start_rebalance"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finishing"
            self.set_result({"status": "error", "value": e})

    def rebalancing(self, task_manager):
        rest = RestConnection(self.servers[0])
        try:
            progress = rest._rebalance_progress()
            # BUGFIX: was "progress is not -1 and progress is not 100" —
            # identity comparison on ints is implementation-dependent;
            # value equality is what is meant here.
            if progress != -1 and progress != 100:
                if self.do_stop and progress >= self.progress:
                    self.state = "stop_rebalance"
                    task_manager.schedule(self, 1)
                else:
                    task_manager.schedule(self, 1)
            else:
                self.state = "finishing"
                self.set_result({"status": "success", "value": None})
                self.log.info("tcmalloc fragmentation stats after Rebalance ")
                self.getStats(self.servers[0])
        except Exception as e:
            self.state = "finishing"
            self.set_result({"status": "error", "value": e})


class FailOverTask(Task):
    """Fail over the nodes in to_remove via REST."""

    def __init__(self, servers, to_remove=None):
        Task.__init__(self, "failover_task")
        self.servers = servers
        # BUGFIX: avoid mutable default argument shared between instances
        self.to_remove = to_remove if to_remove is not None else []
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: do not assign set_result()'s None over result()
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "start_failover"
            task_manager.schedule(self)
        elif self.state == "start_failover":
            self.start_failover(task_manager)
        elif self.state == "failing over":
            self.failingOver(task_manager)
        else:
            raise Exception("Bad State in Failover: {0}".format(self.state))

    def start_failover(self, task_manager):
        rest = RestConnection(self.servers[0])
        ejectedNodes = []
        for node in self.to_remove:
            ejectedNodes.append(node.id)
        try:
            # NOTE(review): this passes the node object while failingOver()
            # passes node.id — confirm which form rest.fail_over expects.
            rest.fail_over(self.to_remove[0])
            self.state = "failing over"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finishing"
            self.set_result({"status": "error", "value": e})

    def failingOver(self, task_manager):
        rest = RestConnection(self.servers[0])
        ejectedNodes = []

        for node in self.to_remove:
            ejectedNodes.append(node.id)
        self.log.info("ejected node is : %s" % ejectedNodes[0])

        progress = rest.fail_over(ejectedNodes[0])
        if not progress:
            self.state = "finishing"
            self.log.info("Error! Missing Parameter ... No node to fail over")
            self.set_result({"status": "error", "value": None})
        else:
            self.state = "finishing"
            self.log.info("Success! FailedOver node {0}".format([ejectedNodes]))
            self.set_result({"status": "success", "value": None})


#OperationGeneratorTask
#OperationGenerating
class GenericLoadingTask(Thread, Task):
    """Base for threaded document load/access tasks: the task manager drives
    step() while run() (implemented by subclasses) performs the actual
    memcached operations on a worker thread."""

    def __init__(self, rest, bucket, kv_store=None, store_enabled=True, info=None):
        Thread.__init__(self)
        Task.__init__(self, "gen_task")
        self.rest = rest
        self.bucket = bucket
        self.client = KVStoreAwareSmartClient(rest, bucket, kv_store, info, store_enabled)
        self.state = "initializing"
        self.doc_op_count = 0

    def cancel(self):
        # NOTE(review): _Thread__stop is a private Python 2 threading API;
        # kept as-is since there is no public equivalent here.
        self._Thread__stop()
        self.join()
        self.log.info("cancelling task: {0}".format(self.name))
        self.cancelled = True

    def step(self, task_manager):
        if self.cancelled:
            # BUGFIX: this branch was a plain "if" that also clobbered
            # result(); a cancelled task then fell through into the state
            # machine and could still start its thread.
            self.set_result({"status": "cancelled",
                             "value": self.doc_op_count})
        elif self.state == "initializing":
            self.state = "running"
            self.start()
            task_manager.schedule(self, 2)
        elif self.state == "running":
            self.log.info("{0}: {1} ops completed".format(self.name, self.doc_op_count))
            if self.is_alive():
                task_manager.schedule(self, 5)
            else:
                self.join()
                self.state = "finished"
                self.set_result({"status": "success", "value": self.doc_op_count})
        elif self.state != "finished":
            raise Exception("Bad State in DocumentGeneratorTask")

    def do_task_op(self, op, key, value=None, expiration=None):
        """Perform one 'set'/'get'/'delete', retrying up to 5 times and
        refreshing the key's vbucket mapping after each failure."""
        retry_count = 0
        ok = False
        if value is not None:
            value = value.replace(" ", "")

        while retry_count < 5 and not ok:
            try:
                if op == "set":
                    if expiration is None:
                        self.client.set(key, value)
                    else:
                        self.client.set(key, value, expiration)
                if op == "get":
                    value = self.client.mc_get(key)
                if op == "delete":
                    self.client.delete(key)
                ok = True
            except Exception:
                retry_count += 1
                self.client.reset_vbuckets(self.rest,
                                           set([self.client._get_vBucket_id(key)]))
        return value


class LoadDocGeneratorTask(GenericLoadingTask):
    """Repeatedly set documents produced by the given doc generators,
    optionally looping forever."""

    def __init__(self, rest, doc_generators, bucket="default", kv_store=None,
                 store_enabled=True, expiration=None, loop=False):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)

        self.doc_generators = doc_generators
        # BUGFIX: was "self.expiration = None", silently discarding the
        # caller-supplied expiration argument.
        self.expiration = expiration
        self.loop = loop
        self.name = "doc-load-task{0}".format(str(uuid.uuid4())[:7])

    def run(self):
        while True:
            # deep-copy so generators can be re-iterated on each loop pass
            dg = copy.deepcopy(self.doc_generators)
            for doc_generator in dg:
                for value in doc_generator:
                    _value = value.encode("ascii", "ignore")
                    _json = json.loads(_value, encoding="utf-8")
                    _id = _json["meta"]["id"].encode("ascii", "ignore")
                    try:
                        self.do_task_op("set", _id, json.dumps(_json["json"]), self.expiration)
                        self.doc_op_count += 1
                    except Exception as e:
                        self.state = "finished"
                        self.set_result({"status": "error", "value": e})
                        return
                    except:
                        # bare except: absorbs thread-stop style
                        # BaseExceptions raised during cancellation
                        return
            if not self.loop:
                self.set_result({"status": "success", "value": self.doc_op_count})
                return


class DocumentAccessTask(GenericLoadingTask):
    """Issue gets for a list of document ids, optionally looping forever."""

    def __init__(self, rest, doc_ids, bucket="default",
                 info=None, loop=False):
        GenericLoadingTask.__init__(self, rest, bucket, info=info)
        self.doc_ids = doc_ids
        self.name = "doc-get-task{0}".format(str(uuid.uuid4())[:7])
        self.loop = loop

    def run(self):
        while True:
            for _id in self.doc_ids:
                try:
                    self.do_task_op("get", _id)
                    self.doc_op_count = self.doc_op_count + 1
                except Exception as e:
                    self.set_result({"status": "error",
                                     "value": "get failed {0}".format(e)})
            if not self.loop:
                self.set_result({"status": "success", "value": self.doc_op_count})
                return


class DocumentExpireTask(GenericLoadingTask):
    """Re-set each document with an expiration, then wait until the docs are
    expected to have expired before reporting success."""

    def __init__(self, rest, doc_ids, bucket="default", info=None,
                 kv_store=None, store_enabled=True, expiration=5):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)
        self.doc_ids = doc_ids
        self.name = "doc-expire-task{0}".format(str(uuid.uuid4())[:7])
        self.expiration = expiration

    def run(self):
        for _id in self.doc_ids:
            try:
                item = self.do_task_op("get", _id)
                if item:
                    val = item['value']
                    self.do_task_op("set", _id, val, self.expiration)
                    self.doc_op_count = self.doc_op_count + 1
                else:
                    self.set_result({"status": "error",
                                     "value": "failed get key to expire {0}".format(_id)})
            except Exception as e:
                self.set_result({"status": "error",
                                 "value": "expiration failed {0}".format(e)})

        # wait till docs are 'expected' to be expired before returning success
        time.sleep(self.expiration)
        self.set_result({"status": "success", "value": self.doc_op_count})


class DocumentDeleteTask(GenericLoadingTask):
    """Delete each document id in the given list."""

    def __init__(self, rest, doc_ids, bucket="default", info=None,
                 kv_store=None, store_enabled=True):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)
        self.doc_ids = doc_ids
        self.name = "doc-delete-task{0}".format(str(uuid.uuid4())[:7])

    def run(self):
        for _id in self.doc_ids:
            try:
                self.do_task_op("delete", _id)
                self.doc_op_count = self.doc_op_count + 1
            except Exception as e:
                self.set_result({"status": "error",
                                 "value": "deletes failed {0}".format(e)})
        self.set_result({"status": "success", "value": self.doc_op_count})


class KVStoreIntegrityTask(Task, Thread):
    """Run KV-store verification on a worker thread and report the list of
    validation failures as the task value."""

    def __init__(self, rest, kv_store, bucket="default"):
        # BUGFIX: Task.__init__ was never called, leaving name/log/
        # cancelled/retries undefined (result() would crash on self.name).
        Task.__init__(self, "kv_store_integrity_task")
        Thread.__init__(self)
        self.state = "initializing"

        self.client = KVStoreAwareSmartClient(rest, bucket, kv_store)
        self.kv_helper = KVStoreSmartClientHelper()

    def step(self, task_manager):
        if self.state == "initializing":
            self.state = "running"
            self.start()
            task_manager.schedule(self)
        elif self.state == "running":
            if self.res is not None:
                self.state = "complete"
            else:
                task_manager.schedule(self, 5)
        else:
            raise Exception("Bad State in KVStoreIntegrityTask")

    def run(self):
        validation_failures = KVStoreSmartClientHelper.do_verification(self.client)
        self.set_result({"status": "success", "value": validation_failures})