xref: /4.6.4/ep-engine/src/dcp/producer.cc (revision 17d8153f)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include <vector>
19
20#include "dcp/producer.h"
21
22#include "backfill.h"
23#include "compress.h"
24#include "common.h"
25#include "ep_engine.h"
26#include "failover-table.h"
27#include "dcp/backfill-manager.h"
28#include "dcp/response.h"
29#include "dcp/stream.h"
30
// Default noop interval in seconds. Matches the Couchbase 3.0 default for
// backward compatibility (see the note in the DcpProducer constructor).
// NOTE: the "Inerval" spelling is part of the declared name in the header.
const uint32_t DcpProducer::defaultNoopInerval = 20;
32
33DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
34    if (isEnabled_UNLOCKED()) {
35        if (isFull_UNLOCKED()) {
36            return Full;
37        } else {
38            return SpaceAvailable;
39        }
40    }
41    return Disabled;
42}
43
44void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
45    WriterLockHolder lh(logLock);
46    this->maxBytes = maxBytes;
47    if (maxBytes == 0) {
48        bytesSent = 0;
49        ackedBytes = 0;
50    }
51}
52
53bool DcpProducer::BufferLog::insert(size_t bytes) {
54    WriterLockHolder wlh(logLock);
55    bool inserted = false;
56    // If the log is not enabled
57    // or there is space, allow the insert
58    if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
59        bytesSent += bytes;
60        inserted = true;
61    }
62    return inserted;
63}
64
65void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
66    if (bytesSent >= bytes) {
67        bytesSent -= bytes;
68    } else {
69        bytesSent = 0;
70    }
71}
72
73bool DcpProducer::BufferLog::pauseIfFull() {
74    ReaderLockHolder rlh(logLock);
75    if (getState_UNLOCKED() == Full) {
76        producer.setPaused(true);
77        return true;
78    }
79    return false;
80}
81
82void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
83    ReaderLockHolder rlh(logLock);
84    if (getState_UNLOCKED() != Full) {
85        producer.notifyPaused(true);
86    }
87}
88
89void DcpProducer::BufferLog::acknowledge(size_t bytes) {
90    WriterLockHolder wlh(logLock);
91    State state = getState_UNLOCKED();
92    if (state != Disabled) {
93        release_UNLOCKED(bytes);
94        ackedBytes += bytes;
95        if (state == Full) {
96            producer.notifyPaused(true);
97        }
98    }
99}
100
// Emit this connection's flow-control stats. "flow_control" reports whether
// a buffer size has been negotiated; the byte counters are only meaningful
// (and only emitted) when it has.
void DcpProducer::BufferLog::addStats(ADD_STAT add_stat, const void *c) {
    ReaderLockHolder rlh(logLock);
    if (isEnabled_UNLOCKED()) {
        producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
        producer.addStat("unacked_bytes", bytesSent, add_stat, c);
        producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
        producer.addStat("flow_control", "enabled", add_stat, c);
    } else {
        producer.addStat("flow_control", "disabled", add_stat, c);
    }
}
112
// Construct a DCP producer (or notifier, when isNotifier is set) bound to
// the given engine/cookie. Starts paused; the connection is woken once a
// stream has data. Also schedules the checkpoint-processor task.
DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                         const std::string &name, bool isNotifier)
    : Producer(e, cookie, name), rejectResp(NULL),
      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(*this),
      itemsSent(0), totalBytesSent(0) {
    setSupportAck(true);
    setReserved(true);
    setPaused(true);

    if (notifyOnly) {
        setLogHeader("DCP (Notifier) " + getName() + " -");
    } else {
        setLogHeader("DCP (Producer) " + getName() + " -");
    }
    // Reduce the minimum log level of view engine DCP streams as they are
    // extremely noisy due to creating a new stream, per vbucket, per design
    // doc every ~10s.
    if (name.find("eq_dcpq:mapreduce_view") != std::string::npos ||
        name.find("eq_dcpq:spatial_view") != std::string::npos) {
        logger.min_log_level = EXTENSION_LOG_WARNING;
    }

    engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    priority.assign("medium");

    // The consumer assigns opaques starting at 0 so lets have the producer
    // start using opaques at 10M to prevent any opaque conflicts.
    noopCtx.opaque = 10000000;
    noopCtx.sendTime = ep_current_time();

    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
    // noop interval to 20 seconds by default, but in post 3.0 releases we set
    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
    // noop interval of the producer when connecting so in an all 3.0.1+ cluster
    // this value will be overriden. In 3.0 however we do not set the noop
    // interval so setting this value will make sure we don't disconnect on
    // accident due to the producer and the consumer having a different noop
    // interval.
    noopCtx.noopInterval = defaultNoopInerval;
    noopCtx.pendingRecv = false;
    noopCtx.enabled = false;

    enableExtMetaData = false;
    enableValueCompression = false;

    // Cursor dropping is disabled for replication connections by default,
    // but will be enabled through a control message to support backward
    // compatibility. For all other type of DCP connections, cursor dropping
    // will be enabled by default.
    // (find() < length() is equivalent to find() != npos: a found position
    // is always less than the string length.)
    if (name.find("replication") < name.length()) {
        supportsCursorDropping = false;
    } else {
        supportsCursorDropping = true;
    }

    backfillMgr.reset(new BackfillManager(&engine_));

    // Task that moves checkpoint items into stream readyQs; runs on the
    // auxiliary I/O thread pool.
    checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
    ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
}
173
DcpProducer::~DcpProducer() {
    // Drop the backfill manager first so backfill state referencing this
    // producer is released before the rest of the members go away.
    backfillMgr.reset();
    delete rejectResp;

    // NOTE(review): the checkpoint-processor task is cancelled last, after
    // backfillMgr/rejectResp are torn down. The task is constructed with the
    // engine (not this producer) so this looks safe, but confirm the task
    // cannot touch this producer while/after destruction.
    ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
}
180
// Handle a DCP STREAM_REQ from the consumer: validate the requested seqno
// range against the vbucket's state and failover table, then create and
// register an ActiveStream (or NotifierStream for notifier connections).
// On ENGINE_ROLLBACK, *rollback_seqno tells the client where to roll back to.
ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             uint32_t opaque,
                                             uint16_t vbucket,
                                             uint64_t start_seqno,
                                             uint64_t end_seqno,
                                             uint64_t vbucket_uuid,
                                             uint64_t snap_start_seqno,
                                             uint64_t snap_end_seqno,
                                             uint64_t *rollback_seqno,
                                             dcp_add_failover_log callback) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (vb->isBackfillPhase()) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket is in backfill state", logHeader(), vbucket);
        return ENGINE_TMPFAIL;
    }

    // Basic range sanity: start must not exceed end...
    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the end seqno "
            "(%" PRIu64 "); "
            "Incorrect params passed by the DCP client",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_ERANGE;
    }

    // ...and start must lie inside the snapshot the client claims to hold.
    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the snap start seqno (%" PRIu64 ") <= start seqno (%" PRIu64 ")"
            " <= snap end seqno (%" PRIu64 ") is required", logHeader(), vbucket,
            snap_start_seqno, start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    // Reject if a live stream already exists for this vbucket; a dead one
    // is silently replaced (and its conn-map entry is reused).
    bool add_vb_conn_map = true;
    std::map<uint16_t, stream_t>::iterator itr;
    {
        WriterLockHolder wlh(streamsMutex);
        if ((itr = streams.find(vbucket)) != streams.end()) {
            if (itr->second->getState() != STREAM_DEAD) {
                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                    " because a stream already exists for this vbucket",
                    logHeader(), vbucket);
                return ENGINE_KEY_EEXISTS;
            } else {
                streams.erase(vbucket);

                // Don't need to add an entry to vbucket-to-conns map
                add_vb_conn_map = false;
            }
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
                                     vbucket_uuid, snap_start_seqno,
                                     snap_end_seqno, vb->getPurgeSeqno(),
                                     rollback_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
            "because a rollback to seqno %" PRIu64 " is required "
            "(start seqno %" PRIu64 ", vb_uuid %" PRIu64 ", snapStartSeqno %" PRIu64
            ", snapEndSeqno %" PRIu64 ")",
            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ROLLBACK;
    }

    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
    if (rv != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
            "stream request due to error %d", logHeader(), vbucket, rv);
        return rv;
    }

    // Flag-driven adjustments of the end seqno (take-latest / disk-only).
    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
    }

    // Re-validate the range now that end_seqno may have been adjusted.
    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the end seqno (%"
            PRIu64 "), stream request flags %d, vb_uuid %" PRIu64
            ", snapStartSeqno %" PRIu64 ", snapEndSeqno %" PRIu64
            "; should have rolled back instead",
            logHeader(), vbucket, start_seqno, end_seqno, flags, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    if (!notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno()))
    {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%" PRIu64 ") is larger than the vb highSeqno (%"
            PRId64 "), stream request flags is %d, vb_uuid %" PRIu64
            ", snapStartSeqno %" PRIu64 ", snapEndSeqno %" PRIu64
            "; should have rolled back instead",
            logHeader(), vbucket, start_seqno, vb->getHighSeqno(), flags,
            vbucket_uuid, snap_start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    stream_t s;
    if (notifyOnly) {
        // Notifier streams use the original (unclamped) start seqno.
        s = new NotifierStream(&engine_, this, getName(), flags,
                               opaque, vbucket, notifySeqno,
                               end_seqno, vbucket_uuid,
                               snap_start_seqno, snap_end_seqno);
   } else {
        s = new ActiveStream(&engine_, this, getName(), flags,
                             opaque, vbucket, start_seqno,
                             end_seqno, vbucket_uuid,
                             snap_start_seqno, snap_end_seqno);
    }

    {
        // Hold the vbucket state lock while registering the stream so the
        // vbucket cannot transition to dead underneath us.
        ReaderLockHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_dead) {
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
                    "this vbucket is in dead state", logHeader(), vbucket);
            return ENGINE_NOT_MY_VBUCKET;
        }

        WriterLockHolder wlh(streamsMutex);
        s->setActive();
        streams[vbucket] = s;
    }

    notifyStreamReady(vbucket);

    if (add_vb_conn_map) {
        connection_t conn(this);
        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
    }

    return rv;
}
340
341ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
342                                              dcp_add_failover_log callback) {
343    (void) opaque;
344    if (doDisconnect()) {
345        return ENGINE_DISCONNECT;
346    }
347
348    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
349    if (!vb) {
350        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
351            "because this vbucket doesn't exist", logHeader(), vbucket);
352        return ENGINE_NOT_MY_VBUCKET;
353    }
354
355    return vb->failovers->addFailoverLog(getCookie(), callback);
356}
357
358ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
359    setLastWalkTime();
360
361    if (doDisconnect()) {
362        return ENGINE_DISCONNECT;
363    }
364
365    ENGINE_ERROR_CODE ret;
366    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
367        return ret;
368    }
369
370    DcpResponse *resp;
371    if (rejectResp) {
372        resp = rejectResp;
373        rejectResp = NULL;
374    } else {
375        resp = getNextItem();
376        if (!resp) {
377            return ENGINE_SUCCESS;
378        }
379    }
380
381    Item* itmCpy = NULL;
382    if (resp->getEvent() == DCP_MUTATION) {
383        try {
384            itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
385        } catch (const std::bad_alloc&) {
386            rejectResp = resp;
387            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) ENOMEM while trying to copy "
388                "item with seqno %" PRIu64 "before streaming it", logHeader(),
389                static_cast<MutationResponse*>(resp)->getVBucket(),
390                static_cast<MutationResponse*>(resp)->getBySeqno());
391            return ENGINE_ENOMEM;
392        } catch (const std::logic_error&) {
393            rejectResp = resp;
394            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) illegal mutation payload "
395                "type while copying an item with seqno %" PRIu64 "before "
396                "streaming it", logHeader(),
397                static_cast<MutationResponse*>(resp)->getVBucket(),
398                static_cast<MutationResponse*>(resp)->getBySeqno());
399            return ENGINE_ENOTSUP;
400        }
401
402        if (enableValueCompression) {
403            /**
404             * If value compression is enabled, the producer will need
405             * to snappy-compress the document before transmitting.
406             * Compression will obviously be done only if the datatype
407             * indicates that the value isn't compressed already.
408             */
409            uint32_t sizeBefore = itmCpy->getNBytes();
410            if (!itmCpy->compressValue(
411                            engine_.getDcpConnMap().getMinCompressionRatio())) {
412                LOG(EXTENSION_LOG_WARNING,
413                    "%s Failed to snappy compress an uncompressed value!",
414                    logHeader());
415            }
416            uint32_t sizeAfter = itmCpy->getNBytes();
417
418            if (sizeAfter < sizeBefore) {
419                log.acknowledge(sizeBefore - sizeAfter);
420            }
421        }
422
423    }
424
425    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
426                                                                     true);
427    switch (resp->getEvent()) {
428        case DCP_STREAM_END:
429        {
430            StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
431            ret = producers->stream_end(getCookie(), se->getOpaque(),
432                                        se->getVbucket(), se->getFlags());
433            break;
434        }
435        case DCP_MUTATION:
436        {
437            MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
438            if (m->getExtMetaData()) {
439                std::pair<const char*, uint16_t> meta = m->getExtMetaData()->getExtMeta();
440                ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
441                                          m->getVBucket(), m->getBySeqno(),
442                                          m->getRevSeqno(), 0,
443                                          meta.first, meta.second,
444                                          m->getItem()->getNRUValue());
445            } else {
446                ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
447                                          m->getVBucket(), m->getBySeqno(),
448                                          m->getRevSeqno(), 0,
449                                          NULL, 0,
450                                          m->getItem()->getNRUValue());
451            }
452            break;
453        }
454        case DCP_DELETION:
455        {
456            MutationResponse *m = static_cast<MutationResponse*>(resp);
457            if (m->getExtMetaData()) {
458                std::pair<const char*, uint16_t> meta = m->getExtMetaData()->getExtMeta();
459                ret = producers->deletion(getCookie(), m->getOpaque(),
460                                          m->getItem()->getKey().c_str(),
461                                          m->getItem()->getNKey(),
462                                          m->getItem()->getCas(),
463                                          m->getVBucket(), m->getBySeqno(),
464                                          m->getRevSeqno(),
465                                          meta.first, meta.second);
466            } else {
467                ret = producers->deletion(getCookie(), m->getOpaque(),
468                                          m->getItem()->getKey().c_str(),
469                                          m->getItem()->getNKey(),
470                                          m->getItem()->getCas(),
471                                          m->getVBucket(), m->getBySeqno(),
472                                          m->getRevSeqno(),
473                                          NULL, 0);
474            }
475            break;
476        }
477        case DCP_SNAPSHOT_MARKER:
478        {
479            SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
480            ret = producers->marker(getCookie(), s->getOpaque(),
481                                    s->getVBucket(),
482                                    s->getStartSeqno(),
483                                    s->getEndSeqno(),
484                                    s->getFlags());
485            break;
486        }
487        case DCP_SET_VBUCKET:
488        {
489            SetVBucketState *s = static_cast<SetVBucketState*>(resp);
490            ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
491                                               s->getVBucket(), s->getState());
492            break;
493        }
494        default:
495        {
496            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
497                "disconnecting", logHeader(), resp->getEvent());
498            ret = ENGINE_DISCONNECT;
499            break;
500        }
501    }
502
503    ObjectRegistry::onSwitchThread(epe);
504    if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
505        delete itmCpy;
506    }
507
508    if (ret == ENGINE_E2BIG) {
509        rejectResp = resp;
510    } else {
511        delete resp;
512    }
513
514    lastSendTime = ep_current_time();
515    return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
516}
517
// Handle a DCP BUFFER_ACK from the consumer. 'opaque' and 'vbucket' are
// unused: flow-control accounting is per-connection, not per-stream.
ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                     uint16_t vbucket,
                                                     uint32_t buffer_bytes) {
    log.acknowledge(buffer_bytes);
    return ENGINE_SUCCESS;
}
524
525ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
526                                       uint16_t nkey, const void* value,
527                                       uint32_t nvalue) {
528    const char* param = static_cast<const char*>(key);
529    std::string keyStr(static_cast<const char*>(key), nkey);
530    std::string valueStr(static_cast<const char*>(value), nvalue);
531
532    if (strncmp(param, "connection_buffer_size", nkey) == 0) {
533        uint32_t size;
534        if (parseUint32(valueStr.c_str(), &size)) {
535            /* Size 0 implies the client (DCP consumer) does not support
536               flow control */
537            log.setBufferSize(size);
538            return ENGINE_SUCCESS;
539        }
540    } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
541        LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
542            "not supported by this engine", logHeader());
543        return ENGINE_ENOTSUP;
544    } else if (strncmp(param, "enable_noop", nkey) == 0) {
545        if (valueStr == "true") {
546            noopCtx.enabled = true;
547        } else {
548            noopCtx.enabled = false;
549        }
550        return ENGINE_SUCCESS;
551    } else if (strncmp(param, "enable_ext_metadata", nkey) == 0) {
552        if (valueStr == "true") {
553            enableExtMetaData = true;
554        } else {
555            enableExtMetaData = false;
556        }
557        return ENGINE_SUCCESS;
558    } else if (strncmp(param, "enable_value_compression", nkey) == 0) {
559        if (valueStr == "true") {
560            enableValueCompression = true;
561        } else {
562            enableValueCompression = false;
563        }
564        return ENGINE_SUCCESS;
565    } else if (strncmp(param, "supports_cursor_dropping", nkey) == 0) {
566        if (valueStr == "true") {
567            supportsCursorDropping = true;
568        } else {
569            supportsCursorDropping = false;
570        }
571        return ENGINE_SUCCESS;
572    } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
573        if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
574            return ENGINE_SUCCESS;
575        }
576    } else if(strncmp(param, "set_priority", nkey) == 0) {
577        if (valueStr == "high") {
578            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
579            priority.assign("high");
580            return ENGINE_SUCCESS;
581        } else if (valueStr == "medium") {
582            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
583            priority.assign("medium");
584            return ENGINE_SUCCESS;
585        } else if (valueStr == "low") {
586            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_LOW);
587            priority.assign("low");
588            return ENGINE_SUCCESS;
589        }
590    }
591
592    LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
593        logHeader(), valueStr.c_str(), keyStr.c_str());
594
595    return ENGINE_EINVAL;
596}
597
// Handle a response packet from the consumer: acks for set-vbucket-state /
// snapshot-marker are routed to the matching ActiveStream by opaque; noop
// responses clear the pending-receive flag; mutation/deletion/expiration/
// stream-end responses are accepted but ignored (nacking not implemented).
// Any other opcode disconnects the connection.
ENGINE_ERROR_CODE DcpProducer::handleResponse(
                                        protocol_binary_response_header *resp) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    uint8_t opcode = resp->response.opcode;
    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
        uint32_t opaque = pkt->message.header.response.opaque;

        // Find the active stream whose opaque matches; active_stream keeps a
        // reference so the stream stays alive after the lock is dropped.
        stream_t active_stream;
        std::map<uint16_t, stream_t>::iterator itr;
        {
            ReaderLockHolder rlh(streamsMutex);
            for (itr = streams.begin() ; itr != streams.end(); ++itr) {
                active_stream = itr->second;
                Stream *str = active_stream.get();
                if (str && str->getType() == STREAM_ACTIVE) {
                    ActiveStream* as = static_cast<ActiveStream*>(str);
                    if (as && opaque == itr->second->getOpaque()) {
                        break;
                    }
                }
            }
        }

        // NOTE(review): 'itr' is compared against streams.end() after the
        // reader lock has been released; a concurrent insert/erase could
        // invalidate the iterator. The stream_t refcount keeps the stream
        // itself alive, but the end() comparison is not obviously safe -
        // confirm against streamsMutex usage elsewhere.
        if (itr != streams.end()) {
            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                as->setVBucketStateAckRecieved();
            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
                as->snapshotMarkerAckReceived();
            }
        }

        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
        // TODO: When nacking is implemented we need to handle these responses
        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
        if (noopCtx.opaque == resp->response.opaque) {
            noopCtx.pendingRecv = false;
            return ENGINE_SUCCESS;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return ENGINE_DISCONNECT;
}
655
656ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
657    if (doDisconnect()) {
658        return ENGINE_DISCONNECT;
659    }
660
661    stream_t stream = findStreamByVbid(vbucket);
662    ENGINE_ERROR_CODE ret;
663    if (!stream) {
664        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
665            "stream exists for this vbucket", logHeader(), vbucket);
666        return ENGINE_KEY_ENOENT;
667    } else if (!stream->isActive()) {
668        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
669            "stream is already marked as dead", logHeader(), vbucket);
670        connection_t conn(this);
671        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
672        ret = ENGINE_KEY_ENOENT;
673    } else {
674        stream->setDead(END_STREAM_CLOSED);
675        connection_t conn(this);
676        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
677        ret = ENGINE_SUCCESS;
678    }
679
680    {
681        WriterLockHolder wlh(streamsMutex);
682        streams.erase(vbucket);
683    }
684
685    return ret;
686}
687
// Wake the backfill manager's task so scheduled backfills make progress.
void DcpProducer::notifyBackfillManager() {
    backfillMgr->wakeUpTask();
}

