1#
2# Copyright 2012, Couchbase, Inc.
3# All Rights Reserved
4#
5# Licensed under the Apache License, Version 2.0 (the "License")
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18from Queue import Queue, Full, Empty
19from threading import Thread, Event, Lock
20from exception import MemcachedTimeoutException, InvalidArgumentException
21
22import logger
23import hmac
24import socket
25import random
26import exceptions
27import zlib
28import struct
29import urllib
30import warnings
31try:
32    import json
33except:
34    import simplejson as json
35from copy import deepcopy
36from rest_client import RestHelper, RestConnection
37
38
class MemcachedConstants(object):
    """Opcodes, flags, status codes and struct formats of the binary protocol.

    All multi-byte packet fields are packed big-endian (network byte order),
    hence the leading ``>`` in every struct format below.

    Note that several CMD_* names in this table share a numeric value (for
    example 0x85-0x89), so a numeric opcode does not always map back to a
    single name.
    """

    # Command constants
    CMD_GET = 0
    CMD_SET = 1
    CMD_ADD = 2
    CMD_REPLACE = 3
    CMD_DELETE = 4
    CMD_INCR = 5
    CMD_DECR = 6
    CMD_QUIT = 7
    CMD_FLUSH = 8
    CMD_GETQ = 9
    CMD_NOOP = 10
    CMD_VERSION = 11
    CMD_STAT = 0x10
    CMD_APPEND = 0x0e
    CMD_PREPEND = 0x0f
    CMD_TOUCH = 0x1c
    CMD_GAT = 0x1d

    # SASL stuff
    CMD_SASL_LIST_MECHS = 0x20
    CMD_SASL_AUTH = 0x21
    CMD_SASL_STEP = 0x22

    # Bucket extension
    CMD_CREATE_BUCKET = 0x85
    CMD_DELETE_BUCKET = 0x86
    CMD_LIST_BUCKETS = 0x87
    CMD_EXPAND_BUCKET = 0x88
    CMD_SELECT_BUCKET = 0x89

    CMD_STOP_PERSISTENCE = 0x80
    CMD_START_PERSISTENCE = 0x81
    CMD_SET_FLUSH_PARAM = 0x82
    CMD_RESTORE_FILE = 0x83
    CMD_RESTORE_ABORT = 0x84
    CMD_RESTORE_COMPLETE = 0x85
    # Online update
    CMD_START_ONLINEUPDATE = 0x86
    CMD_COMPLETE_ONLINEUPDATE = 0x87
    CMD_REVERT_ONLINEUPDATE = 0x88

    CMD_START_REPLICATION = 0x90
    CMD_STOP_REPLICATION = 0x91
    CMD_SET_TAP_PARAM = 0x92
    CMD_EVICT_KEY = 0x93

    # Replication
    CMD_TAP_CONNECT = 0x40
    CMD_TAP_MUTATION = 0x41
    CMD_TAP_DELETE = 0x42
    CMD_TAP_FLUSH = 0x43
    CMD_TAP_OPAQUE = 0x44
    CMD_TAP_VBUCKET_SET = 0x45
    CMD_TAP_CHECKPOINT_START = 0x46
    CMD_TAP_CHECKPOINT_END = 0x47

    # vbucket stuff
    CMD_SET_VBUCKET_STATE = 0x3d
    CMD_GET_VBUCKET_STATE = 0x3e
    CMD_DELETE_VBUCKET = 0x3f

    CMD_GET_LOCKED = 0x94

    CMD_SYNC = 0x96

    # TAP client registration
    CMD_DEREGISTER_TAP_CLIENT = 0x89

    # event IDs for the SYNC command responses
    CMD_SYNC_EVENT_PERSISTED = 1
    CMD_SYNC_EVENT_MODIFED = 2
    # Correctly spelled alias; the misspelled name above is kept so existing
    # callers continue to work.
    CMD_SYNC_EVENT_MODIFIED = CMD_SYNC_EVENT_MODIFED
    CMD_SYNC_EVENT_DELETED = 3
    CMD_SYNC_EVENT_REPLICATED = 4
    CMD_SYNC_INVALID_KEY = 5
    CMD_SYNC_INVALID_CAS = 6

    VB_STATE_ACTIVE = 1
    VB_STATE_REPLICA = 2
    VB_STATE_PENDING = 3
    VB_STATE_DEAD = 4
    VB_STATE_NAMES = {'active': VB_STATE_ACTIVE,
                      'replica': VB_STATE_REPLICA,
                      'pending': VB_STATE_PENDING,
                      'dead': VB_STATE_DEAD}

    # Reverse map: opcode -> constant name.  Inside a class body the CMD_*
    # names live in locals(), not globals().  The SYNC event ids share the
    # CMD_ prefix but are not opcodes, so they are left out.  Names are
    # visited in sorted order so that, where two names share an opcode, the
    # alphabetically last one wins deterministically.
    COMMAND_NAMES = dict(
        (v, k) for (k, v) in sorted(locals().items())
        if k.startswith("CMD_")
        and not k.startswith(("CMD_SYNC_EVENT_", "CMD_SYNC_INVALID_")))

    # TAP_OPAQUE types
    TAP_OPAQUE_ENABLE_AUTO_NACK = 0
    TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1
    TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2
    TAP_OPAQUE_OPEN_CHECKPOINT = 3

    # TAP connect flags
    TAP_FLAG_BACKFILL = 0x01
    TAP_FLAG_DUMP = 0x02
    TAP_FLAG_LIST_VBUCKETS = 0x04
    TAP_FLAG_TAKEOVER_VBUCKETS = 0x08
    TAP_FLAG_SUPPORT_ACK = 0x10
    TAP_FLAG_REQUEST_KEYS_ONLY = 0x20
    TAP_FLAG_CHECKPOINT = 0x40
    TAP_FLAG_REGISTERED_CLIENT = 0x80

    TAP_FLAG_TYPES = {TAP_FLAG_BACKFILL: ">Q",
                      TAP_FLAG_REGISTERED_CLIENT: ">B"}

    # TAP per-message flags
    TAP_FLAG_ACK = 0x01
    TAP_FLAG_NO_VALUE = 0x02  # The value for key is not included in the packet

    # Flags, expiration
    SET_PKT_FMT = ">II"

    # flags
    GET_RES_FMT = ">I"

    # How long until the deletion takes effect.
    DEL_PKT_FMT = ""

    ## TAP stuff
    # eng-specific length, flags, ttl, [res, res, res]; item flags, exp
    TAP_MUTATION_PKT_FMT = ">HHbxxxII"
    TAP_GENERAL_PKT_FMT = ">HHbxxx"

    # amount, initial value, expiration
    INCRDECR_PKT_FMT = ">QQI"
    # Special incr expiration that means do not store
    INCRDECR_SPECIAL = 0xffffffff
    INCRDECR_RES_FMT = ">Q"

    # Time bomb
    FLUSH_PKT_FMT = ">I"

    # Touch commands
    # expiration
    TOUCH_PKT_FMT = ">I"
    GAT_PKT_FMT = ">I"
    # Big-endian like every other extras format; a bare "I" would pack the
    # lock timeout in native byte order.
    GETL_PKT_FMT = ">I"

    # The vbucket state is sent as a 32 bit integer.
    VB_SET_PKT_FMT = ">I"

    MAGIC_BYTE = 0x80
    REQ_MAGIC_BYTE = 0x80
    RES_MAGIC_BYTE = 0x81

    # magic, opcode, keylen, extralen, datatype, vbucket, bodylen, opaque, cas
    REQ_PKT_FMT = ">BBHBBHIIQ"
    # magic, opcode, keylen, extralen, datatype, status, bodylen, opaque, cas
    RES_PKT_FMT = ">BBHBBHIIQ"
    # min recv packet size
    MIN_RECV_PACKET = struct.calcsize(REQ_PKT_FMT)
    # The header sizes don't deviate
    assert struct.calcsize(REQ_PKT_FMT) == struct.calcsize(RES_PKT_FMT)

    # Struct format of the "extras" section for each opcode that has one.
    EXTRA_HDR_FMTS = {
        CMD_SET: SET_PKT_FMT,
        CMD_ADD: SET_PKT_FMT,
        CMD_REPLACE: SET_PKT_FMT,
        CMD_INCR: INCRDECR_PKT_FMT,
        CMD_DECR: INCRDECR_PKT_FMT,
        CMD_DELETE: DEL_PKT_FMT,
        CMD_FLUSH: FLUSH_PKT_FMT,
        CMD_TAP_MUTATION: TAP_MUTATION_PKT_FMT,
        CMD_TAP_DELETE: TAP_GENERAL_PKT_FMT,
        CMD_TAP_FLUSH: TAP_GENERAL_PKT_FMT,
        CMD_TAP_OPAQUE: TAP_GENERAL_PKT_FMT,
        CMD_TAP_VBUCKET_SET: TAP_GENERAL_PKT_FMT,
        CMD_SET_VBUCKET_STATE: VB_SET_PKT_FMT,
        }

    # Size in bytes of the extras section, keyed by opcode.
    EXTRA_HDR_SIZES = dict(
        [(k, struct.calcsize(v)) for (k, v) in EXTRA_HDR_FMTS.items()])

    ERR_UNKNOWN_CMD = 0x81
    ERR_NOT_FOUND = 0x1
    ERR_EXISTS = 0x2
    ERR_AUTH = 0x20
    ERR_AUTH_CONTINUE = 0x21
