1import time
2import logger
3import random
4import socket
5import string
6import copy
7import json
8import re
9import math
10import crc32
11import traceback
12from httplib import IncompleteRead
13from threading import Thread
14from memcacheConstants import ERR_NOT_FOUND
15from membase.api.rest_client import RestConnection, Bucket, RestHelper
16from membase.api.exception import BucketCreationException
17from membase.helper.bucket_helper import BucketOperationHelper
18from memcached.helper.data_helper import KVStoreAwareSmartClient, VBucketAwareMemcached, MemcachedClientHelper
19from couchbase_helper.document import DesignDocument, View
20from mc_bin_client import MemcachedError
21from tasks.future import Future
22from couchbase_helper.stats_tools import StatsCommon
23from membase.api.exception import DesignDocCreationException, QueryViewException, ReadDocumentException, RebalanceFailedException, \
24                                    GetBucketInfoFailed, CompactViewFailed, SetViewInfoNotFound, FailoverFailedException, \
25                                    ServerUnavailableException, BucketFlushFailed, CBRecoveryFailedException, BucketCompactionException
26from remote.remote_util import RemoteMachineShellConnection
27from couchbase_helper.documentgenerator import BatchedDocumentGenerator
28
29# TODO: Setup stacktracer
30# TODO: Needs "easy_install pygments"
31# import stacktracer
32# stacktracer.trace_start("trace.html",interval=30,auto=True) # Set auto flag to always update file!
33
34
# Task life-cycle states driven by Task.step():
# PENDING -> EXECUTING -> CHECKING -> FINISHED
PENDING = 'PENDING'
EXECUTING = 'EXECUTING'
CHECKING = 'CHECKING'
FINISHED = 'FINISHED'
39
class Task(Future):
    """Base unit of asynchronous work driven by a task manager.

    Subclasses override execute() and check(); step() advances the task
    through the PENDING -> EXECUTING -> CHECKING -> FINISHED state machine,
    rescheduling itself via the task manager as needed.
    """
    def __init__(self, name):
        Future.__init__(self)
        self.log = logger.Logger.get_logger()
        self.state = PENDING
        self.name = name
        self.cancelled = False
        self.retries = 0
        self.res = None

    def step(self, task_manager):
        """Advance the state machine by one transition (no-op once done)."""
        if self.done():
            return
        current = self.state
        if current == PENDING:
            self.state = EXECUTING
            task_manager.schedule(self)
        elif current == EXECUTING:
            self.execute(task_manager)
        elif current == CHECKING:
            self.check(task_manager)
        elif current != FINISHED:
            raise Exception("Bad State in {0}: {1}".format(self.name, current))

    def execute(self, task_manager):
        """Perform the task's main action; subclasses must override."""
        raise NotImplementedError

    def check(self, task_manager):
        """Verify/poll the task's outcome; subclasses must override."""
        raise NotImplementedError
67
class NodeInitializeTask(Task):
    """Initializes a single cluster node.

    Applies the optional index/rebalance settings that were requested,
    initializes cluster credentials/port, then sets the node memory quota.
    On success the task result is the applied quota in MB.
    """
    def __init__(self, server, disabled_consistent_view=None,
                 rebalanceIndexWaitingDisabled=None,
                 rebalanceIndexPausingDisabled=None,
                 maxParallelIndexers=None,
                 maxParallelReplicaIndexers=None,
                 port=None, quota_percent=None):
        Task.__init__(self, "node_init_task")
        self.server = server
        self.port = port or server.port
        self.quota = 0
        self.quota_percent = quota_percent
        self.disable_consistent_view = disabled_consistent_view
        self.rebalanceIndexWaitingDisabled = rebalanceIndexWaitingDisabled
        self.rebalanceIndexPausingDisabled = rebalanceIndexPausingDisabled
        self.maxParallelIndexers = maxParallelIndexers
        self.maxParallelReplicaIndexers = maxParallelReplicaIndexers

    def _connect(self):
        """Return a RestConnection to the node, or None after finishing the
        task with the connection error.  Dedupes the two identical
        try/except blocks the original execute() carried."""
        try:
            return RestConnection(self.server)
        except ServerUnavailableException as error:
            self.state = FINISHED
            self.set_exception(error)
            return None

    def execute(self, task_manager):
        rest = self._connect()
        if rest is None:
            return
        username = self.server.rest_username
        password = self.server.rest_password

        # only push the settings that were explicitly requested
        if self.disable_consistent_view is not None:
            rest.set_reb_cons_view(self.disable_consistent_view)
        if self.rebalanceIndexWaitingDisabled is not None:
            rest.set_reb_index_waiting(self.rebalanceIndexWaitingDisabled)
        if self.rebalanceIndexPausingDisabled is not None:
            rest.set_rebalance_index_pausing(self.rebalanceIndexPausingDisabled)
        if self.maxParallelIndexers is not None:
            rest.set_max_parallel_indexers(self.maxParallelIndexers)
        if self.maxParallelReplicaIndexers is not None:
            rest.set_max_parallel_replica_indexers(self.maxParallelReplicaIndexers)

        rest.init_cluster(username, password, self.port)
        self.server.port = self.port
        # reconnect: init_cluster may have switched the node's REST port
        rest = self._connect()
        if rest is None:
            return
        info = rest.get_nodes_self()
        if info is None:
            self.state = FINISHED
            self.set_exception(Exception('unable to get information on a server %s, it is available?' % (self.server.ip)))
            return
        # default quota is 2/3 of reserved memory; a truthy quota_percent
        # overrides it (0/None falls back to the default -- original behavior)
        self.quota = int(info.mcdMemoryReserved * 2 / 3)
        if self.quota_percent:
            self.quota = int(info.mcdMemoryReserved * self.quota_percent / 100)
        rest.init_cluster_memoryQuota(username, password, self.quota)
        self.state = CHECKING
        task_manager.schedule(self)

    def check(self, task_manager):
        # nothing to poll; report the applied quota as the result
        self.state = FINISHED
        self.set_result(self.quota)
131
132
class BucketCreateTask(Task):
    """Creates a bucket on the given server and waits until it is usable.

    Result is True once the bucket's memcached/vbucket map is ready,
    False if it never becomes ready within the retry budget (5 tries).
    """
    def __init__(self, server, bucket='default', replicas=1, size=0, port=11211, password=None, bucket_type='membase',
                 enable_replica_index=1, eviction_policy='valueOnly', bucket_priority=None):
        Task.__init__(self, "bucket_create_task")
        self.server = server
        self.bucket = bucket
        self.replicas = replicas
        self.port = port
        self.size = size
        self.password = password
        self.bucket_type = bucket_type
        self.enable_replica_index = enable_replica_index
        self.eviction_policy = eviction_policy
        # any non-None priority maps to 8 memcached worker threads
        # ("high" priority) -- preserves the original behavior
        self.bucket_priority = None
        if bucket_priority is not None:
            self.bucket_priority = 8

    def execute(self, task_manager):
        try:
            rest = RestConnection(self.server)
        except ServerUnavailableException as error:
            self.state = FINISHED
            self.set_exception(error)
            return
        if self.size <= 0:
            # default RAM quota: two thirds of the node's memory quota
            info = rest.get_nodes_self()
            self.size = info.memoryQuota * 2 / 3

        authType = 'none' if self.password is None else 'sasl'

        version = rest.get_nodes_self().version
        try:
            # build the argument set once instead of duplicating the whole
            # create_bucket() call in both branches (original duplication)
            params = {'bucket': self.bucket,
                      'ramQuotaMB': self.size,
                      'replicaNumber': self.replicas,
                      'proxyPort': self.port,
                      'authType': authType,
                      'saslPassword': self.password,
                      'bucketType': self.bucket_type,
                      'replica_index': self.enable_replica_index,
                      'evictionPolicy': self.eviction_policy}
            # bucket priority (threadsNumber) is only supported from 3.0 on
            if float(version[:2]) >= 3.0 and self.bucket_priority is not None:
                params['threadsNumber'] = self.bucket_priority
            rest.create_bucket(**params)
            self.state = CHECKING
            task_manager.schedule(self)
        except BucketCreationException as e:
            self.state = FINISHED
            self.set_exception(e)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def check(self, task_manager):
        try:
            if self.bucket_type == 'memcached':
                # memcached buckets have no vbucket map to wait for
                self.set_result(True)
                self.state = FINISHED
                return
            if BucketOperationHelper.wait_for_memcached(self.server, self.bucket):
                self.log.info("bucket '{0}' was created with per node RAM quota: {1}".format(self.bucket, self.size))
                self.set_result(True)
                self.state = FINISHED
                return
            else:
                self.log.warn("vbucket map not ready after try {0}".format(self.retries))
                if self.retries >= 5:
                    self.set_result(False)
                    self.state = FINISHED
                    return
        except Exception as e:
            self.log.error("Unexpected error: %s" % str(e))
            self.log.warn("vbucket map not ready after try {0}".format(self.retries))
            if self.retries >= 5:
                self.state = FINISHED
                self.set_exception(e)
        # not ready yet: bump the retry counter and poll again
        self.retries = self.retries + 1
        task_manager.schedule(self)
222
223
class BucketDeleteTask(Task):
    """Deletes a bucket and waits for the deletion to be reflected on the
    server.  Result is True on confirmed deletion, False otherwise."""
    def __init__(self, server, bucket="default"):
        Task.__init__(self, "bucket_delete_task")
        self.server = server
        self.bucket = bucket

    def execute(self, task_manager):
        try:
            rest = RestConnection(self.server)
            if not rest.delete_bucket(self.bucket):
                # deletion request refused; dump timings stats for debugging
                self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
                self.state = FINISHED
                self.set_result(False)
                return
            self.state = CHECKING
            task_manager.schedule(self)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
            self.set_exception(e)

    def check(self, task_manager):
        try:
            rest = RestConnection(self.server)
            gone = BucketOperationHelper.wait_for_bucket_deletion(self.bucket, rest, 200)
            self.set_result(bool(gone))
            self.state = FINISHED
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
            self.set_exception(e)
