1import unittest
2import time
3import copy
4import logger
5import logging
6import re
7
8from couchbase_helper.cluster import Cluster
9from membase.api.rest_client import RestConnection, Bucket
10from membase.api.exception import ServerUnavailableException
11from remote.remote_util import RemoteMachineShellConnection
12from remote.remote_util import RemoteUtilHelper
13from testconstants import STANDARD_BUCKET_PORT
14from couchbase_helper.document import View
15from membase.helper.cluster_helper import ClusterOperationHelper
16from couchbase_helper.stats_tools import StatsCommon
17from membase.helper.bucket_helper import BucketOperationHelper
18from memcached.helper.data_helper import MemcachedClientHelper
19from TestInput import TestInputSingleton
20from scripts.collect_server_info import cbcollectRunner
21from scripts import collect_data_files
22from tasks.future import TimeoutError
23
24from couchbase_helper.documentgenerator import BlobGenerator, DocumentGenerator
25from lib.membase.api.exception import XDCRException
26from security.auditmain import audit
27from security.rbac_base import RbacBase
28
29
class RenameNodeException(XDCRException):

    """Raised when converting an ip to a hostname fails.
    """

    def __init__(self, msg=''):
        super(RenameNodeException, self).__init__(msg)
37
38
class RebalanceNotStopException(XDCRException):

    """Raised when stopping a rebalance fails.
    """

    def __init__(self, msg=''):
        super(RebalanceNotStopException, self).__init__(msg)
46
47
def raise_if(cond, ex):
    """Raise the given exception instance when the condition is truthy.

    @param cond: condition to evaluate
    @param ex: exception instance raised when cond holds
    """
    if not cond:
        return
    raise ex
53
54
class TOPOLOGY:
    """Cluster topology names accepted in the test input."""
    CHAIN = "chain"
    STAR = "star"
    RING = "ring"
    HYBRID = "hybrid"
60
61
class REPLICATION_DIRECTION:
    """XDCR replication direction names used in the test input."""
    UNIDIRECTION = "unidirection"
    BIDIRECTION = "bidirection"
65
66
class REPLICATION_TYPE:
    """Replication type names (only continuous is defined here)."""
    CONTINUOUS = "continuous"
69
70
class REPLICATION_PROTOCOL:
    """XDCR wire protocol names: CAPI (REST-based) or XMEM (memcached)."""
    CAPI = "capi"
    XMEM = "xmem"
74
75
class INPUT:
    """Test-input parameter key names controlling XDCR test setup."""
    REPLICATION_DIRECTION = "rdirection"
    CLUSTER_TOPOLOGY = "ctopology"
    SEED_DATA = "sdata"
    SEED_DATA_MODE = "sdata_mode"
    SEED_DATA_OPERATION = "sdata_op"
    POLL_INTERVAL = "poll_interval"  # in seconds
    POLL_TIMEOUT = "poll_timeout"  # in seconds
    SEED_DATA_MODE_SYNC = "sync"
85
86
class OPS:
    """Document operation names used when seeding/mutating data."""
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    APPEND = "append"
92
93
class EVICTION_POLICY:
    """Bucket eviction policy names accepted by the REST API."""
    VALUE_ONLY = "valueOnly"
    FULL_EVICTION = "fullEviction"
    NO_EVICTION = "noEviction"
    NRU_EVICTION = "nruEviction"
    # Policies valid for couchbase (persistent) buckets
    CB = [VALUE_ONLY, FULL_EVICTION]
    # Policies valid for ephemeral buckets
    EPH = [NO_EVICTION, NRU_EVICTION]
101
class BUCKET_PRIORITY:
    """Bucket priority level names."""
    HIGH = "high"
104
105
class BUCKET_NAME:
    """Well-known bucket names used by the tests."""
    DEFAULT = "default"
108
109
class OS:
    """Operating-system family names as reported by the remote shell."""
    WINDOWS = "windows"
    LINUX = "linux"
    OSX = "osx"
114
115
class COMMAND:
    """Shell command names used to restart remote machines."""
    SHUTDOWN = "shutdown"
    REBOOT = "reboot"
119
120
class STATE:
    """Service state strings matched against status-command output."""
    RUNNING = "running"
123
124
class REPL_PARAM:
    """goxdcr replication setting names as accepted by the REST API."""
    FAILURE_RESTART = "failureRestartInterval"
    CHECKPOINT_INTERVAL = "checkpointInterval"
    OPTIMISTIC_THRESHOLD = "optimisticReplicationThreshold"
    FILTER_EXP = "filterExpression"
    SOURCE_NOZZLES = "sourceNozzlePerNode"
    TARGET_NOZZLES = "targetNozzlePerNode"
    BATCH_COUNT = "workerBatchSize"
    BATCH_SIZE = "docBatchSizeKb"
    LOG_LEVEL = "logLevel"
    MAX_REPLICATION_LAG = "maxExpectedReplicationLag"
    TIMEOUT_PERC = "timeoutPercentageCap"
    PAUSE_REQUESTED = "pauseRequested"
138
139
class TEST_XDCR_PARAM:
    """Test-input names for per-replication XDCR settings (see REPL_PARAM
    for the corresponding REST API names).
    """
    FAILURE_RESTART = "failure_restart_interval"
    CHECKPOINT_INTERVAL = "checkpoint_interval"
    OPTIMISTIC_THRESHOLD = "optimistic_threshold"
    FILTER_EXP = "filter_expression"
    SOURCE_NOZZLES = "source_nozzles"
    TARGET_NOZZLES = "target_nozzles"
    BATCH_COUNT = "batch_count"
    BATCH_SIZE = "batch_size"
    LOG_LEVEL = "log_level"
    MAX_REPLICATION_LAG = "max_replication_lag"
    TIMEOUT_PERC = "timeout_percentage"

    @staticmethod
    def get_test_to_create_repl_param_map():
        """Return a dict mapping each test-input setting name to the
        matching REST API (REPL_PARAM) setting name.
        """
        pairs = (
            (TEST_XDCR_PARAM.FAILURE_RESTART, REPL_PARAM.FAILURE_RESTART),
            (TEST_XDCR_PARAM.CHECKPOINT_INTERVAL, REPL_PARAM.CHECKPOINT_INTERVAL),
            (TEST_XDCR_PARAM.OPTIMISTIC_THRESHOLD, REPL_PARAM.OPTIMISTIC_THRESHOLD),
            (TEST_XDCR_PARAM.FILTER_EXP, REPL_PARAM.FILTER_EXP),
            (TEST_XDCR_PARAM.SOURCE_NOZZLES, REPL_PARAM.SOURCE_NOZZLES),
            (TEST_XDCR_PARAM.TARGET_NOZZLES, REPL_PARAM.TARGET_NOZZLES),
            (TEST_XDCR_PARAM.BATCH_COUNT, REPL_PARAM.BATCH_COUNT),
            (TEST_XDCR_PARAM.BATCH_SIZE, REPL_PARAM.BATCH_SIZE),
            (TEST_XDCR_PARAM.MAX_REPLICATION_LAG, REPL_PARAM.MAX_REPLICATION_LAG),
            (TEST_XDCR_PARAM.TIMEOUT_PERC, REPL_PARAM.TIMEOUT_PERC),
            (TEST_XDCR_PARAM.LOG_LEVEL, REPL_PARAM.LOG_LEVEL),
        )
        return dict(pairs)
168
169
class XDCR_PARAM:
    """Legacy 'xdcr'-prefixed per-replication setting names."""
    # Per-replication params (input)
    XDCR_FAILURE_RESTART = "xdcrFailureRestartInterval"
    XDCR_CHECKPOINT_INTERVAL = "xdcrCheckpointInterval"
    XDCR_OPTIMISTIC_THRESHOLD = "xdcrOptimisticReplicationThreshold"
    XDCR_FILTER_EXP = "xdcrFilterExpression"
    XDCR_SOURCE_NOZZLES = "xdcrSourceNozzlePerNode"
    XDCR_TARGET_NOZZLES = "xdcrTargetNozzlePerNode"
    XDCR_BATCH_COUNT = "xdcrWorkerBatchSize"
    XDCR_BATCH_SIZE = "xdcrDocBatchSizeKb"
    XDCR_LOG_LEVEL = "xdcrLogLevel"
    XDCR_MAX_REPLICATION_LAG = "xdcrMaxExpectedReplicationLag"
    XDCR_TIMEOUT_PERC = "xdcrTimeoutPercentageCap"
183
184
class CHECK_AUDIT_EVENT:
    """Global switch: set CHECK = True to enable audit-event validation."""
    CHECK = False
187
188# Event Definition:
189# https://github.com/couchbase/goxdcr/blob/master/etc/audit_descriptor.json
190
class GO_XDCR_AUDIT_EVENT_ID:
    """Audit event ids emitted by goxdcr (see etc/audit_descriptor.json in
    the goxdcr repository)."""
    CREATE_CLUSTER = 16384
    MOD_CLUSTER = 16385
    RM_CLUSTER = 16386
    CREATE_REPL = 16387
    PAUSE_REPL = 16388
    RESUME_REPL = 16389
    CAN_REPL = 16390
    DEFAULT_SETT = 16391
    IND_SETT = 16392
201
202
203class NodeHelper:
204    _log = logger.Logger.get_logger()
205
206    @staticmethod
207    def disable_firewall(
208            server, rep_direction=REPLICATION_DIRECTION.UNIDIRECTION):
209        """Disable firewall to put restriction to replicate items in XDCR.
210        @param server: server object to disable firewall
211        @param rep_direction: replication direction unidirection/bidirection
212        """
213        shell = RemoteMachineShellConnection(server)
214        shell.info = shell.extract_remote_info()
215
216        if shell.info.type.lower() == "windows":
217            output, error = shell.execute_command('netsh advfirewall set publicprofile state off')
218            shell.log_command_output(output, error)
219            output, error = shell.execute_command('netsh advfirewall set privateprofile state off')
220            shell.log_command_output(output, error)
221            # for details see RemoteUtilHelper.enable_firewall for windows
222            output, error = shell.execute_command('netsh advfirewall firewall delete rule name="block erl.exe in"')
223            shell.log_command_output(output, error)
224            output, error = shell.execute_command('netsh advfirewall firewall delete rule name="block erl.exe out"')
225            shell.log_command_output(output, error)
226        else:
227            o, r = shell.execute_command("iptables -F")
228            shell.log_command_output(o, r)
229            o, r = shell.execute_command(
230                "/sbin/iptables -A INPUT -p tcp -i eth0 --dport 1000:65535 -j ACCEPT")
231            shell.log_command_output(o, r)
232            if rep_direction == REPLICATION_DIRECTION.BIDIRECTION:
233                o, r = shell.execute_command(
234                    "/sbin/iptables -A OUTPUT -p tcp -o eth0 --dport 1000:65535 -j ACCEPT")
235                shell.log_command_output(o, r)
236            o, r = shell.execute_command(
237                "/sbin/iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT")
238            shell.log_command_output(o, r)
239            # self.log.info("enabled firewall on {0}".format(server))
240            o, r = shell.execute_command("/sbin/iptables --list")
241            shell.log_command_output(o, r)
242        shell.disconnect()
243
244    @staticmethod
245    def reboot_server(server, test_case, wait_timeout=60):
246        """Reboot a server and wait for couchbase server to run.
247        @param server: server object, which needs to be rebooted.
248        @param test_case: test case object, since it has assert() function
249                        which is used by wait_for_ns_servers_or_assert
250                        to throw assertion.
251        @param wait_timeout: timeout to whole reboot operation.
252        """
253        # self.log.info("Rebooting server '{0}'....".format(server.ip))
254        shell = RemoteMachineShellConnection(server)
255        if shell.extract_remote_info().type.lower() == OS.WINDOWS:
256            o, r = shell.execute_command(
257                "{0} -r -f -t 0".format(COMMAND.SHUTDOWN))
258        elif shell.extract_remote_info().type.lower() == OS.LINUX:
259            o, r = shell.execute_command(COMMAND.REBOOT)
260        shell.log_command_output(o, r)
261        # wait for restart and warmup on all server
262        time.sleep(wait_timeout * 5)
263        # disable firewall on these nodes
264        NodeHelper.disable_firewall(server)
265        # wait till server is ready after warmup
266        ClusterOperationHelper.wait_for_ns_servers_or_assert(
267            [server],
268            test_case,
269            wait_if_warmup=True)
270
271    @staticmethod
272    def enable_firewall(
273            server, rep_direction=REPLICATION_DIRECTION.UNIDIRECTION):
274        """Enable firewall
275        @param server: server object to enable firewall
276        @param rep_direction: replication direction unidirection/bidirection
277        """
278        is_bidirectional = rep_direction == REPLICATION_DIRECTION.BIDIRECTION
279        RemoteUtilHelper.enable_firewall(
280            server,
281            bidirectional=is_bidirectional,
282            xdcr=True)
283
284    @staticmethod
285    def do_a_warm_up(server):
286        """Warmp up server
287        """
288        shell = RemoteMachineShellConnection(server)
289        shell.stop_couchbase()
290        time.sleep(5)
291        shell.start_couchbase()
292        shell.disconnect()
293
294    @staticmethod
295    def wait_service_started(server, wait_time=120):
296        """Function will wait for Couchbase service to be in
297        running phase.
298        """
299        shell = RemoteMachineShellConnection(server)
300        os_type = shell.extract_remote_info().distribution_type
301        if os_type.lower() == 'windows':
302            cmd = "sc query CouchbaseServer | grep STATE"
303        else:
304            cmd = "service couchbase-server status"
305        now = time.time()
306        while time.time() - now < wait_time:
307            output, _ = shell.execute_command(cmd)
308            if str(output).lower().find("running") != -1:
309                # self.log.info("Couchbase service is running")
310                return
311            time.sleep(10)
312        raise Exception(
313            "Couchbase service is not running after {0} seconds".format(
314                wait_time))
315
316    @staticmethod
317    def wait_warmup_completed(warmupnodes, bucket_names=["default"]):
318        if isinstance(bucket_names, str):
319            bucket_names = [bucket_names]
320        start = time.time()
321        for server in warmupnodes:
322            for bucket in bucket_names:
323                while time.time() - start < 150:
324                    try:
325                        mc = MemcachedClientHelper.direct_client(server, bucket)
326                        if mc.stats()["ep_warmup_thread"] == "complete":
327                            NodeHelper._log.info(
328                                "Warmed up: %s items on %s on %s" %
329                                (mc.stats("warmup")["ep_warmup_key_count"], bucket, server))
330                            time.sleep(10)
331                            break
332                        elif mc.stats()["ep_warmup_thread"] == "running":
333                            NodeHelper._log.info(
334                                "Still warming up .. ep_warmup_key_count : %s" % (mc.stats("warmup")["ep_warmup_key_count"]))
335                            continue
336                        else:
337                            NodeHelper._log.info(
338                                "Value of ep_warmup_thread does not exist, exiting from this server")
339                            break
340                    except Exception as e:
341                        NodeHelper._log.info(e)
342                        time.sleep(10)
343                if mc.stats()["ep_warmup_thread"] == "running":
344                    NodeHelper._log.info(
345                            "ERROR: ep_warmup_thread's status not complete")
346                mc.close()
347
348
349    @staticmethod
350    def wait_node_restarted(
351            server, test_case, wait_time=120, wait_if_warmup=False,
352            check_service=False):
353        """Wait server to be re-started
354        """
355        now = time.time()
356        if check_service:
357            NodeHelper.wait_service_started(server, wait_time)
358            wait_time = now + wait_time - time.time()
359        num = 0
360        while num < wait_time / 10:
361            try:
362                ClusterOperationHelper.wait_for_ns_servers_or_assert(
363                    [server], test_case, wait_time=wait_time - num * 10,
364                    wait_if_warmup=wait_if_warmup)
365                break
366            except ServerUnavailableException:
367                num += 1
368                time.sleep(10)
369
370    @staticmethod
371    def kill_erlang(server):
372        """Kill erlang process running on server.
373        """
374        NodeHelper._log.info("Killing erlang on server: {0}".format(server))
375        shell = RemoteMachineShellConnection(server)
376        os_info = shell.extract_remote_info()
377        shell.kill_erlang(os_info)
378        shell.disconnect()
379
380    @staticmethod
381    def kill_memcached(server):
382        """Kill memcached process running on server.
383        """
384        shell = RemoteMachineShellConnection(server)
385        shell.kill_erlang()
386        shell.disconnect()
387
388    @staticmethod
389    def get_goxdcr_log_dir(node):
390        """Gets couchbase log directory, even for cluster_run
391        """
392        _, dir = RestConnection(node).diag_eval('filename:absname(element(2, application:get_env(ns_server,error_logger_mf_dir))).')
393        return str(dir)
394
395    @staticmethod
396    def check_goxdcr_log(server, str, goxdcr_log=None, print_matches=None, log_name=None, timeout=0):
397        """ Checks if a string 'str' is present in 'log_name' on 'server'
398            and returns the number of occurances
399            @param goxdcr_log: goxdcr log location on the server
400            @timeout: search every 10 seconds until timeout
401        """
402        if not log_name:
403            log_name = "goxdcr.log"
404        if not goxdcr_log:
405            goxdcr_log = NodeHelper.get_goxdcr_log_dir(server)\
406                     + '/' + log_name + '*'
407        shell = RemoteMachineShellConnection(server)
408        info = shell.extract_remote_info().type.lower()
409        if info == "windows":
410            cmd = "grep "
411        else:
412            cmd = "zgrep "
413        cmd += "\"{0}\" {1}".format(str, goxdcr_log)
414        iter = 0
415        count = 0
416        matches = []
417        # Search 5 times with a break of timeout sec
418        while iter < 5:
419            matches, err = shell.execute_command(cmd)
420            count = len(matches)
421            if count > 0 or timeout == 0:
422                break
423            else:
424                time.sleep(timeout)
425                NodeHelper._log.info("Waiting {0}s for {1} to appear in {2} ..".format(timeout, str, log_name))
426            iter += 1
427        shell.disconnect()
428        NodeHelper._log.info(count)
429        if print_matches:
430            NodeHelper._log.info(matches)
431            return matches, count
432        return count
433
434    @staticmethod
435    def rename_nodes(servers):
436        """Rename server name from ip to their hostname
437        @param servers: list of server objects.
438        @return: dictionary whose key is server and value is hostname
439        """
440        hostnames = {}
441        for server in servers:
442            shell = RemoteMachineShellConnection(server)
443            try:
444                if "ec2" in str(server.ip):
445                    hostname = shell.get_aws_public_hostname()
446                elif "azure" in str(server.ip):
447                    hostname = str(server.ip)
448                else:
449                    hostname = shell.get_full_hostname()
450                rest = RestConnection(server)
451                renamed, content = rest.rename_node(
452                    hostname, username=server.rest_username,
453                    password=server.rest_password)
454                raise_if(
455                    not renamed,
456                    RenameNodeException(
457                        "Server %s is not renamed! Hostname %s. Error %s" % (
458                            server, hostname, content)
459                    )
460                )
461                hostnames[server] = hostname
462                server.hostname = hostname
463            finally:
464                shell.disconnect()
465        return hostnames
466
467    # Returns version like "x.x.x" after removing build number
468    @staticmethod
469    def get_cb_version(node):
470        rest = RestConnection(node)
471        version = rest.get_nodes_self().version
472        return version[:version.rfind('-')]
473
474    @staticmethod
475    def set_wall_clock_time(node, date_str):
476        shell = RemoteMachineShellConnection(node)
477        # os_info = shell.extract_remote_info()
478        # if os_info == OS.LINUX:
479        # date command works on linux and windows cygwin as well.
480        shell.execute_command(
481            "sudo date -s '%s'" %
482            time.ctime(
483                date_str.tx_time))
484        # elif os_info == OS.WINDOWS:
485        #    raise "NEED To SETUP DATE COMMAND FOR WINDOWS"
486
487    @staticmethod
488    def get_cbcollect_info(server):
489        """Collect cbcollectinfo logs for all the servers in the cluster.
490        """
491        path = TestInputSingleton.input.param("logs_folder", "/tmp")
492        print "grabbing cbcollect from {0}".format(server.ip)
493        path = path or "."
494        try:
495            cbcollectRunner(server, path).run()
496            TestInputSingleton.input.test_params[
497                "get-cbcollect-info"] = False
498        except Exception as e:
499            NodeHelper._log.error(
500                "IMPOSSIBLE TO GRAB CBCOLLECT FROM {0}: {1}".format(
501                    server.ip,
502                    e))
503    @staticmethod
504    def collect_data_files(server):
505        """Collect bucket data files for all the servers in the cluster.
506        Data files are collected only if data is not verified on the cluster.
507        """
508        path = TestInputSingleton.input.param("logs_folder", "/tmp")
509        collect_data_files.cbdatacollectRunner(server, path).run()
510
511    @staticmethod
512    def collect_logs(server, cluster_run=False):
513        """Grab cbcollect before we cleanup
514        """
515        NodeHelper.get_cbcollect_info(server)
516        if not cluster_run:
517            NodeHelper.collect_data_files(server)
518
class ValidateAuditEvent:
    """Helper to verify goxdcr audit events against expected field values."""

    @staticmethod
    def validate_audit_event(event_id, master_node, expected_results):
        """Validate the audit event on master_node; no-op unless
        CHECK_AUDIT_EVENT.CHECK is enabled.
        """
        if not CHECK_AUDIT_EVENT.CHECK:
            return
        audit_obj = audit(event_id, master_node)
        fields_ok, values_ok = audit_obj.validateEvents(expected_results)
        raise_if(
            not fields_ok,
            XDCRException("One of the fields is not matching"))
        raise_if(
            not values_ok,
            XDCRException("Values for one of the fields is not matching"))
