1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "backfill.h"
21#include "ep_engine.h"
22#include "failover-table.h"
23#include "dcp-producer.h"
24#include "dcp-response.h"
25#include "dcp-stream.h"
26
27const uint32_t DcpProducer::defaultNoopInerval = 20;
28
29void BufferLog::insert(DcpResponse* response) {
30    cb_assert(!isFull());
31    bytes_sent += response->getMessageSize();
32}
33
34void BufferLog::free(uint32_t bytes_to_free) {
35    if (bytes_sent >= bytes_to_free) {
36        bytes_sent -= bytes_to_free;
37    } else {
38        bytes_sent = 0;
39    }
40}
41
42DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
43                         const std::string &name, bool isNotifier)
44    : Producer(e, cookie, name), rejectResp(NULL),
45      notifyOnly(isNotifier), lastSendTime(ep_current_time()), log(NULL),
46      itemsSent(0), totalBytesSent(0), ackedBytes(0) {
47    setSupportAck(true);
48    setReserved(true);
49    setPaused(true);
50
51    if (notifyOnly) {
52        setLogHeader("DCP (Notifier) " + getName() + " -");
53    } else {
54        setLogHeader("DCP (Producer) " + getName() + " -");
55    }
56
57    if (getName().find("replication") != std::string::npos) {
58        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_HIGH);
59    } else if (getName().find("xdcr") != std::string::npos) {
60        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
61    } else if (getName().find("views") != std::string::npos) {
62        engine_.setDCPPriority(getCookie(), CONN_PRIORITY_MED);
63    }
64
65    // The consumer assigns opaques starting at 0 so lets have the producer
66    //start using opaques at 10M to prevent any opaque conflicts.
67    noopCtx.opaque = 10000000;
68    noopCtx.sendTime = ep_current_time();
69
70    // This is for backward compatibility with Couchbase 3.0. In 3.0 we set the
71    // noop interval to 20 seconds by default, but in post 3.0 releases we set
72    // it to be higher by default. Starting in 3.0.1 the DCP consumer sets the
73    // noop interval of the producer when connecting so in an all 3.0.1+ cluster
74    // this value will be overriden. In 3.0 however we do not set the noop
75    // interval so setting this value will make sure we don't disconnect on
76    // accident due to the producer and the consumer having a different noop
77    // interval.
78    noopCtx.noopInterval = defaultNoopInerval;
79    noopCtx.pendingRecv = false;
80    noopCtx.enabled = false;
81}
82
83DcpProducer::~DcpProducer() {
84    if (log) {
85        delete log;
86    }
87}
88
89ENGINE_ERROR_CODE DcpProducer::streamRequest(uint32_t flags,
90                                             uint32_t opaque,
91                                             uint16_t vbucket,
92                                             uint64_t start_seqno,
93                                             uint64_t end_seqno,
94                                             uint64_t vbucket_uuid,
95                                             uint64_t snap_start_seqno,
96                                             uint64_t snap_end_seqno,
97                                             uint64_t *rollback_seqno,
98                                             dcp_add_failover_log callback) {
99    if (doDisconnect()) {
100        return ENGINE_DISCONNECT;
101    }
102
103    LockHolder lh(queueLock);
104    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
105    if (!vb) {
106        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
107            "this vbucket doesn't exist", logHeader(), vbucket);
108        return ENGINE_NOT_MY_VBUCKET;
109    }
110
111    if (vb->checkpointManager.getOpenCheckpointId() == 0) {
112        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
113            "this vbucket is in backfill state", logHeader(), vbucket);
114        return ENGINE_TMPFAIL;
115    }
116
117    if (flags & DCP_ADD_STREAM_FLAG_LATEST) {
118        end_seqno = vb->getHighSeqno();
119    }
120
121    if (flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
122        end_seqno = engine_.getEpStore()->getLastPersistedSeqno(vbucket);
123    }
124
125    if (!notifyOnly && start_seqno > end_seqno) {
126        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
127            "the start seqno (%llu) is larger than the end seqno (%llu)",
128            logHeader(), vbucket, start_seqno, end_seqno);
129        return ENGINE_ERANGE;
130    }
131
132    if (!notifyOnly && !(snap_start_seqno <= start_seqno &&
133        start_seqno <= snap_end_seqno)) {
134        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed because "
135            "the snap start seqno (%llu) <= start seqno (%llu) <= snap end "
136            "seqno (%llu) is required", logHeader(), vbucket, snap_start_seqno,
137            start_seqno, snap_end_seqno);
138        return ENGINE_ERANGE;
139    }
140
141    bool add_vb_conn_map = true;
142    std::map<uint16_t, stream_t>::iterator itr;
143    if ((itr = streams.find(vbucket)) != streams.end()) {
144        if (itr->second->getState() != STREAM_DEAD) {
145            LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed"
146                " because a stream already exists for this vbucket",
147                logHeader(), vbucket);
148            return ENGINE_KEY_EEXISTS;
149        } else {
150            streams.erase(vbucket);
151            ready.remove(vbucket);
152            // Don't need to add an entry to vbucket-to-conns map
153            add_vb_conn_map = false;
154        }
155    }
156
157    // If we are a notify stream then we can't use the start_seqno supplied
158    // since if it is greater than the current high seqno then it will always
159    // trigger a rollback. As a result we should use the current high seqno for
160    // rollback purposes.
161    uint64_t notifySeqno = start_seqno;
162    if (notifyOnly && start_seqno > static_cast<uint64_t>(vb->getHighSeqno())) {
163        start_seqno = static_cast<uint64_t>(vb->getHighSeqno());
164    }
165
166    if (vb->failovers->needsRollback(start_seqno, vb->getHighSeqno(),
167                                     vbucket_uuid, snap_start_seqno,
168                                     snap_end_seqno, vb->getPurgeSeqno(),
169                                     rollback_seqno)) {
170        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Stream request failed "
171            "because a rollback to seqno %llu is required (start seqno %llu, "
172            "vb_uuid %llu, snapStartSeqno %llu, snapEndSeqno %llu)",
173            logHeader(), vbucket, *rollback_seqno, start_seqno, vbucket_uuid,
174            snap_start_seqno, snap_end_seqno);
175        return ENGINE_ROLLBACK;
176    }
177
178    ENGINE_ERROR_CODE rv = vb->failovers->addFailoverLog(getCookie(), callback);
179    if (rv != ENGINE_SUCCESS) {
180        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Couldn't add failover log to "
181            "stream request due to error %d", logHeader(), vbucket, rv);
182        return rv;
183    }
184
185    if (notifyOnly) {
186        streams[vbucket] = new NotifierStream(&engine_, this, getName(), flags,
187                                              opaque, vbucket, notifySeqno,
188                                              end_seqno, vbucket_uuid,
189                                              snap_start_seqno, snap_end_seqno);
190    } else {
191        streams[vbucket] = new ActiveStream(&engine_, this, getName(), flags,
192                                            opaque, vbucket, start_seqno,
193                                            end_seqno, vbucket_uuid,
194                                            snap_start_seqno, snap_end_seqno);
195        static_cast<ActiveStream*>(streams[vbucket].get())->setActive();
196    }
197
198    ready.push_back(vbucket);
199    lh.unlock();
200    if (add_vb_conn_map) {
201        connection_t conn(this);
202        engine_.getDcpConnMap().addVBConnByVBId(conn, vbucket);
203    }
204
205    return rv;
206}
207
208ENGINE_ERROR_CODE DcpProducer::getFailoverLog(uint32_t opaque, uint16_t vbucket,
209                                              dcp_add_failover_log callback) {
210    (void) opaque;
211    if (doDisconnect()) {
212        return ENGINE_DISCONNECT;
213    }
214
215    RCPtr<VBucket> vb = engine_.getVBucket(vbucket);
216    if (!vb) {
217        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Get Failover Log failed "
218            "because this vbucket doesn't exist", logHeader(), vbucket);
219        return ENGINE_NOT_MY_VBUCKET;
220    }
221
222    return vb->failovers->addFailoverLog(getCookie(), callback);
223}
224
225ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
226    setLastWalkTime();
227
228    if (doDisconnect()) {
229        return ENGINE_DISCONNECT;
230    }
231
232    ENGINE_ERROR_CODE ret;
233    if ((ret = maybeSendNoop(producers)) != ENGINE_FAILED) {
234        return ret;
235    }
236
237    DcpResponse *resp;
238    if (rejectResp) {
239        resp = rejectResp;
240        rejectResp = NULL;
241    } else {
242        resp = getNextItem();
243        if (!resp) {
244            return ENGINE_SUCCESS;
245        }
246    }
247
248    ret = ENGINE_SUCCESS;
249
250    Item* itmCpy = NULL;
251    if (resp->getEvent() == DCP_MUTATION) {
252        itmCpy = static_cast<MutationResponse*>(resp)->getItemCopy();
253    }
254
255    EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL,
256                                                                     true);
257    switch (resp->getEvent()) {
258        case DCP_STREAM_END:
259        {
260            StreamEndResponse *se = static_cast<StreamEndResponse*>(resp);
261            ret = producers->stream_end(getCookie(), se->getOpaque(),
262                                        se->getVbucket(), se->getFlags());
263            break;
264        }
265        case DCP_MUTATION:
266        {
267            MutationResponse *m = dynamic_cast<MutationResponse*> (resp);
268            ret = producers->mutation(getCookie(), m->getOpaque(), itmCpy,
269                                      m->getVBucket(), m->getBySeqno(),
270                                      m->getRevSeqno(), 0, NULL, 0,
271                                      m->getItem()->getNRUValue());
272            break;
273        }
274        case DCP_DELETION:
275        {
276            MutationResponse *m = static_cast<MutationResponse*>(resp);
277            ret = producers->deletion(getCookie(), m->getOpaque(),
278                                      m->getItem()->getKey().c_str(),
279                                      m->getItem()->getNKey(),
280                                      m->getItem()->getCas(),
281                                      m->getVBucket(), m->getBySeqno(),
282                                      m->getRevSeqno(), NULL, 0);
283            break;
284        }
285        case DCP_SNAPSHOT_MARKER:
286        {
287            SnapshotMarker *s = static_cast<SnapshotMarker*>(resp);
288            ret = producers->marker(getCookie(), s->getOpaque(),
289                                    s->getVBucket(),
290                                    s->getStartSeqno(),
291                                    s->getEndSeqno(),
292                                    s->getFlags());
293            break;
294        }
295        case DCP_SET_VBUCKET:
296        {
297            SetVBucketState *s = static_cast<SetVBucketState*>(resp);
298            ret = producers->set_vbucket_state(getCookie(), s->getOpaque(),
299                                               s->getVBucket(), s->getState());
300            break;
301        }
302        default:
303        {
304            LOG(EXTENSION_LOG_WARNING, "%s Unexpected dcp event (%d), "
305                "disconnecting", logHeader(), resp->getEvent());
306            ret = ENGINE_DISCONNECT;
307            break;
308        }
309    }
310
311    ObjectRegistry::onSwitchThread(epe);
312    if (resp->getEvent() == DCP_MUTATION && ret != ENGINE_SUCCESS) {
313        delete itmCpy;
314    }
315
316    if (ret == ENGINE_E2BIG) {
317        rejectResp = resp;
318    } else {
319        delete resp;
320    }
321
322    lastSendTime = ep_current_time();
323    return (ret == ENGINE_SUCCESS) ? ENGINE_WANT_MORE : ret;
324}
325
326ENGINE_ERROR_CODE DcpProducer::bufferAcknowledgement(uint32_t opaque,
327                                                     uint16_t vbucket,
328                                                     uint32_t buffer_bytes) {
329    LockHolder lh(queueLock);
330    if (log) {
331        bool wasFull = log->isFull();
332
333        ackedBytes.fetch_add(buffer_bytes);
334        log->free(buffer_bytes);
335        lh.unlock();
336
337        if (wasFull) {
338            engine_.getDcpConnMap().notifyPausedConnection(this, true);
339        }
340    }
341
342    return ENGINE_SUCCESS;
343}
344
345ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
346                                       uint16_t nkey, const void* value,
347                                       uint32_t nvalue) {
348    LockHolder lh(queueLock);
349    const char* param = static_cast<const char*>(key);
350    std::string keyStr(static_cast<const char*>(key), nkey);
351    std::string valueStr(static_cast<const char*>(value), nvalue);
352
353    if (strncmp(param, "connection_buffer_size", nkey) == 0) {
354        uint32_t size;
355        if (parseUint32(valueStr.c_str(), &size)) {
356            if (!log) {
357                log = new BufferLog(size);
358            } else if (log->getBufferSize() != size) {
359                log->setBufferSize(size);
360            }
361            return ENGINE_SUCCESS;
362        }
363    } else if (strncmp(param, "stream_buffer_size", nkey) == 0) {
364        LOG(EXTENSION_LOG_WARNING, "%s The ctrl parameter stream_buffer_size is"
365            "not supported by this engine", logHeader());
366        return ENGINE_ENOTSUP;
367    } else if (strncmp(param, "enable_noop", nkey) == 0) {
368        if (valueStr.compare("true") == 0) {
369            noopCtx.enabled = true;
370        } else {
371            noopCtx.enabled = false;
372        }
373        return ENGINE_SUCCESS;
374    } else if (strncmp(param, "set_noop_interval", nkey) == 0) {
375        if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {
376            return ENGINE_SUCCESS;
377        }
378    }
379
380    LOG(EXTENSION_LOG_WARNING, "%s Invalid ctrl parameter '%s' for %s",
381        logHeader(), valueStr.c_str(), keyStr.c_str());
382
383    return ENGINE_EINVAL;
384}
385
386ENGINE_ERROR_CODE DcpProducer::handleResponse(
387                                        protocol_binary_response_header *resp) {
388    if (doDisconnect()) {
389        return ENGINE_DISCONNECT;
390    }
391
392    uint8_t opcode = resp->response.opcode;
393    if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE ||
394        opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
395        protocol_binary_response_dcp_stream_req* pkt =
396            reinterpret_cast<protocol_binary_response_dcp_stream_req*>(resp);
397        uint32_t opaque = pkt->message.header.response.opaque;
398
399        LockHolder lh(queueLock);
400        stream_t active_stream;
401        std::map<uint16_t, stream_t>::iterator itr;
402        for (itr = streams.begin() ; itr != streams.end(); ++itr) {
403            active_stream = itr->second;
404            Stream *str = active_stream.get();
405            if (str && str->getType() == STREAM_ACTIVE) {
406                ActiveStream* as = static_cast<ActiveStream*>(str);
407                if (as && opaque == itr->second->getOpaque()) {
408                    break;
409                }
410            }
411        }
412
413        if (itr != streams.end()) {
414            lh.unlock();
415            ActiveStream *as = static_cast<ActiveStream*>(active_stream.get());
416            if (opcode == PROTOCOL_BINARY_CMD_DCP_SET_VBUCKET_STATE) {
417                as->setVBucketStateAckRecieved();
418            } else if (opcode == PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER) {
419                as->snapshotMarkerAckReceived();
420            }
421        }
422
423        return ENGINE_SUCCESS;
424    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_MUTATION ||
425        opcode == PROTOCOL_BINARY_CMD_DCP_DELETION ||
426        opcode == PROTOCOL_BINARY_CMD_DCP_EXPIRATION ||
427        opcode == PROTOCOL_BINARY_CMD_DCP_STREAM_END) {
428        // TODO: When nacking is implemented we need to handle these responses
429        return ENGINE_SUCCESS;
430    } else if (opcode == PROTOCOL_BINARY_CMD_DCP_NOOP) {
431        if (noopCtx.opaque == resp->response.opaque) {
432            noopCtx.pendingRecv = false;
433            return ENGINE_SUCCESS;
434        }
435    }
436
437    LOG(EXTENSION_LOG_WARNING, "%s Trying to handle an unknown response %d, "
438        "disconnecting", logHeader(), opcode);
439
440    return ENGINE_DISCONNECT;
441}
442
443ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque, uint16_t vbucket) {
444    if (doDisconnect()) {
445        return ENGINE_DISCONNECT;
446    }
447
448    LockHolder lh(queueLock);
449    std::map<uint16_t, stream_t>::iterator itr;
450    if ((itr = streams.find(vbucket)) == streams.end()) {
451        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because no "
452            "stream exists for this vbucket", logHeader(), vbucket);
453        return ENGINE_KEY_ENOENT;
454    } else if (!itr->second->isActive()) {
455        LOG(EXTENSION_LOG_WARNING, "%s (vb %d) Cannot close stream because "
456            "stream is already marked as dead", logHeader(), vbucket);
457        streams.erase(vbucket);
458        ready.remove(vbucket);
459        lh.unlock();
460        connection_t conn(this);
461        engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
462        return ENGINE_KEY_ENOENT;
463    }
464
465    stream_t stream = itr->second;
466    streams.erase(vbucket);
467    ready.remove(vbucket);
468    lh.unlock();
469
470    stream->setDead(END_STREAM_CLOSED);
471    connection_t conn(this);
472    engine_.getDcpConnMap().removeVBConnByVBId(conn, vbucket);
473    return ENGINE_SUCCESS;
474}
475
476void DcpProducer::addStats(ADD_STAT add_stat, const void *c) {
477    Producer::addStats(add_stat, c);
478
479    LockHolder lh(queueLock);
480
481    addStat("items_sent", getItemsSent(), add_stat, c);
482    addStat("items_remaining", getItemsRemaining_UNLOCKED(), add_stat, c);
483    addStat("total_bytes_sent", getTotalBytes(), add_stat, c);
484    addStat("last_sent_time", lastSendTime, add_stat, c);
485    addStat("noop_enabled", noopCtx.enabled, add_stat, c);
486    addStat("noop_wait", noopCtx.pendingRecv, add_stat, c);
487
488    if (log) {
489        addStat("max_buffer_bytes", log->getBufferSize(), add_stat, c);
490        addStat("unacked_bytes", log->getBytesSent(), add_stat, c);
491        addStat("total_acked_bytes", ackedBytes, add_stat, c);
492        addStat("flow_control", "enabled", add_stat, c);
493    } else {
494        addStat("flow_control", "disabled", add_stat, c);
495    }
496
497    std::map<uint16_t, stream_t>::iterator itr;
498    for (itr = streams.begin(); itr != streams.end(); ++itr) {
499        itr->second->addStats(add_stat, c);
500    }
501}
502
503void DcpProducer::addTakeoverStats(ADD_STAT add_stat, const void* c,
504                                   uint16_t vbid) {
505    LockHolder lh(queueLock);
506    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbid);
507    if (itr != streams.end()) {
508        Stream *s = itr->second.get();
509        if (s && s->getType() == STREAM_ACTIVE) {
510            ActiveStream* as = static_cast<ActiveStream*>(s);
511            if (as) {
512                as->addTakeoverStats(add_stat, c);
513            }
514        }
515    }
516}
517
518void DcpProducer::aggregateQueueStats(ConnCounter* aggregator) {
519    LockHolder lh(queueLock);
520    if (!aggregator) {
521        LOG(EXTENSION_LOG_WARNING, "%s Pointer to the queue stats aggregator"
522            " is NULL!!!", logHeader());
523        return;
524    }
525    aggregator->conn_queueDrain += itemsSent;
526    aggregator->conn_totalBytes += totalBytesSent;
527    aggregator->conn_queueRemaining += getItemsRemaining_UNLOCKED();
528    aggregator->conn_queueBackfillRemaining += totalBackfillBacklogs;
529}
530
531void DcpProducer::notifySeqnoAvailable(uint16_t vbucket, uint64_t seqno) {
532    LockHolder lh(queueLock);
533    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
534    if (itr != streams.end() && itr->second->isActive()) {
535        stream_t stream = itr->second;
536        lh.unlock();
537        stream->notifySeqnoAvailable(seqno);
538    }
539}
540
541void DcpProducer::vbucketStateChanged(uint16_t vbucket, vbucket_state_t state) {
542    LockHolder lh(queueLock);
543    std::map<uint16_t, stream_t>::iterator itr = streams.find(vbucket);
544    if (itr != streams.end()) {
545        stream_t stream = itr->second;
546        lh.unlock();
547        stream->setDead(END_STREAM_STATE);
548    }
549}
550
551void DcpProducer::closeAllStreams() {
552    LockHolder lh(queueLock);
553    std::list<uint16_t> vblist;
554    while (!streams.empty()) {
555        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
556        uint16_t vbid = itr->first;
557        itr->second->setDead(END_STREAM_DISCONNECTED);
558        streams.erase(vbid);
559        ready.remove(vbid);
560        vblist.push_back(vbid);
561    }
562    lh.unlock();
563
564    connection_t conn(this);
565    std::list<uint16_t>::iterator it = vblist.begin();
566    for (; it != vblist.end(); ++it) {
567        engine_.getDcpConnMap().removeVBConnByVBId(conn, *it);
568    }
569}
570
571const char* DcpProducer::getType() const {
572    if (notifyOnly) {
573        return "notifier";
574    } else {
575        return "producer";
576    }
577}
578
579DcpResponse* DcpProducer::getNextItem() {
580    LockHolder lh(queueLock);
581
582    setPaused(false);
583    while (!ready.empty()) {
584        if (log && log->isFull()) {
585            setPaused(true);
586            return NULL;
587        }
588
589        uint16_t vbucket = ready.front();
590        ready.pop_front();
591
592        if (streams.find(vbucket) == streams.end()) {
593            continue;
594        }
595        DcpResponse* op = streams[vbucket]->next();
596        if (!op) {
597            continue;
598        }
599
600        switch (op->getEvent()) {
601            case DCP_SNAPSHOT_MARKER:
602            case DCP_MUTATION:
603            case DCP_DELETION:
604            case DCP_EXPIRATION:
605            case DCP_STREAM_END:
606            case DCP_SET_VBUCKET:
607                break;
608            default:
609                LOG(EXTENSION_LOG_WARNING, "%s Producer is attempting to write"
610                    " an unexpected event %d", logHeader(), op->getEvent());
611                abort();
612        }
613
614        if (log) {
615            log->insert(op);
616        }
617        ready.push_back(vbucket);
618
619        if (op->getEvent() == DCP_MUTATION || op->getEvent() == DCP_DELETION ||
620            op->getEvent() == DCP_EXPIRATION) {
621            itemsSent++;
622        }
623
624        totalBytesSent = totalBytesSent + op->getMessageSize();
625
626        return op;
627    }
628
629    setPaused(true);
630    return NULL;
631}
632
633void DcpProducer::setDisconnect(bool disconnect) {
634    ConnHandler::setDisconnect(disconnect);
635
636    if (disconnect) {
637        LockHolder lh(queueLock);
638        std::map<uint16_t, stream_t>::iterator itr = streams.begin();
639        for (; itr != streams.end(); ++itr) {
640            itr->second->setDead(END_STREAM_DISCONNECTED);
641        }
642    }
643}
644
645void DcpProducer::notifyStreamReady(uint16_t vbucket, bool schedule) {
646    LockHolder lh(queueLock);
647
648    std::list<uint16_t>::iterator iter =
649        std::find(ready.begin(), ready.end(), vbucket);
650    if (iter != ready.end()) {
651        return;
652    }
653
654    ready.push_back(vbucket);
655    lh.unlock();
656
657    if (!log || (log && !log->isFull())) {
658        engine_.getDcpConnMap().notifyPausedConnection(this, schedule);
659    }
660}
661
662ENGINE_ERROR_CODE DcpProducer::maybeSendNoop(struct dcp_message_producers* producers) {
663    if (noopCtx.enabled) {
664        size_t sinceTime = ep_current_time() - noopCtx.sendTime;
665        if (noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
666            LOG(EXTENSION_LOG_WARNING, "%s Disconnected because the connection"
667                " appears to be dead", logHeader());
668            return ENGINE_DISCONNECT;
669        } else if (!noopCtx.pendingRecv && sinceTime > noopCtx.noopInterval) {
670            ENGINE_ERROR_CODE ret;
671            EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
672            ret = producers->noop(getCookie(), ++noopCtx.opaque);
673            ObjectRegistry::onSwitchThread(epe);
674
675            if (ret == ENGINE_SUCCESS) {
676                ret = ENGINE_WANT_MORE;
677            }
678            noopCtx.pendingRecv = true;
679            noopCtx.sendTime = ep_current_time();
680            lastSendTime = ep_current_time();
681            return ret;
682        }
683    }
684    return ENGINE_FAILED;
685}
686
687bool DcpProducer::isTimeForNoop() {
688    // Not Implemented
689    return false;
690}
691
692void DcpProducer::setTimeForNoop() {
693    // Not Implemented
694}
695
696void DcpProducer::clearQueues() {
697    LockHolder lh(queueLock);
698    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
699    for (; itr != streams.end(); ++itr) {
700        itr->second->clear();
701    }
702}
703
704void DcpProducer::appendQueue(std::list<queued_item> *q) {
705    (void) q;
706    abort(); // Not Implemented
707}
708
709size_t DcpProducer::getBackfillQueueSize() {
710    return totalBackfillBacklogs;
711}
712
713size_t DcpProducer::getItemsSent() {
714    return itemsSent;
715}
716
717size_t DcpProducer::getItemsRemaining_UNLOCKED() {
718    size_t remainingSize = 0;
719
720    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
721    for (; itr != streams.end(); ++itr) {
722        Stream *s = (itr->second).get();
723
724        if (s->getType() == STREAM_ACTIVE) {
725            ActiveStream *as = static_cast<ActiveStream *>(s);
726            remainingSize += as->getItemsRemaining();
727        }
728    }
729
730    return remainingSize;
731}
732
733size_t DcpProducer::getTotalBytes() {
734    return totalBytesSent;
735}
736
737std::list<uint16_t> DcpProducer::getVBList() {
738    LockHolder lh(queueLock);
739    std::list<uint16_t> vblist;
740    std::map<uint16_t, stream_t>::iterator itr = streams.begin();
741    for (; itr != streams.end(); ++itr) {
742        vblist.push_back(itr->first);
743    }
744    return vblist;
745}
746
747bool DcpProducer::windowIsFull() {
748    abort(); // Not Implemented
749}
750
751void DcpProducer::flush() {
752    abort(); // Not Implemented
753}
754