#!/usr/bin/env python

import logging
import os
import Queue
import random
import json
import socket
import struct
import time
import threading
import select
import exceptions

import cb_bin_client
import couchbaseConstants

import pump
import pump_mc
import pump_cb

from cluster_manager import ClusterManager, ServiceNotAvailableException

bool_to_str = lambda value: str(bool(int(value))).lower()
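# For example (illustrative): bool_to_str(1) == "true" and
# bool_to_str("0") == "false"; DCP control values are sent over the
# wire as lowercase strings.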

class DCPStreamSource(pump.Source, threading.Thread):
    """Can read from cluster/server/bucket via DCP streaming."""
    HIGH_SEQNO = "high_seqno"
    VB_UUID = "uuid"
    ABS_HIGH_SEQNO = "abs_high_seqno"
    PURGE_SEQNO = "purge_seqno"

    HIGH_SEQNO_BYTE = 8
    UUID_BYTE = 8
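    # Each failover-log entry in a stream-request response is a
    # (vb_uuid, seqno) pair of two 8-byte fields, so responses are
    # parsed below in HIGH_SEQNO_BYTE + UUID_BYTE (16-byte) steps.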

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        if spec.startswith("https://"):
            setattr(opts, "ssl", True)
        super(DCPStreamSource, self).__init__(opts, spec, source_bucket, source_node,
                                            source_map, sink_map, ctl, cur)
        threading.Thread.__init__(self)

        self.dcp_done = False
        self.dcp_conn = None
        self.mem_conn = None
        self.dcp_name = opts.process_name
        self.ack_last = False
        self.cmd_last = None
        self.num_msg = 0
        self.version_supported = self.source_node['version'].split(".") >= ["3", "0", "0"]
        self.recv_min_bytes = int(opts.extra.get("recv_min_bytes", 4096))
        self.batch_max_bytes = int(opts.extra.get("batch_max_bytes", 400000))
        self.flow_control = int(opts.extra.get("flow_control", 1))
        self.vbucket_list = getattr(opts, "vbucket_list", None)
        self.r = random.Random()
        self.queue_size = int(opts.extra.get("dcp_consumer_queue_length", 1000))
        self.response = Queue.Queue(self.queue_size)
        self.running = False
        self.stream_list = {}
        self.unack_size = 0
        self.node_vbucket_map = None

    @staticmethod
    def can_handle(opts, spec):
        return (spec.startswith("http://") or
                spec.startswith("couchbase://") or
                spec.startswith("https://")
                )

    @staticmethod
    def check(opts, spec):
        err, map = pump.rest_couchbase(opts, spec)
        if err:
            return err, map
        if not map or not map.get('buckets'):
            return ("error: no buckets supporting DCP at source: %s;"
                    " please check your username/password to the cluster" %
                    (spec)), None
        return 0, map

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        # Ephemeral buckets do not have design docs
        if source_bucket['bucketType'] == 'ephemeral':
            return 0, None
        spec_parts = source_map.get('spec_parts')
        if not spec_parts:
            return "error: no design spec_parts", None
        host, port, user, pswd, path = spec_parts

        source_nodes = pump.filter_bucket_nodes(source_bucket, spec_parts)
        if not source_nodes:
            source_nodes = source_bucket['nodes']
            if not source_nodes:
                return ("error: no design source node; spec_parts: %s" %
                        (spec_parts,), None)

        couch_api_base = source_nodes[0].get('couchApiBase')
        if not couch_api_base:
            return 0, None # No couchApiBase; probably not 2.0.

        err, ddocs_json, ddocs = \
            pump.rest_request_json(host, int(port), user, pswd, opts.ssl,
                                   "/pools/default/buckets/%s/ddocs" %
                                   (source_bucket['name']),
                                   reason="provide_design")
        if err and "response: 404" in err: # A 404/not-found likely means 2.0-DP4.
            ddocs_json = None
            ddocs_url = couch_api_base + "/_all_docs"
            ddocs_qry = "?startkey=\"_design/\"&endkey=\"_design0\"&include_docs=true"

            host, port, user, pswd, path = \
                pump.parse_spec(opts, ddocs_url, 8092)
            # Not using user/pwd as 2.0-DP4 CAPI did not support auth.
            err, ddocs_json, ddocs = \
                pump.rest_request_json(host, int(port), None, None, opts.ssl,
                                       path + ddocs_qry,
                                       reason="provide_design-2.0DP4")
        if err:
            return err, None

        if not ddocs.get('rows', None):
            return 0, None
        else:
            return 0, json.dumps(ddocs.get('rows', []))

    @staticmethod
    def provide_index(opts, source_spec, source_bucket, source_map):
        try:
            rest = ClusterManager(source_spec, opts.username, opts.password, opts.ssl, False,
                                  None, False)
            result, errors = rest.get_index_metadata(source_bucket['name'])
            if errors:
                return errors, None
            return 0, json.dumps(result["result"])
        except ServiceNotAvailableException, e:
            return 0, None

    @staticmethod
    def provide_fts_index(opts, source_spec, source_bucket, source_map):
        try:
            rest = ClusterManager(source_spec, opts.username, opts.password, opts.ssl, False,
                                  None, False)
            result, errors = rest.get_fts_index_metadata(source_bucket['name'])
            if errors:
                return errors, None
            return 0, json.dumps(result)
        except ServiceNotAvailableException, e:
            return 0, None

    def get_conflict_resolution_type(self):
        confResType = "seqno"
        if "conflictResolutionType" in self.source_bucket:
            confResType = self.source_bucket["conflictResolutionType"]
        return confResType
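    # Couchbase reports "seqno" for revision-based conflict resolution;
    # buckets configured for timestamp-based (LWW) resolution typically
    # report "lww" here.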

    def add_start_event(self, conn):
        sasl_user = str(self.source_bucket.get("name"))
        event = {"timestamp": self.get_timestamp(),
                 "real_userid": {"source": "internal",
                                 "user": sasl_user,
                                },
                 "mode": getattr(self.opts, "mode", "diff"),
                 "source_bucket": self.source_bucket['name'],
                 "source_node": self.source_node['hostname']
                }
        if conn:
            try:
                conn.audit(couchbaseConstants.AUDIT_EVENT_BACKUP_START, json.dumps(event))
            except Exception, e:
                logging.warn("auditing error: %s" % e)
        return 0

    def add_stop_event(self, conn):
        sasl_user = str(self.source_bucket.get("name"))
        event = {"timestamp": self.get_timestamp(),
                 "real_userid": {"source": "internal",
                                 "user": sasl_user
                                },
                 "source_bucket": self.source_bucket['name'],
                 "source_node": self.source_node['hostname']
                }
        if conn:
            try:
                conn.audit(couchbaseConstants.AUDIT_EVENT_BACKUP_STOP, json.dumps(event))
            except Exception, e:
                logging.warn("auditing error: %s" % e)
        return 0

    def build_node_vbucket_map(self):
        if self.source_bucket.has_key("vBucketServerMap"):
            server_list = self.source_bucket["vBucketServerMap"]["serverList"]
            vbucket_map = self.source_bucket["vBucketServerMap"]["vBucketMap"]
        else:
            return None

        node_vbucket_map = []
        nodename = self.source_node.get('hostname', 'N/A').split(":")[0]
        nodeindex = -1
        for index, node in enumerate(server_list):
            if nodename in node:
                nodeindex = index
                break
        for index, vblist in enumerate(vbucket_map):
            if vblist[0] >= 0 and vblist[0] == nodeindex:
                node_vbucket_map.append(index)
        return node_vbucket_map
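    # Illustrative sketch (hypothetical data): with serverList
    # ["10.1.1.10:11210", "10.1.1.11:11210"] and vBucketMap
    # [[0, 1], [1, 0], [0, 1]], a source_node hostname of "10.1.1.10"
    # maps to nodeindex 0 and the method returns [0, 2] -- the vbuckets
    # whose active copy (vblist[0]) lives on this node.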

    def provide_batch(self):
        if not self.version_supported:
            return "error: cannot back up 2.x or older clusters with 3.x tools", None

        cur_sleep = 0.2
        cur_retry = 0
        max_retry = self.opts.extra['max_retry']
        if not self.node_vbucket_map:
            self.node_vbucket_map = self.build_node_vbucket_map()

        while True:
            if self.dcp_done:
                if self.dcp_conn:
                    self.add_stop_event(self.dcp_conn)
                    self.dcp_conn.close()
                    self.dcp_conn = None
                return 0, None

            rv = self.get_dcp_conn()
            if rv != 0:
                self.dcp_done = True
                return rv, None

            rv, batch = self.provide_dcp_batch_actual()
            if rv == 0:
                return 0, batch

            if self.dcp_conn:
                self.dcp_conn.close()
                self.dcp_conn = None

            if cur_retry > max_retry:
                self.dcp_done = True
                return rv, batch

            logging.warn("backoff: %s, sleeping: %s, on error: %s" %
                         (cur_retry, cur_sleep, rv))
            time.sleep(cur_sleep)
            cur_sleep = min(cur_sleep * 2, 20) # Max backoff sleep 20 seconds.
            cur_retry = cur_retry + 1
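    # The retry loop above backs off exponentially: sleeps run 0.2,
    # 0.4, 0.8, 1.6, ... seconds, doubling after each failed attempt
    # and capped at 20 seconds, until max_retry attempts have failed.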

    def provide_dcp_batch_actual(self):
        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']
        delta_ack_size = batch_max_bytes * 10 / 4 # ack every 25% of the flow-control buffer (10 * batch_max_bytes)
        last_processed = 0
        total_bytes_read = 0

        vbid = 0
        cmd = 0
        start_seqno = 0
        end_seqno = 0
        vb_uuid = 0
        hi_seqno = 0
        ss_start_seqno = 0
        ss_end_seqno = 0
        no_response_count = 0
        try:
            while (not self.dcp_done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):

                if self.response.empty():
                    if len(self.stream_list) > 0:
                        logging.debug("no response while there are %s active streams" % len(self.stream_list))
                        time.sleep(.25)
                        no_response_count = no_response_count + 1
                        # if there has been no response for at least 30 seconds, assume we are done
                        if no_response_count == 120:
                            logging.warn("no response for 30 seconds while there are %s active streams"
                                         % len(self.stream_list))
                            self.dcp_done = True
                    else:
                        self.dcp_done = True
                    continue
                unprocessed_size = total_bytes_read - last_processed
                if unprocessed_size > delta_ack_size:
                    rv = self.ack_buffer_size(unprocessed_size)
                    if rv:
                        logging.error(rv)
                    else:
                        last_processed = total_bytes_read

                cmd, errcode, opaque, cas, keylen, extlen, data, datalen, dtype, bytes_read = \
                    self.response.get()
                total_bytes_read += bytes_read
                rv = 0
                metalen = flags = flg = exp = 0
                key = val = ext = ''
                need_ack = False
                seqno = 0
                if cmd == couchbaseConstants.CMD_DCP_REQUEST_STREAM:
                    if errcode == couchbaseConstants.ERR_SUCCESS:
                        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                        start = 0
                        step = DCPStreamSource.HIGH_SEQNO_BYTE + DCPStreamSource.UUID_BYTE
                        while start + step <= datalen:
                            uuid, seqno = struct.unpack(
                                            couchbaseConstants.DCP_VB_UUID_SEQNO_PKT_FMT, \
                                            data[start:start + step])
                            if pair_index not in self.cur['failoverlog']:
                                self.cur['failoverlog'][pair_index] = {}
                            if opaque not in self.cur['failoverlog'][pair_index] or \
                               not self.cur['failoverlog'][pair_index][opaque]:
                                self.cur['failoverlog'][pair_index][opaque] = [(uuid, seqno)]
                            else:
                                self.cur['failoverlog'][pair_index][opaque].append((uuid, seqno))
                            start = start + step
                    elif errcode == couchbaseConstants.ERR_KEY_ENOENT:
                        logging.warn("producer doesn't know about the vbucket uuid, rollback to 0")
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno = \
                            self.stream_list[opaque]
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_KEY_EEXISTS:
                        logging.warn("a stream exists on the connection for vbucket: %s" % opaque)
                    elif errcode == couchbaseConstants.ERR_NOT_MY_VBUCKET:
                        logging.warn("vbucket is not active anymore, skip it: %s" % vbid)
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ERANGE:
                        logging.warn("start or end sequence numbers specified incorrectly: (%s, %s)" %
                                     (start_seqno, end_seqno))
                        del self.stream_list[opaque]
                    elif errcode == couchbaseConstants.ERR_ROLLBACK:
                        vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno = \
                            self.stream_list[opaque]
                        start_seqno, = struct.unpack(couchbaseConstants.DCP_VB_SEQNO_PKT_FMT, data)
                        # find the latest (uuid, seqno) failover entry that fits start_seqno
                        if self.cur['failoverlog']:
                            pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                            if self.cur['failoverlog'][pair_index].get(vbid):
                                for uuid, seqno in self.cur['failoverlog'][pair_index][vbid]:
                                    if start_seqno >= seqno:
                                        vb_uuid = uuid
                                        break
                        ss_start_seqno = start_seqno
                        ss_end_seqno = start_seqno
                        self.request_dcp_stream(vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)

                        del self.stream_list[opaque]
                        self.stream_list[opaque] = \
                            (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
                    else:
                        logging.error("unprocessed errcode: %s" % errcode)
                        del self.stream_list[opaque]
                elif cmd == couchbaseConstants.CMD_DCP_MUTATION:
                    vbucket_id = errcode
                    seqno, rev_seqno, flg, exp, locktime, metalen, nru = \
                        struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    val_len = datalen - keylen - metalen - extlen
                    meta_start = val_start + val_len
                    key = data[extlen:val_start]
                    val = data[val_start:meta_start]
                    conf_res = 0
                    if meta_start < datalen:
                        # handle extra conflict-resolution fields
                        extra_meta = data[meta_start:]
                        extra_index = 0
                        version = extra_meta[extra_index]
                        extra_index += 1
                        while extra_index < metalen:
                            id, extlen = struct.unpack(couchbaseConstants.DCP_EXTRA_META_PKG_FMT, extra_meta[extra_index:extra_index+3])
                            extra_index += 3
                            if id == couchbaseConstants.DCP_EXTRA_META_CONFLICT_RESOLUTION:
                                if extlen == 1:
                                    conf_res, = struct.unpack(">B", extra_meta[extra_index:extra_index+1])
                                elif extlen == 2:
                                    conf_res, = struct.unpack(">H", extra_meta[extra_index:extra_index+2])
                                elif extlen == 4:
                                    conf_res, = struct.unpack(">I", extra_meta[extra_index:extra_index+4])
                                elif extlen == 8:
                                    conf_res, = struct.unpack(">Q", extra_meta[extra_index:extra_index+8])
                                else:
                                    logging.error("unsupported extra meta data format: %d" % extlen)
                                    conf_res = 0
                            extra_index += extlen

                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen, conf_res)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                     cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                    vbucket_id = errcode
                    seqno, rev_seqno, metalen = \
                        struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, data[0:extlen])
                    key_start = extlen
                    val_start = key_start + keylen
                    key = data[extlen:val_start]
                    if not self.skip(key, vbucket_id):
                        msg = (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val, seqno, dtype, \
                               metalen, 0)
                        batch.append(msg, len(val))
                        self.num_msg += 1
                    if cmd == couchbaseConstants.CMD_DCP_DELETE:
                        batch.adjust_size += 1
                elif cmd == couchbaseConstants.CMD_DCP_FLUSH:
                    logging.warn("stopping: saw CMD_DCP_FLUSH")
                    self.dcp_done = True
                    break
                elif cmd == couchbaseConstants.CMD_DCP_END_STREAM:
                    del self.stream_list[opaque]
                    if not len(self.stream_list):
                        self.dcp_done = True
                elif cmd == couchbaseConstants.CMD_DCP_SNAPSHOT_MARKER:
                    ss_start_seqno, ss_end_seqno, _ = \
                        struct.unpack(couchbaseConstants.DCP_SNAPSHOT_PKT_FMT, data[0:extlen])
                    pair_index = (self.source_bucket['name'], self.source_node['hostname'])
                    if not self.cur['snapshot']:
                        self.cur['snapshot'] = {}
                    if pair_index not in self.cur['snapshot']:
                        self.cur['snapshot'][pair_index] = {}
                    self.cur['snapshot'][pair_index][opaque] = (ss_start_seqno, ss_end_seqno)
                elif cmd == couchbaseConstants.CMD_DCP_NOOP:
                    need_ack = True
                elif cmd == couchbaseConstants.CMD_DCP_BUFFER_ACK:
                    if errcode != couchbaseConstants.ERR_SUCCESS:
                        logging.warning("buffer ack response errcode: %s" % errcode)
                    continue
                else:
                    logging.warn("unexpected DCP message: %s" % cmd)
                    return "unexpected DCP message: %s" % cmd, batch

                if need_ack:
                    self.ack_last = True
                    try:
                        self.dcp_conn._sendMsg(cmd, '', '', opaque, vbucketId=0,
                                               fmt=couchbaseConstants.RES_PKT_FMT,
                                               magic=couchbaseConstants.RES_MAGIC_BYTE)
                    except socket.error:
                        return ("error: socket.error on sendall();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch
                    except EOFError:
                        self.dcp_done = True
                        return ("error: EOFError on socket sendall();"
                                " perhaps the source server: %s was rebalancing"
                                " or had connectivity/server problems" %
                                (self.source_node['hostname'])), batch

                    # Close the batch when there's an ACK handshake, so
                    # the server can concurrently send us the next batch.
                    # If we are slow, our slow ACK's will naturally slow
                    # down the server.
                    self.ack_buffer_size(total_bytes_read - last_processed)
                    return 0, batch

                self.ack_last = False
                self.cmd_last = cmd

        except EOFError:
            if batch.size() <= 0 and self.ack_last:
                # A closed conn after an ACK means a clean end of the DCP stream.
                self.dcp_done = True

        if batch.size() <= 0:
            return 0, None
        self.ack_buffer_size(total_bytes_read - last_processed)
        return 0, batch
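    # Each message appended to a batch above is a 12-tuple (a sketch of
    # the layout consumed by this tool's sinks):
    #   (cmd, vbucket_id, key, flg, exp, cas, rev_seqno, val,
    #    seqno, dtype, metalen, conflict_resolution_value)
    # with the conflict-resolution value fixed at 0 for deletes and
    # expirations.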

    def get_dcp_conn(self):
        """Ensure a DCP connection to the source node exists, creating it on first use."""

        if not self.dcp_conn:
            host, _ = pump.hostport(self.source_node['hostname'])
            port = self.source_node['ports']['direct']
            username = self.opts.username
            password = self.opts.password
            bucket = str(self.source_bucket.get("name"))
            if self.opts.ssl:
                port = couchbaseConstants.SSL_PORT

            logging.debug("  DCPStreamSource connecting mc: " + host + ":" + str(port))

            err, self.dcp_conn = pump.get_mcd_conn(host, port, username, password, bucket)
            if err:
                return err

            err, self.mem_conn = pump.get_mcd_conn(host, port, username, password, bucket)
            if err:
                return err

            flags = couchbaseConstants.FLAG_DCP_PRODUCER | couchbaseConstants.FLAG_DCP_XATTRS
            extra = struct.pack(couchbaseConstants.DCP_CONNECT_PKT_FMT, 0, flags)
            try:
                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONNECT, self.dcp_name, \
                                       '', opaque, extra)
                self.dcp_conn._handleSingleResponse(opaque)

                buff_size = 0
                if self.flow_control:
                    # Set the connection buffer size. To allow for header
                    # overhead, the flow-control buffer is sized at roughly
                    # 10 times the batch byte limit.
                    buff_size = self.batch_max_bytes * 10

                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
                                       couchbaseConstants.KEY_DCP_CONNECTION_BUFFER_SIZE,
                                       str(buff_size), opaque)
                self.dcp_conn._handleSingleResponse(opaque)

                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
                                       couchbaseConstants.KEY_DCP_NOOP,
                                       "true", opaque)
                self.dcp_conn._handleSingleResponse(opaque)

                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
                                       couchbaseConstants.KEY_DCP_NOOP_INTERVAL,
                                       str(180), opaque)
                self.dcp_conn._handleSingleResponse(opaque)
            except EOFError:
                return "error: failed to set up DCP connection"
            except cb_bin_client.MemcachedError:
                return "error: DCP connect memcached error"
            except socket.error:
                return "error: DCP connection error"
            try:
                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_CONTROL,
                                       couchbaseConstants.KEY_DCP_EXT_METADATA,
                                       bool_to_str(True), opaque)
                self.dcp_conn._handleSingleResponse(opaque)
            except EOFError:
                return "error: failed to set up DCP connection"
            except cb_bin_client.MemcachedError:
                pass
            except socket.error:
                return "error: DCP connection error"

            self.running = True
            self.start()

            self.add_start_event(self.dcp_conn)
            self.setup_dcp_streams()
        return 0
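    # Connection setup above, in order: open the connection as a DCP
    # producer (with the XATTRS flag), set the flow-control buffer size,
    # enable server noops at a 180-second interval, and request extended
    # metadata. Only then is the reader thread started and the per-vbucket
    # streams set up.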

    def ack_buffer_size(self, buf_size):
        if self.flow_control:
            try:
                opaque = self.r.randint(0, 2**32)
                self.dcp_conn._sendCmd(couchbaseConstants.CMD_DCP_BUFFER_ACK, '', '', \
                    opaque, struct.pack(">I", int(buf_size)))
                logging.debug("acked buffer size: %d" % buf_size)
            except socket.error:
                return "error: socket error while sending buffer ack msg"
            except EOFError:
                return "error: EOFError while sending buffer ack msg"

        return None
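    # A buffer ack is a CMD_DCP_BUFFER_ACK packet whose 4-byte,
    # network-order body carries the number of bytes consumed; the
    # producer subtracts it from its outstanding flow-control window,
    # which is what keeps a slow consumer from being overrun.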

    def run(self):
        if not self.dcp_conn:
            logging.error("socket to memcached server is not created yet.")
            return

        bytes_read = ''
        rd_timeout = 1
        desc = [self.dcp_conn.s]
        extra_bytes = 0
        while self.running:
            try:
                if self.dcp_done:
                    time.sleep(1)
                    continue
                readers, writers, errors = select.select(desc, [], [], rd_timeout)
                rd_timeout = .25
                if len(self.stream_list) == 0:
                    time.sleep(1)
                    continue

                for reader in readers:
                    data = reader.recv(self.recv_min_bytes)
                    logging.debug("Read %d bytes off the wire" % len(data))
                    if len(data) == 0:
                        raise exceptions.EOFError("Got empty data (remote died?).")
                    bytes_read += data
                while len(bytes_read) >= couchbaseConstants.MIN_RECV_PACKET:
                    magic, opcode, keylen, extlen, datatype, status, bodylen, opaque, cas = \
                        struct.unpack(couchbaseConstants.RES_PKT_FMT, \
                                      bytes_read[0:couchbaseConstants.MIN_RECV_PACKET])

                    if len(bytes_read) < (couchbaseConstants.MIN_RECV_PACKET+bodylen):
                        extra_bytes = len(bytes_read)
                        break

                    rd_timeout = 0
                    body = bytes_read[couchbaseConstants.MIN_RECV_PACKET : \
                                      couchbaseConstants.MIN_RECV_PACKET+bodylen]
                    bytes_read = bytes_read[couchbaseConstants.MIN_RECV_PACKET+bodylen:]
                    self.response.put((opcode, status, opaque, cas, keylen, extlen, body, \
                                       bodylen, datatype, \
                                       couchbaseConstants.MIN_RECV_PACKET+bodylen+extra_bytes))
                    extra_bytes = 0
            except socket.error:
                break
            except Exception, e:
                pass
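    # The reader thread frames the byte stream itself: it accumulates
    # socket reads, peels off one MIN_RECV_PACKET (24-byte binary
    # protocol) header at a time, waits for bodylen more bytes if a body
    # is incomplete, and enqueues each full packet onto self.response
    # for provide_dcp_batch_actual to consume.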

    def setup_dcp_streams(self):
        # request the vbucket list and uuids for this node
        stats = self.mem_conn.stats("vbucket-seqno")
        if not stats:
            return "error: failed to retrieve vbucket seqno", None
        self.mem_conn.close()

        uuid_list = {}
        seqno_list = {}
        vb_list = {}
        for key, val in stats.items():
            vb, counter = key.split(":")
            if counter in [DCPStreamSource.VB_UUID,
                           DCPStreamSource.HIGH_SEQNO,
                           DCPStreamSource.ABS_HIGH_SEQNO,
                           DCPStreamSource.PURGE_SEQNO]:
                if not vb_list.has_key(vb[3:]):
                    vb_list[vb[3:]] = {}
                vb_list[vb[3:]][counter] = int(val)
        flags = 0
        pair_index = (self.source_bucket['name'], self.source_node['hostname'])
        vbucketsOrDict = None
        vbuckets = None
        if self.vbucket_list:
            vbucketsOrDict = json.loads(self.vbucket_list)
            if type(vbucketsOrDict) is dict:
                vbuckets = []
                for key, value in vbucketsOrDict.iteritems():
                    vbuckets += value
            else:
                vbuckets = vbucketsOrDict
        for vbid in vb_list.iterkeys():
            if int(vbid) not in self.node_vbucket_map:
                # skip non-active vbuckets
                continue
            if vbuckets and int(vbid) not in vbuckets:
                # skip vbuckets that are not in this run
                continue

            if self.cur['seqno'] and self.cur['seqno'][pair_index]:
                start_seqno = self.cur['seqno'][pair_index][vbid]
            else:
                start_seqno = 0
            uuid = 0
            if self.cur['failoverlog'] and self.cur['failoverlog'][pair_index]:
                if vbid in self.cur['failoverlog'][pair_index] and \
                   self.cur['failoverlog'][pair_index][vbid]:
                    # use the latest failover log entry
                    self.cur['failoverlog'][pair_index][vbid] = \
                        sorted(self.cur['failoverlog'][pair_index][vbid], \
                               key=lambda tup: tup[1],
                               reverse=True)
                    uuid, _ = self.cur['failoverlog'][pair_index][vbid][0]
            ss_start_seqno = start_seqno
            ss_end_seqno = start_seqno
            if self.cur['snapshot'] and self.cur['snapshot'][pair_index]:
                if vbid in self.cur['snapshot'][pair_index] and \
                   self.cur['snapshot'][pair_index][vbid]:
                    ss_start_seqno, ss_end_seqno = self.cur['snapshot'][pair_index][vbid]
                    if start_seqno == ss_end_seqno:
                        ss_start_seqno = start_seqno
            self.request_dcp_stream(int(vbid), flags, start_seqno,
                                    vb_list[vbid][DCPStreamSource.HIGH_SEQNO],
                                    uuid, ss_start_seqno, ss_end_seqno)
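    # A sketch of the stats parsing above: "vbucket-seqno" keys look
    # like "vb_0:high_seqno", so key.split(":") yields ("vb_0",
    # "high_seqno") and vb[3:] strips the "vb_" prefix, leaving the
    # vbucket id "0" used to key vb_list.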

    def request_dcp_stream(self,
                           vbid,
                           flags,
                           start_seqno,
                           end_seqno,
                           vb_uuid,
                           ss_start_seqno,
                           ss_end_seqno):
        if not self.dcp_conn:
            return "error: no DCP connection set up yet.", None
        extra = struct.pack(couchbaseConstants.DCP_STREAM_REQ_PKT_FMT,
                            int(flags), 0,
                            int(start_seqno),
                            int(end_seqno),
                            int(vb_uuid),
                            int(ss_start_seqno),
                            int(ss_end_seqno))
        self.dcp_conn._sendMsg(couchbaseConstants.CMD_DCP_REQUEST_STREAM, '', '', vbid, \
                               extra, 0, 0, vbid)
        self.stream_list[vbid] = (vbid, flags, start_seqno, end_seqno, vb_uuid, ss_start_seqno, ss_end_seqno)
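    # Note that the opaque of a stream request is set to the vbucket id
    # itself, which is why responses elsewhere in this file index
    # stream_list and the failover log by opaque.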

    def _read_dcp_conn(self, dcp_conn):
        buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen = \
            self._recv_dcp_msg_error(dcp_conn.s, dcp_conn.buf)
        dcp_conn.buf = buf

        rv = 0
        metalen = flags = ttl = flg = exp = 0
        meta = key = val = ext = ''
        need_ack = False
        seqno = 0
        if data:
            ext = data[0:extlen]
            if cmd == couchbaseConstants.CMD_DCP_MUTATION:
                seqno, rev_seqno, flg, exp, locktime, meta, nru = \
                    struct.unpack(couchbaseConstants.DCP_MUTATION_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
                val = data[val_start:]
            elif cmd == couchbaseConstants.CMD_DCP_DELETE or \
                 cmd == couchbaseConstants.CMD_DCP_EXPIRATION:
                seqno, rev_seqno, meta = \
                    struct.unpack(couchbaseConstants.DCP_DELETE_PKT_FMT, ext)
                key_start = extlen
                val_start = key_start + keylen
                key = data[extlen:val_start]
            else:
                rv = "error: unhandled DCP command: %s" % cmd
        elif datalen:
            rv = "error: could not read full DCP message body"

        return rv, cmd, errcode, key, flg, exp, cas, meta, val, opaque, need_ack, seqno

    def _recv_dcp_msg_error(self, sock, buf):
        pkt, buf = self._recv_dcp(sock, couchbaseConstants.MIN_RECV_PACKET, buf)
        if not pkt:
            raise EOFError()
        magic, cmd, keylen, extlen, dtype, errcode, datalen, opaque, cas = \
            struct.unpack(couchbaseConstants.REQ_PKT_FMT, pkt)
        if magic != couchbaseConstants.REQ_MAGIC_BYTE:
            raise Exception("unexpected recv_msg magic: " + str(magic))
        data, buf = self._recv_dcp(sock, datalen, buf)
        return buf, cmd, errcode, opaque, cas, keylen, extlen, data, datalen

    def _recv_dcp_msg(self, sock):
        response = ""
        while len(response) < couchbaseConstants.MIN_RECV_PACKET:
            data = sock.recv(couchbaseConstants.MIN_RECV_PACKET - len(response))
            if data == '':
                raise exceptions.EOFError("Got empty data (remote died?).")
            response += data
        assert len(response) == couchbaseConstants.MIN_RECV_PACKET
        magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas = \
            struct.unpack(couchbaseConstants.RES_PKT_FMT, response)

        datalen = remaining
        rv = ""
        while remaining > 0:
            data = sock.recv(remaining)
            if data == '':
                raise exceptions.EOFError("Got empty data (remote died?).")
            rv += data
            remaining -= len(data)

        assert (magic in (couchbaseConstants.RES_MAGIC_BYTE, couchbaseConstants.REQ_MAGIC_BYTE)), \
               "Got magic: %d" % magic
        return cmd, errcode, opaque, cas, keylen, extralen, rv, datalen

    def _recv_dcp(self, skt, nbytes, buf):
        recv_arr = [buf]
        recv_tot = len(buf) # In bytes.

        while recv_tot < nbytes:
            data = None
            try:
                data = skt.recv(max(nbytes - len(buf), self.recv_min_bytes))
            except socket.timeout:
                logging.error("error: recv socket.timeout")
            except Exception, e:
                msg = str(e)
                if msg.find("Connection reset by peer") >= 0:
                    logging.error("error: recv exception: " + \
                        "%s, %s may be inactive" % (msg, self.source_node["hostname"]))
                else:
                    logging.error("error: recv exception: " + str(e))

            if not data:
                return None, ''

            recv_arr.append(data)
            recv_tot += len(data)

        joined = ''.join(recv_arr)

        return joined[:nbytes], joined[nbytes:]
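    # _recv_dcp returns a (packet, leftover) pair: exactly nbytes of
    # framed data plus whatever extra bytes arrived with it, so callers
    # can carry the remainder into the next read.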

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        source_name = source_node.get("hostname", None)
        if not source_name:
            return 0, None

        spec = source_map['spec']
        name = source_bucket['name']
        vbuckets_num = len(source_bucket['vBucketServerMap']['vBucketMap'])
        if not vbuckets_num:
            return 0, None

        vbucket_list = getattr(opts, "vbucket_list", None)

        stats_vals = {}
        host, port, user, pswd, _ = pump.parse_spec(opts, spec, 8091)
        for stats in ["curr_items", "vb_active_resident_items_ratio"]:
            path = "/pools/default/buckets/%s/stats/%s" % (name, stats)
            err, json, data = pump.rest_request_json(host, int(port),
                                                     user, pswd, opts.ssl, path,
                                                     reason="total_msgs")
            if err:
                return 0, None

            nodeStats = data.get("nodeStats", None)
            if not nodeStats:
                return 0, None
            vals = nodeStats.get(source_name, None)
            if not vals:
                return 0, None
            stats_vals[stats] = vals[-1]

        total_msgs = stats_vals["curr_items"]
        resident_ratio = stats_vals["vb_active_resident_items_ratio"]
        if 0 < resident_ratio < 100:
            # In a DGM (disk > memory) case the server streams both the
            # in-memory items and a backfill of all items from disk,
            # so scale the estimate accordingly.
            total_msgs += (resident_ratio/100.0) * stats_vals["curr_items"]
        return 0, int(total_msgs)
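    # Worked example (hypothetical numbers): with curr_items = 1000 and
    # a resident ratio of 40%, the estimate is 1000 + 0.40 * 1000 = 1400
    # expected messages; fully resident buckets (ratio 100) use
    # curr_items unchanged.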