xref: /4.6.4/ep-engine/src/dcp/stream.cc (revision 0b5d8a3c)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <platform/checked_snprintf.h>
21
22#include "ep_engine.h"
23#include "failover-table.h"
24#include "kvstore.h"
25#include "statwriter.h"
26#include "dcp/backfill-manager.h"
27#include "dcp/backfill.h"
28#include "dcp/consumer.h"
29#include "dcp/producer.h"
30#include "dcp/response.h"
31#include "dcp/stream.h"
32#include "replicationthrottle.h"
33
34static const char* snapshotTypeToString(snapshot_type_t type) {
35    static const char * const snapshotTypes[] = { "none", "disk", "memory" };
36    if (type < none || type > memory) {
37        throw std::invalid_argument("snapshotTypeToString: type (which is " +
38                                    std::to_string(type) +
39                                    ") is not a valid snapshot_type_t");
40    }
41    return snapshotTypes[type];
42}
43
44const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
45
/**
 * Base DCP stream constructor: records the stream identity (name, flags,
 * opaque, vbucket, vb uuid) and the requested seqno/snapshot range.
 * Streams start in the pending state with an empty ready queue.
 */
Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
               uint64_t vb_uuid, uint64_t snap_start_seqno,
               uint64_t snap_end_seqno)
    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
      snap_start_seqno_(snap_start_seqno),
      snap_end_seqno_(snap_end_seqno),
      state_(STREAM_PENDING), itemsReady(false),
      readyQ_non_meta_items(0),
      readyQueueMemory(0) {
}
58
Stream::~Stream() {
    // NB: reusing the "unlocked" method without a lock because we're
    // destructing and should not take any locks. This frees any responses
    // still sitting in the ready queue.
    clear_UNLOCKED();
}
64
65void Stream::clear_UNLOCKED() {
66    while (!readyQ.empty()) {
67        DcpResponse* resp = readyQ.front();
68        popFromReadyQ();
69        delete resp;
70    }
71}
72
73void Stream::pushToReadyQ(DcpResponse* resp)
74{
75   /* expect streamMutex.ownsLock() == true */
76    if (resp) {
77        readyQ.push(resp);
78        if (!resp->isMetaEvent()) {
79            readyQ_non_meta_items++;
80        }
81        readyQueueMemory.fetch_add(resp->getMessageSize(),
82                                   std::memory_order_relaxed);
83    }
84}
85
86void Stream::popFromReadyQ(void)
87{
88    /* expect streamMutex.ownsLock() == true */
89    if (!readyQ.empty()) {
90        const auto& front = readyQ.front();
91        if (!front->isMetaEvent()) {
92            readyQ_non_meta_items--;
93        }
94        const uint32_t respSize = front->getMessageSize();
95        readyQ.pop();
96
97        /* Decrement the readyQ size */
98        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
99            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
100        } else {
101            LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
102                "underflow, likely wrong stat calculation! curr size: %" PRIu64
103                "; new size: %d",
104                name_.c_str(), getVBucket(),
105                readyQueueMemory.load(std::memory_order_relaxed), respSize);
106            readyQueueMemory.store(0, std::memory_order_relaxed);
107        }
108    }
109}
110
// Approximate number of bytes of responses currently held in the ready
// queue. Relaxed load: the value is advisory (used for stats reporting).
uint64_t Stream::getReadyQueueMemory() {
    return readyQueueMemory.load(std::memory_order_relaxed);
}
114
115const char* Stream::stateName(stream_state_t st) {
116    static const char * const stateNames[] = {
117        "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
118        "reading", "dead"
119    };
120    if (st < STREAM_PENDING || st > STREAM_DEAD) {
121        throw std::invalid_argument("Stream::stateName: st (which is " +
122                                        std::to_string(st) +
123                                        ") is not a valid stream_state_t");
124    }
125    return stateNames[st];
126}
127
/**
 * Emit the common per-stream stats (flags, opaque, seqno range, vb uuid,
 * snapshot range and current state) via the memcached ADD_STAT callback.
 * Every key is prefixed "<name>:stream_<vb>_". Failures to format a key are
 * logged and swallowed - stats must never bring the connection down.
 */
void Stream::addStats(ADD_STAT add_stat, const void *c) {
    try {
        const int bsize = 1024;
        char buffer[bsize];
        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
                         vb_);
        add_casted_stat(buffer, flags_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
                         vb_);
        add_casted_stat(buffer, opaque_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, start_seqno_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
                         vb_);
        add_casted_stat(buffer, end_seqno_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
                         vb_);
        add_casted_stat(buffer, vb_uuid_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
                         vb_);
        add_casted_stat(buffer, stateName(state_), add_stat, c);
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "Stream::addStats: Failed to build stats: %s", error.what());
    }
}
161
/**
 * Producer-side (active) stream constructor.
 *
 * Beyond member initialisation, the requested end seqno may be widened:
 *  - takeover streams have no fixed end (stream until dcpMaxSeqno);
 *  - on a replica vbucket the end is extended to the current snapshot end,
 *    so the stream never stops part-way through a snapshot.
 * If the (possibly adjusted) range is already satisfied, the stream is
 * ended immediately with END_STREAM_OK.
 */
ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                           const std::string &n, uint32_t flags,
                           uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                           uint64_t en_seqno, uint64_t vb_uuid,
                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
    :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
              snap_start_seqno, snap_end_seqno),
       isBackfillTaskRunning(false), pendingBackfill(false),
       lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
       lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
       takeoverState(vbucket_state_pending), backfillRemaining(0),
       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
       engine(e), producer(p),
       payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
                                                            KEY_VALUE),
       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {

    const char* type = "";
    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
        type = "takeover ";
        // A takeover stream runs until the vbucket is handed over, so it
        // has no client-specified end.
        end_seqno_ = dcpMaxSeqno;
    }

    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
    if (vbucket) {
        ReaderLockHolder rlh(vbucket->getStateLock());
        if (vbucket->getState() == vbucket_state_replica) {
            // Stretch the end to the current snapshot end so a replica
            // stream never ends mid-snapshot.
            snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
            if (info.range.end > en_seqno) {
                end_seqno_ = info.range.end;
            }
        }
    }

    producer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
        " and end seqno %" PRIu64, vb, type, st_seqno, en_seqno);

    backfillItems.memory = 0;
    backfillItems.disk = 0;
    backfillItems.sent = 0;

    type_ = STREAM_ACTIVE;

    bufferedBackfill.bytes = 0;
    bufferedBackfill.items = 0;

    takeoverStart = 0;
    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();

    if (start_seqno_ >= end_seqno_) {
        /* streamMutex lock needs to be acquired because endStream
         * potentially makes call to pushToReadyQueue.
         */
        LockHolder lh(streamMutex);
        endStream(END_STREAM_OK);
        itemsReady.store(true);
        // lock is released on leaving the scope
    }
}
222
ActiveStream::~ActiveStream() {
    // Force the stream into the dead state so any teardown performed by
    // transitionState (not visible in this file chunk) runs before the
    // members are destroyed.
    transitionState(STREAM_DEAD);
}
226
227DcpResponse* ActiveStream::next() {
228    LockHolder lh(streamMutex);
229
230    stream_state_t initState = state_;
231
232    DcpResponse* response = NULL;
233
234    bool validTransition = false;
235    switch (initState) {
236        case STREAM_PENDING:
237            validTransition = true;
238            break;
239        case STREAM_BACKFILLING:
240            validTransition = true;
241            response = backfillPhase();
242            break;
243        case STREAM_IN_MEMORY:
244            validTransition = true;
245            response = inMemoryPhase();
246            break;
247        case STREAM_TAKEOVER_SEND:
248            validTransition = true;
249            response = takeoverSendPhase();
250            break;
251        case STREAM_TAKEOVER_WAIT:
252            validTransition = true;
253            response = takeoverWaitPhase();
254            break;
255        case STREAM_READING:
256            // Not valid for an active stream.
257            break;
258        case STREAM_DEAD:
259            validTransition = true;
260            response = deadPhase();
261            break;
262    }
263
264    if (!validTransition) {
265        throw std::invalid_argument("ActiveStream::transitionState:"
266                " invalid state " + std::to_string(state_) + " for stream " +
267                producer->logHeader() + " vb " + std::to_string(vb_));
268    }
269
270    stream_state_t newState = state_;
271
272    if (newState != STREAM_DEAD && newState != state_ && !response) {
273        lh.unlock();
274        return next();
275    }
276
277    itemsReady.store(response ? true : false);
278    return response;
279}
280
/**
 * Called by the backfill once its disk snapshot range is known. Queues a
 * disk snapshot marker (possibly merged with the open in-memory snapshot
 * on a replica vbucket) and re-registers the checkpoint cursor so the
 * stream can continue from memory afterwards.
 */
