#!/usr/bin/env python
"""
Ascii memcached test client.
"""

import socket
import select
import exceptions

import memcacheConstants


class MemcachedError(exceptions.Exception):
    """Error raised when a command fails.

    Attributes:
        status: numeric status code (this client always passes -1).
        msg: the raw response line returned by the server.
    """

    def __init__(self, status, msg):
        supermsg = 'Memcached error #' + repr(status)
        if msg:
            supermsg += ": " + msg
        exceptions.Exception.__init__(self, supermsg)

        self.status = status
        self.msg = msg

    def __repr__(self):
        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)


class MemcachedAsciiClient(object):
    """Simple ascii memcached client.

    Commands are sent as text lines over a single TCP socket.  Every wait
    on the socket is bounded by ``timeout`` seconds; a timeout or a closed
    connection is reported as ``EOFError``.  A response that is not the
    expected success line is reported as ``MemcachedError`` with status -1.
    """

    def __init__(self, host='127.0.0.1', port=11211, timeout=30):
        self.host = host
        self.port = port
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.timeout = timeout
        # connect_ex() reports failure through its return code instead of
        # raising.  The code is not checked here, so a failed connection is
        # not reported at construction time.
        self.s.connect_ex((host, port))

    def close(self):
        """Close the underlying socket."""
        self.s.close()

    def __del__(self):
        self.close()

    # ------------------------------------------------------------------
    # Low-level socket helpers
    # ------------------------------------------------------------------

    def _sendMsg(self, cmd):
        """Send a complete command, waiting up to ``timeout`` for the socket.

        Raises EOFError if the socket does not become writable in time.
        """
        _, w, _ = select.select([], [self.s], [], self.timeout)
        if not w:
            raise exceptions.EOFError("Timeout waiting for socket send. from {0}".format(self.host))
        # sendall() keeps writing until the whole command is out; a plain
        # send() may transmit only part of a large payload.
        self.s.sendall(cmd)

    def _recvChunk(self, size):
        """Wait for readable data and return up to ``size`` bytes of it.

        Raises EOFError on timeout or when the peer closed the connection.
        """
        r, _, _ = select.select([self.s], [], [], self.timeout)
        if not r:
            raise exceptions.EOFError("Timeout waiting for socket recv. from {0}".format(self.host))
        data = self.s.recv(size)
        if not data:
            raise exceptions.EOFError("Got empty data (remote died?). from {0}".format(self.host))
        return data

    def _recvMsg(self):
        """Read one response line and return it without the trailing CRLF."""
        response = ""
        # Read byte by byte: there is no receive buffer in this client, so
        # nothing past the line terminator may be consumed here.
        while not response.endswith("\r\n"):
            response += self._recvChunk(1)
        return response[:-2]

    def _recvData(self, length):
        """Read a data block of ``length`` bytes plus its trailing CRLF.

        Returns the data with the two terminator bytes stripped.
        """
        wanted = length + 2
        chunks = []
        received = 0
        while received < wanted:
            data = self._recvChunk(wanted - received)
            chunks.append(data)
            received += len(data)
        return "".join(chunks)[:-2]

    # ------------------------------------------------------------------
    # Command/response helpers
    # ------------------------------------------------------------------

    def _doStore(self, cmd):
        """Send a command and return its single-line response."""
        self._sendMsg(cmd)
        return self._recvMsg()

    def _doRetrieve(self, cmd):
        """Send a retrieval command and collect the VALUE blocks it returns.

        Returns a ``(result, error)`` pair.  ``result`` maps each key to a
        ``(flags, cas, data)`` tuple; ``error`` is "" when the response ended
        with END, otherwise the unexpected line that ended it.
        """
        self._sendMsg(cmd)
        msg = self._recvMsg()
        result = {}
        error = ""
        while True:
            fields = msg.split(" ")
            if fields[0] != "VALUE":
                break
            key = fields[1]
            flags = int(fields[2]) % 2**32
            length = int(fields[3])
            # The cas token is only present for some commands; default to 0
            # rather than failing on a four-field VALUE line.
            cas = int(fields[4]) if len(fields) > 4 else 0
            data = self._recvData(length)
            result[key] = (flags, cas, data)
            msg = self._recvMsg()
        if msg != "END":
            error = msg
        return result, error

    def _doStatsVersion(self, cmd):
        """Send a stats or version command and parse the response.

        Returns a ``(result, error)`` pair.  For a VERSION line ``result`` is
        the version token; for STAT lines it is a dict of stat name to
        value.  ``error`` is "" on success, otherwise the unexpected line.
        """
        self._sendMsg(cmd)
        msg = self._recvMsg()
        result = {}
        error = ""
        while True:
            # Split at most twice so a stat value containing spaces is kept
            # whole instead of being cut at its first space.
            fields = msg.split(" ", 2)
            kind = fields[0]
            if kind not in ("STAT", "VERSION"):
                break
            key = fields[1]
            if kind == "VERSION":
                return key, ""
            result[key] = fields[2] if len(fields) > 2 else ""
            msg = self._recvMsg()
        if msg != "END":
            error = msg
        return result, error

    def _doIncrDecr(self, cmd):
        """Send an incr/decr command and parse the numeric response.

        Returns ``((value, 0), "")`` on success, or ``(None, line)`` when the
        response line is not an integer.
        """
        self._sendMsg(cmd)
        msg = self._recvMsg()
        try:
            # ascii incr/decr doesn't give us the cas
            return (int(msg), 0), ""
        except ValueError:
            return None, msg

    def _expect(self, response, *expected):
        """Raise MemcachedError unless ``response`` is one of ``expected``."""
        if response not in expected:
            raise MemcachedError(-1, response)

    def _retrieveOne(self, cmd):
        """Run a single-key retrieval and return its ``(flags, cas, data)``.

        Raises MemcachedError when the server answers with an error line and
        IndexError when the response holds no value at all.
        """
        response, error = self._doRetrieve(cmd)
        if error:
            raise MemcachedError(-1, error)
        return list(response.values())[0]

    # ------------------------------------------------------------------
    # Public commands
    # ------------------------------------------------------------------

    def append(self, key, value, cas=0):
        """Append ``value`` to the data stored under ``key``."""
        response = self._doStore("append {0} 0 0 {1} {2}\r\n{3}\r\n".format(key, len(value), cas, value))
        self._expect(response, "STORED")

    def prepend(self, key, value, cas=0):
        """Prepend ``value`` to the data stored under ``key``."""
        response = self._doStore("prepend {0} 0 0 {1} {2}\r\n{3}\r\n".format(key, len(value), cas, value))
        self._expect(response, "STORED")

    def incr(self, key, amt=1, init=0, exp=0):
        """Increment the named counter by ``amt``.

        ``init`` and ``exp`` are accepted but are not sent to the server.
        Returns ``(value, 0)``; the second element stands in for the cas.
        """
        response, error = self._doIncrDecr("incr {0} {1}\r\n".format(key, amt))
        if error:
            raise MemcachedError(-1, error)
        return response

    def decr(self, key, amt=1, init=0, exp=0):
        """Decrement the named counter by ``amt``.

        ``init`` and ``exp`` are accepted but are not sent to the server.
        Returns ``(value, 0)``; the second element stands in for the cas.
        """
        response, error = self._doIncrDecr("decr {0} {1}\r\n".format(key, amt))
        if error:
            raise MemcachedError(-1, error)
        return response

    def set(self, key, exp, flags, val):
        """Set a value in the memcached server."""
        response = self._doStore("set {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
        self._expect(response, "STORED")

    def add(self, key, exp, flags, val):
        """Add a value in the memcached server iff it doesn't already exist."""
        response = self._doStore("add {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
        self._expect(response, "STORED")

    def replace(self, key, exp, flags, val):
        """Replace a value in the memcached server iff it already exists."""
        response = self._doStore("replace {0} {1} {2} {3}\r\n{4}\r\n".format(key, flags, exp, len(val), val))
        self._expect(response, "STORED")

    def get(self, key):
        """Get the ``(flags, cas, data)`` tuple for a key (sent as ``gets``)."""
        return self._retrieveOne("gets {0}\r\n".format(key))

    def getl(self, key, exp=15):
        """Send ``getl`` for a key and return its ``(flags, cas, data)``."""
        return self._retrieveOne("getl {0} {1}\r\n".format(key, exp))

    def cas(self, key, exp, flags, oldVal, val):
        """CAS in a new value for the given key and comparison value.

        ``oldVal`` is sent in the cas position of the command line.
        """
        response = self._doStore("cas {0} {1} {2} {3} {4}\r\n{5}\r\n".format(key, flags, exp, len(val), oldVal, val))
        self._expect(response, "STORED")

    def touch(self, key, exp):
        """Touch a key in the memcached server."""
        response = self._doStore("touch {0} {1}\r\n".format(key, exp))
        # The memcached text protocol answers a successful touch with
        # TOUCHED; STORED stays accepted because it was previously the only
        # reply treated as success.
        self._expect(response, "TOUCHED", "STORED")

    def gat(self, key, exp):
        """Get the value for a given key and touch it within the memcached server."""
        return self._retrieveOne("gat {0} {1}\r\n".format(key, exp))

    def version(self):
        """Return the version token reported by the server."""
        response, error = self._doStatsVersion("version\r\n")
        if error:
            raise MemcachedError(-1, error)
        return response

    def getMulti(self, keys):
        """Get values for any available keys in the given iterable.
        Returns a dict of matched keys to their ``(flags, cas, data)``."""
        cmd = "gets"
        for key in keys:
            cmd += " " + key
        cmd += "\r\n"
        response, error = self._doRetrieve(cmd)
        if error:
            raise MemcachedError(-1, error)
        return response

    def stats(self, sub=''):
        """Get stats, optionally for the sub-command ``sub``, as a dict."""
        if sub:
            sub = " " + sub
        response, error = self._doStatsVersion("stats{0}\r\n".format(sub))
        if error:
            raise MemcachedError(-1, error)
        return response

    def delete(self, key, cas=0):
        """Delete the value for a given key within the memcached server."""
        response = self._doStore("delete {0} {1}\r\n".format(key, cas))
        self._expect(response, "DELETED")

    def flush(self, timebomb=0):
        """Flush all storage in a memcached instance.

        A non-zero ``timebomb`` is sent as the ``flush_all`` delay argument.
        Raises MemcachedError unless the server answers OK.
        """
        if timebomb:
            cmd = "flush_all {0}\r\n".format(timebomb)
        else:
            cmd = "flush_all\r\n"
        response = self._doStore(cmd)
        self._expect(response, "OK")