xref: /4.6.4/ep-engine/src/dcp/stream.cc (revision 370f70cd)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <platform/checked_snprintf.h>
21
22#include "ep_engine.h"
23#include "failover-table.h"
24#include "kvstore.h"
25#include "statwriter.h"
26#include "dcp/backfill-manager.h"
27#include "dcp/backfill.h"
28#include "dcp/consumer.h"
29#include "dcp/producer.h"
30#include "dcp/response.h"
31#include "dcp/stream.h"
32#include "replicationthrottle.h"
33
34static const char* snapshotTypeToString(snapshot_type_t type) {
35    static const char * const snapshotTypes[] = { "none", "disk", "memory" };
36    if (type < none || type > memory) {
37        throw std::invalid_argument("snapshotTypeToString: type (which is " +
38                                    std::to_string(type) +
39                                    ") is not a valid snapshot_type_t");
40    }
41    return snapshotTypes[type];
42}
43
44const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
45
46Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
47               uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
48               uint64_t vb_uuid, uint64_t snap_start_seqno,
49               uint64_t snap_end_seqno)
50    : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
51      start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
52      snap_start_seqno_(snap_start_seqno),
53      snap_end_seqno_(snap_end_seqno),
54      state_(STREAM_PENDING), itemsReady(false),
55      readyQ_non_meta_items(0),
56      readyQueueMemory(0) {
57}
58
59Stream::~Stream() {
60    // NB: reusing the "unlocked" method without a lock because we're
61    // destructing and should not take any locks.
62    clear_UNLOCKED();
63}
64
65void Stream::clear_UNLOCKED() {
66    while (!readyQ.empty()) {
67        DcpResponse* resp = readyQ.front();
68        popFromReadyQ();
69        delete resp;
70    }
71}
72
73void Stream::pushToReadyQ(DcpResponse* resp)
74{
75   /* expect streamMutex.ownsLock() == true */
76    if (resp) {
77        readyQ.push(resp);
78        if (!resp->isMetaEvent()) {
79            readyQ_non_meta_items++;
80        }
81        readyQueueMemory.fetch_add(resp->getMessageSize(),
82                                   std::memory_order_relaxed);
83    }
84}
85
86void Stream::popFromReadyQ(void)
87{
88    /* expect streamMutex.ownsLock() == true */
89    if (!readyQ.empty()) {
90        const auto& front = readyQ.front();
91        if (!front->isMetaEvent()) {
92            readyQ_non_meta_items--;
93        }
94        const uint32_t respSize = front->getMessageSize();
95        readyQ.pop();
96
97        /* Decrement the readyQ size */
98        if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
99            readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
100        } else {
101            LOG(EXTENSION_LOG_DEBUG, "readyQ size for stream %s (vb %d)"
102                "underflow, likely wrong stat calculation! curr size: %" PRIu64
103                "; new size: %d",
104                name_.c_str(), getVBucket(),
105                readyQueueMemory.load(std::memory_order_relaxed), respSize);
106            readyQueueMemory.store(0, std::memory_order_relaxed);
107        }
108    }
109}
110
111uint64_t Stream::getReadyQueueMemory() {
112    return readyQueueMemory.load(std::memory_order_relaxed);
113}
114
115const char* Stream::stateName(stream_state_t st) {
116    static const char * const stateNames[] = {
117        "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
118        "reading", "dead"
119    };
120    if (st < STREAM_PENDING || st > STREAM_DEAD) {
121        throw std::invalid_argument("Stream::stateName: st (which is " +
122                                        std::to_string(st) +
123                                        ") is not a valid stream_state_t");
124    }
125    return stateNames[st];
126}
127
128void Stream::addStats(ADD_STAT add_stat, const void *c) {
129    try {
130        const int bsize = 1024;
131        char buffer[bsize];
132        checked_snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(),
133                         vb_);
134        add_casted_stat(buffer, flags_, add_stat, c);
135        checked_snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(),
136                         vb_);
137        add_casted_stat(buffer, opaque_, add_stat, c);
138        checked_snprintf(buffer, bsize, "%s:stream_%d_start_seqno",
139                         name_.c_str(), vb_);
140        add_casted_stat(buffer, start_seqno_, add_stat, c);
141        checked_snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(),
142                         vb_);
143        add_casted_stat(buffer, end_seqno_, add_stat, c);
144        checked_snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(),
145                         vb_);
146        add_casted_stat(buffer, vb_uuid_, add_stat, c);
147        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno",
148                         name_.c_str(), vb_);
149        add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
150        checked_snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno",
151                         name_.c_str(), vb_);
152        add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
153        checked_snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(),
154                         vb_);
155        add_casted_stat(buffer, stateName(state_), add_stat, c);
156    } catch (std::exception& error) {
157        LOG(EXTENSION_LOG_WARNING,
158            "Stream::addStats: Failed to build stats: %s", error.what());
159    }
160}
161
162ActiveStream::ActiveStream(EventuallyPersistentEngine* e, dcp_producer_t p,
163                           const std::string &n, uint32_t flags,
164                           uint32_t opaque, uint16_t vb, uint64_t st_seqno,
165                           uint64_t en_seqno, uint64_t vb_uuid,
166                           uint64_t snap_start_seqno, uint64_t snap_end_seqno)
167    :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
168              snap_start_seqno, snap_end_seqno),
169       isBackfillTaskRunning(false), pendingBackfill(false),
170       lastReadSeqnoUnSnapshotted(st_seqno), lastReadSeqno(st_seqno),
171       lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
172       takeoverState(vbucket_state_pending), backfillRemaining(0),
173       itemsFromMemoryPhase(0), firstMarkerSent(false), waitForSnapshot(0),
174       engine(e), producer(p),
175       payloadType((flags & DCP_ADD_STREAM_FLAG_NO_VALUE) ? KEY_ONLY :
176                                                            KEY_VALUE),
177       lastSentSnapEndSeqno(0), chkptItemsExtractionInProgress(false) {
178
179    const char* type = "";
180    if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
181        type = "takeover ";
182        end_seqno_ = dcpMaxSeqno;
183    }
184
185    RCPtr<VBucket> vbucket = engine->getVBucket(vb);
186    if (vbucket) {
187        ReaderLockHolder rlh(vbucket->getStateLock());
188        if (vbucket->getState() == vbucket_state_replica) {
189            snapshot_info_t info = vbucket->checkpointManager.getSnapshotInfo();
190            if (info.range.end > en_seqno) {
191                end_seqno_ = info.range.end;
192            }
193        }
194    }
195
196    producer->getLogger().log(EXTENSION_LOG_NOTICE,
197        "(vb %" PRIu16 ") Creating %sstream with start seqno %" PRIu64
198        " and end seqno %" PRIu64, vb, type, st_seqno, en_seqno);
199
200    backfillItems.memory = 0;
201    backfillItems.disk = 0;
202    backfillItems.sent = 0;
203
204    type_ = STREAM_ACTIVE;
205
206    bufferedBackfill.bytes = 0;
207    bufferedBackfill.items = 0;
208
209    takeoverStart = 0;
210    takeoverSendMaxTime = engine->getConfiguration().getDcpTakeoverMaxTime();
211
212    if (start_seqno_ >= end_seqno_) {
213        /* streamMutex lock needs to be acquired because endStream
214         * potentially makes call to pushToReadyQueue.
215         */
216        LockHolder lh(streamMutex);
217        endStream(END_STREAM_OK);
218        itemsReady.store(true);
219        // lock is released on leaving the scope
220    }
221}
222
223ActiveStream::~ActiveStream() {
224    transitionState(STREAM_DEAD);
225}
226
227DcpResponse* ActiveStream::next() {
228    LockHolder lh(streamMutex);
229
230    stream_state_t initState = state_;
231
232    DcpResponse* response = NULL;
233
234    bool validTransition = false;
235    switch (initState) {
236        case STREAM_PENDING:
237            validTransition = true;
238            break;
239        case STREAM_BACKFILLING:
240            validTransition = true;
241            response = backfillPhase();
242            break;
243        case STREAM_IN_MEMORY:
244            validTransition = true;
245            response = inMemoryPhase();
246            break;
247        case STREAM_TAKEOVER_SEND:
248            validTransition = true;
249            response = takeoverSendPhase();
250            break;
251        case STREAM_TAKEOVER_WAIT:
252            validTransition = true;
253            response = takeoverWaitPhase();
254            break;
255        case STREAM_READING:
256            // Not valid for an active stream.
257            break;
258        case STREAM_DEAD:
259            validTransition = true;
260            response = deadPhase();
261            break;
262    }
263
264    if (!validTransition) {
265        throw std::invalid_argument("ActiveStream::transitionState:"
266                " invalid state " + std::to_string(state_) + " for stream " +
267                producer->logHeader() + " vb " + std::to_string(vb_));
268    }
269
270    stream_state_t newState = state_;
271
272    if (newState != STREAM_DEAD && newState != state_ && !response) {
273        lh.unlock();
274        return next();
275    }
276
277    itemsReady.store(response ? true : false);
278    return response;
279}
280
281void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
282    LockHolder lh(streamMutex);
283    uint64_t chkCursorSeqno = endSeqno;
284
285    if (state_ != STREAM_BACKFILLING) {
286        return;
287    }
288
289    startSeqno = std::min(snap_start_seqno_, startSeqno);
290    firstMarkerSent = true;
291
292    RCPtr<VBucket> vb = engine->getVBucket(vb_);
293    // An atomic read of vbucket state without acquiring the
294    // reader lock for state should suffice here.
295    if (vb && vb->getState() == vbucket_state_replica) {
296        if (end_seqno_ > endSeqno) {
297            /* We possibly have items in the open checkpoint
298               (incomplete snapshot) */
299            snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
300            producer->getLogger().log(EXTENSION_LOG_NOTICE,
301                "(vb %" PRIu16 ") Merging backfill and memory snapshot for a "
302                "replica vbucket, backfill start seqno %" PRIu64 ", "
303                "backfill end seqno %" PRIu64 ", "
304                "snapshot end seqno after merge %" PRIu64,
305                vb_, startSeqno, endSeqno, info.range.end);
306            endSeqno = info.range.end;
307        }
308    }
309
310    producer->getLogger().log(EXTENSION_LOG_NOTICE,
311        "(vb %" PRIu16 ") Sending disk snapshot with start seqno %" PRIu64
312        " and end seqno %" PRIu64, vb_, startSeqno, endSeqno);
313    pushToReadyQ(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
314                                    MARKER_FLAG_DISK));
315    lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
316
317    if (!vb) {
318        endStream(END_STREAM_STATE);
319    } else if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
320        // Only re-register the cursor if we still need to get memory snapshots
321        CursorRegResult result =
322            vb->checkpointManager.registerCursorBySeqno(
323                                                name_, chkCursorSeqno,
324                                                MustSendCheckpointEnd::NO);
325        curChkSeqno = result.first;
326    }
327
328    lh.unlock();
329    bool inverse = false;
330    if (itemsReady.compare_exchange_strong(inverse, true)) {
331        producer->notifyStreamReady(vb_);
332    }
333}
334
335bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source) {
336    if (nullptr == itm) {
337        return false;
338    }
339
340    if (itm->shouldReplicate()) {
341        LockHolder lh(streamMutex);
342        if (state_ == STREAM_BACKFILLING) {
343            if (!producer->recordBackfillManagerBytesRead(itm->size())) {
344                delete itm;
345                return false;
346            }
347
348            bufferedBackfill.bytes.fetch_add(itm->size());
349            bufferedBackfill.items++;
350
351            pushToReadyQ(new MutationResponse(itm, opaque_, nullptr));
352
353            lastReadSeqno.store(itm->getBySeqno());
354            lh.unlock();
355            bool inverse = false;
356            if (itemsReady.compare_exchange_strong(inverse, true)) {
357                producer->notifyStreamReady(vb_);
358            }
359
360            if (backfill_source == BACKFILL_FROM_MEMORY) {
361                backfillItems.memory++;
362            } else {
363                backfillItems.disk++;
364            }
365        } else {
366            delete itm;
367        }
368    } else {
369        delete itm;
370    }
371
372    return true;
373}
374
375void ActiveStream::completeBackfill() {
376    {
377        LockHolder lh(streamMutex);
378        if (state_ == STREAM_BACKFILLING) {
379            producer->getLogger().log(EXTENSION_LOG_NOTICE,
380                    "(vb %" PRIu16 ") Backfill complete, %" PRIu64 " items "
381                    "read from disk, %" PRIu64 " from memory, last seqno read: "
382                    "%" PRIu64 "\n", vb_, uint64_t(backfillItems.disk.load()),
383                    uint64_t(backfillItems.memory.load()),
384                    lastReadSeqno.load());
385
386            isBackfillTaskRunning = false;
387            if (pendingBackfill) {
388                scheduleBackfill_UNLOCKED(true);
389                pendingBackfill = false;
390            }
391
392            bool expected = false;
393            if (itemsReady.compare_exchange_strong(expected, true)) {
394                producer->notifyStreamReady(vb_);
395            }
396
397            /**
398              * MB-22451: It is important that we return here because
399              * scheduleBackfill_UNLOCKED(true) can set
400              * isBackfillTaskRunning to true.  Therefore if we don't return we
401              * will set isBackfillTaskRunning prematurely back to false, (see
402              * below).
403              */
404            return;
405        }
406    }
407
408    bool inverse = true;
409    isBackfillTaskRunning.compare_exchange_strong(inverse, false);
410    inverse = false;
411    if (itemsReady.compare_exchange_strong(inverse, true)) {
412        producer->notifyStreamReady(vb_);
413    }
414}
415
416void ActiveStream::snapshotMarkerAckReceived() {
417    bool inverse = false;
418    if (--waitForSnapshot == 0 &&
419        itemsReady.compare_exchange_strong(inverse, true)) {
420        producer->notifyStreamReady(vb_);
421    }
422}
423
424void ActiveStream::setVBucketStateAckRecieved() {
425    LockHolder lh(streamMutex);
426    if (state_ == STREAM_TAKEOVER_WAIT) {
427        if (takeoverState == vbucket_state_pending) {
428            producer->getLogger().log(EXTENSION_LOG_INFO,
429                "(vb %" PRIu16 ") Receive ack for set vbucket state to pending "
430                "message", vb_);
431
432            takeoverState = vbucket_state_active;
433            transitionState(STREAM_TAKEOVER_SEND);
434            lh.unlock();
435
436            engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
437                                                  false, false);
438            RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
439            producer->getLogger().log(EXTENSION_LOG_NOTICE,
440                "(vb %" PRIu16 ") Vbucket marked as dead, last sent seqno: %"
441                PRIu64 ", high seqno: %" PRIu64,
442                vb_, lastSentSeqno.load(), vbucket->getHighSeqno());
443        } else {
444            producer->getLogger().log(EXTENSION_LOG_INFO,
445                "(vb %" PRIu16 ") Receive ack for set vbucket state to active "
446                "message", vb_);
447            endStream(END_STREAM_OK);
448            lh.unlock();
449        }
450
451        bool inverse = false;
452        if (itemsReady.compare_exchange_strong(inverse, true)) {
453            producer->notifyStreamReady(vb_);
454        }
455    } else {
456        producer->getLogger().log(EXTENSION_LOG_WARNING,
457            "(vb %" PRIu16 ") Unexpected ack for set vbucket op on stream '%s' "
458            "state '%s'", vb_, name_.c_str(), stateName(state_));
459    }
460
461}
462
463DcpResponse* ActiveStream::backfillPhase() {
464    DcpResponse* resp = nextQueuedItem();
465
466    if (resp && (resp->getEvent() == DCP_MUTATION ||
467         resp->getEvent() == DCP_DELETION ||
468         resp->getEvent() == DCP_EXPIRATION)) {
469        MutationResponse* m = static_cast<MutationResponse*>(resp);
470        producer->recordBackfillManagerBytesSent(m->getItem()->size());
471        bufferedBackfill.bytes.fetch_sub(m->getItem()->size());
472        bufferedBackfill.items--;
473        if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
474            backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
475        }
476    }
477
478    if (!isBackfillTaskRunning && readyQ.empty()) {
479        backfillRemaining.store(0, std::memory_order_relaxed);
480        if (lastReadSeqno.load() >= end_seqno_) {
481            endStream(END_STREAM_OK);
482        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
483            transitionState(STREAM_TAKEOVER_SEND);
484        } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
485            endStream(END_STREAM_OK);
486        } else {
487            transitionState(STREAM_IN_MEMORY);
488        }
489
490        if (!resp) {
491            resp = nextQueuedItem();
492        }
493    }
494
495    return resp;
496}
497
498DcpResponse* ActiveStream::inMemoryPhase() {
499    if (lastSentSeqno.load() >= end_seqno_) {
500        endStream(END_STREAM_OK);
501    } else if (readyQ.empty()) {
502        if (pendingBackfill) {
503            transitionState(STREAM_BACKFILLING);
504            pendingBackfill = false;
505            return NULL;
506        } else if (nextCheckpointItem()) {
507            return NULL;
508        }
509    }
510
511    return nextQueuedItem();
512}
513
514DcpResponse* ActiveStream::takeoverSendPhase() {
515
516    RCPtr<VBucket> vb = engine->getVBucket(vb_);
517    if (vb && takeoverStart != 0 &&
518        !vb->isTakeoverBackedUp() &&
519        (ep_current_time() - takeoverStart) > takeoverSendMaxTime) {
520        vb->setTakeoverBackedUpState(true);
521    }
522
523    if (!readyQ.empty()) {
524        return nextQueuedItem();
525    } else {
526        if (nextCheckpointItem()) {
527            return NULL;
528        }
529    }
530
531    if (waitForSnapshot != 0) {
532        return NULL;
533    }
534
535    if (vb) {
536        vb->setTakeoverBackedUpState(false);
537        takeoverStart = 0;
538    }
539
540    DcpResponse* resp = NULL;
541    if (producer->bufferLogInsert(SetVBucketState::baseMsgBytes)) {
542        resp = new SetVBucketState(opaque_, vb_, takeoverState);
543        transitionState(STREAM_TAKEOVER_WAIT);
544    }
545    return resp;
546}
547
548DcpResponse* ActiveStream::takeoverWaitPhase() {
549    return nextQueuedItem();
550}
551
552DcpResponse* ActiveStream::deadPhase() {
553    DcpResponse* resp = nextQueuedItem();
554    if (!resp) {
555        producer->getLogger().log(EXTENSION_LOG_NOTICE,
556                                  "(vb %" PRIu16 ") Stream closed, "
557                                  "%" PRIu64 " items sent from backfill phase, "
558                                  "%" PRIu64 " items sent from memory phase, "
559                                  "%" PRIu64 " was last seqno sent",
560                                  vb_,
561                                  uint64_t(backfillItems.sent.load()),
562                                  uint64_t(itemsFromMemoryPhase.load()),
563                                  lastSentSeqno.load());
564    }
565    return resp;
566}
567
568bool ActiveStream::isCompressionEnabled() {
569    return producer->isValueCompressionEnabled();
570}
571
572void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
573    Stream::addStats(add_stat, c);
574
575    try {
576        const int bsize = 1024;
577        char buffer[bsize];
578        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_disk_items",
579                         name_.c_str(), vb_);
580        add_casted_stat(buffer, backfillItems.disk, add_stat, c);
581        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_mem_items",
582                         name_.c_str(), vb_);
583        add_casted_stat(buffer, backfillItems.memory, add_stat, c);
584        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_sent",
585                         name_.c_str(), vb_);
586        add_casted_stat(buffer, backfillItems.sent, add_stat, c);
587        checked_snprintf(buffer, bsize, "%s:stream_%d_memory_phase",
588                         name_.c_str(), vb_);
589        add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
590        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno",
591                         name_.c_str(), vb_);
592        add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
593        checked_snprintf(buffer, bsize, "%s:stream_%d_last_sent_snap_end_seqno",
594                         name_.c_str(), vb_);
595        add_casted_stat(buffer,
596                        lastSentSnapEndSeqno.load(std::memory_order_relaxed),
597                        add_stat, c);
598        checked_snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno",
599                         name_.c_str(), vb_);
600        add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
601        checked_snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory",
602                         name_.c_str(), vb_);
603        add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
604        checked_snprintf(buffer, bsize, "%s:stream_%d_items_ready",
605                         name_.c_str(), vb_);
606        add_casted_stat(buffer, itemsReady.load() ? "true" : "false", add_stat,
607                        c);
608        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_bytes",
609                         name_.c_str(), vb_);
610        add_casted_stat(buffer, bufferedBackfill.bytes, add_stat, c);
611        checked_snprintf(buffer, bsize, "%s:stream_%d_backfill_buffer_items",
612                         name_.c_str(), vb_);
613        add_casted_stat(buffer, bufferedBackfill.items, add_stat, c);
614
615        if ((state_ == STREAM_TAKEOVER_SEND) && takeoverStart != 0) {
616            checked_snprintf(buffer, bsize, "%s:stream_%d_takeover_since",
617                             name_.c_str(), vb_);
618            add_casted_stat(buffer, ep_current_time() - takeoverStart, add_stat,
619                            c);
620        }
621    } catch (std::exception& error) {
622        LOG(EXTENSION_LOG_WARNING,
623            "ActiveStream::addStats: Failed to build stats: %s", error.what());
624    }
625}
626
627void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
628    LockHolder lh(streamMutex);
629
630    RCPtr<VBucket> vb = engine->getVBucket(vb_);
631    add_casted_stat("name", name_, add_stat, cookie);
632    if (!vb || state_ == STREAM_DEAD) {
633        add_casted_stat("status", "completed", add_stat, cookie);
634        add_casted_stat("estimate", 0, add_stat, cookie);
635        add_casted_stat("backfillRemaining", 0, add_stat, cookie);
636        return;
637    }
638
639    size_t total = backfillRemaining.load(std::memory_order_relaxed);
640    if (state_ == STREAM_BACKFILLING) {
641        add_casted_stat("status", "backfilling", add_stat, cookie);
642    } else {
643        add_casted_stat("status", "in-memory", add_stat, cookie);
644    }
645    add_casted_stat("backfillRemaining",
646                    backfillRemaining.load(std::memory_order_relaxed),
647                    add_stat, cookie);
648
649    item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
650    size_t vb_items = vb->getNumItems(iep);
651    size_t chk_items = vb_items > 0 ?
652                vb->checkpointManager.getNumItemsForCursor(name_) : 0;
653
654    size_t del_items = 0;
655    try {
656        del_items = engine->getEpStore()->getRWUnderlying(vb_)->
657                                                        getNumPersistedDeletes(vb_);
658    } catch (std::runtime_error& e) {
659        producer->getLogger().log(EXTENSION_LOG_WARNING,
660            "ActiveStream:addTakeoverStats: exception while getting num persisted "
661            "deletes for vbucket:%" PRIu16 " - treating as 0 deletes. "
662            "Details: %s", vb_, e.what());
663    }
664
665    if (end_seqno_ < curChkSeqno) {
666        chk_items = 0;
667    } else if ((end_seqno_ - curChkSeqno) < chk_items) {
668        chk_items = end_seqno_ - curChkSeqno + 1;
669    }
670    total += chk_items;
671
672    add_casted_stat("estimate", total, add_stat, cookie);
673    add_casted_stat("chk_items", chk_items, add_stat, cookie);
674    add_casted_stat("vb_items", vb_items, add_stat, cookie);
675    add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
676}
677
678DcpResponse* ActiveStream::nextQueuedItem() {
679    if (!readyQ.empty()) {
680        DcpResponse* response = readyQ.front();
681        if (producer->bufferLogInsert(response->getMessageSize())) {
682            if (response->getEvent() == DCP_MUTATION ||
683                    response->getEvent() == DCP_DELETION ||
684                    response->getEvent() == DCP_EXPIRATION) {
685                lastSentSeqno.store(
686                        dynamic_cast<MutationResponse*>(response)->getBySeqno());
687
688                if (state_ == STREAM_BACKFILLING) {
689                    backfillItems.sent++;
690                } else {
691                    itemsFromMemoryPhase++;
692                }
693            }
694            popFromReadyQ();
695            return response;
696        }
697    }
698    return NULL;
699}
700
701bool ActiveStream::nextCheckpointItem() {
702    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
703    if (vbucket && vbucket->checkpointManager.getNumItemsForCursor(name_) > 0) {
704        // schedule this stream to build the next checkpoint
705        producer->scheduleCheckpointProcessorTask(this);
706        return true;
707    } else if (chkptItemsExtractionInProgress) {
708        return true;
709    }
710    return false;
711}
712
713bool ActiveStreamCheckpointProcessorTask::run() {
714    if (engine->getEpStats().isShutdown) {
715        return false;
716    }
717
718    // Setup that we will sleep forever when done.
719    snooze(INT_MAX);
720
721    // Clear the notfification flag
722    notified.store(false);
723
724    size_t iterations = 0;
725    do {
726        stream_t nextStream = queuePop();
727        ActiveStream* stream = static_cast<ActiveStream*>(nextStream.get());
728
729        if (stream) {
730            stream->nextCheckpointItemTask();
731        } else {
732            break;
733        }
734        iterations++;
735    } while(!queueEmpty()
736            && iterations < iterationsBeforeYield);
737
738    // Now check if we were re-notified or there are still checkpoints
739    bool expected = true;
740    if (notified.compare_exchange_strong(expected, false)
741        || !queueEmpty()) {
742        // snooze for 0, essentially yielding and allowing other tasks a go
743        snooze(0.0);
744    }
745
746    return true;
747}
748
749void ActiveStreamCheckpointProcessorTask::wakeup() {
750    ExecutorPool::get()->wake(getId());
751}
752
753void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
754    pushUnique(stream);
755
756    bool expected = false;
757    if (notified.compare_exchange_strong(expected, true)) {
758        wakeup();
759    }
760}
761
762void ActiveStreamCheckpointProcessorTask::clearQueues() {
763    LockHolder lh(workQueueLock);
764    while (!queue.empty()) {
765        queue.pop();
766    }
767    queuedVbuckets.clear();
768}
769
770void ActiveStream::nextCheckpointItemTask() {
771    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
772    if (vbucket) {
773        std::vector<queued_item> items;
774        getOutstandingItems(vbucket, items);
775        processItems(items);
776    } else {
777        /* The entity deleting the vbucket must set stream to dead,
778           calling setDead(END_STREAM_STATE) will cause deadlock because
779           it will try to grab streamMutex which is already acquired at this
780           point here */
781        return;
782    }
783}
784
785void ActiveStream::getOutstandingItems(RCPtr<VBucket> &vb,
786                                       std::vector<queued_item> &items) {
787    // Commencing item processing - set guard flag.
788    chkptItemsExtractionInProgress.store(true);
789
790    vb->checkpointManager.getAllItemsForCursor(name_, items);
791    if (vb->checkpointManager.getNumCheckpoints() > 1) {
792        engine->getEpStore()->wakeUpCheckpointRemover();
793    }
794}
795
796void ActiveStream::processItems(std::vector<queued_item>& items) {
797    if (!items.empty()) {
798        bool mark = false;
799        if (items.front()->getOperation() == queue_op::checkpoint_start) {
800            mark = true;
801        }
802
803        std::deque<MutationResponse*> mutations;
804        std::vector<queued_item>::iterator itr = items.begin();
805        for (; itr != items.end(); ++itr) {
806            queued_item& qi = *itr;
807
808            if (qi->shouldReplicate()) {
809                curChkSeqno = qi->getBySeqno();
810                lastReadSeqnoUnSnapshotted = qi->getBySeqno();
811
812                mutations.push_back(new MutationResponse(qi, opaque_, nullptr,
813                            isSendMutationKeyOnlyEnabled() ? KEY_ONLY :
814                                                             KEY_VALUE));
815            } else if (qi->getOperation() == queue_op::checkpoint_start) {
816                /* if there are already other mutations, then they belong to the
817                   previous checkpoint and hence we must create a snapshot and
818                   put them onto readyQ */
819                if (!mutations.empty()) {
820                    snapshot(mutations, mark);
821                    /* clear out all the mutations since they are already put
822                       onto the readyQ */
823                    mutations.clear();
824                }
825                /* mark true as it indicates a new checkpoint snapshot */
826                mark = true;
827            }
828        }
829
830        if (mutations.empty()) {
831            // If we only got checkpoint start or ends check to see if there are
832            // any more snapshots before pausing the stream.
833            nextCheckpointItemTask();
834        } else {
835            snapshot(mutations, mark);
836        }
837    }
838
839    // Completed item processing - clear guard flag and notify producer.
840    chkptItemsExtractionInProgress.store(false);
841    producer->notifyStreamReady(vb_);
842}
843
844void ActiveStream::snapshot(std::deque<MutationResponse*>& items, bool mark) {
845    if (items.empty()) {
846        return;
847    }
848
849    LockHolder lh(streamMutex);
850
851    if ((state_ == STREAM_DEAD) || (state_ == STREAM_BACKFILLING)) {
852        // If stream was closed forcefully by the time the checkpoint items
853        // retriever task completed, or if we decided to switch the stream to
854        // backfill state from in-memory state, none of the acquired mutations
855        // should be added on the stream's readyQ. We must drop items in case
856        // we switch state from in-memory to backfill because we schedule
857        // backfill from lastReadSeqno + 1
858        std::deque<MutationResponse *>::iterator itr = items.begin();
859        for (; itr != items.end(); ++itr) {
860            delete *itr;
861        }
862        items.clear();
863        return;
864    }
865
866    /* This assumes that all items in the "items deque" is put onto readyQ */
867    lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
868
869    if (isCurrentSnapshotCompleted()) {
870        uint32_t flags = MARKER_FLAG_MEMORY;
871        uint64_t snapStart = items.front()->getBySeqno();
872        uint64_t snapEnd = items.back()->getBySeqno();
873
874        if (mark) {
875            flags |= MARKER_FLAG_CHK;
876        }
877
878        if (state_ == STREAM_TAKEOVER_SEND) {
879            waitForSnapshot++;
880            flags |= MARKER_FLAG_ACK;
881        }
882
883        if (!firstMarkerSent) {
884            snapStart = std::min(snap_start_seqno_, snapStart);
885            firstMarkerSent = true;
886        }
887        pushToReadyQ(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd,
888                                        flags));
889        lastSentSnapEndSeqno.store(snapEnd, std::memory_order_relaxed);
890    }
891
892    std::deque<MutationResponse*>::iterator itemItr;
893    for (itemItr = items.begin(); itemItr != items.end(); itemItr++) {
894        pushToReadyQ(*itemItr);
895    }
896}
897
898uint32_t ActiveStream::setDead(end_stream_status_t status) {
899    {
900        LockHolder lh(streamMutex);
901        endStream(status);
902    }
903
904    bool inverse = false;
905    if (status != END_STREAM_DISCONNECTED &&
906        itemsReady.compare_exchange_strong(inverse, true)) {
907        producer->notifyStreamReady(vb_);
908    }
909    return 0;
910}
911
912void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
913    if (state_ != STREAM_DEAD) {
914        bool inverse = false;
915        if (itemsReady.compare_exchange_strong(inverse, true)) {
916            producer->notifyStreamReady(vb_);
917        }
918    }
919}
920
921void ActiveStream::endStream(end_stream_status_t reason) {
922    if (state_ != STREAM_DEAD) {
923        pendingBackfill = false;
924        if (state_ == STREAM_BACKFILLING) {
925            // If Stream were in Backfilling state, clear out the
926            // backfilled items to clear up the backfill buffer.
927            clear_UNLOCKED();
928            producer->recordBackfillManagerBytesSent(bufferedBackfill.bytes);
929            bufferedBackfill.bytes = 0;
930            bufferedBackfill.items = 0;
931        }
932        transitionState(STREAM_DEAD);
933        if (reason != END_STREAM_DISCONNECTED) {
934            pushToReadyQ(new StreamEndResponse(opaque_, reason, vb_));
935        }
936        producer->getLogger().log(EXTENSION_LOG_NOTICE,
937                                  "(vb %" PRIu16 ") Stream closing, "
938                                  "sent until seqno %" PRIu64 " "
939                                  "remaining items %" PRIu64 ", "
940                                  "reason: %s",
941                                  vb_,
942                                  lastSentSeqno.load(),
943                                  uint64_t(readyQ_non_meta_items.load()),
944                                  getEndStreamStatusStr(reason));
945    }
946}
947
/**
 * Decide whether a backfill is needed for this stream and either schedule it
 * with the producer's backfill manager or move the stream on to its next
 * state.
 *
 * Does nothing (beyond logging) while isBackfillTaskRunning is set, and
 * returns silently when the vbucket cannot be found.
 *
 * NOTE(review): the _UNLOCKED suffix suggests the caller is expected to hold
 * streamMutex (handleSlowStream() in this file does) - confirm for the other
 * callers.
 *
 * @param reschedule  true when this is a re-scheduling of a backfill (as
 *                    opposed to the first one for the stream); in that case
 *                    the backfill end is the vbucket's last persisted seqno
 *                    and the producer is notified if no backfill results
 * @throws std::logic_error if lastReadSeqno is ahead of the vbucket high
 *         seqno (disk-only / reschedule path) or ahead of the seqno returned
 *         by the cursor registration (in-memory path)
 */
