"""Cluster bring-up and tear-down helpers for system tests.

The classes here read their configuration from ``TestInputSingleton.input``
and drive the clusters through ``RestConnection``.
"""
import sys
import time
import datetime
import copy
import os

sys.path = ["../"] + sys.path

import unittest
import logger
from membase.api.rest_client import RestConnection, Bucket, RestHelper
from couchbase_helper.cluster import Cluster
from TestInput import TestInputSingleton
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper
from cache import ObjCacher, CacheHelper
import testcfg as cfg
from app.workload_manager import ClusterStatus
from testconstants import STANDARD_BUCKET_PORT

# Drop any cluster status cached under this key before a new setup begins.
ObjCacher().clear(CacheHelper.CLUSTERSTATUSKEY)


class initialize(unittest.TestCase):
    """Base test case that loads the setup parameters onto the instance."""

    def setUp(self):
        """Read every setup parameter from the test input.

        Parameters read (with their defaults): ``initial_nodes`` ('1'),
        ``default_bucket`` (False), ``standard_buckets`` (0),
        ``sasl_buckets`` (0), ``default_mem_quota`` (0), ``sasl_mem_quota``
        (0), ``standard_mem_quota`` (0), ``replicas`` (1), ``xdcr`` (False)
        and ``rdirection`` ("unidirection").
        """
        self._log = logger.Logger.get_logger()
        self._input = TestInputSingleton.input
        self._clusters_dic = self._input.clusters
        self._clusters_keys_olst = range(len(self._clusters_dic))

        # "initial_nodes" is a comma separated list with one entry per
        # cluster.  A value that is not a string has no .split(); it is
        # wrapped in a single-element list instead.
        initial_nodes = self._input.param("initial_nodes", '1')
        try:
            self._num_initial_nodes = initial_nodes.split(',')
        except AttributeError:
            self._num_initial_nodes = [initial_nodes]

        # Names of the buckets created by this setup.
        self._buckets = []
        self._default_bucket = self._input.param("default_bucket", False)
        if self._default_bucket:
            self.default_bucket_name = "default"
        self._standard_buckets = self._input.param("standard_buckets", 0)
        self._sasl_buckets = self._input.param("sasl_buckets", 0)

        # A quota of 0 means "no override": the size computed by
        # _get_bucket_size is used instead (see _create_buckets).
        self._default_quota = self._input.param("default_mem_quota", 0)
        self._sasl_quota = self._input.param("sasl_mem_quota", 0)
        self._standard_quota = self._input.param("standard_mem_quota", 0)
        self._mem_quota_int = 0
        self._num_replicas = self._input.param("replicas", 1)

        self._xdcr = self._input.param("xdcr", False)
        self._rdirection = self._input.param("rdirection", "unidirection")
        if self._xdcr:
            # Considering that there be a maximum of 2 clusters for XDCR
            self._s_master = self._clusters_dic[0][0]
            self._d_master = self._clusters_dic[1][0]

    def tearDown(self):
        """Nothing to clean up; tear-down is done explicitly via terminate()."""
        pass


