1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
28 
/* Delay passed to snooze() when a DCP backfill task has to retry later
 * (presumably seconds -- TODO confirm against GlobalTask::snooze). */
#define DCP_BACKFILL_SLEEP_TIME 2
30 
snapshotTypeToString(snapshot_type_t type)31 static const char* snapshotTypeToString(snapshot_type_t type) {
32     static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33     cb_assert(type >= none && type <= memory);
34     return snapshotTypes[type];
35 }
36 
// Largest representable sequence number; takeover streams use it as their
// end seqno (see the ActiveStream constructor).
const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
// Batch size constant for PassiveStream (its use is not visible in this part
// of the file).
const size_t PassiveStream::batchSize = 10;
39 
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
41 public:
SnapshotMarkerCallback(stream_t s)42     SnapshotMarkerCallback(stream_t s)
43         : stream(s) {
44         cb_assert(s->getType() == STREAM_ACTIVE);
45     }
46 
callback(SeqnoRange &range)47     void callback(SeqnoRange &range) {
48         uint64_t st = range.getStartSeqno();
49         uint64_t en = range.getEndSeqno();
50         static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
51     }
52 
53 private:
54     stream_t stream;
55 };
56 
57 class CacheCallback : public Callback<CacheLookup> {
58 public:
CacheCallback(EventuallyPersistentEngine* e, stream_t &s)59     CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60         : engine_(e), stream_(s) {
61         Stream *str = stream_.get();
62         if (str) {
63             cb_assert(str->getType() == STREAM_ACTIVE);
64         }
65     }
66 
67     void callback(CacheLookup &lookup);
68 
69 private:
70     EventuallyPersistentEngine* engine_;
71     stream_t stream_;
72 };
73 
callback(CacheLookup &lookup)74 void CacheCallback::callback(CacheLookup &lookup) {
75     RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
76     if (!vb) {
77         setStatus(ENGINE_SUCCESS);
78         return;
79     }
80 
81     int bucket_num(0);
82     LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83     StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85         Item* it = v->toItem(false, lookup.getVBucketId());
86         lh.unlock();
87         static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88         setStatus(ENGINE_KEY_EEXISTS);
89     } else {
90         setStatus(ENGINE_SUCCESS);
91     }
92 }
93 
94 class DiskCallback : public Callback<GetValue> {
95 public:
DiskCallback(stream_t &s)96     DiskCallback(stream_t &s)
97         : stream_(s) {
98         Stream *str = stream_.get();
99         if (str) {
100             cb_assert(str->getType() == STREAM_ACTIVE);
101         }
102     }
103 
callback(GetValue &val)104     void callback(GetValue &val) {
105         cb_assert(val.getValue());
106         ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107         active_stream->backfillReceived(val.getValue());
108     }
109 
110 private:
111     stream_t stream_;
112 };
113 
114 class DCPBackfill : public GlobalTask {
115 public:
DCPBackfill(EventuallyPersistentEngine* e, stream_t s, uint64_t start_seqno, uint64_t end_seqno, const Priority &p, double sleeptime = 0, bool shutdown = false)116     DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117                 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118                 double sleeptime = 0, bool shutdown = false)
119         : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120           startSeqno(start_seqno), endSeqno(end_seqno) {
121         cb_assert(stream->getType() == STREAM_ACTIVE);
122     }
123 
124     bool run();
125 
126     std::string getDescription();
127 
128 private:
129     EventuallyPersistentEngine *engine;
130     stream_t                    stream;
131     uint64_t                    startSeqno;
132     uint64_t                    endSeqno;
133 };
134 
run()135 bool DCPBackfill::run() {
136     uint16_t vbid = stream->getVBucket();
137 
138     if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139         LOG(EXTENSION_LOG_INFO, "VBucket %d dcp backfill task temporarily "
140                 "suspended  because the current memory usage is too high",
141                 vbid);
142         snooze(DCP_BACKFILL_SLEEP_TIME);
143         return true;
144     }
145 
146     uint64_t lastPersistedSeqno =
147         engine->getEpStore()->getLastPersistedSeqno(vbid);
148     uint64_t diskSeqno =
149         engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
150 
151     if (lastPersistedSeqno < endSeqno) {
152         LOG(EXTENSION_LOG_WARNING, "Rescheduling backfill for vbucket %d "
153             "because backfill up to seqno %llu is needed but only up to "
154             "%llu is persisted (disk %llu)", vbid, endSeqno,
155             lastPersistedSeqno, diskSeqno);
156         snooze(DCP_BACKFILL_SLEEP_TIME);
157         return true;
158     }
159 
160     KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
161     size_t numItems = kvstore->getNumItems(vbid, startSeqno,
162                                            std::numeric_limits<uint64_t>::max());
163     static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
164 
165     shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
166     shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
167     shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
168     kvstore->dump(vbid, startSeqno, cb, cl, sr);
169 
170     static_cast<ActiveStream*>(stream.get())->completeBackfill();
171 
172     LOG(EXTENSION_LOG_WARNING, "Backfill task (%llu to %llu) finished for vb %d"
173         " disk seqno %llu memory seqno %llu", startSeqno, endSeqno,
174         stream->getVBucket(), diskSeqno, lastPersistedSeqno);
175 
176     return false;
177 }
178 
getDescription()179 std::string DCPBackfill::getDescription() {
180     std::stringstream ss;
181     ss << "DCP backfill for vbucket " << stream->getVBucket();
182     return ss.str();
183 }
184 
Stream(const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)185 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
186                uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
187                uint64_t vb_uuid, uint64_t snap_start_seqno,
188                uint64_t snap_end_seqno)
189     : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
190       start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
191       snap_start_seqno_(snap_start_seqno),
192       snap_end_seqno_(snap_end_seqno),
193       state_(STREAM_PENDING), itemsReady(false) {
194 }
195 
clear_UNLOCKED()196 void Stream::clear_UNLOCKED() {
197     while (!readyQ.empty()) {
198         DcpResponse* resp = readyQ.front();
199         delete resp;
200         readyQ.pop();
201     }
202 }
203 
stateName(stream_state_t st) const204 const char * Stream::stateName(stream_state_t st) const {
205     static const char * const stateNames[] = {
206         "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
207         "reading", "dead"
208     };
209     cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
210     return stateNames[st];
211 }
212 
addStats(ADD_STAT add_stat, const void *c)213 void Stream::addStats(ADD_STAT add_stat, const void *c) {
214     const int bsize = 128;
215     char buffer[bsize];
216     snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
217     add_casted_stat(buffer, flags_, add_stat, c);
218     snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
219     add_casted_stat(buffer, opaque_, add_stat, c);
220     snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
221     add_casted_stat(buffer, start_seqno_, add_stat, c);
222     snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
223     add_casted_stat(buffer, end_seqno_, add_stat, c);
224     snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
225     add_casted_stat(buffer, vb_uuid_, add_stat, c);
226     snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
227     add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
228     snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
229     add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
230     snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
231     add_casted_stat(buffer, stateName(state_), add_stat, c);
232 }
233 
ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &n, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)234 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
235                            const std::string &n, uint32_t flags,
236                            uint32_t opaque, uint16_t vb, uint64_t st_seqno,
237                            uint64_t en_seqno, uint64_t vb_uuid,
238                            uint64_t snap_start_seqno, uint64_t snap_end_seqno)
239     :  Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
240               snap_start_seqno, snap_end_seqno),
241        lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
242        takeoverState(vbucket_state_pending), backfillRemaining(0),
243        itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
244        waitForSnapshot(0), engine(e), producer(p),
245        isBackfillTaskRunning(false) {
246 
247     const char* type = "";
248     if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
249         type = "takeover ";
250         end_seqno_ = dcpMaxSeqno;
251     }
252 
253     if (start_seqno_ >= end_seqno_) {
254         endStream(END_STREAM_OK);
255         itemsReady = true;
256     }
257 
258     type_ = STREAM_ACTIVE;
259 
260     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
261         "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
262         en_seqno);
263 }
264 
next()265 DcpResponse* ActiveStream::next() {
266     LockHolder lh(streamMutex);
267 
268     stream_state_t initState = state_;
269 
270     DcpResponse* response = NULL;
271     switch (state_) {
272         case STREAM_PENDING:
273             break;
274         case STREAM_BACKFILLING:
275             response = backfillPhase();
276             break;
277         case STREAM_IN_MEMORY:
278             response = inMemoryPhase();
279             break;
280         case STREAM_TAKEOVER_SEND:
281             response = takeoverSendPhase();
282             break;
283         case STREAM_TAKEOVER_WAIT:
284             response = takeoverWaitPhase();
285             break;
286         case STREAM_DEAD:
287             response = deadPhase();
288             break;
289         default:
290             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
291                 producer->logHeader(), vb_, stateName(state_));
292             abort();
293     }
294 
295     if (state_ != STREAM_DEAD && initState != state_ && !response) {
296         lh.unlock();
297         return next();
298     }
299 
300     itemsReady = response ? true : false;
301     return response;
302 }
303 
markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno)304 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
305     LockHolder lh(streamMutex);
306     if (state_ != STREAM_BACKFILLING) {
307         return;
308     }
309 
310     startSeqno = std::min(snap_start_seqno_, startSeqno);
311     firstMarkerSent = true;
312 
313     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
314         "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
315         endSeqno);
316     readyQ.push(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
317                                    MARKER_FLAG_DISK));
318     RCPtr<VBucket> vb = engine->getVBucket(vb_);
319     if (!vb) {
320         endStream(END_STREAM_STATE);
321     } else {
322         if (endSeqno > end_seqno_) {
323             endSeqno = end_seqno_;
324         }
325         // Only re-register the cursor if we still need to get memory snapshots
326         CursorRegResult result =
327             vb->checkpointManager.registerTAPCursorBySeqno(name_, endSeqno);
328         curChkSeqno = result.first;
329     }
330 
331     if (!itemsReady) {
332         itemsReady = true;
333         lh.unlock();
334         producer->notifyStreamReady(vb_, false);
335     }
336 }
337 
backfillReceived(Item* itm)338 void ActiveStream::backfillReceived(Item* itm) {
339     LockHolder lh(streamMutex);
340     if (state_ == STREAM_BACKFILLING) {
341         readyQ.push(new MutationResponse(itm, opaque_));
342         lastReadSeqno = itm->getBySeqno();
343 
344         if (!itemsReady) {
345             itemsReady = true;
346             lh.unlock();
347             producer->notifyStreamReady(vb_, false);
348         }
349     } else {
350         delete itm;
351     }
352 }
353 
completeBackfill()354 void ActiveStream::completeBackfill() {
355     LockHolder lh(streamMutex);
356 
357     if (state_ == STREAM_BACKFILLING) {
358         isBackfillTaskRunning = false;
359         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
360             " from disk, last seqno read: %ld", producer->logHeader(), vb_,
361             itemsFromBackfill, lastReadSeqno);
362 
363         if (!itemsReady) {
364             itemsReady = true;
365             lh.unlock();
366             producer->notifyStreamReady(vb_, false);
367         }
368     }
369 }
370 
snapshotMarkerAckReceived()371 void ActiveStream::snapshotMarkerAckReceived() {
372     LockHolder lh (streamMutex);
373     waitForSnapshot--;
374 
375     if (!itemsReady && waitForSnapshot == 0) {
376         itemsReady = true;
377         lh.unlock();
378         producer->notifyStreamReady(vb_, true);
379     }
380 }
381 
setVBucketStateAckRecieved()382 void ActiveStream::setVBucketStateAckRecieved() {
383     LockHolder lh(streamMutex);
384     if (state_ == STREAM_TAKEOVER_WAIT) {
385         if (takeoverState == vbucket_state_pending) {
386             RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
387             engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
388                                                   false, false);
389             takeoverState = vbucket_state_active;
390             transitionState(STREAM_TAKEOVER_SEND);
391             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
392                 "state to pending message", producer->logHeader(), vb_);
393         } else {
394             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
395                 "state to active message", producer->logHeader(), vb_);
396             endStream(END_STREAM_OK);
397         }
398 
399         if (!itemsReady) {
400             itemsReady = true;
401             lh.unlock();
402             producer->notifyStreamReady(vb_, true);
403         }
404     } else {
405         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
406             "op on stream '%s' state '%s'", producer->logHeader(), vb_,
407             name_.c_str(), stateName(state_));
408     }
409 }
410 
backfillPhase()411 DcpResponse* ActiveStream::backfillPhase() {
412     DcpResponse* resp = nextQueuedItem();
413 
414     if (resp && backfillRemaining > 0 &&
415         (resp->getEvent() == DCP_MUTATION ||
416          resp->getEvent() == DCP_DELETION ||
417          resp->getEvent() == DCP_EXPIRATION)) {
418         backfillRemaining--;
419     }
420 
421     if (!isBackfillTaskRunning && readyQ.empty()) {
422         backfillRemaining = 0;
423         if (lastReadSeqno >= end_seqno_) {
424             endStream(END_STREAM_OK);
425         } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
426             transitionState(STREAM_TAKEOVER_SEND);
427         } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
428             endStream(END_STREAM_OK);
429         } else {
430             transitionState(STREAM_IN_MEMORY);
431         }
432 
433         if (!resp) {
434             resp = nextQueuedItem();
435         }
436     }
437 
438     return resp;
439 }
440 
inMemoryPhase()441 DcpResponse* ActiveStream::inMemoryPhase() {
442     if (!readyQ.empty()) {
443         return nextQueuedItem();
444     }
445 
446     if (lastSentSeqno >= end_seqno_) {
447         endStream(END_STREAM_OK);
448     } else {
449         nextCheckpointItem();
450     }
451 
452     return nextQueuedItem();
453 }
454 
takeoverSendPhase()455 DcpResponse* ActiveStream::takeoverSendPhase() {
456     if (!readyQ.empty()) {
457         return nextQueuedItem();
458     } else {
459         nextCheckpointItem();
460         if (!readyQ.empty()) {
461             return nextQueuedItem();
462         }
463     }
464 
465     if (waitForSnapshot != 0) {
466         return NULL;
467     }
468 
469     DcpResponse* resp = new SetVBucketState(opaque_, vb_, takeoverState);
470     transitionState(STREAM_TAKEOVER_WAIT);
471     return resp;
472 }
473 
takeoverWaitPhase()474 DcpResponse* ActiveStream::takeoverWaitPhase() {
475     return nextQueuedItem();
476 }
477 
deadPhase()478 DcpResponse* ActiveStream::deadPhase() {
479     return nextQueuedItem();
480 }
481 
addStats(ADD_STAT add_stat, const void *c)482 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
483     Stream::addStats(add_stat, c);
484 
485     const int bsize = 128;
486     char buffer[bsize];
487     snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
488     add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
489     snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
490     add_casted_stat(buffer, itemsFromMemory, add_stat, c);
491     snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
492     add_casted_stat(buffer, lastSentSeqno, add_stat, c);
493     snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
494     add_casted_stat(buffer, itemsReady ? "true" : "false", add_stat, c);
495 }
496 
addTakeoverStats(ADD_STAT add_stat, const void *cookie)497 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
498     LockHolder lh(streamMutex);
499 
500     RCPtr<VBucket> vb = engine->getVBucket(vb_);
501     add_casted_stat("name", name_, add_stat, cookie);
502     if (!vb || state_ == STREAM_DEAD) {
503         add_casted_stat("status", "completed", add_stat, cookie);
504         add_casted_stat("estimate", 0, add_stat, cookie);
505         add_casted_stat("backfillRemaining", 0, add_stat, cookie);
506         add_casted_stat("estimate", 0, add_stat, cookie);
507         return;
508     }
509 
510     size_t total = backfillRemaining;
511     if (state_ == STREAM_BACKFILLING) {
512         add_casted_stat("status", "backfilling", add_stat, cookie);
513     } else {
514         add_casted_stat("status", "in-memory", add_stat, cookie);
515     }
516     add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
517 
518     item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
519     size_t vb_items = vb->getNumItems(iep);
520     size_t chk_items = vb_items > 0 ?
521                 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
522     size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
523                                                     getNumPersistedDeletes(vb_);
524 
525     if (end_seqno_ < curChkSeqno) {
526         chk_items = 0;
527     } else if ((end_seqno_ - curChkSeqno) < chk_items) {
528         chk_items = end_seqno_ - curChkSeqno + 1;
529     }
530     total += chk_items;
531 
532     add_casted_stat("estimate", total, add_stat, cookie);
533     add_casted_stat("chk_items", chk_items, add_stat, cookie);
534     add_casted_stat("vb_items", vb_items, add_stat, cookie);
535     add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
536 }
537 
nextQueuedItem()538 DcpResponse* ActiveStream::nextQueuedItem() {
539     if (!readyQ.empty()) {
540         DcpResponse* response = readyQ.front();
541         if (response->getEvent() == DCP_MUTATION ||
542             response->getEvent() == DCP_DELETION ||
543             response->getEvent() == DCP_EXPIRATION) {
544             lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
545 
546             if (state_ == STREAM_BACKFILLING) {
547                 itemsFromBackfill++;
548             } else {
549                 itemsFromMemory++;
550             }
551         }
552         readyQ.pop();
553         return response;
554     }
555     return NULL;
556 }
557 
nextCheckpointItem()558 void ActiveStream::nextCheckpointItem() {
559     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
560 
561     bool mark = false;
562     std::list<queued_item> items;
563     std::list<MutationResponse*> mutations;
564     vbucket->checkpointManager.getAllItemsForCursor(name_, items);
565     if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
566         engine->getEpStore()->wakeUpCheckpointRemover();
567     }
568 
569     if (items.empty()) {
570         return;
571     }
572 
573     if (items.front()->getOperation() == queue_op_checkpoint_start) {
574         mark = true;
575     }
576 
577     while (!items.empty()) {
578         queued_item qi = items.front();
579         items.pop_front();
580 
581         if (qi->getOperation() == queue_op_set ||
582             qi->getOperation() == queue_op_del) {
583             curChkSeqno = qi->getBySeqno();
584             lastReadSeqno = qi->getBySeqno();
585 
586             mutations.push_back(new MutationResponse(qi, opaque_));
587         } else if (qi->getOperation() == queue_op_checkpoint_start) {
588             snapshot(mutations, mark);
589             mark = true;
590         }
591     }
592 
593     if (mutations.empty()) {
594         // If we only got checkpoint start or ends check to see if there are
595         // any more snapshots before pausing the stream.
596         nextCheckpointItem();
597     } else {
598         snapshot(mutations, mark);
599     }
600 }
601 
snapshot(std::list<MutationResponse*>& items, bool mark)602 void ActiveStream::snapshot(std::list<MutationResponse*>& items, bool mark) {
603     if (items.empty()) {
604         return;
605     }
606 
607     uint32_t flags = MARKER_FLAG_MEMORY;
608     uint64_t snapStart = items.front()->getBySeqno();
609     uint64_t snapEnd = items.back()->getBySeqno();
610 
611     if (mark) {
612         flags |= MARKER_FLAG_CHK;
613     }
614 
615     if (state_ == STREAM_TAKEOVER_SEND) {
616         waitForSnapshot++;
617         flags |= MARKER_FLAG_ACK;
618     }
619 
620     if (!firstMarkerSent) {
621         snapStart = std::min(snap_start_seqno_, snapStart);
622         firstMarkerSent = true;
623     }
624 
625     readyQ.push(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd, flags));
626     while(!items.empty()) {
627         readyQ.push(items.front());
628         items.pop_front();
629     }
630 }
631 
setDead(end_stream_status_t status)632 uint32_t ActiveStream::setDead(end_stream_status_t status) {
633     LockHolder lh(streamMutex);
634     endStream(status);
635 
636     if (!itemsReady && status != END_STREAM_DISCONNECTED) {
637         itemsReady = true;
638         lh.unlock();
639         producer->notifyStreamReady(vb_, true);
640     }
641     return 0;
642 }
643 
notifySeqnoAvailable(uint64_t seqno)644 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
645     LockHolder lh(streamMutex);
646     if (state_ != STREAM_DEAD) {
647         if (!itemsReady) {
648             itemsReady = true;
649             lh.unlock();
650             producer->notifyStreamReady(vb_, true);
651         }
652     }
653 }
654 
endStream(end_stream_status_t reason)655 void ActiveStream::endStream(end_stream_status_t reason) {
656     if (state_ != STREAM_DEAD) {
657         if (reason != END_STREAM_DISCONNECTED) {
658             readyQ.push(new StreamEndResponse(opaque_, reason, vb_));
659         }
660         transitionState(STREAM_DEAD);
661         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
662             " from disk, %llu items sent from memory, %llu was last seqno sent",
663             producer->logHeader(), vb_, itemsFromBackfill, itemsFromMemory,
664             lastSentSeqno);
665     }
666 }
667 
scheduleBackfill()668 void ActiveStream::scheduleBackfill() {
669     if (!isBackfillTaskRunning) {
670         RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
671         if (!vbucket) {
672             return;
673         }
674 
675         CursorRegResult result =
676             vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
677                                                                 lastReadSeqno);
678         curChkSeqno = result.first;
679         bool isFirstItem = result.second;
680 
681         cb_assert(lastReadSeqno <= curChkSeqno);
682         uint64_t backfillStart = lastReadSeqno + 1;
683 
684         /* We need to find the minimum seqno that needs to be backfilled in
685          * order to make sure that we don't miss anything when transitioning
686          * to a memory snapshot. The backfill task will always make sure that
687          * the backfill end seqno is contained in the backfill.
688          */
689         uint64_t backfillEnd = 0;
690         if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
691             backfillEnd = end_seqno_;
692         } else { // disk backfill + in-memory streaming
693             if (backfillStart < curChkSeqno) {
694                 if (curChkSeqno > end_seqno_) {
695                     backfillEnd = end_seqno_;
696                 } else {
697                     backfillEnd = curChkSeqno - 1;
698                 }
699             }
700         }
701 
702         bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
703 
704         if (backfillStart <= backfillEnd && tryBackfill) {
705             ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
706                                           Priority::TapBgFetcherPriority, 0, false);
707             ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
708             isBackfillTaskRunning = true;
709         } else {
710             if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
711                 endStream(END_STREAM_OK);
712             } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
713                 transitionState(STREAM_TAKEOVER_SEND);
714             } else {
715                 transitionState(STREAM_IN_MEMORY);
716             }
717             itemsReady = true;
718         }
719     }
720 }
721 
transitionState(stream_state_t newState)722 void ActiveStream::transitionState(stream_state_t newState) {
723     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
724         producer->logHeader(), vb_, stateName(state_), stateName(newState));
725 
726     if (state_ == newState) {
727         return;
728     }
729 
730     switch (state_) {
731         case STREAM_PENDING:
732             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
733             break;
734         case STREAM_BACKFILLING:
735             cb_assert(newState == STREAM_IN_MEMORY ||
736                    newState == STREAM_TAKEOVER_SEND ||
737                    newState == STREAM_DEAD);
738             break;
739         case STREAM_IN_MEMORY:
740             cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
741             break;
742         case STREAM_TAKEOVER_SEND:
743             cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
744             break;
745         case STREAM_TAKEOVER_WAIT:
746             cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
747             break;
748         default:
749             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
750                 "to %s", producer->logHeader(), vb_, stateName(state_),
751                 stateName(newState));
752             abort();
753     }
754 
755     state_ = newState;
756 
757     if (newState == STREAM_BACKFILLING) {
758         scheduleBackfill();
759     } else if (newState == STREAM_TAKEOVER_SEND) {
760         nextCheckpointItem();
761     } else if (newState == STREAM_DEAD) {
762         RCPtr<VBucket> vb = engine->getVBucket(vb_);
763         if (vb) {
764             vb->checkpointManager.removeTAPCursor(name_);
765         }
766     }
767 }
768 
getItemsRemaining()769 size_t ActiveStream::getItemsRemaining() {
770     RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
771 
772     if (!vbucket || state_ == STREAM_DEAD) {
773         return 0;
774     }
775 
776     uint64_t high_seqno = vbucket->getHighSeqno();
777 
778     if (end_seqno_ < high_seqno) {
779         if (end_seqno_ > lastSentSeqno) {
780             return (end_seqno_ - lastSentSeqno);
781         }
782     } else {
783         if (high_seqno > lastSentSeqno) {
784             return (high_seqno - lastSentSeqno);
785         }
786     }
787 
788     return 0;
789 }
790 
NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)791 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
792                                const std::string &name, uint32_t flags,
793                                uint32_t opaque, uint16_t vb, uint64_t st_seqno,
794                                uint64_t en_seqno, uint64_t vb_uuid,
795                                uint64_t snap_start_seqno,
796                                uint64_t snap_end_seqno)
797     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
798              snap_start_seqno, snap_end_seqno),
799       producer(p) {
800     LockHolder lh(streamMutex);
801     RCPtr<VBucket> vbucket = e->getVBucket(vb_);
802     if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
803         readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
804         transitionState(STREAM_DEAD);
805         itemsReady = true;
806     }
807 
808     type_ = STREAM_NOTIFIER;
809 
810     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
811         "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
812         en_seqno);
813 }
814 
setDead(end_stream_status_t status)815 uint32_t NotifierStream::setDead(end_stream_status_t status) {
816     LockHolder lh(streamMutex);
817     if (state_ != STREAM_DEAD) {
818         transitionState(STREAM_DEAD);
819         if (status != END_STREAM_DISCONNECTED) {
820             readyQ.push(new StreamEndResponse(opaque_, status, vb_));
821             if (!itemsReady) {
822                 itemsReady = true;
823                 lh.unlock();
824                 producer->notifyStreamReady(vb_, true);
825             }
826         }
827     }
828     return 0;
829 }
830 
notifySeqnoAvailable(uint64_t seqno)831 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
832     LockHolder lh(streamMutex);
833     if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
834         readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
835         transitionState(STREAM_DEAD);
836         if (!itemsReady) {
837             itemsReady = true;
838             lh.unlock();
839             producer->notifyStreamReady(vb_, true);
840         }
841     }
842 }
843 
next()844 DcpResponse* NotifierStream::next() {
845     LockHolder lh(streamMutex);
846 
847     if (readyQ.empty()) {
848         itemsReady = false;
849         return NULL;
850     }
851 
852     DcpResponse* response = readyQ.front();
853     readyQ.pop();
854 
855     return response;
856 }
857 
transitionState(stream_state_t newState)858 void NotifierStream::transitionState(stream_state_t newState) {
859     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
860         producer->logHeader(), vb_, stateName(state_), stateName(newState));
861 
862     if (state_ == newState) {
863         return;
864     }
865 
866     switch (state_) {
867         case STREAM_PENDING:
868             cb_assert(newState == STREAM_DEAD);
869             break;
870         default:
871             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
872                 "to %s", producer->logHeader(), vb_, stateName(state_),
873                 stateName(newState));
874             abort();
875     }
876 
877     state_ = newState;
878 }
879 
PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)880 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
881                              const std::string &name, uint32_t flags,
882                              uint32_t opaque, uint16_t vb, uint64_t st_seqno,
883                              uint64_t en_seqno, uint64_t vb_uuid,
884                              uint64_t snap_start_seqno, uint64_t snap_end_seqno)
885     : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
886              snap_start_seqno, snap_end_seqno),
887       engine(e), consumer(c), last_seqno(st_seqno), cur_snapshot_start(0),
888       cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
889       saveSnapshot(false) {
890     LockHolder lh(streamMutex);
891     readyQ.push(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
892                                   vb_uuid, snap_start_seqno, snap_end_seqno));
893     itemsReady = true;
894     type_ = STREAM_PASSIVE;
895 
896     const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
897     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
898         "start seqno %llu, end seqno %llu, vbucket uuid %llu, snap start seqno "
899         "%llu, and snap end seqno %llu", consumer->logHeader(), vb, type,
900         st_seqno, en_seqno, vb_uuid, snap_start_seqno, snap_end_seqno);
901 }
902 
/**
 * Tears the stream down: with streamMutex held, clear_UNLOCKED() is called
 * and the destructor then asserts that the stream has already reached
 * STREAM_DEAD and that no buffered bytes remain.
 */