533
534
class FloatingServers:

    """Keep Track of free servers, For Rebalance-in
    or swap-rebalance operations.
    """
    # Shared class-level list: all users of this class see the same servers
    _serverlist = []
541
542
543class XDCRRemoteClusterRef:
544
545    """Class keep the information related to Remote Cluster References.
546    """
547
548    def __init__(self, src_cluster, dest_cluster, name, encryption=False, replicator_target_role=False):
549        """
550        @param src_cluster: source couchbase cluster object.
551        @param dest_cluster: destination couchbase cluster object:
552        @param name: remote cluster reference name.
553        @param encryption: True to enable SSL encryption for replication else
554                        False
555        """
556        self.__src_cluster = src_cluster
557        self.__dest_cluster = dest_cluster
558        self.__name = name
559        self.__encryption = encryption
560        self.__rest_info = {}
561        self.__replicator_target_role = replicator_target_role
562        self.__use_scramsha = TestInputSingleton.input.param("use_scramsha", False)
563
564        # List of XDCReplication objects
565        self.__replications = []
566
567    def __str__(self):
568        return "{0} -> {1}, Name: {2}".format(
569            self.__src_cluster.get_name(), self.__dest_cluster.get_name(),
570            self.__name)
571
572    def get_cb_clusters(self):
573        return self.__cb_clusters
574
575    def get_src_cluster(self):
576        return self.__src_cluster
577
578    def get_dest_cluster(self):
579        return self.__dest_cluster
580
581    def get_name(self):
582        return self.__name
583
584    def get_replications(self):
585        return self.__replications
586
587    def get_rest_info(self):
588        return self.__rest_info
589
590    def get_replication_for_bucket(self, bucket):
591        for replication in self.__replications:
592            if replication.get_src_bucket().name == bucket.name:
593                return replication
594        return None
595
596    def __get_event_expected_results(self):
597        expected_results = {
598                "real_userid:source": "ns_server",
599                "real_userid:user": self.__src_cluster.get_master_node().rest_username,
600                "cluster_name": self.__name,
601                "cluster_hostname": "%s:%s" % (self.__dest_cluster.get_master_node().ip, self.__dest_cluster.get_master_node().port),
602                "is_encrypted": self.__encryption,
603                "encryption_type": ""}
604
605        return expected_results
606
607    def __validate_create_event(self):
608            ValidateAuditEvent.validate_audit_event(
609                GO_XDCR_AUDIT_EVENT_ID.CREATE_CLUSTER,
610                self.__src_cluster.get_master_node(),
611                self.__get_event_expected_results())
612
613    def add(self):
614        """create cluster reference- add remote cluster
615        """
616        rest_conn_src = RestConnection(self.__src_cluster.get_master_node())
617        certificate = ""
618        dest_master = self.__dest_cluster.get_master_node()
619        if self.__encryption:
620            rest_conn_dest = RestConnection(dest_master)
621            certificate = rest_conn_dest.get_cluster_ceritificate()
622
623        if self.__replicator_target_role:
624            self.dest_user = "replicator_user"
625            self.dest_pass = "password"
626        else:
627            self.dest_user = dest_master.rest_username
628            self.dest_pass = dest_master.rest_password
629
630        if not self.__use_scramsha:
631            self.__rest_info = rest_conn_src.add_remote_cluster(
632                dest_master.ip, dest_master.port,
633                self.dest_user,
634                self.dest_pass, self.__name,
635                demandEncryption=self.__encryption,
636                certificate=certificate)
637        else:
638            print "Using scram-sha authentication"
639            self.__rest_info = rest_conn_src.add_remote_cluster(
640                dest_master.ip, dest_master.port,
641                self.dest_user,
642                self.dest_pass, self.__name,
643                demandEncryption=self.__encryption,
644                encryptionType="half"
645            )
646
647        self.__validate_create_event()
648
649    def __validate_modify_event(self):
650        ValidateAuditEvent.validate_audit_event(
651            GO_XDCR_AUDIT_EVENT_ID.MOD_CLUSTER,
652            self.__src_cluster.get_master_node(),
653            self.__get_event_expected_results())
654
655    def use_scram_sha_auth(self):
656        self.__use_scramsha = True
657        self.__encryption = True
658        self.modify()
659
660    def modify(self, encryption=True):
661        """Modify cluster reference to enable SSL encryption
662        """
663        dest_master = self.__dest_cluster.get_master_node()
664        rest_conn_src = RestConnection(self.__src_cluster.get_master_node())
665        certificate = ""
666        if encryption:
667            rest_conn_dest = RestConnection(dest_master)
668            if not self.__use_scramsha:
669                certificate = rest_conn_dest.get_cluster_ceritificate()
670                self.__rest_info = rest_conn_src.modify_remote_cluster(
671                    dest_master.ip, dest_master.port,
672                    self.dest_user,
673                    self.dest_pass, self.__name,
674                    demandEncryption=encryption,
675                    certificate=certificate)
676            else:
677                print "Using scram-sha authentication"
678                self.__rest_info = rest_conn_src.modify_remote_cluster(
679                    dest_master.ip, dest_master.port,
680                    self.dest_user,
681                    self.dest_pass, self.__name,
682                    demandEncryption=encryption,
683                    encryptionType="half")
684        self.__encryption = encryption
685        self.__validate_modify_event()
686
687    def __validate_remove_event(self):
688        ValidateAuditEvent.validate_audit_event(
689            GO_XDCR_AUDIT_EVENT_ID.RM_CLUSTER,
690            self.__src_cluster.get_master_node(),
691            self.__get_event_expected_results())
692
693
694    def remove(self):
695        RestConnection(
696            self.__src_cluster.get_master_node()).remove_remote_cluster(
697            self.__name)
698        self.__validate_remove_event()
699
700    def create_replication(
701            self, fromBucket,
702            rep_type=REPLICATION_PROTOCOL.XMEM,
703            toBucket=None):
704        """Create replication objects, but replication will not get
705        started here.
706        """
707        self.__replications.append(
708            XDCReplication(
709                self,
710                fromBucket,
711                rep_type,
712                toBucket))
713
714    def clear_all_replications(self):
715        self.__replications = []
716
717    def start_all_replications(self):
718        """Start all created replication
719        """
720        [repl.start() for repl in self.__replications]
721
722    def pause_all_replications(self, verify=False):
723        """Pause all created replication
724        """
725        [repl.pause(verify=verify) for repl in self.__replications]
726
727    def pause_all_replications_by_id(self, verify=False):
728        [repl.pause(repl_id=repl.get_repl_id(), verify=verify) for repl in self.__replications]
729
730    def resume_all_replications(self, verify=False):
731        """Resume all created replication
732        """
733        [repl.resume(verify=verify) for repl in self.__replications]
734
735    def resume_all_replications_by_id(self, verify=False):
736        [repl.resume(repl_id=repl.get_repl_id(), verify=verify) for repl in self.__replications]
737
738    def stop_all_replications(self):
739        rest = RestConnection(self.__src_cluster.get_master_node())
740        rest_all_repls = rest.get_replications()
741        for repl in self.__replications:
742            for rest_all_repl in rest_all_repls:
743                if repl.get_repl_id() == rest_all_repl['id']:
744                    repl.cancel(rest, rest_all_repl)
745        self.clear_all_replications()
746
747
748class XDCReplication:
749
    def __init__(self, remote_cluster_ref, from_bucket, rep_type, to_bucket):
        """
        @param remote_cluster_ref: XDCRRemoteClusterRef object
        @param from_bucket: Source bucket (Bucket object)
        @param rep_type: replication protocol REPLICATION_PROTOCOL.CAPI/XMEM
        @param to_bucket: Destination bucket (Bucket object)
        """
        self.__input = TestInputSingleton.input
        self.__remote_cluster_ref = remote_cluster_ref
        self.__from_bucket = from_bucket
        # Destination defaults to the source bucket when not supplied
        self.__to_bucket = to_bucket or from_bucket
        self.__src_cluster = self.__remote_cluster_ref.get_src_cluster()
        self.__dest_cluster = self.__remote_cluster_ref.get_dest_cluster()
        self.__src_cluster_name = self.__src_cluster.get_name()
        self.__rep_type = rep_type
        # Per-replication settings parsed from the test input
        self.__test_xdcr_params = {}
        # Settings changed after creation (used for audit validation)
        self.__updated_params = {}

        self.__parse_test_xdcr_params()
        self.log = logger.Logger.get_logger()

        # Response from REST API; replication id stays None until started
        self.__rep_id = None
773
774    def __str__(self):
775        return "Replication {0}:{1} -> {2}:{3}".format(
776            self.__src_cluster.get_name(),
777            self.__from_bucket.name, self.__dest_cluster.get_name(),
778            self.__to_bucket.name)
779
780    # get per replication params specified as from_bucket@cluster_name=
781    # eg. default@C1="xdcrFilterExpression:loadOne,xdcrCheckpointInterval:60,
782    # xdcrFailureRestartInterval:20"
783    def __parse_test_xdcr_params(self):
784        param_str = self.__input.param(
785            "%s@%s" %
786            (self.__from_bucket, self.__src_cluster_name), None)
787        if param_str:
788            argument_split = re.split('[:,]', param_str)
789            self.__test_xdcr_params.update(
790                dict(zip(argument_split[::2], argument_split[1::2]))
791            )
792
793    def __convert_test_to_xdcr_params(self):
794        xdcr_params = {}
795        xdcr_param_map = TEST_XDCR_PARAM.get_test_to_create_repl_param_map()
796        for test_param, value in self.__test_xdcr_params.iteritems():
797            xdcr_params[xdcr_param_map[test_param]] = value
798        return xdcr_params
799
800    def get_filter_exp(self):
801        if TEST_XDCR_PARAM.FILTER_EXP in self.__test_xdcr_params:
802            return self.__test_xdcr_params[TEST_XDCR_PARAM.FILTER_EXP]
803        return None
804
    def get_src_bucket(self):
        """Return the source Bucket object of this replication."""
        return self.__from_bucket
807
    def get_dest_bucket(self):
        """Return the destination Bucket object of this replication."""
        return self.__to_bucket
810
    def get_src_cluster(self):
        """Return the source cluster object."""
        return self.__src_cluster
813
    def get_dest_cluster(self):
        """Return the destination cluster object."""
        return self.__dest_cluster
816
    def get_repl_id(self):
        """Return the replication id from the REST API (None until started)."""
        return self.__rep_id
819
820    def __get_event_expected_results(self):
821        expected_results = {
822                "real_userid:source": "ns_server",
823                "real_userid:user": self.__src_cluster.get_master_node().rest_username,
824                "local_cluster_name": "%s:%s" % (self.__src_cluster.get_master_node().ip, self.__src_cluster.get_master_node().port),
825                "source_bucket_name": self.__from_bucket.name,
826                "remote_cluster_name": self.__remote_cluster_ref.get_name(),
827                "target_bucket_name": self.__to_bucket.name
828            }
829        # optional audit param
830        if self.get_filter_exp():
831            expected_results["filter_expression"] = self.get_filter_exp()
832        return expected_results
833
    def __validate_update_repl_event(self):
        """Build the expected audit-event payload for a replication update.

        NOTE(review): expected_results is constructed but never passed to
        ValidateAuditEvent.validate_audit_event, unlike the sibling
        __validate_* helpers -- as written this method validates nothing.
        Presumably a validate call is missing; confirm intent before use.
        """
        expected_results = {
            "settings": {
                "continuous": 'true',
                "target": self.__to_bucket.name,
                "source": self.__from_bucket.name,
                "type": "xdc-%s" % self.__rep_type
            },
            "id": self.__rep_id,
            "real_userid:source": "ns_server",
            "real_userid:user": self.__src_cluster.get_master_node().rest_username,
        }
        # fold in whatever settings the test changed via set_xdcr_param
        expected_results["settings"].update(self.__updated_params)