void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
    LockHolder lh(streamMutex);
    // Capture the backfill's own end before endSeqno may be widened below;
    // this is where the checkpoint cursor must resume.
    uint64_t chkCursorSeqno = endSeqno;

    if (state_ != STREAM_BACKFILLING) {
        return;
    }

    // Never advertise a snapshot start later than the client's snapshot.
    startSeqno = std::min(snap_start_seqno_, startSeqno);
    firstMarkerSent = true;

    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    // An atomic read of vbucket state without acquiring the
    // reader lock for state should suffice here.
    if (vb && vb->getState() == vbucket_state_replica) {
        if (end_seqno_ > endSeqno) {
            /* We possibly have items in the open checkpoint
               (incomplete snapshot) */
            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
                "replica vbucket, backfill start seqno %" PRIu64 ", "
                "backfill end seqno %" PRIu64 ", "
                "snapshot end seqno after merge %" PRIu64,
                vb_, startSeqno, endSeqno, info.range.end);
            endSeqno = info.range.end;
        }
    }

    producer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
        " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
    pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
                                    MARKER_FLAG_DISK));
    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);

    if (!vb) {
        // Vbucket disappeared; terminate the stream.
        endStream(END_STREAM_STATE);
    } else if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
        // Only re-register the cursor if we still need to get memory snapshots
        CursorRegResult result =
            vb->checkpointManager.registerCursorBySeqno(
                                                name_, chkCursorSeqno,
                                                MustSendCheckpointEnd::NO);
        curChkSeqno = result.first;
    }

    lh.unlock();
    // Notify the producer (outside the lock) if this made items available.
    bool inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
334
/**
 * Called by the backfill task for each item read. Takes ownership of 'itm'
 * in all paths. Returns false when the item could not be accepted - itm is
 * null, or recordBackfillManagerBytesRead rejected the bytes (presumably a
 * backfill buffer limit; TODO confirm against the backfill manager) - and
 * true otherwise, including when the item is simply discarded because it
 * should not be replicated or the stream is no longer backfilling.
 */
bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
    if (nullptr == itm) {
        return false;
    }

    if (itm->shouldReplicate()) {
        LockHolder lh(streamMutex);
        if (state_ == STREAM_BACKFILLING) {
            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
                delete itm;
                return false;
            }

            // Account the buffered bytes/items until backfillPhase sends them.
            bufferedBackfill.bytes.fetch_add(itm->size());
            bufferedBackfill.items++;

            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));

            lastReadSeqno.store(itm->getBySeqno());
            lh.unlock();
            // Notify outside the lock if this made items available.
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }

            if (backfill_source == BACKFILL_FROM_MEMORY) {
                backfillItems.memory++;
            } else {
                backfillItems.disk++;
            }
        } else {
            delete itm;
        }
    } else {
        delete itm;
    }

    return true;
}
374
/**
 * Called when the backfill task finishes. Logs a summary, clears the
 * task-running flag, schedules a follow-up backfill if one became pending
 * while this one ran, and notifies the producer that the stream may have
 * items ready.
 */
void ActiveStream::completeBackfill() {
    {
        LockHolder lh(streamMutex);
        if (state_ == STREAM_BACKFILLING) {
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                    "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
                    "read from disk, %" PRIu64 " from memory, last seqno read: "
                    "%" PRIu64 ", pendingBackfill : %s",
                    vb_, uint64_t(backfillItems.disk.load()),
                    uint64_t(backfillItems.memory.load()),
                    lastReadSeqno.load(),
                    pendingBackfill ? "True" : "False");

            isBackfillTaskRunning = false;
            if (pendingBackfill) {
                scheduleBackfill_UNLOCKED(true);
                pendingBackfill = false;
            }

            bool expected = false;
            if (itemsReady.compare_exchange_strong(expected, true)) {
                producer->notifyStreamReady(vb_);
            }

            /**
              * MB-22451: It is important that we return here because
              * scheduleBackfill_UNLOCKED(true) can set
              * isBackfillTaskRunning to true.  Therefore if we don't return we
              * will set isBackfillTaskRunning prematurely back to false, (see
              * below).
              */
            return;
        }
    }

    // Stream is no longer backfilling (e.g. already dead): still clear the
    // task flag and notify, but without holding the stream mutex.
    bool inverse = true;
    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
    inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
417
418void ActiveStream::snapshotMarkerAckReceived() {
419    bool inverse = false;
420    if (--waitForSnapshot == 0 &&
421        itemsReady.compare_exchange_strong(inverse, true)) {
422        producer->notifyStreamReady(vb_);
423    }
424}
425
/**
 * Handles the consumer's ack of a SetVBucketState message during takeover.
 * First ack (takeoverState still pending): promote takeoverState to
 * active, return to takeover-send and mark the local vbucket dead.
 * Subsequent ack: the takeover is complete; end the stream.
 * (Note: the "Recieved" spelling is part of the declared interface.)
 */
void ActiveStream::setVBucketStateAckRecieved() {
    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
    if (!vbucket) {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
                                  "(vb %" PRIu16
                                  ") not present during ack for set vbucket "
                                  "during takeover",
                                  vb_);
        return;
    }

    {
        /* Order in which the below 3 locks are acquired is important to avoid
           any potential lock inversion problems */
        LockHolder epVbSetLh(engine->getEpStore()->getVbSetMutexLock());
        WriterLockHolder vbStateLh(vbucket->getStateLock());
        LockHolder lh(streamMutex);
        if (state_ == STREAM_TAKEOVER_WAIT) {
            if (takeoverState == vbucket_state_pending) {
                producer->getLogger().log(EXTENSION_LOG_INFO,
                        "(vb %" PRIu16 ") Receive ack for set vbucket state to "
                        "pending message",
                        vb_);

                takeoverState = vbucket_state_active;
                transitionState(STREAM_TAKEOVER_SEND);

                // Hand over: the local copy becomes dead (locks passed in so
                // no re-acquisition occurs).
                engine->getEpStore()->setVBucketState_UNLOCKED(
                                                        vb_,
                                                        vbucket_state_dead,
                                                        false /* transfer */,
                                                        false /* notify_dcp */,
                                                        epVbSetLh,
                                                        &vbStateLh);

                producer->getLogger().log(EXTENSION_LOG_NOTICE,
                        "(vb %" PRIu16 ") Vbucket marked as dead, last sent "
                        "seqno: %" PRIu64 ", high seqno: %" PRIu64,
                        vb_,
                        lastSentSeqno.load(),
                        vbucket->getHighSeqno());
            } else {
                producer->getLogger().log(EXTENSION_LOG_NOTICE,
                        "(vb %" PRIu16 ") Receive ack for set vbucket state to "
                        "active message",
                        vb_);
                endStream(END_STREAM_OK);
            }
        } else {
            producer->getLogger().log(EXTENSION_LOG_WARNING,
                    "(vb %" PRIu16 ") Unexpected ack for set vbucket op on "
                    "stream '%s' state '%s'",
                    vb_,
                    name_.c_str(),
                    stateName(state_));
            return;
        }
    }

    // Notify the producer (outside the locks) if items became available.
    bool inverse = false;
    if (itemsReady.compare_exchange_strong(inverse, true)) {
        producer->notifyStreamReady(vb_);
    }
}
490
/**
 * Stream driver while backfilling: returns the next backfilled item
 * (releasing its backfill-buffer quota) and, once the backfill task has
 * finished and the ready queue is drained, transitions the stream to its
 * next state (end-of-stream, takeover-send or in-memory).
 */
DcpResponse* ActiveStream::backfillPhase() {
    DcpResponse* resp = nextQueuedItem();

    if (resp && (resp->getEvent() == DCP_MUTATION ||
         resp->getEvent() == DCP_DELETION ||
         resp->getEvent() == DCP_EXPIRATION)) {
        // Release the backfill buffer bytes/items consumed by this response.
        MutationResponse* m = static_cast<MutationResponse*>(resp);
        producer->recordBackfillManagerBytesSent(m->getItem()->size());
        bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
        bufferedBackfill.items--;
        if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
            backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
        }
    }

    if (!isBackfillTaskRunning && readyQ.empty()) {
        // Backfill done and everything sent: pick the next phase.
        backfillRemaining.store(0, std::memory_order_relaxed);
        if (lastReadSeqno.load() >= end_seqno_) {
            endStream(END_STREAM_OK);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
            transitionState(STREAM_TAKEOVER_SEND);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
            endStream(END_STREAM_OK);
        } else {
            transitionState(STREAM_IN_MEMORY);
        }

        if (!resp) {
            // endStream/transition may itself have queued a response.
            resp = nextQueuedItem();
        }
    }

    return resp;
}
525
526DcpResponse* ActiveStream::inMemoryPhase() {
527    if (lastSentSeqno.load() >= end_seqno_) {
528        endStream(END_STREAM_OK);
529    } else if (readyQ.empty()) {
530        if (pendingBackfill) {
531            transitionState(STREAM_BACKFILLING);
532            pendingBackfill = false;
533            return NULL;
534        } else if (nextCheckpointItem()) {
535            return NULL;
536        }
537    }
538
539    return nextQueuedItem();
540}
541
/**
 * Stream driver during takeover-send: streams remaining items, marking the
 * vbucket "takeover backed up" if the phase exceeds takeoverSendMaxTime.
 * Once everything queued is sent and all snapshot markers are acked, sends
 * SetVBucketState and moves to takeover-wait.
 */
