1import eventlet
2import json
3import random
4import string
5import json
6import sys
7import time
8from couchbase.couchbaseclient import CouchbaseClient
9from multiprocessing import Process
10import testcfg as cfg
# Per-port listener bookkeeping: port -> {"process", "connected",
# "alt_nodes"}; entries are (re)created by start_listener().
processMap = {}
# Green-thread pool used by _run() to spawn one request handler per
# accepted connection.
pool = eventlet.GreenPool(1000000)
13
14
15
class SysCouchClient(CouchbaseClient):
    """CouchbaseClient that remembers whether its connection succeeded.

    Construction never raises for a failed connection attempt: the error
    is printed and ``ready`` is set to False instead.  The instance also
    keeps a running ``pending_get_msgs`` counter for its callers.
    """

    def __init__(self, url, bucket, cred, accport):
        """Record the connection parameters and try to connect.

        url     -- URL handed to CouchbaseClient
        bucket  -- bucket name handed to CouchbaseClient
        cred    -- credential handed to CouchbaseClient
        accport -- listener port, used only to tag log lines
        """
        self.accport = accport
        self.url = url
        self.bucket = bucket
        self.cred = cred
        self.pending_get_msgs = 0
        self.ready = True

        target = (accport, url, bucket)
        try:
            super(SysCouchClient, self).__init__(url, bucket, cred)
            print("sdk_%s: connected to %s => %s" % target)
        except Exception as ex:
            print("sdk_%s: unable to establish connection to %s => %s, %s "
                  % (target + (ex,)))
            self.ready = False

    def incr_pending_get_msgs(self, val):
        """Add ``val`` to the pending-get counter."""
        self.pending_get_msgs += val
