import time
from membase.api.rest_client import RestConnection
from memcached.helper.data_helper import VBucketAwareMemcached, MemcachedClientHelper
from basetestcase import BaseTestCase
from mc_bin_client import MemcachedError
from couchbase_helper.documentgenerator import BlobGenerator
from datetime import datetime, timedelta
from couchbase_helper.document import View
from membase.helper.rebalance_helper import RebalanceHelper
from membase.helper.cluster_helper import ClusterOperationHelper


class ObserveTests(BaseTestCase):
    """Tests that mutate keys and poll ``observe`` until they are persisted.

    Persistence is additionally cross-checked by querying a view with
    ``stale=false`` and comparing the row count with the expected item count.
    """

    # Memcached status this suite retries on (0x86); every other status is
    # treated as a real error and propagated to the caller.
    _RETRY_STATUS = 134
    # Upper bound on retries and the pause between two attempts, in seconds.
    _MAX_RETRIES = 60
    _RETRY_WAIT_SECS = 5

    def setUp(self):
        """Read the test parameters and rebalance ``nodes_in`` nodes in."""
        super(ObserveTests, self).setUp()
        self.node_servers = []
        self.timeout = 120
        self.nodes_in = int(self.input.param("nodes_in", 1))
        self.observe_with = self.input.param("observe_with", "")
        self.default_map_func = 'function (doc) { emit(doc.age, doc.first_name);}'
        self.default_design_doc = "Doc1"
        map_func = 'function (doc) { emit(null, doc);}'
        self.default_view = View("default_view", map_func, None)
        self.access_log = self.input.param("access_log", False)
        self.servs_in = [self.servers[i + 1] for i in range(self.nodes_in)]
        self.mutate_by = self.input.param("mutate_by", "set")
        self.nodes_init = self.input.param("nodes_init", 2)
        self.without_access_log = self.input.param("without_access_log", False)
        try:
            self.log.info("Observe Rebalance Started")
            self.cluster.rebalance(self.servers[:1], self.servs_in, [])
        except Exception as e:
            # Clean the cluster up before reporting, otherwise the next test
            # inherits a half-rebalanced cluster.
            self.tearDown()
            self.fail(e)

    def tearDown(self):
        """Delegate clean-up to the base test case."""
        super(ObserveTests, self).tearDown()

    def _retry_on_temporary_failure(self, operation):
        """Call ``operation()`` until it succeeds or the retries run out.

        Only ``MemcachedError`` with status 134 is retried; any other status
        is re-raised immediately instead of looping forever.

        @param operation : zero-argument callable performing the memcached work
        @return True when the operation succeeded, False when every retry
                ended in status 134
        """
        for _ in range(self._MAX_RETRIES):
            try:
                operation()
                return True
            except MemcachedError as error:
                if error.status != self._RETRY_STATUS:
                    raise
                self.log.error("Memcached error 134, wait for 5 seconds and then try again")
                time.sleep(self._RETRY_WAIT_SECS)
        return False

    def _load_doc_data_all_buckets(self, op_type='create', start=0, end=0, expiry=0):
        """Synchronously run ``op_type`` on keys ``observe<start..end>`` in all buckets.

        The load is retried while memcached answers with status 134.
        """
        gen_load = BlobGenerator('observe', 'observe', 1024, start=start, end=end)
        loaded = self._retry_on_temporary_failure(
            lambda: self._load_all_buckets(self.servers[0], gen_load, op_type, expiry))
        if not loaded:
            # Previously this was silent; keep going (best effort) but make
            # the reason for any later failure visible in the log.
            self.log.error("Giving up '%s' load of items %s-%s after %s retries"
                           % (op_type, start, end, self._MAX_RETRIES))

    def _async_load_doc_data_all_buckets(self, op_type='create', start=0, end=0):
        """Start ``op_type`` on keys ``observe<start..end>`` and return the tasks."""
        gen_load = BlobGenerator('observe', 'observe', 1024, start=start, end=end)
        tasks = self._async_load_all_buckets(self.servers[0], gen_load, op_type, 0)
        return tasks

    def block_for_replication(self, key, cas=0, num=1, timeout=0, persist=False):
        """
        observe a key until it has been replicated to @param num of servers

        @param key     : key to observe
        @param cas     : accepted for interface compatibility, not consulted
        @param num     : number of replica servers that must report the key
        @param timeout : seconds to wait per replica; 0 (default) waits forever
        @param persist : block until item has been persisted to disk
                         (not consulted: the loop always waits for a non-zero
                         ``persisted`` value from observe)
        @return True when ``num`` replicas reported the key (or when ``num``
                is not in 1..number of replicas, in which case nothing is
                polled), False when ``timeout`` expired first
        """
        vbucketid = self.client._get_vBucket_id(key)
        repl_servers = self._get_server_str(vbucketid, repl=True)
        self.log.info("VbucketId:%s on replicated servers:%s" % (vbucketid, repl_servers))

        if not 0 < num <= len(repl_servers):
            return True

        for server in repl_servers[:num]:
            node = self._get_node(server)
            if node is None:
                self.fail("No configured server matches replica %s" % (server))
            self.log.info("Replicated Server:- %s" % (server))
            newclient = MemcachedClientHelper.direct_client(node, self.default_bucket_name)
            t_start = datetime.now()
            # Must start from 0 for every replica, otherwise only the first
            # replica would ever be polled.
            persisted = 0
            while persisted == 0:
                opaque, rep_time, persist_time, persisted, observed_cas = newclient.observe(key)
                if persisted == 0 and timeout > 0 and \
                        datetime.now() - t_start > timedelta(seconds=timeout):
                    self.log.error("Timed out after %s seconds observing key:-%s on Server:- %s"
                                   % (timeout, key, server))
                    return False
            t_end = datetime.now()
            self.log.info("######key:-%s and Server:- %s#########" % (key, server))
            self.log.info("Persisted:- %s" % (persisted))
            self.log.info("Time taken to persist:- %s" % (t_end - t_start))
        return True

    def _get_server_str(self, vbucketid, repl=True):
        """retrieve server string {ip:port} based on vbucketid"""
        memcacheds, vBucketMap, vBucketMapReplica = self.client.request_map(
            self.client.rest, self.client.bucket)
        if repl:
            return vBucketMapReplica[vbucketid]
        return vBucketMap[vbucketid]

    def _get_node(self, server_ip_port):
        """Return the entry of ``self.servers`` whose ip matches ``ip:port``, else None."""
        server_ip = server_ip_port.split(":")[0]
        for server in self.servers:
            if server.ip == server_ip:
                return server
        return None

    def _create_multi_set_batch(self):
        """Build the ``{key: "multiset"}`` batch for keys ``observe0..observe<num_items-1>``."""
        return dict(("observe%s" % (i), "multiset") for i in range(self.num_items))

    def _mutate_key(self, client, key):
        """Apply the operation selected by ``mutate_by`` to ``key`` once.

        Any other ``mutate_by`` value (e.g. "multi_set", which is applied as
        one batch by the caller) is a no-op here.
        """
        if self.mutate_by == "set":
            client.set(key, 0, 0, "setvalue")
        elif self.mutate_by == "append":
            client.memcached(key).append(key, "append")
        elif self.mutate_by == "prepend":
            client.memcached(key).prepend(key, "prepend")
        elif self.mutate_by == "incr":
            client.memcached(key).incr(key, 1)
        elif self.mutate_by == "decr":
            client.memcached(key).decr(key)

    @staticmethod
    def _run_observe(self):
        """Mutate every key, observe it until persisted, then verify via a view.

        Declared as a static method that takes the test instance explicitly;
        callers invoke it as ``self._run_observe(self)``.

        For each bucket:
          1. create the default view;
          2. mutate each key according to ``mutate_by`` and poll ``observe``
             until the key reports a non-zero ``persisted`` value, tracking
             the longest wait in ``self.max_time``;
          3. query the view and expect ``num_items`` rows;
          4. when ``observe_with`` is set, delete the first half of the keys
             (inline or in parallel), observe every key once more and expect
             ``num_items // 2`` rows.
        """
        query_set = "true"
        for bucket in self.buckets:
            self.cluster.create_view(self.master, self.default_design_doc,
                                     self.default_view, bucket, self.wait_timeout * 2)
            client = VBucketAwareMemcached(RestConnection(self.master), bucket)
            self.max_time = timedelta(microseconds=0)
            if self.mutate_by == "multi_set":
                key_val = self._create_multi_set_batch()
                client.setMulti(0, 0, key_val)
            keys = ["observe%s" % (i) for i in range(self.num_items)]
            for key in keys:
                mutated = self._retry_on_temporary_failure(
                    lambda: self._mutate_key(client, key))
                if not mutated:
                    self.fail("Could not '%s' key %s after %s retries"
                              % (self.mutate_by, key, self._MAX_RETRIES))
                t_start = datetime.now()
                # Reset for every key; a value left over from the previous
                # key would skip the polling loop entirely.
                persisted = 0
                while persisted == 0:
                    opaque, rep_time, persist_time, persisted, cas = client.observe(key)
                t_end = datetime.now()
                elapsed = t_end - t_start
                self.log.info("##########key:-%s################" % (key))
                self.log.info("Persisted:- %s" % (persisted))
                self.log.info("Persist_Time:- %s" % (rep_time))
                self.log.info("Time2:- %s" % (elapsed))
                if self.max_time <= elapsed:
                    self.max_time = elapsed
                    self.log.info("Max Time taken for observe is :- %s" % self.max_time)
                    self.log.info("Cas Value:- %s" % (cas))
            query = {"stale": "false", "full_set": "true", "connection_timeout": 60000}
            self.cluster.query_view(self.master, "dev_Doc1", self.default_view.name, query,
                                    self.num_items, bucket, timeout=self.wait_timeout)
            self.log.info("Observe Validation:- view: %s in design doc dev_Doc1 and in bucket %s"
                          % (self.default_view, bucket))
            # check whether observe has to run with delete and delete parallel with observe or not
            if self.observe_with:
                tasks = []
                half = self.num_items // 2
                if self.observe_with == "delete":
                    self.log.info("Deleting 0- %s number of items" % (half))
                    self._load_doc_data_all_buckets('delete', 0, half)
                    query_set = "true"
                elif self.observe_with == "delete_parallel":
                    self.log.info("Deleting Parallel 0- %s number of items" % (half))
                    tasks = self._async_load_doc_data_all_buckets('delete', 0, half)
                    query_set = "false"
                for key in keys:
                    opaque, rep_time, persist_time, persisted, cas = \
                        client.memcached(key).observe(key)
                    self.log.info("##########key:-%s################" % (key))
                    self.log.info("Persisted:- %s" % (persisted))
                if self.observe_with == "delete_parallel":
                    for task in tasks:
                        task.result()

                query = {"stale": "false", "full_set": query_set, "connection_timeout": 60000}
                self.cluster.query_view(self.master, "dev_Doc1", self.default_view.name, query,
                                        half, bucket, timeout=self.wait_timeout)
                self.log.info("Observe Validation:- view: %s in design doc dev_Doc1 and in bucket %s"
                              % (self.default_view, self.default_bucket_name))

    def test_observe_basic_data_load_delete(self):
        """test_observe_basic_data_load_delete will test observer basic scenario
        i) Loading data and then run observe
        ii) deleting data then run observe
        iii) deleting data parallel with observe in parallel
        then verify the persistence by querying a view

        The ``rebalance`` parameter ("in", "out", anything else = none) runs
        the scenario while a rebalance is in flight.
        """
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        # Persist all the loaded data item
        for bucket in self.buckets:
            RebalanceHelper.wait_for_persistence(self.master, bucket)
        rebalance_mode = self.input.param("rebalance", "no")
        if rebalance_mode == "in":
            self.servs_in = [self.servers[-1]]
            rebalance = self.cluster.async_rebalance(self.servers[:1], self.servs_in, [])
            self._run_observe(self)
            rebalance.result()
        elif rebalance_mode == "out":
            self.servs_out = [self.servers[self.nodes_init - 1]]
            rebalance = self.cluster.async_rebalance(self.servers[:1], [], self.servs_out)
            self._run_observe(self)
            rebalance.result()
        else:
            self._run_observe(self)

    def test_observe_with_replication(self):
        """Load data, optionally delete half of it, then observe every key on its replica."""
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        tasks = []
        half = self.num_items // 2
        if self.observe_with == "delete":
            self.log.info("Deleting 0- %s number of items" % (half))
            self._load_doc_data_all_buckets('delete', 0, half)
        elif self.observe_with == "delete_parallel":
            self.log.info("Deleting Parallel 0- %s number of items" % (half))
            tasks = self._async_load_doc_data_all_buckets('delete', 0, half)
        keys = ["observe%s" % (i) for i in range(self.num_items)]
        self.key_count = 0
        self.max_time = 0
        self.client = VBucketAwareMemcached(RestConnection(self.master), self.default_bucket_name)
        for key in keys:
            self.key_count = self.key_count + 1
            self.block_for_replication(key, 0, 1)
        if self.observe_with == "delete_parallel":
            for task in tasks:
                task.result()

    def test_observe_with_warmup(self):
        """Load and persist data, restart memcached, wait for warmup, then run observe."""
        self._load_doc_data_all_buckets('create', 0, self.num_items)
        # Persist all the loaded data item
        self.log.info("Nodes in cluster: %s" % self.servers[:self.nodes_init])
        for bucket in self.buckets:
            RebalanceHelper.wait_for_persistence(self.master, bucket)
            self._stats_befor_warmup(bucket.name)
            self._restart_memcache(bucket.name)
            # NOTE(review): the source formatting was lost; the warmup wait and
            # the observe run are assumed to belong to this per-bucket loop
            # (a second loop header was commented out here) - confirm.
            ClusterOperationHelper._wait_warmup_completed(
                self, self.servers[:self.nodes_init], bucket.name)
            self._run_observe(self)