1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "backfill.h"
21 #include "ep_engine.h"
22 #include "failover-table.h"
23 #include "dcp-producer.h"
24 #include "dcp-response.h"
25 #include "dcp-stream.h"
26 
// Default DCP noop interval in seconds. Kept at 20s for Couchbase 3.0
// compatibility -- see the comment in the DcpProducer constructor.
// (The "Inerval" spelling matches the declaration in the header.)
const uint32_t DcpProducer::defaultNoopInerval = 20;
28 
insert(DcpResponse* response)29 void BufferLog::insert(DcpResponse* response) {
30     cb_assert(!isFull());
31     bytes_sent += response->getMessageSize();
32 }
33 
free(uint32_t bytes_to_free)34 void BufferLog::free(uint32_t bytes_to_free) {
35     if (bytes_sent >= bytes_to_free) {
36         bytes_sent -= bytes_to_free;
37     } else {
38         bytes_sent = 0;
39     }
40 }
41 
/**
 * Construct a DCP producer (or notifier, when isNotifier is set) bound to
 * the given connection cookie. Assigns a connection priority based on the
 * connection name and initialises the noop context used for dead-connection
 * detection.
 */
DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
                         const std::string &name, bool isNotifier)
    : Producer(e, cookie, name), rejectResp(NULL),
      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(NULL),
      itemsSent(0), totalBytesSent(0), ackedBytes(0) {
    setSupportAck(true);
    setReserved(true);
    setPaused(true);

    if (notifyOnly) {
        setLogHeader("DCP (Notifier) " + getName() + " -");
    } else {
        setLogHeader("DCP (Producer) " + getName() + " -");
    }

    // Prioritise intra-cluster replication connections over XDCR and views,
    // based on well-known substrings in the connection name.
    if (getName().find("replication") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
    } else if (getName().find("xdcr") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    } else if (getName().find("views") != std::string::npos) {
        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
    }

    // The consumer assigns opaques starting at 0 so lets have the producer
    // start using opaques at 10M to prevent any opaque conflicts.
    noopCtx.opaque = 10000000;
    noopCtx.sendTime = ep_current_time();

    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
    // noop interval to 20 seconds by default, but in post 3.0 releases we set
    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
    // noop interval of the producer when connecting so in an all 3.0.1+
    // cluster this value will be overridden. In 3.0 however we do not set the
    // noop interval, so setting this value makes sure we don't accidentally
    // disconnect due to the producer and the consumer having a different noop
    // interval.
    noopCtx.noopInterval = defaultNoopInerval;
    noopCtx.pendingRecv = false;
    noopCtx.enabled = false;
}
82 
~DcpProducer()83 DcpProducer::~DcpProducer() {
84     if (log) {
85         delete log;
86     }
87 }
88 
/**
 * Handle a stream request from a DCP consumer.
 *
 * Validates the request against the current vbucket state, decides whether
 * the consumer must roll back, sends the failover log via 'callback' and,
 * on success, creates an ActiveStream (or a NotifierStream for notifier
 * connections) for the vbucket.
 *
 * @param rollback_seqno [out] set to the seqno the consumer must roll back
 *        to when ENGINE_ROLLBACK is returned
 * @return ENGINE_SUCCESS when the stream was created, otherwise an error
 *         code describing why the request was refused
 */
ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
                                             uint32_t opaque,
                                             uint16_t vbucket,
                                             uint64_t start_seqno,
                                             uint64_t end_seqno,
                                             uint64_t vbucket_uuid,
                                             uint64_t snap_start_seqno,
                                             uint64_t snap_end_seqno,
                                             uint64_t *rollback_seqno,
                                             dcp_add_failover_log callback) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    LockHolder lh(queueLock);
    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket doesn't exist", logHeader(), vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    // An open checkpoint id of 0 indicates the vbucket is still being
    // backfilled; refuse with a temporary failure so the client retries.
    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "this vbucket is in backfill state", logHeader(), vbucket);
        return ENGINE_TMPFAIL;
    }

    // The LATEST/DISKONLY flags override the client-supplied end seqno.
    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
        end_seqno = vb->getHighSeqno();
    }

    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
    }

    if (!notifyOnly && start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the start seqno (%llu) is larger than the end seqno (%llu)",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_ERANGE;
    }

    // The start seqno must lie inside the snapshot the consumer claims to
    // have (snap_start <= start <= snap_end).
    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
        start_seqno <= snap_end_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
            "the snap start seqno (%llu) <= start seqno (%llu) <= snap end "
            "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno,
            start_seqno, snap_end_seqno);
        return ENGINE_ERANGE;
    }

    bool add_vb_conn_map = true;
    std::map<uint16_t, stream_t>::iterator itr;
    if ((itr = streams.find(vbucket)) != streams.end()) {
        if (itr->second->getState() != STREAM_DEAD) {
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
                " because a stream already exists for this vbucket",
                logHeader(), vbucket);
            return ENGINE_KEY_EEXISTS;
        } else {
            // Replace the dead stream; its vbucket-to-connection mapping is
            // still in place.
            streams.erase(vbucket);
            ready.remove(vbucket);
            // Don't need to add an entry to vbucket-to-conns map
            add_vb_conn_map = false;
        }
    }

    // If we are a notify stream then we can't use the start_seqno supplied
    // since if it is greater than the current high seqno then it will always
    // trigger a rollback. As a result we should use the current high seqno for
    // rollback purposes.
    uint64_t notifySeqno = start_seqno;
    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
    }

    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
                                     vbucket_uuid, snap_start_seqno,
                                     snap_end_seqno, vb->getPurgeSeqno(),
                                     rollback_seqno)) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
            "because a rollback to seqno %llu is required (start seqno %llu, "
            "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)",
            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
            snap_start_seqno, snap_end_seqno);
        return ENGINE_ROLLBACK;
    }

    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
    if (rv != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
            "stream request due to error %d", logHeader(), vbucket, rv);
        return rv;
    }

    if (notifyOnly) {
        streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags,
                                              opaque, vbucket, notifySeqno,
                                              end_seqno, vbucket_uuid,
                                              snap_start_seqno, snap_end_seqno);
    } else {
        streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
                                            opaque, vbucket, start_seqno,
                                            end_seqno, vbucket_uuid,
                                            snap_start_seqno, snap_end_seqno);
        static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
    }

    ready.push_back(vbucket);
    lh.unlock();
    // Register in the per-vbucket connection map outside of queueLock.
    if (add_vb_conn_map) {
        connection_t conn(this);
        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
    }

    return rv;
}
207 
getFailoverLog(uint32_t opaque, uint16_t vbucket, dcp_add_failover_log callback)208 ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
209                                               dcp_add_failover_log callback) {
210     (void) opaque;
211     if (doDisconnect()) {
212         return ENGINE_DISCONNECT;
213     }
214 
215     RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
216     if (!vb) {
217         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
218             "because this vbucket doesn't exist", logHeader(), vbucket);
219         return ENGINE_NOT_MY_VBUCKET;
220     }
221 
222     return vb->failovers->addFailoverLog(getCookie(), callback);
223 }
224 
step(struct dcp_message_producers* producers)225 ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
226     setLastWalkTime();
227 
228     if (doDisconnect()) {
229         return ENGINE_DISCONNECT;
230     }
231 
232     ENGINE_ERROR_CODE ret;
233     if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
234         return ret;
235     }
236 
237     DcpResponse *resp;
238     if (rejectResp) {
239         resp = rejectResp;
240         rejectResp = NULL;
241     } else {
242         resp = getNextItem();
243         if (!resp) {
244             return ENGINE_SUCCESS;
245         }
246     }
247 
248     ret = ENGINE_SUCCESS;
249 
250     Item* itmCpy = NULL;
251     if (resp->getEvent() == DCP_MUTATION) {
252         itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
253     }
254 
255     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
256                                                                      true);
257     switch (resp->getEvent()) {
258         case DCP_STREAM_END:
259         {
260             StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
261             ret = producers->stream_end(getCookie(), se->getOpaque(),
262                                         se->getVbucket(), se->getFlags());
263             break;
264         }
265         case DCP_MUTATION:
266         {
267             MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
268             ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
269                                       m->getVBucket(), m->getBySeqno(),
270                                       m->getRevSeqno(), 0, NULL, 0,
271                                       m->getItem()->getNRUValue());
272             break;
273         }
274         case DCP_DELETION:
275         {
276             MutationResponse *m = static_cast<MutationResponse*>(resp);
277             ret = producers->deletion(getCookie(), m->getOpaque(),
278                                       m->getItem()->getKey().c_str(),
279                                       m->getItem()->getNKey(),
280                                       m->getItem()->getCas(),
281                                       m->getVBucket(), m->getBySeqno(),
282                                       m->getRevSeqno(), NULL, 0);
283             break;
284         }
285         case DCP_SNAPSHOT_MARKER:
286         {
287             SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
288             ret = producers->marker(getCookie(), s->getOpaque(),
289                                     s->getVBucket(),
290                                     s->getStartSeqno(),
291                                     s->getEndSeqno(),
292                                     s->getFlags());
293             break;
294         }
295         case DCP_SET_VBUCKET:
296         {
297             SetVBucketState *s = static_cast<SetVBucketState*>(resp);
298             ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
299                                                s->getVBucket(), s->getState());
300             break;
301         }
302         default:
303         {
304             LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
305                 "disconnecting", logHeader(), resp->getEvent());
306             ret = ENGINE_DISCONNECT;
307             break;
308         }
309     }
310 
311     ObjectRegistry::onSwitchThread(epe);
312     if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
313         delete itmCpy;
314     }
315 
316     if (ret == ENGINE_E2BIG) {
317         rejectResp = resp;
318     } else {
319         delete resp;
320     }
321 
322     lastSendTime = ep_current_time();
323     return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
324 }
325 
/**
 * Handle a buffer-acknowledgement from the consumer: credit the acked bytes
 * back to the flow-control log and wake the connection if the ack freed a
 * previously-full window. No-op when flow control is disabled (no log).
 */
ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
                                                     uint16_t vbucket,
                                                     uint32_t buffer_bytes) {
    LockHolder lh(queueLock);
    if (log) {
        bool wasFull = log->isFull();

        ackedBytes.fetch_add(buffer_bytes);
        log->free(buffer_bytes);
        // NOTE(review): unlocked before notifying, presumably to avoid lock
        // inversion with the conn map's own locking -- confirm.
        lh.unlock();

        if (wasFull) {
            // The window was exhausted and the connection paused itself, so
            // it needs an explicit wake-up now that space is available.
            engine_.getDcpConnMap().notifyPausedConnection(this, true);
        }
    }

    return ENGINE_SUCCESS;
}
344 
control(uint32_t opaque, const void* key, uint16_t nkey, const void* value, uint32_t nvalue)345 ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
346                                        uint16_t nkey, const void* value,
347                                        uint32_t nvalue) {
348     LockHolder lh(queueLock);
349     const char* param = static_cast<const char*>(key);
350     std::string keyStr(static_cast<const char*>(key), nkey);
351     std::string valueStr(static_cast<const char*>(value), nvalue);
352 
353     if (strncmp(param, "connection_buffer_size", nkey) == 0) {
354         uint32_t size;
355         if (parseUint32(valueStr.c_str(), &size)) {
356             if (!log) {
357                 log = new BufferLog(size);
358             } else if (log->getBufferSize() != size) {
359                 log->setBufferSize(size);
360             }
361             return ENGINE_SUCCESS;
362         }
363     } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
364         LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
365             "not supported by this engine", logHeader());
366         return ENGINE_ENOTSUP;
367     } else if (strncmp(param, "enable_noop", nkey) == 0) {
368         if (valueStr.compare("true") == 0) {
369             noopCtx.enabled = true;
370         } else {
371             noopCtx.enabled = false;
372         }
373         return ENGINE_SUCCESS;
374     } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
375         if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
376             return ENGINE_SUCCESS;
377         }
378     }
379 
380     LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
381         logHeader(), valueStr.c_str(), keyStr.c_str());
382 
383     return ENGINE_EINVAL;
384 }
385 
/**
 * Handle a response message from the consumer.
 *
 * Set-vbucket-state and snapshot-marker responses are routed to the active
 * stream that issued them (matched by opaque); mutation/deletion/expiration/
 * stream-end responses are currently ignored (pending nacking support);
 * noop responses clear the pending-receive flag. Anything else triggers a
 * disconnect.
 */