PassiveStream::~PassiveStream() {
    LockHolder lh(streamMutex);
    clear_UNLOCKED();
    // Both conditions are expected to have been established before
    // destruction; a violation trips cb_assert.
    cb_assert(state_ == STREAM_DEAD);
    cb_assert(buffer.bytes == 0);
}
909 
setDead(end_stream_status_t status)910 uint32_t PassiveStream::setDead(end_stream_status_t status) {
911     LockHolder lh(streamMutex);
912     transitionState(STREAM_DEAD);
913     uint32_t unackedBytes = buffer.bytes;
914     clearBuffer();
915     return unackedBytes;
916 }
917 
acceptStream(uint16_t status, uint32_t add_opaque)918 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
919     LockHolder lh(streamMutex);
920     if (state_ == STREAM_PENDING) {
921         if (status == ENGINE_SUCCESS) {
922             transitionState(STREAM_READING);
923         } else {
924             transitionState(STREAM_DEAD);
925         }
926         readyQ.push(new AddStreamResponse(add_opaque, opaque_, status));
927         if (!itemsReady) {
928             itemsReady = true;
929             lh.unlock();
930             consumer->notifyStreamReady(vb_);
931         }
932     }
933 }
934 
reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque, uint64_t start_seqno)935 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
936                                     uint32_t new_opaque,
937                                     uint64_t start_seqno) {
938     vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
939     vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
940 
941     LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
942         "with opaque %ld, start seq no %llu, end seq no %llu, snap start seqno "
943         "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
944         start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
945 
946     LockHolder lh(streamMutex);
947     last_seqno = start_seqno;
948     readyQ.push(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
949                                   end_seqno_, vb_uuid_, snap_start_seqno_,
950                                   snap_end_seqno_));
951     if (!itemsReady) {
952         itemsReady = true;
953         lh.unlock();
954         consumer->notifyStreamReady(vb_);
955     }
956 }
957 
messageReceived(DcpResponse* resp)958 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
959     LockHolder lh(buffer.bufMutex);
960     cb_assert(resp);
961 
962     if (state_ == STREAM_DEAD) {
963         delete resp;
964         return ENGINE_KEY_ENOENT;
965     }
966 
967     if (resp->getEvent() == DCP_DELETION || resp->getEvent() == DCP_MUTATION ||
968         resp->getEvent() == DCP_EXPIRATION) {
969         MutationResponse* m = static_cast<MutationResponse*>(resp);
970         uint64_t bySeqno = m->getBySeqno();
971         if (bySeqno <= last_seqno) {
972             LOG(EXTENSION_LOG_INFO, "%s Dropping dcp mutation for vbucket %d "
973                 "with opaque %ld because the byseqno given (%llu) must be "
974                 "larger than %llu", consumer->logHeader(), vb_, opaque_,
975                 bySeqno, last_seqno);
976             delete m;
977             return ENGINE_ERANGE;
978         }
979         last_seqno = bySeqno;
980     }
981 
982     buffer.messages.push(resp);
983     buffer.items++;
984     buffer.bytes += resp->getMessageSize();
985 
986     return ENGINE_SUCCESS;
987 }
988 
processBufferedMessages(uint32_t& processed_bytes)989 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
990     LockHolder lh(buffer.bufMutex);
991     uint32_t count = 0;
992     uint32_t message_bytes = 0;
993     uint32_t total_bytes_processed = 0;
994     bool failed = false;
995 
996     if (buffer.messages.empty()) {
997         return all_processed;
998     }
999 
1000     while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
1001         ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1002         DcpResponse *response = buffer.messages.front();
1003         message_bytes = response->getMessageSize();
1004 
1005         switch (response->getEvent()) {
1006             case DCP_MUTATION:
1007                 ret = processMutation(static_cast<MutationResponse*>(response));
1008                 break;
1009             case DCP_DELETION:
1010             case DCP_EXPIRATION:
1011                 ret = processDeletion(static_cast<MutationResponse*>(response));
1012                 break;
1013             case DCP_SNAPSHOT_MARKER:
1014                 processMarker(static_cast<SnapshotMarker*>(response));
1015                 break;
1016             case DCP_SET_VBUCKET:
1017                 processSetVBucketState(static_cast<SetVBucketState*>(response));
1018                 break;
1019             case DCP_STREAM_END:
1020                 transitionState(STREAM_DEAD);
1021                 delete response;
1022                 break;
1023             default:
1024                 abort();
1025         }
1026 
1027         if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1028             failed = true;
1029             break;
1030         }
1031 
1032         buffer.messages.pop();
1033         buffer.items--;
1034         buffer.bytes -= message_bytes;
1035         count++;
1036         total_bytes_processed += message_bytes;
1037     }
1038 
1039     processed_bytes = total_bytes_processed;
1040 
1041     if (failed) {
1042         return cannot_process;
1043     }
1044 
1045     return all_processed;
1046 }
1047 
processMutation(MutationResponse* mutation)1048 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1049     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1050     if (!vb) {
1051         return ENGINE_NOT_MY_VBUCKET;
1052     }
1053 
1054     ENGINE_ERROR_CODE ret;
1055     if (saveSnapshot) {
1056         LockHolder lh = vb->getSnapshotLock();
1057         ret = commitMutation(mutation, vb->isBackfillPhase());
1058         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1059         saveSnapshot = false;
1060         lh.unlock();
1061     } else {
1062         ret = commitMutation(mutation, vb->isBackfillPhase());
1063     }
1064 
1065     // We should probably handle these error codes in a better way, but since
1066     // the producer side doesn't do anything with them anyways let's just log
1067     // them for now until we come up with a better solution.
1068     if (ret != ENGINE_SUCCESS) {
1069         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1070             "process  mutation", consumer->logHeader(), ret);
1071     }
1072 
1073     handleSnapshotEnd(vb, mutation->getBySeqno());
1074 
1075     if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1076         delete mutation;
1077     }
1078 
1079     return ret;
1080 }
1081 
commitMutation(MutationResponse* mutation, bool backfillPhase)1082 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1083                                                 bool backfillPhase) {
1084     if (backfillPhase) {
1085         return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1086                                                         INITIAL_NRU_VALUE,
1087                                                         false);
1088     } else {
1089         return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1090                                                  consumer->getCookie(), true,
1091                                                  true, INITIAL_NRU_VALUE, false,
1092                                                  true);
1093     }
1094 }
1095 
processDeletion(MutationResponse* deletion)1096 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1097     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1098     if (!vb) {
1099         return ENGINE_NOT_MY_VBUCKET;
1100     }
1101 
1102     ENGINE_ERROR_CODE ret;
1103     if (saveSnapshot) {
1104         LockHolder lh = vb->getSnapshotLock();
1105         ret = commitDeletion(deletion, vb->isBackfillPhase());
1106         vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1107         saveSnapshot = false;
1108         lh.unlock();
1109     } else {
1110         ret = commitDeletion(deletion, vb->isBackfillPhase());
1111     }
1112 
1113     if (ret == ENGINE_KEY_ENOENT) {
1114         ret = ENGINE_SUCCESS;
1115     }
1116 
1117     // We should probably handle these error codes in a better way, but since
1118     // the producer side doesn't do anything with them anyways let's just log
1119     // them for now until we come up with a better solution.
1120     if (ret != ENGINE_SUCCESS) {
1121         LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1122             "process  deletion", consumer->logHeader(), ret);
1123     }
1124 
1125     handleSnapshotEnd(vb, deletion->getBySeqno());
1126 
1127     if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1128         delete deletion;
1129     }
1130 
1131     return ret;
1132 }
1133 
commitDeletion(MutationResponse* deletion, bool backfillPhase)1134 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1135                                                 bool backfillPhase) {
1136     uint64_t delCas = 0;
1137     ItemMetaData meta = deletion->getItem()->getMetaData();
1138     return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1139                                                 &delCas, deletion->getVBucket(),
1140                                                 consumer->getCookie(), true,
1141                                                 &meta, backfillPhase, false,
1142                                                 deletion->getBySeqno(), true);
1143 }
1144 
processMarker(SnapshotMarker* marker)1145 void PassiveStream::processMarker(SnapshotMarker* marker) {
1146     RCPtr<VBucket> vb = engine->getVBucket(vb_);
1147 
1148     cur_snapshot_start = marker->getStartSeqno();
1149     cur_snapshot_end = marker->getEndSeqno();
1150     cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
1151     saveSnapshot = true;
1152 
1153     if (vb) {
1154         if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1155             vb->setBackfillPhase(true);
1156             vb->checkpointManager.checkAndAddNewCheckpoint(0, vb);
1157         } else {
1158             if (marker->getFlags() & MARKER_FLAG_CHK ||
1159                 vb->checkpointManager.getOpenCheckpointId() == 0) {
1160                 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1161                 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1162             }
1163             vb->setBackfillPhase(false);
1164         }
1165 
1166         if (marker->getFlags() & MARKER_FLAG_ACK) {
1167             cur_snapshot_ack = true;
1168         }
1169     }
1170     delete marker;
1171 }
1172 
processSetVBucketState(SetVBucketState* state)1173 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1174     engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1175     delete state;
1176 
1177     LockHolder lh (streamMutex);
1178     readyQ.push(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1179     if (!itemsReady) {
1180         itemsReady = true;
1181         lh.unlock();
1182         consumer->notifyStreamReady(vb_);
1183     }
1184 }
1185 
handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno)1186 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1187     if (byseqno == cur_snapshot_end) {
1188         if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1189             vb->setBackfillPhase(false);
1190             uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1191             vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1192         }
1193 
1194         if (cur_snapshot_ack) {
1195             LockHolder lh(streamMutex);
1196             readyQ.push(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1197             if (!itemsReady) {
1198                 itemsReady = true;
1199                 lh.unlock();
1200                 consumer->notifyStreamReady(vb_);
1201             }
1202             cur_snapshot_ack = false;
1203         }
1204         cur_snapshot_type = none;
1205         vb->setCurrentSnapshot(byseqno, byseqno);
1206     }
1207 }
1208 
addStats(ADD_STAT add_stat, const void *c)1209 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1210     Stream::addStats(add_stat, c);
1211 
1212     const int bsize = 128;
1213     char buf[bsize];
1214     snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1215     add_casted_stat(buf, buffer.items, add_stat, c);
1216     snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1217     add_casted_stat(buf, buffer.bytes, add_stat, c);
1218     snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1219     add_casted_stat(buf, itemsReady ? "true" : "false", add_stat, c);
1220     snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1221     add_casted_stat(buf, last_seqno, add_stat, c);
1222 
1223     snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1224     add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1225 
1226     if (cur_snapshot_type != none) {
1227         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1228         add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1229         snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1230         add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1231     }
1232 }
1233 
next()1234 DcpResponse* PassiveStream::next() {
1235     LockHolder lh(streamMutex);
1236 
1237     if (readyQ.empty()) {
1238         itemsReady = false;
1239         return NULL;
1240     }
1241 
1242     DcpResponse* response = readyQ.front();
1243     readyQ.pop();
1244     return response;
1245 }
1246 
clearBuffer()1247 void PassiveStream::clearBuffer() {
1248     LockHolder lh(buffer.bufMutex);
1249 
1250     while (!buffer.messages.empty()) {
1251         DcpResponse* resp = buffer.messages.front();
1252         buffer.messages.pop();
1253         delete resp;
1254     }
1255 
1256     buffer.bytes = 0;
1257     buffer.items = 0;
1258 }
1259 
transitionState(stream_state_t newState)1260 void PassiveStream::transitionState(stream_state_t newState) {
1261     LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1262         consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1263 
1264     if (state_ == newState) {
1265         return;
1266     }
1267 
1268     switch (state_) {
1269         case STREAM_PENDING:
1270             cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1271             break;
1272         case STREAM_READING:
1273             cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1274             break;
1275         default:
1276             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1277                 "to %s", consumer->logHeader(), vb_, stateName(state_),
1278                 stateName(newState));
1279             abort();
1280     }
1281 
1282     state_ = newState;
1283 }
1284