1#!/usr/bin/env python
2"""
3Binary memcached test client.
4
5Copyright (c) 2007  Dustin Sallings <dustin@spy.net>
6"""
7
8import sys
9import time
10import hmac
11import socket
12import random
13import struct
14import exceptions
15import select
16
17from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
18from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
19from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT
20from memcacheConstants import TOUCH_PKT_FMT, GAT_PKT_FMT, GETL_PKT_FMT
21from memcacheConstants import COMPACT_DB_PKT_FMT
22import memcacheConstants
23
24class MemcachedError(exceptions.Exception):
25    """Error raised when a command fails."""
26
27    def __init__(self, status, msg):
28        supermsg='Memcached error #' + `status`
29        if msg: supermsg += ":  " + msg
30        exceptions.Exception.__init__(self, supermsg)
31
32        self.status=status
33        self.msg=msg
34
35    def __repr__(self):
36        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
37
38class MemcachedClient(object):
39    """Simple memcached client."""
40
41    vbucketId = 0
42
43    def __init__(self, host='127.0.0.1', port=11211, family=socket.AF_INET):
44        self.host = host
45        self.port = port
46        self.s=socket.socket(family, socket.SOCK_STREAM)
47        if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
48            self.s.connect_ex(host)
49        else:
50            self.s.connect_ex((host, port))
51        self.s.setblocking(0)
52        self.r=random.Random()
53
54    def close(self):
55        self.s.close()
56
57    def __del__(self):
58        self.close()
59
60    def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
61        self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
62                      vbucketId=self.vbucketId)
63
64    def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
65                 dtype=0, vbucketId=0,
66                 fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE):
67        msg=struct.pack(fmt, magic,
68            cmd, len(key), len(extraHeader), dtype, vbucketId,
69                len(key) + len(extraHeader) + len(val), opaque, cas)
70        self.s.send(msg + extraHeader + key + val)
71
72    def _socketRecv(self, amount):
73        ready = select.select([self.s], [], [], 30)
74        if ready[0]:
75            return self.s.recv(amount)
76        raise exceptions.EOFError("Operation timed out")
77
78    def _recvMsg(self):
79        response = ""
80        while len(response) < MIN_RECV_PACKET:
81            data = self._socketRecv(MIN_RECV_PACKET - len(response))
82            if data == '':
83                raise exceptions.EOFError("Got empty data (remote died?).")
84            response += data
85        assert len(response) == MIN_RECV_PACKET
86        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
87            struct.unpack(RES_PKT_FMT, response)
88
89        rv = ""
90        while remaining > 0:
91            data = self._socketRecv(remaining)
92            if data == '':
93                raise exceptions.EOFError("Got empty data (remote died?).")
94            rv += data
95            remaining -= len(data)
96
97        assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic
98        return cmd, errcode, opaque, cas, keylen, extralen, rv
99
100    def _handleKeyedResponse(self, myopaque):
101        cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
102        assert myopaque is None or opaque == myopaque, \
103            "expected opaque %x, got %x" % (myopaque, opaque)
104        if errcode != 0:
105            raise MemcachedError(errcode,  rv)
106        return cmd, opaque, cas, keylen, extralen, rv
107
108    def _handleSingleResponse(self, myopaque):
109        cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
110        return opaque, cas, data
111
112    def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
113        """Send a command and await its response."""
114        opaque=self.r.randint(0, 2**32)
115        self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
116        return self._handleSingleResponse(opaque)
117
118    def _mutate(self, cmd, key, exp, flags, cas, val):
119        return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp),
120            cas)
121
122    def _cat(self, cmd, key, cas, val):
123        return self._doCmd(cmd, key, val, '', cas)
124
125    def append(self, key, value, cas=0):
126        return self._cat(memcacheConstants.CMD_APPEND, key, cas, value)
127
128    def prepend(self, key, value, cas=0):
129        return self._cat(memcacheConstants.CMD_PREPEND, key, cas, value)
130
131    def __incrdecr(self, cmd, key, amt, init, exp):
132        something, cas, val=self._doCmd(cmd, key, '',
133            struct.pack(memcacheConstants.INCRDECR_PKT_FMT, amt, init, exp))
134        return struct.unpack(INCRDECR_RES_FMT, val)[0], cas
135
136    def incr(self, key, amt=1, init=0, exp=0):
137        """Increment or create the named counter."""
138        return self.__incrdecr(memcacheConstants.CMD_INCR, key, amt, init, exp)
139
140    def decr(self, key, amt=1, init=0, exp=0):
141        """Decrement or create the named counter."""
142        return self.__incrdecr(memcacheConstants.CMD_DECR, key, amt, init, exp)
143
144    def _doMetaCmd(self, cmd, key, value, cas, exp, flags, seqno, remote_cas):
145        extra = struct.pack('>IIQQ', flags, exp, seqno, remote_cas)
146        return self._doCmd(cmd, key, value, extra, cas)
147
148    def set(self, key, exp, flags, val):
149        """Set a value in the memcached server."""
150        return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val)
151
152    def setWithMeta(self, key, value, exp, flags, seqno, remote_cas):
153        """Set a value and its meta data in the memcached server."""
154        return self._doMetaCmd(memcacheConstants.CMD_SET_WITH_META,
155                               key, value, 0, exp, flags, seqno, remote_cas)
156
157    def add(self, key, exp, flags, val):
158        """Add a value in the memcached server iff it doesn't already exist."""
159        return self._mutate(memcacheConstants.CMD_ADD, key, exp, flags, 0, val)
160
161    def addWithMeta(self, key, value, exp, flags, seqno, remote_cas):
162        return self._doMetaCmd(memcacheConstants.CMD_ADD_WITH_META,
163                               key, value, 0, exp, flags, seqno, remote_cas)
164
165    def replace(self, key, exp, flags, val):
166        """Replace a value in the memcached server iff it already exists."""
167        return self._mutate(memcacheConstants.CMD_REPLACE, key, exp, flags, 0,
168            val)
169
170    def observe(self, key, vbucket):
171        """Observe a key for persistence and replication."""
172        value = struct.pack('>HH', vbucket, len(key)) + key
173        opaque, cas, data = self._doCmd(memcacheConstants.CMD_OBSERVE, '', value)
174        rep_time = (cas & 0xFFFFFFFF)
175        persist_time =  (cas >> 32) & 0xFFFFFFFF
176        persisted = struct.unpack('>B', data[4+len(key)])[0]
177        return opaque, rep_time, persist_time, persisted
178
179    def __parseGet(self, data, klen=0):
180        flags=struct.unpack(memcacheConstants.GET_RES_FMT, data[-1][:4])[0]
181        return flags, data[1], data[-1][4 + klen:]
182
183    def get(self, key):
184        """Get the value for a given key within the memcached server."""
185        parts=self._doCmd(memcacheConstants.CMD_GET, key, '')
186        return self.__parseGet(parts)
187
188    def getMeta(self, key):
189        """Get the metadata for a given key within the memcached server."""
190        opaque, cas, data = self._doCmd(memcacheConstants.CMD_GET_META, key, '')
191        deleted = struct.unpack('>I', data[0:4])[0]
192        flags = struct.unpack('>I', data[4:8])[0]
193        exp = struct.unpack('>I', data[8:12])[0]
194        seqno = struct.unpack('>Q', data[12:20])[0]
195        return (deleted, flags, exp, seqno, cas)
196
197    def getl(self, key, exp=15):
198        """Get the value for a given key within the memcached server."""
199        parts=self._doCmd(memcacheConstants.CMD_GET_LOCKED, key, '',
200            struct.pack(memcacheConstants.GETL_PKT_FMT, exp))
201        return self.__parseGet(parts)
202
203    def cas(self, key, exp, flags, oldVal, val):
204        """CAS in a new value for the given key and comparison value."""
205        self._mutate(memcacheConstants.CMD_SET, key, exp, flags,
206            oldVal, val)
207
208    def touch(self, key, exp):
209        """Touch a key in the memcached server."""
210        return self._doCmd(memcacheConstants.CMD_TOUCH, key, '',
211            struct.pack(memcacheConstants.TOUCH_PKT_FMT, exp))
212
213    def gat(self, key, exp):
214        """Get the value for a given key and touch it within the memcached server."""
215        parts=self._doCmd(memcacheConstants.CMD_GAT, key, '',
216            struct.pack(memcacheConstants.GAT_PKT_FMT, exp))
217        return self.__parseGet(parts)
218
219    def getr(self, key):
220        """Get the value for a given key in a replica vbucket within the memcached server."""
221        parts=self._doCmd(memcacheConstants.CMD_GET_REPLICA, key, '')
222        return self.__parseGet(parts, len(key))
223
224    def version(self):
225        """Get the value for a given key within the memcached server."""
226        return self._doCmd(memcacheConstants.CMD_VERSION, '', '')
227
228    def verbose(self, level):
229        """Set the verbosity level."""
230        return self._doCmd(memcacheConstants.CMD_VERBOSE, '', '',
231                           extraHeader=struct.pack(">I", level))
232
233    def sasl_mechanisms(self):
234        """Get the supported SASL methods."""
235        return set(self._doCmd(memcacheConstants.CMD_SASL_LIST_MECHS,
236                               '', '')[2].split(' '))
237
238    def sasl_auth_start(self, mech, data):
239        """Start a sasl auth session."""
240        return self._doCmd(memcacheConstants.CMD_SASL_AUTH, mech, data)
241
242    def sasl_auth_plain(self, user, password, foruser=''):
243        """Perform plain auth."""
244        return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))
245
246    def sasl_auth_cram_md5(self, user, password):
247        """Start a plan auth session."""
248        try:
249            self.sasl_auth_start('CRAM-MD5', '')
250        except MemcachedError, e:
251            if e.status != memcacheConstants.ERR_AUTH_CONTINUE:
252                raise
253            challenge = e.msg
254
255        dig = hmac.HMAC(password, challenge).hexdigest()
256        return self._doCmd(memcacheConstants.CMD_SASL_STEP, 'CRAM-MD5',
257                           user + ' ' + dig)
258
259    def stop_persistence(self):
260        return self._doCmd(memcacheConstants.CMD_STOP_PERSISTENCE, '', '')
261
262    def start_persistence(self):
263        return self._doCmd(memcacheConstants.CMD_START_PERSISTENCE, '', '')
264
265    def set_param(self, key, val, type):
266        print "setting param:", key, val
267        type = struct.pack(memcacheConstants.SET_PARAM_FMT, type)
268        return self._doCmd(memcacheConstants.CMD_SET_PARAM, key, val, type)
269
270    def set_vbucket_state(self, vbucket, stateName):
271        assert isinstance(vbucket, int)
272        self.vbucketId = vbucket
273        state = struct.pack(memcacheConstants.VB_SET_PKT_FMT,
274                            memcacheConstants.VB_STATE_NAMES[stateName])
275        return self._doCmd(memcacheConstants.CMD_SET_VBUCKET_STATE, '', '',
276                           state)
277
278    def compact_db(self, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes):
279        assert isinstance(vbucket, int)
280        assert isinstance(purgeBeforeTs, long)
281        assert isinstance(purgeBeforeSeq, long)
282        assert isinstance(dropDeletes, int)
283        self.vbucketId = vbucket
284        compact = struct.pack(memcacheConstants.COMPACT_DB_PKT_FMT,
285                            purgeBeforeTs, purgeBeforeSeq, dropDeletes)
286        return self._doCmd(memcacheConstants.CMD_COMPACT_DB, '', '',
287                           compact)
288
289    def get_vbucket_state(self, vbucket):
290        assert isinstance(vbucket, int)
291        self.vbucketId = vbucket
292        return self._doCmd(memcacheConstants.CMD_GET_VBUCKET_STATE, '', '')
293
294    def delete_vbucket(self, vbucket):
295        assert isinstance(vbucket, int)
296        self.vbucketId = vbucket
297        return self._doCmd(memcacheConstants.CMD_DELETE_VBUCKET, '', '')
298
299    def evict_key(self, key):
300        return self._doCmd(memcacheConstants.CMD_EVICT_KEY, key, '')
301
302    def getMulti(self, keys):
303        """Get values for any available keys in the given iterable.
304
305        Returns a dict of matched keys to their values."""
306        opaqued=dict(enumerate(keys))
307        terminal=len(opaqued)+10
308        # Send all of the keys in quiet
309        for k,v in opaqued.iteritems():
310            self._sendCmd(memcacheConstants.CMD_GETQ, v, '', k)
311
312        self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
313
314        # Handle the response
315        rv={}
316        done=False
317        while not done:
318            opaque, cas, data=self._handleSingleResponse(None)
319            if opaque != terminal:
320                rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data))
321            else:
322                done=True
323
324        return rv
325
326    def setMulti(self, exp, flags, items):
327        """Multi-set (using setq).
328
329        Give me (key, value) pairs."""
330
331        # If this is a dict, convert it to a pair generator
332        if hasattr(items, 'iteritems'):
333            items = items.iteritems()
334
335        opaqued=dict(enumerate(items))
336        terminal=len(opaqued)+10
337        extra=struct.pack(SET_PKT_FMT, flags, exp)
338
339        # Send all of the keys in quiet
340        for opaque,kv in opaqued.iteritems():
341            self._sendCmd(memcacheConstants.CMD_SETQ, kv[0], kv[1], opaque, extra)
342
343        self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
344
345        # Handle the response
346        failed = []
347        done=False
348        while not done:
349            try:
350                opaque, cas, data = self._handleSingleResponse(None)
351                done = opaque == terminal
352            except MemcachedError, e:
353                failed.append(e)
354
355        return failed
356
357    def delMulti(self, items):
358        """Multi-delete (using delq).
359
360        Give me a collection of keys."""
361
362        opaqued = dict(enumerate(items))
363        terminal = len(opaqued)+10
364        extra = ''
365
366        # Send all of the keys in quiet
367        for opaque, k in opaqued.iteritems():
368            self._sendCmd(memcacheConstants.CMD_DELETEQ, k, '', opaque, extra)
369
370        self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
371
372        # Handle the response
373        failed = []
374        done=False
375        while not done:
376            try:
377                opaque, cas, data = self._handleSingleResponse(None)
378                done = opaque == terminal
379            except MemcachedError, e:
380                failed.append(e)
381
382        return failed
383
384    def stats(self, sub=''):
385        """Get stats."""
386        opaque=self.r.randint(0, 2**32)
387        self._sendCmd(memcacheConstants.CMD_STAT, sub, '', opaque)
388        done = False
389        rv = {}
390        while not done:
391            cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
392            if klen:
393                rv[data[0:klen]] = data[klen:]
394            else:
395                done = True
396        return rv
397
398    def noop(self):
399        """Send a noop command."""
400        return self._doCmd(memcacheConstants.CMD_NOOP, '', '')
401
402    def delete(self, key, cas=0):
403        """Delete the value for a given key within the memcached server."""
404        return self._doCmd(memcacheConstants.CMD_DELETE, key, '', '', cas)
405
406    def flush(self, timebomb=0):
407        """Flush all storage in a memcached instance."""
408        return self._doCmd(memcacheConstants.CMD_FLUSH, '', '',
409            struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb))
410
411    def bucket_select(self, name):
412        return self._doCmd(memcacheConstants.CMD_SELECT_BUCKET, name, '')
413
414    def deregister_tap_client(self, tap_name):
415        """Deregister the TAP client with a given name."""
416        return self._doCmd(memcacheConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)
417
418    def reset_replication_chain(self):
419        """Reset the replication chain."""
420        return self._doCmd(memcacheConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)
421