ENGINE_ERROR_CODE DcpProducer::handleResponse(
                                        protocol_binary_response_header *resp) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    uint8_t opcode = resp->response.opcode;
    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
        uint32_t opaque = pkt->message.header.response.opaque;

        // Find the active stream whose opaque matches this response.
        LockHolder lh(queueLock);
        stream_t active_stream;
        std::map<uint16_t, stream_t>::iterator itr;
        for (itr = streams.begin() ; itr != streams.end(); ++itr) {
            active_stream = itr->second;
            Stream *str = active_stream.get();
            if (str && str->getType() == STREAM_ACTIVE) {
                ActiveStream* as = static_cast<ActiveStream*>(str);
                if (as && opaque == itr->second->getOpaque()) {
                    break;
                }
            }
        }

        if (itr != streams.end()) {
            // active_stream (a refcounted handle) keeps the stream alive
            // once the lock is dropped.
            lh.unlock();
            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
                as->setVBucketStateAckRecieved();
            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
                as->snapshotMarkerAckReceived();
            }
        }

        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
        // TODO: When nacking is implemented we need to handle these responses
        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
        if (noopCtx.opaque == resp->response.opaque) {
            noopCtx.pendingRecv = false;
            return ENGINE_SUCCESS;
        }
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return ENGINE_DISCONNECT;
}
442 
/**
 * Close the stream for the given vbucket at the consumer's request.
 *
 * @return ENGINE_KEY_ENOENT if no live stream exists for the vbucket (a
 *         dead stream is still cleaned up), ENGINE_SUCCESS once the stream
 *         is marked dead and the connection is deregistered from the
 *         vbucket's connection list.
 */
ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    LockHolder lh(queueLock);
    std::map<uint16_t, stream_t>::iterator itr;
    if ((itr = streams.find(vbucket)) == streams.end()) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
            "stream exists for this vbucket", logHeader(), vbucket);
        return ENGINE_KEY_ENOENT;
    } else if (!itr->second->isActive()) {
        // The stream exists but is already dead: remove the bookkeeping and
        // still report ENOENT to the caller.
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
            "stream is already marked as dead", logHeader(), vbucket);
        streams.erase(vbucket);
        ready.remove(vbucket);
        lh.unlock();
        connection_t conn(this);
        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
        return ENGINE_KEY_ENOENT;
    }

    // Take a reference before erasing so the stream stays alive while it is
    // marked dead outside the lock.
    stream_t stream = itr->second;
    streams.erase(vbucket);
    ready.remove(vbucket);
    lh.unlock();

    stream->setDead(END_STREAM_CLOSED);
    connection_t conn(this);
    engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
    return ENGINE_SUCCESS;
}
475 
/**
 * Emit connection-level statistics (items/bytes sent, noop state,
 * flow-control state) followed by the per-stream statistics. The order of
 * emission is part of the observable output and is preserved.
 */
