1import copy
2import random
3import unittest
4import time
5from basetestcase import BaseTestCase
6from couchbase_helper.documentgenerator import DocumentGenerator
7from membase.helper.cluster_helper import ClusterOperationHelper
8from remote.remote_util import RemoteMachineShellConnection
9from membase.api.rest_client import RestConnection
10from memcached.helper.data_helper import VBucketAwareMemcached,MemcachedClientHelper
11
12class GetrTests(BaseTestCase):
13
14    DURING_REBALANCE = 1
15    AFTER_REBALANCE = 2
16    SWAP_REBALANCE = 3
17
18    FAILOVER_NO_REBALANCE = 1
19    FAILOVER_ADD_BACK = 2
20    FAILOVER_REBALANCE = 3
21
22    def setUp(self):
23        super(GetrTests, self).setUp()
24        descr = self.input.param("descr", "")
25        if descr:
26            self.log.info("Test:{0}".format(descr))
27        self.skipload = self.input.param("skipload", False)
28        self.data_ops = self.input.param("data_ops", 'create')
29        self.expiration = self.input.param("expiration", 0)
30        self.wait_expiration = self.input.param("wait_expiration", False)
31        self.flags = self.input.param("flags", 0)
32        self.warmup_nodes = self.input.param("warmup", 0)
33        self.rebalance = self.input.param("rebalance", 0)
34        self.failover = self.input.param("failover", 0)
35        self.failover_factor = self.input.param("failover-factor", 1)
36        self.error = self.input.param("error", None)
37        self.replica_to_read = self.input.param("replica_to_read", 0)
38        self.value_size = self.input.param("value_size", 0)
39
40    def tearDown(self):
41        super(GetrTests, self).tearDown()
42
43    def getr_test(self):
44        if self.nodes_init > len(self.servers):
45            result = unittest.TextTestRunner(verbosity=2)._makeResult()
46            result.skipped=[('getr_test', "There is not enough VMs!!!")]
47            return result
48
49        gen_1 = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
50                                      start=0, end=self.num_items/2)
51        gen_2 = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
52                                      start=self.num_items/2, end=self.num_items)
53        if self.value_size:
54            gen_1 = DocumentGenerator('test_docs', '{{"name": "{0}"}}',
55                                      [self.value_size * 'a'],
56                                      start=0, end=self.num_items/2)
57            gen_2 = DocumentGenerator('test_docs', '{{"name": "{0}"}}',
58                                      [self.value_size * 'a'],
59                                      start=self.num_items/2, end=self.num_items)
60        self.log.info("LOAD PHASE")
61        if not self.skipload:
62            self.perform_docs_ops(self.master, [gen_1, gen_2], self.data_ops)
63
64        self.log.info("CLUSTER OPS PHASE")
65        if self.rebalance == GetrTests.AFTER_REBALANCE:
66            self.cluster.rebalance(self.servers[:self.nodes_init],
67                                   self.servers[self.nodes_init:], [])
68        if self.rebalance == GetrTests.DURING_REBALANCE:
69            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
70                            self.servers[self.nodes_init : self.nodes_init + self.nodes_in],
71                            [])
72        if self.rebalance == GetrTests.SWAP_REBALANCE:
73            self.cluster.rebalance(self.servers[:self.nodes_init],
74                                   self.servers[self.nodes_init :
75                                                self.nodes_init + self.nodes_in],
76                                   self.servers[self.nodes_init - self.nodes_in : self.nodes_init])
77        if self.warmup_nodes:
78            self.perform_warm_up()
79        if self.failover:
80            self.perform_failover()
81        if self.wait_expiration:
82            self.sleep(self.expiration)
83        try:
84            self.log.info("READ REPLICA PHASE")
85            servrs = self.servers[:self.nodes_init]
86            self.expire_pager(servrs)
87            if self.failover in [GetrTests.FAILOVER_NO_REBALANCE, GetrTests.FAILOVER_REBALANCE]:
88                servrs = self.servers[:self.nodes_init - self.failover_factor]
89            if self.rebalance == GetrTests.AFTER_REBALANCE:
90                servrs = self.servers
91            if self.rebalance == GetrTests.SWAP_REBALANCE:
92                servrs = self.servers[:self.nodes_init - self.nodes_in]
93                servrs.extend(self.servers[self.nodes_init :
94                                           self.nodes_init + self.nodes_in])
95
96            self.log.info("Checking replica read")
97            if self.failover == GetrTests.FAILOVER_NO_REBALANCE:
98                self._verify_all_buckets(self.master, only_store_hash=False,
99                                         replica_to_read=self.replica_to_read,
100                                         batch_size=1)
101            else:
102                self.verify_cluster_stats(servrs, only_store_hash=False,
103                                          replica_to_read=self.replica_to_read, batch_size=1,
104                                          timeout=(self.wait_timeout * 10))
105        except Exception, ex:
106            if self.error and str(ex).find(self.error) != -1:
107                self.log.info("Expected error %s appeared as expected" % self.error)
108            else:
109                raise ex
110        if self.rebalance == GetrTests.DURING_REBALANCE:
111            rebalance.result()
112
113    def getr_negative_test(self):
114        gen_1 = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
115                                      start=0, end=self.num_items/2)
116        gen_2 = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
117                                      start=self.num_items/2, end=self.num_items)
118        self.log.info("LOAD PHASE")
119        if not self.skipload:
120            self.perform_docs_ops(self.master, [gen_1, gen_2], self.data_ops)
121
122        if self.wait_expiration:
123            self.sleep(self.expiration)
124
125        self.log.info("READ REPLICA PHASE")
126        self.log.info("Checking replica read")
127        self.expire_pager([self.master])
128        try:
129            self._load_all_buckets(self.master, gen_1, 'read_replica', self.expiration, batch_size=1)
130        except Exception, ex:
131            if self.error and str(ex).find(self.error) != -1:
132                self.log.info("Expected error %s appeared as expected" % self.error)
133            else:
134                raise ex
135        else:
136            if self.error:
137                self.fail("Expected error %s didn't appear as expected" % self.error)
138
139    def getr_negative_corrupted_keys_test(self):
140        key = self.input.param("key", '')
141        gen = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
142                                      start=0, end=self.num_items)
143        self.perform_docs_ops(self.master, [gen], 'create')
144        self.log.info("Checking replica read")
145        client = VBucketAwareMemcached(RestConnection(self.master), self.default_bucket_name)
146        try:
147            o, c, d = client.getr(key)
148        except Exception, ex:
149            if self.error and str(ex).find(self.error) != -1:
150                self.log.info("Expected error %s appeared as expected" % self.error)
151            else:
152                raise ex
153        else:
154            if self.error:
155                self.fail("Expected error %s didn't appear as expected" % self.error)
156
157    def test_getr_bucket_ops(self):
158        bucket_to_delete_same_read = self.input.param("bucket_to_delete_same_read", True)
159        gen_1 = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
160                                      start=0, end=self.num_items)
161        self.log.info("LOAD PHASE")
162        self.perform_docs_ops(self.master, [gen_1], self.data_ops)
163
164        self.log.info("Start bucket ops")
165        bucket_read = self.buckets[0]
166        bucket_delete = (self.buckets[1], self.buckets[0])[bucket_to_delete_same_read]
167        try:
168            self.log.info("READ REPLICA PHASE")
169            self.log.info("Checking replica read")
170            task_verify = self.cluster.async_verify_data(self.master, bucket_read,
171                                                         bucket_read.kvs[1],
172                                                         only_store_hash=False,
173                                                         replica_to_read=self.replica_to_read)
174            task_delete_bucket = self.cluster.async_bucket_delete(self.master, bucket_delete.name)
175            task_verify.result()
176            task_delete_bucket.result()
177        except Exception, ex:
178            task_delete_bucket.result()
179            if self.error and str(ex).find(self.error) != -1:
180                self.log.info("Expected error %s appeared as expected" % self.error)
181            else:
182                raise ex
183        else:
184            if self.error:
185                self.fail("Expected error %s didn't appear as expected" % self.error)
186
187    def getr_rebalance_test(self):
188        gen = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
189                                      start=0, end=self.num_items)
190        self.perform_docs_ops(self.master, [gen], 'create')
191        self.log.info("Checking replica read")
192        client = VBucketAwareMemcached(RestConnection(self.master), self.default_bucket_name)
193        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
194                            self.servers[self.nodes_init : self.nodes_init + self.nodes_in],
195                            [])
196        try:
197            while gen.has_next():
198                key, _ = gen.next()
199                o, c, d = client.getr(key)
200        finally:
201            rebalance.result()
202
203    def getr_negative_corrupted_vbucket_test(self):
204        vbucket_state = self.input.param("vbucket_state", '')
205        gen = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
206                                start=0, end=self.num_items)
207        self.perform_docs_ops(self.master, [gen], 'create')
208        self.log.info("Checking replica read")
209        client = VBucketAwareMemcached(RestConnection(self.master), self.default_bucket_name)
210        vbuckets_num = RestConnection(self.master).get_vbuckets(self.buckets[0])
211        while gen.has_next():
212            try:
213                key, _ = gen.next()
214                vBucketId = client._get_vBucket_id(key)
215                mem = client.memcached_for_replica_vbucket(vBucketId)
216                if vbucket_state:
217                    mem.set_vbucket_state(vBucketId, vbucket_state)
218                    msg = "Vbucket %s set to pending state" % vBucketId
219                    mem_to_read = mem
220                else:
221                    wrong_vbucket = [v for v in client.vBucketMapReplica
222                                   if mem.host != client.vBucketMapReplica[v][0].split(':')[0] or\
223                                   str(mem.port) != client.vBucketMapReplica[v][0].split(':')[1]][0]
224                    mem_to_read = client.memcached_for_replica_vbucket(wrong_vbucket)
225                    msg = "Key: %s. Correct host is %s, test try to get from %s host. " %(
226                                                        key, mem.host, mem_to_read.host)
227                    msg += "Correct vbucket %s, wrong vbucket %s" % (vBucketId, wrong_vbucket)
228                self.log.info(msg)
229                client._send_op(mem_to_read.getr, key)
230            except Exception, ex:
231                if self.error and str(ex).find(self.error) != -1:
232                    self.log.info("Expected error %s appeared as expected" % self.error)
233                else:
234                    raise ex
235            else:
236                if self.error:
237                    self.fail("Expected error %s didn't appear as expected" % self.error)
238
239    def getr_dgm_test(self):
240        resident_ratio = self.input.param("resident_ratio", 50)
241        gens = []
242        delta_items = 200000
243        self.num_items = 0
244        mc = MemcachedClientHelper.direct_client(self.master, self.default_bucket_name)
245
246        self.log.info("LOAD PHASE")
247        end_time = time.time() + self.wait_timeout * 30
248        while (int(mc.stats()["vb_active_perc_mem_resident"]) == 0 or\
249               int(mc.stats()["vb_active_perc_mem_resident"]) > resident_ratio) and\
250              time.time() < end_time:
251            self.log.info("Resident ratio is %s" % mc.stats()["vb_active_perc_mem_resident"])
252            gen = DocumentGenerator('test_docs', '{{"age": {0}}}', xrange(5),
253                                    start=self.num_items, end=(self.num_items + delta_items))
254            gens.append(copy.deepcopy(gen))
255            self._load_all_buckets(self.master, gen, 'create', self.expiration, kv_store=1,
256                                   flag=self.flags, only_store_hash=False, batch_size=1)
257            self.num_items += delta_items
258            self.log.info("Resident ratio is %s" % mc.stats()["vb_active_perc_mem_resident"])
259        self.assertTrue(int(mc.stats()["vb_active_perc_mem_resident"]) < resident_ratio,
260                        "Resident ratio is not reached")
261        self.verify_cluster_stats(self.servers[:self.nodes_init], only_store_hash=False,
262                                  batch_size=1)
263        self.log.info("Currently loaded items: %s" % self.num_items)
264
265        self.log.info("READ REPLICA PHASE")
266        self.verify_cluster_stats(self.servers[:self.nodes_init], only_store_hash=False,
267                                  replica_to_read=self.replica_to_read, batch_size=1)
268
269    def perform_docs_ops(self, server, gens, op_type, kv_store=1, only_store_hash=False,
270                         batch_size=1):
271        for gen in gens:
272            gen_ops = copy.deepcopy(gen)
273            self._load_all_buckets(server, gen_ops, 'create', self.expiration, kv_store=kv_store,
274                                  flag=self.flags, only_store_hash=only_store_hash, batch_size=batch_size)
275        gen_ops = copy.deepcopy(gens[0])
276        if self.data_ops == 'update':
277            self._load_all_buckets(server, gen_ops, 'update', self.expiration, kv_store=kv_store,
278                              flag=self.flags, only_store_hash=only_store_hash, batch_size=batch_size)
279        if self.data_ops in ['delete', 'recreate']:
280            self._load_all_buckets(server, gen_ops, 'delete', self.expiration, kv_store=kv_store,
281                              flag=self.flags, only_store_hash=only_store_hash, batch_size=batch_size)
282        if self.data_ops == 'recreate':
283            self._load_all_buckets(server, gen_ops, 'create', self.expiration, kv_store=kv_store,
284                              flag=self.flags, only_store_hash=only_store_hash, batch_size=batch_size)
285        self.verify_cluster_stats(self.servers[:self.nodes_init], only_store_hash=only_store_hash,
286                                  batch_size=batch_size, timeout=(self.wait_timeout * 10))
287
288    def perform_warm_up(self):
289        warmup_nodes = self.servers[-self.warmup_nodes:]
290        for warmup_node in warmup_nodes:
291            shell = RemoteMachineShellConnection(warmup_node)
292            shell.stop_couchbase()
293            shell.disconnect()
294        self.sleep(20)
295        for warmup_node in warmup_nodes:
296            shell = RemoteMachineShellConnection(warmup_node)
297            shell.start_couchbase()
298            shell.disconnect()
299        ClusterOperationHelper.wait_for_ns_servers_or_assert(warmup_nodes, self)
300
301    def perform_failover(self):
302        rest = RestConnection(self.master)
303        nodes = rest.node_statuses()
304        failover_servers = self.servers[:self.nodes_init][-self.failover_factor:]
305        failover_nodes = []
306        for server in failover_servers:
307            for node in nodes:
308                if node.ip == server.ip and str(node.port) == server.port:
309                    failover_nodes.append(node)
310        for node in failover_nodes:
311            rest.fail_over(node.id)
312            self.sleep(5)
313        if self.failover == GetrTests.FAILOVER_REBALANCE:
314            self.cluster.rebalance(self.servers[:self.nodes_init],
315                               [], failover_servers)
316        if self.failover == GetrTests.FAILOVER_ADD_BACK:
317            for node in failover_nodes:
318                rest.add_back_node(node.id)
319            self.cluster.rebalance(self.servers[:self.nodes_init],
320                                   [], [])
321