847
848    def __validate_set_param_event(self):
849        expected_results = self.__get_event_expected_results()
850        expected_results["updated_settings"] = self.__updated_params
851        ValidateAuditEvent.validate_audit_event(
852            GO_XDCR_AUDIT_EVENT_ID.IND_SETT,
853            self.get_src_cluster().get_master_node(), expected_results)
854
855    def get_xdcr_setting(self, param):
856        """Get a replication setting value
857        """
858        src_master = self.__src_cluster.get_master_node()
859        return RestConnection(src_master).get_xdcr_param(
860                    self.__from_bucket.name,
861                    self.__to_bucket.name,
862                    param)
863
864    def set_xdcr_param(self, param, value, verify_event=True):
865        """Set a replication setting to a value
866        """
867        src_master = self.__src_cluster.get_master_node()
868        RestConnection(src_master).set_xdcr_param(
869            self.__from_bucket.name,
870            self.__to_bucket.name,
871            param,
872            value)
873        self.log.info("Updated {0}={1} on bucket'{2}' on {3}".format(param, value, self.__from_bucket.name,
874                                                                     self.__src_cluster.get_master_node().ip))
875        self.__updated_params[param] = value
876        if verify_event:
877            self.__validate_set_param_event()
878
879    def __validate_start_audit_event(self):
880        ValidateAuditEvent.validate_audit_event(
881            GO_XDCR_AUDIT_EVENT_ID.CREATE_REPL,
882                self.get_src_cluster().get_master_node(),
883                self.__get_event_expected_results())
884
885    def start(self):
886        """Start replication"""
887        src_master = self.__src_cluster.get_master_node()
888        rest_conn_src = RestConnection(src_master)
889        self.__rep_id = rest_conn_src.start_replication(
890            REPLICATION_TYPE.CONTINUOUS,
891            self.__from_bucket,
892            self.__remote_cluster_ref.get_name(),
893            rep_type=self.__rep_type,
894            toBucket=self.__to_bucket,
895            xdcr_params=self.__convert_test_to_xdcr_params())
896        self.__validate_start_audit_event()
897        #if within this 10s for pipeline updater if we try to create another replication, it doesn't work until the previous pipeline is updated.
898        # but better to have this 10s sleep between replications.
899        time.sleep(10)
900
901    def __verify_pause(self):
902        """Verify if replication is paused"""
903        src_master = self.__src_cluster.get_master_node()
904        # Is bucket replication paused?
905        if not RestConnection(src_master).is_replication_paused(
906                self.__from_bucket.name,
907                self.__to_bucket.name):
908            raise XDCRException(
909                "XDCR is not paused for SrcBucket: {0}, Target Bucket: {1}".
910                format(self.__from_bucket.name,
911                       self.__to_bucket.name))
912
913    def __validate_pause_event(self):
914        ValidateAuditEvent.validate_audit_event(
915            GO_XDCR_AUDIT_EVENT_ID.PAUSE_REPL,
916            self.get_src_cluster().get_master_node(),
917            self.__get_event_expected_results())
918
919    def pause(self, repl_id=None, verify=False):
920        """Pause replication"""
921        src_master = self.__src_cluster.get_master_node()
922        if repl_id:
923            if not RestConnection(src_master).is_replication_paused_by_id(repl_id):
924                RestConnection(src_master).pause_resume_repl_by_id(repl_id, REPL_PARAM.PAUSE_REQUESTED, 'true')
925        else:
926            if not RestConnection(src_master).is_replication_paused(
927                    self.__from_bucket.name, self.__to_bucket.name):
928                self.set_xdcr_param(
929                    REPL_PARAM.PAUSE_REQUESTED,
930                    'true',
931                    verify_event=False)
932
933        self.__validate_pause_event()
934
935        if verify:
936            self.__verify_pause()
937
938    def _is_cluster_replicating(self):
939        count = 0
940        src_master = self.__src_cluster.get_master_node()
941        while count < 3:
942            outbound_mutations = self.__src_cluster.get_xdcr_stat(
943                self.__from_bucket.name,
944                'replication_changes_left')
945            if outbound_mutations == 0:
946                self.log.info(
947                    "Outbound mutations on {0} is {1}".format(
948                        src_master.ip,
949                        outbound_mutations))
950                count += 1
951                continue
952            else:
953                self.log.info(
954                    "Outbound mutations on {0} is {1}".format(
955                        src_master.ip,
956                        outbound_mutations))
957                self.log.info("Node {0} is replicating".format(src_master.ip))
958                break
959        else:
960            self.log.info(
961                "Outbound mutations on {0} is {1}".format(
962                    src_master.ip,
963                    outbound_mutations))
964            self.log.info(
965                "Cluster with node {0} is not replicating".format(
966                    src_master.ip))
967            return False
968        return True
969
970    def __verify_resume(self):
971        """Verify if replication is resumed"""
972        src_master = self.__src_cluster.get_master_node()
973        # Is bucket replication paused?
974        if RestConnection(src_master).is_replication_paused(self.__from_bucket.name,
975                                                            self.__to_bucket.name):
976            raise XDCRException(
977                "Replication is not resumed for SrcBucket: {0}, \
978                Target Bucket: {1}".format(self.__from_bucket, self.__to_bucket))
979
980        if not self._is_cluster_replicating():
981            self.log.info("XDCR completed on {0}".format(src_master.ip))
982
983    def __validate_resume_event(self):
984        ValidateAuditEvent.validate_audit_event(
985            GO_XDCR_AUDIT_EVENT_ID.RESUME_REPL,
986            self.get_src_cluster().get_master_node(),
987            self.__get_event_expected_results())
988
989    def resume(self, repl_id=None, verify=False):
990        """Resume replication if paused"""
991        src_master = self.__src_cluster.get_master_node()
992        if repl_id:
993            if RestConnection(src_master).is_replication_paused_by_id(repl_id):
994                RestConnection(src_master).pause_resume_repl_by_id(repl_id, REPL_PARAM.PAUSE_REQUESTED, 'false')
995        else:
996            if RestConnection(src_master).is_replication_paused(
997                    self.__from_bucket.name, self.__to_bucket.name):
998                self.set_xdcr_param(
999                REPL_PARAM.PAUSE_REQUESTED,
1000                'false',
1001                verify_event=False)
1002
1003        self.__validate_resume_event()
1004
1005        if verify:
1006            self.__verify_resume()
1007
1008    def __validate_cancel_event(self):
1009        ValidateAuditEvent.validate_audit_event(
1010            GO_XDCR_AUDIT_EVENT_ID.CAN_REPL,
1011            self.get_src_cluster().get_master_node(),
1012            self.__get_event_expected_results())
1013
1014
1015    def cancel(self, rest, rest_all_repl):
1016        rest.stop_replication(rest_all_repl["cancelURI"])
1017        self.__validate_cancel_event()
1018
1019
1020class CouchbaseCluster:
1021
    def __init__(self, name, nodes, log, use_hostname=False, sdk_compression=True):
        """
        @param name: Couchbase cluster name. e.g C1, C2 to distinguish in logs.
        @param nodes: list of server objects (read from ini file).
        @param log: logger object to print logs.
        @param use_hostname: True if use node's hostname rather ip to access
                        node else False.
        @param sdk_compression: True to enable SDK-side compression for
                        document loads issued against this cluster.
        """
        self.__name = name
        self.__nodes = nodes
        self.__log = log
        # smallest memory quota observed across nodes; 0 until __init_nodes runs
        self.__mem_quota = 0
        self.__use_hostname = use_hostname
        # first node in the ini list is treated as the cluster master
        self.__master_node = nodes[0]
        self.__design_docs = []
        self.__buckets = []
        # ip -> hostname map, filled only when use_hostname is True
        self.__hostnames = {}
        self.__fail_over_nodes = []
        self.__data_verified = True
        self.__meta_data_verified = True
        # remote cluster references created from this cluster
        self.__remote_clusters = []
        self.__clusterop = Cluster()
        # document generators used for loading, keyed by OPS.* constant
        self.__kv_gen = {}
        self.sdk_compression = sdk_compression
1046
1047    def __str__(self):
1048        return "Couchbase Cluster: %s, Master Ip: %s" % (
1049            self.__name, self.__master_node.ip)
1050
1051    def __stop_rebalance(self):
1052        rest = RestConnection(self.__master_node)
1053        if rest._rebalance_progress_status() == 'running':
1054            self.__log.warning(
1055                "rebalancing is still running, test should be verified")
1056            stopped = rest.stop_rebalance()
1057            raise_if(
1058                not stopped,
1059                RebalanceNotStopException("unable to stop rebalance"))
1060
1061    def __init_nodes(self, disabled_consistent_view=None):
1062        """Initialize all nodes. Rename node to hostname
1063        if needed by test.
1064        """
1065        tasks = []
1066        for node in self.__nodes:
1067            tasks.append(
1068                self.__clusterop.async_init_node(
1069                    node,
1070                    disabled_consistent_view))
1071        for task in tasks:
1072            mem_quota = task.result()
1073            if mem_quota < self.__mem_quota or self.__mem_quota == 0:
1074                self.__mem_quota = mem_quota
1075        if self.__use_hostname:
1076            self.__hostnames.update(NodeHelper.rename_nodes(self.__nodes))
1077
    def get_host_names(self):
        """Return the ip -> hostname map built during node renaming."""
        return self.__hostnames
1080
    def get_master_node(self):
        """Return the master (first) node of this cluster."""
        return self.__master_node
1083
    def get_mem_quota(self):
        """Return the smallest memory quota seen across the cluster's nodes."""
        return self.__mem_quota
1086
    def get_remote_clusters(self):
        """Return the list of remote cluster references created from here."""
        return self.__remote_clusters
1089
    def clear_all_remote_clusters(self):
        """Forget all remote cluster references (local bookkeeping only)."""
        self.__remote_clusters = []
1092
    def get_nodes(self):
        """Return the list of server objects making up this cluster."""
        return self.__nodes
1095
    def get_name(self):
        """Return the cluster's logical name (e.g. C1, C2)."""
        return self.__name
1098
    def get_cluster(self):
        """Return the Cluster task-scheduler object for this cluster."""
        return self.__clusterop
1101
1102    def get_kv_gen(self):
1103        raise_if(
1104            self.__kv_gen is None,
1105            XDCRException(
1106                "KV store is empty on couchbase cluster: %s" %
1107                self))
1108        return self.__kv_gen
1109
1110    def get_remote_cluster_ref_by_name(self, cluster_name):
1111        for remote_cluster_ref in self.__remote_clusters:
1112            if remote_cluster_ref.get_name() == cluster_name:
1113                return remote_cluster_ref
1114        self.__log.info("Remote cluster reference by name %s does not exist on %s"
1115                        % (cluster_name, self.__name))
1116        return None
1117
    def init_cluster(self, disabled_consistent_view=None):
        """Initialize cluster.
        1. Initialize all nodes.
        2. Add all nodes to the cluster.
        3. Enable xdcr trace logs to easy debug for xdcr items mismatch issues.
        @param disabled_consistent_view: passed through to node initialization.
        """
        master = RestConnection(self.__master_node)
        self.enable_diag_eval_on_non_local_hosts(self.__master_node)
        # captured before rebalance; used below to gate the audit setup
        is_master_sherlock_or_greater = master.is_cluster_compat_mode_greater_than(3.1)
        self.__init_nodes(disabled_consistent_view)
        # rebalance the remaining nodes in with the master, blocking on result
        self.__clusterop.async_rebalance(
            self.__nodes,
            self.__nodes[1:],
            [],
            use_hostnames=self.__use_hostname).result()

        if CHECK_AUDIT_EVENT.CHECK:
            # audit setup only applies to enterprise edition on clusters > 3.1
            if master.is_enterprise_edition() and is_master_sherlock_or_greater:
                # enable audit by default in all goxdcr tests
                audit_obj = audit(host=self.__master_node)
                status = audit_obj.getAuditStatus()
                self.__log.info("Audit status on {0} is {1}".
                            format(self.__name, status))
                if not status:
                    self.__log.info("Enabling audit ...")
                    audit_obj.setAuditEnable('true')
1144
1145    def enable_diag_eval_on_non_local_hosts(self, master):
1146        """
1147        Enable diag/eval to be run on non-local hosts.
1148        :param master: Node information of the master node of the cluster
1149        :return: Nothing
1150        """
1151        remote = RemoteMachineShellConnection(master)
1152        output, error = remote.enable_diag_eval_on_non_local_hosts()
1153        if output is not None:
1154            if "ok" not in output:
1155                self.__log.error("Error in enabling diag/eval on non-local hosts on {}".format(master.ip))
1156                raise Exception("Error in enabling diag/eval on non-local hosts on {}".format(master.ip))
1157            else:
1158                self.__log.info(
1159                    "Enabled diag/eval for non-local hosts from {}".format(
1160                        master.ip))
1161        else:
1162            self.__log.info("Running in compatibility mode, not enabled diag/eval for non-local hosts")
1163
    def _create_bucket_params(self, server, replicas=1, size=0, port=11211, password=None,
                             bucket_type=None, enable_replica_index=1, eviction_policy='valueOnly',
                             bucket_priority=None, flush_enabled=1, lww=False, maxttl=None):
        """Create a set of bucket_parameters to be sent to all of the bucket_creation methods
        Parameters:
            server - The server to create the bucket on. (TestInputServer)
            bucket_name - The name of the bucket to be created. (String)
            port - The port to create this bucket on. (String)
            password - The password for this bucket. (String)
            size - The size of the bucket to be created. (int)
            enable_replica_index - can be 0 or 1, 1 enables indexing of replica bucket data (int)
            replicas - The number of replicas for this bucket. (int)
            eviction_policy - The eviction policy for the bucket, can be valueOnly or fullEviction. (String)
            bucket_priority - The priority of the bucket:either none, low, or high. (String)
            bucket_type - The type of bucket. (String)  NOTE: currently ignored;
                the test input param "bucket_type" (default "membase") always
                overrides this argument below.
            flushEnabled - Enable or Disable the flush functionality of the bucket. (int)
            lww = determine the conflict resolution type of the bucket. (Boolean)
            maxttl - maximum TTL for documents in the bucket, or None. (int)

        Returns:
            bucket_params - A dictionary containing the parameters needed to create a bucket."""

        bucket_params = {}
        bucket_params['server'] = server
        bucket_params['replicas'] = replicas
        bucket_params['size'] = size
        bucket_params['port'] = port
        bucket_params['password'] = password
        # the ini/test param overrides the bucket_type argument unconditionally
        bucket_type = TestInputSingleton.input.param("bucket_type", "membase")
        bucket_params['bucket_type'] = bucket_type
        bucket_params['enable_replica_index'] = enable_replica_index
        if bucket_type == "ephemeral":
            # ephemeral buckets accept only their own eviction policies;
            # fall back to NRU eviction for anything else
            if eviction_policy in EVICTION_POLICY.EPH:
                bucket_params['eviction_policy'] = eviction_policy
            else:
                bucket_params['eviction_policy'] = EVICTION_POLICY.NRU_EVICTION
            if eviction_policy == EVICTION_POLICY.NRU_EVICTION:
                # NOTE(review): 6.0.2 builds appear to need this internal
                # setting for NRU source buckets -- confirm against server docs
                if "6.0.2-" in NodeHelper.get_cb_version(server):
                    self.set_internal_setting("AllowSourceNRUCreation", "true")
        else:
            if eviction_policy in EVICTION_POLICY.CB:
                bucket_params['eviction_policy'] = eviction_policy
            else:
                bucket_params['eviction_policy'] = EVICTION_POLICY.VALUE_ONLY
        bucket_params['bucket_priority'] = bucket_priority
        bucket_params['flush_enabled'] = flush_enabled
        bucket_params['lww'] = lww
        bucket_params['maxTTL'] = maxttl
        return bucket_params