DcpResponse* ActiveStream::takeoverSendPhase() {

    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    if (vb && takeoverStart != 0 &&
        !vb->isTakeoverBackedUp() &&
        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
        // Takeover is taking too long; flag the vbucket as backed up.
        // (Exact effect of this state is implemented elsewhere.)
        vb->setTakeoverBackedUpState(true);
    }

    if (!readyQ.empty()) {
        return nextQueuedItem();
    } else {
        if (nextCheckpointItem()) {
            // More checkpoint items are on their way; nothing to send yet.
            return NULL;
        }
    }

    if (waitForSnapshot != 0) {
        // Still waiting on snapshot marker acks from the consumer.
        return NULL;
    }

    if (vb) {
        vb->setTakeoverBackedUpState(false);
        takeoverStart = 0;
    }

    DcpResponse* resp = NULL;
    if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
        resp = new SetVBucketState(opaque_, vb_, takeoverState);
        transitionState(STREAM_TAKEOVER_WAIT);
    }
    return resp;
}
575
/**
 * Stream driver while waiting for the consumer to ack SetVBucketState:
 * nothing new is generated, just drain anything already queued.
 */
DcpResponse* ActiveStream::takeoverWaitPhase() {
    return nextQueuedItem();
}
579
580DcpResponse* ActiveStream::deadPhase() {
581    DcpResponse* resp = nextQueuedItem();
582    if (!resp) {
583        producer->getLogger().log(EXTENSION_LOG_NOTICE,
584                                  "(vb %" PRIu16 ") Stream closed, "
585                                  "%" PRIu64 " items sent from backfill phase, "
586                                  "%" PRIu64 " items sent from memory phase, "
587                                  "%" PRIu64 " was last seqno sent",
588                                  vb_,
589                                  uint64_t(backfillItems.sent.load()),
590                                  uint64_t(itemsFromMemoryPhase.load()),
591                                  lastSentSeqno.load());
592    }
593    return resp;
594}
595
// True when the owning producer connection has value compression enabled.
bool ActiveStream::isCompressionEnabled() {
    return producer->isValueCompressionEnabled();
}
599
/**
 * Emit active-stream specific stats (backfill counters, per-phase item
 * counts, seqno positions, ready-queue memory, backfill buffer usage and -
 * during takeover-send - the takeover duration) on top of the base Stream
 * stats. Formatting failures are logged and swallowed.
 */
void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
    Stream::addStats(add_stat, c);

    try {
        const int bsize = 1024;
        char buffer[bsize];
        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer,
                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
                        add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
                        c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
                         name_.c_str(), vb_);
        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);

        if ((state_ == STREAM_TAKEOVER_SEND) && takeoverStart != 0) {
            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
                             name_.c_str(), vb_);
            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
                            c);
        }
    } catch (std::exception& error) {
        LOG(EXTENSION_LOG_WARNING,
            "ActiveStream::addStats: Failed to build stats: %s", error.what());
    }
}
654
/**
 * Emit takeover progress stats: status, an estimate of items left to send
 * (backfill remaining + outstanding checkpoint items clamped to the
 * stream's end seqno), plus vbucket item and on-disk delete counts.
 */
void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
    LockHolder lh(streamMutex);

    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    add_casted_stat("name", name_, add_stat, cookie);
    if (!vb || state_ == STREAM_DEAD) {
        // Stream over (or vbucket gone): nothing remains by definition.
        add_casted_stat("status", "completed", add_stat, cookie);
        add_casted_stat("estimate", 0, add_stat, cookie);
        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
        return;
    }

    size_t total = backfillRemaining.load(std::memory_order_relaxed);
    if (state_ == STREAM_BACKFILLING) {
        add_casted_stat("status", "backfilling", add_stat, cookie);
    } else {
        add_casted_stat("status", "in-memory", add_stat, cookie);
    }
    add_casted_stat("backfillRemaining",
                    backfillRemaining.load(std::memory_order_relaxed),
                    add_stat, cookie);

    item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
    size_t vb_items = vb->getNumItems(iep);
    size_t chk_items = vb_items > 0 ?
                vb->checkpointManager.getNumItemsForCursor(name_) : 0;

    size_t del_items = 0;
    try {
        del_items = engine->getEpStore()->getRWUnderlying(vb_)->
                                                        getNumPersistedDeletes(vb_);
    } catch (std::runtime_error& e) {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
            "ActiveStream:addTakeoverStats: exception while getting num persisted "
            "deletes for vbucket:%" PRIu16 " - treating as 0 deletes. "
            "Details: %s", vb_, e.what());
    }

    // Clamp outstanding checkpoint items to what this stream will actually
    // send (nothing past end_seqno_).
    if (end_seqno_ < curChkSeqno) {
        chk_items = 0;
    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
        chk_items = end_seqno_ - curChkSeqno + 1;
    }
    total += chk_items;

    add_casted_stat("estimate", total, add_stat, cookie);
    add_casted_stat("chk_items", chk_items, add_stat, cookie);
    add_casted_stat("vb_items", vb_items, add_stat, cookie);
    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
}
705
/**
 * Pop and return the front of the ready queue, provided the producer's
 * flow-control buffer log can accommodate the message; otherwise return
 * NULL leaving the item queued. Mutation-type events update the last-sent
 * seqno and the per-phase sent counters.
 */
DcpResponse* ActiveStream::nextQueuedItem() {
    if (!readyQ.empty()) {
        DcpResponse* response = readyQ.front();
        if (producer->bufferLogInsert(response->getMessageSize())) {
            if (response->getEvent() == DCP_MUTATION ||
                    response->getEvent() == DCP_DELETION ||
                    response->getEvent() == DCP_EXPIRATION) {
                lastSentSeqno.store(
                        dynamic_cast<MutationResponse*>(response)->getBySeqno());

                // Attribute the send to the phase we are currently in.
                if (state_ == STREAM_BACKFILLING) {
                    backfillItems.sent++;
                } else {
                    itemsFromMemoryPhase++;
                }
            }
            popFromReadyQ();
            return response;
        }
    }
    return NULL;
}
728
729bool ActiveStream::nextCheckpointItem() {
730    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
731    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
732        // schedule this stream to build the next checkpoint
733        producer->scheduleCheckpointProcessorTask(this);
734        return true;
735    } else if (chkptItemsExtractionInProgress) {
736        return true;
737    }
738    return false;
739}
740
/**
 * Executor task body: pops streams from the work queue and has each
 * process its outstanding checkpoint items, yielding after
 * iterationsBeforeYield streams. Sleeps forever when idle; snoozes for 0
 * (i.e. yields) if re-notified during the run or work remains queued.
 * Returns false (task done) only on engine shutdown.
 */
bool ActiveStreamCheckpointProcessorTask::run() {
    if (engine->getEpStats().isShutdown) {
        return false;
    }

    // Setup that we will sleep forever when done.
    snooze(INT_MAX);

    // Clear the notification flag
    notified.store(false);

    size_t iterations = 0;
    do {
        stream_t nextStream = queuePop();
        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());

        if (stream) {
            stream->nextCheckpointItemTask();
        } else {
            break;
        }
        iterations++;
    } while(!queueEmpty()
            && iterations < iterationsBeforeYield);

    // Now check if we were re-notified or there are still checkpoints
    bool expected = true;
    if (notified.compare_exchange_strong(expected, false)
        || !queueEmpty()) {
        // snooze for 0, essentially yielding and allowing other tasks a go
        snooze(0.0);
    }

    return true;
}
776
// Wake this task in the executor pool so run() gets scheduled again.
void ActiveStreamCheckpointProcessorTask::wakeup() {
    ExecutorPool::get()->wake(getId());
}
780
781void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
782    pushUnique(stream);
783
784    bool expected = false;
785    if (notified.compare_exchange_strong(expected, true)) {
786        wakeup();
787    }
788}
789
// Drop all pending work: empty the stream queue and clear the set of
// queued vbuckets (used by pushUnique for de-duplication).
void ActiveStreamCheckpointProcessorTask::clearQueues() {
    LockHolder lh(workQueueLock);
    while (!queue.empty()) {
        queue.pop();
    }
    queuedVbuckets.clear();
}
797
798void ActiveStream::nextCheckpointItemTask() {
799    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
800    if (vbucket) {
801        std::vector<queued_item> items;
802        getOutstandingItems(vbucket, items);
803        processItems(items);
804    } else {
805        /* The entity deleting the vbucket must set stream to dead,
806           calling setDead(END_STREAM_STATE) will cause deadlock because
807           it will try to grab streamMutex which is already acquired at this
808           point here */
809        return;
810    }
811}
812
/**
 * Pull every outstanding item for this stream's checkpoint cursor into
 * 'items'. Sets the chkptItemsExtractionInProgress guard for the duration
 * of extraction + processing (cleared at the end of processItems), and
 * wakes the checkpoint remover if more than one checkpoint exists.
 */
