xref: /4.6.0/couchbase-cli/pump_mc.py (revision 38e3ecf9)
1#!/usr/bin/env python
2
3import logging
4import socket
5import struct
6import time
7import sys
8
9import cb_bin_client
10import couchbaseConstants
11import pump
12import cbsnappy as snappy
13
14try:
15    import ctypes
16except ImportError:
17    cb_path = '/opt/couchbase/lib/python'
18    while cb_path in sys.path:
19        sys.path.remove(cb_path)
20    try:
21        import ctypes
22    except ImportError:
23        sys.exit('error: could not import ctypes module')
24    else:
25        sys.path.insert(0, cb_path)
26
# Opcodes keyed by operation name, using the plain (META-less) commands.
OP_MAP = dict(
    get=couchbaseConstants.CMD_GET,
    set=couchbaseConstants.CMD_SET,
    add=couchbaseConstants.CMD_ADD,
    delete=couchbaseConstants.CMD_DELETE,
)

# Same keys, but mutations and deletes use the XXX_WITH_META opcodes
# ('get' stays a plain get).
OP_MAP_WITH_META = dict(
    get=couchbaseConstants.CMD_GET,
    set=couchbaseConstants.CMD_SET_WITH_META,
    add=couchbaseConstants.CMD_ADD_WITH_META,
    delete=couchbaseConstants.CMD_DELETE_WITH_META,
)
40
class MCSink(pump.Sink):
    """Dumb client sink using binary memcached protocol.
       Used when moxi or memcached is destination."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(MCSink, self).__init__(opts, spec, source_bucket, source_node,
                                     source_map, sink_map, ctl, cur)

        # Start with the *_WITH_META opcodes when "try_xwm" is on;
        # translate_cmd()/recv_msgs() may later fall back to plain OP_MAP.
        self.op_map = OP_MAP
        if opts.extra.get("try_xwm", 1):
            self.op_map = OP_MAP_WITH_META
        self.conflict_resolve = opts.extra.get("conflict_resolve", 1)
        self.uncompress = opts.extra.get("uncompress", 0)
        self.lww_restore = 0
        if self.get_conflict_resolution_type() == "lww":
            self.lww_restore = 1

        # Hand run() to the worker last, so every attribute it reads
        # (op_map, uncompress, lww_restore, ...) is already set.
        self.init_worker(MCSink.run)

    def close(self):
        """Push an empty batch; run() treats it as the signal to finish."""
        self.push_next_batch(None, None)

    @staticmethod
    def check_base(opts, spec):
        """Validate the options this destination supports.

        Returns an error string for an unsupported vbucket state or
        destination operation, otherwise the result of
        pump.EndPoint.check_base().
        """
        if getattr(opts, "destination_vbucket_state", "active") != "active":
            return ("error: only --destination-vbucket-state=active" +
                    " is supported by this destination: %s") % (spec)

        op = getattr(opts, "destination_operation", None)
        if op not in [None, 'set', 'add', 'get']:
            return ("error: --destination-operation unsupported value: %s" +
                    "; use set, add, get") % (op)

        # Skip immediate superclass Sink.check_base(),
        # since MCSink can handle different destination operations.
        return pump.EndPoint.check_base(opts, spec)

    @staticmethod
    def run(self):
        """Worker thread to asynchronously store batches into sink.

        Declared static but takes the sink as ``self``; __init__ passes it
        to init_worker() as MCSink.run. Each batch is resent (with
        exponential backoff capped by the "backoff_cap" extra option) until
        the server stops asking for a retry. An empty batch or an error ends
        the loop; connections are closed on every exit path.
        """
        mconns = {}  # State kept across scatter_gather() calls.
        backoff_cap = self.opts.extra.get("backoff_cap", 10)
        while not self.ctl['stop']:
            batch, future = self.pull_next_batch()
            if not batch:
                self.future_done(future, 0)
                self.close_mconns(mconns)
                return

            backoff = 0.1  # Reset backoff after a good batch.

            while batch:  # Loop in case retry is required.
                rv, batch, need_backoff = self.scatter_gather(mconns, batch)
                if rv != 0:
                    self.future_done(future, rv)
                    self.close_mconns(mconns)
                    return

                if batch:
                    self.cur["tot_sink_retry_batch"] = \
                        self.cur.get("tot_sink_retry_batch", 0) + 1

                if need_backoff:
                    backoff = min(backoff * 2.0, backoff_cap)
                    logging.warning("backing off, secs: %s" % (backoff))
                    time.sleep(backoff)

            self.future_done(future, 0)

        self.close_mconns(mconns)

    def get_conflict_resolution_type(self):
        """Return the first sink bucket's "conflictResolutionType".

        Falls back to "seqno" when the sink map is empty/None, lists no
        buckets, or the first bucket does not report a type.
        """
        # MCSink.check() returns None as its map, so a memcached destination
        # may arrive here without any bucket information.
        buckets = (self.sink_map or {}).get("buckets") or []
        if not buckets:
            return "seqno"
        return buckets[0].get("conflictResolutionType", "seqno")

    def close_mconns(self, mconns):
        """Register a stop event for, then close, every cached connection."""
        for conn in mconns.values():
            self.add_stop_event(conn)
            conn.close()

    def scatter_gather(self, mconns, batch):
        """Send one batch over the cached connection and read its replies.

        Returns (rv, retry_batch, need_backoff): rv is 0 or an error string,
        retry_batch is the batch to resend (or None), and need_backoff tells
        run() whether to sleep before resending.
        """
        conn = mconns.get("conn")
        if not conn:
            rv, conn = self.connect()
            if rv != 0:
                # run() always unpacks three values.
                return rv, None, None
            mconns["conn"] = conn

        # TODO: (1) MCSink - run() handle --data parameter.

        # Scatter or send phase.
        rv = self.send_msgs(conn, batch.msgs, self.operation())
        if rv != 0:
            return rv, None, None

        # Gather or recv phase.
        rv, retry, refresh = self.recv_msgs(conn, batch.msgs)
        if refresh:
            self.refresh_sink_map()
        if retry:
            return rv, batch, True

        return rv, None, None

    def send_msgs(self, conn, msgs, operation, vbucket_id=None):
        """Encode msgs as binary requests and write them in one sendall().

        Each msg is (cmd, vbucket_id, key, flg, exp, cas, meta, val),
        optionally followed by (seqno, dtype, nmeta, conf_res). A message's
        index in msgs is its request opaque, which recv_msgs() verifies.
        vbucket_id, when given, overrides every message's own vbucket id.
        Returns 0 on success or an error string.
        """
        m = []

        for i, msg in enumerate(msgs):
            cmd, vbucket_id_msg, key, flg, exp, cas, meta, val = msg[:8]
            seqno = dtype = nmeta = conf_res = 0
            if len(msg) > 8:
                seqno, dtype, nmeta, conf_res = msg[8:]
            if vbucket_id is not None:
                vbucket_id_msg = vbucket_id

            if self.skip(key, vbucket_id_msg):
                continue

            rv, cmd = self.translate_cmd(cmd, operation, meta)
            if rv != 0:
                return rv
            # NOTE(review): `> 2` does not match a datatype of exactly 2;
            # confirm whether such values can be snappy-compressed.
            if dtype > 2 and self.uncompress and val:
                try:
                    val = snappy.uncompress(val)
                except Exception as err:
                    # Best effort: keep the value as received, but say so.
                    logging.warning("could not uncompress value, sending it"
                                    " as is; key: %s, error: %s", key, err)
            if cmd == couchbaseConstants.CMD_GET:
                val, flg, exp, cas = '', 0, 0, 0
            if cmd == couchbaseConstants.CMD_NOOP:
                key, val, flg, exp, cas = '', '', 0, 0, 0
            if cmd in (couchbaseConstants.CMD_DELETE,
                       couchbaseConstants.CMD_DELETE_WITH_META):
                val = ''
            rv, req = self.cmd_request(cmd, vbucket_id_msg, key, val,
                                       ctypes.c_uint32(flg).value,
                                       exp, cas, meta, i, dtype, nmeta,
                                       conf_res)
            if rv != 0:
                return rv

            self.append_req(m, req)

        if m:
            try:
                # sendall(): a bare send() may write only part of the batch.
                conn.s.sendall(''.join(m))
            except socket.error as e:
                return "error: conn.send() exception: %s" % (e)

        return 0

    def recv_msgs(self, conn, msgs, vbucket_id=None, verify_opaque=True):
        """Read one response per non-skipped msg written by send_msgs().

        Returns (0, retry, refresh) after every response is read, or
        (error string, None, None) on the first fatal status or exception.
        retry asks the caller to resend the whole batch; refresh asks it to
        refresh the sink map first.
        """
        refresh = False
        retry = False

        for i, msg in enumerate(msgs):
            cmd, vbucket_id_msg, key, flg, exp, cas, meta, val = msg[:8]
            if vbucket_id is not None:
                vbucket_id_msg = vbucket_id

            if self.skip(key, vbucket_id_msg):
                continue

            try:
                r_cmd, r_status, r_ext, r_key, r_val, r_cas, r_opaque = \
                    self.read_conn(conn)
                if verify_opaque and i != r_opaque:
                    return "error: opaque mismatch: %s %s" % (i, r_opaque), None, None

                if r_status == couchbaseConstants.ERR_SUCCESS:
                    continue
                elif r_status == couchbaseConstants.ERR_KEY_EEXISTS:
                    # An already-existing item is not treated as an error.
                    continue
                elif r_status == couchbaseConstants.ERR_KEY_ENOENT:
                    # Missing keys are expected for gets and source deletes
                    # (TAP or DCP, matching translate_cmd()).
                    if cmd not in (couchbaseConstants.CMD_TAP_DELETE,
                                   couchbaseConstants.CMD_DCP_DELETE,
                                   couchbaseConstants.CMD_GET):
                        logging.warning("item not found: %s, key: %s" %
                                        (self.spec, key))
                    continue
                elif r_status in (couchbaseConstants.ERR_ETMPFAIL,
                                  couchbaseConstants.ERR_EBUSY,
                                  couchbaseConstants.ERR_ENOMEM):
                    retry = True  # Retry the whole batch again next time.
                    continue      # But, finish recv'ing current batch.
                elif r_status == couchbaseConstants.ERR_NOT_MY_VBUCKET:
                    err_msg = ("received NOT_MY_VBUCKET;"
                               " perhaps the cluster is/was rebalancing;"
                               " vbucket_id: %s, key: %s, spec: %s, host:port: %s:%s"
                               % (vbucket_id_msg, key, self.spec,
                                  conn.host, conn.port))
                    if self.opts.extra.get("nmv_retry", 1):
                        logging.warning("warning: " + err_msg)
                        refresh = True
                        retry = True
                        self.cur["tot_sink_not_my_vbucket"] = \
                            self.cur.get("tot_sink_not_my_vbucket", 0) + 1
                    else:
                        return "error: " + err_msg, None, None
                elif r_status == couchbaseConstants.ERR_UNKNOWN_COMMAND:
                    if self.op_map == OP_MAP:
                        if not retry:
                            return "error: unknown command: %s" % (r_cmd), None, None
                    else:
                        # Destination rejects *_WITH_META: downgrade for
                        # this and every later batch, then resend.
                        if not retry:
                            logging.warning("destination does not take XXX-WITH-META"
                                            " commands; will use META-less commands")
                        self.op_map = OP_MAP
                        retry = True
                else:
                    return "error: MCSink MC error: " + str(r_status), None, None

            except Exception as e:
                logging.error("MCSink exception: %s", e)
                return "error: MCSink exception: " + str(e), None, None
        return 0, retry, refresh

    def translate_cmd(self, cmd, op, meta):
        """Map a source command plus destination op to the opcode to send.

        Returns (0, opcode) or (error string, None). Side effect: a message
        with empty meta switches this sink to OP_MAP for all later messages.
        """
        if len(str(meta)) <= 0:
            # The source gave no meta, so use regular commands.
            self.op_map = OP_MAP

        if cmd in (couchbaseConstants.CMD_TAP_MUTATION,
                   couchbaseConstants.CMD_DCP_MUTATION):
            m = self.op_map.get(op, None)
            # `is not None`: an opcode may legitimately be 0 (e.g. GET).
            if m is not None:
                return 0, m
            return "error: MCSink.translate_cmd, unsupported op: %s" % (op,), None

        if cmd in (couchbaseConstants.CMD_TAP_DELETE,
                   couchbaseConstants.CMD_DCP_DELETE):
            if op == 'get':
                return 0, couchbaseConstants.CMD_NOOP
            return 0, self.op_map['delete']

        if cmd == couchbaseConstants.CMD_GET:
            return 0, cmd

        return "error: MCSink - unknown cmd: %s, op: %s" % (cmd, op), None

    def append_req(self, m, req):
        """Append the non-empty parts of a cmd_request() tuple to list m."""
        hdr, ext, key, val, extra_meta = req
        m.append(hdr)
        if ext:
            m.append(ext)
        if key:
            m.append(str(key))
        if val:
            m.append(str(val))
        if extra_meta:
            m.append(extra_meta)

    @staticmethod
    def can_handle(opts, spec):
        """True for memcached:// and memcached-binary:// specs."""
        return (spec.startswith("memcached://") or
                spec.startswith("memcached-binary://"))

    @staticmethod
    def check(opts, spec, source_map):
        """Verify the destination accepts a (SASL-authenticated) connection.

        Returns (0, None) on success -- this sink has no sink map -- or
        (error string, None).
        """
        host, port, user, pswd, path = \
            pump.parse_spec(opts, spec, int(getattr(opts, "port", 11211)))
        if opts.ssl:
            # Same rule as connect(): SSL always uses SSL_PORT.
            port = couchbaseConstants.SSL_PORT
        rv, conn = MCSink.connect_mc(host, port, user, pswd)
        if rv != 0:
            return rv, None
        conn.close()
        return 0, None

    def refresh_sink_map(self):
        """Nothing to refresh for this sink; always returns 0."""
        return 0

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        """Design docs cannot be restored here; warn if any were given."""
        if source_design:
            logging.warning("warning: cannot restore bucket design"
                            " on a memcached destination")
        return 0

    def consume_batch_async(self, batch):
        """Queue batch for the worker, paired with a SinkBatchFuture."""
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def connect(self):
        """Connect to this sink's spec; returns (0, client) or (error, None)."""
        host, port, user, pswd, path = \
            pump.parse_spec(self.opts, self.spec,
                            int(getattr(self.opts, "port", 11211)))
        if self.opts.ssl:
            port = couchbaseConstants.SSL_PORT
        return MCSink.connect_mc(host, port, user, pswd)

    @staticmethod
    def _sasl_auth(mc, user, pswd):
        """Authenticate mc with CRAM-MD5, falling back to PLAIN.

        Returns None on success, otherwise the word used in connect_mc()'s
        error message: "error" (EOFError), "failed" (PLAIN rejected with a
        MemcachedError) or "exception" (socket.error). Other exceptions
        propagate.
        """
        try:
            mc.sasl_auth_cram_md5(user, pswd)
            return None
        except cb_bin_client.MemcachedError:
            pass  # CRAM-MD5 refused; try PLAIN below.
        except EOFError:
            return "error"
        except socket.error:
            return "exception"

        try:
            mc.sasl_auth_plain(user, pswd)
        except EOFError:
            return "error"
        except cb_bin_client.MemcachedError:
            return "failed"
        except socket.error:
            return "exception"
        return None

    @staticmethod
    def connect_mc(host, port, user, pswd):
        """Open a MemcachedClient to host:port, SASL-authenticating if user.

        Returns (0, client) or (error string, None). On an auth failure the
        client is closed before the error is returned.
        """
        mc = cb_bin_client.MemcachedClient(host, int(port))
        if user:
            failure = MCSink._sasl_auth(mc, str(user), str(pswd))
            if failure:
                # Don't leak the socket of a client we are not handing back.
                mc.close()
                return "error: SASL auth %s: %s:%s, user: %s" % \
                    (failure, host, port, user), None
        return 0, mc

    def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque, dtype, nmeta, conf_res):
        """Build the pieces of one binary request.

        Returns (0, (hdr, ext, key, val, ext_meta)) -- the tuple consumed by
        append_req() -- or (error string, None) for an unsupported cmd.
        For the *_WITH_META commands, meta carries the source seqno/revision
        and a truthy conf_res is sent as extended metadata after the value.
        nmeta is accepted but not used.
        """
        ext_meta = ''
        if cmd in (couchbaseConstants.CMD_SET_WITH_META,
                   couchbaseConstants.CMD_ADD_WITH_META,
                   couchbaseConstants.CMD_DELETE_WITH_META):

            # Option bits: 1 when conflict resolution is disabled,
            # 2 when restoring into an "lww" bucket.
            force = 0
            if int(self.conflict_resolve) == 0:
                force |= 1
            if int(self.lww_restore) == 1:
                force |= 2
            if meta:
                try:
                    ext = struct.pack(">IIQQI", flg, exp, int(str(meta)), cas, force)
                except ValueError:
                    # Non-numeric meta: treat it as raw seqno bytes.
                    seq_no = str(meta)
                    if len(seq_no) > 8:
                        seq_no = seq_no[0:8]
                    if len(seq_no) < 8:
                        # The seq_no might be 32-bits from 2.0DP4, so pad with 0x00's.
                        seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + seq_no)[-8:]

                    check_seqno, = struct.unpack(">Q", seq_no)
                    if check_seqno:
                        ext = (struct.pack(">II", flg, exp) + seq_no +
                               struct.pack(">QI", cas, force))
                    else:
                        ext = struct.pack(">IIQQI", flg, exp, 1, cas, force)
            else:
                ext = struct.pack(">IIQQI", flg, exp, 1, cas, force)
            if conf_res:
                # Extended meta: version:1, type:1, length:2, value bytes.
                # A non-string conf_res is encoded as a single byte.
                # NOTE(review): confirm this encoding against the server.
                if isinstance(conf_res, str):
                    conf_res_bytes = conf_res
                else:
                    conf_res_bytes = struct.pack(">B", conf_res)
                ext_meta = struct.pack(">BBH",
                                       couchbaseConstants.DCP_EXTRA_META_VERSION,
                                       couchbaseConstants.DCP_EXTRA_META_CONFLICT_RESOLUTION,
                                       len(conf_res_bytes)) + conf_res_bytes
                # Advertise the trailer's length; it is sent after the value
                # (append_req) and counted in the body length (cmd_header).
                ext += struct.pack(">H", len(ext_meta))
        elif cmd in (couchbaseConstants.CMD_SET,
                     couchbaseConstants.CMD_ADD):
            ext = struct.pack(couchbaseConstants.SET_PKT_FMT, flg, exp)
        elif cmd in (couchbaseConstants.CMD_DELETE,
                     couchbaseConstants.CMD_GET,
                     couchbaseConstants.CMD_NOOP):
            ext = ''
        else:
            return "error: MCSink - unknown cmd for request: " + str(cmd), None

        hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, 0, opaque, dtype,
                              ext_meta=ext_meta)
        return 0, (hdr, ext, key, val, ext_meta)

    def cmd_header(self, cmd, vbucket_id, key, val, ext, cas, opaque,
                   dtype=0,
                   fmt=couchbaseConstants.REQ_PKT_FMT,
                   magic=couchbaseConstants.REQ_MAGIC_BYTE,
                   ext_meta=''):
        """Pack a request header with struct format fmt.

        The body length covers ext, key, val and any ext_meta trailer sent
        after the value. dtype is accepted but always sent as 0.
        """
        #MB-11902
        dtype = 0
        return struct.pack(fmt, magic, cmd,
                           len(key), len(ext), dtype, vbucket_id,
                           len(key) + len(ext) + len(val) + len(ext_meta),
                           opaque, cas)

    def read_conn(self, conn):
        """Read one response from conn.

        Returns (cmd, status, ext, key, val, cas, opaque). Bytes read past
        the response are kept in conn.buf for the next call.
        """
        ext = ''
        key = ''
        val = ''

        buf, cmd, errcode, extlen, keylen, data, cas, opaque = \
            self.recv_msg(conn.s, getattr(conn, 'buf', ''))
        conn.buf = buf

        if data:
            ext = data[0:extlen]
            key = data[extlen:extlen+keylen]
            val = data[extlen+keylen:]

        return cmd, errcode, ext, key, val, cas, opaque

    def recv_msg(self, sock, buf):
        """Read one response header and body from sock, starting with buf.

        Raises EOFError if the header or body cannot be read, and Exception
        on an unexpected magic byte.
        """
        pkt, buf = self.recv(sock, couchbaseConstants.MIN_RECV_PACKET, buf)
        if not pkt:
            raise EOFError()
        magic, cmd, keylen, extlen, dtype, errcode, datalen, opaque, cas = \
            struct.unpack(couchbaseConstants.RES_PKT_FMT, pkt)
        if magic != couchbaseConstants.RES_MAGIC_BYTE:
            raise Exception("unexpected recv_msg magic: " + str(magic))
        data, buf = self.recv(sock, datalen, buf)
        if data is None:
            # recv() gave up mid-body; don't treat it as an empty body.
            raise EOFError()
        return buf, cmd, errcode, extlen, keylen, data, cas, opaque

    def recv(self, skt, nbytes, buf):
        """Return (first nbytes of the stream, leftover bytes).

        Reads from skt until buf holds nbytes. On timeout, error, or closed
        socket, logs (for errors) and returns (None, '').
        """
        while len(buf) < nbytes:
            data = None
            try:
                data = skt.recv(max(nbytes - len(buf), 4096))
            except socket.timeout:
                logging.error("error: recv socket.timeout")
            except Exception as e:
                logging.error("error: recv exception: " + str(e))

            if not data:
                return None, ''
            buf += data

        return buf[:nbytes], buf[nbytes:]
466