1212
1213    def set_global_checkpt_interval(self, value):
1214        self.set_xdcr_param("checkpointInterval",value)
1215
1216    def set_internal_setting(self, param, value):
1217        output, _ = RemoteMachineShellConnection(self.__master_node).execute_command_raw(
1218            "curl -X GET http://Administrator:password@localhost:9998/xdcr/internalSettings")
1219        if not '"' + param + '":' + value in output:
1220            RemoteMachineShellConnection(self.__master_node).execute_command_raw(
1221                "curl http://Administrator:password@localhost:9998/xdcr/internalSettings -X POST -d " +
1222                urllib.quote_plus(param +'="' + value + '"'))
1223
1224    def __remove_all_remote_clusters(self):
1225        rest_remote_clusters = RestConnection(
1226            self.__master_node).get_remote_clusters()
1227        for remote_cluster_ref in self.__remote_clusters:
1228            for rest_remote_cluster in rest_remote_clusters:
1229                if remote_cluster_ref.get_name() == rest_remote_cluster[
1230                        'name']:
1231                    if not rest_remote_cluster.get('deleted', False):
1232                        remote_cluster_ref.remove()
1233        self.__remote_clusters = []
1234
1235    def __remove_all_replications(self):
1236        for remote_cluster_ref in self.__remote_clusters:
1237            remote_cluster_ref.stop_all_replications()
1238
    def cleanup_cluster(
            self,
            test_case,
            from_rest=False,
            cluster_shutdown=True):
        """Cleanup cluster.
        1. Remove all remote cluster references.
        2. Remove all replications.
        3. Remove all buckets.
        @param test_case: Test case object.
        @param from_rest: True to tear down replications/references purely via
               REST, False to go through the tracked reference objects.
        @param cluster_shutdown: True if Task (task.py) Scheduler needs to shutdown else False
        """
        try:
            self.__log.info("removing xdcr/nodes settings")
            rest = RestConnection(self.__master_node)
            if from_rest:
                rest.remove_all_replications()
                rest.remove_all_remote_clusters()
            else:
                self.__remove_all_replications()
                self.__remove_all_remote_clusters()
            rest.remove_all_recoveries()
            self.__stop_rebalance()
            self.__log.info("cleanup {0}".format(self.__nodes))
            for node in self.__nodes:
                BucketOperationHelper.delete_all_buckets_or_assert(
                    [node],
                    test_case)
                force_eject = TestInputSingleton.input.param(
                    "forceEject",
                    False)
                # non-master nodes may be hard-ejected when forceEject is set;
                # eject failures are logged but do not abort the cleanup loop
                if force_eject and node != self.__master_node:
                    try:
                        rest = RestConnection(node)
                        rest.force_eject_node()
                    except BaseException as e:
                        self.__log.error(e)
                else:
                    ClusterOperationHelper.cleanup_cluster([node])
                ClusterOperationHelper.wait_for_ns_servers_or_assert(
                    [node],
                    test_case)
        finally:
            if cluster_shutdown:
                self.__clusterop.shutdown(force=True)
1286
1287    def create_sasl_buckets(
1288            self, bucket_size, num_buckets=1, num_replicas=1,
1289            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1290            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False,
1291            maxttl=None):
1292        """Create sasl buckets.
1293        @param bucket_size: size of the bucket.
1294        @param num_buckets: number of buckets to create.
1295        @param num_replicas: number of replicas (1-3).
1296        @param eviction_policy: valueOnly etc.
1297        @param bucket_priority: high/low etc.
1298        @param lww: conflict_resolution_type.
1299        """
1300        bucket_tasks = []
1301        for i in range(num_buckets):
1302            name = "sasl_bucket_" + str(i + 1)
1303            sasl_params = self._create_bucket_params(server=self.__master_node, password='password',
1304                                                     size=bucket_size, replicas=num_replicas,
1305                                                     eviction_policy=eviction_policy,
1306                                                     bucket_priority=bucket_priority,
1307                                                     lww=lww, maxttl=maxttl)
1308            bucket_tasks.append(self.__clusterop.async_create_sasl_bucket(name=name,password='password',
1309                                                                          bucket_params=sasl_params))
1310
1311            self.__buckets.append(
1312                Bucket(
1313                    name=name, authType="sasl", saslPassword="password",
1314                    num_replicas=num_replicas, bucket_size=bucket_size,
1315                    eviction_policy=eviction_policy,
1316                    bucket_priority=bucket_priority,
1317                    lww=lww,
1318                    maxttl=maxttl
1319                ))
1320
1321        for task in bucket_tasks:
1322            task.result()
1323
1324    def create_standard_buckets(
1325            self, bucket_size, num_buckets=1, num_replicas=1,
1326            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1327            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False, maxttl=None):
1328        """Create standard buckets.
1329        @param bucket_size: size of the bucket.
1330        @param num_buckets: number of buckets to create.
1331        @param num_replicas: number of replicas (1-3).
1332        @param eviction_policy: valueOnly etc.
1333        @param bucket_priority: high/low etc.
1334        @param lww: conflict_resolution_type.
1335        """
1336        bucket_tasks = []
1337        for i in range(num_buckets):
1338            name = "standard_bucket_" + str(i + 1)
1339            standard_params = self._create_bucket_params(
1340                server=self.__master_node,
1341                size=bucket_size,
1342                replicas=num_replicas,
1343                eviction_policy=eviction_policy,
1344                bucket_priority=bucket_priority,
1345                lww=lww,
1346                maxttl=maxttl)
1347
1348            bucket_tasks.append(self.__clusterop.async_create_standard_bucket(name=name, port=STANDARD_BUCKET_PORT+i,
1349                                                                              bucket_params=standard_params))
1350
1351
1352            self.__buckets.append(
1353                Bucket(
1354                    name=name,
1355                    authType=None,
1356                    saslPassword=None,
1357                    num_replicas=num_replicas,
1358                    bucket_size=bucket_size,
1359                    port=STANDARD_BUCKET_PORT + i,
1360                    eviction_policy=eviction_policy,
1361                    bucket_priority=bucket_priority,
1362                    lww=lww,
1363                    maxttl=maxttl
1364                ))
1365
1366        for task in bucket_tasks:
1367            task.result()
1368
1369    def create_default_bucket(
1370            self, bucket_size, num_replicas=1,
1371            eviction_policy=EVICTION_POLICY.VALUE_ONLY,
1372            bucket_priority=BUCKET_PRIORITY.HIGH, lww=False,
1373            maxttl=None):
1374        """Create default bucket.
1375        @param bucket_size: size of the bucket.
1376        @param num_replicas: number of replicas (1-3).
1377        @param eviction_policy: valueOnly etc.
1378        @param bucket_priority: high/low etc.
1379        @param lww: conflict_resolution_type.
1380        """
1381        bucket_params=self._create_bucket_params(
1382            server=self.__master_node,
1383            size=bucket_size,
1384            replicas=num_replicas,
1385            eviction_policy=eviction_policy,
1386            bucket_priority=bucket_priority,
1387            lww=lww,
1388            maxttl=maxttl)
1389
1390        self.__clusterop.create_default_bucket(bucket_params)
1391        self.__buckets.append(
1392            Bucket(
1393                name=BUCKET_NAME.DEFAULT,
1394                authType="sasl",
1395                saslPassword="",
1396                num_replicas=num_replicas,
1397                bucket_size=bucket_size,
1398                eviction_policy=eviction_policy,
1399                bucket_priority=bucket_priority,
1400                lww=lww,
1401                maxttl=maxttl
1402            ))
1403
    def get_buckets(self):
        """Return the list of buckets tracked for this cluster."""
        return self.__buckets
1406
    def add_bucket(self, bucket='',
                   ramQuotaMB=1,
                   authType='none',
                   saslPassword='',
                   replicaNumber=1,
                   proxyPort=11211,
                   bucketType='membase',
                   evictionPolicy='valueOnly'):
        """Register a bucket in local bookkeeping only.

        Unlike the create_* helpers, this does not create anything on the
        server; it just appends a Bucket record to the tracked list.
        @param bucket: bucket name.
        @param ramQuotaMB: bucket size in MB.
        @param authType: authentication type, e.g. 'none'/'sasl'.
        @param saslPassword: sasl password when authType is 'sasl'.
        @param replicaNumber: number of replicas.
        @param proxyPort: bucket proxy port.
        @param bucketType: bucket type, e.g. 'membase'.
        @param evictionPolicy: eviction policy, e.g. 'valueOnly'.
        """
        self.__buckets.append(Bucket(bucket_size=ramQuotaMB, name=bucket, authType=authType,
                                     saslPassword=saslPassword, num_replicas=replicaNumber,
                                     port=proxyPort, type=bucketType, eviction_policy=evictionPolicy))
1418
1419    def get_bucket_by_name(self, bucket_name):
1420        """Return the bucket with given name
1421        @param bucket_name: bucket name.
1422        @return: bucket object
1423        """
1424        for bucket in self.__buckets:
1425            if bucket.name == bucket_name:
1426                return bucket
1427
1428        raise Exception(
1429            "Bucket with name: %s not found on the cluster" %
1430            bucket_name)
1431
1432    def delete_bucket(self, bucket_name):
1433        """Delete bucket with given name
1434        @param bucket_name: bucket name (string) to delete
1435        """
1436        bucket_to_remove = self.get_bucket_by_name(bucket_name)
1437        self.__clusterop.bucket_delete(
1438            self.__master_node,
1439            bucket_to_remove.name)
1440        self.__buckets.remove(bucket_to_remove)
1441
1442    def delete_all_buckets(self):
1443        for bucket_to_remove in self.__buckets:
1444            self.__clusterop.bucket_delete(
1445                self.__master_node,
1446                bucket_to_remove.name)
1447            self.__buckets.remove(bucket_to_remove)
1448
1449    def flush_buckets(self, buckets=[]):
1450        buckets = buckets or self.__buckets
1451        tasks = []
1452        for bucket in buckets:
1453            tasks.append(self.__clusterop.async_bucket_flush(
1454                self.__master_node,
1455                bucket))
1456        [task.result() for task in tasks]
1457
1458    def async_load_bucket(self, bucket, num_items, value_size=512, exp=0,
1459                          kv_store=1, flag=0, only_store_hash=True,
1460                          batch_size=1000, pause_secs=1, timeout_secs=30):
1461        """Load data asynchronously on given bucket. Function don't wait for
1462        load data to finish, return immidiately.
1463        @param bucket: bucket where to load data.
1464        @param num_items: number of items to load
1465        @param value_size: size of the one item.
1466        @param exp: expiration value.
1467        @param kv_store: kv store index.
1468        @param flag:
1469        @param only_store_hash: True to store hash of item else False.
1470        @param batch_size: batch size for load data at a time.
1471        @param pause_secs: pause for next batch load.
1472        @param timeout_secs: timeout
1473        @return: task object
1474        """
1475        seed = "%s-key-" % self.__name
1476        self.__kv_gen[
1477            OPS.CREATE] = BlobGenerator(
1478            seed,
1479            seed,
1480            value_size,
1481            end=num_items)
1482
1483        gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1484        task = self.__clusterop.async_load_gen_docs(
1485            self.__master_node, bucket.name, gen, bucket.kvs[kv_store],
1486            OPS.CREATE, exp, flag, only_store_hash, batch_size, pause_secs,
1487            timeout_secs, compression=self.sdk_compression)
1488        return task
1489
1490    def load_bucket(self, bucket, num_items, value_size=512, exp=0,
1491                    kv_store=1, flag=0, only_store_hash=True,
1492                    batch_size=1000, pause_secs=1, timeout_secs=30):
1493        """Load data synchronously on given bucket. Function wait for
1494        load data to finish.
1495        @param bucket: bucket where to load data.
1496        @param num_items: number of items to load
1497        @param value_size: size of the one item.
1498        @param exp: expiration value.
1499        @param kv_store: kv store index.
1500        @param flag:
1501        @param only_store_hash: True to store hash of item else False.
1502        @param batch_size: batch size for load data at a time.
1503        @param pause_secs: pause for next batch load.
1504        @param timeout_secs: timeout
1505        """
1506        task = self.async_load_bucket(bucket, num_items, value_size, exp,
1507                                      kv_store, flag, only_store_hash,
1508                                      batch_size, pause_secs, timeout_secs)
1509        task.result()
1510
1511    def async_load_all_buckets(self, num_items, value_size=512, exp=0,
1512                               kv_store=1, flag=0, only_store_hash=True,
1513                               batch_size=1000, pause_secs=1, timeout_secs=30):
1514        """Load data asynchronously on all buckets of the cluster.
1515        Function don't wait for load data to finish, return immidiately.
1516        @param num_items: number of items to load
1517        @param value_size: size of the one item.
1518        @param exp: expiration value.
1519        @param kv_store: kv store index.
1520        @param flag:
1521        @param only_store_hash: True to store hash of item else False.
1522        @param batch_size: batch size for load data at a time.
1523        @param pause_secs: pause for next batch load.
1524        @param timeout_secs: timeout
1525        @return: task objects list
1526        """
1527        seed = "%s-key-" % self.__name
1528        self.__kv_gen[
1529            OPS.CREATE] = BlobGenerator(
1530            seed,
1531            seed,
1532            value_size,
1533            start=0,
1534            end=num_items)
1535        tasks = []
1536        for bucket in self.__buckets:
1537            gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1538            tasks.append(
1539                self.__clusterop.async_load_gen_docs(
1540                    self.__master_node, bucket.name, gen, bucket.kvs[kv_store],
1541                    OPS.CREATE, exp, flag, only_store_hash, batch_size,
1542                    pause_secs, timeout_secs, compression=self.sdk_compression)
1543            )
1544        return tasks
1545
1546    def load_all_buckets(self, num_items, value_size=512, exp=0,
1547                         kv_store=1, flag=0, only_store_hash=True,
1548                         batch_size=1000, pause_secs=1, timeout_secs=30):
1549        """Load data synchronously on all buckets. Function wait for
1550        load data to finish.
1551        @param num_items: number of items to load
1552        @param value_size: size of the one item.
1553        @param exp: expiration value.
1554        @param kv_store: kv store index.
1555        @param flag:
1556        @param only_store_hash: True to store hash of item else False.
1557        @param batch_size: batch size for load data at a time.
1558        @param pause_secs: pause for next batch load.
1559        @param timeout_secs: timeout
1560        """
1561        tasks = self.async_load_all_buckets(
1562            num_items, value_size, exp, kv_store, flag, only_store_hash,
1563            batch_size, pause_secs, timeout_secs)
1564        for task in tasks:
1565            task.result()
1566
1567    def load_all_buckets_from_generator(self, kv_gen, ops=OPS.CREATE, exp=0,
1568                                        kv_store=1, flag=0, only_store_hash=True,
1569                                        batch_size=1000, pause_secs=1, timeout_secs=30):
1570        """Load data synchronously on all buckets. Function wait for
1571        load data to finish.
1572        @param gen: BlobGenerator() object
1573        @param ops: OPS.CREATE/UPDATE/DELETE/APPEND.
1574        @param exp: expiration value.
1575        @param kv_store: kv store index.
1576        @param flag:
1577        @param only_store_hash: True to store hash of item else False.
1578        @param batch_size: batch size for load data at a time.
1579        @param pause_secs: pause for next batch load.
1580        @param timeout_secs: timeout
1581        """
1582        # TODO append generator values if op_type is already present
1583        if ops not in self.__kv_gen:
1584            self.__kv_gen[ops] = kv_gen
1585
1586        tasks = []
1587        for bucket in self.__buckets:
1588            kv_gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1589            tasks.append(
1590                self.__clusterop.async_load_gen_docs(
1591                    self.__master_node, bucket.name, kv_gen,
1592                    bucket.kvs[kv_store], ops, exp, flag,
1593                    only_store_hash, batch_size, pause_secs, timeout_secs,
1594                    compression=self.sdk_compression)
1595            )
1596        for task in tasks:
1597            task.result()
1598
1599    def async_load_all_buckets_from_generator(self, kv_gen, ops=OPS.CREATE, exp=0,
1600                                              kv_store=1, flag=0, only_store_hash=True,
1601                                              batch_size=1000, pause_secs=1, timeout_secs=30):
1602        """Load data asynchronously on all buckets. Function wait for
1603        load data to finish.
1604        @param gen: BlobGenerator() object
1605        @param ops: OPS.CREATE/UPDATE/DELETE/APPEND.
1606        @param exp: expiration value.
1607        @param kv_store: kv store index.
1608        @param flag:
1609        @param only_store_hash: True to store hash of item else False.
1610        @param batch_size: batch size for load data at a time.
1611        @param pause_secs: pause for next batch load.
1612        @param timeout_secs: timeout
1613        """
1614        # TODO append generator values if op_type is already present
1615        if ops not in self.__kv_gen:
1616            self.__kv_gen[ops] = kv_gen
1617
1618        tasks = []
1619        for bucket in self.__buckets:
1620            kv_gen = copy.deepcopy(self.__kv_gen[OPS.CREATE])
1621            tasks.append(
1622                self.__clusterop.async_load_gen_docs(
1623                    self.__master_node, bucket.name, kv_gen,
1624                    bucket.kvs[kv_store], ops, exp, flag,
1625                    only_store_hash, batch_size, pause_secs, timeout_secs, compression=self.sdk_compression)
1626            )
1627        return tasks
1628
    def load_all_buckets_till_dgm(self, active_resident_threshold, items=0,
                                  value_size=512, exp=0, kv_store=1, flag=0,
                                  only_store_hash=True, batch_size=1000,
                                  pause_secs=1, timeout_secs=30):
        """Load data synchronously on all buckets till dgm (Data greater than memory)
        for given active_resident_threshold
        @param active_resident_threshold: Dgm threshold.
        @param items: number of keys to pre-load before pushing towards DGM.
        @param value_size: size of the one item.
        @param exp: expiration value.
        @param kv_store: kv store index.
        @param flag:
        @param only_store_hash: True to store hash of item else False.
        @param batch_size: batch size for load data at a time.
        @param pause_secs: pause for next batch load.
        @param timeout_secs: timeout
        """
        items = int(items)
        self.__log.info("First loading \"items\" {0} number keys to handle "
                      "update/deletes in dgm cases".format(items))
        self.load_all_buckets(items)

        self.__log.info("Now loading extra keys to reach dgm limit")
        seed = "%s-key-" % self.__name
        # `end` tracks the highest key index loaded; it is reused after the
        # loop to record the overall CREATE generator range.
        end = 0
        for bucket in self.__buckets:
            # Current percentage of active vbucket items resident in memory.
            current_active_resident = StatsCommon.get_stats(
                [self.__master_node],
                bucket,
                '',
                'vb_active_perc_mem_resident')[self.__master_node]
            start = items
            end = start + batch_size * 10
            # Keep loading batches of (batch_size * 10) keys until the
            # resident ratio drops to the requested threshold.
            while int(current_active_resident) > active_resident_threshold:
                self.__log.info("loading %s keys ..." % (end-start))

                kv_gen = BlobGenerator(
                    seed,
                    seed,
                    value_size,
                    start=start,
                    end=end)

                tasks = []
                tasks.append(self.__clusterop.async_load_gen_docs(
                    self.__master_node, bucket.name, kv_gen, bucket.kvs[kv_store],
                    OPS.CREATE, exp, flag, only_store_hash, batch_size,
                    pause_secs, timeout_secs, compression=self.sdk_compression))

                for task in tasks:
                    task.result()
                # Advance the key window and re-sample the resident ratio.
                start = end
                end = start + batch_size * 10
                current_active_resident = StatsCommon.get_stats(
                    [self.__master_node],
                    bucket,
                    '',
                    'vb_active_perc_mem_resident')[self.__master_node]
                self.__log.info(
                    "Current resident ratio: %s, desired: %s bucket %s" % (
                        current_active_resident,
                        active_resident_threshold,
                        bucket.name))
            self.__log.info("Loaded a total of %s keys into bucket %s"
                            % (end,bucket.name))
        # Record the full key range loaded so later update/delete operations
        # can derive their ranges from it. NOTE(review): `end` reflects only
        # the last bucket processed — confirm all buckets reach DGM at the
        # same key count.
        self.__kv_gen[OPS.CREATE] = BlobGenerator(
                seed,
                seed,
                value_size,
                start = 0,
                end=end)
