xref: /6.0.3/testrunner/lib/tasks/task.py (revision 33445f7a)
1import os
2import time
3import logger
4import random
5import socket
6import string
7import copy
8import json
9import re
10import math
11import crc32
12import traceback
13import testconstants
14from httplib import IncompleteRead
15from threading import Thread
16from memcacheConstants import ERR_NOT_FOUND, NotFoundError
17from membase.api.rest_client import RestConnection, Bucket, RestHelper
18from membase.api.exception import BucketCreationException
19from membase.helper.bucket_helper import BucketOperationHelper
20from memcached.helper.data_helper import KVStoreAwareSmartClient, MemcachedClientHelper
21from memcached.helper.kvstore import KVStore
22from couchbase_helper.document import DesignDocument, View
23from mc_bin_client import MemcachedError
24from tasks.future import Future
25from couchbase_helper.stats_tools import StatsCommon
26from membase.api.exception import N1QLQueryException, DropIndexException, CreateIndexException, DesignDocCreationException, QueryViewException, ReadDocumentException, RebalanceFailedException, \
27                                    GetBucketInfoFailed, CompactViewFailed, SetViewInfoNotFound, FailoverFailedException, \
28                                    ServerUnavailableException, BucketFlushFailed, CBRecoveryFailedException, BucketCompactionException, AutoFailoverException
29from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
30from couchbase_helper.documentgenerator import BatchedDocumentGenerator
31from TestInput import TestInputServer, TestInputSingleton
32from testconstants import MIN_KV_QUOTA, INDEX_QUOTA, FTS_QUOTA, COUCHBASE_FROM_4DOT6,\
33                          THROUGHPUT_CONCURRENCY, ALLOW_HTP, CBAS_QUOTA, COUCHBASE_FROM_VERSION_4,\
34                          CLUSTER_QUOTA_RATIO
35from multiprocessing import Process, Manager, Semaphore
36
37try:
38    CHECK_FLAG = False
39    if (testconstants.TESTRUNNER_CLIENT in os.environ.keys()) and os.environ[testconstants.TESTRUNNER_CLIENT] == testconstants.PYTHON_SDK:
40        from sdk_client import SDKSmartClient as VBucketAwareMemcached
41        from sdk_client import SDKBasedKVStoreAwareSmartClient as KVStoreAwareSmartClient
42    else:
43        CHECK_FLAG = True
44        from memcached.helper.data_helper import VBucketAwareMemcached, KVStoreAwareSmartClient
45except Exception as e:
46    CHECK_FLAG = True
47    from memcached.helper.data_helper import VBucketAwareMemcached, KVStoreAwareSmartClient
48
49# TODO: Setup stacktracer
50# TODO: Needs "easy_install pygments"
51# import stacktracer
52# stacktracer.trace_start("trace.html",interval=30,auto=True) # Set auto flag to always update file!
53
54
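# Bounds how many loader child processes LoadDocumentsGeneratorsTask may run at once
# (acquired in run_high_throughput_mode and released in run_generator below).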
55CONCURRENCY_LOCK = Semaphore(THROUGHPUT_CONCURRENCY)
56PENDING = 'PENDING'
57EXECUTING = 'EXECUTING'
58CHECKING = 'CHECKING'
59FINISHED = 'FINISHED'
60
61class Task(Future):
62    def __init__(self, name):
63        Future.__init__(self)
64        self.log = logger.Logger.get_logger()
65        self.state = PENDING
66        self.name = name
67        self.cancelled = False
68        self.retries = 0
69        self.res = None
70
71    def step(self, task_manager):
72        if not self.done():
73            if self.state == PENDING:
74                self.state = EXECUTING
75                task_manager.schedule(self)
76            elif self.state == EXECUTING:
77                self.execute(task_manager)
78            elif self.state == CHECKING:
79                self.check(task_manager)
80            elif self.state != FINISHED:
81                raise Exception("Bad State in {0}: {1}".format(self.name, self.state))
82
83    def execute(self, task_manager):
84        raise NotImplementedError
85
86    def check(self, task_manager):
87        raise NotImplementedError
88
89    def set_unexpected_exception(self, e, suffix = ""):
90        self.log.error("Unexpected exception [{0}] caught".format(e) + suffix)
91        self.log.error(''.join(traceback.format_stack()))
92        self.set_exception(e)
93
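# A minimal sketch of the Task life cycle, for reference: execute() does the work and
# re-schedules the task in the CHECKING state, and check() ends it with set_result()
# or set_exception(). The WaitTask name and its fields below are purely illustrative
# and not part of this module.
#
# class WaitTask(Task):
#     def __init__(self, seconds):
#         Task.__init__(self, "wait_task")
#         self.seconds = seconds
#         self.started_at = None
#
#     def execute(self, task_manager):
#         self.started_at = time.time()
#         self.state = CHECKING
#         task_manager.schedule(self)
#
#     def check(self, task_manager):
#         if time.time() - self.started_at < self.seconds:
#             task_manager.schedule(self, 1)  # poll again in about a second
#             return
#         self.state = FINISHED
#         self.set_result(True)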
94class NodeInitializeTask(Task):
95    def __init__(self, server, disabled_consistent_view=None,
96                 rebalanceIndexWaitingDisabled=None,
97                 rebalanceIndexPausingDisabled=None,
98                 maxParallelIndexers=None,
99                 maxParallelReplicaIndexers=None,
100                 port=None, quota_percent=None,
101                 index_quota_percent=None,
102                 services = None, gsi_type='forestdb'):
103        Task.__init__(self, "node_init_task")
104        self.server = server
105        self.port = port or server.port
106        self.quota = 0
107        self.index_quota = 0
108        self.index_quota_percent = index_quota_percent
109        self.quota_percent = quota_percent
110        self.disable_consistent_view = disabled_consistent_view
111        self.rebalanceIndexWaitingDisabled = rebalanceIndexWaitingDisabled
112        self.rebalanceIndexPausingDisabled = rebalanceIndexPausingDisabled
113        self.maxParallelIndexers = maxParallelIndexers
114        self.maxParallelReplicaIndexers = maxParallelReplicaIndexers
115        self.services = services
116        self.gsi_type = gsi_type
117
118    def execute(self, task_manager):
119        try:
120            rest = RestConnection(self.server)
121        except ServerUnavailableException as error:
122            self.state = FINISHED
123            self.set_exception(error)
124            return
125        info = Future.wait_until(lambda: rest.get_nodes_self(),
126                                 lambda x: x.memoryTotal > 0, 10)
127        self.log.info("server: %s, nodes/self: %s", self.server, info.__dict__)
128
129        username = self.server.rest_username
130        password = self.server.rest_password
131
132        if int(info.port) in xrange(9091, 9991):
133            self.state = FINISHED
134            self.set_result(True)
135            return
136
137        self.quota = int(info.mcdMemoryReserved * 2/3)
138        if self.index_quota_percent:
139            self.index_quota = int((info.mcdMemoryReserved * 2/3) * \
140                                      self.index_quota_percent / 100)
141            rest.set_service_memoryQuota(service='indexMemoryQuota', username=username, password=password, memoryQuota=self.index_quota)
142        if self.quota_percent:
143            self.quota = int(info.mcdMemoryReserved * self.quota_percent / 100)
144
145        """ Adjust KV RAM to correct value when there is INDEX
146            and FTS services added to node from Watson  """
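        # Worked example (with assumed values mcdMemoryReserved=2048 MB,
        # CLUSTER_QUOTA_RATIO=0.65, INDEX_QUOTA=256, FTS_QUOTA=256; the real numbers
        # come from testconstants): kv_quota starts at ~1331 MB, drops to ~1075 MB if
        # the "index" service is present and to ~819 MB with "fts" as well, and must
        # still end up above MIN_KV_QUOTA.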
147        index_quota = INDEX_QUOTA
148        kv_quota = int(info.mcdMemoryReserved * CLUSTER_QUOTA_RATIO)
149        if self.index_quota_percent:
150            index_quota = self.index_quota
151        if not self.quota_percent:
152            set_services = copy.deepcopy(self.services)
153            if set_services is None:
154                set_services = ["kv"]
155#             info = rest.get_nodes_self()
156#             cb_version = info.version[:5]
157#             if cb_version in COUCHBASE_FROM_VERSION_4:
158            if "index" in set_services:
159                self.log.info("quota for index service will be %s MB" % (index_quota))
160                kv_quota -= index_quota
161                self.log.info("set index quota to node %s " % self.server.ip)
162                rest.set_service_memoryQuota(service='indexMemoryQuota', memoryQuota=index_quota)
163            if "fts" in set_services:
164                self.log.info("quota for fts service will be %s MB" % (FTS_QUOTA))
165                kv_quota -= FTS_QUOTA
166                self.log.info("set both index and fts quota at node %s "% self.server.ip)
167                rest.set_service_memoryQuota(service='ftsMemoryQuota', memoryQuota=FTS_QUOTA)
168            if "cbas" in set_services:
169                self.log.info("quota for cbas service will be %s MB" % (CBAS_QUOTA))
170                kv_quota -= CBAS_QUOTA
171                rest.set_service_memoryQuota(service = "cbasMemoryQuota", memoryQuota=CBAS_QUOTA)
172            if kv_quota < MIN_KV_QUOTA:
173                raise Exception("KV RAM needs to be more than %s MB"
174                                " at node %s" % (MIN_KV_QUOTA, self.server.ip))
175            if kv_quota < int(self.quota):
176                self.quota = kv_quota
177
178        rest.init_cluster_memoryQuota(username, password, self.quota)
179
180        if self.services:
181            status = rest.init_node_services(username=username, password=password,
182                                             port=self.port, hostname=self.server.ip,
183                                             services=self.services)
184            if not status:
185                self.state = FINISHED
186                self.set_exception(Exception('unable to set services for server %s'\
187                                                               % (self.server.ip)))
188                return
189        if self.disable_consistent_view is not None:
190            rest.set_reb_cons_view(self.disable_consistent_view)
191        if self.rebalanceIndexWaitingDisabled is not None:
192            rest.set_reb_index_waiting(self.rebalanceIndexWaitingDisabled)
193        if self.rebalanceIndexPausingDisabled is not None:
194            rest.set_rebalance_index_pausing(self.rebalanceIndexPausingDisabled)
195        if self.maxParallelIndexers is not None:
196            rest.set_max_parallel_indexers(self.maxParallelIndexers)
197        if self.maxParallelReplicaIndexers is not None:
198            rest.set_max_parallel_replica_indexers(self.maxParallelReplicaIndexers)
199
200        rest.init_cluster(username, password, self.port)
201        remote_shell = RemoteMachineShellConnection(self.server)
202        remote_shell.enable_diag_eval_on_non_local_hosts()
203        if rest.is_cluster_compat_mode_greater_than(4.0):
204            if self.gsi_type == "plasma":
205                if not rest.is_cluster_compat_mode_greater_than(5.0):
206                    rest.set_indexer_storage_mode(username, password, "forestdb")
207                else:
208                    rest.set_indexer_storage_mode(username, password, self.gsi_type)
209            else:
210                rest.set_indexer_storage_mode(username, password, self.gsi_type)
211        self.server.port = self.port
212        try:
213            rest = RestConnection(self.server)
214        except ServerUnavailableException as error:
215            self.state = FINISHED
216            self.set_exception(error)
217            return
218        info = rest.get_nodes_self()
219
220        if info is None:
221            self.state = FINISHED
222            self.set_exception(Exception('unable to get information on server %s; is it available?' % (self.server.ip)))
223            return
224        self.state = CHECKING
225        task_manager.schedule(self)
226
227    def check(self, task_manager):
228        self.state = FINISHED
229        self.set_result(self.quota)
230
231
232class BucketCreateTask(Task):
233    def __init__(self, bucket_params):
234        Task.__init__(self, "bucket_create_task")
235        self.server = bucket_params['server']
236        self.bucket = bucket_params['bucket_name']
237        self.replicas = bucket_params['replicas']
238        self.port = bucket_params['port']
239        self.size = bucket_params['size']
240        self.password = bucket_params['password']
241        self.bucket_type = bucket_params['bucket_type']
242        self.enable_replica_index = bucket_params['enable_replica_index']
243        self.eviction_policy = bucket_params['eviction_policy']
244        self.lww = bucket_params['lww']
245        if 'maxTTL' in bucket_params:
246            self.maxttl = bucket_params['maxTTL']
247        else:
248            self.maxttl = 0
249        if 'compressionMode' in bucket_params:
250            self.compressionMode = bucket_params['compressionMode']
251        else:
252            self.compressionMode = 'passive'
253        self.flush_enabled = bucket_params['flush_enabled']
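        # bucket priority is expressed to the REST API as the bucket's memcached
        # thread count (threadsNumber below): 3 for "low" priority, 8 for "high"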
254        if bucket_params['bucket_priority'] is None or bucket_params['bucket_priority'].lower() == 'low':
255            self.bucket_priority = 3
256        else:
257            self.bucket_priority = 8
258
259    def execute(self, task_manager):
260        try:
261            rest = RestConnection(self.server)
262        except ServerUnavailableException as error:
263            self.state = FINISHED
264            self.set_exception(error)
265            return
266        info = rest.get_nodes_self()
267
268        if self.size <= 0:
269            self.size = info.memoryQuota * 2 / 3
270
271        authType = 'none' if self.password is None else 'sasl'
272
273        if int(info.port) in xrange(9091, 9991):
274            try:
275                self.port = info.port
276                rest.create_bucket(bucket=self.bucket)
277                self.state = CHECKING
278                task_manager.schedule(self)
279            except Exception as e:
280                self.state = FINISHED
281                self.set_exception(e)
282            return
283
284
285        version = rest.get_nodes_self().version
286        try:
287            if float(version[:2]) >= 3.0 and self.bucket_priority is not None:
288                rest.create_bucket(bucket=self.bucket,
289                               ramQuotaMB=self.size,
290                               replicaNumber=self.replicas,
291                               proxyPort=self.port,
292                               authType=authType,
293                               saslPassword=self.password,
294                               bucketType=self.bucket_type,
295                               replica_index=self.enable_replica_index,
296                               flushEnabled=self.flush_enabled,
297                               evictionPolicy=self.eviction_policy,
298                               threadsNumber=self.bucket_priority,
299                               lww=self.lww,
300                               maxTTL=self.maxttl,
301                               compressionMode=self.compressionMode
302                               )
303            else:
304                rest.create_bucket(bucket=self.bucket,
305                               ramQuotaMB=self.size,
306                               replicaNumber=self.replicas,
307                               proxyPort=self.port,
308                               authType=authType,
309                               saslPassword=self.password,
310                               bucketType=self.bucket_type,
311                               replica_index=self.enable_replica_index,
312                               flushEnabled=self.flush_enabled,
313                               evictionPolicy=self.eviction_policy,
314                               lww=self.lww,
315                               maxTTL=self.maxttl,
316                               compressionMode=self.compressionMode)
317            self.state = CHECKING
318            task_manager.schedule(self)
319
320        except BucketCreationException as e:
321            self.state = FINISHED
322            self.set_exception(e)
323        # catch and set all unexpected exceptions
324        except Exception as e:
325            self.state = FINISHED
326            self.set_unexpected_exception(e)
327
328    def check(self, task_manager):
329        try:
330            if self.bucket_type == 'memcached' or int(self.port) in xrange(9091, 9991):
331                self.set_result(True)
332                self.state = FINISHED
333                return
334            if BucketOperationHelper.wait_for_memcached(self.server, self.bucket):
335                self.log.info("bucket '{0}' was created with per node RAM quota: {1}".format(self.bucket, self.size))
336                self.set_result(True)
337                self.state = FINISHED
338                return
339            else:
340                self.log.warn("vbucket map not ready after try {0}".format(self.retries))
341                if self.retries >= 5:
342                    self.set_result(False)
343                    self.state = FINISHED
344                    return
345        except Exception as e:
346            self.log.error("Unexpected error: %s" % str(e))
347            self.log.warn("vbucket map not ready after try {0}".format(self.retries))
348            if self.retries >= 5:
349                self.state = FINISHED
350                self.set_exception(e)
351        self.retries = self.retries + 1
352        task_manager.schedule(self)
353
354
355class BucketDeleteTask(Task):
356    def __init__(self, server, bucket="default"):
357        Task.__init__(self, "bucket_delete_task")
358        self.server = server
359        self.bucket = bucket
360
361    def execute(self, task_manager):
362        try:
363            rest = RestConnection(self.server)
364            if rest.delete_bucket(self.bucket):
365                self.state = CHECKING
366                task_manager.schedule(self)
367            else:
368                self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
369                self.state = FINISHED
370                self.set_result(False)
371        # catch and set all unexpected exceptions
372
373        except Exception as e:
374            self.state = FINISHED
375            self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
376            self.set_unexpected_exception(e)
377
378    def check(self, task_manager):
379        try:
380            rest = RestConnection(self.server)
381            if BucketOperationHelper.wait_for_bucket_deletion(self.bucket, rest, 200):
382                self.set_result(True)
383            else:
384                self.set_result(False)
385            self.state = FINISHED
386        # catch and set all unexpected exceptions
387        except Exception as e:
388            self.state = FINISHED
389            self.log.info(StatsCommon.get_stats([self.server], self.bucket, "timings"))
390            self.set_unexpected_exception(e)
391
392class RebalanceTask(Task):
393    def __init__(self, servers, to_add=[], to_remove=[], do_stop=False, progress=30,
394                 use_hostnames=False, services=None):
395        Task.__init__(self, "rebalance_task")
396        self.servers = servers
397        self.to_add = to_add
398        self.to_remove = to_remove
399        self.start_time = None
400        self.services = services
401        self.monitor_vbuckets_shuffling = False
402
403        try:
404            self.rest = RestConnection(self.servers[0])
405        except ServerUnavailableException as e:
406            self.log.error(e)
407            self.state = FINISHED
408            self.set_exception(e)
409        self.retry_get_progress = 0
410        self.use_hostnames = use_hostnames
411        self.previous_progress = 0
412        self.old_vbuckets = {}
413
414    def execute(self, task_manager):
415        try:
416            if len(self.to_add) and len(self.to_add) == len(self.to_remove):
417                node_version_check = self.rest.check_node_versions()
418                non_swap_servers = set(self.servers) - set(self.to_remove) - set(self.to_add)
419                self.old_vbuckets = RestHelper(self.rest)._get_vbuckets(non_swap_servers, None)
420                if self.old_vbuckets:
421                    self.monitor_vbuckets_shuffling = True
422                if self.monitor_vbuckets_shuffling and node_version_check and self.services:
423                    for service_group in self.services:
424                        if "kv" not in service_group:
425                            self.monitor_vbuckets_shuffling = False
426                if self.monitor_vbuckets_shuffling and node_version_check:
427                    services_map = self.rest.get_nodes_services()
428                    for remove_node in self.to_remove:
429                        key = "{0}:{1}".format(remove_node.ip, remove_node.port)
430                        services = services_map[key]
431                        if "kv" not in services:
432                            self.monitor_vbuckets_shuffling = False
433                if self.monitor_vbuckets_shuffling:
434                    self.log.info("This is swap rebalance and we will monitor vbuckets shuffling")
435            self.add_nodes(task_manager)
436            self.start_rebalance(task_manager)
437            self.state = CHECKING
438            task_manager.schedule(self)
439        except Exception as e:
440            self.state = FINISHED
441            traceback.print_exc()
442            self.set_exception(e)
443
444    def add_nodes(self, task_manager):
445        master = self.servers[0]
446        services_for_node = None
447        node_index = 0
448        for node in self.to_add:
449            self.log.info("adding node {0}:{1} to cluster".format(node.ip, node.port))
450            if self.services is not None:
451                services_for_node = [self.services[node_index]]
452                node_index += 1
453            if self.use_hostnames:
454                self.rest.add_node(master.rest_username, master.rest_password,
455                                   node.hostname, node.port, services = services_for_node)
456            else:
457                self.rest.add_node(master.rest_username, master.rest_password,
458                                   node.ip, node.port, services = services_for_node)
459
460    def start_rebalance(self, task_manager):
461        nodes = self.rest.node_statuses()
462
463        # Determine whether it's a cluster_run or not
464        cluster_run = True
465
466        firstIp = self.servers[0].ip
467        if len(self.servers) == 1 and self.servers[0].port == '8091':
468            cluster_run = False
469        else:
470            for node in self.servers:
471                if node.ip != firstIp:
472                    cluster_run = False
473                    break
474        ejectedNodes = []
475
476        for server in self.to_remove:
477            for node in nodes:
478                if cluster_run:
479                    if int(server.port) == int(node.port):
480                        ejectedNodes.append(node.id)
481                else:
482                    if self.use_hostnames:
483                        if server.hostname == node.ip and int(server.port) == int(node.port):
484                            ejectedNodes.append(node.id)
485                    elif server.ip == node.ip and int(server.port) == int(node.port):
486                        ejectedNodes.append(node.id)
487        if self.rest.is_cluster_mixed():
488            # workaround MB-8094
489            self.log.warn("cluster is mixed. sleep for 15 seconds before rebalance")
490            time.sleep(15)
491
492        self.rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
493        self.start_time = time.time()
494
495    def check(self, task_manager):
496        status = None
497        progress = -100
498        try:
499            if self.monitor_vbuckets_shuffling:
500                self.log.info("This is swap rebalance and we will monitor vbuckets shuffling")
501                non_swap_servers = set(self.servers) - set(self.to_remove) - set(self.to_add)
502                new_vbuckets = RestHelper(self.rest)._get_vbuckets(non_swap_servers, None)
503                for vb_type in ["active_vb", "replica_vb"]:
504                    for srv in non_swap_servers:
505                        if not(len(self.old_vbuckets[srv][vb_type]) + 1 >= len(new_vbuckets[srv][vb_type]) and\
506                           len(self.old_vbuckets[srv][vb_type]) - 1 <= len(new_vbuckets[srv][vb_type])):
507                            msg = "Vbuckets were shuffled! %s count for %s" % (vb_type, srv.ip) + \
508                                  " was %s and is now %s" % (
509                                      len(self.old_vbuckets[srv][vb_type]),
510                                      len(new_vbuckets[srv][vb_type]))
511                            self.log.error(msg)
512                            self.log.error("Old vbuckets: %s, new vbuckets %s" % (self.old_vbuckets, new_vbuckets))
513                            raise Exception(msg)
514            (status, progress) = self.rest._rebalance_status_and_progress()
515            self.log.info("Rebalance - status: {}, progress: {:.02f}%".format(status, progress))
516            # if ServerUnavailableException
517            if progress == -100:
518                self.retry_get_progress += 1
519            if self.previous_progress != progress:
520                self.previous_progress = progress
521                self.retry_get_progress = 0
522            else:
523                self.retry_get_progress += 1
524        except RebalanceFailedException as ex:
525            self.state = FINISHED
526            self.set_exception(ex)
527            self.retry_get_progress += 1
528        # catch and set all unexpected exceptions
529        except Exception as e:
530            self.state = FINISHED
531            self.set_unexpected_exception(e, " in {0} sec".format(time.time() - self.start_time))
532        retry_get_process_num = 25
533        if self.rest.is_cluster_mixed():
534            # rebalance takes longer in a mixed cluster
535            self.log.info("rebalance in mixed cluster")
536            retry_get_process_num = 40
537        # we need to wait for status to be 'none' (i.e. rebalance actually finished and
538        # not just 'running' and at 100%) before we declare ourselves done
539        if progress != -1 and status != 'none':
540            if self.retry_get_progress < retry_get_process_num:
541                task_manager.schedule(self, 10)
542            else:
543                self.state = FINISHED
544                #self.set_result(False)
545                self.rest.print_UI_logs()
546                self.set_exception(RebalanceFailedException(\
547                                "seems like rebalance hangs. please check logs!"))
548        else:
549            success_cleaned = []
550            for removed in self.to_remove:
551                try:
552                    rest = RestConnection(removed)
553                except ServerUnavailableException as e:
554                    self.log.error(e)
555                    continue
556                start = time.time()
557                while time.time() - start < 30:
558                    try:
559                        if 'pools' in rest.get_pools_info() and \
560                                      (len(rest.get_pools_info()["pools"]) == 0):
561                            success_cleaned.append(removed)
562                            break
563                        else:
564                            time.sleep(0.1)
565                    except (ServerUnavailableException, IncompleteRead) as e:
566                        self.log.error(e)
567            result = True
568            for node in set(self.to_remove) - set(success_cleaned):
569                self.log.error("node {0}:{1} was not cleaned after removing from cluster"\
570                                                              .format(node.ip, node.port))
571                result = False
572
573            self.log.info("rebalancing was completed with progress: {0}% in {1} sec".
574                          format(progress, time.time() - self.start_time))
575            self.state = FINISHED
576            self.set_result(result)
577
578class StatsWaitTask(Task):
579    EQUAL = '=='
580    NOT_EQUAL = '!='
581    LESS_THAN = '<'
582    LESS_THAN_EQ = '<='
583    GREATER_THAN = '>'
584    GREATER_THAN_EQ = '>='
585
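    # Usage sketch (illustrative bucket/stat/value only): StatsWaitTask(servers,
    # "default", "", "curr_items", StatsWaitTask.EQUAL, 10000) keeps polling each
    # node's memcached stats until the summed curr_items equals 10000.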
586    def __init__(self, servers, bucket, param, stat, comparison, value):
587        Task.__init__(self, "stats_wait_task")
588        self.servers = servers
589        self.bucket = bucket
590        if isinstance(bucket, Bucket):
591            self.bucket = bucket.name
592        self.param = param
593        self.stat = stat
594        self.comparison = comparison
595        self.value = value
596        self.conns = {}
597
598    def execute(self, task_manager):
599        self.state = CHECKING
600        task_manager.schedule(self)
601
602    def check(self, task_manager):
603        stat_result = 0
604        for server in self.servers:
605            try:
606                client = self._get_connection(server)
607                stats = client.stats(self.param)
608                if not stats.has_key(self.stat):
609                    self.state = FINISHED
610                    self.set_exception(Exception("Stat {0} not found".format(self.stat)))
611                    return
612                if stats[self.stat].isdigit():
613                    stat_result += long(stats[self.stat])
614                else:
615                    stat_result = stats[self.stat]
616            except EOFError as ex:
617                self.state = FINISHED
618                self.set_exception(ex)
619                return
620        if not self._compare(self.comparison, str(stat_result), self.value):
621            self.log.warn("Not Ready: %s %s %s %s expected on %s, %s bucket" % (self.stat, stat_result,
622                      self.comparison, self.value, self._stringify_servers(), self.bucket))
623            task_manager.schedule(self, 5)
624            return
625        self.log.info("Saw %s %s %s %s expected on %s, %s bucket" % (self.stat, stat_result,
626                      self.comparison, self.value, self._stringify_servers(), self.bucket))
627
628        for server, conn in self.conns.items():
629            conn.close()
630        self.state = FINISHED
631        self.set_result(True)
632
633    def _stringify_servers(self):
634        return ', '.join(["{0}:{1}".format(server.ip, server.port) for server in self.servers])
635
636    def _get_connection(self, server, admin_user='cbadminbucket',admin_pass='password'):
637        if not self.conns.has_key(server):
638            for i in xrange(3):
639                try:
640                    self.conns[server] = MemcachedClientHelper.direct_client(server, self.bucket, admin_user=admin_user,
641                                                                             admin_pass=admin_pass)
642                    return self.conns[server]
643                except (EOFError, socket.error):
644                    self.log.error("failed to create direct client, retry in 1 sec")
645                    time.sleep(1)
646            self.conns[server] = MemcachedClientHelper.direct_client(server, self.bucket, admin_user=admin_user,
647                                                                     admin_pass=admin_pass)
648        return self.conns[server]
649
650    def _compare(self, cmp_type, a, b):
651        if isinstance(b, (int, long)) and a.isdigit():
652            a = long(a)
653        elif isinstance(b, (int, long)) and not a.isdigit():
654            return False
655        if (cmp_type == StatsWaitTask.EQUAL and a == b) or\
656            (cmp_type == StatsWaitTask.NOT_EQUAL and a != b) or\
657            (cmp_type == StatsWaitTask.LESS_THAN_EQ and a <= b) or\
658            (cmp_type == StatsWaitTask.GREATER_THAN_EQ and a >= b) or\
659            (cmp_type == StatsWaitTask.LESS_THAN and a < b) or\
660            (cmp_type == StatsWaitTask.GREATER_THAN and a > b):
661            return True
662        return False
663
664
665class XdcrStatsWaitTask(StatsWaitTask):
666    def __init__(self, servers, bucket, param, stat, comparison, value):
667        StatsWaitTask.__init__(self, servers, bucket, param, stat, comparison, value)
668
669    def check(self, task_manager):
670        stat_result = 0
671        for server in self.servers:
672            try:
673                rest = RestConnection(server)
674                stat = 'replications/' + rest.get_replication_for_buckets(self.bucket, self.bucket)['id'] + '/' + self.stat
675                # just get the required value, don't fetch the big big structure of stats
676                stats_value = rest.fetch_bucket_xdcr_stats(self.bucket)['op']['samples'][stat][-1]
677                stat_result += long(stats_value)
678            except Exception as ex:
679                self.state = FINISHED
680                self.set_exception(ex)
681                return
682        if not self._compare(self.comparison, str(stat_result), self.value):
683            self.log.warn("Not Ready: %s %s %s %s expected on %s, %s bucket" % (self.stat, stat_result,
684                      self.comparison, self.value, self._stringify_servers(), self.bucket))
685            task_manager.schedule(self, 5)
686            return
687        self.log.info("Saw %s %s %s %s expected on %s, %s bucket" % (self.stat, stat_result,
688                      self.comparison, self.value, self._stringify_servers(), self.bucket))
689
690        for server, conn in self.conns.items():
691            conn.close()
692        self.state = FINISHED
693        self.set_result(True)
694
695class GenericLoadingTask(Thread, Task):
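    # Acts as both a Thread and a Task: execute() just starts the thread, and run()
    # keeps calling next() on the document generator until it is exhausted or the
    # task is done.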
696    def __init__(self, server, bucket, kv_store, batch_size=1, pause_secs=1, timeout_secs=60, compression=True):
697        Thread.__init__(self)
698        Task.__init__(self, "load_gen_task")
699        self.kv_store = kv_store
700        self.batch_size = batch_size
701        self.pause = pause_secs
702        self.timeout = timeout_secs
703        self.server = server
704        self.bucket = bucket
705        if CHECK_FLAG:
706            self.client = VBucketAwareMemcached(RestConnection(server), bucket)
707        else:
708            self.client = VBucketAwareMemcached(RestConnection(server), bucket, compression=compression)
709        self.process_concurrency = THROUGHPUT_CONCURRENCY
710        # task queue's for synchronization
711        process_manager = Manager()
712        self.wait_queue = process_manager.Queue()
713        self.shared_kvstore_queue = process_manager.Queue()
714
715    def execute(self, task_manager):
716        self.start()
717        self.state = EXECUTING
718
719    def check(self, task_manager):
720        pass
721
722    def run(self):
723        while self.has_next() and not self.done():
724            self.next()
725        self.state = FINISHED
726        self.set_result(True)
727
728    def has_next(self):
729        raise NotImplementedError
730
731    def next(self):
732        raise NotImplementedError
733
734    def _unlocked_create(self, partition, key, value, is_base64_value=False):
735        try:
736            value_json = json.loads(value)
737            if isinstance(value_json, dict):
738                value_json['mutated'] = 0
739            value = json.dumps(value_json)
740        except ValueError:
741            index = random.choice(range(len(value)))
742            if not is_base64_value:
743                value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
744        except TypeError:
745            value = json.dumps(value)
746
747        try:
748            self.client.set(key, self.exp, self.flag, value)
749            if self.only_store_hash:
750                value = str(crc32.crc32_hash(value))
751            partition.set(key, value, self.exp, self.flag)
752        except Exception as error:
753            self.state = FINISHED
754            self.set_exception(error)
755
756
757    def _unlocked_read(self, partition, key):
758        try:
759            o, c, d = self.client.get(key)
760        except MemcachedError as error:
761            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
762                pass
763            else:
764                self.state = FINISHED
765                self.set_exception(error)
766
767    def _unlocked_replica_read(self, partition, key):
768        try:
769            o, c, d = self.client.getr(key)
770        except Exception as error:
771            self.state = FINISHED
772            self.set_exception(error)
773
774    def _unlocked_update(self, partition, key):
775        value = None
776        try:
777            o, c, value = self.client.get(key)
778            if value is None:
779                return
780
781            value_json = json.loads(value)
782            value_json['mutated'] += 1
783            value = json.dumps(value_json)
784        except MemcachedError as error:
785            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
786                # there is no such item, we do not know what value to set
787                return
788            else:
789                self.state = FINISHED
790                self.log.error("%s, key: %s update operation." % (error, key))
791                self.set_exception(error)
792                return
793        except ValueError:
794            if value is None:
795                return
796            index = random.choice(range(len(value)))
797            value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
798        except BaseException as error:
799            self.state = FINISHED
800            self.set_exception(error)
801
802        try:
803            self.client.set(key, self.exp, self.flag, value)
804            if self.only_store_hash:
805                value = str(crc32.crc32_hash(value))
806            partition.set(key, value, self.exp, self.flag)
807        except BaseException as error:
808            self.state = FINISHED
809            self.set_exception(error)
810
811    def _unlocked_delete(self, partition, key):
812        try:
813            self.client.delete(key)
814            partition.delete(key)
815        except MemcachedError as error:
816            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
817                pass
818            else:
819                self.state = FINISHED
820                self.log.error("%s, key: %s delete operation." % (error, key))
821                self.set_exception(error)
822        except BaseException as error:
823            self.state = FINISHED
824            self.set_exception(error)
825
826    def _unlocked_append(self, partition, key, value):
827        try:
828            o, c, old_value = self.client.get(key)
829            if value is None:
830                return
831            value_json = json.loads(value)
832            old_value_json = json.loads(old_value)
833            old_value_json.update(value_json)
834            old_value = json.dumps(old_value_json)
835            value = json.dumps(value_json)
836        except MemcachedError as error:
837            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
838                # there is no such item, we do not know what value to set
839                return
840            else:
841                self.state = FINISHED
842                self.set_exception(error)
843                return
844        except ValueError:
845            o, c, old_value = self.client.get(key)
846            index = random.choice(range(len(value)))
847            value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
848            old_value += value
849        except BaseException as error:
850            self.state = FINISHED
851            self.set_exception(error)
852
853        try:
854            self.client.append(key, value)
855            if self.only_store_hash:
856                old_value = str(crc32.crc32_hash(old_value))
857            partition.set(key, old_value)
858        except BaseException as error:
859            self.state = FINISHED
860            self.set_exception(error)
861
862
863    # start of batch methods
864    def _create_batch_client(self, key_val, shared_client = None):
865        """
866        standalone method for creating key/values in batch (sans kvstore)
867
868        arguments:
869            key_val -- array of key/value dicts to load size = self.batch_size
870            shared_client -- optional client to use for data loading
871        """
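        # key_val is expected to look like {"test_docs-0": '{"name": "a", "mutated": 0}', ...}
        # (hypothetical keys/values), typically produced by BatchedDocumentGenerator.next_batch()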
872        try:
873            self._process_values_for_create(key_val)
874            client = shared_client or self.client
875            client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
876        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
877            self.state = FINISHED
878            self.set_exception(error)
879
880    def _create_batch(self, partition_keys_dic, key_val):
881            self._create_batch_client(key_val)
882            self._populate_kvstore(partition_keys_dic, key_val)
883
884    def _update_batch(self, partition_keys_dic, key_val):
885        try:
886            self._process_values_for_update(partition_keys_dic, key_val)
887            self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
888            self._populate_kvstore(partition_keys_dic, key_val)
889        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
890            self.state = FINISHED
891            self.set_exception(error)
892
893
894    def _delete_batch(self, partition_keys_dic, key_val):
895        for partition, keys in partition_keys_dic.items():
896            for key in keys:
897                try:
898                    self.client.delete(key)
899                    partition.delete(key)
900                except MemcachedError as error:
901                    if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
902                        pass
903                    else:
904                        self.state = FINISHED
905                        self.set_exception(error)
906                        return
907                except (ServerUnavailableException, socket.error, EOFError, AttributeError) as error:
908                    self.state = FINISHED
909                    self.set_exception(error)
910
911
912    def _read_batch(self, partition_keys_dic, key_val):
913        try:
914            o, c, d = self.client.getMulti(key_val.keys(), self.pause, self.timeout)
915        except MemcachedError as error:
916            self.state = FINISHED
917            self.set_exception(error)
918
919    def _process_values_for_create(self, key_val):
920        for key, value in key_val.items():
921            try:
922                value_json = json.loads(value)
923                value_json['mutated'] = 0
924                value = json.dumps(value_json)
925            except ValueError:
926                index = random.choice(range(len(value)))
927                value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
928            except TypeError:
929                value = json.dumps(value)
930            finally:
931                key_val[key] = value
932
933    def _process_values_for_update(self, partition_keys_dic, key_val):
934        for partition, keys in partition_keys_dic.items():
935            for key in keys:
936                value = partition.get_valid(key)
937                if value is None:
938                    del key_val[key]
939                    continue
940                try:
941                    value = key_val[key]  # new updated value; note this is not present in the original "LoadDocumentsTask" code
942                    value_json = json.loads(value)
943                    value_json['mutated'] += 1
944                    value = json.dumps(value_json)
945                except ValueError:
946                    index = random.choice(range(len(value)))
947                    value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
948                finally:
949                    key_val[key] = value
950
951
952    def _populate_kvstore(self, partition_keys_dic, key_val):
953        for partition, keys in partition_keys_dic.items():
954            self._populate_kvstore_partition(partition, keys, key_val)
955
956    def _release_locks_on_kvstore(self):
957        for part in self._partitions_keyvals_dic.keys():
958            self.kv_store.release_lock(part)
959
960    def _populate_kvstore_partition(self, partition, keys, key_val):
961        for key in keys:
962            if self.only_store_hash:
963                key_val[key] = str(crc32.crc32_hash(key_val[key]))
964            partition.set(key, key_val[key], self.exp, self.flag)
965
966
967class LoadDocumentsTask(GenericLoadingTask):
968
969    def __init__(self, server, bucket, generator, kv_store, op_type, exp, flag=0,
970                 only_store_hash=True, proxy_client=None, batch_size=1, pause_secs=1, timeout_secs=30,
971                 compression=True):
972        GenericLoadingTask.__init__(self, server, bucket, kv_store, batch_size=batch_size,pause_secs=pause_secs,
973                                    timeout_secs=timeout_secs, compression=compression)
974
975        self.generator = generator
976        self.op_type = op_type
977        self.exp = exp
978        self.flag = flag
979        self.only_store_hash = only_store_hash
980
981        if proxy_client:
982            self.log.info("Changing client to proxy %s:%s..." % (proxy_client.host,
983                                                              proxy_client.port))
984            self.client = proxy_client
985
986    def has_next(self):
987        return self.generator.has_next()
988
989    def next(self, override_generator = None):
990        if self.batch_size == 1:
991            key, value = self.generator.next()
992            partition = self.kv_store.acquire_partition(key)
993            if self.op_type == 'create':
994                is_base64_value = (self.generator.__class__.__name__ == 'Base64Generator')
995                self._unlocked_create(partition, key, value, is_base64_value=is_base64_value)
996            elif self.op_type == 'read':
997                self._unlocked_read(partition, key)
998            elif self.op_type == 'read_replica':
999                self._unlocked_replica_read(partition, key)
1000            elif self.op_type == 'update':
1001                self._unlocked_update(partition, key)
1002            elif self.op_type == 'delete':
1003                self._unlocked_delete(partition, key)
1004            elif self.op_type == 'append':
1005                self._unlocked_append(partition, key, value)
1006            else:
1007                self.state = FINISHED
1008                self.set_exception(Exception("Bad operation type: %s" % self.op_type))
1009            self.kv_store.release_partition(key)
1010
1011        else:
1012            doc_gen = override_generator or self.generator
1013            key_value = doc_gen.next_batch()
1014            partition_keys_dic = self.kv_store.acquire_partitions(key_value.keys())
1015            if self.op_type == 'create':
1016                self._create_batch(partition_keys_dic, key_value)
1017            elif self.op_type == 'update':
1018                self._update_batch(partition_keys_dic, key_value)
1019            elif self.op_type == 'delete':
1020                self._delete_batch(partition_keys_dic, key_value)
1021            elif self.op_type == 'read':
1022                self._read_batch(partition_keys_dic, key_value)
1023            else:
1024                self.state = FINISHED
1025                self.set_exception(Exception("Bad operation type: %s" % self.op_type))
1026            self.kv_store.release_partitions(partition_keys_dic.keys())
1027
1028
1029
1030class LoadDocumentsGeneratorsTask(LoadDocumentsTask):
1031    def __init__(self, server, bucket, generators, kv_store, op_type, exp, flag=0, only_store_hash=True,
1032                 batch_size=1,pause_secs=1, timeout_secs=60, compression=True):
1033        LoadDocumentsTask.__init__(self, server, bucket, generators[0], kv_store, op_type, exp, flag=flag,
1034                    only_store_hash=only_store_hash, batch_size=batch_size, pause_secs=pause_secs,
1035                                   timeout_secs=timeout_secs, compression=compression)
1036
1037        if batch_size == 1:
1038            self.generators = generators
1039        else:
1040            self.generators = []
1041            for i in generators:
1042                self.generators.append(BatchedDocumentGenerator(i, batch_size))
1043
1044        # only run high throughput for batch-create workloads
1045        # also check number of input generators isn't greater than
1046        # process_concurrency as too many generators become inefficient
1047        self.is_high_throughput_mode = False
1048        if ALLOW_HTP and not TestInputSingleton.input.param("disable_HTP", False):
1049            self.is_high_throughput_mode = self.op_type == "create" and \
1050                self.batch_size > 1 and \
1051                len(self.generators) < self.process_concurrency
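        # e.g. a batched "create" load with 2 input generators and process_concurrency=8
        # (hypothetical numbers) runs in high throughput mode; reads/updates/deletes never do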
1052
1053        self.input_generators = generators
1054
1055        self.op_types = None
1056        self.buckets = None
1057        if isinstance(op_type, list):
1058            self.op_types = op_type
1059        if isinstance(bucket, list):
1060            self.buckets = bucket
1061        self.compression = compression
1062
1063    def run(self):
1064        if self.op_types:
1065            if len(self.op_types) != len(self.generators):
1066                self.state = FINISHED
1067                self.set_exception(Exception("not all generators have op_type!"))
1068        if self.buckets:
1069            if len(self.buckets) != len(self.generators):
1070                self.state = FINISHED
1071                self.set_exception(Exception("not all generators have bucket specified!"))
1072
1073        # check if running in high throughput mode or normal
1074        if self.is_high_throughput_mode:
1075            self.run_high_throughput_mode()
1076        else:
1077            self.run_normal_throughput_mode()
1078
1079        self.state = FINISHED
1080        self.set_result(True)
1081
1082    def run_normal_throughput_mode(self):
1083        iterator = 0
1084        for generator in self.generators:
1085            self.generator = generator
1086            if self.op_types:
1087                self.op_type = self.op_types[iterator]
1088            if self.buckets:
1089                self.bucket = self.buckets[iterator]
1090            while self.has_next() and not self.done():
1091                self.next()
1092            iterator += 1
1093
1094    def run_high_throughput_mode(self):
1095
1096        # high throughput mode requires partitioning the doc generators
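        # e.g. one input generator covering keys 0..9999 with process_concurrency=8
        # (hypothetical sizes) is split into 8 BatchedDocumentGenerators of ~1250 keys each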
1097        self.generators = []
1098        for gen in self.input_generators:
1099            gen_start = int(gen.start)
1100            gen_end = max(int(gen.end), 1)
1101            gen_range = max(int(gen.end/self.process_concurrency), 1)
1102            for pos in range(gen_start, gen_end, gen_range):
1103                partition_gen = copy.deepcopy(gen)
1104                partition_gen.start = pos
1105                partition_gen.itr = pos
1106                partition_gen.end = pos+gen_range
1107                if partition_gen.end > gen.end:
1108                    partition_gen.end = gen.end
1109                batch_gen = BatchedDocumentGenerator(
1110                        partition_gen,
1111                        self.batch_size)
1112                self.generators.append(batch_gen)
1113
1114        iterator = 0
1115        all_processes = []
1116        for generator in self.generators:
1117
1118            # only start processing when there are resources available
1119            CONCURRENCY_LOCK.acquire()
1120
1121            generator_process = Process(
1122                target=self.run_generator,
1123                args=(generator, iterator))
1124            generator_process.start()
1125            iterator += 1
1126            all_processes.append(generator_process)
1127
1128            # add child process to wait queue
1129            self.wait_queue.put(iterator)
1130
1131        # wait for all child processes to finish
1132        self.wait_queue.join()
1133
1134        # merge kvstore partitions
1135        while self.shared_kvstore_queue.empty() is False:
1136
1137            # get partitions created by child process
1138            rv = self.shared_kvstore_queue.get()
1139            if rv["err"] is not None:
1140                raise Exception(rv["err"])
1141
1142            # merge child partitions with parent
1143            generator_partitions = rv["partitions"]
1144            self.kv_store.merge_partitions(generator_partitions)
1145
1146            # terminate child process
1147            iterator -= 1
1148            all_processes[iterator].terminate()
1149
1150    def run_generator(self, generator, iterator):
1151
1152        # create a tmp kvstore to track work
1153        tmp_kv_store = KVStore()
1154        rv = {"err": None, "partitions": None}
1155
1156        try:
1157            if CHECK_FLAG:
1158                client = VBucketAwareMemcached(
1159                        RestConnection(self.server),
1160                        self.bucket)
1161            else:
1162                client = VBucketAwareMemcached(
1163                    RestConnection(self.server),
1164                    self.bucket, compression=self.compression)
1165            if self.op_types:
1166                self.op_type = self.op_types[iterator]
1167            if self.buckets:
1168                self.bucket = self.buckets[iterator]
1169
1170            while generator.has_next() and not self.done():
1171
1172                # generate
1173                key_value = generator.next_batch()
1174
1175                # create
1176                self._create_batch_client(key_value, client)
1177
1178                # cache
1179                self.cache_items(tmp_kv_store, key_value)
1180
1181        except Exception as ex:
1182            rv["err"] = ex
1183        else:
1184            rv["partitions"] = tmp_kv_store.get_partitions()
1185        finally:
1186            # share the kvstore from this generator
1187            self.shared_kvstore_queue.put(rv)
1188            self.wait_queue.task_done()
1189            # release concurrency lock
1190            CONCURRENCY_LOCK.release()
1191
1192
1193    def cache_items(self, store, key_value):
1194        """
1195            unpacks keys,values and adds them to provided store
1196        """
1197        for key, value in key_value.iteritems():
1198            if self.only_store_hash:
1199                value = str(crc32.crc32_hash(value))
1200            store.partition(key)["partition"].set(
1201                key,
1202                value,
1203                self.exp,
1204                self.flag)
1205
1206class ESLoadGeneratorTask(Task):
1207    """
1208        Class to load/update/delete documents into/from Elastic Search
1209    """
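    # Documents are indexed one at a time; see ESBulkLoadGeneratorTask below for the
    # batched variant.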
1210
1211    def __init__(self, es_instance, index_name, generator, op_type="create"):
1212        Task.__init__(self, "ES_loader_task")
1213        self.es_instance = es_instance
1214        self.index_name = index_name
1215        self.generator = generator
1216        self.iterator = 0
1217        self.log.info("Starting to load data into Elastic Search ...")
1218
1219    def check(self, task_manager):
1220        self.state = FINISHED
1221        self.set_result(True)
1222
1223    def execute(self, task_manager):
1224        for key, doc in self.generator:
1225            doc = json.loads(doc)
1226            self.es_instance.load_data(self.index_name,
1227                                       json.dumps(doc, encoding='utf-8'),
1228                                       doc['type'],
1229                                       key)
1230            self.iterator += 1
1231            if math.fmod(self.iterator, 500) == 0.0:
1232                self.log.info("{0} documents loaded into ES".
1233                              format(self.iterator))
1234        self.state = FINISHED
1235        self.set_result(True)
1236
1237class ESBulkLoadGeneratorTask(Task):
1238    """
1239        Class to load/update/delete documents into/from Elastic Search
1240    """
1241
1242    def __init__(self, es_instance, index_name, generator, op_type="create",
1243                 batch=1000):
1244        Task.__init__(self, "ES_loader_task")
1245        self.es_instance = es_instance
1246        self.index_name = index_name
1247        self.generator = generator
1248        self.iterator = 0
1249        self.op_type = op_type
1250        self.batch_size = batch
1251        self.log.info("Starting operation '%s' on Elastic Search ..." % op_type)
1252
1253    def check(self, task_manager):
1254        self.state = FINISHED
1255        self.set_result(True)
1256
1257    def execute(self, task_manager):
1258        es_filename = "/tmp/es_bulk.txt"
1259        es_bulk_docs = []
1260        loaded = 0
1261        batched = 0
1262        for key, doc in self.generator:
1263            doc = json.loads(doc)
1264            es_doc = {
1265                self.op_type: {
1266                    "_index": self.index_name,
1267                    "_type": doc['type'],
1268                    "_id": key,
1269                }
1270            }
1271            es_bulk_docs.append(json.dumps(es_doc))
1272            if self.op_type == "create":
1273                es_bulk_docs.append(json.dumps(doc))
1274            elif self.op_type == "update":
1275                doc['mutated'] += 1
1276                es_bulk_docs.append(json.dumps({"doc": doc}))
1277            batched += 1
1278            if batched == self.batch_size or not self.generator.has_next():
1279                es_file = open(es_filename, "wb")
1280                for line in es_bulk_docs:
1281                    es_file.write("%s\n" %line)
1282                es_file.close()
1283                self.es_instance.load_bulk_data(es_filename)
1284                loaded += batched
1285                self.log.info("{0} documents bulk loaded into ES".format(loaded))
1286                self.es_instance.update_index(self.index_name)
1287                batched = 0
1288        indexed = self.es_instance.get_index_count(self.index_name)
1289        self.log.info("ES index count for '{0}': {1}".
1290                              format(self.index_name, indexed))
1291        self.state = FINISHED
1292        self.set_result(True)
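
    # Illustrative sketch of the bulk file written by execute() above (the index,
    # type and key names are hypothetical): each batch becomes newline-delimited
    # action/source pairs in the standard Elasticsearch bulk format, e.g. for
    # op_type "create":
    #   {"create": {"_index": "es_index", "_type": "emp", "_id": "emp_1"}}
    #   {"type": "emp", "name": "employee1", "mutated": 0}
    # while "update" wraps the source line as {"doc": {...}} and "delete" emits
    # only the action line.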
1293
1294
1295class ESRunQueryCompare(Task):
1296    def __init__(self, fts_index, es_instance, query_index, es_index_name=None):
1297        Task.__init__(self, "Query_runner_task")
1298        self.fts_index = fts_index
1299        self.fts_query = fts_index.fts_queries[query_index]
1300        self.es = es_instance
1301        if self.es:
1302            self.es_query = es_instance.es_queries[query_index]
1303        self.max_verify = None
1304        self.show_results = False
1305        self.query_index = query_index
1306        self.passed = True
1307        self.es_index_name = es_index_name or "es_index"
1308
1309    def check(self, task_manager):
1310        self.state = FINISHED
1311        self.set_result(self.result)
1312
1313    def execute(self, task_manager):
1314        self.es_compare = True
1315        try:
1316            self.log.info("---------------------------------------"
1317                          "-------------- Query # %s -------------"
1318                          "---------------------------------------"
1319                          % str(self.query_index+1))
1320            try:
1321                fts_hits, fts_doc_ids, fts_time, fts_status = \
1322                    self.run_fts_query(self.fts_query)
1323                self.log.info("Status: %s" %fts_status)
1324                if fts_hits < 0:
1325                    self.passed = False
1326                elif 'errors' in fts_status.keys() and fts_status['errors']:
1327                        if fts_status['successful'] == 0 and \
1328                                (list(set(fts_status['errors'].values())) ==
1329                                    [u'context deadline exceeded'] or
1330                                list(set(fts_status['errors'].values())) ==
1331                                    [u'TooManyClauses[maxClauseCount is set to 1024]']):
1332                            # too many clauses in the query for fts to process
1333                            self.log.info("FTS chose not to run this big query"
1334                                          "...skipping ES validation")
1335                            self.passed = True
1336                            self.es_compare = False
1337                        elif 0 < fts_status['successful'] < \
1338                                self.fts_index.num_pindexes:
1339                            # partial results
1340                            self.log.info("FTS returned partial results..."
1341                                          "skipping ES validation")
1342                            self.passed = True
1343                            self.es_compare = False
1344                self.log.info("FTS hits for query: %s is %s (took %sms)" % \
1345                        (json.dumps(self.fts_query, ensure_ascii=False),
1346                        fts_hits,
1347                        float(fts_time)/1000000))
1348            except ServerUnavailableException:
1349                self.log.error("ERROR: FTS Query timed out (client timeout=70s)!")
1350                self.passed = False
1351            if self.es and self.es_query:
1352                es_hits, es_doc_ids, es_time = self.run_es_query(self.es_query)
1353                self.log.info("ES hits for query: %s on %s is %s (took %sms)" % \
1354                              (json.dumps(self.es_query,  ensure_ascii=False),
1355                               self.es_index_name,
1356                               es_hits,
1357                               es_time))
1358                if self.passed and self.es_compare:
1359                    if int(es_hits) != int(fts_hits):
1360                        msg = "FAIL: FTS hits: %s, while ES hits: %s"\
1361                              % (fts_hits, es_hits)
1362                        self.log.error(msg)
1363                    es_but_not_fts = list(set(es_doc_ids) - set(fts_doc_ids))
1364                    fts_but_not_es = list(set(fts_doc_ids) - set(es_doc_ids))
1365                    if not (es_but_not_fts or fts_but_not_es):
1366                        self.log.info("SUCCESS: Docs returned by FTS = docs"
1367                                      " returned by ES, doc_ids verified")
1368                    else:
1369                        if fts_but_not_es:
1370                            msg = "FAIL: Following %s doc(s) were not returned" \
1371                                  " by ES but were returned by FTS, printing first 50: %s" \
1372                                  % (len(fts_but_not_es), fts_but_not_es[:50])
1373                        else:
1374                            msg = "FAIL: Following %s docs were not returned" \
1375                                  " by FTS but were returned by ES, printing first 50: %s" \
1376                                  % (len(es_but_not_fts), es_but_not_fts[:50])
1377                        self.log.error(msg)
1378                        self.passed = False
1379            self.state = CHECKING
1380            task_manager.schedule(self)
1381        except Exception as e:
1382            self.log.error(e)
1383            self.set_exception(e)
1384            self.state = FINISHED
1385
1386    def run_fts_query(self, query):
1387        return self.fts_index.execute_query(query)
1388
1389    def run_es_query(self, query):
1390        return self.es.search(index_name=self.es_index_name, query=query)
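
    # Illustrative usage sketch for ESRunQueryCompare (fts_index, es and
    # task_manager are hypothetical objects supplied by the calling test):
    #   task = ESRunQueryCompare(fts_index, es, query_index=0)
    #   task_manager.schedule(task)
    # The task runs the FTS query, optionally the matching ES query, and marks
    # itself passed only when both hit counts and doc-id sets agree (unless FTS
    # legitimately returned partial results or skipped an oversized query).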
1391
1392
1393# This will be obsolete with the implementation of batch operations in LoadDocumentsTask
1394class BatchedLoadDocumentsTask(GenericLoadingTask):
1395    def __init__(self, server, bucket, generator, kv_store, op_type, exp, flag=0, only_store_hash=True,
1396                 batch_size=100, pause_secs=1, timeout_secs=60, compression=True):
1397        GenericLoadingTask.__init__(self, server, bucket, kv_store, compression=compression)
1398        self.batch_generator = BatchedDocumentGenerator(generator, batch_size)
1399        self.op_type = op_type
1400        self.exp = exp
1401        self.flag = flag
1402        self.only_store_hash = only_store_hash
1403        self.batch_size = batch_size
1404        self.pause = pause_secs
1405        self.timeout = timeout_secs
1406        self.bucket = bucket
1407        self.server = server
1408
1409    def has_next(self):
1410        has = self.batch_generator.has_next()
1411        if math.fmod(self.batch_generator._doc_gen.itr, 50000) == 0.0 or not has:
1412            self.log.info("Batch {0} documents queued #: {1} with exp:{2} @ {3}, bucket {4}".\
1413                          format(self.op_type,
1414                                 (self.batch_generator._doc_gen.itr - self.batch_generator._doc_gen.start),
1415                                 self.exp,
1416                                 self.server.ip,
1417                                 self.bucket))
1418        return has
1419
1420    def next(self):
1421        key_value = self.batch_generator.next_batch()
1422        partition_keys_dic = self.kv_store.acquire_partitions(key_value.keys())
1423        if self.op_type == 'create':
1424            self._create_batch(partition_keys_dic, key_value)
1425        elif self.op_type == 'update':
1426            self._update_batch(partition_keys_dic, key_value)
1427        elif self.op_type == 'delete':
1428            self._delete_batch(partition_keys_dic, key_value)
1429        elif self.op_type == 'read':
1430            self._read_batch(partition_keys_dic, key_value)
1431        else:
1432            self.state = FINISHED
1433            self.set_exception(Exception("Bad operation type: %s" % self.op_type))
1434        self.kv_store.release_partitions(partition_keys_dic.keys())
1435
1436    def _create_batch(self, partition_keys_dic, key_val):
1437        try:
1438            self._process_values_for_create(key_val)
1439            self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
1440            self._populate_kvstore(partition_keys_dic, key_val)
1441        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
1442            self.state = FINISHED
1443            self.set_exception(error)
1444
1445
1446    def _update_batch(self, partition_keys_dic, key_val):
1447        try:
1448            self._process_values_for_update(partition_keys_dic, key_val)
1449            self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=False)
1450            self._populate_kvstore(partition_keys_dic, key_val)
1451        except (MemcachedError, ServerUnavailableException, socket.error, EOFError, AttributeError, RuntimeError) as error:
1452            self.state = FINISHED
1453            self.set_exception(error)
1454
1455
1456    def _delete_batch(self, partition_keys_dic, key_val):
1457        for partition, keys in partition_keys_dic.items():
1458            for key in keys:
1459                try:
1460                    self.client.delete(key)
1461                    partition.delete(key)
1462                except MemcachedError as error:
1463                    if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
1464                        pass
1465                    else:
1466                        self.state = FINISHED
1467                        self.set_exception(error)
1468                        return
1469                except (ServerUnavailableException, socket.error, EOFError, AttributeError) as error:
1470                    self.state = FINISHED
1471                    self.set_exception(error)
1472
1473
1474    def _read_batch(self, partition_keys_dic, key_val):
1475        try:
1476            o, c, d = self.client.getMulti(key_val.keys(), self.pause, self.timeout)
1477        except MemcachedError as error:
1478            self.state = FINISHED
1479            self.set_exception(error)
1480
1481    def _process_values_for_create(self, key_val):
1482        for key, value in key_val.items():
1483            try:
1484                value_json = json.loads(value)
1485                value_json['mutated'] = 0
1486                value = json.dumps(value_json)
1487            except ValueError:
1488                index = random.choice(range(len(value)))
1489                value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
1490            finally:
1491                key_val[key] = value
1492
1493    def _process_values_for_update(self, partition_keys_dic, key_val):
1494        for partition, keys in partition_keys_dic.items():
1495            for key in keys:
1496                value = partition.get_valid(key)
1497                if value is None:
1498                    del key_val[key]
1499                    continue
1500                try:
1501                    value = key_val[key]  # new updated value; note this assignment is not there in the original "LoadDocumentsTask" code
1502                    value_json = json.loads(value)
1503                    value_json['mutated'] += 1
1504                    value = json.dumps(value_json)
1505                except ValueError:
1506                    index = random.choice(range(len(value)))
1507                    value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
1508                finally:
1509                    key_val[key] = value
1510
1511
1512    def _populate_kvstore(self, partition_keys_dic, key_val):
1513        for partition, keys in partition_keys_dic.items():
1514            self._populate_kvstore_partition(partition, keys, key_val)
1515
1516    def _release_locks_on_kvstore(self):
1517        for part in self._partitions_keyvals_dic.keys():
1518            self.kv_store.release_lock(part)
1519
1520    def _populate_kvstore_partition(self, partition, keys, key_val):
1521        for key in keys:
1522            if self.only_store_hash:
1523                key_val[key] = str(crc32.crc32_hash(key_val[key]))
1524            partition.set(key, key_val[key], self.exp, self.flag)
1525
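# Illustrative usage sketch for BatchedLoadDocumentsTask above (server, bucket,
# doc_gen, kv_store and task_manager are hypothetical objects from the calling
# test):
#   task = BatchedLoadDocumentsTask(server, bucket, doc_gen, kv_store, "create",
#                                   exp=0, batch_size=500)
#   task_manager.schedule(task)
# Each next() call pops one batch from BatchedDocumentGenerator, routes it to
# _create_batch/_update_batch/_delete_batch/_read_batch, and mirrors the change
# into the acquired kv store partitions before releasing them.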
1526
1527
1528class WorkloadTask(GenericLoadingTask):
1529    def __init__(self, server, bucket, kv_store, num_ops, create, read, update, delete, exp, compression=True):
1530        GenericLoadingTask.__init__(self, server, bucket, kv_store, compression=compression)
1531        self.itr = 0
1532        self.num_ops = num_ops
1533        self.create = create
1534        self.read = create + read
1535        self.update = create + read + update
1536        self.delete = create + read + update + delete
1537        self.exp = exp
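        # Illustrative note (not in the original source): the ratios are folded
        # into cumulative thresholds so one random draw in next() picks an op.
        # For example create=30, read=30, update=20, delete=20 gives
        #   1..30 -> create, 31..60 -> read, 61..80 -> update, 81..100 -> delete
        # and random.randint(1, self.delete) lands in exactly one range.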
1538
1539    def has_next(self):
1540        if self.num_ops == 0 or self.itr < self.num_ops:
1541            return True
1542        return False
1543
1544    def next(self):
1545        self.itr += 1
1546        rand = random.randint(1, self.delete)
1547        if rand > 0 and rand <= self.create:
1548            self._create_random_key()
1549        elif rand > self.create and rand <= self.read:
1550            self._get_random_key()
1551        elif rand > self.read and rand <= self.update:
1552            self._update_random_key()
1553        elif rand > self.update and rand <= self.delete:
1554            self._delete_random_key()
1555
1556    def _get_random_key(self):
1557        partition, part_num = self.kv_store.acquire_random_partition()
1558        if partition is None:
1559            return
1560
1561        key = partition.get_random_valid_key()
1562        if key is None:
1563            self.kv_store.release_partition(part_num)
1564            return
1565
1566        self._unlocked_read(partition, key)
1567        self.kv_store.release_partition(part_num)
1568
1569    def _create_random_key(self):
1570        partition, part_num = self.kv_store.acquire_random_partition(False)
1571        if partition is None:
1572            return
1573
1574        key = partition.get_random_deleted_key()
1575        if key is None:
1576            self.kv_store.release_partition(part_num)
1577            return
1578
1579        value = partition.get_deleted(key)
1580        if value is None:
1581            self.kv_store.release_partition(part_num)
1582            return
1583
1584        self._unlocked_create(partition, key, value)
1585        self.kv_store.release_partition(part_num)
1586
1587    def _update_random_key(self):
1588        partition, part_num = self.kv_store.acquire_random_partition()
1589        if partition is None:
1590            return
1591
1592        key = partition.get_random_valid_key()
1593        if key is None:
1594            self.kv_store.release_partition(part_num)
1595            return
1596
1597        self._unlocked_update(partition, key)
1598        self.kv_store.release_partition(part_num)
1599
1600    def _delete_random_key(self):
1601        partition, part_num = self.kv_store.acquire_random_partition()
1602        if partition is None:
1603            return
1604
1605        key = partition.get_random_valid_key()
1606        if key is None:
1607            self.kv_store.release_partition(part_num)
1608            return
1609
1610        self._unlocked_delete(partition, key)
1611        self.kv_store.release_partition(part_num)
1612
1613class ValidateDataTask(GenericLoadingTask):
1614    def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, replica_to_read=None,
1615                 compression=True):
1616        GenericLoadingTask.__init__(self, server, bucket, kv_store, compression=compression)
1617        self.valid_keys, self.deleted_keys = kv_store.key_set()
1618        self.num_valid_keys = len(self.valid_keys)
1619        self.num_deleted_keys = len(self.deleted_keys)
1620        self.itr = 0
1621        self.max_verify = self.num_valid_keys + self.num_deleted_keys
1622        self.only_store_hash = only_store_hash
1623        self.replica_to_read = replica_to_read
1624        if max_verify is not None:
1625            self.max_verify = min(max_verify, self.max_verify)
1626        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
1627        self.start_time = time.time()
1628
1629    def has_next(self):
1630        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and\
1631            self.itr < self.max_verify:
1632            if not self.itr % 50000:
1633                self.log.info("{0} items were verified".format(self.itr))
1634            return True
1635        self.log.info("{0} items were verified in {1} sec. the average number of ops"
1636                      " - {2} per second".format(self.itr, time.time() - self.start_time,
1637                                                 self.itr / (time.time() - self.start_time)))
1638        return False
1639
1640    def next(self):
1641        if self.itr < self.num_valid_keys:
1642            self._check_valid_key(self.valid_keys[self.itr])
1643        else:
1644            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
1645        self.itr += 1
1646
1647    def _check_valid_key(self, key):
1648        partition = self.kv_store.acquire_partition(key)
1649
1650        value = partition.get_valid(key)
1651        flag = partition.get_flag(key)
1652        if value is None or flag is None:
1653            self.kv_store.release_partition(key)
1654            return
1655
1656        try:
1657            if self.replica_to_read is None:
1658                o, c, d = self.client.get(key)
1659            else:
1660                o, c, d = self.client.getr(key, replica_index=self.replica_to_read)
1661            if self.only_store_hash:
1662                if crc32.crc32_hash(d) != int(value):
1663                    self.state = FINISHED
1664                    self.set_exception(Exception('Key: %s, Bad hash result: %d != %d for key %s' % (key, crc32.crc32_hash(d), int(value), key)))
1665            else:
1666                value = json.dumps(value)
1667                if d != json.loads(value):
1668                    self.state = FINISHED
1669                    self.set_exception(Exception('Key: %s, Bad result: %s != %s for key %s' % (key, json.dumps(d), value, key)))
1670            if CHECK_FLAG and o != flag:
1671                self.state = FINISHED
1672                self.set_exception(Exception('Key: %s, Bad result for flag value: %s != the value we set: %s' % (key, o, flag)))
1673
1674        except MemcachedError as error:
1675            if error.status == ERR_NOT_FOUND and partition.get_valid(key) is None:
1676                pass
1677            else:
1678                self.state = FINISHED
1679                self.set_exception(error)
1680        except Exception as error:
1681            self.log.error("Unexpected error: %s" % str(error))
1682            self.state = FINISHED
1683            self.set_exception(error)
1684        self.kv_store.release_partition(key)
1685
1686    def _check_deleted_key(self, key):
1687        partition = self.kv_store.acquire_partition(key)
1688
1689        try:
1690            self.client.delete(key)
1691            if partition.get_valid(key) is not None:
1692                self.state = FINISHED
1693                self.set_exception(Exception('Not deleted: %s' % (key)))
1694        except MemcachedError as error:
1695            if error.status == ERR_NOT_FOUND:
1696                pass
1697            else:
1698                self.state = FINISHED
1699                self.set_exception(error)
1700        except Exception as error:
1701            if error.rc != NotFoundError:
1702                self.state = FINISHED
1703                self.set_exception(error)
1704        self.kv_store.release_partition(key)
1705
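# Illustrative usage sketch for ValidateDataTask above (server, bucket, kv_store
# and task_manager are hypothetical objects from the calling test):
#   task = ValidateDataTask(server, bucket, kv_store, max_verify=10000)
#   task_manager.schedule(task)
# Every valid key in the kv store is re-read from the cluster and compared by
# crc32 hash (only_store_hash=True) or full JSON value, and every deleted key is
# checked to really be gone.
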
1706class ValidateDataWithActiveAndReplicaTask(GenericLoadingTask):
1707    def __init__(self, server, bucket, kv_store, max_verify=None, compression=True):
1708        GenericLoadingTask.__init__(self, server, bucket, kv_store, compression=compression)
1709        self.valid_keys, self.deleted_keys = kv_store.key_set()
1710        self.num_valid_keys = len(self.valid_keys)
1711        self.num_deleted_keys = len(self.deleted_keys)
1712        self.itr = 0
1713        self.max_verify = self.num_valid_keys + self.num_deleted_keys
1714        if max_verify is not None:
1715            self.max_verify = min(max_verify, self.max_verify)
1716        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
1717        self.start_time = time.time()
1718
1719    def has_next(self):
1720        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and\
1721            self.itr < self.max_verify:
1722            if not self.itr % 50000:
1723                self.log.info("{0} items were verified".format(self.itr))
1724            return True
1725        self.log.info("{0} items were verified in {1} sec. the average number of ops"
1726                      " - {2} per second".format(self.itr, time.time() - self.start_time,
1727                                                 self.itr / (time.time() - self.start_time)))
1728        return False
1729
1730    def next(self):
1731        if self.itr < self.num_valid_keys:
1732            self._check_valid_key(self.valid_keys[self.itr])
1733        else:
1734            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
1735        self.itr += 1
1736
1737    def _check_valid_key(self, key):
1738        try:
1739            o, c, d = self.client.get(key)
1740            o_r, c_r, d_r = self.client.getr(key, replica_index=0)
1741            if o != o_r:
1742                self.state = FINISHED
1743                self.set_exception(Exception('ACTIVE AND REPLICA FLAG CHECK FAILED :: Key: %s, Bad result for flag value: REPLICA FLAG %s != ACTIVE FLAG %s' % (key, o_r, o)))
1744            if c != c_r:
1745                self.state = FINISHED
1746                self.set_exception(Exception('ACTIVE AND REPLICA CAS CHECK FAILED :: Key: %s, Bad result for CAS value: REPLICA CAS %s != ACTIVE CAS %s' % (key, c_r, c)))
1747            if d != d_r:
1748                self.state = FINISHED
1749                self.set_exception(Exception('ACTIVE AND REPLICA VALUE CHECK FAILED :: Key: %s, Bad result for value: REPLICA VALUE %s != ACTIVE VALUE %s' % (key, d_r, d)))
1750
1751        except MemcachedError as error:
1752            if error.status == ERR_NOT_FOUND:  # no partition handle in this task, so only the memcached status is checked
1753                pass
1754            else:
1755                self.state = FINISHED
1756                self.set_exception(error)
1757        except Exception as error:
1758            self.log.error("Unexpected error: %s" % str(error))
1759            self.state = FINISHED
1760            self.set_exception(error)
1761
1762    def _check_deleted_key(self, key):
1763        partition = self.kv_store.acquire_partition(key)
1764        try:
1765            self.client.delete(key)
1766            if partition.get_valid(key) is not None:
1767                self.state = FINISHED
1768                self.set_exception(Exception('ACTIVE CHECK :: Not deleted: %s' % (key)))
1769        except MemcachedError as error:
1770            if error.status == ERR_NOT_FOUND:
1771                pass
1772            else:
1773                self.state = FINISHED
1774                self.set_exception(error)
1775        except Exception as error:
1776            if error.rc != NotFoundError:
1777                self.state = FINISHED
1778                self.set_exception(error)
1779        self.kv_store.release_partition(key)
1780
1781class BatchedValidateDataTask(GenericLoadingTask):
1782    def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, batch_size=100,
1783                 timeout_sec=5, compression=True):
1784        GenericLoadingTask.__init__(self, server, bucket, kv_store, compression=compression)
1785        self.valid_keys, self.deleted_keys = kv_store.key_set()
1786        self.num_valid_keys = len(self.valid_keys)
1787        self.num_deleted_keys = len(self.deleted_keys)
1788        self.itr = 0
1789        self.max_verify = self.num_valid_keys + self.num_deleted_keys
1790        self.timeout_sec = timeout_sec
1791        self.only_store_hash = only_store_hash
1792        if max_verify is not None:
1793            self.max_verify = min(max_verify, self.max_verify)
1794        self.log.info("%s items will be verified on %s bucket" % (self.max_verify, bucket))
1795        self.batch_size = batch_size
1796        self.start_time = time.time()
1797
1798    def has_next(self):
1799        has = False
1800        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.itr < self.max_verify:
1801            has = True
1802        if math.fmod(self.itr, 10000) == 0.0:
1803            self.log.info("{0} items were verified".format(self.itr))
1804        if not has:
1805            self.log.info("{0} items were verified in {1} sec. the average number of ops"
1806                          " - {2} per second".format(self.itr, time.time() - self.start_time,
1807                                                     self.itr / (time.time() - self.start_time)))
1808        return has
1809
1810    def next(self):
1811        if self.itr < self.num_valid_keys:
1812            keys_batch = self.valid_keys[self.itr:self.itr + self.batch_size]
1813            self.itr += len(keys_batch)
1814            self._check_valid_keys(keys_batch)
1815        else:
1816            self._check_deleted_key(self.deleted_keys[self.itr - self.num_valid_keys])
1817            self.itr += 1
1818
1819    def _check_valid_keys(self, keys):
1820        partition_keys_dic = self.kv_store.acquire_partitions(keys)
1821        try:
1822            key_vals = self.client.getMulti(keys, parallel=True, timeout_sec=self.timeout_sec)
1823        except ValueError as error:
1824            self.state = FINISHED
1825            self.kv_store.release_partitions(partition_keys_dic.keys())
1826            self.set_exception(error)
1827            return
1828        except BaseException as error:
1829            # handle all other exceptions, for instance concurrent.futures._base.TimeoutError
1830            self.state = FINISHED
1831            self.kv_store.release_partitions(partition_keys_dic.keys())
1832            self.set_exception(error)
1833            return
1834        for partition, keys in partition_keys_dic.items():
1835            self._check_validity(partition, keys, key_vals)
1836        self.kv_store.release_partitions(partition_keys_dic.keys())
1837
1838    def _check_validity(self, partition, keys, key_vals):
1839
1840        for key in keys:
1841            value = partition.get_valid(key)
1842            flag = partition.get_flag(key)
1843            if value is None:
1844                continue
1845            try:
1846                o, c, d = key_vals[key]
1847
1848                if self.only_store_hash:
1849                    if crc32.crc32_hash(d) != int(value):
1850                        self.state = FINISHED
1851                        self.set_exception(Exception('Key: %s Bad hash result: %d != %d' % (key, crc32.crc32_hash(d), int(value))))
1852                else:
1853                    #value = json.dumps(value)
1854                    if json.loads(d) != json.loads(value):
1855                        self.state = FINISHED
1856                        self.set_exception(Exception('Key: %s Bad result: %s != %s' % (key, json.dumps(d), value)))
1857                if CHECK_FLAG and o != flag:
1858                    self.state = FINISHED
1859                    self.set_exception(Exception('Key: %s Bad result for flag value: %s != the value we set: %s' % (key, o, flag)))
1860            except KeyError as error:
1861                self.state = FINISHED
1862                self.set_exception(error)
1863
1864    def _check_deleted_key(self, key):
1865        partition = self.kv_store.acquire_partition(key)
1866        try:
1867            self.client.delete(key)
1868            if partition.get_valid(key) is not None:
1869                self.state = FINISHED
1870                self.set_exception(Exception('Not deleted: %s' % (key)))
1871        except MemcachedError as error:
1872            if error.status == ERR_NOT_FOUND:
1873                pass
1874            else:
1875                self.state = FINISHED
1876                self.kv_store.release_partitions(key)
1877                self.set_exception(error)
1878        except Exception as error:
1879            if error.rc != NotFoundError:
1880                self.state = FINISHED
1881                self.kv_store.release_partitions(key)
1882                self.set_exception(error)
1883        self.kv_store.release_partition(key)
1884
1885
1886class VerifyRevIdTask(GenericLoadingTask):
1887    def __init__(self, src_server, dest_server, bucket, src_kv_store, dest_kv_store, max_err_count=200000,
1888                 max_verify=None, compression=True):
1889        GenericLoadingTask.__init__(self, src_server, bucket, src_kv_store, compression=compression)
1890        from memcached.helper.data_helper import VBucketAwareMemcached as SmartClient
1891        self.client_src = SmartClient(RestConnection(src_server), bucket)
1892        self.client_dest = SmartClient(RestConnection(dest_server), bucket)
1893        self.src_valid_keys, self.src_deleted_keys = src_kv_store.key_set()
1894        self.dest_valid_keys, self.dest_del_keys = dest_kv_store.key_set()
1895        self.num_valid_keys = len(self.src_valid_keys)
1896        self.num_deleted_keys = len(self.src_deleted_keys)
1897        self.keys_not_found = {self.client.rest.ip: [], self.client_dest.rest.ip: []}
1898        if max_verify:
1899            self.max_verify = max_verify
1900        else:
1901            self.max_verify = self.num_valid_keys + self.num_deleted_keys
1902        self.itr = 0
1903        self.not_matching_filter_keys = 0
1904        self.err_count = 0
1905        self.max_err_count = max_err_count
1906        self.src_server = src_server
1907        self.bucket = bucket
1908        self.log.info("RevID verification: in progress for %s ..." % bucket.name)
1909
1910    def has_next(self):
1911        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and \
1912                        self.err_count < self.max_err_count and \
1913                        self.itr < self.max_verify:
1914            return True
1915        self.log.info("RevId Verification : {0} existing items have been verified"
1916                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
1917        self.log.info("RevId Verification : {0} deleted items have been verified"
1918                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
1919        self.log.info("RevId Verification : {0} keys were apparently filtered "
1920                      "out and not found in target bucket"
1921                      .format(self.not_matching_filter_keys))
1922
1923        # if there are missing keys, we would have printed them by now
1924        # check if excess keys are present on server, if yes, set an exception
1925        # TODO : print excess keys
1926        server = RestConnection(self.src_server)
1927        server_count = server.fetch_bucket_stats(bucket=self.bucket.name)["op"]["samples"]["curr_items"][-1]
1928        if server_count > self.num_valid_keys:
1929            self.set_exception(Exception("ERROR: {0} keys present on bucket {1} "
1930                                        "on {2} while kvstore expects only {3}"
1931                                        .format(server_count, self.bucket.name,
1932                                         self.src_server.ip, self.num_valid_keys)))
1933        return False
1934
1935    def next(self):
1936        if self.itr < self.num_valid_keys:
1937            self._check_key_revId(self.src_valid_keys[self.itr])
1938        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
1939            # verify deleted/expired keys
1940            self._check_key_revId(self.src_deleted_keys[self.itr - self.num_valid_keys],
1941                                  ignore_meta_data=['expiration','cas'])
1942        self.itr += 1
1943
1944        # show progress of verification for every 50k items
1945        if math.fmod(self.itr, 50000) == 0.0:
1946            self.log.info("{0} items have been verified".format(self.itr))
1947
1948
1949    def __get_meta_data(self, client, key):
1950        try:
1951            mc = client.memcached(key)
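            # mc.getMeta(key) returns a (deleted, flags, expiration, seqno, cas)
            # tuple; it is formatted into a dict literal and eval'd so individual
            # metadata fields can be compared between source and destination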
1952            meta_data = eval("{'deleted': %s, 'flags': %s, 'expiration': %s, 'seqno': %s, 'cas': %s}" % (mc.getMeta(key)))
1953            return meta_data
1954        except MemcachedError as error:
1955            if error.status == ERR_NOT_FOUND:
1956                # if a filter was specified, the key will not be found in
1957                # target kv store if key did not match filter expression
1958                if key not in self.src_deleted_keys and key in (self.dest_valid_keys+self.dest_del_keys):
1959                    self.err_count += 1
1960                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
1961                else:
1962                    self.not_matching_filter_keys +=1
1963            else:
1964                self.state = FINISHED
1965                self.set_exception(error)
1966        # catch and set all unexpected exceptions
1967        except Exception as e:
1968            self.state = FINISHED
1969            self.set_unexpected_exception(e)
1970
1971    def _check_key_revId(self, key, ignore_meta_data=[]):
1972        src_meta_data = self.__get_meta_data(self.client_src, key)
1973        dest_meta_data = self.__get_meta_data(self.client_dest, key)
1974        if not src_meta_data or not dest_meta_data:
1975            return
1976        prev_error_count = self.err_count
1977        err_msg = []
1978        # seqno number should never be zero
1979        if src_meta_data['seqno'] == 0:
1980            self.err_count += 1
1981            err_msg.append(
1982                "seqno on Source should not be 0, Error Count:{0}".format(self.err_count))
1983
1984        if dest_meta_data['seqno'] == 0:
1985            self.err_count += 1
1986            err_msg.append(
1987                "seqno on Destination should not be 0, Error Count:{0}".format(self.err_count))
1988
1989        # verify all metadata
1990        for meta_key in src_meta_data.keys():
1991            check = True
1992            if meta_key == 'flags' and not CHECK_FLAG:
1993                check = False
1994            if check and src_meta_data[meta_key] != dest_meta_data[meta_key] and meta_key not in ignore_meta_data:
1995                self.err_count += 1
1996                err_msg.append("{0} mismatch: Source {0}:{1}, Destination {0}:{2}, Error Count:{3}"
1997                    .format(meta_key, src_meta_data[meta_key],
1998                        dest_meta_data[meta_key], self.err_count))
1999
2000        if self.err_count - prev_error_count > 0 and self.err_count < 200:
2001            self.log.error("===== Verifying rev_ids failed for key: {0}, bucket:{1} =====".format(key, self.bucket))
2002            [self.log.error(err) for err in err_msg]
2003            self.log.error("Source meta data: %s" % src_meta_data)
2004            self.log.error("Dest meta data: %s" % dest_meta_data)
2005            self.state = FINISHED
2006
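# Illustrative usage sketch for VerifyRevIdTask above (typically driven by XDCR
# tests; src_server, dest_server, src_kv_store, dest_kv_store and task_manager
# are hypothetical objects from the calling test):
#   task = VerifyRevIdTask(src_server, dest_server, bucket, src_kv_store,
#                          dest_kv_store, max_verify=100000)
#   task_manager.schedule(task)
# For each key the (deleted, flags, expiration, seqno, cas) metadata is fetched
# from both clusters and compared field by field until max_err_count is hit.
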
2007class VerifyMetaDataTask(GenericLoadingTask):
2008    def __init__(self, dest_server, bucket, kv_store, meta_data_store, max_err_count=100, compression=True):
2009        GenericLoadingTask.__init__(self, dest_server, bucket, kv_store, compression=compression)
2010        from memcached.helper.data_helper import VBucketAwareMemcached as SmartClient
2011        self.client = SmartClient(RestConnection(dest_server), bucket)
2012        self.valid_keys, self.deleted_keys = kv_store.key_set()
2013        self.num_valid_keys = len(self.valid_keys)
2014        self.num_deleted_keys = len(self.deleted_keys)
2015        self.keys_not_found = {self.client.rest.ip: []}
2016        self.itr = 0
2017        self.err_count = 0
2018        self.max_err_count = max_err_count
2019        self.meta_data_store = meta_data_store
2020
2021    def has_next(self):
2022        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.err_count < self.max_err_count:
2023            return True
2024        self.log.info("Meta Data Verification : {0} existing items have been verified"
2025                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
2026        self.log.info("Meta Data Verification : {0} deleted items have been verified"
2027                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
2028        return False
2029
2030    def next(self):
2031        if self.itr < self.num_valid_keys:
2032            self._check_key_meta_data(self.valid_keys[self.itr])
2033        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
2034            # verify deleted/expired keys
2035            self._check_key_meta_data(self.deleted_keys[self.itr - self.num_valid_keys],
2036                                  ignore_meta_data=['expiration'])
2037        self.itr += 1
2038
2039        # show progress of verification for every 50k items
2040        if math.fmod(self.itr, 50000) == 0.0:
2041            self.log.info("{0} items have been verified".format(self.itr))
2042
2043    def __get_meta_data(self, client, key):
2044        try:
2045            mc = client.memcached(key)
2046            meta_data = eval("{'deleted': %s, 'flags': %s, 'expiration': %s, 'seqno': %s, 'cas': %s}" % (mc.getMeta(key)))
2047            return meta_data
2048        except MemcachedError as error:
2049            if error.status == ERR_NOT_FOUND:
2050                if key not in self.deleted_keys:
2051                    self.err_count += 1
2052                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
2053            else:
2054                self.state = FINISHED
2055                self.set_exception(error)
2056
2057    def _check_key_meta_data(self, key, ignore_meta_data=[]):
2058        src_meta_data = self.meta_data_store[key]
2059        dest_meta_data = self.__get_meta_data(self.client, key)
2060        if not src_meta_data or not dest_meta_data:
2061            return
2062        prev_error_count = self.err_count
2063        err_msg = []
2064        # seqno number should never be zero
2065        if dest_meta_data['seqno'] == 0:
2066            self.err_count += 1
2067            err_msg.append(
2068                "seqno on Destination should not be 0, Error Count:{0}".format(self.err_count))
2069
2070        # verify all metadata
2071        for meta_key in src_meta_data.keys():
2072            if src_meta_data[meta_key] != dest_meta_data[meta_key] and meta_key not in ignore_meta_data:
2073                self.err_count += 1
2074                err_msg.append("{0} mismatch: Source {0}:{1}, Destination {0}:{2}, Error Count:{3}"
2075                    .format(meta_key, src_meta_data[meta_key],
2076                        dest_meta_data[meta_key], self.err_count))
2077
2078        if self.err_count - prev_error_count > 0:
2079            self.log.error("===== Verifying meta data failed for key: {0} =====".format(key))
2080            [self.log.error(err) for err in err_msg]
2081            self.log.error("Source meta data: %s" % src_meta_data)
2082            self.log.error("Dest meta data: %s" % dest_meta_data)
2083            self.state = FINISHED
2084
2085class GetMetaDataTask(GenericLoadingTask):
2086    def __init__(self, dest_server, bucket, kv_store, compression=True):
2087        GenericLoadingTask.__init__(self, dest_server, bucket, kv_store, compression=compression)
2088        from memcached.helper.data_helper import VBucketAwareMemcached as SmartClient
2089        self.client = SmartClient(RestConnection(dest_server), bucket)
2090        self.valid_keys, self.deleted_keys = kv_store.key_set()
2091        self.num_valid_keys = len(self.valid_keys)
2092        self.num_deleted_keys = len(self.deleted_keys)
2093        self.keys_not_found = {self.client.rest.ip: []}
2094        self.itr = 0
2095        self.err_count = 0
2096        self.max_err_count = 100
2097        self.meta_data_store = {}
2098
2099    def has_next(self):
2100        if self.itr < (self.num_valid_keys + self.num_deleted_keys) and self.err_count < self.max_err_count:
2101            return True
2102        self.log.info("Get Meta Data : {0} existing items have been gathered"
2103                      .format(self.itr if self.itr < self.num_valid_keys else self.num_valid_keys))
2104        self.log.info("Get Meta Data : {0} deleted items have been gathered"
2105                      .format(self.itr - self.num_valid_keys if self.itr > self.num_valid_keys else 0))
2106        return False
2107
2108    def next(self):
2109        if self.itr < self.num_valid_keys:
2110            self.meta_data_store[self.valid_keys[self.itr]] = self.__get_meta_data(self.client,self.valid_keys[self.itr])
2111        elif self.itr < (self.num_valid_keys + self.num_deleted_keys):
2112            self.meta_data_store[self.deleted_keys[self.itr - self.num_valid_keys]] = self.__get_meta_data(self.client,self.deleted_keys[self.itr - self.num_valid_keys])
2113        self.itr += 1
2114
2115    def __get_meta_data(self, client, key):
2116        try:
2117            mc = client.memcached(key)
2118            meta_data = eval("{'deleted': %s, 'flags': %s, 'expiration': %s, 'seqno': %s, 'cas': %s}" % (mc.getMeta(key)))
2119            return meta_data
2120        except MemcachedError as error:
2121            if error.status == ERR_NOT_FOUND:
2122                if key not in self.deleted_keys:
2123                    self.err_count += 1
2124                    self.keys_not_found[client.rest.ip].append(("key: %s" % key, "vbucket: %s" % client._get_vBucket_id(key)))
2125            else:
2126                self.state = FINISHED
2127                self.set_exception(error)
2128
2129    def get_meta_data_store(self):
2130        return self.meta_data_store
2131
2132class ViewCreateTask(Task):
2133    def __init__(self, server, design_doc_name, view, bucket="default", with_query=True,
2134                 check_replication=False, ddoc_options=None):
2135        Task.__init__(self, "create_view_task")
2136        self.server = server
2137        self.bucket = bucket
2138        self.view = view
2139        prefix = ""
2140        if self.view:
2141            prefix = ("", "dev_")[self.view.dev_view]
2142        if design_doc_name.find('/') != -1:
2143            design_doc_name = design_doc_name.replace('/', '%2f')
2144        self.design_doc_name = prefix + design_doc_name
2145        self.ddoc_rev_no = 0
2146        self.with_query = with_query
2147        self.check_replication = check_replication
2148        self.ddoc_options = ddoc_options
2149        self.rest = RestConnection(self.server)
2150
2151    def execute(self, task_manager):
2152
2153        try:
2154            # appending view to existing design doc
2155            content, meta = self.rest.get_ddoc(self.bucket, self.design_doc_name)
2156            ddoc = DesignDocument._init_from_json(self.design_doc_name, content)
2157            # if view is to be updated
2158            if self.view:
2159                if self.view.is_spatial:
2160                    ddoc.add_spatial_view(self.view)
2161                else:
2162                    ddoc.add_view(self.view)
2163            self.ddoc_rev_no = self._parse_revision(meta['rev'])
2164        except ReadDocumentException:
2165            # creating first view in design doc
2166            if self.view:
2167                if self.view.is_spatial:
2168                    ddoc = DesignDocument(self.design_doc_name, [], spatial_views=[self.view])
2169                else:
2170                    ddoc = DesignDocument(self.design_doc_name, [self.view])
2171            # create an empty design doc
2172            else:
2173                ddoc = DesignDocument(self.design_doc_name, [])
2174            if self.ddoc_options:
2175                ddoc.options = self.ddoc_options
2176        # catch and set all unexpected exceptions
2177        except Exception as e:
2178            self.state = FINISHED
2179            self.set_unexpected_exception(e)
2180
2181        try:
2182            self.rest.create_design_document(self.bucket, ddoc)
2183            self.state = CHECKING
2184            task_manager.schedule(self)
2185
2186        except DesignDocCreationException as e:
2187            self.state = FINISHED
2188            self.set_exception(e)
2189
2190        # catch and set all unexpected exceptions
2191        except Exception as e:
2192            self.state = FINISHED
2193            self.set_unexpected_exception(e)
2194
2195    def check(self, task_manager):
2196        try:
2197            # only query if the DDoc has a view
2198            if self.view:
2199                if self.with_query:
2200                    query = {"stale" : "ok"}
2201                    if self.view.is_spatial:
2202                        content = \
2203                            self.rest.query_view(self.design_doc_name, self.view.name,
2204                                                 self.bucket, query, type="spatial")
2205                    else:
2206                        content = \
2207                            self.rest.query_view(self.design_doc_name, self.view.name,
2208                                                 self.bucket, query)
2209                else:
2210                     _, json_parsed, _ = self.rest._get_design_doc(self.bucket, self.design_doc_name)
2211                     if self.view.is_spatial:
2212                         if self.view.name not in json_parsed["spatial"].keys():
2213                             self.set_exception(
2214                                Exception("design doc {0} doesn't contain spatial view {1}".format(
2215                                self.design_doc_name, self.view.name)))
2216                     else:
2217                         if self.view.name not in json_parsed["views"].keys():
2218                             self.set_exception(Exception("design doc {0} doesn't contain view {1}".format(
2219                                self.design_doc_name, self.view.name)))
2220                self.log.info("view : {0} was created successfully in ddoc: {1}".format(self.view.name, self.design_doc_name))
2221            else:
2222                # if we have reached here, it means design doc was successfully updated
2223                self.log.info("Design Document : {0} was updated successfully".format(self.design_doc_name))
2224
2225            self.state = FINISHED
2226            if self._check_ddoc_revision():
2227                self.set_result(self.ddoc_rev_no)
2228            else:
2229                self.set_exception(Exception("failed to update design document"))
2230
2231            if self.check_replication:
2232                self._check_ddoc_replication_on_nodes()
2233
2234        except QueryViewException as e:
2235            if e.message.find('not_found') > -1 or e.message.find('view_undefined') > -1:
2236                task_manager.schedule(self, 2)
2237            else:
2238                self.state = FINISHED
2239                self.set_unexpected_exception(e)
2240        # catch and set all unexpected exceptions
2241        except Exception as e:
2242            self.state = FINISHED
2243            self.set_unexpected_exception(e)
2244
2245    def _check_ddoc_revision(self):
2246        valid = False
2247        try:
2248            content, meta = self.rest.get_ddoc(self.bucket, self.design_doc_name)
2249            new_rev_id = self._parse_revision(meta['rev'])
2250            if new_rev_id != self.ddoc_rev_no:
2251                self.ddoc_rev_no = new_rev_id
2252                valid = True
2253        except ReadDocumentException:
2254            pass
2255
2256        # catch and set all unexpected exceptions
2257        except Exception as e:
2258            self.state = FINISHED
2259            self.set_unexpected_exception(e)
2260
2261        return valid
2262
2263    def _parse_revision(self, rev_string):
2264        return int(rev_string.split('-')[0])
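    # e.g. _parse_revision("12-a3f5c9a0") returns 12: design-doc revisions are
    # "<revision number>-<hash>" strings and only the numeric prefix is compared.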
2265
2266    def _check_ddoc_replication_on_nodes(self):
2267
2268        nodes = self.rest.node_statuses()
2269        retry_count = 3
2270
2271        # nothing to check if there is only 1 node
2272        if len(nodes) <= 1:
2273            return
2274
2275        for node in nodes:
2276            server_info = {"ip" : node.ip,
2277                       "port" : node.port,
2278                       "username" : self.rest.username,
2279                       "password" : self.rest.password}
2280
2281            for count in xrange(retry_count):
2282                try:
2283                    rest_node = RestConnection(server_info)
2284                    content, meta = rest_node.get_ddoc(self.bucket, self.design_doc_name)
2285                    new_rev_id = self._parse_revision(meta['rev'])
2286                    if new_rev_id == self.ddoc_rev_no:
2287                        break
2288                    else:
2289                        self.log.info("Design Doc {0} version is not updated on node {1}:{2}. Retrying.".format(self.design_doc_name, node.ip, node.port))
2290                        time.sleep(2)
2291                except ReadDocumentException as e:
2292                    if count < retry_count - 1:
2293                        self.log.info("Design Doc {0} not yet available on node {1}:{2}. Retrying.".format(self.design_doc_name, node.ip, node.port))
2294                        time.sleep(2)
2295                    else:
2296                        self.log.error("Design Doc {0} failed to replicate on node {1}:{2}".format(self.design_doc_name, node.ip, node.port))
2297                        self.set_exception(e)
2298                        self.state = FINISHED
2299                        break
2300                except Exception as e:
2301                    if count < retry_count - 1:
2302                        self.log.info("Unexpected Exception Caught. Retrying.")
2303                        time.sleep(2)
2304                    else:
2305                        self.set_unexpected_exception(e)
2306                        self.state = FINISHED
2307                        break
2308            else:
2309                self.set_exception(Exception("Design Doc {0} version mismatch on node {1}:{2}".format(self.design_doc_name, node.ip, node.port)))
2310
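# Illustrative usage sketch for ViewCreateTask above (server, view and
# task_manager are hypothetical objects from the calling test; the View
# constructor arguments shown are an assumption):
#   view = View("by_name", "function (doc) { emit(doc.name, null); }")
#   task = ViewCreateTask(server, "ddoc1", view, bucket="default")
#   task_manager.schedule(task)
# On success the task's result is the new design-document revision number, and
# check_replication=True additionally waits for the ddoc to reach every node.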
2311
2312class ViewDeleteTask(Task):
2313    def __init__(self, server, design_doc_name, view, bucket="default"):
2314        Task.__init__(self, "delete_view_task")
2315        self.server = server
2316        self.bucket = bucket
2317        self.view = view
2318        prefix = ""
2319        if self.view:
2320            prefix = ("", "dev_")[self.view.dev_view]
2321        self.design_doc_name = prefix + design_doc_name
2322
2323    def execute(self, task_manager):
2324        try:
2325            rest = RestConnection(self.server)
2326            if self.view:
2327                # remove view from existing design doc
2328                content, header = rest.get_ddoc(self.bucket, self.design_doc_name)
2329                ddoc = DesignDocument._init_from_json(self.design_doc_name, content)
2330                if self.view.is_spatial:
2331                    status = ddoc.delete_spatial(self.view)
2332                else:
2333                    status = ddoc.delete_view(self.view)
2334                if not status:
2335                    self.state = FINISHED
2336                    self.set_exception(Exception('View does not exist! %s' % (self.view.name)))
2337
2338                # update design doc
2339                rest.create_design_document(self.bucket, ddoc)
2340                self.state = CHECKING
2341                task_manager.schedule(self, 2)
2342            else:
2343                # delete the design doc
2344                rest.delete_view(self.bucket, self.design_doc_name)
2345                self.log.info("Design Doc : {0} was successfully deleted".format(self.design_doc_name))
2346                self.state = FINISHED
2347                self.set_result(True)
2348
2349        except (ValueError, ReadDocumentException, DesignDocCreationException) as e:
2350            self.state = FINISHED
2351            self.set_exception(e)
2352
2353        # catch and set all unexpected exceptions
2354        except Exception as e:
2355            self.state = FINISHED
2356            self.set_unexpected_exception(e)
2357
2358    def check(self, task_manager):
2359        try:
2360            rest = RestConnection(self.server)
2361            # make sure view was deleted
2362            query = {"stale" : "ok"}
2363            content = \
2364                rest.query_view(self.design_doc_name, self.view.name, self.bucket, query)
2365            self.state = FINISHED
2366            self.set_resu