# xref: /3.0.3-GA/couchbase-cli/pump_dcp.py (revision c616f12f)
1#!/usr/bin/env python
2
import exceptions
import logging
import os
import random
import select
import socket
import struct
import sys
import threading
import time

import simplejson as json

import cb_bin_client
import couchbaseConstants
from cbqueue import PumpQueue

import pump
import pump_cb
import pump_mc
import pump_tap
22
# Import ctypes; on some installs the Couchbase-bundled python path shadows
# the stdlib copy, so retry the import with that path removed and restore
# the path afterwards.  Relies on the module-level `import sys`.
try:
    import ctypes
except ImportError:
    cb_path = '/opt/couchbase/lib/python'
    while cb_path in sys.path:
        sys.path.remove(cb_path)
    try:
        import ctypes
    except ImportError:
        sys.exit('error: could not import ctypes module')
    else:
        sys.path.insert(0, cb_path)
35
class DCPStreamSource(pump_tap.TAPDumpSource, threading.Thread):
    """Can read from cluster/server/bucket via DCP streaming."""
    # Counter names as they appear in "stats vbucket-seqno" output
    # (see setup_dcp_streams()).
    HIGH_SEQNO = "high_seqno"
    VB_UUID = "uuid"
    ABS_HIGH_SEQNO = "abs_high_seqno"
    PURGE_SEQNO = "purge_seqno"

    # Wire sizes (bytes) of the seqno and vbucket-UUID fields in a
    # failover-log entry returned by a stream request.
    HIGH_SEQNO_BYTE = 8
    UUID_BYTE = 8
45
    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        """Create a DCP source; also a thread that drains the DCP socket.

        The reader thread (run()) parses packets off the wire and pushes
        them onto self.response; provide_dcp_batch_actual() consumes them.
        """
        super(DCPStreamSource, self).__init__(opts, spec, source_bucket, source_node,
                                            source_map, sink_map, ctl, cur)
        threading.Thread.__init__(self)

        self.dcp_done = False  # Set when all streams ended or on fatal error.
        self.dcp_conn = None   # Connection carrying the DCP stream.
        self.mem_conn = None   # Second connection, used only for stats.
        self.dcp_name = opts.process_name
        self.ack_last = False  # True if the last processed msg needed an ACK.
        self.cmd_last = None
        self.num_msg = 0
        # NOTE(review): lexicographic compare of version string components;
        # would misorder a hypothetical major version >= 10 — confirm.
        self.version_supported = self.source_node['version'].split(".") >= ["3", "0", "0"]
        self.recv_min_bytes = int(opts.extra.get("recv_min_bytes", 4096))
        self.batch_max_bytes = int(opts.extra.get("batch_max_bytes", 400000))
        self.vbucket_list = getattr(opts, "vbucket_list", None)
        self.r=random.Random()  # Source of opaque values for DCP commands.
        self.queue = PumpQueue()
        self.response = PumpQueue()  # Parsed packets produced by run().
        self.running = False   # Controls the reader thread's main loop.
        self.stream_list = {}  # opaque/vbid -> stream-request tuple.
        self.unack_size = 0
        self.node_vbucket_map = None  # Vbucket ids active on this node.
70
71    @staticmethod
72    def can_handle(opts, spec):
73        return (spec.startswith("http://") or
74                spec.startswith("couchbase://"))
75
    @staticmethod
    def check(opts, spec):
        # Source-spec validation is identical to the TAP implementation.
        return pump_tap.TAPDumpSource.check(opts, spec)
79
80    @staticmethod
81    def provide_design(opts, source_spec, source_bucket, source_map):
82        return pump_tap.TAPDumpSource.provide_design(opts, source_spec, source_bucket, source_map);
83
84    def build_node_vbucket_map(self):
85        if self.source_bucket.has_key("vBucketServerMap"):
86            server_list = self.source_bucket["vBucketServerMap"]["serverList"]
87            vbucket_map = self.source_bucket["vBucketServerMap"]["vBucketMap"]
88        else:
89            return None
90
91        node_vbucket_map = []
92        nodename = self.source_node.get('hostname', 'N/A').split(":")[0]
93        nodeindex = -1
94        for index, node in enumerate(server_list):
95            if nodename in node:
96                nodeindex = index
97                break
98        for index, vblist in enumerate(vbucket_map):
99            if vblist[0] >= 0 and vblist[0] == nodeindex:
100                node_vbucket_map.append(index)
101        return node_vbucket_map
102
    def provide_batch(self):
        """Return (rv, batch) of DCP messages; (0, None) signals completion.

        Falls back to the TAP implementation against pre-3.0 source nodes.
        On transient errors the DCP connection is dropped and re-created,
        with exponential backoff, up to opts.extra['max_retry'] attempts.
        """
        if not self.version_supported:
            return super(DCPStreamSource, self).provide_batch()
        cur_sleep = 0.2
        cur_retry = 0
        max_retry = self.opts.extra['max_retry']
        if not self.node_vbucket_map:
            self.node_vbucket_map = self.build_node_vbucket_map()

        while True:
            if self.dcp_done:
                return 0, None

            rv, dcp_conn = self.get_dcp_conn()
            if rv != 0:
                self.dcp_done = True
                return rv, None

            rv, batch = self.provide_dcp_batch_actual(dcp_conn)
            if rv == 0:
                return rv, batch

            # Error path: drop the connection so the next loop iteration
            # reconnects from scratch.
            if self.dcp_conn:
                self.dcp_conn.close()
                self.dcp_conn = None

            if cur_retry > max_retry:
                self.dcp_done = True
                return rv, batch

            logging.warn("backoff: %s, sleeping: %s, on error: %s" %
                         (cur_retry, cur_sleep, rv))
            time.sleep(cur_sleep)
            cur_sleep = min(cur_sleep * 2, 20) # Max backoff sleep 20 seconds.
            cur_retry = cur_retry + 1
