1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "connmap.h"
23 #include "tapthrottle.h"
24 #include "dcp-consumer.h"
25 #include "dcp-response.h"
26 #include "dcp-stream.h"
27 
28 class Processer : public GlobalTask {
29 public:
Processer(EventuallyPersistentEngine* e, connection_t c, const Priority &p, double sleeptime = 1, bool shutdown = false)30     Processer(EventuallyPersistentEngine* e, connection_t c,
31                 const Priority &p, double sleeptime = 1, bool shutdown = false)
32         : GlobalTask(e, p, sleeptime, shutdown), conn(c) {}
33 
34     bool run();
35 
36     std::string getDescription();
37 
38 private:
39     connection_t conn;
40 };
41 
run()42 bool Processer::run() {
43     DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
44     if (consumer->doDisconnect()) {
45         return false;
46     }
47 
48     switch (consumer->processBufferedItems()) {
49         case all_processed:
50             snooze(1);
51             break;
52         case more_to_process:
53             snooze(0);
54             break;
55         case cannot_process:
56             snooze(5);
57             break;
58         default:
59             abort();
60     }
61 
62     return true;
63 }
64 
getDescription()65 std::string Processer::getDescription() {
66     std::stringstream ss;
67     ss << "Processing buffered items for " << conn->getName();
68     return ss.str();
69 }
70 
DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie, const std::string &name)71 DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
72                          const std::string &name)
73     : Consumer(engine, cookie, name), opaqueCounter(0), processTaskId(0),
74           itemsToProcess(false), lastNoopTime(ep_current_time()), backoffs(0) {
75     Configuration& config = engine.getConfiguration();
76     streams = new passive_stream_t[config.getMaxVbuckets()];
77     setSupportAck(false);
78     setLogHeader("DCP (Consumer) " + getName() + " -");
79     setReserved(true);
80 
81     flowControl.enabled = config.isDcpEnableFlowControl();
82     flowControl.bufferSize = config.getDcpConnBufferSize();
83     flowControl.maxUnackedBytes = config.getDcpMaxUnackedBytes();
84 
85     noopInterval = config.getDcpNoopInterval();
86     enableNoop = config.isDcpEnableNoop();
87     sendNoopInterval = config.isDcpEnableNoop();
88 
89     ExTask task = new Processer(&engine, this, Priority::PendingOpsPriority, 1);
90     processTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
91 }
92 
/**
 * Tear down the consumer: mark every existing stream dead, then release the
 * per-vbucket stream array allocated in the constructor.
 */
