xref: /5.5.2/kv_engine/engines/ep/src/dcp/stream.cc (revision 075614a6)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "checkpoint.h"
21#include "collections/vbucket_filter.h"
22#include "dcp/backfill-manager.h"
23#include "dcp/backfill.h"
24#include "dcp/consumer.h"
25#include "dcp/producer.h"
26#include "dcp/response.h"
27#include "dcp/stream.h"
28#include "ep_engine.h"
29#include "ep_time.h"
30#include "executorpool.h"
31#include "failover-table.h"
32#include "kv_bucket.h"
33#include "kvstore.h"
34#include "replicationthrottle.h"
35#include "statwriter.h"
36
37#include <platform/checked_snprintf.h>
38
39#include <memory>
40
41
42const char* to_string(Stream::Snapshot type) {
43    switch (type) {
44    case Stream::Snapshot::None:
45        return "none";
46    case Stream::Snapshot::Disk:
47        return "disk";
48    case Stream::Snapshot::Memory:
49        return "memory";
50    }
51    throw std::logic_error("to_string(Stream::Snapshot): called with invalid "
52            "Snapshot type:" + std::to_string(int(type)));
53}
54
55const std::string to_string(Stream::Type type) {
56    switch (type) {
57    case Stream::Type::Active:
58        return "Active";
59    case Stream::Type::Notifier:
60        return "Notifier";
61    case Stream::Type::Passive:
62        return "Passive";
63    }
64    throw std::logic_error("to_string(Stream::Type): called with invalid "
65            "type:" + std::to_string(int(type)));
66}
67
68const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
69
70Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
71               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
72               uint64_t vb_uuid, uint64_t snap_start_seqno,
73               uint64_t snap_end_seqno, Type type)
74    : name_(name),
75      flags_(flags),
76      opaque_(opaque),
77      vb_(vb),
78      start_seqno_(start_seqno),
79      end_seqno_(end_seqno),
80      vb_uuid_(vb_uuid),
81      snap_start_seqno_(snap_start_seqno),
82      snap_end_seqno_(snap_end_seqno),
83      state_(StreamState::Pending),
84      type_(type),
85      itemsReady(false),
86      readyQ_non_meta_items(0),
87      readyQueueMemory(0) {
88}
89
90Stream::~Stream() {
91    // NB: reusing the "unlocked" method without a lock because we're
92    // destructing and should not take any locks.
93    clear_UNLOCKED();
94}
95
96const std::string Stream::to_string(Stream::StreamState st) {
97    switch(st) {
98    case StreamState::Pending:
99        return "pending";
100    case StreamState::Backfilling:
101        return "backfilling";
102    case StreamState::InMemory:
103        return "in-memory";
104    case StreamState::TakeoverSend:
105        return "takeover-send";
106    case StreamState::TakeoverWait:
107        return "takeover-wait";
108    case StreamState::Reading:
109        return "reading";
110    case StreamState::Dead:
111        return "dead";
112    }
113    throw std::invalid_argument(
114        "Stream::to_string(StreamState): " + std::to_string(int(st)));
115}
116
117bool Stream::isTypeActive() const {
118    return type_ == Type::Active;
119}
120
121bool Stream::isActive() const {
122    return state_.load() != StreamState::Dead;
123}
124
125bool Stream::isBackfilling() const {
126    return state_.load() == StreamState::Backfilling;
127}
128
129bool Stream::isInMemory() const {
130    return state_.load() == StreamState::InMemory;
131}
132
133bool Stream::isPending() const {
134    return state_.load() == StreamState::Pending;
135}
136
137bool Stream::isTakeoverSend() const {
138    return state_.load() == StreamState::TakeoverSend;
139}
140
141bool Stream::isTakeoverWait() const {
142    return state_.load() == StreamState::TakeoverWait;
143}
144
145void Stream::clear_UNLOCKED() {
146    while (!readyQ.empty()) {
147        popFromReadyQ();
148    }
149}
150
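// Append a response to the stream's ready queue, accounting for the number
// of non-meta items and the approximate memory used by the queue. The
// caller is expected to hold streamMutex.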
151void Stream::pushToReadyQ(std::unique_ptr<DcpResponse> resp) {
152    /* expect streamMutex.ownsLock() == true */
153    if (resp) {
154        if (!resp->isMetaEvent()) {
155            readyQ_non_meta_items++;
156        }
157        readyQueueMemory.fetch_add(resp->getMessageSize(),
158                                   std::memory_order_relaxed);
159        readyQ.push(std::move(resp));
160    }
161}
162
163std::unique_ptr<DcpResponse> Stream::popFromReadyQ(void) {
164    /* expect streamMutex.ownsLock() == true */
165    if (!readyQ.empty()) {
166        auto front = std::move(readyQ.front());
167        readyQ.pop();
168
169        if (!front->isMetaEvent()) {
170            readyQ_non_meta_items--;
171        }
172        const uint32_t respSize = front->getMessageSize();
173
174        /* Decrement the readyQ memory usage */
175        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
176            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
177        } else {
178        LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d) "
179            "underflow, likely wrong stat calculation! curr size: %" PRIu64
180            "; new size: %" PRIu32,
181            name_.c_str(), getVBucket(),
182            readyQueueMemory.load(std::memory_order_relaxed), respSize);
183            readyQueueMemory.store(0, std::memory_order_relaxed);
184        }
185
186        return front;
187    }
188
189    return nullptr;
190}
191
192uint64_t Stream::getReadyQueueMemory() {
193    return readyQueueMemory.load(std::memory_order_relaxed);
194}
195
196void Stream::addStats(ADD_STAT add_stat, const void *c) {
197    try {
198        const int bsize = 1024;
199        char buffer[bsize];
200        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
201                         vb_);
202        add_casted_stat(buffer, flags_, add_stat, c);
203        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
204                         vb_);
205        add_casted_stat(buffer, opaque_, add_stat, c);
206        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
207                         name_.c_str(), vb_);
208        add_casted_stat(buffer, start_seqno_, add_stat, c);
209        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
210                         vb_);
211        add_casted_stat(buffer, end_seqno_, add_stat, c);
212        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
213                         vb_);
214        add_casted_stat(buffer, vb_uuid_, add_stat, c);
215        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
216                         name_.c_str(), vb_);
217        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
218        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
219                         name_.c_str(), vb_);
220        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
221        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
222                         vb_);
223        add_casted_stat(buffer, to_string(state_.load()), add_stat, c);
224    } catch (std::exception& error) {
225        LOG(EXTENSION_LOG_WARNING,
226            "Stream::addStats: Failed to build stats: %s", error.what());
227    }
228}
229
230ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
231                           std::shared_ptr<DcpProducer> p,
232                           const std::string& n,
233                           uint32_t flags,
234                           uint32_t opaque,
235                           VBucket& vbucket,
236                           uint64_t st_seqno,
237                           uint64_t en_seqno,
238                           uint64_t vb_uuid,
239                           uint64_t snap_start_seqno,
240                           uint64_t snap_end_seqno,
241                           IncludeValue includeVal,
242                           IncludeXattrs includeXattrs,
243                           IncludeDeleteTime includeDeleteTime,
244                           const Collections::Filter& filter,
245                           const Collections::VB::Manifest& manifest)
246    : Stream(n,
247             flags,
248             opaque,
249             vbucket.getId(),
250             st_seqno,
251             en_seqno,
252             vb_uuid,
253             snap_start_seqno,
254             snap_end_seqno,
255             Type::Active),
256      isBackfillTaskRunning(false),
257      pendingBackfill(false),
258      lastReadSeqno(st_seqno),
259      backfillRemaining(0),
260      lastReadSeqnoUnSnapshotted(st_seqno),
261      lastSentSeqno(st_seqno),
262      curChkSeqno(st_seqno),
263      takeoverState(vbucket_state_pending),
264      itemsFromMemoryPhase(0),
265      firstMarkerSent(false),
266      waitForSnapshot(0),
267      engine(e),
268      producerPtr(p),
269      lastSentSnapEndSeqno(0),
270      chkptItemsExtractionInProgress(false),
271      includeValue(includeVal),
272      includeXattributes(includeXattrs),
273      includeDeleteTime(includeDeleteTime),
274      snappyEnabled(p->isSnappyEnabled() ? SnappyEnabled::Yes
275                                         : SnappyEnabled::No),
276      forceValueCompression(p->isForceValueCompressionEnabled()
277                                    ? ForceValueCompression::Yes
278                                    : ForceValueCompression::No),
279      filter(filter, manifest),
280      cursorName(n + '/' + std::to_string(cursorUID.fetch_add(1))) {
281    const char* type = "";
282    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
283        type = "takeover ";
284        end_seqno_ = dcpMaxSeqno;
285    }
286
287    ReaderLockHolder rlh(vbucket.getStateLock());
288    if (vbucket.getState() == vbucket_state_replica) {
289        snapshot_info_t info = vbucket.checkpointManager->getSnapshotInfo();
290        if (info.range.end > en_seqno) {
291            end_seqno_ = info.range.end;
292        }
293    }
294
295    p->getLogger().log(
296            EXTENSION_LOG_NOTICE,
297            "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
298            " and end seqno %" PRIu64 "; requested end seqno was %" PRIu64,
299            vbucket.getId(),
300            type,
301            st_seqno,
302            end_seqno_,
303            en_seqno);
304
305    backfillItems.memory = 0;
306    backfillItems.disk = 0;
307    backfillItems.sent = 0;
308
309    bufferedBackfill.bytes = 0;
310    bufferedBackfill.items = 0;
311
312    takeoverStart = 0;
313    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
314
315    if (start_seqno_ >= end_seqno_) {
316        /* The streamMutex lock needs to be acquired because endStream
317         * may call pushToReadyQ.
318         */
319        LockHolder lh(streamMutex);
320        endStream(END_STREAM_OK);
321        itemsReady.store(true);
322        // lock is released on leaving the scope
323    }
324
325    // Finally obtain a copy of the current separator
326    currentSeparator = vbucket.getManifest().lock().getSeparator();
327}
328
329ActiveStream::~ActiveStream() {
330    if (state_ != StreamState::Dead) {
331        removeCheckpointCursor();
332    }
333}
334
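// Produce the next response to send on this stream, dispatching to the
// handler for the current stream state. itemsReady is updated to reflect
// whether a response was actually produced.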
335std::unique_ptr<DcpResponse> ActiveStream::next() {
336    std::lock_guard<std::mutex> lh(streamMutex);
337    return next(lh);
338}
339
340std::unique_ptr<DcpResponse> ActiveStream::next(
341        std::lock_guard<std::mutex>& lh) {
342    std::unique_ptr<DcpResponse> response;
343
344    switch (state_.load()) {
345        case StreamState::Pending:
346            break;
347        case StreamState::Backfilling:
348            response = backfillPhase(lh);
349            break;
350        case StreamState::InMemory:
351            response = inMemoryPhase();
352            break;
353        case StreamState::TakeoverSend:
354            response = takeoverSendPhase();
355            break;
356        case StreamState::TakeoverWait:
357            response = takeoverWaitPhase();
358            break;
359        case StreamState::Reading:
360            // Not valid for an active stream.
361            {
362                auto producer = producerPtr.lock();
363                std::string connHeader =
364                        producer ? producer->logHeader()
365                                 : "DCP (Producer): **Deleted conn**";
366                throw std::logic_error(
367                        "ActiveStream::next: Invalid state "
368                        "StreamReading for stream " +
369                        connHeader + " vb " + std::to_string(vb_));
370            }
371            break;
372        case StreamState::Dead:
373            response = deadPhase();
374            break;
375    }
376
377    itemsReady.store(response ? true : false);
378    return response;
379}
380
381void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
382                                  uint64_t lastProcessedSeqno) {
383    try {
384        CursorRegResult result = chkptmgr.registerCursorBySeqno(
385                cursorName, lastProcessedSeqno, MustSendCheckpointEnd::NO);
386        cursorRegistered = true;
387
388        /*
389         * MB-22960:  Due to cursor dropping we re-register the replication
390         * cursor only during backfill when we mark the disk snapshot.  However
391         * by this point it is possible that the CheckpointManager no longer
392         * contains the next sequence number the replication stream requires
393         * (i.e. next one after the backfill seqnos).
394         *
395         * To avoid this data loss when we register the cursor we check to see
396         * if the result is greater than the lastProcessedSeqno + 1.
397         * If so we know we may have missed some items and may need to perform
398         * another backfill.
399         *
400         * We actually only need to do another backfill if the result is
401         * greater than lastProcessedSeqno + 1 and the second element of the
402         * CursorRegResult returned by registerCursorBySeqno is true,
403         * indicating that the resulting seqno is the first item of a
404         * checkpoint.
405         */
405        const uint64_t nextRequiredSeqno = lastProcessedSeqno + 1;
406        if (result.first > nextRequiredSeqno && result.second) {
407            pendingBackfill = true;
408        }
409        curChkSeqno = result.first;
410    } catch(std::exception& error) {
411        log(EXTENSION_LOG_WARNING,
412            "(vb %" PRIu16 ") Failed to register cursor: %s",
413            vb_,
414            error.what());
415        endStream(END_STREAM_STATE);
416    }
417}
418
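// Mark the start of a disk (backfill) snapshot: queue a SnapshotMarker
// covering [startSeqno, endSeqno] (the end may be extended for replica
// vbuckets with an incomplete in-memory snapshot) and, unless the stream is
// disk-only, re-register the checkpoint cursor so that in-memory snapshots
// can follow.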
419void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
420    {
421        LockHolder lh(streamMutex);
422        uint64_t chkCursorSeqno = endSeqno;
423
424        if (!isBackfilling()) {
425            log(EXTENSION_LOG_WARNING,
426                "(vb %" PRIu16
427                ") ActiveStream::"
428                "markDiskSnapshot: Unexpected state_:%s",
429                vb_,
430                to_string(state_.load()).c_str());
431            return;
432        }
433
434        /* We need to send the requested 'snap_start_seqno_' as the snapshot
435           start when we are sending the first snapshot because the first
436           snapshot could be a resumption of a previous snapshot */
437        if (!firstMarkerSent) {
438            startSeqno = std::min(snap_start_seqno_, startSeqno);
439            firstMarkerSent = true;
440        }
441
442        VBucketPtr vb = engine->getVBucket(vb_);
443        if (!vb) {
444            log(EXTENSION_LOG_WARNING,
445                "(vb %" PRIu16
446                ") "
447                "ActiveStream::markDiskSnapshot, vbucket "
448                "does not exist",
449                vb_);
450            return;
451        }
452        // An atomic read of vbucket state without acquiring the
453        // reader lock for state should suffice here.
454        if (vb->getState() == vbucket_state_replica) {
455            if (end_seqno_ > endSeqno) {
456                /* We possibly have items in the open checkpoint
457                   (incomplete snapshot) */
458                snapshot_info_t info = vb->checkpointManager->getSnapshotInfo();
459                log(EXTENSION_LOG_NOTICE,
460                    "(vb %" PRIu16
461                    ") Merging backfill and memory snapshot for a "
462                    "replica vbucket, backfill start seqno %" PRIu64
463                    ", "
464                    "backfill end seqno %" PRIu64
465                    ", "
466                    "snapshot end seqno after merge %" PRIu64,
467                    vb_,
468                    startSeqno,
469                    endSeqno,
470                    info.range.end);
471                endSeqno = info.range.end;
472            }
473        }
474
475        log(EXTENSION_LOG_NOTICE,
476            "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
477            " and end seqno %" PRIu64,
478            vb_,
479            startSeqno,
480            endSeqno);
481        pushToReadyQ(std::make_unique<SnapshotMarker>(
482                opaque_, vb_, startSeqno, endSeqno, MARKER_FLAG_DISK));
483        lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
484
485        if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
486            // Only re-register the cursor if we still need to get memory
487            // snapshots
488            registerCursor(*vb->checkpointManager, chkCursorSeqno);
489        }
490    }
491    notifyStreamReady();
492}
493
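// Handle an item read by backfill. Items that should be replicated and pass
// the collections filter are converted into DCP responses, accounted
// against the backfill manager's buffer (returning false if the buffer is
// full and 'force' is not set) and pushed onto the ready queue.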
494bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
495                                    backfill_source_t backfill_source,
496                                    bool force) {
497    if (!itm) {
498        return false;
499    }
500
501    if (itm->shouldReplicate()) {
502        std::unique_lock<std::mutex> lh(streamMutex);
503        if (isBackfilling() && filter.checkAndUpdate(*itm)) {
504            queued_item qi(std::move(itm));
505            std::unique_ptr<DcpResponse> resp(makeResponseFromItem(qi));
506            auto producer = producerPtr.lock();
507            if (!producer ||
508                !producer->recordBackfillManagerBytesRead(
509                        resp->getApproximateSize(), force)) {
510                // Deleting resp may also delete itm (which is owned by
511                // resp)
512                resp.reset();
513                return false;
514            }
515
516            bufferedBackfill.bytes.fetch_add(resp->getApproximateSize());
517            bufferedBackfill.items++;
518            lastReadSeqno.store(uint64_t(*resp->getBySeqno()));
519
520            pushToReadyQ(std::move(resp));
521
522            lh.unlock();
523            notifyStreamReady();
524
525            if (backfill_source == BACKFILL_FROM_MEMORY) {
526                backfillItems.memory++;
527            } else {
528                backfillItems.disk++;
529            }
530        }
531    }
532
533    return true;
534}
535
536void ActiveStream::completeBackfill() {
537    {
538        LockHolder lh(streamMutex);
539        if (isBackfilling()) {
540            log(EXTENSION_LOG_NOTICE,
541                "(vb %" PRIu16 ") Backfill complete, %" PRIu64
542                " items "
543                "read from disk, %" PRIu64
544                " from memory, last seqno read: "
545                "%" PRIu64 ", pendingBackfill : %s",
546                vb_,
547                uint64_t(backfillItems.disk.load()),
548                uint64_t(backfillItems.memory.load()),
549                lastReadSeqno.load(),
550                pendingBackfill ? "True" : "False");
551        } else {
552            log(EXTENSION_LOG_WARNING,
553                "(vb %" PRIu16
554                ") ActiveStream::completeBackfill: "
555                "Unexpected state_:%s",
556                vb_,
557                to_string(state_.load()).c_str());
558        }
559    }
560
561    bool inverse = true;
562    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
563    notifyStreamReady();
564}
565
566void ActiveStream::snapshotMarkerAckReceived() {
567    if (--waitForSnapshot == 0) {
568        notifyStreamReady();
569    }
570}
571
572void ActiveStream::setVBucketStateAckRecieved() {
573    VBucketPtr vbucket = engine->getVBucket(vb_);
574    if (!vbucket) {
575        log(EXTENSION_LOG_WARNING,
576            "(vb %" PRIu16
577            ") not present during ack for set "
578            "vbucket during takeover",
579            vb_);
580        return;
581    }
582
583    {
584        /* The order in which the three locks below are acquired is important
585           to avoid any potential lock-inversion problems */
586        std::unique_lock<std::mutex> epVbSetLh(engine->getKVBucket()->getVbSetMutexLock());
587        WriterLockHolder vbStateLh(vbucket->getStateLock());
588        std::unique_lock<std::mutex> lh(streamMutex);
589        if (isTakeoverWait()) {
590            if (takeoverState == vbucket_state_pending) {
591                log(EXTENSION_LOG_INFO,
592                    "(vb %" PRIu16
593                    ") Received ack for set vbucket state to "
594                    "pending message",
595                    vb_);
596
597                takeoverState = vbucket_state_active;
598                transitionState(StreamState::TakeoverSend);
599
600                engine->getKVBucket()->setVBucketState_UNLOCKED(
601                                                        vb_,
602                                                        vbucket_state_dead,
603                                                        false /* transfer */,
604                                                        false /* notify_dcp */,
605                                                        epVbSetLh,
606                                                        &vbStateLh);
607
608                log(EXTENSION_LOG_NOTICE,
609                    "(vb %" PRIu16
610                    ") Vbucket marked as dead, last sent "
611                    "seqno: %" PRIu64 ", high seqno: %" PRIu64,
612                    vb_,
613                    lastSentSeqno.load(),
614                    vbucket->getHighSeqno());
615            } else {
616                log(EXTENSION_LOG_NOTICE,
617                    "(vb %" PRIu16
618                    ") Received ack for set vbucket state to "
619                    "active message",
620                    vb_);
621                endStream(END_STREAM_OK);
622            }
623        } else {
624            log(EXTENSION_LOG_WARNING,
625                "(vb %" PRIu16
626                ") Unexpected ack for set vbucket op on "
627                "stream '%s' state '%s'",
628                vb_,
629                name_.c_str(),
630                to_string(state_.load()).c_str());
631            return;
632        }
633    }
634
635    notifyStreamReady();
636}
637
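// Next-response handler for the Backfilling state: drain the ready queue
// (updating the backfill buffer/remaining accounting) and, once the
// backfill task has finished, either schedule a pending backfill or
// transition to takeover-send, in-memory streaming, or end of stream.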
638std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
639        std::lock_guard<std::mutex>& lh) {
640    auto resp = nextQueuedItem();
641
642    if (resp) {
643        /* It is OK to call recordBackfillManagerBytesSent() and
644           bufferedBackfill.bytes.fetch_sub() for all events because
645           resp->getApproximateSize() is non-zero only for certain resp types.
646           (MB-24905 is open to make the accounting cleaner) */
647        auto producer = producerPtr.lock();
648        if (!producer) {
649            throw std::logic_error("(vb:" + std::to_string(vb_) +
650                                   ") Producer reference null in the stream in "
651                                   "backfillPhase(). This should not happen as "
652                                   "the function is called from the producer "
653                                   "object");
654        }
655
656        producer->recordBackfillManagerBytesSent(resp->getApproximateSize());
657        bufferedBackfill.bytes.fetch_sub(resp->getApproximateSize());
658        if (!resp->isMetaEvent() || resp->isSystemEvent()) {
659            bufferedBackfill.items--;
660        }
661
662        // Only DcpResponse objects representing items from "disk" have a size,
663        // so only update backfillRemaining when the size is non-zero
664        if (resp->getApproximateSize()) {
665            if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
666                backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
667            }
668        }
669    }
670
671    if (!isBackfillTaskRunning && readyQ.empty()) {
672        // Given readyQ.empty() is true, resp will be null here
673        backfillRemaining.store(0, std::memory_order_relaxed);
674        // The previous backfill has completed.  Check to see if another
675        // backfill needs to be scheduled.
676        if (pendingBackfill) {
677            scheduleBackfill_UNLOCKED(true);
678            pendingBackfill = false;
679        } else {
680            if (lastReadSeqno.load() >= end_seqno_) {
681                endStream(END_STREAM_OK);
682            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
683                transitionState(StreamState::TakeoverSend);
684            } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
685                endStream(END_STREAM_OK);
686            } else {
687                transitionState(StreamState::InMemory);
688            }
689
690            if (!resp) {
691                resp = nextQueuedItem();
692            }
693        }
694    }
695
696    return resp;
697}
698
699std::unique_ptr<DcpResponse> ActiveStream::inMemoryPhase() {
700    if (lastSentSeqno.load() >= end_seqno_) {
701        endStream(END_STREAM_OK);
702    } else if (readyQ.empty()) {
703        if (pendingBackfill) {
704            // Moving the state from InMemory to Backfilling will result in a
705            // backfill being scheduled
706            transitionState(StreamState::Backfilling);
707            pendingBackfill = false;
708            return NULL;
709        } else if (nextCheckpointItem()) {
710            return NULL;
711        }
712    }
713    return nextQueuedItem();
714}
715
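// Next-response handler for the TakeoverSend state. If takeover has been
// running for longer than takeoverSendMaxTime, the vbucket is flagged as
// takeover backed-up. Once the ready queue is drained, no checkpoint items
// remain and all snapshot markers have been acked, a SetVBucketState
// message is sent and the stream moves to TakeoverWait.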
716std::unique_ptr<DcpResponse> ActiveStream::takeoverSendPhase() {
717    VBucketPtr vb = engine->getVBucket(vb_);
718    if (vb && takeoverStart != 0 &&
719        !vb->isTakeoverBackedUp() &&
720        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
721        vb->setTakeoverBackedUpState(true);
722    }
723
724    if (!readyQ.empty()) {
725        return nextQueuedItem();
726    } else {
727        if (nextCheckpointItem()) {
728            return NULL;
729        }
730    }
731
732    if (waitForSnapshot != 0) {
733        return NULL;
734    }
735
736    if (vb) {
737        vb->setTakeoverBackedUpState(false);
738        takeoverStart = 0;
739    }
740
741    auto producer = producerPtr.lock();
742    if (producer) {
743        if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
744            transitionState(StreamState::TakeoverWait);
745            return std::make_unique<SetVBucketState>(
746                    opaque_, vb_, takeoverState);
747        }
748    }
749    return nullptr;
750}
751
752std::unique_ptr<DcpResponse> ActiveStream::takeoverWaitPhase() {
753    return nextQueuedItem();
754}
755
756std::unique_ptr<DcpResponse> ActiveStream::deadPhase() {
757    auto resp = nextQueuedItem();
758    if (!resp) {
759        log(EXTENSION_LOG_NOTICE,
760            "(vb %" PRIu16
761            ") Stream closed, "
762            "%" PRIu64
763            " items sent from backfill phase, "
764            "%" PRIu64
765            " items sent from memory phase, "
766            "%" PRIu64 " was last seqno sent",
767            vb_,
768            uint64_t(backfillItems.sent.load()),
769            uint64_t(itemsFromMemoryPhase.load()),
770            lastSentSeqno.load());
771    }
772    return resp;
773}
774
775bool ActiveStream::isCompressionEnabled() {
776    auto producer = producerPtr.lock();
777    if (producer) {
778        return producer->isCompressionEnabled();
779    }
780    /* If the 'producer' is deleted, what we return doesn't matter */
781    return false;
782}
783
784void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
785    Stream::addStats(add_stat, c);
786
787    try {
788        const int bsize = 1024;
789        char buffer[bsize];
790        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
791                         name_.c_str(), vb_);
792        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
793        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
794                         name_.c_str(), vb_);
795        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
796        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
797                         name_.c_str(), vb_);
798        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
799        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
800                         name_.c_str(), vb_);
801        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
802        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
803                         name_.c_str(), vb_);
804        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
805        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
806                         name_.c_str(), vb_);
807        add_casted_stat(buffer,
808                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
809                        add_stat, c);
810        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
811                         name_.c_str(), vb_);
812        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
813        checked_snprintf(buffer,
814                         bsize,
815                         "%s:stream_%d_last_read_seqno_unsnapshotted",
816                         name_.c_str(),
817                         vb_);
818        add_casted_stat(buffer, lastReadSeqnoUnSnapshotted.load(), add_stat, c);
819        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
820                         name_.c_str(), vb_);
821        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
822        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
823                         name_.c_str(), vb_);
824        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
825                        c);
826        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
827                         name_.c_str(), vb_);
828        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
829        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
830                         name_.c_str(), vb_);
831        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
832
833        if (isTakeoverSend() && takeoverStart != 0) {
834            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
835                             name_.c_str(), vb_);
836            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
837                            c);
838        }
839    } catch (std::exception& error) {
840        LOG(EXTENSION_LOG_WARNING,
841            "ActiveStream::addStats: Failed to build stats: %s", error.what());
842    }
843
844    filter.addStats(add_stat, c, name_, vb_);
845}
846
847void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
848                                    const VBucket& vb) {
849    LockHolder lh(streamMutex);
850
851    add_casted_stat("name", name_, add_stat, cookie);
852    if (!isActive()) {
853        log(EXTENSION_LOG_WARNING,
854            "(vb %" PRIu16
855            ") "
856            "ActiveStream::addTakeoverStats: Stream has "
857            "status StreamDead",
858            vb_);
859        // Return status of does_not_exist to ensure rebalance does not hang.
860        add_casted_stat("status", "does_not_exist", add_stat, cookie);
861        add_casted_stat("estimate", 0, add_stat, cookie);
862        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
863        return;
864    }
865
866    size_t total = backfillRemaining.load(std::memory_order_relaxed);
867    if (isBackfilling()) {
868        add_casted_stat("status", "backfilling", add_stat, cookie);
869    } else {
870        add_casted_stat("status", "in-memory", add_stat, cookie);
871    }
872    add_casted_stat("backfillRemaining",
873                    backfillRemaining.load(std::memory_order_relaxed),
874                    add_stat, cookie);
875
876    size_t vb_items = vb.getNumItems();
877    size_t chk_items = 0;
878    if (vb_items > 0 && cursorRegistered) {
879        chk_items = vb.checkpointManager->getNumItemsForCursor(cursorName);
880    }
881
882    size_t del_items = 0;
883    try {
884        del_items = engine->getKVBucket()->getNumPersistedDeletes(vb_);
885    } catch (std::runtime_error& e) {
886        log(EXTENSION_LOG_WARNING,
887            "ActiveStream:addTakeoverStats: exception while getting num "
888            "persisted "
889            "deletes for vbucket:%" PRIu16
890            " - treating as 0 deletes. "
891            "Details: %s",
892            vb_,
893            e.what());
894    }
895
896    if (end_seqno_ < curChkSeqno) {
897        chk_items = 0;
898    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
899        chk_items = end_seqno_ - curChkSeqno + 1;
900    }
901    total += chk_items;
902
903    add_casted_stat("estimate", total, add_stat, cookie);
904    add_casted_stat("chk_items", chk_items, add_stat, cookie);
905    add_casted_stat("vb_items", vb_items, add_stat, cookie);
906    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
907}
908
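// Pop the next response from the ready queue, provided the producer's
// buffer log can accommodate it. Updates lastSentSeqno and the per-phase
// sent-item counters, and passes the response to processSystemEvent()
// before it is returned.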
909std::unique_ptr<DcpResponse> ActiveStream::nextQueuedItem() {
910    if (!readyQ.empty()) {
911        auto& response = readyQ.front();
912        auto producer = producerPtr.lock();
913        if (!producer) {
914            return nullptr;
915        }
916        if (producer->bufferLogInsert(response->getMessageSize())) {
917            auto seqno = response->getBySeqno();
918            if (seqno) {
919                lastSentSeqno.store(*seqno);
920
921                if (isBackfilling()) {
922                    backfillItems.sent++;
923                } else {
924                    itemsFromMemoryPhase++;
925                }
926            }
927
928            // See if the response is a system-event
929            processSystemEvent(response.get());
930            return popFromReadyQ();
931        }
932    }
933    return nullptr;
934}
935
936bool ActiveStream::nextCheckpointItem() {
937    VBucketPtr vbucket = engine->getVBucket(vb_);
938    if (vbucket &&
939        vbucket->checkpointManager->getNumItemsForCursor(cursorName) > 0) {
940        // schedule this stream to build the next checkpoint
941        auto producer = producerPtr.lock();
942        if (!producer) {
943            return false;
944        }
945        producer->scheduleCheckpointProcessorTask(shared_from_this());
946        return true;
947    } else if (chkptItemsExtractionInProgress) {
948        return true;
949    }
950    return false;
951}
952
953ActiveStreamCheckpointProcessorTask::ActiveStreamCheckpointProcessorTask(
954        EventuallyPersistentEngine& e, std::shared_ptr<DcpProducer> p)
955    : GlobalTask(
956              &e, TaskId::ActiveStreamCheckpointProcessorTask, INT_MAX, false),
957      description("Process checkpoint(s) for DCP producer " + p->getName()),
958      notified(false),
959      iterationsBeforeYield(
960              e.getConfiguration().getDcpProducerSnapshotMarkerYieldLimit()),
961      producerPtr(p) {
962}
963
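// Task body: snooze indefinitely by default, then process the queued
// streams' outstanding checkpoint items, yielding after
// iterationsBeforeYield streams. If the task was notified again while
// running, or streams remain queued, it wakes itself up once more.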
964bool ActiveStreamCheckpointProcessorTask::run() {
965    if (engine->getEpStats().isShutdown) {
966        return false;
967    }
968
969    // Set up to sleep forever once processing is done.
970    snooze(INT_MAX);
971
972    // Clear the notification flag
973    notified.store(false);
974
975    size_t iterations = 0;
976    do {
977        std::shared_ptr<ActiveStream> stream = queuePop();
978
979        if (stream) {
980            stream->nextCheckpointItemTask();
981        } else {
982            break;
983        }
984        iterations++;
985    } while(!queueEmpty()
986            && iterations < iterationsBeforeYield);
987
988    // Now check if we were re-notified or there are still checkpoints
989    bool expected = true;
990    if (notified.compare_exchange_strong(expected, false)
991        || !queueEmpty()) {
992        // wakeUp, essentially yielding and allowing other tasks a go
993        wakeUp();
994    }
995
996    return true;
997}
998
999void ActiveStreamCheckpointProcessorTask::wakeup() {
1000    ExecutorPool::get()->wake(getId());
1001}
1002
1003void ActiveStreamCheckpointProcessorTask::schedule(
1004        std::shared_ptr<ActiveStream> stream) {
1005    pushUnique(stream->getVBucket());
1006
1007    bool expected = false;
1008    if (notified.compare_exchange_strong(expected, true)) {
1009        wakeup();
1010    }
1011}
1012
1013void ActiveStreamCheckpointProcessorTask::cancelTask() {
1014    LockHolder lh(workQueueLock);
1015    while (!queue.empty()) {
1016        queue.pop();
1017    }
1018    queuedVbuckets.clear();
1019}
1020
1021void ActiveStreamCheckpointProcessorTask::addStats(const std::string& name,
1022                                                   ADD_STAT add_stat,
1023                                                   const void* c) const {
1024    // Take a copy of the queue data under lock; then format it to stats.
1025    std::queue<VBucket::id_type> qCopy;
1026    std::unordered_set<VBucket::id_type> qMapCopy;
1027    {
1028        LockHolder lh(workQueueLock);
1029        qCopy = queue;
1030        qMapCopy = queuedVbuckets;
1031    }
1032
1033    auto prefix = name + ":ckpt_processor_";
1034    add_casted_stat((prefix + "queue_size").c_str(), qCopy.size(), add_stat, c);
1035    add_casted_stat(
1036            (prefix + "queue_map_size").c_str(), qMapCopy.size(), add_stat, c);
1037
1038    // Form a comma-separated string of the queue's contents.
1039    std::string contents;
1040    while (!qCopy.empty()) {
1041        contents += std::to_string(qCopy.front()) + ",";
1042        qCopy.pop();
1043    }
1044    if (!contents.empty()) {
1045        contents.pop_back();
1046    }
1047    add_casted_stat(
1048            (prefix + "queue_contents").c_str(), contents.c_str(), add_stat, c);
1049
1050    // Form a comma-separated string of the queue map's contents.
1051    std::string qMapContents;
1052    for (auto& vbid : qMapCopy) {
1053        qMapContents += std::to_string(vbid) + ",";
1054    }
1055    if (!qMapContents.empty()) {
1056        qMapContents.pop_back();
1057    }
1058    add_casted_stat((prefix + "queue_map_contents").c_str(),
1059                    qMapContents.c_str(),
1060                    add_stat,
1061                    c);
1062
1063    add_casted_stat((prefix + "notified").c_str(), notified, add_stat, c);
1064}
1065
1066void ActiveStream::nextCheckpointItemTask() {
1067    // MB-29369: Obtain stream mutex here
1068    LockHolder lh(streamMutex);
1069    nextCheckpointItemTask(lh);
1070}
1071
1072void ActiveStream::nextCheckpointItemTask(const LockHolder& streamMutex) {
1073    VBucketPtr vbucket = engine->getVBucket(vb_);
1074    if (vbucket) {
1075        // MB-29369: only run the task's work if the stream is in an in-memory
1076        // phase (of which takeover is a variant).
1077        if (isInMemory() || isTakeoverSend()) {
1078            auto items = getOutstandingItems(*vbucket);
1079            processItems(items, streamMutex);
1080        }
1081    } else {
1082        /* The entity deleting the vbucket must set the stream to dead;
1083           calling setDead(END_STREAM_STATE) here would deadlock because it
1084           would try to grab streamMutex, which is already acquired at this
1085           point */
1086        return;
1087    }
1088}
1089
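// Extract all items currently available to this stream's checkpoint cursor,
// timing the extraction and waking the checkpoint remover if closed
// checkpoints became removable. Sets the extraction-in-progress guard,
// which is cleared again in processItems().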
1090std::vector<queued_item> ActiveStream::getOutstandingItems(VBucket& vb) {
1091    std::vector<queued_item> items;
1092    // Commencing item processing - set guard flag.
1093    chkptItemsExtractionInProgress.store(true);
1094
1095    auto _begin_ = ProcessClock::now();
1096    vb.checkpointManager->getAllItemsForCursor(cursorName, items);
1097    engine->getEpStats().dcpCursorsGetItemsHisto.add(
1098            std::chrono::duration_cast<std::chrono::microseconds>(
1099                    ProcessClock::now() - _begin_));
1100
1101    if (vb.checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
1102        engine->getKVBucket()->wakeUpCheckpointRemover();
1103    }
1104    return items;
1105}
1106
1107/**
1108 * This function is used to find out if a given item's value
1109 * needs to be changed
1110 */
1111static bool shouldModifyItem(const queued_item& item,
1112                             IncludeValue includeValue,
1113                             IncludeXattrs includeXattrs,
1114                             bool isForceValueCompressionEnabled,
1115                             bool isSnappyEnabled) {
1116    // If there is no value, no modification needs to be done
1117    if (item->getValue()) {
1118        /**
1119         * If the value should not be included, the item needs modification
1120         */
1121        if (includeValue == IncludeValue::No) {
1122            return true;
1123        }
1124
1125        /**
1126         * Check if the value needs to be compressed or decompressed.
1127         * If so, then the value definitely needs modification.
1128         */
1129        if (isSnappyEnabled) {
1130            if (isForceValueCompressionEnabled) {
1131                if (!mcbp::datatype::is_snappy(item->getDataType())) {
1132                    return true;
1133                }
1134            }
1135        } else {
1136            if (mcbp::datatype::is_snappy(item->getDataType())) {
1137                return true;
1138            }
1139        }
1140
1141        /**
1142         * If the value doesn't have to be (de)compressed, check
1143         * whether xattrs need to be pruned. If not, the value
1144         * needs no modification
1145         */
1146        if (includeXattrs == IncludeXattrs::No &&
1147            mcbp::datatype::is_xattr(item->getDataType())) {
1148            return true;
1149        }
1150    }
1151
1152    return false;
1153}
1154
1155std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
1156        const queued_item& item) {
1157    // Note: This function is hot - it is called for every item to be
1158    // sent over the DCP connection.
1159    if (item->getOperation() != queue_op::system_event) {
1160        auto cKey = Collections::DocKey::make(item->getKey(), currentSeparator);
1161        if (shouldModifyItem(item, includeValue, includeXattributes,
1162                             isForceValueCompressionEnabled(),
1163                             isSnappyEnabled())) {
1164            auto finalItem = std::make_unique<Item>(*item);
1165            finalItem->pruneValueAndOrXattrs(includeValue, includeXattributes);
1166
1167            if (isSnappyEnabled()) {
1168                if (isForceValueCompressionEnabled()) {
1169                    if (!mcbp::datatype::is_snappy(finalItem->getDataType())) {
1170                        if (!finalItem->compressValue()) {
1171                            LOG(EXTENSION_LOG_WARNING,
1172                                "Failed to snappy compress an uncompressed value");
1173                        }
1174                    }
1175                }
1176            } else {
1177                if (mcbp::datatype::is_snappy(finalItem->getDataType())) {
1178                    if (!finalItem->decompressValue()) {
1179                        LOG(EXTENSION_LOG_WARNING,
1180                            "Failed to snappy uncompress a compressed value");
1181                    }
1182                }
1183            }
1184
1185            /**
1186             * Create a mutation response to be placed in the ready queue.
1187             */
1188            return std::make_unique<MutationProducerResponse>(
1189                    std::move(finalItem),
1190                    opaque_,
1191                    includeValue,
1192                    includeXattributes,
1193                    includeDeleteTime,
1194                    cKey.getCollectionLen());
1195        }
1196
1197        // Item unmodified - construct response from original.
1198        return std::make_unique<MutationProducerResponse>(
1199                item,
1200                opaque_,
1201                includeValue,
1202                includeXattributes,
1203                includeDeleteTime,
1204                cKey.getCollectionLen());
1205    }
1206    return SystemEventProducerMessage::make(opaque_, item);
1207}
1208
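// Convert the items pulled from the CheckpointManager into DCP messages:
// items are grouped into snapshots (a checkpoint_start begins a new
// snapshot), passed through the collections filter and pushed onto the
// ready queue. Afterwards the stream is ended if the filter has become
// empty, the extraction guard is cleared and the producer is notified.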
1209void ActiveStream::processItems(std::vector<queued_item>& items,
1210                                const LockHolder& streamMutex) {
1211    if (!items.empty()) {
1212        bool mark = false;
1213        if (items.front()->getOperation() == queue_op::checkpoint_start) {
1214            mark = true;
1215        }
1216
1217        std::deque<std::unique_ptr<DcpResponse>> mutations;
1218        for (auto& qi : items) {
1219            if (SystemEventReplicate::process(*qi) == ProcessStatus::Continue) {
1220                curChkSeqno = qi->getBySeqno();
1221                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
1222                // Check if the item is allowed on the stream, note the filter
1223                // updates itself for collection deletion events
1224                if (filter.checkAndUpdate(*qi)) {
1225                    mutations.push_back(makeResponseFromItem(qi));
1226                }
1227
1228            } else if (qi->getOperation() == queue_op::checkpoint_start) {
1229                /* if there are already other mutations, then they belong to the
1230                   previous checkpoint and hence we must create a snapshot and
1231                   put them onto readyQ */
1232                if (!mutations.empty()) {
1233                    snapshot(mutations, mark);
1234                    /* clear out all the mutations since they are already put
1235                       onto the readyQ */
1236                    mutations.clear();
1237                }
1238                /* mark true as it indicates a new checkpoint snapshot */
1239                mark = true;
1240            }
1241        }
1242
1243        if (mutations.empty()) {
1244            // If we only got checkpoint start or ends check to see if there are
1245            // any more snapshots before pausing the stream.
1246            nextCheckpointItemTask(streamMutex);
1247        } else {
1248            snapshot(mutations, mark);
1249        }
1250    }
1251
1252    // After the snapshot has been processed, check if the filter is now empty;
1253    // a stream with an empty filter does nothing but self-close
1254    if (filter.empty()) {
1255        // Filter is now empty, so end the stream
1256        endStream(END_STREAM_FILTER_EMPTY);
1257    }
1258
1259    // Completed item processing - clear guard flag and notify producer.
1260    chkptItemsExtractionInProgress.store(false);
1261    notifyStreamReady(true);
1262}
1263
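// Push a batch of in-memory items onto the ready queue. If the previous
// snapshot has been completely sent, a new memory snapshot marker is
// emitted first, with the checkpoint and/or ack flags set as appropriate.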
1264void ActiveStream::snapshot(std::deque<std::unique_ptr<DcpResponse>>& items,
1265                            bool mark) {
1266    if (items.empty()) {
1267        return;
1268    }
1269
1270    /* This assumes that all items in the "items" deque are put onto readyQ */
1271    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
1272
1273    if (isCurrentSnapshotCompleted()) {
1274        uint32_t flags = MARKER_FLAG_MEMORY;
1275
1276        // Get the OptionalSeqnos, which should have values for these item types
1277        auto seqnoStart = items.front()->getBySeqno();
1278        auto seqnoEnd = items.back()->getBySeqno();
1279        if (!seqnoStart || !seqnoEnd) {
1280            throw std::logic_error(
1281                    "ActiveStream::snapshot incorrect DcpEvent, missing a "
1282                    "seqno " +
1283                    std::string(items.front()->to_string()) + " " +
1284                    std::string(items.back()->to_string()));
1285        }
1286
1287        uint64_t snapStart = *seqnoStart;
1288        uint64_t snapEnd = *seqnoEnd;
1289
1290        if (mark) {
1291            flags |= MARKER_FLAG_CHK;
1292        }
1293
1294        if (isTakeoverSend()) {
1295            waitForSnapshot++;
1296            flags |= MARKER_FLAG_ACK;
1297        }
1298
1299        /* We need to send the requested 'snap_start_seqno_' as the snapshot
1300           start when we are sending the first snapshot because the first
1301           snapshot could be a resumption of a previous snapshot */
1302        if (!firstMarkerSent) {
1303            snapStart = std::min(snap_start_seqno_, snapStart);
1304            firstMarkerSent = true;
1305        }
1306        pushToReadyQ(std::make_unique<SnapshotMarker>(
1307                opaque_, vb_, snapStart, snapEnd, flags));
1308        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
1309    }
1310
1311    for (auto& item : items) {
1312        pushToReadyQ(std::move(item));
1313    }
1314}
1315
1316uint32_t ActiveStream::setDead(end_stream_status_t status) {
1317    {
1318        LockHolder lh(streamMutex);
1319        endStream(status);
1320    }
1321
1322    if (status != END_STREAM_DISCONNECTED) {
1323        notifyStreamReady();
1324    }
1325    return 0;
1326}
1327
1328void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
1329    if (isActive()) {
1330        notifyStreamReady();
1331    }
1332}
1333
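// Transition the stream to the Dead state: any buffered backfill items are
// discarded (their bytes returned to the backfill manager) and, unless the
// connection itself was disconnected, a StreamEndResponse is queued for the
// consumer. The caller is expected to hold streamMutex.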
1334void ActiveStream::endStream(end_stream_status_t reason) {
1335    if (isActive()) {
1336        pendingBackfill = false;
1337        if (isBackfilling()) {
1338            // If the stream was in the Backfilling state, clear out the
1339            // backfilled items to free up the backfill buffer.
1340            clear_UNLOCKED();
1341            auto producer = producerPtr.lock();
1342            if (producer) {
1343                producer->recordBackfillManagerBytesSent(
1344                        bufferedBackfill.bytes);
1345            }
1346            bufferedBackfill.bytes = 0;
1347            bufferedBackfill.items = 0;
1348        }
1349        transitionState(StreamState::Dead);
1350        if (reason != END_STREAM_DISCONNECTED) {
1351            pushToReadyQ(
1352                    std::make_unique<StreamEndResponse>(opaque_, reason, vb_));
1353        }
1354        log(EXTENSION_LOG_NOTICE,
1355            "(vb %" PRIu16
1356            ") Stream closing, "
1357            "sent until seqno %" PRIu64
1358            " "
1359            "remaining items %" PRIu64
1360            ", "
1361            "reason: %s",
1362            vb_,
1363            lastSentSeqno.load(),
1364            uint64_t(readyQ_non_meta_items.load()),
1365            getEndStreamStatusStr(reason).c_str());
1366    }
1367}
1368
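// Decide whether a (new) backfill is required and, if so, hand the seqno
// range to the producer's backfill manager. If no backfill is scheduled,
// the checkpoint cursor is re-registered when necessary and the stream
// continues without one. The caller is expected to hold streamMutex (as
// indicated by the _UNLOCKED suffix).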
1369void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
1370    if (isBackfillTaskRunning) {
1371        log(EXTENSION_LOG_NOTICE,
1372            "(vb %" PRIu16
1373            ") Skipping "
1374            "scheduleBackfill_UNLOCKED; "
1375            "lastReadSeqno %" PRIu64
1376            ", reschedule flag "
1377            ": %s",
1378            vb_,
1379            lastReadSeqno.load(),
1380            reschedule ? "True" : "False");
1381        return;
1382    }
1383
1384    VBucketPtr vbucket = engine->getVBucket(vb_);
1385    if (!vbucket) {
1386        log(EXTENSION_LOG_WARNING,
1387            "(vb %" PRIu16
1388            ") Failed to schedule "
1389            "backfill as unable to get vbucket; "
1390            "lastReadSeqno : %" PRIu64
1391            ", "
1392            "reschedule : %s",
1393            vb_,
1394            lastReadSeqno.load(),
1395            reschedule ? "True" : "False");
1396        return;
1397    }
1398
1399    auto producer = producerPtr.lock();
1400    if (!producer) {
1401        log(EXTENSION_LOG_WARNING,
1402            "(vb %" PRIu16
1403            ") Aborting scheduleBackfill_UNLOCKED() "
1404            "as the producer conn is deleted; "
1405            "lastReadSeqno : %" PRIu64
1406            ", "
1407            "reschedule : %s",
1408            vb_,
1409            lastReadSeqno.load(),
1410            reschedule ? "True" : "False");
1411        return;
1412    }
1413
1414    uint64_t backfillStart = lastReadSeqno.load() + 1;
1415    uint64_t backfillEnd = 0;
1416    bool tryBackfill = false;
1417
1418    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
1419        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
1420        if (lastReadSeqno.load() > vbHighSeqno) {
1421            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1422                                   "lastReadSeqno (which is " +
1423                                   std::to_string(lastReadSeqno.load()) +
1424                                   " ) is greater than vbHighSeqno (which is " +
1425                                   std::to_string(vbHighSeqno) + " ). " +
1426                                   "for stream " + producer->logHeader() +
1427                                   "; vb " + std::to_string(vb_));
1428        }
1429        if (reschedule) {
1430            /* We need to do this for reschedule because in the case of
1431               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
1432               set to the last persisted seqno before calling
1433               scheduleBackfill_UNLOCKED() */
1434            backfillEnd = engine->getKVBucket()->getLastPersistedSeqno(vb_);
1435        } else {
1436            backfillEnd = end_seqno_;
1437        }
1438        tryBackfill = true;
1439    } else {
1440        try {
1441            std::tie(curChkSeqno, tryBackfill) =
1442                    vbucket->checkpointManager->registerCursorBySeqno(
1443                            cursorName,
1444                            lastReadSeqno.load(),
1445                            MustSendCheckpointEnd::NO);
1446            cursorRegistered = true;
1447        } catch(std::exception& error) {
1448            log(EXTENSION_LOG_WARNING,
1449                "(vb %" PRIu16
1450                ") Failed to register "
1451                "cursor: %s",
1452                vb_,
1453                error.what());
1454            endStream(END_STREAM_STATE);
1455        }
1456
1457        if (lastReadSeqno.load() > curChkSeqno) {
1458            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1459                                   "lastReadSeqno (which is " +
1460                                   std::to_string(lastReadSeqno.load()) +
1461                                   " ) is greater than curChkSeqno (which is " +
1462                                   std::to_string(curChkSeqno) + " ). " +
1463                                   "for stream " + producer->logHeader() +
1464                                   "; vb " + std::to_string(vb_));
1465        }
1466
1467        /* We need to find the minimum seqno that needs to be backfilled in
1468         * order to make sure that we don't miss anything when transitioning
1469         * to a memory snapshot. The backfill task will always make sure that
1470         * the backfill end seqno is contained in the backfill.
1471         */
1472        if (backfillStart < curChkSeqno) {
1473            if (curChkSeqno > end_seqno_) {
1474                /* Backfill only is enough */
1475                backfillEnd = end_seqno_;
1476            } else {
1477                /* Backfill + in-memory streaming */
1478                backfillEnd = curChkSeqno - 1;
1479            }
1480        }
1481    }
1482
1483    if (backfillStart <= backfillEnd && tryBackfill) {
1484        log(EXTENSION_LOG_NOTICE,
1485            "(vb %" PRIu16
1486            ") Scheduling backfill "
1487            "from %" PRIu64 " to %" PRIu64
1488            ", reschedule "
1489            "flag : %s",
1490            vb_,
1491            backfillStart,
1492            backfillEnd,
1493            reschedule ? "True" : "False");
1494        producer->scheduleBackfillManager(
1495                *vbucket, shared_from_this(), backfillStart, backfillEnd);
1496        isBackfillTaskRunning.store(true);
1497    } else {
1498        if (reschedule) {
1499            // Infrequent code path, see comment below.
1500            log(EXTENSION_LOG_NOTICE,
1501                "(vb %" PRIu16
1502                ") Did not schedule "
1503                "backfill with reschedule : True, "
1504                "tryBackfill : True; "
1505                "backfillStart : %" PRIu64
1506                ", "
1507                "backfillEnd : %" PRIu64
1508                ", "
1509                "flags_ : %" PRIu32
1510                ", "
1511                "start_seqno_ : %" PRIu64
1512                ", "
1513                "end_seqno_ : %" PRIu64
1514                ", "
1515                "lastReadSeqno : %" PRIu64
1516                ", "
1517                "lastSentSeqno : %" PRIu64
1518                ", "
1519                "curChkSeqno : %" PRIu64
1520                ", "
1521                "itemsReady : %s",
1522                vb_,
1523                backfillStart,
1524                backfillEnd,
1525                flags_,
1526                start_seqno_,
1527                end_seqno_,
1528                lastReadSeqno.load(),
1529                lastSentSeqno.load(),
1530                curChkSeqno.load(),
1531                itemsReady ? "True" : "False");
1532
1533            /* Cursor was dropped, but we will not do backfill.
1534             * This may happen in a corner case where memory usage is high
1535             * due to other vbuckets and the persistence cursor moves ahead
1536             * of the replication cursor into a newly opened checkpoint but
1537             * has not persisted the items yet.
1538             *
1539             * Because we dropped the cursor but did not do a backfill (and
1540             * therefore did not re-register a cursor in markDiskSnapshot) we
1541             * must re-register the cursor here.
1542             */
1543            try {
1544                CursorRegResult result =
1545                        vbucket->checkpointManager->registerCursorBySeqno(
1546                                cursorName,
1547                                lastReadSeqno.load(),
1548                                MustSendCheckpointEnd::NO);
1549                cursorRegistered = true;
1550                curChkSeqno = result.first;
1551            } catch (std::exception& error) {
1552                log(EXTENSION_LOG_WARNING,
1553                    "(vb %" PRIu16
1554                    ") Failed to register "
1555                    "cursor: %s",
1556                    vb_,
1557                    error.what());
1558                endStream(END_STREAM_STATE);
1559            }
1560        }
1561        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
1562            endStream(END_STREAM_OK);
1563        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
1564            transitionState(StreamState::TakeoverSend);
1565        } else {
1566            transitionState(StreamState::InMemory);
1567        }
1568        if (reschedule) {
1569            /*
1570             * It is not absolutely necessary to notify immediately, as the
1571             * connection manager or an incoming item will trigger a
1572             * notification eventually, but it does not hurt to do so.
1573             *
1574             * Note: we must not notify when the backfill is scheduled for
1575             * the first time (i.e. when reschedule is false) because the
1576             * stream is not yet in the producer connection's list of streams.
1577             */
1578            notifyStreamReady();
1579        }
1580    }
1581}
1582
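// Handles the case where this stream has been deemed too slow (its checkpoint
// cursor is being dropped, presumably to reclaim memory). For the Backfilling
// and InMemory states the cursor is dropped and pendingBackfill is set so the
// missed items can later be fetched from disk; the Takeover/Dead states ignore
// the request, and Pending/Reading are treated as logic errors.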
1583bool ActiveStream::handleSlowStream() {
1584    LockHolder lh(streamMutex);
1585    log(EXTENSION_LOG_NOTICE,
1586        "(vb %" PRIu16
1587        ") Handling slow stream; "
1588        "state_ : %s, "
1589        "lastReadSeqno : %" PRIu64
1590        ", "
1591        "lastSentSeqno : %" PRIu64
1592        ", "
1593        "vBucketHighSeqno : %" PRIu64
1594        ", "
1595        "isBackfillTaskRunning : %s",
1596        vb_,
1597        to_string(state_.load()).c_str(),
1598        lastReadSeqno.load(),
1599        lastSentSeqno.load(),
1600        engine->getVBucket(vb_)->getHighSeqno(),
1601        isBackfillTaskRunning.load() ? "True" : "False");
1602
1603    bool status = false;
1604    switch (state_.load()) {
1605        case StreamState::Backfilling:
1606        case StreamState::InMemory:
1607            /* Drop the existing cursor and set pending backfill */
1608            status = dropCheckpointCursor_UNLOCKED();
1609            pendingBackfill = true;
1610            return status;
1611        case StreamState::TakeoverSend:
1612            /* To be handled later if needed */
1613        case StreamState::TakeoverWait:
1614            /* To be handled later if needed */
1615        case StreamState::Dead:
1616            /* To be handled later if needed */
1617            return false;
1618        case StreamState::Pending:
1619        case StreamState::Reading: {
1620            auto producer = producerPtr.lock();
1621            std::string connHeader =
1622                    producer ? producer->logHeader()
1623                             : "DCP (Producer): **Deleted conn**";
1624            throw std::logic_error(
1625                    "ActiveStream::handleSlowStream: "
1626                    "called with state " +
1627                    to_string(state_.load()) +
1628                    " "
1629                    "for stream " +
1630                    connHeader + "; vb " + std::to_string(vb_));
1631        }
1632    }
1633    return false;
1634}
1635
1636std::string ActiveStream::getEndStreamStatusStr(end_stream_status_t status) {
1637    switch (status) {
1638    case END_STREAM_OK:
1639        return "The stream ended due to all items being streamed";
1640    case END_STREAM_CLOSED:
1641        return "The stream closed early due to a close stream message";
1642    case END_STREAM_STATE:
1643        return "The stream closed early because the vbucket state changed";
1644    case END_STREAM_DISCONNECTED:
1645        return "The stream closed early because the conn was disconnected";
1646    case END_STREAM_SLOW:
1647        return "The stream was closed early because it was too slow";
1648    case END_STREAM_BACKFILL_FAIL:
1649        return "The stream closed early due to backfill failure";
1650    case END_STREAM_ROLLBACK:
1651        return "The stream closed early because the vbucket rollback'ed";
1652    case END_STREAM_FILTER_EMPTY:
1653        return "The stream closed because all of the filtered collections "
1654               "were deleted";
1655    }
1656    return std::string{"Status unknown: " + std::to_string(status) +
1657                       "; this should not have happened!"};
1658}
1659
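// Validates and performs a state transition for an ActiveStream. The
// transitions permitted by the switch below are:
//   Pending      -> Backfilling | Dead
//   Backfilling  -> InMemory | TakeoverSend | Dead
//   InMemory     -> Backfilling | Dead
//   TakeoverSend -> TakeoverWait | Dead
//   TakeoverWait -> TakeoverSend | Dead
// Entering the new state also triggers its side effects: scheduling (or
// rescheduling) a backfill, draining checkpoint items into the readyQ,
// recording the takeover start time, or removing the checkpoint cursor.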
1660void ActiveStream::transitionState(StreamState newState) {
1661    if (state_ == newState) {
1662        return;
1663    }
1664
1665    EXTENSION_LOG_LEVEL logLevel = getTransitionStateLogLevel(state_, newState);
1666    log(logLevel,
1667        "ActiveStream::transitionState: (vb %d) "
1668        "Transitioning from %s to %s",
1669        vb_,
1670        to_string(state_.load()).c_str(),
1671        to_string(newState).c_str());
1672
1673    bool validTransition = false;
1674    switch (state_.load()) {
1675        case StreamState::Pending:
1676            if (newState == StreamState::Backfilling ||
1677                    newState == StreamState::Dead) {
1678                validTransition = true;
1679            }
1680            break;
1681        case StreamState::Backfilling:
1682            if (newState == StreamState::InMemory ||
1683                newState == StreamState::TakeoverSend ||
1684                newState == StreamState::Dead) {
1685                validTransition = true;
1686            }
1687            break;
1688        case StreamState::InMemory:
1689            if (newState == StreamState::Backfilling ||
1690                    newState == StreamState::Dead) {
1691                validTransition = true;
1692            }
1693            break;
1694        case StreamState::TakeoverSend:
1695            if (newState == StreamState::TakeoverWait ||
1696                    newState == StreamState::Dead) {
1697                validTransition = true;
1698            }
1699            break;
1700        case StreamState::TakeoverWait:
1701            if (newState == StreamState::TakeoverSend ||
1702                    newState == StreamState::Dead) {
1703                validTransition = true;
1704            }
1705            break;
1706        case StreamState::Reading:
1707            // Active stream should never be in READING state.
1708            validTransition = false;
1709            break;
1710        case StreamState::Dead:
1711            // Once DEAD, no other transitions should occur.
1712            validTransition = false;
1713            break;
1714    }
1715
1716    if (!validTransition) {
1717        throw std::invalid_argument("ActiveStream::transitionState:"
1718                " newState (which is " + to_string(newState) +
1719                ") is not valid for current state (which is " +
1720                to_string(state_.load()) + ")");
1721    }
1722
1723    StreamState oldState = state_.load();
1724    state_ = newState;
1725
1726    switch (newState) {
1727        case StreamState::Backfilling:
1728            if (StreamState::Pending == oldState) {
1729                scheduleBackfill_UNLOCKED(false /* reschedule */);
1730            } else if (StreamState::InMemory == oldState) {
1731                scheduleBackfill_UNLOCKED(true /* reschedule */);
1732            }
1733            break;
1734        case StreamState::InMemory:
1735            // Check whether the producer has already sent up to the last
1736            // requested sequence number; if not, move checkpoint items into
1737            // the ready queue.
1738            if (lastSentSeqno.load() >= end_seqno_) {
1739                // Stream transitioning to DEAD state
1740                endStream(END_STREAM_OK);
1741                notifyStreamReady();
1742            } else {
1743                nextCheckpointItem();
1744            }
1745            break;
1746        case StreamState::TakeoverSend:
1747            takeoverStart = ep_current_time();
1748            if (!nextCheckpointItem()) {
1749                notifyStreamReady(true);
1750            }
1751            break;
1752        case StreamState::Dead:
1753            removeCheckpointCursor();
1754            break;
1755        case StreamState::TakeoverWait:
1756        case StreamState::Pending:
1757            break;
1758        case StreamState::Reading:
1759            throw std::logic_error("ActiveStream::transitionState:"
1760                    " newState can't be " + to_string(newState) +
1761                    "!");
1762    }
1763}
1764
1765size_t ActiveStream::getItemsRemaining() {
1766    VBucketPtr vbucket = engine->getVBucket(vb_);
1767
1768    if (!vbucket || !isActive()) {
1769        return 0;
1770    }
1771
1772    // Items remaining is the sum of:
1773    // (a) Items outstanding in checkpoints
1774    // (b) Items pending in our readyQ, excluding any meta items.
1775    size_t ckptItems = 0;
1776    if (cursorRegistered) {
1777        ckptItems =
1778                vbucket->checkpointManager->getNumItemsForCursor(cursorName);
1779    }
1780    return ckptItems + readyQ_non_meta_items;
1781}
1782
1783uint64_t ActiveStream::getLastReadSeqno() const {
1784    return lastReadSeqno.load();
1785}
1786
1787uint64_t ActiveStream::getLastSentSeqno() const {
1788    return lastSentSeqno.load();
1789}
1790
1791void ActiveStream::log(EXTENSION_LOG_LEVEL severity,
1792                       const char* fmt,
1793                       ...) const {
1794    va_list va;
1795    va_start(va, fmt);
1796    auto producer = producerPtr.lock();
1797    if (producer) {
1798        producer->getLogger().vlog(severity, fmt, va);
1799    } else {
1800        static Logger defaultLogger =
1801                Logger("DCP (Producer): **Deleted conn**");
1802        defaultLogger.vlog(severity, fmt, va);
1803    }
1804    va_end(va);
1805}
1806
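// The current snapshot is considered incomplete only when streaming from a
// vbucket in replica state and the snapshot-end seqno last sent to the client
// is still at or ahead of lastReadSeqno; in every other case it is treated as
// completed.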
1807bool ActiveStream::isCurrentSnapshotCompleted() const
1808{
1809    VBucketPtr vbucket = engine->getVBucket(vb_);
1810    // An atomic read of vbucket state without acquiring the
1811    // reader lock for state should suffice here.
1812    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1813        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1814            lastReadSeqno) {
1815            return false;
1816        }
1817    }
1818    return true;
1819}
1820
1821bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
1822    VBucketPtr vbucket = engine->getVBucket(vb_);
1823    if (!vbucket) {
1824        endStream(END_STREAM_STATE);
1825        notifyStreamReady();
        // Without a vbucket there is no cursor to remove; return early to
        // avoid dereferencing a null VBucketPtr below.
        return false;
1826    }
1827    /* Drop the existing cursor */
1828    cursorRegistered = false;
1829    return vbucket->checkpointManager->removeCursor(cursorName);
1830}
1831
1832EXTENSION_LOG_LEVEL ActiveStream::getTransitionStateLogLevel(
1833        StreamState currState, StreamState newState) {
1834    if ((currState == StreamState::Pending) ||
1835        (newState == StreamState::Dead)) {
1836        return EXTENSION_LOG_INFO;
1837    }
1838    return EXTENSION_LOG_NOTICE;
1839}
1840
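// Inspects system events passing through the stream; a
// CollectionsSeparatorChanged event updates the collections separator cached
// by this stream (currentSeparator).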
1841void ActiveStream::processSystemEvent(DcpResponse* response) {
1842    if (response->getEvent() == DcpResponse::Event::SystemEvent) {
1843        auto se = static_cast<SystemEventProducerMessage*>(response);
1844        if (se->getSystemEvent() == mcbp::systemevent::id::CollectionsSeparatorChanged) {
1845            currentSeparator =
1846                    std::string(se->getKey().data(), se->getKey().size());
1847            // filter needs new separator?
1848        }
1849    }
1850}
1851
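// Notifies the producer that this stream has data ready. The itemsReady flag
// ensures that at most one notification is outstanding until the flag is
// reset (typically when the readyQ is drained), unless 'force' is set.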
1852void ActiveStream::notifyStreamReady(bool force) {
1853    auto producer = producerPtr.lock();
1854    if (!producer) {
1855        return;
1856    }
1857
1858    bool inverse = false;
1859    if (force || itemsReady.compare_exchange_strong(inverse, true)) {
1860        producer->notifyStreamReady(vb_);
1861    }
1862}
1863
1864void ActiveStream::removeCheckpointCursor() {
1865    VBucketPtr vb = engine->getVBucket(vb_);
1866    if (vb) {
1867        cursorRegistered = false;
1868        vb->checkpointManager->removeCursor(cursorName);
1869    }
1870}
1871
1872std::atomic<uint64_t> ActiveStream::cursorUID;
1873
1874NotifierStream::NotifierStream(EventuallyPersistentEngine* e,
1875                               std::shared_ptr<DcpProducer> p,
1876                               const std::string& name,
1877                               uint32_t flags,
1878                               uint32_t opaque,
1879                               uint16_t vb,
1880                               uint64_t st_seqno,
1881                               uint64_t en_seqno,
1882                               uint64_t vb_uuid,
1883                               uint64_t snap_start_seqno,
1884                               uint64_t snap_end_seqno)
1885    : Stream(name,
1886             flags,
1887             opaque,
1888             vb,
1889             st_seqno,
1890             en_seqno,
1891             vb_uuid,
1892             snap_start_seqno,
1893             snap_end_seqno,
1894             Type::Notifier),
1895      producerPtr(p) {
1896    LockHolder lh(streamMutex);
1897    VBucketPtr vbucket = e->getVBucket(vb_);
1898    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1899        pushToReadyQ(std::make_unique<StreamEndResponse>(
1900                opaque_, END_STREAM_OK, vb_));
1901        transitionState(StreamState::Dead);
1902        itemsReady.store(true);
1903    }
1904
1905    p->getLogger().log(EXTENSION_LOG_NOTICE,
1906        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
1907        PRIu64, vb, st_seqno, en_seqno);
1908}
1909
1910uint32_t NotifierStream::setDead(end_stream_status_t status) {
1911    std::unique_lock<std::mutex> lh(streamMutex);
1912    if (isActive()) {
1913        transitionState(StreamState::Dead);
1914        if (status != END_STREAM_DISCONNECTED) {
1915            pushToReadyQ(
1916                    std::make_unique<StreamEndResponse>(opaque_, status, vb_));
1917            lh.unlock();
1918            notifyStreamReady();
1919        }
1920    }
1921    return 0;
1922}
1923
1924void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1925    std::unique_lock<std::mutex> lh(streamMutex);
1926    if (isActive() && start_seqno_ < seqno) {
1927        pushToReadyQ(std::make_unique<StreamEndResponse>(
1928                opaque_, END_STREAM_OK, vb_));
1929        transitionState(StreamState::Dead);
1930        lh.unlock();
1931        notifyStreamReady();
1932    }
1933}
1934
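// Returns the next ready response, provided the producer's buffer log can
// accommodate its size; returns nullptr otherwise. An empty readyQ also
// clears itemsReady so that a later push triggers a fresh notification.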
1935std::unique_ptr<DcpResponse> NotifierStream::next() {
1936    LockHolder lh(streamMutex);
1937
1938    if (readyQ.empty()) {
1939        itemsReady.store(false);
1940        return nullptr;
1941    }
1942
1943    auto& response = readyQ.front();
1944    auto producer = producerPtr.lock();
1945    if (producer && producer->bufferLogInsert(response->getMessageSize())) {
1946        return popFromReadyQ();
1947    }
1948    return nullptr;
1949}
1950
1951void NotifierStream::transitionState(StreamState newState) {
1952    log(EXTENSION_LOG_INFO,
1953        "NotifierStream::transitionState: (vb %d) "
1954        "Transitioning from %s to %s",
1955        vb_,
1956        to_string(state_.load()).c_str(),
1957        to_string(newState).c_str());
1958
1959    if (state_ == newState) {
1960        return;
1961    }
1962
1963    bool validTransition = false;
1964    switch (state_.load()) {
1965        case StreamState::Pending:
1966            if (newState == StreamState::Dead) {
1967                validTransition = true;
1968            }
1969            break;
1970
1971        case StreamState::Backfilling:
1972        case StreamState::InMemory:
1973        case StreamState::TakeoverSend:
1974        case StreamState::TakeoverWait:
1975        case StreamState::Reading:
1976        case StreamState::Dead:
1977            // No other state transitions are valid for a notifier stream.
1978            break;
1979    }
1980
1981    if (!validTransition) {
1982        throw std::invalid_argument("NotifierStream::transitionState:"
1983                " newState (which is " + to_string(newState) +
1984                ") is not valid for current state (which is " +
1985                to_string(state_.load()) + ")");
1986    }
1987    state_ = newState;
1988}
1989
1990void NotifierStream::addStats(ADD_STAT add_stat, const void* c) {
1991    Stream::addStats(add_stat, c);
1992}
1993
1994void NotifierStream::log(EXTENSION_LOG_LEVEL severity,
1995                         const char* fmt,
1996                         ...) const {
1997    va_list va;
1998    va_start(va, fmt);
1999    auto producer = producerPtr.lock();
2000    if (producer) {
2001        producer->getLogger().vlog(severity, fmt, va);
2002    } else {
2003        static Logger defaultLogger =
2004                Logger("DCP (Notifier): **Deleted conn**");
2005        defaultLogger.vlog(severity, fmt, va);
2006    }
2007    va_end(va);
2008}
2009
2010void NotifierStream::notifyStreamReady() {
2011    auto producer = producerPtr.lock();
2012    if (!producer) {
2013        return;
2014    }
2015
2016    bool inverse = false;
2017    if (itemsReady.compare_exchange_strong(inverse, true)) {
2018        producer->notifyStreamReady(vb_);
2019    }
2020}
2021
2022PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
2023                             std::shared_ptr<DcpConsumer> c,
2024                             const std::string& name,
2025                             uint32_t flags,
2026                             uint32_t opaque,
2027                             uint16_t vb,
2028                             uint64_t st_seqno,
2029                             uint64_t en_seqno,
2030                             uint64_t vb_uuid,
2031                             uint64_t snap_start_seqno,
2032                             uint64_t snap_end_seqno,
2033                             uint64_t vb_high_seqno)
2034    : Stream(name,
2035             flags,
2036             opaque,
2037             vb,
2038             st_seqno,
2039             en_seqno,
2040             vb_uuid,
2041             snap_start_seqno,
2042             snap_end_seqno,
2043             Type::Passive),
2044      engine(e),
2045      consumerPtr(c),
2046      last_seqno(vb_high_seqno),
2047      cur_snapshot_start(0),
2048      cur_snapshot_end(0),
2049      cur_snapshot_type(Snapshot::None),
2050      cur_snapshot_ack(false) {
2051    LockHolder lh(streamMutex);
2052    streamRequest_UNLOCKED(vb_uuid);
2053    itemsReady.store(true);
2054}
2055
2056PassiveStream::~PassiveStream() {
2057    uint32_t unackedBytes = clearBuffer_UNLOCKED();
2058    if (state_ != StreamState::Dead) {
2059        // Destructing a "live" stream; log it.
2060        log(EXTENSION_LOG_NOTICE,
2061            "(vb %" PRId16
2062            ") Destructing stream."
2063            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
2064            vb_,
2065            last_seqno.load(),
2066            unackedBytes);
2067    }
2068}
2069
2070void PassiveStream::streamRequest(uint64_t vb_uuid) {
2071    {
2072        std::unique_lock<std::mutex> lh(streamMutex);
2073        streamRequest_UNLOCKED(vb_uuid);
2074    }
2075    notifyStreamReady();
2076}
2077
2078void PassiveStream::streamRequest_UNLOCKED(uint64_t vb_uuid) {
2079    /* The stream should send a "don't care" vb_uuid (i.e. 0) if start_seqno is 0 */
2080    pushToReadyQ(std::make_unique<StreamRequest>(vb_,
2081                                                 opaque_,
2082                                                 flags_,
2083                                                 start_seqno_,
2084                                                 end_seqno_,
2085                                                 start_seqno_ ? vb_uuid : 0,
2086                                                 snap_start_seqno_,
2087                                                 snap_end_seqno_));
2088
2089    const char* type = (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER)
2090        ? "takeover stream" : "stream";
2091    log(EXTENSION_LOG_NOTICE,
2092        "(vb %" PRId16 ") Attempting to add %s: opaque_:%" PRIu32
2093        ", "
2094        "start_seqno_:%" PRIu64 ", end_seqno_:%" PRIu64
2095        ", "
2096        "vb_uuid:%" PRIu64 ", snap_start_seqno_:%" PRIu64
2097        ", "
2098        "snap_end_seqno_:%" PRIu64 ", last_seqno:%" PRIu64,
2099        vb_,
2100        type,
2101        opaque_,
2102        start_seqno_,
2103        end_seqno_,
2104        vb_uuid,
2105        snap_start_seqno_,
2106        snap_end_seqno_,
2107        last_seqno.load());
2108}
2109
2110uint32_t PassiveStream::setDead(end_stream_status_t status) {
2111    /* Hold buffer lock so that we clear out all items before we set the stream
2112       to dead state. We do not want to add any new message to the buffer or
2113       process any items in the buffer once we set the stream state to dead. */
2114    std::unique_lock<std::mutex> lg(buffer.bufMutex);
2115    uint32_t unackedBytes = clearBuffer_UNLOCKED();
2116    bool killed = false;
2117
2118    LockHolder slh(streamMutex);
2119    if (transitionState(StreamState::Dead)) {
2120        killed = true;
2121    }
2122
2123    if (killed) {
2124        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
2125        if (END_STREAM_DISCONNECTED == status) {
2126            logLevel = EXTENSION_LOG_WARNING;
2127        }
2128        log(logLevel,
2129            "(vb %" PRId16
2130            ") Setting stream to dead state, last_seqno is %" PRIu64
2131            ", unAckedBytes is %" PRIu32 ", status is %s",
2132            vb_,
2133            last_seqno.load(),
2134            unackedBytes,
2135            getEndStreamStatusStr(status).c_str());
2136    }
2137    return unackedBytes;
2138}
2139
2140void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
2141    std::unique_lock<std::mutex> lh(streamMutex);
2142    if (isPending()) {
2143        if (status == ENGINE_SUCCESS) {
2144            transitionState(StreamState::Reading);
2145        } else {
2146            transitionState(StreamState::Dead);
2147        }
2148        pushToReadyQ(std::make_unique<AddStreamResponse>(
2149                add_opaque, opaque_, status));
2150        lh.unlock();
2151        notifyStreamReady();
2152    }
2153}
2154
2155void PassiveStream::reconnectStream(VBucketPtr &vb,
2156                                    uint32_t new_opaque,
2157                                    uint64_t start_seqno) {
2158    /* The stream should send a "don't care" vb_uuid (i.e. 0) if start_seqno is 0 */
2159    vb_uuid_ = start_seqno ? vb->failovers->getLatestEntry().vb_uuid : 0;
2160
2161    snapshot_info_t info = vb->checkpointManager->getSnapshotInfo();
2162    if (info.range.end == info.start) {
2163        info.range.start = info.start;
2164    }
2165
2166    snap_start_seqno_ = info.range.start;
2167    start_seqno_ = info.start;
2168    snap_end_seqno_ = info.range.end;
2169
2170    log(EXTENSION_LOG_NOTICE,
2171        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
2172        ", start seq no %" PRIu64 ", end seq no %" PRIu64
2173        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
2174        vb_,
2175        new_opaque,
2176        start_seqno,
2177        end_seqno_,
2178        snap_start_seqno_,
2179        snap_end_seqno_);
2180    {
2181        LockHolder lh(streamMutex);
2182        last_seqno.store(start_seqno);
2183        pushToReadyQ(std::make_unique<StreamRequest>(vb_,
2184                                                     new_opaque,
2185                                                     flags_,
2186                                                     start_seqno,
2187                                                     end_seqno_,
2188                                                     vb_uuid_,
2189                                                     snap_start_seqno_,
2190                                                     snap_end_seqno_));
2191    }
2192    notifyStreamReady();
2193}
2194
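// Entry point for messages arriving from the producer. Seqno ordering and
// snapshot-marker sanity are checked first; then, depending on the
// replication throttle, the message is either processed immediately (when the
// buffer is empty), buffered for the processor task, or the connection is
// disconnected. Returns ENGINE_TMPFAIL when the message has been buffered
// rather than processed.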
2195ENGINE_ERROR_CODE PassiveStream::messageReceived(std::unique_ptr<DcpResponse> dcpResponse) {
2196    if (!dcpResponse) {
2197        return ENGINE_EINVAL;
2198    }
2199
2200    if (!isActive()) {
2201        return ENGINE_KEY_ENOENT;
2202    }
2203
2204    auto seqno = dcpResponse->getBySeqno();
2205    if (seqno) {
2206        if (uint64_t(*seqno) <= last_seqno.load()) {
2207            log(EXTENSION_LOG_WARNING,
2208                "(vb %d) Erroneous (out of sequence) message (%s) received, "
2209                "with opaque: %" PRIu32 ", its seqno (%" PRIu64
2210                ") is not "
2211                "greater than last received seqno (%" PRIu64
2212                "); "
2213                "Dropping mutation!",
2214                vb_,
2215                dcpResponse->to_string(),
2216                opaque_,
2217                *seqno,
2218                last_seqno.load());
2219            return ENGINE_ERANGE;
2220        }
2221        last_seqno.store(*seqno);
2222    } else if (dcpResponse->getEvent() == DcpResponse::Event::SnapshotMarker) {
2223        auto s = static_cast<SnapshotMarker*>(dcpResponse.get());
2224        uint64_t snapStart = s->getStartSeqno();
2225        uint64_t snapEnd = s->getEndSeqno();
2226        if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
2227            log(EXTENSION_LOG_WARNING,
2228                "(vb %d) Erroneous snapshot marker received, with "
2229                "opaque: %" PRIu32
2230                ", its start "
2231                "(%" PRIu64 "), and end (%" PRIu64
2232                ") are less than last "
2233                "received seqno (%" PRIu64 "); Dropping marker!",
2234                vb_,
2235                opaque_,
2236                snapStart,
2237                snapEnd,
2238                last_seqno.load());
2239            return ENGINE_ERANGE;
2240        }
2241    }
2242
2243    switch (engine->getReplicationThrottle().getStatus()) {
2244    case ReplicationThrottle::Status::Disconnect:
2245        log(EXTENSION_LOG_WARNING,
2246            "vb:%" PRIu16
2247            " Disconnecting the connection as there is "
2248            "no memory to complete replication",
2249            vb_);
2250        return ENGINE_DISCONNECT;
2251    case ReplicationThrottle::Status::Process:
2252        if (buffer.empty()) {
2253            /* Process the response here itself rather than buffering it */
2254            ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2255            switch (dcpResponse->getEvent()) {
2256            case DcpResponse::Event::Mutation:
2257                ret = processMutation(
2258                        static_cast<MutationResponse*>(dcpResponse.get()));
2259                break;
2260            case DcpResponse::Event::Deletion:
2261            case DcpResponse::Event::Expiration:
2262                ret = processDeletion(
2263                        static_cast<MutationResponse*>(dcpResponse.get()));
2264                break;
2265            case DcpResponse::Event::SnapshotMarker:
2266                processMarker(static_cast<SnapshotMarker*>(dcpResponse.get()));
2267                break;
2268            case DcpResponse::Event::SetVbucket:
2269                processSetVBucketState(
2270                        static_cast<SetVBucketState*>(dcpResponse.get()));
2271                break;
2272            case DcpResponse::Event::StreamEnd: {
2273                LockHolder lh(streamMutex);
2274                transitionState(StreamState::Dead);
2275            } break;
2276            case DcpResponse::Event::SystemEvent: {
2277                ret = processSystemEvent(
2278                        *static_cast<SystemEventMessage*>(dcpResponse.get()));
2279                break;
2280            }
2281            default:
2282                log(EXTENSION_LOG_WARNING,
2283                    "(vb %d) Unknown event:%d, opaque:%" PRIu32,
2284                    vb_,
2285                    int(dcpResponse->getEvent()),
2286                    opaque_);
2287                return ENGINE_DISCONNECT;
2288            }
2289
2290            if (ret == ENGINE_ENOMEM) {
2291                if (engine->getReplicationThrottle().doDisconnectOnNoMem()) {
2292                    log(EXTENSION_LOG_WARNING,
2293                        "vb:%" PRIu16
2294                        " Disconnecting the connection as there is no "
2295                        "memory to complete replication; process dcp "
2296                        "event returned no memory",
2297                        vb_);
2298                    return ENGINE_DISCONNECT;
2299                }
2300            }
2301
2302            if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
2303                return ret;
2304            }
2305        }
2306        break;
2307    case ReplicationThrottle::Status::Pause:
2308        /* Do nothing specific here; we buffer the item for this case and
2309           the other cases below */
2310        break;
2311    }
2312
2313    // Only buffer if the stream is not dead
2314    if (isActive()) {
2315        buffer.push(std::move(dcpResponse));
2316    }
2317    return ENGINE_TMPFAIL;
2318}
2319
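// Drains up to batchSize messages from the buffer, applying each one in turn.
// bufMutex is released while a message is being processed (a lock inversion
// exists with connManager otherwise) and re-acquired afterwards. On a
// temporary failure the message is pushed back to the front of the buffer and
// processing stops for this run.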
2320process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
2321                                                             size_t batchSize) {
2322    std::unique_lock<std::mutex> lh(buffer.bufMutex);
2323    uint32_t count = 0;
2324    uint32_t message_bytes = 0;
2325    uint32_t total_bytes_processed = 0;
2326    bool failed = false, noMem = false;
2327
2328    while (count < batchSize && !buffer.messages.empty()) {
2329        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2330        /* If the stream is in dead state we should not process any remaining
2331           items in the buffer; instead we should clear them */
2332        if (!isActive()) {
2333            total_bytes_processed += clearBuffer_UNLOCKED();
2334            processed_bytes = total_bytes_processed;
2335            return all_processed;
2336        }
2337
2338        std::unique_ptr<DcpResponse> response = buffer.pop_front(lh);
2339
2340        // Release bufMutex whilst we attempt to process the message;
2341        // a lock inversion exists with connManager if we hold it.
2342        lh.unlock();
2343
2344        message_bytes = response->getMessageSize();
2345
2346        switch (response->getEvent()) {
2347            case DcpResponse::Event::Mutation:
2348                ret = processMutation(static_cast<MutationResponse*>(response.get()));
2349                break;
2350            case DcpResponse::Event::Deletion:
2351            case DcpResponse::Event::Expiration:
2352                ret = processDeletion(static_cast<MutationResponse*>(response.get()));
2353                break;
2354            case DcpResponse::Event::SnapshotMarker:
2355                processMarker(static_cast<SnapshotMarker*>(response.get()));
2356                break;
2357            case DcpResponse::Event::SetVbucket:
2358                processSetVBucketState(static_cast<SetVBucketState*>(response.get()));
2359                break;
2360            case DcpResponse::Event::StreamEnd:
2361                {
2362                    LockHolder lh(streamMutex);
2363                    transitionState(StreamState::Dead);
2364                }
2365                break;
2366            case DcpResponse::Event::SystemEvent: {
2367                    ret = processSystemEvent(
2368                            *static_cast<SystemEventMessage*>(response.get()));
2369                    break;
2370                }
2371            default:
2372                log(EXTENSION_LOG_WARNING,
2373                    "PassiveStream::processBufferedMessages:"
2374                    "(vb %" PRIu16
2375                    ") PassiveStream ignoring "
2376                    "unknown message type %s",
2377                    vb_,
2378                    response->to_string());
2379                continue;
2380        }
2381
2382        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
2383            failed = true;
2384            if (ret == ENGINE_ENOMEM) {
2385                noMem = true;
2386            }
2387        }
2388
2389        // Re-acquire bufMutex so that
2390        // 1) we can update the buffer
2391        // 2) safely re-check the while conditional statement
2392        lh.lock();
2393
2394        // If we failed and the stream is not dead, stash the DcpResponse at the
2395        // front of the queue and break the loop.
2396        if (failed && isActive()) {
2397            buffer.push_front(std::move(response), lh);
2398            break;
2399        }
2400
2401        count++;
2402        if (ret != ENGINE_ERANGE) {
2403            total_bytes_processed += message_bytes;
2404        }
2405    }
2406
2407    processed_bytes = total_bytes_processed;
2408
2409    if (failed) {
2410        if (noMem && engine->getReplicationThrottle().doDisconnectOnNoMem()) {
2411            log(EXTENSION_LOG_WARNING,
2412                "vb:%" PRIu16
2413                " Processor task indicating disconnection as "
2414                "there is no memory to complete replication; process dcp "
2415                "event returned no memory ",
2416                vb_);
2417            return stop_processing;
2418        }
2419        return cannot_process;
2420    }
2421
2422    return all_processed;
2423}
2424
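// Applies a replicated mutation to the local vbucket. The seqno must fall
// within the current snapshot range and an invalid CAS is regenerated; the
// item is then stored via addBackfillItem() (backfill phase) or setWithMeta().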
2425ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
2426    VBucketPtr vb = engine->getVBucket(vb_);
2427    if (!vb) {
2428        return ENGINE_NOT_MY_VBUCKET;
2429    }
2430
2431    auto consumer = consumerPtr.lock();
2432    if (!consumer) {
2433        return ENGINE_DISCONNECT;
2434    }
2435
2436    if (uint64_t(*mutation->getBySeqno()) < cur_snapshot_start.load() ||
2437        uint64_t(*mutation->getBySeqno()) > cur_snapshot_end.load()) {
2438        log(EXTENSION_LOG_WARNING,
2439            "(vb %d) Erroneous mutation [sequence "
2440            "number does not fall in the expected snapshot range : "
2441            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64
2442            ") <= "
2443            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
2444            vb_,
2445            cur_snapshot_start.load(),
2446            *mutation->getBySeqno(),
2447            cur_snapshot_end.load());
2448        return ENGINE_ERANGE;
2449    }
2450
2451    // MB-17517: Check the incoming item's CAS validity. We /shouldn't/
2452    // receive anything without a valid CAS; however, given that versions
2453    // without this check may send us "bad" CAS values, we regenerate them
2454    // (which is better than rejecting the data entirely).
2455    if (!Item::isValidCas(mutation->getItem()->getCas())) {
2456        log(EXTENSION_LOG_WARNING,
2457            "Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
2458            ", seqno:%" PRId64 "}. Regenerating new CAS",
2459            mutation->getItem()->getCas(),
2460            vb_,
2461            mutation->getItem()->getBySeqno());
2462        mutation->getItem()->setCas();
2463    }
2464
2465    ENGINE_ERROR_CODE ret;
2466    if (vb->isBackfillPhase()) {
2467        ret = engine->getKVBucket()->addBackfillItem(
2468                *mutation->getItem(),
2469                GenerateBySeqno::No,
2470                mutation->getExtMetaData());
2471    } else {
2472        ret = engine->getKVBucket()->setWithMeta(*mutation->getItem(),
2473                                                 0,
2474                                                 NULL,
2475                                                 consumer->getCookie(),
2476                                                 {vbucket_state_active,
2477                                                  vbucket_state_replica,
2478                                                  vbucket_state_pending},
2479                                                 CheckConflicts::No,
2480                                                 true,
2481                                                 GenerateBySeqno::No,
2482                                                 GenerateCas::No,
2483                                                 mutation->getExtMetaData(),
2484                                                 true);
2485    }
2486
2487    if (ret != ENGINE_SUCCESS) {
2488        log(EXTENSION_LOG_WARNING,
2489            "vb:%" PRIu16
2490            " Got error '%s' while trying to process "
2491            "mutation with seqno:%" PRId64,
2492            vb_,
2493            cb::to_string(cb::to_engine_errc(ret)).c_str(),
2494            mutation->getItem()->getBySeqno());
2495    } else {
2496        handleSnapshotEnd(vb, *mutation->getBySeqno());
2497    }
2498
2499    return ret;
2500}
2501
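// Applies a replicated deletion. Deletions carrying a value body are routed
// through processMutation() so the value is preserved; otherwise the key is
// removed via deleteWithMeta(), regenerating an invalid CAS first. A
// KEY_ENOENT result is treated as success.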
2502ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
2503    VBucketPtr vb = engine->getVBucket(vb_);
2504    if (!vb) {
2505        return ENGINE_NOT_MY_VBUCKET;
2506    }
2507
2508    auto consumer = consumerPtr.lock();
2509    if (!consumer) {
2510        return ENGINE_DISCONNECT;
2511    }
2512
2513    if (uint64_t(*deletion->getBySeqno()) < cur_snapshot_start.load() ||
2514        uint64_t(*deletion->getBySeqno()) > cur_snapshot_end.load()) {
2515        log(EXTENSION_LOG_WARNING,
2516            "(vb %d) Erroneous deletion [sequence "
2517            "number does not fall in the expected snapshot range : "
2518            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64
2519            ") <= "
2520            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
2521            vb_,
2522            cur_snapshot_start.load(),
2523            *deletion->getBySeqno(),
2524            cur_snapshot_end.load());
2525        return ENGINE_ERANGE;
2526    }
2527
2528    // The deletion carries a value body, so send it through the mutation
2529    // path to ensure the deleted item is stored with its value.
2530    if (deletion->getItem()->getNBytes()) {
2531        return processMutation(deletion);
2532    }
2533
2534    uint64_t delCas = 0;
2535    ENGINE_ERROR_CODE ret;
2536    ItemMetaData meta = deletion->getItem()->getMetaData();
2537
2538    // MB-17517: Check for the incoming item's CAS validity.
2539    if (!Item::isValidCas(meta.cas)) {
2540        log(EXTENSION_LOG_WARNING,
2541            "Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
2542            ", seqno:%" PRId64 "}. Regenerating new CAS",
2543            meta.cas,
2544            vb_,
2545            *deletion->getBySeqno());
2546        meta.cas = Item::nextCas();
2547    }
2548
2549    ret = engine->getKVBucket()->deleteWithMeta(deletion->getItem()->getKey(),
2550                                                delCas,
2551                                                nullptr,
2552                                                deletion->getVBucket(),
2553                                                consumer->getCookie(),
2554                                                {vbucket_state_active,
2555                                                 vbucket_state_replica,
2556                                                 vbucket_state_pending},
2557                                                CheckConflicts::No,
2558                                                meta,
2559                                                vb->isBackfillPhase(),
2560                                                GenerateBySeqno::No,
2561                                                GenerateCas::No,
2562                                                *deletion->getBySeqno(),
2563                                                deletion->getExtMetaData(),
2564                                                true);
2565    if (ret == ENGINE_KEY_ENOENT) {
2566        ret = ENGINE_SUCCESS;
2567    }
2568
2569    if (ret != ENGINE_SUCCESS) {
2570        log(EXTENSION_LOG_WARNING,
2571            "vb:%" PRIu16
2572            " Got error '%s' while trying to process "
2573            "deletion with seqno:%" PRId64,
2574            vb_,
2575            cb::to_string(cb::to_engine_errc(ret)).c_str(),
2576            *deletion->getBySeqno());
2577    } else {
2578        handleSnapshotEnd(vb, *deletion->getBySeqno());
2579    }
2580
2581    return ret;
2582}
2583
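// Dispatches a collections system event (create collection, begin delete
// collection, or separator changed) to the matching replica-side handler on
// the vbucket.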
2584ENGINE_ERROR_CODE PassiveStream::processSystemEvent(
2585        const SystemEventMessage& event) {
2586    VBucketPtr vb = engine->getVBucket(vb_);
2587
2588    if (!vb) {
2589        return ENGINE_NOT_MY_VBUCKET;
2590    }
2591
2592    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
2593    // Depending on the event, the extras differ and the key may even be
2594    // empty; the specific handler knows how to interpret them.
2595    switch (event.getSystemEvent()) {
2596    case mcbp::systemevent::id::CreateCollection: {
2597        rv = processCreateCollection(*vb, {event});
2598        break;
2599    }
2600    case mcbp::systemevent::id::DeleteCollection: {
2601        rv = processBeginDeleteCollection(*vb, {event});
2602        break;
2603    }
2604    case mcbp::systemevent::id::CollectionsSeparatorChanged: {
2605        rv = processSeparatorChanged(*vb, {event});
2606        break;
2607    }
2608    default: {
2609        rv = ENGINE_EINVAL;
2610        break;
2611    }
2612    }
2613
2614    if (rv != ENGINE_SUCCESS) {
2615        log(EXTENSION_LOG_WARNING,
2616            "vb:%" PRIu16
2617            " Got error '%s' while trying to process "
2618            "system event",
2619            vb_,
2620            cb::to_string(cb::to_engine_errc(rv)).c_str());
2621    } else {
2622        handleSnapshotEnd(vb, *event.getBySeqno());
2623    }
2624
2625    return rv;
2626}
2627
2628ENGINE_ERROR_CODE PassiveStream::processCreateCollection(
2629        VBucket& vb, const CreateOrDeleteCollectionEvent& event) {
2630    try {
2631        vb.replicaAddCollection(event.getManifestUid(),
2632                                event.getCollection(),
2633                                event.getBySeqno());
2634    } catch (std::exception& e) {
2635        LOG(EXTENSION_LOG_WARNING,
2636            "PassiveStream::processCreateCollection exception %s",
2637            e.what());
2638        return ENGINE_EINVAL;
2639    }
2640    return ENGINE_SUCCESS;
2641}
2642
2643ENGINE_ERROR_CODE PassiveStream::processBeginDeleteCollection(
2644        VBucket& vb, const CreateOrDeleteCollectionEvent& event) {
2645    try {
2646        vb.replicaBeginDeleteCollection(event.getManifestUid(),
2647                                        event.getCollection(),
2648                                        event.getBySeqno());
2649    } catch (std::exception& e) {
2650        LOG(EXTENSION_LOG_WARNING,
2651            "PassiveStream::processBeginDeleteCollection exception %s",
2652            e.what());
2653        return ENGINE_EINVAL;
2654    }
2655    return ENGINE_SUCCESS;
2656}
2657
2658ENGINE_ERROR_CODE PassiveStream::processSeparatorChanged(
2659        VBucket& vb, const ChangeSeparatorCollectionEvent& event) {
2660    try {
2661        vb.replicaChangeCollectionSeparator(event.getManifestUid(),
2662                                            event.getSeparator(),
2663                                            event.getBySeqno());
2664    } catch (std::exception& e) {
2665        LOG(EXTENSION_LOG_WARNING,
2666            "PassiveStream::processSeparatorChanged exception %s",
2667            e.what());
2668        return ENGINE_EINVAL;
2669    }
2670    return ENGINE_SUCCESS;
2671}
2672
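// Records the incoming snapshot's range and type. A disk snapshot arriving on
// an empty vbucket puts the checkpoint manager into backfill phase; otherwise
// a new checkpoint snapshot is created or the current one is extended. The
// MARKER_FLAG_ACK flag arms an acknowledgement to be sent once the snapshot
// has been fully received.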
2673void PassiveStream::processMarker(SnapshotMarker* marker) {
2674    VBucketPtr vb = engine->getVBucket(vb_);
2675
2676    cur_snapshot_start.store(marker->getStartSeqno());
2677    cur_snapshot_end.store(marker->getEndSeqno());
2678    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ?
2679            Snapshot::Disk : Snapshot::Memory);
2680
2681    if (vb) {
2682        auto& ckptMgr = *vb->checkpointManager;
2683        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
2684            vb->setBackfillPhase(true);
2685            // calling setBackfillPhase sets the openCheckpointId to zero.
2686            ckptMgr.setBackfillPhase(cur_snapshot_start.load(),
2687                                     cur_snapshot_end.load());
2688        } else {
2689            if (marker->getFlags() & MARKER_FLAG_CHK ||
2690                vb->checkpointManager->getOpenCheckpointId() == 0) {
2691                ckptMgr.createSnapshot(cur_snapshot_start.load(),
2692                                       cur_snapshot_end.load());
2693            } else {
2694                ckptMgr.updateCurrentSnapshotEnd(cur_snapshot_end.load());
2695            }
2696            vb->setBackfillPhase(false);
2697        }
2698
2699        if (marker->getFlags() & MARKER_FLAG_ACK) {
2700            cur_snapshot_ack = true;
2701        }
2702    }
2703}
2704
2705void PassiveStream::processSetVBucketState(SetVBucketState* state) {
2706    engine->getKVBucket()->setVBucketState(vb_, state->getState(), true);
2707    {
2708        LockHolder lh (streamMutex);
2709        pushToReadyQ(std::make_unique<SetVBucketStateResponse>(opaque_,
2710                                                               ENGINE_SUCCESS));
2711    }
2712    notifyStreamReady();
2713}
2714
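// Called after each successfully applied item. Once the snapshot-end seqno is
// reached, any backfill phase is exited, a new open checkpoint is created
// (MB-30019) and, if the producer requested it via MARKER_FLAG_ACK, a
// snapshot-marker acknowledgement is queued.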
2715void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
2716    if (byseqno == cur_snapshot_end.load()) {
2717        auto& ckptMgr = *vb->checkpointManager;
2718
2719        if (cur_snapshot_type.load() == Snapshot::Disk &&
2720                vb->isBackfillPhase()) {
2721            vb->setBackfillPhase(false);
2722        }
2723
2724        // MB-30019: we always want to close the open checkpoint on replica
2725        // vbuckets when the Consumer receives the snapshot-end mutation
2726        ckptMgr.checkAndAddNewCheckpoint(ckptMgr.getOpenCheckpointId() + 1,
2727                                         *vb);
2728
2729        if (cur_snapshot_ack) {
2730            {
2731                LockHolder lh(streamMutex);
2732                pushToReadyQ(std::make_unique<SnapshotMarkerResponse>(
2733                        opaque_, ENGINE_SUCCESS));
2734            }
2735            notifyStreamReady();
2736            cur_snapshot_ack = false;
2737        }
2738        cur_snapshot_type.store(Snapshot::None);
2739    }
2740}
2741
2742void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
2743    Stream::addStats(add_stat, c);
2744
2745    try {
2746        const int bsize = 1024;
2747        char buf[bsize];
2748        size_t bufferItems = 0;
2749        size_t bufferBytes = 0;
2750        {
2751            std::lock_guard<std::mutex> lg(buffer.bufMutex);
2752            bufferItems = buffer.messages.size();
2753            bufferBytes = buffer.bytes;
2754        }
2755        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
2756                         vb_);
2757        add_casted_stat(buf, bufferItems, add_stat, c);
2758        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
2759                         vb_);
2760        add_casted_stat(buf, bufferBytes, add_stat, c);
2761        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
2762                         vb_);
2763        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
2764        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
2765                         name_.c_str(), vb_);
2766        add_casted_stat(buf, last_seqno.load(), add_stat, c);
2767        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
2768                         name_.c_str(), vb_);
2769        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
2770
2771        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
2772                         name_.c_str(), vb_);
2773        add_casted_stat(buf, ::to_string(cur_snapshot_type.load()),
2774                        add_stat, c);
2775
2776        if (cur_snapshot_type.load() != Snapshot::None) {
2777            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
2778                             name_.c_str(), vb_);
2779            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
2780            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
2781                             name_.c_str(), vb_);
2782            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
2783        }
2784    } catch (std::exception& error) {
2785        LOG(EXTENSION_LOG_WARNING,
2786            "PassiveStream::addStats: Failed to build stats: %s", error.what());
2787    }
2788}
2789
2790std::unique_ptr<DcpResponse> PassiveStream::next() {
2791    LockHolder lh(streamMutex);
2792
2793    if (readyQ.empty()) {
2794        itemsReady.store(false);
2795        return nullptr;
2796    }
2797
2798    return popFromReadyQ();
2799}
2800
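// Discards all buffered messages and returns the number of bytes they
// represented (i.e. bytes received but not yet acknowledged). Per the
// _UNLOCKED naming convention, any required locking of buffer.bufMutex is the
// caller's responsibility.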
2801uint32_t PassiveStream::clearBuffer_UNLOCKED() {
2802    uint32_t unackedBytes = buffer.bytes;
2803    buffer.messages.clear();
2804    buffer.bytes = 0;
2805    return unackedBytes;
2806}
2807
2808bool PassiveStream::transitionState(StreamState newState) {
2809    log(EXTENSION_LOG_INFO,
2810        "PassiveStream::transitionState: (vb %d) "
2811        "Transitioning from %s to %s",
2812        vb_,
2813        to_string(state_.load()).c_str(),
2814        to_string(newState).c_str());
2815
2816    if (state_ == newState) {
2817        return false;
2818    }
2819
2820    bool validTransition = false;
2821    switch (state_.load()) {
2822        case StreamState::Pending:
2823            if (newState == StreamState::Reading ||
2824                    newState == StreamState::Dead) {
2825                validTransition = true;
2826            }
2827            break;
2828
2829        case StreamState::Backfilling:
2830        case StreamState::InMemory:
2831        case StreamState::TakeoverSend:
2832        case StreamState::TakeoverWait:
2833            // Not valid for passive streams
2834            break;
2835
2836        case StreamState::Reading:
2837            if (newState == StreamState::Pending ||
2838                    newState == StreamState::Dead) {
2839                validTransition = true;
2840            }
2841            break;
2842
2843        case StreamState::Dead:
2844            // Once 'dead' shouldn't transition away from it.
2845            break;
2846    }
2847
2848    if (!validTransition) {
2849        throw std::invalid_argument("PassiveStream::transitionState:"
2850                " newState (which is " + to_string(newState) +
2851                ") is not valid for current state (which is " +
2852                to_string(state_.load()) + ")");
2853    }
2854
2855    state_ = newState;
2856    return true;
2857}
2858
2859std::string PassiveStream::getEndStreamStatusStr(end_stream_status_t status) {
2860    switch (status) {
2861        case END_STREAM_OK:
2862            return "The stream closed as part of normal operation";
2863        case END_STREAM_CLOSED:
2864            return "The stream closed due to a close stream message";
2865        case END_STREAM_DISCONNECTED:
2866            return "The stream closed early because the conn was disconnected";
2867        case END_STREAM_STATE:
2868            return "The stream closed early because the vbucket state changed";
2869        default:
2870            break;
2871    }
2872    return std::string{"Status unknown: " + std::to_string(status) +
2873                       "; this should not have happened!"};
2874}
2875
2876void PassiveStream::log(EXTENSION_LOG_LEVEL severity,
2877                        const char* fmt,
2878                        ...) const {
2879    va_list va;
2880    va_start(va, fmt);
2881    auto consumer = consumerPtr.lock();
2882    if (consumer) {
2883        consumer->getLogger().vlog(severity, fmt, va);
2884    } else {
2885        static Logger defaultLogger =
2886                Logger("DCP (Consumer): **Deleted conn**");
2887        defaultLogger.vlog(severity, fmt, va);
2888    }
2889    va_end(va);
2890}
2891
2892void PassiveStream::notifyStreamReady() {
2893    auto consumer = consumerPtr.lock();
2894    if (!consumer) {
2895        return;
2896    }
2897
2898    bool inverse = false;
2899    if (itemsReady.compare_exchange_strong(inverse, true)) {
2900        consumer->notifyStreamReady(vb_);
2901    }
2902}
2903