1import sys
2import time
3import datetime
4import copy
5import os
6
7sys.path = ["../"] + sys.path
8
9import unittest
10import logger
11from membase.api.rest_client import RestConnection, Bucket, RestHelper
12from couchbase_helper.cluster import Cluster
13from TestInput import TestInputSingleton
14from membase.helper.bucket_helper import BucketOperationHelper
15from membase.helper.cluster_helper import ClusterOperationHelper
16from cache import ObjCacher, CacheHelper
17import testcfg as cfg
18from app.workload_manager import ClusterStatus
19from testconstants import STANDARD_BUCKET_PORT
20
# Module-import side effect: clear the cached cluster-status entry so the
# run starts without stale state under CacheHelper.CLUSTERSTATUSKEY.
ObjCacher().clear(CacheHelper.CLUSTERSTATUSKEY)
22
class initialize(unittest.TestCase):
    """Base fixture: parses the shared test input into the cluster and
    bucket configuration attributes used by the SETUP workflows below."""

    def setUp(self):
        """Read cluster topology and bucket/XDCR parameters from the
        TestInput singleton and store them on the instance."""
        self._log = logger.Logger.get_logger()
        self._input = TestInputSingleton.input
        self._clusters_dic = self._input.clusters
        self._clusters_keys_olst = range(len(self._clusters_dic))
        # "initial_nodes" may arrive as a comma-separated string (one
        # count per cluster) or as a plain int; only a string has
        # .split, so an int raises AttributeError and we wrap it in a
        # single-element list instead.  (Was a bare `except:`, which
        # also swallowed KeyboardInterrupt/SystemExit.)
        try:
            self._num_initial_nodes = self._input.param("initial_nodes", '1').split(',')
        except AttributeError:
            self._num_initial_nodes = [self._input.param("initial_nodes", '1')]
        self._buckets = []
        self._default_bucket = self._input.param("default_bucket", False)
        if self._default_bucket:
            self.default_bucket_name = "default"
        self._standard_buckets = self._input.param("standard_buckets", 0)
        self._sasl_buckets = self._input.param("sasl_buckets", 0)
        self._default_quota = self._input.param("default_mem_quota", 0)
        self._sasl_quota = self._input.param("sasl_mem_quota", 0)
        self._standard_quota = self._input.param("standard_mem_quota", 0)
        # Filled in later by SETUP._init_nodes from the node's reported
        # mcdMemoryReserved quota.
        self._mem_quota_int = 0
        self._num_replicas = self._input.param("replicas", 1)
        self._xdcr = self._input.param("xdcr", False)
        self._rdirection = self._input.param("rdirection", "unidirection")
        if self._xdcr:
            # Considering that there be a maximum of 2 clusters for XDCR:
            # cluster 0 is the source master, cluster 1 the destination.
            self._s_master = self._clusters_dic[0][0]
            self._d_master = self._clusters_dic[1][0]

    def tearDown(self):
        pass