DcpConsumer::~DcpConsumer() {
    // Streams are set dead (END_STREAM_DISCONNECTED) before the array goes.
    closeAllStreams();
    delete[] streams;
}
97 
addStream(uint32_t opaque, uint16_t vbucket, uint32_t flags)98 ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
99                                          uint32_t flags) {
100     LockHolder lh(streamMutex);
101     if (doDisconnect()) {
102         return ENGINE_DISCONNECT;
103     }
104 
105     RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
106     if (!vb) {
107         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Add stream failed because this "
108             "vbucket doesn't exist", logHeader(), vbucket);
109         return ENGINE_NOT_MY_VBUCKET;
110     }
111 
112     if (vb->getState() == vbucket_state_active) {
113         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Add stream failed because this "
114             "vbucket happens to be in active state", logHeader(), vbucket);
115         return ENGINE_NOT_MY_VBUCKET;
116     }
117 
118     uint32_t new_opaque = ++opaqueCounter;
119     failover_entry_t entry = vb->failovers->getLatestEntry();
120     uint64_t start_seqno = vb->getHighSeqno();
121     uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
122     uint64_t vbucket_uuid = entry.vb_uuid;
123     uint64_t snap_start_seqno;
124     uint64_t snap_end_seqno;
125 
126     vb->getCurrentSnapshot(snap_start_seqno, snap_end_seqno);
127 
128     passive_stream_t stream = streams[vbucket];
129     if (stream && stream->isActive()) {
130         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot add stream because one "
131             "already exists", logHeader(), vbucket);
132         return ENGINE_KEY_EEXISTS;
133     }
134 
135     streams[vbucket] = new PassiveStream(&engine_, this, getName(), flags,
136                                          new_opaque, vbucket, start_seqno,
137                                          end_seqno, vbucket_uuid,
138                                          snap_start_seqno, snap_end_seqno);
139     ready.push_back(vbucket);
140     opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
141 
142     return ENGINE_SUCCESS;
143 }
144 
closeStream(uint32_t opaque, uint16_t vbucket)145 ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
146     LockHolder lh(streamMutex);
147     if (doDisconnect()) {
148         return ENGINE_DISCONNECT;
149     }
150 
151     opaque_map::iterator oitr = opaqueMap_.find(opaque);
152     if (oitr != opaqueMap_.end()) {
153         opaqueMap_.erase(oitr);
154     }
155 
156     passive_stream_t stream = streams[vbucket];
157     if (!stream) {
158         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
159             "stream exists for this vbucket", logHeader(), vbucket);
160         return ENGINE_KEY_ENOENT;
161     }
162 
163     uint32_t bytesCleared = stream->setDead(END_STREAM_CLOSED);
164     flowControl.freedBytes.fetch_add(bytesCleared);
165     return ENGINE_SUCCESS;
166 }
167 
streamEnd(uint32_t opaque, uint16_t vbucket, uint32_t flags)168 ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
169                                          uint32_t flags) {
170     if (doDisconnect()) {
171         return ENGINE_DISCONNECT;
172     }
173 
174     ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
175     passive_stream_t stream = streams[vbucket];
176     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
177         LOG(EXTENSION_LOG_INFO, "%s (vb %d) End stream received with reason %d",
178             logHeader(), vbucket, flags);
179         StreamEndResponse* response = new StreamEndResponse(opaque, flags,
180                                                             vbucket);
181         err = stream->messageReceived(response);
182 
183         bool disable = false;
184         if (err == ENGINE_SUCCESS &&
185             itemsToProcess.compare_exchange_strong(disable, true)) {
186             ExecutorPool::get()->wake(processTaskId);
187         }
188     }
189 
190     if (err != ENGINE_SUCCESS) {
191         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) End stream received with opaque "
192             "%d but does not exist", logHeader(), vbucket, opaque);
193         flowControl.freedBytes.fetch_add(StreamEndResponse::baseMsgBytes);
194     }
195 
196     return err;
197 }
198 
mutation(uint32_t opaque, const void* key, uint16_t nkey, const void* value, uint32_t nvalue, uint64_t cas, uint16_t vbucket, uint32_t flags, uint8_t datatype, uint32_t locktime, uint64_t bySeqno, uint64_t revSeqno, uint32_t exptime, uint8_t nru, const void* meta, uint16_t nmeta)199 ENGINE_ERROR_CODE DcpConsumer::mutation(uint32_t opaque, const void* key,
200                                         uint16_t nkey, const void* value,
201                                         uint32_t nvalue, uint64_t cas,
202                                         uint16_t vbucket, uint32_t flags,
203                                         uint8_t datatype, uint32_t locktime,
204                                         uint64_t bySeqno, uint64_t revSeqno,
205                                         uint32_t exptime, uint8_t nru,
206                                         const void* meta, uint16_t nmeta) {
207     if (doDisconnect()) {
208         return ENGINE_DISCONNECT;
209     }
210 
211     ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
212     passive_stream_t stream = streams[vbucket];
213     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
214         std::string key_str(static_cast<const char*>(key), nkey);
215         value_t vblob(Blob::New(static_cast<const char*>(value), nvalue,
216                       &(datatype), (uint8_t) EXT_META_LEN));
217         Item *item = new Item(key_str, flags, exptime, vblob, cas, bySeqno,
218                               vbucket, revSeqno);
219         MutationResponse* response = new MutationResponse(item, opaque);
220         err = stream->messageReceived(response);
221 
222         bool disable = false;
223         if (err == ENGINE_SUCCESS &&
224             itemsToProcess.compare_exchange_strong(disable, true)) {
225             ExecutorPool::get()->wake(processTaskId);
226         }
227     }
228 
229     if (err != ENGINE_SUCCESS) {
230         uint32_t bytes =
231             MutationResponse::mutationBaseMsgBytes + nkey + nmeta + nvalue;
232         flowControl.freedBytes.fetch_add(bytes);
233     }
234 
235     return err;
236 }
237 
deletion(uint32_t opaque, const void* key, uint16_t nkey, uint64_t cas, uint16_t vbucket, uint64_t bySeqno, uint64_t revSeqno, const void* meta, uint16_t nmeta)238 ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque, const void* key,
239                                         uint16_t nkey, uint64_t cas,
240                                         uint16_t vbucket, uint64_t bySeqno,
241                                         uint64_t revSeqno, const void* meta,
242                                         uint16_t nmeta) {
243     if (doDisconnect()) {
244         return ENGINE_DISCONNECT;
245     }
246 
247     ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
248     passive_stream_t stream = streams[vbucket];
249     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
250         Item* item = new Item(key, nkey, 0, 0, NULL, 0, NULL, 0, cas, bySeqno,
251                               vbucket, revSeqno);
252         item->setDeleted();
253         MutationResponse* response = new MutationResponse(item, opaque);
254         err = stream->messageReceived(response);
255 
256         bool disable = false;
257         if (err == ENGINE_SUCCESS &&
258             itemsToProcess.compare_exchange_strong(disable, true)) {
259             ExecutorPool::get()->wake(processTaskId);
260         }
261     }
262 
263     if (err != ENGINE_SUCCESS) {
264         uint32_t bytes = MutationResponse::deletionBaseMsgBytes + nkey + nmeta;
265         flowControl.freedBytes.fetch_add(bytes);
266     }
267 
268     return err;
269 }
270 
/**
 * Handle an expiration message from the producer.
 *
 * Expirations are handled exactly like deletions: all arguments are
 * forwarded unchanged to deletion() and its result is returned.
 */