263
class RebalanceTask(Task):
    """Adds `to_add` nodes, ejects `to_remove` nodes and runs a rebalance,
    polling progress until completion.

    For a swap rebalance (same number of nodes added and removed) the
    vbucket layout of the untouched nodes is captured first and verified
    not to have been shuffled while polling.  `do_stop` and `progress`
    are kept for interface compatibility but are not used here.
    """
    def __init__(self, servers, to_add=None, to_remove=None, do_stop=False, progress=30,
                 use_hostnames=False, services=None):
        Task.__init__(self, "rebalance_task")
        self.servers = servers
        # None-sentinels instead of shared mutable default arguments
        self.to_add = to_add if to_add is not None else []
        self.to_remove = to_remove if to_remove is not None else []
        self.start_time = None
        self.services = services

        try:
            self.rest = RestConnection(self.servers[0])
        # `except X as e` instead of the py2-only comma form, consistent
        # with the rest of this module
        except ServerUnavailableException as e:
            self.log.error(e)
            self.state = FINISHED
            self.set_exception(e)
        self.retry_get_progress = 0
        self.use_hostnames = use_hostnames
        self.previous_progress = 0
        self.old_vbuckets = {}

    def execute(self, task_manager):
        try:
            # swap rebalance: remember the vbucket layout of untouched nodes
            if len(self.to_add) and len(self.to_add) == len(self.to_remove):
                self.log.info("This is swap rebalance and we will monitor vbuckets shuffling")
                non_swap_servers = set(self.servers) - set(self.to_remove) - set(self.to_add)
                self.old_vbuckets = RestHelper(self.rest)._get_vbuckets(non_swap_servers, None)
            self.add_nodes(task_manager)
            self.start_rebalance(task_manager)
            self.state = CHECKING
            task_manager.schedule(self)
        except Exception as e:
            self.state = FINISHED
            self.set_exception(e)

    def add_nodes(self, task_manager):
        """Add every node in to_add to the cluster, assigning one service
        per node from self.services when provided."""
        master = self.servers[0]
        services_for_node = None
        node_index = 0
        for node in self.to_add:
            self.log.info("adding node {0}:{1} to cluster".format(node.ip, node.port))
            if self.services is not None:
                services_for_node = [self.services[node_index]]
                node_index += 1
            if self.use_hostnames:
                self.rest.add_node(master.rest_username, master.rest_password,
                                   node.hostname, node.port, services = services_for_node)
            else:
                self.rest.add_node(master.rest_username, master.rest_password,
                                   node.ip, node.port, services = services_for_node)

    def start_rebalance(self, task_manager):
        """Compute the otp node ids to eject and kick off the rebalance."""
        nodes = self.rest.node_statuses()

        # Determine whether its a cluster_run/not: in a cluster_run all
        # nodes share one ip and differ only by port
        cluster_run = True

        firstIp = self.servers[0].ip
        if len(self.servers) == 1 and self.servers[0].port == '8091':
            cluster_run = False
        else:
            for node in self.servers:
                if node.ip != firstIp:
                    cluster_run = False
                    break
        ejectedNodes = []

        for server in self.to_remove:
            for node in nodes:
                if cluster_run:
                    # ports are unique in a cluster_run
                    if int(server.port) == int(node.port):
                        ejectedNodes.append(node.id)
                else:
                    if self.use_hostnames:
                        if server.hostname == node.ip and int(server.port) == int(node.port):
                            ejectedNodes.append(node.id)
                    elif server.ip == node.ip and int(server.port) == int(node.port):
                        ejectedNodes.append(node.id)
        if self.rest.is_cluster_mixed():
            # workaround MB-8094
            self.log.warn("cluster is mixed. sleep for 10 seconds before rebalance")
            time.sleep(10)

        self.rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
        self.start_time = time.time()

    def check(self, task_manager):
        progress = -100
        try:
            if self.old_vbuckets:
                # swap rebalance: vbucket counts on untouched nodes may
                # only drift by +/- 1, anything else means shuffling
                non_swap_servers = set(self.servers) - set(self.to_remove) - set(self.to_add)
                new_vbuckets = RestHelper(self.rest)._get_vbuckets(non_swap_servers, None)
                for vb_type in ["active_vb", "replica_vb"]:
                    for srv in non_swap_servers:
                        if not(len(self.old_vbuckets[srv][vb_type]) + 1 >= len(new_vbuckets[srv][vb_type]) and\
                           len(self.old_vbuckets[srv][vb_type]) - 1 <= len(new_vbuckets[srv][vb_type])):
                            msg = "Vbuckets were suffled! Expected %s for %s" % (vb_type, srv.ip) + \
                                " are %s. And now are %s" % (
                                len(self.old_vbuckets[srv][vb_type]),
                                len(new_vbuckets[srv][vb_type]))
                            self.log.error(msg)
                            self.log.error("Old vbuckets: %s, new vbuckets %s" % (self.old_vbuckets, new_vbuckets))
                            raise Exception(msg)
            progress = self.rest._rebalance_progress()
            # -100 marks "server unavailable" from _rebalance_progress
            if progress == -100:
                self.retry_get_progress += 1
            if self.previous_progress != progress:
                self.previous_progress = progress
            else:
                # no movement since last poll counts toward the hang budget
                self.retry_get_progress += 1
        except RebalanceFailedException as ex:
            self.state = FINISHED
            self.set_exception(ex)
            self.retry_get_progress += 1
            return  # bug fix: don't fall through and reschedule a finished task
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught in {0} sec".
                          format(time.time() - self.start_time))
            self.set_exception(e)
            return  # bug fix: don't fall through and reschedule a finished task
        if progress != -1 and progress != 100:
            if self.retry_get_progress < 20:
                task_manager.schedule(self, 10)
            else:
                # 20 polls without progress: treat as a hang
                self.state = FINISHED
                self.rest.print_UI_logs()
                self.set_exception(RebalanceFailedException("seems like rebalance hangs. please check logs!"))
        else:
            # rebalance finished; wait up to 30s for each removed node to
            # actually leave its cluster ("pools" list becomes empty)
            success_cleaned = []
            for removed in self.to_remove:
                try:
                    rest = RestConnection(removed)
                except ServerUnavailableException as e:
                    self.log.error(e)
                    continue
                start = time.time()
                while time.time() - start < 30:
                    try:
                        if 'pools' in rest.get_pools_info() and (len(rest.get_pools_info()["pools"]) == 0):
                            success_cleaned.append(removed)
                            break
                        else:
                            time.sleep(0.1)
                    except (ServerUnavailableException, IncompleteRead) as e:
                        self.log.error(e)
            result = True
            for node in set(self.to_remove) - set(success_cleaned):
                self.log.error("node {0}:{1} was not cleaned after removing from cluster".format(
                           node.ip, node.port))
                result = False

            self.log.info("rebalancing was completed with progress: {0}% in {1} sec".
                          format(progress, time.time() - self.start_time))
            self.state = FINISHED
            self.set_result(result)
421
class StatsWaitTask(Task):
    """Polls a memcached stat across a set of servers until the aggregated
    value satisfies `comparison` against `value` (reschedules every 5s
    otherwise).  Result is True once the condition holds."""
    EQUAL = '=='
    NOT_EQUAL = '!='
    LESS_THAN = '<'
    LESS_THAN_EQ = '<='
    GREATER_THAN = '>'
    GREATER_THAN_EQ = '>='

    def __init__(self, servers, bucket, param, stat, comparison, value):
        Task.__init__(self, "stats_wait_task")
        self.servers = servers
        self.bucket = bucket
        if isinstance(bucket, Bucket):
            self.bucket = bucket.name
        self.param = param
        self.stat = stat
        self.comparison = comparison
        self.value = value
        # per-server memcached connections, created lazily
        self.conns = {}

    def execute(self, task_manager):
        self.state = CHECKING
        task_manager.schedule(self)

    def check(self, task_manager):
        stat_result = 0
        for server in self.servers:
            try:
                client = self._get_connection(server)
                stats = client.stats(self.param)
                # idiom fix: `in` instead of deprecated dict.has_key()
                if self.stat not in stats:
                    self.state = FINISHED
                    self.set_exception(Exception("Stat {0} not found".format(self.stat)))
                    return
                # numeric stats are summed across servers; a non-numeric
                # stat replaces the accumulator (original behavior)
                if stats[self.stat].isdigit():
                    stat_result += long(stats[self.stat])
                else:
                    stat_result = stats[self.stat]
            except EOFError as ex:
                self.state = FINISHED
                self.set_exception(ex)
                return
        if not self._compare(self.comparison, str(stat_result), self.value):
            self.log.warn("Not Ready: %s %s %s %s expected on %s, %s bucket" % (self.stat, stat_result,
                      self.comparison, self.value, self._stringify_servers(), self.bucket))
            task_manager.schedule(self, 5)
            return
        self.log.info("Saw %s %s %s %s expected on %s,%s bucket" % (self.stat, stat_result,
                      self.comparison, self.value, self._stringify_servers(), self.bucket))

        for server, conn in self.conns.items():
            conn.close()
        self.state = FINISHED
        self.set_result(True)

    def _stringify_servers(self):
        # bug fix: the original used py2 backticks (repr), which wrapped
        # every ip:port in quotes in the log output
        return ''.join(["{0}:{1}".format(server.ip, server.port) for server in self.servers])

    def _get_connection(self, server):
        """Return a cached direct memcached client for `server`, retrying
        creation up to 3 times before a final attempt that propagates the
        error."""
        if server not in self.conns:
            for i in xrange(3):
                try:
                    self.conns[server] = MemcachedClientHelper.direct_client(server, self.bucket)
                    return self.conns[server]
                except (EOFError, socket.error):
                    self.log.error("failed to create direct client, retry in 1 sec")
                    time.sleep(1)
            self.conns[server] = MemcachedClientHelper.direct_client(server, self.bucket)
        return self.conns[server]

    def _compare(self, cmp_type, a, b):
        """Compare stat string `a` against expected `b` with cmp_type;
        `a` is coerced to a number when `b` is numeric."""
        if isinstance(b, (int, long)) and a.isdigit():
            a = long(a)
        elif isinstance(b, (int, long)) and not a.isdigit():
            return False
        if (cmp_type == StatsWaitTask.EQUAL and a == b) or\
            (cmp_type == StatsWaitTask.NOT_EQUAL and a != b) or\
            (cmp_type == StatsWaitTask.LESS_THAN_EQ and a <= b) or\
            (cmp_type == StatsWaitTask.GREATER_THAN_EQ and a >= b) or\
            (cmp_type == StatsWaitTask.LESS_THAN and a < b) or\
            (cmp_type == StatsWaitTask.GREATER_THAN and a > b):
            return True
        return False
505
506
class XdcrStatsWaitTask(StatsWaitTask):
    """StatsWaitTask variant that reads XDCR replication stats via the
    REST API instead of direct memcached connections."""
    def __init__(self, servers, bucket, param, stat, comparison, value):
        StatsWaitTask.__init__(self, servers, bucket, param, stat, comparison, value)

    def check(self, task_manager):
        total = 0
        for server in self.servers:
            try:
                rest = RestConnection(server)
                repl_id = rest.get_replication_for_buckets(self.bucket, self.bucket)['id']
                stat_key = 'replications/' + repl_id + '/' + self.stat
                # just get the required value, don't fetch the big big structure of stats
                samples = rest.fetch_bucket_stats(self.bucket)['op']['samples']
                total += long(samples[stat_key][-1])
            except Exception as ex:
                self.state = FINISHED
                self.set_exception(ex)
                return
        if self._compare(self.comparison, str(total), self.value):
            self.log.info("Saw %s %s %s %s expected on %s,%s bucket" % (self.stat, total,
                      self.comparison, self.value, self._stringify_servers(), self.bucket))
            for server, conn in self.conns.items():
                conn.close()
            self.state = FINISHED
            self.set_result(True)
        else:
            self.log.warn("Not Ready: %s %s %s %s expected on %s, %s bucket" % (self.stat, total,
                      self.comparison, self.value, self._stringify_servers(), self.bucket))
            task_manager.schedule(self, 5)
536
class GenericLoadingTask(Thread, Task):
    """Base class for threaded document-load tasks.

    Runs as its own thread: execute() starts the thread and run() drains
    the work via has_next()/next(), which subclasses supply.

    NOTE(review): the _unlocked_* helpers read self.exp, self.flag and
    self.only_store_hash, which are set by subclasses (e.g.
    LoadDocumentsTask), not here -- confirm before instantiating directly.
    """
    def __init__(self, server, bucket, kv_store):
        Thread.__init__(self)
        Task.__init__(self, "load_gen_task")
        self.kv_store = kv_store
        self.client = VBucketAwareMemcached(RestConnection(server), bucket)

    def execute(self, task_manager):
        # kick off the worker thread; run() does the actual loading
        self.start()
        self.state = EXECUTING

    def check(self, task_manager):
        # completion is signalled from run(); nothing to poll here
        pass

    def run(self):
        while self.has_next() and not self.done():
            self.next()
        self.state = FINISHED
        self.set_result(True)

    def has_next(self):
        """Return True while there are more documents to process."""
        raise NotImplementedError

    def next(self):
        """Process a single document."""
        raise NotImplementedError

    def _unlocked_create(self, partition, key, value, is_base64_value=False):
        """Set key=value on the server and mirror it into the kv_store
        partition.  JSON values get a 'mutated' counter initialized to 0;
        non-JSON strings get one character randomized (unless base64)."""
        try:
            value_json = json.loads(value)
            value_json['mutated'] = 0
            value = json.dumps(value_json)
        except ValueError:
            # not JSON: randomize one character so generated values differ
            index = random.choice(range(len(value)))
            if not is_base64_value:
                value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
        except TypeError:
            value = json.dumps(value)

        try:
            self.client.set(key, self.exp, self.flag, value)
            if self.only_store_hash:
                # store only a hash to keep the kv_store footprint small
                value = str(crc32.crc32_hash(value))
            partition.set(key, value, self.exp, self.flag)
        except Exception as error:
            self.state = FINISHED
            self.set_exception(error)

    def _unlocked_read(self, partition, key):
        """Read key from the server; a miss is only an error when the
        kv_store still considers the key valid."""
        try:
            o, c, d = self.client.get(key)
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)

    def _unlocked_replica_read(self, partition, key):
        """Read key from a replica vbucket; any failure fails the task."""
        try:
            o, c, d = self.client.getr(key)
        except Exception as error:
            self.state = FINISHED
            self.set_exception(error)

    def _unlocked_update(self, partition, key):
        """Re-read key, bump its 'mutated' counter (or randomize one
        character for non-JSON values) and write it back to server and
        kv_store."""
        value = None
        try:
            o, c, value = self.client.get(key)
            if value is None:
                return

            value_json = json.loads(value)
            value_json['mutated'] += 1
            value = json.dumps(value_json)
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                # there is no such item, we do not know what value to set
                return
            else:
                self.state = FINISHED
                self.log.error("%s, key: %s update operation." % (error, key))
                self.set_exception(error)
                return
        except ValueError:
            # not JSON: randomize one character instead of bumping 'mutated'
            if value is None:
                return
            index = random.choice(range(len(value)))
            value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
        except BaseException as error:
            self.state = FINISHED
            self.set_exception(error)

        try:
            self.client.set(key, self.exp, self.flag, value)
            if self.only_store_hash:
                value = str(crc32.crc32_hash(value))
            partition.set(key, value, self.exp, self.flag)
        except BaseException as error:
            self.state = FINISHED
            self.set_exception(error)

    def _unlocked_delete(self, partition, key):
        """Delete key on server and kv_store; a not-found is ignored when
        the kv_store agrees the key is gone."""
        try:
            self.client.delete(key)
            partition.delete(key)
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                pass
            else:
                self.state = FINISHED
                self.log.error("%s, key: %s delete operation." % (error, key))
                self.set_exception(error)
        except BaseException as error:
            self.state = FINISHED
            self.set_exception(error)

    def _unlocked_append(self, partition, key, value):
        """Append value to the existing document (dict-merge for JSON,
        string concatenation otherwise) and mirror the merged result into
        the kv_store."""
        try:
            o, c, old_value = self.client.get(key)
            if value is None:
                return
            value_json = json.loads(value)
            old_value_json = json.loads(old_value)
            old_value_json.update(value_json)
            old_value = json.dumps(old_value_json)
            value = json.dumps(value_json)
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                # there is no such item, we do not know what value to set
                return
            else:
                self.state = FINISHED
                self.set_exception(error)
                return
        except ValueError:
            # non-JSON value: randomize one character and concatenate
            o, c, old_value = self.client.get(key)
            index = random.choice(range(len(value)))
            value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
            old_value += value
        except BaseException as error:
            self.state = FINISHED
            self.set_exception(error)

        try:
            self.client.append(key, value)
            if self.only_store_hash:
                old_value = str(crc32.crc32_hash(old_value))
            partition.set(key, old_value)
        except BaseException as error:
            self.state = FINISHED
            self.set_exception(error)