221
222
class MemcachedError(Exception):
    """Error raised when a command fails.

    Attributes:
        status: numeric status code taken from the response header.
        msg: response body sent along with the failure (may be empty).
    """

    def __init__(self, status, msg):
        supermsg = 'Memcached error #' + repr(status)
        if msg:
            supermsg += ":  " + msg
        # The builtin Exception is used directly rather than going through
        # the Python-2-only ``exceptions`` module; on Python 2 they are the
        # same class.
        Exception.__init__(self, supermsg)

        self.status = status
        self.msg = msg

    def __repr__(self):
        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
237
238
class MemcachedClient(object):
    """Simple memcached client speaking the binary protocol over one socket.

    Keyed operations accept an optional ``vbucket`` argument.  When it is
    left at -1 the vbucket is derived from a CRC32 hash of the key; either
    way the chosen id is stored on the instance (``vbucketId``) and used for
    the request that follows.
    """

    # vbucket id put into the header of the next request sent via _sendCmd.
    vbucketId = 0

    def __init__(self, host='127.0.0.1', port=11211):
        """Open a socket to ``host``:``port``."""
        self.host = host
        self.port = port
        self.s = socket.socket()
        # connect_ex reports failure through its return value, which is
        # ignored here; a failed connect therefore only shows up on the
        # first send or receive.
        self.s.connect_ex((host, port))
        self.r = random.Random()
        self.log = logger.logger("MemcachedClient")
        self.vbucket_count = 1024

    def close(self):
        """Close the underlying socket."""
        self.s.close()

    def __del__(self):
        # __init__ may have failed before the socket attribute was created.
        if getattr(self, 's', None) is not None:
            self.close()

    def _randomOpaque(self):
        """Return a random opaque value that fits the 32 bit header field."""
        # randint is inclusive on both ends, so the upper bound must be
        # 2**32 - 1 or struct.pack(">I") can overflow.
        return self.r.randint(0, 2 ** 32 - 1)

    def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
        """Send one request using the currently selected vbucket id."""
        self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
                      vbucketId=self.vbucketId)

    def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
                 dtype=0, vbucketId=0,
                 fmt=MemcachedConstants.REQ_PKT_FMT,
                 magic=MemcachedConstants.REQ_MAGIC_BYTE):
        """Pack a header and write header + extras + key + value."""
        msg = struct.pack(fmt, magic,
                          cmd, len(key), len(extraHeader), dtype, vbucketId,
                          len(key) + len(extraHeader) + len(val), opaque, cas)
        # sendall keeps writing until the whole packet is out; plain send()
        # may stop after a partial write and corrupt the stream.
        self.s.sendall(msg + extraHeader + key + val)

    def _recvExactly(self, nbytes):
        """Read exactly ``nbytes`` bytes from the socket.

        Raises EOFError if the socket returns no data before that many
        bytes have arrived.
        """
        chunks = []
        remaining = nbytes
        while remaining > 0:
            data = self.s.recv(remaining)
            if not data:
                raise EOFError("Got empty data (remote died?)."
                               " from %s" % (self.host))
            chunks.append(data)
            remaining -= len(data)
        return ''.join(chunks)

    def _recvMsg(self):
        """Read one packet.

        Returns (cmd, errcode, opaque, cas, keylen, extralen, body) where
        ``body`` is everything after the fixed-size header.
        """
        header = self._recvExactly(MemcachedConstants.MIN_RECV_PACKET)
        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas =\
            struct.unpack(MemcachedConstants.RES_PKT_FMT, header)

        rv = self._recvExactly(remaining)

        assert (magic in (MemcachedConstants.RES_MAGIC_BYTE,
                          MemcachedConstants.REQ_MAGIC_BYTE)),\
            "Got magic: %d" % magic
        return cmd, errcode, opaque, cas, keylen, extralen, rv

    def _handleKeyedResponse(self, myopaque):
        """Read one response and raise MemcachedError on a non-zero status.

        When ``myopaque`` is not None the response's opaque must match it.
        """
        cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
        assert myopaque is None or opaque == myopaque, \
            "expected opaque %x, got %x" % (myopaque, opaque)
        if errcode:
            raise MemcachedError(errcode, rv)
        return cmd, opaque, cas, keylen, extralen, rv

    def _handleSingleResponse(self, myopaque):
        """Like _handleKeyedResponse but return only (opaque, cas, data)."""
        cmd, opaque, cas, keylen, extralen, data =\
            self._handleKeyedResponse(myopaque)
        return opaque, cas, data

    def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
        """Send a command and await its response."""
        opaque = self._randomOpaque()
        self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
        return self._handleSingleResponse(opaque)

    def _mutate(self, cmd, key, exp, flags, cas, val):
        """Run a set-style command whose extras are (flags, expiration)."""
        return self._doCmd(cmd, key, val,
                           struct.pack(MemcachedConstants.SET_PKT_FMT, flags,
                                       exp), cas)

    def _cat(self, cmd, key, cas, val):
        """Run an append/prepend-style command (no extras)."""
        return self._doCmd(cmd, key, val, '', cas)

    def append(self, key, value, cas=0, vbucket=-1):
        """Append ``value`` to the existing value of ``key``."""
        self._set_vbucket_id(key, vbucket)
        return self._cat(MemcachedConstants.CMD_APPEND, key, cas, value)

    def prepend(self, key, value, cas=0, vbucket=-1):
        """Prepend ``value`` to the existing value of ``key``."""
        self._set_vbucket_id(key, vbucket)
        return self._cat(MemcachedConstants.CMD_PREPEND, key, cas, value)

    def __incrdecr(self, cmd, key, amt, init, exp):
        """Run incr/decr and return (new counter value, cas)."""
        _opaque, cas, val =\
            self._doCmd(cmd, key, '',
                        struct.pack(MemcachedConstants.INCRDECR_PKT_FMT,
                                    amt, init, exp))
        return struct.unpack(MemcachedConstants.INCRDECR_RES_FMT, val)[0], cas

    def incr(self, key, amt=1, init=0, exp=0, vbucket=-1):
        """Increment or create the named counter."""
        self._set_vbucket_id(key, vbucket)
        return self.__incrdecr(MemcachedConstants.CMD_INCR, key, amt, init,
                               exp)

    def decr(self, key, amt=1, init=0, exp=0, vbucket=-1):
        """Decrement or create the named counter."""
        self._set_vbucket_id(key, vbucket)
        return self.__incrdecr(MemcachedConstants.CMD_DECR, key, amt, init,
                               exp)

    def _set_vbucket_id(self, key, vbucket):
        """Select the vbucket for the next request.

        -1 means "hash the key"; any other value is used as given.
        """
        if vbucket == -1:
            # The mask assumes vbucket_count is a power of two.
            self.vbucketId = (zlib.crc32(key) >> 16) & (self.vbucket_count - 1)
        else:
            self.vbucketId = vbucket

    def set(self, key, exp, flags, val, vbucket=-1):
        """Set a value in the memcached server."""
        self._set_vbucket_id(key, vbucket)
        return self._mutate(MemcachedConstants.CMD_SET, key, exp, flags, 0,
                            val)

    def add(self, key, exp, flags, val, vbucket=-1):
        """Add a value in the memcached server iff it doesn't already exist."""
        self._set_vbucket_id(key, vbucket)
        return self._mutate(MemcachedConstants.CMD_ADD, key, exp, flags, 0,
                            val)

    def replace(self, key, exp, flags, val, vbucket=-1):
        """Replace a value in the memcached server iff it already exists."""
        self._set_vbucket_id(key, vbucket)
        return self._mutate(MemcachedConstants.CMD_REPLACE, key, exp, flags, 0,
                            val)

    def __parseGet(self, data, klen=0):
        """Turn an (opaque, cas, body) triple into (flags, cas, value).

        The first four bytes of the body are the item flags; ``klen`` bytes
        of key following them are skipped.
        """
        flags = struct.unpack(MemcachedConstants.GET_RES_FMT, data[-1][:4])[0]
        return flags, data[1], data[-1][4 + klen:]

    def get(self, key, vbucket=-1):
        """Get the value for a given key within the memcached server."""
        self._set_vbucket_id(key, vbucket)
        parts = self._doCmd(MemcachedConstants.CMD_GET, key, '')

        return self.__parseGet(parts)

    def getl(self, key, exp=15, vbucket=-1):
        """Get the value for a given key using the GET_LOCKED command.

        ``exp`` is sent as the command's extras.
        """
        self._set_vbucket_id(key, vbucket)
        parts = self._doCmd(MemcachedConstants.CMD_GET_LOCKED, key, '',
                            struct.pack(MemcachedConstants.GETL_PKT_FMT, exp))
        return self.__parseGet(parts)

    def cas(self, key, exp, flags, oldVal, val, vbucket=-1):
        """CAS in a new value for the given key and comparison value.

        Returns the (opaque, cas, data) triple of the server response.
        """
        self._set_vbucket_id(key, vbucket)
        return self._mutate(MemcachedConstants.CMD_SET, key, exp, flags,
                            oldVal, val)

    def touch(self, key, exp, vbucket=-1):
        """Touch a key in the memcached server."""
        self._set_vbucket_id(key, vbucket)
        return self._doCmd(MemcachedConstants.CMD_TOUCH, key, '',
                           struct.pack(MemcachedConstants.TOUCH_PKT_FMT, exp))

    def gat(self, key, exp, vbucket=-1):
        """Get the value for a given key and touch it."""
        self._set_vbucket_id(key, vbucket)
        parts = self._doCmd(MemcachedConstants.CMD_GAT, key, '',
                            struct.pack(MemcachedConstants.GAT_PKT_FMT, exp))
        return self.__parseGet(parts)

    def version(self):
        """Get the version of the server."""
        return self._doCmd(MemcachedConstants.CMD_VERSION, '', '')

    def sasl_mechanisms(self):
        """Get the supported SASL methods."""

        return set(self._doCmd(MemcachedConstants.CMD_SASL_LIST_MECHS,
                               '', '')[2].split(' '))

    def sasl_auth_start(self, mech, data):
        """Start a sasl auth session."""
        return self._doCmd(MemcachedConstants.CMD_SASL_AUTH, mech, data)

    def sasl_auth_plain(self, user, password, foruser=''):
        """Perform plain auth."""
        return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user,
                                                        password]))

    def sasl_auth_cram_md5(self, user, password):
        """Perform CRAM-MD5 auth."""
        challenge = None
        try:
            self.sasl_auth_start('CRAM-MD5', '')
        except MemcachedError as e:
            # The challenge arrives as the body of an ERR_AUTH_CONTINUE
            # response; any other error is a genuine failure.
            if e.status != MemcachedConstants.ERR_AUTH_CONTINUE:
                raise
            challenge = e.msg

        dig = hmac.HMAC(password, challenge).hexdigest()
        return self._doCmd(MemcachedConstants.CMD_SASL_STEP, 'CRAM-MD5',
                           user + ' ' + dig)

    def stop_persistence(self):
        """Send CMD_STOP_PERSISTENCE."""
        return self._doCmd(MemcachedConstants.CMD_STOP_PERSISTENCE, '', '')

    def start_persistence(self):
        """Send CMD_START_PERSISTENCE."""
        return self._doCmd(MemcachedConstants.CMD_START_PERSISTENCE, '', '')

    def set_flush_param(self, key, val):
        """Send CMD_SET_FLUSH_PARAM with the given name and value."""
        return self._doCmd(MemcachedConstants.CMD_SET_FLUSH_PARAM, key, val)

    def stop_replication(self):
        """Send CMD_STOP_REPLICATION."""
        return self._doCmd(MemcachedConstants.CMD_STOP_REPLICATION, '', '')

    def start_replication(self):
        """Send CMD_START_REPLICATION."""
        return self._doCmd(MemcachedConstants.CMD_START_REPLICATION, '', '')

    def start_onlineupdate(self):
        """Send CMD_START_ONLINEUPDATE."""
        return self._doCmd(MemcachedConstants.CMD_START_ONLINEUPDATE, '', '')

    def complete_onlineupdate(self):
        """Send CMD_COMPLETE_ONLINEUPDATE."""
        return self._doCmd(MemcachedConstants.CMD_COMPLETE_ONLINEUPDATE, '',
                           '')

    def revert_onlineupdate(self):
        """Send CMD_REVERT_ONLINEUPDATE."""
        return self._doCmd(MemcachedConstants.CMD_REVERT_ONLINEUPDATE, '', '')

    def set_tap_param(self, key, val):
        """Send CMD_SET_TAP_PARAM with the given name and value."""
        return self._doCmd(MemcachedConstants.CMD_SET_TAP_PARAM, key, val)

    def set_vbucket_state(self, vbucket, stateName):
        """Set ``vbucket`` to the state named by ``stateName``.

        ``stateName`` must be a key of MemcachedConstants.VB_STATE_NAMES.
        """
        assert isinstance(vbucket, int)
        self.vbucketId = vbucket
        state = struct.pack(MemcachedConstants.VB_SET_PKT_FMT,
                            MemcachedConstants.VB_STATE_NAMES[stateName])
        return self._doCmd(MemcachedConstants.CMD_SET_VBUCKET_STATE, '',
                           state)

    def get_vbucket_state(self, vbucket):
        """Send CMD_GET_VBUCKET_STATE with the vbucket id as the key."""
        return self._doCmd(MemcachedConstants.CMD_GET_VBUCKET_STATE,
                           str(vbucket), '')

    def delete_vbucket(self, vbucket):
        """Send CMD_DELETE_VBUCKET for ``vbucket``."""
        assert isinstance(vbucket, int)
        self.vbucketId = vbucket
        return self._doCmd(MemcachedConstants.CMD_DELETE_VBUCKET, '', '')

    def evict_key(self, key):
        """Send CMD_EVICT_KEY for ``key``."""
        return self._doCmd(MemcachedConstants.CMD_EVICT_KEY, key, '')

    def getMulti(self, keys):
        """Get values for any available keys in the given iterable.

        Returns a dict of matched keys to their values."""
        # Each key's position doubles as the opaque of its request, so the
        # responses can be matched back to keys.
        opaqued = dict(enumerate(keys))
        terminal = len(opaqued) + 10
        # Send all of the keys in quiet
        for k, v in opaqued.items():
            self._sendCmd(MemcachedConstants.CMD_GETQ, v, '', k)

        # The NOOP always gets a response, marking the end of the batch.
        self._sendCmd(MemcachedConstants.CMD_NOOP, '', '', terminal)

        # Handle the response
        rv = {}
        done = False
        while not done:
            opaque, cas, data = self._handleSingleResponse(None)
            if opaque != terminal:
                rv[opaqued[opaque]] = self.__parseGet((opaque, cas, data))
            else:
                done = True

        return rv

    def stats(self, sub=''):
        """Get stats.

        Returns a dict of stat name to value; the reply stream ends with a
        response whose key length is zero.
        """
        opaque = self._randomOpaque()
        self._sendCmd(MemcachedConstants.CMD_STAT, sub, '', opaque)
        done = False
        rv = {}
        while not done:
            cmd, opaque, cas, klen, extralen, data =\
                self._handleKeyedResponse(None)
            if klen:
                rv[data[0:klen]] = data[klen:]
            else:
                done = True
        return rv

    def noop(self):
        """Send a noop command."""
        return self._doCmd(MemcachedConstants.CMD_NOOP, '', '')

    def delete(self, key, cas=0, vbucket=-1):
        """Delete the value for a given key within the memcached server."""
        self._set_vbucket_id(key, vbucket)
        return self._doCmd(MemcachedConstants.CMD_DELETE, key, '', '', cas)

    def flush(self, timebomb=0):
        """Flush all storage in a memcached instance."""
        return self._doCmd(MemcachedConstants.CMD_FLUSH, '', '',
                           struct.pack(MemcachedConstants.FLUSH_PKT_FMT,
                                       timebomb))

    def bucket_select(self, name):
        """Send CMD_SELECT_BUCKET for the bucket called ``name``."""
        return self._doCmd(MemcachedConstants.CMD_SELECT_BUCKET, name, '')

    def _sync(self, flags, keyspecs):
        """Send a SYNC command and return (opaque, cas, parsed keyspecs)."""
        payload = self._build_sync_payload(flags, keyspecs)
        opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
                                        payload)
        return opaque, cas, self._parse_sync_response(data)

    def sync_persistence(self, keyspecs):
        """SYNC with flags 0x8."""
        return self._sync(0x8, keyspecs)

    def sync_mutation(self, keyspecs):
        """SYNC with flags 0x4."""
        return self._sync(0x4, keyspecs)

    def sync_replication(self, numReplicas, keyspecs):
        """SYNC with the low 4 bits of ``numReplicas`` in flag bits 4-7."""
        return self._sync((numReplicas & 0x0f) << 4, keyspecs)

    def sync_replication_or_persistence(self, numReplicas, keyspecs):
        """SYNC with the replica count in bits 4-7 combined with 0x8."""
        return self._sync(((numReplicas & 0x0f) << 4) | 0x8, keyspecs)

    def sync_replication_and_persistence(self, numReplicas, keyspecs):
        """SYNC with the replica count in bits 4-7 combined with 0xA."""
        return self._sync(((numReplicas & 0x0f) << 4) | 0xA, keyspecs)

    def _build_sync_payload(self, flags, keyspecs):
        """Serialize ``keyspecs`` into the body of a SYNC request.

        Each keyspec is a dict with 'key' and 'vbucket' entries and an
        optional 'cas' (default 0).  Raises TypeError for anything else.
        """
        payload = struct.pack(">I", flags)
        payload += struct.pack(">H", len(keyspecs))

        for spec in keyspecs:
            if not isinstance(spec, dict):
                raise TypeError("each keyspec must be a dict")
            if 'vbucket' not in spec:
                raise TypeError("missing vbucket property in keyspec")
            if 'key' not in spec:
                raise TypeError("missing key property in keyspec")

            payload += struct.pack(">Q", spec.get('cas', 0))
            payload += struct.pack(">H", spec['vbucket'])
            payload += struct.pack(">H", len(spec['key']))
            payload += spec['key']

        return payload

    def _parse_sync_response(self, data):
        """Decode the body of a SYNC response into a list of keyspec dicts.

        Every dict has 'cas', 'vbucket', 'key' and 'event'.  'event' is a
        readable name for known event ids and the raw id otherwise.
        """
        event_names = {
            MemcachedConstants.CMD_SYNC_EVENT_PERSISTED: 'persisted',
            MemcachedConstants.CMD_SYNC_EVENT_MODIFED: 'modified',
            MemcachedConstants.CMD_SYNC_EVENT_DELETED: 'deleted',
            MemcachedConstants.CMD_SYNC_EVENT_REPLICATED: 'replicated',
            MemcachedConstants.CMD_SYNC_INVALID_KEY: 'invalid key',
            MemcachedConstants.CMD_SYNC_INVALID_CAS: 'invalid cas',
        }
        count_fmt = ">H"
        spec_fmt = ">QHHB"
        width = struct.calcsize(spec_fmt)

        offset = struct.calcsize(count_fmt)
        nkeys = struct.unpack(count_fmt, data[0:offset])[0]

        keyspecs = []
        for _ in range(nkeys):
            spec = {}
            spec['cas'], spec['vbucket'], keylen, eventid =\
                struct.unpack(spec_fmt, data[offset: offset + width])

            offset += width
            spec['key'] = data[offset: offset + keylen]
            offset += keylen

            # Look the id up by ``eventid``; spec['event'] does not exist
            # yet at this point.
            spec['event'] = event_names.get(eventid, eventid)

            keyspecs.append(spec)
        return keyspecs

    def restore_file(self, filename):
        """Initiate restore of a given file."""
        return self._doCmd(MemcachedConstants.CMD_RESTORE_FILE, filename, '')

    def restore_complete(self):
        """Notify the server that we're done restoring."""
        return self._doCmd(MemcachedConstants.CMD_RESTORE_COMPLETE, '', '')

    def deregister_tap_client(self, tap_name):
        """Deregister the TAP client with a given name."""
        return self._doCmd(MemcachedConstants.CMD_DEREGISTER_TAP_CLIENT,
                           tap_name, '')