void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
    Producer::addStats(add_stat, c);

    LockHolder lh(queueLock);

    addStat("items_sent", getItemsSent(), add_stat, c);
    addStat("items_remaining", getItemsRemaining_UNLOCKED(), add_stat, c);
    addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
    addStat("last_sent_time", lastSendTime, add_stat, c);
    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);

    // Flow-control stats are only meaningful when a buffer log exists.
    if (log) {
        addStat("max_buffer_bytes", log->getBufferSize(), add_stat, c);
        addStat("unacked_bytes", log->getBytesSent(), add_stat, c);
        addStat("total_acked_bytes", ackedBytes, add_stat, c);
        addStat("flow_control", "enabled", add_stat, c);
    } else {
        addStat("flow_control", "disabled", add_stat, c);
    }

    std::map<uint16_t, stream_t>::iterator itr;
    for (itr = streams.begin(); itr != streams.end(); ++itr) {
        itr->second->addStats(add_stat, c);
    }
}
502 
addTakeoverStats(ADD_STAT add_stat, const void* c, uint16_t vbid)503 void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
504                                    uint16_t vbid) {
505     LockHolder lh(queueLock);
506     std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
507     if (itr != streams.end()) {
508         Stream *s = itr->second.get();
509         if (s && s->getType() == STREAM_ACTIVE) {
510             ActiveStream* as = static_cast<ActiveStream*>(s);
511             if (as) {
512                 as->addTakeoverStats(add_stat, c);
513             }
514         }
515     }
516 }
517 
aggregateQueueStats(ConnCounter* aggregator)518 void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
519     LockHolder lh(queueLock);
520     if (!aggregator) {
521         LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
522             " is NULL!!!", logHeader());
523         return;
524     }
525     aggregator->conn_queueDrain += itemsSent;
526     aggregator->conn_totalBytes += totalBytesSent;
527     aggregator->conn_queueRemaining += getItemsRemaining_UNLOCKED();
528     aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
529 }
530 
notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno)531 void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
532     LockHolder lh(queueLock);
533     std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
534     if (itr != streams.end() && itr->second->isActive()) {
535         stream_t stream = itr->second;
536         lh.unlock();
537         stream->notifySeqnoAvailable(seqno);
538     }
539 }
540 
vbucketStateChanged(uint16_t vbucket, vbucket_state_t state)541 void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
542     LockHolder lh(queueLock);
543     std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
544     if (itr != streams.end()) {
545         stream_t stream = itr->second;
546         lh.unlock();
547         stream->setDead(END_STREAM_STATE);
548     }
549 }
550 
/**
 * Mark every stream on this connection dead and deregister the connection
 * from each vbucket's connection list. Streams are killed and erased under
 * queueLock; the conn-map deregistration happens afterwards, unlocked
 * (matching the pattern used in closeStream()).
 */