class SETUP(initialize):
    """Builds the configured clusters, buckets and XDCR links, and removes them."""

    def setitup(self):
        """Bring up every configured cluster, then set up XDCR if enabled."""
        # If the user forgot to assign the number of initial nodes for any
        # cluster, use 1 node as the default for the missing entries.
        missing = len(self._clusters_keys_olst) - len(self._num_initial_nodes)
        if missing > 0:
            self._num_initial_nodes.extend(['1'] * missing)

        for key in self._clusters_keys_olst:
            # Cluster 0 uses the local tag; cluster N > 0 uses the
            # (N-1)-th remote tag.  Fall back to a new status object when
            # nothing is cached.
            if key == 0:
                clusterStatus = (CacheHelper.clusterstatus(cfg.CB_CLUSTER_TAG + "_status")
                                 or ClusterStatus())
            else:
                remote_status_key = cfg.CB_REMOTE_CLUSTER_TAG[key - 1] + "_status"
                clusterStatus = (CacheHelper.clusterstatus(remote_status_key)
                                 or ClusterStatus(remote_status_key))

            clusterStatus.all_available_hosts = [
                "%s:%s" % (node.ip, node.port) for node in self._clusters_dic[key]
            ]

            # Only the first "initial_nodes" servers of the cluster are set up.
            node_count = int(self._num_initial_nodes[key])
            self.set_the_cluster_up(self._clusters_dic[key][:node_count])

        # NOTE(review): the collapsed source does not show whether this pause
        # sat inside the per-cluster loop; it is placed after the loop here.
        time.sleep(20)

        if self._xdcr:
            self.setupXDCR()

    def setupXDCR(self):
        """Link source to destination, and back again when bidirectional."""
        self._link_create_replications(self._s_master, self._d_master, "cluster1")
        if self._rdirection == "bidirection":
            self._link_create_replications(self._d_master, self._s_master, "cluster0")

    def terminate(self):
        """Remove replications, delete all buckets and eject non-master nodes."""
        if self._xdcr:
            self._terminate_replications(self._s_master, "cluster1")
            if self._rdirection == "bidirection":
                self._terminate_replications(self._d_master, "cluster0")

        for key in self._clusters_keys_olst:
            nodes = self._clusters_dic[key]
            for node in nodes:
                rest = RestConnection(node)
                for bucket in rest.get_buckets():
                    if rest.delete_bucket(bucket.name):
                        self._log.info('Deleted bucket : {0} from {1}'.format(bucket.name, node.ip))

            # Ejecting only makes sense when there is more than one server.
            if len(nodes) > 1:
                rest = RestConnection(nodes[0])
                helper = RestHelper(rest)
                servers = rest.node_statuses()
                master_id = rest.get_nodes_self().id
                helper.remove_nodes(knownNodes=[node.id for node in servers],
                                    ejectedNodes=[node.id for node in servers
                                                  if node.id != master_id],
                                    wait_for_rebalance=True)

    def _terminate_replications(self, master, cluster_name):
        """Remove all replications on ``master`` and its remote cluster reference.

        :param master: server object providing ip, port and REST credentials
        :param cluster_name: name of the remote cluster reference to delete
        """
        rest = RestConnection(master)
        rest.remove_all_replications()
        # NOTE(review): the values are interpolated into a shell command
        # unquoted; credentials with shell metacharacters would break it.
        os.system("curl --user {0}:{1} -X DELETE http://{2}:{3}/pools/default/remoteClusters/{4}".format(
            master.rest_username, master.rest_password, master.ip, master.port, cluster_name))

    def set_the_cluster_up(self, nodes):
        """Initialise ``nodes``, join them into one cluster and create buckets.

        :param nodes: servers of one cluster; ``nodes[0]`` acts as master
        """
        self._init_nodes(nodes)
        self._config_cluster(nodes)
        self._create_buckets(nodes)

    def _init_nodes(self, nodes):
        """Initialise each node and set its memory quota to mcdMemoryReserved."""
        for node in nodes:
            rest = RestConnection(node)
            rest.init_cluster(node.rest_username, node.rest_password)
            info = rest.get_nodes_self()
            quota = int(info.mcdMemoryReserved)
            # Remembered for bucket sizing; the last node's value wins.
            self._mem_quota_int = quota
            rest.init_cluster_memoryQuota(node.rest_username, node.rest_password, quota)

    def _config_cluster(self, nodes):
        """Add ``nodes[1:]`` to ``nodes[0]`` and start a rebalance."""
        master = nodes[0]
        rest = RestConnection(master)
        for node in nodes[1:]:
            rest.add_node(master.rest_username, master.rest_password,
                          node.ip, node.port)
        servers = rest.node_statuses()
        rest.rebalance(otpNodes=[node.id for node in servers], ejectedNodes=[])
        time.sleep(5)

    def _create_buckets(self, nodes):
        """Create the default, SASL and standard buckets that were requested.

        Created bucket names are appended to ``self._buckets``.

        :param nodes: servers of one cluster; buckets are created via ``nodes[0]``
        """
        master_node = nodes[0]
        num_buckets = self._sasl_buckets + self._standard_buckets
        if self._default_bucket:
            num_buckets += 1
        if num_buckets == 0:
            return

        bucket_size = self._get_bucket_size(master_node, nodes, self._mem_quota_int, num_buckets)
        rest = RestConnection(master_node)
        master_id = rest.get_nodes_self().id

        # NOTE(review): bucket_size is reassigned, not recomputed, so a
        # non-zero quota for one bucket type carries over to the following
        # types whose own quota is 0.  Kept as in the original; confirm
        # whether this is intended.
        if self._default_bucket:
            if self._default_quota != 0:
                bucket_size = self._default_quota
            rest.create_bucket(bucket=self.default_bucket_name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=11211,
                               authType="none",
                               saslPassword=None)
            self._buckets.append(self.default_bucket_name)
        if self._sasl_buckets > 0:
            if self._sasl_quota != 0:
                bucket_size = self._sasl_quota
            self._create_sasl_buckets(master_node, master_id, bucket_size, password="password")
        if self._standard_buckets > 0:
            if self._standard_quota != 0:
                bucket_size = self._standard_quota
            self._create_standard_buckets(master_node, master_id, bucket_size)

    def _link_create_replications(self, master_1, master_2, cluster_name):
        """Register ``master_2`` as a remote cluster and replicate every bucket.

        :param master_1: server the remote reference and replications are created on
        :param master_2: server of the remote cluster
        :param cluster_name: name given to the remote cluster reference
        """
        rest = RestConnection(master_1)
        # NOTE(review): the credentials passed are master_1's, not
        # master_2's — confirm against add_remote_cluster's contract.
        rest.add_remote_cluster(master_2.ip, master_2.port, master_1.rest_username,
                                master_1.rest_password, cluster_name)
        time.sleep(30)
        if len(self._buckets) == 0:
            # self._buckets holds bucket names everywhere else, so take the
            # names of the returned bucket objects rather than the objects.
            self._buckets = [bucket.name for bucket in rest.get_buckets()]
        for bucket in set(self._buckets):
            rest.start_replication("continuous", bucket, cluster_name)

    def _create_sasl_buckets(self, server, server_id, bucket_size, password):
        """Create ``self._sasl_buckets`` SASL buckets on ``server``.

        The first is named "saslbucket", the rest "saslbucket-<i>".
        ``server_id`` is accepted but not used.
        """
        rest = RestConnection(server)
        for i in range(self._sasl_buckets):
            name = "saslbucket" if i == 0 else "saslbucket-" + str(i)
            rest.create_bucket(bucket=name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=11211,
                               authType="sasl",
                               saslPassword=password)
            self._buckets.append(name)

    def _create_standard_buckets(self, server, server_id, bucket_size):
        """Create ``self._standard_buckets`` unauthenticated buckets on ``server``.

        The first is named "standardbucket", the rest "standardbucket-<i>";
        bucket ``i`` uses proxy port ``STANDARD_BUCKET_PORT + i``.
        ``server_id`` is accepted but not used.
        """
        rest = RestConnection(server)
        for i in range(self._standard_buckets):
            name = "standardbucket" if i == 0 else "standardbucket-" + str(i)
            rest.create_bucket(bucket=name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=STANDARD_BUCKET_PORT + i,
                               authType="none",
                               saslPassword=None)
            self._buckets.append(name)

    def _get_bucket_size(self, master_node, nodes, mem_quota, num_buckets, ratio=3.0 / 2.0):
        """Return the per-bucket RAM size derived from ``mem_quota``.

        When ``master_node`` is found in ``nodes`` (matched by ip) the result
        is ``ratio / len(nodes) / num_buckets * mem_quota``; otherwise the
        division by ``len(nodes)`` is skipped.  The result is truncated to int.

        NOTE(review): the default ratio is greater than 1, so the computed
        sizes can add up to more than ``mem_quota`` — confirm it is intended.
        """
        for node in nodes:
            if node.ip == master_node.ip:
                return int(ratio / float(len(nodes)) / float(num_buckets) * float(mem_quota))
        return int(ratio / float(num_buckets) * float(mem_quota))