649
650
651class CouchbaseClient(object):
    # Poll the server every few seconds to see if the vbucket map
    # has changed.
654    def __init__(self, url, bucket, password="", verbose=False):
655        self.log = logger.logger("CouchbaseClient")
656        self.bucket = bucket
657        self.rest_username = bucket
658        self.rest_password = password
659        self._memcacheds = {}
660        self._vBucketMap = {}
661        self._vBucketMap_lock = Lock()
662        self._vBucketMapFastForward = {}
663        self._vBucketMapFastForward_lock = Lock()
664        #TODO: use regular expressions to parse the url
665        server = {}
666        if not bucket:
667            raise InvalidArgumentException("bucket can not be an empty string",
668                                           parameters="bucket")
669        if not url:
670            raise InvalidArgumentException("url can not be an empty string",
671                                           parameters="url")
672        if (url.find("http://") != -1 and url.rfind(":") != -1 and
673            url.find("/pools/default") != -1):
674            server["ip"] = (url[url.find("http://")
675                            + len("http://"):url.rfind(":")])
676            server["port"] = url[url.rfind(":") + 1:url.find("/pools/default")]
677            server["username"] = self.rest_username
678            server["password"] = self.rest_password
679        else:
680            raise InvalidArgumentException("invalid url string",
681                                           parameters=url)
682        self.servers = [server]
683        self.servers_lock = Lock()
684        self.rest = RestConnection(server)
685        self.reconfig_vbucket_map()
686        self.init_vbucket_connections()
687        self.dispatcher = CommandDispatcher(self)
688        self.dispatcher_thread = Thread(name="dispatcher-thread",
689                                        target=self._start_dispatcher)
690        self.dispatcher_thread.daemon = True
691        self.dispatcher_thread.start()
692        self.streaming_thread = Thread(name="streaming",
693                                       target=self._start_streaming, args=())
694        self.streaming_thread.daemon = True
695        self.streaming_thread.start()
696        self.verbose = verbose
697
698    def _start_dispatcher(self):
699        self.dispatcher.dispatch()
700
701    def _start_streaming(self):
702        # This will dynamically update vBucketMap, vBucketMapFastForward,
703        # and servers
704        urlopener = urllib.FancyURLopener()
705        urlopener.prompt_user_passwd = lambda host, realm: (self.rest_username,
706                                                            self.rest_password)
707        current_servers = True
708        while current_servers:
709            self.servers_lock.acquire()
710            current_servers = deepcopy(self.servers)
711            self.servers_lock.release()
712            for server in current_servers:
713                response = urlopener.open("http://%s:%s/pools/default/bucketsS"
714                                          "treaming/%s" % (server["ip"],
715                                          server["port"], self.bucket))
716                while response:
717                    try:
718                        line = response.readline()
719                        if not line:
720                            # try next server if we get an EOF
721                            response.close()
722                            break
723                    except:
724                        # try next server if we fail to read
725                        response.close()
726                        break
727                    try:
728                        data = json.loads(line)
729                    except:
730                        continue
731
732                    serverlist = data['vBucketServerMap']['serverList']
733                    vbucketmapfastforward = {}
734                    index = 0
735                    if 'vBucketMapForward' in data['vBucketServerMap']:
736                        for vbucket in\
737                            data['vBucketServerMap']['vBucketMapForward']:
738                            vbucketmapfastforward[index] =\
739                                serverlist[vbucket[0]]
740                            index += 1
741                        self._vBucketMapFastForward_lock.acquire()
742                        self._vBucketMapFastForward =\
743                            deepcopy(vbucketmapfastforward)
744                        self._vBucketMapFastForward_lock.release()
745                    vbucketmap = {}
746                    index = 0
747                    for vbucket in data['vBucketServerMap']['vBucketMap']:
748                        vbucketmap[index] = serverlist[vbucket[0]]
749                        index += 1
750
751                    # only update vBucketMap if we don't have a fastforward
752                    # on a not_mb_vbucket error, we already update the
753                    # vBucketMap from the fastforward map
754                    if not vbucketmapfastforward:
755                        self._vBucketMap_lock.acquire()
756                        self._vBucketMap = deepcopy(vbucketmap)
757                        self._vBucketMap_lock.release()
758
759                    new_servers = []
760                    nodes = data["nodes"]
761                    for node in nodes:
762                        if (node["clusterMembership"] == "active" and
763                            node["status"] == "healthy"):
764                            ip, port = node["hostname"].split(":")
765                            new_servers.append({"ip": ip,
766                                                "port": port,
767                                                "username": self.rest_username,
768                                                "password": self.rest_password
769                                                })
770                    new_servers.sort()
771                    self.servers_lock.acquire()
772                    self.servers = deepcopy(new_servers)
773                    self.servers_lock.release()
774
775    def init_vbucket_connections(self):
776        # start up all vbucket connections
777        self._vBucketMap_lock.acquire()
778        vbucketcount = len(self._vBucketMap)
779        self._vBucketMap_lock.release()
780        for i in range(vbucketcount):
781            self.start_vbucket_connection(i)
782
783    def start_vbucket_connection(self, vbucket):
784        self._vBucketMap_lock.acquire()
785        server = deepcopy(self._vBucketMap[vbucket])
786        self._vBucketMap_lock.release()
787        serverIp, serverPort = server.split(":")
788        if not server in self._memcacheds:
789            self._memcacheds[server] =\
790                MemcachedClientHelper.direct_client(self.rest, serverIp,
791                                                    serverPort, self.bucket)
792
793    def start_vbucket_fastforward_connection(self, vbucket):
794        self._vBucketMapFastForward_lock.acquire()
795        if not vbucket in self._vBucketMapFastForward:
796            self._vBucketMapFastForward_lock.release()
797            return
798        server = deepcopy(self._vBucketMapFastForward[vbucket])
799        self._vBucketMapFastForward_lock.release()
800        serverIp, serverPort = server.split(":")
801        if not server in self._memcacheds:
802            self._memcacheds[server] =\
803                MemcachedClientHelper.direct_client(self.rest, serverIp,
804                                                    serverPort, self.bucket)
805
806    def restart_vbucket_connection(self, vbucket):
807        self._vBucketMap_lock.acquire()
808        server = deepcopy(self._vBucketMap[vbucket])
809        self._vBucketMap_lock.release()
810        serverIp, serverPort = server.split(":")
811        if server in self._memcacheds:
812            self._memcacheds[server].close()
813        self._memcacheds[server] =\
814            MemcachedClientHelper.direct_client(self.rest, serverIp,
815                                                serverPort, self.bucket)
816
817    def reconfig_vbucket_map(self, vbucket=-1):
818        vb_ready = RestHelper(self.rest).vbucket_map_ready(self.bucket, 60)
819        if not vb_ready:
820            raise Exception("vbucket map is not ready for bucket %s" %
821                            (self.bucket))
822        vBuckets = self.rest.get_vbuckets(self.bucket)
823        self.vbucket_count = len(vBuckets)
824
825        self._vBucketMap_lock.acquire()
826        for vBucket in vBuckets:
827            if vBucket.id == vbucket or vbucket == -1:
828                self._vBucketMap[vBucket.id] = vBucket.master
829        self._vBucketMap_lock.release()
830
831    def memcached(self, key, fastforward=False):
832        self._vBucketMap_lock.acquire()
833        self._vBucketMapFastForward_lock.acquire()
834        vBucketId = (zlib.crc32(key) >> 16) & (len(self._vBucketMap) - 1)
835
836        if fastforward and vBucketId in self._vBucketMapFastForward:
837            # only try the fastforward if we have an entry
838            # otherwise we just wait for the main map to update
839            self.start_vbucket_fastforward_connection(vBucketId)
840            self._vBucketMap[vBucketId] =\
841                self._vBucketMapFastForward[vBucketId]
842
843        if vBucketId not in self._vBucketMap:
844            msg = "vbucket map does not have an entry for vb : %s"
845            self._vBucketMapFastForward_lock.release()
846            self._vBucketMap_lock.release()
847            raise Exception(msg % (vBucketId))
848        if self._vBucketMap[vBucketId] not in self._memcacheds:
849            msg = "smart client does not have a mc connection for server : %s"
850            self._vBucketMapFastForward_lock.release()
851            self._vBucketMap_lock.release()
852            raise Exception(msg % (self._vBucketMap[vBucketId]))
853        r = self._memcacheds[self._vBucketMap[vBucketId]]
854        self._vBucketMapFastForward_lock.release()
855        self._vBucketMap_lock.release()
856        return r
857
858    def vbucketid(self, key):
859        self._vBucketMap_lock.acquire()
860        r = (zlib.crc32(key) >> 16) & (len(self._vBucketMap) - 1)
861        self._vBucketMap_lock.release()
862        return r
863
864    def done(self):
865        if self.dispatcher:
866            self.dispatcher.shutdown()
867            if self.verbose:
868                self.log.info("dispatcher shutdown invoked")
869            [self._memcacheds[ip].close() for ip in self._memcacheds]
870            if self.verbose:
871                self.log.info("closed all memcached open connections")
872            self.dispatcher = None
873
874    def _respond(self, item, event):
875        timeout = 30
876        event.wait(timeout)
877        if not event.isSet():
878            # if we timeout, then try to reconnect to the server
879            # responsible for this vbucket
880            self.restart_vbucket_connection(self.vbucketid(item['key']))
881            raise MemcachedTimeoutException(item, timeout)
882        if "error" in item["response"]:
883            raise item["response"]["error"]
884        return item["response"]["return"]
885
886    def get(self, key):
887        event = Event()
888        item = {"operation": "get", "key": key, "event": event, "response": {}}
889        self.dispatcher.put(item)
890        return self._respond(item, event)
891
892    def gat(self, key, expiry):
893        event = Event()
894        item = {"operation": "gat", "key": key, "expiry": expiry,
895                "event": event, "response": {}}
896        self.dispatcher.put(item)
897        return self._respond(item, event)
898
899    def touch(self, key, expiry):
900        event = Event()
901        item = {"operation": "touch", "key": key, "expiry": expiry,
902                "event": event,
903                "response": {}}
904        self.dispatcher.put(item)
905        return self._respond(item, event)
906
907    def cas(self, key, expiry, flags, old_value, value):
908        event = Event()
909        item = {"operation": "cas", "key": key, "expiry": expiry,
910                "flags": flags, "old_value": old_value, "value": value,
911                "event": event, "response": {}}
912        self.dispatcher.put(item)
913        return self._respond(item, event)
914
915    def decr(self, key, amount=1, init=0, expiry=0):
916        event = Event()
917        item = {"operation": "decr", "key": key, "amount": amount,
918                "init": init, "expiry": expiry, "event": event, "response": {}}
919        self.dispatcher.put(item)
920        return self._respond(item, event)
921
922    def set(self, key, expiry, flags, value):
923        event = Event()
924        item = {"operation": "set", "key": key, "expiry": expiry,
925                "flags": flags, "value": value, "event": event, "response": {}}
926        self.dispatcher.put(item)
927        return self._respond(item, event)
928
929    def add(self, key, expiry, flags, value):
930        event = Event()
931        item = {"operation": "add", "key": key, "expiry": expiry,
932                "flags": flags, "value": value, "event": event, "response": {}}
933        self.dispatcher.put(item)
934        return self._respond(item, event)
935
936    def append(self, key, value, cas=0):
937        event = Event()
938        item = {"operation": "append", "key": key, "cas": cas, "value": value,
939                "event": event, "response": {}}
940        self.dispatcher.put(item)
941        return self._respond(item, event)
942
943    def delete(self, key, cas=0):
944        event = Event()
945        item = {"operation": "delete", "key": key, "cas": cas, "event": event,
946                "response": {}}
947        self.dispatcher.put(item)
948        return self._respond(item, event)
949
950    def prepend(self, key, value, cas=0):
951        event = Event()
952        item = {"operation": "prepend", "key": key, "cas": cas, "value": value,
953                "event": event, "response": {}}
954        self.dispatcher.put(item)
955        return self._respond(item, event)
956
957    def getl(self, key, expiry=15):
958        event = Event()
959        item = {"operation": "getl", "key": key, "expiry": expiry,
960                "event": event, "response": {}}
961        self.dispatcher.put(item)
962        return self._respond(item, event)
963
964    def replace(self, key, expiry, flags, value):
965        event = Event()
966        item = {"operation": "replace", "key": key, "expiry": expiry,
967                "flags": flags, "value": value, "event": event, "response": {}}
968        self.dispatcher.put(item)
969        return self._respond(item, event)
970
971    def incr(self, key, amount=1, init=0, expiry=0):
972        event = Event()
973        item = {"operation": "incr", "key": key, "amount": amount,
974                "init": init, "expiry": expiry, "event": event, "response": {}}
975        self.dispatcher.put(item)
976        return self._respond(item, event)
977
978    def flush(self, wait_time=0):
979        event = Event()
980        item = {"operation": "flush", "expiry": wait_time, "event": event,
981                "response": {}}
982        self.dispatcher.put(item)
983        return self._respond(item, event)
984
985
class VBucketAwareCouchbaseClient(CouchbaseClient):
    """Deprecated name for ``CouchbaseClient``.

    Constructing it emits a ``DeprecationWarning`` and otherwise just calls
    ``CouchbaseClient.__init__`` with the same arguments.
    """

    def __init__(self, host, username, password):
        # The previous message ("Server is deprecated; use
        # VBucketAwareCouchbaseClient instead") named the wrong class and
        # sent users back to this deprecated one. stacklevel=2 attributes
        # the warning to the code that instantiates the class.
        warnings.warn("VBucketAwareCouchbaseClient is deprecated; use"
                      " CouchbaseClient instead", DeprecationWarning,
                      stacklevel=2)
        CouchbaseClient.__init__(self, host, username, password)
