1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2013 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "config.h" 19 20#include "backfill.h" 21#include "ep_engine.h" 22#include "failover-table.h" 23#include "dcp-producer.h" 24#include "dcp-response.h" 25#include "dcp-stream.h" 26 27const uint32_t DcpProducer::defaultNoopInerval = 20; 28 29void BufferLog::insert(DcpResponse* response) { 30 cb_assert(!isFull()); 31 bytes_sent += response->getMessageSize(); 32} 33 34void BufferLog::free(uint32_t bytes_to_free) { 35 if (bytes_sent >= bytes_to_free) { 36 bytes_sent -= bytes_to_free; 37 } else { 38 bytes_sent = 0; 39 } 40} 41 42DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie, 43 const std::string &name, bool isNotifier) 44 : Producer(e, cookie, name), rejectResp(NULL), 45 notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(NULL), 46 itemsSent(0), totalBytesSent(0), ackedBytes(0) { 47 setSupportAck(true); 48 setReserved(true); 49 setPaused(true); 50 51 if (notifyOnly) { 52 setLogHeader("DCP (Notifier) " + getName() + " -"); 53 } else { 54 setLogHeader("DCP (Producer) " + getName() + " -"); 55 } 56 57 if (getName().find("replication") != std::string::npos) { 58 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH); 59 } else if (getName().find("xdcr") != std::string::npos) { 60 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED); 61 } else if (getName().find("views") != std::string::npos) { 62 engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED); 63 } 64 65 // The consumer assigns opaques starting at 0 so lets have the producer 66 //start using opaques at 10M to prevent any opaque conflicts. 67 noopCtx.opaque = 10000000; 68 noopCtx.sendTime = ep_current_time(); 69 70 // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the 71 // noop interval to 20 seconds by default, but in post 3.0 releases we set 72 // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the 73 // noop interval of the producer when connecting so in an all 3.0.1+ cluster 74 // this value will be overriden. In 3.0 however we do not set the noop 75 // interval so setting this value will make sure we don't disconnect on 76 // accident due to the producer and the consumer having a different noop 77 // interval. 78 noopCtx.noopInterval = defaultNoopInerval; 79 noopCtx.pendingRecv = false; 80 noopCtx.enabled = false; 81} 82 83DcpProducer::~DcpProducer() { 84 if (log) { 85 delete log; 86 } 87} 88 89ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags, 90 uint32_t opaque, 91 uint16_t vbucket, 92 uint64_t start_seqno, 93 uint64_t end_seqno, 94 uint64_t vbucket_uuid, 95 uint64_t snap_start_seqno, 96 uint64_t snap_end_seqno, 97 uint64_t *rollback_seqno, 98 dcp_add_failover_log callback) { 99 if (doDisconnect()) { 100 return ENGINE_DISCONNECT; 101 } 102 103 LockHolder lh(queueLock); 104 RCPtr<VBucket> vb = engine_.getVBucket(vbucket); 105 if (!vb) { 106 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because " 107 "this vbucket doesn't exist", logHeader(), vbucket); 108 return ENGINE_NOT_MY_VBUCKET; 109 } 110 111 if (vb->checkpointManager.getOpenCheckpointId() == 0) { 112 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because " 113 "this vbucket is in backfill state", logHeader(), vbucket); 114 return ENGINE_TMPFAIL; 115 } 116 117 if (flags & DCP_ADD_STREAM_FLAG_LATEST) { 118 end_seqno = vb->getHighSeqno(); 119 } 120 121 if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) { 122 end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket); 123 } 124 125 if (!notifyOnly && start_seqno > end_seqno) { 126 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because " 127 "the start seqno (%llu) is larger than the end seqno (%llu)", 128 logHeader(), vbucket, start_seqno, end_seqno); 129 return ENGINE_ERANGE; 130 } 131 132 if (!notifyOnly && !(snap_start_seqno <= start_seqno && 133 start_seqno <= snap_end_seqno)) { 134 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because " 135 "the snap start seqno (%llu) <= start seqno (%llu) <= snap end " 136 "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno, 137 start_seqno, snap_end_seqno); 138 return ENGINE_ERANGE; 139 } 140 141 bool add_vb_conn_map = true; 142 std::map<uint16_t, stream_t>::iterator itr; 143 if ((itr = streams.find(vbucket)) != streams.end()) { 144 if (itr->second->getState() != STREAM_DEAD) { 145 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed" 146 " because a stream already exists for this vbucket", 147 logHeader(), vbucket); 148 return ENGINE_KEY_EEXISTS; 149 } else { 150 streams.erase(vbucket); 151 ready.remove(vbucket); 152 // Don't need to add an entry to vbucket-to-conns map 153 add_vb_conn_map = false; 154 } 155 } 156 157 // If we are a notify stream then we can't use the start_seqno supplied 158 // since if it is greater than the current high seqno then it will always 159 // trigger a rollback. As a result we should use the current high seqno for 160 // rollback purposes. 161 uint64_t notifySeqno = start_seqno; 162 if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) { 163 start_seqno = static_cast<uint64_t>(vb->getHighSeqno()); 164 } 165 166 if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(), 167 vbucket_uuid, snap_start_seqno, 168 snap_end_seqno, vb->getPurgeSeqno(), 169 rollback_seqno)) { 170 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed " 171 "because a rollback to seqno %llu is required (start seqno %llu, " 172 "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)", 173 logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid, 174 snap_start_seqno, snap_end_seqno); 175 return ENGINE_ROLLBACK; 176 } 177 178 ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback); 179 if (rv != ENGINE_SUCCESS) { 180 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to " 181 "stream request due to error %d", logHeader(), vbucket, rv); 182 return rv; 183 } 184 185 if (notifyOnly) { 186 streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags, 187 opaque, vbucket, notifySeqno, 188 end_seqno, vbucket_uuid, 189 snap_start_seqno, snap_end_seqno); 190 } else { 191 streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags, 192 opaque, vbucket, start_seqno, 193 end_seqno, vbucket_uuid, 194 snap_start_seqno, snap_end_seqno); 195 static_cast<ActiveStream*>(streams[vbucket].get())->setActive(); 196 } 197 198 ready.push_back(vbucket); 199 lh.unlock(); 200 if (add_vb_conn_map) { 201 connection_t conn(this); 202 engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket); 203 } 204 205 return rv; 206} 207 208ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket, 209 dcp_add_failover_log callback) { 210 (void) opaque; 211 if (doDisconnect()) { 212 return ENGINE_DISCONNECT; 213 } 214 215 RCPtr<VBucket> vb = engine_.getVBucket(vbucket); 216 if (!vb) { 217 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed " 218 "because this vbucket doesn't exist", logHeader(), vbucket); 219 return ENGINE_NOT_MY_VBUCKET; 220 } 221 222 return vb->failovers->addFailoverLog(getCookie(), callback); 223} 224 225ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) { 226 setLastWalkTime(); 227 228 if (doDisconnect()) { 229 return ENGINE_DISCONNECT; 230 } 231 232 ENGINE_ERROR_CODE ret; 233 if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) { 234 return ret; 235 } 236 237 DcpResponse *resp; 238 if (rejectResp) { 239 resp = rejectResp; 240 rejectResp = NULL; 241 } else { 242 resp = getNextItem(); 243 if (!resp) { 244 return ENGINE_SUCCESS; 245 } 246 } 247 248 ret = ENGINE_SUCCESS; 249 250 Item* itmCpy = NULL; 251 if (resp->getEvent() == DCP_MUTATION) { 252 itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy(); 253 } 254 255 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, 256 true); 257 switch (resp->getEvent()) { 258 case DCP_STREAM_END: 259 { 260 StreamEndResponse *se = static_cast<StreamEndResponse*>(resp); 261 ret = producers->stream_end(getCookie(), se->getOpaque(), 262 se->getVbucket(), se->getFlags()); 263 break; 264 } 265 case DCP_MUTATION: 266 { 267 MutationResponse *m = dynamic_cast<MutationResponse*> (resp); 268 ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy, 269 m->getVBucket(), m->getBySeqno(), 270 m->getRevSeqno(), 0, NULL, 0, 271 m->getItem()->getNRUValue()); 272 break; 273 } 274 case DCP_DELETION: 275 { 276 MutationResponse *m = static_cast<MutationResponse*>(resp); 277 ret = producers->deletion(getCookie(), m->getOpaque(), 278 m->getItem()->getKey().c_str(), 279 m->getItem()->getNKey(), 280 m->getItem()->getCas(), 281 m->getVBucket(), m->getBySeqno(), 282 m->getRevSeqno(), NULL, 0); 283 break; 284 } 285 case DCP_SNAPSHOT_MARKER: 286 { 287 SnapshotMarker *s = static_cast<SnapshotMarker*>(resp); 288 ret = producers->marker(getCookie(), s->getOpaque(), 289 s->getVBucket(), 290 s->getStartSeqno(), 291 s->getEndSeqno(), 292 s->getFlags()); 293 break; 294 } 295 case DCP_SET_VBUCKET: 296 { 297 SetVBucketState *s = static_cast<SetVBucketState*>(resp); 298 ret = producers->set_vbucket_state(getCookie(), s->getOpaque(), 299 s->getVBucket(), s->getState()); 300 break; 301 } 302 default: 303 { 304 LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), " 305 "disconnecting", logHeader(), resp->getEvent()); 306 ret = ENGINE_DISCONNECT; 307 break; 308 } 309 } 310 311 ObjectRegistry::onSwitchThread(epe); 312 if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) { 313 delete itmCpy; 314 } 315 316 if (ret == ENGINE_E2BIG) { 317 rejectResp = resp; 318 } else { 319 delete resp; 320 } 321 322 lastSendTime = ep_current_time(); 323 return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret; 324} 325 326ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque, 327 uint16_t vbucket, 328 uint32_t buffer_bytes) { 329 LockHolder lh(queueLock); 330 if (log) { 331 bool wasFull = log->isFull(); 332 333 ackedBytes.fetch_add(buffer_bytes); 334 log->free(buffer_bytes); 335 lh.unlock(); 336 337 if (wasFull) { 338 engine_.getDcpConnMap().notifyPausedConnection(this, true); 339 } 340 } 341 342 return ENGINE_SUCCESS; 343} 344 345ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key, 346 uint16_t nkey, const void* value, 347 uint32_t nvalue) { 348 LockHolder lh(queueLock); 349 const char* param = static_cast<const char*>(key); 350 std::string keyStr(static_cast<const char*>(key), nkey); 351 std::string valueStr(static_cast<const char*>(value), nvalue); 352 353 if (strncmp(param, "connection_buffer_size", nkey) == 0) { 354 uint32_t size; 355 if (parseUint32(valueStr.c_str(), &size)) { 356 if (!log) { 357 log = new BufferLog(size); 358 } else if (log->getBufferSize() != size) { 359 log->setBufferSize(size); 360 } 361 return ENGINE_SUCCESS; 362 } 363 } else if (strncmp(param, "stream_buffer_size", nkey) == 0) { 364 LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is" 365 "not supported by this engine", logHeader()); 366 return ENGINE_ENOTSUP; 367 } else if (strncmp(param, "enable_noop", nkey) == 0) { 368 if (valueStr.compare("true") == 0) { 369 noopCtx.enabled = true; 370 } else { 371 noopCtx.enabled = false; 372 } 373 return ENGINE_SUCCESS; 374 } else if (strncmp(param, "set_noop_interval", nkey) == 0) { 375 if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) { 376 return ENGINE_SUCCESS; 377 } 378 } 379 380 LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s", 381 logHeader(), valueStr.c_str(), keyStr.c_str()); 382 383 return ENGINE_EINVAL; 384} 385 386ENGINE_ERROR_CODE DcpProducer::handleResponse( 387 protocol_binary_response_header *resp) { 388 if (doDisconnect()) { 389 return ENGINE_DISCONNECT; 390 } 391 392 uint8_t opcode = resp->response.opcode; 393 if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE || 394 opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) { 395 protocol_binary_response_dcp_stream_req* pkt = 396 reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp); 397 uint32_t opaque = pkt->message.header.response.opaque; 398 399 LockHolder lh(queueLock); 400 stream_t active_stream; 401 std::map<uint16_t, stream_t>::iterator itr; 402 for (itr = streams.begin() ; itr != streams.end(); ++itr) { 403 active_stream = itr->second; 404 Stream *str = active_stream.get(); 405 if (str && str->getType() == STREAM_ACTIVE) { 406 ActiveStream* as = static_cast<ActiveStream*>(str); 407 if (as && opaque == itr->second->getOpaque()) { 408 break; 409 } 410 } 411 } 412 413 if (itr != streams.end()) { 414 lh.unlock(); 415 ActiveStream *as = static_cast<ActiveStream*>(active_stream.get()); 416 if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) { 417 as->setVBucketStateAckRecieved(); 418 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) { 419 as->snapshotMarkerAckReceived(); 420 } 421 } 422 423 return ENGINE_SUCCESS; 424 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION || 425 opcode == PROTOCOL_BINARY_CMD_DCP_DELETION || 426 opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION || 427 opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) { 428 // TODO: When nacking is implemented we need to handle these responses 429 return ENGINE_SUCCESS; 430 } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) { 431 if (noopCtx.opaque == resp->response.opaque) { 432 noopCtx.pendingRecv = false; 433 return ENGINE_SUCCESS; 434 } 435 } 436 437 LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, " 438 "disconnecting", logHeader(), opcode); 439 440 return ENGINE_DISCONNECT; 441} 442 443ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) { 444 if (doDisconnect()) { 445 return ENGINE_DISCONNECT; 446 } 447 448 LockHolder lh(queueLock); 449 std::map<uint16_t, stream_t>::iterator itr; 450 if ((itr = streams.find(vbucket)) == streams.end()) { 451 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no " 452 "stream exists for this vbucket", logHeader(), vbucket); 453 return ENGINE_KEY_ENOENT; 454 } else if (!itr->second->isActive()) { 455 LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because " 456 "stream is already marked as dead", logHeader(), vbucket); 457 streams.erase(vbucket); 458 ready.remove(vbucket); 459 lh.unlock(); 460 connection_t conn(this); 461 engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket); 462 return ENGINE_KEY_ENOENT; 463 } 464 465 stream_t stream = itr->second; 466 streams.erase(vbucket); 467 ready.remove(vbucket); 468 lh.unlock(); 469 470 stream->setDead(END_STREAM_CLOSED); 471 connection_t conn(this); 472 engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket); 473 return ENGINE_SUCCESS; 474} 475 476void DcpProducer::addStats(ADD_STAT add_stat, const void *c) { 477 Producer::addStats(add_stat, c); 478 479 LockHolder lh(queueLock); 480 481 addStat("items_sent", getItemsSent(), add_stat, c); 482 addStat("items_remaining", getItemsRemaining_UNLOCKED(), add_stat, c); 483 addStat("total_bytes_sent", getTotalBytes(), add_stat, c); 484 addStat("last_sent_time", lastSendTime, add_stat, c); 485 addStat("noop_enabled", noopCtx.enabled, add_stat, c); 486 addStat("noop_wait", noopCtx.pendingRecv, add_stat, c); 487 488 if (log) { 489 addStat("max_buffer_bytes", log->getBufferSize(), add_stat, c); 490 addStat("unacked_bytes", log->getBytesSent(), add_stat, c); 491 addStat("total_acked_bytes", ackedBytes, add_stat, c); 492 addStat("flow_control", "enabled", add_stat, c); 493 } else { 494 addStat("flow_control", "disabled", add_stat, c); 495 } 496 497 std::map<uint16_t, stream_t>::iterator itr; 498 for (itr = streams.begin(); itr != streams.end(); ++itr) { 499 itr->second->addStats(add_stat, c); 500 } 501} 502 503void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c, 504 uint16_t vbid) { 505 LockHolder lh(queueLock); 506 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid); 507 if (itr != streams.end()) { 508 Stream *s = itr->second.get(); 509 if (s && s->getType() == STREAM_ACTIVE) { 510 ActiveStream* as = static_cast<ActiveStream*>(s); 511 if (as) { 512 as->addTakeoverStats(add_stat, c); 513 } 514 } 515 } 516} 517 518void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) { 519 LockHolder lh(queueLock); 520 if (!aggregator) { 521 LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator" 522 " is NULL!!!", logHeader()); 523 return; 524 } 525 aggregator->conn_queueDrain += itemsSent; 526 aggregator->conn_totalBytes += totalBytesSent; 527 aggregator->conn_queueRemaining += getItemsRemaining_UNLOCKED(); 528 aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs; 529} 530 531void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) { 532 LockHolder lh(queueLock); 533 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket); 534 if (itr != streams.end() && itr->second->isActive()) { 535 stream_t stream = itr->second; 536 lh.unlock(); 537 stream->notifySeqnoAvailable(seqno); 538 } 539} 540 541void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) { 542 LockHolder lh(queueLock); 543 std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket); 544 if (itr != streams.end()) { 545 stream_t stream = itr->second; 546 lh.unlock(); 547 stream->setDead(END_STREAM_STATE); 548 } 549} 550 551void DcpProducer::closeAllStreams() { 552 LockHolder lh(queueLock); 553 std::list<uint16_t> vblist; 554 while (!streams.empty()) { 555 std::map<uint16_t, stream_t>::iterator itr = streams.begin(); 556 uint16_t vbid = itr->first; 557 itr->second->setDead(END_STREAM_DISCONNECTED); 558 streams.erase(vbid); 559 ready.remove(vbid); 560 vblist.push_back(vbid); 561 } 562 lh.unlock(); 563 564 connection_t conn(this); 565 std::list<uint16_t>::iterator it = vblist.begin(); 566 for (; it != vblist.end(); ++it) { 567 engine_.getDcpConnMap().removeVBConnByVBId(conn, *it); 568 } 569} 570 571const char* DcpProducer::getType() const { 572 if (notifyOnly) { 573 return "notifier"; 574 } else { 575 return "producer"; 576 } 577} 578 579DcpResponse* DcpProducer::getNextItem() { 580 LockHolder lh(queueLock); 581 582 setPaused(false); 583 while (!ready.empty()) { 584 if (log && log->isFull()) { 585 setPaused(true); 586 return NULL; 587 } 588 589 uint16_t vbucket = ready.front(); 590 ready.pop_front(); 591 592 if (streams.find(vbucket) == streams.end()) { 593 continue; 594 } 595 DcpResponse* op = streams[vbucket]->next(); 596 if (!op) { 597 continue; 598 } 599 600 switch (op->getEvent()) { 601 case DCP_SNAPSHOT_MARKER: 602 case DCP_MUTATION: 603 case DCP_DELETION: 604 case DCP_EXPIRATION: 605 case DCP_STREAM_END: 606 case DCP_SET_VBUCKET: 607 break; 608 default: 609 LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write" 610 " an unexpected event %d", logHeader(), op->getEvent()); 611 abort(); 612 } 613 614 if (log) { 615 log->insert(op); 616 } 617 ready.push_back(vbucket); 618 619 if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION || 620 op->getEvent() == DCP_EXPIRATION) { 621 itemsSent++; 622 } 623 624 totalBytesSent = totalBytesSent + op->getMessageSize(); 625 626 return op; 627 } 628 629 setPaused(true); 630 return NULL; 631} 632 633void DcpProducer::setDisconnect(bool disconnect) { 634 ConnHandler::setDisconnect(disconnect); 635 636 if (disconnect) { 637 LockHolder lh(queueLock); 638 std::map<uint16_t, stream_t>::iterator itr = streams.begin(); 639 for (; itr != streams.end(); ++itr) { 640 itr->second->setDead(END_STREAM_DISCONNECTED); 641 } 642 } 643} 644 645void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) { 646 LockHolder lh(queueLock); 647 648 std::list<uint16_t>::iterator iter = 649 std::find(ready.begin(), ready.end(), vbucket); 650 if (iter != ready.end()) { 651 return; 652 } 653 654 ready.push_back(vbucket); 655 lh.unlock(); 656 657 if (!log || (log && !log->isFull())) { 658 engine_.getDcpConnMap().notifyPausedConnection(this, schedule); 659 } 660} 661 662ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) { 663 if (noopCtx.enabled) { 664 size_t sinceTime = ep_current_time() - noopCtx.sendTime; 665 if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) { 666 LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection" 667 " appears to be dead", logHeader()); 668 return ENGINE_DISCONNECT; 669 } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) { 670 ENGINE_ERROR_CODE ret; 671 EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true); 672 ret = producers->noop(getCookie(), ++noopCtx.opaque); 673 ObjectRegistry::onSwitchThread(epe); 674 675 if (ret == ENGINE_SUCCESS) { 676 ret = ENGINE_WANT_MORE; 677 } 678 noopCtx.pendingRecv = true; 679 noopCtx.sendTime = ep_current_time(); 680 lastSendTime = ep_current_time(); 681 return ret; 682 } 683 } 684 return ENGINE_FAILED; 685} 686 687bool DcpProducer::isTimeForNoop() { 688 // Not Implemented 689 return false; 690} 691 692void DcpProducer::setTimeForNoop() { 693 // Not Implemented 694} 695 696void DcpProducer::clearQueues() { 697 LockHolder lh(queueLock); 698 std::map<uint16_t, stream_t>::iterator itr = streams.begin(); 699 for (; itr != streams.end(); ++itr) { 700 itr->second->clear(); 701 } 702} 703 704void DcpProducer::appendQueue(std::list<queued_item> *q) { 705 (void) q; 706 abort(); // Not Implemented 707} 708 709size_t DcpProducer::getBackfillQueueSize() { 710 return totalBackfillBacklogs; 711} 712 713size_t DcpProducer::getItemsSent() { 714 return itemsSent; 715} 716 717size_t DcpProducer::getItemsRemaining_UNLOCKED() { 718 size_t remainingSize = 0; 719 720 std::map<uint16_t, stream_t>::iterator itr = streams.begin(); 721 for (; itr != streams.end(); ++itr) { 722 Stream *s = (itr->second).get(); 723 724 if (s->getType() == STREAM_ACTIVE) { 725 ActiveStream *as = static_cast<ActiveStream *>(s); 726 remainingSize += as->getItemsRemaining(); 727 } 728 } 729 730 return remainingSize; 731} 732 733size_t DcpProducer::getTotalBytes() { 734 return totalBytesSent; 735} 736 737std::list<uint16_t> DcpProducer::getVBList() { 738 LockHolder lh(queueLock); 739 std::list<uint16_t> vblist; 740 std::map<uint16_t, stream_t>::iterator itr = streams.begin(); 741 for (; itr != streams.end(); ++itr) { 742 vblist.push_back(itr->first); 743 } 744 return vblist; 745} 746 747bool DcpProducer::windowIsFull() { 748 abort(); // Not Implemented 749} 750 751void DcpProducer::flush() { 752 abort(); // Not Implemented 753} 754