1699
    def async_update_delete(
            self, op_type, perc=30, expiration=0, kv_store=1):
        """Perform update/delete operation on all buckets. Function don't wait
        operation to finish.
        @param op_type: OPS.CREATE/OPS.UPDATE/OPS.DELETE
        @param perc: percentage of data to be deleted or created
        @param expiration: time for expire items
        @param kv_store: kv store index.
        @return: task object list
        """
        # The CREATE generator must exist: the update/delete key ranges are
        # derived from the originally loaded key range.
        raise_if(
            OPS.CREATE not in self.__kv_gen,
            XDCRException(
                "Data is not loaded in cluster.Load data before update/delete")
        )
        tasks = []
        for bucket in self.__buckets:
            if op_type == OPS.UPDATE:
                # Update the FIRST `perc` percent of the loaded key range.
                if isinstance(self.__kv_gen[OPS.CREATE], BlobGenerator):
                    self.__kv_gen[OPS.UPDATE] = BlobGenerator(
                        self.__kv_gen[OPS.CREATE].name,
                        self.__kv_gen[OPS.CREATE].seed,
                        self.__kv_gen[OPS.CREATE].value_size,
                        start=0,
                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
                elif isinstance(self.__kv_gen[OPS.CREATE], DocumentGenerator):
                    self.__kv_gen[OPS.UPDATE] = DocumentGenerator(
                        self.__kv_gen[OPS.CREATE].name,
                        self.__kv_gen[OPS.CREATE].template,
                        self.__kv_gen[OPS.CREATE].args,
                        start=0,
                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
                gen = copy.deepcopy(self.__kv_gen[OPS.UPDATE])
            elif op_type == OPS.DELETE:
                # Delete the LAST `perc` percent of the loaded key range.
                if isinstance(self.__kv_gen[OPS.CREATE], BlobGenerator):
                    self.__kv_gen[OPS.DELETE] = BlobGenerator(
                        self.__kv_gen[OPS.CREATE].name,
                        self.__kv_gen[OPS.CREATE].seed,
                        self.__kv_gen[OPS.CREATE].value_size,
                        start=int((self.__kv_gen[OPS.CREATE].end) * (float)(
                            100 - perc) / 100),
                        end=self.__kv_gen[OPS.CREATE].end)
                elif isinstance(self.__kv_gen[OPS.CREATE], DocumentGenerator):
                    # NOTE(review): this branch deletes the FIRST `perc`
                    # percent of keys, unlike the BlobGenerator branch above
                    # which deletes the last — confirm the asymmetry is
                    # intentional.
                    self.__kv_gen[OPS.DELETE] = DocumentGenerator(
                        self.__kv_gen[OPS.CREATE].name,
                        self.__kv_gen[OPS.CREATE].template,
                        self.__kv_gen[OPS.CREATE].args,
                        start=0,
                        end=int(self.__kv_gen[OPS.CREATE].end * (float)(perc) / 100))
                gen = copy.deepcopy(self.__kv_gen[OPS.DELETE])
            else:
                raise XDCRException("Unknown op_type passed: %s" % op_type)

            self.__log.info("At bucket '{0}' @ {1}: operation: {2}, key range {3} - {4}".
                       format(bucket.name, self.__name, op_type, gen.start, gen.end))
            tasks.append(
                self.__clusterop.async_load_gen_docs(
                    self.__master_node,
                    bucket.name,
                    gen,
                    bucket.kvs[kv_store],
                    op_type,
                    expiration,
                    batch_size=1000,
                    compression=self.sdk_compression)
            )
        return tasks
1766
1767    def update_delete_data(
1768            self, op_type, perc=30, expiration=0, wait_for_expiration=True):
1769        """Perform update/delete operation on all buckets. Function wait
1770        operation to finish.
1771        @param op_type: OPS.CREATE/OPS.UPDATE/OPS.DELETE
1772        @param perc: percentage of data to be deleted or created
1773        @param expiration: time for expire items
1774        @param wait_for_expiration: True if wait for expire of items after
1775        update else False
1776        """
1777        tasks = self.async_update_delete(op_type, perc, expiration)
1778
1779        [task.result() for task in tasks]
1780
1781        if wait_for_expiration and expiration:
1782            self.__log.info("Waiting for expiration of updated items")
1783            time.sleep(expiration)
1784
1785    def run_expiry_pager(self, val=10):
1786        """Run expiry pager process and set interval to 10 seconds
1787        and wait for 10 seconds.
1788        @param val: time in seconds.
1789        """
1790        for bucket in self.__buckets:
1791            ClusterOperationHelper.flushctl_set(
1792                self.__master_node,
1793                "exp_pager_stime",
1794                val,
1795                bucket)
1796            self.__log.info("wait for expiry pager to run on all these nodes")
1797        time.sleep(val)
1798
1799    def async_create_views(
1800            self, design_doc_name, views, bucket=BUCKET_NAME.DEFAULT):
1801        """Create given views on Cluster.
1802        @param design_doc_name: name of design doc.
1803        @param views: views objects.
1804        @param bucket: bucket name.
1805        @return: task list for CreateViewTask
1806        """
1807        tasks = []
1808        if len(views):
1809            for view in views:
1810                task = self.__clusterop.async_create_view(
1811                    self.__master_node,
1812                    design_doc_name,
1813                    view,
1814                    bucket)
1815                tasks.append(task)
1816        else:
1817            task = self.__clusterop.async_create_view(
1818                self.__master_node,
1819                design_doc_name,
1820                None,
1821                bucket)
1822            tasks.append(task)
1823        return tasks
1824
1825    def async_compact_view(
1826            self, design_doc_name, bucket=BUCKET_NAME.DEFAULT,
1827            with_rebalance=False):
1828        """Create given views on Cluster.
1829        @param design_doc_name: name of design doc.
1830        @param bucket: bucket name.
1831        @param with_rebalance: True if compaction is called during
1832        rebalance or False.
1833        @return: task object
1834        """
1835        task = self.__clusterop.async_compact_view(
1836            self.__master_node,
1837            design_doc_name,
1838            bucket,
1839            with_rebalance)
1840        return task
1841
1842    def disable_compaction(self, bucket=BUCKET_NAME.DEFAULT):
1843        """Disable view compaction
1844        @param bucket: bucket name.
1845        """
1846        new_config = {"viewFragmntThresholdPercentage": None,
1847                      "dbFragmentThresholdPercentage": None,
1848                      "dbFragmentThreshold": None,
1849                      "viewFragmntThreshold": None}
1850        self.__clusterop.modify_fragmentation_config(
1851            self.__master_node,
1852            new_config,
1853            bucket)
1854
1855    def async_monitor_view_fragmentation(
1856            self,
1857            design_doc_name,
1858            fragmentation_value,
1859            bucket=BUCKET_NAME.DEFAULT):
1860        """Monitor view fragmantation during compation.
1861        @param design_doc_name: name of design doc.
1862        @param fragmentation_value: fragmentation threshold to monitor.
1863        @param bucket: bucket name.
1864        """
1865        task = self.__clusterop.async_monitor_view_fragmentation(
1866            self.__master_node,
1867            design_doc_name,
1868            fragmentation_value,
1869            bucket)
1870        return task
1871
1872    def async_query_view(
1873            self, design_doc_name, view_name, query,
1874            expected_rows=None, bucket="default", retry_time=2):
1875        """Perform View Query for given view asynchronously.
1876        @param design_doc_name: design_doc name.
1877        @param view_name: view name
1878        @param query: query expression
1879        @param expected_rows: number of rows expected returned in query.
1880        @param bucket: bucket name.
1881        @param retry_time: retry to perform view query
1882        @return: task object of ViewQueryTask class
1883        """
1884        task = self.__clusterop.async_query_view(
1885            self.__master_node,
1886            design_doc_name,
1887            view_name,
1888            query,
1889            expected_rows,
1890            bucket=bucket,
1891            retry_time=retry_time)
1892        return task
1893
1894    def query_view(
1895            self, design_doc_name, view_name, query,
1896            expected_rows=None, bucket="default", retry_time=2, timeout=None):
1897        """Perform View Query for given view synchronously.
1898        @param design_doc_name: design_doc name.
1899        @param view_name: view name
1900        @param query: query expression
1901        @param expected_rows: number of rows expected returned in query.
1902        @param bucket: bucket name.
1903        @param retry_time: retry to perform view query
1904        @param timeout: None if wait for query result until returned
1905        else pass timeout value.
1906        """
1907
1908        task = self.__clusterop.async_query_view(
1909            self.__master_node,
1910            design_doc_name,
1911            view_name,
1912            query,
1913            expected_rows,
1914            bucket=bucket, retry_time=retry_time)
1915        task.result(timeout)
1916
1917    def __async_rebalance_out(self, master=False, num_nodes=1):
1918        """Rebalance-out nodes from Cluster
1919        @param master: True if rebalance-out master node only.
1920        @param num_nodes: number of nodes to rebalance-out from cluster.
1921        """
1922        raise_if(
1923            len(self.__nodes) <= num_nodes,
1924            XDCRException(
1925                "Cluster needs:{0} nodes for rebalance-out, current: {1}".
1926                format((num_nodes + 1), len(self.__nodes)))
1927        )
1928        if master:
1929            to_remove_node = [self.__master_node]
1930        else:
1931            to_remove_node = self.__nodes[-num_nodes:]
1932        self.__log.info(
1933            "Starting rebalance-out nodes:{0} at {1} cluster {2}".format(
1934                to_remove_node, self.__name, self.__master_node.ip))
1935        task = self.__clusterop.async_rebalance(
1936            self.__nodes,
1937            [],
1938            to_remove_node)
1939
1940        [self.__nodes.remove(node) for node in to_remove_node]
1941
1942        if master:
1943            self.__master_node = self.__nodes[0]
1944
1945        return task
1946
    def async_rebalance_in_out(self, remove_nodes, num_add_nodes=1, master=False):
        """Rebalance-in-out nodes from Cluster
        @param remove_nodes: a list of nodes to be rebalanced-out
        @param num_add_nodes: number of nodes to add back to cluster.
        @param master: True if rebalance-out master node only.
        @return: rebalance task object
        """
        raise_if(len(FloatingServers._serverlist) < num_add_nodes,
            XDCRException(
                "Cluster needs {0} nodes for rebalance-in, current: {1}".
                format((num_add_nodes),
                       len(FloatingServers._serverlist)))
        )

        # Take the nodes to add from the shared pool of free servers.
        add_nodes = []
        for _ in range(num_add_nodes):
            add_nodes.append(FloatingServers._serverlist.pop())

        self.__log.info(
            "Starting rebalance-out: {0}, rebalance-in: {1} at {2} cluster {3}".
            format(
                remove_nodes,
                add_nodes,
                self.__name,
                self.__master_node.ip))
        task = self.__clusterop.async_rebalance(
            self.__nodes,
            add_nodes,
            remove_nodes)

        # No explicit out-nodes given: fall back to the nodes recorded by a
        # previous failover. NOTE(review): in this case the rebalance above
        # was started with an empty removal list — confirm this is the
        # intended failover add-back flow.
        if not remove_nodes:
            remove_nodes = self.__fail_over_nodes

        # Drop the removed nodes from our bookkeeping by ip match.
        for node in remove_nodes:
            for server in self.__nodes:
                if node.ip == server.ip:
                    self.__nodes.remove(server)
        self.__nodes.extend(add_nodes)

        # If the master was rebalanced out, promote the first remaining node.
        if master:
            self.__master_node = self.__nodes[0]
        return task
1988
    def async_rebalance_out_master(self):
        """Start rebalancing the master node out; returns the task."""
        return self.__async_rebalance_out(master=True)
1991
    def async_rebalance_out(self, num_nodes=1):
        """Start rebalancing `num_nodes` non-master nodes out; returns the task."""
        return self.__async_rebalance_out(num_nodes=num_nodes)
1994
1995    def rebalance_out_master(self):
1996        task = self.__async_rebalance_out(master=True)
1997        task.result()
1998
1999    def rebalance_out(self, num_nodes=1):
2000        task = self.__async_rebalance_out(num_nodes=num_nodes)
2001        task.result()
2002
2003    def async_rebalance_in(self, num_nodes=1):
2004        """Rebalance-in nodes into Cluster asynchronously
2005        @param num_nodes: number of nodes to rebalance-in to cluster.
2006        """
2007        raise_if(
2008            len(FloatingServers._serverlist) < num_nodes,
2009            XDCRException(
2010                "Number of free nodes: {0} is not preset to add {1} nodes.".
2011                format(len(FloatingServers._serverlist), num_nodes))
2012        )
2013        to_add_node = []
2014        for _ in range(num_nodes):
2015            to_add_node.append(FloatingServers._serverlist.pop())
2016        self.__log.info(
2017            "Starting rebalance-in nodes:{0} at {1} cluster {2}".format(
2018                to_add_node, self.__name, self.__master_node.ip))
2019        task = self.__clusterop.async_rebalance(self.__nodes, to_add_node, [])
2020        self.__nodes.extend(to_add_node)
2021        return task
2022
2023    def rebalance_in(self, num_nodes=1):
2024        """Rebalance-in nodes
2025        @param num_nodes: number of nodes to add to cluster.
2026        """
2027        task = self.async_rebalance_in(num_nodes)
2028        task.result()
2029
2030    def __async_swap_rebalance(self, master=False):
2031        """Swap-rebalance nodes on Cluster
2032        @param master: True if swap-rebalance master node else False.
2033        """
2034        if master:
2035            to_remove_node = [self.__master_node]
2036        else:
2037            to_remove_node = [self.__nodes[-1]]
2038
2039        to_add_node = [FloatingServers._serverlist.pop()]
2040
2041        self.__log.info(
2042            "Starting swap-rebalance [remove_node:{0}] -> [add_node:{1}] at {2} cluster {3}"
2043            .format(to_remove_node[0].ip, to_add_node[0].ip, self.__name,
2044                    self.__master_node.ip))
2045        task = self.__clusterop.async_rebalance(
2046            self.__nodes,
2047            to_add_node,
2048            to_remove_node)
2049
2050        [self.__nodes.remove(node) for node in to_remove_node]
2051        self.__nodes.extend(to_add_node)
2052
2053        if master:
2054            self.__master_node = self.__nodes[0]
2055
2056        return task
2057
    def async_swap_rebalance_master(self):
        """Start a swap-rebalance of the master node; returns the task."""
        return self.__async_swap_rebalance(master=True)
2060
    def async_swap_rebalance(self):
        """Start a swap-rebalance of a non-master node; returns the task."""
        return self.__async_swap_rebalance()
2063
2064    def swap_rebalance_master(self):
2065        """Swap rebalance master node.
2066        """
2067        task = self.__async_swap_rebalance(master=True)
2068        task.result()
2069
2070    def swap_rebalance(self):
2071        """Swap rebalance non-master node
2072        """
2073        task = self.__async_swap_rebalance()
2074        task.result()
2075
2076    def __async_failover(self, master=False, num_nodes=1, graceful=False):
2077        """Failover nodes from Cluster
2078        @param master: True if failover master node only.
2079        @param num_nodes: number of nodes to rebalance-out from cluster.
2080        @param graceful: True if graceful failover else False.
2081        """
2082        raise_if(
2083            len(self.__nodes) <= 1,
2084            XDCRException(
2085                "More than 1 node required in cluster to perform failover")
2086        )
2087        if master:
2088            self.__fail_over_nodes = [self.__master_node]
2089        else:
2090            self.__fail_over_nodes = self.__nodes[-num_nodes:]
2091
2092        self.__log.info(
2093            "Starting failover for nodes:{0} at {1} cluster {2}".format(
2094                self.__fail_over_nodes, self.__name, self.__master_node.ip))
2095        task = self.__clusterop.async_failover(
2096            self.__nodes,
2097            self.__fail_over_nodes,
2098            graceful)
2099
2100        return task
2101
    def async_failover(self, num_nodes=1, graceful=False):
        """Start failing over `num_nodes` non-master nodes; returns the task."""
        return self.__async_failover(num_nodes=num_nodes, graceful=graceful)
2104
    def failover(self, num_nodes=1, graceful=False):
        """Fail over `num_nodes` non-master nodes and wait for completion."""
        self.__async_failover(num_nodes=num_nodes, graceful=graceful).result()
2107
2108    def failover_and_rebalance_master(self, graceful=False, rebalance=True,master=True):
2109        """Failover master node
2110        @param graceful: True if graceful failover else False
2111        @param rebalance: True if do rebalance operation after failover.
2112        """
2113        task = self.__async_failover(master, graceful=graceful)
2114        task.result()
2115        if graceful:
2116            # wait for replica update
2117            time.sleep(60)
2118            # use rebalance stats to monitor failover
2119            RestConnection(self.__master_node).monitorRebalance()
2120        if rebalance:
2121            self.rebalance_failover_nodes()
2122        self.__master_node = self.__nodes[0]
2123
2124    def failover_and_rebalance_nodes(self, num_nodes=1, graceful=False,
2125                                     rebalance=True):
2126        """ Failover non-master nodes
2127        @param num_nodes: number of nodes to failover.
2128        @param graceful: True if graceful failover else False
2129        @param rebalance: True if do rebalance operation after failover.
2130        """
2131        task = self.__async_failover(
2132            master=False,
2133            num_nodes=num_nodes,
2134            graceful=graceful)
2135        task.result()
2136        if graceful:
2137            time.sleep(60)
2138            # use rebalance stats to monitor failover
2139            RestConnection(self.__master_node).monitorRebalance()
2140        if rebalance:
2141            self.rebalance_failover_nodes()
2142
2143    def rebalance_failover_nodes(self):
2144        self.__clusterop.rebalance(self.__nodes, [], self.__fail_over_nodes)
2145        [self.__nodes.remove(node) for node in self.__fail_over_nodes]
2146        self.__fail_over_nodes = []
2147
2148    def add_back_node(self, recovery_type=None):
2149        """add-back failed-over node to the cluster.
2150            @param recovery_type: delta/full
2151        """
2152        raise_if(
2153            len(self.__fail_over_nodes) < 1,
2154            XDCRException("No failover nodes available to add_back")
2155        )
2156        rest = RestConnection(self.__master_node)
2157        server_nodes = rest.node_statuses()
2158        for failover_node in self.__fail_over_nodes:
2159            for server_node in server_nodes:
2160                if server_node.ip == failover_node.ip:
2161                    rest.add_back_node(server_node.id)
2162                    if recovery_type:
2163                        rest.set_recovery_type(
2164                            otpNode=server_node.id,
2165                            recoveryType=recovery_type)
2166        for node in self.__fail_over_nodes:
2167            if node not in self.__nodes:
2168                self.__nodes.append(node)
2169        self.__clusterop.rebalance(self.__nodes, [], [])
2170        self.__fail_over_nodes = []
2171
2172    def warmup_node(self, master=False):
2173        """Warmup node on cluster
2174        @param master: True if warmup master-node else False.
2175        """
2176        from random import randrange
2177
2178        if master:
2179            warmup_node = self.__master_node
2180
2181        else:
2182            warmup_node = self.__nodes[
2183                randrange(
2184                    1, len(
2185                        self.__nodes))]
2186        NodeHelper.do_a_warm_up(warmup_node)
2187        return warmup_node
2188
2189    def reboot_one_node(self, test_case, master=False):
2190        from random import randrange
2191
2192        if master:
2193            reboot_node = self.__master_node
2194
2195        else:
2196            reboot_node = self.__nodes[
2197                randrange(
2198                    1, len(
2199                        self.__nodes))]
2200        NodeHelper.reboot_server(reboot_node, test_case)
2201        return reboot_node
2202
2203    def restart_couchbase_on_all_nodes(self, bucket_names=["default"]):
2204        for node in self.__nodes:
2205            NodeHelper.do_a_warm_up(node)
2206
2207        NodeHelper.wait_warmup_completed(self.__nodes, bucket_names)
2208
    def set_xdcr_param(self, param, value):
        """Set Replication parameter on couchbase server:
        @param param: XDCR parameter name.
        @param value: Value of parameter.
        """
        # Apply the setting to every replication of every remote cluster
        # reference, then validate the generated audit event.
        for remote_ref in self.get_remote_clusters():
            for repl in remote_ref.get_replications():
                src_bucket = repl.get_src_bucket()
                dst_bucket = repl.get_dest_bucket()
                RestConnection(self.__master_node).set_xdcr_param(src_bucket.name, dst_bucket.name, param, value)

                # Audit-log fields expected for the settings change.
                # NOTE(review): remote_cluster_name is hard-coded to
                # "remote_cluster_C1-C2" — confirm this matches topologies
                # other than the default C1->C2 setup.
                expected_results = {
                    "real_userid:source": "ns_server",
                    "real_userid:user": self.__master_node.rest_username,
                    "local_cluster_name": "%s:%s" % (self.__master_node.ip, self.__master_node.port),
                    "updated_settings:" + param: value,
                    "source_bucket_name": repl.get_src_bucket().name,
                    "remote_cluster_name": "remote_cluster_C1-C2",
                    "target_bucket_name": repl.get_dest_bucket().name
                }

                # In case of ns_server xdcr, no events generate for it.
                ValidateAuditEvent.validate_audit_event(
                    GO_XDCR_AUDIT_EVENT_ID.IND_SETT,
                    self.get_master_node(),
                    expected_results)
2235
2236    def get_xdcr_stat(self, bucket_name, stat):
2237        """ Return given XDCR stat for given bucket.
2238        @param bucket_name: name of bucket.
2239        @param stat: stat name
2240        @return: value of stat
2241        """
2242        return int(RestConnection(self.__master_node).fetch_bucket_xdcr_stats(
2243            bucket_name)['op']['samples'][stat][-1])
2244
2245    def wait_for_xdcr_stat(self, bucket, stat, comparison, value):
2246        """Wait for given stat for a bucket to given condition.
2247        @param bucket: bucket name
2248        @param stat: stat name
2249        @param comparison: comparison operatior e.g. "==", "<"
2250        @param value: value to compare.
2251        """
2252        task = self.__clusterop.async_wait_for_xdcr_stat(
2253            self.__nodes,
2254            bucket,
2255            '',
2256            stat,
2257            comparison,
2258            value)
2259        task.result()
2260
2261    def add_remote_cluster(self, dest_cluster, name, encryption=False, replicator_target_role=False):
2262        """Create remote cluster reference or add remote cluster for xdcr.
2263        @param dest_cluster: Destination cb cluster object.
2264        @param name: name of remote cluster reference
2265        @param encryption: True if encryption for xdcr else False
2266        """
2267        remote_cluster = XDCRRemoteClusterRef(
2268            self,
2269            dest_cluster,
2270            name,
2271            encryption,
2272            replicator_target_role
2273        )
2274        remote_cluster.add()
2275        self.__remote_clusters.append(remote_cluster)
2276
2277    # add params to what to modify
2278    def modify_remote_cluster(self, remote_cluster_name, require_encryption):
2279        """Modify Remote Cluster Reference Settings for given name.
2280        @param remote_cluster_name: name of the remote cluster to change.
2281        @param require_encryption: Value of encryption if need to change True/False.
2282        """
2283        for remote_cluster in self.__remote_clusters:
2284            if remote_cluster_name == remote_cluster.get_name():
2285                remote_cluster. modify(require_encryption)
2286                break
2287        else:
2288            raise XDCRException(
2289                "No such remote cluster found with name: {0}".format(
2290                    remote_cluster_name))
2291
2292    def wait_for_flusher_empty(self, timeout=60):
2293        """Wait for disk queue to completely flush.
2294        """
2295        tasks = []
2296        for node in self.__nodes:
2297            for bucket in self.__buckets:
2298                tasks.append(
2299                    self.__clusterop.async_wait_for_stats(
2300                        [node],
2301                        bucket,
2302                        '',
2303                        'ep_queue_size',
2304                        '==',
2305                        0))
2306        for task in tasks:
2307            task.result(timeout)
2308
    def verify_items_count(self, timeout=600):
        """Wait for actual bucket items count reach to the count on bucket kv_store.
        Phase 1 polls the active item count per bucket, phase 2 the replica
        count; each phase gets its own `timeout` budget.
        @param timeout: seconds allowed for each phase.
        @return: (active_key_count_passed, replica_key_count_passed) booleans.
        """
        active_key_count_passed = True
        replica_key_count_passed = True
        curr_time = time.time()
        end_time = curr_time + timeout

        # Check active, curr key count
        rest = RestConnection(self.__master_node)
        buckets = copy.copy(self.get_buckets())

        for bucket in buckets:
            # Expected count: total keys across all kv_store partitions.
            items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
            while True:
                try:
                    active_keys = int(rest.get_active_key_count(bucket.name))
                except Exception as e:
                    # Fall back to summing per-node curr_items when the
                    # aggregated stat endpoint fails.
                    self.__log.error(e)
                    bucket_info = rest.get_bucket_json(bucket.name)
                    nodes = bucket_info["nodes"]
                    active_keys = 0
                    for node in nodes:
                        active_keys += node["interestingStats"]["curr_items"]
                if active_keys != items:
                        self.__log.warn("Not Ready: vb_active_curr_items %s == "
                                "%s expected on %s, %s bucket"
                                 % (active_keys, items, self.__name, bucket.name))
                        time.sleep(5)
                        if time.time() > end_time:
                            self.__log.error(
                            "ERROR: Timed-out waiting for active item count to match")
                            active_key_count_passed = False
                            break
                        continue
                else:
                    self.__log.info("Saw: vb_active_curr_items %s == "
                            "%s expected on %s, %s bucket"
                            % (active_keys, items, self.__name, bucket.name))
                    break
        # check replica count -- the deadline is reset for this second phase
        curr_time = time.time()
        end_time = curr_time + timeout
        buckets = copy.copy(self.get_buckets())

        for bucket in buckets:
            if len(self.__nodes) > 1:
                # Each key is expected numReplicas times across the cluster.
                items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
                items = items * bucket.numReplicas
            else:
                # A single-node cluster holds no replica vbuckets.
                items = 0
            while True:
                try:
                    replica_keys = int(rest.get_replica_key_count(bucket.name))
                except Exception as e:
                    self.__log.error(e)
                    bucket_info = rest.get_bucket_json(bucket.name)
                    nodes = bucket_info["nodes"]
                    replica_keys = 0
                    for node in nodes:
                        replica_keys += node["interestingStats"]["vb_replica_curr_items"]
                if replica_keys != items:
                    self.__log.warn("Not Ready: vb_replica_curr_items %s == "
                            "%s expected on %s, %s bucket"
                             % (replica_keys, items ,self.__name, bucket.name))
                    time.sleep(3)
                    if time.time() > end_time:
                        self.__log.error(
                        "ERROR: Timed-out waiting for replica item count to match")
                        replica_key_count_passed = False
                        # Dump a vbucket-level active/replica diff for debugging.
                        self.run_cbvdiff()
                        break
                    continue
                else:
                    self.__log.info("Saw: vb_replica_curr_items %s == "
                            "%s expected on %s, %s bucket"
                            % (replica_keys, items, self.__name, bucket.name))
                    break
        return active_key_count_passed, replica_key_count_passed
2388
2389    def run_cbvdiff(self):
2390        """ Run cbvdiff, a tool that compares active and replica vbucket keys
2391        Eg. ./cbvdiff -b standardbucket  172.23.105.44:11210,172.23.105.45:11210
2392             VBucket 232: active count 59476 != 59477 replica count
2393        """
2394        node_str = ""
2395        for node in self.__nodes:
2396            if node_str:
2397                node_str += ','
2398            node_str += node.ip + ':11210'
2399        ssh_conn = RemoteMachineShellConnection(self.__master_node)
2400        for bucket in self.__buckets:
2401            self.__log.info(
2402                "Executing cbvdiff for bucket {0}".format(
2403                    bucket.name))
2404            if bucket.saslPassword:
2405                ssh_conn.execute_cbvdiff(bucket, node_str, bucket.saslPassword)
2406            else:
2407                ssh_conn.execute_cbvdiff(bucket, node_str)
2408        ssh_conn.disconnect()
2409
2410    def verify_data(self, kv_store=1, timeout=None,
2411                    max_verify=None, only_store_hash=True, batch_size=1000, skip=[]):
2412        """Verify data of all the buckets. Function read data from cb server and
2413        compare it with bucket's kv_store.
2414        @param kv_store: Index of kv_store where item values are stored on
2415        bucket.
2416        @param timeout: None if wait indefinitely else give timeout value.
2417        @param max_verify: number of items to verify. None if verify all items
2418        on bucket.
2419        @param only_store_hash: True if verify hash of items else False.
2420        @param batch_size: batch size to read items from server.
2421        """
2422        self.__data_verified = False
2423        tasks = []
2424        for bucket in self.__buckets:
2425            if bucket.name not in skip:
2426                tasks.append(
2427                        self.__clusterop.async_verify_data(
2428                                self.__master_node,
2429                                bucket,
2430                                bucket.kvs[kv_store],
2431                                max_verify,
2432                                only_store_hash,
2433                                batch_size,
2434                                timeout_sec=60,
2435                                compression=self.sdk_compression))
2436        for task in tasks:
2437            task.result(timeout)
2438
2439        self.__data_verified = True
2440
2441    def verify_meta_data(self, kv_store=1, timeout=None, skip=[]):
2442        """Verify if metadata for bucket matches on src and dest clusters
2443        @param kv_store: Index of kv_store where item values are stored on
2444        bucket.
2445        @param timeout: None if wait indefinitely else give timeout value.
2446        """
2447        self.__meta_data_verified = False
2448        tasks = []
2449        for bucket in self.__buckets:
2450            if bucket.name not in skip:
2451                data_map = {}
2452                gather_task = self.__clusterop.async_get_meta_data(self.__master_node, bucket, bucket.kvs[kv_store],
2453                                                        compression=self.sdk_compression)
2454                gather_task.result()
2455                data_map[bucket.name] = gather_task.get_meta_data_store()
2456                tasks.append(
2457                    self.__clusterop.async_verify_meta_data(
2458                        self.__master_node,
2459                        bucket,
2460                        bucket.kvs[kv_store],
2461                        data_map[bucket.name]
2462                        ))
2463        for task in tasks:
2464            task.result(timeout)
2465        self.__meta_data_verified = True
2466
2467    def wait_for_dcp_queue_drain(self, timeout=180):
2468        """Wait for ep_dcp_xdcr_items_remaining to reach 0.
2469        @return: True if reached 0 else False.
2470        """
2471        self.__log.info(
2472            "Waiting for dcp queue to drain on cluster node: %s" %
2473            self.__master_node.ip)
2474        curr_time = time.time()
2475        end_time = curr_time + timeout
2476        rest = RestConnection(self.__master_node)
2477        buckets = copy.copy(self.get_buckets())
2478        for bucket in buckets:
2479            try:
2480                mutations = int(rest.get_dcp_queue_size(bucket.name))
2481                self.__log.info(
2482                    "Current dcp queue size on %s for %s is %s" %
2483                    (self.__name, bucket.name, mutations))
2484                if mutations == 0:
2485                    buckets.remove(bucket)
2486                else:
2487                    time.sleep(5)
2488                    end_time = end_time - 5
2489            except Exception as e:
2490                self.__log.error(e)
2491            if curr_time > end_time:
2492                self.__log.error(
2493                "Timeout occurs while waiting for dcp queue to drain")
2494                return False
2495        return True
2496
    def wait_for_outbound_mutations(self, timeout=180):
        """Wait for Outbound mutations to reach 0 on every bucket.
        @param timeout: overall budget in seconds.
        @return: True if mutations reached to 0 else False.
        """
        self.__log.info(
            "Waiting for Outbound mutation to be zero on cluster node: %s" %
            self.__master_node.ip)
        curr_time = time.time()
        end_time = curr_time + timeout
        rest = RestConnection(self.__master_node)
        while curr_time < end_time:
            # Count how many buckets have fully drained this round.
            found = 0
            for bucket in self.__buckets:
                try:
                    mutations = int(rest.get_xdc_queue_size(bucket.name))
                except KeyError:
                    self.__log.error("Stat \"replication_changes_left\" not found")
                    return False
                self.__log.info(
                    "Current Outbound mutations on cluster node: %s for bucket %s is %s" %
                    (self.__name, bucket.name, mutations))
                if mutations == 0:
                    found = found + 1
            if found == len(self.__buckets):
                break
            time.sleep(5)
            # curr_time is deliberately never refreshed; shrinking end_time
            # by the sleep interval makes the while condition expire after
            # roughly `timeout` seconds of waiting.
            end_time = end_time - 5
        else:
            # while/else: runs only when the loop condition expired (timeout)
            # without reaching the break above.
            # MB-9707: Updating this code from fail to warning to avoid test
            # to abort, as per this
            # bug, this particular stat i.e. replication_changes_left is buggy.
            self.__log.error(
                "Timeout occurs while waiting for mutations to be replicated")
            return False
        return True
2532
2533    def pause_all_replications(self, verify=False):
2534        for remote_cluster_ref in self.__remote_clusters:
2535            remote_cluster_ref.pause_all_replications(verify)
2536
2537    def pause_all_replications_by_id(self, verify=False):
2538        for remote_cluster_ref in self.__remote_clusters:
2539            remote_cluster_ref.pause_all_replications_by_id(verify)
2540
2541    def resume_all_replications(self, verify=False):
2542        for remote_cluster_ref in self.__remote_clusters:
2543            remote_cluster_ref.resume_all_replications(verify)
2544
2545    def resume_all_replications_by_id(self, verify=False):
2546        for remote_cluster_ref in self.__remote_clusters:
2547            remote_cluster_ref.resume_all_replications_by_id(verify)
2548
    def enable_time_sync(self, enable):
        """
        @param enable: True if time_sync needs to enabled else False
        NOTE(review): currently a stub -- no REST call is made, so the
        flag is silently ignored.
        """
        # TODO call rest api
        pass
2555
2556    def set_wall_clock_time(self, date_str):
2557        for node in self.__nodes:
2558            NodeHelper.set_wall_clock_time(node, date_str)
2559
2560
class Utility:

    @staticmethod
    def make_default_views(prefix, num_views, is_dev_ddoc=False):
        """Create default views for testing.
        @param prefix: prefix for view name; None selects "default_view"
        @param num_views: number of views to create
        @param is_dev_ddoc: True if Development View else False
        """
        map_func = "function (doc) {\n  emit(doc._id, doc);\n}"
        base_name = "default_view" if prefix is None else prefix
        views = []
        for idx in xrange(num_views):
            views.append(View(base_name + str(idx), map_func,
                              None, is_dev_ddoc))
        return views

    @staticmethod
    def get_rc_name(src_cluster_name, dest_cluster_name):
        """Return the canonical remote-cluster-reference name for a pair."""
        return "remote_cluster_{0}-{1}".format(src_cluster_name,
                                               dest_cluster_name)
2578
2579
2580class XDCRNewBaseTest(unittest.TestCase):
2581
    def setUp(self):
        """Build the per-test XDCR environment: read test input, configure
        logging, create CouchbaseCluster objects, then users and buckets.
        """
        unittest.TestCase.setUp(self)
        self._input = TestInputSingleton.input
        self.log = logger.Logger.get_logger()
        self.__init_logger()
        self.__cb_clusters = []
        self.__cluster_op = Cluster()
        # __init_parameters must run before __setup_for_test: the latter
        # reads chain_length, replicator_role, etc. populated here.
        self.__init_parameters()
        self.log.info(
            "==== XDCRNewbasetests setup is started for test #{0} {1} ===="
            .format(self.__case_number, self._testMethodName))

        self.__setup_for_test()

        self.log.info(
            "==== XDCRNewbasetests setup is finished for test #{0} {1} ===="
            .format(self.__case_number, self._testMethodName))
2599
2600    def __is_test_failed(self):
2601        return (hasattr(self, '_resultForDoCleanups')
2602                and len(self._resultForDoCleanups.failures
2603                        or self._resultForDoCleanups.errors)) \
2604            or (hasattr(self, '_exc_info')
2605                and self._exc_info()[1] is not None)
2606
2607    def __is_cleanup_needed(self):
2608        return self.__is_test_failed() and (str(self.__class__).find(
2609            'upgradeXDCR') != -1 or self._input.param("stop-on-failure", False)
2610        )
2611
2612    def __is_cluster_run(self):
2613        return len(set([server.ip for server in self._input.servers])) == 1
2614
    def tearDown(self):
        """Clusters cleanup.
        Order: negative-test bookkeeping, optional cbcollect on failure,
        rbac user removal per cluster, then full cluster cleanup unless
        skipped; the Cluster executor is always shut down.
        """
        if self._input.param("negative_test", False):
            if hasattr(self, '_resultForDoCleanups') \
                and len(self._resultForDoCleanups.failures
                        or self._resultForDoCleanups.errors):
                # Errors were expected for a negative test: clear them so
                # the framework reports a pass.
                self._resultForDoCleanups.failures = []
                self._resultForDoCleanups.errors = []
                self.log.info("This is marked as a negative test and contains "
                              "errors as expected, hence not failing it")
            else:
                raise XDCRException("Negative test passed!")

        # collect logs before tearing down clusters
        if self._input.param("get-cbcollect-info", False) and \
                self.__is_test_failed():
            for server in self._input.servers:
                self.log.info("Collecting logs @ {0}".format(server.ip))
                NodeHelper.collect_logs(server, self.__is_cluster_run())

        for i in range(1, len(self.__cb_clusters) + 1):
            # Remove rbac users in teardown
            role_del = ['cbadminbucket']
            RbacBase().remove_user_role(role_del, RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()))
            if self._replicator_role:
                role_del = ['replicator_user']
                RbacBase().remove_user_role(role_del,
                                            RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()))

        try:
            if self.__is_cleanup_needed() or self._input.param("skip_cleanup", False):
                self.log.warn("CLEANUP WAS SKIPPED")
                return
            self.log.info(
                "====  XDCRNewbasetests cleanup is started for test #{0} {1} ===="
                .format(self.__case_number, self._testMethodName))
            for cb_cluster in self.__cb_clusters:
                cb_cluster.cleanup_cluster(self,from_rest=True)
            self.log.info(
                "====  XDCRNewbasetests cleanup is finished for test #{0} {1} ==="
                .format(self.__case_number, self._testMethodName))
        finally:
            # Always stop the task executor, even when cleanup is skipped
            # or raised.
            self.__cluster_op.shutdown(force=True)
            unittest.TestCase.tearDown(self)