void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
                                       std::vector<queued_item> &items) {
    // Commencing item processing - set guard flag.
    chkptItemsExtractionInProgress.store(true);

    vb->checkpointManager.getAllItemsForCursor(name_, items);
    if (vb->checkpointManager.getNumCheckpoints() > 1) {
        engine->getEpStore()->wakeUpCheckpointRemover();
    }
}
823
// Convert the queued_items pulled from the checkpoint manager into
// MutationResponses, grouping them into snapshots at checkpoint boundaries,
// and push them onto the readyQ via snapshot(). Clears the extraction guard
// flag (set by getOutstandingItems()) and notifies the producer when done.
void ActiveStream::processItems(std::vector<queued_item>& items) {
    if (!items.empty()) {
        // 'mark' records whether the next snapshot starts a new checkpoint
        // (and hence needs MARKER_FLAG_CHK on its snapshot marker).
        bool mark = false;
        if (items.front()->getOperation() == queue_op::checkpoint_start) {
            mark = true;
        }

        // Raw pointers: ownership passes to snapshot()/pushToReadyQ (or the
        // responses are deleted there if the stream has died).
        std::deque<MutationResponse*> mutations;
        std::vector<queued_item>::iterator itr = items.begin();
        for (; itr != items.end(); ++itr) {
            queued_item& qi = *itr;

            if (qi->shouldReplicate()) {
                curChkSeqno = qi->getBySeqno();
                lastReadSeqnoUnSnapshotted = qi->getBySeqno();

                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
                            isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
                                                             KEY_VALUE));
            } else if (qi->getOperation() == queue_op::checkpoint_start) {
                /* if there are already other mutations, then they belong to the
                   previous checkpoint and hence we must create a snapshot and
                   put them onto readyQ */
                if (!mutations.empty()) {
                    snapshot(mutations, mark);
                    /* clear out all the mutations since they are already put
                       onto the readyQ */
                    mutations.clear();
                }
                /* mark true as it indicates a new checkpoint snapshot */
                mark = true;
            }
        }

        if (mutations.empty()) {
            // If we only got checkpoint start or ends check to see if there are
            // any more snapshots before pausing the stream.
            nextCheckpointItemTask();
        } else {
            snapshot(mutations, mark);
        }
    }

    // Completed item processing - clear guard flag and notify producer.
    chkptItemsExtractionInProgress.store(false);
    producer->notifyStreamReady(vb_);
}
871
872void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
873    if (items.empty()) {
874        return;
875    }
876
877    LockHolder lh(streamMutex);
878
879    if ((state_ == STREAM_DEAD) || (state_ == STREAM_BACKFILLING)) {
880        // If stream was closed forcefully by the time the checkpoint items
881        // retriever task completed, or if we decided to switch the stream to
882        // backfill state from in-memory state, none of the acquired mutations
883        // should be added on the stream's readyQ. We must drop items in case
884        // we switch state from in-memory to backfill because we schedule
885        // backfill from lastReadSeqno + 1
886        std::deque<MutationResponse *>::iterator itr = items.begin();
887        for (; itr != items.end(); ++itr) {
888            delete *itr;
889        }
890        items.clear();
891        return;
892    }
893
894    /* This assumes that all items in the "items deque" is put onto readyQ */
895    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
896
897    if (isCurrentSnapshotCompleted()) {
898        uint32_t flags = MARKER_FLAG_MEMORY;
899        uint64_t snapStart = items.front()->getBySeqno();
900        uint64_t snapEnd = items.back()->getBySeqno();
901
902        if (mark) {
903            flags |= MARKER_FLAG_CHK;
904        }
905
906        if (state_ == STREAM_TAKEOVER_SEND) {
907            waitForSnapshot++;
908            flags |= MARKER_FLAG_ACK;
909        }
910
911        if (!firstMarkerSent) {
912            snapStart = std::min(snap_start_seqno_, snapStart);
913            firstMarkerSent = true;
914        }
915        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
916                                        flags));
917        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
918    }
919
920    std::deque<MutationResponse*>::iterator itemItr;
921    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
922        pushToReadyQ(*itemItr);
923    }
924}
925
926uint32_t ActiveStream::setDead(end_stream_status_t status) {
927    {
928        LockHolder lh(streamMutex);
929        endStream(status);
930    }
931
932    bool inverse = false;
933    if (status != END_STREAM_DISCONNECTED &&
934        itemsReady.compare_exchange_strong(inverse, true)) {
935        producer->notifyStreamReady(vb_);
936    }
937    return 0;
938}
939
940void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
941    if (state_ != STREAM_DEAD) {
942        bool inverse = false;
943        if (itemsReady.compare_exchange_strong(inverse, true)) {
944            producer->notifyStreamReady(vb_);
945        }
946    }
947}
948
// Terminate the stream (caller holds streamMutex): releases any buffered
// backfill accounting, transitions to STREAM_DEAD, and queues a
// StreamEndResponse unless the connection itself has gone away.
void ActiveStream::endStream(end_stream_status_t reason) {
    if (state_ != STREAM_DEAD) {
        pendingBackfill = false;
        if (state_ == STREAM_BACKFILLING) {
            // If Stream were in Backfilling state, clear out the
            // backfilled items to clear up the backfill buffer.
            clear_UNLOCKED();
            producer->recordBackfillManagerBytesSent(bufferedBackfill.bytes);
            bufferedBackfill.bytes = 0;
            bufferedBackfill.items = 0;
        }
        transitionState(STREAM_DEAD);
        // No point queueing a StreamEnd if the connection is gone - there is
        // nobody left to send it to.
        if (reason != END_STREAM_DISCONNECTED) {
            pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
        }
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Stream closing, "
                                  "sent until seqno %" PRIu64 " "
                                  "remaining items %" PRIu64 ", "
                                  "reason: %s",
                                  vb_,
                                  lastSentSeqno.load(),
                                  uint64_t(readyQ_non_meta_items.load()),
                                  getEndStreamStatusStr(reason));
    }
}
975
// Decide whether a (re)backfill is needed and schedule it, otherwise move
// the stream directly to its in-memory/takeover phase. Caller must hold
// streamMutex. 'reschedule' is true when a backfill already ran and the
// stream's cursor was dropped (e.g. slow stream handling).
void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
    // Only one backfill task may be in flight per stream.
    if (isBackfillTaskRunning) {
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Skipping "
                                  "scheduleBackfill_UNLOCKED; "
                                  "lastReadSeqno %" PRIu64 ", reschedule flag "
                                  ": %s", vb_, lastReadSeqno.load(),
                                  reschedule ? "True" : "False");
        return;
    }

    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
    if (!vbucket) {
        producer->getLogger().log(EXTENSION_LOG_WARNING,
                                  "(vb %" PRIu16 ") Failed to schedule "
                                  "backfill as unable to get vbucket; "
                                  "lastReadSeqno : %" PRIu64 ", "
                                  "reschedule : %s",
                                  vb_, lastReadSeqno.load(),
                                  reschedule ? "True" : "False");
        return;
    }

    // Backfill resumes from the item after the last one read.
    uint64_t backfillStart = lastReadSeqno.load() + 1;
    uint64_t backfillEnd = 0;
    bool tryBackfill = false;

    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
        // Disk-only streams and rescheduled backfills never register a
        // checkpoint cursor here; they backfill up to a fixed end seqno.
        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
        if (lastReadSeqno.load() > vbHighSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than vbHighSeqno (which is " +
                                   std::to_string(vbHighSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }
        if (reschedule) {
            /* We need to do this for reschedule because in case of
               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
               set to last persisted seqno befor calling
               scheduleBackfill_UNLOCKED() */
            backfillEnd = engine->getEpStore()->getLastPersistedSeqno(vb_);
        } else {
            backfillEnd = end_seqno_;
        }
        tryBackfill = true;
    } else {
        // Register a cursor at lastReadSeqno: 'result.first' is the seqno
        // the cursor could actually be placed at, 'result.second' reports
        // whether a backfill is needed to cover the gap before it.
        CursorRegResult result =
            vbucket->checkpointManager.registerCursorBySeqno(
                                                name_,
                                                lastReadSeqno.load(),
                                                MustSendCheckpointEnd::NO);
        curChkSeqno = result.first;
        tryBackfill = result.second;

        if (lastReadSeqno.load() > curChkSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than curChkSeqno (which is " +
                                   std::to_string(curChkSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }

        /* We need to find the minimum seqno that needs to be backfilled in
         * order to make sure that we don't miss anything when transitioning
         * to a memory snapshot. The backfill task will always make sure that
         * the backfill end seqno is contained in the backfill.
         */
        if (backfillStart < curChkSeqno) {
            if (curChkSeqno > end_seqno_) {
                /* Backfill only is enough */
                backfillEnd = end_seqno_;
            } else {
                /* Backfill + in-memory streaming */
                backfillEnd = curChkSeqno - 1;
            }
        }
    }

    if (backfillStart <= backfillEnd && tryBackfill) {
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Scheduling backfill "
                                  "from %" PRIu64 " to %" PRIu64 ", reschedule "
                                  "flag : %s", vb_, backfillStart, backfillEnd,
                                  reschedule ? "True" : "False");
        producer->scheduleBackfillManager(this, backfillStart, backfillEnd);
        isBackfillTaskRunning.store(true);
    } else {
        if (reschedule) {
            // Infrequent code path, see comment below.
            producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                      "(vb %" PRIu16 ") Did not schedule "
                                      "backfill with reschedule : True; "
                                      "backfillStart : %" PRIu64 ", "
                                      "backfillEnd : %" PRIu64 ", "
                                      "flags_ : %" PRIu32 ", "
                                      "tryBackfill : %s, "
                                      "start_seqno_ : %" PRIu64 ", "
                                      "end_seqno_ : %" PRIu64 ", "
                                      "lastReadSeqno : %" PRIu64 ", "
                                      "lastSentSeqno : %" PRIu64 ", "
                                      "curChkSeqno : %" PRIu64 ", "
                                      "itemsReady : %s",
                                      vb_, backfillStart, backfillEnd, flags_,
                                      tryBackfill ? "True" : "False",
                                      start_seqno_, end_seqno_,
                                      lastReadSeqno.load(),
                                      lastSentSeqno.load(), curChkSeqno.load(),
                                      itemsReady ? "True" : "False");
        }
        // No backfill needed: pick the stream's next phase from its flags.
        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
            endStream(END_STREAM_OK);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
            transitionState(STREAM_TAKEOVER_SEND);
        } else {
            transitionState(STREAM_IN_MEMORY);
        }
        if (reschedule) {
            /* Cursor was dropped, but we will not do backfill.
               This may happen in a corner case where, the memory
               usage is high due to other vbuckets and persistence cursor moves
               ahead of replication cursor to new checkpoint open but does not
               persist items yet.
               Note: (1) We must not notify when we schedule backfill for the
                         first time because the stream is not yet in producer
                         conn list of streams
                     (2) It is not absolutely necessary to notify immediately
                         as conn manager or an incoming items will cause a
                         notification eventually, but wouldn't hurt to do so */
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }
        }
    }
}
1116
// Invoked when this stream's checkpoint cursor is holding back checkpoint
// memory reclamation: drop the cursor and arrange for the missed items to
// be re-sent via backfill.
void ActiveStream::handleSlowStream()
{
    LockHolder lh(streamMutex);
    producer->getLogger().log(EXTENSION_LOG_NOTICE,
                              "(vb %" PRIu16 ") Handling slow stream; "
                              "state_ : %s, "
                              "lastReadSeqno : %" PRIu64 ", "
                              "lastSentSeqno : %" PRIu64 ", "
                              "isBackfillTaskRunning : %s",
                              vb_, stateName(state_),
                              lastReadSeqno.load(),
                              lastSentSeqno.load(),
                              isBackfillTaskRunning.load() ? "True" : "False");
    switch (state_.load()) {
        case STREAM_BACKFILLING:
            if (isBackfillTaskRunning.load()) {
                /* Drop the existing cursor and set pending backfill */
                // The running backfill will trigger the new one on completion.
                dropCheckpointCursor_UNLOCKED();
                pendingBackfill = true;
            } else {
                scheduleBackfill_UNLOCKED(true);
            }
            break;
        case STREAM_IN_MEMORY:
            /* Drop the existing cursor and set pending backfill */
            dropCheckpointCursor_UNLOCKED();
            pendingBackfill = true;
            break;
        case STREAM_TAKEOVER_SEND:
            /* To be handled later if needed */
        case STREAM_TAKEOVER_WAIT:
            /* To be handled later if needed */
        case STREAM_DEAD:
            /* To be handled later if needed */
            break;
        case STREAM_PENDING:
        case STREAM_READING:
            // These states are invalid for an active stream - programmer error.
            throw std::logic_error("ActiveStream::handleSlowStream: "
                                   "called with state " +
                                   std::to_string(state_.load()) + " . " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
    }
}
1161
1162const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
1163{
1164    switch (status) {
1165    case END_STREAM_OK:
1166        return "The stream ended due to all items being streamed";
1167    case END_STREAM_CLOSED:
1168        return "The stream closed early due to a close stream message";
1169    case END_STREAM_STATE:
1170        return "The stream closed early because the vbucket state changed";
1171    case END_STREAM_DISCONNECTED:
1172        return "The stream closed early because the conn was disconnected";
1173    case END_STREAM_SLOW:
1174        return "The stream was closed early because it was too slow";
1175    }
1176    std::string msg("Status unknown: " + std::to_string(status) +
1177                    "; this should not have happened!");
1178    return msg.c_str();
1179}
1180
// Move the stream to 'newState', validating the transition against the
// active-stream state machine, then perform the entry actions for the new
// state (schedule backfill, drain checkpoints, drop cursor, ...).
// Throws std::invalid_argument on an illegal transition. Caller must hold
// streamMutex.
void ActiveStream::transitionState(stream_state_t newState) {
    producer->getLogger().log(EXTENSION_LOG_DEBUG,
                              "(vb %d) Transitioning from %s to %s",
                              vb_, stateName(state_), stateName(newState));

    // Self-transitions are silently ignored.
    if (state_ == newState) {
        return;
    }

    bool validTransition = false;
    switch (state_.load()) {
        case STREAM_PENDING:
            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_BACKFILLING:
            if(newState == STREAM_IN_MEMORY ||
               newState == STREAM_TAKEOVER_SEND ||
               newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_IN_MEMORY:
            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_TAKEOVER_SEND:
            if (newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_TAKEOVER_WAIT:
            if (newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_READING:
            // Active stream should never be in READING state.
            validTransition = false;
            break;
        case STREAM_DEAD:
            // Once DEAD, no other transitions should occur.
            validTransition = false;
            break;
    }

    if (!validTransition) {
        throw std::invalid_argument("ActiveStream::transitionState:"
                " newState (which is " + std::to_string(newState) +
                ") is not valid for current state (which is " +
                std::to_string(state_) + ")");
    }

    stream_state_t oldState = state_.load();
    state_ = newState;

    // Entry actions for the state just entered.
    switch (newState) {
        case STREAM_BACKFILLING:
            if (STREAM_PENDING == oldState) {
                scheduleBackfill_UNLOCKED(false /* reschedule */);
            } else if (STREAM_IN_MEMORY == oldState) {
                scheduleBackfill_UNLOCKED(true /* reschedule */);
            }
            break;
        case STREAM_IN_MEMORY:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno.load() >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
                bool inverse = false;
                if (itemsReady.compare_exchange_strong(inverse, true)) {
                    producer->notifyStreamReady(vb_);
                }
            } else {
                nextCheckpointItem();
            }
            break;
        case STREAM_TAKEOVER_SEND:
            takeoverStart = ep_current_time();
            nextCheckpointItem();
            break;
        case STREAM_DEAD:
            {
                // The stream no longer needs its checkpoint cursor.
                RCPtr<VBucket> vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeCursor(name_);
                }
                break;
            }
        case STREAM_TAKEOVER_WAIT:
        case STREAM_PENDING:
            break;
        case STREAM_READING:
            throw std::logic_error("ActiveStream::transitionState:"
                    " newState can't be " + std::to_string(newState) + "!");
    }
}
1282
1283size_t ActiveStream::getItemsRemaining() {
1284    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1285
1286    if (!vbucket || state_ == STREAM_DEAD) {
1287        return 0;
1288    }
1289
1290    // Items remaining is the sum of:
1291    // (a) Items outstanding in checkpoints
1292    // (b) Items pending in our readyQ, excluding any meta items.
1293    return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1294            readyQ_non_meta_items;
1295}
1296
1297uint64_t ActiveStream::getLastReadSeqno() const {
1298    return lastReadSeqno.load();
1299}
1300
1301uint64_t ActiveStream::getLastSentSeqno() const {
1302    return lastSentSeqno.load();
1303}
1304
1305const Logger& ActiveStream::getLogger() const
1306{
1307    return producer->getLogger();
1308}
1309
1310bool ActiveStream::isSendMutationKeyOnlyEnabled() const
1311{
1312    return (KEY_ONLY == payloadType);
1313}
1314
1315bool ActiveStream::isCurrentSnapshotCompleted() const
1316{
1317    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1318    // An atomic read of vbucket state without acquiring the
1319    // reader lock for state should suffice here.
1320    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1321        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1322            lastReadSeqno) {
1323            return false;
1324        }
1325    }
1326    return true;
1327}
1328
1329void ActiveStream::dropCheckpointCursor_UNLOCKED()
1330{
1331    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1332    if (!vbucket) {
1333        endStream(END_STREAM_STATE);
1334        bool inverse = false;
1335        if (itemsReady.compare_exchange_strong(inverse, true)) {
1336            producer->notifyStreamReady(vb_);
1337        }
1338    }
1339    /* Drop the existing cursor */
1340    vbucket->checkpointManager.removeCursor(name_);
1341}
1342
// A notifier stream exists only to tell the client when the vbucket's high
// seqno passes st_seqno. If it already has, the stream is born dead with a
// StreamEnd message ready to send.
NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
                               const std::string &name, uint32_t flags,
                               uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                               uint64_t en_seqno, uint64_t vb_uuid,
                               uint64_t snap_start_seqno,
                               uint64_t snap_end_seqno)
    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
             snap_start_seqno, snap_end_seqno),
      producer(p) {
    LockHolder lh(streamMutex);
    RCPtr<VBucket> vbucket = e->getVBucket(vb_);
    // Condition already satisfied at creation - notify and finish at once.
    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
        transitionState(STREAM_DEAD);
        itemsReady.store(true);
    }

    type_ = STREAM_NOTIFIER;

    producer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
        PRIu64, vb, st_seqno, en_seqno);
}
1366
1367uint32_t NotifierStream::setDead(end_stream_status_t status) {
1368    LockHolder lh(streamMutex);
1369    if (state_ != STREAM_DEAD) {
1370        transitionState(STREAM_DEAD);
1371        if (status != END_STREAM_DISCONNECTED) {
1372            pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1373            lh.unlock();
1374            bool inverse = false;
1375            if (itemsReady.compare_exchange_strong(inverse, true)) {
1376                producer->notifyStreamReady(vb_);
1377            }
1378        }
1379    }
1380    return 0;
1381}
1382
1383void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1384    LockHolder lh(streamMutex);
1385    if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1386        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1387        transitionState(STREAM_DEAD);
1388        lh.unlock();
1389        bool inverse = false;
1390        if (itemsReady.compare_exchange_strong(inverse, true)) {
1391            producer->notifyStreamReady(vb_);
1392        }
1393    }
1394}
1395
1396DcpResponse* NotifierStream::next() {
1397    LockHolder lh(streamMutex);
1398
1399    if (readyQ.empty()) {
1400        itemsReady.store(false);
1401        return NULL;
1402    }
1403
1404    DcpResponse* response = readyQ.front();
1405    if (producer->bufferLogInsert(response->getMessageSize())) {
1406        popFromReadyQ();
1407    } else {
1408        response = NULL;
1409    }
1410
1411    return response;
1412}
1413
1414void NotifierStream::transitionState(stream_state_t newState) {
1415    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1416        "(vb %d) Transitioning from %s to %s", vb_,
1417        stateName(state_), stateName(newState));
1418
1419    if (state_ == newState) {
1420        return;
1421    }
1422
1423    bool validTransition = false;
1424    switch (state_.load()) {
1425        case STREAM_PENDING:
1426            if (newState == STREAM_DEAD) {
1427                validTransition = true;
1428            }
1429            break;
1430
1431        case STREAM_BACKFILLING:
1432        case STREAM_IN_MEMORY:
1433        case STREAM_TAKEOVER_SEND:
1434        case STREAM_TAKEOVER_WAIT:
1435        case STREAM_READING:
1436        case STREAM_DEAD:
1437            // No other state transitions are valid for a notifier stream.
1438            break;
1439    }
1440
1441    if (!validTransition) {
1442        throw std::invalid_argument("NotifierStream::transitionState:"
1443                " newState (which is " + std::to_string(newState) +
1444                ") is not valid for current state (which is " +
1445                std::to_string(state_) + ")");
1446    }
1447    state_ = newState;
1448}
1449
// Consumer-side stream: immediately queues the StreamRequest that will be
// sent to the producer to open the stream. last_seqno starts at the
// vbucket's current high seqno.
PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
                             const std::string &name, uint32_t flags,
                             uint32_t opaque, uint16_t vb, uint64_t st_seqno,
                             uint64_t en_seqno, uint64_t vb_uuid,
                             uint64_t snap_start_seqno, uint64_t snap_end_seqno,
                             uint64_t vb_high_seqno)
    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
             snap_start_seqno, snap_end_seqno),
      engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
      cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
    LockHolder lh(streamMutex);
    // The StreamRequest is ready to send as soon as the stream is created.
    pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
                                  vb_uuid, snap_start_seqno, snap_end_seqno));
    itemsReady.store(true);
    type_ = STREAM_PASSIVE;

    const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
        "(vb %" PRId16 ") Attempting to add %s stream"
        " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
        " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
        " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
        vb, type, st_seqno, en_seqno, vb_uuid,
        snap_start_seqno, snap_end_seqno, vb_high_seqno);
}
1475
// Destructor: discards any still-buffered (unprocessed) messages and, if
// the stream was still live, logs how much unacked data is being dropped.
PassiveStream::~PassiveStream() {
    uint32_t unackedBytes = clearBuffer_UNLOCKED();
    // transitionState() returns true only if the stream was not DEAD yet.
    if (transitionState(STREAM_DEAD)) {
        // Destructed a "live" stream, log it.
        consumer->getLogger().log(EXTENSION_LOG_NOTICE,
            "(vb %" PRId16 ") Destructing stream."
            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
            vb_, last_seqno.load(), unackedBytes);
    }
}
1486
1487uint32_t PassiveStream::setDead(end_stream_status_t status) {
1488    /* Hold buffer lock so that we clear out all items before we set the stream
1489       to dead state. We do not want to add any new message to the buffer or
1490       process any items in the buffer once we set the stream state to dead. */
1491    LockHolder lh(buffer.bufMutex);
1492    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1493    bool killed = false;
1494
1495    LockHolder slh(streamMutex);
1496    if (transitionState(STREAM_DEAD)) {
1497        killed = true;
1498    }
1499
1500    if (killed) {
1501        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
1502        if (END_STREAM_DISCONNECTED == status) {
1503            logLevel = EXTENSION_LOG_WARNING;
1504        }
1505        consumer->getLogger().log(logLevel,
1506            "(vb %" PRId16 ") Setting stream to dead state, last_seqno is %"
1507            PRIu64 ", unAckedBytes is %" PRIu32 ", status is %s",
1508            vb_, last_seqno.load(), unackedBytes, getEndStreamStatusStr(status));
1509    }
1510    return unackedBytes;
1511}
1512
1513void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1514    LockHolder lh(streamMutex);
1515    if (state_ == STREAM_PENDING) {
1516        if (status == ENGINE_SUCCESS) {
1517            transitionState(STREAM_READING);
1518        } else {
1519            transitionState(STREAM_DEAD);
1520        }
1521        pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1522        lh.unlock();
1523        bool inverse = false;
1524        if (itemsReady.compare_exchange_strong(inverse, true)) {
1525            consumer->notifyStreamReady(vb_);
1526        }
1527    }
1528}
1529
1530void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1531                                    uint32_t new_opaque,
1532                                    uint64_t start_seqno) {
1533    vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1534
1535    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
1536    if (info.range.end == info.start) {
1537        info.range.start = info.start;
1538    }
1539
1540    snap_start_seqno_ = info.range.start;
1541    start_seqno_ = info.start;
1542    snap_end_seqno_ = info.range.end;
1543
1544    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1545        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
1546        ", start seq no %" PRIu64 ", end seq no %" PRIu64
1547        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
1548        vb_, new_opaque, start_seqno, end_seqno_,
1549        snap_start_seqno_, snap_end_seqno_);
1550
1551    LockHolder lh(streamMutex);
1552    last_seqno.store(start_seqno);
1553    pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1554                                  end_seqno_, vb_uuid_, snap_start_seqno_,
1555                                  snap_end_seqno_));
1556    lh.unlock();
1557    bool inverse = false;
1558    if (itemsReady.compare_exchange_strong(inverse, true)) {
1559        consumer->notifyStreamReady(vb_);
1560    }
1561}
1562
/* Validates an incoming DCP message against the stream's sequence state and
 * either applies it immediately (when the replication throttle allows it and
 * nothing is already buffered, to preserve ordering) or appends it to the
 * stream's buffer for later draining by processBufferedMessages().
 *
 * Ownership: the stream takes ownership of 'resp'. It is freed here on the
 * DEAD/ERANGE paths and after successful immediate processing; it is retained
 * in the buffer when buffered. On ENGINE_DISCONNECT the message is NOT freed.
 *
 * Returns:
 *   ENGINE_EINVAL     - resp is null.
 *   ENGINE_KEY_ENOENT - stream is already dead.
 *   ENGINE_ERANGE     - out-of-sequence mutation or snapshot marker.
 *   ENGINE_DISCONNECT - unknown DCP opcode.
 *   ENGINE_TMPFAIL    - message was buffered for later processing.
 *   otherwise         - result of immediate processing.
 */
ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
    if(nullptr == resp) {
        return ENGINE_EINVAL;
    }

    LockHolder lh(buffer.bufMutex);

    // A dead stream accepts nothing further; we own the message, so free it.
    if (state_ == STREAM_DEAD) {
        delete resp;
        return ENGINE_KEY_ENOENT;
    }

    // Per-message-type sequence number sanity checks.
    switch (resp->getEvent()) {
        case DCP_MUTATION:
        case DCP_DELETION:
        case DCP_EXPIRATION:
        {
            MutationResponse* m = static_cast<MutationResponse*>(resp);
            uint64_t bySeqno = m->getBySeqno();
            // Seqnos must be strictly increasing within the stream.
            if (bySeqno <= last_seqno.load()) {
                consumer->getLogger().log(EXTENSION_LOG_WARNING,
                    "(vb %d) Erroneous (out of sequence) mutation received, "
                    "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
                    "greater than last received seqno (%" PRIu64 "); "
                    "Dropping mutation!",
                    vb_, opaque_, bySeqno, last_seqno.load());
                delete m;
                return ENGINE_ERANGE;
            }
            last_seqno.store(bySeqno);
            break;
        }
        case DCP_SNAPSHOT_MARKER:
        {
            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
            uint64_t snapStart = s->getStartSeqno();
            uint64_t snapEnd = s->getEndSeqno();
            // Reject a marker only if its entire range is behind us.
            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
                consumer->getLogger().log(EXTENSION_LOG_WARNING,
                    "(vb %d) Erroneous snapshot marker received, with "
                    "opaque: %" PRIu32 ", its start "
                    "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
                    "received seqno (%" PRIu64 "); Dropping marker!",
                    vb_, opaque_, snapStart, snapEnd, last_seqno.load());
                delete s;
                return ENGINE_ERANGE;
            }
            break;
        }
        case DCP_SET_VBUCKET:
        case DCP_STREAM_END:
        {
            /* No validations necessary */
            break;
        }
        default:
        {
            consumer->getLogger().log(EXTENSION_LOG_WARNING,
                "(vb %d) Unknown DCP op received: %d; Disconnecting connection..",
                vb_, resp->getEvent());
            return ENGINE_DISCONNECT;
        }
    }

    // Fast path: process immediately, but only when the buffer is empty
    // (otherwise this message would overtake the buffered ones).
    if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
        /* Process the response here itself rather than buffering it */
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        switch (resp->getEvent()) {
            case DCP_MUTATION:
                ret = processMutation(static_cast<MutationResponse*>(resp));
                break;
            case DCP_DELETION:
            case DCP_EXPIRATION:
                ret = processDeletion(static_cast<MutationResponse*>(resp));
                break;
            case DCP_SNAPSHOT_MARKER:
                processMarker(static_cast<SnapshotMarker*>(resp));
                break;
            case DCP_SET_VBUCKET:
                processSetVBucketState(static_cast<SetVBucketState*>(resp));
                break;
            case DCP_STREAM_END:
                {
                    LockHolder lh(streamMutex);
                    transitionState(STREAM_DEAD);
                }
                break;
            default:
                // Above switch should've returned DISCONNECT, throw an exception
                throw std::logic_error("PassiveStream::messageReceived: (vb " +
                                       std::to_string(vb_) +
                                       ") received unknown message type " +
                                       std::to_string(resp->getEvent()));
        }
        // TMPFAIL/ENOMEM mean "retry later": fall through and buffer instead.
        if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
            delete resp;
            return ret;
        }
    }

    // Slow path: buffer the message (ownership stays with the buffer).
    buffer.messages.push(resp);
    buffer.items++;
    buffer.bytes += resp->getMessageSize();

    return ENGINE_TMPFAIL;
}
1669
/* Drains up to batchSize messages from the stream's buffer, applying each in
 * order. Stops early with cannot_process when the store reports temporary
 * failure or out-of-memory (the failing message stays at the front of the
 * buffer for a retry). processed_bytes is set to the bytes successfully
 * consumed; messages dropped with ENGINE_ERANGE are removed from the buffer
 * but not counted. NOTE: buffer.bufMutex is held for the whole batch.
 */
