xref: /5.5.2/kv_engine/engines/ep/src/dcp/stream.cc (revision 9993d01a)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "checkpoint.h"
21#include "collections/vbucket_filter.h"
22#include "dcp/backfill-manager.h"
23#include "dcp/backfill.h"
24#include "dcp/consumer.h"
25#include "dcp/producer.h"
26#include "dcp/response.h"
27#include "dcp/stream.h"
28#include "ep_engine.h"
29#include "ep_time.h"
30#include "executorpool.h"
31#include "failover-table.h"
32#include "kv_bucket.h"
33#include "kvstore.h"
34#include "replicationthrottle.h"
35#include "statwriter.h"
36
37#include <platform/checked_snprintf.h>
38
39#include <memory>
40
41
42const char* to_string(Stream::Snapshot type) {
43    switch (type) {
44    case Stream::Snapshot::None:
45        return "none";
46    case Stream::Snapshot::Disk:
47        return "disk";
48    case Stream::Snapshot::Memory:
49        return "memory";
50    }
51    throw std::logic_error("to_string(Stream::Snapshot): called with invalid "
52            "Snapshot type:" + std::to_string(int(type)));
53}
54
55const std::string to_string(Stream::Type type) {
56    switch (type) {
57    case Stream::Type::Active:
58        return "Active";
59    case Stream::Type::Notifier:
60        return "Notifier";
61    case Stream::Type::Passive:
62        return "Passive";
63    }
64    throw std::logic_error("to_string(Stream::Type): called with invalid "
65            "type:" + std::to_string(int(type)));
66}
67
68const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
69
70Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
71               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
72               uint64_t vb_uuid, uint64_t snap_start_seqno,
73               uint64_t snap_end_seqno, Type type)
74    : name_(name),
75      flags_(flags),
76      opaque_(opaque),
77      vb_(vb),
78      start_seqno_(start_seqno),
79      end_seqno_(end_seqno),
80      vb_uuid_(vb_uuid),
81      snap_start_seqno_(snap_start_seqno),
82      snap_end_seqno_(snap_end_seqno),
83      state_(StreamState::Pending),
84      type_(type),
85      itemsReady(false),
86      readyQ_non_meta_items(0),
87      readyQueueMemory(0) {
88}
89
90Stream::~Stream() {
91    // NB: reusing the "unlocked" method without a lock because we're
92    // destructing and should not take any locks.
93    clear_UNLOCKED();
94}
95
96const std::string Stream::to_string(Stream::StreamState st) {
97    switch(st) {
98    case StreamState::Pending:
99        return "pending";
100    case StreamState::Backfilling:
101        return "backfilling";
102    case StreamState::InMemory:
103        return "in-memory";
104    case StreamState::TakeoverSend:
105        return "takeover-send";
106    case StreamState::TakeoverWait:
107        return "takeover-wait";
108    case StreamState::Reading:
109        return "reading";
110    case StreamState::Dead:
111        return "dead";
112    }
113    throw std::invalid_argument(
114        "Stream::to_string(StreamState): " + std::to_string(int(st)));
115}
116
117bool Stream::isTypeActive() const {
118    return type_ == Type::Active;
119}
120
121bool Stream::isActive() const {
122    return state_.load() != StreamState::Dead;
123}
124
125bool Stream::isBackfilling() const {
126    return state_.load() == StreamState::Backfilling;
127}
128
129bool Stream::isInMemory() const {
130    return state_.load() == StreamState::InMemory;
131}
132
133bool Stream::isPending() const {
134    return state_.load() == StreamState::Pending;
135}
136
137bool Stream::isTakeoverSend() const {
138    return state_.load() == StreamState::TakeoverSend;
139}
140
141bool Stream::isTakeoverWait() const {
142    return state_.load() == StreamState::TakeoverWait;
143}
144
145void Stream::clear_UNLOCKED() {
146    while (!readyQ.empty()) {
147        popFromReadyQ();
148    }
149}
150
151void Stream::pushToReadyQ(std::unique_ptr<DcpResponse> resp) {
152    /* expect streamMutex.ownsLock() == true */
153    if (resp) {
154        if (!resp->isMetaEvent()) {
155            readyQ_non_meta_items++;
156        }
157        readyQueueMemory.fetch_add(resp->getMessageSize(),
158                                   std::memory_order_relaxed);
159        readyQ.push(std::move(resp));
160    }
161}
162
163std::unique_ptr<DcpResponse> Stream::popFromReadyQ(void) {
164    /* expect streamMutex.ownsLock() == true */
165    if (!readyQ.empty()) {
166        auto front = std::move(readyQ.front());
167        readyQ.pop();
168
169        if (!front->isMetaEvent()) {
170            readyQ_non_meta_items--;
171        }
172        const uint32_t respSize = front->getMessageSize();
173
174        /* Decrement the readyQ size */
175        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
176            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
177        } else {
178            LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
179                "underflow, likely wrong stat calculation! curr size: %" PRIu64
180                "; new size: %d",
181                name_.c_str(), getVBucket(),
182                readyQueueMemory.load(std::memory_order_relaxed), respSize);
183            readyQueueMemory.store(0, std::memory_order_relaxed);
184        }
185
186        return front;
187    }
188
189    return nullptr;
190}
191
192uint64_t Stream::getReadyQueueMemory() {
193    return readyQueueMemory.load(std::memory_order_relaxed);
194}
195
196void Stream::addStats(ADD_STAT add_stat, const void *c) {
197    try {
198        const int bsize = 1024;
199        char buffer[bsize];
200        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
201                         vb_);
202        add_casted_stat(buffer, flags_, add_stat, c);
203        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
204                         vb_);
205        add_casted_stat(buffer, opaque_, add_stat, c);
206        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
207                         name_.c_str(), vb_);
208        add_casted_stat(buffer, start_seqno_, add_stat, c);
209        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
210                         vb_);
211        add_casted_stat(buffer, end_seqno_, add_stat, c);
212        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
213                         vb_);
214        add_casted_stat(buffer, vb_uuid_, add_stat, c);
215        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
216                         name_.c_str(), vb_);
217        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
218        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
219                         name_.c_str(), vb_);
220        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
221        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
222                         vb_);
223        add_casted_stat(buffer, to_string(state_.load()), add_stat, c);
224    } catch (std::exception& error) {
225        LOG(EXTENSION_LOG_WARNING,
226            "Stream::addStats: Failed to build stats: %s", error.what());
227    }
228}
229
230ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
231                           std::shared_ptr<DcpProducer> p,
232                           const std::string& n,
233                           uint32_t flags,
234                           uint32_t opaque,
235                           VBucket& vbucket,
236                           uint64_t st_seqno,
237                           uint64_t en_seqno,
238                           uint64_t vb_uuid,
239                           uint64_t snap_start_seqno,
240                           uint64_t snap_end_seqno,
241                           IncludeValue includeVal,
242                           IncludeXattrs includeXattrs,
243                           IncludeDeleteTime includeDeleteTime,
244                           const Collections::Filter& filter,
245                           const Collections::VB::Manifest& manifest)
246    : Stream(n,
247             flags,
248             opaque,
249             vbucket.getId(),
250             st_seqno,
251             en_seqno,
252             vb_uuid,
253             snap_start_seqno,
254             snap_end_seqno,
255             Type::Active),
256      isBackfillTaskRunning(false),
257      pendingBackfill(false),
258      lastReadSeqno(st_seqno),
259      backfillRemaining(0),
260      lastReadSeqnoUnSnapshotted(st_seqno),
261      lastSentSeqno(st_seqno),
262      curChkSeqno(st_seqno),
263      takeoverState(vbucket_state_pending),
264      itemsFromMemoryPhase(0),
265      firstMarkerSent(false),
266      waitForSnapshot(0),
267      engine(e),
268      producerPtr(p),
269      lastSentSnapEndSeqno(0),
270      chkptItemsExtractionInProgress(false),
271      includeValue(includeVal),
272      includeXattributes(includeXattrs),
273      includeDeleteTime(includeDeleteTime),
274      snappyEnabled(p->isSnappyEnabled() ? SnappyEnabled::Yes
275                                         : SnappyEnabled::No),
276      forceValueCompression(p->isForceValueCompressionEnabled()
277                                    ? ForceValueCompression::Yes
278                                    : ForceValueCompression::No),
279      filter(filter, manifest),
280      cursorName(n + '/' + std::to_string(cursorUID.fetch_add(1))) {
281    const char* type = "";
282    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
283        type = "takeover ";
284        end_seqno_ = dcpMaxSeqno;
285    }
286
287    ReaderLockHolder rlh(vbucket.getStateLock());
288    if (vbucket.getState() == vbucket_state_replica) {
289        snapshot_info_t info = vbucket.checkpointManager->getSnapshotInfo();
290        if (info.range.end > en_seqno) {
291            end_seqno_ = info.range.end;
292        }
293    }
294
295    p->getLogger().log(
296            EXTENSION_LOG_NOTICE,
297            "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
298            " and end seqno %" PRIu64 "; requested end seqno was %" PRIu64,
299            vbucket.getId(),
300            type,
301            st_seqno,
302            end_seqno_,
303            en_seqno);
304
305    backfillItems.memory = 0;
306    backfillItems.disk = 0;
307    backfillItems.sent = 0;
308
309    bufferedBackfill.bytes = 0;
310    bufferedBackfill.items = 0;
311
312    takeoverStart = 0;
313    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
314
315    if (start_seqno_ >= end_seqno_) {
316        /* streamMutex lock needs to be acquired because endStream
317         * potentially makes call to pushToReadyQueue.
318         */
319        LockHolder lh(streamMutex);
320        endStream(END_STREAM_OK);
321        itemsReady.store(true);
322        // lock is released on leaving the scope
323    }
324
325    // Finally obtain a copy of the current separator
326    currentSeparator = vbucket.getManifest().lock().getSeparator();
327}
328
329ActiveStream::~ActiveStream() {
330    if (state_ != StreamState::Dead) {
331        removeCheckpointCursor();
332    }
333}
334
335std::unique_ptr<DcpResponse> ActiveStream::next() {
336    std::lock_guard<std::mutex> lh(streamMutex);
337    return next(lh);
338}
339
340std::unique_ptr<DcpResponse> ActiveStream::next(
341        std::lock_guard<std::mutex>& lh) {
342    std::unique_ptr<DcpResponse> response;
343
344    switch (state_.load()) {
345        case StreamState::Pending:
346            break;
347        case StreamState::Backfilling:
348            response = backfillPhase(lh);
349            break;
350        case StreamState::InMemory:
351            response = inMemoryPhase();
352            break;
353        case StreamState::TakeoverSend:
354            response = takeoverSendPhase();
355            break;
356        case StreamState::TakeoverWait:
357            response = takeoverWaitPhase();
358            break;
359        case StreamState::Reading:
360            // Not valid for an active stream.
361            {
362                auto producer = producerPtr.lock();
363                std::string connHeader =
364                        producer ? producer->logHeader()
365                                 : "DCP (Producer): **Deleted conn**";
366                throw std::logic_error(
367                        "ActiveStream::next: Invalid state "
368                        "StreamReading for stream " +
369                        connHeader + " vb " + std::to_string(vb_));
370            }
371            break;
372        case StreamState::Dead:
373            response = deadPhase();
374            break;
375    }
376
377    itemsReady.store(response ? true : false);
378    return response;
379}
380
381void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
382                                  uint64_t lastProcessedSeqno) {
383    try {
384        CursorRegResult result = chkptmgr.registerCursorBySeqno(
385                cursorName, lastProcessedSeqno, MustSendCheckpointEnd::NO);
386        cursorRegistered = true;
387
388        /*
389         * MB-22960:  Due to cursor dropping we re-register the replication
390         * cursor only during backfill when we mark the disk snapshot.  However
391         * by this point it is possible that the CheckpointManager no longer
392         * contains the next sequence number the replication stream requires
393         * (i.e. next one after the backfill seqnos).
394         *
395         * To avoid this data loss when we register the cursor we check to see
396         * if the result is greater than the lastProcessedSeqno + 1.
397         * If so we know we may have missed some items and may need to perform
398         * another backfill.
399         *
400         * We actually only need to do another backfill if the result is greater
401         * than the lastProcessedSeqno + 1 and registerCursorBySeqno returns
402         * true, indicating that the resulting seqno starts with the first item
403         * on a checkpoint.
404         */
405        const uint64_t nextRequiredSeqno = lastProcessedSeqno + 1;
406        if (result.first > nextRequiredSeqno && result.second) {
407            pendingBackfill = true;
408        }
409        curChkSeqno = result.first;
410    } catch(std::exception& error) {
411        log(EXTENSION_LOG_WARNING,
412            "(vb %" PRIu16 ") Failed to register cursor: %s",
413            vb_,
414            error.what());
415        endStream(END_STREAM_STATE);
416    }
417}
418
419void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
420    {
421        LockHolder lh(streamMutex);
422        uint64_t chkCursorSeqno = endSeqno;
423
424        if (!isBackfilling()) {
425            log(EXTENSION_LOG_WARNING,
426                "(vb %" PRIu16
427                ") ActiveStream::"
428                "markDiskSnapshot: Unexpected state_:%s",
429                vb_,
430                to_string(state_.load()).c_str());
431            return;
432        }
433
434        /* We need to send the requested 'snap_start_seqno_' as the snapshot
435           start when we are sending the first snapshot because the first
436           snapshot could be resumption of a previous snapshot */
437        if (!firstMarkerSent) {
438            startSeqno = std::min(snap_start_seqno_, startSeqno);
439            firstMarkerSent = true;
440        }
441
442        VBucketPtr vb = engine->getVBucket(vb_);
443        if (!vb) {
444            log(EXTENSION_LOG_WARNING,
445                "(vb %" PRIu16
446                ") "
447                "ActiveStream::markDiskSnapshot, vbucket "
448                "does not exist",
449                vb_);
450            return;
451        }
452        // An atomic read of vbucket state without acquiring the
453        // reader lock for state should suffice here.
454        if (vb->getState() == vbucket_state_replica) {
455            if (end_seqno_ > endSeqno) {
456                /* We possibly have items in the open checkpoint
457                   (incomplete snapshot) */
458                snapshot_info_t info = vb->checkpointManager->getSnapshotInfo();
459                log(EXTENSION_LOG_NOTICE,
460                    "(vb %" PRIu16
461                    ") Merging backfill and memory snapshot for a "
462                    "replica vbucket, backfill start seqno %" PRIu64
463                    ", "
464                    "backfill end seqno %" PRIu64
465                    ", "
466                    "snapshot end seqno after merge %" PRIu64,
467                    vb_,
468                    startSeqno,
469                    endSeqno,
470                    info.range.end);
471                endSeqno = info.range.end;
472            }
473        }
474
475        log(EXTENSION_LOG_NOTICE,
476            "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
477            " and end seqno %" PRIu64,
478            vb_,
479            startSeqno,
480            endSeqno);
481        pushToReadyQ(std::make_unique<SnapshotMarker>(
482                opaque_, vb_, startSeqno, endSeqno, MARKER_FLAG_DISK));
483        lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
484
485        if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
486            // Only re-register the cursor if we still need to get memory
487            // snapshots
488            registerCursor(*vb->checkpointManager, chkCursorSeqno);
489        }
490    }
491    notifyStreamReady();
492}
493
494bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
495                                    backfill_source_t backfill_source,
496                                    bool force) {
497    if (!itm) {
498        return false;
499    }
500
501    if (itm->shouldReplicate()) {
502        std::unique_lock<std::mutex> lh(streamMutex);
503        if (isBackfilling() && filter.checkAndUpdate(*itm)) {
504            queued_item qi(std::move(itm));
505            std::unique_ptr<DcpResponse> resp(makeResponseFromItem(qi));
506            auto producer = producerPtr.lock();
507            if (!producer ||
508                !producer->recordBackfillManagerBytesRead(
509                        resp->getApproximateSize(), force)) {
510                // Deleting resp may also delete itm (which is owned by
511                // resp)
512                resp.reset();
513                return false;
514            }
515
516            bufferedBackfill.bytes.fetch_add(resp->getApproximateSize());
517            bufferedBackfill.items++;
518            lastReadSeqno.store(uint64_t(*resp->getBySeqno()));
519
520            pushToReadyQ(std::move(resp));
521
522            lh.unlock();
523            notifyStreamReady();
524
525            if (backfill_source == BACKFILL_FROM_MEMORY) {
526                backfillItems.memory++;
527            } else {
528                backfillItems.disk++;
529            }
530        }
531    }
532
533    return true;
534}
535
536void ActiveStream::completeBackfill() {
537    {
538        LockHolder lh(streamMutex);
539        if (isBackfilling()) {
540            log(EXTENSION_LOG_NOTICE,
541                "(vb %" PRIu16 ") Backfill complete, %" PRIu64
542                " items "
543                "read from disk, %" PRIu64
544                " from memory, last seqno read: "
545                "%" PRIu64 ", pendingBackfill : %s",
546                vb_,
547                uint64_t(backfillItems.disk.load()),
548                uint64_t(backfillItems.memory.load()),
549                lastReadSeqno.load(),
550                pendingBackfill ? "True" : "False");
551        } else {
552            log(EXTENSION_LOG_WARNING,
553                "(vb %" PRIu16
554                ") ActiveStream::completeBackfill: "
555                "Unexpected state_:%s",
556                vb_,
557                to_string(state_.load()).c_str());
558        }
559    }
560
561    bool inverse = true;
562    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
563    notifyStreamReady();
564}
565
566void ActiveStream::snapshotMarkerAckReceived() {
567    if (--waitForSnapshot == 0) {
568        notifyStreamReady();
569    }
570}
571
572void ActiveStream::setVBucketStateAckRecieved() {
573    VBucketPtr vbucket = engine->getVBucket(vb_);
574    if (!vbucket) {
575        log(EXTENSION_LOG_WARNING,
576            "(vb %" PRIu16
577            ") not present during ack for set "
578            "vbucket during takeover",
579            vb_);
580        return;
581    }
582
583    {
584        /* Order in which the below 3 locks are acquired is important to avoid
585           any potential lock inversion problems */
586        std::unique_lock<std::mutex> epVbSetLh(engine->getKVBucket()->getVbSetMutexLock());
587        WriterLockHolder vbStateLh(vbucket->getStateLock());
588        std::unique_lock<std::mutex> lh(streamMutex);
589        if (isTakeoverWait()) {
590            if (takeoverState == vbucket_state_pending) {
591                log(EXTENSION_LOG_INFO,
592                    "(vb %" PRIu16
593                    ") Receive ack for set vbucket state to "
594                    "pending message",
595                    vb_);
596
597                takeoverState = vbucket_state_active;
598                transitionState(StreamState::TakeoverSend);
599
600                engine->getKVBucket()->setVBucketState_UNLOCKED(
601                                                        vb_,
602                                                        vbucket_state_dead,
603                                                        false /* transfer */,
604                                                        false /* notify_dcp */,
605                                                        epVbSetLh,
606                                                        &vbStateLh);
607
608                log(EXTENSION_LOG_NOTICE,
609                    "(vb %" PRIu16
610                    ") Vbucket marked as dead, last sent "
611                    "seqno: %" PRIu64 ", high seqno: %" PRIu64,
612                    vb_,
613                    lastSentSeqno.load(),
614                    vbucket->getHighSeqno());
615            } else {
616                log(EXTENSION_LOG_NOTICE,
617                    "(vb %" PRIu16
618                    ") Receive ack for set vbucket state to "
619                    "active message",
620                    vb_);
621                endStream(END_STREAM_OK);
622            }
623        } else {
624            log(EXTENSION_LOG_WARNING,
625                "(vb %" PRIu16
626                ") Unexpected ack for set vbucket op on "
627                "stream '%s' state '%s'",
628                vb_,
629                name_.c_str(),
630                to_string(state_.load()).c_str());
631            return;
632        }
633    }
634
635    notifyStreamReady();
636}
637
638std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
639        std::lock_guard<std::mutex>& lh) {
640    auto resp = nextQueuedItem();
641
642    if (resp) {
643        /* It is ok to have recordBackfillManagerBytesSent() and
644           bufferedBackfill.bytes.fetch_sub() for all events because
645           resp->getApproximateSize() is non zero for only certain resp types.
646           (MB-24905 is open to make the accounting cleaner) */
647        auto producer = producerPtr.lock();
648        if (!producer) {
649            throw std::logic_error("(vb:" + std::to_string(vb_) +
650                                   " )Producer reference null in the stream in "
651                                   "backfillPhase(). This should not happen as "
652                                   "the function is called from the producer "
653                                   "object");
654        }
655
656        producer->recordBackfillManagerBytesSent(resp->getApproximateSize());
657        bufferedBackfill.bytes.fetch_sub(resp->getApproximateSize());
658        if (!resp->isMetaEvent() || resp->isSystemEvent()) {
659            bufferedBackfill.items--;
660        }
661
662        // Only DcpResponse objects representing items from "disk" have a size
663        // so only update backfillRemaining when non-zero
664        if (resp->getApproximateSize()) {
665            if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
666                backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
667            }
668        }
669    }
670
671    if (!isBackfillTaskRunning && readyQ.empty()) {
672        // Given readyQ.empty() is True resp will be NULL
673        backfillRemaining.store(0, std::memory_order_relaxed);
674        // The previous backfill has completed.  Check to see if another
675        // backfill needs to be scheduled.
676        if (pendingBackfill) {
677            scheduleBackfill_UNLOCKED(true);
678            pendingBackfill = false;
679        } else {
680            if (lastReadSeqno.load() >= end_seqno_) {
681                endStream(END_STREAM_OK);
682            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
683                transitionState(StreamState::TakeoverSend);
684            } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
685                endStream(END_STREAM_OK);
686            } else {
687                transitionState(StreamState::InMemory);
688            }
689
690            if (!resp) {
691                resp = nextQueuedItem();
692            }
693        }
694    }
695
696    return resp;
697}
698
699std::unique_ptr<DcpResponse> ActiveStream::inMemoryPhase() {
700    if (lastSentSeqno.load() >= end_seqno_) {
701        endStream(END_STREAM_OK);
702    } else if (readyQ.empty()) {
703        if (pendingBackfill) {
704            // Moving the state from InMemory to Backfilling will result in a
705            // backfill being scheduled
706            transitionState(StreamState::Backfilling);
707            pendingBackfill = false;
708            return NULL;
709        } else if (nextCheckpointItem()) {
710            return NULL;
711        }
712    }
713    return nextQueuedItem();
714}
715
716std::unique_ptr<DcpResponse> ActiveStream::takeoverSendPhase() {
717    VBucketPtr vb = engine->getVBucket(vb_);
718    if (vb && takeoverStart != 0 &&
719        !vb->isTakeoverBackedUp() &&
720        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
721        vb->setTakeoverBackedUpState(true);
722    }
723
724    if (!readyQ.empty()) {
725        return nextQueuedItem();
726    } else {
727        if (nextCheckpointItem()) {
728            return NULL;
729        }
730    }
731
732    if (waitForSnapshot != 0) {
733        return NULL;
734    }
735
736    if (vb) {
737        vb->setTakeoverBackedUpState(false);
738        takeoverStart = 0;
739    }
740
741    auto producer = producerPtr.lock();
742    if (producer) {
743        if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
744            transitionState(StreamState::TakeoverWait);
745            return std::make_unique<SetVBucketState>(
746                    opaque_, vb_, takeoverState);
747        }
748    }
749    return nullptr;
750}
751
752std::unique_ptr<DcpResponse> ActiveStream::takeoverWaitPhase() {
753    return nextQueuedItem();
754}
755
756std::unique_ptr<DcpResponse> ActiveStream::deadPhase() {
757    auto resp = nextQueuedItem();
758    if (!resp) {
759        log(EXTENSION_LOG_NOTICE,
760            "(vb %" PRIu16
761            ") Stream closed, "
762            "%" PRIu64
763            " items sent from backfill phase, "
764            "%" PRIu64
765            " items sent from memory phase, "
766            "%" PRIu64 " was last seqno sent",
767            vb_,
768            uint64_t(backfillItems.sent.load()),
769            uint64_t(itemsFromMemoryPhase.load()),
770            lastSentSeqno.load());
771    }
772    return resp;
773}
774
775bool ActiveStream::isCompressionEnabled() {
776    auto producer = producerPtr.lock();
777    if (producer) {
778        return producer->isCompressionEnabled();
779    }
780    /* If the 'producer' is deleted, what we return doesn't matter */
781    return false;
782}
783
784void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
785    Stream::addStats(add_stat, c);
786
787    try {
788        const int bsize = 1024;
789        char buffer[bsize];
790        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
791                         name_.c_str(), vb_);
792        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
793        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
794                         name_.c_str(), vb_);
795        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
796        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
797                         name_.c_str(), vb_);
798        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
799        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
800                         name_.c_str(), vb_);
801        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
802        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
803                         name_.c_str(), vb_);
804        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
805        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
806                         name_.c_str(), vb_);
807        add_casted_stat(buffer,
808                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
809                        add_stat, c);
810        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
811                         name_.c_str(), vb_);
812        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
813        checked_snprintf(buffer,
814                         bsize,
815                         "%s:stream_%d_last_read_seqno_unsnapshotted",
816                         name_.c_str(),
817                         vb_);
818        add_casted_stat(buffer, lastReadSeqnoUnSnapshotted.load(), add_stat, c);
819        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
820                         name_.c_str(), vb_);
821        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
822        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
823                         name_.c_str(), vb_);
824        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
825                        c);
826        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
827                         name_.c_str(), vb_);
828        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
829        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
830                         name_.c_str(), vb_);
831        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
832
833        checked_snprintf(buffer,
834                         bsize,
835                         "%s:stream_%d_cursor_registered",
836                         name_.c_str(),
837                         vb_);
838        add_casted_stat(buffer, cursorRegistered, add_stat, c);
839
840        if (isTakeoverSend() && takeoverStart != 0) {
841            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
842                             name_.c_str(), vb_);
843            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
844                            c);
845        }
846    } catch (std::exception& error) {
847        LOG(EXTENSION_LOG_WARNING,
848            "ActiveStream::addStats: Failed to build stats: %s", error.what());
849    }
850
851    filter.addStats(add_stat, c, name_, vb_);
852}
853
854void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
855                                    const VBucket& vb) {
856    LockHolder lh(streamMutex);
857
858    add_casted_stat("name", name_, add_stat, cookie);
859    if (!isActive()) {
860        log(EXTENSION_LOG_WARNING,
861            "(vb %" PRIu16
862            ") "
863            "ActiveStream::addTakeoverStats: Stream has "
864            "status StreamDead",
865            vb_);
866        // Return status of does_not_exist to ensure rebalance does not hang.
867        add_casted_stat("status", "does_not_exist", add_stat, cookie);
868        add_casted_stat("estimate", 0, add_stat, cookie);
869        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
870        return;
871    }
872
873    size_t total = backfillRemaining.load(std::memory_order_relaxed);
874    if (isBackfilling()) {
875        add_casted_stat("status", "backfilling", add_stat, cookie);
876    } else {
877        add_casted_stat("status", "in-memory", add_stat, cookie);
878    }
879    add_casted_stat("backfillRemaining",
880                    backfillRemaining.load(std::memory_order_relaxed),
881                    add_stat, cookie);
882
883    size_t vb_items = vb.getNumItems();
884    size_t chk_items = 0;
885    if (vb_items > 0 && cursorRegistered) {
886        chk_items = vb.checkpointManager->getNumItemsForCursor(cursorName);
887    }
888
889    size_t del_items = 0;
890    try {
891        del_items = engine->getKVBucket()->getNumPersistedDeletes(vb_);
892    } catch (std::runtime_error& e) {
893        log(EXTENSION_LOG_WARNING,
894            "ActiveStream:addTakeoverStats: exception while getting num "
895            "persisted "
896            "deletes for vbucket:%" PRIu16
897            " - treating as 0 deletes. "
898            "Details: %s",
899            vb_,
900            e.what());
901    }
902
903    if (end_seqno_ < curChkSeqno) {
904        chk_items = 0;
905    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
906        chk_items = end_seqno_ - curChkSeqno + 1;
907    }
908    total += chk_items;
909
910    add_casted_stat("estimate", total, add_stat, cookie);
911    add_casted_stat("chk_items", chk_items, add_stat, cookie);
912    add_casted_stat("vb_items", vb_items, add_stat, cookie);
913    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
914}
915
916std::unique_ptr<DcpResponse> ActiveStream::nextQueuedItem() {
917    if (!readyQ.empty()) {
918        auto& response = readyQ.front();
919        auto producer = producerPtr.lock();
920        if (!producer) {
921            return nullptr;
922        }
923        if (producer->bufferLogInsert(response->getMessageSize())) {
924            auto seqno = response->getBySeqno();
925            if (seqno) {
926                lastSentSeqno.store(*seqno);
927
928                if (isBackfilling()) {
929                    backfillItems.sent++;
930                } else {
931                    itemsFromMemoryPhase++;
932                }
933            }
934
935            // See if the response is a system-event
936            processSystemEvent(response.get());
937            return popFromReadyQ();
938        }
939    }
940    return nullptr;
941}
942
943bool ActiveStream::nextCheckpointItem() {
944    VBucketPtr vbucket = engine->getVBucket(vb_);
945    if (vbucket &&
946        vbucket->checkpointManager->getNumItemsForCursor(cursorName) > 0) {
947        // schedule this stream to build the next checkpoint
948        auto producer = producerPtr.lock();
949        if (!producer) {
950            return false;
951        }
952        producer->scheduleCheckpointProcessorTask(shared_from_this());
953        return true;
954    } else if (chkptItemsExtractionInProgress) {
955        return true;
956    }
957    return false;
958}
959
960ActiveStreamCheckpointProcessorTask::ActiveStreamCheckpointProcessorTask(
961        EventuallyPersistentEngine& e, std::shared_ptr<DcpProducer> p)
962    : GlobalTask(
963              &e, TaskId::ActiveStreamCheckpointProcessorTask, INT_MAX, false),
964      description("Process checkpoint(s) for DCP producer " + p->getName()),
965      notified(false),
966      iterationsBeforeYield(
967              e.getConfiguration().getDcpProducerSnapshotMarkerYieldLimit()),
968      producerPtr(p) {
969}
970
971bool ActiveStreamCheckpointProcessorTask::run() {
972    if (engine->getEpStats().isShutdown) {
973        return false;
974    }
975
976    // Setup that we will sleep forever when done.
977    snooze(INT_MAX);
978
979    // Clear the notfification flag
980    notified.store(false);
981
982    size_t iterations = 0;
983    do {
984        std::shared_ptr<ActiveStream> stream = queuePop();
985
986        if (stream) {
987            stream->nextCheckpointItemTask();
988        } else {
989            break;
990        }
991        iterations++;
992    } while(!queueEmpty()
993            && iterations < iterationsBeforeYield);
994
995    // Now check if we were re-notified or there are still checkpoints
996    bool expected = true;
997    if (notified.compare_exchange_strong(expected, false)
998        || !queueEmpty()) {
999        // wakeUp, essentially yielding and allowing other tasks a go
1000        wakeUp();
1001    }
1002
1003    return true;
1004}
1005
1006void ActiveStreamCheckpointProcessorTask::wakeup() {
1007    ExecutorPool::get()->wake(getId());
1008}
1009
1010void ActiveStreamCheckpointProcessorTask::schedule(
1011        std::shared_ptr<ActiveStream> stream) {
1012    pushUnique(stream->getVBucket());
1013
1014    bool expected = false;
1015    if (notified.compare_exchange_strong(expected, true)) {
1016        wakeup();
1017    }
1018}
1019
1020void ActiveStreamCheckpointProcessorTask::cancelTask() {
1021    LockHolder lh(workQueueLock);
1022    while (!queue.empty()) {
1023        queue.pop();
1024    }
1025    queuedVbuckets.clear();
1026}
1027
1028void ActiveStreamCheckpointProcessorTask::addStats(const std::string& name,
1029                                                   ADD_STAT add_stat,
1030                                                   const void* c) const {
1031    // Take a copy of the queue data under lock; then format it to stats.
1032    std::queue<VBucket::id_type> qCopy;
1033    std::unordered_set<VBucket::id_type> qMapCopy;
1034    {
1035        LockHolder lh(workQueueLock);
1036        qCopy = queue;
1037        qMapCopy = queuedVbuckets;
1038    }
1039
1040    auto prefix = name + ":ckpt_processor_";
1041    add_casted_stat((prefix + "queue_size").c_str(), qCopy.size(), add_stat, c);
1042    add_casted_stat(
1043            (prefix + "queue_map_size").c_str(), qMapCopy.size(), add_stat, c);
1044
1045    // Form a comma-separated string of the queue's contents.
1046    std::string contents;
1047    while (!qCopy.empty()) {
1048        contents += std::to_string(qCopy.front()) + ",";
1049        qCopy.pop();
1050    }
1051    if (!contents.empty()) {
1052        contents.pop_back();
1053    }
1054    add_casted_stat(
1055            (prefix + "queue_contents").c_str(), contents.c_str(), add_stat, c);
1056
1057    // Form a comma-separated string of the queue map's contents.
1058    std::string qMapContents;
1059    for (auto& vbid : qMapCopy) {
1060        qMapContents += std::to_string(vbid) + ",";
1061    }
1062    if (!qMapContents.empty()) {
1063        qMapContents.pop_back();
1064    }
1065    add_casted_stat((prefix + "queue_map_contents").c_str(),
1066                    qMapContents.c_str(),
1067                    add_stat,
1068                    c);
1069
1070    add_casted_stat((prefix + "notified").c_str(), notified, add_stat, c);
1071}
1072
1073void ActiveStream::nextCheckpointItemTask() {
1074    // MB-29369: Obtain stream mutex here
1075    LockHolder lh(streamMutex);
1076    nextCheckpointItemTask(lh);
1077}
1078
1079void ActiveStream::nextCheckpointItemTask(const LockHolder& streamMutex) {
1080    VBucketPtr vbucket = engine->getVBucket(vb_);
1081    if (vbucket) {
1082        // MB-29369: only run the task's work if the stream is in an in-memory
1083        // phase (of which takeover is a variant).
1084        if (isInMemory() || isTakeoverSend()) {
1085            auto items = getOutstandingItems(*vbucket);
1086            processItems(items, streamMutex);
1087        }
1088    } else {
1089        /* The entity deleting the vbucket must set stream to dead,
1090           calling setDead(END_STREAM_STATE) will cause deadlock because
1091           it will try to grab streamMutex which is already acquired at this
1092           point here */
1093        return;
1094    }
1095}
1096
1097std::vector<queued_item> ActiveStream::getOutstandingItems(VBucket& vb) {
1098    std::vector<queued_item> items;
1099    // Commencing item processing - set guard flag.
1100    chkptItemsExtractionInProgress.store(true);
1101
1102    auto _begin_ = ProcessClock::now();
1103    vb.checkpointManager->getAllItemsForCursor(cursorName, items);
1104    engine->getEpStats().dcpCursorsGetItemsHisto.add(
1105            std::chrono::duration_cast<std::chrono::microseconds>(
1106                    ProcessClock::now() - _begin_));
1107
1108    if (vb.checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
1109        engine->getKVBucket()->wakeUpCheckpointRemover();
1110    }
1111    return items;
1112}
1113
1114/**
1115 * This function is used to find out if a given item's value
1116 * needs to be changed
1117 */
1118static bool shouldModifyItem(const queued_item& item,
1119                             IncludeValue includeValue,
1120                             IncludeXattrs includeXattrs,
1121                             bool isForceValueCompressionEnabled,
1122                             bool isSnappyEnabled) {
1123    // If there is no value, no modification needs to be done
1124    if (item->getValue()) {
1125        /**
1126         * If value needs to be included
1127         */
1128        if (includeValue == IncludeValue::No) {
1129            return true;
1130        }
1131
1132        /**
1133         * Check if value needs to be compressed or decompressed
1134         * If yes, then then value definitely needs modification
1135         */
1136        if (isSnappyEnabled) {
1137            if (isForceValueCompressionEnabled) {
1138                if (!mcbp::datatype::is_snappy(item->getDataType())) {
1139                    return true;
1140                }
1141            }
1142        } else {
1143            if (mcbp::datatype::is_snappy(item->getDataType())) {
1144                return true;
1145            }
1146        }
1147
1148        /**
1149         * If the value doesn't have to be compressed, then
1150         * check if xattrs need to be pruned. If not, then
1151         * value needs no modification
1152         */
1153        if (includeXattrs == IncludeXattrs::No &&
1154            mcbp::datatype::is_xattr(item->getDataType())) {
1155            return true;
1156        }
1157    }
1158
1159    return false;
1160}
1161
1162std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
1163        const queued_item& item) {
1164    // Note: This function is hot - it is called for every item to be
1165    // sent over the DCP connection.
1166    if (item->getOperation() != queue_op::system_event) {
1167        auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
1168        if (shouldModifyItem(item, includeValue, includeXattributes,
1169                             isForceValueCompressionEnabled(),
1170                             isSnappyEnabled())) {
1171            auto finalItem = std::make_unique<Item>(*item);
1172            finalItem->pruneValueAndOrXattrs(includeValue, includeXattributes);
1173
1174            if (isSnappyEnabled()) {
1175                if (isForceValueCompressionEnabled()) {
1176                    if (!mcbp::datatype::is_snappy(finalItem->getDataType())) {
1177                        if (!finalItem->compressValue()) {
1178                            LOG(EXTENSION_LOG_WARNING,
1179                                "Failed to snappy compress an uncompressed value");
1180                        }
1181                    }
1182                }
1183            } else {
1184                if (mcbp::datatype::is_snappy(finalItem->getDataType())) {
1185                    if (!finalItem->decompressValue()) {
1186                        LOG(EXTENSION_LOG_WARNING,
1187                            "Failed to snappy uncompress a compressed value");
1188                    }
1189                }
1190            }
1191
1192            /**
1193             * Create a mutation response to be placed in the ready queue.
1194             */
1195            return std::make_unique<MutationProducerResponse>(
1196                    std::move(finalItem),
1197                    opaque_,
1198                    includeValue,
1199                    includeXattributes,
1200                    includeDeleteTime,
1201                    cKey.getCollectionLen());
1202        }
1203
1204        // Item unmodified - construct response from original.
1205        return std::make_unique<MutationProducerResponse>(
1206                item,
1207                opaque_,
1208                includeValue,
1209                includeXattributes,
1210                includeDeleteTime,
1211                cKey.getCollectionLen());
1212    }
1213    return SystemEventProducerMessage::make(opaque_, item);
1214}
1215
1216void ActiveStream::processItems(std::vector<queued_item>& items,
1217                                const LockHolder& streamMutex) {
1218    if (!items.empty()) {
1219        bool mark = false;
1220        if (items.front()->getOperation() == queue_op::checkpoint_start) {
1221            mark = true;
1222        }
1223
1224        std::deque<std::unique_ptr<DcpResponse>> mutations;
1225        for (auto& qi : items) {
1226            if (SystemEventReplicate::process(*qi) == ProcessStatus::Continue) {
1227                curChkSeqno = qi->getBySeqno();
1228                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
1229                // Check if the item is allowed on the stream, note the filter
1230                // updates itself for collection deletion events
1231                if (filter.checkAndUpdate(*qi)) {
1232                    mutations.push_back(makeResponseFromItem(qi));
1233                }
1234
1235            } else if (qi->getOperation() == queue_op::checkpoint_start) {
1236                /* if there are already other mutations, then they belong to the
1237                   previous checkpoint and hence we must create a snapshot and
1238                   put them onto readyQ */
1239                if (!mutations.empty()) {
1240                    snapshot(mutations, mark);
1241                    /* clear out all the mutations since they are already put
1242                       onto the readyQ */
1243                    mutations.clear();
1244                }
1245                /* mark true as it indicates a new checkpoint snapshot */
1246                mark = true;
1247            }
1248        }
1249
1250        if (mutations.empty()) {
1251            // If we only got checkpoint start or ends check to see if there are
1252            // any more snapshots before pausing the stream.
1253            nextCheckpointItemTask(streamMutex);
1254        } else {
1255            snapshot(mutations, mark);
1256        }
1257    }
1258
1259    // After the snapshot has been processed, check if the filter is now empty
1260    // a stream with an empty filter does nothing but self close
1261    if (filter.empty()) {
1262        // Filter is now empty empty, so endStream
1263        endStream(END_STREAM_FILTER_EMPTY);
1264    }
1265
1266    // Completed item processing - clear guard flag and notify producer.
1267    chkptItemsExtractionInProgress.store(false);
1268    notifyStreamReady(true);
1269}
1270
1271void ActiveStream::snapshot(std::deque<std::unique_ptr<DcpResponse>>& items,
1272                            bool mark) {
1273    if (items.empty()) {
1274        return;
1275    }
1276
1277    /* This assumes that all items in the "items deque" is put onto readyQ */
1278    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
1279
1280    if (isCurrentSnapshotCompleted()) {
1281        uint32_t flags = MARKER_FLAG_MEMORY;
1282
1283        // Get OptionalSeqnos which for the items list types should have values
1284        auto seqnoStart = items.front()->getBySeqno();
1285        auto seqnoEnd = items.back()->getBySeqno();
1286        if (!seqnoStart || !seqnoEnd) {
1287            throw std::logic_error(
1288                    "ActiveStream::snapshot incorrect DcpEvent, missing a "
1289                    "seqno " +
1290                    std::string(items.front()->to_string()) + " " +
1291                    std::string(items.back()->to_string()));
1292        }
1293
1294        uint64_t snapStart = *seqnoStart;
1295        uint64_t snapEnd = *seqnoEnd;
1296
1297        if (mark) {
1298            flags |= MARKER_FLAG_CHK;
1299        }
1300
1301        if (isTakeoverSend()) {
1302            waitForSnapshot++;
1303            flags |= MARKER_FLAG_ACK;
1304        }
1305
1306        /* We need to send the requested 'snap_start_seqno_' as the snapshot
1307           start when we are sending the first snapshot because the first
1308           snapshot could be resumption of a previous snapshot */
1309        if (!firstMarkerSent) {
1310            snapStart = std::min(snap_start_seqno_, snapStart);
1311            firstMarkerSent = true;
1312        }
1313        pushToReadyQ(std::make_unique<SnapshotMarker>(
1314                opaque_, vb_, snapStart, snapEnd, flags));
1315        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
1316    }
1317
1318    for (auto& item : items) {
1319        pushToReadyQ(std::move(item));
1320    }
1321}
1322
1323uint32_t ActiveStream::setDead(end_stream_status_t status) {
1324    {
1325        LockHolder lh(streamMutex);
1326        endStream(status);
1327    }
1328
1329    if (status != END_STREAM_DISCONNECTED) {
1330        notifyStreamReady();
1331    }
1332    return 0;
1333}
1334
1335void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
1336    if (isActive()) {
1337        notifyStreamReady();
1338    }
1339}
1340
1341void ActiveStream::endStream(end_stream_status_t reason) {
1342    if (isActive()) {
1343        pendingBackfill = false;
1344        if (isBackfilling()) {
1345            // If Stream were in Backfilling state, clear out the
1346            // backfilled items to clear up the backfill buffer.
1347            clear_UNLOCKED();
1348            auto producer = producerPtr.lock();
1349            if (producer) {
1350                producer->recordBackfillManagerBytesSent(
1351                        bufferedBackfill.bytes);
1352            }
1353            bufferedBackfill.bytes = 0;
1354            bufferedBackfill.items = 0;
1355        }
1356        transitionState(StreamState::Dead);
1357        if (reason != END_STREAM_DISCONNECTED) {
1358            pushToReadyQ(
1359                    std::make_unique<StreamEndResponse>(opaque_, reason, vb_));
1360        }
1361        log(EXTENSION_LOG_NOTICE,
1362            "(vb %" PRIu16
1363            ") Stream closing, "
1364            "sent until seqno %" PRIu64
1365            " "
1366            "remaining items %" PRIu64
1367            ", "
1368            "reason: %s",
1369            vb_,
1370            lastSentSeqno.load(),
1371            uint64_t(readyQ_non_meta_items.load()),
1372            getEndStreamStatusStr(reason).c_str());
1373    }
1374}
1375
1376void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
1377    if (isBackfillTaskRunning) {
1378        log(EXTENSION_LOG_NOTICE,
1379            "(vb %" PRIu16
1380            ") Skipping "
1381            "scheduleBackfill_UNLOCKED; "
1382            "lastReadSeqno %" PRIu64
1383            ", reschedule flag "
1384            ": %s",
1385            vb_,
1386            lastReadSeqno.load(),
1387            reschedule ? "True" : "False");
1388        return;
1389    }
1390
1391    VBucketPtr vbucket = engine->getVBucket(vb_);
1392    if (!vbucket) {
1393        log(EXTENSION_LOG_WARNING,
1394            "(vb %" PRIu16
1395            ") Failed to schedule "
1396            "backfill as unable to get vbucket; "
1397            "lastReadSeqno : %" PRIu64
1398            ", "
1399            "reschedule : %s",
1400            vb_,
1401            lastReadSeqno.load(),
1402            reschedule ? "True" : "False");
1403        return;
1404    }
1405
1406    auto producer = producerPtr.lock();
1407    if (!producer) {
1408        log(EXTENSION_LOG_WARNING,
1409            "(vb %" PRIu16
1410            ") Aborting scheduleBackfill_UNLOCKED() "
1411            "as the producer conn is deleted; "
1412            "lastReadSeqno : %" PRIu64
1413            ", "
1414            "reschedule : %s",
1415            vb_,
1416            lastReadSeqno.load(),
1417            reschedule ? "True" : "False");
1418        return;
1419    }
1420
1421    uint64_t backfillStart = lastReadSeqno.load() + 1;
1422    uint64_t backfillEnd = 0;
1423    bool tryBackfill = false;
1424
1425    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
1426        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
1427        if (lastReadSeqno.load() > vbHighSeqno) {
1428            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1429                                   "lastReadSeqno (which is " +
1430                                   std::to_string(lastReadSeqno.load()) +
1431                                   " ) is greater than vbHighSeqno (which is " +
1432                                   std::to_string(vbHighSeqno) + " ). " +
1433                                   "for stream " + producer->logHeader() +
1434                                   "; vb " + std::to_string(vb_));
1435        }
1436        if (reschedule) {
1437            /* We need to do this for reschedule because in case of
1438               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
1439               set to last persisted seqno befor calling
1440               scheduleBackfill_UNLOCKED() */
1441            backfillEnd = engine->getKVBucket()->getLastPersistedSeqno(vb_);
1442        } else {
1443            backfillEnd = end_seqno_;
1444        }
1445        tryBackfill = true;
1446    } else {
1447        try {
1448            std::tie(curChkSeqno, tryBackfill) =
1449                    vbucket->checkpointManager->registerCursorBySeqno(
1450                            cursorName,
1451                            lastReadSeqno.load(),
1452                            MustSendCheckpointEnd::NO);
1453            cursorRegistered = true;
1454        } catch(std::exception& error) {
1455            log(EXTENSION_LOG_WARNING,
1456                "(vb %" PRIu16
1457                ") Failed to register "
1458                "cursor: %s",
1459                vb_,
1460                error.what());
1461            endStream(END_STREAM_STATE);
1462        }
1463
1464        if (lastReadSeqno.load() > curChkSeqno) {
1465            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1466                                   "lastReadSeqno (which is " +
1467                                   std::to_string(lastReadSeqno.load()) +
1468                                   " ) is greater than curChkSeqno (which is " +
1469                                   std::to_string(curChkSeqno) + " ). " +
1470                                   "for stream " + producer->logHeader() +
1471                                   "; vb " + std::to_string(vb_));
1472        }
1473
1474        /* We need to find the minimum seqno that needs to be backfilled in
1475         * order to make sure that we don't miss anything when transitioning
1476         * to a memory snapshot. The backfill task will always make sure that
1477         * the backfill end seqno is contained in the backfill.
1478         */
1479        if (backfillStart < curChkSeqno) {
1480            if (curChkSeqno > end_seqno_) {
1481                /* Backfill only is enough */
1482                backfillEnd = end_seqno_;
1483            } else {
1484                /* Backfill + in-memory streaming */
1485                backfillEnd = curChkSeqno - 1;
1486            }
1487        }
1488    }
1489
1490    if (backfillStart <= backfillEnd && tryBackfill) {
1491        log(EXTENSION_LOG_NOTICE,
1492            "(vb %" PRIu16
1493            ") Scheduling backfill "
1494            "from %" PRIu64 " to %" PRIu64
1495            ", reschedule "
1496            "flag : %s",
1497            vb_,
1498            backfillStart,
1499            backfillEnd,
1500            reschedule ? "True" : "False");
1501        producer->scheduleBackfillManager(
1502                *vbucket, shared_from_this(), backfillStart, backfillEnd);
1503        isBackfillTaskRunning.store(true);
1504    } else {
1505        if (reschedule) {
1506            // Infrequent code path, see comment below.
1507            log(EXTENSION_LOG_NOTICE,
1508                "(vb %" PRIu16
1509                ") Did not schedule "
1510                "backfill with reschedule : True, "
1511                "tryBackfill : True; "
1512                "backfillStart : %" PRIu64
1513                ", "
1514                "backfillEnd : %" PRIu64
1515                ", "
1516                "flags_ : %" PRIu32
1517                ", "
1518                "start_seqno_ : %" PRIu64
1519                ", "
1520                "end_seqno_ : %" PRIu64
1521                ", "
1522                "lastReadSeqno : %" PRIu64
1523                ", "
1524                "lastSentSeqno : %" PRIu64
1525                ", "
1526                "curChkSeqno : %" PRIu64
1527                ", "
1528                "itemsReady : %s",
1529                vb_,
1530                backfillStart,
1531                backfillEnd,
1532                flags_,
1533                start_seqno_,
1534                end_seqno_,
1535                lastReadSeqno.load(),
1536                lastSentSeqno.load(),
1537                curChkSeqno.load(),
1538                itemsReady ? "True" : "False");
1539
1540            /* Cursor was dropped, but we will not do backfill.
1541             * This may happen in a corner case where, the memory usage is high
1542             * due to other vbuckets and persistence cursor moves ahead of
1543             * replication cursor to new checkpoint open but does not persist
1544             * items yet.
1545             *
1546             * Because we dropped the cursor but did not do a backfill (and
1547             * therefore did not re-register a cursor in markDiskSnapshot) we
1548             * must re-register the cursor here.
1549             */
1550            try {
1551                CursorRegResult result =
1552                        vbucket->checkpointManager->registerCursorBySeqno(
1553                                cursorName,
1554                                lastReadSeqno.load(),
1555                                MustSendCheckpointEnd::NO);
1556                cursorRegistered = true;
1557                curChkSeqno = result.first;
1558            } catch (std::exception& error) {
1559                log(EXTENSION_LOG_WARNING,
1560                    "(vb %" PRIu16
1561                    ") Failed to register "
1562                    "cursor: %s",
1563                    vb_,
1564                    error.what());
1565                endStream(END_STREAM_STATE);
1566            }
1567        }
1568        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
1569            endStream(END_STREAM_OK);
1570        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
1571            transitionState(StreamState::TakeoverSend);
1572        } else {
1573            transitionState(StreamState::InMemory);
1574        }
1575        if (reschedule) {
1576            /*
1577             * It is not absolutely necessary to notify immediately as conn
1578             * manager or an incoming item will cause a notification eventually,
1579             * but wouldn't hurt to do so.
1580             *
1581             * Note: must not notify when we schedule a backfill for the first
1582             * time (i.e. when reschedule is false) because the stream is not
1583             * yet in producer conn list of streams.
1584             */
1585            notifyStreamReady();
1586        }
1587    }
1588}
1589
1590bool ActiveStream::handleSlowStream() {
1591    LockHolder lh(streamMutex);
1592    log(EXTENSION_LOG_NOTICE,
1593        "(vb %" PRIu16
1594        ") Handling slow stream; "
1595        "state_ : %s, "
1596        "lastReadSeqno : %" PRIu64
1597        ", "
1598        "lastSentSeqno : %" PRIu64
1599        ", "
1600        "vBucketHighSeqno : %" PRIu64
1601        ", "
1602        "isBackfillTaskRunning : %s",
1603        vb_,
1604        to_string(state_.load()).c_str(),
1605        lastReadSeqno.load(),
1606        lastSentSeqno.load(),
1607        engine->getVBucket(vb_)->getHighSeqno(),
1608        isBackfillTaskRunning.load() ? "True" : "False");
1609
1610    bool status = false;
1611    switch (state_.load()) {
1612        case StreamState::Backfilling:
1613        case StreamState::InMemory:
1614            /* Drop the existing cursor and set pending backfill */
1615            status = dropCheckpointCursor_UNLOCKED();
1616            pendingBackfill = true;
1617            return status;
1618        case StreamState::TakeoverSend:
1619            /* To be handled later if needed */
1620        case StreamState::TakeoverWait:
1621            /* To be handled later if needed */
1622        case StreamState::Dead:
1623            /* To be handled later if needed */
1624            return false;
1625        case StreamState::Pending:
1626        case StreamState::Reading: {
1627            auto producer = producerPtr.lock();
1628            std::string connHeader =
1629                    producer ? producer->logHeader()
1630                             : "DCP (Producer): **Deleted conn**";
1631            throw std::logic_error(
1632                    "ActiveStream::handleSlowStream: "
1633                    "called with state " +
1634                    to_string(state_.load()) +
1635                    " "
1636                    "for stream " +
1637                    connHeader + "; vb " + std::to_string(vb_));
1638        }
1639    }
1640    return false;
1641}
1642
1643std::string ActiveStream::getEndStreamStatusStr(end_stream_status_t status) {
1644    switch (status) {
1645    case END_STREAM_OK:
1646        return "The stream ended due to all items being streamed";
1647    case END_STREAM_CLOSED:
1648        return "The stream closed early due to a close stream message";
1649    case END_STREAM_STATE:
1650        return "The stream closed early because the vbucket state changed";
1651    case END_STREAM_DISCONNECTED:
1652        return "The stream closed early because the conn was disconnected";
1653    case END_STREAM_SLOW:
1654        return "The stream was closed early because it was too slow";
1655    case END_STREAM_BACKFILL_FAIL:
1656        return "The stream closed early due to backfill failure";
1657    case END_STREAM_ROLLBACK:
1658        return "The stream closed early because the vbucket rollback'ed";
1659    case END_STREAM_FILTER_EMPTY:
1660        return "The stream closed because all of the filtered collections "
1661               "were deleted";
1662    }
1663    return std::string{"Status unknown: " + std::to_string(status) +
1664                       "; this should not have happened!"};
1665}
1666
1667void ActiveStream::transitionState(StreamState newState) {
1668    if (state_ == newState) {
1669        return;
1670    }
1671
1672    EXTENSION_LOG_LEVEL logLevel = getTransitionStateLogLevel(state_, newState);
1673    log(logLevel,
1674        "ActiveStream::transitionState: (vb %d) "
1675        "Transitioning from %s to %s",
1676        vb_,
1677        to_string(state_.load()).c_str(),
1678        to_string(newState).c_str());
1679
1680    bool validTransition = false;
1681    switch (state_.load()) {
1682        case StreamState::Pending:
1683            if (newState == StreamState::Backfilling ||
1684                    newState == StreamState::Dead) {
1685                validTransition = true;
1686            }
1687            break;
1688        case StreamState::Backfilling:
1689            if(newState == StreamState::InMemory ||
1690               newState == StreamState::TakeoverSend ||
1691               newState == StreamState::Dead) {
1692                validTransition = true;
1693            }
1694            break;
1695        case StreamState::InMemory:
1696            if (newState == StreamState::Backfilling ||
1697                    newState == StreamState::Dead) {
1698                validTransition = true;
1699            }
1700            break;
1701        case StreamState::TakeoverSend:
1702            if (newState == StreamState::TakeoverWait ||
1703                    newState == StreamState::Dead) {
1704                validTransition = true;
1705            }
1706            break;
1707        case StreamState::TakeoverWait:
1708            if (newState == StreamState::TakeoverSend ||
1709                    newState == StreamState::Dead) {
1710                validTransition = true;
1711            }
1712            break;
1713        case StreamState::Reading:
1714            // Active stream should never be in READING state.
1715            validTransition = false;
1716            break;
1717        case StreamState::Dead:
1718            // Once DEAD, no other transitions should occur.
1719            validTransition = false;
1720            break;
1721    }
1722
1723    if (!validTransition) {
1724        throw std::invalid_argument("ActiveStream::transitionState:"
1725                " newState (which is " + to_string(newState) +
1726                ") is not valid for current state (which is " +
1727                to_string(state_.load()) + ")");
1728    }
1729
1730    StreamState oldState = state_.load();
1731    state_ = newState;
1732
1733    switch (newState) {
1734        case StreamState::Backfilling:
1735            if (StreamState::Pending == oldState) {
1736                scheduleBackfill_UNLOCKED(false /* reschedule */);
1737            } else if (StreamState::InMemory == oldState) {
1738                scheduleBackfill_UNLOCKED(true /* reschedule */);
1739            }
1740            break;
1741        case StreamState::InMemory:
1742            // Check if the producer has sent up till the last requested
1743            // sequence number already, if not - move checkpoint items into
1744            // the ready queue.
1745            if (lastSentSeqno.load() >= end_seqno_) {
1746                // Stream transitioning to DEAD state
1747                endStream(END_STREAM_OK);
1748                notifyStreamReady();
1749            } else {
1750                nextCheckpointItem();
1751            }
1752            break;
1753        case StreamState::TakeoverSend:
1754            takeoverStart = ep_current_time();
1755            if (!nextCheckpointItem()) {
1756                notifyStreamReady(true);
1757            }
1758            break;
1759        case StreamState::Dead:
1760            removeCheckpointCursor();
1761            break;
1762        case StreamState::TakeoverWait:
1763        case StreamState::Pending:
1764            break;
1765        case StreamState::Reading:
1766            throw std::logic_error("ActiveStream::transitionState:"
1767                    " newState can't be " + to_string(newState) +
1768                    "!");
1769    }
1770}
1771
1772size_t ActiveStream::getItemsRemaining() {
1773    VBucketPtr vbucket = engine->getVBucket(vb_);
1774
1775    if (!vbucket || !isActive()) {
1776        return 0;
1777    }
1778
1779    // Items remaining is the sum of:
1780    // (a) Items outstanding in checkpoints
1781    // (b) Items pending in our readyQ, excluding any meta items.
1782    size_t ckptItems = 0;
1783    if (cursorRegistered) {
1784        ckptItems =
1785                vbucket->checkpointManager->getNumItemsForCursor(cursorName);
1786    }
1787    return ckptItems + readyQ_non_meta_items;
1788}
1789
1790uint64_t ActiveStream::getLastReadSeqno() const {
1791    return lastReadSeqno.load();
1792}
1793
1794uint64_t ActiveStream::getLastSentSeqno() const {
1795    return lastSentSeqno.load();
1796}
1797
1798void ActiveStream::log(EXTENSION_LOG_LEVEL severity,
1799                       const char* fmt,
1800                       ...) const {
1801    va_list va;
1802    va_start(va, fmt);
1803    auto producer = producerPtr.lock();
1804    if (producer) {
1805        producer->getLogger().vlog(severity, fmt, va);
1806    } else {
1807        static Logger defaultLogger =
1808                Logger("DCP (Producer): **Deleted conn**");
1809        defaultLogger.vlog(severity, fmt, va);
1810    }
1811    va_end(va);
1812}
1813
1814bool ActiveStream::isCurrentSnapshotCompleted() const
1815{
1816    VBucketPtr vbucket = engine->getVBucket(vb_);
1817    // An atomic read of vbucket state without acquiring the
1818    // reader lock for state should suffice here.
1819    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1820        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1821            lastReadSeqno) {
1822            return false;
1823        }
1824    }
1825    return true;
1826}
1827
1828bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
1829    VBucketPtr vbucket = engine->getVBucket(vb_);
1830    if (!vbucket) {
1831        endStream(END_STREAM_STATE);
1832        notifyStreamReady();
1833    }
1834    /* Drop the existing cursor */
1835    cursorRegistered = false;
1836    return vbucket->checkpointManager->removeCursor(cursorName);
1837}
1838
1839EXTENSION_LOG_LEVEL ActiveStream::getTransitionStateLogLevel(
1840        StreamState currState, StreamState newState) {
1841    if ((currState == StreamState::Pending) ||
1842        (newState == StreamState::Dead)) {
1843        return EXTENSION_LOG_INFO;
1844    }
1845    return EXTENSION_LOG_NOTICE;
1846}
1847
1848void ActiveStream::processSystemEvent(DcpResponse* response) {
1849    if (response->getEvent() == DcpResponse::Event::SystemEvent) {
1850        auto se = static_cast<SystemEventProducerMessage*>(response);
1851        if (se->getSystemEvent() == mcbp::systemevent::id::CollectionsSeparatorChanged) {
1852            currentSeparator =
1853                    std::string(se->getKey().data(), se->getKey().size());
1854            // filter needs new separator?
1855        }
1856    }
1857}
1858
1859void ActiveStream::notifyStreamReady(bool force) {
1860    auto producer = producerPtr.lock();
1861    if (!producer) {
1862        return;
1863    }
1864
1865    bool inverse = false;
1866    if (force || itemsReady.compare_exchange_strong(inverse, true)) {
1867        producer->notifyStreamReady(vb_);
1868    }
1869}
1870
1871void ActiveStream::removeCheckpointCursor() {
1872    VBucketPtr vb = engine->getVBucket(vb_);
1873    if (vb) {
1874        cursorRegistered = false;
1875        vb->checkpointManager->removeCursor(cursorName);
1876    }
1877}
1878
1879std::atomic<uint64_t> ActiveStream::cursorUID;
1880
1881NotifierStream::NotifierStream(EventuallyPersistentEngine* e,
1882                               std::shared_ptr<DcpProducer> p,
1883                               const std::string& name,
1884                               uint32_t flags,
1885                               uint32_t opaque,
1886                               uint16_t vb,
1887                               uint64_t st_seqno,
1888                               uint64_t en_seqno,
1889                               uint64_t vb_uuid,
1890                               uint64_t snap_start_seqno,
1891                               uint64_t snap_end_seqno)
1892    : Stream(name,
1893             flags,
1894             opaque,
1895             vb,
1896             st_seqno,
1897             en_seqno,
1898             vb_uuid,
1899             snap_start_seqno,
1900             snap_end_seqno,
1901             Type::Notifier),
1902      producerPtr(p) {
1903    LockHolder lh(streamMutex);
1904    VBucketPtr vbucket = e->getVBucket(vb_);
1905    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1906        pushToReadyQ(std::make_unique<StreamEndResponse>(
1907                opaque_, END_STREAM_OK, vb_));
1908        transitionState(StreamState::Dead);
1909        itemsReady.store(true);
1910    }
1911
1912    p->getLogger().log(EXTENSION_LOG_NOTICE,
1913        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
1914        PRIu64, vb, st_seqno, en_seqno);
1915}
1916
1917uint32_t NotifierStream::setDead(end_stream_status_t status) {
1918    std::unique_lock<std::mutex> lh(streamMutex);
1919    if (isActive()) {
1920        transitionState(StreamState::Dead);
1921        if (status != END_STREAM_DISCONNECTED) {
1922            pushToReadyQ(
1923                    std::make_unique<StreamEndResponse>(opaque_, status, vb_));
1924            lh.unlock();
1925            notifyStreamReady();
1926        }
1927    }
1928    return 0;
1929}
1930
1931void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1932    std::unique_lock<std::mutex> lh(streamMutex);
1933    if (isActive() && start_seqno_ < seqno) {
1934        pushToReadyQ(std::make_unique<StreamEndResponse>(
1935                opaque_, END_STREAM_OK, vb_));
1936        transitionState(StreamState::Dead);
1937        lh.unlock();
1938        notifyStreamReady();
1939    }
1940}
1941
1942std::unique_ptr<DcpResponse> NotifierStream::next() {
1943    LockHolder lh(streamMutex);
1944
1945    if (readyQ.empty()) {
1946        itemsReady.store(false);
1947        return nullptr;
1948    }
1949
1950    auto& response = readyQ.front();
1951    auto producer = producerPtr.lock();
1952    if (producer && producer->bufferLogInsert(response->getMessageSize())) {
1953        return popFromReadyQ();
1954    }
1955    return nullptr;
1956}
1957
1958void NotifierStream::transitionState(StreamState newState) {
1959    log(EXTENSION_LOG_INFO,
1960        "NotifierStream::transitionState: (vb %d) "
1961        "Transitioning from %s to %s",
1962        vb_,
1963        to_string(state_.load()).c_str(),
1964        to_string(newState).c_str());
1965
1966    if (state_ == newState) {
1967        return;
1968    }
1969
1970    bool validTransition = false;
1971    switch (state_.load()) {
1972        case StreamState::Pending:
1973            if (newState == StreamState::Dead) {
1974                validTransition = true;
1975            }
1976            break;
1977
1978        case StreamState::Backfilling:
1979        case StreamState::InMemory:
1980        case StreamState::TakeoverSend:
1981        case StreamState::TakeoverWait:
1982        case StreamState::Reading:
1983        case StreamState::Dead:
1984            // No other state transitions are valid for a notifier stream.
1985            break;
1986    }
1987
1988    if (!validTransition) {
1989        throw std::invalid_argument("NotifierStream::transitionState:"
1990                " newState (which is " + to_string(newState) +
1991                ") is not valid for current state (which is " +
1992                to_string(state_.load()) + ")");
1993    }
1994    state_ = newState;
1995}
1996
1997void NotifierStream::addStats(ADD_STAT add_stat, const void* c) {
1998    Stream::addStats(add_stat, c);
1999}
2000
2001void NotifierStream::log(EXTENSION_LOG_LEVEL severity,
2002                         const char* fmt,
2003                         ...) const {
2004    va_list va;
2005    va_start(va, fmt);
2006    auto producer = producerPtr.lock();
2007    if (producer) {
2008        producer->getLogger().vlog(severity, fmt, va);
2009    } else {
2010        static Logger defaultLogger =
2011                Logger("DCP (Notifier): **Deleted conn**");
2012        defaultLogger.vlog(severity, fmt, va);
2013    }
2014    va_end(va);
2015}
2016
2017void NotifierStream::notifyStreamReady() {
2018    auto producer = producerPtr.lock();
2019    if (!producer) {
2020        return;
2021    }
2022
2023    bool inverse = false;
2024    if (itemsReady.compare_exchange_strong(inverse, true)) {
2025        producer->notifyStreamReady(vb_);
2026    }
2027}
2028
2029PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
2030                             std::shared_ptr<DcpConsumer> c,
2031                             const std::string& name,
2032                             uint32_t flags,
2033                             uint32_t opaque,
2034                             uint16_t vb,
2035                             uint64_t st_seqno,
2036                             uint64_t en_seqno,
2037                             uint64_t vb_uuid,
2038                             uint64_t snap_start_seqno,
2039                             uint64_t snap_end_seqno,
2040                             uint64_t vb_high_seqno)
2041    : Stream(name,
2042             flags,
2043             opaque,
2044             vb,
2045             st_seqno,
2046             en_seqno,
2047             vb_uuid,
2048             snap_start_seqno,
2049             snap_end_seqno,
2050             Type::Passive),
2051      engine(e),
2052      consumerPtr(c),
2053      last_seqno(vb_high_seqno),
2054      cur_snapshot_start(0),
2055      cur_snapshot_end(0),
2056      cur_snapshot_type(Snapshot::None),
2057      cur_snapshot_ack(false) {
2058    LockHolder lh(streamMutex);
2059    streamRequest_UNLOCKED(vb_uuid);
2060    itemsReady.store(true);
2061}
2062
2063PassiveStream::~PassiveStream() {
2064    uint32_t unackedBytes = clearBuffer_UNLOCKED();
2065    if (state_ != StreamState::Dead) {
2066        // Destructed a "live" stream, log it.
2067        log(EXTENSION_LOG_NOTICE,
2068            "(vb %" PRId16
2069            ") Destructing stream."
2070            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
2071            vb_,
2072            last_seqno.load(),
2073            unackedBytes);
2074    }
2075}
2076
2077void PassiveStream::streamRequest(uint64_t vb_uuid) {
2078    {
2079        std::unique_lock<std::mutex> lh(streamMutex);
2080        streamRequest_UNLOCKED(vb_uuid);
2081    }
2082    notifyStreamReady();
2083}
2084
2085void PassiveStream::streamRequest_UNLOCKED(uint64_t vb_uuid) {
2086    /* the stream should send a don't care vb_uuid if start_seqno is 0 */
2087    pushToReadyQ(std::make_unique<StreamRequest>(vb_,
2088                                                 opaque_,
2089                                                 flags_,
2090                                                 start_seqno_,
2091                                                 end_seqno_,
2092                                                 start_seqno_ ? vb_uuid : 0,
2093                                                 snap_start_seqno_,
2094                                                 snap_end_seqno_));
2095
2096    const char* type = (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER)
2097        ? "takeover stream" : "stream";
2098    log(EXTENSION_LOG_NOTICE,
2099        "(vb %" PRId16 ") Attempting to add %s: opaque_:%" PRIu32
2100        ", "
2101        "start_seqno_:%" PRIu64 ", end_seqno_:%" PRIu64
2102        ", "
2103        "vb_uuid:%" PRIu64 ", snap_start_seqno_:%" PRIu64
2104        ", "
2105        "snap_end_seqno_:%" PRIu64 ", last_seqno:%" PRIu64,
2106        vb_,
2107        type,
2108        opaque_,
2109        start_seqno_,
2110        end_seqno_,
2111        vb_uuid,
2112        snap_start_seqno_,
2113        snap_end_seqno_,
2114        last_seqno.load());
2115}
2116
2117uint32_t PassiveStream::setDead(end_stream_status_t status) {
2118    /* Hold buffer lock so that we clear out all items before we set the stream
2119       to dead state. We do not want to add any new message to the buffer or
2120       process any items in the buffer once we set the stream state to dead. */
2121    std::unique_lock<std::mutex> lg(buffer.bufMutex);
2122    uint32_t unackedBytes = clearBuffer_UNLOCKED();
2123    bool killed = false;
2124
2125    LockHolder slh(streamMutex);
2126    if (transitionState(StreamState::Dead)) {
2127        killed = true;
2128    }
2129
2130    if (killed) {
2131        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
2132        if (END_STREAM_DISCONNECTED == status) {
2133            logLevel = EXTENSION_LOG_WARNING;
2134        }
2135        log(logLevel,
2136            "(vb %" PRId16
2137            ") Setting stream to dead state, last_seqno is %" PRIu64
2138            ", unAckedBytes is %" PRIu32 ", status is %s",
2139            vb_,
2140            last_seqno.load(),
2141            unackedBytes,
2142            getEndStreamStatusStr(status).c_str());
2143    }
2144    return unackedBytes;
2145}
2146
2147void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
2148    std::unique_lock<std::mutex> lh(streamMutex);
2149    if (isPending()) {
2150        if (status == ENGINE_SUCCESS) {
2151            transitionState(StreamState::Reading);
2152        } else {
2153            transitionState(StreamState::Dead);
2154        }
2155        pushToReadyQ(std::make_unique<AddStreamResponse>(
2156                add_opaque, opaque_, status));
2157        lh.unlock();
2158        notifyStreamReady();
2159    }
2160}
2161
2162void PassiveStream::reconnectStream(VBucketPtr &vb,
2163                                    uint32_t new_opaque,
2164                                    uint64_t start_seqno) {
2165    /* the stream should send a don't care vb_uuid if start_seqno is 0 */
2166    vb_uuid_ = start_seqno ? vb->failovers->getLatestEntry().vb_uuid : 0;
2167
2168    snapshot_info_t info = vb->checkpointManager->getSnapshotInfo();
2169    if (info.range.end == info.start) {
2170        info.range.start = info.start;
2171    }
2172
2173    snap_start_seqno_ = info.range.start;
2174    start_seqno_ = info.start;
2175    snap_end_seqno_ = info.range.end;
2176
2177    log(EXTENSION_LOG_NOTICE,
2178        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
2179        ", start seq no %" PRIu64 ", end seq no %" PRIu64
2180        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
2181        vb_,
2182        new_opaque,
2183        start_seqno,
2184        end_seqno_,
2185        snap_start_seqno_,
2186        snap_end_seqno_);
2187    {
2188        LockHolder lh(streamMutex);
2189        last_seqno.store(start_seqno);
2190        pushToReadyQ(std::make_unique<StreamRequest>(vb_,
2191                                                     new_opaque,
2192                                                     flags_,
2193                                                     start_seqno,
2194                                                     end_seqno_,
2195                                                     vb_uuid_,
2196                                                     snap_start_seqno_,
2197                                                     snap_end_seqno_));
2198    }
2199    notifyStreamReady();
2200}
2201
2202ENGINE_ERROR_CODE PassiveStream::messageReceived(std::unique_ptr<DcpResponse> dcpResponse) {
2203    if (!dcpResponse) {
2204        return ENGINE_EINVAL;
2205    }
2206
2207    if (!isActive()) {
2208        return ENGINE_KEY_ENOENT;
2209    }
2210
2211    auto seqno = dcpResponse->getBySeqno();
2212    if (seqno) {
2213        if (uint64_t(*seqno) <= last_seqno.load()) {
2214            log(EXTENSION_LOG_WARNING,
2215                "(vb %d) Erroneous (out of sequence) message (%s) received, "
2216                "with opaque: %" PRIu32 ", its seqno (%" PRIu64
2217                ") is not "
2218                "greater than last received seqno (%" PRIu64
2219                "); "
2220                "Dropping mutation!",
2221                vb_,
2222                dcpResponse->to_string(),
2223                opaque_,
2224                *seqno,
2225                last_seqno.load());
2226            return ENGINE_ERANGE;
2227        }
2228        last_seqno.store(*seqno);
2229    } else if(dcpResponse->getEvent() == DcpResponse::Event::SnapshotMarker) {
2230        auto s = static_cast<SnapshotMarker*>(dcpResponse.get());
2231        uint64_t snapStart = s->getStartSeqno();
2232        uint64_t snapEnd = s->getEndSeqno();
2233        if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
2234            log(EXTENSION_LOG_WARNING,
2235                "(vb %d) Erroneous snapshot marker received, with "
2236                "opaque: %" PRIu32
2237                ", its start "
2238                "(%" PRIu64 "), and end (%" PRIu64
2239                ") are less than last "
2240                "received seqno (%" PRIu64 "); Dropping marker!",
2241                vb_,
2242                opaque_,
2243                snapStart,
2244                snapEnd,
2245                last_seqno.load());
2246            return ENGINE_ERANGE;
2247        }
2248    }
2249
2250    switch (engine->getReplicationThrottle().getStatus()) {
2251    case ReplicationThrottle::Status::Disconnect:
2252        log(EXTENSION_LOG_WARNING,
2253            "vb:%" PRIu16
2254            " Disconnecting the connection as there is "
2255            "no memory to complete replication",
2256            vb_);
2257        return ENGINE_DISCONNECT;
2258    case ReplicationThrottle::Status::Process:
2259        if (buffer.empty()) {
2260            /* Process the response here itself rather than buffering it */
2261            ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2262            switch (dcpResponse->getEvent()) {
2263            case DcpResponse::Event::Mutation:
2264                ret = processMutation(
2265                        static_cast<MutationResponse*>(dcpResponse.get()));
2266                break;
2267            case DcpResponse::Event::Deletion:
2268            case DcpResponse::Event::Expiration:
2269                ret = processDeletion(
2270                        static_cast<MutationResponse*>(dcpResponse.get()));
2271                break;
2272            case DcpResponse::Event::SnapshotMarker:
2273                processMarker(static_cast<SnapshotMarker*>(dcpResponse.get()));
2274                break;
2275            case DcpResponse::Event::SetVbucket:
2276                processSetVBucketState(
2277                        static_cast<SetVBucketState*>(dcpResponse.get()));
2278                break;
2279            case DcpResponse::Event::StreamEnd: {
2280                LockHolder lh(streamMutex);
2281                transitionState(StreamState::Dead);
2282            } break;
2283            case DcpResponse::Event::SystemEvent: {
2284                ret = processSystemEvent(
2285                        *static_cast<SystemEventMessage*>(dcpResponse.get()));
2286                break;
2287            }
2288            default:
2289                log(EXTENSION_LOG_WARNING,
2290                    "(vb %d) Unknown event:%d, opaque:%" PRIu32,
2291                    vb_,
2292                    int(dcpResponse->getEvent()),
2293                    opaque_);
2294                return ENGINE_DISCONNECT;
2295            }
2296
2297            if (ret == ENGINE_ENOMEM) {
2298                if (engine->getReplicationThrottle().doDisconnectOnNoMem()) {
2299                    log(EXTENSION_LOG_WARNING,
2300                        "vb:%" PRIu16
2301                        " Disconnecting the connection as there is no "
2302                        "memory to complete replication; process dcp "
2303                        "event returned no memory",
2304                        vb_);
2305                    return ENGINE_DISCONNECT;
2306                }
2307            }
2308
2309            if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
2310                return ret;
2311            }
2312        }
2313        break;
2314    case ReplicationThrottle::Status::Pause:
2315        /* Do nothing specific here, we buffer item for this case and
2316           other cases below */
2317        break;
2318    }
2319
2320    // Only buffer if the stream is not dead
2321    if (isActive()) {
2322        buffer.push(std::move(dcpResponse));
2323    }
2324    return ENGINE_TMPFAIL;
2325}
2326
2327process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
2328                                                             size_t batchSize) {
2329    std::unique_lock<std::mutex> lh(buffer.bufMutex);
2330    uint32_t count = 0;
2331    uint32_t message_bytes = 0;
2332    uint32_t total_bytes_processed = 0;
2333    bool failed = false, noMem = false;
2334
2335    while (count < batchSize && !buffer.messages.empty()) {
2336        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2337        /* If the stream is in dead state we should not process any remaining
2338           items in the buffer, we should rather clear them */
2339        if (!isActive()) {
2340            total_bytes_processed += clearBuffer_UNLOCKED();
2341            processed_bytes = total_bytes_processed;
2342            return all_processed;
2343        }
2344
2345        std::unique_ptr<DcpResponse> response = buffer.pop_front(lh);
2346
2347        // Release bufMutex whilst we attempt to process the message
2348        // a lock inversion exists with connManager if we hold this.
2349        lh.unlock();
2350
2351        message_bytes = response->getMessageSize();
2352
2353        switch (response->getEvent()) {
2354            case DcpResponse::Event::Mutation:
2355                ret = processMutation(static_cast<MutationResponse*>(response.get()));
2356                break;
2357            case DcpResponse::Event::Deletion:
2358            case DcpResponse::Event::Expiration:
2359                ret = processDeletion(static_cast<MutationResponse*>(response.get()));
2360                break;
2361            case DcpResponse::Event::SnapshotMarker:
2362                processMarker(static_cast<SnapshotMarker*>(response.get()));
2363                break;
2364            case DcpResponse::Event::SetVbucket:
2365                processSetVBucketState(static_cast<SetVBucketState*>(response.get()));
2366                break;
2367            case DcpResponse::Event::StreamEnd:
2368                {
2369                    LockHolder lh(streamMutex);
2370                    transitionState(StreamState::Dead);
2371                }
2372                break;
2373            case DcpResponse::Event::SystemEvent: {
2374                    ret = processSystemEvent(
2375                            *static_cast<SystemEventMessage*>(response.get()));
2376                    break;
2377                }
2378            default:
2379                log(EXTENSION_LOG_WARNING,
2380                    "PassiveStream::processBufferedMessages:"
2381                    "(vb %" PRIu16
2382                    ") PassiveStream ignoring "
2383                    "unknown message type %s",
2384                    vb_,
2385                    response->to_string());
2386                continue;
2387        }
2388
2389        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
2390            failed = true;
2391            if (ret == ENGINE_ENOMEM) {
2392                noMem = true;
2393            }
2394        }
2395
2396        // Re-acquire bufMutex so that
2397        // 1) we can update the buffer
2398        // 2) safely re-check the while conditional statement
2399        lh.lock();
2400
2401        // If we failed and the stream is not dead, stash the DcpResponse at the
2402        // front of the queue and break the loop.
2403        if (failed && isActive()) {
2404            buffer.push_front(std::move(response), lh);
2405            break;
2406        }
2407
2408        count++;
2409        if (ret != ENGINE_ERANGE) {
2410            total_bytes_processed += message_bytes;
2411        }
2412    }
2413
2414    processed_bytes = total_bytes_processed;
2415
2416    if (failed) {
2417        if (noMem && engine->getReplicationThrottle().doDisconnectOnNoMem()) {
2418            log(EXTENSION_LOG_WARNING,
2419                "vb:%" PRIu16
2420                " Processor task indicating disconnection as "
2421                "there is no memory to complete replication; process dcp "
2422                "event returned no memory ",
2423                vb_);
2424            return stop_processing;
2425        }
2426        return cannot_process;
2427    }
2428
2429    return all_processed;
2430}
2431
2432ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
2433    VBucketPtr vb = engine->getVBucket(vb_);
2434    if (!vb) {
2435        return ENGINE_NOT_MY_VBUCKET;
2436    }
2437
2438    auto consumer = consumerPtr.lock();
2439    if (!consumer) {
2440        return ENGINE_DISCONNECT;
2441    }
2442
2443    if (uint64_t(*mutation->getBySeqno()) < cur_snapshot_start.load() ||
2444        uint64_t(*mutation->getBySeqno()) > cur_snapshot_end.load()) {
2445        log(EXTENSION_LOG_WARNING,
2446            "(vb %d) Erroneous mutation [sequence "
2447            "number does not fall in the expected snapshot range : "
2448            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64
2449            ") <= "
2450            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
2451            vb_,
2452            cur_snapshot_start.load(),
2453            *mutation->getBySeqno(),
2454            cur_snapshot_end.load());
2455        return ENGINE_ERANGE;
2456    }
2457
2458    // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
2459    // receive anything without a valid CAS, however given that versions without
2460    // this check may send us "bad" CAS values, we should regenerate them (which
2461    // is better than rejecting the data entirely).
2462    if (!Item::isValidCas(mutation->getItem()->getCas())) {
2463        log(EXTENSION_LOG_WARNING,
2464            "Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
2465            ", seqno:%" PRId64 "}. Regenerating new CAS",
2466            mutation->getItem()->getCas(),
2467            vb_,
2468            mutation->getItem()->getBySeqno());
2469        mutation->getItem()->setCas();
2470    }
2471
2472    ENGINE_ERROR_CODE ret;
2473    if (vb->isBackfillPhase()) {
2474        ret = engine->getKVBucket()->addBackfillItem(
2475                *mutation->getItem(),
2476                GenerateBySeqno::No,
2477                mutation->getExtMetaData());
2478    } else {
2479        ret = engine->getKVBucket()->setWithMeta(*mutation->getItem(),
2480                                                 0,
2481                                                 NULL,
2482                                                 consumer->getCookie(),
2483                                                 {vbucket_state_active,
2484                                                  vbucket_state_replica,
2485                                                  vbucket_state_pending},
2486                                                 CheckConflicts::No,
2487                                                 true,
2488                                                 GenerateBySeqno::No,
2489                                                 GenerateCas::No,
2490                                                 mutation->getExtMetaData(),
2491                                                 true);
2492    }
2493
2494    if (ret != ENGINE_SUCCESS) {
2495        log(EXTENSION_LOG_WARNING,
2496            "vb:%" PRIu16
2497            " Got error '%s' while trying to process "
2498            "mutation with seqno:%" PRId64,
2499            vb_,
2500            cb::to_string(cb::to_engine_errc(ret)).c_str(),
2501            mutation->getItem()->getBySeqno());
2502    } else {
2503        handleSnapshotEnd(vb, *mutation->getBySeqno());
2504    }
2505
2506    return ret;
2507}
2508
2509ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
2510    VBucketPtr vb = engine->getVBucket(vb_);
2511    if (!vb) {
2512        return ENGINE_NOT_MY_VBUCKET;
2513    }
2514
2515    auto consumer = consumerPtr.lock();
2516    if (!consumer) {
2517        return ENGINE_DISCONNECT;
2518    }
2519
2520    if (uint64_t(*deletion->getBySeqno()) < cur_snapshot_start.load() ||
2521        uint64_t(*deletion->getBySeqno()) > cur_snapshot_end.load()) {
2522        log(EXTENSION_LOG_WARNING,
2523            "(vb %d) Erroneous deletion [sequence "
2524            "number does not fall in the expected snapshot range : "
2525            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64
2526            ") <= "
2527            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
2528            vb_,
2529            cur_snapshot_start.load(),
2530            *deletion->getBySeqno(),
2531            cur_snapshot_end.load());
2532        return ENGINE_ERANGE;
2533    }
2534
2535    // The deleted value has a body, send it through the mutation path so we
2536    // set the deleted item with a value
2537    if (deletion->getItem()->getNBytes()) {
2538        return processMutation(deletion);
2539    }
2540
2541    uint64_t delCas = 0;
2542    ENGINE_ERROR_CODE ret;
2543    ItemMetaData meta = deletion->getItem()->getMetaData();
2544
2545    // MB-17517: Check for the incoming item's CAS validity.
2546    if (!Item::isValidCas(meta.cas)) {
2547        log(EXTENSION_LOG_WARNING,
2548            "Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
2549            ", seqno:%" PRId64 "}. Regenerating new CAS",
2550            meta.cas,
2551            vb_,
2552            *deletion->getBySeqno());
2553        meta.cas = Item::nextCas();
2554    }
2555
2556    ret = engine->getKVBucket()->deleteWithMeta(deletion->getItem()->getKey(),
2557                                                delCas,
2558                                                nullptr,
2559                                                deletion->getVBucket(),
2560                                                consumer->getCookie(),
2561                                                {vbucket_state_active,
2562                                                 vbucket_state_replica,
2563                                                 vbucket_state_pending},
2564                                                CheckConflicts::No,
2565                                                meta,
2566                                                vb->isBackfillPhase(),
2567                                                GenerateBySeqno::No,
2568                                                GenerateCas::No,
2569                                                *deletion->getBySeqno(),
2570                                                deletion->getExtMetaData(),
2571                                                true);
2572    if (ret == ENGINE_KEY_ENOENT) {
2573        ret = ENGINE_SUCCESS;
2574    }
2575
2576    if (ret != ENGINE_SUCCESS) {
2577        log(EXTENSION_LOG_WARNING,
2578            "vb:%" PRIu16
2579            " Got error '%s' while trying to process "
2580            "deletion with seqno:%" PRId64,
2581            vb_,
2582            cb::to_string(cb::to_engine_errc(ret)).c_str(),
2583            *deletion->getBySeqno());
2584    } else {
2585        handleSnapshotEnd(vb, *deletion->getBySeqno());
2586    }
2587
2588    return ret;
2589}
2590
2591ENGINE_ERROR_CODE PassiveStream::processSystemEvent(
2592        const SystemEventMessage& event) {
2593    VBucketPtr vb = engine->getVBucket(vb_);
2594
2595    if (!vb) {
2596        return ENGINE_NOT_MY_VBUCKET;
2597    }
2598
2599    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
2600    // Depending on the event, extras is different and key may even be empty
2601    // The specific handler will know how to interpret.
2602    switch (event.getSystemEvent()) {
2603    case mcbp::systemevent::id::CreateCollection: {
2604        rv = processCreateCollection(*vb, {event});
2605        break;
2606    }
2607    case mcbp::systemevent::id::DeleteCollection: {
2608        rv = processBeginDeleteCollection(*vb, {event});
2609        break;
2610    }
2611    case mcbp::systemevent::id::CollectionsSeparatorChanged: {
2612        rv = processSeparatorChanged(*vb, {event});
2613        break;
2614    }
2615    default: {
2616        rv = ENGINE_EINVAL;
2617        break;
2618    }
2619    }
2620
2621    if (rv != ENGINE_SUCCESS) {
2622        log(EXTENSION_LOG_WARNING,
2623            "vb:%" PRIu16
2624            " Got error '%s' while trying to process "
2625            "system event",
2626            vb_,
2627            cb::to_string(cb::to_engine_errc(rv)).c_str());
2628    } else {
2629        handleSnapshotEnd(vb, *event.getBySeqno());
2630    }
2631
2632    return rv;
2633}
2634
2635ENGINE_ERROR_CODE PassiveStream::processCreateCollection(
2636        VBucket& vb, const CreateOrDeleteCollectionEvent& event) {
2637    try {
2638        vb.replicaAddCollection(event.getManifestUid(),
2639                                event.getCollection(),
2640                                event.getBySeqno());
2641    } catch (std::exception& e) {
2642        LOG(EXTENSION_LOG_WARNING,
2643            "PassiveStream::processCreateCollection exception %s",
2644            e.what());
2645        return ENGINE_EINVAL;
2646    }
2647    return ENGINE_SUCCESS;
2648}
2649
2650ENGINE_ERROR_CODE PassiveStream::processBeginDeleteCollection(
2651        VBucket& vb, const CreateOrDeleteCollectionEvent& event) {
2652    try {
2653        vb.replicaBeginDeleteCollection(event.getManifestUid(),
2654                                        event.getCollection(),
2655                                        event.getBySeqno());
2656    } catch (std::exception& e) {
2657        LOG(EXTENSION_LOG_WARNING,
2658            "PassiveStream::processBeginDeleteCollection exception %s",
2659            e.what());
2660        return ENGINE_EINVAL;
2661    }
2662    return ENGINE_SUCCESS;
2663}
2664
2665ENGINE_ERROR_CODE PassiveStream::processSeparatorChanged(
2666        VBucket& vb, const ChangeSeparatorCollectionEvent& event) {
2667    try {
2668        vb.replicaChangeCollectionSeparator(event.getManifestUid(),
2669                                            event.getSeparator(),
2670                                            event.getBySeqno());
2671    } catch (std::exception& e) {
2672        LOG(EXTENSION_LOG_WARNING,
2673            "PassiveStream::processSeparatorChanged exception %s",
2674            e.what());
2675        return ENGINE_EINVAL;
2676    }
2677    return ENGINE_SUCCESS;
2678}
2679
2680void PassiveStream::processMarker(SnapshotMarker* marker) {
2681    VBucketPtr vb = engine->getVBucket(vb_);
2682
2683    cur_snapshot_start.store(marker->getStartSeqno());
2684    cur_snapshot_end.store(marker->getEndSeqno());
2685    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ?
2686            Snapshot::Disk : Snapshot::Memory);
2687
2688    if (vb) {
2689        auto& ckptMgr = *vb->checkpointManager;
2690        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
2691            vb->setBackfillPhase(true);
2692            // calling setBackfillPhase sets the openCheckpointId to zero.
2693            ckptMgr.setBackfillPhase(cur_snapshot_start.load(),
2694                                     cur_snapshot_end.load());
2695        } else {
2696            if (marker->getFlags() & MARKER_FLAG_CHK ||
2697                vb->checkpointManager->getOpenCheckpointId() == 0) {
2698                ckptMgr.createSnapshot(cur_snapshot_start.load(),
2699                                       cur_snapshot_end.load());
2700            } else {
2701                ckptMgr.updateCurrentSnapshotEnd(cur_snapshot_end.load());
2702            }
2703            vb->setBackfillPhase(false);
2704        }
2705
2706        if (marker->getFlags() & MARKER_FLAG_ACK) {
2707            cur_snapshot_ack = true;
2708        }
2709    }
2710}
2711
2712void PassiveStream::processSetVBucketState(SetVBucketState* state) {
2713    engine->getKVBucket()->setVBucketState(vb_, state->getState(), true);
2714    {
2715        LockHolder lh (streamMutex);
2716        pushToReadyQ(std::make_unique<SetVBucketStateResponse>(opaque_,
2717                                                               ENGINE_SUCCESS));
2718    }
2719    notifyStreamReady();
2720}
2721
2722void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
2723    if (byseqno == cur_snapshot_end.load()) {
2724        auto& ckptMgr = *vb->checkpointManager;
2725
2726        if (cur_snapshot_type.load() == Snapshot::Disk &&
2727                vb->isBackfillPhase()) {
2728            vb->setBackfillPhase(false);
2729        }
2730
2731        // MB-30019: we always want to close the open checkpoint on replica
2732        // vbuckets when the Consumer receives the snapshot-end mutation
2733        ckptMgr.checkAndAddNewCheckpoint(ckptMgr.getOpenCheckpointId() + 1,
2734                                         *vb);
2735
2736        if (cur_snapshot_ack) {
2737            {
2738                LockHolder lh(streamMutex);
2739                pushToReadyQ(std::make_unique<SnapshotMarkerResponse>(
2740                        opaque_, ENGINE_SUCCESS));
2741            }
2742            notifyStreamReady();
2743            cur_snapshot_ack = false;
2744        }
2745        cur_snapshot_type.store(Snapshot::None);
2746    }
2747}
2748
2749void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
2750    Stream::addStats(add_stat, c);
2751
2752    try {
2753        const int bsize = 1024;
2754        char buf[bsize];
2755        size_t bufferItems = 0;
2756        size_t bufferBytes = 0;
2757        {
2758            std::lock_guard<std::mutex> lg(buffer.bufMutex);
2759            bufferItems = buffer.messages.size();
2760            bufferBytes = buffer.bytes;
2761        }
2762        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
2763                         vb_);
2764        add_casted_stat(buf, bufferItems, add_stat, c);
2765        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
2766                         vb_);
2767        add_casted_stat(buf, bufferBytes, add_stat, c);
2768        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
2769                         vb_);
2770        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
2771        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
2772                         name_.c_str(), vb_);
2773        add_casted_stat(buf, last_seqno.load(), add_stat, c);
2774        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
2775                         name_.c_str(), vb_);
2776        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
2777
2778        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
2779                         name_.c_str(), vb_);
2780        add_casted_stat(buf, ::to_string(cur_snapshot_type.load()),
2781                        add_stat, c);
2782
2783        if (cur_snapshot_type.load() != Snapshot::None) {
2784            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
2785                             name_.c_str(), vb_);
2786            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
2787            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
2788                             name_.c_str(), vb_);
2789            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
2790        }
2791    } catch (std::exception& error) {
2792        LOG(EXTENSION_LOG_WARNING,
2793            "PassiveStream::addStats: Failed to build stats: %s", error.what());
2794    }
2795}
2796
2797std::unique_ptr<DcpResponse> PassiveStream::next() {
2798    LockHolder lh(streamMutex);
2799
2800    if (readyQ.empty()) {
2801        itemsReady.store(false);
2802        return NULL;
2803    }
2804
2805    return popFromReadyQ();
2806}
2807
2808uint32_t PassiveStream::clearBuffer_UNLOCKED() {
2809    uint32_t unackedBytes = buffer.bytes;
2810    buffer.messages.clear();
2811    buffer.bytes = 0;
2812    return unackedBytes;
2813}
2814
2815bool PassiveStream::transitionState(StreamState newState) {
2816    log(EXTENSION_LOG_INFO,
2817        "PassiveStream::transitionState: (vb %d) "
2818        "Transitioning from %s to %s",
2819        vb_,
2820        to_string(state_.load()).c_str(),
2821        to_string(newState).c_str());
2822
2823    if (state_ == newState) {
2824        return false;
2825    }
2826
2827    bool validTransition = false;
2828    switch (state_.load()) {
2829        case StreamState::Pending:
2830            if (newState == StreamState::Reading ||
2831                    newState == StreamState::Dead) {
2832                validTransition = true;
2833            }
2834            break;
2835
2836        case StreamState::Backfilling:
2837        case StreamState::InMemory:
2838        case StreamState::TakeoverSend:
2839        case StreamState::TakeoverWait:
2840            // Not valid for passive streams
2841            break;
2842
2843        case StreamState::Reading:
2844            if (newState == StreamState::Pending ||
2845                    newState == StreamState::Dead) {
2846                validTransition = true;
2847            }
2848            break;
2849
2850        case StreamState::Dead:
2851            // Once 'dead' shouldn't transition away from it.
2852            break;
2853    }
2854
2855    if (!validTransition) {
2856        throw std::invalid_argument("PassiveStream::transitionState:"
2857                " newState (which is" + to_string(newState) +
2858                ") is not valid for current state (which is " +
2859                to_string(state_.load()) + ")");
2860    }
2861
2862    state_ = newState;
2863    return true;
2864}
2865
2866std::string PassiveStream::getEndStreamStatusStr(end_stream_status_t status) {
2867    switch (status) {
2868        case END_STREAM_OK:
2869            return "The stream closed as part of normal operation";
2870        case END_STREAM_CLOSED:
2871            return "The stream closed due to a close stream message";
2872        case END_STREAM_DISCONNECTED:
2873            return "The stream closed early because the conn was disconnected";
2874        case END_STREAM_STATE:
2875            return "The stream closed early because the vbucket state changed";
2876        default:
2877            break;
2878    }
2879    return std::string{"Status unknown: " + std::to_string(status) +
2880                       "; this should not have happened!"};
2881}
2882
2883void PassiveStream::log(EXTENSION_LOG_LEVEL severity,
2884                        const char* fmt,
2885                        ...) const {
2886    va_list va;
2887    va_start(va, fmt);
2888    auto consumer = consumerPtr.lock();
2889    if (consumer) {
2890        consumer->getLogger().vlog(severity, fmt, va);
2891    } else {
2892        static Logger defaultLogger =
2893                Logger("DCP (Consumer): **Deleted conn**");
2894        defaultLogger.vlog(severity, fmt, va);
2895    }
2896    va_end(va);
2897}
2898
2899void PassiveStream::notifyStreamReady() {
2900    auto consumer = consumerPtr.lock();
2901    if (!consumer) {
2902        return;
2903    }
2904
2905    bool inverse = false;
2906    if (itemsReady.compare_exchange_strong(inverse, true)) {
2907        consumer->notifyStreamReady(vb_);
2908    }
2909}
2910