xref: /6.6.0/kv_engine/engines/ep/src/dcp/producer.cc (revision 36134ec9)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2015 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
17
18#include "dcp/producer.h"
19
20#include "backfill.h"
21#include "bucket_logger.h"
22#include "checkpoint_manager.h"
23#include "collections/manager.h"
24#include "collections/vbucket_filter.h"
25#include "common.h"
26#include "connhandler_impl.h"
27#include "dcp/active_stream.h"
28#include "dcp/active_stream_checkpoint_processor_task.h"
29#include "dcp/backfill-manager.h"
30#include "dcp/dcpconnmap.h"
31#include "dcp/notifier_stream.h"
32#include "dcp/response.h"
33#include "executorpool.h"
34#include "failover-table.h"
35#include "item_eviction.h"
36#include "kv_bucket.h"
37#include "snappy-c.h"
38#include "statwriter.h"
39
40#include <memcached/server_cookie_iface.h>
41
// Default interval at which the producer transmits NOOPs. 20s matches the
// Couchbase 3.0 default for backward compatibility; 3.0.1+ consumers
// override it via control message (see the constructor's noopCtx setup).
const std::chrono::seconds DcpProducer::defaultDcpNoopTxInterval(20);
43
44DcpProducer::BufferLog::State DcpProducer::BufferLog::getState_UNLOCKED() {
45    if (isEnabled_UNLOCKED()) {
46        if (isFull_UNLOCKED()) {
47            return Full;
48        } else {
49            return SpaceAvailable;
50        }
51    }
52    return Disabled;
53}
54
55void DcpProducer::BufferLog::setBufferSize(size_t maxBytes) {
56    std::unique_lock<folly::SharedMutex> wlh(logLock);
57    this->maxBytes = maxBytes;
58    if (maxBytes == 0) {
59        bytesOutstanding = 0;
60        ackedBytes.reset(0);
61    }
62}
63
64bool DcpProducer::BufferLog::insert(size_t bytes) {
65    std::unique_lock<folly::SharedMutex> wlh(logLock);
66    bool inserted = false;
67    // If the log is not enabled
68    // or there is space, allow the insert
69    if (!isEnabled_UNLOCKED() || !isFull_UNLOCKED()) {
70        bytesOutstanding += bytes;
71        inserted = true;
72    }
73    return inserted;
74}
75
// Release (subtract) 'bytes' from the outstanding-bytes counter.
// Caller must hold logLock (the _UNLOCKED suffix convention).
void DcpProducer::BufferLog::release_UNLOCKED(size_t bytes) {
    // An ack for more bytes than are currently outstanding indicates an
    // accounting mismatch between producer and consumer; log it, but the
    // subtraction below is still performed.
    if (bytes > bytesOutstanding) {
        EP_LOG_INFO(
                "{} Attempting to release {} bytes which is greater than "
                "bytesOutstanding:{}",
                producer.logHeader(),
                uint64_t(bytes),
                uint64_t(bytesOutstanding));
    }

    // NOTE(review): when bytes > bytesOutstanding this subtraction wraps
    // the (presumably unsigned) counter - confirm that is the intended
    // recovery behaviour rather than clamping to zero.
    bytesOutstanding -= bytes;
}
88
89bool DcpProducer::BufferLog::pauseIfFull() {
90    std::shared_lock<folly::SharedMutex> rhl(logLock);
91    if (getState_UNLOCKED() == Full) {
92        producer.pause(PausedReason::BufferLogFull);
93        return true;
94    }
95    return false;
96}
97
98void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
99    std::shared_lock<folly::SharedMutex> rhl(logLock);
100    if (getState_UNLOCKED() == Full) {
101        EP_LOG_INFO(
102                "{} Unable to notify paused connection because "
103                "DcpProducer::BufferLog is full; ackedBytes:{}"
104                ", bytesSent:{}, maxBytes:{}",
105                producer.logHeader(),
106                uint64_t(ackedBytes),
107                uint64_t(bytesOutstanding),
108                uint64_t(maxBytes));
109    } else {
110        producer.scheduleNotify();
111    }
112}
113
// Process a buffer-acknowledgement of 'bytes' from the consumer: release
// the bytes from the outstanding counter and, if the log was Full before
// this ack, notify the paused connection so it can resume sending.
void DcpProducer::BufferLog::acknowledge(size_t bytes) {
    std::unique_lock<folly::SharedMutex> wlh(logLock);
    // Capture the state *before* releasing, so the Full->SpaceAvailable
    // transition caused by this very ack is detected below.
    State state = getState_UNLOCKED();
    if (state != Disabled) {
        release_UNLOCKED(bytes);
        ackedBytes += bytes;

        if (state == Full) {
            EP_LOG_INFO(
                    "{} Notifying paused connection now that "
                    "DcpProducer::BufferLog is no longer full; ackedBytes:{}"
                    ", bytesSent:{}, maxBytes:{}",
                    producer.logHeader(),
                    uint64_t(ackedBytes),
                    uint64_t(bytesOutstanding),
                    uint64_t(maxBytes));
            producer.scheduleNotify();
        }
    }
}
134
135void DcpProducer::BufferLog::addStats(const AddStatFn& add_stat,
136                                      const void* c) {
137    std::shared_lock<folly::SharedMutex> rhl(logLock);
138    if (isEnabled_UNLOCKED()) {
139        producer.addStat("max_buffer_bytes", maxBytes, add_stat, c);
140        producer.addStat("unacked_bytes", bytesOutstanding, add_stat, c);
141        producer.addStat("total_acked_bytes", ackedBytes, add_stat, c);
142        producer.addStat("flow_control", "enabled", add_stat, c);
143    } else {
144        producer.addStat("flow_control", "disabled", add_stat, c);
145    }
146}
147
148/// Decode IncludeValue from DCP producer flags.
149static IncludeValue toIncludeValue(uint32_t flags) {
150    using cb::mcbp::request::DcpOpenPayload;
151    if ((flags & DcpOpenPayload::NoValue) != 0) {
152        return IncludeValue::No;
153    }
154    if ((flags & DcpOpenPayload::NoValueWithUnderlyingDatatype) != 0) {
155        return IncludeValue::NoWithUnderlyingDatatype;
156    }
157    return IncludeValue::Yes;
158}
159
160DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
161                         const void* cookie,
162                         const std::string& name,
163                         uint32_t flags,
164                         bool startTask)
165    : ConnHandler(e, cookie, name),
166      notifyOnly((flags & cb::mcbp::request::DcpOpenPayload::Notifier) != 0),
167      sendStreamEndOnClientStreamClose(false),
168      consumerSupportsHifiMfu(false),
169      lastSendTime(ep_current_time()),
170      log(*this),
171      backfillMgr(std::make_shared<BackfillManager>(engine_)),
172      streams(streamsMapSize),
173      itemsSent(0),
174      totalBytesSent(0),
175      totalUncompressedDataSize(0),
176      includeValue(toIncludeValue(flags)),
177      includeXattrs(
178              ((flags & cb::mcbp::request::DcpOpenPayload::IncludeXattrs) != 0)
179                      ? IncludeXattrs::Yes
180                      : IncludeXattrs::No),
181      includeDeleteTime(
182              ((flags &
183                cb::mcbp::request::DcpOpenPayload::IncludeDeleteTimes) != 0)
184                      ? IncludeDeleteTime::Yes
185                      : IncludeDeleteTime::No),
186      createChkPtProcessorTsk(startTask) {
187    setSupportAck(true);
188    setReserved(true);
189    pause(PausedReason::Initializing);
190    if (notifyOnly) {
191        setLogHeader("DCP (Notifier) " + getName() + " -");
192    } else {
193        setLogHeader("DCP (Producer) " + getName() + " -");
194    }
195    // Reduce the minimum log level of view engine DCP streams as they are
196    // extremely noisy due to creating new stream, per vbucket,per design doc
197    // every ~10s.
198    if (name.find("eq_dcpq:mapreduce_view") != std::string::npos ||
199        name.find("eq_dcpq:spatial_view") != std::string::npos) {
200        logger->set_level(spdlog::level::level_enum::warn);
201        // Unregister this logger so that any changes in verbosity will not
202        // be reflected in this logger. This prevents us from getting in a
203        // state where we change the verbosity to a more verbose value, then
204        // cannot return to the original state where this logger only prints
205        // warning level messages and others print info level.
206        logger->unregister();
207    }
208
209    // MB-28468: Reduce the minimum log level of FTS DCP streams as they are
210    // very noisy due to creating streams for non-existing vBuckets. Future
211    // development of FTS should remedy this, however for now, we need to
212    // reduce their verbosity as they cause the memcached log to rotate early.
213    if (name.find("eq_dcpq:fts") != std::string::npos) {
214        logger->set_level(spdlog::level::level_enum::critical);
215        // Unregister this logger so that any changes in verbosity will not
216        // be reflected in this logger. This prevents us from getting in a
217        // state where we change the verbosity to a more verbose value, then
218        // cannot return to the original state where this logger only prints
219        // critical level messages and others print info level.
220        logger->unregister();
221    }
222
223    engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
224
225    // The consumer assigns opaques starting at 0 so lets have the producer
226    //start using opaques at 10M to prevent any opaque conflicts.
227    noopCtx.opaque = 10000000;
228    noopCtx.sendTime = ep_current_time();
229
230    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
231    // noop interval to 20 seconds by default, but in post 3.0 releases we set
232    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
233    // noop interval of the producer when connecting so in an all 3.0.1+ cluster
234    // this value will be overridden. In 3.0 however we do not set the noop
235    // interval so setting this value will make sure we don't disconnect on
236    // accident due to the producer and the consumer having a different noop
237    // interval.
238    noopCtx.dcpNoopTxInterval = defaultDcpNoopTxInterval;
239    noopCtx.pendingRecv = false;
240    noopCtx.enabled = false;
241
242    enableExtMetaData = false;
243    forceValueCompression = false;
244    enableExpiryOpcode = false;
245
246    // Cursor dropping is disabled for replication connections by default,
247    // but will be enabled through a control message to support backward
248    // compatibility. For all other type of DCP connections, cursor dropping
249    // will be enabled by default.
250    if (name.find("replication") < name.length()) {
251        supportsCursorDropping = false;
252    } else {
253        supportsCursorDropping = true;
254    }
255
256    includeDeletedUserXattrs =
257            ((flags &
258              cb::mcbp::request::DcpOpenPayload::IncludeDeletedUserXattrs) != 0)
259                    ? IncludeDeletedUserXattrs::Yes
260                    : IncludeDeletedUserXattrs::No;
261}
262
DcpProducer::~DcpProducer() {
    // Explicitly drop our reference to the BackfillManager (created in the
    // constructor); it is destroyed once any remaining holders release
    // their shared_ptr references.
    backfillMgr.reset();
}
266
// Cancel the (shared) checkpoint processor task for this producer, if one
// was ever created.
void DcpProducer::cancelCheckpointCreatorTask() {
    // checkpointCreator->task is guarded by its own mutex.
    LockHolder guard(checkpointCreator->mutex);
    if (checkpointCreator->task) {
        // Ask the task itself to cancel (presumably discarding any queued
        // streams - see ActiveStreamCheckpointProcessorTask::cancelTask),
        // then deschedule it from the executor pool.
        static_cast<ActiveStreamCheckpointProcessorTask*>(
                checkpointCreator->task.get())
                ->cancelTask();
        ExecutorPool::get()->cancel(checkpointCreator->task->getId());
    }
}
276
/**
 * Handle a DCP stream-request for a single vbucket.
 *
 * Validates the request (vbucket existence/state, mandatory noop for v5
 * features, seqno ranges, collections filter / stream-ID rules, duplicate
 * streams), determines whether the client must roll back, then constructs
 * and registers a NotifierStream (notify-only connections) or ActiveStream,
 * and finally returns the failover log to the client via 'callback'.
 *
 * @param rollback_seqno [out] set to the seqno the client must roll back to
 *        when ENGINE_ROLLBACK is returned.
 * @param callback invoked exactly once with the failover log on success
 *        paths (see MB-25820 note below).
 * @param json optional collections filter; also carries the stream-ID.
 * @return ENGINE_SUCCESS (or the callback's status) when the stream was
 *         created, otherwise an error code describing the refusal.
 */
