xref: /6.6.0/kv_engine/engines/ep/src/dcp/dcpconnmap.h (revision 166a75af)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#pragma once
19
20#include "connmap.h"
21#include "ep_types.h"
22
23#include <memcached/engine.h>
24#include <platform/sized_buffer.h>
25
26#include <folly/SharedMutex.h>
27#include <atomic>
28#include <list>
29#include <string>
30
31class CheckpointCursor;
32class DcpProducer;
33class DcpConsumer;
34
35class DcpConnMap : public ConnMap {
36
37public:
38
39    DcpConnMap(EventuallyPersistentEngine &engine);
40
41    ~DcpConnMap();
42
43    /**
44     * Find or build a dcp connection for the given cookie and with
45     * the given name.
46     * @param cookie The cookie representing the client
47     * @param name The name of the connection
48     * @param flags The DCP open flags (as per protocol)
49     * @param jsonExtra An optional JSON document for additional configuration
50     */
51    DcpProducer* newProducer(const void* cookie,
52                             const std::string& name,
53                             uint32_t flags);
54
55    /**
56     * Create a new consumer and add it in the list of DCP Connections
57     *
58     * @param cookie the cookie representing the client
59     * @param name The name of the connection
60     * @param consumerName (Optional) If non-empty an identifier for the
61     *        consumer to advertise itself to the producer as.
62     * @return Pointer to the new dcp connection
63     */
64    DcpConsumer* newConsumer(const void* cookie,
65                             const std::string& name,
66                             const std::string& consumerName = {});
67
68    void notifyVBConnections(Vbid vbid,
69                             uint64_t bySeqno,
70                             SyncWriteOperation syncWrite);
71
72    /**
73     * Send a SeqnoAck message over the PassiveStream for the given VBucket.
74     *
75     * @param vbid
76     * @param seqno The payload
77     */
78    void seqnoAckVBPassiveStream(Vbid vbid, int64_t seqno);
79
80    void notifyBackfillManagerTasks();
81
82    void removeVBConnections(DcpProducer& prod);
83
84    /**
85     * Close outbound (active) streams for a vbucket whenever a state
86     * change is detected. In case of failovers, close inbound (passive)
87     * streams as well.
88     *
89     * @param vbucket the vbucket id
90     * @param state the new state of the vbucket
91     * @closeInboundStreams bool flag indicating failover
92     * @param vbstateLock (optional) Exclusive lock to vbstate
93     */
94    void vbucketStateChanged(
95            Vbid vbucket,
96            vbucket_state_t state,
97            bool closeInboundStreams = true,
98            boost::optional<folly::SharedMutex::WriteHolder&> vbstateLock = {});
99
100    /**
101     * Close outbound (active) streams for a vbucket on vBucket rollback.
102     *
103     * @param vbucket the vbucket id
104     */
105    void closeStreamsDueToRollback(Vbid vbucket);
106
107    void shutdownAllConnections();
108
109    bool isDeadConnectionsEmpty() {
110        LockHolder lh(connsLock);
111        return deadConnections.empty();
112    }
113
114    /**
115     * Handles the slow stream with the specified name.
116     * Returns true if the stream dropped its cursors on the
117     * checkpoint.
118     */
119    bool handleSlowStream(Vbid vbid, const CheckpointCursor* cursor);
120
121    void disconnect(const void *cookie);
122
123    void manageConnections();
124
125    bool canAddBackfillToActiveQ();
126
127    void decrNumActiveSnoozingBackfills();
128
129    void updateMaxActiveSnoozingBackfills(size_t maxDataSize);
130
131    uint16_t getNumActiveSnoozingBackfills () {
132        std::lock_guard<std::mutex> lh(backfills.mutex);
133        return backfills.numActiveSnoozing;
134    }
135
136    uint16_t getMaxActiveSnoozingBackfills () {
137        std::lock_guard<std::mutex> lh(backfills.mutex);
138        return backfills.maxActiveSnoozing;
139    }
140
141    ENGINE_ERROR_CODE addPassiveStream(ConnHandler& conn,
142                                       uint32_t opaque,
143                                       Vbid vbucket,
144                                       uint32_t flags);
145
146    /* Use this only for any quick direct stats from DcpConnMap. To collect
147       individual conn stats from conn lists please use ConnStatBuilder */
148    void addStats(const AddStatFn& add_stat, const void* c);
149
150    /* Updates the minimum compression ratio to be achieved for docs by
151     * all the producers, which will be in effect if the producer side
152     * value compression is enabled */
153    void updateMinCompressionRatioForProducers(float value);
154
155    float getMinCompressionRatio();
156
157    std::shared_ptr<ConnHandler> findByName(const std::string& name);
158
159    bool isConnections() {
160        LockHolder lh(connsLock);
161        return !map_.empty();
162    }
163
164    /**
165     * Call a function on each DCP connection.
166     */
167    template <typename Fun>
168    void each(Fun f) {
169        LockHolder lh(connsLock);
170        for (auto& c : map_) {
171            f(c.second);
172        }
173    }
174
175protected:
176    /*
177     * deadConnections is protected (as opposed to private) because
178     * of the module test ep-engine_dead_connections_test
179     */
180    std::list<std::shared_ptr<ConnHandler>> deadConnections;
181
182    /*
183     * Change the value at which a DcpConsumer::Processor task will yield
184     */
185    void consumerYieldConfigChanged(size_t newValue);
186
187    /*
188     * Change the batchsize that the DcpConsumer::Processor operates with
189     */
190    void consumerBatchSizeConfigChanged(size_t newValue);
191
192    /**
193     * Change the idle timeout that Producers and Consumers operate with
194     */
195    void idleTimeoutConfigChanged(size_t newValue);
196
197    /**
198     * Enable/disable the logging on FTS connections
199     */
200    void blacklistFtsConnectionLogsConfigChanged(bool newValue);
201
202    /**
203     * @param engine The engine
204     * @param cookie The cookie that identifies the connection
205     * @param connName The name that identifies the connection
206     * @param consumerName The name that identifies the consumer
207     * @return a shared instance of DcpConsumer
208     */
209    virtual std::shared_ptr<DcpConsumer> makeConsumer(
210            EventuallyPersistentEngine& engine,
211            const void* cookie,
212            const std::string& connName,
213            const std::string& consumerName) const;
214
215    bool isPassiveStreamConnected_UNLOCKED(Vbid vbucket);
216
217    /*
218     * Closes all streams associated with each connection in `map`.
219     */
220    static void closeStreams(CookieToConnectionMap& map);
221
222    /*
223     * Cancels all tasks assocuated with each connection in `map`.
224     */
225    static void cancelTasks(CookieToConnectionMap& map);
226
227    /* Db file memory */
228    static const uint32_t dbFileMem;
229
230    // Current and maximum number of backfills which are snoozing.
231    struct {
232        std::mutex mutex;
233        uint16_t numActiveSnoozing;
234        uint16_t maxActiveSnoozing;
235    } backfills;
236
237    /* Max num of backfills we want to have irrespective of memory */
238    static const uint16_t numBackfillsThreshold;
239    /* Max percentage of memory we want backfills to occupy */
240    static const uint8_t numBackfillsMemThreshold;
241
242    std::atomic<float> minCompressionRatioForProducer;
243
244    /* Total memory used by all DCP consumer buffers */
245    std::atomic<size_t> aggrDcpConsumerBufferSize;
246
247    class DcpConfigChangeListener;
248};
249