void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
    // A backfill task is already in flight; do not schedule a second one.
    if (isBackfillTaskRunning) {
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Skipping "
                                  "scheduleBackfill_UNLOCKED; "
                                  "lastReadSeqno %" PRIu64 ", reschedule flag "
                                  ": %s", vb_, lastReadSeqno.load(),
                                  reschedule ? "True" : "False");
        return;
    }

    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
    if (!vbucket) {
        return;
    }

    // The backfill range starts right after the last seqno already read;
    // backfillEnd stays 0 (i.e. "no range") unless one of the branches
    // below sets it.
    uint64_t backfillStart = lastReadSeqno.load() + 1;
    uint64_t backfillEnd = 0;
    bool tryBackfill = false;

    if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
        // Disk-only stream or rescheduled backfill: no cursor is registered
        // here, the range end comes from persistence / the stream's end.
        uint64_t vbHighSeqno = static_cast<uint64_t>(vbucket->getHighSeqno());
        if (lastReadSeqno.load() > vbHighSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than vbHighSeqno (which is " +
                                   std::to_string(vbHighSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }
        if (reschedule) {
            /* We need to do this for reschedule because in case of
               DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
               set to last persisted seqno before calling
               scheduleBackfill_UNLOCKED() */
            backfillEnd = engine->getEpStore()->getLastPersistedSeqno(vb_);
        } else {
            backfillEnd = end_seqno_;
        }
        tryBackfill = true;
    } else {
        // Regular stream: register a checkpoint cursor at lastReadSeqno.
        // The result's first member becomes curChkSeqno and its second
        // member tells us whether a backfill should be attempted.
        CursorRegResult result =
            vbucket->checkpointManager.registerCursorBySeqno(
                                                name_,
                                                lastReadSeqno.load(),
                                                MustSendCheckpointEnd::NO);
        curChkSeqno = result.first;
        tryBackfill = result.second;

        if (lastReadSeqno.load() > curChkSeqno) {
            throw std::logic_error("ActiveStream::scheduleBackfill_UNLOCKED: "
                                   "lastReadSeqno (which is " +
                                   std::to_string(lastReadSeqno.load()) +
                                   " ) is greater than curChkSeqno (which is " +
                                   std::to_string(curChkSeqno) + " ). " +
                                   "for stream " + producer->logHeader() +
                                   "; vb " + std::to_string(vb_));
        }

        /* We need to find the minimum seqno that needs to be backfilled in
         * order to make sure that we don't miss anything when transitioning
         * to a memory snapshot. The backfill task will always make sure that
         * the backfill end seqno is contained in the backfill.
         */
        if (backfillStart < curChkSeqno) {
            if (curChkSeqno > end_seqno_) {
                /* Backfill only is enough */
                backfillEnd = end_seqno_;
            } else {
                /* Backfill + in-memory streaming */
                backfillEnd = curChkSeqno - 1;
            }
        }
    }

    if (backfillStart <= backfillEnd && tryBackfill) {
        // Non-empty range and a backfill is wanted: hand it to the manager.
        producer->getLogger().log(EXTENSION_LOG_NOTICE,
                                  "(vb %" PRIu16 ") Scheduling backfill "
                                  "from %" PRIu64 " to %" PRIu64 ", reschedule "
                                  "flag : %s", vb_, backfillStart, backfillEnd,
                                  reschedule ? "True" : "False");
        producer->scheduleBackfillManager(this, backfillStart, backfillEnd);
        isBackfillTaskRunning.store(true);
    } else {
        // Nothing to backfill: move straight to the state that follows
        // backfilling for this kind of stream.
        if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
            endStream(END_STREAM_OK);
        } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
            transitionState(STREAM_TAKEOVER_SEND);
        } else {
            transitionState(STREAM_IN_MEMORY);
        }
        if (reschedule) {
            /* Cursor was dropped, but we will not do backfill.
               This may happen in a corner case where, the memory
               usage is high due to other vbuckets and persistence cursor moves
               ahead of replication cursor to new checkpoint open but does not
               persist items yet.
               Note: (1) We must not notify when we schedule backfill for the
                         first time because the stream is not yet in producer
                         conn list of streams
                     (2) It is not absolutely necessary to notify immediately
                         as conn manager or an incoming items will cause a
                         notification eventually, but wouldn't hurt to do so */
            bool inverse = false;
            if (itemsReady.compare_exchange_strong(inverse, true)) {
                producer->notifyStreamReady(vb_);
            }
        }
    }
}
1059
1060void ActiveStream::handleSlowStream()
1061{
1062    LockHolder lh(streamMutex);
1063    switch (state_.load()) {
1064        case STREAM_BACKFILLING:
1065            if (isBackfillTaskRunning.load()) {
1066                /* Drop the existing cursor and set pending backfill */
1067                dropCheckpointCursor_UNLOCKED();
1068                pendingBackfill = true;
1069            } else {
1070                scheduleBackfill_UNLOCKED(true);
1071            }
1072            break;
1073        case STREAM_IN_MEMORY:
1074            /* Drop the existing cursor and set pending backfill */
1075            dropCheckpointCursor_UNLOCKED();
1076            pendingBackfill = true;
1077            break;
1078        case STREAM_TAKEOVER_SEND:
1079            /* To be handled later if needed */
1080        case STREAM_TAKEOVER_WAIT:
1081            /* To be handled later if needed */
1082        case STREAM_DEAD:
1083            /* To be handled later if needed */
1084            break;
1085        case STREAM_PENDING:
1086        case STREAM_READING:
1087            throw std::logic_error("ActiveStream::handleSlowStream: "
1088                                   "called with state " +
1089                                   std::to_string(state_.load()) + " . " +
1090                                   "for stream " + producer->logHeader() +
1091                                   "; vb " + std::to_string(vb_));
1092    }
1093}
1094
1095const char* ActiveStream::getEndStreamStatusStr(end_stream_status_t status)
1096{
1097    switch (status) {
1098    case END_STREAM_OK:
1099        return "The stream ended due to all items being streamed";
1100    case END_STREAM_CLOSED:
1101        return "The stream closed early due to a close stream message";
1102    case END_STREAM_STATE:
1103        return "The stream closed early because the vbucket state changed";
1104    case END_STREAM_DISCONNECTED:
1105        return "The stream closed early because the conn was disconnected";
1106    case END_STREAM_SLOW:
1107        return "The stream was closed early because it was too slow";
1108    }
1109    std::string msg("Status unknown: " + std::to_string(status) +
1110                    "; this should not have happened!");
1111    return msg.c_str();
1112}
1113
/**
 * Move the stream to newState, enforcing the active-stream state machine and
 * running the work associated with entering the new state.
 *
 * The attempted transition is always logged at DEBUG level. Asking for the
 * state the stream is already in is a no-op.
 *
 * Takes no lock itself. NOTE(review): callers visible in this file hold
 * streamMutex or are *_UNLOCKED helpers - confirm for the remaining callers.
 *
 * @param newState  state to enter
 * @throws std::invalid_argument if newState is not reachable from the
 *         current state
 * @throws std::logic_error if newState is STREAM_READING (only reachable if
 *         the validity check above is changed, since no state currently
 *         allows a transition to READING)
 */
