xref: /4.6.4/ep-engine/src/dcp/consumer.cc (revision 73d84472)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2015 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "dcp/consumer.h"

#include "ep_engine.h"
#include "failover-table.h"
#include "replicationthrottle.h"
#include "dcp/stream.h"
#include "dcp/response.h"

#include <climits>

const std::string DcpConsumer::noopCtrlMsg = "enable_noop";
const std::string DcpConsumer::noopIntervalCtrlMsg = "set_noop_interval";
const std::string DcpConsumer::connBufferCtrlMsg = "connection_buffer_size";
const std::string DcpConsumer::priorityCtrlMsg = "set_priority";
const std::string DcpConsumer::extMetadataCtrlMsg = "enable_ext_metadata";
const std::string DcpConsumer::valueCompressionCtrlMsg = "enable_value_compression";
const std::string DcpConsumer::cursorDroppingCtrlMsg = "supports_cursor_dropping";

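/**
 * NonIO task that drains a DcpConsumer's buffered DCP messages. It snoozes
 * "forever" (INT_MAX) once everything is processed, retries after 5 seconds
 * when the replication throttle blocks processing, and yields (snooze(0.0))
 * while more work remains or the consumer re-notifies it.
 */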
class Processor : public GlobalTask {
public:
    Processor(EventuallyPersistentEngine* e,
              connection_t c,
              double sleeptime = 1,
              bool completeBeforeShutdown = true)
        : GlobalTask(e, TaskId::Processor, sleeptime, completeBeforeShutdown),
          conn(c) {}

    ~Processor() {
        DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
        consumer->taskCancelled();
    }

    bool run() {
        DcpConsumer* consumer = static_cast<DcpConsumer*>(conn.get());
        if (consumer->doDisconnect()) {
            return false;
        }

        double sleepFor = 0.0;
        enum process_items_error_t state = consumer->processBufferedItems();
        switch (state) {
            case all_processed:
                sleepFor = INT_MAX;
                break;
            case more_to_process:
                sleepFor = 0.0;
                break;
            case cannot_process:
                sleepFor = 5.0;
                break;
        }

        if (consumer->notifiedProcessor(false)) {
            snooze(0.0);
            state = more_to_process;
        } else {
            snooze(sleepFor);
            // Check if the processor was notified again,
            // in which case the task should wake immediately.
            if (consumer->notifiedProcessor(false)) {
                snooze(0.0);
                state = more_to_process;
            }
        }

        consumer->setProcessorTaskState(state);

        return true;
    }

    std::string getDescription() {
        std::stringstream ss;
        ss << "Processing buffered items for " << conn->getName();
        return ss.str();
    }

private:
    connection_t conn;
};

DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
                         const std::string &name)
    : Consumer(engine, cookie, name),
      lastMessageTime(ep_current_time()),
      opaqueCounter(0),
      processorTaskId(0),
      processorTaskState(all_processed),
      processorNotification(false),
      backoffs(0),
      taskAlreadyCancelled(false),
      flowControl(engine, this),
      processBufferedMessagesYieldThreshold(engine.getConfiguration().
                                                getDcpConsumerProcessBufferedMessagesYieldLimit()),
      processBufferedMessagesBatchSize(engine.getConfiguration().
                                            getDcpConsumerProcessBufferedMessagesBatchSize()) {
    Configuration& config = engine.getConfiguration();
    streams.resize(config.getMaxVbuckets());
    setSupportAck(false);
    setLogHeader("DCP (Consumer) " + getName() + " -");
    setReserved(true);

    noopInterval = config.getDcpNoopInterval();

    pendingEnableNoop = config.isDcpEnableNoop();
    pendingSendNoopInterval = config.isDcpEnableNoop();
    pendingSetPriority = true;
    pendingEnableExtMetaData = true;
    pendingEnableValueCompression = config.isDcpValueCompressionEnabled();
    pendingSupportCursorDropping = true;

    ExTask task = new Processor(&engine, this, 1);
    processorTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
}

DcpConsumer::~DcpConsumer() {
    cancelTask();
}


void DcpConsumer::cancelTask() {
    bool inverse = false;
    if (taskAlreadyCancelled.compare_exchange_strong(inverse, true)) {
        ExecutorPool::get()->cancel(processorTaskId);
    }
}

void DcpConsumer::taskCancelled() {
    bool inverse = false;
    taskAlreadyCancelled.compare_exchange_strong(inverse, true);
}

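/**
 * Handles an incoming DCP ADD_STREAM request. Builds the stream request
 * parameters from the vbucket's current snapshot info and latest failover
 * entry, rejects active vbuckets and duplicate streams, and remembers the
 * add_stream opaque so the eventual STREAM_REQ response can be matched back
 * to it.
 */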
ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque, uint16_t vbucket,
                                         uint32_t flags) {
    lastMessageTime = ep_current_time();
    LockHolder lh(readyMutex);
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
    if (!vb) {
        logger.log(EXTENSION_LOG_WARNING,
            "(vb %d) Add stream failed because this vbucket doesn't exist",
            vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    if (vb->getState() == vbucket_state_active) {
        logger.log(EXTENSION_LOG_WARNING,
            "(vb %d) Add stream failed because this vbucket happens to be in "
            "active state", vbucket);
        return ENGINE_NOT_MY_VBUCKET;
    }

    snapshot_info_t info = vb->checkpointManager.getSnapshotInfo();
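    // If the start seqno has already caught up with the snapshot end, the
    // vbucket is at a snapshot boundary, so collapse the requested snapshot
    // range to begin at the start seqno.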
    if (info.range.end == info.start) {
        info.range.start = info.start;
    }

    uint32_t new_opaque = ++opaqueCounter;
    failover_entry_t entry = vb->failovers->getLatestEntry();
    uint64_t start_seqno = info.start;
    uint64_t end_seqno = std::numeric_limits<uint64_t>::max();
    uint64_t vbucket_uuid = entry.vb_uuid;
    uint64_t snap_start_seqno = info.range.start;
    uint64_t snap_end_seqno = info.range.end;
    uint64_t high_seqno = vb->getHighSeqno();

    passive_stream_t stream = streams[vbucket];
    if (stream && stream->isActive()) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot add stream because one "
            "already exists", logHeader(), vbucket);
        return ENGINE_KEY_EEXISTS;
    }

    streams[vbucket] = new PassiveStream(&engine_, this, getName(), flags,
                                         new_opaque, vbucket, start_seqno,
                                         end_seqno, vbucket_uuid,
                                         snap_start_seqno, snap_end_seqno,
                                         high_seqno);
    ready.push_back(vbucket);
    opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);

    return ENGINE_SUCCESS;
}

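/**
 * Closes the passive stream for the given vbucket (if any), crediting the
 * stream's buffered bytes back to flow control and removing any opaque-map
 * entry for the given opaque.
 */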
ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque, uint16_t vbucket) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        streams[vbucket].reset();
        return ENGINE_DISCONNECT;
    }

    opaque_map::iterator oitr = opaqueMap_.find(opaque);
    if (oitr != opaqueMap_.end()) {
        opaqueMap_.erase(oitr);
    }

    passive_stream_t stream = streams[vbucket];
    if (!stream) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
            "stream exists for this vbucket", logHeader(), vbucket);
        return ENGINE_KEY_ENOENT;
    }

    uint32_t bytesCleared = stream->setDead(END_STREAM_CLOSED);
    flowControl.incrFreedBytes(bytesCleared);
    streams[vbucket].reset();
    notifyConsumerIfNecessary(true/*schedule*/);

    return ENGINE_SUCCESS;
}

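/**
 * Handles a DCP_STREAM_END message by passing a StreamEndResponse to the
 * matching passive stream. ENGINE_TMPFAIL from the stream means the message
 * was buffered for later processing, so the call still reports success;
 * otherwise the message's bytes are credited back to flow control.
 */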
ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque, uint16_t vbucket,
                                         uint32_t flags) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
    passive_stream_t stream = streams[vbucket];
    if (stream && stream->getOpaque() == opaque && stream->isActive()) {
        LOG(EXTENSION_LOG_INFO, "%s (vb %d) End stream received with reason %d",
            logHeader(), vbucket, flags);

        StreamEndResponse* response;
        try {
            response = new StreamEndResponse(opaque, flags, vbucket);
        } catch (const std::bad_alloc&) {
            return ENGINE_ENOMEM;
        }

        err = stream->messageReceived(response);

        if (err == ENGINE_TMPFAIL) {
            notifyVbucketReady(vbucket);
        }
    }

    // The item was buffered and will be processed later
    if (err == ENGINE_TMPFAIL) {
        return ENGINE_SUCCESS;
    }

    if (err != ENGINE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) End stream received with opaque "
            "%" PRIu32 " but no such stream exists", logHeader(), vbucket,
            opaque);
    }

    flowControl.incrFreedBytes(StreamEndResponse::baseMsgBytes);
    notifyConsumerIfNecessary(true/*schedule*/);

    return err;
}

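/**
 * Handles a DCP mutation: wraps the key/value in an Item (plus any extended
 * metadata) and hands it to the matching passive stream. A zero by-seqno is
 * rejected as invalid. ENGINE_TMPFAIL means the message was buffered and is
 * reported as success; otherwise the message's bytes are returned to flow
 * control.
 */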
ENGINE_ERROR_CODE DcpConsumer::mutation(uint32_t opaque, const void* key,
                                        uint16_t nkey, const void* value,
                                        uint32_t nvalue, uint64_t cas,
                                        uint16_t vbucket, uint32_t flags,
                                        uint8_t datatype, uint32_t locktime,
                                        uint64_t bySeqno, uint64_t revSeqno,
                                        uint32_t exptime, uint8_t nru,
                                        const void* meta, uint16_t nmeta) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    if (bySeqno == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid sequence number(0) "
            "for mutation!", logHeader(), vbucket);
        return ENGINE_EINVAL;
    }

    ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
    passive_stream_t stream = streams[vbucket];
    if (stream && stream->getOpaque() == opaque && stream->isActive()) {
        queued_item item(new Item(key, nkey, flags, exptime, value, nvalue,
                                  &datatype, EXT_META_LEN, cas, bySeqno,
                                  vbucket, revSeqno));

        ExtendedMetaData *emd = NULL;
        if (nmeta > 0) {
            // operator new throws std::bad_alloc rather than returning NULL,
            // so catch the exception instead of checking the pointer.
            try {
                emd = new ExtendedMetaData(meta, nmeta);
            } catch (const std::bad_alloc&) {
                return ENGINE_ENOMEM;
            }
            if (emd->getStatus() == ENGINE_EINVAL) {
                delete emd;
                return ENGINE_EINVAL;
            }
        }

        MutationResponse* response;
        try {
            response = new MutationResponse(item, opaque, emd);
        } catch (const std::bad_alloc&) {
            delete emd;
            return ENGINE_ENOMEM;
        }

        err = stream->messageReceived(response);

        if (err == ENGINE_TMPFAIL) {
            notifyVbucketReady(vbucket);
        }
    }

    // The item was buffered and will be processed later
    if (err == ENGINE_TMPFAIL) {
        return ENGINE_SUCCESS;
    }

    uint32_t bytes =
        MutationResponse::mutationBaseMsgBytes + nkey + nmeta + nvalue;
    flowControl.incrFreedBytes(bytes);
    notifyConsumerIfNecessary(true/*schedule*/);

    return err;
}

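/**
 * Handles a DCP deletion: builds a deleted Item (plus any extended metadata)
 * and passes it to the matching passive stream, following the same buffering
 * and flow-control accounting rules as mutation(). Expirations are routed
 * through this function as well.
 */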
ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque, const void* key,
                                        uint16_t nkey, uint64_t cas,
                                        uint16_t vbucket, uint64_t bySeqno,
                                        uint64_t revSeqno, const void* meta,
                                        uint16_t nmeta) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    if (bySeqno == 0) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid sequence number(0) "
            "for deletion!", logHeader(), vbucket);
        return ENGINE_EINVAL;
    }

    ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
    passive_stream_t stream = streams[vbucket];
    if (stream && stream->getOpaque() == opaque && stream->isActive()) {
        queued_item item(new Item(key, nkey, 0, 0, NULL, 0, NULL, 0, cas, bySeqno,
                                  vbucket, revSeqno));
        item->setDeleted();

        ExtendedMetaData *emd = NULL;
        if (nmeta > 0) {
            // As in mutation(): operator new throws on allocation failure,
            // it does not return NULL.
            try {
                emd = new ExtendedMetaData(meta, nmeta);
            } catch (const std::bad_alloc&) {
                return ENGINE_ENOMEM;
            }
            if (emd->getStatus() == ENGINE_EINVAL) {
                delete emd;
                return ENGINE_EINVAL;
            }
        }

        MutationResponse* response;
        try {
            response = new MutationResponse(item, opaque, emd);
        } catch (const std::bad_alloc&) {
            delete emd;
            return ENGINE_ENOMEM;
        }

        err = stream->messageReceived(response);

        if (err == ENGINE_TMPFAIL) {
            notifyVbucketReady(vbucket);
        }
    }

    // The item was buffered and will be processed later
    if (err == ENGINE_TMPFAIL) {
        return ENGINE_SUCCESS;
    }

    uint32_t bytes = MutationResponse::deletionBaseMsgBytes + nkey + nmeta;
    flowControl.incrFreedBytes(bytes);
    notifyConsumerIfNecessary(true/*schedule*/);

    return err;
}

ENGINE_ERROR_CODE DcpConsumer::expiration(uint32_t opaque, const void* key,
                                          uint16_t nkey, uint64_t cas,
                                          uint16_t vbucket, uint64_t bySeqno,
                                          uint64_t revSeqno, const void* meta,
                                          uint16_t nmeta) {
    // lastMessageTime is set in deletion function
    return deletion(opaque, key, nkey, cas, vbucket, bySeqno, revSeqno, meta,
                    nmeta);
}

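/**
 * Handles a DCP snapshot marker: validates that snap_start <= snap_end and
 * forwards a SnapshotMarker message to the matching passive stream, with the
 * usual buffering (TMPFAIL) and flow-control accounting.
 */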
ENGINE_ERROR_CODE DcpConsumer::snapshotMarker(uint32_t opaque,
                                              uint16_t vbucket,
                                              uint64_t start_seqno,
                                              uint64_t end_seqno,
                                              uint32_t flags) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    if (start_seqno > end_seqno) {
        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Invalid snapshot marker "
            "received, snap_start (%" PRIu64 ") > snap_end (%" PRIu64 ")",
            logHeader(), vbucket, start_seqno, end_seqno);
        return ENGINE_EINVAL;
    }

    ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
    passive_stream_t stream = streams[vbucket];
    if (stream && stream->getOpaque() == opaque && stream->isActive()) {
        SnapshotMarker* response;
        try {
            response = new SnapshotMarker(opaque, vbucket, start_seqno,
                                          end_seqno, flags);
        } catch (const std::bad_alloc&) {
            return ENGINE_ENOMEM;
        }

        err = stream->messageReceived(response);

        if (err == ENGINE_TMPFAIL) {
            notifyVbucketReady(vbucket);
        }
    }

    // The item was buffered and will be processed later
    if (err == ENGINE_TMPFAIL) {
        return ENGINE_SUCCESS;
    }

    flowControl.incrFreedBytes(SnapshotMarker::baseMsgBytes);
    notifyConsumerIfNecessary(true/*schedule*/);

    return err;
}

ENGINE_ERROR_CODE DcpConsumer::noop(uint32_t opaque) {
    lastMessageTime = ep_current_time();
    return ENGINE_SUCCESS;
}

ENGINE_ERROR_CODE DcpConsumer::flush(uint32_t opaque, uint16_t vbucket) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    return ENGINE_ENOTSUP;
}

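/**
 * Handles a DCP set-vbucket-state message by forwarding a SetVBucketState
 * message to the matching passive stream, with the usual buffering (TMPFAIL)
 * and flow-control accounting.
 */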
ENGINE_ERROR_CODE DcpConsumer::setVBucketState(uint32_t opaque,
                                               uint16_t vbucket,
                                               vbucket_state_t state) {
    lastMessageTime = ep_current_time();
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
    passive_stream_t stream = streams[vbucket];
    if (stream && stream->getOpaque() == opaque && stream->isActive()) {
        SetVBucketState* response;
        try {
            response = new SetVBucketState(opaque, vbucket, state);
        } catch (const std::bad_alloc&) {
            return ENGINE_ENOMEM;
        }

        err = stream->messageReceived(response);

        if (err == ENGINE_TMPFAIL) {
            notifyVbucketReady(vbucket);
        }
    }

    // The item was buffered and will be processed later
    if (err == ENGINE_TMPFAIL) {
        return ENGINE_SUCCESS;
    }

    flowControl.incrFreedBytes(SetVBucketState::baseMsgBytes);
    notifyConsumerIfNecessary(true/*schedule*/);

    return err;
}

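/**
 * Drives the consumer side of the connection. Pending work is sent in
 * priority order: flow control messages, then the one-off DCP_CONTROL
 * negotiation messages (noop, noop interval, priority, extended metadata,
 * value compression, cursor dropping), and finally any queued stream-level
 * responses (add stream, stream request, set vbucket state, snapshot marker
 * acks). Returns ENGINE_WANT_MORE while there may be more to send.
 */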
ENGINE_ERROR_CODE DcpConsumer::step(struct dcp_message_producers* producers) {
    setLastWalkTime();

    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    ENGINE_ERROR_CODE ret;
    if ((ret = flowControl.handleFlowCtl(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    if ((ret = handleNoop(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    if ((ret = handlePriority(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    if ((ret = handleExtMetaData(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    if ((ret = handleValueCompression(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    if ((ret = supportCursorDropping(producers)) != ENGINE_FAILED) {
        if (ret == ENGINE_SUCCESS) {
            ret = ENGINE_WANT_MORE;
        }
        return ret;
    }

    DcpResponse *resp = getNextItem();
    if (resp == NULL) {
        return ENGINE_SUCCESS;
    }

    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
    switch (resp->getEvent()) {
        case DCP_ADD_STREAM:
        {
            AddStreamResponse *as = static_cast<AddStreamResponse*>(resp);
            ret = producers->add_stream_rsp(getCookie(), as->getOpaque(),
                                            as->getStreamOpaque(),
                                            as->getStatus());
            break;
        }
        case DCP_STREAM_REQ:
        {
            StreamRequest *sr = static_cast<StreamRequest*> (resp);
            ret = producers->stream_req(getCookie(), sr->getOpaque(),
                                        sr->getVBucket(), sr->getFlags(),
                                        sr->getStartSeqno(), sr->getEndSeqno(),
                                        sr->getVBucketUUID(),
                                        sr->getSnapStartSeqno(),
                                        sr->getSnapEndSeqno());
            break;
        }
        case DCP_SET_VBUCKET:
        {
            SetVBucketStateResponse* vs;
            vs = static_cast<SetVBucketStateResponse*>(resp);
            ret = producers->set_vbucket_state_rsp(getCookie(), vs->getOpaque(),
                                                   vs->getStatus());
            break;
        }
        case DCP_SNAPSHOT_MARKER:
        {
            SnapshotMarkerResponse* mr;
            mr = static_cast<SnapshotMarkerResponse*>(resp);
            ret = producers->marker_rsp(getCookie(), mr->getOpaque(),
                                        mr->getStatus());
            break;
        }
        default:
            LOG(EXTENSION_LOG_WARNING, "%s Unknown consumer event (%d), "
                "disconnecting", logHeader(), resp->getEvent());
            ret = ENGINE_DISCONNECT;
    }
    ObjectRegistry::onSwitchThread(epe);
    delete resp;

    if (ret == ENGINE_SUCCESS) {
        return ENGINE_WANT_MORE;
    }
    return ret;
}

bool RollbackTask::run() {
    if (cons->doDisconnect()) {
        return false;
    }
    if (cons->doRollback(opaque, vbid, rollbackSeqno)) {
        return true;
    }
    ++(engine->getEpStats().rollbackCount);
    return false;
}

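/**
 * Handles responses to messages this consumer sent to the producer. Stream
 * request responses either schedule a rollback (on a ROLLBACK status) or are
 * passed on to streamAccepted(); buffer ack and control responses are simply
 * acknowledged, and anything else causes a disconnect.
 */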
ENGINE_ERROR_CODE DcpConsumer::handleResponse(
                                        protocol_binary_response_header *resp) {
    if (doDisconnect()) {
        return ENGINE_DISCONNECT;
    }

    uint8_t opcode = resp->response.opcode;
    uint32_t opaque = resp->response.opaque;

    opaque_map::iterator oitr = opaqueMap_.find(opaque);

    bool validOpaque = false;
    if (oitr != opaqueMap_.end()) {
        validOpaque = isValidOpaque(opaque, oitr->second.second);
    }

    if (!validOpaque) {
        logger.log(EXTENSION_LOG_WARNING,
            "Received response with opaque %" PRIu32 ", but the stream it "
            "refers to no longer exists", opaque);
        return ENGINE_KEY_ENOENT;
    }

    if (opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_REQ) {
        protocol_binary_response_dcp_stream_req* pkt =
            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);

        uint16_t vbid = oitr->second.second;
        uint16_t status = ntohs(pkt->message.header.response.status);
        uint64_t bodylen = ntohl(pkt->message.header.response.bodylen);
        uint8_t* body = pkt->bytes + sizeof(protocol_binary_response_header);

        if (status == PROTOCOL_BINARY_RESPONSE_ROLLBACK) {
            if (bodylen != sizeof(uint64_t)) {
                LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Received rollback "
                    "request with incorrect bodylen of %" PRIu64 ", disconnecting",
                    logHeader(), vbid, bodylen);
                return ENGINE_DISCONNECT;
            }
            uint64_t rollbackSeqno = 0;
            memcpy(&rollbackSeqno, body, sizeof(uint64_t));
            rollbackSeqno = ntohll(rollbackSeqno);

            LOG(EXTENSION_LOG_NOTICE, "%s (vb %d) Received rollback request "
                "to rollback seq no. %" PRIu64, logHeader(), vbid, rollbackSeqno);

            ExTask task = new RollbackTask(&engine_, opaque, vbid,
                                           rollbackSeqno, this);
            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
            return ENGINE_SUCCESS;
        }

        if (((bodylen % 16) != 0 || bodylen == 0) && status == ENGINE_SUCCESS) {
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Got a stream response with a "
                "bad failover log (length %" PRIu64 "), disconnecting",
                logHeader(), vbid, bodylen);
            return ENGINE_DISCONNECT;
        }

        streamAccepted(opaque, status, body, bodylen);
        return ENGINE_SUCCESS;
    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_BUFFER_ACKNOWLEDGEMENT ||
               opcode == PROTOCOL_BINARY_CMD_DCP_CONTROL) {
        return ENGINE_SUCCESS;
    }

    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
        "disconnecting", logHeader(), opcode);

    return ENGINE_DISCONNECT;
}

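/**
 * Executes a rollback requested by the producer. Returns true if the rollback
 * task should be rescheduled; on completion the passive stream is reconnected
 * from the vbucket's new high seqno.
 */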
bool DcpConsumer::doRollback(uint32_t opaque,
                             uint16_t vbid,
                             uint64_t rollbackSeqno) {
    TaskStatus status = engine_.getEpStore()->rollback(vbid, rollbackSeqno);

    switch (status) {
    case TaskStatus::Reschedule:
        return true; // Reschedule the rollback.
    case TaskStatus::Abort:
        logger.log(EXTENSION_LOG_WARNING,
                   "vb:%" PRIu16 " Rollback failed on the vbucket",
                   vbid);
        break;
    case TaskStatus::Complete: {
        RCPtr<VBucket> vb = engine_.getVBucket(vbid);
        if (!vb) {
            logger.log(EXTENSION_LOG_WARNING,
                       "vb:%" PRIu16
                       " Aborting rollback task as the vbucket "
                       "was deleted after rollback",
                       vbid);
            break;
        }
        passive_stream_t stream = streams[vbid];
        if (stream) {
            stream->reconnectStream(vb, opaque, vb->getHighSeqno());
        }
        break;
    }
    }
    return false; // Do not reschedule the rollback
}

void DcpConsumer::addStats(ADD_STAT add_stat, const void *c) {
    ConnHandler::addStats(add_stat, c);

    int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
    for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
        passive_stream_t stream = streams[vbucket];
        if (stream) {
            stream->addStats(add_stat, c);
        }
    }

    addStat("total_backoffs", backoffs, add_stat, c);
    addStat("processor_task_state", getProcessorTaskStatusStr(), add_stat, c);
    flowControl.addStats(add_stat, c);
}

void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator) {
    aggregator.conn_queueBackoff += backoffs;
}

bool DcpConsumer::tryAndAssignVbucketsStream(uint16_t vbid, passive_stream_t& stream) {
    if (streams[vbid]) {
        // This assignment hits a spinlock, so we only want to do it
        // if streams[vbid] owns a pointer
        stream = streams[vbid];
        // Now we've done the 'expensive' copy, recheck as it's possible
        // the stream went to null after our first cheap check.
        return stream.get() != nullptr;
    }
    return false;
}

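/**
 * Processes buffered messages for a single stream, in batches of
 * processBufferedMessagesBatchSize, until the stream is drained, the yield
 * threshold is hit, or the replication throttle refuses further work. Freed
 * bytes are returned to flow control as each batch completes.
 */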
process_items_error_t DcpConsumer::drainStreamsBufferedItems(passive_stream_t& stream,
                                                             size_t yieldThreshold) {
    process_items_error_t rval = all_processed;
    uint32_t bytesProcessed = 0;
    size_t iterations = 0;
    do {
        if (!engine_.getReplicationThrottle().shouldProcess()) {
            backoffs++;
            vbReady.pushUnique(stream->getVBucket());
            return cannot_process;
        }

        bytesProcessed = 0;
        rval = stream->processBufferedMessages(bytesProcessed,
                                               processBufferedMessagesBatchSize);
        flowControl.incrFreedBytes(bytesProcessed);

        // Notify memcached that bytes were freed, so a flow control
        // buffer ack can be sent out.
        notifyConsumerIfNecessary(false/*schedule*/);

        iterations++;
    } while (bytesProcessed > 0 &&
             rval == all_processed &&
             iterations <= yieldThreshold);

    // The stream may not be done yet so must go back in the ready queue
    if (bytesProcessed > 0) {
        vbReady.pushUnique(stream->getVBucket());
        rval = more_to_process; // Return more_to_process to force a snooze(0.0)
    }

    return rval;
}

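/**
 * Called by the Processor task: pops the next ready vbucket and drains its
 * stream's buffered messages. Returning more_to_process makes the task yield
 * (snooze(0.0)) rather than sleep when further work remains.
 */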
process_items_error_t DcpConsumer::processBufferedItems() {
    process_items_error_t process_ret = all_processed;
    uint16_t vbucket = 0;
    while (vbReady.popFront(vbucket)) {
        passive_stream_t stream;

        if (!tryAndAssignVbucketsStream(vbucket, stream)) {
            // Try popping again
            continue;
        }

        process_ret = drainStreamsBufferedItems(stream,
                                                processBufferedMessagesYieldThreshold);

        if (process_ret == all_processed) {
            return more_to_process;
        }

        if (process_ret == cannot_process) {
            // If items for current vbucket weren't processed,
            // re-add current vbucket
            if (vbReady.size() > 0) {
                // If there are more vbuckets in queue, sleep(0).
                process_ret = more_to_process;
            }
            vbReady.pushUnique(vbucket);
        }

        return process_ret;
    }

    return process_ret;
}

void DcpConsumer::notifyVbucketReady(uint16_t vbucket) {
    if (vbReady.pushUnique(vbucket) &&
        notifiedProcessor(true)) {
        ExecutorPool::get()->wake(processorTaskId);
    }
}

bool DcpConsumer::notifiedProcessor(bool to) {
    bool inverse = !to;
    return processorNotification.compare_exchange_strong(inverse, to);
}

void DcpConsumer::setProcessorTaskState(enum process_items_error_t to) {
    processorTaskState = to;
}

std::string DcpConsumer::getProcessorTaskStatusStr() {
    switch (processorTaskState.load()) {
        case all_processed:
            return "ALL_PROCESSED";
        case more_to_process:
            return "MORE_TO_PROCESS";
        case cannot_process:
            return "CANNOT_PROCESS";
    }

    return "UNKNOWN";
}

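/**
 * Returns the next queued response (stream request, add stream, set vbucket
 * state or snapshot marker ack) for step() to send, cycling through the
 * ready vbuckets; marks the connection paused when nothing is ready.
 */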
DcpResponse* DcpConsumer::getNextItem() {
    LockHolder lh(readyMutex);

    setPaused(false);
    while (!ready.empty()) {
        uint16_t vbucket = ready.front();
        ready.pop_front();

        passive_stream_t stream = streams[vbucket];
        if (!stream) {
            continue;
        }

        DcpResponse* op = stream->next();
        if (!op) {
            continue;
        }
        switch (op->getEvent()) {
            case DCP_STREAM_REQ:
            case DCP_ADD_STREAM:
            case DCP_SET_VBUCKET:
            case DCP_SNAPSHOT_MARKER:
                break;
            default:
                throw std::logic_error(
                        std::string("DcpConsumer::getNextItem: ") + logHeader() +
                        " is attempting to write an unexpected event: " +
                        std::to_string(op->getEvent()));
        }

        ready.push_back(vbucket);
        return op;
    }
    setPaused(true);

    return NULL;
}

void DcpConsumer::notifyStreamReady(uint16_t vbucket) {
    LockHolder lh(readyMutex);
    std::list<uint16_t>::iterator iter =
        std::find(ready.begin(), ready.end(), vbucket);
    if (iter != ready.end()) {
        return;
    }

    ready.push_back(vbucket);
    lh.unlock();

    notifyPaused(/*schedule*/true);
}

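/**
 * Completes a stream request: on success the failover log returned by the
 * producer replaces the vbucket's failover table and the vbucket state is
 * scheduled for persistence, then the pending passive stream is told to
 * accept (or reject) with the original add_stream opaque.
 */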
void DcpConsumer::streamAccepted(uint32_t opaque, uint16_t status, uint8_t* body,
                                 uint32_t bodylen) {

    opaque_map::iterator oitr = opaqueMap_.find(opaque);
    if (oitr != opaqueMap_.end()) {
        uint32_t add_opaque = oitr->second.first;
        uint16_t vbucket = oitr->second.second;

        passive_stream_t stream = streams[vbucket];
        if (stream && stream->getOpaque() == opaque &&
            stream->getState() == STREAM_PENDING) {
            if (status == ENGINE_SUCCESS) {
                RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
                vb->failovers->replaceFailoverLog(body, bodylen);
                EventuallyPersistentStore* st = engine_.getEpStore();
                st->scheduleVBStatePersist(vbucket);
            }
            LOG(EXTENSION_LOG_INFO, "%s (vb %d) Add stream for opaque %" PRIu32
                " %s with error code %d", logHeader(), vbucket, opaque,
                status == ENGINE_SUCCESS ? "succeeded" : "failed", status);
            stream->acceptStream(status, add_opaque);
        } else {
            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Trying to add stream, but "
                "none exists (opaque: %" PRIu32 ", add_opaque: %" PRIu32 ")",
                logHeader(), vbucket, opaque, add_opaque);
        }
        opaqueMap_.erase(opaque);
    } else {
        LOG(EXTENSION_LOG_WARNING, "%s No opaque found for add stream response "
            "with opaque %" PRIu32, logHeader(), opaque);
    }
}

bool DcpConsumer::isValidOpaque(uint32_t opaque, uint16_t vbucket) {
    passive_stream_t stream = streams[vbucket];
    return stream && stream->getOpaque() == opaque;
}

void DcpConsumer::closeAllStreams() {
    int max_vbuckets = engine_.getConfiguration().getMaxVbuckets();
    for (int vbucket = 0; vbucket < max_vbuckets; vbucket++) {
        passive_stream_t stream = streams[vbucket];
        if (stream) {
            stream->setDead(END_STREAM_DISCONNECTED);
            streams[vbucket].reset();
        }
    }
}

void DcpConsumer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
    passive_stream_t stream = streams[vbucket];
    if (stream) {
        LOG(EXTENSION_LOG_INFO, "%s (vb %" PRIu16 ") State changed to "
            "%s, closing passive stream!",
            logHeader(), vbucket, VBucket::toString(state));
        uint32_t bytesCleared = stream->setDead(END_STREAM_STATE);
        flowControl.incrFreedBytes(bytesCleared);
        streams[vbucket].reset();
        notifyConsumerIfNecessary(true/*schedule*/);
    }
}

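/**
 * Noop negotiation and dead-connection detection: sends the enable_noop and
 * set_noop_interval DCP_CONTROL messages once, and thereafter forces a
 * disconnect if no message has been received for twice the noop interval.
 * ENGINE_FAILED signals step() that there was nothing to send.
 */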
ENGINE_ERROR_CODE DcpConsumer::handleNoop(struct dcp_message_producers* producers) {
    if (pendingEnableNoop) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string val("true");
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 noopCtrlMsg.c_str(), noopCtrlMsg.size(),
                                 val.c_str(), val.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingEnableNoop = false;
        return ret;
    }

    if (pendingSendNoopInterval) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string interval = std::to_string(noopInterval);
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 noopIntervalCtrlMsg.c_str(),
                                 noopIntervalCtrlMsg.size(),
                                 interval.c_str(), interval.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingSendNoopInterval = false;
        return ret;
    }

    if ((ep_current_time() - lastMessageTime) > (noopInterval * 2)) {
        LOG(EXTENSION_LOG_NOTICE, "%s Disconnecting because a message has "
            "not been received for %u seconds. lastMessageTime was %u seconds ago.",
            logHeader(), (noopInterval * 2), (ep_current_time() - lastMessageTime));
        return ENGINE_DISCONNECT;
    }

    return ENGINE_FAILED;
}

ENGINE_ERROR_CODE DcpConsumer::handlePriority(struct dcp_message_producers* producers) {
    if (pendingSetPriority) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string val("high");
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 priorityCtrlMsg.c_str(), priorityCtrlMsg.size(),
                                 val.c_str(), val.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingSetPriority = false;
        return ret;
    }

    return ENGINE_FAILED;
}

ENGINE_ERROR_CODE DcpConsumer::handleExtMetaData(struct dcp_message_producers* producers) {
    if (pendingEnableExtMetaData) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string val("true");
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 extMetadataCtrlMsg.c_str(),
                                 extMetadataCtrlMsg.size(),
                                 val.c_str(), val.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingEnableExtMetaData = false;
        return ret;
    }

    return ENGINE_FAILED;
}

ENGINE_ERROR_CODE DcpConsumer::handleValueCompression(struct dcp_message_producers* producers) {
    if (pendingEnableValueCompression) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string val("true");
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 valueCompressionCtrlMsg.c_str(),
                                 valueCompressionCtrlMsg.size(),
                                 val.c_str(), val.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingEnableValueCompression = false;
        return ret;
    }

    return ENGINE_FAILED;
}

ENGINE_ERROR_CODE DcpConsumer::supportCursorDropping(struct dcp_message_producers* producers) {
    if (pendingSupportCursorDropping) {
        ENGINE_ERROR_CODE ret;
        uint32_t opaque = ++opaqueCounter;
        std::string val("true");
        EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
        ret = producers->control(getCookie(), opaque,
                                 cursorDroppingCtrlMsg.c_str(),
                                 cursorDroppingCtrlMsg.size(),
                                 val.c_str(), val.size());
        ObjectRegistry::onSwitchThread(epe);
        pendingSupportCursorDropping = false;
        return ret;
    }

    return ENGINE_FAILED;
}

uint64_t DcpConsumer::incrOpaqueCounter()
{
    return (++opaqueCounter);
}

uint32_t DcpConsumer::getFlowControlBufSize()
{
    return flowControl.getFlowControlBufSize();
}

void DcpConsumer::setFlowControlBufSize(uint32_t newSize)
{
    flowControl.setFlowControlBufSize(newSize);
}

const std::string& DcpConsumer::getControlMsgKey(void)
{
    return connBufferCtrlMsg;
}

bool DcpConsumer::isStreamPresent(uint16_t vbucket)
{
    if (streams[vbucket] && streams[vbucket]->isActive()) {
        return true;
    }
    return false;
}

void DcpConsumer::notifyConsumerIfNecessary(bool schedule) {
    if (flowControl.isBufferSufficientlyDrained()) {
        /**
         * Notify memcached so that a flow control buffer ack can be sent
         * out promptly. We cannot wait for the ConnManager daemon task to
         * notify memcached, as that would delay the buffer ack reaching
         * the producer.
         */
        notifyPaused(schedule);
    }
}

void DcpConsumer::notifyPaused(bool schedule) {
    engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
}
1127