1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "backfill.h"
21 #include "ep_engine.h"
22 #include "failover-table.h"
23 #include "dcp-producer.h"
24 #include "dcp-response.h"
25 #include "dcp-stream.h"
26
27 const uint32_t DcpProducer::defaultNoopInerval = 20;
28
insert(DcpResponse* response)29 void BufferLog::insert(DcpResponse* response) {
30 cb_assert(!isFull());
31 bytes_sent += response->getMessageSize();
32 }
33
free(uint32_t bytes_to_free)34 void BufferLog::free(uint32_t bytes_to_free) {
35 if (bytes_sent >= bytes_to_free) {
36 bytes_sent -= bytes_to_free;
37 } else {
38 bytes_sent = 0;
39 }
40 }
41
DcpProducer(EventuallyPersistentEngine &e, const void *cookie, const std::string &name, bool isNotifier)42 DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
43 const std::string &name, bool isNotifier)
44 : Producer(e, cookie, name), rejectResp(NULL),
45 notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(NULL),
46 itemsSent(0), totalBytesSent(0), ackedBytes(0) {
47 setSupportAck(true);
48 setReserved(true);
49 setPaused(true);
50
51 if (notifyOnly) {
52 setLogHeader("DCP (Notifier) " + getName() + " -");
53 } else {
54 setLogHeader("DCP (Producer) " + getName() + " -");
55 }
56
57 if (getName().find("replication") != std::string::npos) {
58 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
59 } else if (getName().find("xdcr") != std::string::npos) {
60 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
61 } else if (getName().find("views") != std::string::npos) {
62 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
63 }
64
65 // The consumer assigns opaques starting at 0 so lets have the producer
66 //start using opaques at 10M to prevent any opaque conflicts.
67 noopCtx.opaque = 10000000;
68 noopCtx.sendTime = ep_current_time();
69
70 // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
71 // noop interval to 20 seconds by default, but in post 3.0 releases we set
72 // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
73 // noop interval of the producer when connecting so in an all 3.0.1+ cluster
74 // this value will be overriden. In 3.0 however we do not set the noop
75 // interval so setting this value will make sure we don't disconnect on
76 // accident due to the producer and the consumer having a different noop
77 // interval.
78 noopCtx.noopInterval = defaultNoopInerval;
79 noopCtx.pendingRecv = false;
80 noopCtx.enabled = false;
81 }
82
~DcpProducer()83 DcpProducer::~DcpProducer() {
84 if (log) {
85 delete log;
86 }
87 }
88
streamRequest(uint32_t flags, uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint64_t vbucket_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno, uint64_t *rollback_seqno, dcp_add_failover_log callback)89 ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
90 uint32_t opaque,
91 uint16_t vbucket,
92 uint64_t start_seqno,
93 uint64_t end_seqno,
94 uint64_t vbucket_uuid,
95 uint64_t snap_start_seqno,
96 uint64_t snap_end_seqno,
97 uint64_t *rollback_seqno,
98 dcp_add_failover_log callback) {
99 if (doDisconnect()) {
100 return ENGINE_DISCONNECT;
101 }
102
103 LockHolder lh(queueLock);
104 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
105 if (!vb) {
106 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
107 "this vbucket doesn't exist", logHeader(), vbucket);
108 return ENGINE_NOT_MY_VBUCKET;
109 }
110
111 if (vb->checkpointManager.getOpenCheckpointId() == 0) {
112 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
113 "this vbucket is in backfill state", logHeader(), vbucket);
114 return ENGINE_TMPFAIL;
115 }
116
117 if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
118 end_seqno = vb->getHighSeqno();
119 }
120
121 if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
122 end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
123 }
124
125 if (!notifyOnly && start_seqno > end_seqno) {
126 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
127 "the start seqno (%llu) is larger than the end seqno (%llu)",
128 logHeader(), vbucket, start_seqno, end_seqno);
129 return ENGINE_ERANGE;
130 }
131
132 if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
133 start_seqno <= snap_end_seqno)) {
134 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
135 "the snap start seqno (%llu) <= start seqno (%llu) <= snap end "
136 "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno,
137 start_seqno, snap_end_seqno);
138 return ENGINE_ERANGE;
139 }
140
141 bool add_vb_conn_map = true;
142 std::map<uint16_t, stream_t>::iterator itr;
143 if ((itr = streams.find(vbucket)) != streams.end()) {
144 if (itr->second->getState() != STREAM_DEAD) {
145 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
146 " because a stream already exists for this vbucket",
147 logHeader(), vbucket);
148 return ENGINE_KEY_EEXISTS;
149 } else {
150 streams.erase(vbucket);
151 ready.remove(vbucket);
152 // Don't need to add an entry to vbucket-to-conns map
153 add_vb_conn_map = false;
154 }
155 }
156
157 // If we are a notify stream then we can't use the start_seqno supplied
158 // since if it is greater than the current high seqno then it will always
159 // trigger a rollback. As a result we should use the current high seqno for
160 // rollback purposes.
161 uint64_t notifySeqno = start_seqno;
162 if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
163 start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
164 }
165
166 if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
167 vbucket_uuid, snap_start_seqno,
168 snap_end_seqno, vb->getPurgeSeqno(),
169 rollback_seqno)) {
170 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
171 "because a rollback to seqno %llu is required (start seqno %llu, "
172 "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)",
173 logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
174 snap_start_seqno, snap_end_seqno);
175 return ENGINE_ROLLBACK;
176 }
177
178 ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
179 if (rv != ENGINE_SUCCESS) {
180 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
181 "stream request due to error %d", logHeader(), vbucket, rv);
182 return rv;
183 }
184
185 if (notifyOnly) {
186 streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags,
187 opaque, vbucket, notifySeqno,
188 end_seqno, vbucket_uuid,
189 snap_start_seqno, snap_end_seqno);
190 } else {
191 streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
192 opaque, vbucket, start_seqno,
193 end_seqno, vbucket_uuid,
194 snap_start_seqno, snap_end_seqno);
195 static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
196 }
197
198 ready.push_back(vbucket);
199 lh.unlock();
200 if (add_vb_conn_map) {
201 connection_t conn(this);
202 engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
203 }
204
205 return rv;
206 }
207
getFailoverLog(uint32_t opaque, uint16_t vbucket, dcp_add_failover_log callback)208 ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
209 dcp_add_failover_log callback) {
210 (void) opaque;
211 if (doDisconnect()) {
212 return ENGINE_DISCONNECT;
213 }
214
215 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
216 if (!vb) {
217 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
218 "because this vbucket doesn't exist", logHeader(), vbucket);
219 return ENGINE_NOT_MY_VBUCKET;
220 }
221
222 return vb->failovers->addFailoverLog(getCookie(), callback);
223 }
224
step(struct dcp_message_producers* producers)225 ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
226 setLastWalkTime();
227
228 if (doDisconnect()) {
229 return ENGINE_DISCONNECT;
230 }
231
232 ENGINE_ERROR_CODE ret;
233 if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
234 return ret;
235 }
236
237 DcpResponse *resp;
238 if (rejectResp) {
239 resp = rejectResp;
240 rejectResp = NULL;
241 } else {
242 resp = getNextItem();
243 if (!resp) {
244 return ENGINE_SUCCESS;
245 }
246 }
247
248 ret = ENGINE_SUCCESS;
249
250 Item* itmCpy = NULL;
251 if (resp->getEvent() == DCP_MUTATION) {
252 itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
253 }
254
255 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
256 true);
257 switch (resp->getEvent()) {
258 case DCP_STREAM_END:
259 {
260 StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
261 ret = producers->stream_end(getCookie(), se->getOpaque(),
262 se->getVbucket(), se->getFlags());
263 break;
264 }
265 case DCP_MUTATION:
266 {
267 MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
268 ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
269 m->getVBucket(), m->getBySeqno(),
270 m->getRevSeqno(), 0, NULL, 0,
271 m->getItem()->getNRUValue());
272 break;
273 }
274 case DCP_DELETION:
275 {
276 MutationResponse *m = static_cast<MutationResponse*>(resp);
277 ret = producers->deletion(getCookie(), m->getOpaque(),
278 m->getItem()->getKey().c_str(),
279 m->getItem()->getNKey(),
280 m->getItem()->getCas(),
281 m->getVBucket(), m->getBySeqno(),
282 m->getRevSeqno(), NULL, 0);
283 break;
284 }
285 case DCP_SNAPSHOT_MARKER:
286 {
287 SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
288 ret = producers->marker(getCookie(), s->getOpaque(),
289 s->getVBucket(),
290 s->getStartSeqno(),
291 s->getEndSeqno(),
292 s->getFlags());
293 break;
294 }
295 case DCP_SET_VBUCKET:
296 {
297 SetVBucketState *s = static_cast<SetVBucketState*>(resp);
298 ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
299 s->getVBucket(), s->getState());
300 break;
301 }
302 default:
303 {
304 LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
305 "disconnecting", logHeader(), resp->getEvent());
306 ret = ENGINE_DISCONNECT;
307 break;
308 }
309 }
310
311 ObjectRegistry::onSwitchThread(epe);
312 if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
313 delete itmCpy;
314 }
315
316 if (ret == ENGINE_E2BIG) {
317 rejectResp = resp;
318 } else {
319 delete resp;
320 }
321
322 lastSendTime = ep_current_time();
323 return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
324 }
325
bufferAcknowledgement(uint32_t opaque, uint16_t vbucket, uint32_t buffer_bytes)326 ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
327 uint16_t vbucket,
328 uint32_t buffer_bytes) {
329 LockHolder lh(queueLock);
330 if (log) {
331 bool wasFull = log->isFull();
332
333 ackedBytes.fetch_add(buffer_bytes);
334 log->free(buffer_bytes);
335 lh.unlock();
336
337 if (wasFull) {
338 engine_.getDcpConnMap().notifyPausedConnection(this, true);
339 }
340 }
341
342 return ENGINE_SUCCESS;
343 }
344
control(uint32_t opaque, const void* key, uint16_t nkey, const void* value, uint32_t nvalue)345 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
346 uint16_t nkey, const void* value,
347 uint32_t nvalue) {
348 LockHolder lh(queueLock);
349 const char* param = static_cast<const char*>(key);
350 std::string keyStr(static_cast<const char*>(key), nkey);
351 std::string valueStr(static_cast<const char*>(value), nvalue);
352
353 if (strncmp(param, "connection_buffer_size", nkey) == 0) {
354 uint32_t size;
355 if (parseUint32(valueStr.c_str(), &size)) {
356 if (!log) {
357 log = new BufferLog(size);
358 } else if (log->getBufferSize() != size) {
359 log->setBufferSize(size);
360 }
361 return ENGINE_SUCCESS;
362 }
363 } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
364 LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
365 "not supported by this engine", logHeader());
366 return ENGINE_ENOTSUP;
367 } else if (strncmp(param, "enable_noop", nkey) == 0) {
368 if (valueStr.compare("true") == 0) {
369 noopCtx.enabled = true;
370 } else {
371 noopCtx.enabled = false;
372 }
373 return ENGINE_SUCCESS;
374 } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
375 if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
376 return ENGINE_SUCCESS;
377 }
378 }
379
380 LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
381 logHeader(), valueStr.c_str(), keyStr.c_str());
382
383 return ENGINE_EINVAL;
384 }
385
handleResponse( protocol_binary_response_header *resp)386 ENGINE_ERROR_CODE DcpProducer::handleResponse(
387 protocol_binary_response_header *resp) {
388 if (doDisconnect()) {
389 return ENGINE_DISCONNECT;
390 }
391
392 uint8_t opcode = resp->response.opcode;
393 if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
394 opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
395 protocol_binary_response_dcp_stream_req* pkt =
396 reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
397 uint32_t opaque = pkt->message.header.response.opaque;
398
399 LockHolder lh(queueLock);
400 stream_t active_stream;
401 std::map<uint16_t, stream_t>::iterator itr;
402 for (itr = streams.begin() ; itr != streams.end(); ++itr) {
403 active_stream = itr->second;
404 Stream *str = active_stream.get();
405 if (str && str->getType() == STREAM_ACTIVE) {
406 ActiveStream* as = static_cast<ActiveStream*>(str);
407 if (as && opaque == itr->second->getOpaque()) {
408 break;
409 }
410 }
411 }
412
413 if (itr != streams.end()) {
414 lh.unlock();
415 ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
416 if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
417 as->setVBucketStateAckRecieved();
418 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
419 as->snapshotMarkerAckReceived();
420 }
421 }
422
423 return ENGINE_SUCCESS;
424 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
425 opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
426 opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
427 opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
428 // TODO: When nacking is implemented we need to handle these responses
429 return ENGINE_SUCCESS;
430 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
431 if (noopCtx.opaque == resp->response.opaque) {
432 noopCtx.pendingRecv = false;
433 return ENGINE_SUCCESS;
434 }
435 }
436
437 LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
438 "disconnecting", logHeader(), opcode);
439
440 return ENGINE_DISCONNECT;
441 }
442
closeStream(uint32_t opaque, uint16_t vbucket)443 ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
444 if (doDisconnect()) {
445 return ENGINE_DISCONNECT;
446 }
447
448 LockHolder lh(queueLock);
449 std::map<uint16_t, stream_t>::iterator itr;
450 if ((itr = streams.find(vbucket)) == streams.end()) {
451 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
452 "stream exists for this vbucket", logHeader(), vbucket);
453 return ENGINE_KEY_ENOENT;
454 } else if (!itr->second->isActive()) {
455 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
456 "stream is already marked as dead", logHeader(), vbucket);
457 streams.erase(vbucket);
458 ready.remove(vbucket);
459 lh.unlock();
460 connection_t conn(this);
461 engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
462 return ENGINE_KEY_ENOENT;
463 }
464
465 stream_t stream = itr->second;
466 streams.erase(vbucket);
467 ready.remove(vbucket);
468 lh.unlock();
469
470 stream->setDead(END_STREAM_CLOSED);
471 connection_t conn(this);
472 engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
473 return ENGINE_SUCCESS;
474 }
475
addStats(ADD_STAT add_stat, const void *c)476 void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
477 Producer::addStats(add_stat, c);
478
479 LockHolder lh(queueLock);
480
481 addStat("items_sent", getItemsSent(), add_stat, c);
482 addStat("items_remaining", getItemsRemaining_UNLOCKED(), add_stat, c);
483 addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
484 addStat("last_sent_time", lastSendTime, add_stat, c);
485 addStat("noop_enabled", noopCtx.enabled, add_stat, c);
486 addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
487
488 if (log) {
489 addStat("max_buffer_bytes", log->getBufferSize(), add_stat, c);
490 addStat("unacked_bytes", log->getBytesSent(), add_stat, c);
491 addStat("total_acked_bytes", ackedBytes, add_stat, c);
492 addStat("flow_control", "enabled", add_stat, c);
493 } else {
494 addStat("flow_control", "disabled", add_stat, c);
495 }
496
497 std::map<uint16_t, stream_t>::iterator itr;
498 for (itr = streams.begin(); itr != streams.end(); ++itr) {
499 itr->second->addStats(add_stat, c);
500 }
501 }
502
addTakeoverStats(ADD_STAT add_stat, const void* c, uint16_t vbid)503 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
504 uint16_t vbid) {
505 LockHolder lh(queueLock);
506 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
507 if (itr != streams.end()) {
508 Stream *s = itr->second.get();
509 if (s && s->getType() == STREAM_ACTIVE) {
510 ActiveStream* as = static_cast<ActiveStream*>(s);
511 if (as) {
512 as->addTakeoverStats(add_stat, c);
513 }
514 }
515 }
516 }
517
aggregateQueueStats(ConnCounter* aggregator)518 void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
519 LockHolder lh(queueLock);
520 if (!aggregator) {
521 LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
522 " is NULL!!!", logHeader());
523 return;
524 }
525 aggregator->conn_queueDrain += itemsSent;
526 aggregator->conn_totalBytes += totalBytesSent;
527 aggregator->conn_queueRemaining += getItemsRemaining_UNLOCKED();
528 aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
529 }
530
notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno)531 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
532 LockHolder lh(queueLock);
533 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
534 if (itr != streams.end() && itr->second->isActive()) {
535 stream_t stream = itr->second;
536 lh.unlock();
537 stream->notifySeqnoAvailable(seqno);
538 }
539 }
540
vbucketStateChanged(uint16_t vbucket, vbucket_state_t state)541 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
542 LockHolder lh(queueLock);
543 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
544 if (itr != streams.end()) {
545 stream_t stream = itr->second;
546 lh.unlock();
547 stream->setDead(END_STREAM_STATE);
548 }
549 }
550
closeAllStreams()551 void DcpProducer::closeAllStreams() {
552 LockHolder lh(queueLock);
553 std::list<uint16_t> vblist;
554 while (!streams.empty()) {
555 std::map<uint16_t, stream_t>::iterator itr = streams.begin();
556 uint16_t vbid = itr->first;
557 itr->second->setDead(END_STREAM_DISCONNECTED);
558 streams.erase(vbid);
559 ready.remove(vbid);
560 vblist.push_back(vbid);
561 }
562 lh.unlock();
563
564 connection_t conn(this);
565 std::list<uint16_t>::iterator it = vblist.begin();
566 for (; it != vblist.end(); ++it) {
567 engine_.getDcpConnMap().removeVBConnByVBId(conn, *it);
568 }
569 }
570
getType() const571 const char* DcpProducer::getType() const {
572 if (notifyOnly) {
573 return "notifier";
574 } else {
575 return "producer";
576 }
577 }
578
getNextItem()579 DcpResponse* DcpProducer::getNextItem() {
580 LockHolder lh(queueLock);
581
582 setPaused(false);
583 while (!ready.empty()) {
584 if (log && log->isFull()) {
585 setPaused(true);
586 return NULL;
587 }
588
589 uint16_t vbucket = ready.front();
590 ready.pop_front();
591
592 if (streams.find(vbucket) == streams.end()) {
593 continue;
594 }
595 DcpResponse* op = streams[vbucket]->next();
596 if (!op) {
597 continue;
598 }
599
600 switch (op->getEvent()) {
601 case DCP_SNAPSHOT_MARKER:
602 case DCP_MUTATION:
603 case DCP_DELETION:
604 case DCP_EXPIRATION:
605 case DCP_STREAM_END:
606 case DCP_SET_VBUCKET:
607 break;
608 default:
609 LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
610 " an unexpected event %d", logHeader(), op->getEvent());
611 abort();
612 }
613
614 if (log) {
615 log->insert(op);
616 }
617 ready.push_back(vbucket);
618
619 if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
620 op->getEvent() == DCP_EXPIRATION) {
621 itemsSent++;
622 }
623
624 totalBytesSent = totalBytesSent + op->getMessageSize();
625
626 return op;
627 }
628
629 setPaused(true);
630 return NULL;
631 }
632
setDisconnect(bool disconnect)633 void DcpProducer::setDisconnect(bool disconnect) {
634 ConnHandler::setDisconnect(disconnect);
635
636 if (disconnect) {
637 LockHolder lh(queueLock);
638 std::map<uint16_t, stream_t>::iterator itr = streams.begin();
639 for (; itr != streams.end(); ++itr) {
640 itr->second->setDead(END_STREAM_DISCONNECTED);
641 }
642 }
643 }
644
notifyStreamReady(uint16_t vbucket, bool schedule)645 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
646 LockHolder lh(queueLock);
647
648 std::list<uint16_t>::iterator iter =
649 std::find(ready.begin(), ready.end(), vbucket);
650 if (iter != ready.end()) {
651 return;
652 }
653
654 ready.push_back(vbucket);
655 lh.unlock();
656
657 if (!log || (log && !log->isFull())) {
658 engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
659 }
660 }
661
maybeSendNoop(struct dcp_message_producers* producers)662 ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
663 if (noopCtx.enabled) {
664 size_t sinceTime = ep_current_time() - noopCtx.sendTime;
665 if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
666 LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection"
667 " appears to be dead", logHeader());
668 return ENGINE_DISCONNECT;
669 } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
670 ENGINE_ERROR_CODE ret;
671 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
672 ret = producers->noop(getCookie(), ++noopCtx.opaque);
673 ObjectRegistry::onSwitchThread(epe);
674
675 if (ret == ENGINE_SUCCESS) {
676 ret = ENGINE_WANT_MORE;
677 }
678 noopCtx.pendingRecv = true;
679 noopCtx.sendTime = ep_current_time();
680 lastSendTime = ep_current_time();
681 return ret;
682 }
683 }
684 return ENGINE_FAILED;
685 }
686
isTimeForNoop()687 bool DcpProducer::isTimeForNoop() {
688 // Not Implemented
689 return false;
690 }
691
setTimeForNoop()692 void DcpProducer::setTimeForNoop() {
693 // Not Implemented
694 }
695
clearQueues()696 void DcpProducer::clearQueues() {
697 LockHolder lh(queueLock);
698 std::map<uint16_t, stream_t>::iterator itr = streams.begin();
699 for (; itr != streams.end(); ++itr) {
700 itr->second->clear();
701 }
702 }
703
appendQueue(std::list<queued_item> *q)704 void DcpProducer::appendQueue(std::list<queued_item> *q) {
705 (void) q;
706 abort(); // Not Implemented
707 }
708
getBackfillQueueSize()709 size_t DcpProducer::getBackfillQueueSize() {
710 return totalBackfillBacklogs;
711 }
712
getItemsSent()713 size_t DcpProducer::getItemsSent() {
714 return itemsSent;
715 }
716
getItemsRemaining_UNLOCKED()717 size_t DcpProducer::getItemsRemaining_UNLOCKED() {
718 size_t remainingSize = 0;
719
720 std::map<uint16_t, stream_t>::iterator itr = streams.begin();
721 for (; itr != streams.end(); ++itr) {
722 Stream *s = (itr->second).get();
723
724 if (s->getType() == STREAM_ACTIVE) {
725 ActiveStream *as = static_cast<ActiveStream *>(s);
726 remainingSize += as->getItemsRemaining();
727 }
728 }
729
730 return remainingSize;
731 }
732
getTotalBytes()733 size_t DcpProducer::getTotalBytes() {
734 return totalBytesSent;
735 }
736
getVBList()737 std::list<uint16_t> DcpProducer::getVBList() {
738 LockHolder lh(queueLock);
739 std::list<uint16_t> vblist;
740 std::map<uint16_t, stream_t>::iterator itr = streams.begin();
741 for (; itr != streams.end(); ++itr) {
742 vblist.push_back(itr->first);
743 }
744 return vblist;
745 }
746
windowIsFull()747 bool DcpProducer::windowIsFull() {
748 abort(); // Not Implemented
749 }
750
flush()751 void DcpProducer::flush() {
752 abort(); // Not Implemented
753 }
754