process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
                                                             size_t batchSize) {
    LockHolder lh(buffer.bufMutex);
    uint32_t count = 0;
    uint32_t message_bytes = 0;
    uint32_t total_bytes_processed = 0;
    bool failed = false;
    if (buffer.messages.empty()) {
        return all_processed;
    }

    while (count < batchSize && !buffer.messages.empty()) {
        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
        /* If the stream is in dead state we should not process any remaining
           items in the buffer, we should rather clear them */
        if (state_ == STREAM_DEAD) {
            total_bytes_processed += clearBuffer_UNLOCKED();
            processed_bytes = total_bytes_processed;
            return all_processed;
        }

        DcpResponse *response = buffer.messages.front();
        message_bytes = response->getMessageSize();

        switch (response->getEvent()) {
            case DCP_MUTATION:
                ret = processMutation(static_cast<MutationResponse*>(response));
                break;
            case DCP_DELETION:
            case DCP_EXPIRATION:
                ret = processDeletion(static_cast<MutationResponse*>(response));
                break;
            case DCP_SNAPSHOT_MARKER:
                processMarker(static_cast<SnapshotMarker*>(response));
                break;
            case DCP_SET_VBUCKET:
                processSetVBucketState(static_cast<SetVBucketState*>(response));
                break;
            case DCP_STREAM_END:
                {
                    LockHolder lh(streamMutex);
                    transitionState(STREAM_DEAD);
                }
                break;
            default:
                // NOTE(review): no 'break' out of the while-loop here - the
                // unknown message is still deleted/popped below and the loop
                // continues; 'failed' only affects the final return value.
                consumer->getLogger().log(EXTENSION_LOG_WARNING,
                                          "PassiveStream::processBufferedMessages:"
                                          "(vb %" PRIu16 ") PassiveStream failing "
                                          "unknown message type %d",
                                          vb_, response->getEvent());
                failed = true;
        }

        // Leave the message in the buffer so it can be retried later.
        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
            failed = true;
            break;
        }

        delete response;
        buffer.messages.pop();
        buffer.items--;
        buffer.bytes -= message_bytes;
        count++;
        // ERANGE drops are removed but not counted as processed bytes.
        if (ret != ENGINE_ERANGE) {
            total_bytes_processed += message_bytes;
        }
    }

    processed_bytes = total_bytes_processed;

    if (failed) {
        return cannot_process;
    }

    return all_processed;
}
1746
/* Applies a single DCP mutation to the local vbucket.
 * The caller retains ownership of 'mutation'.
 *
 * Returns ENGINE_NOT_MY_VBUCKET if the vbucket no longer exists,
 * ENGINE_ERANGE if the mutation's seqno lies outside the current snapshot
 * window, otherwise the store's result code.
 */
ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Every mutation must fall inside the snapshot window announced by the
    // most recent snapshot marker (see processMarker).
    if (mutation->getBySeqno() < cur_snapshot_start.load() ||
        mutation->getBySeqno() > cur_snapshot_end.load()) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "(vb %d) Erroneous mutation [sequence "
            "number does not fall in the expected snapshot range : "
            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
            vb_, cur_snapshot_start.load(),
            mutation->getBySeqno(), cur_snapshot_end.load());
        return ENGINE_ERANGE;
    }

    // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
    // receive anything without a valid CAS, however given that versions without
    // this check may send us "bad" CAS values, we should regenerate them (which
    // is better than rejecting the data entirely).
    if (!Item::isValidCas(mutation->getItem()->getCas())) {
        LOG(EXTENSION_LOG_WARNING,
            "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
            ", seqno:%" PRId64 "}. Regenerating new CAS",
            consumer->logHeader(),
            mutation->getItem()->getCas(), vb_,
            mutation->getItem()->getBySeqno());
        mutation->getItem()->setCas();
    }

    ENGINE_ERROR_CODE ret;
    // Backfill-phase items take the dedicated TAP backfill path; otherwise the
    // item is stored via setWithMeta, preserving the producer's seqno and CAS
    // (GenerateBySeqno::No / GenerateCas::No).
    if (vb->isBackfillPhase()) {
        ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
                                                    INITIAL_NRU_VALUE,
                                                    false,
                                                    mutation->getExtMetaData());
    } else {
        ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
                                                consumer->getCookie(), true,
                                                true, INITIAL_NRU_VALUE,
                                                GenerateBySeqno::No,
                                                GenerateCas::No,
                                                mutation->getExtMetaData(),
                                                true);
    }

    // We should probably handle these error codes in a better way, but since
    // the producer side doesn't do anything with them anyways let's just log
    // them for now until we come up with a better solution.
    if (ret != ENGINE_SUCCESS) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "Got an error code %d while trying to process mutation", ret);
    } else {
        handleSnapshotEnd(vb, mutation->getBySeqno());
    }

    return ret;
}
1807
/* Applies a single DCP deletion (or expiration) to the local vbucket.
 * The caller retains ownership of 'deletion'.
 *
 * Returns ENGINE_NOT_MY_VBUCKET if the vbucket no longer exists,
 * ENGINE_ERANGE if the deletion's seqno lies outside the current snapshot
 * window, otherwise the store's result code. ENGINE_KEY_ENOENT from the
 * store is treated as success (the key was already gone).
 */
ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
    RCPtr<VBucket> vb = engine->getVBucket(vb_);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    // The deletion must fall inside the snapshot window announced by the
    // most recent snapshot marker (see processMarker).
    if (deletion->getBySeqno() < cur_snapshot_start.load() ||
        deletion->getBySeqno() > cur_snapshot_end.load()) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "(vb %d) Erroneous deletion [sequence "
            "number does not fall in the expected snapshot range : "
            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
            vb_, cur_snapshot_start.load(),
            deletion->getBySeqno(), cur_snapshot_end.load());
        return ENGINE_ERANGE;
    }

    uint64_t delCas = 0;
    ENGINE_ERROR_CODE ret;
    ItemMetaData meta = deletion->getItem()->getMetaData();

    // MB-17517: Check for the incoming item's CAS validity.
    if (!Item::isValidCas(meta.cas)) {
        LOG(EXTENSION_LOG_WARNING,
            "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
            ", seqno:%" PRId64 "}. Regenerating new CAS",
            consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
        meta.cas = Item::nextCas();
    }

    // Delete with the producer's metadata, preserving its seqno and CAS
    // (GenerateBySeqno::No / GenerateCas::No).
    ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
                                               &delCas, NULL, deletion->getVBucket(),
                                               consumer->getCookie(), true,
                                               &meta, vb->isBackfillPhase(),
                                               GenerateBySeqno::No,
                                               GenerateCas::No,
                                               deletion->getBySeqno(),
                                               deletion->getExtMetaData(),
                                               true);
    // Key already absent: the end state matches, so treat as success.
    if (ret == ENGINE_KEY_ENOENT) {
        ret = ENGINE_SUCCESS;
    }

    // We should probably handle these error codes in a better way, but since
    // the producer side doesn't do anything with them anyways let's just log
    // them for now until we come up with a better solution.
    if (ret != ENGINE_SUCCESS) {
        consumer->getLogger().log(EXTENSION_LOG_WARNING,
            "Got an error code %d while trying to process deletion", ret);
    } else {
        handleSnapshotEnd(vb, deletion->getBySeqno());
    }

    return ret;
}
1864
1865void PassiveStream::processMarker(SnapshotMarker* marker) {
1866    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1867
1868    cur_snapshot_start.store(marker->getStartSeqno());
1869    cur_snapshot_end.store(marker->getEndSeqno());
1870    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1871
1872    if (vb) {
1873        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1874            vb->setBackfillPhase(true);
1875            // calling checkpointManager.setBackfillPhase sets the
1876            // openCheckpointId to zero
1877            vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
1878                                                   cur_snapshot_end.load());
1879        } else {
1880            if (marker->getFlags() & MARKER_FLAG_CHK ||
1881                    vb->checkpointManager.getOpenCheckpointId() == 0) {
1882                vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
1883                                                     cur_snapshot_end.load());
1884            } else {
1885                vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end.load());
1886            }
1887            vb->setBackfillPhase(false);
1888        }
1889
1890        if (marker->getFlags() & MARKER_FLAG_ACK) {
1891            cur_snapshot_ack = true;
1892        }
1893    }
1894}
1895
1896void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1897    engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1898
1899    LockHolder lh (streamMutex);
1900    pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1901    lh.unlock();
1902    bool inverse = false;
1903    if (itemsReady.compare_exchange_strong(inverse, true)) {
1904        consumer->notifyStreamReady(vb_);
1905    }
1906}
1907
1908void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1909    if (byseqno == cur_snapshot_end.load()) {
1910        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1911            vb->setBackfillPhase(false);
1912            uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1913            vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1914        } else {
1915            size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
1916            size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
1917            /* We want to add a new replica checkpoint if the mem usage is above
1918               high watermark (85%) */
1919            if (mem_threshold < mem_used) {
1920                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1921                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1922            }
1923        }
1924
1925        if (cur_snapshot_ack) {
1926            LockHolder lh(streamMutex);
1927            pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1928            lh.unlock();
1929            bool inverse = false;
1930            if (itemsReady.compare_exchange_strong(inverse, true)) {
1931                consumer->notifyStreamReady(vb_);
1932            }
1933            cur_snapshot_ack = false;
1934        }
1935        cur_snapshot_type.store(none);
1936    }
1937}
1938
1939void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1940    Stream::addStats(add_stat, c);
1941
1942    try {
1943        const int bsize = 1024;
1944        char buf[bsize];
1945        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
1946                         vb_);
1947        add_casted_stat(buf, buffer.items, add_stat, c);
1948        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
1949                         vb_);
1950        add_casted_stat(buf, buffer.bytes, add_stat, c);
1951        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
1952                         vb_);
1953        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1954        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
1955                         name_.c_str(), vb_);
1956        add_casted_stat(buf, last_seqno.load(), add_stat, c);
1957        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
1958                         name_.c_str(), vb_);
1959        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1960
1961        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
1962                         name_.c_str(), vb_);
1963        add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()),
1964                        add_stat, c);
1965
1966        if (cur_snapshot_type.load() != none) {
1967            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
1968                             name_.c_str(), vb_);
1969            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1970            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
1971                             name_.c_str(), vb_);
1972            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
1973        }
1974    } catch (std::exception& error) {
1975        LOG(EXTENSION_LOG_WARNING,
1976            "PassiveStream::addStats: Failed to build stats: %s", error.what());
1977    }
1978}
1979
1980DcpResponse* PassiveStream::next() {
1981    LockHolder lh(streamMutex);
1982
1983    if (readyQ.empty()) {
1984        itemsReady.store(false);
1985        return NULL;
1986    }
1987
1988    DcpResponse* response = readyQ.front();
1989    popFromReadyQ();
1990    return response;
1991}
1992
1993uint32_t PassiveStream::clearBuffer_UNLOCKED() {
1994    uint32_t unackedBytes = buffer.bytes;
1995
1996    while (!buffer.messages.empty()) {
1997        DcpResponse* resp = buffer.messages.front();
1998        buffer.messages.pop();
1999        delete resp;
2000    }
2001
2002    buffer.bytes = 0;
2003    buffer.items = 0;
2004    return unackedBytes;
2005}
2006
2007bool PassiveStream::transitionState(stream_state_t newState) {
2008    consumer->getLogger().log(EXTENSION_LOG_DEBUG,
2009        "(vb %d) Transitioning from %s to %s",
2010        vb_, stateName(state_), stateName(newState));
2011
2012    if (state_ == newState) {
2013        return false;
2014    }
2015
2016    bool validTransition = false;
2017    switch (state_.load()) {
2018        case STREAM_PENDING:
2019            if (newState == STREAM_READING || newState == STREAM_DEAD) {
2020                validTransition = true;
2021            }
2022            break;
2023
2024        case STREAM_BACKFILLING:
2025        case STREAM_IN_MEMORY:
2026        case STREAM_TAKEOVER_SEND:
2027        case STREAM_TAKEOVER_WAIT:
2028            // Not valid for passive streams
2029            break;
2030
2031        case STREAM_READING:
2032            if (newState == STREAM_PENDING || newState == STREAM_DEAD) {
2033                validTransition = true;
2034            }
2035            break;
2036
2037        case STREAM_DEAD:
2038            // Once 'dead' shouldn't transition away from it.
2039            break;
2040    }
2041
2042    if (!validTransition) {
2043        throw std::invalid_argument("PassiveStream::transitionState:"
2044                " newState (which is" + std::to_string(newState) +
2045                ") is not valid for current state (which is " +
2046                std::to_string(state_) + ")");
2047    }
2048
2049    state_ = newState;
2050    return true;
2051}
2052
2053const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
2054{
2055    switch (status) {
2056        case END_STREAM_OK:
2057            return "The stream closed as part of normal operation";
2058        case END_STREAM_CLOSED:
2059            return "The stream closed due to a close stream message";
2060        case END_STREAM_DISCONNECTED:
2061            return "The stream closed early because the conn was disconnected";
2062        case END_STREAM_STATE:
2063            return "The stream closed early because the vbucket state changed";
2064        default:
2065            break;
2066    }
2067    std::string msg("Status unknown: " + std::to_string(status) +
2068                    "; this should not have happened!");
2069    return msg.c_str();
2070}
2071