ENGINE_ERROR_CODE DcpConsumer::expiration(uint32_t opaque, const void* key,
                                          uint16_t nkey, uint64_t cas,
                                          uint16_t vbucket, uint64_t bySeqno,
                                          uint64_t revSeqno, const void* meta,
                                          uint16_t nmeta) {
    return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
                    nmeta);
}
279 
snapshotMarker(uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags)280 ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
281                                               uint16_t vbucket,
282                                               uint64_t start_seqno,
283                                               uint64_t end_seqno,
284                                               uint32_t flags) {
285     if (doDisconnect()) {
286         return ENGINE_DISCONNECT;
287     }
288 
289     ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
290     passive_stream_t stream = streams[vbucket];
291     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
292         SnapshotMarker* response = new SnapshotMarker(opaque, vbucket,
293                                                       start_seqno, end_seqno,
294                                                       flags);
295         err = stream->messageReceived(response);
296 
297         bool disable = false;
298         if (err == ENGINE_SUCCESS &&
299             itemsToProcess.compare_exchange_strong(disable, true)) {
300             ExecutorPool::get()->wake(processTaskId);
301         }
302     }
303 
304     if (err != ENGINE_SUCCESS) {
305         flowControl.freedBytes.fetch_add(SnapshotMarker::baseMsgBytes);
306     }
307 
308     return err;
309 }
310 
/**
 * Handle a noop message: record the time it arrived so that handleNoop()
 * can tell how long it has been since the last noop was seen.
 *
 * @param opaque not used by this function
 * @return always ENGINE_SUCCESS
 */
ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
    lastNoopTime = ep_current_time();
    return ENGINE_SUCCESS;
}
315 
/**
 * Handle a flush message. Flush is not supported by the consumer.
 *
 * @param opaque  not used by this function
 * @param vbucket not used by this function
 * @return ENGINE_DISCONNECT if the connection is disconnecting,
 *         ENGINE_ENOTSUP otherwise.
 */
ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    return ENGINE_ENOTSUP;
}
323 
setVBucketState(uint32_t opaque, uint16_t vbucket, vbucket_state_t state)324 ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
325                                                uint16_t vbucket,
326                                                vbucket_state_t state) {
327     if (doDisconnect()) {
328         return ENGINE_DISCONNECT;
329     }
330 
331     ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
332     passive_stream_t stream = streams[vbucket];
333     if (stream && stream->getOpaque() == opaque && stream->isActive()) {
334         SetVBucketState* response = new SetVBucketState(opaque, vbucket, state);
335         err = stream->messageReceived(response);
336 
337         bool disable = false;
338         if (err == ENGINE_SUCCESS &&
339             itemsToProcess.compare_exchange_strong(disable, true)) {
340             ExecutorPool::get()->wake(processTaskId);
341         }
342     }
343 
344     if (err != ENGINE_SUCCESS) {
345         flowControl.freedBytes.fetch_add(SetVBucketState::baseMsgBytes);
346     }
347 
348     return err;
349 }
350 
step(struct dcp_message_producers* producers)351 ENGINE_ERROR_CODE DcpConsumer::step(struct dcp_message_producers* producers) {
352     setLastWalkTime();
353 
354     if (doDisconnect()) {
355         return ENGINE_DISCONNECT;
356     }
357 
358     ENGINE_ERROR_CODE ret;
359     if ((ret = handleFlowCtl(producers)) != ENGINE_FAILED) {
360         if (ret == ENGINE_SUCCESS) {
361             ret = ENGINE_WANT_MORE;
362         }
363         return ret;
364     }
365 
366     if ((ret = handleNoop(producers)) != ENGINE_FAILED) {
367         if (ret == ENGINE_SUCCESS) {
368             ret = ENGINE_WANT_MORE;
369         }
370         return ret;
371     }
372 
373     DcpResponse *resp = getNextItem();
374     if (resp == NULL) {
375         return ENGINE_SUCCESS;
376     }
377 
378     ret = ENGINE_SUCCESS;
379     EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
380     switch (resp->getEvent()) {
381         case DCP_ADD_STREAM:
382         {
383             AddStreamResponse *as = static_cast<AddStreamResponse*>(resp);
384             ret = producers->add_stream_rsp(getCookie(), as->getOpaque(),
385                                             as->getStreamOpaque(),
386                                             as->getStatus());
387             break;
388         }
389         case DCP_STREAM_REQ:
390         {
391             StreamRequest *sr = static_cast<StreamRequest*> (resp);
392             ret = producers->stream_req(getCookie(), sr->getOpaque(),
393                                         sr->getVBucket(), sr->getFlags(),
394                                         sr->getStartSeqno(), sr->getEndSeqno(),
395                                         sr->getVBucketUUID(),
396                                         sr->getSnapStartSeqno(),
397                                         sr->getSnapEndSeqno());
398             break;
399         }
400         case DCP_SET_VBUCKET:
401         {
402             SetVBucketStateResponse* vs;
403             vs = static_cast<SetVBucketStateResponse*>(resp);
404             ret = producers->set_vbucket_state_rsp(getCookie(), vs->getOpaque(),
405                                                    vs->getStatus());
406             break;
407         }
408         case DCP_SNAPSHOT_MARKER:
409         {
410             SnapshotMarkerResponse* mr;
411             mr = static_cast<SnapshotMarkerResponse*>(resp);
412             ret = producers->marker_rsp(getCookie(), mr->getOpaque(),
413                                         mr->getStatus());
414             break;
415         }
416         default:
417             LOG(EXTENSION_LOG_WARNING, "%s Unknown consumer event (%d), "
418                 "disconnecting", logHeader(), resp->getEvent());
419             ret = ENGINE_DISCONNECT;
420     }
421     ObjectRegistry::onSwitchThread(epe);
422     delete resp;
423 
424     if (ret == ENGINE_SUCCESS) {
425         return ENGINE_WANT_MORE;
426     }
427     return ret;
428 }
429 
run()430 bool RollbackTask::run() {
431     if (cons->doRollback(opaque, vbid, rollbackSeqno)) {
432         return true;
433     }
434     ++(engine->getEpStats().rollbackCount);
435     return false;
436 }
437 
handleResponse( protocol_binary_response_header *resp)438 ENGINE_ERROR_CODE DcpConsumer::handleResponse(
439                                         protocol_binary_response_header *resp) {
440     if (doDisconnect()) {
441         return ENGINE_DISCONNECT;
442     }
443 
444     uint8_t opcode = resp->response.opcode;
445     uint32_t opaque = resp->response.opaque;
446 
447     opaque_map::iterator oitr = opaqueMap_.find(opaque);
448 
449     bool validOpaque = false;
450     if (oitr != opaqueMap_.end()) {
451         validOpaque = isValidOpaque(opaque, oitr->second.second);
452     }
453 
454     if (!validOpaque) {
455         LOG(EXTENSION_LOG_WARNING, "%s Received response with opaque %ld and "
456             "that stream no longer exists", logHeader());
457         return ENGINE_KEY_ENOENT;
458     }
459 
460     if (opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_REQ) {
461         protocol_binary_response_dcp_stream_req* pkt =
462             reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
463 
464         uint16_t vbid = oitr->second.second;
465         uint16_t status = ntohs(pkt->message.header.response.status);
466         uint64_t bodylen = ntohl(pkt->message.header.response.bodylen);
467         uint8_t* body = pkt->bytes + sizeof(protocol_binary_response_header);
468 
469         if (status == PROTOCOL_BINARY_RESPONSE_ROLLBACK) {
470             cb_assert(bodylen == sizeof(uint64_t));
471             uint64_t rollbackSeqno = 0;
472             memcpy(&rollbackSeqno, body, sizeof(uint64_t));
473             rollbackSeqno = ntohll(rollbackSeqno);
474 
475             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Received rollback request "
476                 "to rollback seq no. %llu", logHeader(), vbid, rollbackSeqno);
477 
478             ExTask task = new RollbackTask(&engine_, opaque, vbid,
479                                            rollbackSeqno, this,
480                                            Priority::TapBgFetcherPriority);
481             ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
482             return ENGINE_SUCCESS;
483         }
484 
485         if (((bodylen % 16) != 0 || bodylen == 0) && status == ENGINE_SUCCESS) {
486             LOG(EXTENSION_LOG_WARNING, "%s (vb %d)Got a stream response with a "
487                 "bad failover log (length %llu), disconnecting", logHeader(),
488                 vbid, bodylen);
489             return ENGINE_DISCONNECT;
490         }
491 
492         streamAccepted(opaque, status, body, bodylen);
493         return ENGINE_SUCCESS;
494     } else if (opcode == PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT ||
495                opcode == PROTOCOL_BINARY_CMD_DCP_CONTROL) {
496         return ENGINE_SUCCESS;
497     }
498 
499     LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
500         "disconnecting", logHeader(), opcode);
501 
502     return ENGINE_DISCONNECT;
503 }
504 
doRollback(uint32_t opaque, uint16_t vbid, uint64_t rollbackSeqno)505 bool DcpConsumer::doRollback(uint32_t opaque, uint16_t vbid,
506                              uint64_t rollbackSeqno) {
507     ENGINE_ERROR_CODE err = engine_.getEpStore()->rollback(vbid, rollbackSeqno);
508 
509     if (err == ENGINE_NOT_MY_VBUCKET) {
510         LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Rollback failed because the "
511                 "vbucket was not found", logHeader(), vbid);
512         return false;
513     } else if (err == ENGINE_TMPFAIL) {
514         return true; // Reschedule the rollback.
515     }
516 
517     cb_assert(err == ENGINE_SUCCESS);
518 
519     LockHolder lh(streamMutex);
520     RCPtr<VBucket> vb = engine_.getVBucket(vbid);
521     streams[vbid]->reconnectStream(vb, opaque, vb->getHighSeqno());
522 
523     return false;
524 }
525 
addStats(ADD_STAT add_stat, const void *c)526 void DcpConsumer::addStats(ADD_STAT add_stat, const void *c) {
527     ConnHandler::addStats(add_stat, c);
528 
529     int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
530     for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
531         passive_stream_t stream = streams[vbucket];
532         if (stream) {
533             stream->addStats(add_stat, c);
534         }
535     }
536 
537     addStat("total_backoffs", backoffs, add_stat, c);
538     if (flowControl.enabled) {
539         addStat("total_acked_bytes", flowControl.ackedBytes, add_stat, c);
540     }
541 }
542 
/**
 * Add this connection's backoff count to the aggregator's
 * conn_queueBackoff total.
 */