2659
2660    def __init_logger(self):
2661        if self._input.param("log_level", None):
2662            self.log.setLevel(level=0)
2663            for hd in self.log.handlers:
2664                if str(hd.__class__).find('FileHandler') != -1:
2665                    hd.setLevel(level=logging.DEBUG)
2666                else:
2667                    hd.setLevel(
2668                        level=getattr(
2669                            logging,
2670                            self._input.param(
2671                                "log_level",
2672                                None)))
2673
    def __setup_for_test(self):
        """Create CouchbaseCluster objects C1..Cn from the test input, clean
        and initialize them, create rbac users, record floating servers and
        create buckets (unless the suite manages buckets itself).
        """
        use_hostanames = self._input.param("use_hostnames", False)
        sdk_compression = self._input.param("sdk_compression", True)
        counter = 1
        for _, nodes in self._input.clusters.iteritems():
            cluster_nodes = copy.deepcopy(nodes)
            # Only build as many clusters as the configured chain length.
            if len(self.__cb_clusters) == int(self.__chain_length):
                break
            self.__cb_clusters.append(
                CouchbaseCluster(
                    "C%s" % counter, cluster_nodes,
                    self.log, use_hostanames, sdk_compression=sdk_compression))
            counter += 1

        self.__cleanup_previous()
        self.__init_clusters()

        for i in range(1, len(self.__cb_clusters) + 1):
            # Add built-in user to C1
            testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'password': 'password'}]
            RbacBase().create_user_source(testuser, 'builtin',
                                          self.get_cb_cluster_by_name('C' + str(i)).get_master_node())


            # Assign user to role
            role_list = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'roles': 'admin'}]
            RbacBase().add_user_role(role_list,
                                     RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()),
                                     'builtin')


        self.__set_free_servers()
        # upgrade and lww suites create their own buckets.
        if str(self.__class__).find('upgradeXDCR') == -1 and str(self.__class__).find('lww') == -1:
            self.__create_buckets()

        if self._replicator_role:
            for i in range(1, len(self.__cb_clusters) + 1):
                testuser_replicator = [{'id': 'replicator_user', 'name': 'replicator_user', 'password': 'password'}]
                RbacBase().create_user_source(testuser_replicator, 'builtin',
                                              self.get_cb_cluster_by_name('C' + str(i)).get_master_node())

                if self._replicator_role and self._replicator_all_buckets:
                    role = 'replication_target[*]'
                else:
                    role = 'replication_target[default]'
                role_list_replicator = [
                        {'id': 'replicator_user', 'name': 'replicator_user', 'roles': role}]
                RbacBase().add_user_role(role_list_replicator,
                                             RestConnection(self.get_cb_cluster_by_name('C' + str(i)).get_master_node()),
                                             'builtin')
