1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kvstore.h"
23 #include "statwriter.h"
24 #include "dcp-stream.h"
25 #include "dcp-consumer.h"
26 #include "dcp-producer.h"
27 #include "dcp-response.h"
28
// Value passed to snooze() when DCPBackfill::run() has to back off and retry
// (memory usage too high, or the required seqno is not yet persisted).
// NOTE(review): presumably expressed in seconds -- confirm against
// GlobalTask::snooze().
#define DCP_BACKFILL_SLEEP_TIME 2
30
snapshotTypeToString(snapshot_type_t type)31 static const char* snapshotTypeToString(snapshot_type_t type) {
32 static const char * const snapshotTypes[] = { "none", "disk", "memory" };
33 cb_assert(type >= none && type <= memory);
34 return snapshotTypes[type];
35 }
36
// Largest representable sequence number; ActiveStream's constructor uses it
// as the end seqno of takeover streams.
const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
// Loop bound used by PassiveStream::processBufferedMessages() when draining
// buffered messages.
const size_t PassiveStream::batchSize = 10;
39
40 class SnapshotMarkerCallback : public Callback<SeqnoRange> {
41 public:
SnapshotMarkerCallback(stream_t s)42 SnapshotMarkerCallback(stream_t s)
43 : stream(s) {
44 cb_assert(s->getType() == STREAM_ACTIVE);
45 }
46
callback(SeqnoRange &range)47 void callback(SeqnoRange &range) {
48 uint64_t st = range.getStartSeqno();
49 uint64_t en = range.getEndSeqno();
50 static_cast<ActiveStream*>(stream.get())->markDiskSnapshot(st, en);
51 }
52
53 private:
54 stream_t stream;
55 };
56
57 class CacheCallback : public Callback<CacheLookup> {
58 public:
CacheCallback(EventuallyPersistentEngine* e, stream_t &s)59 CacheCallback(EventuallyPersistentEngine* e, stream_t &s)
60 : engine_(e), stream_(s) {
61 Stream *str = stream_.get();
62 if (str) {
63 cb_assert(str->getType() == STREAM_ACTIVE);
64 }
65 }
66
67 void callback(CacheLookup &lookup);
68
69 private:
70 EventuallyPersistentEngine* engine_;
71 stream_t stream_;
72 };
73
callback(CacheLookup &lookup)74 void CacheCallback::callback(CacheLookup &lookup) {
75 RCPtr<VBucket> vb = engine_->getEpStore()->getVBucket(lookup.getVBucketId());
76 if (!vb) {
77 setStatus(ENGINE_SUCCESS);
78 return;
79 }
80
81 int bucket_num(0);
82 LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
83 StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num, false, false);
84 if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
85 Item* it = v->toItem(false, lookup.getVBucketId());
86 lh.unlock();
87 static_cast<ActiveStream*>(stream_.get())->backfillReceived(it);
88 setStatus(ENGINE_KEY_EEXISTS);
89 } else {
90 setStatus(ENGINE_SUCCESS);
91 }
92 }
93
94 class DiskCallback : public Callback<GetValue> {
95 public:
DiskCallback(stream_t &s)96 DiskCallback(stream_t &s)
97 : stream_(s) {
98 Stream *str = stream_.get();
99 if (str) {
100 cb_assert(str->getType() == STREAM_ACTIVE);
101 }
102 }
103
callback(GetValue &val)104 void callback(GetValue &val) {
105 cb_assert(val.getValue());
106 ActiveStream* active_stream = static_cast<ActiveStream*>(stream_.get());
107 active_stream->backfillReceived(val.getValue());
108 }
109
110 private:
111 stream_t stream_;
112 };
113
114 class DCPBackfill : public GlobalTask {
115 public:
DCPBackfill(EventuallyPersistentEngine* e, stream_t s, uint64_t start_seqno, uint64_t end_seqno, const Priority &p, double sleeptime = 0, bool shutdown = false)116 DCPBackfill(EventuallyPersistentEngine* e, stream_t s,
117 uint64_t start_seqno, uint64_t end_seqno, const Priority &p,
118 double sleeptime = 0, bool shutdown = false)
119 : GlobalTask(e, p, sleeptime, shutdown), engine(e), stream(s),
120 startSeqno(start_seqno), endSeqno(end_seqno) {
121 cb_assert(stream->getType() == STREAM_ACTIVE);
122 }
123
124 bool run();
125
126 std::string getDescription();
127
128 private:
129 EventuallyPersistentEngine *engine;
130 stream_t stream;
131 uint64_t startSeqno;
132 uint64_t endSeqno;
133 };
134
run()135 bool DCPBackfill::run() {
136 uint16_t vbid = stream->getVBucket();
137
138 if (engine->getEpStore()->isMemoryUsageTooHigh()) {
139 LOG(EXTENSION_LOG_INFO, "VBucket %d dcp backfill task temporarily "
140 "suspended because the current memory usage is too high",
141 vbid);
142 snooze(DCP_BACKFILL_SLEEP_TIME);
143 return true;
144 }
145
146 uint64_t lastPersistedSeqno =
147 engine->getEpStore()->getLastPersistedSeqno(vbid);
148 uint64_t diskSeqno =
149 engine->getEpStore()->getRWUnderlying(vbid)->getLastPersistedSeqno(vbid);
150
151 if (lastPersistedSeqno < endSeqno) {
152 LOG(EXTENSION_LOG_WARNING, "Rescheduling backfill for vbucket %d "
153 "because backfill up to seqno %llu is needed but only up to "
154 "%llu is persisted (disk %llu)", vbid, endSeqno,
155 lastPersistedSeqno, diskSeqno);
156 snooze(DCP_BACKFILL_SLEEP_TIME);
157 return true;
158 }
159
160 KVStore* kvstore = engine->getEpStore()->getROUnderlying(vbid);
161 size_t numItems = kvstore->getNumItems(vbid, startSeqno,
162 std::numeric_limits<uint64_t>::max());
163 static_cast<ActiveStream*>(stream.get())->incrBackfillRemaining(numItems);
164
165 shared_ptr<Callback<GetValue> > cb(new DiskCallback(stream));
166 shared_ptr<Callback<CacheLookup> > cl(new CacheCallback(engine, stream));
167 shared_ptr<Callback<SeqnoRange> > sr(new SnapshotMarkerCallback(stream));
168 kvstore->dump(vbid, startSeqno, cb, cl, sr);
169
170 static_cast<ActiveStream*>(stream.get())->completeBackfill();
171
172 LOG(EXTENSION_LOG_WARNING, "Backfill task (%llu to %llu) finished for vb %d"
173 " disk seqno %llu memory seqno %llu", startSeqno, endSeqno,
174 stream->getVBucket(), diskSeqno, lastPersistedSeqno);
175
176 return false;
177 }
178
getDescription()179 std::string DCPBackfill::getDescription() {
180 std::stringstream ss;
181 ss << "DCP backfill for vbucket " << stream->getVBucket();
182 return ss.str();
183 }
184
Stream(const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)185 Stream::Stream(const std::string &name, uint32_t flags, uint32_t opaque,
186 uint16_t vb, uint64_t start_seqno, uint64_t end_seqno,
187 uint64_t vb_uuid, uint64_t snap_start_seqno,
188 uint64_t snap_end_seqno)
189 : name_(name), flags_(flags), opaque_(opaque), vb_(vb),
190 start_seqno_(start_seqno), end_seqno_(end_seqno), vb_uuid_(vb_uuid),
191 snap_start_seqno_(snap_start_seqno),
192 snap_end_seqno_(snap_end_seqno),
193 state_(STREAM_PENDING), itemsReady(false) {
194 }
195
clear_UNLOCKED()196 void Stream::clear_UNLOCKED() {
197 while (!readyQ.empty()) {
198 DcpResponse* resp = readyQ.front();
199 delete resp;
200 readyQ.pop();
201 }
202 }
203
stateName(stream_state_t st) const204 const char * Stream::stateName(stream_state_t st) const {
205 static const char * const stateNames[] = {
206 "pending", "backfilling", "in-memory", "takeover-send", "takeover-wait",
207 "reading", "dead"
208 };
209 cb_assert(st >= STREAM_PENDING && st <= STREAM_DEAD);
210 return stateNames[st];
211 }
212
addStats(ADD_STAT add_stat, const void *c)213 void Stream::addStats(ADD_STAT add_stat, const void *c) {
214 const int bsize = 128;
215 char buffer[bsize];
216 snprintf(buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_);
217 add_casted_stat(buffer, flags_, add_stat, c);
218 snprintf(buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_);
219 add_casted_stat(buffer, opaque_, add_stat, c);
220 snprintf(buffer, bsize, "%s:stream_%d_start_seqno", name_.c_str(), vb_);
221 add_casted_stat(buffer, start_seqno_, add_stat, c);
222 snprintf(buffer, bsize, "%s:stream_%d_end_seqno", name_.c_str(), vb_);
223 add_casted_stat(buffer, end_seqno_, add_stat, c);
224 snprintf(buffer, bsize, "%s:stream_%d_vb_uuid", name_.c_str(), vb_);
225 add_casted_stat(buffer, vb_uuid_, add_stat, c);
226 snprintf(buffer, bsize, "%s:stream_%d_snap_start_seqno", name_.c_str(), vb_);
227 add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
228 snprintf(buffer, bsize, "%s:stream_%d_snap_end_seqno", name_.c_str(), vb_);
229 add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
230 snprintf(buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_);
231 add_casted_stat(buffer, stateName(state_), add_stat, c);
232 }
233
ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &n, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)234 ActiveStream::ActiveStream(EventuallyPersistentEngine* e, DcpProducer* p,
235 const std::string &n, uint32_t flags,
236 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
237 uint64_t en_seqno, uint64_t vb_uuid,
238 uint64_t snap_start_seqno, uint64_t snap_end_seqno)
239 : Stream(n, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
240 snap_start_seqno, snap_end_seqno),
241 lastReadSeqno(st_seqno), lastSentSeqno(st_seqno), curChkSeqno(st_seqno),
242 takeoverState(vbucket_state_pending), backfillRemaining(0),
243 itemsFromBackfill(0), itemsFromMemory(0), firstMarkerSent(false),
244 waitForSnapshot(0), engine(e), producer(p),
245 isBackfillTaskRunning(false) {
246
247 const char* type = "";
248 if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
249 type = "takeover ";
250 end_seqno_ = dcpMaxSeqno;
251 }
252
253 if (start_seqno_ >= end_seqno_) {
254 endStream(END_STREAM_OK);
255 itemsReady = true;
256 }
257
258 type_ = STREAM_ACTIVE;
259
260 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) %sstream created with start seqno "
261 "%llu and end seqno %llu", producer->logHeader(), vb, type, st_seqno,
262 en_seqno);
263 }
264
next()265 DcpResponse* ActiveStream::next() {
266 LockHolder lh(streamMutex);
267
268 stream_state_t initState = state_;
269
270 DcpResponse* response = NULL;
271 switch (state_) {
272 case STREAM_PENDING:
273 break;
274 case STREAM_BACKFILLING:
275 response = backfillPhase();
276 break;
277 case STREAM_IN_MEMORY:
278 response = inMemoryPhase();
279 break;
280 case STREAM_TAKEOVER_SEND:
281 response = takeoverSendPhase();
282 break;
283 case STREAM_TAKEOVER_WAIT:
284 response = takeoverWaitPhase();
285 break;
286 case STREAM_DEAD:
287 response = deadPhase();
288 break;
289 default:
290 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid state '%s'",
291 producer->logHeader(), vb_, stateName(state_));
292 abort();
293 }
294
295 if (state_ != STREAM_DEAD && initState != state_ && !response) {
296 lh.unlock();
297 return next();
298 }
299
300 itemsReady = response ? true : false;
301 return response;
302 }
303
markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno)304 void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
305 LockHolder lh(streamMutex);
306 if (state_ != STREAM_BACKFILLING) {
307 return;
308 }
309
310 startSeqno = std::min(snap_start_seqno_, startSeqno);
311 firstMarkerSent = true;
312
313 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Sending disk snapshot with start "
314 "seqno %llu and end seqno %llu", producer->logHeader(), vb_, startSeqno,
315 endSeqno);
316 readyQ.push(new SnapshotMarker(opaque_, vb_, startSeqno, endSeqno,
317 MARKER_FLAG_DISK));
318 RCPtr<VBucket> vb = engine->getVBucket(vb_);
319 if (!vb) {
320 endStream(END_STREAM_STATE);
321 } else {
322 if (endSeqno > end_seqno_) {
323 endSeqno = end_seqno_;
324 }
325 // Only re-register the cursor if we still need to get memory snapshots
326 CursorRegResult result =
327 vb->checkpointManager.registerTAPCursorBySeqno(name_, endSeqno);
328 curChkSeqno = result.first;
329 }
330
331 if (!itemsReady) {
332 itemsReady = true;
333 lh.unlock();
334 producer->notifyStreamReady(vb_, false);
335 }
336 }
337
backfillReceived(Item* itm)338 void ActiveStream::backfillReceived(Item* itm) {
339 LockHolder lh(streamMutex);
340 if (state_ == STREAM_BACKFILLING) {
341 readyQ.push(new MutationResponse(itm, opaque_));
342 lastReadSeqno = itm->getBySeqno();
343
344 if (!itemsReady) {
345 itemsReady = true;
346 lh.unlock();
347 producer->notifyStreamReady(vb_, false);
348 }
349 } else {
350 delete itm;
351 }
352 }
353
completeBackfill()354 void ActiveStream::completeBackfill() {
355 LockHolder lh(streamMutex);
356
357 if (state_ == STREAM_BACKFILLING) {
358 isBackfillTaskRunning = false;
359 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Backfill complete, %d items read"
360 " from disk, last seqno read: %ld", producer->logHeader(), vb_,
361 itemsFromBackfill, lastReadSeqno);
362
363 if (!itemsReady) {
364 itemsReady = true;
365 lh.unlock();
366 producer->notifyStreamReady(vb_, false);
367 }
368 }
369 }
370
snapshotMarkerAckReceived()371 void ActiveStream::snapshotMarkerAckReceived() {
372 LockHolder lh (streamMutex);
373 waitForSnapshot--;
374
375 if (!itemsReady && waitForSnapshot == 0) {
376 itemsReady = true;
377 lh.unlock();
378 producer->notifyStreamReady(vb_, true);
379 }
380 }
381
setVBucketStateAckRecieved()382 void ActiveStream::setVBucketStateAckRecieved() {
383 LockHolder lh(streamMutex);
384 if (state_ == STREAM_TAKEOVER_WAIT) {
385 if (takeoverState == vbucket_state_pending) {
386 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
387 engine->getEpStore()->setVBucketState(vb_, vbucket_state_dead,
388 false, false);
389 takeoverState = vbucket_state_active;
390 transitionState(STREAM_TAKEOVER_SEND);
391 LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
392 "state to pending message", producer->logHeader(), vb_);
393 } else {
394 LOG(EXTENSION_LOG_INFO, "%s (vb %d) Receive ack for set vbucket "
395 "state to active message", producer->logHeader(), vb_);
396 endStream(END_STREAM_OK);
397 }
398
399 if (!itemsReady) {
400 itemsReady = true;
401 lh.unlock();
402 producer->notifyStreamReady(vb_, true);
403 }
404 } else {
405 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Unexpected ack for set vbucket "
406 "op on stream '%s' state '%s'", producer->logHeader(), vb_,
407 name_.c_str(), stateName(state_));
408 }
409 }
410
backfillPhase()411 DcpResponse* ActiveStream::backfillPhase() {
412 DcpResponse* resp = nextQueuedItem();
413
414 if (resp && backfillRemaining > 0 &&
415 (resp->getEvent() == DCP_MUTATION ||
416 resp->getEvent() == DCP_DELETION ||
417 resp->getEvent() == DCP_EXPIRATION)) {
418 backfillRemaining--;
419 }
420
421 if (!isBackfillTaskRunning && readyQ.empty()) {
422 backfillRemaining = 0;
423 if (lastReadSeqno >= end_seqno_) {
424 endStream(END_STREAM_OK);
425 } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
426 transitionState(STREAM_TAKEOVER_SEND);
427 } else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
428 endStream(END_STREAM_OK);
429 } else {
430 transitionState(STREAM_IN_MEMORY);
431 }
432
433 if (!resp) {
434 resp = nextQueuedItem();
435 }
436 }
437
438 return resp;
439 }
440
inMemoryPhase()441 DcpResponse* ActiveStream::inMemoryPhase() {
442 if (!readyQ.empty()) {
443 return nextQueuedItem();
444 }
445
446 if (lastSentSeqno >= end_seqno_) {
447 endStream(END_STREAM_OK);
448 } else {
449 nextCheckpointItem();
450 }
451
452 return nextQueuedItem();
453 }
454
takeoverSendPhase()455 DcpResponse* ActiveStream::takeoverSendPhase() {
456 if (!readyQ.empty()) {
457 return nextQueuedItem();
458 } else {
459 nextCheckpointItem();
460 if (!readyQ.empty()) {
461 return nextQueuedItem();
462 }
463 }
464
465 if (waitForSnapshot != 0) {
466 return NULL;
467 }
468
469 DcpResponse* resp = new SetVBucketState(opaque_, vb_, takeoverState);
470 transitionState(STREAM_TAKEOVER_WAIT);
471 return resp;
472 }
473
takeoverWaitPhase()474 DcpResponse* ActiveStream::takeoverWaitPhase() {
475 return nextQueuedItem();
476 }
477
deadPhase()478 DcpResponse* ActiveStream::deadPhase() {
479 return nextQueuedItem();
480 }
481
addStats(ADD_STAT add_stat, const void *c)482 void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
483 Stream::addStats(add_stat, c);
484
485 const int bsize = 128;
486 char buffer[bsize];
487 snprintf(buffer, bsize, "%s:stream_%d_backfilled", name_.c_str(), vb_);
488 add_casted_stat(buffer, itemsFromBackfill, add_stat, c);
489 snprintf(buffer, bsize, "%s:stream_%d_memory", name_.c_str(), vb_);
490 add_casted_stat(buffer, itemsFromMemory, add_stat, c);
491 snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
492 add_casted_stat(buffer, lastSentSeqno, add_stat, c);
493 snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
494 add_casted_stat(buffer, itemsReady ? "true" : "false", add_stat, c);
495 }
496
addTakeoverStats(ADD_STAT add_stat, const void *cookie)497 void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie) {
498 LockHolder lh(streamMutex);
499
500 RCPtr<VBucket> vb = engine->getVBucket(vb_);
501 add_casted_stat("name", name_, add_stat, cookie);
502 if (!vb || state_ == STREAM_DEAD) {
503 add_casted_stat("status", "completed", add_stat, cookie);
504 add_casted_stat("estimate", 0, add_stat, cookie);
505 add_casted_stat("backfillRemaining", 0, add_stat, cookie);
506 add_casted_stat("estimate", 0, add_stat, cookie);
507 return;
508 }
509
510 size_t total = backfillRemaining;
511 if (state_ == STREAM_BACKFILLING) {
512 add_casted_stat("status", "backfilling", add_stat, cookie);
513 } else {
514 add_casted_stat("status", "in-memory", add_stat, cookie);
515 }
516 add_casted_stat("backfillRemaining", backfillRemaining, add_stat, cookie);
517
518 item_eviction_policy_t iep = engine->getEpStore()->getItemEvictionPolicy();
519 size_t vb_items = vb->getNumItems(iep);
520 size_t chk_items = vb_items > 0 ?
521 vb->checkpointManager.getNumItemsForTAPConnection(name_) : 0;
522 size_t del_items = engine->getEpStore()->getRWUnderlying(vb_)->
523 getNumPersistedDeletes(vb_);
524
525 if (end_seqno_ < curChkSeqno) {
526 chk_items = 0;
527 } else if ((end_seqno_ - curChkSeqno) < chk_items) {
528 chk_items = end_seqno_ - curChkSeqno + 1;
529 }
530 total += chk_items;
531
532 add_casted_stat("estimate", total, add_stat, cookie);
533 add_casted_stat("chk_items", chk_items, add_stat, cookie);
534 add_casted_stat("vb_items", vb_items, add_stat, cookie);
535 add_casted_stat("on_disk_deletes", del_items, add_stat, cookie);
536 }
537
nextQueuedItem()538 DcpResponse* ActiveStream::nextQueuedItem() {
539 if (!readyQ.empty()) {
540 DcpResponse* response = readyQ.front();
541 if (response->getEvent() == DCP_MUTATION ||
542 response->getEvent() == DCP_DELETION ||
543 response->getEvent() == DCP_EXPIRATION) {
544 lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
545
546 if (state_ == STREAM_BACKFILLING) {
547 itemsFromBackfill++;
548 } else {
549 itemsFromMemory++;
550 }
551 }
552 readyQ.pop();
553 return response;
554 }
555 return NULL;
556 }
557
nextCheckpointItem()558 void ActiveStream::nextCheckpointItem() {
559 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
560
561 bool mark = false;
562 std::list<queued_item> items;
563 std::list<MutationResponse*> mutations;
564 vbucket->checkpointManager.getAllItemsForCursor(name_, items);
565 if (vbucket->checkpointManager.getNumCheckpoints() > 1) {
566 engine->getEpStore()->wakeUpCheckpointRemover();
567 }
568
569 if (items.empty()) {
570 return;
571 }
572
573 if (items.front()->getOperation() == queue_op_checkpoint_start) {
574 mark = true;
575 }
576
577 while (!items.empty()) {
578 queued_item qi = items.front();
579 items.pop_front();
580
581 if (qi->getOperation() == queue_op_set ||
582 qi->getOperation() == queue_op_del) {
583 curChkSeqno = qi->getBySeqno();
584 lastReadSeqno = qi->getBySeqno();
585
586 mutations.push_back(new MutationResponse(qi, opaque_));
587 } else if (qi->getOperation() == queue_op_checkpoint_start) {
588 snapshot(mutations, mark);
589 mark = true;
590 }
591 }
592
593 if (mutations.empty()) {
594 // If we only got checkpoint start or ends check to see if there are
595 // any more snapshots before pausing the stream.
596 nextCheckpointItem();
597 } else {
598 snapshot(mutations, mark);
599 }
600 }
601
snapshot(std::list<MutationResponse*>& items, bool mark)602 void ActiveStream::snapshot(std::list<MutationResponse*>& items, bool mark) {
603 if (items.empty()) {
604 return;
605 }
606
607 uint32_t flags = MARKER_FLAG_MEMORY;
608 uint64_t snapStart = items.front()->getBySeqno();
609 uint64_t snapEnd = items.back()->getBySeqno();
610
611 if (mark) {
612 flags |= MARKER_FLAG_CHK;
613 }
614
615 if (state_ == STREAM_TAKEOVER_SEND) {
616 waitForSnapshot++;
617 flags |= MARKER_FLAG_ACK;
618 }
619
620 if (!firstMarkerSent) {
621 snapStart = std::min(snap_start_seqno_, snapStart);
622 firstMarkerSent = true;
623 }
624
625 readyQ.push(new SnapshotMarker(opaque_, vb_, snapStart, snapEnd, flags));
626 while(!items.empty()) {
627 readyQ.push(items.front());
628 items.pop_front();
629 }
630 }
631
setDead(end_stream_status_t status)632 uint32_t ActiveStream::setDead(end_stream_status_t status) {
633 LockHolder lh(streamMutex);
634 endStream(status);
635
636 if (!itemsReady && status != END_STREAM_DISCONNECTED) {
637 itemsReady = true;
638 lh.unlock();
639 producer->notifyStreamReady(vb_, true);
640 }
641 return 0;
642 }
643
notifySeqnoAvailable(uint64_t seqno)644 void ActiveStream::notifySeqnoAvailable(uint64_t seqno) {
645 LockHolder lh(streamMutex);
646 if (state_ != STREAM_DEAD) {
647 if (!itemsReady) {
648 itemsReady = true;
649 lh.unlock();
650 producer->notifyStreamReady(vb_, true);
651 }
652 }
653 }
654
endStream(end_stream_status_t reason)655 void ActiveStream::endStream(end_stream_status_t reason) {
656 if (state_ != STREAM_DEAD) {
657 if (reason != END_STREAM_DISCONNECTED) {
658 readyQ.push(new StreamEndResponse(opaque_, reason, vb_));
659 }
660 transitionState(STREAM_DEAD);
661 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream closing, %llu items sent"
662 " from disk, %llu items sent from memory, %llu was last seqno sent",
663 producer->logHeader(), vb_, itemsFromBackfill, itemsFromMemory,
664 lastSentSeqno);
665 }
666 }
667
scheduleBackfill()668 void ActiveStream::scheduleBackfill() {
669 if (!isBackfillTaskRunning) {
670 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
671 if (!vbucket) {
672 return;
673 }
674
675 CursorRegResult result =
676 vbucket->checkpointManager.registerTAPCursorBySeqno(name_,
677 lastReadSeqno);
678 curChkSeqno = result.first;
679 bool isFirstItem = result.second;
680
681 cb_assert(lastReadSeqno <= curChkSeqno);
682 uint64_t backfillStart = lastReadSeqno + 1;
683
684 /* We need to find the minimum seqno that needs to be backfilled in
685 * order to make sure that we don't miss anything when transitioning
686 * to a memory snapshot. The backfill task will always make sure that
687 * the backfill end seqno is contained in the backfill.
688 */
689 uint64_t backfillEnd = 0;
690 if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) { // disk backfill only
691 backfillEnd = end_seqno_;
692 } else { // disk backfill + in-memory streaming
693 if (backfillStart < curChkSeqno) {
694 if (curChkSeqno > end_seqno_) {
695 backfillEnd = end_seqno_;
696 } else {
697 backfillEnd = curChkSeqno - 1;
698 }
699 }
700 }
701
702 bool tryBackfill = isFirstItem || flags_ & DCP_ADD_STREAM_FLAG_DISKONLY;
703
704 if (backfillStart <= backfillEnd && tryBackfill) {
705 ExTask task = new DCPBackfill(engine, this, backfillStart, backfillEnd,
706 Priority::TapBgFetcherPriority, 0, false);
707 ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
708 isBackfillTaskRunning = true;
709 } else {
710 if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
711 endStream(END_STREAM_OK);
712 } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
713 transitionState(STREAM_TAKEOVER_SEND);
714 } else {
715 transitionState(STREAM_IN_MEMORY);
716 }
717 itemsReady = true;
718 }
719 }
720 }
721
transitionState(stream_state_t newState)722 void ActiveStream::transitionState(stream_state_t newState) {
723 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
724 producer->logHeader(), vb_, stateName(state_), stateName(newState));
725
726 if (state_ == newState) {
727 return;
728 }
729
730 switch (state_) {
731 case STREAM_PENDING:
732 cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
733 break;
734 case STREAM_BACKFILLING:
735 cb_assert(newState == STREAM_IN_MEMORY ||
736 newState == STREAM_TAKEOVER_SEND ||
737 newState == STREAM_DEAD);
738 break;
739 case STREAM_IN_MEMORY:
740 cb_assert(newState == STREAM_BACKFILLING || newState == STREAM_DEAD);
741 break;
742 case STREAM_TAKEOVER_SEND:
743 cb_assert(newState == STREAM_TAKEOVER_WAIT || newState == STREAM_DEAD);
744 break;
745 case STREAM_TAKEOVER_WAIT:
746 cb_assert(newState == STREAM_TAKEOVER_SEND || newState == STREAM_DEAD);
747 break;
748 default:
749 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
750 "to %s", producer->logHeader(), vb_, stateName(state_),
751 stateName(newState));
752 abort();
753 }
754
755 state_ = newState;
756
757 if (newState == STREAM_BACKFILLING) {
758 scheduleBackfill();
759 } else if (newState == STREAM_TAKEOVER_SEND) {
760 nextCheckpointItem();
761 } else if (newState == STREAM_DEAD) {
762 RCPtr<VBucket> vb = engine->getVBucket(vb_);
763 if (vb) {
764 vb->checkpointManager.removeTAPCursor(name_);
765 }
766 }
767 }
768
getItemsRemaining()769 size_t ActiveStream::getItemsRemaining() {
770 RCPtr<VBucket> vbucket = engine->getVBucket(vb_);
771
772 if (!vbucket || state_ == STREAM_DEAD) {
773 return 0;
774 }
775
776 uint64_t high_seqno = vbucket->getHighSeqno();
777
778 if (end_seqno_ < high_seqno) {
779 if (end_seqno_ > lastSentSeqno) {
780 return (end_seqno_ - lastSentSeqno);
781 }
782 } else {
783 if (high_seqno > lastSentSeqno) {
784 return (high_seqno - lastSentSeqno);
785 }
786 }
787
788 return 0;
789 }
790
NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)791 NotifierStream::NotifierStream(EventuallyPersistentEngine* e, DcpProducer* p,
792 const std::string &name, uint32_t flags,
793 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
794 uint64_t en_seqno, uint64_t vb_uuid,
795 uint64_t snap_start_seqno,
796 uint64_t snap_end_seqno)
797 : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
798 snap_start_seqno, snap_end_seqno),
799 producer(p) {
800 LockHolder lh(streamMutex);
801 RCPtr<VBucket> vbucket = e->getVBucket(vb_);
802 if (vbucket && static_cast<uint64_t>(vbucket->getHighSeqno()) > st_seqno) {
803 readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
804 transitionState(STREAM_DEAD);
805 itemsReady = true;
806 }
807
808 type_ = STREAM_NOTIFIER;
809
810 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) stream created with start seqno "
811 "%llu and end seqno %llu", producer->logHeader(), vb, st_seqno,
812 en_seqno);
813 }
814
setDead(end_stream_status_t status)815 uint32_t NotifierStream::setDead(end_stream_status_t status) {
816 LockHolder lh(streamMutex);
817 if (state_ != STREAM_DEAD) {
818 transitionState(STREAM_DEAD);
819 if (status != END_STREAM_DISCONNECTED) {
820 readyQ.push(new StreamEndResponse(opaque_, status, vb_));
821 if (!itemsReady) {
822 itemsReady = true;
823 lh.unlock();
824 producer->notifyStreamReady(vb_, true);
825 }
826 }
827 }
828 return 0;
829 }
830
notifySeqnoAvailable(uint64_t seqno)831 void NotifierStream::notifySeqnoAvailable(uint64_t seqno) {
832 LockHolder lh(streamMutex);
833 if (state_ != STREAM_DEAD && start_seqno_ < seqno) {
834 readyQ.push(new StreamEndResponse(opaque_, END_STREAM_OK, vb_));
835 transitionState(STREAM_DEAD);
836 if (!itemsReady) {
837 itemsReady = true;
838 lh.unlock();
839 producer->notifyStreamReady(vb_, true);
840 }
841 }
842 }
843
next()844 DcpResponse* NotifierStream::next() {
845 LockHolder lh(streamMutex);
846
847 if (readyQ.empty()) {
848 itemsReady = false;
849 return NULL;
850 }
851
852 DcpResponse* response = readyQ.front();
853 readyQ.pop();
854
855 return response;
856 }
857
transitionState(stream_state_t newState)858 void NotifierStream::transitionState(stream_state_t newState) {
859 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
860 producer->logHeader(), vb_, stateName(state_), stateName(newState));
861
862 if (state_ == newState) {
863 return;
864 }
865
866 switch (state_) {
867 case STREAM_PENDING:
868 cb_assert(newState == STREAM_DEAD);
869 break;
870 default:
871 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
872 "to %s", producer->logHeader(), vb_, stateName(state_),
873 stateName(newState));
874 abort();
875 }
876
877 state_ = newState;
878 }
879
PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c, const std::string &name, uint32_t flags, uint32_t opaque, uint16_t vb, uint64_t st_seqno, uint64_t en_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)880 PassiveStream::PassiveStream(EventuallyPersistentEngine* e, DcpConsumer* c,
881 const std::string &name, uint32_t flags,
882 uint32_t opaque, uint16_t vb, uint64_t st_seqno,
883 uint64_t en_seqno, uint64_t vb_uuid,
884 uint64_t snap_start_seqno, uint64_t snap_end_seqno)
885 : Stream(name, flags, opaque, vb, st_seqno, en_seqno, vb_uuid,
886 snap_start_seqno, snap_end_seqno),
887 engine(e), consumer(c), last_seqno(st_seqno), cur_snapshot_start(0),
888 cur_snapshot_end(0), cur_snapshot_type(none), cur_snapshot_ack(false),
889 saveSnapshot(false) {
890 LockHolder lh(streamMutex);
891 readyQ.push(new StreamRequest(vb, opaque, flags, st_seqno, en_seqno,
892 vb_uuid, snap_start_seqno, snap_end_seqno));
893 itemsReady = true;
894 type_ = STREAM_PASSIVE;
895
896 const char* type = (flags & DCP_ADD_STREAM_FLAG_TAKEOVER) ? "takeover" : "";
897 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to add %s stream with "
898 "start seqno %llu, end seqno %llu, vbucket uuid %llu, snap start seqno "
899 "%llu, and snap end seqno %llu", consumer->logHeader(), vb, type,
900 st_seqno, en_seqno, vb_uuid, snap_start_seqno, snap_end_seqno);
901 }
902
~PassiveStream()903 PassiveStream::~PassiveStream() {
904 LockHolder lh(streamMutex);
905 clear_UNLOCKED();
906 cb_assert(state_ == STREAM_DEAD);
907 cb_assert(buffer.bytes == 0);
908 }
909
setDead(end_stream_status_t status)910 uint32_t PassiveStream::setDead(end_stream_status_t status) {
911 LockHolder lh(streamMutex);
912 transitionState(STREAM_DEAD);
913 uint32_t unackedBytes = buffer.bytes;
914 clearBuffer();
915 return unackedBytes;
916 }
917
acceptStream(uint16_t status, uint32_t add_opaque)918 void PassiveStream::acceptStream(uint16_t status, uint32_t add_opaque) {
919 LockHolder lh(streamMutex);
920 if (state_ == STREAM_PENDING) {
921 if (status == ENGINE_SUCCESS) {
922 transitionState(STREAM_READING);
923 } else {
924 transitionState(STREAM_DEAD);
925 }
926 readyQ.push(new AddStreamResponse(add_opaque, opaque_, status));
927 if (!itemsReady) {
928 itemsReady = true;
929 lh.unlock();
930 consumer->notifyStreamReady(vb_);
931 }
932 }
933 }
934
reconnectStream(RCPtr<VBucket> &vb, uint32_t new_opaque, uint64_t start_seqno)935 void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
936 uint32_t new_opaque,
937 uint64_t start_seqno) {
938 vb_uuid_ = vb->failovers->getLatestEntry().vb_uuid;
939 vb->getCurrentSnapshot(snap_start_seqno_, snap_end_seqno_);
940
941 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Attempting to reconnect stream "
942 "with opaque %ld, start seq no %llu, end seq no %llu, snap start seqno "
943 "%llu, and snap end seqno %llu", consumer->logHeader(), vb_, new_opaque,
944 start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
945
946 LockHolder lh(streamMutex);
947 last_seqno = start_seqno;
948 readyQ.push(new StreamRequest(vb_, new_opaque, flags_, start_seqno,
949 end_seqno_, vb_uuid_, snap_start_seqno_,
950 snap_end_seqno_));
951 if (!itemsReady) {
952 itemsReady = true;
953 lh.unlock();
954 consumer->notifyStreamReady(vb_);
955 }
956 }
957
messageReceived(DcpResponse* resp)958 ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
959 LockHolder lh(buffer.bufMutex);
960 cb_assert(resp);
961
962 if (state_ == STREAM_DEAD) {
963 delete resp;
964 return ENGINE_KEY_ENOENT;
965 }
966
967 if (resp->getEvent() == DCP_DELETION || resp->getEvent() == DCP_MUTATION ||
968 resp->getEvent() == DCP_EXPIRATION) {
969 MutationResponse* m = static_cast<MutationResponse*>(resp);
970 uint64_t bySeqno = m->getBySeqno();
971 if (bySeqno <= last_seqno) {
972 LOG(EXTENSION_LOG_INFO, "%s Dropping dcp mutation for vbucket %d "
973 "with opaque %ld because the byseqno given (%llu) must be "
974 "larger than %llu", consumer->logHeader(), vb_, opaque_,
975 bySeqno, last_seqno);
976 delete m;
977 return ENGINE_ERANGE;
978 }
979 last_seqno = bySeqno;
980 }
981
982 buffer.messages.push(resp);
983 buffer.items++;
984 buffer.bytes += resp->getMessageSize();
985
986 return ENGINE_SUCCESS;
987 }
988
processBufferedMessages(uint32_t& processed_bytes)989 process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed_bytes) {
990 LockHolder lh(buffer.bufMutex);
991 uint32_t count = 0;
992 uint32_t message_bytes = 0;
993 uint32_t total_bytes_processed = 0;
994 bool failed = false;
995
996 if (buffer.messages.empty()) {
997 return all_processed;
998 }
999
1000 while (count < PassiveStream::batchSize && !buffer.messages.empty()) {
1001 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1002 DcpResponse *response = buffer.messages.front();
1003 message_bytes = response->getMessageSize();
1004
1005 switch (response->getEvent()) {
1006 case DCP_MUTATION:
1007 ret = processMutation(static_cast<MutationResponse*>(response));
1008 break;
1009 case DCP_DELETION:
1010 case DCP_EXPIRATION:
1011 ret = processDeletion(static_cast<MutationResponse*>(response));
1012 break;
1013 case DCP_SNAPSHOT_MARKER:
1014 processMarker(static_cast<SnapshotMarker*>(response));
1015 break;
1016 case DCP_SET_VBUCKET:
1017 processSetVBucketState(static_cast<SetVBucketState*>(response));
1018 break;
1019 case DCP_STREAM_END:
1020 transitionState(STREAM_DEAD);
1021 delete response;
1022 break;
1023 default:
1024 abort();
1025 }
1026
1027 if (ret == ENGINE_TMPFAIL || ret == ENGINE_ENOMEM) {
1028 failed = true;
1029 break;
1030 }
1031
1032 buffer.messages.pop();
1033 buffer.items--;
1034 buffer.bytes -= message_bytes;
1035 count++;
1036 total_bytes_processed += message_bytes;
1037 }
1038
1039 processed_bytes = total_bytes_processed;
1040
1041 if (failed) {
1042 return cannot_process;
1043 }
1044
1045 return all_processed;
1046 }
1047
processMutation(MutationResponse* mutation)1048 ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1049 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1050 if (!vb) {
1051 return ENGINE_NOT_MY_VBUCKET;
1052 }
1053
1054 ENGINE_ERROR_CODE ret;
1055 if (saveSnapshot) {
1056 LockHolder lh = vb->getSnapshotLock();
1057 ret = commitMutation(mutation, vb->isBackfillPhase());
1058 vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1059 saveSnapshot = false;
1060 lh.unlock();
1061 } else {
1062 ret = commitMutation(mutation, vb->isBackfillPhase());
1063 }
1064
1065 // We should probably handle these error codes in a better way, but since
1066 // the producer side doesn't do anything with them anyways let's just log
1067 // them for now until we come up with a better solution.
1068 if (ret != ENGINE_SUCCESS) {
1069 LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1070 "process mutation", consumer->logHeader(), ret);
1071 }
1072
1073 handleSnapshotEnd(vb, mutation->getBySeqno());
1074
1075 if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1076 delete mutation;
1077 }
1078
1079 return ret;
1080 }
1081
commitMutation(MutationResponse* mutation, bool backfillPhase)1082 ENGINE_ERROR_CODE PassiveStream::commitMutation(MutationResponse* mutation,
1083 bool backfillPhase) {
1084 if (backfillPhase) {
1085 return engine->getEpStore()->addTAPBackfillItem(*mutation->getItem(),
1086 INITIAL_NRU_VALUE,
1087 false);
1088 } else {
1089 return engine->getEpStore()->setWithMeta(*mutation->getItem(), 0,
1090 consumer->getCookie(), true,
1091 true, INITIAL_NRU_VALUE, false,
1092 true);
1093 }
1094 }
1095
processDeletion(MutationResponse* deletion)1096 ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1097 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1098 if (!vb) {
1099 return ENGINE_NOT_MY_VBUCKET;
1100 }
1101
1102 ENGINE_ERROR_CODE ret;
1103 if (saveSnapshot) {
1104 LockHolder lh = vb->getSnapshotLock();
1105 ret = commitDeletion(deletion, vb->isBackfillPhase());
1106 vb->setCurrentSnapshot_UNLOCKED(cur_snapshot_start, cur_snapshot_end);
1107 saveSnapshot = false;
1108 lh.unlock();
1109 } else {
1110 ret = commitDeletion(deletion, vb->isBackfillPhase());
1111 }
1112
1113 if (ret == ENGINE_KEY_ENOENT) {
1114 ret = ENGINE_SUCCESS;
1115 }
1116
1117 // We should probably handle these error codes in a better way, but since
1118 // the producer side doesn't do anything with them anyways let's just log
1119 // them for now until we come up with a better solution.
1120 if (ret != ENGINE_SUCCESS) {
1121 LOG(EXTENSION_LOG_WARNING, "%s Got an error code %d while trying to "
1122 "process deletion", consumer->logHeader(), ret);
1123 }
1124
1125 handleSnapshotEnd(vb, deletion->getBySeqno());
1126
1127 if (ret != ENGINE_TMPFAIL && ret != ENGINE_ENOMEM) {
1128 delete deletion;
1129 }
1130
1131 return ret;
1132 }
1133
commitDeletion(MutationResponse* deletion, bool backfillPhase)1134 ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1135 bool backfillPhase) {
1136 uint64_t delCas = 0;
1137 ItemMetaData meta = deletion->getItem()->getMetaData();
1138 return engine->getEpStore()->deleteWithMeta(deletion->getItem()->getKey(),
1139 &delCas, deletion->getVBucket(),
1140 consumer->getCookie(), true,
1141 &meta, backfillPhase, false,
1142 deletion->getBySeqno(), true);
1143 }
1144
processMarker(SnapshotMarker* marker)1145 void PassiveStream::processMarker(SnapshotMarker* marker) {
1146 RCPtr<VBucket> vb = engine->getVBucket(vb_);
1147
1148 cur_snapshot_start = marker->getStartSeqno();
1149 cur_snapshot_end = marker->getEndSeqno();
1150 cur_snapshot_type = (marker->getFlags() & MARKER_FLAG_DISK) ? disk : memory;
1151 saveSnapshot = true;
1152
1153 if (vb) {
1154 if (marker->getFlags() & MARKER_FLAG_DISK && vb->getHighSeqno() == 0) {
1155 vb->setBackfillPhase(true);
1156 vb->checkpointManager.checkAndAddNewCheckpoint(0, vb);
1157 } else {
1158 if (marker->getFlags() & MARKER_FLAG_CHK ||
1159 vb->checkpointManager.getOpenCheckpointId() == 0) {
1160 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1161 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1162 }
1163 vb->setBackfillPhase(false);
1164 }
1165
1166 if (marker->getFlags() & MARKER_FLAG_ACK) {
1167 cur_snapshot_ack = true;
1168 }
1169 }
1170 delete marker;
1171 }
1172
processSetVBucketState(SetVBucketState* state)1173 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1174 engine->getEpStore()->setVBucketState(vb_, state->getState(), true);
1175 delete state;
1176
1177 LockHolder lh (streamMutex);
1178 readyQ.push(new SetVBucketStateResponse(opaque_, ENGINE_SUCCESS));
1179 if (!itemsReady) {
1180 itemsReady = true;
1181 lh.unlock();
1182 consumer->notifyStreamReady(vb_);
1183 }
1184 }
1185
handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno)1186 void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1187 if (byseqno == cur_snapshot_end) {
1188 if (cur_snapshot_type == disk && vb->isBackfillPhase()) {
1189 vb->setBackfillPhase(false);
1190 uint64_t id = vb->checkpointManager.getOpenCheckpointId() + 1;
1191 vb->checkpointManager.checkAndAddNewCheckpoint(id, vb);
1192 }
1193
1194 if (cur_snapshot_ack) {
1195 LockHolder lh(streamMutex);
1196 readyQ.push(new SnapshotMarkerResponse(opaque_, ENGINE_SUCCESS));
1197 if (!itemsReady) {
1198 itemsReady = true;
1199 lh.unlock();
1200 consumer->notifyStreamReady(vb_);
1201 }
1202 cur_snapshot_ack = false;
1203 }
1204 cur_snapshot_type = none;
1205 vb->setCurrentSnapshot(byseqno, byseqno);
1206 }
1207 }
1208
addStats(ADD_STAT add_stat, const void *c)1209 void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1210 Stream::addStats(add_stat, c);
1211
1212 const int bsize = 128;
1213 char buf[bsize];
1214 snprintf(buf, bsize, "%s:stream_%d_buffer_items", name_.c_str(), vb_);
1215 add_casted_stat(buf, buffer.items, add_stat, c);
1216 snprintf(buf, bsize, "%s:stream_%d_buffer_bytes", name_.c_str(), vb_);
1217 add_casted_stat(buf, buffer.bytes, add_stat, c);
1218 snprintf(buf, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
1219 add_casted_stat(buf, itemsReady ? "true" : "false", add_stat, c);
1220 snprintf(buf, bsize, "%s:stream_%d_last_received_seqno", name_.c_str(), vb_);
1221 add_casted_stat(buf, last_seqno, add_stat, c);
1222
1223 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_type", name_.c_str(), vb_);
1224 add_casted_stat(buf, snapshotTypeToString(cur_snapshot_type), add_stat, c);
1225
1226 if (cur_snapshot_type != none) {
1227 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_start", name_.c_str(), vb_);
1228 add_casted_stat(buf, cur_snapshot_start, add_stat, c);
1229 snprintf(buf, bsize, "%s:stream_%d_cur_snapshot_end", name_.c_str(), vb_);
1230 add_casted_stat(buf, cur_snapshot_end, add_stat, c);
1231 }
1232 }
1233
next()1234 DcpResponse* PassiveStream::next() {
1235 LockHolder lh(streamMutex);
1236
1237 if (readyQ.empty()) {
1238 itemsReady = false;
1239 return NULL;
1240 }
1241
1242 DcpResponse* response = readyQ.front();
1243 readyQ.pop();
1244 return response;
1245 }
1246
clearBuffer()1247 void PassiveStream::clearBuffer() {
1248 LockHolder lh(buffer.bufMutex);
1249
1250 while (!buffer.messages.empty()) {
1251 DcpResponse* resp = buffer.messages.front();
1252 buffer.messages.pop();
1253 delete resp;
1254 }
1255
1256 buffer.bytes = 0;
1257 buffer.items = 0;
1258 }
1259
transitionState(stream_state_t newState)1260 void PassiveStream::transitionState(stream_state_t newState) {
1261 LOG(EXTENSION_LOG_DEBUG, "%s (vb %d) Transitioning from %s to %s",
1262 consumer->logHeader(), vb_, stateName(state_), stateName(newState));
1263
1264 if (state_ == newState) {
1265 return;
1266 }
1267
1268 switch (state_) {
1269 case STREAM_PENDING:
1270 cb_assert(newState == STREAM_READING || newState == STREAM_DEAD);
1271 break;
1272 case STREAM_READING:
1273 cb_assert(newState == STREAM_PENDING || newState == STREAM_DEAD);
1274 break;
1275 default:
1276 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid Transition from %s "
1277 "to %s", consumer->logHeader(), vb_, stateName(state_),
1278 stateName(newState));
1279 abort();
1280 }
1281
1282 state_ = newState;
1283 }
1284