138
    def provide_dcp_batch_actual(self, dcp_conn):
        """Drain parsed DCP packets from the reader thread into a Batch.

        Returns (0, batch) on success — (0, None) when nothing was read —
        or (error-string, batch) on failure.
        """
        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']
        # The connection buffer is 10x batch_max_bytes (see get_dcp_conn),
        # so 10/4 of batch_max_bytes is 25% of that buffer.
        delta_ack_size = batch_max_bytes * 10 / 4 #ack every 25% of buffer size
        last_processed = 0
        total_bytes_read = 0

        vbid = 0
        cmd = 0
        start_seqno = 0
        end_seqno = 0
        vb_uuid = 0
        hi_seqno = 0
        ss_start_seqno = 0
        ss_end_seqno = 0
        try:
            while (not self.dcp_done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):

                # Nothing parsed yet: wait while streams remain open,
                # otherwise we are done.
                if self.response.empty():
                    if len(self.stream_list) > 0:
                        time.sleep(.25)
                    else:
                        self.dcp_done = True
                    continue
                # Flow control: ack consumed bytes to the producer once we
                # fall more than delta_ack_size behind.
                unprocessed_size = total_bytes_read - last_processed
                if unprocessed_size > delta_ack_size:
                    rv = self.ack_buffer_size(unprocessed_size)
                    if rv:
                        logging.error(rv)
                    else:
                        last_processed = total_bytes_read

                cmd, errcode, opaque, cas, keylen, extlen, data, datalen, dtype, bytes_read = \
                    self.response.get()
                total_bytes_read += bytes_read
                rv = 0
                metalen = flags = flg = exp = 0
                key = val = ext = ''
                need_ack = False
                seqno = 0
                if cmd == couchbaseConstants.CMD_DCP_REQUEST_STREAM:
                    if errcode == couchbaseConstants.ERR_SUCCESS:
                        # Success body is the vbucket's failover log:
                        # a sequence of (uuid, seqno) pairs.
                        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                        start = 0
                        step = DCPStreamSource.HIGH_SEQNO_BYTE + DCPStreamSource.UUID_BYTE
                        while start+step <= datalen:
                            uuid, seqno = struct.unpack(
                                            couchbaseConstants.DCP_VB_UUID_SEQNO_PKT_FMT, \
                                            data[start:start + step])
                            if pair_index not in self.cur['failoverlog']:
                                self.cur['failoverlog'][pair_index] = {}
                            if opaque not in self.cur['failoverlog'][pair_index] or \
                               not self.cur['failoverlog'][pair_index][opaque]:
                                self.cur['failoverlog'][pair_index][opaque] = [(uuid, seqno)]
                            else:
                                self.cur['failoverlog'][pair_index][opaque].append((uuid, seqno))
                            start = start + step
                    elif errcode == couchbaseConstants.ERR_KEY_ENOENT:
                        logging.warn("producer doesn't know about the vbucket uuid, rollback to 0")
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno = \
                            self.stream_list[opaque]
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_KEY_EEXISTS:
                       logging.warn("a stream exists on the connection for vbucket:%s" % opaque)
                    elif errcode ==  couchbaseConstants.ERR_NOT_MY_VBUCKET:
                        logging.warn("Vbucket is not active anymore, skip it:%s" % vbid)
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ERANGE:
                        #logging.warn("Start or end sequence numbers specified incorrectly,(%s, %s)" % \
                        #             (start_seqno, end_seqno))
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ROLLBACK:
                        # Producer asked for a rollback: re-request the
                        # stream from the seqno it supplies, with uuid and
                        # snapshot range taken from saved state.
                        # NOTE(review): ss_stop_seqno looks like a typo for
                        # ss_end_seqno; harmless — both are reassigned below.
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_stop_seqno = \
                            self.stream_list[opaque]
                        start_seqno, = struct.unpack(couchbaseConstants.DCP_VB_SEQNO_PKT_FMT, data)
                        #find the most latest uuid, hi_seqno that fit start_seqno
                        if self.cur['failoverlog']:
                            pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                            # NOTE(review): .get("vbid") tests the literal key
                            # "vbid", not the vbid variable — confirm intent.
                            if self.cur['failoverlog'][pair_index].get("vbid"):
                                for uuid, seqno in self.cur['failoverlog'][pair_index][vbid]:
                                    if start_seqno >= seqno:
                                        vb_uuid = uuid
                                        break
                        ss_start_seqno = start_seqno
                        ss_end_seqno = start_seqno
                        if self.cur['snapshot']:
                            pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                            # NOTE(review): same literal-"vbid" pattern as above.
                            if self.cur['snapshot'][pair_index].get("vbid"):
                                ss_start_seqno, ss_end_seqno = self.cur['snapshot'][pair_index][vbid]
                        self.request_dcp_stream(vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)

                        del self.stream_list[opaque]
                        self.stream_list[opaque] = \
                            (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
                elif cmd == couchbaseConstants.CMD_DCP_MUTATION:
                    # For data commands the status field carries the vbucket id.
                    vbucket_id = errcode
                    seqno, rev_seqno, flg, exp, locktime, metalen, nru = \
                        struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    key = data[extlen:val_start]
                    val = data[val_start:]
                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                     cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                    vbucket_id = errcode
                    seqno, rev_seqno, metalen = \
                        struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    key = data[extlen:val_start]
                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                    if cmd == couchbaseConstants.CMD_DCP_DELETE:
                        batch.adjust_size += 1
                elif cmd == couchbaseConstants.CMD_DCP_FLUSH:
                    logging.warn("stopping: saw CMD_DCP_FLUSH")
                    self.dcp_done = True
                    break
                elif cmd == couchbaseConstants.CMD_DCP_END_STREAM:
                    del self.stream_list[opaque]
                    if not len(self.stream_list):
                        self.dcp_done = True
                elif cmd == couchbaseConstants.CMD_DCP_SNAPSHOT_MARKER:
                    # Remember the snapshot range so a later rollback or
                    # restart can resume with consistent stream parameters.
                    ss_start_seqno, ss_end_seqno, _ = \
                        struct.unpack(couchbaseConstants.DCP_SNAPSHOT_PKT_FMT, data[0:extlen])
                    pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                    if not self.cur['snapshot']:
                        self.cur['snapshot'] = {}
                    if pair_index not in self.cur['snapshot']:
                        self.cur['snapshot'][pair_index] = {}
                    self.cur['snapshot'][pair_index][opaque] = (ss_start_seqno, ss_end_seqno)
                elif cmd == couchbaseConstants.CMD_DCP_NOOP:
                    need_ack = True
                elif cmd == couchbaseConstants.CMD_DCP_BUFFER_ACK:
                    if errcode != couchbaseConstants.ERR_SUCCESS:
                        logging.warning("buffer ack response errcode:%s" % errcode)
                    continue
                else:
                    logging.warn("warning: unexpected DCP message: %s" % cmd)
                    return "unexpected DCP message: %s" % cmd, batch

                if need_ack:
                    self.ack_last = True
                    try:
                        dcp_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
                                          fmt=couchbaseConstants.RES_PKT_FMT,
                                          magic=couchbaseConstants.RES_MAGIC_BYTE)
                    except socket.error:
                        return ("error: socket.error on send();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch
                    except EOFError:
                        self.dcp_done = True
                        return ("error: EOFError on socket send();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch

                    # Close the batch when there's an ACK handshake, so
                    # the server can concurrently send us the next batch.
                    # If we are slow, our slow ACK's will naturally slow
                    # down the server.
                    self.ack_buffer_size(total_bytes_read - last_processed)
                    return 0, batch

                self.ack_last = False
                self.cmd_last = cmd

        except EOFError:
            if batch.size() <= 0 and self.ack_last:
                # A closed conn after an ACK means clean end of TAP dump.
                self.dcp_done = True

        if batch.size() <= 0:
            return 0, None
        self.ack_buffer_size(total_bytes_read - last_processed)
        return 0, batch
329
330    def get_dcp_conn(self):
331        """Return previously connected dcp conn."""
332
333        if not self.dcp_conn:
334            host = self.source_node['hostname'].split(':')[0]
335            port = self.source_node['ports']['direct']
336            version = self.source_node['version']
337
338            logging.debug("  DCPStreamSource connecting mc: " +
339                          host + ":" + str(port))
340
341            self.dcp_conn = cb_bin_client.MemcachedClient(host, port)
342            if not self.dcp_conn:
343                return "error: could not connect to memcached: " + \
344                    host + ":" + str(port), None
345            self.mem_conn = cb_bin_client.MemcachedClient(host, port)
346            if not self.dcp_conn:
347                return "error: could not connect to memcached: " + \
348                    host + ":" + str(port), None
349            sasl_user = str(pump.get_username(self.source_bucket.get("name", self.opts.username)))
350            sasl_pswd = str(pump.get_password(self.source_bucket.get("saslPassword", self.opts.password)))
351            if sasl_user:
352                try:
353                    self.dcp_conn.sasl_auth_cram_md5(sasl_user, sasl_pswd)
354                    self.mem_conn.sasl_auth_cram_md5(sasl_user, sasl_pswd)
355                except cb_bin_client.MemcachedError:
356                    try:
357                        self.dcp_conn.sasl_auth_plain(sasl_user, sasl_pswd)
358                        self.mem_conn.sasl_auth_plain(sasl_user, sasl_pswd)
359                    except EOFError:
360                        return "error: SASL auth error: %s:%s, user: %s" % \
361                            (host, port, sasl_user), None
362                    except cb_bin_client.MemcachedError:
363                        return "error: SASL auth failed: %s:%s, user: %s" % \
364                            (host, port, sasl_user), None
365                    except socket.error:
366                        return "error: SASL auth socket error: %s:%s, user: %s" % \
367                            (host, port, sasl_user), None
368                except EOFError:
369                    return "error: SASL auth error: %s:%s, user: %s" % \
370                        (host, port, sasl_user), None
371                except socket.error:
372                    return "error: SASL auth socket error: %s:%s, user: %s" % \
373                        (host, port, sasl_user), None
374            extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, \
375                                couchbaseConstants.FLAG_DCP_PRODUCER)
376            try:
377                opaque=self.r.randint(0, 2**32)
378                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONNECT, self.dcp_name, \
379                                       '', opaque, extra)
380                self.dcp_conn._handleSingleResponse(opaque)
381
382                # set connection buffer size. Considering header size, we roughly
383                # set the total received package size as 10 times as value size.
384                opaque=self.r.randint(0, 2**32)
385                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
386                                       couchbaseConstants.KEY_DCP_CONNECTION_BUFFER_SIZE,
387                                       str(self.batch_max_bytes * 10), opaque)
388                self.dcp_conn._handleSingleResponse(opaque)
389            except EOFError:
390                return "error: Fail to set up DCP connection", None
391            except cb_bin_client.MemcachedError:
392                return "error: DCP connect memcached error", None
393            except socket.error:
394                return "error: DCP connection error", None
395            self.running = True
396            self.start()
397
398            self.setup_dcp_streams()
399        return 0, self.dcp_conn
400
401    def ack_buffer_size(self, buf_size):
402        try:
403            opaque=self.r.randint(0, 2**32)
404            self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_BUFFER_ACK, '', '', \
405                opaque, struct.pack(">I", int(buf_size)))
406            logging.debug("Send buffer size: %d" % buf_size)
407        except socket.error:
408            return "error: socket error during sending buffer ack msg"
409        except EOFError:
410            return "error: send buffer ack msg"
411
412        return None
413
    def run(self):
        """Reader-thread main loop: drain the DCP socket and push parsed
        packets onto self.response for provide_dcp_batch_actual()."""
        if not self.dcp_conn:
            logging.error("socket to memcached server is not created yet.")
            return

        bytes_read = ''
        rd_timeout = 1
        desc = [self.dcp_conn.s]
        extra_bytes = 0
        while self.running:
            readers, writers, errors = select.select(desc, [], [], rd_timeout)
            rd_timeout = .25

            for reader in readers:
                data = reader.recv(self.recv_min_bytes)
                logging.debug("Read %d bytes off the wire" % len(data))
                if len(data) == 0:
                    raise exceptions.EOFError("Got empty data (remote died?).")
                bytes_read += data
            # Parse as many complete packets as the buffer holds.
            while len(bytes_read) >= couchbaseConstants.MIN_RECV_PACKET:
                magic, opcode, keylen, extlen, datatype, status, bodylen, opaque, cas= \
                    struct.unpack(couchbaseConstants.RES_PKT_FMT, \
                                  bytes_read[0:couchbaseConstants.MIN_RECV_PACKET])

                if len(bytes_read) < (couchbaseConstants.MIN_RECV_PACKET+bodylen):
                    # Body incomplete; carry the partial length so the
                    # bytes-read accounting below stays roughly correct.
                    # NOTE(review): this accounting looks approximate —
                    # confirm against the flow-control ack logic.
                    extra_bytes = len(bytes_read)
                    break

                rd_timeout = 0
                body = bytes_read[couchbaseConstants.MIN_RECV_PACKET : \
                                  couchbaseConstants.MIN_RECV_PACKET+bodylen]
                bytes_read = bytes_read[couchbaseConstants.MIN_RECV_PACKET+bodylen:]
                self.response.put((opcode, status, opaque, cas, keylen, extlen, body, \
                                   bodylen, datatype, \
                                   couchbaseConstants.MIN_RECV_PACKET+bodylen+extra_bytes))
                extra_bytes = 0