void ActiveStream::transitionState(stream_state_t newState) {
    producer->getLogger().log(EXTENSION_LOG_DEBUG,
                              "(vb %d) Transitioning from %s to %s",
                              vb_, stateName(state_), stateName(newState));

    if (state_ == newState) {
        return;
    }

    // Step 1: check newState against the transitions allowed from the
    // current state.
    bool validTransition = false;
    switch (state_.load()) {
        case STREAM_PENDING:
            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_BACKFILLING:
            if(newState == STREAM_IN_MEMORY ||
               newState == STREAM_TAKEOVER_SEND ||
               newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_IN_MEMORY:
            if (newState == STREAM_BACKFILLING || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_TAKEOVER_SEND:
            if (newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_TAKEOVER_WAIT:
            if (newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD) {
                validTransition = true;
            }
            break;
        case STREAM_READING:
            // Active stream should never be in READING state.
            validTransition = false;
            break;
        case STREAM_DEAD:
            // Once DEAD, no other transitions should occur.
            validTransition = false;
            break;
    }

    if (!validTransition) {
        throw std::invalid_argument("ActiveStream::transitionState:"
                " newState (which is " + std::to_string(newState) +
                ") is not valid for current state (which is " +
                std::to_string(state_) + ")");
    }

    // Step 2: commit the new state. oldState is kept because the work for
    // BACKFILLING depends on where the stream came from.
    stream_state_t oldState = state_.load();
    state_ = newState;

    // Step 3: run the entry action of the new state. Note that some of
    // these call back into transitionState() (via endStream() or
    // scheduleBackfill_UNLOCKED()).
    switch (newState) {
        case STREAM_BACKFILLING:
            // First backfill when coming from PENDING, a rescheduled one
            // when coming back from IN_MEMORY.
            if (STREAM_PENDING == oldState) {
                scheduleBackfill_UNLOCKED(false /* reschedule */);
            } else if (STREAM_IN_MEMORY == oldState) {
                scheduleBackfill_UNLOCKED(true /* reschedule */);
            }
            break;
        case STREAM_IN_MEMORY:
            // Check if the producer has sent up till the last requested
            // sequence number already, if not - move checkpoint items into
            // the ready queue.
            if (lastSentSeqno.load() >= end_seqno_) {
                // Stream transitioning to DEAD state
                endStream(END_STREAM_OK);
                bool inverse = false;
                if (itemsReady.compare_exchange_strong(inverse, true)) {
                    producer->notifyStreamReady(vb_);
                }
            } else {
                nextCheckpointItem();
            }
            break;
        case STREAM_TAKEOVER_SEND:
            // Record when the takeover phase began, then start streaming.
            takeoverStart = ep_current_time();
            nextCheckpointItem();
            break;
        case STREAM_DEAD:
            {
                // A dead stream no longer needs its checkpoint cursor.
                RCPtr<VBucket> vb = engine->getVBucket(vb_);
                if (vb) {
                    vb->checkpointManager.removeCursor(name_);
                }
                break;
            }
        case STREAM_TAKEOVER_WAIT:
        case STREAM_PENDING:
            // No entry action.
            break;
        case STREAM_READING:
            throw std::logic_error("ActiveStream::transitionState:"
                    " newState can't be " + std::to_string(newState) + "!");
    }
}
1215
1216size_t ActiveStream::getItemsRemaining() {
1217    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1218
1219    if (!vbucket || state_ == STREAM_DEAD) {
1220        return 0;
1221    }
1222
1223    // Items remaining is the sum of:
1224    // (a) Items outstanding in checkpoints
1225    // (b) Items pending in our readyQ, excluding any meta items.
1226    return vbucket->checkpointManager.getNumItemsForCursor(name_) +
1227            readyQ_non_meta_items;
1228}
1229
1230uint64_t ActiveStream::getLastReadSeqno() const {
1231    return lastReadSeqno.load();
1232}
1233
1234uint64_t ActiveStream::getLastSentSeqno() const {
1235    return lastSentSeqno.load();
1236}
1237
1238const Logger& ActiveStream::getLogger() const
1239{
1240    return producer->getLogger();
1241}
1242
1243bool ActiveStream::isSendMutationKeyOnlyEnabled() const
1244{
1245    return (KEY_ONLY == payloadType);
1246}
1247
1248bool ActiveStream::isCurrentSnapshotCompleted() const
1249{
1250    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1251    // An atomic read of vbucket state without acquiring the
1252    // reader lock for state should suffice here.
1253    if (vbucket && vbucket->getState() == vbucket_state_replica) {
1254        if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
1255            lastReadSeqno) {
1256            return false;
1257        }
1258    }
1259    return true;
1260}
1261
1262void ActiveStream::dropCheckpointCursor_UNLOCKED()
1263{
1264    RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
1265    if (!vbucket) {
1266        endStream(END_STREAM_STATE);
1267        bool inverse = false;
1268        if (itemsReady.compare_exchange_strong(inverse, true)) {
1269            producer->notifyStreamReady(vb_);
1270        }
1271    }
1272    /* Drop the existing cursor */
1273    vbucket->checkpointManager.removeCursor(name_);
1274}
1275
1276NotifierStream::NotifierStream(EventuallyPersistentEngine* e, dcp_producer_t p,
1277                               const std::string &name, uint32_t flags,
1278                               uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1279                               uint64_t en_seqno, uint64_t vb_uuid,
1280                               uint64_t snap_start_seqno,
1281                               uint64_t snap_end_seqno)
1282    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1283             snap_start_seqno, snap_end_seqno),
1284      producer(p) {
1285    LockHolder lh(streamMutex);
1286    RCPtr<VBucket> vbucket = e->getVBucket(vb_);
1287    if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
1288        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1289        transitionState(STREAM_DEAD);
1290        itemsReady.store(true);
1291    }
1292
1293    type_ = STREAM_NOTIFIER;
1294
1295    producer->getLogger().log(EXTENSION_LOG_NOTICE,
1296        "(vb %d) stream created with start seqno %" PRIu64 " and end seqno %"
1297        PRIu64, vb, st_seqno, en_seqno);
1298}
1299
1300uint32_t NotifierStream::setDead(end_stream_status_t status) {
1301    LockHolder lh(streamMutex);
1302    if (state_ != STREAM_DEAD) {
1303        transitionState(STREAM_DEAD);
1304        if (status != END_STREAM_DISCONNECTED) {
1305            pushToReadyQ(new StreamEndResponse(opaque_, status, vb_));
1306            lh.unlock();
1307            bool inverse = false;
1308            if (itemsReady.compare_exchange_strong(inverse, true)) {
1309                producer->notifyStreamReady(vb_);
1310            }
1311        }
1312    }
1313    return 0;
1314}
1315
1316void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
1317    LockHolder lh(streamMutex);
1318    if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
1319        pushToReadyQ(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
1320        transitionState(STREAM_DEAD);
1321        lh.unlock();
1322        bool inverse = false;
1323        if (itemsReady.compare_exchange_strong(inverse, true)) {
1324            producer->notifyStreamReady(vb_);
1325        }
1326    }
1327}
1328
1329DcpResponse* NotifierStream::next() {
1330    LockHolder lh(streamMutex);
1331
1332    if (readyQ.empty()) {
1333        itemsReady.store(false);
1334        return NULL;
1335    }
1336
1337    DcpResponse* response = readyQ.front();
1338    if (producer->bufferLogInsert(response->getMessageSize())) {
1339        popFromReadyQ();
1340    } else {
1341        response = NULL;
1342    }
1343
1344    return response;
1345}
1346
1347void NotifierStream::transitionState(stream_state_t newState) {
1348    producer->getLogger().log(EXTENSION_LOG_DEBUG,
1349        "(vb %d) Transitioning from %s to %s", vb_,
1350        stateName(state_), stateName(newState));
1351
1352    if (state_ == newState) {
1353        return;
1354    }
1355
1356    bool validTransition = false;
1357    switch (state_.load()) {
1358        case STREAM_PENDING:
1359            if (newState == STREAM_DEAD) {
1360                validTransition = true;
1361            }
1362            break;
1363
1364        case STREAM_BACKFILLING:
1365        case STREAM_IN_MEMORY:
1366        case STREAM_TAKEOVER_SEND:
1367        case STREAM_TAKEOVER_WAIT:
1368        case STREAM_READING:
1369        case STREAM_DEAD:
1370            // No other state transitions are valid for a notifier stream.
1371            break;
1372    }
1373
1374    if (!validTransition) {
1375        throw std::invalid_argument("NotifierStream::transitionState:"
1376                " newState (which is " + std::to_string(newState) +
1377                ") is not valid for current state (which is " +
1378                std::to_string(state_) + ")");
1379    }
1380    state_ = newState;
1381}
1382
1383PassiveStream::PassiveStream(EventuallyPersistentEngine* e, dcp_consumer_t c,
1384                             const std::string &name, uint32_t flags,
1385                             uint32_t opaque, uint16_t vb, uint64_t st_seqno,
1386                             uint64_t en_seqno, uint64_t vb_uuid,
1387                             uint64_t snap_start_seqno, uint64_t snap_end_seqno,
1388                             uint64_t vb_high_seqno)
1389    : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
1390             snap_start_seqno, snap_end_seqno),
1391      engine(e), consumer(c), last_seqno(vb_high_seqno), cur_snapshot_start(0),
1392      cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false) {
1393    LockHolder lh(streamMutex);
1394    pushToReadyQ(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
1395                                  vb_uuid, snap_start_seqno, snap_end_seqno));
1396    itemsReady.store(true);
1397    type_ = STREAM_PASSIVE;
1398
1399    const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
1400    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1401        "(vb %" PRId16 ") Attempting to add %s stream"
1402        " with start seqno %" PRIu64 ", end seqno %" PRIu64 ","
1403        " vbucket uuid %" PRIu64 ", snap start seqno %" PRIu64 ","
1404        " snap end seqno %" PRIu64 ", and vb_high_seqno %" PRIu64 "",
1405        vb, type, st_seqno, en_seqno, vb_uuid,
1406        snap_start_seqno, snap_end_seqno, vb_high_seqno);
1407}
1408
1409PassiveStream::~PassiveStream() {
1410    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1411    if (transitionState(STREAM_DEAD)) {
1412        // Destructed a "live" stream, log it.
1413        consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1414            "(vb %" PRId16 ") Destructing stream."
1415            " last_seqno is %" PRIu64 ", unAckedBytes is %" PRIu32 ".",
1416            vb_, last_seqno.load(), unackedBytes);
1417    }
1418}
1419
1420uint32_t PassiveStream::setDead(end_stream_status_t status) {
1421    /* Hold buffer lock so that we clear out all items before we set the stream
1422       to dead state. We do not want to add any new message to the buffer or
1423       process any items in the buffer once we set the stream state to dead. */
1424    LockHolder lh(buffer.bufMutex);
1425    uint32_t unackedBytes = clearBuffer_UNLOCKED();
1426    bool killed = false;
1427
1428    LockHolder slh(streamMutex);
1429    if (transitionState(STREAM_DEAD)) {
1430        killed = true;
1431    }
1432
1433    if (killed) {
1434        EXTENSION_LOG_LEVEL logLevel = EXTENSION_LOG_NOTICE;
1435        if (END_STREAM_DISCONNECTED == status) {
1436            logLevel = EXTENSION_LOG_WARNING;
1437        }
1438        consumer->getLogger().log(logLevel,
1439            "(vb %" PRId16 ") Setting stream to dead state, last_seqno is %"
1440            PRIu64 ", unAckedBytes is %" PRIu32 ", status is %s",
1441            vb_, last_seqno.load(), unackedBytes, getEndStreamStatusStr(status));
1442    }
1443    return unackedBytes;
1444}
1445
1446void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
1447    LockHolder lh(streamMutex);
1448    if (state_ == STREAM_PENDING) {
1449        if (status == ENGINE_SUCCESS) {
1450            transitionState(STREAM_READING);
1451        } else {
1452            transitionState(STREAM_DEAD);
1453        }
1454        pushToReadyQ(new AddStreamResponse(add_opaque, opaque_, status));
1455        lh.unlock();
1456        bool inverse = false;
1457        if (itemsReady.compare_exchange_strong(inverse, true)) {
1458            consumer->notifyStreamReady(vb_);
1459        }
1460    }
1461}
1462
1463void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1464                                    uint32_t new_opaque,
1465                                    uint64_t start_seqno) {
1466    vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
1467
1468    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
1469    if (info.range.end == info.start) {
1470        info.range.start = info.start;
1471    }
1472
1473    snap_start_seqno_ = info.range.start;
1474    start_seqno_ = info.start;
1475    snap_end_seqno_ = info.range.end;
1476
1477    consumer->getLogger().log(EXTENSION_LOG_NOTICE,
1478        "(vb %d) Attempting to reconnect stream with opaque %" PRIu32
1479        ", start seq no %" PRIu64 ", end seq no %" PRIu64
1480        ", snap start seqno %" PRIu64 ", and snap end seqno %" PRIu64,
1481        vb_, new_opaque, start_seqno, end_seqno_,
1482        snap_start_seqno_, snap_end_seqno_);
1483
1484    LockHolder lh(streamMutex);
1485    last_seqno.store(start_seqno);
1486    pushToReadyQ(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
1487                                  end_seqno_, vb_uuid_, snap_start_seqno_,
1488                                  snap_end_seqno_));
1489    lh.unlock();
1490    bool inverse = false;
1491    if (itemsReady.compare_exchange_strong(inverse, true)) {
1492        consumer->notifyStreamReady(vb_);
1493    }
1494}
1495
1496ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1497    if(nullptr == resp) {
1498        return ENGINE_EINVAL;
1499    }
1500
1501    LockHolder lh(buffer.bufMutex);
1502
1503    if (state_ == STREAM_DEAD) {
1504        delete resp;
1505        return ENGINE_KEY_ENOENT;
1506    }
1507
1508    switch (resp->getEvent()) {
1509        case DCP_MUTATION:
1510        case DCP_DELETION:
1511        case DCP_EXPIRATION:
1512        {
1513            MutationResponse* m = static_cast<MutationResponse*>(resp);
1514            uint64_t bySeqno = m->getBySeqno();
1515            if (bySeqno <= last_seqno.load()) {
1516                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1517                    "(vb %d) Erroneous (out of sequence) mutation received, "
1518                    "with opaque: %" PRIu32 ", its seqno (%" PRIu64 ") is not "
1519                    "greater than last received seqno (%" PRIu64 "); "
1520                    "Dropping mutation!",
1521                    vb_, opaque_, bySeqno, last_seqno.load());
1522                delete m;
1523                return ENGINE_ERANGE;
1524            }
1525            last_seqno.store(bySeqno);
1526            break;
1527        }
1528        case DCP_SNAPSHOT_MARKER:
1529        {
1530            SnapshotMarker* s = static_cast<SnapshotMarker*>(resp);
1531            uint64_t snapStart = s->getStartSeqno();
1532            uint64_t snapEnd = s->getEndSeqno();
1533            if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
1534                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1535                    "(vb %d) Erroneous snapshot marker received, with "
1536                    "opaque: %" PRIu32 ", its start "
1537                    "(%" PRIu64 "), and end (%" PRIu64 ") are less than last "
1538                    "received seqno (%" PRIu64 "); Dropping marker!",
1539                    vb_, opaque_, snapStart, snapEnd, last_seqno.load());
1540                delete s;
1541                return ENGINE_ERANGE;
1542            }
1543            break;
1544        }
1545        case DCP_SET_VBUCKET:
1546        case DCP_STREAM_END:
1547        {
1548            /* No validations necessary */
1549            break;
1550        }
1551        default:
1552        {
1553            consumer->getLogger().log(EXTENSION_LOG_WARNING,
1554                "(vb %d) Unknown DCP op received: %d; Disconnecting connection..",
1555                vb_, resp->getEvent());
1556            return ENGINE_DISCONNECT;
1557        }
1558    }
1559
1560    if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
1561        /* Process the response here itself rather than buffering it */
1562        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1563        switch (resp->getEvent()) {
1564            case DCP_MUTATION:
1565                ret = processMutation(static_cast<MutationResponse*>(resp));
1566                break;
1567            case DCP_DELETION:
1568            case DCP_EXPIRATION:
1569                ret = processDeletion(static_cast<MutationResponse*>(resp));
1570                break;
1571            case DCP_SNAPSHOT_MARKER:
1572                processMarker(static_cast<SnapshotMarker*>(resp));
1573                break;
1574            case DCP_SET_VBUCKET:
1575                processSetVBucketState(static_cast<SetVBucketState*>(resp));
1576                break;
1577            case DCP_STREAM_END:
1578                {
1579                    LockHolder lh(streamMutex);
1580                    transitionState(STREAM_DEAD);
1581                }
1582                break;
1583            default:
1584                // Above switch should've returned DISCONNECT, throw an exception
1585                throw std::logic_error("PassiveStream::messageReceived: (vb " +
1586                                       std::to_string(vb_) +
1587                                       ") received unknown message type " +
1588                                       std::to_string(resp->getEvent()));
1589        }
1590        if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1591            delete resp;
1592            return ret;
1593        }
1594    }
1595
1596    buffer.messages.push(resp);
1597    buffer.items++;
1598    buffer.bytes += resp->getMessageSize();
1599
1600    return ENGINE_TMPFAIL;
1601}
1602
1603process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes,
1604                                                             size_t batchSize) {
1605    LockHolder lh(buffer.bufMutex);
1606    uint32_t count = 0;
1607    uint32_t message_bytes = 0;
1608    uint32_t total_bytes_processed = 0;
1609    bool failed = false;
1610    if (buffer.messages.empty()) {
1611        return all_processed;
1612    }
1613
1614    while (count < batchSize && !buffer.messages.empty()) {
1615        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1616        /* If the stream is in dead state we should not process any remaining
1617           items in the buffer, we should rather clear them */
1618        if (state_ == STREAM_DEAD) {
1619            total_bytes_processed += clearBuffer_UNLOCKED();
1620            processed_bytes = total_bytes_processed;
1621            return all_processed;
1622        }
1623
1624        DcpResponse *response = buffer.messages.front();
1625        message_bytes = response->getMessageSize();
1626
1627        switch (response->getEvent()) {
1628            case DCP_MUTATION:
1629                ret = processMutation(static_cast<MutationResponse*>(response));
1630                break;
1631            case DCP_DELETION:
1632            case DCP_EXPIRATION:
1633                ret = processDeletion(static_cast<MutationResponse*>(response));
1634                break;
1635            case DCP_SNAPSHOT_MARKER:
1636                processMarker(static_cast<SnapshotMarker*>(response));
1637                break;
1638            case DCP_SET_VBUCKET:
1639                processSetVBucketState(static_cast<SetVBucketState*>(response));
1640                break;
1641            case DCP_STREAM_END:
1642                {
1643                    LockHolder lh(streamMutex);
1644                    transitionState(STREAM_DEAD);
1645                }
1646                break;
1647            default:
1648                consumer->getLogger().log(EXTENSION_LOG_WARNING,
1649                                          "PassiveStream::processBufferedMessages:"
1650                                          "(vb %" PRIu16 ") PassiveStream failing "
1651                                          "unknown message type %d",
1652                                          vb_, response->getEvent());
1653                failed = true;
1654        }
1655
1656        if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1657            failed = true;
1658            break;
1659        }
1660
1661        delete response;
1662        buffer.messages.pop();
1663        buffer.items--;
1664        buffer.bytes -= message_bytes;
1665        count++;
1666        if (ret != ENGINE_ERANGE) {
1667            total_bytes_processed += message_bytes;
1668        }
1669    }
1670
1671    processed_bytes = total_bytes_processed;
1672
1673    if (failed) {
1674        return cannot_process;
1675    }
1676
1677    return all_processed;
1678}
1679
1680ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1681    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1682    if (!vb) {
1683        return ENGINE_NOT_MY_VBUCKET;
1684    }
1685
1686    if (mutation->getBySeqno() < cur_snapshot_start.load() ||
1687        mutation->getBySeqno() > cur_snapshot_end.load()) {
1688        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1689            "(vb %d) Erroneous mutation [sequence "
1690            "number does not fall in the expected snapshot range : "
1691            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1692            "snapshot_end (%" PRIu64 ")]; Dropping the mutation!",
1693            vb_, cur_snapshot_start.load(),
1694            mutation->getBySeqno(), cur_snapshot_end.load());
1695        return ENGINE_ERANGE;
1696    }
1697
1698    // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
1699    // receive anything without a valid CAS, however given that versions without
1700    // this check may send us "bad" CAS values, we should regenerate them (which
1701    // is better than rejecting the data entirely).
1702    if (!Item::isValidCas(mutation->getItem()->getCas())) {
1703        LOG(EXTENSION_LOG_WARNING,
1704            "%s Invalid CAS (0x%" PRIx64 ") received for mutation {vb:%" PRIu16
1705            ", seqno:%" PRId64 "}. Regenerating new CAS",
1706            consumer->logHeader(),
1707            mutation->getItem()->getCas(), vb_,
1708            mutation->getItem()->getBySeqno());
1709        mutation->getItem()->setCas();
1710    }
1711
1712    ENGINE_ERROR_CODE ret;
1713    if (vb->isBackfillPhase()) {
1714        ret = engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1715                                                    INITIAL_NRU_VALUE,
1716                                                    false,
1717                                                    mutation->getExtMetaData());
1718    } else {
1719        ret = engine->getEpStore()->setWithMeta(*mutation->getItem(), 0, NULL,
1720                                                consumer->getCookie(), true,
1721                                                true, INITIAL_NRU_VALUE,
1722                                                GenerateBySeqno::No,
1723                                                GenerateCas::No,
1724                                                mutation->getExtMetaData(),
1725                                                true);
1726    }
1727
1728    // We should probably handle these error codes in a better way, but since
1729    // the producer side doesn't do anything with them anyways let's just log
1730    // them for now until we come up with a better solution.
1731    if (ret != ENGINE_SUCCESS) {
1732        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1733            "Got an error code %d while trying to process mutation", ret);
1734    } else {
1735        handleSnapshotEnd(vb, mutation->getBySeqno());
1736    }
1737
1738    return ret;
1739}
1740
1741ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1742    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1743    if (!vb) {
1744        return ENGINE_NOT_MY_VBUCKET;
1745    }
1746
1747    if (deletion->getBySeqno() < cur_snapshot_start.load() ||
1748        deletion->getBySeqno() > cur_snapshot_end.load()) {
1749        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1750            "(vb %d) Erroneous deletion [sequence "
1751            "number does not fall in the expected snapshot range : "
1752            "{snapshot_start (%" PRIu64 ") <= seq_no (%" PRIu64 ") <= "
1753            "snapshot_end (%" PRIu64 ")]; Dropping the deletion!",
1754            vb_, cur_snapshot_start.load(),
1755            deletion->getBySeqno(), cur_snapshot_end.load());
1756        return ENGINE_ERANGE;
1757    }
1758
1759    uint64_t delCas = 0;
1760    ENGINE_ERROR_CODE ret;
1761    ItemMetaData meta = deletion->getItem()->getMetaData();
1762
1763    // MB-17517: Check for the incoming item's CAS validity.
1764    if (!Item::isValidCas(meta.cas)) {
1765        LOG(EXTENSION_LOG_WARNING,
1766            "%s Invalid CAS (0x%" PRIx64 ") received for deletion {vb:%" PRIu16
1767            ", seqno:%" PRId64 "}. Regenerating new CAS",
1768            consumer->logHeader(), meta.cas, vb_, deletion->getBySeqno());
1769        meta.cas = Item::nextCas();
1770    }
1771
1772    ret = engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1773                                               &delCas, NULL, deletion->getVBucket(),
1774                                               consumer->getCookie(), true,
1775                                               &meta, vb->isBackfillPhase(),
1776                                               GenerateBySeqno::No,
1777                                               GenerateCas::No,
1778                                               deletion->getBySeqno(),
1779                                               deletion->getExtMetaData(),
1780                                               true);
1781    if (ret == ENGINE_KEY_ENOENT) {
1782        ret = ENGINE_SUCCESS;
1783    }
1784
1785    // We should probably handle these error codes in a better way, but since
1786    // the producer side doesn't do anything with them anyways let's just log
1787    // them for now until we come up with a better solution.
1788    if (ret != ENGINE_SUCCESS) {
1789        consumer->getLogger().log(EXTENSION_LOG_WARNING,
1790            "Got an error code %d while trying to process deletion", ret);
1791    } else {
1792        handleSnapshotEnd(vb, deletion->getBySeqno());
1793    }
1794
1795    return ret;
1796}
1797
1798void PassiveStream::processMarker(SnapshotMarker* marker) {
1799    RCPtr<VBucket> vb = engine->getVBucket(vb_);
1800
1801    cur_snapshot_start.store(marker->getStartSeqno());
1802    cur_snapshot_end.store(marker->getEndSeqno());
1803    cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory);
1804
1805    if (vb) {
1806        if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1807            vb->setBackfillPhase(true);
1808            vb->checkpointManager.setBackfillPhase(cur_snapshot_start.load(),
1809                                                   cur_snapshot_end.load());
1810        } else {
1811            if (marker->getFlags() & MARKER_FLAG_CHK ||
1812                vb->checkpointManager.getOpenCheckpointId() == 0) {
1813                vb->checkpointManager.createSnapshot(cur_snapshot_start.load(),
1814                                                     cur_snapshot_end.load());
1815            } else {
1816                vb->checkpointManager.updateCurrentSnapshotEnd(cur_snapshot_end.load());
1817            }
1818            vb->setBackfillPhase(false);
1819        }
1820
1821        if (marker->getFlags() & MARKER_FLAG_ACK) {
1822            cur_snapshot_ack = true;
1823        }
1824    }
1825}
1826
1827void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1828    engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1829
1830    LockHolder lh (streamMutex);
1831    pushToReadyQ(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1832    lh.unlock();
1833    bool inverse = false;
1834    if (itemsReady.compare_exchange_strong(inverse, true)) {
1835        consumer->notifyStreamReady(vb_);
1836    }
1837}
1838
1839void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1840    if (byseqno == cur_snapshot_end.load()) {
1841        if (cur_snapshot_type.load() == disk && vb->isBackfillPhase()) {
1842            vb->setBackfillPhase(false);
1843            uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1844            vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1845        } else {
1846            size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
1847            size_t mem_used = engine->getEpStats().getTotalMemoryUsed();
1848            /* We want to add a new replica checkpoint if the mem usage is above
1849               high watermark (85%) */
1850            if (mem_threshold < mem_used) {
1851                uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1852                vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1853            }
1854        }
1855
1856        if (cur_snapshot_ack) {
1857            LockHolder lh(streamMutex);
1858            pushToReadyQ(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1859            lh.unlock();
1860            bool inverse = false;
1861            if (itemsReady.compare_exchange_strong(inverse, true)) {
1862                consumer->notifyStreamReady(vb_);
1863            }
1864            cur_snapshot_ack = false;
1865        }
1866        cur_snapshot_type.store(none);
1867    }
1868}
1869
1870void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1871    Stream::addStats(add_stat, c);
1872
1873    try {
1874        const int bsize = 1024;
1875        char buf[bsize];
1876        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(),
1877                         vb_);
1878        add_casted_stat(buf, buffer.items, add_stat, c);
1879        checked_snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(),
1880                         vb_);
1881        add_casted_stat(buf, buffer.bytes, add_stat, c);
1882        checked_snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(),
1883                         vb_);
1884        add_casted_stat(buf, itemsReady.load() ? "true" : "false", add_stat, c);
1885        checked_snprintf(buf, bsize, "%s:stream_%d_last_received_seqno",
1886                         name_.c_str(), vb_);
1887        add_casted_stat(buf, last_seqno.load(), add_stat, c);
1888        checked_snprintf(buf, bsize, "%s:stream_%d_ready_queue_memory",
1889                         name_.c_str(), vb_);
1890        add_casted_stat(buf, getReadyQueueMemory(), add_stat, c);
1891
1892        checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type",
1893                         name_.c_str(), vb_);
1894        add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type.load()),
1895                        add_stat, c);
1896
1897        if (cur_snapshot_type.load() != none) {
1898            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start",
1899                             name_.c_str(), vb_);
1900            add_casted_stat(buf, cur_snapshot_start.load(), add_stat, c);
1901            checked_snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end",
1902                             name_.c_str(), vb_);
1903            add_casted_stat(buf, cur_snapshot_end.load(), add_stat, c);
1904        }
1905    } catch (std::exception& error) {
1906        LOG(EXTENSION_LOG_WARNING,
1907            "PassiveStream::addStats: Failed to build stats: %s", error.what());
1908    }
1909}
1910
1911DcpResponse* PassiveStream::next() {
1912    LockHolder lh(streamMutex);
1913
1914    if (readyQ.empty()) {
1915        itemsReady.store(false);
1916        return NULL;
1917    }
1918
1919    DcpResponse* response = readyQ.front();
1920    popFromReadyQ();
1921    return response;
1922}
1923
1924uint32_t PassiveStream::clearBuffer_UNLOCKED() {
1925    uint32_t unackedBytes = buffer.bytes;
1926
1927    while (!buffer.messages.empty()) {
1928        DcpResponse* resp = buffer.messages.front();
1929        buffer.messages.pop();
1930        delete resp;
1931    }
1932
1933    buffer.bytes = 0;
1934    buffer.items = 0;
1935    return unackedBytes;
1936}
1937
1938bool PassiveStream::transitionState(stream_state_t newState) {
1939    consumer->getLogger().log(EXTENSION_LOG_DEBUG,
1940        "(vb %d) Transitioning from %s to %s",
1941        vb_, stateName(state_), stateName(newState));
1942
1943    if (state_ == newState) {
1944        return false;
1945    }
1946
1947    bool validTransition = false;
1948    switch (state_.load()) {
1949        case STREAM_PENDING:
1950            if (newState == STREAM_READING || newState == STREAM_DEAD) {
1951                validTransition = true;
1952            }
1953            break;
1954
1955        case STREAM_BACKFILLING:
1956        case STREAM_IN_MEMORY:
1957        case STREAM_TAKEOVER_SEND:
1958        case STREAM_TAKEOVER_WAIT:
1959            // Not valid for passive streams
1960            break;
1961
1962        case STREAM_READING:
1963            if (newState == STREAM_PENDING || newState == STREAM_DEAD) {
1964                validTransition = true;
1965            }
1966            break;
1967
1968        case STREAM_DEAD:
1969            // Once 'dead' shouldn't transition away from it.
1970            break;
1971    }
1972
1973    if (!validTransition) {
1974        throw std::invalid_argument("PassiveStream::transitionState:"
1975                " newState (which is" + std::to_string(newState) +
1976                ") is not valid for current state (which is " +
1977                std::to_string(state_) + ")");
1978    }
1979
1980    state_ = newState;
1981    return true;
1982}
1983
1984const char* PassiveStream::getEndStreamStatusStr(end_stream_status_t status)
1985{
1986    switch (status) {
1987        case END_STREAM_OK:
1988            return "The stream closed as part of normal operation";
1989        case END_STREAM_CLOSED:
1990            return "The stream closed due to a close stream message";
1991        case END_STREAM_DISCONNECTED:
1992            return "The stream closed early because the conn was disconnected";
1993        case END_STREAM_STATE:
1994            return "The stream closed early because the vbucket state changed";
1995        default:
1996            break;
1997    }
1998    std::string msg("Status unknown: " + std::to_string(status) +
1999                    "; this should not have happened!");
2000    return msg.c_str();
2001}
2002