xref: /3.0.3-GA/couchbase-cli/pump_dcp.py (revision 1d239dd7)
1#!/usr/bin/env python
2
3import logging
4import os
5import random
6import simplejson as json
7import socket
8import struct
9import time
10import threading
11import select
12import exceptions
13
14import cb_bin_client
15import couchbaseConstants
16from cbqueue import PumpQueue
17
18import pump
19import pump_mc
20import pump_cb
21import pump_tap
22
# ``ctypes`` may be shadowed/broken by the bundled /opt/couchbase python
# libdir; retry the import with that directory removed from the search path.
# Bug fix: the fallback used sys.path/sys.exit without importing sys, which
# raised NameError exactly when the fallback was needed.
import sys

try:
    import ctypes
except ImportError:
    cb_path = '/opt/couchbase/lib/python'
    while cb_path in sys.path:
        # Drop every occurrence so the retry resolves system modules only.
        sys.path.remove(cb_path)
    try:
        import ctypes
    except ImportError:
        sys.exit('error: could not import ctypes module')
    else:
        # Restore the couchbase libdir at the front for later imports.
        sys.path.insert(0, cb_path)
35
class DCPStreamSource(pump_tap.TAPDumpSource, threading.Thread):
    """Can read from cluster/server/bucket via DCP streaming."""
    # Counter names expected in "vbucket-seqno" stats responses
    # (see setup_dcp_streams).
    HIGH_SEQNO = "high_seqno"
    VB_UUID = "uuid"
    ABS_HIGH_SEQNO = "abs_high_seqno"
    PURGE_SEQNO = "purge_seqno"

    # Wire sizes (bytes) of the seqno and uuid fields in a failover-log entry.
    HIGH_SEQNO_BYTE = 8
    UUID_BYTE = 8
45
    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        """Set up DCP-specific state on top of the TAP source base class."""
        super(DCPStreamSource, self).__init__(opts, spec, source_bucket, source_node,
                                            source_map, sink_map, ctl, cur)
        threading.Thread.__init__(self)

        # DCP session state.
        self.dcp_done = False
        self.dcp_conn = None        # DCP producer connection
        self.mem_conn = None        # plain memcached connection (stats)
        self.dcp_name = opts.process_name
        self.ack_last = False       # last msg processed required an ack
        self.cmd_last = None
        self.num_msg = 0
        # Only 3.0+ servers speak DCP; element-wise string compare of the
        # dotted version parts (fine for single-digit major versions).
        self.version_supported = self.source_node['version'].split(".") >= ["3", "0", "0"]
        # Tunables supplied via --extra.
        self.recv_min_bytes = int(opts.extra.get("recv_min_bytes", 4096))
        self.batch_max_bytes = int(opts.extra.get("batch_max_bytes", 400000))
        self.flow_control = int(opts.extra.get("flow_control", 1))
        self.vbucket_list = getattr(opts, "vbucket_list", None)
        self.r=random.Random()
        self.queue = PumpQueue()
        self.response = PumpQueue()  # parsed packets, filled by run()
        self.running = False         # reader-thread loop flag
        self.stream_list = {}        # opaque/vbid -> stream request params
        self.unack_size = 0
        self.node_vbucket_map = None
71
72    @staticmethod
73    def can_handle(opts, spec):
74        return (spec.startswith("http://") or
75                spec.startswith("couchbase://"))
76
    @staticmethod
    def check(opts, spec):
        # Spec validation is identical to the TAP-based source.
        return pump_tap.TAPDumpSource.check(opts, spec)