void DcpConsumer::aggregateQueueStats(ConnCounter* aggregator) {
    aggregator->conn_queueBackoff += backoffs;
}
546 
/**
 * Process the messages buffered on every stream of this connection.
 *
 * Streams are visited in vbucket order and each one is drained until it
 * reports no more processed bytes or reports cannot_process. Every batch of
 * processed bytes is credited to the flow-control freed-bytes counter.
 *
 * @return cannot_process as soon as the tap throttle refuses further work
 *         (the backoff counter is incremented); more_to_process if the last
 *         visited stream reported all_processed but new items were flagged
 *         while this function was running; otherwise the result reported by
 *         the last visited stream (all_processed if there were no streams).
 */
process_items_error_t DcpConsumer::processBufferedItems() {
    // Clear the flag first: the receive paths set it again (and wake the
    // task) when new messages are buffered while we are working.
    itemsToProcess.store(false);
    process_items_error_t process_ret = all_processed;

    int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
    for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {

        passive_stream_t stream = streams[vbucket];
        if (!stream) {
            continue;
        }

        uint32_t bytes_processed;

        do {
            // Check the throttle before every batch, not just once per
            // stream.
            if (!engine_.getTapThrottle().shouldProcess()) {
                backoffs++;
                return cannot_process;
            }

            bytes_processed = 0;
            process_ret = stream->processBufferedMessages(bytes_processed);
            flowControl.freedBytes.fetch_add(bytes_processed);
        } while (bytes_processed > 0 && process_ret != cannot_process);
    }

    // The flag was set again after it was cleared above, so report that
    // there is more work even though the pass itself completed.
    if (process_ret == all_processed && itemsToProcess.load()) {
        return more_to_process;
    }

    return process_ret;
}
579 
getNextItem()580 DcpResponse* DcpConsumer::getNextItem() {
581     LockHolder lh(streamMutex);
582 
583     setPaused(false);
584     while (!ready.empty()) {
585         uint16_t vbucket = ready.front();
586         ready.pop_front();
587 
588         passive_stream_t stream = streams[vbucket];
589         if (!stream) {
590             continue;
591         }
592 
593         DcpResponse* op = stream->next();
594         if (!op) {
595             continue;
596         }
597         switch (op->getEvent()) {
598             case DCP_STREAM_REQ:
599             case DCP_ADD_STREAM:
600             case DCP_SET_VBUCKET:
601             case DCP_SNAPSHOT_MARKER:
602                 break;
603             default:
604                 LOG(EXTENSION_LOG_WARNING, "%s Consumer is attempting to write"
605                     " an unexpected event %d", logHeader(), op->getEvent());
606                 abort();
607         }
608 
609         ready.push_back(vbucket);
610         return op;
611     }
612     setPaused(true);
613 
614     return NULL;
615 }
616 
notifyStreamReady(uint16_t vbucket)617 void DcpConsumer::notifyStreamReady(uint16_t vbucket) {
618     std::list<uint16_t>::iterator iter =
619         std::find(ready.begin(), ready.end(), vbucket);
620     if (iter != ready.end()) {
621         return;
622     }
623 
624     ready.push_back(vbucket);
625 
626     engine_.getDcpConnMap().notifyPausedConnection(this, true);
627 }
628 
streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body, uint32_t bodylen)629 void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
630                                  uint32_t bodylen) {
631     LockHolder lh(streamMutex);
632 
633     opaque_map::iterator oitr = opaqueMap_.find(opaque);
634     if (oitr != opaqueMap_.end()) {
635         uint32_t add_opaque = oitr->second.first;
636         uint16_t vbucket = oitr->second.second;
637 
638         passive_stream_t stream = streams[vbucket];
639         if (stream && stream->getOpaque() == opaque &&
640             stream->getState() == STREAM_PENDING) {
641             if (status == ENGINE_SUCCESS) {
642                 RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
643                 vb->failovers->replaceFailoverLog(body, bodylen);
644                 EventuallyPersistentStore* st = engine_.getEpStore();
645                 st->scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
646                                 st->getVBuckets().getShard(vbucket)->getId());
647             }
648             LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %ld"
649                 " %s with error code %d", logHeader(), vbucket, opaque,
650                 status == ENGINE_SUCCESS ? "succeeded" : "failed", status);
651             stream->acceptStream(status, add_opaque);
652         } else {
653             LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Trying to add stream, but "
654                 "none exists (opaque: %d, add_opaque: %d)", logHeader(),
655                 vbucket, opaque, add_opaque);
656         }
657         opaqueMap_.erase(opaque);
658     } else {
659         LOG(EXTENSION_LOG_WARNING, "%s No opaque found for add stream response "
660             "with opaque %ld", logHeader(), opaque);
661     }
662 }
663 
isValidOpaque(uint32_t opaque, uint16_t vbucket)664 bool DcpConsumer::isValidOpaque(uint32_t opaque, uint16_t vbucket) {
665     LockHolder lh(streamMutex);
666     passive_stream_t stream = streams[vbucket];
667     return stream && stream->getOpaque() == opaque;
668 }
669 
closeAllStreams()670 void DcpConsumer::closeAllStreams() {
671     int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
672     for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
673         passive_stream_t stream = streams[vbucket];
674         if (stream) {
675             stream->setDead(END_STREAM_DISCONNECTED);
676         }
677     }
678 }
679 
handleNoop(struct dcp_message_producers* producers)680 ENGINE_ERROR_CODE DcpConsumer::handleNoop(struct dcp_message_producers* producers) {
681     if (enableNoop) {
682         ENGINE_ERROR_CODE ret;
683         uint32_t opaque = ++opaqueCounter;
684         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
685         ret = producers->control(getCookie(), opaque, "enable_noop", 11,
686                                  "true", 4);
687         ObjectRegistry::onSwitchThread(epe);
688         enableNoop = false;
689         return ret;
690     }
691 
692     if (sendNoopInterval) {
693         ENGINE_ERROR_CODE ret;
694         uint32_t opaque = ++opaqueCounter;
695         char buf_size[10];
696         snprintf(buf_size, 10, "%u", noopInterval);
697         EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
698         ret = producers->control(getCookie(), opaque, "set_noop_interval", 17,
699                                  buf_size, strlen(buf_size));
700         ObjectRegistry::onSwitchThread(epe);
701         sendNoopInterval = false;
702         return ret;
703     }
704 
705     if ((ep_current_time() - lastNoopTime) > (noopInterval * 2)) {
706         LOG(EXTENSION_LOG_WARNING, "%s Disconnecting because noop message has "
707             "no been received for %u seconds", logHeader(), (noopInterval * 2));
708         return ENGINE_DISCONNECT;
709     }
710 
711     return ENGINE_FAILED;
712 }
713 
handleFlowCtl(struct dcp_message_producers* producers)714 ENGINE_ERROR_CODE DcpConsumer::handleFlowCtl(struct dcp_message_producers* producers) {
715     if (flowControl.enabled) {
716         ENGINE_ERROR_CODE ret;
717         uint32_t ackable_bytes = flowControl.freedBytes;
718         if (flowControl.pendingControl) {
719             uint32_t opaque = ++opaqueCounter;
720             char buf_size[10];
721             snprintf(buf_size, 10, "%u", flowControl.bufferSize);
722             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
723             ret = producers->control(getCookie(), opaque,
724                                      "connection_buffer_size", 22, buf_size,
725                                      strlen(buf_size));
726             ObjectRegistry::onSwitchThread(epe);
727             flowControl.pendingControl = false;
728             return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
729         } else if (ackable_bytes > (flowControl.bufferSize * .2)) {
730             // Send a buffer ack when at least 20% of the buffer is drained
731             uint32_t opaque = ++opaqueCounter;
732             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
733             ret = producers->buffer_acknowledgement(getCookie(), opaque, 0,
734                                                     ackable_bytes);
735             ObjectRegistry::onSwitchThread(epe);
736             flowControl.lastBufferAck = ep_current_time();
737             flowControl.ackedBytes.fetch_add(ackable_bytes);
738             flowControl.freedBytes.fetch_sub(ackable_bytes);
739             return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
740         } else if (ackable_bytes > 0 &&
741                    (ep_current_time() - flowControl.lastBufferAck) > 5) {
742             // Ack at least every 5 seconds
743             uint32_t opaque = ++opaqueCounter;
744             EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
745             ret = producers->buffer_acknowledgement(getCookie(), opaque, 0,
746                                                     ackable_bytes);
747             ObjectRegistry::onSwitchThread(epe);
748             flowControl.lastBufferAck = ep_current_time();
749             flowControl.ackedBytes.fetch_add(ackable_bytes);
750             flowControl.freedBytes.fetch_sub(ackable_bytes);
751             return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
752         }
753     }
754     return ENGINE_FAILED;
755 }
756