1#!/usr/bin/env python
2"""
3tap protocol client.
4
5Copyright (c) 2010  Dustin Sallings <dustin@spy.net>
6"""
7import exceptions
8
9import sys
10import socket
11import string
12import random
13import struct
14import asyncore
15import uuid
16import mc_bin_server
17
18from memcacheConstants import REQ_MAGIC_BYTE
19from memcacheConstants import REQ_PKT_FMT
20
21import memcacheConstants
22
class TapConnection(object):
    """Blocking client connection to one upstream server for the tap protocol.

    Constructing an instance opens a TCP connection and sends a
    CMD_TAP_CONNECT request.  Call receive() to read from the socket; every
    complete packet found in the read buffer is decoded and handed to the
    callback supplied at construction time.
    """

    def __init__(self, server, port, callback, clientId=None, opts=None):
        """Connect to the server and send the tap connect request.

        server   -- object exposing the address as ``.ip``; a bare host
                    value without an ``ip`` attribute is used as-is.
        port     -- TCP port to connect to.
        callback -- invoked by processCommand() as
                    callback(identifier, cmd, extra, key, vb, val, cas).
        clientId -- tap client name; a random 16-letter name is generated
                    when it is omitted or empty.
        opts     -- mapping of tap flag -> value (see _encodeOpts); None
                    means no options.

        Raises socket.error when connecting or sending fails; the socket is
        closed before the error propagates.
        """
        # Accept both a server object (.ip) and a plain host value.
        host = getattr(server, "ip", server)
        self.wbuf = self._createTapCall(clientId, opts)
        print(self.wbuf)
        self.callback = callback
        self.identifier = (host, port)
        print('tap connection : {0} {1}'.format(host, port))
        self.rbuf = b""
        self.s = socket.socket()
        try:
            # connect() reports the real failure instead of leaving it to
            # surface later as an unrelated send error.
            self.s.connect((host, port))
            # sendall() guarantees the whole request is written; a single
            # send() may stop short and nothing else flushes wbuf.
            self.s.sendall(self.wbuf)
        except socket.error:
            self.s.close()
            raise
        self.wbuf = b""

    def __hasEnoughBytes(self):
        """Return True when rbuf holds a full header plus its declared body."""
        header_len = memcacheConstants.MIN_RECV_PACKET
        if len(self.rbuf) < header_len:
            return False
        magic, cmd, keylen, extralen, datatype, vb, remaining, opaque, cas = \
            struct.unpack(REQ_PKT_FMT, self.rbuf[:header_len])
        return len(self.rbuf) - header_len >= remaining

    def receive(self):
        """Read once from the socket and dispatch every complete packet.

        Waits up to 10 seconds for data.  A timeout or any other socket
        error ends the call silently (best effort); data already buffered
        stays in rbuf for the next call.

        Each complete packet is removed from rbuf and passed to
        processCommand().  When that returns a truthy
        (status, cas, response) tuple, a response packet is appended to
        wbuf.  NOTE: nothing in this class writes wbuf to the socket after
        construction.

        Raises AssertionError when a packet header is inconsistent, and
        ValueError when processCommand() returns something that is not a
        3-tuple.
        """
        BUFFER_SIZE = 4096
        self.s.settimeout(10)
        try:
            self.rbuf += self.s.recv(BUFFER_SIZE)
        except socket.error:
            # Covers socket.timeout as well; unlike a bare except this
            # lets KeyboardInterrupt / SystemExit propagate.
            return
        header_len = memcacheConstants.MIN_RECV_PACKET
        while self.__hasEnoughBytes():
            magic, cmd, keylen, extralen, datatype, vb, remaining, opaque, cas = \
                struct.unpack(REQ_PKT_FMT, self.rbuf[:header_len])
            assert magic == REQ_MAGIC_BYTE
            assert keylen <= remaining, "Keylen is too big: %d > %d"\
                % (keylen, remaining)
            assert extralen == memcacheConstants.EXTRA_HDR_SIZES.get(cmd, 0),\
                "Extralen is too large for cmd 0x%x: %d" % (cmd, extralen)
            # Grab the data section of this request
            data = self.rbuf[header_len:header_len + remaining]
            assert len(data) == remaining
            # Remove this request from the read buffer
            self.rbuf = self.rbuf[header_len + remaining:]
            # Process the command
            cmdVal = self.processCommand(cmd, keylen, vb, extralen, cas, data)
            # Queue the response to the client if applicable.
            if cmdVal:
                try:
                    status, cas, response = cmdVal
                except ValueError:
                    print("Got %s" % (cmdVal,))
                    raise
                dtype = 0
                extralen = memcacheConstants.EXTRA_HDR_SIZES.get(cmd, 0)
                self.wbuf += struct.pack(memcacheConstants.RES_PKT_FMT,
                                         memcacheConstants.RES_MAGIC_BYTE, cmd, keylen,
                                         extralen, dtype, status,
                                         len(response), opaque, cas) + response

    def _createTapCall(self, key=None, opts=None):
        """Build the raw CMD_TAP_CONNECT request.

        key  -- client identifier; a random 16-letter name when falsy.
        opts -- tap options mapping (see _encodeOpts); None means none.

        Returns the packed header followed by the extras, key and value.
        """
        if opts is None:
            opts = {}
        # Client identifier
        if not key:
            # ascii_letters keeps the generated name independent of locale.
            key = "".join(random.sample(string.ascii_letters, 16))
        if not isinstance(key, bytes):
            # Text keys must become bytes before joining the binary packet.
            key = key.encode("utf-8")
        dtype = 0
        opaque = 0
        cas = 0

        extraHeader, val = self._encodeOpts(opts)
        print("opts:  %s %s" % (extraHeader, val))

        msg = struct.pack(REQ_PKT_FMT, REQ_MAGIC_BYTE,
                          memcacheConstants.CMD_TAP_CONNECT,
                          len(key), len(extraHeader), dtype, 0,
                          len(key) + len(extraHeader) + len(val),
                          opaque, cas)
        return msg + extraHeader + key + val

    def _encodeOpts(self, opts):
        """Encode the options mapping as (flags header, value bytes).

        Every key is OR-ed into a 32-bit big-endian flags word.  Values are
        appended in ascending key order: packed with the format listed in
        TAP_FLAG_TYPES when the key is there, encoded as a vbucket list for
        TAP_FLAG_LIST_VBUCKETS, and appended unchanged otherwise.
        """
        header = 0
        val = []
        for op in sorted(opts.keys()):
            header |= op
            if op in memcacheConstants.TAP_FLAG_TYPES:
                val.append(struct.pack(memcacheConstants.TAP_FLAG_TYPES[op],
                                       opts[op]))
            elif op == memcacheConstants.TAP_FLAG_LIST_VBUCKETS:
                val.append(self._encodeVBucketList(opts[op]))
            else:
                val.append(opts[op])
        return struct.pack(">I", header), b''.join(val)

    def _encodeVBucketList(self, vbl):
        """Encode vbucket ids as a 16-bit count followed by 16-bit ids.

        vbl may be any iterable, including a one-shot generator.  All
        numbers are packed in network byte order.
        """
        ids = list(vbl)  # in case it's a generator
        vals = [struct.pack("!H", len(ids))]
        # Iterate the materialised list: the source iterable may already be
        # exhausted by the list() call above.
        for v in ids:
            vals.append(struct.pack("!H", v))
        return b''.join(vals)

    def processCommand(self, cmd, klen, vb, extralen, cas, data):
        """Split a packet body into extras, key and value and run the callback.

        data holds extralen bytes of extras, then klen bytes of key, then
        the value.  Returns None, so receive() queues no response unless a
        subclass overrides this method to return one.
        """
        extra = data[0:extralen]
        key = data[extralen:(extralen + klen)]
        val = data[(extralen + klen):]
        self.callback(self.identifier, cmd, extra, key, vb, val, cas)

    def handle_connect(self):
        """Print a notice that the connection is established."""
        print("connected...")

    def close(self):
        """Close the underlying socket."""
        self.s.close()

    def handle_close(self):
        """Print a notice and close the connection."""
        print("handle_close")
        self.close()