35
class CouchClientManager(object):
    """Handle JSON-encoded requests arriving on one listener port.

    A request is a JSON object carrying at least ``command``, ``bucket``,
    ``cb_ip`` and ``cb_port``; the remaining keys depend on the command
    (see the individual ``do_*`` methods).  One SysCouchClient is cached
    per bucket name in ``client_map``.
    """

    # Status codes singled out when draining bulk responses.  In the
    # memcached binary protocol 7 is "not my vbucket" and 134 (0x86) is
    # "temporary failure".
    _ERR_NOT_MY_VBUCKET = 7
    _ERR_TEMP_FAILURE = 134

    # do_mget drains responses once more than this many getq messages
    # have been issued on a client.
    _MAX_PENDING_GETS = 400

    # command name -> name of the method that serves it.
    # NOTE(review): 'do_query' is not defined in this class; unless a
    # subclass provides it, a 'query' command raises AttributeError.
    _COMMAND_HANDLERS = {
        'set': 'do_set',
        'setq': 'do_setq',
        'mset': 'do_mset',
        'mdelete': 'do_mdelete',
        'get': 'do_get',
        'mget': 'do_mget',
        'delete': 'do_delete',
        'query': 'do_query',
        'latency': 'get_op_latency',
    }

    def __init__(self, accport):
        """Create a manager for the listener bound to ``accport``."""
        self.client_map = {}
        self.accport = accport

    def add_bucket_client(self, bucket="default",
                          password="",
                          ip=cfg.COUCHBASE_IP,
                          port=cfg.COUCHBASE_PORT):
        """Connect to ``bucket`` and cache the client if it is ready.

        A client whose connection attempt failed (``ready`` is False) is
        not stored, so the bucket stays absent from ``client_map``.
        """
        url = "http://%s:%s/pools/default" % (ip, port)
        client = SysCouchClient(url, bucket, password, self.accport)
        if client.ready:
            self.client_map[bucket] = client

    def get_bucket_client(self, bucket,
                          password="",
                          ip=cfg.COUCHBASE_IP,
                          port=cfg.COUCHBASE_PORT):
        """Return the cached client for ``bucket``, connecting on first use.

        Raises KeyError when no client could be established, because
        add_bucket_client() only registers clients that connected.
        """
        if bucket not in self.client_map:
            self.add_bucket_client(bucket, password, ip, port)

        return self.client_map[bucket]

    def requestHandler(self, client_sock):
        """Read one request from ``client_sock``, execute it, then close.

        The request body ends either when the peer closes the connection
        or at the first NUL byte.  Only a NUL-terminated request gets a
        response written back.
        """
        chunks = []
        respond = False
        try:
            while True:
                received = client_sock.recv(1024)
                if not received:
                    break
                # The terminator may share a chunk with the tail of the
                # payload, so look inside the chunk rather than comparing
                # the whole chunk against '\0'.
                payload, terminator, _ = received.partition('\0')
                chunks.append(payload)
                if terminator:
                    respond = True
                    break

            rc = self._requestHandler(''.join(chunks))

            if respond:
                self.sendClientResponse(client_sock, rc)
        finally:
            # Release the socket even when handling or sending fails.
            client_sock.close()

    def sendClientResponse(self, client_sock, rc):
        """Send ``rc`` to the peer as JSON; send nothing for None.

        If ``rc`` cannot be encoded the failure is printed and an empty
        message is sent, keeping the reply best-effort.
        """
        msg = ""
        if rc is not None:
            try:
                msg = json.dumps(rc)
            except Exception as ex:
                print("sdk_%s: unable to encode response: %s"
                      % (self.accport, ex))
        # sendall() keeps writing until the whole message is out; a bare
        # send() may transmit only part of it.
        client_sock.sendall(msg)

    def _requestHandler(self, c, retries=0):
        """Decode the JSON text ``c`` and execute the request it holds.

        Returns the handler's result, or None when decoding or execution
        fails.  A ValueError is reported as a decode problem; any other
        exception marks this port as disconnected in ``processMap``.
        ``retries`` is accepted but not used.
        """
        try:
            data = json.loads(c)
            # Resolve the client up front so connection problems surface
            # before the command runs.
            self.client_from_req(data)
            return self.exec_request(data)
        except ValueError as ex:
            print(ex)
            print("unable to decode json: %s" % c)
        except Exception as ex:
            processMap[self.accport]["connected"] = False
            print("Error: %s" % ex)
        return None

    def exec_request(self, data):
        """Run the handler for ``data['command']`` and return its result.

        Returns None for a command that has no handler.
        """
        command = data['command']
        try:
            handler_name = self._COMMAND_HANDLERS.get(command)
        except TypeError:
            # An unhashable command (e.g. a list) matches no handler.
            handler_name = None
        if handler_name is None:
            return None
        return getattr(self, handler_name)(data)

    def _report_bulk_errors(self, client, op):
        """Drain ``client``'s bulk responses and report failures.

        ``op`` ('set' or 'get') only labels the printed line.  Code 7
        triggers a vbucket-map reconfiguration and is still logged;
        code 134 is ignored; every other non-zero code is logged.
        """
        for msg in client.recv_bulk_responses():
            if not (isinstance(msg, dict) and 'error' in msg):
                continue
            code = int(msg["error"])
            if code == 0:
                continue
            if code == self._ERR_NOT_MY_VBUCKET:
                client.reconfig_vbucket_map(forward=True)
            if code == self._ERR_TEMP_FAILURE:
                continue  # temp failure (oom) ignore
            ts = time.localtime()
            ts_string = "%s/%s/%s %s:%s:%s" % (
                ts.tm_year, ts.tm_mon, ts.tm_mday,
                ts.tm_hour, ts.tm_min, ts.tm_sec)
            # Format the already-converted int so a string error code
            # cannot break the %d conversion.
            print("%s:%s MemcachedError%d: %s"
                  % (ts_string, op, code, msg.get("rv")))

    def do_mset(self, data):
        """Issue a setq for every key in ``data['args']``.

        All keys get the same value ``data['template']['kv']``, with the
        optional ``ttl`` and ``flags`` from the template (default 0).
        Responses are drained afterwards.  Always returns True.
        """
        keys = data['args']
        template = data['template']
        kv = template['kv']
        ttl = int(template.get('ttl', 0))
        flags = template.get('flags', 0)

        client = self.client_from_req(data)

        # Errors propagate with their original type instead of being
        # re-wrapped in a bare Exception.
        for key in keys:
            self.do_setq({"args": [key, ttl, flags, kv]}, client)

        self._report_bulk_errors(client, "set")
        return True

    def _get_set_args(self, data):
        """Return (key, exp, flags, value) from ``data['args']``.

        The value (fourth argument) is JSON-encoded.
        """
        args = data['args']
        key = str(args[0])
        exp = int(args[1])
        flags = int(args[2])
        value = json.dumps(args[3])

        return key, exp, flags, value

    def do_setq(self, args, client=None):
        """Issue ``client.setq`` for the set arguments in ``args``.

        ``client`` is optional so the 'setq' command can be dispatched
        with the request alone; the client is then resolved from the
        request like the other commands do.  Always returns True.
        """
        if client is None:
            client = self.client_from_req(args)
        key, exp, flags, value = self._get_set_args(args)
        client.setq(key, exp, flags, value)
        return True

    def get_op_latency(self, data):
        """Time one 'set', 'get' or 'delete' call; return seconds elapsed.

        ``data['op']`` selects the operation and ``data['args']`` holds
        its positional arguments, e.g. ('key', 0, 0, 'val') for a set.

        Raises LookupError for any other op.  ValueError is avoided on
        purpose: _requestHandler reports that as a JSON decode failure.
        """
        client = self.client_from_req(data)

        # Work on a copy so the caller's request is left untouched.
        op_args = list(data['args'])

        op = data['op']
        if op == 'set':
            op_args[3] = json.dumps(op_args[3])
            func = client.set
        elif op == 'get':
            func = client.get
        elif op == 'delete':
            func = client.delete
        else:
            raise LookupError("unsupported latency op: %s" % (op,))

        start = time.time()
        func(*op_args)
        return time.time() - start

    def do_set(self, data):
        """Run ``client.set`` with the request's args; return its result."""
        key, exp, flags, value = self._get_set_args(data)
        client = self.client_from_req(data)
        return client.set(key, exp, flags, value)

    def do_get(self, data):
        """Run ``client.get`` for the first arg and return the key.

        The fetched value is discarded.
        """
        key = str(data['args'][0])
        client = self.client_from_req(data)
        client.get(key)
        return key

    def do_mget(self, data):
        """Issue a getq for every key in ``data['args']``.

        Responses are drained only once more than _MAX_PENDING_GETS
        getq messages have accumulated on the client; otherwise a noop
        is sent.  Always returns True.
        """
        keys = data['args']
        client = self.client_from_req(data)
        for key in keys:
            client.getq(str(key))

        # increment getq count
        client.incr_pending_get_msgs(len(keys))

        if client.pending_get_msgs > self._MAX_PENDING_GETS:
            self._report_bulk_errors(client, "get")
            client.pending_get_msgs = 0
        else:
            client.noop()
        return True

    def do_delete(self, data):
        """Run ``client.delete`` for the first arg; return its result."""
        key = str(data['args'][0])
        client = self.client_from_req(data)
        return client.delete(key)

    def do_mdelete(self, data):
        """Issue a deleteq for every key in ``data['args']``.

        Always returns True.
        """
        client = self.client_from_req(data)
        for key in data['args']:
            client.deleteq(str(key))

        return True

    def client_from_req(self, data, password=""):
        """Return the bucket client named by the request ``data``.

        Reads ``bucket``, ``cb_ip`` and ``cb_port`` from the request; a
        ``password`` key in the request overrides the argument.
        """
        bucket = str(data["bucket"])
        if "password" in data:
            password = str(data["password"])
        ip = data['cb_ip']
        port = data['cb_port']
        return self.get_bucket_client(bucket, password, ip, port)
