xref: /4.6.4/ep-engine/src/dcp/stream.cc (revision 0286542c)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <platform/checked_snprintf.h>
21
22#include "ep_engine.h"
23#include "failover-table.h"
24#include "kvstore.h"
25#include "statwriter.h"
26#include "dcp/backfill-manager.h"
27#include "dcp/backfill.h"
28#include "dcp/consumer.h"
29#include "dcp/producer.h"
30#include "dcp/response.h"
31#include "dcp/stream.h"
32#include "replicationthrottle.h"
33
34static const char* snapshotTypeToString(snapshot_type_t type) {
35    static const char * const snapshotTypes[] = { "none", "disk", "memory" };
36    if (type < none || type > memory) {
37        throw std::invalid_argument("snapshotTypeToString: type (which is " +
38                                    std::to_string(type) +
39                                    ") is not a valid snapshot_type_t");
40    }
41    return snapshotTypes[type];
42}
43
44const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
45
46Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
47               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
48               uint64_t vb_uuid, uint64_t snap_start_seqno,
49               uint64_t snap_end_seqno)
50    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
51      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
52      snap_start_seqno_(snap_start_seqno),
53      snap_end_seqno_(snap_end_seqno),
54      state_(STREAM_PENDING), itemsReady(false),
55      readyQ_non_meta_items(0),
56      readyQueueMemory(0) {
57}
58
59Stream::~Stream() {
60    // NB: reusing the "unlocked" method without a lock because we're
61    // destructing and should not take any locks.
62    clear_UNLOCKED();
63}
64
65void Stream::clear_UNLOCKED() {
66    while (!readyQ.empty()) {
67        DcpResponse* resp = readyQ.front();
68        popFromReadyQ();
69        delete resp;
70    }
71}
72
73void Stream::pushToReadyQ(DcpResponse* resp)
74{
75    /* expect streamMutex.ownsLock() == true */
76    if (resp) {
77        readyQ.push(resp);
78        if (!resp->isMetaEvent()) {
79            readyQ_non_meta_items++;
80        }
81        readyQueueMemory.fetch_add(resp->getMessageSize(),
82                                   std::memory_order_relaxed);
83    }
84}
85
86void Stream::popFromReadyQ(void)
87{
88    /* expect streamMutex.ownsLock() == true */
89    if (!readyQ.empty()) {
90        const auto& front = readyQ.front();
91        if (!front->isMetaEvent()) {
92            readyQ_non_meta_items--;
93        }
94        const uint32_t respSize = front->getMessageSize();
95        readyQ.pop();
96
97        /* Decrement the readyQ size */
98        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
99            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
100        } else {
101            LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d) "
102                "underflow, likely wrong stat calculation! curr size: %" PRIu64
103                "; resp size: %" PRIu32,
104                name_.c_str(), getVBucket(),
105                readyQueueMemory.load(std::memory_order_relaxed), respSize);
106            readyQueueMemory.store(0, std::memory_order_relaxed);
107        }
108    }
109}
110
111uint64_t Stream::getReadyQueueMemory() {
112    return readyQueueMemory.load(std::memory_order_relaxed);
113}
114
115const char* Stream::stateName(stream_state_t st) {
116    static const char * const stateNames[] = {
117        "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
118        "reading", "dead"
119    };
120    if (st < STREAM_PENDING || st > STREAM_DEAD) {
121        throw std::invalid_argument("Stream::stateName: st (which is " +
122                                        std::to_string(st) +
123                                        ") is not a valid stream_state_t");
124    }
125    return stateNames[st];
126}
127
128void Stream::addStats(ADD_STAT add_stat, const void *c) {
129    try {
130        const int bsize = 1024;
131        char buffer[bsize];
132        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
133                         vb_);
134        add_casted_stat(buffer, flags_, add_stat, c);
135        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
136                         vb_);
137        add_casted_stat(buffer, opaque_, add_stat, c);
138        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
139                         name_.c_str(), vb_);
140        add_casted_stat(buffer, start_seqno_, add_stat, c);
141        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
142                         vb_);
143        add_casted_stat(buffer, end_seqno_, add_stat, c);
144        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
145                         vb_);
146        add_casted_stat(buffer, vb_uuid_, add_stat, c);
147        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
148                         name_.c_str(), vb_);
149        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
150        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
151                         name_.c_str(), vb_);
152        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
153        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
154                         vb_);
155        add_casted_stat(buffer, stateName(state_), add_stat, c);
156    } catch (std::exception& error) {
157        LOG(EXTENSION_LOG_WARNING,
158            "Stream::addStats: Failed to build stats: %s", error.what());
159    }
160}
161
162ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
163                           const std::string &n, uint32_t flags,
164                           uint32_t opaque, uint16_t vb, uint64_t st_seqno,
165                           uint64_t en_seqno, uint64_t vb_uuid,
166                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
167    :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
168              snap_start_seqno, snap_end_seqno),
169       isBackfillTaskRunning(false), pendingBackfill(false),
170       lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
171       lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
172       takeoverState(vbucket_state_pending), backfillRemaining(0),
173       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
174       engine(e), producer(p),
175       payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
176                                                            KEY_VALUE),
177       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
178
179    const char* type = "";
180    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
181        type = "takeover ";
182        end_seqno_ = dcpMaxSeqno;
183    }
184
185    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
186    if (vbucket) {
187        ReaderLockHolder rlh(vbucket->getStateLock());
188        if (vbucket->getState() == vbucket_state_replica) {
189            snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
190            if (info.range.end > en_seqno) {
191                end_seqno_ = info.range.end;
192            }
193        }
194    }
195
196    producer->getLogger().log(EXTENSION_LOG_NOTICE,
197        "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
198        " and end seqno %" PRIu64, vb, type, st_seqno, en_seqno);
199
200    backfillItems.memory = 0;
201    backfillItems.disk = 0;
202    backfillItems.sent = 0;
203
204    type_ = STREAM_ACTIVE;
205
206    bufferedBackfill.bytes = 0;
207    bufferedBackfill.items = 0;
208
209    takeoverStart = 0;
210    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
211
212    if (start_seqno_ >= end_seqno_) {
213        /* The streamMutex needs to be acquired because endStream
214         * potentially calls pushToReadyQ.
215         */
216        LockHolder lh(streamMutex);
217        endStream(END_STREAM_OK);
218        itemsReady.store(true);
219        // lock is released on leaving the scope
220    }
221}
222
223ActiveStream::~ActiveStream() {
224    transitionState(STREAM_DEAD);
225}
226
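// Produce the next response for this stream, dispatching on the current
// stream state. If handling the current state transitioned the stream to a
// new (non-dead) state without yielding a response, retry under the new
// state. itemsReady is left reflecting whether a response was returned.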
227DcpResponse* ActiveStream::next() {
228    LockHolder lh(streamMutex);
229
230    stream_state_t initState = state_;
231
232    DcpResponse* response = NULL;
233
234    bool validTransition = false;
235    switch (initState) {
236        case STREAM_PENDING:
237            validTransition = true;
238            break;
239        case STREAM_BACKFILLING:
240            validTransition = true;
241            response = backfillPhase(lh);
242            break;
243        case STREAM_IN_MEMORY:
244            validTransition = true;
245            response = inMemoryPhase();
246            break;
247        case STREAM_TAKEOVER_SEND:
248            validTransition = true;
249            response = takeoverSendPhase();
250            break;
251        case STREAM_TAKEOVER_WAIT:
252            validTransition = true;
253            response = takeoverWaitPhase();
254            break;
255        case STREAM_READING:
256            // Not valid for an active stream.
257            break;
258        case STREAM_DEAD:
259            validTransition = true;
260            response = deadPhase();
261            break;
262    }
263
264    if (!validTransition) {
265        throw std::invalid_argument("ActiveStream::next:"
266                " invalid state " + std::to_string(state_) + " for stream " +
267                producer->logHeader() + " vb " + std::to_string(vb_));
268    }
269
270    stream_state_t newState = state_;
271
272    if (newState != STREAM_DEAD && newState != initState && !response) {
273        lh.unlock();
274        return next();
275    }
276
277    itemsReady.store(response ? true : false);
278    return response;
279}
280
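// Called during backfill once the disk snapshot range is known. Queues a
// SnapshotMarker covering the backfill range (extended to the in-memory
// snapshot end for replica vbuckets with an incomplete snapshot) and, unless
// the stream is disk-only, re-registers the checkpoint cursor at the disk
// snapshot's end seqno so the stream can continue from memory afterwards.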
281void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
282    LockHolder lh(streamMutex);
283    uint64_t chkCursorSeqno = endSeqno;
284
285    if (state_ != STREAM_BACKFILLING) {
286        producer->getLogger().log(EXTENSION_LOG_WARNING,
287                                  "(vb %" PRIu16 ") ActiveStream::"
288                                  "markDiskSnapshot: Unexpected state_:%s",
289                                  vb_, stateName(state_));
290        return;
291    }
292
293    startSeqno = std::min(snap_start_seqno_, startSeqno);
294    firstMarkerSent = true;
295
296    RCPtr<VBucket> vb = engine->getVBucket(vb_);
297    if (!vb) {
298        producer->getLogger().log(EXTENSION_LOG_WARNING,"(vb %" PRIu16 ") "
299                                  "ActiveStream::markDiskSnapshot, vbucket "
300                                  "does not exist", vb_);
301        return;
302    }
303    // An atomic read of vbucket state without acquiring the
304    // reader lock for state should suffice here.
305    if (vb->getState() == vbucket_state_replica) {
306        if (end_seqno_ > endSeqno) {
307            /* We possibly have items in the open checkpoint
308               (incomplete snapshot) */
309            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
310            producer->getLogger().log(EXTENSION_LOG_NOTICE,
311                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
312                "replica vbucket, backfill start seqno %" PRIu64 ", "
313                "backfill end seqno %" PRIu64 ", "
314                "snapshot end seqno after merge %" PRIu64,
315                vb_, startSeqno, endSeqno, info.range.end);
316            endSeqno = info.range.end;
317        }
318    }
319
320    producer->getLogger().log(EXTENSION_LOG_NOTICE,
321        "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
322        " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
323    pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
324                                    MARKER_FLAG_DISK));
325    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
326
327    if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
328        // Only re-register the cursor if we still need to get memory snapshots
329        CursorRegResult result =
330            vb->checkpointManager.registerCursorBySeqno(
331                                                name_, chkCursorSeqno,
332                                                MustSendCheckpointEnd::NO);
333        curChkSeqno = result.first;
334    }
335
336    lh.unlock();
337    bool inverse = false;
338    if (itemsReady.compare_exchange_strong(inverse, true)) {
339        producer->notifyStreamReady(vb_);
340    }
341}
342
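// Called for each item read by the backfill. Replicatable items are charged
// against the backfill manager's buffer and queued as MutationResponses;
// returns false (so the backfill can pause) when the item is null or the
// buffer has no room, true otherwise.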
343bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
344    if (nullptr == itm) {
345        return false;
346    }
347
348    if (itm->shouldReplicate()) {
349        LockHolder lh(streamMutex);
350        if (state_ == STREAM_BACKFILLING) {
351            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
352                delete itm;
353                return false;
354            }
355
356            bufferedBackfill.bytes.fetch_add(itm->size());
357            bufferedBackfill.items++;
358
359            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
360
361            lastReadSeqno.store(itm->getBySeqno());
362            lh.unlock();
363            bool inverse = false;
364            if (itemsReady.compare_exchange_strong(inverse, true)) {
365                producer->notifyStreamReady(vb_);
366            }
367
368            if (backfill_source == BACKFILL_FROM_MEMORY) {
369                backfillItems.memory++;
370            } else {
371                backfillItems.disk++;
372            }
373        } else {
374            delete itm;
375        }
376    } else {
377        delete itm;
378    }
379
380    return true;
381}
382
383void ActiveStream::completeBackfill() {
384    {
385        LockHolder lh(streamMutex);
386        if (state_ == STREAM_BACKFILLING) {
387            producer->getLogger().log(EXTENSION_LOG_NOTICE,
388                    "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
389                    "read from disk, %" PRIu64 " from memory, last seqno read: "
390                    "%" PRIu64 ", pendingBackfill : %s",
391                    vb_, uint64_t(backfillItems.disk.load()),
392                    uint64_t(backfillItems.memory.load()),
393                    lastReadSeqno.load(),
394                    pendingBackfill ? "True" : "False");
395        } else {
396            producer->getLogger().log(EXTENSION_LOG_WARNING,
397                    "(vb %" PRIu16 ") ActiveStream::completeBackfill: "
398                    "Unexpected state_:%s", vb_, stateName(state_));
399        }
400    }
401
402    bool inverse = true;
403    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
404    inverse = false;
405    if (itemsReady.compare_exchange_strong(inverse, true)) {
406        producer->notifyStreamReady(vb_);
407    }
408}
409
410void ActiveStream::snapshotMarkerAckReceived() {
411    bool inverse = false;
412    if (--waitForSnapshot == 0 &&
413        itemsReady.compare_exchange_strong(inverse, true)) {
414        producer->notifyStreamReady(vb_);
415    }
416}
417
418void ActiveStream::setVBucketStateAckRecieved() {
419    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
420    if (!vbucket) {
421        producer->getLogger().log(EXTENSION_LOG_WARNING,
422                                  "(vb %" PRIu16
423                                  ") not present during ack for set vbucket "
424                                  "during takeover",
425                                  vb_);
426        return;
427    }
428
429    {
430        /* Order in which the below 3 locks are acquired is important to avoid
431           any potential lock inversion problems */
432        LockHolder epVbSetLh(engine->getEpStore()->getVbSetMutexLock());
433        WriterLockHolder vbStateLh(vbucket->getStateLock());
434        LockHolder lh(streamMutex);
435        if (state_ == STREAM_TAKEOVER_WAIT) {
436            if (takeoverState == vbucket_state_pending) {
437                producer->getLogger().log(EXTENSION_LOG_INFO,
438                        "(vb %" PRIu16 ") Received ack for set vbucket state to "
439                        "pending message",
440                        vb_);
441
442                takeoverState = vbucket_state_active;
443                transitionState(STREAM_TAKEOVER_SEND);
444
445                engine->getEpStore()->setVBucketState_UNLOCKED(
446                                                        vb_,
447                                                        vbucket_state_dead,
448                                                        false /* transfer */,
449                                                        false /* notify_dcp */,
450                                                        epVbSetLh,
451                                                        &vbStateLh);
452
453                producer->getLogger().log(EXTENSION_LOG_NOTICE,
454                        "(vb %" PRIu16 ") Vbucket marked as dead, last sent "
455                        "seqno: %" PRIu64 ", high seqno: %" PRIu64,
456                        vb_,
457                        lastSentSeqno.load(),
458                        vbucket->getHighSeqno());
459            } else {
460                producer->getLogger().log(EXTENSION_LOG_NOTICE,
461                        "(vb %" PRIu16 ") Received ack for set vbucket state to "
462                        "active message",
463                        vb_);
464                endStream(END_STREAM_OK);
465            }
466        } else {
467            producer->getLogger().log(EXTENSION_LOG_WARNING,
468                    "(vb %" PRIu16 ") Unexpected ack for set vbucket op on "
469                    "stream '%s' state '%s'",
470                    vb_,
471                    name_.c_str(),
472                    stateName(state_));
473            return;
474        }
475    }
476
477    bool inverse = false;
478    if (itemsReady.compare_exchange_strong(inverse, true)) {
479        producer->notifyStreamReady(vb_);
480    }
481}
482
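// Backfilling state: pop the next queued item, returning its bytes to the
// backfill buffer accounting. Once the backfill task has finished and the
// readyQ has drained, either schedule a pending backfill or move on to
// takeover-send, in-memory, or end the stream (disk-only / end seqno reached).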
483DcpResponse* ActiveStream::backfillPhase(LockHolder& lh) {
484    DcpResponse* resp = nextQueuedItem();
485
486    if (resp && (resp->getEvent() == DCP_MUTATION ||
487         resp->getEvent() == DCP_DELETION ||
488         resp->getEvent() == DCP_EXPIRATION)) {
489        MutationResponse* m = static_cast<MutationResponse*>(resp);
490        producer->recordBackfillManagerBytesSent(m->getItem()->size());
491        bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
492        bufferedBackfill.items--;
493        if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
494            backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
495        }
496    }
497
498    if (!isBackfillTaskRunning && readyQ.empty()) {
499        // Given readyQ.empty() is true, resp will be NULL
500        backfillRemaining.store(0, std::memory_order_relaxed);
501        // The previous backfill has completed.  Check to see if another
502        // backfill needs to be scheduled.
503        if (pendingBackfill) {
504            scheduleBackfill_UNLOCKED(true);
505            pendingBackfill = false;
506        } else {
507            if (lastReadSeqno.load() >= end_seqno_) {
508                endStream(END_STREAM_OK);
509            } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
510                transitionState(STREAM_TAKEOVER_SEND);
511            } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
512                endStream(END_STREAM_OK);
513            } else {
514                transitionState(STREAM_IN_MEMORY);
515            }
516
517            if (!resp) {
518                resp = nextQueuedItem();
519            }
520        }
521    }
522
523    return resp;
524}
525
526DcpResponse* ActiveStream::inMemoryPhase() {
527    if (lastSentSeqno.load() >= end_seqno_) {
528        endStream(END_STREAM_OK);
529    } else if (readyQ.empty()) {
530        if (pendingBackfill) {
531            // Moving the state from STREAM_IN_MEMORY to
532            // STREAM_BACKFILLING will result in a backfill being scheduled
533            transitionState(STREAM_BACKFILLING);
534            pendingBackfill = false;
535            return NULL;
536        } else if (nextCheckpointItem()) {
537            return NULL;
538        }
539    }
540
541    return nextQueuedItem();
542}
543
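// Takeover-send state: if the takeover has been running longer than
// takeoverSendMaxTime, put the vbucket into takeover-backed-up state to
// throttle incoming mutations. Once the readyQ and outstanding checkpoint
// items have drained and all snapshot acks have been received, send a
// set_vbucket_state message and move to takeover-wait.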
544DcpResponse* ActiveStream::takeoverSendPhase() {
545
546    RCPtr<VBucket> vb = engine->getVBucket(vb_);
547    if (vb && takeoverStart != 0 &&
548        !vb->isTakeoverBackedUp() &&
549        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
550        vb->setTakeoverBackedUpState(true);
551    }
552
553    if (!readyQ.empty()) {
554        return nextQueuedItem();
555    } else {
556        if (nextCheckpointItem()) {
557            return NULL;
558        }
559    }
560
561    if (waitForSnapshot != 0) {
562        return NULL;
563    }
564
565    if (vb) {
566        vb->setTakeoverBackedUpState(false);
567        takeoverStart = 0;
568    }
569
570    DcpResponse* resp = NULL;
571    if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
572        resp = new SetVBucketState(opaque_, vb_, takeoverState);
573        transitionState(STREAM_TAKEOVER_WAIT);
574    }
575    return resp;
576}
577
578DcpResponse* ActiveStream::takeoverWaitPhase() {
579    return nextQueuedItem();
580}
581
582DcpResponse* ActiveStream::deadPhase() {
583    DcpResponse* resp = nextQueuedItem();
584    if (!resp) {
585        producer->getLogger().log(EXTENSION_LOG_NOTICE,
586                                  "(vb %" PRIu16 ") Stream closed, "
587                                  "%" PRIu64 " items sent from backfill phase, "
588                                  "%" PRIu64 " items sent from memory phase, "
589                                  "%" PRIu64 " was last seqno sent",
590                                  vb_,
591                                  uint64_t(backfillItems.sent.load()),
592                                  uint64_t(itemsFromMemoryPhase.load()),
593                                  lastSentSeqno.load());
594    }
595    return resp;
596}
597
598bool ActiveStream::isCompressionEnabled() {
599    return producer->isValueCompressionEnabled();
600}
601
602void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
603    Stream::addStats(add_stat, c);
604
605    try {
606        const int bsize = 1024;
607        char buffer[bsize];
608        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
609                         name_.c_str(), vb_);
610        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
611        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
612                         name_.c_str(), vb_);
613        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
614        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
615                         name_.c_str(), vb_);
616        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
617        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
618                         name_.c_str(), vb_);
619        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
620        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
621                         name_.c_str(), vb_);
622        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
623        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
624                         name_.c_str(), vb_);
625        add_casted_stat(buffer,
626                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
627                        add_stat, c);
628        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
629                         name_.c_str(), vb_);
630        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
631        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
632                         name_.c_str(), vb_);
633        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
634        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
635                         name_.c_str(), vb_);
636        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
637                        c);
638        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
639                         name_.c_str(), vb_);
640        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
641        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
642                         name_.c_str(), vb_);
643        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
644
645        if ((state_ == STREAM_TAKEOVER_SEND) && takeoverStart != 0) {
646            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
647                             name_.c_str(), vb_);
648            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
649                            c);
650        }
651    } catch (std::exception& error) {
652        LOG(EXTENSION_LOG_WARNING,
653            "ActiveStream::addStats: Failed to build stats: %s", error.what());
654    }
655}
656
657void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
658    LockHolder lh(streamMutex);
659
660    RCPtr<VBucket> vb = engine->getVBucket(vb_);
661    add_casted_stat("name", name_, add_stat, cookie);
662    if (!vb || state_ == STREAM_DEAD) {
663        add_casted_stat("status", "completed", add_stat, cookie);
664        add_casted_stat("estimate", 0, add_stat, cookie);
665        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
666        return;
667    }
668
669    size_t total = backfillRemaining.load(std::memory_order_relaxed);
670    if (state_ == STREAM_BACKFILLING) {
671        add_casted_stat("status", "backfilling", add_stat, cookie);
672    } else {
673        add_casted_stat("status", "in-memory", add_stat, cookie);
674    }
675    add_casted_stat("backfillRemaining",
676                    backfillRemaining.load(std::memory_order_relaxed),
677                    add_stat, cookie);
678
679    item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
680    size_t vb_items = vb->getNumItems(iep);
681    size_t chk_items = vb_items > 0 ?
682                vb->checkpointManager.getNumItemsForCursor(name_) : 0;
683
684    size_t del_items = 0;
685    try {
686        del_items = engine->getEpStore()->getRWUnderlying(vb_)->
687                                                        getNumPersistedDeletes(vb_);
688    } catch (std::runtime_error& e) {
689        producer->getLogger().log(EXTENSION_LOG_WARNING,
690            "ActiveStream::addTakeoverStats: exception while getting num persisted "
691            "deletes for vbucket:%" PRIu16 " - treating as 0 deletes. "
692            "Details: %s", vb_, e.what());
693    }
694
695    if (end_seqno_ < curChkSeqno) {
696        chk_items = 0;
697    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
698        chk_items = end_seqno_ - curChkSeqno + 1;
699    }
700    total += chk_items;
701
702    add_casted_stat("estimate", total, add_stat, cookie);
703    add_casted_stat("chk_items", chk_items, add_stat, cookie);
704    add_casted_stat("vb_items", vb_items, add_stat, cookie);
705    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
706}
707
708DcpResponse* ActiveStream::nextQueuedItem() {
709    if (!readyQ.empty()) {
710        DcpResponse* response = readyQ.front();
711        if (producer->bufferLogInsert(response->getMessageSize())) {
712            if (response->getEvent() == DCP_MUTATION ||
713                    response->getEvent() == DCP_DELETION ||
714                    response->getEvent() == DCP_EXPIRATION) {
715                lastSentSeqno.store(
716                        dynamic_cast<MutationResponse*>(response)->getBySeqno());
717
718                if (state_ == STREAM_BACKFILLING) {
719                    backfillItems.sent++;
720                } else {
721                    itemsFromMemoryPhase++;
722                }
723            }
724            popFromReadyQ();
725            return response;
726        }
727    }
728    return NULL;
729}
730
731bool ActiveStream::nextCheckpointItem() {
732    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
733    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
734        // schedule this stream to build the next checkpoint
735        producer->scheduleCheckpointProcessorTask(this);
736        return true;
737    } else if (chkptItemsExtractionInProgress) {
738        return true;
739    }
740    return false;
741}
742
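// Task run loop: by default the task sleeps indefinitely and is only woken
// via schedule()/wakeup(). Each run drains queued streams' checkpoint items,
// processing at most iterationsBeforeYield streams; if it was re-notified or
// work remains, it snoozes for 0 seconds so it yields but runs again promptly.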
743bool ActiveStreamCheckpointProcessorTask::run() {
744    if (engine->getEpStats().isShutdown) {
745        return false;
746    }
747
748    // Set up to sleep forever when done.
749    snooze(INT_MAX);
750
751    // Clear the notification flag
752    notified.store(false);
753
754    size_t iterations = 0;
755    do {
756        stream_t nextStream = queuePop();
757        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
758
759        if (stream) {
760            stream->nextCheckpointItemTask();
761        } else {
762            break;
763        }
764        iterations++;
765    } while(!queueEmpty()
766            && iterations < iterationsBeforeYield);
767
768    // Now check if we were re-notified or there are still checkpoints
769    bool expected = true;
770    if (notified.compare_exchange_strong(expected, false)
771        || !queueEmpty()) {
772        // snooze for 0, essentially yielding and allowing other tasks a go
773        snooze(0.0);
774    }
775
776    return true;
777}
778
779void ActiveStreamCheckpointProcessorTask::wakeup() {
780    ExecutorPool::get()->wake(getId());
781}
782
783void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
784    pushUnique(stream);
785
786    bool expected = false;
787    if (notified.compare_exchange_strong(expected, true)) {
788        wakeup();
789    }
790}
791
792void ActiveStreamCheckpointProcessorTask::clearQueues() {
793    LockHolder lh(workQueueLock);
794    while (!queue.empty()) {
795        queue.pop();
796    }
797    queuedVbuckets.clear();
798}
799
800void ActiveStream::nextCheckpointItemTask() {
801    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
802    if (vbucket) {
803        std::vector<queued_item> items;
804        getOutstandingItems(vbucket, items);
805        processItems(items);
806    } else {
807        /* The entity deleting the vbucket must set stream to dead,
808           calling setDead(END_STREAM_STATE) will cause deadlock because
809           it will try to grab streamMutex which is already acquired at this
810           point here */
811        return;
812    }
813}
814
815void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
816                                       std::vector<queued_item> &items) {
817    // Commencing item processing - set guard flag.
818    chkptItemsExtractionInProgress.store(true);
819
820    vb->checkpointManager.getAllItemsForCursor(name_, items);
821    if (vb->checkpointManager.getNumCheckpoints() > 1) {
822        engine->getEpStore()->wakeUpCheckpointRemover();
823    }
824}
825
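// Split the items pulled from the checkpoint manager into snapshots. A
// checkpoint_start meta item closes the batch accumulated so far (flushed as
// one snapshot) and marks the following snapshot as the start of a new
// checkpoint. If only meta items were found, look for further checkpoints
// before pausing the stream.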
826void ActiveStream::processItems(std::vector<queued_item>& items) {
827    if (!items.empty()) {
828        bool mark = false;
829        if (items.front()->getOperation() == queue_op::checkpoint_start) {
830            mark = true;
831        }
832
833        std::deque<MutationResponse*> mutations;
834        std::vector<queued_item>::iterator itr = items.begin();
835        for (; itr != items.end(); ++itr) {
836            queued_item& qi = *itr;
837
838            if (qi->shouldReplicate()) {
839                curChkSeqno = qi->getBySeqno();
840                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
841
842                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
843                            isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
844                                                             KEY_VALUE));
845            } else if (qi->getOperation() == queue_op::checkpoint_start) {
846                /* if there are already other mutations, then they belong to the
847                   previous checkpoint and hence we must create a snapshot and
848                   put them onto readyQ */
849                if (!mutations.empty()) {
850                    snapshot(mutations, mark);
851                    /* clear out all the mutations since they are already put
852                       onto the readyQ */
853                    mutations.clear();
854                }
855                /* mark true as it indicates a new checkpoint snapshot */
856                mark = true;
857            }
858        }
859
860        if (mutations.empty()) {
861            // If we only got checkpoint starts or ends, check to see if there are
862            // any more snapshots before pausing the stream.
863            nextCheckpointItemTask();
864        } else {
865            snapshot(mutations, mark);
866        }
867    }
868
869    // Completed item processing - clear guard flag and notify producer.
870    chkptItemsExtractionInProgress.store(false);
871    producer->notifyStreamReady(vb_);
872}
873
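// Queue a memory SnapshotMarker (with MARKER_FLAG_CHK when a new checkpoint
// started, and MARKER_FLAG_ACK during takeover-send) followed by the given
// mutations. If the stream has gone dead, or has switched back to
// backfilling, the items are dropped instead, since backfill restarts from
// lastReadSeqno + 1.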
874void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
875    if (items.empty()) {
876        return;
877    }
878
879    LockHolder lh(streamMutex);
880
881    if ((state_ == STREAM_DEAD) || (state_ == STREAM_BACKFILLING)) {
882        // If stream was closed forcefully by the time the checkpoint items
883        // retriever task completed, or if we decided to switch the stream to
884        // backfill state from in-memory state, none of the acquired mutations
885        // should be added on the stream's readyQ. We must drop items in case
886        // we switch state from in-memory to backfill because we schedule
887        // backfill from lastReadSeqno + 1
888        std::deque<MutationResponse *>::iterator itr = items.begin();
889        for (; itr != items.end(); ++itr) {
890            delete *itr;
891        }
892        items.clear();
893        return;
894    }
895
896    /* This assumes that all items in the "items" deque are put onto readyQ */
897    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
898
899    if (isCurrentSnapshotCompleted()) {
900        uint32_t flags = MARKER_FLAG_MEMORY;
901        uint64_t snapStart = items.front()->getBySeqno();
902        uint64_t snapEnd = items.back()->getBySeqno();
903
904        if (mark) {
905            flags |= MARKER_FLAG_CHK;
906        }
907
908        if (state_ == STREAM_TAKEOVER_SEND) {
909            waitForSnapshot++;
910            flags |= MARKER_FLAG_ACK;
911        }
912
913        if (!firstMarkerSent) {
914            snapStart = std::min(snap_start_seqno_, snapStart);
915            firstMarkerSent = true;
916        }
917        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
918                                        flags));
919        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
920    }
921
922    std::deque<MutationResponse*>::iterator itemItr;
923    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
924        pushToReadyQ(*itemItr);
925    }
926}
927
928uint32_t ActiveStream::setDead(end_stream_status_t status) {
929    {
930        LockHolder lh(streamMutex);
931        endStream(status);
932    }
933
934    bool inverse = false;
935    if (status != END_STREAM_DISCONNECTED &&
936        itemsReady.compare_exchange_strong(inverse, true)) {
937        producer->notifyStreamReady(vb_);
938    }
939    return 0;
940}
941
942void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
943    if (state_ != STREAM_DEAD) {
944        bool inverse = false;
945        if (itemsReady.compare_exchange_strong(inverse, true)) {
946            producer->notifyStreamReady(vb_);
947        }
948    }
949}
950
951void ActiveStream::endStream(end_stream_status_t reason) {
952    if (state_ != STREAM_DEAD) {
953        pendingBackfill = false;
954        if (state_ == STREAM_BACKFILLING) {
955            // If the stream was in backfilling state, clear out the
956            // backfilled items to free up the backfill buffer.
957            clear_UNLOCKED();
958            producer->recordBackfillManagerBytesSent(bufferedBackfill.bytes);
959            bufferedBackfill.bytes = 0;
960            bufferedBackfill.items = 0;
961        }
962        transitionState(STREAM_DEAD);
963        if (reason != END_STREAM_DISCONNECTED) {
964            pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
965        }
966        producer->getLogger().log(EXTENSION_LOG_NOTICE,
967                                  "(vb %" PRIu16 ") Stream closing, "
968                                  "sent until seqno %" PRIu64 " "
969                                  "remaining items %" PRIu64 ", "
970                                  "reason: %s",
971                                  vb_,
972                                  lastSentSeqno.load(),
973                                  uint64_t(readyQ_non_meta_items.load()),
974                                  getEndStreamStatusStr(reason));
975    }
976}
977
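// Decide whether a (further) backfill is required. The backfill end is the
// last persisted seqno (reschedule) or end_seqno_ (disk-only); otherwise a
// checkpoint cursor is registered at lastReadSeqno and only the range not
// covered by checkpoints is backfilled. If no backfill is needed, the stream
// transitions straight to takeover-send or in-memory, or ends.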
978void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
979    if (isBackfillTaskRunning) {
980        producer->getLogger().log(EXTENSION_LOG_NOTICE,
981                                  "(vb %" PRIu16 ") Skipping "
982                                  "scheduleBackfill_UNLOCKED; "
983                                  "lastReadSeqno %" PRIu64 ", reschedule flag "
984                                  ": %s", vb_, lastReadSeqno.load(),
985                                  reschedule ? "True" : "False");
986        return;
987    }
988
989    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
990    if (!vbucket) {
991        producer->getLogger().log(EXTENSION_LOG_WARNING,
992                                  "(vb %" PRIu16 ") Failed to schedule "
993                                  "backfill as unable to get vbucket; "
994                                  "lastReadSeqno : %" PRIu64 ", "
995                                  "reschedule : %s",
996                                  vb_, lastReadSeqno.load(),
997                                  reschedule ? "True" : "False");
998        return;
999    }
1000
1001    uint64_t backfillStart = lastReadSeqno.load() + 1;
1002    uint64_t backfillEnd = 0;
1003    bool tryBackfill = false;
1004
1005    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
1006        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
1007        if (lastReadSeqno.load() > vbHighSeqno) {
1008            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1009                                   "lastReadSeqno (which is " +
1010                                   std::to_string(lastReadSeqno.load()) +
1011                                   " ) is greater than vbHighSeqno (which is " +
1012                                   std::to_string(vbHighSeqno) + " ). " +
1013                                   "for stream " + producer->logHeader() +
1014                                   "; vb " + std::to_string(vb_));
1015        }
1016        if (reschedule) {
1017            /* We need to do this for reschedule because in case of
1018               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
1019               set to the last persisted seqno before calling
1020               scheduleBackfill_UNLOCKED() */
1021            backfillEnd = engine->getEpStore()->getLastPersistedSeqno(vb_);
1022        } else {
1023            backfillEnd = end_seqno_;
1024        }
1025        tryBackfill = true;
1026    } else {
1027        CursorRegResult result =
1028            vbucket->checkpointManager.registerCursorBySeqno(
1029                                                name_,
1030                                                lastReadSeqno.load(),
1031                                                MustSendCheckpointEnd::NO);
1032        curChkSeqno = result.first;
1033        tryBackfill = result.second;
1034
1035        if (lastReadSeqno.load() > curChkSeqno) {
1036            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
1037                                   "lastReadSeqno (which is " +
1038                                   std::to_string(lastReadSeqno.load()) +
1039                                   " ) is greater than curChkSeqno (which is " +
1040                                   std::to_string(curChkSeqno) + " ). " +
1041                                   "for stream " + producer->logHeader() +
1042                                   "; vb " + std::to_string(vb_));
1043        }
1044
1045        /* We need to find the minimum seqno that needs to be backfilled in
1046         * order to make sure that we don't miss anything when transitioning
1047         * to a memory snapshot. The backfill task will always make sure that
1048         * the backfill end seqno is contained in the backfill.
1049         */
1050        if (backfillStart < curChkSeqno) {
1051            if (curChkSeqno > end_seqno_) {
1052                /* Backfill only is enough */
1053                backfillEnd = end_seqno_;
1054            } else {
1055                /* Backfill + in-memory streaming */
1056                backfillEnd = curChkSeqno - 1;
1057            }
1058        }
1059    }
1060
1061    if (backfillStart <= backfillEnd && tryBackfill) {
1062        producer->getLogger().log(EXTENSION_LOG_NOTICE,
1063                                  "(vb %" PRIu16 ") Scheduling backfill "
1064                                  "from %" PRIu64 " to %" PRIu64 ", reschedule "
1065                                  "flag : %s", vb_, backfillStart, backfillEnd,
1066                                  reschedule ? "True" : "False");
1067        producer->scheduleBackfillManager(this, backfillStart, backfillEnd);
1068        isBackfillTaskRunning.store(true);
1069    } else {
1070        if (reschedule) {
1071            // Infrequent code path, see comment below.
1072            producer->getLogger().log(EXTENSION_LOG_NOTICE,
1073                                      "(vb %" PRIu16 ") Did not schedule "
1074                                      "backfill with reschedule : True; "
1075                                      "backfillStart : %" PRIu64 ", "
1076                                      "backfillEnd : %" PRIu64 ", "
1077                                      "flags_ : %" PRIu32 ", "
1078                                      "tryBackfill : %s, "
1079                                      "start_seqno_ : %" PRIu64 ", "
1080                                      "end_seqno_ : %" PRIu64 ", "
1081                                      "lastReadSeqno : %" PRIu64 ", "
1082                                      "lastSentSeqno : %" PRIu64 ", "
1083                                      "curChkSeqno : %" PRIu64 ", "
1084                                      "itemsReady : %s",
1085                                      vb_, backfillStart, backfillEnd, flags_,
1086                                      tryBackfill ? "True" : "False",
1087                                      start_seqno_, end_seqno_,
1088                                      lastReadSeqno.load(),
1089                                      lastSentSeqno.load(), curChkSeqno.load(),
1090                                      itemsReady ? "True" : "False");
1091        }
1092        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
1093            endStream(END_STREAM_OK);
1094        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
1095            transitionState(STREAM_TAKEOVER_SEND);
1096        } else {
1097            transitionState(STREAM_IN_MEMORY);
1098        }
1099        if (reschedule) {
1100            /* Cursor was dropped, but we will not do backfill.
1101               This may happen in a corner case where memory
1102               usage is high due to other vbuckets and the persistence cursor
1103               moves ahead of the replication cursor into a new open checkpoint,
1104               but the items have not been persisted yet.
1105               Note: (1) We must not notify when we schedule backfill for the
1106                         first time because the stream is not yet in producer
1107                         conn list of streams
1108                     (2) It is not absolutely necessary to notify immediately
1109                         as the conn manager or incoming items will cause a
1110                         notification eventually, but wouldn't hurt to do so */
1111            bool inverse = false;
1112            if (itemsReady.compare_exchange_strong(inverse, true)) {
1113                producer->notifyStreamReady(vb_);
1114            }
1115        }
1116    }
1117}
1118
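// Invoked when the producer decides this stream is too slow and its
// checkpoint cursor is being dropped to free memory. Depending on the current
// state this either schedules a fresh backfill immediately, or sets
// pendingBackfill so one is scheduled once the in-flight backfill completes.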
1119void ActiveStream::handleSlowStream()
1120{
1121    LockHolder lh(streamMutex);
1122    producer->getLogger().log(EXTENSION_LOG_NOTICE,
1123                              "(vb %" PRIu16 ") Handling slow stream; "
1124                              "state_ : %s, "
1125                              "lastReadSeqno : %" PRIu64 ", "
1126                              "lastSentSeqno : %" PRIu64 ", "
1127                              "isBackfillTaskRunning : %s",
1128                              vb_, stateName(state_),
1129                              lastReadSeqno.load(),
1130                              lastSentSeqno.load(),
1131                              isBackfillTaskRunning.load() ? "True" : "False");
1132    switch (state_.load()) {
1133        case STREAM_BACKFILLING:
1134            if (isBackfillTaskRunning.load()) {
1135                /* Drop the existing cursor and set pending backfill */
1136                dropCheckpointCursor_UNLOCKED();
1137                pendingBackfill = true;
1138            } else {
1139                scheduleBackfill_UNLOCKED(true);
1140            }
1141            break;
1142        case STREAM_IN_MEMORY:
1143            /* Drop the existing cursor and set pending backfill */
1144            dropCheckpointCursor_UNLOCKED();
1145            pendingBackfill = true;
1146            break;
1147        case STREAM_TAKEOVER_SEND:
1148            /* To be handled later if needed */
1149        case STREAM_TAKEOVER_WAIT:
1150            /* To be handled later if needed */
1151        case STREAM_DEAD:
1152            /* To be handled later if needed */
1153            break;
1154        case STREAM_PENDING:
1155        case STREAM_READING:
1156            throw std::logic_error("ActiveStream::handleSlowStream: "
1157                                   "called with state " +
1158                                   std::to_string(state_.load()) + " . " +
1159                                   "for stream " + producer->logHeader() +
1160                                   "; vb " + std::to_string(vb_));
1161    }
1162}
1163
1164const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
1165{
1166    switch (status) {
1167    case END_STREAM_OK:
1168        return "The stream ended due to all items being streamed";
1169    case END_STREAM_CLOSED:
1170        return "The stream closed early due to a close stream message";
1171    case END_STREAM_STATE:
1172        return "The stream closed early because the vbucket state changed";
1173    case END_STREAM_DISCONNECTED:
1174        return "The stream closed early because the conn was disconnected";
1175    case END_STREAM_SLOW:
1176        return "The stream was closed early because it was too slow";
1177    }
1178    // Return a literal here; returning msg.c_str() from a local std::string
1179    // would hand back a dangling pointer.
1180    return "Status unknown; this should not have happened!";
1181}
1182
1183void ActiveStream::transitionState(stream_state_t newState) {
1184    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1185                              "(vb %d) Transitioning from %s to %s",
1186                              vb_, stateName(state_), stateName(newState));
1187
1188    if (state_ == newState) {
1189        return;
1190    }
1191
1192    bool validTransition = false;
1193    switch (state_.load()) {
1194        case STREAM_PENDING:
1195            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
1196                validTransition = true;
1197            }
1198            break;
1199        case STREAM_BACKFILLING:
1200            if (newState == STREAM_IN_MEMORY ||
1201               newState == STREAM_TAKEOVER_SEND ||
1202               newState == STREAM_DEAD) {
1203                validTransition = true;
1204            }
1205            break;
1206        case STREAM_IN_MEMORY:
1207            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
1208                validTransition = true;
1209            }
1210            break;
1211        case STREAM_TAKEOVER_SEND:
1212            if (newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD) {
1213                validTransition = true;
1214            }
1215            break;
1216        case STREAM_TAKEOVER_WAIT:
1217            if (newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD) {
1218                validTransition = true;
1219            }
1220            break;
1221        case STREAM_READING:
1222            // Active stream should never be in READING state.
1223            validTransition = false;
1224            break;
1225        case STREAM_DEAD:
1226            // Once DEAD, no other transitions should occur.
1227            validTransition = false;
1228            break;
1229    }
1230
1231    if (!validTransition) {
1232        throw std::invalid_argument("ActiveStream::transitionState:"
1233                " newState (which is " + std::to_string(newState) +
1234                ") is not valid for current state (which is " +
1235                std::to_string(state_) + ")");
1236    }
1237
1238    stream_state_t oldState = state_.load();
1239    state_ = newState;
1240
1241    switch (newState) {
1242        case STREAM_BACKFILLING:
1243            if (STREAM_PENDING == oldState) {
1244                scheduleBackfill_UNLOCKED(false /* reschedule */);
1245            } else if (STREAM_IN_MEMORY == oldState) {
1246                scheduleBackfill_UNLOCKED(true /* reschedule */);
1247            }
1248            break;
1249        case STREAM_IN_MEMORY:
1250            // Check if the producer has already sent up to the last requested
1251            // sequence number; if not, move checkpoint items into
1252            // the ready queue.
1253            if (lastSentSeqno.load() >= end_seqno_) {
1254                // Stream transitioning to DEAD state
1255                endStream(END_STREAM_OK);
1256                bool inverse = false;
1257                if (itemsReady.compare_exchange_strong(inverse, true)) {
1258                    producer->notifyStreamReady(vb_);
1259                }
1260            } else {
1261                nextCheckpointItem();
1262            }
1263            break;
1264        case STREAM_TAKEOVER_SEND:
1265            takeoverStart = ep_current_time();
1266            nextCheckpointItem();
1267            break;
1268        case STREAM_DEAD:
1269            {
1270                RCPtr<VBucket> vb = engine->getVBucket(vb_);
1271                if (vb) {
1272                    vb->checkpointManager.removeCursor(name_);
1273                }
1274                break;
1275            }
1276        case STREAM_TAKEOVER_WAIT:
1277        case STREAM_PENDING:
1278            break;
1279        case STREAM_READING:
1280            throw std::logic_error("ActiveStream::transitionState:"
1281                    " newState can't be " + std::to_string(newState) + "!");
1282    }
1283}
1284
1285size_t ActiveStream::getItemsRemaining() {
1286    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1287
1288    if (!vbucket || state_ == STREAM_DEAD) {
1289        return 0;
1290    }
1291
1292    // Items remaining is the sum of:
1293    // (a) Items outstanding in checkpoints
1294    // (b) Items pending in our readyQ, excluding any meta items.
1295    return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1296            readyQ_non_meta_items;
1297}
1298
1299uint64_t ActiveStream::getLastReadSeqno() const {
1300    return lastReadSeqno.load();
1301}
1302
1303uint64_t ActiveStream::getLastSentSeqno() const {
1304    return lastSentSeqno.load();
1305}
1306
1307const Logger& ActiveStream::getLogger() const
1308{
1309    return producer->getLogger();
1310}
1311
1312bool ActiveStream::isSendMutationKeyOnlyEnabled() const
1313{
1314    return (KEY_ONLY == payloadType);
1315}
1316
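// For replica vbuckets the current snapshot is only considered complete once
// lastReadSeqno has passed the end seqno of the last snapshot marker sent;
// this keeps a partially received replica snapshot from being split across
// multiple markers. Active vbuckets always report complete.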
1317bool ActiveStream::isCurrentSnapshotCompleted() const
1318{
1319    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1320    // An atomic read of vbucket state without acquiring the
1321    // reader lock for state should suffice here.
1322    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1323        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1324            lastReadSeqno) {
1325            return false;
1326        }
1327    }
1328    return true;
1329}
1330
1331void ActiveStream::dropCheckpointCursor_UNLOCKED()
1332{
1333    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1334    if (!vbucket) {
1335        endStream(END_STREAM_STATE);
1336        bool inverse = false;
1337        if (itemsReady.compare_exchange_strong(inverse, true)) {
1338            producer->notifyStreamReady(vb_);
1339        }
            // Without a vbucket there is no cursor to remove; return here to
            // avoid dereferencing a null RCPtr below.
            return;
1340    }
1341    /* Drop the existing cursor */
1342    vbucket->checkpointManager.removeCursor(name_);
1343}
1344
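// A notifier stream never carries data. It exists solely to signal the client
// (via a StreamEnd message) once the vbucket's high seqno moves past the
// requested start seqno; if that is already the case at creation, the stream
// ends immediately.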
1345NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1346                               const std::string &name, uint32_t flags,
1347                               uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1348                               uint64_t en_seqno, uint64_t vb_uuid,
1349                               uint64_t snap_start_seqno,
1350                               uint64_t snap_end_seqno)
1351    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1352             snap_start_seqno, snap_end_seqno),
1353      producer(p) {
1354    LockHolder lh(streamMutex);
1355    RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1356    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1357        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1358        transitionState(STREAM_DEAD);
1359        itemsReady.store(true);
1360    }
1361
1362    type_ = STREAM_NOTIFIER;
1363
1364    producer->getLogger().log(EXTENSION_LOG_NOTICE,
1365        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
1366        PRIu64, vb, st_seqno, en_seqno);
1367}
1368
1369uint32_t NotifierStream::setDead(end_stream_status_t status) {
1370    LockHolder lh(streamMutex);
1371    if (state_ != STREAM_DEAD) {
1372        transitionState(STREAM_DEAD);
1373        if (status != END_STREAM_DISCONNECTED) {
1374            pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1375            lh.unlock();
1376            bool inverse = false;
1377            if (itemsReady.compare_exchange_strong(inverse, true)) {
1378                producer->notifyStreamReady(vb_);
1379            }
1380        }
1381    }
1382    return 0;
1383}
1384
1385void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1386    LockHolder lh(streamMutex);
1387    if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1388        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1389        transitionState(STREAM_DEAD);
1390        lh.unlock();
1391        bool inverse = false;
1392        if (itemsReady.compare_exchange_strong(inverse, true)) {
1393            producer->notifyStreamReady(vb_);
1394        }
1395    }
1396}
1397
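// Returns the next queued response, or NULL if either the readyQ is empty or
// the producer's buffer log (flow control) cannot currently accept the
// message; in the latter case the response stays at the front of the readyQ
// for a later attempt.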
1398DcpResponse* NotifierStream::next() {
1399    LockHolder lh(streamMutex);
1400
1401    if (readyQ.empty()) {
1402        itemsReady.store(false);
1403        return NULL;
1404    }
1405
1406    DcpResponse* response = readyQ.front();
1407    if (producer->bufferLogInsert(response->getMessageSize())) {
1408        popFromReadyQ();
1409    } else {
1410        response = NULL;
1411    }
1412
1413    return response;
1414}
1415
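// The only transition a notifier stream ever makes is PENDING -> DEAD;
// requesting the current state again is a no-op and anything else throws
// std::invalid_argument.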
1416void NotifierStream::transitionState(stream_state_t newState) {
1417    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1418        "(vb %d) Transitioning from %s to %s", vb_,
1419        stateName(state_), stateName(newState));
1420
1421    if (state_ == newState) {
1422        return;
1423    }
1424
1425    bool validTransition = false;
1426    switch (state_.load()) {
1427        case STREAM_PENDING:
1428            if (newState == STREAM_DEAD) {
1429                validTransition = true;
1430            }
1431            break;
1432
1433        case STREAM_BACKFILLING:
1434        case STREAM_IN_MEMORY:
1435        case STREAM_TAKEOVER_SEND:
1436        case STREAM_TAKEOVER_WAIT:
1437        case STREAM_READING:
1438        case STREAM_DEAD:
1439            // No other state transitions are valid for a notifier stream.
1440            break;
1441    }
1442
1443    if (!validTransition) {
1444        throw std::invalid_argument("NotifierStream::transitionState:"
1445                " newState (which is " + std::to_string(newState) +
1446                ") is not valid for current state (which is " +
1447                std::to_string(state_) + ")");
1448    }
1449    state_ = newState;
1450}
1451
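// Consumer-side stream. Construction immediately queues a StreamRequest so the
// consumer can ask the producer to start streaming this vbucket from st_seqno.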
1452PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1453                             const std::string &name, uint32_t flags,
1454                             uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1455                             uint64_t en_seqno, uint64_t vb_uuid,
1456                             uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1457                             uint64_t vb_high_seqno)
1458    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1459             snap_start_seqno, snap_end_seqno),
1460      engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1461      cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
1462    LockHolder lh(streamMutex);
1463    pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1464                                  vb_uuid, snap_start_seqno, snap_end_seqno));
1465    itemsReady.store(true);
1466    type_ = STREAM_PASSIVE;
1467
1468    const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1469    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1470        "(vb %" PRId16 ") Attempting to add %s stream"
1471        " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
1472        " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
1473        " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1474        vb, type, st_seqno, en_seqno, vb_uuid,
1475        snap_start_seqno, snap_end_seqno, vb_high_seqno);
1476}
1477
1478PassiveStream::~PassiveStream() {
1479    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1480    if (transitionState(STREAM_DEAD)) {
1481        // Destructed a "live" stream, log it.
1482        consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1483            "(vb %" PRId16 ") Destructing stream."
1484            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1485            vb_, last_seqno.load(), unackedBytes);
1486    }
1487}
1488
1489uint32_t PassiveStream::setDead(end_stream_status_t status) {
1490    /* Hold buffer lock so that we clear out all items before we set the stream
1491       to dead state. We do not want to add any new message to the buffer or
1492       process any items in the buffer once we set the stream state to dead. */
1493    LockHolder lh(buffer.bufMutex);
1494    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1495    bool killed = false;
1496
1497    LockHolder slh(streamMutex);
1498    if (transitionState(STREAM_DEAD)) {
1499        killed = true;
1500    }
1501
1502    if (killed) {
1503        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
1504        if (END_STREAM_DISCONNECTED == status) {
1505            logLevel = EXTENSION_LOG_WARNING;
1506        }
1507        consumer->getLogger().log(logLevel,
1508            "(vb %" PRId16 ") Setting stream to dead state, last_seqno is %"
1509            PRIu64 ", unAckedBytes is %" PRIu32 ", status is %s",
1510            vb_, last_seqno.load(), unackedBytes, getEndStreamStatusStr(status));
1511    }
1512    return unackedBytes;
1513}
1514
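// Handles the producer's reply to our add-stream request: ENGINE_SUCCESS moves
// the stream to STREAM_READING, anything else marks it dead. Either way an
// AddStreamResponse is queued and the consumer is notified.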
1515void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1516    LockHolder lh(streamMutex);
1517    if (state_ == STREAM_PENDING) {
1518        if (status == ENGINE_SUCCESS) {
1519            transitionState(STREAM_READING);
1520        } else {
1521            transitionState(STREAM_DEAD);
1522        }
1523        pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1524        lh.unlock();
1525        bool inverse = false;
1526        if (itemsReady.compare_exchange_strong(inverse, true)) {
1527            consumer->notifyStreamReady(vb_);
1528        }
1529    }
1530}
1531
1532void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1533                                    uint32_t new_opaque,
1534                                    uint64_t start_seqno) {
1535    vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1536
1537    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
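    // If the high seqno (info.start) has already reached the snapshot end, the
    // snapshot appears to have been fully received, so collapse the snapshot
    // start to it rather than re-requesting a partially-received snapshot.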
1538    if (info.range.end == info.start) {
1539        info.range.start = info.start;
1540    }
1541
1542    snap_start_seqno_ = info.range.start;
1543    start_seqno_ = info.start;
1544    snap_end_seqno_ = info.range.end;
1545
1546    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1547        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
1548        ", start seq no %" PRIu64 ", end seq no %" PRIu64
1549        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
1550        vb_, new_opaque, start_seqno, end_seqno_,
1551        snap_start_seqno_, snap_end_seqno_);
1552
1553    LockHolder lh(streamMutex);
1554    last_seqno.store(start_seqno);
1555    pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1556                                  end_seqno_, vb_uuid_, snap_start_seqno_,
1557                                  snap_end_seqno_));
1558    lh.unlock();
1559    bool inverse = false;
1560    if (itemsReady.compare_exchange_strong(inverse, true)) {
1561        consumer->notifyStreamReady(vb_);
1562    }
1563}
1564
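// Entry point for every DCP message the consumer hands to this stream. The
// message is validated (seqno ordering, snapshot ranges) and then either
// processed immediately, when the replication throttle allows it and nothing
// is already buffered, or appended to the stream buffer; ENGINE_TMPFAIL is
// returned when the message has been buffered rather than fully processed.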
1565ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1566    if(nullptr == resp) {
1567        return ENGINE_EINVAL;
1568    }
1569
1570    LockHolder lh(buffer.bufMutex);
1571
1572    if (state_ == STREAM_DEAD) {
1573        delete resp;
1574        return ENGINE_KEY_ENOENT;
1575    }
1576
1577    switch (resp->getEvent()) {
1578        case DCP_MUTATION:
1579        case DCP_DELETION:
1580        case DCP_EXPIRATION:
1581        {
1582            MutationResponse* m = static_cast<MutationResponse*>(resp);
1583            uint64_t bySeqno = m->getBySeqno();
1584            if (bySeqno <= last_seqno.load()) {
1585                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1586                    "(vb %d) Erroneous (out of sequence) mutation received, "
1587                    "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
1588                    "greater than last received seqno (%" PRIu64 "); "
1589                    "Dropping mutation!",
1590                    vb_, opaque_, bySeqno, last_seqno.load());
1591                delete m;
1592                return ENGINE_ERANGE;
1593            }
1594            last_seqno.store(bySeqno);
1595            break;
1596        }
1597        case DCP_SNAPSHOT_MARKER:
1598        {
1599            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1600            uint64_t snapStart = s->getStartSeqno();
1601            uint64_t snapEnd = s->getEndSeqno();
1602            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
1603                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1604                    "(vb %d) Erroneous snapshot marker received, with "
1605                    "opaque: %" PRIu32 ", its start "
1606                    "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
1607                    "received seqno (%" PRIu64 "); Dropping marker!",
1608                    vb_, opaque_, snapStart, snapEnd, last_seqno.load());
1609                delete s;
1610                return ENGINE_ERANGE;
1611            }
1612            break;
1613        }
1614        case DCP_SET_VBUCKET:
1615        case DCP_STREAM_END:
1616        {
1617            /* No validations necessary */
1618            break;
1619        }
1620        default:
1621        {
1622            consumer->getLogger().log(EXTENSION_LOG_WARNING,
1623                "(vb %d) Unknown DCP op received: %d; Disconnecting connection..",
1624                vb_, resp->getEvent());
1625            return ENGINE_DISCONNECT;
1626        }
1627    }
1628
1629    if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
1630        /* Process the response immediately rather than buffering it */
1631        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1632        switch (resp->getEvent()) {
1633            case DCP_MUTATION:
1634                ret = processMutation(static_cast<MutationResponse*>(resp));
1635                break;
1636            case DCP_DELETION:
1637            case DCP_EXPIRATION:
1638                ret = processDeletion(static_cast<MutationResponse*>(resp));
1639                break;
1640            case DCP_SNAPSHOT_MARKER:
1641                processMarker(static_cast<SnapshotMarker*>(resp));
1642                break;
1643            case DCP_SET_VBUCKET:
1644                processSetVBucketState(static_cast<SetVBucketState*>(resp));
1645                break;
1646            case DCP_STREAM_END:
1647                {
1648                    LockHolder lh(streamMutex);
1649                    transitionState(STREAM_DEAD);
1650                }
1651                break;
1652            default:
1653                // The switch above should have returned ENGINE_DISCONNECT for an unknown op; reaching this default is a logic error
1654                throw std::logic_error("PassiveStream::messageReceived: (vb " +
1655                                       std::to_string(vb_) +
1656                                       ") received unknown message type " +
1657                                       std::to_string(resp->getEvent()));
1658        }
1659        if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1660            delete resp;
1661            return ret;
1662        }
1663    }
1664
1665    buffer.messages.push(resp);
1666    buffer.items++;
1667    buffer.bytes += resp->getMessageSize();
1668
1669    return ENGINE_TMPFAIL;
1670}
1671
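// Drains up to batchSize messages from the stream buffer, reporting the bytes
// consumed via processed_bytes. Returns cannot_process if a message fails with
// ENGINE_TMPFAIL/ENGINE_ENOMEM or an unknown message type is encountered,
// all_processed otherwise.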
1672process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
1673                                                             size_t batchSize) {
1674    LockHolder lh(buffer.bufMutex);
1675    uint32_t count = 0;
1676    uint32_t message_bytes = 0;
1677    uint32_t total_bytes_processed = 0;
1678    bool failed = false;
1679    if (buffer.messages.empty()) {
1680        return all_processed;
1681    }
1682
1683    while (count < batchSize && !buffer.messages.empty()) {
1684        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1685        /* If the stream is in the dead state we should not process any
1686           remaining items in the buffer; clear them instead */
1687        if (state_ == STREAM_DEAD) {
1688            total_bytes_processed += clearBuffer_UNLOCKED();
1689            processed_bytes = total_bytes_processed;
1690            return all_processed;
1691        }
1692
1693        DcpResponse *response = buffer.messages.front();
1694        message_bytes = response->getMessageSize();
1695
1696        switch (response->getEvent()) {
1697            case DCP_MUTATION:
1698                ret = processMutation(static_cast<MutationResponse*>(response));
1699                break;
1700            case DCP_DELETION:
1701            case DCP_EXPIRATION:
1702                ret = processDeletion(static_cast<MutationResponse*>(response));
1703                break;
1704            case DCP_SNAPSHOT_MARKER:
1705                processMarker(static_cast<SnapshotMarker*>(response));
1706                break;
1707            case DCP_SET_VBUCKET:
1708                processSetVBucketState(static_cast<SetVBucketState*>(response));
1709                break;
1710            case DCP_STREAM_END:
1711                {
1712                    LockHolder lh(streamMutex);
1713                    transitionState(STREAM_DEAD);
1714                }
1715                break;
1716            default:
1717                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1718                                          "PassiveStream::processBufferedMessages:"
1719                                          "(vb %" PRIu16 ") PassiveStream failing "
1720                                          "unknown message type %d",
1721                                          vb_, response->getEvent());
1722                failed = true;
1723        }
1724
1725        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1726            failed = true;
1727            break;
1728        }
1729
1730        delete response;
1731        buffer.messages.pop();
1732        buffer.items--;
1733        buffer.bytes -= message_bytes;
1734        count++;
1735        if (ret != ENGINE_ERANGE) {
1736            total_bytes_processed += message_bytes;
1737        }
1738    }
1739
1740    processed_bytes = total_bytes_processed;
1741
1742    if (failed) {
1743        return cannot_process;
1744    }
1745
1746    return all_processed;
1747}
1748
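// Applies a single mutation to the vbucket, rejecting anything outside the
// current snapshot range with ENGINE_ERANGE. Backfill-phase items are routed
// through addTAPBackfillItem; everything else goes through setWithMeta with
// seqno and CAS generation disabled, since both come from the producer.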
1749ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1750    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1751    if (!vb) {
1752        return ENGINE_NOT_MY_VBUCKET;
1753    }
1754
1755    if (mutation->getBySeqno() < cur_snapshot_start.load() ||
1756        mutation->getBySeqno() > cur_snapshot_end.load()) {
1757        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1758            "(vb %d) Erroneous mutation [sequence "
1759            "number does not fall in the expected snapshot range : "
1760            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1761            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
1762            vb_, cur_snapshot_start.load(),
1763            mutation->getBySeqno(), cur_snapshot_end.load());
1764        return ENGINE_ERANGE;
1765    }
1766
1767    // MB-17517: Check the incoming item's CAS validity. We /shouldn't/
1768    // receive anything without a valid CAS; however, versions without this
1769    // check may send us "bad" CAS values, so we regenerate them here (which
1770    // is better than rejecting the data entirely).
1771    if (!Item::isValidCas(mutation->getItem()->getCas())) {
1772        LOG(EXTENSION_LOG_WARNING,
1773            "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
1774            ", seqno:%" PRId64 "}. Regenerating new CAS",
1775            consumer->logHeader(),
1776            mutation->getItem()->getCas(), vb_,
1777            mutation->getItem()->getBySeqno());
1778        mutation->getItem()->setCas();
1779    }
1780
1781    ENGINE_ERROR_CODE ret;
1782    if (vb->isBackfillPhase()) {
1783        ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1784                                                    INITIAL_NRU_VALUE,
1785                                                    false,
1786                                                    mutation->getExtMetaData());
1787    } else {
1788        ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
1789                                                consumer->getCookie(), true,
1790                                                true, INITIAL_NRU_VALUE,
1791                                                GenerateBySeqno::No,
1792                                                GenerateCas::No,
1793                                                mutation->getExtMetaData(),
1794                                                true);
1795    }
1796
1797    // We should probably handle these error codes in a better way, but since
1798    // the producer side doesn't do anything with them anyway, just log
1799    // them for now until we come up with a better solution.
1800    if (ret != ENGINE_SUCCESS) {
1801        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1802            "Got an error code %d while trying to process mutation", ret);
1803    } else {
1804        handleSnapshotEnd(vb, mutation->getBySeqno());
1805    }
1806
1807    return ret;
1808}
1809
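// Applies a deletion (or expiration) via deleteWithMeta, using the same
// snapshot-range validation as processMutation. A missing key
// (ENGINE_KEY_ENOENT) is treated as success.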
1810ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1811    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1812    if (!vb) {
1813        return ENGINE_NOT_MY_VBUCKET;
1814    }
1815
1816    if (deletion->getBySeqno() < cur_snapshot_start.load() ||
1817        deletion->getBySeqno() > cur_snapshot_end.load()) {
1818        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1819            "(vb %d) Erroneous deletion [sequence "
1820            "number does not fall in the expected snapshot range : "
1821            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1822            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
1823            vb_, cur_snapshot_start.load(),
1824            deletion->getBySeqno(), cur_snapshot_end.load());
1825        return ENGINE_ERANGE;
1826    }
1827
1828    uint64_t delCas = 0;
1829    ENGINE_ERROR_CODE ret;
1830    ItemMetaData meta = deletion->getItem()->getMetaData();
1831
1832    // MB-17517: Check for the incoming item's CAS validity.
1833    if (!Item::isValidCas(meta.cas)) {
1834        LOG(EXTENSION_LOG_WARNING,
1835            "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
1836            ", seqno:%" PRId64 "}. Regenerating new CAS",
1837            consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
1838        meta.cas = Item::nextCas();
1839    }
1840
1841    ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1842                                               &delCas, NULL, deletion->getVBucket(),
1843                                               consumer->getCookie(), true,
1844                                               &meta, vb->isBackfillPhase(),
1845                                               GenerateBySeqno::No,
1846                                               GenerateCas::No,
1847                                               deletion->getBySeqno(),
1848                                               deletion->getExtMetaData(),
1849                                               true);
1850    if (ret == ENGINE_KEY_ENOENT) {
1851        ret = ENGINE_SUCCESS;
1852    }
1853
1854    // We should probably handle these error codes in a better way, but since
1855    // the producer side doesn't do anything with them anyway, just log
1856    // them for now until we come up with a better solution.
1857    if (ret != ENGINE_SUCCESS) {
1858        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1859            "Got an error code %d while trying to process deletion", ret);
1860    } else {
1861        handleSnapshotEnd(vb, deletion->getBySeqno());
1862    }
1863
1864    return ret;
1865}
1866
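// Records the incoming snapshot's range and type. A disk snapshot arriving on
// an empty vbucket (high seqno of 0) puts the vbucket into backfill phase;
// otherwise a new checkpoint/snapshot is created or the current snapshot end
// extended, depending on the marker flags.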
1867void PassiveStream::processMarker(SnapshotMarker* marker) {
1868    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1869
1870    cur_snapshot_start.store(marker->getStartSeqno());
1871    cur_snapshot_end.store(marker->getEndSeqno());
1872    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1873
1874    if (vb) {
1875        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1876            vb->setBackfillPhase(true);
1877            // calling checkpointManager.setBackfillPhase sets the
1878            // openCheckpointId to zero
1879            vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
1880                                                   cur_snapshot_end.load());
1881        } else {
1882            if (marker->getFlags() & MARKER_FLAG_CHK ||
1883                    vb->checkpointManager.getOpenCheckpointId() == 0) {
1884                vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
1885                                                     cur_snapshot_end.load());
1886            } else {
1887                vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end.load());
1888            }
1889            vb->setBackfillPhase(false);
1890        }
1891
1892        if (marker->getFlags() & MARKER_FLAG_ACK) {
1893            cur_snapshot_ack = true;
1894        }
1895    }
1896}
1897
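// Applies a vbucket state change requested by the producer, then queues a
// SetVBucketStateResponse acknowledgement and notifies the consumer.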
1898void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1899    engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1900
1901    LockHolder lh (streamMutex);
1902    pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1903    lh.unlock();
1904    bool inverse = false;
1905    if (itemsReady.compare_exchange_strong(inverse, true)) {
1906        consumer->notifyStreamReady(vb_);
1907    }
1908}
1909
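// Called after each successfully processed item. Once the snapshot end seqno
// is reached this closes out a disk/backfill snapshot (or opens a fresh
// replica checkpoint when memory usage is above the high watermark) and, if
// the marker carried MARKER_FLAG_ACK, queues a SnapshotMarkerResponse.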
1910void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1911    if (byseqno == cur_snapshot_end.load()) {
1912        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1913            vb->setBackfillPhase(false);
1914            uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1915            vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1916        } else {
1917            size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
1918            size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
1919            /* We want to add a new replica checkpoint if the mem usage is above
1920               high watermark (85%) */
1921            if (mem_threshold < mem_used) {
1922                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1923                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1924            }
1925        }
1926
1927        if (cur_snapshot_ack) {
1928            LockHolder lh(streamMutex);
1929            pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1930            lh.unlock();
1931            bool inverse = false;
1932            if (itemsReady.compare_exchange_strong(inverse, true)) {
1933                consumer->notifyStreamReady(vb_);
1934            }
1935            cur_snapshot_ack = false;
1936        }
1937        cur_snapshot_type.store(none);
1938    }
1939}
1940
1941void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1942    Stream::addStats(add_stat, c);
1943
1944    try {
1945        const int bsize = 1024;
1946        char buf[bsize];
1947        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
1948                         vb_);
1949        add_casted_stat(buf, buffer.items, add_stat, c);
1950        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
1951                         vb_);
1952        add_casted_stat(buf, buffer.bytes, add_stat, c);
1953        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
1954                         vb_);
1955        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1956        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
1957                         name_.c_str(), vb_);
1958        add_casted_stat(buf, last_seqno.load(), add_stat, c);
1959        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
1960                         name_.c_str(), vb_);
1961        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1962
1963        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
1964                         name_.c_str(), vb_);
1965        add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()),
1966                        add_stat, c);
1967
1968        if (cur_snapshot_type.load() != none) {
1969            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
1970                             name_.c_str(), vb_);
1971            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1972            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
1973                             name_.c_str(), vb_);
1974            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
1975        }
1976    } catch (std::exception& error) {
1977        LOG(EXTENSION_LOG_WARNING,
1978            "PassiveStream::addStats: Failed to build stats: %s", error.what());
1979    }
1980}
1981
1982DcpResponse* PassiveStream::next() {
1983    LockHolder lh(streamMutex);
1984
1985    if (readyQ.empty()) {
1986        itemsReady.store(false);
1987        return NULL;
1988    }
1989
1990    DcpResponse* response = readyQ.front();
1991    popFromReadyQ();
1992    return response;
1993}
1994
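// Empties the stream buffer and returns the number of bytes that had not yet
// been acknowledged. Callers other than the destructor hold buffer.bufMutex.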
1995uint32_t PassiveStream::clearBuffer_UNLOCKED() {
1996    uint32_t unackedBytes = buffer.bytes;
1997
1998    while (!buffer.messages.empty()) {
1999        DcpResponse* resp = buffer.messages.front();
2000        buffer.messages.pop();
2001        delete resp;
2002    }
2003
2004    buffer.bytes = 0;
2005    buffer.items = 0;
2006    return unackedBytes;
2007}
2008
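// Valid passive stream transitions (mirrors the switch below):
//   PENDING -> READING | DEAD
//   READING -> PENDING | DEAD
//   DEAD    -> (terminal)
// Returns true if the state actually changed, false if newState == state_.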
2009bool PassiveStream::transitionState(stream_state_t newState) {
2010    consumer->getLogger().log(EXTENSION_LOG_DEBUG,
2011        "(vb %d) Transitioning from %s to %s",
2012        vb_, stateName(state_), stateName(newState));
2013
2014    if (state_ == newState) {
2015        return false;
2016    }
2017
2018    bool validTransition = false;
2019    switch (state_.load()) {
2020        case STREAM_PENDING:
2021            if (newState == STREAM_READING || newState == STREAM_DEAD) {
2022                validTransition = true;
2023            }
2024            break;
2025
2026        case STREAM_BACKFILLING:
2027        case STREAM_IN_MEMORY:
2028        case STREAM_TAKEOVER_SEND:
2029        case STREAM_TAKEOVER_WAIT:
2030            // Not valid for passive streams
2031            break;
2032
2033        case STREAM_READING:
2034            if (newState == STREAM_PENDING || newState == STREAM_DEAD) {
2035                validTransition = true;
2036            }
2037            break;
2038
2039        case STREAM_DEAD:
2040            // Once 'dead' shouldn't transition away from it.
2041            break;
2042    }
2043
2044    if (!validTransition) {
2045        throw std::invalid_argument("PassiveStream::transitionState:"
2046                " newState (which is" + std::to_string(newState) +
2047                ") is not valid for current state (which is " +
2048                std::to_string(state_) + ")");
2049    }
2050
2051    state_ = newState;
2052    return true;
2053}
2054
2055const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
2056{
2057    switch (status) {
2058        case END_STREAM_OK:
2059            return "The stream closed as part of normal operation";
2060        case END_STREAM_CLOSED:
2061            return "The stream closed due to a close stream message";
2062        case END_STREAM_DISCONNECTED:
2063            return "The stream closed early because the conn was disconnected";
2064        case END_STREAM_STATE:
2065            return "The stream closed early because the vbucket state changed";
2066        default:
2067            break;
2068    }
2069    // Returning c_str() of a local std::string here would hand back a dangling
2070    // pointer, so fall back to a static message for unknown status values.
2071    return "Status unknown; this should not have happened!";
2072}
2073