import time
import logger
from threading import Thread
from exception import TimeoutException
from membase.api.rest_client import RestConnection
from membase.api.exception import BucketCreationException
from membase.helper.bucket_helper import BucketOperationHelper
from memcached.helper.data_helper import KVStoreAwareSmartClient, KVStoreSmartClientHelper
from memcached.helper.data_helper import MemcachedClientHelper
import copy
import json
import uuid

#TODO: Setup stacktracer
#TODO: Needs "easy_install pygments"
#import stacktracer
#stacktracer.trace_start("trace.html",interval=30,auto=True) # Set auto flag to always update file!


class Task():
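    """Base class for asynchronous cluster tasks. Tracks cancellation and retry
    counts, and exposes result(), which blocks until set_result() has stored a
    {"status": ..., "value": ...} dict, raising on error or timeout."""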
    def __init__(self, name):
        self.log = logger.Logger.get_logger()
        self.name = name
        self.cancelled = False
        self.retries = 0
        self.res = None

    def cancel(self):
        self.cancelled = True

    def is_timed_out(self):
        if self.res is not None and self.res['status'] == 'timed_out':
            return True
        return False

    def set_result(self, result):
        self.res = result

    def result(self, retries=0):
        while self.res is None:
            if retries == 0 or self.retries < retries:
                time.sleep(1)
            else:
                self.res = {"status": "timed_out", "value": None}
        if self.res['status'] == 'error':
            raise Exception(self.res['value'])
        if self.res['status'] == 'timed_out':
            raise TimeoutException("task {0} timed out, tried {1} times".format(self.name,
                self.retries))
        return self.res['value']


class NodeInitializeTask(Task):
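    """Initializes a single server via REST: sets cluster credentials, then
    assigns a memory quota of two thirds of the node's reserved memcached memory."""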
    def __init__(self, server):
        Task.__init__(self, "node_init_task")
        self.state = "initializing"
        self.server = server

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "node init"
            task_manager.schedule(self)
        elif self.state == "node init":
            self.init_node()
            self.state = "cluster init"
            task_manager.schedule(self)
        elif self.state == "cluster init":
            self.init_node_memory()
        else:
            raise Exception("Bad State in NodeInitializeTask")

    def init_node(self):
        rest = RestConnection(self.server)
        rest.init_cluster(self.server.rest_username, self.server.rest_password)

    def init_node_memory(self):
        rest = RestConnection(self.server)
        info = rest.get_nodes_self()
        # allocate two thirds of the memory reserved for memcached on this node
        quota = int(info.mcdMemoryReserved * 2 / 3)
        rest.init_cluster_memoryQuota(self.server.rest_username, self.server.rest_password, quota)
        self.state = "finished"
        self.set_result({"status": "success", "value": quota})


class BucketCreateTask(Task):
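    """Creates a bucket over REST and waits until its memcached/vbucket map is ready."""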
    def __init__(self, server, bucket='default', replicas=1, port=11210, size=0, password=None):
        Task.__init__(self, "bucket_create_task")
        self.server = server
        self.bucket = bucket
        self.replicas = replicas
        self.port = port
        self.size = size
        self.password = password
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "creating"
            task_manager.schedule(self)
        elif self.state == "creating":
            self.create_bucket(task_manager)
        elif self.state == "checking":
            self.check_bucket_ready(task_manager)
        else:
            raise Exception("Bad State in BucketCreateTask")

    def create_bucket(self, task_manager):
        rest = RestConnection(self.server)
        if self.size <= 0:
            # default the bucket quota to two thirds of the node's memory quota
            info = rest.get_nodes_self()
            self.size = info.memoryQuota * 2 / 3

        authType = 'none' if self.password is None else 'sasl'

        try:
            rest.create_bucket(bucket=self.bucket,
                ramQuotaMB=self.size,
                replicaNumber=self.replicas,
                proxyPort=self.port,
                authType=authType,
                saslPassword=self.password)
            self.state = "checking"
            task_manager.schedule(self)
        except BucketCreationException:
            self.state = "finished"
            self.set_result({"status": "error",
                             "value": "Failed to create bucket {0}".format(self.bucket)})

    def check_bucket_ready(self, task_manager):
        try:
            if BucketOperationHelper.wait_for_memcached(self.server, self.bucket):
                self.set_result({"status": "success", "value": None})
                self.state = "finished"
                return
            else:
                self.log.info("vbucket map not ready after try {0}".format(self.retries))
        except Exception:
            self.log.info("vbucket map not ready after try {0}".format(self.retries))
        self.retries = self.retries + 1
        task_manager.schedule(self)


class BucketDeleteTask(Task):
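    """Deletes a bucket over REST and waits until the deletion has completed."""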
    def __init__(self, server, bucket="default"):
        Task.__init__(self, "bucket_delete_task")
        self.server = server
        self.bucket = bucket
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "creating"
            task_manager.schedule(self)
        elif self.state == "creating":
            self.delete_bucket(task_manager)
        elif self.state == "checking":
            self.check_bucket_deleted()
        else:
            raise Exception("Bad State in BucketDeleteTask")

    def delete_bucket(self, task_manager):
        rest = RestConnection(self.server)
        if self.bucket in [bucket.name for bucket in rest.get_buckets()]:
            if rest.delete_bucket(self.bucket):
                self.state = "checking"
                task_manager.schedule(self)
            else:
                self.state = "finished"
                self.set_result({"status": "error",
                                 "value": "Failed to delete bucket {0}".format(self.bucket)})
        else:
            # bucket already deleted
            self.state = "finished"
            self.set_result({"status": "success", "value": None})

    def check_bucket_deleted(self):
        rest = RestConnection(self.server)
        if BucketOperationHelper.wait_for_bucket_deletion(self.bucket, rest, 200):
            self.set_result({"status": "success", "value": None})
        else:
            self.set_result({"status": "error",
                             "value": "{0} bucket took too long to delete".format(self.bucket)})
        self.state = "finished"


class RebalanceTask(Task):
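    """Adds and/or removes nodes, starts a rebalance, and polls its progress.
    When do_stop is set, the rebalance is stopped once the given progress
    percentage is reached and then restarted. tcmalloc memory stats are logged
    before and after the rebalance."""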
    def __init__(self, servers, to_add=[], to_remove=[], do_stop=False, progress=30):
        Task.__init__(self, "rebalance_task")
        self.servers = servers
        self.to_add = to_add
        self.to_remove = to_remove
        self.do_stop = do_stop
        self.progress = progress
        self.state = "initializing"
        self.log.info("tcmalloc fragmentation stats before rebalance")
        self.getStats(self.servers[0])

    def getStats(self, server):
        rest = RestConnection(server)
        nodes = rest.node_statuses()
        buckets = rest.get_buckets()

        for node in nodes:
            for bucket in buckets:
                bucket = bucket.name
                _node = {"ip": node.ip, "port": node.port, "username": server.rest_username,
                         "password": server.rest_password}
                try:
                    mc = MemcachedClientHelper.direct_client(_node, bucket)
                    self.log.info("Bucket: {0} ip: {1} stats: {2}".format(bucket, node.ip, mc.stats("memory")))
                except Exception:
                    self.log.info("Server {0} not yet part of the cluster".format(node.ip))

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "add_nodes"
            task_manager.schedule(self)
        elif self.state == "add_nodes":
            self.add_nodes(task_manager)
        elif self.state == "start_rebalance":
            self.start_rebalance(task_manager)
        elif self.state == "stop_rebalance":
            self.stop_rebalance(task_manager)
        elif self.state == "rebalancing":
            self.rebalancing(task_manager)
        else:
            raise Exception("Bad State in RebalanceTask: {0}".format(self.state))

    def add_nodes(self, task_manager):
        master = self.servers[0]
        rest = RestConnection(master)
        try:
            for node in self.to_add:
                self.log.info("adding node {0}:{1} to cluster".format(node.ip, node.port))
                rest.add_node(master.rest_username, master.rest_password,
                    node.ip, node.port)
            self.state = "start_rebalance"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})

    def start_rebalance(self, task_manager):
        rest = RestConnection(self.servers[0])
        nodes = rest.node_statuses()
        ejectedNodes = []
        for node in self.to_remove:
            ejectedNodes.append(node.id)
        try:
            rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
            self.state = "rebalancing"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})

    def stop_rebalance(self, task_manager):
        rest = RestConnection(self.servers[0])
        try:
            rest.stop_rebalance()
            # We don't want to start rebalance immediately
            self.log.info("Rebalance stopped, sleeping for 20 secs")
            time.sleep(20)
            self.do_stop = False
            self.state = "start_rebalance"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})

    def rebalancing(self, task_manager):
        rest = RestConnection(self.servers[0])
        try:
            progress = rest._rebalance_progress()
            if progress != -1 and progress != 100:
                if self.do_stop and progress >= self.progress:
                    self.state = "stop_rebalance"
                    task_manager.schedule(self, 1)
                else:
                    task_manager.schedule(self, 1)
            else:
                self.state = "finished"
                self.set_result({"status": "success", "value": None})
                self.log.info("tcmalloc fragmentation stats after rebalance")
                self.getStats(self.servers[0])
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})


class FailOverTask(Task):
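    """Fails over the nodes listed in to_remove via REST."""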
    def __init__(self, servers, to_remove=[]):
        Task.__init__(self, "failover_task")
        self.servers = servers
        self.to_remove = to_remove
        self.state = "initializing"

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled", "value": None})
        elif self.is_timed_out():
            return
        elif self.state == "initializing":
            self.state = "start_failover"
            task_manager.schedule(self)
        elif self.state == "start_failover":
            self.start_failover(task_manager)
        elif self.state == "failing over":
            self.failingOver(task_manager)
        else:
            raise Exception("Bad State in FailOverTask: {0}".format(self.state))

    def start_failover(self, task_manager):
        rest = RestConnection(self.servers[0])
        ejectedNodes = []
        for node in self.to_remove:
            ejectedNodes.append(node.id)
        try:
            rest.fail_over(ejectedNodes[0])
            self.state = "failing over"
            task_manager.schedule(self)
        except Exception as e:
            self.state = "finished"
            self.set_result({"status": "error", "value": e})

    def failingOver(self, task_manager):
        rest = RestConnection(self.servers[0])
        ejectedNodes = []

        for node in self.to_remove:
            ejectedNodes.append(node.id)
            self.log.info("ejected node is : %s" % node.id)

        progress = rest.fail_over(ejectedNodes[0])
        if not progress:
            self.state = "finished"
            self.log.info("Error! Missing Parameter ... No node to fail over")
            self.set_result({"status": "error", "value": None})
        else:
            self.state = "finished"
            self.log.info("Success! Failed over node {0}".format(ejectedNodes))
            self.set_result({"status": "success", "value": None})


#OperationGeneratorTask
#OperationGenerating
class GenericLoadingTask(Thread, Task):
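    """Base class for threaded document-load tasks. Subclasses supply run();
    step() starts the thread and polls it for completion, and do_task_op()
    performs a single set/get/delete with vbucket-map retries."""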
    def __init__(self, rest, bucket, kv_store=None, store_enabled=True, info=None):
        Thread.__init__(self)
        Task.__init__(self, "gen_task")
        self.rest = rest
        self.bucket = bucket
        self.client = KVStoreAwareSmartClient(rest, bucket, kv_store, info, store_enabled)
        self.state = "initializing"
        self.doc_op_count = 0

    def cancel(self):
        self._Thread__stop()
        self.join()
        self.log.info("cancelling task: {0}".format(self.name))
        self.cancelled = True

    def step(self, task_manager):
        if self.cancelled:
            self.set_result({"status": "cancelled",
                             "value": self.doc_op_count})
        elif self.state == "initializing":
            self.state = "running"
            self.start()
            task_manager.schedule(self, 2)
        elif self.state == "running":
            self.log.info("{0}: {1} ops completed".format(self.name, self.doc_op_count))
            if self.is_alive():
                task_manager.schedule(self, 5)
            else:
                self.join()
                self.state = "finished"
                self.set_result({"status": "success", "value": self.doc_op_count})
        elif self.state != "finished":
            raise Exception("Bad State in GenericLoadingTask")

    def do_task_op(self, op, key, value=None, expiration=None):
        retry_count = 0
        ok = False
        if value is not None:
            value = value.replace(" ", "")

        while retry_count < 5 and not ok:
            try:
                if op == "set":
                    if expiration is None:
                        self.client.set(key, value)
                    else:
                        self.client.set(key, value, expiration)
                if op == "get":
                    value = self.client.mc_get(key)
                if op == "delete":
                    self.client.delete(key)
                ok = True
            except Exception:
                # retry after refreshing the vbucket map for this key
                retry_count += 1
                self.client.reset_vbuckets(self.rest,
                                           set([self.client._get_vBucket_id(key)]))
        return value


class LoadDocGeneratorTask(GenericLoadingTask):
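    """Loads documents produced by the given generators, optionally with an
    expiration, looping forever when loop=True."""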
    def __init__(self, rest, doc_generators, bucket="default", kv_store=None,
                 store_enabled=True, expiration=None, loop=False):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)

        self.doc_generators = doc_generators
        self.expiration = expiration
        self.loop = loop
        self.name = "doc-load-task{0}".format(str(uuid.uuid4())[:7])

    def run(self):
        while True:
            dg = copy.deepcopy(self.doc_generators)
            for doc_generator in dg:
                for value in doc_generator:
                    _value = value.encode("ascii", "ignore")
                    _json = json.loads(_value, encoding="utf-8")
                    _id = _json["meta"]["id"].encode("ascii", "ignore")
                    try:
                        self.do_task_op("set", _id, json.dumps(_json["json"]), self.expiration)
                        self.doc_op_count += 1
                    except Exception as e:
                        self.state = "finished"
                        self.set_result({"status": "error", "value": e})
                        return
                    except:
                        return
            if not self.loop:
                self.set_result({"status": "success", "value": self.doc_op_count})
                return


class DocumentAccessTask(GenericLoadingTask):
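    """Reads the given document ids, looping forever when loop=True."""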
    def __init__(self, rest, doc_ids, bucket="default",
                 info=None, loop=False):
        GenericLoadingTask.__init__(self, rest, bucket, info=info)
        self.doc_ids = doc_ids
        self.name = "doc-get-task{0}".format(str(uuid.uuid4())[:7])
        self.loop = loop

    def run(self):
        while True:
            for _id in self.doc_ids:
                try:
                    self.do_task_op("get", _id)
                    self.doc_op_count += 1
                except Exception as e:
                    self.set_result({"status": "error",
                                     "value": "get failed {0}".format(e)})
            if not self.loop:
                self.set_result({"status": "success", "value": self.doc_op_count})
                return


class DocumentExpireTask(GenericLoadingTask):
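    """Re-sets the given documents with an expiration, then waits for the
    expiration window before reporting success."""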
    def __init__(self, rest, doc_ids, bucket="default", info=None,
                 kv_store=None, store_enabled=True, expiration=5):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)
        self.doc_ids = doc_ids
        self.name = "doc-expire-task{0}".format(str(uuid.uuid4())[:7])
        self.expiration = expiration

    def run(self):
        for _id in self.doc_ids:
            try:
                item = self.do_task_op("get", _id)
                if item:
                    val = item['value']
                    self.do_task_op("set", _id, val, self.expiration)
                    self.doc_op_count += 1
                else:
                    self.set_result({"status": "error",
                                     "value": "failed get key to expire {0}".format(_id)})
            except Exception as e:
                self.set_result({"status": "error",
                                 "value": "expiration failed {0}".format(e)})

        # wait till docs are 'expected' to be expired before returning success
        time.sleep(self.expiration)
        self.set_result({"status": "success", "value": self.doc_op_count})


class DocumentDeleteTask(GenericLoadingTask):
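    """Deletes the given document ids."""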
    def __init__(self, rest, doc_ids, bucket="default", info=None,
                 kv_store=None, store_enabled=True):
        GenericLoadingTask.__init__(self, rest, bucket, kv_store, store_enabled)
        self.doc_ids = doc_ids
        self.name = "doc-delete-task{0}".format(str(uuid.uuid4())[:7])

    def run(self):
        for _id in self.doc_ids:
            try:
                self.do_task_op("delete", _id)
                self.doc_op_count += 1
            except Exception as e:
                self.set_result({"status": "error",
                                 "value": "deletes failed {0}".format(e)})
        self.set_result({"status": "success", "value": self.doc_op_count})


class KVStoreIntegrityTask(Task, Thread):
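    """Runs KVStoreSmartClientHelper.do_verification against the bucket and
    reports any validation failures as the task result."""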
    def __init__(self, rest, kv_store, bucket="default"):
        Thread.__init__(self)
        Task.__init__(self, "kv_store_integrity_task")
        self.state = "initializing"

        self.client = KVStoreAwareSmartClient(rest, bucket, kv_store)
        self.kv_helper = KVStoreSmartClientHelper()

    def step(self, task_manager):
        if self.state == "initializing":
            self.state = "running"
            self.start()
            task_manager.schedule(self)
        elif self.state == "running":
            if self.res is not None:
                self.state = "complete"
            else:
                task_manager.schedule(self, 5)
        else:
            raise Exception("Bad State in KVStoreIntegrityTask")

    def run(self):
        validation_failures = KVStoreSmartClientHelper.do_verification(self.client)
        self.set_result({"status": "success", "value": validation_failures})