280
281
def monitorSubprocesses():
    """Poll the listener processes forever, restarting any that died.

    Every registered port is checked once per pass, with a one second
    pause between passes.  This function never returns.
    """
    while True:
        for listen_port in processMap:
            worker = processMap[listen_port]['process']
            if worker.is_alive():
                continue
            restart_listener(listen_port)
        time.sleep(1)
289
def restart_listener(port):
    """Stop the listener registered for ``port``, then start a new one."""
    for step in (stop_listener, start_listener):
        step(port)
293
def stop_listener(port):
    """Terminate the listener process registered for ``port``.

    A failure while terminating is printed, not raised.  The
    ``processMap`` entry itself is left in place.
    """
    worker = processMap[port]["process"]
    try:
        print("sdk_%s: exiting" % (port))
        worker.terminate()
    except Exception as err:
        print("sdk_%s: error occured termination %s" % (port, err))
301
def start_listener(port):
    """Register and launch a listener subprocess for ``port``.

    The ``processMap`` entry is written before the process starts, so
    it replaces any previous entry for the same port.
    """
    worker = Process(target=_run, args=(port,))
    entry = {
        "process": worker,
        "connected": True,
        "alt_nodes": [],
    }
    processMap[port] = entry

    print("sdk_%s: starting" % port)
    worker.start()
310
def _run(port):
    """Accept connections on 127.0.0.1:``port`` and hand each to the pool.

    The loop checks the port's "connected" flag in ``processMap`` before
    each accept and returns once the flag is false.
    """
    listener = eventlet.listen(('127.0.0.1', port))
    manager = CouchClientManager(port)
    while True:
        if not processMap[port]["connected"]:
            break
        conn, _peer = listener.accept()
        pool.spawn_n(manager.requestHandler, conn)
317
if __name__ == '__main__':
    # Launch one listener subprocess per port in 50008..50011, then
    # supervise them until the script is killed.
    for port in range(50008, 50012):
        start_listener(port)
    monitorSubprocesses()
322