import time
from membase.api.rest_client import RestConnection
from memcached.helper.data_helper import VBucketAwareMemcached, MemcachedClientHelper
from basetestcase import BaseTestCase
from mc_bin_client import MemcachedError
from couchbase_helper.documentgenerator import BlobGenerator
from datetime import datetime, timedelta
from couchbase_helper.document import View
from membase.helper.rebalance_helper import RebalanceHelper
from membase.helper.cluster_helper import ClusterOperationHelper

class ObserveTests(BaseTestCase):
    def setUp(self):
        super(ObserveTests, self).setUp()
        self.node_servers = []
        self.timeout = 120
        self.nodes_in = int(self.input.param("nodes_in", 1))
        self.observe_with = self.input.param("observe_with", "")
        self.default_map_func = 'function (doc) { emit(doc.age, doc.first_name);}'
        self.default_design_doc = "Doc1"
        map_func = 'function (doc) { emit(null, doc);}'
        self.default_view = View("default_view", map_func, None)
        self.access_log = self.input.param("access_log", False)
        self.servs_in = [self.servers[i + 1] for i in range(self.nodes_in)]
        self.mutate_by = self.input.param("mutate_by", "set")
        self.nodes_init = self.input.param("nodes_init", 2)
        self.without_access_log = self.input.param("without_access_log", False)
        try:
            self.log.info("Observe Rebalance Started")
            self.cluster.rebalance(self.servers[:1], self.servs_in, [])
        except Exception as e:
            self.tearDown()
            self.fail(e)

    def tearDown(self):
        super(ObserveTests, self).tearDown()

    def _load_doc_data_all_buckets(self, op_type='create', start=0, end=0, expiry=0):
        loaded = False
        count = 0
        gen_load = BlobGenerator('observe', 'observe', 1024, start=start, end=end)
        while not loaded and count < 60:
            try:
                self._load_all_buckets(self.servers[0], gen_load, op_type, expiry)
                loaded = True
            except MemcachedError as error:
                if error.status == 134:
                    self.log.error("Memcached error 134 (temporary failure), retrying in 5 seconds")
                    count += 1
                    time.sleep(5)
                else:
                    raise

    def _async_load_doc_data_all_buckets(self, op_type='create', start=0, end=0):
        gen_load = BlobGenerator('observe', 'observe', 1024, start=start, end=end)
        tasks = self._async_load_all_buckets(self.servers[0], gen_load, op_type, 0)
        return tasks

    def block_for_replication(self, key, cas=0, num=1, timeout=0, persist=False):
        """
        Observe a key until it has been replicated to @param num of servers.

        @param persist: block until the item has been persisted to disk
        """
        vbucketid = self.client._get_vBucket_id(key)
        repl_servers = self._get_server_str(vbucketid, repl=True)
        self.log.info("VbucketId:%s on replica servers:%s" % (vbucketid, repl_servers))

        while len(repl_servers) >= num > 0:
            for server in repl_servers:
                node = self._get_node(server)
                self.log.info("Replica server: %s" % server)
                newclient = MemcachedClientHelper.direct_client(node, self.default_bucket_name)
                t_start = datetime.now()
                # poll observe until the item reports persisted on this replica
                persisted = 0
                while persisted == 0:
                    opaque, rep_time, persist_time, persisted, cas = newclient.observe(key)
                t_end = datetime.now()
                self.log.info("###### key: %s on server: %s ######" % (key, server))
                self.log.info("Persisted: %s" % persisted)
                self.log.info("Time taken to persist: %s" % (t_end - t_start))
                # one more replica confirmed; stop once num replicas have been seen
                num -= 1
                if num == 0:
                    break
        return True

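    # Note: the observe polling above runs without a deadline, although
    # block_for_replication accepts a `timeout` argument. A minimal sketch of
    # a deadline-bounded poll is below; `_observe_until_persisted` is a
    # hypothetical helper (not part of the original suite) that assumes only
    # the (opaque, rep_time, persist_time, persisted, cas) tuple returned by
    # observe(), as used above.
    def _observe_until_persisted(self, mc_client, key, timeout_secs=60):
        """Poll observe() until `key` reports persisted, failing on timeout."""
        deadline = datetime.now() + timedelta(seconds=timeout_secs)
        while datetime.now() < deadline:
            opaque, rep_time, persist_time, persisted, cas = mc_client.observe(key)
            if persisted != 0:
                return cas
            time.sleep(0.1)
        self.fail("key %s was not persisted within %s seconds" % (key, timeout_secs))
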
    def _get_server_str(self, vbucketid, repl=True):
        """Retrieve the "ip:port" server string(s) hosting the given vbucket:
        the replica copies when repl=True, otherwise the active copy."""
        memcacheds, vBucketMap, vBucketMapReplica = self.client.request_map(self.client.rest, self.client.bucket)
        if repl:
            server = vBucketMapReplica[vbucketid]
        else:
            server = vBucketMap[vbucketid]
        return server

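    # For illustration: the server strings handed to _get_node below look
    # like "10.1.2.3:11210" (address made up), one "ip:port" entry per
    # replica when they come from the replica map above.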
    def _get_node(self, server_ip_port):
        server_ip = server_ip_port.split(":")[0]
        for server in self.servers:
            if server.ip == server_ip:
                return server
        return None

    def _create_multi_set_batch(self):
        key_val = {}
        keys = ["observe%s" % (i) for i in xrange(self.num_items)]
        for key in keys:
            key_val[key] = "multiset"
        return key_val

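    # For illustration, with num_items=3 the batch above is
    # {"observe0": "multiset", "observe1": "multiset", "observe2": "multiset"};
    # _run_observe below passes it to setMulti when mutate_by == "multi_set".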
    def _run_observe(self):
        tasks = []
        query_set = "true"
        count = 0
        for bucket in self.buckets:
            self.cluster.create_view(self.master, self.default_design_doc,
                                     self.default_view, bucket, self.wait_timeout * 2)
            client = VBucketAwareMemcached(RestConnection(self.master), bucket)
            self.max_time = timedelta(microseconds=0)
            if self.mutate_by == "multi_set":
                key_val = self._create_multi_set_batch()
                client.setMulti(0, 0, key_val)
            keys = ["observe%s" % (i) for i in xrange(self.num_items)]
            for key in keys:
                mutated = False
                while not mutated and count < 60:
                    try:
                        if self.mutate_by == "set":
                            client.set(key, 0, 0, "setvalue")
                        elif self.mutate_by == "append":
                            client.memcached(key).append(key, "append")
                        elif self.mutate_by == "prepend":
                            client.memcached(key).prepend(key, "prepend")
                        elif self.mutate_by == "incr":
                            client.memcached(key).incr(key, 1)
                        elif self.mutate_by == "decr":
                            client.memcached(key).decr(key)
                        mutated = True
                        t_start = datetime.now()
                    except MemcachedError as error:
                        if error.status == 134:
                            self.log.error("Memcached error 134 (temporary failure), retrying in 5 seconds")
                            count += 1
                            time.sleep(5)
                        else:
                            raise
                if not mutated:
                    self.fail("unable to mutate key %s after %s retries" % (key, count))
                # poll observe until this key reports persisted
                persisted = 0
                while persisted == 0:
                    opaque, rep_time, persist_time, persisted, cas = client.observe(key)
                t_end = datetime.now()
                self.log.info("########## key: %s ##########" % key)
                self.log.info("Persisted: %s" % persisted)
                self.log.info("Replication time: %s, persist time: %s" % (rep_time, persist_time))
                self.log.info("Time to observe persistence: %s" % (t_end - t_start))
                if self.max_time <= (t_end - t_start):
                    self.max_time = (t_end - t_start)
                    self.log.info("Max time taken for observe so far: %s" % self.max_time)
                    self.log.info("Cas value: %s" % cas)
            query = {"stale": "false", "full_set": "true", "connection_timeout": 60000}
            self.cluster.query_view(self.master, "dev_Doc1", self.default_view.name, query, self.num_items, bucket, timeout=self.wait_timeout)
            self.log.info("Observe validation: view %s in design doc dev_Doc1, bucket %s" % (self.default_view, bucket))
            # decide whether observe runs after a delete, or in parallel with one
            if len(self.observe_with) > 0:
                if self.observe_with == "delete":
                    self.log.info("Deleting items 0-%s" % (self.num_items / 2))
                    self._load_doc_data_all_buckets('delete', 0, self.num_items / 2)
                    query_set = "true"
                elif self.observe_with == "delete_parallel":
                    self.log.info("Deleting items 0-%s in parallel" % (self.num_items / 2))
                    tasks = self._async_load_doc_data_all_buckets('delete', 0, self.num_items / 2)
                    query_set = "false"
                for key in keys:
                    opaque, rep_time, persist_time, persisted, cas = client.memcached(key).observe(key)
                    self.log.info("########## key: %s ##########" % key)
                    self.log.info("Persisted: %s" % persisted)
                if self.observe_with == "delete_parallel":
                    for task in tasks:
                        task.result()

                query = {"stale": "false", "full_set": query_set, "connection_timeout": 60000}
                self.cluster.query_view(self.master, "dev_Doc1", self.default_view.name, query, self.num_items / 2, bucket, timeout=self.wait_timeout)
                self.log.info("Observe validation: view %s in design doc dev_Doc1, bucket %s" % (self.default_view, bucket))

    def test_observe_basic_data_load_delete(self):
        """Test the basic observe scenarios:
        i) load data, then run observe
        ii) delete data, then run observe
        iii) delete data in parallel with observe
        and in each case verify persistence by querying a view.
        """
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        # persist all loaded items before observing
        for bucket in self.buckets:
            RebalanceHelper.wait_for_persistence(self.master, bucket)
        rebalance = self.input.param("rebalance", "no")
        if rebalance == "in":
            self.servs_in = [self.servers[len(self.servers) - 1]]
            rebalance_task = self.cluster.async_rebalance(self.servers[:1], self.servs_in, [])
            self._run_observe()
            rebalance_task.result()
        elif rebalance == "out":
            self.servs_out = [self.servers[self.nodes_init - 1]]
            rebalance_task = self.cluster.async_rebalance(self.servers[:1], [], self.servs_out)
            self._run_observe()
            rebalance_task.result()
        else:
            self._run_observe()

    def test_observe_with_replication(self):
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        tasks = []
        if self.observe_with == "delete":
            self.log.info("Deleting items 0-%s" % (self.num_items / 2))
            self._load_doc_data_all_buckets('delete', 0, self.num_items / 2)
        elif self.observe_with == "delete_parallel":
            self.log.info("Deleting items 0-%s in parallel" % (self.num_items / 2))
            tasks = self._async_load_doc_data_all_buckets('delete', 0, self.num_items / 2)
        keys = ["observe%s" % (i) for i in xrange(self.num_items)]
        self.key_count = 0
        self.client = VBucketAwareMemcached(RestConnection(self.master), self.default_bucket_name)
        for key in keys:
            self.key_count += 1
            self.block_for_replication(key, 0, 1)
        if self.observe_with == "delete_parallel":
            for task in tasks:
                task.result()

    def test_observe_with_warmup(self):
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        self.log.info("Nodes in cluster: %s" % self.servers[:self.nodes_init])
        for bucket in self.buckets:
            # persist all loaded items, then restart memcached and wait for warmup
            RebalanceHelper.wait_for_persistence(self.master, bucket)
            self._stats_befor_warmup(bucket.name)
            self._restart_memcache(bucket.name)
            ClusterOperationHelper._wait_warmup_completed(self, self.servers[:self.nodes_init], bucket.name)
            self._run_observe()
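
# A hypothetical testrunner invocation for this suite (the module path and
# .ini file below are placeholders; adjust both to your checkout):
#   ./testrunner -i resources/your_cluster.ini \
#       -t observetest.ObserveTests.test_observe_basic_data_load_delete,nodes_in=1,observe_with=delete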