54
class SETUP(initialize):
    """Provisions the clusters described by the test input: node init,
    rebalance, bucket creation and (optionally) XDCR replication links,
    plus full teardown."""

    def setitup(self):
        """Bring up every configured cluster, record its available hosts
        in the cached ClusterStatus, and wire up XDCR if requested."""
        # If the user forgot to assign an initial node count for some
        # cluster, default those clusters to 1 node.
        missing = len(self._clusters_keys_olst) - len(self._num_initial_nodes)
        if missing > 0:
            self._num_initial_nodes.extend(['1'] * missing)

        for key in self._clusters_keys_olst:
            # Cluster 0 is tracked under CB_CLUSTER_TAG; the remaining
            # clusters under their CB_REMOTE_CLUSTER_TAG entries.
            if key == 0:
                clusterStatus = (CacheHelper.clusterstatus(cfg.CB_CLUSTER_TAG + "_status")
                                 or ClusterStatus())
            else:
                tag = cfg.CB_REMOTE_CLUSTER_TAG[key - 1] + "_status"
                clusterStatus = CacheHelper.clusterstatus(tag) or ClusterStatus(tag)

            clusterStatus.all_available_hosts = [
                "%s:%s" % (node.ip, node.port) for node in self._clusters_dic[key]]

            self.set_the_cluster_up(
                self._clusters_dic[key][:int(self._num_initial_nodes[key])])

        # Let the freshly rebalanced clusters settle before linking XDCR.
        time.sleep(20)

        if self._xdcr:
            # Was duplicated inline; delegate to setupXDCR so the link
            # logic lives in one place.
            self.setupXDCR()

    def setupXDCR(self):
        """Create the source->destination replication link; add the
        reverse link as well when running bidirectional."""
        self._link_create_replications(self._s_master, self._d_master, "cluster1")
        if self._rdirection == "bidirection":
            self._link_create_replications(self._d_master, self._s_master, "cluster0")

    def terminate(self):
        """Tear everything down: drop replications, delete every bucket
        on every node, then rebalance out all non-master nodes."""
        if self._xdcr:
            self._terminate_replications(self._s_master, "cluster1")
            if self._rdirection == "bidirection":
                self._terminate_replications(self._d_master, "cluster0")
        for key in self._clusters_keys_olst:
            nodes = self._clusters_dic[key]
            for node in nodes:
                rest = RestConnection(node)
                for bucket in rest.get_buckets():
                    status = rest.delete_bucket(bucket.name)
                    if status:
                        self._log.info('Deleted bucket : {0} from {1}'.format(bucket.name, node.ip))
            rest = RestConnection(nodes[0])
            helper = RestHelper(rest)
            servers = rest.node_statuses()
            master_id = rest.get_nodes_self().id
            if len(nodes) > 1:
                # Eject every node except the master and wait for the
                # rebalance to complete.
                helper.remove_nodes(
                    knownNodes=[node.id for node in servers],
                    ejectedNodes=[node.id for node in servers if node.id != master_id],
                    wait_for_rebalance=True)

    def _terminate_replications(self, master, cluster_name):
        """Remove all replications on `master`, then delete the named
        remote-cluster reference via the REST API."""
        rest = RestConnection(master)
        rest.remove_all_replications()
        # NOTE(review): shell-interpolated curl command — acceptable for
        # trusted test configs, but would be command injection if any of
        # these values were untrusted; consider subprocess.run([...]).
        os.system("curl --user {0}:{1} -X DELETE http://{2}:{3}/pools/default/remoteClusters/{4}".format(
                    master.rest_username, master.rest_password, master.ip, master.port, cluster_name))

    def set_the_cluster_up(self, nodes):
        """Initialize, rebalance, and create buckets on `nodes`."""
        self._init_nodes(nodes)
        self._config_cluster(nodes)
        self._create_buckets(nodes)

    def _init_nodes(self, nodes):
        """Set credentials and memory quota on each node individually."""
        for node in nodes:
            rest = RestConnection(node)
            rest.init_cluster(node.rest_username, node.rest_password)
            info = rest.get_nodes_self()
            quota = int(info.mcdMemoryReserved)
            # Remember the (last) node quota; _create_buckets sizes the
            # buckets from it.
            self._mem_quota_int = quota
            rest.init_cluster_memoryQuota(node.rest_username, node.rest_password, quota)

    def _config_cluster(self, nodes):
        """Join all nodes to the first node's cluster and rebalance."""
        master = nodes[0]
        rest = RestConnection(master)
        for node in nodes[1:]:
            rest.add_node(master.rest_username, master.rest_password,
                          node.ip, node.port)
        servers = rest.node_statuses()
        rest.rebalance(otpNodes=[node.id for node in servers], ejectedNodes=[])
        time.sleep(5)

    def _create_buckets(self, nodes):
        """Create the default/sasl/standard buckets requested by the
        test input on the cluster whose master is nodes[0]."""
        master_node = nodes[0]
        num_buckets = 0
        if self._default_bucket:
            num_buckets += 1
        num_buckets += self._sasl_buckets + self._standard_buckets
        if num_buckets == 0:
            return
        bucket_size = self._get_bucket_size(master_node, nodes, self._mem_quota_int, num_buckets)
        rest = RestConnection(master_node)
        master_id = rest.get_nodes_self().id
        if self._default_bucket:
            # An explicit per-type quota overrides the computed size.
            if self._default_quota != 0:
                bucket_size = self._default_quota
            # (Was rebuilding RestConnection(nodes[0]) here; `rest`
            # already targets that node.)
            rest.create_bucket(bucket=self.default_bucket_name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=11211,
                               authType="none",
                               saslPassword=None)
            self._buckets.append(self.default_bucket_name)
        if self._sasl_buckets > 0:
            if self._sasl_quota != 0:
                bucket_size = self._sasl_quota
            self._create_sasl_buckets(master_node, master_id, bucket_size, password="password")
        if self._standard_buckets > 0:
            if self._standard_quota != 0:
                bucket_size = self._standard_quota
            self._create_standard_buckets(master_node, master_id, bucket_size)

    def _link_create_replications(self, master_1, master_2, cluster_name):
        """Register master_2 as a remote cluster on master_1 and start a
        continuous replication for every known bucket."""
        rest = RestConnection(master_1)
        rest.add_remote_cluster(master_2.ip, master_2.port, master_1.rest_username,
                                 master_1.rest_password, cluster_name)
        time.sleep(30)
        # If this fixture created no buckets, replicate whatever buckets
        # already exist on the source cluster.
        if len(self._buckets) == 0:
            self._buckets = rest.get_buckets()
        for bucket in set(self._buckets):
            rest.start_replication("continuous", bucket, cluster_name)

    def _create_sasl_buckets(self, server, server_id, bucket_size, password):
        """Create `self._sasl_buckets` SASL-authenticated buckets named
        saslbucket, saslbucket-1, saslbucket-2, ..."""
        rest = RestConnection(server)
        for i in range(self._sasl_buckets):
            name = "saslbucket" if i == 0 else "saslbucket-" + str(i)
            rest.create_bucket(bucket=name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=11211,
                               authType="sasl",
                               saslPassword=password)
            self._buckets.append(name)

    def _create_standard_buckets(self, server, server_id, bucket_size):
        """Create `self._standard_buckets` unauthenticated buckets, each
        on its own proxy port starting at STANDARD_BUCKET_PORT."""
        rest = RestConnection(server)
        for i in range(self._standard_buckets):
            name = "standardbucket" if i == 0 else "standardbucket-" + str(i)
            rest.create_bucket(bucket=name,
                               ramQuotaMB=bucket_size,
                               replicaNumber=self._num_replicas,
                               proxyPort=STANDARD_BUCKET_PORT + i,
                               authType="none",
                               saslPassword=None)
            self._buckets.append(name)

    def _get_bucket_size(self, master_node, nodes, mem_quota, num_buckets, ratio=3.0 / 2.0):
        """Split `ratio` of the memory quota evenly across buckets;
        when the master appears in `nodes`, also divide by node count."""
        for node in nodes:
            if node.ip == master_node.ip:
                return int(ratio / float(len(nodes)) / float(num_buckets) * float(mem_quota))
        return int(ratio / float(num_buckets) * float(mem_quota))
217