ENGINE_ERROR_CODE DcpProducer::streamRequest(
        uint32_t flags,
        uint32_t opaque,
        Vbid vbucket,
        uint64_t start_seqno,
        uint64_t end_seqno,
        uint64_t vbucket_uuid,
        uint64_t snap_start_seqno,
        uint64_t snap_end_seqno,
        uint64_t* rollback_seqno,
        dcp_add_failover_log callback,
        boost::optional<cb::const_char_buffer> json) {
    lastReceiveTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    VBucketPtr vb = engine_.getVBucket(vbucket);
    if (!vb) {
        logger->warn(
                "({}) Stream request failed because "
                "this vbucket doesn't exist",
                vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    // check for mandatory noop: v5 features (xattrs, collections) require
    // the client to have enabled noop when the config demands it.
    if ((includeXattrs == IncludeXattrs::Yes) || json.is_initialized()) {
        if (!noopCtx.enabled &&
            engine_.getConfiguration().isDcpNoopMandatoryForV5Features()) {
            logger->warn(
                    "({}) noop is mandatory for v5 features like "
                    "xattrs and collections",
                    vbucket);
            return ENGINE_ENOTSUP;
        }
    }

    if ((flags & DCP_ADD_STREAM_ACTIVE_VB_ONLY) &&
        (vb->getState() != vbucket_state_active)) {
        logger->info(
                "({}) Stream request failed because "
                "the vbucket is in state:{}, only active vbuckets were "
                "requested",
                vbucket,
                vb->toString(vb->getState()));
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        EP_LOG_WARN(
                "{} ({}) Stream request failed because the start "
                "seqno ({}) is larger than the end seqno ({}); "
                "Incorrect params passed by the DCP client",
                logHeader(),
                vbucket,
                start_seqno,
                end_seqno);
        return ENGINE_ERANGE;
    }

    // The requested start seqno must lie within the client's snapshot.
    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        logger->warn(
                "({}) Stream request failed because "
                "the snap start seqno ({}) <= start seqno ({})"
                " <= snap end seqno ({}) is required",
                vbucket,
                snap_start_seqno,
                start_seqno,
                snap_end_seqno);
        return ENGINE_ERANGE;
    }

    // Construct the filter before rollback checks so we ensure the client view
    // of collections is compatible with the vbucket.
    Collections::VB::Filter filter(json, vb->getManifest());

    // Stream-ID presence must match this connection's negotiated mode.
    if (!filter.getStreamId() &&
        multipleStreamRequests == MultipleStreamRequests::Yes) {
        logger->warn(
                "Stream request for {} failed because a valid stream-ID is "
                "required.",
                vbucket);
        return ENGINE_DCP_STREAMID_INVALID;
    } else if (filter.getStreamId() &&
               multipleStreamRequests == MultipleStreamRequests::No) {
        logger->warn(
                "Stream request for {} failed because a stream-ID:{} is "
                "present "
                "but not required.",
                vbucket,
                filter.getStreamId());
        return ENGINE_DCP_STREAMID_INVALID;
    }

    // Check if this vbid can be added to this producer connection, and if
    // the vb connection map needs updating (if this is a new VB).
    bool callAddVBConnByVBId = true;
    auto found = streams.find(vbucket.get());
    if (found != streams.end()) {
        // vbid is already mapped. found.second is a shared_ptr<StreamContainer>
        if (found->second) {
            auto handle = found->second->wlock();
            for (; !handle.end(); handle.next()) {
                auto& sp = handle.get(); // get the shared_ptr<Stream>
                if (sp->compareStreamId(filter.getStreamId())) {
                    // Error if found and active
                    if (sp->isActive()) {
                        logger->warn(
                                "({}) Stream ({}) request failed"
                                " because a stream already exists for this "
                                "vbucket",
                                vbucket,
                                filter.getStreamId().to_string());
                        return ENGINE_KEY_EEXISTS;
                    } else {
                        // Found a 'dead' stream which can be replaced.
                        handle.erase();

                        // Don't need to add an entry to vbucket-to-conns map
                        callAddVBConnByVBId = false;

                        break;
                    }
                }
            }
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    // Consult the failover table: does the client's (uuid, seqno, snapshot)
    // history diverge from ours? If so the client must roll back first.
    std::pair<bool, std::string> need_rollback =
            vb->failovers->needsRollback(start_seqno,
                                         vb->getHighSeqno(),
                                         vbucket_uuid,
                                         snap_start_seqno,
                                         snap_end_seqno,
                                         vb->getPurgeSeqno(),
                                         flags & DCP_ADD_STREAM_STRICT_VBUUID,
                                         rollback_seqno);

    if (need_rollback.first) {
        logger->warn(
                "({}) Stream request requires rollback to seqno:{} "
                "because {}. Client requested seqnos:{{{},{}}} "
                "snapshot:{{{},{}}} uuid:{}",
                vbucket,
                *rollback_seqno,
                need_rollback.second,
                start_seqno,
                end_seqno,
                snap_start_seqno,
                snap_end_seqno,
                vbucket_uuid);
        return ENGINE_ROLLBACK;
    }

    // Snapshot the failover log now; it is sent to the client at the end,
    // after all other error paths have been exhausted (MB-25820).
    std::vector<vbucket_failover_t> failoverEntries =
            vb->failovers->getFailoverLog();

    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getKVBucket()->getLastPersistedSeqno(vbucket);
    }

    // Re-check the range now that LATEST/DISKONLY may have moved end_seqno.
    if (!notifyOnly && start_seqno > end_seqno) {
        EP_LOG_WARN(
                "{} ({}) Stream request failed because "
                "the start seqno ({}) is larger than the end seqno ({}"
                "), stream request flags {}, vb_uuid {}, snapStartSeqno {}, "
                "snapEndSeqno {}; should have rolled back instead",
                logHeader(),
                vbucket,
                start_seqno,
                end_seqno,
                flags,
                vbucket_uuid,
                snap_start_seqno,
                snap_end_seqno);
        return ENGINE_ERANGE;
    }

    if (!notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno()))
    {
        EP_LOG_WARN(
                "{} ({}) Stream request failed because "
                "the start seqno ({}) is larger than the vb highSeqno "
                "({}), stream request flags is {}, vb_uuid {}, snapStartSeqno "
                "{}, snapEndSeqno {}; should have rolled back instead",
                logHeader(),
                vbucket,
                start_seqno,
                vb->getHighSeqno(),
                flags,
                vbucket_uuid,
                snap_start_seqno,
                snap_end_seqno);
        return ENGINE_ERANGE;
    }

    std::shared_ptr<Stream> s;
    if (notifyOnly) {
        s = std::make_shared<NotifierStream>(&engine_,
                                             shared_from_this(),
                                             getName(),
                                             flags,
                                             opaque,
                                             vbucket,
                                             notifySeqno,
                                             end_seqno,
                                             vbucket_uuid,
                                             snap_start_seqno,
                                             snap_end_seqno);
    } else {
        try {
            // NOTE(review): 'filter' is moved into the ActiveStream here,
            // yet filter.getStreamId() is read again further below when
            // registering the stream. This relies on the Filter's move
            // leaving the stream-ID readable - confirm against
            // Collections::VB::Filter's move semantics.
            s = std::make_shared<ActiveStream>(&engine_,
                                               shared_from_this(),
                                               getName(),
                                               flags,
                                               opaque,
                                               *vb,
                                               start_seqno,
                                               end_seqno,
                                               vbucket_uuid,
                                               snap_start_seqno,
                                               snap_end_seqno,
                                               includeValue,
                                               includeXattrs,
                                               includeDeleteTime,
                                               includeDeletedUserXattrs,
                                               std::move(filter));
        } catch (const cb::engine_error& e) {
            logger->warn(
                    "({}) Stream request failed because "
                    "the filter cannot be constructed, returning:{}",
                    Vbid(vbucket),
                    e.code().value());
            return ENGINE_ERROR_CODE(e.code().value());
        }
        /* We want to create the 'createCheckpointProcessorTask' here even if
           the stream creation fails later on in the func. The goal is to
           create the 'checkpointProcessorTask' before any valid active stream
           is created */
        if (createChkPtProcessorTsk && !checkpointCreator->task) {
            createCheckpointProcessorTask();
            scheduleCheckpointProcessorTask();
        }
    }

    // Register the stream under the vbucket state lock so the vbucket
    // cannot change state between the checks and the map update.
    {
        folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_dead) {
            logger->warn(
                    "({}) Stream request failed because "
                    "this vbucket is in dead state",
                    vbucket);
            return ENGINE_NOT_MY_VBUCKET;
        }

        if (vb->isReceivingInitialDiskSnapshot()) {
            logger->info(
                    "({}) Stream request failed because this vbucket"
                    "is currently receiving its initial disk snapshot",
                    vbucket);
            return ENGINE_TMPFAIL;
        }

        if (!notifyOnly) {
            // MB-19428: Only activate the stream if we are adding it to the
            // streams map.
            static_cast<ActiveStream*>(s.get())->setActive();
        }

        updateStreamsMap(vbucket, filter.getStreamId(), s);
    }

    // See MB-25820:  Ensure that callback is called only after all other
    // possible error cases have been tested.  This is to ensure we do not
    // generate two responses for a single streamRequest.
    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
                                                                     true);
    ENGINE_ERROR_CODE rv = callback(failoverEntries.data(),
                                    failoverEntries.size(),
                                    getCookie());
    ObjectRegistry::onSwitchThread(epe);
    if (rv != ENGINE_SUCCESS) {
        logger->warn(
                "({}) Couldn't add failover log to "
                "stream request due to error {}",
                vbucket,
                rv);
    }

    notifyStreamReady(vbucket);

    if (callAddVBConnByVBId) {
        engine_.getDcpConnMap().addVBConnByVBId(shared_from_this(), vbucket);
    }

    return rv;
}
589
590uint8_t DcpProducer::encodeItemHotness(const Item& item) const {
591    auto freqCount = item.getFreqCounterValue();
592    if (consumerSupportsHifiMfu) {
593        // The consumer supports the hifi_mfu eviction
594        // policy, therefore use the frequency counter.
595        return freqCount;
596    }
597    // The consumer does not support the hifi_mfu
598    // eviction policy, therefore map from the 8-bit
599    // probabilistic counter (256 states) to NRU (4 states).
600    return ItemEviction::convertFreqCountToNRUValue(freqCount);
601}
602
// Transfer ownership of 'item' into a cb::unique_item_ptr whose deleter
// releases the item via this producer's engine.
cb::unique_item_ptr DcpProducer::toUniqueItemPtr(
        std::unique_ptr<Item>&& item) const {
    return {item.release(), cb::ItemDeleter(&engine_)};
}
607
608ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
609
610    if (doDisconnect()) {
611        return ENGINE_DISCONNECT;
612    }
613
614    ENGINE_ERROR_CODE ret;
615    if ((ret = maybeDisconnect()) != ENGINE_FAILED) {
616          return ret;
617    }
618
619    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
620        return ret;
621    }
622
623    std::unique_ptr<DcpResponse> resp;
624    if (rejectResp) {
625        resp = std::move(rejectResp);
626    } else {
627        resp = getNextItem();
628        if (!resp) {
629            return ENGINE_EWOULDBLOCK;
630        }
631    }
632
633    std::unique_ptr<Item> itmCpy;
634    totalUncompressedDataSize.fetch_add(resp->getMessageSize());
635
636    auto* mutationResponse = dynamic_cast<MutationResponse*>(resp.get());
637    if (mutationResponse) {
638        itmCpy = std::make_unique<Item>(*mutationResponse->getItem());
639        if (isCompressionEnabled()) {
640            /**
641             * Retrieve the uncompressed length if the document is compressed.
642             * This is to account for the total number of bytes if the data
643             * was sent as uncompressed
644             */
645            if (mcbp::datatype::is_snappy(itmCpy->getDataType())) {
646                size_t inflated_length = 0;
647                if (snappy_uncompressed_length(itmCpy->getData(), itmCpy->getNBytes(),
648                                               &inflated_length) == SNAPPY_OK) {
649                    totalUncompressedDataSize.fetch_add(inflated_length -
650                                                        itmCpy->getNBytes());
651                }
652            }
653        }
654    }
655
656    switch (resp->getEvent()) {
657        case DcpResponse::Event::StreamEnd:
658        {
659            StreamEndResponse* se = static_cast<StreamEndResponse*>(resp.get());
660            ret = producers->stream_end(
661                    se->getOpaque(),
662                    se->getVbucket(),
663                    mapEndStreamStatus(getCookie(), se->getFlags()),
664                    resp->getStreamId());
665
666            if (sendStreamEndOnClientStreamClose) {
667                // We did not remove the ConnHandler earlier so we could wait to
668                // send the streamEnd We have done that now, remove it.
669                engine_.getDcpConnMap().removeVBConnByVBId(getCookie(),
670                                                           se->getVbucket());
671                std::shared_ptr<Stream> stream;
672                bool vbFound;
673                std::tie(stream, vbFound) = closeStreamInner(
674                        se->getVbucket(), resp->getStreamId(), true);
675                if (!stream) {
676                    throw std::logic_error(
677                            "DcpProducer::step(StreamEnd): no stream was "
678                            "found "
679                            "for " +
680                            se->getVbucket().to_string() + " " +
681                            resp->getStreamId().to_string());
682                }
683            }
684            break;
685        }
686        case DcpResponse::Event::Commit: {
687            CommitSyncWrite* csr = static_cast<CommitSyncWrite*>(resp.get());
688            ret = producers->commit(csr->getOpaque(),
689                                    csr->getVbucket(),
690                                    csr->getKey(),
691                                    csr->getPreparedSeqno(),
692                                    csr->getCommitSeqno());
693            break;
694        }
695
696        case DcpResponse::Event::Mutation:
697        {
698            if (itmCpy == nullptr) {
699                throw std::logic_error(
700                    "DcpProducer::step(Mutation): itmCpy must be != nullptr");
701            }
702
703            const uint8_t hotness =
704                    encodeItemHotness(*mutationResponse->getItem());
705            ret = producers->mutation(mutationResponse->getOpaque(),
706                                      toUniqueItemPtr(std::move(itmCpy)),
707                                      mutationResponse->getVBucket(),
708                                      *mutationResponse->getBySeqno(),
709                                      mutationResponse->getRevSeqno(),
710                                      0 /* lock time */,
711                                      hotness,
712                                      mutationResponse->getStreamId());
713            break;
714        }
715        case DcpResponse::Event::Deletion:
716        {
717            if (itmCpy == nullptr) {
718                throw std::logic_error(
719                    "DcpProducer::step(Deletion): itmCpy must be != nullptr");
720            }
721            ret = deletionV1OrV2(includeDeleteTime,
722                                 *mutationResponse,
723                                 producers,
724                                 std::move(itmCpy),
725                                 ret,
726                                 mutationResponse->getStreamId());
727            break;
728        }
729        case DcpResponse::Event::Expiration: {
730            if (itmCpy == nullptr) {
731                throw std::logic_error(
732                        "DcpProducer::step(Expiration): itmCpy must be != "
733                        "nullptr");
734            }
735            if (enableExpiryOpcode) {
736                if (includeDeleteTime == IncludeDeleteTime::No) {
737                    throw std::logic_error(
738                            "DcpProducer::step(Expiration): If enabling Expiry "
739                            "opcodes, you cannot disable delete_v2");
740                }
741                ret = producers->expiration(
742                        mutationResponse->getOpaque(),
743                        toUniqueItemPtr(std::move(itmCpy)),
744                        mutationResponse->getVBucket(),
745                        *mutationResponse->getBySeqno(),
746                        mutationResponse->getRevSeqno(),
747                        mutationResponse->getItem()->getExptime(),
748                        resp->getStreamId());
749            } else {
750                ret = deletionV1OrV2(includeDeleteTime,
751                                     *mutationResponse,
752                                     producers,
753                                     std::move(itmCpy),
754                                     ret,
755                                     resp->getStreamId());
756            }
757            break;
758        }
759        case DcpResponse::Event::Prepare: {
760            if (itmCpy == nullptr) {
761                throw std::logic_error(
762                        "DcpProducer::step(Prepare): itmCpy must be != "
763                        "nullptr");
764            }
765
766            const uint8_t hotness =
767                    encodeItemHotness(*mutationResponse->getItem());
768            const auto docState = mutationResponse->getItem()->isDeleted()
769                                          ? DocumentState::Deleted
770                                          : DocumentState::Alive;
771            ret = producers->prepare(mutationResponse->getOpaque(),
772                                     toUniqueItemPtr(std::move(itmCpy)),
773                                     mutationResponse->getVBucket(),
774                                     *mutationResponse->getBySeqno(),
775                                     mutationResponse->getRevSeqno(),
776                                     0 /* lock time */,
777                                     hotness,
778                                     docState,
779                                     mutationResponse->getItem()
780                                             ->getDurabilityReqs()
781                                             .getLevel());
782            break;
783        }
784        case DcpResponse::Event::Abort: {
785            AbortSyncWrite& abort = dynamic_cast<AbortSyncWrite&>(*resp);
786            ret = producers->abort(abort.getOpaque(),
787                                   abort.getVbucket(),
788                                   abort.getKey(),
789                                   abort.getPreparedSeqno(),
790                                   abort.getAbortSeqno());
791            break;
792        }
793        case DcpResponse::Event::SnapshotMarker:
794        {
795            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp.get());
796            ret = producers->marker(s->getOpaque(),
797                                    s->getVBucket(),
798                                    s->getStartSeqno(),
799                                    s->getEndSeqno(),
800                                    s->getFlags(),
801                                    s->getHighCompletedSeqno(),
802                                    s->getMaxVisibleSeqno(),
803                                    resp->getStreamId());
804            break;
805        }
806        case DcpResponse::Event::SetVbucket:
807        {
808            SetVBucketState* s = static_cast<SetVBucketState*>(resp.get());
809            ret = producers->set_vbucket_state(
810                    s->getOpaque(), s->getVBucket(), s->getState());
811            break;
812        }
813        case DcpResponse::Event::SystemEvent: {
814            SystemEventProducerMessage* s =
815                    static_cast<SystemEventProducerMessage*>(resp.get());
816            ret = producers->system_event(
817                    s->getOpaque(),
818                    s->getVBucket(),
819                    s->getSystemEvent(),
820                    *s->getBySeqno(),
821                    s->getVersion(),
822                    {reinterpret_cast<const uint8_t*>(s->getKey().data()),
823                     s->getKey().size()},
824                    s->getEventData(),
825                    resp->getStreamId());
826            break;
827        }
828        default:
829        {
830            logger->warn(
831                    "Unexpected dcp event ({}), "
832                    "disconnecting",
833                    resp->to_string());
834            ret = ENGINE_DISCONNECT;
835            break;
836        }
837    }
838
839    if (ret == ENGINE_E2BIG) {
840        rejectResp = std::move(resp);
841    }
842
843    if (ret == ENGINE_SUCCESS) {
844        switch (resp->getEvent()) {
845        case DcpResponse::Event::Abort:
846        case DcpResponse::Event::Commit:
847        case DcpResponse::Event::Deletion:
848        case DcpResponse::Event::Expiration:
849        case DcpResponse::Event::Mutation:
850        case DcpResponse::Event::Prepare:
851        case DcpResponse::Event::SystemEvent:
852            itemsSent++;
853            break;
854        case DcpResponse::Event::AddStream:
855        case DcpResponse::Event::SeqnoAcknowledgement:
856        case DcpResponse::Event::SetVbucket:
857        case DcpResponse::Event::SnapshotMarker:
858        case DcpResponse::Event::StreamReq:
859        case DcpResponse::Event::StreamEnd:
860            break;
861        }
862
863        totalBytesSent.fetch_add(resp->getMessageSize());
864    }
865
866    lastSendTime = ep_current_time();
867    return ret;
868}
869
// Flow-control acknowledgement from the consumer: it has consumed
// 'buffer_bytes' of previously-sent data, so credit that amount back to the
// connection's BufferLog. 'opaque' and 'vbucket' are unused here — the ack
// applies to the connection-level buffer, not a single stream.
ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                     Vbid vbucket,
                                                     uint32_t buffer_bytes) {
    // Any message from the peer counts as activity for noop/timeout purposes.
    lastReceiveTime = ep_current_time();
    log.acknowledge(buffer_bytes);
    return ENGINE_SUCCESS;
}
877
878ENGINE_ERROR_CODE DcpProducer::deletionV1OrV2(
879        IncludeDeleteTime includeDeleteTime,
880        MutationResponse& mutationResponse,
881        dcp_message_producers* producers,
882        std::unique_ptr<Item> itmCpy,
883        ENGINE_ERROR_CODE ret,
884        cb::mcbp::DcpStreamId sid) {
885    if (includeDeleteTime == IncludeDeleteTime::Yes) {
886        ret = producers->deletion_v2(
887                mutationResponse.getOpaque(),
888                toUniqueItemPtr(std::move(itmCpy)),
889                mutationResponse.getVBucket(),
890                *mutationResponse.getBySeqno(),
891                mutationResponse.getRevSeqno(),
892                mutationResponse.getItem()->getDeleteTime(),
893                sid);
894    } else {
895        ret = producers->deletion(mutationResponse.getOpaque(),
896                                  toUniqueItemPtr(std::move(itmCpy)),
897                                  mutationResponse.getVBucket(),
898                                  *mutationResponse.getBySeqno(),
899                                  mutationResponse.getRevSeqno(),
900                                  sid);
901    }
902    return ret;
903}
904
905ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque,
906                                       cb::const_char_buffer key,
907                                       cb::const_char_buffer value) {
908    lastReceiveTime = ep_current_time();
909    const char* param = key.data();
910    std::string keyStr(key.data(), key.size());
911    std::string valueStr(value.data(), value.size());
912
913    if (strncmp(param, "backfill_order", key.size()) == 0) {
914        using ScheduleOrder = BackfillManager::ScheduleOrder;
915        if (valueStr == "round-robin") {
916            backfillMgr->setBackfillOrder(ScheduleOrder::RoundRobin);
917        } else if (valueStr == "sequential") {
918            backfillMgr->setBackfillOrder(ScheduleOrder::Sequential);
919        } else {
920            engine_.setErrorContext(
921                    getCookie(),
922                    "Unsupported value '" + keyStr +
923                            "' for ctrl parameter 'backfill_order'");
924            return ENGINE_EINVAL;
925        }
926        return ENGINE_SUCCESS;
927
928    } else if (strncmp(param, "connection_buffer_size", key.size()) == 0) {
929        uint32_t size;
930        if (parseUint32(valueStr.c_str(), &size)) {
931            /* Size 0 implies the client (DCP consumer) does not support
932               flow control */
933            log.setBufferSize(size);
934            return ENGINE_SUCCESS;
935        }
936    } else if (strncmp(param, "stream_buffer_size", key.size()) == 0) {
937        logger->warn(
938                "The ctrl parameter stream_buffer_size is"
939                "not supported by this engine");
940        return ENGINE_ENOTSUP;
941    } else if (strncmp(param, "enable_noop", key.size()) == 0) {
942        if (valueStr == "true") {
943            noopCtx.enabled = true;
944        } else {
945            noopCtx.enabled = false;
946        }
947        return ENGINE_SUCCESS;
948    } else if (strncmp(param, "enable_ext_metadata", key.size()) == 0) {
949        if (valueStr == "true") {
950            enableExtMetaData = true;
951        } else {
952            enableExtMetaData = false;
953        }
954        return ENGINE_SUCCESS;
955    } else if (strncmp(param, "force_value_compression", key.size()) == 0) {
956        if (!engine_.isDatatypeSupported(getCookie(),
957                               PROTOCOL_BINARY_DATATYPE_SNAPPY)) {
958            engine_.setErrorContext(getCookie(), "The ctrl parameter "
959                  "force_value_compression is only supported if datatype "
960                  "snappy is enabled on the connection");
961            return ENGINE_EINVAL;
962        }
963        if (valueStr == "true") {
964            forceValueCompression = true;
965        } else {
966            forceValueCompression = false;
967        }
968        return ENGINE_SUCCESS;
969        // vulcan onwards we accept two cursor_dropping control keys.
970    } else if (keyStr == "supports_cursor_dropping_vulcan" ||
971               keyStr == "supports_cursor_dropping") {
972        if (valueStr == "true") {
973            supportsCursorDropping = true;
974        } else {
975            supportsCursorDropping = false;
976        }
977        return ENGINE_SUCCESS;
978    } else if (strncmp(param, "supports_hifi_MFU", key.size()) == 0) {
979        consumerSupportsHifiMfu = (valueStr == "true");
980        return ENGINE_SUCCESS;
981    } else if (strncmp(param, "set_noop_interval", key.size()) == 0) {
982        uint32_t noopInterval;
983        if (parseUint32(valueStr.c_str(), &noopInterval)) {
984            /*
985             * We need to ensure that we only set the noop interval to a value
986             * that is a multiple of the connection manager interval. The reason
987             * is that if there is no DCP traffic we snooze for the connection
988             * manager interval before sending the noop.
989             */
990            if (noopInterval % engine_.getConfiguration().
991                    getConnectionManagerInterval() == 0) {
992                noopCtx.dcpNoopTxInterval = std::chrono::seconds(noopInterval);
993                return ENGINE_SUCCESS;
994            } else {
995                logger->warn(
996                        "The ctrl parameter "
997                        "set_noop_interval:{} is being set to seconds."
998                        "This is not a multiple of the "
999                        "connectionManagerInterval:{} "
1000                        "of seconds, and so is not supported.",
1001                        noopInterval,
1002                        engine_.getConfiguration()
1003                                .getConnectionManagerInterval());
1004                return ENGINE_EINVAL;
1005            }
1006        }
1007    } else if (strncmp(param, "set_priority", key.size()) == 0) {
1008        if (valueStr == "high") {
1009            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
1010            return ENGINE_SUCCESS;
1011        } else if (valueStr == "medium") {
1012            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
1013            return ENGINE_SUCCESS;
1014        } else if (valueStr == "low") {
1015            engine_.setDCPPriority(getCookie(), CONN_PRIORITY_LOW);
1016            return ENGINE_SUCCESS;
1017        }
1018    } else if (keyStr == "send_stream_end_on_client_close_stream") {
1019        if (valueStr == "true") {
1020            sendStreamEndOnClientStreamClose = true;
1021        }
1022        /* Do not want to give an option to the client to disable this.
1023           Default is disabled, client has only a choice to enable.
1024           This is a one time setting and there is no point giving the client an
1025           option to toggle it back mid way during the connection */
1026        return ENGINE_SUCCESS;
1027    } else if (strncmp(param, "enable_expiry_opcode", key.size()) == 0) {
1028        // Expiry opcode uses the same encoding as deleteV2 (includes
1029        // delete time); therefore a client can only enable expiry_opcode
1030        // if the dcpOpen flags have includeDeleteTime set.
1031        enableExpiryOpcode = valueStr == "true" &&
1032                             includeDeleteTime == IncludeDeleteTime::Yes;
1033
1034        return ENGINE_SUCCESS;
1035    } else if (keyStr == "enable_stream_id") {
1036        if (valueStr != "true") {
1037            // For simplicity, user cannot turn this off, it is by default off
1038            // and can only be enabled one-way per Producer.
1039            return ENGINE_EINVAL;
1040        }
1041        multipleStreamRequests = MultipleStreamRequests::Yes;
1042        return ENGINE_SUCCESS;
1043    } else if (key == "enable_sync_writes") {
1044        if (valueStr == "true") {
1045            supportsSyncReplication = SyncReplication::SyncWrites;
1046            if (!consumerName.empty()) {
1047                supportsSyncReplication = SyncReplication::SyncReplication;
1048            }
1049            return ENGINE_SUCCESS;
1050        }
1051    } else if (key == "consumer_name" && !valueStr.empty()) {
1052        consumerName = valueStr;
1053        if (supportsSyncReplication == SyncReplication::SyncWrites) {
1054            supportsSyncReplication = SyncReplication::SyncReplication;
1055        }
1056        return ENGINE_SUCCESS;
1057    } else if (key == "include_deleted_user_xattrs") {
1058        if (valueStr == "true") {
1059            if (includeDeletedUserXattrs == IncludeDeletedUserXattrs::Yes) {
1060                return ENGINE_SUCCESS;
1061            } else {
1062                // Note: Return here as there is no invalid param, we just want
1063                // to inform the DCP client that this Producer does not enable
1064                // IncludeDeletedUserXattrs, so we do not want to log as below
1065                return ENGINE_EINVAL;
1066            }
1067        }
1068    }
1069
1070    logger->warn("Invalid ctrl parameter '{}' for {}", valueStr, keyStr);
1071
1072    return ENGINE_EINVAL;
1073}
1074
1075ENGINE_ERROR_CODE DcpProducer::seqno_acknowledged(uint32_t opaque,
1076                                                  Vbid vbucket,
1077                                                  uint64_t prepared_seqno) {
1078    if (!isSyncReplicationEnabled()) {
1079        logger->warn(
1080                "({}) seqno_acknowledge failed because SyncReplication is"
1081                " not enabled on this Producer");
1082        return ENGINE_EINVAL;
1083    }
1084
1085    if (consumerName.empty()) {
1086        logger->warn(
1087                "({}) seqno_acknowledge failed because this producer does"
1088                " not have an associated consumer name");
1089        return ENGINE_EINVAL;
1090    }
1091
1092    VBucketPtr vb = engine_.getVBucket(vbucket);
1093    if (!vb) {
1094        logger->warn(
1095                "({}) seqno_acknowledge failed because this vbucket doesn't "
1096                "exist",
1097                vbucket);
1098        return ENGINE_NOT_MY_VBUCKET;
1099    }
1100
1101    logger->debug("({}) seqno_acknowledged: prepared_seqno:{}",
1102                  vbucket,
1103                  prepared_seqno);
1104
1105    // Confirm that we only receive ack seqnos we have sent
1106    auto rv = streams.find(vbucket.get());
1107    if (rv == streams.end()) {
1108        throw std::logic_error(
1109                "Replica acked seqno:" + std::to_string(prepared_seqno) +
1110                " for vbucket:" + to_string(vbucket) +
1111                " but we don't have a StreamContainer for that vb");
1112    }
1113
1114    std::shared_ptr<ActiveStream> stream;
1115    for (auto itr = rv->second->rlock(); !itr.end(); itr.next()) {
1116        auto s = itr.get();
1117        if (s->getOpaque() == opaque) {
1118            stream = dynamic_pointer_cast<ActiveStream>(s);
1119            break;
1120        }
1121    }
1122
1123    if (!stream) {
1124        // No stream found, may be the case that we have just ended our
1125        // stream and removed the stream from our map but the consumer is
1126        // not yet aware and we have received a seqno ack. Just return
1127        // success and ignore the ack.
1128        return ENGINE_SUCCESS;
1129    }
1130
1131    if (seqnoAckHook) {
1132        seqnoAckHook();
1133    }
1134
1135    return stream->seqnoAck(consumerName, prepared_seqno);
1136}
1137
// Handle a response message received from the consumer for a request this
// producer previously sent. Returns true to keep the connection alive and
// false to disconnect it. The dispatch below is intricate (shared error
// lambda, per-opcode expectations), so the behaviour is documented in place
// rather than restructured.
bool DcpProducer::handleResponse(const protocol_binary_response_header* resp) {
    lastReceiveTime = ep_current_time();
    if (doDisconnect()) {
        return false;
    }

    const auto opcode = resp->response.getClientOpcode();
    const auto opaque = resp->response.getOpaque();
    const auto responseStatus = resp->response.getStatus();

    // Search for an active stream with the same opaque as the response.
    // Returns an empty element if no ActiveStream matches.
    auto streamFindFn = [opaque](const StreamsMap::value_type& s) {
      auto handle = s.second->rlock();
      for (; !handle.end(); handle.next()) {
          const auto& stream = handle.get();
          auto* as = dynamic_cast<ActiveStream*>(stream.get());
          if (as && opaque == stream->getOpaque()) {
              return stream; // return matching shared_ptr<Stream>
          }
      }
      return ContainerElement{};
    };

    // Shared handler for unexpected response statuses: returns true (keep
    // connection, just log) for statuses that only affect a single stream,
    // false (disconnect) otherwise.
    const auto errorMessageHandler = [&]() -> bool {
        // For DcpPrepare, DcpCommit and DcpAbort we may see KeyEnoent or
        // Einval for the following reasons.
        // KeyEnoent:
        // In this case we receive a KeyEnoent, we need to disconnect as we
        // must have sent an a commit or abort of key that the consumer is
        // unaware of and we should never see KeyEnoent from a DcpPrepare.
        // Einval:
        // If we have seen a Einval we also need to disconnect as we must
        // have sent an invalid. Mutation or packet to the consumer e.g. we
        // sent an abort to the consumer in a non disk snapshot without it
        // having seen a prepare.
        if (responseStatus == cb::mcbp::Status::NotMyVbucket ||
            responseStatus == cb::mcbp::Status::KeyEexists ||
            (responseStatus == cb::mcbp::Status::KeyEnoent &&
             opcode != cb::mcbp::ClientOpcode::DcpPrepare &&
             opcode != cb::mcbp::ClientOpcode::DcpAbort &&
             opcode != cb::mcbp::ClientOpcode::DcpCommit)) {
            // Use find_if2 which will return the matching shared_ptr<Stream>
            auto stream = find_if2(streamFindFn);
            std::string streamInfo("null");
            if (stream) {
                streamInfo = fmt::format(fmt("stream name:{}, {}, state:{}"),
                                         stream->getName(),
                                         stream->getVBucket(),
                                         stream->getStateName());
            }
            std::string mcbpStatusName;
            if (responseStatus == cb::mcbp::Status::NotMyVbucket) {
                mcbpStatusName = "NotMyVbucket";
            } else if (responseStatus == cb::mcbp::Status::KeyEnoent) {
                mcbpStatusName = "KeyEnoent";
            } else if (responseStatus == cb::mcbp::Status::KeyEexists) {
                mcbpStatusName = "KeyEexists";
            }
            logger->info(
                    "DcpProducer::handleResponse received "
                    "unexpected "
                    "response:{}, Will not disconnect as {} will "
                    "affect "
                    "only one stream:{}",
                    mcbpStatusName,
                    resp->response.toJSON(true).dump(),
                    streamInfo);
            return true;
        }
        logger->error(
                "DcpProducer::handleResponse disconnecting, received "
                "unexpected "
                "response:{}",
                resp->response.toJSON(true).dump());
        return false;
    };

    switch (opcode) {
    case cb::mcbp::ClientOpcode::DcpSetVbucketState:
    case cb::mcbp::ClientOpcode::DcpSnapshotMarker: {
        // Use find_if2 which will return the matching shared_ptr<Stream>
        auto stream = find_if2(streamFindFn);
        if (stream) {
            // streamFindFn only matches ActiveStream instances, so this
            // static_cast is safe.
            auto* as = static_cast<ActiveStream*>(stream.get());
            if (opcode == cb::mcbp::ClientOpcode::DcpSetVbucketState) {
                as->setVBucketStateAckRecieved();
            } else {
                as->snapshotMarkerAckReceived();
            }
        }

        // Acks for these opcodes never cause a disconnect, even if the
        // stream has already gone away.
        return true;
    }
    case cb::mcbp::ClientOpcode::DcpStreamEnd:
        // The consumer could have closed the stream (DcpStreamEnd), enoent is
        // expected, but any other errors are not expected.
        if (responseStatus == cb::mcbp::Status::KeyEnoent ||
            responseStatus == cb::mcbp::Status::Success) {
            return true;
        }
        return errorMessageHandler();
    case cb::mcbp::ClientOpcode::DcpNoop:
        // Only clear the pending-noop flag if the response matches the
        // opaque of the noop we actually sent.
        if (noopCtx.opaque == resp->response.getOpaque()) {
            noopCtx.pendingRecv = false;
            return true;
        }
        return errorMessageHandler();
    case cb::mcbp::ClientOpcode::DcpOpen:
    case cb::mcbp::ClientOpcode::DcpAddStream:
    case cb::mcbp::ClientOpcode::DcpCloseStream:
    case cb::mcbp::ClientOpcode::DcpStreamReq:
    case cb::mcbp::ClientOpcode::DcpGetFailoverLog:
    case cb::mcbp::ClientOpcode::DcpMutation:
    case cb::mcbp::ClientOpcode::DcpDeletion:
    case cb::mcbp::ClientOpcode::DcpExpiration:
    case cb::mcbp::ClientOpcode::DcpBufferAcknowledgement:
    case cb::mcbp::ClientOpcode::DcpControl:
    case cb::mcbp::ClientOpcode::DcpSystemEvent:
    case cb::mcbp::ClientOpcode::GetErrorMap:
    case cb::mcbp::ClientOpcode::DcpPrepare:
    case cb::mcbp::ClientOpcode::DcpCommit:
    case cb::mcbp::ClientOpcode::DcpAbort:
        // For all other known opcodes a Success is fine; anything else is
        // routed through the shared error handler above.
        if (responseStatus == cb::mcbp::Status::Success) {
            return true;
        }
        return errorMessageHandler();
    default:
        std::string errorMsg(
                "DcpProducer::handleResponse received an unknown client "
                "opcode: ");
        errorMsg += resp->response.toJSON(true).dump();
        throw std::logic_error(errorMsg);
    }
}
1272
1273std::pair<std::shared_ptr<Stream>, bool> DcpProducer::closeStreamInner(
1274        Vbid vbucket, cb::mcbp::DcpStreamId sid, bool eraseFromMapIfFound) {
1275    std::shared_ptr<Stream> stream;
1276    bool vbFound = false;
1277
1278    auto rv = streams.find(vbucket.get());
1279    if (rv != streams.end()) {
1280        vbFound = true;
1281        // Vbucket is mapped, get exclusive access to the StreamContainer
1282        auto handle = rv->second->wlock();
1283        // Try and locate a matching stream
1284        for (; !handle.end(); handle.next()) {
1285            if (handle.get()->compareStreamId(sid)) {
1286                stream = handle.get();
1287                break;
1288            }
1289        }
1290
1291        if (eraseFromMapIfFound && stream) {
1292            // Need to tidy up the map, call erase on the handle,
1293            // which will erase the current element from the container
1294            handle.erase();
1295        }
1296    }
1297    return {stream, vbFound};
1298}
1299
1300ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque,
1301                                           Vbid vbucket,
1302                                           cb::mcbp::DcpStreamId sid) {
1303    lastReceiveTime = ep_current_time();
1304    if (doDisconnect()) {
1305        return ENGINE_DISCONNECT;
1306    }
1307
1308    if (!sid && multipleStreamRequests == MultipleStreamRequests::Yes) {
1309        logger->warn(
1310                "({}) closeStream request failed because a valid "
1311                "stream-ID is required.",
1312                vbucket);
1313        return ENGINE_DCP_STREAMID_INVALID;
1314    } else if (sid && multipleStreamRequests == MultipleStreamRequests::No) {
1315        logger->warn(
1316                "({}) closeStream request failed because a "
1317                "stream-ID:{} is present "
1318                "but not required.",
1319                vbucket,
1320                sid);
1321        return ENGINE_DCP_STREAMID_INVALID;
1322    }
1323
1324    /* We should not remove the stream from the streams map if we have to
1325       send the "STREAM_END" response asynchronously to the consumer, so
1326       use the value of sendStreamEndOnClientStreamClose to determine if the
1327       stream should be removed if found*/
1328    auto rv = closeStreamInner(vbucket, sid, !sendStreamEndOnClientStreamClose);
1329
1330    ENGINE_ERROR_CODE ret;
1331    if (!rv.first) {
1332        logger->warn(
1333                "({}) Cannot close stream because no "
1334                "stream exists for this vbucket {}",
1335                vbucket,
1336                sid);
1337        return sid && rv.second ? ENGINE_DCP_STREAMID_INVALID
1338                                : ENGINE_KEY_ENOENT;
1339    } else {
1340        if (!rv.first->isActive()) {
1341            logger->warn(
1342                    "({}) Cannot close stream because "
1343                    "stream is already marked as dead {}",
1344                    vbucket,
1345                    sid);
1346            ret = ENGINE_KEY_ENOENT;
1347        } else {
1348            rv.first->setDead(END_STREAM_CLOSED);
1349            ret = ENGINE_SUCCESS;
1350        }
1351        if (!sendStreamEndOnClientStreamClose) {
1352            /* Remove the conn from 'vb_conns map' only when we have removed the
1353               stream from the producer connections StreamsMap */
1354            engine_.getDcpConnMap().removeVBConnByVBId(getCookie(), vbucket);
1355        }
1356    }
1357
1358    return ret;
1359}
1360
// Wake the backfill manager's task so any scheduled backfills can make
// progress.
void DcpProducer::notifyBackfillManager() {
    backfillMgr->wakeUpTask();
}
1364
1365bool DcpProducer::recordBackfillManagerBytesRead(size_t bytes, bool force) {
1366    if (force) {
1367        backfillMgr->bytesForceRead(bytes);
1368        return true;
1369    }
1370    return backfillMgr->bytesCheckAndRead(bytes);
1371}
1372
// Inform the backfill manager that 'bytes' of backfilled data have been sent
// to the consumer, so it can update its buffer accounting.
void DcpProducer::recordBackfillManagerBytesSent(size_t bytes) {
    backfillMgr->bytesSent(bytes);
}
1376
1377bool DcpProducer::scheduleBackfillManager(VBucket& vb,
1378                                          std::shared_ptr<ActiveStream> s,
1379                                          uint64_t start,
1380                                          uint64_t end) {
1381    if (start <= end) {
1382        switch (backfillMgr->schedule(
1383                vb.createDCPBackfill(engine_, s, start, end))) {
1384        case BackfillManager::ScheduleResult::Active:
1385            break;
1386        case BackfillManager::ScheduleResult::Pending:
1387            EP_LOG_INFO(
1388                    "Backfill for {} {} is pending", s->getName(), vb.getId());
1389            break;
1390        }
1391        return true;
1392    }
1393    return false;
1394}
1395
1396void DcpProducer::addStats(const AddStatFn& add_stat, const void* c) {
1397    ConnHandler::addStats(add_stat, c);
1398
1399    addStat("items_sent", getItemsSent(), add_stat, c);
1400    addStat("items_remaining", getItemsRemaining(), add_stat, c);
1401    addStat("total_bytes_sent", getTotalBytesSent(), add_stat, c);
1402    if (isCompressionEnabled()) {
1403        addStat("total_uncompressed_data_size", getTotalUncompressedDataSize(),
1404                add_stat, c);
1405    }
1406    addStat("last_sent_time", lastSendTime, add_stat, c);
1407    addStat("last_receive_time", lastReceiveTime, add_stat, c);
1408    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
1409    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
1410    addStat("enable_ext_metadata", enableExtMetaData, add_stat, c);
1411    addStat("force_value_compression", forceValueCompression, add_stat, c);
1412    addStat("cursor_dropping", supportsCursorDropping, add_stat, c);
1413    addStat("send_stream_end_on_client_close_stream",
1414            sendStreamEndOnClientStreamClose,
1415            add_stat,
1416            c);
1417    addStat("enable_expiry_opcode", enableExpiryOpcode, add_stat, c);
1418    addStat("enable_stream_id",
1419            multipleStreamRequests == MultipleStreamRequests::Yes,
1420            add_stat,
1421            c);
1422    addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
1423    addStat("synchronous_writes", isSyncWritesEnabled(), add_stat, c);
1424
1425    // Possible that the producer has had its streams closed and hence doesn't
1426    // have a backfill manager anymore.
1427    if (backfillMgr) {
1428        backfillMgr->addStats(*this, add_stat, c);
1429    }
1430
1431    log.addStats(add_stat, c);
1432
1433    ExTask pointerCopy;
1434    { // Locking scope
1435        LockHolder guard(checkpointCreator->mutex);
1436        pointerCopy = checkpointCreator->task;
1437    }
1438
1439    if (pointerCopy) {
1440        static_cast<ActiveStreamCheckpointProcessorTask*>(pointerCopy.get())
1441                ->addStats(getName(), add_stat, c);
1442    }
1443
1444    ready.addStats(getName() + ":dcp_ready_queue_", add_stat, c);
1445
1446    // Make a copy of all valid streams (under lock), and then call addStats
1447    // for each one. (Done in two stages to minmise how long we have the
1448    // streams map locked for).
1449    std::vector<std::shared_ptr<Stream>> valid_streams;
1450
1451    std::for_each(streams.begin(),
1452                  streams.end(),
1453                  [&valid_streams](const StreamsMap::value_type& vt) {
1454                      for (auto handle = vt.second->rlock(); !handle.end();
1455                           handle.next()) {
1456                          valid_streams.push_back(handle.get());
1457                      }
1458                  });
1459
1460    for (const auto& stream : valid_streams) {
1461        stream->addStats(add_stat, c);
1462    }
1463
1464    addStat("num_streams", valid_streams.size(), add_stat, c);
1465}
1466
1467void DcpProducer::addTakeoverStats(const AddStatFn& add_stat,
1468                                   const void* c,
1469                                   const VBucket& vb) {
1470    // Only do takeover stats on 'traditional' streams
1471    if (multipleStreamRequests == MultipleStreamRequests::Yes) {
1472        return;
1473    }
1474
1475    auto rv = streams.find(vb.getId().get());
1476
1477    if (rv != streams.end()) {
1478        auto handle = rv->second->rlock();
1479        // Only perform takeover stats on singleton streams
1480        if (handle.size() == 1) {
1481            auto stream = handle.get();
1482            ActiveStream* as = nullptr;
1483            if ((as = dynamic_cast<ActiveStream*>(stream.get()))) {
1484                as->addTakeoverStats(add_stat, c, vb);
1485                return;
1486            }
1487            logger->warn(
1488                    "({}) "
1489                    "DcpProducer::addTakeoverStats Stream type is {} and not "
1490                    "the expected Active",
1491                    vb.getId(),
1492                    stream->getStreamTypeName());
1493        } else if (handle.size() > 1) {
1494            throw std::logic_error(
1495                    "DcpProducer::addTakeoverStats unexpected size streams:(" +
1496                    std::to_string(handle.size()) + ") found " +
1497                    vb.getId().to_string());
1498        } else {
1499            // Logically, finding a StreamContainer with no stream is similar to
1500            // not finding a StreamContainer at all, both should return does_not_exist
1501            logger->info(
1502                    "({}) "
1503                    "DcpProducer::addTakeoverStats empty streams list found",
1504                    vb.getId());
1505        }
1506    } else {
1507        logger->info(
1508                "({}) "
1509                "DcpProducer::addTakeoverStats Unable to find stream",
1510                vb.getId());
1511    }
1512    // Error path - return status of does_not_exist to ensure rebalance does not
1513    // hang.
1514    add_casted_stat("status", "does_not_exist", add_stat, c);
1515    add_casted_stat("estimate", 0, add_stat, c);
1516    add_casted_stat("backfillRemaining", 0, add_stat, c);
1517}
1518
1519void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) {
1520    aggregator.conn_queueDrain += itemsSent;
1521    aggregator.conn_totalBytes += totalBytesSent;
1522    aggregator.conn_totalUncompressedDataSize += totalUncompressedDataSize;
1523    aggregator.conn_queueRemaining += getItemsRemaining();
1524}
1525
1526void DcpProducer::notifySeqnoAvailable(Vbid vbucket,
1527                                       uint64_t seqno,
1528                                       SyncWriteOperation syncWrite) {
1529    if (syncWrite == SyncWriteOperation::Yes &&
1530        getSyncReplSupport() == SyncReplication::No) {
1531        // Don't bother notifying this Producer if the operation is a prepare
1532        // and we do not support SyncWrites or SyncReplication. It wouldn't send
1533        // anything anyway and we'd run a bunch of tasks on NonIO threads, front
1534        // end worker threads and potentially AuxIO threads.
1535        return;
1536    }
1537
1538    auto rv = streams.find(vbucket.get());
1539
1540    if (rv != streams.end()) {
1541        auto handle = rv->second->rlock();
1542        for (; !handle.end(); handle.next()) {
1543            if (handle.get()->isActive()) {
1544                handle.get()->notifySeqnoAvailable(seqno);
1545            }
1546        }
1547    }
1548}
1549
1550void DcpProducer::closeStreamDueToVbStateChange(
1551        Vbid vbucket,
1552        vbucket_state_t state,
1553        boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock) {
1554    if (setStreamDeadStatus(vbucket, {}, END_STREAM_STATE, vbstateLock)) {
1555        logger->debug("({}) State changed to {}, closing active stream!",
1556                      vbucket,
1557                      VBucket::toString(state));
1558    }
1559}
1560
1561void DcpProducer::closeStreamDueToRollback(Vbid vbucket) {
1562    if (setStreamDeadStatus(vbucket, {}, END_STREAM_ROLLBACK)) {
1563        logger->debug(
1564                "({}) Rollback occurred,"
1565                "closing stream (downstream must rollback too)",
1566                vbucket);
1567    }
1568}
1569
1570bool DcpProducer::handleSlowStream(Vbid vbid, const CheckpointCursor* cursor) {
1571    if (supportsCursorDropping) {
1572        auto rv = streams.find(vbid.get());
1573        if (rv != streams.end()) {
1574            for (auto handle = rv->second->rlock(); !handle.end();
1575                 handle.next()) {
1576                if (handle.get()->getCursor().lock().get() == cursor) {
1577                    ActiveStream* as =
1578                            static_cast<ActiveStream*>(handle.get().get());
1579                    return as->handleSlowStream();
1580                }
1581            }
1582        }
1583    }
1584    return false;
1585}
1586
/**
 * Transition the stream matching (vbid, sid) to dead with the given end
 * status.
 *
 * @param vbid vBucket whose StreamContainer is searched
 * @param sid stream-ID to match within the container ({} matches the
 *        traditional single stream)
 * @param status reason passed to Stream::setDead
 * @param vbstateLock optional exclusive vbstate lock already held by the
 *        caller (see MB-36637 note below)
 * @return true if a StreamContainer exists for vbid (even if no stream in
 *         it matched sid), false if the vbid is not mapped at all
 */