80
81    @staticmethod
82    def provide_design(opts, source_spec, source_bucket, source_map):
83        return pump_tap.TAPDumpSource.provide_design(opts, source_spec, source_bucket, source_map);
84
85    def build_node_vbucket_map(self):
86        if self.source_bucket.has_key("vBucketServerMap"):
87            server_list = self.source_bucket["vBucketServerMap"]["serverList"]
88            vbucket_map = self.source_bucket["vBucketServerMap"]["vBucketMap"]
89        else:
90            return None
91
92        node_vbucket_map = []
93        nodename = self.source_node.get('hostname', 'N/A').split(":")[0]
94        nodeindex = -1
95        for index, node in enumerate(server_list):
96            if nodename in node:
97                nodeindex = index
98                break
99        for index, vblist in enumerate(vbucket_map):
100            if vblist[0] >= 0 and vblist[0] == nodeindex:
101                node_vbucket_map.append(index)
102        return node_vbucket_map
103
104    def provide_batch(self):
105        if not self.version_supported:
106            return "error: cannot back up 2.x or older clusters with 3.x tools", None
107
108        cur_sleep = 0.2
109        cur_retry = 0
110        max_retry = self.opts.extra['max_retry']
111        if not self.node_vbucket_map:
112            self.node_vbucket_map = self.build_node_vbucket_map()
113
114        while True:
115            if self.dcp_done:
116                return 0, None
117
118            rv, dcp_conn = self.get_dcp_conn()
119            if rv != 0:
120                self.dcp_done = True
121                return rv, None
122
123            rv, batch = self.provide_dcp_batch_actual(dcp_conn)
124            if rv == 0:
125                return rv, batch
126
127            if self.dcp_conn:
128                self.dcp_conn.close()
129                self.dcp_conn = None
130
131            if cur_retry > max_retry:
132                self.dcp_done = True
133                return rv, batch
134
135            logging.warn("backoff: %s, sleeping: %s, on error: %s" %
136                         (cur_retry, cur_sleep, rv))
137            time.sleep(cur_sleep)
138            cur_sleep = min(cur_sleep * 2, 20) # Max backoff sleep 20 seconds.
139            cur_retry = cur_retry + 1
140
    def provide_dcp_batch_actual(self, dcp_conn):
        """Drain packets queued by the reader thread into one pump.Batch.

        Returns (rv, batch): rv is 0 on success or an error string; batch may
        be None when the stream finished with nothing pending.
        """
        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']
        delta_ack_size = batch_max_bytes * 10 / 4 #ack every 25% of buffer size
        last_processed = 0
        total_bytes_read = 0

        vbid = 0
        cmd = 0
        start_seqno = 0
        end_seqno = 0
        vb_uuid = 0
        hi_seqno = 0
        ss_start_seqno = 0
        ss_end_seqno = 0
        try:
            while (not self.dcp_done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):

                # run() feeds parsed packets into self.response; wait while
                # streams are still open, finish once none remain.
                if self.response.empty():
                    if len(self.stream_list) > 0:
                        time.sleep(.25)
                    else:
                        self.dcp_done = True
                    continue
                # Flow control: ack once >25% of the buffer is unacknowledged.
                unprocessed_size = total_bytes_read - last_processed
                if unprocessed_size > delta_ack_size:
                    rv = self.ack_buffer_size(unprocessed_size)
                    if rv:
                        logging.error(rv)
                    else:
                        last_processed = total_bytes_read

                cmd, errcode, opaque, cas, keylen, extlen, data, datalen, dtype, bytes_read = \
                    self.response.get()
                total_bytes_read += bytes_read
                rv = 0
                metalen = flags = flg = exp = 0
                key = val = ext = ''
                need_ack = False
                seqno = 0
                if cmd == couchbaseConstants.CMD_DCP_REQUEST_STREAM:
                    if errcode == couchbaseConstants.ERR_SUCCESS:
                        # Stream opened; the body is the failover log — a
                        # sequence of (uuid, seqno) pairs we persist for
                        # later restart/rollback handling.
                        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                        start = 0
                        step = DCPStreamSource.HIGH_SEQNO_BYTE + DCPStreamSource.UUID_BYTE
                        while start+step <= datalen:
                            uuid, seqno = struct.unpack(
                                            couchbaseConstants.DCP_VB_UUID_SEQNO_PKT_FMT, \
                                            data[start:start + step])
                            if pair_index not in self.cur['failoverlog']:
                                self.cur['failoverlog'][pair_index] = {}
                            if opaque not in self.cur['failoverlog'][pair_index] or \
                               not self.cur['failoverlog'][pair_index][opaque]:
                                self.cur['failoverlog'][pair_index][opaque] = [(uuid, seqno)]
                            else:
                                self.cur['failoverlog'][pair_index][opaque].append((uuid, seqno))
                            start = start + step
                    elif errcode == couchbaseConstants.ERR_KEY_ENOENT:
                        logging.warn("producer doesn't know about the vbucket uuid, rollback to 0")
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno = \
                            self.stream_list[opaque]
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_KEY_EEXISTS:
                        logging.warn("a stream exists on the connection for vbucket:%s" % opaque)
                    elif errcode ==  couchbaseConstants.ERR_NOT_MY_VBUCKET:
                        # NOTE(review): vbid here is left over from an earlier
                        # message; the failed stream's id is opaque — confirm.
                        logging.warn("Vbucket is not active anymore, skip it:%s" % vbid)
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ERANGE:
                        #logging.warn("Start or end sequence numbers specified incorrectly,(%s, %s)" % \
                        #             (start_seqno, end_seqno))
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ROLLBACK:
                        # Server demands a rollback: re-request the stream
                        # from the server-supplied start seqno, picking the
                        # newest saved failover-log uuid that fits it.
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_stop_seqno = \
                            self.stream_list[opaque]
                        start_seqno, = struct.unpack(couchbaseConstants.DCP_VB_SEQNO_PKT_FMT, data)
                        #find the most latest uuid, hi_seqno that fit start_seqno
                        if self.cur['failoverlog']:
                            pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                            if self.cur['failoverlog'][pair_index].get("vbid"):
                                for uuid, seqno in self.cur['failoverlog'][pair_index][vbid]:
                                    if start_seqno >= seqno:
                                        vb_uuid = uuid
                                        break
                        ss_start_seqno = start_seqno
                        ss_end_seqno = start_seqno
                        if self.cur['snapshot']:
                            pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                            if self.cur['snapshot'][pair_index].get("vbid"):
                                ss_start_seqno, ss_end_seqno = self.cur['snapshot'][pair_index][vbid]
                        self.request_dcp_stream(vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)

                        del self.stream_list[opaque]
                        self.stream_list[opaque] = \
                            (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
                elif cmd == couchbaseConstants.CMD_DCP_MUTATION:
                    # For data messages the status field carries the vbucket id.
                    vbucket_id = errcode
                    seqno, rev_seqno, flg, exp, locktime, metalen, nru = \
                        struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    key = data[extlen:val_start]
                    val = data[val_start:]
                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                     cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                    vbucket_id = errcode
                    seqno, rev_seqno, metalen = \
                        struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    key = data[extlen:val_start]
                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                    if cmd == couchbaseConstants.CMD_DCP_DELETE:
                        batch.adjust_size += 1
                elif cmd == couchbaseConstants.CMD_DCP_FLUSH:
                    logging.warn("stopping: saw CMD_DCP_FLUSH")
                    self.dcp_done = True
                    break
                elif cmd == couchbaseConstants.CMD_DCP_END_STREAM:
                    del self.stream_list[opaque]
                    if not len(self.stream_list):
                        self.dcp_done = True
                elif cmd == couchbaseConstants.CMD_DCP_SNAPSHOT_MARKER:
                    # Remember snapshot boundaries per vbucket for restart.
                    ss_start_seqno, ss_end_seqno, _ = \
                        struct.unpack(couchbaseConstants.DCP_SNAPSHOT_PKT_FMT, data[0:extlen])
                    pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                    if not self.cur['snapshot']:
                        self.cur['snapshot'] = {}
                    if pair_index not in self.cur['snapshot']:
                        self.cur['snapshot'][pair_index] = {}
                    self.cur['snapshot'][pair_index][opaque] = (ss_start_seqno, ss_end_seqno)
                elif cmd == couchbaseConstants.CMD_DCP_NOOP:
                    # Server heartbeat; must be answered (below) to keep the
                    # connection alive.
                    need_ack = True
                elif cmd == couchbaseConstants.CMD_DCP_BUFFER_ACK:
                    if errcode != couchbaseConstants.ERR_SUCCESS:
                        logging.warning("buffer ack response errcode:%s" % errcode)
                    continue
                else:
                    logging.warn("warning: unexpected DCP message: %s" % cmd)
                    return "unexpected DCP message: %s" % cmd, batch

                if need_ack:
                    self.ack_last = True
                    try:
                        dcp_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
                                          fmt=couchbaseConstants.RES_PKT_FMT,
                                          magic=couchbaseConstants.RES_MAGIC_BYTE)
                    except socket.error:
                        return ("error: socket.error on send();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch
                    except EOFError:
                        self.dcp_done = True
                        return ("error: EOFError on socket send();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch

                    # Close the batch when there's an ACK handshake, so
                    # the server can concurrently send us the next batch.
                    # If we are slow, our slow ACK's will naturally slow
                    # down the server.
                    self.ack_buffer_size(total_bytes_read - last_processed)
                    return 0, batch

                self.ack_last = False
                self.cmd_last = cmd

        except EOFError:
            if batch.size() <= 0 and self.ack_last:
                # A closed conn after an ACK means clean end of TAP dump.
                self.dcp_done = True

        if batch.size() <= 0:
            return 0, None
        self.ack_buffer_size(total_bytes_read - last_processed)
        return 0, batch
331
332    def get_dcp_conn(self):
333        """Return previously connected dcp conn."""
334
335        if not self.dcp_conn:
336            host = self.source_node['hostname'].split(':')[0]
337            port = self.source_node['ports']['direct']
338            version = self.source_node['version']
339
340            logging.debug("  DCPStreamSource connecting mc: " +
341                          host + ":" + str(port))
342
343            self.dcp_conn = cb_bin_client.MemcachedClient(host, port)
344            if not self.dcp_conn:
345                return "error: could not connect to memcached: " + \
346                    host + ":" + str(port), None
347            self.mem_conn = cb_bin_client.MemcachedClient(host, port)
348            if not self.dcp_conn:
349                return "error: could not connect to memcached: " + \
350                    host + ":" + str(port), None
351            sasl_user = str(self.source_bucket.get("name", pump.get_username(self.opts.username)))
352            sasl_pswd = str(self.source_bucket.get("saslPassword", pump.get_password(self.opts.password)))
353            if sasl_user:
354                try:
355                    self.dcp_conn.sasl_auth_cram_md5(sasl_user, sasl_pswd)
356                    self.mem_conn.sasl_auth_cram_md5(sasl_user, sasl_pswd)
357                except cb_bin_client.MemcachedError:
358                    try:
359                        self.dcp_conn.sasl_auth_plain(sasl_user, sasl_pswd)
360                        self.mem_conn.sasl_auth_plain(sasl_user, sasl_pswd)
361                    except EOFError:
362                        return "error: SASL auth error: %s:%s, user: %s" % \
363                            (host, port, sasl_user), None
364                    except cb_bin_client.MemcachedError:
365                        return "error: SASL auth failed: %s:%s, user: %s" % \
366                            (host, port, sasl_user), None
367                    except socket.error:
368                        return "error: SASL auth socket error: %s:%s, user: %s" % \
369                            (host, port, sasl_user), None
370                except EOFError:
371                    return "error: SASL auth error: %s:%s, user: %s" % \
372                        (host, port, sasl_user), None
373                except socket.error:
374                    return "error: SASL auth socket error: %s:%s, user: %s" % \
375                        (host, port, sasl_user), None
376            extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, \
377                                couchbaseConstants.FLAG_DCP_PRODUCER)
378            try:
379                opaque=self.r.randint(0, 2**32)
380                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONNECT, self.dcp_name, \
381                                       '', opaque, extra)
382                self.dcp_conn._handleSingleResponse(opaque)
383
384                if self.flow_control:
385                    # set connection buffer size. Considering header size, we roughly
386                    # set the total received package size as 10 times as value size.
387                    opaque=self.r.randint(0, 2**32)
388                    self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
389                                           couchbaseConstants.KEY_DCP_CONNECTION_BUFFER_SIZE,
390                                           str(self.batch_max_bytes * 10), opaque)
391                    self.dcp_conn._handleSingleResponse(opaque)
392            except EOFError:
393                return "error: Fail to set up DCP connection", None
394            except cb_bin_client.MemcachedError:
395                return "error: DCP connect memcached error", None
396            except socket.error:
397                return "error: DCP connection error", None
398            self.running = True
399            self.start()
400
401            self.setup_dcp_streams()
402        return 0, self.dcp_conn
403
404    def ack_buffer_size(self, buf_size):
405        if self.flow_control:
406            try:
407                opaque=self.r.randint(0, 2**32)
408                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_BUFFER_ACK, '', '', \
409                    opaque, struct.pack(">I", int(buf_size)))
410                logging.debug("Send buffer size: %d" % buf_size)
411            except socket.error:
412                return "error: socket error during sending buffer ack msg"
413            except EOFError:
414                return "error: send buffer ack msg"
415
416        return None
417
    def run(self):
        """Reader-thread main loop: pull raw bytes off the DCP socket,
        reassemble complete packets, and enqueue parsed tuples onto
        self.response for provide_dcp_batch_actual() to consume."""
        if not self.dcp_conn:
            logging.error("socket to memcached server is not created yet.")
            return

        bytes_read = ''      # unconsumed wire bytes
        rd_timeout = 1
        desc = [self.dcp_conn.s]
        extra_bytes = 0
        while self.running:
            readers, writers, errors = select.select(desc, [], [], rd_timeout)
            rd_timeout = .25

            for reader in readers:
                data = reader.recv(self.recv_min_bytes)
                logging.debug("Read %d bytes off the wire" % len(data))
                if len(data) == 0:
                    # NOTE(review): raised inside the thread with no handler
                    # here — this ends the reader thread. Confirm intended.
                    raise exceptions.EOFError("Got empty data (remote died?).")
                bytes_read += data
            # Carve as many complete packets as are buffered.
            while len(bytes_read) >= couchbaseConstants.MIN_RECV_PACKET:
                magic, opcode, keylen, extlen, datatype, status, bodylen, opaque, cas= \
                    struct.unpack(couchbaseConstants.RES_PKT_FMT, \
                                  bytes_read[0:couchbaseConstants.MIN_RECV_PACKET])

                if len(bytes_read) < (couchbaseConstants.MIN_RECV_PACKET+bodylen):
                    # Body incomplete; wait for more data. NOTE(review):
                    # extra_bytes is then folded into the next packet's
                    # reported size — presumably for flow-control byte
                    # accounting; confirm against ack_buffer_size usage.
                    extra_bytes = len(bytes_read)
                    break

                # A full packet is buffered; poll eagerly for the next one.
                rd_timeout = 0
                body = bytes_read[couchbaseConstants.MIN_RECV_PACKET : \
                                  couchbaseConstants.MIN_RECV_PACKET+bodylen]
                bytes_read = bytes_read[couchbaseConstants.MIN_RECV_PACKET+bodylen:]
                self.response.put((opcode, status, opaque, cas, keylen, extlen, body, \
                                   bodylen, datatype, \
                                   couchbaseConstants.MIN_RECV_PACKET+bodylen+extra_bytes))
                extra_bytes = 0