688
689
class LoadDocumentsTask(GenericLoadingTask):
    """Loads the documents produced by a single generator, applying one
    mutation type (create/read/read_replica/update/delete/append)."""
    def __init__(self, server, bucket, generator, kv_store, op_type, exp, flag=0,
                 only_store_hash=True, proxy_client=None):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.generator = generator
        self.op_type = op_type
        self.exp = exp
        self.flag = flag
        self.only_store_hash = only_store_hash
        # optionally route all traffic through a proxy client instead of
        # the vbucket-aware client created by the base class
        if proxy_client:
            self.log.info("Changing client to proxy %s:%s..." % (proxy_client.host,
                                                              proxy_client.port))
            self.client = proxy_client

    def has_next(self):
        """True while the generator still has documents to emit."""
        return self.generator.has_next()

    def next(self):
        """Apply one operation to the next generated key/value pair."""
        key, value = self.generator.next()
        partition = self.kv_store.acquire_partition(key)
        op = self.op_type
        if op == 'create':
            base64_doc = (self.generator.__class__.__name__ == 'Base64Generator')
            self._unlocked_create(partition, key, value, is_base64_value=base64_doc)
        elif op == 'read':
            self._unlocked_read(partition, key)
        elif op == 'read_replica':
            self._unlocked_replica_read(partition, key)
        elif op == 'update':
            self._unlocked_update(partition, key)
        elif op == 'delete':
            self._unlocked_delete(partition, key)
        elif op == 'append':
            self._unlocked_append(partition, key, value)
        else:
            self.state = FINISHED
            self.set_exception(Exception("Bad operation type: %s" % self.op_type))
        self.kv_store.release_partition(key)
727
class LoadDocumentsGeneratorsTask(LoadDocumentsTask):
    """Runs several document generators in sequence.  `op_type` and
    `bucket` may each be a list aligned one-to-one with `generators`."""
    def __init__(self, server, bucket, generators, kv_store, op_type, exp, flag=0, only_store_hash=True):
        LoadDocumentsTask.__init__(self, server, bucket, generators[0], kv_store, op_type, exp, flag=flag, only_store_hash=only_store_hash)
        self.generators = generators
        self.op_types = None
        self.buckets = None
        if isinstance(op_type, list):
            self.op_types = op_type
        if isinstance(bucket, list):
            self.buckets = bucket

    def run(self):
        if self.op_types is not None and len(self.op_types) != len(self.generators):
            self.state = FINISHED
            self.set_exception(Exception("not all generators have op_type!"))
            return  # bug fix: don't keep loading after failing validation
        # bug fix: the original compared len(op_types) to len(buckets),
        # which raised TypeError when buckets was a list but op_types None;
        # the bucket list must align with the generators
        if self.buckets is not None and len(self.buckets) != len(self.generators):
            self.state = FINISHED
            self.set_exception(Exception("not all generators have bucket specified!"))
            return
        for iterator, generator in enumerate(self.generators):
            self.generator = generator
            if self.op_types:
                self.op_type = self.op_types[iterator]
            if self.buckets:
                self.bucket = self.buckets[iterator]
            while self.has_next() and not self.done():
                self.next()
        self.state = FINISHED
        self.set_result(True)
760
class BatchedLoadDocumentsTask(GenericLoadingTask):
    """Loads documents in batches of `batch_size` keys per memcached
    round-trip instead of one key at a time.

    Supported op_type values: 'create', 'update', 'delete', 'read'.
    """
    def __init__(self, server, bucket, generator, kv_store, op_type, exp, flag=0, only_store_hash=True, batch_size=100, pause_secs=1, timeout_secs=30):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.batch_generator = BatchedDocumentGenerator(generator, batch_size)
        self.op_type = op_type
        self.exp = exp
        self.flag = flag
        self.only_store_hash = only_store_hash
        self.batch_size = batch_size
        self.pause = pause_secs
        self.timeout = timeout_secs

    def has_next(self):
        """Return True while the underlying generator has more batches.
        Logs progress every 50k documents and once at the end."""
        has = self.batch_generator.has_next()
        if math.fmod(self.batch_generator._doc_gen.itr, 50000) == 0.0 or not has:
            self.log.info("Batch {0} documents done #: {1} with exp:{2}".\
                          format(self.op_type,
                                 (self.batch_generator._doc_gen.itr - self.batch_generator._doc_gen.start),
                                 self.exp))
        return has

    def next(self):
        """Process one batch: acquire the partitions covering the batch
        keys, apply the operation, then release the partitions."""
        key_value = self.batch_generator.next_batch()
        partition_keys_dic = self.kv_store.acquire_partitions(key_value.keys())
        if self.op_type == 'create':
            self._create_batch(partition_keys_dic, key_value)
        elif self.op_type == 'update':
            self._update_batch(partition_keys_dic, key_value)
        elif self.op_type == 'delete':
            self._delete_batch(partition_keys_dic, key_value)
        elif self.op_type == 'read':
            self._read_batch(partition_keys_dic, key_value)
        else:
            self.state = FINISHED
            self.set_exception(Exception("Bad operation type: %s" % self.op_type))
        self.kv_store.release_partitions(partition_keys_dic.keys())

    def _create_batch(self, partition_keys_dic, key_val):
        """setMulti the whole batch, then mirror the writes in the kv_store."""
        try:
            self._process_values_for_create(key_val)
            self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
            self._populate_kvstore(partition_keys_dic, key_val)
        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
            self.state = FINISHED
            self.set_exception(error)

    def _update_batch(self, partition_keys_dic, key_val):
        """Re-set every key in the batch with a mutated value, then
        mirror the writes in the kv_store."""
        try:
            self._process_values_for_update(partition_keys_dic, key_val)
            self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
            self._populate_kvstore(partition_keys_dic, key_val)
        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
            self.state = FINISHED
            self.set_exception(error)

    def _delete_batch(self, partition_keys_dic, key_val):
        """Delete each key individually; ERR_NOT_FOUND is tolerated when
        the kv_store agrees the key is gone."""
        for partition, keys in partition_keys_dic.items():
            for key in keys:
                try:
                    self.client.delete(key)
                    partition.delete(key)
                except MemcachedError as error:
                    if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                        pass
                    else:
                        self.state = FINISHED
                        self.set_exception(error)
                        return
                except (ServerUnavailableException, socket.error, EOFError, AttributeError) as error:
                    self.state = FINISHED
                    self.set_exception(error)
                    # BUG FIX: stop like the MemcachedError branch does;
                    # the original kept deleting after the task had failed.
                    return

    def _read_batch(self, partition_keys_dic, key_val):
        """getMulti the batch; the result is intentionally discarded --
        this op only exercises the read path."""
        try:
            o, c, d = self.client.getMulti(key_val.keys(), self.pause, self.timeout)
        except MemcachedError as error:
            self.state = FINISHED
            self.set_exception(error)

    def _process_values_for_create(self, key_val):
        """Stamp 'mutated': 0 into JSON values; non-JSON values get one
        random character flipped so every create is distinguishable."""
        for key, value in key_val.items():
            try:
                value_json = json.loads(value)
                value_json['mutated'] = 0
                value = json.dumps(value_json)
            except ValueError:
                index = random.choice(range(len(value)))
                value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
            finally:
                key_val[key] = value

    def _process_values_for_update(self, partition_keys_dic, key_val):
        """Bump 'mutated' in JSON values; drop keys the kv_store no longer
        considers valid; mangle one character of non-JSON values."""
        for partition, keys in partition_keys_dic.items():
            for key in keys:
                value = partition.get_valid(key)
                if value is None:
                    del key_val[key]
                    continue
                try:
                    value = key_val[key]  # new updated value, however it is not their in orginal code "LoadDocumentsTask"
                    value_json = json.loads(value)
                    value_json['mutated'] += 1
                    value = json.dumps(value_json)
                except ValueError:
                    index = random.choice(range(len(value)))
                    value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
                finally:
                    key_val[key] = value

    def _populate_kvstore(self, partition_keys_dic, key_val):
        """Record every written key/value in its kv_store partition."""
        for partition, keys in partition_keys_dic.items():
            self._populate_kvstore_partition(partition, keys, key_val)

    def _release_locks_on_kvstore(self):
        # BUG FIX: the original iterated the bound method `.keys` itself
        # (missing call parentheses), which raises TypeError.
        # NOTE(review): self._partitions_keyvals_dic is never assigned in
        # this class -- this helper looks unused/legacy; verify callers.
        for part in self._partitions_keyvals_dic.keys():
            self.kv_store.release_lock(part)

    def _populate_kvstore_partition(self, partition, keys, key_val):
        """Store either the crc32 hash or the full value per key,
        depending on only_store_hash."""
        for key in keys:
            if self.only_store_hash:
                key_val[key] = str(crc32.crc32_hash(key_val[key]))
            partition.set(key, key_val[key], self.exp, self.flag)
888
class WorkloadTask(GenericLoadingTask):
    """Runs a randomized create/read/update/delete workload.

    The four operation weights are folded into cumulative thresholds so a
    single random draw in [1, total] selects the operation per iteration.
    """
    def __init__(self, server, bucket, kv_store, num_ops, create, read, update, delete, exp):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.itr = 0
        self.num_ops = num_ops
        # cumulative thresholds: draw <= create -> create,
        # <= read -> read, <= update -> update, otherwise delete
        self.create = create
        self.read = create + read
        self.update = create + read + update
        self.delete = create + read + update + delete
        self.exp = exp

    def has_next(self):
        """num_ops == 0 means run forever; otherwise stop after num_ops."""
        return self.num_ops == 0 or self.itr < self.num_ops

    def next(self):
        """Draw a number in [1, total weight] and dispatch on thresholds."""
        self.itr += 1
        draw = random.randint(1, self.delete)
        if draw <= self.create:
            self._create_random_key()
        elif draw <= self.read:
            self._get_random_key()
        elif draw <= self.update:
            self._update_random_key()
        else:
            self._delete_random_key()

    def _apply_to_random_valid_key(self, operation):
        """Pick a random partition and a random valid key in it, run
        operation(partition, key), then release the partition. Silently
        skips when no partition or key is available."""
        partition, part_num = self.kv_store.acquire_random_partition()
        if partition is None:
            return
        key = partition.get_random_valid_key()
        if key is None:
            self.kv_store.release_partition(part_num)
            return
        operation(partition, key)
        self.kv_store.release_partition(part_num)

    def _get_random_key(self):
        self._apply_to_random_valid_key(self._unlocked_read)

    def _create_random_key(self):
        """Resurrect a previously-deleted key from a random partition."""
        partition, part_num = self.kv_store.acquire_random_partition(False)
        if partition is None:
            return
        key = partition.get_random_deleted_key()
        if key is None:
            self.kv_store.release_partition(part_num)
            return
        value = partition.get_deleted(key)
        if value is None:
            self.kv_store.release_partition(part_num)
            return
        self._unlocked_create(partition, key, value)
        self.kv_store.release_partition(part_num)

    def _update_random_key(self):
        self._apply_to_random_valid_key(self._unlocked_update)

    def _delete_random_key(self):
        self._apply_to_random_valid_key(self._unlocked_delete)