// Account 'bytes' read from disk by a backfill. The return value is
// BackfillManager::bytesRead's verdict (presumably whether the backfill
// buffer can accept more - confirm in backfill-manager.h).
bool DcpProducer::recordBackfillManagerBytesRead(uint32_t bytes) {
    return backfillMgr->bytesRead(bytes);
}

// Account 'bytes' sent to the consumer out of the backfill buffer.
void DcpProducer::recordBackfillManagerBytesSent(uint32_t bytes) {
    backfillMgr->bytesSent(bytes);
}

// Queue a backfill for stream 's' covering seqnos [start, end].
void DcpProducer::scheduleBackfillManager(stream_t s,
                                          uint64_t start, uint64_t end) {
    backfillMgr->schedule(s, start, end);
}
704
// Emit this producer's stats: base connection stats, throughput counters,
// negotiated features, flow-control state, backfill stats, and one stat
// group per registered stream.
void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
    Producer::addStats(add_stat, c);

    addStat("items_sent", getItemsSent(), add_stat, c);
    addStat("items_remaining", getItemsRemaining(), add_stat, c);
    addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
    addStat("last_sent_time", lastSendTime, add_stat,
            c);
    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
    addStat("priority", priority.c_str(), add_stat, c);
    addStat("enable_ext_metadata", enableExtMetaData ? "enabled" : "disabled",
            add_stat, c);
    addStat("enable_value_compression",
            enableValueCompression ? "enabled" : "disabled",
            add_stat, c);
    addStat("cursor_dropping",
            supportsCursorDropping ? "ELIGIBLE" : "NOT_ELIGIBLE",
            add_stat, c);

    // Possible that the producer has had its streams closed and hence doesn't
    // have a backfill manager anymore.
    if (backfillMgr) {
        backfillMgr->addStats(this, add_stat, c);
    }

    log.addStats(add_stat, c);

    // Hold the streams lock for the remainder so the per-stream iteration
    // sees a consistent map.
    ReaderLockHolder rlh(streamsMutex);
    addStat("num_streams", streams.size(), add_stat, c);
    std::map<uint16_t, stream_t>::iterator itr;
    for (itr = streams.begin(); itr != streams.end(); ++itr) {
        itr->second->addStats(add_stat, c);
    }
}
740
741void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
742                                   uint16_t vbid) {
743
744    stream_t stream = findStreamByVbid(vbid);
745    if (stream && stream->getType() == STREAM_ACTIVE) {
746        ActiveStream* as = static_cast<ActiveStream*>(stream.get());
747        if (as) {
748            if (as->getState() == STREAM_DEAD) {
749                return;
750            }
751            as->addTakeoverStats(add_stat, c);
752        }
753    }
754}
755
// Fold this producer's throughput counters into the engine-wide aggregate.
void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) {
    aggregator.conn_queueDrain += itemsSent;
    aggregator.conn_totalBytes += totalBytesSent;
    aggregator.conn_queueRemaining += getItemsRemaining();
    aggregator.conn_queueBackfillRemaining += totalBackfillBacklogs;
}
762
763void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
764    stream_t stream = findStreamByVbid(vbucket);
765    if (stream && stream->isActive()) {
766        stream->notifySeqnoAvailable(seqno);
767    }
768}
769
770void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
771    stream_t stream = findStreamByVbid(vbucket);
772    if (stream) {
773        LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") State changed to "
774            "%s, closing active stream!",
775            logHeader(), vbucket, VBucket::toString(state));
776        stream->setDead(END_STREAM_STATE);
777    }
778}
779
780bool DcpProducer::handleSlowStream(uint16_t vbid,
781                                   const std::string &name) {
782    if (supportsCursorDropping) {
783        stream_t stream = findStreamByVbid(vbid);
784        if (stream) {
785            if (stream->getName().compare(name) == 0) {
786                ActiveStream* as = static_cast<ActiveStream*>(stream.get());
787                if (as) {
788                    LOG(EXTENSION_LOG_NOTICE, "%s (vb %" PRIu16 ")  Producer "
789                        "is handling slow stream;"
790                        " state:%s lastReadSeqno:%" PRIu64
791                        " lastSentSeqno:%" PRIu64,
792                        logHeader(), vbid,
793                        Stream::stateName(as->getState()),
794                        as->getLastReadSeqno(),
795                        as->getLastSentSeqno());
796                    as->handleSlowStream();
797                    return true;
798                }
799            }
800        }
801    }
802    return false;
803}
804
void DcpProducer::closeAllStreams() {
    // Tear down every stream owned by this producer. The vbucket ids
    // are collected under streamsMutex, but the per-vbucket ConnMap
    // cleanup below is deliberately performed after the lock is
    // released - presumably to avoid holding streamsMutex while
    // calling into DcpConnMap (TODO confirm the lock ordering
    // requirement).
    std::vector<uint16_t> vbvector;
    {
        WriterLockHolder wlh(streamsMutex);
        while (!streams.empty()) {
            std::map<uint16_t, stream_t>::iterator itr = streams.begin();
            uint16_t vbid = itr->first;
            // Mark the stream dead before removing our reference to it.
            itr->second->setDead(END_STREAM_DISCONNECTED);
            streams.erase(vbid);
            vbvector.push_back(vbid);
        }
    }
    // Ref-counted handle to `this`, as required by the ConnMap API.
    connection_t conn(this);
    for (const auto vbid: vbvector) {
         engine_.getDcpConnMap().removeVBConnByVBId(conn, vbid);
    }

    // Destroy the backfillManager. (BackfillManager task also
    // may hold a weak reference to it while running, but that is
    // guaranteed to decay and free the BackfillManager once it
    // completes run().
    // This will terminate any tasks and delete any backfills
    // associated with this Producer.  This is necessary as if we
    // don't, then the RCPtr references which exist between
    // DcpProducer and ActiveStream result in us leaking DcpProducer
    // objects (and Couchstore vBucket files, via DCPBackfill task).
    backfillMgr.reset();
}
833
834const char* DcpProducer::getType() const {
835    if (notifyOnly) {
836        return "notifier";
837    } else {
838        return "producer";
839    }
840}
841
/**
 * Pop the next DcpResponse to send for this connection, or NULL if
 * there is nothing ready (or the flow-control buffer log is full).
 * Caller owns the returned response.
 */