134
135
class TapClient(object):
    """Open one TapConnection per server descriptor and keep them reachable."""

    def __init__(self, servers, callback, opts=None):
        """Create a connection for every descriptor in servers.

        servers  -- iterable of descriptors exposing ``host``, ``port`` and
                    ``id`` attributes.
        callback -- passed unchanged to every TapConnection.
        opts     -- tap options mapping passed to every TapConnection; None
                    means no options.

        The connections are stored in ``self.connections`` so callers can
        drive them; previously each one was bound to a throwaway local and
        became unreachable as soon as the next one was created.
        """
        if opts is None:
            opts = {}
        self.connections = [
            TapConnection(t.host, t.port, callback, t.id, opts)
            for t in servers
        ]
140
141
def buildGoodSet(goodChars=string.printable, badChar='?'):
    """Build a translation table that turns all characters not in goodChars
    to badChar.

    goodChars -- characters that map to themselves (byte string, or text
                 that fits in latin-1).
    badChar   -- single replacement character for everything else.

    Returns a 256-byte table suitable for the ``translate`` method of a
    byte string.  Raises ValueError when a replacement is needed and
    badChar is not exactly one character long.

    The table is assembled from a bytearray rather than the Python-2-only
    string.maketrans / string.translate functions, so the same code runs
    on Python 2 and Python 3.
    """
    if not isinstance(goodChars, bytes):
        goodChars = goodChars.encode("latin-1")
    if not isinstance(badChar, bytes):
        badChar = badChar.encode("latin-1")

    keep = frozenset(bytearray(goodChars))
    table = bytearray(range(256))  # identity mapping to start from
    if len(keep) == len(table):
        # Every byte value is "good": nothing to replace.
        return bytes(table)
    if len(badChar) != 1:
        raise ValueError("badChar must be exactly one character")

    substitute = bytearray(badChar)[0]
    for code in range(256):
        if code not in keep:
            table[code] = substitute
    return bytes(table)