450
    def setup_dcp_streams(self):
        """Request one DCP stream per vbucket active on this node,
        resuming from any saved seqno/failover-log/snapshot state."""
        #send request to retrieve vblist and uuid for the node
        stats = self.mem_conn.stats("vbucket-seqno")
        if not stats:
            return "error: fail to retrive vbucket seqno", None
        self.mem_conn.close()

        uuid_list = {}
        seqno_list = {}
        vb_list = {}
        # Stats keys look like "vb_<id>:<counter>"; vb[3:] strips "vb_".
        for key, val in stats.items():
            vb, counter = key.split(":")
            if counter in [DCPStreamSource.VB_UUID,
                           DCPStreamSource.HIGH_SEQNO,
                           DCPStreamSource.ABS_HIGH_SEQNO,
                           DCPStreamSource.PURGE_SEQNO]:
                if not vb_list.has_key(vb[3:]):
                    vb_list[vb[3:]] = {}
                vb_list[vb[3:]][counter] = int(val)
        flags = 0
        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
        for vbid in vb_list.iterkeys():
            if int(vbid) not in self.node_vbucket_map:
                #skip nonactive vbucket
                continue
            # Resume point from a previous run, if any.
            if self.cur['seqno'] and self.cur['seqno'][pair_index]:
                start_seqno = self.cur['seqno'][pair_index][vbid]
            else:
                start_seqno = 0
            uuid = 0
            if self.cur['failoverlog'] and self.cur['failoverlog'][pair_index]:
                if vbid in self.cur['failoverlog'][pair_index] and \
                   self.cur['failoverlog'][pair_index][vbid]:
                    #Use the latest failover log
                    self.cur['failoverlog'][pair_index][vbid] = \
                        sorted(self.cur['failoverlog'][pair_index][vbid], \
                               key=lambda tup: tup[1],
                               reverse=True)
                    uuid, _ = self.cur['failoverlog'][pair_index][vbid][0]
            ss_start_seqno = start_seqno
            ss_end_seqno = start_seqno
            if self.cur['snapshot'] and self.cur['snapshot'][pair_index]:
                if vbid in self.cur['snapshot'][pair_index] and \
                   self.cur['snapshot'][pair_index][vbid]:
                    ss_start_seqno, ss_end_seqno = self.cur['snapshot'][pair_index][vbid]
            self.request_dcp_stream(int(vbid), flags, start_seqno,
                                    vb_list[vbid][DCPStreamSource.HIGH_SEQNO],
                                    uuid, ss_start_seqno, ss_end_seqno)