2724
    def __init_parameters(self):
        """Read every test-input parameter used by the base test into private
        and public attributes. Must run before __setup_for_test."""
        self.__case_number = self._input.param("case_number", 0)
        self.__topology = self._input.param("ctopology", TOPOLOGY.CHAIN)
        # complex topology tests (> 2 clusters must specify chain_length >2)
        self.__chain_length = self._input.param("chain_length", 2)
        self.__rep_type = self._input.param("replication_type",REPLICATION_PROTOCOL.XMEM)
        self.__num_sasl_buckets = self._input.param("sasl_buckets", 0)
        self.__num_stand_buckets = self._input.param("standard_buckets", 0)

        self.__eviction_policy = self._input.param("eviction_policy",'valueOnly')
        self.__mixed_priority = self._input.param("mixed_priority", None)

        self.__lww = self._input.param("lww", 0)
        self.__fail_on_errors = self._input.param("fail_on_errors", True)
        # simply append to this list, any error from log we want to fail test on
        self.__report_error_list = []
        if self.__fail_on_errors:
            self.__report_error_list = ["panic:",
                                        "non-recoverable error from xmem client. response status=KEY_ENOENT"]

        # for format {ip1: {"panic": 2, "KEY_ENOENT":3}}
        self.__error_count_dict = {}
        if len(self.__report_error_list) > 0:
            self.__initialize_error_count_dict()

        # Baseline of goxdcr pipeline-restart counts, keyed by node ip.
        self._repl_restart_count_dict = {}
        self.__initialize_repl_restart_count_dict()

        # Public init parameters - Used in other tests too.
        # Move above private to this section if needed in future, but
        # Ensure to change other tests too.
        self._demand_encryption = self._input.param(
            "demand_encryption",
            False)
        self._num_replicas = self._input.param("replicas", 1)
        self._create_default_bucket = self._input.param("default_bucket",True)
        self._rdirection = self._input.param("rdirection",
                            REPLICATION_DIRECTION.UNIDIRECTION)
        self._num_items = self._input.param("items", 1000)
        self._value_size = self._input.param("value_size", 512)
        self._poll_timeout = self._input.param("poll_timeout", 120)
        # Percentages of items to update/delete during mutation phases.
        self._perc_upd = self._input.param("upd", 30)
        self._perc_del = self._input.param("del", 30)
        self._upd_clusters = self._input.param("update", [])
        if self._upd_clusters:
            self._upd_clusters = self._upd_clusters.split("-")
        self._del_clusters = self._input.param("delete", [])
        if self._del_clusters:
            self._del_clusters = self._del_clusters.split('-')
        self._expires = self._input.param("expires", 0)
        self._wait_for_expiration = self._input.param("wait_for_expiration",True)
        # "-"-separated cluster name lists, e.g. warm=C1-C2.
        self._warmup = self._input.param("warm", "").split('-')
        self._rebalance = self._input.param("rebalance", "").split('-')
        self._failover = self._input.param("failover", "").split('-')
        self._wait_timeout = self._input.param("timeout", 60)
        self._disable_compaction = self._input.param("disable_compaction","").split('-')
        self._item_count_timeout = self._input.param("item_count_timeout", 300)
        self._checkpoint_interval = self._input.param("checkpoint_interval",60)
        self._optimistic_threshold = self._input.param("optimistic_threshold", 256)
        self._dgm_run = self._input.param("dgm_run", False)
        self._active_resident_threshold = \
            self._input.param("active_resident_threshold", 100)
        CHECK_AUDIT_EVENT.CHECK = self._input.param("verify_audit", 0)
        self._max_verify = self._input.param("max_verify", 100000)
        self._sdk_compression = self._input.param("sdk_compression", True)
        self._evict_with_compactor = self._input.param("evict_with_compactor", False)
        self._replicator_role = self._input.param("replicator_role",False)
        self._replicator_all_buckets = self._input.param("replicator_all_buckets",False)
