1import unittest
2import time
3import copy
4import logger
5import logging
6import re
7import urllib
8
9from couchbase_helper.cluster import Cluster
10from membase.api.rest_client import RestConnection, Bucket
11from membase.api.exception import ServerUnavailableException
12from remote.remote_util import RemoteMachineShellConnection
13from remote.remote_util import RemoteUtilHelper
14from testconstants import STANDARD_BUCKET_PORT
15from couchbase_helper.document import View
16from membase.helper.cluster_helper import ClusterOperationHelper
17from couchbase_helper.stats_tools import StatsCommon
18from membase.helper.bucket_helper import BucketOperationHelper
19from memcached.helper.data_helper import MemcachedClientHelper
20from TestInput import TestInputSingleton
21from scripts.collect_server_info import cbcollectRunner
22from scripts import collect_data_files
23from tasks.future import TimeoutError
24
25from couchbase_helper.documentgenerator import BlobGenerator, DocumentGenerator
26from lib.membase.api.exception import XDCRException
27from security.auditmain import audit
28from security.rbac_base import RbacBase
29
30
31class RenameNodeException(XDCRException):
32
33    """Exception thrown when converting ip to hostname failed
34    """
35
36    def __init__(self, msg=''):
37        XDCRException.__init__(self, msg)
38
39
40class RebalanceNotStopException(XDCRException):
41
42    """Exception thrown when stopping rebalance failed
43    """
44
45    def __init__(self, msg=''):
46        XDCRException.__init__(self, msg)
47
48
49def raise_if(cond, ex):
50    """Raise Exception if condition is True
51    """
52    if cond:
53        raise ex
54
55
56class TOPOLOGY:
57    CHAIN = "chain"
58    STAR = "star"
59    RING = "ring"
60    HYBRID = "hybrid"
61
62
63class REPLICATION_DIRECTION:
64    UNIDIRECTION = "unidirection"
65    BIDIRECTION = "bidirection"
66
67
68class REPLICATION_TYPE:
69    CONTINUOUS = "continuous"
70
71
72class REPLICATION_PROTOCOL:
73    CAPI = "capi"
74    XMEM = "xmem"
75
76
77class INPUT:
78    REPLICATION_DIRECTION = "rdirection"
79    CLUSTER_TOPOLOGY = "ctopology"
80    SEED_DATA = "sdata"
81    SEED_DATA_MODE = "sdata_mode"
82    SEED_DATA_OPERATION = "sdata_op"
83    POLL_INTERVAL = "poll_interval"  # in seconds
84    POLL_TIMEOUT = "poll_timeout"  # in seconds
85    SEED_DATA_MODE_SYNC = "sync"
86
87
88class OPS:
89    CREATE = "create"
90    UPDATE = "update"
91    DELETE = "delete"
92    APPEND = "append"
93
94
95class EVICTION_POLICY:
96    VALUE_ONLY = "valueOnly"
97    FULL_EVICTION = "fullEviction"
98    NO_EVICTION = "noEviction"
99    NRU_EVICTION = "nruEviction"
100    CB = [VALUE_ONLY, FULL_EVICTION]
101    EPH = [NO_EVICTION, NRU_EVICTION]
102
103class BUCKET_PRIORITY:
104    HIGH = "high"
105
106
107class BUCKET_NAME:
108    DEFAULT = "default"
109
110
111class OS:
112    WINDOWS = "windows"
113    LINUX = "linux"
114    OSX = "osx"
115
116
117class COMMAND:
118    SHUTDOWN = "shutdown"
119    REBOOT = "reboot"
120
121
122class STATE:
123    RUNNING = "running"
124
125
126class REPL_PARAM:
127    FAILURE_RESTART = "failureRestartInterval"
128    CHECKPOINT_INTERVAL = "checkpointInterval"
129    OPTIMISTIC_THRESHOLD = "optimisticReplicationThreshold"
130    FILTER_EXP = "filterExpression"
131    SOURCE_NOZZLES = "sourceNozzlePerNode"
132    TARGET_NOZZLES = "targetNozzlePerNode"
133    BATCH_COUNT = "workerBatchSize"
134    BATCH_SIZE = "docBatchSizeKb"
135    LOG_LEVEL = "logLevel"
136    MAX_REPLICATION_LAG = "maxExpectedReplicationLag"
137    TIMEOUT_PERC = "timeoutPercentageCap"
138    PAUSE_REQUESTED = "pauseRequested"
139
140
141class TEST_XDCR_PARAM:
142    FAILURE_RESTART = "failure_restart_interval"
143    CHECKPOINT_INTERVAL = "checkpoint_interval"
144    OPTIMISTIC_THRESHOLD = "optimistic_threshold"
145    FILTER_EXP = "filter_expression"
146    SOURCE_NOZZLES = "source_nozzles"
147    TARGET_NOZZLES = "target_nozzles"
148    BATCH_COUNT = "batch_count"
149    BATCH_SIZE = "batch_size"
150    LOG_LEVEL = "log_level"
151    MAX_REPLICATION_LAG = "max_replication_lag"
152    TIMEOUT_PERC = "timeout_percentage"
153
154    @staticmethod
155    def get_test_to_create_repl_param_map():
156        return {
157            TEST_XDCR_PARAM.FAILURE_RESTART: REPL_PARAM.FAILURE_RESTART,
158            TEST_XDCR_PARAM.CHECKPOINT_INTERVAL: REPL_PARAM.CHECKPOINT_INTERVAL,
159            TEST_XDCR_PARAM.OPTIMISTIC_THRESHOLD: REPL_PARAM.OPTIMISTIC_THRESHOLD,
160            TEST_XDCR_PARAM.FILTER_EXP: REPL_PARAM.FILTER_EXP,
161            TEST_XDCR_PARAM.SOURCE_NOZZLES: REPL_PARAM.SOURCE_NOZZLES,
162            TEST_XDCR_PARAM.TARGET_NOZZLES: REPL_PARAM.TARGET_NOZZLES,
163            TEST_XDCR_PARAM.BATCH_COUNT: REPL_PARAM.BATCH_COUNT,
164            TEST_XDCR_PARAM.BATCH_SIZE: REPL_PARAM.BATCH_SIZE,
165            TEST_XDCR_PARAM.MAX_REPLICATION_LAG: REPL_PARAM.MAX_REPLICATION_LAG,
166            TEST_XDCR_PARAM.TIMEOUT_PERC: REPL_PARAM.TIMEOUT_PERC,
167            TEST_XDCR_PARAM.LOG_LEVEL: REPL_PARAM.LOG_LEVEL
168        }
169
170
171class XDCR_PARAM:
172    # Per-replication params (input)
173    XDCR_FAILURE_RESTART = "xdcrFailureRestartInterval"
174    XDCR_CHECKPOINT_INTERVAL = "xdcrCheckpointInterval"
175    XDCR_OPTIMISTIC_THRESHOLD = "xdcrOptimisticReplicationThreshold"
176    XDCR_FILTER_EXP = "xdcrFilterExpression"
177    XDCR_SOURCE_NOZZLES = "xdcrSourceNozzlePerNode"
178    XDCR_TARGET_NOZZLES = "xdcrTargetNozzlePerNode"
179    XDCR_BATCH_COUNT = "xdcrWorkerBatchSize"
180    XDCR_BATCH_SIZE = "xdcrDocBatchSizeKb"
181    XDCR_LOG_LEVEL = "xdcrLogLevel"
182    XDCR_MAX_REPLICATION_LAG = "xdcrMaxExpectedReplicationLag"
183    XDCR_TIMEOUT_PERC = "xdcrTimeoutPercentageCap"
184
185
186class CHECK_AUDIT_EVENT:
187    CHECK = False
188
189# Event Definition:
190# https://github.com/couchbase/goxdcr/blob/master/etc/audit_descriptor.json
191
192class GO_XDCR_AUDIT_EVENT_ID:
193    CREATE_CLUSTER = 16384
194    MOD_CLUSTER = 16385
195    RM_CLUSTER = 16386
196    CREATE_REPL = 16387
197    PAUSE_REPL = 16388
198    RESUME_REPL = 16389
199    CAN_REPL = 16390
200    DEFAULT_SETT = 16391
201    IND_SETT = 16392
202
203
204class NodeHelper:
205    _log = logger.Logger.get_logger()
206
207    @staticmethod
208    def disable_firewall(
209            server, rep_direction=REPLICATION_DIRECTION.UNIDIRECTION):
210        """Disable firewall to put restriction to replicate items in XDCR.
211        @param server: server object to disable firewall
212        @param rep_direction: replication direction unidirection/bidirection
213        """
214        shell = RemoteMachineShellConnection(server)
215        shell.info = shell.extract_remote_info()
216
217        if shell.info.type.lower() == "windows":
218            output, error = shell.execute_command('netsh advfirewall set publicprofile state off')
219            shell.log_command_output(output, error)
220            output, error = shell.execute_command('netsh advfirewall set privateprofile state off')
221            shell.log_command_output(output, error)
222            # for details see RemoteUtilHelper.enable_firewall for windows
223            output, error = shell.execute_command('netsh advfirewall firewall delete rule name="block erl.exe in"')
224            shell.log_command_output(output, error)
225            output, error = shell.execute_command('netsh advfirewall firewall delete rule name="block erl.exe out"')
226            shell.log_command_output(output, error)
227        else:
228            o, r = shell.execute_command("iptables -F")
229            shell.log_command_output(o, r)
230            o, r = shell.execute_command(
231                "/sbin/iptables -A INPUT -p tcp -i eth0 --dport 1000:65535 -j ACCEPT")
232            shell.log_command_output(o, r)
233            if rep_direction == REPLICATION_DIRECTION.BIDIRECTION:
234                o, r = shell.execute_command(
235                    "/sbin/iptables -A OUTPUT -p tcp -o eth0 --dport 1000:65535 -j ACCEPT")
236                shell.log_command_output(o, r)
237            o, r = shell.execute_command(
238                "/sbin/iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT")
239            shell.log_command_output(o, r)
240            # self.log.info("enabled firewall on {0}".format(server))
241            o, r = shell.execute_command("/sbin/iptables --list")
242            shell.log_command_output(o, r)
243        shell.disconnect()
244
245    @staticmethod
246    def reboot_server(server, test_case, wait_timeout=60):
247        """Reboot a server and wait for couchbase server to run.
248        @param server: server object, which needs to be rebooted.
249        @param test_case: test case object, since it has assert() function
250                        which is used by wait_for_ns_servers_or_assert
251                        to throw assertion.
252        @param wait_timeout: timeout to whole reboot operation.
253        """
254        # self.log.info("Rebooting server '{0}'....".format(server.ip))
255        shell = RemoteMachineShellConnection(server)
256        if shell.extract_remote_info().type.lower() == OS.WINDOWS:
257            o, r = shell.execute_command(
258                "{0} -r -f -t 0".format(COMMAND.SHUTDOWN))
259        elif shell.extract_remote_info().type.lower() == OS.LINUX:
260            o, r = shell.execute_command(COMMAND.REBOOT)
261        shell.log_command_output(o, r)
262        # wait for restart and warmup on all server
263        time.sleep(wait_timeout * 5)
264        # disable firewall on these nodes
265        NodeHelper.disable_firewall(server)
266        # wait till server is ready after warmup
267        ClusterOperationHelper.wait_for_ns_servers_or_assert(
268            [server],
269            test_case,
270            wait_if_warmup=True)
271
272    @staticmethod
273    def enable_firewall(
274            server, rep_direction=REPLICATION_DIRECTION.UNIDIRECTION):
275        """Enable firewall
276        @param server: server object to enable firewall
277        @param rep_direction: replication direction unidirection/bidirection
278        """
279        is_bidirectional = rep_direction == REPLICATION_DIRECTION.BIDIRECTION
280        RemoteUtilHelper.enable_firewall(
281            server,
282            bidirectional=is_bidirectional,
283            xdcr=True)
284
285    @staticmethod
286    def do_a_warm_up(server):
287        """Warmp up server
288        """
289        shell = RemoteMachineShellConnection(server)
290        shell.stop_couchbase()
291        time.sleep(5)
292        shell.start_couchbase()
293        shell.disconnect()
294
295    @staticmethod
296    def wait_service_started(server, wait_time=120):
297        """Function will wait for Couchbase service to be in
298        running phase.
299        """
300        shell = RemoteMachineShellConnection(server)
301        os_type = shell.extract_remote_info().distribution_type
302        if os_type.lower() == 'windows':
303            cmd = "sc query CouchbaseServer | grep STATE"
304        else:
305            cmd = "service couchbase-server status"
306        now = time.time()
307        while time.time() - now < wait_time:
308            output, _ = shell.execute_command(cmd)
309            if str(output).lower().find("running") != -1:
310                # self.log.info("Couchbase service is running")
311                return
312            time.sleep(10)
313        raise Exception(
314            "Couchbase service is not running after {0} seconds".format(
315                wait_time))
316
317    @staticmethod
318    def wait_warmup_completed(warmupnodes, bucket_names=["default"]):
319        if isinstance(bucket_names, str):
320            bucket_names = [bucket_names]
321        start = time.time()
322        for server in warmupnodes:
323            for bucket in bucket_names:
324                while time.time() - start < 150:
325                    try:
326                        mc = MemcachedClientHelper.direct_client(server, bucket)
327                        if mc.stats()["ep_warmup_thread"] == "complete":
328                            NodeHelper._log.info(
329                                "Warmed up: %s items on %s on %s" %
330                                (mc.stats("warmup")["ep_warmup_key_count"], bucket, server))
331                            time.sleep(10)
332                            break
333                        elif mc.stats()["ep_warmup_thread"] == "running":
334                            NodeHelper._log.info(
335                                "Still warming up .. ep_warmup_key_count : %s" % (mc.stats("warmup")["ep_warmup_key_count"]))
336                            continue
337                        else:
338                            NodeHelper._log.info(
339                                "Value of ep_warmup_thread does not exist, exiting from this server")
340                            break
341                    except Exception as e:
342                        NodeHelper._log.info(e)
343                        time.sleep(10)
344                if mc.stats()["ep_warmup_thread"] == "running":
345                    NodeHelper._log.info(
346                            "ERROR: ep_warmup_thread's status not complete")
347                mc.close()
348
349
350    @staticmethod
351    def wait_node_restarted(
352            server, test_case, wait_time=120, wait_if_warmup=False,
353            check_service=False):
354        """Wait server to be re-started
355        """
356        now = time.time()
357        if check_service:
358            NodeHelper.wait_service_started(server, wait_time)
359            wait_time = now + wait_time - time.time()
360        num = 0
361        while num < wait_time / 10:
362            try:
363                ClusterOperationHelper.wait_for_ns_servers_or_assert(
364                    [server], test_case, wait_time=wait_time - num * 10,
365                    wait_if_warmup=wait_if_warmup)
366                break
367            except ServerUnavailableException:
368                num += 1
369                time.sleep(10)
370
371    @staticmethod
372    def kill_erlang(server):
373        """Kill erlang process running on server.
374        """
375        NodeHelper._log.info("Killing erlang on server: {0}".format(server))
376        shell = RemoteMachineShellConnection(server)
377        os_info = shell.extract_remote_info()
378        shell.kill_erlang(os_info)
379        shell.disconnect()
380
381    @staticmethod
382    def kill_memcached(server):
383        """Kill memcached process running on server.
384        """
385        shell = RemoteMachineShellConnection(server)
386        shell.kill_erlang()
387        shell.disconnect()
388
389    @staticmethod
390    def get_goxdcr_log_dir(node):
391        """Gets couchbase log directory, even for cluster_run
392        """
393        _, dir = RestConnection(node).diag_eval('filename:absname(element(2, application:get_env(ns_server,error_logger_mf_dir))).')
394        return str(dir)
395
396    @staticmethod
397    def check_goxdcr_log(server, str, goxdcr_log=None, print_matches=None, log_name=None, timeout=0):
398        """ Checks if a string 'str' is present in 'log_name' on 'server'
399            and returns the number of occurances
400            @param goxdcr_log: goxdcr log location on the server
401            @timeout: search every 10 seconds until timeout
402        """
403        if not log_name:
404            log_name = "goxdcr.log"
405        if not goxdcr_log:
406            goxdcr_log = NodeHelper.get_goxdcr_log_dir(server)\
407                     + '/' + log_name + '*'
408        shell = RemoteMachineShellConnection(server)
409        info = shell.extract_remote_info().type.lower()
410        if info == "windows":
411            cmd = "grep "
412        else:
413            cmd = "zgrep "
414        cmd += "\"{0}\" {1}".format(str, goxdcr_log)
415        iter = 0
416        count = 0
417        matches = []
418        # Search 5 times with a break of timeout sec
419        while iter < 5:
420            matches, err = shell.execute_command(cmd)
421            count = len(matches)
422            if count > 0 or timeout == 0:
423                break
424            else:
425                time.sleep(timeout)
426                NodeHelper._log.info("Waiting {0}s for {1} to appear in {2} ..".format(timeout, str, log_name))
427            iter += 1
428        shell.disconnect()
429        NodeHelper._log.info(count)
430        if print_matches:
431            NodeHelper._log.info(matches)
432            return matches, count
433        return count
434
435    @staticmethod
436    def rename_nodes(servers):
437        """Rename server name from ip to their hostname
438        @param servers: list of server objects.
439        @return: dictionary whose key is server and value is hostname
440        """
441        hostnames = {}
442        for server in servers:
443            shell = RemoteMachineShellConnection(server)
444            try:
445                if "ec2" in str(server.ip):
446                    hostname = shell.get_aws_public_hostname()
447                elif "azure" in str(server.ip):
448                    hostname = str(server.ip)
449                else:
450                    hostname = shell.get_full_hostname()
451                rest = RestConnection(server)
452                renamed, content = rest.rename_node(
453                    hostname, username=server.rest_username,
454                    password=server.rest_password)
455                raise_if(
456                    not renamed,
457                    RenameNodeException(
458                        "Server %s is not renamed! Hostname %s. Error %s" % (
459                            server, hostname, content)
460                    )
461                )
462                hostnames[server] = hostname
463                server.hostname = hostname
464            finally:
465                shell.disconnect()
466        return hostnames
467
468    # Returns version like "x.x.x" after removing build number
469    @staticmethod
470    def get_cb_version(node):
471        rest = RestConnection(node)
472        version = rest.get_nodes_self().version
473        return version[:version.rfind('-')]
474
475    @staticmethod
476    def set_wall_clock_time(node, date_str):
477        shell = RemoteMachineShellConnection(node)
478        # os_info = shell.extract_remote_info()
479        # if os_info == OS.LINUX:
480        # date command works on linux and windows cygwin as well.
481        shell.execute_command(
482            "sudo date -s '%s'" %
483            time.ctime(
484                date_str.tx_time))
485        # elif os_info == OS.WINDOWS:
486        #    raise "NEED To SETUP DATE COMMAND FOR WINDOWS"
487
488    @staticmethod
489    def get_cbcollect_info(server):
490        """Collect cbcollectinfo logs for all the servers in the cluster.
491        """
492        path = TestInputSingleton.input.param("logs_folder", "/tmp")
493        print "grabbing cbcollect from {0}".format(server.ip)
494        path = path or "."
495        try:
496            cbcollectRunner(server, path).run()
497            TestInputSingleton.input.test_params[
498                "get-cbcollect-info"] = False
499        except Exception as e:
500            NodeHelper._log.error(
501                "IMPOSSIBLE TO GRAB CBCOLLECT FROM {0}: {1}".format(
502                    server.ip,
503                    e))
504    @staticmethod
505    def collect_data_files(server):
506        """Collect bucket data files for all the servers in the cluster.
507        Data files are collected only if data is not verified on the cluster.
508        """
509        path = TestInputSingleton.input.param("logs_folder", "/tmp")
510        collect_data_files.cbdatacollectRunner(server, path).run()
511
512    @staticmethod
513    def collect_logs(server, cluster_run=False):
514        """Grab cbcollect before we cleanup
515        """
516        NodeHelper.get_cbcollect_info(server)
517        if not cluster_run:
518            NodeHelper.collect_data_files(server)
519
520class ValidateAuditEvent:
521
522    @staticmethod
523    def validate_audit_event(event_id, master_node, expected_results):
524        if CHECK_AUDIT_EVENT.CHECK:
525            audit_obj = audit(event_id, master_node)
526            field_verified, value_verified = audit_obj.validateEvents(
527                expected_results)
528            raise_if(
529                not field_verified,
530                XDCRException("One of the fields is not matching"))
531            raise_if(
532                not value_verified,
533                XDCRException("Values for one of the fields is not matching"))
534
535
536class FloatingServers:
537
538    """Keep Track of free servers, For Rebalance-in
539    or swap-rebalance operations.
540    """
541    _serverlist = []
542
543
544class XDCRRemoteClusterRef:
545
546    """Class keep the information related to Remote Cluster References.
547    """
548
549    def __init__(self, src_cluster, dest_cluster, name, encryption=False, replicator_target_role=False):
550        """
551        @param src_cluster: source couchbase cluster object.
552        @param dest_cluster: destination couchbase cluster object:
553        @param name: remote cluster reference name.
554        @param encryption: True to enable SSL encryption for replication else
555                        False
556        """
557        self.__src_cluster = src_cluster
558        self.__dest_cluster = dest_cluster
559        self.__name = name
560        self.__encryption = encryption
561        self.__rest_info = {}
562        self.__replicator_target_role = replicator_target_role
563        self.__use_scramsha = TestInputSingleton.input.param("use_scramsha", False)
564
565        # List of XDCReplication objects
566        self.__replications = []
567
568    def __str__(self):
569        return "{0} -> {1}, Name: {2}".format(
570            self.__src_cluster.get_name(), self.__dest_cluster.get_name(),
571            self.__name)
572
573    def get_cb_clusters(self):
574        return self.__cb_clusters
575
576    def get_src_cluster(self):
577        return self.__src_cluster
578
579    def get_dest_cluster(self):
580        return self.__dest_cluster
581
582    def get_name(self):
583        return self.__name
584
585    def get_replications(self):
586        return self.__replications
587
588    def get_rest_info(self):
589        return self.__rest_info
590
591    def get_replication_for_bucket(self, bucket):
592        for replication in self.__replications:
593            if replication.get_src_bucket().name == bucket.name:
594                return replication
595        return None
596
597    def __get_event_expected_results(self):
598        expected_results = {
599                "real_userid:source": "ns_server",
600                "real_userid:user": self.__src_cluster.get_master_node().rest_username,
601                "cluster_name": self.__name,
602                "cluster_hostname": "%s:%s" % (self.__dest_cluster.get_master_node().ip, self.__dest_cluster.get_master_node().port),
603                "is_encrypted": self.__encryption,
604                "encryption_type": ""}
605
606        return expected_results
607
608    def __validate_create_event(self):
609            ValidateAuditEvent.validate_audit_event(
610                GO_XDCR_AUDIT_EVENT_ID.CREATE_CLUSTER,
611                self.__src_cluster.get_master_node(),
612                self.__get_event_expected_results())
613
614    def add(self):
615        """create cluster reference- add remote cluster
616        """
617        rest_conn_src = RestConnection(self.__src_cluster.get_master_node())
618        certificate = ""
619        dest_master = self.__dest_cluster.get_master_node()
620        if self.__encryption:
621            rest_conn_dest = RestConnection(dest_master)
622            certificate = rest_conn_dest.get_cluster_ceritificate()
623
624        if self.__replicator_target_role:
625            self.dest_user = "replicator_user"
626            self.dest_pass = "password"
627        else:
628            self.dest_user = dest_master.rest_username
629            self.dest_pass = dest_master.rest_password
630
631        if not self.__use_scramsha:
632            self.__rest_info = rest_conn_src.add_remote_cluster(
633                dest_master.ip, dest_master.port,
634                self.dest_user,
635                self.dest_pass, self.__name,
636                demandEncryption=self.__encryption,
637                certificate=certificate)
638        else:
639            print "Using scram-sha authentication"
640            self.__rest_info = rest_conn_src.add_remote_cluster(
641                dest_master.ip, dest_master.port,
642                self.dest_user,
643                self.dest_pass, self.__name,
644                demandEncryption=self.__encryption,
645                encryptionType="half"
646            )
647
648        self.__validate_create_event()
649
650    def __validate_modify_event(self):
651        ValidateAuditEvent.validate_audit_event(
652            GO_XDCR_AUDIT_EVENT_ID.MOD_CLUSTER,
653            self.__src_cluster.get_master_node(),
654            self.__get_event_expected_results())
655
656    def use_scram_sha_auth(self):
657        self.__use_scramsha = True
658        self.__encryption = True
659        self.modify()
660
661    def modify(self, encryption=True):
662        """Modify cluster reference to enable SSL encryption
663        """
664        dest_master = self.__dest_cluster.get_master_node()
665        rest_conn_src = RestConnection(self.__src_cluster.get_master_node())
666        certificate = ""
667        if encryption:
668            rest_conn_dest = RestConnection(dest_master)
669            if not self.__use_scramsha:
670                certificate = rest_conn_dest.get_cluster_ceritificate()
671                self.__rest_info = rest_conn_src.modify_remote_cluster(
672                    dest_master.ip, dest_master.port,
673                    self.dest_user,
674                    self.dest_pass, self.__name,
675                    demandEncryption=encryption,
676                    certificate=certificate)
677            else:
678                print "Using scram-sha authentication"
679                self.__rest_info = rest_conn_src.modify_remote_cluster(
680                    dest_master.ip, dest_master.port,
681                    self.dest_user,
682                    self.dest_pass, self.__name,
683                    demandEncryption=encryption,
684                    encryptionType="half")
685        self.__encryption = encryption
686        self.__validate_modify_event()
687
688    def __validate_remove_event(self):
689        ValidateAuditEvent.validate_audit_event(
690            GO_XDCR_AUDIT_EVENT_ID.RM_CLUSTER,
691            self.__src_cluster.get_master_node(),
692            self.__get_event_expected_results())
693
694
695    def remove(self):
696        RestConnection(
697            self.__src_cluster.get_master_node()).remove_remote_cluster(
698            self.__name)
699        self.__validate_remove_event()
700
701    def create_replication(
702            self, fromBucket,
703            rep_type=REPLICATION_PROTOCOL.XMEM,
704            toBucket=None):
705        """Create replication objects, but replication will not get
706        started here.
707        """
708        self.__replications.append(
709            XDCReplication(
710                self,
711                fromBucket,
712                rep_type,
713                toBucket))
714
715    def clear_all_replications(self):
716        self.__replications = []
717
718    def start_all_replications(self):
719        """Start all created replication
720        """
721        [repl.start() for repl in self.__replications]
722
723    def pause_all_replications(self, verify=False):
724        """Pause all created replication
725        """
726        [repl.pause(verify=verify) for repl in self.__replications]
727
728    def pause_all_replications_by_id(self, verify=False):
729        [repl.pause(repl_id=repl.get_repl_id(), verify=verify) for repl in self.__replications]
730
731    def resume_all_replications(self, verify=False):
732        """Resume all created replication
733        """
734        [repl.resume(verify=verify) for repl in self.__replications]
735
736    def resume_all_replications_by_id(self, verify=False):
737        [repl.resume(repl_id=repl.get_repl_id(), verify=verify) for repl in self.__replications]
738
739    def stop_all_replications(self):
740        rest = RestConnection(self.__src_cluster.get_master_node())
741        rest_all_repls = rest.get_replications()
742        for repl in self.__replications:
743            for rest_all_repl in rest_all_repls:
744                if repl.get_repl_id() == rest_all_repl['id']:
745                    repl.cancel(rest, rest_all_repl)
746        self.clear_all_replications()
747
748
749class XDCReplication:
750
751    def __init__(self, remote_cluster_ref, from_bucket, rep_type, to_bucket):
752        """
753        @param remote_cluster_ref: XDCRRemoteClusterRef object
754        @param from_bucket: Source bucket (Bucket object)
755        @param rep_type: replication protocol REPLICATION_PROTOCOL.CAPI/XMEM
756        @param to_bucket: Destination bucket (Bucket object)
757        """
758        self.__input = TestInputSingleton.input
759        self.__remote_cluster_ref = remote_cluster_ref
760        self.__from_bucket = from_bucket
761        self.__to_bucket = to_bucket or from_bucket
762        self.__src_cluster = self.__remote_cluster_ref.get_src_cluster()
763        self.__dest_cluster = self.__remote_cluster_ref.get_dest_cluster()
764        self.__src_cluster_name = self.__src_cluster.get_name()
765        self.__rep_type = rep_type
766        self.__test_xdcr_params = {}
767        self.__updated_params = {}
768
769        self.__parse_test_xdcr_params()
770        self.log = logger.Logger.get_logger()
771
772        # Response from REST API
773        self.__rep_id = None
774
775    def __str__(self):
776        return "Replication {0}:{1} -> {2}:{3}".format(
777            self.__src_cluster.get_name(),
778            self.__from_bucket.name, self.__dest_cluster.get_name(),
779            self.__to_bucket.name)
780
781    # get per replication params specified as from_bucket@cluster_name=
782    # eg. default@C1="xdcrFilterExpression:loadOne,xdcrCheckpointInterval:60,
783    # xdcrFailureRestartInterval:20"
784    def __parse_test_xdcr_params(self):
785        param_str = self.__input.param(
786            "%s@%s" %
787            (self.__from_bucket, self.__src_cluster_name), None)
788        if param_str:
789            argument_split = re.split('[:,]', param_str)
790            self.__test_xdcr_params.update(
791                dict(zip(argument_split[::2], argument_split[1::2]))
792            )
793
794    def __convert_test_to_xdcr_params(self):
795        xdcr_params = {}
796        xdcr_param_map = TEST_XDCR_PARAM.get_test_to_create_repl_param_map()
797        for test_param, value in self.__test_xdcr_params.iteritems():
798            xdcr_params[xdcr_param_map[test_param]] = value
799        return xdcr_params
800
801    def get_filter_exp(self):
802        if TEST_XDCR_PARAM.FILTER_EXP in self.__test_xdcr_params:
803            return self.__test_xdcr_params[TEST_XDCR_PARAM.FILTER_EXP]
804        return None
805
806    def get_src_bucket(self):
807        return self.__from_bucket
808
809    def get_dest_bucket(self):
810        return self.__to_bucket
811
812    def get_src_cluster(self):
813        return self.__src_cluster
814
815    def get_dest_cluster(self):
816        return self.__dest_cluster
817
818    def get_repl_id(self):
819        return self.__rep_id
820
821    def __get_event_expected_results(self):
822        expected_results = {
823                "real_userid:source": "ns_server",
824                "real_userid:user": self.__src_cluster.get_master_node().rest_username,
825                "local_cluster_name": "%s:%s" % (self.__src_cluster.get_master_node().ip, self.__src_cluster.get_master_node().port),
826                "source_bucket_name": self.__from_bucket.name,
827                "remote_cluster_name": self.__remote_cluster_ref.get_name(),
828                "target_bucket_name": self.__to_bucket.name
829            }
830        # optional audit param
831        if self.get_filter_exp():
832            expected_results["filter_expression"] = self.get_filter_exp()
833        return expected_results
834
835    def __validate_update_repl_event(self):
836        expected_results = {
837            "settings": {
838                "continuous": 'true',
839                "target": self.__to_bucket.name,
840                "source": self.__from_bucket.name,
841                "type": "xdc-%s" % self.__rep_type
842            },
843            "id": self.__rep_id,
844            "real_userid:source": "ns_server",
845            "real_userid:user": self.__src_cluster.get_master_node().rest_username,
846        }
847        expected_results["settings"].update(self.__updated_params)
848
849    def __validate_set_param_event(self):
850        expected_results = self.__get_event_expected_results()
851        expected_results["updated_settings"] = self.__updated_params
852        ValidateAuditEvent.validate_audit_event(
853            GO_XDCR_AUDIT_EVENT_ID.IND_SETT,
854            self.get_src_cluster().get_master_node(), expected_results)
855
856    def get_xdcr_setting(self, param):
857        """Get a replication setting value
858        """
859        src_master = self.__src_cluster.get_master_node()
860        return RestConnection(src_master).get_xdcr_param(
861                    self.__from_bucket.name,
862                    self.__to_bucket.name,
863                    param)
864
865    def set_xdcr_param(self, param, value, verify_event=True):
866        """Set a replication setting to a value
867        """
868        src_master = self.__src_cluster.get_master_node()
869        RestConnection(src_master).set_xdcr_param(
870            self.__from_bucket.name,
871            self.__to_bucket.name,
872            param,
873            value)
874        self.log.info("Updated {0}={1} on bucket'{2}' on {3}".format(param, value, self.__from_bucket.name,
875                                                                     self.__src_cluster.get_master_node().ip))
876        self.__updated_params[param] = value
877        if verify_event:
878            self.__validate_set_param_event()
879
880    def __validate_start_audit_event(self):
881        ValidateAuditEvent.validate_audit_event(
882            GO_XDCR_AUDIT_EVENT_ID.CREATE_REPL,
883                self.get_src_cluster().get_master_node(),
884                self.__get_event_expected_results())
885
886    def start(self):
887        """Start replication"""
888        src_master = self.__src_cluster.get_master_node()
889        rest_conn_src = RestConnection(src_master)
890        self.__rep_id = rest_conn_src.start_replication(
891            REPLICATION_TYPE.CONTINUOUS,
892            self.__from_bucket,
893            self.__remote_cluster_ref.get_name(),
894            rep_type=self.__rep_type,
895            toBucket=self.__to_bucket,
896            xdcr_params=self.__convert_test_to_xdcr_params())
897        self.__validate_start_audit_event()
898        #if within this 10s for pipeline updater if we try to create another replication, it doesn't work until the previous pipeline is updated.
899        # but better to have this 10s sleep between replications.
900        time.sleep(10)
901
902    def __verify_pause(self):
903        """Verify if replication is paused"""
904        src_master = self.__src_cluster.get_master_node()
905        # Is bucket replication paused?
906        if not RestConnection(src_master).is_replication_paused(
907                self.__from_bucket.name,
908                self.__to_bucket.name):
909            raise XDCRException(
910                "XDCR is not paused for SrcBucket: {0}, Target Bucket: {1}".
911                format(self.__from_bucket.name,
912                       self.__to_bucket.name))
913
914    def __validate_pause_event(self):
915        ValidateAuditEvent.validate_audit_event(
916            GO_XDCR_AUDIT_EVENT_ID.PAUSE_REPL,
917            self.get_src_cluster().get_master_node(),
918            self.__get_event_expected_results())
919
920    def pause(self, repl_id=None, verify=False):
921        """Pause replication"""
922        src_master = self.__src_cluster.get_master_node()
923        if repl_id:
924            if not RestConnection(src_master).is_replication_paused_by_id(repl_id):
925                RestConnection(src_master).pause_resume_repl_by_id(repl_id, REPL_PARAM.PAUSE_REQUESTED, 'true')
926        else:
927            if not RestConnection(src_master).is_replication_paused(
928                    self.__from_bucket.name, self.__to_bucket.name):
929                self.set_xdcr_param(
930                    REPL_PARAM.PAUSE_REQUESTED,
931                    'true',
932                    verify_event=False)
933
934        self.__validate_pause_event()
935
936        if verify:
937            self.__verify_pause()
938
939    def _is_cluster_replicating(self):
940        count = 0
941        src_master = self.__src_cluster.get_master_node()
942        while count < 3:
943            outbound_mutations = self.__src_cluster.get_xdcr_stat(
944                self.__from_bucket.name,
945                'replication_changes_left')
946            if outbound_mutations == 0:
947                self.log.info(
948                    "Outbound mutations on {0} is {1}".format(
949                        src_master.ip,
950                        outbound_mutations))
951                count += 1
952                continue
953            else:
954                self.log.info(
955                    "Outbound mutations on {0} is {1}".format(
956                        src_master.ip,
957                        outbound_mutations))
958                self.log.info("Node {0} is replicating".format(src_master.ip))
959                break
960        else:
961            self.log.info(
962                "Outbound mutations on {0} is {1}".format(
963                    src_master.ip,
964                    outbound_mutations))
965            self.log.info(
966                "Cluster with node {0} is not replicating".format(
967                    src_master.ip))
968            return False
969        return True
970
971    def __verify_resume(self):
972        """Verify if replication is resumed"""
973        src_master = self.__src_cluster.get_master_node()
974        # Is bucket replication paused?
975        if RestConnection(src_master).is_replication_paused(self.__from_bucket.name,
976                                                            self.__to_bucket.name):
977            raise XDCRException(
978                "Replication is not resumed for SrcBucket: {0}, \
979                Target Bucket: {1}".format(self.__from_bucket, self.__to_bucket))
980
981        if not self._is_cluster_replicating():
982            self.log.info("XDCR completed on {0}".format(src_master.ip))
983
984    def __validate_resume_event(self):
985        ValidateAuditEvent.validate_audit_event(
986            GO_XDCR_AUDIT_EVENT_ID.RESUME_REPL,
987            self.get_src_cluster().get_master_node(),
988            self.__get_event_expected_results())
989
990    def resume(self, repl_id=None, verify=False):
991        """Resume replication if paused"""
992        src_master = self.__src_cluster.get_master_node()
993        if repl_id:
994            if RestConnection(src_master).is_replication_paused_by_id(repl_id):
995                RestConnection(src_master).pause_resume_repl_by_id(repl_id, REPL_PARAM.PAUSE_REQUESTED, 'false')
996        else:
997            if RestConnection(src_master).is_replication_paused(
998                    self.__from_bucket.name, self.__to_bucket.name):
999                self.set_xdcr_param(
1000                REPL_PARAM.PAUSE_REQUESTED,
1001                'false',
1002                verify_event=False)
1003
1004        self.__validate_resume_event()
1005
1006        if verify:
1007            self.__verify_resume()
1008
1009    def __validate_cancel_event(self):
1010        ValidateAuditEvent.validate_audit_event(
1011            GO_XDCR_AUDIT_EVENT_ID.CAN_REPL,
1012            self.get_src_cluster().get_master_node(),
1013            self.__get_event_expected_results())
1014
1015
1016    def cancel(self, rest, rest_all_repl):
1017        rest.stop_replication(rest_all_repl["cancelURI"])
1018        self.__validate_cancel_event()
1019
1020
1021class CouchbaseCluster:
1022
1023    def __init__(self, name, nodes, log, use_hostname=False, sdk_compression=True):
1024        """
1025        @param name: Couchbase cluster name. e.g C1, C2 to distinguish in logs.
1026        @param nodes: list of server objects (read from ini file).
1027        @param log: logger object to print logs.
1028        @param use_hostname: True if use node's hostname rather ip to access
1029                        node else False.
1030        """
1031        self.__name = name
1032        self.__nodes = nodes
1033        self.__log = log
1034        self.__mem_quota = 0
1035        self.__use_hostname = use_hostname
1036        self.__master_node = nodes[0]
1037        self.__design_docs = []
1038        self.__buckets = []
1039        self.__hostnames = {}
1040        self.__fail_over_nodes = []
1041        self.__data_verified = True
1042        self.__meta_data_verified = True
1043        self.__remote_clusters = []
1044        self.__clusterop = Cluster()
1045        self.__kv_gen = {}
1046        self.sdk_compression = sdk_compression
1047
1048    def __str__(self):
1049        return "Couchbase Cluster: %s, Master Ip: %s" % (
1050            self.__name, self.__master_node.ip)
1051
1052    def __stop_rebalance(self):
1053        rest = RestConnection(self.__master_node)
1054        if rest._rebalance_progress_status() == 'running':
1055            self.__log.warning(
1056                "rebalancing is still running, test should be verified")
1057            stopped = rest.stop_rebalance()
1058            raise_if(
1059                not stopped,
1060                RebalanceNotStopException("unable to stop rebalance"))
1061
1062    def __init_nodes(self, disabled_consistent_view=None):
1063        """Initialize all nodes. Rename node to hostname
1064        if needed by test.
1065        """
1066        tasks = []
1067        for node in self.__nodes:
1068            tasks.append(
1069                self.__clusterop.async_init_node(
1070                    node,
1071                    disabled_consistent_view))
1072        for task in tasks:
1073            mem_quota = task.result()
1074            if mem_quota < self.__mem_quota or self.__mem_quota == 0:
1075                self.__mem_quota = mem_quota
1076        if self.__use_hostname:
1077            self.__hostnames.update(NodeHelper.rename_nodes(self.__nodes))
1078
1079    def get_host_names(self):
1080        return self.__hostnames
1081
1082    def get_master_node(self):
1083        return self.__master_node
1084
1085    def get_mem_quota(self):
1086        return self.__mem_quota
1087
1088    def get_remote_clusters(self):
1089        return self.__remote_clusters
1090
1091    def clear_all_remote_clusters(self):
1092        self.__remote_clusters = []
1093
1094    def get_nodes(self):
1095        return self.__nodes
1096
1097    def get_name(self):
1098        return self.__name
1099
1100    def get_cluster(self):
1101        return self.__clusterop
1102
1103    def get_kv_gen(self):
1104        raise_if(
1105            self.__kv_gen is None,
1106            XDCRException(
1107                "KV store is empty on couchbase cluster: %s" %
1108                self))
1109        return self.__kv_gen
1110
1111    def get_remote_cluster_ref_by_name(self, cluster_name):
1112        for remote_cluster_ref in self.__remote_clusters:
1113            if remote_cluster_ref.get_name() == cluster_name:
1114                return remote_cluster_ref
1115        self.__log.info("Remote cluster reference by name %s does not exist on %s"
1116                        % (cluster_name, self.__name))
1117        return None
1118
1119    def init_cluster(self, disabled_consistent_view=None):
1120        """Initialize cluster.
1121        1. Initialize all nodes.
1122        2. Add all nodes to the cluster.
1123        3. Enable xdcr trace logs to easy debug for xdcr items mismatch issues.
1124        """
1125        master = RestConnection(self.__master_node)
1126        self.enable_diag_eval_on_non_local_hosts(self.__master_node)
1127        is_master_sherlock_or_greater = master.is_cluster_compat_mode_greater_than(3.1)
1128        self.__init_nodes(disabled_consistent_view)
1129        self.__clusterop.async_rebalance(
1130            self.__nodes,
1131            self.__nodes[1:],
1132            [],
1133            use_hostnames=self.__use_hostname).result()
1134
1135        if CHECK_AUDIT_EVENT.CHECK:
1136            if master.is_enterprise_edition() and is_master_sherlock_or_greater:
1137                # enable audit by default in all goxdcr tests
1138                audit_obj = audit(host=self.__master_node)
1139                status = audit_obj.getAuditStatus()
1140                self.__log.info("Audit status on {0} is {1}".
1141                            format(self.__name, status))
1142                if not status:
1143                    self.__log.info("Enabling audit ...")
1144                    audit_obj.setAuditEnable('true')
1145
1146    def enable_diag_eval_on_non_local_hosts(self, master):
1147        """
1148        Enable diag/eval to be run on non-local hosts.
1149        :param master: Node information of the master node of the cluster
1150        :return: Nothing
1151        """
1152        remote = RemoteMachineShellConnection(master)
1153        output, error = remote.enable_diag_eval_on_non_local_hosts()
1154        if output is not None:
1155            if "ok" not in output:
1156                self.__log.error("Error in enabling diag/eval on non-local hosts on {}".format(master.ip))
1157                raise Exception("Error in enabling diag/eval on non-local hosts on {}".format(master.ip))
1158            else:
1159                self.__log.info(
1160                    "Enabled diag/eval for non-local hosts from {}".format(
1161                        master.ip))
1162        else:
1163            self.__log.info("Running in compatibility mode, not enabled diag/eval for non-local hosts")
1164
1165    def _create_bucket_params(self, server, replicas=1, size=0, port=11211, password=None,
1166                             bucket_type=None, enable_replica_index=1, eviction_policy='valueOnly',
1167                             bucket_priority=None, flush_enabled=1, lww=False, maxttl=None):
1168        """Create a set of bucket_parameters to be sent to all of the bucket_creation methods
1169        Parameters:
1170            server - The server to create the bucket on. (TestInputServer)
1171            bucket_name - The name of the bucket to be created. (String)
1172            port - The port to create this bucket on. (String)
1173            password - The password for this bucket. (String)
1174            size - The size of the bucket to be created. (int)
1175            enable_replica_index - can be 0 or 1, 1 enables indexing of replica bucket data (int)
1176            replicas - The number of replicas for this bucket. (int)
1177            eviction_policy - The eviction policy for the bucket, can be valueOnly or fullEviction. (String)
1178            bucket_priority - The priority of the bucket:either none, low, or high. (String)
1179            bucket_type - The type of bucket. (String)
1180            flushEnabled - Enable or Disable the flush functionality of the bucket. (int)
1181            lww = determine the conflict resolution type of the bucket. (Boolean)
1182
1183        Returns:
1184            bucket_params - A dictionary containing the parameters needed to create a bucket."""
1185
1186        bucket_params = {}
1187        bucket_params['server'] = server
1188        bucket_params['replicas'] = replicas
1189        bucket_params['size'] = size
1190        bucket_params['port'] = port
1191        bucket_params['password'] = password
1192        bucket_type = TestInputSingleton.input.param("bucket_type", "membase")
1193        bucket_params['bucket_type'] = bucket_type
1194        bucket_params['enable_replica_index'] = enable_replica_index
1195        if bucket_type == "ephemeral":
1196            if eviction_policy in EVICTION_POLICY.EPH:
1197                bucket_params['eviction_policy'] = eviction_policy
1198            else:
1199                bucket_params['eviction_policy'] = EVICTION_POLICY.NRU_EVICTION
1200            if eviction_policy == EVICTION_POLICY.NRU_EVICTION:
1201                if "6.0.2-" in NodeHelper.get_cb_version(server):
1202                    self.set_internal_setting("AllowSourceNRUCreation", "true")
1203        else:
1204            if eviction_policy in EVICTION_POLICY.CB:
1205                bucket_params['eviction_policy'] = eviction_policy
1206            else:
1207                bucket_params['eviction_policy'] = EVICTION_POLICY.VALUE_ONLY
1208        bucket_params['bucket_priority'] = bucket_priority
1209        bucket_params['flush_enabled'] = flush_enabled
1210        bucket_params['lww'] = lww
1211        bucket_params['maxTTL'] = maxttl
1212        return bucket_params
1213
1214    def set_global_checkpt_interval(self, value):
1215        self.set_xdcr_param("checkpointInterval",value)
1216
1217    def set_internal_setting(self, param, value):
1218        output, _ = RemoteMachineShellConnection(self.__master_node).execute_command_raw(
1219            "curl -X GET http://Administrator:password@localhost:9998/xdcr/internalSettings")
1220        if not '"' + param + '":' + value in output:
1221            RemoteMachineShellConnection(self.__master_node).execute_command_raw(
1222                "curl http://Administrator:password@localhost:9998/xdcr/internalSettings -X POST -d " +
1223                urllib.quote_plus(param +'="' + value + '"'))
1224
1225    def __remove_all_remote_clusters(self):
1226        rest_remote_clusters = RestConnection(
1227            self.__master_node).get_remote_clusters()
1228        for remote_cluster_ref in self.__remote_clusters:
1229            for rest_remote_cluster in rest_remote_clusters:
1230                if remote_cluster_ref.get_name() == rest_remote_cluster[
1231                        'name']:
1232                    if not rest_remote_cluster.get('deleted', False):
1233                        remote_cluster_ref.remove()
1234        self.__remote_clusters = []
1235
1236    def __remove_all_replications(self):
1237        for remote_cluster_ref in self.__remote_clusters:
1238            remote_cluster_ref.stop_all_replications()
1239
1240    def cleanup_cluster(
1241            self,
1242            test_case,
1243            from_rest=False,
1244            cluster_shutdown=True):
1245        """Cleanup cluster.
1246        1. Remove all remote cluster references.
1247        2. Remove all replications.
1248        3. Remove all buckets.
1249        @param test_case: Test case object.
1250        @param test_failed: True if test failed else False.
1251        @param cluster_run: True if test execution is single node cluster run else False.
1252        @param cluster_shutdown: True if Task (task.py) Scheduler needs to shutdown else False
1253        """
1254        try:
1255            self.__log.info("removing xdcr/nodes settings")
1256            rest = RestConnection(self.__master_node)
1257            if from_rest:
1258                rest.remove_all_replications()
1259                rest.remove_all_remote_clusters()
1260            else:
1261                self.__remove_all_replications()
1262                self.__remove_all_remote_clusters()
1263            rest.remove_all_recoveries()
1264            self.__stop_rebalance()
1265            self.__log.info("cleanup {0}".format(self.__nodes))
1266            for node in self.__nodes:
1267                BucketOperationHelper.delete_all_buckets_or_assert(
1268                    [node],
1269                    test_case)
1270                force_eject = TestInputSingleton.input.param(
1271                    "forceEject",
1272                    False)
1273                if force_eject and node != self.__master_node:
1274                    try:
1275                        rest = RestConnection(node)
1276                        rest.force_eject_node()
1277                    except BaseException as e:
1278                        self.__log.error(e)
1279                else:
1280                    ClusterOperationHelper.cleanup_cluster([node])
1281                ClusterOperationHelper.wait_for_ns_servers_or_assert(
1282                    [node],
1283                    test_case)
1284        finally:
1285            if cluster_shutdown:
1286                self.__clusterop.shutdown(force=True)
1287
1288    def create_sasl_buckets(
1289            self, bucket_size, num_buckets=1, num_replicas=1,
1290            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1291            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False,
1292            maxttl=None):
1293        """Create sasl buckets.
1294        @param bucket_size: size of the bucket.
1295        @param num_buckets: number of buckets to create.
1296        @param num_replicas: number of replicas (1-3).
1297        @param eviction_policy: valueOnly etc.
1298        @param bucket_priority: high/low etc.
1299        @param lww: conflict_resolution_type.
1300        """
1301        bucket_tasks = []
1302        for i in range(num_buckets):
1303            name = "sasl_bucket_" + str(i + 1)
1304            sasl_params = self._create_bucket_params(server=self.__master_node, password='password',
1305                                                     size=bucket_size, replicas=num_replicas,
1306                                                     eviction_policy=eviction_policy,
1307                                                     bucket_priority=bucket_priority,
1308                                                     lww=lww, maxttl=maxttl)
1309            bucket_tasks.append(self.__clusterop.async_create_sasl_bucket(name=name,password='password',
1310                                                                          bucket_params=sasl_params))
1311
1312            self.__buckets.append(
1313                Bucket(
1314                    name=name, authType="sasl", saslPassword="password",
1315                    num_replicas=num_replicas, bucket_size=bucket_size,
1316                    eviction_policy=eviction_policy,
1317                    bucket_priority=bucket_priority,
1318                    lww=lww,
1319                    maxttl=maxttl
1320                ))
1321
1322        for task in bucket_tasks:
1323            task.result()
1324
1325    def create_standard_buckets(
1326            self, bucket_size, num_buckets=1, num_replicas=1,
1327            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1328            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False, maxttl=None):
1329        """Create standard buckets.
1330        @param bucket_size: size of the bucket.
1331        @param num_buckets: number of buckets to create.
1332        @param num_replicas: number of replicas (1-3).
1333        @param eviction_policy: valueOnly etc.
1334        @param bucket_priority: high/low etc.
1335        @param lww: conflict_resolution_type.
1336        """
1337        bucket_tasks = []
1338        for i in range(num_buckets):
1339            name = "standard_bucket_" + str(i + 1)
1340            standard_params = self._create_bucket_params(
1341                server=self.__master_node,
1342                size=bucket_size,
1343                replicas=num_replicas,
1344                eviction_policy=eviction_policy,
1345                bucket_priority=bucket_priority,
1346                lww=lww,
1347                maxttl=maxttl)
1348
1349            bucket_tasks.append(self.__clusterop.async_create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT+i,
1350                                                                              bucket_params=standard_params))
1351
1352
1353            self.__buckets.append(
1354                Bucket(
1355                    name=name,
1356                    authType=None,
1357                    saslPassword=None,
1358                    num_replicas=num_replicas,
1359                    bucket_size=bucket_size,
1360                    port=STANDARD_BUCKET_PORT + i,
1361                    eviction_policy=eviction_policy,
1362                    bucket_priority=bucket_priority,
1363                    lww=lww,
1364                    maxttl=maxttl
1365                ))
1366
1367        for task in bucket_tasks:
1368            task.result()
1369
1370    def create_default_bucket(
1371            self, bucket_size, num_replicas=1,
1372            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1373            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False,
1374            maxttl=None):
1375        """Create default bucket.
1376        @param bucket_size: size of the bucket.
1377        @param num_replicas: number of replicas (1-3).
1378        @param eviction_policy: valueOnly etc.
1379        @param bucket_priority: high/low etc.
1380        @param lww: conflict_resolution_type.
1381        """
1382        bucket_params=self._create_bucket_params(
1383            server=self.__master_node,
1384            size=bucket_size,
1385            replicas=num_replicas,
1386            eviction_policy=eviction_policy,
1387            bucket_priority=bucket_priority,
1388            lww=lww,
1389            maxttl=maxttl)
1390
1391        self.__clusterop.create_default_bucket(bucket_params)
1392        self.__buckets.append(
1393            Bucket(
1394                name=BUCKET_NAME.DEFAULT,
1395                authType="sasl",
1396                saslPassword="",
1397                num_replicas=num_replicas,
1398                bucket_size=bucket_size,
1399                eviction_policy=eviction_policy,
1400                bucket_priority=bucket_priority,
1401                lww=lww,
1402                maxttl=maxttl
1403            ))
1404
1405    def get_buckets(self):
1406        return self.__buckets
1407
1408    def add_bucket(self, bucket='',
1409                   ramQuotaMB=1,
1410                   authType='none',
1411                   saslPassword='',
1412                   replicaNumber=1,
1413                   proxyPort=11211,
1414                   bucketType='membase',
1415                   evictionPolicy='valueOnly'):
1416        self.__buckets.append(Bucket(bucket_size=ramQuotaMB, name=bucket, authType=authType,
1417                                     saslPassword=saslPassword, num_replicas=replicaNumber,
1418                                     port=proxyPort, type=bucketType, eviction_policy=evictionPolicy))
1419
1420    def get_bucket_by_name(self, bucket_name):
1421        """Return the bucket with given name
1422        @param bucket_name: bucket name.
1423        @return: bucket object
1424        """
1425        for bucket in self.__buckets:
1426            if bucket.name == bucket_name:
1427                return bucket
1428
1429        raise Exception(
1430            "Bucket with name: %s not found on the cluster" %
1431            bucket_name)
1432
1433    def delete_bucket(self, bucket_name):
1434        """Delete bucket with given name
1435        @param bucket_name: bucket name (string) to delete
1436        """
1437        bucket_to_remove = self.get_bucket_by_name(bucket_name)
1438        self.__clusterop.bucket_delete(
1439            self.__master_node,
1440            bucket_to_remove.name)
1441        self.__buckets.remove(bucket_to_remove)
1442
1443    def delete_all_buckets(self):
1444        for bucket_to_remove in self.__buckets:
1445            self.__clusterop.bucket_delete(
1446                self.__master_node,
1447                bucket_to_remove.name)
1448            self.__buckets.remove(bucket_to_remove)
1449
1450    def flush_buckets(self, buckets=[]):
1451        buckets = buckets or self.__buckets
1452        tasks = []
1453        for bucket in buckets:
1454            tasks.append(self.__clusterop.async_bucket_flush(
1455                self.__master_node,
1456                bucket))
1457        [task.result() for task in tasks]
1458
1459    def async_load_bucket(self, bucket, num_items, value_size=512, exp=0,
1460                          kv_store=1, flag=0, only_store_hash=True,
1461                          batch_size=1000, pause_secs=1, timeout_secs=30):
1462        """Load data asynchronously on given bucket. Function don't wait for
1463        load data to finish, return immidiately.
1464        @param bucket: bucket where to load data.
1465        @param num_items: number of items to load
1466        @param value_size: size of the one item.
1467        @param exp: expiration value.
1468        @param kv_store: kv store index.
1469        @param flag:
1470        @param only_store_hash: True to store hash of item else False.
1471        @param batch_size: batch size for load data at a time.
1472        @param pause_secs: pause for next batch load.
1473        @param timeout_secs: timeout
1474        @return: task object
1475        """
1476        seed = "%s-key-" % self.__name
1477        self.__kv_gen[
1478            OPS.CREATE] = BlobGenerator(
1479            seed,
1480            seed,
1481            value_size,
1482            end=num_items)
1483
1484        gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1485        task = self.__clusterop.async_load_gen_docs(
1486            self.__master_node, bucket.name, gen, bucket.kvs[kv_store],
1487            OPS.CREATE, exp, flag, only_store_hash, batch_size, pause_secs,
1488            timeout_secs, compression=self.sdk_compression)
1489        return task
1490
1491    def load_bucket(self, bucket, num_items, value_size=512, exp=0,
1492                    kv_store=1, flag=0, only_store_hash=True,
1493                    batch_size=1000, pause_secs=1, timeout_secs=30):
1494        """Load data synchronously on given bucket. Function wait for
1495        load data to finish.
1496        @param bucket: bucket where to load data.
1497        @param num_items: number of items to load
1498        @param value_size: size of the one item.
1499        @param exp: expiration value.
1500        @param kv_store: kv store index.
1501        @param flag:
1502        @param only_store_hash: True to store hash of item else False.
1503        @param batch_size: batch size for load data at a time.
1504        @param pause_secs: pause for next batch load.
1505        @param timeout_secs: timeout
1506        """
1507        task = self.async_load_bucket(bucket, num_items, value_size, exp,
1508                                      kv_store, flag, only_store_hash,
1509                                      batch_size, pause_secs, timeout_secs)
1510        task.result()
1511
1512    def async_load_all_buckets(self, num_items, value_size=512, exp=0,
1513                               kv_store=1, flag=0, only_store_hash=True,
1514                               batch_size=1000, pause_secs=1, timeout_secs=30):
1515        """Load data asynchronously on all buckets of the cluster.
1516        Function don't wait for load data to finish, return immidiately.
1517        @param num_items: number of items to load
1518        @param value_size: size of the one item.
1519        @param exp: expiration value.
1520        @param kv_store: kv store index.
1521        @param flag:
1522        @param only_store_hash: True to store hash of item else False.
1523        @param batch_size: batch size for load data at a time.
1524        @param pause_secs: pause for next batch load.
1525        @param timeout_secs: timeout
1526        @return: task objects list
1527        """
1528        seed = "%s-key-" % self.__name
1529        self.__kv_gen[
1530            OPS.CREATE] = BlobGenerator(
1531            seed,
1532            seed,
1533            value_size,
1534            start=0,
1535            end=num_items)
1536        tasks = []
1537        for bucket in self.__buckets:
1538            gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1539            tasks.append(
1540                self.__clusterop.async_load_gen_docs(
1541                    self.__master_node, bucket.name, gen, bucket.kvs[kv_store],
1542                    OPS.CREATE, exp, flag, only_store_hash, batch_size,
1543                    pause_secs, timeout_secs, compression=self.sdk_compression)
1544            )
1545        return tasks
1546
1547    def load_all_buckets(self, num_items, value_size=512, exp=0,
1548                         kv_store=1, flag=0, only_store_hash=True,
1549                         batch_size=1000, pause_secs=1, timeout_secs=30):
1550        """Load data synchronously on all buckets. Function wait for
1551        load data to finish.
1552        @param num_items: number of items to load
1553        @param value_size: size of the one item.
1554        @param exp: expiration value.
1555        @param kv_store: kv store index.
1556        @param flag:
1557        @param only_store_hash: True to store hash of item else False.
1558        @param batch_size: batch size for load data at a time.
1559        @param pause_secs: pause for next batch load.
1560        @param timeout_secs: timeout
1561        """
1562        tasks = self.async_load_all_buckets(
1563            num_items, value_size, exp, kv_store, flag, only_store_hash,
1564            batch_size, pause_secs, timeout_secs)
1565        for task in tasks:
1566            task.result()
1567
1568    def load_all_buckets_from_generator(self, kv_gen, ops=OPS.CREATE, exp=0,
1569                                        kv_store=1, flag=0, only_store_hash=True,
1570                                        batch_size=1000, pause_secs=1, timeout_secs=30):
1571        """Load data synchronously on all buckets. Function wait for
1572        load data to finish.
1573        @param gen: BlobGenerator() object
1574        @param ops: OPS.CREATE/UPDATE/DELETE/APPEND.
1575        @param exp: expiration value.
1576        @param kv_store: kv store index.
1577        @param flag:
1578        @param only_store_hash: True to store hash of item else False.
1579        @param batch_size: batch size for load data at a time.
1580        @param pause_secs: pause for next batch load.
1581        @param timeout_secs: timeout
1582        """
1583        # TODO append generator values if op_type is already present
1584        if ops not in self.__kv_gen:
1585            self.__kv_gen[ops] = kv_gen
1586
1587        tasks = []
1588        for bucket in self.__buckets:
1589            kv_gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1590            tasks.append(
1591                self.__clusterop.async_load_gen_docs(
1592                    self.__master_node, bucket.name, kv_gen,
1593                    bucket.kvs[kv_store], ops, exp, flag,
1594                    only_store_hash, batch_size, pause_secs, timeout_secs,
1595                    compression=self.sdk_compression)
1596            )
1597        for task in tasks:
1598            task.result()
1599
1600    def async_load_all_buckets_from_generator(self, kv_gen, ops=OPS.CREATE, exp=0,
1601                                              kv_store=1, flag=0, only_store_hash=True,
1602                                              batch_size=1000, pause_secs=1, timeout_secs=30):
1603        """Load data asynchronously on all buckets. Function wait for
1604        load data to finish.
1605        @param gen: BlobGenerator() object
1606        @param ops: OPS.CREATE/UPDATE/DELETE/APPEND.
1607        @param exp: expiration value.
1608        @param kv_store: kv store index.
1609        @param flag:
1610        @param only_store_hash: True to store hash of item else False.
1611        @param batch_size: batch size for load data at a time.
1612        @param pause_secs: pause for next batch load.
1613        @param timeout_secs: timeout
1614        """
1615        # TODO append generator values if op_type is already present
1616        if ops not in self.__kv_gen:
1617            self.__kv_gen[ops] = kv_gen
1618
1619        tasks = []
1620        for bucket in self.__buckets:
1621            kv_gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1622            tasks.append(
1623                self.__clusterop.async_load_gen_docs(
1624                    self.__master_node, bucket.name, kv_gen,
1625                    bucket.kvs[kv_store], ops, exp, flag,
1626                    only_store_hash, batch_size, pause_secs, timeout_secs, compression=self.sdk_compression)
1627            )
1628        return tasks
1629
1630    def load_all_buckets_till_dgm(self, active_resident_threshold, items=0,
1631                                  value_size=512, exp=0, kv_store=1, flag=0,
1632                                  only_store_hash=True, batch_size=1000,
1633                                  pause_secs=1, timeout_secs=30):
1634        """Load data synchronously on all buckets till dgm (Data greater than memory)
1635        for given active_resident_threshold
1636        @param active_resident_threshold: Dgm threshold.
1637        @param value_size: size of the one item.
1638        @param exp: expiration value.
1639        @param kv_store: kv store index.
1640        @param flag:
1641        @param only_store_hash: True to store hash of item else False.
1642        @param batch_size: batch size for load data at a time.
1643        @param pause_secs: pause for next batch load.
1644        @param timeout_secs: timeout
1645        """
1646        items = int(items)
1647        self.__log.info("First loading \"items\" {0} number keys to handle "
1648                      "update/deletes in dgm cases".format(items))
1649        self.load_all_buckets(items)
1650
1651        self.__log.info("Now loading extra keys to reach dgm limit")
1652        seed = "%s-key-" % self.__name
1653        end = 0
1654        for bucket in self.__buckets:
1655            current_active_resident = StatsCommon.get_stats(
1656                [self.__master_node],
1657                bucket,
1658                '',
1659                'vb_active_perc_mem_resident')[self.__master_node]
1660            start = items
1661            end = start + batch_size * 10
1662            while int(current_active_resident) > active_resident_threshold:
1663                self.__log.info("loading %s keys ..." % (end-start))
1664
1665                kv_gen = BlobGenerator(
1666                    seed,
1667                    seed,
1668                    value_size,
1669                    start=start,
1670                    end=end)
1671
1672                tasks = []
1673                tasks.append(self.__clusterop.async_load_gen_docs(
1674                    self.__master_node, bucket.name, kv_gen, bucket.kvs[kv_store],
1675                    OPS.CREATE, exp, flag, only_store_hash, batch_size,
1676                    pause_secs, timeout_secs, compression=self.sdk_compression))
1677
1678                for task in tasks:
1679                    task.result()
1680                start = end
1681                end = start + batch_size * 10
1682                current_active_resident = StatsCommon.get_stats(
1683                    [self.__master_node],
1684                    bucket,
1685                    '',
1686                    'vb_active_perc_mem_resident')[self.__master_node]
1687                self.__log.info(
1688                    "Current resident ratio: %s, desired: %s bucket %s" % (
1689                        current_active_resident,
1690                        active_resident_threshold,
1691                        bucket.name))
1692            self.__log.info("Loaded a total of %s keys into bucket %s"
1693                            % (end,bucket.name))
1694        self.__kv_gen[OPS.CREATE] = BlobGenerator(
1695                seed,
1696                seed,
1697                value_size,
1698                start = 0,
1699                end=end)
1700
1701    def async_update_delete(
1702            self, op_type, perc=30, expiration=0, kv_store=1):
1703        """Perform update/delete operation on all buckets. Function don't wait
1704        operation to finish.
1705        @param op_type: OPS.CREATE/OPS.UPDATE/OPS.DELETE
1706        @param perc: percentage of data to be deleted or created
1707        @param expiration: time for expire items
1708        @return: task object list
1709        """
1710        raise_if(
1711            OPS.CREATE not in self.__kv_gen,
1712            XDCRException(
1713                "Data is not loaded in cluster.Load data before update/delete")
1714        )
1715        tasks = []
1716        for bucket in self.__buckets:
1717            if op_type == OPS.UPDATE:
1718                if isinstance(self.__kv_gen[OPS.CREATE], BlobGenerator):
1719                    self.__kv_gen[OPS.UPDATE] = BlobGenerator(
1720                        self.__kv_gen[OPS.CREATE].name,
1721                        self.__kv_gen[OPS.CREATE].seed,
1722                        self.__kv_gen[OPS.CREATE].value_size,
1723                        start=0,
1724                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
1725                elif isinstance(self.__kv_gen[OPS.CREATE], DocumentGenerator):
1726                    self.__kv_gen[OPS.UPDATE] = DocumentGenerator(
1727                        self.__kv_gen[OPS.CREATE].name,
1728                        self.__kv_gen[OPS.CREATE].template,
1729                        self.__kv_gen[OPS.CREATE].args,
1730                        start=0,
1731                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
1732                gen = copy.deepcopy(self.__kv_gen[OPS.UPDATE])
1733            elif op_type == OPS.DELETE:
1734                if isinstance(self.__kv_gen[OPS.CREATE], BlobGenerator):
1735                    self.__kv_gen[OPS.DELETE] = BlobGenerator(
1736                        self.__kv_gen[OPS.CREATE].name,
1737                        self.__kv_gen[OPS.CREATE].seed,
1738                        self.__kv_gen[OPS.CREATE].value_size,
1739                        start=int((self.__kv_gen[OPS.CREATE].end) * (float)(
1740                            100 - perc) / 100),
1741                        end=self.__kv_gen[OPS.CREATE].end)
1742                elif isinstance(self.__kv_gen[OPS.CREATE], DocumentGenerator):
1743                    self.__kv_gen[OPS.DELETE] = DocumentGenerator(
1744                        self.__kv_gen[OPS.CREATE].name,
1745                        self.__kv_gen[OPS.CREATE].template,
1746                        self.__kv_gen[OPS.CREATE].args,
1747                        start=0,
1748                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
1749                gen = copy.deepcopy(self.__kv_gen[OPS.DELETE])
1750            else:
1751                raise XDCRException("Unknown op_type passed: %s" % op_type)
1752
1753            self.__log.info("At bucket '{0}' @ {1}: operation: {2}, key range {3} - {4}".
1754                       format(bucket.name, self.__name, op_type, gen.start, gen.end))
1755            tasks.append(
1756                self.__clusterop.async_load_gen_docs(
1757                    self.__master_node,
1758                    bucket.name,
1759                    gen,
1760                    bucket.kvs[kv_store],
1761                    op_type,
1762                    expiration,
1763                    batch_size=1000,
1764                    compression=self.sdk_compression)
1765            )
1766        return tasks
1767
1768    def update_delete_data(
1769            self, op_type, perc=30, expiration=0, wait_for_expiration=True):
1770        """Perform update/delete operation on all buckets. Function wait
1771        operation to finish.
1772        @param op_type: OPS.CREATE/OPS.UPDATE/OPS.DELETE
1773        @param perc: percentage of data to be deleted or created
1774        @param expiration: time for expire items
1775        @param wait_for_expiration: True if wait for expire of items after
1776        update else False
1777        """
1778        tasks = self.async_update_delete(op_type, perc, expiration)
1779
1780        [task.result() for task in tasks]
1781
1782        if wait_for_expiration and expiration:
1783            self.__log.info("Waiting for expiration of updated items")
1784            time.sleep(expiration)
1785
1786    def run_expiry_pager(self, val=10):
1787        """Run expiry pager process and set interval to 10 seconds
1788        and wait for 10 seconds.
1789        @param val: time in seconds.
1790        """
1791        for bucket in self.__buckets:
1792            ClusterOperationHelper.flushctl_set(
1793                self.__master_node,
1794                "exp_pager_stime",
1795                val,
1796                bucket)
1797            self.__log.info("wait for expiry pager to run on all these nodes")
1798        time.sleep(val)
1799
1800    def async_create_views(
1801            self, design_doc_name, views, bucket=BUCKET_NAME.DEFAULT):
1802        """Create given views on Cluster.
1803        @param design_doc_name: name of design doc.
1804        @param views: views objects.
1805        @param bucket: bucket name.
1806        @return: task list for CreateViewTask
1807        """
1808        tasks = []
1809        if len(views):
1810            for view in views:
1811                task = self.__clusterop.async_create_view(
1812                    self.__master_node,
1813                    design_doc_name,
1814                    view,
1815                    bucket)
1816                tasks.append(task)
1817        else:
1818            task = self.__clusterop.async_create_view(
1819                self.__master_node,
1820                design_doc_name,
1821                None,
1822                bucket)
1823            tasks.append(task)
1824        return tasks
1825
1826    def async_compact_view(
1827            self, design_doc_name, bucket=BUCKET_NAME.DEFAULT,
1828            with_rebalance=False):
1829        """Create given views on Cluster.
1830        @param design_doc_name: name of design doc.
1831        @param bucket: bucket name.
1832        @param with_rebalance: True if compaction is called during
1833        rebalance or False.
1834        @return: task object
1835        """
1836        task = self.__clusterop.async_compact_view(
1837            self.__master_node,
1838            design_doc_name,
1839            bucket,
1840            with_rebalance)
1841        return task
1842
1843    def disable_compaction(self, bucket=BUCKET_NAME.DEFAULT):
1844        """Disable view compaction
1845        @param bucket: bucket name.
1846        """
1847        new_config = {"viewFragmntThresholdPercentage": None,
1848                      "dbFragmentThresholdPercentage": None,
1849                      "dbFragmentThreshold": None,
1850                      "viewFragmntThreshold": None}
1851        self.__clusterop.modify_fragmentation_config(
1852            self.__master_node,
1853            new_config,
1854            bucket)
1855
1856    def async_monitor_view_fragmentation(
1857            self,
1858            design_doc_name,
1859            fragmentation_value,
1860            bucket=BUCKET_NAME.DEFAULT):
1861        """Monitor view fragmantation during compation.
1862        @param design_doc_name: name of design doc.
1863        @param fragmentation_value: fragmentation threshold to monitor.
1864        @param bucket: bucket name.
1865        """
1866        task = self.__clusterop.async_monitor_view_fragmentation(
1867            self.__master_node,
1868            design_doc_name,
1869            fragmentation_value,
1870            bucket)
1871        return task
1872
1873    def async_query_view(
1874            self, design_doc_name, view_name, query,
1875            expected_rows=None, bucket="default", retry_time=2):
1876        """Perform View Query for given view asynchronously.
1877        @param design_doc_name: design_doc name.
1878        @param view_name: view name
1879        @param query: query expression
1880        @param expected_rows: number of rows expected returned in query.
1881        @param bucket: bucket name.
1882        @param retry_time: retry to perform view query
1883        @return: task object of ViewQueryTask class
1884        """
1885        task = self.__clusterop.async_query_view(
1886            self.__master_node,
1887            design_doc_name,
1888            view_name,
1889            query,
1890            expected_rows,
1891            bucket=bucket,
1892            retry_time=retry_time)
1893        return task
1894
1895    def query_view(
1896            self, design_doc_name, view_name, query,
1897            expected_rows=None, bucket="default", retry_time=2, timeout=None):
1898        """Perform View Query for given view synchronously.
1899        @param design_doc_name: design_doc name.
1900        @param view_name: view name
1901        @param query: query expression
1902        @param expected_rows: number of rows expected returned in query.
1903        @param bucket: bucket name.
1904        @param retry_time: retry to perform view query
1905        @param timeout: None if wait for query result until returned
1906        else pass timeout value.
1907        """
1908
1909        task = self.__clusterop.async_query_view(
1910            self.__master_node,
1911            design_doc_name,
1912            view_name,
1913            query,
1914            expected_rows,
1915            bucket=bucket, retry_time=retry_time)
1916        task.result(timeout)
1917
1918    def __async_rebalance_out(self, master=False, num_nodes=1):
1919        """Rebalance-out nodes from Cluster
1920        @param master: True if rebalance-out master node only.
1921        @param num_nodes: number of nodes to rebalance-out from cluster.
1922        """
1923        raise_if(
1924            len(self.__nodes) <= num_nodes,
1925            XDCRException(
1926                "Cluster needs:{0} nodes for rebalance-out, current: {1}".
1927                format((num_nodes + 1), len(self.__nodes)))
1928        )
1929        if master:
1930            to_remove_node = [self.__master_node]
1931        else:
1932            to_remove_node = self.__nodes[-num_nodes:]
1933        self.__log.info(
1934            "Starting rebalance-out nodes:{0} at {1} cluster {2}".format(
1935                to_remove_node, self.__name, self.__master_node.ip))
1936        task = self.__clusterop.async_rebalance(
1937            self.__nodes,
1938            [],
1939            to_remove_node)
1940
1941        [self.__nodes.remove(node) for node in to_remove_node]
1942
1943        if master:
1944            self.__master_node = self.__nodes[0]
1945
1946        return task
1947
1948    def async_rebalance_in_out(self, remove_nodes, num_add_nodes=1, master=False):
1949        """Rebalance-in-out nodes from Cluster
1950        @param remove_nodes: a list of nodes to be rebalanced-out
1951        @param master: True if rebalance-out master node only.
1952        @param num_nodes: number of nodes to add back to cluster.
1953        """
1954        raise_if(len(FloatingServers._serverlist) < num_add_nodes,
1955            XDCRException(
1956                "Cluster needs {0} nodes for rebalance-in, current: {1}".
1957                format((num_add_nodes),
1958                       len(FloatingServers._serverlist)))
1959        )
1960
1961        add_nodes = []
1962        for _ in range(num_add_nodes):
1963            add_nodes.append(FloatingServers._serverlist.pop())
1964
1965        self.__log.info(
1966            "Starting rebalance-out: {0}, rebalance-in: {1} at {2} cluster {3}".
1967            format(
1968                remove_nodes,
1969                add_nodes,
1970                self.__name,
1971                self.__master_node.ip))
1972        task = self.__clusterop.async_rebalance(
1973            self.__nodes,
1974            add_nodes,
1975            remove_nodes)
1976
1977        if not remove_nodes:
1978            remove_nodes = self.__fail_over_nodes
1979
1980        for node in remove_nodes:
1981            for server in self.__nodes:
1982                if node.ip == server.ip:
1983                    self.__nodes.remove(server)
1984        self.__nodes.extend(add_nodes)
1985
1986        if master:
1987            self.__master_node = self.__nodes[0]
1988        return task
1989
1990    def async_rebalance_out_master(self):
1991        return self.__async_rebalance_out(master=True)
1992
1993    def async_rebalance_out(self, num_nodes=1):
1994        return self.__async_rebalance_out(num_nodes=num_nodes)
1995
1996    def rebalance_out_master(self):
1997        task = self.__async_rebalance_out(master=True)
1998        task.result()
1999
2000    def rebalance_out(self, num_nodes=1):
2001        task = self.__async_rebalance_out(num_nodes=num_nodes)
2002        task.result()
2003
2004    def async_rebalance_in(self, num_nodes=1):
2005        """Rebalance-in nodes into Cluster asynchronously
2006        @param num_nodes: number of nodes to rebalance-in to cluster.
2007        """
2008        raise_if(
2009            len(FloatingServers._serverlist) < num_nodes,
2010            XDCRException(
2011                "Number of free nodes: {0} is not preset to add {1} nodes.".
2012                format(len(FloatingServers._serverlist), num_nodes))
2013        )
2014        to_add_node = []
2015        for _ in range(num_nodes):
2016            to_add_node.append(FloatingServers._serverlist.pop())
2017        self.__log.info(
2018            "Starting rebalance-in nodes:{0} at {1} cluster {2}".format(
2019                to_add_node, self.__name, self.__master_node.ip))
2020        task = self.__clusterop.async_rebalance(self.__nodes, to_add_node, [])
2021        self.__nodes.extend(to_add_node)
2022        return task
2023
2024    def rebalance_in(self, num_nodes=1):
2025        """Rebalance-in nodes
2026        @param num_nodes: number of nodes to add to cluster.
2027        """
2028        task = self.async_rebalance_in(num_nodes)
2029        task.result()
2030
2031    def __async_swap_rebalance(self, master=False):
2032        """Swap-rebalance nodes on Cluster
2033        @param master: True if swap-rebalance master node else False.
2034        """
2035        if master:
2036            to_remove_node = [self.__master_node]
2037        else:
2038            to_remove_node = [self.__nodes[-1]]
2039
2040        to_add_node = [FloatingServers._serverlist.pop()]
2041
2042        self.__log.info(
2043            "Starting swap-rebalance [remove_node:{0}] -> [add_node:{1}] at {2} cluster {3}"
2044            .format(to_remove_node[0].ip, to_add_node[0].ip, self.__name,
2045                    self.__master_node.ip))
2046        task = self.__clusterop.async_rebalance(
2047            self.__nodes,
2048            to_add_node,
2049            to_remove_node)
2050
2051        [self.__nodes.remove(node) for node in to_remove_node]
2052        self.__nodes.extend(to_add_node)
2053
2054        if master:
2055            self.__master_node = self.__nodes[0]
2056
2057        return task
2058
2059    def async_swap_rebalance_master(self):
2060        return self.__async_swap_rebalance(master=True)
2061
2062    def async_swap_rebalance(self):
2063        return self.__async_swap_rebalance()
2064
2065    def swap_rebalance_master(self):
2066        """Swap rebalance master node.
2067        """
2068        task = self.__async_swap_rebalance(master=True)
2069        task.result()
2070
2071    def swap_rebalance(self):
2072        """Swap rebalance non-master node
2073        """
2074        task = self.__async_swap_rebalance()
2075        task.result()
2076
2077    def __async_failover(self, master=False, num_nodes=1, graceful=False):
2078        """Failover nodes from Cluster
2079        @param master: True if failover master node only.
2080        @param num_nodes: number of nodes to rebalance-out from cluster.
2081        @param graceful: True if graceful failover else False.
2082        """
2083        raise_if(
2084            len(self.__nodes) <= 1,
2085            XDCRException(
2086                "More than 1 node required in cluster to perform failover")
2087        )
2088        if master:
2089            self.__fail_over_nodes = [self.__master_node]
2090        else:
2091            self.__fail_over_nodes = self.__nodes[-num_nodes:]
2092
2093        self.__log.info(
2094            "Starting failover for nodes:{0} at {1} cluster {2}".format(
2095                self.__fail_over_nodes, self.__name, self.__master_node.ip))
2096        task = self.__clusterop.async_failover(
2097            self.__nodes,
2098            self.__fail_over_nodes,
2099            graceful)
2100
2101        return task
2102
2103    def async_failover(self, num_nodes=1, graceful=False):
2104        return self.__async_failover(num_nodes=num_nodes, graceful=graceful)
2105
2106    def failover(self, num_nodes=1, graceful=False):
2107        self.__async_failover(num_nodes=num_nodes, graceful=graceful).result()
2108
2109    def failover_and_rebalance_master(self, graceful=False, rebalance=True,master=True):
2110        """Failover master node
2111        @param graceful: True if graceful failover else False
2112        @param rebalance: True if do rebalance operation after failover.
2113        """
2114        task = self.__async_failover(master, graceful=graceful)
2115        task.result()
2116        if graceful:
2117            # wait for replica update
2118            time.sleep(60)
2119            # use rebalance stats to monitor failover
2120            RestConnection(self.__master_node).monitorRebalance()
2121        if rebalance:
2122            self.rebalance_failover_nodes()
2123        self.__master_node = self.__nodes[0]
2124
2125    def failover_and_rebalance_nodes(self, num_nodes=1, graceful=False,
2126                                     rebalance=True):
2127        """ Failover non-master nodes
2128        @param num_nodes: number of nodes to failover.
2129        @param graceful: True if graceful failover else False
2130        @param rebalance: True if do rebalance operation after failover.
2131        """
2132        task = self.__async_failover(
2133            master=False,
2134            num_nodes=num_nodes,
2135            graceful=graceful)
2136        task.result()
2137        if graceful:
2138            time.sleep(60)
2139            # use rebalance stats to monitor failover
2140            RestConnection(self.__master_node).monitorRebalance()
2141        if rebalance:
2142            self.rebalance_failover_nodes()
2143
2144    def rebalance_failover_nodes(self):
2145        self.__clusterop.rebalance(self.__nodes, [], self.__fail_over_nodes)
2146        [self.__nodes.remove(node) for node in self.__fail_over_nodes]
2147        self.__fail_over_nodes = []
2148
2149    def add_back_node(self, recovery_type=None):
2150        """add-back failed-over node to the cluster.
2151            @param recovery_type: delta/full
2152        """
2153        raise_if(
2154            len(self.__fail_over_nodes) < 1,
2155            XDCRException("No failover nodes available to add_back")
2156        )
2157        rest = RestConnection(self.__master_node)
2158        server_nodes = rest.node_statuses()
2159        for failover_node in self.__fail_over_nodes:
2160            for server_node in server_nodes:
2161                if server_node.ip == failover_node.ip:
2162                    rest.add_back_node(server_node.id)
2163                    if recovery_type:
2164                        rest.set_recovery_type(
2165                            otpNode=server_node.id,
2166                            recoveryType=recovery_type)
2167        for node in self.__fail_over_nodes:
2168            if node not in self.__nodes:
2169                self.__nodes.append(node)
2170        self.__clusterop.rebalance(self.__nodes, [], [])
2171        self.__fail_over_nodes = []
2172
2173    def warmup_node(self, master=False):
2174        """Warmup node on cluster
2175        @param master: True if warmup master-node else False.
2176        """
2177        from random import randrange
2178
2179        if master:
2180            warmup_node = self.__master_node
2181
2182        else:
2183            warmup_node = self.__nodes[
2184                randrange(
2185                    1, len(
2186                        self.__nodes))]
2187        NodeHelper.do_a_warm_up(warmup_node)
2188        return warmup_node
2189
2190    def reboot_one_node(self, test_case, master=False):
2191        from random import randrange
2192
2193        if master:
2194            reboot_node = self.__master_node
2195
2196        else:
2197            reboot_node = self.__nodes[
2198                randrange(
2199                    1, len(
2200                        self.__nodes))]
2201        NodeHelper.reboot_server(reboot_node, test_case)
2202        return reboot_node
2203
2204    def restart_couchbase_on_all_nodes(self, bucket_names=["default"]):
2205        for node in self.__nodes:
2206            NodeHelper.do_a_warm_up(node)
2207
2208        NodeHelper.wait_warmup_completed(self.__nodes, bucket_names)
2209
2210    def set_xdcr_param(self, param, value):
2211        """Set Replication parameter on couchbase server:
2212        @param param: XDCR parameter name.
2213        @param value: Value of parameter.
2214        """
2215        for remote_ref in self.get_remote_clusters():
2216            for repl in remote_ref.get_replications():
2217                src_bucket = repl.get_src_bucket()
2218                dst_bucket = repl.get_dest_bucket()
2219                RestConnection(self.__master_node).set_xdcr_param(src_bucket.name, dst_bucket.name, param, value)
2220
2221                expected_results = {
2222                    "real_userid:source": "ns_server",
2223                    "real_userid:user": self.__master_node.rest_username,
2224                    "local_cluster_name": "%s:%s" % (self.__master_node.ip, self.__master_node.port),
2225                    "updated_settings:" + param: value,
2226                    "source_bucket_name": repl.get_src_bucket().name,
2227                    "remote_cluster_name": "remote_cluster_C1-C2",
2228                    "target_bucket_name": repl.get_dest_bucket().name
2229                }
2230
2231                # In case of ns_server xdcr, no events generate for it.
2232                ValidateAuditEvent.validate_audit_event(
2233                    GO_XDCR_AUDIT_EVENT_ID.IND_SETT,
2234                    self.get_master_node(),
2235                    expected_results)
2236
2237    def get_xdcr_stat(self, bucket_name, stat):
2238        """ Return given XDCR stat for given bucket.
2239        @param bucket_name: name of bucket.
2240        @param stat: stat name
2241        @return: value of stat
2242        """
2243        return int(RestConnection(self.__master_node).fetch_bucket_xdcr_stats(
2244            bucket_name)['op']['samples'][stat][-1])
2245
2246    def wait_for_xdcr_stat(self, bucket, stat, comparison, value):
2247        """Wait for given stat for a bucket to given condition.
2248        @param bucket: bucket name
2249        @param stat: stat name
2250        @param comparison: comparison operatior e.g. "==", "<"
2251        @param value: value to compare.
2252        """
2253        task = self.__clusterop.async_wait_for_xdcr_stat(
2254            self.__nodes,
2255            bucket,
2256            '',
2257            stat,
2258            comparison,
2259            value)
2260        task.result()
2261
2262    def add_remote_cluster(self, dest_cluster, name, encryption=False, replicator_target_role=False):
2263        """Create remote cluster reference or add remote cluster for xdcr.
2264        @param dest_cluster: Destination cb cluster object.
2265        @param name: name of remote cluster reference
2266        @param encryption: True if encryption for xdcr else False
2267        """
2268        remote_cluster = XDCRRemoteClusterRef(
2269            self,
2270            dest_cluster,
2271            name,
2272            encryption,
2273            replicator_target_role
2274        )
2275        remote_cluster.add()
2276        self.__remote_clusters.append(remote_cluster)
2277
2278    # add params to what to modify
2279    def modify_remote_cluster(self, remote_cluster_name, require_encryption):
2280        """Modify Remote Cluster Reference Settings for given name.
2281        @param remote_cluster_name: name of the remote cluster to change.
2282        @param require_encryption: Value of encryption if need to change True/False.
2283        """
2284        for remote_cluster in self.__remote_clusters:
2285            if remote_cluster_name == remote_cluster.get_name():
2286                remote_cluster. modify(require_encryption)
2287                break
2288        else:
2289            raise XDCRException(
2290                "No such remote cluster found with name: {0}".format(
2291                    remote_cluster_name))
2292
2293    def wait_for_flusher_empty(self, timeout=60):
2294        """Wait for disk queue to completely flush.
2295        """
2296        tasks = []
2297        for node in self.__nodes:
2298            for bucket in self.__buckets:
2299                tasks.append(
2300                    self.__clusterop.async_wait_for_stats(
2301                        [node],
2302                        bucket,
2303                        '',
2304                        'ep_queue_size',
2305                        '==',
2306                        0))
2307        for task in tasks:
2308            task.result(timeout)
2309
2310    def verify_items_count(self, timeout=600):
2311        """Wait for actual bucket items count reach to the count on bucket kv_store.
2312        """
2313        active_key_count_passed = True
2314        replica_key_count_passed = True
2315        curr_time = time.time()
2316        end_time = curr_time + timeout
2317
2318        # Check active, curr key count
2319        rest = RestConnection(self.__master_node)
2320        buckets = copy.copy(self.get_buckets())
2321
2322        for bucket in buckets:
2323            items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
2324            while True:
2325                try:
2326                    active_keys = int(rest.get_active_key_count(bucket.name))
2327                except Exception as e:
2328                    self.__log.error(e)
2329                    bucket_info = rest.get_bucket_json(bucket.name)
2330                    nodes = bucket_info["nodes"]
2331                    active_keys = 0
2332                    for node in nodes:
2333                        active_keys += node["interestingStats"]["curr_items"]
2334                if active_keys != items:
2335                        self.__log.warn("Not Ready: vb_active_curr_items %s == "
2336                                "%s expected on %s, %s bucket"
2337                                 % (active_keys, items, self.__name, bucket.name))
2338                        time.sleep(5)
2339                        if time.time() > end_time:
2340                            self.__log.error(
2341                            "ERROR: Timed-out waiting for active item count to match")
2342                            active_key_count_passed = False
2343                            break
2344                        continue
2345                else:
2346                    self.__log.info("Saw: vb_active_curr_items %s == "
2347                            "%s expected on %s, %s bucket"
2348                            % (active_keys, items, self.__name, bucket.name))
2349                    break
2350        # check replica count
2351        curr_time = time.time()
2352        end_time = curr_time + timeout
2353        buckets = copy.copy(self.get_buckets())
2354
2355        for bucket in buckets:
2356            if len(self.__nodes) > 1:
2357                items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
2358                items = items * bucket.numReplicas
2359            else:
2360                items = 0
2361            while True:
2362                try:
2363                    replica_keys = int(rest.get_replica_key_count(bucket.name))
2364                except Exception as e:
2365                    self.__log.error(e)
2366                    bucket_info = rest.get_bucket_json(bucket.name)
2367                    nodes = bucket_info["nodes"]
2368                    replica_keys = 0
2369                    for node in nodes:
2370                        replica_keys += node["interestingStats"]["vb_replica_curr_items"]
2371                if replica_keys != items:
2372                    self.__log.warn("Not Ready: vb_replica_curr_items %s == "
2373                            "%s expected on %s, %s bucket"
2374                             % (replica_keys, items ,self.__name, bucket.name))
2375                    time.sleep(3)
2376                    if time.time() > end_time:
2377                        self.__log.error(
2378                        "ERROR: Timed-out waiting for replica item count to match")
2379                        replica_key_count_passed = False
2380                        self.run_cbvdiff()
2381                        break
2382                    continue
2383                else:
2384                    self.__log.info("Saw: vb_replica_curr_items %s == "
2385                            "%s expected on %s, %s bucket"
2386                            % (replica_keys, items, self.__name, bucket.name))
2387                    break
2388        return active_key_count_passed, replica_key_count_passed
2389
2390    def run_cbvdiff(self):
2391        """ Run cbvdiff, a tool that compares active and replica vbucket keys
2392        Eg. ./cbvdiff -b standardbucket  172.23.105.44:11210,172.23.105.45:11210
2393             VBucket 232: active count 59476 != 59477 replica count
2394        """
2395        node_str = ""
2396        for node in self.__nodes:
2397            if node_str:
2398                node_str += ','
2399            node_str += node.ip + ':11210'
2400        ssh_conn = RemoteMachineShellConnection(self.__master_node)
2401        for bucket in self.__buckets:
2402            self.__log.info(
2403                "Executing cbvdiff for bucket {0}".format(
2404                    bucket.name))
2405            if bucket.saslPassword:
2406                ssh_conn.execute_cbvdiff(bucket, node_str, bucket.saslPassword)
2407            else:
2408                ssh_conn.execute_cbvdiff(bucket, node_str)
2409        ssh_conn.disconnect()
2410
2411    def verify_data(self, kv_store=1, timeout=None,
2412                    max_verify=None, only_store_hash=True, batch_size=1000, skip=[]):
2413        """Verify data of all the buckets. Function read data from cb server and
2414        compare it with bucket's kv_store.
2415        @param kv_store: Index of kv_store where item values are stored on
2416        bucket.
2417        @param timeout: None if wait indefinitely else give timeout value.
2418        @param max_verify: number of items to verify. None if verify all items
2419        on bucket.
2420        @param only_store_hash: True if verify hash of items else False.
2421        @param batch_size: batch size to read items from server.
2422        """
2423        self.__data_verified = False
2424        tasks = []
2425        for bucket in self.__buckets:
2426            if bucket.name not in skip:
2427                tasks.append(
2428                        self.__clusterop.async_verify_data(
2429                                self.__master_node,
2430                                bucket,
2431                                bucket.kvs[kv_store],
2432                                max_verify,
2433                                only_store_hash,
2434                                batch_size,
2435                                timeout_sec=60,
2436                                compression=self.sdk_compression))
2437        for task in tasks:
2438            task.result(timeout)
2439
2440        self.__data_verified = True
2441
2442    def verify_meta_data(self, kv_store=1, timeout=None, skip=[]):
2443        """Verify if metadata for bucket matches on src and dest clusters
2444        @param kv_store: Index of kv_store where item values are stored on
2445        bucket.
2446        @param timeout: None if wait indefinitely else give timeout value.
2447        """
2448        self.__meta_data_verified = False
2449        tasks = []
2450        for bucket in self.__buckets:
2451            if bucket.name not in skip:
2452                data_map = {}
2453                gather_task = self.__clusterop.async_get_meta_data(self.__master_node, bucket, bucket.kvs[kv_store],
2454                                                        compression=self.sdk_compression)
2455                gather_task.result()
2456                data_map[bucket.name] = gather_task.get_meta_data_store()
2457                tasks.append(
2458                    self.__clusterop.async_verify_meta_data(
2459                        self.__master_node,
2460                        bucket,
2461                        bucket.kvs[kv_store],
2462                        data_map[bucket.name]
2463                        ))
2464        for task in tasks:
2465            task.result(timeout)
2466        self.__meta_data_verified = True
2467
2468    def wait_for_dcp_queue_drain(self, timeout=180):
2469        """Wait for ep_dcp_xdcr_items_remaining to reach 0.
2470        @return: True if reached 0 else False.
2471        """
2472        self.__log.info(
2473            "Waiting for dcp queue to drain on cluster node: %s" %
2474            self.__master_node.ip)
2475        curr_time = time.time()
2476        end_time = curr_time + timeout
2477        rest = RestConnection(self.__master_node)
2478        buckets = copy.copy(self.get_buckets())
2479        for bucket in buckets:
2480            try:
2481                mutations = int(rest.get_dcp_queue_size(bucket.name))
2482                self.__log.info(
2483                    "Current dcp queue size on %s for %s is %s" %
2484                    (self.__name, bucket.name, mutations))
2485                if mutations == 0:
2486                    buckets.remove(bucket)
2487                else:
2488                    time.sleep(5)
2489                    end_time = end_time - 5
2490            except Exception as e:
2491                self.__log.error(e)
2492            if curr_time > end_time:
2493                self.__log.error(
2494                "Timeout occurs while waiting for dcp queue to drain")
2495                return False
2496        return True
2497
2498    def wait_for_outbound_mutations(self, timeout=180):
2499        """Wait for Outbound mutations to reach 0.
2500        @return: True if mutations reached to 0 else False.
2501        """
2502        self.__log.info(
2503            "Waiting for Outbound mutation to be zero on cluster node: %s" %
2504            self.__master_node.ip)
2505        curr_time = time.time()
2506        end_time = curr_time + timeout
2507        rest = RestConnection(self.__master_node)
2508        while curr_time < end_time:
2509            found = 0
2510            for bucket in self.__buckets:
2511                try:
2512                    mutations = int(rest.get_xdc_queue_size(bucket.name))
2513                except KeyError:
2514                    self.__log.error("Stat \"replication_changes_left\" not found")
2515                    return False
2516                self.__log.info(
2517                    "Current Outbound mutations on cluster node: %s for bucket %s is %s" %
2518                    (self.__name, bucket.name, mutations))
2519                if mutations == 0:
2520                    found = found + 1
2521            if found == len(self.__buckets):
2522                break
2523            time.sleep(5)
2524            end_time = end_time - 5
2525        else:
2526            # MB-9707: Updating this code from fail to warning to avoid test
2527            # to abort, as per this
2528            # bug, this particular stat i.e. replication_changes_left is buggy.
2529            self.__log.error(
2530                "Timeout occurs while waiting for mutations to be replicated")
2531            return False
2532        return True
2533
2534    def pause_all_replications(self, verify=False):
2535        for remote_cluster_ref in self.__remote_clusters:
2536            remote_cluster_ref.pause_all_replications(verify)
2537
2538    def pause_all_replications_by_id(self, verify=False):
2539        for remote_cluster_ref in self.__remote_clusters:
2540            remote_cluster_ref.pause_all_replications_by_id(verify)
2541
2542    def resume_all_replications(self, verify=False):
2543        for remote_cluster_ref in self.__remote_clusters:
2544            remote_cluster_ref.resume_all_replications(verify)
2545
2546    def resume_all_replications_by_id(self, verify=False):
2547        for remote_cluster_ref in self.__remote_clusters:
2548            remote_cluster_ref.resume_all_replications_by_id(verify)
2549
2550    def enable_time_sync(self, enable):
2551        """
2552        @param enable: True if time_sync needs to enabled else False
2553        """
2554        # TODO call rest api
2555        pass
2556
2557    def set_wall_clock_time(self, date_str):
2558        for node in self.__nodes:
2559            NodeHelper.set_wall_clock_time(node, date_str)
2560
2561
2562class Utility:
2563
2564    @staticmethod
2565    def make_default_views(prefix, num_views, is_dev_ddoc=False):
2566        """Create default views for testing.
2567        @param prefix: prefix for view name
2568        @param num_views: number of views to create
2569        @param is_dev_ddoc: True if Development View else False
2570        """
2571        default_map_func = "function (doc) {\n  emit(doc._id, doc);\n}"
2572        default_view_name = (prefix, "default_view")[prefix is None]
2573        return [View(default_view_name + str(i), default_map_func,
2574                     None, is_dev_ddoc) for i in xrange(num_views)]
2575
2576    @staticmethod
2577    def get_rc_name(src_cluster_name, dest_cluster_name):
2578        return "remote_cluster_" + src_cluster_name + "-" + dest_cluster_name
2579
2580
2581class XDCRNewBaseTest(unittest.TestCase):
2582
2583    def setUp(self):
2584        unittest.TestCase.setUp(self)
2585        self._input = TestInputSingleton.input
2586        self.log = logger.Logger.get_logger()
2587        self.__init_logger()
2588        self.__cb_clusters = []
2589        self.__cluster_op = Cluster()
2590        self.__init_parameters()
2591        self.log.info(
2592            "==== XDCRNewbasetests setup is started for test #{0} {1} ===="
2593            .format(self.__case_number, self._testMethodName))
2594
2595        self.__setup_for_test()
2596
2597        self.log.info(
2598            "==== XDCRNewbasetests setup is finished for test #{0} {1} ===="
2599            .format(self.__case_number, self._testMethodName))
2600
2601    def __is_test_failed(self):
2602        return (hasattr(self, '_resultForDoCleanups')
2603                and len(self._resultForDoCleanups.failures
2604                        or self._resultForDoCleanups.errors)) \
2605            or (hasattr(self, '_exc_info')
2606                and self._exc_info()[1] is not None)
2607
2608    def __is_cleanup_needed(self):
2609        return self.__is_test_failed() and (str(self.__class__).find(
2610            'upgradeXDCR') != -1 or self._input.param("stop-on-failure", False)
2611        )
2612
2613    def __is_cluster_run(self):
2614        return len(set([server.ip for server in self._input.servers])) == 1
2615
2616    def tearDown(self):
2617        """Clusters cleanup"""
2618        if self._input.param("negative_test", False):
2619            if hasattr(self, '_resultForDoCleanups') \
2620                and len(self._resultForDoCleanups.failures
2621                        or self._resultForDoCleanups.errors):
2622                self._resultForDoCleanups.failures = []
2623                self._resultForDoCleanups.errors = []
2624                self.log.info("This is marked as a negative test and contains "
2625                              "errors as expected, hence not failing it")
2626            else:
2627                raise XDCRException("Negative test passed!")
2628
2629        # collect logs before tearing down clusters
2630        if self._input.param("get-cbcollect-info", False) and \
2631                self.__is_test_failed():
2632            for server in self._input.servers:
2633                self.log.info("Collecting logs @ {0}".format(server.ip))
2634                NodeHelper.collect_logs(server, self.__is_cluster_run())
2635
2636        for i in range(1, len(self.__cb_clusters) + 1):
2637            # Remove rbac users in teardown
2638            role_del = ['cbadminbucket']
2639            RbacBase().remove_user_role(role_del, RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()))
2640            if self._replicator_role:
2641                role_del = ['replicator_user']
2642                RbacBase().remove_user_role(role_del,
2643                                            RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()))
2644
2645        try:
2646            if self.__is_cleanup_needed() or self._input.param("skip_cleanup", False):
2647                self.log.warn("CLEANUP WAS SKIPPED")
2648                return
2649            self.log.info(
2650                "====  XDCRNewbasetests cleanup is started for test #{0} {1} ===="
2651                .format(self.__case_number, self._testMethodName))
2652            for cb_cluster in self.__cb_clusters:
2653                cb_cluster.cleanup_cluster(self,from_rest=True)
2654            self.log.info(
2655                "====  XDCRNewbasetests cleanup is finished for test #{0} {1} ==="
2656                .format(self.__case_number, self._testMethodName))
2657        finally:
2658            self.__cluster_op.shutdown(force=True)
2659            unittest.TestCase.tearDown(self)
2660
2661    def __init_logger(self):
2662        if self._input.param("log_level", None):
2663            self.log.setLevel(level=0)
2664            for hd in self.log.handlers:
2665                if str(hd.__class__).find('FileHandler') != -1:
2666                    hd.setLevel(level=logging.DEBUG)
2667                else:
2668                    hd.setLevel(
2669                        level=getattr(
2670                            logging,
2671                            self._input.param(
2672                                "log_level",
2673                                None)))
2674
2675    def __setup_for_test(self):
2676        use_hostanames = self._input.param("use_hostnames", False)
2677        sdk_compression = self._input.param("sdk_compression", True)
2678        counter = 1
2679        for _, nodes in self._input.clusters.iteritems():
2680            cluster_nodes = copy.deepcopy(nodes)
2681            if len(self.__cb_clusters) == int(self.__chain_length):
2682                break
2683            self.__cb_clusters.append(
2684                CouchbaseCluster(
2685                    "C%s" % counter, cluster_nodes,
2686                    self.log, use_hostanames, sdk_compression=sdk_compression))
2687            counter += 1
2688
2689        self.__cleanup_previous()
2690        self.__init_clusters()
2691
2692        for i in range(1, len(self.__cb_clusters) + 1):
2693            # Add built-in user to C1
2694            testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'password': 'password'}]
2695            RbacBase().create_user_source(testuser, 'builtin',
2696                                          self.get_cb_cluster_by_name('C' + str(i)).get_master_node())
2697
2698
2699            # Assign user to role
2700            role_list = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'roles': 'admin'}]
2701            RbacBase().add_user_role(role_list,
2702                                     RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()),
2703                                     'builtin')
2704
2705
2706        self.__set_free_servers()
2707        if str(self.__class__).find('upgradeXDCR') == -1 and str(self.__class__).find('lww') == -1:
2708            self.__create_buckets()
2709
2710        if self._replicator_role:
2711            for i in range(1, len(self.__cb_clusters) + 1):
2712                testuser_replicator = [{'id': 'replicator_user', 'name': 'replicator_user', 'password': 'password'}]
2713                RbacBase().create_user_source(testuser_replicator, 'builtin',
2714                                              self.get_cb_cluster_by_name('C' + str(i)).get_master_node())
2715
2716                if self._replicator_role and self._replicator_all_buckets:
2717                    role = 'replication_target[*]'
2718                else:
2719                    role = 'replication_target[default]'
2720                role_list_replicator = [
2721                        {'id': 'replicator_user', 'name': 'replicator_user', 'roles': role}]
2722                RbacBase().add_user_role(role_list_replicator,
2723                                             RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()),
2724                                             'builtin')
2725
2726    def __init_parameters(self):
2727        self.__case_number = self._input.param("case_number", 0)
2728        self.__topology = self._input.param("ctopology", TOPOLOGY.CHAIN)
2729        # complex topology tests (> 2 clusters must specify chain_length >2)
2730        self.__chain_length = self._input.param("chain_length", 2)
2731        self.__rep_type = self._input.param("replication_type",REPLICATION_PROTOCOL.XMEM)
2732        self.__num_sasl_buckets = self._input.param("sasl_buckets", 0)
2733        self.__num_stand_buckets = self._input.param("standard_buckets", 0)
2734
2735        self.__eviction_policy = self._input.param("eviction_policy",'valueOnly')
2736        self.__mixed_priority = self._input.param("mixed_priority", None)
2737
2738        self.__lww = self._input.param("lww", 0)
2739        self.__fail_on_errors = self._input.param("fail_on_errors", True)
2740        # simply append to this list, any error from log we want to fail test on
2741        self.__report_error_list = []
2742        if self.__fail_on_errors:
2743            self.__report_error_list = ["panic:",
2744                                        "non-recoverable error from xmem client. response status=KEY_ENOENT"]
2745
2746        # for format {ip1: {"panic": 2, "KEY_ENOENT":3}}
2747        self.__error_count_dict = {}
2748        if len(self.__report_error_list) > 0:
2749            self.__initialize_error_count_dict()
2750
2751        self._repl_restart_count_dict = {}
2752        self.__initialize_repl_restart_count_dict()
2753
2754        # Public init parameters - Used in other tests too.
2755        # Move above private to this section if needed in future, but
2756        # Ensure to change other tests too.
2757        self._demand_encryption = self._input.param(
2758            "demand_encryption",
2759            False)
2760        self._num_replicas = self._input.param("replicas", 1)
2761        self._create_default_bucket = self._input.param("default_bucket",True)
2762        self._rdirection = self._input.param("rdirection",
2763                            REPLICATION_DIRECTION.UNIDIRECTION)
2764        self._num_items = self._input.param("items", 1000)
2765        self._value_size = self._input.param("value_size", 512)
2766        self._poll_timeout = self._input.param("poll_timeout", 120)
2767        self._perc_upd = self._input.param("upd", 30)
2768        self._perc_del = self._input.param("del", 30)
2769        self._upd_clusters = self._input.param("update", [])
2770        if self._upd_clusters:
2771            self._upd_clusters = self._upd_clusters.split("-")
2772        self._del_clusters = self._input.param("delete", [])
2773        if self._del_clusters:
2774            self._del_clusters = self._del_clusters.split('-')
2775        self._expires = self._input.param("expires", 0)
2776        self._wait_for_expiration = self._input.param("wait_for_expiration",True)
2777        self._warmup = self._input.param("warm", "").split('-')
2778        self._rebalance = self._input.param("rebalance", "").split('-')
2779        self._failover = self._input.param("failover", "").split('-')
2780        self._wait_timeout = self._input.param("timeout", 60)
2781        self._disable_compaction = self._input.param("disable_compaction","").split('-')
2782        self._item_count_timeout = self._input.param("item_count_timeout", 300)
2783        self._checkpoint_interval = self._input.param("checkpoint_interval",60)
2784        self._optimistic_threshold = self._input.param("optimistic_threshold", 256)
2785        self._dgm_run = self._input.param("dgm_run", False)
2786        self._active_resident_threshold = \
2787            self._input.param("active_resident_threshold", 100)
2788        CHECK_AUDIT_EVENT.CHECK = self._input.param("verify_audit", 0)
2789        self._max_verify = self._input.param("max_verify", 100000)
2790        self._sdk_compression = self._input.param("sdk_compression", True)
2791        self._evict_with_compactor = self._input.param("evict_with_compactor", False)
2792        self._replicator_role = self._input.param("replicator_role",False)
2793        self._replicator_all_buckets = self._input.param("replicator_all_buckets",False)
2794
2795    def __initialize_error_count_dict(self):
2796        """
2797            initializes self.__error_count_dict with ip, error and err count
2798            like {ip1: {"panic": 2, "KEY_ENOENT":3}}
2799        """
2800        if not self.__is_cluster_run():
2801            goxdcr_log = NodeHelper.get_goxdcr_log_dir(self._input.servers[0])\
2802                     + '/goxdcr.log*'
2803        for node in self._input.servers:
2804            if self.__is_cluster_run():
2805                goxdcr_log = NodeHelper.get_goxdcr_log_dir(node)\
2806                     + '/goxdcr.log*'
2807            self.__error_count_dict[node.ip] = {}
2808            for error in self.__report_error_list:
2809                self.__error_count_dict[node.ip][error] = \
2810                    NodeHelper.check_goxdcr_log(node, error, goxdcr_log)
2811        self.log.info(self.__error_count_dict)
2812
2813    def __initialize_repl_restart_count_dict(self):
2814        """
2815            initializes self.__error_count_dict with ip, repl restart count
2816            like {{ip1: 3}, {ip2: 4}}
2817        """
2818        if not self.__is_cluster_run():
2819            goxdcr_log = NodeHelper.get_goxdcr_log_dir(self._input.servers[0])\
2820                     + '/goxdcr.log*'
2821        for node in self._input.servers:
2822            if self.__is_cluster_run():
2823                goxdcr_log = NodeHelper.get_goxdcr_log_dir(node)\
2824                     + '/goxdcr.log*'
2825            self._repl_restart_count_dict[node.ip] = \
2826                NodeHelper.check_goxdcr_log(node,
2827                                            "Try to fix Pipeline",
2828                                            goxdcr_log)
2829        self.log.info(self._repl_restart_count_dict)
2830
2831    def __cleanup_previous(self):
2832        for cluster in self.__cb_clusters:
2833            cluster.cleanup_cluster(
2834                self,
2835                from_rest=True,
2836                cluster_shutdown=False)
2837
2838    def __init_clusters(self):
2839        self.log.info("Initializing all clusters...")
2840        disabled_consistent_view = self._input.param(
2841            "disabled_consistent_view",
2842            None)
2843        for cluster in self.__cb_clusters:
2844            cluster.init_cluster(disabled_consistent_view)
2845
2846    def __set_free_servers(self):
2847        total_servers = self._input.servers
2848        cluster_nodes = []
2849        for _, nodes in self._input.clusters.iteritems():
2850            cluster_nodes.extend(nodes)
2851        for server in total_servers:
2852            for cluster_node in cluster_nodes:
2853                if server.ip == cluster_node.ip and\
2854                                server.port == cluster_node.port:
2855                    break
2856                else:
2857                    continue
2858            else:
2859                FloatingServers._serverlist.append(server)
2860
2861    def get_cluster_op(self):
2862        return self.__cluster_op
2863
2864    def add_built_in_server_user(self, testuser=None, rolelist=None, node=None):
2865        """
2866           From spock, couchbase server is built with some users that handles
2867           some specific task such as:
2868               cbadminbucket
2869           Default added user is cbadminbucket with admin role
2870        """
2871        if testuser is None:
2872            testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket',
2873                                                'password': 'password'}]
2874        if rolelist is None:
2875            rolelist = [{'id': 'cbadminbucket', 'name': 'cbadminbucket',
2876                                                      'roles': 'admin'}]
2877        if node is None:
2878            node = self.master
2879
2880        self.log.info("**** add built-in '%s' user to node %s ****" % (testuser[0]["name"],
2881                                                                       node.ip))
2882        RbacBase().create_user_source(testuser, 'builtin', node)
2883
2884        self.log.info("**** add '%s' role to '%s' user ****" % (rolelist[0]["roles"],
2885                                                                testuser[0]["name"]))
2886        RbacBase().add_user_role(rolelist, RestConnection(node), 'builtin')
2887
2888    def get_cb_cluster_by_name(self, name):
2889        """Return couchbase cluster object for given name.
2890        @return: CouchbaseCluster object
2891        """
2892        for cb_cluster in self.__cb_clusters:
2893            if cb_cluster.get_name() == name:
2894                return cb_cluster
2895        raise XDCRException("Couchbase Cluster with name: %s not exist" % name)
2896
2897    def get_num_cb_cluster(self):
2898        """Return number of couchbase clusters for tests.
2899        """
2900        return len(self.__cb_clusters)
2901
2902    def get_cb_clusters(self):
2903        return self.__cb_clusters
2904
2905    def get_lww(self):
2906        return self.__lww
2907
2908    def get_report_error_list(self):
2909        return self.__report_error_list
2910
2911    def __calculate_bucket_size(self, cluster_quota, num_buckets):
2912
2913        if 'quota_percent' in self._input.test_params:
2914            quota_percent = int(self._input.test_params['quota_percent'])
2915        else:
2916            quota_percent = None
2917
2918        dgm_run = self._input.param("dgm_run", 0)
2919        bucket_size = 0
2920        if dgm_run:
2921            # buckets cannot be created if size<100MB
2922            bucket_size = 256
2923        elif quota_percent is not None and num_buckets > 0:
2924            bucket_size = int( float(cluster_quota - 500) * float(quota_percent/100.0 ) /float(num_buckets) )
2925        elif num_buckets > 0:
2926            bucket_size = int((float(cluster_quota) - 500)/float(num_buckets))
2927        return bucket_size
2928
2929    def __create_buckets(self):
2930        # if mixed priority is set by user, set high priority for sasl and
2931        # standard buckets
2932        if self.__mixed_priority:
2933            bucket_priority = 'high'
2934        else:
2935            bucket_priority = None
2936        num_buckets = self.__num_sasl_buckets + \
2937            self.__num_stand_buckets + int(self._create_default_bucket)
2938
2939        maxttl = self._input.param("maxttl", None)
2940
2941        for cb_cluster in self.__cb_clusters:
2942            total_quota = cb_cluster.get_mem_quota()
2943            bucket_size = self.__calculate_bucket_size(
2944                total_quota,
2945                num_buckets)
2946
2947            if self._create_default_bucket:
2948                cb_cluster.create_default_bucket(
2949                    bucket_size,
2950                    self._num_replicas,
2951                    eviction_policy=self.__eviction_policy,
2952                    bucket_priority=bucket_priority,
2953                    lww=self.__lww,
2954                    maxttl=maxttl)
2955
2956            cb_cluster.create_sasl_buckets(
2957                bucket_size, num_buckets=self.__num_sasl_buckets,
2958                num_replicas=self._num_replicas,
2959                eviction_policy=self.__eviction_policy,
2960                bucket_priority=bucket_priority, lww=self.__lww,
2961                maxttl=maxttl)
2962
2963            cb_cluster.create_standard_buckets(
2964                bucket_size, num_buckets=self.__num_stand_buckets,
2965                num_replicas=self._num_replicas,
2966                eviction_policy=self.__eviction_policy,
2967                bucket_priority=bucket_priority, lww=self.__lww,
2968                maxttl=maxttl)
2969
2970    def create_buckets_on_cluster(self, cluster_name):
2971        # if mixed priority is set by user, set high priority for sasl and
2972        # standard buckets
2973        if self.__mixed_priority:
2974            bucket_priority = 'high'
2975        else:
2976            bucket_priority = None
2977        num_buckets = self.__num_sasl_buckets + \
2978            self.__num_stand_buckets + int(self._create_default_bucket)
2979
2980        cb_cluster = self.get_cb_cluster_by_name(cluster_name)
2981        total_quota = cb_cluster.get_mem_quota()
2982        bucket_size = self.__calculate_bucket_size(
2983            total_quota,
2984            num_buckets)
2985
2986        if self._create_default_bucket:
2987            cb_cluster.create_default_bucket(
2988                bucket_size,
2989                self._num_replicas,
2990                eviction_policy=self.__eviction_policy,
2991                bucket_priority=bucket_priority)
2992
2993        cb_cluster.create_sasl_buckets(
2994            bucket_size, num_buckets=self.__num_sasl_buckets,
2995            num_replicas=self._num_replicas,
2996            eviction_policy=self.__eviction_policy,
2997            bucket_priority=bucket_priority)
2998
2999        cb_cluster.create_standard_buckets(
3000            bucket_size, num_buckets=self.__num_stand_buckets,
3001            num_replicas=self._num_replicas,
3002            eviction_policy=self.__eviction_policy,
3003            bucket_priority=bucket_priority)
3004
3005    def __set_topology_chain(self):
3006        """Will Setup Remote Cluster Chain Topology i.e. A -> B -> C
3007        """
3008        for i, cb_cluster in enumerate(self.__cb_clusters):
3009            if i >= len(self.__cb_clusters) - 1:
3010                break
3011            cb_cluster.add_remote_cluster(
3012                self.__cb_clusters[i + 1],
3013                Utility.get_rc_name(
3014                    cb_cluster.get_name(),
3015                    self.__cb_clusters[i + 1].get_name()),
3016                self._demand_encryption,
3017                self._replicator_role
3018            )
3019            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3020                self.__cb_clusters[i + 1].add_remote_cluster(
3021                    cb_cluster,
3022                    Utility.get_rc_name(
3023                        self.__cb_clusters[i + 1].get_name(),
3024                        cb_cluster.get_name()),
3025                    self._demand_encryption,
3026                    self._replicator_role
3027                )
3028
3029    def __set_topology_star(self):
3030        """Will Setup Remote Cluster Star Topology i.e. A-> B, A-> C, A-> D
3031        """
3032        hub = self.__cb_clusters[0]
3033        for cb_cluster in self.__cb_clusters[1:]:
3034            hub.add_remote_cluster(
3035                cb_cluster,
3036                Utility.get_rc_name(hub.get_name(), cb_cluster.get_name()),
3037                self._demand_encryption,
3038                self._replicator_role
3039            )
3040            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3041                cb_cluster.add_remote_cluster(
3042                    hub,
3043                    Utility.get_rc_name(cb_cluster.get_name(), hub.get_name()),
3044                    self._demand_encryption,
3045                    self._replicator_role
3046                )
3047
3048    def __set_topology_ring(self):
3049        """
3050        Will Setup Remote Cluster Ring Topology i.e. A -> B -> C -> A
3051        """
3052        self.__set_topology_chain()
3053        self.__cb_clusters[-1].add_remote_cluster(
3054            self.__cb_clusters[0],
3055            Utility.get_rc_name(
3056                self.__cb_clusters[-1].get_name(),
3057                self.__cb_clusters[0].get_name()),
3058            self._demand_encryption,
3059            self._replicator_role
3060        )
3061        if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3062            self.__cb_clusters[0].add_remote_cluster(
3063                self.__cb_clusters[-1],
3064                Utility.get_rc_name(
3065                    self.__cb_clusters[0].get_name(),
3066                    self.__cb_clusters[-1].get_name()),
3067                self._demand_encryption,
3068                self._replicator_role
3069            )
3070
3071    def set_xdcr_topology(self):
3072        """Setup xdcr topology as per ctopology test parameter.
3073        """
3074        if self.__topology == TOPOLOGY.CHAIN:
3075            self.__set_topology_chain()
3076        elif self.__topology == TOPOLOGY.STAR:
3077            self.__set_topology_star()
3078        elif self.__topology == TOPOLOGY.RING:
3079            self.__set_topology_ring()
3080        elif self._input.param(TOPOLOGY.HYBRID, 0):
3081            self.set_hybrid_topology()
3082        else:
3083            raise XDCRException(
3084                'Unknown topology set: {0}'.format(
3085                    self.__topology))
3086
3087    def __parse_topology_param(self):
3088        tokens = re.split(r'(>|<>|<|\s)', self.__topology)
3089        return tokens
3090
3091    def set_hybrid_topology(self):
3092        """Set user defined topology
3093        Hybrid Topology Notations:
3094        '> or <' for Unidirection replication between clusters
3095        '<>' for Bi-direction replication between clusters
3096        Test Input:  ctopology="C1>C2<>C3>C4<>C1"
3097        """
3098        tokens = self.__parse_topology_param()
3099        counter = 0
3100        while counter < len(tokens) - 1:
3101            src_cluster = self.get_cb_cluster_by_name(tokens[counter])
3102            dest_cluster = self.get_cb_cluster_by_name(tokens[counter + 2])
3103            if ">" in tokens[counter + 1]:
3104                src_cluster.add_remote_cluster(
3105                    dest_cluster,
3106                    Utility.get_rc_name(
3107                        src_cluster.get_name(),
3108                        dest_cluster.get_name()),
3109                    self._demand_encryption
3110                )
3111            if "<" in tokens[counter + 1]:
3112                dest_cluster.add_remote_cluster(
3113                    src_cluster,
3114                    Utility.get_rc_name(
3115                        dest_cluster.get_name(), src_cluster.get_name()),
3116                    self._demand_encryption
3117                )
3118            counter += 2
3119
3120    def __async_load_chain(self):
3121        for i, cluster in enumerate(self.__cb_clusters):
3122            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3123                if i > len(self.__cb_clusters) - 1:
3124                    break
3125            else:
3126                if i >= len(self.__cb_clusters) - 1:
3127                    break
3128            if not self._dgm_run:
3129                return cluster.async_load_all_buckets(self._num_items,
3130                                                      self._value_size)
3131            else:
3132                #TODO: async this!
3133                cluster.load_all_buckets_till_dgm(
3134                    active_resident_threshold=self._active_resident_threshold,
3135                    items=self._num_items)
3136
3137    def __load_chain(self):
3138        for i, cluster in enumerate(self.__cb_clusters):
3139            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3140                if i > len(self.__cb_clusters) - 1:
3141                    break
3142            else:
3143                if i >= len(self.__cb_clusters) - 1:
3144                    break
3145            if not self._dgm_run:
3146                cluster.load_all_buckets(self._num_items, self._value_size)
3147            else:
3148                cluster.load_all_buckets_till_dgm(
3149                    active_resident_threshold=self._active_resident_threshold,
3150                    items=self._num_items)
3151
3152    def __load_star(self):
3153        hub = self.__cb_clusters[0]
3154        if self._dgm_run:
3155            hub.load_all_buckets_till_dgm(
3156                active_resident_threshold=self._active_resident_threshold,
3157                item=self._num_items)
3158        else:
3159            hub.load_all_buckets(self._num_items, self._value_size)
3160
3161    def __async_load_star(self):
3162        hub = self.__cb_clusters[0]
3163        if self._dgm_run:
3164            #TODO: async this
3165            hub.load_all_buckets_till_dgm(
3166                active_resident_threshold=self._active_resident_threshold,
3167                item=self._num_items)
3168        else:
3169            return hub.async_load_all_buckets(self._num_items, self._value_size)
3170
3171    def __load_ring(self):
3172        self.__load_chain()
3173
3174    def __async_load_ring(self):
3175        self.__async_load_chain()
3176
3177    def load_data_topology(self):
3178        """load data as per ctopology test parameter
3179        """
3180        if self.__topology == TOPOLOGY.CHAIN:
3181            self.__load_chain()
3182        elif self.__topology == TOPOLOGY.STAR:
3183            self.__load_star()
3184        elif self.__topology == TOPOLOGY.RING:
3185            self.__load_ring()
3186        elif self._input.param(TOPOLOGY.HYBRID, 0):
3187            self.__load_star()
3188        else:
3189            raise XDCRException(
3190                'Unknown topology set: {0}'.format(
3191                    self.__topology))
3192
3193    def async_load_data_topology(self):
3194        """load data as per ctopology test parameter
3195        """
3196        if self.__topology == TOPOLOGY.CHAIN:
3197            return self.__async_load_chain()
3198        elif self.__topology == TOPOLOGY.STAR:
3199            return self.__async_load_star()
3200        elif self.__topology</