499
500    def request_dcp_stream(self,
501                           vbid,
502                           flags,
503                           start_seqno,
504                           end_seqno,
505                           vb_uuid,
506                           ss_start_seqno,
507                           ss_end_seqno):
508        if not self.dcp_conn:
509            return "error: no dcp connection setup yet.", None
510        extra = struct.pack(couchbaseConstants.DCP_STREAM_REQ_PKT_FMT,
511                            int(flags), 0,
512                            int(start_seqno),
513                            int(end_seqno),
514                            int(vb_uuid),
515                            int(ss_start_seqno),
516                            int(ss_end_seqno))
517        self.dcp_conn._sendMsg(couchbaseConstants.CMD_DCP_REQUEST_STREAM, '', '', vbid, \
518                               extra, 0, 0, vbid)
519        self.stream_list[vbid] = (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
520
    def _read_dcp_conn(self, dcp_conn):
        # NOTE(review): appears unused in this file — provide_dcp_batch_actual()
        # consumes self.response instead.  Also, recv_dcp_msg is not defined
        # here (only _recv_dcp_msg / _recv_dcp_msg_error are) — confirm
        # whether this is dead legacy code.
        buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen = \
            self.recv_dcp_msg(dcp_conn.s)
        dcp_conn.buf = buf

        rv = 0
        metalen = flags = ttl = flg = exp = 0
        meta = key = val = ext = ''
        need_ack = False
        # NOTE(review): this zeroes the extlen received above, so ext below
        # is always '' and the struct.unpack calls would fail — more
        # evidence this path is dead.
        extlen = seqno = 0
        if data:
            ext = data[0:extlen]
            if cmd == couchbaseConstants.CMD_DCP_MUTATION:
                seqno, rev_seqno, flg, exp, locktime, meta = \
                    struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
                val = data[val_start:]
            elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                 cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                seqno, rev_seqno, meta = \
                    struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
            else:
                rv = "error: uninterpreted DCP commands:%s" % cmd
        elif datalen:
            rv = "error: could not read full TAP message body"

        return rv, cmd, errcode, key, flg, exp, cas, meta, val, opaque, need_ack, seqno
553
554    def _recv_dcp_msg_error(self, sock, buf):
555        pkt, buf = self.recv_dcp(sock, couchbaseConstants.MIN_RECV_PACKET, buf)
556        if not pkt:
557            raise EOFError()
558        magic, cmd, keylen, extlen, dtype, errcode, datalen, opaque, cas = \
559            struct.unpack(couchbaseConstants.REQ_PKT_FMT, pkt)
560        if magic != couchbaseConstants.REQ_MAGIC_BYTE:
561            raise Exception("unexpected recv_msg magic: " + str(magic))
562        data, buf = self.recv_dcp(sock, datalen, buf)
563        return buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen
564
565    def _recv_dcp_msg(self, sock):
566        response = ""
567        while len(response) < couchbaseConstants.MIN_RECV_PACKET:
568            data = sock.recv(couchbaseConstants.MIN_RECV_PACKET - len(response))
569            if data == '':
570                raise exceptions.EOFError("Got empty data (remote died?).")
571            response += data
572        assert len(response) == couchbaseConstants.MIN_RECV_PACKET
573        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
574            struct.unpack(couchbaseConstants.RES_PKT_FMT, response)
575
576        datalen = remaining
577        rv = ""
578        while remaining > 0:
579            data = sock.recv(remaining)
580            if data == '':
581                raise exceptions.EOFError("Got empty data (remote died?).")
582            rv += data
583            remaining -= len(data)
584
585        assert (magic in (couchbaseConstants.RES_MAGIC_BYTE, couchbaseConstants.REQ_MAGIC_BYTE)), \
586               "Got magic: %d" % magic
587        return cmd, errcode, opaque, cas, keylen, extralen, rv, datalen
588
589    def _recv_dcp(self, skt, nbytes, buf):
590        recv_arr = [ buf ]
591        recv_tot = len(buf) # In bytes.
592
593        while recv_tot < nbytes:
594            data = None
595            try:
596                data = skt.recv(max(nbytes - len(buf), self.recv_min_bytes))
597            except socket.timeout:
598                logging.error("error: recv socket.timeout")
599            except Exception, e:
600                msg = str(e)
601                if msg.find("Connection reset by peer") >= 0:
602                    logging.error("error: recv exception: " + \
603                        "%s, %s may be inactive" % (msg, self.source_node["hostname"]))
604                else:
605                    logging.error("error: recv exception: " + str(e))
606
607
608            if not data:
609                return None, ''
610
611            recv_arr.append(data)
612            recv_tot += len(data)
613
614        joined = ''.join(recv_arr)
615
616        return joined[:nbytes], joined[nbytes:]
617
618    @staticmethod
619    def total_msgs(opts, source_bucket, source_node, source_map):
620        source_name = source_node.get("hostname", None)
621        if not source_name:
622            return 0, None
623
624        spec = source_map['spec']
625        name = source_bucket['name']
626        vbuckets_num = len(source_bucket['vBucketServerMap']['vBucketMap'])
627        if not vbuckets_num:
628            return 0, None
629
630        vbucket_list = getattr(opts, "vbucket_list", None)
631
632        stats_vals = {}
633        host, port, user, pswd, _ = pump.parse_spec(opts, spec, 8091)
634        for stats in ["curr_items", "vb_active_resident_items_ratio"]:
635            path = "/pools/default/buckets/%s/stats/%s" % (name, stats)
636            err, json, data = pump.rest_request_json(host, int(port),
637                                                     user, pswd, path,
638                                                     reason="total_msgs")
639            if err:
640                return 0, None
641
642            nodeStats = data.get("nodeStats", None)
643            if not nodeStats:
644                return 0, None
645            vals = nodeStats.get(source_name, None)
646            if not vals:
647                return 0, None
648            stats_vals[stats] = vals[-1]
649
650        total_msgs = stats_vals["curr_items"]
651        resident_ratio = stats_vals["vb_active_resident_items_ratio"]
652        if 0 < resident_ratio < 100:
653            #for DGM case, server will transfer both in-memory items and
654            #backfill all items on disk
655            total_msgs += (resident_ratio/100.0) * stats_vals["curr_items"]
656        return 0, int(total_msgs)
657
class DCPSink(pump_cb.CBSink):
    """Smart client using DCP protocol to stream data to couchbase cluster."""
    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        """Same setup as CBSink, plus the optional --vbucket-list filter."""
        super(DCPSink, self).__init__(opts, spec, source_bucket, source_node,
                                     source_map, sink_map, ctl, cur)

        self.vbucket_list = getattr(opts, "vbucket_list", None)
666
667    @staticmethod
668    def check_base(opts, spec):
669        #Overwrite CBSink.check_base() to allow replica vbucket state
670
671        op = getattr(opts, "destination_operation", None)
672        if op not in [None, 'set', 'add', 'get']:
673            return ("error: --destination-operation unsupported value: %s" +
674                    "; use set, add, get") % op
675
676        return pump.EndPoint.check_base(opts, spec)
677
    def scatter_gather(self, mconns, batch):
        """Send a batch of msgs to the connections owning their vbuckets,
        then collect the acknowledgements.

        Returns (rv, retry_batch, can_retry_without_refresh): rv != 0 is an
        error string; a non-None retry_batch asks the caller to resend the
        whole batch; the last value is truthy only when a retry is wanted
        and no sink-map refresh is required.
        """
        sink_map_buckets = self.sink_map['buckets']
        if len(sink_map_buckets) != 1:
            return "error: CBSink.run() expected 1 bucket in sink_map", None, None

        vbuckets_num = len(sink_map_buckets[0]['vBucketServerMap']['vBucketMap'])
        vbuckets = batch.group_by_vbucket_id(vbuckets_num, self.rehash)

        # Scatter or send phase.
        for vbucket_id, msgs in vbuckets.iteritems():
            rv, conn = self.find_conn(mconns, vbucket_id)
            if rv != 0:
                return rv, None, None
            rv = self.send_msgs(conn, msgs, self.operation(),
                                vbucket_id=vbucket_id)
            if rv != 0:
                return rv, None, None

        # Yield to let other threads do stuff while server's processing.
        time.sleep(0.01)

        # Gather or recv phase.
        # Only process the last msg which requires ack request
        last_msg = []
        retry_batch = None
        need_refresh = False
        for vbucket_id, msgs in vbuckets.iteritems():
            # NOTE(review): last_msg accumulates across iterations, so each
            # recv_msgs() call below is also handed the final msgs of all
            # previously visited vbuckets -- looks like a fresh [msgs[-1]]
            # per vbucket was intended; confirm against recv_msgs().
            last_msg.append(msgs[-1])
            rv, conn = self.find_conn(mconns, vbucket_id)
            if rv != 0:
                return rv, None, None
            rv, retry, refresh = \
                self.recv_msgs(conn, last_msg, vbucket_id=vbucket_id, verify_opaque=False)
            if rv != 0:
                return rv, None, None
            if retry:
                retry_batch = batch
            if refresh:
                need_refresh = True

        return 0, retry_batch, retry_batch and not need_refresh
719
720    def connect_mc(self, node, sasl_user, sasl_pswd):
721        """Return previously connected dcp conn."""
722
723        host = node.split(':')[0]
724        port = node.split(':')[1]
725
726        logging.debug("  DCPSink connecting mc: " +
727                      host + ":" + str(port))
728
729        dcp_conn = cb_bin_client.MemcachedClient(host, port)
730        if not dcp_conn:
731            return "error: could not connect to memcached: " + \
732                host + ":" + str(port), None
733        if sasl_user:
734            try:
735                dcp_conn.sasl_auth_plain(sasl_user, sasl_pswd)
736            except EOFError:
737                return "error: SASL auth error: %s:%s, user: %s" % \
738                    (host, port, sasl_user), None
739            except cb_bin_client.MemcachedError:
740                return "error: SASL auth failed: %s:%s, user: %s" % \
741                    (host, port, sasl_user), None
742            except socket.error:
743                return "error: SASL auth socket error: %s:%s, user: %s" % \
744                    (host, port, sasl_user), None
745        extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, \
746                            couchbaseConstants.FLAG_DCP_CONSUMER)
747        try:
748            opaque=self.r.randint(0, 2**32)
749            dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONNECT, self.dcp_name, '', opaque, extra)
750            dcp_conn._handleSingleResponse(opaque)
751        except EOFError:
752            return "error: Fail to set up DCP connection", None
753        except cb_bin_client.MemcachedError:
754            return "error: DCP connect memcached error", None
755        except socket.error:
756            return "error: DCP connection error", None
757
758        return 0, dcp_conn
759
760    def find_conn(self, mconns, vbucket_id):
761        if not self.vbucket_list:
762            return super(DCPSink, self).find_conn(mconns, vbucket_id)
763
764        vbuckets = json.loads(self.vbucket_list)
765        if isinstance(vbuckets, list):
766            if vbucket_id not in vbuckets:
767                return "error: unexpected vbucket id:" + str(vbucket_id), None
768            return super(DCPSink, self).find_conn(mconns, vbucket_id)
769
770        bucket = self.sink_map['buckets'][0]
771        serverList = bucket['vBucketServerMap']['serverList']
772        if serverList:
773            port_number = serverList[0].split(":")[-1]
774        else:
775            port_number = 11210
776
777        conn = None
778        if isinstance(vbuckets, dict):
779            for node, vblist in vbuckets.iteritems():
780                for vbucket in vblist:
781                    if int(vbucket) == int(vbucket_id):
782                        host_port = "%s:%s" % (node, port_number)
783                        conn = mconns.get(host_port, None)
784                        if not conn:
785                            user = bucket['name']
786                            pswd = bucket['saslPassword']
787                            rv, conn = DCPSink.connect_mc(host_port, user, pswd)
788                            if rv != 0:
789                                logging.error("error: CBSink.connect() for send: " + rv)
790                                return rv, None
791                            mconns[host_port] = conn
792                        break
793        return 0, conn
794
795    def send_msgs(self, conn, msgs, operation, vbucket_id=None):
796        m = []
797        #Ask for acknowledgement for the last msg of batch
798        for i, msg in enumerate(msgs):
799            cmd, vbucket_id_msg, key, flg, exp, cas, meta, val, seqno, dtype, nmeta = msg
800            if vbucket_id is not None:
801                vbucket_id_msg = vbucket_id
802
803            if self.skip(key, vbucket_id_msg):
804                continue
805
806            if cmd == couchbaseConstants.CMD_GET:
807                val, flg, exp, cas = '', 0, 0, 0
808            if cmd == couchbaseConstants.CMD_NOOP:
809                key, val, flg, exp, cas = '', '', 0, 0, 0
810            need_ack = (i == len(msgs)-1)
811            rv, req = self.cmd_request(cmd, vbucket_id_msg, key, val,
812                                       ctypes.c_uint32(flg).value,
813                                       exp, cas, meta, i, seqno, dtype, nmeta, need_ack)
814            if rv != 0:
815                return rv
816            self.append_req(m, req)
817        if m:
818            try:
819                conn.s.send(''.join(m))
820            except socket.error, e:
821                return "error: conn.send() exception: %s" % (e)
822        return 0
823
824    def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque, seqno, dtype,\
825                    nmeta, need_ack):
826        if meta:
827            seq_no = str(meta)
828            if len(seq_no) > 8:
829                seq_no = seq_no[0:8]
830            if len(seq_no) < 8:
831                # The seq_no might be 32-bits from 2.0DP4, so pad with 0x00's.
832                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + seq_no)[-8:]
833            check_seqno, = struct.unpack(">Q", seq_no)
834            if not check_seqno:
835                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
836        else:
837            seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
838        metalen = len(str(seq_no))
839        locktime = 0
840        rev_seqno = 0
841        nru = 0
842        if cmd == couchbaseConstants.CMD_DCP_MUTATION:
843            ext = struct.pack(couchbaseConstants.DCP_MUTATION_PKT_FMT,
844                              seqno, rev_seqno, flg, exp, locktime, metalen, nru)
845        elif cmd == couchbaseConstants.CMD_DCP_DELETE:
846            ext = struct.pack(couchbaseConstants.DCP_DELETE_PKT_FMT,
847                              seqno, rev_seqno, metalen)
848        else:
849            return "error: MCSink - unknown tap cmd for request: " + str(cmd), None
850
851        hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, cas, opaque, metalen, dtype)
852        return 0, (hdr, ext, seq_no, key, val)
853
854    def cmd_header(self, cmd, vbucket_id, key, val, ext, cas, opaque, metalen,
855                   dtype=0,
856                   fmt=couchbaseConstants.REQ_PKT_FMT,
857                   magic=couchbaseConstants.REQ_MAGIC_BYTE):
858        #MB-11902
859        dtype = 0
860        return struct.pack(fmt, magic, cmd,
861                           len(key), len(ext), dtype, vbucket_id,
862                           metalen + len(key) + len(val) + len(ext), opaque, cas)
863
864    #TODO : where is the seq_no? same or different from tap seq_no?
865    def append_req(self, m, req):
866        hdr, ext, seq_no, key, val = req
867        m.append(hdr)
868        if ext:
869            m.append(ext)
870        if seq_no:
871            m.append(seq_no)
872        if key:
873            m.append(str(key))
874        if val:
875            m.append(str(val))
876