973
class ValidateDataTask(GenericLoadingTask):
    """Compares every key tracked in the kv_store against the bucket:
    valid keys must exist with the expected value and flags, deleted
    keys must be gone. Verification is capped at max_verify items."""
    def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, replica_to_read=None):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        self.itr = 0
        # default: verify everything; max_verify (if given) only lowers the cap
        self.max_verify = self.num_valid_keys + self.num_deleted_keys
        self.only_store_hash = only_store_hash
        self.replica_to_read = replica_to_read
        if max_verify is not None:
            self.max_verify = min(max_verify, self.max_verify)
        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
        self.start_time = time.time()

    def has_next(self):
        # progress log every 50k items; a timing summary when done
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and\
            self.itr < self.max_verify:
            if not self.itr % 50000:
                self.log.info("{0} items were verified".format(self.itr))
            return True
        self.log.info("{0} items were verified in {1} sec.the average number of ops\
            - {2} per second ".format(self.itr, time.time() - self.start_time,
                self.itr / (time.time() - self.start_time)).rstrip())
        return False

    def next(self):
        # all valid keys are checked first, then the deleted ones
        if self.itr < self.num_valid_keys:
            self._check_valid_key(self.valid_keys[self.itr])
        else:
            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
        self.itr += 1

    def _check_valid_key(self, key):
        """Fetch `key` (from a replica when replica_to_read is set) and
        compare value hash/content and flags against the kv_store record."""
        partition = self.kv_store.acquire_partition(key)

        value = partition.get_valid(key)
        flag = partition.get_flag(key)
        if value is None or flag is None:
            self.kv_store.release_partition(key)
            return

        try:
            if self.replica_to_read is None:
                o, c, d = self.client.get(key)
            else:
                o, c, d = self.client.getr(key, replica_index=self.replica_to_read)
            if self.only_store_hash:
                # kv_store holds only the crc32 hash of the original value
                if crc32.crc32_hash(d) != int(value):
                    self.state = FINISHED
                    self.set_exception(Exception('Key: %s, Bad hash result: %d != %d for key %s' % (key, crc32.crc32_hash(d), int(value), key)))
            else:
                # NOTE(review): the stored value is round-tripped through
                # json.dumps/json.loads before comparing with the fetched
                # document -- presumably to normalize representation; confirm.
                value = json.dumps(value)
                if d != json.loads(value):
                    self.state = FINISHED
                    self.set_exception(Exception('Key: %s, Bad result: %s != %s for key %s' % (key, json.dumps(d), value, key)))
            if o != flag:
                self.state = FINISHED
                self.set_exception(Exception('Key: %s, Bad result for flag value: %s != the value we set: %s' % (key, o, flag)))

        except MemcachedError as error:
            # not-found is acceptable when the kv_store also no longer
            # considers the key valid
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)
        except Exception as error:
            self.log.error("Unexpected error: %s" % str(error))
            self.state = FINISHED
            self.set_exception(error)
        self.kv_store.release_partition(key)

    def _check_deleted_key(self, key):
        """Issue a delete: ERR_NOT_FOUND is the expected outcome; an error
        is flagged only if the kv_store still holds a valid value for the key."""
        partition = self.kv_store.acquire_partition(key)

        try:
            o, c, d = self.client.delete(key)
            if partition.get_valid(key) is not None:
                self.state = FINISHED
                self.set_exception(Exception('Not Deletes: %s' % (key)))
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)
        self.kv_store.release_partition(key)
1061
class ValidateDataWithActiveAndReplicaTask(GenericLoadingTask):
    """Verifies that each key returns identical flags, CAS and value from
    the active vbucket and from replica index 0, and that deleted keys
    are really gone. Verification is capped at max_verify items."""
    def __init__(self, server, bucket, kv_store, max_verify=None):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        self.itr = 0
        # default: verify everything; max_verify (if given) only lowers the cap
        self.max_verify = self.num_valid_keys + self.num_deleted_keys
        if max_verify is not None:
            self.max_verify = min(max_verify, self.max_verify)
        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
        self.start_time = time.time()

    def has_next(self):
        # progress log every 50k items; a timing summary when done
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and\
            self.itr < self.max_verify:
            if not self.itr % 50000:
                self.log.info("{0} items were verified".format(self.itr))
            return True
        self.log.info("{0} items were verified in {1} sec.the average number of ops\
            - {2} per second ".format(self.itr, time.time() - self.start_time,
                self.itr / (time.time() - self.start_time)).rstrip())
        return False

    def next(self):
        # all valid keys are checked first, then the deleted ones
        if self.itr < self.num_valid_keys:
            self._check_valid_key(self.valid_keys[self.itr])
        else:
            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
        self.itr += 1

    def _check_valid_key(self, key):
        """Fetch `key` from both the active vbucket and replica 0 and
        compare flags, CAS and value between the two copies."""
        # BUG FIX: acquire the partition up front. The original referenced
        # an undefined `partition` inside the MemcachedError handler, so a
        # legitimately-deleted key raised NameError instead of being
        # tolerated (cf. ValidateDataTask._check_valid_key).
        partition = self.kv_store.acquire_partition(key)
        try:
            o, c, d = self.client.get(key)
            o_r, c_r, d_r = self.client.getr(key, replica_index=0)
            if o != o_r:
                self.state = FINISHED
                self.set_exception(Exception('ACTIVE AND REPLICA FLAG CHECK FAILED :: Key: %s, Bad result for CAS value: REPLICA FLAG %s != ACTIVE FLAG %s' % (key, o_r, o)))
            if c != c_r:
                self.state = FINISHED
                self.set_exception(Exception('ACTIVE AND REPLICA CAS CHECK FAILED :: Key: %s, Bad result for CAS value: REPLICA CAS %s != ACTIVE CAS %s' % (key, c_r, c)))
            if d != d_r:
                self.state = FINISHED
                self.set_exception(Exception('ACTIVE AND REPLICA VALUE CHECK FAILED :: Key: %s, Bad result for Value value: REPLICA VALUE %s != ACTIVE VALUE %s' % (key, d_r, d)))

        except MemcachedError as error:
            # not-found is acceptable when the kv_store also no longer
            # considers the key valid
            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)
        except Exception as error:
            self.log.error("Unexpected error: %s" % str(error))
            self.state = FINISHED
            self.set_exception(error)
        self.kv_store.release_partition(key)

    def _check_deleted_key(self, key):
        """Issue a delete: ERR_NOT_FOUND is the expected outcome; an error
        is flagged only if the kv_store still holds a valid value for the key."""
        partition = self.kv_store.acquire_partition(key)
        try:
            o, c, d = self.client.delete(key)
            if partition.get_valid(key) is not None:
                self.state = FINISHED
                self.set_exception(Exception('ACTIVE CHECK :: Not Deletes: %s' % (key)))
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)
        self.kv_store.release_partition(key)
1132
class BatchedValidateDataTask(GenericLoadingTask):
    """Validates kv_store contents against the bucket using batched
    multi-gets of `batch_size` keys; deleted keys are still checked one
    at a time. Verification is capped at max_verify items."""
    def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, batch_size=100, timeout_sec=5):
        GenericLoadingTask.__init__(self, server, bucket, kv_store)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        self.itr = 0
        # default: verify everything; max_verify (if given) only lowers the cap
        self.max_verify = self.num_valid_keys + self.num_deleted_keys
        self.timeout_sec = timeout_sec
        self.only_store_hash = only_store_hash
        if max_verify is not None:
            self.max_verify = min(max_verify, self.max_verify)
        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
        self.batch_size = batch_size
        self.start_time = time.time()

    def has_next(self):
        # progress log every 10k items; a timing summary when done
        has = False
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.itr < self.max_verify:
            has = True
        if math.fmod(self.itr, 10000) == 0.0:
            self.log.info("{0} items were verified".format(self.itr))
        if not has:
            self.log.info("{0} items were verified in {1} sec.the average number of ops\
                - {2} per second".format(self.itr, time.time() - self.start_time,
                self.itr / (time.time() - self.start_time)).rstrip())
        return has

    def next(self):
        # valid keys are verified a batch at a time; deleted keys one by one
        if self.itr < self.num_valid_keys:
            keys_batch = self.valid_keys[self.itr:self.itr + self.batch_size]
            self.itr += len(keys_batch)
            self._check_valid_keys(keys_batch)
        else:
            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
            self.itr += 1

    def _check_valid_keys(self, keys):
        """Multi-get `keys` in one round trip and compare every result
        against its kv_store partition."""
        partition_keys_dic = self.kv_store.acquire_partitions(keys)
        try:
            key_vals = self.client.getMulti(keys, parallel=True, timeout_sec=self.timeout_sec)
        # BUG FIX: use py3-compatible `except ... as ...` syntax (the rest
        # of this file already uses it) and release the acquired partitions
        # before returning -- the original leaked the partition locks on
        # both failure paths.
        except ValueError as error:
            self.kv_store.release_partitions(partition_keys_dic.keys())
            self.state = FINISHED
            self.set_exception(error)
            return
        except BaseException as error:
            # handle all other exception, for instance concurrent.futures._base.TimeoutError
            self.kv_store.release_partitions(partition_keys_dic.keys())
            self.state = FINISHED
            self.set_exception(error)
            return
        for partition, keys in partition_keys_dic.items():
            self._check_validity(partition, keys, key_vals)
        self.kv_store.release_partitions(partition_keys_dic.keys())

    def _check_validity(self, partition, keys, key_vals):
        """Compare each fetched (flags, cas, value) tuple against the
        kv_store's stored hash/value and flags."""
        for key in keys:
            value = partition.get_valid(key)
            flag = partition.get_flag(key)
            if value is None:
                continue
            try:
                o, c, d = key_vals[key]

                if self.only_store_hash:
                    # kv_store holds only the crc32 hash of the original value
                    if crc32.crc32_hash(d) != int(value):
                        self.state = FINISHED
                        self.set_exception(Exception('Key: %s Bad hash result: %d != %d' % (key, crc32.crc32_hash(d), int(value))))
                else:
                    value = json.dumps(value)
                    if d != json.loads(value):
                        self.state = FINISHED
                        self.set_exception(Exception('Key: %s Bad result: %s != %s' % (key, json.dumps(d), value)))
                if o != flag:
                    self.state = FINISHED
                    self.set_exception(Exception('Key: %s Bad result for flag value: %s != the value we set: %s' % (key, o, flag)))
            except KeyError as error:
                self.state = FINISHED
                self.set_exception(error)

    def _check_deleted_key(self, key):
        """Issue a delete: ERR_NOT_FOUND is the expected outcome; an error
        is flagged only if the kv_store still holds a valid value for the key."""
        partition = self.kv_store.acquire_partition(key)

        try:
            o, c, d = self.client.delete(key)
            if partition.get_valid(key) is not None:
                self.state = FINISHED
                self.set_exception(Exception('Not Deletes: %s' % (key)))
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                pass
            else:
                self.state = FINISHED
                self.set_exception(error)
        self.kv_store.release_partition(key)
