1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
28 
/* Delay handed to snooze() when DCPBackfill::run() has to retry later,
 * either because memory usage is too high or because the requested end
 * seqno has not been persisted yet.
 * NOTE(review): presumably expressed in seconds -- confirm against
 * GlobalTask::snooze().
 */
#define DCP_BACKFILL_SLEEP_TIME 2
30 
snapshotTypeToString(snapshot_type_t type)31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36 
// Largest representable seqno; ActiveStream uses it as the end seqno of
// takeover streams so that they are not bounded by a requested end.
const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
// NOTE(review): not referenced in the visible part of this file; presumably
// the number of buffered messages a PassiveStream handles per pass -- confirm.
const size_t PassiveStream::batchSize = 10;
39 
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
41 public:
SnapshotMarkerCallback(stream_t s)42     SnapshotMarkerCallback(stream_t s)
43         : stream(s) {
44         cb_assert(s->getType() == STREAM_ACTIVE);
45     }
46 
callback(SeqnoRange &range)47     void callback(SeqnoRange &range) {
48         uint64_t st = range.getStartSeqno();
49         uint64_t en = range.getEndSeqno();
50         static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
51     }
52 
53 private:
54     stream_t stream;
55 };
56 
57 class CacheCallback : public Callback<CacheLookup> {
58 public:
CacheCallback(EventuallyPersistentEngine* e, stream_t &s)59     CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60         : engine_(e), stream_(s) {
61         Stream *str = stream_.get();
62         if (str) {
63             cb_assert(str->getType() == STREAM_ACTIVE);
64         }
65     }
66 
67     void callback(CacheLookup &lookup);
68 
69 private:
70     EventuallyPersistentEngine* engine_;
71     stream_t stream_;
72 };
73 
callback(CacheLookup &lookup)74 void CacheCallback::callback(CacheLookup &lookup) {
75     RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
76     if (!vb) {
77         setStatus(ENGINE_SUCCESS);
78         return;
79     }
80 
81     int bucket_num(0);
82     LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83     StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85         Item* it = v->toItem(false, lookup.getVBucketId());
86         lh.unlock();
87         static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88         setStatus(ENGINE_KEY_EEXISTS);
89     } else {
90         setStatus(ENGINE_SUCCESS);
91     }
92 }
93 
94 class DiskCallback : public Callback<GetValue> {
95 public:
DiskCallback(stream_t &s)96     DiskCallback(stream_t &s)
97         : stream_(s) {
98         Stream *str = stream_.get();
99         if (str) {
100             cb_assert(str->getType() == STREAM_ACTIVE);
101         }
102     }
103 
callback(GetValue &val)104     void callback(GetValue &val) {
105         cb_assert(val.getValue());
106         ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107         active_stream->backfillReceived(val.getValue());
108     }
109 
110 private:
111     stream_t stream_;
112 };
113 
114 class DCPBackfill : public GlobalTask {
115 public:
DCPBackfill(EventuallyPersistentEngine* e, stream_t s, uint64_t start_seqno, uint64_t end_seqno, const Priority &p, double sleeptime = 0, bool shutdown = false)116     DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117                 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118                 double sleeptime = 0, bool shutdown = false)
119         : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120           startSeqno(start_seqno), endSeqno(end_seqno) {
121         cb_assert(stream->getType() == STREAM_ACTIVE);
122     }
123 
124     bool run();
125 
126     std::string getDescription();
127 
128 private:
129     EventuallyPersistentEngine *engine;
130     stream_t                    stream;
131     uint64_t                    startSeqno;
132     uint64_t                    endSeqno;
133 };
134 
run()135 bool DCPBackfill::run() {
136     uint16_t vbid = stream->getVBucket();
137 
138     if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139         LOG(EXTENSION_LOG_INFO, "VBucket %d dcp backfill task temporarily "
140                 "suspended  because the current memory usage is too high",
141                 vbid);
142         snooze(DCP_BACKFILL_SLEEP_TIME);
143         return true;
144     }
145 
146     uint64_t lastPersistedSeqno =
147         engine->getEpStore()->getLastPersistedSeqno(vbid);
148     uint64_t diskSeqno =
149         engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
150 
151     if (lastPersistedSeqno < endSeqno) {
152         LOG(EXTENSION_LOG_WARNING, "Rescheduling backfill for vbucket %d "
153             "because backfill up to seqno %llu is needed but only up to "
154             "%llu is persisted (disk %llu)", vbid, endSeqno,
155             lastPersistedSeqno, diskSeqno);
156         snooze(DCP_BACKFILL_SLEEP_TIME);
157         return true;
158     }
159 
160     KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
161     size_t numItems = kvstore->getNumItems(vbid, startSeqno,
162                                            std::numeric_limits<uint64_t>::max());
163     static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
164 
165     shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
166     shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
167     shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
168     kvstore->dump(vbid, startSeqno, cb, cl, sr);
169 
170     static_cast<ActiveStream*>(stream.get())->completeBackfill();
171 
172     LOG(EXTENSION_LOG_WARNING, "Backfill task (%llu to %llu) finished for vb %d"
173         " disk seqno %llu memory seqno %llu", startSeqno, endSeqno,
174         stream->getVBucket(), diskSeqno, lastPersistedSeqno);
175 
176     return false;
177 }
178 
getDescription()179 std::string DCPBackfill::getDescription() {
180     std::stringstream ss;
181     ss << "DCP backfill for vbucket " << stream->getVBucket();
182     return ss.str();
183 }
184 
Stream(const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)185 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
186                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
187                uint64_t vb_uuid, uint64_t snap_start_seqno,
188                uint64_t snap_end_seqno)
189     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
190       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
191       snap_start_seqno_(snap_start_seqno),
192       snap_end_seqno_(snap_end_seqno),
193       state_(STREAM_PENDING), itemsReady(false) {
194 }
195 
clear_UNLOCKED()196 void Stream::clear_UNLOCKED() {
197     while (!readyQ.empty()) {
198         DcpResponse* resp = readyQ.front();
199         delete resp;
200         readyQ.pop();
201     }
202 }
203 
stateName(stream_state_t st) const204 const char * Stream::stateName(stream_state_t st) const {
205     static const char * const stateNames[] = {
206         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
207         "reading", "dead"
208     };
209     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
210     return stateNames[st];
211 }
212 
addStats(ADD_STAT add_stat, const void *c)213 void Stream::addStats(ADD_STAT add_stat, const void *c) {
214     const int bsize = 128;
215     char buffer[bsize];
216     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
217     add_casted_stat(buffer, flags_, add_stat, c);
218     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
219     add_casted_stat(buffer, opaque_, add_stat, c);
220     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
221     add_casted_stat(buffer, start_seqno_, add_stat, c);
222     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
223     add_casted_stat(buffer, end_seqno_, add_stat, c);
224     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
225     add_casted_stat(buffer, vb_uuid_, add_stat, c);
226     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
227     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
228     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
229     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
230     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
231     add_casted_stat(buffer, stateName(state_), add_stat, c);
232 }
233 
ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &n, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)234 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
235                            const std::string &n, uint32_t flags,
236                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
237                            uint64_t en_seqno, uint64_t vb_uuid,
238                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
239     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
240               snap_start_seqno, snap_end_seqno),
241        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
242        takeoverState(vbucket_state_pending), backfillRemaining(0),
243        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
244        waitForSnapshot(0), engine(e), producer(p),
245        isBackfillTaskRunning(false) {
246 
247     const char* type = "";
248     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
249         type = "takeover ";
250         end_seqno_ = dcpMaxSeqno;
251     }
252 
253     if (start_seqno_ >= end_seqno_) {
254         endStream(END_STREAM_OK);
255         itemsReady = true;
256     }
257 
258     type_ = STREAM_ACTIVE;
259 
260     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
261         "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
262         en_seqno);
263 }
264 
next()265 DcpResponse* ActiveStream::next() {
266     LockHolder lh(streamMutex);
267 
268     stream_state_t initState = state_;
269 
270     DcpResponse* response = NULL;
271     switch (state_) {
272         case STREAM_PENDING:
273             break;
274         case STREAM_BACKFILLING:
275             response = backfillPhase();
276             break;
277         case STREAM_IN_MEMORY:
278             response = inMemoryPhase();
279             break;
280         case STREAM_TAKEOVER_SEND:
281             response = takeoverSendPhase();
282             break;
283         case STREAM_TAKEOVER_WAIT:
284             response = takeoverWaitPhase();
285             break;
286         case STREAM_DEAD:
287             response = deadPhase();
288             break;
289         default:
290             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
291                 producer->logHeader(), vb_, stateName(state_));
292             abort();
293     }
294 
295     if (state_ != STREAM_DEAD && initState != state_ && !response) {
296         lh.unlock();
297         return next();
298     }
299 
300     itemsReady = response ? true : false;
301     return response;
302 }
303 
markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno)304 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
305     LockHolder lh(streamMutex);
306     if (state_ != STREAM_BACKFILLING) {
307         return;
308     }
309 
310     startSeqno = std::min(snap_start_seqno_, startSeqno);
311     firstMarkerSent = true;
312 
313     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
314         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
315         endSeqno);
316     readyQ.push(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
317                                    MARKER_FLAG_DISK));
318     RCPtr<VBucket> vb = engine->getVBucket(vb_);
319     if (!vb) {
320         endStream(END_STREAM_STATE);
321     } else {
322         if (endSeqno > end_seqno_) {
323             endSeqno = end_seqno_;
324         }
325         // Only re-register the cursor if we still need to get memory snapshots
326         CursorRegResult result =
327             vb->checkpointManager.registerTAPCursorBySeqno(name_, endSeqno);
328         curChkSeqno = result.first;
329     }
330 
331     if (!itemsReady) {
332         itemsReady = true;
333         lh.unlock();
334         producer->notifyStreamReady(vb_, false);
335     }
336 }
337 
backfillReceived(Item* itm)338 void ActiveStream::backfillReceived(Item* itm) {
339     LockHolder lh(streamMutex);
340     if (state_ == STREAM_BACKFILLING) {
341         readyQ.push(new MutationResponse(itm, opaque_));
342         lastReadSeqno = itm->getBySeqno();
343 
344         if (!itemsReady) {
345             itemsReady = true;
346             lh.unlock();
347             producer->notifyStreamReady(vb_, false);
348         }
349     } else {
350         delete itm;
351     }
352 }
353 
completeBackfill()354 void ActiveStream::completeBackfill() {
355     LockHolder lh(streamMutex);
356 
357     if (state_ == STREAM_BACKFILLING) {
358         isBackfillTaskRunning = false;
359         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
360             " from disk, last seqno read: %ld", producer->logHeader(), vb_,
361             itemsFromBackfill, lastReadSeqno);
362 
363         if (!itemsReady) {
364             itemsReady = true;
365             lh.unlock();
366             producer->notifyStreamReady(vb_, false);
367         }
368     }
369 }
370 
snapshotMarkerAckReceived()371 void ActiveStream::snapshotMarkerAckReceived() {
372     LockHolder lh (streamMutex);
373     waitForSnapshot--;
374 
375     if (!itemsReady && waitForSnapshot == 0) {
376         itemsReady = true;
377         lh.unlock();
378         producer->notifyStreamReady(vb_, true);
379     }
380 }
381 
setVBucketStateAckRecieved()382 void ActiveStream::setVBucketStateAckRecieved() {
383     LockHolder lh(streamMutex);
384     if (state_ == STREAM_TAKEOVER_WAIT) {
385         if (takeoverState == vbucket_state_pending) {
386             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
387             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
388                                                   false, false);
389             takeoverState = vbucket_state_active;
390             transitionState(STREAM_TAKEOVER_SEND);
391             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
392                 "state to pending message", producer->logHeader(), vb_);
393         } else {
394             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
395                 "state to active message", producer->logHeader(), vb_);
396             endStream(END_STREAM_OK);
397         }
398 
399         if (!itemsReady) {
400             itemsReady = true;
401             lh.unlock();
402             producer->notifyStreamReady(vb_, true);
403         }
404     } else {
405         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
406             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
407             name_.c_str(), stateName(state_));
408     }
409 }
410 
backfillPhase()411 DcpResponse* ActiveStream::backfillPhase() {
412     DcpResponse* resp = nextQueuedItem();
413 
414     if (resp && backfillRemaining > 0 &&
415         (resp->getEvent() == DCP_MUTATION ||
416          resp->getEvent() == DCP_DELETION ||
417          resp->getEvent() == DCP_EXPIRATION)) {
418         backfillRemaining--;
419     }
420 
421     if (!isBackfillTaskRunning && readyQ.empty()) {
422         backfillRemaining = 0;
423         if (lastReadSeqno >= end_seqno_) {
424             endStream(END_STREAM_OK);
425         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
426             transitionState(STREAM_TAKEOVER_SEND);
427         } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
428             endStream(END_STREAM_OK);
429         } else {
430             transitionState(STREAM_IN_MEMORY);
431         }
432 
433         if (!resp) {
434             resp = nextQueuedItem();
435         }
436     }
437 
438     return resp;
439 }
440 
inMemoryPhase()441 DcpResponse* ActiveStream::inMemoryPhase() {
442     if (!readyQ.empty()) {
443         return nextQueuedItem();
444     }
445 
446     if (lastSentSeqno >= end_seqno_) {
447         endStream(END_STREAM_OK);
448     } else {
449         nextCheckpointItem();
450     }
451 
452     return nextQueuedItem();
453 }
454 
takeoverSendPhase()455 DcpResponse* ActiveStream::takeoverSendPhase() {
456     if (!readyQ.empty()) {
457         return nextQueuedItem();
458     } else {
459         nextCheckpointItem();
460         if (!readyQ.empty()) {
461             return nextQueuedItem();
462         }
463     }
464 
465     if (waitForSnapshot != 0) {
466         return NULL;
467     }
468 
469     DcpResponse* resp = new SetVBucketState(opaque_, vb_, takeoverState);
470     transitionState(STREAM_TAKEOVER_WAIT);
471     return resp;
472 }
473 
takeoverWaitPhase()474 DcpResponse* ActiveStream::takeoverWaitPhase() {
475     return nextQueuedItem();
476 }
477 
deadPhase()478 DcpResponse* ActiveStream::deadPhase() {
479     return nextQueuedItem();
480 }
481 
addStats(ADD_STAT add_stat, const void *c)482 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
483     Stream::addStats(add_stat, c);
484 
485     const int bsize = 128;
486     char buffer[bsize];
487     snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
488     add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
489     snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
490     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
491     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
492     add_casted_stat(buffer, lastSentSeqno, add_stat, c);
493     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
494     add_casted_stat(buffer, itemsReady ? "true" : "false", add_stat, c);
495 }
496 
addTakeoverStats(ADD_STAT add_stat, const void *cookie)497 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
498     LockHolder lh(streamMutex);
499 
500     RCPtr<VBucket> vb = engine->getVBucket(vb_);
501     add_casted_stat("name", name_, add_stat, cookie);
502     if (!vb || state_ == STREAM_DEAD) {
503         add_casted_stat("status", "completed", add_stat, cookie);
504         add_casted_stat("estimate", 0, add_stat, cookie);
505         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
506         add_casted_stat("estimate", 0, add_stat, cookie);
507         return;
508     }
509 
510     size_t total = backfillRemaining;
511     if (state_ == STREAM_BACKFILLING) {
512         add_casted_stat("status", "backfilling", add_stat, cookie);
513     } else {
514         add_casted_stat("status", "in-memory", add_stat, cookie);
515     }
516     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
517 
518     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
519     size_t vb_items = vb->getNumItems(iep);
520     size_t chk_items = vb_items > 0 ?
521                 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
522     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
523                                                     getNumPersistedDeletes(vb_);
524 
525     if (end_seqno_ < curChkSeqno) {
526         chk_items = 0;
527     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
528         chk_items = end_seqno_ - curChkSeqno + 1;
529     }
530     total += chk_items;
531 
532     add_casted_stat("estimate", total, add_stat, cookie);
533     add_casted_stat("chk_items", chk_items, add_stat, cookie);
534     add_casted_stat("vb_items", vb_items, add_stat, cookie);
535     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
536 }
537 
nextQueuedItem()538 DcpResponse* ActiveStream::nextQueuedItem() {
539     if (!readyQ.empty()) {
540         DcpResponse* response = readyQ.front();
541         if (response->getEvent() == DCP_MUTATION ||
542             response->getEvent() == DCP_DELETION ||
543             response->getEvent() == DCP_EXPIRATION) {
544             lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
545 
546             if (state_ == STREAM_BACKFILLING) {
547                 itemsFromBackfill++;
548             } else {
549                 itemsFromMemory++;
550             }
551         }
552         readyQ.pop();
553         return response;
554     }
555     return NULL;
556 }
557 
nextCheckpointItem()558 void ActiveStream::nextCheckpointItem() {
559     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
560 
561     bool mark = false;
562     std::list<queued_item> items;
563     std::list<MutationResponse*> mutations;
564     vbucket->checkpointManager.getAllItemsForCursor(name_, items);
565     if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
566         engine->getEpStore()->wakeUpCheckpointRemover();
567     }
568 
569     if (!items.empty() && items.front()->getOperation() == queue_op_checkpoint_start) {
570         mark = true;
571     }
572 
573     while (!items.empty()) {
574         queued_item qi = items.front();
575         items.pop_front();
576 
577         if (qi->getOperation() == queue_op_set ||
578             qi->getOperation() == queue_op_del) {
579             curChkSeqno = qi->getBySeqno();
580             lastReadSeqno = qi->getBySeqno();
581 
582             mutations.push_back(new MutationResponse(qi, opaque_));
583         } else if (qi->getOperation() == queue_op_checkpoint_start) {
584             snapshot(mutations, mark);
585             mark = true;
586         }
587     }
588     snapshot(mutations, mark);
589 }
590 
snapshot(std::list<MutationResponse*>& items, bool mark)591 void ActiveStream::snapshot(std::list<MutationResponse*>& items, bool mark) {
592     if (items.empty()) {
593         return;
594     }
595 
596     uint32_t flags = MARKER_FLAG_MEMORY;
597     uint64_t snapStart = items.front()->getBySeqno();
598     uint64_t snapEnd = items.back()->getBySeqno();
599 
600     if (mark) {
601         flags |= MARKER_FLAG_CHK;
602     }
603 
604     if (state_ == STREAM_TAKEOVER_SEND) {
605         waitForSnapshot++;
606         flags |= MARKER_FLAG_ACK;
607     }
608 
609     if (!firstMarkerSent) {
610         snapStart = std::min(snap_start_seqno_, snapStart);
611         firstMarkerSent = true;
612     }
613 
614     readyQ.push(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd, flags));
615     while(!items.empty()) {
616         readyQ.push(items.front());
617         items.pop_front();
618     }
619 }
620 
setDead(end_stream_status_t status)621 uint32_t ActiveStream::setDead(end_stream_status_t status) {
622     LockHolder lh(streamMutex);
623     endStream(status);
624 
625     if (!itemsReady && status != END_STREAM_DISCONNECTED) {
626         itemsReady = true;
627         lh.unlock();
628         producer->notifyStreamReady(vb_, true);
629     }
630     return 0;
631 }
632 
notifySeqnoAvailable(uint64_t seqno)633 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
634     LockHolder lh(streamMutex);
635     if (state_ != STREAM_DEAD) {
636         if (!itemsReady) {
637             itemsReady = true;
638             lh.unlock();
639             producer->notifyStreamReady(vb_, true);
640         }
641     }
642 }
643 
endStream(end_stream_status_t reason)644 void ActiveStream::endStream(end_stream_status_t reason) {
645     if (state_ != STREAM_DEAD) {
646         if (reason != END_STREAM_DISCONNECTED) {
647             readyQ.push(new StreamEndResponse(opaque_, reason, vb_));
648         }
649         transitionState(STREAM_DEAD);
650         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
651             " from disk, %llu items sent from memory, %llu was last seqno sent",
652             producer->logHeader(), vb_, itemsFromBackfill, itemsFromMemory,
653             lastSentSeqno);
654     }
655 }
656 
scheduleBackfill()657 void ActiveStream::scheduleBackfill() {
658     if (!isBackfillTaskRunning) {
659         RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
660         if (!vbucket) {
661             return;
662         }
663 
664         CursorRegResult result =
665             vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
666                                                                 lastReadSeqno);
667         curChkSeqno = result.first;
668         bool isFirstItem = result.second;
669 
670         cb_assert(lastReadSeqno <= curChkSeqno);
671         uint64_t backfillStart = lastReadSeqno + 1;
672 
673         /* We need to find the minimum seqno that needs to be backfilled in
674          * order to make sure that we don't miss anything when transitioning
675          * to a memory snapshot. The backfill task will always make sure that
676          * the backfill end seqno is contained in the backfill.
677          */
678         uint64_t backfillEnd = 0;
679         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
680             backfillEnd = end_seqno_;
681         } else { // disk backfill + in-memory streaming
682             if (backfillStart < curChkSeqno) {
683                 if (curChkSeqno > end_seqno_) {
684                     backfillEnd = end_seqno_;
685                 } else {
686                     backfillEnd = curChkSeqno - 1;
687                 }
688             }
689         }
690 
691         bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
692 
693         if (backfillStart <= backfillEnd && tryBackfill) {
694             ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
695                                           Priority::TapBgFetcherPriority, 0, false);
696             ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
697             isBackfillTaskRunning = true;
698         } else {
699             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
700                 endStream(END_STREAM_OK);
701             } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
702                 transitionState(STREAM_TAKEOVER_SEND);
703             } else {
704                 transitionState(STREAM_IN_MEMORY);
705             }
706             itemsReady = true;
707         }
708     }
709 }
710 
transitionState(stream_state_t newState)711 void ActiveStream::transitionState(stream_state_t newState) {
712     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
713         producer->logHeader(), vb_, stateName(state_), stateName(newState));
714 
715     if (state_ == newState) {
716         return;
717     }
718 
719     switch (state_) {
720         case STREAM_PENDING:
721             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
722             break;
723         case STREAM_BACKFILLING:
724             cb_assert(newState == STREAM_IN_MEMORY ||
725                    newState == STREAM_TAKEOVER_SEND ||
726                    newState == STREAM_DEAD);
727             break;
728         case STREAM_IN_MEMORY:
729             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
730             break;
731         case STREAM_TAKEOVER_SEND:
732             cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
733             break;
734         case STREAM_TAKEOVER_WAIT:
735             cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
736             break;
737         default:
738             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
739                 "to %s", producer->logHeader(), vb_, stateName(state_),
740                 stateName(newState));
741             abort();
742     }
743 
744     state_ = newState;
745 
746     if (newState == STREAM_BACKFILLING) {
747         scheduleBackfill();
748     } else if (newState == STREAM_TAKEOVER_SEND) {
749         nextCheckpointItem();
750     } else if (newState == STREAM_DEAD) {
751         RCPtr<VBucket> vb = engine->getVBucket(vb_);
752         if (vb) {
753             vb->checkpointManager.removeTAPCursor(name_);
754         }
755     }
756 }
757 
getItemsRemaining()758 size_t ActiveStream::getItemsRemaining() {
759     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
760 
761     if (!vbucket || state_ == STREAM_DEAD) {
762         return 0;
763     }
764 
765     uint64_t high_seqno = vbucket->getHighSeqno();
766 
767     if (end_seqno_ < high_seqno) {
768         if (end_seqno_ > lastSentSeqno) {
769             return (end_seqno_ - lastSentSeqno);
770         }
771     } else {
772         if (high_seqno > lastSentSeqno) {
773             return (high_seqno - lastSentSeqno);
774         }
775     }
776 
777     return 0;
778 }
779 
NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)780 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
781                                const std::string &name, uint32_t flags,
782                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
783                                uint64_t en_seqno, uint64_t vb_uuid,
784                                uint64_t snap_start_seqno,
785                                uint64_t snap_end_seqno)
786     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
787              snap_start_seqno, snap_end_seqno),
788       producer(p) {
789     LockHolder lh(streamMutex);
790     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
791     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
792         readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
793         transitionState(STREAM_DEAD);
794         itemsReady = true;
795     }
796 
797     type_ = STREAM_NOTIFIER;
798 
799     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
800         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
801         en_seqno);
802 }
803 
setDead(end_stream_status_t status)804 uint32_t NotifierStream::setDead(end_stream_status_t status) {
805     LockHolder lh(streamMutex);
806     if (state_ != STREAM_DEAD) {
807         transitionState(STREAM_DEAD);
808         if (status != END_STREAM_DISCONNECTED) {
809             readyQ.push(new StreamEndResponse(opaque_, status, vb_));
810             if (!itemsReady) {
811                 itemsReady = true;
812                 lh.unlock();
813                 producer->notifyStreamReady(vb_, true);
814             }
815         }
816     }
817     return 0;
818 }
819 
notifySeqnoAvailable(uint64_t seqno)820 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
821     LockHolder lh(streamMutex);
822     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
823         readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
824         transitionState(STREAM_DEAD);
825         if (!itemsReady) {
826             itemsReady = true;
827             lh.unlock();
828             producer->notifyStreamReady(vb_, true);
829         }
830     }
831 }
832 
next()833 DcpResponse* NotifierStream::next() {
834     LockHolder lh(streamMutex);
835 
836     if (readyQ.empty()) {
837         itemsReady = false;
838         return NULL;
839     }
840 
841     DcpResponse* response = readyQ.front();
842     readyQ.pop();
843 
844     return response;
845 }
846 
transitionState(stream_state_t newState)847 void NotifierStream::transitionState(stream_state_t newState) {
848     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
849         producer->logHeader(), vb_, stateName(state_), stateName(newState));
850 
851     if (state_ == newState) {
852         return;
853     }
854 
855     switch (state_) {
856         case STREAM_PENDING:
857             cb_assert(newState == STREAM_DEAD);
858             break;
859         default:
860             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
861                 "to %s", producer->logHeader(), vb_, stateName(state_),
862                 stateName(newState));
863             abort();
864     }
865 
866     state_ = newState;
867 }
868 
PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)869 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
870                              const std::string &name, uint32_t flags,
871                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
872                              uint64_t en_seqno, uint64_t vb_uuid,
873                              uint64_t snap_start_seqno, uint64_t snap_end_seqno)
874     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
875              snap_start_seqno, snap_end_seqno),
876       engine(e), consumer(c), last_seqno(st_seqno), cur_snapshot_start(0),
877       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
878       saveSnapshot(false) {
879     LockHolder lh(streamMutex);
880     readyQ.push(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
881                                   vb_uuid, snap_start_seqno, snap_end_seqno));
882     itemsReady = true;
883     type_ = STREAM_PASSIVE;
884 
885     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
886     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
887         "start seqno %llu, end seqno %llu, vbucket uuid %llu, snap start seqno "
888         "%llu, and snap end seqno %llu", consumer->logHeader(), vb, type,
889         st_seqno, en_seqno, vb_uuid, snap_start_seqno, snap_end_seqno);
890 }
891 
/**
 * Destroys the stream.
 *
 * Runs clear_UNLOCKED() while holding streamMutex, then asserts that the
 * stream has already reached STREAM_DEAD and that no buffered bytes are
 * still accounted for.
 */