bool DcpProducer::setStreamDeadStatus(
        Vbid vbid,
        cb::mcbp::DcpStreamId sid,
        end_stream_status_t status,
        boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock) {
    auto rv = streams.find(vbid.get());
    if (rv != streams.end()) {
        std::shared_ptr<Stream> streamPtr;
        // MB-35073: holding StreamContainer rlock while calling setDead
        // has been seen to cause lock inversion elsewhere.
        // Collect sharedptr then setDead once lock is released (itr out of
        // scope).
        for (auto itr = rv->second->rlock(); !itr.end(); itr.next()) {
            if (itr.get()->compareStreamId(sid)) {
                streamPtr = itr.get();
                break;
            }
        }

        // MB-36637: At KVBucket::setVBucketState we acquire an exclusive lock
        // to vbstate and pass it down here. If that is the case, then we have
        // to avoid the call to ActiveStream::setDead(status) as it may deadlock
        // by acquiring the same lock again.
        auto* activeStream = dynamic_cast<ActiveStream*>(streamPtr.get());
        if (activeStream && vbstateLock) {
            activeStream->setDead(status, *vbstateLock);
        } else if (streamPtr) {
            streamPtr->setDead(status);
        }

        return true;
    }

    return false;
}
1622
1623void DcpProducer::closeAllStreams() {
1624    if (closeAllStreamsPreLockHook) {
1625        closeAllStreamsPreLockHook();
1626    }
1627
1628    std::lock_guard<std::mutex> lg(closeAllStreamsLock);
1629
1630    if (closeAllStreamsPostLockHook) {
1631        closeAllStreamsPostLockHook();
1632    }
1633
1634    lastReceiveTime = ep_current_time();
1635    std::vector<Vbid> vbvector;
1636    {
1637        std::for_each(streams.begin(),
1638                      streams.end(),
1639                      [this, &vbvector](StreamsMap::value_type& vt) {
1640                          vbvector.push_back((Vbid)vt.first);
1641                          std::vector<std::shared_ptr<Stream>> streamPtrs;
1642                          // MB-35073: holding StreamContainer lock while
1643                          // calling setDead leads to lock inversion - so
1644                          // collect sharedptrs in one pass then setDead once
1645                          // lock is released (itr out of scope).
1646                          {
1647                              auto handle = vt.second->wlock();
1648                              for (; !handle.end(); handle.next()) {
1649                                  streamPtrs.push_back(handle.get());
1650                              }
1651                              handle.clear();
1652                          }
1653
1654                          for (auto streamPtr : streamPtrs) {
1655                              streamPtr->setDead(END_STREAM_DISCONNECTED);
1656                          }
1657                      });
1658    }
1659    for (const auto vbid: vbvector) {
1660        engine_.getDcpConnMap().removeVBConnByVBId(getCookie(), vbid);
1661    }
1662
1663    if (closeAllStreamsHook) {
1664        closeAllStreamsHook();
1665    }
1666
1667    // Destroy the backfillManager. (BackfillManager task also
1668    // may hold a weak reference to it while running, but that is
1669    // guaranteed to decay and free the BackfillManager once it
1670    // completes run().
1671    // This will terminate any tasks and delete any backfills
1672    // associated with this Producer.  This is necessary as if we
1673    // don't, then the RCPtr references which exist between
1674    // DcpProducer and ActiveStream result in us leaking DcpProducer
1675    // objects (and Couchstore vBucket files, via DCPBackfill task).
1676    backfillMgr.reset();
1677}
1678
1679const char* DcpProducer::getType() const {
1680    if (notifyOnly) {
1681        return "notifier";
1682    } else {
1683        return "producer";
1684    }
1685}
1686
1687std::unique_ptr<DcpResponse> DcpProducer::getNextItem() {
1688    do {
1689        unPause();
1690
1691        Vbid vbucket = Vbid(0);
1692        while (ready.popFront(vbucket)) {
1693            if (log.pauseIfFull()) {
1694                ready.pushUnique(vbucket);
1695                return NULL;
1696            }
1697
1698            auto rv = streams.find(vbucket.get());
1699            if (rv == streams.end()) {
1700                // The vbucket is not in the map.
1701                continue;
1702            }
1703
1704            std::unique_ptr<DcpResponse> response;
1705
1706            // Use the resumable handle so that we can service the streams that
1707            // are associated with the vbucket. If a stream returns a response
1708            // we will ship it (i.e. leave this scope). Then next time we visit
1709            // the VB, we should /resume/ from the next stream in the container.
1710            for (auto resumableIterator = rv->second->startResumable();
1711                 !resumableIterator.complete();
1712                 resumableIterator.next()) {
1713                const std::shared_ptr<Stream>& stream = resumableIterator.get();
1714
1715                if (stream) {
1716                    response = stream->next();
1717
1718                    if (response) {
1719                        // VB gave us something, validate it
1720                        switch (response->getEvent()) {
1721                        case DcpResponse::Event::SnapshotMarker:
1722                        case DcpResponse::Event::Mutation:
1723                        case DcpResponse::Event::Deletion:
1724                        case DcpResponse::Event::Expiration:
1725                        case DcpResponse::Event::Prepare:
1726                        case DcpResponse::Event::Commit:
1727                        case DcpResponse::Event::Abort:
1728                        case DcpResponse::Event::StreamEnd:
1729                        case DcpResponse::Event::SetVbucket:
1730                        case DcpResponse::Event::SystemEvent:
1731                            break;
1732                        default:
1733                            throw std::logic_error(
1734                                    std::string("DcpProducer::getNextItem: "
1735                                                "Producer (") +
1736                                    logHeader() +
1737                                    ") is attempting to "
1738                                    "write an unexpected event:" +
1739                                    response->to_string());
1740                        }
1741
1742                        ready.pushUnique(vbucket);
1743                        return response;
1744                    } // else next stream for vb
1745                }
1746            }
1747        }
1748
1749        // flag we are paused
1750        pause(PausedReason::ReadyListEmpty);
1751
1752        // re-check the ready queue.
1753        // A new vbucket could of became ready and the notifier could of seen
1754        // paused = false, so reloop so we don't miss an operation.
1755    } while(!ready.empty());
1756
1757    return nullptr;
1758}
1759
1760void DcpProducer::setDisconnect() {
1761    ConnHandler::setDisconnect();
1762    std::for_each(
1763            streams.begin(), streams.end(), [](StreamsMap::value_type& vt) {
1764                std::vector<std::shared_ptr<Stream>> streamPtrs;
1765                // MB-35049: hold StreamContainer rlock while calling setDead
1766                // leads to lock inversion - so collect sharedptrs in one pass
1767                // then setDead once it is released (itr out of scope).
1768                for (auto itr = vt.second->rlock(); !itr.end(); itr.next()) {
1769                    streamPtrs.push_back(itr.get());
1770                }
1771                for (auto stream : streamPtrs) {
1772                    stream->setDead(END_STREAM_DISCONNECTED);
1773                }
1774            });
1775}
1776
// Record that the given vBucket has data ready to send; if it was newly
// added to the ready queue, unpause the producer (subject to BufferLog
// space).
void DcpProducer::notifyStreamReady(Vbid vbucket) {
    if (ready.pushUnique(vbucket)) {
        // Transitioned from empty to non-empty readyQ - unpause the Producer.
        log.unpauseIfSpaceAvailable();
    }
}
1783
// Immediately notify the ConnMap that this (paused) connection has work,
// rather than deferring via the pending-notification list.
void DcpProducer::immediatelyNotify() {
    engine_.getDcpConnMap().notifyPausedConnection(shared_from_this());
}
1787
// Defer notification: add this connection to the ConnMap's pending list so
// it is notified later (contrast with immediatelyNotify).
void DcpProducer::scheduleNotify() {
    engine_.getDcpConnMap().addConnectionToPending(shared_from_this());
}
1791
1792ENGINE_ERROR_CODE DcpProducer::maybeDisconnect() {
1793    const auto now = ep_current_time();
1794    auto elapsedTime = now - lastReceiveTime;
1795    auto dcpIdleTimeout = getIdleTimeout();
1796    if (noopCtx.enabled && std::chrono::seconds(elapsedTime) > dcpIdleTimeout) {
1797        logger->info(
1798                "Disconnecting because a message has not been received for "
1799                "the DCP idle timeout of {}s. "
1800                "Sent last message (e.g. mutation/noop/streamEnd) {}s ago. "
1801                "Received last message {}s ago. "
1802                "Last sent noop {}s ago. "
1803                "DCP noop interval is {}s. "
1804                "opaque: {}, pendingRecv: {}.",
1805                dcpIdleTimeout.count(),
1806                (now - lastSendTime),
1807                elapsedTime,
1808                (now - noopCtx.sendTime),
1809                noopCtx.dcpNoopTxInterval.count(),
1810                noopCtx.opaque,
1811                noopCtx.pendingRecv ? "true" : "false");
1812        return ENGINE_DISCONNECT;
1813    }
1814    // Returning ENGINE_FAILED means ignore and continue
1815    // without disconnecting
1816    return ENGINE_FAILED;
1817}
1818
1819ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(
1820        struct dcp_message_producers* producers) {
1821    if (!noopCtx.enabled) {
1822        // Returning ENGINE_FAILED means ignore and continue
1823        // without sending a noop
1824        return ENGINE_FAILED;
1825    }
1826    std::chrono::seconds elapsedTime(ep_current_time() - noopCtx.sendTime);
1827
1828    // Check to see if waiting for a noop reply.
1829    // If not try to send a noop to the consumer if the interval has passed
1830    if (!noopCtx.pendingRecv && elapsedTime >= noopCtx.dcpNoopTxInterval) {
1831        const auto ret = producers->noop(++noopCtx.opaque);
1832
1833        if (ret == ENGINE_SUCCESS) {
1834            noopCtx.pendingRecv = true;
1835            noopCtx.sendTime = ep_current_time();
1836            lastSendTime = noopCtx.sendTime;
1837        }
1838        return ret;
1839    }
1840
1841    // We have already sent a noop and are awaiting a receive or
1842    // the time interval has not passed.  In either case continue
1843    // without sending a noop.
1844    return ENGINE_FAILED;
1845}
1846
1847void DcpProducer::clearQueues() {
1848    std::for_each(
1849            streams.begin(), streams.end(), [](StreamsMap::value_type& vt) {
1850                for (auto itr = vt.second->rlock(); !itr.end(); itr.next()) {
1851                    itr.get()->clear();
1852                }
1853            });
1854}
1855
// Current value of the itemsSent counter for this producer.
size_t DcpProducer::getItemsSent() {
    return itemsSent;
}
1859
1860size_t DcpProducer::getItemsRemaining() {
1861    size_t remainingSize = 0;
1862    std::for_each(
1863            streams.begin(),
1864            streams.end(),
1865            [&remainingSize](const StreamsMap::value_type& vt) {
1866                for (auto itr = vt.second->rlock(); !itr.end(); itr.next()) {
1867                    auto* as = dynamic_cast<ActiveStream*>(itr.get().get());
1868                    if (as) {
1869                        remainingSize += as->getItemsRemaining();
1870                    }
1871                }
1872            });
1873
1874    return remainingSize;
1875}
1876
// Current value of the totalBytesSent counter for this producer.
size_t DcpProducer::getTotalBytesSent() {
    return totalBytesSent;
}
1880
// Current value of the totalUncompressedDataSize counter (bytes before any
// compression) for this producer.
size_t DcpProducer::getTotalUncompressedDataSize() {
    return totalUncompressedDataSize;
}
1884
1885std::vector<Vbid> DcpProducer::getVBVector() {
1886    std::vector<Vbid> vbvector;
1887    std::for_each(streams.begin(),
1888                  streams.end(),
1889                  [&vbvector](StreamsMap::value_type& iter) {
1890                      vbvector.push_back((Vbid)iter.first);
1891                  });
1892    return vbvector;
1893}
1894
// Account 'bytes' against the flow-control BufferLog; returns the result
// of BufferLog::insert (presumably false when the log cannot accept the
// bytes - see BufferLog for the contract).
bool DcpProducer::bufferLogInsert(size_t bytes) {
    return log.insert(bytes);
}
1898
// Create (under checkpointCreator->mutex) the shared
// ActiveStreamCheckpointProcessorTask used by this producer's streams.
void DcpProducer::createCheckpointProcessorTask() {
    LockHolder guard(checkpointCreator->mutex);
    checkpointCreator->task =
            std::make_shared<ActiveStreamCheckpointProcessorTask>(
                    engine_, shared_from_this());
}
1905
// Schedule the previously-created checkpoint processor task on the
// ExecutorPool (task creation is done by createCheckpointProcessorTask).
void DcpProducer::scheduleCheckpointProcessorTask() {
    LockHolder guard(checkpointCreator->mutex);
    ExecutorPool::get()->schedule(checkpointCreator->task);
}
1910
1911void DcpProducer::scheduleCheckpointProcessorTask(
1912        std::shared_ptr<ActiveStream> s) {
1913    LockHolder guard(checkpointCreator->mutex);
1914    if (!checkpointCreator->task) {
1915        throw std::logic_error(
1916                "DcpProducer::scheduleCheckpointProcessorTask task is null");
1917    }
1918    static_cast<ActiveStreamCheckpointProcessorTask*>(
1919            checkpointCreator->task.get())
1920            ->schedule(s);
1921}
1922
1923std::shared_ptr<StreamContainer<std::shared_ptr<Stream>>>
1924DcpProducer::findStreams(Vbid vbid) {
1925    auto it = streams.find(vbid.get());
1926    if (it != streams.end()) {
1927        return it->second;
1928    }
1929    return nullptr;
1930}
1931
/**
 * Insert the given stream into the streams map for vbid.
 *
 * If vbid already maps to a StreamContainer, the stream is pushed onto the
 * front of that container (holding its write-lock); otherwise a new
 * container is created for it. Throws std::logic_error if a stream with
 * the same stream-ID already exists (callers are expected to have checked
 * this during streamRequest pre-flight), or if vbid maps to a null
 * container.
 *
 * @param vbid vBucket the stream belongs to
 * @param sid stream-ID of the new stream
 * @param stream the stream to insert
 */
