xref: /4.6.4/ep-engine/src/dcp/stream.cc (revision 1d750376)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <platform/checked_snprintf.h>
21
22#include "ep_engine.h"
23#include "failover-table.h"
24#include "kvstore.h"
25#include "statwriter.h"
26#include "dcp/backfill-manager.h"
27#include "dcp/backfill.h"
28#include "dcp/consumer.h"
29#include "dcp/producer.h"
30#include "dcp/response.h"
31#include "dcp/stream.h"
32#include "replicationthrottle.h"
33
34static const char* snapshotTypeToString(snapshot_type_t type) {
35    static const char * const snapshotTypes[] = { "none", "disk", "memory" };
36    if (type < none || type > memory) {
37        throw std::invalid_argument("snapshotTypeToString: type (which is " +
38                                    std::to_string(type) +
39                                    ") is not a valid snapshot_type_t");
40    }
41    return snapshotTypes[type];
42}
43
44const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
45
46Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
47               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
48               uint64_t vb_uuid, uint64_t snap_start_seqno,
49               uint64_t snap_end_seqno)
50    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
51      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
52      snap_start_seqno_(snap_start_seqno),
53      snap_end_seqno_(snap_end_seqno),
54      state_(STREAM_PENDING), itemsReady(false),
55      readyQ_non_meta_items(0),
56      readyQueueMemory(0) {
57}
58
59Stream::~Stream() {
60    // NB: reusing the "unlocked" method without a lock because we're
61    // destructing and should not take any locks.
62    clear_UNLOCKED();
63}
64
65void Stream::clear_UNLOCKED() {
66    while (!readyQ.empty()) {
67        DcpResponse* resp = readyQ.front();
68        popFromReadyQ();
69        delete resp;
70    }
71}
72
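/*
 * Note on readyQ bookkeeping: pushToReadyQ()/popFromReadyQ() below must be
 * called with streamMutex held, and keep two counters in step with the queue
 * contents:
 *   - readyQ_non_meta_items: queued responses that are not meta events
 *     (used when reporting items remaining on the stream).
 *   - readyQueueMemory: approximate bytes queued, maintained with relaxed
 *     atomics since it is only read for stats.
 */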
73void Stream::pushToReadyQ(DcpResponse* resp)
74{
75   /* expect streamMutex.ownsLock() == true */
76    if (resp) {
77        readyQ.push(resp);
78        if (!resp->isMetaEvent()) {
79            readyQ_non_meta_items++;
80        }
81        readyQueueMemory.fetch_add(resp->getMessageSize(),
82                                   std::memory_order_relaxed);
83    }
84}
85
86void Stream::popFromReadyQ(void)
87{
88    /* expect streamMutex.ownsLock() == true */
89    if (!readyQ.empty()) {
90        const auto& front = readyQ.front();
91        if (!front->isMetaEvent()) {
92            readyQ_non_meta_items--;
93        }
94        const uint32_t respSize = front->getMessageSize();
95        readyQ.pop();
96
97        /* Decrement the readyQ size */
98        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
99            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
100        } else {
101            LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d) "
102                "underflow, likely wrong stat calculation! curr size: %" PRIu64
103                "; resp size: %" PRIu32,
104                name_.c_str(), getVBucket(),
105                readyQueueMemory.load(std::memory_order_relaxed), respSize);
106            readyQueueMemory.store(0, std::memory_order_relaxed);
107        }
108    }
109}
110
111uint64_t Stream::getReadyQueueMemory() {
112    return readyQueueMemory.load(std::memory_order_relaxed);
113}
114
115const char* Stream::stateName(stream_state_t st) {
116    static const char * const stateNames[] = {
117        "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
118        "reading", "dead"
119    };
120    if (st < STREAM_PENDING || st > STREAM_DEAD) {
121        throw std::invalid_argument("Stream::stateName: st (which is " +
122                                        std::to_string(st) +
123                                        ") is not a valid stream_state_t");
124    }
125    return stateNames[st];
126}
127
128void Stream::addStats(ADD_STAT add_stat, const void *c) {
129    try {
130        const int bsize = 1024;
131        char buffer[bsize];
132        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
133                         vb_);
134        add_casted_stat(buffer, flags_, add_stat, c);
135        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
136                         vb_);
137        add_casted_stat(buffer, opaque_, add_stat, c);
138        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
139                         name_.c_str(), vb_);
140        add_casted_stat(buffer, start_seqno_, add_stat, c);
141        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
142                         vb_);
143        add_casted_stat(buffer, end_seqno_, add_stat, c);
144        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
145                         vb_);
146        add_casted_stat(buffer, vb_uuid_, add_stat, c);
147        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
148                         name_.c_str(), vb_);
149        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
150        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
151                         name_.c_str(), vb_);
152        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
153        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
154                         vb_);
155        add_casted_stat(buffer, stateName(state_), add_stat, c);
156    } catch (std::exception& error) {
157        LOG(EXTENSION_LOG_WARNING,
158            "Stream::addStats: Failed to build stats: %s", error.what());
159    }
160}
161
162ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
163                           const std::string &n, uint32_t flags,
164                           uint32_t opaque, uint16_t vb, uint64_t st_seqno,
165                           uint64_t en_seqno, uint64_t vb_uuid,
166                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
167    :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
168              snap_start_seqno, snap_end_seqno),
169       isBackfillTaskRunning(false), pendingBackfill(false),
170       lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
171       lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
172       takeoverState(vbucket_state_pending), backfillRemaining(0),
173       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
174       engine(e), producer(p),
175       payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
176                                                            KEY_VALUE),
177       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
178
179    const char* type = "";
180    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
181        type = "takeover ";
182        end_seqno_ = dcpMaxSeqno;
183    }
184
185    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
186    if (vbucket) {
187        ReaderLockHolder rlh(vbucket->getStateLock());
188        if (vbucket->getState() == vbucket_state_replica) {
189            snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
190            if (info.range.end > en_seqno) {
191                end_seqno_ = info.range.end;
192            }
193        }
194    }
195
196    producer->getLogger().log(EXTENSION_LOG_NOTICE,
197        "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
198        " and end seqno %" PRIu64, vb, type, st_seqno, en_seqno);
199
200    backfillItems.memory = 0;
201    backfillItems.disk = 0;
202    backfillItems.sent = 0;
203
204    type_ = STREAM_ACTIVE;
205
206    bufferedBackfill.bytes = 0;
207    bufferedBackfill.items = 0;
208
209    takeoverStart = 0;
210    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
211
212    if (start_seqno_ >= end_seqno_) {
213        /* streamMutex lock needs to be acquired because endStream
214         * potentially makes a call to pushToReadyQ.
215         */
216        LockHolder lh(streamMutex);
217        endStream(END_STREAM_OK);
218        itemsReady.store(true);
219        // lock is released on leaving the scope
220    }
221}
222
223ActiveStream::~ActiveStream() {
224    transitionState(STREAM_DEAD);
225}
226
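/*
 * next() dispatches on the current stream state and returns the next
 * DcpResponse for the producer (or NULL if nothing is ready). If handling
 * the current phase changed the state without producing a response (e.g.
 * backfilling -> in-memory), it calls itself again so the caller gets an
 * item from the new phase within the same invocation.
 */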
227DcpResponse* ActiveStream::next() {
228    LockHolder lh(streamMutex);
229
230    stream_state_t initState = state_;
231
232    DcpResponse* response = NULL;
233
234    bool validTransition = false;
235    switch (initState) {
236        case STREAM_PENDING:
237            validTransition = true;
238            break;
239        case STREAM_BACKFILLING:
240            validTransition = true;
241            response = backfillPhase();
242            break;
243        case STREAM_IN_MEMORY:
244            validTransition = true;
245            response = inMemoryPhase();
246            break;
247        case STREAM_TAKEOVER_SEND:
248            validTransition = true;
249            response = takeoverSendPhase();
250            break;
251        case STREAM_TAKEOVER_WAIT:
252            validTransition = true;
253            response = takeoverWaitPhase();
254            break;
255        case STREAM_READING:
256            // Not valid for an active stream.
257            break;
258        case STREAM_DEAD:
259            validTransition = true;
260            response = deadPhase();
261            break;
262    }
263
264    if (!validTransition) {
265        throw std::invalid_argument("ActiveStream::next:"
266                " invalid state " + std::to_string(state_) + " for stream " +
267                producer->logHeader() + " vb " + std::to_string(vb_));
268    }
269
270    stream_state_t newState = state_;
271
272    if (newState != STREAM_DEAD && newState != initState && !response) {
273        lh.unlock();
274        return next();
275    }
276
277    itemsReady.store(response ? true : false);
278    return response;
279}
280
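/*
 * markDiskSnapshot() is called from the backfill path before items read from
 * disk are streamed: it queues a disk snapshot marker covering [startSeqno,
 * endSeqno], pulling the start back to snap_start_seqno_ if needed and
 * widening the end for replica vbuckets whose open checkpoint is still
 * incomplete. Unless the stream is disk-only, it also re-registers a
 * checkpoint cursor at the original backfill end seqno so streaming can
 * continue from memory afterwards.
 */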
281void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
282    LockHolder lh(streamMutex);
283    uint64_t chkCursorSeqno = endSeqno;
284
285    if (state_ != STREAM_BACKFILLING) {
286        return;
287    }
288
289    startSeqno = std::min(snap_start_seqno_, startSeqno);
290    firstMarkerSent = true;
291
292    RCPtr<VBucket> vb = engine->getVBucket(vb_);
293    // An atomic read of vbucket state without acquiring the
294    // reader lock for state should suffice here.
295    if (vb && vb->getState() == vbucket_state_replica) {
296        if (end_seqno_ > endSeqno) {
297            /* We possibly have items in the open checkpoint
298               (incomplete snapshot) */
299            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
300            producer->getLogger().log(EXTENSION_LOG_NOTICE,
301                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
302                "replica vbucket, backfill start seqno %" PRIu64 ", "
303                "backfill end seqno %" PRIu64 ", "
304                "snapshot end seqno after merge %" PRIu64,
305                vb_, startSeqno, endSeqno, info.range.end);
306            endSeqno = info.range.end;
307        }
308    }
309
310    producer->getLogger().log(EXTENSION_LOG_NOTICE,
311        "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
312        " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
313    pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
314                                    MARKER_FLAG_DISK));
315    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
316
317    if (!vb) {
318        endStream(END_STREAM_STATE);
319    } else if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
320        // Only re-register the cursor if we still need to get memory snapshots
321        CursorRegResult result =
322            vb->checkpointManager.registerCursorBySeqno(
323                                                name_, chkCursorSeqno,
324                                                MustSendCheckpointEnd::NO);
325        curChkSeqno = result.first;
326    }
327
328    lh.unlock();
329    bool inverse = false;
330    if (itemsReady.compare_exchange_strong(inverse, true)) {
331        producer->notifyStreamReady(vb_);
332    }
333}
334
335bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
336    if (nullptr == itm) {
337        return false;
338    }
339
340    if (itm->shouldReplicate()) {
341        LockHolder lh(streamMutex);
342        if (state_ == STREAM_BACKFILLING) {
343            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
344                delete itm;
345                return false;
346            }
347
348            bufferedBackfill.bytes.fetch_add(itm->size());
349            bufferedBackfill.items++;
350
351            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
352
353            lastReadSeqno.store(itm->getBySeqno());
354            lh.unlock();
355            bool inverse = false;
356            if (itemsReady.compare_exchange_strong(inverse, true)) {
357                producer->notifyStreamReady(vb_);
358            }
359
360            if (backfill_source == BACKFILL_FROM_MEMORY) {
361                backfillItems.memory++;
362            } else {
363                backfillItems.disk++;
364            }
365        } else {
366            delete itm;
367        }
368    } else {
369        delete itm;
370    }
371
372    return true;
373}
374
375void ActiveStream::completeBackfill() {
376    {
377        LockHolder lh(streamMutex);
378        if (state_ == STREAM_BACKFILLING) {
379            producer->getLogger().log(EXTENSION_LOG_NOTICE,
380                    "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
381                    "read from disk, %" PRIu64 " from memory, last seqno read: "
382                    "%" PRIu64 "\n", vb_, uint64_t(backfillItems.disk.load()),
383                    uint64_t(backfillItems.memory.load()),
384                    lastReadSeqno.load());
385
386            isBackfillTaskRunning = false;
387            if (pendingBackfill) {
388                scheduleBackfill_UNLOCKED(true);
389                pendingBackfill = false;
390            }
391
392            bool expected = false;
393            if (itemsReady.compare_exchange_strong(expected, true)) {
394                producer->notifyStreamReady(vb_);
395            }
396
397            /**
398              * MB-22451: It is important that we return here because
399              * scheduleBackfill_UNLOCKED(true) can set
400              * isBackfillTaskRunning to true.  Therefore if we don't return we
401              * will set isBackfillTaskRunning prematurely back to false, (see
402              * below).
403              */
404            return;
405        }
406    }
407
408    bool inverse = true;
409    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
410    inverse = false;
411    if (itemsReady.compare_exchange_strong(inverse, true)) {
412        producer->notifyStreamReady(vb_);
413    }
414}
415
416void ActiveStream::snapshotMarkerAckReceived() {
417    bool inverse = false;
418    if (--waitForSnapshot == 0 &&
419        itemsReady.compare_exchange_strong(inverse, true)) {
420        producer->notifyStreamReady(vb_);
421    }
422}
423
424void ActiveStream::setVBucketStateAckRecieved() {
425    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
426    if (!vbucket) {
427        producer->getLogger().log(EXTENSION_LOG_WARNING,
428                                  "(vb %" PRIu16
429                                  ") not present during ack for set vbucket "
430                                  "during takeover",
431                                  vb_);
432        return;
433    }
434
435    {
436        /* Order in which the below 3 locks are acquired is important to avoid
437           any potential lock inversion problems */
438        LockHolder epVbSetLh(engine->getEpStore()->getVbSetMutexLock());
439        WriterLockHolder vbStateLh(vbucket->getStateLock());
440        LockHolder lh(streamMutex);
441        if (state_ == STREAM_TAKEOVER_WAIT) {
442            if (takeoverState == vbucket_state_pending) {
443                producer->getLogger().log(EXTENSION_LOG_INFO,
444                        "(vb %" PRIu16 ") Receive ack for set vbucket state to "
445                        "pending message",
446                        vb_);
447
448                takeoverState = vbucket_state_active;
449                transitionState(STREAM_TAKEOVER_SEND);
450
451                engine->getEpStore()->setVBucketState_UNLOCKED(
452                                                        vb_,
453                                                        vbucket_state_dead,
454                                                        false /* transfer */,
455                                                        false /* notify_dcp */,
456                                                        epVbSetLh,
457                                                        &vbStateLh);
458
459                producer->getLogger().log(EXTENSION_LOG_NOTICE,
460                        "(vb %" PRIu16 ") Vbucket marked as dead, last sent "
461                        "seqno: %" PRIu64 ", high seqno: %" PRIu64,
462                        vb_,
463                        lastSentSeqno.load(),
464                        vbucket->getHighSeqno());
465            } else {
466                producer->getLogger().log(EXTENSION_LOG_NOTICE,
467                        "(vb %" PRIu16 ") Receive ack for set vbucket state to "
468                        "active message",
469                        vb_);
470                endStream(END_STREAM_OK);
471            }
472        } else {
473            producer->getLogger().log(EXTENSION_LOG_WARNING,
474                    "(vb %" PRIu16 ") Unexpected ack for set vbucket op on "
475                    "stream '%s' state '%s'",
476                    vb_,
477                    name_.c_str(),
478                    stateName(state_));
479            return;
480        }
481    }
482
483    bool inverse = false;
484    if (itemsReady.compare_exchange_strong(inverse, true)) {
485        producer->notifyStreamReady(vb_);
486    }
487}
488
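/*
 * backfillPhase() drains items queued by the backfill task. Once the task
 * has finished and the readyQ is empty, the stream either ends (all
 * requested items sent, or a disk-only stream) or transitions to
 * takeover-send or in-memory streaming, depending on the stream flags.
 */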
489DcpResponse* ActiveStream::backfillPhase() {
490    DcpResponse* resp = nextQueuedItem();
491
492    if (resp && (resp->getEvent() == DCP_MUTATION ||
493         resp->getEvent() == DCP_DELETION ||
494         resp->getEvent() == DCP_EXPIRATION)) {
495        MutationResponse* m = static_cast<MutationResponse*>(resp);
496        producer->recordBackfillManagerBytesSent(m->getItem()->size());
497        bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
498        bufferedBackfill.items--;
499        if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
500            backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
501        }
502    }
503
504    if (!isBackfillTaskRunning && readyQ.empty()) {
505        backfillRemaining.store(0, std::memory_order_relaxed);
506        if (lastReadSeqno.load() >= end_seqno_) {
507            endStream(END_STREAM_OK);
508        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
509            transitionState(STREAM_TAKEOVER_SEND);
510        } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
511            endStream(END_STREAM_OK);
512        } else {
513            transitionState(STREAM_IN_MEMORY);
514        }
515
516        if (!resp) {
517            resp = nextQueuedItem();
518        }
519    }
520
521    return resp;
522}
523
524DcpResponse* ActiveStream::inMemoryPhase() {
525    if (lastSentSeqno.load() >= end_seqno_) {
526        endStream(END_STREAM_OK);
527    } else if (readyQ.empty()) {
528        if (pendingBackfill) {
529            transitionState(STREAM_BACKFILLING);
530            pendingBackfill = false;
531            return NULL;
532        } else if (nextCheckpointItem()) {
533            return NULL;
534        }
535    }
536
537    return nextQueuedItem();
538}
539
540DcpResponse* ActiveStream::takeoverSendPhase() {
541
542    RCPtr<VBucket> vb = engine->getVBucket(vb_);
543    if (vb && takeoverStart != 0 &&
544        !vb->isTakeoverBackedUp() &&
545        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
546        vb->setTakeoverBackedUpState(true);
547    }
548
549    if (!readyQ.empty()) {
550        return nextQueuedItem();
551    } else {
552        if (nextCheckpointItem()) {
553            return NULL;
554        }
555    }
556
557    if (waitForSnapshot != 0) {
558        return NULL;
559    }
560
561    if (vb) {
562        vb->setTakeoverBackedUpState(false);
563        takeoverStart = 0;
564    }
565
566    DcpResponse* resp = NULL;
567    if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
568        resp = new SetVBucketState(opaque_, vb_, takeoverState);
569        transitionState(STREAM_TAKEOVER_WAIT);
570    }
571    return resp;
572}
573
574DcpResponse* ActiveStream::takeoverWaitPhase() {
575    return nextQueuedItem();
576}
577
578DcpResponse* ActiveStream::deadPhase() {
579    DcpResponse* resp = nextQueuedItem();
580    if (!resp) {
581        producer->getLogger().log(EXTENSION_LOG_NOTICE,
582                                  "(vb %" PRIu16 ") Stream closed, "
583                                  "%" PRIu64 " items sent from backfill phase, "
584                                  "%" PRIu64 " items sent from memory phase, "
585                                  "%" PRIu64 " was last seqno sent",
586                                  vb_,
587                                  uint64_t(backfillItems.sent.load()),
588                                  uint64_t(itemsFromMemoryPhase.load()),
589                                  lastSentSeqno.load());
590    }
591    return resp;
592}
593
594bool ActiveStream::isCompressionEnabled() {
595    return producer->isValueCompressionEnabled();
596}
597
598void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
599    Stream::addStats(add_stat, c);
600
601    try {
602        const int bsize = 1024;
603        char buffer[bsize];
604        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
605                         name_.c_str(), vb_);
606        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
607        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
608                         name_.c_str(), vb_);
609        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
610        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
611                         name_.c_str(), vb_);
612        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
613        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
614                         name_.c_str(), vb_);
615        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
616        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
617                         name_.c_str(), vb_);
618        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
619        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
620                         name_.c_str(), vb_);
621        add_casted_stat(buffer,
622                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
623                        add_stat, c);
624        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
625                         name_.c_str(), vb_);
626        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
627        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
628                         name_.c_str(), vb_);
629        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
630        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
631                         name_.c_str(), vb_);
632        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
633                        c);
634        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
635                         name_.c_str(), vb_);
636        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
637        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
638                         name_.c_str(), vb_);
639        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
640
641        if ((state_ == STREAM_TAKEOVER_SEND) && takeoverStart != 0) {
642            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
643                             name_.c_str(), vb_);
644            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
645                            c);
646        }
647    } catch (std::exception& error) {
648        LOG(EXTENSION_LOG_WARNING,
649            "ActiveStream::addStats: Failed to build stats: %s", error.what());
650    }
651}
652
653void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
654    LockHolder lh(streamMutex);
655
656    RCPtr<VBucket> vb = engine->getVBucket(vb_);
657    add_casted_stat("name", name_, add_stat, cookie);
658    if (!vb || state_ == STREAM_DEAD) {
659        add_casted_stat("status", "completed", add_stat, cookie);
660        add_casted_stat("estimate", 0, add_stat, cookie);
661        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
662        return;
663    }
664
665    size_t total = backfillRemaining.load(std::memory_order_relaxed);
666    if (state_ == STREAM_BACKFILLING) {
667        add_casted_stat("status", "backfilling", add_stat, cookie);
668    } else {
669        add_casted_stat("status", "in-memory", add_stat, cookie);
670    }
671    add_casted_stat("backfillRemaining",
672                    backfillRemaining.load(std::memory_order_relaxed),
673                    add_stat, cookie);
674
675    item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
676    size_t vb_items = vb->getNumItems(iep);
677    size_t chk_items = vb_items > 0 ?
678                vb->checkpointManager.getNumItemsForCursor(name_) : 0;
679
680    size_t del_items = 0;
681    try {
682        del_items = engine->getEpStore()->getRWUnderlying(vb_)->
683                                                        getNumPersistedDeletes(vb_);
684    } catch (std::runtime_error& e) {
685        producer->getLogger().log(EXTENSION_LOG_WARNING,
686            "ActiveStream::addTakeoverStats: exception while getting num persisted "
687            "deletes for vbucket:%" PRIu16 " - treating as 0 deletes. "
688            "Details: %s", vb_, e.what());
689    }
690
691    if (end_seqno_ < curChkSeqno) {
692        chk_items = 0;
693    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
694        chk_items = end_seqno_ - curChkSeqno + 1;
695    }
696    total += chk_items;
697
698    add_casted_stat("estimate", total, add_stat, cookie);
699    add_casted_stat("chk_items", chk_items, add_stat, cookie);
700    add_casted_stat("vb_items", vb_items, add_stat, cookie);
701    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
702}
703
704DcpResponse* ActiveStream::nextQueuedItem() {
705    if (!readyQ.empty()) {
706        DcpResponse* response = readyQ.front();
707        if (producer->bufferLogInsert(response->getMessageSize())) {
708            if (response->getEvent() == DCP_MUTATION ||
709                    response->getEvent() == DCP_DELETION ||
710                    response->getEvent() == DCP_EXPIRATION) {
711                lastSentSeqno.store(
712                        dynamic_cast<MutationResponse*>(response)->getBySeqno());
713
714                if (state_ == STREAM_BACKFILLING) {
715                    backfillItems.sent++;
716                } else {
717                    itemsFromMemoryPhase++;
718                }
719            }
720            popFromReadyQ();
721            return response;
722        }
723    }
724    return NULL;
725}
726
727bool ActiveStream::nextCheckpointItem() {
728    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
729    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
730        // schedule this stream to build the next checkpoint
731        producer->scheduleCheckpointProcessorTask(this);
732        return true;
733    } else if (chkptItemsExtractionInProgress) {
734        return true;
735    }
736    return false;
737}
738
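/*
 * ActiveStreamCheckpointProcessorTask pops scheduled streams off its queue
 * and asks each one to turn its outstanding checkpoint items into readyQ
 * snapshots. It first arms an effectively infinite snooze, processes at most
 * iterationsBeforeYield streams, and then snoozes for 0s instead if it was
 * re-notified or work remains, yielding to other tasks.
 */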
739bool ActiveStreamCheckpointProcessorTask::run() {
740    if (engine->getEpStats().isShutdown) {
741        return false;
742    }
743
744    // Set up so that we sleep forever when done.
745    snooze(INT_MAX);
746
747    // Clear the notification flag
748    notified.store(false);
749
750    size_t iterations = 0;
751    do {
752        stream_t nextStream = queuePop();
753        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
754
755        if (stream) {
756            stream->nextCheckpointItemTask();
757        } else {
758            break;
759        }
760        iterations++;
761    } while(!queueEmpty()
762            && iterations < iterationsBeforeYield);
763
764    // Now check if we were re-notified or there are still checkpoints
765    bool expected = true;
766    if (notified.compare_exchange_strong(expected, false)
767        || !queueEmpty()) {
768        // snooze for 0, essentially yielding and allowing other tasks a go
769        snooze(0.0);
770    }
771
772    return true;
773}
774
775void ActiveStreamCheckpointProcessorTask::wakeup() {
776    ExecutorPool::get()->wake(getId());
777}
778
779void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
780    pushUnique(stream);
781
782    bool expected = false;
783    if (notified.compare_exchange_strong(expected, true)) {
784        wakeup();
785    }
786}
787
788void ActiveStreamCheckpointProcessorTask::clearQueues() {
789    LockHolder lh(workQueueLock);
790    while (!queue.empty()) {
791        queue.pop();
792    }
793    queuedVbuckets.clear();
794}
795
796void ActiveStream::nextCheckpointItemTask() {
797    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
798    if (vbucket) {
799        std::vector<queued_item> items;
800        getOutstandingItems(vbucket, items);
801        processItems(items);
802    } else {
803        /* The entity deleting the vbucket must set stream to dead,
804           calling setDead(END_STREAM_STATE) will cause deadlock because
805           it will try to grab streamMutex which is already acquired at this
806           point here */
807        return;
808    }
809}
810
811void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
812                                       std::vector<queued_item> &items) {
813    // Commencing item processing - set guard flag.
814    chkptItemsExtractionInProgress.store(true);
815
816    vb->checkpointManager.getAllItemsForCursor(name_, items);
817    if (vb->checkpointManager.getNumCheckpoints() > 1) {
818        engine->getEpStore()->wakeUpCheckpointRemover();
819    }
820}
821
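/*
 * processItems() splits the items pulled from the CheckpointManager into
 * per-checkpoint batches. Whenever a checkpoint_start meta item is seen, the
 * mutations accumulated so far belong to the previous checkpoint and are
 * flushed as their own snapshot; the following snapshot marker then carries
 * the MARKER_FLAG_CHK bit.
 */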
822void ActiveStream::processItems(std::vector<queued_item>& items) {
823    if (!items.empty()) {
824        bool mark = false;
825        if (items.front()->getOperation() == queue_op::checkpoint_start) {
826            mark = true;
827        }
828
829        std::deque<MutationResponse*> mutations;
830        std::vector<queued_item>::iterator itr = items.begin();
831        for (; itr != items.end(); ++itr) {
832            queued_item& qi = *itr;
833
834            if (qi->shouldReplicate()) {
835                curChkSeqno = qi->getBySeqno();
836                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
837
838                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
839                            isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
840                                                             KEY_VALUE));
841            } else if (qi->getOperation() == queue_op::checkpoint_start) {
842                /* if there are already other mutations, then they belong to the
843                   previous checkpoint and hence we must create a snapshot and
844                   put them onto readyQ */
845                if (!mutations.empty()) {
846                    snapshot(mutations, mark);
847                    /* clear out all the mutations since they are already put
848                       onto the readyQ */
849                    mutations.clear();
850                }
851                /* mark true as it indicates a new checkpoint snapshot */
852                mark = true;
853            }
854        }
855
856        if (mutations.empty()) {
857            // If we only got checkpoint start or ends check to see if there are
858            // any more snapshots before pausing the stream.
859            nextCheckpointItemTask();
860        } else {
861            snapshot(mutations, mark);
862        }
863    }
864
865    // Completed item processing - clear guard flag and notify producer.
866    chkptItemsExtractionInProgress.store(false);
867    producer->notifyStreamReady(vb_);
868}
869
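/*
 * snapshot() pushes a batch of mutations onto the readyQ, preceded by a
 * memory snapshot marker when the previous snapshot has been fully sent. If
 * the stream has meanwhile gone dead or dropped back to backfilling, the
 * batch is discarded instead: a new backfill will be scheduled from
 * lastReadSeqno + 1 and would otherwise resend these items.
 */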
870void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
871    if (items.empty()) {
872        return;
873    }
874
875    LockHolder lh(streamMutex);
876
877    if ((state_ == STREAM_DEAD) || (state_ == STREAM_BACKFILLING)) {
878        // If stream was closed forcefully by the time the checkpoint items
879        // retriever task completed, or if we decided to switch the stream to
880        // backfill state from in-memory state, none of the acquired mutations
881        // should be added on the stream's readyQ. We must drop items in case
882        // we switch state from in-memory to backfill because we schedule
883        // backfill from lastReadSeqno + 1
884        std::deque<MutationResponse *>::iterator itr = items.begin();
885        for (; itr != items.end(); ++itr) {
886            delete *itr;
887        }
888        items.clear();
889        return;
890    }
891
892    /* This assumes that all items in the "items" deque are put onto readyQ */
893    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
894
895    if (isCurrentSnapshotCompleted()) {
896        uint32_t flags = MARKER_FLAG_MEMORY;
897        uint64_t snapStart = items.front()->getBySeqno();
898        uint64_t snapEnd = items.back()->getBySeqno();
899
900        if (mark) {
901            flags |= MARKER_FLAG_CHK;
902        }
903
904        if (state_ == STREAM_TAKEOVER_SEND) {
905            waitForSnapshot++;
906            flags |= MARKER_FLAG_ACK;
907        }
908
909        if (!firstMarkerSent) {
910            snapStart = std::min(snap_start_seqno_, snapStart);
911            firstMarkerSent = true;
912        }
913        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
914                                        flags));
915        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
916    }
917
918    std::deque<MutationResponse*>::iterator itemItr;
919    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
920        pushToReadyQ(*itemItr);
921    }
922}
923
924uint32_t ActiveStream::setDead(end_stream_status_t status) {
925    {
926        LockHolder lh(streamMutex);
927        endStream(status);
928    }
929
930    bool inverse = false;
931    if (status != END_STREAM_DISCONNECTED &&
932        itemsReady.compare_exchange_strong(inverse, true)) {
933        producer->notifyStreamReady(vb_);
934    }
935    return 0;
936}
937
938void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
939    if (state_ != STREAM_DEAD) {
940        bool inverse = false;
941        if (itemsReady.compare_exchange_strong(inverse, true)) {
942            producer->notifyStreamReady(vb_);
943        }
944    }
945}
946
947void ActiveStream::endStream(end_stream_status_t reason) {
948    if (state_ != STREAM_DEAD) {
949        pendingBackfill = false;
950        if (state_ == STREAM_BACKFILLING) {
951            // If the stream was in backfilling state, clear out the
952            // backfilled items to free up the backfill buffer.
953            clear_UNLOCKED();
954            producer->recordBackfillManagerBytesSent(bufferedBackfill.bytes);
955            bufferedBackfill.bytes = 0;
956            bufferedBackfill.items = 0;
957        }
958        transitionState(STREAM_DEAD);
959        if (reason != END_STREAM_DISCONNECTED) {
960            pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
961        }
962        producer->getLogger().log(EXTENSION_LOG_NOTICE,
963                                  "(vb %" PRIu16 ") Stream closing, "
964                                  "sent until seqno %" PRIu64 " "
965                                  "remaining items %" PRIu64 ", "
966                                  "reason: %s",
967                                  vb_,
968                                  lastSentSeqno.load(),
969                                  uint64_t(readyQ_non_meta_items.load()),
970                                  getEndStreamStatusStr(reason));
971    }
972}
973
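/*
 * scheduleBackfill_UNLOCKED() decides whether a disk backfill is needed and
 * over what range. Roughly: backfill runs from lastReadSeqno + 1 up to either
 * the requested end seqno (disk-only or rescheduled case) or one before the
 * seqno at which a checkpoint cursor could be registered. If no backfill is
 * required, the stream ends (disk-only) or goes straight to takeover-send or
 * in-memory.
 */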
974void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
975    if (isBackfillTaskRunning) {
976        producer->getLogger().log(EXTENSION_LOG_NOTICE,
977                                  "(vb %" PRIu16 ") Skipping "
978                                  "scheduleBackfill_UNLOCKED; "
979                                  "lastReadSeqno %" PRIu64 ", reschedule flag "
980                                  ": %s", vb_, lastReadSeqno.load(),
981                                  reschedule ? "True" : "False");
982        return;
983    }
984
985    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
986    if (!vbucket) {
987        return;
988    }
989
990    uint64_t backfillStart = lastReadSeqno.load() + 1;
991    uint64_t backfillEnd = 0;
992    bool tryBackfill = false;
993
994    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
995        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
996        if (lastReadSeqno.load() > vbHighSeqno) {
997            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
998                                   "lastReadSeqno (which is " +
999                                   std::to_string(lastReadSeqno.load()) +
1000                                   " ) is greater than vbHighSeqno (which is " +
1001                                   std::to_string(vbHighSeqno) + " ). " +
1002                                   "for stream " + producer->logHeader() +
1003                                   "; vb " + std::to_string(vb_));
1004        }
1005        if (reschedule) {
1006            /* We need to do this for reschedule because in case of
1007               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
1008               set to the last persisted seqno before calling
1009               scheduleBackfill_UNLOCKED() */
1010            backfillEnd = engine->getEpStore()->getLastPersistedSeqno(vb_);
1011        } else {
1012            backfillEnd = end_seqno_;
1013        }
1014        tryBackfill = true;
1015    } else {
1016        CursorRegResult result =
1017            vbucket->checkpointManager.registerCursorBySeqno(
1018                                                name_,
1019                                                lastReadSeqno.load(),
1020                                                MustSendCheckpointEnd::NO);
1021        curChkSeqno = result.first;
1022        tryBackfill = result.second;
1023
1024        if (lastReadSeqno.load() > curChkSeqno) {
1025            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1026                                   "lastReadSeqno (which is " +
1027                                   std::to_string(lastReadSeqno.load()) +
1028                                   " ) is greater than curChkSeqno (which is " +
1029                                   std::to_string(curChkSeqno) + " ). " +
1030                                   "for stream " + producer->logHeader() +
1031                                   "; vb " + std::to_string(vb_));
1032        }
1033
1034        /* We need to find the minimum seqno that needs to be backfilled in
1035         * order to make sure that we don't miss anything when transitioning
1036         * to a memory snapshot. The backfill task will always make sure that
1037         * the backfill end seqno is contained in the backfill.
1038         */
1039        if (backfillStart < curChkSeqno) {
1040            if (curChkSeqno > end_seqno_) {
1041                /* Backfill only is enough */
1042                backfillEnd = end_seqno_;
1043            } else {
1044                /* Backfill + in-memory streaming */
1045                backfillEnd = curChkSeqno - 1;
1046            }
1047        }
1048    }
1049
1050    if (backfillStart <= backfillEnd && tryBackfill) {
1051        producer->getLogger().log(EXTENSION_LOG_NOTICE,
1052                                  "(vb %" PRIu16 ") Scheduling backfill "
1053                                  "from %" PRIu64 " to %" PRIu64 ", reschedule "
1054                                  "flag : %s", vb_, backfillStart, backfillEnd,
1055                                  reschedule ? "True" : "False");
1056        producer->scheduleBackfillManager(this, backfillStart, backfillEnd);
1057        isBackfillTaskRunning.store(true);
1058    } else {
1059        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
1060            endStream(END_STREAM_OK);
1061        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
1062            transitionState(STREAM_TAKEOVER_SEND);
1063        } else {
1064            transitionState(STREAM_IN_MEMORY);
1065        }
1066        if (reschedule) {
1067            /* The cursor was dropped, but we will not do a backfill.
1068               This may happen in a corner case where memory usage is high
1069               due to other vbuckets and the persistence cursor moves ahead
1070               of the replication cursor into a newly opened checkpoint, but
1071               has not persisted any items yet.
1072               Note: (1) We must not notify when we schedule backfill for the
1073                         first time because the stream is not yet in the
1074                         producer's list of streams.
1075                     (2) It is not absolutely necessary to notify immediately,
1076                         as the conn manager or incoming items will cause a
1077                         notification eventually, but it doesn't hurt to do so. */
1078            bool inverse = false;
1079            if (itemsReady.compare_exchange_strong(inverse, true)) {
1080                producer->notifyStreamReady(vb_);
1081            }
1082        }
1083    }
1084}
1085
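/*
 * handleSlowStream() appears to be called when this stream's checkpoint
 * cursor is being dropped to reclaim checkpoint memory: the cursor is
 * removed and pendingBackfill is set (or a backfill is scheduled right away)
 * so that the missed items are re-read from disk later.
 */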
1086void ActiveStream::handleSlowStream()
1087{
1088    LockHolder lh(streamMutex);
1089    switch (state_.load()) {
1090        case STREAM_BACKFILLING:
1091            if (isBackfillTaskRunning.load()) {
1092                /* Drop the existing cursor and set pending backfill */
1093                dropCheckpointCursor_UNLOCKED();
1094                pendingBackfill = true;
1095            } else {
1096                scheduleBackfill_UNLOCKED(true);
1097            }
1098            break;
1099        case STREAM_IN_MEMORY:
1100            /* Drop the existing cursor and set pending backfill */
1101            dropCheckpointCursor_UNLOCKED();
1102            pendingBackfill = true;
1103            break;
1104        case STREAM_TAKEOVER_SEND:
1105            /* To be handled later if needed */
1106        case STREAM_TAKEOVER_WAIT:
1107            /* To be handled later if needed */
1108        case STREAM_DEAD:
1109            /* To be handled later if needed */
1110            break;
1111        case STREAM_PENDING:
1112        case STREAM_READING:
1113            throw std::logic_error("ActiveStream::handleSlowStream: "
1114                                   "called with state " +
1115                                   std::to_string(state_.load()) + " . " +
1116                                   "for stream " + producer->logHeader() +
1117                                   "; vb " + std::to_string(vb_));
1118    }
1119}
1120
1121const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
1122{
1123    switch (status) {
1124    case END_STREAM_OK:
1125        return "The stream ended due to all items being streamed";
1126    case END_STREAM_CLOSED:
1127        return "The stream closed early due to a close stream message";
1128    case END_STREAM_STATE:
1129        return "The stream closed early because the vbucket state changed";
1130    case END_STREAM_DISCONNECTED:
1131        return "The stream closed early because the conn was disconnected";
1132    case END_STREAM_SLOW:
1133        return "The stream was closed early because it was too slow";
1134    }
1135    // Do not return c_str() of a stack-local std::string here: the buffer
1136    // would dangle as soon as this function returns.
1137    return "Status unknown: this should not have happened!";
1138}
1139
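/*
 * Valid ActiveStream state transitions (enforced by the switch below):
 *   pending        -> backfilling | dead
 *   backfilling    -> in-memory | takeover-send | dead
 *   in-memory      -> backfilling | dead
 *   takeover-send  -> takeover-wait | dead
 *   takeover-wait  -> takeover-send | dead
 *   dead           -> (terminal); "reading" is never valid for an active stream
 */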
1140void ActiveStream::transitionState(stream_state_t newState) {
1141    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1142                              "(vb %d) Transitioning from %s to %s",
1143                              vb_, stateName(state_), stateName(newState));
1144
1145    if (state_ == newState) {
1146        return;
1147    }
1148
1149    bool validTransition = false;
1150    switch (state_.load()) {
1151        case STREAM_PENDING:
1152            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
1153                validTransition = true;
1154            }
1155            break;
1156        case STREAM_BACKFILLING:
1157            if(newState == STREAM_IN_MEMORY ||
1158               newState == STREAM_TAKEOVER_SEND ||
1159               newState == STREAM_DEAD) {
1160                validTransition = true;
1161            }
1162            break;
1163        case STREAM_IN_MEMORY:
1164            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
1165                validTransition = true;
1166            }
1167            break;
1168        case STREAM_TAKEOVER_SEND:
1169            if (newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD) {
1170                validTransition = true;
1171            }
1172            break;
1173        case STREAM_TAKEOVER_WAIT:
1174            if (newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD) {
1175                validTransition = true;
1176            }
1177            break;
1178        case STREAM_READING:
1179            // Active stream should never be in READING state.
1180            validTransition = false;
1181            break;
1182        case STREAM_DEAD:
1183            // Once DEAD, no other transitions should occur.
1184            validTransition = false;
1185            break;
1186    }
1187
1188    if (!validTransition) {
1189        throw std::invalid_argument("ActiveStream::transitionState:"
1190                " newState (which is " + std::to_string(newState) +
1191                ") is not valid for current state (which is " +
1192                std::to_string(state_) + ")");
1193    }
1194
1195    stream_state_t oldState = state_.load();
1196    state_ = newState;
1197
1198    switch (newState) {
1199        case STREAM_BACKFILLING:
1200            if (STREAM_PENDING == oldState) {
1201                scheduleBackfill_UNLOCKED(false /* reschedule */);
1202            } else if (STREAM_IN_MEMORY == oldState) {
1203                scheduleBackfill_UNLOCKED(true /* reschedule */);
1204            }
1205            break;
1206        case STREAM_IN_MEMORY:
1207            // Check if the producer has sent up till the last requested
1208            // sequence number already, if not - move checkpoint items into
1209            // the ready queue.
1210            if (lastSentSeqno.load() >= end_seqno_) {
1211                // Stream transitioning to DEAD state
1212                endStream(END_STREAM_OK);
1213                bool inverse = false;
1214                if (itemsReady.compare_exchange_strong(inverse, true)) {
1215                    producer->notifyStreamReady(vb_);
1216                }
1217            } else {
1218                nextCheckpointItem();
1219            }
1220            break;
1221        case STREAM_TAKEOVER_SEND:
1222            takeoverStart = ep_current_time();
1223            nextCheckpointItem();
1224            break;
1225        case STREAM_DEAD:
1226            {
1227                RCPtr<VBucket> vb = engine->getVBucket(vb_);
1228                if (vb) {
1229                    vb->checkpointManager.removeCursor(name_);
1230                }
1231                break;
1232            }
1233        case STREAM_TAKEOVER_WAIT:
1234        case STREAM_PENDING:
1235            break;
1236        case STREAM_READING:
1237            throw std::logic_error("ActiveStream::transitionState:"
1238                    " newState can't be " + std::to_string(newState) + "!");
1239    }
1240}
1241
1242size_t ActiveStream::getItemsRemaining() {
1243    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1244
1245    if (!vbucket || state_ == STREAM_DEAD) {
1246        return 0;
1247    }
1248
1249    // Items remaining is the sum of:
1250    // (a) Items outstanding in checkpoints
1251    // (b) Items pending in our readyQ, excluding any meta items.
1252    return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1253            readyQ_non_meta_items;
1254}
1255
1256uint64_t ActiveStream::getLastReadSeqno() const {
1257    return lastReadSeqno.load();
1258}
1259
1260uint64_t ActiveStream::getLastSentSeqno() const {
1261    return lastSentSeqno.load();
1262}
1263
1264const Logger& ActiveStream::getLogger() const
1265{
1266    return producer->getLogger();
1267}
1268
1269bool ActiveStream::isSendMutationKeyOnlyEnabled() const
1270{
1271    return (KEY_ONLY == payloadType);
1272}
1273
1274bool ActiveStream::isCurrentSnapshotCompleted() const
1275{
1276    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1277    // An atomic read of vbucket state without acquiring the
1278    // reader lock for state should suffice here.
1279    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1280        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1281            lastReadSeqno) {
1282            return false;
1283        }
1284    }
1285    return true;
1286}
1287
1288void ActiveStream::dropCheckpointCursor_UNLOCKED()
1289{
1290    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1291    if (!vbucket) {
1292        endStream(END_STREAM_STATE);
1293        bool inverse = false;
1294        if (itemsReady.compare_exchange_strong(inverse, true)) {
1295            producer->notifyStreamReady(vb_);
1296        }
1297    }
1298    /* Drop the existing cursor (the vbucket may already have been deleted) */
1299    if (vbucket) { vbucket->checkpointManager.removeCursor(name_); }
1300}
1301
1302NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1303                               const std::string &name, uint32_t flags,
1304                               uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1305                               uint64_t en_seqno, uint64_t vb_uuid,
1306                               uint64_t snap_start_seqno,
1307                               uint64_t snap_end_seqno)
1308    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1309             snap_start_seqno, snap_end_seqno),
1310      producer(p) {
1311    LockHolder lh(streamMutex);
1312    RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1313    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1314        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1315        transitionState(STREAM_DEAD);
1316        itemsReady.store(true);
1317    }
1318
1319    type_ = STREAM_NOTIFIER;
1320
1321    producer->getLogger().log(EXTENSION_LOG_NOTICE,
1322        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
1323        PRIu64, vb, st_seqno, en_seqno);
1324}
1325
1326uint32_t NotifierStream::setDead(end_stream_status_t status) {
1327    LockHolder lh(streamMutex);
1328    if (state_ != STREAM_DEAD) {
1329        transitionState(STREAM_DEAD);
1330        if (status != END_STREAM_DISCONNECTED) {
1331            pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1332            lh.unlock();
1333            bool inverse = false;
1334            if (itemsReady.compare_exchange_strong(inverse, true)) {
1335                producer->notifyStreamReady(vb_);
1336            }
1337        }
1338    }
1339    return 0;
1340}
1341
1342void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1343    LockHolder lh(streamMutex);
1344    if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1345        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1346        transitionState(STREAM_DEAD);
1347        lh.unlock();
1348        bool inverse = false;
1349        if (itemsReady.compare_exchange_strong(inverse, true)) {
1350            producer->notifyStreamReady(vb_);
1351        }
1352    }
1353}
1354
1355DcpResponse* NotifierStream::next() {
1356    LockHolder lh(streamMutex);
1357
1358    if (readyQ.empty()) {
1359        itemsReady.store(false);
1360        return NULL;
1361    }
1362
1363    DcpResponse* response = readyQ.front();
1364    if (producer->bufferLogInsert(response->getMessageSize())) {
1365        popFromReadyQ();
1366    } else {
1367        response = NULL;
1368    }
1369
1370    return response;
1371}
1372
1373void NotifierStream::transitionState(stream_state_t newState) {
1374    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1375        "(vb %d) Transitioning from %s to %s", vb_,
1376        stateName(state_), stateName(newState));
1377
1378    if (state_ == newState) {
1379        return;
1380    }
1381
1382    bool validTransition = false;
1383    switch (state_.load()) {
1384        case STREAM_PENDING:
1385            if (newState == STREAM_DEAD) {
1386                validTransition = true;
1387            }
1388            break;
1389
1390        case STREAM_BACKFILLING:
1391        case STREAM_IN_MEMORY:
1392        case STREAM_TAKEOVER_SEND:
1393        case STREAM_TAKEOVER_WAIT:
1394        case STREAM_READING:
1395        case STREAM_DEAD:
1396            // No other state transitions are valid for a notifier stream.
1397            break;
1398    }
1399
1400    if (!validTransition) {
1401        throw std::invalid_argument("NotifierStream::transitionState:"
1402                " newState (which is " + std::to_string(newState) +
1403                ") is not valid for current state (which is " +
1404                std::to_string(state_) + ")");
1405    }
1406    state_ = newState;
1407}
1408
1409PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1410                             const std::string &name, uint32_t flags,
1411                             uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1412                             uint64_t en_seqno, uint64_t vb_uuid,
1413                             uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1414                             uint64_t vb_high_seqno)
1415    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1416             snap_start_seqno, snap_end_seqno),
1417      engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1418      cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
1419    LockHolder lh(streamMutex);
1420    pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1421                                  vb_uuid, snap_start_seqno, snap_end_seqno));
1422    itemsReady.store(true);
1423    type_ = STREAM_PASSIVE;
1424
1425    const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1426    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1427        "(vb %" PRId16 ") Attempting to add %s stream"
1428        " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
1429        " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
1430        " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1431        vb, type, st_seqno, en_seqno, vb_uuid,
1432        snap_start_seqno, snap_end_seqno, vb_high_seqno);
1433}
1434
1435PassiveStream::~PassiveStream() {
1436    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1437    if (transitionState(STREAM_DEAD)) {
1438        // Destructing a "live" stream; log it.
1439        consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1440            "(vb %" PRId16 ") Destructing stream."
1441            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1442            vb_, last_seqno.load(), unackedBytes);
1443    }
1444}
1445
1446uint32_t PassiveStream::setDead(end_stream_status_t status) {
1447    /* Hold buffer lock so that we clear out all items before we set the stream
1448       to dead state. We do not want to add any new message to the buffer or
1449       process any items in the buffer once we set the stream state to dead. */
1450    LockHolder lh(buffer.bufMutex);
1451    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1452    bool killed = false;
1453
1454    LockHolder slh(streamMutex);
1455    if (transitionState(STREAM_DEAD)) {
1456        killed = true;
1457    }
1458
1459    if (killed) {
1460        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
1461        if (END_STREAM_DISCONNECTED == status) {
1462            logLevel = EXTENSION_LOG_WARNING;
1463        }
1464        consumer->getLogger().log(logLevel,
1465            "(vb %" PRId16 ") Setting stream to dead state, last_seqno is %"
1466            PRIu64 ", unAckedBytes is %" PRIu32 ", status is %s",
1467            vb_, last_seqno.load(), unackedBytes, getEndStreamStatusStr(status));
1468    }
1469    return unackedBytes;
1470}
1471
1472void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1473    LockHolder lh(streamMutex);
1474    if (state_ == STREAM_PENDING) {
1475        if (status == ENGINE_SUCCESS) {
1476            transitionState(STREAM_READING);
1477        } else {
1478            transitionState(STREAM_DEAD);
1479        }
1480        pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1481        lh.unlock();
1482        bool inverse = false;
1483        if (itemsReady.compare_exchange_strong(inverse, true)) {
1484            consumer->notifyStreamReady(vb_);
1485        }
1486    }
1487}
1488
1489void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1490                                    uint32_t new_opaque,
1491                                    uint64_t start_seqno) {
1492    vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1493
1494    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
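    // If everything up to the snapshot end has been received (high seqno ==
    // snapshot end), start the new snapshot range at the high seqno.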
1495    if (info.range.end == info.start) {
1496        info.range.start = info.start;
1497    }
1498
1499    snap_start_seqno_ = info.range.start;
1500    start_seqno_ = info.start;
1501    snap_end_seqno_ = info.range.end;
1502
1503    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1504        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
1505        ", start seq no %" PRIu64 ", end seq no %" PRIu64
1506        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
1507        vb_, new_opaque, start_seqno, end_seqno_,
1508        snap_start_seqno_, snap_end_seqno_);
1509
1510    LockHolder lh(streamMutex);
1511    last_seqno.store(start_seqno);
1512    pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1513                                  end_seqno_, vb_uuid_, snap_start_seqno_,
1514                                  snap_end_seqno_));
1515    lh.unlock();
1516    bool inverse = false;
1517    if (itemsReady.compare_exchange_strong(inverse, true)) {
1518        consumer->notifyStreamReady(vb_);
1519    }
1520}
1521
1522ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1523    if(nullptr == resp) {
1524        return ENGINE_EINVAL;
1525    }
1526
1527    LockHolder lh(buffer.bufMutex);
1528
1529    if (state_ == STREAM_DEAD) {
1530        delete resp;
1531        return ENGINE_KEY_ENOENT;
1532    }
1533
1534    switch (resp->getEvent()) {
1535        case DCP_MUTATION:
1536        case DCP_DELETION:
1537        case DCP_EXPIRATION:
1538        {
1539            MutationResponse* m = static_cast<MutationResponse*>(resp);
1540            uint64_t bySeqno = m->getBySeqno();
1541            if (bySeqno <= last_seqno.load()) {
1542                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1543                    "(vb %d) Erroneous (out of sequence) mutation received, "
1544                    "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
1545                    "greater than last received seqno (%" PRIu64 "); "
1546                    "Dropping mutation!",
1547                    vb_, opaque_, bySeqno, last_seqno.load());
1548                delete m;
1549                return ENGINE_ERANGE;
1550            }
1551            last_seqno.store(bySeqno);
1552            break;
1553        }
1554        case DCP_SNAPSHOT_MARKER:
1555        {
1556            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1557            uint64_t snapStart = s->getStartSeqno();
1558            uint64_t snapEnd = s->getEndSeqno();
1559            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
1560                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1561                    "(vb %d) Erroneous snapshot marker received, with "
1562                    "opaque: %" PRIu32 ", its start "
1563                    "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
1564                    "received seqno (%" PRIu64 "); Dropping marker!",
1565                    vb_, opaque_, snapStart, snapEnd, last_seqno.load());
1566                delete s;
1567                return ENGINE_ERANGE;
1568            }
1569            break;
1570        }
1571        case DCP_SET_VBUCKET:
1572        case DCP_STREAM_END:
1573        {
1574            /* No validations necessary */
1575            break;
1576        }
1577        default:
1578        {
1579            consumer->getLogger().log(EXTENSION_LOG_WARNING,
1580                "(vb %d) Unknown DCP op received: %d; Disconnecting connection..",
1581                vb_, resp->getEvent());
1582            return ENGINE_DISCONNECT;
1583        }
1584    }
1585
1586    if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
1587        /* Process the response here itself rather than buffering it */
1588        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1589        switch (resp->getEvent()) {
1590            case DCP_MUTATION:
1591                ret = processMutation(static_cast<MutationResponse*>(resp));
1592                break;
1593            case DCP_DELETION:
1594            case DCP_EXPIRATION:
1595                ret = processDeletion(static_cast<MutationResponse*>(resp));
1596                break;
1597            case DCP_SNAPSHOT_MARKER:
1598                processMarker(static_cast<SnapshotMarker*>(resp));
1599                break;
1600            case DCP_SET_VBUCKET:
1601                processSetVBucketState(static_cast<SetVBucketState*>(resp));
1602                break;
1603            case DCP_STREAM_END:
1604                {
1605                    LockHolder lh(streamMutex);
1606                    transitionState(STREAM_DEAD);
1607                }
1608                break;
1609            default:
1610                // Above switch should've returned DISCONNECT, throw an exception
1611                throw std::logic_error("PassiveStream::messageReceived: (vb " +
1612                                       std::to_string(vb_) +
1613                                       ") received unknown message type " +
1614                                       std::to_string(resp->getEvent()));
1615        }
1616        if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1617            delete resp;
1618            return ret;
1619        }
1620    }
1621
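    // Either processing was deferred (throttled, or items already buffered) or
    // it returned TMPFAIL/ENOMEM; buffer the message and report TMPFAIL so the
    // caller knows it has not been fully processed yet.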
1622    buffer.messages.push(resp);
1623    buffer.items++;
1624    buffer.bytes += resp->getMessageSize();
1625
1626    return ENGINE_TMPFAIL;
1627}
1628
1629process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
1630                                                             size_t batchSize) {
1631    LockHolder lh(buffer.bufMutex);
1632    uint32_t count = 0;
1633    uint32_t message_bytes = 0;
1634    uint32_t total_bytes_processed = 0;
1635    bool failed = false;
1636    if (buffer.messages.empty()) {
1637        return all_processed;
1638    }
1639
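    // Process up to batchSize buffered messages; stop early if the engine
    // reports TMPFAIL or ENOMEM so the remaining messages are retried later.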
1640    while (count < batchSize && !buffer.messages.empty()) {
1641        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1642        /* If the stream is in a dead state we should not process any remaining
1643           items in the buffer; instead we clear them. */
1644        if (state_ == STREAM_DEAD) {
1645            total_bytes_processed += clearBuffer_UNLOCKED();
1646            processed_bytes = total_bytes_processed;
1647            return all_processed;
1648        }
1649
1650        DcpResponse *response = buffer.messages.front();
1651        message_bytes = response->getMessageSize();
1652
1653        switch (response->getEvent()) {
1654            case DCP_MUTATION:
1655                ret = processMutation(static_cast<MutationResponse*>(response));
1656                break;
1657            case DCP_DELETION:
1658            case DCP_EXPIRATION:
1659                ret = processDeletion(static_cast<MutationResponse*>(response));
1660                break;
1661            case DCP_SNAPSHOT_MARKER:
1662                processMarker(static_cast<SnapshotMarker*>(response));
1663                break;
1664            case DCP_SET_VBUCKET:
1665                processSetVBucketState(static_cast<SetVBucketState*>(response));
1666                break;
1667            case DCP_STREAM_END:
1668                {
1669                    LockHolder lh(streamMutex);
1670                    transitionState(STREAM_DEAD);
1671                }
1672                break;
1673            default:
1674                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1675                                          "PassiveStream::processBufferedMessages:"
1676                                          "(vb %" PRIu16 ") PassiveStream failing "
1677                                          "unknown message type %d",
1678                                          vb_, response->getEvent());
1679                failed = true;
1680        }
1681
1682        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1683            failed = true;
1684            break;
1685        }
1686
1687        delete response;
1688        buffer.messages.pop();
1689        buffer.items--;
1690        buffer.bytes -= message_bytes;
1691        count++;
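        // Bytes for messages rejected as out-of-range (ERANGE) are not
        // counted towards the processed byte total.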
1692        if (ret != ENGINE_ERANGE) {
1693            total_bytes_processed += message_bytes;
1694        }
1695    }
1696
1697    processed_bytes = total_bytes_processed;
1698
1699    if (failed) {
1700        return cannot_process;
1701    }
1702
1703    return all_processed;
1704}
1705
1706ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1707    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1708    if (!vb) {
1709        return ENGINE_NOT_MY_VBUCKET;
1710    }
1711
1712    if (mutation->getBySeqno() < cur_snapshot_start.load() ||
1713        mutation->getBySeqno() > cur_snapshot_end.load()) {
1714        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1715            "(vb %d) Erroneous mutation [sequence "
1716            "number does not fall in the expected snapshot range : "
1717            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1718            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
1719            vb_, cur_snapshot_start.load(),
1720            mutation->getBySeqno(), cur_snapshot_end.load());
1721        return ENGINE_ERANGE;
1722    }
1723
1724    // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
1725    // receive anything without a valid CAS, however given that versions without
1726    // this check may send us "bad" CAS values, we should regenerate them (which
1727    // is better than rejecting the data entirely).
1728    if (!Item::isValidCas(mutation->getItem()->getCas())) {
1729        LOG(EXTENSION_LOG_WARNING,
1730            "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
1731            ", seqno:%" PRId64 "}. Regenerating new CAS",
1732            consumer->logHeader(),
1733            mutation->getItem()->getCas(), vb_,
1734            mutation->getItem()->getBySeqno());
1735        mutation->getItem()->setCas();
1736    }
1737
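    // During the backfill phase items are added via the backfill path;
    // otherwise setWithMeta is used with seqno and CAS generation disabled so
    // the replica keeps the values assigned by the active node.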
1738    ENGINE_ERROR_CODE ret;
1739    if (vb->isBackfillPhase()) {
1740        ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1741                                                    INITIAL_NRU_VALUE,
1742                                                    false,
1743                                                    mutation->getExtMetaData());
1744    } else {
1745        ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
1746                                                consumer->getCookie(), true,
1747                                                true, INITIAL_NRU_VALUE,
1748                                                GenerateBySeqno::No,
1749                                                GenerateCas::No,
1750                                                mutation->getExtMetaData(),
1751                                                true);
1752    }
1753
1754    // We should probably handle these error codes in a better way, but since
1755    // the producer side doesn't do anything with them anyway, let's just log
1756    // them for now until we come up with a better solution.
1757    if (ret != ENGINE_SUCCESS) {
1758        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1759            "Got an error code %d while trying to process mutation", ret);
1760    } else {
1761        handleSnapshotEnd(vb, mutation->getBySeqno());
1762    }
1763
1764    return ret;
1765}
1766
1767ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1768    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1769    if (!vb) {
1770        return ENGINE_NOT_MY_VBUCKET;
1771    }
1772
1773    if (deletion->getBySeqno() < cur_snapshot_start.load() ||
1774        deletion->getBySeqno() > cur_snapshot_end.load()) {
1775        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1776            "(vb %d) Erroneous deletion [sequence "
1777            "number does not fall in the expected snapshot range : "
1778            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1779            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
1780            vb_, cur_snapshot_start.load(),
1781            deletion->getBySeqno(), cur_snapshot_end.load());
1782        return ENGINE_ERANGE;
1783    }
1784
1785    uint64_t delCas = 0;
1786    ENGINE_ERROR_CODE ret;
1787    ItemMetaData meta = deletion->getItem()->getMetaData();
1788
1789    // MB-17517: Check for the incoming item's CAS validity.
1790    if (!Item::isValidCas(meta.cas)) {
1791        LOG(EXTENSION_LOG_WARNING,
1792            "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
1793            ", seqno:%" PRId64 "}. Regenerating new CAS",
1794            consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
1795        meta.cas = Item::nextCas();
1796    }
1797
1798    ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1799                                               &delCas, NULL, deletion->getVBucket(),
1800                                               consumer->getCookie(), true,
1801                                               &meta, vb->isBackfillPhase(),
1802                                               GenerateBySeqno::No,
1803                                               GenerateCas::No,
1804                                               deletion->getBySeqno(),
1805                                               deletion->getExtMetaData(),
1806                                               true);
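    // A deletion for a key that is already absent on the replica is not an error.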
1807    if (ret == ENGINE_KEY_ENOENT) {
1808        ret = ENGINE_SUCCESS;
1809    }
1810
1811    // We should probably handle these error codes in a better way, but since
1812    // the producer side doesn't do anything with them anyway, let's just log
1813    // them for now until we come up with a better solution.
1814    if (ret != ENGINE_SUCCESS) {
1815        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1816            "Got an error code %d while trying to process deletion", ret);
1817    } else {
1818        handleSnapshotEnd(vb, deletion->getBySeqno());
1819    }
1820
1821    return ret;
1822}
1823
1824void PassiveStream::processMarker(SnapshotMarker* marker) {
1825    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1826
1827    cur_snapshot_start.store(marker->getStartSeqno());
1828    cur_snapshot_end.store(marker->getEndSeqno());
1829    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1830
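    // A disk snapshot arriving before any items (high seqno == 0) puts the
    // vbucket into backfill phase; otherwise a new checkpoint/snapshot is
    // created (or the current one extended) to cover the marker's range.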
1831    if (vb) {
1832        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1833            vb->setBackfillPhase(true);
1834            // calling checkpointManager.setBackfillPhase sets the
1835            // openCheckpointId to zero
1836            vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
1837                                                   cur_snapshot_end.load());
1838        } else {
1839            if (marker->getFlags() & MARKER_FLAG_CHK ||
1840                    vb->checkpointManager.getOpenCheckpointId() == 0) {
1841                vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
1842                                                     cur_snapshot_end.load());
1843            } else {
1844                vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end.load());
1845            }
1846            vb->setBackfillPhase(false);
1847        }
1848
1849        if (marker->getFlags() & MARKER_FLAG_ACK) {
1850            cur_snapshot_ack = true;
1851        }
1852    }
1853}
1854
1855void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1856    engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1857
1858    LockHolder lh (streamMutex);
1859    pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1860    lh.unlock();
1861    bool inverse = false;
1862    if (itemsReady.compare_exchange_strong(inverse, true)) {
1863        consumer->notifyStreamReady(vb_);
1864    }
1865}
1866
1867void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1868    if (byseqno == cur_snapshot_end.load()) {
1869        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1870            vb->setBackfillPhase(false);
1871            uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1872            vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1873        } else {
1874            size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
1875            size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
1876            /* We want to add a new replica checkpoint if the mem usage is above
1877               high watermark (85%) */
1878            if (mem_threshold < mem_used) {
1879                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1880                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1881            }
1882        }
1883
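        // If the producer requested an ack for this snapshot (MARKER_FLAG_ACK),
        // queue a SnapshotMarkerResponse and notify the consumer connection.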
1884        if (cur_snapshot_ack) {
1885            LockHolder lh(streamMutex);
1886            pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1887            lh.unlock();
1888            bool inverse = false;
1889            if (itemsReady.compare_exchange_strong(inverse, true)) {
1890                consumer->notifyStreamReady(vb_);
1891            }
1892            cur_snapshot_ack = false;
1893        }
1894        cur_snapshot_type.store(none);
1895    }
1896}
1897
1898void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1899    Stream::addStats(add_stat, c);
1900
1901    try {
1902        const int bsize = 1024;
1903        char buf[bsize];
1904        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
1905                         vb_);
1906        add_casted_stat(buf, buffer.items, add_stat, c);
1907        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
1908                         vb_);
1909        add_casted_stat(buf, buffer.bytes, add_stat, c);
1910        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
1911                         vb_);
1912        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1913        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
1914                         name_.c_str(), vb_);
1915        add_casted_stat(buf, last_seqno.load(), add_stat, c);
1916        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
1917                         name_.c_str(), vb_);
1918        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1919
1920        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
1921                         name_.c_str(), vb_);
1922        add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()),
1923                        add_stat, c);
1924
1925        if (cur_snapshot_type.load() != none) {
1926            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
1927                             name_.c_str(), vb_);
1928            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1929            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
1930                             name_.c_str(), vb_);
1931            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
1932        }
1933    } catch (std::exception& error) {
1934        LOG(EXTENSION_LOG_WARNING,
1935            "PassiveStream::addStats: Failed to build stats: %s", error.what());
1936    }
1937}
1938
1939DcpResponse* PassiveStream::next() {
1940    LockHolder lh(streamMutex);
1941
1942    if (readyQ.empty()) {
1943        itemsReady.store(false);
1944        return NULL;
1945    }
1946
1947    DcpResponse* response = readyQ.front();
1948    popFromReadyQ();
1949    return response;
1950}
1951
1952uint32_t PassiveStream::clearBuffer_UNLOCKED() {
1953    uint32_t unackedBytes = buffer.bytes;
1954
1955    while (!buffer.messages.empty()) {
1956        DcpResponse* resp = buffer.messages.front();
1957        buffer.messages.pop();
1958        delete resp;
1959    }
1960
1961    buffer.bytes = 0;
1962    buffer.items = 0;
1963    return unackedBytes;
1964}
1965
1966bool PassiveStream::transitionState(stream_state_t newState) {
1967    consumer->getLogger().log(EXTENSION_LOG_DEBUG,
1968        "(vb %d) Transitioning from %s to %s",
1969        vb_, stateName(state_), stateName(newState));
1970
1971    if (state_ == newState) {
1972        return false;
1973    }
1974
1975    bool validTransition = false;
1976    switch (state_.load()) {
1977        case STREAM_PENDING:
1978            if (newState == STREAM_READING || newState == STREAM_DEAD) {
1979                validTransition = true;
1980            }
1981            break;
1982
1983        case STREAM_BACKFILLING:
1984        case STREAM_IN_MEMORY:
1985        case STREAM_TAKEOVER_SEND:
1986        case STREAM_TAKEOVER_WAIT:
1987            // Not valid for passive streams
1988            break;
1989
1990        case STREAM_READING:
1991            if (newState == STREAM_PENDING || newState == STREAM_DEAD) {
1992                validTransition = true;
1993            }
1994            break;
1995
1996        case STREAM_DEAD:
1997            // Once 'dead' shouldn't transition away from it.
1998            break;
1999    }
2000
2001    if (!validTransition) {
2002        throw std::invalid_argument("PassiveStream::transitionState:"
2003                " newState (which is" + std::to_string(newState) +
2004                ") is not valid for current state (which is " +
2005                std::to_string(state_) + ")");
2006    }
2007
2008    state_ = newState;
2009    return true;
2010}
2011
2012const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
2013{
2014    switch (status) {
2015        case END_STREAM_OK:
2016            return "The stream closed as part of normal operation";
2017        case END_STREAM_CLOSED:
2018            return "The stream closed due to a close stream message";
2019        case END_STREAM_DISCONNECTED:
2020            return "The stream closed early because the conn was disconnected";
2021        case END_STREAM_STATE:
2022            return "The stream closed early because the vbucket state changed";
2023        default:
2024            break;
2025    }
2026    // Note: the returned pointer must outlive this function, so do not build
2027    // the message in a local std::string and return its c_str() (it would dangle).
2028    return "Status unknown; this should not have happened!";
2029}
2030