void DcpProducer::closeAllStreams() {
    LockHolder lh(queueLock);
    std::list<uint16_t> vblist; // vbuckets to deregister after unlocking
    while (!streams.empty()) {
        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
        uint16_t vbid = itr->first;
        itr->second->setDead(END_STREAM_DISCONNECTED);
        streams.erase(vbid);
        ready.remove(vbid);
        vblist.push_back(vbid);
    }
    lh.unlock();

    connection_t conn(this);
    std::list<uint16_t>::iterator it = vblist.begin();
    for (; it != vblist.end(); ++it) {
        engine_.getDcpConnMap().removeVBConnByVBId(conn, *it);
    }
}
570 
getType() const571 const char* DcpProducer::getType() const {
572     if (notifyOnly) {
573         return "notifier";
574     } else {
575         return "producer";
576     }
577 }
578 
/**
 * Pop the next response from the first ready stream that has one.
 *
 * Round-robins over the 'ready' list; a vbucket that yields an item is
 * pushed back so its remaining items are picked up on later calls. Marks
 * the connection paused and returns NULL when there is nothing to send or
 * the flow-control window is full. The caller owns the returned response.
 */
DcpResponse* DcpProducer::getNextItem() {
    LockHolder lh(queueLock);

    setPaused(false);
    while (!ready.empty()) {
        if (log && log->isFull()) {
            // Flow-control window exhausted; wait for a buffer ack.
            setPaused(true);
            return NULL;
        }

        uint16_t vbucket = ready.front();
        ready.pop_front();

        // The stream may have been closed since this vbucket was queued.
        if (streams.find(vbucket) == streams.end()) {
            continue;
        }
        DcpResponse* op = streams[vbucket]->next();
        if (!op) {
            continue;
        }

        // Only the known stream events may be written; anything else is a
        // programming error.
        switch (op->getEvent()) {
            case DCP_SNAPSHOT_MARKER:
            case DCP_MUTATION:
            case DCP_DELETION:
            case DCP_EXPIRATION:
            case DCP_STREAM_END:
            case DCP_SET_VBUCKET:
                break;
            default:
                LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
                    " an unexpected event %d", logHeader(), op->getEvent());
                abort();
        }

        // Account the message against the flow-control window (if enabled).
        if (log) {
            log->insert(op);
        }
        ready.push_back(vbucket);

        if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
            op->getEvent() == DCP_EXPIRATION) {
            itemsSent++;
        }

        totalBytesSent = totalBytesSent + op->getMessageSize();

        return op;
    }

    setPaused(true);
    return NULL;
}
632 
setDisconnect(bool disconnect)633 void DcpProducer::setDisconnect(bool disconnect) {
634     ConnHandler::setDisconnect(disconnect);
635 
636     if (disconnect) {
637         LockHolder lh(queueLock);
638         std::map<uint16_t, stream_t>::iterator itr = streams.begin();
639         for (; itr != streams.end(); ++itr) {
640             itr->second->setDead(END_STREAM_DISCONNECTED);
641         }
642     }
643 }
644 
notifyStreamReady(uint16_t vbucket, bool schedule)645 void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
646     LockHolder lh(queueLock);
647 
648     std::list<uint16_t>::iterator iter =
649         std::find(ready.begin(), ready.end(), vbucket);
650     if (iter != ready.end()) {
651         return;
652     }
653 
654     ready.push_back(vbucket);
655     lh.unlock();
656 
657     if (!log || (log && !log->isFull())) {
658         engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
659     }
660 }
661 
/**
 * Send a DCP noop if noops are enabled and the noop interval has elapsed;
 * disconnect if the previous noop was never acknowledged in time.
 *
 * @return ENGINE_FAILED when no noop was due (caller proceeds with normal
 *         data), ENGINE_DISCONNECT when the connection appears dead,
 *         otherwise the result of sending the noop (ENGINE_SUCCESS is
 *         mapped to ENGINE_WANT_MORE).
 */
ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
    if (noopCtx.enabled) {
        size_t sinceTime = ep_current_time() - noopCtx.sendTime;
        if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            // The previous noop was never acked within the interval.
            LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection"
                " appears to be dead", logHeader());
            return ENGINE_DISCONNECT;
        } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
            ENGINE_ERROR_CODE ret;
            // Clear the thread's engine association while calling out
            // (restored right after).
            EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
            ret = producers->noop(getCookie(), ++noopCtx.opaque);
            ObjectRegistry::onSwitchThread(epe);

            if (ret == ENGINE_SUCCESS) {
                ret = ENGINE_WANT_MORE;
            }
            noopCtx.pendingRecv = true;
            noopCtx.sendTime = ep_current_time();
            lastSendTime = ep_current_time();
            return ret;
        }
    }
    return ENGINE_FAILED;
}
686 
// Not implemented for DCP: noop scheduling is handled by maybeSendNoop().
bool DcpProducer::isTimeForNoop() {
    // Not Implemented
    return false;
}
691 
// Not implemented for DCP: noop scheduling is handled by maybeSendNoop().
void DcpProducer::setTimeForNoop() {
    // Not Implemented
}
695 
clearQueues()696 void DcpProducer::clearQueues() {
697     LockHolder lh(queueLock);
698     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
699     for (; itr != streams.end(); ++itr) {
700         itr->second->clear();
701     }
702 }
703 
// Not supported for DCP producers; aborts if ever called.
void DcpProducer::appendQueue(std::list<queued_item> *q) {
    (void) q;
    abort(); // Not Implemented
}
708 
// Number of items still to be backfilled across this connection's streams.
size_t DcpProducer::getBackfillQueueSize() {
    return totalBackfillBacklogs;
}
712 
// Total mutations/deletions/expirations sent on this connection.
size_t DcpProducer::getItemsSent() {
    return itemsSent;
}
716 
getItemsRemaining_UNLOCKED()717 size_t DcpProducer::getItemsRemaining_UNLOCKED() {
718     size_t remainingSize = 0;
719 
720     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
721     for (; itr != streams.end(); ++itr) {
722         Stream *s = (itr->second).get();
723 
724         if (s->getType() == STREAM_ACTIVE) {
725             ActiveStream *as = static_cast<ActiveStream *>(s);
726             remainingSize += as->getItemsRemaining();
727         }
728     }
729 
730     return remainingSize;
731 }
732 
// Total message bytes sent on this connection (see getNextItem()).
size_t DcpProducer::getTotalBytes() {
    return totalBytesSent;
}
736 
getVBList()737 std::list<uint16_t> DcpProducer::getVBList() {
738     LockHolder lh(queueLock);
739     std::list<uint16_t> vblist;
740     std::map<uint16_t, stream_t>::iterator itr = streams.begin();
741     for (; itr != streams.end(); ++itr) {
742         vblist.push_back(itr->first);
743     }
744     return vblist;
745 }
746 
// Not supported for DCP producers (flow control uses BufferLog instead);
// aborts if ever called.
bool DcpProducer::windowIsFull() {
    abort(); // Not Implemented
}
750 
// Not supported for DCP producers; aborts if ever called.
void DcpProducer::flush() {
    abort(); // Not Implemented
}
754