1import eventlet 2import json 3import random 4import string 5import json 6import sys 7import time 8from couchbase.couchbaseclient import CouchbaseClient 9from multiprocessing import Process 10import testcfg as cfg 11pool = eventlet.GreenPool(1000000) 12processMap = {} 13 14 15 16class SysCouchClient(CouchbaseClient): 17 def __init__(self, url, bucket, cred, accport): 18 self.pending_get_msgs = 0 19 self.bucket = bucket 20 self.url = url 21 self.cred = cred 22 self.ready = True 23 self.accport = accport 24 try: 25 super(SysCouchClient, self).__init__(url, bucket, cred) 26 print "sdk_%s: connected to %s => %s" % (accport, url, bucket) 27 except Exception as ex: 28 print "sdk_%s: unable to establish connection to %s => %s, %s " %\ 29 (accport, url, bucket, ex) 30 self.ready = False 31 32 def incr_pending_get_msgs(self, val): 33 self.pending_get_msgs = \ 34 self.pending_get_msgs + val 35 36class CouchClientManager(): 37 def __init__(self, accport): 38 self.client_map = {} 39 self.accport = accport 40 41 def add_bucket_client(self, bucket = "default", 42 password = "", 43 ip = cfg.COUCHBASE_IP, 44 port = cfg.COUCHBASE_PORT): 45 46 url = "http://%s:%s/pools/default" % (ip, port) 47 client = SysCouchClient(url, bucket, password, self.accport) 48 if client.ready == True: 49 self.client_map[bucket] = client 50 51 def get_bucket_client(self, bucket, 52 password = "", 53 ip = cfg.COUCHBASE_IP, 54 port = cfg.COUCHBASE_PORT): 55 56 if bucket not in self.client_map: 57 self.add_bucket_client(bucket, password, ip, port) 58 59 return self.client_map[bucket] 60 61 def requestHandler(self, client_sock): 62 c = '' 63 respond = False 64 while True: 65 _recv = client_sock.recv(1024) 66 if not _recv: 67 break 68 if _recv == '\0': 69 respond = True 70 break 71 else: 72 c = c + _recv 73 rc = self._requestHandler(c) 74 75 if respond: 76 self.sendClientResponse(client_sock, rc) 77 78 client_sock.close() 79 80 def sendClientResponse(self, client_sock, rc): 81 msg = "" 82 if rc is not None: 83 try: 84 msg = json.dumps(rc) 85 except Exception: 86 pass 87 client_sock.send(msg) 88 89 90 def _requestHandler(self, c, retries = 0): 91 try: 92 data = json.loads(c) 93 self.client_from_req(data) 94 res = self.exec_request(data) 95 return res 96 except ValueError as ex: 97 print ex 98 print "unable to decode json: %s" % c 99 except Exception as ex: 100 processMap[self.accport]["connected"] = False 101 print "Error: %s" % ex 102 103 def exec_request(self, data): 104 if data['command'] == 'set': 105 return self.do_set(data) 106 107 if data['command'] == 'setq': 108 return self.do_setq(data) 109 110 if data['command'] == 'mset': 111 return self.do_mset(data) 112 113 if data['command'] == 'mdelete': 114 return self.do_mdelete(data) 115 116 if data['command'] == 'get': 117 return self.do_get(data) 118 119 if data['command'] == 'mget': 120 return self.do_mget(data) 121 122 if data['command'] == 'delete': 123 return self.do_delete(data) 124 125 if data['command'] == 'query': 126 return self.do_query(data) 127 128 if data['command'] == 'latency': 129 return self.get_op_latency(data) 130 131 def do_mset(self, data): 132 133 keys = data['args'] 134 template = data['template'] 135 kv = template['kv'] 136 ttl = 0 137 flags = 0 138 139 if "ttl" in template: 140 ttl = int(template['ttl']) 141 if "flags" in template: 142 flags = template['flags'] 143 144 client = self.client_from_req(data) 145 146 for key in keys: 147 doc = {"args" : [key, ttl, flags, kv]} 148 try: 149 self.do_setq(doc, client) 150 except Exception as ex: 151 raise Exception(ex) 152 153 rc = client.recv_bulk_responses() 154 if len(rc) > 0: 155 for msg in rc: 156 if isinstance(msg, dict) and 'error' in msg and int(msg["error"]) != 0: 157 if int(msg["error"]) == 7: 158 client.reconfig_vbucket_map(forward=True) 159 if int(msg["error"]) == 134: 160 pass # temp failure (oom) ignore 161 else: 162 ts = time.localtime() 163 ts_string = "%s/%s/%s %s:%s:%s" %\ 164 (ts.tm_year, ts.tm_mon, ts.tm_mday, ts.tm_hour, ts.tm_min, ts.tm_sec) 165 print "%s:set MemcachedError%d: %s" % (ts_string, msg["error"], msg["rv"]) 166 167 return True 168 169 170 def _get_set_args(self, data): 171 key = str(data['args'][0]) 172 exp = int(data['args'][1]) 173 flags = int(data['args'][2]) 174 value = json.dumps(data['args'][3]) 175 176 return key, exp, flags, value 177 178 def do_setq(self, args, client): 179 key, exp, flags, value = self._get_set_args(args) 180 client.setq(key,exp, flags, value) 181 return True 182 183 def get_op_latency(self, data): 184 185 # retrieve instance of sdk client 186 client = self.client_from_req(data) 187 188 # op_args pass in as tuple, i.e 189 # set => ('key', 0, 0, 'val') 190 op_args = data['args'] 191 192 # select op 193 op = data['op'] 194 if op == 'set': 195 op_args[3] = json.dumps(op_args[3]) 196 func = client.set 197 if op == 'get': 198 func = client.get 199 if op == 'delete': 200 func = client.delete 201 202 # timed wrapper 203 start = time.time() 204 rc = func(*op_args) # exec 205 end = time.time() 206 207 latency = end - start 208 return latency 209 210 211 def do_set(self, data): 212 key, exp, flags, value = self._get_set_args(data) 213 client = self.client_from_req(data) 214 return client.set(key,exp, flags, value) 215 216 def do_get(self, data): 217 key = str(data['args'][0]) 218 client = self.client_from_req(data) 219 client.get(key) 220 return key 221 222 def do_mget(self, data): 223 keys = data['args'] 224 client = self.client_from_req(data) 225 for key in keys: 226 key = str(key) 227 client.getq(key) 228 229 # increment getq count 230 client.incr_pending_get_msgs(len(keys)) 231 232 if client.pending_get_msgs > 400: 233 rc = client.recv_bulk_responses() 234 if len(rc) > 0: 235 for msg in rc: 236 if isinstance(msg, dict) and 'error' in msg and int(msg["error"]) != 0: 237 if int(msg["error"]) == 7: 238 client.reconfig_vbucket_map(forward=True) 239 if int(msg["error"]) == 134: 240 pass # temp failure (oom) ignore 241 else: 242 ts = time.localtime() 243 ts_string = "%s/%s/%s %s:%s:%s" %\ 244 (ts.tm_year, ts.tm_mon, ts.tm_mday, ts.tm_hour, ts.tm_min, ts.tm_sec) 245 print "%s:get MemcachedError%d: %s" % (ts_string, msg["error"], msg["rv"]) 246 247 248 client.pending_get_msgs = 0 249 else: 250 client.noop() 251 return True 252 253 254 def do_delete(self, data): 255 256 key = str(data['args'][0]) 257 client = self.client_from_req(data) 258 res = client.delete(key) 259 return res 260 261 def do_mdelete(self, data): 262 keys = data['args'] 263 results = [] 264 client = self.client_from_req(data) 265 for key in keys: 266 key = str(key) 267 client.deleteq(key) 268 269 return True 270 271 272 def client_from_req(self, data, password = ""): 273 bucket = str(data["bucket"]) 274 if "password" in data: 275 password = str(data["password"]) 276 ip = data['cb_ip'] 277 port = data['cb_port'] 278 client = self.get_bucket_client(bucket, password, ip, port) 279 return client 280 281 282def monitorSubprocesses(): 283 # when any subprocess ends, attempt to restart 284 while True: 285 for port in processMap: 286 if not processMap[port]['process'].is_alive(): 287 restart_listener(port) 288 time.sleep(1) 289 290def restart_listener(port): 291 stop_listener(port) 292 start_listener(port) 293 294def stop_listener(port): 295 process = processMap[port]["process"] 296 try: 297 print "sdk_%s: exiting" % (port) 298 process.terminate() 299 except Exception as ex: 300 print "sdk_%s: error occured termination %s" % (port, ex) 301 302def start_listener(port): 303 p = Process(target=_run, args=(port,)) 304 processMap[port] = {"process" : p, 305 "connected" : True, 306 "alt_nodes" : []} 307 308 print "sdk_%s: starting" % port 309 p.start() 310 311def _run(port): 312 server = eventlet.listen(('127.0.0.1', port)) 313 client_mgr = CouchClientManager(port) 314 while processMap[port]["connected"]: 315 new_sock, address = server.accept() 316 pool.spawn_n(client_mgr.requestHandler, new_sock) 317 318if __name__ == '__main__': 319 for port in xrange(50008, 50012): 320 start_listener(port) 321 monitorSubprocesses() 322