991
992
class CommandDispatcher(object):
    """Queue of pending memcached requests plus the loop that runs them.

    A request is a dict with at least ``"operation"``, ``"event"`` and
    ``"response"`` (and ``"key"`` for every operation except ``"flush"``).
    ``do`` stores the outcome in ``item["response"]`` under ``"return"`` or
    ``"error"`` and sets ``item["event"]`` once the request is finished.
    Requests that fail with a not-my-vbucket, EOF or socket error are put
    back on the queue by ``dispatch`` after the connection was repaired.
    """

    # operation name -> request fields passed, after the key and in this
    # order, to the connection method of the same name.
    _KEYED_OPERATIONS = {
        "get": (),
        "set": ("expiry", "flags", "value"),
        "add": ("expiry", "flags", "value"),
        "replace": ("expiry", "flags", "value"),
        "delete": ("cas",),
        "prepend": ("value", "cas"),
        "append": ("value", "cas"),
        "getl": ("expiry",),
        "gat": ("expiry",),
        "touch": ("expiry",),
        "incr": ("amount", "init", "expiry"),
        "decr": ("amount", "init", "expiry"),
        "cas": ("expiry", "flags", "old_value", "value"),
    }

    def __init__(self, vbaware, verbose=False):
        """Create a dispatcher that serves requests through ``vbaware``.

        ``vbaware`` supplies the vbucket lookup, the connections and the
        reconfigure/start/restart callbacks; ``verbose`` enables the
        informational log lines.
        """
        # bounded queue: put() raises instead of blocking once 10000
        # requests are pending
        self.queue = Queue(10000)
        self.status = "initialized"
        self.vbaware = vbaware
        self.reconfig_callback = self.vbaware.reconfig_vbucket_map
        self.start_connection_callback = self.vbaware.start_vbucket_connection
        self.restart_connection_callback =\
            self.vbaware.restart_vbucket_connection
        self.verbose = verbose
        self.log = logger.logger("CommandDispatcher")
        self._dispatcher_stopped_event = Event()

    def put(self, item):
        """Enqueue ``item`` without blocking; raise if the queue is full."""
        try:
            self.queue.put(item, False)
        except Full:
            #TODO: add a better error message here
            raise Exception("queue is full")

    def shutdown(self):
        """Ask ``dispatch`` to stop and wait up to 2 seconds for it."""
        if self.status != "shutdown":
            self.status = "shutdown"
            if self.verbose:
                self.log.info("dispatcher shutdown command received")
        self._dispatcher_stopped_event.wait(2)

    def reconfig_completed(self):
        """Mark the dispatcher as ready again after a map reconfiguration."""
        self.status = "ok"

    def dispatch(self):
        """Process queued requests until shutdown and the queue is drained."""
        while self.status != "shutdown" or self.queue.qsize() > 0:
            #wait if its reconfiguring the vbucket-map
            # NOTE(review): this spins without sleeping while the status
            # stays "vbucketmap-configuration".
            if self.status == "vbucketmap-configuration":
                continue
            try:
                item = self.queue.get(block=True, timeout=1)
            except Empty:
                continue
            if not item:
                continue
            # NOTE(review): the requeueing put() calls below block when the
            # queue is full; if this loop is the only consumer that would
            # stall it - confirm.
            try:
                self.do(item)
                # do will only raise not_my_vbucket_exception,
                # EOF and socket.error
            except MemcachedError as ex:
                # if we get a not_my_vbucket then requeue item
                #  with fast forward map vbucket
                self.log.error(ex)
                self.reconfig_callback(ex.vbucket)
                self.start_connection_callback(ex.vbucket)
                item["fastforward"] = True
                self.queue.put(item)
            except EOFError as ex:
                # we got an EOF error, restart the connection
                self.log.error(ex)
                self.restart_connection_callback(ex.vbucket)
                self.queue.put(item)
            except socket.error as ex:
                # we got a socket error, restart the connection
                self.log.error(ex)
                self.restart_connection_callback(ex.vbucket)
                self.queue.put(item)
        if self.verbose:
            self.log.info("dispatcher stopped")
        # Signal unconditionally: when this was done only in verbose mode,
        # shutdown() always sat out its full 2 second wait otherwise.
        self._dispatcher_stopped_event.set()

    def _raise_if_recoverable(self, ex, item):
        """Re-raise ``ex`` if ``dispatch`` can retry, else record it.

        A ``MemcachedError`` with status 7 (logged as "not my vb"), an
        ``EOFError`` and a ``socket.error`` are tagged with the request's
        vbucket and re-raised so ``dispatch`` can repair the connection and
        requeue the item. Anything else is stored as the request's error.
        The exception itself is logged by ``dispatch``.
        """
        if isinstance(ex, MemcachedError) and ex.status == 7:
            ex.vbucket = item["vbucket"]
            self.log.error("got not my vb error. key: %s, vbucket: %s" %
                           (item["key"], item["vbucket"]))
            raise ex
        if isinstance(ex, EOFError):
            ex.vbucket = item["vbucket"]
            self.log.error("got EOF")
            raise ex
        if isinstance(ex, socket.error):
            ex.vbucket = item["vbucket"]
            self.log.error("got socket.error")
            raise ex
        item["response"]["error"] = ex

    def do(self, item):
        """Execute one request and publish its outcome on the item.

        Raises only the recoverable errors described in
        ``_raise_if_recoverable``; in that case the event is left unset so
        the caller keeps waiting while ``dispatch`` retries.
        """
        if "fastforward" not in item:
            item["fastforward"] = False
        item["response"]["return"] = None
        operation = item["operation"]

        if operation == "flush":
            self._do_flush(item)
        elif operation in self._KEYED_OPERATIONS:
            self._do_keyed(item)
        else:
            # Answer immediately instead of leaving the caller to wait for
            # its timeout on a request nobody will ever process.
            item["response"]["error"] = ValueError(
                "unsupported operation: %s" % (operation,))
            item["event"].set()

    def _do_keyed(self, item):
        """Run a key-based operation on the connection owning the key."""
        try:
            key = item["key"]
            #find which vbucket this belongs to and then run the operation
            item["vbucket"] = self.vbaware.vbucketid(key)
            operation = item["operation"]
            args = [item[name] for name in self._KEYED_OPERATIONS[operation]]
            conn = self.vbaware.memcached(key, item["fastforward"])
            item["response"]["return"] = getattr(conn, operation)(key, *args)
        except Exception as ex:
            self._raise_if_recoverable(ex, item)
        item["event"].set()

    def _do_flush(self, item):
        """Flush every cached connection; report any failure to the caller.

        Flush requests have no key or vbucket, so errors cannot go through
        the retry path and are stored as the request's error instead.
        """
        try:
            wait_time = item["expiry"]
            for conn in list(self.vbaware._memcacheds.values()):
                conn.flush(wait_time)
            item["response"]["return"] = True
        except Exception as ex:
            item["response"]["error"] = ex
        item["event"].set()
1236
1237
class MemcachedClientHelper(object):
    """Factory helpers for direct memcached connections."""

    @staticmethod
    def direct_client(rest, ip, port, bucket):
        """Open an authenticated ``MemcachedClient`` to ``ip``:``port``.

        The bucket description is fetched with ``rest.get_bucket(bucket)``.
        The first node whose ``ip`` and ``memcached`` port match is
        connected to, given the bucket's vbucket count and authenticated
        with the bucket name and SASL password.

        Raises ``Exception`` when no node of the bucket matches.
        """
        info = rest.get_bucket(bucket)
        vbuckets = info.vbuckets
        for node in info.nodes:
            if node.ip != ip or node.memcached != int(port):
                continue
            client = MemcachedClient(ip.encode('ascii', 'ignore'), int(port))
            client.vbucket_count = len(vbuckets)
            user = info.name.encode('ascii')
            secret = info.saslPassword.encode('ascii')
            client.sasl_auth_plain(user, secret)
            return client
        raise Exception(("unexpected error - unable to find ip:%s in this"
                         " cluster" % (ip)))
1254