1228
1229
class VerifyRevIdTask(GenericLoadingTask):
    """Compares per-key metadata (deleted flag, flags, expiration, seqno,
    cas) between a source and a destination cluster, stopping once
    max_err_count mismatches have accumulated."""
    def __init__(self, src_server, dest_server, bucket, kv_store, max_err_count=100):
        GenericLoadingTask.__init__(self, src_server, bucket, kv_store)
        self.client_dest = VBucketAwareMemcached(RestConnection(dest_server), bucket)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        # unexpectedly-missing keys, bucketed per cluster ip for reporting
        self.keys_not_found = {self.client.rest.ip: [], self.client_dest.rest.ip: []}
        self.itr = 0
        self.err_count = 0
        self.max_err_count = max_err_count

    def has_next(self):
        # continue while keys remain and the error budget is not exhausted
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.err_count < self.max_err_count:
            return True
        self.log.info("RevId Verification : {0} existing items have been verified"
                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
        self.log.info("RevId Verification : {0} deleted items have been verified"
                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
        return False

    def next(self):
        # valid keys first, then deleted/expired keys (whose 'expiration'
        # metadata is allowed to differ between clusters)
        if self.itr < self.num_valid_keys:
            self._check_key_revId(self.valid_keys[self.itr])
        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
            # verify deleted/expired keys
            self._check_key_revId(self.deleted_keys[self.itr - self.num_valid_keys],
                                  ignore_meta_data=['expiration'])
        self.itr += 1

        # show progress of verification for every 50k items
        if math.fmod(self.itr, 50000) == 0.0:
            self.log.info("{0} items have been verified".format(self.itr))

    def __get_meta_data(self, client, key):
        """Fetch metadata for `key` via getMeta on `client`. Returns a
        dict, or None when the key is missing or an error was recorded."""
        try:
            mc = client.memcached(key)
            # NOTE(review): eval() over the raw getMeta() tuple is fragile
            # (breaks if the response shape changes) and would be unsafe on
            # untrusted data; explicit dict construction would be safer.
            meta_data = eval("{'deleted': %s, 'flags': %s, 'expiration': %s, 'seqno': %s, 'cas': %s}" % (mc.getMeta(key)))
            return meta_data
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                # a missing key only counts as an error when we never deleted it
                if key not in self.deleted_keys:
                    self.err_count += 1
                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
            else:
                self.state = FINISHED
                self.set_exception(error)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught: {0}".format(e))
            self.set_exception(e)

    def _check_key_revId(self, key, ignore_meta_data=[]):
        """Compare source vs destination metadata for one key, logging all
        mismatches. Fields listed in ignore_meta_data are skipped."""
        src_meta_data = self.__get_meta_data(self.client, key)
        dest_meta_data = self.__get_meta_data(self.client_dest, key)
        if not src_meta_data or not dest_meta_data:
            return
        prev_error_count = self.err_count
        err_msg = []
        # seqno number should never be zero
        if src_meta_data['seqno'] == 0:
            self.err_count += 1
            err_msg.append(
                "seqno on Source should not be 0, Error Count:{0}".format(self.err_count))

        if dest_meta_data['seqno'] == 0:
            self.err_count += 1
            err_msg.append(
                "seqno on Destination should not be 0, Error Count:{0}".format(self.err_count))

        # verify all metadata
        for meta_key in src_meta_data.keys():
            if src_meta_data[meta_key] != dest_meta_data[meta_key] and meta_key not in ignore_meta_data:
                self.err_count += 1
                err_msg.append("{0} mismatch: Source {0}:{1}, Destination {0}:{2}, Error Count:{3}"
                    .format(meta_key, src_meta_data[meta_key],
                        dest_meta_data[meta_key], self.err_count))

        if self.err_count - prev_error_count > 0:
            self.log.error("===== Verifying rev_ids failed for key: {0} =====".format(key))
            [self.log.error(err) for err in err_msg]
            self.log.error("Source meta data: %s" % src_meta_data)
            self.log.error("Dest meta data: %s" % dest_meta_data)
            # NOTE(review): the task is marked FINISHED without setting an
            # exception; failures surface via the error log and err_count
            self.state = FINISHED
1315
class VerifyMetaDataTask(GenericLoadingTask):
    """Compares per-key metadata on a destination cluster against a
    previously captured meta_data_store (see GetMetaDataTask), stopping
    once max_err_count mismatches have accumulated."""
    def __init__(self, dest_server, bucket, kv_store, meta_data_store, max_err_count=100):
        GenericLoadingTask.__init__(self, dest_server, bucket, kv_store)
        # replaces the client set up by GenericLoadingTask with one bound
        # to the destination server
        self.client = VBucketAwareMemcached(RestConnection(dest_server), bucket)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        # NOTE(review): both entries use the same ip (self.client.rest.ip
        # twice), so this literal produces a single-key dict -- looks like
        # a copy-paste from VerifyRevIdTask's two-cluster version
        self.keys_not_found = {self.client.rest.ip: [], self.client.rest.ip: []}
        self.itr = 0
        self.err_count = 0
        self.max_err_count = max_err_count
        self.meta_data_store = meta_data_store

    def has_next(self):
        # continue while keys remain and the error budget is not exhausted
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.err_count < self.max_err_count:
            return True
        self.log.info("Meta Data Verification : {0} existing items have been verified"
                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
        self.log.info("Meta Data Verification : {0} deleted items have been verified"
                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
        return False

    def next(self):
        # valid keys first, then deleted/expired keys (whose 'expiration'
        # metadata is allowed to differ)
        if self.itr < self.num_valid_keys:
            self._check_key_meta_data(self.valid_keys[self.itr])
        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
            # verify deleted/expired keys
            self._check_key_meta_data(self.deleted_keys[self.itr - self.num_valid_keys],
                                  ignore_meta_data=['expiration'])
        self.itr += 1

        # show progress of verification for every 50k items
        if math.fmod(self.itr, 50000) == 0.0:
            self.log.info("{0} items have been verified".format(self.itr))

    def __get_meta_data(self, client, key):
        """Fetch metadata for `key` via getMeta on `client`. Returns a
        dict, or None when the key is missing or an error was recorded."""
        try:
            mc = client.memcached(key)
            # NOTE(review): eval() over the raw getMeta() tuple is fragile
            # (breaks if the response shape changes) and would be unsafe on
            # untrusted data; explicit dict construction would be safer.
            meta_data = eval("{'deleted': %s, 'flags': %s, 'expiration': %s, 'seqno': %s, 'cas': %s}" % (mc.getMeta(key)))
            return meta_data
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                # a missing key only counts as an error when we never deleted it
                if key not in self.deleted_keys:
                    self.err_count += 1
                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
            else:
                self.state = FINISHED
                self.set_exception(error)

    def _check_key_meta_data(self, key, ignore_meta_data=[]):
        """Compare stored vs live metadata for one key, logging all
        mismatches. Fields listed in ignore_meta_data are skipped."""
        src_meta_data = self.meta_data_store[key]
        dest_meta_data = self.__get_meta_data(self.client, key)
        if not src_meta_data or not dest_meta_data:
            return
        prev_error_count = self.err_count
        err_msg = []
        # seqno number should never be zero
        if dest_meta_data['seqno'] == 0:
            self.err_count += 1
            err_msg.append(
                "seqno on Destination should not be 0, Error Count:{0}".format(self.err_count))

        # verify all metadata
        for meta_key in src_meta_data.keys():
            if src_meta_data[meta_key] != dest_meta_data[meta_key] and meta_key not in ignore_meta_data:
                self.err_count += 1
                err_msg.append("{0} mismatch: Source {0}:{1}, Destination {0}:{2}, Error Count:{3}"
                    .format(meta_key, src_meta_data[meta_key],
                        dest_meta_data[meta_key], self.err_count))

        if self.err_count - prev_error_count > 0:
            self.log.error("===== Verifying meta data failed for key: {0} =====".format(key))
            [self.log.error(err) for err in err_msg]
            self.log.error("Source meta data: %s" % src_meta_data)
            self.log.error("Dest meta data: %s" % dest_meta_data)
            # NOTE(review): the task is marked FINISHED without setting an
            # exception; failures surface via the error log and err_count
            self.state = FINISHED
1392
class GetMetaDataTask(GenericLoadingTask):
    """Walks every valid and deleted key of a kv_store and collects the
    meta data (deleted flag, flags, expiration, seqno, cas) for each into
    self.meta_data_store.

    The task aborts once self.max_err_count unexpected misses are seen.
    """
    def __init__(self, dest_server, bucket, kv_store):
        GenericLoadingTask.__init__(self, dest_server, bucket, kv_store)
        self.client = VBucketAwareMemcached(RestConnection(dest_server), bucket)
        self.valid_keys, self.deleted_keys = kv_store.key_set()
        self.num_valid_keys = len(self.valid_keys)
        self.num_deleted_keys = len(self.deleted_keys)
        # node ip -> list of (key, vbucket) pairs that were unexpectedly
        # missing.  (The original dict literal repeated the same key twice,
        # which collapses to a single entry anyway.)
        self.keys_not_found = {self.client.rest.ip: []}
        self.itr = 0
        self.err_count = 0
        self.max_err_count = 100
        self.meta_data_store = {}

    def has_next(self):
        """Return True while there are keys left and the error budget holds."""
        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.err_count < self.max_err_count:
            return True
        self.log.info("Get Meta Data : {0} existing items have been gathered"
                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
        self.log.info("Get Meta Data : {0} deleted items have been gathered"
                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
        return False

    def next(self):
        """Fetch meta data for the next key: valid keys first, then deleted."""
        if self.itr < self.num_valid_keys:
            self.meta_data_store[self.valid_keys[self.itr]] = self.__get_meta_data(self.client, self.valid_keys[self.itr])
        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
            self.meta_data_store[self.deleted_keys[self.itr - self.num_valid_keys]] = self.__get_meta_data(self.client, self.deleted_keys[self.itr - self.num_valid_keys])
        self.itr += 1

    def __get_meta_data(self, client, key):
        """Return the meta-data dict for `key`, or None if it is a known
        deleted key; record unexpected misses in self.keys_not_found."""
        try:
            mc = client.memcached(key)
            # getMeta returns (deleted, flags, expiration, seqno, cas);
            # build the dict directly instead of formatting + eval'ing a string
            deleted, flags, exp, seqno, cas = mc.getMeta(key)
            return {'deleted': deleted, 'flags': flags, 'expiration': exp,
                    'seqno': seqno, 'cas': cas}
        except MemcachedError as error:
            if error.status == ERR_NOT_FOUND:
                # a miss is only an error if the key was not deleted on purpose
                if key not in self.deleted_keys:
                    self.err_count += 1
                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
            else:
                self.state = FINISHED
                self.set_exception(error)

    def get_meta_data_store(self):
        """Return the key -> meta-data dict gathered so far."""
        return self.meta_data_store
1438
class ViewCreateTask(Task):
    """Create (or update) a design document and optionally add one view to it.

    If `view` is given it is appended to the existing design doc (or a new
    design doc is created around it); if `view` is None an empty design doc
    is created.  The check phase optionally queries the new view and can
    verify that the design doc replicated to every node in the cluster.
    """
    def __init__(self, server, design_doc_name, view, bucket="default", with_query=True,
                 check_replication=False, ddoc_options=None):
        Task.__init__(self, "create_view_task")
        self.server = server
        self.bucket = bucket
        self.view = view
        prefix = ""
        if self.view:
            # dev views live in a "dev_"-prefixed design doc
            prefix = ("", "dev_")[self.view.dev_view]
        if design_doc_name.find('/') != -1:
            design_doc_name = design_doc_name.replace('/', '%2f')
        self.design_doc_name = prefix + design_doc_name
        self.ddoc_rev_no = 0
        self.with_query = with_query
        self.check_replication = check_replication
        self.ddoc_options = ddoc_options
        self.rest = RestConnection(self.server)

    def execute(self, task_manager):
        """Build the design doc (new or updated) and push it to the server."""
        try:
            # appending view to existing design doc
            content, meta = self.rest.get_ddoc(self.bucket, self.design_doc_name)
            ddoc = DesignDocument._init_from_json(self.design_doc_name, content)
            # if view is to be updated
            if self.view:
                if self.view.is_spatial:
                    ddoc.add_spatial_view(self.view)
                else:
                    ddoc.add_view(self.view)
            self.ddoc_rev_no = self._parse_revision(meta['rev'])
        except ReadDocumentException:
            # creating first view in design doc
            if self.view:
                if self.view.is_spatial:
                    ddoc = DesignDocument(self.design_doc_name, [], spatial_views=[self.view])
                else:
                    ddoc = DesignDocument(self.design_doc_name, [self.view])
            # create an empty design doc
            else:
                ddoc = DesignDocument(self.design_doc_name, [])
            if self.ddoc_options:
                ddoc.options = self.ddoc_options
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)
            # 'ddoc' is unbound here; falling through would raise NameError
            return

        try:
            self.rest.create_design_document(self.bucket, ddoc)
            self.state = CHECKING
            task_manager.schedule(self)

        except DesignDocCreationException as e:
            self.state = FINISHED
            self.set_exception(e)

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def check(self, task_manager):
        """Verify the view/design doc exists, optionally by querying it, and
        confirm the design doc revision changed (i.e. the update was applied)."""
        try:
            # only query if the DDoc has a view
            if self.view:
                if self.with_query:
                    query = {"stale" : "ok"}
                    if self.view.is_spatial:
                        content = \
                            self.rest.query_view(self.design_doc_name, self.view.name,
                                                 self.bucket, query, type="spatial")
                    else:
                        content = \
                            self.rest.query_view(self.design_doc_name, self.view.name,
                                                 self.bucket, query)
                else:
                    _, json_parsed, _ = self.rest._get_design_doc(self.bucket, self.design_doc_name)
                    if self.view.is_spatial:
                        if self.view.name not in json_parsed["spatial"].keys():
                            self.state = FINISHED
                            # fixed: placeholder was "{O}" (letter O), which
                            # made .format() itself raise KeyError
                            self.set_exception(
                                Exception("design doc {0} doesn't contain spatial view {1}".format(
                                self.design_doc_name, self.view.name)))
                            return
                    else:
                        if self.view.name not in json_parsed["views"].keys():
                            self.state = FINISHED
                            self.set_exception(Exception("design doc {0} doesn't contain view {1}".format(
                                self.design_doc_name, self.view.name)))
                            return
                self.log.info("view : {0} was created successfully in ddoc: {1}".format(self.view.name, self.design_doc_name))
            else:
                # if we have reached here, it means design doc was successfully updated
                self.log.info("Design Document : {0} was updated successfully".format(self.design_doc_name))

            self.state = FINISHED
            if self._check_ddoc_revision():
                self.set_result(self.ddoc_rev_no)
            else:
                self.set_exception(Exception("failed to update design document"))

            if self.check_replication:
                self._check_ddoc_replication_on_nodes()

        except QueryViewException as e:
            # fixed precedence: find() returns -1 (truthy) on a miss, so the
            # old unparenthesized form retried for every error message
            if e.message.find('not_found') > -1 or e.message.find('view_undefined') > -1:
                task_manager.schedule(self, 2)
            else:
                self.state = FINISHED
                self.log.error("Unexpected Exception Caught")
                self.set_exception(e)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def _check_ddoc_revision(self):
        """Return True iff the design doc revision advanced past the one
        recorded in execute() (proof the server accepted the update)."""
        valid = False
        try:
            content, meta = self.rest.get_ddoc(self.bucket, self.design_doc_name)
            new_rev_id = self._parse_revision(meta['rev'])
            if new_rev_id != self.ddoc_rev_no:
                self.ddoc_rev_no = new_rev_id
                valid = True
        except ReadDocumentException:
            pass

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

        return valid

    def _parse_revision(self, rev_string):
        """Extract the numeric part of a CouchDB-style '<n>-<hash>' revision."""
        return int(rev_string.split('-')[0])

    def _check_ddoc_replication_on_nodes(self):
        """Poll every cluster node until each reports the current design doc
        revision; set an exception on the task if replication never happens."""
        nodes = self.rest.node_statuses()
        retry_count = 3

        # nothing to check if there is only 1 node
        if len(nodes) <= 1:
            return

        for node in nodes:
            server_info = {"ip" : node.ip,
                       "port" : node.port,
                       "username" : self.rest.username,
                       "password" : self.rest.password}

            for count in xrange(retry_count):
                try:
                    rest_node = RestConnection(server_info)
                    content, meta = rest_node.get_ddoc(self.bucket, self.design_doc_name)
                    new_rev_id = self._parse_revision(meta['rev'])
                    if new_rev_id == self.ddoc_rev_no:
                        break
                    else:
                        self.log.info("Design Doc {0} version is not updated on node {1}:{2}. Retrying.".format(self.design_doc_name, node.ip, node.port))
                        time.sleep(2)
                except ReadDocumentException as e:
                    # retry_count - 1: the last attempt must report failure;
                    # the old test (count < retry_count) was always true, so
                    # the error branch below was unreachable
                    if count < retry_count - 1:
                        self.log.info("Design Doc {0} not yet available on node {1}:{2}. Retrying.".format(self.design_doc_name, node.ip, node.port))
                        time.sleep(2)
                    else:
                        self.log.error("Design Doc {0} failed to replicate on node {1}:{2}".format(self.design_doc_name, node.ip, node.port))
                        self.set_exception(e)
                        self.state = FINISHED
                        break
                except Exception as e:
                    if count < retry_count - 1:
                        self.log.info("Unexpected Exception Caught. Retrying.")
                        time.sleep(2)
                    else:
                        self.log.error("Unexpected Exception Caught")
                        self.set_exception(e)
                        self.state = FINISHED
                        break
            else:
                # for-else: loop exhausted without a matching revision
                self.set_exception(Exception("Design Doc {0} version mismatch on node {1}:{2}".format(self.design_doc_name, node.ip, node.port)))
1623
1624
class ViewDeleteTask(Task):
    """Delete one view from a design doc, or the whole design doc when
    `view` is None.  The check phase confirms the view is gone by querying
    it and expecting a QueryViewException."""
    def __init__(self, server, design_doc_name, view, bucket="default"):
        Task.__init__(self, "delete_view_task")
        self.server = server
        self.bucket = bucket
        self.view = view
        prefix = ""
        if self.view:
            # dev views live in a "dev_"-prefixed design doc
            prefix = ("", "dev_")[self.view.dev_view]
        self.design_doc_name = prefix + design_doc_name

    def execute(self, task_manager):
        """Remove the view (or design doc) and schedule verification."""
        try:
            rest = RestConnection(self.server)
            if self.view:
                # remove view from existing design doc
                content, header = rest.get_ddoc(self.bucket, self.design_doc_name)
                ddoc = DesignDocument._init_from_json(self.design_doc_name, content)
                if self.view.is_spatial:
                    status = ddoc.delete_spatial(self.view)
                else:
                    status = ddoc.delete_view(self.view)
                if not status:
                    self.state = FINISHED
                    self.set_exception(Exception('View does not exist! %s' % (self.view.name)))
                    # stop here: don't push the unchanged ddoc back to the
                    # server or reschedule a task that already failed
                    return

                # update design doc
                rest.create_design_document(self.bucket, ddoc)
                self.state = CHECKING
                task_manager.schedule(self, 2)
            else:
                # delete the design doc
                rest.delete_view(self.bucket, self.design_doc_name)
                self.log.info("Design Doc : {0} was successfully deleted".format(self.design_doc_name))
                self.state = FINISHED
                self.set_result(True)

        except (ValueError, ReadDocumentException, DesignDocCreationException) as e:
            self.state = FINISHED
            self.set_exception(e)

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def check(self, task_manager):
        """Query the deleted view: a QueryViewException means success."""
        try:
            rest = RestConnection(self.server)
            # make sure view was deleted
            query = {"stale" : "ok"}
            content = \
                rest.query_view(self.design_doc_name, self.view.name, self.bucket, query)
            # the query succeeded, so the view still exists -> failure
            self.state = FINISHED
            self.set_result(False)
        except QueryViewException as e:
            self.log.info("view : {0} was successfully deleted in ddoc: {1}".format(self.view.name, self.design_doc_name))
            self.state = FINISHED
            self.set_result(True)

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)
1691
class ViewQueryTask(Task):
    """Query a view and (optionally) verify the number of returned rows.

    With expected_rows=None the task finishes after the first successful
    query and returns the raw content; otherwise the check phase re-queries
    until exactly expected_rows rows are returned or the task times out.
    """
    def __init__(self, server, design_doc_name, view_name,
                 query, expected_rows=None,
                 bucket="default", retry_time=2):
        Task.__init__(self, "query_view_task")
        self.server = server
        self.bucket = bucket
        self.view_name = view_name
        self.design_doc_name = design_doc_name
        self.query = query
        self.expected_rows = expected_rows
        self.retry_time = retry_time
        self.timeout = 900

    def execute(self, task_manager):
        """Run the query once; retry on QueryViewException."""
        try:
            rest = RestConnection(self.server)
            # make sure view can be queried
            content = \
                rest.query_view(self.design_doc_name, self.view_name, self.bucket, self.query, self.timeout)

            if self.expected_rows is None:
                # no verification
                self.state = FINISHED
                self.set_result(content)
            else:
                self.state = CHECKING
                task_manager.schedule(self)

        except QueryViewException as e:
            # initial query failed, try again
            task_manager.schedule(self, self.retry_time)

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def check(self, task_manager):
        """Re-query and compare the row count with expected_rows."""
        try:
            rest = RestConnection(self.server)
            # query and verify expected num of rows returned
            content = \
                rest.query_view(self.design_doc_name, self.view_name, self.bucket, self.query, self.timeout)

            self.log.info("Server: %s, Design Doc: %s, View: %s, (%d rows) expected, (%d rows) returned" % \
                          (self.server.ip, self.design_doc_name, self.view_name, self.expected_rows, len(content['rows'])))

            raised_error = content.get(u'error', '') or ''.join([str(item) for item in content.get(u'errors', [])])
            if raised_error:
                raise QueryViewException(self.view_name, raised_error)

            if len(content['rows']) == self.expected_rows:
                self.log.info("expected number of rows: '{0}' was found for view query".format(self.expected_rows))
                self.state = FINISHED
                self.set_result(True)
            else:
                if len(content['rows']) > self.expected_rows:
                    raise QueryViewException(self.view_name, "Server: {0}, Design Doc: {1}, actual returned rows: '{2}' are greater than expected {3}"
                                             .format(self.server.ip, self.design_doc_name, len(content['rows']), self.expected_rows,))
                if "stale" in self.query:
                    if self.query["stale"].lower() == "false":
                        # stale=false gives the final index state: a short
                        # result is a real mismatch.  Return here instead of
                        # falling through and rescheduling a FINISHED task.
                        self.state = FINISHED
                        self.set_result(False)
                        return

                # retry until expected results or task times out
                task_manager.schedule(self, self.retry_time)
        except QueryViewException as e:
            # subsequent query failed! exit
            self.state = FINISHED
            self.set_exception(e)

        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)
1771
class MonitorViewQueryResultsTask(Task):
    """Repeatedly query a view until the results match `expected_docs`.

    execute() polls the view; check() compares the returned rows against the
    expected documents, re-entering EXECUTING until they match or `retries`
    attempts are exhausted.  The task result is a dict
    {"passed": bool, "errors": ...} that may also carry the last raw
    response under "results".

    Parameters of note:
      error       -- optional substring; if a query error contains it the
                     task passes (used for negative tests).
      verify_rows -- when True, row ids are compared one by one as well.
      query       -- mutated in place: connection_timeout is always set and
                     full_set=true is added for dev design docs.
    """
    def __init__(self, servers, design_doc_name, view,
                 query, expected_docs=None, bucket="default",
                 retries=100, error=None, verify_rows=False,
                 server_to_query=0):
        Task.__init__(self, "query_view_task")
        self.servers = servers
        self.bucket = bucket
        self.view_name = view.name
        self.view = view
        self.design_doc_name = design_doc_name
        self.query = query
        self.retries = retries
        self.current_retry = 0
        self.timeout = 900
        self.error = error
        self.expected_docs = expected_docs
        self.verify_rows = verify_rows
        # all queries go through this single node
        self.rest = RestConnection(self.servers[server_to_query])
        self.results = None
        # doubled on every query timeout (see execute)
        self.connection_timeout = 60000
        self.query["connection_timeout"] = self.connection_timeout
        # dev design docs index only a subset unless full_set is requested
        if self.design_doc_name.find("dev_") == 0:
            self.query["full_set"] = "true"

    def execute(self, task_manager):
        """Run the view query once; classify errors as pass/fail/retry."""
        try:
            self.current_retry += 1
            self.results = self.rest.query_view(
                self.design_doc_name, self.view_name, self.bucket, self.query,
                self.timeout)
            raised_error = self.results.get(u'error', '') or ''.join([str(item) for item in self.results.get(u'errors', [])])
            if raised_error:
                raise QueryViewException(self.view_name, raised_error)
            else:
                self.log.info("view %s, query %s: expected- %s, actual -%s" % (
                                        self.design_doc_name, self.query,
                                        len(self.expected_docs),
                                        len(self.results.get(u'rows', []))))
                self.state = CHECKING
                task_manager.schedule(self)
        except QueryViewException, ex:
            self.log.error("During query run (ddoc=%s, query=%s, server=%s) error is: %s" % (
                                self.design_doc_name, self.query, self.servers[0].ip, str(ex)))
            # an expected error substring counts as a pass (negative testing)
            if self.error and str(ex).find(self.error) != -1:
                self.state = FINISHED
                self.set_result({"passed" : True,
                                 "errors" : str(ex)})
            elif self.current_retry == self.retries:
                self.state = FINISHED
                self.set_result({"passed" : False,
                                 "errors" : str(ex)})
            # transient "index not ready / node unreachable" errors: re-poll
            elif str(ex).find('view_undefined') != -1 or \
                str(ex).find('not_found') != -1 or \
                str(ex).find('unable to reach') != -1 or \
                str(ex).find('socket error') != -1 or \
                str(ex).find('econnrefused') != -1 or \
                str(ex).find("doesn't exist") != -1 or \
                str(ex).find('missing') != -1 or \
                str(ex).find("Undefined set view") != -1:
                self.log.error(
                       "view_results not ready yet ddoc=%s , try again in 10 seconds..." %
                       self.design_doc_name)
                task_manager.schedule(self, 10)
            elif str(ex).find('timeout') != -1:
                # NOTE(review): the doubled value is never written back into
                # self.query["connection_timeout"] -- confirm this is intended
                self.connection_timeout = self.connection_timeout * 2
                self.log.error("view_results not ready yet ddoc=%s ," % self.design_doc_name + \
                               " try again in 10 seconds... and double timeout")
                task_manager.schedule(self, 10)
            else:
                # unrecognized query error: fail, attaching any partial rows
                self.state = FINISHED
                res = {"passed" : False,
                       "errors" : str(ex)}
                if self.results and self.results.get(u'rows', []):
                    res['results'] = self.results
                self.set_result(res)
        except Exception, ex:
            # unexpected error: verify whatever we have on the last retry,
            # otherwise keep polling
            if self.current_retry == self.retries:
                self.state = CHECKING
                self.log.error("view %s, query %s: verifying results" % (
                                        self.design_doc_name, self.query))
                task_manager.schedule(self)
            else:
                self.log.error(
                       "view_results not ready yet ddoc=%s , try again in 10 seconds..." %
                       self.design_doc_name)
                task_manager.schedule(self, 10)

    def check(self, task_manager):
        """Compare self.results with expected_docs; finish or re-poll."""
        try:
            # reduce views: compare group counts and per-key reduced values
            if self.view.red_func and (('reduce' in self.query and\
                        self.query['reduce'] == "true") or (not 'reduce' in self.query)):
                if len(self.expected_docs) != len(self.results.get(u'rows', [])):
                    if self.current_retry == self.retries:
                        self.state = FINISHED
                        msg = "ddoc=%s, query=%s, server=%s" % (
                            self.design_doc_name, self.query, self.servers[0].ip)
                        msg += "Number of groups expected:%s, actual:%s" % (
                             len(self.expected_docs), len(self.results.get(u'rows', [])))
                        self.set_result({"passed" : False,
                                         "errors" : msg})
                    else:
                        # wait for the indexer and re-run the query
                        RestHelper(self.rest)._wait_for_indexer_ddoc(self.servers, self.design_doc_name)
                        self.state = EXECUTING
                        task_manager.schedule(self, 10)
                else:
                    for row in self.expected_docs:
                        key_expected = row['key']

                        if not (key_expected in [key['key'] for key in self.results.get(u'rows', [])]):
                            if self.current_retry == self.retries:
                                self.state = FINISHED
                                msg = "ddoc=%s, query=%s, server=%s" % (
                                    self.design_doc_name, self.query, self.servers[0].ip)
                                msg += "Key expected but not present :%s" % (key_expected)
                                self.set_result({"passed" : False,
                                                 "errors" : msg})
                            else:
                                RestHelper(self.rest)._wait_for_indexer_ddoc(self.servers, self.design_doc_name)
                                self.state = EXECUTING
                                task_manager.schedule(self, 10)
                        else:
                            # key present: compare its reduced value
                            for res in self.results.get(u'rows', []):
                                if key_expected == res['key']:
                                    value = res['value']
                                    break
                            msg = "ddoc=%s, query=%s, server=%s\n" % (
                                    self.design_doc_name, self.query, self.servers[0].ip)
                            msg += "Key %s: expected value %s, actual: %s" % (
                                                                key_expected, row['value'], value)
                            self.log.info(msg)
                            if row['value'] == value:
                                self.state = FINISHED
                                self.log.info(msg)
                                self.set_result({"passed" : True,
                                                 "errors" : []})
                            else:
                                if self.current_retry == self.retries:
                                    self.state = FINISHED
                                    self.log.error(msg)
                                    # NOTE(review): "passed": True on this
                                    # value-mismatch failure path looks wrong
                                    # (cf. the False above) -- confirm intent
                                    self.set_result({"passed" : True,
                                                     "errors" : msg})
                                else:
                                    RestHelper(self.rest)._wait_for_indexer_ddoc(self.servers, self.design_doc_name)
                                    self.state = EXECUTING
                                    task_manager.schedule(self, 10)
                return
            # non-reduce views: compare row counts (and optionally row ids)
            if len(self.expected_docs) > len(self.results.get(u'rows', [])):
                if self.current_retry == self.retries:
                    self.state = FINISHED
                    self.set_result({"passed" : False,
                                     "errors" : [],
                                     "results" : self.results})
                else:
                    RestHelper(self.rest)._wait_for_indexer_ddoc(self.servers, self.design_doc_name)
                    # late in the retry budget, force a consistent (stale=false) read
                    if self.current_retry == 70:
                        self.query["stale"] = 'false'
                    self.log.info("View result is still not expected (ddoc=%s, query=%s, server=%s). retry in 10 sec" % (
                                    self.design_doc_name, self.query, self.servers[0].ip))
                    self.state = EXECUTING
                    task_manager.schedule(self, 10)
            elif len(self.expected_docs) < len(self.results.get(u'rows', [])):
                # more rows than expected can never converge -- fail now
                self.state = FINISHED
                self.set_result({"passed" : False,
                                 "errors" : [],
                                 "results" : self.results})
            elif len(self.expected_docs) == len(self.results.get(u'rows', [])):
                if self.verify_rows:
                    expected_ids = [row['id'] for row in self.expected_docs]
                    rows_ids = [str(row['id']) for row in self.results[u'rows']]
                    if expected_ids == rows_ids:
                        self.state = FINISHED
                        self.set_result({"passed" : True,
                                         "errors" : []})
                    else:
                        if self.current_retry == self.retries:
                            self.state = FINISHED
                            self.set_result({"passed" : False,
                                             "errors" : [],
                                             "results" : self.results})
                        else:
                            self.state = EXECUTING
                            task_manager.schedule(self, 10)
                else:
                    self.state = FINISHED
                    self.set_result({"passed" : True,
                                     "errors" : []})
        # catch and set all unexpected exceptions
        except Exception, e:
            self.state = FINISHED
            self.log.error("Exception caught %s" % str(e))
            self.set_exception(e)
            self.set_result({"passed" : False,
                             "errors" : str(e)})
1967
1968
class ModifyFragmentationConfigTask(Task):

    """
        Given a config dictionary attempt to configure fragmentation settings.
        This task will override the default settings that are provided for
        a given <bucket>.
    """

    def __init__(self, server, config=None, bucket="default"):
        Task.__init__(self, "modify_frag_config_task")

        self.server = server
        # defaults; any entry in `config` overrides the matching key
        self.config = {"parallelDBAndVC" : "false",
                       "dbFragmentThreshold" : None,
                       "viewFragmntThreshold" : None,
                       "dbFragmentThresholdPercentage" : 100,
                       "viewFragmntThresholdPercentage" : 100,
                       "allowedTimePeriodFromHour" : None,
                       "allowedTimePeriodFromMin" : None,
                       "allowedTimePeriodToHour" : None,
                       "allowedTimePeriodToMin" : None,
                       "allowedTimePeriodAbort" : None,
                       "autoCompactionDefined" : "true"}
        self.bucket = bucket

        # guard against the default: iterating None raised TypeError before
        for key in (config or {}):
            self.config[key] = config[key]

    def execute(self, task_manager):
        """Push the merged auto-compaction settings to the server."""
        try:
            rest = RestConnection(self.server)
            rest.set_auto_compaction(parallelDBAndVC=self.config["parallelDBAndVC"],
                                     dbFragmentThreshold=self.config["dbFragmentThreshold"],
                                     viewFragmntThreshold=self.config["viewFragmntThreshold"],
                                     dbFragmentThresholdPercentage=self.config["dbFragmentThresholdPercentage"],
                                     viewFragmntThresholdPercentage=self.config["viewFragmntThresholdPercentage"],
                                     allowedTimePeriodFromHour=self.config["allowedTimePeriodFromHour"],
                                     allowedTimePeriodFromMin=self.config["allowedTimePeriodFromMin"],
                                     allowedTimePeriodToHour=self.config["allowedTimePeriodToHour"],
                                     allowedTimePeriodToMin=self.config["allowedTimePeriodToMin"],
                                     allowedTimePeriodAbort=self.config["allowedTimePeriodAbort"],
                                     bucket=self.bucket)

            self.state = CHECKING
            task_manager.schedule(self, 10)

        except Exception as e:
            self.state = FINISHED
            self.set_exception(e)

    def check(self, task_manager):
        """Read the bucket config back and confirm the settings were accepted."""
        try:
            rest = RestConnection(self.server)
            # verify server accepted settings
            content = rest.get_bucket_json(self.bucket)
            # autoCompactionSettings is False when no per-bucket settings exist
            if content["autoCompactionSettings"] == False:
                self.set_exception(Exception("Failed to set auto compaction settings"))
            else:
                # retrieved compaction settings
                self.set_result(True)
            self.state = FINISHED

        except GetBucketInfoFailed as e:
            # subsequent query failed! exit
            self.state = FINISHED
            self.set_exception(e)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)
2041
2042
class MonitorActiveTask(Task):

    """
        Attempt to monitor an active task that is available in the _active_tasks API.
        It allows to monitor indexer and bucket/view compaction.

        Execute function looks at the _active_tasks API and tries to identify the
        task for monitoring and its pid by: task type ('indexer',
        'bucket_compaction', 'view_compaction') and target value (for example
        "_design/ddoc" for indexing, bucket "default" for bucket compaction or
        "_design/dev_view" for view compaction).
        wait_task=True means that the task should be found in the first attempt,
        otherwise we assume that the task has already been completed (reached 100%).

        Check function monitors the task by the pid that was identified in execute
        and matches the new progress result with the previous one.
        The task fails if:
            progress is not changed during num_iterations iterations
            new progress is less than the previous one
        The task passes and completes if:
            progress reached the wait_progress value
            the task can no longer be found by pid (we believe it's over)
    """

    def __init__(self, server, type, target_value, wait_progress=100, num_iterations=100, wait_task=True):
        Task.__init__(self, "monitor_active_task")
        self.server = server
        self.type = type  # indexer / bucket_compaction / view_compaction
        self.target_key = ""
        if self.type == "indexer":
            self.target_key = "design_documents"
            if target_value.lower() in ['true', 'false']:
                # track initial_build indexer task
                self.target_key = "initial_build"
        elif self.type == "bucket_compaction":
            self.target_key = "original_target"
        elif self.type == "view_compaction":
            self.target_key = "design_documents"
        else:
            raise Exception("type %s is not defined!" % self.type)
        self.target_value = target_value
        self.wait_progress = wait_progress
        self.num_iterations = num_iterations
        self.wait_task = wait_task

        self.rest = RestConnection(self.server)
        self.current_progress = None    # last observed progress value
        self.current_iter = 0           # consecutive checks with unchanged progress
        self.task_pid = None            # pid of the task found by execute()

    def execute(self, task_manager):
        """Find the task to monitor in _active_tasks and start checking it."""
        tasks = self.rest.active_tasks()
        # log instead of the former bare py2 "print tasks" debug statement
        self.log.info("active tasks: %s" % tasks)
        for task in tasks:
            if task["type"] == self.type and ((
                        self.target_key == "design_documents" and task[self.target_key][0] == self.target_value) or (
                        self.target_key == "original_target" and task[self.target_key]["type"] == self.target_value) or (
                        self.target_key == "initial_build" and str(task[self.target_key]) == self.target_value)):
                self.current_progress = task["progress"]
                self.task_pid = task["pid"]
                self.log.info("monitoring active task was found:" + str(task))
                self.log.info("progress %s:%s - %s %%" % (self.type, self.target_value, task["progress"]))
                if self.current_progress >= self.wait_progress:
                    # already at (or beyond) the target progress
                    self.log.info("expected progress was gotten: %s" % self.current_progress)
                    self.state = FINISHED
                    self.set_result(True)
                else:
                    self.state = CHECKING
                    task_manager.schedule(self, 5)
                return
        if self.wait_task:
            # the task was expected to be running but was not found
            self.state = FINISHED
            self.log.error("expected active task %s:%s was not found" % (self.type, self.target_value))
            self.set_result(False)
        else:
            # task was completed before monitoring started
            self.state = FINISHED
            self.log.info("task for monitoring %s:%s was not found" % (self.type, self.target_value))
            self.set_result(True)

    def check(self, task_manager):
        """Track progress of the task identified by self.task_pid."""
        tasks = self.rest.active_tasks()
        for task in tasks:
            # the monitored task still exists
            if task["pid"] == self.task_pid:
                self.log.info("progress %s:%s - %s %%" % (self.type, self.target_value, task["progress"]))
                # reached expected progress
                if task["progress"] >= self.wait_progress:
                    self.state = FINISHED
                    self.log.info("progress was reached %s" % self.wait_progress)
                    self.set_result(True)
                # progress value has increased
                elif task["progress"] > self.current_progress:
                    self.current_progress = task["progress"]
                    # FIX: was "self.currebt_iter" - the stall counter never reset
                    self.current_iter = 0
                    task_manager.schedule(self, 2)
                # progress value was not changed
                elif task["progress"] == self.current_progress:
                    if self.current_iter < self.num_iterations:
                        time.sleep(2)
                        self.current_iter += 1
                        task_manager.schedule(self, 2)
                    # progress unchanged for num_iterations checks in a row
                    else:
                        self.state = FINISHED
                        # FIX: parenthesized (2 * num_iterations); the old form
                        # repeated the formatted string num_iterations times
                        self.log.error("progress for active task was not changed during %s sec" % (2 * self.num_iterations))
                        self.set_result(False)
                # progress went backwards
                else:
                    self.state = FINISHED
                    self.log.error("progress for task %s:%s changed direction!" % (self.type, self.target_value))
                    self.set_result(False)
                # FIX: stop here - previously control fell through to the
                # "completed" branch below and set the result a second time
                return

        # the task can no longer be found by pid: treat it as completed
        self.state = FINISHED
        self.log.info("task %s:%s was completed" % (self.type, self.target_value))
        self.set_result(True)
2162
class MonitorViewFragmentationTask(Task):

    """
        Attempt to monitor fragmentation that is occurring for a given design_doc.
        The execute stage is just for preliminary sanity checking of values and
        environment.

        Check function looks at the index file across all nodes and attempts to
        calculate the total fragmentation occurring by the views within the
        design_doc.

        Note: if autocompaction is enabled and the user attempts to monitor for a
        fragmentation value higher than the level at which auto_compaction kicks
        in, a warning is sent; it is best to use a lower value as this can lead
        to infinite monitoring.
    """

    def __init__(self, server, design_doc_name, fragmentation_value=10, bucket="default"):
        Task.__init__(self, "monitor_frag_task")
        self.server = server
        self.bucket = bucket
        self.fragmentation_value = fragmentation_value  # target percentage, 0..100
        self.design_doc_name = design_doc_name

    def execute(self, task_manager):
        """Validate inputs, warn about conflicting auto-compaction, start checking."""
        # sanity check of fragmentation value
        if self.fragmentation_value < 0 or self.fragmentation_value > 100:
            err_msg = \
                "Invalid value for fragmentation %d" % self.fragmentation_value
            self.state = FINISHED
            self.set_exception(Exception(err_msg))
            # FIX: previously fell through and re-scheduled the task even
            # though it had already finished with an exception
            return

        # warning if autocompaction threshold is below <fragmentation_value>
        try:
            auto_compact_percentage = self._get_current_auto_compaction_percentage()
            if auto_compact_percentage != "undefined" and auto_compact_percentage < self.fragmentation_value:
                self.log.warn("Auto compaction is set to %s. Therefore fragmentation_value %s may not be reached" % (auto_compact_percentage, self.fragmentation_value))

            self.state = CHECKING
            task_manager.schedule(self, 5)
        except GetBucketInfoFailed as e:
            self.state = FINISHED
            self.set_exception(e)
        # catch and set all unexpected exceptions
        except Exception as e:
            self.state = FINISHED
            self.log.error("Unexpected Exception Caught")
            self.set_exception(e)

    def _get_current_auto_compaction_percentage(self):
        """ check at bucket level and cluster level for compaction percentage """
        rest = RestConnection(self.server)
        content = rest.get_bucket_json(self.bucket)
        if content["autoCompactionSettings"] == False:
            # bucket has no settings of its own: fall back to cluster level
            content = rest.cluster_status()
        return content["autoCompactionSettings"]["viewFragmentationThreshold"]["percentage"]

    def check(self, task_manager):
        """Finish once measured fragmentation exceeds the target value."""
        rest = RestConnection(self.server)
        new_frag_value = MonitorViewFragmentationTask.\
            calc_ddoc_fragmentation(rest, self.design_doc_name, bucket=self.bucket)

        self.log.info("%s: current amount of fragmentation = %d" % (self.design_doc_name,
                                                                    new_frag_value))
        if new_frag_value > self.fragmentation_value:
            self.state = FINISHED
            self.set_result(True)
        else:
            # target fragmentation not reached yet: poll again
            task_manager.schedule(self, 2)

    @staticmethod
    def aggregate_ddoc_info(rest, design_doc_name, bucket="default", with_rebalance=False):
        """ collect set_view info for <design_doc_name> from every cluster node """
        # capture credentials once; the original code re-bound `rest` inside
        # the loop and then read username/password from the new connection
        username = rest.username
        password = rest.password
        info = []
        for node in rest.node_statuses():
            server_info = {"ip" : node.ip,
                           "port" : node.port,
                           "username" : username,
                           "password" : password}
            node_rest = RestConnection(server_info)
            status = False
            content = None
            try:
                status, content = node_rest.set_view_info(bucket, design_doc_name)
            except Exception as e:
                print(str(e))
                # during rebalance a node may not expose set_view info yet;
                # that is tolerated only when the caller asked for it
                if "Error occured reading set_view _info" in str(e) and with_rebalance:
                    print("node {0} {1} is not ready yet?: {2}".format(
                                    node.id, node.port, e.message))
                else:
                    raise e
            if status:
                info.append(content)
        return info

    @staticmethod
    def calc_ddoc_fragmentation(rest, design_doc_name, bucket="default", with_rebalance=False):
        """ compute total view fragmentation (%) for <design_doc_name> across nodes """
        total_fragmentation = 0

        nodes_ddoc_info = \
            MonitorViewFragmentationTask.aggregate_ddoc_info(rest,
                                                         design_doc_name,
                                                         bucket, with_rebalance)
        total_disk_size = sum([content['disk_size'] for content in nodes_ddoc_info])
        total_data_size = sum([content['data_size'] for content in nodes_ddoc_info])

        if total_disk_size > 0 and total_data_size > 0:
            # fragmentation = wasted fraction of the on-disk index size
            total_fragmentation = \
                (total_disk_size - total_data_size) / float(total_disk_size) * 100

        return total_fragmentation
2288
2289class ViewCompactionTask(Task):
2290
2291    """
2292        Executes view compaction for a given design doc. This is technicially view compaction
2293        as represented by the api and also because the fragmentation is generated by the
2294        keys emitted by map/reduce functions within views.  Task will check that compaction
2295        history for design doc is incremented and if any work was really done.
2296    """
2297
2298    def __init__(self, server, design_doc_name, bucket="default", with_rebalance=False):
2299
2300        Task.__init__(self, "view_compaction_task")
2301        self.server = server
2302        self.bucket = bucket
2303        self.design_doc_name = design_doc_name
2304        self.ddoc_id = "_design%2f" + design_doc_name
2305        self.compaction_revision = 0
2306        self.precompacted_fragmentation = 0
2307        self.with_rebalance = with_rebalance
2308        self.rest = RestConnection(self.server)
2309
2310    def execute(self, task_manager):
2311        try:
2312            self.compaction_revision, self.precompacted_fragmentation = \
2313                self._get_compaction_details()
2314            self.log.info("{0}: stats compaction before triggering it: ({1},{2})".
2315                          format(self.design_doc_name,
2316                                 self.compaction_revision, self.precompacted_fragmentation))
2317            if self.precompacted_fragmentation == 0:
2318                self.log.info("%s: There is nothing to compact, fragmentation is 0" %
2319                              self.design_doc_name)
2320                self.set_result(False)
2321                self.state = FINISHED
2322                return
2323            self.rest.ddoc_compaction(self.ddoc_id, self.bucket)
2324            self.state = CHECKING
2325            task_manager.schedule(self, 2)
2326        except (CompactViewFailed, SetViewInfoNotFound) as ex:
2327            self.state = FINISHED
2328            self.set_exception(ex)
2329        # catch and set all unexpected exceptions
2330        except Exception as e:
2331            self.state = FINISHED
2332            self.log.error("Unexpected Exception Caught")
2333            self.set_exception(e)
2334
2335    # verify compaction history incremented and some defraging occurred
2336    def check(self, task_manager):
2337
2338        try:
2339            _compaction_running = self._is_compacting()
2340            new_compaction_revision, fragmentation = self._get_compaction_details()
2341            self.log.info("{0}: stats compaction:revision and fragmentation: ({1},{2})".
2342                          format(self.design_doc_name,
2343                                 new_compaction_revision, fragmentation))
2344
2345            if new_compaction_revision == self.compaction_revision and _compaction_running :
2346                # compaction ran successfully but compaction was not changed
2347                # perhaps we are still compacting
2348                self.log.info("design doc {0} is compacting".format(self.design_doc_name))
2349                task_manager.schedule(self, 3)
2350            elif new_compaction_revision > self.compaction_revision or\
2351                 self.precompacted_fragmentation > fragmentation:
2352                self.log.info("{1}: compactor was run, compaction revision was changed on {0}".format(new_compaction_revision,
2353                                                                                                      self.design_doc_name))
2354                frag_val_diff = fragmentation - self.precompacted_fragmentation
2355                self.log.info("%s: fragmentation went from %d to %d" % \
2356                              (self.design_doc_name,
2357                               self.precompacted_fragmentation, fragmentation))
2358
2359                if frag_val_diff > 0:
2360
2361                    # compaction ran successfully but datasize still same
2362                    # perhaps we are still compacting
2363                    if self._is_compacting():
2364                        task_manager.schedule(self, 2)
2365                    self.log.info("compaction was completed, but fragmentation value {0} is more than before compaction {1}".
2366                                  format(fragmentation, self.precompacted_fragmentation))
2367                    # probably we already compacted, but no work needed to be done
2368                    self.set_result(self.with_rebalance)
2369                else:
2370                    self.set_result(True)
2371                self.state = FINISHED
2372            else:
2373                # Sometimes the compacting is not started immediately
2374                for i in xrange(17):
2375                    time.sleep(3)
2376                    if self._is_compacting():
2377                        task_manager.schedule(self, 2)
2378                        return
2379                    else:
2380                        new_compaction_revision, fragmentation = self._get_compaction_details()
2381                        self.log.info("{2}: stats compaction: ({0},{1})".
2382                          format(new_compaction_revision, fragmentation,
2383                                 self.design_doc_name))
2384                        # case of rebalance when with concurrent updates it's possible that
2385                        # compaction value has not changed significantly
2386                        if