1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "connmap.h"
23 #include "tapthrottle.h"
24 #include "dcp-consumer.h"
25 #include "dcp-response.h"
26 #include "dcp-stream.h"
27
28 class Processer : public GlobalTask {
29 public:
Processer(EventuallyPersistentEngine* e, connection_t c, const Priority &p, double sleeptime = 1, bool shutdown = false)30 Processer(EventuallyPersistentEngine* e, connection_t c,
31 const Priority &p, double sleeptime = 1, bool shutdown = false)
32 : GlobalTask(e, p, sleeptime, shutdown), conn(c) {}
33
34 bool run();
35
36 std::string getDescription();
37
38 private:
39 connection_t conn;
40 };
41
run()42 bool Processer::run() {
43 DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
44 if (consumer->doDisconnect()) {
45 return false;
46 }
47
48 switch (consumer->processBufferedItems()) {
49 case all_processed:
50 snooze(1);
51 break;
52 case more_to_process:
53 snooze(0);
54 break;
55 case cannot_process:
56 snooze(5);
57 break;
58 default:
59 abort();
60 }
61
62 return true;
63 }
64
getDescription()65 std::string Processer::getDescription() {
66 std::stringstream ss;
67 ss << "Processing buffered items for " << conn->getName();
68 return ss.str();
69 }
70
DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie, const std::string &name)71 DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
72 const std::string &name)
73 : Consumer(engine, cookie, name), opaqueCounter(0), processTaskId(0),
74 itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0) {
75 Configuration& config = engine.getConfiguration();
76 streams = new passive_stream_t[config.getMaxVbuckets()];
77 setSupportAck(false);
78 setLogHeader("DCP (Consumer) " + getName() + " -");
79 setReserved(true);
80
81 flowControl.enabled = config.isDcpEnableFlowControl();
82 flowControl.bufferSize = config.getDcpConnBufferSize();
83 flowControl.maxUnackedBytes = config.getDcpMaxUnackedBytes();
84
85 noopInterval = config.getDcpNoopInterval();
86 enableNoop = config.isDcpEnableNoop();
87 sendNoopInterval = config.isDcpEnableNoop();
88
89 ExTask task = new Processer(&engine, this, Priority::PendingOpsPriority, 1);
90 processTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
91 }
92
~DcpConsumer()93 DcpConsumer::~DcpConsumer() {
94 closeAllStreams();
95 delete[] streams;
96 }
97
addStream(uint32_t opaque, uint16_t vbucket, uint32_t flags)98 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
99 uint32_t flags) {
100 LockHolder lh(streamMutex);
101 if (doDisconnect()) {
102 return ENGINE_DISCONNECT;
103 }
104
105 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
106 if (!vb) {
107 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Add stream failed because this "
108 "vbucket doesn't exist", logHeader(), vbucket);
109 return ENGINE_NOT_MY_VBUCKET;
110 }
111
112 if (vb->getState() == vbucket_state_active) {
113 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Add stream failed because this "
114 "vbucket happens to be in active state", logHeader(), vbucket);
115 return ENGINE_NOT_MY_VBUCKET;
116 }
117
118 uint32_t new_opaque = ++opaqueCounter;
119 failover_entry_t entry = vb->failovers->getLatestEntry();
120 uint64_t start_seqno = vb->getHighSeqno();
121 uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
122 uint64_t vbucket_uuid = entry.vb_uuid;
123 uint64_t snap_start_seqno;
124 uint64_t snap_end_seqno;
125
126 vb->getCurrentSnapshot(snap_start_seqno, snap_end_seqno);
127
128 passive_stream_t stream = streams[vbucket];
129 if (stream && stream->isActive()) {
130 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot add stream because one "
131 "already exists", logHeader(), vbucket);
132 return ENGINE_KEY_EEXISTS;
133 }
134
135 streams[vbucket] = new PassiveStream(&engine_, this, getName(), flags,
136 new_opaque, vbucket, start_seqno,
137 end_seqno, vbucket_uuid,
138 snap_start_seqno, snap_end_seqno);
139 ready.push_back(vbucket);
140 opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
141
142 return ENGINE_SUCCESS;
143 }
144
closeStream(uint32_t opaque, uint16_t vbucket)145 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
146 LockHolder lh(streamMutex);
147 if (doDisconnect()) {
148 return ENGINE_DISCONNECT;
149 }
150
151 opaque_map::iterator oitr = opaqueMap_.find(opaque);
152 if (oitr != opaqueMap_.end()) {
153 opaqueMap_.erase(oitr);
154 }
155
156 passive_stream_t stream = streams[vbucket];
157 if (!stream) {
158 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
159 "stream exists for this vbucket", logHeader(), vbucket);
160 return ENGINE_KEY_ENOENT;
161 }
162
163 uint32_t bytesCleared = stream->setDead(END_STREAM_CLOSED);
164 flowControl.freedBytes.fetch_add(bytesCleared);
165 return ENGINE_SUCCESS;
166 }
167
streamEnd(uint32_t opaque, uint16_t vbucket, uint32_t flags)168 ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
169 uint32_t flags) {
170 if (doDisconnect()) {
171 return ENGINE_DISCONNECT;
172 }
173
174 ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
175 passive_stream_t stream = streams[vbucket];
176 if (stream && stream->getOpaque() == opaque && stream->isActive()) {
177 LOG(EXTENSION_LOG_INFO, "%s (vb %d) End stream received with reason %d",
178 logHeader(), vbucket, flags);
179 StreamEndResponse* response = new StreamEndResponse(opaque, flags,
180 vbucket);
181 err = stream->messageReceived(response);
182
183 bool disable = false;
184 if (err == ENGINE_SUCCESS &&
185 itemsToProcess.compare_exchange_strong(disable, true)) {
186 ExecutorPool::get()->wake(processTaskId);
187 }
188 }
189
190 if (err != ENGINE_SUCCESS) {
191 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) End stream received with opaque "
192 "%d but does not exist", logHeader(), vbucket, opaque);
193 flowControl.freedBytes.fetch_add(StreamEndResponse::baseMsgBytes);
194 }
195
196 return err;
197 }
198
mutation(uint32_t opaque, const void* key, uint16_t nkey, const void* value, uint32_t nvalue, uint64_t cas, uint16_t vbucket, uint32_t flags, uint8_t datatype, uint32_t locktime, uint64_t bySeqno, uint64_t revSeqno, uint32_t exptime, uint8_t nru, const void* meta, uint16_t nmeta)199 ENGINE_ERROR_CODE DcpConsumer::mutation(uint32_t opaque, const void* key,
200 uint16_t nkey, const void* value,
201 uint32_t nvalue, uint64_t cas,
202 uint16_t vbucket, uint32_t flags,
203 uint8_t datatype, uint32_t locktime,
204 uint64_t bySeqno, uint64_t revSeqno,
205 uint32_t exptime, uint8_t nru,
206 const void* meta, uint16_t nmeta) {
207 if (doDisconnect()) {
208 return ENGINE_DISCONNECT;
209 }
210
211 ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
212 passive_stream_t stream = streams[vbucket];
213 if (stream && stream->getOpaque() == opaque && stream->isActive()) {
214 std::string key_str(static_cast<const char*>(key), nkey);
215 value_t vblob(Blob::New(static_cast<const char*>(value), nvalue,
216 &(datatype), (uint8_t) EXT_META_LEN));
217 Item *item = new Item(key_str, flags, exptime, vblob, cas, bySeqno,
218 vbucket, revSeqno);
219 MutationResponse* response = new MutationResponse(item, opaque);
220 err = stream->messageReceived(response);
221
222 bool disable = false;
223 if (err == ENGINE_SUCCESS &&
224 itemsToProcess.compare_exchange_strong(disable, true)) {
225 ExecutorPool::get()->wake(processTaskId);
226 }
227 }
228
229 if (err != ENGINE_SUCCESS) {
230 uint32_t bytes =
231 MutationResponse::mutationBaseMsgBytes + nkey + nmeta + nvalue;
232 flowControl.freedBytes.fetch_add(bytes);
233 }
234
235 return err;
236 }
237
deletion(uint32_t opaque, const void* key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t bySeqno, uint64_t revSeqno, const void* meta, uint16_t nmeta)238 ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque, const void* key,
239 uint16_t nkey, uint64_t cas,
240 uint16_t vbucket, uint64_t bySeqno,
241 uint64_t revSeqno, const void* meta,
242 uint16_t nmeta) {
243 if (doDisconnect()) {
244 return ENGINE_DISCONNECT;
245 }
246
247 ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
248 passive_stream_t stream = streams[vbucket];
249 if (stream && stream->getOpaque() == opaque && stream->isActive()) {
250 Item* item = new Item(key, nkey, 0, 0, NULL, 0, NULL, 0, cas, bySeqno,
251 vbucket, revSeqno);
252 item->setDeleted();
253 MutationResponse* response = new MutationResponse(item, opaque);
254 err = stream->messageReceived(response);
255
256 bool disable = false;
257 if (err == ENGINE_SUCCESS &&
258 itemsToProcess.compare_exchange_strong(disable, true)) {
259 ExecutorPool::get()->wake(processTaskId);
260 }
261 }
262
263 if (err != ENGINE_SUCCESS) {
264 uint32_t bytes = MutationResponse::deletionBaseMsgBytes + nkey + nmeta;
265 flowControl.freedBytes.fetch_add(bytes);
266 }
267
268 return err;
269 }
270
expiration(uint32_t opaque, const void* key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t bySeqno, uint64_t revSeqno, const void* meta, uint16_t nmeta)271 ENGINE_ERROR_CODE DcpConsumer::expiration(uint32_t opaque, const void* key,
272 uint16_t nkey, uint64_t cas,
273 uint16_t vbucket, uint64_t bySeqno,
274 uint64_t revSeqno, const void* meta,
275 uint16_t nmeta) {
276 return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
277 nmeta);
278 }
279
snapshotMarker(uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags)280 ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
281 uint16_t vbucket,
282 uint64_t start_seqno,
283 uint64_t end_seqno,
284 uint32_t flags) {
285 if (doDisconnect()) {
286 return ENGINE_DISCONNECT;
287 }
288
289 ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
290 passive_stream_t stream = streams[vbucket];
291 if (stream && stream->getOpaque() == opaque && stream->isActive()) {
292 SnapshotMarker* response = new SnapshotMarker(opaque, vbucket,
293 start_seqno, end_seqno,
294 flags);
295 err = stream->messageReceived(response);
296
297 bool disable = false;
298 if (err == ENGINE_SUCCESS &&
299 itemsToProcess.compare_exchange_strong(disable, true)) {
300 ExecutorPool::get()->wake(processTaskId);
301 }
302 }
303
304 if (err != ENGINE_SUCCESS) {
305 flowControl.freedBytes.fetch_add(SnapshotMarker::baseMsgBytes);
306 }
307
308 return err;
309 }
310
noop(uint32_t opaque)311 ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
312 lastNoopTime = ep_current_time();
313 return ENGINE_SUCCESS;
314 }
315
flush(uint32_t opaque, uint16_t vbucket)316 ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
317 if (doDisconnect()) {
318 return ENGINE_DISCONNECT;
319 }
320
321 return ENGINE_ENOTSUP;
322 }
323
setVBucketState(uint32_t opaque, uint16_t vbucket, vbucket_state_t state)324 ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
325 uint16_t vbucket,
326 vbucket_state_t state) {
327 if (doDisconnect()) {
328 return ENGINE_DISCONNECT;
329 }
330
331 ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
332 passive_stream_t stream = streams[vbucket];
333 if (stream && stream->getOpaque() == opaque && stream->isActive()) {
334 SetVBucketState* response = new SetVBucketState(opaque, vbucket, state);
335 err = stream->messageReceived(response);
336
337 bool disable = false;
338 if (err == ENGINE_SUCCESS &&
339 itemsToProcess.compare_exchange_strong(disable, true)) {
340 ExecutorPool::get()->wake(processTaskId);
341 }
342 }
343
344 if (err != ENGINE_SUCCESS) {
345 flowControl.freedBytes.fetch_add(SetVBucketState::baseMsgBytes);
346 }
347
348 return err;
349 }
350
step(struct dcp_message_producers* producers)351 ENGINE_ERROR_CODE DcpConsumer::step(struct dcp_message_producers* producers) {
352 setLastWalkTime();
353
354 if (doDisconnect()) {
355 return ENGINE_DISCONNECT;
356 }
357
358 ENGINE_ERROR_CODE ret;
359 if ((ret = handleFlowCtl(producers)) != ENGINE_FAILED) {
360 if (ret == ENGINE_SUCCESS) {
361 ret = ENGINE_WANT_MORE;
362 }
363 return ret;
364 }
365
366 if ((ret = handleNoop(producers)) != ENGINE_FAILED) {
367 if (ret == ENGINE_SUCCESS) {
368 ret = ENGINE_WANT_MORE;
369 }
370 return ret;
371 }
372
373 DcpResponse *resp = getNextItem();
374 if (resp == NULL) {
375 return ENGINE_SUCCESS;
376 }
377
378 ret = ENGINE_SUCCESS;
379 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
380 switch (resp->getEvent()) {
381 case DCP_ADD_STREAM:
382 {
383 AddStreamResponse *as = static_cast<AddStreamResponse*>(resp);
384 ret = producers->add_stream_rsp(getCookie(), as->getOpaque(),
385 as->getStreamOpaque(),
386 as->getStatus());
387 break;
388 }
389 case DCP_STREAM_REQ:
390 {
391 StreamRequest *sr = static_cast<StreamRequest*> (resp);
392 ret = producers->stream_req(getCookie(), sr->getOpaque(),
393 sr->getVBucket(), sr->getFlags(),
394 sr->getStartSeqno(), sr->getEndSeqno(),
395 sr->getVBucketUUID(),
396 sr->getSnapStartSeqno(),
397 sr->getSnapEndSeqno());
398 break;
399 }
400 case DCP_SET_VBUCKET:
401 {
402 SetVBucketStateResponse* vs;
403 vs = static_cast<SetVBucketStateResponse*>(resp);
404 ret = producers->set_vbucket_state_rsp(getCookie(), vs->getOpaque(),
405 vs->getStatus());
406 break;
407 }
408 case DCP_SNAPSHOT_MARKER:
409 {
410 SnapshotMarkerResponse* mr;
411 mr = static_cast<SnapshotMarkerResponse*>(resp);
412 ret = producers->marker_rsp(getCookie(), mr->getOpaque(),
413 mr->getStatus());
414 break;
415 }
416 default:
417 LOG(EXTENSION_LOG_WARNING, "%s Unknown consumer event (%d), "
418 "disconnecting", logHeader(), resp->getEvent());
419 ret = ENGINE_DISCONNECT;
420 }
421 ObjectRegistry::onSwitchThread(epe);
422 delete resp;
423
424 if (ret == ENGINE_SUCCESS) {
425 return ENGINE_WANT_MORE;
426 }
427 return ret;
428 }
429
run()430 bool RollbackTask::run() {
431 if (cons->doRollback(opaque, vbid, rollbackSeqno)) {
432 return true;
433 }
434 ++(engine->getEpStats().rollbackCount);
435 return false;
436 }
437
handleResponse( protocol_binary_response_header *resp)438 ENGINE_ERROR_CODE DcpConsumer::handleResponse(
439 protocol_binary_response_header *resp) {
440 if (doDisconnect()) {
441 return ENGINE_DISCONNECT;
442 }
443
444 uint8_t opcode = resp->response.opcode;
445 uint32_t opaque = resp->response.opaque;
446
447 opaque_map::iterator oitr = opaqueMap_.find(opaque);
448
449 bool validOpaque = false;
450 if (oitr != opaqueMap_.end()) {
451 validOpaque = isValidOpaque(opaque, oitr->second.second);
452 }
453
454 if (!validOpaque) {
455 LOG(EXTENSION_LOG_WARNING, "%s Received response with opaque %ld and "
456 "that stream no longer exists", logHeader());
457 return ENGINE_KEY_ENOENT;
458 }
459
460 if (opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_REQ) {
461 protocol_binary_response_dcp_stream_req* pkt =
462 reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
463
464 uint16_t vbid = oitr->second.second;
465 uint16_t status = ntohs(pkt->message.header.response.status);
466 uint64_t bodylen = ntohl(pkt->message.header.response.bodylen);
467 uint8_t* body = pkt->bytes + sizeof(protocol_binary_response_header);
468
469 if (status == PROTOCOL_BINARY_RESPONSE_ROLLBACK) {
470 cb_assert(bodylen == sizeof(uint64_t));
471 uint64_t rollbackSeqno = 0;
472 memcpy(&rollbackSeqno, body, sizeof(uint64_t));
473 rollbackSeqno = ntohll(rollbackSeqno);
474
475 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Received rollback request "
476 "to rollback seq no. %llu", logHeader(), vbid, rollbackSeqno);
477
478 ExTask task = new RollbackTask(&engine_, opaque, vbid,
479 rollbackSeqno, this,
480 Priority::TapBgFetcherPriority);
481 ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
482 return ENGINE_SUCCESS;
483 }
484
485 if (((bodylen % 16) != 0 || bodylen == 0) && status == ENGINE_SUCCESS) {
486 LOG(EXTENSION_LOG_WARNING, "%s (vb %d)Got a stream response with a "
487 "bad failover log (length %llu), disconnecting", logHeader(),
488 vbid, bodylen);
489 return ENGINE_DISCONNECT;
490 }
491
492 streamAccepted(opaque, status, body, bodylen);
493 return ENGINE_SUCCESS;
494 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT ||
495 opcode == PROTOCOL_BINARY_CMD_DCP_CONTROL) {
496 return ENGINE_SUCCESS;
497 }
498
499 LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
500 "disconnecting", logHeader(), opcode);
501
502 return ENGINE_DISCONNECT;
503 }
504
doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno)505 bool DcpConsumer::doRollback(uint32_t opaque, uint16_t vbid,
506 uint64_t rollbackSeqno) {
507 ENGINE_ERROR_CODE err = engine_.getEpStore()->rollback(vbid, rollbackSeqno);
508
509 if (err == ENGINE_NOT_MY_VBUCKET) {
510 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rollback failed because the "
511 "vbucket was not found", logHeader(), vbid);
512 return false;
513 } else if (err == ENGINE_TMPFAIL) {
514 return true; // Reschedule the rollback.
515 }
516
517 cb_assert(err == ENGINE_SUCCESS);
518
519 LockHolder lh(streamMutex);
520 RCPtr<VBucket> vb = engine_.getVBucket(vbid);
521 streams[vbid]->reconnectStream(vb, opaque, vb->getHighSeqno());
522
523 return false;
524 }
525
addStats(ADD_STAT add_stat, const void *c)526 void DcpConsumer::addStats(ADD_STAT add_stat, const void *c) {
527 ConnHandler::addStats(add_stat, c);
528
529 int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
530 for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
531 passive_stream_t stream = streams[vbucket];
532 if (stream) {
533 stream->addStats(add_stat, c);
534 }
535 }
536
537 addStat("total_backoffs", backoffs, add_stat, c);
538 if (flowControl.enabled) {
539 addStat("total_acked_bytes", flowControl.ackedBytes, add_stat, c);
540 }
541 }
542
aggregateQueueStats(ConnCounter* aggregator)543 void DcpConsumer::aggregateQueueStats(ConnCounter* aggregator) {
544 aggregator->conn_queueBackoff += backoffs;
545 }
546
processBufferedItems()547 process_items_error_t DcpConsumer::processBufferedItems() {
548 itemsToProcess.store(false);
549 process_items_error_t process_ret = all_processed;
550
551 int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
552 for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
553
554 passive_stream_t stream = streams[vbucket];
555 if (!stream) {
556 continue;
557 }
558
559 uint32_t bytes_processed;
560
561 do {
562 if (!engine_.getTapThrottle().shouldProcess()) {
563 backoffs++;
564 return cannot_process;
565 }
566
567 bytes_processed = 0;
568 process_ret = stream->processBufferedMessages(bytes_processed);
569 flowControl.freedBytes.fetch_add(bytes_processed);
570 } while (bytes_processed > 0 && process_ret != cannot_process);
571 }
572
573 if (process_ret == all_processed && itemsToProcess.load()) {
574 return more_to_process;
575 }
576
577 return process_ret;
578 }
579
getNextItem()580 DcpResponse* DcpConsumer::getNextItem() {
581 LockHolder lh(streamMutex);
582
583 setPaused(false);
584 while (!ready.empty()) {
585 uint16_t vbucket = ready.front();
586 ready.pop_front();
587
588 passive_stream_t stream = streams[vbucket];
589 if (!stream) {
590 continue;
591 }
592
593 DcpResponse* op = stream->next();
594 if (!op) {
595 continue;
596 }
597 switch (op->getEvent()) {
598 case DCP_STREAM_REQ:
599 case DCP_ADD_STREAM:
600 case DCP_SET_VBUCKET:
601 case DCP_SNAPSHOT_MARKER:
602 break;
603 default:
604 LOG(EXTENSION_LOG_WARNING, "%s Consumer is attempting to write"
605 " an unexpected event %d", logHeader(), op->getEvent());
606 abort();
607 }
608
609 ready.push_back(vbucket);
610 return op;
611 }
612 setPaused(true);
613
614 return NULL;
615 }
616
notifyStreamReady(uint16_t vbucket)617 void DcpConsumer::notifyStreamReady(uint16_t vbucket) {
618 std::list<uint16_t>::iterator iter =
619 std::find(ready.begin(), ready.end(), vbucket);
620 if (iter != ready.end()) {
621 return;
622 }
623
624 ready.push_back(vbucket);
625
626 engine_.getDcpConnMap().notifyPausedConnection(this, true);
627 }
628
streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body, uint32_t bodylen)629 void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
630 uint32_t bodylen) {
631 LockHolder lh(streamMutex);
632
633 opaque_map::iterator oitr = opaqueMap_.find(opaque);
634 if (oitr != opaqueMap_.end()) {
635 uint32_t add_opaque = oitr->second.first;
636 uint16_t vbucket = oitr->second.second;
637
638 passive_stream_t stream = streams[vbucket];
639 if (stream && stream->getOpaque() == opaque &&
640 stream->getState() == STREAM_PENDING) {
641 if (status == ENGINE_SUCCESS) {
642 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
643 vb->failovers->replaceFailoverLog(body, bodylen);
644 EventuallyPersistentStore* st = engine_.getEpStore();
645 st->scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
646 st->getVBuckets().getShard(vbucket)->getId());
647 }
648 LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %ld"
649 " %s with error code %d", logHeader(), vbucket, opaque,
650 status == ENGINE_SUCCESS ? "succeeded" : "failed", status);
651 stream->acceptStream(status, add_opaque);
652 } else {
653 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Trying to add stream, but "
654 "none exists (opaque: %d, add_opaque: %d)", logHeader(),
655 vbucket, opaque, add_opaque);
656 }
657 opaqueMap_.erase(opaque);
658 } else {
659 LOG(EXTENSION_LOG_WARNING, "%s No opaque found for add stream response "
660 "with opaque %ld", logHeader(), opaque);
661 }
662 }
663
isValidOpaque(uint32_t opaque, uint16_t vbucket)664 bool DcpConsumer::isValidOpaque(uint32_t opaque, uint16_t vbucket) {
665 LockHolder lh(streamMutex);
666 passive_stream_t stream = streams[vbucket];
667 return stream && stream->getOpaque() == opaque;
668 }
669
closeAllStreams()670 void DcpConsumer::closeAllStreams() {
671 int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
672 for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
673 passive_stream_t stream = streams[vbucket];
674 if (stream) {
675 stream->setDead(END_STREAM_DISCONNECTED);
676 }
677 }
678 }
679
handleNoop(struct dcp_message_producers* producers)680 ENGINE_ERROR_CODE DcpConsumer::handleNoop(struct dcp_message_producers* producers) {
681 if (enableNoop) {
682 ENGINE_ERROR_CODE ret;
683 uint32_t opaque = ++opaqueCounter;
684 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
685 ret = producers->control(getCookie(), opaque, "enable_noop", 11,
686 "true", 4);
687 ObjectRegistry::onSwitchThread(epe);
688 enableNoop = false;
689 return ret;
690 }
691
692 if (sendNoopInterval) {
693 ENGINE_ERROR_CODE ret;
694 uint32_t opaque = ++opaqueCounter;
695 char buf_size[10];
696 snprintf(buf_size, 10, "%u", noopInterval);
697 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
698 ret = producers->control(getCookie(), opaque, "set_noop_interval", 17,
699 buf_size, strlen(buf_size));
700 ObjectRegistry::onSwitchThread(epe);
701 sendNoopInterval = false;
702 return ret;
703 }
704
705 if ((ep_current_time() - lastNoopTime) > (noopInterval * 2)) {
706 LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because noop message has "
707 "no been received for %u seconds", logHeader(), (noopInterval * 2));
708 return ENGINE_DISCONNECT;
709 }
710
711 return ENGINE_FAILED;
712 }
713
handleFlowCtl(struct dcp_message_producers* producers)714 ENGINE_ERROR_CODE DcpConsumer::handleFlowCtl(struct dcp_message_producers* producers) {
715 if (flowControl.enabled) {
716 ENGINE_ERROR_CODE ret;
717 uint32_t ackable_bytes = flowControl.freedBytes;
718 if (flowControl.pendingControl) {
719 uint32_t opaque = ++opaqueCounter;
720 char buf_size[10];
721 snprintf(buf_size, 10, "%u", flowControl.bufferSize);
722 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
723 ret = producers->control(getCookie(), opaque,
724 "connection_buffer_size", 22, buf_size,
725 strlen(buf_size));
726 ObjectRegistry::onSwitchThread(epe);
727 flowControl.pendingControl = false;
728 return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
729 } else if (ackable_bytes > (flowControl.bufferSize * .2)) {
730 // Send a buffer ack when at least 20% of the buffer is drained
731 uint32_t opaque = ++opaqueCounter;
732 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
733 ret = producers->buffer_acknowledgement(getCookie(), opaque, 0,
734 ackable_bytes);
735 ObjectRegistry::onSwitchThread(epe);
736 flowControl.lastBufferAck = ep_current_time();
737 flowControl.ackedBytes.fetch_add(ackable_bytes);
738 flowControl.freedBytes.fetch_sub(ackable_bytes);
739 return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
740 } else if (ackable_bytes > 0 &&
741 (ep_current_time() - flowControl.lastBufferAck) > 5) {
742 // Ack at least every 5 seconds
743 uint32_t opaque = ++opaqueCounter;
744 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
745 ret = producers->buffer_acknowledgement(getCookie(), opaque, 0,
746 ackable_bytes);
747 ObjectRegistry::onSwitchThread(epe);
748 flowControl.lastBufferAck = ep_current_time();
749 flowControl.ackedBytes.fetch_add(ackable_bytes);
750 flowControl.freedBytes.fetch_sub(ackable_bytes);
751 return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
752 }
753 }
754 return ENGINE_FAILED;
755 }
756