1#!/usr/bin/env python
2"""
3Binary memcached test client.
4
5Copyright (c) 2007  Dustin Sallings <dustin@spy.net>
6"""
7
8import hmac
9import socket
10import random
11import struct
12import exceptions
13
14from couchbaseConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
15from couchbaseConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
16from couchbaseConstants import SET_PKT_FMT, INCRDECR_RES_FMT
17import couchbaseConstants
18
19class MemcachedError(exceptions.Exception):
20    """Error raised when a command fails."""
21
22    def __init__(self, status, msg):
23        supermsg='Memcached error #' + `status`
24        if msg: supermsg += ":  " + msg
25        exceptions.Exception.__init__(self, supermsg)
26
27        self.status=status
28        self.msg=msg
29
30    def __repr__(self):
31        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
32
33class MemcachedClient(object):
34    """Simple memcached client."""
35
36    vbucketId = 0
37
38    def __init__(self, host='127.0.0.1', port=11211, family=socket.AF_INET):
39        self.host = host
40        self.port = port
41        self.s=socket.socket(family, socket.SOCK_STREAM)
42        if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
43            self.s.connect_ex(host)
44        else:
45            self.s.connect_ex((host, port))
46        self.r=random.Random()
47
48    def close(self):
49        self.s.close()
50
51    def __del__(self):
52        self.close()
53
54    def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
55        self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
56                      vbucketId=self.vbucketId)
57
58    def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
59                 dtype=0, vbucketId=0,
60                 fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE):
61        msg=struct.pack(fmt, magic,
62            cmd, len(key), len(extraHeader), dtype, vbucketId,
63                len(key) + len(extraHeader) + len(val), opaque, cas)
64        self.s.send(msg + extraHeader + key + val)
65
66    def _recvMsg(self):
67        response = ""
68        while len(response) < MIN_RECV_PACKET:
69            data = self.s.recv(MIN_RECV_PACKET - len(response))
70            if data == '':
71                raise exceptions.EOFError("Got empty data (remote died?).")
72            response += data
73        assert len(response) == MIN_RECV_PACKET
74        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
75            struct.unpack(RES_PKT_FMT, response)
76
77        rv = ""
78        while remaining > 0:
79            data = self.s.recv(remaining)
80            if data == '':
81                raise exceptions.EOFError("Got empty data (remote died?).")
82            rv += data
83            remaining -= len(data)
84
85        assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic
86        return cmd, errcode, opaque, cas, keylen, extralen, rv
87
88    def _handleKeyedResponse(self, myopaque):
89        cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
90        assert myopaque is None or opaque == myopaque, \
91            "expected opaque %x, got %x" % (myopaque, opaque)
92        if errcode != 0:
93            raise MemcachedError(errcode,  rv)
94        return cmd, opaque, cas, keylen, extralen, rv
95
96    def _handleSingleResponse(self, myopaque):
97        cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
98        return opaque, cas, data
99
100    def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
101        """Send a command and await its response."""
102        opaque=self.r.randint(0, 2**32)
103        self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
104        return self._handleSingleResponse(opaque)
105
106    def _mutate(self, cmd, key, exp, flags, cas, val):
107        return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp),
108            cas)
109
110    def _cat(self, cmd, key, cas, val):
111        return self._doCmd(cmd, key, val, '', cas)
112
113    def append(self, key, value, cas=0):
114        return self._cat(couchbaseConstants.CMD_APPEND, key, cas, value)
115
116    def prepend(self, key, value, cas=0):
117        return self._cat(couchbaseConstants.CMD_PREPEND, key, cas, value)
118
119    def __incrdecr(self, cmd, key, amt, init, exp):
120        something, cas, val=self._doCmd(cmd, key, '',
121            struct.pack(couchbaseConstants.INCRDECR_PKT_FMT, amt, init, exp))
122        return struct.unpack(INCRDECR_RES_FMT, val)[0], cas
123
124    def incr(self, key, amt=1, init=0, exp=0):
125        """Increment or create the named counter."""
126        return self.__incrdecr(couchbaseConstants.CMD_INCR, key, amt, init, exp)
127
128    def decr(self, key, amt=1, init=0, exp=0):
129        """Decrement or create the named counter."""
130        return self.__incrdecr(couchbaseConstants.CMD_DECR, key, amt, init, exp)
131
132    def _doMetaCmd(self, cmd, key, value, cas, exp, flags, seqno, remote_cas):
133        extra = struct.pack('>IIQQ', flags, exp, seqno, remote_cas)
134        return self._doCmd(cmd, key, value, extra, cas)
135
136    def _doRevCmd(self, cmd, key, exp, flags, value, rev, cas=0):
137        seqno, revid = rev
138        meta_data = struct.pack('>I', seqno) + revid
139        meta_type = couchbaseConstants.META_REVID
140        meta = (meta_type, meta_data)
141        return self._doMetaCmd(cmd, key, exp, flags, value, meta, cas)
142
143    def set(self, key, exp, flags, val):
144        """Set a value in the memcached server."""
145        return self._mutate(couchbaseConstants.CMD_SET, key, exp, flags, 0, val)
146
147    def setWithMeta(self, key, value, exp, flags, seqno, remote_cas):
148        """Set a value and its meta data in the memcached server."""
149        return self._doMetaCmd(couchbaseConstants.CMD_SET_WITH_META,
150                               key, value, 0, exp, flags, seqno, remote_cas)
151
152    def setWithRev(self, key, exp, flags, value, rev):
153        """Set a value and its revision in the memcached server."""
154        return self._doRevCmd(couchbaseConstants.CMD_SET_WITH_META,
155                              key, exp, flags, value, rev)
156
157    def add(self, key, exp, flags, val):
158        """Add a value in the memcached server iff it doesn't already exist."""
159        return self._mutate(couchbaseConstants.CMD_ADD, key, exp, flags, 0, val)
160
161    def addWithMeta(self, key, value, exp, flags, seqno, remote_cas):
162        return self._doMetaCmd(couchbaseConstants.CMD_ADD_WITH_META,
163                               key, value, 0, exp, flags, seqno, remote_cas)
164
165    def addWithRev(self, key, exp, flags, value, rev):
166        return self._doRevCmd(couchbaseConstants.CMD_ADD_WITH_META,
167                              key, exp, flags, value, rev)
168
169    def replace(self, key, exp, flags, val):
170        """Replace a value in the memcached server iff it already exists."""
171        return self._mutate(couchbaseConstants.CMD_REPLACE, key, exp, flags, 0,
172            val)
173
174    def observe(self, key, vbucket):
175        """Observe a key for persistence and replication."""
176        value = struct.pack('>HH', vbucket, len(key)) + key
177        opaque, cas, data = self._doCmd(couchbaseConstants.CMD_OBSERVE, '', value)
178        rep_time = (cas & 0xFFFFFFFF)
179        persist_time =  (cas >> 32) & 0xFFFFFFFF
180        persisted = struct.unpack('>B', data[4+len(key)])[0]
181        return opaque, rep_time, persist_time, persisted
182
183    def __parseGet(self, data, klen=0):
184        flags=struct.unpack(couchbaseConstants.GET_RES_FMT, data[-1][:4])[0]
185        return flags, data[1], data[-1][4 + klen:]
186
187    def get(self, key):
188        """Get the value for a given key within the memcached server."""
189        parts=self._doCmd(couchbaseConstants.CMD_GET, key, '')
190        return self.__parseGet(parts)
191
192    def getMeta(self, key):
193        """Get the metadata for a given key within the memcached server."""
194        opaque, cas, data = self._doCmd(couchbaseConstants.CMD_GET_META, key, '')
195        deleted = struct.unpack('>I', data[0:4])[0]
196        flags = struct.unpack('>I', data[4:8])[0]
197        exp = struct.unpack('>I', data[8:12])[0]
198        seqno = struct.unpack('>Q', data[12:20])[0]
199        return (deleted, flags, exp, seqno, cas)
200
201    def getl(self, key, exp=15):
202        """Get the value for a given key within the memcached server."""
203        parts=self._doCmd(couchbaseConstants.CMD_GET_LOCKED, key, '',
204            struct.pack(couchbaseConstants.GETL_PKT_FMT, exp))
205        return self.__parseGet(parts)
206
207    def cas(self, key, exp, flags, oldVal, val):
208        """CAS in a new value for the given key and comparison value."""
209        self._mutate(couchbaseConstants.CMD_SET, key, exp, flags,
210            oldVal, val)
211
212    def touch(self, key, exp):
213        """Touch a key in the memcached server."""
214        return self._doCmd(couchbaseConstants.CMD_TOUCH, key, '',
215            struct.pack(couchbaseConstants.TOUCH_PKT_FMT, exp))
216
217    def gat(self, key, exp):
218        """Get the value for a given key and touch it within the memcached server."""
219        parts=self._doCmd(couchbaseConstants.CMD_GAT, key, '',
220            struct.pack(couchbaseConstants.GAT_PKT_FMT, exp))
221        return self.__parseGet(parts)
222
223    def getr(self, key):
224        """Get the value for a given key in a replica vbucket within the memcached server."""
225        parts=self._doCmd(couchbaseConstants.CMD_GET_REPLICA, key, '')
226        return self.__parseGet(parts, len(key))
227
228    def version(self):
229        """Get the value for a given key within the memcached server."""
230        return self._doCmd(couchbaseConstants.CMD_VERSION, '', '')
231
232    def verbose(self, level):
233        """Set the verbosity level."""
234        return self._doCmd(couchbaseConstants.CMD_VERBOSE, '', '',
235                           extraHeader=struct.pack(">I", level))
236
237    def sasl_mechanisms(self):
238        """Get the supported SASL methods."""
239        return set(self._doCmd(couchbaseConstants.CMD_SASL_LIST_MECHS,
240                               '', '')[2].split(' '))
241
242    def sasl_auth_start(self, mech, data):
243        """Start a sasl auth session."""
244        return self._doCmd(couchbaseConstants.CMD_SASL_AUTH, mech, data)
245
246    def sasl_auth_plain(self, user, password, foruser=''):
247        """Perform plain auth."""
248        return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))
249
250    def sasl_auth_cram_md5(self, user, password):
251        """Start a plan auth session."""
252        challenge = ""
253        try:
254            self.sasl_auth_start('CRAM-MD5', '')
255        except MemcachedError, e:
256            if e.status != couchbaseConstants.ERR_AUTH_CONTINUE:
257                raise
258            challenge = e.msg
259
260        dig = hmac.HMAC(password, challenge).hexdigest()
261        return self._doCmd(couchbaseConstants.CMD_SASL_STEP, 'CRAM-MD5',
262                           user + ' ' + dig)
263
264    def stop_persistence(self):
265        return self._doCmd(couchbaseConstants.CMD_STOP_PERSISTENCE, '', '')
266
267    def start_persistence(self):
268        return self._doCmd(couchbaseConstants.CMD_START_PERSISTENCE, '', '')
269
270    def set_param(self, key, val, type):
271        print "setting param:", key, val
272        type = struct.pack(couchbaseConstants.SET_PARAM_FMT, type)
273        return self._doCmd(couchbaseConstants.CMD_SET_PARAM, key, val, type)
274
275    def set_vbucket_state(self, vbucket, stateName):
276        assert isinstance(vbucket, int)
277        self.vbucketId = vbucket
278        state = struct.pack(couchbaseConstants.VB_SET_PKT_FMT,
279                            couchbaseConstants.VB_STATE_NAMES[stateName])
280        return self._doCmd(couchbaseConstants.CMD_SET_VBUCKET_STATE, '', '', state)
281
282    def get_vbucket_state(self, vbucket):
283        assert isinstance(vbucket, int)
284        self.vbucketId = vbucket
285        return self._doCmd(couchbaseConstants.CMD_GET_VBUCKET_STATE, '', '')
286
287    def delete_vbucket(self, vbucket):
288        assert isinstance(vbucket, int)
289        self.vbucketId = vbucket
290        return self._doCmd(couchbaseConstants.CMD_DELETE_VBUCKET, '', '')
291
292    def evict_key(self, key):
293        return self._doCmd(couchbaseConstants.CMD_EVICT_KEY, key, '')
294
295    def getMulti(self, keys):
296        """Get values for any available keys in the given iterable.
297
298        Returns a dict of matched keys to their values."""
299        opaqued=dict(enumerate(keys))
300        terminal=len(opaqued)+10
301        # Send all of the keys in quiet
302        for k,v in opaqued.iteritems():
303            self._sendCmd(couchbaseConstants.CMD_GETQ, v, '', k)
304
305        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)
306
307        # Handle the response
308        rv={}
309        done=False
310        while not done:
311            opaque, cas, data=self._handleSingleResponse(None)
312            if opaque != terminal:
313                rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data))
314            else:
315                done=True
316
317        return rv
318
319    def setMulti(self, exp, flags, items):
320        """Multi-set (using setq).
321
322        Give me (key, value) pairs."""
323
324        # If this is a dict, convert it to a pair generator
325        if hasattr(items, 'iteritems'):
326            items = items.iteritems()
327
328        opaqued=dict(enumerate(items))
329        terminal=len(opaqued)+10
330        extra=struct.pack(SET_PKT_FMT, flags, exp)
331
332        # Send all of the keys in quiet
333        for opaque,kv in opaqued.iteritems():
334            self._sendCmd(couchbaseConstants.CMD_SETQ, kv[0], kv[1], opaque, extra)
335
336        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)
337
338        # Handle the response
339        failed = []
340        done=False
341        while not done:
342            try:
343                opaque, cas, data = self._handleSingleResponse(None)
344                done = opaque == terminal
345            except MemcachedError, e:
346                failed.append(e)
347
348        return failed
349
350    def delMulti(self, items):
351        """Multi-delete (using delq).
352
353        Give me a collection of keys."""
354
355        opaqued = dict(enumerate(items))
356        terminal = len(opaqued)+10
357        extra = ''
358
359        # Send all of the keys in quiet
360        for opaque, k in opaqued.iteritems():
361            self._sendCmd(couchbaseConstants.CMD_DELETEQ, k, '', opaque, extra)
362
363        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)
364
365        # Handle the response
366        failed = []
367        done=False
368        while not done:
369            try:
370                opaque, cas, data = self._handleSingleResponse(None)
371                done = opaque == terminal
372            except MemcachedError, e:
373                failed.append(e)
374
375        return failed
376
377    def stats(self, sub=''):
378        """Get stats."""
379        opaque=self.r.randint(0, 2**32)
380        self._sendCmd(couchbaseConstants.CMD_STAT, sub, '', opaque)
381        done = False
382        rv = {}
383        while not done:
384            cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
385            if klen:
386                rv[data[0:klen]] = data[klen:]
387            else:
388                done = True
389        return rv
390
391    def noop(self):
392        """Send a noop command."""
393        return self._doCmd(couchbaseConstants.CMD_NOOP, '', '')
394
395    def hello(self):
396        """Send a hello command for feature checking"""
397        #MB-11902
398        #return self._doCmd(couchbaseConstants.CMD_HELLO, '', struct.pack(">H", 1))
399        return 0, 0, 0
400
401    def delete(self, key, cas=0):
402        """Delete the value for a given key within the memcached server."""
403        return self._doCmd(couchbaseConstants.CMD_DELETE, key, '', '', cas)
404
405    def flush(self, timebomb=0):
406        """Flush all storage in a memcached instance."""
407        return self._doCmd(couchbaseConstants.CMD_FLUSH, '', '',
408            struct.pack(couchbaseConstants.FLUSH_PKT_FMT, timebomb))
409
410    def bucket_select(self, name):
411        return self._doCmd(couchbaseConstants.CMD_SELECT_BUCKET, name, '')
412
413    def restore_file(self, filename):
414        """Initiate restore of a given file."""
415        return self._doCmd(couchbaseConstants.CMD_RESTORE_FILE, filename, '', '', 0)
416
417    def restore_complete(self):
418        """Notify the server that we're done restoring."""
419        return self._doCmd(couchbaseConstants.CMD_RESTORE_COMPLETE, '', '', '', 0)
420
421    def deregister_tap_client(self, tap_name):
422        """Deregister the TAP client with a given name."""
423        return self._doCmd(couchbaseConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)
424
425    def reset_replication_chain(self):
426        """Reset the replication chain."""
427        return self._doCmd(couchbaseConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)
428