454
455    def setup_dcp_streams(self):
456        #send request to retrieve vblist and uuid for the node
457        stats = self.mem_conn.stats("vbucket-seqno")
458        if not stats:
459            return "error: fail to retrive vbucket seqno", None
460        self.mem_conn.close()
461
462        uuid_list = {}
463        seqno_list = {}
464        vb_list = {}
465        for key, val in stats.items():
466            vb, counter = key.split(":")
467            if counter in [DCPStreamSource.VB_UUID,
468                           DCPStreamSource.HIGH_SEQNO,
469                           DCPStreamSource.ABS_HIGH_SEQNO,
470                           DCPStreamSource.PURGE_SEQNO]:
471                if not vb_list.has_key(vb[3:]):
472                    vb_list[vb[3:]] = {}
473                vb_list[vb[3:]][counter] = int(val)
474        flags = 0
475        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
476        vbuckets = None
477        if self.vbucket_list:
478            vbuckets = json.loads(self.vbucket_list)
479        for vbid in vb_list.iterkeys():
480            if int(vbid) not in self.node_vbucket_map:
481                #skip nonactive vbucket
482                continue
483            if vbuckets and int(vbid) not in vbuckets:
484                #skip vbuckets that are not in this run
485                continue
486
487            if self.cur['seqno'] and self.cur['seqno'][pair_index]:
488                start_seqno = self.cur['seqno'][pair_index][vbid]
489            else:
490                start_seqno = 0
491            uuid = 0
492            if self.cur['failoverlog'] and self.cur['failoverlog'][pair_index]:
493                if vbid in self.cur['failoverlog'][pair_index] and \
494                   self.cur['failoverlog'][pair_index][vbid]:
495                    #Use the latest failover log
496                    self.cur['failoverlog'][pair_index][vbid] = \
497                        sorted(self.cur['failoverlog'][pair_index][vbid], \
498                               key=lambda tup: tup[1],
499                               reverse=True)
500                    uuid, _ = self.cur['failoverlog'][pair_index][vbid][0]
501            ss_start_seqno = start_seqno
502            ss_end_seqno = start_seqno
503            if self.cur['snapshot'] and self.cur['snapshot'][pair_index]:
504                if vbid in self.cur['snapshot'][pair_index] and \
505                   self.cur['snapshot'][pair_index][vbid]:
506                    ss_start_seqno, ss_end_seqno = self.cur['snapshot'][pair_index][vbid]
507            self.request_dcp_stream(int(vbid), flags, start_seqno,
508                                    vb_list[vbid][DCPStreamSource.HIGH_SEQNO],
509                                    uuid, ss_start_seqno, ss_end_seqno)
510
511    def request_dcp_stream(self,
512                           vbid,
513                           flags,
514                           start_seqno,
515                           end_seqno,
516                           vb_uuid,
517                           ss_start_seqno,
518                           ss_end_seqno):
519        if not self.dcp_conn:
520            return "error: no dcp connection setup yet.", None
521        extra = struct.pack(couchbaseConstants.DCP_STREAM_REQ_PKT_FMT,
522                            int(flags), 0,
523                            int(start_seqno),
524                            int(end_seqno),
525                            int(vb_uuid),
526                            int(ss_start_seqno),
527                            int(ss_end_seqno))
528        self.dcp_conn._sendMsg(couchbaseConstants.CMD_DCP_REQUEST_STREAM, '', '', vbid, \
529                               extra, 0, 0, vbid)
530        self.stream_list[vbid] = (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
531
    def _read_dcp_conn(self, dcp_conn):
        """Read and decode a single DCP message directly from dcp_conn.

        NOTE(review): this path looks unused/broken — it calls
        self.recv_dcp_msg, which is not defined on this class (only
        _recv_dcp_msg/_recv_dcp_msg_error exist); extlen is zeroed below
        before being used to slice; and the mutation unpack expects fewer
        fields than the 7-field format used in provide_dcp_batch_actual.
        Confirm before relying on it.
        """
        buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen = \
            self.recv_dcp_msg(dcp_conn.s)
        dcp_conn.buf = buf

        rv = 0
        metalen = flags = ttl = flg = exp = 0
        meta = key = val = ext = ''
        need_ack = False
        # NOTE(review): this clobbers the extlen unpacked above, so ext and
        # the key slicing below always start at offset 0 — looks like a bug.
        extlen = seqno = 0
        if data:
            ext = data[0:extlen]
            if cmd == couchbaseConstants.CMD_DCP_MUTATION:
                seqno, rev_seqno, flg, exp, locktime, meta = \
                    struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
                val = data[val_start:]
            elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                 cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                seqno, rev_seqno, meta = \
                    struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
            else:
                rv = "error: uninterpreted DCP commands:%s" % cmd
        elif datalen:
            rv = "error: could not read full TAP message body"

        return rv, cmd, errcode, key, flg, exp, cas, meta, val, opaque, need_ack, seqno
564
565    def _recv_dcp_msg_error(self, sock, buf):
566        pkt, buf = self.recv_dcp(sock, couchbaseConstants.MIN_RECV_PACKET, buf)
567        if not pkt:
568            raise EOFError()
569        magic, cmd, keylen, extlen, dtype, errcode, datalen, opaque, cas = \
570            struct.unpack(couchbaseConstants.REQ_PKT_FMT, pkt)
571        if magic != couchbaseConstants.REQ_MAGIC_BYTE:
572            raise Exception("unexpected recv_msg magic: " + str(magic))
573        data, buf = self.recv_dcp(sock, datalen, buf)
574        return buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen
575
576    def _recv_dcp_msg(self, sock):
577        response = ""
578        while len(response) < couchbaseConstants.MIN_RECV_PACKET:
579            data = sock.recv(couchbaseConstants.MIN_RECV_PACKET - len(response))
580            if data == '':
581                raise exceptions.EOFError("Got empty data (remote died?).")
582            response += data
583        assert len(response) == couchbaseConstants.MIN_RECV_PACKET
584        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
585            struct.unpack(couchbaseConstants.RES_PKT_FMT, response)
586
587        datalen = remaining
588        rv = ""
589        while remaining > 0:
590            data = sock.recv(remaining)
591            if data == '':
592                raise exceptions.EOFError("Got empty data (remote died?).")
593            rv += data
594            remaining -= len(data)
595
596        assert (magic in (couchbaseConstants.RES_MAGIC_BYTE, couchbaseConstants.REQ_MAGIC_BYTE)), \
597               "Got magic: %d" % magic
598        return cmd, errcode, opaque, cas, keylen, extralen, rv, datalen
599
600    def _recv_dcp(self, skt, nbytes, buf):
601        recv_arr = [ buf ]
602        recv_tot = len(buf) # In bytes.
603
604        while recv_tot < nbytes:
605            data = None
606            try:
607                data = skt.recv(max(nbytes - len(buf), self.recv_min_bytes))
608            except socket.timeout:
609                logging.error("error: recv socket.timeout")
610            except Exception, e:
611                msg = str(e)
612                if msg.find("Connection reset by peer") >= 0:
613                    logging.error("error: recv exception: " + \
614                        "%s, %s may be inactive" % (msg, self.source_node["hostname"]))
615                else:
616                    logging.error("error: recv exception: " + str(e))
617
618
619            if not data:
620                return None, ''
621
622            recv_arr.append(data)
623            recv_tot += len(data)
624
625        joined = ''.join(recv_arr)
626
627        return joined[:nbytes], joined[nbytes:]
628
629    @staticmethod
630    def total_msgs(opts, source_bucket, source_node, source_map):
631        source_name = source_node.get("hostname", None)
632        if not source_name:
633            return 0, None
634
635        spec = source_map['spec']
636        name = source_bucket['name']
637        vbuckets_num = len(source_bucket['vBucketServerMap']['vBucketMap'])
638        if not vbuckets_num:
639            return 0, None
640
641        vbucket_list = getattr(opts, "vbucket_list", None)
642
643        stats_vals = {}
644        host, port, user, pswd, _ = pump.parse_spec(opts, spec, 8091)
645        for stats in ["curr_items", "vb_active_resident_items_ratio"]:
646            path = "/pools/default/buckets/%s/stats/%s" % (name, stats)
647            err, json, data = pump.rest_request_json(host, int(port),
648                                                     user, pswd, path,
649                                                     reason="total_msgs")
650            if err:
651                return 0, None
652
653            nodeStats = data.get("nodeStats", None)
654            if not nodeStats:
655                return 0, None
656            vals = nodeStats.get(source_name, None)
657            if not vals:
658                return 0, None
659            stats_vals[stats] = vals[-1]
660
661        total_msgs = stats_vals["curr_items"]
662        resident_ratio = stats_vals["vb_active_resident_items_ratio"]
663        if 0 < resident_ratio < 100:
664            #for DGM case, server will transfer both in-memory items and
665            #backfill all items on disk
666            total_msgs += (resident_ratio/100.0) * stats_vals["curr_items"]
667        return 0, int(total_msgs)
668
class DCPSink(pump_cb.CBSink):
    """Smart client using DCP protocol to stream data to couchbase cluster."""
    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(DCPSink, self).__init__(opts, spec, source_bucket, source_node,
                                     source_map, sink_map, ctl, cur)

        # Optional JSON string restricting which vbuckets this sink handles:
        # either a list of vbucket ids, or a dict of node -> vbucket-id list
        # (see find_conn() for how each shape is routed).
        self.vbucket_list = getattr(opts, "vbucket_list", None)
677
678    @staticmethod
679    def check_base(opts, spec):
680        #Overwrite CBSink.check_base() to allow replica vbucket state
681
682        op = getattr(opts, "destination_operation", None)
683        if op not in [None, 'set', 'add', 'get']:
684            return ("error: --destination-operation unsupported value: %s" +
685                    "; use set, add, get") % op
686
687        return pump.EndPoint.check_base(opts, spec)
688
    def scatter_gather(self, mconns, batch):
        """Send a batch of msgs to their vbuckets, then collect acks.

        Returns (rv, retry_batch, refresh_sink_map) where rv is 0 or an
        error string, retry_batch is the batch to resend (or None), and the
        third value tells the caller whether the batch can be retried
        without refreshing the sink map.
        """
        sink_map_buckets = self.sink_map['buckets']
        if len(sink_map_buckets) != 1:
            return "error: CBSink.run() expected 1 bucket in sink_map", None, None

        vbuckets_num = len(sink_map_buckets[0]['vBucketServerMap']['vBucketMap'])
        vbuckets = batch.group_by_vbucket_id(vbuckets_num, self.rehash)

        # Scatter or send phase.
        for vbucket_id, msgs in vbuckets.iteritems():
            rv, conn = self.find_conn(mconns, vbucket_id)
            if rv != 0:
                return rv, None, None
            rv = self.send_msgs(conn, msgs, self.operation(),
                                vbucket_id=vbucket_id)
            if rv != 0:
                return rv, None, None

        # Yield to let other threads do stuff while server's processing.
        time.sleep(0.01)

        # Gather or recv phase.
        # Only process the last msg which requires ack request
        last_msg = []
        retry_batch = None
        need_refresh = False
        for vbucket_id, msgs in vbuckets.iteritems():
            # NOTE(review): last_msg accumulates across vbuckets, so each
            # recv_msgs() call sees the last msgs of ALL vbuckets handled so
            # far, not just this one — looks unintentional; confirm against
            # recv_msgs() before changing.
            last_msg.append(msgs[-1])
            rv, conn = self.find_conn(mconns, vbucket_id)
            if rv != 0:
                return rv, None, None
            rv, retry, refresh = \
                self.recv_msgs(conn, last_msg, vbucket_id=vbucket_id, verify_opaque=False)
            if rv != 0:
                return rv, None, None
            if retry:
                retry_batch = batch
            if refresh:
                need_refresh = True

        return 0, retry_batch, retry_batch and not need_refresh
730
731    def connect_mc(self, node, sasl_user, sasl_pswd):
732        """Return previously connected dcp conn."""
733
734        host = node.split(':')[0]
735        port = node.split(':')[1]
736
737        logging.debug("  DCPSink connecting mc: " +
738                      host + ":" + str(port))
739
740        dcp_conn = cb_bin_client.MemcachedClient(host, port)
741        if not dcp_conn:
742            return "error: could not connect to memcached: " + \
743                host + ":" + str(port), None
744        if sasl_user:
745            try:
746                dcp_conn.sasl_auth_plain(sasl_user, sasl_pswd)
747            except EOFError:
748                return "error: SASL auth error: %s:%s, user: %s" % \
749                    (host, port, sasl_user), None
750            except cb_bin_client.MemcachedError:
751                return "error: SASL auth failed: %s:%s, user: %s" % \
752                    (host, port, sasl_user), None
753            except socket.error:
754                return "error: SASL auth socket error: %s:%s, user: %s" % \
755                    (host, port, sasl_user), None
756        extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, \
757                            couchbaseConstants.FLAG_DCP_CONSUMER)
758        try:
759            opaque=self.r.randint(0, 2**32)
760            dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONNECT, self.dcp_name, '', opaque, extra)
761            dcp_conn._handleSingleResponse(opaque)
762        except EOFError:
763            return "error: Fail to set up DCP connection", None
764        except cb_bin_client.MemcachedError:
765            return "error: DCP connect memcached error", None
766        except socket.error:
767            return "error: DCP connection error", None
768
769        return 0, dcp_conn
770
771    def find_conn(self, mconns, vbucket_id):
772        if not self.vbucket_list:
773            return super(DCPSink, self).find_conn(mconns, vbucket_id)
774
775        vbuckets = json.loads(self.vbucket_list)
776        if isinstance(vbuckets, list):
777            if vbucket_id not in vbuckets:
778                return "error: unexpected vbucket id:" + str(vbucket_id), None
779            return super(DCPSink, self).find_conn(mconns, vbucket_id)
780
781        bucket = self.sink_map['buckets'][0]
782        serverList = bucket['vBucketServerMap']['serverList']
783        if serverList:
784            port_number = serverList[0].split(":")[-1]
785        else:
786            port_number = 11210
787
788        conn = None
789        if isinstance(vbuckets, dict):
790            for node, vblist in vbuckets.iteritems():
791                for vbucket in vblist:
792                    if int(vbucket) == int(vbucket_id):
793                        host_port = "%s:%s" % (node, port_number)
794                        conn = mconns.get(host_port, None)
795                        if not conn:
796                            user = bucket['name']
797                            pswd = bucket['saslPassword']
798                            rv, conn = DCPSink.connect_mc(host_port, user, pswd)
799                            if rv != 0:
800                                logging.error("error: CBSink.connect() for send: " + rv)
801                                return rv, None
802                            mconns[host_port] = conn
803                        break
804        return 0, conn
805
806    def send_msgs(self, conn, msgs, operation, vbucket_id=None):
807        m = []
808        #Ask for acknowledgement for the last msg of batch
809        for i, msg in enumerate(msgs):
810            cmd, vbucket_id_msg, key, flg, exp, cas, meta, val, seqno, dtype, nmeta = msg
811            if vbucket_id is not None:
812                vbucket_id_msg = vbucket_id
813
814            if self.skip(key, vbucket_id_msg):
815                continue
816
817            if cmd == couchbaseConstants.CMD_GET:
818                val, flg, exp, cas = '', 0, 0, 0
819            if cmd == couchbaseConstants.CMD_NOOP:
820                key, val, flg, exp, cas = '', '', 0, 0, 0
821            need_ack = (i == len(msgs)-1)
822            rv, req = self.cmd_request(cmd, vbucket_id_msg, key, val,
823                                       ctypes.c_uint32(flg).value,
824                                       exp, cas, meta, i, seqno, dtype, nmeta, need_ack)
825            if rv != 0:
826                return rv
827            self.append_req(m, req)
828        if m:
829            try:
830                conn.s.send(''.join(m))
831            except socket.error, e:
832                return "error: conn.send() exception: %s" % (e)
833        return 0
834
835    def cmd_request(self, cmd, vbucket_id, key, val, flg, exp, cas, meta, opaque, seqno, dtype,\
836                    nmeta, need_ack):
837        if meta:
838            seq_no = str(meta)
839            if len(seq_no) > 8:
840                seq_no = seq_no[0:8]
841            if len(seq_no) < 8:
842                # The seq_no might be 32-bits from 2.0DP4, so pad with 0x00's.
843                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + seq_no)[-8:]
844            check_seqno, = struct.unpack(">Q", seq_no)
845            if not check_seqno:
846                seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
847        else:
848            seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + 1)[-8:]
849        metalen = len(str(seq_no))
850        locktime = 0
851        rev_seqno = 0
852        nru = 0
853        if cmd == couchbaseConstants.CMD_DCP_MUTATION:
854            ext = struct.pack(couchbaseConstants.DCP_MUTATION_PKT_FMT,
855                              seqno, rev_seqno, flg, exp, locktime, metalen, nru)
856        elif cmd == couchbaseConstants.CMD_DCP_DELETE:
857            ext = struct.pack(couchbaseConstants.DCP_DELETE_PKT_FMT,
858                              seqno, rev_seqno, metalen)
859        else:
860            return "error: MCSink - unknown tap cmd for request: " + str(cmd), None
861
862        hdr = self.cmd_header(cmd, vbucket_id, key, val, ext, cas, opaque, metalen, dtype)
863        return 0, (hdr, ext, seq_no, key, val)
864
865    def cmd_header(self, cmd, vbucket_id, key, val, ext, cas, opaque, metalen,
866                   dtype=0,
867                   fmt=couchbaseConstants.REQ_PKT_FMT,
868                   magic=couchbaseConstants.REQ_MAGIC_BYTE):
869        #MB-11902
870        dtype = 0
871        return struct.pack(fmt, magic, cmd,
872                           len(key), len(ext), dtype, vbucket_id,
873                           metalen + len(key) + len(val) + len(ext), opaque, cas)
874
875    #TODO : where is the seq_no? same or different from tap seq_no?
876    def append_req(self, m, req):
877        hdr, ext, seq_no, key, val = req
878        m.append(hdr)
879        if ext:
880            m.append(ext)
881        if seq_no:
882            m.append(seq_no)
883        if key:
884            m.append(str(key))
885        if val:
886            m.append(str(val))
887