#!/usr/bin/env python
"""
Binary memcached test client.

Copyright (c) 2007 Dustin Sallings <dustin@spy.net>
"""

import hmac
import socket
import random
import struct
import exceptions

from couchbaseConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
from couchbaseConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
from couchbaseConstants import SET_PKT_FMT, INCRDECR_RES_FMT
import couchbaseConstants

# Largest opaque value generated for a request.  random.randint() is
# inclusive on both ends, so the bound has to be 2**32 - 1 for the value to
# fit a 32-bit field (assumes REQ_PKT_FMT packs the opaque as an unsigned
# 32-bit integer, as the memcached binary protocol specifies).
_MAX_OPAQUE = 2 ** 32 - 1


class MemcachedError(exceptions.Exception):
    """Error raised when a command fails.

    Attributes:
        status: the non-zero status code taken from the response header.
        msg: the response body that accompanied the error (may be empty).
    """

    def __init__(self, status, msg):
        supermsg = 'Memcached error #' + repr(status)
        if msg:
            supermsg += ": " + msg
        exceptions.Exception.__init__(self, supermsg)

        self.status = status
        self.msg = msg

    def __repr__(self):
        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)


class MemcachedClient(object):
    """Simple memcached client.

    Most public commands return the ``(opaque, cas, data)`` tuple produced by
    ``_handleSingleResponse`` and raise ``MemcachedError`` when the server
    answers with a non-zero status.
    """

    # vbucket id placed in every request sent through _sendCmd; the
    # *_vbucket* methods below overwrite it on the instance.
    vbucketId = 0

    def __init__(self, host='127.0.0.1', port=11211, family=socket.AF_INET):
        """Create the socket and start connecting to ``host``/``port``.

        For ``AF_UNIX`` sockets ``host`` is used as the socket path and
        ``port`` is ignored.
        """
        self.host = host
        self.port = port
        self.s = socket.socket(family, socket.SOCK_STREAM)
        # NOTE(review): connect_ex() reports failure through its return
        # value, which is discarded here, so a failed connect only surfaces
        # on the first send/recv.  Confirm this best-effort behaviour is
        # deliberate before switching to connect().
        if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
            self.s.connect_ex(host)
        else:
            self.s.connect_ex((host, port))
        self.r = random.Random()

    def close(self):
        """Close the underlying socket."""
        self.s.close()

    def __del__(self):
        # __init__ may have failed before the socket was created; do not
        # raise AttributeError from the finalizer in that case.
        if hasattr(self, 's'):
            self.close()

    def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
        """Send a request addressed to the currently selected vbucket."""
        self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
                      vbucketId=self.vbucketId)

    def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
                 dtype=0, vbucketId=0,
                 fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE):
        """Pack a header with ``fmt`` and send header + extras + key + value."""
        body_len = len(key) + len(extraHeader) + len(val)
        header = struct.pack(fmt, magic,
                             cmd, len(key), len(extraHeader), dtype, vbucketId,
                             body_len, opaque, cas)
        # sendall() keeps writing until the whole packet is out; plain send()
        # may transmit only a prefix and leave the stream out of sync.
        self.s.sendall(header + extraHeader + key + val)

    def _recvExact(self, nbytes):
        """Read exactly ``nbytes`` bytes from the socket.

        Raises EOFError if the peer closes the connection first.
        """
        chunks = []
        remaining = nbytes
        while remaining > 0:
            data = self.s.recv(remaining)
            if not data:
                raise exceptions.EOFError("Got empty data (remote died?).")
            chunks.append(data)
            remaining -= len(data)
        return ''.join(chunks)

    def _recvMsg(self):
        """Read one packet and return its decoded header fields and body.

        Returns ``(cmd, errcode, opaque, cas, keylen, extralen, body)``.
        """
        response = self._recvExact(MIN_RECV_PACKET)
        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas = \
            struct.unpack(RES_PKT_FMT, response)

        # The body is consumed before the magic check so the stream stays
        # aligned on packet boundaries.
        rv = self._recvExact(remaining)

        assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic
        return cmd, errcode, opaque, cas, keylen, extralen, rv

    def _handleKeyedResponse(self, myopaque):
        """Read one response, checking its opaque and status.

        ``myopaque`` of None disables the opaque check.  Raises
        MemcachedError when the status is non-zero.
        """
        cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
        assert myopaque is None or opaque == myopaque, \
            "expected opaque %x, got %x" % (myopaque, opaque)
        if errcode != 0:
            raise MemcachedError(errcode, rv)
        return cmd, opaque, cas, keylen, extralen, rv

    def _handleSingleResponse(self, myopaque):
        """Read one response and return only ``(opaque, cas, data)``."""
        cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
        return opaque, cas, data

    def _collectQuietFailures(self, terminal):
        """Drain responses until the one whose opaque equals ``terminal``.

        Quiet commands only answer on failure, so every MemcachedError seen
        before the terminating response is collected and returned as a list.
        """
        failed = []
        done = False
        while not done:
            try:
                opaque, cas, data = self._handleSingleResponse(None)
                done = opaque == terminal
            except MemcachedError as e:
                failed.append(e)
        return failed

    def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
        """Send a command and await its response."""
        opaque = self.r.randint(0, _MAX_OPAQUE)
        self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
        return self._handleSingleResponse(opaque)

    def _mutate(self, cmd, key, exp, flags, cas, val):
        """Run a store-style command whose extras are ``flags`` and ``exp``."""
        return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp),
                           cas)

    def _cat(self, cmd, key, cas, val):
        """Run an append/prepend-style command (no extras)."""
        return self._doCmd(cmd, key, val, '', cas)

    def append(self, key, value, cas=0):
        """Send CMD_APPEND with ``value`` for ``key``."""
        return self._cat(couchbaseConstants.CMD_APPEND, key, cas, value)

    def prepend(self, key, value, cas=0):
        """Send CMD_PREPEND with ``value`` for ``key``."""
        return self._cat(couchbaseConstants.CMD_PREPEND, key, cas, value)

    def __incrdecr(self, cmd, key, amt, init, exp):
        """Run an incr/decr command and return ``(new_value, cas)``."""
        _, cas, val = self._doCmd(cmd, key, '',
            struct.pack(couchbaseConstants.INCRDECR_PKT_FMT, amt, init, exp))
        return struct.unpack(INCRDECR_RES_FMT, val)[0], cas

    def incr(self, key, amt=1, init=0, exp=0):
        """Increment or create the named counter."""
        return self.__incrdecr(couchbaseConstants.CMD_INCR, key, amt, init, exp)

    def decr(self, key, amt=1, init=0, exp=0):
        """Decrement or create the named counter."""
        return self.__incrdecr(couchbaseConstants.CMD_DECR, key, amt, init, exp)

    def _doMetaCmd(self, cmd, key, value, cas, exp, flags, seqno, remote_cas):
        """Run a *_WITH_META command.

        The extras are ``flags`` and ``exp`` (32-bit each) followed by
        ``seqno`` and ``remote_cas`` (64-bit each), all big-endian.
        """
        extra = struct.pack('>IIQQ', flags, exp, seqno, remote_cas)
        return self._doCmd(cmd, key, value, extra, cas)

    def _doRevCmd(self, cmd, key, exp, flags, value, rev, cas=0):
        """Run a *_WITH_META command from a ``(seqno, revid)`` revision."""
        seqno, revid = rev
        meta_data = struct.pack('>I', seqno) + revid
        meta_type = couchbaseConstants.META_REVID
        meta = (meta_type, meta_data)
        # NOTE(review): this passes 7 positional arguments, in a different
        # order, to _doMetaCmd(cmd, key, value, cas, exp, flags, seqno,
        # remote_cas), so the call raises TypeError as written.  The intended
        # wire format for the revision metadata is not visible here; left
        # unchanged until it is confirmed.
        return self._doMetaCmd(cmd, key, exp, flags, value, meta, cas)

    def set(self, key, exp, flags, val):
        """Set a value in the memcached server."""
        return self._mutate(couchbaseConstants.CMD_SET, key, exp, flags, 0, val)

    def setWithMeta(self, key, value, exp, flags, seqno, remote_cas):
        """Set a value and its meta data in the memcached server."""
        return self._doMetaCmd(couchbaseConstants.CMD_SET_WITH_META,
                               key, value, 0, exp, flags, seqno, remote_cas)

    def setWithRev(self, key, exp, flags, value, rev):
        """Set a value and its revision in the memcached server."""
        return self._doRevCmd(couchbaseConstants.CMD_SET_WITH_META,
                              key, exp, flags, value, rev)

    def add(self, key, exp, flags, val):
        """Add a value in the memcached server iff it doesn't already exist."""
        return self._mutate(couchbaseConstants.CMD_ADD, key, exp, flags, 0, val)

    def addWithMeta(self, key, value, exp, flags, seqno, remote_cas):
        """Add a value and its meta data in the memcached server."""
        return self._doMetaCmd(couchbaseConstants.CMD_ADD_WITH_META,
                               key, value, 0, exp, flags, seqno, remote_cas)

    def addWithRev(self, key, exp, flags, value, rev):
        """Add a value and its revision in the memcached server."""
        return self._doRevCmd(couchbaseConstants.CMD_ADD_WITH_META,
                              key, exp, flags, value, rev)

    def replace(self, key, exp, flags, val):
        """Replace a value in the memcached server iff it already exists."""
        return self._mutate(couchbaseConstants.CMD_REPLACE, key, exp, flags, 0,
                            val)

    def observe(self, key, vbucket):
        """Observe a key for persistence and replication.

        Returns ``(opaque, rep_time, persist_time, persisted)``.
        """
        value = struct.pack('>HH', vbucket, len(key)) + key
        opaque, cas, data = self._doCmd(couchbaseConstants.CMD_OBSERVE, '', value)
        # The two timings share the 64-bit cas field of the response: low 32
        # bits for replication, high 32 bits for persistence.
        rep_time = (cas & 0xFFFFFFFF)
        persist_time = (cas >> 32) & 0xFFFFFFFF
        # Single byte at offset 4 + len(key) of the response body.
        persisted = struct.unpack('>B', data[4 + len(key)])[0]
        return opaque, rep_time, persist_time, persisted

    def __parseGet(self, data, klen=0):
        """Turn an ``(opaque, cas, body)`` tuple into ``(flags, cas, value)``.

        The flags are the first 4 bytes of the body; the value starts after
        those 4 bytes plus ``klen`` bytes of echoed key.
        """
        flags = struct.unpack(couchbaseConstants.GET_RES_FMT, data[-1][:4])[0]
        return flags, data[1], data[-1][4 + klen:]

    def get(self, key):
        """Get the value for a given key within the memcached server."""
        parts = self._doCmd(couchbaseConstants.CMD_GET, key, '')
        return self.__parseGet(parts)

    def getMeta(self, key):
        """Get the metadata for a given key within the memcached server.

        Returns ``(deleted, flags, exp, seqno, cas)``.
        """
        opaque, cas, data = self._doCmd(couchbaseConstants.CMD_GET_META, key, '')
        deleted = struct.unpack('>I', data[0:4])[0]
        flags = struct.unpack('>I', data[4:8])[0]
        exp = struct.unpack('>I', data[8:12])[0]
        seqno = struct.unpack('>Q', data[12:20])[0]
        return (deleted, flags, exp, seqno, cas)

    def getl(self, key, exp=15):
        """Get the value for a given key using CMD_GET_LOCKED.

        ``exp`` is packed into the request extras with GETL_PKT_FMT.
        """
        parts = self._doCmd(couchbaseConstants.CMD_GET_LOCKED, key, '',
            struct.pack(couchbaseConstants.GETL_PKT_FMT, exp))
        return self.__parseGet(parts)

    def cas(self, key, exp, flags, oldVal, val):
        """CAS in a new value for the given key and comparison value.

        Returns the ``(opaque, cas, data)`` response tuple.
        """
        return self._mutate(couchbaseConstants.CMD_SET, key, exp, flags,
                            oldVal, val)

    def touch(self, key, exp):
        """Touch a key in the memcached server."""
        return self._doCmd(couchbaseConstants.CMD_TOUCH, key, '',
            struct.pack(couchbaseConstants.TOUCH_PKT_FMT, exp))

    def gat(self, key, exp):
        """Get the value for a given key and touch it within the memcached server."""
        parts = self._doCmd(couchbaseConstants.CMD_GAT, key, '',
            struct.pack(couchbaseConstants.GAT_PKT_FMT, exp))
        return self.__parseGet(parts)

    def getr(self, key):
        """Get the value for a given key in a replica vbucket within the memcached server."""
        parts = self._doCmd(couchbaseConstants.CMD_GET_REPLICA, key, '')
        # The key length is passed so __parseGet skips the echoed key.
        return self.__parseGet(parts, len(key))

    def version(self):
        """Send CMD_VERSION and return the response tuple."""
        return self._doCmd(couchbaseConstants.CMD_VERSION, '', '')

    def verbose(self, level):
        """Set the verbosity level."""
        return self._doCmd(couchbaseConstants.CMD_VERBOSE, '', '',
                           extraHeader=struct.pack(">I", level))

    def sasl_mechanisms(self):
        """Get the supported SASL methods."""
        return set(self._doCmd(couchbaseConstants.CMD_SASL_LIST_MECHS,
                               '', '')[2].split(' '))

    def sasl_auth_start(self, mech, data):
        """Start a sasl auth session."""
        return self._doCmd(couchbaseConstants.CMD_SASL_AUTH, mech, data)

    def sasl_auth_plain(self, user, password, foruser=''):
        """Perform plain auth."""
        return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))

    def sasl_auth_cram_md5(self, user, password):
        """Perform CRAM-MD5 auth."""
        challenge = ""
        try:
            self.sasl_auth_start('CRAM-MD5', '')
        except MemcachedError as e:
            # The challenge arrives as the body of an ERR_AUTH_CONTINUE
            # response; any other error is a real failure.
            if e.status != couchbaseConstants.ERR_AUTH_CONTINUE:
                raise
            challenge = e.msg

        dig = hmac.HMAC(password, challenge).hexdigest()
        return self._doCmd(couchbaseConstants.CMD_SASL_STEP, 'CRAM-MD5',
                           user + ' ' + dig)

    def stop_persistence(self):
        """Send CMD_STOP_PERSISTENCE."""
        return self._doCmd(couchbaseConstants.CMD_STOP_PERSISTENCE, '', '')

    def start_persistence(self):
        """Send CMD_START_PERSISTENCE."""
        return self._doCmd(couchbaseConstants.CMD_START_PERSISTENCE, '', '')

    def set_param(self, key, val, type):
        """Send CMD_SET_PARAM for ``key``/``val``.

        ``type`` is packed into the request extras with SET_PARAM_FMT.
        """
        print("setting param: %s %s" % (key, val))
        packed_type = struct.pack(couchbaseConstants.SET_PARAM_FMT, type)
        return self._doCmd(couchbaseConstants.CMD_SET_PARAM, key, val, packed_type)

    def set_vbucket_state(self, vbucket, stateName):
        """Select ``vbucket`` and set its state to the one named ``stateName``."""
        assert isinstance(vbucket, int)
        self.vbucketId = vbucket
        state = struct.pack(couchbaseConstants.VB_SET_PKT_FMT,
                            couchbaseConstants.VB_STATE_NAMES[stateName])
        return self._doCmd(couchbaseConstants.CMD_SET_VBUCKET_STATE, '', '', state)

    def get_vbucket_state(self, vbucket):
        """Select ``vbucket`` and send CMD_GET_VBUCKET_STATE."""
        assert isinstance(vbucket, int)
        self.vbucketId = vbucket
        return self._doCmd(couchbaseConstants.CMD_GET_VBUCKET_STATE, '', '')

    def delete_vbucket(self, vbucket):
        """Select ``vbucket`` and send CMD_DELETE_VBUCKET."""
        assert isinstance(vbucket, int)
        self.vbucketId = vbucket
        return self._doCmd(couchbaseConstants.CMD_DELETE_VBUCKET, '', '')

    def evict_key(self, key):
        """Send CMD_EVICT_KEY for ``key``."""
        return self._doCmd(couchbaseConstants.CMD_EVICT_KEY, key, '')

    def getMulti(self, keys):
        """Get values for any available keys in the given iterable.

        Returns a dict of matched keys to their values."""
        # Each key's index doubles as the opaque of its request so responses
        # can be matched back to keys.
        opaqued = dict(enumerate(keys))
        terminal = len(opaqued) + 10
        # Send all of the keys in quiet
        for opaque, key in opaqued.iteritems():
            self._sendCmd(couchbaseConstants.CMD_GETQ, key, '', opaque)

        # The NOOP always gets a response, which marks the end of the batch.
        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)

        # Handle the response
        rv = {}
        done = False
        while not done:
            opaque, cas, data = self._handleSingleResponse(None)
            if opaque != terminal:
                rv[opaqued[opaque]] = self.__parseGet((opaque, cas, data))
            else:
                done = True

        return rv

    def setMulti(self, exp, flags, items):
        """Multi-set (using setq).

        Give me (key, value) pairs.  Returns the list of MemcachedError
        instances received for the batch."""

        # If this is a dict, convert it to a pair generator
        if hasattr(items, 'iteritems'):
            items = items.iteritems()

        opaqued = dict(enumerate(items))
        terminal = len(opaqued) + 10
        extra = struct.pack(SET_PKT_FMT, flags, exp)

        # Send all of the keys in quiet
        for opaque, kv in opaqued.iteritems():
            self._sendCmd(couchbaseConstants.CMD_SETQ, kv[0], kv[1], opaque, extra)

        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)

        # Handle the response
        return self._collectQuietFailures(terminal)

    def delMulti(self, items):
        """Multi-delete (using delq).

        Give me a collection of keys.  Returns the list of MemcachedError
        instances received for the batch."""

        opaqued = dict(enumerate(items))
        terminal = len(opaqued) + 10
        extra = ''

        # Send all of the keys in quiet
        for opaque, k in opaqued.iteritems():
            self._sendCmd(couchbaseConstants.CMD_DELETEQ, k, '', opaque, extra)

        self._sendCmd(couchbaseConstants.CMD_NOOP, '', '', terminal)

        # Handle the response
        return self._collectQuietFailures(terminal)

    def stats(self, sub=''):
        """Get stats.

        Returns a dict mapping each stat name to its value.
        """
        opaque = self.r.randint(0, _MAX_OPAQUE)
        self._sendCmd(couchbaseConstants.CMD_STAT, sub, '', opaque)
        done = False
        rv = {}
        while not done:
            cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
            # A response with an empty key terminates the stream.
            if klen:
                rv[data[0:klen]] = data[klen:]
            else:
                done = True
        return rv

    def noop(self):
        """Send a noop command."""
        return self._doCmd(couchbaseConstants.CMD_NOOP, '', '')

    def hello(self):
        """Send a hello command for feature checking"""
        # MB-11902: the real request is disabled; a placeholder tuple is
        # returned instead.
        #return self._doCmd(couchbaseConstants.CMD_HELLO, '', struct.pack(">H", 1))
        return 0, 0, 0

    def delete(self, key, cas=0):
        """Delete the value for a given key within the memcached server."""
        return self._doCmd(couchbaseConstants.CMD_DELETE, key, '', '', cas)

    def flush(self, timebomb=0):
        """Flush all storage in a memcached instance."""
        return self._doCmd(couchbaseConstants.CMD_FLUSH, '', '',
            struct.pack(couchbaseConstants.FLUSH_PKT_FMT, timebomb))

    def bucket_select(self, name):
        """Send CMD_SELECT_BUCKET with ``name`` as the key."""
        return self._doCmd(couchbaseConstants.CMD_SELECT_BUCKET, name, '')

    def restore_file(self, filename):
        """Initiate restore of a given file."""
        return self._doCmd(couchbaseConstants.CMD_RESTORE_FILE, filename, '', '', 0)

    def restore_complete(self):
        """Notify the server that we're done restoring."""
        return self._doCmd(couchbaseConstants.CMD_RESTORE_COMPLETE, '', '', '', 0)

    def deregister_tap_client(self, tap_name):
        """Deregister the TAP client with a given name."""
        return self._doCmd(couchbaseConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)

    def reset_replication_chain(self):
        """Reset the replication chain."""
        return self._doCmd(couchbaseConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)