2793
    def __initialize_error_count_dict(self):
        """
            initializes self.__error_count_dict with ip, error and err count
            like {ip1: {"panic": 2, "KEY_ENOENT":3}}
            Records the pre-existing occurrence counts so later checks can
            diff against this baseline.
        """
        if not self.__is_cluster_run():
            # One shared log path, computed from servers[0] -- assumes all
            # servers use the same goxdcr log directory; TODO confirm.
            goxdcr_log = NodeHelper.get_goxdcr_log_dir(self._input.servers[0])\
                     + '/goxdcr.log*'
        for node in self._input.servers:
            if self.__is_cluster_run():
                # cluster_run: each node has its own log directory.
                goxdcr_log = NodeHelper.get_goxdcr_log_dir(node)\
                     + '/goxdcr.log*'
            self.__error_count_dict[node.ip] = {}
            for error in self.__report_error_list:
                self.__error_count_dict[node.ip][error] = \
                    NodeHelper.check_goxdcr_log(node, error, goxdcr_log)
        self.log.info(self.__error_count_dict)
2811
    def __initialize_repl_restart_count_dict(self):
        """
            initializes self._repl_restart_count_dict with ip, repl restart
            count like {ip1: 3, ip2: 4} -- a baseline of pre-existing
            "Try to fix Pipeline" occurrences in the goxdcr log.
        """
        if not self.__is_cluster_run():
            # One shared log path, computed from servers[0] -- assumes all
            # servers use the same goxdcr log directory; TODO confirm.
            goxdcr_log = NodeHelper.get_goxdcr_log_dir(self._input.servers[0])\
                     + '/goxdcr.log*'
        for node in self._input.servers:
            if self.__is_cluster_run():
                # cluster_run: each node has its own log directory.
                goxdcr_log = NodeHelper.get_goxdcr_log_dir(node)\
                     + '/goxdcr.log*'
            self._repl_restart_count_dict[node.ip] = \
                NodeHelper.check_goxdcr_log(node,
                                            "Try to fix Pipeline",
                                            goxdcr_log)
        self.log.info(self._repl_restart_count_dict)
2829
2830    def __cleanup_previous(self):
2831        for cluster in self.__cb_clusters:
2832            cluster.cleanup_cluster(
2833                self,
2834                from_rest=True,
2835                cluster_shutdown=False)
2836
2837    def __init_clusters(self):
2838        self.log.info("Initializing all clusters...")
2839        disabled_consistent_view = self._input.param(
2840            "disabled_consistent_view",
2841            None)
2842        for cluster in self.__cb_clusters:
2843            cluster.init_cluster(disabled_consistent_view)
2844
2845    def __set_free_servers(self):
2846        total_servers = self._input.servers
2847        cluster_nodes = []
2848        for _, nodes in self._input.clusters.iteritems():
2849            cluster_nodes.extend(nodes)
2850        for server in total_servers:
2851            for cluster_node in cluster_nodes:
2852                if server.ip == cluster_node.ip and\
2853                                server.port == cluster_node.port:
2854                    break
2855                else:
2856                    continue
2857            else:
2858                FloatingServers._serverlist.append(server)
2859
    def get_cluster_op(self):
        """Return the shared Cluster task executor used by this test."""
        return self.__cluster_op
2862
2863    def add_built_in_server_user(self, testuser=None, rolelist=None, node=None):
2864        """
2865           From spock, couchbase server is built with some users that handles
2866           some specific task such as:
2867               cbadminbucket
2868           Default added user is cbadminbucket with admin role
2869        """
2870        if testuser is None:
2871            testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket',
2872                                                'password': 'password'}]
2873        if rolelist is None:
2874            rolelist = [{'id': 'cbadminbucket', 'name': 'cbadminbucket',
2875                                                      'roles': 'admin'}]
2876        if node is None:
2877            node = self.master
2878
2879        self.log.info("**** add built-in '%s' user to node %s ****" % (testuser[0]["name"],
2880                                                                       node.ip))
2881        RbacBase().create_user_source(testuser, 'builtin', node)
2882
2883        self.log.info("**** add '%s' role to '%s' user ****" % (rolelist[0]["roles"],
2884                                                                testuser[0]["name"]))
2885        RbacBase().add_user_role(rolelist, RestConnection(node), 'builtin')
2886
2887    def get_cb_cluster_by_name(self, name):
2888        """Return couchbase cluster object for given name.
2889        @return: CouchbaseCluster object
2890        """
2891        for cb_cluster in self.__cb_clusters:
2892            if cb_cluster.get_name() == name:
2893                return cb_cluster
2894        raise XDCRException("Couchbase Cluster with name: %s not exist" % name)
2895
    def get_num_cb_cluster(self):
        """Return number of couchbase clusters for tests.
        @return: int count of CouchbaseCluster objects created in setUp.
        """
        return len(self.__cb_clusters)
2900
    def get_cb_clusters(self):
        """Return the list of CouchbaseCluster objects for this test."""
        return self.__cb_clusters
2903
    def get_lww(self):
        # Accessor for the last-write-wins bucket setting used when
        # creating buckets (see __create_buckets).
        return self.__lww
2906
    def get_report_error_list(self):
        # Accessor for the list of error patterns to report; populated
        # outside this view - confirm at its assignment site.
        return self.__report_error_list
2909
2910    def __calculate_bucket_size(self, cluster_quota, num_buckets):
2911
2912        if 'quota_percent' in self._input.test_params:
2913            quota_percent = int(self._input.test_params['quota_percent'])
2914        else:
2915            quota_percent = None
2916
2917        dgm_run = self._input.param("dgm_run", 0)
2918        bucket_size = 0
2919        if dgm_run:
2920            # buckets cannot be created if size<100MB
2921            bucket_size = 256
2922        elif quota_percent is not None and num_buckets > 0:
2923            bucket_size = int( float(cluster_quota - 500) * float(quota_percent/100.0 ) /float(num_buckets) )
2924        elif num_buckets > 0:
2925            bucket_size = int((float(cluster_quota) - 500)/float(num_buckets))
2926        return bucket_size
2927
2928    def __create_buckets(self):
2929        # if mixed priority is set by user, set high priority for sasl and
2930        # standard buckets
2931        if self.__mixed_priority:
2932            bucket_priority = 'high'
2933        else:
2934            bucket_priority = None
2935        num_buckets = self.__num_sasl_buckets + \
2936            self.__num_stand_buckets + int(self._create_default_bucket)
2937
2938        maxttl = self._input.param("maxttl", None)
2939
2940        for cb_cluster in self.__cb_clusters:
2941            total_quota = cb_cluster.get_mem_quota()
2942            bucket_size = self.__calculate_bucket_size(
2943                total_quota,
2944                num_buckets)
2945
2946            if self._create_default_bucket:
2947                cb_cluster.create_default_bucket(
2948                    bucket_size,
2949                    self._num_replicas,
2950                    eviction_policy=self.__eviction_policy,
2951                    bucket_priority=bucket_priority,
2952                    lww=self.__lww,
2953                    maxttl=maxttl)
2954
2955            cb_cluster.create_sasl_buckets(
2956                bucket_size, num_buckets=self.__num_sasl_buckets,
2957                num_replicas=self._num_replicas,
2958                eviction_policy=self.__eviction_policy,
2959                bucket_priority=bucket_priority, lww=self.__lww,
2960                maxttl=maxttl)
2961
2962            cb_cluster.create_standard_buckets(
2963                bucket_size, num_buckets=self.__num_stand_buckets,
2964                num_replicas=self._num_replicas,
2965                eviction_policy=self.__eviction_policy,
2966                bucket_priority=bucket_priority, lww=self.__lww,
2967                maxttl=maxttl)
2968
2969    def create_buckets_on_cluster(self, cluster_name):
2970        # if mixed priority is set by user, set high priority for sasl and
2971        # standard buckets
2972        if self.__mixed_priority:
2973            bucket_priority = 'high'
2974        else:
2975            bucket_priority = None
2976        num_buckets = self.__num_sasl_buckets + \
2977            self.__num_stand_buckets + int(self._create_default_bucket)
2978
2979        cb_cluster = self.get_cb_cluster_by_name(cluster_name)
2980        total_quota = cb_cluster.get_mem_quota()
2981        bucket_size = self.__calculate_bucket_size(
2982            total_quota,
2983            num_buckets)
2984
2985        if self._create_default_bucket:
2986            cb_cluster.create_default_bucket(
2987                bucket_size,
2988                self._num_replicas,
2989                eviction_policy=self.__eviction_policy,
2990                bucket_priority=bucket_priority)
2991
2992        cb_cluster.create_sasl_buckets(
2993            bucket_size, num_buckets=self.__num_sasl_buckets,
2994            num_replicas=self._num_replicas,
2995            eviction_policy=self.__eviction_policy,
2996            bucket_priority=bucket_priority)
2997
2998        cb_cluster.create_standard_buckets(
2999            bucket_size, num_buckets=self.__num_stand_buckets,
3000            num_replicas=self._num_replicas,
3001            eviction_policy=self.__eviction_policy,
3002            bucket_priority=bucket_priority)
3003
3004    def __set_topology_chain(self):
3005        """Will Setup Remote Cluster Chain Topology i.e. A -> B -> C
3006        """
3007        for i, cb_cluster in enumerate(self.__cb_clusters):
3008            if i >= len(self.__cb_clusters) - 1:
3009                break
3010            cb_cluster.add_remote_cluster(
3011                self.__cb_clusters[i + 1],
3012                Utility.get_rc_name(
3013                    cb_cluster.get_name(),
3014                    self.__cb_clusters[i + 1].get_name()),
3015                self._demand_encryption,
3016                self._replicator_role
3017            )
3018            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3019                self.__cb_clusters[i + 1].add_remote_cluster(
3020                    cb_cluster,
3021                    Utility.get_rc_name(
3022                        self.__cb_clusters[i + 1].get_name(),
3023                        cb_cluster.get_name()),
3024                    self._demand_encryption,
3025                    self._replicator_role
3026                )
3027
3028    def __set_topology_star(self):
3029        """Will Setup Remote Cluster Star Topology i.e. A-> B, A-> C, A-> D
3030        """
3031        hub = self.__cb_clusters[0]
3032        for cb_cluster in self.__cb_clusters[1:]:
3033            hub.add_remote_cluster(
3034                cb_cluster,
3035                Utility.get_rc_name(hub.get_name(), cb_cluster.get_name()),
3036                self._demand_encryption,
3037                self._replicator_role
3038            )
3039            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3040                cb_cluster.add_remote_cluster(
3041                    hub,
3042                    Utility.get_rc_name(cb_cluster.get_name(), hub.get_name()),
3043                    self._demand_encryption,
3044                    self._replicator_role
3045                )
3046
3047    def __set_topology_ring(self):
3048        """
3049        Will Setup Remote Cluster Ring Topology i.e. A -> B -> C -> A
3050        """
3051        self.__set_topology_chain()
3052        self.__cb_clusters[-1].add_remote_cluster(
3053            self.__cb_clusters[0],
3054            Utility.get_rc_name(
3055                self.__cb_clusters[-1].get_name(),
3056                self.__cb_clusters[0].get_name()),
3057            self._demand_encryption,
3058            self._replicator_role
3059        )
3060        if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3061            self.__cb_clusters[0].add_remote_cluster(
3062                self.__cb_clusters[-1],
3063                Utility.get_rc_name(
3064                    self.__cb_clusters[0].get_name(),
3065                    self.__cb_clusters[-1].get_name()),
3066                self._demand_encryption,
3067                self._replicator_role
3068            )
3069
3070    def set_xdcr_topology(self):
3071        """Setup xdcr topology as per ctopology test parameter.
3072        """
3073        if self.__topology == TOPOLOGY.CHAIN:
3074            self.__set_topology_chain()
3075        elif self.__topology == TOPOLOGY.STAR:
3076            self.__set_topology_star()
3077        elif self.__topology == TOPOLOGY.RING:
3078            self.__set_topology_ring()
3079        elif self._input.param(TOPOLOGY.HYBRID, 0):
3080            self.set_hybrid_topology()
3081        else:
3082            raise XDCRException(
3083                'Unknown topology set: {0}'.format(
3084                    self.__topology))
3085
3086    def __parse_topology_param(self):
3087        tokens = re.split(r'(>|<>|<|\s)', self.__topology)
3088        return tokens
3089
3090    def set_hybrid_topology(self):
3091        """Set user defined topology
3092        Hybrid Topology Notations:
3093        '> or <' for Unidirection replication between clusters
3094        '<>' for Bi-direction replication between clusters
3095        Test Input:  ctopology="C1>C2<>C3>C4<>C1"
3096        """
3097        tokens = self.__parse_topology_param()
3098        counter = 0
3099        while counter < len(tokens) - 1:
3100            src_cluster = self.get_cb_cluster_by_name(tokens[counter])
3101            dest_cluster = self.get_cb_cluster_by_name(tokens[counter + 2])
3102            if ">" in tokens[counter + 1]:
3103                src_cluster.add_remote_cluster(
3104                    dest_cluster,
3105                    Utility.get_rc_name(
3106                        src_cluster.get_name(),
3107                        dest_cluster.get_name()),
3108                    self._demand_encryption
3109                )
3110            if "<" in tokens[counter + 1]:
3111                dest_cluster.add_remote_cluster(
3112                    src_cluster,
3113                    Utility.get_rc_name(
3114                        dest_cluster.get_name(), src_cluster.get_name()),
3115                    self._demand_encryption
3116                )
3117            counter += 2
3118
3119    def __async_load_chain(self):
3120        for i, cluster in enumerate(self.__cb_clusters):
3121            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3122                if i > len(self.__cb_clusters) - 1:
3123                    break
3124            else:
3125                if i >= len(self.__cb_clusters) - 1:
3126                    break
3127            if not self._dgm_run:
3128                return cluster.async_load_all_buckets(self._num_items,
3129                                                      self._value_size)
3130            else:
3131                #TODO: async this!
3132                cluster.load_all_buckets_till_dgm(
3133                    active_resident_threshold=self._active_resident_threshold,
3134                    items=self._num_items)
3135
3136    def __load_chain(self):
3137        for i, cluster in enumerate(self.__cb_clusters):
3138            if self._rdirection == REPLICATION_DIRECTION.BIDIRECTION:
3139                if i > len(self.__cb_clusters) - 1:
3140                    break
3141            else:
3142                if i >= len(self.__cb_clusters) - 1:
3143                    break
3144            if not self._dgm_run:
3145                cluster.load_all_buckets(self._num_items, self._value_size)
3146            else:
3147                cluster.load_all_buckets_till_dgm(
3148                    active_resident_threshold=self._active_resident_threshold,
3149                    items=self._num_items)
3150
3151    def __load_star(self):
3152        hub = self.__cb_clusters[0]
3153        if self._dgm_run:
3154            hub.load_all_buckets_till_dgm(
3155                active_resident_threshold=self._active_resident_threshold,
3156                item=self._num_items)
3157        else:
3158            hub.load_all_buckets(self._num_items, self._value_size)
3159
3160    def __async_load_star(self):
3161        hub = self.__cb_clusters[0]
3162        if self._dgm_run:
3163            #TODO: async this
3164            hub.load_all_buckets_till_dgm(
3165                active_resident_threshold=self._active_resident_threshold,
3166                item=self._num_items)
3167        else:
3168            return hub.async_load_all_buckets(self._num_items, self._value_size)
3169
    def __load_ring(self):
        # A ring has the same set of source clusters as a chain, so the
        # chain loader covers it.
        self.__load_chain()
3172
    def __async_load_ring(self):
        # Delegates to the chain async-loader; note the tasks it returns
        # are dropped here, so callers of the ring path get None.
        self.__async_load_chain()
3175
3176    def load_data_topology(self):
3177        """load data as per ctopology test parameter
3178        """
3179        if self.__topology == TOPOLOGY.CHAIN:
3180            self.__load_chain()
3181        elif self.__topology == TOPOLOGY.STAR:
3182            self.__load_star()
3183        elif self.__topology == TOPOLOGY.RING:
3184            self.__load_ring()
3185        elif self._input.param(TOPOLOGY.HYBRID, 0):
3186            self.__load_star()
3187        else:
3188            raise XDCRException(
3189                'Unknown topology set: {0}'.format(
3190                    self.__topology))
3191
3192    def async_load_data_topology(self):
3193        """load data as per ctopology test parameter
3194        """
3195        if self.__topology == TOPOLOGY.CHAIN:
3196            return self.__async_load_chain()
3197        elif self.__topology == TOPOLOGY.STAR:
3198            return self.__async_load_star()
3199        elif self.__topology == TOPOLOGY.