149
150
class TapDescriptor(object):
    """Connection details (host, port, client id) for one upstream server."""

    # Class-level fallbacks; __init__ sets both on every instance.
    port = 11210
    id = None

    def __init__(self, server, port=11210):
        """Describe a tap connection to server.

        server -- object whose ``ip`` attribute holds the address to
                  connect to.
        port   -- TCP port to connect to; defaults to 11210, the value that
                  used to be hard-coded here.

        A fresh random UUID string is generated as the connection id.
        """
        self.host = server.ip
        self.port = port
        self.id = str(uuid.uuid4())
159
# Module-level translation table built with buildGoodSet()'s defaults:
# characters outside string.printable are mapped to '?'.  Used by keyprint().
transt = buildGoodSet()
162
def abbrev(v, maxlen=30):
    """Return v shortened to maxlen items with "..." appended.

    Values whose length does not exceed maxlen are returned unchanged.
    """
    if len(v) <= maxlen:
        return v
    return v[:maxlen] + "..."
168
169
def keyprint(v):
    """Return a shortened, display-safe rendering of v.

    The value is first cut down by abbrev() and the result is then passed
    through the module-level ``transt`` translation table.
    """
    shortened = abbrev(v)
    return string.translate(shortened, transt)
172
173#def mainLoop(serverList, cb, opts={}):
174#    """Run the given callback for each tap message from any of the
175#    upstream servers.
176#
177#    loops until all connections drop
178#    """
179#
180#    connections = (TapDescriptor(a) for a in serverList)
181#    TapClient(connections, cb, opts=opts)
182#    asyncore.loop()
183
184#if __name__ == '__main__':
185#    def cb(identifier, cmd, extra, key, vb, val, cas):
186#        print "%s: ``%s'' (vb:%d) -> ``%s'' (%d bytes from %s)" % (
187#            memcacheConstants.COMMAND_NAMES[cmd],
188#            key, vb, keyprint(val), len(val), identifier)
189#
190#    # This is an example opts parameter to do future-only tap:
191#    opts = {memcacheConstants.TAP_FLAG_BACKFILL: 0xffffffff}
192#    # If you omit it, or supply a past time_t value for backfill, it
193#    # will get all data.
194#    opts = {}
195#
196#    mainLoop(sys.argv[1:], cb, opts)
197