DcpResponse* DcpProducer::getNextItem() {
    do {
        setPaused(false);

        uint16_t vbucket = 0;
        // Drain the ready queue of vbuckets which may have data.
        while (ready.popFront(vbucket)) {
            if (log.pauseIfFull()) {
                // Flow-control buffer is full: put the vbucket back so
                // it is retried once the consumer acks some bytes.
                ready.pushUnique(vbucket);
                return NULL;
            }

            DcpResponse* op = NULL;
            stream_t stream;
            {
                // Take a local ref-counted copy of the stream under the
                // reader lock, then call next() outside it.
                ReaderLockHolder rlh(streamsMutex);
                std::map<uint16_t, stream_t>::iterator it = streams.find(vbucket);
                if (it == streams.end()) {
                    continue;
                }
                stream.reset(it->second);
            }

            op = stream->next();

            if (!op) {
                // stream is empty, try another vbucket.
                continue;
            }

            // Sanity-check: only these event types may be written by a
            // producer; anything else indicates a logic error upstream.
            switch (op->getEvent()) {
                case DCP_SNAPSHOT_MARKER:
                case DCP_MUTATION:
                case DCP_DELETION:
                case DCP_EXPIRATION:
                case DCP_STREAM_END:
                case DCP_SET_VBUCKET:
                    break;
                default:
                    throw std::logic_error(
                            std::string("DcpProducer::getNextItem: "
                            "Producer (") + logHeader() + ") is attempting to "
                            "write an unexpected event:" +
                            std::to_string(op->getEvent()));
            }

            // Re-queue the vbucket: the stream may have more items.
            ready.pushUnique(vbucket);

            // Account item/byte statistics for data-carrying events.
            if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
                op->getEvent() == DCP_EXPIRATION) {
                itemsSent++;
            }

            totalBytesSent.fetch_add(op->getMessageSize());

            return op;
        }

        // flag we are paused
        setPaused(true);

        // re-check the ready queue.
        // A new vbucket could of became ready and the notifier could of seen
        // paused = false, so reloop so we don't miss an operation.
    } while(!ready.empty());

    return NULL;
}
909
910void DcpProducer::setDisconnect(bool disconnect) {
911    ConnHandler::setDisconnect(disconnect);
912
913    if (disconnect) {
914        ReaderLockHolder rlh(streamsMutex);
915        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
916        for (; itr != streams.end(); ++itr) {
917            itr->second->setDead(END_STREAM_DISCONNECTED);
918        }
919    }
920}
921
922void DcpProducer::notifyStreamReady(uint16_t vbucket) {
923    if (ready.pushUnique(vbucket)) {
924        log.unpauseIfSpaceAvailable();
925    }
926}
927
void DcpProducer::notifyPaused(bool schedule) {
    // Delegate to the ConnMap to wake this paused connection; the
    // `schedule` flag is forwarded verbatim (see
    // DcpConnMap::notifyPausedConnection for its semantics).
    engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
}
931
/**
 * Send a DCP noop to the consumer if the noop interval has elapsed.
 * Returns ENGINE_FAILED when no noop is needed (caller continues),
 * ENGINE_DISCONNECT when the previous noop was never acknowledged,
 * or the result of the send (ENGINE_WANT_MORE on success).
 */
ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
    if (!noopCtx.enabled) {
        // Returning ENGINE_FAILED means ignore and continue without sending a noop
        return ENGINE_FAILED;
    }
    size_t sinceTime = ep_current_time() - noopCtx.sendTime;
    if (sinceTime <= noopCtx.noopInterval) {
        // The time interval has not passed so ignore and continue without sending
        return ENGINE_FAILED;
    }
    // The time interval has passed.  First check to see if waiting for a noop reply
    if (noopCtx.pendingRecv) {
        LOG(EXTENSION_LOG_NOTICE, "%s Disconnected because the connection"
            " appears to be dead", logHeader());
        return ENGINE_DISCONNECT;
    }
    // Try to send a noop to the consumer
    // Switch the ObjectRegistry away from this engine around the
    // callback (presumably so the send's allocations are not accounted
    // to this engine's memory - TODO confirm), restoring it after.
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    ENGINE_ERROR_CODE ret = producers->noop(getCookie(), ++noopCtx.opaque);
    ObjectRegistry::onSwitchThread(epe);

    if (ret == ENGINE_SUCCESS) {
        // Noop sent: expect a reply before the next interval elapses.
        ret = ENGINE_WANT_MORE;
        noopCtx.pendingRecv = true;
        noopCtx.sendTime = ep_current_time();
        lastSendTime = noopCtx.sendTime;
    }
    return ret;
}
961
// Stub required by the ConnHandler interface; noop timing for
// producers is handled by maybeSendNoop() instead.
bool DcpProducer::isTimeForNoop() {
    // Not Implemented
    return false;
}
966
// Stub required by the ConnHandler interface; intentionally a no-op
// (see maybeSendNoop() for the producer-side noop mechanism).
void DcpProducer::setTimeForNoop() {
    // Not Implemented
}
970
971void DcpProducer::clearQueues() {
972    WriterLockHolder wlh(streamsMutex);
973    for (const auto element: streams) {
974        element.second->clear();
975    }
976}
977
// Number of items still outstanding across this producer's backfills.
size_t DcpProducer::getBackfillQueueSize() {
    return totalBackfillBacklogs;
}
981
// Count of data items (mutations/deletions/expirations) sent so far;
// incremented in getNextItem().
size_t DcpProducer::getItemsSent() {
    return itemsSent;
}
985
986size_t DcpProducer::getItemsRemaining() {
987    size_t remainingSize = 0;
988    ReaderLockHolder rlh(streamsMutex);
989    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
990    for (; itr != streams.end(); ++itr) {
991        Stream *s = (itr->second).get();
992
993        if (s->getType() == STREAM_ACTIVE) {
994            ActiveStream *as = static_cast<ActiveStream *>(s);
995            remainingSize += as->getItemsRemaining();
996        }
997    }
998
999    return remainingSize;
1000}
1001
// Total bytes written to this connection; accumulated in getNextItem().
size_t DcpProducer::getTotalBytes() {
    return totalBytesSent;
}
1005
1006stream_t DcpProducer::findStreamByVbid(uint16_t vbid) {
1007    ReaderLockHolder rlh(streamsMutex);
1008    stream_t stream;
1009    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
1010    if (itr != streams.end()) {
1011        stream = itr->second;
1012    }
1013    return stream;
1014}
1015
1016std::vector<uint16_t> DcpProducer::getVBVector() {
1017    ReaderLockHolder rlh(streamsMutex);
1018    std::vector<uint16_t> vbvector;
1019    for (const auto element: streams) {
1020        vbvector.push_back(element.first);
1021    }
1022    return vbvector;
1023}
1024
// Record `bytes` as sent in the flow-control buffer log; returns false
// if the log is enabled and full (see BufferLog::insert).
bool DcpProducer::bufferLogInsert(size_t bytes) {
    return log.insert(bytes);
}
1028
// Queue stream `s` on the checkpoint processor task.
// NOTE(review): checkpointCreatorTask is assumed to always be an
// ActiveStreamCheckpointProcessorTask - confirm at its construction
// site; the downcast is unchecked.
void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->schedule(s);
}
1033
// Drop all streams queued on the checkpoint processor task.
// NOTE(review): same unchecked downcast assumption as
// scheduleCheckpointProcessorTask - confirm checkpointCreatorTask's
// concrete type at construction.
void DcpProducer::clearCheckpointProcessorTaskQueues() {
    static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
        ->clearQueues();
}
1038