xref: /6.6.0/kv_engine/engines/ep/src/dcp/dcpconnmap.cc (revision 166a75af)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
#include "dcpconnmap.h"
#include "bucket_logger.h"
#include "configuration.h"
#include "conn_notifier.h"
#include "dcp/consumer.h"
#include "dcp/producer.h"
#include "ep_engine.h"
#include "statwriter.h"
#include <daemon/tracing.h>
#include <memcached/server_cookie_iface.h>
#include <memcached/vbucket.h>
#include <phosphor/phosphor.h>

#include <vector>
30
31const uint32_t DcpConnMap::dbFileMem = 10 * 1024;
32const uint16_t DcpConnMap::numBackfillsThreshold = 4096;
33const uint8_t DcpConnMap::numBackfillsMemThreshold = 1;
34
35class DcpConnMap::DcpConfigChangeListener : public ValueChangedListener {
36public:
37    DcpConfigChangeListener(DcpConnMap& connMap);
38    virtual ~DcpConfigChangeListener() {
39    }
40    virtual void sizeValueChanged(const std::string& key, size_t value);
41    virtual void booleanValueChanged(const std::string& key, bool value);
42
43private:
44    DcpConnMap& myConnMap;
45};
46
47DcpConnMap::DcpConnMap(EventuallyPersistentEngine &e)
48    : ConnMap(e),
49      aggrDcpConsumerBufferSize(0) {
50    backfills.numActiveSnoozing = 0;
51    updateMaxActiveSnoozingBackfills(engine.getEpStats().getMaxDataSize());
52    minCompressionRatioForProducer.store(
53                    engine.getConfiguration().getDcpMinCompressionRatio());
54
55    // Note: these allocations are deleted by ~Configuration
56    engine.getConfiguration().addValueChangedListener(
57            "dcp_blacklist_fts_connection_logs",
58            std::make_unique<DcpConfigChangeListener>(*this));
59    engine.getConfiguration().addValueChangedListener(
60            "dcp_consumer_process_buffered_messages_yield_limit",
61            std::make_unique<DcpConfigChangeListener>(*this));
62    engine.getConfiguration().addValueChangedListener(
63            "dcp_consumer_process_buffered_messages_batch_size",
64            std::make_unique<DcpConfigChangeListener>(*this));
65    engine.getConfiguration().addValueChangedListener(
66            "dcp_idle_timeout",
67            std::make_unique<DcpConfigChangeListener>(*this));
68}
69
70DcpConnMap::~DcpConnMap() {
71    EP_LOG_INFO("Deleted dcpConnMap_");
72}
73
74DcpConsumer* DcpConnMap::newConsumer(const void* cookie,
75                                     const std::string& name,
76                                     const std::string& consumerName) {
77    LockHolder lh(connsLock);
78
79    std::string conn_name("eq_dcpq:");
80    conn_name.append(name);
81
82    const auto& iter = map_.find(cookie);
83    if (iter != map_.end()) {
84        iter->second->setDisconnect();
85        EP_LOG_INFO(
86                "Failed to create Dcp Consumer because connection "
87                "({}) already exists.",
88                cookie);
89        return nullptr;
90    }
91
92    /*
93     *  If we request a connection of the same name then
94     *  mark the existing connection as "want to disconnect".
95     */
96    for (const auto& cookieToConn : map_) {
97        if (cookieToConn.second->getName() == conn_name) {
98            EP_LOG_INFO(
99                    "{} Disconnecting existing Dcp Consumer {} as it has the "
100                    "same "
101                    "name as a new connection {}",
102                    cookieToConn.second->logHeader(),
103                    cookieToConn.first,
104                    cookie);
105            cookieToConn.second->setDisconnect();
106        }
107    }
108
109    auto consumer = makeConsumer(engine, cookie, conn_name, consumerName);
110    EP_LOG_DEBUG("{} Connection created", consumer->logHeader());
111    auto* rawPtr = consumer.get();
112    map_[cookie] = std::move(consumer);
113    return rawPtr;
114}
115
116std::shared_ptr<DcpConsumer> DcpConnMap::makeConsumer(
117        EventuallyPersistentEngine& engine,
118        const void* cookie,
119        const std::string& connName,
120        const std::string& consumerName) const {
121    return std::make_shared<DcpConsumer>(
122            engine, cookie, connName, consumerName);
123}
124
125bool DcpConnMap::isPassiveStreamConnected_UNLOCKED(Vbid vbucket) {
126    for (const auto& cookieToConn : map_) {
127        auto* dcpConsumer =
128                dynamic_cast<DcpConsumer*>(cookieToConn.second.get());
129        if (dcpConsumer && dcpConsumer->isStreamPresent(vbucket)) {
130            EP_LOG_DEBUG(
131                    "({}) A DCP passive stream "
132                    "is already exists for the vbucket in connection: {}",
133                    vbucket,
134                    dcpConsumer->logHeader());
135            return true;
136        }
137    }
138    return false;
139}
140
141ENGINE_ERROR_CODE DcpConnMap::addPassiveStream(ConnHandler& conn,
142                                               uint32_t opaque,
143                                               Vbid vbucket,
144                                               uint32_t flags) {
145    LockHolder lh(connsLock);
146    /* Check if a stream (passive) for the vbucket is already present */
147    if (isPassiveStreamConnected_UNLOCKED(vbucket)) {
148        EP_LOG_WARN(
149                "{} ({}) Failing to add passive stream, "
150                "as one already exists for the vbucket!",
151                conn.logHeader(),
152                vbucket);
153        return ENGINE_KEY_EEXISTS;
154    }
155
156    return conn.addStream(opaque, vbucket, flags);
157}
158
159DcpProducer* DcpConnMap::newProducer(const void* cookie,
160                                     const std::string& name,
161                                     uint32_t flags) {
162    DcpProducer* result{nullptr};
163    std::shared_ptr<ConnHandler> oldConn;
164    {
165        LockHolder lh(connsLock);
166
167        std::string conn_name("eq_dcpq:");
168        conn_name.append(name);
169
170        const auto it = map_.find(cookie);
171        if (it != map_.end()) {
172            oldConn = it->second;
173            oldConn->flagDisconnect();
174            EP_LOG_INFO(
175                    "Failed to create Dcp Producer because connection "
176                    "({}) already exists.",
177                    cookie);
178        } else {
179            // If we request a connection of the same name then mark the
180            // existing connection as "want to disconnect" and pull it out from
181            // the conn-map. Note (MB-36915): Just flag the connection here,
182            // defer stream-shutdown to oldConn->setDisconnect below (ie, after
183            // we release the connLock), potential deadlock by lock-inversion
184            // with KVBucket::setVBucketState otherwise (on connLock /
185            // vbstateLock).
186            for (const auto& cookieToConn : map_) {
187                if (cookieToConn.second->getName() == conn_name) {
188                    const auto* oldCookie = cookieToConn.first;
189                    oldConn = cookieToConn.second;
190                    EP_LOG_INFO(
191                            "{} Disconnecting existing Dcp Producer {} as it "
192                            "has the "
193                            "same "
194                            "name as a new connection {}",
195                            cookieToConn.second->logHeader(),
196                            oldCookie,
197                            cookie);
198                    oldConn->flagDisconnect();
199
200                    // Note: I thought that we need to 'map_.erase(oldCookie)'
201                    // here, the connection would stale in connMap forever
202                    // otherwise. But that is not the case. Memcached will call
203                    // down the disconnect-handle for oldCookie, which will be
204                    // correctly removed from conn-map. Any attempt to remove it
205                    // here will lead to issues described MB-36451.
206                }
207            }
208
209            auto producer = std::make_shared<DcpProducer>(
210                    engine, cookie, conn_name, flags, true /*startTask*/);
211            EP_LOG_DEBUG("{} Connection created", producer->logHeader());
212            result = producer.get();
213            map_[cookie] = std::move(producer);
214        }
215    }
216
217    if (oldConn) {
218        oldConn->setDisconnect();
219    }
220
221    return result;
222}
223
224void DcpConnMap::shutdownAllConnections() {
225    EP_LOG_INFO("Shutting down dcp connections!");
226
227    if (connNotifier_ != NULL) {
228        connNotifier_->stop();
229        manageConnections();
230    }
231
232    // Take a copy of the connection map (under lock), then using the
233    // copy iterate across closing all streams and cancelling any
234    // tasks.
235    // We do this so we don't hold the connsLock when calling
236    // notifyPaused() on producer streams, as that would create a lock
237    // cycle between connLock, worker thread lock and releaseLock.
238    CookieToConnectionMap mapCopy;
239    {
240        LockHolder lh(connsLock);
241        mapCopy = map_;
242    }
243
244    closeStreams(mapCopy);
245    cancelTasks(mapCopy);
246}
247
248void DcpConnMap::vbucketStateChanged(
249        Vbid vbucket,
250        vbucket_state_t state,
251        bool closeInboundStreams,
252        boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock) {
253    LockHolder lh(connsLock);
254    for (const auto& cookieToConn : map_) {
255        auto* producer = dynamic_cast<DcpProducer*>(cookieToConn.second.get());
256        if (producer) {
257            producer->closeStreamDueToVbStateChange(
258                    vbucket, state, vbstateLock);
259        } else if (closeInboundStreams) {
260            static_cast<DcpConsumer*>(cookieToConn.second.get())
261                    ->closeStreamDueToVbStateChange(vbucket, state);
262        }
263    }
264}
265
266void DcpConnMap::closeStreamsDueToRollback(Vbid vbucket) {
267    LockHolder lh(connsLock);
268    for (const auto& cookieToConn : map_) {
269        auto* producer = dynamic_cast<DcpProducer*>(cookieToConn.second.get());
270        if (producer) {
271            producer->closeStreamDueToRollback(vbucket);
272        }
273    }
274}
275
276bool DcpConnMap::handleSlowStream(Vbid vbid, const CheckpointCursor* cursor) {
277    size_t lock_num = vbid.get() % vbConnLockNum;
278    std::lock_guard<std::mutex> lh(vbConnLocks[lock_num]);
279
280    for (const auto& weakPtr : vbConns[vbid.get()]) {
281        auto connection = weakPtr.lock();
282        if (!connection) {
283            continue;
284        }
285        auto* producer = dynamic_cast<DcpProducer*>(connection.get());
286        if (producer && producer->handleSlowStream(vbid, cursor)) {
287            return true;
288        }
289    }
290    return false;
291}
292
293void DcpConnMap::closeStreams(CookieToConnectionMap& map) {
294    for (const auto& itr : map) {
295        // Mark the connection as disconnected. This function is called during
296        // the bucket shutdown path and if we don't do so then we could allow a
297        // Producer to accept a racing StreamRequest during shutdown. When
298        // memcached runs the connection again it will be disconnected anyways
299        // as it will see that the bucket is being shut down.
300        itr.second->flagDisconnect();
301        auto producer = dynamic_pointer_cast<DcpProducer>(itr.second);
302        if (producer) {
303            producer->closeAllStreams();
304            producer->cancelCheckpointCreatorTask();
305            // The producer may be in EWOULDBLOCK (if it's idle), therefore
306            // notify him to ensure the front-end connection can close the TCP
307            // connection.
308            producer->immediatelyNotify();
309        } else {
310            auto consumer = dynamic_pointer_cast<DcpConsumer>(itr.second);
311            if (consumer) {
312                consumer->closeAllStreams();
313                // The consumer may be in EWOULDBLOCK (if it's idle), therefore
314                // notify him to ensure the front-end connection can close the
315                // TCP connection.
316                consumer->immediatelyNotify();
317            }
318        }
319    }
320}
321
322void DcpConnMap::cancelTasks(CookieToConnectionMap& map) {
323    for (auto itr : map) {
324        auto consumer = dynamic_pointer_cast<DcpConsumer>(itr.second);
325        if (consumer) {
326            consumer->cancelTask();
327        }
328    }
329}
330
331void DcpConnMap::disconnect(const void *cookie) {
332    // Move the connection matching this cookie from the map_
333    // data structure (under connsLock).
334    std::shared_ptr<ConnHandler> conn;
335    {
336        LockHolder lh(connsLock);
337        auto itr(map_.find(cookie));
338        if (itr != map_.end()) {
339            conn = itr->second;
340            if (conn.get()) {
341                auto* epe = ObjectRegistry::onSwitchThread(nullptr, true);
342                if (epe) {
343                    auto conn_desc =
344                         epe->getServerApi()->cookie->get_log_info(cookie).second;
345                    conn->getLogger().info("Removing connection {}", conn_desc);
346                } else {
347                    conn->getLogger().info("Removing connection {}", cookie);
348                }
349                ObjectRegistry::onSwitchThread(epe);
350                // MB-36557: Just flag the connection as disconnected, defer
351                // streams-shutdown (ie, close-stream + notify-connection) to
352                // disconnectConn below (ie, after we release the connLock).
353                // Potential deadlock by lock-inversion with
354                // KVBucket::setVBucketState otherwise (on connLock /
355                // vbstateLock).
356                conn->flagDisconnect();
357                map_.erase(itr);
358            }
359        }
360    }
361
362    // Note we shutdown the stream *not* under the connsLock; this is
363    // because as part of closing a DcpConsumer stream we need to
364    // acquire PassiveStream::buffer.bufMutex; and that could deadlock
365    // in EPBucket::setVBucketState, via
366    // PassiveStream::processBufferedMessages.
367    if (conn) {
368        auto producer = std::dynamic_pointer_cast<DcpProducer>(conn);
369        if (producer) {
370            producer->closeAllStreams();
371            producer->cancelCheckpointCreatorTask();
372        } else {
373            // Cancel consumer's processer task before closing all streams
374            auto consumer = std::dynamic_pointer_cast<DcpConsumer>(conn);
375            consumer->cancelTask();
376            consumer->closeAllStreams();
377            consumer->scheduleNotify();
378        }
379    }
380
381    // Finished disconnecting the stream; add it to the
382    // deadConnections list.
383    if (conn) {
384        LockHolder lh(connsLock);
385        deadConnections.push_back(conn);
386    }
387}
388
389void DcpConnMap::manageConnections() {
390    std::list<std::shared_ptr<ConnHandler>> release;
391    std::list<std::shared_ptr<ConnHandler>> toNotify;
392    {
393        LockHolder lh(connsLock);
394        while (!deadConnections.empty()) {
395            release.push_back(deadConnections.front());
396            deadConnections.pop_front();
397        }
398
399        // Collect the list of connections that need to be signaled.
400        for (const auto& cookieToConn : map_) {
401            const auto& conn = cookieToConn.second;
402            if (conn && (conn->isPaused() || conn->doDisconnect()) &&
403                conn->isReserved()) {
404                /**
405                 * Note: We want to send a notify even if we have sent one
406                 * previously i.e. tp->sentNotify() == true.  The reason for this
407                 * is manageConnections is used to notify idle connections once a
408                 * second.  This results in the step function being invoked,
409                 * which in turn may result in a dcp noop message being sent.
410                 */
411                toNotify.push_back(conn);
412            }
413        }
414    }
415
416    TRACE_LOCKGUARD_TIMED(releaseLock,
417                          "mutex",
418                          "DcpConnMap::manageConnections::releaseLock",
419                          SlowMutexThreshold);
420
421    for (auto it = toNotify.begin(); it != toNotify.end(); ++it) {
422        if ((*it).get() && (*it)->isReserved()) {
423            engine.notifyIOComplete((*it)->getCookie(), ENGINE_SUCCESS);
424        }
425    }
426
427    while (!release.empty()) {
428        auto conn = release.front();
429        conn->releaseReference();
430        release.pop_front();
431        auto prod = dynamic_pointer_cast<DcpProducer>(conn);
432        if (prod) {
433            removeVBConnections(*prod);
434        }
435    }
436}
437
438void DcpConnMap::removeVBConnections(DcpProducer& prod) {
439    for (const auto vbid : prod.getVBVector()) {
440        size_t lock_num = vbid.get() % vbConnLockNum;
441        std::lock_guard<std::mutex> lh(vbConnLocks[lock_num]);
442        auto& vb_conns = vbConns[vbid.get()];
443        for (auto itr = vb_conns.begin(); itr != vb_conns.end();) {
444            auto connection = (*itr).lock();
445            if (!connection) {
446                // ConnHandler no longer exists, cleanup.
447                itr = vb_conns.erase(itr);
448            } else if (prod.getCookie() == connection->getCookie()) {
449                // Found conn with matching cookie, done.
450                vb_conns.erase(itr);
451                break;
452            } else {
453                ++itr;
454            }
455        }
456    }
457}
458
459void DcpConnMap::notifyVBConnections(Vbid vbid,
460                                     uint64_t bySeqno,
461                                     SyncWriteOperation syncWrite) {
462    size_t lock_num = vbid.get() % vbConnLockNum;
463    std::lock_guard<std::mutex> lh(vbConnLocks[lock_num]);
464
465    for (auto& weakPtr : vbConns[vbid.get()]) {
466        auto connection = weakPtr.lock();
467        if (!connection) {
468            continue;
469        }
470        auto* producer = dynamic_cast<DcpProducer*>(connection.get());
471        if (producer) {
472            producer->notifySeqnoAvailable(vbid, bySeqno, syncWrite);
473        }
474    }
475}
476
477void DcpConnMap::seqnoAckVBPassiveStream(Vbid vbid, int64_t seqno) {
478    // Note: logically we should only have one Consumer per vBucket but
479    // we may keep around old Consumers with either no PassiveStream for
480    // this vBucket or a dead PassiveStream. We need to search the list of
481    // ConnHandlers for the Consumer with the alive PassiveStream for this
482    // vBucket. We will just ack all alive Consumers and allow the Consumer to
483    // determine if it needs to do any work.
484    size_t index = vbid.get() % vbConnLockNum;
485    std::lock_guard<std::mutex> lg(vbConnLocks[index]);
486    for (auto& weakPtr : vbConns[vbid.get()]) {
487        auto connection = weakPtr.lock();
488        if (!connection) {
489            continue;
490        }
491        auto consumer = dynamic_pointer_cast<DcpConsumer>(connection);
492        if (consumer) {
493            // Note: Sync Repl enabled at Consumer only if Producer supports it.
494            //     This is to prevent that 6.5 Consumers send DCP_SEQNO_ACK to
495            //     pre-6.5 Producers (e.g., topology change in a 6.5 cluster
496            //     where a new pre-6.5 Active is elected).
497            if (consumer->isSyncReplicationEnabled()) {
498                consumer->seqnoAckStream(vbid, seqno);
499            }
500        }
501    }
502}
503
504void DcpConnMap::notifyBackfillManagerTasks() {
505    LockHolder lh(connsLock);
506    for (const auto& cookieToConn : map_) {
507        auto* producer = dynamic_cast<DcpProducer*>(cookieToConn.second.get());
508        if (producer) {
509            producer->notifyBackfillManager();
510        }
511    }
512}
513
514bool DcpConnMap::canAddBackfillToActiveQ()
515{
516    std::lock_guard<std::mutex> lh(backfills.mutex);
517    if (backfills.numActiveSnoozing < backfills.maxActiveSnoozing) {
518        ++backfills.numActiveSnoozing;
519        return true;
520    }
521    return false;
522}
523
524void DcpConnMap::decrNumActiveSnoozingBackfills()
525{
526    {
527        std::lock_guard<std::mutex> lh(backfills.mutex);
528        if (backfills.numActiveSnoozing > 0) {
529            --backfills.numActiveSnoozing;
530            return;
531        }
532    }
533    EP_LOG_WARN("ActiveSnoozingBackfills already zero!!!");
534}
535
536void DcpConnMap::updateMaxActiveSnoozingBackfills(size_t maxDataSize)
537{
538    double numBackfillsMemThresholdPercent =
539                         static_cast<double>(numBackfillsMemThreshold)/100;
540    size_t max = maxDataSize * numBackfillsMemThresholdPercent / dbFileMem;
541
542    uint16_t newMaxActive;
543    {
544        std::lock_guard<std::mutex> lh(backfills.mutex);
545        /* We must have atleast one active/snoozing backfill */
546        backfills.maxActiveSnoozing =
547                std::max(static_cast<size_t>(1),
548                         std::min(max, static_cast<size_t>(numBackfillsThreshold)));
549        newMaxActive = backfills.maxActiveSnoozing;
550    }
551    EP_LOG_DEBUG("Max active snoozing backfills set to {}", newMaxActive);
552}
553
554void DcpConnMap::addStats(const AddStatFn& add_stat, const void* c) {
555    LockHolder lh(connsLock);
556    add_casted_stat("ep_dcp_dead_conn_count", deadConnections.size(), add_stat,
557                    c);
558}
559
560void DcpConnMap::updateMinCompressionRatioForProducers(float value) {
561    minCompressionRatioForProducer.store(value);
562}
563
564float DcpConnMap::getMinCompressionRatio() {
565    return minCompressionRatioForProducer.load();
566}
567
568DcpConnMap::DcpConfigChangeListener::DcpConfigChangeListener(DcpConnMap& connMap)
569    : myConnMap(connMap){}
570
571void DcpConnMap::DcpConfigChangeListener::sizeValueChanged(const std::string &key,
572                                                           size_t value) {
573    if (key == "dcp_consumer_process_buffered_messages_yield_limit") {
574        myConnMap.consumerYieldConfigChanged(value);
575    } else if (key == "dcp_consumer_process_buffered_messages_batch_size") {
576        myConnMap.consumerBatchSizeConfigChanged(value);
577    } else if (key == "dcp_idle_timeout") {
578        myConnMap.idleTimeoutConfigChanged(value);
579    }
580}
581
582void DcpConnMap::DcpConfigChangeListener::booleanValueChanged(const std::string& key,
583                                                           bool value) {
584    if (key == "dcp_blacklist_fts_connection_logs") {
585        myConnMap.blacklistFtsConnectionLogsConfigChanged(value);
586    }
587}
588
589/*
590 * Find all DcpConsumers and set the yield threshold
591 */
592void DcpConnMap::consumerYieldConfigChanged(size_t newValue) {
593    LockHolder lh(connsLock);
594    for (const auto& cookieToConn : map_) {
595        auto* dcpConsumer =
596                dynamic_cast<DcpConsumer*>(cookieToConn.second.get());
597        if (dcpConsumer) {
598            dcpConsumer->setProcessorYieldThreshold(newValue);
599        }
600    }
601}
602
603/*
604 * Find all DcpConsumers and set the processor batchsize
605 */
606void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
607    LockHolder lh(connsLock);
608    for (const auto& cookieToConn : map_) {
609        auto* dcpConsumer =
610                dynamic_cast<DcpConsumer*>(cookieToConn.second.get());
611        if (dcpConsumer) {
612            dcpConsumer->setProcessBufferedMessagesBatchSize(newValue);
613        }
614    }
615}
616
617void DcpConnMap::idleTimeoutConfigChanged(size_t newValue) {
618    LockHolder lh(connsLock);
619    for (const auto& cookieToConn : map_) {
620        cookieToConn.second.get()->setIdleTimeout(
621                std::chrono::seconds(newValue));
622    }
623}
624
625/**
626 * Find all DcpProducers and set the blacklistFtsConnectionsLogs setting
627 */
628void DcpConnMap::blacklistFtsConnectionLogsConfigChanged(bool newValue) {
629    LockHolder lh(connsLock);
630    for (const auto& cookieToConn : map_) {
631        auto* dcpProducer =
632            dynamic_cast<DcpProducer*>(cookieToConn.second.get());
633        if (dcpProducer) {
634            dcpProducer->setBlacklistFtsConnectionLogs(newValue);
635        }
636    }
637}
638
639std::shared_ptr<ConnHandler> DcpConnMap::findByName(const std::string& name) {
640    LockHolder lh(connsLock);
641    for (const auto& cookieToConn : map_) {
642        // If the connection is NOT about to be disconnected
643        // and the names match
644        if (!cookieToConn.second->doDisconnect() &&
645            cookieToConn.second->getName() == name) {
646            return cookieToConn.second;
647        }
648    }
649    return nullptr;
650}
651