void DcpProducer::updateStreamsMap(Vbid vbid,
                                   cb::mcbp::DcpStreamId sid,
                                   std::shared_ptr<Stream>& stream) {
    auto found = streams.find(vbid.get());

    if (found != streams.end()) {
        // vbid is already mapped found.first is a shared_ptr<StreamContainer>
        if (found->second) {
            auto handle = found->second->wlock();
            for (; !handle.end(); handle.next()) {
                auto& sp = handle.get(); // get the shared_ptr<Stream>
                if (sp->compareStreamId(sid)) {
                    // Error if found - given we just checked this
                    // in the pre-flight checks for streamRequest.
                    auto msg = fmt::format(
                            "({}) Stream ({}) request failed"
                            " because a stream unexpectedly exists in "
                            "StreamContainer for this vbucket",
                            vbid,
                            sid.to_string());
                    logger->warn(msg);
                    throw std::logic_error("DcpProducer::updateStreamsMap " +
                                           msg);
                }
            }

            /*
             * Add the Stream to the StreamContainer if we allow multiple
             * streams or if there are no other streams currently in the
             * container for this vb. We're under a writelock so we won't race
             * and accidentally create multiple streams if we don't support it.
             */
            if (multipleStreamRequests == MultipleStreamRequests::Yes ||
                handle.empty()) {
                // If we're here the vbid is mapped so we must update the
                // existing container
                handle.push_front(stream);
            } else {
                throw std::logic_error(
                        "DcpProducer::updateStreamsMap invalid state to add "
                        "multiple streams");
            }
        } else {
            throw std::logic_error("DcpProducer::updateStreamsMap " +
                                   vbid.to_string() + " is mapped to null");
        }
    } else {
        // vbid is not mapped
        streams.insert(std::make_pair(
                vbid.get(),
                std::make_shared<StreamContainer<ContainerElement>>(stream)));
    }
}
1985
1986end_stream_status_t DcpProducer::mapEndStreamStatus(
1987        const void* cookie, end_stream_status_t status) const {
1988    if (status == END_STREAM_FILTER_EMPTY &&
1989        !engine_.isCollectionsSupported(cookie)) {
1990        return END_STREAM_OK;
1991    }
1992    return status;
1993}
1994
// Returns the name the consumer identified itself with (may be empty if
// none was supplied).
std::string DcpProducer::getConsumerName() const {
    return consumerName;
}
1998