PassiveStream::~PassiveStream() {
    LockHolder lh(streamMutex);
    clear_UNLOCKED();
    // The stream must have been marked dead before it is destroyed.
    cb_assert(state_ == STREAM_DEAD);
    // The message buffer's byte count must already be zero.
    cb_assert(buffer.bytes == 0);
}
898 
setDead(end_stream_status_t status)899 uint32_t PassiveStream::setDead(end_stream_status_t status) {
900     LockHolder lh(streamMutex);
901     transitionState(STREAM_DEAD);
902     uint32_t unackedBytes = buffer.bytes;
903     clearBuffer();
904     return unackedBytes;
905 }
906 
acceptStream(uint16_t status, uint32_t add_opaque)907 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
908     LockHolder lh(streamMutex);
909     if (state_ == STREAM_PENDING) {
910         if (status == ENGINE_SUCCESS) {
911             transitionState(STREAM_READING);
912         } else {
913             transitionState(STREAM_DEAD);
914         }
915         readyQ.push(new AddStreamResponse(add_opaque, opaque_, status));
916         if (!itemsReady) {
917             itemsReady = true;
918             lh.unlock();
919             consumer->notifyStreamReady(vb_);
920         }
921     }
922 }
923 
reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque, uint64_t start_seqno)924 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
925                                     uint32_t new_opaque,
926                                     uint64_t start_seqno) {
927     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
928     vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
929 
930     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
931         "with opaque %ld, start seq no %llu, end seq no %llu, snap start seqno "
932         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
933         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
934 
935     LockHolder lh(streamMutex);
936     last_seqno = start_seqno;
937     readyQ.push(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
938                                   end_seqno_, vb_uuid_, snap_start_seqno_,
939                                   snap_end_seqno_));
940     if (!itemsReady) {
941         itemsReady = true;
942         lh.unlock();
943         consumer->notifyStreamReady(vb_);
944     }
945 }
946 
messageReceived(DcpResponse* resp)947 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
948     LockHolder lh(buffer.bufMutex);
949     cb_assert(resp);
950 
951     if (state_ == STREAM_DEAD) {
952         delete resp;
953         return ENGINE_KEY_ENOENT;
954     }
955 
956     if (resp->getEvent() == DCP_DELETION || resp->getEvent() == DCP_MUTATION ||
957         resp->getEvent() == DCP_EXPIRATION) {
958         MutationResponse* m = static_cast<MutationResponse*>(resp);
959         uint64_t bySeqno = m->getBySeqno();
960         if (bySeqno <= last_seqno) {
961             LOG(EXTENSION_LOG_INFO, "%s Dropping dcp mutation for vbucket %d "
962                 "with opaque %ld because the byseqno given (%llu) must be "
963                 "larger than %llu", consumer->logHeader(), vb_, opaque_,
964                 bySeqno, last_seqno);
965             delete m;
966             return ENGINE_ERANGE;
967         }
968         last_seqno = bySeqno;
969     }
970 
971     buffer.messages.push(resp);
972     buffer.items++;
973     buffer.bytes += resp->getMessageSize();
974 
975     return ENGINE_SUCCESS;
976 }
977 
processBufferedMessages(uint32_t& processed_bytes)978 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
979     LockHolder lh(buffer.bufMutex);
980     uint32_t count = 0;
981     uint32_t message_bytes = 0;
982     uint32_t total_bytes_processed = 0;
983     bool failed = false;
984 
985     if (buffer.messages.empty()) {
986         return all_processed;
987     }
988 
989     while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
990         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
991         DcpResponse *response = buffer.messages.front();
992         message_bytes = response->getMessageSize();
993 
994         switch (response->getEvent()) {
995             case DCP_MUTATION:
996                 ret = processMutation(static_cast<MutationResponse*>(response));
997                 break;
998             case DCP_DELETION:
999             case DCP_EXPIRATION:
1000                 ret = processDeletion(static_cast<MutationResponse*>(response));
1001                 break;
1002             case DCP_SNAPSHOT_MARKER:
1003                 processMarker(static_cast<SnapshotMarker*>(response));
1004                 break;
1005             case DCP_SET_VBUCKET:
1006                 processSetVBucketState(static_cast<SetVBucketState*>(response));
1007                 break;
1008             case DCP_STREAM_END:
1009                 transitionState(STREAM_DEAD);
1010                 delete response;
1011                 break;
1012             default:
1013                 abort();
1014         }
1015 
1016         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1017             failed = true;
1018             break;
1019         }
1020 
1021         buffer.messages.pop();
1022         buffer.items--;
1023         buffer.bytes -= message_bytes;
1024         count++;
1025         total_bytes_processed += message_bytes;
1026     }
1027 
1028     processed_bytes = total_bytes_processed;
1029 
1030     if (failed) {
1031         return cannot_process;
1032     }
1033 
1034     return all_processed;
1035 }
1036 
processMutation(MutationResponse* mutation)1037 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1038     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1039     if (!vb) {
1040         return ENGINE_NOT_MY_VBUCKET;
1041     }
1042 
1043     ENGINE_ERROR_CODE ret;
1044     if (saveSnapshot) {
1045         LockHolder lh = vb->getSnapshotLock();
1046         ret = commitMutation(mutation, vb->isBackfillPhase());
1047         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1048         saveSnapshot = false;
1049         lh.unlock();
1050     } else {
1051         ret = commitMutation(mutation, vb->isBackfillPhase());
1052     }
1053 
1054     // We should probably handle these error codes in a better way, but since
1055     // the producer side doesn't do anything with them anyways let's just log
1056     // them for now until we come up with a better solution.
1057     if (ret != ENGINE_SUCCESS) {
1058         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1059             "process  mutation", consumer->logHeader(), ret);
1060     }
1061 
1062     handleSnapshotEnd(vb, mutation->getBySeqno());
1063 
1064     if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1065         delete mutation;
1066     }
1067 
1068     return ret;
1069 }
1070 
commitMutation(MutationResponse* mutation, bool backfillPhase)1071 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1072                                                 bool backfillPhase) {
1073     if (backfillPhase) {
1074         return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1075                                                         INITIAL_NRU_VALUE,
1076                                                         false);
1077     } else {
1078         return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1079                                                  consumer->getCookie(), true,
1080                                                  true, INITIAL_NRU_VALUE, false);
1081     }
1082 }
1083 
processDeletion(MutationResponse* deletion)1084 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1085     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1086     if (!vb) {
1087         return ENGINE_NOT_MY_VBUCKET;
1088     }
1089 
1090     ENGINE_ERROR_CODE ret;
1091     if (saveSnapshot) {
1092         LockHolder lh = vb->getSnapshotLock();
1093         ret = commitDeletion(deletion, vb->isBackfillPhase());
1094         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1095         saveSnapshot = false;
1096         lh.unlock();
1097     } else {
1098         ret = commitDeletion(deletion, vb->isBackfillPhase());
1099     }
1100 
1101     if (ret == ENGINE_KEY_ENOENT) {
1102         ret = ENGINE_SUCCESS;
1103     }
1104 
1105     // We should probably handle these error codes in a better way, but since
1106     // the producer side doesn't do anything with them anyways let's just log
1107     // them for now until we come up with a better solution.
1108     if (ret != ENGINE_SUCCESS) {
1109         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1110             "process  deletion", consumer->logHeader(), ret);
1111     }
1112 
1113     handleSnapshotEnd(vb, deletion->getBySeqno());
1114 
1115     if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1116         delete deletion;
1117     }
1118 
1119     return ret;
1120 }
1121 
commitDeletion(MutationResponse* deletion, bool backfillPhase)1122 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1123                                                 bool backfillPhase) {
1124     uint64_t delCas = 0;
1125     ItemMetaData meta = deletion->getItem()->getMetaData();
1126     return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1127                                                 &delCas, deletion->getVBucket(),
1128                                                 consumer->getCookie(), true,
1129                                                 &meta, backfillPhase, false,
1130                                                 deletion->getBySeqno());
1131 }
1132 
processMarker(SnapshotMarker* marker)1133 void PassiveStream::processMarker(SnapshotMarker* marker) {
1134     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1135 
1136     cur_snapshot_start = marker->getStartSeqno();
1137     cur_snapshot_end = marker->getEndSeqno();
1138     cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
1139     saveSnapshot = true;
1140 
1141     if (vb) {
1142         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1143             vb->setBackfillPhase(true);
1144             vb->checkpointManager.checkAndAddNewCheckpoint(0, vb);
1145         } else {
1146             if (marker->getFlags() & MARKER_FLAG_CHK ||
1147                 vb->checkpointManager.getOpenCheckpointId() == 0) {
1148                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1149                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1150             }
1151             vb->setBackfillPhase(false);
1152         }
1153 
1154         if (marker->getFlags() & MARKER_FLAG_ACK) {
1155             cur_snapshot_ack = true;
1156         }
1157     }
1158     delete marker;
1159 }
1160 
processSetVBucketState(SetVBucketState* state)1161 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1162     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1163     delete state;
1164 
1165     LockHolder lh (streamMutex);
1166     readyQ.push(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1167     if (!itemsReady) {
1168         itemsReady = true;
1169         lh.unlock();
1170         consumer->notifyStreamReady(vb_);
1171     }
1172 }
1173 
handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno)1174 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1175     if (byseqno == cur_snapshot_end) {
1176         if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1177             vb->setBackfillPhase(false);
1178             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1179             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1180         }
1181 
1182         if (cur_snapshot_ack) {
1183             LockHolder lh(streamMutex);
1184             readyQ.push(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1185             if (!itemsReady) {
1186                 itemsReady = true;
1187                 lh.unlock();
1188                 consumer->notifyStreamReady(vb_);
1189             }
1190             cur_snapshot_ack = false;
1191         }
1192         cur_snapshot_type = none;
1193         vb->setCurrentSnapshot(byseqno, byseqno);
1194     }
1195 }
1196 
addStats(ADD_STAT add_stat, const void *c)1197 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1198     Stream::addStats(add_stat, c);
1199 
1200     const int bsize = 128;
1201     char buf[bsize];
1202     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1203     add_casted_stat(buf, buffer.items, add_stat, c);
1204     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1205     add_casted_stat(buf, buffer.bytes, add_stat, c);
1206     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1207     add_casted_stat(buf, itemsReady ? "true" : "false", add_stat, c);
1208     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1209     add_casted_stat(buf, last_seqno, add_stat, c);
1210 
1211     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1212     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1213 
1214     if (cur_snapshot_type != none) {
1215         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1216         add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1217         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1218         add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1219     }
1220 }
1221 
next()1222 DcpResponse* PassiveStream::next() {
1223     LockHolder lh(streamMutex);
1224 
1225     if (readyQ.empty()) {
1226         itemsReady = false;
1227         return NULL;
1228     }
1229 
1230     DcpResponse* response = readyQ.front();
1231     readyQ.pop();
1232     return response;
1233 }
1234 
clearBuffer()1235 void PassiveStream::clearBuffer() {
1236     LockHolder lh(buffer.bufMutex);
1237 
1238     while (!buffer.messages.empty()) {
1239         DcpResponse* resp = buffer.messages.front();
1240         buffer.messages.pop();
1241         delete resp;
1242     }
1243 
1244     buffer.bytes = 0;
1245     buffer.items = 0;
1246 }
1247 
transitionState(stream_state_t newState)1248 void PassiveStream::transitionState(stream_state_t newState) {
1249     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1250         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1251 
1252     if (state_ == newState) {
1253         return;
1254     }
1255 
1256     switch (state_) {
1257         case STREAM_PENDING:
1258             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1259             break;
1260         case STREAM_READING:
1261             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1262             break;
1263         default:
1264             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1265                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1266                 stateName(newState));
1267             abort();
1268     }
1269 
1270     state_ = newState;
1271 }
1272