1#!/usr/bin/env python 2""" 3Binary memcached test client. 4 5Copyright (c) 2007 Dustin Sallings <dustin@spy.net> 6""" 7 8import sys 9import time 10import hmac 11import socket 12import random 13import struct 14import exceptions 15import select 16 17from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE 18from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET 19from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT 20from memcacheConstants import TOUCH_PKT_FMT, GAT_PKT_FMT, GETL_PKT_FMT 21from memcacheConstants import COMPACT_DB_PKT_FMT 22import memcacheConstants 23 24class MemcachedError(exceptions.Exception): 25 """Error raised when a command fails.""" 26 27 def __init__(self, status, msg): 28 supermsg='Memcached error #' + `status` 29 if msg: supermsg += ": " + msg 30 exceptions.Exception.__init__(self, supermsg) 31 32 self.status=status 33 self.msg=msg 34 35 def __repr__(self): 36 return "<MemcachedError #%d ``%s''>" % (self.status, self.msg) 37 38class MemcachedClient(object): 39 """Simple memcached client.""" 40 41 vbucketId = 0 42 43 def __init__(self, host='127.0.0.1', port=11211, family=socket.AF_INET): 44 self.host = host 45 self.port = port 46 self.s=socket.socket(family, socket.SOCK_STREAM) 47 if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 48 self.s.connect_ex(host) 49 else: 50 self.s.connect_ex((host, port)) 51 self.s.setblocking(0) 52 self.r=random.Random() 53 54 def close(self): 55 self.s.close() 56 57 def __del__(self): 58 self.close() 59 60 def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0): 61 self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas, 62 vbucketId=self.vbucketId) 63 64 def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0, 65 dtype=0, vbucketId=0, 66 fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE): 67 msg=struct.pack(fmt, magic, 68 cmd, len(key), len(extraHeader), dtype, vbucketId, 69 len(key) + len(extraHeader) + len(val), opaque, cas) 70 self.s.send(msg + extraHeader + key + val) 71 72 def _socketRecv(self, amount): 73 ready = select.select([self.s], [], [], 30) 74 if ready[0]: 75 return self.s.recv(amount) 76 raise exceptions.EOFError("Operation timed out") 77 78 def _recvMsg(self): 79 response = "" 80 while len(response) < MIN_RECV_PACKET: 81 data = self._socketRecv(MIN_RECV_PACKET - len(response)) 82 if data == '': 83 raise exceptions.EOFError("Got empty data (remote died?).") 84 response += data 85 assert len(response) == MIN_RECV_PACKET 86 magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\ 87 struct.unpack(RES_PKT_FMT, response) 88 89 rv = "" 90 while remaining > 0: 91 data = self._socketRecv(remaining) 92 if data == '': 93 raise exceptions.EOFError("Got empty data (remote died?).") 94 rv += data 95 remaining -= len(data) 96 97 assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic 98 return cmd, errcode, opaque, cas, keylen, extralen, rv 99 100 def _handleKeyedResponse(self, myopaque): 101 cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg() 102 assert myopaque is None or opaque == myopaque, \ 103 "expected opaque %x, got %x" % (myopaque, opaque) 104 if errcode != 0: 105 raise MemcachedError(errcode, rv) 106 return cmd, opaque, cas, keylen, extralen, rv 107 108 def _handleSingleResponse(self, myopaque): 109 cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque) 110 return opaque, cas, data 111 112 def _doCmd(self, cmd, key, val, extraHeader='', cas=0): 113 """Send a command and await its response.""" 114 opaque=self.r.randint(0, 2**32) 115 self._sendCmd(cmd, key, val, opaque, extraHeader, cas) 116 return self._handleSingleResponse(opaque) 117 118 def _mutate(self, cmd, key, exp, flags, cas, val): 119 return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp), 120 cas) 121 122 def _cat(self, cmd, key, cas, val): 123 return self._doCmd(cmd, key, val, '', cas) 124 125 def append(self, key, value, cas=0): 126 return self._cat(memcacheConstants.CMD_APPEND, key, cas, value) 127 128 def prepend(self, key, value, cas=0): 129 return self._cat(memcacheConstants.CMD_PREPEND, key, cas, value) 130 131 def __incrdecr(self, cmd, key, amt, init, exp): 132 something, cas, val=self._doCmd(cmd, key, '', 133 struct.pack(memcacheConstants.INCRDECR_PKT_FMT, amt, init, exp)) 134 return struct.unpack(INCRDECR_RES_FMT, val)[0], cas 135 136 def incr(self, key, amt=1, init=0, exp=0): 137 """Increment or create the named counter.""" 138 return self.__incrdecr(memcacheConstants.CMD_INCR, key, amt, init, exp) 139 140 def decr(self, key, amt=1, init=0, exp=0): 141 """Decrement or create the named counter.""" 142 return self.__incrdecr(memcacheConstants.CMD_DECR, key, amt, init, exp) 143 144 def _doMetaCmd(self, cmd, key, value, cas, exp, flags, seqno, remote_cas): 145 extra = struct.pack('>IIQQ', flags, exp, seqno, remote_cas) 146 return self._doCmd(cmd, key, value, extra, cas) 147 148 def set(self, key, exp, flags, val): 149 """Set a value in the memcached server.""" 150 return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val) 151 152 def setWithMeta(self, key, value, exp, flags, seqno, remote_cas): 153 """Set a value and its meta data in the memcached server.""" 154 return self._doMetaCmd(memcacheConstants.CMD_SET_WITH_META, 155 key, value, 0, exp, flags, seqno, remote_cas) 156 157 def add(self, key, exp, flags, val): 158 """Add a value in the memcached server iff it doesn't already exist.""" 159 return self._mutate(memcacheConstants.CMD_ADD, key, exp, flags, 0, val) 160 161 def addWithMeta(self, key, value, exp, flags, seqno, remote_cas): 162 return self._doMetaCmd(memcacheConstants.CMD_ADD_WITH_META, 163 key, value, 0, exp, flags, seqno, remote_cas) 164 165 def replace(self, key, exp, flags, val): 166 """Replace a value in the memcached server iff it already exists.""" 167 return self._mutate(memcacheConstants.CMD_REPLACE, key, exp, flags, 0, 168 val) 169 170 def observe(self, key, vbucket): 171 """Observe a key for persistence and replication.""" 172 value = struct.pack('>HH', vbucket, len(key)) + key 173 opaque, cas, data = self._doCmd(memcacheConstants.CMD_OBSERVE, '', value) 174 rep_time = (cas & 0xFFFFFFFF) 175 persist_time = (cas >> 32) & 0xFFFFFFFF 176 persisted = struct.unpack('>B', data[4+len(key)])[0] 177 return opaque, rep_time, persist_time, persisted 178 179 def __parseGet(self, data, klen=0): 180 flags=struct.unpack(memcacheConstants.GET_RES_FMT, data[-1][:4])[0] 181 return flags, data[1], data[-1][4 + klen:] 182 183 def get(self, key): 184 """Get the value for a given key within the memcached server.""" 185 parts=self._doCmd(memcacheConstants.CMD_GET, key, '') 186 return self.__parseGet(parts) 187 188 def getMeta(self, key): 189 """Get the metadata for a given key within the memcached server.""" 190 opaque, cas, data = self._doCmd(memcacheConstants.CMD_GET_META, key, '') 191 deleted = struct.unpack('>I', data[0:4])[0] 192 flags = struct.unpack('>I', data[4:8])[0] 193 exp = struct.unpack('>I', data[8:12])[0] 194 seqno = struct.unpack('>Q', data[12:20])[0] 195 return (deleted, flags, exp, seqno, cas) 196 197 def getl(self, key, exp=15): 198 """Get the value for a given key within the memcached server.""" 199 parts=self._doCmd(memcacheConstants.CMD_GET_LOCKED, key, '', 200 struct.pack(memcacheConstants.GETL_PKT_FMT, exp)) 201 return self.__parseGet(parts) 202 203 def cas(self, key, exp, flags, oldVal, val): 204 """CAS in a new value for the given key and comparison value.""" 205 self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 206 oldVal, val) 207 208 def touch(self, key, exp): 209 """Touch a key in the memcached server.""" 210 return self._doCmd(memcacheConstants.CMD_TOUCH, key, '', 211 struct.pack(memcacheConstants.TOUCH_PKT_FMT, exp)) 212 213 def gat(self, key, exp): 214 """Get the value for a given key and touch it within the memcached server.""" 215 parts=self._doCmd(memcacheConstants.CMD_GAT, key, '', 216 struct.pack(memcacheConstants.GAT_PKT_FMT, exp)) 217 return self.__parseGet(parts) 218 219 def getr(self, key): 220 """Get the value for a given key in a replica vbucket within the memcached server.""" 221 parts=self._doCmd(memcacheConstants.CMD_GET_REPLICA, key, '') 222 return self.__parseGet(parts, len(key)) 223 224 def version(self): 225 """Get the value for a given key within the memcached server.""" 226 return self._doCmd(memcacheConstants.CMD_VERSION, '', '') 227 228 def verbose(self, level): 229 """Set the verbosity level.""" 230 return self._doCmd(memcacheConstants.CMD_VERBOSE, '', '', 231 extraHeader=struct.pack(">I", level)) 232 233 def sasl_mechanisms(self): 234 """Get the supported SASL methods.""" 235 return set(self._doCmd(memcacheConstants.CMD_SASL_LIST_MECHS, 236 '', '')[2].split(' ')) 237 238 def sasl_auth_start(self, mech, data): 239 """Start a sasl auth session.""" 240 return self._doCmd(memcacheConstants.CMD_SASL_AUTH, mech, data) 241 242 def sasl_auth_plain(self, user, password, foruser=''): 243 """Perform plain auth.""" 244 return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password])) 245 246 def sasl_auth_cram_md5(self, user, password): 247 """Start a plan auth session.""" 248 try: 249 self.sasl_auth_start('CRAM-MD5', '') 250 except MemcachedError, e: 251 if e.status != memcacheConstants.ERR_AUTH_CONTINUE: 252 raise 253 challenge = e.msg 254 255 dig = hmac.HMAC(password, challenge).hexdigest() 256 return self._doCmd(memcacheConstants.CMD_SASL_STEP, 'CRAM-MD5', 257 user + ' ' + dig) 258 259 def stop_persistence(self): 260 return self._doCmd(memcacheConstants.CMD_STOP_PERSISTENCE, '', '') 261 262 def start_persistence(self): 263 return self._doCmd(memcacheConstants.CMD_START_PERSISTENCE, '', '') 264 265 def set_param(self, key, val, type): 266 print "setting param:", key, val 267 type = struct.pack(memcacheConstants.SET_PARAM_FMT, type) 268 return self._doCmd(memcacheConstants.CMD_SET_PARAM, key, val, type) 269 270 def set_vbucket_state(self, vbucket, stateName): 271 assert isinstance(vbucket, int) 272 self.vbucketId = vbucket 273 state = struct.pack(memcacheConstants.VB_SET_PKT_FMT, 274 memcacheConstants.VB_STATE_NAMES[stateName]) 275 return self._doCmd(memcacheConstants.CMD_SET_VBUCKET_STATE, '', '', 276 state) 277 278 def compact_db(self, vbucket, purgeBeforeTs, purgeBeforeSeq, dropDeletes): 279 assert isinstance(vbucket, int) 280 assert isinstance(purgeBeforeTs, long) 281 assert isinstance(purgeBeforeSeq, long) 282 assert isinstance(dropDeletes, int) 283 self.vbucketId = vbucket 284 compact = struct.pack(memcacheConstants.COMPACT_DB_PKT_FMT, 285 purgeBeforeTs, purgeBeforeSeq, dropDeletes) 286 return self._doCmd(memcacheConstants.CMD_COMPACT_DB, '', '', 287 compact) 288 289 def get_vbucket_state(self, vbucket): 290 assert isinstance(vbucket, int) 291 self.vbucketId = vbucket 292 return self._doCmd(memcacheConstants.CMD_GET_VBUCKET_STATE, '', '') 293 294 def delete_vbucket(self, vbucket): 295 assert isinstance(vbucket, int) 296 self.vbucketId = vbucket 297 return self._doCmd(memcacheConstants.CMD_DELETE_VBUCKET, '', '') 298 299 def evict_key(self, key): 300 return self._doCmd(memcacheConstants.CMD_EVICT_KEY, key, '') 301 302 def getMulti(self, keys): 303 """Get values for any available keys in the given iterable. 304 305 Returns a dict of matched keys to their values.""" 306 opaqued=dict(enumerate(keys)) 307 terminal=len(opaqued)+10 308 # Send all of the keys in quiet 309 for k,v in opaqued.iteritems(): 310 self._sendCmd(memcacheConstants.CMD_GETQ, v, '', k) 311 312 self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal) 313 314 # Handle the response 315 rv={} 316 done=False 317 while not done: 318 opaque, cas, data=self._handleSingleResponse(None) 319 if opaque != terminal: 320 rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data)) 321 else: 322 done=True 323 324 return rv 325 326 def setMulti(self, exp, flags, items): 327 """Multi-set (using setq). 328 329 Give me (key, value) pairs.""" 330 331 # If this is a dict, convert it to a pair generator 332 if hasattr(items, 'iteritems'): 333 items = items.iteritems() 334 335 opaqued=dict(enumerate(items)) 336 terminal=len(opaqued)+10 337 extra=struct.pack(SET_PKT_FMT, flags, exp) 338 339 # Send all of the keys in quiet 340 for opaque,kv in opaqued.iteritems(): 341 self._sendCmd(memcacheConstants.CMD_SETQ, kv[0], kv[1], opaque, extra) 342 343 self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal) 344 345 # Handle the response 346 failed = [] 347 done=False 348 while not done: 349 try: 350 opaque, cas, data = self._handleSingleResponse(None) 351 done = opaque == terminal 352 except MemcachedError, e: 353 failed.append(e) 354 355 return failed 356 357 def delMulti(self, items): 358 """Multi-delete (using delq). 359 360 Give me a collection of keys.""" 361 362 opaqued = dict(enumerate(items)) 363 terminal = len(opaqued)+10 364 extra = '' 365 366 # Send all of the keys in quiet 367 for opaque, k in opaqued.iteritems(): 368 self._sendCmd(memcacheConstants.CMD_DELETEQ, k, '', opaque, extra) 369 370 self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal) 371 372 # Handle the response 373 failed = [] 374 done=False 375 while not done: 376 try: 377 opaque, cas, data = self._handleSingleResponse(None) 378 done = opaque == terminal 379 except MemcachedError, e: 380 failed.append(e) 381 382 return failed 383 384 def stats(self, sub=''): 385 """Get stats.""" 386 opaque=self.r.randint(0, 2**32) 387 self._sendCmd(memcacheConstants.CMD_STAT, sub, '', opaque) 388 done = False 389 rv = {} 390 while not done: 391 cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None) 392 if klen: 393 rv[data[0:klen]] = data[klen:] 394 else: 395 done = True 396 return rv 397 398 def noop(self): 399 """Send a noop command.""" 400 return self._doCmd(memcacheConstants.CMD_NOOP, '', '') 401 402 def delete(self, key, cas=0): 403 """Delete the value for a given key within the memcached server.""" 404 return self._doCmd(memcacheConstants.CMD_DELETE, key, '', '', cas) 405 406 def flush(self, timebomb=0): 407 """Flush all storage in a memcached instance.""" 408 return self._doCmd(memcacheConstants.CMD_FLUSH, '', '', 409 struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb)) 410 411 def bucket_select(self, name): 412 return self._doCmd(memcacheConstants.CMD_SELECT_BUCKET, name, '') 413 414 def deregister_tap_client(self, tap_name): 415 """Deregister the TAP client with a given name.""" 416 return self._doCmd(memcacheConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0) 417 418 def reset_replication_chain(self): 419 """Reset the replication chain.""" 420 return self._doCmd(memcacheConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0) 421