1#!/usr/bin/env python
2"""
3TAP protocol client library.
4
5Copyright (c) 2010  Dustin Sallings <dustin@spy.net>
6"""
7
8import socket
9import string
10import random
11import struct
12import asyncore
13
14import mc_bin_server
15import mc_bin_client
16
17from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
18from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
19from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT
20
21import memcacheConstants
22
23class TapConnection(mc_bin_server.MemcachedBinaryChannel):
24
25    def __init__(self, server, port, callback, clientId=None, opts={}, user=None, pswd=None):
26        mc_bin_server.MemcachedBinaryChannel.__init__(self, None, None,
27                                                      self._createTapCall(clientId,
28                                                                          opts))
29        self.server = server
30        self.port = port
31        self.callback = callback
32        self.identifier = (server, port)
33        self.user = user
34        self.pswd = pswd
35        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
36        self.connect((server, port))
37
38    def create_socket(self, family, type):
39        if not self.user:
40            mc_bin_server.MemcachedBinaryChannel.create_socket(self, family, type)
41            return
42
43        self.family_and_type = family, type
44
45        self.mc = mc_bin_client.MemcachedClient(self.server, self.port)
46        self.mc.sasl_auth_plain(self.user, self.pswd or "")
47
48        sock = self.mc.s
49        sock.setblocking(0)
50        self.set_socket(sock)
51
52    def _createTapCall(self, key=None, opts={}):
53        # Client identifier
54        if not key:
55            key = "".join(random.sample(string.letters, 16))
56        dtype=0
57        opaque=0
58        cas=0
59
60        extraHeader, val = self._encodeOpts(opts)
61
62        msg=struct.pack(REQ_PKT_FMT, REQ_MAGIC_BYTE,
63                        memcacheConstants.CMD_TAP_CONNECT,
64                        len(key), len(extraHeader), dtype, 0,
65                        len(key) + len(extraHeader) + len(val),
66                        opaque, cas)
67        return msg + extraHeader + key + val
68
69    def _encodeOpts(self, opts):
70        header = 0
71        val = []
72        for op in sorted(opts.keys()):
73            header |= op
74            if op in memcacheConstants.TAP_FLAG_TYPES:
75                val.append(struct.pack(memcacheConstants.TAP_FLAG_TYPES[op],
76                                       opts[op]))
77            elif op == memcacheConstants.TAP_FLAG_LIST_VBUCKETS:
78                val.append(self._encodeVBucketList(opts[op]))
79            else:
80                val.append(opts[op])
81        return struct.pack(">I", header), ''.join(val)
82
83    def _encodeVBucketList(self, vbl):
84        l = list(vbl) # in case it's a generator
85        vals = [struct.pack("!H", len(l))]
86        for v in vbl:
87            vals.append(struct.pack("!H", v))
88        return ''.join(vals)
89
90    def processCommand(self, cmd, klen, vb, extralen, cas, data):
91        extra = data[0:extralen]
92        key = data[extralen:(extralen+klen)]
93        val = data[(extralen+klen):]
94        return self.callback(self.identifier, cmd, extra, key, vb, val, cas)
95
96    def handle_connect(self):
97        pass
98
99    def handle_close(self):
100        self.close()
101
102class TapClient(object):
103
104    def __init__(self, servers, callback, opts={}, user=None, pswd=None):
105        for t in servers:
106            tc = TapConnection(t.host, t.port, callback, t.id, opts, user, pswd)
107
108class TapDescriptor(object):
109    port = 11211
110    id = None
111
112    def __init__(self, s):
113        self.host = s
114        if ':' in s:
115            self.host, self.port = s.split(':', 1)
116            self.port = int(self.port)
117
118        if '@' in self.host:
119            self.id, self.host = self.host.split('@', 1)
120
121