1#!/usr/bin/env python
2"""
3Ascii memcached test client.
4"""
5
6import socket
7import select
8import exceptions
9
10import memcacheConstants
11
12class MemcachedError(exceptions.Exception):
13    """Error raised when a command fails."""
14
15    def __init__(self, status, msg):
16        supermsg='Memcached error #' + `status`
17        if msg: supermsg += ":  " + msg
18        exceptions.Exception.__init__(self, supermsg)
19
20        self.status=status
21        self.msg=msg
22
23    def __repr__(self):
24        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
25
26class MemcachedAsciiClient(object):
27    """Simple ascii memcached client."""
28
29    def __init__(self, host='127.0.0.1', port=11211, timeout=30):
30        self.host = host
31        self.port = port
32        self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
33        self.timeout = timeout
34        self.s.connect_ex((host, port))
35
36    def close(self):
37        self.s.close()
38
39    def __del__(self):
40        self.close()
41
42    def _sendMsg(self, cmd):
43        _, w, _ = select.select([], [self.s], [], self.timeout)
44        if w:
45            self.s.send(cmd)
46        else:
47            raise exceptions.EOFError("Timeout waiting for socket send. from {0}".format(self.host))
48
49    def _recvMsg(self):
50        r, _, _ = select.select([self.s], [], [], self.timeout)
51        if r:
52            response = ""
53            while not response.endswith("\r\n"):
54                data = self.s.recv(1)
55                if data == '':
56                    raise exceptions.EOFError("Got empty data (remote died?). from {0}".format(self.host))
57                response += data
58            return response[:-2]
59        else:
60            raise exceptions.EOFError("Timeout waiting for socket recv. from {0}".format(self.host))
61
62    def _recvData(self, length):
63        r, _, _ = select.select([self.s], [], [], self.timeout)
64        if r:
65            response = ""
66            while len(response) < length + 2:
67                data = self.s.recv((length + 2) - len(response))
68                if data == '':
69                    raise exceptions.EOFError("Got empty data (remote died?). from {0}".format(self.host))
70                response += data
71            return response[:-2]
72        else:
73            raise exceptions.EOFError("Timeout waiting for socket recv. from {0}".format(self.host))
74
75    def _doStore(self, cmd):
76        """Send a command and await its response."""
77        self._sendMsg(cmd)
78        return self._recvMsg()
79
80    def _doRetrieve(self, cmd):
81        """Send a command and await its response."""
82        self._sendMsg(cmd)
83        msg = self._recvMsg()
84        result = {}
85        error = ""
86        while msg.split(" ")[0] == "VALUE":
87            key = msg.split(" ")[1]
88            flags = int(msg.split(" ")[2]) % 2**32
89            length = int(msg.split(" ")[3])
90            cas = int(msg.split(" ")[4])
91            data = self._recvData(length)
92            result[key] = (flags,cas,data)
93            msg = self._recvMsg()
94        if msg != "END":
95            error = msg
96        return result, error
97
98    def _doStatsVersion(self, cmd):
99        """Send a command and await its response."""
100        self._sendMsg(cmd)
101        msg = self._recvMsg()
102        result = {}
103        error = ""
104        while msg.split(" ")[0] == "STAT" or \
105                msg.split(" ")[0] == "VERSION":
106            print "msg:",msg
107            kind = msg.split(" ")[0]
108            key = msg.split(" ")[1]
109            if kind == "VERSION":
110                return key, ""
111            value = msg.split(" ")[2]
112            result[key] = value
113            msg = self._recvMsg()
114        if msg != "END":
115            error = msg
116        return result, error
117
118    def _doIncrDecr(self, cmd):
119        """Send a command and await its response."""
120        self._sendMsg(cmd)
121        msg = self._recvMsg()
122        try:
123            # asci incr/decr doesn't give us the cas
124            return (int(msg), 0), ""
125        except ValueError:
126            return None, msg
127
128    def append(self, key, value, cas=0):
129        response = self._doStore("append {0} 0 0 {1} {2}\r\n{3}\r\n".format(key, len(value), cas, value))
130        if response != "STORED":
131            raise MemcachedError(-1, response)
132
133    def prepend(self, key, value, cas=0):
134        response = self._doStore("prepend {0} 0 0 {1} {2}\r\n{3}\r\n".format(key, len(value), cas, value))
135        if response != "STORED":
136            raise MemcachedError(-1, response)
137
138    def incr(self, key, amt=1, init=0, exp=0):
139        """Increment or create the named counter."""
140        response, error = self._doIncrDecr("incr {0} {1}\r\n".format(key, amt))
141        if error:
142            raise MemcachedError(-1, error)
143        return response
144
145    def decr(self, key, amt=1, init=0, exp=0):
146        """Decrement or create the named counter."""
147        response, error = self._doIncrDecr("decr {0} {1}\r\n".format(key, amt))
148        if error:
149            raise MemcachedError(-1, error)
150        return response
151
152    def set(self, key, exp, flags, val):
153        """Set a value in the memcached server."""
154        response = self._doStore("set {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
155        if response != "STORED":
156            raise MemcachedError(-1, response)
157
158    def add(self, key, exp, flags, val):
159        """Add a value in the memcached server iff it doesn't already exist."""
160        response = self._doStore("add {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
161        if response != "STORED":
162            raise MemcachedError(-1, response)
163
164    def replace(self, key, exp, flags, val):
165        """Replace a value in the memcached server iff it already exists."""
166        response = self._doStore("replace {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
167        if response != "STORED":
168            raise MemcachedError(-1, response)
169
170    def get(self, key):
171        """Get the value for a given key within the memcached server."""
172        response, error = self._doRetrieve("gets {0}\r\n".format(key))
173        if error:
174            raise MemcachedError(-1, error)
175        return response.items()[0][1]
176
177    def getl(self, key, exp=15):
178        """Get the value for a given key within the memcached server."""
179        response, error = self._doRetrieve("getl {0} {1}\r\n".format(key, exp))
180        if error:
181            raise MemcachedError(-1, error)
182        return response.items()[0][1]
183
184    def cas(self, key, exp, flags, oldVal, val):
185        """CAS in a new value for the given key and comparison value."""
186        response = self._doStore("cas {0} {1} {2} {3} {4}\r\n{5}\r\n".format(key, flags, exp, len(val), oldVal, val))
187        if response != "STORED":
188            raise MemcachedError(-1, response)
189
190    def touch(self, key, exp):
191        """Touch a key in the memcached server."""
192        response = self._doStore("touch {0} {1}\r\n".format(key, exp))
193        if response != "STORED":
194            raise MemcachedError(-1, response)
195
196    def gat(self, key, exp):
197        """Get the value for a given key and touch it within the memcached server."""
198        response, error = self._doRetrieve("gat {0} {1}\r\n".format(key, exp))
199        if error:
200            raise MemcachedError(-1, error)
201        return response.items()[0][1]
202
203    def version(self):
204        """Get the value for a given key within the memcached server."""
205        response, error = self._doStatsVersion("version\r\n")
206        if error:
207            raise MemcachedError(-1, error)
208        return response
209
210    def getMulti(self, keys):
211        """Get values for any available keys in the given iterable.
212        Returns a dict of matched keys to their values."""
213
214        cmd = "gets"
215        for key in keys:
216            cmd += " " + key
217        cmd += "\r\n"
218        response, error = self._doRetrieve(cmd)
219        if error:
220            raise MemcachedError(-1, error)
221        return response
222
223    def stats(self, sub=''):
224        """Get stats."""
225        if sub:
226            sub = " " + sub
227        response, error = self._doStatsVersion("stats{0}\r\n".format(sub))
228        if error:
229            raise MemcachedError(-1, error)
230        return response
231
232    def delete(self, key, cas=0):
233        """Delete the value for a given key within the memcached server."""
234        response = self._doStore("delete {0} {1}\r\n".format(key, cas))
235        if response != "DELETED":
236            raise MemcachedError(-1, response)
237
238    def flush(self, timebomb=0):
239        """Flush all storage in a memcached instance."""
240        return self._doCmd(memcacheConstants.CMD_FLUSH, '', '',
241            struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb))
242
243